ADR-005: Three Projection Types
| Status | Date | Deciders |
|---|---|---|
| Accepted | 2024-03-01 | Core Team |
Context
Projections transform event streams into read models. Different use cases have different consistency and performance requirements:
- Financial Systems: Need strong consistency (balance must be accurate)
- Reports/Analytics: Can tolerate delay (eventual consistency is fine)
- Dashboards: Need real-time updates (live notifications)
A one-size-fits-all projection approach forces unnecessary trade-offs.
Decision
We will implement three distinct projection types, each optimized for different consistency requirements:
1. Inline Projections (Synchronous)
Updated in the same transaction as the event append.
type InlineProjection interface {
Projection
// Apply is called within the event store transaction
ApplyInline(ctx context.Context, tx Transaction, event StoredEvent) error
}
Characteristics:
- Strong consistency
- Blocks event append until projection updated
- Lower write throughput
- Use for critical read models
Use Cases:
- Account balances
- Inventory counts
- Unique constraint checks
engine.RegisterInline(&AccountBalanceProjection{})
2. Async Projections (Background)
Updated by background workers after events are committed.
type AsyncProjection interface {
Projection
// Apply is called by background worker
Apply(ctx context.Context, event StoredEvent) error
}
type AsyncOptions struct {
BatchSize int // Events per batch
Interval time.Duration // Polling interval
Workers int // Parallel workers
RetryPolicy RetryPolicy // Retry on failure
}
Characteristics:
- Eventual consistency
- High write throughput
- Parallel processing
- Automatic retry on failure
Use Cases:
- Search indexes
- Reports
- Analytics
- Notifications
engine.RegisterAsync(&SearchIndexProjection{}, AsyncOptions{
BatchSize: 100,
Interval: time.Second,
Workers: 4,
RetryPolicy: ExponentialBackoff(100*time.Millisecond, 5*time.Second, 3),
})
3. Live Projections (Real-time)
Receive events in real-time for transient use cases.
type LiveProjection interface {
Projection
// OnEvent is called immediately when event is committed
OnEvent(ctx context.Context, event StoredEvent)
// IsTransient indicates if state should survive restarts
IsTransient() bool
}
Characteristics:
- Real-time delivery
- No persistence (transient)
- WebSocket/SSE integration
- Memory-only state
Use Cases:
- Live dashboards
- Real-time notifications
- WebSocket updates
- Monitoring
engine.RegisterLive(&DashboardProjection{})
// Access live updates
dashboard.Updates() // Returns channel of updates
Comparison
| Aspect | Inline | Async | Live |
|---|---|---|---|
| Consistency | Strong | Eventual | Real-time |
| Persistence | Yes | Yes | No (transient) |
| Write Impact | High | None | None |
| Recovery | Automatic | Checkpoint-based | Rebuild |
| Use Case | Critical data | Reports | Dashboards |
Consequences
Positive
- Right Tool for Job: Choose consistency level per use case
- Performance Optimization: Async projections don’t slow writes
- Real-time Capability: Live projections enable dashboards
- Clear Semantics: Explicit consistency expectations
- Scalability: Async projections can scale independently
Negative
- Complexity: Three types to understand and manage
- Consistency Confusion: Developers must choose correctly
- Testing Variety: Need to test each projection type
Neutral
- Migration: Can change projection types if needs change
- Monitoring: Each type needs different monitoring
Implementation Details
Projection Engine
type ProjectionEngine struct {
eventStore *EventStore
checkpointStore CheckpointStore
inlineProjections []InlineProjection
asyncProjections map[string]*asyncRunner
liveProjections []LiveProjection
subscriptions []Subscription
}
func (e *ProjectionEngine) Start(ctx context.Context) error {
// Start async workers
for name, runner := range e.asyncProjections {
go runner.Run(ctx)
}
// Start live subscriptions
for _, proj := range e.liveProjections {
sub, _ := e.eventStore.SubscribeAll(ctx, 0)
go e.runLiveProjection(ctx, proj, sub)
}
return nil
}
// Called by EventStore during Append
func (e *ProjectionEngine) ProcessInline(ctx context.Context, tx Transaction, events []StoredEvent) error {
for _, proj := range e.inlineProjections {
for _, event := range events {
if proj.HandlesEvent(event.Type) {
if err := proj.ApplyInline(ctx, tx, event); err != nil {
return err
}
}
}
}
return nil
}
Checkpoint Management
Async projections track their position:
type Checkpoint struct {
ProjectionName string
Position uint64
UpdatedAt time.Time
}
type CheckpointStore interface {
Get(ctx context.Context, name string) (Checkpoint, error)
Save(ctx context.Context, checkpoint Checkpoint) error
}
Rebuild Support
All persistent projections support rebuilding:
rebuilder := NewProjectionRebuilder(store, checkpointStore)
// Rebuild from scratch
rebuilder.Rebuild(ctx, projection, RebuildOptions{
BatchSize: 1000,
Parallelism: 4,
FromPosition: 0,
})
Alternatives Considered
Alternative 1: Single Projection Type
Description: One projection interface for all use cases.
Rejected because:
- Forces compromise between consistency and performance
- No real-time capability
- Overly complex configuration
Alternative 2: Configurable Consistency per Event
Description: Configure consistency at event level.
Rejected because:
- Too granular
- Hard to reason about
- Complicates projection logic
Alternative 3: External Projection Service
Description: Run projections in separate service.
Rejected because:
- Adds deployment complexity
- Network latency for inline projections
- Can be added later if needed