Skip to content

Commit

Permalink
[Truffle] Implement Queue and SizedQueue#num_waiting.
Browse files Browse the repository at this point in the history
  • Loading branch information
eregon committed Jul 12, 2015
1 parent 56f1479 commit 3416f71
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 2 deletions.

This file was deleted.

This file was deleted.

38 changes: 38 additions & 0 deletions truffle/src/main/java/org/jruby/truffle/nodes/core/QueueNodes.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import java.util.EnumSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.jruby.truffle.nodes.RubyNode;
import org.jruby.truffle.nodes.cast.BooleanCastWithDefaultNodeGen;
Expand All @@ -22,6 +24,7 @@
import org.jruby.truffle.runtime.core.RubyClass;
import org.jruby.truffle.runtime.object.BasicObjectType;
import org.jruby.truffle.runtime.subsystems.ThreadManager.BlockingActionWithoutGlobalLock;
import org.jruby.util.unsafe.UnsafeHolder;

import com.oracle.truffle.api.CompilerDirectives;
import com.oracle.truffle.api.CompilerDirectives.TruffleBoundary;
Expand Down Expand Up @@ -197,4 +200,39 @@ public Object marshal_dump(RubyBasicObject self) {

}

@CoreMethod(names = "num_waiting")
public abstract static class NumWaitingNode extends CoreMethodArrayArgumentsNode {

private static final long LOCK_FIELD_OFFSET = UnsafeHolder.fieldOffset(LinkedBlockingQueue.class, "takeLock");
private static final long NOT_EMPTY_CONDITION_FIELD_OFFSET = UnsafeHolder.fieldOffset(LinkedBlockingQueue.class, "notEmpty");

public NumWaitingNode(RubyContext context, SourceSection sourceSection) {
super(context, sourceSection);
}

@SuppressWarnings("restriction")
@Specialization
public int num_waiting(RubyBasicObject self) {
final BlockingQueue<Object> queue = getQueue(self);

final LinkedBlockingQueue<Object> linkedBlockingQueue = (LinkedBlockingQueue<Object>) queue;
final ReentrantLock lock = (ReentrantLock) UnsafeHolder.U.getObject(linkedBlockingQueue, LOCK_FIELD_OFFSET);
final Condition notEmptyCondition = (Condition) UnsafeHolder.U.getObject(linkedBlockingQueue, NOT_EMPTY_CONDITION_FIELD_OFFSET);

getContext().getThreadManager().runUntilResult(new BlockingActionWithoutGlobalLock<Boolean>() {
@Override
public Boolean block() throws InterruptedException {
lock.lockInterruptibly();
return SUCCESS;
}
});
try {
return lock.getWaitQueueLength(notEmptyCondition);
} finally {
lock.unlock();
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.jruby.runtime.Visibility;
import org.jruby.truffle.nodes.RubyNode;
Expand All @@ -23,6 +25,7 @@
import org.jruby.truffle.runtime.core.RubyClass;
import org.jruby.truffle.runtime.object.BasicObjectType;
import org.jruby.truffle.runtime.subsystems.ThreadManager.BlockingActionWithoutGlobalLock;
import org.jruby.util.unsafe.UnsafeHolder;

import com.oracle.truffle.api.CompilerDirectives;
import com.oracle.truffle.api.dsl.CreateCast;
Expand Down Expand Up @@ -281,4 +284,41 @@ public RubyBasicObject clear(RubyBasicObject self) {

}

@CoreMethod(names = "num_waiting")
public abstract static class NumWaitingNode extends CoreMethodArrayArgumentsNode {

private static final long LOCK_FIELD_OFFSET = UnsafeHolder.fieldOffset(ArrayBlockingQueue.class, "lock");
private static final long NOT_EMPTY_CONDITION_FIELD_OFFSET = UnsafeHolder.fieldOffset(ArrayBlockingQueue.class, "notEmpty");
private static final long NOT_FULL_CONDITION_FIELD_OFFSET = UnsafeHolder.fieldOffset(ArrayBlockingQueue.class, "notFull");

public NumWaitingNode(RubyContext context, SourceSection sourceSection) {
super(context, sourceSection);
}

@SuppressWarnings("restriction")
@Specialization
public int num_waiting(RubyBasicObject self) {
final BlockingQueue<Object> queue = getQueue(self);

final ArrayBlockingQueue<Object> arrayBlockingQueue = (ArrayBlockingQueue<Object>) queue;
final ReentrantLock lock = (ReentrantLock) UnsafeHolder.U.getObject(arrayBlockingQueue, LOCK_FIELD_OFFSET);
final Condition notEmptyCondition = (Condition) UnsafeHolder.U.getObject(arrayBlockingQueue, NOT_EMPTY_CONDITION_FIELD_OFFSET);
final Condition notFullCondition = (Condition) UnsafeHolder.U.getObject(arrayBlockingQueue, NOT_FULL_CONDITION_FIELD_OFFSET);

getContext().getThreadManager().runUntilResult(new BlockingActionWithoutGlobalLock<Boolean>() {
@Override
public Boolean block() throws InterruptedException {
lock.lockInterruptibly();
return SUCCESS;
}
});
try {
return lock.getWaitQueueLength(notEmptyCondition) + lock.getWaitQueueLength(notFullCondition);
} finally {
lock.unlock();
}
}

}

}

0 comments on commit 3416f71

Please sign in to comment.