Skip to content

Commit

Permalink
Prevent events from being dropped during the connection process (#612)
Browse files Browse the repository at this point in the history
* Spawn all shards before identifying; add identify delay even when not in the initial connection phase.

* Revert example

* Buffer gateway events until client is ready; avoid parsing gateway events multiple times.

* Add log message when identifying to the Gateway

* Prevent identify locks when Gateway is closing

* Ensure Gateway.events is a broadcast stream

* Check for remaining identify requests at every identify

* Refactor resumability check in shard

* Add more logging for shards

* Correct shard starting delay calculation

* Add test for multiple shards
  • Loading branch information
abitofevrything authored Dec 31, 2023
1 parent 896004a commit bb54de1
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 88 deletions.
3 changes: 2 additions & 1 deletion lib/nyxx.dart
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ export 'src/http/managers/interaction_manager.dart' show InteractionManager;
export 'src/http/managers/entitlement_manager.dart' show EntitlementManager;

export 'src/gateway/gateway.dart' show Gateway;
export 'src/gateway/message.dart' show Disconnecting, Dispose, ErrorReceived, EventReceived, GatewayMessage, Send, Sent, ShardData, ShardMessage;
export 'src/gateway/message.dart'
show Disconnecting, Dispose, ErrorReceived, EventReceived, GatewayMessage, Send, Sent, ShardData, ShardMessage, Identify, RequestingIdentify, StartShard;
export 'src/gateway/shard.dart' show Shard;

export 'src/models/discord_color.dart' show DiscordColor;
Expand Down
100 changes: 62 additions & 38 deletions lib/src/gateway/gateway.dart
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import 'package:nyxx/src/models/presence.dart';
import 'package:nyxx/src/models/snowflake.dart';
import 'package:nyxx/src/models/user/user.dart';
import 'package:nyxx/src/utils/cache_helpers.dart';
import 'package:nyxx/src/utils/iterable_extension.dart';
import 'package:nyxx/src/utils/parsing_helpers.dart';

/// Handles the connection to Discord's Gateway with shards, manages the client's cache based on Gateway events and provides an interface to the Gateway.
Expand All @@ -63,23 +62,29 @@ class Gateway extends GatewayManager with EventParser {
final List<Shard> shards;

/// A stream of messages received from all shards.
Stream<ShardMessage> get messages => _messagesController.stream;
// Adapting _messagesController.stream to a broadcast stream instead of
// simply making _messagesController a broadcast controller means events will
// be buffered until this field is initialized, which prevents events from
// being dropped during the connection process.
late final Stream<ShardMessage> messages = _messagesController.stream.asBroadcastStream();

final StreamController<ShardMessage> _messagesController = StreamController.broadcast();
final StreamController<ShardMessage> _messagesController = StreamController();

/// A stream of dispatch events received from all shards.
Stream<DispatchEvent> get events => messages.map((message) {
if (message is! EventReceived) {
return null;
}

final event = message.event;
if (event is! RawDispatchEvent) {
return null;
}

return parseDispatchEvent(event);
}).whereType<DispatchEvent>();
// Make this late instead of a getter so only a single subscription is made, which prevents events from being parsed multiple times.
late final Stream<DispatchEvent> events = messages.transform(StreamTransformer.fromBind((messages) async* {
await for (final message in messages) {
if (message is! EventReceived) continue;

final event = message.event;
if (event is! RawDispatchEvent) continue;

final parsedEvent = parseDispatchEvent(event);
// Update the cache as needed.
client.updateCacheWith(parsedEvent);
yield parsedEvent;
}
})).asBroadcastStream();

bool _closing = false;

Expand All @@ -90,9 +95,49 @@ class Gateway extends GatewayManager with EventParser {

/// Create a new [Gateway].
Gateway(this.client, this.gatewayBot, this.shards, this.totalShards, this.shardIds) : super.create() {
final logger = Logger('${client.options.loggerName}.Gateway');

const identifyDelay = Duration(seconds: 5);
final maxConcurrency = gatewayBot.sessionStartLimit.maxConcurrency;
var remainingIdentifyRequests = gatewayBot.sessionStartLimit.remaining;

// A mapping of rateLimitId (shard.id % maxConcurrency) to Futures that complete when the identify lock for that rate_limit_key is no longer used.
final identifyLocks = <int, Future<void>>{};

// Handle messages from the shards and start them according to their rate limit key.
for (final shard in shards) {
final rateLimitKey = shard.id % maxConcurrency;

// Delay the shard starting until it is (approximately) also ready to identify.
Timer(identifyDelay * (shard.id ~/ maxConcurrency), () {
logger.fine('Starting shard ${shard.id}');
shard.add(StartShard());
});

shard.listen(
_messagesController.add,
(event) {
_messagesController.add(event);

if (event is RequestingIdentify) {
final currentLock = identifyLocks[rateLimitKey] ?? Future.value();
identifyLocks[rateLimitKey] = currentLock.then((_) async {
if (_closing) return;

if (remainingIdentifyRequests < client.options.minimumSessionStarts * 5) {
logger.warning('$remainingIdentifyRequests session starts remaining');
}

if (remainingIdentifyRequests < client.options.minimumSessionStarts) {
await client.close();
throw OutOfRemainingSessionsError(gatewayBot);
}

remainingIdentifyRequests--;
shard.add(Identify());
return await Future.delayed(identifyDelay);
});
}
},
onError: _messagesController.addError,
onDone: () async {
if (_closing) {
Expand All @@ -106,9 +151,6 @@ class Gateway extends GatewayManager with EventParser {
cancelOnError: false,
);
}

// Handle all events which should update cache.
events.listen(client.updateCacheWith);
}

/// Connect to the gateway using the provided [client] and [gatewayBot] configuration.
Expand All @@ -126,14 +168,6 @@ class Gateway extends GatewayManager with EventParser {
' Remaining Session Starts: ${gatewayBot.sessionStartLimit.remaining}, Reset After: ${gatewayBot.sessionStartLimit.resetAfter}',
);

if (gatewayBot.sessionStartLimit.remaining < 50) {
logger.warning('${gatewayBot.sessionStartLimit.remaining} session starts remaining');
}

if (gatewayBot.sessionStartLimit.remaining < client.options.minimumSessionStarts) {
throw OutOfRemainingSessionsError(gatewayBot);
}

assert(
shardIds.every((element) => element < totalShards),
'Shard ID exceeds total shard count',
Expand All @@ -154,17 +188,7 @@ class Gateway extends GatewayManager with EventParser {
'Cannot enable payload compression when using the ETF payload format',
);

const identifyDelay = Duration(seconds: 5);

final shards = shardIds.indexed.map(((int, int) info) {
final (index, id) = info;

return Future.delayed(
identifyDelay * (index ~/ gatewayBot.sessionStartLimit.maxConcurrency),
() => Shard.connect(id, totalShards, client.apiOptions, gatewayBot.url, client),
);
});

final shards = shardIds.map((id) => Shard.connect(id, totalShards, client.apiOptions, gatewayBot.url, client));
return Gateway(client, gatewayBot, await Future.wait(shards), totalShards, shardIds);
}

Expand Down
9 changes: 9 additions & 0 deletions lib/src/gateway/message.dart
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ class Sent extends ShardMessage {
Sent({required this.payload});
}

/// A shard message sent when the shard is waiting to identify on the Gateway.
class RequestingIdentify extends ShardMessage {}

/// The base class for all control messages sent from the client to the shard.
abstract class GatewayMessage with ToStringHelper {}

Expand All @@ -85,6 +88,12 @@ class Send extends GatewayMessage {
Send({required this.opcode, required this.data});
}

/// A gateway message sent when the [Gateway] instance is ready for the shard to start.
class StartShard extends GatewayMessage {}

/// A gateway message sent as a response to [RequestingIdentify] to allow the shard to identify.
class Identify extends GatewayMessage {}

/// A gateway message sent to instruct the shard to disconnect & stop handling any further messages.
///
/// The shard can no longer be used after this is sent.
Expand Down
16 changes: 10 additions & 6 deletions lib/src/gateway/shard.dart
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ class Shard extends Stream<ShardMessage> implements StreamSink<GatewayMessage> {
logger.info('Reconnected to Gateway');
}
}
} else if (message is RequestingIdentify) {
logger.fine('Ready to identify');
}
});

Expand All @@ -103,11 +105,11 @@ class Shard extends Stream<ShardMessage> implements StreamSink<GatewayMessage> {
static Future<Shard> connect(int id, int totalShards, GatewayApiOptions apiOptions, Uri connectionUri, NyxxGateway client) async {
final logger = Logger('${client.options.loggerName}.Shards[$id]');

logger.info('Connecting to Gateway');

final receivePort = ReceivePort('Shard #$id message stream (main)');
final receiveStream = receivePort.asBroadcastStream();

logger.fine('Spawning shard runner');

final isolate = await Isolate.spawn(
_isolateMain,
_IsolateSpawnData(
Expand All @@ -130,6 +132,8 @@ class Shard extends Stream<ShardMessage> implements StreamSink<GatewayMessage> {

final sendPort = await receiveStream.first as SendPort;

logger.fine('Shard runner ready');

return Shard(id, isolate, receiveStream, sendPort, client);
}

Expand All @@ -149,6 +153,8 @@ class Shard extends Stream<ShardMessage> implements StreamSink<GatewayMessage> {
..finer('Opcode: ${event.opcode.value}, Data: ${event.data}');
} else if (event is Dispose) {
logger.info('Disposing');
} else if (event is Identify) {
logger.info('Connecting to Gateway');
}
sendPort.send(event);
}
Expand All @@ -172,12 +178,10 @@ class Shard extends Stream<ShardMessage> implements StreamSink<GatewayMessage> {
Future<void> doClose() async {
add(Dispose());

// Wait for disconnection confirmation
await firstWhere((message) => message is Disconnecting);

// Give the isolate time to shut down cleanly, but kill it if it takes too long.
try {
await drain().timeout(const Duration(seconds: 5));
// Wait for disconnection confirmation
await firstWhere((message) => message is Disconnecting).then(drain).timeout(const Duration(seconds: 5));
} on TimeoutException {
logger.warning('Isolate took too long to shut down, killing it');
isolate.kill(priority: Isolate.immediate);
Expand Down
Loading

0 comments on commit bb54de1

Please sign in to comment.