Fixes #9810 where ZStream.buffer(1) was buffering 2 elements.

Root cause

buffer(capacity) called toQueueOfElements(capacity), which creates a Queue.bounded(capacity). Queue.bounded(n) holds up to n items, so the producer can fill the queue; as soon as the consumer dequeues one element, a slot frees up and the producer is unblocked to add another. As a result, the consumer holds 1 element while up to capacity elements sit in the queue, giving capacity + 1 in-flight elements in total. For buffer(1) this means 2 elements are prefetched, not 1.
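
The off-by-one can be reproduced outside ZIO with a small single-threaded model. This is only a sketch: it uses java.util.concurrent.ArrayBlockingQueue as a stand-in for Queue.bounded, the non-blocking offer/poll calls in place of fiber suspension, and the names InFlightModel and inFlight are hypothetical. It counts the element held by the consumer plus the elements sitting in the queue, which is the accounting used in this description.

```scala
import java.util.concurrent.ArrayBlockingQueue

object InFlightModel {
  // Models an eager producer and a consumer that is still busy with the
  // one element it has dequeued. Returns consumer-held + queued elements.
  def inFlight(slots: Int): Int = {
    val queue = new ArrayBlockingQueue[Int](slots)
    var next  = 1

    // Producer fills the queue. offer returns false when the queue is
    // full, which stands in for the producer being suspended.
    while (queue.offer(next)) next += 1

    // Consumer dequeues the first element and keeps processing it.
    val held = queue.poll()
    require(held == 1)

    // The freed slot lets the producer run ahead by one more element.
    while (queue.offer(next)) next += 1

    1 + queue.size
  }

  def main(args: Array[String]): Unit = {
    println(inFlight(1)) // 2: one held by the consumer, one queued
    println(inFlight(4)) // 5: one held by the consumer, four queued
  }
}
```

In this model a queue with n slots always ends up with n + 1 in-flight elements, which matches the behaviour reported for buffer(1).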

Fix

Two cases:

  1. capacity == 1: use ZStream.Handoff (the synchronous one-slot rendezvous primitive that already exists in ZStream). Handoff.offer blocks until Handoff.take is called, so the producer is limited to exactly 1 element ahead of the consumer, which gives strict buffer(1) semantics.

  2. capacity > 1: use Queue.bounded(capacity - 1). The queue holds capacity - 1 items; the consumer holds 1; total in-flight = capacity.
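
The sizing rule in the two cases can be checked with a rough JVM-only model. It is an analogy rather than the actual change: java.util.concurrent.SynchronousQueue stands in for ZStream.Handoff, ArrayBlockingQueue stands in for Queue.bounded, and the names FixedBufferModel, backing, and inFlight are hypothetical. The count is the element held by the consumer plus the elements parked in the backing structure; an element the producer holds while it is blocked is not counted.

```scala
import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, SynchronousQueue}

object FixedBufferModel {
  // Hypothetical stand-in for the choice of backing structure:
  // a rendezvous for capacity 1, otherwise a queue with capacity - 1 slots.
  def backing(capacity: Int): BlockingQueue[Int] = {
    require(capacity >= 1, "capacity must be at least 1")
    if (capacity == 1) new SynchronousQueue[Int]()
    else new ArrayBlockingQueue[Int](capacity - 1)
  }

  // Consumer-held element plus elements parked in the backing structure.
  def inFlight(capacity: Int): Int = {
    val queue = backing(capacity)
    var next  = 2 // the consumer is assumed to be processing element 1

    // offer returns false when nothing more can be deposited: the queue
    // is full, or (for the rendezvous) no consumer is waiting to take.
    while (queue.offer(next)) next += 1

    1 + queue.size
  }

  def main(args: Array[String]): Unit =
    Seq(1, 2, 5).foreach(c => println(s"capacity=$c inFlight=${inFlight(c)}"))
}
```

Under these assumptions the count equals capacity in every case. The model also hints at why capacity == 1 needs its own branch: a queue with capacity - 1 = 0 slots cannot be built with ArrayBlockingQueue, so a rendezvous structure takes its place.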

The change is local to ZStream.buffer and does not touch queue internals, bufferChunks, bufferDropping, or any other variant.

Regression tests

Added two nonFlaky tests:

  • buffer(1) prefetches exactly 1 element - not 2: blocks element-2 production behind a Promise, unblocks it, then verifies element-3 has not started while element-1 is still being consumed.
  • buffer(2) can prefetch a third element: same setup with buffer(2), verifies element-3 has started once element-2 is unblocked (correct prefetch behaviour).

Validation

sbt -Dsbt.supershell=false "streamsTestsJVM/testOnly zio.stream.ZStreamSpec -- -t buffer"

Closes #9810

/claim #9810
