Skip to content

Commit

Permalink
Added queue for controller's response to nodes
Browse files Browse the repository at this point in the history
 controller can use incoming and outgoing queue to control nodes
 no need for direct access from controler to serial gateway
 so controller can sit now anywhere

Added queue for messages to nodes; all messages will be dalyed
until a message from the node is received. Node is supposed to wait after
the first transmition for reception

Home automation can now send messages to nodes. Either immediately if
node does not sleep, or when nodes is awake
  • Loading branch information
RalfJL committed Sep 4, 2016
1 parent 25bdc42 commit 7ed7288
Showing 1 changed file with 195 additions and 6 deletions.
201 changes: 195 additions & 6 deletions mqttGateway2.pl
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ sub variableTypeToIdx
I_NONCE_RESPONSE
I_HEARTBEAT I_PRESENTATION I_DISCOVER I_DISCOVER_RESPONSE I_HEARTBEAT_RESPONSE
I_LOCKED I_PING I_PONG I_REGISTRATION_REQUEST I_REGISTRATION_RESPONSE I_DEBUG};

sub imessageTypeToIdx {
my $var = shift;
return first { (internalMessageTypesToStr)[$_] eq $var }
0 .. scalar(internalMessageTypesToStr);
}

#-- Sensor types
use enum qw { S_DOOR=0 S_MOTION S_SMOKE S_LIGHT S_DIMMER S_COVER S_TEMP S_HUM S_BARO S_WIND
Expand All @@ -124,7 +130,26 @@ sub variableTypeToIdx
use enum qw { P_STRING=0 P_BYTE P_INT16 P_UINT16 P_LONG32 P_ULONG32 P_CUSTOM P_FLOAT32 };
use constant payloadTypesToStr => qw{ P_STRING P_BYTE P_INT16 P_UINT16 P_LONG32 P_ULONG32 P_CUSTOM P_FLOAT32 };

#-- Message types from controller to node that should be delayed until node is active
# most nodes sleep long and often, so they receive only shortly after they woke up
# messages of these types from the controller are delayed until a new message
# from the sensor is received by the gateway; make sure that the sensor waits a while
# after the first message or you will lose both messages from node and from controller
# you can use the topic2node channel if you do not need an immediate communication with
# a sensor node, like the controller needs, most of the time
# all communication through topic2node will be delayed
my @delayedMessageTypes = qw { I_REBOOT };
my %delayedMessages; # hash to save messages for nodes; nodeId is key


my $topicRoot = '/mySensors'; # Omit trailing slash
my $topicC2N = '/mySensorsC2N'; # queue for controller to send to nodes

# queue for all messages delayed until sensor sends the first package
my $topic2node = '/mySensors2node';

# please make sure the sensor uses wait() after the first message, or the message from
# the sensor and the gateway will both be lost (collision)

my $mqttHost = 'localhost'; # Hostname of MQTT broker
my $mqttPort = 1883; # Port of MQTT broker
Expand Down Expand Up @@ -153,6 +178,8 @@ sub variableTypeToIdx
'parity:s' => \$serialParity,
'stop:i' => \$serialStop,
'root:s' => \$topicRoot,
'topicC2N:s' => \$topicC2N,
'topic2node:s' => \$topic2node,
'mqtthost:s' => \$mqttHost,
'mqttport:i' => \$mqttPort,
'gwhost:s' => \$mysnsHost,
Expand Down Expand Up @@ -180,6 +207,15 @@ sub variableTypeToIdx
print STDERR "\n--- Error: $exit_cause ---\n" if (defined($exit_cause));
PrintHelpAndExit() if (defined($helpme) || defined($exit_cause));

# take care about the wildcard topics; we need to subscribe to all messages
# so we have to add the mqtt wildcard
if ( !( $topicC2N =~ m/\/\#$/ ) ) {
$topicC2N =~ s/[\/]{0,1}$/\/#/;
}
if ( !( $topic2node =~ m/\/\#$/ ) ) {
$topic2node =~ s/[\/]{0,1}$/\/#/;
}

my $retain = 1;
my $qos = MQTT_QOS_AT_LEAST_ONCE;
my $keep_alive_timer = 120;
Expand Down Expand Up @@ -302,15 +338,147 @@ sub createTopic
return sprintf("%s/%d/%d/%s", $topicRoot, $msgRef->{'radioId'}, $msgRef->{'childId'}, $st);
}

# save message for later transmission
sub queueDelayedMessage {
my $msg = shift;
my ($nodeID) = split( ";", $msg );
unshift( @{ $delayedMessages{$nodeID} }, $msg );
return;
}

# do we have to delay the message to a node
sub delayMessage {
my $msgRef = shift;
my $sub = subTypeToStr( $msgRef->{'cmd'}, $msgRef->{'subType'} );
if ( $sub ~~ @delayedMessageTypes ) {
return 1;
}
return 0;
}

# handle messages that will be send to nodes
# message will always be delayed until a new message from the node is recevied
sub onMqtt2node {
my ( $topic, $message ) = @_;
my $ntopic = $topic2node;
chop $ntopic;
if ( $topic !~ s/^@{[$ntopic]}\/?// ) {
return undef;
}
my ( $nodeId, $childId, $msgtype, $sub ) = split( "/", $topic );

# sanity checks
if ( !defined $sub ) { # queue sends only nodeID, childId and sub
$sub = $msgtype;
if ( $sub =~ m/I_.*/ ) {
$msgtype = C_INTERNAL;
}
else {
$msgtype = C_SET;
}
}
$message = "" if !defined $message;
debug(
"Recevied message from node queue: $nodeId;$childId;$msgtype;$sub Payload: \"$message\""
);

# map message type
if ( $sub =~ m/[IV]_.*/ ) { # map sub to int
if ( $sub =~ m/[I]_.*/ ) {
$sub = imessageTypeToIdx($sub);
}
else {
$sub = variableTypeToIdx($sub);
}
}
my $msgRef = {
radioId => int($nodeId),
childId => int($childId),
cmd => int($msgtype),
ack => 0, # No way to tell ack from topic
subType => $sub,
payload => $message
};
return $msgRef;
}

# handle controller messages that will be send to nodes
sub onMqttControler {
my ( $topic, $message ) = @_;
my $ctopic = $topicC2N;
chop $ctopic;
if ( $topic !~ s/^@{[$ctopic]}\/?// ) {
return undef;
}
my ( $nodeId, $childId, $msgtype, $sub ) = split( "/", $topic );

# sanity checks
if ( !defined $sub ) { # controler only sends only nodeID, childId and sub
$sub = $msgtype;
if ( $sub =~ m/I_.*/ ) {
$msgtype = C_INTERNAL;
}
else {
$msgtype = C_SET;
}
}
$message = "" if !defined $message;
debug(
"Recevied message from controller: $nodeId;$childId;$msgtype;$sub Payload: \"$message\""
);

# map message type
if ( $sub =~ m/[IV]_.*/ ) { # map sub to int
if ( $sub =~ m/[I]_.*/ ) {
$sub = imessageTypeToIdx($sub);
}
else {
$sub = variableTypeToIdx($sub);
}
}
my $msgRef = {
radioId => int($nodeId),
childId => int($childId),
cmd => int($msgtype),
ack => 0, # No way to tell ack from topic
subType => $sub,
payload => $message
};
return $msgRef;
}

# Callback when MQTT broker publishes a message
sub onMqttPublish
{
my($topic, $message) = @_;
my $msgRef = parseTopicMessage($topic, $message);
my $mySnsMsg = createMsg($msgRef);
$msgRef->{'raw'} = $mySnsMsg;
dumpMsg("TX", $msgRef, "Publish to gateway '$topic':'$message'");
$gw_handle->push_write("$mySnsMsg\n") if (defined $gw_handle);
my $ctopic = $topicC2N;
my $ntopic = $topic2node;
chop $ctopic; # strip mqtt wildcard
chop $ntopic;
my $delay = 0;
my $msgRef;
if ( $topic =~ m/^$ctopic/ ) {
$msgRef = onMqttControler( $topic, $message );
}
elsif ( $topic =~ m/^$ntopic/ ) {
$delay = 1;
$msgRef = onMqtt2node( $topic, $message );
}
else {
$msgRef = parseTopicMessage( $topic, $message );
}
my $mySnsMsg = createMsg($msgRef);
$msgRef->{'raw'} = $mySnsMsg;
if ( $delay || delayMessage($msgRef) )
{ # delay message because node can not receive at the moment
dumpMsg( "Delayed", $msgRef,
"Delayed until message from node is received" );
queueDelayedMessage($mySnsMsg);
}
else {
dumpMsg( "TX", $msgRef, "Publish to gateway '$topic':'$message'" );
$gw_handle->push_write("$mySnsMsg\n") if ( defined $gw_handle );
}
}

sub subscribe
Expand Down Expand Up @@ -426,11 +594,22 @@ sub handleRead
dumpMsg("RX", $msgRef, "Ignored");
}
}
# check for delayed messages and send
# but send only one message to not collide with messages from node
if ( my $msg = pop( @{ $delayedMessages{ $msgRef->{radioId} } } ) ) {
my $mRef = parseMsg($msg);
dumpMsg( "TX", $mRef, "Publish to node '$msgRef->{radioId}'" );
$gw_handle->push_write("$msg\n") if ( defined $gw_handle );
}
}

sub onCtrlC
{
dolog("\nShutdown requested...");
dolog("unsubscribe to Controller topic \"$topicC2N\"");
unsubscribe($topicC2N);
dolog("unsubscribe to sensors node topic \"$topic2node\"");
unsubscribe($topic2node);
store(\@subscriptions, $subscriptionStorageFile);
exit;
}
Expand Down Expand Up @@ -550,6 +729,14 @@ sub onCtrlC
}
);
}

# subscribe to Controller topic
dolog("subscribe to Controller topic \"$topicC2N\"");
subscribe($topicC2N);

# subscribe to node topic
dolog("subscribe to node topic \"$topic2node\"");
subscribe($topic2node);

# On shutdown active subscriptions will be saved to file.
# Read this file here and restore subscriptions.
Expand Down Expand Up @@ -587,7 +774,7 @@ sub PrintHelpAndExit
Usage: mqttGateway2 [--serial dev [--baud baudrate] [--bits bits] [--parity [none|odd|even]] [--stop numbits]]
[--gwhost hostip [--gwport port]]
[--root mqttroot]
[--root mqttroot] [--topicC2N mqttcontrq] [--topic2node mqttnodeq]
[--mqtthost mqtthost] [--mqttport mqttport]
[--storage file] [--log file]
mqttGateway2 --help
Expand All @@ -605,6 +792,8 @@ sub PrintHelpAndExit
Generic options:
--root MQTT root topic (no trailing slash), defaults to /mySensors
--topicC2N MQTT topic where controller messages are received and sent to nodes (/mySensorsC2N)
--topic2node MQTT topic used to send any message to a sensor node, defaults to /mySensors2node
--mqtthost IP address of MQTT broker, defaults to localhost
--mqttport Port of MQTT broker, defaults to 1883
--storage File used for storing active subscriptions, defaults to /var/run/mqttMySensors/Gateway_xxx
Expand Down

0 comments on commit 7ed7288

Please sign in to comment.