Part 6: Middleware and Cross-Cutting Concerns
Adding logging, validation, retries, idempotency, and more.
Table of contents
- What is Middleware?
- Middleware Signature
- Built-in Middleware
- Recommended Order
- Custom Middleware
- Observability Middleware (v0.4.0)
- Key Takeaways
This is Part 6 of an 8-part series on Event Sourcing and CQRS with Go.
What is Middleware?
Middleware wraps command execution. Think of it as an onion: each layer runs code before and after the layer inside it, and the handler sits at the center.
┌─────────────────────────────────────────────────────────────┐
│ Logging Middleware │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ Validation Middleware │ │
│ │ ┌─────────────────────────────────────────────────┐ │ │
│ │ │ Recovery Middleware │ │ │
│ │ │ ┌───────────────────────────────────────────┐ │ │ │
│ │ │ │ Handler │ │ │ │
│ │ │ └───────────────────────────────────────────┘ │ │ │
│ │ └─────────────────────────────────────────────────┘ │ │
│ └───────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
Middleware Signature
type MiddlewareFunc func(ctx context.Context, cmd Command) (CommandResult, error)
type Middleware func(next MiddlewareFunc) MiddlewareFunc
Basic Structure
func MyMiddleware() mink.Middleware {
	return func(next mink.MiddlewareFunc) mink.MiddlewareFunc {
		return func(ctx context.Context, cmd mink.Command) (mink.CommandResult, error) {
			// Before
			fmt.Printf("About to handle %s\n", cmd.CommandType())
			result, err := next(ctx, cmd)
			// After
			fmt.Printf("Finished handling %s\n", cmd.CommandType())
			return result, err
		}
	}
}
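To make the onion concrete, here is a minimal sketch of how a chain of middleware could be composed and in what order the layers run. The types are simplified local stand-ins for the signatures shown above, and `Chain`, `Trace`, and `RunTrace` are hypothetical helpers written for this illustration; they are not part of mink's API.

```go
package main

import (
	"context"
	"fmt"
)

// Simplified stand-ins for mink's types, declared locally so the sketch compiles on its own.
type Command interface{ CommandType() string }
type CommandResult struct{ Success bool }
type MiddlewareFunc func(ctx context.Context, cmd Command) (CommandResult, error)
type Middleware func(next MiddlewareFunc) MiddlewareFunc

type CreateOrder struct{}

func (CreateOrder) CommandType() string { return "CreateOrder" }

// Chain (hypothetical) wraps handler so that the first middleware listed
// becomes the outermost layer.
func Chain(handler MiddlewareFunc, mws ...Middleware) MiddlewareFunc {
	for i := len(mws) - 1; i >= 0; i-- {
		handler = mws[i](handler)
	}
	return handler
}

// Trace (hypothetical) records when its layer is entered and left.
func Trace(name string, trace *[]string) Middleware {
	return func(next MiddlewareFunc) MiddlewareFunc {
		return func(ctx context.Context, cmd Command) (CommandResult, error) {
			*trace = append(*trace, name+":before")
			result, err := next(ctx, cmd)
			*trace = append(*trace, name+":after")
			return result, err
		}
	}
}

// RunTrace sends one command through two layers and returns the recorded order.
func RunTrace() []string {
	var trace []string
	handler := func(ctx context.Context, cmd Command) (CommandResult, error) {
		trace = append(trace, "handler")
		return CommandResult{Success: true}, nil
	}
	wrapped := Chain(handler, Trace("outer", &trace), Trace("inner", &trace))
	_, _ = wrapped(context.Background(), CreateOrder{})
	return trace
}

func main() {
	// Prints: [outer:before inner:before handler inner:after outer:after]
	fmt.Println(RunTrace())
}
```

The "before" code runs from the outside in and the "after" code runs from the inside out, which is why the position of a middleware in the list decides what it can observe.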
Built-in Middleware
Validation
bus := mink.NewCommandBus(
	mink.WithMiddleware(mink.ValidationMiddleware()),
)
Recovery (Panic Handler)
mink.RecoveryMiddleware() // Catches panics, returns clean errors
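As an illustration of what "catches panics, returns clean errors" can mean, the following sketch converts a panic into an ordinary error with `defer` and `recover`. It uses local stand-in types, and `Recover`, `PanickyHandler`, and `RecoveredError` are hypothetical names; how mink's own `RecoveryMiddleware` formats or logs the panic is not specified here.

```go
package main

import (
	"context"
	"fmt"
)

// Simplified stand-ins for mink's types, declared locally so the sketch compiles on its own.
type Command interface{ CommandType() string }
type CommandResult struct{ Success bool }
type MiddlewareFunc func(ctx context.Context, cmd Command) (CommandResult, error)
type Middleware func(next MiddlewareFunc) MiddlewareFunc

type Boom struct{}

func (Boom) CommandType() string { return "Boom" }

// Recover (hypothetical) turns a panic raised by any inner layer into an error.
// Named results let the deferred function overwrite what the caller receives.
func Recover() Middleware {
	return func(next MiddlewareFunc) MiddlewareFunc {
		return func(ctx context.Context, cmd Command) (result CommandResult, err error) {
			defer func() {
				if r := recover(); r != nil {
					result = CommandResult{}
					err = fmt.Errorf("panic while handling %s: %v", cmd.CommandType(), r)
				}
			}()
			return next(ctx, cmd)
		}
	}
}

// PanickyHandler simulates a handler with a bug.
func PanickyHandler(ctx context.Context, cmd Command) (CommandResult, error) {
	panic("nil aggregate")
}

// RecoveredError runs the panicking handler behind Recover and returns the error text.
func RecoveredError() string {
	_, err := Recover()(PanickyHandler)(context.Background(), Boom{})
	if err == nil {
		return ""
	}
	return err.Error()
}

func main() {
	// Prints: panic while handling Boom: nil aggregate
	fmt.Println(RecoveredError())
}
```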
Logging
mink.LoggingMiddleware(logger)
Retry with Backoff
mink.RetryMiddleware(mink.RetryConfig{
	MaxAttempts:  3,
	InitialDelay: 100 * time.Millisecond,
	MaxDelay:     2 * time.Second,
	Multiplier:   2.0,
})
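To see what these four settings imply, the sketch below computes a plain exponential backoff schedule. It assumes that `MaxAttempts` counts the first try, that each delay is the previous one times `Multiplier`, and that delays are capped at `MaxDelay` with no jitter; the article does not state whether mink behaves exactly this way. `RetryConfig` is redeclared locally, and `Delays` and `ArticleScheduleMillis` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"time"
)

// RetryConfig is a local copy of the fields used in the example above.
type RetryConfig struct {
	MaxAttempts  int
	InitialDelay time.Duration
	MaxDelay     time.Duration
	Multiplier   float64
}

// Delays (hypothetical) returns the wait before each retry, that is,
// before attempts 2 through MaxAttempts.
func Delays(cfg RetryConfig) []time.Duration {
	var delays []time.Duration
	delay := cfg.InitialDelay
	for attempt := 2; attempt <= cfg.MaxAttempts; attempt++ {
		if delay > cfg.MaxDelay {
			delay = cfg.MaxDelay // never wait longer than the cap
		}
		delays = append(delays, delay)
		delay = time.Duration(float64(delay) * cfg.Multiplier)
	}
	return delays
}

// ArticleScheduleMillis uses the article's delays and multiplier with a
// caller-chosen attempt count, and reports the schedule in milliseconds.
func ArticleScheduleMillis(maxAttempts int) []int64 {
	cfg := RetryConfig{
		MaxAttempts:  maxAttempts,
		InitialDelay: 100 * time.Millisecond,
		MaxDelay:     2 * time.Second,
		Multiplier:   2.0,
	}
	var ms []int64
	for _, d := range Delays(cfg) {
		ms = append(ms, d.Milliseconds())
	}
	return ms
}

func main() {
	fmt.Println(ArticleScheduleMillis(3)) // prints [100 200]
	fmt.Println(ArticleScheduleMillis(7)) // prints [100 200 400 800 1600 2000]
}
```

Under these assumptions, three attempts never reach the two-second cap; it only matters from the seventh attempt onward.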
Correlation ID
mink.CorrelationIDMiddleware(nil) // Auto-generates if not present
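The idea of "auto-generates if not present" can be sketched as follows. This version assumes the `nil` argument stands for an optional ID generator function, which is a guess about the parameter rather than something the article confirms. The context helpers, `randomID`, `CorrelationID`, and `SeenID` are hypothetical names, and the types are local stand-ins.

```go
package main

import (
	"context"
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// Simplified stand-ins for mink's types, declared locally so the sketch compiles on its own.
type Command interface{ CommandType() string }
type CommandResult struct{ Success bool }
type MiddlewareFunc func(ctx context.Context, cmd Command) (CommandResult, error)
type Middleware func(next MiddlewareFunc) MiddlewareFunc

type Ping struct{}

func (Ping) CommandType() string { return "Ping" }

// correlationKey is an unexported key type, so other packages cannot collide with it.
type correlationKey struct{}

func WithCorrelationID(ctx context.Context, id string) context.Context {
	return context.WithValue(ctx, correlationKey{}, id)
}

func CorrelationIDFrom(ctx context.Context) string {
	id, _ := ctx.Value(correlationKey{}).(string)
	return id
}

// randomID returns 16 hex characters from the system's secure random source.
func randomID() string {
	b := make([]byte, 8)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	return hex.EncodeToString(b)
}

// CorrelationID (hypothetical) keeps an existing ID and generates one otherwise.
func CorrelationID(generate func() string) Middleware {
	if generate == nil {
		generate = randomID
	}
	return func(next MiddlewareFunc) MiddlewareFunc {
		return func(ctx context.Context, cmd Command) (CommandResult, error) {
			if CorrelationIDFrom(ctx) == "" {
				ctx = WithCorrelationID(ctx, generate())
			}
			return next(ctx, cmd)
		}
	}
}

// SeenID reports the correlation ID the handler observes for a given incoming ID.
func SeenID(existing string) string {
	var seen string
	handler := func(ctx context.Context, cmd Command) (CommandResult, error) {
		seen = CorrelationIDFrom(ctx)
		return CommandResult{Success: true}, nil
	}
	ctx := context.Background()
	if existing != "" {
		ctx = WithCorrelationID(ctx, existing)
	}
	_, _ = CorrelationID(nil)(handler)(ctx, Ping{})
	return seen
}

func main() {
	fmt.Println(SeenID("req-42"))  // prints req-42
	fmt.Println(len(SeenID("")))   // prints 16
}
```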
Idempotency
mink.IdempotencyMiddleware(mink.IdempotencyConfig{
	Store: idempotencyStore,
	TTL:   24 * time.Hour,
})
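The `Store` and `TTL` settings suggest a lookup keyed by some command identity, with entries that expire. The sketch below shows one way that could work with an in-memory map. The `Keyed` interface, `MemoryStore`, `Idempotency`, and `HandlerCalls` are all hypothetical; mink's real store interface and the way it derives keys may differ.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Simplified stand-ins for mink's types, declared locally so the sketch compiles on its own.
type Command interface{ CommandType() string }
type CommandResult struct{ Success bool }
type MiddlewareFunc func(ctx context.Context, cmd Command) (CommandResult, error)
type Middleware func(next MiddlewareFunc) MiddlewareFunc

// Keyed (hypothetical) is implemented by commands that carry an idempotency key.
type Keyed interface{ IdempotencyKey() string }

type PlaceOrder struct{ RequestID string }

func (PlaceOrder) CommandType() string      { return "PlaceOrder" }
func (c PlaceOrder) IdempotencyKey() string { return c.RequestID }

type entry struct {
	result  CommandResult
	expires time.Time
}

// MemoryStore (hypothetical) keeps results in a map guarded by a mutex.
type MemoryStore struct {
	mu      sync.Mutex
	entries map[string]entry
}

func NewMemoryStore() *MemoryStore { return &MemoryStore{entries: map[string]entry{}} }

// Get returns a stored result unless it is missing or expired.
func (s *MemoryStore) Get(key string) (CommandResult, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	e, ok := s.entries[key]
	if !ok || time.Now().After(e.expires) {
		delete(s.entries, key)
		return CommandResult{}, false
	}
	return e.result, true
}

func (s *MemoryStore) Put(key string, result CommandResult, ttl time.Duration) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.entries[key] = entry{result: result, expires: time.Now().Add(ttl)}
}

// Idempotency (hypothetical) replays the stored result for a repeated key.
func Idempotency(store *MemoryStore, ttl time.Duration) Middleware {
	return func(next MiddlewareFunc) MiddlewareFunc {
		return func(ctx context.Context, cmd Command) (CommandResult, error) {
			keyed, ok := cmd.(Keyed)
			if !ok {
				return next(ctx, cmd) // no key, nothing to deduplicate
			}
			if cached, hit := store.Get(keyed.IdempotencyKey()); hit {
				return cached, nil
			}
			result, err := next(ctx, cmd)
			if err == nil {
				store.Put(keyed.IdempotencyKey(), result, ttl)
			}
			return result, err
		}
	}
}

// HandlerCalls sends the same command n times and reports how often the handler ran.
func HandlerCalls(n int) int {
	calls := 0
	handler := func(ctx context.Context, cmd Command) (CommandResult, error) {
		calls++
		return CommandResult{Success: true}, nil
	}
	wrapped := Idempotency(NewMemoryStore(), 24*time.Hour)(handler)
	for i := 0; i < n; i++ {
		_, _ = wrapped(context.Background(), PlaceOrder{RequestID: "req-1"})
	}
	return calls
}

func main() {
	fmt.Println(HandlerCalls(3)) // prints 1
}
```

Note that this sketch checks and stores in two separate steps, so two concurrent duplicates could both reach the handler. A production store would need an atomic reserve-or-fetch operation.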
Recommended Order
bus := mink.NewCommandBus(
	mink.WithMiddleware(
		mink.RecoveryMiddleware(),                     // 1. Catch panics
		mink.LoggingMiddleware(logger),                // 2. Log everything
		mink.MetricsMiddleware(collector),             // 3. Track metrics
		mink.TimeoutMiddleware(30*time.Second),        // 4. Bound execution time
		mink.CorrelationIDMiddleware(nil),             // 5. Attach a correlation ID
		mink.ValidationMiddleware(),                   // 6. Validate
		mink.RetryMiddleware(retryConfig),             // 7. Retry transient failures
		mink.IdempotencyMiddleware(idempotencyConfig), // 8. Prevent duplicates
	),
)
Custom Middleware
Timing
func TimingMiddleware() mink.Middleware {
	return func(next mink.MiddlewareFunc) mink.MiddlewareFunc {
		return func(ctx context.Context, cmd mink.Command) (mink.CommandResult, error) {
			start := time.Now()
			result, err := next(ctx, cmd)
			log.Printf("%s took %v", cmd.CommandType(), time.Since(start))
			return result, err
		}
	}
}
Authentication
func AuthMiddleware(authService AuthService) mink.Middleware {
	return func(next mink.MiddlewareFunc) mink.MiddlewareFunc {
		return func(ctx context.Context, cmd mink.Command) (mink.CommandResult, error) {
			userID := UserIDFromContext(ctx)
			if !authService.CanExecute(userID, cmd.CommandType()) {
				return mink.NewErrorResult(ErrForbidden), ErrForbidden
			}
			return next(ctx, cmd)
		}
	}
}
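The authentication example relies on three names it does not define: `AuthService`, `UserIDFromContext`, and `ErrForbidden`. The sketch below shows one plausible shape for them, inferred only from how they are called above. `ContextWithUserID`, `StaticPolicy`, `Authorize`, and `Allowed` are hypothetical additions for the illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// ErrForbidden is returned when the caller may not run the command.
var ErrForbidden = errors.New("forbidden")

// AuthService matches the single call made by the middleware above.
type AuthService interface {
	CanExecute(userID, commandType string) bool
}

// userIDKey is an unexported key type, so other packages cannot collide with it.
type userIDKey struct{}

// ContextWithUserID (hypothetical) would be called by the transport layer,
// for example after verifying a session or token.
func ContextWithUserID(ctx context.Context, userID string) context.Context {
	return context.WithValue(ctx, userIDKey{}, userID)
}

// UserIDFromContext returns the empty string for anonymous callers.
func UserIDFromContext(ctx context.Context) string {
	id, _ := ctx.Value(userIDKey{}).(string)
	return id
}

// StaticPolicy (hypothetical) maps user ID to the command types that user may run.
type StaticPolicy map[string]map[string]bool

func (p StaticPolicy) CanExecute(userID, commandType string) bool {
	return p[userID][commandType] // missing users or commands yield false
}

// Authorize performs the same check the middleware makes before calling next.
func Authorize(ctx context.Context, svc AuthService, commandType string) error {
	if !svc.CanExecute(UserIDFromContext(ctx), commandType) {
		return ErrForbidden
	}
	return nil
}

// Allowed checks a user against a small fixed policy.
func Allowed(userID, commandType string) bool {
	policy := StaticPolicy{"alice": {"CancelOrder": true}}
	ctx := ContextWithUserID(context.Background(), userID)
	return Authorize(ctx, policy, commandType) == nil
}

func main() {
	fmt.Println(Allowed("alice", "CancelOrder")) // prints true
	fmt.Println(Allowed("bob", "CancelOrder"))   // prints false
}
```

Keeping the policy behind an interface means the middleware can be tested with a map like this one and backed by a real permission service in production.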
Observability Middleware (v0.4.0)
Prometheus Metrics
import "github.com/AshkanYarmoradi/go-mink/middleware/metrics"
// Create metrics
m := metrics.New(
	metrics.WithNamespace("myapp"),
	metrics.WithMetricsServiceName("order-service"),
)
// Register with Prometheus
m.MustRegister()
// Add to command bus
bus.Use(m.CommandMiddleware())
// Wrap event store
metricsStore := m.WrapEventStore(adapter)
// Wrap projections
metricsProjection := m.WrapProjection(projection)
Collected Metrics:
- mink_commands_total - Command count by type/status
- mink_command_duration_seconds - Command execution histogram
- mink_commands_in_flight - Currently executing commands
- mink_eventstore_operations_total - Event store operations
- mink_projections_processed_total - Projection processing
- mink_projection_lag_events - Projection lag
OpenTelemetry Tracing
import "github.com/AshkanYarmoradi/go-mink/middleware/tracing"
// Create tracer
tracer := tracing.NewTracer(
	tracing.WithServiceName("order-service"),
	tracing.WithTracerProvider(provider), // Optional custom provider
)
// Add to command bus
bus.Use(tracer.CommandMiddleware())
// Wrap event store
tracedStore := tracing.NewEventStoreMiddleware(adapter, tracer)
// Add events to current span
tracing.AddEvent(ctx, "Processing order", map[string]string{"order_id": "123"})
Key Takeaways
- Middleware is an onion: Commands flow in, results flow out
- Order matters: Place broad concerns outside, specific inside
- Recovery first: Catch panics before anything else
- Idempotency last: Keep it closest to the handler so a duplicate command is answered from the stored result instead of running the handler again
- Custom middleware is simple: Just a function wrapping a function