Event-Driven Architecture with Lambda and SQS
Build resilient event-driven systems with AWS Lambda and SQS. Learn queue patterns, error handling, dead letter queues, and scaling strategies for production workloads.
Event-driven architecture decouples producers from consumers, enabling systems that scale independently and fail gracefully. Lambda + SQS is the workhorse of serverless event processing on AWS. Here’s how to do it right.
Architecture Overview
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Producer │────►│ SQS │────►│ Lambda │
│ (API, S3, │ │ Queue │ │ Consumer │
│ Events) │ │ │ │ │
└─────────────┘ └──────┬──────┘ └──────┬──────┘
│ │
│ Failed msgs │ Process
▼ ▼
┌─────────────┐ ┌─────────────┐
│ Dead │ │ DynamoDB/ │
│ Letter │ │ S3/RDS │
│ Queue │ │ │
└─────────────┘ └─────────────┘
Setting Up SQS + Lambda
Terraform Infrastructure
# Main processing queue
resource "aws_sqs_queue" "main" {
name = "orders-queue"
visibility_timeout_seconds = 300 # 6x Lambda timeout
message_retention_seconds = 1209600 # 14 days
receive_wait_time_seconds = 20 # Long polling
# Enable server-side encryption
sqs_managed_sse_enabled = true
# Or use KMS
# kms_master_key_id = aws_kms_key.sqs.id
redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.dlq.arn
maxReceiveCount = 3
})
tags = var.tags
}
# Dead letter queue
resource "aws_sqs_queue" "dlq" {
name = "orders-queue-dlq"
message_retention_seconds = 1209600 # 14 days
tags = var.tags
}
# Lambda function
resource "aws_lambda_function" "processor" {
function_name = "order-processor"
runtime = "python3.12"
handler = "handler.process_order"
timeout = 50 # Queue visibility = 6x this
memory_size = 512
filename = data.archive_file.lambda.output_path
source_code_hash = data.archive_file.lambda.output_base64sha256
role = aws_iam_role.lambda.arn
environment {
variables = {
TABLE_NAME = aws_dynamodb_table.orders.name
QUEUE_URL = aws_sqs_queue.main.url
LOG_LEVEL = "INFO"
}
}
dead_letter_config {
target_arn = aws_sqs_queue.dlq.arn
}
tracing_config {
mode = "Active"
}
reserved_concurrent_executions = 100 # Limit concurrency
tags = var.tags
}
# Event source mapping
resource "aws_lambda_event_source_mapping" "sqs" {
event_source_arn = aws_sqs_queue.main.arn
function_name = aws_lambda_function.processor.arn
batch_size = 10
maximum_batching_window_in_seconds = 5
# Enable partial batch failure reporting
function_response_types = ["ReportBatchItemFailures"]
# Scaling configuration
scaling_config {
maximum_concurrency = 50
}
# Filter events (optional)
filter_criteria {
filter {
pattern = jsonencode({
body = {
type = ["order_created", "order_updated"]
}
})
}
}
}
# IAM permissions
resource "aws_iam_role_policy" "lambda_sqs" {
name = "lambda-sqs-policy"
role = aws_iam_role.lambda.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = [
"sqs:ReceiveMessage",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes",
"sqs:ChangeMessageVisibility"
]
Resource = aws_sqs_queue.main.arn
},
{
Effect = "Allow"
Action = [
"sqs:SendMessage"
]
Resource = aws_sqs_queue.dlq.arn
}
]
})
}
Lambda Handler with Batch Processing
Python Implementation
import json
import logging
import os

import boto3
from dataclasses import dataclass
from typing import List, Optional
logger = logging.getLogger()
logger.setLevel(logging.INFO)
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['TABLE_NAME'])
@dataclass
class ProcessingResult:
message_id: str
success: bool
error: Optional[str] = None
def handler(event, context):
"""
Process SQS messages with partial batch failure support.
"""
logger.info(f"Processing {len(event['Records'])} messages")
results: List[ProcessingResult] = []
for record in event['Records']:
result = process_message(record)
results.append(result)
# Return failed message IDs for retry
failed_ids = [
{'itemIdentifier': r.message_id}
for r in results
if not r.success
]
    if failed_ids:
        logger.warning(f"Failed to process {len(failed_ids)} messages")
    # An empty batchItemFailures list marks the whole batch as successful
    return {'batchItemFailures': failed_ids}
def process_message(record: dict) -> ProcessingResult:
"""
Process a single SQS message.
"""
message_id = record['messageId']
try:
body = json.loads(record['body'])
logger.info(f"Processing message {message_id}: {body.get('type')}")
# Validate message
validate_message(body)
# Process based on type
if body['type'] == 'order_created':
handle_order_created(body['data'])
elif body['type'] == 'order_updated':
handle_order_updated(body['data'])
else:
raise ValueError(f"Unknown message type: {body['type']}")
return ProcessingResult(message_id=message_id, success=True)
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON in message {message_id}: {e}")
# Don't retry malformed messages
return ProcessingResult(message_id=message_id, success=True)
except ValueError as e:
logger.error(f"Validation error for {message_id}: {e}")
# Don't retry validation errors
return ProcessingResult(message_id=message_id, success=True)
except Exception as e:
logger.error(f"Error processing {message_id}: {e}", exc_info=True)
return ProcessingResult(
message_id=message_id,
success=False,
error=str(e)
)
def validate_message(body: dict) -> None:
"""Validate message structure."""
required_fields = ['type', 'data', 'timestamp']
for field in required_fields:
if field not in body:
raise ValueError(f"Missing required field: {field}")
def handle_order_created(data: dict) -> None:
"""Process new order."""
order_id = data['order_id']
# Idempotency check
    existing = table.get_item(Key={'pk': f'ORDER#{order_id}', 'sk': 'METADATA'})
if 'Item' in existing:
logger.info(f"Order {order_id} already exists, skipping")
return
# Create order record
table.put_item(Item={
'pk': f'ORDER#{order_id}',
'sk': 'METADATA',
'status': 'pending',
'created_at': data['timestamp'],
'customer_id': data['customer_id'],
'items': data['items'],
'total': data['total']
})
logger.info(f"Created order {order_id}")
def handle_order_updated(data: dict) -> None:
"""Update existing order."""
order_id = data['order_id']
table.update_item(
Key={'pk': f'ORDER#{order_id}', 'sk': 'METADATA'},
UpdateExpression='SET #status = :status, updated_at = :ts',
ExpressionAttributeNames={'#status': 'status'},
ExpressionAttributeValues={
':status': data['status'],
':ts': data['timestamp']
},
ConditionExpression='attribute_exists(pk)'
)
logger.info(f"Updated order {order_id} to {data['status']}")
Sending Messages to SQS
Producer Function
import json
import os
import uuid

import boto3
from datetime import datetime
sqs = boto3.client('sqs')
QUEUE_URL = os.environ['QUEUE_URL']
def send_order_event(order_data: dict, event_type: str) -> str:
"""Send order event to SQS."""
message = {
'type': event_type,
'data': order_data,
'timestamp': datetime.utcnow().isoformat(),
'correlation_id': str(uuid.uuid4())
}
response = sqs.send_message(
QueueUrl=QUEUE_URL,
MessageBody=json.dumps(message),
        MessageGroupId=order_data.get('customer_id', 'default'),         # FIFO queues only
        MessageDeduplicationId=f"{order_data['order_id']}-{event_type}"  # FIFO queues only; omit both for standard queues
)
return response['MessageId']
def send_batch(messages: list) -> dict:
    """Send up to 10 messages in one batch (the SQS per-request limit)."""
entries = [
{
'Id': str(i),
'MessageBody': json.dumps(msg),
'MessageGroupId': msg['data'].get('customer_id', 'default'),
'MessageDeduplicationId': f"{msg['data']['order_id']}-{msg['type']}"
}
for i, msg in enumerate(messages)
]
response = sqs.send_message_batch(
QueueUrl=QUEUE_URL,
Entries=entries
)
return {
'successful': len(response.get('Successful', [])),
'failed': len(response.get('Failed', []))
}
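A hypothetical producer call site, publishing an event right after an order has been accepted (the order fields mirror what the consumer expects):
# Hypothetical usage: enqueue an event once an order has been accepted.
order = {
    'order_id': 'o-123',
    'customer_id': 'c-1',
    'items': [{'sku': 'widget', 'qty': 2}],
    'total': 19.98
}
message_id = send_order_event(order, 'order_created')
print(f"Enqueued order event {message_id}")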
FIFO Queues for Ordering
When to Use FIFO
Standard Queue:
- At-least-once delivery
- Best-effort ordering
- Unlimited throughput
- Use for: Independent events, high volume
FIFO Queue:
- Exactly-once processing
- Strict ordering within message group
- 3,000 msg/sec (with batching)
- Use for: Order-sensitive processing, deduplication needed
FIFO Queue Configuration
resource "aws_sqs_queue" "fifo" {
name = "orders-queue.fifo"
fifo_queue = true
content_based_deduplication = false # We provide our own dedup IDs
visibility_timeout_seconds = 300
message_retention_seconds = 1209600
# High throughput mode
deduplication_scope = "messageGroup"
fifo_throughput_limit = "perMessageGroupId"
redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.fifo_dlq.arn
maxReceiveCount = 3
})
tags = var.tags
}
resource "aws_sqs_queue" "fifo_dlq" {
name = "orders-queue-dlq.fifo"
fifo_queue = true
tags = var.tags
}
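Ordering on a FIFO queue only holds within a message group, so pick the group ID to match the entity whose events must stay in order. A sketch assuming per-order ordering (the queue URL is a placeholder; the producer shown earlier groups by customer_id instead):
# Per-order ordering: every event for one order shares a MessageGroupId,
# so the consumer sees them in the order they were sent.
import json
import boto3

sqs = boto3.client('sqs')
FIFO_QUEUE_URL = 'https://sqs.us-east-1.amazonaws.com/123456789012/orders-queue.fifo'  # placeholder

for event_type in ('order_created', 'order_updated'):
    sqs.send_message(
        QueueUrl=FIFO_QUEUE_URL,
        MessageBody=json.dumps({'type': event_type, 'data': {'order_id': 'o-123'}}),
        MessageGroupId='o-123',                         # ordering scope
        MessageDeduplicationId=f'o-123-{event_type}'    # dedup within the 5-minute window
    )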
Error Handling Patterns
Exponential Backoff with Visibility Timeout
# Assumes the handler module also creates an SQS client and reads the queue URL,
# and that this variant of message processing raises RetryableError on transient failures.
sqs = boto3.client('sqs')
QUEUE_URL = os.environ['QUEUE_URL']

class RetryableError(Exception):
    """Transient failure that should be retried with backoff."""
    pass

def process_with_backoff(record: dict, context) -> ProcessingResult:
    """Implement exponential backoff using the message visibility timeout."""
    message_id = record['messageId']
    receive_count = int(record['attributes'].get('ApproximateReceiveCount', 1))
try:
process_message(record)
return ProcessingResult(message_id=message_id, success=True)
except RetryableError as e:
# Calculate backoff
backoff_seconds = min(300, 30 * (2 ** (receive_count - 1)))
# Extend visibility timeout
sqs.change_message_visibility(
QueueUrl=QUEUE_URL,
ReceiptHandle=record['receiptHandle'],
VisibilityTimeout=backoff_seconds
)
logger.warning(
f"Retryable error for {message_id}, "
f"attempt {receive_count}, backoff {backoff_seconds}s"
)
return ProcessingResult(
message_id=message_id,
success=False,
error=str(e)
)
Dead Letter Queue Processing
import json
import os
from datetime import datetime

import boto3

s3 = boto3.client('s3')
sns = boto3.client('sns')

def process_dlq(event, context):
"""Process messages from DLQ for manual investigation."""
for record in event['Records']:
body = json.loads(record['body'])
original_queue = record['attributes'].get('DeadLetterQueueSourceArn')
# Store in S3 for analysis
s3.put_object(
Bucket=os.environ['DLQ_BUCKET'],
Key=f"failed/{record['messageId']}.json",
Body=json.dumps({
'message': body,
'attributes': record['attributes'],
'original_queue': original_queue,
'processed_at': datetime.utcnow().isoformat()
})
)
# Send alert
sns.publish(
TopicArn=os.environ['ALERTS_TOPIC'],
Subject='DLQ Message Received',
Message=f"Failed message {record['messageId']} from {original_queue}"
)
return {'processed': len(event['Records'])}
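Once the underlying bug is fixed, messages can be moved from the DLQ back to the source queue rather than replayed by hand. A minimal sketch using the SQS message-move (DLQ redrive) API; the ARNs are placeholders:
# Redrive DLQ messages to the source queue once the consumer is fixed.
# ARNs are placeholders; the source must be a queue configured as a DLQ.
import boto3

sqs = boto3.client('sqs')

task = sqs.start_message_move_task(
    SourceArn='arn:aws:sqs:us-east-1:123456789012:orders-queue-dlq',
    DestinationArn='arn:aws:sqs:us-east-1:123456789012:orders-queue',
    MaxNumberOfMessagesPerSecond=10  # throttle so the consumer isn't flooded
)
print(task['TaskHandle'])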
Monitoring and Observability
CloudWatch Alarms
resource "aws_cloudwatch_metric_alarm" "queue_depth" {
alarm_name = "orders-queue-depth"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = 2
metric_name = "ApproximateNumberOfMessagesVisible"
namespace = "AWS/SQS"
period = 300
statistic = "Average"
threshold = 1000
alarm_description = "Queue depth is too high"
dimensions = {
QueueName = aws_sqs_queue.main.name
}
alarm_actions = [aws_sns_topic.alerts.arn]
}
resource "aws_cloudwatch_metric_alarm" "dlq_not_empty" {
alarm_name = "orders-dlq-not-empty"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = 1
metric_name = "ApproximateNumberOfMessagesVisible"
namespace = "AWS/SQS"
period = 60
statistic = "Sum"
threshold = 0
alarm_description = "Messages in DLQ require attention"
dimensions = {
QueueName = aws_sqs_queue.dlq.name
}
alarm_actions = [aws_sns_topic.alerts.arn]
}
resource "aws_cloudwatch_metric_alarm" "processing_errors" {
alarm_name = "order-processor-errors"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = 2
metric_name = "Errors"
namespace = "AWS/Lambda"
period = 300
statistic = "Sum"
threshold = 10
alarm_description = "High error rate in order processor"
dimensions = {
FunctionName = aws_lambda_function.processor.function_name
}
alarm_actions = [aws_sns_topic.alerts.arn]
}
Custom Metrics
import boto3
cloudwatch = boto3.client('cloudwatch')
def publish_metrics(success_count: int, failure_count: int, duration_ms: float):
"""Publish custom processing metrics."""
cloudwatch.put_metric_data(
Namespace='OrderProcessing',
MetricData=[
{
'MetricName': 'MessagesProcessed',
'Value': success_count,
'Unit': 'Count',
'Dimensions': [
{'Name': 'Status', 'Value': 'Success'}
]
},
{
'MetricName': 'MessagesProcessed',
'Value': failure_count,
'Unit': 'Count',
'Dimensions': [
{'Name': 'Status', 'Value': 'Failure'}
]
},
{
'MetricName': 'ProcessingDuration',
'Value': duration_ms,
'Unit': 'Milliseconds'
}
]
)
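These metrics are best published once per invocation rather than once per message, to keep PutMetricData calls and cost down. A sketch of wiring this into the batch handler (the timing and wiring here are illustrative, not part of the handler above):
# Illustrative wiring: one metrics call per invocation from the batch handler.
import time

def handler(event, context):
    start = time.time()
    results = [process_message(record) for record in event['Records']]
    failed = [r for r in results if not r.success]
    publish_metrics(
        success_count=len(results) - len(failed),
        failure_count=len(failed),
        duration_ms=(time.time() - start) * 1000
    )
    return {'batchItemFailures': [{'itemIdentifier': r.message_id} for r in failed]}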
Key Takeaways
- Set visibility timeout = 6x Lambda timeout — prevents duplicate processing
- Always use DLQs — never lose messages silently
- Implement partial batch failures — don’t fail the whole batch for one bad message
- Use FIFO when order matters — but accept the throughput tradeoff
- Monitor queue depth and DLQ — early warning for processing issues
“Event-driven architectures are like plumbing — when they work, nobody notices. When they fail, everybody notices.”