Multi-agent system tutorial : de zéro à un système multi-agents en production
Apprenez à construire un système multi-agents de A à Z. Ce tutoriel couvre la conception, l'orchestration, la communication inter-agents, le debugging et le déploiement d'un système multi-agents en production.
Multi-agent system tutorial : de zéro à un système multi-agents en production
Introduction
Un système multi-agents, ce n'est pas un chatbot boosté. C'est un ensemble d'entités autonomes qui communiquent, se coordonnent et partagent un objectif commun — sans que vous ayez à orchestrer chaque étape manuellement.
Dans ce tutoriel, on construit un système de veille marché concret :
- Agent 1 — scrape les actualités
- Agent 2 — les analyse
- Agent 3 — rédige un rapport
Chaque agent a un rôle précis, une mémoire partagée, et un protocole de communication.
Stack utilisée : OpenClaw + Python + Docker.
Résumé rapide
| Critère | Valeur |
|---|---|
| Type d'article | Tutoriel production-ready |
| Stack principale | OpenClaw + Python |
| Use case | Veille marché multi-agents |
| Déploiement | Docker Compose |
| Monitoring | Prometheus + Grafana |
| Niveau | Intermédiaire à avancé |
Qu'est-ce qu'un système multi-agents ?
Un système multi-agents est un ensemble d'agents IA autonomes qui collaborent pour atteindre un objectif qu'aucun agent ne pourrait accomplir seul.
Chaque agent possède :
- Un rôle défini — spécialité, compétences, outils disponibles
- Une mémoire propre — contexte, historique de ses tâches
- Une capacité de communication — envoi et réception de messages
- Une logique de décision — quand agir, quand attendre, quand déléguer
La différence avec un agent unique : la décomposition fonctionnelle. Plutôt qu'un agent giant qui fait tout, on crée plusieurs agents spécialisés qui coopèrent. C'est l'approche derrière les architectures multi-agents.
Pourquoi passer à un système multi-agents ?
| Approche | Limite |
|---|---|
| Agent unique monolithique | ✓ Tâches simples et linéaires |
| Système multi-agents | ✓ Pipelines complexes, parallèles, rôles distincts |
Exemple concret — veille marché :
- L'agent de scraping → compétences web
- L'agent d'analyse → compétences data
- L'agent de rédaction → compétences writing SEO
Un seul agent devrait multitâcher ces trois domaines : moins efficace, plus cher en tokens.
Conception du système
5 minutes de conception avant d'écrire du code. Un système multi-agents mal conçu est un cauchemar à débugger.
Définir les rôles
Pour notre use case de veille marché, trois rôles :
Agent 1 — Scraper
Rôle : Collecter les actualités du secteur
Outils : Requêtes HTTP, parsing HTML
Output : Liste brute d'actualités structurées
Agent 2 — Analyste
Rôle : Analyser les actualités, détecter les signaux forts
Input : Résultat de l'agent Scraper
Output : Liste d'actus scorées par importance
Agent 3 — Rédacteur
Rôle : Rédiger le rapport de veille formaté
Input : Résultat de l'agent Analyste
Output : Rapport MD/HTML prêt à publish
Définir les interfaces
Chaque agent expose un protocole de messages :
from dataclasses import dataclass
from enum import Enum
class AgentMessageType(Enum):
SCRAPE_REQUEST = "scrape_request"
SCRAPE_RESULT = "scrape_result"
ANALYSIS_REQUEST = "analysis_request"
ANALYSIS_RESULT = "analysis_result"
REPORT_REQUEST = "report_request"
REPORT_RESULT = "report_result"
ERROR = "error"
@dataclass
class AgentMessage:
sender: str # "scraper" | "analyst" | "reporter"
recipient: str # "scraper" | "analyst" | "reporter" | "orchestrator"
type: AgentMessageType
payload: dict
trace_id: str # pour le distributed tracing
timestamp: str # ISO 8601
Définir le shared state
Le piège le plus courant : partager trop ou pas assez d'état.
Notre choix pour ce tutoriel : shared memory via un store centralisé (dictionnaire thread-safe). Pour des cas plus complexes, on passe à Redis.
from threading import Lock
from datetime import datetime
class SharedStore:
def __init__(self):
self._data = {}
self._lock = Lock()
def set(self, key: str, value) -> None:
with self._lock:
self._data[key] = {
"value": value,
"updated_at": datetime.utcnow().isoformat()
}
def get(self, key: str):
with self._lock:
entry = self._data.get(key)
return entry["value"] if entry else None
def append(self, key: str, value) -> None:
with self._lock:
if key not in self._data:
self._data[key] = {"value": [], "updated_at": ""}
self._data[key]["value"].append(value)
self._data[key]["updated_at"] = datetime.utcnow().isoformat()
shared_store = SharedStore()
Points de défaillance à anticiper
| Point de défaillance | Conséquence | Mitigation |
|---|---|---|
| Agent Scraper timeout | Pipeline bloqué | Timeout configurable + message d'erreur |
| Store corrompu | Données incohérentes | Validation schema sur write |
| Agent ne répond pas | Deadlock | TTL sur les messages + dead letter queue |
| Cascade d'erreurs | Propagation incontrôlée | Circuit breaker par agent |
Orchestration avec OpenClaw
OpenClaw est le framework d'orchestration qui pilote notre système multi-agents. Chaque agent tourne dans un contexte isolé mais partage la mémoire et les outils via le hub.
Setup des agents
AGENTS_CONFIG = {
"scraper": {
"role": "Web Scraper Agent",
"goal": "Collecter les actualités marché pertinentes",
"tools": ["http_request", "html_parser", "rss_reader"],
"backstory": (
"Tu es un expert du scraping web. Tu collectes rapidement et proprement "
"les articles de veille sans surcharger le système."
),
"memory_scope": "ephemeral",
},
"analyst": {
"role": "Market Analyst Agent",
"goal": "Identifier les signaux forts dans les actualités collectées",
"tools": ["text_similarity", "keyword_extractor", "sentiment_analyzer"],
"backstory": (
"Tu es analyste marché. Tu détectes les tendances, les anomalies "
"et les nouvelles importantes dans les données brutes."
),
"memory_scope": "session",
},
"reporter": {
"role": "Report Writer Agent",
"goal": "Rédiger un rapport de veille clair et actionnable",
"tools": ["markdown_writer", "seo_optimizer"],
"backstory": (
"Tu es rédacteur SEO expert. Tu produis des rapports structurés, "
"clairs et optimisés pour le lecteur."
),
"memory_scope": "session",
},
"orchestrator": {
"role": "Pipeline Orchestrator",
"goal": "Coordonner le pipeline scrape → analyse → report",
"tools": ["message_dispatcher", "shared_store_reader"],
"backstory": (
"Tu es le maestro du pipeline. Tu dispatches les tâches aux bons agents, "
"gères les erreurs et assembles le résultat final."
),
"memory_scope": "persistent",
},
}
Routing des messages
from openclaw.models import AgentMessage, AgentMessageType
from openclaw.context import get_current_agent
class MessageRouter:
def __init__(self, shared_store):
self.store = shared_store
def dispatch(self, message: AgentMessage) -> None:
current = get_current_agent()
self.store.append("dispatch_log", {
"from": message.sender,
"to": message.recipient,
"type": message.type.value,
"timestamp": message.timestamp,
})
if message.recipient == "orchestrator":
self._route_to_orchestrator(message)
elif message.recipient == "scraper":
self._route_to_scraper(message)
elif message.recipient == "analyst":
self._route_to_analyst(message)
elif message.recipient == "reporter":
self._route_to_reporter(message)
def _route_to_orchestrator(self, message: AgentMessage) -> None:
if message.type == AgentMessageType.SCRAPE_RESULT:
self._forward_to_analyst(message.payload)
elif message.type == AgentMessageType.ANALYSIS_RESULT:
self._forward_to_reporter(message.payload)
elif message.type == AgentMessageType.REPORT_RESULT:
self.store.set("final_report", message.payload)
elif message.type == AgentMessageType.ERROR:
self.store.append("pipeline_errors", {
"source": message.sender,
"error": message.payload.get("message"),
"timestamp": message.timestamp,
})
def _forward_to_analyst(self, payload: dict) -> None:
from openclaw.tools import invoke_agent
invoke_agent(agent_name="analyst", task={"type": "analysis_request", "data": payload})
def _forward_to_reporter(self, payload: dict) -> None:
from openclaw.tools import invoke_agent
invoke_agent(agent_name="reporter", task={"type": "report_request", "data": payload})
Shared memory avec OpenClaw
from openclaw.memory import VectorMemory, ShortTermMemory
def build_shared_memory():
return {
"vector_store": VectorMemory(
index_name="market_intel",
dimension=1536,
persist=True,
),
"short_term": ShortTermMemory(
max_items=100,
ttl_seconds=3600,
),
"long_term": {
"pipeline_history": [],
"source_credentials": {},
"report_templates": {},
},
}
Communication inter-agents
Trois patterns de communication pour les systèmes multi-agents Python.
Pattern 1 — Message Passing direct
Un agent envoie un message synchrone à un autre via le router. Bloquant jusqu'à réception.
def run_pipeline(urls: list[str]) -> dict:
# Étape 1 — Kick off scraper
message = AgentMessage(
sender="orchestrator",
recipient="scraper",
type=AgentMessageType.SCRAPE_REQUEST,
payload={"urls": urls, "selectors": [".article-title", ".article-body"]},
trace_id=generate_trace_id(),
timestamp=datetime.utcnow().isoformat(),
)
result = send_blocking(message)
# Étape 2 — Passer à l'analyse (non-bloquant via dispatch)
dispatch(AgentMessage(
sender="orchestrator",
recipient="analyst",
type=AgentMessageType.ANALYSIS_REQUEST,
payload={"articles": result["articles"]},
trace_id=message.trace_id,
timestamp=datetime.utcnow().isoformat(),
))
return {"status": "pipeline_started", "trace_id": message.trace_id}
Pattern 2 — Shared Memory (Store centralisé)
Les agents écrivent dans un store commun. Les autres lisent quand ils en ont besoin. Pattern idéal pour les données qui doivent survivre à la mort d'un agent.
class AnalystAgent:
def run(self, scraped_data: dict) -> dict:
articles = scraped_data["articles"]
# Analyse chaque article
scored = [{**a, "market_score": self._score_article(a)} for a in articles]
top = sorted(scored, key=lambda x: x["market_score"], reverse=True)[:10]
# Écrire dans le store partagé
shared_store.set("analyst_results", {
"articles": top,
"total_analyzed": len(articles),
"timestamp": datetime.utcnow().isoformat(),
})
return {"status": "analysis_complete", "count": len(top)}
def _score_article(self, article: dict) -> float:
market_keywords = [
"acquisition", "fusion", "IPO", "croissance", "investissement",
"partenariat", "lancement", "rétique", " expansion ",
]
text = (article.get("title", "") + " " + article.get("excerpt", "")).lower()
return sum(1 for kw in market_keywords if kw in text)
Pattern 3 — Event-driven avec Redis
Pour des systèmes multi-conteneurs ou haute disponibilité. Les agents Pub/Sub sur Redis. Non-bloquant, distribué.
import redis
import json
class EventBus:
def __init__(self, redis_url: str = "redis://redis:6379/0"):
self.redis = redis.from_url(redis_url)
def publish(self, channel: str, message: AgentMessage) -> None:
self.redis.publish(channel, json.dumps({
"sender": message.sender,
"type": message.type.value,
"payload": message.payload,
"trace_id": message.trace_id,
"timestamp": message.timestamp,
}))
def subscribe(self, channel: str, callback: Callable) -> None:
sub = self.redis.pubsub()
sub.subscribe(channel)
for msg in sub.listen():
if msg["type"] == "message":
callback(json.loads(msg["data"]))
Comparatif des patterns
| Critère | Message Passing | Shared Memory | Event-driven (Redis) |
|---|---|---|---|
| Latence | Faible (synchrone) | Très faible | Moyenne (réseau) |
| Complexité | Faible | Faible | Moyenne |
| Shared state | Non | Oui | Oui |
| Distribution | Non | Non | Oui |
| Fault tolerance | Faible | Moyenne | Forte |
Recommandation : démarrez avec shared memory. Passez à Redis quand le système atteint 3+ conteneurs ou требует haute disponibilité.
Implémentation pas-à-pas
Le code complet du pipeline scrape → analyse → report. Chaque fonction est production-ready.
Prérequis
pip install openclaw requests beautifulsoup4 lxml redis
Code complet — Pipeline de veille marché
# veille_pipeline.py
"""
Système multi-agents de veille marché.
Pipeline : scrape (Agent 1) → analyse (Agent 2) → report (Agent 3).
"""
import requests
from bs4 import BeautifulSoup
from datetime import datetime
from dataclasses import dataclass, field
from enum import Enum
import uuid
# ─── Shared State ────────────────────────────────────────────────────────────
from threading import Lock
class SharedStore:
def __init__(self):
self._data = {}
self._lock = Lock()
def set(self, key: str, value, metadata: dict = None):
with self._lock:
self._data[key] = {
"value": value,
"updated_at": datetime.utcnow().isoformat(),
**(metadata or {})
}
def get(self, key: str):
with self._lock:
entry = self._data.get(key)
return entry["value"] if entry else None
def append(self, key: str, value):
with self._lock:
if key not in self._data:
self._data[key] = {"value": [], "updated_at": ""}
self._data[key]["value"].append(value)
self._data[key]["updated_at"] = datetime.utcnow().isoformat()
shared_store = SharedStore()
# ─── Message Protocol ─────────────────────────────────────────────────────────
class AgentMessageType(Enum):
SCRAPE_REQUEST = "scrape_request"
SCRAPE_RESULT = "scrape_result"
ANALYSIS_REQUEST = "analysis_request"
ANALYSIS_RESULT = "analysis_result"
REPORT_REQUEST = "report_request"
REPORT_RESULT = "report_result"
ERROR = "error"
@dataclass
class AgentMessage:
sender: str
recipient: str
type: AgentMessageType
payload: dict
trace_id: str
timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
# ─── Agent 1 — Scraper ───────────────────────────────────────────────────────
class ScraperAgent:
def run(self, message: AgentMessage) -> AgentMessage:
urls = message.payload.get("urls", [])
selectors = message.payload.get("selectors", ["article", ".post"])
articles = []
for url in urls:
try:
res = requests.get(url, timeout=10, headers={
"User-Agent": "Mozilla/5.0 (compatible; MarketVeilBot/1.0)"
})
res.raise_for_status()
soup = BeautifulSoup(res.content, "lxml")
for article in soup.select(selectors[0]):
title_el = article.select_one("h2, h3") or article
body_el = article.select_one("p") or article
articles.append({
"url": url,
"title": title_el.get_text(strip=True),
"excerpt": body_el.get_text(strip=True)[:200],
"scraped_at": datetime.utcnow().isoformat(),
})
except requests.RequestException as e:
shared_store.append("scraper_errors", {
"url": url, "error": str(e),
"timestamp": datetime.utcnow().isoformat(),
})
return AgentMessage(
sender="scraper",
recipient="orchestrator",
type=AgentMessageType.SCRAPE_RESULT,
payload={"articles": articles, "count": len(articles)},
trace_id=message.trace_id,
)
# ─── Agent 2 — Analyste ──────────────────────────────────────────────────────
class AnalystAgent:
def _score_article(self, article: dict) -> float:
market_keywords = [
"acquisition", "fusion", "IPO", "croissance", "investissement",
"partenariat", "lancement", "rétique", " expansion ",
]
text = (article.get("title", "") + " " + article.get("excerpt", "")).lower()
return sum(1 for kw in market_keywords if kw in text)
def run(self, message: AgentMessage) -> AgentMessage:
articles = message.payload.get("articles", [])
scored = [{**a, "market_score": self._score_article(a)} for a in articles]
top = sorted(scored, key=lambda x: x["market_score"], reverse=True)[:10]
highlights = [{"title": a["title"], "excerpt": a["excerpt"][:100]} for a in top[:3]]
return AgentMessage(
sender="analyst",
recipient="orchestrator",
type=AgentMessageType.ANALYSIS_RESULT,
payload={
"top_articles": top,
"highlights": highlights,
"total_analyzed": len(articles),
"timestamp": datetime.utcnow().isoformat(),
},
trace_id=message.trace_id,
)
# ─── Agent 3 — Rédacteur ─────────────────────────────────────────────────────
class ReporterAgent:
def run(self, message: AgentMessage) -> AgentMessage:
analysis = message.payload
lines = [
f"# Veille Marché — {datetime.utcnow().strftime('%Y-%m-%d')}",
"",
"## Highlights",
"",
]
for i, h in enumerate(analysis["highlights"], 1):
lines.append(f"**{i}. {h['title']}**")
lines.append(f">{h['excerpt']}")
lines.append("")
lines.extend([
"## Analyse complète",
"",
f"* {analysis['total_analyzed']} articles analysés — Top 10 ci-dessous *",
"",
])
for i, a in enumerate(analysis["top_articles"], 1):
lines.append(f"### {i}. {a['title']}")
lines.append(f"Source : {a.get('url', 'N/A')}")
lines.append(f"Score marché : {a.get('market_score', 0)}/10")
lines.append(f"{a.get('excerpt', '')}")
lines.append("")
report = "\n".join(lines)
with open("/tmp/veille_rapport.md", "w") as f:
f.write(report)
shared_store.set("final_report", {
"report": report,
"generated_at": datetime.utcnow().isoformat(),
"article_count": analysis["total_analyzed"],
})
return AgentMessage(
sender="reporter",
recipient="orchestrator",
type=AgentMessageType.REPORT_RESULT,
payload={"report": report, "path": "/tmp/veille_rapport.md"},
trace_id=message.trace_id,
)
# ─── Orchestrateur ────────────────────────────────────────────────────────────
class MessageRouter:
def __init__(self, store):
self.store = store
self.scraper = ScraperAgent()
self.analyst = AnalystAgent()
self.reporter = ReporterAgent()
def route(self, message: AgentMessage) -> AgentMessage:
if message.recipient not in ("scraper", "analyst", "reporter", "orchestrator"):
return self._make_error(message, f"Unknown agent: {message.recipient}")
handlers = {
"scraper": self.scraper.run,
"analyst": self.analyst.run,
"reporter": self.reporter.run,
}
handler = handlers.get(message.recipient)
if handler:
try:
return handler(message)
except Exception as e:
return self._make_error(message, str(e))
return self._make_error(message, "No handler found")
def _make_error(self, message: AgentMessage, error: str) -> AgentMessage:
shared_store.append("pipeline_errors", {
"from": message.sender,
"error": error,
"timestamp": datetime.utcnow().isoformat(),
})
return AgentMessage(
sender=message.recipient,
recipient="orchestrator",
type=AgentMessageType.ERROR,
payload={"error": error, "original_message": message.payload},
trace_id=message.trace_id,
)
# ─── Pipeline principal ────────────────────────────────────────────────────────
def run_veille_pipeline(urls: list[str]) -> dict:
trace_id = str(uuid.uuid4())[:8]
print(f"[{trace_id}] Starting pipeline for {len(urls)} URLs")
router = MessageRouter(shared_store)
# Step 1 — Scrape
print(f"[{trace_id}] Step 1/3 — Scraping...")
scrape_msg = AgentMessage(
sender="orchestrator",
recipient="scraper",
type=AgentMessageType.SCRAPE_REQUEST,
payload={"urls": urls, "selectors": ["article", ".post"]},
trace_id=trace_id,
)
scrape_result = router.route(scrape_msg)
if scrape_result.type == AgentMessageType.ERROR:
return {"status": "error", "error": scrape_result.payload["error"]}
# Step 2 — Analyse
print(f"[{trace_id}] Step 2/3 — Analyzing...")
analysis_msg = AgentMessage(
sender="orchestrator",
recipient="analyst",
type=AgentMessageType.ANALYSIS_REQUEST,
payload=scrape_result.payload,
trace_id=trace_id,
)
analysis_result = router.route(analysis_msg)
if analysis_result.type == AgentMessageType.ERROR:
return {"status": "error", "error": analysis_result.payload["error"]}
# Step 3 — Rédaction
print(f"[{trace_id}] Step 3/3 — Reporting...")
report_msg = AgentMessage(
sender="orchestrator",
recipient="reporter",
type=AgentMessageType.REPORT_REQUEST,
payload=analysis_result.payload,
trace_id=trace_id,
)
report_result = router.route(report_msg)
print(f"[{trace_id}] Pipeline complete.")
return {
"status": "success",
"trace_id": trace_id,
"report": report_result.payload.get("report", ""),
"path": report_result.payload.get("path"),
}
if __name__ == "__main__":
result = run_veille_pipeline(["https://news.ycombinator.com/"])
print(result["report"] if result.get("report") else result)
Comment étendre le système
Le système actuel est linéaire (scrape → analyse → report). Pour l'étendre :
- Ajouter un agent de scoring — entre analyse et rédaction, un agent qui check la qualité SEO du rapport
- Paralleliser le scraping — modifier
ScraperAgent.run()pour traiter les URLs en parallèle avecThreadPoolExecutor - Ajouter Redis — remplacer
SharedStoreparEventBuspour le distribuer
Debugging d'un système multi-agents
Un pipeline multi-agents, c'est dur à débugger. Voici la boîte à outils.
Logging distribué
Chaque message porte un trace_id. Centralisez les logs :
import logging
import json
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s [%(name)s] %(message)s",
)
def log_message(message: AgentMessage, status: str = "sent"):
logging.info(json.dumps({
"event": "agent_message",
"status": status,
"trace_id": message.trace_id,
"from": message.sender,
"to": message.recipient,
"type": message.type.value,
"timestamp": message.timestamp,
}))
Replay des conversations
def replay_pipeline(trace_id: str, store: SharedStore) -> list[dict]:
dispatch_log = store.get("dispatch_log") or {"value": []}
entries = dispatch_log["value"]
pipeline = [e for e in entries if e.get("trace_id") == trace_id]
return sorted(pipeline, key=lambda x: x["timestamp"])
Outils de diagnostic
| Outil | Usage |
|---|---|
shared_store.get_full("dispatch_log") | Voir le flux complet des messages |
shared_store.get("pipeline_errors") | Lister toutes les erreurs |
shared_store.get("scraper_errors") | Erreurs de scraping par URL |
replay_pipeline(trace_id, store) | Rejouer une exécution |
| Prometheus metrics | Latence par agent, taux d'erreur |
Déploiement en production
Docker Compose multi-conteneur
# docker-compose.yml
version: "3.9"
services:
app:
build: .
container_name: veille-app
restart: unless-stopped
volumes:
- ./reports:/app/reports
environment:
- REDIS_URL=redis://redis:6379/0
- LOG_LEVEL=INFO
depends_on:
redis:
condition: service_healthy
networks:
- veille-net
redis:
image: redis:7-alpine
container_name: veille-redis
restart: unless-stopped
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 5s
retries: 3
networks:
- veille-net
prometheus:
image: prom/prometheus:latest
container_name: veille-prometheus
restart: unless-stopped
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
- prometheus-data:/prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
networks:
- veille-net
grafana:
image: grafana/grafana:latest
container_name: veille-grafana
restart: unless-stopped
ports:
- "3000:3000"
volumes:
- grafana-data:/var/lib/grafana
environment:
- GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD}
networks:
- veille-net
networks:
veille-net:
driver: bridge
volumes:
prometheus-data:
grafana-data:
Configuration Prometheus
# prometheus.yml
global:
scrape_interval: 15s
scrape_configs:
- job_name: "veille-pipeline"
static_configs:
- targets: ["app:8000"]
metrics_path: "/metrics"
Monitoring et alertes
| Métrique | Seuil d'alerte | Action |
|---|---|---|
pipeline_errors_total | > 5/heure | Investiguer le dispatch log |
| Scrape latency | > 30s | Vérifier les sources |
| Analyse latency | > 10s | Vérifier la taille du dataset |
| Rédaction latency | > 5s | Vérifier le modèle |
Estimation des coûts
Pour un pipeline avec 3 agents OpenClaw :
| Élément | Coût estimé |
|---|---|
| Compute (1 conteneur) | ~5€/mois (Hetzner CPX) |
| Redis (géré) | ~5€/mois |
| Monitoring | ~0€ (auto-hébergé) |
| Total | ~10€/mois pour 100 runs/jour |
Questions fréquentes
Comment créer un système multi-agents en Python from scratch ?
La méthode la plus simple :
- Définissez 2-3 rôles distincts (ex : scraper, analyste, rédacteur)
- Implémentez un store partagé (dictionnaire thread-safe ou Redis)
- Définissez un protocole de messages entre agents
- Écrivez chaque agent comme une classe avec une méthode
run(message) - Ajoutez un orchestrateur qui route les messages d'un agent à l'autre
Pas besoin de framework pour démarrer. Le code ci-dessus fonctionne avec uniquement la stdlib Python + requests + beautifulsoup4.
Quel framework choisir pour un système multi-agents Python en production ?
| Framework | Force | Use case idéal |
|---|---|---|
| OpenClaw | Orchestration avancée, tool sharing | Production, multi-canal |
| CrewAI | Simplicité, prompt basé | Prototypage rapide |
| AutoGen | Conversational agents | Agents qui dialoguent |
| LangGraph | Graphes de workflow | Pipelines complexes avec cycles |
Pour ce tutoriel, on utilise OpenClaw pour son système d'orchestration et sa shared memory intégrés. Pour une alternative plus légère, voir notre guide sur CrewAI.
Comment gérer la communication entre agents distants ?
Trois options :
- Message passing synchrone : un agent appelle l'autre directement. Simple, faible latence.
- Shared memory : tous les agents écrivent/lisent dans un store commun. Bon pour la coordination sans couplage fort.
- Event bus (Redis) : agents Pub/Sub via Redis. Le seul pattern distribuable multi-conteneurs.
Quels sont les erreurs courantes avec les systèmes multi-agents ?
- Couplage fort — un agent qui dépend directement de l'implémentation d'un autre → utiliser des interfaces de messages
- Pas de circuit breaker — une erreur cascade et bloque tout → TTL sur les messages + dead letter queue
- Shared state non thread-safe — accès concurrent problématiques → Lock ou Redis
- Logging insuffisant — impossible de retracer une exécution → trace_id sur chaque message
Qu'est-ce qu'OpenClaw ?
OpenClaw est un framework d'agents IA avec un système d'orchestration intégré. Il permet de :
- Définir des agents avec rôle, goal, backstory et outils
- Partager des outils entre agents (tool sharing)
- Router des messages entre agents via un hub
- Persister la mémoire cross-session (vector store + short term)
- Déployer en production via Docker Compose
Ce qu'il faut retenir
- Un système multi-agents = agents spécialisés + protocole de communication
- Trois patterns de comm : message passing, shared memory, event-driven (Redis)
- OpenClaw comme framework d'orchestration → shared memory + tool sharing intégrés
- Debugging : trace_id sur chaque message + centralized logging
- Coût : ~10€/mois pour 100 runs/jour sur Hetzner + Redis géré
Articles liés
- Systèmes multi-agents — Guide complet — Concepts fondamentaux, patterns d'architecture
- OpenClaw — Guide complet — Configuration détaillée du framework
- CrewAI — Guide complet — Alternative légère pour prototyper rapidement
- Architectures multi-agents — Patterns derrière les systèmes modernes
- Outils agents IA — Bases vectorielles, mémoire, tool calling
Restez informé sur les agents IA
Nouveaux tutoriels, comparatifs et guides pratiques directement dans votre boîte mail.