Skip to content

Commit

Permalink
Add first wip of api-server to call checkpoint/restore/migration from…
Browse files Browse the repository at this point in the history
… API
  • Loading branch information
vutuong committed Dec 29, 2020
1 parent 64c1259 commit a25912b
Show file tree
Hide file tree
Showing 12 changed files with 550 additions and 28 deletions.
47 changes: 47 additions & 0 deletions api-server/cmd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package main

import (
"os"

apiserver "github.com/SSU-DCN/podmigration-operator/api-server"
podmigrationv1 "github.com/SSU-DCN/podmigration-operator/api/v1"

"k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
kubelog "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/runtime/signals"
)

var (
runLog = kubelog.Log.WithName("podmigration-cp").WithName("run")
scheme = runtime.NewScheme()
)

func init() {
// Initialize the scheme so that kubernetes dynamic client knows
// how to work with new CRD and native kubernetes types
_ = clientgoscheme.AddToScheme(scheme)
_ = podmigrationv1.AddToScheme(scheme)
}

func main() {
kubelog.SetLogger(zap.New(zap.UseDevMode(true)))

mgr, err := apiserver.NewManager(ctrl.GetConfigOrDie(), apiserver.Options{
Scheme: scheme,
Port: 5000,
AllowedDomains: []string{},
})
if err != nil {
runLog.Error(err, "unable to create api-server manager")
os.Exit(1)
}

runLog.Info("starting api-server manager")
if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
runLog.Error(err, "problem running api-server manager")
os.Exit(1)
}
}
29 changes: 29 additions & 0 deletions api-server/endpoints/converter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package endpoints

import (
v1 "github.com/SSU-DCN/podmigration-operator/api/v1"
)

var From = &from{}

type from struct{}

func (c *from) Object(pm *v1.Podmigration) *Podmigration {
return &Podmigration{
Name: pm.Name,
Replicas: pm.Spec.Replicas,
Selector: pm.Spec.Selector,
// Template: pm.Spec.Template,
Status: &pm.Status,
}
}

func (c *from) List(list *v1.PodmigrationList) *List {
items := make([]Podmigration, len(list.Items))
for i, r := range list.Items {
items[i] = *c.Object(&r)
}
return &List{
Items: items,
}
}
118 changes: 118 additions & 0 deletions api-server/endpoints/podmigration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package endpoints

import (
"fmt"

v1 "github.com/SSU-DCN/podmigration-operator/api/v1"
"github.com/emicklei/go-restful"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
kubelog "sigs.k8s.io/controller-runtime/pkg/log"
)

type PodmigrationEndpoint struct {
client client.Client
}

func NewPodmigrationEndpoint(client client.Client) *PodmigrationEndpoint {
return &PodmigrationEndpoint{client: client}
}

func (pe *PodmigrationEndpoint) SetupWithWS(ws *restful.WebService) {
ws.Route(ws.GET("Podmigrations").To(pe.list).
Doc("List of Podmigrations").
Returns(200, "OK", &List{}))

ws.Route(ws.POST("Podmigrations").To(pe.create).
Doc("Create a new Podmigration").
Reads(&Podmigration{}).
Returns(200, "OK", &Podmigration{}).
Returns(400, "Bad Request", nil))
}

func (pe *PodmigrationEndpoint) list(request *restful.Request, response *restful.Response) {
dl := new(v1.PodmigrationList)
err := pe.client.List(request.Request.Context(), dl, &client.ListOptions{})
if err != nil {
writeError(response, 404, Error{
Title: "Error",
Details: fmt.Sprintf("Could not retrieve list: %s", err),
})
} else {
l := From.List(dl)
if err := response.WriteAsJson(l); err != nil {
writeError(response, 404, Error{
Title: "Error",
Details: "Could not list resources",
})
}
}
}

func (pe *PodmigrationEndpoint) create(request *restful.Request, response *restful.Response) {
pm := new(Podmigration)
err := request.ReadEntity(pm)

if err != nil {
writeError(response, 400, Error{
Title: "Bad Request",
Details: "Could not read entity",
})
return
}

if err := pm.Validate(); err != nil {
writeError(response, 400, Error{
Title: "Validation error",
Details: err.Error(),
})
return
}

//TODO(tuong): convert request data-raw to this Template.
template := corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{"app": "redis"},
Annotations: map[string]string{"snapshotPolicy": "restore", "snapshotPath": "/var/lib/kubelet/migration/" + pm.Name},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "redis",
Image: "redis",
Ports: []corev1.ContainerPort{
{ContainerPort: 6379, Protocol: "TCP"},
},
},
},
},
}

obj := &v1.Podmigration{
ObjectMeta: metav1.ObjectMeta{Name: pm.Name, Namespace: "default"},
Spec: v1.PodmigrationSpec{Replicas: pm.Replicas, Selector: pm.Selector, Template: template},
}

err = pe.client.Create(request.Request.Context(), obj, &client.CreateOptions{})
if err != nil {
writeError(response, 400, Error{
Title: "Error",
Details: fmt.Sprintf("Could not create object: %s", err),
})
} else {
d := From.Object(obj)
if err := response.WriteAsJson(d); err != nil {
writeError(response, 422, Error{
Title: "Error",
Details: "Could not write response",
})
}
}
}

func writeError(response *restful.Response, httpStatus int, err Error) {
if err := response.WriteHeaderAndJson(httpStatus, err, "application/json"); err != nil {
kubelog.Log.Error(err, "Could not write the error response")
}
}
51 changes: 51 additions & 0 deletions api-server/endpoints/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package endpoints

import (
"errors"
// "fmt"
// "strings"

v1 "github.com/SSU-DCN/podmigration-operator/api/v1"
"github.com/emicklei/go-restful"

// corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type Endpoint interface {
SetupWithWS(ws *restful.WebService)
}

type Podmigration struct {
Name string `json:"name"`
// Replicas string `json:"replicas"`
// Selector string `json:"selector"`
Replicas *int32 `json:"replicas"`
Selector *metav1.LabelSelector `json:"selector"`
// Template corev1.PodTemplateSpec `json:"template"`
Status *v1.PodmigrationStatus `json:"status,omitempty"`
}

func (pm *Podmigration) Validate() error {
var validated bool
validated = true
//TODO(Tuong): check template is valid or not
// if pm.Template == checkTemplate {
// return error.New("template can't be empty")
// } else {
// validated = true
// }
if validated {
return nil
}
return errors.New("source type validation was not performed, type can only be [WebFolder,S3]")
}

type List struct {
Items []Podmigration `json:"items"`
}

type Error struct {
Title string `json:"title"`
Details string `json:"details"`
}
127 changes: 127 additions & 0 deletions api-server/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package apiserver

/*
Now we will create an API Server Manager that will create the K8S client and keep a reference to it.
It will also create a cache that will be used to create a cached K8S client,
initialize the cache properly and in the end handle the termination signals.
*/

import (
"time"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)

var (
defaultRetryPeriod = 2 * time.Second
)

// Options to customize Manager behaviour and pass information
type Options struct {
Scheme *runtime.Scheme
Namespace string
Port int
AllowedDomains []string
}

type Manager interface {
Start(stop <-chan struct{}) error
}

type manager struct {
config *rest.Config
client client.Client
server *apiServer
started bool
internalStop <-chan struct{}
internalStopper chan<- struct{}
cache cache.Cache
errSignal *errSignaler
port int
allowedDomains []string
}

func NewManager(config *rest.Config, options Options) (Manager, error) {
mapper, err := apiutil.NewDynamicRESTMapper(config)
if err != nil {
return nil, err
}

cc, err := cache.New(config, cache.Options{
Scheme: options.Scheme,
Mapper: mapper,
Resync: &defaultRetryPeriod,
Namespace: options.Namespace,
})
if err != nil {
return nil, err
}

c, err := client.New(config, client.Options{Scheme: options.Scheme, Mapper: mapper})
if err != nil {
return nil, err
}

stop := make(chan struct{})
return &manager{
config: config,
cache: cc,
client: &client.DelegatingClient{
Reader: &client.DelegatingReader{
CacheReader: cc,
ClientReader: c,
},
Writer: c,
StatusClient: c,
},
internalStop: stop,
internalStopper: stop,
port: options.Port,
allowedDomains: options.AllowedDomains,
}, nil
}

func (m *manager) Start(stop <-chan struct{}) error {
defer close(m.internalStopper)
// initialize this here so that we reset the signal channel state on every start
m.errSignal = &errSignaler{errSignal: make(chan struct{})}
m.waitForCache()

srv, err := newApiServer(m.port, m.allowedDomains, m.client)
if err != nil {
return err
}

go func() {
if err := srv.Start(m.internalStop); err != nil {
m.errSignal.SignalError(err)
}
}()
select {
case <-stop:
return nil
case <-m.errSignal.GotError():
// Error starting the cache
return m.errSignal.Error()
}
}

func (m *manager) waitForCache() {
if m.started {
return
}

go func() {
if err := m.cache.Start(m.internalStop); err != nil {
m.errSignal.SignalError(err)
}
}()

// Wait for the caches to sync.
m.cache.WaitForCacheSync(m.internalStop)
m.started = true
}
Loading

0 comments on commit a25912b

Please sign in to comment.