Stream Module
The Mixnet is fundamentally message-based: no persistent connections, no guaranteed ordering, no TCP. The default message API works at this level, sending individual payloads independently through Mix Nodes. This is effective for privacy but unlike how most networking code is structured.
The Stream module bridges the gap by providing persistent, bidirectional byte channels that behave like TCP sockets. Each MixnetStream implements AsyncRead (opens in a new tab) and AsyncWrite (opens in a new tab), so tokio::io::copy, codecs, BufReader/BufWriter, and any other async I/O consumer work without modification. If you're coming from socket-based networking, start here.
All streams are multiplexed over a single MixnetClient. A background router task reads a small header on each incoming message and dispatches the payload to the correct stream by ID, so multiple concurrent streams require no additional connections or gateways.
How it works
The two sides of a stream connection follow a client/server pattern:
- Opener calls
client.open_stream(recipient, surbs). This generates a randomStreamId, registers the stream locally, and sends anOpenmessage through the Mixnet. - Listener calls
listener.accept(), which blocks until anOpenarrives, registers the new stream, and returns aMixnetStreamready for reading and writing. - Both sides read and write using standard
AsyncRead/AsyncWrite. Bytes are wrapped in a 16-byte LP frame header (stream ID, message type, sequence number), routed through the Mixnet, and demultiplexed on arrival. - Cleanup happens on
drop. The stream deregisters from the local router. No close message is sent over the wire, since a close could race ahead of in-flight data.
┌─────────────────────────────────────────────────────────┐
│ MixnetClient │
│ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ MixnetStream │ │ MixnetStream │ ... │
│ │ (peer A) │ │ (peer B) │ │
│ └──────┬───────┘ └──────┬───────┘ │
│ │writes │writes │
│ ▼ ▼ │
│ ┌─────────────────────────────────┐ │
│ │ ClientInput.input_sender │ │
│ └──────────────┬──────────────────┘ │
│ │ │
│ ▼ │
│ ── mixnet ── │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────┐ │
│ │ reconstructed_receiver │ │
│ └──────────────┬──────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────┐ │
│ │ Router task │ │
│ │ decode header → dispatch by ID │ │
│ └──┬──────────────────────────┬───┘ │
│ │ Open messages │ Data messages │
│ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────────┐ │
│ │MixnetListener│ │ StreamMap lookup │ │
│ │ .accept() │ │ → per-stream tx │ │
│ └──────────────┘ └──────────────────┘ │
└─────────────────────────────────────────────────────────┘Complete example
A minimal example with two clients on the same machine: one opens a stream to the other, sends a message, and reads a reply.
use nym_sdk::mixnet;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use std::time::Duration;
const TIMEOUT: Duration = Duration::from_secs(60);
#[tokio::main]
async fn main() {
// Connect two ephemeral clients
let mut sender = mixnet::MixnetClient::connect_new().await.unwrap();
let mut receiver = mixnet::MixnetClient::connect_new().await.unwrap();
let receiver_addr = *receiver.nym_address();
// The receiver creates a listener (activates stream mode)
let mut listener = receiver.listener().unwrap();
// The sender opens a stream to the receiver's Nym address
let mut outbound = sender.open_stream(receiver_addr, None).await.unwrap();
// The receiver accepts the incoming stream
let mut inbound = tokio::time::timeout(TIMEOUT, listener.accept())
.await
.expect("timed out")
.expect("listener closed");
// Send data and read it back — just like a TCP socket
outbound.write_all(b"hello from sender").await.unwrap();
outbound.flush().await.unwrap();
let mut buf = vec![0u8; 1024];
let n = tokio::time::timeout(TIMEOUT, inbound.read(&mut buf))
.await
.expect("timed out")
.expect("read failed");
println!("Receiver got: {}", String::from_utf8_lossy(&buf[..n]));
// Reply back through the same stream
inbound.write_all(b"hello from receiver").await.unwrap();
inbound.flush().await.unwrap();
let n = tokio::time::timeout(TIMEOUT, outbound.read(&mut buf))
.await
.expect("timed out")
.expect("read failed");
println!("Sender got: {}", String::from_utf8_lossy(&buf[..n]));
// Streams deregister on drop, then disconnect clients
drop(outbound);
drop(inbound);
sender.disconnect().await;
receiver.disconnect().await;
}The receiver replies via reply SURBs (Single Use Reply Blocks) and never learns the sender's Nym address.
When to use streams vs messages
| Messages | Streams | TcpProxy | |
|---|---|---|---|
| Pattern | Raw message payloads | Persistent bidirectional channels | TCP socket proxying |
| API | send_plain_message() / wait_for_messages() | AsyncRead + AsyncWrite | Localhost TCP socket |
| Multiplexing | N/A | Multiple streams per client | One client per TCP connection |
| Ordering | No guarantees | Sequence-based reordering | Session-based ordering |
| Best for | Simple notifications, one-shot requests | Interactive protocols, streaming data, any code expecting async I/O | Wrapping existing TCP applications |
| Status | Stable | New | Deprecated |
Streams and messages are mutually exclusive. Once you call open_stream() or listener(), the message-based API (send_plain_message, wait_for_messages) is permanently disabled on that client. This is a one-way transition: there is no switching back without disconnecting and reconnecting. See the stream_mode_guard.rs example (opens in a new tab) for details.
Next steps
- Tutorial: Build a private echo server: server and client communicating over streams
- Architecture: wire protocol, router task, data flow, stream cleanup, and known limitations
- Examples: annotated walkthroughs of the SDK examples (multi-stream, idle timeout, throughput testing)