Skip to main content

Adapter System

v1.0.0


Design Philosophy

go-mink's adapter system allows mixing different storage backends:

┌─────────────────────────────────────────────────────────────────┐
│ Your Application │
├─────────────────────────────────────────────────────────────────┤
│ go-mink Core │
│ │
│ EventStore Projections Snapshots Outbox │
│ │ │ │ │ │
├──────▼──────────────▼─────────────▼───────────▼─────────────────┤
│ Adapter Interfaces │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │PostgreSQL│ │ MongoDB │ │ Redis │ │ Memory │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
│ │
│ Events ────────► PostgreSQL (ACID, JSON) │
│ Read Models ───► MongoDB (flexible queries) │
│ Snapshots ─────► Redis (fast access) │
│ Cache ─────────► Redis (ephemeral) │
│ │
└─────────────────────────────────────────────────────────────────┘

Adapter Interfaces

Event Store Adapter (Implemented)

// EventStoreAdapter is the interface that database adapters must implement.
// It provides the low-level operations for persisting and retrieving events.
type EventStoreAdapter interface {
// Append stores events to the specified stream with optimistic concurrency control.
// expectedVersion specifies the expected current version of the stream:
// - AnyVersion (-1): Skip version check
// - NoStream (0): Stream must not exist
// - StreamExists (-2): Stream must exist
// - Any positive number: Stream must be at this exact version
Append(ctx context.Context, streamID string, events []EventRecord, expectedVersion int64) ([]StoredEvent, error)

// Load retrieves all events from a stream starting from the specified version.
Load(ctx context.Context, streamID string, fromVersion int64) ([]StoredEvent, error)

// GetStreamInfo returns metadata about a stream.
GetStreamInfo(ctx context.Context, streamID string) (*StreamInfo, error)

// GetLastPosition returns the global position of the last stored event.
GetLastPosition(ctx context.Context) (uint64, error)

// Initialize sets up the required database schema.
Initialize(ctx context.Context) error

// Close releases any resources held by the adapter.
Close() error
}

Subscription Adapter (Implemented)

// SubscriptionAdapter provides event subscription capabilities.
type SubscriptionAdapter interface {
// LoadFromPosition loads events starting from a global position.
// This is used by projection engines to catch up on historical events.
LoadFromPosition(ctx context.Context, fromPosition uint64, limit int) ([]StoredEvent, error)

// SubscribeAll subscribes to all events across all streams.
// Optional SubscriptionOptions can be provided to configure buffer size, poll interval, etc.
SubscribeAll(ctx context.Context, fromPosition uint64, opts ...SubscriptionOptions) (<-chan StoredEvent, error)

// SubscribeStream subscribes to events from a specific stream.
// Optional SubscriptionOptions can be provided to configure behavior.
SubscribeStream(ctx context.Context, streamID string, fromVersion int64, opts ...SubscriptionOptions) (<-chan StoredEvent, error)

// SubscribeCategory subscribes to all events from streams in a category.
// Optional SubscriptionOptions can be provided to configure behavior.
SubscribeCategory(ctx context.Context, category string, fromPosition uint64, opts ...SubscriptionOptions) (<-chan StoredEvent, error)
}

// SubscriptionOptions configures subscription behavior.
type SubscriptionOptions struct {
BufferSize int // Channel buffer size (default: 100)
PollInterval time.Duration // Polling interval for polling-based subscriptions
OnError func(error) // Error callback for non-fatal errors
}

Read Model Adapter (Future)

// ReadModelAdapter provides generic document storage
type ReadModelAdapter interface {
// CRUD operations
Get(ctx context.Context, collection, id string) ([]byte, error)
Set(ctx context.Context, collection, id string, data []byte) error
Delete(ctx context.Context, collection, id string) error

// Bulk operations
GetMany(ctx context.Context, collection string, ids []string) ([][]byte, error)
SetMany(ctx context.Context, collection string, docs map[string][]byte) error

// Queries
Query(ctx context.Context, collection string, query QuerySpec) ([][]byte, error)
Count(ctx context.Context, collection string, query QuerySpec) (int64, error)

// Schema management
CreateCollection(ctx context.Context, collection string, schema Schema) error
CreateIndex(ctx context.Context, collection string, index IndexSpec) error

// Transactions (optional)
BeginTx(ctx context.Context) (Transaction, error)
}

Snapshot Adapter

// SnapshotAdapter stores aggregate snapshots
type SnapshotAdapter interface {
Save(ctx context.Context, streamID string, version int64, data []byte) error
Load(ctx context.Context, streamID string) (*SnapshotRecord, error)
Delete(ctx context.Context, streamID string) error
}

Outbox Adapter

// OutboxAdapter for reliable event publishing
type OutboxAdapter interface {
// Store outbox entry (in same tx as events)
Store(ctx context.Context, tx Transaction, entries []OutboxEntry) error

// Fetch unpublished entries
FetchPending(ctx context.Context, limit int) ([]OutboxEntry, error)

// Mark as published
MarkPublished(ctx context.Context, ids []string) error

// Cleanup old entries
Cleanup(ctx context.Context, olderThan time.Duration) error
}

PostgreSQL Adapter

package postgres

import (
"database/sql"
"go-mink.dev"
)

type PostgresAdapter struct {
db *sql.DB
schema string
}

func NewAdapter(connStr string, opts ...Option) (*PostgresAdapter, error) {
db, err := sql.Open("pgx", connStr)
if err != nil {
return nil, err
}

adapter := &PostgresAdapter{
db: db,
schema: "go-mink",
}

for _, opt := range opts {
opt(adapter)
}

return adapter, nil
}

// Options
func WithSchema(schema string) Option {
return func(a *PostgresAdapter) { a.schema = schema }
}

func WithMaxConnections(n int) Option {
return func(a *PostgresAdapter) { a.db.SetMaxOpenConns(n) }
}

// Initialize creates required tables
func (a *PostgresAdapter) Initialize(ctx context.Context) error {
// Create schema
_, err := a.db.ExecContext(ctx, fmt.Sprintf(
`CREATE SCHEMA IF NOT EXISTS %s`, a.schema,
))
if err != nil {
return err
}

// Create tables (streams, events, checkpoints, outbox)
return a.runMigrations(ctx)
}

MongoDB Adapter

package mongodb

import (
"go.mongodb.org/mongo-driver/mongo"
"go-mink.dev"
)

type MongoAdapter struct {
client *mongo.Client
database string
}

func NewAdapter(uri, database string) (*MongoAdapter, error) {
client, err := mongo.Connect(context.Background(),
options.Client().ApplyURI(uri))
if err != nil {
return nil, err
}

return &MongoAdapter{
client: client,
database: database,
}, nil
}

// MongoDB-specific: Flexible document queries
func (a *MongoAdapter) Query(ctx context.Context, collection string,
query QuerySpec) ([][]byte, error) {

coll := a.client.Database(a.database).Collection(collection)

filter := buildMongoFilter(query.Filters)
opts := options.Find().
SetSort(buildMongoSort(query.OrderBy)).
SetLimit(int64(query.Limit)).
SetSkip(int64(query.Offset))

cursor, err := coll.Find(ctx, filter, opts)
if err != nil {
return nil, err
}
defer cursor.Close(ctx)

var results [][]byte
for cursor.Next(ctx) {
results = append(results, cursor.Current)
}

return results, cursor.Err()
}

Redis Adapter

package redis

import (
"github.com/redis/go-redis/v9"
"go-mink.dev"
)

type RedisAdapter struct {
client *redis.Client
prefix string
}

func NewAdapter(addr string, opts ...Option) *RedisAdapter {
client := redis.NewClient(&redis.Options{Addr: addr})

return &RedisAdapter{
client: client,
prefix: "go-mink:",
}
}

// Optimized for snapshots - fast key-value access
func (a *RedisAdapter) Save(ctx context.Context, streamID string,
version int64, data []byte) error {

key := fmt.Sprintf("%ssnapshot:%s", a.prefix, streamID)

value, _ := json.Marshal(SnapshotRecord{
Version: version,
Data: data,
SavedAt: time.Now(),
})

return a.client.Set(ctx, key, value, 0).Err()
}

func (a *RedisAdapter) Load(ctx context.Context,
streamID string) (*SnapshotRecord, error) {

key := fmt.Sprintf("%ssnapshot:%s", a.prefix, streamID)

data, err := a.client.Get(ctx, key).Bytes()
if err == redis.Nil {
return nil, nil
}
if err != nil {
return nil, err
}

var record SnapshotRecord
json.Unmarshal(data, &record)
return &record, nil
}

Memory Adapter (Testing)

package memory

// InMemoryAdapter for unit tests
type InMemoryAdapter struct {
mu sync.RWMutex
streams map[string][]StoredEvent
global []StoredEvent
}

func NewAdapter() *InMemoryAdapter {
return &InMemoryAdapter{
streams: make(map[string][]StoredEvent),
}
}

// Perfect for unit tests - no external dependencies
func (a *InMemoryAdapter) Append(ctx context.Context, streamID string,
events []EventRecord, expectedVersion int64) ([]StoredEvent, error) {

a.mu.Lock()
defer a.mu.Unlock()

stream := a.streams[streamID]
currentVersion := int64(len(stream))

if expectedVersion >= 0 && currentVersion != expectedVersion {
return nil, go-mink.ErrConcurrencyConflict
}

var stored []StoredEvent
for _, e := range events {
currentVersion++
se := StoredEvent{
ID: uuid.NewString(),
StreamID: streamID,
Type: e.Type,
Data: e.Data,
Version: currentVersion,
GlobalPosition: uint64(len(a.global) + 1),
Timestamp: time.Now(),
}
stored = append(stored, se)
a.global = append(a.global, se)
}

a.streams[streamID] = append(stream, stored...)
return stored, nil
}

Custom Adapter Template

package myadapter

import "go-mink.dev"

// Implement your own adapter
type MyCustomAdapter struct {
// Your storage client
}

// Ensure interface compliance at compile time
var _ go-mink.EventStoreAdapter = (*MyCustomAdapter)(nil)

func NewAdapter( /* your config */ ) *MyCustomAdapter {
return &MyCustomAdapter{}
}

// Implement all EventStoreAdapter methods...
func (a *MyCustomAdapter) Append(ctx context.Context, streamID string,
events []go-mink.EventRecord, expectedVersion int64) ([]go-mink.StoredEvent, error) {
// Your implementation
}

// Register with go-mink
func init() {
go-mink.RegisterAdapter("myadapter", func(config map[string]interface{}) (go-mink.EventStoreAdapter, error) {
// Create adapter from config
return NewAdapter(), nil
})
}

Configuration

// Mix and match adapters via configuration
store, _ := go-mink.New(go-mink.Config{
// Events in PostgreSQL for ACID guarantees
EventStore: go-mink.AdapterConfig{
Type: "postgres",
Connection: "postgres://localhost/mydb",
Options: map[string]interface{}{
"schema": "events",
"maxConnections": 25,
},
},

// Read models in MongoDB for flexible queries
ReadModels: go-mink.AdapterConfig{
Type: "mongodb",
Connection: "mongodb://localhost:27017",
Options: map[string]interface{}{
"database": "readmodels",
},
},

// Snapshots in Redis for speed
Snapshots: go-mink.AdapterConfig{
Type: "redis",
Connection: "redis://localhost:6379",
},
})

Next: API Design →