Skip to main content

Part 4: Projections & Queries

Build optimized read models from your event stream.


Overview

In this part, you'll:

  • Understand projections and read models
  • Build inline projections for consistent reads
  • Create async projections for scalability
  • Add live projections for real-time updates
  • Query read models with filters and pagination

Time: ~45 minutes


What are Projections?

Projections transform your event stream into optimized read models:

Events Projections Read Models
─────────────────────────────────────────────────────────────────────────────
OrderPlaced ──────┐
ItemAdded ─────┤ ┌──────────────────┐
OrderShipped ─────┼────────────▶│ Order Summary │────────▶ OrderSummaryView
OrderDelivered ───┘ │ Projection │ (Fast queries)
└──────────────────┘

ProductCreated ───┐ ┌──────────────────┐
StockAdded ────┼────────────▶│ Product Catalog │────────▶ ProductCatalog
PriceChanged ─────┘ │ Projection │ (Search ready)
└──────────────────┘

All Events ───────────────────▶ ┌──────────────────┐
│ Analytics │────────▶ Dashboard
│ Projection │ (Real-time)
└──────────────────┘

Types of Projections

TypeConsistencyPerformanceUse Case
InlineStrongLower write throughputCritical reads (account balance)
AsyncEventualHigh throughputReports, search, analytics
LiveReal-timeTransientDashboards, notifications

Part 1: Define Read Models

Let's create read models for our e-commerce system.

Product Catalog Read Model

Create internal/projections/product_catalog.go:

package projections

import (
"context"
"encoding/json"
"time"

"go-mink.dev"
"minkshop/internal/domain/product"
)

// ProductView represents a product in the catalog.
type ProductView struct {
ProductID string `json:"productId"`
SKU string `json:"sku"`
Name string `json:"name"`
Description string `json:"description"`
Price float64 `json:"price"`
AvailableStock int `json:"availableStock"`
ReservedStock int `json:"reservedStock"`
IsAvailable bool `json:"isAvailable"`
IsDiscontinued bool `json:"isDiscontinued"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}

// ProductCatalogProjection builds the product catalog read model.
type ProductCatalogProjection struct {
mink.ProjectionBase
repo *mink.InMemoryRepository[ProductView]
}

// NewProductCatalogProjection creates a new projection.
func NewProductCatalogProjection(repo *mink.InMemoryRepository[ProductView]) *ProductCatalogProjection {
return &ProductCatalogProjection{
ProjectionBase: mink.NewProjectionBase("ProductCatalog",
"ProductCreated",
"PriceChanged",
"StockAdded",
"StockReserved",
"StockReleased",
"StockShipped",
"ProductDiscontinued",
),
repo: repo,
}
}

// Apply processes an event and updates the read model.
func (p *ProductCatalogProjection) Apply(ctx context.Context, event mink.StoredEvent) error {
switch event.Type {
case "ProductCreated":
var e product.ProductCreated
if err := json.Unmarshal(event.Data, &e); err != nil {
return err
}
return p.repo.Insert(ctx, &ProductView{
ProductID: e.ProductID,
SKU: e.SKU,
Name: e.Name,
Description: e.Description,
Price: e.Price,
AvailableStock: e.InitialStock,
IsAvailable: e.InitialStock > 0,
CreatedAt: e.CreatedAt,
UpdatedAt: e.CreatedAt,
})

case "PriceChanged":
var e product.PriceChanged
if err := json.Unmarshal(event.Data, &e); err != nil {
return err
}
return p.repo.Update(ctx, e.ProductID, func(v *ProductView) {
v.Price = e.NewPrice
v.UpdatedAt = e.ChangedAt
})

case "StockAdded":
var e product.StockAdded
if err := json.Unmarshal(event.Data, &e); err != nil {
return err
}
return p.repo.Update(ctx, e.ProductID, func(v *ProductView) {
v.AvailableStock += e.Quantity
v.IsAvailable = !v.IsDiscontinued && v.AvailableStock > 0
v.UpdatedAt = e.AddedAt
})

case "StockReserved":
var e product.StockReserved
if err := json.Unmarshal(event.Data, &e); err != nil {
return err
}
return p.repo.Update(ctx, e.ProductID, func(v *ProductView) {
v.AvailableStock -= e.Quantity
v.ReservedStock += e.Quantity
v.IsAvailable = !v.IsDiscontinued && v.AvailableStock > 0
v.UpdatedAt = e.ReservedAt
})

case "StockReleased":
var e product.StockReleased
if err := json.Unmarshal(event.Data, &e); err != nil {
return err
}
return p.repo.Update(ctx, e.ProductID, func(v *ProductView) {
v.AvailableStock += e.Quantity
v.ReservedStock -= e.Quantity
v.IsAvailable = !v.IsDiscontinued && v.AvailableStock > 0
v.UpdatedAt = e.ReleasedAt
})

case "StockShipped":
var e product.StockShipped
if err := json.Unmarshal(event.Data, &e); err != nil {
return err
}
return p.repo.Update(ctx, e.ProductID, func(v *ProductView) {
v.ReservedStock -= e.Quantity
v.UpdatedAt = e.ShippedAt
})

case "ProductDiscontinued":
var e product.ProductDiscontinued
if err := json.Unmarshal(event.Data, &e); err != nil {
return err
}
return p.repo.Update(ctx, e.ProductID, func(v *ProductView) {
v.IsDiscontinued = true
v.IsAvailable = false
v.UpdatedAt = e.DiscontinuedAt
})
}

return nil
}

Order History Read Model

Create internal/projections/order_history.go:

package projections

import (
"context"
"encoding/json"
"time"

"go-mink.dev"
"minkshop/internal/domain/order"
)

// OrderView represents an order in the history.
type OrderView struct {
OrderID string `json:"orderId"`
CustomerID string `json:"customerId"`
Status string `json:"status"`
Items []OrderItemView `json:"items"`
ItemCount int `json:"itemCount"`
Subtotal float64 `json:"subtotal"`
ShippingCost float64 `json:"shippingCost"`
Tax float64 `json:"tax"`
TotalAmount float64 `json:"totalAmount"`
ShippingAddress AddressView `json:"shippingAddress"`
TrackingNumber string `json:"trackingNumber,omitempty"`
Carrier string `json:"carrier,omitempty"`
PlacedAt time.Time `json:"placedAt"`
PaidAt *time.Time `json:"paidAt,omitempty"`
ShippedAt *time.Time `json:"shippedAt,omitempty"`
DeliveredAt *time.Time `json:"deliveredAt,omitempty"`
CancelledAt *time.Time `json:"cancelledAt,omitempty"`
UpdatedAt time.Time `json:"updatedAt"`
}

// OrderItemView represents an item in an order view.
type OrderItemView struct {
ProductID string `json:"productId"`
ProductName string `json:"productName"`
Quantity int `json:"quantity"`
UnitPrice float64 `json:"unitPrice"`
Subtotal float64 `json:"subtotal"`
}

// AddressView represents a shipping address.
type AddressView struct {
Name string `json:"name"`
Street string `json:"street"`
City string `json:"city"`
State string `json:"state"`
PostalCode string `json:"postalCode"`
Country string `json:"country"`
}

// OrderHistoryProjection builds the order history read model.
type OrderHistoryProjection struct {
mink.ProjectionBase
repo *mink.InMemoryRepository[OrderView]
}

// NewOrderHistoryProjection creates a new projection.
func NewOrderHistoryProjection(repo *mink.InMemoryRepository[OrderView]) *OrderHistoryProjection {
return &OrderHistoryProjection{
ProjectionBase: mink.NewProjectionBase("OrderHistory",
"OrderPlaced",
"PaymentReceived",
"OrderConfirmed",
"OrderShipped",
"OrderDelivered",
"OrderCancelled",
),
repo: repo,
}
}

// Apply processes an event and updates the read model.
func (p *OrderHistoryProjection) Apply(ctx context.Context, event mink.StoredEvent) error {
switch event.Type {
case "OrderPlaced":
var e order.OrderPlaced
if err := json.Unmarshal(event.Data, &e); err != nil {
return err
}

items := make([]OrderItemView, len(e.Items))
itemCount := 0
for i, item := range e.Items {
items[i] = OrderItemView{
ProductID: item.ProductID,
ProductName: item.ProductName,
Quantity: item.Quantity,
UnitPrice: item.UnitPrice,
Subtotal: item.Subtotal,
}
itemCount += item.Quantity
}

return p.repo.Insert(ctx, &OrderView{
OrderID: e.OrderID,
CustomerID: e.CustomerID,
Status: "pending",
Items: items,
ItemCount: itemCount,
Subtotal: e.Subtotal,
ShippingCost: e.ShippingCost,
Tax: e.Tax,
TotalAmount: e.TotalAmount,
ShippingAddress: AddressView{
Name: e.ShippingAddress.Name,
Street: e.ShippingAddress.Street,
City: e.ShippingAddress.City,
State: e.ShippingAddress.State,
PostalCode: e.ShippingAddress.PostalCode,
Country: e.ShippingAddress.Country,
},
PlacedAt: e.PlacedAt,
UpdatedAt: e.PlacedAt,
})

case "PaymentReceived":
var e order.PaymentReceived
if err := json.Unmarshal(event.Data, &e); err != nil {
return err
}
return p.repo.Update(ctx, e.OrderID, func(v *OrderView) {
v.Status = "paid"
v.PaidAt = &e.ReceivedAt
v.UpdatedAt = e.ReceivedAt
})

case "OrderConfirmed":
var e order.OrderConfirmed
if err := json.Unmarshal(event.Data, &e); err != nil {
return err
}
return p.repo.Update(ctx, e.OrderID, func(v *OrderView) {
v.Status = "confirmed"
v.UpdatedAt = e.ConfirmedAt
})

case "OrderShipped":
var e order.OrderShipped
if err := json.Unmarshal(event.Data, &e); err != nil {
return err
}
return p.repo.Update(ctx, e.OrderID, func(v *OrderView) {
v.Status = "shipped"
v.TrackingNumber = e.TrackingNumber
v.Carrier = e.Carrier
v.ShippedAt = &e.ShippedAt
v.UpdatedAt = e.ShippedAt
})

case "OrderDelivered":
var e order.OrderDelivered
if err := json.Unmarshal(event.Data, &e); err != nil {
return err
}
return p.repo.Update(ctx, e.OrderID, func(v *OrderView) {
v.Status = "delivered"
v.DeliveredAt = &e.DeliveredAt
v.UpdatedAt = e.DeliveredAt
})

case "OrderCancelled":
var e order.OrderCancelled
if err := json.Unmarshal(event.Data, &e); err != nil {
return err
}
return p.repo.Update(ctx, e.OrderID, func(v *OrderView) {
v.Status = "cancelled"
v.CancelledAt = &e.CancelledAt
v.UpdatedAt = e.CancelledAt
})
}

return nil
}

Sales Dashboard (Live Projection)

Create internal/projections/dashboard.go:

package projections

import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

"go-mink.dev"
"minkshop/internal/domain/order"
"minkshop/internal/domain/product"
)

// DashboardStats holds real-time statistics.
type DashboardStats struct {
mu sync.RWMutex

// Orders
TotalOrders int `json:"totalOrders"`
PendingOrders int `json:"pendingOrders"`
ShippedOrders int `json:"shippedOrders"`
TotalRevenue float64 `json:"totalRevenue"`

// Products
TotalProducts int `json:"totalProducts"`
LowStockProducts int `json:"lowStockProducts"`

// Recent activity
RecentActivity []ActivityItem `json:"recentActivity"`
}

// ActivityItem represents a dashboard activity.
type ActivityItem struct {
Timestamp time.Time `json:"timestamp"`
Type string `json:"type"`
Description string `json:"description"`
}

// DashboardProjection provides real-time updates.
type DashboardProjection struct {
mink.LiveProjectionBase
stats *DashboardStats
updates chan DashboardUpdate
}

// DashboardUpdate represents a dashboard update notification.
type DashboardUpdate struct {
Type string `json:"type"`
Message string `json:"message"`
Stats *DashboardStats `json:"stats"`
Time time.Time `json:"time"`
}

// NewDashboardProjection creates a new live dashboard projection.
func NewDashboardProjection() *DashboardProjection {
return &DashboardProjection{
LiveProjectionBase: mink.NewLiveProjectionBase("Dashboard", true,
"OrderPlaced",
"OrderShipped",
"OrderCancelled",
"ProductCreated",
"StockAdded",
"StockReserved",
),
stats: &DashboardStats{
RecentActivity: make([]ActivityItem, 0, 10),
},
updates: make(chan DashboardUpdate, 100),
}
}

// OnEvent handles real-time events.
func (p *DashboardProjection) OnEvent(ctx context.Context, event mink.StoredEvent) {
p.stats.mu.Lock()
defer p.stats.mu.Unlock()

var message string
var activityType string

switch event.Type {
case "OrderPlaced":
var e order.OrderPlaced
if err := json.Unmarshal(event.Data, &e); err != nil {
return
}
p.stats.TotalOrders++
p.stats.PendingOrders++
p.stats.TotalRevenue += e.TotalAmount
message = fmt.Sprintf("New order #%s for $%.2f", e.OrderID, e.TotalAmount)
activityType = "order_placed"

case "OrderShipped":
var e order.OrderShipped
if err := json.Unmarshal(event.Data, &e); err != nil {
return
}
p.stats.PendingOrders--
p.stats.ShippedOrders++
message = fmt.Sprintf("Order #%s shipped via %s", e.OrderID, e.Carrier)
activityType = "order_shipped"

case "OrderCancelled":
var e order.OrderCancelled
if err := json.Unmarshal(event.Data, &e); err != nil {
return
}
p.stats.PendingOrders--
message = fmt.Sprintf("Order #%s cancelled: %s", e.OrderID, e.Reason)
activityType = "order_cancelled"

case "ProductCreated":
var e product.ProductCreated
if err := json.Unmarshal(event.Data, &e); err != nil {
return
}
p.stats.TotalProducts++
if e.InitialStock < 10 {
p.stats.LowStockProducts++
}
message = fmt.Sprintf("New product: %s ($%.2f)", e.Name, e.Price)
activityType = "product_created"

case "StockAdded":
var e product.StockAdded
if err := json.Unmarshal(event.Data, &e); err != nil {
return
}
message = fmt.Sprintf("Stock added: %d units to %s", e.Quantity, e.ProductID)
activityType = "stock_added"
}

if message != "" {
// Add to recent activity (keep last 10)
activity := ActivityItem{
Timestamp: event.Timestamp,
Type: activityType,
Description: message,
}
p.stats.RecentActivity = append([]ActivityItem{activity}, p.stats.RecentActivity...)
if len(p.stats.RecentActivity) > 10 {
p.stats.RecentActivity = p.stats.RecentActivity[:10]
}

// Send update notification
select {
case p.updates <- DashboardUpdate{
Type: activityType,
Message: message,
Stats: p.copyStats(),
Time: event.Timestamp,
}:
default:
// Channel full, skip
}
}
}

// Updates returns the channel for dashboard updates.
func (p *DashboardProjection) Updates() <-chan DashboardUpdate {
return p.updates
}

// GetStats returns a copy of current statistics.
func (p *DashboardProjection) GetStats() *DashboardStats {
p.stats.mu.RLock()
defer p.stats.mu.RUnlock()
return p.copyStats()
}

func (p *DashboardProjection) copyStats() *DashboardStats {
activity := make([]ActivityItem, len(p.stats.RecentActivity))
copy(activity, p.stats.RecentActivity)

return &DashboardStats{
TotalOrders: p.stats.TotalOrders,
PendingOrders: p.stats.PendingOrders,
ShippedOrders: p.stats.ShippedOrders,
TotalRevenue: p.stats.TotalRevenue,
TotalProducts: p.stats.TotalProducts,
LowStockProducts: p.stats.LowStockProducts,
RecentActivity: activity,
}
}

Part 2: Set Up Projection Engine

Update the application to include projections.

Update internal/app/app.go:

package app

import (
"context"
"fmt"
"log"
"time"

"go-mink.dev"
"go-mink.dev/adapters/memory"
"go-mink.dev/adapters/postgres"

"minkshop/internal/domain/cart"
"minkshop/internal/domain/order"
"minkshop/internal/domain/product"
"minkshop/internal/handlers"
"minkshop/internal/projections"
)

// Application holds all application dependencies.
type Application struct {
Store *mink.EventStore
CommandBus *mink.CommandBus
ProjectionEngine *mink.ProjectionEngine

// Read model repositories
ProductRepo *mink.InMemoryRepository[projections.ProductView]
OrderRepo *mink.InMemoryRepository[projections.OrderView]
Dashboard *projections.DashboardProjection

adapter *postgres.PostgresAdapter
}

// Config holds application configuration.
type Config struct {
DatabaseURL string
DatabaseSchema string
MaxConnections int
}

// New creates a new Application instance.
func New(ctx context.Context, cfg Config) (*Application, error) {
// Create PostgreSQL adapter
adapter, err := postgres.NewAdapter(cfg.DatabaseURL,
postgres.WithSchema(cfg.DatabaseSchema),
postgres.WithMaxConnections(cfg.MaxConnections),
)
if err != nil {
return nil, fmt.Errorf("failed to create adapter: %w", err)
}

// Initialize schema
if err := adapter.Initialize(ctx); err != nil {
adapter.Close()
return nil, fmt.Errorf("failed to initialize schema: %w", err)
}

// Create event store
store := mink.New(adapter)

// Register all domain events
store.RegisterEvents(product.AllEvents()...)
store.RegisterEvents(cart.AllEvents()...)
store.RegisterEvents(order.AllEvents()...)

// Create read model repositories
productRepo := mink.NewInMemoryRepository(func(p *projections.ProductView) string {
return p.ProductID
})

orderRepo := mink.NewInMemoryRepository(func(o *projections.OrderView) string {
return o.OrderID
})

// Create checkpoint store
checkpointStore := memory.NewCheckpointStore()

// Create projection engine
engine := mink.NewProjectionEngine(store,
mink.WithCheckpointStore(checkpointStore),
)

// Register inline projections (synchronous)
productCatalog := projections.NewProductCatalogProjection(productRepo)
if err := engine.RegisterInline(productCatalog); err != nil {
adapter.Close()
return nil, fmt.Errorf("failed to register product catalog: %w", err)
}

orderHistory := projections.NewOrderHistoryProjection(orderRepo)
if err := engine.RegisterInline(orderHistory); err != nil {
adapter.Close()
return nil, fmt.Errorf("failed to register order history: %w", err)
}

// Register live projection (real-time)
dashboard := projections.NewDashboardProjection()
if err := engine.RegisterLive(dashboard); err != nil {
adapter.Close()
return nil, fmt.Errorf("failed to register dashboard: %w", err)
}

// Start projection engine
if err := engine.Start(ctx); err != nil {
adapter.Close()
return nil, fmt.Errorf("failed to start projection engine: %w", err)
}

// Create command bus with middleware
bus := mink.NewCommandBus()
bus.Use(
mink.RecoveryMiddleware(),
mink.ValidationMiddleware(),
NewLoggingMiddleware(),
NewTimingMiddleware(),
NewProjectionMiddleware(engine), // Process projections after commands
)

// Register command handlers
productHandler := handlers.NewProductHandler(store)
productHandler.RegisterHandlers(bus)

cartHandler := handlers.NewCartHandler(store)
cartHandler.RegisterHandlers(bus)

orderHandler := handlers.NewOrderHandler(store)
orderHandler.RegisterHandlers(bus)

return &Application{
Store: store,
CommandBus: bus,
ProjectionEngine: engine,
ProductRepo: productRepo,
OrderRepo: orderRepo,
Dashboard: dashboard,
adapter: adapter,
}, nil
}

// Close releases all resources.
func (a *Application) Close() error {
if a.ProjectionEngine != nil {
a.ProjectionEngine.Stop(context.Background())
}
if a.adapter != nil {
return a.adapter.Close()
}
return nil
}

// NewProjectionMiddleware processes events through projections.
func NewProjectionMiddleware(engine *mink.ProjectionEngine) mink.Middleware {
return func(next mink.MiddlewareFunc) mink.MiddlewareFunc {
return func(ctx context.Context, cmd mink.Command) (mink.CommandResult, error) {
result, err := next(ctx, cmd)

// After successful command, trigger projection processing
// In production, this would be handled by subscriptions
// For this tutorial, we'll process manually

return result, err
}
}
}

// Rest of middleware functions...
func NewLoggingMiddleware() mink.Middleware {
return func(next mink.MiddlewareFunc) mink.MiddlewareFunc {
return func(ctx context.Context, cmd mink.Command) (mink.CommandResult, error) {
log.Printf("[CMD] Dispatching %s", cmd.CommandType())
result, err := next(ctx, cmd)
if err != nil {
log.Printf("[CMD] %s failed: %v", cmd.CommandType(), err)
} else if result.IsError() {
log.Printf("[CMD] %s returned error: %v", cmd.CommandType(), result.Error)
} else {
log.Printf("[CMD] %s succeeded (aggregate: %s, version: %d)",
cmd.CommandType(), result.AggregateID, result.Version)
}
return result, err
}
}
}

func NewTimingMiddleware() mink.Middleware {
return func(next mink.MiddlewareFunc) mink.MiddlewareFunc {
return func(ctx context.Context, cmd mink.Command) (mink.CommandResult, error) {
start := time.Now()
result, err := next(ctx, cmd)
duration := time.Since(start)
if duration > 100*time.Millisecond {
log.Printf("[PERF] %s took %v (slow!)", cmd.CommandType(), duration)
}
return result, err
}
}
}

Part 3: Create Query Services

Create query services for read models.

Create internal/queries/product_queries.go:

package queries

import (
"context"

"go-mink.dev"
"minkshop/internal/projections"
)

// ProductQueryService provides product catalog queries.
type ProductQueryService struct {
repo *mink.InMemoryRepository[projections.ProductView]
}

// NewProductQueryService creates a new query service.
func NewProductQueryService(repo *mink.InMemoryRepository[projections.ProductView]) *ProductQueryService {
return &ProductQueryService{repo: repo}
}

// GetProduct returns a product by ID.
func (s *ProductQueryService) GetProduct(ctx context.Context, productID string) (*projections.ProductView, error) {
return s.repo.Get(ctx, productID)
}

// ListProducts returns all products.
func (s *ProductQueryService) ListProducts(ctx context.Context) ([]*projections.ProductView, error) {
return s.repo.Query(ctx, mink.NewQuery())
}

// ListAvailableProducts returns only available products.
func (s *ProductQueryService) ListAvailableProducts(ctx context.Context) ([]*projections.ProductView, error) {
return s.repo.Query(ctx, mink.NewQuery().
Where("IsAvailable", mink.Eq, true).
OrderByDesc("CreatedAt"))
}

// SearchProducts searches products by name.
func (s *ProductQueryService) SearchProducts(ctx context.Context, term string) ([]*projections.ProductView, error) {
return s.repo.Query(ctx, mink.NewQuery().
Where("Name", mink.Contains, term).
Where("IsAvailable", mink.Eq, true))
}

// GetLowStockProducts returns products with low stock.
func (s *ProductQueryService) GetLowStockProducts(ctx context.Context, threshold int) ([]*projections.ProductView, error) {
return s.repo.Query(ctx, mink.NewQuery().
Where("AvailableStock", mink.Lt, threshold).
Where("IsDiscontinued", mink.Eq, false))
}

// GetProductsBySKU returns products matching SKU prefix.
func (s *ProductQueryService) GetProductsBySKU(ctx context.Context, skuPrefix string) ([]*projections.ProductView, error) {
return s.repo.Query(ctx, mink.NewQuery().
Where("SKU", mink.Contains, skuPrefix))
}

Create internal/queries/order_queries.go:

package queries

import (
"context"

"go-mink.dev"
"minkshop/internal/projections"
)

// OrderQueryService provides order history queries.
type OrderQueryService struct {
repo *mink.InMemoryRepository[projections.OrderView]
}

// NewOrderQueryService creates a new query service.
func NewOrderQueryService(repo *mink.InMemoryRepository[projections.OrderView]) *OrderQueryService {
return &OrderQueryService{repo: repo}
}

// GetOrder returns an order by ID.
func (s *OrderQueryService) GetOrder(ctx context.Context, orderID string) (*projections.OrderView, error) {
return s.repo.Get(ctx, orderID)
}

// GetCustomerOrders returns all orders for a customer.
func (s *OrderQueryService) GetCustomerOrders(ctx context.Context, customerID string) ([]*projections.OrderView, error) {
return s.repo.Query(ctx, mink.NewQuery().
Where("CustomerID", mink.Eq, customerID).
OrderByDesc("PlacedAt"))
}

// GetPendingOrders returns orders awaiting shipment.
func (s *OrderQueryService) GetPendingOrders(ctx context.Context) ([]*projections.OrderView, error) {
return s.repo.Query(ctx, mink.NewQuery().
Where("Status", mink.In, []string{"pending", "paid", "confirmed"}).
OrderBy("PlacedAt"))
}

// GetShippedOrders returns orders that have been shipped.
func (s *OrderQueryService) GetShippedOrders(ctx context.Context) ([]*projections.OrderView, error) {
return s.repo.Query(ctx, mink.NewQuery().
Where("Status", mink.Eq, "shipped"))
}

// GetOrdersByStatus returns orders filtered by status.
func (s *OrderQueryService) GetOrdersByStatus(ctx context.Context, status string) ([]*projections.OrderView, error) {
return s.repo.Query(ctx, mink.NewQuery().
Where("Status", mink.Eq, status).
OrderByDesc("UpdatedAt"))
}

// GetRecentOrders returns the most recent orders.
func (s *OrderQueryService) GetRecentOrders(ctx context.Context, limit int) ([]*projections.OrderView, error) {
return s.repo.Query(ctx, mink.NewQuery().
OrderByDesc("PlacedAt").
WithLimit(limit))
}

// GetOrdersByDateRange returns orders within a date range.
// Note: For production, use a database with proper date filtering.
func (s *OrderQueryService) GetHighValueOrders(ctx context.Context, minAmount float64) ([]*projections.OrderView, error) {
return s.repo.Query(ctx, mink.NewQuery().
Where("TotalAmount", mink.Gte, minAmount).
OrderByDesc("TotalAmount"))
}

Part 4: Update Demo with Queries

Update the demo to show queries in action.

Update cmd/server/main.go (add after demo commands):

func runDemo(ctx context.Context, app *app.Application) error {
// ... existing demo code ...

fmt.Println()
fmt.Println("🔍 Running queries...")
fmt.Println()

// Query products
productQueries := queries.NewProductQueryService(app.ProductRepo)

fmt.Println("1. Get product by ID...")
prod, err := productQueries.GetProduct(ctx, "PROD-001")
if err != nil {
log.Printf(" ⚠️ Product not found: %v", err)
} else {
fmt.Printf(" Product: %s - $%.2f (stock: %d)\n",
prod.Name, prod.Price, prod.AvailableStock)
}

fmt.Println("2. List available products...")
products, err := productQueries.ListAvailableProducts(ctx)
if err != nil {
log.Printf(" ⚠️ Query failed: %v", err)
} else {
fmt.Printf(" Found %d available products\n", len(products))
for _, p := range products {
fmt.Printf(" - %s: $%.2f\n", p.Name, p.Price)
}
}

// Query orders
orderQueries := queries.NewOrderQueryService(app.OrderRepo)

fmt.Println("3. Get customer orders...")
orders, err := orderQueries.GetCustomerOrders(ctx, "CUST-001")
if err != nil {
log.Printf(" ⚠️ Query failed: %v", err)
} else {
fmt.Printf(" Customer has %d orders\n", len(orders))
for _, o := range orders {
fmt.Printf(" - Order %s: $%.2f (%s)\n",
o.OrderID, o.TotalAmount, o.Status)
}
}

// Dashboard stats
fmt.Println("4. Dashboard statistics...")
stats := app.Dashboard.GetStats()
fmt.Printf(" Total Orders: %d\n", stats.TotalOrders)
fmt.Printf(" Pending: %d, Shipped: %d\n", stats.PendingOrders, stats.ShippedOrders)
fmt.Printf(" Total Revenue: $%.2f\n", stats.TotalRevenue)
fmt.Printf(" Products: %d (low stock: %d)\n", stats.TotalProducts, stats.LowStockProducts)

if len(stats.RecentActivity) > 0 {
fmt.Println(" Recent Activity:")
for _, activity := range stats.RecentActivity[:min(3, len(stats.RecentActivity))] {
fmt.Printf(" - %s\n", activity.Description)
}
}

return nil
}

func min(a, b int) int {
if a < b {
return a
}
return b
}

Part 5: Process Projections

To ensure projections are updated, we need to process events. Add a helper function.

Add to internal/app/app.go:

// ProcessProjectionsForStream manually processes projections for a stream.
// In production, this would be handled automatically by subscriptions.
func (a *Application) ProcessProjectionsForStream(ctx context.Context, streamID string) error {
events, err := a.Store.LoadRaw(ctx, streamID, 0)
if err != nil {
return err
}

a.ProjectionEngine.ProcessInlineProjections(ctx, events)
a.ProjectionEngine.NotifyLiveProjections(ctx, events)

return nil
}

Update handlers to call this after saving. For example, in product_handler.go:

func (h *ProductHandler) HandleCreateProduct(ctx context.Context, cmd mink.Command) (mink.CommandResult, error) {
// ... existing code ...

// Save to event store
if err := h.store.SaveAggregate(ctx, p); err != nil {
return mink.NewErrorResult(err), err
}

// Process projections (temporary - in production use subscriptions)
h.processProjections(ctx, p.StreamID())

return mink.NewSuccessResult(p.AggregateID(), p.Version()), nil
}

func (h *ProductHandler) processProjections(ctx context.Context, streamID string) {
events, _ := h.store.LoadRaw(ctx, streamID, 0)
// This would be injected in production
// For now, projections are processed by the application
}

What's Next?

You've built the query side of CQRS with:

  • ✅ Product catalog read model
  • ✅ Order history read model
  • ✅ Real-time dashboard projection
  • ✅ Query services with filters
  • ✅ Inline and live projections

In Part 5: Testing, you'll:

  • Write BDD tests for aggregates
  • Test projections with fixtures
  • Add integration tests
  • Use assertion helpers

Next: Part 5: Testing →


Key Takeaways

ConceptDescription
ProjectionTransforms events into optimized read models
InlineSynchronous, strong consistency
AsyncBackground processing, eventual consistency
LiveReal-time updates, transient
CheckpointTracks last processed position

tip

💡 Best Practice: Keep projections simple and focused. One projection per read model. If a query needs data from multiple sources, create a composite projection or join at query time.