Adapter System

v1.0.0

Table of contents

  1. Design Philosophy
  2. Adapter Interfaces
    1. Event Store Adapter (Implemented ✅)
    2. Subscription Adapter (Implemented ✅)
    3. Read Model Adapter (Future)
    4. Snapshot Adapter
    5. Outbox Adapter
  3. PostgreSQL Adapter
  4. MongoDB Adapter
  5. Redis Adapter
  6. Memory Adapter (Testing)
  7. Custom Adapter Template
  8. Configuration

Design Philosophy

go-mink’s adapter system lets you mix storage backends, assigning each concern (events, read models, snapshots, cache) to the store that suits it best:

┌─────────────────────────────────────────────────────────────────┐
│                        Your Application                          │
├─────────────────────────────────────────────────────────────────┤
│                         go-mink Core                             │
│                                                                  │
│  EventStore    Projections    Snapshots    Outbox               │
│      │              │             │           │                  │
├──────▼──────────────▼─────────────▼───────────▼─────────────────┤
│                    Adapter Interfaces                            │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐        │
│  │PostgreSQL│  │ MongoDB  │  │  Redis   │  │  Memory  │        │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘        │
│                                                                  │
│  Events ────────► PostgreSQL (ACID, JSON)                       │
│  Read Models ───► MongoDB (flexible queries)                    │
│  Snapshots ─────► Redis (fast access)                           │
│  Cache ─────────► Redis (ephemeral)                             │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Adapter Interfaces

Event Store Adapter (Implemented ✅)

// EventStoreAdapter is the interface that database adapters must implement.
// It provides the low-level operations for persisting and retrieving events.
//
// Every method except Close takes a context.Context as its first argument.
type EventStoreAdapter interface {
    // Append stores events to the specified stream with optimistic concurrency control.
    // expectedVersion specifies the expected current version of the stream:
    //   - AnyVersion (-1): Skip version check
    //   - NoStream (0): Stream must not exist
    //   - StreamExists (-2): Stream must exist
    //   - Any positive number: Stream must be at this exact version
    //
    // It returns the events in their stored form, or an error.
    Append(ctx context.Context, streamID string, events []EventRecord, expectedVersion int64) ([]StoredEvent, error)

    // Load retrieves all events from a stream starting from the specified version.
    // NOTE(review): whether fromVersion itself is included is not stated here — confirm
    // against an implementation.
    Load(ctx context.Context, streamID string, fromVersion int64) ([]StoredEvent, error)

    // GetStreamInfo returns metadata about a stream.
    // NOTE(review): the result for a stream that does not exist (nil vs. error) is not
    // specified here — confirm.
    GetStreamInfo(ctx context.Context, streamID string) (*StreamInfo, error)

    // GetLastPosition returns the global position of the last stored event.
    GetLastPosition(ctx context.Context) (uint64, error)

    // Initialize sets up the required database schema.
    Initialize(ctx context.Context) error

    // Close releases any resources held by the adapter.
    // It is the only method that does not take a context.
    Close() error
}

Subscription Adapter (Implemented ✅)

// SubscriptionAdapter provides event subscription capabilities.
//
// LoadFromPosition returns a bounded batch as a slice; the Subscribe* methods
// return a receive-only channel of StoredEvent instead.
// NOTE(review): when the returned channels are closed (for example on ctx
// cancellation) is not specified here — confirm against an implementation.
type SubscriptionAdapter interface {
    // LoadFromPosition loads events starting from a global position.
    // This is used by projection engines to catch up on historical events.
    // limit bounds the number of events requested per call.
    LoadFromPosition(ctx context.Context, fromPosition uint64, limit int) ([]StoredEvent, error)

    // SubscribeAll subscribes to all events across all streams.
    // Optional SubscriptionOptions can be provided to configure buffer size, poll interval, etc.
    SubscribeAll(ctx context.Context, fromPosition uint64, opts ...SubscriptionOptions) (<-chan StoredEvent, error)

    // SubscribeStream subscribes to events from a specific stream.
    // Unlike the other subscriptions it starts from a per-stream version
    // (fromVersion) rather than a global position.
    // Optional SubscriptionOptions can be provided to configure behavior.
    SubscribeStream(ctx context.Context, streamID string, fromVersion int64, opts ...SubscriptionOptions) (<-chan StoredEvent, error)

    // SubscribeCategory subscribes to all events from streams in a category.
    // Optional SubscriptionOptions can be provided to configure behavior.
    // NOTE(review): how a stream's category is derived is not defined here — confirm.
    SubscribeCategory(ctx context.Context, category string, fromPosition uint64, opts ...SubscriptionOptions) (<-chan StoredEvent, error)
}

// SubscriptionOptions configures subscription behavior.
// It is the type of the optional variadic argument accepted by the
// SubscribeAll, SubscribeStream and SubscribeCategory methods.
type SubscriptionOptions struct {
    BufferSize   int           // Channel buffer size (default: 100)
    PollInterval time.Duration // Polling interval for polling-based subscriptions
    OnError      func(error)   // Error callback for non-fatal errors
}

Read Model Adapter (Future)

// ReadModelAdapter provides generic document storage.
// Documents are addressed by a collection name plus an id and are passed
// through the interface as raw []byte payloads.
// NOTE(review): the payload encoding (JSON, BSON, ...) is not fixed by this
// interface — confirm per adapter.
type ReadModelAdapter interface {
    // CRUD operations on a single document.
    Get(ctx context.Context, collection, id string) ([]byte, error)
    Set(ctx context.Context, collection, id string, data []byte) error
    Delete(ctx context.Context, collection, id string) error
    
    // Bulk operations. SetMany takes its documents keyed by id.
    GetMany(ctx context.Context, collection string, ids []string) ([][]byte, error)
    SetMany(ctx context.Context, collection string, docs map[string][]byte) error
    
    // Queries. Count takes the same QuerySpec as Query and returns a number
    // instead of the documents.
    Query(ctx context.Context, collection string, query QuerySpec) ([][]byte, error)
    Count(ctx context.Context, collection string, query QuerySpec) (int64, error)
    
    // Schema management
    CreateCollection(ctx context.Context, collection string, schema Schema) error
    CreateIndex(ctx context.Context, collection string, index IndexSpec) error
    
    // Transactions (optional)
    // NOTE(review): what an adapter without transaction support should return
    // is not specified here — confirm.
    BeginTx(ctx context.Context) (Transaction, error)
}

Snapshot Adapter

// SnapshotAdapter stores aggregate snapshots.
// Snapshots are keyed by stream ID: Load and Delete take only the stream ID,
// while Save additionally records the version and the raw snapshot bytes.
type SnapshotAdapter interface {
    // Save stores data as the snapshot of streamID taken at the given version.
    Save(ctx context.Context, streamID string, version int64, data []byte) error
    // Load returns the snapshot stored for streamID.
    // NOTE(review): the result when no snapshot exists is adapter-defined; the
    // Redis adapter in this document returns (nil, nil) — confirm for others.
    Load(ctx context.Context, streamID string) (*SnapshotRecord, error)
    // Delete removes the snapshot stored for streamID.
    Delete(ctx context.Context, streamID string) error
}

Outbox Adapter

// OutboxAdapter for reliable event publishing.
// Entries are written with Store, read back with FetchPending, acknowledged
// with MarkPublished and eventually removed with Cleanup.
type OutboxAdapter interface {
    // Store outbox entry (in same tx as events).
    // The transaction is passed explicitly as tx.
    Store(ctx context.Context, tx Transaction, entries []OutboxEntry) error
    
    // Fetch unpublished entries; limit bounds how many are requested.
    FetchPending(ctx context.Context, limit int) ([]OutboxEntry, error)
    
    // Mark as published the entries with the given ids.
    MarkPublished(ctx context.Context, ids []string) error
    
    // Cleanup old entries; olderThan is an age (a duration), not a timestamp.
    // NOTE(review): whether unpublished entries are also removed is not
    // specified here — confirm.
    Cleanup(ctx context.Context, olderThan time.Duration) error
}

PostgreSQL Adapter

package postgres

import (
    "database/sql"
    "github.com/AshkanYarmoradi/go-mink"
)

// PostgresAdapter holds the state of the PostgreSQL adapter: the database
// handle opened by NewAdapter and the schema name used by Initialize.
type PostgresAdapter struct {
    // db is the database/sql handle; WithMaxConnections tunes its pool.
    db     *sql.DB
    // schema is the schema name; it has a default and can be set with WithSchema.
    schema string
}

// NewAdapter opens a database/sql handle for connStr using the "pgx" driver
// and returns an adapter configured by opts.
//
// The schema defaults to "mink" and can be overridden with WithSchema. The
// default is a plain lowercase identifier so that it is valid wherever the
// schema name is placed into SQL; a hyphenated name such as "go-mink" is not a
// valid unquoted PostgreSQL identifier.
//
// An error is returned only if sql.Open rejects its arguments (for example
// when no driver named "pgx" is registered). sql.Open does not necessarily
// connect, so an unreachable server may only surface on first use.
func NewAdapter(connStr string, opts ...Option) (*PostgresAdapter, error) {
    db, err := sql.Open("pgx", connStr)
    if err != nil {
        return nil, err
    }

    adapter := &PostgresAdapter{
        db:     db,
        schema: "mink",
    }

    // Options run after the defaults are in place so they can override them
    // and can rely on adapter.db being set (see WithMaxConnections).
    for _, opt := range opts {
        opt(adapter)
    }

    return adapter, nil
}

// Options

// WithSchema returns an Option that replaces the adapter's schema name with
// the given one.
func WithSchema(schema string) Option {
    setSchema := func(adapter *PostgresAdapter) {
        adapter.schema = schema
    }
    return setSchema
}

// WithMaxConnections returns an Option that passes n to SetMaxOpenConns on the
// adapter's database handle.
func WithMaxConnections(n int) Option {
    return func(adapter *PostgresAdapter) {
        pool := adapter.db
        pool.SetMaxOpenConns(n)
    }
}

// Initialize creates the adapter's schema if it does not exist yet and then
// runs the migrations that create the tables (streams, events, checkpoints,
// outbox).
//
// The schema name is placed directly into the DDL statement, so it is checked
// first: anything that is not a plain unquoted SQL identifier is rejected with
// an error instead of being sent to the database, where it would be a syntax
// error or allow SQL injection.
func (a *PostgresAdapter) Initialize(ctx context.Context) error {
    if !validSchemaName(a.schema) {
        return fmt.Errorf("invalid schema name %q: must be a plain unquoted SQL identifier", a.schema)
    }

    // Create schema
    _, err := a.db.ExecContext(ctx, fmt.Sprintf(
        `CREATE SCHEMA IF NOT EXISTS %s`, a.schema,
    ))
    if err != nil {
        return fmt.Errorf("creating schema %q: %w", a.schema, err)
    }

    // Create tables (streams, events, checkpoints, outbox)
    return a.runMigrations(ctx)
}

// validSchemaName reports whether name is non-empty and made only of letters,
// underscores, and (after the first character) digits or '$'. Bytes >= 0x80
// are accepted so that non-ASCII letters keep working.
func validSchemaName(name string) bool {
    if name == "" {
        return false
    }
    for i := 0; i < len(name); i++ {
        c := name[i]
        switch {
        case c == '_', c >= 'a' && c <= 'z', c >= 'A' && c <= 'Z', c >= 0x80:
            // allowed in any position
        case i > 0 && (c == '$' || (c >= '0' && c <= '9')):
            // allowed after the first character
        default:
            return false
        }
    }
    return true
}

MongoDB Adapter

package mongodb

import (
    "go.mongodb.org/mongo-driver/mongo"
    "github.com/AshkanYarmoradi/go-mink"
)

// MongoAdapter holds the state of the MongoDB adapter: the client created by
// NewAdapter and the name of the database its queries run against.
type MongoAdapter struct {
    // client is the driver client returned by mongo.Connect.
    client   *mongo.Client
    // database is the database name passed to client.Database in Query.
    database string
}

// NewAdapter connects a MongoDB client to uri and returns an adapter bound to
// the named database. The connection is made with context.Background(), so
// the call itself cannot be cancelled by the caller.
func NewAdapter(uri, database string) (*MongoAdapter, error) {
    clientOpts := options.Client().ApplyURI(uri)

    client, err := mongo.Connect(context.Background(), clientOpts)
    if err != nil {
        return nil, err
    }

    adapter := &MongoAdapter{client: client, database: database}
    return adapter, nil
}

// MongoDB-specific: Flexible document queries
//
// Query runs query against collection in the adapter's database and returns
// the raw bytes of each matching document. The filter and sort are built from
// query.Filters and query.OrderBy; query.Limit and query.Offset become the
// cursor's limit and skip.
//
// Each document is copied out of the cursor, because cursor.Current is only
// valid until the next call to Next. If iteration fails, no partial result is
// returned.
func (a *MongoAdapter) Query(ctx context.Context, collection string,
    query QuerySpec) ([][]byte, error) {

    coll := a.client.Database(a.database).Collection(collection)

    filter := buildMongoFilter(query.Filters)
    opts := options.Find().
        SetSort(buildMongoSort(query.OrderBy)).
        SetLimit(int64(query.Limit)).
        SetSkip(int64(query.Offset))

    cursor, err := coll.Find(ctx, filter, opts)
    if err != nil {
        return nil, err
    }
    defer cursor.Close(ctx)

    var results [][]byte
    for cursor.Next(ctx) {
        // Detach the document from the cursor's reusable buffer.
        doc := make([]byte, len(cursor.Current))
        copy(doc, cursor.Current)
        results = append(results, doc)
    }
    if err := cursor.Err(); err != nil {
        return nil, err
    }

    return results, nil
}

Redis Adapter

package redis

import (
    "github.com/redis/go-redis/v9"
    "github.com/AshkanYarmoradi/go-mink"
)

// RedisAdapter holds the state of the Redis adapter: the client created by
// NewAdapter and the prefix prepended to every key it builds.
type RedisAdapter struct {
    // client is the go-redis client used for all commands.
    client *redis.Client
    // prefix is placed in front of each key (see Save and Load).
    prefix string
}

// NewAdapter creates a Redis client for addr and returns an adapter that uses
// the key prefix "go-mink:" unless an option changes it.
//
// The supplied options are applied after the defaults are set; previously
// they were accepted but silently ignored.
// NOTE(review): this assumes Option is func(*RedisAdapter), mirroring the
// PostgreSQL adapter's options — confirm against the Option declaration.
func NewAdapter(addr string, opts ...Option) *RedisAdapter {
    adapter := &RedisAdapter{
        client: redis.NewClient(&redis.Options{Addr: addr}),
        prefix: "go-mink:",
    }

    for _, opt := range opts {
        opt(adapter)
    }

    return adapter
}

// Optimized for snapshots - fast key-value access
//
// Save JSON-encodes a SnapshotRecord holding version, data and the current
// time, and stores it under "<prefix>snapshot:<streamID>", replacing any
// previous value for that stream. The key is written with an expiration of 0.
//
// It returns an error if the record cannot be encoded or the write fails.
func (a *RedisAdapter) Save(ctx context.Context, streamID string,
    version int64, data []byte) error {

    key := fmt.Sprintf("%ssnapshot:%s", a.prefix, streamID)

    value, err := json.Marshal(SnapshotRecord{
        Version: version,
        Data:    data,
        SavedAt: time.Now(),
    })
    if err != nil {
        // Never write a value that failed to encode.
        return fmt.Errorf("encoding snapshot for stream %q: %w", streamID, err)
    }

    return a.client.Set(ctx, key, value, 0).Err()
}

// Load reads the snapshot stored under "<prefix>snapshot:<streamID>" and
// decodes it from JSON.
//
// It returns (nil, nil) when the key does not exist, so callers must check
// the record for nil. Any other Redis error, or a stored value that cannot
// be decoded, is returned as an error rather than as a zero-valued record.
func (a *RedisAdapter) Load(ctx context.Context,
    streamID string) (*SnapshotRecord, error) {

    key := fmt.Sprintf("%ssnapshot:%s", a.prefix, streamID)

    data, err := a.client.Get(ctx, key).Bytes()
    if err == redis.Nil {
        // No snapshot for this stream: not an error.
        return nil, nil
    }
    if err != nil {
        return nil, err
    }

    var record SnapshotRecord
    if err := json.Unmarshal(data, &record); err != nil {
        return nil, fmt.Errorf("decoding snapshot for stream %q: %w", streamID, err)
    }
    return &record, nil
}

Memory Adapter (Testing)

package memory

// InMemoryAdapter for unit tests.
// It keeps every event in process memory, both per stream and in one global
// sequence; Append holds mu while it touches either of them.
type InMemoryAdapter struct {
    // mu guards streams and global.
    mu      sync.RWMutex
    // streams maps a stream ID to that stream's events in append order.
    streams map[string][]StoredEvent
    // global holds all events of all streams in append order; its length is
    // used to assign the next GlobalPosition.
    global  []StoredEvent
}

// NewAdapter returns an empty in-memory adapter with its stream map allocated
// so that it is ready for use.
func NewAdapter() *InMemoryAdapter {
    adapter := new(InMemoryAdapter)
    adapter.streams = map[string][]StoredEvent{}
    return adapter
}

// Perfect for unit tests - no external dependencies
func (a *InMemoryAdapter) Append(ctx context.Context, streamID string,
    events []EventRecord, expectedVersion int64) ([]StoredEvent, error) {
    
    a.mu.Lock()
    defer a.mu.Unlock()
    
    stream := a.streams[streamID]
    currentVersion := int64(len(stream))
    
    if expectedVersion >= 0 && currentVersion != expectedVersion {
        return nil, go-mink.ErrConcurrencyConflict
    }
    
    var stored []StoredEvent
    for _, e := range events {
        currentVersion++
        se := StoredEvent{
            ID:             uuid.NewString(),
            StreamID:       streamID,
            Type:           e.Type,
            Data:           e.Data,
            Version:        currentVersion,
            GlobalPosition: uint64(len(a.global) + 1),
            Timestamp:      time.Now(),
        }
        stored = append(stored, se)
        a.global = append(a.global, se)
    }
    
    a.streams[streamID] = append(stream, stored...)
    return stored, nil
}

Custom Adapter Template

package myadapter

import "github.com/AshkanYarmoradi/go-mink"

// Implement your own adapter
//
// MyCustomAdapter is the template's adapter type. Add the fields your storage
// backend needs; the methods defined on *MyCustomAdapter are what make it an
// EventStoreAdapter.
type MyCustomAdapter struct {
    // Your storage client
}

// Ensure interface compliance at compile time
var _ go-mink.EventStoreAdapter = (*MyCustomAdapter)(nil)

// NewAdapter returns a new, zero-valued MyCustomAdapter. Add parameters for
// whatever configuration your storage client needs.
func NewAdapter( /* your config */ ) *MyCustomAdapter {
    adapter := new(MyCustomAdapter)
    return adapter
}

// Implement all EventStoreAdapter methods...
func (a *MyCustomAdapter) Append(ctx context.Context, streamID string,
    events []go-mink.EventRecord, expectedVersion int64) ([]go-mink.StoredEvent, error) {
    // Your implementation
}

// Register with go-mink
func init() {
    go-mink.RegisterAdapter("myadapter", func(config map[string]interface{}) (go-mink.EventStoreAdapter, error) {
        // Create adapter from config
        return NewAdapter(), nil
    })
}

Configuration

// Mix and match adapters via configuration.
// Each concern gets its own AdapterConfig: the adapter Type, a Connection
// string and, where needed, adapter-specific Options.
store, err := mink.New(mink.Config{
    // Events in PostgreSQL for ACID guarantees
    EventStore: mink.AdapterConfig{
        Type: "postgres",
        Connection: "postgres://localhost/mydb",
        Options: map[string]interface{}{
            "schema": "events",
            "maxConnections": 25,
        },
    },

    // Read models in MongoDB for flexible queries
    ReadModels: mink.AdapterConfig{
        Type: "mongodb",
        Connection: "mongodb://localhost:27017",
        Options: map[string]interface{}{
            "database": "readmodels",
        },
    },

    // Snapshots in Redis for speed
    Snapshots: mink.AdapterConfig{
        Type: "redis",
        Connection: "redis://localhost:6379",
    },
})
if err != nil {
    // Do not discard the error. This example fails fast; replace the panic
    // with your application's own error handling.
    panic(err)
}

Next: API Design →