Part 6: Middleware and Cross-Cutting Concerns

Adding logging, validation, retries, idempotency, and more.

Table of contents

  1. What is Middleware?
  2. Middleware Signature
    1. Basic Structure
  3. Built-in Middleware
    1. Validation
    2. Recovery (Panic Handler)
    3. Logging
    4. Retry with Backoff
    5. Correlation ID
    6. Idempotency
  4. Recommended Order
  5. Custom Middleware
    1. Timing
    2. Authentication
  6. Observability Middleware (v0.4.0)
    1. Prometheus Metrics
    2. OpenTelemetry Tracing
  7. Key Takeaways

This is Part 6 of an 8-part series on Event Sourcing and CQRS with Go.


What is Middleware?

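Middleware wraps command execution. Think of it as an onion: each layer runs its own code before and after the layer inside it, and the handler sits at the center.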

┌─────────────────────────────────────────────────────────────┐
│ Logging Middleware                                          │
│  ┌───────────────────────────────────────────────────────┐  │
│  │ Validation Middleware                                 │  │
│  │  ┌─────────────────────────────────────────────────┐  │  │
│  │  │ Recovery Middleware                             │  │  │
│  │  │  ┌───────────────────────────────────────────┐  │  │  │
│  │  │  │            Handler                        │  │  │  │
│  │  │  └───────────────────────────────────────────┘  │  │  │
│  │  └─────────────────────────────────────────────────┘  │  │
│  └───────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────┘

Middleware Signature

type MiddlewareFunc func(ctx context.Context, cmd Command) (CommandResult, error)
type Middleware func(next MiddlewareFunc) MiddlewareFunc

Basic Structure

func MyMiddleware() mink.Middleware {
    return func(next mink.MiddlewareFunc) mink.MiddlewareFunc {
        return func(ctx context.Context, cmd mink.Command) (mink.CommandResult, error) {
            // Before
            fmt.Printf("About to handle %s\n", cmd.CommandType())

            result, err := next(ctx, cmd)

            // After
            fmt.Printf("Finished handling %s\n", cmd.CommandType())

            return result, err
        }
    }
}
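To make the onion order concrete, here is a minimal, self-contained sketch. It does not import mink: the types are local stand-ins that mirror the signatures above, and `Trace`, `Chain`, and `RunOnion` are hypothetical helpers introduced only for illustration. It assumes the first middleware listed becomes the outermost layer.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins that mirror the signatures shown above (not imported from mink).
type Command interface{ CommandType() string }
type CommandResult struct{ Success bool }
type MiddlewareFunc func(ctx context.Context, cmd Command) (CommandResult, error)
type Middleware func(next MiddlewareFunc) MiddlewareFunc

type CreateOrder struct{}

func (CreateOrder) CommandType() string { return "CreateOrder" }

// Trace returns a middleware that records when it is entered and when it is left.
func Trace(name string, log *[]string) Middleware {
	return func(next MiddlewareFunc) MiddlewareFunc {
		return func(ctx context.Context, cmd Command) (CommandResult, error) {
			*log = append(*log, name+":in")
			res, err := next(ctx, cmd)
			*log = append(*log, name+":out")
			return res, err
		}
	}
}

// Chain wraps handler so that the first middleware listed is the outermost layer.
func Chain(handler MiddlewareFunc, mws ...Middleware) MiddlewareFunc {
	for i := len(mws) - 1; i >= 0; i-- {
		handler = mws[i](handler)
	}
	return handler
}

// RunOnion sends one command through three layers and returns the recorded trace.
func RunOnion() []string {
	var log []string
	handler := func(ctx context.Context, cmd Command) (CommandResult, error) {
		log = append(log, "handler")
		return CommandResult{Success: true}, nil
	}
	run := Chain(handler,
		Trace("logging", &log),
		Trace("validation", &log),
		Trace("recovery", &log),
	)
	run(context.Background(), CreateOrder{})
	return log
}

func main() {
	for _, step := range RunOnion() {
		fmt.Println(step)
	}
}
```

The trace enters the layers from the outside in, reaches the handler, and then leaves them in reverse order, which is why a layer's "after" code sees the result of everything inside it.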

Built-in Middleware

Validation

bus := mink.NewCommandBus(
    mink.WithMiddleware(mink.ValidationMiddleware()),
)
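The snippet above does not show what a validation layer does internally. One plausible shape, sketched with local stand-in types rather than mink's own, is to ask the command to validate itself and stop before the handler if it fails. The `Validator` interface, the `Validate` middleware, and `Dispatch` are assumptions made for this example; check mink's documentation for the interface it actually expects.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local stand-ins that mirror the signatures shown in this article.
type Command interface{ CommandType() string }
type CommandResult struct{ Success bool }
type MiddlewareFunc func(ctx context.Context, cmd Command) (CommandResult, error)
type Middleware func(next MiddlewareFunc) MiddlewareFunc

// Validator is a hypothetical opt-in interface for commands that can check themselves.
type Validator interface{ Validate() error }

type CreateOrder struct{ CustomerID string }

func (CreateOrder) CommandType() string { return "CreateOrder" }

// Validate reports an error when a required field is missing.
func (c CreateOrder) Validate() error {
	if c.CustomerID == "" {
		return errors.New("customer ID is required")
	}
	return nil
}

// Validate rejects invalid commands before they reach the next layer.
func Validate() Middleware {
	return func(next MiddlewareFunc) MiddlewareFunc {
		return func(ctx context.Context, cmd Command) (CommandResult, error) {
			if v, ok := cmd.(Validator); ok {
				if err := v.Validate(); err != nil {
					return CommandResult{}, fmt.Errorf("invalid %s: %w", cmd.CommandType(), err)
				}
			}
			return next(ctx, cmd)
		}
	}
}

// Dispatch runs cmd through the validation layer and reports whether the handler ran.
func Dispatch(cmd Command) (handled bool, err error) {
	handler := func(ctx context.Context, cmd Command) (CommandResult, error) {
		handled = true
		return CommandResult{Success: true}, nil
	}
	_, err = Validate()(handler)(context.Background(), cmd)
	return handled, err
}

func main() {
	fmt.Println(Dispatch(CreateOrder{CustomerID: "c-1"}))
	fmt.Println(Dispatch(CreateOrder{}))
}
```

Because the invalid command never reaches the handler, no aggregate is loaded and no events are appended for it.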

Recovery (Panic Handler)

mink.RecoveryMiddleware()  // Catches panics, returns clean errors
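Turning a panic into an ordinary error is standard Go: a deferred function calls `recover` and overwrites the named return values. The sketch below shows that technique with local stand-in types. It is not mink's implementation, and `Recover` and `HandleWithRecovery` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins that mirror the signatures shown in this article.
type Command interface{ CommandType() string }
type CommandResult struct{ Success bool }
type MiddlewareFunc func(ctx context.Context, cmd Command) (CommandResult, error)
type Middleware func(next MiddlewareFunc) MiddlewareFunc

type ShipOrder struct{}

func (ShipOrder) CommandType() string { return "ShipOrder" }

// Recover converts a panic raised by any inner layer into a returned error.
func Recover() Middleware {
	return func(next MiddlewareFunc) MiddlewareFunc {
		// Named results let the deferred function replace what the caller receives.
		return func(ctx context.Context, cmd Command) (res CommandResult, err error) {
			defer func() {
				if r := recover(); r != nil {
					res = CommandResult{}
					err = fmt.Errorf("panic while handling %s: %v", cmd.CommandType(), r)
				}
			}()
			return next(ctx, cmd)
		}
	}
}

// HandleWithRecovery runs a handler that always panics behind the recovery layer.
func HandleWithRecovery() error {
	panicking := func(ctx context.Context, cmd Command) (CommandResult, error) {
		panic("nil aggregate")
	}
	_, err := Recover()(panicking)(context.Background(), ShipOrder{})
	return err
}

func main() {
	fmt.Println(HandleWithRecovery())
}
```

A layer like this only protects what it wraps, which is the reason to place it outermost.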

Logging

mink.LoggingMiddleware(logger)

Retry with Backoff

mink.RetryMiddleware(mink.RetryConfig{
    MaxAttempts:  3,
    InitialDelay: 100 * time.Millisecond,
    MaxDelay:     2 * time.Second,
    Multiplier:   2.0,
})

Correlation ID

mink.CorrelationIDMiddleware(nil)  // Auto-generates if not present

Idempotency

mink.IdempotencyMiddleware(mink.IdempotencyConfig{
    Store: idempotencyStore,
    TTL:   24 * time.Hour,
})

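Recommended Order

Middleware is listed from the outermost layer to the innermost: the first entry wraps everything else, and the last entry sits closest to the handler.

bus := mink.NewCommandBus(
    mink.WithMiddleware(
        mink.RecoveryMiddleware(),                     // 1. Catch panics
        mink.LoggingMiddleware(logger),                // 2. Log everything
        mink.MetricsMiddleware(collector),             // 3. Track metrics
        mink.TimeoutMiddleware(30*time.Second),        // 4. Bound execution time
        mink.CorrelationIDMiddleware(nil),             // 5. Attach a correlation ID
        mink.ValidationMiddleware(),                   // 6. Validate
        mink.RetryMiddleware(retryConfig),             // 7. Retry transient failures
        mink.IdempotencyMiddleware(idempotencyConfig), // 8. Prevent duplicates
    ),
)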

Custom Middleware

Timing

func TimingMiddleware() mink.Middleware {
    return func(next mink.MiddlewareFunc) mink.MiddlewareFunc {
        return func(ctx context.Context, cmd mink.Command) (mink.CommandResult, error) {
            start := time.Now()
            result, err := next(ctx, cmd)
            log.Printf("%s took %v", cmd.CommandType(), time.Since(start))
            return result, err
        }
    }
}

Authentication

func AuthMiddleware(authService AuthService) mink.Middleware {
    return func(next mink.MiddlewareFunc) mink.MiddlewareFunc {
        return func(ctx context.Context, cmd mink.Command) (mink.CommandResult, error) {
            userID := UserIDFromContext(ctx)
            if !authService.CanExecute(userID, cmd.CommandType()) {
                return mink.NewErrorResult(ErrForbidden), ErrForbidden
            }
            return next(ctx, cmd)
        }
    }
}

Observability Middleware (v0.4.0)

Prometheus Metrics

import "github.com/AshkanYarmoradi/go-mink/middleware/metrics"

// Create metrics
m := metrics.New(
    metrics.WithNamespace("myapp"),
    metrics.WithMetricsServiceName("order-service"),
)

// Register with Prometheus
m.MustRegister()

// Add to command bus
bus.Use(m.CommandMiddleware())

// Wrap event store
metricsStore := m.WrapEventStore(adapter)

// Wrap projections
metricsProjection := m.WrapProjection(projection)

Collected Metrics:

  • mink_commands_total - Command count by type/status
  • mink_command_duration_seconds - Command execution histogram
  • mink_commands_in_flight - Currently executing commands
  • mink_eventstore_operations_total - Event store operations
  • mink_projections_processed_total - Projection processing
  • mink_projection_lag_events - Projection lag

OpenTelemetry Tracing

import "github.com/AshkanYarmoradi/go-mink/middleware/tracing"

// Create tracer
tracer := tracing.NewTracer(
    tracing.WithServiceName("order-service"),
    tracing.WithTracerProvider(provider), // Optional custom provider
)

// Add to command bus
bus.Use(tracer.CommandMiddleware())

// Wrap event store
tracedStore := tracing.NewEventStoreMiddleware(adapter, tracer)

// Add events to current span
tracing.AddEvent(ctx, "Processing order", map[string]string{"order_id": "123"})

Key Takeaways

  1. Middleware is an onion: Commands flow in, results flow out
  2. Order matters: Place broad concerns outside, specific inside
  3. Recovery first: Catch panics before anything else
  4. Idempotency last: Closest to handler for caching
  5. Custom middleware is simple: Just a function wrapping a function
