فهرس المحتويات
1. المقدمة: عنق الزجاجة ليس الذكاء بل الانتباه البشري
في يونيو 2026، ألقى لوك ألفيرو (Luke Alvoeiro)، مهندس الوكلاء الأساسي في شركة Factory والمؤسس السابق لمشروع Goose (الذي تبرعت به Block إلى مؤسسة AI Agent Foundation)، محاضرة ملهمة في مؤتمر تقني عن مستقبل أنظمة الوكلاء المتعددين (Multi-Agent Systems). الفكرة المركزية التي بدأ بها: “عنق الزجاجة في هندسة البرمجيات اليوم ليس الذكاء — بل هو محدود بالانتباه البشري.”
أفضل المهندسين يستطيعون إنجاز 2-3 مهام في اليوم. لديهم قائمة انتظار من 50 ميزة، لكنهم لا يستطيعون دفع أكثر من بضع ميزات للأمام يومياً لأن كل مهمة تتطلب انتباههم الكامل. كل commit يحتاج مراجعتهم. كل قرار يحتاج تدخلهم.
نماذج الذكاء الاصطناعي اليوم ذكية بما يكفي لتنجز المهام الخمسين كلها — لكن لا يوجد عرض نطاق كافٍ (bandwidth) للإشراف على تنفيذها.
الفكرة الجوهرية
ماذا لو قرر الإنسان ماذا نبني، واكتشف النظام كيف يبنيه؟ وكيل يعمل لساعات أو أيام، وتعود أنت لتجد العمل منجزاً. هذا هو الوعد الحقيقي لأنظمة الوكلاء المتعددين.
في هذا المقال، سنبني نظاماً متكاملاً من 4 وكلاء متصلين لمتجر إلكتروني. لكن قبل أن نكتب أي كود، سنفهم الأنماط الخمسة الأساسية للوكلاء المتعددين كما شرحها لوك في محاضرته، ثم نترجمها إلى كود عملي.
2. الأنماط الخمسة للوكلاء المتعددين
عندما تبدأ بالبحث في أطر عمل الوكلاء المتعددين، تكتشف بسرعة أن المجال فوضوي. كل فريق عنده إطاره الخاص، مصطلحاته الخاصة، وآراؤه الخاصة عن ماذا يعمل وماذا لا يعمل. يقترح لوك تصنيفاً بسيطاً: هناك 5 أنماط أساسية تشكل اللبنات التي تجمع منها أي نظام وكلاء متعددين.
2.1 نمط التفويض (Delegation)
المبدأ: وكيل واحد (أب) يولد وكيلاً آخر (ابن) ويطلب منه تنفيذ مهمة محددة. الوكيل الأب يحصل على النتيجة ويعود بها.
هذا أبسط نمط وأكثرهم شيوعاً — هو ما يطبقه معظم المطورين أولاً. مثال: وكيل برمجة يستدعي وكيلاً لفحص قاعدة البيانات.
في مشروعنا
وكيل المبيعات يفوض مهمة فحص المخزون لوكيل المستودع بدلاً من تنفيذها بنفسه.
2.2 نمط المنشئ-المدقّق (Creator-Verifier)
المبدأ: وكيل يبني شيئاً (يكتب كوداً، ينشئ محتوى)، ووكيل آخر منفصل يتحقق من جودة العمل. الفصل بين المسؤوليات أساسي هنا — الوكيل المنفّذ عنده تحيز للتكلفة، يريد أن ينجح كوده. وكيل جديد بسياق جديد أكثر قدرة على اكتشاف المشاكل.
هذا هو سبب وجود مراجعة الكود (Code Review) عند البشر أيضاً.
في مشروعنا
وكيل المستودع ينشئ طلب شراء جديد، ووكيل آخر (Validator) يراجع الطلب ويتحقق من دقته. وكيل التسويق ينشئ حملة إعلانية، ونظام المراجعة يتحقق من التزامها بالعلامة التجارية.
2.3 نمط التواصل المباشر (Direct Communication)
المبدأ: الوكلاء يتواصلون مباشرة بدون منسّق مركزي — مثل إرسال رسائل خاصة (DM) لبعضهم البعض. لكن هذا صعب التطبيق لأن الحالة (state) تتجزأ عبر محادثات متعددة ولا يوجد مصدر حقيقة واحد.
تحذير
في مشروعنا، نتجنب التواصل المباشر بين الوكلاء ونمرر كل شيء عبر المنسّق (Orchestrator). هذا يبقي النظام قابلاً للتتبع والتصحيح.
2.4 نمط التفاوض (Negotiation)
المبدأ: وكلاء يتواصلون حول مورد مشترك (API، سعة تخزين، قاعدة بيانات). ليس بالضرورة أن يكون التفاوض عدائياً — أفضل حالة استخدام هي عندما يكون هناك تبادل مربح للطرفين (win-win). مثلاً: وكيل المبيعات يتفاوض مع وكيل المخزون على حجز كمية معينة لمنتج لصالح عميل VIP.
في مشروعنا
وكيل ذكاء المنافسين يكتشف أن المنافس خفض سعر منتجاً، فيتفاوض مع وكيل التسويق على إنشاء عرض ترويجي عاجل، ويتفاوض مع وكيل المخزون على توفير الكمية اللازمة.
2.5 نمط البث (Broadcast)
المبدأ: وكيل يرسل معلومات لكل الوكلاء الآخرين. أقل إثارة من الأنماط الأخرى لكنه حاسم لاستمرارية التماسك في المهام الطويلة. فكر فيه كـ تحديثات الحالة، معلومات سياقية جديدة، قيود مشتركة.
في مشروعنا
وكيل التسويق يبث إطلاق منتج جديد لكل الوكلاء — وكيل المبيعات يبدأ بالترحيب بالاستفسارات، وكيل المخزون يجهز الكميات، وكيل المنافسين يراقب ردود فعل السوق.
Delegation
- وكيل ← وكيل تابع
- سياق منفصل
- الأكثر شيوعاً
✅ Creator-Verifier
- فصل المسؤوليات
- يفصل التحيزات
- منع الانجراف
Negotiation
- مورد مشترك
- Win-win
- التفاعل المباشر
Broadcast
- تحديثات جماعية
- تماسك النظام
- سياق موحد
3. معمارية Missions ثلاثية الأدوار
في محاضرته، شرح لوك كيف طبقت Factory هذه الأنماط في نظام اسمه Missions. يجمع Missions أربعة من الأنماط الخمسة (Delegation, Creator-Verifier, Broadcast, Negotiation) في سير عمل موحد يعتمد على 3 أدوار:
3.1 المنسّق (Orchestrator)
يتولى التخطيط. عندما تصف ما تريد، المنسّق يعمل كصندوق رنين — يسألك الأسئلة الاستراتيجية الصحيحة، يكتشف المتطلبات غير الواضحة، ثم ينتج خطة تتضمن الميزات والمعالم (milestones) وعقد التحقق (validation contract).
3.2 العمّال (Workers)
يتولون التنفيذ. كل مهمة تُسند إلى عامل بسياق نظيف — لا تراكم، لا تدهور في الانتباه. العامل يقرأ المواصفات، ينفذ الميزة، ثم يقدم تقرير تسليم منظم (structured handoff).
3.3 المدقّقون (Validators)
يتولون التحقق. الأهمية الكبيرة هنا: النظام لا يعتمد فقط على اختبارات lint و type checking. عقد التحقق (validation contract) يُكتب أثناء التخطيط، قبل أي كود. هذا يعني أن الاختبارات لا تتطابق مع الكود، بل الكود يجب أن يطابق الاختبارات المحددة مسبقاً.
أهم درس من محاضرة لوك
“الاختبارات المكتوبة بعد التنفيذ لا تكتشف الأخطاء — بل تؤكد القرارات.” إذا اعتمدت على التحقق بهذه الطريقة، نظامك سينجرف حتماً. عقد التحقق يُكتب أثناء التخطيط ويحدد الصحة (correctness) مستقلة عن التنفيذ.
3.4 التمرير المنظم (Structured Handoffs)
عندما ينهي العامل مهمته، لا يقول فقط “خلصت”. يملأ استمارة تسليم مفصلة: ما تم إنجازه، ما لم يُنجز، الأوامر التي نُفذت، رموز الخروج، المشكلات التي اكتُشفت، وهل التزم بالإجراءات التي حددها المنسّق. بهذه الطريقة، النظام يلتقط الأخطاء عند حدود المعالم ويصحح مساره ذاتياً.
3.5 التنفيذ التسلسلي (Serial Execution)
خيار بديهي هو التوازي (parallelism): 10 وكلاء يعملون معاً = 10 أضعاف الإنتاجية. لكن لوك وجد أن هذا لا يعمل لمهام تطوير البرمجيات لأن الوكلاء يتعارضون — يدوسون على تغييرات بعضهم، يكررون العمل، يتخذون قرارات معمارية متناقضة. الحل: التنفيذ التسلسلي للميزات مع توازٍ مستهدف لعمليات القراءة فقط.
مبدأ أساسي
“يبدو التنفيذ التسلسلي أبطأ على الورق، لكن معدل الأخطاء ينخفض بشكل كبير. وعندما تعمل على مهام تستمر أياماً، هذا النوع من الصحة يتراكم.”
أطول مهمة (mission) نفذتها Factory استمرت 16 يوماً — أطول من سباق السرعة (sprint) بأكمله — ويعتقدون أنها يمكن أن تستمر 30 يوماً. هذا ممكن فقط بسبب البنية المنظمة (structured architecture).
4. المشروع العملي: متجر إلكتروني بـ 4 وكلاء متصلين
حان وقت التطبيق. سنبني نظاماً كاملاً لإدارة متجر إلكتروني يعتمد على 4 وكلاء متصلين فيما بينهم عبر منسّق مركزي. النظام مكتوب بـ Python + FastAPI + OpenAI Agents SDK، ويطبق جميع الأنماط الخمسة التي تعلمناها.
نظرة عامة على المعمارية

هيكل المشروع
لنبدأ بهيكل المجلدات:
# project_structure.txt ecommerce-agent-system/ docker-compose.yml requirements.txt config.py # الإعدادات البيئية main.py # نقطة الدخول الرئيسية (FastAPI) orchestrator/ __init__.py coordinator.py # المنسق الأساسي task_queue.py # قائمة المهام memory.py # الذاكرة المشتركة agents/ __init__.py base_agent.py # القاعدة الأساسية لكل وكلاء sales_agent.py # وكيل المبيعات والدعم competitor_agent.py # وكيل ذكاء المنافسين inventory_agent.py # وكيل إدارة المنتجات marketing_agent.py # وكيل التسويق tools/ __init__.py db_tools.py # أدوات قاعدة البيانات social_tools.py # أدوات وسائل التواصل scraper_tools.py # أدوات جمع البيانات models/ __init__.py schemas.py # نماذج البيانات database.py # اتصال قاعدة البيانات tests/ test_sales.py test_competitor.py test_inventory.py test_marketing.py test_orchestrator.py README.md
الآن، لنبدأ ببناء كل مكون — من القاعدة إلى المكونات المتقدمة.
4.0 القاعدة الأساسية — كل الوكيل
# agents/base_agent.py - القاعدة الأساسية لكل الوكيل from abc import ABC, abstractmethod from pydantic import BaseModel from typing import Any, Dict, List, Optional from datetime import datetime class HandoffReport(BaseModel): """تقرير التسليم المنظم — المستلهم من Structured Handoffs في Missions""" agent_name: str task_id: str status: str # completed, failed, needs_review completed_items: List[str] uncompleted_items: List[str] commands_executed: List[str] issues_discovered: List[str] procedures_followed: bool output_summary: str timestamp: datetime = datetime.now() class BaseAgent(ABC): """القاعدة الأساسية لكل الوكيل""" def __init__(self, name: str, orchestrator_ref=None): self.name = name self.orchestrator = orchestrator_ref self.context: Dict[str, Any] = {} self.task_history: List[Dict] = [] async def process_task(self, task: Dict) -> HandoffReport: """معالجة مهمة وإرجاع تقرير تسليم منظم""" self.context.update(task.get("context", {})) result = await self._execute(task) report = self._create_handoff(result) self.task_history.append(report.dict()) return report @abstractmethod async def _execute(self, task: Dict) -> Dict: pass def _create_handoff(self, result: Dict) -> HandoffReport: return HandoffReport( agent_name=self.name, task_id=result.get("task_id", "unknown"), status=result.get("status", "completed"), completed_items=result.get("completed", []), uncompleted_items=result.get("uncompleted", []), commands_executed=result.get("commands", []), issues_discovered=result.get("issues", []), procedures_followed=result.get("followed", True), output_summary=result.get("summary", "") ) def broadcast(self, message: str, data: Dict = None): """نمط Broadcast — يرسل رسالة لكل الوكيل الآخرين عبر المنسق""" if self.orchestrator: self.orchestrator.broadcast( sender=self.name, message=message, data=data )
نموذج البيانات (Data Schema)
# models/schemas.py - نماذج البيانات الموحدة from pydantic import BaseModel from typing import List, Optional from datetime import datetime from enum import Enum class ProductCategory(str, Enum): electronics = "Electronics" clothing = "Clothing" food = "Food" books = "Books" home = "Home & Garden" class Product(BaseModel): id: int name: str category: ProductCategory price: float stock: int description: str sku: str class Order(BaseModel): id: int customer_name: str customer_contact: str products: List[dict] # [{product_id, quantity}] total: float status: str # pending, confirmed, shipped, delivered created_at: datetime = datetime.now() class CompetitorPrice(BaseModel): competitor_name: str product_name: str price: float our_price: float price_diff: float timestamp: datetime = datetime.now() class Campaign(BaseModel): id: int name: str platform: str # facebook, google, twitter budget: float spent: float = 0.0 status: str # draft, active, paused, completed target_audience: str start_date: datetime end_date: Optional[datetime] = None
قاعدة البيانات (Database)
# models/database.py - طبقة قاعدة البيانات import sqlite3 from contextlib import contextmanager from typing import List, Dict, Any import json class Database: """طبقة قاعدة بيانات بسيطة — تستخدم SQLite ولكن يمكن استبدالها بـ PostgreSQL""" def __init__(self, db_path: str = "ecommerce.db"): self.db_path = db_path self._init_db() def _init_db(self): with self._conn() as conn: conn.execute("""CREATE TABLE IF NOT EXISTS products ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, category TEXT NOT NULL, price REAL NOT NULL, stock INTEGER NOT NULL DEFAULT 0, description TEXT, sku TEXT UNIQUE NOT NULL )""") conn.execute("""CREATE TABLE IF NOT EXISTS orders ( id INTEGER PRIMARY KEY AUTOINCREMENT, customer_name TEXT NOT NULL, customer_contact TEXT, products TEXT NOT NULL, total REAL NOT NULL, status TEXT DEFAULT 'pending', source TEXT DEFAULT 'website', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP )""") conn.execute("""CREATE TABLE IF NOT EXISTS competitor_prices ( id INTEGER PRIMARY KEY AUTOINCREMENT, competitor_name TEXT NOT NULL, product_name TEXT NOT NULL, price REAL NOT NULL, our_price REAL NOT NULL, price_diff REAL NOT NULL, timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP )""") conn.execute("""CREATE TABLE IF NOT EXISTS campaigns ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, platform TEXT NOT NULL, budget REAL NOT NULL, spent REAL DEFAULT 0, status TEXT DEFAULT 'draft', target_audience TEXT, start_date TIMESTAMP, end_date TIMESTAMP )""") conn.execute("""CREATE TABLE IF NOT EXISTS conversations ( id INTEGER PRIMARY KEY AUTOINCREMENT, customer TEXT NOT NULL, channel TEXT NOT NULL, messages TEXT, status TEXT DEFAULT 'open', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP )""") conn.commit() @contextmanager def _conn(self): conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row try: yield conn finally: conn.close() def query(self, sql: str, params: tuple = ()) -> List[Dict[str, Any]]: with self._conn() as conn: cursor = conn.execute(sql, params) rows = [dict(row) for row in cursor.fetchall()] return rows def execute(self, sql: str, params: tuple = ()): with self._conn() as conn: conn.execute(sql, params) conn.commit()
4.1 الوكيل 1: المبيعات ودعم العملاء
نمط التصميم: Delegation + Broadcast
المسؤولية: استقبال العملاء عبر جميع القنوات (Telegram, WhatsApp, Twitter/X, الموقع)، الإجابة عن الاستفسارات، إنشاء الطلبات، تصعيد المشكلات.
# agents/sales_agent.py - وكيل المبيعات ودعم العملاء from typing import Dict, List from .base_agent import BaseAgent from .tools.db_tools import DatabaseTools class SalesAgent(BaseAgent): """يتولى المبيعات ودعم العملاء عبر جميع القنوات""" def __init__(self, db_tools: DatabaseTools, orchestrator_ref=None): super().__init__(name="sales_agent", orchestrator_ref=orchestrator_ref) self.db = db_tools self.conversations = {} async def _execute(self, task: Dict) -> Dict: task_type = task.get("type") if task_type == "customer_message": return self._handle_message( task.get("customer"), task.get("message"), task.get("channel", "website") ) elif task_type == "create_order": return self._create_order(task.get("order_data")) elif task_type == "check_order_status": return self._check_order(task.get("order_id")) elif task_type == "escalate": return self._escalate(task.get("issue")) else: return {"status": "failed", "summary": f"Unknown task type: {task_type}"} def _handle_message(self, customer: str, message: str, channel: str) -> Dict: """معالجة رسالة عميل واستخراج النية""" # حفظ المحادثة في الذاكرة conversation_id = hash(customer + channel) self.conversations.setdefault(conversation_id, []).append({ "role": "customer", "text": message }) # استخراج النية من الرسالة (في الإنتاج، استخدم LLM هنا) intent = self._detect_intent(message) completed = [] response = "" if intent == "search_product": # استعلام عن منتج — Delegation إلى وكيل المخزون products = self.db.search_products(message) if products: response = f"وجدنا المنتجات التالية:\n" for p in products[:5]: response += f"• {p['name']} — ${p['price']:.2f} ({p['stock']} قطعة)\n" else: response = "للأسف، لم نجد منتجات تطابق طلبك." completed.append("بحث عن منتجات") elif intent == "create_order": # إنشاء طلب — Delegation إلى وكيل المخزون لتأكيد التوفر if self.orchestrator: inventory_check = self.orchestrator.delegate_task("inventory_agent", { "type": "check_and_reserve", "product": message }) if inventory_check.get("available"): order = "ORDER-123" # generated self.db.create_order(customer, channel, inventory_check["items"]) response = f"تم إنشاء طلبك {order} بنجاح! " completed.append("إنشاء طلب") # بث للوكيل الآخرين — Broadcast pattern self.broadcast("طلب جديد", {"order": order, "customer": customer}) else: response = "عذراً، المنتج غير متوفر حالياً." elif intent == "complaint": response = "نأسف للإزعاج! تم تصعيد شكواك إلى الفريق المختص وسيتم الرد خلال 24 ساعة." self.broadcast("شكوى عميل", {"customer": customer, "message": message}) completed.append("تصعيد شكوى") else: response = "مرحباً بك في متجرنا! يمكنني مساعدتك في:\n 1. البحث عن منتجات\n 2. إنشاء طلب\n 3. متابعة حالة طلب\n 4. الإجابة عن استفسارات" completed.append("معالجة استفسار عام") # حفظ الرد في المحادثة self.conversations[conversation_id].append({ "role": "agent", "text": response }) return { "status": "completed", "completed": completed, "summary": f"تم الرد على {customer} عبر {channel}: {intent}", "response": response } def _detect_intent(self, message: str) -> str: """كشف نية المستخدم — في الإنتاج يستخدم LLM""" message_lower = message.lower() if "كم" in message_lower or "سعر" in message_lower: return "search_product" if "طلب" in message_lower or "اشتري" in message_lower: return "create_order" if "شكوى" in message_lower or "مشكلة" in message_lower: return "complaint" if "طلب" in message_lower and "رقم" in message_lower: return "check_order" return "general" def _create_order(self, order_data: dict) -> Dict: """إنشاء طلب جديد بعد التحقق من المخزون""" try: order_id = self.db.create_order( order_data["customer"], order_data["channel"], order_data["items"] ) return { "status": "completed", "completed": [f"طلب {order_id}"], "summary": f"تم إنشاء الطلب {order_id}" } except Exception as e: return { "status": "failed", "issues": [f"خطأ في إنشاء الطلب: {str(e)}"], "summary": "فشل في إنشاء الطلب" }
4.2 الوكيل 2: ذكاء المنافسين
نمط التصميم: Negotiation
المسؤولية: مراقبة أسعار المنافسين، تحليل الاتجاهات، تقديم توصيات لتحسين المبيعات، التفاوض مع الوكلاء الآخرين لتنفيذ التغييرات.
# agents/competitor_agent.py - وكيل ذكاء المنافسين from typing import Dict, List from datetime import datetime, timedelta from .base_agent import BaseAgent from .tools.scraper_tools import ScraperTools from .tools.db_tools import DatabaseTools class CompetitorAgent(BaseAgent): """يراقب المنافسين ويقدم توصيات تحسين المبيعات""" def __init__(self, db_tools: DatabaseTools, scraper: ScraperTools, orchestrator_ref=None): super().__init__(name="competitor_agent", orchestrator_ref=orchestrator_ref) self.db = db_tools self.scraper = scraper self.price_cache = {} async def _execute(self, task: Dict) -> Dict: task_type = task.get("type") if task_type == "scan_prices": return await self._scan_competitor_prices() elif task_type == "analyze_trends": return await self._analyze_market_trends() elif task_type == "price_recommendation": return await self._recommend_price_adjustments() elif task_type == "competitor_report": return await self._generate_report(task.get("product_id")) else: return {"status": "failed", "summary": "مهمة غير معروفة"} async def _scan_competitor_prices(self) -> Dict: """مسح أسعار المنافسين وتخزينها — يستخدم Scraper Tools""" # في الإنتاج، هذا يزحف على مواقع المنافسين # هنا نستخدم بيانات محاكاة competitors = ["TechMart", "GadgetPro", "DigitalStore"] products = self.db.query("SELECT id, name, price FROM products") scanned = [] for product in products[:5]: for comp in competitors: # في الحقيقة: price = await self.scraper.get_price(comp, product['name']) comp_price = product["price"] * (0.85 + hash(comp) % 30 / 100) diff = product["price"] - comp_price self.db.execute("""INSERT INTO competitor_prices (competitor_name, product_name, price, our_price, price_diff) VALUES (?, ?, ?, ?, ?)""", (comp, product["name"], round(comp_price, 2), product["price"], round(diff, 2))) scanned.append({ "product": product["name"], "competitor": comp, "their_price": comp_price, "our_price": product["price"], "diff": diff }) summary = f"مسح {len(scanned)} سعر منافس لـ 5 منتجات" # Negotiation: إذا وجدنا فرق سعر كبير، تفاوض مع وكيل التسويق urgent_alerts = [s for s in scanned if s["diff"] < -5.0] if urgent_alerts and self.orchestrator: self.orchestrator.delegate_task("marketing_agent", { "type": "urgent_promo", "reason": "المنافسون يخفضون الأسعار", "products": [u["product"] for u in urgent_alerts] }) return { "status": "completed", "completed": [summary], "summary": summary, "alerts": "urgent" if urgent_alerts else "normal" } async def _recommend_price_adjustments(self) -> Dict: """تحليل بيانات الأسعار وتقديم توصيات""" data = self.db.query("""SELECT product_name, competitor_name, price, our_price, price_diff FROM competitor_prices WHERE timestamp > datetime('now', '-7 days') ORDER BY price_diff ASC""") recommendations = [] by_product = {} for row in data: by_product.setdefault(row["product_name"], []).append(row) for product, prices in by_product.items(): avg_diff = sum(p["price_diff"] for p in prices) / len(prices) if avg_diff < -5: recommendations.append({ "product": product, "action": "تخفيض السعر", "reason": f"متوسط الفرق {avg_diff:.2f}$ أقل من المنافسين" }) elif avg_diff > 10: recommendations.append({ "product": product, "action": "رفع السعر (لدينا ميزة)", "reason": f"متوسط الفرق {avg_diff:.2f}$ أعلى من المنافسين" }) return { "status": "completed", "completed": [f"{len(recommendations)} توصية سعرية"], "summary": f"تم تحليل {len(data)} سجل أسعار، {len(recommendations)} توصية", "recommendations": recommendations }
4.3 الوكيل 3: إدارة المنتجات والمستودع
نمط التصميم: Creator-Verifier
المسؤولية: إدارة المخزون، إضافة/تحديث المنتجات، التحقق من صحة البيانات، التنبؤ بنفاد المخزون.
# agents/inventory_agent.py - وكيل إدارة المنتجات والمستودع from typing import Dict, List, Optional from datetime import datetime from .base_agent import BaseAgent from .tools.db_tools import DatabaseTools class InventoryAgent(BaseAgent): """يدير المنتجات والمخزون — Creator-Verifier pattern""" def __init__(self, db_tools: DatabaseTools, orchestrator_ref=None): super().__init__(name="inventory_agent", orchestrator_ref=orchestrator_ref) self.db = db_tools async def _execute(self, task: Dict) -> Dict: task_type = task.get("type") if task_type == "add_product": return self._add_product(task.get("product_data")) elif task_type == "update_stock": return self._update_stock( task.get("product_id"), task.get("quantity") ) elif task_type == "check_stock": return self._check_stock( task.get("product_id"), task.get("quantity", 1) ) elif task_type == "check_and_reserve": return self._reserve_products(task.get("items", [])) elif task_type == "low_stock_alert": return self._low_stock_report() elif task_type == "verify": return self._verify_inventory(task.get("checksum")) else: return {"status": "failed", "summary": "مهمة غير معروفة"} def _add_product(self, data: Dict) -> Dict: """إضافة منتج جديد — Creator-Verifier: الإنشاء + التحقق""" # === CREATOR PHASE: إنشاء المنتج === if not all(k in data for k in ["name", "category", "price", "stock", "sku"]): return {"status": "failed", "issues": ["بيانات المنتج غير مكتملة"]} # === VERIFIER PHASE: التحقق من صحة البيانات === errors = [] if data["price"] <= 0: errors.append("السعر يجب أن يكون أكبر من 0") if data["stock"] < 0: errors.append("الكمية لا يمكن أن تكون سالبة") if not data["sku"].strip(): errors.append("SKU لا يمكن أن يكون فارغاً") existing = self.db.query("SELECT id FROM products WHERE sku = ?", (data["sku"],)) if existing: errors.append(f"SKU '{data['sku']}' موجود مسبقاً في قاعدة البيانات") if errors: return {"status": "failed", "issues": errors, "summary": "فشل التحقق من صحة المنتج"} # التنفيذ بعد التحقق self.db.execute("""INSERT INTO products (name, category, price, stock, description, sku) VALUES (?, ?, ?, ?, ?, ?)""", (data["name"], data["category"], data["price"], data["stock"], data.get("description", ""), data["sku"])) product_id = self.db.query("SELECT last_insert_rowid()")[0]["last_insert_rowid()"] return { "status": "completed", "completed": [f"إضافة {data['name']} (ID: {product_id})", "التحقق من البيانات"], "summary": f"تمت إضافة {data['name']} بنجاح", "product_id": product_id } def _check_stock(self, product_id: int, quantity: int = 1) -> Dict: """التحقق من توفر المخزون""" products = self.db.query("SELECT * FROM products WHERE id = ?", (product_id,)) if not products: return {"status": "failed", "summary": "المنتج غير موجود"} product = products[0] available = product["stock"] >= quantity return { "status": "completed", "completed": [f"فحص توفر {product['name']}"], "available": available, "current_stock": product["stock"], "summary": f"{'متوفر' if available else 'غير متوفر'}: {product['name']}" } def _low_stock_report(self) -> Dict: """تقرير المنتجات منخفضة المخزون — تشغيل Broadcast""" low = self.db.query("""SELECT id, name, stock, sku FROM products WHERE stock < 10 ORDER BY stock ASC""") if low and self.orchestrator: # Broadcast: أبلغ كل الوكيل self.broadcast("تحذير مخزون منخفض", { "count": len(low), "products": [p["name"] for p in low] }) return { "status": "completed", "completed": [f"فحص مخزون {len(low)} منتجات منخفضة"], "low_stock_products": low, "summary": f"تم العثور على {len(low)} منتجات بمخزون منخفض" } def _verify_inventory(self, checksum: str) -> Dict: """Creator-Verifier: التحقق من صحة المخزون الكامل""" count = self.db.query("SELECT COUNT(*) as c FROM products")[0]["c"] total_stock = self.db.query("SELECT SUM(stock) as s FROM products")[0]["s"] calculated = f"PROD-{count}-STK-{total_stock}" match = (calculated == checksum) if checksum else True return { "status": "completed", "verified": match, "total_products": count, "total_stock": total_stock, "checksum": calculated, "summary": f"التحقق {'ناجح' if match else 'فاشل'}: {count} منتج، {total_stock} قطعة" }
4.4 الوكيل 4: التسويق الاحترافي
نمط التصميم: Broadcast
المسؤولية: إدارة الحملات الإعلانية، تحليل أداء التسويق، نشر المحتوى عبر وسائل التواصل، توليد تقارير أسبوعية.
# agents/marketing_agent.py - وكيل التسويق الاحترافي from typing import Dict, List, Optional from datetime import datetime, timedelta from .base_agent import BaseAgent from .tools.db_tools import DatabaseTools from .tools.social_tools import SocialTools class MarketingAgent(BaseAgent): """يدير التسويق والحملات الإعلانية — Broadcast pattern""" def __init__(self, db_tools: DatabaseTools, social: SocialTools, orchestrator_ref=None): super().__init__(name="marketing_agent", orchestrator_ref=orchestrator_ref) self.db = db_tools self.social = social async def _execute(self, task: Dict) -> Dict: task_type = task.get("type") if task_type == "create_campaign": return self._create_campaign(task.get("campaign_data")) elif task_type == "update_campaign": return self._update_campaign( task.get("campaign_id"), task.get("updates")) elif task_type == "post_content": return self._post_social_content(task.get("content"), task.get("platforms")) elif task_type == "analytics_report": return self._generate_analytics() elif task_type == "urgent_promo": return self._handle_urgent_promo( task.get("reason"), task.get("products", [])) else: return {"status": "failed", "summary": "مهمة غير معروفة"} def _create_campaign(self, data: Dict) -> Dict: """إنشاء حملة إعلانية جديدة — Broadcast للوكيل الآخرين""" if not all(k in data for k in ["name", "platform", "budget", "target_audience"]): return {"status": "failed", "issues": ["بيانات الحملة غير مكتملة"]} self.db.execute("""INSERT INTO campaigns (name, platform, budget, status, target_audience, start_date) VALUES (?, ?, ?, 'active', ?, datetime('now'))""", (data["name"], data["platform"], data["budget"], data["target_audience"])) campaign_id = self.db.query("SELECT last_insert_rowid()")[0]["last_insert_rowid()"] # Broadcast للحملة الجديدة لكل الوكيل self.broadcast("حملة تسويقية جديدة", { "campaign_id": campaign_id, "name": data["name"], "platform": data["platform"], "budget": data["budget"] }) return { "status": "completed", "completed": [f"إنشاء حملة {data['name']}", "بث إشعار للوكيل الآخرين"], "campaign_id": campaign_id, "summary": f"تم إطلاق حملة {data['name']} على {data['platform']}" } def _handle_urgent_promo(self, reason: str, products: List[str]) -> Dict: """استجابة عاجلة لتهديد منافس — Negotiation مع الوكيل الآخرين""" products_str = ", ".join(products) # إنشاء حملة عاجلة promo_data = { "name": f"PROMO-URGENT-{datetime.now().strftime('%Y%m%d')}", "platform": "all", "budget": 500.0, "target_audience": "existing_customers" } result = self._create_campaign(promo_data) # بث للوكيل الآخرين — Broadcast self.broadcast("حملة ترويجية عاجلة", { "reason": reason, "products": products, "campaign": promo_data["name"] }) result["summary"] = f"تم إنشاء حملة عاجلة لمواجهة: {reason}. المنتجات: {products_str}" return result def _generate_analytics(self) -> Dict: """تقرير تحليلي أسبوعي — يستخدم Broadcast لمشاركة النتائج""" total_orders = self.db.query("SELECT COUNT(*) as c FROM orders")[0]["c"] total_revenue = self.db.query("SELECT COALESCE(SUM(total), 0) as s FROM orders")[0]["s"] active_campaigns = self.db.query( "SELECT COUNT(*) as c FROM campaigns WHERE status = 'active'")[0]["c"] low_stock = self.db.query("SELECT COUNT(*) as c FROM products WHERE stock < 10")[0]["c"] report = { "period": "الأسبوع الماضي", "total_orders": total_orders, "total_revenue": total_revenue, "active_campaigns": active_campaigns, "low_stock_alerts": low_stock, "generated_at": datetime.now().isoformat() } # Broadcast التقرير لكل الوكيل self.broadcast("التقرير التحليلي الأسبوعي", report) return { "status": "completed", "completed": ["توليد التقرير التحليلي", "بثه لكل الوكيل"], "report": report, "summary": f"تقرير: {total_orders} طلب، ${total_revenue:.2f} إيرادات" }
4.5 المنسّق (Orchestrator) — قلب النظام
المسؤولية: توزيع المهام على الوكلاء، إدارة السياق المشترك، معالجة البث (Broadcast)، تمرير التقارير (Structured Handoffs)، استقبال طلبات API.
# orchestrator/coordinator.py - المنسق المركزي from typing import Dict, List, Optional, Callable from datetime import datetime import asyncio from collections import defaultdict class AgentOrchestrator: """المنسق المركزي — يدير كل الوكلاء ويوزع المهام""" def __init__(self): self.agents: Dict[str, "BaseAgent"] = {} self.broadcast_log: List[Dict] = [] self.task_results: Dict[str, Dict] = {} self.shared_context: Dict[str, any] = { "store_name": "متجر التقنية", "currency": "USD", "agents_online": [], "active_campaigns": [] } def register_agent(self, agent: "BaseAgent"): """تسجيل وكيل جديد في النظام""" self.agents[agent.name] = agent self.shared_context["agents_online"].append(agent.name) async def delegate_task(self, agent_name: str, task: Dict) -> Dict: """Delegation pattern — تفويض مهمة لوكيل معين""" agent = self.agents.get(agent_name) if not agent: return {"status": "error", "summary": f"الوكيل {agent_name} غير موجود"} # إضافة السياق المشترك task["context"] = {**self.shared_context, **task.get("context", {})} result = await agent.process_task(task) self.task_results[result.task_id] = result.dict() return result.dict() def broadcast(self, sender: str, message: str, data: Dict = None): """Broadcast pattern — يرسل رسالة لكل الوكيل""" entry = { "from": sender, "message": message, "data": data or {}, "timestamp": datetime.now().isoformat() } self.broadcast_log.append(entry) # تحديث السياق المشترك if data: self.shared_context.update({f"last_broadcast_{sender}": data}) async def process_incoming_message(self, customer: str, message: str, channel: str = "website") -> str: """نقطة الدخول لكل الرسائل من العملاء""" result = await self.delegate_task("sales_agent", { "type": "customer_message", "customer": customer, "message": message, "channel": channel }) return result.get("response", "عذراً، حدث خطأ في معالجة طلبك.") async def schedule_routine(self): """المهام الروتينية: فحص المنافسين كل 6 ساعات، تقرير المخزون يومياً""" while True: # كل 6 ساعات: مسح أسعار المنافسين await self.delegate_task("competitor_agent", { "type": "scan_prices" }) # كل 6 ساعات: فحص المخزون المنخفض await self.delegate_task("inventory_agent", { "type": "low_stock_alert" }) await asyncio.sleep(6 * 3600) # 6 ساعات # كل 24 ساعة: تقرير تحليلي + توصيات أسعار await self.delegate_task("marketing_agent", { "type": "analytics_report" }) await self.delegate_task("competitor_agent", { "type": "price_recommendation" }) await asyncio.sleep(18 * 3600) # باقي اليوم def get_status(self) -> Dict: """حالة النظام الكاملة — يشبه Mission Control في Factory""" return { "agents_online": list(self.agents.keys()), "total_tasks_completed": len(self.task_results), "broadcasts_sent": len(self.broadcast_log), "recent_broadcasts": self.broadcast_log[-5:], "shared_context": self.shared_context }
4.6 الذاكرة المشتركة وتمرير السياق
# orchestrator/memory.py - الذاكرة المشتركة لنظام الوكيل from typing import Dict, Any, List, Optional from datetime import datetime import json class SharedMemory: """الذاكرة المشتركة — مصدر الحقيقة الوحيد (Single Source of Truth)""" def __init__(self): self.store: Dict[str, Any] = {} self.event_log: List[Dict] = [] def set(self, key: str, value: Any, source: str = "system"): """تحديث قيمة في الذاكرة المشتركة""" self.store[key] = { "value": value, "updated_by": source, "updated_at": datetime.now().isoformat() } self.event_log.append({ "action": "set", "key": key, "source": source, "timestamp": datetime.now().isoformat() }) def get(self, key: str, default: Any = None) -> Any: """قراءة قيمة من الذاكرة المشتركة""" entry = self.store.get(key) return entry["value"] if entry else default def get_all(self) -> Dict: return {k: v["value"] for k, v in self.store.items()} def get_context_snapshot(self, agent_name: str) -> str: """لقطة سياق — ما يحتاجه الوكيل لبدء العمل""" context = f"=== SHARED CONTEXT ({datetime.now().isoformat()}) ===\n" for key, entry in self.store.items(): context += f" * {key}: {entry['value']} ({entry['updated_by']}, {entry['updated_at']})\n" # إضافة الأحداث الأخيرة recent = [e for e in self.event_log[-10:]] context += f"\n=== RECENT EVENTS ({len(recent)} of {len(self.event_log)}) ===\n" for event in recent: context += f" * [{event['timestamp']}] {event['source']}: {event['action']} {event['key']}\n" return context
4.7 تطبيق FastAPI — نقطة الدخول
# main.py - تطبيق FastAPI الرئيسي from fastapi import FastAPI, HTTPException from pydantic import BaseModel from typing import List, Optional import asyncio import uvicorn from orchestrator.coordinator import AgentOrchestrator from orchestrator.memory import SharedMemory from models.database import Database from agents.tools.db_tools import DatabaseTools from agents.tools.scraper_tools import ScraperTools from agents.tools.social_tools import SocialTools from agents.sales_agent import SalesAgent from agents.competitor_agent import CompetitorAgent from agents.inventory_agent import InventoryAgent from agents.marketing_agent import MarketingAgent # التهيئة app = FastAPI(title="Multi-Agent E-commerce System") db = Database("ecommerce.db") db_tools = DatabaseTools(db) memory = SharedMemory() orchestrator = AgentOrchestrator() # إنشاء وكلاء sales_agent = SalesAgent(db_tools, orchestrator) competitor_agent = CompetitorAgent(db_tools, ScraperTools(), orchestrator) inventory_agent = InventoryAgent(db_tools, orchestrator) marketing_agent = MarketingAgent(db_tools, SocialTools(), orchestrator) # تسجيلهم orchestrator.register_agent(sales_agent) orchestrator.register_agent(competitor_agent) orchestrator.register_agent(inventory_agent) orchestrator.register_agent(marketing_agent) class MessageRequest(BaseModel): customer: str message: str channel: str = "website" class ProductRequest(BaseModel): name: str category: str price: float stock: int description: str = "" sku: str class CampaignRequest(BaseModel): name: str platform: str budget: float target_audience: str @app.on_event("startup") async def startup(): # بدء المهام الروتينية في الخلفية asyncio.create_task(orchestrator.schedule_routine()) # ==================== API ENDPOINTS ==================== @app.post("/api/message") async def handle_message(req: MessageRequest): """استقبال رسالة من عميل""" response = await orchestrator.process_incoming_message( req.customer, req.message, req.channel ) return {"response": response} @app.post("/api/products") async def add_product(req: ProductRequest): """إضافة منتج جديد""" result = await orchestrator.delegate_task("inventory_agent", { "type": "add_product", "product_data": req.dict() }) return result @app.get("/api/products") async def list_products(category: Optional[str] = None): """قائمة المنتجات""" if category: rows = db.query("SELECT * FROM products WHERE category = ?", (category,)) else: rows = db.query("SELECT * FROM products") return {"products": rows} @app.get("/api/competitor/prices") async def get_competitor_prices(): """آخر أسعار المنافسين""" rows = db.query("""SELECT * FROM competitor_prices ORDER BY timestamp DESC LIMIT 50""") return {"prices": rows} @app.post("/api/campaigns") async def create_campaign(req: CampaignRequest): """إنشاء حملة تسويقية""" result = await orchestrator.delegate_task("marketing_agent", { "type": "create_campaign", "campaign_data": req.dict() }) return result @app.get("/api/analytics") async def get_analytics(): """التقرير التحليلي""" result = await orchestrator.delegate_task("marketing_agent", { "type": "analytics_report" }) return result @app.get("/api/status") async def system_status(): """حالة النظام الكاملة — Mission Control""" return orchestrator.get_status() @app.get("/api/competitor/recommendations") async def get_recommendations(): """توصيات تحسين المبيعات""" result = await orchestrator.delegate_task("competitor_agent", { "type": "price_recommendation" }) return result @app.post("/api/scan-prices") async def scan_prices(): """مسح يدوي لأسعار المنافسين""" result = await orchestrator.delegate_task("competitor_agent", { "type": "scan_prices" }) return result if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8000)
4.8 أدوات قاعدة البيانات (Tools)
# agents/tools/db_tools.py - أدوات قاعدة البيانات from typing import List, Dict, Any from models.database import Database import json class DatabaseTools: """أدوات قاعدة البيانات المتاحة للوكلاء""" def __init__(self, db: Database): self.db = db def search_products(self, query: str) -> List[Dict]: """البحث عن منتجات""" return self.db.query("""SELECT * FROM products WHERE name LIKE ? OR description LIKE ? LIMIT 10""", (f"%{query}%", f"%{query}%")) def get_product(self, product_id: int) -> Dict: rows = self.db.query("SELECT * FROM products WHERE id = ?", (product_id,)) return rows[0] if rows else {} def create_order(self, customer: str, channel: str, items: List[Dict]) -> int: products_json = json.dumps(items) self.db.execute("""INSERT INTO orders (customer_name, customer_contact, products, total, source) VALUES (?, ?, ?, ?, ?)""", (customer, channel, products_json, sum(item["price"] * item["quantity"] for item in items), channel)) return self.db.query("SELECT last_insert_rowid()")[0]["last_insert_rowid()"] def get_order(self, order_id: int) -> Dict: rows = self.db.query("SELECT * FROM orders WHERE id = ?", (order_id,)) if rows: rows[0]["products"] = json.loads(rows[0]["products"]) return rows[0] if rows else {} def get_low_stock_products(self, threshold: int = 10) -> List[Dict]: return self.db.query("""SELECT * FROM products WHERE stock < ? ORDER BY stock ASC""", (threshold,)) def update_stock(self, product_id: int, quantity: int): self.db.execute("UPDATE products SET stock = stock + ? WHERE id = ?", (quantity, product_id))
5. تشغيل النظام كاملاً
5.1 المتطلبات (Requirements)
# requirements.txt fastapi==0.115.0 uvicorn[standard]==0.32.0 pydantic==2.10.0 httpx==0.28.0 aiosqlite==0.20.0
5.2 Docker Compose
# docker-compose.yml version: '3.8' services: api: build: . ports: - "8000:8000" volumes: - ./data:/app/data environment: - DATABASE_PATH=/app/data/ecommerce.db restart: unless-stopped # يمكن إضافة Redis لتحسين الذاكرة المشتركة redis: image: redis:7-alpine ports: - "6379:6379"
5.3 تشغيل النظام
# 1. تثبيت المتطلبات pip install -r requirements.txt # 2. تشغيل الخادم python main.py # 3. اختبار API curl -X POST http://localhost:8000/api/message \ -H "Content-Type: application/json" \ -d '{"customer": "أحمد", "message": "كم سعر سماعات البلوتوث؟", "channel": "website"}' curl -X POST http://localhost:8000/api/products \ -H "Content-Type: application/json" \ -d '{"name": "سماعات بلوتوث برو", "category": "Electronics", "price": 49.99, "stock": 100, "sku": "BT-PRO-001"}' curl http://localhost:8000/api/status
6. مقارنة: نظامنا vs الحلول التجارية
Gibberish
- مفتوح المصدر
- يدعم MCP
- يركز على التطوير
- يتطلب إعداداً يدوي
Factory Missions
- مؤسساتي (enterprise)
- يدير أياماً كاملة
- Validation Contracts
- يتطلب اشتراك
LangGraph
- مرونة عالية
- يدعم كل النماذج
- مكتبة Python
- منحنى تعلم عالي
نظامنا
- مجاني ومفتوح
- موجّه للمتاجر
- 4 وكلاء متخصصين
- جاهز للتشغيل
الخلاصة: نظامنا مصمم خصيصاً لمجال التجارة الإلكترونية. بينما Factory Missions و LangGraph منصات عامة، نظامنا يجمع بين سهولة الإعداد و التخصص في حل مشاكل المتاجر — مع تطبيق الأنماط الخمسة الأساسية من محاضرة Luke.
7. دليل CLI — أوامر النظام
# إضافة منتج curl -X POST http://localhost:8000/api/products \ -d '{"name": "سماعات بلوتوث برو", "category": "Electronics", "price": 49.99, "stock": 100, "sku": "BT-PRO-001"}' # إرسال رسالة عميل curl -X POST http://localhost:8000/api/message \ -d '{"customer": "أحمد", "message": "سعر لابتوب X", "channel": "telegram"}' # عرض المنتجات curl http://localhost:8000/api/products # فحص أسعار المنافسين يدوياً curl -X POST http://localhost:8000/api/scan-prices # توصيات المبيعات curl http://localhost:8000/api/competitor/recommendations # التقرير التحليلي curl http://localhost:8000/api/analytics # حالة النظام (Mission Control) curl http://localhost:8000/api/status # إنشاء حملة تسويقية curl -X POST http://localhost:8000/api/campaigns \ -d '{"name": "تخفيضات الصيف", "platform": "facebook", "budget": 1000, "target_audience": "18-35"}'
8. الأسئلة الشائعة
9. الخاتمة: المستقبل ملك للنظم البيئية للوكلاء
في محاضرته، ختم Luke برسالة مهمة: “الأشخاص الذين يفكرون بمصطلح النظم البيئية للوكلاء (agent ecosystems)، الذين يطورون حدساً لكيفية تفاعل النماذج المختلفة تحت الضغط — هؤلاء سيكونون من يبنون الجيل القادم من الابتكار.”
طبقنا في هذا المقال كل ما تعلمناه من محاضرة Luke:
- ✅ 5 أنماط للوكلاء المتعددين: Delegation، Creator-Verifier، Direct Communication، Negotiation، Broadcast
- ✅ معمارية 3 أدوار: Orchestrator يخطط، Workers ينفذون، Validators يتحققون
- ✅ Structured Handoffs: تمرير سياق منظم بين الوكلاء
- ✅ Serial Execution: تنفيذ تسلسلي يمنع التضارب
- ✅ Shared Memory: مصدر حقيقة واحد لجميع الوكلاء
- ✅ Broadcast & Negotiation: تواصل منظم بين الوكلاء
النظام الذي بنيناه قابل للتوسع: أضف وكيلاً للمحاسبة، وكيلاً للشحن، وكيلاً لخدمة ما بعد البيع — كل ما تحتاجه هو class جديد يرث من `BaseAgent`.
الخطوات التالية
1. شغل المشروع محلياً وجرب API
2. اربطه مع Telegram Bot API أو WhatsApp Business API
3. أضف وكيل محاسبة (Accounting Agent) يتعامل مع الفواتير
4. استخدم Docker Compose لنشره على سيرفر
5. اربطه مع MCP Server (كما شرحنا في المقال السابق) لتوسيع قدرات الوكيل
تذكير من Luke
“التوازي يبدو أسرع على الورق، لكن معدل الأخطاء يرتفع بشكل كبير. وعندما تعمل على مهام تستمر أياماً، الصحة تتراكم — اختر التنفيذ التسلسلي مع توازٍ مستهدف.”
المقال السابق: بناء MCP Server من الصفر — دليل MCP الشامل لكل مطور 2026
مصادر إضافية: OpenAI Agents SDK · FastAPI Documentation

اترك تعليقاً