public class RMIAsynchronousCacheReplicator extends RMISynchronousCacheReplicator
CacheManager
and Cache
events and propagates those to
CachePeer
peers of the Cache asynchronously.
Updates are guaranteed to be replicated in the order in which they are received.
While much faster in operation than RMISynchronousCacheReplicator
, it does suffer from a number
of problems. Elements, which may be being spooled to DiskStore may stay around in memory because references
are being held to them from EventMessage
s which are queued up. The replication thread runs once
per second, limiting the build up. However a lot of elements can be put into a cache in that time. We do not want
to get an OutOfMemoryError
using distribution in circumstances when it would not happen if we were
just using the DiskStore.
Accordingly, the Element values in EventMessage
s are held by SoftReference
in the queue,
so that they can be discarded if required by the GC to avoid an OutOfMemoryError
. A log message
will be issued on each flush of the queue if there were any forced discards. One problem with GC collection
of SoftReferences is that the VM (JDK1.5 anyway) will do that rather than grow the heap size to the maximum.
The workaround is to either set minimum heap size to the maximum heap size to force heap allocation at start
up, or put up with a few lost messages while the heap grows.Modifier and Type | Class and Description |
---|---|
private class |
RMIAsynchronousCacheReplicator.ReplicationThread
A background daemon thread that writes objects to the file.
|
Modifier and Type | Field and Description |
---|---|
private static org.slf4j.Logger |
LOG |
private int |
maximumBatchSize
The maximum number of Element replication in single RMI message.
|
private int |
replicationInterval
The amount of time the replication thread sleeps after it detects the replicationQueue is empty
before checking again.
|
private java.util.Queue<java.lang.Object> |
replicationQueue
A queue of updates.
|
private java.lang.Thread |
replicationThread
A thread which handles replication, so that replication can take place asynchronously and not hold up the cache
|
replicatePuts, replicatePutsViaCopy, replicateRemovals, replicateUpdates, replicateUpdatesViaCopy, status
Constructor and Description |
---|
RMIAsynchronousCacheReplicator(boolean replicatePuts,
boolean replicatePutsViaCopy,
boolean replicateUpdates,
boolean replicateUpdatesViaCopy,
boolean replicateRemovals,
int replicationInterval,
int maximumBatchSize)
Constructor for internal and subclass use
|
Modifier and Type | Method and Description |
---|---|
protected void |
addToReplicationQueue(RmiEventMessage eventMessage)
Adds a message to the queue.
|
java.lang.Object |
clone()
Creates a clone of this listener.
|
void |
dispose()
Give the replicator a chance to flush the replication queue, then cleanup and free resources when no longer needed
|
private java.util.List<EventMessage> |
extractEventMessages(int limit)
Extracts CacheEventMessages and attempts to get a hard reference to the underlying EventMessage
If an EventMessage has been invalidated due to SoftReference collection of the Element, it is not
propagated.
|
private void |
flushReplicationQueue() |
void |
notifyElementPut(Ehcache cache,
Element element)
Called immediately after an element has been put into the cache.
|
void |
notifyElementRemoved(Ehcache cache,
Element element)
Called immediately after an attempt to remove an element.
|
void |
notifyElementUpdated(Ehcache cache,
Element element)
Called immediately after an element has been put into the cache and the element already
existed in the cache.
|
void |
notifyRemoveAll(Ehcache cache)
Called during
Ehcache.removeAll() to indicate that the all
elements have been removed from the cache in a bulk operation. |
private void |
replicationThreadMain()
RemoteDebugger method for the replicationQueue thread.
|
private void |
writeReplicationQueue()
Gets called once per
replicationInterval . |
alive, isReplicateUpdatesViaCopy, listRemoteCachePeers, notAlive, notifyElementEvicted, notifyElementExpired, replicatePutNotification, replicateRemovalNotification, replicateRemoveAllNotification
private static final org.slf4j.Logger LOG
private final java.lang.Thread replicationThread
private final int replicationInterval
private final int maximumBatchSize
private final java.util.Queue<java.lang.Object> replicationQueue
public RMIAsynchronousCacheReplicator(boolean replicatePuts, boolean replicatePutsViaCopy, boolean replicateUpdates, boolean replicateUpdatesViaCopy, boolean replicateRemovals, int replicationInterval, int maximumBatchSize)
private void replicationThreadMain()
public final void notifyElementPut(Ehcache cache, Element element) throws CacheException
Cache.put(net.sf.ehcache.Element)
method
will block until this method returns.
Implementers may wish to have access to the Element's fields, including value, so the element is provided.
Implementers should be careful not to modify the element. The effect of any modifications is undefined.
This implementation queues the put notification for in-order replication to peers.notifyElementPut
in interface CacheEventListener
notifyElementPut
in class RMISynchronousCacheReplicator
cache
- the cache emitting the notificationelement
- the element which was just put into the cache.CacheException
public final void notifyElementUpdated(Ehcache cache, Element element) throws CacheException
Cache.put(net.sf.ehcache.Element)
method
will block until this method returns.
Implementers may wish to have access to the Element's fields, including value, so the element is provided.
Implementers should be careful not to modify the element. The effect of any modifications is undefined.notifyElementUpdated
in interface CacheEventListener
notifyElementUpdated
in class RMISynchronousCacheReplicator
cache
- the cache emitting the notificationelement
- the element which was just put into the cache.CacheException
public final void notifyElementRemoved(Ehcache cache, Element element) throws CacheException
notifyElementRemoved
in interface CacheEventListener
notifyElementRemoved
in class RMISynchronousCacheReplicator
cache
- the cache emitting the notificationelement
- the element just deleted, or a synthetic element with just the key set if
no element was removed.CacheException
public void notifyRemoveAll(Ehcache cache)
Ehcache.removeAll()
to indicate that the all
elements have been removed from the cache in a bulk operation. The usual
notifyElementRemoved(net.sf.ehcache.Ehcache,net.sf.ehcache.Element)
is not called.
This notification exists because clearing a cache is a special case. It is often
not practical to serially process notifications where potentially millions of elements
have been bulk deleted.notifyRemoveAll
in interface CacheEventListener
notifyRemoveAll
in class RMISynchronousCacheReplicator
cache
- the cache emitting the notificationprotected void addToReplicationQueue(RmiEventMessage eventMessage)
cacheEventMessage
- private void writeReplicationQueue()
replicationInterval
.
Sends accumulated messages in bulk to each peer. i.e. if ther are 100 messages and 1 peer,
1 RMI invocation results, not 100. Also, if a peer is unavailable this is discovered in only 1 try.
Makes a copy of the queue so as not to hold up the enqueue operations.
Any exceptions are caught so that the replication thread does not die, and because errors are expected,
due to peers becoming unavailable.
This method issues warnings for problems that can be fixed with configuration changes.private void flushReplicationQueue()
private java.util.List<EventMessage> extractEventMessages(int limit)
replicationQueueCopy
- public final void dispose()
dispose
in interface CacheEventListener
dispose
in class RMISynchronousCacheReplicator
public java.lang.Object clone() throws java.lang.CloneNotSupportedException
clone
in interface CacheEventListener
clone
in class RMISynchronousCacheReplicator
java.lang.CloneNotSupportedException
- if the listener could not be cloned.