Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming snapshot checkpoint in Tsavorite #824

Merged
merged 20 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 24 additions & 12 deletions libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorScan.cs
Original file line number Diff line number Diff line change
Expand Up @@ -186,25 +186,28 @@ internal unsafe bool GetFromDiskAndPushToReader<TScanFunctions>(ref TKey key, re
/// <remarks>Currently we load an entire page, which while inefficient in performance, allows us to make the cursor safe (by ensuring we align to a valid record) if it is not
/// the last one returned. We could optimize this to load only the subset of a page that is pointed to by the cursor and do GetRequiredRecordSize/RetrievedFullRecord as in
/// AsyncGetFromDiskCallback. However, this would not validate the cursor and would therefore require maintaining a cursor history.</remarks>
internal abstract bool ScanCursor<TScanFunctions>(TsavoriteKV<TKey, TValue, TStoreFunctions, TAllocator> store, ScanCursorState<TKey, TValue> scanCursorState, ref long cursor, long count, TScanFunctions scanFunctions, long endAddress, bool validateCursor)
internal abstract bool ScanCursor<TScanFunctions>(TsavoriteKV<TKey, TValue, TStoreFunctions, TAllocator> store, ScanCursorState<TKey, TValue> scanCursorState, ref long cursor, long count, TScanFunctions scanFunctions, long endAddress, bool validateCursor, long maxAddress)
where TScanFunctions : IScanIteratorFunctions<TKey, TValue>;

private protected bool ScanLookup<TInput, TOutput, TScanFunctions, TScanIterator>(TsavoriteKV<TKey, TValue, TStoreFunctions, TAllocator> store,
ScanCursorState<TKey, TValue> scanCursorState, ref long cursor, long count, TScanFunctions scanFunctions, TScanIterator iter, bool validateCursor)
ScanCursorState<TKey, TValue> scanCursorState, ref long cursor, long count, TScanFunctions scanFunctions, TScanIterator iter, bool validateCursor, long maxAddress)
where TScanFunctions : IScanIteratorFunctions<TKey, TValue>
where TScanIterator : ITsavoriteScanIterator<TKey, TValue>, IPushScanIterator<TKey>
{
using var session = store.NewSession<TInput, TOutput, Empty, LogScanCursorFunctions<TInput, TOutput>>(new LogScanCursorFunctions<TInput, TOutput>());
var bContext = session.BasicContext;

if (cursor >= GetTailAddress())
goto IterationComplete;

if (cursor < BeginAddress) // This includes 0, which means to start the Scan
cursor = BeginAddress;
else if (validateCursor)
iter.SnapCursorToLogicalAddress(ref cursor);

if (!scanFunctions.OnStart(cursor, iter.EndAddress))
return false;

if (cursor >= GetTailAddress())
goto IterationComplete;

scanCursorState.Initialize(scanFunctions);

long numPending = 0;
Expand All @@ -214,7 +217,7 @@ private protected bool ScanLookup<TInput, TOutput, TScanFunctions, TScanIterator
{
ref var key = ref iter.GetKey();
ref var value = ref iter.GetValue();
var status = bContext.ConditionalScanPush(scanCursorState, recordInfo, ref key, ref value, iter.CurrentAddress, iter.NextAddress);
var status = bContext.ConditionalScanPush(scanCursorState, recordInfo, ref key, ref value, iter.CurrentAddress, iter.NextAddress, maxAddress);
if (status.IsPending)
{
++numPending;
Expand All @@ -227,13 +230,19 @@ private protected bool ScanLookup<TInput, TOutput, TScanFunctions, TScanIterator
}

// Update the cursor to point to the next record.
cursor = iter.NextAddress;
if (scanCursorState.retryLastRecord)
cursor = iter.CurrentAddress;
else
cursor = iter.NextAddress;

// Now see if we completed the enumeration.
if (scanCursorState.stop)
goto IterationComplete;
if (scanCursorState.acceptedCount >= count || scanCursorState.endBatch)
{
scanFunctions.OnStop(true, scanCursorState.acceptedCount);
return true;
}
}

// Drain any pending pushes. We have ended the iteration; there are no more records, so drop through to end it.
Expand All @@ -242,12 +251,13 @@ private protected bool ScanLookup<TInput, TOutput, TScanFunctions, TScanIterator

IterationComplete:
cursor = 0;
scanFunctions.OnStop(false, scanCursorState.acceptedCount);
return false;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal Status ConditionalScanPush<TInput, TOutput, TContext, TSessionFunctionsWrapper>(TSessionFunctionsWrapper sessionFunctions, ScanCursorState<TKey, TValue> scanCursorState, RecordInfo recordInfo,
ref TKey key, ref TValue value, long currentAddress, long minAddress)
ref TKey key, ref TValue value, long currentAddress, long minAddress, long maxAddress)
where TSessionFunctionsWrapper : ISessionFunctionsWrapper<TKey, TValue, TInput, TOutput, TContext, TStoreFunctions, TAllocator>
{
Debug.Assert(epoch.ThisInstanceProtected(), "This is called only from ScanLookup so the epoch should be protected");
Expand All @@ -259,7 +269,7 @@ internal Status ConditionalScanPush<TInput, TOutput, TContext, TSessionFunctions
do
{
// If a more recent version of the record exists, do not push this one. Start by searching in-memory.
if (sessionFunctions.Store.TryFindRecordInMainLogForConditionalOperation<TInput, TOutput, TContext, TSessionFunctionsWrapper>(sessionFunctions, ref key, ref stackCtx, currentAddress, minAddress, out internalStatus, out needIO))
if (sessionFunctions.Store.TryFindRecordInMainLogForConditionalOperation<TInput, TOutput, TContext, TSessionFunctionsWrapper>(sessionFunctions, ref key, ref stackCtx, currentAddress, minAddress, maxAddress, out internalStatus, out needIO))
return Status.CreateFound();
}
while (sessionFunctions.Store.HandleImmediateNonPendingRetryStatus<TInput, TOutput, TContext, TSessionFunctionsWrapper>(internalStatus, sessionFunctions));
Expand All @@ -270,7 +280,7 @@ internal Status ConditionalScanPush<TInput, TOutput, TContext, TSessionFunctions
{
// A more recent version of the key was not (yet) found and we need another IO to continue searching.
internalStatus = PrepareIOForConditionalScan(sessionFunctions, ref pendingContext, ref key, ref input, ref value, ref output, default,
ref stackCtx, minAddress, scanCursorState);
ref stackCtx, minAddress, maxAddress, scanCursorState);
}
else
{
Expand All @@ -288,6 +298,8 @@ internal Status ConditionalScanPush<TInput, TOutput, TContext, TSessionFunctions
Interlocked.Increment(ref scanCursorState.acceptedCount);
if ((cursorRecordResult & CursorRecordResult.EndBatch) != 0)
scanCursorState.endBatch = true;
if ((cursorRecordResult & CursorRecordResult.RetryLastRecord) != 0)
scanCursorState.retryLastRecord = true;
}
internalStatus = OperationStatus.SUCCESS;
}
Expand All @@ -298,12 +310,12 @@ internal Status ConditionalScanPush<TInput, TOutput, TContext, TSessionFunctions
internal static OperationStatus PrepareIOForConditionalScan<TInput, TOutput, TContext, TSessionFunctionsWrapper>(TSessionFunctionsWrapper sessionFunctions,
ref TsavoriteKV<TKey, TValue, TStoreFunctions, TAllocator>.PendingContext<TInput, TOutput, TContext> pendingContext,
ref TKey key, ref TInput input, ref TValue value, ref TOutput output, TContext userContext,
ref OperationStackContext<TKey, TValue, TStoreFunctions, TAllocator> stackCtx, long minAddress, ScanCursorState<TKey, TValue> scanCursorState)
ref OperationStackContext<TKey, TValue, TStoreFunctions, TAllocator> stackCtx, long minAddress, long maxAddress, ScanCursorState<TKey, TValue> scanCursorState)
where TSessionFunctionsWrapper : ISessionFunctionsWrapper<TKey, TValue, TInput, TOutput, TContext, TStoreFunctions, TAllocator>
{
// WriteReason is not surfaced for this operation, so pick anything.
var status = sessionFunctions.Store.PrepareIOForConditionalOperation(sessionFunctions, ref pendingContext, ref key, ref input, ref value, ref output,
userContext, ref stackCtx, minAddress, WriteReason.Compaction, OperationType.CONDITIONAL_SCAN_PUSH);
userContext, ref stackCtx, minAddress, maxAddress, WriteReason.Compaction, OperationType.CONDITIONAL_SCAN_PUSH);
pendingContext.scanCursorState = scanCursorState;
return status;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,10 @@ internal override bool Scan<TScanFunctions>(TsavoriteKV<TKey, TValue, TStoreFunc
/// Implementation for push-scanning Tsavorite log with a cursor, called from LogAccessor
/// </summary>
internal override bool ScanCursor<TScanFunctions>(TsavoriteKV<TKey, TValue, TStoreFunctions, BlittableAllocator<TKey, TValue, TStoreFunctions>> store,
ScanCursorState<TKey, TValue> scanCursorState, ref long cursor, long count, TScanFunctions scanFunctions, long endAddress, bool validateCursor)
ScanCursorState<TKey, TValue> scanCursorState, ref long cursor, long count, TScanFunctions scanFunctions, long endAddress, bool validateCursor, long maxAddress)
{
using BlittableScanIterator<TKey, TValue, TStoreFunctions> iter = new(store, this, cursor, endAddress, ScanBufferingMode.SinglePageBuffering, false, epoch, logger: logger);
return ScanLookup<long, long, TScanFunctions, BlittableScanIterator<TKey, TValue, TStoreFunctions>>(store, scanCursorState, ref cursor, count, scanFunctions, iter, validateCursor);
return ScanLookup<long, long, TScanFunctions, BlittableScanIterator<TKey, TValue, TStoreFunctions>>(store, scanCursorState, ref cursor, count, scanFunctions, iter, validateCursor, maxAddress);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1013,10 +1013,10 @@ internal override bool Scan<TScanFunctions>(TsavoriteKV<TKey, TValue, TStoreFunc
/// Implementation for push-scanning Tsavorite log with a cursor, called from LogAccessor
/// </summary>
internal override bool ScanCursor<TScanFunctions>(TsavoriteKV<TKey, TValue, TStoreFunctions, GenericAllocator<TKey, TValue, TStoreFunctions>> store,
ScanCursorState<TKey, TValue> scanCursorState, ref long cursor, long count, TScanFunctions scanFunctions, long endAddress, bool validateCursor)
ScanCursorState<TKey, TValue> scanCursorState, ref long cursor, long count, TScanFunctions scanFunctions, long endAddress, bool validateCursor, long maxAddress)
{
using GenericScanIterator<TKey, TValue, TStoreFunctions> iter = new(store, this, cursor, endAddress, ScanBufferingMode.SinglePageBuffering, false, epoch, logger: logger);
return ScanLookup<long, long, TScanFunctions, GenericScanIterator<TKey, TValue, TStoreFunctions>>(store, scanCursorState, ref cursor, count, scanFunctions, iter, validateCursor);
return ScanLookup<long, long, TScanFunctions, GenericScanIterator<TKey, TValue, TStoreFunctions>>(store, scanCursorState, ref cursor, count, scanFunctions, iter, validateCursor, maxAddress);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@ public enum CursorRecordResult
/// <summary>
/// End the current cursor batch (as if "count" had been met); return a valid cursor for the next ScanCursor call
/// </summary>
EndBatch = 4
EndBatch = 4,

/// <summary>
/// Retry the last record when returning a valid cursor
/// </summary>
RetryLastRecord = 8,
}

/// <summary>
Expand All @@ -42,7 +47,7 @@ public interface IScanIteratorFunctions<TKey, TValue>
/// <param name="key">Reference to the current record's key</param>
/// <param name="value">Reference to the current record's Value</param>
/// <param name="recordMetadata">Record metadata, including <see cref="RecordInfo"/> and the current record's logical address</param>
/// <param name="numberOfRecords">The number of records returned so far, including the current one.</param>
/// <param name="numberOfRecords">The number of records accepted so far, not including the current one.</param>
/// <param name="cursorRecordResult">Indicates whether the current record was accepted, or whether to end the current ScanCursor call.
/// Ignored for non-cursor Scans; set to <see cref="CursorRecordResult.Accept"/>.</param>
/// <returns>True to continue iteration, else false</returns>
Expand All @@ -52,7 +57,7 @@ public interface IScanIteratorFunctions<TKey, TValue>
/// <param name="key">Reference to the current record's key</param>
/// <param name="value">Reference to the current record's Value</param>
/// <param name="recordMetadata">Record metadata, including <see cref="RecordInfo"/> and the current record's logical address</param>
/// <param name="numberOfRecords">The number of records returned so far, including the current one.</param>
/// <param name="numberOfRecords">The number of records accepted so far, not including the current one.</param>
/// <param name="cursorRecordResult">Indicates whether the current record was accepted, or whether to end the current ScanCursor call.
/// Ignored for non-cursor Scans; set to <see cref="CursorRecordResult.Accept"/>.</param>
/// <returns>True to continue iteration, else false</returns>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;

namespace Tsavorite.core
{
/// <summary>
/// Callback functions for streaming snapshot iteration
/// </summary>
public interface IStreamingSnapshotIteratorFunctions<TKey, TValue>
{
/// <summary>Iteration is starting.</summary>
/// <param name="checkpointToken">Checkpoint token</param>
/// <param name="currentVersion">Current version of database</param>
/// <param name="targetVersion">Target version of database</param>
/// <returns>True to continue iteration, else false</returns>
bool OnStart(Guid checkpointToken, long currentVersion, long targetVersion);

/// <summary>Next record in the streaming snapshot.</summary>
/// <param name="key">Reference to the current record's key</param>
/// <param name="value">Reference to the current record's Value</param>
/// <param name="recordMetadata">Record metadata, including <see cref="RecordInfo"/> and the current record's logical address</param>
/// <param name="numberOfRecords">The number of records returned so far, not including the current one.</param>
/// <returns>True to continue iteration, else false</returns>
bool Reader(ref TKey key, ref TValue value, RecordMetadata recordMetadata, long numberOfRecords);

/// <summary>Iteration is complete.</summary>
/// <param name="completed">If true, the iteration completed; else OnStart() or Reader() returned false to stop the iteration.</param>
/// <param name="numberOfRecords">The number of records returned before the iteration stopped.</param>
void OnStop(bool completed, long numberOfRecords);

/// <summary>An exception was thrown on iteration (likely during <see name="Reader"/>.</summary>
/// <param name="exception">The exception that was thrown.</param>
/// <param name="numberOfRecords">The number of records returned before the exception.</param>
void OnException(Exception exception, long numberOfRecords);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@ namespace Tsavorite.core
internal sealed class ScanCursorState<TKey, TValue>
{
internal IScanIteratorFunctions<TKey, TValue> functions;
internal long acceptedCount; // Number of records pushed to and accepted by the caller
internal bool endBatch; // End the batch (but return a valid cursor for the next batch, as of "count" records had been returned)
internal bool stop; // Stop the operation (as if all records in the db had been returned)
internal long acceptedCount; // Number of records pushed to and accepted by the caller
internal bool endBatch; // End the batch (but return a valid cursor for the next batch, as if "count" records had been returned)
internal bool retryLastRecord; // Retry the last record when returning a valid cursor
internal bool stop; // Stop the operation (as if all records in the db had been returned)

internal void Initialize(IScanIteratorFunctions<TKey, TValue> scanIteratorFunctions)
{
functions = scanIteratorFunctions;
acceptedCount = 0;
endBatch = false;
retryLastRecord = false;
stop = false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,10 +359,10 @@ internal override bool Scan<TScanFunctions>(TsavoriteKV<SpanByte, SpanByte, TSto
/// Implementation for push-scanning Tsavorite log with a cursor, called from LogAccessor
/// </summary>
internal override bool ScanCursor<TScanFunctions>(TsavoriteKV<SpanByte, SpanByte, TStoreFunctions, SpanByteAllocator<TStoreFunctions>> store,
ScanCursorState<SpanByte, SpanByte> scanCursorState, ref long cursor, long count, TScanFunctions scanFunctions, long endAddress, bool validateCursor)
ScanCursorState<SpanByte, SpanByte> scanCursorState, ref long cursor, long count, TScanFunctions scanFunctions, long endAddress, bool validateCursor, long maxAddress)
{
using SpanByteScanIterator<TStoreFunctions> iter = new(store, this, cursor, endAddress, ScanBufferingMode.SinglePageBuffering, false, epoch, logger: logger);
return ScanLookup<SpanByte, SpanByteAndMemory, TScanFunctions, SpanByteScanIterator<TStoreFunctions>>(store, scanCursorState, ref cursor, count, scanFunctions, iter, validateCursor);
return ScanLookup<SpanByte, SpanByteAndMemory, TScanFunctions, SpanByteScanIterator<TStoreFunctions>>(store, scanCursorState, ref cursor, count, scanFunctions, iter, validateCursor, maxAddress);
}

/// <summary>
Expand Down
Loading
Loading