Skip to content

Commit

Permalink
Implementation of incremental MCP (istio#22033)
Browse files Browse the repository at this point in the history
* add option EnableIncrementalMCP to galley and pilot, support incremental update in mcp controller

* ut bug fix

* remove useless check and rename params

* goimports

* go fmt
  • Loading branch information
kevin21th authored Apr 12, 2020
1 parent 3cf77cc commit 086fd92
Show file tree
Hide file tree
Showing 8 changed files with 219 additions and 5 deletions.
3 changes: 3 additions & 0 deletions galley/pkg/envvar/envvars.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ var (

// MCPSourceReqFreq is the frequency that is used by the rate limiter in MCP Sources
MCPSourceReqFreq = env.RegisterDurationVar("MCP_SOURCE_REQ_FREQ", time.Second, "")

// EnableIncrementalMCP is an option to enable incremental mcp
EnableIncrementalMCP = env.RegisterBoolVar("ENABLE_INCREMENTAL_MCP", false, "")
)

// RegisteredEnvVarNames returns the names of registered environment variables.
Expand Down
7 changes: 7 additions & 0 deletions galley/pkg/server/components/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,13 @@ func (p *Processing) Start() (err error) {
ConnRateLimiter: mcpSourceRateLimiter,
}

// set incremental flag of all collections to true when incremental mcp enabled
if envvar.EnableIncrementalMCP.Get() {
for i := range options.CollectionsOptions {
options.CollectionsOptions[i].Incremental = true
}
}

md := grpcMetadata.MD{
versionMetadataKey: []string{version.Info.Version},
}
Expand Down
2 changes: 1 addition & 1 deletion pilot/pkg/bootstrap/configcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func (s *Server) mcpController(
all := collections.Pilot.All()
cols := make([]sink.CollectionOptions, 0, len(all))
for _, c := range all {
cols = append(cols, sink.CollectionOptions{Name: c.Name().String(), Incremental: false})
cols = append(cols, sink.CollectionOptions{Name: c.Name().String(), Incremental: features.EnableIncrementalMCP})
}

mcpController := mcp.NewController(opts)
Expand Down
7 changes: 7 additions & 0 deletions pilot/pkg/features/pilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,4 +290,11 @@ var (

ClusterName = env.RegisterStringVar("CLUSTER_ID", "Kubernetes",
"Defines the cluster and service registry that this Istiod instance is belongs to")

EnableIncrementalMCP = env.RegisterBoolVar(
"PILOT_ENABLE_INCREMENTAL_MCP",
false,
"If enabled, pilot will set the incremental flag of the options in the mcp controller "+
"to true, and then galley may push data incrementally, it depends on whether the "+
"resource supports incremental. By default, this is false.").Get()
)
57 changes: 57 additions & 0 deletions pilot/pkg/serviceregistry/mcp/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,23 @@ func (c *controller) Apply(change *sink.Change) error {

c.configStoreMu.Lock()
prevStore = c.configStore[kind]
// Incremental update when received incremental change
if change.Incremental {

//Although it is not a deep copy, there is no problem because the config will not be modified
prevCache := make(map[string]map[string]*model.Config, len(prevStore))
for namespace, namedConfig := range prevStore {
prevCache[namespace] = make(map[string]*model.Config, len(namedConfig))
for name, config := range namedConfig {
prevCache[namespace][name] = config
}
}
prevStore = prevCache

c.removeConfig(kind, change.Removed)
c.incrementalUpdate(kind, innerStore)
innerStore = c.configStore[kind]
}
c.configStore[kind] = innerStore
c.configStoreMu.Unlock()
c.sync(change.Collection)
Expand Down Expand Up @@ -330,3 +347,43 @@ func extractNameNamespace(metadataName string) (string, string) {
}
return "", segments[0]
}

func (c *controller) removeConfig(kind resource.GroupVersionKind, resources []string) {
for _, fullName := range resources {
namespace, name := extractNameNamespace(fullName)
if byType, ok := c.configStore[kind]; ok {
if byNamespace, ok := byType[namespace]; ok {
if conf, ok := byNamespace[name]; ok {
delete(byNamespace, conf.Name)
}
// clear parent map also
if len(byNamespace) == 0 {
delete(byType, namespace)
}
}
// clear parent map also
if len(byType) == 0 {
delete(c.configStore, kind)
}
}
}
}

func (c *controller) incrementalUpdate(kind resource.GroupVersionKind, conf map[string]map[string]*model.Config) {
if len(conf) == 0 {
return
}
if byType, ok := c.configStore[kind]; ok {
for namespace, namedConf := range conf {
if byNamespace, ok := byType[namespace]; ok {
for name, config := range namedConf {
byNamespace[name] = config
}
} else {
byType[namespace] = namedConf
}
}
} else {
c.configStore[kind] = conf
}
}
139 changes: 139 additions & 0 deletions pilot/pkg/serviceregistry/mcp/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,3 +851,142 @@ func (f *FakeXdsUpdater) SvcUpdate(_, _, _ string, _ model.Event) {

func (f *FakeXdsUpdater) ProxyUpdate(_, _ string) {
}

func TestApplyIncrementalChangeRemove(t *testing.T) {
g := NewGomegaWithT(t)

fx := NewFakeXDS()
testControllerOptions.XDSUpdater = fx
controller := mcp.NewController(testControllerOptions)

message := convertToResource(g, collections.IstioNetworkingV1Alpha3Gateways.Resource().Proto(), gateway)

change := convertToChange([]proto.Message{message},
[]string{"random-namespace/test-gateway"},
setIncremental(),
setCollection(collections.IstioNetworkingV1Alpha3Gateways.Name().String()),
setTypeURL(collections.IstioNetworkingV1Alpha3Gateways.Resource().Proto()))

err := controller.Apply(change)
g.Expect(err).ToNot(HaveOccurred())

entries, err := controller.List(gatewayGvk, "")
g.Expect(err).ToNot(HaveOccurred())
g.Expect(entries).To(HaveLen(1))
g.Expect(entries[0].Name).To(Equal("test-gateway"))

update := <-fx.Events
g.Expect(update).To(Equal("ConfigUpdate"))

message2 := convertToResource(g, collections.IstioNetworkingV1Alpha3Gateways.Resource().Proto(), gateway2)
change = convertToChange([]proto.Message{message2},
[]string{"random-namespace/test-gateway2"},
setIncremental(),
setCollection(collections.IstioNetworkingV1Alpha3Gateways.Name().String()),
setTypeURL(collections.IstioNetworkingV1Alpha3Gateways.Resource().Proto()))

err = controller.Apply(change)
g.Expect(err).ToNot(HaveOccurred())

entries, err = controller.List(gatewayGvk, "")
g.Expect(err).ToNot(HaveOccurred())
g.Expect(entries).To(HaveLen(2))

update = <-fx.Events
g.Expect(update).To(Equal("ConfigUpdate"))

for _, gw := range entries {
g.Expect(gw.GroupVersionKind()).To(Equal(gatewayGvk))
switch gw.Name {
case "test-gateway":
g.Expect(gw.Spec).To(Equal(message))
case "test-gateway2":
g.Expect(gw.Spec).To(Equal(message2))
}
}

change = convertToChange([]proto.Message{message2},
[]string{"random-namespace/test-gateway2"},
setIncremental(),
setRemoved([]string{"random-namespace/test-gateway"}),
setCollection(collections.IstioNetworkingV1Alpha3Gateways.Name().String()),
setTypeURL(collections.IstioNetworkingV1Alpha3Gateways.Resource().Proto()))

err = controller.Apply(change)
g.Expect(err).ToNot(HaveOccurred())

entries, err = controller.List(gatewayGvk, "")
g.Expect(err).ToNot(HaveOccurred())
g.Expect(entries).To(HaveLen(1))
g.Expect(entries[0].Name).To(Equal("test-gateway2"))
g.Expect(entries[0].Spec).To(Equal(message2))

update = <-fx.Events
g.Expect(update).To(Equal("ConfigUpdate"))
}

func TestApplyIncrementalChange(t *testing.T) {
g := NewGomegaWithT(t)

fx := NewFakeXDS()
testControllerOptions.XDSUpdater = fx
controller := mcp.NewController(testControllerOptions)

message := convertToResource(g, collections.IstioNetworkingV1Alpha3Gateways.Resource().Proto(), gateway)

change := convertToChange([]proto.Message{message},
[]string{"random-namespace/test-gateway"},
setIncremental(),
setCollection(collections.IstioNetworkingV1Alpha3Gateways.Name().String()),
setTypeURL(collections.IstioNetworkingV1Alpha3Gateways.Resource().Proto()))

err := controller.Apply(change)
g.Expect(err).ToNot(HaveOccurred())

entries, err := controller.List(gatewayGvk, "")
g.Expect(err).ToNot(HaveOccurred())
g.Expect(entries).To(HaveLen(1))
g.Expect(entries[0].Name).To(Equal("test-gateway"))

update := <-fx.Events
g.Expect(update).To(Equal("ConfigUpdate"))

message2 := convertToResource(g, collections.IstioNetworkingV1Alpha3Gateways.Resource().Proto(), gateway2)
change = convertToChange([]proto.Message{message2},
[]string{"random-namespace/test-gateway2"},
setIncremental(),
setCollection(collections.IstioNetworkingV1Alpha3Gateways.Name().String()),
setTypeURL(collections.IstioNetworkingV1Alpha3Gateways.Resource().Proto()))

err = controller.Apply(change)
g.Expect(err).ToNot(HaveOccurred())

entries, err = controller.List(gatewayGvk, "")
g.Expect(err).ToNot(HaveOccurred())
g.Expect(entries).To(HaveLen(2))

for _, gw := range entries {
g.Expect(gw.GroupVersionKind()).To(Equal(gatewayGvk))
switch gw.Name {
case "test-gateway":
g.Expect(gw.Spec).To(Equal(message))
case "test-gateway2":
g.Expect(gw.Spec).To(Equal(message2))
}
}

update = <-fx.Events
g.Expect(update).To(Equal("ConfigUpdate"))
}

func setIncremental() func(*sink.Change) {
return func(c *sink.Change) {
c.Incremental = true
}
}

func setRemoved(removed []string) func(*sink.Change) {
return func(c *sink.Change) {
c.Removed = removed
}
}
2 changes: 1 addition & 1 deletion pkg/mcp/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func (con *connection) pushServerResponse(w *watch, resp *WatchResponse) error {
Collection: resp.Collection,
Resources: added,
RemovedResources: removed,
Incremental: incremental,
Incremental: con.streamNonce > 0 && incremental, // the first response was not consider as incremental
}

// increment nonce
Expand Down
7 changes: 4 additions & 3 deletions pkg/mcp/source/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,9 +796,10 @@ func TestSourceACKAddUpdateDelete_Incremental(t *testing.T) {
inject *WatchResponse
}{
{
name: "ack add A0",
inject: makeWatchResponse(test.FakeType0Collection, "1", true, test.Type0A[0]),
wantResources: test.MakeResources(true, test.FakeType0Collection, "1", "1", nil, test.Type0A[0]),
name: "ack add A0",
inject: makeWatchResponse(test.FakeType0Collection, "1", true, test.Type0A[0]),
// the first response can not be consider as incremental
wantResources: test.MakeResources(false, test.FakeType0Collection, "1", "1", nil, test.Type0A[0]),
request: test.MakeRequest(true, test.FakeType0Collection, "1", codes.OK),
},
{
Expand Down

0 comments on commit 086fd92

Please sign in to comment.