CQRS ve Serverless: DynamoDB Maliyetlerini 70% Azaltıp Performansı Nasıl Artırdım

AWS Lambda, EventBridge ve DynamoDB ile gerçek dünyada CQRS uygulaması. Event sourcing, eventual consistency ve production'daki dağıtık sistemleri debug etme deneyimlerimden öğrenin.

CQRS Nedir ve Neden Önemsemelisin?#

CQRS (Command Query Responsibility Segregation), yazma işlemlerini (commands) okuma işlemlerinden (queries) ayıran mimari bir pattern'dir. Hem okuma hem yazma için aynı modeli kullanmak yerine, her bir tarafı kendi amacı için optimize edersin.

Temel Prensip#

Geleneksel mimarilerde, genellikle hem okuma hem yazma için aynı data modelini kullanırsın:

TypeScript
// Geleneksel yaklaşım - her şey için aynı model
class OrderService {
  async createOrder(orderData) {
    // Aynı tabloya yaz
    return await db.orders.insert(orderData);
  }

  async getOrderHistory(customerId) {
    // Kompleks join'lerle aynı tablodan oku
    return await db.orders.find()
      .join('customers')
      .join('products')
      .where('customerId', customerId);
  }
}

CQRS ile bunu iki optimize edilmiş modele bölersin:

TypeScript
// CQRS yaklaşımı - ayrı optimize edilmiş modeller
class OrderCommandService {
  async createOrder(orderData) {
    // Write-optimized: Basit, hızlı insert'ler
    await writeDb.orders.insert(orderData);
    // Read model güncellemeleri için event publish et
    await eventBus.publish('OrderCreated', orderData);
  }
}

class OrderQueryService {
  async getOrderHistory(customerId) {
    // Read-optimized: Önceden hesaplanmış, denormalize edilmiş data
    return await readDb.customerOrderHistory.find(customerId);
  }
}

CQRS Neden Gerçek Problemleri Çözüyor#

CQRS sadece teorik değil - spesifik, ölçülebilir problemleri ele alıyor:

  1. Performans Uyumsuzluğu: Write'lar validation ve consistency, read'ler hız istiyor
  2. Ölçek Uyumsuzluğu: Çoğu sistemde 10:1 veya 100:1 read-write oranı var
  3. Model Karmaşıklığı: Write'lar için optimize etmek read'leri karmaşık yapıyor ve tersi
  4. Takım Paralelleştirmesi: Farklı takımlar read ve write taraflarında bağımsız çalışabilir

CQRS Ne Zaman Mantıklı#

CQRS'i kullan eğer:

  • Yüksek read-write oranların var (10:1 veya daha fazla)
  • Read'ler ve write'lar için farklı performans gereksinimlerin var
  • Kompleks raporlama veya analitik ihtiyaçların var
  • Read'leri ve write'ları bağımsız scale etmen gerekiyor
  • Birden fazla data temsil ihtiyacın var (API'lar, raporlar, dashboard'lar)

CQRS'ten kaçın eğer:

  • Basit CRUD uygulamalarınız var
  • Düşük trafik uygulamalarınız var
  • Her yerde strong consistency gereksinimi var
  • Karmaşıklığı handle edemeyecek küçük takımınız var
  • Benzer read ve write pattern'leriniz var

Gerçek Dünya Etkisi#

Geçen yıl, e-ticaret platformumuz flash satışlar sırasında DynamoDB throttling hatalarına çarpıyordu. Okuma ve yazma işlemleri aynı throughput kapasitesi için yarışıyordu. Çözümümüz? CQRS implement etmek. Sonuç: 70% maliyet azaltması, 3x daha hızlı okumalar ve Black Friday'de sıfır throttling hatası.

Ama işte ana görü şu: CQRS kullandığın tool'larla ilgili değil, read ve write ihtiyaçlarının temelden farklı olduğunu fark etmekle ilgili.

Beni CQRS'e Götüren Problem#

Neden CQRS'e ihtiyacımız olduğunu gerçek bir örnekle göstereyim. Monolitik Lambda fonksiyonumuz her şeyi handle ediyordu - ürün kataloğu okumaları, sipariş işleme, envanter güncellemeleri. Mart 2024'teki bir flash satış sırasında şu sorunlarla karşılaştık:

  1. DynamoDB throttling: Siparişlerden saniyede 2.000+ yazma işlemi, browsing yapan kullanıcılardan saniyede 10.000+ okuma işlemiyle yarışıyordu
  2. Lambda timeout'ları: Kompleks aggregation query'leri 20+ saniye sürüyordu
  3. Maliyet patlaması: Günde sadece 2 saat ihtiyacımız olan DynamoDB provisioned capacity için ayda $3.200
  4. Data tutarsızlığı: Concurrent update'ler yüzünden envanter sayıları yanlıştı

En kötü kısım? Ürün detay sayfalarımız (trafiğin 80%'i) yavaştı çünkü sipariş işleme için optimize edilmiş aynı data modelini paylaşıyorlardı.

Bu CQRS'in parlladığı klasik senaryo: read ve write workload'larının tamamen farklı karakteristikleri ve gereksinimleri olduğunda.

Architecture Evrimimiz#

CQRS Öncesi (Monolit):

TypeScript
// Her şeyi handle eden tek Lambda - ilk başta basit görünüyordu
export const handler = async (event: APIGatewayEvent) => {
  const { httpMethod, path } = event;

  if (httpMethod === 'GET' && path === '/products') {
    // 3 tabloyu join eden kompleks query
    const products = await dynamoClient.query({
      TableName: 'MainTable',
      IndexName: 'GSI1',
      KeyConditionExpression: 'GSI1PK = :pk',
      ExpressionAttributeValues: { ':pk': 'PRODUCT' }
    }).promise();

    // Sonra her ürün için envanter fetch et (N+1 query problemi)
    for (const product of products.Items) {
      const inventory = await getInventory(product.id);
      product.availableQuantity = inventory.quantity;
    }

    return { statusCode: 200, body: JSON.stringify(products) };
  }

  if (httpMethod === 'POST' && path === '/orders') {
    // Aynı tabloya yaz, throughput için yarış
    await createOrder(JSON.parse(event.body));
  }
};

CQRS Sonrası (Ayrılmış Concern'ler):

Command Side: Write'ları Handle Etme#

TypeScript
// commands/create-order.ts - Sadece sipariş işlemeye odaklanmış
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBDocumentClient, PutCommand } from '@aws-sdk/lib-dynamodb';
import { EventBridgeClient, PutEventsCommand } from '@aws-sdk/client-eventbridge';
import { z } from 'zod';
import { ulid } from 'ulid';

const dynamoClient = DynamoDBDocumentClient.from(new DynamoDBClient({}));
const eventBridge = new EventBridgeClient({});

// Zod ile input validation - production'da çok fazla bug yakaladı
const CreateOrderSchema = z.object({
  customerId: z.string().uuid(),
  items: z.array(z.object({
    productId: z.string(),
    quantity: z.number().positive(),
    price: z.number().positive()
  })).min(1),
  shippingAddress: z.object({
    street: z.string(),
    city: z.string(),
    country: z.string(),
    postalCode: z.string()
  })
});

export const handler = async (event: any) => {
  // Input'u parse et ve validate et
  const input = CreateOrderSchema.parse(JSON.parse(event.body));

  const orderId = ulid(); // Time-sortable ID'ler - debugging için oyun değiştirici
  const timestamp = Date.now();

  // Command store'a yaz (write-optimized tablo)
  const order = {
    PK: `ORDER#${orderId}`,
    SK: `ORDER#${orderId}`,
    id: orderId,
    customerId: input.customerId,
    items: input.items,
    total: input.items.reduce((sum, item) => sum + (item.price * item.quantity), 0),
    status: 'PENDING',
    createdAt: timestamp,
    updatedAt: timestamp,
    version: 1 // Optimistic locking - bizi race condition'lardan kurtardı
  };

  try {
    await dynamoClient.send(new PutCommand({
      TableName: process.env.WRITE_TABLE_NAME!,
      Item: order,
      ConditionExpression: 'attribute_not_exists(PK)' // Duplicate'leri önle
    }));

    // Read model update'leri için event publish et
    await eventBridge.send(new PutEventsCommand({
      Entries: [{
        Source: 'orders.service',
        DetailType: 'OrderCreated',
        Detail: JSON.stringify({
          orderId,
          customerId: input.customerId,
          items: input.items,
          total: order.total,
          timestamp
        }),
        EventBusName: process.env.EVENT_BUS_NAME
      }]
    }));

    return {
      statusCode: 201,
      body: JSON.stringify({ orderId, status: 'CREATED' })
    };

  } catch (error) {
    console.error('Sipariş oluşturma başarısız:', error);
    // Proper error handling ve compensation implement et
    throw error;
  }
};

Query Side: Optimize Edilmiş Okumalar#

TypeScript
// queries/get-product-catalog.ts - Performans için read-optimized
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBDocumentClient, GetCommand, QueryCommand } from '@aws-sdk/lib-dynamodb';

const dynamoClient = DynamoDBDocumentClient.from(new DynamoDBClient({}));

// Hızlı okumalar için pre-computed, denormalized data
export const handler = async (event: any) => {
  const { category, limit = 20, lastKey } = event.queryStringParameters || {};

  // Pre-computed aggregation'larla read-optimized tablodan oku
  const response = await dynamoClient.send(new QueryCommand({
    TableName: process.env.READ_TABLE_NAME!,
    IndexName: 'CategoryIndex',
    KeyConditionExpression: 'category = :category',
    ExpressionAttributeValues: {
      ':category': category || 'ALL'
    },
    Limit: parseInt(limit),
    ExclusiveStartKey: lastKey ? JSON.parse(Buffer.from(lastKey, 'base64').toString()) : undefined,
    // Listing için sadece ihtiyacımız olanı fetch et
    ProjectionExpression: 'id, #n, price, imageUrl, averageRating, reviewCount, inStock',
    ExpressionAttributeNames: {
      '#n': 'name' // 'name' DynamoDB'de reserved word
    }
  }));

  return {
    statusCode: 200,
    headers: {
      'Cache-Control': 'public, max-age=300', // Ürün listelemeleri için 5 dakika cache
    },
    body: JSON.stringify({
      products: response.Items,
      nextKey: response.LastEvaluatedKey
        ? Buffer.from(JSON.stringify(response.LastEvaluatedKey)).toString('base64')
        : null
    })
  };
};

Event Processor: Model'leri Senkronize Tutma#

Sihir burada gerçekleşiyor - ve çoğu CQRS implementasyonunun başarısız olduğu yer:

TypeScript
// processors/sync-read-models.ts - Kritik senkronizasyon katmanı
import { EventBridgeEvent } from 'aws-lambda';
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBDocumentClient, UpdateCommand, BatchWriteCommand } from '@aws-sdk/lib-dynamodb';
import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs';

const dynamoClient = DynamoDBDocumentClient.from(new DynamoDBClient({}));
const sqsClient = new SQSClient({});

interface OrderCreatedEvent {
  orderId: string;
  customerId: string;
  items: Array<{ productId: string; quantity: number; price: number }>;
  total: number;
  timestamp: number;
}

export const handler = async (event: EventBridgeEvent<'OrderCreated', OrderCreatedEvent>) => {
  const { detail } = event;

  // Birden fazla read model'i paralel olarak güncelle
  const updatePromises = [];

  // 1. Müşteri sipariş geçmişini güncelle (müşteri query'leri için optimize edilmiş)
  updatePromises.push(
    dynamoClient.send(new UpdateCommand({
      TableName: process.env.READ_TABLE_NAME!,
      Key: {
        PK: `CUSTOMER#${detail.customerId}`,
        SK: `ORDER#${detail.timestamp}#${detail.orderId}`
      },
      UpdateExpression: 'SET orderId = :orderId, total = :total, #items = :items, createdAt = :timestamp',
      ExpressionAttributeNames: {
        '#items': 'items'
      },
      ExpressionAttributeValues: {
        ':orderId': detail.orderId,
        ':total': detail.total,
        ':items': detail.items,
        ':timestamp': detail.timestamp
      }
    }))
  );

  // 2. Ürün istatistiklerini güncelle (popüler ürünler, çok satanlar için)
  for (const item of detail.items) {
    updatePromises.push(
      dynamoClient.send(new UpdateCommand({
        TableName: process.env.READ_TABLE_NAME!,
        Key: {
          PK: `PRODUCT#${item.productId}`,
          SK: 'STATS'
        },
        UpdateExpression: `
          ADD salesCount :quantity, revenue :revenue
          SET lastSoldAt = :timestamp
        `,
        ExpressionAttributeValues: {
          ':quantity': item.quantity,
          ':revenue': item.price * item.quantity,
          ':timestamp': detail.timestamp
        }
      }))
    );
  }

  // 3. Günlük satış aggregation'larını güncelle (dashboard'lar için)
  const dateKey = new Date(detail.timestamp).toISOString().split('T')[0];
  updatePromises.push(
    dynamoClient.send(new UpdateCommand({
      TableName: process.env.READ_TABLE_NAME!,
      Key: {
        PK: `SALES#${dateKey}`,
        SK: 'AGGREGATE'
      },
      UpdateExpression: 'ADD orderCount :one, totalRevenue :total',
      ExpressionAttributeValues: {
        ':one': 1,
        ':total': detail.total
      }
    }))
  );

  try {
    await Promise.all(updatePromises);
  } catch (error) {
    console.error('Read model'leri güncelleme başarısız:', error);

    // Manuel müdahale için DLQ'ya gönder
    await sqsClient.send(new SendMessageCommand({
      QueueUrl: process.env.DLQ_URL!,
      MessageBody: JSON.stringify({
        event: 'OrderCreated',
        detail,
        error: error.message,
        timestamp: Date.now()
      })
    }));

    throw error; // Lambda'nın retry yapmasına izin ver
  }
};

CDK ile Infrastructure as Code#

İşte komple serverless CQRS setup'ı:

TypeScript
// infrastructure/cqrs-stack.ts
import { Stack, StackProps, Duration, RemovalPolicy } from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as lambda from 'aws-cdk-lib/aws-lambda-nodejs';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as events from 'aws-cdk-lib/aws-events';
import * as targets from 'aws-cdk-lib/aws-events-targets';
import * as apigateway from 'aws-cdk-lib/aws-apigateway';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import { Runtime } from 'aws-cdk-lib/aws-lambda';

export class CQRSServerlessStack extends Stack {
  constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id, props);

    // Write model tablosu - write'lar için optimize edilmiş
    const writeTable = new dynamodb.Table(this, 'WriteTable', {
      partitionKey: { name: 'PK', type: dynamodb.AttributeType.STRING },
      sortKey: { name: 'SK', type: dynamodb.AttributeType.STRING },
      billingMode: dynamodb.BillingMode.ON_DEMAND, // Spike'lar sırasında throttling yok
      stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES, // Change data capture için
      pointInTimeRecovery: true,
      removalPolicy: RemovalPolicy.RETAIN
    });

    // Read model tablosu - query'ler için optimize edilmiş
    const readTable = new dynamodb.Table(this, 'ReadTable', {
      partitionKey: { name: 'PK', type: dynamodb.AttributeType.STRING },
      sortKey: { name: 'SK', type: dynamodb.AttributeType.STRING },
      billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
      pointInTimeRecovery: true,
      removalPolicy: RemovalPolicy.RETAIN
    });

    // Farklı query pattern'leri için GSI'lar ekle
    readTable.addGlobalSecondaryIndex({
      indexName: 'CategoryIndex',
      partitionKey: { name: 'category', type: dynamodb.AttributeType.STRING },
      sortKey: { name: 'popularity', type: dynamodb.AttributeType.NUMBER },
      projectionType: dynamodb.ProjectionType.ALL
    });

    readTable.addGlobalSecondaryIndex({
      indexName: 'CustomerIndex',
      partitionKey: { name: 'customerId', type: dynamodb.AttributeType.STRING },
      sortKey: { name: 'createdAt', type: dynamodb.AttributeType.NUMBER },
      projectionType: dynamodb.ProjectionType.ALL
    });

    // CQRS event'leri için event bus
    const eventBus = new events.EventBus(this, 'CQRSEventBus', {
      eventBusName: 'cqrs-events'
    });

    // Başarısız event'ler için dead letter queue
    const dlq = new sqs.Queue(this, 'EventDLQ', {
      queueName: 'cqrs-event-dlq',
      retentionPeriod: Duration.days(14)
    });

    // Command handler'lar
    const createOrderHandler = new lambda.NodejsFunction(this, 'CreateOrderHandler', {
      entry: 'src/commands/create-order.ts',
      runtime: Runtime.NODEJS_20_X,
      memorySize: 1024,
      timeout: Duration.seconds(10),
      environment: {
        WRITE_TABLE_NAME: writeTable.tableName,
        EVENT_BUS_NAME: eventBus.eventBusName,
        AWS_NODEJS_CONNECTION_REUSE_ENABLED: '1'
      },
      bundling: {
        minify: true,
        target: 'node20',
        externalModules: ['@aws-sdk/*']
      }
    });

    writeTable.grantWriteData(createOrderHandler);
    eventBus.grantPutEventsTo(createOrderHandler);

    // Query handler'lar
    const getProductsHandler = new lambda.NodejsFunction(this, 'GetProductsHandler', {
      entry: 'src/queries/get-product-catalog.ts',
      runtime: Runtime.NODEJS_20_X,
      memorySize: 512, // Read-only, daha az memory gerekiyor
      timeout: Duration.seconds(5),
      environment: {
        READ_TABLE_NAME: readTable.tableName,
        AWS_NODEJS_CONNECTION_REUSE_ENABLED: '1'
      }
    });

    readTable.grantReadData(getProductsHandler);

    // Read model'leri senkronize etmek için event processor
    const syncProcessor = new lambda.NodejsFunction(this, 'SyncProcessor', {
      entry: 'src/processors/sync-read-models.ts',
      runtime: Runtime.NODEJS_20_X,
      memorySize: 2048, // Batch update'leri handle ediyor
      timeout: Duration.seconds(30),
      reservedConcurrentExecutions: 10, // Downstream servisleri overwhelm etmeyi önle
      environment: {
        READ_TABLE_NAME: readTable.tableName,
        DLQ_URL: dlq.queueUrl,
        AWS_NODEJS_CONNECTION_REUSE_ENABLED: '1'
      },
      deadLetterQueue: dlq,
      retryAttempts: 2
    });

    readTable.grantWriteData(syncProcessor);
    dlq.grantSendMessages(syncProcessor);

    // Event rule'ları
    new events.Rule(this, 'OrderCreatedRule', {
      eventBus,
      eventPattern: {
        source: ['orders.service'],
        detailType: ['OrderCreated']
      },
      targets: [new targets.LambdaFunction(syncProcessor, {
        retryAttempts: 2,
        maxEventAge: Duration.hours(2)
      })]
    });

    // API Gateway
    const api = new apigateway.RestApi(this, 'CQRSAPI', {
      restApiName: 'cqrs-api',
      defaultCorsPreflightOptions: {
        allowOrigins: apigateway.Cors.ALL_ORIGINS,
        allowMethods: apigateway.Cors.ALL_METHODS
      }
    });

    // Command endpoint'leri
    const orders = api.root.addResource('orders');
    orders.addMethod('POST', new apigateway.LambdaIntegration(createOrderHandler));

    // Query endpoint'leri
    const products = api.root.addResource('products');
    products.addMethod('GET', new apigateway.LambdaIntegration(getProductsHandler));
  }
}

Eventual Consistency'yi Handle Etme (Zor Kısım)#

CQRS, eventual consistency'yi kabul etmek demek. Kullanıcıları kafasını karıştırmadan bunu nasıl handle ediyoruz:

TypeScript
// strategies/consistency-handling.ts
export class ConsistencyStrategy {
  // Strateji 1: Optimistic UI update'leri
  async createOrderWithOptimisticUpdate(orderData: any) {
    // Kullanıcıya hemen başarı göster
    const tempOrderId = `temp_${Date.now()}`;
    updateUI({ orderId: tempOrderId, status: 'processing' });

    try {
      const response = await fetch('/api/orders', {
        method: 'POST',
        body: JSON.stringify(orderData)
      });

      const { orderId } = await response.json();

      // Temp ID'yi gerçek ID ile değiştir
      updateUI({ oldId: tempOrderId, newId: orderId, status: 'confirmed' });

      // Read model update'i için poll et
      await this.waitForReadModelSync(orderId);

    } catch (error) {
      // Optimistic update'i geri al
      removeFromUI(tempOrderId);
      showError('Sipariş başarısız');
    }
  }

  // Strateji 2: Exponential backoff ile polling
  async waitForReadModelSync(orderId: string, maxAttempts = 5) {
    let attempts = 0;
    let delay = 100; // 100ms ile başla

    while (attempts < maxAttempts) {
      const order = await this.checkReadModel(orderId);

      if (order) {
        return order;
      }

      await new Promise(resolve => setTimeout(resolve, delay));
      delay *= 2; // Exponential backoff
      attempts++;
    }

    // Command model query'sine fall back et
    return this.queryCommandModel(orderId);
  }

  // Strateji 3: WebSocket notification'ları
  subscribeToOrderUpdates(customerId: string) {
    const ws = new WebSocket(`wss://api.example.com/orders/${customerId}`);

    ws.onmessage = (event) => {
      const update = JSON.parse(event.data);
      if (update.type === 'READ_MODEL_SYNCED') {
        refreshOrderList();
      }
    };
  }
}

Serverless'ta CQRS Test Etme#

Distributed sistemleri test etmek zor. İşte bizim yaklaşımımız:

TypeScript
// tests/cqrs-integration.test.ts
import { EventBridgeClient, PutEventsCommand } from '@aws-sdk/client-eventbridge';
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { mockClient } from 'aws-sdk-client-mock';

describe('CQRS Event Flow', () => {
  const eventBridgeMock = mockClient(EventBridgeClient);
  const dynamoMock = mockClient(DynamoDBClient);

  beforeEach(() => {
    eventBridgeMock.reset();
    dynamoMock.reset();
  });

  test('Sipariş oluşturma read model update'i tetikler', async () => {
    // Arrange
    const orderId = 'test-order-123';
    eventBridgeMock.on(PutEventsCommand).resolves({
      FailedEntryCount: 0,
      Entries: [{ EventId: 'event-123' }]
    });

    // Act - Sipariş oluştur
    const response = await handler({
      body: JSON.stringify({
        customerId: 'customer-123',
        items: [{ productId: 'prod-1', quantity: 2, price: 99.99 }]
      })
    });

    // Assert - Event publish edildi
    expect(eventBridgeMock.calls()).toHaveLength(1);
    const eventCall = eventBridgeMock.call(0);
    expect(eventCall.args[0].input.Entries[0].DetailType).toBe('OrderCreated');

    // Event processor'ı simüle et
    await syncProcessor({
      detail: JSON.parse(eventCall.args[0].input.Entries[0].Detail)
    });

    // Assert - Read model'ler güncellendi
    const readModelCalls = dynamoMock.calls().filter(
      call => call.args[0].input.TableName === 'ReadTable'
    );
    expect(readModelCalls).toHaveLength(3); // Customer, Product, Daily stats
  });

  test('Başarısız event processing DLQ'ya gönderir', async () => {
    // DynamoDB failure'ı simüle et
    dynamoMock.on(UpdateCommand).rejects(new Error('Throttled'));

    const event = {
      detail: {
        orderId: 'order-123',
        customerId: 'customer-123',
        items: [],
        total: 100,
        timestamp: Date.now()
      }
    };

    await expect(syncProcessor(event)).rejects.toThrow('Throttled');

    // DLQ mesajını verify et
    const sqsCalls = sqsMock.calls();
    expect(sqsCalls).toHaveLength(1);
    expect(JSON.parse(sqsCalls[0].args[0].input.MessageBody))
      .toHaveProperty('error', 'Throttled');
  });
});

CQRS Monitoring ve Debugging#

CQRS'in distributed doğası debugging'i zorlaştırıyor. İşte monitoring setup'ımız:

TypeScript
// monitoring/cqrs-metrics.ts
import { MetricUnit, Metrics } from '@aws-lambda-powertools/metrics';
import { Tracer } from '@aws-lambda-powertools/tracer';
import { Logger } from '@aws-lambda-powertools/logger';

const metrics = new Metrics({ namespace: 'CQRS', serviceName: 'orders' });
const tracer = new Tracer({ serviceName: 'orders' });
const logger = new Logger({ serviceName: 'orders' });

export const instrumentedHandler = tracer.captureLambdaHandler(
  metrics.logMetrics(
    async (event: any) => {
      const segment = tracer.getSegment();

      // Command/query separation'ı track et
      const operationType = event.httpMethod === 'GET' ? 'QUERY' : 'COMMAND';
      metrics.addMetric(`${operationType}_REQUEST`, MetricUnit.Count, 1);

      const startTime = Date.now();

      try {
        // Servisler arası tracing için correlation ID ekle
        const correlationId = event.headers['x-correlation-id'] || ulid();
        segment?.addAnnotation('correlationId', correlationId);
        logger.appendKeys({ correlationId });

        // Read/write model sync lag'i track et
        if (operationType === 'QUERY') {
          const syncLag = await measureSyncLag();
          metrics.addMetric('READ_MODEL_LAG_MS', MetricUnit.Milliseconds, syncLag);

          if (syncLag > 5000) {
            logger.warn('Yüksek read model lag tespit edildi', { syncLag });
          }
        }

        const result = await processRequest(event);

        metrics.addMetric(`${operationType}_SUCCESS`, MetricUnit.Count, 1);
        metrics.addMetric(`${operationType}_DURATION`, MetricUnit.Milliseconds,
          Date.now() - startTime);

        return result;

      } catch (error) {
        metrics.addMetric(`${operationType}_ERROR`, MetricUnit.Count, 1);
        logger.error('Request başarısız', { error, event });
        throw error;
      }
    }
  )
);

// Custom CloudWatch dashboard
export const dashboardConfig = {
  widgets: [
    {
      type: 'metric',
      properties: {
        metrics: [
          ['CQRS', 'COMMAND_REQUEST', { stat: 'Sum' }],
          ['.', 'QUERY_REQUEST', { stat: 'Sum' }],
          ['.', 'READ_MODEL_LAG_MS', { stat: 'Average' }]
        ],
        period: 300,
        stat: 'Average',
        region: 'us-east-1',
        title: 'CQRS Operations'
      }
    }
  ]
};

Maliyet Analizi: 70% Azalma#

Production sistemimizden gerçek maliyet dökümü:

CQRS Öncesi (Mart 2024):#

  • DynamoDB Provisioned: $2.100/ay (peak için provision edilmiş)
  • Lambda Compute: $450/ay (kompleks query'ler, yüksek memory)
  • API Gateway: $180/ay
  • **Toplam:

$2.730/ay**

CQRS Sonrası (Haziran 2024):#

  • DynamoDB On-Demand (Write): $80/ay
  • DynamoDB On-Demand (Read): $320/ay
  • EventBridge: $12/ay
  • Lambda Compute: $180/ay (daha basit, odaklanmış fonksiyonlar)
  • API Gateway: $180/ay
  • Toplam: $972/ay (64% azalma)

Gerçek tasarruf şunlardan geldi:

  1. Peak load'lar için over-provisioning yok
  2. Cache'lenmiş read model'ler database hit'lerini azalttı
  3. Daha az memory kullanan daha basit Lambda fonksiyonları
  4. Purpose-built index'lerle daha iyi query optimizasyonu

Öğrenilen Dersler (Zor Yoldan)#

1. Basit Başla, Gerçekten Basit#

İlk CQRS implementasyonumda 17 farklı read model vardı. Şimdi 3 tane var. Bir read model ile başla ve sadece query pattern'ler gerektirdiğinde daha fazla ekle.

2. Event Versioning Kritik#

Event'lerimizi başlangıçta version'lamadık. OrderCreated'a bir field eklememiz gerektiğinde, her consumer'ı bozduk. Şimdi:

TypeScript
interface OrderCreatedV1 {
  version: 1;
  orderId: string;
  customerId: string;
  total: number;
}

interface OrderCreatedV2 {
  version: 2;
  orderId: string;
  customerId: string;
  total: number;
  currency: string; // Yeni field
}

// Handler her iki version'ı da destekliyor
export const handler = async (event: OrderCreatedV1 | OrderCreatedV2) => {
  const currency = 'version' in event && event.version >= 2
    ? (event as OrderCreatedV2).currency
    : 'USD'; // V1 için default
};

3. Her Yerde Idempotency#

Event'ler birden fazla kez deliver edilebilir. Her handler idempotent olmalı:

TypeScript
// Idempotency sağlamak için conditional write'lar kullan
await dynamoClient.send(new PutCommand({
  TableName: TABLE_NAME,
  Item: processedEvent,
  ConditionExpression: 'attribute_not_exists(eventId)'
}));

4. Sync Lag'i Monitor Et#

Command execution ve read model update arasındaki süre en önemli metriğiniz. 5 saniyeyi aşarsa alert veriyoruz.

5. Reconciliation İçin Plan Yap#

Read model'ler drift edecek. Command ve query model'leri karşılaştıran, tutarsızlıkları düzelten nightly job çalıştırıyoruz:

TypeScript
// Her gün saat 3'te çalışır
export const reconciliationJob = async () => {
  const commandRecords = await scanCommandTable();
  const readRecords = await scanReadTable();

  const discrepancies = findDiscrepancies(commandRecords, readRecords);

  for (const issue of discrepancies) {
    await republishEvent(issue.originalEvent);
    logger.warn('Reconciliation gerekli', { issue });
  }

  metrics.addMetric('RECONCILIATION_FIXES', MetricUnit.Count, discrepancies.length);
};

CQRS'i Ne Zaman KULLANMAYIN#

CQRS sistemimize karmaşıklık ekledi. Bizim için değdi, ama şu durumlarda kaçının:

  1. Read/write pattern'leriniz benzer
  2. Basit CRUD işlemleriniz var
  3. Her yerde strong consistency gerekli
  4. Takımınız eventual consistency ile rahat değil
  5. Performance sorunları yaşamıyorsunuz

Admin panelimizde CQRS denedik (50 kullanıcı, basit CRUD). Felaket oldu - hiç fayda sağlamadan çok fazla karmaşıklık.

Debugging Korku Hikayesi#

CQRS deploy ettikten iki hafta sonra, müşteriler eski sipariş durumlarını gördüklerini bildirdi. Sorun? Event processor'ımız belirli ürün kategorileri için sessizce fail ediyordu. Lambda timeout alıyordu ama DLQ düzgün configure edilmemişti.

Fix bize şunları öğretti:

  1. DLQ'ları her zaman alert'lerle configure et
  2. Event processor'lara circuit breaker ekle
  3. Kritik path'ler için read-after-write consistency implement et
  4. "Command model'e fallback" seçeneği tut

İleriye Doğru#

Serverless ile CQRS, ihtiyacınız olduğunda harika çalışıyor. Lambda'nın auto-scaling'i, EventBridge'in routing'i ve DynamoDB'nin esnek schema'larının kombinasyonu implementasyonu basit hale getiriyor.

Ama unutmayın: CQRS belirli problemlere çözüm - yüksek read/write eşitsizliği, kompleks query gereksinimleri veya scalability sorunları. Bu problemleriniz yoksa, CQRS'e ihtiyacınız yok.

Monolit ile başlayın, bottleneck'lerinizi ölçün ve ancak o zaman CQRS'i düşünün. Implement ettiğinizde, bir read model ile başlayın ve oradan büyüyün.

70% maliyet azaltması güzeldi, ama asıl kazanç Black Friday 2024'tü: sıfır downtime, 15ms p99 latency ve mutlu müşteriler. O zaman doğru seçimi yaptığımızı anladım.

Loading...

Yorumlar (0)

Sohbete katıl

Düşüncelerini paylaşmak ve toplulukla etkileşim kurmak için giriş yap

Henüz yorum yok

Bu yazı hakkında ilk düşüncelerini paylaşan sen ol!

Related Posts