Skip to main content

Event Store Design

Implemented


Event Structure

// EventData represents an event to be stored
type EventData struct {
// Event type identifier (e.g., "OrderCreated")
Type string

// Serialized event payload
Data []byte

// Optional metadata (correlation ID, causation ID, user ID)
Metadata Metadata
}

// StoredEvent represents a persisted event
type StoredEvent struct {
// Global unique event ID
ID string

// Stream this event belongs to
StreamID StreamID

// Event type
Type string

// Serialized payload
Data []byte

// Event metadata
Metadata Metadata

// Position within stream (1-based)
Version int64

// Global position across all streams
GlobalPosition uint64

// When the event was stored
Timestamp time.Time
}

// StreamID uniquely identifies an event stream
type StreamID struct {
// Category (e.g., "Order", "Customer")
Category string

// Instance ID (e.g., "order-123")
ID string
}

func (s StreamID) String() string {
return fmt.Sprintf("%s-%s", s.Category, s.ID)
}

Core Operations

Appending Events

// AppendToStream adds events with optimistic concurrency
func (s *EventStore) AppendToStream(
ctx context.Context,
streamID StreamID,
expectedVersion int64,
events []EventData,
) error

// Usage
err := store.AppendToStream(ctx,
StreamID{Category: "Order", ID: "123"},
2, // Expected version (for optimistic concurrency)
[]EventData{
{Type: "ItemAdded", Data: itemAddedJSON},
},
)

// Special version constants
const (
AnyVersion int64 = -1 // No concurrency check
NoStream int64 = 0 // Stream must not exist
StreamExists int64 = -2 // Stream must exist
)

Loading Events

// LoadStream retrieves all events from a stream
func (s *EventStore) LoadStream(
ctx context.Context,
streamID StreamID,
) ([]StoredEvent, error)

// LoadStreamFrom retrieves events from a specific version
func (s *EventStore) LoadStreamFrom(
ctx context.Context,
streamID StreamID,
fromVersion int64,
) ([]StoredEvent, error)

// LoadStreamRange retrieves a range of events
func (s *EventStore) LoadStreamRange(
ctx context.Context,
streamID StreamID,
fromVersion int64,
count int,
) ([]StoredEvent, error)

Subscriptions

// SubscribeToStream subscribes to a single stream
func (s *EventStore) SubscribeToStream(
ctx context.Context,
streamID StreamID,
fromVersion int64,
) (<-chan StoredEvent, error)

// SubscribeToAll subscribes to all events (for projections)
func (s *EventStore) SubscribeToAll(
ctx context.Context,
fromPosition uint64,
) (<-chan StoredEvent, error)

// SubscribeToCategory subscribes to all streams in a category
func (s *EventStore) SubscribeToCategory(
ctx context.Context,
category string,
fromPosition uint64,
) (<-chan StoredEvent, error)

PostgreSQL Schema

-- Event streams table
CREATE TABLE streams (
id BIGSERIAL PRIMARY KEY,
stream_id VARCHAR(500) NOT NULL UNIQUE,
category VARCHAR(250) NOT NULL,
version BIGINT NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_streams_category ON streams(category);

-- Events table
CREATE TABLE events (
global_position BIGSERIAL PRIMARY KEY,
stream_id VARCHAR(500) NOT NULL,
version BIGINT NOT NULL,
event_id UUID NOT NULL DEFAULT gen_random_uuid(),
event_type VARCHAR(500) NOT NULL,
data JSONB NOT NULL,
metadata JSONB,
timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),

UNIQUE(stream_id, version)
);

CREATE INDEX idx_events_stream ON events(stream_id, version);
CREATE INDEX idx_events_type ON events(event_type);
CREATE INDEX idx_events_timestamp ON events(timestamp);

-- Optimistic concurrency is enforced by the UNIQUE(stream_id, version)
-- constraint above together with the adapter's append logic; the adapter does
-- not create a stored procedure.

Serialization

// EventSerializer handles event payload serialization
type EventSerializer interface {
Serialize(event interface{}) ([]byte, error)
Deserialize(data []byte, eventType string) (interface{}, error)
}

// JSONSerializer is the default implementation
type JSONSerializer struct {
registry map[string]reflect.Type
}

func (s *JSONSerializer) RegisterEvent(eventType string, example interface{}) {
s.registry[eventType] = reflect.TypeOf(example)
}

// Usage
serializer := go-mink.NewJSONSerializer()
serializer.RegisterEvent("OrderCreated", OrderCreated{})
serializer.RegisterEvent("ItemAdded", ItemAdded{})

store := go-mink.NewEventStore(adapter, go-mink.WithSerializer(serializer))

Metadata

// Metadata contains event context
type Metadata struct {
// Correlation ID for distributed tracing
CorrelationID string `json:"correlationId,omitempty"`

// Causation ID links to causing event/command
CausationID string `json:"causationId,omitempty"`

// User who triggered this event
UserID string `json:"userId,omitempty"`

// Tenant ID for multi-tenancy
TenantID string `json:"tenantId,omitempty"`

// Custom key-value pairs
Custom map[string]string `json:"custom,omitempty"`
}

// MetadataFromContext extracts metadata from context
func MetadataFromContext(ctx context.Context) Metadata {
return Metadata{
CorrelationID: trace.SpanFromContext(ctx).SpanContext().TraceID().String(),
UserID: auth.UserFromContext(ctx),
TenantID: tenant.FromContext(ctx),
}
}

Snapshots

// Snapshot stores aggregate state for fast loading
type Snapshot struct {
StreamID StreamID
Version int64
Data []byte
Timestamp time.Time
}

// SnapshotStore manages aggregate snapshots
type SnapshotStore interface {
Save(ctx context.Context, snapshot Snapshot) error
Load(ctx context.Context, streamID StreamID) (*Snapshot, error)
Delete(ctx context.Context, streamID StreamID) error
}

// SnapshotPolicy determines when to create snapshots
type SnapshotPolicy interface {
ShouldSnapshot(aggregate Aggregate) bool
}

// Every N events
type EveryNEventsPolicy struct {
N int
}

func (p EveryNEventsPolicy) ShouldSnapshot(agg Aggregate) bool {
return agg.Version()%int64(p.N) == 0
}

Next: Read Models →