Advanced Patterns

Table of contents

  1. Command Bus & CQRS (v0.2.0) ✅
    1. Command Definition
    2. Command Handler (Type-Safe Generics)
    3. AggregateHandler (Combined Load/Save)
    4. Command Bus
    5. Command Middleware Pipeline
    6. Built-in Middleware Reference
  2. Idempotency (v0.2.0) ✅
    1. Idempotency Store Interface
    2. In-Memory Store (Testing)
    3. PostgreSQL Store (Production)
    4. Idempotency Key Generation
    5. Idempotency Configuration
    6. Idempotency Flow
    7. PostgreSQL Schema
  3. Correlation & Causation Tracking (v0.2.0) ✅
    1. Correlation ID
    2. Causation ID
    3. Metadata Propagation
  4. Saga / Process Manager (v0.5.0) ✅
    1. Core Interfaces
    2. SagaBase - Default Implementation
    3. Example: Order Fulfillment Saga
    4. Saga Manager
    5. Usage Example
    6. Saga Store
    7. PostgreSQL Schema
    8. Testing Sagas
  5. Outbox Pattern
    1. Outbox Design
    2. Outbox Processor
    3. Integration with Event Store

Command Bus & CQRS (v0.2.0) ✅

Complete CQRS pattern implementation with first-class Command support.

Command Definition

// Command represents intent to change state.
// Every command names its own type and validates its own payload.
type Command interface {
    // CommandType returns the command's type name (e.g. "CreateOrder").
    CommandType() string
    // Validate returns a non-nil error when the command is not valid.
    Validate() error
}

// CommandBase holds the identifiers and metadata shared by commands.
// Embed it in your own command types to inherit the accessors below.
type CommandBase struct {
    id            string
    correlationID string
    causationID   string
    tenantID      string
    metadata      map[string]string
}

// Getters. GetMetadata returns the stored map itself, not a copy.
func (c *CommandBase) GetID() string                  { return c.id }
func (c *CommandBase) GetCorrelationID() string       { return c.correlationID }
func (c *CommandBase) GetCausationID() string         { return c.causationID }
func (c *CommandBase) GetTenantID() string            { return c.tenantID }
func (c *CommandBase) GetMetadata() map[string]string { return c.metadata }

// Setters. SetMetadata stores the given map without copying it.
func (c *CommandBase) SetID(id string)                 { c.id = id }
func (c *CommandBase) SetCorrelationID(id string)      { c.correlationID = id }
func (c *CommandBase) SetCausationID(id string)        { c.causationID = id }
func (c *CommandBase) SetTenantID(id string)           { c.tenantID = id }
func (c *CommandBase) SetMetadata(m map[string]string) { c.metadata = m }

// CreateOrder is an example command: it embeds CommandBase and validates
// its own payload.
type CreateOrder struct {
    mink.CommandBase
    CustomerID string `json:"customerId"`
    Items      []Item `json:"items"`
}

// CommandType names this command.
func (c CreateOrder) CommandType() string {
    return "CreateOrder"
}

// Validate rejects an order that has no customer.
func (c CreateOrder) Validate() error {
    if c.CustomerID != "" {
        return nil
    }
    return mink.NewValidationError("CreateOrder", "CustomerID", "required")
}

Command Handler (Type-Safe Generics)

// GenericHandler provides type-safe command handling
handler := mink.NewGenericHandler[CreateOrder](func(ctx context.Context, cmd CreateOrder) (mink.CommandResult, error) {
    // Create aggregate and apply command
    order := NewOrder(uuid.NewString())
    if err := order.Create(cmd.CustomerID, cmd.Items); err != nil {
        return mink.CommandResult{}, err
    }
    
    // Persist using event store
    if err := store.SaveAggregate(ctx, order); err != nil {
        return mink.CommandResult{}, err
    }
    
    return mink.NewSuccessResult(order.AggregateID(), order.Version()), nil
})

// Or use a struct-based handler implementing the CommandHandler interface.
type CreateOrderHandler struct {
    store *mink.EventStore
}

// CommandType reports the command type this handler handles.
func (h *CreateOrderHandler) CommandType() string { return "CreateOrder" }

// Handle processes a CreateOrder command.
// It returns an error, rather than panicking, when cmd is not a CreateOrder.
func (h *CreateOrderHandler) Handle(ctx context.Context, cmd mink.Command) (mink.CommandResult, error) {
    // Checked assertion: an unchecked cmd.(CreateOrder) panics on any other type.
    c, ok := cmd.(CreateOrder)
    if !ok {
        return mink.CommandResult{}, fmt.Errorf("CreateOrderHandler: unexpected command type %T", cmd)
    }
    _ = c // ... implementation (uses c.CustomerID, c.Items)
    return mink.NewSuccessResult("order-123", 1), nil
}

AggregateHandler (Combined Load/Save)

// AggregateHandler automatically loads/saves aggregates.
// It is built from the store plus three functions: an aggregate-ID
// extractor, an aggregate factory, and the command logic itself.
handler := mink.NewAggregateHandler[CreateOrder, *Order](
    store,
    func(cmd CreateOrder) string {
        // NOTE(review): the CreateOrder examples on this page declare no
        // OrderID field — confirm the command definition.
        return cmd.OrderID // Aggregate ID from command
    },
    func() *Order {
        return NewOrder("") // Aggregate factory
    },
    func(ctx context.Context, agg *Order, cmd CreateOrder) error {
        // Command logic only; no explicit load or save appears here.
        return agg.Create(cmd.CustomerID, cmd.Items)
    },
)

Command Bus

// Create command bus
bus := mink.NewCommandBus()

// Register handlers
bus.Register(handler)                       // Struct handler
bus.RegisterFunc("CreateOrder", handleFunc) // Function handler

// Dispatch command
result, err := bus.Dispatch(ctx, CreateOrder{
    CustomerID: "cust-123",
    Items:      []Item{ /* order lines */ },
})
if err != nil {
    log.Fatal(err)
}
log.Printf("order %s is now at version %d", result.AggregateID, result.Version)

// CommandResult contains the results of executing a command.
type CommandResult struct {
    // AggregateID is the ID of the affected aggregate.
    AggregateID string
    // Version is the aggregate's new version after the command.
    Version int64
    // Events holds the events produced.
    Events []any
    // Data is optional return data.
    Data any
}

// Helper constructor: takes the aggregate ID and its new version
result := mink.NewSuccessResult("order-123", 1)

Command Middleware Pipeline

bus := mink.NewCommandBus()

// Add middleware (executed in order)
bus.Use(mink.ValidationMiddleware())           // Validate commands
bus.Use(mink.RecoveryMiddleware())             // Panic recovery
bus.Use(mink.LoggingMiddleware(logger))        // Log commands
bus.Use(mink.MetricsMiddleware(metrics))       // Record metrics
bus.Use(mink.TimeoutMiddleware(5*time.Second)) // Timeout
bus.Use(mink.RetryMiddleware(3, time.Second))  // Retry on failure
bus.Use(mink.CorrelationIDMiddleware(nil))     // Auto-generate correlation ID
bus.Use(mink.CausationIDMiddleware())          // Track causation chain
bus.Use(mink.TenantMiddleware(func(ctx context.Context) string {
    // Multi-tenancy. The checked assertion yields "" instead of panicking
    // when the context carries no tenant ID (or a non-string value).
    tenantID, _ := ctx.Value("tenantID").(string)
    return tenantID
}))

Built-in Middleware Reference

Middleware | Description
ValidationMiddleware() | Calls cmd.Validate() before handling
RecoveryMiddleware() | Catches panics and returns PanicError
LoggingMiddleware(logger) | Logs command start/end with timing
MetricsMiddleware(metrics) | Records command count, duration, errors
TimeoutMiddleware(duration) | Adds context timeout
RetryMiddleware(attempts, delay) | Retries on transient failures
CorrelationIDMiddleware(generator) | Sets/generates correlation ID
CausationIDMiddleware() | Tracks event causation chain
TenantMiddleware(resolver) | Sets tenant ID from context
IdempotencyMiddleware(config) | Prevents duplicate processing

Idempotency (v0.2.0) ✅

Prevent duplicate command processing - essential for reliability.

Idempotency Store Interface

// IdempotencyStore tracks processed commands by idempotency key.
type IdempotencyStore interface {
    // Store records a command execution result under key.
    // expiration is presumably how long the entry stays valid — confirm.
    Store(ctx context.Context, key string, response []byte, expiration time.Duration) error
    
    // Get retrieves a previously stored result for key.
    // NOTE(review): the result for an unknown key is not specified here.
    Get(ctx context.Context, key string) ([]byte, error)
    
    // Close releases resources held by the store.
    Close() error
}

In-Memory Store (Testing)

import "github.com/AshkanYarmoradi/go-mink/adapters/memory"

// In-memory idempotency store; closed when the surrounding function returns
store := memory.NewIdempotencyStore()
defer store.Close()

// Use with middleware, wrapped in the default idempotency configuration
bus.Use(mink.IdempotencyMiddleware(mink.DefaultIdempotencyConfig(store)))

PostgreSQL Store (Production)

import "github.com/AshkanYarmoradi/go-mink/adapters/postgres"

// Open the PostgreSQL-backed store; abort on failure
store, err := postgres.NewIdempotencyStore(connStr)
if err != nil {
    log.Fatal(err)
}
defer store.Close()

// Initialize schema (run once); creates the mink_idempotency table
if err := store.Initialize(ctx); err != nil {
    log.Fatal(err)
}

// Use with middleware, wrapped in the default idempotency configuration
bus.Use(mink.IdempotencyMiddleware(mink.DefaultIdempotencyConfig(store)))

Idempotency Key Generation

// Commands can implement IdempotentCommand for custom keys.
// It extends Command with a caller-controlled idempotency key.
type IdempotentCommand interface {
    Command
    // IdempotencyKey returns the key identifying this command for
    // duplicate detection.
    IdempotencyKey() string
}

// Example with an explicit idempotency key
type CreateOrder struct {
    mink.CommandBase
    ClientRequestID string `json:"clientRequestId"` // Client-provided
    CustomerID      string `json:"customerId"`
}

// IdempotencyKey returns the client-provided request ID. An empty string
// (no ClientRequestID) means: fall back to auto-generation.
func (c CreateOrder) IdempotencyKey() string {
    return c.ClientRequestID
}

// Auto-generated keys use SHA256 hash of command content
// Format: "CreateOrder:<hash>" or "CreateOrder:type-only:<hash>" for fallback

Idempotency Configuration

// IdempotencyConfig controls middleware behavior.
// Only Store is required; the remaining fields are optional.
type IdempotencyConfig struct {
    Store       IdempotencyStore        // Required: storage backend
    KeyFunc     func(Command) string    // Optional: custom key generator
    TTL         time.Duration           // Optional: result expiration (default: 24h)
    Serializer  func(CommandResult) []byte  // Optional: result serializer (output presumably becomes the stored response — confirm)
}

// Use default configuration
config := mink.DefaultIdempotencyConfig(store)
bus.Use(mink.IdempotencyMiddleware(config))

// Or customize (an alternative to the default above; it uses its own
// variable name so both variants compile in the same scope)
customConfig := mink.IdempotencyConfig{
    Store: store,
    TTL:   1 * time.Hour,
    KeyFunc: func(cmd mink.Command) string {
        // Custom key generation
        return fmt.Sprintf("%s:%s", cmd.CommandType(), extractKey(cmd))
    },
}
bus.Use(mink.IdempotencyMiddleware(customConfig))

Idempotency Flow

1. Command arrives
2. IdempotencyMiddleware generates key
3. Check if key exists in store
   ├─ EXISTS: Return cached result (skip handler)
   └─ NOT EXISTS: Continue to handler
4. Handler processes command
5. Store result with key and TTL
6. Return result to caller

PostgreSQL Schema

-- Created by store.Initialize(ctx)
-- One row per idempotency key.
CREATE TABLE IF NOT EXISTS mink_idempotency (
    -- Idempotency key of the command
    key VARCHAR(255) PRIMARY KEY,
    -- Stored command result
    response JSONB NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- Presumably creation time plus the configured TTL; confirm
    expires_at TIMESTAMPTZ NOT NULL
);

-- Index on the expiry column (presumably for purging expired keys; confirm)
CREATE INDEX IF NOT EXISTS idx_mink_idempotency_expires_at 
    ON mink_idempotency(expires_at);

Correlation & Causation Tracking (v0.2.0) ✅

Track request flow through distributed systems.

Correlation ID

Links all events/commands from a single external request.

// Middleware auto-generates or propagates correlation ID.
// Passing nil selects the built-in UUID generator.
bus.Use(mink.CorrelationIDMiddleware(nil)) // Uses UUID generator

// Or provide custom generator: any func() string works
bus.Use(mink.CorrelationIDMiddleware(func() string {
    return fmt.Sprintf("req-%d", time.Now().UnixNano())
}))

// Access in handler
func (h *Handler) Handle(ctx context.Context, cmd mink.Command) (mink.CommandResult, error) {
    // Checked assertion: a command whose method set lacks GetCorrelationID
    // yields an empty ID instead of a panic.
    var correlationID string
    if c, ok := cmd.(interface{ GetCorrelationID() string }); ok {
        correlationID = c.GetCorrelationID()
    }

    // Use for logging, tracing, etc.
    log.Printf("handling %s (correlation ID %q)", cmd.CommandType(), correlationID)

    // ... implementation; placeholder result
    return mink.CommandResult{}, nil
}

Causation ID

Links events to the command/event that caused them.

// Middleware automatically sets causation ID on dispatched commands
bus.Use(mink.CausationIDMiddleware())

// Access in handler
func (h *Handler) Handle(ctx context.Context, cmd mink.Command) (mink.CommandResult, error) {
    // Checked assertion: a command whose method set lacks GetCausationID
    // yields an empty ID instead of a panic.
    var causationID string
    if c, ok := cmd.(interface{ GetCausationID() string }); ok {
        causationID = c.GetCausationID()
    }

    // The causation ID is the ID of the previous command in the chain
    log.Printf("handling %s (caused by %q)", cmd.CommandType(), causationID)

    // ... implementation; placeholder result
    return mink.CommandResult{}, nil
}

Metadata Propagation

// Commands carry metadata through the system.
// Embedding CommandBase supplies the setters used below.
type CreateOrder struct {
    mink.CommandBase
    CustomerID string
}

// Set metadata before dispatch
cmd := CreateOrder{CustomerID: "cust-123"}
cmd.SetCorrelationID("external-request-id") // reuse the caller's request ID
cmd.SetMetadata(map[string]string{
    "source": "web-api",
    "version": "v2",
})

// Dispatch the command together with the metadata set above
result, err := bus.Dispatch(ctx, cmd)

Saga / Process Manager (v0.5.0) ✅

Orchestrate long-running business processes across aggregates. Sagas coordinate operations that span multiple aggregates or external services, providing compensation (rollback) when things go wrong.

Core Interfaces

// Saga is the contract every saga implementation fulfils. A saga
// coordinates a long-running business process across multiple aggregates.
type Saga interface {
    // --- Identity ---

    // SagaID is the unique identifier of this saga instance.
    SagaID() string
    // SagaType is the type of this saga (e.g., "OrderFulfillment").
    SagaType() string

    // --- Correlation ---

    // CorrelationID is the correlation ID for this saga.
    CorrelationID() string
    // SetCorrelationID replaces the correlation ID.
    SetCorrelationID(id string)

    // --- Progress ---

    // Status is the current status of the saga.
    Status() SagaStatus
    // SetStatus replaces the saga status.
    SetStatus(status SagaStatus)
    // CurrentStep is the current step number (0-based).
    CurrentStep() int
    // SetCurrentStep replaces the current step number.
    SetCurrentStep(step int)
    // IsComplete reports whether the saga has completed successfully.
    IsComplete() bool

    // --- Event handling ---

    // HandledEvents lists the event types this saga handles.
    HandledEvents() []string
    // HandleEvent processes an event and returns commands to dispatch.
    HandleEvent(ctx context.Context, event StoredEvent) ([]Command, error)
    // Compensate is called when the saga needs to rollback.
    Compensate(ctx context.Context, failedStep int, failureReason error) ([]Command, error)

    // --- Timing ---

    // StartedAt is when the saga started.
    StartedAt() time.Time
    // CompletedAt is when the saga completed (nil if not completed).
    CompletedAt() *time.Time

    // --- Persisted state ---

    // Data exposes the saga's internal state as a map.
    Data() map[string]any
    // SetData restores the saga's internal state from a map.
    SetData(data map[string]any)
    // Version is the saga version used for optimistic concurrency.
    Version() int64
}

// SagaStatus is the current status of a saga. Values are numbered in
// declaration order, starting at zero.
type SagaStatus int

const (
    // SagaStatusStarted: saga has started.
    SagaStatusStarted SagaStatus = iota
    // SagaStatusRunning: actively processing.
    SagaStatusRunning
    // SagaStatusCompleted: completed successfully.
    SagaStatusCompleted
    // SagaStatusFailed: failed without compensation.
    SagaStatusFailed
    // SagaStatusCompensating: executing compensation.
    SagaStatusCompensating
    // SagaStatusCompensated: compensated after failure.
    SagaStatusCompensated
)

// SagaState represents the persisted state of a saga.
// It is the value exchanged with a SagaStore; the JSON tags define its
// serialized field names.
type SagaState struct {
    ID              string                 `json:"id"`
    Type            string                 `json:"type"`
    CorrelationID   string                 `json:"correlationId,omitempty"`
    Status          SagaStatus             `json:"status"`
    CurrentStep     int                    `json:"currentStep"`
    Data            map[string]interface{} `json:"data,omitempty"`
    ProcessedEvents []string               `json:"processedEvents,omitempty"` // For idempotency
    Steps           []SagaStep             `json:"steps,omitempty"`
    StartedAt       time.Time              `json:"startedAt"`
    UpdatedAt       time.Time              `json:"updatedAt"`
    CompletedAt     *time.Time             `json:"completedAt,omitempty"` // nil until set
    FailureReason   string                 `json:"failureReason,omitempty"`
    Version         int64                  `json:"version"`
}

// SagaStore defines the interface for saga persistence.
type SagaStore interface {
    // Save persists the given saga state.
    Save(ctx context.Context, state *SagaState) error
    // Load returns the state of the saga with the given ID.
    Load(ctx context.Context, sagaID string) (*SagaState, error)
    // FindByCorrelationID returns the saga state for a correlation ID.
    FindByCorrelationID(ctx context.Context, correlationID string) (*SagaState, error)
    // FindByType returns saga states of the given type; statuses presumably
    // narrows the result to those statuses — confirm.
    FindByType(ctx context.Context, sagaType string, statuses ...SagaStatus) ([]*SagaState, error)
    // Delete removes the saga with the given ID.
    Delete(ctx context.Context, sagaID string) error
    // Close releases resources held by the store.
    Close() error
}

SagaBase - Default Implementation

// Embed SagaBase in your saga types to get default behavior
type OrderFulfillmentSaga struct {
    mink.SagaBase  // Provides ID, Type, Status, Version management
    
    // Your saga-specific state
    OrderID        string
    CustomerID     string
    PaymentDone    bool
    InventoryDone  bool
}

// NewOrderFulfillmentSaga builds a saga with the given ID, registering the
// saga type "OrderFulfillment" on its embedded SagaBase.
func NewOrderFulfillmentSaga(id string) *OrderFulfillmentSaga {
    return &OrderFulfillmentSaga{
        SagaBase: mink.NewSagaBase(id, "OrderFulfillment"),
    }
}

Example: Order Fulfillment Saga

// OrderFulfillmentSaga tracks one order through payment, inventory
// reservation, and shipment.
type OrderFulfillmentSaga struct {
    mink.SagaBase
    
    // Saga state (copied from the OrderCreated event)
    OrderID         string
    CustomerID      string
    Items           []Item
    TotalAmount     float64
    
    // Step tracking (each flag is set when the matching event arrives)
    PaymentReceived    bool
    InventoryReserved  bool
    ShipmentCreated    bool
}

// HandledEvents lists the event types this saga reacts to: the starting
// event, then the success/failure pair of each step.
func (s *OrderFulfillmentSaga) HandledEvents() []string {
    handled := []string{"OrderCreated"}
    handled = append(handled, "PaymentReceived", "PaymentFailed")
    handled = append(handled, "InventoryReserved", "InventoryReservationFailed")
    handled = append(handled, "ShipmentCreated", "ShipmentFailed")
    return handled
}

// HandleEvent advances the saga for one event and returns the commands to
// dispatch next. Success events record progress and emit the next step's
// command; failure events return a SagaFailedError carrying the failed step.
// Events of any other type return (nil, nil).
//
// NOTE(review): the Saga interface declares
// HandleEvent(ctx, event StoredEvent), while this example takes Event —
// confirm which type is intended.
func (s *OrderFulfillmentSaga) HandleEvent(ctx context.Context, event Event) ([]Command, error) {
    switch e := event.Data.(type) {
    
    case OrderCreated:
        // Step 1: Request payment
        s.OrderID = e.OrderID
        s.CustomerID = e.CustomerID
        s.Items = e.Items
        s.TotalAmount = e.TotalAmount
        return []Command{
            RequestPayment{
                OrderID:    s.OrderID,
                CustomerID: s.CustomerID,
                Amount:     s.TotalAmount,
            },
        }, nil
        
    case PaymentReceived:
        // Step 2: Reserve inventory
        s.PaymentReceived = true
        return []Command{
            ReserveInventory{
                OrderID: s.OrderID,
                Items:   s.Items,
            },
        }, nil
        
    case PaymentFailed:
        // Saga failed at step 1 - no compensation needed
        return nil, &SagaFailedError{Step: 1, Reason: e.Reason}
        
    case InventoryReserved:
        // Step 3: Create shipment
        s.InventoryReserved = true
        return []Command{
            CreateShipment{
                OrderID: s.OrderID,
                Items:   s.Items,
            },
        }, nil
        
    case InventoryReservationFailed:
        // Saga failed at step 2 - compensate payment
        return nil, &SagaFailedError{Step: 2, Reason: e.Reason}
        
    case ShipmentCreated:
        // Saga complete!
        s.ShipmentCreated = true
        return []Command{
            CompleteOrder{OrderID: s.OrderID},
        }, nil
        
    case ShipmentFailed:
        // Saga failed at step 3 - compensate inventory and payment
        return nil, &SagaFailedError{Step: 3, Reason: e.Reason}
    }
    
    // Unrecognized event type: nothing to do
    return nil, nil
}

// Compensate builds the rollback commands for a saga that failed at
// failedStep. Completed steps are undone in reverse order (inventory
// release before payment refund), and a FailOrder command is always
// appended last. A nil err produces an empty failure reason.
func (s *OrderFulfillmentSaga) Compensate(ctx context.Context, failedStep int, err error) ([]Command, error) {
    var commands []Command

    // Compensate in reverse order
    if failedStep >= 3 && s.InventoryReserved {
        commands = append(commands, ReleaseInventory{
            OrderID: s.OrderID,
            Items:   s.Items,
        })
    }

    if failedStep >= 2 && s.PaymentReceived {
        commands = append(commands, RefundPayment{
            OrderID:    s.OrderID,
            CustomerID: s.CustomerID,
            Amount:     s.TotalAmount,
        })
    }

    // Guard against a nil error so building the reason cannot panic.
    var reason string
    if err != nil {
        reason = err.Error()
    }
    commands = append(commands, FailOrder{
        OrderID: s.OrderID,
        Reason:  reason,
    })

    return commands, nil
}

// IsComplete reports whether all three steps (payment, inventory
// reservation, shipment) have succeeded.
func (s *OrderFulfillmentSaga) IsComplete() bool {
    if !s.PaymentReceived || !s.InventoryReserved {
        return false
    }
    return s.ShipmentCreated
}

Saga Manager

The SagaManager orchestrates saga lifecycle, event processing, and command dispatch. It provides:

  • Event correlation: Routes events to the correct saga instances
  • Idempotency: Automatically deduplicates events (stored in ProcessedEvents)
  • Concurrency control: Per-saga locking with retry on conflicts
  • Compensation: Triggers rollback commands when sagas fail

// SagaManager orchestrates saga lifecycle: it ties together saga
// persistence, the event subscription, and command dispatch.
type SagaManager struct {
    store      SagaStore                        // Persists saga state
    eventSub   adapters.SubscriptionAdapter     // Event subscription
    commandBus *CommandBus                      // Dispatches commands
    registry   map[string]SagaFactory           // Creates saga instances
    handlers   map[string][]string              // Event -> Saga type mappings
}

// SagaManagerOption configures the manager; options are passed to
// NewSagaManager.
type SagaManagerOption func(*SagaManager)

// NewSagaManager creates a new SagaManager from its store, event
// subscription, and command bus, plus any options.
func NewSagaManager(store SagaStore, eventSub adapters.SubscriptionAdapter, 
    commandBus *CommandBus, opts ...SagaManagerOption) *SagaManager

// Register registers a saga type with the manager, together with the
// factory that creates its instances and its correlation rules.
func (m *SagaManager) Register(sagaType string, factory SagaFactory, correlations ...SagaCorrelation)

// Start begins listening for events
func (m *SagaManager) Start(ctx context.Context) error

// Stop gracefully shuts down the manager
func (m *SagaManager) Stop() error

// Compensate manually triggers compensation for the saga with the given
// ID; reason is the failure to compensate for.
func (m *SagaManager) Compensate(ctx context.Context, sagaID string, reason error) error

// Resume resumes a stalled saga identified by sagaID
func (m *SagaManager) Resume(ctx context.Context, sagaID string) error

Usage Example

// Create saga manager
manager := mink.NewSagaManager(
    sagaStore,
    postgresAdapter, // SubscriptionAdapter
    commandBus,
    mink.WithSagaRetries(3),
    mink.WithSagaTimeout(5*time.Minute),
)

// Register saga types
manager.Register("OrderFulfillment",
    func(id string) mink.Saga { return NewOrderFulfillmentSaga(id) },
    mink.SagaCorrelation{
        SagaType:       "OrderFulfillment",
        StartingEvents: []string{"OrderCreated"},
        CorrelationIDFunc: func(e mink.StoredEvent) string {
            return e.StreamID // Correlate by order stream
        },
    },
)

// Start processing events in the background. Start returns an error, so
// log it instead of silently dropping it.
go func() {
    if err := manager.Start(ctx); err != nil {
        log.Printf("saga manager stopped: %v", err)
    }
}()
defer manager.Stop()

Saga Store

// PostgreSQL implementation; the options select the schema and table name
sagaStore := postgres.NewSagaStore(db,
    postgres.WithSagaSchema("myapp"),
    postgres.WithSagaTable("sagas"),
)

// Initialize schema; abort on failure
if err := sagaStore.Initialize(ctx); err != nil {
    log.Fatal(err)
}

// In-memory for testing
testStore := memory.NewSagaStore()

PostgreSQL Schema

-- Saga state storage: one row per saga instance.
-- status holds the SagaStatus integer in declaration order:
--   0 Started, 1 Running, 2 Completed, 3 Failed, 4 Compensating, 5 Compensated
CREATE TABLE mink_sagas (
    id VARCHAR(255) PRIMARY KEY,
    type VARCHAR(255) NOT NULL,
    correlation_id VARCHAR(255),
    status INT NOT NULL DEFAULT 0,
    current_step INT NOT NULL DEFAULT 0,
    data JSONB NOT NULL DEFAULT '{}',
    processed_events JSONB NOT NULL DEFAULT '[]',  -- For idempotency
    steps JSONB NOT NULL DEFAULT '[]',
    failure_reason TEXT,
    started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    completed_at TIMESTAMPTZ,                      -- NULL until set
    version BIGINT NOT NULL DEFAULT 0
);

-- Indexes for efficient querying
CREATE INDEX idx_sagas_correlation ON mink_sagas(correlation_id);
CREATE INDEX idx_sagas_type_status ON mink_sagas(type, status);
CREATE INDEX idx_sagas_started ON mink_sagas(started_at);

-- Partial indexes for active sagas (covers only the three statuses below)
CREATE INDEX idx_sagas_running ON mink_sagas(type) 
    WHERE status IN (0, 1, 4);  -- Started, Running, Compensating

Testing Sagas

import "github.com/AshkanYarmoradi/go-mink/testing/sagas"

// TestOrderFulfillmentSaga_HappyPath feeds the four success events in order
// and expects the saga to complete, having issued one command per step.
func TestOrderFulfillmentSaga_HappyPath(t *testing.T) {
    saga := NewOrderFulfillmentSaga("saga-123")
    
    // Use the testing fixture
    adapter := sagas.NewMinkSagaAdapter(saga)
    
    sagas.TestSaga(t, adapter).
        GivenEvents(
            createOrderEvent("order-123", "customer-456"),
            paymentReceivedEvent("order-123"),
            inventoryReservedEvent("order-123"),
            shipmentCreatedEvent("order-123"),
        ).
        ThenCompleted().
        ThenCommands(
            RequestPayment{OrderID: "order-123"},
            ReserveInventory{OrderID: "order-123"},
            CreateShipment{OrderID: "order-123"},
            CompleteOrder{OrderID: "order-123"},
        )
}

// TestOrderFulfillmentSaga_Compensation fails the saga at inventory
// reservation, after payment was received, and expects a refund followed by
// failing the order.
func TestOrderFulfillmentSaga_Compensation(t *testing.T) {
    saga := NewOrderFulfillmentSaga("saga-123")
    adapter := sagas.NewMinkSagaAdapter(saga)
    
    sagas.TestCompensation(t, adapter).
        GivenFailureAfter(
            createOrderEvent("order-123", "customer-456"),
            paymentReceivedEvent("order-123"),
            inventoryReservationFailedEvent("order-123"),
        ).
        ThenCompensates(
            RefundPayment{OrderID: "order-123"},
            FailOrder{OrderID: "order-123"},
        )
}

Outbox Pattern

Reliable event publishing to external systems.

Outbox Design

// OutboxMessage represents a message to be published to an external system.
type OutboxMessage struct {
    ID           string
    AggregateID  string
    EventType    string
    // Destination has the form "<publisher>:<target>"; the processor routes
    // on the part before the first ':'.
    Destination  string            // "kafka:orders-topic", "webhook:partner"
    Payload      []byte
    Headers      map[string]string
    ScheduledAt  time.Time
    Attempts     int               // presumably delivery attempts so far — confirm
    LastError    string            // presumably the most recent failure — confirm
    Status       OutboxStatus
}

// OutboxStatus is the processing state of an OutboxMessage. Values are
// numbered in declaration order, starting at zero.
type OutboxStatus int

const (
    // OutboxPending is the zero value.
    OutboxPending OutboxStatus = iota
    // OutboxProcessing has value 1.
    OutboxProcessing
    // OutboxCompleted has value 2.
    OutboxCompleted
    // OutboxFailed has value 3.
    OutboxFailed
)

// Outbox stores messages awaiting publication and tracks their delivery.
type Outbox interface {
    // Schedule records messages using tx, i.e. in the same transaction
    // as the events.
    Schedule(ctx context.Context, tx Transaction, messages []OutboxMessage) error
    
    // FetchPending returns up to limit pending messages for processing.
    FetchPending(ctx context.Context, limit int) ([]OutboxMessage, error)
    
    // MarkCompleted marks the messages with the given IDs as processed.
    MarkCompleted(ctx context.Context, ids []string) error
    
    // MarkFailed marks one message as failed, recording err.
    MarkFailed(ctx context.Context, id string, err error) error
    
    // RetryFailed retries failed messages; maxAttempts presumably caps the
    // attempts per message — confirm.
    RetryFailed(ctx context.Context, maxAttempts int) error
}

Outbox Processor

// OutboxProcessor publishes messages to external systems.
type OutboxProcessor struct {
    outbox     Outbox               // source of pending messages
    publishers map[string]Publisher // looked up by destination prefix ("kafka", "webhook", ...)
    options    ProcessorOptions
}

// ProcessorOptions tunes the processor loop.
type ProcessorOptions struct {
    BatchSize     int           // limit passed to Outbox.FetchPending per batch
    PollInterval  time.Duration // ticker interval between batches
    MaxRetries    int           // not used by the code shown here
    RetryBackoff  time.Duration // not used by the code shown here
}

// Publisher sends messages to a specific kind of destination.
type Publisher interface {
    // Publish delivers the given messages; a non-nil error marks the whole
    // batch as failed in the processor.
    Publish(ctx context.Context, messages []OutboxMessage) error
    // Destination names the destination kind this publisher serves.
    Destination() string // "kafka", "webhook", "sns"
}

// Built-in publishers (constructor signatures only)
func NewKafkaPublisher(brokers []string, opts ...KafkaOption) Publisher
func NewWebhookPublisher(httpClient *http.Client) Publisher
func NewSNSPublisher(snsClient *sns.Client) Publisher

// Start runs the processor loop: it processes one batch every PollInterval
// until ctx is cancelled, then returns ctx.Err().
//
// It returns an error immediately when PollInterval is not positive,
// because time.NewTicker panics on such an interval. A failed batch is
// logged and does not stop the loop; the next tick tries again.
func (p *OutboxProcessor) Start(ctx context.Context) error {
    if p.options.PollInterval <= 0 {
        return fmt.Errorf("outbox processor: poll interval must be positive, got %s", p.options.PollInterval)
    }

    ticker := time.NewTicker(p.options.PollInterval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
            if err := p.processBatch(ctx); err != nil {
                log.Printf("outbox processor: processing batch: %v", err)
            }
        }
    }
}

// processBatch fetches up to BatchSize pending messages, groups them by
// destination prefix (the part of Destination before the first ':'), and
// publishes each group through the publisher registered under that prefix.
// A group that fails to publish, or has no registered publisher, is marked
// failed message by message; a published group is marked completed.
//
// Publish failures are recorded in the outbox and are not returned. The
// returned error is the fetch failure, or the first MarkFailed or
// MarkCompleted failure, or nil.
func (p *OutboxProcessor) processBatch(ctx context.Context) error {
    messages, err := p.outbox.FetchPending(ctx, p.options.BatchSize)
    if err != nil {
        return fmt.Errorf("fetching pending outbox messages: %w", err)
    }

    // Group by destination
    byDest := make(map[string][]OutboxMessage)
    for _, msg := range messages {
        dest, _, _ := strings.Cut(msg.Destination, ":")
        byDest[dest] = append(byDest[dest], msg)
    }

    // Keep only the first bookkeeping error; later groups are still processed.
    var firstErr error
    keep := func(err error) {
        if firstErr == nil {
            firstErr = err
        }
    }

    // Publish to each destination
    for dest, msgs := range byDest {
        var pubErr error
        if publisher, ok := p.publishers[dest]; ok && publisher != nil {
            pubErr = publisher.Publish(ctx, msgs)
        } else {
            // A missing map entry would otherwise be a nil interface call.
            pubErr = fmt.Errorf("no publisher registered for destination %q", dest)
        }

        if pubErr != nil {
            for _, msg := range msgs {
                if err := p.outbox.MarkFailed(ctx, msg.ID, pubErr); err != nil {
                    keep(fmt.Errorf("marking outbox message %s failed: %w", msg.ID, err))
                }
            }
            continue
        }

        ids := make([]string, len(msgs))
        for i, msg := range msgs {
            ids[i] = msg.ID
        }
        if err := p.outbox.MarkCompleted(ctx, ids); err != nil {
            keep(fmt.Errorf("marking %d outbox messages completed: %w", len(ids), err))
        }
    }

    return firstErr
}

Integration with Event Store

// EventStoreWithOutbox is an event store with outbox support: it embeds
// *EventStore and schedules outbox messages for appended events.
type EventStoreWithOutbox struct {
    *EventStore
    outbox Outbox
    routes []OutboxRoute // every matching route yields one outbox message per event
}

// OutboxRoute maps a set of event types to one destination.
type OutboxRoute struct {
    EventTypes  []string                    // event types this route applies to
    Destination string                      // copied into OutboxMessage.Destination
    Transform   func(Event) ([]byte, error) // builds the message payload from the event
}

// Append stores events on streamID and, inside the same transaction,
// schedules one outbox message for every (stored event, matching route)
// pair.
//
// It returns the error from storing the events, from a route's Transform,
// or from scheduling. A Transform failure is returned from the transaction
// callback (presumably rolling the append back — confirm against
// WithTransaction) instead of scheduling a message with an empty payload.
func (s *EventStoreWithOutbox) Append(ctx context.Context, streamID string,
    events []interface{}, opts ...AppendOption) error {

    return s.adapter.WithTransaction(ctx, func(tx Transaction) error {
        // Store events
        stored, err := s.appendInTx(ctx, tx, streamID, events, opts...)
        if err != nil {
            return err
        }

        // Schedule outbox messages
        var messages []OutboxMessage
        for _, event := range stored {
            for _, route := range s.routes {
                if !contains(route.EventTypes, event.Type) {
                    continue
                }
                payload, err := route.Transform(event)
                if err != nil {
                    return fmt.Errorf("transforming %v event for %q: %w", event.Type, route.Destination, err)
                }
                messages = append(messages, OutboxMessage{
                    ID:          uuid.NewString(),
                    AggregateID: streamID,
                    EventType:   event.Type,
                    Destination: route.Destination,
                    Payload:     payload,
                    ScheduledAt: time.Now(),
                })
            }
        }

        return s.outbox.Schedule(ctx, tx, messages)
    })
}

// Configuration: one OutboxRoute per destination. "OrderShipped" appears in
// both routes, so it is sent to both destinations.
store := mink.NewEventStoreWithOutbox(adapter, outbox,
    mink.OutboxRoute{
        EventTypes:  []string{"OrderCreated", "OrderShipped"},
        Destination: "kafka:orders",
        Transform:   mink.JSONTransform,
    },
    mink.OutboxRoute{
        EventTypes:  []string{"OrderShipped"},
        Destination: "webhook:shipping-partner",
        Transform:   shippingWebhookTransform, // custom payload for the partner webhook
    },
)

Next: Testing →