Adapter System

Phase 1 Complete

Table of contents

  1. Design Philosophy
  2. Adapter Interfaces
    1. Event Store Adapter (Implemented ✅)
    2. Subscription Adapter (Implemented ✅)
    3. Read Model Adapter (Future - Phase 3)
    4. Snapshot Adapter
    5. Outbox Adapter
  3. PostgreSQL Adapter
  4. MongoDB Adapter
  5. Redis Adapter
  6. Memory Adapter (Testing)
  7. Custom Adapter Template
  8. Configuration

Design Philosophy

go-mink’s adapter system allows mixing different storage backends:

┌─────────────────────────────────────────────────────────────────┐
│                        Your Application                          │
├─────────────────────────────────────────────────────────────────┤
│                         go-mink Core                             │
│                                                                  │
│  EventStore    Projections    Snapshots    Outbox               │
│      │              │             │           │                  │
├──────▼──────────────▼─────────────▼───────────▼─────────────────┤
│                    Adapter Interfaces                            │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐        │
│  │PostgreSQL│  │ MongoDB  │  │  Redis   │  │  Memory  │        │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘        │
│                                                                  │
│  Events ────────► PostgreSQL (ACID, JSON)                       │
│  Read Models ───► MongoDB (flexible queries)                    │
│  Snapshots ─────► Redis (fast access)                           │
│  Cache ─────────► Redis (ephemeral)                             │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Adapter Interfaces

Event Store Adapter (Implemented ✅)

// EventStoreAdapter is the interface that database adapters must implement.
// It provides the low-level operations for persisting and retrieving events:
// appending to and loading from individual streams, inspecting stream
// metadata and the global position, and managing the adapter's lifecycle.
type EventStoreAdapter interface {
    // Append stores events to the specified stream with optimistic concurrency control
    // and returns the resulting stored events.
    // expectedVersion specifies the expected current version of the stream:
    //   - AnyVersion (-1): Skip version check
    //   - NoStream (0): Stream must not exist
    //   - StreamExists (-2): Stream must exist
    //   - Any positive number: Stream must be at this exact version
    Append(ctx context.Context, streamID string, events []EventRecord, expectedVersion int64) ([]StoredEvent, error)

    // Load retrieves all events from a stream starting from the specified version.
    // NOTE(review): whether fromVersion itself is included is not stated here —
    // confirm against the concrete adapters.
    Load(ctx context.Context, streamID string, fromVersion int64) ([]StoredEvent, error)

    // GetStreamInfo returns metadata about a stream.
    // NOTE(review): the result for a stream that does not exist is not specified
    // here — confirm whether that is a nil *StreamInfo or an error.
    GetStreamInfo(ctx context.Context, streamID string) (*StreamInfo, error)

    // GetLastPosition returns the global position of the last stored event.
    GetLastPosition(ctx context.Context) (uint64, error)

    // Initialize sets up the required database schema.
    Initialize(ctx context.Context) error

    // Close releases any resources held by the adapter.
    Close() error
}

Subscription Adapter (Implemented ✅)

// SubscriptionAdapter provides event subscription capabilities: a paged read
// of the global event log plus channel-based subscriptions scoped to all
// streams, a single stream, or a category of streams.
type SubscriptionAdapter interface {
    // LoadFromPosition loads events starting from a global position.
    // This is used by projection engines to catch up on historical events.
    // limit bounds the number of events requested in one call.
    LoadFromPosition(ctx context.Context, fromPosition uint64, limit int) ([]StoredEvent, error)

    // SubscribeAll subscribes to all events across all streams.
    // Optional SubscriptionOptions can be provided to configure buffer size, poll interval, etc.
    // Events are delivered on the returned receive-only channel.
    // NOTE(review): presumably the channel is closed when ctx is cancelled —
    // confirm against the implementations.
    SubscribeAll(ctx context.Context, fromPosition uint64, opts ...SubscriptionOptions) (<-chan StoredEvent, error)

    // SubscribeStream subscribes to events from a specific stream.
    // Optional SubscriptionOptions can be provided to configure behavior.
    // Unlike the other subscriptions, the starting point is a stream version
    // (fromVersion), not a global position.
    SubscribeStream(ctx context.Context, streamID string, fromVersion int64, opts ...SubscriptionOptions) (<-chan StoredEvent, error)

    // SubscribeCategory subscribes to all events from streams in a category.
    // Optional SubscriptionOptions can be provided to configure behavior.
    // NOTE(review): how a stream is mapped to a category is not shown here.
    SubscribeCategory(ctx context.Context, category string, fromPosition uint64, opts ...SubscriptionOptions) (<-chan StoredEvent, error)
}

// SubscriptionOptions configures subscription behavior. It is passed as an
// optional variadic argument to the SubscriptionAdapter Subscribe* methods.
type SubscriptionOptions struct {
    BufferSize   int           // Channel buffer size (default: 100)
    PollInterval time.Duration // Polling interval for polling-based subscriptions
    OnError      func(error)   // Error callback for non-fatal errors
}

Read Model Adapter (Future - Phase 3)

// ReadModelAdapter provides generic document storage for read models.
// Documents are opaque byte slices addressed by a collection name and an ID;
// serialization is left to the caller.
type ReadModelAdapter interface {
    // CRUD operations on a single document.
    // NOTE(review): the result of Get for a missing document (nil data vs. an
    // error) is not specified here — confirm before relying on it.
    Get(ctx context.Context, collection, id string) ([]byte, error)
    Set(ctx context.Context, collection, id string, data []byte) error
    Delete(ctx context.Context, collection, id string) error
    
    // Bulk operations. SetMany takes documents keyed by ID.
    // NOTE(review): whether GetMany preserves the order of ids, and how missing
    // IDs are represented, is not specified here.
    GetMany(ctx context.Context, collection string, ids []string) ([][]byte, error)
    SetMany(ctx context.Context, collection string, docs map[string][]byte) error
    
    // Queries: Query returns the matching documents, Count only their number.
    Query(ctx context.Context, collection string, query QuerySpec) ([][]byte, error)
    Count(ctx context.Context, collection string, query QuerySpec) (int64, error)
    
    // Schema management
    CreateCollection(ctx context.Context, collection string, schema Schema) error
    CreateIndex(ctx context.Context, collection string, index IndexSpec) error
    
    // Transactions (optional)
    // NOTE(review): how a backend without transaction support should respond
    // is not specified here.
    BeginTx(ctx context.Context) (Transaction, error)
}

Snapshot Adapter

// SnapshotAdapter stores aggregate snapshots, keyed by stream ID.
type SnapshotAdapter interface {
    // Save stores the serialized snapshot data for streamID at the given version.
    // NOTE(review): presumably this replaces any earlier snapshot for the same
    // stream, since Load takes no version — confirm.
    Save(ctx context.Context, streamID string, version int64, data []byte) error
    // Load returns the snapshot stored for streamID.
    Load(ctx context.Context, streamID string) (*SnapshotRecord, error)
    // Delete removes the snapshot stored for streamID.
    Delete(ctx context.Context, streamID string) error
}

Outbox Adapter

// OutboxAdapter persists outbox entries for reliable event publishing:
// entries are stored in the same transaction as the events, fetched later
// while still unpublished, marked once published, and eventually cleaned up.
type OutboxAdapter interface {
    // Store outbox entry (in same tx as events)
    Store(ctx context.Context, tx Transaction, entries []OutboxEntry) error
    
    // Fetch unpublished entries, at most limit per call
    FetchPending(ctx context.Context, limit int) ([]OutboxEntry, error)
    
    // Mark the entries with the given IDs as published
    MarkPublished(ctx context.Context, ids []string) error
    
    // Cleanup old entries
    // NOTE(review): presumably olderThan is an age relative to now and only
    // published entries are removed — confirm against the implementations.
    Cleanup(ctx context.Context, olderThan time.Duration) error
}

PostgreSQL Adapter

package postgres

import (
    "database/sql"
    "github.com/AshkanYarmoradi/go-mink"
)

// PostgresAdapter is the PostgreSQL-backed adapter. It pairs a database/sql
// handle with the name of the schema the adapter works in.
type PostgresAdapter struct {
    // db is the handle opened by NewAdapter.
    db     *sql.DB
    // schema is the schema name that Initialize creates if it is missing.
    schema string
}

// NewAdapter opens a database handle for connStr using the "pgx" driver and
// returns an adapter that uses the default schema, after applying opts in the
// order given.
//
// sql.Open may only validate its arguments without connecting, so an
// unreachable database can go unnoticed until the adapter is first used.
func NewAdapter(connStr string, opts ...Option) (*PostgresAdapter, error) {
    db, err := sql.Open("pgx", connStr)
    if err != nil {
        return nil, err
    }

    adapter := &PostgresAdapter{
        db: db,
        // The default must be a valid unquoted PostgreSQL identifier because
        // Initialize interpolates it into CREATE SCHEMA. The previous default,
        // "go-mink", contains a hyphen and is rejected by the SQL parser.
        schema: "mink",
    }

    // Options run after the defaults are set so that they can override them.
    for _, opt := range opts {
        opt(adapter)
    }

    return adapter, nil
}

// Options

// WithSchema returns an Option that replaces the adapter's schema name with
// the given one.
func WithSchema(schema string) Option {
    return func(adapter *PostgresAdapter) {
        adapter.schema = schema
    }
}

// WithMaxConnections returns an Option that caps the number of open
// connections on the adapter's database handle at n.
func WithMaxConnections(n int) Option {
    return func(adapter *PostgresAdapter) {
        adapter.db.SetMaxOpenConns(n)
    }
}

// Initialize creates the adapter's schema if it does not already exist and
// then runs the migrations that create the required tables.
//
// The schema name is interpolated into the DDL statement because identifiers
// cannot be sent as bind parameters. It is therefore validated first: a name
// that is not a plain unquoted PostgreSQL identifier is rejected before any
// SQL is sent, which prevents SQL injection through the schema option and
// gives a clear error instead of a parser failure.
func (a *PostgresAdapter) Initialize(ctx context.Context) error {
    if !isPlainIdentifier(a.schema) {
        return fmt.Errorf("invalid schema name %q: must be an unquoted PostgreSQL identifier", a.schema)
    }

    // Create schema
    if _, err := a.db.ExecContext(ctx, fmt.Sprintf(
        `CREATE SCHEMA IF NOT EXISTS %s`, a.schema,
    )); err != nil {
        return fmt.Errorf("creating schema %q: %w", a.schema, err)
    }

    // Create tables (streams, events, checkpoints, outbox)
    return a.runMigrations(ctx)
}

// isPlainIdentifier reports whether name is usable as an unquoted PostgreSQL
// identifier: it must be non-empty, start with a letter or underscore, and
// continue with letters, digits, underscores or dollar signs. Bytes outside
// ASCII are accepted so that non-Latin letters keep working; they cannot form
// SQL metacharacters.
func isPlainIdentifier(name string) bool {
    if name == "" {
        return false
    }
    for i := 0; i < len(name); i++ {
        c := name[i]
        switch {
        case c == '_', 'a' <= c && c <= 'z', 'A' <= c && c <= 'Z', c >= 0x80:
            // Allowed in any position.
        case '0' <= c && c <= '9', c == '$':
            // Allowed everywhere except as the first character.
            if i == 0 {
                return false
            }
        default:
            return false
        }
    }
    return true
}

MongoDB Adapter

package mongodb

import (
    "go.mongodb.org/mongo-driver/mongo"
    "github.com/AshkanYarmoradi/go-mink"
)

// MongoAdapter is the MongoDB-backed adapter. It holds a driver client and
// the name of the database its collections are resolved in.
type MongoAdapter struct {
    // client is the driver client created by NewAdapter.
    client   *mongo.Client
    // database is the database name Query uses to look up collections.
    database string
}

// NewAdapter creates a MongoDB client for uri and returns an adapter bound to
// the named database. The client is created with a background context, so
// the call itself carries no deadline or cancellation.
func NewAdapter(uri, database string) (*MongoAdapter, error) {
    clientOpts := options.Client().ApplyURI(uri)

    client, err := mongo.Connect(context.Background(), clientOpts)
    if err != nil {
        return nil, err
    }

    adapter := &MongoAdapter{client: client, database: database}
    return adapter, nil
}

// MongoDB-specific: Flexible document queries

// Query runs query against collection in the adapter's database and returns
// every matching document as raw BSON bytes. The filters, sort order, limit
// and offset are taken from query. If the find or the cursor iteration fails,
// it returns nil results together with the error.
func (a *MongoAdapter) Query(ctx context.Context, collection string,
    query QuerySpec) ([][]byte, error) {

    coll := a.client.Database(a.database).Collection(collection)

    filter := buildMongoFilter(query.Filters)
    opts := options.Find().
        SetSort(buildMongoSort(query.OrderBy)).
        SetLimit(int64(query.Limit)).
        SetSkip(int64(query.Offset))

    cursor, err := coll.Find(ctx, filter, opts)
    if err != nil {
        return nil, err
    }
    defer cursor.Close(ctx)

    var results [][]byte
    for cursor.Next(ctx) {
        // cursor.Current is only valid until the next call to Next, so the
        // document has to be copied before it is retained. Appending
        // cursor.Current directly can leave earlier results corrupted.
        doc := make([]byte, len(cursor.Current))
        copy(doc, cursor.Current)
        results = append(results, doc)
    }
    if err := cursor.Err(); err != nil {
        return nil, err
    }

    return results, nil
}

Redis Adapter

package redis

import (
    "github.com/redis/go-redis/v9"
    "github.com/AshkanYarmoradi/go-mink"
)

// RedisAdapter is the Redis-backed adapter. It holds a Redis client and a
// prefix that is prepended to every key the adapter reads or writes.
type RedisAdapter struct {
    // client is the Redis client created by NewAdapter.
    client *redis.Client
    // prefix is the key namespace; Save and Load build keys as
    // prefix + "snapshot:" + streamID.
    prefix string
}

// NewAdapter returns a Redis adapter connected to addr, using the default
// key prefix "go-mink:" unless an option overrides it.
//
// The options were previously accepted but never applied. They now run after
// the defaults are set, in the order given.
// NOTE(review): this assumes Option is a func(*RedisAdapter), mirroring the
// PostgreSQL adapter's option type — confirm against the Option declaration.
func NewAdapter(addr string, opts ...Option) *RedisAdapter {
    adapter := &RedisAdapter{
        client: redis.NewClient(&redis.Options{Addr: addr}),
        prefix: "go-mink:",
    }

    for _, opt := range opts {
        opt(adapter)
    }

    return adapter
}

// Optimized for snapshots - fast key-value access

// Save stores the snapshot for streamID as a JSON-encoded SnapshotRecord
// under the key prefix + "snapshot:" + streamID, overwriting any previous
// value. The record carries version, data and the current time as SavedAt.
// It returns an error if the record cannot be encoded or the write fails.
func (a *RedisAdapter) Save(ctx context.Context, streamID string,
    version int64, data []byte) error {

    key := fmt.Sprintf("%ssnapshot:%s", a.prefix, streamID)

    value, err := json.Marshal(SnapshotRecord{
        Version: version,
        Data:    data,
        SavedAt: time.Now(),
    })
    if err != nil {
        // Never write a partial or empty value in place of a snapshot.
        return fmt.Errorf("encoding snapshot for stream %q: %w", streamID, err)
    }

    // An expiration of 0 stores the key without a TTL.
    return a.client.Set(ctx, key, value, 0).Err()
}

// Load returns the snapshot stored for streamID. It returns (nil, nil) when
// no snapshot exists, and an error when the read fails or the stored value
// cannot be decoded.
func (a *RedisAdapter) Load(ctx context.Context,
    streamID string) (*SnapshotRecord, error) {

    key := fmt.Sprintf("%ssnapshot:%s", a.prefix, streamID)

    data, err := a.client.Get(ctx, key).Bytes()
    if err == redis.Nil {
        // A missing key means "no snapshot yet", which is not an error.
        return nil, nil
    }
    if err != nil {
        return nil, err
    }

    var record SnapshotRecord
    if err := json.Unmarshal(data, &record); err != nil {
        // Without this check a corrupt value would come back as a
        // zero-valued snapshot that looks valid to the caller.
        return nil, fmt.Errorf("decoding snapshot for stream %q: %w", streamID, err)
    }
    return &record, nil
}

Memory Adapter (Testing)

package memory

// InMemoryAdapter is an event store adapter for unit tests. It keeps all
// events in process memory.
type InMemoryAdapter struct {
    // mu guards streams and global; Append holds it for writing.
    mu      sync.RWMutex
    // streams holds each stream's events, keyed by stream ID.
    streams map[string][]StoredEvent
    // global holds every appended event across all streams, in append order.
    global  []StoredEvent
}

// NewAdapter returns an empty in-memory adapter whose stream map is ready
// for use.
func NewAdapter() *InMemoryAdapter {
    adapter := new(InMemoryAdapter)
    adapter.streams = make(map[string][]StoredEvent)
    return adapter
}

// Perfect for unit tests - no external dependencies
func (a *InMemoryAdapter) Append(ctx context.Context, streamID string,
    events []EventRecord, expectedVersion int64) ([]StoredEvent, error) {
    
    a.mu.Lock()
    defer a.mu.Unlock()
    
    stream := a.streams[streamID]
    currentVersion := int64(len(stream))
    
    if expectedVersion >= 0 && currentVersion != expectedVersion {
        return nil, go-mink.ErrConcurrencyConflict
    }
    
    var stored []StoredEvent
    for _, e := range events {
        currentVersion++
        se := StoredEvent{
            ID:             uuid.NewString(),
            StreamID:       streamID,
            Type:           e.Type,
            Data:           e.Data,
            Version:        currentVersion,
            GlobalPosition: uint64(len(a.global) + 1),
            Timestamp:      time.Now(),
        }
        stored = append(stored, se)
        a.global = append(a.global, se)
    }
    
    a.streams[streamID] = append(stream, stored...)
    return stored, nil
}

Custom Adapter Template

package myadapter

import "github.com/AshkanYarmoradi/go-mink"

// Implement your own adapter

// MyCustomAdapter is the skeleton of a user-defined adapter. Add whatever
// client or connection state your storage backend needs as fields.
type MyCustomAdapter struct {
    // Your storage client
}

// Ensure interface compliance at compile time
var _ go-mink.EventStoreAdapter = (*MyCustomAdapter)(nil)

// NewAdapter returns a new, empty MyCustomAdapter. Extend the parameter list
// with whatever configuration your backend needs.
func NewAdapter( /* your config */ ) *MyCustomAdapter {
    adapter := new(MyCustomAdapter)
    return adapter
}

// Implement all EventStoreAdapter methods...
func (a *MyCustomAdapter) Append(ctx context.Context, streamID string,
    events []go-mink.EventRecord, expectedVersion int64) ([]go-mink.StoredEvent, error) {
    // Your implementation
}

// Register with go-mink
func init() {
    go-mink.RegisterAdapter("myadapter", func(config map[string]interface{}) (go-mink.EventStoreAdapter, error) {
        // Create adapter from config
        return NewAdapter(), nil
    })
}

Configuration

// Mix and match adapters via configuration
store, err := mink.New(mink.Config{
    // Events in PostgreSQL for ACID guarantees
    EventStore: mink.AdapterConfig{
        Type: "postgres",
        Connection: "postgres://localhost/mydb",
        Options: map[string]interface{}{
            "schema": "events",
            "maxConnections": 25,
        },
    },

    // Read models in MongoDB for flexible queries
    ReadModels: mink.AdapterConfig{
        Type: "mongodb",
        Connection: "mongodb://localhost:27017",
        Options: map[string]interface{}{
            "database": "readmodels",
        },
    },

    // Snapshots in Redis for speed
    Snapshots: mink.AdapterConfig{
        Type: "redis",
        Connection: "redis://localhost:6379",
    },
})
if err != nil {
    // Do not continue with a store that failed to build. Replace this with
    // the error handling that suits your application.
    panic(err)
}

Next: API Design →