Skip to main content

Part 5: CQRS and the Command Bus

Separating reads from writes with Command Query Responsibility Segregation.


This is Part 5 of an 8-part series on Event Sourcing and CQRS with Go.


What is CQRS?

CQRS (Command Query Responsibility Segregation) separates read and write operations into different models:

Commands (Write) Queries (Read)
│ │
▼ ▼
┌───────────┐ ┌─────────────┐
│ Command │ │ Query │
│ Handlers │ │ Handlers │
└─────┬─────┘ └──────┬──────┘
│ │
▼ ▼
┌───────────┐ ┌─────────────┐
│ Event │ ───────► │ Read │
│ Store │ (Projections)│ Model │
└───────────┘ └─────────────┘

Why Separate?

  1. Different optimization needs: Writes need consistency; reads need speed
  2. Different scaling requirements: Reads are often 100x more frequent
  3. Independent evolution: Change read model without affecting writes

Commands vs Queries

Commands

Commands are intentions to change state:

type PlaceOrderCommand struct {
CustomerID string
Items []OrderItem
}

func (c PlaceOrderCommand) CommandType() string { return "PlaceOrder" }
func (c PlaceOrderCommand) Validate() error {
if c.CustomerID == "" {
return errors.New("customer ID required")
}
return nil
}

The Golden Rule

Commands: "Do this" -> Returns success/failure Queries: "Give me this" -> Returns data


The Command Interface

type Command interface {
CommandType() string // Unique type identifier
Validate() error // Self-validation
}

Aggregate Command

type AggregateCommand interface {
Command
AggregateID() string // Which aggregate to modify
}

type AddItemCommand struct {
OrderID string
SKU string
Quantity int
}

func (c AddItemCommand) CommandType() string { return "AddItem" }
func (c AddItemCommand) AggregateID() string { return c.OrderID }

Command Handlers

Generic Handler

handler := mink.NewGenericHandler(
func(ctx context.Context, cmd CreateOrderCommand) (mink.CommandResult, error) {
order := NewOrder(uuid.New().String())
if err := order.Create(cmd.CustomerID); err != nil {
return mink.NewErrorResult(err), err
}
if err := store.SaveAggregate(ctx, order); err != nil {
return mink.NewErrorResult(err), err
}
return mink.NewSuccessResult(order.AggregateID(), order.Version()), nil
})

Aggregate Handler

Handles the full aggregate lifecycle automatically:

handler := mink.NewAggregateHandler(mink.AggregateHandlerConfig[AddItemCommand, *Order]{
Store: store,
Factory: NewOrder,
Executor: func(ctx context.Context, order *Order, cmd AddItemCommand) error {
return order.AddItem(cmd.SKU, cmd.Quantity, cmd.Price)
},
})

The Command Bus

registry := mink.NewHandlerRegistry()
registry.Register(createOrderHandler)
registry.Register(addItemHandler)

bus := mink.NewCommandBus(
mink.WithHandlerRegistry(registry),
mink.WithMiddleware(
mink.ValidationMiddleware(),
mink.RecoveryMiddleware(),
),
)

result, err := bus.Dispatch(ctx, CreateOrderCommand{CustomerID: "cust-123"})

Key Takeaways

tip
  1. Commands are intentions: They express what you want to happen
  2. Queries are questions: They don't change state
  3. Handlers execute commands: Keep them focused and simple
  4. Aggregate handlers reduce boilerplate: Automatic load/save lifecycle
  5. Validation belongs in commands: Self-validating commands are cleaner

<-- Part 4: Event Store | Part 6: Middleware -->