CQRS mit Serverless: Wie ich DynamoDB-Kosten um 70% senkte und die Performance verbesserte
Reale CQRS-Implementierung mit AWS Lambda, EventBridge und DynamoDB. Lerne aus meinen Fehlern bei Event Sourcing, Eventual Consistency und dem Debugging verteilter Systeme in der Produktion.
Was ist CQRS und warum solltest du dich dafür interessieren?#
CQRS (Command Query Responsibility Segregation) ist ein Architektur-Pattern, das deine Schreiboperationen (Commands) von deinen Leseoperationen (Queries) trennt. Anstatt dasselbe Model für sowohl Lesen als auch Schreiben zu verwenden, optimierst du jede Seite für ihren spezifischen Zweck.
Das Grundprinzip#
In traditionellen Architekturen verwendest du typischerweise dasselbe Datenmodell für sowohl Lesen als auch Schreiben:
// Traditioneller Ansatz - dasselbe Model für alles
class OrderService {
async createOrder(orderData) {
// In dieselbe Tabelle schreiben
return await db.orders.insert(orderData);
}
async getOrderHistory(customerId) {
// Aus derselben Tabelle mit komplexen Joins lesen
return await db.orders.find()
.join('customers')
.join('products')
.where('customerId', customerId);
}
}
Mit CQRS teilst du das in zwei optimierte Models auf:
// CQRS-Ansatz - separate optimierte Models
class OrderCommandService {
async createOrder(orderData) {
// Write-optimiert: Einfache, schnelle Inserts
await writeDb.orders.insert(orderData);
// Event für Read Model Updates publishen
await eventBus.publish('OrderCreated', orderData);
}
}
class OrderQueryService {
async getOrderHistory(customerId) {
// Read-optimiert: Vorberechnete, denormalisierte Daten
return await readDb.customerOrderHistory.find(customerId);
}
}
Warum CQRS echte Probleme löst#
CQRS ist nicht nur theoretisch - es adressiert spezifische, messbare Probleme:
- Performance-Mismatch: Writes brauchen Validierung und Konsistenz, Reads brauchen Geschwindigkeit
- Scale-Mismatch: Die meisten Systeme haben 10:1 oder 100:1 Read-zu-Write-Verhältnisse
- Model-Komplexität: Für Writes zu optimieren macht Reads komplex und umgekehrt
- Team-Parallelisierung: Verschiedene Teams können unabhängig an Read- und Write-Seite arbeiten
Wann CQRS Sinn macht#
✅ Verwende CQRS wenn du hast:
- Hohe Read-zu-Write-Verhältnisse (10:1 oder höher)
- Unterschiedliche Performance-Anforderungen für Reads vs Writes
- Komplexe Reporting- oder Analytics-Anforderungen
- Reads und Writes unabhängig skalieren musst
- Mehrere Datenrepräsentations-Bedürfnisse (APIs, Reports, Dashboards)
❌ Vermeide CQRS wenn du hast:
- Einfache CRUD-Anwendungen
- Low-Traffic-Anwendungen
- Strong Consistency Anforderungen überall
- Kleines Team, das die Komplexität nicht handhaben kann
- Ähnliche Read- und Write-Patterns
Die Real-World-Auswirkung#
Letztes Jahr hatte unsere E-Commerce-Platform während Flash Sales DynamoDB-Throttling-Fehler. Lese- und Schreiboperationen konkurrierten um dieselbe Throughput-Kapazität. Unsere Lösung? CQRS implementieren. Das Ergebnis: 70% Kostenreduktion, 3x schnellere Reads und null Throttling-Fehler am Black Friday.
Aber hier ist die wichtige Erkenntnis: CQRS geht nicht um die Tools, die du verwendest, sondern darum zu erkennen, wann deine Read- und Write-Bedürfnisse grundsätzlich verschieden sind.
Das Problem, das mich zu CQRS führte#
Lass mich dir genau zeigen, warum wir CQRS brauchten mit einem echten Beispiel. Unsere monolithische Lambda-Funktion hat alles gehandelt - Produktkatalog-Reads, Bestellverarbeitung, Inventar-Updates. Während eines Flash Sales im März 2024 hatten wir diese Probleme:
- DynamoDB-Throttling: 2.000+ Schreiboperationen/Sekunde von Bestellungen konkurrierten mit 10.000+ Leseoperationen/Sekunde von browsenden Usern
- Lambda-Timeouts: Komplexe Aggregations-Queries brauchten 20+ Sekunden
- Kostenexplosion: $3.200/Monat für DynamoDB-Provisioned-Capacity, die wir nur 2 Stunden pro Tag brauchten
- Dateninkonsistenz: Lagerbestände waren wegen Concurrent Updates falsch
Das Schlimmste? Unsere Produktdetailseiten (80% des Traffics) waren langsam, weil sie dasselbe für Bestellverarbeitung optimierte Datenmodell teilten.
Das ist das klassische Szenario, wo CQRS glänzt: wenn deine Read- und Write-Workloads völlig verschiedene Charakteristiken und Anforderungen haben.
Unsere Architektur-Evolution#
Vor CQRS (Der Monolith):
// Single Lambda handling everything - schien anfangs einfach
export const handler = async (event: APIGatewayEvent) => {
const { httpMethod, path } = event;
if (httpMethod === 'GET' && path === '/products') {
// Komplexe Query, die 3 Tabellen joined
const products = await dynamoClient.query({
TableName: 'MainTable',
IndexName: 'GSI1',
KeyConditionExpression: 'GSI1PK = :pk',
ExpressionAttributeValues: { ':pk': 'PRODUCT' }
}).promise();
// Dann Inventar für jedes Produkt fetchen (N+1 Query-Problem)
for (const product of products.Items) {
const inventory = await getInventory(product.id);
product.availableQuantity = inventory.quantity;
}
return { statusCode: 200, body: JSON.stringify(products) };
}
if (httpMethod === 'POST' && path === '/orders') {
// In dieselbe Tabelle schreiben, um Throughput konkurrieren
await createOrder(JSON.parse(event.body));
}
};
Nach CQRS (Getrennte Concerns):
Die Command-Seite: Writes handhaben#
// commands/create-order.ts - Fokussiert nur auf Bestellverarbeitung
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBDocumentClient, PutCommand } from '@aws-sdk/lib-dynamodb';
import { EventBridgeClient, PutEventsCommand } from '@aws-sdk/client-eventbridge';
import { z } from 'zod';
import { ulid } from 'ulid';
const dynamoClient = DynamoDBDocumentClient.from(new DynamoDBClient({}));
const eventBridge = new EventBridgeClient({});
// Input-Validierung mit Zod - hat so viele Bugs in Production gefangen
const CreateOrderSchema = z.object({
customerId: z.string().uuid(),
items: z.array(z.object({
productId: z.string(),
quantity: z.number().positive(),
price: z.number().positive()
})).min(1),
shippingAddress: z.object({
street: z.string(),
city: z.string(),
country: z.string(),
postalCode: z.string()
})
});
export const handler = async (event: any) => {
// Input parsen und validieren
const input = CreateOrderSchema.parse(JSON.parse(event.body));
const orderId = ulid(); // Zeit-sortierbare IDs - Game Changer fürs Debugging
const timestamp = Date.now();
// In Command Store schreiben (write-optimierte Tabelle)
const order = {
PK: `ORDER#${orderId}`,
SK: `ORDER#${orderId}`,
id: orderId,
customerId: input.customerId,
items: input.items,
total: input.items.reduce((sum, item) => sum + (item.price * item.quantity), 0),
status: 'PENDING',
createdAt: timestamp,
updatedAt: timestamp,
version: 1 // Optimistic Locking - hat uns vor Race Conditions gerettet
};
try {
await dynamoClient.send(new PutCommand({
TableName: process.env.WRITE_TABLE_NAME!,
Item: order,
ConditionExpression: 'attribute_not_exists(PK)' // Duplikate verhindern
}));
// Event für Read Model Updates publishen
await eventBridge.send(new PutEventsCommand({
Entries: [{
Source: 'orders.service',
DetailType: 'OrderCreated',
Detail: JSON.stringify({
orderId,
customerId: input.customerId,
items: input.items,
total: order.total,
timestamp
}),
EventBusName: process.env.EVENT_BUS_NAME
}]
}));
return {
statusCode: 201,
body: JSON.stringify({ orderId, status: 'CREATED' })
};
} catch (error) {
console.error('Bestellerstellung fehlgeschlagen:', error);
// Proper Error Handling und Compensation implementieren
throw error;
}
};
Die Query-Seite: Optimierte Reads#
// queries/get-product-catalog.ts - Read-optimiert für Performance
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBDocumentClient, GetCommand, QueryCommand } from '@aws-sdk/lib-dynamodb';
const dynamoClient = DynamoDBDocumentClient.from(new DynamoDBClient({}));
// Vorberechnete, denormalisierte Daten für schnelle Reads
export const handler = async (event: any) => {
const { category, limit = 20, lastKey } = event.queryStringParameters || {};
// Von read-optimierter Tabelle mit vorberechneten Aggregationen lesen
const response = await dynamoClient.send(new QueryCommand({
TableName: process.env.READ_TABLE_NAME!,
IndexName: 'CategoryIndex',
KeyConditionExpression: 'category = :category',
ExpressionAttributeValues: {
':category': category || 'ALL'
},
Limit: parseInt(limit),
ExclusiveStartKey: lastKey ? JSON.parse(Buffer.from(lastKey, 'base64').toString()) : undefined,
// Nur fetchen, was wir fürs Listing brauchen
ProjectionExpression: 'id, #n, price, imageUrl, averageRating, reviewCount, inStock',
ExpressionAttributeNames: {
'#n': 'name' // 'name' ist ein reserviertes Wort in DynamoDB
}
}));
return {
statusCode: 200,
headers: {
'Cache-Control': 'public, max-age=300', // 5-Minuten-Cache für Produktlistings
},
body: JSON.stringify({
products: response.Items,
nextKey: response.LastEvaluatedKey
? Buffer.from(JSON.stringify(response.LastEvaluatedKey)).toString('base64')
: null
})
};
};
Der Event Processor: Models synchron halten#
Hier passiert die Magie - und wo die meisten CQRS-Implementierungen scheitern:
// processors/sync-read-models.ts - Die kritische Synchronisationsebene
import { EventBridgeEvent } from 'aws-lambda';
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBDocumentClient, UpdateCommand, BatchWriteCommand } from '@aws-sdk/lib-dynamodb';
import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs';
const dynamoClient = DynamoDBDocumentClient.from(new DynamoDBClient({}));
const sqsClient = new SQSClient({});
interface OrderCreatedEvent {
orderId: string;
customerId: string;
items: Array<{ productId: string; quantity: number; price: number }>;
total: number;
timestamp: number;
}
export const handler = async (event: EventBridgeEvent<'OrderCreated', OrderCreatedEvent>) => {
const { detail } = event;
// Mehrere Read Models parallel updaten
const updatePromises = [];
// 1. Kundenbestellhistorie updaten (optimiert für Kunden-Queries)
updatePromises.push(
dynamoClient.send(new UpdateCommand({
TableName: process.env.READ_TABLE_NAME!,
Key: {
PK: `CUSTOMER#${detail.customerId}`,
SK: `ORDER#${detail.timestamp}#${detail.orderId}`
},
UpdateExpression: 'SET orderId = :orderId, total = :total, #items = :items, createdAt = :timestamp',
ExpressionAttributeNames: {
'#items': 'items'
},
ExpressionAttributeValues: {
':orderId': detail.orderId,
':total': detail.total,
':items': detail.items,
':timestamp': detail.timestamp
}
}))
);
// 2. Produktstatistiken updaten (für beliebte Produkte, Bestseller etc.)
for (const item of detail.items) {
updatePromises.push(
dynamoClient.send(new UpdateCommand({
TableName: process.env.READ_TABLE_NAME!,
Key: {
PK: `PRODUCT#${item.productId}`,
SK: 'STATS'
},
UpdateExpression: `
ADD salesCount :quantity, revenue :revenue
SET lastSoldAt = :timestamp
`,
ExpressionAttributeValues: {
':quantity': item.quantity,
':revenue': item.price * item.quantity,
':timestamp': detail.timestamp
}
}))
);
}
// 3. Tägliche Sales-Aggregationen updaten (für Dashboards)
const dateKey = new Date(detail.timestamp).toISOString().split('T')[0];
updatePromises.push(
dynamoClient.send(new UpdateCommand({
TableName: process.env.READ_TABLE_NAME!,
Key: {
PK: `SALES#${dateKey}`,
SK: 'AGGREGATE'
},
UpdateExpression: 'ADD orderCount :one, totalRevenue :total',
ExpressionAttributeValues: {
':one': 1,
':total': detail.total
}
}))
);
try {
await Promise.all(updatePromises);
} catch (error) {
console.error('Read Models updaten fehlgeschlagen:', error);
// An DLQ für manuelle Intervention senden
await sqsClient.send(new SendMessageCommand({
QueueUrl: process.env.DLQ_URL!,
MessageBody: JSON.stringify({
event: 'OrderCreated',
detail,
error: error.message,
timestamp: Date.now()
})
}));
throw error; // Lambda retry lassen
}
};
Infrastructure as Code mit CDK#
Hier ist das komplette Serverless CQRS Setup:
// infrastructure/cqrs-stack.ts
import { Stack, StackProps, Duration, RemovalPolicy } from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as lambda from 'aws-cdk-lib/aws-lambda-nodejs';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as events from 'aws-cdk-lib/aws-events';
import * as targets from 'aws-cdk-lib/aws-events-targets';
import * as apigateway from 'aws-cdk-lib/aws-apigateway';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import { Runtime } from 'aws-cdk-lib/aws-lambda';
export class CQRSServerlessStack extends Stack {
constructor(scope: Construct, id: string, props?: StackProps) {
super(scope, id, props);
// Write Model Tabelle - optimiert für Writes
const writeTable = new dynamodb.Table(this, 'WriteTable', {
partitionKey: { name: 'PK', type: dynamodb.AttributeType.STRING },
sortKey: { name: 'SK', type: dynamodb.AttributeType.STRING },
billingMode: dynamodb.BillingMode.ON_DEMAND, // Kein Throttling während Spikes
stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES, // Für Change Data Capture
pointInTimeRecovery: true,
removalPolicy: RemovalPolicy.RETAIN
});
// Read Model Tabelle - optimiert für Queries
const readTable = new dynamodb.Table(this, 'ReadTable', {
partitionKey: { name: 'PK', type: dynamodb.AttributeType.STRING },
sortKey: { name: 'SK', type: dynamodb.AttributeType.STRING },
billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
pointInTimeRecovery: true,
removalPolicy: RemovalPolicy.RETAIN
});
// GSIs für verschiedene Query Patterns hinzufügen
readTable.addGlobalSecondaryIndex({
indexName: 'CategoryIndex',
partitionKey: { name: 'category', type: dynamodb.AttributeType.STRING },
sortKey: { name: 'popularity', type: dynamodb.AttributeType.NUMBER },
projectionType: dynamodb.ProjectionType.ALL
});
readTable.addGlobalSecondaryIndex({
indexName: 'CustomerIndex',
partitionKey: { name: 'customerId', type: dynamodb.AttributeType.STRING },
sortKey: { name: 'createdAt', type: dynamodb.AttributeType.NUMBER },
projectionType: dynamodb.ProjectionType.ALL
});
// Event Bus für CQRS Events
const eventBus = new events.EventBus(this, 'CQRSEventBus', {
eventBusName: 'cqrs-events'
});
// Dead Letter Queue für fehlgeschlagene Events
const dlq = new sqs.Queue(this, 'EventDLQ', {
queueName: 'cqrs-event-dlq',
retentionPeriod: Duration.days(14)
});
// Command Handlers
const createOrderHandler = new lambda.NodejsFunction(this, 'CreateOrderHandler', {
entry: 'src/commands/create-order.ts',
runtime: Runtime.NODEJS_20_X,
memorySize: 1024,
timeout: Duration.seconds(10),
environment: {
WRITE_TABLE_NAME: writeTable.tableName,
EVENT_BUS_NAME: eventBus.eventBusName,
AWS_NODEJS_CONNECTION_REUSE_ENABLED: '1'
},
bundling: {
minify: true,
target: 'node20',
externalModules: ['@aws-sdk/*']
}
});
writeTable.grantWriteData(createOrderHandler);
eventBus.grantPutEventsTo(createOrderHandler);
// Query Handlers
const getProductsHandler = new lambda.NodejsFunction(this, 'GetProductsHandler', {
entry: 'src/queries/get-product-catalog.ts',
runtime: Runtime.NODEJS_20_X,
memorySize: 512, // Read-only, braucht weniger Memory
timeout: Duration.seconds(5),
environment: {
READ_TABLE_NAME: readTable.tableName,
AWS_NODEJS_CONNECTION_REUSE_ENABLED: '1'
}
});
readTable.grantReadData(getProductsHandler);
// Event Processor zum Synchronisieren der Read Models
const syncProcessor = new lambda.NodejsFunction(this, 'SyncProcessor', {
entry: 'src/processors/sync-read-models.ts',
runtime: Runtime.NODEJS_20_X,
memorySize: 2048, // Handhabt Batch Updates
timeout: Duration.seconds(30),
reservedConcurrentExecutions: 10, // Downstream Services nicht überlasten
environment: {
READ_TABLE_NAME: readTable.tableName,
DLQ_URL: dlq.queueUrl,
AWS_NODEJS_CONNECTION_REUSE_ENABLED: '1'
},
deadLetterQueue: dlq,
retryAttempts: 2
});
readTable.grantWriteData(syncProcessor);
dlq.grantSendMessages(syncProcessor);
// Event Rules
new events.Rule(this, 'OrderCreatedRule', {
eventBus,
eventPattern: {
source: ['orders.service'],
detailType: ['OrderCreated']
},
targets: [new targets.LambdaFunction(syncProcessor, {
retryAttempts: 2,
maxEventAge: Duration.hours(2)
})]
});
// API Gateway
const api = new apigateway.RestApi(this, 'CQRSAPI', {
restApiName: 'cqrs-api',
defaultCorsPreflightOptions: {
allowOrigins: apigateway.Cors.ALL_ORIGINS,
allowMethods: apigateway.Cors.ALL_METHODS
}
});
// Command Endpoints
const orders = api.root.addResource('orders');
orders.addMethod('POST', new apigateway.LambdaIntegration(createOrderHandler));
// Query Endpoints
const products = api.root.addResource('products');
products.addMethod('GET', new apigateway.LambdaIntegration(getProductsHandler));
}
}
Eventual Consistency handhaben (Der schwierige Teil)#
CQRS bedeutet, Eventual Consistency zu akzeptieren. So handhaben wir es, ohne User zu verwirren:
// strategies/consistency-handling.ts
export class ConsistencyStrategy {
// Strategie 1: Optimistische UI Updates
async createOrderWithOptimisticUpdate(orderData: any) {
// Zeige dem User sofort Erfolg
const tempOrderId = `temp_${Date.now()}`;
updateUI({ orderId: tempOrderId, status: 'processing' });
try {
const response = await fetch('/api/orders', {
method: 'POST',
body: JSON.stringify(orderData)
});
const { orderId } = await response.json();
// Ersetze Temp-ID mit echter ID
updateUI({ oldId: tempOrderId, newId: orderId, status: 'confirmed' });
// Warte auf Read Model Sync
await this.waitForReadModelSync(orderId);
} catch (error) {
// Optimistisches Update zurückrollen
removeFromUI(tempOrderId);
showError('Bestellung fehlgeschlagen');
}
}
// Strategie 2: Polling mit Exponential Backoff
async waitForReadModelSync(orderId: string, maxAttempts = 5) {
let attempts = 0;
let delay = 100; // Start mit 100ms
while (attempts < maxAttempts) {
const order = await this.checkReadModel(orderId);
if (order) {
return order;
}
await new Promise(resolve => setTimeout(resolve, delay));
delay *= 2; // Exponential Backoff
attempts++;
}
// Fall back auf Command Model Query
return this.queryCommandModel(orderId);
}
// Strategie 3: WebSocket Benachrichtigungen
subscribeToOrderUpdates(customerId: string) {
const ws = new WebSocket(`wss://api.example.com/orders/${customerId}`);
ws.onmessage = (event) => {
const update = JSON.parse(event.data);
if (update.type === 'READ_MODEL_SYNCED') {
refreshOrderList();
}
};
}
}
CQRS in Serverless testen#
Verteilte Systeme zu testen ist schwer. Hier ist unser Ansatz:
// tests/cqrs-integration.test.ts
import { EventBridgeClient, PutEventsCommand } from '@aws-sdk/client-eventbridge';
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { mockClient } from 'aws-sdk-client-mock';
describe('CQRS Event Flow', () => {
const eventBridgeMock = mockClient(EventBridgeClient);
const dynamoMock = mockClient(DynamoDBClient);
beforeEach(() => {
eventBridgeMock.reset();
dynamoMock.reset();
});
test('Bestellerstellung triggert Read Model Update', async () => {
// Arrange
const orderId = 'test-order-123';
eventBridgeMock.on(PutEventsCommand).resolves({
FailedEntryCount: 0,
Entries: [{ EventId: 'event-123' }]
});
// Act - Bestellung erstellen
const response = await handler({
body: JSON.stringify({
customerId: 'customer-123',
items: [{ productId: 'prod-1', quantity: 2, price: 99.99 }]
})
});
// Assert - Event wurde gepublisht
expect(eventBridgeMock.calls()).toHaveLength(1);
const eventCall = eventBridgeMock.call(0);
expect(eventCall.args[0].input.Entries[0].DetailType).toBe('OrderCreated');
// Event Processor simulieren
await syncProcessor({
detail: JSON.parse(eventCall.args[0].input.Entries[0].Detail)
});
// Assert - Read Models wurden geupdatet
const readModelCalls = dynamoMock.calls().filter(
call => call.args[0].input.TableName === 'ReadTable'
);
expect(readModelCalls).toHaveLength(3); // Customer, Product, Daily stats
});
test('Fehlgeschlagene Event-Verarbeitung sendet an DLQ', async () => {
// DynamoDB-Fehler simulieren
dynamoMock.on(UpdateCommand).rejects(new Error('Throttled'));
const event = {
detail: {
orderId: 'order-123',
customerId: 'customer-123',
items: [],
total: 100,
timestamp: Date.now()
}
};
await expect(syncProcessor(event)).rejects.toThrow('Throttled');
// DLQ-Nachricht verifizieren
const sqsCalls = sqsMock.calls();
expect(sqsCalls).toHaveLength(1);
expect(JSON.parse(sqsCalls[0].args[0].input.MessageBody))
.toHaveProperty('error', 'Throttled');
});
});
CQRS Monitoring und Debugging#
Die verteilte Natur von CQRS macht Debugging herausfordernd. Hier ist unser Monitoring-Setup:
// monitoring/cqrs-metrics.ts
import { MetricUnit, Metrics } from '@aws-lambda-powertools/metrics';
import { Tracer } from '@aws-lambda-powertools/tracer';
import { Logger } from '@aws-lambda-powertools/logger';
const metrics = new Metrics({ namespace: 'CQRS', serviceName: 'orders' });
const tracer = new Tracer({ serviceName: 'orders' });
const logger = new Logger({ serviceName: 'orders' });
export const instrumentedHandler = tracer.captureLambdaHandler(
metrics.logMetrics(
async (event: any) => {
const segment = tracer.getSegment();
// Command/Query-Trennung tracken
const operationType = event.httpMethod === 'GET' ? 'QUERY' : 'COMMAND';
metrics.addMetric(`${operationType}_REQUEST`, MetricUnit.Count, 1);
const startTime = Date.now();
try {
// Correlation ID für Service-übergreifendes Tracing hinzufügen
const correlationId = event.headers['x-correlation-id'] || ulid();
segment?.addAnnotation('correlationId', correlationId);
logger.appendKeys({ correlationId });
// Read/Write Model Sync Lag tracken
if (operationType === 'QUERY') {
const syncLag = await measureSyncLag();
metrics.addMetric('READ_MODEL_LAG_MS', MetricUnit.Milliseconds, syncLag);
if (syncLag > 5000) {
logger.warn('Hoher Read Model Lag erkannt', { syncLag });
}
}
const result = await processRequest(event);
metrics.addMetric(`${operationType}_SUCCESS`, MetricUnit.Count, 1);
metrics.addMetric(`${operationType}_DURATION`, MetricUnit.Milliseconds,
Date.now() - startTime);
return result;
} catch (error) {
metrics.addMetric(`${operationType}_ERROR`, MetricUnit.Count, 1);
logger.error('Request fehlgeschlagen', { error, event });
throw error;
}
}
)
);
// Custom CloudWatch Dashboard
export const dashboardConfig = {
widgets: [
{
type: 'metric',
properties: {
metrics: [
['CQRS', 'COMMAND_REQUEST', { stat: 'Sum' }],
['.', 'QUERY_REQUEST', { stat: 'Sum' }],
['.', 'READ_MODEL_LAG_MS', { stat: 'Average' }]
],
period: 300,
stat: 'Average',
region: 'us-east-1',
title: 'CQRS Operations'
}
}
]
};
Kostenanalyse: Die 70% Reduktion#
Hier ist die tatsächliche Kostenaufschlüsselung aus unserem Produktionssystem:
Vor CQRS (März 2024):#
- DynamoDB Provisioned: $2.100/Monat (für Peak provisioniert)
- Lambda Compute: $450/Monat (komplexe Queries, hoher Memory)
- API Gateway: $180/Monat
- **Gesamt:
$2.730/Monat**
Nach CQRS (Juni 2024):#
- DynamoDB On-Demand (Write): $80/Monat
- DynamoDB On-Demand (Read): $320/Monat
- EventBridge: $12/Monat
- Lambda Compute: $180/Monat (einfachere, fokussierte Funktionen)
- API Gateway: $180/Monat
- Gesamt: $972/Monat (64% Reduktion)
Die echten Einsparungen kamen von:
- Kein Over-Provisioning für Peak Loads
- Gecachte Read Models reduzierten Database Hits
- Einfachere Lambda-Funktionen mit weniger Memory
- Bessere Query-Optimierung mit zweckgebauten Indizes
Gelernte Lektionen (auf die harte Tour)#
1. Fang einfach an, wirklich einfach#
Meine erste CQRS-Implementierung hatte 17 verschiedene Read Models. Wir haben jetzt 3. Fang mit einem Read Model an und füg nur mehr hinzu, wenn Query Patterns es verlangen.
2. Event Versioning ist kritisch#
Wir haben unsere Events anfangs nicht versioniert. Als wir ein Feld zu OrderCreated hinzufügen mussten, haben wir jeden Consumer gebrochen. Jetzt:
interface OrderCreatedV1 {
version: 1;
orderId: string;
customerId: string;
total: number;
}
interface OrderCreatedV2 {
version: 2;
orderId: string;
customerId: string;
total: number;
currency: string; // Neues Feld
}
// Handler unterstützt beide Versionen
export const handler = async (event: OrderCreatedV1 | OrderCreatedV2) => {
const currency = 'version' in event && event.version >= 2
? (event as OrderCreatedV2).currency
: 'USD'; // Default für V1
};
3. Idempotenz überall#
Events können mehrfach geliefert werden. Jeder Handler muss idempotent sein:
// Conditional Writes verwenden, um Idempotenz sicherzustellen
await dynamoClient.send(new PutCommand({
TableName: TABLE_NAME,
Item: processedEvent,
ConditionExpression: 'attribute_not_exists(eventId)'
}));
4. Den Sync Lag monitoren#
Die Zeit zwischen Command-Ausführung und Read Model Update ist deine wichtigste Metrik. Wir alarmieren, wenn sie 5 Sekunden überschreitet.
5. Für Reconciliation planen#
Read Models werden driften. Wir führen einen nächtlichen Job aus, der Command und Query Models vergleicht und Diskrepanzen behebt:
// Läuft täglich um 3 Uhr morgens
export const reconciliationJob = async () => {
const commandRecords = await scanCommandTable();
const readRecords = await scanReadTable();
const discrepancies = findDiscrepancies(commandRecords, readRecords);
for (const issue of discrepancies) {
await republishEvent(issue.originalEvent);
logger.warn('Reconciliation erforderlich', { issue });
}
metrics.addMetric('RECONCILIATION_FIXES', MetricUnit.Count, discrepancies.length);
};
Wann du CQRS NICHT verwenden solltest#
CQRS hat unserem System Komplexität hinzugefügt. Es hat sich für uns gelohnt, aber vermeide es, wenn:
- Deine Read/Write Patterns ähnlich sind
- Du einfache CRUD-Operationen hast
- Strong Consistency überall erforderlich ist
- Dein Team mit Eventual Consistency nicht vertraut ist
- Du keine Performance-Probleme hast
Wir haben CQRS bei unserem Admin Panel probiert (50 User, einfaches CRUD). Es war eine Katastrophe - zu viel Komplexität ohne Nutzen.
Die Debugging-Horrorgeschichte#
Zwei Wochen nach dem CQRS-Deploy berichteten Kunden, dass sie alte Bestellstatus sahen. Das Problem? Unser Event Processor scheiterte still für bestimmte Produktkategorien. Die Lambda hatte Timeouts, aber die DLQ war nicht richtig konfiguriert.
Der Fix hat uns gelehrt:
- Immer DLQs mit Alerts konfigurieren
- Circuit Breaker zu Event Processors hinzufügen
- Read-after-Write Consistency für kritische Pfade implementieren
- Eine "Fallback zum Command Model"-Option behalten
Der Weg nach vorne#
CQRS mit Serverless funktioniert wunderbar, wenn du es brauchst. Die Kombination aus Lambdas Auto-Scaling, EventBridges Routing und DynamoDBs flexiblen Schemas macht die Implementierung unkompliziert.
Aber denk dran: CQRS ist eine Lösung für spezifische Probleme - hohe Read/Write-Disparität, komplexe Query-Anforderungen oder Skalierungsprobleme. Wenn du diese Probleme nicht hast, brauchst du kein CQRS.
Fang mit einem Monolithen an, miss deine Bottlenecks und erwäge erst dann CQRS. Wenn du es implementierst, fang mit einem Read Model an und wachse von dort.
Die 70% Kostenreduktion war nett, aber der echte Gewinn war Black Friday 2024: null Downtime, 15ms p99 Latency und glückliche Kunden. Da wusste ich, dass wir die richtige Entscheidung getroffen hatten.
Kommentare (0)
An der Unterhaltung teilnehmen
Melde dich an, um deine Gedanken zu teilen und mit der Community zu interagieren
Noch keine Kommentare
Sei der erste, der deine Gedanken zu diesem Beitrag teilt!
Kommentare (0)
An der Unterhaltung teilnehmen
Melde dich an, um deine Gedanken zu teilen und mit der Community zu interagieren
Noch keine Kommentare
Sei der erste, der deine Gedanken zu diesem Beitrag teilt!