Skip to content

Commit

Permalink
Merge pull request kubernetes#92786 from answer1991/feature/enhance-b…
Browse files Browse the repository at this point in the history
…ootstrap-certificate

make Kubelet bootstrap certificate signal aware
  • Loading branch information
k8s-ci-robot authored Jul 6, 2020
2 parents 205d5c5 + db71941 commit a26e588
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 25 deletions.
22 changes: 11 additions & 11 deletions cmd/kubelet/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,12 +256,12 @@ HTTP server: The kubelet can also listen for HTTP and respond to a simple API
// add the kubelet config controller to kubeletDeps
kubeletDeps.KubeletConfigController = kubeletConfigController

// set up stopCh here in order to be reused by kubelet and docker shim
stopCh := genericapiserver.SetupSignalHandler()
// set up signal context here in order to be reused by kubelet and docker shim
ctx := genericapiserver.SetupSignalContext()

// run the kubelet
klog.V(5).Infof("KubeletConfiguration: %#v", kubeletServer.KubeletConfiguration)
if err := Run(kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate, stopCh); err != nil {
if err := Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate); err != nil {
klog.Fatal(err)
}
},
Expand Down Expand Up @@ -403,7 +403,7 @@ func UnsecuredDependencies(s *options.KubeletServer, featureGate featuregate.Fea
// The kubeDeps argument may be nil - if so, it is initialized from the settings on KubeletServer.
// Otherwise, the caller is assumed to have set up the Dependencies object and a default one will
// not be generated.
func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate, stopCh <-chan struct{}) error {
func Run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) error {
logOption := logs.NewOptions()
logOption.LogFormat = s.Logging.Format
logOption.Apply()
Expand All @@ -412,7 +412,7 @@ func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate f
if err := initForOS(s.KubeletFlags.WindowsService); err != nil {
return fmt.Errorf("failed OS init: %v", err)
}
if err := run(s, kubeDeps, featureGate, stopCh); err != nil {
if err := run(ctx, s, kubeDeps, featureGate); err != nil {
return fmt.Errorf("failed to run Kubelet: %v", err)
}
return nil
Expand Down Expand Up @@ -469,7 +469,7 @@ func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName)
}
}

func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate, stopCh <-chan struct{}) (err error) {
func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
// Set global feature gates based on the value on the initial KubeletServer
err = utilfeature.DefaultMutableFeatureGate.SetFromMap(s.KubeletConfiguration.FeatureGates)
if err != nil {
Expand Down Expand Up @@ -552,7 +552,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate f
klog.Warningf("standalone mode, no API client")

case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
clientConfig, closeAllConns, err := buildKubeletClientConfig(s, nodeName)
clientConfig, closeAllConns, err := buildKubeletClientConfig(ctx, s, nodeName)
if err != nil {
return err
}
Expand Down Expand Up @@ -597,7 +597,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate f
return err
}
kubeDeps.Auth = auth
runAuthenticatorCAReload(stopCh)
runAuthenticatorCAReload(ctx.Done())
}

var cgroupRoots []string
Expand Down Expand Up @@ -799,7 +799,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate f
select {
case <-done:
break
case <-stopCh:
case <-ctx.Done():
break
}

Expand All @@ -808,7 +808,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate f

// buildKubeletClientConfig constructs the appropriate client config for the kubelet depending on whether
// bootstrapping is enabled or client certificate rotation is enabled.
func buildKubeletClientConfig(s *options.KubeletServer, nodeName types.NodeName) (*restclient.Config, func(), error) {
func buildKubeletClientConfig(ctx context.Context, s *options.KubeletServer, nodeName types.NodeName) (*restclient.Config, func(), error) {
if s.RotateCertificates {
// Rules for client rotation and the handling of kube config files:
//
Expand Down Expand Up @@ -878,7 +878,7 @@ func buildKubeletClientConfig(s *options.KubeletServer, nodeName types.NodeName)
}

if len(s.BootstrapKubeconfig) > 0 {
if err := bootstrap.LoadClientCert(s.KubeConfig, s.BootstrapKubeconfig, s.CertDirectory, nodeName); err != nil {
if err := bootstrap.LoadClientCert(ctx, s.KubeConfig, s.BootstrapKubeconfig, s.CertDirectory, nodeName); err != nil {
return nil, nil, err
}
}
Expand Down
16 changes: 8 additions & 8 deletions pkg/kubelet/certificate/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func LoadClientConfig(kubeconfigPath, bootstrapPath, certDir string) (certConfig
// The kubeconfig at bootstrapPath is used to request a client certificate from the API server.
// On success, a kubeconfig file referencing the generated key and obtained certificate is written to kubeconfigPath.
// The certificate and key file are stored in certDir.
func LoadClientCert(kubeconfigPath, bootstrapPath, certDir string, nodeName types.NodeName) error {
func LoadClientCert(ctx context.Context, kubeconfigPath, bootstrapPath, certDir string, nodeName types.NodeName) error {
// Short-circuit if the kubeconfig file exists and is valid.
ok, err := isClientConfigStillValid(kubeconfigPath)
if err != nil {
Expand Down Expand Up @@ -156,11 +156,11 @@ func LoadClientCert(kubeconfigPath, bootstrapPath, certDir string, nodeName type
}
}

if err := waitForServer(*bootstrapClientConfig, 1*time.Minute); err != nil {
if err := waitForServer(ctx, *bootstrapClientConfig, 1*time.Minute); err != nil {
klog.Warningf("Error waiting for apiserver to come up: %v", err)
}

certData, err := requestNodeCertificate(bootstrapClient, keyData, nodeName)
certData, err := requestNodeCertificate(ctx, bootstrapClient, keyData, nodeName)
if err != nil {
return err
}
Expand Down Expand Up @@ -278,20 +278,20 @@ func verifyKeyData(data []byte) bool {
return err == nil
}

func waitForServer(cfg restclient.Config, deadline time.Duration) error {
func waitForServer(ctx context.Context, cfg restclient.Config, deadline time.Duration) error {
cfg.NegotiatedSerializer = scheme.Codecs.WithoutConversion()
cfg.Timeout = 1 * time.Second
cli, err := restclient.UnversionedRESTClientFor(&cfg)
if err != nil {
return fmt.Errorf("couldn't create client: %v", err)
}

ctx, cancel := context.WithTimeout(context.TODO(), deadline)
ctx, cancel := context.WithTimeout(ctx, deadline)
defer cancel()

var connected bool
wait.JitterUntil(func() {
if _, err := cli.Get().AbsPath("/healthz").Do(context.TODO()).Raw(); err != nil {
if _, err := cli.Get().AbsPath("/healthz").Do(ctx).Raw(); err != nil {
klog.Infof("Failed to connect to apiserver: %v", err)
return
}
Expand All @@ -312,7 +312,7 @@ func waitForServer(cfg restclient.Config, deadline time.Duration) error {
// certificate (pem-encoded). If there is any errors, or the watch timeouts, it
// will return an error. This is intended for use on nodes (kubelet and
// kubeadm).
func requestNodeCertificate(client clientset.Interface, privateKeyData []byte, nodeName types.NodeName) (certData []byte, err error) {
func requestNodeCertificate(ctx context.Context, client clientset.Interface, privateKeyData []byte, nodeName types.NodeName) (certData []byte, err error) {
subject := &pkix.Name{
Organization: []string{"system:nodes"},
CommonName: "system:node:" + string(nodeName),
Expand Down Expand Up @@ -349,7 +349,7 @@ func requestNodeCertificate(client clientset.Interface, privateKeyData []byte, n
return nil, err
}

ctx, cancel := context.WithTimeout(context.Background(), 3600*time.Second)
ctx, cancel := context.WithTimeout(ctx, 3600*time.Second)
defer cancel()

klog.V(2).Infof("Waiting for client certificate to be issued")
Expand Down
7 changes: 4 additions & 3 deletions pkg/kubelet/certificate/bootstrap/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package bootstrap

import (
"context"
"fmt"
"io/ioutil"
"os"
Expand Down Expand Up @@ -95,7 +96,7 @@ users:
}

func TestRequestNodeCertificateNoKeyData(t *testing.T) {
certData, err := requestNodeCertificate(newClientset(fakeClient{}), []byte{}, "fake-node-name")
certData, err := requestNodeCertificate(context.TODO(), newClientset(fakeClient{}), []byte{}, "fake-node-name")
if err == nil {
t.Errorf("Got no error, wanted error an error because there was an empty private key passed in.")
}
Expand All @@ -113,7 +114,7 @@ func TestRequestNodeCertificateErrorCreatingCSR(t *testing.T) {
t.Fatalf("Unable to generate a new private key: %v", err)
}

certData, err := requestNodeCertificate(client, privateKeyData, "fake-node-name")
certData, err := requestNodeCertificate(context.TODO(), client, privateKeyData, "fake-node-name")
if err == nil {
t.Errorf("Got no error, wanted error an error because client.Create failed.")
}
Expand All @@ -128,7 +129,7 @@ func TestRequestNodeCertificate(t *testing.T) {
t.Fatalf("Unable to generate a new private key: %v", err)
}

certData, err := requestNodeCertificate(newClientset(fakeClient{}), privateKeyData, "fake-node-name")
certData, err := requestNodeCertificate(context.TODO(), newClientset(fakeClient{}), privateKeyData, "fake-node-name")
if err != nil {
t.Errorf("Got %v, wanted no error.", err)
}
Expand Down
16 changes: 13 additions & 3 deletions staging/src/k8s.io/apiserver/pkg/server/signal.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package server

import (
"context"
"os"
"os/signal"
)
Expand All @@ -27,21 +28,30 @@ var shutdownHandler chan os.Signal
// SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is returned
// which is closed on one of these signals. If a second signal is caught, the program
// is terminated with exit code 1.
// Only one of SetupSignalContext and SetupSignalHandler should be called, and only can
// be called once.
func SetupSignalHandler() <-chan struct{} {
return SetupSignalContext().Done()
}

// SetupSignalContext is same as SetupSignalHandler, but a context.Context is returned.
// Only one of SetupSignalContext and SetupSignalHandler should be called, and only can
// be called once.
func SetupSignalContext() context.Context {
close(onlyOneSignalHandler) // panics when called twice

shutdownHandler = make(chan os.Signal, 2)

stop := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
signal.Notify(shutdownHandler, shutdownSignals...)
go func() {
<-shutdownHandler
close(stop)
cancel()
<-shutdownHandler
os.Exit(1) // second signal. Exit directly.
}()

return stop
return ctx
}

// RequestShutdown emulates a received event that is considered as shutdown signal (SIGTERM/SIGINT)
Expand Down

0 comments on commit a26e588

Please sign in to comment.