Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge bitcoin #16953 #16918 #16917 #16898 #14696 #16888 #16737 #16404 #15687 #16294 : Backport #4610

Merged
merged 10 commits into from
Dec 17, 2021
Prev Previous commit
Next Next commit
Merge bitcoin#16404: qa: Test ZMQ notification after chain reorg
abdfc5e qa: Test ZMQ notification after chain reorg (João Barbosa)
aa2622a qa: Refactor ZMQ test (João Barbosa)
6bc1ff9 doc: Add note regarding ZMQ block notification (João Barbosa)

Pull request description:

Top commit has no ACKs.

Tree-SHA512: b93237adc8c84b3aa72ccc28097090eabcb006cf408083218bebf6fec703bd0de2ded80b6879e77096872e14ba9402a6d3f923b146a54d4c4e41dcb862c3e765
  • Loading branch information
MarcoFalke authored and vijaydasmp committed Dec 15, 2021
commit cf43f40fb469f0c1a88f4714fda13f9f18a4b2e4
4 changes: 3 additions & 1 deletion doc/zmq.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,9 @@ using other means such as firewalling.

Note that when the block chain tip changes, a reorganisation may occur
and just the tip will be notified. It is up to the subscriber to
retrieve the chain from the last known block to the new tip.
retrieve the chain from the last known block to the new tip. Also note
that no notification occurs if the tip was in the active chain - this
is the case after calling invalidateblock RPC.

There are several possibilities that ZMQ notification can get lost
during transmission depending on the communication type you are
Expand Down
92 changes: 56 additions & 36 deletions test/functional/interface_zmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@
from test_framework.messages import dashhash
from test_framework.util import (
assert_equal,
connect_nodes,
UdjinM6 marked this conversation as resolved.
Show resolved Hide resolved
hash256,
)

ADDRESS = "tcp://127.0.0.1:28332"

def dashhash_helper(b):
return encode(dashhash(b)[::-1], 'hex_codec').decode('ascii')

Expand Down Expand Up @@ -48,62 +47,59 @@ def skip_test_if_missing_module(self):
self.skip_if_no_py3_zmq()
self.skip_if_no_bitcoind_zmq()

def setup_nodes(self):
def run_test(self):
import zmq
self.ctx = zmq.Context()
try:
self.test_basic()
self.test_reorg()
finally:
# Destroy the ZMQ context.
self.log.debug("Destroying ZMQ context")
self.ctx.destroy(linger=None)

# Initialize ZMQ context and socket.
def test_basic(self):
# All messages are received in the same socket which means
# that this test fails if the publishing order changes.
# Note that the publishing order is not defined in the documentation and
# is subject to change.
self.zmq_context = zmq.Context()
socket = self.zmq_context.socket(zmq.SUB)
import zmq
address = 'tcp://127.0.0.1:28332'
socket = self.ctx.socket(zmq.SUB)
socket.set(zmq.RCVTIMEO, 60000)
socket.connect(ADDRESS)
socket.connect(address)

# Subscribe to all available topics.
self.hashblock = ZMQSubscriber(socket, b"hashblock")
self.hashtx = ZMQSubscriber(socket, b"hashtx")
self.rawblock = ZMQSubscriber(socket, b"rawblock")
self.rawtx = ZMQSubscriber(socket, b"rawtx")

self.extra_args = [
["-zmqpub%s=%s" % (sub.topic.decode(), ADDRESS) for sub in [self.hashblock, self.hashtx, self.rawblock, self.rawtx]],
[]
]
self.add_nodes(self.num_nodes, self.extra_args)
self.start_nodes()
hashblock = ZMQSubscriber(socket, b"hashblock")
hashtx = ZMQSubscriber(socket, b"hashtx")
rawblock = ZMQSubscriber(socket, b"rawblock")
rawtx = ZMQSubscriber(socket, b"rawtx")

def run_test(self):
try:
self._zmq_test()
finally:
# Destroy the ZMQ context.
self.log.debug("Destroying ZMQ context")
self.zmq_context.destroy(linger=None)

def _zmq_test(self):
self.restart_node(0, ["-zmqpub%s=%s" % (sub.topic.decode(), address) for sub in [hashblock, hashtx, rawblock, rawtx]])
connect_nodes(self.nodes[0], 1)
num_blocks = 5
self.log.info("Generate %(n)d blocks (and %(n)d coinbase txes)" % {"n": num_blocks})
genhashes = self.nodes[0].generatetoaddress(num_blocks, ADDRESS_BCRT1_UNSPENDABLE)

self.sync_all()

for x in range(num_blocks):
# Should receive the coinbase txid.
txid = self.hashtx.receive()
txid = hashtx.receive()

# Should receive the coinbase raw transaction.
hex = self.rawtx.receive()
hex = rawtx.receive()
assert_equal(hash256(hex), txid)

# Should receive the generated block hash.
hash = self.hashblock.receive().hex()
hash = hashblock.receive().hex()
assert_equal(genhashes[x], hash)
# The block should only have the coinbase txid.
assert_equal([txid.hex()], self.nodes[1].getblock(hash)["tx"])

# Should receive the generated raw block.
block = self.rawblock.receive()
block = rawblock.receive()
assert_equal(genhashes[x], dashhash_helper(block[:80]))

if self.is_wallet_compiled():
Expand All @@ -112,23 +108,47 @@ def _zmq_test(self):
self.sync_all()

# Should receive the broadcasted txid.
txid = self.hashtx.receive()
txid = hashtx.receive()
assert_equal(payment_txid, txid.hex())

# Should receive the broadcasted raw transaction.
hex = self.rawtx.receive()
hex = rawtx.receive()
assert_equal(payment_txid, hash256(hex).hex())


self.log.info("Test the getzmqnotifications RPC")
assert_equal(self.nodes[0].getzmqnotifications(), [
{"type": "pubhashblock", "address": ADDRESS, "hwm": 1000},
{"type": "pubhashtx", "address": ADDRESS, "hwm": 1000},
{"type": "pubrawblock", "address": ADDRESS, "hwm": 1000},
{"type": "pubrawtx", "address": ADDRESS, "hwm": 1000},
{"type": "pubhashblock", "address": address, "hwm": 1000},
{"type": "pubhashtx", "address": address, "hwm": 1000},
{"type": "pubrawblock", "address": address, "hwm": 1000},
{"type": "pubrawtx", "address": address, "hwm": 1000},
])

assert_equal(self.nodes[1].getzmqnotifications(), [])

def test_reorg(self):
import zmq
address = 'tcp://127.0.0.1:28333'
socket = self.ctx.socket(zmq.SUB)
socket.set(zmq.RCVTIMEO, 60000)
socket.connect(address)
hashblock = ZMQSubscriber(socket, b'hashblock')

# Should only notify the tip if a reorg occurs
self.restart_node(0, ['-zmqpub%s=%s' % (hashblock.topic.decode(), address)])

# Generate 1 block in nodes[0] and receive all notifications
self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
assert_equal(self.nodes[0].getbestblockhash(), hashblock.receive().hex())

# Generate 2 blocks in nodes[1]
self.nodes[1].generatetoaddress(2, ADDRESS_BCRT1_UNSPENDABLE)

# nodes[0] will reorg chain after connecting back nodes[1]
connect_nodes(self.nodes[0], 1)

# Should receive nodes[1] tip
assert_equal(self.nodes[1].getbestblockhash(), hashblock.receive().hex())

if __name__ == '__main__':
ZMQTest().main()