Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MESOS: inject certain MESOS_ envvars into all container env #20845

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 54 additions & 41 deletions contrib/mesos/pkg/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ type Executor struct {
kubeletFinished <-chan struct{} // signals that kubelet Run() died
exitFunc func(int)
staticPodsConfigPath string
staticPodsFilters podutil.Filters
launchGracePeriod time.Duration
nodeInfos chan<- NodeInfo
initCompleted chan struct{} // closes upon completion of Init()
Expand All @@ -113,18 +114,21 @@ type Executor struct {
}

type Config struct {
APIClient *clientset.Clientset
Docker dockertools.DockerInterface
ShutdownAlert func()
SuicideTimeout time.Duration
KubeletFinished <-chan struct{} // signals that kubelet Run() died
ExitFunc func(int)
StaticPodsConfigPath string
LaunchGracePeriod time.Duration
NodeInfos chan<- NodeInfo
Registry Registry
APIClient *clientset.Clientset
Docker dockertools.DockerInterface
ShutdownAlert func()
SuicideTimeout time.Duration
KubeletFinished <-chan struct{} // signals that kubelet Run() died
ExitFunc func(int)
LaunchGracePeriod time.Duration
NodeInfos chan<- NodeInfo
Registry Registry
Options []Option // functional options
}

// Option is a functional option type for Executor
type Option func(*Executor)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1


func (k *Executor) isConnected() bool {
return connectedState == (&k.state).get()
}
Expand All @@ -139,22 +143,26 @@ func New(config Config) *Executor {
launchGracePeriod = time.Duration(math.MaxInt64)
}
k := &Executor{
state: disconnectedState,
terminate: make(chan struct{}),
outgoing: make(chan func() (mesos.Status, error), 1024),
dockerClient: config.Docker,
suicideTimeout: config.SuicideTimeout,
kubeletFinished: config.KubeletFinished,
suicideWatch: &suicideTimer{},
shutdownAlert: config.ShutdownAlert,
exitFunc: config.ExitFunc,
staticPodsConfigPath: config.StaticPodsConfigPath,
launchGracePeriod: launchGracePeriod,
nodeInfos: config.NodeInfos,
initCompleted: make(chan struct{}),
registry: config.Registry,
kubeAPI: &clientAPIWrapper{config.APIClient},
nodeAPI: &clientAPIWrapper{config.APIClient},
state: disconnectedState,
terminate: make(chan struct{}),
outgoing: make(chan func() (mesos.Status, error), 1024),
dockerClient: config.Docker,
suicideTimeout: config.SuicideTimeout,
kubeletFinished: config.KubeletFinished,
suicideWatch: &suicideTimer{},
shutdownAlert: config.ShutdownAlert,
exitFunc: config.ExitFunc,
launchGracePeriod: launchGracePeriod,
nodeInfos: config.NodeInfos,
initCompleted: make(chan struct{}),
registry: config.Registry,
kubeAPI: &clientAPIWrapper{config.APIClient},
nodeAPI: &clientAPIWrapper{config.APIClient},
}

// apply functional options
for _, opt := range config.Options {
opt(k)
}

runtime.On(k.initCompleted, k.runSendLoop)
Expand All @@ -165,6 +173,14 @@ func New(config Config) *Executor {
return k
}

// StaticPods creates a static pods Option for an Executor
func StaticPods(configPath string, f podutil.Filters) Option {
return func(k *Executor) {
k.staticPodsFilters = f
k.staticPodsConfigPath = configPath
}
}

// Done returns a chan that closes when the executor is shutting down
func (k *Executor) Done() <-chan struct{} {
return k.terminate
Expand Down Expand Up @@ -226,12 +242,7 @@ func (k *Executor) Registered(
log.Errorf("failed to register/transition to a connected state")
}

if executorInfo != nil && executorInfo.Data != nil {
err := k.initializeStaticPodsSource(slaveInfo.GetHostname(), executorInfo.Data)
if err != nil {
log.Errorf("failed to initialize static pod configuration: %v", err)
}
}
k.initializeStaticPodsSource(executorInfo)

annotations, err := annotationsFor(executorInfo)
if err != nil {
Expand Down Expand Up @@ -296,15 +307,17 @@ func (k *Executor) Reregistered(driver bindings.ExecutorDriver, slaveInfo *mesos
}

// initializeStaticPodsSource unzips the data slice into the static-pods directory
func (k *Executor) initializeStaticPodsSource(hostname string, data []byte) error {
log.V(2).Infof("extracting static pods config to %s", k.staticPodsConfigPath)
// annotate the pod with BindingHostKey so that the scheduler will ignore the pod
// once it appears in the pod registry. the stock kubelet sets the pod host in order
// to accomplish the same; we do this because the k8sm scheduler works differently.
annotator := podutil.Annotator(map[string]string{
meta.BindingHostKey: hostname,
})
return podutil.WriteToDir(annotator.Do(podutil.Gunzip(data)), k.staticPodsConfigPath)
func (k *Executor) initializeStaticPodsSource(executorInfo *mesos.ExecutorInfo) {
if data := executorInfo.GetData(); len(data) > 0 && k.staticPodsConfigPath != "" {
log.V(2).Infof("extracting static pods config to %s", k.staticPodsConfigPath)
err := podutil.WriteToDir(
k.staticPodsFilters.Do(podutil.Gunzip(executorInfo.Data)),
k.staticPodsConfigPath,
)
if err != nil {
log.Errorf("failed to initialize static pod configuration: %v", err)
}
}
}

// Disconnected is called when the executor is disconnected from the slave.
Expand Down
18 changes: 15 additions & 3 deletions contrib/mesos/pkg/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net/http/httptest"
"os"
"path/filepath"
"reflect"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -349,16 +350,27 @@ func TestExecutorInitializeStaticPodsSource(t *testing.T) {
}

// extract the pods into staticPodsConfigPath
hostname := "h1"
err = executor.initializeStaticPodsSource(hostname, gzipped)
assert.NoError(t, err)
executor.initializeStaticPodsSource(&mesosproto.ExecutorInfo{Data: gzipped})

actualpods, errs := podutil.ReadFromDir(staticPodsConfigPath)
reportErrors(errs)

list := podutil.List(actualpods)
assert.NotNil(t, list)
assert.Equal(t, expectedStaticPodsNum, len(list.Items))

var (
expectedNames = map[string]struct{}{
"spod-01": {},
"spod-02": {},
}
actualNames = map[string]struct{}{}
)
for _, pod := range list.Items {
actualNames[pod.Name] = struct{}{}
}
assert.True(t, reflect.DeepEqual(expectedNames, actualNames), "expected %v instead of %v", expectedNames, actualNames)

wg.Wait()
}

Expand Down
126 changes: 0 additions & 126 deletions contrib/mesos/pkg/executor/service/podsource.go

This file was deleted.

Loading