FrameworksAgents.com Logo

Multi-agent system tutorial : de zéro à un système multi-agents en production

Tutorielcalendar_todayPublié le 28 mai 2026schedule16 min de lecturesystème multi-agents pythontutoriel multi-agent

Apprenez à construire un système multi-agents de A à Z. Ce tutoriel couvre la conception, l'orchestration, la communication inter-agents, le debugging et le déploiement d'un système multi-agents en production.

Multi-agent system tutorial : de zéro à un système multi-agents en production

Introduction

Un système multi-agents, ce n'est pas un chatbot boosté. C'est un ensemble d'entités autonomes qui communiquent, se coordonnent et partagent un objectif commun — sans que vous ayez à orchestrer chaque étape manuellement.

Dans ce tutoriel, on construit un système de veille marché concret :

  • Agent 1 — scrape les actualités
  • Agent 2 — les analyse
  • Agent 3 — rédige un rapport

Chaque agent a un rôle précis, une mémoire partagée, et un protocole de communication.

Stack utilisée : OpenClaw + Python + Docker.

Résumé rapide

CritèreValeur
Type d'articleTutoriel production-ready
Stack principaleOpenClaw + Python
Use caseVeille marché multi-agents
DéploiementDocker Compose
MonitoringPrometheus + Grafana
NiveauIntermédiaire à avancé

Qu'est-ce qu'un système multi-agents ?

Un système multi-agents est un ensemble d'agents IA autonomes qui collaborent pour atteindre un objectif qu'aucun agent ne pourrait accomplir seul.

Chaque agent possède :

  • Un rôle défini — spécialité, compétences, outils disponibles
  • Une mémoire propre — contexte, historique de ses tâches
  • Une capacité de communication — envoi et réception de messages
  • Une logique de décision — quand agir, quand attendre, quand déléguer

La différence avec un agent unique : la décomposition fonctionnelle. Plutôt qu'un agent giant qui fait tout, on crée plusieurs agents spécialisés qui coopèrent. C'est l'approche derrière les architectures multi-agents.

Pourquoi passer à un système multi-agents ?

ApprocheLimite
Agent unique monolithique✓ Tâches simples et linéaires
Système multi-agents✓ Pipelines complexes, parallèles, rôles distincts

Exemple concret — veille marché :

  • L'agent de scraping → compétences web
  • L'agent d'analyse → compétences data
  • L'agent de rédaction → compétences writing SEO

Un seul agent devrait multitâcher ces trois domaines : moins efficace, plus cher en tokens.

Conception du système

5 minutes de conception avant d'écrire du code. Un système multi-agents mal conçu est un cauchemar à débugger.

Définir les rôles

Pour notre use case de veille marché, trois rôles :

Agent 1 — Scraper
  Rôle    : Collecter les actualités du secteur
  Outils  : Requêtes HTTP, parsing HTML
  Output  : Liste brute d'actualités structurées

Agent 2 — Analyste
  Rôle    : Analyser les actualités, détecter les signaux forts
  Input   : Résultat de l'agent Scraper
  Output  : Liste d'actus scorées par importance

Agent 3 — Rédacteur
  Rôle    : Rédiger le rapport de veille formaté
  Input   : Résultat de l'agent Analyste
  Output  : Rapport MD/HTML prêt à publish

Définir les interfaces

Chaque agent expose un protocole de messages :

from dataclasses import dataclass
from enum import Enum

class AgentMessageType(Enum):
    SCRAPE_REQUEST = "scrape_request"
    SCRAPE_RESULT  = "scrape_result"
    ANALYSIS_REQUEST = "analysis_request"
    ANALYSIS_RESULT = "analysis_result"
    REPORT_REQUEST  = "report_request"
    REPORT_RESULT   = "report_result"
    ERROR          = "error"

@dataclass
class AgentMessage:
    sender:    str      # "scraper" | "analyst" | "reporter"
    recipient: str      # "scraper" | "analyst" | "reporter" | "orchestrator"
    type:       AgentMessageType
    payload:    dict
    trace_id:   str     # pour le distributed tracing
    timestamp:  str     # ISO 8601

Définir le shared state

Le piège le plus courant : partager trop ou pas assez d'état.

Notre choix pour ce tutoriel : shared memory via un store centralisé (dictionnaire thread-safe). Pour des cas plus complexes, on passe à Redis.

from threading import Lock
from datetime import datetime

class SharedStore:
    def __init__(self):
        self._data = {}
        self._lock = Lock()
    
    def set(self, key: str, value) -> None:
        with self._lock:
            self._data[key] = {
                "value": value,
                "updated_at": datetime.utcnow().isoformat()
            }
    
    def get(self, key: str):
        with self._lock:
            entry = self._data.get(key)
            return entry["value"] if entry else None
    
    def append(self, key: str, value) -> None:
        with self._lock:
            if key not in self._data:
                self._data[key] = {"value": [], "updated_at": ""}
            self._data[key]["value"].append(value)
            self._data[key]["updated_at"] = datetime.utcnow().isoformat()

shared_store = SharedStore()

Points de défaillance à anticiper

Point de défaillanceConséquenceMitigation
Agent Scraper timeoutPipeline bloquéTimeout configurable + message d'erreur
Store corrompuDonnées incohérentesValidation schema sur write
Agent ne répond pasDeadlockTTL sur les messages + dead letter queue
Cascade d'erreursPropagation incontrôléeCircuit breaker par agent

Orchestration avec OpenClaw

OpenClaw est le framework d'orchestration qui pilote notre système multi-agents. Chaque agent tourne dans un contexte isolé mais partage la mémoire et les outils via le hub.

Setup des agents

AGENTS_CONFIG = {
    "scraper": {
        "role": "Web Scraper Agent",
        "goal": "Collecter les actualités marché pertinentes",
        "tools": ["http_request", "html_parser", "rss_reader"],
        "backstory": (
            "Tu es un expert du scraping web. Tu collectes rapidement et proprement "
            "les articles de veille sans surcharger le système."
        ),
        "memory_scope": "ephemeral",
    },
    
    "analyst": {
        "role": "Market Analyst Agent",
        "goal": "Identifier les signaux forts dans les actualités collectées",
        "tools": ["text_similarity", "keyword_extractor", "sentiment_analyzer"],
        "backstory": (
            "Tu es analyste marché. Tu détectes les tendances, les anomalies "
            "et les nouvelles importantes dans les données brutes."
        ),
        "memory_scope": "session",
    },
    
    "reporter": {
        "role": "Report Writer Agent",
        "goal": "Rédiger un rapport de veille clair et actionnable",
        "tools": ["markdown_writer", "seo_optimizer"],
        "backstory": (
            "Tu es rédacteur SEO expert. Tu produis des rapports structurés, "
            "clairs et optimisés pour le lecteur."
        ),
        "memory_scope": "session",
    },
    
    "orchestrator": {
        "role": "Pipeline Orchestrator",
        "goal": "Coordonner le pipeline scrape → analyse → report",
        "tools": ["message_dispatcher", "shared_store_reader"],
        "backstory": (
            "Tu es le maestro du pipeline. Tu dispatches les tâches aux bons agents, "
            "gères les erreurs et assembles le résultat final."
        ),
        "memory_scope": "persistent",
    },
}

Routing des messages

from openclaw.models import AgentMessage, AgentMessageType
from openclaw.context import get_current_agent

class MessageRouter:
    def __init__(self, shared_store):
        self.store = shared_store
    
    def dispatch(self, message: AgentMessage) -> None:
        current = get_current_agent()
        
        self.store.append("dispatch_log", {
            "from": message.sender,
            "to": message.recipient,
            "type": message.type.value,
            "timestamp": message.timestamp,
        })
        
        if message.recipient == "orchestrator":
            self._route_to_orchestrator(message)
        elif message.recipient == "scraper":
            self._route_to_scraper(message)
        elif message.recipient == "analyst":
            self._route_to_analyst(message)
        elif message.recipient == "reporter":
            self._route_to_reporter(message)
    
    def _route_to_orchestrator(self, message: AgentMessage) -> None:
        if message.type == AgentMessageType.SCRAPE_RESULT:
            self._forward_to_analyst(message.payload)
        elif message.type == AgentMessageType.ANALYSIS_RESULT:
            self._forward_to_reporter(message.payload)
        elif message.type == AgentMessageType.REPORT_RESULT:
            self.store.set("final_report", message.payload)
        elif message.type == AgentMessageType.ERROR:
            self.store.append("pipeline_errors", {
                "source": message.sender,
                "error": message.payload.get("message"),
                "timestamp": message.timestamp,
            })
    
    def _forward_to_analyst(self, payload: dict) -> None:
        from openclaw.tools import invoke_agent
        invoke_agent(agent_name="analyst", task={"type": "analysis_request", "data": payload})
    
    def _forward_to_reporter(self, payload: dict) -> None:
        from openclaw.tools import invoke_agent
        invoke_agent(agent_name="reporter", task={"type": "report_request", "data": payload})

Shared memory avec OpenClaw

from openclaw.memory import VectorMemory, ShortTermMemory

def build_shared_memory():
    return {
        "vector_store": VectorMemory(
            index_name="market_intel",
            dimension=1536,
            persist=True,
        ),
        "short_term": ShortTermMemory(
            max_items=100,
            ttl_seconds=3600,
        ),
        "long_term": {
            "pipeline_history": [],
            "source_credentials": {},
            "report_templates": {},
        },
    }

Communication inter-agents

Trois patterns de communication pour les systèmes multi-agents Python.

Pattern 1 — Message Passing direct

Un agent envoie un message synchrone à un autre via le router. Bloquant jusqu'à réception.

def run_pipeline(urls: list[str]) -> dict:
    # Étape 1 — Kick off scraper
    message = AgentMessage(
        sender="orchestrator",
        recipient="scraper",
        type=AgentMessageType.SCRAPE_REQUEST,
        payload={"urls": urls, "selectors": [".article-title", ".article-body"]},
        trace_id=generate_trace_id(),
        timestamp=datetime.utcnow().isoformat(),
    )
    result = send_blocking(message)
    
    # Étape 2 — Passer à l'analyse (non-bloquant via dispatch)
    dispatch(AgentMessage(
        sender="orchestrator",
        recipient="analyst",
        type=AgentMessageType.ANALYSIS_REQUEST,
        payload={"articles": result["articles"]},
        trace_id=message.trace_id,
        timestamp=datetime.utcnow().isoformat(),
    ))
    
    return {"status": "pipeline_started", "trace_id": message.trace_id}

Pattern 2 — Shared Memory (Store centralisé)

Les agents écrivent dans un store commun. Les autres lisent quand ils en ont besoin. Pattern idéal pour les données qui doivent survivre à la mort d'un agent.

class AnalystAgent:
    def run(self, scraped_data: dict) -> dict:
        articles = scraped_data["articles"]
        
        # Analyse chaque article
        scored = [{**a, "market_score": self._score_article(a)} for a in articles]
        top = sorted(scored, key=lambda x: x["market_score"], reverse=True)[:10]
        
        # Écrire dans le store partagé
        shared_store.set("analyst_results", {
            "articles": top,
            "total_analyzed": len(articles),
            "timestamp": datetime.utcnow().isoformat(),
        })
        
        return {"status": "analysis_complete", "count": len(top)}
    
    def _score_article(self, article: dict) -> float:
        market_keywords = [
            "acquisition", "fusion", "IPO", "croissance", "investissement",
            "partenariat", "lancement", "rétique", " expansion ",
        ]
        text = (article.get("title", "") + " " + article.get("excerpt", "")).lower()
        return sum(1 for kw in market_keywords if kw in text)

Pattern 3 — Event-driven avec Redis

Pour des systèmes multi-conteneurs ou haute disponibilité. Les agents Pub/Sub sur Redis. Non-bloquant, distribué.

import redis
import json

class EventBus:
    def __init__(self, redis_url: str = "redis://redis:6379/0"):
        self.redis = redis.from_url(redis_url)
    
    def publish(self, channel: str, message: AgentMessage) -> None:
        self.redis.publish(channel, json.dumps({
            "sender": message.sender,
            "type": message.type.value,
            "payload": message.payload,
            "trace_id": message.trace_id,
            "timestamp": message.timestamp,
        }))
    
    def subscribe(self, channel: str, callback: Callable) -> None:
        sub = self.redis.pubsub()
        sub.subscribe(channel)
        for msg in sub.listen():
            if msg["type"] == "message":
                callback(json.loads(msg["data"]))

Comparatif des patterns

CritèreMessage PassingShared MemoryEvent-driven (Redis)
LatenceFaible (synchrone)Très faibleMoyenne (réseau)
ComplexitéFaibleFaibleMoyenne
Shared stateNonOuiOui
DistributionNonNonOui
Fault toleranceFaibleMoyenneForte

Recommandation : démarrez avec shared memory. Passez à Redis quand le système atteint 3+ conteneurs ou требует haute disponibilité.

Implémentation pas-à-pas

Le code complet du pipeline scrape → analyse → report. Chaque fonction est production-ready.

Prérequis

pip install openclaw requests beautifulsoup4 lxml redis

Code complet — Pipeline de veille marché

# veille_pipeline.py
"""
Système multi-agents de veille marché.
Pipeline : scrape (Agent 1) → analyse (Agent 2) → report (Agent 3).
"""

import requests
from bs4 import BeautifulSoup
from datetime import datetime
from dataclasses import dataclass, field
from enum import Enum
import uuid

# ─── Shared State ────────────────────────────────────────────────────────────

from threading import Lock

class SharedStore:
    def __init__(self):
        self._data = {}
        self._lock = Lock()
    
    def set(self, key: str, value, metadata: dict = None):
        with self._lock:
            self._data[key] = {
                "value": value,
                "updated_at": datetime.utcnow().isoformat(),
                **(metadata or {})
            }
    
    def get(self, key: str):
        with self._lock:
            entry = self._data.get(key)
            return entry["value"] if entry else None
    
    def append(self, key: str, value):
        with self._lock:
            if key not in self._data:
                self._data[key] = {"value": [], "updated_at": ""}
            self._data[key]["value"].append(value)
            self._data[key]["updated_at"] = datetime.utcnow().isoformat()

shared_store = SharedStore()

# ─── Message Protocol ─────────────────────────────────────────────────────────

class AgentMessageType(Enum):
    SCRAPE_REQUEST = "scrape_request"
    SCRAPE_RESULT  = "scrape_result"
    ANALYSIS_REQUEST = "analysis_request"
    ANALYSIS_RESULT = "analysis_result"
    REPORT_REQUEST  = "report_request"
    REPORT_RESULT   = "report_result"
    ERROR          = "error"

@dataclass
class AgentMessage:
    sender:    str
    recipient: str
    type:      AgentMessageType
    payload:   dict
    trace_id:  str
    timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())

# ─── Agent 1 — Scraper ───────────────────────────────────────────────────────

class ScraperAgent:
    def run(self, message: AgentMessage) -> AgentMessage:
        urls = message.payload.get("urls", [])
        selectors = message.payload.get("selectors", ["article", ".post"])
        articles = []
        
        for url in urls:
            try:
                res = requests.get(url, timeout=10, headers={
                    "User-Agent": "Mozilla/5.0 (compatible; MarketVeilBot/1.0)"
                })
                res.raise_for_status()
                soup = BeautifulSoup(res.content, "lxml")
                
                for article in soup.select(selectors[0]):
                    title_el = article.select_one("h2, h3") or article
                    body_el = article.select_one("p") or article
                    
                    articles.append({
                        "url": url,
                        "title": title_el.get_text(strip=True),
                        "excerpt": body_el.get_text(strip=True)[:200],
                        "scraped_at": datetime.utcnow().isoformat(),
                    })
            except requests.RequestException as e:
                shared_store.append("scraper_errors", {
                    "url": url, "error": str(e),
                    "timestamp": datetime.utcnow().isoformat(),
                })
        
        return AgentMessage(
            sender="scraper",
            recipient="orchestrator",
            type=AgentMessageType.SCRAPE_RESULT,
            payload={"articles": articles, "count": len(articles)},
            trace_id=message.trace_id,
        )

# ─── Agent 2 — Analyste ──────────────────────────────────────────────────────

class AnalystAgent:
    def _score_article(self, article: dict) -> float:
        market_keywords = [
            "acquisition", "fusion", "IPO", "croissance", "investissement",
            "partenariat", "lancement", "rétique", " expansion ",
        ]
        text = (article.get("title", "") + " " + article.get("excerpt", "")).lower()
        return sum(1 for kw in market_keywords if kw in text)
    
    def run(self, message: AgentMessage) -> AgentMessage:
        articles = message.payload.get("articles", [])
        
        scored = [{**a, "market_score": self._score_article(a)} for a in articles]
        top = sorted(scored, key=lambda x: x["market_score"], reverse=True)[:10]
        highlights = [{"title": a["title"], "excerpt": a["excerpt"][:100]} for a in top[:3]]
        
        return AgentMessage(
            sender="analyst",
            recipient="orchestrator",
            type=AgentMessageType.ANALYSIS_RESULT,
            payload={
                "top_articles": top,
                "highlights": highlights,
                "total_analyzed": len(articles),
                "timestamp": datetime.utcnow().isoformat(),
            },
            trace_id=message.trace_id,
        )

# ─── Agent 3 — Rédacteur ─────────────────────────────────────────────────────

class ReporterAgent:
    def run(self, message: AgentMessage) -> AgentMessage:
        analysis = message.payload
        
        lines = [
            f"# Veille Marché — {datetime.utcnow().strftime('%Y-%m-%d')}",
            "",
            "## Highlights",
            "",
        ]
        
        for i, h in enumerate(analysis["highlights"], 1):
            lines.append(f"**{i}. {h['title']}**")
            lines.append(f">{h['excerpt']}")
            lines.append("")
        
        lines.extend([
            "## Analyse complète",
            "",
            f"* {analysis['total_analyzed']} articles analysés — Top 10 ci-dessous *",
            "",
        ])
        
        for i, a in enumerate(analysis["top_articles"], 1):
            lines.append(f"### {i}. {a['title']}")
            lines.append(f"Source : {a.get('url', 'N/A')}")
            lines.append(f"Score marché : {a.get('market_score', 0)}/10")
            lines.append(f"{a.get('excerpt', '')}")
            lines.append("")
        
        report = "\n".join(lines)
        
        with open("/tmp/veille_rapport.md", "w") as f:
            f.write(report)
        
        shared_store.set("final_report", {
            "report": report,
            "generated_at": datetime.utcnow().isoformat(),
            "article_count": analysis["total_analyzed"],
        })
        
        return AgentMessage(
            sender="reporter",
            recipient="orchestrator",
            type=AgentMessageType.REPORT_RESULT,
            payload={"report": report, "path": "/tmp/veille_rapport.md"},
            trace_id=message.trace_id,
        )

# ─── Orchestrateur ────────────────────────────────────────────────────────────

class MessageRouter:
    def __init__(self, store):
        self.store = store
        self.scraper = ScraperAgent()
        self.analyst = AnalystAgent()
        self.reporter = ReporterAgent()
    
    def route(self, message: AgentMessage) -> AgentMessage:
        if message.recipient not in ("scraper", "analyst", "reporter", "orchestrator"):
            return self._make_error(message, f"Unknown agent: {message.recipient}")
        
        handlers = {
            "scraper": self.scraper.run,
            "analyst": self.analyst.run,
            "reporter": self.reporter.run,
        }
        
        handler = handlers.get(message.recipient)
        if handler:
            try:
                return handler(message)
            except Exception as e:
                return self._make_error(message, str(e))
        
        return self._make_error(message, "No handler found")
    
    def _make_error(self, message: AgentMessage, error: str) -> AgentMessage:
        shared_store.append("pipeline_errors", {
            "from": message.sender,
            "error": error,
            "timestamp": datetime.utcnow().isoformat(),
        })
        return AgentMessage(
            sender=message.recipient,
            recipient="orchestrator",
            type=AgentMessageType.ERROR,
            payload={"error": error, "original_message": message.payload},
            trace_id=message.trace_id,
        )

# ─── Pipeline principal ────────────────────────────────────────────────────────

def run_veille_pipeline(urls: list[str]) -> dict:
    trace_id = str(uuid.uuid4())[:8]
    print(f"[{trace_id}] Starting pipeline for {len(urls)} URLs")
    
    router = MessageRouter(shared_store)
    
    # Step 1 — Scrape
    print(f"[{trace_id}] Step 1/3 — Scraping...")
    scrape_msg = AgentMessage(
        sender="orchestrator",
        recipient="scraper",
        type=AgentMessageType.SCRAPE_REQUEST,
        payload={"urls": urls, "selectors": ["article", ".post"]},
        trace_id=trace_id,
    )
    scrape_result = router.route(scrape_msg)
    
    if scrape_result.type == AgentMessageType.ERROR:
        return {"status": "error", "error": scrape_result.payload["error"]}
    
    # Step 2 — Analyse
    print(f"[{trace_id}] Step 2/3 — Analyzing...")
    analysis_msg = AgentMessage(
        sender="orchestrator",
        recipient="analyst",
        type=AgentMessageType.ANALYSIS_REQUEST,
        payload=scrape_result.payload,
        trace_id=trace_id,
    )
    analysis_result = router.route(analysis_msg)
    
    if analysis_result.type == AgentMessageType.ERROR:
        return {"status": "error", "error": analysis_result.payload["error"]}
    
    # Step 3 — Rédaction
    print(f"[{trace_id}] Step 3/3 — Reporting...")
    report_msg = AgentMessage(
        sender="orchestrator",
        recipient="reporter",
        type=AgentMessageType.REPORT_REQUEST,
        payload=analysis_result.payload,
        trace_id=trace_id,
    )
    report_result = router.route(report_msg)
    
    print(f"[{trace_id}] Pipeline complete.")
    return {
        "status": "success",
        "trace_id": trace_id,
        "report": report_result.payload.get("report", ""),
        "path": report_result.payload.get("path"),
    }

if __name__ == "__main__":
    result = run_veille_pipeline(["https://news.ycombinator.com/"])
    print(result["report"] if result.get("report") else result)

Comment étendre le système

Le système actuel est linéaire (scrape → analyse → report). Pour l'étendre :

  1. Ajouter un agent de scoring — entre analyse et rédaction, un agent qui check la qualité SEO du rapport
  2. Paralleliser le scraping — modifier ScraperAgent.run() pour traiter les URLs en parallèle avec ThreadPoolExecutor
  3. Ajouter Redis — remplacer SharedStore par EventBus pour le distribuer

Debugging d'un système multi-agents

Un pipeline multi-agents, c'est dur à débugger. Voici la boîte à outils.

Logging distribué

Chaque message porte un trace_id. Centralisez les logs :

import logging
import json

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s [%(name)s] %(message)s",
)

def log_message(message: AgentMessage, status: str = "sent"):
    logging.info(json.dumps({
        "event": "agent_message",
        "status": status,
        "trace_id": message.trace_id,
        "from": message.sender,
        "to": message.recipient,
        "type": message.type.value,
        "timestamp": message.timestamp,
    }))

Replay des conversations

def replay_pipeline(trace_id: str, store: SharedStore) -> list[dict]:
    dispatch_log = store.get("dispatch_log") or {"value": []}
    entries = dispatch_log["value"]
    
    pipeline = [e for e in entries if e.get("trace_id") == trace_id]
    return sorted(pipeline, key=lambda x: x["timestamp"])

Outils de diagnostic

OutilUsage
shared_store.get_full("dispatch_log")Voir le flux complet des messages
shared_store.get("pipeline_errors")Lister toutes les erreurs
shared_store.get("scraper_errors")Erreurs de scraping par URL
replay_pipeline(trace_id, store)Rejouer une exécution
Prometheus metricsLatence par agent, taux d'erreur

Déploiement en production

Docker Compose multi-conteneur

# docker-compose.yml
version: "3.9"

services:
  
  app:
    build: .
    container_name: veille-app
    restart: unless-stopped
    volumes:
      - ./reports:/app/reports
    environment:
      - REDIS_URL=redis://redis:6379/0
      - LOG_LEVEL=INFO
    depends_on:
      redis:
        condition: service_healthy
    networks:
      - veille-net

  redis:
    image: redis:7-alpine
    container_name: veille-redis
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 3
    networks:
      - veille-net

  prometheus:
    image: prom/prometheus:latest
    container_name: veille-prometheus
    restart: unless-stopped
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus-data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
    networks:
      - veille-net

  grafana:
    image: grafana/grafana:latest
    container_name: veille-grafana
    restart: unless-stopped
    ports:
      - "3000:3000"
    volumes:
      - grafana-data:/var/lib/grafana
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD}
    networks:
      - veille-net

networks:
  veille-net:
    driver: bridge

volumes:
  prometheus-data:
  grafana-data:

Configuration Prometheus

# prometheus.yml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: "veille-pipeline"
    static_configs:
      - targets: ["app:8000"]
    metrics_path: "/metrics"

Monitoring et alertes

MétriqueSeuil d'alerteAction
pipeline_errors_total> 5/heureInvestiguer le dispatch log
Scrape latency> 30sVérifier les sources
Analyse latency> 10sVérifier la taille du dataset
Rédaction latency> 5sVérifier le modèle

Estimation des coûts

Pour un pipeline avec 3 agents OpenClaw :

ÉlémentCoût estimé
Compute (1 conteneur)~5€/mois (Hetzner CPX)
Redis (géré)~5€/mois
Monitoring~0€ (auto-hébergé)
Total~10€/mois pour 100 runs/jour

Questions fréquentes

Comment créer un système multi-agents en Python from scratch ?

La méthode la plus simple :

  1. Définissez 2-3 rôles distincts (ex : scraper, analyste, rédacteur)
  2. Implémentez un store partagé (dictionnaire thread-safe ou Redis)
  3. Définissez un protocole de messages entre agents
  4. Écrivez chaque agent comme une classe avec une méthode run(message)
  5. Ajoutez un orchestrateur qui route les messages d'un agent à l'autre

Pas besoin de framework pour démarrer. Le code ci-dessus fonctionne avec uniquement la stdlib Python + requests + beautifulsoup4.

Quel framework choisir pour un système multi-agents Python en production ?

FrameworkForceUse case idéal
OpenClawOrchestration avancée, tool sharingProduction, multi-canal
CrewAISimplicité, prompt baséPrototypage rapide
AutoGenConversational agentsAgents qui dialoguent
LangGraphGraphes de workflowPipelines complexes avec cycles

Pour ce tutoriel, on utilise OpenClaw pour son système d'orchestration et sa shared memory intégrés. Pour une alternative plus légère, voir notre guide sur CrewAI.

Comment gérer la communication entre agents distants ?

Trois options :

  • Message passing synchrone : un agent appelle l'autre directement. Simple, faible latence.
  • Shared memory : tous les agents écrivent/lisent dans un store commun. Bon pour la coordination sans couplage fort.
  • Event bus (Redis) : agents Pub/Sub via Redis. Le seul pattern distribuable multi-conteneurs.

Quels sont les erreurs courantes avec les systèmes multi-agents ?

  1. Couplage fort — un agent qui dépend directement de l'implémentation d'un autre → utiliser des interfaces de messages
  2. Pas de circuit breaker — une erreur cascade et bloque tout → TTL sur les messages + dead letter queue
  3. Shared state non thread-safe — accès concurrent problématiques → Lock ou Redis
  4. Logging insuffisant — impossible de retracer une exécution → trace_id sur chaque message

Qu'est-ce qu'OpenClaw ?

OpenClaw est un framework d'agents IA avec un système d'orchestration intégré. Il permet de :

  • Définir des agents avec rôle, goal, backstory et outils
  • Partager des outils entre agents (tool sharing)
  • Router des messages entre agents via un hub
  • Persister la mémoire cross-session (vector store + short term)
  • Déployer en production via Docker Compose

Ce qu'il faut retenir

  • Un système multi-agents = agents spécialisés + protocole de communication
  • Trois patterns de comm : message passing, shared memory, event-driven (Redis)
  • OpenClaw comme framework d'orchestration → shared memory + tool sharing intégrés
  • Debugging : trace_id sur chaque message + centralized logging
  • Coût : ~10€/mois pour 100 runs/jour sur Hetzner + Redis géré

Articles liés

Restez informé sur les agents IA

Nouveaux tutoriels, comparatifs et guides pratiques directement dans votre boîte mail.

homeAccueilcodeFrameworkssmart_toyAgentsmenu_bookTutorielsTwitter