
Complete Node.js Event Sourcing Guide: TypeScript, PostgreSQL, and Real-World Implementation

Learn to implement Event Sourcing with Node.js, TypeScript & PostgreSQL. Build event stores, handle versioning, create projections & optimize performance for scalable systems.



Lately, I’ve been thinking about how we track changes in complex systems. Traditional approaches often lose valuable historical context. That’s when event sourcing caught my attention - a method where we capture every state change as immutable events. Why settle for current state alone when you can reconstruct any moment in time? Let me show you how to implement this powerful pattern.

First, we need a solid foundation. Start by setting up your project:

mkdir event-sourcing-app
cd event-sourcing-app
npm init -y
npm install express pg uuid
npm install -D typescript ts-node @types/node @types/express @types/pg @types/uuid jest ts-jest @types/jest

Configure TypeScript with this tsconfig.json:

{
  "compilerOptions": {
    "target": "ES2020",
    "module": "commonjs",
    "rootDir": "./src",
    "outDir": "./dist",
    "strict": true,
    "esModuleInterop": true
  }
}

Our event store needs a proper PostgreSQL schema. Notice how we’re capturing both the event data and metadata:

CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

CREATE TABLE events (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    aggregate_id UUID NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    event_data JSONB NOT NULL,
    version INTEGER NOT NULL,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE (aggregate_id, version)
);

CREATE INDEX idx_aggregate_id ON events (aggregate_id);

Now, let’s define our core interfaces. This TypeScript foundation ensures type safety throughout our system:

// src/types/events.ts
export interface DomainEvent {
  aggregateId: string;
  eventType: string;
  eventData: unknown;
  version: number;
}

export interface EventStore {
  saveEvents(aggregateId: string, events: DomainEvent[], expectedVersion: number): Promise<void>;
  getEvents(aggregateId: string): Promise<DomainEvent[]>;
}
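
To make the interfaces concrete, here is what a single event instance might look like in our inventory domain. This is illustrative only; the UUID and payload are made-up values, not part of the store:

// Illustrative only: the shape of one event flowing through the EventStore
const exampleEvent: DomainEvent = {
  aggregateId: 'a3c1f0e2-5b7d-4c9a-8f21-1d2e3f4a5b6c', // any UUID identifying the aggregate
  eventType: 'InventoryAdjusted',
  eventData: { change: 5, newCount: 5 },
  version: 1
};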

The real magic happens in the event store implementation. Notice how we handle concurrency conflicts:

// src/infrastructure/PostgresEventStore.ts
import { Pool } from 'pg';
import { DomainEvent, EventStore } from '../types/events';

export class PostgresEventStore implements EventStore {
  constructor(private pool: Pool) {}

  async saveEvents(
    aggregateId: string,
    events: DomainEvent[],
    expectedVersion: number
  ): Promise<void> {
    const client = await this.pool.connect();
    try {
      await client.query('BEGIN');
      
      const res = await client.query(
        'SELECT MAX(version) as current FROM events WHERE aggregate_id = $1',
        [aggregateId]
      );
      const currentVersion = res.rows[0]?.current || 0;
      
      if (currentVersion !== expectedVersion) {
        throw new Error(`Version conflict: Expected ${expectedVersion}, found ${currentVersion}`);
      }
      
      for (const [index, event] of events.entries()) {
        await client.query(
          `INSERT INTO events (aggregate_id, event_type, event_data, version)
           VALUES ($1, $2, $3, $4)`,
          [aggregateId, event.eventType, JSON.stringify(event.eventData), expectedVersion + index + 1]
        );
      }
      
      await client.query('COMMIT');
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }
  }

  async getEvents(aggregateId: string): Promise<DomainEvent[]> {
    const res = await this.pool.query(
      `SELECT event_type, event_data, version 
       FROM events 
       WHERE aggregate_id = $1 
       ORDER BY version ASC`,
      [aggregateId]
    );
    
    return res.rows.map(row => ({
      aggregateId,
      eventType: row.event_type,
      eventData: row.event_data, // JSONB columns are returned as parsed objects by node-postgres
      version: row.version
    }));
  }
}
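
Wiring the store up is a one-time setup step. Here's a minimal sketch, assuming a local PostgreSQL instance; the connection settings are placeholders you should replace with your own:

// src/infrastructure/db.ts (illustrative wiring; adjust credentials to your environment)
import { Pool } from 'pg';
import { PostgresEventStore } from './PostgresEventStore';

export const pool = new Pool({
  host: 'localhost',
  port: 5432,
  database: 'event_sourcing_app',
  user: 'postgres',
  password: 'postgres'
});

export const eventStore = new PostgresEventStore(pool);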

Aggregates are where business rules live. They process commands and produce events. How might we handle inventory updates?

// src/domain/InventoryItem.ts
import { DomainEvent } from '../types/events';

export class InventoryItem {
  private _pendingEvents: DomainEvent[] = [];

  constructor(
    public readonly id: string,
    private _count: number,
    private version: number = 0
  ) {}

  get count(): number {
    return this._count;
  }

  get pendingEvents(): DomainEvent[] {
    return this._pendingEvents;
  }

  adjustCount(change: number): void {
    if (this._count + change < 0) {
      throw new Error('Insufficient inventory');
    }

    this._count += change;
    this._pendingEvents.push({
      aggregateId: this.id,
      eventType: 'InventoryAdjusted',
      eventData: { change, newCount: this._count },
      version: this.version + this._pendingEvents.length + 1
    });
  }

  // Call after events are persisted: advance the version and clear pending events
  markEventsCommitted(): void {
    this.version += this._pendingEvents.length;
    this._pendingEvents = [];
  }

  static fromEvents(id: string, events: DomainEvent[]): InventoryItem {
    let count = 0;
    events.forEach(event => {
      if (event.eventType === 'InventoryAdjusted') {
        count += (event.eventData as any).change;
      }
    });
    return new InventoryItem(id, count, events.length);
  }
}
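
A thin application-layer function ties the aggregate and the event store together: load the history, rebuild state, apply the command, then persist the new events with the version you loaded as the expected version. Here's a sketch of that cycle; the file name and function are assumptions for illustration:

// src/application/adjustInventory.ts (sketch of the load-modify-save cycle)
import { EventStore } from '../types/events';
import { InventoryItem } from '../domain/InventoryItem';

export async function adjustInventory(
  store: EventStore,
  itemId: string,
  change: number
): Promise<void> {
  // Rebuild current state from the event history
  const history = await store.getEvents(itemId);
  const item = InventoryItem.fromEvents(itemId, history);

  // Apply the command; this queues new pending events on the aggregate
  item.adjustCount(change);

  // Persist with optimistic concurrency: the loaded event count is the expected version
  await store.saveEvents(itemId, item.pendingEvents, history.length);

  // Clear pending events so they are not saved twice
  item.markEventsCommitted();
}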

Projections transform events into read-optimized views. Here’s a simple example:

// src/projections/inventoryProjection.ts
import { DomainEvent } from '../types/events';

export class InventoryProjection {
  private inventory: Map<string, number> = new Map();

  applyEvent(event: DomainEvent): void {
    if (event.eventType === 'InventoryAdjusted') {
      const current = this.inventory.get(event.aggregateId) || 0;
      this.inventory.set(event.aggregateId, current + (event.eventData as any).change);
    }
  }

  getCount(itemId: string): number {
    return this.inventory.get(itemId) || 0;
  }
}
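
To populate a projection you replay events from the store. A minimal rebuild sketch, assuming you query the events table directly in order:

// src/projections/rebuildInventory.ts (sketch: replay every event into the projection)
import { Pool } from 'pg';
import { InventoryProjection } from './inventoryProjection';

export async function rebuildInventoryProjection(pool: Pool): Promise<InventoryProjection> {
  const projection = new InventoryProjection();

  const res = await pool.query(
    `SELECT aggregate_id, event_type, event_data, version
     FROM events
     ORDER BY timestamp ASC, version ASC`
  );

  for (const row of res.rows) {
    projection.applyEvent({
      aggregateId: row.aggregate_id,
      eventType: row.event_type,
      eventData: row.event_data, // JSONB comes back as a parsed object
      version: row.version
    });
  }

  return projection;
}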

For performance, we implement snapshots. When would you trigger a snapshot? Typically after every 100 events or so:

CREATE TABLE snapshots (
    aggregate_id UUID PRIMARY KEY,
    data JSONB NOT NULL,
    version INTEGER NOT NULL,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

// src/infrastructure/SnapshotRepository.ts
import { Pool } from 'pg';

export async function saveSnapshot(
  pool: Pool, 
  aggregateId: string, 
  data: any, 
  version: number
): Promise<void> {
  await pool.query(
    `INSERT INTO snapshots (aggregate_id, data, version)
     VALUES ($1, $2, $3)
     ON CONFLICT (aggregate_id) 
     DO UPDATE SET data = $2, version = $3`,
    [aggregateId, JSON.stringify(data), version]
  );
}

export async function loadSnapshot(
  pool: Pool, 
  aggregateId: string
): Promise<{ data: any; version: number } | null> {
  const res = await pool.query(
    'SELECT data, version FROM snapshots WHERE aggregate_id = $1',
    [aggregateId]
  );
  // node-postgres returns JSONB columns as parsed objects already
  return res.rows[0]
    ? { data: res.rows[0].data, version: res.rows[0].version }
    : null;
}
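
Loading an aggregate then becomes: start from the snapshot if one exists and replay only the events recorded after its version. A sketch using the helpers above; it assumes the snapshot data was stored as { count }:

// Sketch: rebuild an InventoryItem from a snapshot plus newer events
import { Pool } from 'pg';
import { EventStore } from '../types/events';
import { InventoryItem } from '../domain/InventoryItem';
import { loadSnapshot } from './SnapshotRepository';

export async function loadInventoryItem(
  pool: Pool,
  store: EventStore,
  itemId: string
): Promise<InventoryItem> {
  const snapshot = await loadSnapshot(pool, itemId);
  const events = await store.getEvents(itemId);

  if (!snapshot) {
    return InventoryItem.fromEvents(itemId, events);
  }

  // Apply only the events newer than the snapshot's version
  const newer = events.filter(e => e.version > snapshot.version);
  let count = snapshot.data.count as number; // assumption: the snapshot stored { count }
  for (const e of newer) {
    if (e.eventType === 'InventoryAdjusted') {
      count += (e.eventData as { change: number }).change;
    }
  }
  return new InventoryItem(itemId, count, snapshot.version + newer.length);
}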

Testing event-sourced systems requires special attention. How do you verify temporal behavior? I recommend these patterns:

// tests/inventory.test.ts
import { InventoryItem } from '../src/domain/InventoryItem';
import { DomainEvent } from '../src/types/events';

describe('InventoryItem', () => {
  it('should reject negative inventory', () => {
    const item = new InventoryItem('item1', 10);
    expect(() => item.adjustCount(-11)).toThrow('Insufficient inventory');
  });

  it('should rebuild state from events', () => {
    const events: DomainEvent[] = [
      { aggregateId: 'item1', eventType: 'InventoryAdjusted', eventData: { change: 5 }, version: 1 },
      { aggregateId: 'item1', eventType: 'InventoryAdjusted', eventData: { change: -3 }, version: 2 }
    ];
    
    const item = InventoryItem.fromEvents('item1', events);
    expect(item.count).toEqual(2);
  });
});

For production, consider these optimizations:

  • Partition events by aggregate ID
  • Use MATERIALIZED VIEWS for frequent queries (see the sketch after this list)
  • Compress older events
  • Separate read/write databases
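
For the materialized view suggestion, here's a minimal sketch: a view that pre-aggregates current counts straight from the event stream, refreshed on whatever cadence your read models tolerate. The view name and helper functions are assumptions for illustration:

// Sketch: a materialized view over the events table for fast inventory reads
import { Pool } from 'pg';

export async function createInventoryCountsView(pool: Pool): Promise<void> {
  await pool.query(`
    CREATE MATERIALIZED VIEW IF NOT EXISTS inventory_counts AS
    SELECT aggregate_id,
           SUM((event_data->>'change')::int) AS current_count
    FROM events
    WHERE event_type = 'InventoryAdjusted'
    GROUP BY aggregate_id
  `);
}

export async function refreshInventoryCountsView(pool: Pool): Promise<void> {
  await pool.query('REFRESH MATERIALIZED VIEW inventory_counts');
}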

Common pitfalls I’ve encountered:

  • Forgetting to reset pending events after saving
  • Version mismatch during concurrent updates
  • Overlooking event schema evolution (an upcasting sketch follows after this list)
  • Projection latency in read models
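
Schema evolution in particular deserves an illustration. A common approach is upcasting: translate old event payloads into the current shape as they are read, so replay logic only ever sees the latest version. A hedged sketch, where the V1 field name is a hypothetical legacy shape:

// Sketch: upcast an old 'InventoryAdjusted' payload to the current shape on read
// (the V1 field 'delta' is a hypothetical legacy name, shown for illustration)
interface InventoryAdjustedV1 { delta: number }
interface InventoryAdjustedV2 { change: number; newCount?: number }

export function upcastInventoryAdjusted(data: unknown): InventoryAdjustedV2 {
  const raw = data as Partial<InventoryAdjustedV1 & InventoryAdjustedV2>;
  if (raw.change !== undefined) {
    return raw as InventoryAdjustedV2; // already the current shape
  }
  return { change: raw.delta ?? 0 };   // translate the legacy field
}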

Event sourcing isn’t just a technical pattern - it changes how you think about system state. By capturing every change as immutable facts, we gain audit trails, temporal querying, and robust failure recovery. The initial effort pays off in maintainability and insight.

Found this useful? Implement it in your next project and share your experience! Like this guide if it helped you, share it with your team, and comment below with your event sourcing challenges.


This implementation provides:

  • Full event storage with optimistic concurrency control
  • Aggregate root pattern enforcement
  • Read model projections
  • Snapshot optimization
  • Transactional writes with rollback on failure
  • Testing strategies
  • Guidance on production optimizations

The code examples show actual implementation patterns you can extend for real-world scenarios while maintaining the integrity of the event sourcing pattern.



