Skip to content

Commit

Permalink
fixes to properly handle map pointers to ensure we're working on the …
Browse files Browse the repository at this point in the history
…right data
  • Loading branch information
Gilbert Mena committed Dec 29, 2023
1 parent 933ada8 commit e37d25f
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 27 deletions.
50 changes: 41 additions & 9 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ func CreateStateMachineTableIfNotExists(db *sql.DB, stateMachineName string) err
UpdatedTimestamp TIMESTAMP,
UsesGlobalLock BOOLEAN,
UsesLocalLock BOOLEAN,
UnlockTimestamp TIMESTAMP NULL,
UnlockedTimestamp TIMESTAMP NULL,
LastRetryTimestamp TIMESTAMP NULL,
INDEX (LookupKey)
-- Add other columns as needed
);`, normalizeTableName(stateMachineName))
Expand Down Expand Up @@ -176,9 +177,24 @@ func obtainLocalLock(tx *sql.Tx, sm *StateMachine) error {
usesLocalLock = true
}

// Convert zero time.Time to nil for SQL insertion
var unlockedTimestamp interface{}
if sm.UnlockedTimestamp.IsZero() {
unlockedTimestamp = nil
} else {
unlockedTimestamp = sm.UnlockedTimestamp.UTC()
}

var lastRetry interface{}
if sm.LastRetry.IsZero() {
lastRetry = nil
} else {
lastRetry = sm.LastRetry.UTC()
}

// Insert a new local lock record into the state machine's table with the custom lookup key.
_, err = tx.Exec(fmt.Sprintf("INSERT INTO %s (ID, CurrentState, LookupKey, ResumeFromStep, SaveAfterStep, KafkaEventTopic, SerializedState, CreatedTimestamp, UpdatedTimestamp, UsesGlobalLock, UsesLocalLock, UnlockTimestamp) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL);", normalizeTableName(sm.Name)),
sm.ID, sm.CurrentState, sm.LookupKey, sm.ResumeFromStep, sm.SaveAfterEachStep, sm.KafkaEventTopic, string(serializedState), sm.CreatedTimestamp, sm.UpdatedTimestamp, usesGlobalLock, usesLocalLock)
_, err = tx.Exec(fmt.Sprintf("INSERT INTO %s (ID, CurrentState, LookupKey, ResumeFromStep, SaveAfterStep, KafkaEventTopic, SerializedState, CreatedTimestamp, UpdatedTimestamp, UsesGlobalLock, UnlockedTimestamp, LastRetryTimestamp, UsesLocalLock, UnlockTimestamp) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL);", normalizeTableName(sm.Name)),
sm.ID, sm.CurrentState, sm.LookupKey, sm.ResumeFromStep, sm.SaveAfterEachStep, sm.KafkaEventTopic, string(serializedState), sm.CreatedTimestamp, sm.UpdatedTimestamp, unlockedTimestamp, lastRetry, usesGlobalLock, usesLocalLock)

if err != nil {
return err
Expand All @@ -191,8 +207,8 @@ func obtainLocalLock(tx *sql.Tx, sm *StateMachine) error {
func insertStateMachine(sm *StateMachine) error {
tableName := normalizeTableName(sm.Name)
insertSQL := fmt.Sprintf(`
INSERT INTO %s (ID, CurrentState, LookupKey, ResumeFromStep, SaveAfterStep, KafkaEventTopic, SerializedState, CreatedTimestamp, UpdatedTimestamp, UsesGlobalLock, UsesLocalLock)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);`, tableName)
INSERT INTO %s (ID, CurrentState, LookupKey, ResumeFromStep, SaveAfterStep, KafkaEventTopic, SerializedState, CreatedTimestamp, UpdatedTimestamp, UnlockedTimestamp, LastRetryTimestamp, UsesGlobalLock, UsesLocalLock)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);`, tableName)

serializedState, err := sm.serializeToJSON()
if err != nil {
Expand All @@ -207,8 +223,23 @@ func insertStateMachine(sm *StateMachine) error {
usesLocalLock = true
}

// Convert zero time.Time to nil for SQL insertion
var unlockedTimestamp interface{}
if sm.UnlockedTimestamp == nil {
unlockedTimestamp = nil
} else {
unlockedTimestamp = sm.UnlockedTimestamp.UTC()
}

var lastRetry interface{}
if sm.LastRetry == nil {
lastRetry = nil
} else {
lastRetry = sm.LastRetry.UTC()
}

// Execute the SQL statement within the transaction
_, err = sm.DB.Exec(insertSQL, sm.ID, sm.CurrentState, sm.LookupKey, sm.ResumeFromStep, sm.SaveAfterEachStep, sm.KafkaEventTopic, string(serializedState), sm.CreatedTimestamp.UTC(), sm.UpdatedTimestamp.UTC(), usesGlobalLock, usesLocalLock)
_, err = sm.DB.Exec(insertSQL, sm.ID, sm.CurrentState, sm.LookupKey, sm.ResumeFromStep, sm.SaveAfterEachStep, sm.KafkaEventTopic, string(serializedState), sm.CreatedTimestamp.UTC(), sm.UpdatedTimestamp.UTC(), unlockedTimestamp, lastRetry, usesGlobalLock, usesLocalLock)

return err
}
Expand Down Expand Up @@ -296,10 +327,11 @@ func loadStateMachineWithNoLock(sm *StateMachine) (*StateMachine, error) {
}

if unlockedTimestampStr.Valid {
sm.UnlockedTimestamp, err = parseTimestamp(unlockedTimestampStr.String)
unlockedTimestamp, err := parseTimestamp(unlockedTimestampStr.String)
if err != nil {
return nil, err
}
sm.UnlockedTimestamp = &unlockedTimestamp
}
}

Expand Down Expand Up @@ -347,11 +379,11 @@ func loadStateMachine(tx *sql.Tx, sm *StateMachine) (*StateMachine, error) {
}

if unlockedTimestampStr.Valid {
loadedSM.UnlockedTimestamp, err = parseTimestamp(unlockedTimestampStr.String)
unlockedTimestamp, err := parseTimestamp(unlockedTimestampStr.String)
if err != nil {
tx.Rollback()
return nil, err
}
loadedSM.UnlockedTimestamp = &unlockedTimestamp
}

loadedSM.SerializedState = serializedState
Expand Down
23 changes: 12 additions & 11 deletions finitemachine.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,33 +55,33 @@ func DetermineTerminalStateEvent(smCtx *Context) (executionEvent Event) {
}

// DetermineExecutionAction determines the execution action based on the input state.
func DetermineExecutionAction(smCtx *Context) (executionEvent Event, err error) {
func DetermineExecutionAction(inputArbitraryData map[string]interface{}, smCtx *Context) (executionEvent Event, err error) {
if IsTerminalState(smCtx.InputState) {
return DetermineTerminalStateEvent(smCtx), nil
}

var outputData map[string]interface{}
switch smCtx.InputState {
case StateParked:
// If parked, try to restart execution based on previous state
lastState := getLastNonParkedState(smCtx.TransitionHistory)
return restartExecutionFromState(lastState, smCtx)

case StatePending, StateOpen:
executionEvent, _, err = smCtx.Handler.ExecuteForward(smCtx.InputArbitraryData, smCtx.TransitionHistory)
executionEvent, outputData, err = smCtx.Handler.ExecuteForward(inputArbitraryData, smCtx.TransitionHistory)

case StatePaused:
executionEvent, _, err = smCtx.Handler.ExecutePause(smCtx.InputArbitraryData, smCtx.TransitionHistory)
executionEvent, outputData, err = smCtx.Handler.ExecutePause(inputArbitraryData, smCtx.TransitionHistory)

case StateRollback:
executionEvent, _, err = smCtx.Handler.ExecuteBackward(smCtx.InputArbitraryData, smCtx.TransitionHistory)
executionEvent, outputData, err = smCtx.Handler.ExecuteBackward(inputArbitraryData, smCtx.TransitionHistory)

case StateResume:
executionEvent, _, err = smCtx.Handler.ExecuteResume(smCtx.InputArbitraryData, smCtx.TransitionHistory)
executionEvent, outputData, err = smCtx.Handler.ExecuteResume(inputArbitraryData, smCtx.TransitionHistory)

default: // not sure what happened so let's park it
executionEvent = OnParked
}

smCtx.OutputArbitraryData = outputData
return executionEvent, err
}

Expand Down Expand Up @@ -118,17 +118,18 @@ func restartExecutionFromState(state State, smCtx *Context) (Event, error) {

func (smCtx *Context) Handle() (executionEvent Event, err error) {

inputArbitraryData := CopyMap(smCtx.InputArbitraryData)
outputArbitraryData := CopyMap(smCtx.InputArbitraryData)
inputArbitraryData := CopyMap(smCtx.InputArbitraryData) // this will end up with the same value as out
outputArbitraryData := inputArbitraryData

defer func() {
err = smCtx.finishHandlingContext(executionEvent, inputArbitraryData, outputArbitraryData)
err = smCtx.finishHandlingContext(executionEvent, smCtx.InputArbitraryData, outputArbitraryData)
if err != nil {
executionEvent = OnError
}
}()

executionEvent, err = DetermineExecutionAction(smCtx)
executionEvent, err = DetermineExecutionAction(inputArbitraryData, smCtx)
outputArbitraryData = CopyMap(smCtx.OutputArbitraryData)

return executionEvent, err
}
Expand Down
11 changes: 4 additions & 7 deletions statemachine.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,13 @@ type StateMachine struct {
CurrentArbitraryData map[string]interface{} `json:"currentArbitraryData"` // Additional field for storing arbitrary data
CreatedTimestamp time.Time `json:"createdTimestamp"`
UpdatedTimestamp time.Time `json:"updatedTimestamp"`
UnlockedTimestamp time.Time `json:"unlockedTimestamp"`
UnlockedTimestamp *time.Time `json:"unlockedTimestamp"`
LockType LockType `json:"lockType"`
RetryCount int `json:"retryCount"`
RetryType RetryType `json:"retryType"`
MaxTimeout time.Duration `json:"maxTimeout"`
BaseDelay time.Duration `json:"baseDelay"`
LastRetry time.Time `json:"lastRetry"`
LastRetry *time.Time `json:"lastRetry"`
SerializedState []byte `json:"-"`
}

Expand Down Expand Up @@ -404,10 +404,6 @@ func (sm *StateMachine) processStateMachine(context *Context) error {
sm.CurrentArbitraryData = make(map[string]interface{})
}

if sm.Handlers == nil || len(sm.Handlers) == 0 {
return fmt.Errorf("no handlers found")
}

var handler Handler
// Let's check if this is a success and we are done
if sm.ResumeFromStep >= len(sm.Handlers) || sm.ResumeFromStep < 0 {
Expand Down Expand Up @@ -521,7 +517,8 @@ func (sm *StateMachine) handleTransition(context *Context, event Event) error {
if newState == sm.CurrentState {
sm.RetryCount++
}
sm.LastRetry = time.Now()
lastRetry := time.Now()
sm.LastRetry = &lastRetry
remainingDelay = sm.GetRemainingDelay()
shouldRetry = true

Expand Down

0 comments on commit e37d25f

Please sign in to comment.