杭州政府网站建设/重庆seo关键词优化服务
如果大家对kubernetes组件以及架构相关分析,可以看我之前的源码阅读,今天只从一个函数分析容器的启动过程,这个函数就是SyncPod,这个是创建kubelet里面最核心的一个函数了。这个方法分为五步:
// 1. Compute sandbox and container changes.
// 2. Kill pod sandbox if necessary.
// 3. Kill any containers that should not be running.
// 4. Create sandbox if necessary.
// 5. Create init containers.
// 6. Create normal containers.
下面就逐一讲解:
1.计算容器的变化
这部分代码主要完成容器状态的变化统计,就是确定哪些容器要创建,哪些容器要删除。
// Step 1: Compute sandbox and container changes.podContainerChanges := m.computePodContainerChanges(pod, podStatus)glog.V(3).Infof("computePodContainerChanges got %+v for pod %q", podContainerChanges, format.Pod(pod))if podContainerChanges.CreateSandbox {ref, err := ref.GetReference(api.Scheme, pod)if err != nil {glog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), err)}if podContainerChanges.SandboxID != "" {m.recorder.Eventf(ref, v1.EventTypeNormal, "SandboxChanged", "Pod sandbox changed, it will be killed and re-created.")} else {glog.V(4).Infof("SyncPod received new pod %q, will create a new sandbox for it", format.Pod(pod))}}
具体实现看m.computePodContainerChanges,如下:
func (m *kubeGenericRuntimeManager) computePodContainerChanges(pod *v1.Pod, podStatus *kubecontainer.PodStatus) podContainerSpecChanges {glog.V(5).Infof("Syncing Pod %q: %+v", format.Pod(pod), pod)sandboxChanged, attempt, sandboxID := m.podSandboxChanged(pod, podStatus)changes := podContainerSpecChanges{CreateSandbox: sandboxChanged,SandboxID: sandboxID,Attempt: attempt,ContainersToStart: make(map[int]string),ContainersToKeep: make(map[kubecontainer.ContainerID]int),InitContainersToKeep: make(map[kubecontainer.ContainerID]int),ContainersToKill: make(map[kubecontainer.ContainerID]containerToKillInfo),}// check the status of init containers.initFailed := false// always reset the init containers if the sandbox is changed.if !sandboxChanged {// Keep all successfully completed containers. If there are failing containers,// only keep the first failing one.initFailed = checkAndKeepInitContainers(pod, podStatus, changes.InitContainersToKeep)}changes.InitFailed = initFailed// check the status of containers.for index, container := range pod.Spec.Containers {containerStatus := podStatus.FindContainerStatusByName(container.Name)if containerStatus == nil || containerStatus.State != kubecontainer.ContainerStateRunning {if kubecontainer.ShouldContainerBeRestarted(&container, pod, podStatus) {message := fmt.Sprintf("Container %+v is dead, but RestartPolicy says that we should restart it.", container)glog.Info(message)changes.ContainersToStart[index] = message}continue}if sandboxChanged {if pod.Spec.RestartPolicy != v1.RestartPolicyNever {message := fmt.Sprintf("Container %+v's pod sandbox is dead, the container will be recreated.", container)glog.Info(message)changes.ContainersToStart[index] = message}continue}if initFailed {// Initialization failed and Container exists.// If we have an initialization failure everything will be killed anyway.// If RestartPolicy is Always or OnFailure we restart containers that were running before.if pod.Spec.RestartPolicy != v1.RestartPolicyNever {message := fmt.Sprintf("Failed to initialize pod. %q will be restarted.", container.Name)glog.V(1).Info(message)changes.ContainersToStart[index] = message}continue}expectedHash := kubecontainer.HashContainer(&container)containerChanged := containerStatus.Hash != expectedHashif containerChanged {message := fmt.Sprintf("Pod %q container %q hash changed (%d vs %d), it will be killed and re-created.",pod.Name, container.Name, containerStatus.Hash, expectedHash)glog.Info(message)changes.ContainersToStart[index] = messagecontinue}liveness, found := m.livenessManager.Get(containerStatus.ID)if !found || liveness == proberesults.Success {changes.ContainersToKeep[containerStatus.ID] = indexcontinue}if pod.Spec.RestartPolicy != v1.RestartPolicyNever {message := fmt.Sprintf("pod %q container %q is unhealthy, it will be killed and re-created.", format.Pod(pod), container.Name)glog.Info(message)changes.ContainersToStart[index] = message}}// Don't keep init containers if they are the only containers to keep.if !sandboxChanged && len(changes.ContainersToStart) == 0 && len(changes.ContainersToKeep) == 0 {changes.InitContainersToKeep = make(map[kubecontainer.ContainerID]int)}// compute containers to be killedrunningContainerStatuses := podStatus.GetRunningContainerStatuses()for _, containerStatus := range runningContainerStatuses {_, keep := changes.ContainersToKeep[containerStatus.ID]_, keepInit := changes.InitContainersToKeep[containerStatus.ID]if !keep && !keepInit {var podContainer *v1.Containervar killMessage stringfor i, c := range pod.Spec.Containers {if c.Name == containerStatus.Name {podContainer = &pod.Spec.Containers[i]killMessage = changes.ContainersToStart[i]break}}changes.ContainersToKill[containerStatus.ID] = containerToKillInfo{name: containerStatus.Name,container: podContainer,message: killMessage,}}}return changes
}
计算出哪些容器要启动放到ContainersToStart这个管道里面,哪些容器要删除放到ContainersToKill这个管道里面。
2.删除sandbox变化的pod
这个主要是当沙箱变化的时候,需要重建pod,譬如切换了pause镜像,就会触发个操作
// Step 2: Kill the pod if the sandbox has changed.if podContainerChanges.CreateSandbox || (len(podContainerChanges.ContainersToKeep) == 0 && len(podContainerChanges.ContainersToStart) == 0) {if len(podContainerChanges.ContainersToKeep) == 0 && len(podContainerChanges.ContainersToStart) == 0 {glog.V(4).Infof("Stopping PodSandbox for %q because all other containers are dead.", format.Pod(pod))} else {glog.V(4).Infof("Stopping PodSandbox for %q, will start new one", format.Pod(pod))}killResult := m.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(m.runtimeName, podStatus), nil)result.AddPodSyncResult(killResult)if killResult.Error() != nil {glog.Errorf("killPodWithSyncResult failed: %v", killResult.Error())return}}
3. 删除不需要运行的容器
当kubernetes执行删除pod的操作时候,有些pods是没必要存在的。
// Step 3: kill any running containers in this pod which are not to keep.for containerID, containerInfo := range podContainerChanges.ContainersToKill {glog.V(3).Infof("Killing unwanted container %q(id=%q) for pod %q", containerInfo.name, containerID, format.Pod(pod))killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)result.AddSyncResult(killContainerResult)if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil); err != nil {killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())glog.Errorf("killContainer %q(id=%q) for pod %q failed: %v", containerInfo.name, containerID, format.Pod(pod), err)return}}
下面是调用具体的容器运行时,去删除
func (m *kubeGenericRuntimeManager) killContainer(pod *v1.Pod, containerID kubecontainer.ContainerID, containerName string, reason string, gracePeriodOverride *int64) error {var containerSpec *v1.Containerif pod != nil {containerSpec = kubecontainer.GetContainerSpec(pod, containerName)} else {// Restore necessary information if one of the specs is nil.restoredPod, restoredContainer, err := m.restoreSpecsFromContainerLabels(containerID)if err != nil {return err}pod, containerSpec = restoredPod, restoredContainer}// From this point , pod and container must be non-nil.gracePeriod := int64(minimumGracePeriodInSeconds)switch {case pod.DeletionGracePeriodSeconds != nil:gracePeriod = *pod.DeletionGracePeriodSecondscase pod.Spec.TerminationGracePeriodSeconds != nil:gracePeriod = *pod.Spec.TerminationGracePeriodSeconds}glog.V(2).Infof("Killing container %q with %d second grace period", containerID.String(), gracePeriod)// Run the pre-stop lifecycle hooks if applicable.if containerSpec.Lifecycle != nil && containerSpec.Lifecycle.PreStop != nil {gracePeriod = gracePeriod - m.executePreStopHook(pod, containerID, containerSpec, gracePeriod)}// always give containers a minimal shutdown window to avoid unnecessary SIGKILLsif gracePeriod < minimumGracePeriodInSeconds {gracePeriod = minimumGracePeriodInSeconds}if gracePeriodOverride != nil {gracePeriod = *gracePeriodOverrideglog.V(3).Infof("Killing container %q, but using %d second grace period override", containerID, gracePeriod)}err := m.runtimeService.StopContainer(containerID.ID, gracePeriod)if err != nil {glog.Errorf("Container %q termination failed with gracePeriod %d: %v", containerID.String(), gracePeriod, err)} else {glog.V(3).Infof("Container %q exited normally", containerID.String())}message := fmt.Sprintf("Killing container with id %s", containerID.String())if reason != "" {message = fmt.Sprint(message, ":", reason)}m.generateContainerEvent(containerID, v1.EventTypeNormal, events.KillingContainer, message)m.containerRefManager.ClearRef(containerID)return err
m.runtimeService.StopContainer带着优雅关闭时间的停止container,每种容器都是实现StopContainer这个接口,这个在kubelet启动的时候初始化的,之前代码已经分析过了,不再赘述。
4.创建沙箱
其实kubelet之所以引入沙箱,是想建立一个容器标准,这里可以简单理解成那个pause容器。所有的网络都是挂在这个基础容器里面。
// Step 4: Create a sandbox for the pod if necessary.podSandboxID := podContainerChanges.SandboxIDif podContainerChanges.CreateSandbox && len(podContainerChanges.ContainersToStart) > 0 {var msg stringvar err errorglog.V(4).Infof("Creating sandbox for pod %q", format.Pod(pod))createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod))result.AddSyncResult(createSandboxResult)podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)if err != nil {createSandboxResult.Fail(kubecontainer.ErrCreatePodSandbox, msg)glog.Errorf("createPodSandbox for pod %q failed: %v", format.Pod(pod), err)return}glog.V(4).Infof("Created PodSandbox %q for pod %q", podSandboxID, format.Pod(pod))podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID)if err != nil {glog.Errorf("Failed to get pod sandbox status: %v; Skipping pod %q", err, format.Pod(pod))result.Fail(err)return}// If we ever allow updating a pod from non-host-network to// host-network, we may use a stale IP.if !kubecontainer.IsHostNetworkPod(pod) {// Overwrite the podIP passed in the pod status, since we just started the pod sandbox.podIP = m.determinePodSandboxIP(pod.Namespace, pod.Name, podSandboxStatus)glog.V(4).Infof("Determined the ip %q for pod %q after sandbox changed", podIP, format.Pod(pod))}}
和上面删除的方法一样,调用相应的容器运行时去完成操作
func (m *kubeGenericRuntimeManager) createPodSandbox(pod *v1.Pod, attempt uint32) (string, string, error) {podSandboxConfig, err := m.generatePodSandboxConfig(pod, attempt)if err != nil {message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err)glog.Error(message)return "", message, err}// Create pod logs directoryerr = m.osInterface.MkdirAll(podSandboxConfig.LogDirectory, 0755)if err != nil {message := fmt.Sprintf("Create pod log directory for pod %q failed: %v", format.Pod(pod), err)glog.Errorf(message)return "", message, err}podSandBoxID, err := m.runtimeService.RunPodSandbox(podSandboxConfig)if err != nil {message := fmt.Sprintf("CreatePodSandbox for pod %q failed: %v", format.Pod(pod), err)glog.Error(message)return "", message, err}return podSandBoxID, "", nil
}
如果是docker就在pkg/kubelet/dockershim/docker_sandbox.go这个里面RunPodSandbox实现
无非是先拉镜像然后启动容器,只有一个cni的部分需要注意,这个在以后的代码分析中会介绍。言归正传,创建完基础容器后
5.启动init容器
init容器是为业务容器做初始化工作的,譬如可以预先从网络上面加载一些动态资源
// Step 5: start init containers.status, next, done := findNextInitContainerToRun(pod, podStatus)if status != nil && status.ExitCode != 0 {// container initialization has failed, flag the pod as failedinitContainerResult := kubecontainer.NewSyncResult(kubecontainer.InitContainer, status.Name)initContainerResult.Fail(kubecontainer.ErrRunInitContainer, fmt.Sprintf("init container %q exited with %d", status.Name, status.ExitCode))result.AddSyncResult(initContainerResult)if pod.Spec.RestartPolicy == v1.RestartPolicyNever {utilruntime.HandleError(fmt.Errorf("error running pod %q init container %q, restart=Never: %#v", format.Pod(pod), status.Name, status))return}utilruntime.HandleError(fmt.Errorf("Error running pod %q init container %q, restarting: %#v", format.Pod(pod), status.Name, status))}if next != nil {if len(podContainerChanges.ContainersToStart) == 0 {glog.V(4).Infof("No containers to start, stopping at init container %+v in pod %v", next.Name, format.Pod(pod))return}// If we need to start the next container, do so now then exitcontainer := nextstartContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)result.AddSyncResult(startContainerResult)isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)if isInBackOff {startContainerResult.Fail(err, msg)glog.V(4).Infof("Backing Off restarting init container %+v in pod %v", container, format.Pod(pod))return}glog.V(4).Infof("Creating init container %+v in pod %v", container, format.Pod(pod))if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP); err != nil {startContainerResult.Fail(err, msg)utilruntime.HandleError(fmt.Errorf("init container start failed: %v: %s", err, msg))return}// Successfully started the container; clear the entry in the failureglog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod))return}if !done {// init container still runningglog.V(4).Infof("An init container is still running in pod %v", format.Pod(pod))return}if podContainerChanges.InitFailed {glog.V(4).Infof("Not all init containers have succeeded for pod %v", format.Pod(pod))return}
方法通过m.startContainer启动initcontainer。
6.启动业务容器
到这一步才是启动客户自己定义的容器。
// Step 6: start containers in podContainerChanges.ContainersToStart.for idx := range podContainerChanges.ContainersToStart {container := &pod.Spec.Containers[idx]startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)result.AddSyncResult(startContainerResult)isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)if isInBackOff {startContainerResult.Fail(err, msg)glog.V(4).Infof("Backing Off restarting container %+v in pod %v", container, format.Pod(pod))continue}glog.V(4).Infof("Creating container %+v in pod %v", container, format.Pod(pod))if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP); err != nil {startContainerResult.Fail(err, msg)utilruntime.HandleError(fmt.Errorf("container start failed: %v: %s", err, msg))continue}}
就是通过读取podContainerChanges.ContainersToStart管道里面,需要启动的容器,然后for循环逐一创建这个pod里面的container。
整个pod同步的代码说完了,总计6个步骤。