Fixes #9810
/claim #9810
The ZStream.buffer(1) method was incorrectly buffering 2 elements instead of 1, due to the asynchronous nature of the underlying queue implementation. This occurred because the queue allowed concurrent offer and take operations, leading to a race condition where 2 elements could be buffered simultaneously.
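One way to observe the behaviour described above is to count how far the producer runs ahead of a stalled consumer. The snippet below is a minimal sketch under stated assumptions, not the test shipped in this PR: `BufferProbe` and `producedAhead` are hypothetical names, it assumes ZIO 2.x with `zio-streams` on the classpath, and the 200 ms stall is an arbitrary choice. The count it reports includes the element the consumer already took and any element the producer is holding while it waits for queue space, so read it as a rough probe rather than an exact buffer size.

```scala
import zio._
import zio.stream._

object BufferProbe extends ZIOAppDefault {

  // Hypothetical helper: reports how many elements the upstream has emitted
  // after the consumer pulls once and then stalls.
  def producedAhead(capacity: Int): ZIO[Any, Nothing, Int] =
    for {
      produced <- Ref.make(0)
      // Infinite source; every element emitted upstream bumps the counter.
      source = ZStream
                 .iterate(0)(_ + 1)
                 .tap(_ => produced.update(_ + 1))
                 .buffer(capacity)
      count <- ZIO.scoped {
                 for {
                   pull <- source.toPull
                   _    <- pull.ignore            // consume a single pull
                   _    <- ZIO.sleep(200.millis)  // stall so the producer can run ahead
                   n    <- produced.get
                 } yield n
               }
    } yield count

  val run =
    for {
      one <- producedAhead(1)
      two <- producedAhead(2)
      _   <- Console.printLine(s"buffer(1): $one produced, buffer(2): $two produced")
    } yield ()
}
```

Comparing the two numbers before and after the change gives a quick sanity check that `buffer(1)` no longer lets the producer run as far ahead as `buffer(2)`.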
Implemented a special case for buffer(1) that uses a synchronous approach:
- bufferOne method: A dedicated implementation for capacity=1 that uses Semaphore(1) to ensure only one element can be buffered at any time
- runIntoQueueElementsSynchronous: A synchronous version of queue operations that wraps all queue interactions with semaphore permits

Created comprehensive test suite in BufferOneSpec.scala:
- buffer(1) only buffers exactly 1 element using TestClock
- buffer(1) and buffer(2)
- buffer(1) uses the new implementation

streams/shared/src/main/scala/zio/stream/ZStream.scala:
- capacity == 1
- bufferOne method with semaphore-controlled synchronization
- runIntoQueueElementsSynchronous for synchronized queue operations

streams-tests/shared/src/test/scala/zio/stream/BufferOneSpec.scala:
The fix ensures that ZStream.buffer(1) behaves correctly as documented, buffering exactly 1 element instead of 2, while maintaining full compatibility with existing code.
João Luccas
@jucalast
ZIO
@ZIO