Skip to content

Commit

Permalink
Keep track of DTLS packet sizes to prevent partial reads.
Browse files Browse the repository at this point in the history
The current use of rtc::FifoBuffer can lead to reading across DTLS packet
boundaries which could cause packets to not being processed correctly.

This CL introduces the new class rtc::BufferQueue and changes the
StreamInterfaceChannel to use it instead of the rtc::FifoBuffer.

BUG=chromium:447431
R=juberti@google.com

Review URL: https://webrtc-codereview.appspot.com/52509004

Cr-Commit-Position: refs/heads/master@{#9254}
  • Loading branch information
fancycode committed May 21, 2015
1 parent a3ba0c7 commit 6f2ef74
Show file tree
Hide file tree
Showing 8 changed files with 245 additions and 26 deletions.
2 changes: 2 additions & 0 deletions webrtc/base/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ static_library("rtc_base_approved") {
"bitbuffer.h",
"buffer.cc",
"buffer.h",
"bufferqueue.cc",
"bufferqueue.h",
"bytebuffer.cc",
"bytebuffer.h",
"byteorder.h",
Expand Down
2 changes: 2 additions & 0 deletions webrtc/base/base.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
'bitbuffer.h',
'buffer.cc',
'buffer.h',
'bufferqueue.cc',
'bufferqueue.h',
'bytebuffer.cc',
'bytebuffer.h',
'byteorder.h',
Expand Down
1 change: 1 addition & 0 deletions webrtc/base/base_tests.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
'bind_unittest.cc',
'bitbuffer_unittest.cc',
'buffer_unittest.cc',
'bufferqueue_unittest.cc',
'bytebuffer_unittest.cc',
'byteorder_unittest.cc',
'callback_unittest.cc',
Expand Down
80 changes: 80 additions & 0 deletions webrtc/base/bufferqueue.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2015 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/

#include "webrtc/base/bufferqueue.h"

namespace rtc {

BufferQueue::BufferQueue(size_t capacity, size_t default_size)
: capacity_(capacity), default_size_(default_size) {
}

BufferQueue::~BufferQueue() {
CritScope cs(&crit_);

for (Buffer* buffer : queue_) {
delete buffer;
}
for (Buffer* buffer : free_list_) {
delete buffer;
}
}

size_t BufferQueue::size() const {
CritScope cs(&crit_);
return queue_.size();
}

bool BufferQueue::ReadFront(void* buffer, size_t bytes, size_t* bytes_read) {
CritScope cs(&crit_);
if (queue_.empty()) {
return false;
}

Buffer* packet = queue_.front();
queue_.pop_front();

size_t next_packet_size = packet->size();
if (bytes > next_packet_size) {
bytes = next_packet_size;
}

memcpy(buffer, packet->data(), bytes);
if (bytes_read) {
*bytes_read = bytes;
}
free_list_.push_back(packet);
return true;
}

bool BufferQueue::WriteBack(const void* buffer, size_t bytes,
size_t* bytes_written) {
CritScope cs(&crit_);
if (queue_.size() == capacity_) {
return false;
}

Buffer* packet;
if (!free_list_.empty()) {
packet = free_list_.back();
free_list_.pop_back();
} else {
packet = new Buffer(bytes, default_size_);
}

packet->SetData(static_cast<const uint8_t*>(buffer), bytes);
if (bytes_written) {
*bytes_written = bytes;
}
queue_.push_back(packet);
return true;
}

} // namespace rtc
50 changes: 50 additions & 0 deletions webrtc/base/bufferqueue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2015 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/

#ifndef WEBRTC_BASE_BUFFERQUEUE_H_
#define WEBRTC_BASE_BUFFERQUEUE_H_

#include <deque>
#include <vector>

#include "webrtc/base/buffer.h"
#include "webrtc/base/criticalsection.h"

namespace rtc {

class BufferQueue {
public:
// Creates a buffer queue queue with a given capacity and default buffer size.
BufferQueue(size_t capacity, size_t default_size);
~BufferQueue();

// Return number of queued buffers.
size_t size() const;

// ReadFront will only read one buffer at a time and will truncate buffers
// that don't fit in the passed memory.
bool ReadFront(void* data, size_t bytes, size_t* bytes_read);

// WriteBack always writes either the complete memory or nothing.
bool WriteBack(const void* data, size_t bytes, size_t* bytes_written);

private:
size_t capacity_;
size_t default_size_;
std::deque<Buffer*> queue_;
std::vector<Buffer*> free_list_;
mutable CriticalSection crit_; // object lock

DISALLOW_COPY_AND_ASSIGN(BufferQueue);
};

} // namespace rtc

#endif // WEBRTC_BASE_BUFFERQUEUE_H_
86 changes: 86 additions & 0 deletions webrtc/base/bufferqueue_unittest.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright 2015 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/

#include "webrtc/base/bufferqueue.h"
#include "webrtc/base/gunit.h"

namespace rtc {

TEST(BufferQueueTest, TestAll) {
const size_t kSize = 16;
const char in[kSize * 2 + 1] = "0123456789ABCDEFGHIJKLMNOPQRSTUV";
char out[kSize * 2];
size_t bytes;
BufferQueue queue1(1, kSize);
BufferQueue queue2(2, kSize);

// The queue is initially empty.
EXPECT_EQ(0u, queue1.size());
EXPECT_FALSE(queue1.ReadFront(out, kSize, &bytes));

// A write should succeed.
EXPECT_TRUE(queue1.WriteBack(in, kSize, &bytes));
EXPECT_EQ(kSize, bytes);
EXPECT_EQ(1u, queue1.size());

// The queue is full now (only one buffer allowed).
EXPECT_FALSE(queue1.WriteBack(in, kSize, &bytes));
EXPECT_EQ(1u, queue1.size());

// Reading previously written buffer.
EXPECT_TRUE(queue1.ReadFront(out, kSize, &bytes));
EXPECT_EQ(kSize, bytes);
EXPECT_EQ(0, memcmp(in, out, kSize));

// The queue is empty again now.
EXPECT_FALSE(queue1.ReadFront(out, kSize, &bytes));
EXPECT_EQ(0u, queue1.size());

// Reading only returns available data.
EXPECT_TRUE(queue1.WriteBack(in, kSize, &bytes));
EXPECT_EQ(kSize, bytes);
EXPECT_EQ(1u, queue1.size());
EXPECT_TRUE(queue1.ReadFront(out, kSize * 2, &bytes));
EXPECT_EQ(kSize, bytes);
EXPECT_EQ(0, memcmp(in, out, kSize));
EXPECT_EQ(0u, queue1.size());

// Reading maintains buffer boundaries.
EXPECT_TRUE(queue2.WriteBack(in, kSize / 2, &bytes));
EXPECT_EQ(1u, queue2.size());
EXPECT_TRUE(queue2.WriteBack(in + kSize / 2, kSize / 2, &bytes));
EXPECT_EQ(2u, queue2.size());
EXPECT_TRUE(queue2.ReadFront(out, kSize, &bytes));
EXPECT_EQ(kSize / 2, bytes);
EXPECT_EQ(0, memcmp(in, out, kSize / 2));
EXPECT_EQ(1u, queue2.size());
EXPECT_TRUE(queue2.ReadFront(out, kSize, &bytes));
EXPECT_EQ(kSize / 2, bytes);
EXPECT_EQ(0, memcmp(in + kSize / 2, out, kSize / 2));
EXPECT_EQ(0u, queue2.size());

// Reading truncates buffers.
EXPECT_TRUE(queue2.WriteBack(in, kSize / 2, &bytes));
EXPECT_EQ(1u, queue2.size());
EXPECT_TRUE(queue2.WriteBack(in + kSize / 2, kSize / 2, &bytes));
EXPECT_EQ(2u, queue2.size());
// Read first packet partially in too-small buffer.
EXPECT_TRUE(queue2.ReadFront(out, kSize / 4, &bytes));
EXPECT_EQ(kSize / 4, bytes);
EXPECT_EQ(0, memcmp(in, out, kSize / 4));
EXPECT_EQ(1u, queue2.size());
// Remainder of first packet is truncated, reading starts with next packet.
EXPECT_TRUE(queue2.ReadFront(out, kSize, &bytes));
EXPECT_EQ(kSize / 2, bytes);
EXPECT_EQ(0, memcmp(in + kSize / 2, out, kSize / 2));
EXPECT_EQ(0u, queue2.size());
}

} // namespace rtc
32 changes: 20 additions & 12 deletions webrtc/p2p/base/dtlstransportchannel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include "webrtc/p2p/base/common.h"
#include "webrtc/base/buffer.h"
#include "webrtc/base/checks.h"
#include "webrtc/base/dscp.h"
#include "webrtc/base/messagequeue.h"
#include "webrtc/base/sslstreamadapter.h"
Expand All @@ -25,6 +26,10 @@ static const size_t kDtlsRecordHeaderLen = 13;
static const size_t kMaxDtlsPacketLen = 2048;
static const size_t kMinRtpPacketLen = 12;

// Maximum number of pending packets in the queue. Packets are read immediately
// after they have been written, so a capacity of "1" is sufficient.
static const size_t kMaxPendingPackets = 1;

static bool IsDtlsPacket(const char* data, size_t len) {
const uint8* u = reinterpret_cast<const uint8*>(data);
return (len >= kDtlsRecordHeaderLen && (u[0] > 19 && u[0] < 64));
Expand All @@ -34,6 +39,12 @@ static bool IsRtpPacket(const char* data, size_t len) {
return (len >= kMinRtpPacketLen && (u[0] & 0xC0) == 0x80);
}

StreamInterfaceChannel::StreamInterfaceChannel(TransportChannel* channel)
: channel_(channel),
state_(rtc::SS_OPEN),
packets_(kMaxPendingPackets, kMaxDtlsPacketLen) {
}

rtc::StreamResult StreamInterfaceChannel::Read(void* buffer,
size_t buffer_len,
size_t* read,
Expand All @@ -43,7 +54,11 @@ rtc::StreamResult StreamInterfaceChannel::Read(void* buffer,
if (state_ == rtc::SS_OPENING)
return rtc::SR_BLOCK;

return fifo_.Read(buffer, buffer_len, read, error);
if (!packets_.ReadFront(buffer, buffer_len, read)) {
return rtc::SR_BLOCK;
}

return rtc::SR_SUCCESS;
}

rtc::StreamResult StreamInterfaceChannel::Write(const void* data,
Expand All @@ -62,21 +77,15 @@ rtc::StreamResult StreamInterfaceChannel::Write(const void* data,
}

bool StreamInterfaceChannel::OnPacketReceived(const char* data, size_t size) {
// We force a read event here to ensure that we don't overflow our FIFO.
// Under high packet rate this can occur if we wait for the FIFO to post its
// own SE_READ.
bool ret = (fifo_.WriteAll(data, size, NULL, NULL) == rtc::SR_SUCCESS);
// We force a read event here to ensure that we don't overflow our queue.
bool ret = packets_.WriteBack(data, size, NULL);
CHECK(ret) << "Failed to write packet to queue.";
if (ret) {
SignalEvent(this, rtc::SE_READ, 0);
}
return ret;
}

void StreamInterfaceChannel::OnEvent(rtc::StreamInterface* stream,
int sig, int err) {
SignalEvent(this, sig, err);
}

DtlsTransportChannelWrapper::DtlsTransportChannelWrapper(
Transport* transport,
TransportChannelImpl* channel)
Expand Down Expand Up @@ -242,8 +251,7 @@ bool DtlsTransportChannelWrapper::GetRemoteCertificate(
}

bool DtlsTransportChannelWrapper::SetupDtls() {
StreamInterfaceChannel* downward =
new StreamInterfaceChannel(worker_thread_, channel_);
StreamInterfaceChannel* downward = new StreamInterfaceChannel(channel_);

dtls_.reset(rtc::SSLStreamAdapter::Create(downward));
if (!dtls_) {
Expand Down
18 changes: 4 additions & 14 deletions webrtc/p2p/base/dtlstransportchannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "webrtc/p2p/base/transportchannelimpl.h"
#include "webrtc/base/buffer.h"
#include "webrtc/base/bufferqueue.h"
#include "webrtc/base/scoped_ptr.h"
#include "webrtc/base/sslstreamadapter.h"
#include "webrtc/base/stream.h"
Expand All @@ -24,15 +25,9 @@ namespace cricket {

// A bridge between a packet-oriented/channel-type interface on
// the bottom and a StreamInterface on the top.
class StreamInterfaceChannel : public rtc::StreamInterface,
public sigslot::has_slots<> {
class StreamInterfaceChannel : public rtc::StreamInterface {
public:
StreamInterfaceChannel(rtc::Thread* owner, TransportChannel* channel)
: channel_(channel),
state_(rtc::SS_OPEN),
fifo_(kFifoSize, owner) {
fifo_.SignalEvent.connect(this, &StreamInterfaceChannel::OnEvent);
}
StreamInterfaceChannel(TransportChannel* channel);

// Push in a packet; this gets pulled out from Read().
bool OnPacketReceived(const char* data, size_t size);
Expand All @@ -46,14 +41,9 @@ class StreamInterfaceChannel : public rtc::StreamInterface,
size_t* written, int* error);

private:
static const size_t kFifoSize = 8192;

// Forward events
virtual void OnEvent(rtc::StreamInterface* stream, int sig, int err);

TransportChannel* channel_; // owned by DtlsTransportChannelWrapper
rtc::StreamState state_;
rtc::FifoBuffer fifo_;
rtc::BufferQueue packets_;

DISALLOW_COPY_AND_ASSIGN(StreamInterfaceChannel);
};
Expand Down

0 comments on commit 6f2ef74

Please sign in to comment.