Personal systems project · Rust / tokio · source below, raw files linked per section
# framed-codec

**Cancel-safe, zero-allocation-per-frame length-prefixed framing for tokio byte streams.**

A small, self-contained building block for streaming binary telemetry (drone/robot links, sensor feeds, any binary wire protocol) over anything that is `AsyncRead + AsyncWrite`: TCP, a Unix socket, or an in-process pipe.

## The problem

Two bugs recur in hand-rolled async framing code:

1. **Torn frames across packet boundaries.** A `read()` returns whatever bytes happened to arrive, frequently *half* a frame. Code that assumes "one read = one frame" silently corrupts the stream.
2. **Lost data on cancellation.** `read_exact()` into a stack buffer inside a `tokio::select!` is **not cancel-safe**: if another branch wins, the partially-read bytes on the stack are discarded and the stream is desynchronised permanently.

## The approach

A length-prefixed `tokio_util::codec` `Decoder`/`Encoder`. All partial-read state lives inside the `Framed` buffer, never on the stack, so the read future (`StreamExt::next`) **is** cancel-safe: a `select!` can drop it mid-frame and the next poll resumes exactly where it left off.

The decoder validates the declared length *before* indexing, so a hostile or corrupt length is a clean protocol error, never a panic or an OOM. Decoding a frame produces a `Copy` value with no per-frame heap allocation.

`run_link` is the production-shaped read loop. It decodes frames, resets a liveness heartbeat on every frame, and races the read against a heartbeat deadline and a `CancellationToken`, mapping silence to a safe-state transition instead of an indefinite hang.

## Run it

```bash
cargo run --bin demo   # end-to-end: split frame + heartbeat → safe state
cargo test             # unit + integration + property tests
```

The demo uses an in-process `tokio::io::duplex` pipe (no sockets, no ports; it runs anywhere). It deliberately splits one frame across two writes with a stall between them to prove the receiver reconstructs it intact, then goes silent to show the heartbeat tripping a safe state.

## What the tests prove

- `decodes_only_when_complete_even_byte_by_byte`: a frame fed one byte at a time decodes exactly once, on the final byte.
- `many_frames_arbitrary_chunking`: 50 frames fed in 7-byte chunks all reconstruct in order.
- `rejects_out_of_bounds_length_without_panic`: a ~4 GiB declared length is a clean `InvalidData` error.
- `no_torn_frame_across_select_cancellation`: the headline guarantee. A frame split across the wire, with a `select!` that cancels the read mid-frame, still decodes intact.
- `roundtrip_any_split` (proptest): any frame, split at any boundary, round-trips bit-exact.

## License

MIT OR Apache-2.0.
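The decode rule described under *The approach* can be sketched with nothing but `std`. The decoder waits for the 4-byte header, validates the declared length, waits for the body, and then consumes exactly one frame. This is a simplified stand-in, not the crate's `TelemetryCodec`. `FrameBuf`, `try_frame` and `DecodeError` are hypothetical names, and the payload is treated as opaque bytes. Unlike the real decoder, it also copies each payload into a fresh `Vec`.

```rust
// Std-only sketch of the length-prefixed decode rule `[u32 BE length][payload]`.
// `FrameBuf`, `try_frame` and `DecodeError` are hypothetical names, not the crate's API.

const HEADER_LEN: usize = 4;
const MAX_FRAME: usize = 64 * 1024; // same cap value as the crate's MAX_FRAME

#[derive(Debug, PartialEq)]
enum DecodeError {
    BadLength(usize),
}

/// Accumulates whatever each read returned, so a frame split across reads is never torn.
#[derive(Default)]
struct FrameBuf {
    buf: Vec<u8>,
}

impl FrameBuf {
    fn push(&mut self, bytes: &[u8]) {
        self.buf.extend_from_slice(bytes);
    }

    /// `Ok(None)`: wait for more bytes. `Ok(Some(p))`: one complete payload, consumed.
    fn try_frame(&mut self) -> Result<Option<Vec<u8>>, DecodeError> {
        if self.buf.len() < HEADER_LEN {
            return Ok(None); // header incomplete
        }
        let mut hdr = [0u8; HEADER_LEN];
        hdr.copy_from_slice(&self.buf[..HEADER_LEN]);
        let len = u32::from_be_bytes(hdr) as usize;
        // Validate before `len` is used to slice: a bad length is an error, not a panic.
        if len == 0 || len > MAX_FRAME {
            return Err(DecodeError::BadLength(len));
        }
        if self.buf.len() < HEADER_LEN + len {
            return Ok(None); // body incomplete; partial bytes stay in `buf`
        }
        let payload = self.buf[HEADER_LEN..HEADER_LEN + len].to_vec();
        self.buf.drain(..HEADER_LEN + len); // consume exactly one frame
        Ok(Some(payload))
    }
}

fn main() {
    let mut wire = 3u32.to_be_bytes().to_vec();
    wire.extend_from_slice(b"abc");

    let mut fb = FrameBuf::default();
    fb.push(&wire[..5]); // header plus one body byte: half a frame
    assert_eq!(fb.try_frame(), Ok(None));
    fb.push(&wire[5..]);
    assert_eq!(fb.try_frame(), Ok(Some(b"abc".to_vec())));

    let mut hostile = FrameBuf::default();
    hostile.push(&u32::MAX.to_be_bytes()); // ~4 GiB declared length
    println!("{:?}", hostile.try_frame());
}
```

Returning `Ok(None)` rather than blocking is what lets the caller keep partial bytes between reads. In the crate, `Framed` owns that buffer and hands it back to `decode` as more bytes arrive.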
# Cargo.toml
[package]
name = "framed-codec"
version = "0.1.0"
edition = "2021"
description = "Cancel-safe, zero-allocation-per-frame length-prefixed framing for tokio byte streams."
license = "MIT OR Apache-2.0"
publish = false

[dependencies]
tokio = { version = "1", features = ["rt-multi-thread", "macros", "io-util", "time", "sync"] }
tokio-util = { version = "0.7", features = ["codec", "rt"] }
bytes = "1"
futures = "0.3"

[dev-dependencies]
proptest = "1"
tokio = { version = "1", features = ["rt-multi-thread", "macros", "io-util", "time", "sync", "test-util"] }

[[bin]]
name = "demo"
path = "src/main.rs"

[lib]
name = "framed_codec"
path = "src/lib.rs"
// src/lib.rs
//! # framed-codec
//!
//! A cancel-safe, zero-allocation-per-frame length-prefixed codec for
//! streaming binary telemetry over a tokio byte stream (TCP, a UDS, an
//! in-process duplex, anything `AsyncRead + AsyncWrite`).
//!
//! ## The problem this solves
//!
//! Two bugs show up again and again in hand-rolled async framing code:
//!
//! 1. **Torn frames across packet boundaries.** A `read()` returns
//!    whatever bytes happen to have arrived — often *half* a frame. Naive
//!    code that assumes one read == one frame corrupts the stream.
//! 2. **Lost data on cancellation.** If you `read_exact()` into a
//!    stack buffer inside a `tokio::select!` and another branch wins, the
//!    partially-read bytes on the stack are dropped on the floor and the
//!    stream is desynchronised forever. `read_exact` is **not
//!    cancel-safe**.
//!
//! This crate fixes both by keeping all partial-read state inside a
//! [`tokio_util::codec::Framed`] buffer instead of on the stack. The
//! frame future ([`futures::StreamExt::next`]) is cancel-safe: if a
//! `select!` drops it mid-frame, the bytes already pulled from the
//! socket stay buffered and the next poll resumes exactly where it left
//! off. No torn frames, no desync, no per-frame heap allocation.
//!
//! See [`run_link`] for the production-shaped read loop, and the
//! `demo` binary for a runnable end-to-end example.

use bytes::{Buf, BufMut, BytesMut};
use std::io;
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::codec::{Decoder, Encoder};

/// Fixed telemetry payload: `seq(u32) ts_us(u64) pitch/roll/yaw(f32)`,
/// big-endian, 24 bytes. `Copy` — decoding one produces a value, never a
/// heap allocation.
pub const PAYLOAD_LEN: usize = 4 + 8 + 4 + 4 + 4; // = 24
const HEADER_LEN: usize = 4; // u32 big-endian length prefix

/// Reject absurd declared lengths *before* indexing the buffer. An
/// attacker (or a corrupt link) sending a 4 GiB length must not panic or
/// OOM the process — it must be a clean protocol error.
pub const MAX_FRAME: usize = 64 * 1024;

/// One decoded telemetry sample. `Copy`, no owned heap data.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Telemetry {
    pub seq: u32,
    pub ts_us: u64,
    pub pitch: f32,
    pub roll: f32,
    pub yaw: f32,
}

impl Telemetry {
    fn write_payload(&self, dst: &mut BytesMut) {
        dst.put_u32(self.seq);
        dst.put_u64(self.ts_us);
        dst.put_f32(self.pitch);
        dst.put_f32(self.roll);
        dst.put_f32(self.yaw);
    }

    /// `src` is exactly [`PAYLOAD_LEN`] bytes. Reads fields directly out
    /// of the framing buffer — no intermediate allocation.
    fn read_payload(mut src: impl Buf) -> Telemetry {
        Telemetry {
            seq: src.get_u32(),
            ts_us: src.get_u64(),
            pitch: src.get_f32(),
            roll: src.get_f32(),
            yaw: src.get_f32(),
        }
    }
}

/// Length-prefixed codec: `[u32 BE length][payload]`.
///
/// The length prefix is what makes this robust to a payload split
/// across N socket reads: the decoder simply returns `Ok(None)` until
/// the full frame is buffered, and `Framed` retains the partial bytes.
#[derive(Debug, Default, Clone)]
pub struct TelemetryCodec;

impl Decoder for TelemetryCodec {
    type Item = Telemetry;
    type Error = io::Error;

    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Telemetry>, io::Error> {
        if src.len() < HEADER_LEN {
            return Ok(None); // header not fully arrived yet — wait
        }
        let len = u32::from_be_bytes(src[..HEADER_LEN].try_into().unwrap()) as usize;
        // Validate BEFORE using `len` to index. A bad length is a
        // protocol error, never a panic / never an OOM.
        if len == 0 || len > MAX_FRAME {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("declared frame length {len} out of bounds"),
            ));
        }
        if src.len() < HEADER_LEN + len {
            // Body not fully arrived. Reserve once so steady-state has
            // no per-frame realloc, keep the partial bytes, and wait.
            src.reserve(HEADER_LEN + len - src.len());
            return Ok(None);
        }
        src.advance(HEADER_LEN); // consume the prefix
        let body = src.split_to(len); // O(1) refcount split — not a payload copy
        if len != PAYLOAD_LEN {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("unexpected payload size {len}, want {PAYLOAD_LEN}"),
            ));
        }
        Ok(Some(Telemetry::read_payload(body)))
    }
}

impl Encoder<Telemetry> for TelemetryCodec {
    type Error = io::Error;

    fn encode(&mut self, item: Telemetry, dst: &mut BytesMut) -> Result<(), io::Error> {
        dst.reserve(HEADER_LEN + PAYLOAD_LEN);
        dst.put_u32(PAYLOAD_LEN as u32);
        item.write_payload(dst);
        Ok(())
    }
}

/// Outcome of [`run_link`]. A real C2 caller maps `HeartbeatTimeout` to
/// a failsafe / safe-state transition.
#[derive(Debug, PartialEq, Eq)]
pub enum LinkEvent {
    /// Peer closed the connection cleanly.
    PeerClosed,
    /// No frame for `heartbeat` — the link is presumed dead.
    HeartbeatTimeout,
    /// A cooperative shutdown was requested.
    ShutdownRequested,
}

/// Production-shaped read loop: decode frames, reset a liveness
/// heartbeat on every frame, and race the read against both a
/// heartbeat deadline and a shutdown signal.
///
/// The whole point: when the heartbeat or shutdown branch wins, the
/// in-flight `framed.next()` future is dropped — but every byte it had
/// already pulled from the socket is still inside `framed`'s buffer, so
/// nothing is torn or lost. Contrast the *broken* version:
///
/// ```ignore
/// // WRONG — read_exact is NOT cancel-safe:
/// tokio::select! {
///     _ = sock.read_exact(&mut buf) => { /* ... */ } // partial buf
///     _ = deadline => { /* buf bytes silently lost; stream desynced */ }
/// }
/// ```
pub async fn run_link<S>(
    stream: S,
    heartbeat: Duration,
    shutdown: tokio_util::sync::CancellationToken,
    mut on_frame: impl FnMut(Telemetry),
) -> io::Result<LinkEvent>
where
    S: AsyncRead + AsyncWrite + Unpin,
{
    use futures::StreamExt;

    let mut framed = tokio_util::codec::Framed::new(stream, TelemetryCodec);
    loop {
        // Fresh deadline each iteration => the heartbeat resets every
        // time a frame arrives. Cancel-safe: sleep holds no data.
        let deadline = tokio::time::sleep(heartbeat);
        tokio::pin!(deadline);
        tokio::select! {
            // `biased` polls the branches top to bottom: shutdown takes
            // priority, and a frame that is already buffered is taken
            // before a deadline that expired in the same poll. That is
            // only sound because every branch *pends* until its event. A
            // future that is always ready (e.g. `watch::Receiver::changed()`
            // after its sender is dropped, which returns `Err` at once)
            // would starve every branch below it. Hence a
            // CancellationToken for shutdown, not a `watch` channel.
            biased;
            _ = shutdown.cancelled() => return Ok(LinkEvent::ShutdownRequested),
            frame = framed.next() => match frame {
                Some(Ok(f)) => on_frame(f),
                Some(Err(e)) => return Err(e),
                None => return Ok(LinkEvent::PeerClosed),
            },
            _ = &mut deadline => return Ok(LinkEvent::HeartbeatTimeout),
        }
    }
}

/// Encode one frame to wire bytes — used by the demo and tests to
/// produce deliberately-split payloads.
pub fn encode_frame(t: Telemetry) -> BytesMut {
    let mut buf = BytesMut::new();
    TelemetryCodec.encode(t, &mut buf).expect("encode is infallible");
    buf
}
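The two failure modes described in the crate docs above can be reproduced without tokio. This is a std-only simulation, not the crate's code. `Wire`, `naive_read`, `Buffered`, `run` and the fixed 8-byte frame are hypothetical. A "cancelled" read is modelled as the wire running dry mid-frame, which is the moment a `select!` timer would win.

```rust
// Std-only simulation, not the crate's code. `Wire`, `naive_read`, `Buffered`,
// `run` and the fixed 8-byte frame are hypothetical.
use std::collections::VecDeque;

/// Hypothetical fixed frame: 4-byte BE length (always 4) + 4-byte payload.
const FRAME: usize = 8;

fn frame(payload: [u8; 4]) -> [u8; FRAME] {
    let mut f = [0u8; FRAME];
    f[..4].copy_from_slice(&4u32.to_be_bytes());
    f[4..].copy_from_slice(&payload);
    f
}

/// Bytes that have arrived on the "socket" but have not been read yet.
#[derive(Default)]
struct Wire {
    ready: VecDeque<u8>,
}

impl Wire {
    fn deliver(&mut self, bytes: &[u8]) {
        self.ready.extend(bytes.iter().copied());
    }
}

/// Models `read_exact` into a stack buffer inside `select!`: when the wire runs
/// dry the read is "cancelled" and `local` is dropped with the bytes it holds.
fn naive_read(wire: &mut Wire) -> Option<[u8; FRAME]> {
    let mut local = [0u8; FRAME];
    for slot in local.iter_mut() {
        *slot = wire.ready.pop_front()?; // early return discards bytes already read
    }
    Some(local)
}

/// Models `Framed`: partial bytes live in a buffer that outlives each attempt.
#[derive(Default)]
struct Buffered {
    pending: Vec<u8>,
}

impl Buffered {
    fn read(&mut self, wire: &mut Wire) -> Option<[u8; FRAME]> {
        self.pending.extend(wire.ready.drain(..)); // keep everything that arrived
        if self.pending.len() < FRAME {
            return None; // "cancelled", but nothing is lost
        }
        let mut out = [0u8; FRAME];
        out.copy_from_slice(&self.pending[..FRAME]);
        self.pending.drain(..FRAME);
        Some(out)
    }
}

/// Same schedule for either reader: half of frame A, a cancelled read,
/// then the rest of A plus all of B. Returns the frames the reader produced.
fn run(mut read: impl FnMut(&mut Wire) -> Option<[u8; FRAME]>) -> Vec<[u8; FRAME]> {
    let (a, b) = (frame([1, 2, 3, 4]), frame([5, 6, 7, 8]));
    let mut wire = Wire::default();
    let mut out = Vec::new();
    wire.deliver(&a[..5]);
    out.extend(read(&mut wire)); // wire runs dry mid-frame: the "timer wins"
    wire.deliver(&a[5..]);
    wire.deliver(&b);
    while let Some(f) = read(&mut wire) {
        out.push(f);
    }
    out
}

fn main() {
    let naive = run(naive_read);
    let mut buffered = Buffered::default();
    let intact = run(|w| buffered.read(w));
    println!("naive read:    {:?}", naive);
    println!("buffered read: {:?}", intact);
}
```

Traced by hand, `naive_read` loses the five bytes it held when it was cancelled. Its only "frame" is therefore `[2, 3, 4, 0, 0, 0, 4, 5]`, whose length field reads `0x02030400`: the stream is desynchronised. `Buffered` returns frame A and then frame B intact, which is the role `Framed` plays in `run_link`.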
// src/main.rs
//! Runnable end-to-end demo for `framed-codec`.
//!
//! Uses an in-process `tokio::io::duplex` pipe (no sockets, no ports —
//! runs anywhere) to show three things:
//!
//! 1. Normal framed send/receive.
//! 2. A frame deliberately split across two writes with a pause in
//!    between — the receiver still reconstructs it intact (no torn
//!    frame across a packet boundary).
//! 3. The producer goes silent; the heartbeat fires and the link
//!    transitions to a SAFE STATE instead of hanging forever.
//!
//! Run: `cargo run --bin demo`

use framed_codec::{encode_frame, run_link, LinkEvent, Telemetry};
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let (mut producer, consumer) = tokio::io::duplex(4096);
    // Never cancelled in this demo — the link ends via the heartbeat.
    let shutdown = CancellationToken::new();

    // Consumer: run the production read loop, print every frame.
    let consumer_task = tokio::spawn(async move {
        let mut count = 0u32;
        let outcome = run_link(consumer, Duration::from_millis(300), shutdown, |f| {
            count += 1;
            println!(
                " recv seq={:<3} ts={:>8}us pitch={:+.2} roll={:+.2} yaw={:+.2}",
                f.seq, f.ts_us, f.pitch, f.roll, f.yaw
            );
        })
        .await;
        (count, outcome)
    });

    println!("-- sending 5 frames (frame #3 is split mid-wire on purpose) --");
    for seq in 0..5u32 {
        let frame = Telemetry {
            seq,
            ts_us: seq as u64 * 10_000,
            pitch: seq as f32 * 0.5,
            roll: -(seq as f32),
            yaw: seq as f32 * 2.0,
        };
        let bytes = encode_frame(frame);
        if seq == 3 {
            // Split this frame across two writes with a stall between —
            // the consumer must NOT tear it.
            let mid = bytes.len() / 2;
            producer.write_all(&bytes[..mid]).await.unwrap();
            println!(" ..wrote first {mid}B of seq=3, stalling 80ms..");
            tokio::time::sleep(Duration::from_millis(80)).await;
            producer.write_all(&bytes[mid..]).await.unwrap();
        } else {
            producer.write_all(&bytes).await.unwrap();
        }
        tokio::time::sleep(Duration::from_millis(30)).await;
    }

    println!("-- producer now goes silent; heartbeat is 300ms --");
    // Keep the pipe open but send nothing. run_link's heartbeat fires.
    let (count, outcome) = consumer_task.await.unwrap();
    println!("-- received {count} frames, link outcome: {outcome:?} --");
    assert_eq!(count, 5, "all 5 frames (incl. the split one) must arrive");
    assert_eq!(outcome.unwrap(), LinkEvent::HeartbeatTimeout);
    println!("LINK SILENT -> SAFE STATE. demo OK.");

    // Hold the producer until here so the pipe isn't closed early (which
    // would surface as PeerClosed instead of the heartbeat path).
    drop(producer);
}
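The heartbeat path this demo exercises has a direct blocking analogue in `std`: every frame resets a 300 ms deadline, and silence ends the link with `HeartbeatTimeout`. `mpsc::Receiver::recv_timeout` arms a fresh timeout on each call, much as `run_link` creates a fresh `sleep` on each loop iteration. `run_link_blocking` below is a hypothetical illustration, not part of the crate. It omits the cancellation branch and uses a channel of `u32` sequence numbers in place of decoded frames.

```rust
// Hypothetical blocking analogue of `run_link`, not part of the crate.
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// The two `LinkEvent` outcomes a loop without a shutdown branch can reach.
#[derive(Debug, PartialEq)]
enum LinkEvent {
    PeerClosed,
    HeartbeatTimeout,
}

/// Each `recv_timeout` call arms a fresh timeout, so every frame resets the heartbeat.
fn run_link_blocking<T>(
    rx: &mpsc::Receiver<T>,
    heartbeat: Duration,
    mut on_frame: impl FnMut(T),
) -> LinkEvent {
    loop {
        match rx.recv_timeout(heartbeat) {
            Ok(frame) => on_frame(frame),
            Err(RecvTimeoutError::Timeout) => return LinkEvent::HeartbeatTimeout,
            Err(RecvTimeoutError::Disconnected) => return LinkEvent::PeerClosed,
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel::<u32>();
    let producer = thread::spawn(move || {
        for seq in 0..5 {
            tx.send(seq).unwrap();
            thread::sleep(Duration::from_millis(30));
        }
        // Go silent for longer than the heartbeat but keep `tx` alive, so the
        // consumer sees a timeout rather than a closed channel.
        thread::sleep(Duration::from_millis(600));
        drop(tx);
    });
    let mut count = 0;
    let event = run_link_blocking(&rx, Duration::from_millis(300), |_seq| count += 1);
    println!("received {} frames, outcome {:?}", count, event);
    producer.join().unwrap();
}
```

As in the demo, the sender is held open through the silence. With `std::sync::mpsc`, frames already queued are still delivered after the sender is dropped, and only then does `recv_timeout` report `Disconnected`, which maps to `PeerClosed`.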
//! Integration tests: round-trip, arbitrary chunk splitting, malformed
//! input, and the cancel-safety guarantee under a real `select!`.

use bytes::BytesMut;
use framed_codec::{encode_frame, Telemetry, TelemetryCodec};
use tokio_util::codec::Decoder;

fn sample(seq: u32) -> Telemetry {
    Telemetry { seq, ts_us: seq as u64 * 7, pitch: 1.0, roll: -2.5, yaw: 0.25 }
}

/// A frame delivered one byte at a time must still decode exactly once,
/// only when the final byte arrives. This is the partial-read guarantee.
#[test]
fn decodes_only_when_complete_even_byte_by_byte() {
    let wire = encode_frame(sample(42));
    let mut codec = TelemetryCodec;
    let mut buf = BytesMut::new();
    for (i, b) in wire.iter().enumerate() {
        buf.extend_from_slice(&[*b]);
        let got = codec.decode(&mut buf).unwrap();
        if i + 1 < wire.len() {
            assert!(got.is_none(), "must not decode at partial byte {i}");
        } else {
            assert_eq!(got, Some(sample(42)), "decodes exactly on last byte");
        }
    }
    assert!(buf.is_empty(), "fully consumed");
}

/// Many frames concatenated, then fed in 7-byte chunks so that every
/// 28-byte frame (4-byte header + 24-byte payload) arrives as four
/// separate pieces, must all come out in order with nothing torn.
#[test]
fn many_frames_arbitrary_chunking() {
    let mut wire = BytesMut::new();
    for s in 0..50 {
        wire.extend_from_slice(&encode_frame(sample(s)));
    }
    let mut codec = TelemetryCodec;
    let mut buf = BytesMut::new();
    let mut out = Vec::new();
    for chunk in wire.chunks(7) {
        buf.extend_from_slice(chunk);
        while let Some(f) = codec.decode(&mut buf).unwrap() {
            out.push(f);
        }
    }
    assert_eq!(out.len(), 50);
    for (i, f) in out.iter().enumerate() {
        assert_eq!(*f, sample(i as u32));
    }
}

/// A corrupt / hostile length prefix must be a clean error, never a
/// panic or an allocation blow-up.
#[test]
fn rejects_out_of_bounds_length_without_panic() {
    let mut codec = TelemetryCodec;
    let mut buf = BytesMut::new();
    buf.extend_from_slice(&u32::to_be_bytes(0xFFFF_FFFF)); // ~4 GiB
    buf.extend_from_slice(&[0u8; 8]);
    let err = codec.decode(&mut buf).unwrap_err();
    assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
}

/// The headline guarantee: a frame split across the wire, with a
/// `select!` that *cancels the read mid-frame* in between, still decodes
/// intact. `read_exact` would lose the first half here.
#[tokio::test]
async fn no_torn_frame_across_select_cancellation() {
    use futures::StreamExt;
    use tokio::io::AsyncWriteExt;
    use tokio_util::codec::Framed;

    let (mut tx, rx) = tokio::io::duplex(64);
    let mut framed = Framed::new(rx, TelemetryCodec);
    let wire = encode_frame(sample(7));
    let mid = wire.len() / 2;
    tx.write_all(&wire[..mid]).await.unwrap();

    // Race the frame read against a short timer that WINS, cancelling
    // `framed.next()` while only half the frame is buffered.
    let first = tokio::select! {
        f = framed.next() => Some(f),
        _ = tokio::time::sleep(std::time::Duration::from_millis(20)) => None,
    };
    assert!(first.is_none(), "timer won; the read future was cancelled");

    // The half-frame bytes are still safe inside `framed`. Deliver the
    // rest and poll again — it must reconstruct, not desync.
    tx.write_all(&wire[mid..]).await.unwrap();
    let f = framed.next().await.unwrap().unwrap();
    assert_eq!(f, sample(7));
}

/// Property: for any frame split at any strict-prefix boundary, the
/// prefix decodes nothing and the remainder completes exactly one
/// bit-identical frame.
mod prop {
    use super::*;
    use proptest::prelude::*;

    proptest! {
        #[test]
        fn roundtrip_any_split(
            seq in any::<u32>(), ts in any::<u64>(),
            p in any::<f32>(), r in any::<f32>(), y in any::<f32>(),
            // Every strict prefix of the 28-byte frame (4 B header + 24 B payload).
            split in 0usize..28,
        ) {
            let t = Telemetry { seq, ts_us: ts, pitch: p, roll: r, yaw: y };
            let wire = encode_frame(t);
            prop_assert_eq!(wire.len(), 28);
            let mut codec = TelemetryCodec;
            let mut buf = BytesMut::new();
            // A strict prefix must never yield a frame...
            buf.extend_from_slice(&wire[..split]);
            prop_assert!(codec.decode(&mut buf).unwrap().is_none());
            // ...and the remainder must complete exactly one frame.
            buf.extend_from_slice(&wire[split..]);
            let got = codec.decode(&mut buf).unwrap().unwrap();
            prop_assert!(buf.is_empty());
            // NaN != NaN, so compare bit patterns for the floats.
            prop_assert_eq!(got.seq, t.seq);
            prop_assert_eq!(got.ts_us, t.ts_us);
            prop_assert_eq!(got.pitch.to_bits(), t.pitch.to_bits());
            prop_assert_eq!(got.roll.to_bits(), t.roll.to_bits());
            prop_assert_eq!(got.yaw.to_bits(), t.yaw.to_bits());
        }
    }
}
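The byte layout `encode_frame` produces can be checked with `std` alone, together with the bit-pattern comparison used in `roundtrip_any_split`. The frame is a 4-byte big-endian length of 24, followed by `seq`, `ts_us`, `pitch`, `roll` and `yaw`, all big-endian. The `encode`, `decode` and `bit_exact` helpers below are hypothetical. They are hand-written from the documented field order and do not call the crate.

```rust
// Std-only sketch of the 28-byte frame layout, written from the documented
// field order. `encode`, `decode` and `bit_exact` are hypothetical helpers.

const PAYLOAD_LEN: usize = 4 + 8 + 4 + 4 + 4; // 24
const FRAME_LEN: usize = 4 + PAYLOAD_LEN; // length prefix + payload = 28

#[derive(Debug, Clone, Copy)]
struct Telemetry {
    seq: u32,
    ts_us: u64,
    pitch: f32,
    roll: f32,
    yaw: f32,
}

fn encode(t: &Telemetry) -> [u8; FRAME_LEN] {
    let mut out = [0u8; FRAME_LEN];
    out[0..4].copy_from_slice(&(PAYLOAD_LEN as u32).to_be_bytes()); // length prefix
    out[4..8].copy_from_slice(&t.seq.to_be_bytes());
    out[8..16].copy_from_slice(&t.ts_us.to_be_bytes());
    out[16..20].copy_from_slice(&t.pitch.to_be_bytes()); // IEEE-754 bits, big-endian
    out[20..24].copy_from_slice(&t.roll.to_be_bytes());
    out[24..28].copy_from_slice(&t.yaw.to_be_bytes());
    out
}

fn read_u32(f: &[u8], at: usize) -> u32 {
    let mut b = [0u8; 4];
    b.copy_from_slice(&f[at..at + 4]);
    u32::from_be_bytes(b)
}

fn read_u64(f: &[u8], at: usize) -> u64 {
    let mut b = [0u8; 8];
    b.copy_from_slice(&f[at..at + 8]);
    u64::from_be_bytes(b)
}

/// Reads the fields back at fixed offsets, in the order they were written.
fn decode(f: &[u8; FRAME_LEN]) -> Telemetry {
    Telemetry {
        seq: read_u32(f, 4),
        ts_us: read_u64(f, 8),
        pitch: f32::from_bits(read_u32(f, 16)),
        roll: f32::from_bits(read_u32(f, 20)),
        yaw: f32::from_bits(read_u32(f, 24)),
    }
}

/// Compares floats by bit pattern (as the proptest does), so NaN matches NaN.
fn bit_exact(a: &Telemetry, b: &Telemetry) -> bool {
    a.seq == b.seq
        && a.ts_us == b.ts_us
        && a.pitch.to_bits() == b.pitch.to_bits()
        && a.roll.to_bits() == b.roll.to_bits()
        && a.yaw.to_bits() == b.yaw.to_bits()
}

fn main() {
    let t = Telemetry { seq: 7, ts_us: 49, pitch: 1.0, roll: f32::NAN, yaw: 0.25 };
    let wire = encode(&t);
    println!("{:02x?}", wire);
    let back = decode(&wire);
    // `==` on the NaN field is false; the bit comparison is true.
    println!("roll == roll: {}, bit_exact: {}", back.roll == t.roll, bit_exact(&t, &back));
}
```

The prefix bytes come out as `00 00 00 18` (24), and `seq = 7` occupies bytes 4 to 7 as `00 00 00 07`. The final line prints `false` for the plain float comparison and `true` for `bit_exact`. That difference is why the proptest compares `to_bits()` rather than the floats themselves.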