Part 5: CQRS and the Command Bus
Separating reads from writes with Command Query Responsibility Segregation.
Table of contents
- What is CQRS?
- Commands vs Queries
- The Command Interface
- Command Handlers
- The Command Bus
- Key Takeaways
This is Part 5 of an 8-part series on Event Sourcing and CQRS with Go.
What is CQRS?
CQRS (Command Query Responsibility Segregation) separates read and write operations into different models:
Commands (Write)           Queries (Read)
      │                           │
      ▼                           ▼
┌───────────┐              ┌─────────────┐
│  Command  │              │    Query    │
│  Handlers │              │   Handlers  │
└─────┬─────┘              └──────┬──────┘
      │                           │
      ▼                           ▼
┌───────────┐              ┌─────────────┐
│   Event   │ ───────────► │    Read     │
│   Store   │ (Projections)│    Model    │
└───────────┘              └─────────────┘
Why Separate?
- Different optimization needs: Writes need consistency; reads need speed
- Different scaling requirements: Reads are often far more frequent than writes (on the order of 100x in read-heavy systems), so each side can scale on its own
- Independent evolution: Change read model without affecting writes
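To make the split concrete, here is a minimal sketch of a write side that only appends events and a read model that is rebuilt from them. The names `OrderPlaced`, `EventLog`, and `OrdersByCustomer` are hypothetical and are not part of mink; a real system would use an event store and a projection engine instead of a slice and a map.

```go
package main

import "fmt"

// OrderPlaced is a hypothetical event emitted by the write side.
type OrderPlaced struct {
	OrderID    string
	CustomerID string
}

// EventLog stands in for the event store: the write side only appends.
type EventLog struct {
	events []OrderPlaced
}

func (l *EventLog) Append(e OrderPlaced) { l.events = append(l.events, e) }

// OrdersByCustomer is a read model shaped for one question:
// "how many orders has this customer placed?"
// It is built from events and never written to directly.
type OrdersByCustomer struct {
	counts map[string]int
}

func NewOrdersByCustomer() *OrdersByCustomer {
	return &OrdersByCustomer{counts: make(map[string]int)}
}

// Project applies one event to the read model.
func (r *OrdersByCustomer) Project(e OrderPlaced) { r.counts[e.CustomerID]++ }

// Count answers the query without touching the event log.
func (r *OrdersByCustomer) Count(customerID string) int { return r.counts[customerID] }

func main() {
	log := &EventLog{}
	log.Append(OrderPlaced{OrderID: "order-1", CustomerID: "cust-123"})
	log.Append(OrderPlaced{OrderID: "order-2", CustomerID: "cust-123"})

	// The projection replays events into the read model.
	view := NewOrdersByCustomer()
	for _, e := range log.events {
		view.Project(e)
	}
	fmt.Println(view.Count("cust-123")) // prints 2
}
```

Because the read model is derived data, it can be dropped and rebuilt with a different shape without changing how events are written.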
Commands vs Queries
Commands
Commands are intentions to change state:
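import "errors"

// PlaceOrderCommand expresses the intent to place a new order.
type PlaceOrderCommand struct {
	CustomerID string
	Items      []OrderItem // OrderItem is not shown in this part
}

// CommandType returns the identifier used to route the command to its handler.
func (c PlaceOrderCommand) CommandType() string { return "PlaceOrder" }

// Validate checks the command's own fields before it reaches a handler.
func (c PlaceOrderCommand) Validate() error {
	if c.CustomerID == "" {
		return errors.New("customer ID required")
	}
	return nil
}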
The Golden Rule
Commands: “Do this” → Returns success/failure
Queries: “Give me this” → Returns data
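The section shows a command but no query, so here is a sketch of what the query side of the rule might look like. `GetOrderQuery`, `OrderView`, `OrderReadModel`, and `HandleGetOrder` are hypothetical names chosen for illustration, and the map stands in for whatever store backs the read model.

```go
package main

import (
	"errors"
	"fmt"
)

// GetOrderQuery is a hypothetical query: it names the data wanted
// and carries no intent to change anything.
type GetOrderQuery struct {
	OrderID string
}

// OrderView is a hypothetical read-model row shaped for display.
type OrderView struct {
	OrderID   string
	ItemCount int
}

// OrderReadModel stands in for whatever store backs the read side.
type OrderReadModel map[string]OrderView

var ErrOrderNotFound = errors.New("order not found")

// HandleGetOrder returns data and leaves the read model untouched.
func HandleGetOrder(rm OrderReadModel, q GetOrderQuery) (OrderView, error) {
	view, ok := rm[q.OrderID]
	if !ok {
		return OrderView{}, ErrOrderNotFound
	}
	return view, nil
}

func main() {
	rm := OrderReadModel{"order-1": {OrderID: "order-1", ItemCount: 3}}
	view, err := HandleGetOrder(rm, GetOrderQuery{OrderID: "order-1"})
	fmt.Println(view.ItemCount, err) // prints 3 <nil>
}
```

Unlike a command handler, the query handler returns a value shaped for the caller, and calling it twice gives the same answer.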
The Command Interface
type Command interface {
	CommandType() string // Unique type identifier
	Validate() error     // Self-validation
}
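As a usage sketch, any code that receives a `Command` can validate it without knowing its concrete type. The `Check` helper below is hypothetical, and `PlaceOrderCommand` is trimmed to one field to keep the example self-contained. The blank-identifier assignment is a common Go idiom for verifying at compile time that a type satisfies an interface.

```go
package main

import (
	"errors"
	"fmt"
)

type Command interface {
	CommandType() string
	Validate() error
}

// PlaceOrderCommand is trimmed to a single field for this sketch.
type PlaceOrderCommand struct {
	CustomerID string
}

func (c PlaceOrderCommand) CommandType() string { return "PlaceOrder" }

func (c PlaceOrderCommand) Validate() error {
	if c.CustomerID == "" {
		return errors.New("customer ID required")
	}
	return nil
}

// Compile-time check: the build fails if PlaceOrderCommand
// stops satisfying Command.
var _ Command = PlaceOrderCommand{}

// Check is a hypothetical helper that validates any command and
// prefixes the error with the command type.
func Check(cmd Command) error {
	if err := cmd.Validate(); err != nil {
		return fmt.Errorf("%s: %w", cmd.CommandType(), err)
	}
	return nil
}

func main() {
	fmt.Println(Check(PlaceOrderCommand{}))                       // prints PlaceOrder: customer ID required
	fmt.Println(Check(PlaceOrderCommand{CustomerID: "cust-123"})) // prints <nil>
}
```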
Aggregate Command
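type AggregateCommand interface {
	Command
	AggregateID() string // Which aggregate to modify
}

type AddItemCommand struct {
	OrderID  string
	SKU      string
	Quantity int
	Price    float64 // read by the aggregate handler later on; float64 is an assumed type
}

func (c AddItemCommand) CommandType() string { return "AddItem" }
func (c AddItemCommand) AggregateID() string { return c.OrderID }

// Validate is required because AggregateCommand embeds Command.
func (c AddItemCommand) Validate() error {
	if c.Quantity <= 0 { // assumed rule, for illustration
		return errors.New("quantity must be positive")
	}
	return nil
}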
Command Handlers
Generic Handler
handler := mink.NewGenericHandler(
	func(ctx context.Context, cmd CreateOrderCommand) (mink.CommandResult, error) {
		// Create a new aggregate with a fresh ID and apply the command to it.
		order := NewOrder(uuid.New().String())
		if err := order.Create(cmd.CustomerID); err != nil {
			return mink.NewErrorResult(err), err
		}
		// Persist the aggregate's new events.
		if err := store.SaveAggregate(ctx, order); err != nil {
			return mink.NewErrorResult(err), err
		}
		return mink.NewSuccessResult(order.AggregateID(), order.Version()), nil
	})
Aggregate Handler
Handles the full aggregate lifecycle automatically:
handler := mink.NewAggregateHandler(mink.AggregateHandlerConfig[AddItemCommand, *Order]{
	Store:   store,
	Factory: NewOrder,
	Executor: func(ctx context.Context, order *Order, cmd AddItemCommand) error {
		return order.AddItem(cmd.SKU, cmd.Quantity, cmd.Price)
	},
})
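The "full aggregate lifecycle" is the load, execute, save sequence that would otherwise be repeated in every handler. The sketch below spells those three steps out by hand. It is not mink's implementation: `MemoryStore`, `HandleAddItem`, and the pared-down `Order` are hypothetical stand-ins, and `context.Context` and the price are omitted for brevity.

```go
package main

import (
	"errors"
	"fmt"
)

// Order is a pared-down aggregate for illustration.
type Order struct {
	ID      string
	Items   map[string]int // SKU -> quantity
	Version int
}

// AddItem enforces the aggregate's rule and bumps the version on success.
func (o *Order) AddItem(sku string, qty int) error {
	if qty <= 0 {
		return errors.New("quantity must be positive")
	}
	o.Items[sku] += qty
	o.Version++
	return nil
}

// MemoryStore is a hypothetical stand-in for an event store.
type MemoryStore struct {
	orders map[string]*Order
}

// Load returns the stored order, or a fresh one if none exists yet.
func (s *MemoryStore) Load(id string) *Order {
	if o, ok := s.orders[id]; ok {
		return o
	}
	return &Order{ID: id, Items: map[string]int{}}
}

func (s *MemoryStore) Save(o *Order) { s.orders[o.ID] = o }

type AddItemCommand struct {
	OrderID  string
	SKU      string
	Quantity int
}

// HandleAddItem performs the three lifecycle steps explicitly.
func HandleAddItem(s *MemoryStore, cmd AddItemCommand) error {
	order := s.Load(cmd.OrderID) // 1. load
	if err := order.AddItem(cmd.SKU, cmd.Quantity); err != nil {
		return err // 2. execute failed, so nothing is saved
	}
	s.Save(order) // 3. save
	return nil
}

func main() {
	s := &MemoryStore{orders: map[string]*Order{}}
	err := HandleAddItem(s, AddItemCommand{OrderID: "order-1", SKU: "SKU-1", Quantity: 2})
	fmt.Println(err, s.Load("order-1").Items["SKU-1"]) // prints <nil> 2
}
```

With an aggregate handler, only the middle step remains in application code, which is what the `Executor` function expresses.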
The Command Bus
// Register one handler per command type.
registry := mink.NewHandlerRegistry()
registry.Register(createOrderHandler)
registry.Register(addItemHandler)

// Build the bus; middleware wraps every dispatched command.
bus := mink.NewCommandBus(
	mink.WithHandlerRegistry(registry),
	mink.WithMiddleware(
		mink.ValidationMiddleware(),
		mink.RecoveryMiddleware(),
	),
)

// Dispatch routes the command to its registered handler.
result, err := bus.Dispatch(ctx, CreateOrderCommand{CustomerID: "cust-123"})
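To show what a bus with middleware does conceptually, here is a minimal sketch that routes by `CommandType()` and wraps the handler in a validation step and a panic-recovery step. It is not mink's implementation: `Bus`, `HandlerFunc`, `Middleware`, `Validation`, and `Recovery` are hypothetical names, and `context.Context` is left out to keep the example short.

```go
package main

import (
	"errors"
	"fmt"
)

type Command interface {
	CommandType() string
	Validate() error
}

// HandlerFunc processes one command.
type HandlerFunc func(cmd Command) error

// Middleware wraps a handler with cross-cutting behaviour.
type Middleware func(next HandlerFunc) HandlerFunc

// Bus is a hypothetical, minimal command bus keyed by command type.
type Bus struct {
	handlers   map[string]HandlerFunc
	middleware []Middleware
}

func NewBus(mw ...Middleware) *Bus {
	return &Bus{handlers: map[string]HandlerFunc{}, middleware: mw}
}

func (b *Bus) Register(commandType string, h HandlerFunc) { b.handlers[commandType] = h }

// Dispatch finds the handler and runs it inside the middleware chain.
func (b *Bus) Dispatch(cmd Command) error {
	h, ok := b.handlers[cmd.CommandType()]
	if !ok {
		return fmt.Errorf("no handler for %q", cmd.CommandType())
	}
	// Wrap in reverse so the first middleware listed runs first.
	for i := len(b.middleware) - 1; i >= 0; i-- {
		h = b.middleware[i](h)
	}
	return h(cmd)
}

// Validation rejects invalid commands before they reach the handler.
func Validation(next HandlerFunc) HandlerFunc {
	return func(cmd Command) error {
		if err := cmd.Validate(); err != nil {
			return err
		}
		return next(cmd)
	}
}

// Recovery converts a handler panic into an error.
func Recovery(next HandlerFunc) HandlerFunc {
	return func(cmd Command) (err error) {
		defer func() {
			if r := recover(); r != nil {
				err = fmt.Errorf("panic in handler: %v", r)
			}
		}()
		return next(cmd)
	}
}

type CreateOrderCommand struct{ CustomerID string }

func (c CreateOrderCommand) CommandType() string { return "CreateOrder" }

func (c CreateOrderCommand) Validate() error {
	if c.CustomerID == "" {
		return errors.New("customer ID required")
	}
	return nil
}

func main() {
	bus := NewBus(Validation, Recovery)
	bus.Register("CreateOrder", func(cmd Command) error { return nil })
	fmt.Println(bus.Dispatch(CreateOrderCommand{})) // prints customer ID required
}
```

Keeping validation and recovery in middleware means individual handlers contain only business logic, and new cross-cutting behaviour (logging, metrics) can be added in one place.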
Key Takeaways
- Commands are intentions: They express what you want to happen
- Queries are questions: They don’t change state
- Handlers execute commands: Keep them focused and simple
- Aggregate handlers reduce boilerplate: Automatic load/save lifecycle
- Validation belongs in commands: Self-validating commands are cleaner