# backpressure-pipeline

**A bounded fan-out pipeline with an explicit load-shed policy and a leak-free structured shutdown.**

A self-contained building block for the common shape "one fast producer
→ N async workers" where overload must degrade *predictably* instead of
crashing.

## The problem

Under overload, a producer feeding N async workers has exactly one
correct behaviour (stay bounded and discard work by a deliberately
chosen policy) and two common bugs:

- **Unbounded queue.** `mpsc::unbounded_channel()` applies no
  back-pressure, so when consumers are slower than the producer the
  backlog grows until the process runs out of memory: a crash, not a
  slowdown.
- **Implicit or wrong drop policy.** When the buffer is full you *must*
  choose what to discard. A **position/state stream** wants the freshest
  sample, so drop the **oldest**. An **event/command stream** needs order
  and visibility of loss, so drop the **newest** (and alarm). A bounded
  `tokio::mpsc` channel can make the producer wait (`send().await`) or
  reject the incoming item (`try_send` returns `TrySendError::Full`, which
  is drop-newest), but it cannot evict an item that is already queued, so
  drop-oldest needs a real ring.
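The two policies differ only in which end of the buffer absorbs the overflow. Below is a minimal synchronous sketch, assuming a plain `VecDeque` as the ring; `Ring`, `ShedPolicy` and `keeps` are hypothetical names, not this crate's API. With capacity 4 and ids 0..8 pushed, it reproduces the demo's policy illustration (DropNewest keeps `[0, 1, 2, 3]`, DropOldest keeps `[4, 5, 6, 7]`, four drops each).

```rust
use std::collections::VecDeque;

/// Which item to discard when the buffer is full (hypothetical enum).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ShedPolicy {
    /// Reject the incoming item; the queued backlog keeps its order (events/commands).
    DropNewest,
    /// Evict the oldest queued item to make room (positions/state).
    DropOldest,
}

/// Hypothetical fixed-capacity ring; never grows past `cap`.
pub struct Ring<T> {
    buf: VecDeque<T>,
    cap: usize,
    policy: ShedPolicy,
    dropped: u64,
}

impl<T> Ring<T> {
    pub fn new(cap: usize, policy: ShedPolicy) -> Self {
        assert!(cap > 0, "capacity must be non-zero");
        Ring { buf: VecDeque::with_capacity(cap), cap, policy, dropped: 0 }
    }

    /// Never blocks; every overflow is counted in `dropped`.
    pub fn push(&mut self, item: T) {
        if self.buf.len() < self.cap {
            self.buf.push_back(item);
            return;
        }
        self.dropped += 1;
        match self.policy {
            ShedPolicy::DropNewest => drop(item), // the new arrival is shed
            ShedPolicy::DropOldest => {
                self.buf.pop_front(); // the oldest queued item is shed
                self.buf.push_back(item);
            }
        }
    }

    pub fn pop(&mut self) -> Option<T> {
        self.buf.pop_front()
    }

    pub fn dropped(&self) -> u64 {
        self.dropped
    }
}

/// Buffer of 4, push ids 0..8, then drain: returns (kept ids, dropped count).
pub fn keeps(policy: ShedPolicy) -> (Vec<u32>, u64) {
    let mut ring: Ring<u32> = Ring::new(4, policy);
    for id in 0..8 {
        ring.push(id);
    }
    let dropped = ring.dropped();
    let mut kept = Vec::new();
    while let Some(id) = ring.pop() {
        kept.push(id);
    }
    (kept, dropped)
}
```

Under both policies `push` never blocks the producer; the loss is made visible through `dropped()`, which is the count that a `produced == processed + dropped` check relies on.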

There is also a shutdown sequence people get wrong: *stop intake → close
the queue → let workers drain the bounded backlog → wait up to a bounded
deadline → abort stragglers*. Detached tasks that are never joined leak
work: nothing waits for them, so they can outlive shutdown unobserved.
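The ordering matters because closing must not discard the backlog: workers keep popping after close and only see "done" once the queue is both closed and empty. Below is a std-only sketch of the first steps, assuming OS threads in place of async tasks and a `Mutex` + `Condvar` in place of the crate's async primitives; `Closable` and `shutdown_demo` are hypothetical. Capacity and shed policy are left out on purpose to isolate the shutdown ordering.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Backlog plus a "closed" flag, guarded together by one mutex.
struct State<T> {
    items: VecDeque<T>,
    closed: bool,
}

/// Hypothetical closable hand-off queue (no capacity bound, to keep the focus on shutdown).
pub struct Closable<T> {
    state: Mutex<State<T>>,
    ready: Condvar,
}

impl<T> Closable<T> {
    pub fn new() -> Self {
        Closable {
            state: Mutex::new(State { items: VecDeque::new(), closed: false }),
            ready: Condvar::new(),
        }
    }

    /// Intake: rejected (returns false) once the queue has been closed.
    pub fn push(&self, item: T) -> bool {
        let mut s = self.state.lock().unwrap();
        if s.closed {
            return false;
        }
        s.items.push_back(item);
        self.ready.notify_one();
        true
    }

    /// Stop intake; the existing backlog stays available to workers.
    pub fn close(&self) {
        self.state.lock().unwrap().closed = true;
        self.ready.notify_all(); // wake idle workers so they can observe "closed and empty"
    }

    /// Blocks for work; returns None only when the queue is closed AND empty.
    pub fn pop(&self) -> Option<T> {
        let mut s = self.state.lock().unwrap();
        loop {
            if let Some(item) = s.items.pop_front() {
                return Some(item);
            }
            if s.closed {
                return None;
            }
            s = self.ready.wait(s).unwrap();
        }
    }
}

/// Stop intake -> close -> drain -> join every worker.
/// Returns (processed, joined, whether a push after close was accepted).
pub fn shutdown_demo(n_items: u32, n_workers: usize) -> (u32, usize, bool) {
    let q: Arc<Closable<u32>> = Arc::new(Closable::new());
    let handles: Vec<thread::JoinHandle<u32>> = (0..n_workers)
        .map(|_| {
            let q = Arc::clone(&q);
            thread::spawn(move || {
                let mut done = 0;
                while q.pop().is_some() {
                    done += 1; // "process" the item
                }
                done
            })
        })
        .collect();

    for i in 0..n_items {
        q.push(i);
    }
    q.close(); // stop intake and close the queue
    let late_push_accepted = q.push(u32::MAX); // must be rejected after close

    let (mut processed, mut joined) = (0, 0);
    for h in handles {
        // Each worker drains until closed-and-empty, then is joined (never detached).
        processed += h.join().unwrap();
        joined += 1;
    }
    (processed, joined, late_push_accepted)
}
```

OS threads cannot be aborted, so the deadline and abort steps are not shown here. With tokio they would typically be a `tokio::time::timeout` around draining a `JoinSet` via `join_next`, followed by `JoinSet::abort_all` and a final `join_next` loop so the aborted tasks are still reaped and counted.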

## The approach

A bounded MPMC hand-off queue built as a `Semaphore` (permits == items
available) over a `Mutex<VecDeque>`. Permit accounting is exact — a
held permit *is* a reserved item — so there is no lost-wakeup reasoning
to get wrong. The shed policy (`DropNewest` / `DropOldest`) is a
required, explicit choice. Shutdown is structured via a
`CancellationToken` tree and a `JoinSet`; **every** spawned task is
joined and the run reports `workers_joined` / `aborted` so leaks are
observable. The conservation law `produced == processed + dropped` is
asserted in the demo and the tests.
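The permit-over-deque idea can be sketched without tokio. This version assumes a `Mutex<(usize, bool)>` + `Condvar` as a stand-in for `tokio::sync::Semaphore` and OS threads as workers; `Queue`, `Policy` and `stress` are hypothetical names. A permit is added only after an item lands in a free slot and is taken before an item is removed, so a worker holding a permit always finds an item; a DropOldest eviction swaps one item for another and leaves the permit count alone. The burst mirrors the demo's shape (16-slot buffer, 4 slow workers) and returns the numbers needed to check `produced == processed + dropped`.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Policy { DropNewest, DropOldest }

/// Hypothetical bounded MPMC queue: a counting "semaphore" over a Mutex<VecDeque>.
pub struct Queue<T> {
    items: Mutex<VecDeque<T>>,
    // Semaphore stand-in: (permits available, closed). One permit per unclaimed item.
    permits: Mutex<(usize, bool)>,
    permit_cv: Condvar,
    cap: usize,
    policy: Policy,
    dropped: AtomicU64,
}

impl<T> Queue<T> {
    pub fn new(cap: usize, policy: Policy) -> Self {
        Queue {
            items: Mutex::new(VecDeque::with_capacity(cap)),
            permits: Mutex::new((0, false)),
            permit_cv: Condvar::new(),
            cap,
            policy,
            dropped: AtomicU64::new(0),
        }
    }

    /// Never blocks the producer; overflow is shed per `policy` and counted.
    pub fn push(&self, item: T) {
        let mut q = self.items.lock().unwrap();
        if q.len() < self.cap {
            q.push_back(item);
            drop(q); // release the deque before touching permits (no nested locks)
            self.permits.lock().unwrap().0 += 1; // one new item => one new permit
            self.permit_cv.notify_one();
            return;
        }
        self.dropped.fetch_add(1, Ordering::Relaxed);
        if self.policy == Policy::DropOldest {
            // Evict one, insert one: item count (and so permit count) is unchanged.
            q.pop_front();
            q.push_back(item);
        } // DropNewest: `item` is simply dropped at the end of this call.
    }

    /// Take a permit first; None only once closed AND every permit is used up.
    pub fn pop(&self) -> Option<T> {
        let guard = self.permits.lock().unwrap();
        let mut p = self.permit_cv.wait_while(guard, |s| s.0 == 0 && !s.1).unwrap();
        if p.0 == 0 {
            return None; // closed and fully drained
        }
        p.0 -= 1;
        drop(p);
        // A held permit guarantees the deque holds at least one item.
        Some(self.items.lock().unwrap().pop_front().expect("permit implies an item"))
    }

    pub fn close(&self) {
        self.permits.lock().unwrap().1 = true;
        self.permit_cv.notify_all(); // wake idle workers so they can drain and exit
    }
}

/// Burst `produced` ids into a 16-slot queue; returns (processed, dropped, joined).
pub fn stress(policy: Policy, produced: u64, workers: usize) -> (u64, u64, usize) {
    let q: Arc<Queue<u64>> = Arc::new(Queue::new(16, policy));
    let handles: Vec<thread::JoinHandle<u64>> = (0..workers)
        .map(|_| {
            let q = Arc::clone(&q);
            thread::spawn(move || {
                let mut n = 0;
                while q.pop().is_some() {
                    thread::sleep(Duration::from_micros(200)); // deliberately slow worker
                    n += 1;
                }
                n
            })
        })
        .collect();
    for id in 0..produced {
        q.push(id); // a burst far faster than the workers drain
    }
    q.close();
    let (mut processed, mut joined) = (0, 0);
    for h in handles {
        processed += h.join().unwrap(); // every worker is joined, none detached
        joined += 1;
    }
    (processed, q.dropped.load(Ordering::Relaxed), joined)
}
```

Which items get processed depends on thread scheduling, so the split between `processed` and `dropped` varies from run to run in this sketch; their sum does not.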

## Run it

```bash
cargo run --bin demo     # policy illustration + overload stress + conservation
cargo test               # deterministic policy + conservation + no-leak tests
```

Sample demo output:

```
== policy illustration: buffer=4, push ids 0..8, then drain ==
  DropNewest keeps [0, 1, 2, 3]  (newest arrivals shed; backlog order preserved)
  DropOldest keeps [4, 5, 6, 7]  (oldest shed; always the freshest state)

== overload stress: 4000-item burst, buffer=16, 4 slow workers ==
policy        produced  processed  dropped   joined  aborted
DropNewest        4000         56     3944        4        0
DropOldest        4000         80     3920        4        0
conservation held for both policies; no tasks leaked. demo OK.
```

## What the tests prove

- `drop_newest_keeps_oldest_items` / `drop_oldest_keeps_newest_items` —
  fully deterministic proof of each policy's semantics.
- `close_drains_before_none` — a worker never exits while items remain.
- `conservation_and_no_leak` — under real overload, both policies satisfy
  `produced == processed + dropped` and every worker is joined.
- `external_stop_is_clean` — an external cancel stops intake early, yet
  conservation still holds and all workers are still joined (structured,
  not detached).

## License

MIT OR Apache-2.0.
