Class Disruptor<T>
- java.lang.Object
-
- com.lmax.disruptor.dsl.Disruptor<T>
-
- Type Parameters:
T
- the type of event used.
public class Disruptor<T> extends java.lang.Object
A DSL-style API for setting up the disruptor pattern around a ring buffer (aka the Builder pattern).
A simple example of setting up the disruptor with two event handlers that must process events in order:
Disruptor<MyEvent> disruptor = new Disruptor<MyEvent>(MyEvent.FACTORY, 32, Executors.newCachedThreadPool()); EventHandler<MyEvent> handler1 = new EventHandler<MyEvent>() { ... }; EventHandler<MyEvent> handler2 = new EventHandler<MyEvent>() { ... }; disruptor.handleEventsWith(handler1); disruptor.after(handler1).handleEventsWith(handler2); RingBuffer ringBuffer = disruptor.start();
-
-
Field Summary
Fields Modifier and Type Field Description private ConsumerRepository<T>
consumerRepository
private ExceptionHandler<? super T>
exceptionHandler
private java.util.concurrent.Executor
executor
private RingBuffer<T>
ringBuffer
private java.util.concurrent.atomic.AtomicBoolean
started
-
Constructor Summary
Constructors Modifier Constructor Description Disruptor(EventFactory<T> eventFactory, int ringBufferSize, java.util.concurrent.Executor executor)
Deprecated.Use aThreadFactory
instead of anExecutor
as a the ThreadFactory is able to report errors when it is unable to construct a thread to run a producer.Disruptor(EventFactory<T> eventFactory, int ringBufferSize, java.util.concurrent.Executor executor, ProducerType producerType, WaitStrategy waitStrategy)
Deprecated.Use aThreadFactory
instead of anExecutor
as a the ThreadFactory is able to report errors when it is unable to construct a thread to run a producer.Disruptor(EventFactory<T> eventFactory, int ringBufferSize, java.util.concurrent.ThreadFactory threadFactory)
Create a new Disruptor.Disruptor(EventFactory<T> eventFactory, int ringBufferSize, java.util.concurrent.ThreadFactory threadFactory, ProducerType producerType, WaitStrategy waitStrategy)
Create a new Disruptor.private
Disruptor(RingBuffer<T> ringBuffer, java.util.concurrent.Executor executor)
Private constructor helper
-
Method Summary
All Methods Instance Methods Concrete Methods Deprecated Methods Modifier and Type Method Description EventHandlerGroup<T>
after(EventHandler<T>... handlers)
Create a group of event handlers to be used as a dependency.EventHandlerGroup<T>
after(EventProcessor... processors)
Create a group of event processors to be used as a dependency.private void
checkNotStarted()
private void
checkOnlyStartedOnce()
(package private) EventHandlerGroup<T>
createEventProcessors(Sequence[] barrierSequences, EventProcessorFactory<T>[] processorFactories)
(package private) EventHandlerGroup<T>
createEventProcessors(Sequence[] barrierSequences, EventHandler<? super T>[] eventHandlers)
(package private) EventHandlerGroup<T>
createWorkerPool(Sequence[] barrierSequences, WorkHandler<? super T>[] workHandlers)
T
get(long sequence)
Get the event for a given sequence in the RingBuffer.SequenceBarrier
getBarrierFor(EventHandler<T> handler)
Get theSequenceBarrier
used by a specific handler.long
getBufferSize()
The capacity of the data structure to hold entries.long
getCursor()
Get the value of the cursor indicating the published sequence.RingBuffer<T>
getRingBuffer()
TheRingBuffer
used by this Disruptor.long
getSequenceValueFor(EventHandler<T> b1)
Gets the sequence value for the specified event handlers.void
halt()
CallsEventProcessor.halt()
on all of the event processors created via this disruptor.EventHandlerGroup<T>
handleEventsWith(EventProcessorFactory<T>... eventProcessorFactories)
Set up custom event processors to handle events from the ring buffer.EventHandlerGroup<T>
handleEventsWith(EventHandler<? super T>... handlers)
Set up event handlers to handle events from the ring buffer.EventHandlerGroup<T>
handleEventsWith(EventProcessor... processors)
Set up custom event processors to handle events from the ring buffer.EventHandlerGroup<T>
handleEventsWithWorkerPool(WorkHandler<T>... workHandlers)
Set up aWorkerPool
to distribute an event to one of a pool of work handler threads.ExceptionHandlerSetting<T>
handleExceptionsFor(EventHandler<T> eventHandler)
Override the default exception handler for a specific handler.void
handleExceptionsWith(ExceptionHandler<? super T> exceptionHandler)
Deprecated.This method only applies to future event handlers.private boolean
hasBacklog()
Confirms if all messages have been consumed by all event processorsvoid
publishEvent(EventTranslator<T> eventTranslator)
Publish an event to the ring buffer.<A> void
publishEvent(EventTranslatorOneArg<T,A> eventTranslator, A arg)
Publish an event to the ring buffer.<A,B,C>
voidpublishEvent(EventTranslatorThreeArg<T,A,B,C> eventTranslator, A arg0, B arg1, C arg2)
Publish an event to the ring buffer.<A,B>
voidpublishEvent(EventTranslatorTwoArg<T,A,B> eventTranslator, A arg0, B arg1)
Publish an event to the ring buffer.<A> void
publishEvents(EventTranslatorOneArg<T,A> eventTranslator, A[] arg)
Publish a batch of events to the ring buffer.void
setDefaultExceptionHandler(ExceptionHandler<? super T> exceptionHandler)
Specify an exception handler to be used for event handlers and worker pools created by this Disruptor.void
shutdown()
Waits until all events currently in the disruptor have been processed by all event processors and then halts the processors.void
shutdown(long timeout, java.util.concurrent.TimeUnit timeUnit)
Waits until all events currently in the disruptor have been processed by all event processors and then halts the processors.RingBuffer<T>
start()
Starts the event processors and returns the fully configured ring buffer.java.lang.String
toString()
private void
updateGatingSequencesForNextInChain(Sequence[] barrierSequences, Sequence[] processorSequences)
-
-
-
Field Detail
-
ringBuffer
private final RingBuffer<T> ringBuffer
-
executor
private final java.util.concurrent.Executor executor
-
consumerRepository
private final ConsumerRepository<T> consumerRepository
-
started
private final java.util.concurrent.atomic.AtomicBoolean started
-
exceptionHandler
private ExceptionHandler<? super T> exceptionHandler
-
-
Constructor Detail
-
Disruptor
@Deprecated public Disruptor(EventFactory<T> eventFactory, int ringBufferSize, java.util.concurrent.Executor executor)
Deprecated.Use aThreadFactory
instead of anExecutor
as a the ThreadFactory is able to report errors when it is unable to construct a thread to run a producer.Create a new Disruptor. Will default toBlockingWaitStrategy
andProducerType
.MULTI- Parameters:
eventFactory
- the factory to create events in the ring buffer.ringBufferSize
- the size of the ring buffer.executor
- anExecutor
to execute event processors.
-
Disruptor
@Deprecated public Disruptor(EventFactory<T> eventFactory, int ringBufferSize, java.util.concurrent.Executor executor, ProducerType producerType, WaitStrategy waitStrategy)
Deprecated.Use aThreadFactory
instead of anExecutor
as a the ThreadFactory is able to report errors when it is unable to construct a thread to run a producer.Create a new Disruptor.- Parameters:
eventFactory
- the factory to create events in the ring buffer.ringBufferSize
- the size of the ring buffer, must be power of 2.executor
- anExecutor
to execute event processors.producerType
- the claim strategy to use for the ring buffer.waitStrategy
- the wait strategy to use for the ring buffer.
-
Disruptor
public Disruptor(EventFactory<T> eventFactory, int ringBufferSize, java.util.concurrent.ThreadFactory threadFactory)
Create a new Disruptor. Will default toBlockingWaitStrategy
andProducerType
.MULTI- Parameters:
eventFactory
- the factory to create events in the ring buffer.ringBufferSize
- the size of the ring buffer.threadFactory
- aThreadFactory
to create threads to for processors.
-
Disruptor
public Disruptor(EventFactory<T> eventFactory, int ringBufferSize, java.util.concurrent.ThreadFactory threadFactory, ProducerType producerType, WaitStrategy waitStrategy)
Create a new Disruptor.- Parameters:
eventFactory
- the factory to create events in the ring buffer.ringBufferSize
- the size of the ring buffer, must be power of 2.threadFactory
- aThreadFactory
to create threads for processors.producerType
- the claim strategy to use for the ring buffer.waitStrategy
- the wait strategy to use for the ring buffer.
-
Disruptor
private Disruptor(RingBuffer<T> ringBuffer, java.util.concurrent.Executor executor)
Private constructor helper
-
-
Method Detail
-
handleEventsWith
@SafeVarargs public final EventHandlerGroup<T> handleEventsWith(EventHandler<? super T>... handlers)
Set up event handlers to handle events from the ring buffer. These handlers will process events as soon as they become available, in parallel.
This method can be used as the start of a chain. For example if the handler
A
must process events before handlerB
:dw.handleEventsWith(A).then(B);
This call is additive, but generally should only be called once when setting up the Disruptor instance
- Parameters:
handlers
- the event handlers that will process events.- Returns:
- a
EventHandlerGroup
that can be used to chain dependencies.
-
handleEventsWith
@SafeVarargs public final EventHandlerGroup<T> handleEventsWith(EventProcessorFactory<T>... eventProcessorFactories)
Set up custom event processors to handle events from the ring buffer. The Disruptor will automatically start these processors when
start()
is called.This method can be used as the start of a chain. For example if the handler
A
must process events before handlerB
:dw.handleEventsWith(A).then(B);
Since this is the start of the chain, the processor factories will always be passed an empty
Sequence
array, so the factory isn't necessary in this case. This method is provided for consistency withEventHandlerGroup.handleEventsWith(EventProcessorFactory...)
andEventHandlerGroup.then(EventProcessorFactory...)
which do have barrier sequences to provide.This call is additive, but generally should only be called once when setting up the Disruptor instance
- Parameters:
eventProcessorFactories
- the event processor factories to use to create the event processors that will process events.- Returns:
- a
EventHandlerGroup
that can be used to chain dependencies.
-
handleEventsWith
public EventHandlerGroup<T> handleEventsWith(EventProcessor... processors)
Set up custom event processors to handle events from the ring buffer. The Disruptor will automatically start this processors when
start()
is called.This method can be used as the start of a chain. For example if the processor
A
must process events before handlerB
:dw.handleEventsWith(A).then(B);
- Parameters:
processors
- the event processors that will process events.- Returns:
- a
EventHandlerGroup
that can be used to chain dependencies.
-
handleEventsWithWorkerPool
@SafeVarargs public final EventHandlerGroup<T> handleEventsWithWorkerPool(WorkHandler<T>... workHandlers)
Set up aWorkerPool
to distribute an event to one of a pool of work handler threads. Each event will only be processed by one of the work handlers. The Disruptor will automatically start this processors whenstart()
is called.- Parameters:
workHandlers
- the work handlers that will process events.- Returns:
- a
EventHandlerGroup
that can be used to chain dependencies.
-
handleExceptionsWith
public void handleExceptionsWith(ExceptionHandler<? super T> exceptionHandler)
Deprecated.This method only applies to future event handlers. Use setDefaultExceptionHandler instead which applies to existing and new event handlers.Specify an exception handler to be used for any future event handlers.
Note that only event handlers set up after calling this method will use the exception handler.
- Parameters:
exceptionHandler
- the exception handler to use for any futureEventProcessor
.
-
setDefaultExceptionHandler
public void setDefaultExceptionHandler(ExceptionHandler<? super T> exceptionHandler)
Specify an exception handler to be used for event handlers and worker pools created by this Disruptor.
The exception handler will be used by existing and future event handlers and worker pools created by this Disruptor instance.
- Parameters:
exceptionHandler
- the exception handler to use.
-
handleExceptionsFor
public ExceptionHandlerSetting<T> handleExceptionsFor(EventHandler<T> eventHandler)
Override the default exception handler for a specific handler.disruptorWizard.handleExceptionsIn(eventHandler).with(exceptionHandler);
- Parameters:
eventHandler
- the event handler to set a different exception handler for.- Returns:
- an ExceptionHandlerSetting dsl object - intended to be used by chaining the with method call.
-
after
@SafeVarargs public final EventHandlerGroup<T> after(EventHandler<T>... handlers)
Create a group of event handlers to be used as a dependency. For example if the handler
A
must process events before handlerB
:dw.after(A).handleEventsWith(B);
- Parameters:
handlers
- the event handlers, previously set up withhandleEventsWith(com.lmax.disruptor.EventHandler[])
, that will form the barrier for subsequent handlers or processors.- Returns:
- an
EventHandlerGroup
that can be used to setup a dependency barrier over the specified event handlers.
-
after
public EventHandlerGroup<T> after(EventProcessor... processors)
Create a group of event processors to be used as a dependency.- Parameters:
processors
- the event processors, previously set up withhandleEventsWith(com.lmax.disruptor.EventProcessor...)
, that will form the barrier for subsequent handlers or processors.- Returns:
- an
EventHandlerGroup
that can be used to setup aSequenceBarrier
over the specified event processors. - See Also:
after(com.lmax.disruptor.EventHandler[])
-
publishEvent
public void publishEvent(EventTranslator<T> eventTranslator)
Publish an event to the ring buffer.- Parameters:
eventTranslator
- the translator that will load data into the event.
-
publishEvent
public <A> void publishEvent(EventTranslatorOneArg<T,A> eventTranslator, A arg)
Publish an event to the ring buffer.- Type Parameters:
A
- Class of the user supplied argument.- Parameters:
eventTranslator
- the translator that will load data into the event.arg
- A single argument to load into the event
-
publishEvents
public <A> void publishEvents(EventTranslatorOneArg<T,A> eventTranslator, A[] arg)
Publish a batch of events to the ring buffer.- Type Parameters:
A
- Class of the user supplied argument.- Parameters:
eventTranslator
- the translator that will load data into the event.arg
- An array single arguments to load into the events. One Per event.
-
publishEvent
public <A,B> void publishEvent(EventTranslatorTwoArg<T,A,B> eventTranslator, A arg0, B arg1)
Publish an event to the ring buffer.- Type Parameters:
A
- Class of the user supplied argument.B
- Class of the user supplied argument.- Parameters:
eventTranslator
- the translator that will load data into the event.arg0
- The first argument to load into the eventarg1
- The second argument to load into the event
-
publishEvent
public <A,B,C> void publishEvent(EventTranslatorThreeArg<T,A,B,C> eventTranslator, A arg0, B arg1, C arg2)
Publish an event to the ring buffer.- Type Parameters:
A
- Class of the user supplied argument.B
- Class of the user supplied argument.C
- Class of the user supplied argument.- Parameters:
eventTranslator
- the translator that will load data into the event.arg0
- The first argument to load into the eventarg1
- The second argument to load into the eventarg2
- The third argument to load into the event
-
start
public RingBuffer<T> start()
Starts the event processors and returns the fully configured ring buffer.
The ring buffer is set up to prevent overwriting any entry that is yet to be processed by the slowest event processor.
This method must only be called once after all event processors have been added.
- Returns:
- the configured ring buffer.
-
halt
public void halt()
CallsEventProcessor.halt()
on all of the event processors created via this disruptor.
-
shutdown
public void shutdown()
Waits until all events currently in the disruptor have been processed by all event processors and then halts the processors. It is critical that publishing to the ring buffer has stopped before calling this method, otherwise it may never return.
This method will not shutdown the executor, nor will it await the final termination of the processor threads.
-
shutdown
public void shutdown(long timeout, java.util.concurrent.TimeUnit timeUnit) throws TimeoutException
Waits until all events currently in the disruptor have been processed by all event processors and then halts the processors.
This method will not shutdown the executor, nor will it await the final termination of the processor threads.
- Parameters:
timeout
- the amount of time to wait for all events to be processed.-1
will give an infinite timeouttimeUnit
- the unit the timeOut is specified in- Throws:
TimeoutException
- if a timeout occurs before shutdown completes.
-
getRingBuffer
public RingBuffer<T> getRingBuffer()
TheRingBuffer
used by this Disruptor. This is useful for creating custom event processors if the behaviour ofBatchEventProcessor
is not suitable.- Returns:
- the ring buffer used by this Disruptor.
-
getCursor
public long getCursor()
Get the value of the cursor indicating the published sequence.- Returns:
- value of the cursor for events that have been published.
-
getBufferSize
public long getBufferSize()
The capacity of the data structure to hold entries.- Returns:
- the size of the RingBuffer.
- See Also:
Sequenced.getBufferSize()
-
get
public T get(long sequence)
Get the event for a given sequence in the RingBuffer.- Parameters:
sequence
- for the event.- Returns:
- event for the sequence.
- See Also:
RingBuffer.get(long)
-
getBarrierFor
public SequenceBarrier getBarrierFor(EventHandler<T> handler)
Get theSequenceBarrier
used by a specific handler. Note that theSequenceBarrier
may be shared by multiple event handlers.- Parameters:
handler
- the handler to get the barrier for.- Returns:
- the SequenceBarrier used by handler.
-
getSequenceValueFor
public long getSequenceValueFor(EventHandler<T> b1)
Gets the sequence value for the specified event handlers.- Parameters:
b1
- eventHandler to get the sequence for.- Returns:
- eventHandler's sequence
-
hasBacklog
private boolean hasBacklog()
Confirms if all messages have been consumed by all event processors
-
createEventProcessors
EventHandlerGroup<T> createEventProcessors(Sequence[] barrierSequences, EventHandler<? super T>[] eventHandlers)
-
updateGatingSequencesForNextInChain
private void updateGatingSequencesForNextInChain(Sequence[] barrierSequences, Sequence[] processorSequences)
-
createEventProcessors
EventHandlerGroup<T> createEventProcessors(Sequence[] barrierSequences, EventProcessorFactory<T>[] processorFactories)
-
createWorkerPool
EventHandlerGroup<T> createWorkerPool(Sequence[] barrierSequences, WorkHandler<? super T>[] workHandlers)
-
checkNotStarted
private void checkNotStarted()
-
checkOnlyStartedOnce
private void checkOnlyStartedOnce()
-
toString
public java.lang.String toString()
- Overrides:
toString
in classjava.lang.Object
-
-