This PR addresses two critical race conditions in ZIO that can cause data loss and unexpected interruption behavior in high-concurrency scenarios:
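1. A race in `Queue` where items could be lost when `take` is interrupted during a concurrent `offer`.
2. An interruption “gap” in `uninterruptibleMask` where interruption could occur at the boundary between async operations and their continuations.

In `Queue`, items could disappear when `Queue.take` operations were interrupted during concurrent `offer` operations. This happened when: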
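1. A fiber calls `queue.take()` on an empty queue, which creates a promise and adds it to the `takers` queue.
2. Another fiber calls `queue.offer(item)`, polls that promise from `takers`, and attempts to complete it.
3. If the taking fiber was interrupted in the meantime, the promise is already completed, so the completion has no effect and the item is neither delivered to a taker nor kept in the queue.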
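The interleaving above can be replayed deterministically in a small model. This is a sketch, not ZIO's `Queue`: it assumes the standard library's `scala.concurrent.Promise` (whose `trySuccess` returns `false` when the promise is already completed) as a stand-in for ZIO's promise, and the names `LostItemReplay`, `items`, `takers`, `naiveOffer` and `replay` are hypothetical. The interruption is simulated by failing the taker's promise before the offer arrives.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.Promise

// Hypothetical, single-threaded model of the lost-item interleaving.
// It mirrors the shape of the problem, not ZIO's actual implementation.
object LostItemReplay {
  val items  = new ConcurrentLinkedQueue[Int]()          // buffered values
  val takers = new ConcurrentLinkedQueue[Promise[Int]]() // waiting takers

  // Step 1: take() on an empty queue registers a promise in `takers`.
  def take(): Promise[Int] = {
    val p = Promise[Int]()
    takers.offer(p)
    p
  }

  // Naive offer: hands the item to the first waiting taker and ignores
  // whether the completion took effect (like ignoring done()'s result).
  def naiveOffer(item: Int): Unit = {
    val taker = takers.poll()
    if (taker == null) items.offer(item)
    else taker.trySuccess(item) // Boolean ignored: the item can be dropped here
    ()
  }

  // Returns (value the taker received, number of items left in the queue).
  def replay(): (Option[Int], Int) = {
    val taker = take()
    // Simulated interruption: the taker's promise is completed first.
    taker.tryFailure(new RuntimeException("taker interrupted (simulated)"))
    naiveOffer(42)
    val delivered = taker.future.value.flatMap(_.toOption)
    (delivered, items.size)
  }
}
```

Calling `LostItemReplay.replay()` yields `(None, 0)`: the taker never received `42`, and `42` was not buffered either.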
The `unsafeCompletePromise` method ignored the return value of `Promise.unsafe.done()`, which indicates whether completing the promise succeeded (it returns `false` if the promise was already completed due to interruption).
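A minimal model of checking that completion result instead of discarding it might look like the following. It again uses `scala.concurrent.Promise.trySuccess` as an analogue of `Promise.unsafe.done()`; `CheckedOffer`, `completeTaker` and the fallback policy (try the next waiting taker, otherwise buffer the item) are assumptions made for illustration, not a description of the code in `Queue.scala`.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.annotation.tailrec
import scala.concurrent.Promise

// Hypothetical model: completing a taker reports success as a Boolean,
// and offer treats the item as delivered only when completion succeeded.
object CheckedOffer {
  val items  = new ConcurrentLinkedQueue[Int]()
  val takers = new ConcurrentLinkedQueue[Promise[Int]]()

  // Analogue of a Boolean-returning unsafeCompletePromise: trySuccess is
  // false when the promise was already completed (e.g. by interruption).
  def completeTaker(taker: Promise[Int], item: Int): Boolean =
    taker.trySuccess(item)

  // Assumed fallback: skip stale takers; buffer the item if none accepts it.
  @tailrec
  def offer(item: Int): Unit = {
    val taker = takers.poll()
    if (taker == null) {
      items.offer(item)
      ()
    } else if (!completeTaker(taker, item)) {
      offer(item) // stale taker: try the next one
    }
  }
}
```

With an already-completed taker ahead of a live one, `offer(1)` skips the stale promise and completes the live one; a later `offer(2)` with no takers left is buffered in `items`.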
The fix:
- Changes `unsafeCompletePromise` to return a `Boolean` indicating whether completion succeeded.
- Updates `offer`, `offerAll`, and `unsafeCompleteTakers` to check that result.

In `uninterruptibleMask`, there was an interruption “gap” where interruption could occur at the boundary between `restore` and subsequent operations such as `flatMap`, even when the restored operation itself was uninterruptible.
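To make the roles of `uninterruptibleMask` and `restore` concrete, here is a toy model that tracks a single "interruptible" flag. It is a sketch under stated assumptions: `MaskModel`, `Restore` and `isInterruptible` are hypothetical names, and ZIO's runtime tracks interruptibility through runtime flags rather than a mutable field. The boundary the gap refers to is the point where `restore(...)` has returned and the next step has not yet run.

```scala
// Hypothetical model of uninterruptibleMask/restore as save/restore of a flag.
object MaskModel {
  private var interruptible: Boolean = true
  def isInterruptible: Boolean = interruptible

  // restore must work for any result type, hence a polymorphic apply.
  trait Restore { def apply[B](inner: => B): B }

  def uninterruptibleMask[A](body: Restore => A): A = {
    val outer = interruptible // the caller's interruptibility
    val restore = new Restore {
      def apply[B](inner: => B): B = {
        val saved = interruptible
        interruptible = outer // temporarily restore the caller's setting
        try inner finally interruptible = saved
      }
    }
    interruptible = false // mask interruption for the body
    try body(restore) finally interruptible = outer
  }
}
```

Inside the body the flag is `false`, inside `restore(...)` it is the caller's value (`true` here), and it returns to `false` once `restore` completes.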
This happened when:
1. `uninterruptibleMask` sets the fiber to uninterruptible mode.
2. `restore` temporarily restores interruptibility for the inner operation.
3. `asyncMaybe` or `asyncInterrupt` returns an immediate result instead of suspending.
4. The `flatMap` continuation should proceed with that result, but if interruption happens in the gap, the continuation is lost.

In `FiberRuntime.runLoop`, after `initiateAsync` returns an immediate result from `asyncMaybe`/`asyncInterrupt`, the code checks for interruption before processing the continuation:
```scala
if (shouldInterrupt()) {
  cur = Exit.failCause(getInterruptedCause())
}
```
This check happens after getting the immediate result but before continuing with the next operation, creating the interruption gap.
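The effect of that check can be illustrated with a single-threaded toy model. Everything here is hypothetical (`GapModel`, `Outcome`, `run` and its parameters are not part of ZIO): `asyncImmediate` stands in for an `asyncMaybe`/`asyncInterrupt` call that returns at once, and `shouldInterrupt` is assumed to be "interruptible and an interruption was requested". The `checkAfterImmediate = false` case only shows the value the continuation would have produced; it is not a claim about how this PR changes `FiberRuntime`.

```scala
// Hypothetical model of checking for interruption between an immediate
// async result and its continuation.
object GapModel {
  sealed trait Outcome
  final case class Continued(value: Int) extends Outcome
  case object Interrupted extends Outcome

  def run(interruptible: Boolean, interruptRequested: Boolean, checkAfterImmediate: Boolean)(
      asyncImmediate: () => Int,
      continuation: Int => Int
  ): Outcome = {
    def shouldInterrupt(): Boolean = interruptible && interruptRequested
    val result = asyncImmediate() // the async step has already completed
    if (checkAfterImmediate && shouldInterrupt())
      Interrupted // the result is discarded and the continuation never runs
    else
      Continued(continuation(result))
  }
}
```

With the check enabled in an interruptible (restored) region and a pending interruption, the async step runs but its continuation is skipped; in an uninterruptible region, the same check does not fire.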
The fix changes how this check after `initiateAsync` is handled so that `uninterruptibleMask` semantics are preserved.

Files changed:
- `core/shared/src/main/scala/zio/Queue.scala`: Queue race condition fix
- `core/shared/src/main/scala/zio/internal/FiberRuntime.scala`: interruption gap fix
- `core-tests/shared/src/test/scala/zio/QueueSpec.scala`: added tests for Queue race conditions
- `core-tests/shared/src/test/scala/zio/ZIOSpec.scala`: added test for the interruption gap fix

Both fixes:
The fixes include comprehensive test cases that:
Closes #9973 Closes #9974
/claim #9973
Vishwanath Martur
@vishwamartur
ZIO
@ZIO