Skip to content

Commit

Permalink
[CELEBORN-1071] Ensure guardedBy is satisfied, fix DCL bugs as well
Browse files Browse the repository at this point in the history
Ensure appropriate lock is held when accessing/mutating state - as marked with `GuardedBy`.
More [here](https://errorprone.info/bugpattern/GuardedBy).

This also fixes [DCL](https://errorprone.info/bugpattern/DoubleCheckedLocking) bugs observed.

Fix bug with locking as identified by error-prone

No

Unit tests

Closes apache#2018 from mridulm/fix-locking-issues-found.

Authored-by: Mridul Muralidharan <mridulatgmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
  • Loading branch information
mridulm authored and pan3793 committed Oct 23, 2023
1 parent 5fb3680 commit e6657e7
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -187,27 +187,25 @@ public ConcurrentHashMap<Long, StreamState> getStreams() {
}

private void startRecycleThread() {
if (recycleThread == null) {
synchronized (lock) {
if (recycleThread == null) {
recycleThread =
new Thread(
() -> {
while (true) {
try {
DelayedStreamId delayedStreamId = recycleStreamIds.take();
cleanResource(delayedStreamId.streamId);
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
synchronized (lock) {
if (recycleThread == null) {
recycleThread =
new Thread(
() -> {
while (true) {
try {
DelayedStreamId delayedStreamId = recycleStreamIds.take();
cleanResource(delayedStreamId.streamId);
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
},
"recycle-thread");
recycleThread.setDaemon(true);
recycleThread.start();
}
},
"recycle-thread");
recycleThread.setDaemon(true);
recycleThread.start();

logger.info("start stream recycle thread");
}
logger.info("start stream recycle thread");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,25 +152,26 @@ public void decrementPendingWrites() {
numPendingWrites.decrementAndGet();
}

@GuardedBy("flushLock")
protected void flush(boolean finalFlush) throws IOException {
// flushBuffer == null here means writer already closed
if (flushBuffer != null) {
int numBytes = flushBuffer.readableBytes();
if (numBytes != 0) {
notifier.checkException();
notifier.numPendingFlushes.incrementAndGet();
FlushTask task = null;
if (channel != null) {
task = new LocalFlushTask(flushBuffer, channel, notifier);
} else if (fileInfo.isHdfs()) {
task = new HdfsFlushTask(flushBuffer, fileInfo.getHdfsPath(), notifier);
}
addTask(task);
flushBuffer = null;
fileInfo.updateBytesFlushed(numBytes);
if (!finalFlush) {
takeBuffer();
synchronized (flushLock) {
// flushBuffer == null here means writer already closed
if (flushBuffer != null) {
int numBytes = flushBuffer.readableBytes();
if (numBytes != 0) {
notifier.checkException();
notifier.numPendingFlushes.incrementAndGet();
FlushTask task = null;
if (channel != null) {
task = new LocalFlushTask(flushBuffer, channel, notifier);
} else if (fileInfo.isHdfs()) {
task = new HdfsFlushTask(flushBuffer, fileInfo.getHdfsPath(), notifier);
}
addTask(task);
flushBuffer = null;
fileInfo.updateBytesFlushed(numBytes);
if (!finalFlush) {
takeBuffer();
}
}
}
}
Expand Down Expand Up @@ -372,7 +373,9 @@ protected void takeBuffer() {
}

// real action
flushBuffer = flusher.takeBuffer();
synchronized (flushLock) {
flushBuffer = flusher.takeBuffer();
}

// metrics end
if (source.metricsCollectCriticalEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,20 +193,30 @@ private void addBuffer(ByteBuf buffer, BufferRecycler bufferRecycler) {
}
}

public synchronized void sendData() {
while (!buffersToSend.isEmpty() && credits.get() > 0) {
RecyclableBuffer wrappedBuffer;
synchronized (lock) {
if (!isReleased) {
wrappedBuffer = buffersToSend.poll();
} else {
return;
}
private RecyclableBuffer fetchBufferToSend() {
synchronized (lock) {
if (!buffersToSend.isEmpty() && credits.get() > 0 && !isReleased) {
return buffersToSend.poll();
} else {
return null;
}
}
}

private int getNumBuffersToSend() {
synchronized (lock) {
return buffersToSend.size();
}
}

int backlog = buffersToSend.size();
public synchronized void sendData() {
RecyclableBuffer buffer;
while (null != (buffer = fetchBufferToSend())) {
final RecyclableBuffer wrappedBuffer = buffer;
int readableBytes = wrappedBuffer.byteBuf.readableBytes();
logger.debug("send data start: {}, {}, {}", streamId, readableBytes, backlog);
if (logger.isDebugEnabled()) {
logger.debug("send data start: {}, {}, {}", streamId, readableBytes, getNumBuffersToSend());
}
ReadData readData = new ReadData(streamId, wrappedBuffer.byteBuf);
associatedChannel
.writeAndFlush(readData)
Expand All @@ -228,7 +238,15 @@ public synchronized void sendData() {
logger.debug("stream {} credit {}", streamId, currentCredit);
}

if (readFinished && buffersToSend.isEmpty()) {
boolean shouldRecycle = false;
synchronized (lock) {
if (isReleased) return;
if (readFinished && buffersToSend.isEmpty()) {
shouldRecycle = true;
}
}

if (shouldRecycle) {
recycle();
}
}
Expand Down Expand Up @@ -358,7 +376,9 @@ private boolean readBuffer(ByteBuf buffer) throws IOException {
return true;
} catch (Throwable throwable) {
logger.error("Failed to read partition file.", throwable);
isReleased = true;
synchronized (lock) {
isReleased = true;
}
throw throwable;
}
}
Expand Down Expand Up @@ -497,6 +517,8 @@ public AtomicInteger getNumInUseBuffers() {
}

public boolean shouldReadData() {
return !isReleased && !readFinished;
synchronized (lock) {
return !isReleased && !readFinished;
}
}
}

0 comments on commit e6657e7

Please sign in to comment.