Skip to content

Building CRM Systems with Event-Driven Architecture

A practical guide to implementing customer relationship management using event sourcing, CQRS, and event-driven patterns for marketing automation and consent management

Abstract

Traditional CRM systems store customer state directly: a record per customer, mutated in place with each interaction. That model breaks when the product needs real-time personalization, GDPR-compliant audit trails, and multi-channel orchestration at the same time, because the current-state row has no memory of how it got there. Event-driven CRM inverts the model: every interaction is captured as an immutable event, and any view (profile, consent ledger, channel history) is a projection over that event stream. This post covers the architecture of an event-driven CRM on AWS, the projections for personalization and consent, the cross-channel orchestration layer, and the trade-offs (storage cost, eventual-consistency budgets, replay windows) that the pattern introduces.

The Event-Driven CRM Landscape

Most CRM systems start simple: a database with customers, contacts, and interactions. This works until you need to answer questions like "What marketing emails has this customer received?" or "When did they consent to SMS?" or "Why did we send them this notification?"

I've worked with teams migrating from traditional CRM architectures to event-driven systems, and the shift requires rethinking how you model customer data. Instead of updating a customer record when preferences change, you emit a CustomerPreferencesUpdated event. Instead of deleting consent records for GDPR, you emit a ConsentRevoked event.

The fundamental difference: your database becomes a projection of events, not the source of truth.

Why Event-Driven Architecture for CRM?

The CRM domain has specific characteristics that make event-driven architecture particularly valuable:

  1. Audit Requirements: GDPR mandates knowing exactly when consent was granted and for what purpose
  2. Multi-Channel Complexity: Customers interact across email, SMS, push, in-app, and each channel has different rules
  3. Real-Time Personalization: Marketing automation needs to react immediately to customer behavior
  4. Data Privacy: The "right to be forgotten" is easier when you can replay events with redaction
  5. Eventual Consistency: Marketing campaigns can tolerate slight delays if it means better scalability

Here's a realistic scenario: A customer browses your product page, abandons their cart, opts into SMS notifications, then completes purchase via email link. In a traditional CRM, you'd update the customer record multiple times, losing the sequence of events. In an event-driven system, you have the complete story.

System Architecture Overview

Let me show you how the core components fit together:

This architecture separates concerns effectively:

  • Write path: Commands validate business rules and emit events
  • Read path: Projections materialize views optimized for queries
  • Services: React to events and orchestrate workflows
  • Channels: Handle delivery with retry logic and failure tracking

Practical Implementation Guide

Before diving deep into components, let me show you how to get started with a real implementation.

Step-by-Step Getting Started

Step 1: Define Your Core Events

Start simple. Don't try to model everything at once:

typescript
// Start with just customer creation and consentconst coreEvents = [  'CustomerCreated',  'ConsentGranted',  'ConsentRevoked',  'EmailSent'];

Step 2: Set Up Event Store

Use what you have. DynamoDB works well for AWS shops, EventStoreDB for event sourcing purists:

typescript
// Simple DynamoDB event storeconst eventStoreConfig = {  tableName: 'customer-events',  partitionKey: 'customerId',  // PK: CUSTOMER#{id}  sortKey: 'timestamp_eventId',  // SK: EVENT#{timestamp}#{eventId}  ttl: 7 * 365 * 86400  // 7 years retention};

Step 3: Create Command Handlers

Business logic lives here:

typescript
// One handler per aggregateclass CustomerCommandHandler {  async execute(command: Command): Promise<void> {    // 1. Load events    // 2. Rebuild state    // 3. Validate business rules    // 4. Emit new events  }}

Step 4: Build Projections

Start with one read model - the customer view:

typescript
// Single projection for customer queriesclass CustomerProjection {  async handleEvent(event: CustomerEvent): Promise<void> {    switch (event.eventType) {      case 'CustomerCreated':        await this.createCustomer(event);        break;      case 'ConsentGranted':        await this.updateConsent(event);        break;    }  }}

Step 5: Add Campaign Triggers

Start with one simple campaign - welcome email:

typescript
const welcomeCampaign = {  trigger: 'CustomerCreated',  actions: [    { type: 'send-email', template: 'welcome' }  ]};

Step 6: Integrate Channels

Use existing providers. Don't build email infrastructure:

typescript
// Wrap your existing email providerclass EmailChannel {  constructor(private sendGrid: SendGridClient) {}
  async send(customerId: string, template: string): Promise<void> {    // Get customer data from projection    // Send via provider    // Emit EmailSent event  }}

Complete End-to-End Example

Here's a full customer journey from signup to purchase confirmation:

Complete code for this flow:

typescript
// 1. Customer Registrationasync function handleRegistration(request: RegistrationRequest): Promise<string> {  const customerId = crypto.randomUUID();
  // Emit CustomerCreated  await eventStore.appendEvent({    eventId: crypto.randomUUID(),    customerId,    timestamp: new Date().toISOString(),    eventType: 'CustomerCreated',    email: request.email,    firstName: request.firstName,    source: 'web'  });
  // If consented, emit ConsentGranted  if (request.marketingConsent) {    await eventStore.appendEvent({      eventId: crypto.randomUUID(),      customerId,      timestamp: new Date().toISOString(),      eventType: 'ConsentGranted',      purpose: 'marketing',      channel: 'email'    });  }
  return customerId;}
// 2. Projection Updates Customer Recordasync function handleCustomerCreated(event: CustomerCreated): Promise<void> {  await customerDB.putItem({    customerId: event.customerId,    email: event.email,    firstName: event.firstName,    status: 'active',    createdAt: event.timestamp  });}
// 3. Welcome Campaign Triggersasync function handleCustomerCreatedCampaign(event: CustomerCreated): Promise<void> {  // Check consent  const hasConsent = await consentService.hasActiveConsent(    event.customerId,    'marketing',    'email'  );
  if (hasConsent) {    await emailChannel.send({      customerId: event.customerId,      templateId: 'welcome-email',      data: { firstName: event.firstName }    });  }}
// 4. Product Browsing Trackedasync function handleProductView(customerId: string, productId: string): Promise<void> {  await eventStore.appendEvent({    eventId: crypto.randomUUID(),    customerId,    timestamp: new Date().toISOString(),    eventType: 'ProductViewed',    productId,    sessionId: getCurrentSessionId()  });}
// 5. Cart Abandonment Detection (runs periodically)async function detectAbandonedCarts(): Promise<void> {  const abandonedCarts = await findCartsWithNoActivity(60); // 60 minutes
  for (const cart of abandonedCarts) {    await eventStore.appendEvent({      eventId: crypto.randomUUID(),      customerId: cart.customerId,      timestamp: new Date().toISOString(),      eventType: 'CartAbandoned',      cartId: cart.cartId,      items: cart.items,      totalValue: cart.total    });  }}
// 6. Cart Abandonment Campaignasync function handleCartAbandoned(event: CartAbandoned): Promise<void> {  // Wait 1 hour before sending  await scheduleAction({    executeAt: Date.now() + 3600000,    action: async () => {      await emailChannel.send({        customerId: event.customerId,        templateId: 'cart-reminder',        data: {          cartItems: event.items,          cartTotal: event.totalValue        }      });    }  });}
// 7. Order Placementasync function handleOrderPlacement(request: PlaceOrderRequest): Promise<string> {  const orderId = crypto.randomUUID();
  // Emit OrderPlaced  await eventStore.appendEvent({    eventId: crypto.randomUUID(),    customerId: request.customerId,    timestamp: new Date().toISOString(),    eventType: 'OrderPlaced',    orderId,    items: request.items,    total: request.total  });
  // Process payment  const paymentResult = await paymentProvider.charge({    amount: request.total,    customerId: request.customerId  });
  if (paymentResult.success) {    await eventStore.appendEvent({      eventId: crypto.randomUUID(),      customerId: request.customerId,      timestamp: new Date().toISOString(),      eventType: 'PaymentSucceeded',      orderId,      amount: request.total,      transactionId: paymentResult.transactionId    });  }
  return orderId;}
// 8. Order Confirmation Campaignasync function handlePaymentSucceeded(event: PaymentSucceeded): Promise<void> {  // Emit OrderConfirmed  await eventStore.appendEvent({    eventId: crypto.randomUUID(),    customerId: event.customerId,    timestamp: new Date().toISOString(),    eventType: 'OrderConfirmed',    orderId: event.orderId,    confirmationNumber: generateConfirmationNumber()  });
  // Send confirmation email  await emailChannel.send({    customerId: event.customerId,    templateId: 'order-confirmation',    data: {      orderId: event.orderId,      amount: event.amount    }  });}
// 9. Projection Updates - Customer Now a Buyerasync function handleFirstPurchase(event: PaymentSucceeded): Promise<void> {  const purchases = await getPurchaseCount(event.customerId);
  if (purchases === 1) {    // First purchase - update customer segment    await eventStore.appendEvent({      eventId: crypto.randomUUID(),      customerId: event.customerId,      timestamp: new Date().toISOString(),      eventType: 'CustomerSegmentAdded',      segmentId: 'buyers',      segmentName: 'Customers Who Purchased'    });  }}

This example shows every event, every projection update, and every campaign trigger. Start here, then expand to more complex workflows.

Customer Lifecycle Event Flow

Here's the complete picture of customer events over time:

Component Deep Dive

Event Sourcing for Customer Data

The core pattern: instead of storing current state, you store the sequence of events that led to that state. Here's a practical implementation:

typescript
// Event definitions - the source of truthinterface CustomerEvent {  eventId: string;  customerId: string;  timestamp: string;  eventType: string;}
interface CustomerCreated extends CustomerEvent {  eventType: 'CustomerCreated';  email: string;  source: 'web' | 'mobile' | 'api';}
interface ConsentGranted extends CustomerEvent {  eventType: 'ConsentGranted';  purpose: 'marketing' | 'analytics' | 'essential';  channel: 'email' | 'sms' | 'push';  ipAddress: string;  userAgent: string;}
interface ConsentRevoked extends CustomerEvent {  eventType: 'ConsentRevoked';  purpose: 'marketing' | 'analytics' | 'essential';  channel: 'email' | 'sms' | 'push';  reason?: string;}
interface PreferencesUpdated extends CustomerEvent {  eventType: 'PreferencesUpdated';  preferences: {    emailFrequency?: 'daily' | 'weekly' | 'never';    categories?: string[];    timezone?: string;  };}

The event store becomes your single source of truth:

typescript
class EventStore {  constructor(    private dynamoDB: DynamoDBClient,    private eventBus: EventBridge  ) {}
  async appendEvent(event: CustomerEvent): Promise<void> {    // Store event with optimistic locking    await this.dynamoDB.putItem({      TableName: 'customer-events',      Item: {        pk: { S: `CUSTOMER#${event.customerId}` },        sk: { S: `EVENT#${event.timestamp}#${event.eventId}` },        eventType: { S: event.eventType },        payload: { S: JSON.stringify(event) },        version: { N: '1' },        ttl: { N: String(Math.floor(Date.now() / 1000) + 7 * 365 * 86400) }      },      ConditionExpression: 'attribute_not_exists(pk)'    });
    // Publish to event bus for consumers    await this.eventBus.putEvents({      Entries: [{        Source: 'crm.customer',        DetailType: event.eventType,        Detail: JSON.stringify(event),        EventBusName: 'customer-events'      }]    });  }
  async getCustomerEvents(    customerId: string,    fromTimestamp?: string  ): Promise<CustomerEvent[]> {    const params = {      TableName: 'customer-events',      KeyConditionExpression: 'pk = :pk AND sk >= :sk',      ExpressionAttributeValues: {        ':pk': { S: `CUSTOMER#${customerId}` },        ':sk': { S: fromTimestamp ? `EVENT#${fromTimestamp}` : 'EVENT#' }      }    };
    const result = await this.dynamoDB.query(params);    return result.Items?.map(item =>      JSON.parse(item.payload.S!)    ) ?? [];  }}

Key gotcha: Event versioning becomes critical. When your event schema evolves, you need upcasters:

typescript
interface EventUpcaster {  fromVersion: number;  toVersion: number;  upcast(event: any): any;}
// Example: Adding GDPR context to consent eventsconst consentEventUpcaster: EventUpcaster = {  fromVersion: 1,  toVersion: 2,  upcast(event: any) {    if (event.eventType === 'ConsentGranted' && !event.gdprContext) {      return {        ...event,        version: 2,        gdprContext: {          legalBasis: 'consent',          retentionPeriod: '2years',          dataController: 'company-name'        }      };    }    return event;  }};

CQRS: Separating Reads and Writes

CQRS (Command Query Responsibility Segregation) means your write model and read model are completely different. In CRM context, this is powerful because marketing queries need different data structures than consent validation.

Write Model - Optimized for business rules:

typescript
class CustomerCommandHandler {  constructor(    private eventStore: EventStore,    private validator: BusinessRuleValidator  ) {}
  async grantConsent(command: GrantConsentCommand): Promise<void> {    // Load event history to validate    const events = await this.eventStore.getCustomerEvents(command.customerId);    const customer = this.rehydrateCustomer(events);
    // Business rule: Can't grant consent if customer is deleted    if (customer.isDeleted) {      throw new Error('Cannot grant consent for deleted customer');    }
    // Business rule: Can't grant same consent twice without revocation    const existingConsent = customer.consents.find(      c => c.purpose === command.purpose &&           c.channel === command.channel &&           c.status === 'active'    );
    if (existingConsent) {      throw new Error('Consent already exists');    }
    // Emit new event    await this.eventStore.appendEvent({      eventId: crypto.randomUUID(),      customerId: command.customerId,      timestamp: new Date().toISOString(),      eventType: 'ConsentGranted',      purpose: command.purpose,      channel: command.channel,      ipAddress: command.ipAddress,      userAgent: command.userAgent    });  }
  private rehydrateCustomer(events: CustomerEvent[]): Customer {    // Rebuild state from events - this is event sourcing    return events.reduce((customer, event) => {      switch (event.eventType) {        case 'CustomerCreated':          return { ...customer, email: event.email };        case 'ConsentGranted':          return {            ...customer,            consents: [...customer.consents, {              purpose: event.purpose,              channel: event.channel,              grantedAt: event.timestamp,              status: 'active'            }]          };        case 'ConsentRevoked':          return {            ...customer,            consents: customer.consents.map(c =>              c.purpose === event.purpose && c.channel === event.channel                ? { ...c, status: 'revoked', revokedAt: event.timestamp }                : c            )          };        default:          return customer;      }    }, { consents: [] } as Customer);  }}

Read Model - Optimized for queries:

typescript
// Projection builder - runs async from event busclass ConsentProjectionBuilder {  constructor(private readDB: DynamoDBClient) {}
  async handleConsentGranted(event: ConsentGranted): Promise<void> {    // Materialized view optimized for "can we contact this customer?"    await this.readDB.putItem({      TableName: 'customer-consents',      Item: {        pk: { S: `CUSTOMER#${event.customerId}` },        sk: { S: `CONSENT#${event.purpose}#${event.channel}` },        status: { S: 'active' },        grantedAt: { S: event.timestamp },        expiresAt: { S: this.calculateExpiry(event.timestamp) },        ipAddress: { S: event.ipAddress },        // GSI for querying by purpose        gsi1pk: { S: `PURPOSE#${event.purpose}` },        gsi1sk: { S: event.customerId }      }    });  }
  async handleConsentRevoked(event: ConsentRevoked): Promise<void> {    await this.readDB.updateItem({      TableName: 'customer-consents',      Key: {        pk: { S: `CUSTOMER#${event.customerId}` },        sk: { S: `CONSENT#${event.purpose}#${event.channel}` }      },      UpdateExpression: 'SET #status = :revoked, revokedAt = :timestamp',      ExpressionAttributeNames: { '#status': 'status' },      ExpressionAttributeValues: {        ':revoked': { S: 'revoked' },        ':timestamp': { S: event.timestamp }      }    });  }
  private calculateExpiry(grantedAt: string): string {    // GDPR requires re-consent after reasonable period    const granted = new Date(grantedAt);    granted.setFullYear(granted.getFullYear() + 2);    return granted.toISOString();  }}

The trade-off: eventual consistency. When a customer revokes consent, there's a delay before the read model updates. For CRM, this is usually acceptable - if a customer unsubscribes, a few seconds delay before campaigns stop is reasonable.

Complete CRUD Operations

Understanding how basic operations translate to events is fundamental. Let me walk through the complete lifecycle of customer data management.

Customer Creation Flow

When a new customer signs up, you're not just inserting a row - you're starting an event stream:

typescript
interface CustomerRegistrationCommand {  email: string;  firstName: string;  lastName: string;  phone?: string;  source: 'web' | 'mobile' | 'api' | 'import';  marketingConsent: boolean;  termsAccepted: boolean;  ipAddress: string;  userAgent: string;}
class CustomerRegistrationHandler {  constructor(    private eventStore: EventStore,    private validator: EmailValidator  ) {}
  async registerCustomer(    command: CustomerRegistrationCommand  ): Promise<string> {    // Step 1: Validate before creating any events    await this.validateRegistration(command);
    const customerId = crypto.randomUUID();    const timestamp = new Date().toISOString();
    // Step 2: Emit CustomerCreated event    await this.eventStore.appendEvent({      eventId: crypto.randomUUID(),      customerId,      timestamp,      eventType: 'CustomerCreated',      email: command.email,      firstName: command.firstName,      lastName: command.lastName,      phone: command.phone,      source: command.source,      ipAddress: command.ipAddress,      userAgent: command.userAgent    });
    // Step 3: If they consented to marketing, emit ConsentGranted    if (command.marketingConsent) {      await this.eventStore.appendEvent({        eventId: crypto.randomUUID(),        customerId,        timestamp,        eventType: 'ConsentGranted',        purpose: 'marketing',        channel: 'email',        ipAddress: command.ipAddress,        userAgent: command.userAgent,        consentMethod: 'registration-checkbox'      });    }
    // Step 4: Emit EmailVerificationRequested    await this.eventStore.appendEvent({      eventId: crypto.randomUUID(),      customerId,      timestamp,      eventType: 'EmailVerificationRequested',      email: command.email,      verificationToken: crypto.randomUUID()    });
    return customerId;  }
  private async validateRegistration(    command: CustomerRegistrationCommand  ): Promise<void> {    // Email format validation    if (!this.validator.isValid(command.email)) {      throw new Error('Invalid email format');    }
    // Check if customer already exists    const existing = await this.customerQuery.findByEmail(command.email);    if (existing) {      throw new Error('Customer already exists');    }
    // Terms acceptance is required    if (!command.termsAccepted) {      throw new Error('Terms must be accepted');    }  }}

Projection building from these events:

typescript
class CustomerProjectionBuilder {  async handleCustomerCreated(event: CustomerCreated): Promise<void> {    // Create initial customer record in read database    await this.readDB.putItem({      TableName: 'customers',      Item: {        customerId: { S: event.customerId },        email: { S: event.email },        firstName: { S: event.firstName },        lastName: { S: event.lastName },        phone: { S: event.phone || '' },        source: { S: event.source },        status: { S: 'pending-verification' },        createdAt: { S: event.timestamp },        updatedAt: { S: event.timestamp },        // GSI for email lookups        emailLowercase: { S: event.email.toLowerCase() }      }    });  }
  async handleEmailVerificationRequested(    event: EmailVerificationRequested  ): Promise<void> {    // Trigger welcome email with verification link    await this.campaignService.triggerCampaign({      campaignId: 'welcome-verification',      customerId: event.customerId,      data: {        verificationToken: event.verificationToken,        email: event.email      }    });  }}

Key gotcha: Registration flow needs to handle failures gracefully. If consent event fails to write but customer creation succeeds, you have an inconsistent state. Use event batching or saga patterns for atomic multi-event operations.

Customer Update Operations

Updates are where event sourcing shines - you have complete history of what changed and when:

typescript
interface UpdateCustomerEmailCommand {  customerId: string;  newEmail: string;  ipAddress: string;  userAgent: string;}
interface UpdateCustomerProfileCommand {  customerId: string;  updates: {    firstName?: string;    lastName?: string;    phone?: string;    dateOfBirth?: string;    address?: Address;  };}
class CustomerUpdateHandler {  async updateEmail(command: UpdateCustomerEmailCommand): Promise<void> {    // Load current state from events    const events = await this.eventStore.getCustomerEvents(command.customerId);    const customer = this.rehydrateCustomer(events);
    // Business rule: Can't update email for deleted customer    if (customer.status === 'deleted') {      throw new Error('Cannot update deleted customer');    }
    // Business rule: Email must be different    if (customer.email === command.newEmail) {      throw new Error('Email unchanged');    }
    // Emit email change event    await this.eventStore.appendEvent({      eventId: crypto.randomUUID(),      customerId: command.customerId,      timestamp: new Date().toISOString(),      eventType: 'CustomerEmailUpdated',      oldEmail: customer.email,      newEmail: command.newEmail,      ipAddress: command.ipAddress,      userAgent: command.userAgent,      requiresVerification: true    });
    // Trigger verification for new email    await this.eventStore.appendEvent({      eventId: crypto.randomUUID(),      customerId: command.customerId,      timestamp: new Date().toISOString(),      eventType: 'EmailVerificationRequested',      email: command.newEmail,      verificationToken: crypto.randomUUID()    });  }
  async updateProfile(command: UpdateCustomerProfileCommand): Promise<void> {    const events = await this.eventStore.getCustomerEvents(command.customerId);    const customer = this.rehydrateCustomer(events);
    if (customer.status === 'deleted') {      throw new Error('Cannot update deleted customer');    }
    // Emit specific events for each type of update    const timestamp = new Date().toISOString();
    if (command.updates.firstName || command.updates.lastName) {      await this.eventStore.appendEvent({        eventId: crypto.randomUUID(),        customerId: command.customerId,        timestamp,        eventType: 'CustomerNameUpdated',        oldFirstName: customer.firstName,        oldLastName: customer.lastName,        newFirstName: command.updates.firstName || customer.firstName,        newLastName: command.updates.lastName || customer.lastName      });    }
    if (command.updates.phone) {      await this.eventStore.appendEvent({        eventId: crypto.randomUUID(),        customerId: command.customerId,        timestamp,        eventType: 'CustomerPhoneUpdated',        oldPhone: customer.phone,        newPhone: command.updates.phone      });    }
    if (command.updates.address) {      await this.eventStore.appendEvent({        eventId: crypto.randomUUID(),        customerId: command.customerId,        timestamp,        eventType: 'CustomerAddressUpdated',        oldAddress: customer.address,        newAddress: command.updates.address      });    }  }}

Projection updates handle incremental changes:

typescript
class CustomerProjectionBuilder {  async handleCustomerEmailUpdated(    event: CustomerEmailUpdated  ): Promise<void> {    await this.readDB.updateItem({      TableName: 'customers',      Key: { customerId: { S: event.customerId } },      UpdateExpression:        'SET email = :newEmail, emailLowercase = :emailLower, ' +        'emailVerified = :verified, updatedAt = :timestamp',      ExpressionAttributeValues: {        ':newEmail': { S: event.newEmail },        ':emailLower': { S: event.newEmail.toLowerCase() },        ':verified': { BOOL: false },        ':timestamp': { S: event.timestamp }      }    });
    // Audit trail projection for compliance    await this.auditDB.putItem({      TableName: 'customer-audit-trail',      Item: {        customerId: { S: event.customerId },        timestamp: { S: event.timestamp },        eventType: { S: 'EmailUpdated' },        oldValue: { S: event.oldEmail },        newValue: { S: event.newEmail },        ipAddress: { S: event.ipAddress },        userAgent: { S: event.userAgent }      }    });  }
  async handleCustomerAddressUpdated(    event: CustomerAddressUpdated  ): Promise<void> {    await this.readDB.updateItem({      TableName: 'customers',      Key: { customerId: { S: event.customerId } },      UpdateExpression: 'SET address = :address, updatedAt = :timestamp',      ExpressionAttributeValues: {        ':address': { S: JSON.stringify(event.newAddress) },        ':timestamp': { S: event.timestamp }      }    });  }}

Customer Deletion and Deactivation

This is where event sourcing differs significantly from traditional systems:

typescript
interface DeactivateCustomerCommand {  customerId: string;  reason: 'customer-request' | 'fraud' | 'terms-violation' | 'other';  notes?: string;}
interface DeleteCustomerDataCommand {  customerId: string;  reason: 'gdpr-request' | 'data-retention-policy';  deletionType: 'soft' | 'hard' | 'anonymize';}
class CustomerDeletionHandler {  // Soft delete - customer account is deactivated but data retained  async deactivateCustomer(    command: DeactivateCustomerCommand  ): Promise<void> {    const events = await this.eventStore.getCustomerEvents(command.customerId);    const customer = this.rehydrateCustomer(events);
    if (customer.status === 'deleted') {      throw new Error('Customer already deleted');    }
    // Emit deactivation event    await this.eventStore.appendEvent({      eventId: crypto.randomUUID(),      customerId: command.customerId,      timestamp: new Date().toISOString(),      eventType: 'CustomerDeactivated',      reason: command.reason,      notes: command.notes,      previousStatus: customer.status    });
    // Automatically revoke all active marketing consents    const activeConsents = customer.consents.filter(c => c.status === 'active');
    for (const consent of activeConsents) {      await this.eventStore.appendEvent({        eventId: crypto.randomUUID(),        customerId: command.customerId,        timestamp: new Date().toISOString(),        eventType: 'ConsentRevoked',        purpose: consent.purpose,        channel: consent.channel,        reason: 'account-deactivated'      });    }  }
  // GDPR deletion - different from deactivation  async deleteCustomerData(    command: DeleteCustomerDataCommand  ): Promise<void> {    const timestamp = new Date().toISOString();
    // Emit deletion request event    await this.eventStore.appendEvent({      eventId: crypto.randomUUID(),      customerId: command.customerId,      timestamp,      eventType: 'CustomerDataDeletionRequested',      reason: command.reason,      deletionType: command.deletionType    });
    if (command.deletionType === 'anonymize') {      // Anonymize PII in all events      await this.gdprService.anonymizeCustomerEvents(command.customerId);    } else if (command.deletionType === 'hard') {      // Actually delete events (rare, only for specific legal requirements)      await this.gdprService.hardDeleteCustomerEvents(command.customerId);    }
    // Mark as deleted in projections    await this.eventStore.appendEvent({      eventId: crypto.randomUUID(),      customerId: command.customerId,      timestamp,      eventType: 'CustomerDataDeleted',      deletionType: command.deletionType,      completedAt: timestamp    });  }}

Impact on active campaigns:

typescript
class CampaignService {  async handleCustomerDeactivated(    event: CustomerDeactivated  ): Promise<void> {    // Cancel all scheduled campaigns for this customer    const scheduledCampaigns = await this.getScheduledCampaigns(      event.customerId    );
    for (const campaign of scheduledCampaigns) {      await this.cancelCampaign(campaign.id, 'customer-deactivated');    }
    // Remove from all segments    await this.segmentService.removeFromAllSegments(event.customerId);  }
  async handleCustomerDataDeleted(    event: CustomerDataDeleted  ): Promise<void> {    // Purge customer from all systems    await this.purgeFromCampaignQueues(event.customerId);    await this.purgeFromSegments(event.customerId);    await this.purgeFromRecommendations(event.customerId);
    // Record compliance completion    await this.complianceLog.recordDeletion({      customerId: event.customerId,      deletionType: event.deletionType,      completedAt: event.timestamp    });  }}

Key difference: Deactivation is reversible and retains data for analytics. GDPR deletion is permanent and requires careful handling of related data across all systems.

Marketing Automation with Event Triggers

Marketing automation becomes a series of event processors watching for trigger conditions:

typescript
interface CampaignTrigger {  triggerId: string;  campaignId: string;  eventPattern: {    eventType: string;    conditions?: Record<string, any>;  };  actions: CampaignAction[];}
interface CampaignAction {  type: 'send-email' | 'send-sms' | 'add-to-segment' | 'wait';  config: any;}
class CampaignOrchestrator {  constructor(    private triggers: CampaignTrigger[],    private consentService: ConsentService,    private channelOrchestrator: ChannelOrchestrator  ) {}
  async handleEvent(event: CustomerEvent): Promise<void> {    // Find matching triggers    const matchingTriggers = this.triggers.filter(trigger =>      this.eventMatches(event, trigger.eventPattern)    );
    for (const trigger of matchingTriggers) {      await this.executeCampaign(event.customerId, trigger);    }  }
  private async executeCampaign(    customerId: string,    trigger: CampaignTrigger  ): Promise<void> {    // Check consent before any outbound communication    const hasConsent = await this.consentService.hasActiveConsent(      customerId,      'marketing',      'email' // Would derive from action type    );
    if (!hasConsent) {      console.log(`Skipping campaign ${trigger.campaignId} - no consent`);      return;    }
    // Execute actions in sequence    for (const action of trigger.actions) {      await this.executeAction(customerId, action, trigger.campaignId);    }  }
  private async executeAction(    customerId: string,    action: CampaignAction,    campaignId: string  ): Promise<void> {    switch (action.type) {      case 'send-email':        await this.channelOrchestrator.sendEmail({          customerId,          campaignId,          templateId: action.config.templateId,          // Idempotency key to prevent duplicate sends          idempotencyKey: `${campaignId}-${customerId}-${Date.now()}`        });        break;
      case 'wait':        // Implement as scheduled event, not blocking wait        await this.scheduleDelayedAction(          customerId,          campaignId,          action.config.duration        );        break;
      case 'add-to-segment':        await this.eventStore.appendEvent({          eventId: crypto.randomUUID(),          customerId,          timestamp: new Date().toISOString(),          eventType: 'CustomerSegmentAdded',          segmentId: action.config.segmentId,          source: `campaign:${campaignId}`        });        break;    }  }
  private eventMatches(    event: CustomerEvent,    pattern: CampaignTrigger['eventPattern']  ): boolean {    if (event.eventType !== pattern.eventType) return false;
    if (!pattern.conditions) return true;
    // Simple condition matching - production would use JSONPath or similar    return Object.entries(pattern.conditions).every(([key, value]) =>      (event as any)[key] === value    );  }}

Real-world example: Abandoned cart campaign

typescript
// Trigger configurationconst abandonedCartTrigger: CampaignTrigger = {  triggerId: 'abandoned-cart-v2',  campaignId: 'abandoned-cart-email',  eventPattern: {    eventType: 'CartAbandoned',    conditions: {      cartValue: { $gte: 50 } // Only for carts over $50    }  },  actions: [    {      type: 'wait',      config: { duration: '1hour' }    },    {      type: 'send-email',      config: {        templateId: 'abandoned-cart-reminder',        // Dynamic content would be injected        personalization: ['cartItems', 'discountCode']      }    },    {      type: 'wait',      config: { duration: '24hours' }    },    {      type: 'send-email',      config: {        templateId: 'abandoned-cart-final-offer',        personalization: ['cartItems', 'largerDiscountCode']      }    }  ]};

Critical gotcha: Idempotency. Events might be processed multiple times due to retries. Every action needs an idempotency key:

typescript
class EmailChannelHandler {  private sentMessages = new Set<string>();
  async sendEmail(request: SendEmailRequest): Promise<void> {    // Check if already sent using idempotency key    const exists = await this.messageStore.exists(request.idempotencyKey);
    if (exists) {      console.log(`Email already sent: ${request.idempotencyKey}`);      return;    }
    // Send via provider    const result = await this.emailProvider.send({      to: request.recipientEmail,      template: request.templateId,      data: request.personalization    });
    // Record send event    await this.eventStore.appendEvent({      eventId: crypto.randomUUID(),      customerId: request.customerId,      timestamp: new Date().toISOString(),      eventType: 'EmailSent',      campaignId: request.campaignId,      templateId: request.templateId,      messageId: result.messageId,      idempotencyKey: request.idempotencyKey    });  }}

Channel Orchestration and Preference Management

Different customers want different channels at different times. Event-driven architecture makes preference management straightforward:

typescript
class ChannelOrchestrator {  constructor(    private preferenceStore: PreferenceProjection,    private channels: Map<string, ChannelHandler>  ) {}
  async determineChannel(    customerId: string,    messageType: string  ): Promise<string[]> {    // Get customer preferences from read model    const prefs = await this.preferenceStore.getPreferences(customerId);
    // Business logic: Choose channels based on preferences and message type    const availableChannels: string[] = [];
    if (prefs.emailEnabled && this.shouldUseEmail(messageType, prefs)) {      availableChannels.push('email');    }
    if (prefs.smsEnabled && this.shouldUseSMS(messageType, prefs)) {      availableChannels.push('sms');    }
    if (prefs.pushEnabled && this.shouldUsePush(messageType, prefs)) {      availableChannels.push('push');    }
    // Fallback strategy if no preferences set    if (availableChannels.length === 0) {      return this.getDefaultChannels(messageType);    }
    return availableChannels;  }
  private shouldUseEmail(    messageType: string,    prefs: CustomerPreferences  ): boolean {    // Transactional emails always go through    if (messageType === 'transactional') return true;
    // Marketing emails respect frequency preference    if (messageType === 'marketing') {      const lastEmail = prefs.lastEmailSent;      const frequency = prefs.emailFrequency || 'weekly';
      if (!lastEmail) return true;
      const hoursSinceLastEmail =        (Date.now() - new Date(lastEmail).getTime()) / (1000 * 60 * 60);
      switch (frequency) {        case 'daily': return hoursSinceLastEmail >= 24;        case 'weekly': return hoursSinceLastEmail >= 168;        case 'never': return false;        default: return true;      }    }
    return true;  }
  private shouldUseSMS(    messageType: string,    prefs: CustomerPreferences  ): boolean {    // SMS is expensive - use sparingly    // Only for high-value transactional or urgent marketing    return messageType === 'transactional' ||           (messageType === 'urgent-marketing' && prefs.smsForPromotions);  }
  private shouldUsePush(    messageType: string,    prefs: CustomerPreferences  ): boolean {    // Push has low cost but can be ignored    // Good for time-sensitive content    const lastPush = prefs.lastPushSent;
    // Don't spam - at most one push per hour    if (lastPush) {      const hoursSinceLastPush =        (Date.now() - new Date(lastPush).getTime()) / (1000 * 60 * 60);
      if (hoursSinceLastPush < 1) return false;    }
    return prefs.pushCategories?.includes(messageType) ?? false;  }}

Here's the event flow for preference updates:

GDPR Compliance Through Events

The "right to be forgotten" is actually easier with event sourcing:

typescript
class GDPRComplianceService {  constructor(    private eventStore: EventStore,    private projectionRebuilder: ProjectionRebuilder  ) {}
  async handleDataDeletionRequest(customerId: string): Promise<void> {    // Step 1: Emit deletion event    await this.eventStore.appendEvent({      eventId: crypto.randomUUID(),      customerId,      timestamp: new Date().toISOString(),      eventType: 'CustomerDataDeletionRequested',      reason: 'gdpr-right-to-be-forgotten'    });
    // Step 2: Anonymize PII in existing events    // Keep events for analytics but remove identifying data    const events = await this.eventStore.getCustomerEvents(customerId);
    for (const event of events) {      await this.anonymizeEvent(event);    }
    // Step 3: Rebuild projections with anonymized data    await this.projectionRebuilder.rebuild(customerId);
    // Step 4: Emit completion event for audit trail    await this.eventStore.appendEvent({      eventId: crypto.randomUUID(),      customerId,      timestamp: new Date().toISOString(),      eventType: 'CustomerDataDeleted',      eventsAnonymized: events.length    });  }
  private async anonymizeEvent(event: CustomerEvent): Promise<void> {    // Replace PII with anonymized values    const anonymized = {      ...event,      email: this.hashPII(event.email || ''),      ipAddress: this.maskIP(event.ipAddress || ''),      userAgent: '[REDACTED]',      // Keep non-PII for analytics      eventType: event.eventType,      timestamp: event.timestamp    };
    await this.eventStore.replaceEvent(event.eventId, anonymized);  }
  private hashPII(value: string): string {    // One-way hash for anonymization    return crypto.createHash('sha256').update(value).digest('hex');  }
  private maskIP(ip: string): string {    // Keep general location, remove specific identifier    const parts = ip.split('.');    return `${parts[0]}.${parts[1]}.0.0`;  }}

Important consideration: Decide early whether you need true deletion or anonymization. For analytics and business intelligence, anonymized events are valuable. For compliance, document your approach clearly.

Purchase Flow & E-commerce Events

E-commerce integration is where event-driven CRM shows its real power. Every step from browsing to delivery generates events that drive marketing automation.

Order Event Chain

A complete purchase generates a rich event stream:

typescript
// Complete order lifecycle eventsinterface CartCreated extends CustomerEvent {  eventType: 'CartCreated';  cartId: string;  sessionId: string;  source: 'web' | 'mobile' | 'api';}
interface ItemAddedToCart extends CustomerEvent {  eventType: 'ItemAddedToCart';  cartId: string;  productId: string;  productName: string;  quantity: number;  price: number;  currency: string;}
interface CartAbandoned extends CustomerEvent {  eventType: 'CartAbandoned';  cartId: string;  items: CartItem[];  totalValue: number;  currency: string;  abandonedAt: string;  timeInCart: number; // seconds}
interface OrderPlaced extends CustomerEvent {  eventType: 'OrderPlaced';  orderId: string;  cartId: string;  items: OrderItem[];  subtotal: number;  tax: number;  shipping: number;  total: number;  currency: string;  shippingAddress: Address;  billingAddress: Address;}
interface PaymentInitiated extends CustomerEvent {  eventType: 'PaymentInitiated';  orderId: string;  paymentMethod: 'credit-card' | 'paypal' | 'bank-transfer';  amount: number;  currency: string;  paymentProvider: string;}
interface PaymentSucceeded extends CustomerEvent {  eventType: 'PaymentSucceeded';  orderId: string;  paymentId: string;  amount: number;  currency: string;  transactionId: string;}
interface PaymentFailed extends CustomerEvent {  eventType: 'PaymentFailed';  orderId: string;  paymentId: string;  amount: number;  errorCode: string;  errorMessage: string;  retryable: boolean;}
interface OrderConfirmed extends CustomerEvent {  eventType: 'OrderConfirmed';  orderId: string;  confirmationNumber: string;  estimatedDelivery: string;}
interface OrderShipped extends CustomerEvent {  eventType: 'OrderShipped';  orderId: string;  trackingNumber: string;  carrier: string;  shippedAt: string;  estimatedDelivery: string;}
interface OrderDelivered extends CustomerEvent {  eventType: 'OrderDelivered';  orderId: string;  deliveredAt: string;  signedBy?: string;}

Order aggregate reconstruction from events:

typescript
class OrderAggregate {  orderId: string;  customerId: string;  status: OrderStatus;  items: OrderItem[] = [];  totalValue: number = 0;  paymentStatus: PaymentStatus;  shippingStatus: ShippingStatus;  timeline: OrderEvent[] = [];
  static fromEvents(events: CustomerEvent[]): OrderAggregate {    const order = new OrderAggregate();
    for (const event of events) {      order.apply(event);    }
    return order;  }
  private apply(event: CustomerEvent): void {    this.timeline.push(event);
    switch (event.eventType) {      case 'OrderPlaced':        this.orderId = event.orderId;        this.customerId = event.customerId;        this.items = event.items;        this.totalValue = event.total;        this.status = 'pending-payment';        break;
      case 'PaymentSucceeded':        this.paymentStatus = 'paid';        this.status = 'confirmed';        break;
      case 'PaymentFailed':        this.paymentStatus = 'failed';        this.status = 'payment-failed';        break;
      case 'OrderShipped':        this.shippingStatus = 'shipped';        this.status = 'in-transit';        break;
      case 'OrderDelivered':        this.shippingStatus = 'delivered';        this.status = 'completed';        break;    }  }
  // Business logic based on event history  canBeCancelled(): boolean {    return this.status === 'pending-payment' || this.status === 'confirmed';  }
  canBeRefunded(): boolean {    return this.paymentStatus === 'paid' &&           this.shippingStatus !== 'delivered';  }
  getTimeInStatus(status: OrderStatus): number {    const statusEvents = this.timeline.filter(e =>      this.eventResultsInStatus(e, status)    );
    if (statusEvents.length === 0) return 0;
    const startTime = new Date(statusEvents[0].timestamp).getTime();    const endTime = Date.now();    return endTime - startTime;  }}

Payment Event Handling

Payment failures need special attention in event-driven systems:

typescript
class PaymentEventHandler {  async handlePaymentFailed(event: PaymentFailed): Promise<void> {    // Record failure for analytics    await this.analyticsService.trackPaymentFailure({      orderId: event.orderId,      errorCode: event.errorCode,      amount: event.amount    });
    if (event.retryable) {      // Schedule automatic retry for transient failures      await this.schedulePaymentRetry(event.orderId, {        attempt: 1,        maxAttempts: 3,        backoffSeconds: 300 // 5 minutes      });    } else {      // Non-retryable failure - trigger recovery campaign      await this.campaignService.triggerCampaign({        campaignId: 'payment-failed-recovery',        customerId: event.customerId,        data: {          orderId: event.orderId,          errorMessage: this.getFriendlyErrorMessage(event.errorCode),          amount: event.amount,          currency: event.currency        }      });    }
    // Update order projection    await this.orderProjection.updatePaymentStatus(      event.orderId,      'failed',      event.errorCode    );  }
  async handlePaymentSucceeded(event: PaymentSucceeded): Promise<void> {    // Trigger order confirmation workflow    await this.eventStore.appendEvent({      eventId: crypto.randomUUID(),      customerId: event.customerId,      timestamp: new Date().toISOString(),      eventType: 'OrderConfirmed',      orderId: event.orderId,      confirmationNumber: this.generateConfirmationNumber(),      estimatedDelivery: this.calculateDeliveryDate()    });  }}

Refund handling:

typescript
interface RefundInitiated extends CustomerEvent {  eventType: 'RefundInitiated';  orderId: string;  refundId: string;  amount: number;  reason: 'customer-request' | 'quality-issue' | 'delivery-failed' | 'other';  refundType: 'full' | 'partial';  items?: string[]; // For partial refunds}
interface RefundCompleted extends CustomerEvent {  eventType: 'RefundCompleted';  orderId: string;  refundId: string;  amount: number;  completedAt: string;  transactionId: string;}
class RefundHandler {  async initiateRefund(command: InitiateRefundCommand): Promise<void> {    const orderEvents = await this.eventStore.getOrderEvents(command.orderId);    const order = OrderAggregate.fromEvents(orderEvents);
    // Business rules    if (!order.canBeRefunded()) {      throw new Error('Order cannot be refunded');    }
    const refundId = crypto.randomUUID();
    await this.eventStore.appendEvent({      eventId: crypto.randomUUID(),      customerId: order.customerId,      timestamp: new Date().toISOString(),      eventType: 'RefundInitiated',      orderId: command.orderId,      refundId,      amount: command.amount,      reason: command.reason,      refundType: command.refundType    });
    // Process with payment provider    await this.paymentProvider.processRefund({      transactionId: order.paymentTransactionId,      amount: command.amount    });  }}

Post-Purchase Marketing Automation

The order lifecycle drives sophisticated marketing campaigns:

typescript
class PostPurchaseAutomation {  private campaigns: CampaignTrigger[] = [    {      triggerId: 'order-confirmation',      campaignId: 'order-confirmation-email',      eventPattern: {        eventType: 'OrderConfirmed'      },      actions: [        {          type: 'send-email',          config: {            templateId: 'order-confirmation',            personalization: ['orderDetails', 'estimatedDelivery']          }        }      ]    },    {      triggerId: 'shipping-notification',      campaignId: 'shipping-update',      eventPattern: {        eventType: 'OrderShipped'      },      actions: [        {          type: 'send-email',          config: {            templateId: 'order-shipped',            personalization: ['trackingNumber', 'carrier']          }        },        {          type: 'send-push',          config: {            title: 'Your order has shipped!',            body: 'Track your delivery'          }        }      ]    },    {      triggerId: 'delivery-review-request',      campaignId: 'post-delivery-review',      eventPattern: {        eventType: 'OrderDelivered'      },      actions: [        {          type: 'wait',          config: { duration: '3days' }        },        {          type: 'send-email',          config: {            templateId: 'review-request',            personalization: ['products', 'reviewLinks']          }        }      ]    },    {      triggerId: 'replenishment-campaign',      campaignId: 'reorder-reminder',      eventPattern: {        eventType: 'OrderDelivered',        conditions: {          // Only for consumable products          productCategory: 'consumables'        }      },      actions: [        {          type: 'wait',          config: { duration: '30days' }        },        {          type: 'send-email',          config: {            templateId: 'reorder-reminder',            personalization: ['products', 'subscriptionOption']          }        }      ]    }  ];
  async handleOrderEvent(event: CustomerEvent): Promise<void> {    const matchingCampaigns = this.campaigns.filter(campaign =>      this.eventMatches(event, campaign.eventPattern)    );
    for (const campaign of matchingCampaigns) {      await this.executeCampaign(event.customerId, campaign, event);    }  }}

Purchase-Based Customer Segmentation

Events enable sophisticated customer segmentation:

typescript
class CustomerSegmentationEngine {  async handleOrderDelivered(event: OrderDelivered): Promise<void> {    // Calculate customer lifetime value from event history    const purchaseEvents = await this.eventStore.getCustomerPurchases(      event.customerId    );
    const ltv = this.calculateLTV(purchaseEvents);    const orderFrequency = this.calculateFrequency(purchaseEvents);    const avgOrderValue = this.calculateAOV(purchaseEvents);
    // High-value customer identification    if (ltv > 1000 && orderFrequency > 5) {      await this.eventStore.appendEvent({        eventId: crypto.randomUUID(),        customerId: event.customerId,        timestamp: new Date().toISOString(),        eventType: 'CustomerSegmentAdded',        segmentId: 'high-value-customers',        segmentName: 'High Value Customers',        criteria: { ltv, orderFrequency }      });    }
    // Product affinity tracking    const productPreferences = this.analyzeProductAffinity(purchaseEvents);    for (const [category, affinity] of Object.entries(productPreferences)) {      if (affinity > 0.7) {        await this.eventStore.appendEvent({          eventId: crypto.randomUUID(),          customerId: event.customerId,          timestamp: new Date().toISOString(),          eventType: 'ProductAffinityDetected',          category,          affinityScore: affinity,          recommendedProducts: this.getRecommendations(category)        });      }    }  }
  // RFM (Recency, Frequency, Monetary) segmentation  async calculateRFMSegments(customerId: string): Promise<RFMSegment> {    const events = await this.eventStore.getCustomerPurchases(customerId);    const now = Date.now();
    // Recency: Days since last purchase    const lastPurchase = events.filter(e => e.eventType === 'OrderDelivered')      .sort((a, b) => b.timestamp.localeCompare(a.timestamp))[0];
    const daysSinceLastPurchase = lastPurchase      ? (now - new Date(lastPurchase.timestamp).getTime()) / (1000 * 60 * 60 * 24)      : 999;
    // Frequency: Number of purchases    const frequency = events.filter(e => e.eventType === 'OrderDelivered').length;
    // Monetary: Total spend    const monetary = events      .filter(e => e.eventType === 'PaymentSucceeded')      .reduce((sum, e) => sum + e.amount, 0);
    // Score and segment    const rfmScore = {      recency: this.scoreRecency(daysSinceLastPurchase),      frequency: this.scoreFrequency(frequency),      monetary: this.scoreMonetary(monetary)    };
    const segment = this.determineRFMSegment(rfmScore);
    return {      customerId,      recency: daysSinceLastPurchase,      frequency,      monetary,      score: rfmScore,      segment,      calculatedAt: new Date().toISOString()    };  }
  private determineRFMSegment(score: RFMScore): string {    // Champions: High value, frequent, recent    if (score.recency >= 4 && score.frequency >= 4 && score.monetary >= 4) {      return 'champions';    }
    // Loyal Customers: Frequent buyers    if (score.frequency >= 4) {      return 'loyal-customers';    }
    // At Risk: Used to be good, but declining    if (score.recency <= 2 && score.frequency >= 3 && score.monetary >= 3) {      return 'at-risk';    }
    // New Customers: Recent but low frequency    if (score.recency >= 4 && score.frequency <= 2) {      return 'new-customers';    }
    // Need Attention: Below average    return 'needs-attention';  }}

Customer Journey & Funnel Tracking

Tracking customer journeys across touchpoints reveals optimization opportunities and drives personalization.

Journey Event Definitions

Comprehensive journey tracking requires fine-grained events:

typescript
// Awareness stage eventsinterface PageViewed extends CustomerEvent {  eventType: 'PageViewed';  pageUrl: string;  pageTitle: string;  referrer: string;  sessionId: string;  timeOnPage: number;}
interface ProductViewed extends CustomerEvent {  eventType: 'ProductViewed';  productId: string;  productName: string;  productCategory: string;  price: number;  viewSource: 'search' | 'category' | 'recommendation' | 'direct';}
interface SearchPerformed extends CustomerEvent {  eventType: 'SearchPerformed';  query: string;  resultsCount: number;  selectedResult?: string;  sessionId: string;}
// Consideration stage eventsinterface ProductCompared extends CustomerEvent {  eventType: 'ProductCompared';  productIds: string[];  comparisonAttributes: string[];}
interface ReviewRead extends CustomerEvent {  eventType: 'ReviewRead';  productId: string;  reviewId: string;  rating: number;}
interface VideoWatched extends CustomerEvent {  eventType: 'VideoWatched';  videoId: string;  productId?: string;  watchDuration: number;  totalDuration: number;  completionRate: number;}
// Conversion stage eventsinterface CheckoutStarted extends CustomerEvent {  eventType: 'CheckoutStarted';  cartId: string;  cartValue: number;  itemCount: number;}
interface CheckoutStepCompleted extends CustomerEvent {  eventType: 'CheckoutStepCompleted';  cartId: string;  step: 'shipping' | 'payment' | 'review';  stepNumber: number;}
interface CheckoutAbandoned extends CustomerEvent {  eventType: 'CheckoutAbandoned';  cartId: string;  lastCompletedStep: string;  abandonedValue: number;  timeInCheckout: number;}

Building Funnels from Events

Funnel analysis reconstructed from event streams:

typescript
class FunnelAnalyzer {  // Define funnel stages  private readonly purchaseFunnel = [    { stage: 'awareness', events: ['PageViewed', 'ProductViewed'] },    { stage: 'consideration', events: ['ProductCompared', 'ReviewRead'] },    { stage: 'intent', events: ['ItemAddedToCart', 'CartCreated'] },    { stage: 'checkout', events: ['CheckoutStarted', 'CheckoutStepCompleted'] },    { stage: 'purchase', events: ['OrderPlaced', 'PaymentSucceeded'] }  ];
  async analyzeFunnel(    customerId: string,    startDate: string,    endDate: string  ): Promise<FunnelAnalysis> {    const events = await this.eventStore.getCustomerEvents(      customerId,      startDate,      endDate    );
    const funnelProgress: FunnelStage[] = [];    let currentStage = 0;
    for (const event of events) {      const stage = this.getFunnelStage(event.eventType);
      if (stage !== null && stage >= currentStage) {        funnelProgress.push({          stage: this.purchaseFunnel[stage].stage,          event: event.eventType,          timestamp: event.timestamp,          data: event        });        currentStage = Math.max(currentStage, stage + 1);      }    }
    // Identify drop-off points    const dropOffPoint = this.identifyDropOff(funnelProgress);
    // Calculate time spent in each stage    const stageMetrics = this.calculateStageMetrics(funnelProgress);
    return {      customerId,      stages: funnelProgress,      dropOffPoint,      metrics: stageMetrics,      completed: currentStage === this.purchaseFunnel.length,      conversionRate: currentStage / this.purchaseFunnel.length    };  }
  private identifyDropOff(progress: FunnelStage[]): DropOffAnalysis | null {    const lastStage = progress[progress.length - 1];    const expectedNextStage = this.getNextStage(lastStage.stage);
    if (!expectedNextStage) {      return null; // Completed funnel    }
    const timeSinceLastStage =      Date.now() - new Date(lastStage.timestamp).getTime();
    return {      stage: lastStage.stage,      nextExpectedStage: expectedNextStage,      timeSinceLastActivity: timeSinceLastStage,      likelihood: this.calculateDropOffLikelihood(timeSinceLastStage)    };  }
  private calculateStageMetrics(    progress: FunnelStage[]  ): Map<string, StageMetrics> {    const metrics = new Map<string, StageMetrics>();
    for (let i = 0; i < progress.length - 1; i++) {      const current = progress[i];      const next = progress[i + 1];
      const timeInStage =        new Date(next.timestamp).getTime() -        new Date(current.timestamp).getTime();
      metrics.set(current.stage, {        stage: current.stage,        timeSpent: timeInStage,        progressedToNext: true,        events: progress.filter(p => p.stage === current.stage)      });    }
    return metrics;  }}

Multi-Touch Attribution

Understanding which touchpoints drive conversions:

typescript
interface TouchPoint {  timestamp: string;  channel: 'email' | 'sms' | 'push' | 'web' | 'social' | 'paid-ad';  campaign?: string;  eventType: string;  value?: number;}
class AttributionEngine {  async calculateAttribution(    customerId: string,    conversionEvent: OrderPlaced  ): Promise<AttributionModel> {    // Get all touchpoints leading to conversion    const touchpoints = await this.getCustomerTouchpoints(      customerId,      conversionEvent.timestamp    );
    // Apply different attribution models    return {      firstTouch: this.firstTouchAttribution(touchpoints, conversionEvent),      lastTouch: this.lastTouchAttribution(touchpoints, conversionEvent),      linear: this.linearAttribution(touchpoints, conversionEvent),      timeDecay: this.timeDecayAttribution(touchpoints, conversionEvent),      positionBased: this.positionBasedAttribution(touchpoints, conversionEvent)    };  }
  private firstTouchAttribution(    touchpoints: TouchPoint[],    conversion: OrderPlaced  ): Attribution {    const first = touchpoints[0];    return {      model: 'first-touch',      attribution: {        [first.channel]: {          credit: 100,          value: conversion.total,          campaign: first.campaign        }      }    };  }
  private lastTouchAttribution(    touchpoints: TouchPoint[],    conversion: OrderPlaced  ): Attribution {    const last = touchpoints[touchpoints.length - 1];    return {      model: 'last-touch',      attribution: {        [last.channel]: {          credit: 100,          value: conversion.total,          campaign: last.campaign        }      }    };  }
  private linearAttribution(    touchpoints: TouchPoint[],    conversion: OrderPlaced  ): Attribution {    const creditPerTouch = 100 / touchpoints.length;    const valuePerTouch = conversion.total / touchpoints.length;
    const attribution: Record<string, ChannelAttribution> = {};
    for (const touch of touchpoints) {      if (!attribution[touch.channel]) {        attribution[touch.channel] = {          credit: 0,          value: 0,          touchCount: 0        };      }
      attribution[touch.channel].credit += creditPerTouch;      attribution[touch.channel].value += valuePerTouch;      attribution[touch.channel].touchCount += 1;    }
    return {      model: 'linear',      attribution    };  }
  private timeDecayAttribution(    touchpoints: TouchPoint[],    conversion: OrderPlaced  ): Attribution {    const conversionTime = new Date(conversion.timestamp).getTime();    const halfLife = 7 * 24 * 60 * 60 * 1000; // 7 days in milliseconds
    // Calculate weights using exponential decay    const weights = touchpoints.map(touch => {      const touchTime = new Date(touch.timestamp).getTime();      const age = conversionTime - touchTime;      return Math.exp(-age / halfLife);    });
    const totalWeight = weights.reduce((sum, w) => sum + w, 0);
    const attribution: Record<string, ChannelAttribution> = {};
    touchpoints.forEach((touch, i) => {      const credit = (weights[i] / totalWeight) * 100;      const value = (weights[i] / totalWeight) * conversion.total;
      if (!attribution[touch.channel]) {        attribution[touch.channel] = { credit: 0, value: 0, touchCount: 0 };      }
      attribution[touch.channel].credit += credit;      attribution[touch.channel].value += value;      attribution[touch.channel].touchCount += 1;    });
    return {      model: 'time-decay',      attribution    };  }
  private positionBasedAttribution(    touchpoints: TouchPoint[],    conversion: OrderPlaced  ): Attribution {    // 40% to first touch, 40% to last touch, 20% distributed to middle    const attribution: Record<string, ChannelAttribution> = {};
    if (touchpoints.length === 1) {      return this.firstTouchAttribution(touchpoints, conversion);    }
    const first = touchpoints[0];    const last = touchpoints[touchpoints.length - 1];    const middle = touchpoints.slice(1, -1);
    // First touch: 40%    attribution[first.channel] = {      credit: 40,      value: conversion.total * 0.4,      touchCount: 1    };
    // Last touch: 40%    if (!attribution[last.channel]) {      attribution[last.channel] = { credit: 0, value: 0, touchCount: 0 };    }    attribution[last.channel].credit += 40;    attribution[last.channel].value += conversion.total * 0.4;    attribution[last.channel].touchCount += 1;
    // Middle touches: 20% distributed equally    if (middle.length > 0) {      const creditPerMiddle = 20 / middle.length;      const valuePerMiddle = (conversion.total * 0.2) / middle.length;
      for (const touch of middle) {        if (!attribution[touch.channel]) {          attribution[touch.channel] = { credit: 0, value: 0, touchCount: 0 };        }        attribution[touch.channel].credit += creditPerMiddle;        attribution[touch.channel].value += valuePerMiddle;        attribution[touch.channel].touchCount += 1;      }    }
    return {      model: 'position-based',      attribution    };  }}

Real-Time Funnel Progression Campaigns

Trigger campaigns based on funnel position:

typescript
class FunnelProgressionAutomation {  private funnelCampaigns: FunnelCampaign[] = [    {      name: 'Abandoned Browse Recovery',      trigger: {        stage: 'awareness',        inactivityMinutes: 30,        condition: 'viewed-multiple-products-no-cart'      },      actions: [        {          type: 'send-email',          config: {            templateId: 'browse-abandonment',            personalization: ['viewedProducts', 'recommendations']          }        }      ]    },    {      name: 'Checkout Abandonment',      trigger: {        stage: 'checkout',        inactivityMinutes: 60,        condition: 'started-checkout-not-completed'      },      actions: [        {          type: 'wait',          config: { duration: '1hour' }        },        {          type: 'send-email',          config: {            templateId: 'checkout-abandonment',            personalization: ['cartItems', 'checkoutLink', 'incentive']          }        },        {          type: 'wait',          config: { duration: '24hours' }        },        {          type: 'send-sms',          config: {            message: 'Complete your order and get 10% off!'          }        }      ]    },    {      name: 'Post-Purchase Cross-Sell',      trigger: {        stage: 'purchase',        condition: 'order-delivered'      },      actions: [        {          type: 'wait',          config: { duration: '7days' }        },        {          type: 'send-email',          config: {            templateId: 'cross-sell',            personalization: ['purchasedProducts', 'recommendations']          }        }      ]    }  ];
  async monitorFunnelProgress(): Promise<void> {    // Run periodically to check for customers stuck in funnel stages    const stuckCustomers = await this.findStuckCustomers();
    for (const customer of stuckCustomers) {      const analysis = await this.funnelAnalyzer.analyzeFunnel(        customer.customerId,        customer.sessionStart,        new Date().toISOString()      );
      const matchingCampaigns = this.funnelCampaigns.filter(campaign =>        this.shouldTriggerCampaign(campaign, analysis)      );
      for (const campaign of matchingCampaigns) {        await this.executeFunnelCampaign(customer.customerId, campaign, analysis);      }    }  }
  private shouldTriggerCampaign(    campaign: FunnelCampaign,    analysis: FunnelAnalysis  ): boolean {    if (!analysis.dropOffPoint) return false;
    const dropOff = analysis.dropOffPoint;    const inactivityMinutes = dropOff.timeSinceLastActivity / (1000 * 60);
    return (      dropOff.stage === campaign.trigger.stage &&      inactivityMinutes >= campaign.trigger.inactivityMinutes &&      this.checkCondition(campaign.trigger.condition, analysis)    );  }}

This comprehensive journey tracking and funnel analysis enables precise, data-driven marketing decisions. By reconstructing customer paths from events, you can identify exactly where customers struggle and intervene with targeted campaigns.

Integration Patterns

Connecting with Third-Party Marketing Tools

Most marketing teams use specialized tools like SendGrid, Mailchimp, or HubSpot. Here's how to integrate while maintaining event-driven benefits:

typescript
interface MarketingIntegration {  syncCustomer(customerId: string, data: CustomerData): Promise<void>;  syncConsent(customerId: string, consent: ConsentData): Promise<void>;  syncSegment(customerId: string, segments: string[]): Promise<void>;}
class SendGridIntegration implements MarketingIntegration {  constructor(    private apiKey: string,    private eventStore: EventStore  ) {}
  async handleCustomerEvent(event: CustomerEvent): Promise<void> {    switch (event.eventType) {      case 'CustomerCreated':        await this.syncCustomer(event.customerId, {          email: event.email,          created_at: event.timestamp        });        break;
      case 'ConsentGranted':        if (event.purpose === 'marketing' && event.channel === 'email') {          await this.syncConsent(event.customerId, {            status: 'subscribed',            timestamp: event.timestamp          });        }        break;
      case 'ConsentRevoked':        if (event.purpose === 'marketing' && event.channel === 'email') {          await this.syncConsent(event.customerId, {            status: 'unsubscribed',            timestamp: event.timestamp          });        }        break;
      case 'CustomerSegmentAdded':        await this.addToList(event.customerId, event.segmentId);        break;    }
    // Record integration event for debugging    await this.eventStore.appendEvent({      eventId: crypto.randomUUID(),      customerId: event.customerId,      timestamp: new Date().toISOString(),      eventType: 'ThirdPartyIntegrationSynced',      integration: 'sendgrid',      action: event.eventType    });  }
  async syncCustomer(customerId: string, data: CustomerData): Promise<void> {    // SendGrid API call with retry logic    await this.sendGridAPI.post('/marketing/contacts', {      contacts: [{        email: data.email,        custom_fields: {          customer_id: customerId,          created_at: data.created_at        }      }]    });  }
  // Implement other methods...}

Dead Letter Queues for Failed Communications

Not all messages successfully deliver. Event-driven architecture makes failure handling explicit:

typescript
class ChannelHandler {  constructor(    private provider: EmailProvider,    private eventStore: EventStore,    private dlqHandler: DeadLetterQueueHandler  ) {}
  async sendMessage(    customerId: string,    message: OutboundMessage  ): Promise<void> {    try {      const result = await this.provider.send(message);
      // Record success      await this.eventStore.appendEvent({        eventId: crypto.randomUUID(),        customerId,        timestamp: new Date().toISOString(),        eventType: 'MessageSent',        channel: 'email',        messageId: result.messageId,        campaignId: message.campaignId      });
    } catch (error) {      // Check if error is retryable      if (this.isRetryable(error)) {        throw error; // Let event bus retry      }
      // Non-retryable error - send to DLQ      await this.dlqHandler.handleFailedMessage({        customerId,        message,        error: error.message,        timestamp: new Date().toISOString()      });
      // Record failure event      await this.eventStore.appendEvent({        eventId: crypto.randomUUID(),        customerId,        timestamp: new Date().toISOString(),        eventType: 'MessageFailed',        channel: 'email',        campaignId: message.campaignId,        errorType: error.code,        errorMessage: error.message      });    }  }
  private isRetryable(error: any): boolean {    // Provider rate limits, network issues - retry    const retryableCodes = ['RATE_LIMIT', 'TIMEOUT', 'SERVICE_UNAVAILABLE'];    return retryableCodes.includes(error.code);  }}
class DeadLetterQueueHandler {  async handleFailedMessage(failure: FailedMessage): Promise<void> {    // Store in DLQ for manual review    await this.dlqStore.save(failure);
    // Alert on-call if high failure rate    const recentFailures = await this.getRecentFailures('1hour');    if (recentFailures.length > 100) {      await this.alerting.trigger({        severity: 'high',        message: `High email failure rate: ${recentFailures.length} in last hour`,        failures: recentFailures.slice(0, 10)      });    }
    // For specific errors, take automated action    if (failure.error.includes('invalid-email')) {      // Mark email as invalid in customer record      await this.eventStore.appendEvent({        eventId: crypto.randomUUID(),        customerId: failure.customerId,        timestamp: new Date().toISOString(),        eventType: 'CustomerEmailInvalidated',        reason: 'bounced-permanent'      });    }  }}

Scaling Considerations and Trade-offs

Performance: Real-Time vs Batch

I've seen teams struggle with the decision: should projections update in real-time or in batches?

Real-time processing:

  • Pros: Customer sees changes immediately, marketing campaigns react faster
  • Cons: Higher costs, more complex infrastructure, potential thundering herd
  • Best for: Consent updates, transactional notifications

Batch processing:

  • Pros: Better throughput, easier to optimize, cheaper
  • Cons: Eventual consistency delay, stale data in queries
  • Best for: Analytics projections, segment calculations, daily email campaigns

Here's a hybrid approach that worked well:

typescript
class ProjectionOrchestrator {  // Critical projections update immediately  private realTimeProjections = new Set(['consent', 'preferences']);
  // Analytics projections batch every 5 minutes  private batchProjections = new Set(['customer-360', 'segments']);
  async handleEvent(event: CustomerEvent): Promise<void> {    const projectionType = this.classifyProjection(event.eventType);
    if (this.realTimeProjections.has(projectionType)) {      // Process immediately      await this.updateProjection(projectionType, event);    } else {      // Add to batch queue      await this.batchQueue.enqueue(projectionType, event);    }  }
  private classifyProjection(eventType: string): string {    // Map events to projection types    const mapping: Record<string, string> = {      'ConsentGranted': 'consent',      'ConsentRevoked': 'consent',      'PreferencesUpdated': 'preferences',      'CustomerSegmentAdded': 'segments',      'ProductViewed': 'customer-360'    };
    return mapping[eventType] || 'customer-360';  }}

Cost Optimization

Event-driven CRM can get expensive fast if you're not careful. Here's what I've learned:

Event storage costs scale with write volume. Use TTLs aggressively:

typescript
// Keep detailed events for 90 days, then aggregateconst eventRetentionPolicy = {  detailed: 90 * 86400, // 90 days in seconds  aggregated: 7 * 365 * 86400 // 7 years for compliance};
class EventArchiver {  async archiveOldEvents(): Promise<void> {    const cutoffDate = new Date();    cutoffDate.setDate(cutoffDate.getDate() - 90);
    // Aggregate old events into summary records    const customersToArchive = await this.getCustomersWithOldEvents(cutoffDate);
    for (const customerId of customersToArchive) {      const events = await this.eventStore.getCustomerEvents(        customerId,        undefined,        cutoffDate.toISOString()      );
      // Create aggregated summary      const summary = this.aggregateEvents(events);      await this.summaryStore.save(customerId, summary);
      // Delete detailed events (they have TTL, but cleanup ensures it)      await this.eventStore.deleteEvents(        customerId,        cutoffDate.toISOString()      );    }  }
  private aggregateEvents(events: CustomerEvent[]): EventSummary {    return {      totalEvents: events.length,      eventTypes: this.countByType(events),      firstEvent: events[0]?.timestamp,      lastEvent: events[events.length - 1]?.timestamp,      consentHistory: this.summarizeConsents(events),      // Keep legally required data      gdprAuditTrail: this.buildAuditTrail(events)    };  }}

Lambda costs for event processors - batch when possible:

typescript
// Instead of processing each event individually// Process in micro-batchesclass BatchedEventProcessor {  private buffer: CustomerEvent[] = [];  private flushInterval = 5000; // 5 seconds  private maxBatchSize = 100;
  constructor() {    setInterval(() => this.flush(), this.flushInterval);  }
  async addEvent(event: CustomerEvent): Promise<void> {    this.buffer.push(event);
    if (this.buffer.length >= this.maxBatchSize) {      await this.flush();    }  }
  private async flush(): Promise<void> {    if (this.buffer.length === 0) return;
    const batch = this.buffer.splice(0, this.maxBatchSize);
    // Process entire batch in one Lambda invocation    await this.projectionBuilder.processBatch(batch);  }}

Schema Evolution Strategy

Your event schemas will change. Plan for it:

typescript
interface EventSchema {  version: number;  schema: any;  compatibleWith?: number[];}
class SchemaRegistry {  private schemas = new Map<string, EventSchema[]>();
  registerSchema(eventType: string, schema: EventSchema): void {    const existing = this.schemas.get(eventType) || [];    existing.push(schema);    this.schemas.set(eventType, existing);  }
  getLatestSchema(eventType: string): EventSchema | undefined {    const schemas = this.schemas.get(eventType);    return schemas?.[schemas.length - 1];  }
  // Validate event against schema before storing  async validateEvent(event: CustomerEvent): Promise<boolean> {    const schema = this.getLatestSchema(event.eventType);    if (!schema) {      console.warn(`No schema registered for ${event.eventType}`);      return false;    }
    // Use JSON Schema or similar for validation    return this.validator.validate(event, schema.schema);  }}

Lessons Learned and Gotchas

After implementing several event-driven CRM systems, here are the patterns that consistently matter:

1. Idempotency is Non-Negotiable

Every external action (email send, API call, database write) must be idempotent. Events will be replayed, processors will retry, and you'll send duplicate emails if you don't handle this.

The pattern I use: store idempotency keys with every action and check before executing.

If checking consent adds 200ms to every message send, you'll have a bottleneck. Cache consent status aggressively, with TTL of 5-10 minutes. For marketing emails, this delay is acceptable. For transactional emails, you might need shorter TTL or real-time checks.

3. Event Ordering Matters Less Than You Think

Most teams worry about event ordering, but for CRM it's rarely critical. If a customer updates preferences twice in quick succession, the final state is what matters. Use timestamps and version numbers to handle conflicts:

typescript
class ConflictResolution {  mergePreferences(existing: Preferences, incoming: Preferences): Preferences {    // Last-write-wins based on timestamp    return {      emailFrequency:        incoming.updatedAt > existing.emailFrequency.updatedAt          ? incoming.emailFrequency          : existing.emailFrequency,      categories:        incoming.updatedAt > existing.categories.updatedAt          ? incoming.categories          : existing.categories    };  }}

4. Start Simple, Add Complexity When Needed

I've seen teams build complex saga orchestrators for simple "send email after signup" flows. Start with basic event handlers. Add saga patterns only when you have multi-step workflows with compensation logic.

5. Monitoring is Different

Traditional CRM monitoring checks "is the database up?" Event-driven monitoring checks:

  • Event processing lag (how far behind are projections?)
  • Dead letter queue depth (how many failures?)
  • Projection consistency (does aggregate match event replay?)
typescript
class CRMHealthCheck {  async checkHealth(): Promise<HealthStatus> {    const checks = await Promise.all([      this.checkEventProcessingLag(),      this.checkDLQDepth(),      this.checkProjectionConsistency()    ]);
    return {      status: checks.every(c => c.healthy) ? 'healthy' : 'degraded',      checks    };  }
  private async checkEventProcessingLag(): Promise<HealthCheck> {    const latestEvent = await this.eventStore.getLatestEvent();    const latestProjection = await this.projectionStore.getLatestUpdate();
    const lagMs = new Date(latestEvent.timestamp).getTime() -                  new Date(latestProjection.timestamp).getTime();
    return {      name: 'event-processing-lag',      healthy: lagMs < 60000, // Less than 1 minute      value: lagMs,      message: `Projection lag: ${lagMs}ms`    };  }}

Closing Thoughts

Event-driven CRM architecture solves real problems: GDPR compliance, multi-channel orchestration, and real-time personalization. But it introduces new complexity: eventual consistency, event schema evolution, and more moving parts.

The pattern works best when you need:

  • Complete audit trails for compliance
  • Complex, multi-step marketing automation
  • Integration with many external systems
  • Scalability beyond single-database limits

It's overkill when you have:

  • Simple email list management
  • Small customer base (< 100k)
  • Primarily transactional communications

Start with event sourcing for consent and preferences - this gives you GDPR compliance benefits without full commitment. Add CQRS when your read patterns diverge significantly from write patterns. Build marketing automation on events when you need sophisticated workflows.

The architecture enables powerful CRM capabilities, but like any pattern, it's a tool for specific problems. Use it where it adds value, not because it's interesting.

Related Posts