/claim #9844 /close #9844
This PR adds a shutdownCause method to Queue, Dequeue, and Enqueue. This allows shutting down a queue with a specific Cause[Nothing], which is then propagated to all fibers currently suspended on take or offer, and causes all future interactions with the queue to fail with that specific cause.
Currently, Queue#shutdown interrupts all waiting fibers with a generic interruption. However, in many concurrent patterns, a fiber that “owns” a queue might fail with a specific defect. It is often necessary to propagate the real cause of that failure to the producers or consumers communicating via the queue so they can handle it or report it correctly.
By using shutdownCause(Cause.die(ex)), the owner of the queue can ensure that all other parties are notified of the failure reason immediately.
zio-coreDequeue / Enqueue traits: Added def shutdownCause(cause: Cause[Nothing])(implicit trace: Trace): UIO[Unit].QueueImpl:
shutdownFlag from AtomicBoolean to AtomicReference[Cause[Nothing]].shutdownCause using compareAndSet to ensure atomicity (the first caller to shut down the queue sets the “winning” cause).offer, take, poll, size, etc., to check the shutdown cause and fail with Exit.failCause(cause) if the queue is shut down.shutdown to call shutdownCause(Cause.interrupt(fiberId)), ensuring full backward compatibility.Strategy API: Updated the internal shutdown signature and implementations (BackPressure, Dropping, Sliding) to accept and propagate the shutdown cause to pending putters.zio-streamsZStream.fromQueue correctly handles the shutdown cause. Because it already uses catchAllCause on queue.take, it will now either end gracefully (if the cause is a simple interruption) or fail the stream (if the cause is a defect).This change is source and binary compatible for existing users calling shutdown.
We chose Cause[Nothing] instead of adding a typed error parameter E to Queue to avoid a massive breaking change that would require updating almost every usage of Queue in the ecosystem. This provides the best balance between functionality and compatibility.
Added a new suite to QueueSpec.scala:
shutdownCause interrupts pending takers: Verifies that a fiber blocked on take resumes with the specified cause.shutdownCause interrupts pending putters: Verifies that a fiber blocked on offer (backpressure) resumes with the specified cause.future offers fail with cause: Verifies that calling offer after shutdown fails immediately.future takes fail with cause: Verifies that calling take after shutdown fails immediately.shutdownCause is atomic: Verifies that if multiple fibers call shutdownCause concurrently, only the first one’s cause is preserved.ZStream integrationQueue traitnathan9513-aps
@nathan9513-aps
ZIO
@ZIO