ADR-006: Optimistic Concurrency Control
| Status | Date | Deciders |
|---|---|---|
| Accepted | 2024-01-15 | Core Team |
Context
Event sourcing requires protecting against concurrent modifications to the same aggregate. Consider this scenario:
Time User A User B
────────────────────────────────────────────────
T1 Load Order (v1)
T2 Load Order (v1)
T3 Add Item → v2
T4 Add Item → v2 (CONFLICT!)
Both users loaded version 1 and try to create version 2. Without protection, one user’s changes would be lost.
We need to decide how to handle concurrent access:
- Pessimistic Locking: Lock aggregate during edit
- Optimistic Concurrency: Detect conflicts at write time
- Last Write Wins: No protection (not acceptable)
Decision
We will use Optimistic Concurrency Control (OCC) with version-based conflict detection.
Mechanism
Each event append includes an expected version:
func (s *EventStore) Append(ctx context.Context, streamID string, events []EventData, expectedVersion int64) error
The append only succeeds if the current stream version matches the expected version.
Version Constants
const (
AnyVersion int64 = -1 // Skip version check (use carefully!)
NoStream int64 = 0 // Stream must not exist (create)
StreamExists int64 = -2 // Stream must exist (update)
)
PostgreSQL Implementation
-- Atomic version check and insert
WITH current_version AS (
SELECT COALESCE(MAX(version), 0) as v
FROM mink_events
WHERE stream_id = $1
)
INSERT INTO mink_events (stream_id, version, type, data, metadata)
SELECT $1, cv.v + generate_series(1, $2),
unnest($3::text[]),
unnest($4::jsonb[]),
unnest($5::jsonb[])
FROM current_version cv
WHERE cv.v = $6 -- Expected version check
RETURNING *;
Error Handling
var ErrConcurrencyConflict = errors.New("mink: concurrency conflict")
type ConcurrencyError struct {
StreamID string
ExpectedVersion int64
ActualVersion int64
}
func (e *ConcurrencyError) Error() string {
return fmt.Sprintf("concurrency conflict on stream %s: expected %d, got %d",
e.StreamID, e.ExpectedVersion, e.ActualVersion)
}
func (e *ConcurrencyError) Is(target error) bool {
return target == ErrConcurrencyConflict
}
Usage in Aggregates
func (s *EventStore) SaveAggregate(ctx context.Context, agg Aggregate) error {
events := agg.UncommittedEvents()
if len(events) == 0 {
return nil
}
// Use aggregate version for expected version
expectedVersion := agg.Version()
eventData := make([]EventData, len(events))
for i, e := range events {
eventData[i] = s.serialize(e)
}
_, err := s.Append(ctx, agg.StreamID(), eventData, expectedVersion)
if err != nil {
return err
}
agg.ClearUncommittedEvents()
return nil
}
Conflict Resolution Strategies
When a conflict occurs, applications can:
- Retry with Reload:
func SaveWithRetry(ctx context.Context, store *EventStore, agg Aggregate, action func() error) error { for attempts := 0; attempts < 3; attempts++ { if err := action(); err != nil { return err } err := store.SaveAggregate(ctx, agg) if err == nil { return nil } if !errors.Is(err, mink.ErrConcurrencyConflict) { return err } // Reload and retry agg.ClearUncommittedEvents() if err := store.LoadAggregate(ctx, agg); err != nil { return err } } return ErrMaxRetriesExceeded } - Merge Changes (domain-specific):
func MergeOrderItems(original, concurrent []Item) []Item { // Domain-specific merge logic } - User Resolution:
if errors.Is(err, mink.ErrConcurrencyConflict) { return NewConflictResponse(agg.Version(), actualVersion) }
Consequences
Positive
- No Deadlocks: No locks held during user think time
- High Throughput: Multiple readers don’t block each other
- Simple Model: Just version numbers, no lock management
- Scalable: Works across multiple application instances
- Natural Fit: Version numbers already exist in event streams
Negative
- Retry Logic: Applications must handle conflicts
- Write Failures: High-contention scenarios may see many conflicts
- Complexity: Conflict resolution can be domain-specific
Neutral
- Conflict Visibility: Developers must think about concurrency
- Testing: Need to test conflict scenarios
When to Use AnyVersion
AnyVersion (-1) skips the version check. Use sparingly:
// Acceptable: Append-only log with no business logic
store.Append(ctx, "audit-log", events, mink.AnyVersion)
// Dangerous: Business aggregate without version check
store.Append(ctx, "order-123", events, mink.AnyVersion) // DON'T DO THIS
Alternatives Considered
Alternative 1: Pessimistic Locking
Description: Lock the aggregate before editing.
Pros:
- No conflicts possible
- Simpler application logic
Rejected because:
- Deadlock risk
- Reduced throughput
- Doesn’t scale across instances
- Holds locks during user think time
Alternative 2: Event Sequence Numbers
Description: Use global sequence instead of per-stream version.
Rejected because:
- Single point of contention
- Doesn’t prevent per-stream conflicts
- Complicates partitioning
Alternative 3: Timestamp-Based
Description: Use timestamps for conflict detection.
Rejected because:
- Clock skew issues
- Less precise than versions
- Doesn’t guarantee ordering
Alternative 4: CRDTs
Description: Use conflict-free replicated data types.
Rejected because:
- Limited to specific data structures
- Doesn’t fit all domain models
- Adds significant complexity