Skip to main content

ADR-005: Three Projection Types

StatusDateDeciders
Accepted2024-03-01Core Team

Context

Projections transform event streams into read models. Different use cases have different consistency and performance requirements:

  1. Financial Systems: Need strong consistency (balance must be accurate)
  2. Reports/Analytics: Can tolerate delay (eventual consistency is fine)
  3. Dashboards: Need real-time updates (live notifications)

A one-size-fits-all projection approach forces unnecessary trade-offs.

Decision

We will implement three distinct projection types, each optimized for different consistency requirements:

1. Inline Projections (Synchronous)

Updated in the same transaction as the event append.

type InlineProjection interface {
Projection
// Apply is called within the event store transaction
ApplyInline(ctx context.Context, tx Transaction, event StoredEvent) error
}

Characteristics:

  • Strong consistency
  • Blocks event append until projection updated
  • Lower write throughput
  • Use for critical read models

Use Cases:

  • Account balances
  • Inventory counts
  • Unique constraint checks
engine.RegisterInline(&AccountBalanceProjection{})

2. Async Projections (Background)

Updated by background workers after events are committed.

type AsyncProjection interface {
Projection
// Apply is called by background worker
Apply(ctx context.Context, event StoredEvent) error
}

type AsyncOptions struct {
BatchSize int // Events per batch
Interval time.Duration // Polling interval
Workers int // Parallel workers
RetryPolicy RetryPolicy // Retry on failure
}

Characteristics:

  • Eventual consistency
  • High write throughput
  • Parallel processing
  • Automatic retry on failure

Use Cases:

  • Search indexes
  • Reports
  • Analytics
  • Notifications
engine.RegisterAsync(&SearchIndexProjection{}, AsyncOptions{
BatchSize: 100,
Interval: time.Second,
Workers: 4,
RetryPolicy: ExponentialBackoff(100*time.Millisecond, 5*time.Second, 3),
})

3. Live Projections (Real-time)

Receive events in real-time for transient use cases.

type LiveProjection interface {
Projection
// OnEvent is called immediately when event is committed
OnEvent(ctx context.Context, event StoredEvent)
// IsTransient indicates if state should survive restarts
IsTransient() bool
}

Characteristics:

  • Real-time delivery
  • No persistence (transient)
  • WebSocket/SSE integration
  • Memory-only state

Use Cases:

  • Live dashboards
  • Real-time notifications
  • WebSocket updates
  • Monitoring
engine.RegisterLive(&DashboardProjection{})

// Access live updates
dashboard.Updates() // Returns channel of updates

Comparison

AspectInlineAsyncLive
ConsistencyStrongEventualReal-time
PersistenceYesYesNo (transient)
Write ImpactHighNoneNone
RecoveryAutomaticCheckpoint-basedRebuild
Use CaseCritical dataReportsDashboards

Consequences

Positive

  1. Right Tool for Job: Choose consistency level per use case
  2. Performance Optimization: Async projections don't slow writes
  3. Real-time Capability: Live projections enable dashboards
  4. Clear Semantics: Explicit consistency expectations
  5. Scalability: Async projections can scale independently

Negative

  1. Complexity: Three types to understand and manage
  2. Consistency Confusion: Developers must choose correctly
  3. Testing Variety: Need to test each projection type

Neutral

  1. Migration: Can change projection types if needs change
  2. Monitoring: Each type needs different monitoring

Implementation Details

Projection Engine

type ProjectionEngine struct {
eventStore *EventStore
checkpointStore CheckpointStore

inlineProjections []InlineProjection
asyncProjections map[string]*asyncRunner
liveProjections []LiveProjection

subscriptions []Subscription
}

func (e *ProjectionEngine) Start(ctx context.Context) error {
// Start async workers
for name, runner := range e.asyncProjections {
go runner.Run(ctx)
}

// Start live subscriptions
for _, proj := range e.liveProjections {
sub, _ := e.eventStore.SubscribeAll(ctx, 0)
go e.runLiveProjection(ctx, proj, sub)
}

return nil
}

// Called by EventStore during Append
func (e *ProjectionEngine) ProcessInline(ctx context.Context, tx Transaction, events []StoredEvent) error {
for _, proj := range e.inlineProjections {
for _, event := range events {
if proj.HandlesEvent(event.Type) {
if err := proj.ApplyInline(ctx, tx, event); err != nil {
return err
}
}
}
}
return nil
}

Checkpoint Management

Async projections track their position:

type Checkpoint struct {
ProjectionName string
Position uint64
UpdatedAt time.Time
}

type CheckpointStore interface {
Get(ctx context.Context, name string) (Checkpoint, error)
Save(ctx context.Context, checkpoint Checkpoint) error
}

Rebuild Support

All persistent projections support rebuilding:

rebuilder := NewProjectionRebuilder(store, checkpointStore)

// Rebuild from scratch
rebuilder.Rebuild(ctx, projection, RebuildOptions{
BatchSize: 1000,
Parallelism: 4,
FromPosition: 0,
})

Alternatives Considered

Alternative 1: Single Projection Type

Description: One projection interface for all use cases.

Rejected because:

  • Forces compromise between consistency and performance
  • No real-time capability
  • Overly complex configuration

Alternative 2: Configurable Consistency per Event

Description: Configure consistency at event level.

Rejected because:

  • Too granular
  • Hard to reason about
  • Complicates projection logic

Alternative 3: External Projection Service

Description: Run projections in separate service.

Rejected because:

  • Adds deployment complexity
  • Network latency for inline projections
  • Can be added later if needed

References