Skip to content

Commit

Permalink
daemon/k8s: Replace Resource[Pod] with Table[LocalPod]
Browse files Browse the repository at this point in the history
As the first conversion of Resource[T] to Table[T],
refactor Resource[Pod] into Table[LocalPod].

Signed-off-by: Jussi Maki <jussi@isovalent.com>
  • Loading branch information
joamaki committed Jan 28, 2025
1 parent 31708b0 commit 6a4fe9c
Showing 18 changed files with 294 additions and 308 deletions.
34 changes: 20 additions & 14 deletions daemon/k8s/pods.go
Original file line number Diff line number Diff line change
@@ -80,14 +80,28 @@ const (

var (
PodNameIndex = newNameIndex[LocalPod]()

PodTableCell = cell.Group(
cell.ProvidePrivate(NewPodTable),
cell.Provide(statedb.RWTable[LocalPod].ToTable),
cell.Invoke(registerPodReflector),
)
PodTableCell = cell.Provide(NewPodTableAndReflector)
)

// NewPodTableAndReflector returns the read-only Table[LocalPod] and registers
// the k8s reflector. These are combined to ensure any dependency on Table[LocalPod]
// will start after the reflector, ensuring that Start hooks can wait for the table
// to initialize.
func NewPodTableAndReflector(jg job.Group, db *statedb.DB, cs client.Clientset) (statedb.Table[LocalPod], error) {
pods, err := NewPodTable(db)
if err != nil {
return nil, err
}

if !cs.IsEnabled() {
return pods, nil
}

cfg := podReflectorConfig(cs, pods)
err = k8s.RegisterReflector(jg, db, cfg)
return pods, err
}

func PodByName(namespace, name string) statedb.Query[LocalPod] {
return PodNameIndex.Query(namespace + "/" + name)
}
@@ -103,14 +117,6 @@ func NewPodTable(db *statedb.DB) (statedb.RWTable[LocalPod], error) {
return tbl, db.RegisterTable(tbl)
}

func registerPodReflector(jg job.Group, db *statedb.DB, cs client.Clientset, pods statedb.RWTable[LocalPod]) error {
if !cs.IsEnabled() {
return nil
}
cfg := podReflectorConfig(cs, pods)
return k8s.RegisterReflector(jg, db, cfg)
}

func podReflectorConfig(cs client.Clientset, pods statedb.RWTable[LocalPod]) k8s.ReflectorConfig[LocalPod] {
lw := utils.ListerWatcherWithModifiers(
utils.ListerWatcherFromTyped(cs.Slim().CoreV1().Pods("")),
13 changes: 0 additions & 13 deletions daemon/k8s/resources.go
Original file line number Diff line number Diff line change
@@ -68,14 +68,6 @@ var (
},
)
},
func(lc cell.Lifecycle, cs client.Clientset) (LocalPodResource, error) {
return k8s.PodResource(
lc, cs,
func(opts *metav1.ListOptions) {
opts.FieldSelector = fields.ParseSelectorOrDie("spec.nodeName=" + nodeTypes.GetName()).String()
},
)
},
),
)
)
@@ -88,10 +80,6 @@ type LocalNodeResource resource.Resource[*slim_corev1.Node]
// CiliumNode object associated with the node we are currently running on.
type LocalCiliumNodeResource resource.Resource[*cilium_api_v2.CiliumNode]

// LocalPodResource is a resource.Resource[*slim_corev1.Pod] but one which will only stream updates for pod
// objects scheduled on the node we are currently running on.
type LocalPodResource resource.Resource[*slim_corev1.Pod]

// Resources is a convenience struct to group all the agent k8s resources as cell constructor parameters.
type Resources struct {
cell.In
@@ -100,7 +88,6 @@ type Resources struct {
Endpoints resource.Resource[*k8s.Endpoints]
LocalNode LocalNodeResource
LocalCiliumNode LocalCiliumNodeResource
LocalPods LocalPodResource
Namespaces resource.Resource[*slim_corev1.Namespace]
NetworkPolicies resource.Resource[*slim_networkingv1.NetworkPolicy]
CiliumNetworkPolicies resource.Resource[*cilium_api_v2.CiliumNetworkPolicy]
9 changes: 9 additions & 0 deletions daemon/k8s/script_test.go
Original file line number Diff line number Diff line change
@@ -9,9 +9,11 @@ import (
"os"
"testing"

"github.com/cilium/hive/cell"
"github.com/cilium/hive/hivetest"
"github.com/cilium/hive/script"
"github.com/cilium/hive/script/scripttest"
"github.com/cilium/statedb"
"github.com/spf13/pflag"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -37,6 +39,13 @@ func TestScript(t *testing.T) {
h := hive.New(
client.FakeClientCell,
TablesCell,

// Instantiate the tables we're testing. Without this the
// tables and reflectors would not be created (as nothing
// would depend on them).
cell.Invoke(
func(statedb.Table[LocalPod]) {},
),
)

flags := pflag.NewFlagSet("", pflag.ContinueOnError)
11 changes: 4 additions & 7 deletions daemon/k8s/tables.go
Original file line number Diff line number Diff line change
@@ -10,15 +10,12 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// ResourcesCell provides a set of handles to Kubernetes resources used throughout the
// agent. Each of the resources share a client-go informer and backing store so we only
// have one watch API call for each resource kind and that we maintain only one copy of each object.
//
// See pkg/k8s/resource/resource.go for documentation on the Resource[T] type.

// TablesCell provides a set of StateDB tables for common Kubernetes objects.
// The tables are populated with the StateDB k8s reflector (pkg/k8s/statedb.go).
// Some tables are provided as OnDemand[Table[T]]
//
// NOTE: When adding new k8s tables make sure to provide and register from a
// single provider to ensure reflector starts before anyone depending on the table.
// See [NewPodTableAndReflector] for example.
var TablesCell = cell.Module(
"k8s-tables",
"StateDB tables of Kubernetes objects",
1 change: 1 addition & 0 deletions pkg/ciliumenvoyconfig/exp_test.go
Original file line number Diff line number Diff line change
@@ -48,6 +48,7 @@ func TestScript(t *testing.T) {
h := hive.New(
client.FakeClientCell,
daemonk8s.ResourcesCell,
daemonk8s.TablesCell,
cell.Config(cecConfig{}),
cell.Config(envoy.ProxyConfig{}),
experimental.Cell,
7 changes: 5 additions & 2 deletions pkg/ipam/metadata/cell.go
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ package metadata

import (
"github.com/cilium/hive/cell"
"github.com/cilium/statedb"

"github.com/cilium/cilium/daemon/k8s"
"github.com/cilium/cilium/pkg/ipam"
@@ -28,7 +29,8 @@ type managerParams struct {
DaemonConfig *option.DaemonConfig

NamespaceResource resource.Resource[*slim_core_v1.Namespace]
PodResource k8s.LocalPodResource
DB *statedb.DB
Pods statedb.Table[k8s.LocalPod]
}

func newIPAMMetadataManager(params managerParams) Manager {
@@ -37,8 +39,9 @@ func newIPAMMetadataManager(params managerParams) Manager {
}

manager := &manager{
db: params.DB,
namespaceResource: params.NamespaceResource,
podResource: params.PodResource,
pods: params.Pods,
}
params.Lifecycle.Append(manager)

22 changes: 6 additions & 16 deletions pkg/ipam/metadata/manager.go
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ import (
"strings"

"github.com/cilium/hive/cell"
"github.com/cilium/statedb"
"k8s.io/apimachinery/pkg/util/validation"

"github.com/cilium/cilium/daemon/k8s"
@@ -57,10 +58,10 @@ type Manager interface {
}

type manager struct {
db *statedb.DB
namespaceResource resource.Resource[*slim_core_v1.Namespace]
namespaceStore resource.Store[*slim_core_v1.Namespace]
podResource k8s.LocalPodResource
podStore resource.Store[*slim_core_v1.Pod]
pods statedb.Table[k8s.LocalPod]
}

func (m *manager) Start(ctx cell.HookContext) (err error) {
@@ -69,17 +70,11 @@ func (m *manager) Start(ctx cell.HookContext) (err error) {
return fmt.Errorf("failed to obtain namespace store: %w", err)
}

m.podStore, err = m.podResource.Store(ctx)
if err != nil {
return fmt.Errorf("failed to obtain pod store: %w", err)
}

return nil
}

func (m *manager) Stop(ctx cell.HookContext) error {
m.namespaceStore = nil
m.podStore = nil
return nil
}

@@ -117,7 +112,7 @@ func determinePoolByAnnotations(annotations map[string]string, family ipam.Famil
}

func (m *manager) GetIPPoolForPod(owner string, family ipam.Family) (pool string, err error) {
if m.namespaceStore == nil || m.podStore == nil {
if m.namespaceStore == nil || m.pods == nil {
return "", &ManagerStoppedError{}
}

@@ -133,13 +128,8 @@ func (m *manager) GetIPPoolForPod(owner string, family ipam.Family) (pool string
}

// Check annotation on pod
pod, ok, err := m.podStore.GetByKey(resource.Key{
Name: name,
Namespace: namespace,
})
if err != nil {
return "", fmt.Errorf("failed to lookup pod %q: %w", namespace+"/"+name, err)
} else if !ok {
pod, _, found := m.pods.Get(m.db.ReadTxn(), k8s.PodByName(namespace, name))
if !found {
return "", &ResourceNotFound{Resource: "Pod", Namespace: namespace, Name: name}
} else if ippool, ok := determinePoolByAnnotations(pod.Annotations, family); ok {
return ippool, nil
107 changes: 49 additions & 58 deletions pkg/ipam/metadata/manager_test.go
Original file line number Diff line number Diff line change
@@ -7,9 +7,11 @@ import (
"errors"
"testing"

"github.com/cilium/statedb"
"github.com/stretchr/testify/require"
"k8s.io/client-go/tools/cache"

"github.com/cilium/cilium/daemon/k8s"
"github.com/cilium/cilium/pkg/annotation"
"github.com/cilium/cilium/pkg/ipam"
"github.com/cilium/cilium/pkg/k8s/resource"
@@ -52,21 +54,18 @@ func (m mockStore[T]) Release() {
panic("not implemented")
}

func podKey(ns, name string) resource.Key {
return resource.Key{
Namespace: ns,
Name: name,
}
}

func namespaceKey(name string) resource.Key {
return resource.Key{
Name: name,
}
}

func TestManager_GetIPPoolForPod(t *testing.T) {
db := statedb.New()
pods, err := k8s.NewPodTable(db)
require.NoError(t, err, "NewPodTable")
m := &manager{
db: db,
namespaceStore: mockStore[*slim_core_v1.Namespace]{
namespaceKey("default"): &slim_core_v1.Namespace{},
namespaceKey("special"): &slim_core_v1.Namespace{
@@ -77,60 +76,52 @@ func TestManager_GetIPPoolForPod(t *testing.T) {
},
},
},
podStore: mockStore[*slim_core_v1.Pod]{
podKey("default", "client"): &slim_core_v1.Pod{},
podKey("default", "custom-workload"): &slim_core_v1.Pod{
ObjectMeta: slim_meta_v1.ObjectMeta{
Annotations: map[string]string{
annotation.IPAMPoolKey: "custom-pool",
},
},
},
podKey("default", "custom-workload2"): &slim_core_v1.Pod{
ObjectMeta: slim_meta_v1.ObjectMeta{
Annotations: map[string]string{
annotation.IPAMIPv4PoolKey: "ipv4-pool",
},
},
},
podKey("default", "custom-workload3"): &slim_core_v1.Pod{
ObjectMeta: slim_meta_v1.ObjectMeta{
Annotations: map[string]string{
annotation.IPAMIPv4PoolKey: "ipv4-pool",
annotation.IPAMPoolKey: "custom-pool",
},
},
},
podKey("default", "custom-workload4"): &slim_core_v1.Pod{
ObjectMeta: slim_meta_v1.ObjectMeta{
Annotations: map[string]string{
annotation.IPAMIPv4PoolKey: "ipv4-pool",
annotation.IPAMIPv6PoolKey: "ipv6-pool",
},
},
},
podKey("default", "custom-workload5"): &slim_core_v1.Pod{
ObjectMeta: slim_meta_v1.ObjectMeta{
Annotations: map[string]string{
annotation.IPAMIPv4PoolKey: "ipv4-pool",
annotation.IPAMIPv6PoolKey: "ipv6-pool",
annotation.IPAMPoolKey: "custom-pool",
},
},
},
pods: pods,
}

podKey("special", "server"): &slim_core_v1.Pod{},
podKey("special", "server2"): &slim_core_v1.Pod{
ObjectMeta: slim_meta_v1.ObjectMeta{
Annotations: map[string]string{
annotation.IPAMPoolKey: "pod-pool",
},
},
txn := db.WriteTxn(pods)
newPod := func(namespace, name string, annotations map[string]string) k8s.LocalPod {
return k8s.LocalPod{Pod: &slim_core_v1.Pod{
ObjectMeta: slim_meta_v1.ObjectMeta{
Namespace: namespace,
Name: name,
Annotations: annotations,
},

podKey("missing-ns", "pod"): &slim_core_v1.Pod{},
},
}}
}
pods.Insert(txn, newPod("default", "client", nil))
pods.Insert(txn, newPod("default", "custom-workload",
map[string]string{
annotation.IPAMPoolKey: "custom-pool",
}))
pods.Insert(txn, newPod("default", "custom-workload2",
map[string]string{
annotation.IPAMIPv4PoolKey: "ipv4-pool",
}))
pods.Insert(txn, newPod("default", "custom-workload3",
map[string]string{
annotation.IPAMIPv4PoolKey: "ipv4-pool",
annotation.IPAMPoolKey: "custom-pool",
}))
pods.Insert(txn, newPod("default", "custom-workload4",
map[string]string{
annotation.IPAMIPv4PoolKey: "ipv4-pool",
annotation.IPAMIPv6PoolKey: "ipv6-pool",
}))
pods.Insert(txn, newPod("default", "custom-workload5",
map[string]string{
annotation.IPAMIPv4PoolKey: "ipv4-pool",
annotation.IPAMIPv6PoolKey: "ipv6-pool",
annotation.IPAMPoolKey: "custom-pool",
}))

pods.Insert(txn, newPod("special", "server", nil))
pods.Insert(txn, newPod("special", "server2",
map[string]string{
annotation.IPAMPoolKey: "pod-pool",
}))
pods.Insert(txn, newPod("missing-ns", "pod", nil))
txn.Commit()

tests := []struct {
name string
Loading
Oops, something went wrong.

0 comments on commit 6a4fe9c

Please sign in to comment.