Fixes #9810 where ZStream.buffer(1) was buffering 2 elements.
Previously, buffer(capacity) called toQueueOfElements(capacity), which creates a Queue.bounded(capacity). Because Queue.bounded(n) holds n items, the producer can fill the queue; as soon as the consumer dequeues one element, the producer is unblocked and adds another. The consumer then holds 1 element while up to capacity elements sit in the queue, giving capacity + 1 in-flight elements in total. For buffer(1) this means 2 elements are prefetched, not 1.
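The off-by-one can be reproduced outside ZIO with a plain JDK bounded queue. The sketch below is only a single-threaded model, not the ZStream code: `ArrayBlockingQueue` stands in for Queue.bounded, and the `BoundedQueueInFlight` object and its `inFlight` helper are hypothetical names introduced for illustration.

```scala
import java.util.concurrent.ArrayBlockingQueue

object BoundedQueueInFlight {
  // Counts how many elements a producer manages to emit before the consumer
  // has finished with its first element, given a bounded queue of `queueSize`.
  def inFlight(queueSize: Int): Int = {
    val queue    = new ArrayBlockingQueue[Int](queueSize)
    var produced = 0

    // Producer fills the queue; offer returns false once it is full.
    while (queue.offer(produced)) produced += 1

    // Consumer dequeues one element and is now holding it.
    queue.poll()

    // The freed slot lets the producer emit again until the queue is full.
    while (queue.offer(produced)) produced += 1

    produced
  }

  def main(args: Array[String]): Unit = {
    println(inFlight(1)) // 2: one held by the consumer, one in the queue
    println(inFlight(2)) // 3
  }
}
```

In this model the count is always queueSize + 1, which matches the capacity + 1 behaviour reported in the issue.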
The fix handles two cases:
capacity == 1: use ZStream.Handoff (the synchronous one-slot rendezvous primitive that already exists in ZStream). Handoff.offer blocks until Handoff.take is called, so the producer is limited to exactly 1 element ahead of the consumer, which gives strict buffer(1) semantics.
capacity > 1: use Queue.bounded(capacity - 1). The queue holds capacity - 1 items and the consumer holds 1, so total in-flight = capacity.
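The two cases can be summarised as a small sizing rule. This is a sketch of the accounting only and does not call any ZIO API: `Rendezvous` and `Bounded` are hypothetical stand-ins for ZStream.Handoff and Queue.bounded, and rejecting capacity < 1 is an assumption of the sketch rather than a statement about what buffer does.

```scala
object BufferSizingSketch {
  // Hypothetical stand-ins for the two backing primitives described above.
  sealed trait Backing
  case object Rendezvous                extends Backing // models ZStream.Handoff
  final case class Bounded(slots: Int)  extends Backing // models Queue.bounded(slots)

  // Chooses the backing primitive for a requested buffer capacity.
  def backingFor(capacity: Int): Backing = {
    require(capacity >= 1, "this sketch only models capacity >= 1")
    if (capacity == 1) Rendezvous else Bounded(capacity - 1)
  }

  // In-flight elements under the accounting used in this description:
  // the handoff case allows exactly 1; a queue contributes its slots
  // plus the 1 element held by the consumer.
  def inFlight(capacity: Int): Int = backingFor(capacity) match {
    case Rendezvous     => 1
    case Bounded(slots) => slots + 1
  }

  def main(args: Array[String]): Unit =
    (1 to 3).foreach { c =>
      println(s"buffer($c): ${backingFor(c)}, in-flight = ${inFlight(c)}")
    }
}
```

Under this rule the in-flight count equals the requested capacity for every capacity >= 1.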
The change is local to ZStream.buffer and does not touch queue internals, bufferChunks, bufferDropping, or any other variant.
Added two nonFlaky tests:
buffer(1) prefetches exactly 1 element - not 2: blocks element-2 production behind a Promise, unblocks it, then verifies that element-3 has not started while element-1 is still being consumed.
buffer(2) can prefetch a third element: same setup with buffer(2); verifies that element-3 has started once element-2 is unblocked (correct prefetch behaviour).
Run the tests with:
sbt -Dsbt.supershell=false "streamsTestsJVM/testOnly zio.stream.ZStreamSpec -- -t buffer"
Closes #9810
/claim #9810