Skip to content

Commit

Permalink
feat: 新增example demo
Browse files Browse the repository at this point in the history
fix: 修改code review代码格式

fix: 修改code review代码格式

fix: 去掉image图片

fix: 修改PR代码格式问题
  • Loading branch information
googs1025 authored and astraw99 committed Dec 13, 2022
1 parent ec1bcd1 commit bd58669
Show file tree
Hide file tree
Showing 26 changed files with 1,609 additions and 0 deletions.
189 changes: 189 additions & 0 deletions demo/examples/controller/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package main

import (
"K8s_demo/demo/examples/init-client"
"fmt"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"time"
)

// 控制器
type Controller struct {
// 支持插入index的本地缓存
indexer cache.Indexer
// 工作队列,把监听到的资源放入队列
queue workqueue.RateLimitingInterface
// informer控制器
informer cache.Controller
}

func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller {
return &Controller{
indexer: indexer,
queue: queue,
informer: informer,
}
}

// Run 开始 watch 和同步
func (c *Controller) Run(threadNum int, stopC chan struct{}) {
defer runtime.HandleCrash()

// 停止控制器后关掉队列
defer c.queue.ShutDown()

klog.Info("Controller Started!")

// 启动informer监听
go c.informer.Run(stopC)

// 等待所有相关的缓存同步,然后再开始处理队列中的资源
if !cache.WaitForCacheSync(stopC, c.informer.HasSynced) {
runtime.HandleError(fmt.Errorf("%s", "Timed out waiting for caches to sync"))
return
}
// 启动worker数量
for i := 0 ; i < threadNum; i++ {
go wait.Until(c.runWorker, time.Second, stopC)
}
<- stopC

klog.Info("Stopping controller")

}

func (c *Controller) runWorker() {
for c.processNextItem() {

}
}

func (c *Controller) processNextItem() bool {
// 等到工作队列中有一个新元素
key, quit := c.queue.Get()
if quit {
return false
}
// 告诉队列已经完成了处理此 key 的操作
// 这将为其他 worker 解锁该 key
// 这将确保安全的并行处理,因为永远不会并行处理具有相同 key 的两个Pod
defer c.queue.Done(key)

// 调用包含业务逻辑的方法
err := c.syncToStdout(key.(string))
c.handleErr(err, key) // 如果业务逻辑有错,需要handle
return true

}

// syncToStdout 是控制器的业务逻辑实现
// 在此控制器中,它只是将有关 Pod 的信息打印到 stdout
// 如果发生错误,则简单地返回错误
// 此外重试逻辑不应成为业务逻辑的一部分。
func (c *Controller) syncToStdout(key string) error {
// 从本地存储中获取 key 对应的对象
obj, exists, err := c.indexer.GetByKey(key)
if err != nil {
klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
return err
}

// 如果不存在,打印不存在;如果存在,打印
if !exists {
fmt.Printf("Pod %s does not exists anymore\n", key)
} else {
fmt.Printf("Sync/Add/Update for Pod %s\n", obj.(*v1.Pod).GetName())
}


return nil
}

// handleErr 错误处理:检查是否发生错误,并确保重试次数
func (c *Controller) handleErr(err error, key interface{}) {
// 忘记每次成功同步时 key 的#AddRateLimited历史记录。
// 这样可以确保不会因过时的错误历史记录而延迟此 key 更新的以后处理。
if err == nil {
c.queue.Forget(key)
return
}
// 如果有问题,重新放入控制器5次
if c.queue.NumRequeues(key) < 5 {
klog.Infof("Error syncing pod %v: %v", key, err)
// 重新加入 key 到限速队列
// 根据队列上的速率限制器和重新入队历史记录,稍后将再次处理该 key
c.queue.AddRateLimited(key)
return
}
// 多次重试,也无法成功处理该key
c.queue.Forget(key)
runtime.HandleError(err)
klog.Infof("Dropping pod %q out of the queue: %v", key, err)

}


func main() {
client := init_client.ClientSet.Client
// 创建 资源 ListWatcher
podListWatcher := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())
// 创建队列
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

// 在 informer 的帮助下,将工作队列绑定到缓存
// 这样,我们确保无论何时更新缓存,都将 pod key 添加到工作队列中
// 注意,当我们最终从工作队列中处理元素时,我们可能会看到 Pod 的版本比响应触发更新的版本新
indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
// 回调:主要informer监听后,需要放入worker queue队列

AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
klog.Errorf(err.Error())
}
queue.Add(key)
},

UpdateFunc: func(oldObj, newObj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(newObj)
if err != nil {
klog.Errorf(err.Error())
}
queue.Add(key)
},

DeleteFunc: func(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
klog.Errorf(err.Error())
}
queue.Add(key)
},
}, cache.Indexers{})

// controller
controller := NewController(queue, indexer, informer)

_ = indexer.Add(&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "mypod",
Namespace: v1.NamespaceDefault,
},
})

// start controller
stopCh := make(chan struct{})
defer close(stopCh)
// 启动controller
go controller.Run(1, stopCh)

select {}

}
46 changes: 46 additions & 0 deletions demo/examples/convert-type/convert_unstructure_type_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package convert_type

import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"reflect"
"testing"
)

func TestConvertType(t *testing.T) {

unstructuredConfigMap := unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": map[string]interface{}{
"creationTimestamp": nil,
"namespace": "default",
"name": "my-configmap",
},
"data": map[string]interface{}{
"foo": "bar",
},
},
}

// Unstructured -> Typed
var typeConfigMap corev1.ConfigMap
err := runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredConfigMap.Object, &typeConfigMap)
if err != nil {
panic(err.Error())
}
if typeConfigMap.GetName() != "my-configmap" {
panic("Typed config map has unexpected data")
}

// Typed -> Unstructured
object, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&typeConfigMap)
if err != nil {
panic(err.Error())
}
if !reflect.DeepEqual(unstructured.Unstructured{Object: object}, unstructuredConfigMap) {
panic("Unstructured config map has unexpected data")
}
}
18 changes: 18 additions & 0 deletions demo/examples/dynamic/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
### 常见的资源使用dynamic client客户端操作

#### 动态客户端常用的 GVR 对应的是k8s中资源的标示。
```bigquery
// k8s.io/apimachinery/pkg/runtime/schema/group_version.go
// 对应一个 http 路径
type GroupVersionResource struct {
Group string
Version string
Resource string
}
// 对应一个golang struct
type GroupVersionKind struct {
Group string
Version string
Kind string
}
```
85 changes: 85 additions & 0 deletions demo/examples/dynamic/dynamic_practice_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package dynamic

import (
"K8s_demo/demo/examples/init-client"
"context"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"log"
"reflect"
"testing"
)

/*
使用动态客户端 crud资源。
*/

func TestDynamicClient(t *testing.T) {

client := init_client.ClientSet.DynamicClient

namespace := "default"
res := schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "configmaps",
}

unstructuredObj := &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": map[string]interface{}{
"namespace": namespace,
"generateName": "crud-dynamic-simple-",
},
"data": map[string]interface{}{
"foo": "bar",
},
},
}

obj, err := client.Resource(res).Namespace("default").
Create(context.Background(), unstructuredObj, metav1.CreateOptions{})
if err != nil {
fmt.Println("create error:", err)
}

fmt.Printf("create configmap:%s\n", obj.GetName())

data, _, _ := unstructured.NestedStringMap(obj.Object, "data")
if !reflect.DeepEqual(map[string]string{"foo": "bar"}, data) {
log.Fatal("Created ConfigMap has unexpected data")
}

getObj, err := client.Resource(res).Namespace(namespace).
Get(context.Background(), obj.GetName(), metav1.GetOptions{})
if err != nil {
fmt.Println("get error:")
}

fmt.Printf("get configmap:%s\n", getObj.GetName())

err = unstructured.SetNestedField(getObj.Object, "qux", "data", "foo")
if err != nil {
fmt.Println("operator error:", err)
}
updateObj, err := client.Resource(res).Namespace(namespace).
Update(context.Background(), getObj, metav1.UpdateOptions{})
if err != nil {
fmt.Println("update error:", err)
}

fmt.Printf("update ConfigMap %s\n", updateObj.GetName())

// Delete
err = client.Resource(res).Namespace(namespace).
Delete(context.Background(), updateObj.GetName(), metav1.DeleteOptions{})
if err != nil {
fmt.Println("delete error:", err)
}
fmt.Printf("Deleted ConfigMap %s\n", updateObj.GetName())

}
Empty file.
49 changes: 49 additions & 0 deletions demo/examples/fake-client-test/fakeClient_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package fake_client_test

import (
. "K8s_demo/demo/examples/init-client"
"context"
"fmt"
v1 "k8s.io/api/core/v1"
v12 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
"testing"
)

func TestFakeClient(t *testing.T) {
// 方法一:直接调用
client := fake.NewSimpleClientset(
NewPod("abc", "pod1"), // 在fake client插入数据
)
ctx := context.Background()
pod, err := client.CoreV1().Pods("abc").Get(ctx, "pod1", v12.GetOptions{})
fmt.Println(pod, err)

// 方法二:组合成结构体方式。
clientset := fake.NewSimpleClientset(
NewPod("abc", "pod2"),
)
ClientSet.Client = clientset
pod2, err := ClientSet.Client.CoreV1().Pods("abc").Get(ctx, "pod2", v12.GetOptions{})
fmt.Println(pod2, err)

}

func TestPodPatch(t *testing.T) {
pod := NewPod("abc", "p1")

// 加入进去。
//pod.Annotations = map[string]string{
// "try": "aaa",
//}
clientSet := fake.NewSimpleClientset(pod)
_ = PatchPod(pod, clientSet)

}

func NewPod(namespace string, name string) *v1.Pod {
pod := &v1.Pod{}
pod.Name = name
pod.Namespace = namespace
return pod
}
Loading

0 comments on commit bd58669

Please sign in to comment.