DispatchBus
The DispatchBus provides asynchronous inter-agent communication. Agents emit events that other agents process later, enabling loose coupling.
Architecture
┌─────────────┐ emit() ┌──────────────────┐
│ Suasor │ ───────────────►│ _smrt_dispatch │
│ Agent │ │ (pending) │
└─────────────┘ └────────┬─────────┘
│
│ process('Fiscus')
▼
┌─────────────┐ handleDispatch() ┌──────────────────┐
│ Fiscus │ ◄───────────────────│ DispatchBus │
│ Agent │ └──────────────────┘
└─────────────┘ Creating a DispatchBus
import { createDispatchBus } from '@happyvertical/smrt-core';
const bus = await createDispatchBus({
db: { type: 'sqlite', url: 'app.db' }
});
// Or with existing DatabaseInterface
const bus = await createDispatchBus({ db: existingDb }); Emitting Events
const dispatch = await bus.emit(
'campaign.completed', // Signal type
{ campaignId: 'camp-123', revenue: 5000 }, // Payload
{
source: 'Suasor', // Emitting agent
sourceId: 'instance-1', // Optional instance ID
metadata: { // Optional trace metadata
traceId: 'trace-abc123',
environment: 'production'
}
}
); The dispatch is persisted immediately with status: 'pending'.
Subscriptions
Persistent Subscriptions
Survive application restarts:
// Create subscription
await bus.subscribe({
signalType: 'campaign.completed',
subscriber: 'Fiscus',
handler: 'handleCampaignRevenue',
enabled: true
});
// Remove subscription
await bus.unsubscribe('campaign.completed', 'Fiscus');
// List subscriptions
const subs = await bus.listSubscriptions('Fiscus'); In-memory Handlers
For immediate reactions:
// Register handler (fire-and-forget)
bus.on('campaign.completed', async (payload, metadata) => {
console.log(`Campaign ${payload.campaignId} completed`);
await sendSlackNotification(payload);
});
// Unregister
bus.off('campaign.completed', handler); Wildcard Subscriptions
Pattern matching for event families:
| Pattern | Matches | Does NOT Match |
|---|---|---|
campaign.* | campaign.started, campaign.completed | campaign.summer.started |
agent.*.completed | agent.suasor.completed, agent.fiscus.completed | agent.suasor.started |
* | All events | - |
*.*.completed | agent.suasor.completed, task.backup.completed | agent.completed |
Rules:
*matches exactly one segment (anything except.)- Multiple wildcards allowed
- Disabled subscriptions never match
Handler Types
In-memory Handlers
- Called synchronously when emit() is invoked
- Fire-and-forget (errors logged, not propagated)
- Useful for real-time notifications
Persistent Subscriptions
- Stored in database
- Processed when process() is called
- Supports retry on failure
- Survives restarts
// In-memory (immediate)
bus.on('order.placed', async (payload, metadata) => {
await sendConfirmationEmail(payload);
});
// Persistent (processed later)
await bus.subscribe({
signalType: 'order.placed',
subscriber: 'Fulfillment'
});
// Process pending dispatches
await bus.process('Fulfillment', async (payload, metadata) => {
await createShipment(payload);
}); Processing Dispatches
const count = await bus.process(
'Fiscus', // Subscriber name
async (payload, metadata) => { // Handler function
console.log(`Processing ${metadata.type}`);
console.log(`From: ${metadata.source}`);
console.log(`Trace: ${metadata.metadata.traceId}`);
// Process the dispatch...
},
{
limit: 100, // Max dispatches to process
signalTypes: ['campaign.completed'] // Optional filter
}
);
console.log(`Processed ${count} dispatches`); Handler receives:
payload- Original dispatch datametadata- Dispatch metadata (id, type, source, attempts, etc.)
Dispatch Lifecycle
pending ──► processing ──► completed
└──► failed ──► (retry) ──► pending | Status | Description |
|---|---|
pending | Waiting to be processed |
processing | Currently being handled |
completed | Successfully processed |
failed | Handler threw an error |
Retry Mechanism
// Retry failed dispatches
const resetCount = await bus.retry({
maxAttempts: 3, // Only retry if under this limit
signalTypes: ['campaign.*'], // Optional filter
olderThan: new Date('2024-01-01') // Optional age filter
});
console.log(`Reset ${resetCount} dispatches for retry`);
// Then process again
await bus.process('Fiscus', handler); Retry resets status from failed to pending. The attempts counter is preserved.
Cleanup
Delete old dispatches to manage database size:
const result = await bus.cleanup({
completedOlderThanDays: 30,
failedOlderThanDays: 90
});
console.log(`Deleted ${result.completedDeleted} completed`);
console.log(`Deleted ${result.failedDeleted} failed`); Query Dispatches
// List with filters
const pending = await bus.list({
status: 'pending',
source: 'Suasor',
type: 'campaign.completed',
limit: 50,
offset: 0,
orderBy: 'created_at DESC'
});
// Get by ID
const dispatch = await bus.get('dispatch-uuid'); Tracing
Include metadata for observability:
await bus.emit(
'order.placed',
{ orderId: 'ord-123' },
{
source: 'Checkout',
sourceId: 'checkout-instance-1',
metadata: {
traceId: 'trace-abc123',
spanId: 'span-xyz789',
requestId: 'req-456',
userId: 'user-789'
}
}
);
// Access in handler
await bus.process('Billing', async (payload, metadata) => {
logger.info('Processing order', {
dispatchId: metadata.id,
traceId: metadata.metadata.traceId,
source: metadata.source,
attempts: metadata.attempts
});
}); DispatchMetadata Fields
| Field | Type | Description |
|---|---|---|
id | string | Dispatch UUID |
type | string | Signal type |
source | string | Emitting agent name |
sourceId | string | Instance identifier |
createdAt | Date | Creation timestamp |
attempts | number | Processing attempts |
metadata | Record<string, unknown> | Custom trace data |
Internal Tables
_smrt_dispatch
Stores dispatch messages:
| Column | Type | Description |
|---|---|---|
id | TEXT | Unique identifier |
type | TEXT | Signal type (e.g., campaign.completed) |
source | TEXT | Emitting agent name |
source_id | TEXT | Optional instance identifier |
payload | TEXT | JSON-encoded payload |
status | TEXT | pending, processing, completed, failed |
attempts | INTEGER | Number of processing attempts |
last_error | TEXT | Error message from last failure |
processed_by | TEXT | Subscriber that processed |
metadata | TEXT | JSON-encoded trace metadata |
_smrt_dispatch_subscriptions
Stores persistent subscriptions:
| Column | Type | Description |
|---|---|---|
id | TEXT | Unique identifier |
signal_type | TEXT | Pattern to match (supports wildcards) |
subscriber | TEXT | Agent name |
handler | TEXT | Method name to invoke |
enabled | INTEGER | 1 = active, 0 = disabled |
CLI Commands
# List dispatches
smrt dispatch:list
smrt dispatch:list --status pending
smrt dispatch:list --source Suasor
# Process pending
smrt dispatch:process --subscriber Fiscus
# Retry failed
smrt dispatch:retry --max-attempts 3
# Cleanup old
smrt dispatch:cleanup --completed-older-than 30
# Manage subscriptions
smrt dispatch:subscriptions --subscriber Fiscus
smrt dispatch:subscribe --signal-type campaign.* --subscriber Fiscus
smrt dispatch:unsubscribe --signal-type campaign.* --subscriber Fiscus Best Practices
Signal Types
// Good - clear hierarchy
'campaign.completed'
'order.payment.failed'
'agent.fiscus.ready'
// Bad - ambiguous
'done'
'error'
'data' Always Include Source
// Good - traceable
await bus.emit('event', payload, { source: 'Suasor', sourceId: 'instance-1' });
// Bad - unknown origin
await bus.emit('event', payload); Handle Failures Gracefully
await bus.process('Fiscus', async (payload, metadata) => {
try {
await processPayload(payload);
} catch (error) {
// Log but rethrow to mark as failed
console.error(`Failed to process ${metadata.id}:`, error);
throw error;
}
}); Monitor with Wildcards
// Monitor all events for logging/metrics
await bus.subscribe({
signalType: '*',
subscriber: 'MetricsCollector'
}); Schedule Cleanup
// In a scheduled job (daily)
await bus.cleanup({
completedOlderThanDays: 7,
failedOlderThanDays: 30
});