Implements the Queue shutdown improvements requested in #9844.
When a worker fiber processing queue messages fails, it needs to propagate the failure to:
take or offerThe existing shutdown only sends a generic interrupt, making it impossible for callers to distinguish why the queue shut down.
// Shutdown with a specific cause — returns buffered items for cleanup
val buffered: UIO[Chunk[Request]] = queue.shutdownCause(Cause.fail(ConnectionClosed("host unreachable")))
// Convenience: wrap a value in Cause.fail
val buffered: UIO[Chunk[Request]] = queue.shutdownWith(ConnectionClosed("host unreachable"))
// Inspect the shutdown cause
val maybeReason: UIO[Option[Cause[Any]]] = queue.shutdownCauseOption
// Fibers blocked on take() receive the cause:
queue.take.catchAll { e: ConnectionClosed => handleError(e) }
Queue.shutdownCause(cause: Cause[Any]): UIO[Chunk[A]] — atomic shutdown with cause propagationQueue.shutdownWith[E](error: E): UIO[Chunk[A]] — convenience wrapper: shutdownCause(Cause.fail(e))Queue.shutdownCauseOption: UIO[Option[Cause[Any]]] — inspect the shutdown causeAtomicBoolean CAS as shutdown; concurrent callers get idempotent empty resultBackPressure strategy extended to fail blocked offerAll putters with the typed causeExit.failCause(cause) via the unsafe Promise APIawaitShutdown works correctly with shutdownCause (same shutdownHook Promise)isShutdown set to trueshutdownCauseOption before/after shutdownshutdownCauseOption is None for regular shutdownawaitShutdown triggered by shutdownCauseshutdownWith convenience methodshutdownCause returns empty chunkshutdownCause after shutdown is a no-opCause.die) propagationThis PR goes further than the competitor:
shutdownCauseOption to inspect the cause post-shutdownshutdownCause for caller cleanupCause.die (defects), not just Cause.failCloses #9844
/claim #9844
CharlesWong
@CharlesWong
Supreme Labs
@supreme2580
ZIO
@ZIO