bus/¶
Unified result-delivery layer for observers, async inter-agent responses, and task callbacks.
Files¶
bus/envelope.py:Envelope,Origin,DeliveryMode,LockModebus/bus.py:MessageBuscoordinator (submit, lock/inject/deliver pipeline)bus/lock_pool.py: shared per-session lock pool ((chat_id, topic_id))bus/adapters.py: conversion helpers from domain results toEnvelopemessenger/telegram/transport.py: Telegram transport adapter + origin-specific formattingmessenger/matrix/transport.py: Matrix transport adapter for room delivery formatting/routing
Why this module exists¶
Before the refactor, delivery logic was split across multiple deliver_* paths and lock dicts. bus/ centralizes delivery into one pipeline:
- convert result ->
Envelope - optionally acquire shared session lock
- optionally inject prompt into active session
- deliver through registered transport(s)
Envelope model¶
Core fields:
- identity:
origin,chat_id,topic_id - input for injection:
prompt,prompt_preview - output for delivery:
result_text,status,is_error - routing flags:
delivery,lock_mode,needs_injection - telegram metadata:
reply_to_message_id,thread_id - context:
provider,model,session_name,session_id,metadata
Lock key: envelope.lock_key -> (chat_id, topic_id).
MessageBus flow¶
submit(envelope):
- assign
envelope_idwhen missing - run optional audit hook
- if
lock_mode=REQUIRED: lock via sharedLockPool - if
needs_injection: callSessionInjector.inject_prompt(...)(orchestrator) - run optional pre-delivery hook
- transport-aware delivery routing (see below)
Registered transports: TelegramTransport, MatrixTransport.
Transport-aware delivery¶
Each TransportAdapter exposes a transport_name property (e.g. "tg", "mx"). The Envelope.transport field identifies the target transport for routing.
Delivery modes:
- BROADCAST: delivered to all registered transports unconditionally via
deliver_broadcast(). - UNICAST: filtered by
envelope.transport— only the matching transport receives the envelope viadeliver(). Whenenvelope.transportis unset, all transports receive the envelope (backward compatibility).
Cascading fallback for UNICAST:
- If the target transport is not registered (e.g. envelope targets
"tg"but only Matrix is running), the bus falls back to an available transport. - The fallback envelope is rewritten as a BROADCAST with a "Delivery fallback" explanation prepended to the result text, so the user understands the message was intended for a different transport.
Adapter mapping (adapters.py)¶
from_background_result(...)from_cron_result(...)from_heartbeat(...)from_webhook_cron_result(...)from_webhook_wake(...)from_interagent_result(...)from_task_result(...)from_task_question(...)from_user_message(...)(audit-only envelope)
Task/topic nuance:
- task result envelopes map
thread_id -> topic_id - task question envelopes also carry
topic_id - injected responses route back into the originating forum topic session
Wiring¶
- Single-transport mode: the active bot creates
MessageBus(lock_pool=self._lock_pool)and registers its transport - Multi-transport mode:
MultiBotAdaptercreates one sharedMessageBus; each bot registers its own transport adapter run_startup()callsorch.wire_observers_to_bus(bot._bus, wake_handler=...)ObserverManager.wire_to_bus(...)connects cron/heartbeat/background/webhook callbacks in one callbus.set_injector(orchestrator)enables prompt injection paths
Locking model¶
A single LockPool is shared by:
SequentialMiddleware(Telegram ingress)MessageBus(observer/result routing)
ApiServer currently creates its own LockPool for WebSocket session locking, so API locking is separate from the Telegram/message-bus lock domain.