backpressure-pipeline

Personal systems project · Rust / tokio · source below, raw files linked per section

README.md (raw)

# backpressure-pipeline

**A bounded fan-out pipeline with an explicit load-shed policy and a leak-free structured shutdown.**

A self-contained building block for the common shape "one fast producer
→ N async workers" where overload must degrade *predictably* instead of
crashing.

## The problem

A producer feeding N async workers has exactly one correct behaviour
under overload (stay bounded and shed load by an explicit policy), and
two common bugs:

- **Unbounded queue.** `mpsc::unbounded_channel()` applies no
  back-pressure, so a slow consumer turns a fast producer into an OOM —
  a crash, not a slowdown.
- **Implicit / wrong drop policy.** When the buffer is full you *must*
  choose what to discard. A **position/state stream** wants the freshest
  sample → drop the **oldest**. An **event/command stream** needs order
  and visibility of loss → drop the **newest** (and alarm). A bounded
  `tokio::sync::mpsc` channel only offers drop-newest (`try_send` →
  `TrySendError::Full`); drop-oldest needs a real ring.
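
The difference between the two policies can be sketched without any async machinery. The block below is a minimal single-threaded illustration over a plain `VecDeque`, assuming `cap > 0`. `Policy`, `push_bounded` and `survivors` are hypothetical names used only for this sketch, not the crate's API.

```rust
use std::collections::VecDeque;

/// Hypothetical policy enum, for this sketch only.
#[derive(Clone, Copy)]
enum Policy {
    DropNewest,
    DropOldest,
}

/// Push `item` into a queue bounded at `cap` (assumed > 0).
/// Returns `true` if nothing was shed, `false` if an item was discarded.
fn push_bounded(q: &mut VecDeque<u64>, cap: usize, policy: Policy, item: u64) -> bool {
    if q.len() < cap {
        q.push_back(item);
        return true;
    }
    match policy {
        // Keep the backlog; the just-arrived item is discarded.
        Policy::DropNewest => {}
        // Evict the front to make room for the fresh item.
        Policy::DropOldest => {
            q.pop_front();
            q.push_back(item);
        }
    }
    false
}

/// Push ids `0..n` with nobody consuming and return the survivors in order.
fn survivors(cap: usize, policy: Policy, n: u64) -> Vec<u64> {
    let mut q = VecDeque::with_capacity(cap);
    for id in 0..n {
        push_bounded(&mut q, cap, policy, id);
    }
    q.into_iter().collect()
}
```

With a buffer of 4 and ids `0..8`, `survivors` returns `[0, 1, 2, 3]` under `DropNewest` and `[4, 5, 6, 7]` under `DropOldest`.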

There is also a shutdown sequence people get wrong: *stop intake → close
the queue → let workers drain the bounded backlog → bounded deadline →
abort stragglers*. Detached tasks that are never joined leak work.
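
The ordering above can be sketched with plain threads. This is not the crate's tokio implementation: it swaps in `std::thread` and `std::sync::mpsc::sync_channel` (which is drop-newest via `try_send`), and because std cannot abort a thread, the last step only reports stragglers instead of aborting them. `Outcome` and `run_and_shut_down` are hypothetical names for this sketch.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};

/// Outcome of one run; field names are hypothetical, for this sketch only.
#[derive(Debug)]
struct Outcome {
    produced: u64,
    processed: u64,
    dropped: u64,
    joined: usize,
    stragglers: usize,
}

fn run_and_shut_down(workers: usize, total: u64, deadline: Duration) -> Outcome {
    let (tx, rx) = sync_channel::<u64>(16); // bounded backlog
    let rx = Arc::new(Mutex::new(rx)); // one receiver shared by all workers

    let handles: Vec<thread::JoinHandle<u64>> = (0..workers)
        .map(|_| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || {
                let mut n = 0u64;
                loop {
                    // The lock guard is released at the end of this statement.
                    let next = rx.lock().unwrap().recv();
                    match next {
                        Ok(_item) => {
                            thread::sleep(Duration::from_micros(200)); // simulated work
                            n += 1;
                        }
                        // Sender dropped and backlog empty: exit cleanly.
                        Err(_) => break n,
                    }
                }
            })
        })
        .collect();

    // 1. Intake runs, shedding the newest item whenever the buffer is full.
    let (mut produced, mut dropped) = (0u64, 0u64);
    for id in 0..total {
        produced += 1;
        match tx.try_send(id) {
            Ok(()) => {}
            Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => dropped += 1,
        }
    }
    // 2. Intake has stopped: dropping the only sender closes the queue.
    drop(tx);

    // 3 and 4. Workers drain the backlog; wait for them, but only until the deadline.
    let give_up_at = Instant::now() + deadline;
    while handles.iter().any(|h| !h.is_finished()) && Instant::now() < give_up_at {
        thread::sleep(Duration::from_millis(1));
    }

    // 5. Join whoever finished; anything still running is a straggler.
    let (mut processed, mut joined, mut stragglers) = (0u64, 0usize, 0usize);
    for h in handles {
        if h.is_finished() {
            processed += h.join().expect("worker panicked");
            joined += 1;
        } else {
            stragglers += 1; // std cannot abort a thread; it is only reported
        }
    }
    Outcome { produced, processed, dropped, joined, stragglers }
}
```

Calling `run_and_shut_down(4, 500, Duration::from_secs(5))` should report `produced == processed + dropped` with four workers joined, as long as the backlog drains before the deadline.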

## The approach

A bounded MPMC hand-off queue built as a `Semaphore` (permits == items
available) over a `Mutex<VecDeque>`. Permit accounting is exact — a
held permit *is* a reserved item — so there is no lost-wakeup reasoning
to get wrong. The shed policy (`DropNewest` / `DropOldest`) is a
required, explicit choice. Shutdown is structured via a
`CancellationToken` tree and a `JoinSet`; **every** spawned task is
joined and the run reports `workers_joined` / `aborted` so leaks are
observable. The conservation law `produced == processed + dropped` is
asserted in the demo and the tests.
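
The permit accounting can be checked in a small single-threaded model. This is a sketch under simplifying assumptions, not the crate's code: the integer `permits` stands in for the semaphore's available-permit count, only the drop-oldest path is modelled, and `Model` and `exercise` are hypothetical names.

```rust
use std::collections::VecDeque;

/// Single-threaded model of the queue; `permits` stands in for the
/// semaphore's available-permit count. All names are hypothetical.
struct Model {
    q: VecDeque<u64>,
    permits: usize,
    cap: usize,
    dropped: u64,
}

impl Model {
    fn new(cap: usize) -> Self {
        Model { q: VecDeque::with_capacity(cap), permits: 0, cap, dropped: 0 }
    }

    /// Drop-oldest push: an eviction is -1 item, +1 item, permits untouched.
    fn push(&mut self, item: u64) {
        if self.q.len() < self.cap {
            self.q.push_back(item);
            self.permits += 1; // one new item, one new permit
        } else {
            self.q.pop_front();
            self.q.push_back(item);
            self.dropped += 1;
        }
    }

    /// A consumer takes a permit first, then pops the item it reserved.
    /// Returns `None` where a real consumer would wait for a permit.
    fn recv(&mut self) -> Option<u64> {
        if self.permits == 0 {
            return None;
        }
        self.permits -= 1;
        self.q.pop_front()
    }

    fn invariant_holds(&self) -> bool {
        self.permits == self.q.len()
    }
}

/// Push `0..n`, then drain. Returns (processed, dropped) and checks the
/// invariant after every step.
fn exercise(cap: usize, n: u64) -> (u64, u64) {
    let mut m = Model::new(cap);
    for id in 0..n {
        m.push(id);
        assert!(m.invariant_holds());
    }
    let mut processed = 0;
    while m.recv().is_some() {
        processed += 1;
        assert!(m.invariant_holds());
    }
    (processed, m.dropped)
}
```

`exercise(4, 10)` returns `(4, 6)`: four items survive, six are evicted, and `processed + dropped` equals the ten items produced.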

## Run it

```bash
cargo run --bin demo     # policy illustration + overload stress + conservation
cargo test               # deterministic policy + conservation + no-leak tests
```

Sample demo output:

```
== policy illustration: buffer=4, push ids 0..8, then drain ==
  DropNewest keeps [0, 1, 2, 3]  (newest arrivals shed; backlog order preserved)
  DropOldest keeps [4, 5, 6, 7]  (oldest shed; always the freshest state)

== overload stress: 4000-item burst, buffer=16, 4 slow workers ==
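policy        produced  processed  dropped   joined  aborted
------------------------------------------------------------
DropNewest        4000         56     3944        4        0
DropOldest        4000         80     3920        4        0
------------------------------------------------------------
conservation held for both policies; no tasks leaked. demo OK.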
```

## What the tests prove

- `drop_newest_keeps_oldest_items` / `drop_oldest_keeps_newest_items` —
  fully deterministic proof of each policy's semantics.
- `close_drains_before_none` — a worker never exits while items remain.
- `conservation_and_no_leak` — under real overload, both policies satisfy
  `produced == processed + dropped` and every worker is joined.
- `external_stop_is_clean` — an external cancel stops intake early, yet
  conservation still holds and all workers are still joined (structured,
  not detached).

## License

MIT OR Apache-2.0.

Cargo.toml (raw)

[package]
name = "backpressure-pipeline"
version = "0.1.0"
edition = "2021"
description = "A bounded fan-out pipeline with explicit load-shed policy and leak-free structured shutdown."
license = "MIT OR Apache-2.0"
publish = false

[dependencies]
tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync", "time", "signal"] }
tokio-util = { version = "0.7", features = ["rt"] }

[dev-dependencies]
tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync", "time", "test-util"] }

[[bin]]
name = "demo"
path = "src/main.rs"

[lib]
name = "backpressure_pipeline"
path = "src/lib.rs"

src/lib.rs (raw)

//! # backpressure-pipeline
//!
//! A bounded fan-out work pipeline with an **explicit, chosen** load-shed
//! policy and a **leak-free structured shutdown**.
//!
//! ## The problem this solves
//!
//! A producer feeding N async workers has exactly one correct behaviour
//! under overload, and two common bugs:
//!
//! * **Bug 1 — unbounded queue.** `mpsc::unbounded_channel()` never
//!   applies back-pressure, so a slow consumer turns a fast producer
//!   into an OOM. On a ground station ingesting drone telemetry that is
//!   a crash, not a slowdown.
//! * **Bug 2 — implicit / wrong drop policy.** When the buffer is full
//!   you *must* decide what to discard. For a **position stream** the
//!   freshest sample matters → drop the *oldest*. For an **event/command
//!   stream** every item matters in order → drop the *newest* (and
//!   alarm). A bounded `tokio::sync::mpsc` channel only gives you
//!   drop-newest (`try_send` → `TrySendError::Full`); drop-oldest needs
//!   a real ring. This crate makes the choice explicit and correct for
//!   both.
//!
//! It also shows the shutdown sequence people get wrong: *stop intake →
//! close the queue → let workers drain the bounded backlog → bounded
//! deadline → abort stragglers* — and asserts **every spawned task is
//! joined**, so nothing is detached and leaked.
//!
//! The bounded queue is a `Semaphore` (permits == items available) over
//! a `Mutex<VecDeque>`. Permit accounting is exact, so there is no
//! lost-wakeup reasoning to get wrong — see [`BoundedFanout`].

use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;

/// What to discard when the buffer is full. There is no safe default —
/// the caller must pick based on the stream's semantics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DropPolicy {
    /// Keep the backlog, discard the just-arrived item (events/commands:
    /// order matters, loss must be visible).
    DropNewest,
    /// Discard the oldest queued item to make room (position/state:
    /// only the freshest sample matters).
    DropOldest,
}

struct Inner<T> {
    q: Mutex<VecDeque<T>>,
    cap: usize,
    policy: DropPolicy,
    dropped: AtomicU64,
}

/// A bounded MPMC hand-off queue.
///
/// Invariant: `items.available_permits()` + (permits held by consumers
/// mid-`recv`) == `q.len()`. Every `push` that enqueues adds exactly one
/// permit; every successful `recv` consumes exactly one and pops exactly
/// one. A drop-oldest eviction removes one and adds one → permit count
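/// unchanged. Because a held permit *is* a reserved item, a consumer
/// that acquired a permit while the queue is open is guaranteed a
/// `Some`, so there are no lost wakeups. After `close`, consumers drain
/// the backlog without permits and `None` means the queue is empty.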
#[derive(Clone)]
pub struct BoundedFanout<T> {
    inner: Arc<Inner<T>>,
    items: Arc<tokio::sync::Semaphore>,
}

impl<T> BoundedFanout<T> {
    pub fn new(cap: usize, policy: DropPolicy) -> Self {
        assert!(cap > 0, "capacity must be > 0");
        Self {
            inner: Arc::new(Inner {
                q: Mutex::new(VecDeque::with_capacity(cap)),
                cap,
                policy,
                dropped: AtomicU64::new(0),
            }),
            items: Arc::new(tokio::sync::Semaphore::new(0)),
        }
    }

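    /// Enqueue, applying the shed policy if full. Never awaits: the
    /// mutex is held only for O(1) work, and the queue never holds more
    /// than `cap` items. Panics only if the mutex is poisoned. Returns
    /// `true` if the item was enqueued and nothing was shed, `false` if
    /// it (or an older item) was shed. After [`close`](Self::close) the
    /// item is shed and counted in `dropped`, so intake really stops.
    pub fn push(&self, item: T) -> bool {
        let mut q = self.inner.q.lock().unwrap();
        // Checked under the lock so an item is never enqueued after the
        // last consumer has seen the queue closed and empty.
        if self.items.is_closed() {
            self.inner.dropped.fetch_add(1, Ordering::Relaxed);
            return false;
        }
        if q.len() < self.inner.cap {
            q.push_back(item);
            drop(q);
            self.items.add_permits(1);
            true
        } else {
            match self.inner.policy {
                DropPolicy::DropNewest => {
                    self.inner.dropped.fetch_add(1, Ordering::Relaxed);
                    false
                }
                DropPolicy::DropOldest => {
                    q.pop_front();
                    q.push_back(item);
                    // permit count intentionally unchanged: -1 +1.
                    self.inner.dropped.fetch_add(1, Ordering::Relaxed);
                    false
                }
            }
        }
    }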

    /// Await the next item. Returns `None` once the queue is
    /// [`close`](Self::close)d *and* drained — the signal for a worker
    /// to exit cleanly.
    pub async fn recv(&self) -> Option<T> {
        match self.items.clone().acquire_owned().await {
            Ok(permit) => {
                permit.forget(); // the permit's item is now ours to pop
                self.inner.q.lock().unwrap().pop_front()
            }
            Err(_) => {
                // Closed. Drain whatever is still buffered, then stop.
                self.inner.q.lock().unwrap().pop_front()
            }
        }
    }

    /// Stop intake and wake every blocked consumer. Buffered items are
    /// still drainable via [`recv`](Self::recv) until empty.
    pub fn close(&self) {
        self.items.close();
    }

    pub fn dropped(&self) -> u64 {
        self.inner.dropped.load(Ordering::Relaxed)
    }

    pub fn len(&self) -> usize {
        self.inner.q.lock().unwrap().len()
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

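/// Run summary. The conservation law `produced == processed + dropped`
/// holds whenever `aborted == 0`; it is asserted in tests and the demo.
/// A worker aborted at the drain deadline may have popped an item it
/// never counted, and items can remain in the queue.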
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Stats {
    pub produced: u64,
    pub processed: u64,
    pub dropped: u64,
    pub workers_joined: usize,
    pub aborted: usize,
}

/// Configuration for one pipeline run.
pub struct Config {
    pub workers: usize,
    pub capacity: usize,
    pub policy: DropPolicy,
    pub total_items: u64,
    /// Per-item simulated work. Make it larger than the produce interval
    /// to force the buffer to fill and the shed policy to engage.
    pub work: Duration,
    /// Hard ceiling for draining the backlog on shutdown before
    /// stragglers are aborted.
    pub drain_deadline: Duration,
}

/// Spawn the producer + `workers` consumers, run to completion (or until
/// `external_stop` fires), then shut down in the correct order and
/// assert nothing leaked.
pub async fn run(cfg: Config, external_stop: CancellationToken) -> Stats {
    let queue = BoundedFanout::<u64>::new(cfg.capacity, cfg.policy);
    let processed = Arc::new(AtomicU64::new(0));
    let produced = Arc::new(AtomicU64::new(0));

    // Intake is a child of the external stop: cancelling the parent
    // cascades, and the producer stops at its next loop iteration.
    let intake = external_stop.child_token();

    // Producer.
    let producer = {
        let q = queue.clone();
        let produced = produced.clone();
        let intake = intake.clone();
        tokio::spawn(async move {
            for id in 0..cfg.total_items {
                if intake.is_cancelled() {
                    break;
                }
                q.push(id);
                produced.fetch_add(1, Ordering::Relaxed);
                // Produce faster than a worker can drain → overload.
                tokio::task::yield_now().await;
            }
            // Intake done: closing the queue lets workers drain the
            // bounded backlog and then see `None`.
            q.close();
        })
    };

    // Workers.
    let mut set = JoinSet::new();
    for _ in 0..cfg.workers {
        let q = queue.clone();
        let processed = processed.clone();
        let work = cfg.work;
        set.spawn(async move {
            let mut n = 0u64;
            while let Some(_item) = q.recv().await {
                if !work.is_zero() {
                    tokio::time::sleep(work).await;
                }
                processed.fetch_add(1, Ordering::Relaxed);
                n += 1;
            }
            n
        });
    }

    // If an external stop arrives before natural completion, force the
    // shutdown path: stop intake, close the queue.
    let stop_watch = {
        let q = queue.clone();
        let intake = intake.clone();
        tokio::spawn(async move {
            external_stop.cancelled().await;
            intake.cancel();
            q.close();
        })
    };

    let _ = producer.await;

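    // Drain with a bounded deadline. Stragglers past the deadline are
    // aborted rather than allowed to hang shutdown forever.
    let mut joined = 0usize;
    let mut aborted = 0usize;
    let mut deadline_hit = false;
    let deadline = tokio::time::sleep(cfg.drain_deadline);
    tokio::pin!(deadline);
    loop {
        tokio::select! {
            res = set.join_next() => match res {
                Some(Ok(_)) => joined += 1,
                Some(Err(_)) => aborted += 1, // join error: aborted (or panicked)
                None => break,                // every worker accounted for
            },
            // Guarded so the deadline branch runs at most once and the
            // elapsed timer is not polled again.
            _ = &mut deadline, if !deadline_hit => {
                deadline_hit = true;
                queue.close(); // ensure no worker is parked forever
                set.abort_all();
            }
        }
    }
    // The watcher is a spawned task too: abort it, then await the handle
    // so it is joined rather than left detached.
    stop_watch.abort();
    let _ = stop_watch.await;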

    Stats {
        produced: produced.load(Ordering::Relaxed),
        processed: processed.load(Ordering::Relaxed),
        dropped: queue.dropped(),
        workers_joined: joined,
        aborted,
    }
}

src/main.rs (raw)

//! Runnable demo for `backpressure-pipeline`.
//!
//! Runs the SAME overload scenario twice — once with `DropNewest`, once
//! with `DropOldest` — with a fast producer, slow workers and a small
//! buffer so the shed policy is forced to engage. It prints a comparison
//! table and asserts the conservation law `produced == processed +
//! dropped` plus "every worker joined, nothing leaked".
//!
//! Terminates on its own. Run: `cargo run --bin demo`

use backpressure_pipeline::{run, BoundedFanout, Config, DropPolicy, Stats};
use std::time::Duration;
use tokio_util::sync::CancellationToken;

/// Intuitive, fully deterministic illustration: a buffer of 4, eight
/// items pushed (0..8) with nobody consuming yet — which four survive?
async fn illustrate(policy: DropPolicy) -> Vec<u64> {
    let q = BoundedFanout::<u64>::new(4, policy);
    for id in 0..8 {
        q.push(id);
    }
    q.close();
    let mut kept = Vec::new();
    while let Some(v) = q.recv().await {
        kept.push(v);
    }
    kept
}

async fn scenario(policy: DropPolicy) -> Stats {
    run(
        Config {
            workers: 4,
            capacity: 16,
            policy,
            total_items: 4_000,
            work: Duration::from_micros(200), // slower than production → overload
            drain_deadline: Duration::from_secs(5),
        },
        CancellationToken::new(), // not triggered: run to natural completion
    )
    .await
}

#[tokio::main]
async fn main() {
    println!("== policy illustration: buffer=4, push ids 0..8, then drain ==");
    println!(
        "  DropNewest keeps {:?}  (newest arrivals shed; backlog order preserved)",
        illustrate(DropPolicy::DropNewest).await
    );
    println!(
        "  DropOldest keeps {:?}  (oldest shed; always the freshest state)",
        illustrate(DropPolicy::DropOldest).await
    );

    println!();
    println!("== overload stress: 4000-item burst, buffer=16, 4 slow workers ==");
    println!(
        "{:<12} {:>9} {:>10} {:>8} {:>8} {:>8}",
        "policy", "produced", "processed", "dropped", "joined", "aborted"
    );
    println!("{}", "-".repeat(60));

    for policy in [DropPolicy::DropNewest, DropPolicy::DropOldest] {
        let s = scenario(policy).await;
        println!(
            "{:<12} {:>9} {:>10} {:>8} {:>8} {:>8}",
            format!("{policy:?}"),
            s.produced,
            s.processed,
            s.dropped,
            s.workers_joined,
            s.aborted
        );

        // The two guarantees this project exists to prove:
        assert_eq!(
            s.produced,
            s.processed + s.dropped,
            "conservation law must hold ({policy:?})"
        );
        assert_eq!(s.workers_joined, 4, "every worker must be joined");
        assert_eq!(s.aborted, 0, "no straggler had to be force-aborted");
        assert!(s.dropped > 0, "the scenario is meant to overload and shed");
    }

    println!("{}", "-".repeat(60));
    println!("conservation held for both policies; no tasks leaked. demo OK.");
}

tests/pipeline.rs (raw)

//! Tests: the deterministic policy semantics, the conservation law, the
//! no-leak guarantee, and external-stop behaviour.

use backpressure_pipeline::{run, BoundedFanout, Config, DropPolicy};
use std::time::Duration;
use tokio_util::sync::CancellationToken;

/// Fully deterministic policy proof: fill capacity, over-push by 3 with
/// nobody consuming, then drain and inspect which ids survived.
#[tokio::test]
async fn drop_newest_keeps_oldest_items() {
    let q = BoundedFanout::<u64>::new(4, DropPolicy::DropNewest);
    for id in 0..7 {
        q.push(id); // 0..4 enqueued; 4,5,6 dropped (newest)
    }
    q.close();
    let mut got = Vec::new();
    while let Some(v) = q.recv().await {
        got.push(v);
    }
    assert_eq!(got, vec![0, 1, 2, 3]);
    assert_eq!(q.dropped(), 3);
}

#[tokio::test]
async fn drop_oldest_keeps_newest_items() {
    let q = BoundedFanout::<u64>::new(4, DropPolicy::DropOldest);
    for id in 0..7 {
        q.push(id); // each over-push evicts the front: keeps 3,4,5,6
    }
    q.close();
    let mut got = Vec::new();
    while let Some(v) = q.recv().await {
        got.push(v);
    }
    assert_eq!(got, vec![3, 4, 5, 6]);
    assert_eq!(q.dropped(), 3);
}

/// `recv` returns `None` only after close *and* full drain — a worker
/// must never exit while items remain.
#[tokio::test]
async fn close_drains_before_none() {
    let q = BoundedFanout::<u64>::new(8, DropPolicy::DropNewest);
    for id in 0..5 {
        q.push(id);
    }
    q.close();
    for expect in 0..5 {
        assert_eq!(q.recv().await, Some(expect));
    }
    assert_eq!(q.recv().await, None);
}

fn cfg(policy: DropPolicy, total: u64) -> Config {
    Config {
        workers: 4,
        capacity: 16,
        policy,
        total_items: total,
        work: Duration::from_micros(100),
        drain_deadline: Duration::from_secs(5),
    }
}

/// Conservation + no-leak for both policies under real overload.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn conservation_and_no_leak() {
    for policy in [DropPolicy::DropNewest, DropPolicy::DropOldest] {
        let s = run(cfg(policy, 3_000), CancellationToken::new()).await;
        assert_eq!(s.produced, s.processed + s.dropped, "{policy:?}");
        assert_eq!(s.produced, 3_000);
        assert_eq!(s.workers_joined, 4, "all workers joined ({policy:?})");
        assert_eq!(s.aborted, 0, "nothing force-aborted ({policy:?})");
    }
}

/// An external cancel stops intake early, yet conservation still holds
/// and every worker is still joined (structured, not detached).
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn external_stop_is_clean() {
    let stop = CancellationToken::new();
    let s_handle = {
        let stop = stop.clone();
        tokio::spawn(async move { run(cfg(DropPolicy::DropOldest, 1_000_000), stop).await })
    };
    tokio::time::sleep(Duration::from_millis(30)).await;
    stop.cancel();

    let s = s_handle.await.unwrap();
    assert!(s.produced < 1_000_000, "intake stopped early");
    assert_eq!(s.produced, s.processed + s.dropped, "conservation under stop");
    assert_eq!(s.workers_joined, 4, "workers still joined after stop");
}