Skip to content

Commit

Permalink
feat(cstor-volume-mgmt): add replica scale down support (openebs-arch…
Browse files Browse the repository at this point in the history
…ive#1499)

Signed-off-by: mittachaitu <sai.chaithanya@mayadata.io>
  • Loading branch information
sai chaithanya authored and shubham14bajpai committed Dec 27, 2019
1 parent bd2b95d commit b24a8a0
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 49 deletions.
3 changes: 2 additions & 1 deletion cmd/cstor-pool-mgmt/volumereplica/volumereplica.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ package volumereplica

import (
"fmt"
"github.com/openebs/maya/pkg/alertlog"
"os"
"strings"
"time"

"github.com/openebs/maya/pkg/alertlog"

"encoding/json"

apis "github.com/openebs/maya/pkg/apis/openebs.io/v1alpha1"
Expand Down
139 changes: 124 additions & 15 deletions cmd/cstor-volume-mgmt/controller/volume-controller/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,11 +480,38 @@ func (c *CStorVolumeController) addResizeConditions(
return updatedCVObj, nil
}

// updateCStorVolumeDRF updates desiredReplicationFactor in memory istgt
// configuration on success updates istgt.conf file
// updateCStorVolumeDRF updates volume configurations to changes made to
// cStorVolume CR.
// If no.of entries in cStorvolume spec.ReplicaDetails.KnownReplicas is less
// than entries in status.ReplicaDetails.knownReplicas then the changes are
// identified for scaledown
// process.
// If no.of entries in cStorVolume spec.ReplicaDetails.KnownReplicas is equal to
// no.of entries in stauts.ReplicaDetails.KnownReplicas then the changes are
// identified for scaleup process(Only one process should be triggered either
// scaleup or scaledown).
// NOTE: No.of entries in spec.KnownReplicas and status.knownReplicas will vary
// only in the scaledown process else always it will be same
func (c *CStorVolumeController) updateCStorVolumeDRF(
cStorVolume *apis.CStorVolume) {
err := volume.ExecuteDesiredReplicationFactorCommand(cStorVolume)
// make changes to copyCV instead of cStorVolume
copyCV := cStorVolume.DeepCopy()
var err error
specKnownReplicaCount := len(copyCV.Spec.ReplicaDetails.KnownReplicas)
statusKnownReplicaCount := len(copyCV.Status.ReplicaDetails.KnownReplicas)
if specKnownReplicaCount < statusKnownReplicaCount {
copyCV, err = c.triggerScaleDownProcess(copyCV)
} else if specKnownReplicaCount == statusKnownReplicaCount {
copyCV, err = c.triggerScaleUpProcess(copyCV)
} else {
// entries in spec.ReplicaDetails.KnownReplicas can't be greater than
// status.ReplicaDetails.KnownReplicas(unkown changes)
err = pkg_errors.Errorf("unkown changes are made to cStorVolume no.of "+
"entries in spec.ReplicaDetails.knownReplicas are %d and no.of entries "+
"in status.ReplicaDetails.KnownReplicas are %d",
specKnownReplicaCount,
statusKnownReplicaCount)
}
if err != nil {
c.recorder.Event(cStorVolume,
corev1.EventTypeWarning,
Expand All @@ -493,6 +520,27 @@ func (c *CStorVolumeController) updateCStorVolumeDRF(
" Error: %v", cStorVolume.Spec.DesiredReplicationFactor, err,
),
)
return
}
cStorVolume = copyCV
c.recorder.Event(cStorVolume,
corev1.EventTypeNormal,
string(common.SuccessUpdated),
fmt.Sprintf("Successfully updated the desiredReplicationFactor to %d",
cStorVolume.Spec.DesiredReplicationFactor),
)
}

// triggerScaleUpProcess returns error in case of any error occurred during scaleup
// process else it will return cstorvolume object
func (c *CStorVolumeController) triggerScaleUpProcess(
cStorVolume *apis.CStorVolume) (*apis.CStorVolume, error) {
err := volume.ExecuteDesiredReplicationFactorCommand(
cStorVolume,
volume.GetScaleUpCommand,
)
if err != nil {
return nil, err
}
fileOperator := util.RealFileOperator{}
updatedConfig := fmt.Sprintf("%s %d",
Expand All @@ -506,20 +554,81 @@ func (c *CStorVolumeController) updateCStorVolumeDRF(
)
cstorvolume_v1alpha1.ConfFileMutex.Unlock()
if err != nil {
c.recorder.Event(cStorVolume,
corev1.EventTypeWarning,
string(common.FailureUpdate),
fmt.Sprintf("failed to update confile with desired replication factor %d"+
" Error: %v", cStorVolume.Spec.DesiredReplicationFactor, err,
),
)
return nil, pkg_errors.Wrapf(err, "failed to update conf file with "+
"desiredReplicationFactor %d", cStorVolume.Spec.DesiredReplicationFactor)
}
c.recorder.Event(cStorVolume,
corev1.EventTypeNormal,
string(common.SuccessUpdated),
fmt.Sprintf("Successfully updated the desiredReplicationFactor to %d",
cStorVolume.Spec.DesiredReplicationFactor),
return cStorVolume, nil
}

// triggerScaleDownProcess returns error in case of any error during the scaledown
// process else it will return cstorvolume object. Following steps are executed
// 1. Verify whether all the replicas are healthy other than removing replica.
// 2. If step1 is passed then update the istgt.conf file with latest
// information.
// 3. Update cStorVolume CR(replicationFactor, ConsistencyFactor and known
// replica list.
// 4. Trigger istgtcontrol command (istgtcontrol drf <vol_name> <value>
// <remaining_replica_list>
// In triggerScaleDownProcess cStorVolumeAPI is used to access fields in CV
// object and cStorvolume is used to access methods of cStorVolume
func (c *CStorVolumeController) triggerScaleDownProcess(
cStorVolumeAPI *apis.CStorVolume) (*apis.CStorVolume, error) {
volumeStatus, err := volume.GetVolumeStatus(cStorVolumeAPI)
if err != nil {
return nil, err
}
if common.CStorVolumeStatus(volumeStatus.Status) != common.CVStatusHealthy {
return nil, pkg_errors.Errorf("cStorvolume is not in healthy to trigger scaledown")
}
cStorVolume := cstorvolume_v1alpha1.NewForAPIObject(cStorVolumeAPI)
if !cStorVolume.AreSpecReplicasHealthy(volumeStatus) {
return nil, pkg_errors.Errorf("spec replicas are not in healthy state")
}
replicaID := cStorVolume.GetRemovingReplicaID()
if replicaID == "" {
return nil, pkg_errors.Errorf("removing replica is not present in volume status")
}
if cStorVolumeAPI.Spec.DesiredReplicationFactor < cStorVolumeAPI.Spec.ReplicationFactor {
configData := cStorVolume.BuildScaleDownConfigData(replicaID)
fileOperator := util.RealFileOperator{}
cstorvolume_v1alpha1.ConfFileMutex.Lock()
err = fileOperator.UpdateOrAppendMultipleLines(
cstorvolume_v1alpha1.IstgtConfPath,
configData,
0644)
cstorvolume_v1alpha1.ConfFileMutex.Unlock()
if err != nil {
return nil, pkg_errors.Wrapf(err, "failed to update istgt conf file %v", configData)
}
cStorVolumeAPI.Spec.ReplicationFactor = cStorVolumeAPI.Spec.DesiredReplicationFactor
cStorVolumeAPI.Spec.ConsistencyFactor = (cStorVolumeAPI.Spec.ReplicationFactor)/2 + 1
cStorVolumeAPI, err = c.clientset.
OpenebsV1alpha1().
CStorVolumes(cStorVolumeAPI.Namespace).
Update(cStorVolumeAPI)
if err != nil {
return nil, pkg_errors.Wrapf(err, "failed to update cstorvolume")
}
}
err = volume.ExecuteDesiredReplicationFactorCommand(
cStorVolumeAPI,
volume.GetScaleDownCommand,
)
if err != nil {
return nil, err
}
cStorVolumeAPI.Status.ReplicaDetails.KnownReplicas =
cStorVolumeAPI.Spec.ReplicaDetails.KnownReplicas
cStorVolumeAPI, err = c.clientset.
OpenebsV1alpha1().
CStorVolumes(cStorVolumeAPI.Namespace).
Update(cStorVolumeAPI)
if err != nil {
return nil, pkg_errors.Wrapf(err,
"failed to update cstorvolume status with scaledown replica information",
)
}
return cStorVolumeAPI, nil
}

// resizeCStorVolume resize the cstorvolume and if any error occurs updates the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,9 @@ func NewCStorVolumeController(
} else if IsDestroyEvent(newCStorVolume) {
q.Operation = common.QOpDestroy
klog.Infof("cStorVolume Destroy event : %v, %v", newCStorVolume.ObjectMeta.Name, string(newCStorVolume.ObjectMeta.UID))
controller.recorder.Event(newCStorVolume, corev1.EventTypeNormal, string(common.SuccessSynced), string(common.MessageDestroySynced))
} else {
q.Operation = common.QOpModify
klog.Infof("cStorVolume Modify event : %v, %v", newCStorVolume.ObjectMeta.Name, string(newCStorVolume.ObjectMeta.UID))
controller.recorder.Event(newCStorVolume, corev1.EventTypeNormal, string(common.SuccessSynced), string(common.MessageModifySynced))
}
controller.enqueueCStorVolume(newCStorVolume, q)
},
Expand Down
38 changes: 23 additions & 15 deletions cmd/cstor-volume-mgmt/volume/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,11 +281,13 @@ func ResizeTargetVolume(cStorVolume *apis.CStorVolume) error {
return nil
}

//ExecuteDesiredReplicationFactorCommand executes istgtcontrol command to update
//desired replication factor
func ExecuteDesiredReplicationFactorCommand(cStorVolume *apis.CStorVolume) error {
// ExecuteDesiredReplicationFactorCommand executes istgtcontrol command to update
// desired replication factor
func ExecuteDesiredReplicationFactorCommand(
cStorVolume *apis.CStorVolume,
getDRFCmd func(*apis.CStorVolume) string) error {
// send desiredReplicationFactor command to istgt and read the response
drfCmd := getDRFCommand(cStorVolume)
drfCmd := getDRFCmd(cStorVolume)
sockResp, err := UnixSockVar.SendCommand(drfCmd)
if err != nil {
return errors.Wrapf(
Expand All @@ -307,16 +309,29 @@ func ExecuteDesiredReplicationFactorCommand(cStorVolume *apis.CStorVolume) error
return nil
}

//getResizeCommand will return data required to execute istgtcontrol drf
//command
//Ex command: drf <vol_name> <value>
func getDRFCommand(cstorVolume *apis.CStorVolume) string {
// GetScaleUpCommand will return data required to execute istgtcontrol drf
// command
// Ex command: drf <vol_name> <value>
func GetScaleUpCommand(cstorVolume *apis.CStorVolume) string {
return fmt.Sprintf("%s %s %d", util.IstgtDRFCmd,
cstorVolume.Name,
cstorVolume.Spec.DesiredReplicationFactor,
)
}

// GetScaleDownCommand return replica scale down command
// Ex command: drf <vol_name> <value> <known replica list>
func GetScaleDownCommand(cStorVolume *apis.CStorVolume) string {
cmd := fmt.Sprintf("%s %s %d ", util.IstgtDRFCmd,
cStorVolume.Name,
cStorVolume.Spec.DesiredReplicationFactor,
)
for repID := range cStorVolume.Spec.ReplicaDetails.KnownReplicas {
cmd = cmd + fmt.Sprintf("%s ", repID)
}
return cmd
}

// getResizeCommand returns resize used to resize volumes
// Ex command for resize: Resize volname 10G 10 30
func getResizeCommand(cstorVolume *apis.CStorVolume) string {
Expand Down Expand Up @@ -349,13 +364,6 @@ func CheckValidVolume(cStorVolume *apis.CStorVolume) error {
if cStorVolume.Spec.DesiredReplicationFactor == 0 {
return fmt.Errorf("DesiredReplicationFactor cannot be zero")
}
if cStorVolume.Spec.DesiredReplicationFactor < cStorVolume.Spec.ReplicationFactor {
return fmt.Errorf("DesiredReplicationFactor %d cannot be less "+
"than ReplicationFactor %d",
cStorVolume.Spec.DesiredReplicationFactor,
cStorVolume.Spec.ReplicationFactor,
)
}
}
if cStorVolume.Spec.ReplicationFactor == 0 {
return fmt.Errorf("replicationFactor cannot be zero")
Expand Down
2 changes: 1 addition & 1 deletion cmd/cstor-volume-mgmt/volume/volume_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func TestCheckValidVolume(t *testing.T) {
},
},
"invalid desiredreplicationfactor/replicationfactor": {
expectedError: true,
expectedError: false,
test: &apis.CStorVolume{
ObjectMeta: v1.ObjectMeta{
Name: "testvol1",
Expand Down
63 changes: 61 additions & 2 deletions pkg/cstor/volume/v1alpha1/cstorvolume.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,15 @@ func (c *CStorVolume) IsDRFPending() bool {
)
return false
}
drfStringValue := fmt.Sprintf("%d", c.object.Spec.DesiredReplicationFactor)
drfStringValue := fmt.Sprintf(" %d", c.object.Spec.DesiredReplicationFactor)
// gotConfig will have " DesiredReplicationFactor 3" and we will extract
// numeric character from output
return !strings.HasSuffix(gotConfig, drfStringValue)
if !strings.HasSuffix(gotConfig, drfStringValue) {
return true
}
// reconciliation check for replica scaledown scenarion
return (len(c.object.Spec.ReplicaDetails.KnownReplicas) <
len(c.object.Status.ReplicaDetails.KnownReplicas))
}

// GetCVCondition returns corresponding cstorvolume condition based argument passed
Expand All @@ -215,6 +220,60 @@ func (c *CStorVolume) IsConditionPresent(condType apis.CStorVolumeConditionType)
return false
}

// AreSpecReplicasHealthy return true if all the spec replicas are in Healthy
// state else return false
func (c *CStorVolume) AreSpecReplicasHealthy(volStatus *apis.CVStatus) bool {
var isReplicaExist bool
var replicaInfo apis.ReplicaStatus
for _, replicaValue := range c.object.Spec.ReplicaDetails.KnownReplicas {
isReplicaExist = false
for _, replicaInfo = range volStatus.ReplicaStatuses {
if replicaInfo.ID == replicaValue {
isReplicaExist = true
break
}
}
if (isReplicaExist && replicaInfo.Mode != "Healthy") || !isReplicaExist {
return false
}
}
return true
}

// GetRemovingReplicaID return replicaID that present in status but not in spec
func (c *CStorVolume) GetRemovingReplicaID() string {
for repID := range c.object.Status.ReplicaDetails.KnownReplicas {
// If known replica is not exist in spec but if it exist in status then
// user/operator selected that replica for removal
if _, isReplicaExist :=
c.object.Spec.ReplicaDetails.KnownReplicas[repID]; !isReplicaExist {
return string(repID)
}
}
return ""
}

// BuildScaleDownConfigData build data based on replica that needs to remove
func (c *CStorVolume) BuildScaleDownConfigData(repID string) map[string]string {
configData := map[string]string{}
newReplicationFactor := c.object.Spec.DesiredReplicationFactor
newConsistencyFactor := (newReplicationFactor / 2) + 1
key := fmt.Sprintf(" ReplicationFactor")
value := fmt.Sprintf(" ReplicationFactor %d", newReplicationFactor)
configData[key] = value
key = fmt.Sprintf(" ConsistencyFactor")
value = fmt.Sprintf(" ConsistencyFactor %d", newConsistencyFactor)
configData[key] = value
key = fmt.Sprintf(" DesiredReplicationFactor")
value = fmt.Sprintf(" DesiredReplicationFactor %d",
c.object.Spec.DesiredReplicationFactor)
configData[key] = value
key = fmt.Sprintf(" Replica %s", repID)
value = fmt.Sprintf("")
configData[key] = value
return configData
}

// PredicateList holds a list of cstor volume
// based predicates
type PredicateList []Predicate
Expand Down
9 changes: 8 additions & 1 deletion pkg/util/fileoperator.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ func (r RealFileOperator) Updatefile(fileName, updatedVal, searchString string,
func (r RealFileOperator) UpdateOrAppendMultipleLines(fileName string,
keyUpdateValue map[string]string, perm os.FileMode) error {
var newLines []string
var index int
var line string
buffer, err := ioutil.ReadFile(filepath.Clean(fileName))
if err != nil {
return errors.Wrapf(err, "failed to read %s file", fileName)
Expand All @@ -105,13 +107,18 @@ func (r RealFileOperator) UpdateOrAppendMultipleLines(fileName string,
// will be doing after current blockers
for key, updatedValue := range keyUpdateValue {
found := false
for index, line := range newLines {
for index, line = range newLines {
if strings.HasPrefix(line, key) {
newLines[index] = updatedValue
found = true
break
}
}
// To remove particular line that matched with key
if found && updatedValue == "" {
newLines = append(newLines[:index], newLines[index+1:]...)
continue
}
if found == false {
newLines = append(newLines, updatedValue)
}
Expand Down
Loading

0 comments on commit b24a8a0

Please sign in to comment.