API Reference

Creating a Disruptor

d, err := seqflow.New[T](opts...)

New takes a variadic list of Option values and returns (*Disruptor[T], error).

Options

| Option | Default | Description |
| --- | --- | --- |
| WithCapacity(n) | 1024 | Ring buffer size (power of 2) |
| WithWriterCount(n) | 1 | Concurrent writers. >1 enables shared sequencer |
| WithWaitStrategy(s) | SleepingStrategy | Backpressure strategy |
| WithMetrics(m) | nil | Optional metrics collector |
| WithHandler(name, h, opts...) | required | Register a named handler |
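
Ring buffers commonly require a power-of-2 capacity so that a sequence number maps to a slot with a bit mask instead of a modulo. The sketch below shows that idea in isolation. isPowerOfTwo and slotIndex are hypothetical helpers written for this example; they are not part of the seqflow API, and seqflow's internal indexing may differ.

```go
package main

import "fmt"

// isPowerOfTwo reports whether n is a positive power of two.
// Hypothetical helper, not part of seqflow.
func isPowerOfTwo(n int64) bool {
	return n > 0 && n&(n-1) == 0
}

// slotIndex maps an ever-increasing sequence number onto a slot of a ring
// with the given capacity. It is only valid when capacity is a power of
// two, because capacity-1 is then an all-ones bit mask.
func slotIndex(seq, capacity int64) int64 {
	return seq & (capacity - 1)
}

func main() {
	const capacity = 1024 // the documented default for WithCapacity

	fmt.Println(isPowerOfTwo(capacity), isPowerOfTwo(1000))

	// Sequences wrap around the ring once they pass capacity-1.
	for _, seq := range []int64{0, 1023, 1024, 1025} {
		fmt.Println(seq, "->", slotIndex(seq, capacity))
	}
}
```

A capacity such as 1000 would fail the first check, which matches the ErrInvalidCapacity entry in the Errors table.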

Handler Options

| Option | Description |
| --- | --- |
| DependsOn(names...) | Declare dependencies on other handlers |

Disruptor Methods

Reserve

func (d *Disruptor[T]) Reserve(count uint32) (int64, error)

Claims count slots in the ring buffer and returns the upper sequence number of the claimed range. Blocks while the buffer is full.
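
Because Reserve returns only the upper sequence, the caller has to derive the lower bound before writing and committing. A small sketch of that arithmetic follows. It assumes the claimed slots are contiguous and end at the returned value, which the text above suggests but does not state outright. reservedRange is a hypothetical helper, not a seqflow function.

```go
package main

import "fmt"

// reservedRange returns the inclusive [lower, upper] sequence range for a
// reservation of count slots whose upper sequence is known.
// Assumption: the claimed slots are contiguous and end at upper.
func reservedRange(upper int64, count uint32) (int64, int64) {
	return upper - int64(count) + 1, upper
}

func main() {
	// Suppose a Reserve(4) call returned 11 as the upper sequence.
	lower, upper := reservedRange(11, 4)
	for seq := lower; seq <= upper; seq++ {
		fmt.Println("write slot for sequence", seq)
	}
}
```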

TryReserve

func (d *Disruptor[T]) TryReserve(count uint32) (int64, error)

Non-blocking version of Reserve. Returns ErrCapacityUnavailable if the buffer is full.

Commit

func (d *Disruptor[T]) Commit(lower, upper int64)

Makes the reserved slots from lower through upper visible to consumers. Must be called exactly once for each successful reservation.
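
The reserve, write, commit sequence can be pictured with a toy single-goroutine model in which readers only see slots up to a committed cursor. Everything below (toyRing and its methods) is invented for illustration. It ignores backpressure, wraparound, and concurrency, and says nothing about how seqflow implements these steps.

```go
package main

import "fmt"

// toyRing is a single-goroutine model of reserve/commit.
// It is not seqflow's implementation.
type toyRing struct {
	slots    []string
	reserved int64 // highest sequence handed out by reserve
	cursor   int64 // highest sequence made visible by commit
}

func newToyRing(capacity int) *toyRing {
	return &toyRing{slots: make([]string, capacity), reserved: -1, cursor: -1}
}

// reserve claims count slots and returns the upper sequence of the claim.
func (r *toyRing) reserve(count uint32) int64 {
	r.reserved += int64(count)
	return r.reserved
}

// set writes a value into the slot for seq (capacity must be a power of 2).
func (r *toyRing) set(seq int64, v string) {
	r.slots[seq&int64(len(r.slots)-1)] = v
}

// commit publishes the range ending at upper. This toy tracks only upper,
// which is enough for a single writer, so lower is ignored.
func (r *toyRing) commit(lower, upper int64) {
	_ = lower
	r.cursor = upper
}

// visible returns every committed value, starting at sequence 0.
// It is only meaningful before the ring wraps around.
func (r *toyRing) visible() []string {
	out := []string{}
	for seq := int64(0); seq <= r.cursor; seq++ {
		out = append(out, r.slots[seq&int64(len(r.slots)-1)])
	}
	return out
}

func main() {
	r := newToyRing(8)

	upper := r.reserve(2)
	lower := upper - 2 + 1
	r.set(lower, "a")
	r.set(upper, "b")
	fmt.Println(len(r.visible())) // written but not yet committed

	r.commit(lower, upper)
	fmt.Println(r.visible())
}
```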

RingBuffer

func (d *Disruptor[T]) RingBuffer() *RingBuffer[T]

Returns the ring buffer for reading/writing events.

Listen

func (d *Disruptor[T]) Listen()

Runs all consumer handlers, blocking the calling goroutine. To keep the caller free, start it in its own goroutine with go d.Listen().

Close

func (d *Disruptor[T]) Close() error

Stops all consumers immediately, without waiting for pending events to be processed.

Drain

func (d *Disruptor[T]) Drain(ctx context.Context) error

Waits for all committed events to be processed, then stops. Respects context cancellation.

Close and Drain are mutually exclusive: call one or the other. Whichever is called second returns ErrClosed.

Errors

| Error | Description |
| --- | --- |
| ErrInvalidReservation | Zero slots or exceeds capacity |
| ErrCapacityUnavailable | TryReserve: buffer full |
| ErrClosed | Disruptor has been shut down |
| ErrInvalidCapacity | Capacity not a positive power of 2 |
| ErrNoHandlers | No handlers registered |
| ErrDuplicateHandler | Duplicate handler name |
| ErrUnknownDependency | DependsOn references unknown handler |
| ErrCyclicDependency | Circular dependency detected |