Personal systems project · Rust / tokio · source below, raw files linked per section
# backpressure-pipeline

**A bounded fan-out pipeline with an explicit load-shed policy and a leak-free structured shutdown.**

A self-contained building block for the common shape "one fast producer → N async workers" where overload must degrade *predictably* instead of crashing.

## The problem

A producer feeding N async workers has exactly one correct behaviour under overload, and two common bugs:

- **Unbounded queue.** `mpsc::unbounded_channel()` applies no back-pressure, so a slow consumer turns a fast producer into an OOM: a crash, not a slowdown.
- **Implicit / wrong drop policy.** When the buffer is full you *must* choose what to discard. A **position/state stream** wants the freshest sample → drop the **oldest**. An **event/command stream** needs order and visibility of loss → drop the **newest** (and alarm). A bounded `tokio::mpsc` only offers drop-newest (`try_send` → `Full`); drop-oldest needs a real ring.

There is also a shutdown sequence people get wrong: *stop intake → close the queue → let workers drain the bounded backlog → bounded deadline → abort stragglers*. Detached tasks that are never joined leak work.

## The approach

A bounded MPMC hand-off queue built as a `Semaphore` (permits == items available) over a `Mutex<VecDeque>`. Permit accounting is exact (a held permit *is* a reserved item), so there is no lost-wakeup reasoning to get wrong. The shed policy (`DropNewest` / `DropOldest`) is a required, explicit choice. Shutdown is structured via a `CancellationToken` tree and a `JoinSet`; **every** spawned task is joined and the run reports `workers_joined` / `aborted` so leaks are observable. The conservation law `produced == processed + dropped` is asserted in the demo and the tests.
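To make the two shed policies concrete, here is a minimal synchronous sketch on a plain `VecDeque`, with no tokio involved. The names `Shed`, `push_bounded` and `survivors` are hypothetical and exist only for this illustration; they are not the crate's API, and the sketch assumes a capacity greater than zero.

```rust
use std::collections::VecDeque;

/// Hypothetical mirror of the two shed policies described above.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Shed {
    Newest,
    Oldest,
}

/// Push into a buffer capped at `cap`; returns `true` only if nothing was shed.
fn push_bounded(buf: &mut VecDeque<u64>, cap: usize, shed: Shed, item: u64) -> bool {
    assert!(cap > 0, "capacity must be > 0");
    if buf.len() < cap {
        buf.push_back(item);
        return true;
    }
    match shed {
        // Backlog wins: the arriving item is discarded, order is preserved.
        Shed::Newest => {}
        // Freshness wins: evict the front, then append the new item.
        Shed::Oldest => {
            buf.pop_front();
            buf.push_back(item);
        }
    }
    false
}

/// Push ids `0..n` into an empty buffer and return what survives.
fn survivors(cap: usize, shed: Shed, n: u64) -> Vec<u64> {
    let mut buf = VecDeque::with_capacity(cap);
    for id in 0..n {
        push_bounded(&mut buf, cap, shed, id);
    }
    buf.into_iter().collect()
}

fn main() {
    // Buffer of 4, eight pushes, nobody consuming.
    assert_eq!(survivors(4, Shed::Newest, 8), vec![0, 1, 2, 3]);
    assert_eq!(survivors(4, Shed::Oldest, 8), vec![4, 5, 6, 7]);
    println!("ok");
}
```

Both branches report a shed by returning `false`, even though drop-oldest does store the new item. That keeps "something was lost" visible to the caller under either policy.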
## Run it

```bash
cargo run --bin demo   # policy illustration + overload stress + conservation
cargo test             # deterministic policy + conservation + no-leak tests
```

Sample demo output:

```
== policy illustration: buffer=4, push ids 0..8, then drain ==
  DropNewest keeps [0, 1, 2, 3] (newest arrivals shed; backlog order preserved)
  DropOldest keeps [4, 5, 6, 7] (oldest shed; always the freshest state)

== overload stress: 4000-item burst, buffer=16, 4 slow workers ==
policy        produced  processed  dropped   joined  aborted
DropNewest        4000         56     3944        4        0
DropOldest        4000         80     3920        4        0
conservation held for both policies; no tasks leaked. demo OK.
```

## What the tests prove

- `drop_newest_keeps_oldest_items` / `drop_oldest_keeps_newest_items`: fully deterministic proof of each policy's semantics.
- `close_drains_before_none`: a worker never exits while items remain.
- `conservation_and_no_leak`: under real overload, both policies satisfy `produced == processed + dropped` and every worker is joined.
- `external_stop_is_clean`: an external cancel stops intake early, yet conservation still holds and all workers are still joined (structured, not detached).

## License

MIT OR Apache-2.0.
[package]
name = "backpressure-pipeline"
version = "0.1.0"
edition = "2021"
description = "A bounded fan-out pipeline with explicit load-shed policy and leak-free structured shutdown."
license = "MIT OR Apache-2.0"
publish = false
[dependencies]
tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync", "time", "signal"] }
tokio-util = { version = "0.7", features = ["rt"] }
[dev-dependencies]
tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync", "time", "test-util"] }
[[bin]]
name = "demo"
path = "src/main.rs"
[lib]
name = "backpressure_pipeline"
path = "src/lib.rs"
//! # backpressure-pipeline
//!
//! A bounded fan-out work pipeline with an **explicit, chosen** load-shed
//! policy and a **leak-free structured shutdown**.
//!
//! ## The problem this solves
//!
//! A producer feeding N async workers has exactly one correct behaviour
//! under overload, and two common bugs:
//!
//! * **Bug 1 — unbounded queue.** `mpsc::unbounded_channel()` never
//! applies back-pressure, so a slow consumer turns a fast producer
//! into an OOM. On a ground station ingesting drone telemetry that is
//! a crash, not a slowdown.
//! * **Bug 2 — implicit / wrong drop policy.** When the buffer is full
//! you *must* decide what to discard. For a **position stream** the
//! freshest sample matters → drop the *oldest*. For an **event/command
//! stream** every item matters in order → drop the *newest* (and
//! alarm). A bounded `tokio::mpsc` only gives you drop-newest
//! (`try_send` → `Full`); drop-oldest needs a real ring. This crate
//! makes the choice explicit and correct for both.
//!
//! It also shows the shutdown sequence people get wrong: *stop intake →
//! close the queue → let workers drain the bounded backlog → bounded
//! deadline → abort stragglers* — and asserts **every spawned task is
//! joined**, so nothing is detached and leaked.
//!
//! The bounded queue is a `Semaphore` (permits == items available) over
//! a `Mutex<VecDeque>`. Permit accounting is exact, so there is no
//! lost-wakeup reasoning to get wrong — see [`BoundedFanout`].
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
/// What to discard when the buffer is full. There is no safe default —
/// the caller must pick based on the stream's semantics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DropPolicy {
/// Keep the backlog, discard the just-arrived item (events/commands:
/// order matters, loss must be visible).
DropNewest,
/// Discard the oldest queued item to make room (position/state:
/// only the freshest sample matters).
DropOldest,
}
struct Inner<T> {
q: Mutex<VecDeque<T>>,
cap: usize,
policy: DropPolicy,
dropped: AtomicU64,
}
/// A bounded MPMC hand-off queue.
///
/// Invariant (while the queue is open): `items.available_permits()` +
/// (permits held by consumers mid-`recv`) == `q.len()`. Every `push` that
/// grows the queue adds exactly one permit; every successful `recv`
/// consumes exactly one and pops exactly one. A drop-oldest eviction
/// removes one item and adds one, so the permit count is unchanged.
/// Because a held permit *is* a reserved item, a consumer that acquired
/// a permit on an open queue is guaranteed a `Some`, so there are no
/// lost wakeups. After `close`, `recv` stops waiting on permits and pops
/// directly until the queue is empty.
#[derive(Clone)]
pub struct BoundedFanout<T> {
inner: Arc<Inner<T>>,
items: Arc<tokio::sync::Semaphore>,
}
impl<T> BoundedFanout<T> {
pub fn new(cap: usize, policy: DropPolicy) -> Self {
assert!(cap > 0, "capacity must be > 0");
Self {
inner: Arc::new(Inner {
q: Mutex::new(VecDeque::with_capacity(cap)),
cap,
policy,
dropped: AtomicU64::new(0),
}),
items: Arc::new(tokio::sync::Semaphore::new(0)),
}
}
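/// Enqueue, applying the shed policy if full. Never blocks and never
/// grows the buffer past `cap`. Returns `true` if the item was enqueued
/// without loss, `false` if something was shed: the item itself
/// (`DropNewest`, or any push after `close`) or the oldest queued item
/// (`DropOldest`). Panics only if the internal mutex is poisoned.
pub fn push(&self, item: T) -> bool {
let mut q = self.inner.q.lock().unwrap();
// Checked under the lock: a push that races `close` is shed and
// counted here instead of being stranded behind consumers that
// have already drained the queue and exited.
if self.items.is_closed() {
self.inner.dropped.fetch_add(1, Ordering::Relaxed);
return false;
}
if q.len() < self.inner.cap {
q.push_back(item);
drop(q);
self.items.add_permits(1);
true
} else {
match self.inner.policy {
DropPolicy::DropNewest => {
self.inner.dropped.fetch_add(1, Ordering::Relaxed);
false
}
DropPolicy::DropOldest => {
q.pop_front();
q.push_back(item);
// permit count intentionally unchanged: -1 +1.
self.inner.dropped.fetch_add(1, Ordering::Relaxed);
false
}
}
}
}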
/// Await the next item. Returns `None` once the queue is
/// [`close`](Self::close)d *and* drained — the signal for a worker
/// to exit cleanly.
pub async fn recv(&self) -> Option<T> {
match self.items.clone().acquire_owned().await {
Ok(permit) => {
permit.forget(); // the permit's item is now ours to pop
self.inner.q.lock().unwrap().pop_front()
}
Err(_) => {
// Closed. Drain whatever is still buffered, then stop.
self.inner.q.lock().unwrap().pop_front()
}
}
}
/// Stop intake and wake every blocked consumer. Buffered items are
/// still drainable via [`recv`](Self::recv) until empty.
pub fn close(&self) {
self.items.close();
}
pub fn dropped(&self) -> u64 {
self.inner.dropped.load(Ordering::Relaxed)
}
pub fn len(&self) -> usize {
self.inner.q.lock().unwrap().len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
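/// Run summary. The conservation law `produced == processed + dropped`
/// holds whenever the drain finishes before the deadline (`aborted == 0`);
/// it is asserted in tests and the demo. An aborted straggler can leave
/// items unprocessed, which is why `aborted` is reported.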
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Stats {
pub produced: u64,
pub processed: u64,
pub dropped: u64,
pub workers_joined: usize,
pub aborted: usize,
}
/// Configuration for one pipeline run.
pub struct Config {
pub workers: usize,
pub capacity: usize,
pub policy: DropPolicy,
pub total_items: u64,
/// Per-item simulated work. Make it larger than the produce interval
/// to force the buffer to fill and the shed policy to engage.
pub work: Duration,
/// Hard ceiling for draining the backlog on shutdown before
/// stragglers are aborted.
pub drain_deadline: Duration,
}
/// Spawn the producer + `workers` consumers, run to completion (or until
/// `external_stop` fires), then shut down in the correct order and
/// report how many workers were joined and how many had to be aborted.
pub async fn run(cfg: Config, external_stop: CancellationToken) -> Stats {
let queue = BoundedFanout::<u64>::new(cfg.capacity, cfg.policy);
let processed = Arc::new(AtomicU64::new(0));
let produced = Arc::new(AtomicU64::new(0));
// Intake is a child of the external stop: cancelling the parent
// cascades, and the producer stops at its next loop iteration.
let intake = external_stop.child_token();
// Producer.
let producer = {
let q = queue.clone();
let produced = produced.clone();
let intake = intake.clone();
tokio::spawn(async move {
for id in 0..cfg.total_items {
if intake.is_cancelled() {
break;
}
q.push(id);
produced.fetch_add(1, Ordering::Relaxed);
// Produce faster than a worker can drain → overload.
tokio::task::yield_now().await;
}
// Intake done: closing the queue lets workers drain the
// bounded backlog and then see `None`.
q.close();
})
};
// Workers.
let mut set = JoinSet::new();
for _ in 0..cfg.workers {
let q = queue.clone();
let processed = processed.clone();
let work = cfg.work;
set.spawn(async move {
let mut n = 0u64;
while let Some(_item) = q.recv().await {
if !work.is_zero() {
tokio::time::sleep(work).await;
}
processed.fetch_add(1, Ordering::Relaxed);
n += 1;
}
n
});
}
// If an external stop arrives before natural completion, force the
// shutdown path: stop intake, close the queue.
let stop_watch = {
let q = queue.clone();
let intake = intake.clone();
tokio::spawn(async move {
external_stop.cancelled().await;
intake.cancel();
q.close();
})
};
let _ = producer.await;
// Drain with a bounded deadline. Stragglers past the deadline are
// aborted rather than allowed to hang shutdown forever.
let mut joined = 0usize;
let mut aborted = 0usize;
let deadline = tokio::time::sleep(cfg.drain_deadline);
tokio::pin!(deadline);
loop {
tokio::select! {
res = set.join_next() => match res {
Some(Ok(_)) => joined += 1,
Some(Err(_)) => aborted += 1, // join error: aborted at the deadline (a panicked worker also lands here)
None => break, // every worker accounted for
},
_ = &mut deadline => {
queue.close(); // ensure no worker is parked forever
set.abort_all();
}
}
}
stop_watch.abort();
Stats {
produced: produced.load(Ordering::Relaxed),
processed: processed.load(Ordering::Relaxed),
dropped: queue.dropped(),
workers_joined: joined,
aborted,
}
}
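The shutdown order that `run` follows (stop intake, close the queue, let workers drain, join every worker) can also be sketched with plain threads. This is a std-only analogue under stated assumptions, not the crate's implementation: it uses a `Condvar` in place of the tokio `Semaphore`, implements drop-newest only, and has no deadline or abort step because std threads cannot be aborted. `State`, `Queue` and `run_once` are hypothetical names for this illustration.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

struct State {
    items: VecDeque<u64>,
    closed: bool,
}

/// Hypothetical stand-in for the bounded hand-off queue (drop-newest only).
struct Queue {
    state: Mutex<State>,
    ready: Condvar,
    cap: usize,
    dropped: AtomicU64,
}

impl Queue {
    fn new(cap: usize) -> Self {
        Queue {
            state: Mutex::new(State { items: VecDeque::with_capacity(cap), closed: false }),
            ready: Condvar::new(),
            cap,
            dropped: AtomicU64::new(0),
        }
    }

    /// Never blocks: sheds (and counts) the new item when full or closed.
    fn push(&self, item: u64) {
        let mut s = self.state.lock().unwrap();
        if s.closed || s.items.len() >= self.cap {
            self.dropped.fetch_add(1, Ordering::Relaxed);
            return;
        }
        s.items.push_back(item);
        drop(s);
        self.ready.notify_one();
    }

    /// Blocks until an item arrives; `None` only once closed and drained.
    fn recv(&self) -> Option<u64> {
        let mut s = self.state.lock().unwrap();
        loop {
            if let Some(v) = s.items.pop_front() {
                return Some(v);
            }
            if s.closed {
                return None;
            }
            s = self.ready.wait(s).unwrap();
        }
    }

    /// Stop intake and wake every parked consumer.
    fn close(&self) {
        self.state.lock().unwrap().closed = true;
        self.ready.notify_all();
    }
}

/// Returns `(produced, processed, dropped, joined)`.
fn run_once(workers: usize, cap: usize, total: u64) -> (u64, u64, u64, usize) {
    let q = Arc::new(Queue::new(cap));
    let processed = Arc::new(AtomicU64::new(0));
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let (q, processed) = (Arc::clone(&q), Arc::clone(&processed));
            thread::spawn(move || {
                while q.recv().is_some() {
                    processed.fetch_add(1, Ordering::Relaxed);
                }
            })
        })
        .collect();
    // 1. Intake: produce as fast as possible, shedding when the buffer is full.
    for id in 0..total {
        q.push(id);
    }
    // 2. Close: no further intake; workers drain the backlog, then see `None`.
    q.close();
    // 3. Join every worker, so nothing is left detached.
    let joined = handles.into_iter().map(|h| h.join()).filter(Result::is_ok).count();
    (total, processed.load(Ordering::Relaxed), q.dropped.load(Ordering::Relaxed), joined)
}

fn main() {
    let (produced, processed, dropped, joined) = run_once(4, 16, 4_000);
    assert_eq!(produced, processed + dropped, "conservation law");
    assert_eq!(joined, 4, "every worker joined");
    println!("produced={produced} processed={processed} dropped={dropped} joined={joined}");
}
```

Conservation holds here because every push is counted exactly once, as either enqueued or shed, and workers exit only after the queue is both closed and empty. The split between `processed` and `dropped` depends on scheduling, so the sketch asserts only their sum.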
//! Runnable demo for `backpressure-pipeline`.
//!
//! Runs the SAME overload scenario twice — once with `DropNewest`, once
//! with `DropOldest` — with a fast producer, slow workers and a small
//! buffer so the shed policy is forced to engage. It prints a comparison
//! table and asserts the conservation law `produced == processed +
//! dropped` plus "every worker joined, nothing leaked".
//!
//! Terminates on its own. Run: `cargo run --bin demo`
use backpressure_pipeline::{run, BoundedFanout, Config, DropPolicy, Stats};
use std::time::Duration;
use tokio_util::sync::CancellationToken;
/// Intuitive, fully deterministic illustration: a buffer of 4, eight
/// items pushed (0..8) with nobody consuming yet — which four survive?
async fn illustrate(policy: DropPolicy) -> Vec<u64> {
let q = BoundedFanout::<u64>::new(4, policy);
for id in 0..8 {
q.push(id);
}
q.close();
let mut kept = Vec::new();
while let Some(v) = q.recv().await {
kept.push(v);
}
kept
}
async fn scenario(policy: DropPolicy) -> Stats {
run(
Config {
workers: 4,
capacity: 16,
policy,
total_items: 4_000,
work: Duration::from_micros(200), // slower than production → overload
drain_deadline: Duration::from_secs(5),
},
CancellationToken::new(), // not triggered: run to natural completion
)
.await
}
#[tokio::main]
async fn main() {
println!("== policy illustration: buffer=4, push ids 0..8, then drain ==");
println!(
" DropNewest keeps {:?} (newest arrivals shed; backlog order preserved)",
illustrate(DropPolicy::DropNewest).await
);
println!(
" DropOldest keeps {:?} (oldest shed; always the freshest state)",
illustrate(DropPolicy::DropOldest).await
);
println!();
println!("== overload stress: 4000-item burst, buffer=16, 4 slow workers ==");
println!(
"{:<12} {:>9} {:>10} {:>8} {:>8} {:>8}",
"policy", "produced", "processed", "dropped", "joined", "aborted"
);
println!("{}", "-".repeat(60));
for policy in [DropPolicy::DropNewest, DropPolicy::DropOldest] {
let s = scenario(policy).await;
println!(
"{:<12} {:>9} {:>10} {:>8} {:>8} {:>8}",
format!("{policy:?}"),
s.produced,
s.processed,
s.dropped,
s.workers_joined,
s.aborted
);
// The two guarantees this project exists to prove:
assert_eq!(
s.produced,
s.processed + s.dropped,
"conservation law must hold ({policy:?})"
);
assert_eq!(s.workers_joined, 4, "every worker must be joined");
assert_eq!(s.aborted, 0, "no straggler had to be force-aborted");
assert!(s.dropped > 0, "the scenario is meant to overload and shed");
}
println!("{}", "-".repeat(60));
println!("conservation held for both policies; no tasks leaked. demo OK.");
}
//! Tests: the deterministic policy semantics, the conservation law, the
//! no-leak guarantee, and external-stop behaviour.
use backpressure_pipeline::{run, BoundedFanout, Config, DropPolicy};
use std::time::Duration;
use tokio_util::sync::CancellationToken;
/// Fully deterministic policy proof: fill capacity, over-push by 3 with
/// nobody consuming, then drain and inspect which ids survived.
#[tokio::test]
async fn drop_newest_keeps_oldest_items() {
let q = BoundedFanout::<u64>::new(4, DropPolicy::DropNewest);
for id in 0..7 {
q.push(id); // 0..4 enqueued; 4,5,6 dropped (newest)
}
q.close();
let mut got = Vec::new();
while let Some(v) = q.recv().await {
got.push(v);
}
assert_eq!(got, vec![0, 1, 2, 3]);
assert_eq!(q.dropped(), 3);
}
#[tokio::test]
async fn drop_oldest_keeps_newest_items() {
let q = BoundedFanout::<u64>::new(4, DropPolicy::DropOldest);
for id in 0..7 {
q.push(id); // each over-push evicts the front: keeps 3,4,5,6
}
q.close();
let mut got = Vec::new();
while let Some(v) = q.recv().await {
got.push(v);
}
assert_eq!(got, vec![3, 4, 5, 6]);
assert_eq!(q.dropped(), 3);
}
/// `recv` returns `None` only after close *and* full drain — a worker
/// must never exit while items remain.
#[tokio::test]
async fn close_drains_before_none() {
let q = BoundedFanout::<u64>::new(8, DropPolicy::DropNewest);
for id in 0..5 {
q.push(id);
}
q.close();
for expect in 0..5 {
assert_eq!(q.recv().await, Some(expect));
}
assert_eq!(q.recv().await, None);
}
fn cfg(policy: DropPolicy, total: u64) -> Config {
Config {
workers: 4,
capacity: 16,
policy,
total_items: total,
work: Duration::from_micros(100),
drain_deadline: Duration::from_secs(5),
}
}
/// Conservation + no-leak for both policies under real overload.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn conservation_and_no_leak() {
for policy in [DropPolicy::DropNewest, DropPolicy::DropOldest] {
let s = run(cfg(policy, 3_000), CancellationToken::new()).await;
assert_eq!(s.produced, s.processed + s.dropped, "{policy:?}");
assert_eq!(s.produced, 3_000);
assert_eq!(s.workers_joined, 4, "all workers joined ({policy:?})");
assert_eq!(s.aborted, 0, "nothing force-aborted ({policy:?})");
}
}
/// An external cancel stops intake early, yet conservation still holds
/// and every worker is still joined (structured, not detached).
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn external_stop_is_clean() {
let stop = CancellationToken::new();
let s_handle = {
let stop = stop.clone();
tokio::spawn(async move { run(cfg(DropPolicy::DropOldest, 1_000_000), stop).await })
};
tokio::time::sleep(Duration::from_millis(30)).await;
stop.cancel();
let s = s_handle.await.unwrap();
assert!(s.produced < 1_000_000, "intake stopped early");
assert_eq!(s.produced, s.processed + s.dropped, "conservation under stop");
assert_eq!(s.workers_joined, 4, "workers still joined after stop");
}