public abstract class AbstractWriteBehindQueue extends java.lang.Object implements WriteBehind
Modifier and Type | Class and Description |
---|---|
private class |
AbstractWriteBehindQueue.ProcessingThread
Thread this will continuously process the items in the queue.
|
Modifier and Type | Field and Description |
---|---|
private java.util.concurrent.atomic.AtomicBoolean |
busyProcessing |
private java.lang.String |
cacheName |
private CacheWriter |
cacheWriter |
private OperationsFilter |
filter |
private java.util.concurrent.atomic.AtomicLong |
lastProcessing |
private java.util.concurrent.atomic.AtomicLong |
lastWorkDone |
private static java.util.logging.Logger |
LOGGER |
private int |
maxQueueSize |
private long |
maxWriteDelayMs |
private long |
minWriteDelayMs |
private static int |
MS_IN_SEC |
private java.lang.Thread |
processingThread |
private java.util.concurrent.locks.Condition |
queueIsEmpty |
private java.util.concurrent.locks.Condition |
queueIsFull |
private java.util.concurrent.locks.Condition |
queueIsStopped |
private java.util.concurrent.locks.ReentrantReadWriteLock |
queueLock |
private java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock |
queueReadLock |
private java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock |
queueWriteLock |
private int |
rateLimitPerSecond |
private int |
retryAttemptDelaySeconds |
private int |
retryAttempts |
private boolean |
stopped |
private boolean |
stopping |
private boolean |
writeBatching |
private int |
writeBatchSize |
Constructor and Description |
---|
AbstractWriteBehindQueue(CacheConfiguration config)
Create a new write behind queue.
|
Modifier and Type | Method and Description |
---|---|
protected abstract void |
addItem(SingleOperation operation)
Add an item to the write behind queue
|
void |
delete(CacheEntry entry)
Add a delete operation for the given cache entry
|
private int |
determineBatchSize(java.util.List<SingleOperation> quarantined) |
private void |
filterQuarantined(java.util.List<SingleOperation> quarantined) |
private long |
getLastProcessing() |
protected java.lang.Thread |
getProcessingThread()
Backdoor to allow killing the processing thread for testing purposes.
|
abstract long |
getQueueSize()
Gets the best estimate for items in the queue still awaiting processing.
|
private java.lang.String |
getThreadName() |
private boolean |
isStopped() |
private void |
processBatchedOperations(java.util.List<SingleOperation> quarantined) |
private void |
processItems() |
private void |
processQuarantinedItems(java.util.List<SingleOperation> quarantined) |
private void |
processSingleOperation(java.util.List<SingleOperation> quarantined) |
protected abstract java.util.List<SingleOperation> |
quarantineItems()
Quarantine items to be processed.
|
private void |
reassemble(java.util.List<SingleOperation> quarantined) |
protected abstract void |
reinsertUnprocessedItems(java.util.List<SingleOperation> operations)
Reinsert any unfinished operations into the queue.
|
void |
setOperationsFilter(OperationsFilter filter)
Set the operations filter that should be used.
|
void |
start(CacheWriter writer)
Start the write behind queue with a cache writer
|
void |
stop()
Stop the coordinator and all the internal data structures.
|
private void |
waitForQueueSizeToDrop() |
private void |
waitUntilEnoughTimeHasPassed(java.util.List<SingleOperation> quarantined,
int batchSize,
long secondsSinceLastWorkDone) |
private void |
waitUntilEnoughWorkItemsAvailable(java.util.List<SingleOperation> quarantined,
int workSize) |
void |
write(Element element)
Add a write operation for a given element.
|
private static final java.util.logging.Logger LOGGER
private static final int MS_IN_SEC
private final java.lang.String cacheName
private final long minWriteDelayMs
private final long maxWriteDelayMs
private final int rateLimitPerSecond
private final int maxQueueSize
private final boolean writeBatching
private final int writeBatchSize
private final int retryAttempts
private final int retryAttemptDelaySeconds
private final java.lang.Thread processingThread
private final java.util.concurrent.locks.ReentrantReadWriteLock queueLock
private final java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock queueReadLock
private final java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock queueWriteLock
private final java.util.concurrent.locks.Condition queueIsFull
private final java.util.concurrent.locks.Condition queueIsEmpty
private final java.util.concurrent.locks.Condition queueIsStopped
private final java.util.concurrent.atomic.AtomicLong lastProcessing
private final java.util.concurrent.atomic.AtomicLong lastWorkDone
private final java.util.concurrent.atomic.AtomicBoolean busyProcessing
private volatile OperationsFilter filter
private CacheWriter cacheWriter
private boolean stopping
private boolean stopped
public AbstractWriteBehindQueue(CacheConfiguration config)
config
- the configuration for the queueprotected abstract java.util.List<SingleOperation> quarantineItems()
protected abstract void addItem(SingleOperation operation)
operation
- operation to be doneprotected abstract void reinsertUnprocessedItems(java.util.List<SingleOperation> operations)
operations
- list of unfinished operationspublic void start(CacheWriter writer)
start
in interface WriteBehind
writer
- the cache writer that should be used to process the operationsWriteBehind.stop()
public void setOperationsFilter(OperationsFilter filter)
setOperationsFilter
in interface WriteBehind
filter
- the filter that will be used as of nowprivate long getLastProcessing()
private void processItems() throws CacheException
CacheException
private void waitUntilEnoughWorkItemsAvailable(java.util.List<SingleOperation> quarantined, int workSize)
private void waitUntilEnoughTimeHasPassed(java.util.List<SingleOperation> quarantined, int batchSize, long secondsSinceLastWorkDone)
private int determineBatchSize(java.util.List<SingleOperation> quarantined)
private void filterQuarantined(java.util.List<SingleOperation> quarantined)
private void processQuarantinedItems(java.util.List<SingleOperation> quarantined)
private void processBatchedOperations(java.util.List<SingleOperation> quarantined)
private void processSingleOperation(java.util.List<SingleOperation> quarantined)
public void write(Element element)
write
in interface WriteBehind
element
- the element for which a write operation will be added to the write behind queueprivate void waitForQueueSizeToDrop()
public void delete(CacheEntry entry)
delete
in interface WriteBehind
entry
- the cache entry for which a delete operation will be added to the write behind queuepublic void stop() throws CacheException
stop
in interface WriteBehind
CacheException
WriteBehind.start(net.sf.ehcache.writer.CacheWriter)
public abstract long getQueueSize()
getQueueSize
in interface WriteBehind
private boolean isStopped()
private java.lang.String getThreadName()
private void reassemble(java.util.List<SingleOperation> quarantined)
protected java.lang.Thread getProcessingThread()