Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement "Async" RPC #22

Closed
wants to merge 28 commits into from
Closed
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
a17bcfe
feat: Implement "Async" RPC
L3tum Feb 5, 2024
0179dc4
fix: Issues uncovered by Psalm
L3tum Feb 5, 2024
e7372c9
feat: Add getResponses as well as fix a number of logic or Psalm errors
L3tum Feb 5, 2024
0a71a52
feat: Add an unholy amount of tests
L3tum Feb 5, 2024
38d73e6
feat: Optimize getResponse to remove array_search and fix potential o…
L3tum Feb 5, 2024
1edc9ae
chore: Add test for response buffer handling
L3tum Feb 5, 2024
f366338
fix: Accidentally saved too many responses
L3tum Feb 6, 2024
4605e3f
fix: Reorder methods to make sure we do not lose a relay
L3tum Feb 6, 2024
1648eeb
fix: Wrong order for $seq
L3tum Feb 6, 2024
05fee35
fix: Up response buffer maximum
L3tum Feb 6, 2024
a52fda3
fix: Add Error Handling to MultiRPC::getResponses()
L3tum Feb 6, 2024
e7dd727
feat: Simplify MultiRPC and MultiRelayHelper, fixes some issues resul…
L3tum Feb 7, 2024
419fa7e
fix: Actually call tests
L3tum Feb 7, 2024
c8ac30b
fix: Model array_key_last output as docblock
L3tum Feb 7, 2024
2122e95
fix: Issues introduced by simplification of relay handling
L3tum Feb 7, 2024
96fafa7
fix: Gracefully handle socket disconnect without blocking (too much) …
L3tum Feb 7, 2024
43fadcf
fix: Handle cloning of MultiRPC
L3tum Feb 8, 2024
812da39
fix: Handle cloning of MultiRPC with SocketRelay
L3tum Feb 8, 2024
6adeb39
fix: Typo in Testclass Name
L3tum Feb 12, 2024
8c0018e
fix: Add comments documenting ensureFreeRelayAvailable and getRespons…
L3tum Feb 12, 2024
bead301
fix: Simplify getResponses() a bit
L3tum Feb 12, 2024
74aa106
fix: Add configurable buffer threshold and change exception message
L3tum Feb 12, 2024
bb190ce
feat: Refactor code around specialty handling of SocketRelay and add …
L3tum Feb 13, 2024
33b234a
fix: Make exception more descriptive
L3tum Feb 13, 2024
9bea714
fix: Missing extends statement in interface
L3tum Feb 13, 2024
0514503
fix: Enforce __clone impl and test streams with data on them
L3tum Feb 13, 2024
c4b41cf
fix: Remove @throws Error annotation
L3tum Feb 13, 2024
e19db31
Fix code style
roxblnfk Feb 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: Optimize getResponse to remove array_search and fix potential o…
…pportunities to lose a relay
  • Loading branch information
L3tum committed Feb 5, 2024
commit 38d73e6c47b0c789ba273cbd5d0f83a6539ea74d
2 changes: 1 addition & 1 deletion src/MultiRelayHelper.php
Original file line number Diff line number Diff line change
@@ -33,7 +33,7 @@ public static function findRelayWithMessage(array $relays, int $timeoutInMicrose
// A non-connected relay implies that it is free. We can eat the connection-cost if it means
// we'll have more Relays available.
// Not doing this would also potentially result in never using the relay in the first place.
if($relay->socket === null){
if ($relay->socket === null) {
return $index;
}

65 changes: 44 additions & 21 deletions src/RPC/MultiRPC.php
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@
namespace Spiral\Goridge\RPC;

use RuntimeException;
use Spiral\Goridge\Exception\RelayException;
use Spiral\Goridge\Frame;
use Spiral\Goridge\MultiRelayHelper;
use Spiral\Goridge\Relay;
@@ -21,7 +22,8 @@ class MultiRPC extends AbstractRPC implements AsyncRPCInterface
private array $freeRelays = [];

/**
* @var array<int, RelayInterface>
* Occupied Relays alone is a map of seq to relay to make removal easier once a response is received.
* @var array<positive-int, RelayInterface>
*/
private array $occupiedRelays = [];

@@ -137,10 +139,10 @@ public function callAsync(string $method, mixed $payload): int

$relay = $this->getNextFreeRelay();
$relay->send($this->packFrame($method, $payload));
$this->occupiedRelays[] = $relay;
$seq = self::$seq;
$this->seqToRelayMap[$seq] = $relay;
self::$seq++;
$this->occupiedRelays[$seq] = $relay;
$this->seqToRelayMap[$seq] = $relay;
return $seq;
}

@@ -203,11 +205,10 @@ public function getResponse(int $seq, mixed $options = null): mixed
* Thus we only re-add (and do so without searching for it first) if we don't have the response yet.
*/
} else {
$this->freeRelays[] = $this->occupiedRelays[$seq];
unset($this->occupiedRelays[$seq]);

$frame = $relay->waitFrame();
if (($index = array_search($relay, $this->occupiedRelays, true)) !== false) {
/** @psalm-suppress MixedPropertyTypeCoercion It's always a RelayInterface, never mixed */
$this->freeRelays[] = array_splice($this->occupiedRelays, $index, 1)[0];
}
}

if (count($frame->options) !== 2) {
@@ -244,6 +245,7 @@ public function getResponses(array $seqs, mixed $options = null): iterable
} else {
$seqsToDo[] = $seq;
$relays[] = $relay;
unset($this->occupiedRelays[$seq]);
}
}

@@ -268,8 +270,9 @@ public function getResponses(array $seqs, mixed $options = null): iterable
/** @var positive-int $seq */
$seq = array_splice($seqsToDo, $relayIndex, 1)[0];

$frame = $relay->waitFrame();
// Add before waitFrame() to make sure we keep track of the $relay
$this->freeRelays[] = $relay;
$frame = $relay->waitFrame();

yield $seq => $this->decodeResponse($frame, $relay, $options);
}
@@ -292,7 +295,7 @@ private function getNextFreeRelay(): RelayInterface
$indexKeyed = array_flip($index);
foreach ($this->occupiedRelaysIgnoreResponse as $relayIndex => $relay) {
if (isset($indexKeyed[$relayIndex])) {
$relay->waitFrame();
$this->tryFlushRelay($relay, true);
$this->freeRelays[] = $relay;
} else {
$occupiedRelaysIgnoreResponse[] = $relay;
@@ -304,21 +307,27 @@ private function getNextFreeRelay(): RelayInterface
} elseif ($index !== false) {
/** @var RelayInterface $relay */
$relay = array_splice($this->occupiedRelaysIgnoreResponse, $index, 1)[0];
$relay->waitFrame();
$this->tryFlushRelay($relay, true);
return $relay;
}
}

if (count($this->occupiedRelays) > 0) {
// Check if the other relays have a free one
$index = MultiRelayHelper::findRelayWithMessage($this->occupiedRelays);

// This array_keys/array_values is so we can use socket_select/stream_select
$relayValues = array_values($this->occupiedRelays);
$relayKeys = array_keys($this->occupiedRelays);
$index = MultiRelayHelper::findRelayWithMessage($relayValues);
// To make sure nobody uses this
unset($relayValues);

if ($index === false) {
if (count($this->occupiedRelaysIgnoreResponse) > 0) {
// Wait for an ignore-response relay to become free (the oldest since it makes the most sense)
/** @var RelayInterface $relay */
$relay = array_shift($this->occupiedRelaysIgnoreResponse);
$relay->waitFrame();
$this->tryFlushRelay($relay, true);
return $relay;
} else {
// Use the oldest occupied relay for this instead
@@ -331,20 +340,34 @@ private function getNextFreeRelay(): RelayInterface
$index = $index[0];
}

// Put response into buffer
/** @var RelayInterface $relay */
$relay = array_splice($this->occupiedRelays, $index, 1)[0];
$frame = $relay->waitFrame();
$key = $relayKeys[$index];

if (count($frame->options) === 2) {
/** @var positive-int $responseSeq */
$responseSeq = $frame->options[0];
$this->asyncResponseBuffer[$responseSeq] = $frame;
}
// Put response into buffer
$relay = $this->occupiedRelays[$key];
unset($this->occupiedRelays[$key]);
$this->tryFlushRelay($relay, true);

return $relay;
}

throw new RuntimeException("No relays???");
}

private function tryFlushRelay(RelayInterface $relay, bool $saveResponse = false): void
{
try {
if (!$saveResponse) {
$relay->waitFrame();
} else {
$frame = $relay->waitFrame();
if (count($frame->options) === 2) {
/** @var positive-int $responseSeq */
$responseSeq = $frame->options[0];
$this->asyncResponseBuffer[$responseSeq] = $frame;
}
}
} catch (RelayException $exception) {
// Intentionally left blank
}
}
}
Loading
Oops, something went wrong.