Tutorial: Handle Bursty Traffic with Client Pool
In this tutorial you'll build a program that uses ClientPool to handle bursts of concurrent Mixnet operations without blocking on client creation. You'll see how the pool pre-creates clients in the background, how to pop them under load, and what happens when demand exceeds supply.
What you'll learn
- Creating and starting a
ClientPool - Popping clients from the pool for concurrent operations
- Falling back to on-demand client creation when the pool is empty
- Observing pool replenishment
- Graceful shutdown
97068b2. If the API has changed since then, check the examples in the repo for the latest usage.Prerequisites
- Rust toolchain (1.70+)
- A working internet connection
Step 1: Set up the project
cargo init nym-pool-demo
cd nym-pool-demoAdd dependencies to Cargo.toml:
[dependencies]
nym-sdk = { git = "https://github.com/nymtech/nym", rev = "97068b2" }
nym-network-defaults = { git = "https://github.com/nymtech/nym", rev = "97068b2" }
nym-bin-common = { git = "https://github.com/nymtech/nym", rev = "97068b2", features = ["basic_tracing"] }
tokio = { version = "1", features = ["full"] }
blake3 = "=1.7.0" # required pin — see https://nymtech.net/docs/developers/rust/importingStep 2: Create and start the pool
The pool is created with a reserve size: the number of connected clients it tries to maintain at all times. The start() method runs a background loop that creates clients whenever the pool drops below the reserve.
Create src/main.rs:
use nym_sdk::client_pool::ClientPool;
use nym_sdk::mixnet::MixnetMessageSender;
use nym_network_defaults::setup_env;
use std::time::Duration;
#[tokio::main]
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
// Load mainnet network defaults into env vars (required by ClientPool)
setup_env(None::<String>);
// Create a pool that maintains 3 clients in reserve
let pool = ClientPool::new(3);
// Start the pool in a background task.
// It immediately begins connecting clients.
let pool_bg = pool.clone();
tokio::spawn(async move {
pool_bg.start().await.unwrap();
});
println!("Pool started — waiting for clients to connect...");
tokio::time::sleep(Duration::from_secs(15)).await;
// Check how many are ready
let count = pool.get_client_count().await;
println!("Pool has {count} clients ready");Creating a MixnetClient takes several seconds (gateway handshake, key generation, topology fetch). The pool does this work ahead of time so your application doesn't block when it needs a client.
Step 3: Pop clients and use them
When you call get_mixnet_client(), the pool removes a client and returns it. The background loop notices the shortfall and starts creating a replacement.
// Simulate a burst of 3 concurrent tasks, each needing a client
let mut handles = vec![];
for i in 1..=3 {
let pool = pool.clone();
let handle = tokio::spawn(async move {
// Pop a client from the pool
let mut client = match pool.get_mixnet_client().await {
Some(c) => {
println!("Task {i}: got client {} from pool", c.nym_address());
c
}
None => {
// Pool is empty — fall back to creating one on the fly.
// This is slower but keeps things working.
println!("Task {i}: pool empty, creating client on the fly...");
nym_sdk::mixnet::MixnetClient::connect_new().await.unwrap()
}
};
// Do something with the client — here, send a message to ourselves
let addr = *client.nym_address();
client
.send_plain_message(addr, format!("hello from task {i}"))
.await
.unwrap();
// Wait for the message to arrive
if let Some(msgs) = client.wait_for_messages().await {
for msg in msgs {
if !msg.message.is_empty() {
println!(
"Task {i}: received {:?}",
String::from_utf8_lossy(&msg.message)
);
}
}
}
// Disconnect when done — the pool will create a replacement
client.disconnect().await;
println!("Task {i}: done");
});
handles.push(handle);
}
// Wait for all tasks to finish
for h in handles {
h.await.unwrap();
}Step 4: Observe replenishment
After popping all 3 clients, the pool background loop starts creating replacements. Give it time and check:
// Pool should be replenishing
println!("\nWaiting for pool to replenish...");
tokio::time::sleep(Duration::from_secs(15)).await;
let count = pool.get_client_count().await;
println!("Pool has {count} clients ready again");Step 5: Shut down gracefully
// Disconnect all remaining clients and stop the background loop
pool.disconnect_pool().await;
println!("Pool shut down");
}Step 6: Run it
RUST_LOG=info cargo runYou'll see output like:
Pool started — waiting for clients to connect...
Pool has 3 clients ready
Task 1: got client 8gk4Y...@2xU4d... from pool
Task 2: got client F3qR7...@9nK2m... from pool
Task 3: got client A7bN2...@4pL8w... from pool
Task 1: received "hello from task 1"
Task 2: received "hello from task 2"
Task 3: received "hello from task 3"
Task 1: done
Task 2: done
Task 3: done
Waiting for pool to replenish...
Pool has 3 clients ready again
Pool shut downWhen to use the pool
The pool is most useful when:
- You have bursty traffic: many concurrent operations that each need their own client
- Latency matters: you can't afford the several-second delay of creating a client on each request
- You're building a service: an API endpoint that creates a client per request would benefit from pre-warmed clients
If your application only ever needs one client at a time, just use MixnetClient::connect_new() directly.
The NymProxyClient (TcpProxy module) uses a ClientPool internally: one client per incoming TCP connection.
What you've learned
ClientPool::new(n)creates a pool targetingnreserve clientspool.start()runs a background loop that creates clients whenever the pool is below reservepool.get_mixnet_client()pops a client; returnsNoneif the pool is empty- Clients are consumed, not returned. The pool automatically creates replacements
pool.disconnect_pool()shuts down all remaining clients and stops the background loop- Fall back to on-demand creation when the pool is empty for resilience
Complete code
use nym_sdk::client_pool::ClientPool;
use nym_sdk::mixnet::MixnetMessageSender;
use nym_network_defaults::setup_env;
use std::time::Duration;
#[tokio::main]
async fn main() {
nym_bin_common::logging::setup_tracing_logger();
setup_env(None::<String>);
let pool = ClientPool::new(3);
let pool_bg = pool.clone();
tokio::spawn(async move { pool_bg.start().await.unwrap() });
println!("Waiting for pool to fill...");
tokio::time::sleep(Duration::from_secs(15)).await;
println!("Pool has {} clients", pool.get_client_count().await);
let mut handles = vec![];
for i in 1..=3 {
let pool = pool.clone();
handles.push(tokio::spawn(async move {
let mut client = match pool.get_mixnet_client().await {
Some(c) => c,
None => nym_sdk::mixnet::MixnetClient::connect_new().await.unwrap(),
};
let addr = *client.nym_address();
client
.send_plain_message(addr, format!("hello from task {i}"))
.await
.unwrap();
if let Some(msgs) = client.wait_for_messages().await {
for msg in msgs.iter().filter(|m| !m.message.is_empty()) {
println!("Task {i}: {}", String::from_utf8_lossy(&msg.message));
}
}
client.disconnect().await;
}));
}
for h in handles {
h.await.unwrap();
}
println!("Waiting for replenishment...");
tokio::time::sleep(Duration::from_secs(15)).await;
println!("Pool has {} clients", pool.get_client_count().await);
pool.disconnect_pool().await;
println!("Done");
}