messenger/¶
Transport abstraction layer: protocols, capabilities, registry, and multi-transport adapter. Everything in this package is transport-agnostic. Concrete transports live in sub-packages (messenger/telegram/, messenger/matrix/).
For transport-specific details see bot.md (Telegram) and matrix.md (Matrix).
Files¶
| File | Purpose |
|---|---|
messenger/__init__.py | Public re-exports for protocols, command classification, send options, multi-transport helpers, and bot factory |
messenger/commands.py | Shared direct/orchestrator/multi-agent command sets + classify_command() |
messenger/callback_router.py | Shared callback-data dispatch helpers for selector/button routing |
messenger/protocol.py | BotProtocol — runtime-checkable interface every transport implements |
messenger/capabilities.py | MessengerCapabilities dataclass + per-transport presets |
messenger/registry.py | create_bot() factory + _TRANSPORT_FACTORIES dispatch table |
messenger/notifications.py | NotificationService protocol + CompositeNotificationService fan-out |
messenger/send_opts.py | Base send-option model shared by transport senders |
messenger/multi.py | MultiBotAdapter — multi-transport facade behind BotProtocol |
BotProtocol¶
BotProtocol (protocol.py) is a typing.Protocol decorated with @runtime_checkable. The supervisor, AgentStack, and InterAgentBus depend only on this protocol, never on transport-specific classes.
Required surface:
| Member | Kind | Description |
|---|---|---|
orchestrator | property | Current Orchestrator (or None before startup) |
config | property | AgentConfig |
notification_service | property | NotificationService |
run() | async | Start event loop, block until shutdown, return exit code |
shutdown() | async | Graceful teardown |
register_startup_hook(hook) | method | Callback invoked after orchestrator creation |
set_abort_all_callback(cb) | method | Multi-agent abort injection point |
on_async_interagent_result(result) | async | Deliver async inter-agent result |
on_task_result(result) | async | Deliver background task completion |
on_task_question(...) | async | Deliver background task question |
file_roots(paths) | method | Allowed root directories for file sends |
Both TelegramBot and MatrixBot implement this protocol.
MessengerCapabilities¶
MessengerCapabilities (capabilities.py) is a frozen, slotted dataclass that declares what a transport supports:
| Field | Type | Default |
|---|---|---|
name | str | "" |
supports_inline_buttons | bool | False |
supports_reactions | bool | False |
supports_message_editing | bool | False |
supports_threads | bool | False |
supports_typing_indicator | bool | True |
supports_file_send | bool | True |
supports_streaming_edit | bool | False |
supports_seen_indicator | bool | False |
max_message_length | int | 4096 |
Two presets are shipped:
| Preset | Key differences |
|---|---|
TELEGRAM_CAPABILITIES | inline buttons, message editing, threads, streaming edit, seen indicator, 4096 char limit |
MATRIX_CAPABILITIES | reactions (no inline buttons), no message editing, no threads, seen indicator, 40000 char limit |
Orchestrator and delivery code queries capabilities at runtime to decide between streaming-edit vs. segment-based streaming, inline buttons vs. reaction buttons, etc.
supports_seen_indicator signals whether the transport can acknowledge incoming messages with a "seen" indicator. Telegram uses an emoji reaction; Matrix uses a read receipt. The feature is gated by config.scene.reaction_style -- when set to "off", no reactions are sent. When "seen", only 👀 and 👌 are shown. When "detailed", intermediate statuses (🤔 thinking, ✍️ tool use, 💯 compacting) are also displayed.
Transport Registry¶
create_bot() (registry.py) is the single entry point for bot construction. It inspects config.is_multi_transport:
- Single transport: looks up the transport name in
_TRANSPORT_FACTORIESand calls the matching factory. - Multi transport: returns a
MultiBotAdapterwrapping all configured transports.
_TRANSPORT_FACTORIES is a dict[str, _Factory] mapping transport names to lazy-import factory functions:
_TRANSPORT_FACTORIES: dict[str, _Factory] = {
"telegram": _create_telegram,
"matrix": _create_matrix,
}
Each factory accepts (config, *, agent_name, bus, lock_pool) and returns a BotProtocol. Imports are deferred inside the factory body so that unused transports do not need their dependencies installed.
Raises ValueError for unknown transport names.
NotificationService¶
NotificationService (notifications.py) is a runtime-checkable protocol with two methods:
notify(chat_id, text)— send to a specific chat/room.notify_all(text)— broadcast to all authorized users/rooms.
Both TelegramNotificationService and MatrixNotificationService implement this protocol. The supervisor and bus use it without knowing which transport is active.
CompositeNotificationService¶
CompositeNotificationService fans out calls to multiple underlying services. It holds a list[NotificationService] and iterates sequentially on both notify() and notify_all().
Used by MultiBotAdapter to aggregate all transports' notification services.
Multi-Transport Mode¶
MultiBotAdapter (multi.py) wraps multiple transport bots behind a single BotProtocol facade. It is returned by create_bot() when config.is_multi_transport is true.
Construction¶
- Creates a shared
LockPoolandMessageBus. - Iterates
config.transportsand calls_create_single_bot()for each, injecting the shared bus and lock pool. - First bot becomes the primary; the rest are secondaries.
- Builds a
CompositeNotificationServicefrom all bots.
Startup sequence (run())¶
- Registers a startup hook on the primary that sets an
asyncio.Event. - Launches the primary bot as an
asyncio.Task. - Waits for the orchestrator-ready event.
- Injects the primary's orchestrator into all secondary bots.
- Launches secondary bots as tasks.
asyncio.wait(FIRST_COMPLETED)— when any bot finishes, the rest are cancelled.- Returns the exit code from the first completed bot (e.g.
42for restart).
Delegation rules¶
| Method | Delegation |
|---|---|
orchestrator | primary |
config | own _config |
notification_service | CompositeNotificationService |
register_startup_hook | primary |
set_abort_all_callback | all bots |
on_async_interagent_result | all bots |
on_task_result | all bots |
on_task_question | all bots |
file_roots | primary |
shutdown | all bots |
Shared resources¶
All bots in a MultiBotAdapter share:
MessageBus— single instance for cross-transport envelope deliveryLockPool— single instance for per-chat lockingOrchestrator— created by the primary, injected into secondaries
Adding a New Transport¶
-
Create the sub-package
messenger/<name>/with at least a bot module implementingBotProtocol. -
Define capabilities in
capabilities.py:
- Add a factory in
registry.py:
def _create_discord(
config: AgentConfig,
*,
agent_name: str,
bus: MessageBus | None,
lock_pool: LockPool | None,
) -> BotProtocol:
from sygen_bot.messenger.discord.bot import DiscordBot
return DiscordBot(config, agent_name=agent_name,
bus=bus, lock_pool=lock_pool)
_TRANSPORT_FACTORIES["discord"] = _create_discord
-
Implement
NotificationServicefor the transport soCompositeNotificationServicecan include it. -
Add a
MessageBustransport adapter (transport.py) that mapsEnvelopeobjects to the transport's native send API. -
Guard the dependency behind an optional extra in
pyproject.tomland use deferred imports in the factory so the package is not required unless the transport is selected. -
Add config fields to
AgentConfigfor the new transport (credentials, allowed users/rooms, etc.). -
Write tests — mock the transport client and verify the bot satisfies
isinstance(bot, BotProtocol).