Skip to content

Commit

Permalink
Fix a problem in Thread::Send.
Browse files Browse the repository at this point in the history
Previously if thread A->Send is called on thread B, B->ReceiveSends will be called, which enables an arbitrary thread to invoke calls on B while B is wait for A->Send to return. This caused mutliple problems like issue 3559, 3579.
The fix is to limit B->ReceiveSends to only process requests from A.
Also disallow the worker thread invoking other threads.

BUG=3559
R=juberti@webrtc.org

Review URL: https://webrtc-codereview.appspot.com/15089004

git-svn-id: http://webrtc.googlecode.com/svn/trunk@7290 4adac7df-926f-26a2-2b94-8c16560cd09d
  • Loading branch information
jiayl@webrtc.org committed Sep 24, 2014
1 parent a0ce9fa commit 3987b6d
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 23 deletions.
9 changes: 1 addition & 8 deletions talk/app/webrtc/peerconnection_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -481,9 +481,8 @@ class PeerConnectionTestClientBase
if (!allocator_factory_) {
return false;
}
audio_thread_.Start();
fake_audio_capture_module_ = FakeAudioCaptureModule::Create(
&audio_thread_);
rtc::Thread::Current());

if (fake_audio_capture_module_ == NULL) {
return false;
Expand Down Expand Up @@ -557,12 +556,6 @@ class PeerConnectionTestClientBase
}

std::string id_;
// Separate thread for executing |fake_audio_capture_module_| tasks. Audio
// processing must not be performed on the same thread as signaling due to
// signaling time constraints and relative complexity of the audio pipeline.
// This is consistent with the video pipeline that us a a separate thread for
// encoding and decoding.
rtc::Thread audio_thread_;

rtc::scoped_refptr<webrtc::PortAllocatorFactoryInterface>
allocator_factory_;
Expand Down
1 change: 1 addition & 0 deletions talk/app/webrtc/peerconnectionfactory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "talk/media/webrtc/webrtcmediaengine.h"
#include "talk/media/webrtc/webrtcvideodecoderfactory.h"
#include "talk/media/webrtc/webrtcvideoencoderfactory.h"
#include "webrtc/base/bind.h"
#include "webrtc/modules/audio_device/include/audio_device.h"

using rtc::scoped_refptr;
Expand Down
3 changes: 1 addition & 2 deletions talk/app/webrtc/test/peerconnectiontestwrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,8 @@ bool PeerConnectionTestWrapper::CreatePc(
return false;
}

audio_thread_.Start();
fake_audio_capture_module_ = FakeAudioCaptureModule::Create(
&audio_thread_);
rtc::Thread::Current());
if (fake_audio_capture_module_ == NULL) {
return false;
}
Expand Down
1 change: 0 additions & 1 deletion talk/app/webrtc/test/peerconnectiontestwrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ class PeerConnectionTestWrapper
bool video, const webrtc::FakeConstraints& video_constraints);

std::string name_;
rtc::Thread audio_thread_;
rtc::scoped_refptr<webrtc::PortAllocatorFactoryInterface>
allocator_factory_;
rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection_;
Expand Down
6 changes: 6 additions & 0 deletions talk/session/media/channelmanager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ void ChannelManager::Construct(MediaEngineInterface* me,
this, &ChannelManager::OnVideoCaptureStateChange);
capture_manager_->SignalCapturerStateChange.connect(
this, &ChannelManager::OnVideoCaptureStateChange);

if (worker_thread_ != rtc::Thread::Current()) {
// Do not allow invoking calls to other threads on the worker thread.
worker_thread_->Invoke<bool>(
rtc::Bind(&rtc::Thread::SetAllowBlockingCalls, worker_thread_, false));
}
}

ChannelManager::~ChannelManager() {
Expand Down
33 changes: 26 additions & 7 deletions webrtc/base/thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -411,15 +411,12 @@ void Thread::Stop() {
}

void Thread::Send(MessageHandler *phandler, uint32 id, MessageData *pdata) {
AssertBlockingIsAllowedOnCurrentThread();

if (fStop_)
return;

// Sent messages are sent to the MessageHandler directly, in the context
// of "thread", like Win32 SendMessage. If in the right context,
// call the handler directly.

Message msg;
msg.phandler = phandler;
msg.message_id = id;
Expand All @@ -429,6 +426,8 @@ void Thread::Send(MessageHandler *phandler, uint32 id, MessageData *pdata) {
return;
}

AssertBlockingIsAllowedOnCurrentThread();

AutoThread thread;
Thread *current_thread = Thread::Current();
ASSERT(current_thread != NULL); // AutoThread ensures this
Expand All @@ -451,7 +450,9 @@ void Thread::Send(MessageHandler *phandler, uint32 id, MessageData *pdata) {
crit_.Enter();
while (!ready) {
crit_.Leave();
current_thread->ReceiveSends();
// We need to limit "ReceiveSends" to |this| thread to avoid an arbitrary
// thread invoking calls on the current thread.
current_thread->ReceiveSendsFromThread(this);
current_thread->socketserver()->Wait(kForever, false);
waited = true;
crit_.Enter();
Expand All @@ -475,24 +476,42 @@ void Thread::Send(MessageHandler *phandler, uint32 id, MessageData *pdata) {
}

void Thread::ReceiveSends() {
ReceiveSendsFromThread(NULL);
}

void Thread::ReceiveSendsFromThread(const Thread* source) {
// Receive a sent message. Cleanup scenarios:
// - thread sending exits: We don't allow this, since thread can exit
// only via Join, so Send must complete.
// - thread receiving exits: Wakeup/set ready in Thread::Clear()
// - object target cleared: Wakeup/set ready in Thread::Clear()
_SendMessage smsg;

crit_.Enter();
while (!sendlist_.empty()) {
_SendMessage smsg = sendlist_.front();
sendlist_.pop_front();
while (PopSendMessageFromThread(source, &smsg)) {
crit_.Leave();

smsg.msg.phandler->OnMessage(&smsg.msg);

crit_.Enter();
*smsg.ready = true;
smsg.thread->socketserver()->WakeUp();
}
crit_.Leave();
}

bool Thread::PopSendMessageFromThread(const Thread* source, _SendMessage* msg) {
for (std::list<_SendMessage>::iterator it = sendlist_.begin();
it != sendlist_.end(); ++it) {
if (it->thread == source || source == NULL) {
*msg = *it;
sendlist_.erase(it);
return true;
}
}
return false;
}

void Thread::Clear(MessageHandler *phandler, uint32 id,
MessageList* removed) {
CritScope cs(&crit_);
Expand Down
19 changes: 14 additions & 5 deletions webrtc/base/thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ class Thread : public MessageQueue {
// See ScopedDisallowBlockingCalls for details.
template <class ReturnT, class FunctorT>
ReturnT Invoke(const FunctorT& functor) {
AssertBlockingIsAllowedOnCurrentThread();
FunctorMessageHandler<ReturnT, FunctorT> handler(functor);
Send(&handler);
return handler.result();
Expand Down Expand Up @@ -210,6 +209,10 @@ class Thread : public MessageQueue {
// of whatever code is conditionally executing because of the return value!
bool RunningForTest() { return running(); }

// Sets the per-thread allow-blocking-calls flag and returns the previous
// value.
bool SetAllowBlockingCalls(bool allow);

protected:
// This method should be called when thread is created using non standard
// method, like derived implementation of rtc::Thread and it can not be
Expand All @@ -226,10 +229,6 @@ class Thread : public MessageQueue {
// Blocks the calling thread until this thread has terminated.
void Join();

// Sets the per-thread allow-blocking-calls flag and returns the previous
// value.
bool SetAllowBlockingCalls(bool allow);

static void AssertBlockingIsAllowedOnCurrentThread();

friend class ScopedDisallowBlockingCalls;
Expand All @@ -248,6 +247,16 @@ class Thread : public MessageQueue {
// Return true if the thread was started and hasn't yet stopped.
bool running() { return running_.Wait(0); }

// Processes received "Send" requests. If |source| is not NULL, only requests
// from |source| are processed, otherwise, all requests are processed.
void ReceiveSendsFromThread(const Thread* source);

// If |source| is not NULL, pops the first "Send" message from |source| in
// |sendlist_|, otherwise, pops the first "Send" message of |sendlist_|.
// The caller must lock |crit_| before calling.
// Returns true if there is such a message.
bool PopSendMessageFromThread(const Thread* source, _SendMessage* msg);

std::list<_SendMessage> sendlist_;
std::string name_;
ThreadPriority priority_;
Expand Down
72 changes: 72 additions & 0 deletions webrtc/base/thread_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,78 @@ TEST(ThreadTest, DISABLED_ON_MAC(Invoke)) {
thread.Invoke<void>(&LocalFuncs::Func2);
}

// Verifies that two threads calling Invoke on each other at the same time does
// not deadlock.
TEST(ThreadTest, TwoThreadsInvokeNoDeadlock) {
AutoThread thread;
Thread* current_thread = Thread::Current();
ASSERT_TRUE(current_thread != NULL);

Thread other_thread;
other_thread.Start();

struct LocalFuncs {
static void Set(bool* out) { *out = true; }
static void InvokeSet(Thread* thread, bool* out) {
thread->Invoke<void>(Bind(&Set, out));
}
};

bool called = false;
other_thread.Invoke<void>(
Bind(&LocalFuncs::InvokeSet, current_thread, &called));

EXPECT_TRUE(called);
}

// Verifies that if thread A invokes a call on thread B and thread C is trying
// to invoke A at the same time, thread A does not handle C's invoke while
// invoking B.
TEST(ThreadTest, ThreeThreadsInvoke) {
AutoThread thread;
Thread* thread_a = Thread::Current();
Thread thread_b, thread_c;
thread_b.Start();
thread_c.Start();

struct LocalFuncs {
static void Set(bool* out) { *out = true; }
static void InvokeSet(Thread* thread, bool* out) {
thread->Invoke<void>(Bind(&Set, out));
}

// Set |out| true and call InvokeSet on |thread|.
static void SetAndInvokeSet(bool* out, Thread* thread, bool* out_inner) {
*out = true;
InvokeSet(thread, out_inner);
}

// Asynchronously invoke SetAndInvokeSet on |thread1| and wait until
// |thread1| starts the call.
static void AsyncInvokeSetAndWait(
Thread* thread1, Thread* thread2, bool* out) {
bool async_invoked = false;

AsyncInvoker invoker;
invoker.AsyncInvoke<void>(
thread1, Bind(&SetAndInvokeSet, &async_invoked, thread2, out));

EXPECT_TRUE_WAIT(async_invoked, 2000);
}
};

bool thread_a_called = false;

// Start the sequence A --(invoke)--> B --(async invoke)--> C --(invoke)--> A.
// Thread B returns when C receives the call and C should be blocked until A
// starts to process messages.
thread_b.Invoke<void>(Bind(&LocalFuncs::AsyncInvokeSetAndWait,
&thread_c, thread_a, &thread_a_called));
EXPECT_FALSE(thread_a_called);

EXPECT_TRUE_WAIT(thread_a_called, 2000);
}

class AsyncInvokeTest : public testing::Test {
public:
void IntCallback(int value) {
Expand Down

0 comments on commit 3987b6d

Please sign in to comment.