Part 3: Building Your First Aggregate
Domain modeling with Aggregates—the heart of event sourcing.
Table of contents
- The Problem with Raw Events
- What is an Aggregate?
- The Aggregate Interface
- Your First Aggregate: Shopping Cart
- Using the Aggregate
- The Apply Pattern
- Handling Concurrency
- Testing Aggregates
- What’s Next?
- Key Takeaways
This is Part 3 of an 8-part series on Event Sourcing and CQRS with Go. In this post, we’ll learn about Aggregates—the heart of domain-driven design and event sourcing.
The Problem with Raw Events
In Part 2, we manually rebuilt state by loading events and switching on their types:
func rebuildTask(events []mink.Event) *Task {
task := &Task{}
for _, event := range events {
switch e := event.Data.(type) {
case TaskCreated:
task.ID = e.TaskID
task.Title = e.Title
// ... more fields
case TaskCompleted:
task.IsCompleted = true
// ... more fields
}
}
return task
}
This works, but has problems:
- Scattered logic: State rebuilding is separate from business operations
- No encapsulation: Anyone can create events without validation
- No invariants: Business rules aren’t enforced
- Duplication: The switch statement appears everywhere
We need something better: Aggregates.
What is an Aggregate?
An Aggregate is a cluster of domain objects that can be treated as a single unit. In event sourcing, an aggregate:
- Encapsulates state and the rules that govern it
- Produces events when its state changes
- Rebuilds from events during loading
- Enforces invariants (business rules that must always be true)
Think of an aggregate as the guardian of a consistency boundary. All modifications go through the aggregate, and it ensures the rules are followed.
The Aggregate Interface
go-mink defines the aggregate contract:
type Aggregate interface {
AggregateID() string // Unique identifier
AggregateType() string // Category name (e.g., "Order", "Task")
Version() int64 // Current version (event count)
ApplyEvent(event interface{}) error // Rebuild state from one event
UncommittedEvents() []interface{} // Events not yet saved
ClearUncommittedEvents() // Called after successful save
}
You could implement all these methods yourself, but go-mink provides AggregateBase to handle the boilerplate.
Your First Aggregate: Shopping Cart
Let’s build a shopping cart aggregate step by step.
Step 1: Define the Events
package cart
import "time"
// CartCreated is raised when a new cart is created
type CartCreated struct {
CartID string `json:"cartId"`
CustomerID string `json:"customerId"`
}
// ItemAdded is raised when an item is added to the cart
type ItemAdded struct {
SKU string `json:"sku"`
Name string `json:"name"`
Quantity int `json:"quantity"`
Price float64 `json:"price"`
}
// ItemRemoved is raised when an item is removed from the cart
type ItemRemoved struct {
SKU string `json:"sku"`
}
// CartCheckedOut is raised when the cart is converted to an order
type CartCheckedOut struct {
OrderID string `json:"orderId"`
CheckedOut time.Time `json:"checkedOutAt"`
}
Step 2: Define the Aggregate State
package cart
import "github.com/AshkanYarmoradi/go-mink"
// CartItem represents an item in the cart
type CartItem struct {
SKU string
Name string
Quantity int
Price float64
}
// Cart is the shopping cart aggregate
type Cart struct {
mink.AggregateBase // Embed the base implementation
CustomerID string
Items map[string]*CartItem // SKU -> Item
IsCheckedOut bool
OrderID string
}
// NewCart creates a new cart aggregate
func NewCart(id string) *Cart {
return &Cart{
AggregateBase: mink.NewAggregateBase(id, "Cart"),
Items: make(map[string]*CartItem),
}
}
Key points:
- Embed
mink.AggregateBaseto get the common functionality - Use
mink.NewAggregateBase(id, type)to initialize it - The type name
"Cart"becomes part of the stream ID
Step 3: Implement ApplyEvent
The ApplyEvent method rebuilds state from a single event. It’s called:
- During loading (replay)
- Immediately after recording a new event
// ApplyEvent updates state based on a single event
func (c *Cart) ApplyEvent(event interface{}) error {
switch e := event.(type) {
case CartCreated:
c.CustomerID = e.CustomerID
case ItemAdded:
if item, exists := c.Items[e.SKU]; exists {
item.Quantity += e.Quantity
} else {
c.Items[e.SKU] = &CartItem{
SKU: e.SKU,
Name: e.Name,
Quantity: e.Quantity,
Price: e.Price,
}
}
case ItemRemoved:
delete(c.Items, e.SKU)
case CartCheckedOut:
c.IsCheckedOut = true
c.OrderID = e.OrderID
}
c.IncrementVersion()
return nil
}
Critical rule:
ApplyEventmust be:
- Deterministic: Same event always produces same state change
- Side-effect free: No I/O, no external calls
- Idempotent: Can be called multiple times safely
Step 4: Implement Business Methods
Now the interesting part—business operations that produce events:
import (
"errors"
"fmt"
"time"
)
var (
ErrCartAlreadyExists = errors.New("cart already exists")
ErrCartEmpty = errors.New("cart is empty")
ErrCartCheckedOut = errors.New("cart is already checked out")
ErrItemNotFound = errors.New("item not found in cart")
)
// Create initializes a new cart
func (c *Cart) Create(customerID string) error {
// Invariant: Can only create once
if c.Version() > 0 {
return ErrCartAlreadyExists
}
// Record the event
c.Apply(CartCreated{
CartID: c.AggregateID(),
CustomerID: customerID,
})
return nil
}
// AddItem adds an item to the cart
func (c *Cart) AddItem(sku, name string, quantity int, price float64) error {
// Invariant: Cannot modify checked-out cart
if c.IsCheckedOut {
return ErrCartCheckedOut
}
// Validation
if quantity <= 0 {
return fmt.Errorf("quantity must be positive, got %d", quantity)
}
if price < 0 {
return fmt.Errorf("price cannot be negative, got %.2f", price)
}
// Record the event
c.Apply(ItemAdded{
SKU: sku,
Name: name,
Quantity: quantity,
Price: price,
})
return nil
}
// RemoveItem removes an item from the cart
func (c *Cart) RemoveItem(sku string) error {
if c.IsCheckedOut {
return ErrCartCheckedOut
}
// Invariant: Item must exist
if _, exists := c.Items[sku]; !exists {
return ErrItemNotFound
}
c.Apply(ItemRemoved{SKU: sku})
return nil
}
// Checkout converts the cart to an order
func (c *Cart) Checkout(orderID string) error {
if c.IsCheckedOut {
return ErrCartCheckedOut
}
// Invariant: Cannot checkout empty cart
if len(c.Items) == 0 {
return ErrCartEmpty
}
c.Apply(CartCheckedOut{
OrderID: orderID,
CheckedOut: time.Now(),
})
return nil
}
// Total calculates the cart total
func (c *Cart) Total() float64 {
var total float64
for _, item := range c.Items {
total += item.Price * float64(item.Quantity)
}
return total
}
Notice the pattern in each method:
- Check invariants — Is this operation allowed?
- Validate input — Is the data correct?
- Apply event — Record what happened
- Return — Success or error
The Apply() method (from AggregateBase) does two things:
- Adds the event to uncommitted events
- Calls
ApplyEvent()to update state immediately
Using the Aggregate
Creating and Saving
ctx := context.Background()
// Create a new cart
cart := cart.NewCart("cart-001")
if err := cart.Create("customer-123"); err != nil {
log.Fatal(err)
}
// Add some items
cart.AddItem("LAPTOP-01", "MacBook Pro", 1, 2499.00)
cart.AddItem("MOUSE-01", "Magic Mouse", 2, 99.00)
// Save to the store
if err := store.SaveAggregate(ctx, cart); err != nil {
log.Fatal(err)
}
fmt.Printf("Cart saved with %d events, version %d\n",
len(cart.UncommittedEvents()), cart.Version())
Loading and Modifying
// Create an empty cart with the same ID
cart := cart.NewCart("cart-001")
// Load replays all events to rebuild state
if err := store.LoadAggregate(ctx, cart); err != nil {
log.Fatal(err)
}
fmt.Printf("Loaded cart with %d items, version %d\n",
len(cart.Items), cart.Version())
// Modify
cart.RemoveItem("MOUSE-01")
cart.Checkout("order-456")
// Save the new events
if err := store.SaveAggregate(ctx, cart); err != nil {
log.Fatal(err)
}
The Apply Pattern
A key pattern in event-sourced aggregates is the separation between:
- Business methods — Validate, enforce invariants, call
Apply() - ApplyEvent — Update state based on events
┌────────────────────────────────────────────────────────┐
│ Business Method │
│ 1. Validate input │
│ 2. Check invariants │
│ 3. Call Apply(event) ──────────────┐ │
│ 4. Return success/error │ │
└─────────────────────────────────────│──────────────────┘
│
▼
┌────────────────────────────────────────────────────────┐
│ Apply() │
│ 1. Add event to uncommittedEvents │
│ 2. Call ApplyEvent(event) ─────────┐ │
└─────────────────────────────────────│──────────────────┘
│
▼
┌────────────────────────────────────────────────────────┐
│ ApplyEvent() │
│ 1. Switch on event type │
│ 2. Update internal state │
│ 3. Increment version │
└────────────────────────────────────────────────────────┘
Why this separation?
ApplyEventis also called during loading to replay history- Business methods are only called for new operations
- Invariants are checked in business methods, not
ApplyEvent
Handling Concurrency
When you save an aggregate, go-mink uses the version for optimistic concurrency:
cart := cart.NewCart("cart-001")
store.LoadAggregate(ctx, cart) // Version is now 3
cart.AddItem("GADGET-01", "Widget", 1, 50.00)
// Meanwhile, another process also modified the cart...
err := store.SaveAggregate(ctx, cart)
if errors.Is(err, mink.ErrConcurrencyConflict) {
// Reload and retry
cart = cart.NewCart("cart-001")
store.LoadAggregate(ctx, cart)
// Try the operation again
}
SaveAggregate expects the stream version to match the aggregate’s version when loaded. If someone else appended events in between, you get a conflict.
Testing Aggregates
Aggregates are pure logic—perfect for testing:
func TestCart_AddItem(t *testing.T) {
cart := NewCart("test-cart")
cart.Create("customer-1")
err := cart.AddItem("SKU-1", "Widget", 2, 10.00)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(cart.Items) != 1 {
t.Errorf("expected 1 item, got %d", len(cart.Items))
}
if cart.Items["SKU-1"].Quantity != 2 {
t.Errorf("expected quantity 2, got %d", cart.Items["SKU-1"].Quantity)
}
}
func TestCart_CannotCheckoutEmpty(t *testing.T) {
cart := NewCart("test-cart")
cart.Create("customer-1")
err := cart.Checkout("order-1")
if !errors.Is(err, ErrCartEmpty) {
t.Errorf("expected ErrCartEmpty, got %v", err)
}
}
BDD-Style Testing with go-mink
go-mink provides helpers for behavior-driven testing:
func TestCart_Checkout(t *testing.T) {
mink.Given(t, NewCart("cart-1"),
CartCreated{CartID: "cart-1", CustomerID: "cust-1"},
ItemAdded{SKU: "SKU-1", Name: "Widget", Quantity: 1, Price: 10.00},
).
When(func(c *Cart) error {
return c.Checkout("order-1")
}).
Then(
CartCheckedOut{OrderID: "order-1"},
)
}
This reads naturally:
- Given this history of events…
- When we perform this action…
- Then expect these events to be produced
What’s Next?
In this post, you learned:
- What aggregates are and why they matter
- How to implement the
Aggregateinterface usingAggregateBase - The separation between business methods and
ApplyEvent - How to enforce invariants and validation
- Testing patterns for aggregates
In Part 4, we’ll dive deeper into the Event Store, exploring advanced features like stream metadata, subscriptions, and handling large event streams.
Key Takeaways
- Aggregates encapsulate logic: State + behavior + invariants
- Business methods produce events: Validate, then Apply()
- ApplyEvent rebuilds state: Pure, deterministic, no I/O
- Version enables concurrency: Optimistic locking built-in
- Test business logic directly: Aggregates are pure and testable