Two BlockingQueue - dead end
I have a requirement to manipulate two queues atomically and am not sure what the correct synchronization strategy is. This is what I tried:
public class transfer {
BlockingQueue firstQ;
BlockingQueue secondQ;
public moveToSecond() {
synchronized (this){
Object a = firstQ.take();
secondQ.put(a)
}
}
public moveToFirst() {
synchronized(this) {
Object a = secondQ.take();
firstQ.put(a);
}
}
}
Is this the correct pattern? In the moveToSecond () method, if firstQ is empty, the method will wait for firstQ.take (), but it still holds the lock on that object. This will prevent moveToFirst () from moving.
I am confused about releasing a lock while waiting. Does the thread release all locks [both this and BlockedQUeue lock?]? What is the correct pattern to enforce atomicity when dealing with multiple blocking queues?
You are using the correct approach, using a shared mutex to synchronize between both queues. However, to avoid the situation you describe with an empty empty queue, I suggest overriding moveToFirst()
and moveToSecond()
for use poll()
rather than take()
; eg.
public void boolean moveToFirst() {
// Synchronize on simple mutex; could use a Lock here but probably
// not worth the extra dev. effort.
synchronzied(queueLock) {
boolean success;
// Will return immediately, returning null if the queue is empty.
Object o = firstQ.poll();
if (o != null) {
// Put could block if the queue is full. If you're using a bounded
// queue you could use add(Object) instead to avoid any blocking but
// you would need to handle the exception somehow.
secondQ.put(o);
success = true;
} else {
success = false;
}
}
return success;
}
source to share
Another failure condition that you did not mention is if firstQ is not empty and secondQ is full, the element will be removed from firstQ, but there will be no space.
So the only correct way is to use a poll and suggest with timeouts and code to return things before they were before the failure (important!) And then try again at a random time until both polls and offers are successful ...
This is an optimistic approach; effective in normal operation, but rather ineffective when blocking is frequent (average latency depends on the selected timeout)
source to share
You have to use the locking mechanism from java.util.concurrency, for example:
Lock lock = new ReentrantLock();
....
lock.lock();
try {
secondQ.put(firstQ.take());
} finally {
lock.unlock();
}
Do the same for firstQ.put (secondQ.take ()) using the same lock object.
There is no need to use wait / not lowlevel methods for the Object class anymore unless you are writing new concurrency primitives.
source to share
In your code, when a thread is locked on BlockingQueue.take()
, it holds the lock on this
. The lock is not released until the code exiting from which the synchronized block is issued or this.wait()
.
Here I am assuming that moveToFirst()
both moveToSecond()
should block and that your class controls all queue access.
private final BlockingQueue<Object> firstQ = new LinkedBlockingQueue();
private final Semaphore firstSignal = new Semaphore(0);
private final BlockingQueue<Object> secondQ = LinkedBlockingQueue();
private final Semaphore secondSignal = new Semaphore(0);
private final Object monitor = new Object();
public void moveToSecond() {
int moved = 0;
while (moved == 0) {
// bock until someone adds to the queue
firstSignal.aquire();
// attempt to move an item from one queue to another atomically
synchronized (monitor) {
moved = firstQ.drainTo(secondQ, 1);
}
}
}
public void putInFirst(Object object) {
firstQ.put(object);
// notify any blocking threads that the queue has an item
firstSignal.release();
}
You will have similar code for moveToFirst()
and putInSecond()
. while
is only required if some other code can remove items from the queue. If you want a method that gets removed in the queue, waiting for pending moves, it must get the semaphore resolution, and the semaphore must be instantiated as a fair semaphore, so the first thread to call aquire
will be released first:
firstSignal = new Semaphore(0, true);
If you don't want to block moveToFirst()
, you have several options.
- Ask the method to do its job in the
Runnable
posted onExecutor
- Pass a timeout
moveToFirst()
and useBlockingQueue.poll(int, TimeUnit)
- Use
BlockingQueue.drainTo(secondQ, 1)
and modifymoveToFirst()
to return a boolean to indicate if it was successful.
You don't need a semaphore for the above three parameters.
Finally, I doubt the need to make motion an atom. If multiple threads are added or removed from queues, then the watch queue will not be able to determine if it was moveToFirst()
atomic.
source to share