FrameworksAgents.com Logo

Automatiser un workflow complexe avec l'IA

Guidecalendar_todayMis à jour le 1 juin 2026schedule16 min de lectureworkflow ia complexeautomatisation ia workflow

Fan-out, pipeline, conditional, state machine : maîtrisez les patterns d'orchestration IA. Stack 2026, code OpenClaw et exemple concret.

Automatiser un workflow complexe avec l'IA : patterns et implémentation

Introduction

Un script qui appelle une API toutes les 5 minutes, ce n'est pas un workflow complexe. C'est un cron un peu sophistiqué. Un vrai workflow complexe, c'est un système avec état partagé, embranchements conditionnels, reprise sur erreur, et cycles de feedback. Quand la complexité monte, les approches naïves lâchent.

Dans cet article, on explore les 4 patterns d'orchestration qui permettent de structurer ces workflows — fan-out, pipeline, conditional branching, et state machine — avec du code concret en OpenClaw et LangGraph. On regarde aussi la stack technique recommandée pour la production en 2026, et on finit avec un exemple complet de pipeline de veille concurrentielle que vous pouvez adapter dès demain.


Résumé rapide

CritèreFan-outPipelineConditionalState Machine
StructureParallèleSéquentielleBranchementGraphe à états
Cas d'usageTâches indépendantes simultanéesTraitement en étapesLogique métier à bifurcationsWorkflows avec étapes + transitions
Complexité de gestionMoyenneFaibleMoyenneÉlevée
Outil principalOpenClaw tasksOpenClaw pipelineOpenClaw conditional + n8nLangGraph
Reprise sur erreurRetry par tâcheRetry globalRetry par brancheRetry par état

💡 Pas sûr du pattern qui correspond à votre cas ? Chaque pattern répond à un niveau de complexité spécifique — ne complexifiez pas prématurément.


Comprendre les workflows complexes

Ce qui distingue un workflow complexe d'un script simple

Un script simple suit une sequence linéaire : entrée → traitement → sortie.

Un workflow complexe réunit trois critères accumulationnels :

1. Multi-étapes avec état partagé — Les données circulent entre les étapes. Chaque étape hérite du contexte des précédentes.

2. Branchement conditionnel — Certaines branches s'exécutent, d'autres non, selon l'état courant.

3. Gestion des erreurs intégrée — Une exception ne fait pas échouer tout le workflow. Le système relance, contourne, ou redirige proprement.

Quand ces trois critères sont réunis, un script simple ne suffit plus. Il faut une architecture d'orchestration.

Pourquoi les patterns d'orchestration changent tout

Les patterns d'orchestration sont des modèles réutilisables pour catégories de problèmes récurrents.

Un workflow bien patterné est :

  • Debuggable — sémantique claire par pattern
  • Testable — chaque bloc se valide isolément
  • Scalable — fan-out et pipeline se paralélisent
  • Monitorable — chaque transition est traçable

Les 4 patterns d'orchestration

1. Fan-out — Traitement parallèle

Le pattern fan-out dispatche une tâche vers plusieurs exécuteurs en parallèle. Résultats agrégés en fin de cycle.

Quand l'utiliser :

  • Scrapper 50 pages web en même temps
  • Envoyer une notification à une liste de destinataires
  • Lancer une analyse sur plusieurs segments clients simultanément

Quand éviter :

  • Si les tâches ont des dépendances entre elles
  • Si le nombre de tâches parallèles explose vos limites de rate API
# OpenClaw fan-out pattern
import asyncio
from openclaw import TaskRunner, Agent

async def scrape_competitor_pages(urls: list[str]) -> list[dict]:
    async def scrape_one(url: str) -> dict:
        agent = Agent(role="web-scraper", model="claude")
        result = await agent.run(f"Scrape the page at {url} and return structured data")
        return {"url": url, "data": result}

    # Fan-out : toutes les tâches en parallèle
    tasks = [scrape_one(url) for url in urls]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Agrégation avec gestion des erreurs
    valid = [r for r in results if isinstance(r, dict)]
    failed = [str(r) for r in results if not isinstance(r, dict)]

    return {"scrape_results": valid, "errors": failed}

2. Pipeline — Séquence d'étapes

Le pattern pipeline enchaîne les étapes dans un ordre strict. Chaque étape reçoit le output de la précédente. C'est le modèle le plus intuitif et le plus-simple à débugger.

Quand l'utiliser :

  • ETL : extraire → transformer → charger
  • Rédaction : research → draft → review → publish
  • Traitement de document : parse → extract → structure → store

Quand éviter :

  • Si certaines étapes peuvent s'exécuter indépendamment (préférez fan-out ou hybride)
  • Si le flux a des embranchements (préférez conditional ou state machine)
# OpenClaw pipeline pattern
async def veille_concurrentielle_pipeline(urls: list[str]) -> dict:
    async def scrape(urls: list[str]) -> list[dict]:
        results = await scrape_competitor_pages(urls)
        return results["scrape_results"]

    async def extract(data_list: list[dict]) -> list[dict]:
        agent = Agent(role="data-extractor", model="claude")
        extracted = []
        for item in data_list:
            result = await agent.run(
                f"Extract key information from: {item['data']}",
                context=item
            )
            extracted.append(result)
        return extracted

    async def compare_and_alert(data: list[dict]) -> dict:
        agent = Agent(role="analyst", model="claude")
        report = await agent.run(
            "Compare these competitor data points and identify key changes",
            context={"data": data}
        )
        return report

    # Pipeline sériel
    scraped = await scrape(urls)
    extracted = await extract(scraped)
    report = await compare_and_alert(extracted)

    return report

3. Conditional Branching — Logique à embranchements

Le pattern conditional exécute différentes branches selon l'état courant. C'est un if/else structuré, mais sur un état partagé et avec des hooks d'entrée/sortie par branche.

Quand l'utiliser :

  • Routage automatique selon le type de demande client
  • Validation conditionnelle (si X → action Y, sinon action Z)
  • Attribution de traitement selon des seuils

Quand éviter :

  • Si vous avez plus de 5 niveaux de branchement (préférez state machine)
  • Si les conditions mélangent trop de variables (préférez table de décision)
# OpenClaw conditional pattern
from openclaw import ConditionalStep, Agent

async def route_demande(demande: dict) -> dict:
    agent = Agent(role="router", model="claude")
    routing = await agent.run(
        "Classify this request type: escalation_needed, standard, urgent",
        context=demande
    )
    routing = routing.lower().strip()

    if "escalation" in routing:
        return await escalade(demande)
    elif "urgent" in routing:
        return await traitement_urgent(demande)
    else:
        return await traitement_standard(demande)

async def escalade(d: dict) -> dict:
    escalation_agent = Agent(role="escalation", model="claude")
    return await escalation_agent.run(
        "Create escalation report for human review",
        context=d
    )

async def traitement_urgent(d: dict) -> dict:
    urgent_agent = Agent(role="urgent-handler", model="claude")
    return await urgent_agent.run(
        "Handle with priority SLA",
        context=d
    )

async def traitement_standard(d: dict) -> dict:
    standard_agent = Agent(role="standard-handler", model="claude")
    return await standard_agent.run(
        "Process through standard workflow",
        context=d
    )

4. State Machine avec LangGraph — Graphe d'états

Le pattern state machine modélise le workflow comme un graphe d'états. Chaque état a des transitions définies vers d'autres états. La machine reçoit un état courant, applique une transition, et produit un nouvel état. C'est le pattern le plus puissant pour les workflows complexes avec de multiples étapes et des transitions conditionnelles.

Quand l'utiliser :

  • Workflow multi-agents avec cycles de feedback
  • Processus métier avec étapes non-linéaires
  • Systèmes de dialogue et de mémoire persistante
  • Workflows où le même état peut revenir à une étape précédente

Quand éviter :

  • Workflow simples et linéaires (trop d'overhead)
  • Si vous n'avez pas besoin de retour en arrière
# LangGraph State Machine
from langgraph.graph import StateGraph, END
from typing import TypedDict, Optional
from openclaw import Agent

class WorkflowState(TypedDict):
    demande: dict
    status: str
    rapport: Optional[dict]
    errors: list

def create_workflow_graph():
    graph = StateGraph(WorkflowState)

    # Définir les nœuds
    graph.add_node("validate", validate_node)
    graph.add_node("process", process_node)
    graph.add_node("review", review_node)
    graph.add_node("escalate", escalate_node)
    graph.add_node("deliver", deliver_node)

    # Point d'entrée
    graph.set_entry_point("validate")

    # Transitions conditionnelles
    graph.add_conditional_edges(
        "validate",
        route_from_validation,
        {
            "escalate": "escalate",
            "process": "process"
        }
    )

    graph.add_edge("process", "review")
    graph.add_conditional_edges(
        "review",
        route_from_review,
        {
            "process": "process",
            "deliver": "deliver"
        }
    )

    graph.add_edge("escalate", END)
    graph.add_edge("deliver", END)

    return graph.compile()

def route_from_validation(state: WorkflowState) -> str:
    if state.get("errors"):
        return "escalate"
    return "process"

def route_from_review(state: WorkflowState) -> str:
    if state["status"] == "needs_revision":
        return "process"
    return "deliver"

# Nodes
async def validate_node(state: WorkflowState) -> WorkflowState:
    agent = Agent(role="validator", model="claude")
    result = await agent.run("Validate input data", context=state["demande"])
    return {"status": "validated"}

async def process_node(state: WorkflowState) -> WorkflowState:
    agent = Agent(role="processor", model="claude")
    rapport = await agent.run("Process the demande", context=state["demande"])
    return {"rapport": rapport, "status": "processed"}

async def review_node(state: WorkflowState) -> WorkflowState:
    agent = Agent(role="reviewer", model="claude")
    verdict = await agent.run("Review the report", context=state["rapport"])
    return {"status": verdict}

async def escalate_node(state: WorkflowState) -> WorkflowState:
    return {"status": "escalated"}

async def deliver_node(state: WorkflowState) -> WorkflowState:
    return {"status": "delivered"}

# Compilation et exécution
graph = create_workflow_graph()
result = await graph.ainvoke({
    "demande": {"user_id": "123", "type": "report"},
    "status": "pending",
    "rapport": None,
    "errors": []
})

📌 Vous avez choisi vos patterns ? Passons à la stack technique recommandée pour la production en 2026.

Stack technique recommandée en 2026

Architecture de référence

L'architecture recommandée pour un workflow IA complexe en production repose sur quatre couches :

CoucheOutilRôle
Orchestration principaleOpenClawCoordonne les agents, skills, tâches et sous-agents
Triggers & Webhooksn8nDéclenche les workflows sur événements (schedule, webhook, form)
State managementLangGraphGère les machines à états et le contexte persistant
Mémoire partagéePostgreSQL + pgvectorStore vecteur pour retrieval, état pour audit

Pourquoi ce stack

OpenClaw — conducteur central. Orchestre les sous-agents, skills, et le contexte.

n8n — triggers schedule-based et webhooks. Interfaçage avec le monde extérieur (APIs tierces, notifications, bases de données).

LangGraph — state machine pour les graphes d'états avec transitions complexes, cycles, et retours arrière.

PostgreSQL + pgvector — mémoire persistante. Vecteur pour le retrieval sémantique, relationnel pour l'état et les logs.

Intégration OpenClaw + LangGraph + n8n

L'intégration type fonctionne comme suit :

n8n (trigger cron/webhook)
  → appelle un webhook OpenClaw
      → Lance un sous-agent qui lance le workflow LangGraph
          → Les nodes LangGraph délèguent à des agents OpenClaw
              → Logs et état → PostgreSQL/pgvector
                  → n8n notifie les canaux (Slack, email)

Exemple concret : pipeline veille concurrentielle complet

📋 Contexte — Vous surveillez 10 concurrents. Chaque matin : scraper → extraire → comparer → alerter → rapporter.

Vous êtes une équipe produit qui surveille 10 concurrents. Chaque matin, vous voulez :

  1. Scrapper les 10 blogs/pages produit de vos concurrents
  2. Extraire les trois metrics principales (prix, features, news)
  3. Comparer avec votre état précédent
  4. Générer une alerte si un changement significatif est détecté
  5. Produire un rapport structuré

Implémentation OpenClaw

"""
Pipeline complet de veille concurrentielle
Déclenché par n8n chaque jour via webhook OpenClaw
"""

import asyncio
from openclaw import Agent, TaskRunner, memory_store
from datetime import datetime

COMPETITORS = [
    "https://competitor-a.com/blog",
    "https://competitor-b.com/changelog",
    "https://competitor-c.com/news",
    # ... 10 URLs total
]

async def scrape_task(url: str) -> dict:
    """Tâche fan-out : scraper une page."""
    agent = Agent(role="scraper", model="claude")
    try:
        content = await agent.run(
            f"Fetch and parse the page {url}. Extract: title, main content, dates, prices.",
            max_retries=2,
            timeout=30
        )
        return {"status": "success", "url": url, "content": content}
    except Exception as e:
        return {"status": "error", "url": url, "error": str(e)}

async def extract_task(raw: dict) -> dict:
    """Extraction structurée d'un contenu scrapé."""
    if raw["status"] == "error":
        return raw

    agent = Agent(role="extractor", model="claude")
    extracted = await agent.run(
        "Extract key information from scraped content as JSON",
        context={"content": raw["content"]}
    )
    return {"url": raw["url"], "extracted": extracted}

async def compare_alert_task(extracted_items: list[dict], prev_state: dict) -> dict:
    """Comparaison avec l'état précédent + generation d'alertes."""
    agent = Agent(role="analyst", model="claude")
    alert_report = await agent.run(
        "Compare current data vs previous state. Identify changes. Return alert level: NONE / LOW / MEDIUM / HIGH",
        context={"current": extracted_items, "previous": prev_state or []}
    )
    return alert_report

async def generate_report(alerts: dict, all_data: list[dict]) -> str:
    """Rapport final formaté pour la publication."""
    agent = Agent(role="reporter", model="claude")
    report = await agent.run(
        "Generate a concise morning briefing in Markdown. Sections per competitor. Only HIGH and MEDIUM alerts. Under 500 words.",
        context={"alerts": alerts, "data": all_data}
    )
    return report

async def veille_pipeline():
    # Étape 1 : Fan-out scraping
    print(f"[{datetime.now():%H:%M}] Scrapping {len(COMPETITORS)} competitors...")
    scrape_results = await asyncio.gather(
        *[scrape_task(url) for url in COMPETITORS],
        return_exceptions=True
    )
    valid_scrape = [r for r in scrape_results if isinstance(r, dict) and r["status"] == "success"]

    # Étape 2 : Extraction
    print(f"[{datetime.now():%H:%M}] Extracting data from {len(valid_scrape)} pages...")
    extracted = await asyncio.gather(*[extract_task(r) for r in valid_scrape])

    # Étape 3 : Récupérer l'état précédent
    prev_state = await memory_store.get("veille_etat_precedent")

    # Étape 4 : Comparaison et alertes
    print(f"[{datetime.now():%H:%M}] Analyzing changes...")
    alerts = await compare_alert_task(extracted, prev_state or [])

    # Étape 5 : Génération du rapport
    report = await generate_report(alerts, extracted)

    # Étape 6 : Sauvegarder l'état courant
    await memory_store.set("veille_etat_precedent", extracted)

    return {
        "report": report,
        "alert_level": alerts.get("alert_level", "NONE"),
        "pages_scraped": len(valid_scrape),
        "competitors_count": len(COMPETITORS)
    }

if __name__ == "__main__":
    result = asyncio.run(veille_pipeline())
    print(result["report"])

Ce que ce pipeline accomplit

En production : rapport de veille en 3 à 5 minutes pour 10 concurrents (vs 2-3h manuel). Alerting automatique sur changements de prix et features.

Les points de crispation gérés :

  • Rate limiting — les retries avec backoff évitent le ban des serveurs cibles
  • Partial failure — si un scrap échoue, les 9 autres continuent
  • Memory continuity — pgvector stocke l'historique pour des comparaisons de tendances sur 30 jours

Gestion d'erreurs avancée

Retry exponentiel

Un rate limit qui recover en 10 secondes n'a pas besoin qu'on attende 60 secondes. Le retry basique ne suffit pas.

import asyncio
import random

async def retry_with_backoff(
    fn,
    *args,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    max_retries: int = 5,
    jitter: bool = True,
    **kwargs
):
    """Retry avec backoff exponentiel + jitter."""
    for attempt in range(max_retries):
        try:
            return await fn(*args, **kwargs)
        except RateLimitError:
            if attempt == max_retries - 1:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            if jitter:
                delay = delay * (0.5 + random.random())
            print(f"Rate limited. Retry in {delay:.1f}s (attempt {attempt+1}/{max_retries})")
            await asyncio.sleep(delay)
        except TransientError as e:
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt)
            await asyncio.sleep(delay)

Circuit Breaker pattern

Le circuit breaker coupe la cascade d'échecs. Quand un service fail N fois, le breaker s'ouvre et rejecte immédiatement les appels.

from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    def __init__(self, threshold: int = 5, timeout: float = 60.0):
        self.threshold = threshold
        self.timeout = timeout
        self.state = CircuitState.CLOSED
        self.failures = 0
        self.last_failure_time = None

    async def call(self, fn, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if asyncio.get_event_loop().time() - self.last_failure_time > self.timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitOpenError(f"Circuit open. Retry after {self.timeout}s")

        result = None
        try:
            result = await fn(*args, **kwargs)
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise

    def _on_success(self):
        self.failures = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        self.failures += 1
        self.last_failure_time = asyncio.get_event_loop().time()
        if self.failures >= self.threshold:
            self.state = CircuitState.OPEN

Dead Letter Queue

Les messages qui échouent après tous les retries vont en DLQ (Dead Letter Queue) pour inspection et réinjection later.

import asyncpg
import json
import os

async def dlq_enqueue(message: dict, error: str, workflow_id: str):
    conn = await asyncpg.connect(os.environ["DATABASE_URL"])
    await conn.execute("""
        INSERT INTO dlq (workflow_id, payload, error, failed_at)
        VALUES ($1, $2, $3, NOW())
    """, workflow_id, json.dumps(message), error)
    await conn.close()

async def dlq_reinject(dlq_id: int):
    conn = await asyncpg.connect(os.environ["DATABASE_URL"])
    row = await conn.fetchrow("SELECT * FROM dlq WHERE id = $1", dlq_id)
    await conn.execute("DELETE FROM dlq WHERE id = $1", dlq_id)
    await conn.close()
    await workflow_manager.replay(json.loads(row["payload"]))

Monitoring et observabilité

Centraliser les logs

Sans centralisation, les logs sont éparpillés et les dashboards incompatibles. La stack recommandée :

  • Structured logging — JSON par défaut (pas de plain text)
  • Agent de collecte — Vector ou Fluentd vers un agrégateur
  • Stockage — Loki pour les logs, Prometheus pour les metrics, Grafana pour la visualisation
# Configuration Vector pour OpenClaw logs
sources:
  openclaw_logs:
    type: file
    reads: follow
    include:
      - /var/log/openclaw/*.log

transforms:
  normalize:
    type: remap
    source: |
      . = parse_json!(.message)
      .agent_id = .metadata.agent_id
      .workflow_id = .metadata.workflow_id
      .duration_ms = .metrics.duration_ms

sinks:
  loki:
    type: loki
    endpoint: https://loki.example.com
    labels:
      env: production
      service: openclaw

Métriques clés à suivre

MétriqueDescriptionSeuil d'alerte
workflow_duration_secondsTemps total d'exécution> P95 historique + 20%
agent_calls_totalNombre d'appels agentsSpike > 3x baseline
step_error_rateTaux d'erreur par étape> 5%
api_cost_usdCoût cumulé des appels API> budget journalier
retry_ratePourcentage de tâches relancées> 20%
dlq_sizeNombre de messages en DLQ> 0 (alert immediate)

Dashboard Grafana type

Créez un dashboard avec 4 panneaux :

  1. Workflow executions — timeline des executions avec statut (success/failed/running)
  2. Step Error Rate — heatmap par étape (identifie les étapes problématiques)
  3. Cost over time — area chart du coût journalier avec projection mensuelle
  4. DLQ Monitor — single stat avec trend, passe en rouge si > 0

Questions fréquentes

Qu'est-ce qu'un fan-out agent et quand l'utiliser ?

Un fan-out agent dispatche des sous-tâches à plusieurs agents en parallèle. Utilisez-le quand les tâches sont indépendantes. Scrapper 20 pages en même temps au lieu de séquentiellement divise le temps par 20.

Quelle est la différence entre un pipeline et une state machine ?

Pipeline = étapes dans un ordre fixe et linéaire. State machine = états avec transitions conditionnelles. Pipeline plus simple ; state machine plus puissante pour branchements, cycles, et retours en arrière.

Comment gérer les erreurs dans un workflow multi-agents ?

Trois couches : (1) retry exponentiel avec jitter pour les erreurs temporaires, (2) circuit breaker pour éviter de submerger un service qui fail, (3) DLQ pour stocker les messages qui échouent après tous les retries.

OpenClaw suffit-il pour orchestrer un workflow complexe ?

Pour les workflows pipeline ou fan-out, OpenClaw suffit amplement. Pour les embranchements profonds ou les cycles de feedback, combinez avec LangGraph (state machine) et n8n (triggers).

Comment monitorer un workflow IA en production ?

Centralisez les logs (Vector → Loki), exportez les metrics (Prometheus), visualisez (Grafana). Alertez sur le taux d'erreur par étape, la taille DLQ, et le coût cumulé.


Articles liés

Un workflow complexe se construit en ajoutant progressivement de la sophistication : commencez par un pipeline simple, ajoutez le fan-out quand les tâches sont indépendantes, passez à la state machine quand la logique métier devient un graphe d'états. Chaque pattern répond à un niveau de complexité spécifique — ne complexifiez pas prématurément.

Si vous découvrez l'orchestration d'agents, le guide complet sur OpenClaw explique les fondamentaux. Pour approfondir les state machines, le guide LangGraph détaille la modélisation de workflows complexes. Le tutoriel sur les workflows agentiques montre comment combiner ces patterns pas à pas. Enfin, la veille automatisée illustre un cas d'usage concret inspiré de l'exemple de cet article.

Restez informé sur les agents IA

Nouveaux tutoriels, comparatifs et guides pratiques directement dans votre boîte mail.

homeAccueilcodeFrameworkssmart_toyAgentsmenu_bookTutorielsTwitter