From e7e701a6cc402c9fbeaf032d1a1d4a6607013ebb Mon Sep 17 00:00:00 2001 From: Alan Antonuk Date: Mon, 19 Aug 2013 17:09:05 -0700 Subject: [PATCH] Add: BasicConsumeMessage from multiple consumers Add BasicConsumeMessage() overloads to wait for messages from multiple consumers, or from all the consumers defined on the Channel object. --- src/Channel.cpp | 72 +++++++++------- src/ChannelImpl.cpp | 61 +++---------- src/SimpleAmqpClient/Channel.h | 48 +++++++++++ src/SimpleAmqpClient/ChannelImpl.h | 132 +++++++++++++++++++++++++---- testing/test_consume.cpp | 17 ++++ 5 files changed, 239 insertions(+), 91 deletions(-) diff --git a/src/Channel.cpp b/src/Channel.cpp index 643faf12..80260816 100644 --- a/src/Channel.cpp +++ b/src/Channel.cpp @@ -464,7 +464,8 @@ void Channel::BasicPublish(const std::string &exchange_name, // - connection.clsoe - something really bad happened const boost::array PUBLISH_ACK = { { AMQP_BASIC_ACK_METHOD, AMQP_BASIC_RETURN_METHOD } }; amqp_frame_t response; - m_impl->GetMethodOnChannel(channel, response, PUBLISH_ACK); + boost::array channels = {{ channel }}; + m_impl->GetMethodOnChannel(channels, response, PUBLISH_ACK); if (AMQP_BASIC_RETURN_METHOD == response.payload.method.id) { @@ -472,7 +473,7 @@ void Channel::BasicPublish(const std::string &exchange_name, m_impl->CreateMessageReturnedException(*(reinterpret_cast(response.payload.method.decoded)), channel); const boost::array BASIC_ACK = { { AMQP_BASIC_ACK_METHOD } }; - m_impl->GetMethodOnChannel(channel, response, BASIC_ACK); + m_impl->GetMethodOnChannel(channels, response, BASIC_ACK); m_impl->ReturnChannel(channel); m_impl->MaybeReleaseBuffersOnChannel(channel); throw message_returned; @@ -630,47 +631,60 @@ Envelope::ptr_t Channel::BasicConsumeMessage(const std::string &consumer_tag) return returnval; } +Envelope::ptr_t Channel::BasicConsumeMessage(const std::vector &consumer_tags) +{ + Envelope::ptr_t returnval; + BasicConsumeMessage(consumer_tags, returnval); + return returnval; +} + +Envelope::ptr_t Channel::BasicConsumeMessage() +{ + Envelope::ptr_t returnval; + BasicConsumeMessage(returnval); + return returnval; +} + bool Channel::BasicConsumeMessage(const std::string &consumer_tag, Envelope::ptr_t &message, int timeout) { m_impl->CheckIsConnected(); amqp_channel_t channel = m_impl->GetConsumerChannel(consumer_tag); - const boost::array DELIVER_OR_CANCEL = { { AMQP_BASIC_DELIVER_METHOD, - AMQP_BASIC_CANCEL_METHOD } }; + boost::array channels = {{ channel }}; - boost::chrono::microseconds real_timeout = (timeout >= 0 ? - boost::chrono::milliseconds(timeout) : - boost::chrono::microseconds::max()); + return m_impl->ConsumeMessageOnChannel(channels, message, timeout); +} - amqp_frame_t deliver; - if (!m_impl->GetMethodOnChannel(channel, deliver, DELIVER_OR_CANCEL, real_timeout)) - { - return false; - } - if (AMQP_BASIC_CANCEL_METHOD == deliver.payload.method.id) - { - m_impl->RemoveConsumer(consumer_tag); - m_impl->ReturnChannel(channel); - m_impl->MaybeReleaseBuffersOnChannel(channel); +bool Channel::BasicConsumeMessage(const std::vector &consumer_tags, + Envelope::ptr_t &message, int timeout) +{ + m_impl->CheckIsConnected(); - throw ConsumerCancelledException(consumer_tag); + std::vector channels; + channels.reserve(consumer_tags.size()); + + for (std::vector::const_iterator it = consumer_tags.begin(); + it != consumer_tags.end(); ++it) + { + channels.push_back(m_impl->GetConsumerChannel(*it)); } - amqp_basic_deliver_t *deliver_method = reinterpret_cast(deliver.payload.method.decoded); + return m_impl->ConsumeMessageOnChannel(channels, message, timeout); +} - const std::string exchange((char *)deliver_method->exchange.bytes, deliver_method->exchange.len); - const std::string routing_key((char *)deliver_method->routing_key.bytes, deliver_method->routing_key.len); - const std::string in_consumer_tag((char *)deliver_method->consumer_tag.bytes, deliver_method->consumer_tag.len); - const boost::uint64_t delivery_tag = deliver_method->delivery_tag; - const bool redelivered = (deliver_method->redelivered == 0 ? false : true); - m_impl->MaybeReleaseBuffersOnChannel(channel); +bool Channel::BasicConsumeMessage(Envelope::ptr_t &message, int timeout) +{ + m_impl->CheckIsConnected(); - BasicMessage::ptr_t content = m_impl->ReadContent(channel); - m_impl->MaybeReleaseBuffersOnChannel(channel); + std::vector channels = m_impl->GetAllConsumerChannels(); - message = Envelope::Create(content, in_consumer_tag, delivery_tag, exchange, redelivered, routing_key, channel); - return true; + if (0 == channels.size()) + { + throw ConsumerTagNotFoundException(); + } + + return m_impl->ConsumeMessageOnChannel(channels, message, timeout); } } // namespace AmqpClient diff --git a/src/ChannelImpl.cpp b/src/ChannelImpl.cpp index d2382793..50b82736 100644 --- a/src/ChannelImpl.cpp +++ b/src/ChannelImpl.cpp @@ -305,6 +305,18 @@ amqp_channel_t ChannelImpl::GetConsumerChannel(const std::string &consumer_tag) return it->second; } +std::vector ChannelImpl::GetAllConsumerChannels() const +{ + std::vector ret; + for (consumer_map_t::const_iterator it = m_consumer_channel_map.begin(); + it != m_consumer_channel_map.end(); ++it) + { + ret.push_back(it->second); + } + + return ret; +} + bool ChannelImpl::GetNextFrameFromBroker(amqp_frame_t &frame, boost::chrono::microseconds timeout) { struct timeval *tvp = NULL; @@ -336,52 +348,6 @@ bool ChannelImpl::GetNextFrameFromBroker(amqp_frame_t &frame, boost::chrono::mic return true; } -bool ChannelImpl::GetNextFrameFromBrokerOnChannel(amqp_channel_t channel, amqp_frame_t &frame_out, boost::chrono::microseconds timeout) -{ - boost::chrono::steady_clock::time_point end_point; - boost::chrono::microseconds timeout_left = timeout; - if (timeout != boost::chrono::microseconds::max()) - { - end_point = boost::chrono::steady_clock::now() + timeout; - } - - amqp_frame_t frame; - while (GetNextFrameFromBroker(frame, timeout_left)) - { - if (frame.channel == channel) - { - frame_out = frame; - return true; - } - - if (frame.channel == 0) - { - // Only thing we care to handle on the channel0 is the connection.close method - if (AMQP_FRAME_METHOD == frame.frame_type && - AMQP_CONNECTION_CLOSE_METHOD == frame.payload.method.id) - { - FinishCloseConnection(); - AmqpException::Throw(*reinterpret_cast(frame.payload.method.decoded)); - } - } - else - { - m_frame_queue.push_back(frame); - } - - if (timeout != boost::chrono::microseconds::max()) - { - boost::chrono::steady_clock::time_point now = boost::chrono::steady_clock::now(); - if (now >= end_point) - { - return false; - } - timeout_left = boost::chrono::duration_cast(end_point - now); - } - } - return false; -} - bool ChannelImpl::GetNextFrameOnChannel(amqp_channel_t channel, amqp_frame_t &frame, boost::chrono::microseconds timeout) { frame_queue_t::iterator it = std::find_if(m_frame_queue.begin(), m_frame_queue.end(), @@ -401,7 +367,8 @@ bool ChannelImpl::GetNextFrameOnChannel(amqp_channel_t channel, amqp_frame_t &fr return true; } - return GetNextFrameFromBrokerOnChannel(channel, frame, timeout); + boost::array channels = {{ channel }}; + return GetNextFrameFromBrokerOnChannel(channels, frame, timeout); } void ChannelImpl::MaybeReleaseBuffersOnChannel(amqp_channel_t channel) diff --git a/src/SimpleAmqpClient/Channel.h b/src/SimpleAmqpClient/Channel.h index 18e822c9..f5b5a992 100644 --- a/src/SimpleAmqpClient/Channel.h +++ b/src/SimpleAmqpClient/Channel.h @@ -519,6 +519,24 @@ class SIMPLEAMQPCLIENT_EXPORT Channel : boost::noncopyable */ Envelope::ptr_t BasicConsumeMessage(const std::string &consumer_tag); + /** + * Consumes a single message from a list of consumers + * Waits for a single message to be delivered from a list of consumers. This function only works + * after BasicConsume has been called. + * + * @returns the next message delivered from the broker + */ + Envelope::ptr_t BasicConsumeMessage(const std::vector &consumer_tags); + + /** + * Consumes a single message from any open consumers + * Waits for a message from any consumer open on this Channel object. This function only works + * after BasicConsume has been called. + * + * @returns the next message delivered from the broker + */ + Envelope::ptr_t BasicConsumeMessage(); + /** * Consumes a single message with a timeout (this gets an envelope object) * Waits for a single Basic message to be Delivered or the timeout to expire. @@ -532,6 +550,36 @@ class SIMPLEAMQPCLIENT_EXPORT Channel : boost::noncopyable */ bool BasicConsumeMessage(const std::string &consumer_tag, Envelope::ptr_t &envelope, int timeout = -1); + /** + * Consumes a single message with a timeout from a list of consumers + * + * Waits for a single message to be delivered to one of the listed consumer tags to be + * delivered or for the timeout to expire. This function only works after BasicConsume has + * been successfully called. + * + * @param consumer_tags [in] a list of the consumer tags to wait for a message from + * @param envelope [out] the message object that is delivered. + * @param timeout [in] the timeout in milliseconds for the message to be delivered. 0 works + * like a non-blocking read, -1 is an infinite timeout. + * @returns true if a message was delivered before the timeout, false otherwise. + */ + bool BasicConsumeMessage(const std::vector &consumer_tags, Envelope::ptr_t &envelope, + int timeout = -1); + + /** + * Consumes a single message from any consumers opened for this Channel object + * + * Waits for a single message to be delivered to one of the consumers opened on this Channel + * object to be delivered, or for the timeout to occur. This function only works after BasicConsume + * has been successfully called. + * + * @param envelope [out] the message object that is delivered. + * @param timeout [in] the timeout in milliseconds for the message to be delivered. 0 works + * like a non-blocking read, -1 is an infinite timeout. + * @returns true if a message delivered before the timeout, false otherwise + */ + bool BasicConsumeMessage(Envelope::ptr_t &envelope, int timeout = -1); + protected: boost::scoped_ptr m_impl; }; diff --git a/src/SimpleAmqpClient/ChannelImpl.h b/src/SimpleAmqpClient/ChannelImpl.h index e1876827..de59e7a4 100644 --- a/src/SimpleAmqpClient/ChannelImpl.h +++ b/src/SimpleAmqpClient/ChannelImpl.h @@ -35,9 +35,11 @@ #include "SimpleAmqpClient/AmqpException.h" #include "SimpleAmqpClient/BasicMessage.h" +#include "SimpleAmqpClient/ConsumerCancelledException.h" #include "SimpleAmqpClient/Envelope.h" #include "SimpleAmqpClient/MessageReturnedException.h" +#include #include #include #include @@ -56,6 +58,7 @@ class ChannelImpl : boost::noncopyable ChannelImpl(); virtual ~ChannelImpl(); + typedef std::vector channel_list_t; typedef std::vector frame_queue_t; typedef std::map channel_map_t; typedef channel_map_t::iterator channel_map_iterator_t; @@ -67,32 +70,81 @@ class ChannelImpl : boost::noncopyable bool IsChannelOpen(amqp_channel_t channel); bool GetNextFrameFromBroker(amqp_frame_t &frame, boost::chrono::microseconds timeout); - bool GetNextFrameFromBrokerOnChannel(amqp_channel_t channel, amqp_frame_t &frame, boost::chrono::microseconds timeout = boost::chrono::microseconds::max()); - bool GetNextFrameOnChannel(amqp_channel_t channel, amqp_frame_t &frame, boost::chrono::microseconds timeout = boost::chrono::microseconds::max()); + + template + bool GetNextFrameFromBrokerOnChannel(const ChannelListType channels, amqp_frame_t &frame_out, + boost::chrono::microseconds timeout = boost::chrono::microseconds::max()) + { + boost::chrono::steady_clock::time_point end_point; + boost::chrono::microseconds timeout_left = timeout; + if (timeout != boost::chrono::microseconds::max()) + { + end_point = boost::chrono::steady_clock::now() + timeout; + } + + amqp_frame_t frame; + while (GetNextFrameFromBroker(frame, timeout_left)) + { + if (channels.end() != std::find(channels.begin(), channels.end(), frame.channel)) + { + frame_out = frame; + return true; + } + + if (frame.channel == 0) + { + // Only thing we care to handle on the channel0 is the connection.close method + if (AMQP_FRAME_METHOD == frame.frame_type && + AMQP_CONNECTION_CLOSE_METHOD == frame.payload.method.id) + { + FinishCloseConnection(); + AmqpException::Throw(*reinterpret_cast(frame.payload.method.decoded)); + } + } + else + { + m_frame_queue.push_back(frame); + } + + if (timeout != boost::chrono::microseconds::max()) + { + boost::chrono::steady_clock::time_point now = boost::chrono::steady_clock::now(); + if (now >= end_point) + { + return false; + } + timeout_left = boost::chrono::duration_cast(end_point - now); + } + } + return false; + } + + bool GetNextFrameOnChannel(amqp_channel_t channel, amqp_frame_t &frame, + boost::chrono::microseconds timeout = boost::chrono::microseconds::max()); static bool is_on_channel(const amqp_frame_t &frame, amqp_channel_t channel) { return channel == frame.channel; } - template - static bool is_expected_method(const amqp_frame_t &frame, amqp_channel_t channel, + template + static bool is_expected_method(const amqp_frame_t &frame, const ChannelListType channels, const ResponseListType &expected_responses) { - return channel == frame.channel && + return channels.end() != std::find(channels.begin(), channels.end(), frame.channel) && AMQP_FRAME_METHOD == frame.frame_type && expected_responses.end() != std::find(expected_responses.begin(), expected_responses.end(), frame.payload.method.id); } - template - bool GetMethodOnChannel(amqp_channel_t channel, amqp_frame_t &frame, + template + bool GetMethodOnChannel(const ChannelListType channels, amqp_frame_t &frame, const ResponseListType &expected_responses, boost::chrono::microseconds timeout = boost::chrono::microseconds::max()) { frame_queue_t::iterator desired_frame = std::find_if(m_frame_queue.begin(), m_frame_queue.end(), - boost::bind(&ChannelImpl::is_expected_method, _1, - channel, expected_responses)); + boost::bind(&ChannelImpl::is_expected_method, _1, + channels, expected_responses)); if (m_frame_queue.end() != desired_frame) { @@ -109,9 +161,9 @@ class ChannelImpl : boost::noncopyable } amqp_frame_t incoming_frame; - while (GetNextFrameFromBrokerOnChannel(channel, incoming_frame, timeout_left)) + while (GetNextFrameFromBrokerOnChannel(channels, incoming_frame, timeout_left)) { - if (is_expected_method(incoming_frame, channel, expected_responses)) + if (is_expected_method(incoming_frame, channels, expected_responses)) { frame = incoming_frame; return true; @@ -119,14 +171,14 @@ class ChannelImpl : boost::noncopyable if (AMQP_FRAME_METHOD == incoming_frame.frame_type && AMQP_CHANNEL_CLOSE_METHOD == incoming_frame.payload.method.id) { - FinishCloseChannel(channel); + FinishCloseChannel(incoming_frame.channel); try { AmqpException::Throw(*reinterpret_cast(incoming_frame.payload.method.decoded)); } catch (AmqpException &) { - MaybeReleaseBuffersOnChannel(channel); + MaybeReleaseBuffersOnChannel(incoming_frame.channel); throw; } } @@ -151,7 +203,9 @@ class ChannelImpl : boost::noncopyable CheckForError(amqp_send_method(m_connection, channel, method_id, decoded)); amqp_frame_t response; - GetMethodOnChannel(channel, response, expected_responses); + boost::array channels = {{ channel }}; + + GetMethodOnChannel(channels, response, expected_responses); return response; } @@ -164,6 +218,51 @@ class ChannelImpl : boost::noncopyable return ret; } + template + bool ConsumeMessageOnChannel(const ChannelListType channels, Envelope::ptr_t &message, int timeout) + { + const boost::array DELIVER_OR_CANCEL = { { AMQP_BASIC_DELIVER_METHOD, + AMQP_BASIC_CANCEL_METHOD } }; + + boost::chrono::microseconds real_timeout = (timeout >= 0 ? + boost::chrono::milliseconds(timeout) : + boost::chrono::microseconds::max()); + + amqp_frame_t deliver; + if (!GetMethodOnChannel(channels, deliver, DELIVER_OR_CANCEL, real_timeout)) + { + return false; + } + + if (AMQP_BASIC_CANCEL_METHOD == deliver.payload.method.id) + { + amqp_basic_cancel_t *cancel_method = reinterpret_cast(deliver.payload.method.decoded); + std::string consumer_tag((char *)cancel_method->consumer_tag.bytes, cancel_method->consumer_tag.len); + + RemoveConsumer(consumer_tag); + ReturnChannel(deliver.channel); + MaybeReleaseBuffersOnChannel(deliver.channel); + + throw ConsumerCancelledException(consumer_tag); + } + + amqp_basic_deliver_t *deliver_method = reinterpret_cast(deliver.payload.method.decoded); + + const std::string exchange((char *)deliver_method->exchange.bytes, deliver_method->exchange.len); + const std::string routing_key((char *)deliver_method->routing_key.bytes, deliver_method->routing_key.len); + const std::string in_consumer_tag((char *)deliver_method->consumer_tag.bytes, deliver_method->consumer_tag.len); + const boost::uint64_t delivery_tag = deliver_method->delivery_tag; + const bool redelivered = (deliver_method->redelivered == 0 ? false : true); + MaybeReleaseBuffersOnChannel(deliver.channel); + + BasicMessage::ptr_t content = ReadContent(deliver.channel); + MaybeReleaseBuffersOnChannel(deliver.channel); + + message = Envelope::Create(content, in_consumer_tag, delivery_tag, exchange, redelivered, routing_key, deliver.channel); + return true; + } + + amqp_channel_t CreateNewChannel(); amqp_channel_t GetNextChannelId(); @@ -180,6 +279,7 @@ class ChannelImpl : boost::noncopyable void AddConsumer(const std::string &consumer_tag, amqp_channel_t channel); amqp_channel_t RemoveConsumer(const std::string &consumer_tag); amqp_channel_t GetConsumerChannel(const std::string &consumer_tag); + std::vector GetAllConsumerChannels() const; void MaybeReleaseBuffersOnChannel(amqp_channel_t channel); void CheckIsConnected(); @@ -192,7 +292,9 @@ class ChannelImpl : boost::noncopyable private: frame_queue_t m_frame_queue; - std::map m_consumer_channel_map; + + typedef std::map consumer_map_t; + consumer_map_t m_consumer_channel_map; enum channel_state_t { CS_Closed = 0, diff --git a/testing/test_consume.cpp b/testing/test_consume.cpp index ce3f5e33..1d5c7fce 100644 --- a/testing/test_consume.cpp +++ b/testing/test_consume.cpp @@ -223,3 +223,20 @@ TEST_F(connected_test, consumer_cancelled_one_message) EXPECT_THROW(channel->BasicConsumeMessage(consumer), ConsumerCancelledException); } + +TEST_F(connected_test, consume_multiple) +{ + std::string queue1 = channel->DeclareQueue(""); + std::string queue2 = channel->DeclareQueue(""); + + std::string Body = "Message 1"; + channel->BasicPublish("", queue1, BasicMessage::Create(Body)); + + + channel->BasicConsume(queue1); + channel->BasicConsume(queue2); + + Envelope::ptr_t env = channel->BasicConsumeMessage(); + + EXPECT_EQ(Body, env->Message()->Body()); +}