Part 7: Projections and Read Models
Building optimized query views from your event stream.
Table of contents
- The Read Side Problem
- Projection Types
- Inline Projections
- Async Projections
- Live Projections
- The Projection Engine
- Best Practices
- Key Takeaways
This is Part 7 of an 8-part series on Event Sourcing and CQRS with Go.
The Read Side Problem
Event sourcing stores events, not state. But users need fast queries. We build read models—denormalized views optimized for specific queries.
Events in Event Store
│
▼
┌─────────────┐
│ Projection │ ──────► Read Model (SQL, Redis, Elastic)
└─────────────┘
Projection Types
| Type | Consistency | Performance | Use Case |
|---|---|---|---|
| Inline | Strong | Lower throughput | Counters, caches |
| Async | Eventual | High throughput | Read databases |
| Live | Real-time | Non-blocking | Notifications |
Inline Projections
Run synchronously during event append:
type OrderCountProjection struct {
mink.ProjectionBase
count int64
}
func (p *OrderCountProjection) Apply(ctx context.Context, event mink.StoredEvent) error {
if event.Type == "OrderCreated" {
p.count++
}
return nil
}
engine.RegisterInline(NewOrderCountProjection())
Async Projections
Run in background workers with batching:
type OrderListProjection struct {
mink.ProjectionBase
db *sql.DB
}
func (p *OrderListProjection) Apply(ctx context.Context, event mink.StoredEvent) error {
switch event.Type {
case "OrderCreated":
var data OrderCreated
json.Unmarshal(event.Data, &data)
p.db.Exec(`INSERT INTO order_list ...`, data.OrderID, data.CustomerID)
}
return nil
}
engine.RegisterAsync(projection, mink.AsyncOptions{
BatchSize: 100,
PollInterval: 100 * time.Millisecond,
})
Live Projections
Real-time for transient use cases:
type OrderNotificationProjection struct {
mink.ProjectionBase
broadcast chan<- OrderUpdate
}
func (p *OrderNotificationProjection) OnEvent(ctx context.Context, event mink.StoredEvent) {
p.broadcast <- OrderUpdate{OrderID: extractID(event.StreamID)}
}
func (p *OrderNotificationProjection) IsTransient() bool {
return true
}
The Projection Engine
engine := mink.NewProjectionEngine(store,
mink.WithCheckpointStore(checkpointStore),
)
engine.RegisterInline(counterProjection)
engine.RegisterAsync(readModelProjection, asyncOptions)
engine.RegisterLive(notificationProjection)
engine.Start(ctx)
defer engine.Stop(ctx)
Best Practices
1. Idempotent Projections
// Use UPSERT instead of INSERT
p.db.Exec(`INSERT INTO order_list (order_id, status)
VALUES ($1, $2)
ON CONFLICT (order_id) DO UPDATE SET status = $2`,
orderID, status)
2. Handle Unknown Events
switch event.Type {
case "OrderCreated":
// Handle
default:
return nil // Ignore, don't fail
}
3. Monitor Lag
status := engine.GetStatus("OrderList")
if status.Lag > 10000 {
alerting.Warn("Projection lag: %d", status.Lag)
}
Key Takeaways
- Projections build read models: Transform events into queryable data
- Three types for different needs: Inline, Async, Live
- Checkpoints enable restart: Know where you left off
- Idempotency is crucial: Events may be replayed
- Multiple read models are normal: Different queries, different models