Rust Concurrency Patterns for AI Agents
· 7 min read
Production patterns for building fast, concurrent AI agents in Rust.
Why Rust over Python?
# Python: GIL blocks true parallelism
await asyncio.gather(
rag_search(query), # waiting...
web_search(query) # also waiting... (not parallel for CPU)
)
// Rust: actually parallel
let (rag, web) = tokio::join!(
rag_search(&query), // both futures run concurrently on the multi-threaded runtime
web_search(&query)  // CPU-heavy steps can use real threads via spawn_blocking/rayon
);
| | Python | Rust |
|---|---|---|
| CPU Parallelism | GIL blocks | Real threads |
| Memory | ~28 bytes/int + GC | 4 bytes/int, no GC |
| Latency | GC spikes (50-200ms) | Consistent |
| Scaling | Limited by GIL | Linear with cores |
Bottom line: Python's GIL prevents true parallelism for CPU-bound work. Rust gives you real concurrency with predictable performance.
Tokio vs Rayon: Quick Reference
┌─────────────────────────────────────────────────────────────┐
│ AI Agent Workloads │
├─────────────────────────┬───────────────────────────────────┤
│ I/O-bound │ CPU-bound │
│ (Tokio) │ (Rayon) │
├─────────────────────────┼───────────────────────────────────┤
│ • LLM API calls │ • Local embedding │
│ • Web search │ • Reranking/scoring │
│ • Vector DB queries │ • Text chunking │
│ • URL fetching │ • Matrix operations │
└─────────────────────────┴───────────────────────────────────┘
Rule of thumb:
- Network/API call → `tokio`
- Local CPU compute → `rayon`
- Both → `tokio` + `spawn_blocking` + `rayon` (see the sketch below)
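When a single request mixes both kinds of work, the usual shape is: await the I/O on tokio, then move the CPU-heavy part onto the blocking pool and fan it out with rayon. A minimal sketch, assuming hypothetical `fetch_docs`/`score` helpers and a `Doc` type that is `Clone + Send + Sync`:

```rust
use rayon::prelude::*;

// `fetch_docs`, `score`, and `Doc` are hypothetical stand-ins for illustration.
async fn fetch_and_score(query: &str) -> anyhow::Result<Vec<(Doc, f32)>> {
    // I/O-bound: await on the tokio runtime.
    let docs: Vec<Doc> = fetch_docs(query).await?;

    // CPU-bound: hop to the blocking pool, fan out across cores with rayon.
    let q = query.to_string();
    let scored = tokio::task::spawn_blocking(move || {
        docs.par_iter()
            .map(|d| (d.clone(), score(&q, d)))
            .collect::<Vec<_>>()
    })
    .await?;

    Ok(scored)
}
```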
Pattern 1: Bounded Concurrent Processing
Use case: Parse thousands of files concurrently with memory limits
use std::path::PathBuf;
use std::sync::Arc;
use anyhow::Result;
use dashmap::DashMap;
use futures::stream::{self, StreamExt, TryStreamExt};
use tokio::sync::Semaphore;

pub struct DocumentProcessor {
    max_concurrent: usize,
    cache: Arc<DashMap<String, Document>>, // shared parse cache (see the sketch after the table)
}

impl DocumentProcessor {
    async fn process_all(&self, files: Vec<PathBuf>) -> Result<Vec<Document>> {
        let sem = Arc::new(Semaphore::new(self.max_concurrent));
        stream::iter(files)
            .map(|path| {
                let sem = sem.clone();
                async move {
                    // Permit is held until parsing finishes, bounding memory use.
                    let _permit = sem.acquire().await?;
                    // Parsing is CPU-bound: run it off the async runtime.
                    tokio::task::spawn_blocking(move || parse_document(&path)).await?
                }
            })
            .buffer_unordered(self.max_concurrent)
            .try_collect()
            .await
    }
}
| Choice | Why | Alternative |
|---|---|---|
| `Semaphore` | Limit memory usage | `join_all` - OOM with 10K files |
| `buffer_unordered` | Process as completed | `buffered` - waits for in-order completion |
| `spawn_blocking` | Don't block tokio | Inline - blocks runtime |
| `DashMap` | Lock-free cache (sketch below) | `Mutex<HashMap>` - contention |
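The `cache` field on `DocumentProcessor` isn't exercised in the snippet above. A minimal sketch of how the `DashMap` could be consulted before paying the parse cost, assuming `Document: Clone` and a hypothetical `cache_key` helper:

```rust
use std::path::Path;

// Check the shared cache before parsing; `cache_key` is a hypothetical helper
// (e.g. canonical path + mtime).
fn parse_with_cache(
    cache: &DashMap<String, Document>,
    path: &Path,
) -> Result<Document> {
    let key = cache_key(path);
    if let Some(doc) = cache.get(&key) {
        return Ok(doc.value().clone()); // lock-free read, no global lock held
    }
    let doc = parse_document(path)?;
    cache.insert(key, doc.clone());
    Ok(doc)
}
```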
Pattern 2: Parallel Pipeline Stages
Use case: RAG pipeline - retrieve, rerank, generate
async fn run(&self, query: &str) -> Result<Response> {
// Stage 1: Parallel retrieval (I/O)
let (docs, web, graph) = tokio::try_join!(
self.retriever.search_docs(query),
self.retriever.search_web(query),
self.retriever.search_graph(query)
)?;
// Stage 2: Rerank (CPU)
let ranked = tokio::task::spawn_blocking({
let all = merge(docs, web, graph);
move || rerank(&all)
}).await?;
// Stage 3: Generate (I/O)
self.generator.generate(query, &ranked).await
}
| Choice | Why | Alternative |
|---|---|---|
| `try_join!` | Fail fast, static dispatch | `join!` - waits on error |
| `spawn_blocking` | CPU rerank off async | Inline - blocks 100ms+ |
| Sequential stages | Rerank needs retrieval | Full parallel - impossible |
Pattern 3: Multi-Query Parallel Search
Use case: Query expansion - search "auth", "login", "JWT" together
use futures::future::try_join_all;
async fn multi_search(&self, queries: &[String], k: usize) -> Result<Vec<SearchResult>> {
let futures = queries.iter().map(|q| self.search(q, k));
let results = try_join_all(futures).await?;
Ok(merge_and_dedupe(results))
}
async fn search(&self, query: &str, k: usize) -> Result<Vec<SearchResult>> {
let emb = tokio::task::spawn_blocking({
let q = query.to_string();
move || embed(&q)
}).await??; // first ? handles the join error, second the embed() error
self.vector_store.search(&emb, k).await
}
| Choice | Why | Alternative |
|---|---|---|
| `try_join_all` | Dynamic query count | `try_join!` - fixed at compile time |
| `try_join_all` | Fail fast | `join_all` - collects all errors |
| `spawn_blocking` | Embed is CPU | Inline - blocks tokio |
Pattern 4: Provider Fallback Chain
Use case: Embedding fallback - OpenAI → Ollama → local ONNX
use anyhow::{anyhow, Result};

pub struct EmbeddingService {
    openai: Option<OpenAIProvider>,
    ollama: Option<OllamaProvider>,
    onnx: Option<OnnxModel>,
}

impl EmbeddingService {
    async fn embed(&self, texts: &[String]) -> Result<Vec<Vec<f32>>> {
        // Remote APIs (I/O) - tried in priority order
        if let Some(openai) = &self.openai {
            if let Ok(emb) = openai.embed_batch(texts).await {
                return Ok(emb);
            }
        }
        if let Some(ollama) = &self.ollama {
            if let Ok(emb) = ollama.embed_batch(texts).await {
                return Ok(emb);
            }
        }
        // Local model (CPU) - run off the async runtime
        if let Some(onnx) = &self.onnx {
            return tokio::task::spawn_blocking({
                let t = texts.to_vec();
                let m = onnx.clone();
                move || m.embed_sync(&t)
            }).await?;
        }
        Err(anyhow!("No provider available"))
    }
}
| Choice | Why | Alternative |
|---|---|---|
| Sequential try | Stop on first success | `select!` - races every provider, wasting work |
| Early return | Fast path | Collect all - slower |
| `spawn_blocking` | ONNX is CPU | Inline - blocks 50-200ms |
Pattern 5: Event Loop with Select
Use case: Agent daemon - queries, file changes, shutdown
use std::time::Duration;
use tokio::sync::mpsc::Receiver;

async fn run_agent(&self, mut events: Receiver<Event>) -> Result<()> {
    // The interval lives outside the loop, so the 30s cleanup tick isn't
    // reset every time another branch wins the select.
    let mut cleanup = tokio::time::interval(Duration::from_secs(30));
    loop {
        tokio::select! {
            _ = cleanup.tick() => {
                self.cleanup_cache().await?;
            }
            Some(event) = events.recv() => {
                match event {
                    Event::Query(q) => self.handle_query(q).await?,
                    Event::Index(path) => self.reindex(&path).await?,
                }
            }
            _ = tokio::signal::ctrl_c() => break,
        }
    }
    Ok(())
}
| Choice | Why | Alternative |
|---|---|---|
| `select!` | Multiplex N event sources | Separate loops - complex |
| `select!` | Auto-cancels losing branches | Manual cancel - bugs |
| Channel | Decouple producers | Shared state - locks |
Pattern 6: RwLock for Read-Heavy Cache
Use case: Embedding cache - track stats without blocking reads
use std::collections::VecDeque;
use std::hash::Hash;
use std::sync::Arc;
use dashmap::DashMap;
use parking_lot::{Mutex, RwLock};

pub struct EmbeddingCache<K, V> {
    data: DashMap<K, V>,
    stats: Arc<RwLock<CacheStats>>,
    order: Arc<Mutex<VecDeque<K>>>, // LRU order for eviction (see the sketch after the table)
}

impl<K: Eq + Hash, V: Clone> EmbeddingCache<K, V> {
    pub fn get(&self, key: &K) -> Option<V> {
        if let Some(value) = self.data.get(key) {
            // try_write: never block the read path just to bump a counter
            if let Some(mut stats) = self.stats.try_write() {
                stats.hits += 1;
            }
            return Some(value.value().clone());
        }
        if let Some(mut stats) = self.stats.try_write() {
            stats.misses += 1;
        }
        None
    }

    pub fn hit_ratio(&self) -> f64 {
        let stats = self.stats.read();
        let total = stats.hits + stats.misses;
        if total == 0 {
            return 0.0;
        }
        stats.hits as f64 / total as f64
    }
}
| Choice | Why | Alternative |
|---|---|---|
| `DashMap` | Lock-free K/V | `RwLock<HashMap>` - contention |
| `RwLock` | Many readers OK | `Mutex` - blocks reads |
| `parking_lot` | Faster, no lock poisoning | `std::sync` - slower |
| `try_write` | Non-blocking stats update | `write` - hurts read latency |
| `Mutex` for order | Always mutating (sketch below) | `RwLock` - no benefit |
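The `order` queue above is declared but not shown in use. A sketch of an insert path that keeps LRU order under the `Mutex`; the fixed capacity is an assumption (the original struct has no capacity field), and re-inserted keys are not deduplicated:

```rust
impl<K: Eq + Hash + Clone, V> EmbeddingCache<K, V> {
    pub fn insert(&self, key: K, value: V) {
        self.data.insert(key.clone(), value);

        // Every caller mutates the queue, so a plain Mutex fits:
        // an RwLock would never be taken in read mode here.
        let mut order = self.order.lock();
        order.push_back(key);
        while order.len() > 10_000 { // assumed capacity for illustration
            if let Some(oldest) = order.pop_front() {
                self.data.remove(&oldest);
            }
        }
    }
}
```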
Concurrency Primitives Summary
| Type | Use Case | Trade-off |
|---|---|---|
| `Semaphore` | Limit concurrent tasks | Permit overhead |
| `DashMap` | High-concurrency K/V | Memory (sharding) |
| `RwLock` | Read-heavy data | Write starvation |
| `Mutex` | Write-heavy, ordering | Readers wait |
| `try_lock()` | Non-critical updates | May skip |
| `spawn_blocking` | CPU in async context | Thread pool overhead |
Rig Agent Patterns
Agent Builder with Tool Registration
use rig::providers::openai;
pub struct AgentBuilder {
executor: Arc<GraphToolExecutor>,
tier: ContextTier,
}
impl AgentBuilder {
pub fn new(executor: Arc<GraphToolExecutor>) -> Self {
Self { executor, tier: ContextTier::Standard }
}
pub fn tier(mut self, tier: ContextTier) -> Self {
self.tier = tier;
self
}
pub fn build(self) -> Result<Agent> {
let factory = GraphToolFactory::new(self.executor);
let agent = openai::Client::from_env()
.agent("gpt-4")
.preamble(&self.system_prompt())
.max_tokens(self.max_tokens())
// Register tools via factory
.tool(factory.semantic_search())
.tool(factory.dependency_analysis())
.tool(factory.code_complexity())
.tool(factory.call_chain())
.build();
Ok(agent)
}
}
| Choice | Why | Alternative |
|---|---|---|
| Builder pattern | Fluent config (usage sketch below) | Constructor args - messy |
| Tool factory | Centralized creation | Inline tools - duplication |
| `Arc<Executor>` | Share across tools | Clone executor - expensive |
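A short usage sketch of the builder; `executor` is assumed to be constructed elsewhere, and `ContextTier::Full` is an illustrative variant (only `Standard` appears above):

```rust
// Build the agent once at startup and share it across requests.
// `executor` and `ContextTier::Full` are assumptions for illustration.
let agent = AgentBuilder::new(executor)
    .tier(ContextTier::Full)
    .build()?;
```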
ReAct Multi-Turn Agent
use async_stream::stream;
use futures::Stream;
use rig::agent::Agent;
pub struct ReactAgent {
agent: Agent,
factory: GraphToolFactory,
max_turns: usize,
}
impl ReactAgent {
pub async fn execute(&self, query: &str) -> Result<String> {
let response = self.agent
.prompt(query)
.multi_turn(self.max_turns) // reasoning loop
.await?;
// Track tool usage
let tool_count = self.factory.take_tool_call_count();
let traces = self.factory.take_tool_traces();
tracing::info!("Tools called: {}, traces: {:?}", tool_count, traces);
Ok(response)
}
pub async fn execute_stream(&self, query: &str) -> impl Stream<Item = AgentEvent> {
stream! {
yield AgentEvent::Thinking;
let response = self.agent
.prompt(query)
.multi_turn(self.max_turns)
.await;
match response {
Ok(text) => {
for chunk in text.chars().collect::<Vec<_>>().chunks(50) {
yield AgentEvent::Chunk(chunk.iter().collect());
}
yield AgentEvent::Done;
}
Err(e) => yield AgentEvent::Error(e.to_string()),
}
}
}
}
| Choice | Why | Alternative |
|---|---|---|
| `multi_turn` | ReAct reasoning loop | Single turn - no iteration |
| Tool traces | Observability | No tracking - blind |
| Stream events | Progressive UI (consumer sketch below) | Wait for full response |
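Consuming the event stream on the caller side — a sketch assuming the `futures` crate's `pin_mut!` and `StreamExt` (streams produced by `stream!` are not `Unpin`, so they must be pinned before iteration); `react_agent` is an assumed `ReactAgent` value:

```rust
use futures::{pin_mut, StreamExt};

let stream = react_agent.execute_stream("How does auth work?").await;
pin_mut!(stream); // async_stream streams are !Unpin
while let Some(event) = stream.next().await {
    match event {
        AgentEvent::Thinking => eprintln!("thinking..."),
        AgentEvent::Chunk(text) => print!("{text}"),
        AgentEvent::Done => println!(),
        AgentEvent::Error(e) => eprintln!("agent error: {e}"),
    }
}
```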
Tool Factory with Counting
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;

pub struct GraphToolFactory {
executor: Arc<GraphToolExecutor>,
call_count: Arc<AtomicUsize>,
traces: Arc<Mutex<Vec<ToolTrace>>>,
}
impl GraphToolFactory {
pub fn new(executor: Arc<GraphToolExecutor>) -> Self {
Self {
executor,
call_count: Arc::new(AtomicUsize::new(0)),
traces: Arc::new(Mutex::new(Vec::new())),
}
}
pub fn semantic_search(&self) -> impl Tool {
CountingTool::new(
SemanticSearchTool::new(self.executor.clone()),
self.call_count.clone(),
self.traces.clone(),
)
}
pub fn take_tool_call_count(&self) -> usize {
self.call_count.swap(0, Ordering::SeqCst)
}
}
struct CountingTool<T> {
inner: T,
count: Arc<AtomicUsize>,
traces: Arc<Mutex<Vec<ToolTrace>>>,
}
impl<T: Tool> Tool for CountingTool<T> {
async fn call(&self, input: Value) -> Result<Value> {
self.count.fetch_add(1, Ordering::SeqCst);
let start = Instant::now();
let result = self.inner.call(input.clone()).await;
if let Ok(mut traces) = self.traces.try_lock() { // best-effort: skip the trace rather than block
traces.push(ToolTrace {
tool: std::any::type_name::<T>(),
input,
duration: start.elapsed(),
});
}
result
}
}
| Choice | Why | Alternative |
|---|---|---|
| `AtomicUsize` | Lock-free counting | `Mutex<usize>` - contention |
| `try_lock` | Don't block on trace | `lock` - hurts latency |
| Wrapper pattern | Add counting to any tool | Modify each tool - tedious |
Full Example: RAG Pipeline
pub async fn search_and_answer(query: &str) -> Result<Answer> {
// 1. Parallel retrieval (I/O)
let (rag, web) = tokio::try_join!(
async {
let (dense, sparse) = tokio::join!(
vector_search(query),
keyword_search(query),
);
rrf_merge(vec![dense, sparse])
},
async {
let urls = web_search(query).await?;
parallel_fetch(&urls).await
},
)?;
// 2. Rerank (CPU)
let top_k = tokio::task::spawn_blocking({
let merged = merge(rag, web);
let q = query.to_string();
move || {
use rayon::prelude::*;
let mut scored: Vec<_> = merged
.par_iter()
.map(|doc| (doc.clone(), score(&q, doc)))
.collect();
scored.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
scored.into_iter().take(10).map(|(d, _)| d).collect::<Vec<_>>()
}
}).await?;
// 3. Generate (I/O)
generate_with_citations(query, &top_k).await
}
Resources
- Rig - Rust AI agent framework
- Tokio - Async runtime
- Rayon - Data parallelism
- DashMap - Concurrent HashMap
- parking_lot - Fast locks
