Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement "Async" RPC #22

Closed
wants to merge 28 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
a17bcfe
feat: Implement "Async" RPC
L3tum Feb 5, 2024
0179dc4
fix: Issues uncovered by Psalm
L3tum Feb 5, 2024
e7372c9
feat: Add getResponses as well as fix a number of logic or Psalm errors
L3tum Feb 5, 2024
0a71a52
feat: Add an unholy amount of tests
L3tum Feb 5, 2024
38d73e6
feat: Optimize getResponse to remove array_search and fix potential o…
L3tum Feb 5, 2024
1edc9ae
chore: Add test for response buffer handling
L3tum Feb 5, 2024
f366338
fix: Accidentally saved too many responses
L3tum Feb 6, 2024
4605e3f
fix: Reorder methods to make sure we do not lose a relay
L3tum Feb 6, 2024
1648eeb
fix: Wrong order for $seq
L3tum Feb 6, 2024
05fee35
fix: Up response buffer maximum
L3tum Feb 6, 2024
a52fda3
fix: Add Error Handling to MultiRPC::getResponses()
L3tum Feb 6, 2024
e7dd727
feat: Simplify MultiRPC and MultiRelayHelper, fixes some issues resul…
L3tum Feb 7, 2024
419fa7e
fix: Actually call tests
L3tum Feb 7, 2024
c8ac30b
fix: Model array_key_last output as docblock
L3tum Feb 7, 2024
2122e95
fix: Issues introduced by simplification of relay handling
L3tum Feb 7, 2024
96fafa7
fix: Gracefully handle socket disconnect without blocking (too much) …
L3tum Feb 7, 2024
43fadcf
fix: Handle cloning of MultiRPC
L3tum Feb 8, 2024
812da39
fix: Handle cloning of MultiRPC with SocketRelay
L3tum Feb 8, 2024
6adeb39
fix: Typo in Testclass Name
L3tum Feb 12, 2024
8c0018e
fix: Add comments documenting ensureFreeRelayAvailable and getRespons…
L3tum Feb 12, 2024
bead301
fix: Simplify getResponses() a bit
L3tum Feb 12, 2024
74aa106
fix: Add configurable buffer threshold and change exception message
L3tum Feb 12, 2024
bb190ce
feat: Refactor code around specialty handling of SocketRelay and add …
L3tum Feb 13, 2024
33b234a
fix: Make exception more descriptive
L3tum Feb 13, 2024
9bea714
fix: Missing extends statement in interface
L3tum Feb 13, 2024
0514503
fix: Enforce __clone impl and test streams with data on them
L3tum Feb 13, 2024
c4b41cf
fix: Remove @throws Error annotation
L3tum Feb 13, 2024
e19db31
Fix code style
roxblnfk Feb 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: Refactor code around specialty handling of SocketRelay and add …
…more test coverage
  • Loading branch information
L3tum committed Feb 13, 2024
commit bb190ce8f26e3568703aee79e5fd9133ecde5454
30 changes: 30 additions & 0 deletions src/ConnectedRelayInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php

namespace Spiral\Goridge;

use Spiral\Goridge\Exception\RelayException;

/**
* This interface describes a Relay that explictily establishes a connection.
* That connection can also be re-established on the fly (in comparison to StreamRelay, which relies on the existence of the streams).
* The object is also clonable, i.e. supports cloning without data errors due to shared state.
L3tum marked this conversation as resolved.
Show resolved Hide resolved
*/
interface ConnectedRelayInterface
{
/**
* Returns true if the underlying connection is already established
*/
public function isConnected(): bool;

/**
* Establishes the underlying connection and returns true on success, false on failure, or throws an exception in case of an error.
*
* @throws RelayException
*/
public function connect(): bool;

/**
* Closes the underlying connection.
*/
public function close(): void;
}
8 changes: 4 additions & 4 deletions src/MultiRelayHelper.php
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public static function findRelayWithMessage(array $relays, int $timeoutInMicrose
}

/**
* @param array<array-key, RelayInterface> $relays
* @param array<array-key, ConnectedRelayInterface> $relays
* @return array-key[]|false
* @internal
* Returns either
Expand All @@ -98,14 +98,14 @@ public static function findRelayWithMessage(array $relays, int $timeoutInMicrose
*/
public static function checkConnected(array $relays): array|false
{
if (count($relays) === 0 || !$relays[array_key_first($relays)] instanceof SocketRelay) {
if (count($relays) === 0) {
return false;
}

$keysNotConnected = [];
foreach ($relays as $key => $relay) {
assert($relay instanceof SocketRelay);
if ($relay->socket === null) {
assert($relay instanceof ConnectedRelayInterface);
if (!$relay->isConnected()) {
$relay->connect();
$keysNotConnected[] = $key;
}
Expand Down
48 changes: 26 additions & 22 deletions src/RPC/MultiRPC.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Spiral\Goridge\RPC;

use Spiral\Goridge\ConnectedRelayInterface;
use Spiral\Goridge\Exception\RelayException;
use Spiral\Goridge\Exception\TransportException;
use Spiral\Goridge\Frame;
Expand All @@ -23,18 +24,21 @@ class MultiRPC extends AbstractRPC implements AsyncRPCInterface
private const DEFAULT_BUFFER_THRESHOLD = 10_000;

/**
* @var array<int, RelayInterface>
* @var array<int, ConnectedRelayInterface>
*/
private array $freeRelays = [];

/**
* Occupied Relays alone is a map of seq to relay to make removal easier once a response is received.
* @var array<positive-int, RelayInterface>
* Occupied Relays is a map of seq to relay to make removal easier once a response is received.
* @var array<positive-int, ConnectedRelayInterface>
*/
private array $occupiedRelays = [];

/**
* @var array<positive-int, RelayInterface>
* A map of seq to relay to use for decodeResponse().
* Technically the relay there is only needed in case of an error.
*
* @var array<positive-int, ConnectedRelayInterface>
*/
private array $seqToRelayMap = [];

Expand All @@ -46,6 +50,9 @@ class MultiRPC extends AbstractRPC implements AsyncRPCInterface
*/
private array $asyncResponseBuffer = [];

/**
* The threshold after which the asyncResponseBuffer is flushed of all entries.
*/
private int $asyncBufferThreshold = self::DEFAULT_BUFFER_THRESHOLD;

/**
Expand All @@ -56,9 +63,19 @@ public function __construct(
int $asyncBufferThreshold = self::DEFAULT_BUFFER_THRESHOLD,
CodecInterface $codec = new JsonCodec()
) {
if (count($relays) === 0) {
throw new RPCException("MultiRPC needs at least one relay. Zero provided.");
}

foreach ($relays as $relay) {
if (!($relay instanceof SocketRelay)) {
throw new RPCException("MultiRPC can only be used with SocketRelay");
if (!($relay instanceof ConnectedRelayInterface)) {
throw new RPCException(
sprintf(
"MultiRPC can only be used with relays implementing the %s, such as %s",
ConnectedRelayInterface::class,
SocketRelay::class
)
);
}
}

Expand Down Expand Up @@ -106,22 +123,12 @@ public static function create(
}

/**
* Force-connects all SocketRelays.
* Does nothing if no SocketRelay.
* Force-connects all relays.
* @throws RelayException
*/
public function preConnectRelays(): void
{
if (count($this->freeRelays) === 0) {
return;
}

if (!$this->freeRelays[0] instanceof SocketRelay) {
return;
}

foreach ($this->freeRelays as $relay) {
assert($relay instanceof SocketRelay);
// Force connect
$relay->connect();
}
Expand Down Expand Up @@ -370,14 +377,11 @@ private function ensureFreeRelayAvailable(): int
/**
* Gets a response from the relay, blocking for it if necessary, with some error handling in regards to mismatched seq
*
* @param RelayInterface $relay
* @param positive-int $expectedSeq
* @param bool $onlySaveResponseInCaseOfMismatchedSeq
* @return Frame
*/
private function getResponseFromRelay(RelayInterface $relay, int $expectedSeq, bool $onlySaveResponseInCaseOfMismatchedSeq = false): Frame
private function getResponseFromRelay(ConnectedRelayInterface $relay, int $expectedSeq, bool $onlySaveResponseInCaseOfMismatchedSeq = false): Frame
{
if ($relay instanceof SocketRelay && !$relay->isConnected()) {
if (!$relay->isConnected()) {
throw new TransportException("Unable to read payload from the stream");
}

Expand Down
2 changes: 1 addition & 1 deletion src/SocketRelay.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
*
* @psalm-suppress DeprecatedInterface
*/
class SocketRelay extends Relay implements Stringable
class SocketRelay extends Relay implements Stringable, ConnectedRelayInterface
{
final public const RECONNECT_RETRIES = 10;
final public const RECONNECT_TIMEOUT = 100;
Expand Down
4 changes: 2 additions & 2 deletions tests/Goridge/MsgPackMultiRPCTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ public function testJsonExceptionNotThrownWithIgnoreResponse(): void
/**
* @return MultiRPC
*/
protected function makeRPC(): MultiRPC
protected function makeRPC(int $count = 10): MultiRPC
{
return parent::makeRPC()->withCodec(new MsgpackCodec());
return parent::makeRPC($count)->withCodec(new MsgpackCodec());
}
}
97 changes: 89 additions & 8 deletions tests/Goridge/MultiRPC.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use PHPUnit\Framework\TestCase;
use ReflectionMethod;
use ReflectionProperty;
use Spiral\Goridge\ConnectedRelayInterface;
use Spiral\Goridge\Exception\TransportException;
use Spiral\Goridge\RelayInterface;
use Spiral\Goridge\RPC\Codec\MsgpackCodec;
Expand Down Expand Up @@ -659,6 +660,7 @@ public function testHandleRelayDisconnectWithPressureGetResponses(): void
public function testHandleCloneCorrectly(): void
{
$this->rpc->preConnectRelays();
$this->rpc->callIgnoreResponse('Service.Ping', 'ping');
$clonedRpc = $this->rpc->withCodec(new MsgpackCodec());
for ($i = 0; $i < 50; $i++) {
$clonedRpc->callIgnoreResponse('Service.Ping', 'ping');
Expand All @@ -684,12 +686,92 @@ public function testHandleCloneCorrectly(): void
$this->assertSame('pong', $this->rpc->call('Service.Ping', 'ping'));
}

public function testAllowsOnlySockets(): void{
public function testAllowsOnlySockets(): void
{
$this->expectException(RPCException::class);
$this->expectExceptionMessage("MultiRPC can only be used with SocketRelay");
$this->expectExceptionMessage(
sprintf(
"MultiRPC can only be used with relays implementing the %s, such as %s",
ConnectedRelayInterface::class,
SocketRelay::class
)
);
$this->rpc = new GoridgeMultiRPC([new StreamRelay(STDIN, STDOUT)]);
}

public function testNeedsAtLeastOne(): void
{
$this->expectException(RPCException::class);
$this->expectExceptionMessage("MultiRPC needs at least one relay. Zero provided.");
$this->rpc = new GoridgeMultiRPC([]);
}

public function testChecksIfResponseIsInRelay(): void
{
$id = $this->rpc->callAsync('Service.Ping', 'ping');
// Wait a bit
usleep(100 * 1000);

$this->assertTrue($this->rpc->hasResponse($id));
}

public function testChecksIfResponseIsInBuffer(): void
{
$id = $this->rpc->callAsync('Service.Ping', 'ping');
// Wait a bit
usleep(100 * 1000);
$this->forceFlushRpc($this->rpc);

$this->assertTrue($this->rpc->hasResponse($id));
}

public function testChecksIfResponseIsNotReceivedYet(): void
{
$id = $this->rpc->callAsync('Service.Ping', 'ping');
$this->assertFalse($this->rpc->hasResponse($id));
}

public function testChecksMultipleResponses(): void
{
$ids = [];
$ids[] = $this->rpc->callAsync('Service.Ping', 'ping');
$this->forceFlushRpc($this->rpc);
$ids[] = $this->rpc->callAsync('Service.Ping', 'ping');
usleep(100 * 1000);
$ids[] = $this->rpc->callAsync('Service.Ping', 'ping');
$responses = $this->rpc->hasResponses($ids);
$this->assertContains($ids[0], $responses);
$this->assertContains($ids[1], $responses);
$this->assertNotContains($ids[2], $responses);
}

public function testHasResponsesReturnsEmptyArrayWhenNoResponses(): void
{
$id = $this->rpc->callAsync('Service.Ping', 'ping');
$this->assertEmpty($this->rpc->hasResponses([$id]));
}

public function testGetResponsesReturnsWhenNoRelaysAvailableToAvoidInfiniteLoop(): void
{
// occupiedRelays is already empty
$rpc = $this->makeRPC();
$property = new ReflectionProperty(GoridgeMultiRPC::class, 'freeRelays');
$property->setValue($rpc, []);
$this->expectException(RPCException::class);
$this->expectExceptionMessage("No relays available at all");
$rpc->call('Service.Ping', 'ping');
}

public function testMultiRPCIsUsableWithOneRelay(): void
{
$rpc = $this->makeRPC(1);
$rpc->callIgnoreResponse('Service.Ping', 'ping');
$rpc->callIgnoreResponse('Service.SleepEcho', 'Hello');
$id = $rpc->callAsync('Service.Ping', 'ping');
$rpc->callIgnoreResponse('Service.Echo', 'Hello');
$this->assertSame('pong', $rpc->getResponse($id));
}

protected function setUp(): void
{
$this->rpc = $this->makeRPC();
Expand All @@ -698,13 +780,12 @@ protected function setUp(): void
/**
* @return GoridgeMultiRPC
*/
protected function makeRPC(): GoridgeMultiRPC
protected function makeRPC(int $count = 10): GoridgeMultiRPC
{
$relays = [];
for ($i = 0; $i < 10; $i++) {
$relays[] = $this->makeRelay();
}
return new GoridgeMultiRPC($relays);
$type = self::SOCK_TYPE->value;
$address = self::SOCK_ADDR;
$port = self::SOCK_PORT;
return GoridgeMultiRPC::create("$type://$address:$port", $count);
}

/**
Expand Down
17 changes: 17 additions & 0 deletions tests/Goridge/MultiRelayHelperTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

namespace Goridge;

use PHPUnit\Framework\TestCase;
use Spiral\Goridge\MultiRelayHelper;
use Spiral\Goridge\StreamRelay;

class MultiRelayHelperTest extends TestCase
{
public function testSupportsStreamRelay(): void
{
$relays = [new StreamRelay(STDIN, STDOUT), new StreamRelay(STDIN, STDERR)];
// No message available on STDIN, aka a read would block, so this returns false
$this->assertFalse(MultiRelayHelper::findRelayWithMessage($relays));
}
L3tum marked this conversation as resolved.
Show resolved Hide resolved
L3tum marked this conversation as resolved.
Show resolved Hide resolved
}