Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming snapshot checkpoint in Tsavorite #824

Merged
merged 20 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
updates
  • Loading branch information
badrishc committed Nov 23, 2024
commit 20e07633752cacb718a5067ff5f0ca601448fcb1
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,10 @@ private protected bool ScanLookup<TInput, TOutput, TScanFunctions, TScanIterator
}

// Update the cursor to point to the next record.
cursor = iter.NextAddress;
if (scanCursorState.retryLastRecord)
cursor = iter.CurrentAddress;
else
cursor = iter.NextAddress;

// Now see if we completed the enumeration.
if (scanCursorState.stop)
Expand Down Expand Up @@ -288,6 +291,8 @@ internal Status ConditionalScanPush<TInput, TOutput, TContext, TSessionFunctions
Interlocked.Increment(ref scanCursorState.acceptedCount);
if ((cursorRecordResult & CursorRecordResult.EndBatch) != 0)
scanCursorState.endBatch = true;
if ((cursorRecordResult & CursorRecordResult.RetryLastRecord) != 0)
scanCursorState.retryLastRecord = true;
}
internalStatus = OperationStatus.SUCCESS;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@ public enum CursorRecordResult
/// <summary>
/// End the current cursor batch (as if "count" had been met); return a valid cursor for the next ScanCursor call
/// </summary>
EndBatch = 4
EndBatch = 4,

/// <summary>
/// Retry the last record when returning a valid cursor
/// </summary>
RetryLastRecord = 8,
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@ namespace Tsavorite.core
internal sealed class ScanCursorState<TKey, TValue>
{
internal IScanIteratorFunctions<TKey, TValue> functions;
internal long acceptedCount; // Number of records pushed to and accepted by the caller
internal bool endBatch; // End the batch (but return a valid cursor for the next batch, as of "count" records had been returned)
internal bool stop; // Stop the operation (as if all records in the db had been returned)
internal long acceptedCount; // Number of records pushed to and accepted by the caller
internal bool endBatch; // End the batch (but return a valid cursor for the next batch, as if "count" records had been returned)
internal bool retryLastRecord; // Retry the last record when returning a valid cursor
internal bool stop; // Stop the operation (as if all records in the db had been returned)

internal void Initialize(IScanIteratorFunctions<TKey, TValue> scanIteratorFunctions)
{
functions = scanIteratorFunctions;
acceptedCount = 0;
endBatch = false;
retryLastRecord = false;
stop = false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ public override SystemState NextState(SystemState start)
result.Phase = Phase.WAIT_FLUSH;
break;
case Phase.WAIT_FLUSH:
result.Phase = Phase.PERSISTENCE_CALLBACK;
break;
case Phase.PERSISTENCE_CALLBACK:
result.Phase = Phase.REST;
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

namespace Tsavorite.core
{

/// <summary>
/// A Streaming Snapshot persists a version by yielding a stream of key-value pairs that correspond to
/// a consistent snapshot of the database, for the old version (v). Unlike Snapshot, StreamingSnapshot
Expand All @@ -29,76 +30,23 @@ public override void GlobalBeforeEnteringState(SystemState next, TsavoriteKV<TKe
base.GlobalBeforeEnteringState(next, store);
store._hybridLogCheckpointToken = Guid.NewGuid();
store._lastSnapshotCheckpoint.Dispose();
_ = Task.Run(store.StreamingSnapshotScanPhase1);
break;
case Phase.PREPARE:
store.InitializeHybridLogCheckpoint(store._hybridLogCheckpointToken, next.Version);
base.GlobalBeforeEnteringState(next, store);
store._hybridLogCheckpoint.info.useSnapshotFile = 1;
break;
case Phase.WAIT_FLUSH:
base.GlobalBeforeEnteringState(next, store);
store._hybridLogCheckpoint.info.finalLogicalAddress = store.hlogBase.GetTailAddress();
store._hybridLogCheckpoint.info.snapshotFinalLogicalAddress = store._hybridLogCheckpoint.info.finalLogicalAddress;

store._hybridLogCheckpoint.snapshotFileDevice =
store.checkpointManager.GetSnapshotLogDevice(store._hybridLogCheckpointToken);
store._hybridLogCheckpoint.snapshotFileObjectLogDevice =
store.checkpointManager.GetSnapshotObjectLogDevice(store._hybridLogCheckpointToken);
store._hybridLogCheckpoint.snapshotFileDevice.Initialize(store.hlogBase.GetSegmentSize());
store._hybridLogCheckpoint.snapshotFileObjectLogDevice.Initialize(-1);

// If we are using a NullDevice then storage tier is not enabled and FlushedUntilAddress may be ReadOnlyAddress; get all records in memory.
store._hybridLogCheckpoint.info.snapshotStartFlushedLogicalAddress = store.hlogBase.IsNullDevice ? store.hlogBase.HeadAddress : store.hlogBase.FlushedUntilAddress;

long startPage = store.hlogBase.GetPage(store._hybridLogCheckpoint.info.snapshotStartFlushedLogicalAddress);
long endPage = store.hlogBase.GetPage(store._hybridLogCheckpoint.info.finalLogicalAddress);
if (store._hybridLogCheckpoint.info.finalLogicalAddress >
store.hlog.GetStartLogicalAddress(endPage))
{
endPage++;
}

// We are writing pages outside epoch protection, so callee should be able to
// handle corrupted or unexpected concurrent page changes during the flush, e.g., by
// resuming epoch protection if necessary. Correctness is not affected as we will
// only read safe pages during recovery.
store.hlogBase.AsyncFlushPagesToDevice(
startPage,
endPage,
store._hybridLogCheckpoint.info.finalLogicalAddress,
store._hybridLogCheckpoint.info.startLogicalAddress,
store._hybridLogCheckpoint.snapshotFileDevice,
store._hybridLogCheckpoint.snapshotFileObjectLogDevice,
out store._hybridLogCheckpoint.flushedSemaphore,
store.ThrottleCheckpointFlushDelayMs);
break;
case Phase.PERSISTENCE_CALLBACK:
// Set actual FlushedUntil to the latest possible data in main log that is on disk
// If we are using a NullDevice then storage tier is not enabled and FlushedUntilAddress may be ReadOnlyAddress; get all records in memory.
store._hybridLogCheckpoint.info.flushedLogicalAddress = store.hlogBase.IsNullDevice ? store.hlogBase.HeadAddress : store.hlogBase.FlushedUntilAddress;
base.GlobalBeforeEnteringState(next, store);
store._lastSnapshotCheckpoint = store._hybridLogCheckpoint.Transfer();
var finalLogicalAddress = store.hlogBase.GetTailAddress();
Task.Run(() => store.StreamingSnapshotScanPhase2(finalLogicalAddress));
break;
default:
base.GlobalBeforeEnteringState(next, store);
break;
}
}

public override void GlobalAfterEnteringState(SystemState next, TsavoriteKV<TKey, TValue, TStoreFunctions, TAllocator> store)
{
switch (next.Phase)
{
case Phase.PREP_STREAMING_SNAPSHOT_CHECKPOINT:
base.GlobalAfterEnteringState(next, store);
// TODO: spawn task that scans and yields until ReadOnlyAddress
// TODO: Then calls store.GlobalStateMachineStep(next);
break;
default:
base.GlobalAfterEnteringState(next, store);
break;
}
}

/// <inheritdoc />
public override void OnThreadState<TInput, TOutput, TContext, TSessionFunctionsWrapper>(
SystemState current,
Expand All @@ -112,7 +60,7 @@ public override void OnThreadState<TInput, TOutput, TContext, TSessionFunctionsW

if (current.Phase != Phase.WAIT_FLUSH) return;

if (ctx is null || !ctx.prevCtx.markers[EpochPhaseIdx.WaitFlush])
if (ctx is null)
{
var s = store._hybridLogCheckpoint.flushedSemaphore;

Expand All @@ -124,16 +72,7 @@ public override void OnThreadState<TInput, TOutput, TContext, TSessionFunctionsW
Debug.Assert(s != null);
valueTasks.Add(new ValueTask(s.WaitAsync(token).ContinueWith(t => s.Release())));
}

if (!notify) return;

if (ctx is not null)
ctx.prevCtx.markers[EpochPhaseIdx.WaitFlush] = true;
}

store.epoch.Mark(EpochPhaseIdx.WaitFlush, current.Version);
if (store.epoch.CheckIsComplete(EpochPhaseIdx.WaitFlush, current.Version))
store.GlobalStateMachineStep(current);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;
using System.Diagnostics;

namespace Tsavorite.core
{
public partial class TsavoriteKV<TKey, TValue, TStoreFunctions, TAllocator> : TsavoriteBase
where TStoreFunctions : IStoreFunctions<TKey, TValue>
where TAllocator : IAllocator<TKey, TValue, TStoreFunctions>
{
IScanIteratorFunctions<TKey, TValue> streamingSnapshotScanIteratorFunctions;
long scannedUntilAddressCursor;

struct ScanPhase1Functions : IScanIteratorFunctions<TKey, TValue>
{
readonly IScanIteratorFunctions<TKey, TValue> userScanIteratorFunctions;

public ScanPhase1Functions(IScanIteratorFunctions<TKey, TValue> userScanIteratorFunctions)
{
this.userScanIteratorFunctions = userScanIteratorFunctions;
}

/// <inheritdoc />
public bool SingleReader(ref TKey key, ref TValue value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
=> userScanIteratorFunctions.SingleReader(ref key, ref value, recordMetadata, numberOfRecords, out cursorRecordResult);

/// <inheritdoc />
public bool ConcurrentReader(ref TKey key, ref TValue value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
{
cursorRecordResult = CursorRecordResult.EndBatch | CursorRecordResult.RetryLastRecord;
return false;
}

/// <inheritdoc />
public void OnException(Exception exception, long numberOfRecords)
=> userScanIteratorFunctions.OnException(exception, numberOfRecords);

/// <inheritdoc />
public bool OnStart(long beginAddress, long endAddress)
=> userScanIteratorFunctions.OnStart(beginAddress, long.MaxValue);

/// <inheritdoc />
public void OnStop(bool completed, long numberOfRecords) { }
}

public void StreamingSnapshotScanPhase1()
{
try
{
Debug.Assert(systemState.Phase == Phase.PREP_STREAMING_SNAPSHOT_CHECKPOINT);

// Iterate all the read-only records in the store
scannedUntilAddressCursor = 0;
var scanFunctions = new ScanPhase1Functions(streamingSnapshotScanIteratorFunctions);
using var s = NewSession<Empty, Empty, Empty, SessionFunctionsBase<TKey, TValue, Empty, Empty, Empty>>(default);
_ = s.ScanCursor(ref scannedUntilAddressCursor, long.MaxValue, scanFunctions);
}
finally
{
Debug.Assert(systemState.Phase == Phase.PREP_STREAMING_SNAPSHOT_CHECKPOINT);
GlobalStateMachineStep(systemState);
}
}

struct ScanPhase2Functions : IScanIteratorFunctions<TKey, TValue>
{
readonly IScanIteratorFunctions<TKey, TValue> userScanIteratorFunctions;

public ScanPhase2Functions(IScanIteratorFunctions<TKey, TValue> userScanIteratorFunctions)
{
this.userScanIteratorFunctions = userScanIteratorFunctions;
}

/// <inheritdoc />
public bool SingleReader(ref TKey key, ref TValue value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
=> userScanIteratorFunctions.SingleReader(ref key, ref value, recordMetadata, numberOfRecords, out cursorRecordResult);

/// <inheritdoc />
public bool ConcurrentReader(ref TKey key, ref TValue value, RecordMetadata recordMetadata, long numberOfRecords, out CursorRecordResult cursorRecordResult)
{
// NOTE: SingleReader is invoked here, because records in (v) are read-only during the WAIT_FLUSH phase
return userScanIteratorFunctions.SingleReader(ref key, ref value, recordMetadata, numberOfRecords, out cursorRecordResult);
}

/// <inheritdoc />
public void OnException(Exception exception, long numberOfRecords)
=> userScanIteratorFunctions.OnException(exception, numberOfRecords);

/// <inheritdoc />
public bool OnStart(long beginAddress, long endAddress) => true;

/// <inheritdoc />
public void OnStop(bool completed, long numberOfRecords)
=> userScanIteratorFunctions.OnStop(completed, numberOfRecords);
}

public void StreamingSnapshotScanPhase2(long untilAddress)
{
try
{
Debug.Assert(systemState.Phase == Phase.WAIT_FLUSH);

// Iterate all the (v) records in the store
var scanFunctions = new ScanPhase2Functions(streamingSnapshotScanIteratorFunctions);
using var s = NewSession<Empty, Empty, Empty, SessionFunctionsBase<TKey, TValue, Empty, Empty, Empty>>(default);

// TODO: This requires ScanCursor to provide a consistent snapshot considering only records up to untilAddress
// There is a bug in the current implementation of ScanCursor, where it does not provide such a consistent snapshot
_ = s.ScanCursor(ref scannedUntilAddressCursor, long.MaxValue, scanFunctions, untilAddress);

// Reset the cursor to 0
scannedUntilAddressCursor = 0;

// Reset the callback functions
streamingSnapshotScanIteratorFunctions = null;

// Release the semaphore to allow the checkpoint waiting task to proceed
_hybridLogCheckpoint.flushedSemaphore.Release();
}
finally
{
Debug.Assert(systemState.Phase == Phase.WAIT_FLUSH);
GlobalStateMachineStep(systemState);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public TsavoriteKV(KVSettings<TKey, TValue> kvSettings, TStoreFunctions storeFun
/// fail if we are already taking a checkpoint or performing some other
/// operation such as growing the index). Use CompleteCheckpointAsync to wait completion.
/// </returns>
public bool TryInitiateFullCheckpoint(out Guid token, CheckpointType checkpointType, long targetVersion = -1)
public bool TryInitiateFullCheckpoint(out Guid token, CheckpointType checkpointType, long targetVersion = -1, IScanIteratorFunctions<TKey, TValue> streamingSnapshotScanIteratorFunctions = null)
{
token = default;
bool result;
Expand All @@ -209,6 +209,7 @@ public bool TryInitiateFullCheckpoint(out Guid token, CheckpointType checkpointT
}
else if (checkpointType == CheckpointType.StreamingSnapshot)
{
this.streamingSnapshotScanIteratorFunctions = streamingSnapshotScanIteratorFunctions;
result = StartStateMachine(new StreamingSnapshotCheckpointStateMachine<TKey, TValue, TStoreFunctions, TAllocator>(targetVersion));
}
else
Expand Down