Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement zmq notifications for chainlocked blocks #2899

Merged
merged 2 commits into from
May 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 19 additions & 11 deletions contrib/zmq/zmq_sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,14 @@ def __init__(self):

self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashchainlock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtxlock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashgovernancevote")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashgovernanceobject")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashinstantsenddoublespend")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawblock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawchainlock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawtx")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawtxlock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawgovernancevote")
Expand All @@ -69,38 +71,44 @@ async def handle(self) :
if topic == b"hashblock":
print('- HASH BLOCK ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashchainlock":
print('- HASH CHAINLOCK ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashtx":
print ('- HASH TX ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashtxlock":
print('- HASH TX LOCK ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashgovernancevote":
print('- HASH GOVERNANCE VOTE ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashgovernanceobject":
print('- HASH GOVERNANCE OBJECT ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashinstantsenddoublespend":
print('- HASH IS DOUBLE SPEND ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"rawblock":
print('- RAW BLOCK HEADER ('+sequence+') -')
print(binascii.hexlify(body[:80]).decode("utf-8"))
elif topic == b"rawchainlock":
print('- RAW CHAINLOCK ('+sequence+') -')
print(binascii.hexlify(body[:80]).decode("utf-8"))
elif topic == b"rawtx":
print('- RAW TX ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"rawtxlock":
print('- RAW TX LOCK ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"rawinstantsenddoublespend":
print('- RAW IS DOUBLE SPEND ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashgovernancevote":
print('- HASH GOVERNANCE VOTE ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashgovernanceobject":
print('- HASH GOVERNANCE OBJECT ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"rawgovernancevote":
print('- RAW GOVERNANCE VOTE ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"rawgovernanceobject":
print('- RAW GOVERNANCE OBJECT ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashinstantsenddoublespend":
print('- HASH IS DOUBLE SPEND ('+sequence+') -')
elif topic == b"rawinstantsenddoublespend":
print('- RAW IS DOUBLE SPEND ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
# schedule ourselves to receive the next message
asyncio.ensure_future(self.handle())
Expand Down
30 changes: 19 additions & 11 deletions contrib/zmq/zmq_sub3.4.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@ def __init__(self):

self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashchainlock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtxlock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashgovernancevote")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashgovernanceobject")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashinstantsenddoublespend")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawblock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawchainlock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawtx")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawtxlock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawgovernancevote")
Expand All @@ -72,38 +74,44 @@ def handle(self) :
if topic == b"hashblock":
print('- HASH BLOCK ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashchainlock":
print('- HASH CHAINLOCK ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashtx":
print ('- HASH TX ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashtxlock":
print('- HASH TX LOCK ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashgovernancevote":
print('- HASH GOVERNANCE VOTE ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashgovernanceobject":
print('- HASH GOVERNANCE OBJECT ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashinstantsenddoublespend":
print('- HASH IS DOUBLE SPEND ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"rawblock":
print('- RAW BLOCK HEADER ('+sequence+') -')
print(binascii.hexlify(body[:80]).decode("utf-8"))
elif topic == b"rawchainlock":
print('- RAW CHAINLOCK ('+sequence+') -')
print(binascii.hexlify(body[:80]).decode("utf-8"))
elif topic == b"rawtx":
print('- RAW TX ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"rawtxlock":
print('- RAW TX LOCK ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"rawinstantsenddoublespend":
print('- RAW IS DOUBLE SPEND ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashgovernancevote":
print('- HASH GOVERNANCE VOTE ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashgovernanceobject":
print('- HASH GOVERNANCE OBJECT ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"rawgovernancevote":
print('- RAW GOVERNANCE VOTE ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"rawgovernanceobject":
print('- RAW GOVERNANCE OBJECT ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashinstantsenddoublespend":
print('- HASH IS DOUBLE SPEND ('+sequence+') -')
elif topic == b"rawinstantsenddoublespend":
print('- RAW IS DOUBLE SPEND ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
# schedule ourselves to receive the next message
asyncio.ensure_future(self.handle())
Expand Down
12 changes: 7 additions & 5 deletions doc/zmq.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,20 @@ the command line or in the configuration file.

Currently, the following notifications are supported:

-zmqpubhashblock=address
-zmqpubhashchainlock=address
-zmqpubhashtx=address
-zmqpubhashtxlock=address
-zmqpubhashblock=address
-zmqpubhashgovernancevote=address
-zmqpubhashgovernanceobject=address
-zmqpubhashinstantsenddoublespend=address
-zmqpubrawblock=address
-zmqpubrawchainlock=address
-zmqpubrawtx=address
-zmqpubrawtxlock=address
-zmqpubhashgovernancevote=address
-zmqpubhashgovernanceobject=address
-zmqpubrawgovernancevote=address
-zmqpubhashgovernanceobject=address
-zmqpubrawgovernanceobject=address
-zmqpubrawinstantsenddoublespend=address
-zmqpubhashinstantsenddoublespend=address

The socket type is PUB and the address must be a valid ZeroMQ socket
address. The same address can be used in more than one notification.
Expand Down
5 changes: 5 additions & 0 deletions src/zmq/zmqabstractnotifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ bool CZMQAbstractNotifier::NotifyBlock(const CBlockIndex * /*CBlockIndex*/)
return true;
}

bool CZMQAbstractNotifier::NotifyChainLock(const CBlockIndex * /*CBlockIndex*/)
{
return true;
}

bool CZMQAbstractNotifier::NotifyTransaction(const CTransaction &/*transaction*/)
{
return true;
Expand Down
1 change: 1 addition & 0 deletions src/zmq/zmqabstractnotifier.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class CZMQAbstractNotifier
virtual void Shutdown() = 0;

virtual bool NotifyBlock(const CBlockIndex *pindex);
virtual bool NotifyChainLock(const CBlockIndex *pindex);
virtual bool NotifyTransaction(const CTransaction &transaction);
virtual bool NotifyTransactionLock(const CTransaction &transaction);
virtual bool NotifyGovernanceVote(const CGovernanceVote &vote);
Expand Down
19 changes: 19 additions & 0 deletions src/zmq/zmqnotificationinterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ CZMQNotificationInterface* CZMQNotificationInterface::Create()
std::list<CZMQAbstractNotifier*> notifiers;

factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
factories["pubhashchainlock"] = CZMQAbstractNotifier::Create<CZMQPublishHashChainLockNotifier>;
factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
factories["pubhashtxlock"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionLockNotifier>;
factories["pubhashgovernancevote"] = CZMQAbstractNotifier::Create<CZMQPublishHashGovernanceVoteNotifier>;
factories["pubhashgovernanceobject"] = CZMQAbstractNotifier::Create<CZMQPublishHashGovernanceObjectNotifier>;
factories["pubhashinstantsenddoublespend"] = CZMQAbstractNotifier::Create<CZMQPublishHashInstantSendDoubleSpendNotifier>;
factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>;
factories["pubrawchainlock"] = CZMQAbstractNotifier::Create<CZMQPublishRawChainLockNotifier>;
factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
factories["pubrawtxlock"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionLockNotifier>;
factories["pubrawgovernancevote"] = CZMQAbstractNotifier::Create<CZMQPublishRawGovernanceVoteNotifier>;
Expand Down Expand Up @@ -152,6 +154,23 @@ void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, co
}
}

void CZMQNotificationInterface::NotifyChainLock(const CBlockIndex *pindex)
{
for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
{
CZMQAbstractNotifier *notifier = *i;
if (notifier->NotifyChainLock(pindex))
{
i++;
}
else
{
notifier->Shutdown();
i = notifiers.erase(i);
}
}
}

void CZMQNotificationInterface::SyncTransaction(const CTransaction& tx, const CBlockIndex* pindex, int posInBlock)
{
for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
Expand Down
1 change: 1 addition & 0 deletions src/zmq/zmqnotificationinterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class CZMQNotificationInterface : public CValidationInterface
// CValidationInterface
void SyncTransaction(const CTransaction& tx, const CBlockIndex *pindex, int posInBlock) override;
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override;
void NotifyChainLock(const CBlockIndex *pindex) override;
void NotifyTransactionLock(const CTransaction &tx) override;
void NotifyGovernanceVote(const CGovernanceVote& vote) override;
void NotifyGovernanceObject(const CGovernanceObject& object) override;
Expand Down
57 changes: 45 additions & 12 deletions src/zmq/zmqpublishnotifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,20 @@

static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;

static const char *MSG_HASHBLOCK = "hashblock";
static const char *MSG_HASHTX = "hashtx";
static const char *MSG_HASHTXLOCK = "hashtxlock";
static const char *MSG_HASHGVOTE = "hashgovernancevote";
static const char *MSG_HASHGOBJ = "hashgovernanceobject";
static const char *MSG_HASHISCON = "hashinstantsenddoublespend";
static const char *MSG_RAWBLOCK = "rawblock";
static const char *MSG_RAWTX = "rawtx";
static const char *MSG_RAWTXLOCK = "rawtxlock";
static const char *MSG_RAWGVOTE = "rawgovernancevote";
static const char *MSG_RAWGOBJ = "rawgovernanceobject";
static const char *MSG_RAWISCON = "rawinstantsenddoublespend";
static const char *MSG_HASHBLOCK = "hashblock";
static const char *MSG_HASHCHAINLOCK = "hashchainlock";
static const char *MSG_HASHTX = "hashtx";
static const char *MSG_HASHTXLOCK = "hashtxlock";
static const char *MSG_HASHGVOTE = "hashgovernancevote";
static const char *MSG_HASHGOBJ = "hashgovernanceobject";
static const char *MSG_HASHISCON = "hashinstantsenddoublespend";
static const char *MSG_RAWBLOCK = "rawblock";
static const char *MSG_RAWCHAINLOCK = "rawchainlock";
static const char *MSG_RAWTX = "rawtx";
static const char *MSG_RAWTXLOCK = "rawtxlock";
static const char *MSG_RAWGVOTE = "rawgovernancevote";
static const char *MSG_RAWGOBJ = "rawgovernanceobject";
static const char *MSG_RAWISCON = "rawinstantsenddoublespend";

// Internal function to send multipart message
static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
Expand Down Expand Up @@ -159,6 +161,16 @@ bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
return SendMessage(MSG_HASHBLOCK, data, 32);
}

bool CZMQPublishHashChainLockNotifier::NotifyChainLock(const CBlockIndex *pindex)
{
uint256 hash = pindex->GetBlockHash();
LogPrint("zmq", "zmq: Publish hashchainlock %s\n", hash.GetHex());
char data[32];
for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i];
return SendMessage(MSG_HASHCHAINLOCK, data, 32);
}

bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
{
uint256 hash = transaction.GetHash();
Expand Down Expand Up @@ -234,6 +246,27 @@ bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
}

bool CZMQPublishRawChainLockNotifier::NotifyChainLock(const CBlockIndex *pindex)
{
LogPrint("zmq", "zmq: Publish rawchainlock %s\n", pindex->GetBlockHash().GetHex());

const Consensus::Params& consensusParams = Params().GetConsensus();
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
{
LOCK(cs_main);
CBlock block;
if(!ReadBlockFromDisk(block, pindex, consensusParams))
{
zmqError("Can't read block from disk");
return false;
}

ss << block;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this push the whole block instead of just the header?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, why?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see 🙈 fixed the description 😊

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this overkill? I'd assume that listeners already listen for blocks through other notifications, so this would usually generate duplicate data?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The use cases for these "lock" notifications is when you want to ignore "normal" notifications in some way and at the same time you want to avoid querying rpc to get the actual data. For example, if you need smth like "I'm only interested in locked blocks" - use rawchainlock (you don't need rawblock in this case). If instead you want smth like "I'm interested in all blocks and also I want to know when some of them are locked" then you need a rawblock+hashchainlock combination (should be trivial to calculate the hash from the block itself and match them).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's ping @snogcel to see if I understand these use cases correctly :)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@UdjinM6 summarized the use cases perfectly - rawchainlock provides a lot of data but can be thought of as a replacement for the hashchainlock + getblock (JSON-RPC) combination you'd see otherwise.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, understood

}

return SendMessage(MSG_RAWCHAINLOCK, &(*ss.begin()), ss.size());
}

bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
{
uint256 hash = transaction.GetHash();
Expand Down
12 changes: 12 additions & 0 deletions src/zmq/zmqpublishnotifier.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ class CZMQPublishHashBlockNotifier : public CZMQAbstractPublishNotifier
bool NotifyBlock(const CBlockIndex *pindex) override;
};

class CZMQPublishHashChainLockNotifier : public CZMQAbstractPublishNotifier
{
public:
bool NotifyChainLock(const CBlockIndex *pindex) override;
};

class CZMQPublishHashTransactionNotifier : public CZMQAbstractPublishNotifier
{
public:
Expand Down Expand Up @@ -72,6 +78,12 @@ class CZMQPublishRawBlockNotifier : public CZMQAbstractPublishNotifier
bool NotifyBlock(const CBlockIndex *pindex) override;
};

class CZMQPublishRawChainLockNotifier : public CZMQAbstractPublishNotifier
{
public:
bool NotifyChainLock(const CBlockIndex *pindex) override;
};

class CZMQPublishRawTransactionNotifier : public CZMQAbstractPublishNotifier
{
public:
Expand Down