k8s源码解析(4)--apiserver请求处理
apiserver请求处理
当apiserver启动后,就可以接受客户端的请求了。一个请求到达后会依次经过以下几个处理阶段:
认证:客户端是否合法
鉴权:客户端是否具备当前请求资源的权限
准入控制器:提供回调钩子,资源持久化前对资源的值做改动或者验证等操作
持久化:持久化到ETCD
1. 认证
请求到达apiserver后,第一步是进行认证,辨别请求来源的身份。
前面讲过,buildGenericConfig 中调用了 genericapiserver.NewConfig。NewConfig 创建 Config 结构时,给 BuildHandlerChainFunc 字段传入了 DefaultBuildHandlerChain 这个函数:
// DefaultBuildHandlerChain wraps apiHandler with request filters.
// This is an abridged excerpt: "..." marks code that the article omits.
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
...
// This is where the authentication filter is built and placed around the handler.
handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
...
}
vendor/k8s.io/apiserver/pkg/endpoints/filters/authentication.go
// withAuthentication returns an http.Handler that authenticates each request
// with auth before delegating to handler.
//
// If auth is nil, a warning is logged and handler is returned unwrapped.
// Requests that fail authentication, or whose response audiences are not
// acceptable for apiAuds, are answered by the failed handler instead.
// metrics is invoked once per request with the authentication outcome and
// the start/finish timestamps of the AuthenticateRequest call.
func withAuthentication(handler http.Handler, auth authenticator.Request, failed http.Handler, apiAuds authenticator.Audiences, metrics recordMetrics) http.Handler {
if auth == nil {
klog.Warning("Authentication is disabled")
return handler
}
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
authenticationStart := time.Now()
// Attach the accepted audiences to the request context, if any are configured.
if len(apiAuds) > 0 {
req = req.WithContext(authenticator.WithAudiences(req.Context(), apiAuds))
}
// This is where the authentication logic runs. If authentication fails, the
// failed handler writes the response. If it succeeds, the Authorization
// header is removed and the inner handler is invoked via handler.ServeHTTP(w, req).
resp, ok, err := auth.AuthenticateRequest(req)
authenticationFinish := time.Now()
// Deferred so the metrics callback runs when this handler returns; the
// closure reads req at that point, after any reassignment below.
defer func() {
metrics(req.Context(), resp, ok, err, apiAuds, authenticationStart, authenticationFinish)
}()
if err != nil || !ok {
if err != nil {
klog.ErrorS(err, "Unable to authenticate the request")
}
failed.ServeHTTP(w, req)
return
}
if !audiencesAreAcceptable(apiAuds, resp.Audiences) {
err = fmt.Errorf("unable to match the audience: %v , accepted: %v", resp.Audiences, apiAuds)
klog.Error(err)
failed.ServeHTTP(w, req)
return
}
// authorization header is not required anymore in case of a successful authentication.
req.Header.Del("Authorization")
// Store the authenticated user in the request context for downstream handlers.
req = req.WithContext(genericapirequest.WithUser(req.Context(), resp.User))
handler.ServeHTTP(w, req)
})
}
1.1 认证接口
// Request is the authentication interface: it exposes a single
// AuthenticateRequest method that inspects an incoming HTTP request.
type Request interface {
// AuthenticateRequest receives the client request. If verification fails the
// bool is false; if it succeeds the bool is true and the Response carries the
// authenticated user's information, e.g. Name, UID, Groups, Extra.
AuthenticateRequest(req *http.Request) (*Response, bool, error)
}
1.2 认证实现
// AuthenticateRequest tries each configured authenticator in order and
// returns the result of the first one that authenticates the request.
//
// An authenticator error aborts the chain immediately when FailOnError is
// set; otherwise the error is collected and the next authenticator is tried.
// If no authenticator succeeds, the result is (nil, false) together with the
// aggregate of all collected errors.
func (authHandler *unionAuthRequestHandler) AuthenticateRequest(req *http.Request) (*authenticator.Response, bool, error) {
	var collected []error

	// Walk every enabled authentication method; a single success is enough.
	for _, delegate := range authHandler.Handlers {
		response, authenticated, authErr := delegate.AuthenticateRequest(req)
		switch {
		case authErr != nil && authHandler.FailOnError:
			return response, authenticated, authErr
		case authErr != nil:
			collected = append(collected, authErr)
		case authenticated:
			return response, authenticated, authErr
		}
	}

	return nil, false, utilerrors.NewAggregate(collected)
}
各种认证方式文档:https://kubernetes.io/zh-cn/docs/reference/access-authn-authz/authentication/
2. 鉴权
鉴权(授权)的处理方式和认证类似。buildGenericConfig 中通过以下代码完成 Authorizer 的创建:
// Excerpt: build the authorizer and rule resolver and store them on
// genericConfig; on failure the error is recorded in lastErr and the
// enclosing function returns.
genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)
if err != nil {
lastErr = fmt.Errorf("invalid authorization config: %v", err)
return
}
// New builds one authorizer and one rule resolver per entry in
// config.AuthorizationModes, in the order the modes are listed, and returns
// them combined through union.New and union.NewRuleResolvers.
//
// It returns an error when no mode is configured, when an unknown mode is
// given, or when constructing the ABAC or Webhook authorizer fails.
func (config Config) New() (authorizer.Authorizer, authorizer.RuleResolver, error) {
if len(config.AuthorizationModes) == 0 {
return nil, nil, fmt.Errorf("at least one authorization mode must be passed")
}
var (
authorizers []authorizer.Authorizer
ruleResolvers []authorizer.RuleResolver
)
for _, authorizationMode := range config.AuthorizationModes {
// Keep cases in sync with constant list in k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes/modes.go.
switch authorizationMode {
case modes.ModeNode:
// Node mode: build a graph fed by node, pod, PV and VolumeAttachment informers.
node.RegisterMetrics()
graph := node.NewGraph()
node.AddGraphEventHandlers(
graph,
config.VersionedInformerFactory.Core().V1().Nodes(),
config.VersionedInformerFactory.Core().V1().Pods(),
config.VersionedInformerFactory.Core().V1().PersistentVolumes(),
config.VersionedInformerFactory.Storage().V1().VolumeAttachments(),
)
nodeAuthorizer := node.NewAuthorizer(graph, nodeidentifier.NewDefaultNodeIdentifier(), bootstrappolicy.NodeRules())
authorizers = append(authorizers, nodeAuthorizer)
ruleResolvers = append(ruleResolvers, nodeAuthorizer)
case modes.ModeAlwaysAllow:
alwaysAllowAuthorizer := authorizerfactory.NewAlwaysAllowAuthorizer()
authorizers = append(authorizers, alwaysAllowAuthorizer)
ruleResolvers = append(ruleResolvers, alwaysAllowAuthorizer)
case modes.ModeAlwaysDeny:
alwaysDenyAuthorizer := authorizerfactory.NewAlwaysDenyAuthorizer()
authorizers = append(authorizers, alwaysDenyAuthorizer)
ruleResolvers = append(ruleResolvers, alwaysDenyAuthorizer)
case modes.ModeABAC:
// ABAC mode: the policy is loaded from config.PolicyFile.
abacAuthorizer, err := abac.NewFromFile(config.PolicyFile)
if err != nil {
return nil, nil, err
}
authorizers = append(authorizers, abacAuthorizer)
ruleResolvers = append(ruleResolvers, abacAuthorizer)
case modes.ModeWebhook:
// Webhook mode requires retry backoff parameters; they are dereferenced below.
if config.WebhookRetryBackoff == nil {
return nil, nil, errors.New("retry backoff parameters for authorization webhook has not been specified")
}
clientConfig, err := webhookutil.LoadKubeconfig(config.WebhookConfigFile, config.CustomDial)
if err != nil {
return nil, nil, err
}
webhookAuthorizer, err := webhook.New(clientConfig,
config.WebhookVersion,
config.WebhookCacheAuthorizedTTL,
config.WebhookCacheUnauthorizedTTL,
*config.WebhookRetryBackoff,
)
if err != nil {
return nil, nil, err
}
authorizers = append(authorizers, webhookAuthorizer)
ruleResolvers = append(ruleResolvers, webhookAuthorizer)
case modes.ModeRBAC:
// RBAC mode: roles and bindings are read through informer-backed listers.
rbacAuthorizer := rbac.New(
&rbac.RoleGetter{Lister: config.VersionedInformerFactory.Rbac().V1().Roles().Lister()},
&rbac.RoleBindingLister{Lister: config.VersionedInformerFactory.Rbac().V1().RoleBindings().Lister()},
&rbac.ClusterRoleGetter{Lister: config.VersionedInformerFactory.Rbac().V1().ClusterRoles().Lister()},
&rbac.ClusterRoleBindingLister{Lister: config.VersionedInformerFactory.Rbac().V1().ClusterRoleBindings().Lister()},
)
authorizers = append(authorizers, rbacAuthorizer)
ruleResolvers = append(ruleResolvers, rbacAuthorizer)
default:
return nil, nil, fmt.Errorf("unknown authorization mode %s specified", authorizationMode)
}
}
return union.New(authorizers...), union.NewRuleResolvers(ruleResolvers...), nil
}
2.1 鉴权小知识
kube-apiserver 目前提供了 6 种鉴权机制:
AlwaysAllow:总是允许
AlwaysDeny:总是拒绝
ABAC:基于属性的访问控制(Attribute-Based Access Control)
Node:节点鉴权,专门对 kubelet 发出的 API 请求进行鉴权
RBAC:基于角色的访问控制(Role-Based Access Control)
Webhook:基于 webhook 的一种 HTTP 回调机制,可以进行远程鉴权管理
鉴权中有三个概念:
Decision:决策状态
Authorizer:鉴权接口
RuleResolver:规则解析器
2.1.1 Decision:决策状态
Decision 决策状态类似于认证中的 true 和 false,用于决定是否鉴权成功。 鉴权支持三种 Decision 决策状态,例如鉴权成功,则返回 DecisionAllow,代码如下:
//staging/src/k8s.io/apiserver/pkg/authorization/authorizer/interfaces.go
// Decision is the integer-typed outcome of an authorization check.
type Decision int
const (
// DecisionDeny is the zero value of Decision (iota starts at 0): the operation is denied.
DecisionDeny Decision = iota
// DecisionAllow: the operation is allowed.
DecisionAllow
// DecisionNoOpinion: no opinion either way; the next authorizer is consulted.
DecisionNoOpinion
)
DecisionDeny:拒绝该操作
DecisionAllow:允许该操作
DecisionNoOpinion:表示无明显意见允许或拒绝,继续执行下一个鉴权模块
2.1.2 Authorizer:鉴权接口
每个鉴权模块都要实现该接口的方法,代码如下:
//vendor/k8s.io/apiserver/pkg/authorization/authorizer/interfaces.go
// Authorizer is the authorization interface; it declares a single Authorize method.
type Authorizer interface {
// Authorize evaluates the request attributes a and returns the decision,
// a human-readable reason, and any error encountered.
Authorize(ctx context.Context, a Attributes) (authorized Decision, reason string, err error)
}
Authorizer 接口定义了 Authorize 方法,该方法接收 ctx 以及一个 Attributes 参数。
Attributes 是一个由方法集合构成的接口,鉴权模块通过它获取从 HTTP 请求中解析出的鉴权信息,例如 GetUser、GetVerb、GetNamespace、GetResource 等方法。如果鉴权成功,Decision 为 DecisionAllow;如果鉴权被拒绝,Decision 为 DecisionDeny,并返回拒绝的原因;如果当前模块无法给出结论,则为 DecisionNoOpinion。
// Attributes is the set of accessor methods an authorizer uses to read the
// properties of the request being authorized (user, verb, resource, path, ...).
type Attributes interface {
// GetUser returns the user.Info object to authorize
GetUser() user.Info
// GetVerb returns the kube verb associated with API requests (this includes get, list, watch, create, update, patch, delete, deletecollection, and proxy),
// or the lowercased HTTP verb associated with non-API requests (this includes get, put, post, patch, and delete)
GetVerb() string
// When IsReadOnly() == true, the request has no side effects, other than
// caching, logging, and other incidentals.
IsReadOnly() bool
// The namespace of the object, if a request is for a REST object.
GetNamespace() string
// The kind of object, if a request is for a REST object.
GetResource() string
// GetSubresource returns the subresource being requested, if present
GetSubresource() string
// GetName returns the name of the object as parsed off the request. This will not be present for all request types, but
// will be present for: get, update, delete
GetName() string
// The group of the resource, if a request is for a REST object.
GetAPIGroup() string
// GetAPIVersion returns the version of the group requested, if a request is for a REST object.
GetAPIVersion() string
// IsResourceRequest returns true for requests to API resources, like /api/v1/nodes,
// and false for non-resource endpoints like /api, /healthz
IsResourceRequest() bool
// GetPath returns the path of the request
GetPath() string
}
2.1.3 RuleResolver:规则解析器
鉴权模块通过 RuleResolver 解析规则,定义如下:
//staging/src/k8s.io/apiserver/pkg/authorization/authorizer/interfaces.go
// RuleResolver resolves the authorization rules that apply to a user in a namespace.
type RuleResolver interface {
// RulesFor get the list of cluster wide rules, the list of rules in the specific namespace, incomplete status and errors.
RulesFor(user user.Info, namespace string) ([]ResourceRuleInfo, []NonResourceRuleInfo, bool, error)
}
RuleResolver 接口定义的 RulesFor 方法,所有鉴权模块都要实现。 RulesFor 方法接受 user 用户信息和 namespace 命名空间参数,解析出规则列表并返回。
规则列表分成两种:
ResourceRuleInfo:资源类型的规则列表,例如 /api/v1/pods 的资源接口
NonResourceRuleInfo:非资源类型的规则列表,例如 /healthz 的非资源接口
以 ResourceRuleInfo 资源类型为例,定义如下:
//staging/src/k8s.io/apiserver/pkg/authorization/authorizer/rule.go
// DefaultResourceRuleInfo describes one resource rule as four string lists:
// verbs, API groups, resources and resource names.
type DefaultResourceRuleInfo struct {
Verbs []string
APIGroups []string
Resources []string
ResourceNames []string
}
比如:
resourcesRules: []authorizer.ResourceRuleInfo{
	&authorizer.DefaultResourceRuleInfo{
		// The wildcard (*) matches everything. This rule grants the user every
		// verb on Pods in all API groups, i.e. get, list, watch, create,
		// update, patch, delete and deletecollection.
		Verbs:     []string{"*"},
		APIGroups: []string{"*"},
		Resources: []string{"pods"},
	},
},
2.2 鉴权分析
// DefaultBuildHandlerChain wraps apiHandler with request filters.
// This is an abridged excerpt: "..." marks code that the article omits.
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
...
// This is where the authorization filter is built and placed around the handler.
handler = genericapifilters.WithAuthorization(handler, c.Authorization.Authorizer, c.Serializer)
...
}
// WithAuthorization passes all authorized requests on to handler, and
// returns a forbidden error otherwise.
//
// When a is nil, authorization is disabled: a warning is logged and handler
// is returned unwrapped. The serializer s is only used to render the
// forbidden response.
func WithAuthorization(handler http.Handler, a authorizer.Authorizer, s runtime.NegotiatedSerializer) http.Handler {
	if a == nil {
		klog.Warning("Authorization is disabled")
		return handler
	}

	authorize := func(w http.ResponseWriter, req *http.Request) {
		requestCtx := req.Context()

		attrs, attrErr := GetAuthorizerAttributes(requestCtx)
		if attrErr != nil {
			responsewriters.InternalError(w, req, attrErr)
			return
		}

		// Ask the authorizer for a decision on this request.
		decision, why, authzErr := a.Authorize(requestCtx, attrs)

		switch {
		// An authorizer like RBAC could encounter evaluation errors and still
		// allow the request, so the decision is checked before the error.
		case decision == authorizer.DecisionAllow:
			audit.AddAuditAnnotations(requestCtx,
				decisionAnnotationKey, decisionAllow,
				reasonAnnotationKey, why)
			handler.ServeHTTP(w, req)

		case authzErr != nil:
			audit.AddAuditAnnotation(requestCtx, reasonAnnotationKey, reasonError)
			responsewriters.InternalError(w, req, authzErr)

		default:
			klog.V(4).InfoS("Forbidden", "URI", req.RequestURI, "Reason", why)
			audit.AddAuditAnnotations(requestCtx,
				decisionAnnotationKey, decisionForbid,
				reasonAnnotationKey, why)
			responsewriters.Forbidden(requestCtx, attrs, w, req, why, s)
		}
	}

	return http.HandlerFunc(authorize)
}
// Authorize consults each authorizer in the chain in order.
//
// The first authorizer that returns DecisionAllow or DecisionDeny ends the
// evaluation: its decision, reason and error are returned as-is. An
// authorizer that returns DecisionNoOpinion is skipped, and its reason and
// error (if any) are remembered. If every authorizer has no opinion, the
// result is DecisionNoOpinion with the collected reasons joined by newlines
// and the aggregate of the collected errors.
func (authzHandler unionAuthzHandler) Authorize(ctx context.Context, a authorizer.Attributes) (authorizer.Decision, string, error) {
	var errs []error
	var reasons []string

	// When several authorization modes are enabled, each module is tried in turn.
	for _, delegate := range authzHandler {
		verdict, why, authzErr := delegate.Authorize(ctx, a)

		if authzErr != nil {
			errs = append(errs, authzErr)
		}
		if why != "" {
			reasons = append(reasons, why)
		}

		// Only a definitive answer stops the chain; "no opinion" falls through
		// to the next authorizer.
		if verdict == authorizer.DecisionAllow || verdict == authorizer.DecisionDeny {
			return verdict, why, authzErr
		}
	}

	return authorizer.DecisionNoOpinion, strings.Join(reasons, "\n"), utilerrors.NewAggregate(errs)
}
比如:
--authorization-mode=ABAC:启用 ABAC 鉴权
--authorization-mode=RBAC:启用 RBAC 鉴权
--authorization-mode=Node:启用 Node 鉴权
--authorization-mode=Webhook:启用 Webhook 鉴权
具体的鉴权详情,通过官方文档了解即可: https://kubernetes.io/zh-cn/docs/reference/access-authn-authz/authorization/
3. 准入控制器
准入控制器是在对象持久化之前用于对 Kubernetes API Server 的请求进行拦截的代码段。
在 Kubernetes apiserver 中包含两个特殊的准入控制器:MutatingAdmissionWebhook和 ValidatingAdmissionWebhook。
Mutate,可修改提交上来的资源(Mutate里面也可以包含验证操作)
Validate, 是对提交上来的资源进行验证
在 buildGenericConfig 中通过以下代码创建:
// buildGenericConfig (abridged excerpt; "..." marks omitted code) applies the
// admission options to genericConfig. On failure the error is stored in
// lastErr and the function returns.
func buildGenericConfig(
s *options.ServerRunOptions,
proxyTransport *http.Transport,
){
...
// Wire the admission plugins into the generic server config.
err = s.Admission.ApplyTo(
genericConfig,
versionedInformers,
kubeClientConfig,
utilfeature.DefaultFeatureGate,
pluginInitializers...)
if err != nil {
lastErr = fmt.Errorf("failed to initialize admission: %v", err)
return
}
...
}
最终调用:
// ApplyTo builds the admission chain from the enabled plugins and installs
// it, wrapped with step metrics, as c.AdmissionControl.
//
// A nil receiver is a no-op. informers must not be nil. The caller-supplied
// pluginInitializers are run before the generic initializer that this method
// appends (clientset, informers, authorizer, feature gates).
func (a *AdmissionOptions) ApplyTo(
	c *server.Config,
	informers informers.SharedInformerFactory,
	kubeAPIServerClientConfig *rest.Config,
	features featuregate.FeatureGate,
	pluginInitializers ...admission.PluginInitializer,
) error {
	switch {
	case a == nil:
		return nil
	case informers == nil:
		// Admission depends on CoreAPI to set SharedInformerFactory and ClientConfig.
		return fmt.Errorf("admission depends on a Kubernetes core API shared informer, it cannot be nil")
	}

	enabled := a.enabledPluginNames()

	// Load the per-plugin configuration provider.
	configProvider, err := admission.ReadAdmissionConfiguration(enabled, a.ConfigFile, configScheme)
	if err != nil {
		return fmt.Errorf("failed to read plugin config: %v", err)
	}

	client, err := kubernetes.NewForConfig(kubeAPIServerClientConfig)
	if err != nil {
		return err
	}

	pluginInitializers = append(pluginInitializers, initializer.New(client, informers, c.Authorization.Authorizer, features))
	chainInitializers := append(admission.PluginInitializers{}, pluginInitializers...)

	// Chain the admission plugins together, much like the authentication and
	// authorization chains.
	chain, err := a.Plugins.NewFromPlugins(enabled, configProvider, chainInitializers, a.Decorators)
	if err != nil {
		return err
	}

	// Wrap the chain so each step is recorded in metrics.
	c.AdmissionControl = admissionmetrics.WithStepMetrics(chain)
	return nil
}
// NewFromPlugins initializes every plugin named in pluginNames, in order,
// and returns them as a single admission chain wrapped by the reinvocation
// handler.
//
// Plugins for which InitPlugin returns nil are skipped. When decorator is
// non-nil each plugin is decorated before being added to the chain. The names
// of the loaded mutating and validating plugins are logged.
func (ps *Plugins) NewFromPlugins(pluginNames []string, configProvider ConfigProvider, pluginInitializer PluginInitializer, decorator Decorator) (Interface, error) {
	chain := make([]Interface, 0)
	mutating := make([]string, 0)
	validating := make([]string, 0)

	for _, name := range pluginNames {
		cfg, err := configProvider.ConfigFor(name)
		if err != nil {
			return nil, err
		}

		// Instantiate and initialize the plugin.
		instance, err := ps.InitPlugin(name, cfg, pluginInitializer)
		if err != nil {
			return nil, err
		}
		if instance == nil {
			continue
		}

		link := instance
		if decorator != nil {
			link = decorator.Decorate(instance, name)
		}
		chain = append(chain, link)

		// Classification is based on the undecorated plugin.
		if _, isMutating := instance.(MutationInterface); isMutating {
			mutating = append(mutating, name)
		}
		if _, isValidating := instance.(ValidationInterface); isValidating {
			validating = append(validating, name)
		}
	}

	if count := len(mutating); count != 0 {
		klog.Infof("Loaded %d mutating admission controller(s) successfully in the following order: %s.", count, strings.Join(mutating, ","))
	}
	if count := len(validating); count != 0 {
		klog.Infof("Loaded %d validating admission controller(s) successfully in the following order: %s.", count, strings.Join(validating, ","))
	}

	return newReinvocationHandler(chainAdmissionHandler(chain)), nil
}
// InitPlugin creates an instance of the named interface.
//
// An empty name yields (nil, nil) after an informational log line. Otherwise
// the plugin is looked up, handed to pluginInitializer, and validated; an
// unknown name, a lookup error, or a failed validation is returned as an error.
func (ps *Plugins) InitPlugin(name string, config io.Reader, pluginInitializer PluginInitializer) (Interface, error) {
	if len(name) == 0 {
		klog.Info("No admission plugin specified.")
		return nil, nil
	}

	// Look the plugin up in the registry.
	instance, known, lookupErr := ps.getPlugin(name, config)
	switch {
	case lookupErr != nil:
		return nil, fmt.Errorf("couldn't init admission plugin %q: %v", name, lookupErr)
	case !known:
		return nil, fmt.Errorf("unknown admission plugin: %s", name)
	}

	pluginInitializer.Initialize(instance)

	// ensure that plugins have been properly initialized
	validationErr := ValidateInitialization(instance)
	if validationErr != nil {
		return nil, fmt.Errorf("failed to initialize admission plugin %q: %v", name, validationErr)
	}
	return instance, nil
}
// getPlugin looks up the factory registered under name and, if the plugin is
// enabled, uses it to build the plugin from config.
//
// The bool result reports whether name is registered. A registered but
// disabled plugin yields (nil, true, nil). The registry is accessed while
// holding ps.lock.
func (ps *Plugins) getPlugin(name string, config io.Reader) (Interface, bool, error) {
	ps.lock.Lock()
	defer ps.lock.Unlock()

	// Find the registered factory for this plugin name.
	factory, registered := ps.registry[name]
	if !registered {
		return nil, false, nil
	}

	// The config stream is consumed twice: once by the enabled check and once
	// by the factory, so it is split into two readers first.
	enabledCheckStream, factoryStream, err := splitStream(config)
	if err != nil {
		return nil, true, err
	}

	if enabled := PluginEnabledFn(name, enabledCheckStream); !enabled {
		return nil, true, nil
	}

	instance, err := factory(factoryStream)
	return instance, true, err
}
3.1 初始化
那么 ps.registry 是在哪里初始化的呢?
//cmd/kube-apiserver/app/server.go
// NewAPIServerCommand creates a *cobra.Command object with default parameters
// (abridged excerpt; "..." marks omitted code).
func NewAPIServerCommand() *cobra.Command {
// The run options created here include the admission options.
s := options.NewServerRunOptions()
...
}
// NewServerRunOptions creates a new ServerRunOptions object with default parameters
// (abridged excerpt; "..." marks omitted code, including the return).
func NewServerRunOptions() *ServerRunOptions {
s := ServerRunOptions{
GenericServerRunOptions: genericoptions.NewServerRunOptions(),
Etcd: genericoptions.NewEtcdOptions(storagebackend.NewDefaultConfig(kubeoptions.DefaultEtcdPathPrefix, nil)),
SecureServing: kubeoptions.NewSecureServingOptions(),
Audit: genericoptions.NewAuditOptions(),
Features: genericoptions.NewFeatureOptions(),
//new Admission options
Admission: kubeoptions.NewAdmissionOptions(),
}
...
}
// NewAdmissionOptions creates the generic admission options, registers all
// admission plugins into their plugin registry, and returns them wrapped in
// an AdmissionOptions.
func NewAdmissionOptions() *AdmissionOptions {
	options := genericoptions.NewAdmissionOptions()
	// register all admission plugins
	RegisterAllAdmissionPlugins(options.Plugins)
	// NOTE(review): the upstream function presumably also sets the recommended
	// plugin order and the default-off plugins on options before returning;
	// that part is not shown in this excerpt — confirm against the Kubernetes source.
	return &AdmissionOptions{
		GenericAdmission: options,
	}
}
// RegisterAllAdmissionPlugins registers admission plugins into plugins.
// The body is omitted in this excerpt.
func RegisterAllAdmissionPlugins(plugins *admission.Plugins) {
// Many plugins are registered here.
}
// AllOrderedPlugins lists the names of all admission plugins, in order.
// Per the note near the end of the list, new plugins should generally be
// inserted above the webhook, resourcequota and deny plugins.
var AllOrderedPlugins = []string{
admit.PluginName, // AlwaysAdmit
autoprovision.PluginName, // NamespaceAutoProvision
lifecycle.PluginName, // NamespaceLifecycle
exists.PluginName, // NamespaceExists
scdeny.PluginName, // SecurityContextDeny
antiaffinity.PluginName, // LimitPodHardAntiAffinityTopology
limitranger.PluginName, // LimitRanger
serviceaccount.PluginName, // ServiceAccount
noderestriction.PluginName, // NodeRestriction
nodetaint.PluginName, // TaintNodesByCondition
alwayspullimages.PluginName, // AlwaysPullImages
imagepolicy.PluginName, // ImagePolicyWebhook
podsecurity.PluginName, // PodSecurity - before PodSecurityPolicy so audit/warn get exercised even if PodSecurityPolicy denies
podsecuritypolicy.PluginName, // PodSecurityPolicy
podnodeselector.PluginName, // PodNodeSelector
podpriority.PluginName, // Priority
defaulttolerationseconds.PluginName, // DefaultTolerationSeconds
podtolerationrestriction.PluginName, // PodTolerationRestriction
eventratelimit.PluginName, // EventRateLimit
extendedresourcetoleration.PluginName, // ExtendedResourceToleration
label.PluginName, // PersistentVolumeLabel
setdefault.PluginName, // DefaultStorageClass
storageobjectinuseprotection.PluginName, // StorageObjectInUseProtection
gc.PluginName, // OwnerReferencesPermissionEnforcement
resize.PluginName, // PersistentVolumeClaimResize
runtimeclass.PluginName, // RuntimeClass
certapproval.PluginName, // CertificateApproval
certsigning.PluginName, // CertificateSigning
certsubjectrestriction.PluginName, // CertificateSubjectRestriction
defaultingressclass.PluginName, // DefaultIngressClass
denyserviceexternalips.PluginName, // DenyServiceExternalIPs
// new admission plugins should generally be inserted above here
// webhook, resourcequota, and deny plugins must go at the end
mutatingwebhook.PluginName, // MutatingAdmissionWebhook
validatingwebhook.PluginName, // ValidatingAdmissionWebhook
resourcequota.PluginName, // ResourceQuota
deny.PluginName, // AlwaysDeny
}
4. handler
前面我们讲过,代码最终是通过 installer.Install 将 handler 和 web 服务进行绑定,从而实现访问 REST API 时可以请求到对应的 handler。
其中有个 registerResourceHandlers 方法,handler 的注册就是通过它完成的。
// registerResourceHandlers is a long method; its body is omitted here ("...").
// It takes the URL path, the storage object for the resource, and the
// WebService that receives the routes.
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
...
}
这个方法有三个入参:
path:代表 URL 的 path
storage:资源存储相关的类
ws:用于存放路由的 go-restful WebService 对象
以pod举例:
// Pod is a namespace-scoped resource.
// Fragment of the route-registration switch: pick the create handler for POST.
case "POST": // Create a resource.
var handler restful.RouteFunction
// Use the named-create variant when isNamedCreater is set.
if isNamedCreater {
handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
} else {
handler = restfulCreateResource(creater, reqScope, admit)
}
// restfulCreateNamedResource adapts handlers.CreateNamedResource to a
// go-restful route function: on every request it builds the create handler
// and invokes it with the underlying ResponseWriter and *http.Request.
func restfulCreateNamedResource(r rest.NamedCreater, scope handlers.RequestScope, admit admission.Interface) restful.RouteFunction {
	return func(req *restful.Request, res *restful.Response) {
		create := handlers.CreateNamedResource(r, &scope, admit)
		create(res.ResponseWriter, req.Request)
	}
}
// CreateNamedResource returns a function that will handle a resource creation with name.
// It delegates to createHandler with name handling switched on.
func CreateNamedResource(r rest.NamedCreater, scope *RequestScope, admission admission.Interface) http.HandlerFunc {
	const includeName = true
	return createHandler(r, scope, admission, includeName)
}
// createHandler returns the HTTP handler that creates a resource through r.
//
// NOTE: this is a heavily abridged excerpt. Several identifiers it uses
// (s, body, defaultGVK, original, ctx, options, admissionAttributes) are not
// defined in the visible code, and most error checks are omitted.
func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
// Extract the resource's namespace, name and group/version from the request.
namespace, name, err := scope.Namer.Name(req)
gv := scope.Kind.GroupVersion()
// Obtain the deserializer.
decodeSerializer := s.Serializer
decoder := scope.Serializer.DecoderToVersion(decodeSerializer, scope.HubGroupVersion)
// Decode the request body into a runtime.Object.
obj, gvk, err := decoder.Decode(body, &defaultGVK, original)
// Call the storage's Create, passing the validating admission as the
// validation function, in preparation for persisting to etcd.
requestFunc := func() (runtime.Object, error) {
return r.Create(
ctx,
name,
obj,
rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope),
options,
)
}
result, err := finisher.FinishRequest(ctx, func() (runtime.Object, error) {
// With a field manager configured, update managed fields against a fresh
// object of this kind and wrap admit with the managed-fields validating controller.
if scope.FieldManager != nil {
liveObj, err := scope.Creater.New(scope.Kind)
if err != nil {
return nil, fmt.Errorf("failed to create new object (Create for %v): %v", scope.Kind, err)
}
obj = scope.FieldManager.UpdateNoErrors(liveObj, obj, managerOrUserAgent(options.FieldManager, req.UserAgent()))
admit = fieldmanager.NewManagedFieldsValidatingAdmissionController(admit)
}
// Run the mutating admission controllers.
if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && mutatingAdmission.Handles(admission.Create) {
if err := mutatingAdmission.Admit(ctx, admissionAttributes, scope); err != nil {
return nil, err
}
}
// Dedup owner references again after mutating admission happens
dedupOwnerReferencesAndAddWarning(obj, req.Context(), true)
result, err := requestFunc()
// If the object wasn't committed to storage because it's serialized size was too large,
// it is safe to remove managedFields (which can be large) and try again.
if isTooLargeError(err) {
if accessor, accessorErr := meta.Accessor(obj); accessorErr == nil {
accessor.SetManagedFields(nil)
result, err = requestFunc()
}
}
return result, err
})
}
}
5. 持久化到ETCD
通过r.Create 我们最终能找到这个接口:
// Creater is the storage-side interface for creating a resource: New supplies
// an empty object and Create persists a populated one.
type Creater interface {
// New returns an empty object that can be used with Create after request data has been put into it.
// This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)
New() runtime.Object
// Create creates a new version of a resource.
Create(ctx context.Context, obj runtime.Object, createValidation ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error)
}
看其实现,我们可以找到比如pod的存储实现:
//pkg/registry/core/pod/storage/storage.go
// REST implements a RESTStorage for pods
type REST struct {
// Embedded generic store; its methods are promoted onto REST.
*genericregistry.Store
proxyTransport http.RoundTripper
}
// Create validates obj and persists it through e.Storage, returning the
// stored object on success.
//
// The steps are: the optional BeginCreate hook, rest.BeforeCreate, the
// caller's createValidation, name/key/TTL computation, the storage write,
// then the finish function and the optional AfterCreate and Decorator hooks.
// On any failure (nil, err) is returned.
func (e *Store) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
var finishCreate FinishFunc = finishNothing
if e.BeginCreate != nil {
fn, err := e.BeginCreate(ctx, obj, options)
if err != nil {
return nil, err
}
finishCreate = fn
// On early return, report failure to the finish function. The success path
// below swaps finishCreate for finishNothing so it is not called twice.
defer func() {
finishCreate(ctx, false)
}()
}
if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
return nil, err
}
// at this point we have a fully formed object. It is time to call the validators that the apiserver
// handling chain wants to enforce.
if createValidation != nil {
// Run the validating admission controllers against a deep copy of the object.
if err := createValidation(ctx, obj.DeepCopyObject()); err != nil {
return nil, err
}
}
// Derive the object name and storage key used for the later write to etcd.
name, err := e.ObjectNameFunc(obj)
if err != nil {
return nil, err
}
key, err := e.KeyFunc(ctx, name)
if err != nil {
return nil, err
}
qualifiedResource := e.qualifiedResourceFromContext(ctx)
ttl, err := e.calculateTTL(obj, 0, false)
if err != nil {
return nil, err
}
// Allocate a new empty object to hold the result returned on success.
out := e.NewFunc()
// Call the storage layer's Create to persist to etcd.
// On success, out is filled in with the persisted object.
if err := e.Storage.Create(ctx, key, obj, out, ttl, dryrun.IsDryRun(options.DryRun)); err != nil {
err = storeerr.InterpretCreateError(err, qualifiedResource, name)
err = rest.CheckGeneratedNameError(ctx, e.CreateStrategy, err, obj)
if !apierrors.IsAlreadyExists(err) {
return nil, err
}
// The object already exists: fetch it to see whether it is being deleted,
// and if so say that in the error message. The original error is returned
// even when this lookup fails.
if errGet := e.Storage.Get(ctx, key, storage.GetOptions{}, out); errGet != nil {
return nil, err
}
accessor, errGetAcc := meta.Accessor(out)
if errGetAcc != nil {
return nil, err
}
if accessor.GetDeletionTimestamp() != nil {
// NOTE(review): single-value type assertion; it panics if err is not a *apierrors.StatusError.
msg := &err.(*apierrors.StatusError).ErrStatus.Message
*msg = fmt.Sprintf("object is being deleted: %s", *msg)
}
return nil, err
}
// The operation has succeeded. Call the finish function if there is one,
// and then make sure the defer doesn't call it again.
fn := finishCreate
finishCreate = finishNothing
fn(ctx, true)
if e.AfterCreate != nil {
e.AfterCreate(out, options)
}
if e.Decorator != nil {
e.Decorator(out)
}
return out, nil
}
// Create implements storage.Interface.Create.
//
// It encodes obj, transforms the bytes for storage, and writes them under key
// in a transaction that only succeeds if the key does not exist yet. A
// key-exists error is returned when the transaction does not succeed. When
// out is non-nil it is populated by decoding the encoded data together with
// the revision of the put response.
func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
	preparedKey, err := s.prepareKey(key)
	if err != nil {
		return err
	}

	// Objects being created must not already carry a resource version.
	if rv, rvErr := s.versioner.ObjectResourceVersion(obj); rvErr == nil && rv != 0 {
		return errors.New("resourceVersion should not be set on objects to be created")
	}
	if prepErr := s.versioner.PrepareObjectForStorage(obj); prepErr != nil {
		return fmt.Errorf("PrepareObjectForStorage failed: %v", prepErr)
	}

	encoded, err := runtime.Encode(s.codec, obj)
	if err != nil {
		return err
	}
	putOpts, err := s.ttlOpts(ctx, int64(ttl))
	if err != nil {
		return err
	}

	// Convert the encoded object into the form that is written to storage.
	stored, err := s.transformer.TransformToStorage(ctx, encoded, authenticatedDataString(preparedKey))
	if err != nil {
		return storage.NewInternalError(err.Error())
	}

	began := time.Now()
	// The put is guarded by a "key not found" condition, so it only happens
	// when the resource does not exist yet.
	txn := s.client.KV.Txn(ctx)
	txn = txn.If(notFound(preparedKey))
	txn = txn.Then(clientv3.OpPut(preparedKey, string(stored), putOpts...))
	txnResp, err := txn.Commit()
	metrics.RecordEtcdRequestLatency("create", getTypeName(obj), began)
	if err != nil {
		return err
	}
	if !txnResp.Succeeded {
		return storage.NewKeyExistsError(preparedKey, 0)
	}

	if out == nil {
		return nil
	}
	// Decode the result into out, stamped with the revision of the put.
	putResp := txnResp.Responses[0].GetResponsePut()
	return decode(s.codec, s.versioner, encoded, out, putResp.Header.Revision)
}