Skip to content

Commit

Permalink
[orc8r][service_registry] Add cache to service registry service (#4648)
Browse files Browse the repository at this point in the history
Signed-off-by: Hunter Gatewood <hgatewood@gmail.com>
  • Loading branch information
hcgatewood authored and themarwhal committed Feb 2, 2021
1 parent b534333 commit 47625d3
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 102 deletions.
5 changes: 5 additions & 0 deletions orc8r/cloud/configs/service_registry.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
# proxy_aliases refers to proxy config when a service might have more than one
# port. Example in magma/feg/cloud/configs/service_registry.yml

# poll_frequency sets how frequently the service registry service should reach
# out to the kube-apiserver to refresh its cache of tracked services.
# Ref: https://pkg.go.dev/github.com/robfig/cron#hdr-CRON_Expression_Format
poll_frequency: "@every 30s"

services:
analytics:
host: "localhost"
Expand Down
29 changes: 23 additions & 6 deletions orc8r/cloud/go/orc8r/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ package orc8r

const (
ModuleName = "orc8r"
)

// State and entities
const (
NetworkFeaturesConfig = "orc8r_features"
MagmadGatewayType = "magmad_gateway"
AccessGatewayRecordType = "access_gateway_record"
Expand All @@ -29,12 +32,19 @@ const (
UpgradeReleaseChannelEntityType = "upgrade_release_channel"

CallTraceEntityType = "call_trace"
)

// ServiceHostnameEnvVar is the name of an environment variable which is
// required to hold the public IP of the service.
// In dev, this will generally be localhost.
// In prod, this will be the relevant pod's IP.
ServiceHostnameEnvVar = "SERVICE_HOSTNAME"
// K8s
const (
// PartOfLabel and PartOfOrc8rApp are K8s label key and values indicating
// a service is an orc8r application service.
PartOfLabel = "app.kubernetes.io/part-of"
PartOfOrc8rApp = "orc8r-app"

GRPCPortName = "grpc"
HTTPPortName = "http"

AnnotationFieldSeparator = ","

AnalyticsCollectorLabel = "orc8r.io/analytics_collector"
MconfigBuilderLabel = "orc8r.io/mconfig_builder"
Expand All @@ -47,6 +57,13 @@ const (
StateIndexerVersionAnnotation = "orc8r.io/state_indexer_version"
StateIndexerTypesAnnotation = "orc8r.io/state_indexer_types"
StreamProviderStreamsAnnotation = "orc8r.io/stream_provider_streams"
)

AnnotationFieldSeparator = ","
// Environment variables
const (
// ServiceHostnameEnvVar is the name of an environment variable which is
// required to hold the public IP of the service.
// In dev, this will generally be localhost.
// In prod, this will be the relevant pod's IP.
ServiceHostnameEnvVar = "SERVICE_HOSTNAME"
)
18 changes: 4 additions & 14 deletions orc8r/cloud/go/services/service_registry/service_registry/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
const (
defaultK8sQPS = 50
defaultK8sBurst = 50

pollFrequencyConfigKey = "poll_frequency"
)

func main() {
Expand All @@ -50,15 +52,15 @@ func main() {
protos.RegisterServiceRegistryServer(srv.GrpcServer, servicer)
case registry.K8sRegistryMode:
glog.Infof("Registry Mode set to %s. Creating k8s service registry", registry.K8sRegistryMode)
config, err := getK8sClientConfig()
config, err := rest.InClusterConfig()
if err != nil {
glog.Fatalf("Error querying kubernetes config: %s", err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
glog.Fatalf("Error creating kubernetes clientset: %s", err)
}
servicer, err := servicers.NewKubernetesServiceRegistryServicer(clientset.CoreV1())
servicer, err := servicers.NewKubernetesServiceRegistryServicer(clientset.CoreV1(), srv.Config.MustGetString(pollFrequencyConfigKey), nil)
if err != nil {
glog.Fatal(err)
}
Expand All @@ -71,15 +73,3 @@ func main() {
glog.Fatalf("Error while running service: %s", err)
}
}

func getK8sClientConfig() (*rest.Config, error) {
config, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
// TODO: Remove QPS and Burst overrides after service registry cache
// is implemented.
config.QPS = defaultK8sQPS
config.Burst = defaultK8sBurst
return config, err
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,83 +17,96 @@ import (
"fmt"
"os"
"strings"
"sync"

"magma/orc8r/cloud/go/orc8r"
"magma/orc8r/lib/go/protos"

"github.com/golang/glog"
"github.com/pkg/errors"
"github.com/robfig/cron/v3"
"golang.org/x/net/context"
corev1types "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
)

const (
partOfLabel = "app.kubernetes.io/part-of"
partOfOrc8rApp = "orc8r-app"
orc8rServiceNamePrefix = "orc8r-"
serviceRegistryNamespaceEnvVar = "SERVICE_REGISTRY_NAMESPACE"
grpcPortName = "grpc"
httpPortName = "http"
ServiceRegistryNamespaceEnvVar = "SERVICE_REGISTRY_NAMESPACE"

orc8rServiceNamePrefix = "orc8r-"
)

type KubernetesServiceRegistryServicer struct {
sync.RWMutex
client corev1.CoreV1Interface
namespace string
cache []corev1types.Service
reporter *Reporter
}

// NewKubernetesServiceRegistryServicer creates a new service registry servicer
// that is backed by Kubernetes.
func NewKubernetesServiceRegistryServicer(k8sClient corev1.CoreV1Interface) (*KubernetesServiceRegistryServicer, error) {
namespaceEnvValue := os.Getenv(serviceRegistryNamespaceEnvVar)
//
// Takes an argument for how frequently to refresh the local cache of tracked
// services.
// Ref: https://pkg.go.dev/github.com/robfig/cron#hdr-CRON_Expression_Format
func NewKubernetesServiceRegistryServicer(k8sClient corev1.CoreV1Interface, refreshCacheFrequency string, reporter *Reporter) (*KubernetesServiceRegistryServicer, error) {
namespaceEnvValue := os.Getenv(ServiceRegistryNamespaceEnvVar)
if len(namespaceEnvValue) == 0 {
return nil, fmt.Errorf("%s was not provided as an environment variable", serviceRegistryNamespaceEnvVar)
return nil, fmt.Errorf("%s was not provided as an environment variable", ServiceRegistryNamespaceEnvVar)
}
return &KubernetesServiceRegistryServicer{
client: k8sClient,
namespace: namespaceEnvValue,
}, nil

k := &KubernetesServiceRegistryServicer{client: k8sClient, namespace: namespaceEnvValue, reporter: reporter}

c := cron.New()
_, err := c.AddFunc(refreshCacheFrequency, k.refreshServicesCache)
if err != nil {
return nil, err
}
c.Start()

// Seed registry with initial values
go k.refreshServicesCache()

return k, nil
}

// ListAllServices returns the service name of all services in the registry.
func (s *KubernetesServiceRegistryServicer) ListAllServices(ctx context.Context, req *protos.Void) (*protos.ListAllServicesResponse, error) {
func (k *KubernetesServiceRegistryServicer) ListAllServices(ctx context.Context, req *protos.Void) (*protos.ListAllServicesResponse, error) {
k.RLock()
defer k.RUnlock()
ret := &protos.ListAllServicesResponse{}
orc8rListOption := metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s", partOfLabel, partOfOrc8rApp),
}
svcList, err := s.client.Services(s.namespace).List(orc8rListOption)
if err != nil {
return ret, err
}
for _, svc := range svcList.Items {
formattedName := s.convertK8sServiceNameToMagmaServiceName(svc.Name)
for _, svc := range k.cache {
formattedName := convertK8sServiceNameToMagmaServiceName(svc.Name)
ret.Services = append(ret.Services, formattedName)
}
return ret, nil
}

// FindServices returns all services in that have the provided label.
func (s *KubernetesServiceRegistryServicer) FindServices(ctx context.Context, req *protos.FindServicesRequest) (*protos.FindServicesResponse, error) {
func (k *KubernetesServiceRegistryServicer) FindServices(ctx context.Context, req *protos.FindServicesRequest) (*protos.FindServicesResponse, error) {
k.RLock()
defer k.RUnlock()
ret := &protos.FindServicesResponse{}
orc8rListOption := metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s,%s=true", partOfLabel, partOfOrc8rApp, req.GetLabel()),
}
svcList, err := s.client.Services(s.namespace).List(orc8rListOption)
if err != nil {
return ret, err
}
for _, svc := range svcList.Items {
formattedName := s.convertK8sServiceNameToMagmaServiceName(svc.Name)
ret.Services = append(ret.Services, formattedName)
for _, svc := range k.cache {
if hasLabel(svc, req.Label) {
formattedName := convertK8sServiceNameToMagmaServiceName(svc.Name)
ret.Services = append(ret.Services, formattedName)
}
}
return ret, nil
}

// GetServiceAddress return the address of the gRPC server for the provided
// service.
func (s *KubernetesServiceRegistryServicer) GetServiceAddress(ctx context.Context, req *protos.GetServiceAddressRequest) (*protos.GetServiceAddressResponse, error) {
func (k *KubernetesServiceRegistryServicer) GetServiceAddress(ctx context.Context, req *protos.GetServiceAddressRequest) (*protos.GetServiceAddressResponse, error) {
k.RLock()
defer k.RUnlock()
if req == nil {
return &protos.GetServiceAddressResponse{}, fmt.Errorf("GetServiceAddressRequest was nil")
}
serviceAddress, err := s.getAddressForPortName(req.GetService(), grpcPortName)
serviceAddress, err := k.getAddressForPortName(req.GetService(), orc8r.GRPCPortName)
if err != nil {
return &protos.GetServiceAddressResponse{}, err
}
Expand All @@ -104,11 +117,13 @@ func (s *KubernetesServiceRegistryServicer) GetServiceAddress(ctx context.Contex

// GetHttpServerAddress returns the address of the HTTP server for the provided
// service.
func (s *KubernetesServiceRegistryServicer) GetHttpServerAddress(ctx context.Context, req *protos.GetHttpServerAddressRequest) (*protos.GetHttpServerAddressResponse, error) {
func (k *KubernetesServiceRegistryServicer) GetHttpServerAddress(ctx context.Context, req *protos.GetHttpServerAddressRequest) (*protos.GetHttpServerAddressResponse, error) {
k.RLock()
defer k.RUnlock()
if req == nil {
return &protos.GetHttpServerAddressResponse{}, fmt.Errorf("GetHttpServerAddressRequest was nil")
}
httpServerAddress, err := s.getAddressForPortName(req.GetService(), httpPortName)
httpServerAddress, err := k.getAddressForPortName(req.GetService(), orc8r.HTTPPortName)
if err != nil {
return &protos.GetHttpServerAddressResponse{}, err
}
Expand All @@ -119,8 +134,10 @@ func (s *KubernetesServiceRegistryServicer) GetHttpServerAddress(ctx context.Con

// GetAnnotation returns the annotation value for the provided service and
// annotation.
func (s *KubernetesServiceRegistryServicer) GetAnnotation(ctx context.Context, req *protos.GetAnnotationRequest) (*protos.GetAnnotationResponse, error) {
svc, err := s.getServiceForServiceName(req.GetService())
func (k *KubernetesServiceRegistryServicer) GetAnnotation(ctx context.Context, req *protos.GetAnnotationRequest) (*protos.GetAnnotationResponse, error) {
k.RLock()
defer k.RUnlock()
svc, err := k.getServiceForServiceName(req.GetService())
if err != nil {
return &protos.GetAnnotationResponse{}, err
}
Expand All @@ -134,8 +151,8 @@ func (s *KubernetesServiceRegistryServicer) GetAnnotation(ctx context.Context, r
return &protos.GetAnnotationResponse{}, fmt.Errorf("Annotation '%s' was not found for service '%s'", req.GetAnnotation(), req.GetService())
}

func (s *KubernetesServiceRegistryServicer) getAddressForPortName(service string, portName string) (string, error) {
svc, err := s.getServiceForServiceName(service)
func (k *KubernetesServiceRegistryServicer) getAddressForPortName(service string, portName string) (string, error) {
svc, err := k.getServiceForServiceName(service)
if err != nil {
return "", err
}
Expand All @@ -144,39 +161,77 @@ func (s *KubernetesServiceRegistryServicer) getAddressForPortName(service string
return fmt.Sprintf("%s:%d", svc.Name, port.Port), nil
}
}
return "", fmt.Errorf("Could not find '%s' port for service '%s'", portName, service)
return "", fmt.Errorf("could not find '%s' port for service '%s'", portName, service)
}

func (s *KubernetesServiceRegistryServicer) getServiceForServiceName(serviceName string) (*corev1types.Service, error) {
orc8rListOption := metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s", partOfLabel, partOfOrc8rApp),
func (k *KubernetesServiceRegistryServicer) getServiceForServiceName(serviceName string) (*corev1types.Service, error) {
formattedSvcName := convertMagmaServiceNameToK8sServiceName(serviceName)
for _, svc := range k.cache {
if svc.Name == formattedSvcName {
return &svc, nil
}
}
return nil, fmt.Errorf("could not find service '%s'", serviceName)
}

formattedSvcName := s.convertMagmaServiceNameToK8sServiceName(serviceName)
svcList, err := s.client.Services(s.namespace).List(orc8rListOption)
func (k *KubernetesServiceRegistryServicer) refreshServicesCache() {
if k.reporter != nil {
k.reporter.RefreshStart()
}

opts := metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s", orc8r.PartOfLabel, orc8r.PartOfOrc8rApp),
}
services, err := k.client.Services(k.namespace).List(opts)
if err != nil {
return nil, err
// Log error and leave previous cache intact
err = errors.Wrap(err, "refresh service registry cache of K8s services")
glog.Error(err)
return
}
for _, svc := range svcList.Items {
if svc.Name == formattedSvcName {
return &svc, nil
}
for _, s := range services.Items {
s.Name = convertK8sServiceNameToMagmaServiceName(s.Name)
}

k.Lock()
k.cache = services.Items
k.Unlock()

glog.V(1).Infof("Refreshed service registry cache. Found %d services.", len(k.cache))
if k.reporter != nil {
k.reporter.RefreshDone()
}
return nil, fmt.Errorf("Could not find service '%s'", serviceName)
}

// Orc8r helm services are formatted as orc8r-<svc-name>. Magma convention is
// to use underscores in service names, so remove prefix and convert any
// hyphens in the k8s service name.
func (s *KubernetesServiceRegistryServicer) convertK8sServiceNameToMagmaServiceName(serviceName string) string {
func convertK8sServiceNameToMagmaServiceName(serviceName string) string {
trimmedSvcName := strings.TrimPrefix(serviceName, orc8rServiceNamePrefix)
return strings.ReplaceAll(trimmedSvcName, "-", "_")
}

// Orc8r helm services are formatted as orc8r-<svc-name>. Magma convention is
// to use underscores in service names, so add prefix and convert any
// underscores to hyphens
func (s *KubernetesServiceRegistryServicer) convertMagmaServiceNameToK8sServiceName(serviceName string) string {
func convertMagmaServiceNameToK8sServiceName(serviceName string) string {
k8sSvcNameSuffix := strings.ReplaceAll(serviceName, "_", "-")
return fmt.Sprintf("%s%s", orc8rServiceNamePrefix, k8sSvcNameSuffix)
}

// hasLabel returns true if the service has the passed label and the label's
// value is "true".
func hasLabel(service corev1types.Service, label string) bool {
for l, v := range service.ObjectMeta.Labels {
if l == label && v == "true" {
return true
}
}
return false
}

// Reporter reports service registry events.
type Reporter struct {
RefreshStart func()
RefreshDone func()
}
Loading

0 comments on commit 47625d3

Please sign in to comment.