Automatiser un workflow complexe avec l'IA
Fan-out, pipeline, conditional, state machine : maîtrisez les patterns d'orchestration IA. Stack 2026, code OpenClaw et exemple concret.
Automatiser un workflow complexe avec l'IA : patterns et implémentation
Introduction
Un script qui appelle une API toutes les 5 minutes, ce n'est pas un workflow complexe. C'est un cron un peu sophistiqué. Un vrai workflow complexe, c'est un système avec état partagé, embranchements conditionnels, reprise sur erreur, et cycles de feedback. Quand la complexité monte, les approches naïves lâchent.
Dans cet article, on explore les 4 patterns d'orchestration qui permettent de structurer ces workflows — fan-out, pipeline, conditional branching, et state machine — avec du code concret en OpenClaw et LangGraph. On regarde aussi la stack technique recommandée pour la production en 2026, et on finit avec un exemple complet de pipeline de veille concurrentielle que vous pouvez adapter dès demain.
Résumé rapide
| Critère | Fan-out | Pipeline | Conditional | State Machine |
|---|---|---|---|---|
| Structure | Parallèle | Séquentielle | Branchement | Graphe à états |
| Cas d'usage | Tâches indépendantes simultanées | Traitement en étapes | Logique métier à bifurcations | Workflows avec étapes + transitions |
| Complexité de gestion | Moyenne | Faible | Moyenne | Élevée |
| Outil principal | OpenClaw tasks | OpenClaw pipeline | OpenClaw conditional + n8n | LangGraph |
| Reprise sur erreur | Retry par tâche | Retry global | Retry par branche | Retry par état |
💡 Pas sûr du pattern qui correspond à votre cas ? Chaque pattern répond à un niveau de complexité spécifique — ne complexifiez pas prématurément.
Comprendre les workflows complexes
Ce qui distingue un workflow complexe d'un script simple
Un script simple suit une sequence linéaire : entrée → traitement → sortie.
Un workflow complexe réunit trois critères accumulationnels :
1. Multi-étapes avec état partagé — Les données circulent entre les étapes. Chaque étape hérite du contexte des précédentes.
2. Branchement conditionnel — Certaines branches s'exécutent, d'autres non, selon l'état courant.
3. Gestion des erreurs intégrée — Une exception ne fait pas échouer tout le workflow. Le système relance, contourne, ou redirige proprement.
Quand ces trois critères sont réunis, un script simple ne suffit plus. Il faut une architecture d'orchestration.
Pourquoi les patterns d'orchestration changent tout
Les patterns d'orchestration sont des modèles réutilisables pour catégories de problèmes récurrents.
Un workflow bien patterné est :
- Debuggable — sémantique claire par pattern
- Testable — chaque bloc se valide isolément
- Scalable — fan-out et pipeline se paralélisent
- Monitorable — chaque transition est traçable
Les 4 patterns d'orchestration
1. Fan-out — Traitement parallèle
Le pattern fan-out dispatche une tâche vers plusieurs exécuteurs en parallèle. Résultats agrégés en fin de cycle.
Quand l'utiliser :
- Scrapper 50 pages web en même temps
- Envoyer une notification à une liste de destinataires
- Lancer une analyse sur plusieurs segments clients simultanément
Quand éviter :
- Si les tâches ont des dépendances entre elles
- Si le nombre de tâches parallèles explose vos limites de rate API
# OpenClaw fan-out pattern
import asyncio
from openclaw import TaskRunner, Agent
async def scrape_competitor_pages(urls: list[str]) -> list[dict]:
async def scrape_one(url: str) -> dict:
agent = Agent(role="web-scraper", model="claude")
result = await agent.run(f"Scrape the page at {url} and return structured data")
return {"url": url, "data": result}
# Fan-out : toutes les tâches en parallèle
tasks = [scrape_one(url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Agrégation avec gestion des erreurs
valid = [r for r in results if isinstance(r, dict)]
failed = [str(r) for r in results if not isinstance(r, dict)]
return {"scrape_results": valid, "errors": failed}
2. Pipeline — Séquence d'étapes
Le pattern pipeline enchaîne les étapes dans un ordre strict. Chaque étape reçoit le output de la précédente. C'est le modèle le plus intuitif et le plus-simple à débugger.
Quand l'utiliser :
- ETL : extraire → transformer → charger
- Rédaction : research → draft → review → publish
- Traitement de document : parse → extract → structure → store
Quand éviter :
- Si certaines étapes peuvent s'exécuter indépendamment (préférez fan-out ou hybride)
- Si le flux a des embranchements (préférez conditional ou state machine)
# OpenClaw pipeline pattern
async def veille_concurrentielle_pipeline(urls: list[str]) -> dict:
async def scrape(urls: list[str]) -> list[dict]:
results = await scrape_competitor_pages(urls)
return results["scrape_results"]
async def extract(data_list: list[dict]) -> list[dict]:
agent = Agent(role="data-extractor", model="claude")
extracted = []
for item in data_list:
result = await agent.run(
f"Extract key information from: {item['data']}",
context=item
)
extracted.append(result)
return extracted
async def compare_and_alert(data: list[dict]) -> dict:
agent = Agent(role="analyst", model="claude")
report = await agent.run(
"Compare these competitor data points and identify key changes",
context={"data": data}
)
return report
# Pipeline sériel
scraped = await scrape(urls)
extracted = await extract(scraped)
report = await compare_and_alert(extracted)
return report
3. Conditional Branching — Logique à embranchements
Le pattern conditional exécute différentes branches selon l'état courant. C'est un if/else structuré, mais sur un état partagé et avec des hooks d'entrée/sortie par branche.
Quand l'utiliser :
- Routage automatique selon le type de demande client
- Validation conditionnelle (si X → action Y, sinon action Z)
- Attribution de traitement selon des seuils
Quand éviter :
- Si vous avez plus de 5 niveaux de branchement (préférez state machine)
- Si les conditions mélangent trop de variables (préférez table de décision)
# OpenClaw conditional pattern
from openclaw import ConditionalStep, Agent
async def route_demande(demande: dict) -> dict:
agent = Agent(role="router", model="claude")
routing = await agent.run(
"Classify this request type: escalation_needed, standard, urgent",
context=demande
)
routing = routing.lower().strip()
if "escalation" in routing:
return await escalade(demande)
elif "urgent" in routing:
return await traitement_urgent(demande)
else:
return await traitement_standard(demande)
async def escalade(d: dict) -> dict:
escalation_agent = Agent(role="escalation", model="claude")
return await escalation_agent.run(
"Create escalation report for human review",
context=d
)
async def traitement_urgent(d: dict) -> dict:
urgent_agent = Agent(role="urgent-handler", model="claude")
return await urgent_agent.run(
"Handle with priority SLA",
context=d
)
async def traitement_standard(d: dict) -> dict:
standard_agent = Agent(role="standard-handler", model="claude")
return await standard_agent.run(
"Process through standard workflow",
context=d
)
4. State Machine avec LangGraph — Graphe d'états
Le pattern state machine modélise le workflow comme un graphe d'états. Chaque état a des transitions définies vers d'autres états. La machine reçoit un état courant, applique une transition, et produit un nouvel état. C'est le pattern le plus puissant pour les workflows complexes avec de multiples étapes et des transitions conditionnelles.
Quand l'utiliser :
- Workflow multi-agents avec cycles de feedback
- Processus métier avec étapes non-linéaires
- Systèmes de dialogue et de mémoire persistante
- Workflows où le même état peut revenir à une étape précédente
Quand éviter :
- Workflow simples et linéaires (trop d'overhead)
- Si vous n'avez pas besoin de retour en arrière
# LangGraph State Machine
from langgraph.graph import StateGraph, END
from typing import TypedDict, Optional
from openclaw import Agent
class WorkflowState(TypedDict):
demande: dict
status: str
rapport: Optional[dict]
errors: list
def create_workflow_graph():
graph = StateGraph(WorkflowState)
# Définir les nœuds
graph.add_node("validate", validate_node)
graph.add_node("process", process_node)
graph.add_node("review", review_node)
graph.add_node("escalate", escalate_node)
graph.add_node("deliver", deliver_node)
# Point d'entrée
graph.set_entry_point("validate")
# Transitions conditionnelles
graph.add_conditional_edges(
"validate",
route_from_validation,
{
"escalate": "escalate",
"process": "process"
}
)
graph.add_edge("process", "review")
graph.add_conditional_edges(
"review",
route_from_review,
{
"process": "process",
"deliver": "deliver"
}
)
graph.add_edge("escalate", END)
graph.add_edge("deliver", END)
return graph.compile()
def route_from_validation(state: WorkflowState) -> str:
if state.get("errors"):
return "escalate"
return "process"
def route_from_review(state: WorkflowState) -> str:
if state["status"] == "needs_revision":
return "process"
return "deliver"
# Nodes
async def validate_node(state: WorkflowState) -> WorkflowState:
agent = Agent(role="validator", model="claude")
result = await agent.run("Validate input data", context=state["demande"])
return {"status": "validated"}
async def process_node(state: WorkflowState) -> WorkflowState:
agent = Agent(role="processor", model="claude")
rapport = await agent.run("Process the demande", context=state["demande"])
return {"rapport": rapport, "status": "processed"}
async def review_node(state: WorkflowState) -> WorkflowState:
agent = Agent(role="reviewer", model="claude")
verdict = await agent.run("Review the report", context=state["rapport"])
return {"status": verdict}
async def escalate_node(state: WorkflowState) -> WorkflowState:
return {"status": "escalated"}
async def deliver_node(state: WorkflowState) -> WorkflowState:
return {"status": "delivered"}
# Compilation et exécution
graph = create_workflow_graph()
result = await graph.ainvoke({
"demande": {"user_id": "123", "type": "report"},
"status": "pending",
"rapport": None,
"errors": []
})
📌 Vous avez choisi vos patterns ? Passons à la stack technique recommandée pour la production en 2026.
Stack technique recommandée en 2026
Architecture de référence
L'architecture recommandée pour un workflow IA complexe en production repose sur quatre couches :
| Couche | Outil | Rôle |
|---|---|---|
| Orchestration principale | OpenClaw | Coordonne les agents, skills, tâches et sous-agents |
| Triggers & Webhooks | n8n | Déclenche les workflows sur événements (schedule, webhook, form) |
| State management | LangGraph | Gère les machines à états et le contexte persistant |
| Mémoire partagée | PostgreSQL + pgvector | Store vecteur pour retrieval, état pour audit |
Pourquoi ce stack
OpenClaw — conducteur central. Orchestre les sous-agents, skills, et le contexte.
n8n — triggers schedule-based et webhooks. Interfaçage avec le monde extérieur (APIs tierces, notifications, bases de données).
LangGraph — state machine pour les graphes d'états avec transitions complexes, cycles, et retours arrière.
PostgreSQL + pgvector — mémoire persistante. Vecteur pour le retrieval sémantique, relationnel pour l'état et les logs.
Intégration OpenClaw + LangGraph + n8n
L'intégration type fonctionne comme suit :
n8n (trigger cron/webhook)
→ appelle un webhook OpenClaw
→ Lance un sous-agent qui lance le workflow LangGraph
→ Les nodes LangGraph délèguent à des agents OpenClaw
→ Logs et état → PostgreSQL/pgvector
→ n8n notifie les canaux (Slack, email)
Exemple concret : pipeline veille concurrentielle complet
📋 Contexte — Vous surveillez 10 concurrents. Chaque matin : scraper → extraire → comparer → alerter → rapporter.
Vous êtes une équipe produit qui surveille 10 concurrents. Chaque matin, vous voulez :
- Scrapper les 10 blogs/pages produit de vos concurrents
- Extraire les trois metrics principales (prix, features, news)
- Comparer avec votre état précédent
- Générer une alerte si un changement significatif est détecté
- Produire un rapport structuré
Implémentation OpenClaw
"""
Pipeline complet de veille concurrentielle
Déclenché par n8n chaque jour via webhook OpenClaw
"""
import asyncio
from openclaw import Agent, TaskRunner, memory_store
from datetime import datetime
COMPETITORS = [
"https://competitor-a.com/blog",
"https://competitor-b.com/changelog",
"https://competitor-c.com/news",
# ... 10 URLs total
]
async def scrape_task(url: str) -> dict:
"""Tâche fan-out : scraper une page."""
agent = Agent(role="scraper", model="claude")
try:
content = await agent.run(
f"Fetch and parse the page {url}. Extract: title, main content, dates, prices.",
max_retries=2,
timeout=30
)
return {"status": "success", "url": url, "content": content}
except Exception as e:
return {"status": "error", "url": url, "error": str(e)}
async def extract_task(raw: dict) -> dict:
"""Extraction structurée d'un contenu scrapé."""
if raw["status"] == "error":
return raw
agent = Agent(role="extractor", model="claude")
extracted = await agent.run(
"Extract key information from scraped content as JSON",
context={"content": raw["content"]}
)
return {"url": raw["url"], "extracted": extracted}
async def compare_alert_task(extracted_items: list[dict], prev_state: dict) -> dict:
"""Comparaison avec l'état précédent + generation d'alertes."""
agent = Agent(role="analyst", model="claude")
alert_report = await agent.run(
"Compare current data vs previous state. Identify changes. Return alert level: NONE / LOW / MEDIUM / HIGH",
context={"current": extracted_items, "previous": prev_state or []}
)
return alert_report
async def generate_report(alerts: dict, all_data: list[dict]) -> str:
"""Rapport final formaté pour la publication."""
agent = Agent(role="reporter", model="claude")
report = await agent.run(
"Generate a concise morning briefing in Markdown. Sections per competitor. Only HIGH and MEDIUM alerts. Under 500 words.",
context={"alerts": alerts, "data": all_data}
)
return report
async def veille_pipeline():
# Étape 1 : Fan-out scraping
print(f"[{datetime.now():%H:%M}] Scrapping {len(COMPETITORS)} competitors...")
scrape_results = await asyncio.gather(
*[scrape_task(url) for url in COMPETITORS],
return_exceptions=True
)
valid_scrape = [r for r in scrape_results if isinstance(r, dict) and r["status"] == "success"]
# Étape 2 : Extraction
print(f"[{datetime.now():%H:%M}] Extracting data from {len(valid_scrape)} pages...")
extracted = await asyncio.gather(*[extract_task(r) for r in valid_scrape])
# Étape 3 : Récupérer l'état précédent
prev_state = await memory_store.get("veille_etat_precedent")
# Étape 4 : Comparaison et alertes
print(f"[{datetime.now():%H:%M}] Analyzing changes...")
alerts = await compare_alert_task(extracted, prev_state or [])
# Étape 5 : Génération du rapport
report = await generate_report(alerts, extracted)
# Étape 6 : Sauvegarder l'état courant
await memory_store.set("veille_etat_precedent", extracted)
return {
"report": report,
"alert_level": alerts.get("alert_level", "NONE"),
"pages_scraped": len(valid_scrape),
"competitors_count": len(COMPETITORS)
}
if __name__ == "__main__":
result = asyncio.run(veille_pipeline())
print(result["report"])
Ce que ce pipeline accomplit
En production : rapport de veille en 3 à 5 minutes pour 10 concurrents (vs 2-3h manuel). Alerting automatique sur changements de prix et features.
Les points de crispation gérés :
- Rate limiting — les retries avec backoff évitent le ban des serveurs cibles
- Partial failure — si un scrap échoue, les 9 autres continuent
- Memory continuity — pgvector stocke l'historique pour des comparaisons de tendances sur 30 jours
Gestion d'erreurs avancée
Retry exponentiel
Un rate limit qui recover en 10 secondes n'a pas besoin qu'on attende 60 secondes. Le retry basique ne suffit pas.
import asyncio
import random
async def retry_with_backoff(
fn,
*args,
base_delay: float = 1.0,
max_delay: float = 60.0,
max_retries: int = 5,
jitter: bool = True,
**kwargs
):
"""Retry avec backoff exponentiel + jitter."""
for attempt in range(max_retries):
try:
return await fn(*args, **kwargs)
except RateLimitError:
if attempt == max_retries - 1:
raise
delay = min(base_delay * (2 ** attempt), max_delay)
if jitter:
delay = delay * (0.5 + random.random())
print(f"Rate limited. Retry in {delay:.1f}s (attempt {attempt+1}/{max_retries})")
await asyncio.sleep(delay)
except TransientError as e:
if attempt == max_retries - 1:
raise
delay = base_delay * (2 ** attempt)
await asyncio.sleep(delay)
Circuit Breaker pattern
Le circuit breaker coupe la cascade d'échecs. Quand un service fail N fois, le breaker s'ouvre et rejecte immédiatement les appels.
from enum import Enum
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
class CircuitBreaker:
def __init__(self, threshold: int = 5, timeout: float = 60.0):
self.threshold = threshold
self.timeout = timeout
self.state = CircuitState.CLOSED
self.failures = 0
self.last_failure_time = None
async def call(self, fn, *args, **kwargs):
if self.state == CircuitState.OPEN:
if asyncio.get_event_loop().time() - self.last_failure_time > self.timeout:
self.state = CircuitState.HALF_OPEN
else:
raise CircuitOpenError(f"Circuit open. Retry after {self.timeout}s")
result = None
try:
result = await fn(*args, **kwargs)
self._on_success()
return result
except Exception:
self._on_failure()
raise
def _on_success(self):
self.failures = 0
self.state = CircuitState.CLOSED
def _on_failure(self):
self.failures += 1
self.last_failure_time = asyncio.get_event_loop().time()
if self.failures >= self.threshold:
self.state = CircuitState.OPEN
Dead Letter Queue
Les messages qui échouent après tous les retries vont en DLQ (Dead Letter Queue) pour inspection et réinjection later.
import asyncpg
import json
import os
async def dlq_enqueue(message: dict, error: str, workflow_id: str):
conn = await asyncpg.connect(os.environ["DATABASE_URL"])
await conn.execute("""
INSERT INTO dlq (workflow_id, payload, error, failed_at)
VALUES ($1, $2, $3, NOW())
""", workflow_id, json.dumps(message), error)
await conn.close()
async def dlq_reinject(dlq_id: int):
conn = await asyncpg.connect(os.environ["DATABASE_URL"])
row = await conn.fetchrow("SELECT * FROM dlq WHERE id = $1", dlq_id)
await conn.execute("DELETE FROM dlq WHERE id = $1", dlq_id)
await conn.close()
await workflow_manager.replay(json.loads(row["payload"]))
Monitoring et observabilité
Centraliser les logs
Sans centralisation, les logs sont éparpillés et les dashboards incompatibles. La stack recommandée :
- Structured logging — JSON par défaut (pas de plain text)
- Agent de collecte — Vector ou Fluentd vers un agrégateur
- Stockage — Loki pour les logs, Prometheus pour les metrics, Grafana pour la visualisation
# Configuration Vector pour OpenClaw logs
sources:
openclaw_logs:
type: file
reads: follow
include:
- /var/log/openclaw/*.log
transforms:
normalize:
type: remap
source: |
. = parse_json!(.message)
.agent_id = .metadata.agent_id
.workflow_id = .metadata.workflow_id
.duration_ms = .metrics.duration_ms
sinks:
loki:
type: loki
endpoint: https://loki.example.com
labels:
env: production
service: openclaw
Métriques clés à suivre
| Métrique | Description | Seuil d'alerte |
|---|---|---|
workflow_duration_seconds | Temps total d'exécution | > P95 historique + 20% |
agent_calls_total | Nombre d'appels agents | Spike > 3x baseline |
step_error_rate | Taux d'erreur par étape | > 5% |
api_cost_usd | Coût cumulé des appels API | > budget journalier |
retry_rate | Pourcentage de tâches relancées | > 20% |
dlq_size | Nombre de messages en DLQ | > 0 (alert immediate) |
Dashboard Grafana type
Créez un dashboard avec 4 panneaux :
- Workflow executions — timeline des executions avec statut (success/failed/running)
- Step Error Rate — heatmap par étape (identifie les étapes problématiques)
- Cost over time — area chart du coût journalier avec projection mensuelle
- DLQ Monitor — single stat avec trend, passe en rouge si > 0
Questions fréquentes
Qu'est-ce qu'un fan-out agent et quand l'utiliser ?
Un fan-out agent dispatche des sous-tâches à plusieurs agents en parallèle. Utilisez-le quand les tâches sont indépendantes. Scrapper 20 pages en même temps au lieu de séquentiellement divise le temps par 20.
Quelle est la différence entre un pipeline et une state machine ?
Pipeline = étapes dans un ordre fixe et linéaire. State machine = états avec transitions conditionnelles. Pipeline plus simple ; state machine plus puissante pour branchements, cycles, et retours en arrière.
Comment gérer les erreurs dans un workflow multi-agents ?
Trois couches : (1) retry exponentiel avec jitter pour les erreurs temporaires, (2) circuit breaker pour éviter de submerger un service qui fail, (3) DLQ pour stocker les messages qui échouent après tous les retries.
OpenClaw suffit-il pour orchestrer un workflow complexe ?
Pour les workflows pipeline ou fan-out, OpenClaw suffit amplement. Pour les embranchements profonds ou les cycles de feedback, combinez avec LangGraph (state machine) et n8n (triggers).
Comment monitorer un workflow IA en production ?
Centralisez les logs (Vector → Loki), exportez les metrics (Prometheus), visualisez (Grafana). Alertez sur le taux d'erreur par étape, la taille DLQ, et le coût cumulé.
Articles liés
Un workflow complexe se construit en ajoutant progressivement de la sophistication : commencez par un pipeline simple, ajoutez le fan-out quand les tâches sont indépendantes, passez à la state machine quand la logique métier devient un graphe d'états. Chaque pattern répond à un niveau de complexité spécifique — ne complexifiez pas prématurément.
Si vous découvrez l'orchestration d'agents, le guide complet sur OpenClaw explique les fondamentaux. Pour approfondir les state machines, le guide LangGraph détaille la modélisation de workflows complexes. Le tutoriel sur les workflows agentiques montre comment combiner ces patterns pas à pas. Enfin, la veille automatisée illustre un cas d'usage concret inspiré de l'exemple de cet article.
- Comment installer OpenClaw sur un VPS — Préparer votre environnement de production
- Pilote Automatisation agents IA — Vue d'ensemble du cluster automatisation
- Frameworks d'agents IA — Panorama complet des frameworks du marché
- Outils agents IA — Les outils complémentaires (vector stores, memory, tools)
- Automatiser la recherche concurrentielle — Use case veille IA concret
Restez informé sur les agents IA
Nouveaux tutoriels, comparatifs et guides pratiques directement dans votre boîte mail.