Fixes #9810
/claim #9810
The ZStream.buffer(1)
method was incorrectly buffering 2 elements instead of 1, due to the asynchronous nature of the underlying queue implementation. This occurred because the queue allowed concurrent offer
and take
operations, leading to a race condition where 2 elements could be buffered simultaneously.
Implemented a special case for buffer(1)
that uses a synchronous approach:
bufferOne
method: A dedicated implementation for capacity=1 that uses Semaphore(1)
to ensure only one element can be buffered at any timerunIntoQueueElementsSynchronous
: A synchronous version of queue operations that wraps all queue interactions with semaphore permitsCreated comprehensive test suite in BufferOneSpec.scala
:
buffer(1)
only buffers exactly 1 element using TestClockbuffer(1)
and buffer(2)
buffer(1)
uses the new implementationstreams/shared/src/main/scala/zio/stream/ZStream.scala
:
capacity == 1
bufferOne
method with semaphore-controlled synchronizationrunIntoQueueElementsSynchronous
for synchronized queue operationsstreams-tests/shared/src/test/scala/zio/stream/BufferOneSpec.scala
:
The fix ensures that ZStream.buffer(1)
behaves correctly as documented, buffering exactly 1 element instead of 2, while maintaining full compatibility with existing code.
JoΓ£o Luccas
@jucalast
ZIO
@ZIO