Part 5: CQRS and the Command Bus

Separating reads from writes with Command Query Responsibility Segregation.

Table of contents

  1. What is CQRS?
    1. Why Separate?
  2. Commands vs Queries
    1. Commands
    2. The Golden Rule
  3. The Command Interface
    1. Aggregate Command
  4. Command Handlers
    1. Generic Handler
    2. Aggregate Handler
  5. The Command Bus
  6. Key Takeaways

This is Part 5 of an 8-part series on Event Sourcing and CQRS with Go.


What is CQRS?

CQRS (Command Query Responsibility Segregation) separates read and write operations into different models:

   Commands (Write)              Queries (Read)
         │                             │
         ▼                             ▼
   ┌───────────┐               ┌─────────────┐
   │ Command   │               │    Query    │
   │ Handlers  │               │  Handlers   │
   └─────┬─────┘               └──────┬──────┘
         │                            │
         ▼                            ▼
   ┌───────────┐               ┌─────────────┐
   │  Event    │   ───────►    │    Read     │
   │  Store    │  (Projections)│   Model     │
   └───────────┘               └─────────────┘

Why Separate?

  1. Different optimization needs: Writes need consistency; reads need speed
  2. Different scaling requirements: Reads often outnumber writes by a wide margin (100x is not unusual), so each side can scale on its own
  3. Independent evolution: Change the read model without affecting writes
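
The split in the diagram can be sketched with nothing but the standard library. This is only an illustration: `Event`, `EventStore`, `OrderSummary`, and `Project` are hypothetical names, not mink types, and the in-memory slice stands in for a real event store.

```go
package main

import "fmt"

// Event is a hypothetical fact recorded by the write side.
type Event struct {
	OrderID string
	Type    string // "OrderPlaced" or "ItemAdded" in this sketch
}

// EventStore is an in-memory, append-only stand-in for the write model.
type EventStore struct {
	events []Event
}

// Append records an event; the write side changes state only this way.
func (s *EventStore) Append(e Event) { s.events = append(s.events, e) }

// OrderSummary is a read model: a denormalized view shaped for queries.
type OrderSummary struct {
	ItemCount map[string]int // order ID -> number of items
}

// Project builds the read model by replaying every stored event.
func Project(s *EventStore) OrderSummary {
	view := OrderSummary{ItemCount: make(map[string]int)}
	for _, e := range s.events {
		switch e.Type {
		case "OrderPlaced":
			view.ItemCount[e.OrderID] = 0
		case "ItemAdded":
			view.ItemCount[e.OrderID]++
		}
	}
	return view
}

func main() {
	store := &EventStore{}
	store.Append(Event{OrderID: "order-1", Type: "OrderPlaced"})
	store.Append(Event{OrderID: "order-1", Type: "ItemAdded"})
	store.Append(Event{OrderID: "order-1", Type: "ItemAdded"})

	view := Project(store)
	fmt.Println(view.ItemCount["order-1"]) // prints 2
}
```

Because the view is derived entirely from events, it can be reshaped or rebuilt without touching the code that appends them.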

Commands vs Queries

Commands

Commands are intentions to change state:

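import "errors"

// PlaceOrderCommand expresses the intent to place an order for a customer.
// OrderItem is assumed to be defined elsewhere in the application.
type PlaceOrderCommand struct {
    CustomerID string
    Items      []OrderItem
}

// CommandType returns the unique type identifier for this command.
func (c PlaceOrderCommand) CommandType() string { return "PlaceOrder" }

// Validate reports whether the command carries the data it needs.
func (c PlaceOrderCommand) Validate() error {
    if c.CustomerID == "" {
        return errors.New("customer ID required")
    }
    return nil
}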

The Golden Rule

Commands: “Do this” → Returns success/failure
Queries: “Give me this” → Returns data
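
As a rough illustration of the rule, the sketch below puts one command-style method and one query-style method on the same hypothetical `Inventory` type. None of these names come from mink; they exist only to show the two shapes side by side.

```go
package main

import (
	"errors"
	"fmt"
)

// Inventory is a hypothetical in-memory model used only for this illustration.
type Inventory struct {
	stock map[string]int
}

// ReserveStock is command-style: it changes state and reports only
// success or failure.
func (inv *Inventory) ReserveStock(sku string, qty int) error {
	if qty <= 0 {
		return errors.New("quantity must be positive")
	}
	if inv.stock[sku] < qty {
		return errors.New("insufficient stock")
	}
	inv.stock[sku] -= qty
	return nil
}

// StockLevel is query-style: it returns data and never changes state.
func (inv *Inventory) StockLevel(sku string) int {
	return inv.stock[sku]
}

func main() {
	inv := &Inventory{stock: map[string]int{"sku-1": 5}}
	if err := inv.ReserveStock("sku-1", 2); err != nil {
		fmt.Println("command failed:", err)
		return
	}
	fmt.Println("remaining:", inv.StockLevel("sku-1")) // prints "remaining: 3"
}
```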


The Command Interface

type Command interface {
    CommandType() string  // Unique type identifier
    Validate() error      // Self-validation
}
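
One way to see what the two methods buy you is a small helper that refuses to run a command that fails its own validation. This is a minimal sketch, not mink's implementation: `CancelOrderCommand` and `Execute` are hypothetical, and the `Command` interface is repeated so the example compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
)

// Command repeats the interface from the article.
type Command interface {
	CommandType() string
	Validate() error
}

// CancelOrderCommand is a hypothetical command for this sketch.
type CancelOrderCommand struct {
	OrderID string
}

func (c CancelOrderCommand) CommandType() string { return "CancelOrder" }

func (c CancelOrderCommand) Validate() error {
	if c.OrderID == "" {
		return errors.New("order ID required")
	}
	return nil
}

// Compile-time check that CancelOrderCommand satisfies Command.
var _ Command = CancelOrderCommand{}

// Execute validates the command first and calls run only if it is valid.
func Execute(cmd Command, run func(Command) error) error {
	if err := cmd.Validate(); err != nil {
		return fmt.Errorf("%s rejected: %w", cmd.CommandType(), err)
	}
	return run(cmd)
}

func main() {
	run := func(cmd Command) error {
		fmt.Println("running", cmd.CommandType())
		return nil
	}
	fmt.Println(Execute(CancelOrderCommand{}, run))                   // rejected, run is skipped
	fmt.Println(Execute(CancelOrderCommand{OrderID: "order-1"}, run)) // runs the command
}
```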

Aggregate Command

type AggregateCommand interface {
    Command
    AggregateID() string  // Which aggregate to modify
}

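type AddItemCommand struct {
    OrderID  string
    SKU      string
    Quantity int
    Price    float64 // read by the aggregate handler's executor; float64 is an assumed type
}

func (c AddItemCommand) CommandType() string { return "AddItem" }
func (c AddItemCommand) AggregateID() string { return c.OrderID }

// Validate is required by the embedded Command interface.
func (c AddItemCommand) Validate() error {
    if c.OrderID == "" {
        return errors.New("order ID required")
    }
    return nil
}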
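
Since `AggregateCommand` embeds `Command`, code that receives a plain `Command` can use a type assertion to find out whether it targets a specific aggregate. The sketch below shows that check; `RemoveItemCommand`, `RebuildReportCommand`, and `TargetOf` are hypothetical names, and both interfaces are repeated so the example is self-contained.

```go
package main

import "fmt"

type Command interface {
	CommandType() string
	Validate() error
}

type AggregateCommand interface {
	Command
	AggregateID() string
}

// RemoveItemCommand is a hypothetical command that targets one order.
type RemoveItemCommand struct {
	OrderID string
	SKU     string
}

func (c RemoveItemCommand) CommandType() string { return "RemoveItem" }
func (c RemoveItemCommand) Validate() error     { return nil }
func (c RemoveItemCommand) AggregateID() string { return c.OrderID }

// RebuildReportCommand is a hypothetical command with no target aggregate.
type RebuildReportCommand struct{}

func (c RebuildReportCommand) CommandType() string { return "RebuildReport" }
func (c RebuildReportCommand) Validate() error     { return nil }

// Compile-time checks: both are Commands, only one is an AggregateCommand.
var (
	_ AggregateCommand = RemoveItemCommand{}
	_ Command          = RebuildReportCommand{}
)

// TargetOf returns the aggregate ID a command addresses, or false if the
// command does not implement AggregateCommand.
func TargetOf(cmd Command) (string, bool) {
	ac, ok := cmd.(AggregateCommand)
	if !ok {
		return "", false
	}
	return ac.AggregateID(), true
}

func main() {
	fmt.Println(TargetOf(RemoveItemCommand{OrderID: "order-1", SKU: "sku-1"}))
	fmt.Println(TargetOf(RebuildReportCommand{}))
}
```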

Command Handlers

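Generic Handler

A generic handler wraps a plain function that receives the typed command and performs every step itself: create the aggregate, save it, and build the result. CreateOrderCommand is assumed to carry a CustomerID field like PlaceOrderCommand above, and store is an event store set up elsewhere: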

handler := mink.NewGenericHandler(
    func(ctx context.Context, cmd CreateOrderCommand) (mink.CommandResult, error) {
        order := NewOrder(uuid.New().String())
        if err := order.Create(cmd.CustomerID); err != nil {
            return mink.NewErrorResult(err), err
        }
        if err := store.SaveAggregate(ctx, order); err != nil {
            return mink.NewErrorResult(err), err
        }
        return mink.NewSuccessResult(order.AggregateID(), order.Version()), nil
    })
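
To give a feel for how a typed function can sit behind an untyped handler interface, here is a minimal sketch using Go generics. It is not how mink implements `NewGenericHandler`; `Handler`, `typedHandler`, and `NewTypedHandler` are hypothetical, and the result is simplified to an ID string.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type Command interface {
	CommandType() string
	Validate() error
}

// Handler is a hypothetical untyped handler, as a bus might store it.
type Handler interface {
	CommandType() string
	Handle(ctx context.Context, cmd Command) (string, error)
}

// typedHandler adapts a function over a concrete command type C to Handler.
type typedHandler[C Command] struct {
	fn func(ctx context.Context, cmd C) (string, error)
}

// NewTypedHandler infers C from the function's second parameter.
func NewTypedHandler[C Command](fn func(context.Context, C) (string, error)) Handler {
	return typedHandler[C]{fn: fn}
}

// CommandType asks the zero value of C for its name. This assumes commands
// are struct values with value receivers, as in this article.
func (h typedHandler[C]) CommandType() string {
	var zero C
	return zero.CommandType()
}

// Handle converts the untyped command back to C before calling the function.
func (h typedHandler[C]) Handle(ctx context.Context, cmd Command) (string, error) {
	typed, ok := cmd.(C)
	if !ok {
		return "", fmt.Errorf("handler for %s received %T", h.CommandType(), cmd)
	}
	return h.fn(ctx, typed)
}

type CreateOrderCommand struct{ CustomerID string }

func (c CreateOrderCommand) CommandType() string { return "CreateOrder" }
func (c CreateOrderCommand) Validate() error {
	if c.CustomerID == "" {
		return errors.New("customer ID required")
	}
	return nil
}

// CancelOrderCommand exists only to show a type mismatch.
type CancelOrderCommand struct{ OrderID string }

func (c CancelOrderCommand) CommandType() string { return "CancelOrder" }
func (c CancelOrderCommand) Validate() error     { return nil }

// createOrder is a handler built from a typed function.
var createOrder = NewTypedHandler(
	func(ctx context.Context, cmd CreateOrderCommand) (string, error) {
		return "order-for-" + cmd.CustomerID, nil
	})

func main() {
	ctx := context.Background()
	fmt.Println(createOrder.CommandType())
	fmt.Println(createOrder.Handle(ctx, CreateOrderCommand{CustomerID: "cust-123"}))
	fmt.Println(createOrder.Handle(ctx, CancelOrderCommand{OrderID: "order-1"}))
}
```

The type assertion in `Handle` is what lets the handler function work with its own concrete command type instead of the bare interface.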

Aggregate Handler

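An aggregate handler manages the full aggregate lifecycle automatically (the load and save around each command), so the executor only makes the domain call: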

handler := mink.NewAggregateHandler(mink.AggregateHandlerConfig[AddItemCommand, *Order]{
    Store:   store,
    Factory: NewOrder,
    Executor: func(ctx context.Context, order *Order, cmd AddItemCommand) error {
        return order.AddItem(cmd.SKU, cmd.Quantity, cmd.Price)
    },
})
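
The lifecycle that an aggregate handler takes off your hands can be pictured as load, execute, save. The sketch below spells those steps out under simplifying assumptions: `MemoryStore` is an in-memory map rather than an event store, `HandleAggregate` is a hypothetical helper, and `AddItem` takes no price. It is not mink's implementation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Order is a hypothetical aggregate, reduced to the state this sketch needs.
type Order struct {
	ID    string
	Items map[string]int // SKU -> quantity
}

func NewOrder(id string) *Order {
	return &Order{ID: id, Items: make(map[string]int)}
}

// AddItem validates first, so a rejected call leaves the order unchanged.
func (o *Order) AddItem(sku string, qty int) error {
	if qty <= 0 {
		return errors.New("quantity must be positive")
	}
	o.Items[sku] += qty
	return nil
}

// MemoryStore is an in-memory stand-in for an event store.
type MemoryStore struct {
	orders map[string]*Order
}

func NewMemoryStore() *MemoryStore {
	return &MemoryStore{orders: make(map[string]*Order)}
}

type AddItemCommand struct {
	OrderID  string
	SKU      string
	Quantity int
}

// HandleAggregate runs the lifecycle: load (or create), execute, save.
func HandleAggregate[C any](
	ctx context.Context,
	store *MemoryStore,
	id string,
	cmd C,
	exec func(context.Context, *Order, C) error,
) error {
	order, ok := store.orders[id]
	if !ok {
		order = NewOrder(id) // nothing stored yet: start from the factory
	}
	if err := exec(ctx, order, cmd); err != nil {
		return err // the save step is skipped when the executor fails
	}
	store.orders[id] = order
	return nil
}

// addItem is the executor: the only part that knows about the domain call.
func addItem(ctx context.Context, order *Order, cmd AddItemCommand) error {
	return order.AddItem(cmd.SKU, cmd.Quantity)
}

func main() {
	store := NewMemoryStore()
	cmd := AddItemCommand{OrderID: "order-1", SKU: "sku-1", Quantity: 2}
	if err := HandleAggregate(context.Background(), store, cmd.OrderID, cmd, addItem); err != nil {
		fmt.Println("command failed:", err)
		return
	}
	fmt.Println(store.orders["order-1"].Items["sku-1"]) // prints 2
}
```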

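The Command Bus

The bus finds the handler registered for a command and runs it through the configured middleware. createOrderHandler and addItemHandler are assumed to be handlers built as in the previous section: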

registry := mink.NewHandlerRegistry()
registry.Register(createOrderHandler)
registry.Register(addItemHandler)

bus := mink.NewCommandBus(
    mink.WithHandlerRegistry(registry),
    mink.WithMiddleware(
        mink.ValidationMiddleware(),
        mink.RecoveryMiddleware(),
    ),
)

result, err := bus.Dispatch(ctx, CreateOrderCommand{CustomerID: "cust-123"})
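
For intuition about what happens inside `Dispatch`, here is a small standard-library sketch of a bus with a handler map and a middleware chain. All names (`Bus`, `HandlerFunc`, `Middleware`, `Validation`, `Recovery`, `newDemoBus`) are hypothetical, and the demo handler panics on purpose for one customer ID. mink's real bus may work differently.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type Command interface {
	CommandType() string
	Validate() error
}

type HandlerFunc func(ctx context.Context, cmd Command) error
type Middleware func(next HandlerFunc) HandlerFunc

// Bus routes each command to the handler registered for its type.
type Bus struct {
	handlers   map[string]HandlerFunc
	middleware []Middleware
}

func NewBus(mw ...Middleware) *Bus {
	return &Bus{handlers: make(map[string]HandlerFunc), middleware: mw}
}

func (b *Bus) Register(commandType string, h HandlerFunc) {
	b.handlers[commandType] = h
}

// Dispatch finds the handler, wraps it in the middleware chain, and runs it.
func (b *Bus) Dispatch(ctx context.Context, cmd Command) error {
	h, ok := b.handlers[cmd.CommandType()]
	if !ok {
		return fmt.Errorf("no handler registered for %q", cmd.CommandType())
	}
	// Wrap in reverse so the first middleware listed runs outermost.
	for i := len(b.middleware) - 1; i >= 0; i-- {
		h = b.middleware[i](h)
	}
	return h(ctx, cmd)
}

// Validation rejects invalid commands before they reach the handler.
func Validation() Middleware {
	return func(next HandlerFunc) HandlerFunc {
		return func(ctx context.Context, cmd Command) error {
			if err := cmd.Validate(); err != nil {
				return fmt.Errorf("validation failed: %w", err)
			}
			return next(ctx, cmd)
		}
	}
}

// Recovery converts a panic in the handler into an ordinary error.
func Recovery() Middleware {
	return func(next HandlerFunc) HandlerFunc {
		return func(ctx context.Context, cmd Command) (err error) {
			defer func() {
				if r := recover(); r != nil {
					err = fmt.Errorf("handler panicked: %v", r)
				}
			}()
			return next(ctx, cmd)
		}
	}
}

type CreateOrderCommand struct{ CustomerID string }

func (c CreateOrderCommand) CommandType() string { return "CreateOrder" }
func (c CreateOrderCommand) Validate() error {
	if c.CustomerID == "" {
		return errors.New("customer ID required")
	}
	return nil
}

// newDemoBus registers one handler that panics for the customer "cust-panic".
func newDemoBus() *Bus {
	bus := NewBus(Validation(), Recovery())
	bus.Register("CreateOrder", func(ctx context.Context, cmd Command) error {
		if cmd.(CreateOrderCommand).CustomerID == "cust-panic" {
			panic("simulated bug")
		}
		return nil
	})
	return bus
}

func main() {
	bus := newDemoBus()
	ctx := context.Background()
	fmt.Println(bus.Dispatch(ctx, CreateOrderCommand{CustomerID: "cust-123"}))
	fmt.Println(bus.Dispatch(ctx, CreateOrderCommand{}))
	fmt.Println(bus.Dispatch(ctx, CreateOrderCommand{CustomerID: "cust-panic"}))
}
```

Because middleware wraps the handler, cross-cutting concerns such as validation and panic recovery stay out of the handlers themselves.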

Key Takeaways

  1. Commands are intentions: They express what you want to happen
  2. Queries are questions: They don’t change state
  3. Handlers execute commands: Keep them focused and simple
  4. Aggregate handlers reduce boilerplate: Automatic load/save lifecycle
  5. Validation belongs in commands: Self-validating commands are cleaner
