Serverless Data Processing: Lambda + S3 + DynamoDB
Build scalable data pipelines with AWS serverless services. Learn event-driven processing patterns, data transformation, and best practices for Lambda, S3, and DynamoDB integration.
Serverless data processing scales automatically, incurs no compute cost when idle, and eliminates infrastructure management. S3 as the data lake, Lambda for transformations, and DynamoDB for fast access — this trio handles everything from log processing to real-time analytics. Here’s how to build it.
Architecture Overview
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Data │────►│ S3 │────►│ Lambda │
│ Sources │ │ Raw Data │ │ Transform │
└─────────────┘ └─────────────┘ └──────┬──────┘
│
┌──────────────────────────┼──────────────────────────┐
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ S3 │ │ DynamoDB │ │ Athena/ │
│ Processed │ │ Hot Data │ │ Redshift │
└─────────────┘ └─────────────┘ └─────────────┘
S3 Event Processing
Terraform Setup
# Raw data bucket — landing zone for incoming objects; the S3 event source.
resource "aws_s3_bucket" "raw" {
  bucket = "my-app-raw-data-${var.environment}"
  tags   = var.tags
}

# Versioning protects raw inputs from accidental overwrite or delete.
resource "aws_s3_bucket_versioning" "raw" {
  bucket = aws_s3_bucket.raw.id
  versioning_configuration {
    status = "Enabled"
  }
}

# Processed data bucket — the Lambda writes transformed output here.
resource "aws_s3_bucket" "processed" {
  bucket = "my-app-processed-${var.environment}"
  tags   = var.tags
}
# Lambda function that transforms raw S3 objects and fans out to
# DynamoDB (hot) and the processed bucket (cold).
resource "aws_lambda_function" "processor" {
  function_name = "data-processor"
  runtime       = "python3.12"
  # FIX: the processor module defines `def handler(event, context)` (see the
  # Lambda Processor section), so the entry point is "handler.handler".
  # The previous "handler.process" would fail at invoke time with a
  # missing-handler error.
  handler     = "handler.handler"
  timeout     = 300  # seconds; generous because files can be large
  memory_size = 1024 # MB — allotted CPU scales with memory

  filename         = data.archive_file.lambda.output_path
  source_code_hash = data.archive_file.lambda.output_base64sha256 # redeploy on code change
  role             = aws_iam_role.lambda.arn

  environment {
    variables = {
      PROCESSED_BUCKET = aws_s3_bucket.processed.id
      TABLE_NAME       = aws_dynamodb_table.data.name
      LOG_LEVEL        = "INFO"
    }
  }

  reserved_concurrent_executions = 50 # Prevent DynamoDB throttling
  tags = var.tags
}
# S3 trigger: invoke the processor for JSON objects landing under incoming/.
resource "aws_s3_bucket_notification" "raw" {
  bucket = aws_s3_bucket.raw.id
  lambda_function {
    lambda_function_arn = aws_lambda_function.processor.arn
    events              = ["s3:ObjectCreated:*"] # all create variants (Put, Post, Copy, multipart)
    filter_prefix       = "incoming/"
    filter_suffix       = ".json"
  }
  # S3 validates invoke permission when the notification is created,
  # so the permission resource must exist first.
  depends_on = [aws_lambda_permission.s3]
}

# Allow S3 — scoped to the raw bucket only — to invoke the processor.
resource "aws_lambda_permission" "s3" {
  statement_id  = "AllowS3"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.processor.function_name
  principal     = "s3.amazonaws.com"
  source_arn    = aws_s3_bucket.raw.arn
}
Lambda Processor
import json
import logging
import os
from datetime import datetime, timedelta, timezone
from decimal import Decimal
from typing import Dict, List
from urllib.parse import unquote_plus

import boto3
# Module-level setup: runs once per Lambda execution environment and is
# reused across warm invocations.
logger = logging.getLogger()
logger.setLevel(os.environ.get('LOG_LEVEL', 'INFO'))  # defaults to INFO

s3 = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')
# Both env vars are injected by the Terraform `environment` block.
table = dynamodb.Table(os.environ['TABLE_NAME'])
PROCESSED_BUCKET = os.environ['PROCESSED_BUCKET']
def handler(event, context):
    """Lambda entry point: process every record in an S3 event batch.

    Failures are counted rather than aborting the batch; if any record
    failed, an exception is raised at the end so Lambda's retry machinery
    re-runs the event (already-processed records are re-processed on retry).
    """
    ok_count, error_count = 0, 0
    for rec in event['Records']:
        try:
            process_s3_record(rec)
        except Exception as exc:
            logger.error(f"Failed to process record: {exc}", exc_info=True)
            error_count += 1
        else:
            ok_count += 1
    logger.info(f"Processed: {ok_count}, Failed: {error_count}")
    if error_count:
        raise Exception(f"Failed to process {error_count} records")
    return {'processed': ok_count}
def process_s3_record(record: dict):
    """Download one S3 object, transform it, and store hot + cold copies.

    Args:
        record: a single entry from the S3 event's ``Records`` list.

    Raises:
        Propagates boto3/JSON errors to the caller, which tallies failures.
    """
    bucket = record['s3']['bucket']['name']
    # FIX: S3 URL-encodes object keys in event notifications (spaces arrive
    # as '+', non-ASCII as %XX). Without decoding, get_object 404s on any
    # key containing those characters.
    key = unquote_plus(record['s3']['object']['key'])
    logger.info(f"Processing s3://{bucket}/{key}")
    # Download file
    response = s3.get_object(Bucket=bucket, Key=key)
    raw_data = json.loads(response['Body'].read().decode('utf-8'))
    # Transform data
    transformed = transform_data(raw_data)
    # Store in DynamoDB (hot data)
    store_in_dynamodb(transformed['records'])
    # Store in S3 (cold data)
    store_in_s3(transformed, key)
    logger.info(f"Processed {len(transformed['records'])} records from {key}")
def transform_data(raw_data: dict) -> dict:
    """Transform raw events into DynamoDB-ready records plus a batch summary.

    Args:
        raw_data: parsed input JSON; events are read from ``raw_data['events']``
            (missing key -> empty batch).

    Returns:
        ``{'records': [...], 'summary': {...}}`` where each record carries a
        ``pk``/``sk`` composite key and Decimal-converted metrics.
    """
    # FIX: datetime.utcnow() is deprecated on the python3.12 runtime; use an
    # aware UTC timestamp. Computed once so every record in the batch shares
    # the same processed_at instead of drifting per item.
    now = datetime.now(timezone.utc).isoformat()
    records = []
    for item in raw_data.get('events', []):
        records.append({
            'pk': f"EVENT#{item['event_id']}",
            'sk': item['timestamp'],
            'event_type': item['type'],
            'user_id': item.get('user_id'),
            'data': item.get('data', {}),
            'processed_at': now,
            # DynamoDB rejects Python floats; str() first avoids binary-float
            # artifacts like Decimal(0.1) -> 0.1000000000000000055...
            'metrics': {k: Decimal(str(v)) for k, v in item.get('metrics', {}).items()},
        })
    return {
        'records': records,
        'summary': {
            'total_events': len(records),
            # sorted for a deterministic list (set order is arbitrary)
            'event_types': sorted({r['event_type'] for r in records}),
            'processed_at': now,
        }
    }
def store_in_dynamodb(records: List[dict]):
    """Put every record into the events table.

    Uses boto3's batch_writer, which chunks puts into BatchWriteItem calls
    and retries unprocessed items for us.
    """
    with table.batch_writer() as writer:
        for item in records:
            writer.put_item(Item=item)
def store_in_s3(transformed: dict, original_key: str):
    """Write the processed payload to the processed bucket, date-partitioned.

    Keys use Hive-style partitioning (year=/month=/day=) so Athena and Glue
    can prune partitions when querying.

    Args:
        transformed: output of transform_data (may contain Decimal values).
        original_key: source object key; its basename is reused for output.
    """
    # Aware UTC timestamp (utcnow() is deprecated on python3.12).
    now = datetime.now(timezone.utc)
    output_key = (
        f"processed/year={now.year}/month={now.month:02d}/day={now.day:02d}/"
        f"{os.path.basename(original_key)}"
    )
    # FIX: the old dumps -> loads -> dumps round-trip serialized the payload
    # three times just to stringify Decimals. A single dumps with default=str
    # produces the identical body in one pass.
    s3.put_object(
        Bucket=PROCESSED_BUCKET,
        Key=output_key,
        Body=json.dumps(transformed, default=str),
        ContentType='application/json'
    )
# For large files, use streaming
def process_large_file(bucket: str, key: str):
    """Stream a (possibly gzipped) NDJSON object and write records in batches.

    Avoids loading the whole object into memory: lines are consumed from the
    stream and flushed to DynamoDB every ``batch_size`` records.
    """
    import gzip
    response = s3.get_object(Bucket=bucket, Key=key)
    # FIX: gzip.GzipFile has no iter_lines() — it is a file-like object that
    # yields lines when iterated directly, while botocore's StreamingBody
    # needs iter_lines(). The old code raised AttributeError on every .gz key.
    if key.endswith('.gz'):
        line_iter = gzip.GzipFile(fileobj=response['Body'])
    else:
        line_iter = response['Body'].iter_lines()
    batch = []
    batch_size = 25  # matches DynamoDB's BatchWriteItem item limit
    for line in line_iter:
        # FIX: skip blank lines instead of crashing json.loads on them.
        if not line.strip():
            continue
        # NOTE(review): transform_record is not defined in this module —
        # presumably a per-line sibling of transform_data; confirm it exists.
        batch.append(transform_record(json.loads(line)))
        if len(batch) >= batch_size:
            store_in_dynamodb(batch)
            batch = []
    # Process remaining records
    if batch:
        store_in_dynamodb(batch)
DynamoDB Design
Table Schema
# Single-table event store: pk/sk composite key, two GSIs, TTL, and streams.
resource "aws_dynamodb_table" "data" {
  name         = "event-data"
  billing_mode = "PAY_PER_REQUEST" # on-demand: no capacity planning
  hash_key     = "pk"              # partition key, e.g. "EVENT#<event_id>"
  range_key    = "sk"              # sort key: ISO-8601 timestamp, sorts chronologically

  # Only key/index attributes are declared; other attributes are schemaless.
  attribute {
    name = "pk"
    type = "S"
  }
  attribute {
    name = "sk"
    type = "S"
  }
  attribute {
    name = "user_id"
    type = "S"
  }
  attribute {
    name = "event_type"
    type = "S"
  }

  # GSI for querying by user
  global_secondary_index {
    name            = "user-index"
    hash_key        = "user_id"
    range_key       = "sk"
    projection_type = "ALL" # full items readable straight from the index
  }

  # GSI for querying by event type
  # NOTE(review): a low-cardinality hash key like event_type can create hot
  # partitions for popular types — confirm expected write volume per type.
  global_secondary_index {
    name            = "type-index"
    hash_key        = "event_type"
    range_key       = "sk"
    projection_type = "KEYS_ONLY" # cheap index; readers hydrate via BatchGetItem
  }

  # TTL for automatic cleanup — items whose epoch-seconds 'ttl' attribute
  # has passed are deleted by DynamoDB.
  ttl {
    attribute_name = "ttl"
    enabled        = true
  }

  # Enable streams for downstream processing (consumed by the stream Lambda);
  # NEW_AND_OLD_IMAGES lets MODIFY handlers diff before/after.
  stream_enabled   = true
  stream_view_type = "NEW_AND_OLD_IMAGES"

  point_in_time_recovery {
    enabled = true # continuous backups
  }

  tags = var.tags
}
Query Patterns
from boto3.dynamodb.conditions import Key, Attr
def get_user_events(user_id: str, start_date: str, end_date: str) -> List[dict]:
    """Return up to 100 of a user's events within [start_date, end_date], newest first."""
    condition = Key('user_id').eq(user_id) & Key('sk').between(start_date, end_date)
    result = table.query(
        IndexName='user-index',
        KeyConditionExpression=condition,
        ScanIndexForward=False,  # descending sort-key order -> newest first
        Limit=100,
    )
    return result['Items']
def get_events_by_type(event_type: str, limit: int = 50) -> List[dict]:
    """Get recent events of a specific type, newest first.

    The type-index GSI projects keys only, so after the index query the full
    items are hydrated with BatchGetItem.

    Args:
        event_type: value of the 'event_type' attribute to match.
        limit: maximum number of events to return (default 50).
    """
    response = table.query(
        IndexName='type-index',
        KeyConditionExpression=Key('event_type').eq(event_type),
        ScanIndexForward=False,  # newest first
        Limit=limit,
    )
    # Fetch full items (GSI is KEYS_ONLY)
    keys = [{'pk': item['pk'], 'sk': item['sk']} for item in response['Items']]
    if not keys:
        return []
    # FIX: BatchGetItem can return a partial result (UnprocessedKeys) under
    # throttling; the old code silently dropped those items. Loop until the
    # request is fully served.
    items = []
    request = {table.name: {'Keys': keys}}
    while request:
        batch_response = dynamodb.batch_get_item(RequestItems=request)
        items.extend(batch_response['Responses'].get(table.name, []))
        request = batch_response.get('UnprocessedKeys') or None
    # FIX: BatchGetItem does not preserve order — restore the query's
    # newest-first ordering before returning.
    by_key = {(it['pk'], it['sk']): it for it in items}
    return [by_key[k2] for k2 in ((k['pk'], k['sk']) for k in keys) if k2 in by_key]
def update_event_status(event_id: str, timestamp: str, status: str):
    """Set an event's status and updated_at, failing if the item doesn't exist.

    Note: despite the original description, this is an existence guard, not
    optimistic locking — no version attribute is compared. Raises a
    ConditionalCheckFailedException (ClientError) when the item is absent.
    """
    table.update_item(
        Key={'pk': f"EVENT#{event_id}", 'sk': timestamp},
        UpdateExpression='SET #status = :status, updated_at = :ts',
        # Prevents upsert: without this, update_item would create a new item.
        ConditionExpression='attribute_exists(pk)',
        # 'status' is a DynamoDB reserved word, hence the name placeholder.
        ExpressionAttributeNames={'#status': 'status'},
        ExpressionAttributeValues={
            ':status': status,
            ':ts': datetime.utcnow().isoformat()
        }
    )
DynamoDB Streams to Lambda
Processing Changes
# Wire the table's stream to the stream-processor Lambda.
resource "aws_lambda_event_source_mapping" "dynamodb_stream" {
  event_source_arn  = aws_dynamodb_table.data.stream_arn
  function_name     = aws_lambda_function.stream_processor.arn
  starting_position = "LATEST" # only new changes; no historical replay
  batch_size        = 100
  maximum_batching_window_in_seconds = 5 # trade a little latency for fewer invokes
  maximum_retry_attempts             = 3

  # Server-side event filtering: the Lambda is only invoked (and billed) for
  # inserts/updates of the listed event types.
  filter_criteria {
    filter {
      pattern = jsonencode({
        eventName = ["INSERT", "MODIFY"]
        dynamodb = {
          NewImage = {
            event_type = {
              S = ["order_placed", "payment_completed"]
            }
          }
        }
      })
    }
  }

  # Once retries are exhausted, failure metadata goes to the DLQ for
  # inspection/replay instead of being dropped.
  destination_config {
    on_failure {
      destination_arn = aws_sqs_queue.dlq.arn
    }
  }
}
Stream Handler
def stream_handler(event, context):
    """Dispatch each DynamoDB stream record to the matching change handler."""
    for rec in event['Records']:
        change = rec['dynamodb']
        name = rec['eventName']
        if name == 'INSERT':
            handle_new_event(change['NewImage'])
        elif name == 'MODIFY':
            # Both images are available (stream_view_type NEW_AND_OLD_IMAGES).
            handle_updated_event(change['OldImage'], change['NewImage'])
        elif name == 'REMOVE':
            handle_deleted_event(change['OldImage'])
def handle_new_event(new_image: dict):
    """React to a newly inserted item: forward to analytics, maybe notify."""
    event = deserialize_dynamodb_item(new_image)
    logger.info(f"New event: {event['pk']}")
    # Analytics fan-out happens for every insert.
    send_to_kinesis(event)
    # Orders additionally trigger a notification.
    if event.get('event_type') == 'order_placed':
        send_notification(event)
def deserialize_dynamodb_item(item: dict) -> dict:
    """Convert a DynamoDB-typed attribute map ({'S': ...} etc.) to plain Python values."""
    from boto3.dynamodb.types import TypeDeserializer
    convert = TypeDeserializer().deserialize
    return {attr: convert(value) for attr, value in item.items()}
Batch Processing with S3 Select
def query_s3_with_select(bucket: str, key: str, query: str) -> List[dict]:
    """Run an S3 Select SQL expression against a gzipped NDJSON object.

    Returns matching rows as dicts without downloading the whole file.
    NOTE(review): AWS has stopped onboarding new S3 Select users — confirm
    the account still has access before relying on this.
    """
    response = s3.select_object_content(
        Bucket=bucket,
        Key=key,
        ExpressionType='SQL',
        Expression=query,
        InputSerialization={
            'JSON': {'Type': 'LINES'},
            'CompressionType': 'GZIP'
        },
        OutputSerialization={'JSON': {}}
    )
    # The payload is an event stream; only 'Records' events carry result bytes.
    rows: List[dict] = []
    for evt in response['Payload']:
        if 'Records' not in evt:
            continue
        payload_text = evt['Records']['Payload'].decode('utf-8')
        rows.extend(
            json.loads(chunk)
            for chunk in payload_text.strip().split('\n')
            if chunk
        )
    return rows
# Example: Get all error events from a log file
def get_errors_from_logs(bucket: str, key: str) -> List[dict]:
    """Return every ERROR-level entry from an NDJSON log object via S3 Select."""
    query = """
    SELECT *
    FROM s3object s
    WHERE s.level = 'ERROR'
    """
    return query_s3_with_select(bucket, key, query)
Aggregation Pipeline
# Aggregation Lambda triggered on schedule
resource "aws_cloudwatch_event_rule" "hourly_aggregation" {
  name                = "hourly-aggregation"
  schedule_expression = "rate(1 hour)"
}

resource "aws_cloudwatch_event_target" "aggregation" {
  rule = aws_cloudwatch_event_rule.hourly_aggregation.name
  arn  = aws_lambda_function.aggregator.arn
  # NOTE(review): no aws_lambda_permission granting events.amazonaws.com
  # invoke rights is shown anywhere — without it the schedule fires but the
  # invocation is rejected. Confirm it exists elsewhere in the module.
}
def aggregation_handler(event, context):
    """Scheduled entry point: roll up the previous full hour of events."""
    # Truncate to the top of the current hour; the window is the hour before it.
    hour_end = datetime.utcnow().replace(minute=0, second=0, microsecond=0)
    hour_start = hour_end - timedelta(hours=1)
    events = query_events_in_range(hour_start, hour_end)
    summary = calculate_aggregates(events)
    store_aggregates(summary, hour_start)
    logger.info(f"Aggregated {len(events)} events for {hour_start}")
def calculate_aggregates(events: List[dict]) -> dict:
    """Compute rollup metrics (counts, unique users, duration/value sums) for a batch."""
    from collections import Counter

    type_counts = Counter(e['event_type'] for e in events)
    # Only count users that are actually present (user_id may be None/missing).
    user_ids = {e.get('user_id') for e in events if e.get('user_id')}
    total_duration = sum(e.get('duration', 0) for e in events)
    total_value = sum(e.get('value', 0) for e in events)
    return {
        'total_events': len(events),
        'events_by_type': dict(type_counts),
        'unique_users': len(user_ids),
        'metrics': {
            # Averaged over all events, including ones without a duration.
            'avg_duration': total_duration / len(events) if events else 0,
            'total_value': total_value,
        },
    }
def _to_dynamodb_number(value):
    """Recursively convert floats to Decimal (DynamoDB rejects Python floats)."""
    if isinstance(value, float):
        return Decimal(str(value))
    if isinstance(value, dict):
        return {k: _to_dynamodb_number(v) for k, v in value.items()}
    if isinstance(value, list):
        return [_to_dynamodb_number(v) for v in value]
    return value


def store_aggregates(aggregates: dict, timestamp: datetime):
    """Persist hourly aggregates: DynamoDB for hot access, S3 for long term.

    Args:
        aggregates: output of calculate_aggregates (may contain nested floats).
        timestamp: start of the aggregated hour; used as sort key and S3 path.
    """
    # FIX: the old conversion only handled top-level floats, so nested values
    # (e.g. aggregates['metrics']['avg_duration']) stayed float and put_item
    # raised "Float types are not supported". Convert recursively.
    table.put_item(Item={
        'pk': 'AGGREGATE',
        'sk': timestamp.isoformat(),
        **{k: _to_dynamodb_number(v) for k, v in aggregates.items()},
        # Auto-expire hot aggregates after 90 days via the table's TTL.
        'ttl': int((timestamp + timedelta(days=90)).timestamp())
    })
    # S3 for long-term storage
    s3.put_object(
        Bucket=os.environ['AGGREGATES_BUCKET'],
        Key=f"aggregates/{timestamp.year}/{timestamp.month:02d}/{timestamp.day:02d}/{timestamp.hour:02d}.json",
        Body=json.dumps(aggregates, default=str),
        ContentType='application/json'
    )
Error Handling and Retry
from botocore.config import Config
from botocore.exceptions import ClientError
import time

# Configure retry behavior
config = Config(
    retries={
        'max_attempts': 3,
        'mode': 'adaptive'  # client-side rate limiting layered on retries
    }
)
# NOTE(review): rebinding `dynamodb` here does NOT retrofit the `table`
# handle created earlier from the unconfigured resource — recreate `table`
# from this resource if the retry config should apply to it. TODO confirm.
dynamodb = boto3.resource('dynamodb', config=config)
def write_with_exponential_backoff(items: List[dict], max_retries: int = 5):
    """Write items to DynamoDB, backing off exponentially on throttling.

    Args:
        items: DynamoDB-ready items (floats already converted to Decimal).
        max_retries: attempts before giving up.

    Raises:
        Exception: if items remain unwritten after max_retries attempts.
        ClientError: re-raised for any non-throttling DynamoDB error.
    """
    import random  # FIX: was referenced below but never imported -> NameError on first throttle

    unprocessed = items
    for attempt in range(max_retries):
        if not unprocessed:
            break
        try:
            with table.batch_writer() as batch:
                for item in unprocessed:
                    batch.put_item(Item=item)
            unprocessed = []
        except ClientError as e:
            if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
                # Jittered exponential backoff (2^attempt + up to 1s) so
                # concurrent writers don't retry in lockstep. Puts are
                # idempotent, so re-writing already-persisted items is safe.
                wait_time = (2 ** attempt) + random.uniform(0, 1)
                logger.warning(f"Throttled, waiting {wait_time:.2f}s (attempt {attempt + 1})")
                time.sleep(wait_time)
            else:
                raise
    if unprocessed:
        raise Exception(f"Failed to write {len(unprocessed)} items after {max_retries} attempts")
Monitoring
# Page when the processor errors more than 5 times per 5-minute period
# for two consecutive periods (filters out one-off blips).
resource "aws_cloudwatch_metric_alarm" "processing_errors" {
  alarm_name          = "data-processor-errors"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 2
  metric_name         = "Errors"
  namespace           = "AWS/Lambda"
  period              = 300 # seconds
  statistic           = "Sum"
  threshold           = 5
  dimensions = {
    FunctionName = aws_lambda_function.processor.function_name
  }
  alarm_actions = [aws_sns_topic.alerts.arn]
}

# Any write throttling at all (threshold 0) triggers — with on-demand
# billing this should be rare and is worth investigating.
resource "aws_cloudwatch_metric_alarm" "dynamodb_throttling" {
  alarm_name          = "dynamodb-write-throttling"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 2
  metric_name         = "WriteThrottledRequests"
  namespace           = "AWS/DynamoDB"
  period              = 300
  statistic           = "Sum"
  threshold           = 0
  dimensions = {
    TableName = aws_dynamodb_table.data.name
  }
  alarm_actions = [aws_sns_topic.alerts.arn]
}
Key Takeaways
- Use S3 lifecycle policies — automatically tier old data to cheaper storage
- Partition DynamoDB data — design keys for your access patterns
- Limit Lambda concurrency — prevent overwhelming DynamoDB
- Use batch operations — more efficient than single-item writes
- S3 Select for filtering — query data without downloading entire files
“Serverless data pipelines are like LEGO blocks — simple pieces that combine into powerful systems.”