API Design

Table of contents

  1. Core Package (mink)
    1. Event Store
    2. Aggregates
    3. Events
    4. Projections
    5. Subscriptions
    6. Read Model Repository
    7. Middleware
    8. Multi-tenancy
    9. Errors
    10. Testing Utilities

Core Package (mink)

Event Store

package go-mink

// EventStore is the main entry point
type EventStore struct {
    adapter     EventStoreAdapter
    serializer  Serializer
    projections *ProjectionEngine
    middleware  []Middleware
}

// New creates a new event store
func New(adapter EventStoreAdapter, opts ...Option) *EventStore

// Options
func WithSerializer(s Serializer) Option
func WithMiddleware(m ...Middleware) Option
func WithSnapshots(adapter SnapshotAdapter, policy SnapshotPolicy) Option
func WithProjections(engine *ProjectionEngine) Option
func WithLogger(logger Logger) Option
func WithMetrics(provider MetricsProvider) Option

// Core operations
func (s *EventStore) Append(ctx context.Context, streamID string, 
    events []interface{}, opts ...AppendOption) error

func (s *EventStore) Load(ctx context.Context, streamID string) ([]Event, error)

func (s *EventStore) LoadAggregate(ctx context.Context, agg Aggregate) error

func (s *EventStore) SaveAggregate(ctx context.Context, agg Aggregate) error

// Append options
func ExpectVersion(v int64) AppendOption
func WithMetadata(m Metadata) AppendOption

Aggregates

// Aggregate interface for domain objects
type Aggregate interface {
    AggregateID() string
    AggregateType() string
    Version() int64
    ApplyEvent(event interface{}) error
    UncommittedEvents() []interface{}
    ClearUncommittedEvents()
}

// AggregateBase provides default implementation
type AggregateBase struct {
    id               string
    version          int64
    uncommittedEvents []interface{}
}

func (a *AggregateBase) AggregateID() string { return a.id }
func (a *AggregateBase) SetID(id string)     { a.id = id }
func (a *AggregateBase) Version() int64      { return a.version }
func (a *AggregateBase) SetVersion(v int64)  { a.version = v }

func (a *AggregateBase) Apply(event interface{}) {
    a.uncommittedEvents = append(a.uncommittedEvents, event)
}

func (a *AggregateBase) UncommittedEvents() []interface{} {
    return a.uncommittedEvents
}

func (a *AggregateBase) ClearUncommittedEvents() {
    a.uncommittedEvents = nil
}

Events

// Event represents a stored event
type Event struct {
    ID             string
    StreamID       string
    Type           string
    Data           interface{}
    Metadata       Metadata
    Version        int64
    GlobalPosition uint64
    Timestamp      time.Time
}

// Metadata carries event context
type Metadata struct {
    CorrelationID string
    CausationID   string
    UserID        string
    TenantID      string
    Custom        map[string]string
}

// EventRegistry maps type names to Go types
type EventRegistry struct {
    types map[string]reflect.Type
}

func NewEventRegistry() *EventRegistry

func (r *EventRegistry) Register(eventType string, example interface{})
func (r *EventRegistry) RegisterAll(events ...interface{}) // Uses struct name
func (r *EventRegistry) Lookup(eventType string) (reflect.Type, bool)

Projections

// Projection transforms events to read models
type Projection interface {
    Name() string
    HandledEvents() []string
}

// InlineProjection runs in same transaction
type InlineProjection interface {
    Projection
    Apply(ctx context.Context, tx Transaction, event Event) error
}

// AsyncProjection runs in background
type AsyncProjection interface {
    Projection
    Apply(ctx context.Context, events []Event) error
    Checkpoint() uint64
    SetCheckpoint(pos uint64) error
}

// ProjectionEngine manages all projections
type ProjectionEngine struct {
    inline []InlineProjection
    async  []asyncRunner
    store  EventStore
}

func NewProjectionEngine(store *EventStore) *ProjectionEngine

func (e *ProjectionEngine) RegisterInline(p InlineProjection)
func (e *ProjectionEngine) RegisterAsync(p AsyncProjection, opts AsyncOptions)
func (e *ProjectionEngine) Start(ctx context.Context) error
func (e *ProjectionEngine) Stop() error
func (e *ProjectionEngine) Rebuild(ctx context.Context, name string) error

// AsyncOptions configures async projection processing
type AsyncOptions struct {
    BatchSize    int
    BatchTimeout time.Duration
    Workers      int
    RetryPolicy  RetryPolicy
}

Subscriptions

// Subscribe to event streams
type Subscription interface {
    Events() <-chan Event
    Errors() <-chan error
    Close() error
}

func (s *EventStore) SubscribeAll(ctx context.Context, 
    fromPosition uint64) (Subscription, error)

func (s *EventStore) SubscribeStream(ctx context.Context, 
    streamID string, fromVersion int64) (Subscription, error)

func (s *EventStore) SubscribeCategory(ctx context.Context, 
    category string, fromPosition uint64) (Subscription, error)

// Example usage
sub, _ := store.SubscribeAll(ctx, 0)
defer sub.Close()

for {
    select {
    case event := <-sub.Events():
        fmt.Printf("Event: %s\n", event.Type)
    case err := <-sub.Errors():
        log.Printf("Error: %v\n", err)
    case <-ctx.Done():
        return
    }
}

Read Model Repository

// Generic repository for read models
type Repository[T any] interface {
    Get(ctx context.Context, id string) (*T, error)
    GetAll(ctx context.Context) ([]*T, error)
    Find(ctx context.Context, query Query) ([]*T, error)
    FindOne(ctx context.Context, query Query) (*T, error)
    Save(ctx context.Context, model *T) error
    Delete(ctx context.Context, id string) error
    Count(ctx context.Context, query Query) (int64, error)
}

// Query builder
type Query struct {
    filters  []Filter
    orderBy  []OrderBy
    limit    int
    offset   int
}

func NewQuery() *Query
func (q *Query) Where(field string, op string, value interface{}) *Query
func (q *Query) And(field string, op string, value interface{}) *Query
func (q *Query) Or(field string, op string, value interface{}) *Query
func (q *Query) OrderBy(field string, desc bool) *Query
func (q *Query) Limit(n int) *Query
func (q *Query) Offset(n int) *Query

// Usage
orders, _ := repo.Find(ctx, 
    go-mink.NewQuery().
        Where("status", "=", "pending").
        And("total", ">", 100).
        OrderBy("created_at", true).
        Limit(10),
)

Middleware

// Middleware wraps event store operations
type Middleware func(next Handler) Handler

type Handler func(ctx context.Context, cmd Command) error

// Built-in middleware
func LoggingMiddleware(logger Logger) Middleware
func MetricsMiddleware(provider MetricsProvider) Middleware
func TracingMiddleware(tracer Tracer) Middleware
func RetryMiddleware(policy RetryPolicy) Middleware
func ValidationMiddleware() Middleware

// Usage
store := go-mink.New(adapter,
    go-mink.WithMiddleware(
        go-mink.LoggingMiddleware(logger),
        go-mink.MetricsMiddleware(prometheus.NewProvider()),
        go-mink.TracingMiddleware(otel.Tracer("go-mink")),
    ),
)

Multi-tenancy

// TenantContext adds tenant awareness
type TenantContext struct {
    TenantID string
    store    *EventStore
}

func (s *EventStore) ForTenant(tenantID string) *TenantContext

// Tenant isolation strategies
type TenantStrategy int

const (
    // Shared tables with tenant_id column
    SharedTable TenantStrategy = iota
    
    // Separate schema per tenant
    SchemaPerTenant
    
    // Separate database per tenant  
    DatabasePerTenant
)

// Configuration
config := go-mink.Config{
    MultiTenancy: go-mink.MultiTenancyConfig{
        Strategy: go-mink.SchemaPerTenant,
        SchemaPrefix: "tenant_",
    },
}

// Usage
tenantStore := store.ForTenant("acme-corp")
tenantStore.Append(ctx, "order-123", events)

Errors

// Sentinel errors
var (
    ErrStreamNotFound      = errors.New("stream not found")
    ErrConcurrencyConflict = errors.New("concurrency conflict")
    ErrEventNotFound       = errors.New("event not found")
    ErrProjectionNotFound  = errors.New("projection not found")
    ErrSerializationFailed = errors.New("serialization failed")
    ErrAdapterNotSupported = errors.New("adapter not supported")
)

// Typed errors with details
type ConcurrencyError struct {
    StreamID        string
    ExpectedVersion int64
    ActualVersion   int64
}

func (e *ConcurrencyError) Error() string {
    return fmt.Sprintf("concurrency conflict on stream %s: expected %d, got %d",
        e.StreamID, e.ExpectedVersion, e.ActualVersion)
}

// Error checking
if errors.Is(err, go-mink.ErrConcurrencyConflict) {
    // Handle conflict
}

var concErr *go-mink.ConcurrencyError
if errors.As(err, &concErr) {
    // Access error details
    fmt.Printf("Stream: %s\n", concErr.StreamID)
}

Testing Utilities

package go-minktest

// InMemoryStore for unit tests
func NewInMemoryStore() *go-mink.EventStore

// EventBuilder for test events
type EventBuilder struct{}

func NewEvent(eventType string) *EventBuilder
func (b *EventBuilder) WithData(data interface{}) *EventBuilder
func (b *EventBuilder) WithMetadata(m go-mink.Metadata) *EventBuilder
func (b *EventBuilder) Build() go-mink.Event

// Assertions
func AssertEventTypes(t *testing.T, events []go-mink.Event, types ...string)
func AssertStreamVersion(t *testing.T, store *go-mink.EventStore, streamID string, version int64)
func AssertProjectionState[T any](t *testing.T, repo go-mink.Repository[T], id string, expected T)

// Usage in tests
func TestOrderAggregate(t *testing.T) {
    store := go-minktest.NewInMemoryStore()
    
    order := NewOrder("order-123")
    order.Create("customer-456")
    order.AddItem("SKU-001", 2, 29.99)
    
    store.SaveAggregate(ctx, order)
    
    go-minktest.AssertEventTypes(t, order.UncommittedEvents(),
        "OrderCreated", "ItemAdded")
    go-minktest.AssertStreamVersion(t, store, "order-123", 2)
}

Next: Roadmap →