Skip to content

Commit

Permalink
Add: BasicConsumeMessage from multiple consumers
Browse files Browse the repository at this point in the history
Add BasicConsumeMessage() overloads to wait for messages from multiple
consumers, or from all the consumers defined on the Channel object.
  • Loading branch information
alanxz committed Aug 20, 2013
1 parent e9de652 commit e7e701a
Show file tree
Hide file tree
Showing 5 changed files with 239 additions and 91 deletions.
72 changes: 43 additions & 29 deletions src/Channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -464,15 +464,16 @@ void Channel::BasicPublish(const std::string &exchange_name,
// - connection.clsoe - something really bad happened
const boost::array<boost::uint32_t, 2> PUBLISH_ACK = { { AMQP_BASIC_ACK_METHOD, AMQP_BASIC_RETURN_METHOD } };
amqp_frame_t response;
m_impl->GetMethodOnChannel(channel, response, PUBLISH_ACK);
boost::array<amqp_channel_t, 1> channels = {{ channel }};
m_impl->GetMethodOnChannel(channels, response, PUBLISH_ACK);

if (AMQP_BASIC_RETURN_METHOD == response.payload.method.id)
{
MessageReturnedException message_returned =
m_impl->CreateMessageReturnedException(*(reinterpret_cast<amqp_basic_return_t *>(response.payload.method.decoded)), channel);

const boost::array<boost::uint32_t, 1> BASIC_ACK = { { AMQP_BASIC_ACK_METHOD } };
m_impl->GetMethodOnChannel(channel, response, BASIC_ACK);
m_impl->GetMethodOnChannel(channels, response, BASIC_ACK);
m_impl->ReturnChannel(channel);
m_impl->MaybeReleaseBuffersOnChannel(channel);
throw message_returned;
Expand Down Expand Up @@ -630,47 +631,60 @@ Envelope::ptr_t Channel::BasicConsumeMessage(const std::string &consumer_tag)
return returnval;
}

Envelope::ptr_t Channel::BasicConsumeMessage(const std::vector<std::string> &consumer_tags)
{
Envelope::ptr_t returnval;
BasicConsumeMessage(consumer_tags, returnval);
return returnval;
}

Envelope::ptr_t Channel::BasicConsumeMessage()
{
Envelope::ptr_t returnval;
BasicConsumeMessage(returnval);
return returnval;
}

bool Channel::BasicConsumeMessage(const std::string &consumer_tag, Envelope::ptr_t &message, int timeout)
{
m_impl->CheckIsConnected();
amqp_channel_t channel = m_impl->GetConsumerChannel(consumer_tag);

const boost::array<boost::uint32_t, 2> DELIVER_OR_CANCEL = { { AMQP_BASIC_DELIVER_METHOD,
AMQP_BASIC_CANCEL_METHOD } };
boost::array<amqp_channel_t, 1> channels = {{ channel }};

boost::chrono::microseconds real_timeout = (timeout >= 0 ?
boost::chrono::milliseconds(timeout) :
boost::chrono::microseconds::max());
return m_impl->ConsumeMessageOnChannel(channels, message, timeout);
}

amqp_frame_t deliver;
if (!m_impl->GetMethodOnChannel(channel, deliver, DELIVER_OR_CANCEL, real_timeout))
{
return false;
}

if (AMQP_BASIC_CANCEL_METHOD == deliver.payload.method.id)
{
m_impl->RemoveConsumer(consumer_tag);
m_impl->ReturnChannel(channel);
m_impl->MaybeReleaseBuffersOnChannel(channel);
bool Channel::BasicConsumeMessage(const std::vector<std::string> &consumer_tags,
Envelope::ptr_t &message, int timeout)
{
m_impl->CheckIsConnected();

throw ConsumerCancelledException(consumer_tag);
std::vector<amqp_channel_t> channels;
channels.reserve(consumer_tags.size());

for (std::vector<std::string>::const_iterator it = consumer_tags.begin();
it != consumer_tags.end(); ++it)
{
channels.push_back(m_impl->GetConsumerChannel(*it));
}

amqp_basic_deliver_t *deliver_method = reinterpret_cast<amqp_basic_deliver_t *>(deliver.payload.method.decoded);
return m_impl->ConsumeMessageOnChannel(channels, message, timeout);
}

const std::string exchange((char *)deliver_method->exchange.bytes, deliver_method->exchange.len);
const std::string routing_key((char *)deliver_method->routing_key.bytes, deliver_method->routing_key.len);
const std::string in_consumer_tag((char *)deliver_method->consumer_tag.bytes, deliver_method->consumer_tag.len);
const boost::uint64_t delivery_tag = deliver_method->delivery_tag;
const bool redelivered = (deliver_method->redelivered == 0 ? false : true);
m_impl->MaybeReleaseBuffersOnChannel(channel);
bool Channel::BasicConsumeMessage(Envelope::ptr_t &message, int timeout)
{
m_impl->CheckIsConnected();

BasicMessage::ptr_t content = m_impl->ReadContent(channel);
m_impl->MaybeReleaseBuffersOnChannel(channel);
std::vector<amqp_channel_t> channels = m_impl->GetAllConsumerChannels();

message = Envelope::Create(content, in_consumer_tag, delivery_tag, exchange, redelivered, routing_key, channel);
return true;
if (0 == channels.size())
{
throw ConsumerTagNotFoundException();
}

return m_impl->ConsumeMessageOnChannel(channels, message, timeout);
}

} // namespace AmqpClient
61 changes: 14 additions & 47 deletions src/ChannelImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,18 @@ amqp_channel_t ChannelImpl::GetConsumerChannel(const std::string &consumer_tag)
return it->second;
}

std::vector<amqp_channel_t> ChannelImpl::GetAllConsumerChannels() const
{
std::vector<amqp_channel_t> ret;
for (consumer_map_t::const_iterator it = m_consumer_channel_map.begin();
it != m_consumer_channel_map.end(); ++it)
{
ret.push_back(it->second);
}

return ret;
}

bool ChannelImpl::GetNextFrameFromBroker(amqp_frame_t &frame, boost::chrono::microseconds timeout)
{
struct timeval *tvp = NULL;
Expand Down Expand Up @@ -336,52 +348,6 @@ bool ChannelImpl::GetNextFrameFromBroker(amqp_frame_t &frame, boost::chrono::mic
return true;
}

bool ChannelImpl::GetNextFrameFromBrokerOnChannel(amqp_channel_t channel, amqp_frame_t &frame_out, boost::chrono::microseconds timeout)
{
boost::chrono::steady_clock::time_point end_point;
boost::chrono::microseconds timeout_left = timeout;
if (timeout != boost::chrono::microseconds::max())
{
end_point = boost::chrono::steady_clock::now() + timeout;
}

amqp_frame_t frame;
while (GetNextFrameFromBroker(frame, timeout_left))
{
if (frame.channel == channel)
{
frame_out = frame;
return true;
}

if (frame.channel == 0)
{
// Only thing we care to handle on the channel0 is the connection.close method
if (AMQP_FRAME_METHOD == frame.frame_type &&
AMQP_CONNECTION_CLOSE_METHOD == frame.payload.method.id)
{
FinishCloseConnection();
AmqpException::Throw(*reinterpret_cast<amqp_connection_close_t *>(frame.payload.method.decoded));
}
}
else
{
m_frame_queue.push_back(frame);
}

if (timeout != boost::chrono::microseconds::max())
{
boost::chrono::steady_clock::time_point now = boost::chrono::steady_clock::now();
if (now >= end_point)
{
return false;
}
timeout_left = boost::chrono::duration_cast<boost::chrono::microseconds>(end_point - now);
}
}
return false;
}

bool ChannelImpl::GetNextFrameOnChannel(amqp_channel_t channel, amqp_frame_t &frame, boost::chrono::microseconds timeout)
{
frame_queue_t::iterator it = std::find_if(m_frame_queue.begin(), m_frame_queue.end(),
Expand All @@ -401,7 +367,8 @@ bool ChannelImpl::GetNextFrameOnChannel(amqp_channel_t channel, amqp_frame_t &fr
return true;
}

return GetNextFrameFromBrokerOnChannel(channel, frame, timeout);
boost::array<amqp_channel_t, 1> channels = {{ channel }};
return GetNextFrameFromBrokerOnChannel(channels, frame, timeout);
}

void ChannelImpl::MaybeReleaseBuffersOnChannel(amqp_channel_t channel)
Expand Down
48 changes: 48 additions & 0 deletions src/SimpleAmqpClient/Channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,24 @@ class SIMPLEAMQPCLIENT_EXPORT Channel : boost::noncopyable
*/
Envelope::ptr_t BasicConsumeMessage(const std::string &consumer_tag);

/**
* Consumes a single message from a list of consumers
* Waits for a single message to be delivered from a list of consumers. This function only works
* after BasicConsume has been called.
*
* @returns the next message delivered from the broker
*/
Envelope::ptr_t BasicConsumeMessage(const std::vector<std::string> &consumer_tags);

/**
* Consumes a single message from any open consumers
* Waits for a message from any consumer open on this Channel object. This function only works
* after BasicConsume has been called.
*
* @returns the next message delivered from the broker
*/
Envelope::ptr_t BasicConsumeMessage();

/**
* Consumes a single message with a timeout (this gets an envelope object)
* Waits for a single Basic message to be Delivered or the timeout to expire.
Expand All @@ -532,6 +550,36 @@ class SIMPLEAMQPCLIENT_EXPORT Channel : boost::noncopyable
*/
bool BasicConsumeMessage(const std::string &consumer_tag, Envelope::ptr_t &envelope, int timeout = -1);

/**
* Consumes a single message with a timeout from a list of consumers
*
* Waits for a single message to be delivered to one of the listed consumer tags to be
* delivered or for the timeout to expire. This function only works after BasicConsume has
* been successfully called.
*
* @param consumer_tags [in] a list of the consumer tags to wait for a message from
* @param envelope [out] the message object that is delivered.
* @param timeout [in] the timeout in milliseconds for the message to be delivered. 0 works
* like a non-blocking read, -1 is an infinite timeout.
* @returns true if a message was delivered before the timeout, false otherwise.
*/
bool BasicConsumeMessage(const std::vector<std::string> &consumer_tags, Envelope::ptr_t &envelope,
int timeout = -1);

/**
* Consumes a single message from any consumers opened for this Channel object
*
* Waits for a single message to be delivered to one of the consumers opened on this Channel
* object to be delivered, or for the timeout to occur. This function only works after BasicConsume
* has been successfully called.
*
* @param envelope [out] the message object that is delivered.
* @param timeout [in] the timeout in milliseconds for the message to be delivered. 0 works
* like a non-blocking read, -1 is an infinite timeout.
* @returns true if a message delivered before the timeout, false otherwise
*/
bool BasicConsumeMessage(Envelope::ptr_t &envelope, int timeout = -1);

protected:
boost::scoped_ptr<Detail::ChannelImpl> m_impl;
};
Expand Down
Loading

0 comments on commit e7e701a

Please sign in to comment.