public class ReceiveQueueBuffer
extends java.lang.Object
Synchronization strategy:
- Threads must hold the taskSpawnSyncPoint object monitor to spawn a new task or modify the number of in-flight tasks.
- Threads must hold the monitor of the "futures" list to modify the list.
- Threads must hold the monitor of the "finishedTasks" list to modify the list.
- If you need to lock both futures and finishedTasks, lock futures first and finishedTasks second.
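The lock-ordering rule above can be illustrated with a minimal sketch. This is not the class's actual implementation: `LockOrderSketch`, its `String` element types, and its method names are hypothetical stand-ins chosen only to show the pattern of taking the futures monitor before the finishedTasks monitor.

```java
import java.util.LinkedList;

/** Hypothetical stand-in for the buffer; only the locking pattern is illustrated. */
class LockOrderSketch {
    private final Object taskSpawnSyncPoint = new Object();
    private final LinkedList<String> futures = new LinkedList<>();
    private final LinkedList<String> finishedTasks = new LinkedList<>();
    private int inflightTasks;

    /** Touches only the futures list, so only its monitor is held. */
    void addFuture(String future) {
        synchronized (futures) {
            futures.addLast(future);
        }
    }

    /** Touches only the finishedTasks list, so only its monitor is held. */
    void addFinishedTask(String task) {
        synchronized (finishedTasks) {
            finishedTasks.addLast(task);
        }
    }

    /** Changes the in-flight count, so the sync point monitor is held. */
    void taskStarted() {
        synchronized (taskSpawnSyncPoint) {
            inflightTasks++;
        }
    }

    int inflight() {
        synchronized (taskSpawnSyncPoint) {
            return inflightTasks;
        }
    }

    /** Needs both lists: futures is locked first, finishedTasks second. */
    int matchFuturesToTasks() {
        int matched = 0;
        synchronized (futures) {
            synchronized (finishedTasks) {
                while (!futures.isEmpty() && !finishedTasks.isEmpty()) {
                    futures.removeFirst();
                    finishedTasks.removeFirst();
                    matched++;
                }
            }
        }
        return matched;
    }

    int pendingFutures() {
        synchronized (futures) {
            return futures.size();
        }
    }

    public static void main(String[] args) {
        LockOrderSketch s = new LockOrderSketch();
        s.addFuture("f1");
        s.addFuture("f2");
        s.addFinishedTask("t1");
        System.out.println(s.matchFuturesToTasks()); // prints 1
        System.out.println(s.pendingFutures());      // prints 1
    }
}
```

Because every code path that needs both monitors acquires them in the same order, two threads cannot each hold one monitor while waiting for the other.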
Modifier and Type | Class and Description |
---|---|
private static interface | ReceiveQueueBuffer.Predicate<T> - Simple interface to represent a condition |
private class | ReceiveQueueBuffer.ReceiveMessageBatchTask - Task to receive messages from SQS. |
private class | ReceiveQueueBuffer.ReceiveMessageFuture |

Modifier and Type | Field and Description |
---|---|
private long | bufferCounter |
private QueueBufferConfig | config |
private java.util.concurrent.Executor | executor |
private java.util.LinkedList<ReceiveQueueBuffer.ReceiveMessageBatchTask> | finishedTasks - Finished batches are stored in this list. |
private java.util.LinkedList<ReceiveQueueBuffer.ReceiveMessageFuture> | futures - Message delivery futures we gave out. |
private int | inflightReceiveMessageBatches - Used as permits controlling the number of in-flight receive batches. |
private static org.apache.commons.logging.Log | log |
private java.lang.String | qUrl |
(package private) boolean | shutDown - A shut-down buffer does not retrieve any more messages from SQS. |
private AmazonSQS | sqsClient |
private java.lang.Object | taskSpawnSyncPoint - Synchronize on this object to create new receive batches or modify the in-flight message count. |
private long | visibilityTimeoutNanos - This buffer's queue visibility timeout. |

Constructor and Description |
---|
ReceiveQueueBuffer(AmazonSQS paramSQS, java.util.concurrent.Executor paramExecutor, QueueBufferConfig paramConfig, java.lang.String url) |

Modifier and Type | Method and Description |
---|---|
void | clear() - Clears and nacks any pre-fetched messages in this buffer. |
private void | fufillFuture(ReceiveQueueBuffer.ReceiveMessageFuture future) - Fills the future with whatever results were received by the full batch currently at the head of the completed batch queue. |
private ReceiveQueueBuffer.ReceiveMessageFuture | issueFuture(int size, QueueBufferCallback<ReceiveMessageRequest,ReceiveMessageResult> callback) - Creates and returns a new future object. |
private void | pruneExpiredTasks() - Prunes any expired tasks that do not have an exception associated with them. |
private int | pruneHeadTasks(ReceiveQueueBuffer.Predicate<ReceiveQueueBuffer.ReceiveMessageBatchTask> pruneCondition) - Prunes all tasks at the beginning of the finishedTasks list that meet the given condition. |
QueueBufferFuture<ReceiveMessageRequest,ReceiveMessageResult> | receiveMessageAsync(ReceiveMessageRequest rq, QueueBufferCallback<ReceiveMessageRequest,ReceiveMessageResult> callback) - Submits the request for retrieval of messages from the queue and returns a future that will be signalled when the request is satisfied. |
(package private) void | reportBatchFinished(ReceiveQueueBuffer.ReceiveMessageBatchTask batch) - This method is called by the batches after they have finished retrieving the messages. |
private void | satisfyFuturesFromBuffer() - Attempts to satisfy some or all of the already-issued futures from the local buffer. |
void | shutdown() - Prevents spawning of new retrieval batches and waits for all in-flight retrieval batches to finish. |
private void | spawnMoreReceiveTasks() - Creates more receive tasks if needed. |
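
The head-pruning behaviour described for pruneHeadTasks can be sketched as follows. This is an illustration under stated assumptions, not the class's code: `PruneHeadSketch` is hypothetical, tasks are modelled as plain integers, the `test` method name on the nested `Predicate<T>` is assumed, and the `int` result is assumed to be the number of tasks removed.

```java
import java.util.LinkedList;

/** Hypothetical sketch of pruning only the leading run of matching tasks. */
class PruneHeadSketch {
    /** Stand-in for the nested Predicate<T>; the method name is an assumption. */
    interface Predicate<T> {
        boolean test(T t);
    }

    private final LinkedList<Integer> finishedTasks = new LinkedList<>();

    void add(int task) {
        synchronized (finishedTasks) {
            finishedTasks.addLast(task);
        }
    }

    int size() {
        synchronized (finishedTasks) {
            return finishedTasks.size();
        }
    }

    /**
     * Removes tasks from the head of the list while they satisfy the
     * condition and stops at the first task that does not.
     * Returns the number of removed tasks (assumed meaning of the result).
     */
    int pruneHeadTasks(Predicate<Integer> pruneCondition) {
        int pruned = 0;
        synchronized (finishedTasks) {
            while (!finishedTasks.isEmpty()
                    && pruneCondition.test(finishedTasks.getFirst())) {
                finishedTasks.removeFirst();
                pruned++;
            }
        }
        return pruned;
    }

    public static void main(String[] args) {
        PruneHeadSketch s = new PruneHeadSketch();
        s.add(0);
        s.add(0);
        s.add(5);
        s.add(0);
        // Only the two leading zeros are removed; the trailing zero stays.
        System.out.println(s.pruneHeadTasks(t -> t == 0)); // prints 2
        System.out.println(s.size());                      // prints 2
    }
}
```

Stopping at the first non-matching element keeps the remaining tasks in their original order, which matters when the head of the list is what gets handed to the next waiting future.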
private static org.apache.commons.logging.Log log
private final QueueBufferConfig config
private final java.lang.String qUrl
private final java.util.concurrent.Executor executor
private final AmazonSQS sqsClient
private long bufferCounter
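private volatile long visibilityTimeoutNanos
This buffer's queue visibility timeout. Used to detect expired messages that should not be returned by the receiveMessage call. Synchronized by receiveMessageLock. -1 indicates that the time is uninitialized.
private volatile int inflightReceiveMessageBatches
Used as permits controlling the number of in-flight receive batches. Synchronized by taskSpawnSyncPoint.
private final java.lang.Object taskSpawnSyncPoint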
volatile boolean shutDown
private final java.util.LinkedList<ReceiveQueueBuffer.ReceiveMessageFuture> futures
private java.util.LinkedList<ReceiveQueueBuffer.ReceiveMessageBatchTask> finishedTasks
ReceiveQueueBuffer(AmazonSQS paramSQS, java.util.concurrent.Executor paramExecutor, QueueBufferConfig paramConfig, java.lang.String url)
public void shutdown()
public QueueBufferFuture<ReceiveMessageRequest,ReceiveMessageResult> receiveMessageAsync(ReceiveMessageRequest rq, QueueBufferCallback<ReceiveMessageRequest,ReceiveMessageResult> callback)
private ReceiveQueueBuffer.ReceiveMessageFuture issueFuture(int size, QueueBufferCallback<ReceiveMessageRequest,ReceiveMessageResult> callback)
private void satisfyFuturesFromBuffer()
private void fufillFuture(ReceiveQueueBuffer.ReceiveMessageFuture future)
private void pruneExpiredTasks()
private int pruneHeadTasks(ReceiveQueueBuffer.Predicate<ReceiveQueueBuffer.ReceiveMessageBatchTask> pruneCondition)
pruneCondition - Condition on whether a task is eligible to be pruned
private void spawnMoreReceiveTasks()
void reportBatchFinished(ReceiveQueueBuffer.ReceiveMessageBatchTask batch)
public void clear()
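
The members taskSpawnSyncPoint, inflightReceiveMessageBatches, shutDown, spawnMoreReceiveTasks() and reportBatchFinished() suggest a permit-style pattern for limiting concurrent receive batches. The sketch below shows one way such a pattern might look. Everything in it is an assumption made for illustration: `SpawnSketch`, the `maxInflight` limit, and the empty task body are hypothetical, and the waiting behaviour of shutdown() is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

/** Hypothetical sketch of permit-style task spawning guarded by one monitor. */
class SpawnSketch {
    private final Object taskSpawnSyncPoint = new Object();
    private final Executor executor;
    private final int maxInflight; // hypothetical limit, not a documented field
    private int inflight;
    private volatile boolean shutDown;

    SpawnSketch(Executor executor, int maxInflight) {
        this.executor = executor;
        this.maxInflight = maxInflight;
    }

    /** Spawns tasks until the in-flight count reaches the limit. */
    void spawnMoreTasks() {
        if (shutDown) {
            return; // a shut-down buffer spawns nothing
        }
        synchronized (taskSpawnSyncPoint) {
            // Compute the count up front so an executor that runs tasks
            // inline cannot keep the loop going by releasing permits.
            int toSpawn = maxInflight - inflight;
            for (int i = 0; i < toSpawn; i++) {
                inflight++;
                executor.execute(this::runTask);
            }
        }
    }

    private void runTask() {
        // A real task would fetch messages here; the sketch only reports back.
        reportFinished();
    }

    /** Called by a task when it is done; releases its permit. */
    void reportFinished() {
        synchronized (taskSpawnSyncPoint) {
            inflight--;
        }
    }

    void shutdown() {
        shutDown = true;
    }

    int inflight() {
        synchronized (taskSpawnSyncPoint) {
            return inflight;
        }
    }

    public static void main(String[] args) {
        List<Runnable> pending = new ArrayList<>();
        SpawnSketch s = new SpawnSketch(pending::add, 2); // queueing executor
        s.spawnMoreTasks();
        System.out.println(s.inflight()); // prints 2
        for (Runnable r : pending) {
            r.run();
        }
        System.out.println(s.inflight()); // prints 0
    }
}
```

Keeping the counter update and the task submission under the same monitor means two threads calling the spawn method at once cannot both see a free permit and exceed the limit.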