Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Compatibility] Added ZRANGESTORE command #826

Merged
merged 15 commits into from
Dec 8, 2024
Merged
86 changes: 86 additions & 0 deletions libs/resources/RespCommandsDocs.json
Original file line number Diff line number Diff line change
Expand Up @@ -5918,6 +5918,92 @@
}
]
},
{
"Command": "ZRANGESTORE",
"Name": "ZRANGESTORE",
"Summary": "Stores a range of members from sorted set in a key.",
"Group": "SortedSet",
"Complexity": "O(log(N)\u002BM) with N being the number of elements in the sorted set and M the number of elements stored into the destination key.",
"Arguments": [
{
"TypeDiscriminator": "RespCommandKeyArgument",
"Name": "DST",
"DisplayText": "dst",
"Type": "Key",
"KeySpecIndex": 0
},
{
"TypeDiscriminator": "RespCommandKeyArgument",
"Name": "SRC",
"DisplayText": "src",
"Type": "Key",
"KeySpecIndex": 1
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "MIN",
"DisplayText": "min",
"Type": "String"
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "MAX",
"DisplayText": "max",
"Type": "String"
},
{
"TypeDiscriminator": "RespCommandContainerArgument",
"Name": "SORTBY",
"Type": "OneOf",
"ArgumentFlags": "Optional",
"Arguments": [
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "BYSCORE",
"DisplayText": "byscore",
"Type": "PureToken",
"Token": "BYSCORE"
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "BYLEX",
"DisplayText": "bylex",
"Type": "PureToken",
"Token": "BYLEX"
}
]
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "REV",
"DisplayText": "rev",
"Type": "PureToken",
"Token": "REV",
"ArgumentFlags": "Optional"
},
{
"TypeDiscriminator": "RespCommandContainerArgument",
"Name": "LIMIT",
"Type": "Block",
"Token": "LIMIT",
"ArgumentFlags": "Optional",
"Arguments": [
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "OFFSET",
"DisplayText": "offset",
"Type": "Integer"
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "COUNT",
"DisplayText": "count",
"Type": "Integer"
}
]
}
]
},
{
"Command": "ZRANK",
"Name": "ZRANK",
Expand Down
38 changes: 38 additions & 0 deletions libs/resources/RespCommandsInfo.json
Original file line number Diff line number Diff line change
Expand Up @@ -4364,6 +4364,44 @@
}
]
},
{
"Command": "ZRANGESTORE",
"Name": "ZRANGESTORE",
"Arity": -5,
"Flags": "DenyOom, Write",
"FirstKey": 1,
"LastKey": 2,
"Step": 1,
"AclCategories": "SortedSet, Slow, Write",
"KeySpecifications": [
{
"BeginSearch": {
"TypeDiscriminator": "BeginSearchIndex",
"Index": 1
},
"FindKeys": {
"TypeDiscriminator": "FindKeysRange",
"LastKey": 0,
"KeyStep": 1,
"Limit": 0
},
"Flags": "OW, Update"
},
{
"BeginSearch": {
"TypeDiscriminator": "BeginSearchIndex",
"Index": 2
},
"FindKeys": {
"TypeDiscriminator": "FindKeysRange",
"LastKey": 0,
"KeyStep": 1,
"Limit": 0
},
"Flags": "RO, Access"
}
]
},
{
"Command": "ZRANK",
"Name": "ZRANK",
Expand Down
4 changes: 4 additions & 0 deletions libs/server/API/GarnetApiObjectCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ public GarnetStatus SortedSetAdd(ArgSlice key, (ArgSlice score, ArgSlice member)
public GarnetStatus SortedSetAdd(byte[] key, ref ObjectInput input, ref GarnetObjectStoreOutput output)
=> storageSession.SortedSetAdd(key, ref input, ref output, ref objectContext);

/// <inheritdoc />
public GarnetStatus SortedSetRangeStore(ArgSlice distKey, ArgSlice sbKey, ref ObjectInput input, out int result)
=> storageSession.SortedSetRangeStore(distKey, sbKey, ref input, out result, ref objectContext);

/// <inheritdoc />
public GarnetStatus SortedSetRemove(ArgSlice key, ArgSlice member, out int zremCount)
=> storageSession.SortedSetRemove(key.ToArray(), member, out zremCount, ref objectContext);
Expand Down
10 changes: 10 additions & 0 deletions libs/server/API/IGarnetApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,16 @@ public interface IGarnetApi : IGarnetReadApi, IGarnetAdvancedApi
/// <returns></returns>
GarnetStatus SortedSetAdd(byte[] key, ref ObjectInput input, ref GarnetObjectStoreOutput output);

/// <summary>
/// Stores a range of sorted set elements in the specified key space.
/// </summary>
/// <param name="distKey">The distribution key for the sorted set.</param>
/// <param name="sbKey">The sub-key for the sorted set.</param>
/// <param name="input">The input object containing the elements to store.</param>
/// <param name="result">The result of the store operation.</param>
/// <returns>A <see cref="GarnetStatus"/> indicating the status of the operation.</returns>
GarnetStatus SortedSetRangeStore(ArgSlice distKey, ArgSlice sbKey, ref ObjectInput input, out int result);
TalZaccai marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// Removes the specified member from the sorted set stored at key.
/// </summary>
Expand Down
2 changes: 2 additions & 0 deletions libs/server/Objects/SortedSet/SortedSetObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public enum SortedSetOperation : byte
ZRANK,
ZRANGE,
ZRANGEBYSCORE,
ZRANGESTORE,
GEOADD,
GEOHASH,
GEODIST,
Expand Down Expand Up @@ -256,6 +257,7 @@ public override unsafe bool Operate(ref ObjectInput input, ref SpanByteAndMemory
SortedSetRank(ref input, ref output);
break;
case SortedSetOperation.ZRANGE:
case SortedSetOperation.ZRANGESTORE:
SortedSetRange(ref input, ref output);
break;
case SortedSetOperation.ZRANGEBYSCORE:
Expand Down
3 changes: 3 additions & 0 deletions libs/server/Objects/SortedSet/SortedSetObjectImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,9 @@ private void SortedSetRange(ref ObjectInput input, ref SpanByteAndMemory output)
ZRangeOptions options = new();
switch (input.header.SortedSetOp)
{
case SortedSetOperation.ZRANGESTORE:
options.WithScores = true;
break;
case SortedSetOperation.ZRANGEBYSCORE:
options.ByScore = true;
break;
Expand Down
32 changes: 32 additions & 0 deletions libs/server/Resp/Objects/SortedSetCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,38 @@ private unsafe bool SortedSetRange<TGarnetApi>(RespCommand command, ref TGarnetA
return true;
}

private unsafe bool SortedSetRangeStore<TGarnetApi>(ref TGarnetApi storageApi)
where TGarnetApi : IGarnetApi
{
// ZRANGESTORE dst src min max [BYSCORE | BYLEX] [REV] [LIMIT offset count]
if (parseState.Count is < 4 or > 9)
{
return AbortWithWrongNumberOfArguments(nameof(RespCommand.ZRANGESTORE));
}

var destKey = parseState.GetArgSliceByRef(0);
var sbKey = parseState.GetArgSliceByRef(1);

var header = new RespInputHeader(GarnetObjectType.SortedSet) { SortedSetOp = SortedSetOperation.ZRANGESTORE };
var input = new ObjectInput(header, ref parseState, startIdx: 2, arg1: respProtocolVersion);

var status = storageApi.SortedSetRangeStore(destKey, sbKey, ref input, out int result);

switch (status)
{
case GarnetStatus.OK:
while (!RespWriteUtils.WriteInteger(result, ref dcurr, dend))
SendAndReset();
break;
case GarnetStatus.WRONGTYPE:
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_WRONG_TYPE, ref dcurr, dend))
SendAndReset();
break;
}

return true;
}

/// <summary>
/// Returns the score of member in the sorted set at key.
/// If member does not exist in the sorted set, or key does not exist, nil is returned.
Expand Down
5 changes: 5 additions & 0 deletions libs/server/Resp/Parser/RespCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ public enum RespCommand : ushort
ZINCRBY,
ZPOPMAX,
ZPOPMIN,
ZRANGESTORE,
ZREM,
ZREMRANGEBYLEX,
ZREMRANGEBYRANK,
Expand Down Expand Up @@ -1401,6 +1402,10 @@ private RespCommand FastParseArrayCommand(ref int count, ref ReadOnlySpan<byte>
{
return RespCommand.INCRBYFLOAT;
}
else if (*(ulong*)(ptr + 2) == MemoryMarshal.Read<ulong>("1\r\nZRANG"u8) && *(ulong*)(ptr + 10) == MemoryMarshal.Read<ulong>("ESTORE\r\n"u8))
{
return RespCommand.ZRANGESTORE;
}
break;

case 12:
Expand Down
1 change: 1 addition & 0 deletions libs/server/Resp/RespServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,7 @@ private bool ProcessArrayCommands<TGarnetApi>(RespCommand cmd, ref TGarnetApi st
RespCommand.ZINCRBY => SortedSetIncrement(ref storageApi),
RespCommand.ZRANK => SortedSetRank(cmd, ref storageApi),
RespCommand.ZRANGE => SortedSetRange(cmd, ref storageApi),
RespCommand.ZRANGESTORE => SortedSetRangeStore(ref storageApi),
RespCommand.ZRANGEBYSCORE => SortedSetRange(cmd, ref storageApi),
RespCommand.ZREVRANK => SortedSetRank(cmd, ref storageApi),
RespCommand.ZREMRANGEBYLEX => SortedSetLengthByValue(cmd, ref storageApi),
Expand Down
102 changes: 102 additions & 0 deletions libs/server/Storage/Session/ObjectStore/SortedSetOps.cs
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,108 @@ public GarnetStatus SortedSetAdd<TObjectContext>(byte[] key, ref ObjectInput inp
where TObjectContext : ITsavoriteContext<byte[], IGarnetObject, ObjectInput, GarnetObjectStoreOutput, long, ObjectSessionFunctions, ObjectStoreFunctions, ObjectStoreAllocator>
=> RMWObjectStoreOperationWithOutput(key, ref input, ref objectStoreContext, ref output);

/// <summary>
/// ZRANGESTORE - Stores a range of sorted set elements into a destination key.
/// </summary>
/// <typeparam name="TObjectContext">The type of the object context.</typeparam>
/// <param name="distKey">The destination key where the range will be stored.</param>
/// <param name="sbKey">The source key from which the range will be taken.</param>
/// <param name="input">The input object containing range parameters.</param>
/// <param name="result">The result of the operation, indicating the number of elements stored.</param>
/// <param name="objectStoreContext">The context of the object store.</param>
/// <returns>Returns a GarnetStatus indicating the success or failure of the operation.</returns>
public unsafe GarnetStatus SortedSetRangeStore<TObjectContext>(ArgSlice distKey, ArgSlice sbKey, ref ObjectInput input, out int result, ref TObjectContext objectStoreContext)
where TObjectContext : ITsavoriteContext<byte[], IGarnetObject, ObjectInput, GarnetObjectStoreOutput, long, ObjectSessionFunctions, ObjectStoreFunctions, ObjectStoreAllocator>
{
result = 0;

if (distKey.Length == 0 || sbKey.Length == 0)
return GarnetStatus.OK;

var createTransaction = false;

if (txnManager.state != TxnState.Running)
{
Debug.Assert(txnManager.state == TxnState.None);
createTransaction = true;
txnManager.SaveKeyEntryToLock(distKey, true, LockType.Exclusive);
txnManager.SaveKeyEntryToLock(sbKey, true, LockType.Shared);
_ = txnManager.Run(true);
}

// SetObject
var objectStoreLockableContext = txnManager.ObjectStoreLockableContext;

try
{
SpanByteAndMemory rangeOutputMem = default;
var rangeOutput = new GarnetObjectStoreOutput() { spanByteAndMemory = rangeOutputMem };
var status = SortedSetRange(sbKey.ToArray(), ref input, ref rangeOutput, ref objectStoreLockableContext);
TalZaccai marked this conversation as resolved.
Show resolved Hide resolved
rangeOutputMem = rangeOutput.spanByteAndMemory;

if (status == GarnetStatus.WRONGTYPE)
{
return GarnetStatus.WRONGTYPE;
}

if (status == GarnetStatus.NOTFOUND)
{
// Expire/Delete the destination key if the source key is not found
_ = EXPIRE(distKey, TimeSpan.Zero, out _, StoreType.Object, ExpireOption.None, ref lockableContext, ref objectStoreLockableContext);
return GarnetStatus.OK;
}

Debug.Assert(!rangeOutputMem.IsSpanByte, "Output should not be in SpanByte format when the status is OK");

var rangeOutputHandler = rangeOutputMem.Memory.Memory.Pin();
try
{
var rangeOutPtr = (byte*)rangeOutputHandler.Pointer;
ref var currOutPtr = ref rangeOutPtr;
var endOutPtr = rangeOutPtr + rangeOutputMem.Length;

var destinationKey = distKey.ToArray();
objectStoreLockableContext.Delete(ref destinationKey);

RespReadUtils.ReadUnsignedArrayLength(out var arrayLen, ref currOutPtr, endOutPtr);
Debug.Assert(arrayLen % 2 == 0, "Should always contain element and its score");
result = arrayLen / 2;

if (result > 0)
{
parseState.Initialize(arrayLen); // 2 elements per pair (result * 2)

for (int j = 0; j < result; j++)
{
// Read member/element into parse state
parseState.Read((2 * j) + 1, ref currOutPtr, endOutPtr);
// Read score into parse state
parseState.Read(2 * j, ref currOutPtr, endOutPtr);
}

var zAddInput = new ObjectInput(new RespInputHeader
{
type = GarnetObjectType.SortedSet,
SortedSetOp = SortedSetOperation.ZADD,
}, ref parseState);

var zAddOutput = new GarnetObjectStoreOutput { spanByteAndMemory = new SpanByteAndMemory(null) };
RMWObjectStoreOperationWithOutput(destinationKey, ref zAddInput, ref objectStoreLockableContext, ref zAddOutput);
}
}
finally
{
rangeOutputHandler.Dispose();
}
return status;
}
finally
{
if (createTransaction)
txnManager.Commit(true);
}
}

/// <summary>
/// Removes the specified members from the sorted set stored at key.
/// Non existing members are ignored.
Expand Down
1 change: 1 addition & 0 deletions playground/CommandInfoUpdater/SupportedCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ public class SupportedCommand
new("ZRANDMEMBER", RespCommand.ZRANDMEMBER),
new("ZRANGE", RespCommand.ZRANGE),
new("ZRANGEBYSCORE", RespCommand.ZRANGEBYSCORE),
new("ZRANGESTORE", RespCommand.ZRANGESTORE),
new("ZRANK", RespCommand.ZRANK),
new("ZREM", RespCommand.ZREM),
new("ZREMRANGEBYLEX", RespCommand.ZREMRANGEBYLEX),
Expand Down
Loading