Accueil Confluent Tutoriel système d'IA agentique

Construire un système d'IA agentique avec Confluent

Guide pratique complet : de la configuration à l'implémentation d'agents intelligents réactifs

Ce que vous allez construire

Dans ce tutoriel, nous allons créer un système d'agents IA capables de traiter des événements métier en temps réel. Notre exemple : un système de gestion d'e-commerce où des agents automatisent la détection de fraude, la gestion d'inventaire et les recommandations personnalisées.

Architecture cible

  • Confluent Cloud : plateforme de streaming managée
  • LangChain : framework pour agents IA
  • OpenAI GPT : moteur de raisonnement des agents
  • Python : langage de développement

Guide étape par étape

1

Configuration de l'environnement Confluent

Créer un cluster Confluent Cloud

Commencez par créer un compte sur Confluent Cloud et configurez votre premier cluster Basic.

Paramètres recommandés :
  • • Cloud Provider : AWS (us-east-1)
  • • Cluster Type : Basic
  • • Topics : orders, inventory, recommendations, fraud-alerts

Configuration des API Keys

# Variables d'environnement requises
export CONFLUENT_BOOTSTRAP_SERVERS="your-cluster.confluent.cloud:9092"
export CONFLUENT_API_KEY="your-api-key"
export CONFLUENT_API_SECRET="your-api-secret"
export OPENAI_API_KEY="your-openai-key"
2

Installation des dépendances Python

# Créer un environnement virtuel
python -m venv confluent-agents-env
source confluent-agents-env/bin/activate  # Linux/Mac
# confluent-agents-env\\Scripts\\activate  # Windows

# Installer les packages requis
pip install confluent-kafka langchain openai langchain-openai pydantic

Structure du projet

confluent-agents/
├── config/
│   ├── __init__.py
│   └── kafka_config.py
├── agents/
│   ├── __init__.py
│   ├── base_agent.py
│   ├── fraud_agent.py
│   └── inventory_agent.py
├── models/
│   ├── __init__.py
│   └── events.py
└── main.py
3

Modèles de données événementielles

models/events.py

from pydantic import BaseModel
from typing import Dict, Any, Optional
from datetime import datetime

class OrderEvent(BaseModel):
    event_id: str
    timestamp: datetime
    user_id: str
    order_id: str
    items: list[Dict[str, Any]]
    total_amount: float
    payment_method: str
    shipping_address: Dict[str, str]

class InventoryEvent(BaseModel):
    event_id: str
    timestamp: datetime
    product_id: str
    stock_level: int
    location: str
    action: str  # "sale", "restock", "reserved"

class FraudAlert(BaseModel):
    alert_id: str
    timestamp: datetime
    order_id: str
    risk_score: float
    reasons: list[str]
    recommended_action: str
4

Agent de base événementiel

agents/base_agent.py

import json
import asyncio
from abc import ABC, abstractmethod
from confluent_kafka import Consumer, Producer
from langchain.llms import OpenAI
from typing import Dict, Any

class BaseEventAgent(ABC):
    def __init__(self, agent_name: str, kafka_config: Dict[str, str]):
        self.agent_name = agent_name
        self.llm = OpenAI(temperature=0.1)

        # Configuration Kafka Consumer
        consumer_config = {
            **kafka_config,
            'group.id': f'{agent_name}-group',
            'auto.offset.reset': 'latest',
            'enable.auto.commit': True,
        }
        self.consumer = Consumer(consumer_config)

        # Configuration Kafka Producer
        self.producer = Producer(kafka_config)

    @abstractmethod
    def process_event(self, event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Traite un événement et retourne une action si nécessaire"""
        pass

    @abstractmethod
    def get_subscribed_topics(self) -> list[str]:
        """Retourne la liste des topics à écouter"""
        pass

    def start_listening(self):
        """Démarre la boucle de traitement en temps réel"""
        topics = self.get_subscribed_topics()
        self.consumer.subscribe(topics)

        print(f"[{self.agent_name}] Écoute des topics: {topics}")

        try:
            while True:
                msg = self.consumer.poll(1.0)
                if msg is None:
                    continue

                if msg.error():
                    print(f"Erreur consumer: {msg.error()}")
                    continue

                # Traitement de l'événement
                event_data = json.loads(msg.value().decode('utf-8'))

                print(f"[{self.agent_name}] Événement reçu: {event_data.get('event_id', 'N/A')}")

                # Déléguer à la logique métier de l'agent
                action = self.process_event(event_data)

                if action:
                    self.publish_action(action)

        except KeyboardInterrupt:
            print(f"[{self.agent_name}] Arrêt demandé")
        finally:
            self.consumer.close()

    def publish_action(self, action: Dict[str, Any]):
        """Publie une action sur Kafka"""
        topic = action.get('target_topic')
        if topic:
            self.producer.produce(
                topic,
                json.dumps(action).encode('utf-8')
            )
            self.producer.flush()
            print(f"[{self.agent_name}] Action publiée sur {topic}")

🎯 Résultats attendus

Réactivité temps réel

Vos agents traitent les événements métier dès leur publication (< 100ms)

Intelligence contextuelle

Chaque agent prend des décisions basées sur l'historique et le contexte complet

Scalabilité automatique

Ajoutez facilement de nouveaux agents sans impacter l'existant