Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix FirewallRules concurrency issues using a status known LastTransiti… #124

Merged
merged 3 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions api/v1alpha1/firewallrule_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ type FirewallRuleStatus struct {
// The latest FirewallRule specification applied, used to make API requests to cloud providers only if the resource has been changed to avoid throttling issues.
LastApplied *string `json:"lastApplied,omitempty"`

// lastTransitionTime is the last time the status transitioned from one status to another.
// +kubebuilder:validation:Required
// +kubebuilder:validation:Type=string
// +kubebuilder:validation:Format=date-time
LastTransitionTime metav1.Time `json:"lastTransitionTime"`

// The firewall rule identifier
FirewallRuleID *string `json:"firewallRuleID,omitempty"`

Expand Down
1 change: 1 addition & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions config/crd/bases/kubestatic.quortex.io_firewallrules.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,19 @@ spec:
make API requests to cloud providers only if the resource has been
changed to avoid throttling issues.
type: string
lastTransitionTime:
description: lastTransitionTime is the last time the status transitioned
from one status to another.
format: date-time
type: string
networkInterfaceID:
description: The network interface identifier
type: string
state:
description: The current state of the FirewallRule
type: string
required:
- lastTransitionTime
type: object
type: object
served: true
Expand Down
102 changes: 87 additions & 15 deletions controllers/firewallrule_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/google/uuid"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
Expand All @@ -47,9 +48,10 @@ const (
// FirewallRuleReconciler reconciles a FirewallRule object
type FirewallRuleReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
Provider provider.Provider
Log logr.Logger
Scheme *runtime.Scheme
Provider provider.Provider
frLastTransitionTime map[string]metav1.Time
}

//+kubebuilder:rbac:groups=kubestatic.quortex.io,resources=firewallrules,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -82,6 +84,26 @@ func (r *FirewallRuleReconciler) Reconcile(ctx context.Context, req ctrl.Request
return ctrl.Result{}, nil
}

// LastTransitionTime is not set. This is expected when kubestatic is
// upgraded from a version that does not support this field; in that case
// we set it to the current time.
if firewallRule.Status.LastTransitionTime.IsZero() {
firewallRule.Status.LastTransitionTime = metav1.Now()
if err := r.Status().Update(ctx, firewallRule); err != nil {
log.Error(err, "Failed to update FirewallRule state", "firewallRule", firewallRule.Name)
return ctrl.Result{}, err
}
r.frLastTransitionTime[firewallRule.Name] = firewallRule.Status.LastTransitionTime
}

// Check LastTransitionTime consistency; requeue if it is inconsistent.
knownLastTransitionTime := r.frLastTransitionTime[firewallRule.Name]
if firewallRule.Status.LastTransitionTime.Before(&knownLastTransitionTime) {
log.V(1).Info("FirewallRule LastTransitionTime inconsistency, requeuing in 1 second")
return ctrl.Result{RequeueAfter: time.Second}, nil
}
r.frLastTransitionTime[firewallRule.Name] = firewallRule.Status.LastTransitionTime

// Lifecycle reconciliation
if firewallRule.ObjectMeta.DeletionTimestamp.IsZero() {
return r.reconcileFirewallRule(ctx, log, firewallRule)
Expand Down Expand Up @@ -120,9 +142,18 @@ func (r *FirewallRuleReconciler) reconcileFirewallRule(ctx context.Context, log

// Check for other rules associated to the node.
// If there is already one, we update the group of rules, if not, we create a new group.
rulesAssociated := v1alpha1.FilterFirewallRules(frs.Items, func(fr v1alpha1.FirewallRule) bool {
return fr.Name != rule.Name && fr.Status.State != v1alpha1.FirewallRuleStateNone
})
rulesAssociated := []v1alpha1.FirewallRule{}
for _, fr := range frs.Items {
knownLastTransitionTime := r.frLastTransitionTime[fr.Name]
if !fr.Status.LastTransitionTime.Equal(&knownLastTransitionTime) {
log.V(1).Info("FirewallRule LastTransitionTime inconsistency, requeuing in 1 second", "firewallRuleName", fr.Name)
return ctrl.Result{RequeueAfter: time.Second}, nil
}
if fr.Name != rule.Name && fr.Status.State != v1alpha1.FirewallRuleStateNone {
rulesAssociated = append(rulesAssociated, fr)
}
}

if len(rulesAssociated) > 0 {
firewallRuleID := helper.StringValue(rulesAssociated[0].Status.FirewallRuleID)
log.V(1).Info("Updating FirewallRule group", "firewallRuleID", firewallRuleID)
Expand Down Expand Up @@ -156,6 +187,7 @@ func (r *FirewallRuleReconciler) reconcileFirewallRule(ctx context.Context, log
log.Info("Created firewall rule", "id", id)

// Update status
rule.Status.LastTransitionTime = metav1.Now()
rule.Status.State = v1alpha1.FirewallRuleStateReserved
rule.Status.FirewallRuleID = &id
lastApplied, err := json.Marshal(rule.Spec)
Expand All @@ -164,7 +196,13 @@ func (r *FirewallRuleReconciler) reconcileFirewallRule(ctx context.Context, log
}
rule.Status.LastApplied = helper.StringPointerOrNil(string(lastApplied))
log.V(1).Info("Updating FirewallRule", "state", rule.Status.State, "firewallRuleID", rule.Status.FirewallRuleID)
return ctrl.Result{RequeueAfter: time.Second * 5}, r.Status().Update(ctx, rule)
if err = r.Status().Update(ctx, rule); err != nil {
log.Error(err, "Failed to update FirewallRule status", "firewallRule", rule.Name, "status", rule.Status.State)
return ctrl.Result{}, err
}
r.frLastTransitionTime[rule.Name] = rule.Status.LastTransitionTime
return ctrl.Result{RequeueAfter: time.Second * 5}, nil

} else if rule.Spec.NodeName != nil {
lastApplied := &v1alpha1.FirewallRuleSpec{}
if err := json.Unmarshal([]byte(helper.StringValue(rule.Status.LastApplied)), lastApplied); err != nil {
Expand All @@ -185,8 +223,17 @@ func (r *FirewallRuleReconciler) reconcileFirewallRule(ctx context.Context, log
log.Error(err, "Unable to list FirewallRules")
return ctrl.Result{}, err
}
rules := []v1alpha1.FirewallRule{}
for _, fr := range frs.Items {
knownLastTransitionTime := r.frLastTransitionTime[fr.Name]
if !fr.Status.LastTransitionTime.Equal(&knownLastTransitionTime) {
log.V(1).Info("FirewallRule LastTransitionTime inconsistency, requeuing in 1 second", "firewallRuleName", fr.Name)
return ctrl.Result{RequeueAfter: time.Second}, nil
}
rules = append(rules, fr)
}
log.V(1).Info("Updating FirewallRule group", "firewallRuleID", firewallRuleID)
_, err = r.Provider.UpdateFirewallRuleGroup(ctx, encodeUpdateFirewallRuleGroupRequest(firewallRuleID, frs.Items))
_, err = r.Provider.UpdateFirewallRuleGroup(ctx, encodeUpdateFirewallRuleGroupRequest(firewallRuleID, rules))
} else {
log.V(1).Info("Updating FirewallRule", "firewallRuleID", firewallRuleID)
_, err = r.Provider.UpdateFirewallRule(ctx, encodeUpdateFirewallRuleRequest(firewallRuleID, rule))
Expand All @@ -203,9 +250,15 @@ func (r *FirewallRuleReconciler) reconcileFirewallRule(ctx context.Context, log
if err != nil {
return ctrl.Result{}, fmt.Errorf("Failed to marshal last applied firewallrule: %w", err)
}
rule.Status.LastTransitionTime = metav1.Now()
rule.Status.LastApplied = helper.StringPointerOrNil(string(lastApplied))
log.V(1).Info("Updating FirewallRule", "state", rule.Status.State, "firewallRuleID", rule.Status.FirewallRuleID)
return ctrl.Result{RequeueAfter: time.Second * 5}, r.Status().Update(ctx, rule)
if err = r.Status().Update(ctx, rule); err != nil {
log.Error(err, "Failed to update FirewallRule status", "firewallRule", rule.Name, "status", rule.Status.State)
return ctrl.Result{}, err
}
r.frLastTransitionTime[rule.Name] = rule.Status.LastTransitionTime
return ctrl.Result{RequeueAfter: time.Second * 5}, nil
}
}

Expand Down Expand Up @@ -262,11 +315,17 @@ func (r *FirewallRuleReconciler) reconcileFirewallRule(ctx context.Context, log
log.Info("Associated firewall rule", "firewallRuleID", *rule.Status.FirewallRuleID, "instanceID", instanceID, "networkInterfaceID", networkInterface.NetworkInterfaceID)

// Update status
rule.Status.LastTransitionTime = metav1.Now()
rule.Status.State = v1alpha1.FirewallRuleStateAssociated
rule.Status.InstanceID = &instanceID
rule.Status.NetworkInterfaceID = &networkInterface.NetworkInterfaceID
log.V(1).Info("Updating FirewallRule", "state", rule.Status.State, "instanceID", rule.Status.InstanceID, "networkInterfaceID", rule.Status.NetworkInterfaceID)
return ctrl.Result{RequeueAfter: time.Second * 5}, r.Status().Update(ctx, rule)
if err = r.Status().Update(ctx, rule); err != nil {
log.Error(err, "Failed to update FirewallRule status", "firewallRule", rule.Name, "status", rule.Status.State)
return ctrl.Result{}, err
}
r.frLastTransitionTime[rule.Name] = rule.Status.LastTransitionTime
return ctrl.Result{RequeueAfter: time.Second * 5}, nil
}

// FirewallRule reliability check
Expand All @@ -283,12 +342,14 @@ func (r *FirewallRuleReconciler) reconcileFirewallRule(ctx context.Context, log
log.Info("Node not found. Set state back to Reserved", "nodeName", rule.Spec.NodeName)

// Set status back to Reserved
rule.Status.LastTransitionTime = metav1.Now()
rule.Status.State = v1alpha1.FirewallRuleStateReserved
log.V(1).Info("Updating FirewallRule", "state", rule.Status.State, "InstanceID", rule.Status.InstanceID)
if err = r.Status().Update(ctx, rule); err != nil {
log.Error(err, "Failed to update FirewallRule status", "firewallRule", rule.Name, "status", rule.Status.State)
return ctrl.Result{}, err
}
r.frLastTransitionTime[rule.Name] = rule.Status.LastTransitionTime

rule.Spec.NodeName = nil
return ctrl.Result{}, r.Update(ctx, rule)
Expand Down Expand Up @@ -344,7 +405,7 @@ func (r *FirewallRuleReconciler) clearFirewallRule(ctx context.Context, log logr

toDelete := false
if r.Provider.HasGroupedFirewallRules() {
// List FirewallRules with identical nodeName
// List FirewallRules
frs := &v1alpha1.FirewallRuleList{}
if err := r.List(ctx, frs); err != nil {
log.Error(err, "Unable to list FirewallRules")
Expand All @@ -353,9 +414,17 @@ func (r *FirewallRuleReconciler) clearFirewallRule(ctx context.Context, log logr

// Check for other rules associated to the node.
// If there are others, we only update the group of rules; if not, we also disassociate the group.
rules := v1alpha1.FilterFirewallRules(frs.Items, func(fr v1alpha1.FirewallRule) bool {
return fr.Name != rule.Name && helper.StringValue(fr.Status.FirewallRuleID) == helper.StringValue(rule.Status.FirewallRuleID)
})
rules := []v1alpha1.FirewallRule{}
for _, fr := range frs.Items {
knownLastTransitionTime := r.frLastTransitionTime[fr.Name]
if !fr.Status.LastTransitionTime.Equal(&knownLastTransitionTime) {
log.V(1).Info("FirewallRule LastTransitionTime inconsistency, requeuing in 1 second", "firewallRuleName", fr.Name)
return ctrl.Result{RequeueAfter: time.Second}, nil
}
if fr.Name != rule.Name && helper.StringValue(fr.Status.FirewallRuleID) == helper.StringValue(rule.Status.FirewallRuleID) {
rules = append(rules, fr)
}
}
if len(rules) > 0 {
log.V(1).Info("Updating FirewallRule", "firewallRuleID", firewallRuleID)
if _, err := r.Provider.UpdateFirewallRuleGroup(ctx, encodeUpdateFirewallRuleGroupRequest(firewallRuleID, rules)); err != nil {
Expand Down Expand Up @@ -403,12 +472,13 @@ func (r *FirewallRuleReconciler) clearFirewallRule(ctx context.Context, log logr
}

// Update status
rule.Status = v1alpha1.FirewallRuleStatus{State: v1alpha1.FirewallRuleStateNone}
rule.Status = v1alpha1.FirewallRuleStatus{State: v1alpha1.FirewallRuleStateNone, LastTransitionTime: metav1.Now()}
log.V(1).Info("Updating FirewallRule", "state", rule.Status.State)
if err := r.Status().Update(ctx, rule); err != nil {
log.Error(err, "Failed to update FirewallRule state", "firewallRule", rule.Name)
return ctrl.Result{}, err
}
r.frLastTransitionTime[rule.Name] = rule.Status.LastTransitionTime

return ctrl.Result{RequeueAfter: time.Second * 5}, nil
}
Expand All @@ -423,6 +493,8 @@ func (r *FirewallRuleReconciler) SetupWithManager(mgr ctrl.Manager) error {
return err
}

r.frLastTransitionTime = make(map[string]metav1.Time)

return ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.FirewallRule{}).
Complete(r)
Expand Down
4 changes: 2 additions & 2 deletions helm/kubestatic/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.10.1
version: 0.11.0

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: 0.10.1
appVersion: 0.11.0
2 changes: 1 addition & 1 deletion helm/kubestatic/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# kubestatic

![Version: 0.10.1](https://img.shields.io/badge/Version-0.10.1-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: 0.10.1](https://img.shields.io/badge/AppVersion-0.10.1-informational?style=flat-square)
![Version: 0.11.0](https://img.shields.io/badge/Version-0.11.0-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: 0.11.0](https://img.shields.io/badge/AppVersion-0.11.0-informational?style=flat-square)

An operator to manage the lifecycle of public cloud providers resources needed to expose endpoints on public nodes.

Expand Down
Loading
Loading