-
Notifications
You must be signed in to change notification settings - Fork 10
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add first wip of api-server to call checkpoint/restore/migration from…
… API
- Loading branch information
Showing
12 changed files
with
550 additions
and
28 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
package main | ||
|
||
import ( | ||
"os" | ||
|
||
apiserver "github.com/SSU-DCN/podmigration-operator/api-server" | ||
podmigrationv1 "github.com/SSU-DCN/podmigration-operator/api/v1" | ||
|
||
"k8s.io/apimachinery/pkg/runtime" | ||
clientgoscheme "k8s.io/client-go/kubernetes/scheme" | ||
ctrl "sigs.k8s.io/controller-runtime" | ||
kubelog "sigs.k8s.io/controller-runtime/pkg/log" | ||
"sigs.k8s.io/controller-runtime/pkg/log/zap" | ||
"sigs.k8s.io/controller-runtime/pkg/runtime/signals" | ||
) | ||
|
||
var ( | ||
runLog = kubelog.Log.WithName("podmigration-cp").WithName("run") | ||
scheme = runtime.NewScheme() | ||
) | ||
|
||
func init() { | ||
// Initialize the scheme so that kubernetes dynamic client knows | ||
// how to work with new CRD and native kubernetes types | ||
_ = clientgoscheme.AddToScheme(scheme) | ||
_ = podmigrationv1.AddToScheme(scheme) | ||
} | ||
|
||
func main() { | ||
kubelog.SetLogger(zap.New(zap.UseDevMode(true))) | ||
|
||
mgr, err := apiserver.NewManager(ctrl.GetConfigOrDie(), apiserver.Options{ | ||
Scheme: scheme, | ||
Port: 5000, | ||
AllowedDomains: []string{}, | ||
}) | ||
if err != nil { | ||
runLog.Error(err, "unable to create api-server manager") | ||
os.Exit(1) | ||
} | ||
|
||
runLog.Info("starting api-server manager") | ||
if err := mgr.Start(signals.SetupSignalHandler()); err != nil { | ||
runLog.Error(err, "problem running api-server manager") | ||
os.Exit(1) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
package endpoints | ||
|
||
import ( | ||
v1 "github.com/SSU-DCN/podmigration-operator/api/v1" | ||
) | ||
|
||
var From = &from{} | ||
|
||
type from struct{} | ||
|
||
func (c *from) Object(pm *v1.Podmigration) *Podmigration { | ||
return &Podmigration{ | ||
Name: pm.Name, | ||
Replicas: pm.Spec.Replicas, | ||
Selector: pm.Spec.Selector, | ||
// Template: pm.Spec.Template, | ||
Status: &pm.Status, | ||
} | ||
} | ||
|
||
func (c *from) List(list *v1.PodmigrationList) *List { | ||
items := make([]Podmigration, len(list.Items)) | ||
for i, r := range list.Items { | ||
items[i] = *c.Object(&r) | ||
} | ||
return &List{ | ||
Items: items, | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
package endpoints | ||
|
||
import ( | ||
"fmt" | ||
|
||
v1 "github.com/SSU-DCN/podmigration-operator/api/v1" | ||
"github.com/emicklei/go-restful" | ||
corev1 "k8s.io/api/core/v1" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
kubelog "sigs.k8s.io/controller-runtime/pkg/log" | ||
) | ||
|
||
type PodmigrationEndpoint struct { | ||
client client.Client | ||
} | ||
|
||
func NewPodmigrationEndpoint(client client.Client) *PodmigrationEndpoint { | ||
return &PodmigrationEndpoint{client: client} | ||
} | ||
|
||
func (pe *PodmigrationEndpoint) SetupWithWS(ws *restful.WebService) { | ||
ws.Route(ws.GET("Podmigrations").To(pe.list). | ||
Doc("List of Podmigrations"). | ||
Returns(200, "OK", &List{})) | ||
|
||
ws.Route(ws.POST("Podmigrations").To(pe.create). | ||
Doc("Create a new Podmigration"). | ||
Reads(&Podmigration{}). | ||
Returns(200, "OK", &Podmigration{}). | ||
Returns(400, "Bad Request", nil)) | ||
} | ||
|
||
func (pe *PodmigrationEndpoint) list(request *restful.Request, response *restful.Response) { | ||
dl := new(v1.PodmigrationList) | ||
err := pe.client.List(request.Request.Context(), dl, &client.ListOptions{}) | ||
if err != nil { | ||
writeError(response, 404, Error{ | ||
Title: "Error", | ||
Details: fmt.Sprintf("Could not retrieve list: %s", err), | ||
}) | ||
} else { | ||
l := From.List(dl) | ||
if err := response.WriteAsJson(l); err != nil { | ||
writeError(response, 404, Error{ | ||
Title: "Error", | ||
Details: "Could not list resources", | ||
}) | ||
} | ||
} | ||
} | ||
|
||
func (pe *PodmigrationEndpoint) create(request *restful.Request, response *restful.Response) { | ||
pm := new(Podmigration) | ||
err := request.ReadEntity(pm) | ||
|
||
if err != nil { | ||
writeError(response, 400, Error{ | ||
Title: "Bad Request", | ||
Details: "Could not read entity", | ||
}) | ||
return | ||
} | ||
|
||
if err := pm.Validate(); err != nil { | ||
writeError(response, 400, Error{ | ||
Title: "Validation error", | ||
Details: err.Error(), | ||
}) | ||
return | ||
} | ||
|
||
//TODO(tuong): convert request data-raw to this Template. | ||
template := corev1.PodTemplateSpec{ | ||
ObjectMeta: metav1.ObjectMeta{ | ||
Labels: map[string]string{"app": "redis"}, | ||
Annotations: map[string]string{"snapshotPolicy": "restore", "snapshotPath": "/var/lib/kubelet/migration/" + pm.Name}, | ||
}, | ||
Spec: corev1.PodSpec{ | ||
Containers: []corev1.Container{ | ||
{ | ||
Name: "redis", | ||
Image: "redis", | ||
Ports: []corev1.ContainerPort{ | ||
{ContainerPort: 6379, Protocol: "TCP"}, | ||
}, | ||
}, | ||
}, | ||
}, | ||
} | ||
|
||
obj := &v1.Podmigration{ | ||
ObjectMeta: metav1.ObjectMeta{Name: pm.Name, Namespace: "default"}, | ||
Spec: v1.PodmigrationSpec{Replicas: pm.Replicas, Selector: pm.Selector, Template: template}, | ||
} | ||
|
||
err = pe.client.Create(request.Request.Context(), obj, &client.CreateOptions{}) | ||
if err != nil { | ||
writeError(response, 400, Error{ | ||
Title: "Error", | ||
Details: fmt.Sprintf("Could not create object: %s", err), | ||
}) | ||
} else { | ||
d := From.Object(obj) | ||
if err := response.WriteAsJson(d); err != nil { | ||
writeError(response, 422, Error{ | ||
Title: "Error", | ||
Details: "Could not write response", | ||
}) | ||
} | ||
} | ||
} | ||
|
||
func writeError(response *restful.Response, httpStatus int, err Error) { | ||
if err := response.WriteHeaderAndJson(httpStatus, err, "application/json"); err != nil { | ||
kubelog.Log.Error(err, "Could not write the error response") | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
package endpoints | ||
|
||
import ( | ||
"errors" | ||
// "fmt" | ||
// "strings" | ||
|
||
v1 "github.com/SSU-DCN/podmigration-operator/api/v1" | ||
"github.com/emicklei/go-restful" | ||
|
||
// corev1 "k8s.io/api/core/v1" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
) | ||
|
||
type Endpoint interface { | ||
SetupWithWS(ws *restful.WebService) | ||
} | ||
|
||
type Podmigration struct { | ||
Name string `json:"name"` | ||
// Replicas string `json:"replicas"` | ||
// Selector string `json:"selector"` | ||
Replicas *int32 `json:"replicas"` | ||
Selector *metav1.LabelSelector `json:"selector"` | ||
// Template corev1.PodTemplateSpec `json:"template"` | ||
Status *v1.PodmigrationStatus `json:"status,omitempty"` | ||
} | ||
|
||
func (pm *Podmigration) Validate() error { | ||
var validated bool | ||
validated = true | ||
//TODO(Tuong): check template is valid or not | ||
// if pm.Template == checkTemplate { | ||
// return error.New("template can't be empty") | ||
// } else { | ||
// validated = true | ||
// } | ||
if validated { | ||
return nil | ||
} | ||
return errors.New("source type validation was not performed, type can only be [WebFolder,S3]") | ||
} | ||
|
||
type List struct { | ||
Items []Podmigration `json:"items"` | ||
} | ||
|
||
type Error struct { | ||
Title string `json:"title"` | ||
Details string `json:"details"` | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
package apiserver | ||
|
||
/* | ||
Now we will create an API Server Manager that will create the K8S client and keep a reference to it. | ||
It will also create a cache that will be used to create a cached K8S client, | ||
initialize the cache properly and in the end handle the termination signals. | ||
*/ | ||
|
||
import ( | ||
"time" | ||
|
||
"k8s.io/apimachinery/pkg/runtime" | ||
"k8s.io/client-go/rest" | ||
"sigs.k8s.io/controller-runtime/pkg/cache" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
"sigs.k8s.io/controller-runtime/pkg/client/apiutil" | ||
) | ||
|
||
var ( | ||
defaultRetryPeriod = 2 * time.Second | ||
) | ||
|
||
// Options to customize Manager behaviour and pass information | ||
type Options struct { | ||
Scheme *runtime.Scheme | ||
Namespace string | ||
Port int | ||
AllowedDomains []string | ||
} | ||
|
||
type Manager interface { | ||
Start(stop <-chan struct{}) error | ||
} | ||
|
||
type manager struct { | ||
config *rest.Config | ||
client client.Client | ||
server *apiServer | ||
started bool | ||
internalStop <-chan struct{} | ||
internalStopper chan<- struct{} | ||
cache cache.Cache | ||
errSignal *errSignaler | ||
port int | ||
allowedDomains []string | ||
} | ||
|
||
func NewManager(config *rest.Config, options Options) (Manager, error) { | ||
mapper, err := apiutil.NewDynamicRESTMapper(config) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
cc, err := cache.New(config, cache.Options{ | ||
Scheme: options.Scheme, | ||
Mapper: mapper, | ||
Resync: &defaultRetryPeriod, | ||
Namespace: options.Namespace, | ||
}) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
c, err := client.New(config, client.Options{Scheme: options.Scheme, Mapper: mapper}) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
stop := make(chan struct{}) | ||
return &manager{ | ||
config: config, | ||
cache: cc, | ||
client: &client.DelegatingClient{ | ||
Reader: &client.DelegatingReader{ | ||
CacheReader: cc, | ||
ClientReader: c, | ||
}, | ||
Writer: c, | ||
StatusClient: c, | ||
}, | ||
internalStop: stop, | ||
internalStopper: stop, | ||
port: options.Port, | ||
allowedDomains: options.AllowedDomains, | ||
}, nil | ||
} | ||
|
||
func (m *manager) Start(stop <-chan struct{}) error { | ||
defer close(m.internalStopper) | ||
// initialize this here so that we reset the signal channel state on every start | ||
m.errSignal = &errSignaler{errSignal: make(chan struct{})} | ||
m.waitForCache() | ||
|
||
srv, err := newApiServer(m.port, m.allowedDomains, m.client) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
go func() { | ||
if err := srv.Start(m.internalStop); err != nil { | ||
m.errSignal.SignalError(err) | ||
} | ||
}() | ||
select { | ||
case <-stop: | ||
return nil | ||
case <-m.errSignal.GotError(): | ||
// Error starting the cache | ||
return m.errSignal.Error() | ||
} | ||
} | ||
|
||
func (m *manager) waitForCache() { | ||
if m.started { | ||
return | ||
} | ||
|
||
go func() { | ||
if err := m.cache.Start(m.internalStop); err != nil { | ||
m.errSignal.SignalError(err) | ||
} | ||
}() | ||
|
||
// Wait for the caches to sync. | ||
m.cache.WaitForCacheSync(m.internalStop) | ||
m.started = true | ||
} |
Oops, something went wrong.