Read Models & Projections
Table of contents
- Overview
- Projection Interfaces
- Projection Engine
- Read Model Repository
- Subscription System
- Projection Rebuilding
- Retry Policy
- Checkpoint Storage
- Complete Example
Overview
Projections transform events into optimized read models. go-mink supports three projection strategies:
Events Stream Read Models
┌─────────────┐
│ OrderCreated│──┐ ┌─────────────────┐
├─────────────┤ │ Inline │ OrderSummary │
│ ItemAdded │──┼─────────────►│ (same tx) │
├─────────────┤ │ └─────────────────┘
│ ItemAdded │──┤
├─────────────┤ │ ┌─────────────────┐
│ OrderShipped│──┼──────────────│ ShippingReport │
└─────────────┘ │ Async │ (background) │
│ └─────────────────┘
│
│ ┌─────────────────┐
└──────────────│ LiveDashboard │
Live │ (real-time) │
└─────────────────┘
Projection Interfaces
Base Projection Interface
All projections implement the base Projection interface:
type Projection interface {
// Name returns a unique identifier for this projection
Name() string
// HandledEvents returns the list of event types this projection handles
HandledEvents() []string
// Apply processes a single event
Apply(ctx context.Context, event StoredEvent) error
}
1. Inline Projections
Updated synchronously when events are appended - strongly consistent.
type InlineProjection interface {
Projection
// Inline projections are processed in the same execution context
}
// Example: Order Summary projection using ProjectionBase
type OrderSummaryProjection struct {
mink.ProjectionBase // Embeds name and handled events
repo *mink.InMemoryRepository[OrderSummary]
}
func NewOrderSummaryProjection(repo *mink.InMemoryRepository[OrderSummary]) *OrderSummaryProjection {
return &OrderSummaryProjection{
ProjectionBase: mink.NewProjectionBase("OrderSummary",
"OrderCreated", "ItemAdded", "OrderShipped"),
repo: repo,
}
}
func (p *OrderSummaryProjection) Apply(ctx context.Context, event mink.StoredEvent) error {
switch event.Type {
case "OrderCreated":
var e OrderCreated
if err := json.Unmarshal(event.Data, &e); err != nil {
return err
}
return p.repo.Insert(ctx, &OrderSummary{
OrderID: e.OrderID,
CustomerID: e.CustomerID,
Status: "Created",
CreatedAt: e.CreatedAt,
})
case "ItemAdded":
var e ItemAdded
if err := json.Unmarshal(event.Data, &e); err != nil {
return err
}
return p.repo.Update(ctx, e.OrderID, func(s *OrderSummary) {
s.ItemCount += e.Quantity
s.TotalAmount += e.Price * float64(e.Quantity)
})
case "OrderShipped":
var e OrderShipped
if err := json.Unmarshal(event.Data, &e); err != nil {
return err
}
return p.repo.Update(ctx, e.OrderID, func(s *OrderSummary) {
s.Status = "Shipped"
s.ShippedAt = &e.ShippedAt
})
}
return nil
}
2. Async Projections
Processed in the background - eventually consistent but more scalable.
type AsyncProjection interface {
Projection
// Batch processing for efficiency
ApplyBatch(ctx context.Context, events []StoredEvent) error
// Batch configuration
BatchSize() int
}
// Example using AsyncProjectionBase
type AnalyticsProjection struct {
mink.AsyncProjectionBase
db *sql.DB
}
func NewAnalyticsProjection(db *sql.DB) *AnalyticsProjection {
return &AnalyticsProjection{
AsyncProjectionBase: mink.NewAsyncProjectionBase(
"Analytics",
100, // batch size
"OrderCreated", "OrderCompleted",
),
db: db,
}
}
func (p *AnalyticsProjection) ApplyBatch(ctx context.Context, events []mink.StoredEvent) error {
tx, _ := p.db.BeginTx(ctx, nil)
defer tx.Rollback()
for _, event := range events {
switch event.Type {
case "OrderCreated":
tx.Exec(`
INSERT INTO daily_stats (date, order_count)
VALUES ($1, 1)
ON CONFLICT (date) DO UPDATE
SET order_count = daily_stats.order_count + 1
`, event.Timestamp.Truncate(24*time.Hour))
}
}
return tx.Commit()
}
3. Live Projections
Real-time subscriptions for dashboards and notifications.
type LiveProjection interface {
Projection
// Called for each event in real-time
OnEvent(ctx context.Context, event StoredEvent)
}
// Example using LiveProjectionBase
type DashboardProjection struct {
mink.LiveProjectionBase
}
func NewDashboardProjection() *DashboardProjection {
return &DashboardProjection{
LiveProjectionBase: mink.NewLiveProjectionBase(
"Dashboard",
"OrderCreated", "OrderShipped",
),
}
}
func (p *DashboardProjection) OnEvent(ctx context.Context, event mink.StoredEvent) {
p.Send(fmt.Sprintf("Event %s on stream %s", event.Type, event.StreamID))
}
// Consume updates
func (p *DashboardProjection) Updates() <-chan string {
return p.LiveProjectionBase.Updates()
}
Projection Engine
The ProjectionEngine orchestrates all projection types:
// Create checkpoint store for async projections
checkpointStore := memory.NewCheckpointStore()
// Create projection engine
engine := mink.NewProjectionEngine(store,
mink.WithCheckpointStore(checkpointStore),
)
// Register inline projection (synchronous)
summaryProjection := NewOrderSummaryProjection(repo)
if err := engine.RegisterInline(summaryProjection); err != nil {
log.Fatal(err)
}
// Register async projection (background)
analyticsProjection := NewAnalyticsProjection(db)
if err := engine.RegisterAsync(analyticsProjection, mink.AsyncOptions{
BatchSize: 100,
Interval: time.Second,
Workers: 4,
RetryPolicy: mink.NewExponentialBackoffRetry(100*time.Millisecond, 5*time.Second, 3),
}); err != nil {
log.Fatal(err)
}
// Register live projection (real-time)
dashboardProjection := NewDashboardProjection()
if err := engine.RegisterLive(dashboardProjection); err != nil {
log.Fatal(err)
}
// Start the engine
if err := engine.Start(ctx); err != nil {
log.Fatal(err)
}
defer engine.Stop(ctx)
// Process events through projections
events, _ := store.LoadRaw(ctx, streamID, 0)
engine.ProcessInlineProjections(ctx, events)
engine.NotifyLiveProjections(ctx, events)
Projection Status
Monitor projection health:
// Get single projection status
status, err := engine.GetStatus("OrderSummary")
fmt.Printf("State: %s, Position: %d, Lag: %d\n",
status.State, status.Position, status.Lag)
// Get all projection statuses
statuses := engine.GetAllStatuses()
for name, status := range statuses {
fmt.Printf("%s: %s (error: %v)\n", name, status.State, status.LastError)
}
Read Model Repository
Generic repository for read model storage:
// Interface definition
type ReadModelRepository[T any] interface {
Insert(ctx context.Context, model *T) error
Get(ctx context.Context, id string) (*T, error)
Update(ctx context.Context, id string, fn func(*T)) error
Delete(ctx context.Context, id string) error
Query(ctx context.Context, query Query) ([]*T, error)
FindOne(ctx context.Context, query Query) (*T, error)
Count(ctx context.Context, query Query) (int, error)
Exists(ctx context.Context, id string) (bool, error)
GetAll(ctx context.Context) ([]*T, error)
Clear(ctx context.Context) error
}
// In-memory implementation (great for testing)
repo := mink.NewInMemoryRepository[OrderSummary](func(o *OrderSummary) string {
return o.OrderID // ID extractor function
})
// CRUD operations
repo.Insert(ctx, &OrderSummary{OrderID: "order-1", Status: "Created"})
summary, err := repo.Get(ctx, "order-1")
repo.Update(ctx, "order-1", func(s *OrderSummary) {
s.Status = "Shipped"
})
repo.Delete(ctx, "order-1")
PostgreSQL Repository
For production use, go-mink provides a PostgreSQL-backed repository with automatic schema migration:
import "github.com/AshkanYarmoradi/go-mink/adapters/postgres"
// Define your read model with mink struct tags
type OrderSummary struct {
OrderID string `mink:"order_id,pk"` // Primary key
CustomerID string `mink:"customer_id,index"` // Creates an index
Status string `mink:"status"`
ItemCount int `mink:"item_count"`
TotalAmount float64 `mink:"total_amount"`
CreatedAt time.Time `mink:"created_at"`
UpdatedAt time.Time `mink:"updated_at"`
}
// Create repository with auto-migration
repo, err := postgres.NewPostgresRepository[OrderSummary](db,
postgres.WithReadModelSchema("projections"),
postgres.WithTableName("order_summaries"),
)
if err != nil {
log.Fatal(err)
}
// Use exactly like in-memory repository
repo.Insert(ctx, &OrderSummary{
OrderID: "order-1",
CustomerID: "cust-123",
Status: "pending",
})
// Queries work with full SQL support
query := mink.NewQuery().
Where("status", mink.FilterOpEq, "pending").
And("total_amount", mink.FilterOpGt, 100.0).
OrderByDesc("created_at").
WithLimit(10)
orders, err := repo.Find(ctx, query.Build())
Supported Struct Tags
| Tag | Description |
|---|---|
mink:"column_name" |
Sets the column name (default: snake_case of field) |
mink:"-" |
Skip this field |
mink:"col,pk" |
Primary key |
mink:"col,index" |
Create index on column |
mink:"col,unique" |
Unique constraint (also creates index) |
mink:"col,nullable" |
Allow NULL values |
mink:"col,default=value" |
Default value (see security note below) |
mink:"col,type=VARCHAR(100)" |
Override SQL type (see security note below) |
Security Note: The
default=andtype=values are validated against common SQL injection patterns but are interpolated into DDL statements. Only use static, hardcoded values in your source code. Never construct these tag values from user input or external sources.
Go Type to SQL Mapping
| Go Type | PostgreSQL Type |
|---|---|
string |
TEXT |
int, int32 |
INTEGER |
int64 |
BIGINT |
float32 |
REAL |
float64 |
DOUBLE PRECISION |
bool |
BOOLEAN |
time.Time |
TIMESTAMPTZ |
[]byte |
BYTEA |
[]T (slices) |
JSONB |
map, struct |
JSONB |
Note on JSONB types: While Go slices (other than
[]byte), maps, and structs are mapped toJSONB, the current implementation stores them using Go’s native database/sql handling. For complex JSONB data, use[]bytewith manual JSON marshaling/unmarshaling, or implement customsql.Scanneranddriver.Valuerinterfaces on your types.
Note on unsigned integers: Go’s unsigned integer types are mapped to PostgreSQL’s signed integer types:
uintanduint32are stored asINTEGER(max2,147,483,647), anduint64is stored asBIGINT(max9,223,372,036,854,775,807). Values greater than these limits will overflow or be rejected by PostgreSQL. If you need to store larger unsigned values, use an explicit tag such asmink:"type=NUMERIC"(or another appropriate type).
Transaction Support
Use transactions for consistent updates across multiple read models:
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
txRepo := repo.WithTx(tx)
// All operations in same transaction
txRepo.Insert(ctx, &OrderSummary{...})
txRepo.Update(ctx, "order-2", func(o *OrderSummary) {
o.ItemCount++
})
return tx.Commit()
Schema Migration
The repository automatically:
- Creates the schema if it doesn’t exist
- Creates the table with proper column types
- Adds indexes for
indexanduniquetagged columns - Adds missing columns when your struct evolves (non-breaking schema changes)
// Disable auto-migration if you manage schema externally
repo, err := postgres.NewPostgresRepository[OrderSummary](db,
postgres.WithReadModelSchema("projections"),
postgres.WithAutoMigrate(false),
)
// Or run migration manually
err = repo.Migrate(ctx)
Query Builder
Fluent query construction:
// Build a query
query := mink.NewQuery().
Where("Status", mink.Eq, "Pending").
And("TotalAmount", mink.Gt, 100.0).
OrderByDesc("CreatedAt").
WithPagination(10, 0) // limit 10, offset 0
// Execute query
orders, err := repo.Query(ctx, query)
// Find single result
order, err := repo.FindOne(ctx, query)
// Count matching records
count, err := repo.Count(ctx, query)
Filter Operators
// Available filter operators
mink.Eq // Equal
mink.NotEq // Not equal
mink.Gt // Greater than
mink.Gte // Greater than or equal
mink.Lt // Less than
mink.Lte // Less than or equal
mink.In // In list
mink.Contains // Contains substring
Subscription System
Subscribe to events for projections:
// Event filters
typeFilter := mink.NewEventTypeFilter("OrderCreated", "OrderShipped")
categoryFilter := mink.NewCategoryFilter("Order")
compositeFilter := mink.NewCompositeFilter(typeFilter, categoryFilter)
// Subscription options
opts := mink.SubscriptionOptions{
FromPosition: 0, // Start position
Filter: compositeFilter, // Event filter
BufferSize: 100, // Channel buffer
}
// Create subscription (requires SubscriptionAdapter)
sub, err := mink.NewCatchupSubscription(adapter, opts)
if err != nil {
log.Fatal(err)
}
// Start receiving events
eventCh, err := sub.Subscribe(ctx)
if err != nil {
log.Fatal(err)
}
for event := range eventCh {
fmt.Printf("Received: %s at position %d\n", event.Type, event.GlobalPosition)
}
Projection Rebuilding
Rebuild projections from the event log:
// Create rebuilder
rebuilder := mink.NewProjectionRebuilder(store, checkpointStore)
// Create progress callback
progress := &mink.RebuildProgress{
OnProgress: func(processed, total uint64) {
pct := float64(processed) / float64(total) * 100
fmt.Printf("Progress: %.1f%% (%d/%d)\n", pct, processed, total)
},
OnComplete: func() {
fmt.Println("Rebuild complete!")
},
OnError: func(err error) {
fmt.Printf("Error: %v\n", err)
},
}
// Rebuild single projection
err := rebuilder.Rebuild(ctx, summaryProjection, mink.RebuildOptions{
BatchSize: 1000,
Progress: progress,
})
// Rebuild all projections
err := rebuilder.RebuildAll(ctx, []mink.Projection{
summaryProjection,
analyticsProjection,
}, mink.RebuildOptions{BatchSize: 1000})
Parallel Rebuilding
Rebuild multiple projections concurrently:
parallelRebuilder := mink.NewParallelRebuilder(store, checkpointStore, 4) // 4 workers
err := parallelRebuilder.RebuildAll(ctx, []mink.Projection{
summaryProjection,
analyticsProjection,
reportProjection,
}, mink.RebuildOptions{
BatchSize: 1000,
})
Clearable Projections
Projections that can be cleared before rebuild:
type Clearable interface {
Clear(ctx context.Context) error
}
// Implement on your projection
func (p *OrderSummaryProjection) Clear(ctx context.Context) error {
return p.repo.Clear(ctx)
}
// Rebuilder automatically clears if projection implements Clearable
Retry Policy
Configure retry behavior for async projections:
// Exponential backoff with jitter
retryPolicy := mink.NewExponentialBackoffRetry(
100*time.Millisecond, // Initial delay
5*time.Second, // Max delay
3, // Max attempts
)
engine.RegisterAsync(projection, mink.AsyncOptions{
RetryPolicy: retryPolicy,
})
Checkpoint Storage
Checkpoints track projection progress:
// In-memory checkpoint store (for testing)
checkpointStore := memory.NewCheckpointStore()
// Get/Set checkpoints
pos, err := checkpointStore.GetCheckpoint(ctx, "OrderSummary")
err = checkpointStore.SetCheckpoint(ctx, "OrderSummary", 100)
// Get checkpoint with timestamp
pos, timestamp, err := checkpointStore.GetCheckpointWithTimestamp(ctx, "OrderSummary")
// List all checkpoints
checkpoints, err := checkpointStore.GetAllCheckpoints(ctx)
Complete Example
See the projections example for a complete working demonstration.
Next: Adapters →