DispatchBus

The DispatchBus provides asynchronous inter-agent communication. Agents emit events that other agents process later, enabling loose coupling.

Architecture

┌─────────────┐     emit()      ┌──────────────────┐
│   Suasor    │ ───────────────►│  _smrt_dispatch  │
│   Agent     │                 │     (pending)    │
└─────────────┘                 └────────┬─────────┘
                                         │
                                         │ process('Fiscus')
                                         ▼
┌─────────────┐   handleDispatch()  ┌──────────────────┐
│   Fiscus    │ ◄───────────────────│  DispatchBus     │
│   Agent     │                     └──────────────────┘
└─────────────┘

Creating a DispatchBus

import { createDispatchBus } from '@happyvertical/smrt-core';

const bus = await createDispatchBus({
  db: { type: 'sqlite', url: 'app.db' }
});

// Or reuse an existing DatabaseInterface instance
const sharedBus = await createDispatchBus({ db: existingDb });

Emitting Events

const dispatch = await bus.emit(
  'campaign.completed',                    // Signal type
  { campaignId: 'camp-123', revenue: 5000 }, // Payload
  {
    source: 'Suasor',                      // Emitting agent
    sourceId: 'instance-1',                // Optional instance ID
    metadata: {                            // Optional trace metadata
      traceId: 'trace-abc123',
      environment: 'production'
    }
  }
);

The dispatch is persisted immediately with status: 'pending'.

Subscriptions

Persistent Subscriptions

Persistent subscriptions are stored in the database and survive application restarts:

// Create subscription
await bus.subscribe({
  signalType: 'campaign.completed',
  subscriber: 'Fiscus',
  handler: 'handleCampaignRevenue',
  enabled: true
});

// Remove subscription
await bus.unsubscribe('campaign.completed', 'Fiscus');

// List subscriptions
const subs = await bus.listSubscriptions('Fiscus');

In-memory Handlers

In-memory handlers react immediately when an event is emitted:

// Keep a reference to the handler so it can be unregistered later
const handler = async (payload, metadata) => {
  console.log(`Campaign ${payload.campaignId} completed`);
  await sendSlackNotification(payload);
};

// Register handler (fire-and-forget)
bus.on('campaign.completed', handler);

// Unregister by passing the same function
bus.off('campaign.completed', handler);

Wildcard Subscriptions

Pattern matching for event families:

Pattern            Matches                                         Does NOT Match
campaign.*         campaign.started, campaign.completed            campaign.summer.started
agent.*.completed  agent.suasor.completed, agent.fiscus.completed  agent.suasor.started
*                  All events                                      -
*.*.completed      agent.suasor.completed, task.backup.completed   agent.completed

Rules:

  • * matches exactly one segment (anything except .); a lone * is the exception and matches all events
  • Multiple wildcards allowed
  • Disabled subscriptions never match
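
The rules above can be sketched as a small matcher. This is an illustration only, not the library's implementation: matchesPattern, subscriptionMatches, and the Subscription shape are hypothetical names, and the special case for a lone * is inferred from the table.

```typescript
// Hypothetical helper, not part of @happyvertical/smrt-core.
// Returns true when `signalType` matches `pattern` under the rules listed above.
function matchesPattern(pattern: string, signalType: string): boolean {
  // A lone "*" is treated as "all events", as in the table above.
  if (pattern === '*') return true;

  const patternSegments = pattern.split('.');
  const typeSegments = signalType.split('.');

  // "*" stands for exactly one segment, so the segment counts must agree.
  if (patternSegments.length !== typeSegments.length) return false;

  return patternSegments.every((segment, i) => {
    const actual = typeSegments[i];
    // A wildcard needs a non-empty segment; anything else must match literally.
    return segment === '*'
      ? actual !== undefined && actual.length > 0
      : segment === actual;
  });
}

// Hypothetical subscription shape used only for this sketch.
interface Subscription {
  signalType: string;
  subscriber: string;
  enabled: boolean;
}

// Disabled subscriptions never match, whatever their pattern.
function subscriptionMatches(sub: Subscription, signalType: string): boolean {
  return sub.enabled && matchesPattern(sub.signalType, signalType);
}
```

Comparing segment counts first is what keeps campaign.* from matching campaign.summer.started.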

Handler Types

In-memory Handlers

  • Called synchronously when emit() is invoked
  • Fire-and-forget (errors logged, not propagated)
  • Useful for real-time notifications

Persistent Subscriptions

  • Stored in the database
  • Processed when process() is called
  • Support retry on failure
  • Survive restarts

// In-memory (immediate)
bus.on('order.placed', async (payload, metadata) => {
  await sendConfirmationEmail(payload);
});

// Persistent (processed later)
await bus.subscribe({
  signalType: 'order.placed',
  subscriber: 'Fulfillment'
});

// Process pending dispatches
await bus.process('Fulfillment', async (payload, metadata) => {
  await createShipment(payload);
});
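
To make "fire-and-forget" concrete, here is a minimal in-memory sketch of the behaviour described for in-memory handlers: each handler is invoked during emit, its result is not awaited, and failures are recorded rather than thrown. LocalHandlers and loggedErrors are hypothetical names for illustration; the real bus may log and dispatch differently.

```typescript
// Hypothetical stand-in for the in-memory side of the bus; not the library's code.
type LocalHandler = (payload: unknown) => void | Promise<void>;

class LocalHandlers {
  private handlers = new Map<string, LocalHandler[]>();

  // Errors are collected here instead of being written to a logger,
  // so the behaviour is easy to inspect.
  readonly loggedErrors: unknown[] = [];

  on(type: string, handler: LocalHandler): void {
    const list = this.handlers.get(type) ?? [];
    list.push(handler);
    this.handlers.set(type, list);
  }

  // Removal compares by reference, so pass the function that was registered.
  off(type: string, handler: LocalHandler): void {
    const list = this.handlers.get(type) ?? [];
    this.handlers.set(type, list.filter((h) => h !== handler));
  }

  // Invokes every handler without awaiting it; returns how many were invoked.
  emit(type: string, payload: unknown): number {
    const list = this.handlers.get(type) ?? [];
    list.forEach((handler) => {
      try {
        // Promise.resolve also covers handlers that return nothing.
        Promise.resolve(handler(payload)).catch((err) => {
          this.loggedErrors.push(err); // rejected async handler
        });
      } catch (err) {
        this.loggedErrors.push(err); // synchronous throw
      }
    });
    return list.length;
  }
}
```

Because emit never rethrows, a failing notification handler cannot block the emitting agent. Work that must not be lost belongs in a persistent subscription instead.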

Processing Dispatches

const count = await bus.process(
  'Fiscus',                              // Subscriber name
  async (payload, metadata) => {         // Handler function
    console.log(`Processing ${metadata.type}`);
    console.log(`From: ${metadata.source}`);
    console.log(`Trace: ${metadata.metadata.traceId}`);

    // Process the dispatch...
  },
  {
    limit: 100,                          // Max dispatches to process
    signalTypes: ['campaign.completed']  // Optional filter
  }
);

console.log(`Processed ${count} dispatches`);

Handler receives:

  • payload - Original dispatch data
  • metadata - Dispatch metadata (id, type, source, attempts, etc.)
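
The handler contract above can be modelled without a database. The following is a minimal in-memory sketch, not the library's implementation: DispatchRecord and processPending are hypothetical names, and two behaviours are assumptions made for illustration (attempts is incremented when processing starts, and the return value counts only dispatches that completed).

```typescript
// Hypothetical in-memory model of a dispatch; field names follow this document.
interface DispatchRecord {
  id: string;
  type: string;
  source: string;
  payload: unknown;
  status: 'pending' | 'processing' | 'completed' | 'failed';
  attempts: number;
  lastError?: string;
  processedBy?: string;
}

type DispatchHandler = (
  payload: unknown,
  metadata: { id: string; type: string; source: string; attempts: number }
) => Promise<void>;

// Runs `handler` over pending records, oldest first in array order.
async function processPending(
  records: DispatchRecord[],
  subscriber: string,
  handler: DispatchHandler,
  limit = 100
): Promise<number> {
  const batch = records.filter((r) => r.status === 'pending').slice(0, limit);
  let completed = 0;

  for (const record of batch) {
    record.status = 'processing';
    record.attempts += 1; // assumption: counted when processing starts
    record.processedBy = subscriber;
    try {
      await handler(record.payload, {
        id: record.id,
        type: record.type,
        source: record.source,
        attempts: record.attempts,
      });
      record.status = 'completed';
      completed += 1;
    } catch (err) {
      // A thrown error marks the dispatch as failed and keeps the message.
      record.status = 'failed';
      record.lastError = err instanceof Error ? err.message : String(err);
    }
  }
  return completed;
}
```

One failing dispatch does not stop the batch: the error is stored on that record and the loop moves on to the next one.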

Dispatch Lifecycle

pending ──► processing ──► completed
                     └──► failed ──► (retry) ──► pending

Status      Description
pending     Waiting to be processed
processing  Currently being handled
completed   Successfully processed
failed      Handler threw an error
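
The lifecycle can also be read as a transition table. The sketch below encodes only the arrows in the diagram above; NEXT_STATUSES, canTransition, and transition are hypothetical names, and the library may enforce these rules differently or not at all.

```typescript
type DispatchStatus = 'pending' | 'processing' | 'completed' | 'failed';

// Allowed next statuses, read off the lifecycle diagram.
const NEXT_STATUSES: Record<DispatchStatus, DispatchStatus[]> = {
  pending: ['processing'],
  processing: ['completed', 'failed'],
  completed: [],       // terminal
  failed: ['pending'], // only via retry
};

function canTransition(from: DispatchStatus, to: DispatchStatus): boolean {
  return NEXT_STATUSES[from].indexOf(to) !== -1;
}

// Returns the new status, or throws if the diagram has no such arrow.
function transition(from: DispatchStatus, to: DispatchStatus): DispatchStatus {
  if (!canTransition(from, to)) {
    throw new Error(`Illegal dispatch transition: ${from} -> ${to}`);
  }
  return to;
}
```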

Retry Mechanism

// Retry failed dispatches
const resetCount = await bus.retry({
  maxAttempts: 3,                        // Only retry if under this limit
  signalTypes: ['campaign.*'],           // Optional filter
  olderThan: new Date('2024-01-01')      // Optional age filter
});

console.log(`Reset ${resetCount} dispatches for retry`);

// Then process again
await bus.process('Fiscus', handler);

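retry() resets the status of matching dispatches from failed to pending. The attempts counter is preserved, so a dispatch that keeps failing eventually reaches maxAttempts and is no longer retried.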
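
A minimal in-memory sketch of that selection logic follows. It is not the library's code: RetryCandidate and resetForRetry are hypothetical names, olderThan is assumed to compare against the creation time, and the signalTypes filter is left out for brevity.

```typescript
// Hypothetical record shape for this sketch.
interface RetryCandidate {
  id: string;
  status: 'pending' | 'processing' | 'completed' | 'failed';
  attempts: number;
  createdAt: Date;
}

interface RetryOptions {
  maxAttempts: number;
  olderThan?: Date; // assumption: compared against createdAt
}

// Resets eligible failed records to pending and returns how many were reset.
function resetForRetry(records: RetryCandidate[], options: RetryOptions): number {
  let reset = 0;
  for (const record of records) {
    if (record.status !== 'failed') continue;
    // Only retry dispatches that are still under the attempt limit.
    if (record.attempts >= options.maxAttempts) continue;
    if (
      options.olderThan &&
      record.createdAt.getTime() >= options.olderThan.getTime()
    ) {
      continue;
    }
    record.status = 'pending'; // attempts is deliberately left untouched
    reset += 1;
  }
  return reset;
}
```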

Cleanup

Delete old dispatches to manage database size:

const result = await bus.cleanup({
  completedOlderThanDays: 30,
  failedOlderThanDays: 90
});

console.log(`Deleted ${result.completedDeleted} completed`);
console.log(`Deleted ${result.failedDeleted} failed`);
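
The two thresholds act independently. As a rough sketch under stated assumptions (age measured from a createdAt timestamp, pending and processing dispatches never deleted), the selection could look like the following. StoredDispatch and cleanupDispatches are hypothetical names, not library API.

```typescript
// Hypothetical record shape for this sketch.
interface StoredDispatch {
  id: string;
  status: 'pending' | 'processing' | 'completed' | 'failed';
  createdAt: Date; // assumption: age is measured from creation time
}

interface CleanupOptions {
  completedOlderThanDays: number;
  failedOlderThanDays: number;
}

interface CleanupResult {
  completedDeleted: number;
  failedDeleted: number;
}

const DAY_MS = 24 * 60 * 60 * 1000;

// Returns the surviving records plus per-status deletion counts.
function cleanupDispatches(
  records: StoredDispatch[],
  options: CleanupOptions,
  now: Date = new Date()
): { kept: StoredDispatch[]; result: CleanupResult } {
  const completedCutoff = now.getTime() - options.completedOlderThanDays * DAY_MS;
  const failedCutoff = now.getTime() - options.failedOlderThanDays * DAY_MS;
  const result: CleanupResult = { completedDeleted: 0, failedDeleted: 0 };

  const kept = records.filter((record) => {
    const created = record.createdAt.getTime();
    if (record.status === 'completed' && created < completedCutoff) {
      result.completedDeleted += 1;
      return false;
    }
    if (record.status === 'failed' && created < failedCutoff) {
      result.failedDeleted += 1;
      return false;
    }
    return true; // pending and processing dispatches are kept
  });

  return { kept, result };
}
```

Keeping failed dispatches longer than completed ones, as in the example above, leaves more time to investigate errors before the evidence is deleted.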

Query Dispatches

// List with filters
const pending = await bus.list({
  status: 'pending',
  source: 'Suasor',
  type: 'campaign.completed',
  limit: 50,
  offset: 0,
  orderBy: 'created_at DESC'
});

// Get by ID
const dispatch = await bus.get('dispatch-uuid');

Tracing

Include metadata for observability:

await bus.emit(
  'order.placed',
  { orderId: 'ord-123' },
  {
    source: 'Checkout',
    sourceId: 'checkout-instance-1',
    metadata: {
      traceId: 'trace-abc123',
      spanId: 'span-xyz789',
      requestId: 'req-456',
      userId: 'user-789'
    }
  }
);

// Access in handler
await bus.process('Billing', async (payload, metadata) => {
  logger.info('Processing order', {
    dispatchId: metadata.id,
    traceId: metadata.metadata.traceId,
    source: metadata.source,
    attempts: metadata.attempts
  });
});

DispatchMetadata Fields

Field      Type                     Description
id         string                   Dispatch UUID
type       string                   Signal type
source     string                   Emitting agent name
sourceId   string                   Instance identifier
createdAt  Date                     Creation timestamp
attempts   number                   Processing attempts
metadata   Record<string, unknown>  Custom trace data

Internal Tables

_smrt_dispatch

Stores dispatch messages:

Column        Type     Description
id            TEXT     Unique identifier
type          TEXT     Signal type (e.g., campaign.completed)
source        TEXT     Emitting agent name
source_id     TEXT     Optional instance identifier
payload       TEXT     JSON-encoded payload
status        TEXT     pending, processing, completed, failed
attempts      INTEGER  Number of processing attempts
last_error    TEXT     Error message from last failure
processed_by  TEXT     Subscriber that processed
metadata      TEXT     JSON-encoded trace metadata
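
Because payload and metadata are stored as JSON text, a raw row has to be decoded before use. The sketch below shows one way to do that. DispatchRow, DecodedDispatch, and decodeRow are hypothetical names, and the nullability of the optional columns is an assumption; only the column names come from the table above.

```typescript
// Shape of a raw row, using the column names listed above.
// Which columns may be NULL is an assumption for this sketch.
interface DispatchRow {
  id: string;
  type: string;
  source: string;
  source_id: string | null;
  payload: string;
  status: string;
  attempts: number;
  last_error: string | null;
  processed_by: string | null;
  metadata: string | null;
}

// Hypothetical decoded form with camelCase names and parsed JSON.
interface DecodedDispatch {
  id: string;
  type: string;
  source: string;
  sourceId?: string;
  payload: unknown;
  status: string;
  attempts: number;
  lastError?: string;
  processedBy?: string;
  metadata: Record<string, unknown>;
}

function decodeRow(row: DispatchRow): DecodedDispatch {
  return {
    id: row.id,
    type: row.type,
    source: row.source,
    sourceId: row.source_id ?? undefined,
    payload: JSON.parse(row.payload),
    status: row.status,
    attempts: row.attempts,
    lastError: row.last_error ?? undefined,
    processedBy: row.processed_by ?? undefined,
    // Missing trace metadata decodes to an empty object.
    metadata: row.metadata ? JSON.parse(row.metadata) : {},
  };
}
```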

_smrt_dispatch_subscriptions

Stores persistent subscriptions:

Column       Type     Description
id           TEXT     Unique identifier
signal_type  TEXT     Pattern to match (supports wildcards)
subscriber   TEXT     Agent name
handler      TEXT     Method name to invoke
enabled      INTEGER  1 = active, 0 = disabled

CLI Commands

# List dispatches
smrt dispatch:list
smrt dispatch:list --status pending
smrt dispatch:list --source Suasor

# Process pending
smrt dispatch:process --subscriber Fiscus

# Retry failed
smrt dispatch:retry --max-attempts 3

# Cleanup old
smrt dispatch:cleanup --completed-older-than 30

# Manage subscriptions
smrt dispatch:subscriptions --subscriber Fiscus
smrt dispatch:subscribe --signal-type 'campaign.*' --subscriber Fiscus
smrt dispatch:unsubscribe --signal-type 'campaign.*' --subscriber Fiscus

Best Practices

Signal Types

// Good - clear hierarchy
'campaign.completed'
'order.payment.failed'
'agent.fiscus.ready'

// Bad - ambiguous
'done'
'error'
'data'
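
A naming convention like this can be checked before emitting. The rule below (lowercase, dot-separated, at least two segments) is one possible reading of the examples above, not something the library is known to enforce. isWellFormedSignalType is a hypothetical helper.

```typescript
// Assumed convention: lowercase segments separated by dots, at least two segments.
// Wildcards are rejected because they belong in subscription patterns, not in emitted types.
const SIGNAL_TYPE = /^[a-z][a-z0-9_-]*(\.[a-z][a-z0-9_-]*)+$/;

function isWellFormedSignalType(type: string): boolean {
  return SIGNAL_TYPE.test(type);
}
```

A check like this could run in tests or in a thin wrapper around emit, so that ambiguous names such as 'done' are caught early.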

Always Include Source

// Good - traceable
await bus.emit('event', payload, { source: 'Suasor', sourceId: 'instance-1' });

// Bad - unknown origin
await bus.emit('event', payload);

Handle Failures Gracefully

await bus.process('Fiscus', async (payload, metadata) => {
  try {
    await processPayload(payload);
  } catch (error) {
    // Log but rethrow to mark as failed
    console.error(`Failed to process ${metadata.id}:`, error);
    throw error;
  }
});

Monitor with Wildcards

// Monitor all events for logging/metrics
await bus.subscribe({
  signalType: '*',
  subscriber: 'MetricsCollector'
});

Schedule Cleanup

// In a scheduled job (daily)
await bus.cleanup({
  completedOlderThanDays: 7,
  failedOlderThanDays: 30
});