Skip to content

Commit

Permalink
add dontwait option (#174)
Browse files Browse the repository at this point in the history
* add `socket.dontwait()` option

fixes #173

* switch to socket.dontwait to accept bool

* remove extra space inside parens
  • Loading branch information
reqshark authored and nickdesaulniers committed Mar 13, 2017
1 parent 14ce201 commit ca496da
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 2 deletions.
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ nano.socket('bus', { raw: true } );
* `'rcvmaxsize'` *(Number, default: `false`)*: see [`socket.rcvmaxsize(size)`](https://github.com/nickdesaulniers/node-nanomsg#socketrcvmaxsizesize)
* `'chan'` *(Array, default: `['']`)*: see [`socket.chan(Array)`](https://github.com/nickdesaulniers/node-nanomsg#socketchanarray)
* `'wsopt'` *(String, default: `'binary'`)*: see [`socket.wsopt(str)`](https://github.com/nickdesaulniers/node-nanomsg#socketwsoptstr)
* `'dontwait'` *(boolean, default: `true`)*: see [`socket.dontwait(boolean)`](https://github.com/nickdesaulniers/node-nanomsg#socketdontwaitboolean)

### socket.shutdown(address)

Expand Down Expand Up @@ -335,6 +336,23 @@ console.log(socket.wsopt()); // 'text'

If you are implementing nanomsg websockets in the browser, please carefully review the spec: https://raw.githubusercontent.com/nanomsg/nanomsg/master/rfc/sp-websocket-mapping-01.txt

### socket.dontwait(boolean)

*(Function, param: Boolean, default: `true`, except PUSH sockets)*: Sets the NN_DONTWAIT flag, specifying that the operation should be performed in non-blocking mode,

* `true` for non-blocking mode
* `false` for blocking mode

Pass no parameter for the socket's current mode.

```js
socket.dontwait(false);
console.log(socket.dontwait()); // false

// or set when socket is started:
require('nanomsg').socket('pub', { dontwait: false });
```

# test

```bash
Expand Down
23 changes: 21 additions & 2 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ function Socket (type, opts) {
this.protocol = nn.NN_PUSH;
this.sender = true;
this.receiver = false;
// nndontwait otherwise defaults true, PUSH socket default is false
opts.dontwait = opts.dontwait || false;
break;

case 'pull':
Expand Down Expand Up @@ -102,6 +104,9 @@ function Socket (type, opts) {
this.connected = {};
this.queue = [];

/* async I/O option */
this.dontwait('dontwait' in opts ? opts.dontwait : true);

/* subscription filter control */
this.channels = {};

Expand Down Expand Up @@ -181,7 +186,7 @@ Socket.prototype._receive = function () {
return;
}

var msg = nn.Recv(this.binding, nn.NN_DONTWAIT);
var msg = nn.Recv(this.binding, this.nndontwait);

if(this.type === 'surveyor') {
if(msg < 0 && nn.Errno() === nn.EFSM) {
Expand Down Expand Up @@ -236,7 +241,7 @@ Socket.prototype._register = function(chan){
};

Socket.prototype._write = function(buf, _, cb){
this.send(buf, nn.NN_DONTWAIT);
this.send(buf, this.nndontwait);
cb();
}

Expand Down Expand Up @@ -459,6 +464,20 @@ Socket.prototype.rmchan = function() {
};
}

Socket.prototype.dontwait = function (bool) {
if (arguments.length) {
if(bool){
this.nndontwait = nn.NN_DONTWAIT;
return true;
} else {
this.nndontwait = 0;
return false;
}
} else {
return Boolean(this.nndontwait);
}
}

/**
* module API
*/
Expand Down
54 changes: 54 additions & 0 deletions test/pushstress.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
var nano = require('..');
var test = require('tape');

test('push stress', function (t) {
t.plan(4);
var i = 9999;
var pull = nano.socket('pull', {encoding:'utf8'} );
var push = nano.socket('push');
var addr = 'tcp://127.0.0.1:7799';
var msg = 'hello from nanomsg stream api'
pull.connect(addr);
push.bind(addr);
pull.on('data', function(data){
if (data !== msg)
throw new Error('assertion failed');
if (++i === 9998) {
t.equal(push.dontwait(), false);
t.equal(push.nndontwait, 0);
t.equal(pull.dontwait(), true);
t.equal(pull.nndontwait, 1);
push.close();
pull.close();
}
});
while (i--)
push.send(msg);
});

test('set dontwait at init and verify sockopt results', function (t) {
t.plan(4);
var i = 9;
var pull = nano.socket('pull', {encoding:'utf8', dontwait: false} );
var push = nano.socket('push', {dontwait: true});
var addr = 'tcp://127.0.0.1:7899';
var msg = 'hello from nanomsg stream api'
pull.connect(addr);
push.bind(addr);
pull.on('data', function(data){
if (data !== msg)
throw new Error('assertion failed');
if (++i === 8) {
// check nndontwait values set at init and compare assumption w/ sockopt
t.equal(pull.nndontwait, 0);
t.equal(pull.dontwait(), false);

t.equal(push.nndontwait, 1);
t.equal(push.dontwait(), true);
push.close();
pull.close();
}
});
while (i--)
push.send(msg);
});
12 changes: 12 additions & 0 deletions test/sockoptapi.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,18 @@ test('sockopt api methods', function(t){
t.equal( sock.wsopt('binary'), true, 'sock.wsopt(binary) sets: binary');
t.equal( sock.wsopt(), 'binary', 'sock.wsopt() gets: binary');

//get default dontwait option for push
t.equal( sock.dontwait(), false, 'sock.dontwait() gets: false');

//set dontwait option
t.equal( sock.dontwait(true), true, 'sock.dontwait(true) sets: true');
t.equal( sock.dontwait(), true, 'sock.dontwait(1) gets: true');

//get default dontwait option for pull
var pull = nano.socket('pull');
t.equal( pull.dontwait(), true, 'sock.dontwait() gets: true');
pull.close();

sock.close();
t.end();
});
Expand Down

0 comments on commit ca496da

Please sign in to comment.