npm install nanomsg
var nano = require('nanomsg');
var pub = nano.socket('pub');
var sub = nano.socket('sub');
var addr = 'tcp://127.0.0.1:7789';
pub.bind(addr);
sub.connect(addr);
sub.on('data', function (buf) {
  console.log(String(buf));
  pub.close();
  sub.close();
});
setTimeout(function () {
  pub.send("Hello from nanomsg!");
}, 100);
nano.socket(): Starts a new socket. A nanomsg socket can bind or connect to multiple heterogeneous endpoints, and can shut down any of these established links.
'raw' (Boolean, default: false): determines the domain of the socket. AF_SP, the default, creates a standard full-blown SP socket. AF_SP_RAW family sockets operate over internal network protocols and interfaces. Raw sockets omit the end-to-end functionality found in AF_SP sockets and thus can be used to implement intermediary devices in SP topologies. See the nanomsg docs or consult your man page entry socket(2) for more info.
//ex. starting raw sockets
nano.socket('bus', { raw: true } );
'tcpnodelay' (Boolean, default: false): see socket.tcpnodelay(boolean)
'linger' (Number, default: 1000): see socket.linger(duration)
'sndbuf' (Number, size in bytes, default: 128kB): see socket.sndbuf(size)
'rcvbuf' (Number, size in bytes, default: 128kB): see socket.rcvbuf(size)
'sndtimeo' (Number, default: -1): see socket.sndtimeo(duration)
'rcvtimeo' (Number, default: -1): see socket.rcvtimeo(duration)
'reconn' (Number, default: 100): see socket.reconn(duration)
'maxreconn' (Number, default: 0): see socket.maxreconn(duration)
'sndprio' (Number, default: 8): see socket.sndprio(priority)
'rcvprio' (Number, default: 8): see socket.rcvprio(priority)
'ipv6' (Boolean, default: false): see socket.ipv6(boolean)
'chan' (Array, default: ['']): see socket.chan(Array)
socket.shutdown() (Function, param: String): Removes an endpoint established by calls to bind() or connect(). The nanomsg library will try to deliver any outstanding outbound messages to the endpoint for the time specified by linger.
socket.shutdown('tcp://127.0.0.1:5555');
socket.bind() (Function, param: String): Adds a local endpoint to the socket. Other applications can then connect to that endpoint.
bind() (or connect()) may be called multiple times on the same socket, allowing the socket to communicate with multiple heterogeneous endpoints.
socket.bind('tcp://eth0:5555');
We recommend checking your machine's ifconfig (ipconfig on Windows) before using a named interface.
socket.connect() (Function, param: String): Adds a remote endpoint to the socket. The nanomsg library will then try to connect to the specified remote endpoint.
connect() (as well as bind()) may be called multiple times on the same socket, allowing the socket to communicate with multiple heterogeneous endpoints.
socket.connect('tcp://127.0.0.1:5555');
When connecting over remote TCP, allow 100ms or more, depending on round-trip time, for the operation to complete.
socket.close() (Function, param: Function): Closes the socket. Any buffered inbound messages that were not yet received by the application will be discarded. The nanomsg library will try to deliver any outstanding outbound messages for the time specified by linger.
socket.send() (Function, param: String or Buffer): Sends a message.
socket.send('hello from nanømsg!');
send(msg) is automatically invoked when the socket, acting as a Writable, consumes some other Readable stream. In that case the Readable's pipe() method can be used to transmit data from readable sources. See example for more detail. The flow of data distributes to the endpoint(s) determined by the particular socket type.
var source = require('fs').createReadStream('filename.ext');
source.pipe(socket); //sends each chunk as a msg to socket's particular endpoint
socket.on() (Function, param order: String, Function): The Readable stream's on() function registers a listener for 'data' events. To receive messages, pass the string 'data' followed by a callback that takes a single data parameter.
// the default inbound message is a node buffer
// setEncoding sets the message type, use utf8 to receive strings instead.
socket.setEncoding('utf8');
socket.on('data', function (msg) {
  console.log(msg); //'hello from nanømsg!'
});
The readable stream's data event is automatically invoked when piped to a Writable or Transform consumer stream. See example for more detail. Here msgprocessor is a transform you could pipe to a writable or the next transform:
var through = require('through');
var msgprocessor = through(function (msg) {
  var str = msg; //'hello from nanømsg'
  this.queue(str + ' and cheers!');
});
socket.pipe(msgprocessor); //msg transformed to: 'hello from nanømsg and cheers!'
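The same transform step can also be sketched with Node's built-in stream.Transform instead of the through module. This is an illustrative sketch only: addCheers and createMsgProcessor are hypothetical names, and the processor is fed by hand here as a stand-in for a real socket.

```javascript
var Transform = require('stream').Transform;

// Hypothetical helper: the per-message transformation.
function addCheers(msg) {
  return String(msg) + ' and cheers!';
}

// Hypothetical factory: wraps addCheers in a Transform stream.
function createMsgProcessor() {
  return new Transform({
    transform: function (chunk, encoding, callback) {
      // chunk arrives as a Buffer by default; addCheers converts it to a string
      callback(null, addCheers(chunk));
    }
  });
}

// Stand-in usage: write one message by hand instead of piping from a socket.
var processor = createMsgProcessor();
processor.on('data', function (out) {
  console.log(String(out));
});
processor.write('hello from nanømsg');
processor.end();
```

With a real socket, the equivalent wiring would presumably be socket.pipe(createMsgProcessor()), followed by a pipe to whatever writable consumes the result.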
socket.chan(Array) (Function, param: Array of Strings, default: ['']): Allows sub sockets to filter messages based on a prefix. Not applicable to non-sub sockets.
By default, all sub sockets are subscribed to the '' channel. Once you opt in to filtering on a channel, you are unsubscribed from ''.
socket.rmchan() (Function, param: String): Allows sub sockets to remove channel filters. Not applicable to non-sub sockets. This function is variadic; you can pass multiple strings and all of them will be unfiltered.
If you unsubscribe from the default channel, '', without subscribing to any new channels, your sub socket will stop receiving messages.
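The prefix rule behind channel filtering can be pictured with a small stand-alone function. This is only a model of the behavior described above, not the library's code: matchesChannels is a hypothetical helper, and the message strings are made up for illustration.

```javascript
// Hypothetical helper: true when msg starts with at least one subscribed prefix.
function matchesChannels(msg, channels) {
  return channels.some(function (prefix) {
    return String(msg).indexOf(prefix) === 0;
  });
}

// The default channel '' is a prefix of every message.
console.log(matchesChannels('weather: sunny', ['']));                  // true
// After opting in to specific channels, only matching prefixes pass.
console.log(matchesChannels('weather: sunny', ['sports']));            // false
console.log(matchesChannels('weather: sunny', ['sports', 'weather'])); // true
// No channels at all: nothing is received.
console.log(matchesChannels('weather: sunny', []));                    // false
```

The last case mirrors a sub socket that has removed '' without adding any other channel.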
socket.tcpnodelay(boolean) (Function, param: Boolean, default: false): When set, disables Nagle’s algorithm. It also disables delaying of TCP acknowledgments. Using this option improves latency at the expense of throughput.
Pass no parameter to get the current TCP nodelay setting.
//default
console.log(socket.tcpnodelay()); //tcp nodelay: off
socket.tcpnodelay(true); //disabling Nagle's algorithm
console.log(socket.tcpnodelay()); //tcp nodelay: on
socket.linger(duration) (Function, param: Number, default: 1000): Specifies how long, in milliseconds, the socket should try to send pending outbound messages after socket.close() or socket.shutdown() is called.
Pass no parameter to get the current linger duration.
socket.linger(5000);
console.log(socket.linger()); //5000
socket.sndbuf(size) (Function, param: Number, size in bytes, default: 128kB): Size of the send buffer, in bytes. To prevent blocking for messages larger than the buffer, exactly one message may be buffered in addition to the data in the send buffer.
Pass no parameter to get the socket's send buffer size.
socket.sndbuf(131072);
console.log(socket.sndbuf()); // 131072
socket.rcvbuf(size) (Function, param: Number, size in bytes, default: 128kB): Size of the receive buffer, in bytes. To prevent blocking for messages larger than the buffer, exactly one message may be buffered in addition to the data in the receive buffer.
Pass no parameter to get the socket's receive buffer size.
socket.rcvbuf(20480);
console.log(socket.rcvbuf()); // 20480
socket.sndtimeo(duration) (Function, param: Number, default: -1): The timeout for send operations on the socket, in milliseconds.
Pass no parameter to get the socket's send timeout.
socket.sndtimeo(200);
console.log(socket.sndtimeo()); // 200
socket.rcvtimeo(duration) (Function, param: Number, default: -1): The timeout for recv operations on the socket, in milliseconds.
Pass no parameter to get the socket's recv timeout.
socket.rcvtimeo(50);
console.log(socket.rcvtimeo()); // 50
socket.reconn(duration) (Function, param: Number, default: 100): For connection-based transports such as TCP, this option specifies how long to wait, in milliseconds, after a connection is broken before trying to re-establish it. Note that the actual reconnect interval may be randomized to some extent to prevent severe reconnection storms.
Pass no parameter to get the socket's reconnect interval.
socket.reconn(600);
console.log(socket.reconn()); // 600
socket.maxreconn(duration) (Function, param: Number, default: 0): Only to be used in addition to socket.reconn(). maxreconn() specifies the maximum reconnection interval. On each reconnect attempt, the previous interval is doubled until maxreconn is reached. A value of zero means that no exponential backoff is performed and the reconnect interval is based only on reconn. If maxreconn is less than reconn, it is ignored.
Pass no parameter to get the socket's maxreconn interval.
socket.maxreconn(60000);
console.log(socket.maxreconn()); // 60000
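The interplay between reconn and maxreconn described above can be sketched as a plain function that lists the base wait before each reconnect attempt. This is a model of the documented rule only: reconnectSchedule is a hypothetical helper, not part of this module, and it ignores the randomization nanomsg may apply to the actual interval.

```javascript
// Hypothetical helper: base wait (ms) before each of the first `attempts` reconnects.
function reconnectSchedule(reconn, maxreconn, attempts) {
  var schedule = [];
  var interval = reconn;
  // A maxreconn of 0, or one smaller than reconn, means no exponential backoff.
  var backoff = maxreconn > 0 && maxreconn >= reconn;
  for (var i = 0; i < attempts; i++) {
    schedule.push(interval);
    if (backoff) {
      // Double the previous interval, capped at maxreconn.
      interval = Math.min(interval * 2, maxreconn);
    }
  }
  return schedule;
}

console.log(reconnectSchedule(100, 0, 4));    // 100, 100, 100, 100
console.log(reconnectSchedule(100, 1000, 6)); // 100, 200, 400, 800, 1000, 1000
console.log(reconnectSchedule(600, 100, 3));  // 600, 600, 600 (maxreconn ignored)
```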
socket.sndprio(priority) (Function, param: Number, default: 8): Sets outbound priority for endpoints subsequently added to the socket.
This option has no effect on socket types that send messages to all the peers. However, if the socket type sends each message to a single peer (or a limited set of peers), peers with high priority take precedence over peers with low priority.
Highest priority is 1, lowest is 16. Pass no parameter to get the socket's current outbound priority.
socket.sndprio(2);
console.log(socket.sndprio()); // 2
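Since a lower number means a higher priority, the precedence rule can be illustrated with a small selection function. This is a simplified model under stated assumptions: pickByPriority and the peer objects are hypothetical, and how nanomsg balances messages among peers of equal priority is not modeled here.

```javascript
// Hypothetical helper: returns the peers that share the highest priority
// (the lowest priority number, where 1 is highest and 16 is lowest).
function pickByPriority(peers) {
  var best = Math.min.apply(null, peers.map(function (p) {
    return p.priority;
  }));
  return peers.filter(function (p) {
    return p.priority === best;
  });
}

var peers = [
  { name: 'a', priority: 8 },
  { name: 'b', priority: 2 },
  { name: 'c', priority: 2 }
];

// Peers 'b' and 'c' take precedence over 'a'.
console.log(pickByPriority(peers).map(function (p) { return p.name; }));
```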
socket.rcvprio(priority) (Function, param: Number, default: 8): Sets inbound priority for endpoints subsequently added to the socket.
This option has no effect on socket types that are not able to receive messages.
When receiving, messages from peers with higher priority are received before messages from peers with lower priority.
Highest priority is 1, lowest is 16. Pass no parameter to get the socket's current inbound priority.
socket.rcvprio(10);
console.log(socket.rcvprio()); // 10
socket.ipv6(boolean) (Function, param: Boolean, default: false): Allows the use of IPv6 addresses to bind or connect to.
By default, nanomsg only works with IPv4 addresses, and support for IPv6 addresses must be enabled explicitly.
If enabled, both IPv4 and IPv6 addresses can be used.
socket.ipv6(true);
console.log(socket.ipv6()); // true
$ git clone https://github.com/nickdesaulniers/node-nanomsg.git nano
$ cd nano && git submodule update --init
# now you can build the project and run the test suite:
$ make && make check
# or perhaps you'd prefer to use the npm commands instead:
$ npm i
$ npm t
# let's say you switch to another version of node/iojs, you might want to run:
$ make clean && make && make check
# for the super deluxe make clean, rebuild, and test suite:
$ make full
Note: you must run git submodule update --init to initialize the nanomsg repository.
Run benchmarks:
$ make perf
For more info on how to do that and on running your own custom comparisons, check out: running benchmarks
If you want, you can also run:
$ make bench
:)
Issues and pull requests welcome!
Please run clang-format -style=Mozilla -i <file> on all C/C++ code.
WIP
MIT