Skip to content

Commit

Permalink
Merge pull request kubevirt#10838 from xpivarc/isolate-domain-watcher
Browse files Browse the repository at this point in the history
Refactor: Isolate domain watcher
  • Loading branch information
kubevirt-bot authored Sep 5, 2024
2 parents f75acdd + 3573494 commit 084b1b0
Show file tree
Hide file tree
Showing 5 changed files with 348 additions and 313 deletions.
2 changes: 1 addition & 1 deletion cmd/virt-handler/virt-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func (app *virtHandlerApp) Run() {
vmiTargetInformer := factory.VMITargetHost(app.HostOverride)

// Wire Domain controller
domainSharedInformer, err := virtcache.NewSharedInformer(app.VirtShareDir, int(app.WatchdogTimeoutDuration.Seconds()), recorder, vmiSourceInformer.GetStore(), time.Duration(app.domainResyncPeriodSeconds)*time.Second)
domainSharedInformer := virtcache.NewSharedInformer(app.VirtShareDir, int(app.WatchdogTimeoutDuration.Seconds()), recorder, vmiSourceInformer.GetStore(), time.Duration(app.domainResyncPeriodSeconds)*time.Second)
if err != nil {
panic(err)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/virt-handler/cache/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go_library(
name = "go_default_library",
srcs = [
"cache.go",
"domain-watcher.go",
"maps.go",
],
importpath = "kubevirt.io/kubevirt/pkg/virt-handler/cache",
Expand Down
309 changes: 2 additions & 307 deletions pkg/virt-handler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,20 @@ package cache

import (
"fmt"
"net"
"os"
"sync"
"time"

"k8s.io/client-go/tools/record"

k8sv1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"

"kubevirt.io/client-go/log"

"kubevirt.io/kubevirt/pkg/checkpoint"
diskutils "kubevirt.io/kubevirt/pkg/ephemeral-disk-utils"
"kubevirt.io/kubevirt/pkg/util"
cmdclient "kubevirt.io/kubevirt/pkg/virt-handler/cmd-client"
notifyserver "kubevirt.io/kubevirt/pkg/virt-handler/notify-server"
"kubevirt.io/kubevirt/pkg/virt-launcher/virtwrap/api"
)

Expand Down Expand Up @@ -75,38 +69,6 @@ func newIterableCheckpointManager(base string) IterableCheckpointManager {
}
}

// socketDialTimeout is the timeout, in seconds, applied when dialing a
// virt-launcher command socket to check whether it is still responsive.
const socketDialTimeout = 5

// newListWatchFromNotify builds the DomainWatcher that serves as the
// ListerWatcher for domains. The watcher is returned idle: its background
// goroutine is only launched by startBackground, and the set of unresponsive
// sockets starts out empty.
func newListWatchFromNotify(virtShareDir string, watchdogTimeout int, recorder record.EventRecorder, vmiStore cache.Store, resyncPeriod time.Duration) cache.ListerWatcher {
	// backgroundWatcherStarted is left at its zero value (false).
	return &DomainWatcher{
		virtShareDir:        virtShareDir,
		watchdogTimeout:     watchdogTimeout,
		resyncPeriod:        resyncPeriod,
		recorder:            recorder,
		vmiStore:            vmiStore,
		unresponsiveSockets: map[string]int64{},
	}
}

// DomainWatcher lists domains by querying virt-launcher command sockets and
// streams domain events over a channel. It is returned as a
// cache.ListerWatcher (List/Watch) and hands itself out as the
// watch.Interface (Stop/ResultChan).
type DomainWatcher struct {
// lock is held by startBackground and Stop; it guards
// backgroundWatcherStarted and the creation/closing of the channels.
lock sync.Mutex
// wg tracks the background goroutine so Stop can wait for it to exit.
wg sync.WaitGroup
// stopChan is handed to the notify server and closed by Stop.
stopChan chan struct{}
// eventChan carries the watch events exposed through ResultChan.
eventChan chan watch.Event
// backgroundWatcherStarted is true while the background goroutine is running.
backgroundWatcherStarted bool
// virtShareDir is passed through to the notify server.
virtShareDir string
// watchdogTimeout is the number of seconds a socket may stay unresponsive
// before its domain is reported as deleted; it also sets the watchdog
// ticker period.
watchdogTimeout int
// recorder is passed through to the notify server.
recorder record.EventRecorder
// vmiStore is passed through to the notify server.
vmiStore cache.Store
// resyncPeriod is the interval between handleResync runs.
resyncPeriod time.Duration

// watchDogLock guards unresponsiveSockets.
watchDogLock sync.Mutex
// unresponsiveSockets maps a socket path to the Unix time (seconds) at which
// it was first seen unresponsive.
unresponsiveSockets map[string]int64
}

type ghostRecord struct {
Name string `json:"name"`
Namespace string `json:"namespace"`
Expand Down Expand Up @@ -289,274 +251,7 @@ func listSockets() ([]string, error) {
return sockets, nil
}

// startBackground launches the watcher's background goroutine unless it is
// already running. The goroutine runs the domain notify server and, on two
// independent tickers, triggers periodic resyncs and stale-socket checks. It
// exits once the notify server returns. The returned error is always nil.
func (d *DomainWatcher) startBackground() error {
	d.lock.Lock()
	defer d.lock.Unlock()

	if d.backgroundWatcherStarted {
		return nil
	}

	d.stopChan = make(chan struct{}, 1)
	d.eventChan = make(chan watch.Event, 100)

	d.wg.Add(1)
	go func() {
		defer d.wg.Done()

		resync := time.NewTicker(d.resyncPeriod)
		defer resync.Stop()

		// Tick at a third of the watchdog timeout so that a socket fails at
		// least two consecutive probes before it is declared unavailable
		// (which results in shutdown of the VMI).
		watchdogInterval := time.Duration((d.watchdogTimeout/3)+1) * time.Second
		watchdog := time.NewTicker(watchdogInterval)
		defer watchdog.Stop()

		serverDone := make(chan error)
		go func() {
			defer close(serverDone)
			serverDone <- notifyserver.RunServer(d.virtShareDir, d.stopChan, d.eventChan, d.recorder, d.vmiStore)
		}()

		for {
			select {
			case <-resync.C:
				d.handleResync()
			case <-watchdog.C:
				d.handleStaleSocketConnections()
			case srvErr := <-serverDone:
				if srvErr != nil {
					log.Log.Reason(srvErr).Errorf("Unexpected err encountered with Domain Notify aggregation server")
				}
				// The server exited, so this goroutine is done as well.
				return
			}
		}
	}()

	d.backgroundWatcherStarted = true
	return nil
}

// handleResync asks every listed virt-launcher command socket for its current
// domain and re-emits each one on eventChan as a watch.Modified event.
//
// The resync is best effort: a socket that cannot be connected to, fails to
// return its domain, or reports that no domain exists is skipped. If the
// socket list itself cannot be obtained, the error is logged and nothing is
// emitted.
func (d *DomainWatcher) handleResync() {
	socketFiles, err := listSockets()
	if err != nil {
		log.Log.Reason(err).Error("failed to list sockets")
		return
	}

	log.Log.Infof("resyncing virt-launcher domains")
	for _, socket := range socketFiles {
		client, err := cmdclient.NewClient(socket)
		if err != nil {
			log.Log.Reason(err).Error("failed to connect to cmd client socket during resync")
			// Ignore failure to connect to client.
			// These are all local connections via unix socket.
			// A failure to connect means there's nothing on the other
			// end listening.
			continue
		}

		domain, exists, err := client.GetDomain()
		// Close immediately rather than deferring: a defer inside this loop
		// would keep every client open until the whole resync pass returns.
		client.Close()
		if err != nil {
			// this resync is best effort only.
			log.Log.Reason(err).Errorf("unable to retrieve domain at socket %s during resync", socket)
			continue
		}
		if !exists {
			// nothing to sync if it doesn't exist
			continue
		}

		d.eventChan <- watch.Event{Type: watch.Modified, Object: domain}
	}
}

// handleStaleSocketConnections probes every listed command socket and tracks
// the ones that cannot be dialed.
//
// A socket that fails the probe is recorded with the time it was first seen
// unresponsive; a previously recorded socket that answers again (or is no
// longer listed) is dropped from the record. Once a socket has stayed
// unresponsive for more than watchdogTimeout seconds and a ghost record exists
// for it, a watch.Modified event carrying a minimal domain with a deletion
// timestamp is emitted and the socket is marked unresponsive.
//
// It returns an error only when the socket list cannot be obtained.
func (d *DomainWatcher) handleStaleSocketConnections() error {
	socketFiles, err := listSockets()
	if err != nil {
		log.Log.Reason(err).Error("failed to list sockets")
		return err
	}

	// Set of sockets that failed this round's probe.
	unresponsive := make(map[string]struct{})
	for _, socket := range socketFiles {
		sock, err := net.DialTimeout("unix", socket, time.Duration(socketDialTimeout)*time.Second)
		if err == nil {
			// socket is alive still
			sock.Close()
			continue
		}
		unresponsive[socket] = struct{}{}
	}

	d.watchDogLock.Lock()
	defer d.watchDogLock.Unlock()

	now := time.Now().UTC().Unix()

	// Add new unresponsive sockets, keeping the first-seen time of known ones.
	for socket := range unresponsive {
		if _, known := d.unresponsiveSockets[socket]; !known {
			d.unresponsiveSockets[socket] = now
		}
	}

	for key, firstSeen := range d.unresponsiveSockets {
		// Reap sockets that were not found unresponsive this iteration.
		// Deleting the current key while ranging over a map is safe in Go, so
		// keep going: stopping here would leave the remaining entries neither
		// reaped nor checked against the timeout during this pass.
		if _, stillUnresponsive := unresponsive[key]; !stillUnresponsive {
			delete(d.unresponsiveSockets, key)
			continue
		}

		if now-firstSeen <= int64(d.watchdogTimeout) {
			continue
		}

		record, exists := findGhostRecordBySocket(key)
		if !exists {
			// ignore if info file doesn't exist
			// this is possible with legacy VMIs that haven't
			// been updated. The watchdog file will catch these.
			continue
		}

		domain := api.NewMinimalDomainWithNS(record.Namespace, record.Name)
		domain.ObjectMeta.UID = record.UID
		domain.Spec.Metadata.KubeVirt.UID = record.UID
		deletionTime := k8sv1.Now()
		domain.ObjectMeta.DeletionTimestamp = &deletionTime
		log.Log.Object(domain).Warningf("detected unresponsive virt-launcher command socket (%s) for domain", key)
		d.eventChan <- watch.Event{Type: watch.Modified, Object: domain}

		if err := cmdclient.MarkSocketUnresponsive(key); err != nil {
			log.Log.Reason(err).Errorf("Unable to mark vmi as unresponsive socket %s", key)
		}
	}

	return nil
}

// listAllKnownDomains collects one domain per listed command socket.
//
// For a socket whose file no longer exists, a minimal domain carrying a
// deletion timestamp is synthesized from its ghost record, if one is found.
// For a socket that does exist, the domain is fetched through the cmd client.
// Sockets that cannot be accessed, connected to, or queried are logged and
// skipped.
//
// It returns an error only when the socket list cannot be obtained.
func (d *DomainWatcher) listAllKnownDomains() ([]*api.Domain, error) {
	var domains []*api.Domain

	socketFiles, err := listSockets()
	if err != nil {
		return nil, err
	}
	for _, socketFile := range socketFiles {

		socketExists, err := diskutils.FileExists(socketFile)
		if err != nil {
			log.Log.Reason(err).Error("failed access cmd client socket")
			continue
		}

		if !socketExists {
			record, recordExists := findGhostRecordBySocket(socketFile)
			if recordExists {
				domain := api.NewMinimalDomainWithNS(record.Namespace, record.Name)
				domain.ObjectMeta.UID = record.UID
				now := k8sv1.Now()
				domain.ObjectMeta.DeletionTimestamp = &now
				log.Log.Object(domain).Warning("detected stale domain from ghost record")
				domains = append(domains, domain)
			}
			continue
		}

		log.Log.V(3).Infof("List domains from sock %s", socketFile)
		client, err := cmdclient.NewClient(socketFile)
		if err != nil {
			log.Log.Reason(err).Error("failed to connect to cmd client socket")
			// Ignore failure to connect to client.
			// These are all local connections via unix socket.
			// A failure to connect means there's nothing on the other
			// end listening.
			continue
		}

		domain, domainExists, err := client.GetDomain()
		// Close immediately rather than deferring: a defer inside this loop
		// would keep every client open until the whole listing returns.
		client.Close()
		if err != nil {
			log.Log.Reason(err).Error("failed to list domains on cmd client socket")
			// Failure to get domain list means that client
			// was unable to contact libvirt. As soon as the connection
			// is restored on the client's end, a domain notification will
			// be sent.
			continue
		}
		if domainExists {
			domains = append(domains, domain)
		}
	}
	return domains, nil
}

// List makes sure the background watcher is running and then returns every
// currently known domain as an *api.DomainList. Items is never nil, even when
// no domain is known. The list options are ignored.
func (d *DomainWatcher) List(_ k8sv1.ListOptions) (runtime.Object, error) {
	log.Log.V(3).Info("Synchronizing domains")

	if err := d.startBackground(); err != nil {
		return nil, err
	}

	known, err := d.listAllKnownDomains()
	if err != nil {
		return nil, err
	}

	items := make([]api.Domain, 0, len(known))
	for _, dom := range known {
		items = append(items, *dom)
	}
	return &api.DomainList{Items: items}, nil
}

// Watch hands out the DomainWatcher itself as the watch.Interface; events are
// then consumed through ResultChan. The list options are ignored and the
// returned error is always nil.
func (d *DomainWatcher) Watch(_ k8sv1.ListOptions) (watch.Interface, error) {
	var self watch.Interface = d
	return self, nil
}

// Stop shuts the background watcher down; it does nothing when the watcher is
// not running. The stop channel is closed first, then the background
// goroutine is awaited, and only after that is the event channel closed.
func (d *DomainWatcher) Stop() {
	d.lock.Lock()
	defer d.lock.Unlock()

	if d.backgroundWatcherStarted {
		close(d.stopChan)
		d.wg.Wait()
		d.backgroundWatcherStarted = false
		close(d.eventChan)
	}
}

// ResultChan exposes the watcher's event channel as receive-only.
func (d *DomainWatcher) ResultChan() <-chan watch.Event {
	events := d.eventChan
	return events
}

// NewSharedInformer creates a shared informer for api.Domain objects whose
// list and watch calls are served by the ListerWatcher built by
// newListWatchFromNotify. It passes 0 as the informer's own resync argument;
// the resyncPeriod parameter is forwarded to the list-watcher instead.
func NewSharedInformer(virtShareDir string, watchdogTimeout int, recorder record.EventRecorder, vmiStore cache.Store, resyncPeriod time.Duration) (cache.SharedInformer, error) {
func NewSharedInformer(virtShareDir string, watchdogTimeout int, recorder record.EventRecorder, vmiStore cache.Store, resyncPeriod time.Duration) cache.SharedInformer {
lw := newListWatchFromNotify(virtShareDir, watchdogTimeout, recorder, vmiStore, resyncPeriod)
informer := cache.NewSharedInformer(lw, &api.Domain{}, 0)
return informer, nil
return cache.NewSharedInformer(lw, &api.Domain{}, 0)
}
Loading

0 comments on commit 084b1b0

Please sign in to comment.