Kubernetes Source Code Analysis (3): apiserver Startup

apiserver Startup Analysis

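1. Concepts

APIServer provides CRUD/watch on every kind of Kubernetes resource object, authentication and authorization, admission control, and many other core functions. Its role in k8s is comparable to the brain and the heart. Its responsibilities include:

  • Exposing the REST API for cluster management (resource CRUD, authentication and authorization, data validation, and cluster state changes);

  • Acting as the hub for data exchange and communication between all modules; every module depends on APIServer to operate

  • Providing a rich set of cluster security controls

  • Talking directly to the backing store (etcd); it is the only module that communicates with the storage backend directly

The Kubernetes API Server is served by a process named kube-apiserver.

We usually interact with the apiserver through kubectl or a client library such as client-go.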

2. cobra

cobra is a library for building command-line tools, hosted at https://github.com/spf13/cobra. Kubernetes uses cobra to build its command-line tools.

For example:

kubectl get pod|service [podName|serviceName] -n <namespace>

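In cobra's terminology:

  • kubectl is the rootcmd (the root command).

  • get is a subcmd of the rootcmd.

  • pod|service can in turn be viewed as a subcmd of get (kubectl itself actually parses the resource type as an argument of get, but the analogy illustrates how commands nest).

  • podName and serviceName are the args of pod/service.

  • -n is a flag.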
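The rootcmd / subcmd / args / flag split above can be sketched without cobra itself. The block below is only an illustration written against the standard library: the `command` type, `resolve`, `execute` and `newRoot` are hypothetical names, not cobra's API, and the tree follows this article's framing in which pod and service are treated as subcommands of get.

```go
package main

import "fmt"

// command is a hypothetical, stripped-down stand-in for cobra.Command.
type command struct {
	name string
	subs map[string]*command
	run  func(args []string, namespace string) string
}

// resolve walks the subcommand tree while the leading words match and
// returns the deepest command found plus the words left over.
func resolve(root *command, words []string) (*command, []string) {
	cur := root
	for len(words) > 0 {
		next, ok := cur.subs[words[0]]
		if !ok {
			break
		}
		cur, words = next, words[1:]
	}
	return cur, words
}

// execute resolves the command, separates the -n flag from the positional
// args, and runs the resolved command.
func execute(root *command, words []string) (string, error) {
	cmd, rest := resolve(root, words)
	if cmd.run == nil {
		return "", fmt.Errorf("%q needs a subcommand", cmd.name)
	}
	namespace := "default"
	var args []string
	for i := 0; i < len(rest); i++ {
		if rest[i] == "-n" && i+1 < len(rest) {
			namespace = rest[i+1]
			i++ // skip the flag value
			continue
		}
		args = append(args, rest[i])
	}
	return cmd.run(args, namespace), nil
}

// newRoot builds the tree kubectl -> get -> pod|service.
func newRoot() *command {
	show := func(kind string) func([]string, string) string {
		return func(args []string, ns string) string {
			return fmt.Sprintf("get %s %v in namespace %s", kind, args, ns)
		}
	}
	get := &command{name: "get", subs: map[string]*command{
		"pod":     {name: "pod", run: show("pod")},
		"service": {name: "service", run: show("service")},
	}}
	return &command{name: "kubectl", subs: map[string]*command{"get": get}}
}

func main() {
	out, err := execute(newRoot(), []string{"get", "pod", "nginx", "-n", "dev"})
	fmt.Println(out, err)
}
```

cobra does considerably more (persistent flags, help generation, argument validators), so this only mirrors the lookup order: walk the command tree first, then treat what remains as args and flags.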

The directory layout suggested by cobra:

▾ appName/
  ▾ cmd/
      add.go
      your.go
      commands.go
      here.go
    main.go

Commands live under cmd, usually one command per file.

The entry point of a cobra program usually looks like this:

func main() {
  cmd.Execute()
}

3. apiserver startup

The source is located at cmd/kube-apiserver/apiserver.go

func main() {
 command := app.NewAPIServerCommand()
 code := cli.Run(command)
 os.Exit(code)
}

The startup shown above is driven by cobra: app.NewAPIServerCommand builds the root command and cli.Run executes it.

func NewAPIServerCommand() *cobra.Command {
 s := options.NewServerRunOptions()
 cmd := &cobra.Command{
  Use: "kube-apiserver",
  Long: `The Kubernetes API server validates and configures data
for the api objects which include pods, services, replicationcontrollers, and
others. The API Server services REST operations and provides the frontend to the
cluster's shared state through which all other components interact.`,

  // stop printing usage when the command errors
  SilenceUsage: true,
  PersistentPreRunE: func(*cobra.Command, []string) error {
   // silence client-go warnings.
   // kube-apiserver loopback clients should not log self-issued warnings.
   rest.SetDefaultWarningHandler(rest.NoWarnings{})
   return nil
  },
  // cobra eventually invokes RunE, which in turn calls Run to start the server
  RunE: func(cmd *cobra.Command, args []string) error {
   verflag.PrintAndExitIfRequested()
   fs := cmd.Flags()

   // Activate logging as soon as possible, after that
   // show flags with the final logging configuration.
   if err := s.Logs.ValidateAndApply(utilfeature.DefaultFeatureGate); err != nil {
    return err
   }
   cliflag.PrintFlags(fs)

   // set default options
   completedOptions, err := Complete(s)
   if err != nil {
    return err
   }

   // validate options
   if errs := completedOptions.Validate(); len(errs) != 0 {
    return utilerrors.NewAggregate(errs)
   }

   return Run(completedOptions, genericapiserver.SetupSignalHandler())
  },
  Args: func(cmd *cobra.Command, args []string) error {
   for _, arg := range args {
    if len(arg) > 0 {
     return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
    }
   }
   return nil
  },
 }

 fs := cmd.Flags()
 namedFlagSets := s.Flags()
 verflag.AddFlags(namedFlagSets.FlagSet("global"))
 globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name(), logs.SkipLoggingConfigurationFlags())
 options.AddCustomGlobalFlags(namedFlagSets.FlagSet("generic"))
 for _, f := range namedFlagSets.FlagSets {
  fs.AddFlagSet(f)
 }

 cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
 cliflag.SetUsageAndHelpFunc(cmd, namedFlagSets, cols)

 return cmd
}
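The RunE body above follows a fixed sequence: fill in defaults (Complete), validate, then run. A minimal sketch of that sequence, assuming a hypothetical `runOptions` type with two made-up fields rather than the real options.ServerRunOptions:

```go
package main

import (
	"errors"
	"fmt"
)

// runOptions is a hypothetical stand-in for options.ServerRunOptions.
type runOptions struct {
	SecurePort  int
	EtcdServers []string
}

// completedOptions wraps runOptions so that start logic only ever sees
// options that went through complete().
type completedOptions struct{ runOptions }

// complete fills in defaults for anything the user left unset.
// The default port here is chosen for illustration.
func complete(o runOptions) completedOptions {
	if o.SecurePort == 0 {
		o.SecurePort = 6443
	}
	return completedOptions{o}
}

// validate collects every problem instead of stopping at the first one.
func (o completedOptions) validate() []error {
	var errs []error
	if o.SecurePort < 1 || o.SecurePort > 65535 {
		errs = append(errs, fmt.Errorf("secure port %d out of range", o.SecurePort))
	}
	if len(o.EtcdServers) == 0 {
		errs = append(errs, errors.New("at least one etcd server is required"))
	}
	return errs
}

// start mirrors the order used in RunE: complete, validate, run.
func start(o runOptions) error {
	c := complete(o)
	if errs := c.validate(); len(errs) != 0 {
		return fmt.Errorf("invalid options: %v", errs)
	}
	fmt.Printf("serving on :%d with etcd %v\n", c.SecurePort, c.EtcdServers)
	return nil
}

func main() {
	fmt.Println(start(runOptions{EtcdServers: []string{"https://127.0.0.1:2379"}}))
	fmt.Println(start(runOptions{}))
}
```

Returning a distinct completed type is one way to make it hard to call the run step with options that skipped defaulting; the real code uses completedServerRunOptions for the same purpose.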

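The Run function does three things:

  • Sets up the routes of the apiserver's three server components

  • Registers the endpoints for the health check, the readiness probe and the liveness probe

  • Starts the HTTP service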

func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
 // To help debugging, immediately log version
 klog.Infof("Version: %+v", version.Get())

 klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))
 // set up the routes of the apiserver's three server components
 server, err := CreateServerChain(completeOptions, stopCh)
 if err != nil {
  return err
 }
 // register the health check, readiness probe and liveness probe endpoints
 prepared, err := server.PrepareRun()
 if err != nil {
  return err
 }
 // start the HTTP service
 return prepared.Run(stopCh)
}
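To make the second step more concrete, here is a small sketch of what registering health endpoints amounts to. It is not the generic apiserver's implementation: `newServer`, `status` and the single ready flag are assumptions for illustration, and only the paths /healthz, /livez and /readyz are taken from the real server.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync/atomic"
)

// newServer returns a handler exposing the three health endpoints and a
// function that marks the server as ready.
func newServer() (http.Handler, func()) {
	var ready int32 // 0 = still starting, 1 = ready (simplified assumption)
	ok := func(w http.ResponseWriter, _ *http.Request) { fmt.Fprint(w, "ok") }

	mux := http.NewServeMux()
	mux.HandleFunc("/healthz", ok)
	mux.HandleFunc("/livez", ok)
	mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
		if atomic.LoadInt32(&ready) == 0 {
			http.Error(w, "not ready", http.StatusServiceUnavailable)
			return
		}
		ok(w, r)
	})
	return mux, func() { atomic.StoreInt32(&ready, 1) }
}

// status issues an in-memory GET and returns the response code.
func status(h http.Handler, path string) int {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code
}

func main() {
	h, markReady := newServer()
	fmt.Println(status(h, "/livez"), status(h, "/readyz"))
	markReady()
	fmt.Println(status(h, "/readyz"))
}
```

The split matters for the probes in the kubeadm manifest: the liveness and startup probes hit /livez, while the readiness probe hits /readyz.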

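The apiserver contains three server components and relies on them to handle different types of requests:

  • APIExtensionsServer: mainly handles requests related to CustomResourceDefinition (CRD)

  • KubeAPIServer: mainly handles requests for the built-in k8s resources; it also covers the common processing such as authentication and authorization

  • AggregatorServer: mainly handles aggregation; it acts as a proxy server that forwards requests to the k8s services that have been aggregated in.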

// CreateServerChain creates the apiservers connected via delegation.
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
 kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions)
 if err != nil {
  return nil, err
 }

 // If additional API servers are added, they should be gated.
 apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
  serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(kubeAPIServerConfig.ExtraConfig.ProxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig, kubeAPIServerConfig.GenericConfig.TracerProvider))
 if err != nil {
  return nil, err
 }

 notFoundHandler := notfoundhandler.New(kubeAPIServerConfig.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
 // create the APIExtensionsServer and register its routes
 apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
 if err != nil {
  return nil, err
 }
 // create the KubeAPIServer and register its routes
 kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
 if err != nil {
  return nil, err
 }

 // aggregator comes last in the chain
 aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, kubeAPIServerConfig.ExtraConfig.ProxyTransport, pluginInitializer)
 if err != nil {
  return nil, err
 }
 // create the aggregatorServer and register its routes
 aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
 if err != nil {
  // we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
  return nil, err
 }

 return aggregatorServer, nil
}
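CreateServerChain passes each server to the next one as its delegate: the not-found handler goes to the APIExtensionsServer, which goes to the KubeAPIServer, which goes to the aggregator. The sketch below shows the resulting request flow under simplifying assumptions: the `server` type and the path prefixes (including the example.com and metrics.k8s.io groups) are hypothetical and stand in for the real route tables.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// server is a hypothetical stand-in for a GenericAPIServer: it answers the
// path prefixes it owns and hands everything else to its delegate.
type server struct {
	name     string
	prefixes []string
	delegate http.Handler
}

func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	for _, p := range s.prefixes {
		if strings.HasPrefix(r.URL.Path, p) {
			fmt.Fprintf(w, "handled by %s", s.name)
			return
		}
	}
	s.delegate.ServeHTTP(w, r)
}

// buildChain wires the servers in the same order as CreateServerChain:
// the server created last is the first one to see a request.
func buildChain() http.Handler {
	notFound := http.NotFoundHandler()
	ext := &server{"apiextensions", []string{"/apis/example.com/"}, notFound}
	kube := &server{"kube-apiserver", []string{"/api/", "/apis/apps/"}, ext}
	agg := &server{"aggregator", []string{"/apis/metrics.k8s.io/"}, kube}
	return agg
}

// ask sends an in-memory GET and returns the status code and body.
func ask(h http.Handler, path string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	chain := buildChain()
	for _, p := range []string{"/api/v1/pods", "/apis/example.com/v1/foos", "/nope"} {
		code, body := ask(chain, p)
		fmt.Println(p, code, strings.TrimSpace(body))
	}
}
```

This is why the function returns the aggregator: it sits at the head of the chain, and a request only reaches the not-found handler after all three servers have declined it.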

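Each server is created from its own config.

  • The configs of apiExtensionsServer and aggregatorServer depend on kubeAPIServerConfig

  • All of these server configs depend on GenericConfig

  • CreateKubeAPIServerConfig creates kubeAPIServerConfig

  • CreateKubeAPIServerConfig in turn calls buildGenericConfig to create GenericConfig.

The values in ServerRunOptions come from the command-line flags in /etc/kubernetes/manifests/kube-apiserver.yaml (the manifest below was generated by kubeadm):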

apiVersion: v1
kind: Pod
metadata:
  annotations:
    kubeadm.kubernetes.io/kube-apiserver.advertise-address.endpoint: 192.168.200.101:6443
  creationTimestamp: null
  labels:
    component: kube-apiserver
    tier: control-plane
  name: kube-apiserver
  namespace: kube-system
spec:
  containers:
  - command:
    - kube-apiserver
    - --advertise-address=192.168.200.101 # IP address on which to advertise the apiserver to members of the cluster. It must be reachable by the rest of the cluster. If blank, --bind-address is used; if --bind-address is unspecified, the host's default interface is used.
    - --allow-privileged=true # allow privileged containers
    - --authorization-mode=Node,RBAC # ordered, comma-separated list of authorizers: AlwaysAllow, AlwaysDeny, ABAC, Webhook, RBAC, Node
    - --client-ca-file=/etc/kubernetes/pki/ca.crt  # path to the client CA file
    - --enable-admission-plugins=NodeRestriction # admission plugins to enable, comma-separated: AlwaysAdmit, AlwaysDeny, AlwaysPullImages, CertificateApproval, CertificateSigning, CertificateSubjectRestriction, DefaultIngressClass, DefaultStorageClass, DefaultTolerationSeconds, DenyServiceExternalIPs, EventRateLimit, ExtendedResourceToleration, ImagePolicyWebhook, LimitPodHardAntiAffinityTopology, LimitRanger, MutatingAdmissionWebhook, NamespaceAutoProvision, NamespaceExists, NamespaceLifecycle, NodeRestriction, OwnerReferencesPermissionEnforcement, PersistentVolumeClaimResize, PersistentVolumeLabel, PodNodeSelector, PodSecurity, PodSecurityPolicy, PodTolerationRestriction, Priority, ResourceQuota, RuntimeClass, SecurityContextDeny, ServiceAccount, StorageObjectInUseProtection, TaintNodesByCondition, ValidatingAdmissionWebhook
    - --enable-bootstrap-token-auth=true # allow Secrets of type "bootstrap.kubernetes.io/token" in the "kube-system" namespace to be used for TLS bootstrapping authentication
    - --etcd-cafile=/etc/kubernetes/pki/etcd/ca.crt # path to the etcd CA file
    - --etcd-certfile=/etc/kubernetes/pki/apiserver-etcd-client.crt
    - --etcd-keyfile=/etc/kubernetes/pki/apiserver-etcd-client.key
    - --etcd-servers=https://127.0.0.1:2379 # etcd endpoints, comma-separated
    - --kubelet-client-certificate=/etc/kubernetes/pki/apiserver-kubelet-client.crt
    - --kubelet-client-key=/etc/kubernetes/pki/apiserver-kubelet-client.key
    - --kubelet-preferred-address-types=InternalIP,ExternalIP,Hostname # preferred node address types to use for kubelet connections
    - --proxy-client-cert-file=/etc/kubernetes/pki/front-proxy-client.crt
    - --proxy-client-key-file=/etc/kubernetes/pki/front-proxy-client.key
    - --requestheader-allowed-names=front-proxy-client # client certificate common names allowed to supply usernames in the headers named by --requestheader-username-headers. If empty, any client certificate validated by the CA in --requestheader-client-ca-file is allowed.
    - --requestheader-client-ca-file=/etc/kubernetes/pki/front-proxy-ca.crt
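    - --requestheader-extra-headers-prefix=X-Remote-Extra- # list of request header prefixes to inspect
    - --requestheader-group-headers=X-Remote-Group # list of request headers to inspect for groups; X-Remote-Group is suggested
    - --requestheader-username-headers=X-Remote-User # list of request headers to inspect for usernames; X-Remote-User is suggested
    - --secure-port=6443 # HTTPS port, served with authentication and authorization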
    - --service-account-issuer=https://kubernetes.default.svc.cluster.local
    - --service-account-key-file=/etc/kubernetes/pki/sa.pub
    - --service-account-signing-key-file=/etc/kubernetes/pki/sa.key
    - --service-cluster-ip-range=10.96.0.0/12 
    - --tls-cert-file=/etc/kubernetes/pki/apiserver.crt
    - --tls-private-key-file=/etc/kubernetes/pki/apiserver.key
    image: registry.aliyuncs.com/google_containers/kube-apiserver:v1.24.0
    imagePullPolicy: IfNotPresent
    livenessProbe:
      failureThreshold: 8
      httpGet:
        host: 192.168.200.101
        path: /livez
        port: 6443
        scheme: HTTPS
      initialDelaySeconds: 10
      periodSeconds: 10
      timeoutSeconds: 15
    name: kube-apiserver
    readinessProbe:
      failureThreshold: 3
      httpGet:
        host: 192.168.200.101
        path: /readyz
        port: 6443
        scheme: HTTPS
      periodSeconds: 1
      timeoutSeconds: 15
    resources:
      requests:
        cpu: 250m
    startupProbe:
      failureThreshold: 24
      httpGet:
        host: 192.168.200.101
        path: /livez
        port: 6443
        scheme: HTTPS
      initialDelaySeconds: 10
      periodSeconds: 10
      timeoutSeconds: 15
    volumeMounts:
    - mountPath: /etc/ssl/certs
      name: ca-certs
      readOnly: true
    - mountPath: /etc/pki
      name: etc-pki
      readOnly: true
    - mountPath: /etc/kubernetes/pki
      name: k8s-certs
      readOnly: true
  hostNetwork: true
  priorityClassName: system-node-critical
  securityContext:
    seccompProfile:
      type: RuntimeDefault
  volumes:
  - hostPath:
      path: /etc/ssl/certs
      type: DirectoryOrCreate
    name: ca-certs
  - hostPath:
      path: /etc/pki
      type: DirectoryOrCreate
    name: etc-pki
  - hostPath:
      path: /etc/kubernetes/pki
      type: DirectoryOrCreate
    name: k8s-certs
status: {}
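How the container command line in the manifest ends up in an options struct can be sketched with the standard library. This is an assumption-laden illustration: kube-apiserver actually registers its flags through pflag and named flag sets, while `serverOptions`, `parseOptions` and the defaults used here are hypothetical.

```go
package main

import (
	"flag"
	"fmt"
	"strings"
)

// serverOptions is a hypothetical, much smaller stand-in for ServerRunOptions.
type serverOptions struct {
	SecurePort         int
	AuthorizationModes []string
	EtcdServers        []string
}

// parseOptions turns a container command line into an options struct.
func parseOptions(args []string) (serverOptions, error) {
	var o serverOptions
	var modes, etcd string

	fs := flag.NewFlagSet("kube-apiserver", flag.ContinueOnError)
	fs.IntVar(&o.SecurePort, "secure-port", 6443, "HTTPS port")
	fs.StringVar(&modes, "authorization-mode", "AlwaysAllow", "comma-separated authorizers")
	fs.StringVar(&etcd, "etcd-servers", "", "comma-separated etcd endpoints")
	if err := fs.Parse(args); err != nil {
		return o, err
	}

	// Comma-separated flags become slices, preserving the order given.
	o.AuthorizationModes = strings.Split(modes, ",")
	if etcd != "" {
		o.EtcdServers = strings.Split(etcd, ",")
	}
	return o, nil
}

func main() {
	o, err := parseOptions([]string{
		"--secure-port=6443",
		"--authorization-mode=Node,RBAC",
		"--etcd-servers=https://127.0.0.1:2379",
	})
	fmt.Printf("%+v %v\n", o, err)
}
```

The order of the authorization modes is kept on purpose, because the authorizers are consulted in the order listed on the command line.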

// BuildGenericConfig takes the master server options and produces the genericapiserver.Config associated with it
func buildGenericConfig(
 s *options.ServerRunOptions,
 proxyTransport *http.Transport,
) (
 genericConfig *genericapiserver.Config,
 versionedInformers clientgoinformers.SharedInformerFactory,
 serviceResolver aggregatorapiserver.ServiceResolver,
 pluginInitializers []admission.PluginInitializer,
 admissionPostStartHook genericapiserver.PostStartHookFunc,
 storageFactory *serverstorage.DefaultStorageFactory,
 lastErr error,
) {
 // create a genericConfig object
 genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
 // populate the fields of genericConfig
 // DefaultAPIResourceConfigSource enables or disables GVs and their resources
 genericConfig.MergedResourceConfig = controlplane.DefaultAPIResourceConfigSource()

 if lastErr = s.GenericServerRunOptions.ApplyTo(genericConfig); lastErr != nil {
  return
 }

 if lastErr = s.SecureServing.ApplyTo(&genericConfig.SecureServing, &genericConfig.LoopbackClientConfig); lastErr != nil {
  return
 }
 if lastErr = s.Features.ApplyTo(genericConfig); lastErr != nil {
  return
 }
 if lastErr = s.APIEnablement.ApplyTo(genericConfig, controlplane.DefaultAPIResourceConfigSource(), legacyscheme.Scheme); lastErr != nil {
  return
 }
 if lastErr = s.EgressSelector.ApplyTo(genericConfig); lastErr != nil {
  return
 }
 if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
  if lastErr = s.Traces.ApplyTo(genericConfig.EgressSelector, genericConfig); lastErr != nil {
   return
  }
 }
 // wrap the definitions to revert any changes from disabled features
 getOpenAPIDefinitions := openapi.GetOpenAPIDefinitionsWithoutDisabledFeatures(generatedopenapi.GetOpenAPIDefinitions)
 // OpenAPI configuration; for the OpenAPI specification see https://swagger.io/specification/#introduction
 genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(getOpenAPIDefinitions, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme))
 genericConfig.OpenAPIConfig.Info.Title = "Kubernetes"
 if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.OpenAPIV3) {
  genericConfig.OpenAPIV3Config = genericapiserver.DefaultOpenAPIV3Config(getOpenAPIDefinitions, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme))
  genericConfig.OpenAPIV3Config.Info.Title = "Kubernetes"
 }

 genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck(
  sets.NewString("watch", "proxy"),
  sets.NewString("attach", "exec", "proxy", "log", "portforward"),
 )

 kubeVersion := version.Get()
 genericConfig.Version = &kubeVersion

 storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
 storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
 // etcd configuration
 completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd)
 if err != nil {
  lastErr = err
  return
 }
 storageFactory, lastErr = completedStorageFactoryConfig.New()
 if lastErr != nil {
  return
 }
 if genericConfig.EgressSelector != nil {
  storageFactory.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup
 }
 if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) && genericConfig.TracerProvider != nil {
  storageFactory.StorageConfig.Transport.TracerProvider = genericConfig.TracerProvider
 }
 if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
  return
 }

 // Use protobufs for self-communication.
 // Since not every generic apiserver has to support protobufs, we
 // cannot default to it in generic apiserver and need to explicitly
 // set it in kube-apiserver.
 genericConfig.LoopbackClientConfig.ContentConfig.ContentType = "application/vnd.kubernetes.protobuf"
 // Disable compression for self-communication, since we are going to be
 // on a fast local network
 genericConfig.LoopbackClientConfig.DisableCompression = true

 kubeClientConfig := genericConfig.LoopbackClientConfig
 clientgoExternalClient, err := clientgoclientset.NewForConfig(kubeClientConfig)
 if err != nil {
  lastErr = fmt.Errorf("failed to create real external clientset: %v", err)
  return
 }
 versionedInformers = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)

 // Authentication.ApplyTo requires already applied OpenAPIConfig and EgressSelector if present
 // build the authenticator
 if lastErr = s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, genericConfig.OpenAPIV3Config, clientgoExternalClient, versionedInformers); lastErr != nil {
  return
 }
 // build the authorizer
 genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)
 if err != nil {
  lastErr = fmt.Errorf("invalid authorization config: %v", err)
  return
 }
 if !sets.NewString(s.Authorization.Modes...).Has(modes.ModeRBAC) {
  genericConfig.DisabledPostStartHooks.Insert(rbacrest.PostStartHookName)
 }

 lastErr = s.Audit.ApplyTo(genericConfig)
 if lastErr != nil {
  return
 }

 admissionConfig := &kubeapiserveradmission.Config{
  ExternalInformers:    versionedInformers,
  LoopbackClientConfig: genericConfig.LoopbackClientConfig,
  CloudConfigFile:      s.CloudProvider.CloudConfigFile,
 }
 serviceResolver = buildServiceResolver(s.EnableAggregatorRouting, genericConfig.LoopbackClientConfig.Host, versionedInformers)
 pluginInitializers, admissionPostStartHook, err = admissionConfig.New(proxyTransport, genericConfig.EgressSelector, serviceResolver, genericConfig.TracerProvider)
 if err != nil {
  lastErr = fmt.Errorf("failed to create admission plugin initializer: %v", err)
  return
 }
 // admission controllers
 err = s.Admission.ApplyTo(
  genericConfig,
  versionedInformers,
  kubeClientConfig,
  utilfeature.DefaultFeatureGate,
  pluginInitializers...)
 if err != nil {
  lastErr = fmt.Errorf("failed to initialize admission: %v", err)
  return
 }

 if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) && s.GenericServerRunOptions.EnablePriorityAndFairness {
  genericConfig.FlowControl, lastErr = BuildPriorityAndFairness(s, clientgoExternalClient, versionedInformers)
 }

 return
}

3.1 scheme

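We covered the concepts of Group, Version, Resource and Kind earlier. To manage resources, the apiserver uses a structure called scheme, which keeps the registered resource types in memory.

The scheme serves the following purposes:

  • It provides version conversion for resources

  • It provides serialization and deserialization for resources

The scheme supports registering two kinds of types:

  • UnversionedType: unversioned resources. These are rarely used in current versions of k8s and can be ignored

  • VersionedType: almost every resource carries a version, so this is the commonly used kind

In the imports of the server file we find the scheme packages: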

"k8s.io/kubernetes/pkg/api/legacyscheme"
aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"

Importing the legacyscheme package initializes an empty scheme registry.

Importing the aggregatorscheme package registers its GVKs into its scheme:

var (
 // Scheme defines methods for serializing and deserializing API objects.
 Scheme = runtime.NewScheme()
 // Codecs provides methods for retrieving codecs and serializers for specific
 // versions and content types.
 Codecs = serializer.NewCodecFactory(Scheme)
)

func init() {
 AddToScheme(Scheme)
 install.Install(Scheme)
}

// AddToScheme adds the types of this group into the given scheme.
func AddToScheme(scheme *runtime.Scheme) {
 utilruntime.Must(v1beta1.AddToScheme(scheme))
 utilruntime.Must(v1.AddToScheme(scheme))
 utilruntime.Must(apiregistration.AddToScheme(scheme))
}

In fact, the groups under pkg/ follow the same pattern, for example pkg/apis/apps/install:

func init() {
 // init runs when this package is loaded and registers the GVKs into the scheme
 Install(legacyscheme.Scheme)
}

// Install registers the API group and adds types to a scheme
func Install(scheme *runtime.Scheme) {
 utilruntime.Must(apps.AddToScheme(scheme))
 utilruntime.Must(v1beta1.AddToScheme(scheme))
 utilruntime.Must(v1beta2.AddToScheme(scheme))
 utilruntime.Must(v1.AddToScheme(scheme))
 utilruntime.Must(scheme.SetVersionPriority(v1.SchemeGroupVersion, v1beta2.SchemeGroupVersion, v1beta1.SchemeGroupVersion))
}
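The registration pattern above (a package-level scheme filled in from init) can be sketched as a map from GVK to Go type. This is only a conceptual model: the `scheme` type, `addKnownType` and `newObject` below are hypothetical and much simpler than runtime.Scheme, which also handles conversion and defaulting.

```go
package main

import (
	"fmt"
	"reflect"
)

// groupVersionKind identifies a type the way a GVK does.
type groupVersionKind struct{ Group, Version, Kind string }

// scheme is a hypothetical, minimal stand-in for runtime.Scheme.
type scheme struct {
	types map[groupVersionKind]reflect.Type
}

func newScheme() *scheme {
	return &scheme{types: map[groupVersionKind]reflect.Type{}}
}

// addKnownType registers the Go type of obj (a pointer to a struct) under gvk.
func (s *scheme) addKnownType(gvk groupVersionKind, obj interface{}) {
	s.types[gvk] = reflect.TypeOf(obj).Elem()
}

// newObject returns a pointer to a fresh zero value of the type registered
// for gvk, which is what a decoder needs before it can fill in the fields.
func (s *scheme) newObject(gvk groupVersionKind) (interface{}, error) {
	t, ok := s.types[gvk]
	if !ok {
		return nil, fmt.Errorf("no kind %q registered for %s/%s", gvk.Kind, gvk.Group, gvk.Version)
	}
	return reflect.New(t).Interface(), nil
}

// Deployment is a placeholder type for the example.
type Deployment struct{ Replicas int }

// defaultScheme plays the role of a package-level Scheme variable.
var defaultScheme = newScheme()

// init registers the types as soon as the package is loaded.
func init() {
	defaultScheme.addKnownType(groupVersionKind{"apps", "v1", "Deployment"}, &Deployment{})
}

func main() {
	obj, err := defaultScheme.newObject(groupVersionKind{"apps", "v1", "Deployment"})
	fmt.Printf("%T %v\n", obj, err)
	_, err = defaultScheme.newObject(groupVersionKind{"apps", "v1", "Unknown"})
	fmt.Println(err)
}
```

Because registration happens in init, merely importing a package is enough to make its types known to the scheme, which is why the install packages are often imported only for their side effects.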

3.2 Authentication configuration

APIServer supports the following authentication strategies:

  • X509 Client Certs

  • Static Token File

  • Bootstrap Tokens

  • Service Account Tokens

  • OpenID Connect Tokens(OIDC)

  • Webhook Token Authentication

  • Authenticating Proxy

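The authenticators are created one by one. When a request comes in, they are tried in order, and the request is authenticated as soon as any one of them returns true.

Reference: https://kubernetes.io/docs/reference/access-authn-authz/authentication/#authentication-strategies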

func (config Config) New() (authenticator.Request, *spec.SecurityDefinitions, error) {
 var authenticators []authenticator.Request
 var tokenAuthenticators []authenticator.Token
 securityDefinitions := spec.SecurityDefinitions{}

 // front-proxy, BasicAuth methods, local first, then remote
 // Add the front proxy authenticator if requested
 if config.RequestHeaderConfig != nil {
  requestHeaderAuthenticator := headerrequest.NewDynamicVerifyOptionsSecure(
   config.RequestHeaderConfig.CAContentProvider.VerifyOptions,
   config.RequestHeaderConfig.AllowedClientNames,
   config.RequestHeaderConfig.UsernameHeaders,
   config.RequestHeaderConfig.GroupHeaders,
   config.RequestHeaderConfig.ExtraHeaderPrefixes,
  )
  authenticators = append(authenticators, authenticator.WrapAudienceAgnosticRequest(config.APIAudiences, requestHeaderAuthenticator))
 }

 // X509 methods
 if config.ClientCAContentProvider != nil {
  certAuth := x509.NewDynamic(config.ClientCAContentProvider.VerifyOptions, x509.CommonNameUserConversion)
  authenticators = append(authenticators, certAuth)
 }

 // Bearer token methods, local first, then remote
 if len(config.TokenAuthFile) > 0 {
  tokenAuth, err := newAuthenticatorFromTokenFile(config.TokenAuthFile)
  if err != nil {
   return nil, nil, err
  }
  tokenAuthenticators = append(tokenAuthenticators, authenticator.WrapAudienceAgnosticToken(config.APIAudiences, tokenAuth))
 }
 if len(config.ServiceAccountKeyFiles) > 0 {
  serviceAccountAuth, err := newLegacyServiceAccountAuthenticator(config.ServiceAccountKeyFiles, config.ServiceAccountLookup, config.APIAudiences, config.ServiceAccountTokenGetter)
  if err != nil {
   return nil, nil, err
  }
  tokenAuthenticators = append(tokenAuthenticators, serviceAccountAuth)
 }
 if len(config.ServiceAccountIssuers) > 0 {
  serviceAccountAuth, err := newServiceAccountAuthenticator(config.ServiceAccountIssuers, config.ServiceAccountKeyFiles, config.APIAudiences, config.ServiceAccountTokenGetter)
  if err != nil {
   return nil, nil, err
  }
  tokenAuthenticators = append(tokenAuthenticators, serviceAccountAuth)
 }
 if config.BootstrapToken {
  if config.BootstrapTokenAuthenticator != nil {
   // TODO: This can sometimes be nil because of
   tokenAuthenticators = append(tokenAuthenticators, authenticator.WrapAudienceAgnosticToken(config.APIAudiences, config.BootstrapTokenAuthenticator))
  }
 }
 // NOTE(ericchiang): Keep the OpenID Connect after Service Accounts.
 //
 // Because both plugins verify JWTs whichever comes first in the union experiences
 // cache misses for all requests using the other. While the service account plugin
 // simply returns an error, the OpenID Connect plugin may query the provider to
 // update the keys, causing performance hits.
 if len(config.OIDCIssuerURL) > 0 && len(config.OIDCClientID) > 0 {
  // TODO(enj): wire up the Notifier and ControllerRunner bits when OIDC supports CA reload
  var oidcCAContent oidc.CAContentProvider
  if len(config.OIDCCAFile) != 0 {
   var oidcCAErr error
   oidcCAContent, oidcCAErr = dynamiccertificates.NewDynamicCAContentFromFile("oidc-authenticator", config.OIDCCAFile)
   if oidcCAErr != nil {
    return nil, nil, oidcCAErr
   }
  }

  oidcAuth, err := newAuthenticatorFromOIDCIssuerURL(oidc.Options{
   IssuerURL:            config.OIDCIssuerURL,
   ClientID:             config.OIDCClientID,
   CAContentProvider:    oidcCAContent,
   UsernameClaim:        config.OIDCUsernameClaim,
   UsernamePrefix:       config.OIDCUsernamePrefix,
   GroupsClaim:          config.OIDCGroupsClaim,
   GroupsPrefix:         config.OIDCGroupsPrefix,
   SupportedSigningAlgs: config.OIDCSigningAlgs,
   RequiredClaims:       config.OIDCRequiredClaims,
  })
  if err != nil {
   return nil, nil, err
  }
  tokenAuthenticators = append(tokenAuthenticators, authenticator.WrapAudienceAgnosticToken(config.APIAudiences, oidcAuth))
 }
 if len(config.WebhookTokenAuthnConfigFile) > 0 {
  webhookTokenAuth, err := newWebhookTokenAuthenticator(config)
  if err != nil {
   return nil, nil, err
  }

  tokenAuthenticators = append(tokenAuthenticators, webhookTokenAuth)
 }

 if len(tokenAuthenticators) > 0 {
  // Union the token authenticators
  tokenAuth := tokenunion.New(tokenAuthenticators...)
  // Optionally cache authentication results
  if config.TokenSuccessCacheTTL > 0 || config.TokenFailureCacheTTL > 0 {
   tokenAuth = tokencache.New(tokenAuth, true, config.TokenSuccessCacheTTL, config.TokenFailureCacheTTL)
  }
  authenticators = append(authenticators, bearertoken.New(tokenAuth), websocket.NewProtocolAuthenticator(tokenAuth))
  securityDefinitions["BearerToken"] = &spec.SecurityScheme{
   SecuritySchemeProps: spec.SecuritySchemeProps{
    Type:        "apiKey",
    Name:        "authorization",
    In:          "header",
    Description: "Bearer Token authentication",
   },
  }
 }

 if len(authenticators) == 0 {
  if config.Anonymous {
   return anonymous.NewAuthenticator(), &securityDefinitions, nil
  }
  return nil, &securityDefinitions, nil
 }

 authenticator := union.New(authenticators...)

 authenticator = group.NewAuthenticatedGroupAdder(authenticator)

 if config.Anonymous {
  // If the authenticator chain returns an error, return an error (don't consider a bad bearer token
  // or invalid username/password combination anonymous).
  authenticator = union.NewFailOnError(authenticator, anonymous.NewAuthenticator())
 }

 return authenticator, &securityDefinitions, nil
}

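authenticator := union.New(authenticators...) combines all of the authenticators into one.

The combined authenticator is then invoked from an http.Handler wrapper in the request handler chain, which authenticates each incoming client request.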
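The "first success wins" behaviour of the combined authenticator can be sketched as follows. The `authenticator` function type, `headerAuth`, `tokenAuth` and `whoami` are hypothetical simplifications; in particular, the real request-header authenticator also verifies the client certificate of the front proxy, which is omitted here.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"strings"
)

// authenticator is a hypothetical simplification of authenticator.Request.
type authenticator func(r *http.Request) (user string, ok bool, err error)

// union tries each authenticator in order and returns the first success.
// Errors are remembered and only reported if nobody authenticates the request.
func union(auths ...authenticator) authenticator {
	return func(r *http.Request) (string, bool, error) {
		var errs []error
		for _, a := range auths {
			user, ok, err := a(r)
			if err != nil {
				errs = append(errs, err)
				continue
			}
			if ok {
				return user, true, nil
			}
		}
		if len(errs) > 0 {
			return "", false, fmt.Errorf("%v", errs)
		}
		return "", false, nil
	}
}

// headerAuth trusts the X-Remote-User header (certificate checks omitted).
func headerAuth(r *http.Request) (string, bool, error) {
	if u := r.Header.Get("X-Remote-User"); u != "" {
		return u, true, nil
	}
	return "", false, nil
}

// tokenAuth looks the bearer token up in a static table.
func tokenAuth(tokens map[string]string) authenticator {
	return func(r *http.Request) (string, bool, error) {
		h := r.Header.Get("Authorization")
		if !strings.HasPrefix(h, "Bearer ") {
			return "", false, nil
		}
		user, ok := tokens[strings.TrimPrefix(h, "Bearer ")]
		if !ok {
			return "", false, errors.New("invalid bearer token")
		}
		return user, true, nil
	}
}

// whoami builds a request with the given headers and runs the chain.
func whoami(headers map[string]string) (string, bool) {
	chain := union(headerAuth, tokenAuth(map[string]string{"s3cr3t": "alice"}))
	r, _ := http.NewRequest(http.MethodGet, "/api", nil)
	for k, v := range headers {
		r.Header.Set(k, v)
	}
	user, ok, _ := chain(r)
	return user, ok
}

func main() {
	fmt.Println(whoami(map[string]string{"Authorization": "Bearer s3cr3t"}))
	fmt.Println(whoami(map[string]string{"X-Remote-User": "bob"}))
	fmt.Println(whoami(nil))
}
```

An authenticator that does not recognise the credentials returns ok=false without an error, which lets the next one in the list have a try.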

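3.3 Authorization configuration

The authorization configuration flow is largely the same as for authentication.

APIServer supports the following authorization modes:

  • AlwaysAllow

  • AlwaysDeny

  • Webhook authorization

  • Node authorization

  • ABAC authorization

  • RBAC authorization

See the official documentation: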

https://kubernetes.io/docs/reference/access-authn-authz/authorization/

// BuildAuthorizer constructs the authorizer
func BuildAuthorizer(s *options.ServerRunOptions, EgressSelector *egressselector.EgressSelector, versionedInformers clientgoinformers.SharedInformerFactory) (authorizer.Authorizer, authorizer.RuleResolver, error) {
 authorizationConfig := s.Authorization.ToAuthorizationConfig(versionedInformers)

 if EgressSelector != nil {
  egressDialer, err := EgressSelector.Lookup(egressselector.ControlPlane.AsNetworkContext())
  if err != nil {
   return nil, nil, err
  }
  authorizationConfig.CustomDial = egressDialer
 }

 return authorizationConfig.New()
}
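authorizationConfig.New() is not shown in this article. As a rough mental model, the configured modes are consulted in order and the first explicit allow or deny decides the request. The sketch below illustrates that idea only: the `decision`, `attributes` and `authorizer` types and the two toy rules are assumptions for the example, not the real authorizer interfaces.

```go
package main

import (
	"fmt"
	"strings"
)

// decision models the three possible answers of an authorizer.
type decision int

const (
	noOpinion decision = iota
	allow
	deny
)

// attributes is a hypothetical, reduced description of a request.
type attributes struct{ User, Verb, Resource string }

type authorizer func(a attributes) decision

// chain returns the first explicit allow or deny. If every authorizer has
// no opinion, the result stays noOpinion and the request is not allowed.
func chain(authz ...authorizer) authorizer {
	return func(a attributes) decision {
		for _, z := range authz {
			if d := z(a); d != noOpinion {
				return d
			}
		}
		return noOpinion
	}
}

// nodeRule is a toy version of the Node mode: node identities may touch nodes.
func nodeRule(a attributes) decision {
	if strings.HasPrefix(a.User, "system:node:") && a.Resource == "nodes" {
		return allow
	}
	return noOpinion
}

// rbacRule is a toy version of RBAC backed by a flat lookup table.
func rbacRule(bindings map[string]bool) authorizer {
	return func(a attributes) decision {
		if bindings[a.User+"/"+a.Verb+"/"+a.Resource] {
			return allow
		}
		return noOpinion
	}
}

// allowed evaluates a request against the order Node, RBAC.
func allowed(a attributes) bool {
	z := chain(nodeRule, rbacRule(map[string]bool{"alice/get/pods": true}))
	return z(a) == allow
}

func main() {
	fmt.Println(allowed(attributes{"alice", "get", "pods"}))
	fmt.Println(allowed(attributes{"system:node:n1", "update", "nodes"}))
	fmt.Println(allowed(attributes{"bob", "delete", "pods"}))
}
```

With --authorization-mode=Node,RBAC the order in the flag corresponds to the order of the arguments passed to `chain` in this sketch.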

4. APIExtensionsServer

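Creating the APIExtensionsServer roughly consists of the following steps:

  • Create the genericServer

  • Instantiate CustomResourceDefinitions

  • Instantiate APIGroupInfo (which stores GV, GVR and related information)

  • InstallAPIGroup (register it as a handler of the HTTP service)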
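The last two steps can be sketched as a map from version to resource to storage that is turned into HTTP routes. Everything here is a simplified assumption: `storage`, `apiGroupInfo` and `installAPIGroup` are hypothetical miniatures of the real types, and the status subresource is exposed at a flat path rather than under an object name.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sort"
)

// storage is a hypothetical stand-in for rest.Storage.
type storage interface{ List() []string }

// memoryStorage keeps object names in memory.
type memoryStorage []string

func (m memoryStorage) List() []string { return m }

// apiGroupInfo mirrors the shape used by the real code: version -> resource -> storage.
type apiGroupInfo struct {
	group                        string
	versionedResourcesStorageMap map[string]map[string]storage
}

// installAPIGroup registers one HTTP route per resource and returns the
// registered paths in sorted order.
func installAPIGroup(mux *http.ServeMux, info apiGroupInfo) []string {
	var paths []string
	for version, resources := range info.versionedResourcesStorageMap {
		for resource, st := range resources {
			path := "/apis/" + info.group + "/" + version + "/" + resource
			st := st // per-iteration copy for the closure below
			mux.HandleFunc(path, func(w http.ResponseWriter, _ *http.Request) {
				fmt.Fprint(w, st.List())
			})
			paths = append(paths, path)
		}
	}
	sort.Strings(paths)
	return paths
}

// newCRDServer wires the CRD resource and its status subresource into a mux.
func newCRDServer() (*http.ServeMux, []string) {
	crds := memoryStorage{"foos.example.com"}
	info := apiGroupInfo{
		group: "apiextensions.k8s.io",
		versionedResourcesStorageMap: map[string]map[string]storage{
			"v1": {
				"customresourcedefinitions":        crds,
				"customresourcedefinitions/status": crds,
			},
		},
	}
	mux := http.NewServeMux()
	return mux, installAPIGroup(mux, info)
}

// get performs an in-memory GET and returns the response body.
func get(h http.Handler, path string) string {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Body.String()
}

func main() {
	mux, paths := newCRDServer()
	for _, p := range paths {
		fmt.Println(p, get(mux, p))
	}
}
```

Keeping the storage map separate from the routing step is what allows the same install logic to be reused by every server that embeds the generic server.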

func createAPIExtensionsServer(apiextensionsConfig *apiextensionsapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget) (*apiextensionsapiserver.CustomResourceDefinitions, error) {
 return apiextensionsConfig.Complete().New(delegateAPIServer)
}


// New returns a new instance of CustomResourceDefinitions from the given config.
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
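 // create the genericServer
 // genericServer provides a generic HTTP server and defines the common template: address, port, authentication, authorization, health checks and other shared functionality. Both APIServer and APIExtensionsServer depend on genericServer.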
 genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
 if err != nil {
  return nil, err
 }

 // hasCRDInformerSyncedSignal is closed when the CRD informer this server uses has been fully synchronized.
 // It ensures that requests to potential custom resource endpoints while the server hasn't installed all known HTTP paths get a 503 error instead of a 404
 hasCRDInformerSyncedSignal := make(chan struct{})
 if err := genericServer.RegisterMuxAndDiscoveryCompleteSignal("CRDInformerHasNotSynced", hasCRDInformerSyncedSignal); err != nil {
  return nil, err
 }
 // instantiate CustomResourceDefinitions
 s := &CustomResourceDefinitions{
  GenericAPIServer: genericServer,
 }

 apiResourceConfig := c.GenericConfig.MergedResourceConfig
 // instantiate APIGroupInfo
 apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)
 storage := map[string]rest.Storage{}
 // customresourcedefinitions
 // check whether the apiextensions.k8s.io/v1 GV is enabled in the configuration
 if resource := "customresourcedefinitions"; apiResourceConfig.ResourceEnabled(v1.SchemeGroupVersion.WithResource(resource)) {
  // build the RESTStorage for CRDs
  customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
  if err != nil {
   return nil, err
  }
  storage[resource] = customResourceDefinitionStorage
  // build the RESTStorage for the CRD status subresource
  storage[resource+"/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)
 }
 if len(storage) > 0 {
  // associate apiGroupInfo with the RESTStorage; the next step uses it when registering apiGroupInfo
  apiGroupInfo.VersionedResourcesStorageMap[v1.SchemeGroupVersion.Version] = storage
 }
 // register the group via InstallAPIGroup
 if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
  return nil, err
 }

 crdClient, err := clientset.NewForConfig(s.GenericAPIServer.LoopbackClientConfig)
 if err != nil {
  // it's really bad that this is leaking here, but until we can fix the test (which I'm pretty sure isn't even testing what it wants to test),
  // we need to be able to move forward
  return nil, fmt.Errorf("failed to create clientset: %v", err)
 }
 s.Informers = externalinformers.NewSharedInformerFactory(crdClient, 5*time.Minute)

 delegateHandler := delegationTarget.UnprotectedHandler()
 if delegateHandler == nil {
  delegateHandler = http.NotFoundHandler()
 }

 versionDiscoveryHandler := &versionDiscoveryHandler{
  discovery: map[schema.GroupVersion]*discovery.APIVersionHandler{},
  delegate:  delegateHandler,
 }
 groupDiscoveryHandler := &groupDiscoveryHandler{
  discovery: map[string]*discovery.APIGroupHandler{},
  delegate:  delegateHandler,
 }
 establishingController := establish.NewEstablishingController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
 crdHandler, err := NewCustomResourceDefinitionHandler(
  versionDiscoveryHandler,
  groupDiscoveryHandler,
  s.Informers.Apiextensions().V1().CustomResourceDefinitions(),
  delegateHandler,
  c.ExtraConfig.CRDRESTOptionsGetter,
  c.GenericConfig.AdmissionControl,
  establishingController,
  c.ExtraConfig.ServiceResolver,
  c.ExtraConfig.AuthResolverWrapper,
  c.ExtraConfig.MasterCount,
  s.GenericAPIServer.Authorizer,
  c.GenericConfig.RequestTimeout,
  time.Duration(c.GenericConfig.MinRequestTimeout)*time.Second,
  apiGroupInfo.StaticOpenAPISpec,
  c.GenericConfig.MaxRequestBodyBytes,
 )
 if err != nil {
  return nil, err
 }
 s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
 s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)

 discoveryController := NewDiscoveryController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler)
 namingController := status.NewNamingConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
 nonStructuralSchemaController := nonstructuralschema.NewConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
 apiApprovalController := apiapproval.NewKubernetesAPIApprovalPolicyConformantConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
 finalizingController := finalizer.NewCRDFinalizer(
  s.Informers.Apiextensions().V1().CustomResourceDefinitions(),
  crdClient.ApiextensionsV1(),
  crdHandler,
 )
 openapiController := openapicontroller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions())
 var openapiv3Controller *openapiv3controller.Controller
 if utilfeature.DefaultFeatureGate.Enabled(features.OpenAPIV3) {
  openapiv3Controller = openapiv3controller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions())
 }

 s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-informers", func(context genericapiserver.PostStartHookContext) error {
  s.Informers.Start(context.StopCh)
  return nil
 })
 s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-controllers", func(context genericapiserver.PostStartHookContext) error {
  // OpenAPIVersionedService and StaticOpenAPISpec are populated in generic apiserver PrepareRun().
  // Together they serve the /openapi/v2 endpoint on a generic apiserver. A generic apiserver may
  // choose to not enable OpenAPI by having null openAPIConfig, and thus OpenAPIVersionedService
  // and StaticOpenAPISpec are both null. In that case we don't run the CRD OpenAPI controller.
  if s.GenericAPIServer.StaticOpenAPISpec != nil {
   if s.GenericAPIServer.OpenAPIVersionedService != nil {
    go openapiController.Run(s.GenericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, context.StopCh)
   }

   if s.GenericAPIServer.OpenAPIV3VersionedService != nil && utilfeature.DefaultFeatureGate.Enabled(features.OpenAPIV3) {
    go openapiv3Controller.Run(s.GenericAPIServer.OpenAPIV3VersionedService, context.StopCh)
   }
  }

  go namingController.Run(context.StopCh)
  go establishingController.Run(context.StopCh)
  go nonStructuralSchemaController.Run(5, context.StopCh)
  go apiApprovalController.Run(5, context.StopCh)
  go finalizingController.Run(5, context.StopCh)

  discoverySyncedCh := make(chan struct{})
  go discoveryController.Run(context.StopCh, discoverySyncedCh)
  select {
  case <-context.StopCh:
  case <-discoverySyncedCh:
  }

  return nil
 })
 // we don't want to report healthy until we can handle all CRDs that have already been registered.  Waiting for the informer
 // to sync makes sure that the lister will be valid before we begin.  There may still be races for CRDs added after startup,
 // but we won't go healthy until we can handle the ones already present.
 s.GenericAPIServer.AddPostStartHookOrDie("crd-informer-synced", func(context genericapiserver.PostStartHookContext) error {
  return wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
   if s.Informers.Apiextensions().V1().CustomResourceDefinitions().Informer().HasSynced() {
    close(hasCRDInformerSyncedSignal)
    return true, nil
   }
   return false, nil
  }, context.StopCh)
 })

 return s, nil
}
// NewREST returns a RESTStorage object that will work against API services.
func NewREST(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) (*REST, error) {
 strategy := NewStrategy(scheme)
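 // Store is a generic REST implementation shared by all Kubernetes resource types. It provides CRUD
 // operations, applies the matching create/update/delete strategies, and talks to the etcd storage backend.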
 store := &genericregistry.Store{
  NewFunc:                  func() runtime.Object { return &apiextensions.CustomResourceDefinition{} },
  NewListFunc:              func() runtime.Object { return &apiextensions.CustomResourceDefinitionList{} },
  PredicateFunc:            MatchCustomResourceDefinition,
  DefaultQualifiedResource: apiextensions.Resource("customresourcedefinitions"),

  CreateStrategy:      strategy,
  UpdateStrategy:      strategy,
  DeleteStrategy:      strategy,
  ResetFieldsStrategy: strategy,

  // TODO: define table converter that exposes more than name/creation timestamp
  TableConvertor: rest.NewDefaultTableConvertor(apiextensions.Resource("customresourcedefinitions")),
 }
 options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: GetAttrs}
 if err := store.CompleteWithOptions(options); err != nil {
  return nil, err
 }
 return &REST{store}, nil
}

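Once the Store for CRDs is registered, the corresponding CRUD methods become available. For example, the API for fetching a single CRD object is shown below (CRDs are cluster-scoped, so the path has no namespace segment):

GET /apis/apiextensions.k8s.io/v1/customresourcedefinitions/${crd-name}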

4.1 InstallAPIGroups


// Exposes given api groups in the API.
func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error {
 for _, apiGroupInfo := range apiGroupInfos {
  // Do not register empty group or empty version.  Doing so claims /apis/ for the wrong entity to be returned.
  // Catching these here places the error  much closer to its origin
  if len(apiGroupInfo.PrioritizedVersions[0].Group) == 0 {
   return fmt.Errorf("cannot register handler with an empty group for %#v", *apiGroupInfo)
  }
  if len(apiGroupInfo.PrioritizedVersions[0].Version) == 0 {
   return fmt.Errorf("cannot register handler with an empty version for %#v", *apiGroupInfo)
  }
 }

 openAPIModels, err := s.getOpenAPIModels(APIGroupPrefix, apiGroupInfos...)
 if err != nil {
  return fmt.Errorf("unable to get openapi models: %v", err)
 }

 for _, apiGroupInfo := range apiGroupInfos {
  // Core step: register every resource contained in each apiGroupInfo.
  if err := s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels); err != nil {
   return fmt.Errorf("unable to install api resources: %v", err)
  }

  // setup discovery
  // Install the version handler.
  // Add a handler at /apis/<groupName> to enumerate all versions supported by this group.
  apiVersionsForDiscovery := []metav1.GroupVersionForDiscovery{}
  for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
   // Check the config to make sure that we elide versions that don't have any resources
   if len(apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version]) == 0 {
    continue
   }
   apiVersionsForDiscovery = append(apiVersionsForDiscovery, metav1.GroupVersionForDiscovery{
    GroupVersion: groupVersion.String(),
    Version:      groupVersion.Version,
   })
  }
  preferredVersionForDiscovery := metav1.GroupVersionForDiscovery{
   GroupVersion: apiGroupInfo.PrioritizedVersions[0].String(),
   Version:      apiGroupInfo.PrioritizedVersions[0].Version,
  }
  apiGroup := metav1.APIGroup{
   Name:             apiGroupInfo.PrioritizedVersions[0].Group,
   Versions:         apiVersionsForDiscovery,
   PreferredVersion: preferredVersionForDiscovery,
  }

  s.DiscoveryGroupManager.AddGroup(apiGroup)
  s.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(s.Serializer, apiGroup).WebService())
 }
 return nil
}
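
The discovery part of InstallAPIGroups can be sketched in isolation. The following is a minimal model, not the upstream code: `groupVersion` and `discoveryVersions` are hypothetical stand-ins for schema.GroupVersion and the loop above, and the sketch assumes the prioritized list is non-empty, as the original does. It shows that versions without resources are left out of discovery while the first prioritized version is always reported as preferred.

```go
package main

import "fmt"

// groupVersion is a simplified, hypothetical stand-in for schema.GroupVersion.
type groupVersion struct {
	Group   string
	Version string
}

// String renders "group/version", or just "version" for the core group.
func (gv groupVersion) String() string {
	if gv.Group == "" {
		return gv.Version
	}
	return gv.Group + "/" + gv.Version
}

// discoveryVersions returns the group versions to advertise and the preferred one.
// Versions that have no resources are skipped; the preferred version is always
// the first entry of the prioritized list (assumed non-empty).
func discoveryVersions(prioritized []groupVersion, resources map[string][]string) (served []string, preferred string) {
	for _, gv := range prioritized {
		if len(resources[gv.Version]) == 0 {
			continue
		}
		served = append(served, gv.String())
	}
	return served, prioritized[0].String()
}

func main() {
	prioritized := []groupVersion{
		{Group: "example.io", Version: "v1"},
		{Group: "example.io", Version: "v1beta1"},
	}
	// Only v1 has storage registered, so v1beta1 is not advertised.
	resources := map[string][]string{"v1": {"widgets"}}

	served, preferred := discoveryVersions(prioritized, resources)
	fmt.Println("served:", served)
	fmt.Println("preferred:", preferred)
}
```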

// installAPIResources is a private method for installing the REST storage backing each api groupversionresource
func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, openAPIModels openapiproto.Models) error {
 var resourceInfos []*storageversion.ResourceInfo
 for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
  if len(apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version]) == 0 {
   klog.Warningf("Skipping API %v because it has no resources.", groupVersion)
   continue
  }

  apiGroupVersion, err := s.getAPIGroupVersion(apiGroupInfo, groupVersion, apiPrefix)
  if err != nil {
   return err
  }
  if apiGroupInfo.OptionsExternalVersion != nil {
   apiGroupVersion.OptionsExternalVersion = apiGroupInfo.OptionsExternalVersion
  }
  apiGroupVersion.OpenAPIModels = openAPIModels

  if openAPIModels != nil && utilfeature.DefaultFeatureGate.Enabled(features.ServerSideApply) {
   typeConverter, err := fieldmanager.NewTypeConverter(openAPIModels, false)
   if err != nil {
    return err
   }
   apiGroupVersion.TypeConverter = typeConverter
  }

  apiGroupVersion.MaxRequestBodyBytes = s.maxRequestBodyBytes
  // Install this group version's REST handlers into the go-restful container so later requests can be routed to them.
  r, err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer)
  if err != nil {
   return fmt.Errorf("unable to setup API %v: %v", apiGroupInfo, err)
  }
  resourceInfos = append(resourceInfos, r...)
 }

 if utilfeature.DefaultFeatureGate.Enabled(features.StorageVersionAPI) &&
  utilfeature.DefaultFeatureGate.Enabled(features.APIServerIdentity) {
  // API installation happens before we start listening on the handlers,
  // therefore it is safe to register ResourceInfos here. The handler will block
  // write requests until the storage versions of the targeting resources are updated.
  s.StorageVersionManager.AddResourceInfo(resourceInfos...)
 }

 return nil
}

// Install handlers for API resources.
func (a *APIInstaller) Install() ([]metav1.APIResource, []*storageversion.ResourceInfo, *restful.WebService, []error) {
 var apiResources []metav1.APIResource
 var resourceInfos []*storageversion.ResourceInfo
 var errors []error
 // Create a new go-restful WebService instance.
 ws := a.newWebService()

 // Register the paths in a deterministic (sorted) order to get a deterministic swagger spec.
 paths := make([]string, len(a.group.Storage))
 var i int = 0
 for path := range a.group.Storage {
  paths[i] = path
  i++
 }
 sort.Strings(paths)
 for _, path := range paths {
  // Bind the RESTStorage built in the previous step to ws, so that each HTTP path is routed to the matching RESTStorage method.
  apiResource, resourceInfo, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
  if err != nil {
   errors = append(errors, fmt.Errorf("error in registering resource: %s, %v", path, err))
  }
  if apiResource != nil {
   apiResources = append(apiResources, *apiResource)
  }
  if resourceInfo != nil {
   resourceInfos = append(resourceInfos, resourceInfo)
  }
 }
 return apiResources, resourceInfos, ws, errors
}
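
Install sorts the storage paths before registering them because Go map iteration order is random. A minimal sketch of that idea follows; `sortedPaths` and the string values in the map are hypothetical placeholders for the real rest.Storage objects.

```go
package main

import (
	"fmt"
	"sort"
)

// sortedPaths returns the keys of storage in lexical order, so that handlers
// are registered in the same sequence on every run.
func sortedPaths(storage map[string]string) []string {
	paths := make([]string, 0, len(storage))
	for p := range storage {
		paths = append(paths, p)
	}
	sort.Strings(paths)
	return paths
}

func main() {
	// Placeholder storage map: path -> name of the backing storage.
	storage := map[string]string{
		"pods/status": "podStatusStorage",
		"services":    "serviceStorage",
		"pods":        "podStorage",
	}
	for _, p := range sortedPaths(storage) {
		fmt.Printf("register %s -> %s\n", p, storage[p])
	}
}
```

Registering in sorted order means the generated API description comes out identical between restarts, which is the reason given in the upstream comment.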

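Once the APIExtensionsServer has started, the resource list under this group version can be queried:

# Start a local proxy so that curl needs no credentials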
[root@master ~]# nohup kubectl proxy --address=0.0.0.0 --port=8000 --accept-hosts=^.* &
[root@master ~]# curl http://127.0.0.1:8000/apis/apiextensions.k8s.io/v1               
{
  "kind": "APIResourceList",
  "apiVersion": "v1",
  "groupVersion": "apiextensions.k8s.io/v1",
  "resources": [
    {
      "name": "customresourcedefinitions",
      "singularName": "",
      "namespaced": false,
      "kind": "CustomResourceDefinition",
      "verbs": [
        "create",
        "delete",
        "deletecollection",
        "get",
        "list",
        "patch",
        "update",
        "watch"
      ],
      "shortNames": [
        "crd",
        "crds"
      ],
      "categories": [
        "api-extensions"
      ],
      "storageVersionHash": "jfWCUB31mvA="
    },
    {
      "name": "customresourcedefinitions/status",
      "singularName": "",
      "namespaced": false,
      "kind": "CustomResourceDefinition",
      "verbs": [
        "get",
        "patch",
        "update"
      ]
    }
  ]
}
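
A client can decode this discovery document with a few small structs. The sketch below is an illustration only: `apiResource`, `apiResourceList` and `parseResourceList` are hypothetical names that mirror a subset of the fields in the JSON above (real clients would normally use the metav1.APIResourceList type), and the sample is a shortened copy of that output.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// apiResource mirrors a subset of the fields of one entry in "resources".
type apiResource struct {
	Name       string   `json:"name"`
	Namespaced bool     `json:"namespaced"`
	Kind       string   `json:"kind"`
	Verbs      []string `json:"verbs"`
}

// apiResourceList mirrors a subset of the top-level discovery document.
type apiResourceList struct {
	GroupVersion string        `json:"groupVersion"`
	Resources    []apiResource `json:"resources"`
}

// parseResourceList decodes a discovery response body.
func parseResourceList(data []byte) (apiResourceList, error) {
	var list apiResourceList
	err := json.Unmarshal(data, &list)
	return list, err
}

// sample is a shortened version of the curl output shown above.
const sample = `{
  "kind": "APIResourceList",
  "groupVersion": "apiextensions.k8s.io/v1",
  "resources": [
    {"name": "customresourcedefinitions", "namespaced": false,
     "kind": "CustomResourceDefinition", "verbs": ["create", "get", "list", "watch"]},
    {"name": "customresourcedefinitions/status", "namespaced": false,
     "kind": "CustomResourceDefinition", "verbs": ["get", "patch", "update"]}
  ]
}`

func main() {
	list, err := parseResourceList([]byte(sample))
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	for _, r := range list.Resources {
		fmt.Printf("%s (namespaced=%v) verbs=%v\n", r.Name, r.Namespaced, r.Verbs)
	}
}
```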

4.2 go-restful

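Repository: https://github.com/emicklei/go-restful

From top to bottom, the go-restful hierarchy consists of:

  • Container: an independent HTTP server that can have its own address and port combination (similar to the server level in nginx).

  • WebService: a coarse-grained grouping. Services of one kind belong to the same WebService, which contains multiple Routes that share a common base path (URL prefix).

  • Route: maps one specific URI path to its handler function.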
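
The three levels can be modelled with the standard library alone. The sketch below does not use go-restful itself; `container`, `webService` and `route` are hypothetical types written only to illustrate how a request is matched against a WebService root path plus a Route sub-path, and the `/apis/example.io/v1` prefix is made up.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// route binds one HTTP method and sub-path to a handler function.
type route struct {
	method, path string
	handler      http.HandlerFunc
}

// webService groups routes that share the same root path.
type webService struct {
	rootPath string
	routes   []route
}

// container dispatches incoming requests to the routes of its web services.
type container struct {
	services []*webService
}

func (c *container) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	for _, ws := range c.services {
		for _, rt := range ws.routes {
			if r.Method == rt.method && r.URL.Path == ws.rootPath+rt.path {
				rt.handler(w, r)
				return
			}
		}
	}
	http.NotFound(w, r)
}

// newDemoContainer builds one web service with a single GET route.
func newDemoContainer() *container {
	ws := &webService{rootPath: "/apis/example.io/v1"}
	ws.routes = append(ws.routes, route{
		method: http.MethodGet,
		path:   "/widgets",
		handler: func(w http.ResponseWriter, r *http.Request) {
			fmt.Fprint(w, "widget list")
		},
	})
	return &container{services: []*webService{ws}}
}

// get sends an in-process GET request and returns the status code and body.
func get(c *container, path string) (int, string) {
	rec := httptest.NewRecorder()
	c.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	c := newDemoContainer()
	fmt.Println(get(c, "/apis/example.io/v1/widgets"))
	fmt.Println(get(c, "/apis/example.io/v1/unknown"))
}
```

In go-restful the same roles are played by the library's own Container, WebService and Route types, which also handle path parameters and content negotiation that this model leaves out.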

5. KubeAPIServer

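KubeAPIServer handles requests for the built-in Kubernetes resources. Its creation flow is similar to that of APIExtensionsServer and consists of the following steps:

  • Create the GenericAPIServer

  • Instantiate Instance

  • InstallLegacyAPI (resources under /api)

  • InstallAPIs (resources under /apis)

Here Instance is the server object of KubeAPIServer.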

// New returns a new instance of Master from the given config.
// Certain config fields will be set to a default value if unset.
// Certain config fields must be specified, including:
//
//	KubeletClientConfig
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
 if reflect.DeepEqual(c.ExtraConfig.KubeletClientConfig, kubeletclient.KubeletClientConfig{}) {
  return nil, fmt.Errorf("Master.New() called with empty config.KubeletClientConfig")
 }
 // Create the GenericAPIServer.
 s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
 if err != nil {
  return nil, err
 }

 if c.ExtraConfig.EnableLogsSupport {
  routes.Logs{}.Install(s.Handler.GoRestfulContainer)
 }

 // Metadata and keys are expected to only change across restarts at present,
 // so we just marshal immediately and serve the cached JSON bytes.
 md, err := serviceaccount.NewOpenIDMetadata(
  c.ExtraConfig.ServiceAccountIssuerURL,
  c.ExtraConfig.ServiceAccountJWKSURI,
  c.GenericConfig.ExternalAddress,
  c.ExtraConfig.ServiceAccountPublicKeys,
 )
 if err != nil {
  // If there was an error, skip installing the endpoints and log the
  // error, but continue on. We don't return the error because the
  // metadata responses require additional, backwards incompatible
  // validation of command-line options.
  msg := fmt.Sprintf("Could not construct pre-rendered responses for"+
   " ServiceAccountIssuerDiscovery endpoints. Endpoints will not be"+
   " enabled. Error: %v", err)
  if c.ExtraConfig.ServiceAccountIssuerURL != "" {
   // The user likely expects this feature to be enabled if issuer URL is
   // set and the feature gate is enabled. In the future, if there is no
   // longer a feature gate and issuer URL is not set, the user may not
   // expect this feature to be enabled. We log the former case as an Error
   // and the latter case as an Info.
   klog.Error(msg)
  } else {
   klog.Info(msg)
  }
 } else {
  routes.NewOpenIDMetadataServer(md.ConfigJSON, md.PublicKeysetJSON).
   Install(s.Handler.GoRestfulContainer)
 }
 // Create the Instance.
 m := &Instance{
  GenericAPIServer:          s,
  ClusterAuthenticationInfo: c.ExtraConfig.ClusterAuthenticationInfo,
 }
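 // Installing KubeAPIServer's API groups takes two calls, InstallLegacyAPI and InstallAPIs,
 // because Kubernetes API paths are split between /api and /apis.
 //
 // The earliest resources had no notion of an API group. When group/version was introduced later,
 // the URIs of the old resource types were kept under /api for compatibility, while all newer
 // resources live under /apis. API groups are therefore created and registered in two batches.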
 // install legacy rest storage
 if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter); err != nil {
  return nil, err
 }

 // The order here is preserved in discovery.
 // If resources with identical names exist in more than one of these groups (e.g. "deployments.apps"" and "deployments.extensions"),
 // the order of this list determines which group an unqualified resource name (e.g. "deployments") should prefer.
 // This priority order is used for local discovery, but it ends up aggregated in `k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go
 // with specific priorities.
 // TODO: describe the priority all the way down in the RESTStorageProviders and plumb it back through the various discovery
 // handlers that we have.
 restStorageProviders := []RESTStorageProvider{
  apiserverinternalrest.StorageProvider{},
  authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
  authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
  autoscalingrest.RESTStorageProvider{},
  batchrest.RESTStorageProvider{},
  certificatesrest.RESTStorageProvider{},
  coordinationrest.RESTStorageProvider{},
  discoveryrest.StorageProvider{},
  networkingrest.RESTStorageProvider{},
  noderest.RESTStorageProvider{},
  policyrest.RESTStorageProvider{},
  rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
  schedulingrest.RESTStorageProvider{},
  storagerest.RESTStorageProvider{},
  flowcontrolrest.RESTStorageProvider{InformerFactory: c.GenericConfig.SharedInformerFactory},
  // keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
  // See https://github.com/kubernetes/kubernetes/issues/42392
  appsrest.StorageProvider{},
  admissionregistrationrest.RESTStorageProvider{},
  eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
 }
 if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
  return nil, err
 }

 m.GenericAPIServer.AddPostStartHookOrDie("start-cluster-authentication-info-controller", func(hookContext genericapiserver.PostStartHookContext) error {
  kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)
  if err != nil {
   return err
  }
  controller := clusterauthenticationtrust.NewClusterAuthenticationTrustController(m.ClusterAuthenticationInfo, kubeClient)

  // generate a context  from stopCh. This is to avoid modifying files which are relying on apiserver
  // TODO: See if we can pass ctx to the current method
  ctx, cancel := context.WithCancel(context.Background())
  go func() {
   select {
   case <-hookContext.StopCh:
    cancel() // stopCh closed, so cancel our context
   case <-ctx.Done():
   }
  }()

  // prime values and start listeners
  if m.ClusterAuthenticationInfo.ClientCA != nil {
   m.ClusterAuthenticationInfo.ClientCA.AddListener(controller)
   if controller, ok := m.ClusterAuthenticationInfo.ClientCA.(dynamiccertificates.ControllerRunner); ok {
    // runonce to be sure that we have a value.
    if err := controller.RunOnce(ctx); err != nil {
     runtime.HandleError(err)
    }
    go controller.Run(ctx, 1)
   }
  }
  if m.ClusterAuthenticationInfo.RequestHeaderCA != nil {
   m.ClusterAuthenticationInfo.RequestHeaderCA.AddListener(controller)
   if controller, ok := m.ClusterAuthenticationInfo.RequestHeaderCA.(dynamiccertificates.ControllerRunner); ok {
    // runonce to be sure that we have a value.
    if err := controller.RunOnce(ctx); err != nil {
     runtime.HandleError(err)
    }
    go controller.Run(ctx, 1)
   }
  }

  go controller.Run(ctx, 1)
  return nil
 })

 if utilfeature.DefaultFeatureGate.Enabled(apiserverfeatures.APIServerIdentity) {
  m.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-identity-lease-controller", func(hookContext genericapiserver.PostStartHookContext) error {
   kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)
   if err != nil {
    return err
   }
   controller := lease.NewController(
    clock.RealClock{},
    kubeClient,
    m.GenericAPIServer.APIServerID,
    int32(c.ExtraConfig.IdentityLeaseDurationSeconds),
    nil,
    time.Duration(c.ExtraConfig.IdentityLeaseRenewIntervalSeconds)*time.Second,
    metav1.NamespaceSystem,
    labelAPIServerHeartbeat)
   go controller.Run(wait.NeverStop)
   return nil
  })
  m.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-identity-lease-garbage-collector", func(hookContext genericapiserver.PostStartHookContext) error {
   kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)
   if err != nil {
    return err
   }
   go apiserverleasegc.NewAPIServerLeaseGC(
    kubeClient,
    time.Duration(c.ExtraConfig.IdentityLeaseDurationSeconds)*time.Second,
    metav1.NamespaceSystem,
    KubeAPIServerIdentityLeaseLabelSelector,
   ).Run(wait.NeverStop)
   return nil
  })
 }

 return m, nil
}

5.1 InstallLegacyAPI

// InstallLegacyAPI will install the legacy APIs for the restStorageProviders if they are enabled.
func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter) error {
 legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
  StorageFactory:              c.ExtraConfig.StorageFactory,
  ProxyTransport:              c.ExtraConfig.ProxyTransport,
  KubeletClientConfig:         c.ExtraConfig.KubeletClientConfig,
  EventTTL:                    c.ExtraConfig.EventTTL,
  ServiceIPRange:              c.ExtraConfig.ServiceIPRange,
  SecondaryServiceIPRange:     c.ExtraConfig.SecondaryServiceIPRange,
  ServiceNodePortRange:        c.ExtraConfig.ServiceNodePortRange,
  LoopbackClientConfig:        c.GenericConfig.LoopbackClientConfig,
  ServiceAccountIssuer:        c.ExtraConfig.ServiceAccountIssuer,
  ExtendExpiration:            c.ExtraConfig.ExtendExpiration,
  ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
  APIAudiences:                c.GenericConfig.Authentication.APIAudiences,
 }
 // Create the REST storage for the resources under /api.
 legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(c.ExtraConfig.APIResourceConfigSource, restOptionsGetter)
 if err != nil {
  return fmt.Errorf("error building core storage: %v", err)
 }
 if len(apiGroupInfo.VersionedResourcesStorageMap) == 0 { // if all core storage is disabled, return.
  return nil
 }

 controllerName := "bootstrap-controller"
 coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
 bootstrapController, err := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient())
 if err != nil {
  return fmt.Errorf("error creating bootstrap controller: %v", err)
 }
 m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
 m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)
 // DefaultLegacyAPIPrefix is "/api"; apiGroupInfo was created above and holds the resources under /api.
 if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
  return fmt.Errorf("error in registering group versions: %v", err)
 }
 return nil
}

InstallLegacyAPIGroup then follows the same path: it calls InstallREST, which in turn calls Install, binding the resources to HTTP handlers.

5.2 InstallAPIs

// InstallAPIs will install the APIs for the restStorageProviders if they are enabled.
func (m *Instance) InstallAPIs(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, restStorageProviders ...RESTStorageProvider) error {
 apiGroupsInfo := []*genericapiserver.APIGroupInfo{}

 // used later in the loop to filter the served resource by those that have expired.
 resourceExpirationEvaluator, err := genericapiserver.NewResourceExpirationEvaluator(*m.GenericAPIServer.Version)
 if err != nil {
  return err
 }

 for _, restStorageBuilder := range restStorageProviders {
  groupName := restStorageBuilder.GroupName()
  apiGroupInfo, err := restStorageBuilder.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
  if err != nil {
   return fmt.Errorf("problem initializing API group %q : %v", groupName, err)
  }
  if len(apiGroupInfo.VersionedResourcesStorageMap) == 0 {
   // If we have no storage for any resource configured, this API group is effectively disabled.
   // This can happen when an entire API group, version, or development-stage (alpha, beta, GA) is disabled.
   klog.Infof("API group %q is not enabled, skipping.", groupName)
   continue
  }

  // Remove resources that serving kinds that are removed.
  // We do this here so that we don't accidentally serve versions without resources or openapi information that for kinds we don't serve.
  // This is a spot above the construction of individual storage handlers so that no sig accidentally forgets to check.
  resourceExpirationEvaluator.RemoveDeletedKinds(groupName, apiGroupInfo.Scheme, apiGroupInfo.VersionedResourcesStorageMap)
  if len(apiGroupInfo.VersionedResourcesStorageMap) == 0 {
   klog.V(1).Infof("Removing API group %v because it is time to stop serving it because it has no versions per APILifecycle.", groupName)
   continue
  }

  klog.V(1).Infof("Enabling API group %q.", groupName)

  if postHookProvider, ok := restStorageBuilder.(genericapiserver.PostStartHookProvider); ok {
   name, hook, err := postHookProvider.PostStartHook()
   if err != nil {
    klog.Fatalf("Error building PostStartHook: %v", err)
   }
   m.GenericAPIServer.AddPostStartHookOrDie(name, hook)
  }

  apiGroupsInfo = append(apiGroupsInfo, &apiGroupInfo)
 }
 // Register the resources under /apis; the subsequent steps are the same as for InstallLegacyAPI.
 if err := m.GenericAPIServer.InstallAPIGroups(apiGroupsInfo...); err != nil {
  return fmt.Errorf("error in registering group versions: %v", err)
 }
 return nil
}
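
The split between the legacy /api prefix and the /apis prefix comes down to whether the group name is empty. A minimal sketch, with `resourcePath` as a hypothetical helper (it builds the collection path without a namespace segment):

```go
package main

import "fmt"

// resourcePath builds the REST path of a resource collection.
// The core (legacy) group has an empty name and lives under /api;
// every named group lives under /apis/<group>.
func resourcePath(group, version, resource string) string {
	if group == "" {
		return "/api/" + version + "/" + resource
	}
	return "/apis/" + group + "/" + version + "/" + resource
}

func main() {
	fmt.Println(resourcePath("", "v1", "nodes"))
	// /api/v1/nodes
	fmt.Println(resourcePath("apiextensions.k8s.io", "v1", "customresourcedefinitions"))
	// /apis/apiextensions.k8s.io/v1/customresourcedefinitions
}
```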

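6. AggregatorServer

AggregatorServer handles requests for aggregated APIs; in practice it performs layer-7 (HTTP) forwarding. Of the three servers, its creation flow is the closest to that of APIExtensionsServer:

  • Create the GenericAPIServer

  • Instantiate the Aggregator

  • Instantiate APIGroupInfo

  • InstallAPIGroup

AggregatorServer is otherwise similar to APIExtensionsServer, except that its group is apiregistration.k8s.io.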
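
The layer-7 forwarding idea can be illustrated with the standard library's reverse proxy. This is a rough sketch, not how kube-aggregator is implemented: the real server resolves backends from APIService objects and adds authentication, none of which is modelled here. `newAggregator`, `fetchThroughAggregator` and the `metrics.example.io` group are hypothetical.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"net/http/httputil"
	"net/url"
	"strings"
)

// newAggregator forwards requests whose path starts with prefix to backend
// and serves everything else from the local handler.
func newAggregator(prefix string, backend *url.URL, local http.Handler) http.Handler {
	proxy := httputil.NewSingleHostReverseProxy(backend)
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if strings.HasPrefix(r.URL.Path, prefix) {
			proxy.ServeHTTP(w, r)
			return
		}
		local.ServeHTTP(w, r)
	})
}

// fetchThroughAggregator starts a backend and an aggregator on loopback test
// servers, requests path through the aggregator and returns the response body.
func fetchThroughAggregator(path string) (string, error) {
	backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "extension server: "+r.URL.Path)
	}))
	defer backend.Close()

	backendURL, err := url.Parse(backend.URL)
	if err != nil {
		return "", err
	}
	local := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "local: "+r.URL.Path)
	})
	front := httptest.NewServer(newAggregator("/apis/metrics.example.io/", backendURL, local))
	defer front.Close()

	resp, err := http.Get(front.URL + path)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return string(body), err
}

func main() {
	for _, p := range []string{"/apis/metrics.example.io/v1beta1/nodes", "/healthz"} {
		body, err := fetchThroughAggregator(p)
		fmt.Println(body, err)
	}
}
```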

7. Running the HTTP server

With the API routes bound, the last step is to start the HTTP server.

func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}, shutdownTimeout time.Duration) (<-chan struct{}, <-chan struct{}, error) {
 // Use an stop channel to allow graceful shutdown without dropping audit events
 // after http server shutdown.
 auditStopCh := make(chan struct{})

 // Start the audit backend before any request comes in. This means we must call Backend.Run
 // before http server start serving. Otherwise the Backend.ProcessEvents call might block.
 if s.AuditBackend != nil {
  if err := s.AuditBackend.Run(auditStopCh); err != nil {
   return nil, nil, fmt.Errorf("failed to run the audit backend: %v", err)
  }
 }

 // Use an internal stop channel to allow cleanup of the listeners on error.
 internalStopCh := make(chan struct{})
 var stoppedCh <-chan struct{}
 var listenerStoppedCh <-chan struct{}
 if s.SecureServingInfo != nil && s.Handler != nil {
  var err error
  stoppedCh, listenerStoppedCh, err = s.SecureServingInfo.Serve(s.Handler, shutdownTimeout, internalStopCh)
  if err != nil {
   close(internalStopCh)
   close(auditStopCh)
   return nil, nil, err
  }
 }

 // Now that listener have bound successfully, it is the
 // responsibility of the caller to close the provided channel to
 // ensure cleanup.
 go func() {
  <-stopCh
  close(internalStopCh)
  if stoppedCh != nil {
   <-stoppedCh
  }
  s.HandlerChainWaitGroup.Wait()
  close(auditStopCh)
 }()

 s.RunPostStartHooks(stopCh)

 if _, err := systemd.SdNotify(true, "READY=1\n"); err != nil {
  klog.Errorf("Unable to send systemd daemon successful start message: %v\n", err)
 }

 return stoppedCh, listenerStoppedCh, nil
}
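
The stop-channel pattern used by NonBlockingRun can be reduced to a few lines. The sketch below is a simplified model under stated assumptions: it serves plain HTTP on a loopback port, has no audit backend or post-start hooks, and `serveUntilStopped` and `runDemo` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net"
	"net/http"
	"time"
)

// serveUntilStopped serves handler on ln in the background. When stopCh is
// closed it shuts the server down gracefully and then closes the returned channel.
func serveUntilStopped(ln net.Listener, handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) <-chan struct{} {
	srv := &http.Server{Handler: handler}
	stoppedCh := make(chan struct{})

	go func() {
		// Serve returns http.ErrServerClosed once Shutdown has been called.
		_ = srv.Serve(ln)
	}()

	go func() {
		defer close(stoppedCh)
		<-stopCh
		ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
		defer cancel()
		// Stop accepting connections and wait for in-flight requests.
		_ = srv.Shutdown(ctx)
	}()
	return stoppedCh
}

// runDemo starts a server, sends one request, then triggers shutdown.
// It returns the response body and whether shutdown completed in time.
func runDemo() (string, bool, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", false, err
	}
	stopCh := make(chan struct{})
	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "ok")
	})
	stoppedCh := serveUntilStopped(ln, handler, time.Second, stopCh)

	resp, err := http.Get("http://" + ln.Addr().String() + "/healthz")
	if err != nil {
		close(stopCh)
		return "", false, err
	}
	body, err := io.ReadAll(resp.Body)
	resp.Body.Close()
	close(stopCh) // the caller owns stopCh and closes it to request shutdown
	if err != nil {
		return "", false, err
	}

	select {
	case <-stoppedCh:
		return string(body), true, nil
	case <-time.After(5 * time.Second):
		return string(body), false, nil
	}
}

func main() {
	fmt.Println(runDemo())
}
```

As in the original, the function returns immediately and the caller learns about completion through the returned channel rather than by blocking.
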
func (s *SecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) (<-chan struct{}, <-chan struct{}, error) {
 if s.Listener == nil {
  return nil, nil, fmt.Errorf("listener must not be nil")
 }

 tlsConfig, err := s.tlsConfig(stopCh)
 if err != nil {
  return nil, nil, err
 }

 secureServer := &http.Server{
  Addr:           s.Listener.Addr().String(),
  Handler:        handler,
  MaxHeaderBytes: 1 << 20,
  TLSConfig:      tlsConfig,

  IdleTimeout:       90 * time.Second, // matches http.DefaultTransport keep-alive timeout
  ReadHeaderTimeout: 32 * time.Second, // just shy of requestTimeoutUpperBound
 }

 // At least 99% of serialized resources in surveyed clusters were smaller than 256kb.
 // This should be big enough to accommodate most API POST requests in a single frame,
 // and small enough to allow a per connection buffer of this size multiplied by `MaxConcurrentStreams`.
 const resourceBody99Percentile = 256 * 1024

 http2Options := &http2.Server{
  IdleTimeout: 90 * time.Second, // matches http.DefaultTransport keep-alive timeout
 }

 // shrink the per-stream buffer and max framesize from the 1MB default while still accommodating most API POST requests in a single frame
 http2Options.MaxUploadBufferPerStream = resourceBody99Percentile
 http2Options.MaxReadFrameSize = resourceBody99Percentile

 // use the overridden concurrent streams setting or make the default of 250 explicit so we can size MaxUploadBufferPerConnection appropriately
 if s.HTTP2MaxStreamsPerConnection > 0 {
  http2Options.MaxConcurrentStreams = uint32(s.HTTP2MaxStreamsPerConnection)
 } else {
  http2Options.MaxConcurrentStreams = 250
 }

 // increase the connection buffer size from the 1MB default to handle the specified number of concurrent streams
 http2Options.MaxUploadBufferPerConnection = http2Options.MaxUploadBufferPerStream * int32(http2Options.MaxConcurrentStreams)

 if !s.DisableHTTP2 {
  // apply settings to the server
  if err := http2.ConfigureServer(secureServer, http2Options); err != nil {
   return nil, nil, fmt.Errorf("error configuring http2: %v", err)
  }
 }

 // use tlsHandshakeErrorWriter to handle messages of tls handshake error
 tlsErrorWriter := &tlsHandshakeErrorWriter{os.Stderr}
 tlsErrorLogger := log.New(tlsErrorWriter, "", 0)
 secureServer.ErrorLog = tlsErrorLogger

 klog.Infof("Serving securely on %s", secureServer.Addr)
 return RunServer(secureServer, s.Listener, shutdownTimeout, stopCh)
}
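
The HTTP/2 buffer sizing in Serve is simple arithmetic: the per-connection upload buffer is the per-stream buffer multiplied by the maximum number of concurrent streams. A small worked sketch, with `connectionUploadBuffer` as a hypothetical helper that reuses the constants from the code above:

```go
package main

import "fmt"

// resourceBody99Percentile is the per-stream buffer size in bytes (256 KiB),
// the same constant used in Serve above.
const resourceBody99Percentile = 256 * 1024

// connectionUploadBuffer returns the per-connection upload buffer in bytes.
// A non-positive maxStreams falls back to the default of 250 streams.
func connectionUploadBuffer(maxStreams int) int32 {
	streams := uint32(250)
	if maxStreams > 0 {
		streams = uint32(maxStreams)
	}
	return int32(resourceBody99Percentile) * int32(streams)
}

func main() {
	fmt.Println(connectionUploadBuffer(0))   // 65536000 (256 KiB * 250)
	fmt.Println(connectionUploadBuffer(100)) // 26214400 (256 KiB * 100)
}
```

With the default of 250 streams the per-connection buffer is therefore 62.5 MiB, compared with the 1 MB default mentioned in the comment.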


k8s source code analysis (3): apiserver startup analysis
http://47.123.5.226:8090//archives/k8syuan-ma-jie-xi-3---gou-jian-fang-shi
Author
pony
Published on
May 9, 2024
License