Tutorial: Build a Private Echo Server

In this tutorial you'll build two programs: a server that listens for incoming streams and echoes back whatever it receives, and a client that opens a stream, sends data, and reads the echo. Both communicate through the Nym Mixnet using AsyncRead and AsyncWrite, just like TCP sockets.

What you'll learn

  • Setting up a MixnetListener to accept incoming streams
  • Opening an outbound stream with open_stream()
  • Reading and writing with standard tokio I/O traits
  • How streams are multiplexed over a single MixnetClient
  • Clean shutdown and stream lifecycle
Code verified against commit 97068b2. If the API has changed since then, check the examples in the repo for the latest usage.

Prerequisites

  • Rust toolchain (1.70+)
  • A working internet connection (clients connect to the live Nym Mixnet)

Step 1: Set up the project

cargo init nym-echo
cd nym-echo
rm src/main.rs

Add dependencies to Cargo.toml:

[dependencies]
nym-sdk = { git = "https://github.com/nymtech/nym", rev = "97068b2" }
nym-bin-common = { git = "https://github.com/nymtech/nym", rev = "97068b2", features = ["basic_tracing"] }
tokio = { version = "1", features = ["full"] }
rand = "0.8"
blake3 = "=1.7.0"  # required pin — see https://nymtech.net/docs/developers/rust/importing

Step 2: Build the echo server

The server connects a MixnetClient, creates a listener, and accepts streams in a loop. Each stream gets its own task that reads data and writes it back.

Create src/bin/server.rs:

use nym_sdk::mixnet;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
 
#[tokio::main]
async fn main() {
    nym_bin_common::logging::setup_tracing_logger();
 
    // Connect to the Mixnet
    let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
    println!("Echo server listening at: {}", client.nym_address());
 
    // Create a listener — this activates stream mode.
    // From this point, message-based methods are disabled.
    let mut listener = client.listener().unwrap();
 
    // Accept streams in a loop
    loop {
        let mut stream = match listener.accept().await {
            Some(s) => s,
            None => {
                println!("Listener closed");
                break;
            }
        };
 
        let stream_id = stream.id();
        println!("Accepted stream {stream_id}");
 
        // Spawn a task to handle each stream concurrently
        tokio::spawn(async move {
            let mut buf = vec![0u8; 32_000];
 
            loop {
                let n = match stream.read(&mut buf).await {
                    Ok(0) => break,     // EOF — stream closed
                    Ok(n) => n,
                    Err(e) => {
                        eprintln!("Stream {stream_id} read error: {e}");
                        break;
                    }
                };
 
                let data = &buf[..n];
                println!("Stream {stream_id} received {n} bytes");
 
                // Echo it back
                if let Err(e) = stream.write_all(data).await {
                    eprintln!("Stream {stream_id} write error: {e}");
                    break;
                }
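                // Handle flush errors like write errors instead of panicking the task
                if let Err(e) = stream.flush().await {
                    eprintln!("Stream {stream_id} flush error: {e}");
                    break;
                }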
            }
 
            println!("Stream {stream_id} closed");
        });
    }
}

listener() can only be called once per client. It takes exclusive ownership of the inbound message channel. A second call returns Error::ListenerAlreadyTaken.
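
The once-only behaviour described here matches a common Rust pattern: keep the receiver in an Option and move it out on the first call. The following is a minimal std-only sketch of that pattern, not the SDK's implementation. SketchClient, SketchListener and SketchError are hypothetical names made up for illustration; only the ListenerAlreadyTaken variant name comes from the text above.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical error type; only the variant name mirrors the SDK's
/// Error::ListenerAlreadyTaken.
#[derive(Debug, PartialEq)]
enum SketchError {
    ListenerAlreadyTaken,
}

/// Stand-in for a client that owns a single inbound channel.
struct SketchClient {
    inbound: Option<Receiver<Vec<u8>>>,
}

/// Stand-in for a listener that has taken ownership of that channel.
struct SketchListener {
    inbound: Receiver<Vec<u8>>,
}

impl SketchClient {
    fn new() -> (Self, Sender<Vec<u8>>) {
        let (tx, rx) = channel();
        (SketchClient { inbound: Some(rx) }, tx)
    }

    /// Moves the receiver out on the first call; later calls find `None`.
    fn listener(&mut self) -> Result<SketchListener, SketchError> {
        self.inbound
            .take()
            .map(|inbound| SketchListener { inbound })
            .ok_or(SketchError::ListenerAlreadyTaken)
    }
}

fn main() {
    let (mut client, tx) = SketchClient::new();

    // First call succeeds and the listener receives inbound data.
    let listener = client.listener().expect("first call succeeds");
    tx.send(b"hello".to_vec()).unwrap();
    assert_eq!(listener.inbound.recv().unwrap(), b"hello".to_vec());

    // Second call fails because the receiver has already been moved out.
    assert_eq!(client.listener().err(), Some(SketchError::ListenerAlreadyTaken));
}
```

Because the receiver is moved rather than shared, exactly one consumer can ever read inbound messages, which is why a second listener cannot exist.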

Step 3: Build the client

The client connects, opens a stream to the server, sends a few messages, reads back the echoes, and disconnects.

Create src/bin/client.rs:

use nym_sdk::mixnet::{self, Recipient};
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
 
const TIMEOUT: Duration = Duration::from_secs(60);
 
#[tokio::main]
async fn main() {
    nym_bin_common::logging::setup_tracing_logger();
 
    // Read the server's Nym address from the command line
    let server_addr: Recipient = std::env::args()
        .nth(1)
        .expect("Usage: client <SERVER_NYM_ADDRESS>")
        .parse()
        .expect("Invalid Nym address");
 
    // Connect to the Mixnet
    let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
    println!("Client address: {}", client.nym_address());
 
    // Open a stream to the server.
    // The second argument (None) uses the default number of reply SURBs.
    let mut stream = client.open_stream(server_addr, None).await.unwrap();
    println!("Stream opened: {}", stream.id());
 
    // Give the Open message time to traverse the mixnet and reach the server.
    // open_stream() returns immediately after sending — it doesn't wait for
    // the server to accept. Writing too soon risks the data arriving before
    // the Open, which the server would drop.
    tokio::time::sleep(Duration::from_secs(5)).await;
 
    // Send three payloads of different sizes and verify the echo.
    // Random bytes show that streams are binary-safe — not just text.
    let sizes = [320, 25_000, 1280];
 
    for (i, &size) in sizes.iter().enumerate() {
        let payload: Vec<u8> = (0..size).map(|_| rand::random::<u8>()).collect();
        println!("Sending message {} ({size} bytes)", i + 1);
 
        stream.write_all(&payload).await.unwrap();
        stream.flush().await.unwrap();
 
        // Read the echo
        let mut buf = vec![0u8; 32_000];
        let n = tokio::time::timeout(TIMEOUT, stream.read(&mut buf))
            .await
            .expect("timed out waiting for echo")
            .expect("read failed");
 
        assert_eq!(&buf[..n], &payload[..], "echo mismatch on message {}", i + 1);
        println!("Received echo: {n} bytes ok");
    }
 
    // Drop the stream to deregister it from the router
    drop(stream);
 
    // Disconnect the client
    client.disconnect().await;
    println!("Done!");
}
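
The client above assumes that a single read() returns one whole echoed payload. The general Read and AsyncRead contracts allow a read to return fewer bytes than were written, and this tutorial does not state whether MixnetStream ever does so. If you want to be defensive, you can read until the expected length has arrived. The sketch below shows the idea with std::io::Read so it runs without the Mixnet; ChunkedReader and read_full are hypothetical helpers written for this illustration.

```rust
use std::io::{self, Read};

/// Test reader that hands out at most `chunk` bytes per `read` call,
/// imitating a transport that delivers one message in several pieces.
struct ChunkedReader {
    data: Vec<u8>,
    pos: usize,
    chunk: usize,
}

impl Read for ChunkedReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let remaining = self.data.len() - self.pos;
        let n = remaining.min(self.chunk).min(buf.len());
        buf[..n].copy_from_slice(&self.data[self.pos..self.pos + n]);
        self.pos += n;
        Ok(n)
    }
}

/// Reads until `expected` bytes have arrived, or fails if the reader
/// reports EOF (a zero-length read) before that.
fn read_full<R: Read>(reader: &mut R, expected: usize) -> io::Result<Vec<u8>> {
    let mut out = vec![0u8; expected];
    let mut filled = 0;
    while filled < expected {
        let n = reader.read(&mut out[filled..])?;
        if n == 0 {
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "stream closed before the full payload arrived",
            ));
        }
        filled += n;
    }
    Ok(out)
}

fn main() {
    // 25,000 bytes, the size of the largest payload in the tutorial.
    let payload: Vec<u8> = (0..25_000u32).map(|i| (i % 251) as u8).collect();
    let mut reader = ChunkedReader { data: payload.clone(), pos: 0, chunk: 4096 };

    let echoed = read_full(&mut reader, payload.len()).unwrap();
    assert_eq!(echoed, payload);
}
```

With tokio, AsyncReadExt::read_exact does the same job in one call, for example stream.read_exact(&mut buf[..size]).await inside the client loop.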

Step 4: Run it

In one terminal, start the server:

RUST_LOG=info cargo run --bin server

It prints its Nym address:

Echo server listening at: 8gk4Y...@2xU4d...

In a second terminal, start the client with the server's address:

RUST_LOG=info cargo run --bin client -- 8gk4Y...@2xU4d...

You'll see the messages traverse the Mixnet and echo back:

Client address: F3qR7...@9nK2m...
Stream opened: 12345678
Sending message 1 (320 bytes)
Received echo: 320 bytes ok
Sending message 2 (25000 bytes)
Received echo: 25000 bytes ok
Sending message 3 (1280 bytes)
Received echo: 1280 bytes ok
Done!

On the server side:

Accepted stream 12345678
Stream 12345678 received 320 bytes
Stream 12345678 received 25000 bytes
Stream 12345678 received 1280 bytes
Stream 12345678 closed

How it works internally

  1. The server's listener() activates stream mode, which spawns a router task that decodes incoming Mixnet messages and dispatches them by stream ID.

  2. The client's open_stream() generates a random 8-byte StreamId, sends an Open message through the Mixnet, and registers the stream in a local routing table.

  3. When the server's router receives the Open message, it delivers it to listener.accept(), which creates the inbound MixnetStream.

  4. Each write_all() prepends a 16-byte LP frame header ([LpFrameKind: 2B][StreamId: 8B][MsgType: 1B][SequenceNum: 4B][Reserved: 1B]) and sends the framed data through the Mixnet in Sphinx packets.

  5. On arrival, the router reads the LpFrameKind to identify it as stream traffic, decodes the header, finds the matching stream by ID, and delivers the raw payload to read().

  6. The inbound stream replies via reply SURBs, the same anonymous reply mechanism as the message API, applied transparently. The server never learns the client's Nym address.

  7. When a stream is dropped, it deregisters from the local router. No close message is sent over the wire, since a close could race ahead of in-flight data.

See the Architecture page for the full technical details.
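
To make the header layout from step 4 concrete, here is a minimal sketch that packs and unpacks the five fields in the order and sizes listed there. It is an illustration, not the SDK's code: the FrameHeader type, the big-endian byte order, and the example field values are all assumptions, since this page gives only the field names and widths.

```rust
/// Header length from the layout above: 2 + 8 + 1 + 4 + 1 bytes.
const HEADER_LEN: usize = 16;

/// Hypothetical in-memory form of the header fields.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct FrameHeader {
    frame_kind: u16,
    stream_id: u64,
    msg_type: u8,
    sequence_num: u32,
}

impl FrameHeader {
    /// Packs the fields in order. Big-endian is an assumption.
    fn encode(&self) -> [u8; HEADER_LEN] {
        let mut out = [0u8; HEADER_LEN];
        out[0..2].copy_from_slice(&self.frame_kind.to_be_bytes());
        out[2..10].copy_from_slice(&self.stream_id.to_be_bytes());
        out[10] = self.msg_type;
        out[11..15].copy_from_slice(&self.sequence_num.to_be_bytes());
        // out[15] is the reserved byte and stays zero.
        out
    }

    /// Splits a frame into its header and payload, or returns `None`
    /// when the frame is shorter than a header.
    fn decode(frame: &[u8]) -> Option<(FrameHeader, &[u8])> {
        if frame.len() < HEADER_LEN {
            return None;
        }
        let (head, payload) = frame.split_at(HEADER_LEN);

        let mut kind = [0u8; 2];
        kind.copy_from_slice(&head[0..2]);
        let mut id = [0u8; 8];
        id.copy_from_slice(&head[2..10]);
        let mut seq = [0u8; 4];
        seq.copy_from_slice(&head[11..15]);

        let header = FrameHeader {
            frame_kind: u16::from_be_bytes(kind),
            stream_id: u64::from_be_bytes(id),
            msg_type: head[10],
            sequence_num: u32::from_be_bytes(seq),
        };
        Some((header, payload))
    }
}

fn main() {
    // Field values are made up for the example.
    let header = FrameHeader { frame_kind: 1, stream_id: 12_345_678, msg_type: 2, sequence_num: 7 };

    let mut frame = header.encode().to_vec();
    frame.extend_from_slice(b"hello");

    let (decoded, payload) = FrameHeader::decode(&frame).expect("frame is long enough");
    assert_eq!(decoded, header);
    assert_eq!(payload, &b"hello"[..]);
}
```

A fixed-size header means the receiver can read the stream ID at a known offset before looking at the payload, which is what lets the router in step 5 dispatch without parsing application data.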

What you've learned

  • client.listener() activates stream mode and returns a MixnetListener
  • listener.accept() waits asynchronously until a remote peer opens a stream, and returns None once the listener has closed
  • client.open_stream(recipient, surbs) opens an outbound stream to a Nym address
  • MixnetStream implements AsyncRead + AsyncWrite, so standard tokio I/O works unchanged
  • Multiple streams are multiplexed over a single client
  • Streams deregister on drop; no close handshake is needed
  • The server replies via SURBs and never learns the client's address
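
Two of the points above, multiplexing by stream ID and deregistration on drop, can be sketched with a shared routing table. This is a std-only illustration under assumptions, not how the SDK is written: SketchStream, Routes and dispatch are hypothetical names, and std channels stand in for the async plumbing.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

type StreamId = u64;
/// Routing table shared by the router and every stream.
type Routes = Arc<Mutex<HashMap<StreamId, Sender<Vec<u8>>>>>;

/// Stand-in for a stream: its inbound queue plus a handle to the table.
struct SketchStream {
    id: StreamId,
    inbound: Receiver<Vec<u8>>,
    routes: Routes,
}

impl SketchStream {
    /// Creates a stream and records its sender under `id`.
    fn register(id: StreamId, routes: &Routes) -> Self {
        let (tx, rx) = channel();
        routes.lock().unwrap().insert(id, tx);
        SketchStream { id, inbound: rx, routes: Arc::clone(routes) }
    }
}

impl Drop for SketchStream {
    /// Dropping the stream removes its table entry. Nothing is sent
    /// to the peer.
    fn drop(&mut self) {
        self.routes.lock().unwrap().remove(&self.id);
    }
}

/// Delivers a payload to the stream registered under `id`. Returns
/// false when no such stream exists, and the payload is discarded.
fn dispatch(routes: &Routes, id: StreamId, payload: Vec<u8>) -> bool {
    let table = routes.lock().unwrap();
    table.get(&id).map_or(false, |tx| tx.send(payload).is_ok())
}

fn main() {
    let routes: Routes = Arc::new(Mutex::new(HashMap::new()));
    let a = SketchStream::register(1, &routes);
    let b = SketchStream::register(2, &routes);

    // Payloads arrive interleaved but reach the right stream.
    assert!(dispatch(&routes, 2, b"for b".to_vec()));
    assert!(dispatch(&routes, 1, b"for a".to_vec()));
    assert_eq!(a.inbound.recv().unwrap(), b"for a".to_vec());
    assert_eq!(b.inbound.recv().unwrap(), b"for b".to_vec());

    // After the drop, data for stream 1 has nowhere to go.
    drop(a);
    assert!(!dispatch(&routes, 1, b"late".to_vec()));
}
```

In this sketch a payload for an unknown ID is simply discarded, which mirrors the tutorial's note that data arriving before the Open message would be dropped by the server.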

Complete code

Server (src/bin/server.rs)

use nym_sdk::mixnet;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
 
#[tokio::main]
async fn main() {
    nym_bin_common::logging::setup_tracing_logger();
 
    let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
    println!("Echo server listening at: {}", client.nym_address());
 
    let mut listener = client.listener().unwrap();
 
    loop {
        let mut stream = match listener.accept().await {
            Some(s) => s,
            None => break,
        };
 
        let stream_id = stream.id();
        println!("Accepted stream {stream_id}");
 
        tokio::spawn(async move {
            let mut buf = vec![0u8; 32_000];
            loop {
                let n = match stream.read(&mut buf).await {
                    Ok(0) | Err(_) => break,
                    Ok(n) => n,
                };
                if stream.write_all(&buf[..n]).await.is_err() {
                    break;
                }
                // Treat a failed flush like a failed write instead of panicking
                if stream.flush().await.is_err() {
                    break;
                }
            }
            println!("Stream {stream_id} closed");
        });
    }
}

Client (src/bin/client.rs)

use nym_sdk::mixnet::{self, Recipient};
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
 
const TIMEOUT: Duration = Duration::from_secs(60);
 
#[tokio::main]
async fn main() {
    nym_bin_common::logging::setup_tracing_logger();
 
    let server_addr: Recipient = std::env::args()
        .nth(1)
        .expect("Usage: client <SERVER_NYM_ADDRESS>")
        .parse()
        .expect("Invalid Nym address");
 
    let mut client = mixnet::MixnetClient::connect_new().await.unwrap();
    println!("Client address: {}", client.nym_address());
 
    let mut stream = client.open_stream(server_addr, None).await.unwrap();
    println!("Stream opened: {}", stream.id());
 
    // Wait for the Open message to reach the server through the mixnet
    tokio::time::sleep(Duration::from_secs(5)).await;
 
    let sizes = [320, 25_000, 1280];
 
    for (i, &size) in sizes.iter().enumerate() {
        let payload: Vec<u8> = (0..size).map(|_| rand::random::<u8>()).collect();
        println!("Sending message {} ({size} bytes)", i + 1);
 
        stream.write_all(&payload).await.unwrap();
        stream.flush().await.unwrap();
 
        let mut buf = vec![0u8; 32_000];
        let n = tokio::time::timeout(TIMEOUT, stream.read(&mut buf))
            .await
            .expect("timed out waiting for echo")
            .expect("read failed");
 
        assert_eq!(&buf[..n], &payload[..], "echo mismatch on message {}", i + 1);
        println!("Received echo: {n} bytes ok");
    }
 
    drop(stream);
    client.disconnect().await;
    println!("Done!");
}