Class MultiProducerSequencer
java.lang.Object
    com.lmax.disruptor.AbstractSequencer
        com.lmax.disruptor.MultiProducerSequencer
-
public final class MultiProducerSequencer extends AbstractSequencer
Coordinator for claiming sequences for access to a data structure while tracking dependent Sequences. Suitable for sequencing across multiple publisher threads.

Note on Cursored.getCursor(): With this sequencer the cursor value is updated after the call to Sequenced.next(). To determine the highest available sequence that can be read, use Sequencer.getHighestPublishedSequence(long, long).
-
-
Field Summary
Fields
Modifier and Type                 Field                  Description
private int[]                     availableBuffer
private static long               BASE
private Sequence                  gatingSequenceCache
private int                       indexMask
private int                       indexShift
private static long               SCALE
private static sun.misc.Unsafe    UNSAFE
-
Fields inherited from class com.lmax.disruptor.AbstractSequencer
bufferSize, cursor, gatingSequences, waitStrategy
-
Fields inherited from interface com.lmax.disruptor.Sequencer
INITIAL_CURSOR_VALUE
-
-
Constructor Summary
Constructors
Constructor
    Description
MultiProducerSequencer(int bufferSize, WaitStrategy waitStrategy)
    Construct a Sequencer with the selected wait strategy and buffer size.
-
Method Summary
All Methods / Instance Methods / Concrete Methods
Modifier and Type, Method
    Description
private int calculateAvailabilityFlag(long sequence)
private int calculateIndex(long sequence)
void claim(long sequence)
    Claim a specific sequence.
long getHighestPublishedSequence(long lowerBound, long availableSequence)
    Get the highest sequence number that can be safely read from the ring buffer.
boolean hasAvailableCapacity(int requiredCapacity)
    Has the buffer got capacity to allocate another sequence.
private boolean hasAvailableCapacity(Sequence[] gatingSequences, int requiredCapacity, long cursorValue)
private void initialiseAvailableBuffer()
boolean isAvailable(long sequence)
    Confirms if a sequence is published and the event is available for use; non-blocking.
long next()
    Claim the next event in sequence for publishing.
long next(int n)
    Claim the next n events in sequence for publishing.
void publish(long sequence)
    Publishes a sequence.
void publish(long lo, long hi)
    Batch publish sequences.
long remainingCapacity()
    Get the remaining capacity for this sequencer.
private void setAvailable(long sequence)
    The below methods work on the availableBuffer flag.
private void setAvailableBufferValue(int index, int flag)
long tryNext()
    Attempt to claim the next event in sequence for publishing.
long tryNext(int n)
    Attempt to claim the next n events in sequence for publishing.
-
Methods inherited from class com.lmax.disruptor.AbstractSequencer
addGatingSequences, getBufferSize, getCursor, getMinimumSequence, newBarrier, newPoller, removeGatingSequence, toString
-
-
-
-
Field Detail
-
UNSAFE
private static final sun.misc.Unsafe UNSAFE
-
BASE
private static final long BASE
-
SCALE
private static final long SCALE
-
gatingSequenceCache
private final Sequence gatingSequenceCache
-
availableBuffer
private final int[] availableBuffer
-
indexMask
private final int indexMask
-
indexShift
private final int indexShift
-
-
Constructor Detail
-
MultiProducerSequencer
public MultiProducerSequencer(int bufferSize, WaitStrategy waitStrategy)
Construct a Sequencer with the selected wait strategy and buffer size.
Parameters:
    bufferSize - the size of the buffer that this will sequence over.
    waitStrategy - for those waiting on sequences.
-
-
Method Detail
-
hasAvailableCapacity
public boolean hasAvailableCapacity(int requiredCapacity)
Description copied from interface: Sequenced
Has the buffer got capacity to allocate another sequence. This is a concurrent method so the response should only be taken as an indication of available capacity.
Parameters:
    requiredCapacity - in the buffer
Returns:
    true if the buffer has the capacity to allocate the next sequence, otherwise false.
See Also:
    Sequenced.hasAvailableCapacity(int)
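
The capacity check can be illustrated with a small self-contained sketch. It is not the library's code: the class name `CapacityCheckSketch` and the plain `long[]` of consumer positions are assumptions made for illustration, and it omits the `gatingSequenceCache` optimisation. It only shows the likely shape of the arithmetic: a claim fits as long as it would not wrap past the slowest gating sequence.

```java
final class CapacityCheckSketch {
    /** Smallest consumer position, or the cursor itself when there are no consumers. */
    static long minimumSequence(long[] gatingSequences, long cursorValue) {
        long min = cursorValue;
        for (long sequence : gatingSequences) {
            min = Math.min(min, sequence);
        }
        return min;
    }

    /**
     * Returns true if requiredCapacity more slots can be claimed after cursorValue
     * without overwriting a slot the slowest consumer has not yet passed.
     */
    static boolean hasAvailableCapacity(long[] gatingSequences, int bufferSize,
                                        int requiredCapacity, long cursorValue) {
        // The sequence that would be overwritten by the last slot of the new claim.
        long wrapPoint = (cursorValue + requiredCapacity) - bufferSize;
        return wrapPoint <= minimumSequence(gatingSequences, cursorValue);
    }
}
```

Because producers and consumers keep moving while the check runs, a result computed this way is only a snapshot, which is why the documentation above calls it an indication.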
-
hasAvailableCapacity
private boolean hasAvailableCapacity(Sequence[] gatingSequences, int requiredCapacity, long cursorValue)
-
claim
public void claim(long sequence)
Description copied from interface: Sequencer
Claim a specific sequence. Only used if initialising the ring buffer to a specific value.
Parameters:
    sequence - The sequence to initialise to.
See Also:
    Sequencer.claim(long)
-
next
public long next()
Description copied from interface: Sequenced
Claim the next event in sequence for publishing.
Returns:
    the claimed sequence value
See Also:
    Sequenced.next()
-
next
public long next(int n)
Description copied from interface: Sequenced
Claim the next n events in sequence for publishing. This is for batch event producing. Using batch producing requires a little care and some math.

    int n = 10;
    long hi = sequencer.next(n);
    long lo = hi - (n - 1);
    for (long sequence = lo; sequence <= hi; sequence++) {
        // Do work.
    }
    sequencer.publish(lo, hi);

Parameters:
    n - the number of sequences to claim
Returns:
    the highest claimed sequence value
See Also:
    Sequenced.next(int)
-
tryNext
public long tryNext() throws InsufficientCapacityException
Description copied from interface: Sequenced
Attempt to claim the next event in sequence for publishing. Will return the number of the slot if at least one slot is available.
Returns:
    the claimed sequence value
Throws:
    InsufficientCapacityException - thrown if there is no space available in the ring buffer.
See Also:
    Sequenced.tryNext()
-
tryNext
public long tryNext(int n) throws InsufficientCapacityException
Description copied from interface: Sequenced
Attempt to claim the next n events in sequence for publishing. Will return the highest numbered slot if at least n slots (the required capacity) are available. Have a look at Sequenced.next() for a description of how to use this method.
Parameters:
    n - the number of sequences to claim
Returns:
    the claimed sequence value
Throws:
    InsufficientCapacityException - thrown if there is no space available in the ring buffer.
See Also:
    Sequenced.tryNext(int)
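
A non-blocking claim across several publisher threads is typically built on a compare-and-set loop. The sketch below shows that pattern under stated assumptions; it is not the library's source. `TryClaimSketch`, `NoCapacityException` (a stand-in for InsufficientCapacityException), `markConsumed`, and the single consumer position are all hypothetical names introduced for illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

final class TryClaimSketch {
    /** Hypothetical stand-in for InsufficientCapacityException. */
    static final class NoCapacityException extends Exception {
    }

    private final int bufferSize;
    private final AtomicLong cursor = new AtomicLong(-1L);   // highest claimed sequence
    private final AtomicLong consumed = new AtomicLong(-1L); // one consumer, for simplicity

    TryClaimSketch(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    /** Claims n sequences or fails immediately; returns the highest claimed sequence. */
    long tryNext(int n) throws NoCapacityException {
        if (n < 1) {
            throw new IllegalArgumentException("n must be > 0");
        }
        long current;
        long next;
        do {
            current = cursor.get();
            next = current + n;
            // Claiming up to 'next' must not lap the consumer.
            if (next - bufferSize > consumed.get()) {
                throw new NoCapacityException();
            }
            // Retry if another publisher moved the cursor in the meantime.
        } while (!cursor.compareAndSet(current, next));
        return next;
    }

    /** Records that the consumer has finished with everything up to 'sequence'. */
    void markConsumed(long sequence) {
        consumed.set(sequence);
    }
}
```

As with next(int n), the caller derives the lowest claimed sequence as `hi - (n - 1)` from the returned value.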
-
remainingCapacity
public long remainingCapacity()
Description copied from interface: Sequenced
Get the remaining capacity for this sequencer.
Returns:
    The number of slots remaining.
See Also:
    Sequenced.remainingCapacity()
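
One plausible way to compute the remaining capacity is to subtract the slots still in use (claimed but not yet passed by the slowest consumer) from the buffer size. The sketch below assumes exactly that; `RemainingCapacitySketch` and its parameters are illustrative names, not part of the library.

```java
final class RemainingCapacitySketch {
    /**
     * bufferSize minus the number of slots between the slowest consumer
     * and the producer cursor. With no consumers, nothing is held back.
     */
    static long remainingCapacity(long[] gatingSequences, int bufferSize, long cursorValue) {
        long consumed = cursorValue;
        for (long sequence : gatingSequences) {
            consumed = Math.min(consumed, sequence);
        }
        long produced = cursorValue;
        return bufferSize - (produced - consumed);
    }
}
```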
-
initialiseAvailableBuffer
private void initialiseAvailableBuffer()
-
publish
public void publish(long sequence)
Description copied from interface: Sequenced
Publishes a sequence. Call when the event has been filled.
Parameters:
    sequence - the sequence to be published.
See Also:
    Sequenced.publish(long)
-
publish
public void publish(long lo, long hi)
Description copied from interface: Sequenced
Batch publish sequences. Called when all of the events have been filled.
Parameters:
    lo - first sequence number to publish
    hi - last sequence number to publish
See Also:
    Sequenced.publish(long, long)
-
setAvailable
private void setAvailable(long sequence)
The methods below work on the availableBuffer flag.

The prime reason is to avoid a shared sequence object between publisher threads. (Keeping single pointers tracking start and end would require coordination between the threads.)

-- Firstly, we have the constraint that the delta between the cursor and minimum gating sequence will never be larger than the buffer size (the code in next/tryNext in the Sequencer takes care of that).
-- Given that, take the sequence value and mask off the lower portion of the sequence as the index into the buffer (indexMask). This is equivalent to the modulo operator.
-- The upper portion of the sequence becomes the value to check for availability, i.e. it tells us how many times around the ring buffer we've been. This is equivalent to division.
-- Because we can't wrap without the gating sequences moving forward (i.e. the minimum gating sequence is effectively our last available position in the buffer), when we have new data and have successfully claimed a slot we can simply write over the top.
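
The index and flag arithmetic described above can be sketched as follows. This is an illustration under assumptions, not the library's implementation: it uses `AtomicIntegerArray` in place of the Unsafe-based array access that the UNSAFE, BASE and SCALE fields suggest, it assumes the buffer size is a power of two, and it assumes slots start at -1 so that nothing reads as published before the first lap. The class name `AvailabilityBufferSketch` is hypothetical.

```java
import java.util.concurrent.atomic.AtomicIntegerArray;

final class AvailabilityBufferSketch {
    private final AtomicIntegerArray availableBuffer;
    private final int indexMask;
    private final int indexShift;

    AvailabilityBufferSketch(int bufferSize) {
        if (bufferSize < 1 || Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        availableBuffer = new AtomicIntegerArray(bufferSize);
        indexMask = bufferSize - 1;                              // low bits select the slot
        indexShift = Integer.numberOfTrailingZeros(bufferSize);  // log2(bufferSize)
        for (int i = 0; i < bufferSize; i++) {
            availableBuffer.set(i, -1);                          // assumed "never published" marker
        }
    }

    /** Lower portion of the sequence: the slot index (sequence modulo bufferSize). */
    int calculateIndex(long sequence) {
        return ((int) sequence) & indexMask;
    }

    /** Upper portion of the sequence: the lap count (sequence divided by bufferSize). */
    int calculateAvailabilityFlag(long sequence) {
        return (int) (sequence >>> indexShift);
    }

    /** Marks a sequence as published by writing its lap count into its slot. */
    void setAvailable(long sequence) {
        availableBuffer.set(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
    }

    /** A sequence is published only if its slot holds the lap count for that sequence. */
    boolean isAvailable(long sequence) {
        return availableBuffer.get(calculateIndex(sequence)) == calculateAvailabilityFlag(sequence);
    }
}
```

With a buffer of 8, sequences 0 and 8 share slot 0 but carry flags 0 and 1, so publishing sequence 8 overwrites the flag and sequence 0 no longer reads as available. Each publisher writes only to its own slot, which is what removes the need for a shared sequence object.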
-
setAvailableBufferValue
private void setAvailableBufferValue(int index, int flag)
-
isAvailable
public boolean isAvailable(long sequence)
Description copied from interface: Sequencer
Confirms if a sequence is published and the event is available for use; non-blocking.
Parameters:
    sequence - of the buffer to check
Returns:
    true if the sequence is available for use, false if not
See Also:
    Sequencer.isAvailable(long)
-
getHighestPublishedSequence
public long getHighestPublishedSequence(long lowerBound, long availableSequence)
Description copied from interface: Sequencer
Get the highest sequence number that can be safely read from the ring buffer. Depending on the implementation of the Sequencer this call may need to scan a number of values in the Sequencer. The scan will range from nextSequence to availableSequence. If there are no available values >= nextSequence the return value will be nextSequence - 1. To work correctly a consumer should pass a value that is 1 higher than the last sequence that was successfully processed. In this class, nextSequence corresponds to the lowerBound parameter.
Parameters:
    lowerBound - The sequence to start scanning from.
    availableSequence - The sequence to scan to.
Returns:
    The highest value that can be safely read, will be at least nextSequence - 1.
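
The scan described above can be sketched as a simple loop that stops at the first unpublished sequence. This is a minimal illustration, not the library's code: `HighestPublishedSketch` is a hypothetical name, and the availability check is passed in as a `LongPredicate` so the example stays self-contained.

```java
import java.util.function.LongPredicate;

final class HighestPublishedSketch {
    /**
     * Walks from lowerBound to availableSequence and returns the last sequence
     * before the first gap. If lowerBound itself is unpublished, the result is
     * lowerBound - 1.
     */
    static long getHighestPublishedSequence(long lowerBound, long availableSequence,
                                            LongPredicate isAvailable) {
        for (long sequence = lowerBound; sequence <= availableSequence; sequence++) {
            if (!isAvailable.test(sequence)) {
                return sequence - 1;
            }
        }
        return availableSequence;
    }
}
```

The scan is needed because, with multiple publishers, the cursor marks the highest claimed sequence, and a later claim may be published before an earlier one.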
-
calculateAvailabilityFlag
private int calculateAvailabilityFlag(long sequence)
-
calculateIndex
private int calculateIndex(long sequence)
-
-