跳到主要内容

核心概念

RingBuffer[T]

泛型预分配环形缓冲区,容量必须是 2 的幂。

rb, _ := seqflow.NewRingBuffer[MyEvent](1024)
rb.Set(seq, MyEvent{Value: 42})
event := rb.Get(seq) // 返回 *MyEvent(零拷贝指针)

Get() 返回指向缓冲区槽位的指针,该指针在生产者回绕覆写之前有效。

Reserve / Commit

两阶段发布协议:

upper, err := d.Reserve(1)     // 预留 1 个槽位
lower := upper // 单槽位时 lower == upper
d.RingBuffer().Set(lower, data) // 写入数据
d.Commit(lower, upper) // 对消费者可见

批量发布:

upper, _ := d.Reserve(16)       // 一次预留 16 个槽位
lower := upper - 15
for seq := lower; seq <= upper; seq++ {
d.RingBuffer().Set(seq, data)
}
d.Commit(lower, upper) // 16 个槽位一次性发布

批量预留摊销了原子操作开销。Reserve(16) 每条消息比 channel 快 ~160 倍。

Handler

消费者回调接口:

type Handler interface {
Handle(lower, upper int64)
}

容量(Capacity)

WithCapacity(n) 设置环形缓冲区的槽位数量。

  • 必须是 2 的幂(用位运算取模:seq & (capacity-1)
  • 太小:生产者频繁等待消费者
  • 太大:浪费内存,影响缓存局部性
  • 推荐:大多数场景 1024 ~ 65536

内存占用 = n * sizeof(T)