Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Compatibility] Added ZRANGESTORE command #826

Merged
merged 15 commits into from
Dec 8, 2024
Merged
Next Next commit
WIP of ZRANGESTORE
  • Loading branch information
Vijay-Nirmal committed Nov 23, 2024
commit e055f4287c2cc62dd07e979b97254499d58a5e7e
86 changes: 86 additions & 0 deletions libs/resources/RespCommandsDocs.json
Original file line number Diff line number Diff line change
Expand Up @@ -5697,6 +5697,92 @@
}
]
},
{
"Command": "ZRANGESTORE",
"Name": "ZRANGESTORE",
"Summary": "Stores a range of members from sorted set in a key.",
"Group": "SortedSet",
"Complexity": "O(log(N)\u002BM) with N being the number of elements in the sorted set and M the number of elements stored into the destination key.",
"Arguments": [
{
"TypeDiscriminator": "RespCommandKeyArgument",
"Name": "DST",
"DisplayText": "dst",
"Type": "Key",
"KeySpecIndex": 0
},
{
"TypeDiscriminator": "RespCommandKeyArgument",
"Name": "SRC",
"DisplayText": "src",
"Type": "Key",
"KeySpecIndex": 1
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "MIN",
"DisplayText": "min",
"Type": "String"
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "MAX",
"DisplayText": "max",
"Type": "String"
},
{
"TypeDiscriminator": "RespCommandContainerArgument",
"Name": "SORTBY",
"Type": "OneOf",
"ArgumentFlags": "Optional",
"Arguments": [
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "BYSCORE",
"DisplayText": "byscore",
"Type": "PureToken",
"Token": "BYSCORE"
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "BYLEX",
"DisplayText": "bylex",
"Type": "PureToken",
"Token": "BYLEX"
}
]
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "REV",
"DisplayText": "rev",
"Type": "PureToken",
"Token": "REV",
"ArgumentFlags": "Optional"
},
{
"TypeDiscriminator": "RespCommandContainerArgument",
"Name": "LIMIT",
"Type": "Block",
"Token": "LIMIT",
"ArgumentFlags": "Optional",
"Arguments": [
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "OFFSET",
"DisplayText": "offset",
"Type": "Integer"
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "COUNT",
"DisplayText": "count",
"Type": "Integer"
}
]
}
]
},
{
"Command": "ZRANK",
"Name": "ZRANK",
Expand Down
38 changes: 38 additions & 0 deletions libs/resources/RespCommandsInfo.json
Original file line number Diff line number Diff line change
Expand Up @@ -4326,6 +4326,44 @@
}
]
},
{
"Command": "ZRANGESTORE",
"Name": "ZRANGESTORE",
"Arity": -5,
"Flags": "DenyOom, Write",
"FirstKey": 1,
"LastKey": 2,
"Step": 1,
"AclCategories": "SortedSet, Slow, Write",
"KeySpecifications": [
{
"BeginSearch": {
"TypeDiscriminator": "BeginSearchIndex",
"Index": 1
},
"FindKeys": {
"TypeDiscriminator": "FindKeysRange",
"LastKey": 0,
"KeyStep": 1,
"Limit": 0
},
"Flags": "OW, Update"
},
{
"BeginSearch": {
"TypeDiscriminator": "BeginSearchIndex",
"Index": 2
},
"FindKeys": {
"TypeDiscriminator": "FindKeysRange",
"LastKey": 0,
"KeyStep": 1,
"Limit": 0
},
"Flags": "RO, Access"
}
]
},
{
"Command": "ZRANK",
"Name": "ZRANK",
Expand Down
4 changes: 4 additions & 0 deletions libs/server/API/GarnetApiObjectCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ public GarnetStatus SortedSetAdd(ArgSlice key, (ArgSlice score, ArgSlice member)
public GarnetStatus SortedSetAdd(byte[] key, ref ObjectInput input, ref GarnetObjectStoreOutput output)
=> storageSession.SortedSetAdd(key, ref input, ref output, ref objectContext);

/// <inheritdoc />
public GarnetStatus SortedSetRangeStore(ArgSlice distKey, ArgSlice sbKey, ref ObjectInput input, out int result)
=> storageSession.SortedSetRangeStore(distKey, sbKey, ref input, out result, ref objectContext);

/// <inheritdoc />
public GarnetStatus SortedSetRemove(ArgSlice key, ArgSlice member, out int zremCount)
=> storageSession.SortedSetRemove(key.ToArray(), member, out zremCount, ref objectContext);
Expand Down
10 changes: 10 additions & 0 deletions libs/server/API/IGarnetApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,16 @@ public interface IGarnetApi : IGarnetReadApi, IGarnetAdvancedApi
/// <returns></returns>
GarnetStatus SortedSetAdd(byte[] key, ref ObjectInput input, ref GarnetObjectStoreOutput output);

/// <summary>
/// Stores a range of sorted set elements in the specified key space.
/// </summary>
/// <param name="distKey">The distribution key for the sorted set.</param>
/// <param name="sbKey">The sub-key for the sorted set.</param>
/// <param name="input">The input object containing the elements to store.</param>
/// <param name="result">The result of the store operation.</param>
/// <returns>A <see cref="GarnetStatus"/> indicating the status of the operation.</returns>
GarnetStatus SortedSetRangeStore(ArgSlice distKey, ArgSlice sbKey, ref ObjectInput input, out int result);
TalZaccai marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// Removes the specified member from the sorted set stored at key.
/// </summary>
Expand Down
32 changes: 32 additions & 0 deletions libs/server/Resp/Objects/SortedSetCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,38 @@ private unsafe bool SortedSetRange<TGarnetApi>(RespCommand command, ref TGarnetA
return true;
}

private unsafe bool SortedSetRangeStore<TGarnetApi>(RespCommand command, ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
// ZRANGESTORE dst src min max [BYSCORE | BYLEX] [REV] [LIMIT offset count]
if (parseState.Count is < 4 or > 9)
{
return AbortWithWrongNumberOfArguments(nameof(RespCommand.ZRANGESTORE));
}

var destKey = parseState.GetArgSliceByRef(0);
var sbKey = parseState.GetArgSliceByRef(1);

var header = new RespInputHeader(GarnetObjectType.SortedSet);
var input = new ObjectInput(header, ref parseState, startIdx: 2, arg1: respProtocolVersion);

var status = storageApi.SortedSetRangeStore(destKey, sbKey, ref input, out int result);

switch (status)
{
case GarnetStatus.OK:
while (!RespWriteUtils.WriteInteger(result, ref dcurr, dend))
SendAndReset();
break;
case GarnetStatus.WRONGTYPE:
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_WRONG_TYPE, ref dcurr, dend))
SendAndReset();
break;
}

return true;
}

/// <summary>
/// Returns the score of member in the sorted set at key.
/// If member does not exist in the sorted set, or key does not exist, nil is returned.
Expand Down
5 changes: 5 additions & 0 deletions libs/server/Resp/Parser/RespCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ public enum RespCommand : ushort
ZINCRBY,
ZPOPMAX,
ZPOPMIN,
ZRANGESTORE,
ZREM,
ZREMRANGEBYLEX,
ZREMRANGEBYRANK,
Expand Down Expand Up @@ -1400,6 +1401,10 @@ private RespCommand FastParseArrayCommand(ref int count, ref ReadOnlySpan<byte>
{
return RespCommand.INCRBYFLOAT;
}
else if (*(ulong*)(ptr + 2) == MemoryMarshal.Read<ulong>("1\r\nZRANG"u8) && *(ulong*)(ptr + 10) == MemoryMarshal.Read<ulong>("ESTORE\r\n"u8))
{
return RespCommand.ZRANGESTORE;
}
break;

case 12:
Expand Down
14 changes: 14 additions & 0 deletions libs/server/Resp/Parser/SessionParseState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,20 @@ public void SetArguments(int i, params ArgSlice[] args)
}
}

/// <summary>
/// Set arguments starting at a specific index
/// </summary>
/// <param name="i">Index of buffer at which to start setting arguments</param>
/// <param name="args">Arguments to set</param>
public void SetArguments(int i, ReadOnlySpan<ArgSlice> args)
{
Debug.Assert(i + args.Length - 1 < Count);
for (var j = 0; j < args.Length; j++)
{
*(bufferPtr + i + j) = args[j];
}
}

/// <summary>
/// Get serialized length of parse state
/// </summary>
Expand Down
1 change: 1 addition & 0 deletions libs/server/Resp/RespServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,7 @@ private bool ProcessArrayCommands<TGarnetApi>(RespCommand cmd, ref TGarnetApi st
RespCommand.ZINCRBY => SortedSetIncrement(ref storageApi),
RespCommand.ZRANK => SortedSetRank(cmd, ref storageApi),
RespCommand.ZRANGE => SortedSetRange(cmd, ref storageApi),
RespCommand.ZRANGE => SortedSetRangeStore(ref storageApi),
RespCommand.ZRANGEBYSCORE => SortedSetRange(cmd, ref storageApi),
RespCommand.ZREVRANK => SortedSetRank(cmd, ref storageApi),
RespCommand.ZREMRANGEBYLEX => SortedSetLengthByValue(cmd, ref storageApi),
Expand Down
107 changes: 107 additions & 0 deletions libs/server/Storage/Session/ObjectStore/SortedSetOps.cs
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,113 @@ public GarnetStatus SortedSetAdd<TObjectContext>(byte[] key, ref ObjectInput inp
where TObjectContext : ITsavoriteContext<byte[], IGarnetObject, ObjectInput, GarnetObjectStoreOutput, long, ObjectSessionFunctions, ObjectStoreFunctions, ObjectStoreAllocator>
=> RMWObjectStoreOperationWithOutput(key, ref input, ref objectStoreContext, ref output);

/// <summary>
/// ZRANGESTORE - Stores a range of sorted set elements into a destination key.
/// </summary>
/// <typeparam name="TObjectContext">The type of the object context.</typeparam>
/// <param name="distKey">The destination key where the range will be stored.</param>
/// <param name="sbKey">The source key from which the range will be taken.</param>
/// <param name="input">The input object containing range parameters.</param>
/// <param name="result">The result of the operation, indicating the number of elements stored.</param>
/// <param name="objectStoreContext">The context of the object store.</param>
/// <returns>Returns a GarnetStatus indicating the success or failure of the operation.</returns>
public GarnetStatus SortedSetRangeStore<TObjectContext>(ArgSlice distKey, ArgSlice sbKey, ref ObjectInput input, out int result, ref TObjectContext objectStoreContext)
where TObjectContext : ITsavoriteContext<byte[], IGarnetObject, ObjectInput, GarnetObjectStoreOutput, long, ObjectSessionFunctions, ObjectStoreFunctions, ObjectStoreAllocator>
{
result = 0;

if (distKey.Length == 0 || sbKey.Length == 0)
return GarnetStatus.OK;

var createTransaction = false;

if (txnManager.state != TxnState.Running)
{
Debug.Assert(txnManager.state == TxnState.None);
createTransaction = true;
txnManager.SaveKeyEntryToLock(distKey, true, LockType.Exclusive);
txnManager.SaveKeyEntryToLock(sbKey, true, LockType.Shared);
_ = txnManager.Run(true);
}

// SetObject
var setObjectStoreLockableContext = txnManager.ObjectStoreLockableContext;

try
{
var rangeParseState = new SessionParseState();
rangeParseState.Initialize(input.parseState.Count - 1);
rangeParseState.SetArguments(0, input.parseState.Parameters.Slice(1));
rangeParseState.SetArguments(input.parseState.Count - 2, ArgSlice.FromPinnedSpan(CmdStrings.WITHSCORES));

var rangeInput = new ObjectInput(input.header, ref rangeParseState);
SpanByteAndMemory rangeOutputMem = default;
var rangeOutput = new GarnetObjectStoreOutput() { spanByteAndMemory = rangeOutputMem };
var status = SortedSetRange(sbKey.ToArray(), ref rangeInput, ref rangeOutput, ref objectStoreContext);

if (status == GarnetStatus.WRONGTYPE)
{
return GarnetStatus.WRONGTYPE;
}

if (status == GarnetStatus.NOTFOUND)
{
// Expire/Delete the destination key if the source key is not found
_ = EXPIRE(distKey, TimeSpan.Zero, out _, StoreType.Object, ExpireOption.None, ref lockableContext, ref objectStoreLockableContext);
return GarnetStatus.OK;
}

Debug.Assert(!rangeOutputMem.IsSpanByte, "Output should not be in SpanByte format when the status is OK");

var rangeOutputHandler = rangeOutputMem.Memory.Memory.Pin();

if (status == GarnetStatus.OK)
{
var destinationKey = distKey.ToArray();
objectStoreLockableContext.Delete(ref destinationKey);

var zParseState = new SessionParseState();
zParseState.Initialize(foundItems * 2);

for (int j = 0; j < foundItems; j++)
{
RespReadUtils.ReadUnsignedArrayLength(out var innerLength, ref currOutPtr, endOutPtr);
Debug.Assert(innerLength == 2, "Should always has location and hash or distance");

RespReadUtils.TrySliceWithLengthHeader(out var location, ref currOutPtr, endOutPtr);
if (storeDistIdx != -1)
{
RespReadUtils.ReadSpanWithLengthHeader(out var score, ref currOutPtr, endOutPtr);
zParseState.SetArgument(2 * j, ArgSlice.FromPinnedSpan(score));
zParseState.SetArgument((2 * j) + 1, ArgSlice.FromPinnedSpan(location));
}
else
{
RespReadUtils.ReadIntegerAsSpan(out var score, ref currOutPtr, endOutPtr);
zParseState.SetArgument(2 * j, ArgSlice.FromPinnedSpan(score));
zParseState.SetArgument((2 * j) + 1, ArgSlice.FromPinnedSpan(location));
}
}

var zAddInput = new ObjectInput(new RespInputHeader
{
type = GarnetObjectType.SortedSet,
SortedSetOp = SortedSetOperation.ZADD,
}, ref zParseState);

var zAddOutput = new GarnetObjectStoreOutput { spanByteAndMemory = new SpanByteAndMemory(null) };
RMWObjectStoreOperationWithOutput(destinationKey, ref zAddInput, ref objectStoreLockableContext, ref zAddOutput);
}

return status;
}
finally
{
if (createTransaction)
txnManager.Commit(true);
}
}

/// <summary>
/// Removes the specified members from the sorted set stored at key.
/// Non existing members are ignored.
Expand Down
2 changes: 1 addition & 1 deletion playground/CommandInfoUpdater/CommandDocsUpdater.cs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ private static IReadOnlyDictionary<string, RespCommandDocs> GetUpdatedCommandsDo
}

// Update commands docs with commands to add
foreach (var command in commandsToAdd.Keys)
foreach (var command in commandsToAdd.Keys.Where(x => x.Command == "ZRANGESTORE"))
{
RespCommandDocs baseCommandDocs;
List<RespCommandDocs> updatedSubCommandsDocs;
Expand Down
1 change: 1 addition & 0 deletions playground/CommandInfoUpdater/SupportedCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ public class SupportedCommand
new("ZRANDMEMBER", RespCommand.ZRANDMEMBER),
new("ZRANGE", RespCommand.ZRANGE),
new("ZRANGEBYSCORE", RespCommand.ZRANGEBYSCORE),
new("ZRANGESTORE", RespCommand.ZRANGESTORE),
new("ZRANK", RespCommand.ZRANK),
new("ZREM", RespCommand.ZREM),
new("ZREMRANGEBYLEX", RespCommand.ZREMRANGEBYLEX),
Expand Down