Construire un système d'IA agentique avec Confluent
Guide pratique complet : de la configuration à l'implémentation d'agents intelligents réactifs
Ce que vous allez construire
Dans ce tutoriel, nous allons créer un système d'agents IA capables de traiter des événements métier en temps réel. Notre exemple : un système de gestion d'e-commerce où des agents automatisent la détection de fraude, la gestion d'inventaire et les recommandations personnalisées.
Architecture cible
- • Confluent Cloud : plateforme de streaming managée
- • LangChain : framework pour agents IA
- • OpenAI GPT : moteur de raisonnement des agents
- • Python : langage de développement
Guide étape par étape
1
Configuration de l'environnement Confluent
Créer un cluster Confluent Cloud
Commencez par créer un compte sur Confluent Cloud et configurez votre premier cluster Basic.
Paramètres recommandés :
- • Cloud Provider : AWS (us-east-1)
- • Cluster Type : Basic
- • Topics : orders, inventory, recommendations, fraud-alerts
Configuration des API Keys
# Variables d'environnement requises
export CONFLUENT_BOOTSTRAP_SERVERS="your-cluster.confluent.cloud:9092"
export CONFLUENT_API_KEY="your-api-key"
export CONFLUENT_API_SECRET="your-api-secret"
export OPENAI_API_KEY="your-openai-key"
2
Installation des dépendances Python
# Créer un environnement virtuel
python -m venv confluent-agents-env
source confluent-agents-env/bin/activate # Linux/Mac
# confluent-agents-env\\Scripts\\activate # Windows
# Installer les packages requis
pip install confluent-kafka langchain openai langchain-openai pydantic
Structure du projet
confluent-agents/
├── config/
│ ├── __init__.py
│ └── kafka_config.py
├── agents/
│ ├── __init__.py
│ ├── base_agent.py
│ ├── fraud_agent.py
│ └── inventory_agent.py
├── models/
│ ├── __init__.py
│ └── events.py
└── main.py
3
Modèles de données événementielles
models/events.py
from pydantic import BaseModel
from typing import Dict, Any, Optional
from datetime import datetime
class OrderEvent(BaseModel):
event_id: str
timestamp: datetime
user_id: str
order_id: str
items: list[Dict[str, Any]]
total_amount: float
payment_method: str
shipping_address: Dict[str, str]
class InventoryEvent(BaseModel):
event_id: str
timestamp: datetime
product_id: str
stock_level: int
location: str
action: str # "sale", "restock", "reserved"
class FraudAlert(BaseModel):
alert_id: str
timestamp: datetime
order_id: str
risk_score: float
reasons: list[str]
recommended_action: str
4
Agent de base événementiel
agents/base_agent.py
import json
import asyncio
from abc import ABC, abstractmethod
from confluent_kafka import Consumer, Producer
from langchain.llms import OpenAI
from typing import Dict, Any
class BaseEventAgent(ABC):
def __init__(self, agent_name: str, kafka_config: Dict[str, str]):
self.agent_name = agent_name
self.llm = OpenAI(temperature=0.1)
# Configuration Kafka Consumer
consumer_config = {
**kafka_config,
'group.id': f'{agent_name}-group',
'auto.offset.reset': 'latest',
'enable.auto.commit': True,
}
self.consumer = Consumer(consumer_config)
# Configuration Kafka Producer
self.producer = Producer(kafka_config)
@abstractmethod
def process_event(self, event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Traite un événement et retourne une action si nécessaire"""
pass
@abstractmethod
def get_subscribed_topics(self) -> list[str]:
"""Retourne la liste des topics à écouter"""
pass
def start_listening(self):
"""Démarre la boucle de traitement en temps réel"""
topics = self.get_subscribed_topics()
self.consumer.subscribe(topics)
print(f"[{self.agent_name}] Écoute des topics: {topics}")
try:
while True:
msg = self.consumer.poll(1.0)
if msg is None:
continue
if msg.error():
print(f"Erreur consumer: {msg.error()}")
continue
# Traitement de l'événement
event_data = json.loads(msg.value().decode('utf-8'))
print(f"[{self.agent_name}] Événement reçu: {event_data.get('event_id', 'N/A')}")
# Déléguer à la logique métier de l'agent
action = self.process_event(event_data)
if action:
self.publish_action(action)
except KeyboardInterrupt:
print(f"[{self.agent_name}] Arrêt demandé")
finally:
self.consumer.close()
def publish_action(self, action: Dict[str, Any]):
"""Publie une action sur Kafka"""
topic = action.get('target_topic')
if topic:
self.producer.produce(
topic,
json.dumps(action).encode('utf-8')
)
self.producer.flush()
print(f"[{self.agent_name}] Action publiée sur {topic}")
🎯 Résultats attendus
Réactivité temps réel
Vos agents traitent les événements métier dès leur publication (< 100ms)
Intelligence contextuelle
Chaque agent prend des décisions basées sur l'historique et le contexte complet
Scalabilité automatique
Ajoutez facilement de nouveaux agents sans impacter l'existant