AWS Lambda: Serverless Patterns That Scale
Design scalable serverless applications with AWS Lambda. Learn event-driven patterns, cold start optimization, and best practices for production workloads.
AWS Lambda enables event-driven architectures that scale automatically and cost nothing while idle. But serverless has its own pitfalls: cold starts, execution limits, and debugging challenges. This guide covers patterns that hold up at scale.
Lambda Anatomy
# handler.py
import json
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Code outside handler runs once per container (cold start)
# Use for initialization: DB connections, loading configs
import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('my-table')

def handler(event, context):
    """
    event: Input data (varies by trigger)
    context: Runtime info (request ID, remaining time, etc.)
    """
    logger.info(f"Processing request {context.aws_request_id}")
    try:
        # Your business logic
        result = process_event(event)
        return {
            'statusCode': 200,
            'headers': {'Content-Type': 'application/json'},
            'body': json.dumps(result)
        }
    except ValueError as e:
        logger.warning(f"Invalid input: {e}")
        return {'statusCode': 400, 'body': json.dumps({'error': str(e)})}
    except Exception as e:
        logger.error(f"Unexpected error: {e}", exc_info=True)
        return {'statusCode': 500, 'body': json.dumps({'error': 'Internal error'})}
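The handler above delegates to a process_event helper. A minimal sketch of what that might look like for a JSON API (the helper is hypothetical; the real logic depends on your domain):

# Hypothetical helper for the handler above. It reuses the module-level
# `table` and raises ValueError so the handler maps bad input to a 400.
def process_event(event):
    # json.loads raises JSONDecodeError (a ValueError subclass) on bad input
    body = json.loads(event.get('body') or '{}')
    name = body.get('name')
    if not name:
        raise ValueError("'name' is required")
    table.put_item(Item={'id': name, 'name': name})
    return {'created': name}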
Infrastructure as Code with Terraform
# Lambda function
resource "aws_lambda_function" "api" {
  function_name = "my-api"
  runtime       = "python3.12"
  handler       = "handler.handler"
  timeout       = 30
  memory_size   = 512

  filename         = data.archive_file.lambda_zip.output_path
  source_code_hash = data.archive_file.lambda_zip.output_base64sha256
  role             = aws_iam_role.lambda_role.arn

  # Publish a numbered version on each change so an alias can pin one
  # (required for the provisioned concurrency setup later in this guide)
  publish = true

  environment {
    variables = {
      TABLE_NAME  = aws_dynamodb_table.main.name
      LOG_LEVEL   = "INFO"
      ENVIRONMENT = var.environment
    }
  }

  # VPC config (only if the function must reach private resources)
  vpc_config {
    subnet_ids         = var.private_subnet_ids
    security_group_ids = [aws_security_group.lambda.id]
  }

  # Reserved concurrency (optional): caps this function's concurrency
  reserved_concurrent_executions = 100

  # Enable X-Ray tracing
  tracing_config {
    mode = "Active"
  }

  tags = var.tags
}
# IAM role
resource "aws_iam_role" "lambda_role" {
  name = "my-api-lambda-role"
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action = "sts:AssumeRole"
      Effect = "Allow"
      Principal = {
        Service = "lambda.amazonaws.com"
      }
    }]
  })
}

# Permissions
resource "aws_iam_role_policy" "lambda_policy" {
  name = "my-api-lambda-policy"
  role = aws_iam_role.lambda_role.id
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        # Scope this to the function's own log group in production
        Resource = "arn:aws:logs:*:*:*"
      },
      {
        Effect = "Allow"
        Action = [
          "dynamodb:GetItem",
          "dynamodb:PutItem",
          "dynamodb:Query"
        ]
        Resource = aws_dynamodb_table.main.arn
      },
      {
        Effect = "Allow"
        Action = [
          "xray:PutTraceSegments",
          "xray:PutTelemetryRecords"
        ]
        Resource = "*"
      }
    ]
  })
}
Event-Driven Patterns
API Gateway Integration
# API Gateway HTTP API
resource "aws_apigatewayv2_api" "main" {
  name          = "my-api"
  protocol_type = "HTTP"

  cors_configuration {
    allow_origins = ["https://myapp.com"]
    allow_methods = ["GET", "POST", "PUT", "DELETE"]
    allow_headers = ["Content-Type", "Authorization"]
    max_age       = 3600
  }
}

# A stage is required before the API can actually be invoked
resource "aws_apigatewayv2_stage" "default" {
  api_id      = aws_apigatewayv2_api.main.id
  name        = "$default"
  auto_deploy = true
}

resource "aws_apigatewayv2_integration" "lambda" {
  api_id                 = aws_apigatewayv2_api.main.id
  integration_type       = "AWS_PROXY"
  integration_uri        = aws_lambda_function.api.invoke_arn
  payload_format_version = "2.0"
}

resource "aws_apigatewayv2_route" "api" {
  api_id    = aws_apigatewayv2_api.main.id
  route_key = "ANY /api/{proxy+}"
  target    = "integrations/${aws_apigatewayv2_integration.lambda.id}"
}

resource "aws_lambda_permission" "api_gateway" {
  statement_id  = "AllowAPIGateway"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.api.function_name
  principal     = "apigateway.amazonaws.com"
  source_arn    = "${aws_apigatewayv2_api.main.execution_arn}/*/*"
}
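On the Lambda side, the HTTP API delivers the payload format 2.0 event shape. A sketch of pulling out the pieces you typically need (the field names come from the 2.0 format; the routing logic itself is illustrative):

# handler.py - reading an API Gateway HTTP API (payload v2.0) event
import base64
import json

def handler(event, context):
    method = event['requestContext']['http']['method']
    path = event['rawPath']  # e.g. "/api/users"
    params = event.get('queryStringParameters') or {}

    body = event.get('body') or ''
    if event.get('isBase64Encoded'):
        body = base64.b64decode(body).decode('utf-8')

    if method == 'POST' and path.startswith('/api/'):
        payload = json.loads(body or '{}')
        # ... dispatch to your business logic ...
        return {'statusCode': 201, 'body': json.dumps({'ok': True})}

    return {'statusCode': 404, 'body': json.dumps({'error': 'not found'})}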
SQS Queue Processing
# handler.py
import json

def handler(event, context):
    """Process SQS messages in batches"""
    failed_message_ids = []

    for record in event['Records']:
        try:
            body = json.loads(record['body'])
            process_message(body)
        except Exception as e:
            print(f"Failed to process message {record['messageId']}: {e}")
            failed_message_ids.append(record['messageId'])

    # Partial batch failure response
    if failed_message_ids:
        return {
            'batchItemFailures': [
                {'itemIdentifier': msg_id}
                for msg_id in failed_message_ids
            ]
        }
    return {'batchItemFailures': []}
# SQS trigger
resource "aws_lambda_event_source_mapping" "sqs" {
  event_source_arn = aws_sqs_queue.main.arn
  function_name    = aws_lambda_function.worker.arn
  batch_size       = 10
  maximum_batching_window_in_seconds = 5

  # Enable partial batch failure
  function_response_types = ["ReportBatchItemFailures"]

  # Scaling configuration
  scaling_config {
    maximum_concurrency = 50
  }
}
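For completeness, the producer side is a single boto3 call. A sketch, assuming the queue URL is injected through an environment variable (QUEUE_URL is hypothetical):

# producer.py - enqueue work for the worker above
import json
import os

import boto3

sqs = boto3.client('sqs')

def enqueue(payload: dict):
    # QUEUE_URL is a hypothetical env var set by your IaC
    sqs.send_message(
        QueueUrl=os.environ['QUEUE_URL'],
        MessageBody=json.dumps(payload),
    )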
S3 Event Processing
# handler.py
import urllib.parse

import boto3

s3 = boto3.client('s3')

def handler(event, context):
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        # Keys arrive URL-encoded (spaces become '+')
        key = urllib.parse.unquote_plus(record['s3']['object']['key'])
        print(f"Processing s3://{bucket}/{key}")

        # Download file
        response = s3.get_object(Bucket=bucket, Key=key)
        content = response['Body'].read()

        # Process and upload result. Caution: writing back to the same
        # bucket can retrigger this function recursively; scope the
        # trigger with a prefix/suffix filter that excludes "processed/"
        result = process_file(content)
        s3.put_object(
            Bucket=bucket,
            Key=f"processed/{key}",
            Body=result
        )
EventBridge Patterns
# EventBridge rule
resource "aws_cloudwatch_event_rule" "order_created" {
  name = "order-created"
  event_pattern = jsonencode({
    source        = ["myapp.orders"]
    "detail-type" = ["Order Created"]
  })
}

resource "aws_cloudwatch_event_target" "process_order" {
  rule = aws_cloudwatch_event_rule.order_created.name
  arn  = aws_lambda_function.process_order.arn

  # DLQ for events EventBridge fails to deliver to this target.
  # The queue also needs a policy allowing events.amazonaws.com to SendMessage.
  dead_letter_config {
    arn = aws_sqs_queue.dlq.arn
  }
}

resource "aws_lambda_permission" "eventbridge" {
  statement_id  = "AllowEventBridge"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.process_order.function_name
  principal     = "events.amazonaws.com"
  source_arn    = aws_cloudwatch_event_rule.order_created.arn
}
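Publishing a matching event from application code is one put_events call. A sketch (the detail payload is illustrative):

# publisher.py - emit an event the rule above will match
import json

import boto3

events = boto3.client('events')

def publish_order_created(order_id: str, total: float):
    response = events.put_events(Entries=[{
        'Source': 'myapp.orders',       # must match the rule's source
        'DetailType': 'Order Created',  # must match detail-type
        'Detail': json.dumps({'order_id': order_id, 'total': total}),
    }])
    # put_events is not transactional; check for partial failures
    if response.get('FailedEntryCount'):
        raise RuntimeError(f"Failed entries: {response['Entries']}")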
Cold Start Optimization
Provisioned Concurrency
resource "aws_lambda_alias" "live" {
name = "live"
function_name = aws_lambda_function.api.function_name
function_version = aws_lambda_function.api.version
}
resource "aws_lambda_provisioned_concurrency_config" "api" {
function_name = aws_lambda_function.api.function_name
qualifier = aws_lambda_alias.live.name
provisioned_concurrent_executions = 5
}
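To verify that provisioned concurrency is actually absorbing cold starts, a common sketch is a module-level flag that marks the first invocation of each execution environment:

# Cold start probe: the flag flips exactly once per execution environment
import time

_cold_start = True
_loaded_at = time.time()

def handler(event, context):
    global _cold_start
    if _cold_start:
        # With provisioned concurrency, init ran ahead of time, so this
        # logs a large load-to-first-invoke gap instead of a slow request
        print(f"First invoke; module loaded {time.time() - _loaded_at:.2f}s ago")
        _cold_start = False
    # ... normal processing ...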
Minimize Package Size
# Use a multi-stage build so build tools never ship in the final image
FROM public.ecr.aws/lambda/python:3.12 AS builder
COPY requirements.txt .
RUN pip install --target /asset -r requirements.txt

FROM public.ecr.aws/lambda/python:3.12
COPY --from=builder /asset ${LAMBDA_TASK_ROOT}
COPY handler.py ${LAMBDA_TASK_ROOT}
CMD ["handler.handler"]
Lazy Loading
# Don't load at module level if not always needed
_heavy_model = None

def get_model():
    global _heavy_model
    if _heavy_model is None:
        _heavy_model = load_ml_model()  # Only load when first needed
    return _heavy_model

def handler(event, context):
    if event.get('warmup'):
        return {'statusCode': 200}  # Don't load model for warmup
    model = get_model()
    # Use model...
Error Handling Patterns
Dead Letter Queues
resource "aws_lambda_function" "worker" {
# ...
dead_letter_config {
target_arn = aws_sqs_queue.dlq.arn
}
}
resource "aws_sqs_queue" "dlq" {
name = "worker-dlq"
message_retention_seconds = 1209600 # 14 days
}
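Messages in the DLQ are ordinary SQS messages, so inspecting them is a plain receive loop. A sketch for triage (DLQ_URL is a hypothetical env var pointing at worker-dlq):

# inspect_dlq.py - peek at failed events without consuming them
import os

import boto3

sqs = boto3.client('sqs')

def peek_dlq():
    response = sqs.receive_message(
        QueueUrl=os.environ['DLQ_URL'],
        MaxNumberOfMessages=10,
        WaitTimeSeconds=5,
    )
    for message in response.get('Messages', []):
        print(message['MessageId'], message['Body'])
        # Deliberately not deleting: messages stay visible for redrive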
Retry with Exponential Backoff
import random
import time

def retry_with_backoff(func, max_retries=3, base_delay=1):
    for attempt in range(max_retries):
        try:
            return func()
        except Exception:
            if attempt == max_retries - 1:
                raise
            # Exponential backoff plus jitter to avoid thundering herds
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f"Attempt {attempt + 1} failed, retrying in {delay:.2f}s")
            time.sleep(delay)
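Usage is a zero-argument callable; for example, wrapping a flaky DynamoDB write (names hypothetical):

# Retry a flaky write with backoff and jitter
retry_with_backoff(lambda: table.put_item(Item={'id': 'order-123'}))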
Idempotency
import hashlib
import json
import time

import boto3

dynamodb = boto3.resource('dynamodb')
idempotency_table = dynamodb.Table('idempotency')

def handler(event, context):
    # Create idempotency key from event
    event_hash = hashlib.sha256(
        json.dumps(event, sort_keys=True).encode()
    ).hexdigest()

    # Check if already processed. Note: this get-then-put is not atomic;
    # two concurrent invocations can both pass the check (see the
    # conditional-write sketch below).
    try:
        response = idempotency_table.get_item(Key={'id': event_hash})
        if 'Item' in response:
            print(f"Already processed: {event_hash}")
            return response['Item']['result']
    except Exception:
        pass  # on lookup failure, fall through and reprocess

    # Process event
    result = process_event(event)

    # Store result
    idempotency_table.put_item(
        Item={
            'id': event_hash,
            'result': result,
            'ttl': int(time.time()) + 86400  # 24 hour TTL
        }
    )
    return result
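The get-then-put above leaves a window where two concurrent invocations both miss the lookup. A sketch of closing it with a DynamoDB conditional write, which makes the "first writer wins" check atomic:

from botocore.exceptions import ClientError

def claim_event(event_hash: str) -> bool:
    """Atomically claim an event; returns False if already claimed."""
    try:
        idempotency_table.put_item(
            Item={'id': event_hash, 'ttl': int(time.time()) + 86400},
            # The write only succeeds if no item with this id exists yet
            ConditionExpression='attribute_not_exists(id)',
        )
        return True
    except ClientError as e:
        if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
            return False  # another invocation got there first
        raise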
Observability
Structured Logging
import json
import logging

class JsonFormatter(logging.Formatter):
    def format(self, record):
        log_record = {
            'timestamp': self.formatTime(record),
            'level': record.levelname,
            'message': record.getMessage(),
            'function': record.funcName,
        }
        if hasattr(record, 'request_id'):
            log_record['request_id'] = record.request_id
        if record.exc_info:
            log_record['exception'] = self.formatException(record.exc_info)
        return json.dumps(log_record)

logger = logging.getLogger()
# Named log_handler to avoid shadowing the Lambda entry point "handler" below
log_handler = logging.StreamHandler()
log_handler.setFormatter(JsonFormatter())
logger.addHandler(log_handler)
logger.setLevel(logging.INFO)

def handler(event, context):
    # Attach the request ID to every log line in this invocation.
    # Note: the stock LoggerAdapter replaces any per-call `extra`, so put
    # per-call fields in the message (or subclass it to merge dicts).
    log = logging.LoggerAdapter(
        logging.getLogger(),
        {'request_id': context.aws_request_id}
    )
    log.info(f"Processing event type={event.get('type')}")
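With the formatter above, every log call emits one queryable JSON line along these lines (values illustrative):

{"timestamp": "2025-01-15 12:00:03,214", "level": "INFO", "message": "Processing event type=order", "function": "handler", "request_id": "7f6c0d6e-..."}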
Custom Metrics
import os
import time

import boto3

cloudwatch = boto3.client('cloudwatch')

def publish_metric(name, value, unit='Count'):
    # Note: a synchronous API call that adds latency to every invocation
    cloudwatch.put_metric_data(
        Namespace='MyApp',
        MetricData=[{
            'MetricName': name,
            'Value': value,
            'Unit': unit,
            'Dimensions': [
                {'Name': 'FunctionName', 'Value': os.environ['AWS_LAMBDA_FUNCTION_NAME']},
                {'Name': 'Environment', 'Value': os.environ.get('ENVIRONMENT', 'unknown')}
            ]
        }]
    )

def handler(event, context):
    start = time.time()
    try:
        result = process_event(event)
        publish_metric('ProcessedEvents', 1)
        return result
    except Exception:
        publish_metric('FailedEvents', 1)
        raise
    finally:
        duration = (time.time() - start) * 1000
        publish_metric('ProcessingTime', duration, 'Milliseconds')
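Because put_metric_data runs on the hot path, an alternative worth knowing is the CloudWatch Embedded Metric Format: print a structured JSON line and CloudWatch extracts the metrics asynchronously from the log stream. A sketch:

# emf.py - metrics via the Embedded Metric Format, no API call needed
import json
import os
import time

def publish_metric_emf(name, value, unit='Count'):
    # CloudWatch parses this JSON log line and records the metric
    # asynchronously, adding no latency to the handler
    print(json.dumps({
        '_aws': {
            'Timestamp': int(time.time() * 1000),
            'CloudWatchMetrics': [{
                'Namespace': 'MyApp',
                'Dimensions': [['FunctionName']],
                'Metrics': [{'Name': name, 'Unit': unit}],
            }],
        },
        'FunctionName': os.environ.get('AWS_LAMBDA_FUNCTION_NAME', 'local'),
        name: value,
    }))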
Testing Lambda Functions
Unit Tests
# test_handler.py
from unittest.mock import patch, MagicMock

import pytest

from handler import handler

@pytest.fixture
def mock_context():
    context = MagicMock()
    context.aws_request_id = 'test-request-id'
    context.get_remaining_time_in_millis.return_value = 30000
    return context

def test_handler_success(mock_context):
    event = {'body': '{"name": "test"}'}
    with patch('handler.table') as mock_table:
        mock_table.put_item.return_value = {}
        response = handler(event, mock_context)
    assert response['statusCode'] == 200
    mock_table.put_item.assert_called_once()

def test_handler_validation_error(mock_context):
    event = {'body': 'invalid json'}
    response = handler(event, mock_context)
    assert response['statusCode'] == 400
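For tests that exercise real boto3 calls without touching AWS, moto (a third-party library, assumed here at version 5+) fakes the services in-process:

# test_handler_moto.py - integration-style test against a fake DynamoDB
import boto3
from moto import mock_aws

@mock_aws
def test_put_and_get_item():
    dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
    dynamodb.create_table(
        TableName='my-table',
        KeySchema=[{'AttributeName': 'id', 'KeyType': 'HASH'}],
        AttributeDefinitions=[{'AttributeName': 'id', 'AttributeType': 'S'}],
        BillingMode='PAY_PER_REQUEST',
    )
    table = dynamodb.Table('my-table')
    table.put_item(Item={'id': 'abc', 'name': 'test'})
    item = table.get_item(Key={'id': 'abc'})['Item']
    assert item['name'] == 'test'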
Local Testing with SAM
# template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31

Globals:
  Function:
    Timeout: 30
    Runtime: python3.12

Resources:
  ApiFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: handler.handler
      CodeUri: ./src
      Events:
        Api:
          Type: Api
          Properties:
            Path: /api/{proxy+}
            Method: ANY

# Start local API
sam local start-api

# Invoke the function once with a sample event
sam local invoke ApiFunction -e event.json
Key Takeaways
- Initialize outside handler — DB connections, SDK clients
- Use provisioned concurrency for latency-sensitive workloads
- Implement idempotency — events can be delivered multiple times
- Set appropriate timeouts — default 3s is too short for most workloads
- Use DLQs — never lose events silently
- Structured logging — JSON logs are queryable
- Right-size memory — more memory = more CPU = sometimes cheaper
“Serverless doesn’t mean worry-less. You’re not managing servers, but you’re still managing distributed systems.”