framed-codec

Personal systems project · Rust / tokio · source below, raw files linked per section

README.md (raw)

# framed-codec

**Cancel-safe, zero-allocation-per-frame length-prefixed framing for tokio byte streams.**

A small, self-contained building block for streaming binary telemetry
(drone/robot links, sensor feeds, any binary wire protocol) over
anything that is `AsyncRead + AsyncWrite` — TCP, a Unix socket, or an
in-process pipe.

## The problem

Two bugs recur in hand-rolled async framing code:

1. **Torn frames across packet boundaries.** A `read()` returns whatever
   bytes happened to arrive — frequently *half* a frame. Code that
   assumes "one read = one frame" silently corrupts the stream.
2. **Lost data on cancellation.** `read_exact()` inside a
   `tokio::select!` is **not cancel-safe**: if another branch wins, the
   future is dropped along with its count of bytes already read, so the
   bytes it had consumed from the socket are lost to the caller and the
   stream is desynchronised permanently.
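
To make the first failure mode concrete, here is a minimal std-only sketch. It is not part of this crate: `ChunkedReader`, `one_frame`, and `single_read_len` are hypothetical names, and the blocking `std::io::Read` trait stands in for an async socket. It shows a single `read` handing back only part of a 28-byte frame.

```rust
use std::io::{self, Read};

/// Hypothetical reader that hands out at most `chunk` bytes per `read`
/// call, imitating a socket that delivers a frame in pieces.
struct ChunkedReader {
    data: Vec<u8>,
    pos: usize,
    chunk: usize,
}

impl Read for ChunkedReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let remaining = self.data.len() - self.pos;
        let n = remaining.min(self.chunk).min(buf.len());
        buf[..n].copy_from_slice(&self.data[self.pos..self.pos + n]);
        self.pos += n;
        Ok(n)
    }
}

/// Builds one 28-byte frame: a 4-byte big-endian length prefix (24)
/// followed by 24 placeholder payload bytes.
fn one_frame() -> Vec<u8> {
    let mut wire = 24u32.to_be_bytes().to_vec();
    wire.extend(0u8..24);
    wire
}

/// Issues exactly one `read` and reports how many bytes came back.
fn single_read_len(chunk: usize) -> usize {
    let mut reader = ChunkedReader { data: one_frame(), pos: 0, chunk };
    let mut buf = [0u8; 28];
    reader.read(&mut buf).expect("in-memory read cannot fail")
}
```

With `chunk = 14`, `single_read_len` reports 14 bytes for a 28-byte frame, so code that parses the buffer after one read would see a truncated frame.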

## The approach

The codec is a length-prefixed [`tokio_util::codec`] `Decoder`/`Encoder`
pair. All partial-read state lives inside the `Framed` buffer, never on the stack,
so the read future (`StreamExt::next`) **is** cancel-safe — a `select!`
can drop it mid-frame and the next poll resumes exactly where it left
off. The decoder validates the declared length *before* indexing, so a
hostile or corrupt length is a clean protocol error, never a panic or an
OOM. Decoding a frame produces a `Copy` value with no per-frame heap
allocation.
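
The decode rule can be sketched without any dependencies. The following is an illustration under simplifying assumptions, not the crate's `Decoder` impl: it works on a plain `Vec<u8>`, it copies the body out (so it does allocate per frame, unlike the crate), and `try_decode` and `DecodeError` are hypothetical names. The two constants mirror `HEADER_LEN` and `MAX_FRAME` in `src/lib.rs`.

```rust
const HEADER_LEN: usize = 4; // u32 big-endian length prefix
const MAX_FRAME: usize = 64 * 1024;

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
enum DecodeError {
    LengthOutOfBounds(usize),
}

/// Tries to take one complete frame body off the front of `buf`.
///
/// Returns `Ok(None)` while the frame is incomplete and leaves `buf`
/// untouched, so the caller can append more bytes and call again.
fn try_decode(buf: &mut Vec<u8>) -> Result<Option<Vec<u8>>, DecodeError> {
    if buf.len() < HEADER_LEN {
        return Ok(None); // header not complete yet
    }
    let len = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;

    // Validate the declared length before it is used as an index or a
    // size, so a corrupt prefix becomes an error instead of a panic.
    if len == 0 || len > MAX_FRAME {
        return Err(DecodeError::LengthOutOfBounds(len));
    }
    if buf.len() < HEADER_LEN + len {
        return Ok(None); // body not complete yet
    }

    // Copy the body out and drop the consumed bytes from the front.
    let body = buf[HEADER_LEN..HEADER_LEN + len].to_vec();
    buf.drain(..HEADER_LEN + len);
    Ok(Some(body))
}
```

Because the only state is the caller-owned buffer, appending one byte at a time and calling `try_decode` after each append yields `Ok(None)` until the last byte arrives. This is the same property that lets `Framed` resume after a dropped read future.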

`run_link` is the production-shaped read loop: decode frames, reset a
liveness heartbeat on every frame, and race the read against a heartbeat
deadline and a `CancellationToken` — mapping silence to a safe-state
transition instead of an indefinite hang.
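
The shape of that loop can be shown with a blocking, std-only analogue. This is a sketch, not `run_link` itself: a `std::sync::mpsc` channel stands in for the framed stream, `recv_timeout` stands in for racing the read against a deadline, and the shutdown branch is omitted. `run_link_blocking` is a hypothetical name, and the two outcomes mirror two of the crate's `LinkEvent` variants.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Outcomes of the sketch loop (the crate's enum also has a
/// shutdown variant, left out here).
#[derive(Debug, PartialEq, Eq)]
enum LinkEvent {
    PeerClosed,
    HeartbeatTimeout,
}

/// Blocking stand-in for the read loop. Every call to `recv_timeout`
/// starts a fresh wait, so each received frame resets the heartbeat.
fn run_link_blocking(
    rx: Receiver<u32>,
    heartbeat: Duration,
    mut on_frame: impl FnMut(u32),
) -> LinkEvent {
    loop {
        match rx.recv_timeout(heartbeat) {
            // A frame arrived in time: deliver it and wait again.
            Ok(frame) => on_frame(frame),
            // Silence for a full heartbeat interval: report it so the
            // caller can move to a safe state instead of blocking.
            Err(RecvTimeoutError::Timeout) => return LinkEvent::HeartbeatTimeout,
            // The sender is gone and nothing is left to read.
            Err(RecvTimeoutError::Disconnected) => return LinkEvent::PeerClosed,
        }
    }
}
```

A caller would match on the returned `LinkEvent` and treat `HeartbeatTimeout` as the trigger for its failsafe path.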

## Run it

```bash
cargo run --bin demo     # end-to-end: split frame + heartbeat → safe state
cargo test               # unit + integration + property tests
```

The demo uses an in-process `tokio::io::duplex` pipe (no sockets, no
ports — runs anywhere). It deliberately splits one frame across two
writes with a stall between them to prove the receiver reconstructs it
intact, then goes silent to show the heartbeat tripping a safe state.

## What the tests prove

- `decodes_only_when_complete_even_byte_by_byte` — a frame fed one byte
  at a time decodes exactly once, on the final byte.
- `many_frames_arbitrary_chunking` — 50 frames fed in 7-byte chunks all
  reconstruct in order.
- `rejects_out_of_bounds_length_without_panic` — a ~4 GiB declared
  length is a clean `InvalidData` error.
- `no_torn_frame_across_select_cancellation` — the headline guarantee: a
  frame split across the wire, with a `select!` that cancels the read
  mid-frame, still decodes intact.
- `roundtrip_any_split` (proptest) — any frame, split at any boundary,
  round-trips bit-exact.

## License

MIT OR Apache-2.0.

Cargo.toml (raw)

[package]
name = "framed-codec"
version = "0.1.0"
edition = "2021"
description = "Cancel-safe, zero-allocation-per-frame length-prefixed framing for tokio byte streams."
license = "MIT OR Apache-2.0"
publish = false

[dependencies]
tokio = { version = "1", features = ["rt-multi-thread", "macros", "io-util", "time", "sync"] }
tokio-util = { version = "0.7", features = ["codec", "rt"] }
bytes = "1"
futures = "0.3"

[dev-dependencies]
proptest = "1"
tokio = { version = "1", features = ["rt-multi-thread", "macros", "io-util", "time", "sync", "test-util"] }

[[bin]]
name = "demo"
path = "src/main.rs"

[lib]
name = "framed_codec"
path = "src/lib.rs"

src/lib.rs (raw)

//! # framed-codec
//!
//! A cancel-safe, zero-allocation-per-frame length-prefixed codec for
//! streaming binary telemetry over a tokio byte stream (TCP, a UDS, an
//! in-process duplex, anything `AsyncRead + AsyncWrite`).
//!
//! ## The problem this solves
//!
//! Two bugs show up again and again in hand-rolled async framing code:
//!
//! 1. **Torn frames across packet boundaries.** A `read()` returns
//!    whatever bytes happen to have arrived — often *half* a frame. Naive
//!    code that assumes one read == one frame corrupts the stream.
//! 2. **Lost data on cancellation.** If you `read_exact()` inside a
//!    `tokio::select!` and another branch wins, the future is dropped
//!    along with its count of bytes already read. The bytes it consumed
//!    from the socket are lost to the caller and the stream is
//!    desynchronised forever. `read_exact` is **not cancel-safe**.
//!
//! This crate fixes both by keeping all partial-read state inside a
//! [`tokio_util::codec::Framed`] buffer instead of on the stack. The
//! frame future ([`futures::StreamExt::next`]) is cancel-safe: if a
//! `select!` drops it mid-frame, the bytes already pulled from the
//! socket stay buffered and the next poll resumes exactly where it left
//! off. No torn frames, no desync, no per-frame heap allocation.
//!
//! See [`run_link`] for the production-shaped read loop, and the
//! `demo` binary for a runnable end-to-end example.

use bytes::{Buf, BufMut, BytesMut};
use std::io;
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::codec::{Decoder, Encoder};

/// Size in bytes of the fixed telemetry payload:
/// `seq(u32) ts_us(u64) pitch/roll/yaw(f32)`, big-endian, 24 bytes.
pub const PAYLOAD_LEN: usize = 4 + 8 + 4 + 4 + 4; // = 24
const HEADER_LEN: usize = 4; // u32 big-endian length prefix

/// Reject absurd declared lengths *before* indexing the buffer. An
/// attacker (or a corrupt link) sending a 4 GiB length must not panic or
/// OOM the process — it must be a clean protocol error.
pub const MAX_FRAME: usize = 64 * 1024;

/// One decoded telemetry sample. `Copy`, no owned heap data.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Telemetry {
    pub seq: u32,
    pub ts_us: u64,
    pub pitch: f32,
    pub roll: f32,
    pub yaw: f32,
}

impl Telemetry {
    fn write_payload(&self, dst: &mut BytesMut) {
        dst.put_u32(self.seq);
        dst.put_u64(self.ts_us);
        dst.put_f32(self.pitch);
        dst.put_f32(self.roll);
        dst.put_f32(self.yaw);
    }

    /// `src` is exactly [`PAYLOAD_LEN`] bytes. Reads fields directly out
    /// of the framing buffer — no intermediate allocation.
    fn read_payload(mut src: impl Buf) -> Telemetry {
        Telemetry {
            seq: src.get_u32(),
            ts_us: src.get_u64(),
            pitch: src.get_f32(),
            roll: src.get_f32(),
            yaw: src.get_f32(),
        }
    }
}

/// Length-prefixed codec: `[u32 BE length][payload]`.
///
/// The length prefix is what makes this robust to a payload split
/// across N socket reads: the decoder simply returns `Ok(None)` until
/// the full frame is buffered, and `Framed` retains the partial bytes.
#[derive(Debug, Default, Clone)]
pub struct TelemetryCodec;

impl Decoder for TelemetryCodec {
    type Item = Telemetry;
    type Error = io::Error;

    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Telemetry>, io::Error> {
        if src.len() < HEADER_LEN {
            return Ok(None); // header not fully arrived yet — wait
        }
        let len = u32::from_be_bytes(src[..HEADER_LEN].try_into().unwrap()) as usize;

        // Validate BEFORE using `len` to index. A bad length is a
        // protocol error, never a panic / never an OOM.
        if len == 0 || len > MAX_FRAME {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("declared frame length {len} out of bounds"),
            ));
        }
        if src.len() < HEADER_LEN + len {
            // Body not fully arrived. Reserve once so steady-state has
            // no per-frame realloc, keep the partial bytes, and wait.
            src.reserve(HEADER_LEN + len - src.len());
            return Ok(None);
        }

        src.advance(HEADER_LEN); // consume the prefix
        let body = src.split_to(len); // O(1) refcount split — not a payload copy
        if len != PAYLOAD_LEN {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("unexpected payload size {len}, want {PAYLOAD_LEN}"),
            ));
        }
        Ok(Some(Telemetry::read_payload(body)))
    }
}

impl Encoder<Telemetry> for TelemetryCodec {
    type Error = io::Error;

    fn encode(&mut self, item: Telemetry, dst: &mut BytesMut) -> Result<(), io::Error> {
        dst.reserve(HEADER_LEN + PAYLOAD_LEN);
        dst.put_u32(PAYLOAD_LEN as u32);
        item.write_payload(dst);
        Ok(())
    }
}

/// Outcome of [`run_link`]. A real command-and-control (C2) caller maps
/// `HeartbeatTimeout` to a failsafe / safe-state transition.
#[derive(Debug, PartialEq, Eq)]
pub enum LinkEvent {
    /// Peer closed the connection cleanly.
    PeerClosed,
    /// No frame arrived within `heartbeat`; the link is presumed dead.
    HeartbeatTimeout,
    /// A cooperative shutdown was requested.
    ShutdownRequested,
}

/// Production-shaped read loop: decode frames, reset a liveness
/// heartbeat on every frame, and race the read against both a
/// heartbeat deadline and a shutdown signal.
///
/// The whole point: when the heartbeat or shutdown branch wins, the
/// in-flight `framed.next()` future is dropped — but every byte it had
/// already pulled from the socket is still inside `framed`'s buffer, so
/// nothing is torn or lost. Contrast the *broken* version:
///
/// ```ignore
/// // WRONG — read_exact is NOT cancel-safe:
/// tokio::select! {
///     _ = sock.read_exact(&mut buf) => { /* ... */ }   // partial buf
///     _ = deadline => { /* buf bytes silently lost; stream desynced */ }
/// }
/// ```
pub async fn run_link<S>(
    stream: S,
    heartbeat: Duration,
    shutdown: tokio_util::sync::CancellationToken,
    mut on_frame: impl FnMut(Telemetry),
) -> io::Result<LinkEvent>
where
    S: AsyncRead + AsyncWrite + Unpin,
{
    use futures::StreamExt;
    let mut framed = tokio_util::codec::Framed::new(stream, TelemetryCodec);

    loop {
        // Fresh deadline each iteration => the heartbeat resets every
        // time a frame arrives. Cancel-safe: sleep holds no data.
        let deadline = tokio::time::sleep(heartbeat);
        tokio::pin!(deadline);

        tokio::select! {
            // `biased` is safe here: every branch *pends* until its
            // event. (A `watch` whose sender was dropped would resolve
            // immediately forever and, under `biased`, spin — which is
            // exactly why this uses a CancellationToken, not a watch.)
            biased;
            _ = shutdown.cancelled() => return Ok(LinkEvent::ShutdownRequested),
            frame = framed.next() => match frame {
                Some(Ok(f)) => on_frame(f),
                Some(Err(e)) => return Err(e),
                None => return Ok(LinkEvent::PeerClosed),
            },
            _ = &mut deadline => return Ok(LinkEvent::HeartbeatTimeout),
        }
    }
}

/// Encode one frame to wire bytes — used by the demo and tests to
/// produce deliberately-split payloads.
pub fn encode_frame(t: Telemetry) -> BytesMut {
    let mut buf = BytesMut::new();
    TelemetryCodec.encode(t, &mut buf).expect("encode is infallible");
    buf
}

src/main.rs (raw)

//! Runnable end-to-end demo for `framed-codec`.
//!
//! Uses an in-process `tokio::io::duplex` pipe (no sockets, no ports —
//! runs anywhere) to show three things:
//!
//!  1. Normal framed send/receive.
//!  2. A frame deliberately split across two writes with a pause in
//!     between — the receiver still reconstructs it intact (no torn
//!     frame across a packet boundary).
//!  3. The producer goes silent; the heartbeat fires and the link
//!     transitions to a SAFE STATE instead of hanging forever.
//!
//! Run: `cargo run --bin demo`

use framed_codec::{encode_frame, run_link, LinkEvent, Telemetry};
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let (mut producer, consumer) = tokio::io::duplex(4096);
    // Never cancelled in this demo — the link ends via the heartbeat.
    let shutdown = CancellationToken::new();

    // Consumer: run the production read loop, print every frame.
    let consumer_task = tokio::spawn(async move {
        let mut count = 0u32;
        let outcome = run_link(consumer, Duration::from_millis(300), shutdown, |f| {
            count += 1;
            println!(
                "  recv  seq={:<3} ts={:>8}us  pitch={:+.2} roll={:+.2} yaw={:+.2}",
                f.seq, f.ts_us, f.pitch, f.roll, f.yaw
            );
        })
        .await;
        (count, outcome)
    });

    println!("-- sending 5 frames (seq=3 is split mid-wire on purpose) --");
    for seq in 0..5u32 {
        let frame = Telemetry {
            seq,
            ts_us: seq as u64 * 10_000,
            pitch: seq as f32 * 0.5,
            roll: -(seq as f32),
            yaw: seq as f32 * 2.0,
        };
        let bytes = encode_frame(frame);

        if seq == 3 {
            // Split this frame across two writes with a stall between —
            // the consumer must NOT tear it.
            let mid = bytes.len() / 2;
            producer.write_all(&bytes[..mid]).await.unwrap();
            println!("  ..wrote first {mid}B of seq=3, stalling 80ms..");
            tokio::time::sleep(Duration::from_millis(80)).await;
            producer.write_all(&bytes[mid..]).await.unwrap();
        } else {
            producer.write_all(&bytes).await.unwrap();
        }
        tokio::time::sleep(Duration::from_millis(30)).await;
    }

    println!("-- producer now goes silent; heartbeat is 300ms --");
    // Keep the pipe open but send nothing. run_link's heartbeat fires.
    let (count, outcome) = consumer_task.await.unwrap();

    println!("-- received {count} frames, link outcome: {outcome:?} --");
    assert_eq!(count, 5, "all 5 frames (incl. the split one) must arrive");
    assert_eq!(outcome.unwrap(), LinkEvent::HeartbeatTimeout);
    println!("LINK SILENT -> SAFE STATE. demo OK.");

    // Hold the producer until here so the pipe isn't closed early (which
    // would surface as PeerClosed instead of the heartbeat path).
    drop(producer);
}

tests/codec.rs (raw)

//! Integration tests: round-trip, arbitrary chunk splitting, malformed
//! input, and the cancel-safety guarantee under a real `select!`.

use bytes::BytesMut;
use framed_codec::{encode_frame, Telemetry, TelemetryCodec};
use tokio_util::codec::Decoder;

fn sample(seq: u32) -> Telemetry {
    Telemetry { seq, ts_us: seq as u64 * 7, pitch: 1.0, roll: -2.5, yaw: 0.25 }
}

/// A frame delivered one byte at a time must still decode exactly once,
/// only when the final byte arrives. This is the partial-read guarantee.
#[test]
fn decodes_only_when_complete_even_byte_by_byte() {
    let wire = encode_frame(sample(42));
    let mut codec = TelemetryCodec;
    let mut buf = BytesMut::new();

    for (i, b) in wire.iter().enumerate() {
        buf.extend_from_slice(&[*b]);
        let got = codec.decode(&mut buf).unwrap();
        if i + 1 < wire.len() {
            assert!(got.is_none(), "must not decode at partial byte {i}");
        } else {
            assert_eq!(got, Some(sample(42)), "decodes exactly on last byte");
        }
    }
    assert!(buf.is_empty(), "fully consumed");
}

/// Many frames concatenated, then fed in arbitrary chunk sizes, must all
/// come out in order with nothing torn.
#[test]
fn many_frames_arbitrary_chunking() {
    let mut wire = BytesMut::new();
    for s in 0..50 {
        wire.extend_from_slice(&encode_frame(sample(s)));
    }
    let mut codec = TelemetryCodec;
    let mut buf = BytesMut::new();
    let mut out = Vec::new();
    for chunk in wire.chunks(7) {
        buf.extend_from_slice(chunk);
        while let Some(f) = codec.decode(&mut buf).unwrap() {
            out.push(f);
        }
    }
    assert_eq!(out.len(), 50);
    for (i, f) in out.iter().enumerate() {
        assert_eq!(*f, sample(i as u32));
    }
}

/// A corrupt / hostile length prefix must be a clean error, never a
/// panic or an allocation blow-up.
#[test]
fn rejects_out_of_bounds_length_without_panic() {
    let mut codec = TelemetryCodec;
    let mut buf = BytesMut::new();
    buf.extend_from_slice(&u32::to_be_bytes(0xFFFF_FFFF)); // ~4 GiB
    buf.extend_from_slice(&[0u8; 8]);
    let err = codec.decode(&mut buf).unwrap_err();
    assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
}

/// The headline guarantee: a frame split across the wire, with a
/// `select!` that *cancels the read mid-frame* in between, still decodes
/// intact. `read_exact` would lose the first half here.
#[tokio::test]
async fn no_torn_frame_across_select_cancellation() {
    use futures::StreamExt;
    use tokio::io::AsyncWriteExt;
    use tokio_util::codec::Framed;

    let (mut tx, rx) = tokio::io::duplex(64);
    let mut framed = Framed::new(rx, TelemetryCodec);
    let wire = encode_frame(sample(7));
    let mid = wire.len() / 2;

    tx.write_all(&wire[..mid]).await.unwrap();

    // Race the frame read against a short timer that WINS, cancelling
    // `framed.next()` while only half the frame is buffered.
    let first = tokio::select! {
        f = framed.next() => Some(f),
        _ = tokio::time::sleep(std::time::Duration::from_millis(20)) => None,
    };
    assert!(first.is_none(), "timer won; the read future was cancelled");

    // The half-frame bytes are still safe inside `framed`. Deliver the
    // rest and poll again — it must reconstruct, not desync.
    tx.write_all(&wire[mid..]).await.unwrap();
    let f = framed.next().await.unwrap().unwrap();
    assert_eq!(f, sample(7));
}

/// Property: any frame, split at any boundary, round-trips exactly.
mod prop {
    use super::*;
    use proptest::prelude::*;

    proptest! {
        #[test]
        fn roundtrip_any_split(
            seq in any::<u32>(), ts in any::<u64>(),
            p in any::<f32>(), r in any::<f32>(), y in any::<f32>(),
            split in 0usize..28,
        ) {
            let t = Telemetry { seq, ts_us: ts, pitch: p, roll: r, yaw: y };
            let wire = encode_frame(t);
            // `split` is drawn from 0..28 and the wire frame is 28 bytes,
            // so the first slice is always an incomplete frame.
            prop_assert!(split < wire.len());
            let mut codec = TelemetryCodec;
            let mut buf = BytesMut::new();

            buf.extend_from_slice(&wire[..split]);
            prop_assert!(codec.decode(&mut buf).unwrap().is_none());
            buf.extend_from_slice(&wire[split..]);
            let got = codec.decode(&mut buf).unwrap().unwrap();

            // NaN != NaN, so compare bit patterns for the floats.
            prop_assert_eq!(got.seq, t.seq);
            prop_assert_eq!(got.ts_us, t.ts_us);
            prop_assert_eq!(got.pitch.to_bits(), t.pitch.to_bits());
            prop_assert_eq!(got.roll.to_bits(), t.roll.to_bits());
            prop_assert_eq!(got.yaw.to_bits(), t.yaw.to_bits());
        }
    }
}