Serverless data processing scales automatically, costs almost nothing when idle, and removes most of the infrastructure management burden. S3 as the data lake, Lambda for transformations, and DynamoDB for fast access: this trio handles everything from log processing to near-real-time analytics. Here's how to build it.

Architecture Overview

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Data      │────►│   S3        │────►│   Lambda    │
│   Sources   │     │  Raw Data   │     │  Transform  │
└─────────────┘     └─────────────┘     └──────┬──────┘

                    ┌──────────────────────────┼──────────────────────────┐
                    ▼                          ▼                          ▼
             ┌─────────────┐           ┌─────────────┐           ┌─────────────┐
             │   S3        │           │  DynamoDB   │           │   Athena/   │
             │  Processed  │           │  Hot Data   │           │  Redshift   │
             └─────────────┘           └─────────────┘           └─────────────┘

S3 Event Processing

Terraform Setup

# Raw data bucket
resource "aws_s3_bucket" "raw" {
  bucket = "my-app-raw-data-${var.environment}"

  tags = var.tags
}

resource "aws_s3_bucket_versioning" "raw" {
  bucket = aws_s3_bucket.raw.id
  versioning_configuration {
    status = "Enabled"
  }
}

# Processed data bucket
resource "aws_s3_bucket" "processed" {
  bucket = "my-app-processed-${var.environment}"

  tags = var.tags
}

# Lambda function
resource "aws_lambda_function" "processor" {
  function_name = "data-processor"
  runtime       = "python3.12"
  handler       = "handler.process"
  timeout       = 300
  memory_size   = 1024

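  # Packaging artifact (data.archive_file.lambda) and execution role
  # (aws_iam_role.lambda) are assumed to be defined elsewhere in the module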
  filename         = data.archive_file.lambda.output_path
  source_code_hash = data.archive_file.lambda.output_base64sha256
  role             = aws_iam_role.lambda.arn

  environment {
    variables = {
      PROCESSED_BUCKET = aws_s3_bucket.processed.id
      TABLE_NAME       = aws_dynamodb_table.data.name
      LOG_LEVEL        = "INFO"
    }
  }

  reserved_concurrent_executions = 50  # Prevent DynamoDB throttling

  tags = var.tags
}

# S3 trigger
resource "aws_s3_bucket_notification" "raw" {
  bucket = aws_s3_bucket.raw.id

  lambda_function {
    lambda_function_arn = aws_lambda_function.processor.arn
    events              = ["s3:ObjectCreated:*"]
    filter_prefix       = "incoming/"
    filter_suffix       = ".json"
  }

  depends_on = [aws_lambda_permission.s3]
}

resource "aws_lambda_permission" "s3" {
  statement_id  = "AllowS3"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.processor.function_name
  principal     = "s3.amazonaws.com"
  source_arn    = aws_s3_bucket.raw.arn
}

Lambda Processor

import json
import os
import boto3
import logging
from typing import List, Dict
from datetime import datetime
from decimal import Decimal

logger = logging.getLogger()
logger.setLevel(os.environ.get('LOG_LEVEL', 'INFO'))

s3 = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['TABLE_NAME'])

PROCESSED_BUCKET = os.environ['PROCESSED_BUCKET']


def handler(event, context):
    """Process S3 events."""
    processed = 0
    failed = 0

    for record in event['Records']:
        try:
            process_s3_record(record)
            processed += 1
        except Exception as e:
            logger.error(f"Failed to process record: {e}", exc_info=True)
            failed += 1

    logger.info(f"Processed: {processed}, Failed: {failed}")

    if failed > 0:
        raise Exception(f"Failed to process {failed} records")

    return {'processed': processed}


def process_s3_record(record: dict):
    """Process a single S3 event record."""
    bucket = record['s3']['bucket']['name']
    key = record['s3']['object']['key']

    logger.info(f"Processing s3://{bucket}/{key}")

    # Download file
    response = s3.get_object(Bucket=bucket, Key=key)
    raw_data = json.loads(response['Body'].read().decode('utf-8'))

    # Transform data
    transformed = transform_data(raw_data)

    # Store in DynamoDB (hot data)
    store_in_dynamodb(transformed['records'])

    # Store in S3 (cold data)
    store_in_s3(transformed, key)

    logger.info(f"Processed {len(transformed['records'])} records from {key}")


def transform_data(raw_data: dict) -> dict:
    """Transform raw data into processed format."""
    records = []

    for item in raw_data.get('events', []):
        record = {
            'pk': f"EVENT#{item['event_id']}",
            'sk': item['timestamp'],
            'event_type': item['type'],
            'user_id': item.get('user_id'),
            'data': item.get('data', {}),
            'processed_at': datetime.utcnow().isoformat(),
            # Convert floats to Decimal for DynamoDB
            'metrics': {k: Decimal(str(v)) for k, v in item.get('metrics', {}).items()}
        }
        records.append(record)

    return {
        'records': records,
        'summary': {
            'total_events': len(records),
            'event_types': list(set(r['event_type'] for r in records)),
            'processed_at': datetime.utcnow().isoformat()
        }
    }


def store_in_dynamodb(records: List[dict]):
    """Batch write records to DynamoDB."""
    with table.batch_writer() as batch:
        for record in records:
            batch.put_item(Item=record)


def store_in_s3(transformed: dict, original_key: str):
    """Store processed data in S3."""
    # Generate output key
    date = datetime.utcnow()
    output_key = f"processed/year={date.year}/month={date.month:02d}/day={date.day:02d}/{os.path.basename(original_key)}"

    # Decimal values aren't JSON-serializable; convert them back to floats
    body = json.dumps(transformed, default=float)

    s3.put_object(
        Bucket=PROCESSED_BUCKET,
        Key=output_key,
        Body=body,
        ContentType='application/json'
    )


# For large files, stream the body instead of loading it all into memory
def process_large_file(bucket: str, key: str):
    """Process large newline-delimited JSON files line by line."""
    import gzip

    response = s3.get_object(Bucket=bucket, Key=key)

    # Handle gzipped files (GzipFile is itself iterable line by line)
    if key.endswith('.gz'):
        lines = gzip.GzipFile(fileobj=response['Body'])
    else:
        lines = response['Body'].iter_lines()

    batch = []
    batch_size = 25  # BatchWriteItem accepts at most 25 items per request

    for line in lines:
        if not line.strip():
            continue
        record = json.loads(line)
        # transform_record applies the same per-item mapping used in transform_data
        transformed = transform_record(record)
        batch.append(transformed)

        if len(batch) >= batch_size:
            store_in_dynamodb(batch)
            batch = []

    # Write any remaining records
    if batch:
        store_in_dynamodb(batch)

DynamoDB Design

Table Schema

resource "aws_dynamodb_table" "data" {
  name         = "event-data"
  billing_mode = "PAY_PER_REQUEST"
  hash_key     = "pk"
  range_key    = "sk"

  attribute {
    name = "pk"
    type = "S"
  }

  attribute {
    name = "sk"
    type = "S"
  }

  attribute {
    name = "user_id"
    type = "S"
  }

  attribute {
    name = "event_type"
    type = "S"
  }

  # GSI for querying by user
  global_secondary_index {
    name            = "user-index"
    hash_key        = "user_id"
    range_key       = "sk"
    projection_type = "ALL"
  }

  # GSI for querying by event type
  global_secondary_index {
    name            = "type-index"
    hash_key        = "event_type"
    range_key       = "sk"
    projection_type = "KEYS_ONLY"
  }

  # TTL for automatic cleanup
  ttl {
    attribute_name = "ttl"
    enabled        = true
  }

  # Enable streams for downstream processing
  stream_enabled   = true
  stream_view_type = "NEW_AND_OLD_IMAGES"

  point_in_time_recovery {
    enabled = true
  }

  tags = var.tags
}

Query Patterns

from boto3.dynamodb.conditions import Key, Attr

def get_user_events(user_id: str, start_date: str, end_date: str) -> List[dict]:
    """Query events for a specific user in a date range."""
    response = table.query(
        IndexName='user-index',
        KeyConditionExpression=Key('user_id').eq(user_id) & Key('sk').between(start_date, end_date),
        ScanIndexForward=False,  # Newest first
        Limit=100
    )
    return response['Items']


def get_events_by_type(event_type: str, limit: int = 50) -> List[dict]:
    """Get recent events of a specific type."""
    response = table.query(
        IndexName='type-index',
        KeyConditionExpression=Key('event_type').eq(event_type),
        ScanIndexForward=False,
        Limit=limit
    )

    # Fetch full items (GSI is KEYS_ONLY)
    keys = [{'pk': item['pk'], 'sk': item['sk']} for item in response['Items']]

    if not keys:
        return []

    batch_response = dynamodb.batch_get_item(
        RequestItems={
            table.name: {'Keys': keys}
        }
    )

    return batch_response['Responses'][table.name]


def update_event_status(event_id: str, timestamp: str, status: str):
    """Update event status with optimistic locking."""
    table.update_item(
        Key={'pk': f"EVENT#{event_id}", 'sk': timestamp},
        UpdateExpression='SET #status = :status, updated_at = :ts',
        ConditionExpression='attribute_exists(pk)',
        ExpressionAttributeNames={'#status': 'status'},
        ExpressionAttributeValues={
            ':status': status,
            ':ts': datetime.utcnow().isoformat()
        }
    )

DynamoDB Streams to Lambda

Processing Changes

resource "aws_lambda_event_source_mapping" "dynamodb_stream" {
  event_source_arn  = aws_dynamodb_table.data.stream_arn
  function_name     = aws_lambda_function.stream_processor.arn
  starting_position = "LATEST"
  batch_size        = 100

  maximum_batching_window_in_seconds = 5
  maximum_retry_attempts             = 3

  filter_criteria {
    filter {
      pattern = jsonencode({
        eventName = ["INSERT", "MODIFY"]
        dynamodb = {
          NewImage = {
            event_type = {
              S = ["order_placed", "payment_completed"]
            }
          }
        }
      })
    }
  }

  destination_config {
    on_failure {
      destination_arn = aws_sqs_queue.dlq.arn
    }
  }
}

Stream Handler

def stream_handler(event, context):
    """Process DynamoDB stream events."""
    for record in event['Records']:
        event_name = record['eventName']

        if event_name == 'INSERT':
            handle_new_event(record['dynamodb']['NewImage'])
        elif event_name == 'MODIFY':
            handle_updated_event(
                record['dynamodb']['OldImage'],
                record['dynamodb']['NewImage']
            )
        elif event_name == 'REMOVE':
            handle_deleted_event(record['dynamodb']['OldImage'])


def handle_new_event(new_image: dict):
    """Handle new event created."""
    event = deserialize_dynamodb_item(new_image)
    logger.info(f"New event: {event['pk']}")

    # Send to analytics
    send_to_kinesis(event)

    # Trigger notifications if needed
    if event.get('event_type') == 'order_placed':
        send_notification(event)


def deserialize_dynamodb_item(item: dict) -> dict:
    """Convert DynamoDB format to Python dict."""
    from boto3.dynamodb.types import TypeDeserializer
    deserializer = TypeDeserializer()
    return {k: deserializer.deserialize(v) for k, v in item.items()}

Batch Processing with S3 Select

def query_s3_with_select(bucket: str, key: str, query: str) -> List[dict]:
    """Use S3 Select to query data without downloading entire file."""
    response = s3.select_object_content(
        Bucket=bucket,
        Key=key,
        ExpressionType='SQL',
        Expression=query,
        InputSerialization={
            'JSON': {'Type': 'LINES'},
            'CompressionType': 'GZIP'
        },
        OutputSerialization={'JSON': {}}
    )

    results = []
    for event in response['Payload']:
        if 'Records' in event:
            records = event['Records']['Payload'].decode('utf-8')
            for line in records.strip().split('\n'):
                if line:
                    results.append(json.loads(line))

    return results


# Example: Get all error events from a log file
def get_errors_from_logs(bucket: str, key: str) -> List[dict]:
    query = """
    SELECT *
    FROM s3object s
    WHERE s.level = 'ERROR'
    """
    return query_s3_with_select(bucket, key, query)

Aggregation Pipeline

# Aggregation Lambda triggered on schedule
resource "aws_cloudwatch_event_rule" "hourly_aggregation" {
  name                = "hourly-aggregation"
  schedule_expression = "rate(1 hour)"
}

resource "aws_cloudwatch_event_target" "aggregation" {
  rule = aws_cloudwatch_event_rule.hourly_aggregation.name
  arn  = aws_lambda_function.aggregator.arn
}
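
# EventBridge also needs permission to invoke the function, e.g. an
# aws_lambda_permission with principal "events.amazonaws.com" (omitted here)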

from datetime import datetime, timedelta


def aggregation_handler(event, context):
    """Hourly aggregation of event data."""
    end_time = datetime.utcnow().replace(minute=0, second=0, microsecond=0)
    start_time = end_time - timedelta(hours=1)

    # Query all events in the hour
    events = query_events_in_range(start_time, end_time)

    # Calculate aggregates
    aggregates = calculate_aggregates(events)

    # Store aggregates
    store_aggregates(aggregates, start_time)

    logger.info(f"Aggregated {len(events)} events for {start_time}")


def calculate_aggregates(events: List[dict]) -> dict:
    """Calculate aggregate metrics."""
    from collections import Counter

    return {
        'total_events': len(events),
        'events_by_type': dict(Counter(e['event_type'] for e in events)),
        'unique_users': len(set(e.get('user_id') for e in events if e.get('user_id'))),
        'metrics': {
            'avg_duration': sum(e.get('duration', 0) for e in events) / len(events) if events else 0,
            'total_value': sum(e.get('value', 0) for e in events)
        }
    }


def store_aggregates(aggregates: dict, timestamp: datetime):
    """Store hourly aggregates."""
    # DynamoDB rejects floats, so convert them (including nested ones) to Decimal
    item = json.loads(json.dumps(aggregates), parse_float=Decimal)

    table.put_item(Item={
        'pk': 'AGGREGATE',
        'sk': timestamp.isoformat(),
        **item,
        'ttl': int((timestamp + timedelta(days=90)).timestamp())
    })

    # S3 for long-term storage
    s3.put_object(
        Bucket=os.environ['AGGREGATES_BUCKET'],
        Key=f"aggregates/{timestamp.year}/{timestamp.month:02d}/{timestamp.day:02d}/{timestamp.hour:02d}.json",
        Body=json.dumps(aggregates, default=str),
        ContentType='application/json'
    )

Error Handling and Retry

from botocore.config import Config
from botocore.exceptions import ClientError
import random
import time

# Configure retry behavior
config = Config(
    retries={
        'max_attempts': 3,
        'mode': 'adaptive'
    }
)

dynamodb = boto3.resource('dynamodb', config=config)


def write_with_exponential_backoff(items: List[dict], max_retries: int = 5):
    """Write items with exponential backoff for throttling."""
    unprocessed = items

    for attempt in range(max_retries):
        if not unprocessed:
            break

        try:
            with table.batch_writer() as batch:
                for item in unprocessed:
                    batch.put_item(Item=item)
            unprocessed = []

        except ClientError as e:
            if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
                wait_time = (2 ** attempt) + random.uniform(0, 1)
                logger.warning(f"Throttled, waiting {wait_time:.2f}s (attempt {attempt + 1})")
                time.sleep(wait_time)
            else:
                raise

    if unprocessed:
        raise Exception(f"Failed to write {len(unprocessed)} items after {max_retries} attempts")

Monitoring

resource "aws_cloudwatch_metric_alarm" "processing_errors" {
  alarm_name          = "data-processor-errors"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 2
  metric_name         = "Errors"
  namespace           = "AWS/Lambda"
  period              = 300
  statistic           = "Sum"
  threshold           = 5

  dimensions = {
    FunctionName = aws_lambda_function.processor.function_name
  }

  alarm_actions = [aws_sns_topic.alerts.arn]
}

resource "aws_cloudwatch_metric_alarm" "dynamodb_throttling" {
  alarm_name          = "dynamodb-write-throttling"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 2
  metric_name         = "WriteThrottleEvents"
  namespace           = "AWS/DynamoDB"
  period              = 300
  statistic           = "Sum"
  threshold           = 0

  dimensions = {
    TableName = aws_dynamodb_table.data.name
  }

  alarm_actions = [aws_sns_topic.alerts.arn]
}

Key Takeaways

  1. Use S3 lifecycle policies — automatically tier old data to cheaper storage (see the sketch after this list)
  2. Partition DynamoDB data — design keys for your access patterns
  3. Limit Lambda concurrency — prevent overwhelming DynamoDB
  4. Use batch operations — more efficient than single-item writes
  5. S3 Select for filtering — query data without downloading entire files
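
To make takeaway 1 concrete, here is a minimal sketch of a lifecycle configuration applied with boto3. In the Terraform-managed setup above this would more naturally be an aws_s3_bucket_lifecycle_configuration resource; the bucket name, prefix, and day thresholds below are assumptions to adapt to your retention needs.

import boto3

s3 = boto3.client('s3')

# Hypothetical bucket and prefix; tune the thresholds to your retention policy
s3.put_bucket_lifecycle_configuration(
    Bucket='my-app-processed-prod',
    LifecycleConfiguration={
        'Rules': [{
            'ID': 'tier-then-expire-processed-data',
            'Filter': {'Prefix': 'processed/'},
            'Status': 'Enabled',
            'Transitions': [
                {'Days': 30, 'StorageClass': 'STANDARD_IA'},  # infrequent access after 30 days
                {'Days': 90, 'StorageClass': 'GLACIER'}       # archive after 90 days
            ],
            'Expiration': {'Days': 365}  # delete after a year
        }]
    }
)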

“Serverless data pipelines are like LEGO blocks — simple pieces that combine into powerful systems.”