/claim #9844 /close #9844

Summary

This PR adds a shutdownCause method to Queue, Dequeue, and Enqueue. This allows shutting down a queue with a specific Cause[Nothing], which is then propagated to all fibers currently suspended on take or offer, and causes all future interactions with the queue to fail with that specific cause.

Motivation

Currently, Queue#shutdown interrupts all waiting fibers with a generic interruption. However, in many concurrent patterns, a fiber that “owns” a queue might fail with a specific defect. It is often necessary to propagate the real cause of that failure to the producers or consumers communicating via the queue so they can handle it or report it correctly.

By using shutdownCause(Cause.die(ex)), the owner of the queue can ensure that all other parties are notified of the failure reason immediately.

Changes

zio-core

  • Dequeue / Enqueue traits: Added def shutdownCause(cause: Cause[Nothing])(implicit trace: Trace): UIO[Unit].
  • QueueImpl:
    • Refactored internal shutdownFlag from AtomicBoolean to AtomicReference[Cause[Nothing]].
    • Implemented shutdownCause using compareAndSet to ensure atomicity (the first caller to shut down the queue sets the “winning” cause).
    • Updated offer, take, poll, size, etc., to check the shutdown cause and fail with Exit.failCause(cause) if the queue is shut down.
    • Updated shutdown to call shutdownCause(Cause.interrupt(fiberId)), ensuring full backward compatibility.
  • Strategy API: Updated the internal shutdown signature and implementations (BackPressure, Dropping, Sliding) to accept and propagate the shutdown cause to pending putters.

zio-streams

  • Verified that ZStream.fromQueue correctly handles the shutdown cause. Because it already uses catchAllCause on queue.take, it will now either end gracefully (if the cause is a simple interruption) or fail the stream (if the cause is a defect).

Compatibility

This change is source and binary compatible for existing users calling shutdown.

We chose Cause[Nothing] instead of adding a typed error parameter E to Queue to avoid a massive breaking change that would require updating almost every usage of Queue in the ecosystem. This provides the best balance between functionality and compatibility.

Verification Results

Automated Tests

Added a new suite to QueueSpec.scala:

  • shutdownCause interrupts pending takers: Verifies that a fiber blocked on take resumes with the specified cause.
  • shutdownCause interrupts pending putters: Verifies that a fiber blocked on offer (backpressure) resumes with the specified cause.
  • future offers fail with cause: Verifies that calling offer after shutdown fails immediately.
  • future takes fail with cause: Verifies that calling take after shutdown fails immediately.
  • shutdownCause is atomic: Verifies that if multiple fibers call shutdownCause concurrently, only the first one’s cause is preserved.

Checklist

  • Added tests in QueueSpec.scala
  • Verified ZStream integration
  • Ensured binary compatibility of the Queue trait

Claim

Total prize pool $100
Total paid $0
Status Pending
Submitted January 24, 2026
Last updated January 24, 2026

Contributors

NA

nathan9513-aps

@nathan9513-aps

100%

Sponsors

ZI

ZIO

@ZIO

$100