Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cstor-volume-mgmt): add replica scale down support #1499

Merged
merged 4 commits into from
Nov 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/cstor-pool-mgmt/volumereplica/volumereplica.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ package volumereplica

import (
"fmt"
"github.com/openebs/maya/pkg/alertlog"
"os"
"strings"
"time"

"github.com/openebs/maya/pkg/alertlog"

"encoding/json"

apis "github.com/openebs/maya/pkg/apis/openebs.io/v1alpha1"
Expand Down
139 changes: 124 additions & 15 deletions cmd/cstor-volume-mgmt/controller/volume-controller/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,11 +480,38 @@ func (c *CStorVolumeController) addResizeConditions(
return updatedCVObj, nil
}

// updateCStorVolumeDRF updates desiredReplicationFactor in memory istgt
// configuration on success updates istgt.conf file
// updateCStorVolumeDRF updates volume configurations to changes made to
// cStorVolume CR.
// If no.of entries in cStorvolume spec.ReplicaDetails.KnownReplicas is less
// than entries in status.ReplicaDetails.knownReplicas then the changes are
// identified for scaledown
// process.
// If no.of entries in cStorVolume spec.ReplicaDetails.KnownReplicas is equal to
// no.of entries in stauts.ReplicaDetails.KnownReplicas then the changes are
// identified for scaleup process(Only one process should be triggered either
// scaleup or scaledown).
// NOTE: No.of entries in spec.KnownReplicas and status.knownReplicas will vary
// only in the scaledown process else always it will be same
func (c *CStorVolumeController) updateCStorVolumeDRF(
cStorVolume *apis.CStorVolume) {
err := volume.ExecuteDesiredReplicationFactorCommand(cStorVolume)
// make changes to copyCV instead of cStorVolume
copyCV := cStorVolume.DeepCopy()
var err error
specKnownReplicaCount := len(copyCV.Spec.ReplicaDetails.KnownReplicas)
statusKnownReplicaCount := len(copyCV.Status.ReplicaDetails.KnownReplicas)
if specKnownReplicaCount < statusKnownReplicaCount {
copyCV, err = c.triggerScaleDownProcess(copyCV)
} else if specKnownReplicaCount == statusKnownReplicaCount {
copyCV, err = c.triggerScaleUpProcess(copyCV)
} else {
// entries in spec.ReplicaDetails.KnownReplicas can't be greater than
// status.ReplicaDetails.KnownReplicas(unkown changes)
err = pkg_errors.Errorf("unkown changes are made to cStorVolume no.of "+
"entries in spec.ReplicaDetails.knownReplicas are %d and no.of entries "+
"in status.ReplicaDetails.KnownReplicas are %d",
specKnownReplicaCount,
statusKnownReplicaCount)
}
if err != nil {
c.recorder.Event(cStorVolume,
corev1.EventTypeWarning,
Expand All @@ -493,6 +520,27 @@ func (c *CStorVolumeController) updateCStorVolumeDRF(
" Error: %v", cStorVolume.Spec.DesiredReplicationFactor, err,
),
)
return
}
cStorVolume = copyCV
c.recorder.Event(cStorVolume,
corev1.EventTypeNormal,
string(common.SuccessUpdated),
fmt.Sprintf("Successfully updated the desiredReplicationFactor to %d",
cStorVolume.Spec.DesiredReplicationFactor),
)
}

// triggerScaleUpProcess returns error in case of any error occurred during scaleup
// process else it will return cstorvolume object
func (c *CStorVolumeController) triggerScaleUpProcess(
cStorVolume *apis.CStorVolume) (*apis.CStorVolume, error) {
err := volume.ExecuteDesiredReplicationFactorCommand(
cStorVolume,
volume.GetScaleUpCommand,
)
if err != nil {
return nil, err
}
fileOperator := util.RealFileOperator{}
updatedConfig := fmt.Sprintf("%s %d",
Expand All @@ -506,20 +554,81 @@ func (c *CStorVolumeController) updateCStorVolumeDRF(
)
cstorvolume_v1alpha1.ConfFileMutex.Unlock()
if err != nil {
c.recorder.Event(cStorVolume,
corev1.EventTypeWarning,
string(common.FailureUpdate),
fmt.Sprintf("failed to update confile with desired replication factor %d"+
" Error: %v", cStorVolume.Spec.DesiredReplicationFactor, err,
),
)
return nil, pkg_errors.Wrapf(err, "failed to update conf file with "+
"desiredReplicationFactor %d", cStorVolume.Spec.DesiredReplicationFactor)
}
c.recorder.Event(cStorVolume,
corev1.EventTypeNormal,
string(common.SuccessUpdated),
fmt.Sprintf("Successfully updated the desiredReplicationFactor to %d",
cStorVolume.Spec.DesiredReplicationFactor),
return cStorVolume, nil
}

// triggerScaleDownProcess returns error in case of any error during the scaledown
// process else it will return cstorvolume object. Following steps are executed
// 1. Verify whether all the replicas are healthy other than removing replica.
// 2. If step1 is passed then update the istgt.conf file with latest
// information.
// 3. Update cStorVolume CR(replicationFactor, ConsistencyFactor and known
// replica list.
// 4. Trigger istgtcontrol command (istgtcontrol drf <vol_name> <value>
// <remaining_replica_list>
// In triggerScaleDownProcess cStorVolumeAPI is used to access fields in CV
// object and cStorvolume is used to access methods of cStorVolume
func (c *CStorVolumeController) triggerScaleDownProcess(
cStorVolumeAPI *apis.CStorVolume) (*apis.CStorVolume, error) {
volumeStatus, err := volume.GetVolumeStatus(cStorVolumeAPI)
if err != nil {
return nil, err
}
if common.CStorVolumeStatus(volumeStatus.Status) != common.CVStatusHealthy {
return nil, pkg_errors.Errorf("cStorvolume is not in healthy to trigger scaledown")
}
cStorVolume := cstorvolume_v1alpha1.NewForAPIObject(cStorVolumeAPI)
if !cStorVolume.AreSpecReplicasHealthy(volumeStatus) {
return nil, pkg_errors.Errorf("spec replicas are not in healthy state")
}
replicaID := cStorVolume.GetRemovingReplicaID()
if replicaID == "" {
return nil, pkg_errors.Errorf("removing replica is not present in volume status")
}
if cStorVolumeAPI.Spec.DesiredReplicationFactor < cStorVolumeAPI.Spec.ReplicationFactor {
configData := cStorVolume.BuildScaleDownConfigData(replicaID)
fileOperator := util.RealFileOperator{}
cstorvolume_v1alpha1.ConfFileMutex.Lock()
err = fileOperator.UpdateOrAppendMultipleLines(
cstorvolume_v1alpha1.IstgtConfPath,
configData,
0644)
cstorvolume_v1alpha1.ConfFileMutex.Unlock()
if err != nil {
return nil, pkg_errors.Wrapf(err, "failed to update istgt conf file %v", configData)
}
cStorVolumeAPI.Spec.ReplicationFactor = cStorVolumeAPI.Spec.DesiredReplicationFactor
cStorVolumeAPI.Spec.ConsistencyFactor = (cStorVolumeAPI.Spec.ReplicationFactor)/2 + 1
cStorVolumeAPI, err = c.clientset.
OpenebsV1alpha1().
CStorVolumes(cStorVolumeAPI.Namespace).
Update(cStorVolumeAPI)
if err != nil {
return nil, pkg_errors.Wrapf(err, "failed to update cstorvolume")
}
}
err = volume.ExecuteDesiredReplicationFactorCommand(
cStorVolumeAPI,
volume.GetScaleDownCommand,
)
if err != nil {
return nil, err
}
cStorVolumeAPI.Status.ReplicaDetails.KnownReplicas =
cStorVolumeAPI.Spec.ReplicaDetails.KnownReplicas
cStorVolumeAPI, err = c.clientset.
OpenebsV1alpha1().
CStorVolumes(cStorVolumeAPI.Namespace).
Update(cStorVolumeAPI)
if err != nil {
return nil, pkg_errors.Wrapf(err,
"failed to update cstorvolume status with scaledown replica information",
)
}
return cStorVolumeAPI, nil
}

// resizeCStorVolume resize the cstorvolume and if any error occurs updates the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,9 @@ func NewCStorVolumeController(
} else if IsDestroyEvent(newCStorVolume) {
q.Operation = common.QOpDestroy
klog.Infof("cStorVolume Destroy event : %v, %v", newCStorVolume.ObjectMeta.Name, string(newCStorVolume.ObjectMeta.UID))
controller.recorder.Event(newCStorVolume, corev1.EventTypeNormal, string(common.SuccessSynced), string(common.MessageDestroySynced))
} else {
q.Operation = common.QOpModify
klog.Infof("cStorVolume Modify event : %v, %v", newCStorVolume.ObjectMeta.Name, string(newCStorVolume.ObjectMeta.UID))
controller.recorder.Event(newCStorVolume, corev1.EventTypeNormal, string(common.SuccessSynced), string(common.MessageModifySynced))
}
controller.enqueueCStorVolume(newCStorVolume, q)
},
Expand Down
38 changes: 23 additions & 15 deletions cmd/cstor-volume-mgmt/volume/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,11 +281,13 @@ func ResizeTargetVolume(cStorVolume *apis.CStorVolume) error {
return nil
}

//ExecuteDesiredReplicationFactorCommand executes istgtcontrol command to update
//desired replication factor
func ExecuteDesiredReplicationFactorCommand(cStorVolume *apis.CStorVolume) error {
// ExecuteDesiredReplicationFactorCommand executes istgtcontrol command to update
// desired replication factor
func ExecuteDesiredReplicationFactorCommand(
cStorVolume *apis.CStorVolume,
getDRFCmd func(*apis.CStorVolume) string) error {
// send desiredReplicationFactor command to istgt and read the response
drfCmd := getDRFCommand(cStorVolume)
drfCmd := getDRFCmd(cStorVolume)
sockResp, err := UnixSockVar.SendCommand(drfCmd)
if err != nil {
return errors.Wrapf(
Expand All @@ -307,16 +309,29 @@ func ExecuteDesiredReplicationFactorCommand(cStorVolume *apis.CStorVolume) error
return nil
}

//getResizeCommand will return data required to execute istgtcontrol drf
//command
//Ex command: drf <vol_name> <value>
func getDRFCommand(cstorVolume *apis.CStorVolume) string {
// GetScaleUpCommand will return data required to execute istgtcontrol drf
// command
// Ex command: drf <vol_name> <value>
func GetScaleUpCommand(cstorVolume *apis.CStorVolume) string {
return fmt.Sprintf("%s %s %d", util.IstgtDRFCmd,
cstorVolume.Name,
cstorVolume.Spec.DesiredReplicationFactor,
)
}

// GetScaleDownCommand return replica scale down command
// Ex command: drf <vol_name> <value> <known replica list>
func GetScaleDownCommand(cStorVolume *apis.CStorVolume) string {
cmd := fmt.Sprintf("%s %s %d ", util.IstgtDRFCmd,
cStorVolume.Name,
cStorVolume.Spec.DesiredReplicationFactor,
)
for repID := range cStorVolume.Spec.ReplicaDetails.KnownReplicas {
cmd = cmd + fmt.Sprintf("%s ", repID)
}
return cmd
}

// getResizeCommand returns resize used to resize volumes
// Ex command for resize: Resize volname 10G 10 30
func getResizeCommand(cstorVolume *apis.CStorVolume) string {
Expand Down Expand Up @@ -349,13 +364,6 @@ func CheckValidVolume(cStorVolume *apis.CStorVolume) error {
if cStorVolume.Spec.DesiredReplicationFactor == 0 {
return fmt.Errorf("DesiredReplicationFactor cannot be zero")
}
if cStorVolume.Spec.DesiredReplicationFactor < cStorVolume.Spec.ReplicationFactor {
return fmt.Errorf("DesiredReplicationFactor %d cannot be less "+
"than ReplicationFactor %d",
cStorVolume.Spec.DesiredReplicationFactor,
cStorVolume.Spec.ReplicationFactor,
)
}
}
if cStorVolume.Spec.ReplicationFactor == 0 {
return fmt.Errorf("replicationFactor cannot be zero")
Expand Down
2 changes: 1 addition & 1 deletion cmd/cstor-volume-mgmt/volume/volume_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func TestCheckValidVolume(t *testing.T) {
},
},
"invalid desiredreplicationfactor/replicationfactor": {
expectedError: true,
expectedError: false,
test: &apis.CStorVolume{
ObjectMeta: v1.ObjectMeta{
Name: "testvol1",
Expand Down
63 changes: 61 additions & 2 deletions pkg/cstor/volume/v1alpha1/cstorvolume.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,15 @@ func (c *CStorVolume) IsDRFPending() bool {
)
return false
}
drfStringValue := fmt.Sprintf("%d", c.object.Spec.DesiredReplicationFactor)
drfStringValue := fmt.Sprintf(" %d", c.object.Spec.DesiredReplicationFactor)
// gotConfig will have " DesiredReplicationFactor 3" and we will extract
// numeric character from output
return !strings.HasSuffix(gotConfig, drfStringValue)
if !strings.HasSuffix(gotConfig, drfStringValue) {
return true
}
// reconciliation check for replica scaledown scenarion
return (len(c.object.Spec.ReplicaDetails.KnownReplicas) <
len(c.object.Status.ReplicaDetails.KnownReplicas))
}

// GetCVCondition returns corresponding cstorvolume condition based argument passed
Expand All @@ -215,6 +220,60 @@ func (c *CStorVolume) IsConditionPresent(condType apis.CStorVolumeConditionType)
return false
}

// AreSpecReplicasHealthy return true if all the spec replicas are in Healthy
// state else return false
func (c *CStorVolume) AreSpecReplicasHealthy(volStatus *apis.CVStatus) bool {
var isReplicaExist bool
var replicaInfo apis.ReplicaStatus
for _, replicaValue := range c.object.Spec.ReplicaDetails.KnownReplicas {
isReplicaExist = false
for _, replicaInfo = range volStatus.ReplicaStatuses {
if replicaInfo.ID == replicaValue {
isReplicaExist = true
break
}
}
if (isReplicaExist && replicaInfo.Mode != "Healthy") || !isReplicaExist {
return false
}
}
return true
}

// GetRemovingReplicaID return replicaID that present in status but not in spec
func (c *CStorVolume) GetRemovingReplicaID() string {
for repID := range c.object.Status.ReplicaDetails.KnownReplicas {
// If known replica is not exist in spec but if it exist in status then
// user/operator selected that replica for removal
if _, isReplicaExist :=
c.object.Spec.ReplicaDetails.KnownReplicas[repID]; !isReplicaExist {
return string(repID)
}
}
return ""
}

// BuildScaleDownConfigData build data based on replica that needs to remove
func (c *CStorVolume) BuildScaleDownConfigData(repID string) map[string]string {
configData := map[string]string{}
newReplicationFactor := c.object.Spec.DesiredReplicationFactor
newConsistencyFactor := (newReplicationFactor / 2) + 1
key := fmt.Sprintf(" ReplicationFactor")
value := fmt.Sprintf(" ReplicationFactor %d", newReplicationFactor)
configData[key] = value
key = fmt.Sprintf(" ConsistencyFactor")
value = fmt.Sprintf(" ConsistencyFactor %d", newConsistencyFactor)
configData[key] = value
key = fmt.Sprintf(" DesiredReplicationFactor")
value = fmt.Sprintf(" DesiredReplicationFactor %d",
c.object.Spec.DesiredReplicationFactor)
configData[key] = value
key = fmt.Sprintf(" Replica %s", repID)
value = fmt.Sprintf("")
configData[key] = value
mittachaitu marked this conversation as resolved.
Show resolved Hide resolved
return configData
}

// PredicateList holds a list of cstor volume
// based predicates
type PredicateList []Predicate
Expand Down
9 changes: 8 additions & 1 deletion pkg/util/fileoperator.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ func (r RealFileOperator) Updatefile(fileName, updatedVal, searchString string,
func (r RealFileOperator) UpdateOrAppendMultipleLines(fileName string,
keyUpdateValue map[string]string, perm os.FileMode) error {
var newLines []string
var index int
var line string
buffer, err := ioutil.ReadFile(filepath.Clean(fileName))
if err != nil {
return errors.Wrapf(err, "failed to read %s file", fileName)
Expand All @@ -105,13 +107,18 @@ func (r RealFileOperator) UpdateOrAppendMultipleLines(fileName string,
// will be doing after current blockers
for key, updatedValue := range keyUpdateValue {
found := false
for index, line := range newLines {
for index, line = range newLines {
if strings.HasPrefix(line, key) {
newLines[index] = updatedValue
found = true
break
}
}
// To remove particular line that matched with key
if found && updatedValue == "" {
newLines = append(newLines[:index], newLines[index+1:]...)
continue
}
if found == false {
newLines = append(newLines, updatedValue)
}
Expand Down
Loading