Messages
In kino, a payload is transferred wrapped in a message. To declare a message, create a class derived from Payload:
public class MyMessage : Payload
{
    private static readonly byte[] MessageIdentity = "NAMESPACE.MYMESSAGE".GetBytes();
    private static readonly byte[] MessageVersion = "1.0".GetBytes();

    // Message properties are declared here

    public override byte[] Version => MessageVersion;
    public override byte[] Identity => MessageIdentity;
}
The Identity and Version properties are used to route a message to the corresponding actor. Because all messages are visible to every node of the kino network, it is good practice to prefix the message Identity with a namespace for logical grouping.
The default message serializer is protobuf-net. A message can nevertheless use a different serializer by passing it to the Payload constructor overload:
public abstract class Payload : IPayload
{
    private static readonly IMessageSerializer DefaultSerializer = new ProtobufMessageSerializer();
    private readonly IMessageSerializer messageSerializer;

    protected Payload(IMessageSerializer messageSerializer)
    {
        this.messageSerializer = messageSerializer;
    }

    protected Payload()
    {
        messageSerializer = DefaultSerializer;
    }

    public virtual T Deserialize<T>(byte[] content)
        => messageSerializer.Deserialize<T>(content);

    public virtual byte[] Serialize()
        => messageSerializer.Serialize(this);
}
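As an illustration of the constructor overload, the following is a minimal, self-contained sketch of a message that swaps in a JSON serializer. It does not reference kino itself: IMessageSerializer and Payload are reduced stand-ins whose signatures are inferred from the snippet above (an assumption), and JsonMessageSerializer and MyJsonMessage are hypothetical names built on System.Text.Json.

```csharp
using System.Text;
using System.Text.Json;

// Stand-in for kino's interface; signatures are inferred from how Payload calls it.
public interface IMessageSerializer
{
    byte[] Serialize(object obj);
    T Deserialize<T>(byte[] content);
}

// Hypothetical serializer based on System.Text.Json (not part of kino).
public class JsonMessageSerializer : IMessageSerializer
{
    public byte[] Serialize(object obj)
        => JsonSerializer.SerializeToUtf8Bytes(obj, obj.GetType());

    public T Deserialize<T>(byte[] content)
        => JsonSerializer.Deserialize<T>(content);
}

// Reduced stand-in for kino's Payload, keeping only the injecting constructor.
public abstract class Payload
{
    private readonly IMessageSerializer messageSerializer;

    protected Payload(IMessageSerializer messageSerializer)
    {
        this.messageSerializer = messageSerializer;
    }

    public abstract byte[] Version { get; }
    public abstract byte[] Identity { get; }

    public virtual T Deserialize<T>(byte[] content)
        => messageSerializer.Deserialize<T>(content);

    public virtual byte[] Serialize()
        => messageSerializer.Serialize(this);
}

// A message that opts out of the default serializer via the constructor overload.
public class MyJsonMessage : Payload
{
    private static readonly byte[] MessageIdentity = Encoding.UTF8.GetBytes("NAMESPACE.MYJSONMESSAGE");
    private static readonly byte[] MessageVersion = Encoding.UTF8.GetBytes("1.0");

    public MyJsonMessage() : base(new JsonMessageSerializer())
    {
    }

    public string Text { get; set; }

    public override byte[] Version => MessageVersion;
    public override byte[] Identity => MessageIdentity;
}
```

Because the serializer is chosen per message class, messages that keep the parameterless base constructor would continue to use the default serializer.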
Messages can be sent as unicast, as broadcast, or directly to a specific node (MessageRouter).
With unicast, the message is forwarded only to the first actor found that is able to process it.
With broadcast, the message is sent to all actors registered within the network:
IPayload payload = new MyMessage();
IMessage message = Message.Create(payload, DistributionPattern.Broadcast);
If a message is created as the first (start) message of a flow, a new unique CorrelationId is assigned to it:
IMessage message = Message.CreateFlowStartMessage(new MyMessage());
Every other message created during the flow automatically gets the same CorrelationId. The framework takes care of this.
You can also send a message to a specific node, i.e. a MessageRouter. Assume you have the SocketIdentifier of the target node in a receiverIdentity variable (obtained, for instance, by analyzing the KnownMessageRoutesMessage received in response to a RequestKnownMessageRoutesMessage). The code may then look like this:
IMessage message = Message.CreateFlowStartMessage(new MyMessage());
message.SetReceiverNode(receiverIdentity);
// Now, send the message
In this case the sender's ExternalRoutingTable is looked up by the value stored in receiverIdentity. If the receiver is either not registered or not able to process the message, no further routing attempts are made and the message is not sent.
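The three delivery modes described above can be compared in a small model. This is only a sketch of the selection rules as stated in the text, not kino's routing code: Receiver, RoutingSketch and SelectReceivers are hypothetical names, identities are simplified to strings, and the routing table is modeled as a plain list.

```csharp
using System.Collections.Generic;
using System.Linq;

public enum DistributionPattern { Unicast, Broadcast }

// Hypothetical routing table entry: a receiver and the message identities it can process.
public class Receiver
{
    public Receiver(string id, params string[] handles)
    {
        Id = id;
        Handles = new HashSet<string>(handles);
    }

    public string Id { get; }
    public HashSet<string> Handles { get; }
}

public static class RoutingSketch
{
    // Returns the ids of the receivers the message would be delivered to.
    public static List<string> SelectReceivers(
        IEnumerable<Receiver> table,
        string messageIdentity,
        DistributionPattern pattern,
        string receiverNode = null)
    {
        var capable = table.Where(r => r.Handles.Contains(messageIdentity));

        if (receiverNode != null)
        {
            // Direct: only the named receiver qualifies; there is no fallback,
            // so an unknown or incapable receiver yields an empty result.
            return capable.Where(r => r.Id == receiverNode).Select(r => r.Id).ToList();
        }

        return pattern == DistributionPattern.Broadcast
            ? capable.Select(r => r.Id).ToList()           // every capable receiver
            : capable.Select(r => r.Id).Take(1).ToList();  // first capable receiver only
    }
}
```

An empty result in the direct case corresponds to the message not being sent at all.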
In most cases you can simply call the GetPayload() method of the message instance:
MyMessage payload = message.GetPayload<MyMessage>();
Or, if you are not sure about the exact type of the message you received, you can check it first by comparing the Identity and Version properties:
MessageIdentifier messageIdentifier = MessageIdentifier.Create<MyMessage>();
if (Unsafe.Equals(messageIdentifier.Identity, message.Identity)
    && Unsafe.Equals(messageIdentifier.Version, message.Version))
{
    MyMessage payload = message.GetPayload<MyMessage>();
}
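The check above presumably amounts to a byte-wise comparison of two byte arrays. A minimal stand-alone sketch of that idea follows, assuming that Identity and Version match only when their bytes are equal in length and content. MessageIdentifierSketch is a hypothetical type, and Enumerable.SequenceEqual is used in place of kino's own comparison helper.

```csharp
using System.Linq;
using System.Text;

// Hypothetical stand-in for an identifier made of two byte arrays.
public sealed class MessageIdentifierSketch
{
    public MessageIdentifierSketch(string identity, string version)
    {
        Identity = Encoding.UTF8.GetBytes(identity);
        Version = Encoding.UTF8.GetBytes(version);
    }

    public byte[] Identity { get; }
    public byte[] Version { get; }

    // True only when both byte sequences are equal in length and content.
    public bool Matches(byte[] identity, byte[] version)
        => Identity.SequenceEqual(identity) && Version.SequenceEqual(version);
}
```

Comparing contents rather than references matters here, because two arrays holding the same bytes are still distinct objects.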
FRAME   BITS    CONTENT                      DESCRIPTION
(of n total frames)
=====================================================================================
0               Receiver socket ID           Identity of the receiving socket
1               [0]                          Empty frame
..
.... more ......
                Body Frame                   Message body frames
..              Body                         ..
.... more ......
                ROUTER URI                   <= VERSIONING: new frames come on top
                ROUTER ID
..              Message routing              ..
.... more ......
                Callback Partition           <= VERSIONING: new frames come on top
                Callback Version
                Callback Message ID
..              Callbacks                    ..
n-13    1-16    Routing Start Frame Offset   Offset of the first message route frame
        17-32   Routing Entry Count          Number of routing entries
        33-48   Routing Frame Divisor        Number of frames per one routing entry
        49-64   Message Hops                 Number of times a Message was sent away to another node
n-12    1-16    Callback Start Frame Offset  Offset of the first callback frame
        17-32   Callback Entry Count         Number of callback entries
        33-48   Callback Frame Divisor       Number of frames per one callback entry
n-11            Receiver ID                  Socket Identity of the receiver of the message. Empty frame, for the non-specific receiver
n-10            Callback Receiver ID         Value from this field will be copied to Receiver ID
                                             when Message ID becomes equal to Callback Message ID
n-9             Receiver Node ID             MessageRouter Socket Identity to which the message should be routed
n-8             Partition                    Message Partition
n-7             Version                      Message Version
n-6             Message ID                   Message Identity
n-5     1-16    Trace Options                Type of the tracing info to be recorded, i.e. routing
        17-32   Distribution Pattern         One of the following distribution patterns {Unicast, Broadcast}
                                             Third distribution pattern, Direct, uses address in Receiver Node ID
n-4             Correlation ID               Unique identifier, copied onto all messages, spawned within the same flow, from the initial one
n-3             TTL
n-2     1-16    Body First Frame Offset      Offset of the first message body frame
        17-32   Body Frame Count             Number of body frames
n-1             Wire Format Version          Version of message wire format
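To make the bit ranges concrete, here is a sketch of unpacking frame n-13 into its four 16-bit fields. The table does not state the byte order or how bit numbers map to bytes, so two assumptions are made: values are little-endian, and bits 1-16 are the first two bytes of the frame. RoutingHeader is a hypothetical type, not kino's own parser.

```csharp
using System;
using System.Buffers.Binary;

// Hypothetical view over frame n-13: four consecutive 16-bit unsigned fields.
public readonly struct RoutingHeader
{
    public RoutingHeader(ushort startFrameOffset, ushort entryCount, ushort frameDivisor, ushort hops)
    {
        StartFrameOffset = startFrameOffset;
        EntryCount = entryCount;
        FrameDivisor = frameDivisor;
        Hops = hops;
    }

    public ushort StartFrameOffset { get; }  // bits 1-16
    public ushort EntryCount { get; }        // bits 17-32
    public ushort FrameDivisor { get; }      // bits 33-48
    public ushort Hops { get; }              // bits 49-64

    // Assumption: little-endian, with bits 1-16 stored in bytes 0-1.
    public static RoutingHeader Parse(ReadOnlySpan<byte> frame)
    {
        if (frame.Length != 8)
        {
            throw new ArgumentException("Expected a 64-bit (8-byte) frame.", nameof(frame));
        }

        return new RoutingHeader(
            BinaryPrimitives.ReadUInt16LittleEndian(frame.Slice(0, 2)),
            BinaryPrimitives.ReadUInt16LittleEndian(frame.Slice(2, 2)),
            BinaryPrimitives.ReadUInt16LittleEndian(frame.Slice(4, 2)),
            BinaryPrimitives.ReadUInt16LittleEndian(frame.Slice(6, 2)));
    }

    // Writes the four fields back in the same layout that Parse reads.
    public byte[] ToFrame()
    {
        var frame = new byte[8];
        BinaryPrimitives.WriteUInt16LittleEndian(frame.AsSpan(0, 2), StartFrameOffset);
        BinaryPrimitives.WriteUInt16LittleEndian(frame.AsSpan(2, 2), EntryCount);
        BinaryPrimitives.WriteUInt16LittleEndian(frame.AsSpan(4, 2), FrameDivisor);
        BinaryPrimitives.WriteUInt16LittleEndian(frame.AsSpan(6, 2), Hops);
        return frame;
    }
}
```

Under the same assumptions, frames n-12, n-5 and n-2 could be read the same way with three or two 16-bit fields.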