AWS Lambda enables event-driven architectures that scale automatically and incur no compute charges when idle. But serverless has pitfalls — cold starts, execution limits, and debugging challenges. This guide covers patterns that work at scale.

Lambda Anatomy

# handler.py
import json
import logging

# Root logger: the Lambda runtime ships its output to CloudWatch Logs.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Code outside handler runs once per container (cold start)
# Use for initialization: DB connections, loading configs
import boto3
dynamodb = boto3.resource('dynamodb')
# NOTE(review): table name is hard-coded; presumably this should read the
# TABLE_NAME environment variable set by the Terraform config — confirm.
table = dynamodb.Table('my-table')

def handler(event, context):
    """API entry point.

    event: trigger-specific payload (shape varies by event source).
    context: Lambda runtime info (request ID, remaining time, etc.).

    Returns an API-Gateway-style response dict: 200 on success,
    400 for validation failures, 500 for anything unexpected.
    """
    logger.info(f"Processing request {context.aws_request_id}")

    try:
        # Your business logic
        payload = process_event(event)

        return {
            'statusCode': 200,
            'headers': {'Content-Type': 'application/json'},
            'body': json.dumps(payload)
        }
    except ValueError as e:
        # Bad client input: log the message only, no stack trace needed.
        logger.warning(f"Invalid input: {e}")
        return {'statusCode': 400, 'body': json.dumps({'error': str(e)})}
    except Exception as e:
        # Unknown failure: keep the traceback in the logs, hide details
        # from the caller.
        logger.error(f"Unexpected error: {e}", exc_info=True)
        return {'statusCode': 500, 'body': json.dumps({'error': 'Internal error'})}

Infrastructure as Code with Terraform

# Lambda function
resource "aws_lambda_function" "api" {
  function_name = "my-api"
  runtime       = "python3.12"
  handler       = "handler.handler"  # module.function entry point
  timeout       = 30                 # seconds (service default is only 3)
  memory_size   = 512                # MB — CPU share scales with memory

  filename         = data.archive_file.lambda_zip.output_path
  # Redeploys only when the packaged code actually changes.
  source_code_hash = data.archive_file.lambda_zip.output_base64sha256

  role = aws_iam_role.lambda_role.arn

  # Runtime configuration injected as environment variables.
  environment {
    variables = {
      TABLE_NAME   = aws_dynamodb_table.main.name
      LOG_LEVEL    = "INFO"
      ENVIRONMENT  = var.environment
    }
  }

  # VPC config (if needed)
  # NOTE(review): only attach to a VPC if the function must reach
  # VPC-only resources; it constrains networking and (historically)
  # added cold-start latency.
  vpc_config {
    subnet_ids         = var.private_subnet_ids
    security_group_ids = [aws_security_group.lambda.id]
  }

  # Reserved concurrency (optional)
  # Caps this function at 100 concurrent executions, carved out of the
  # account-wide concurrency pool.
  reserved_concurrent_executions = 100

  # Enable X-Ray tracing
  tracing_config {
    mode = "Active"
  }

  tags = var.tags
}

# IAM role
# Execution role the Lambda service assumes when running the function.
resource "aws_iam_role" "lambda_role" {
  name = "my-api-lambda-role"

  # Trust policy: only the Lambda service principal may assume this role.
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action = "sts:AssumeRole"
      Effect = "Allow"
      Principal = {
        Service = "lambda.amazonaws.com"
      }
    }]
  })
}

# Permissions
resource "aws_iam_role_policy" "lambda_policy" {
  name = "my-api-lambda-policy"
  role = aws_iam_role.lambda_role.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        # CloudWatch Logs: required for the function to emit any logs.
        # NOTE(review): the wildcard resource is broader than needed —
        # could be scoped to this function's own log group ARN.
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = "arn:aws:logs:*:*:*"
      },
      {
        # Least-privilege DynamoDB access: only the operations the
        # handler performs, on the single table it uses.
        # NOTE(review): Query against a GSI also needs the table's
        # index ARNs ("<table_arn>/index/*") — confirm if GSIs exist.
        Effect = "Allow"
        Action = [
          "dynamodb:GetItem",
          "dynamodb:PutItem",
          "dynamodb:Query"
        ]
        Resource = aws_dynamodb_table.main.arn
      },
      {
        # Needed because tracing_config.mode = "Active" on the function.
        Effect = "Allow"
        Action = [
          "xray:PutTraceSegments",
          "xray:PutTelemetryRecords"
        ]
        Resource = "*"
      }
    ]
  })
}

Event-Driven Patterns

API Gateway Integration

# API Gateway HTTP API
resource "aws_apigatewayv2_api" "main" {
  name          = "my-api"
  protocol_type = "HTTP"

  # Browser CORS policy: only the production origin may call the API.
  cors_configuration {
    allow_origins = ["https://myapp.com"]
    allow_methods = ["GET", "POST", "PUT", "DELETE"]
    allow_headers = ["Content-Type", "Authorization"]
    max_age       = 3600  # seconds a browser may cache the preflight
  }
}

# Proxy integration: API Gateway forwards the raw request to Lambda.
resource "aws_apigatewayv2_integration" "lambda" {
  api_id                 = aws_apigatewayv2_api.main.id
  integration_type       = "AWS_PROXY"
  integration_uri        = aws_lambda_function.api.invoke_arn
  # 2.0 is the simplified event format for HTTP APIs.
  payload_format_version = "2.0"
}

# Route every HTTP method under /api/* to the Lambda integration.
resource "aws_apigatewayv2_route" "api" {
  api_id    = aws_apigatewayv2_api.main.id
  route_key = "ANY /api/{proxy+}"
  target    = "integrations/${aws_apigatewayv2_integration.lambda.id}"
}

# Resource policy allowing this API (any stage/route) to invoke the function.
resource "aws_lambda_permission" "api_gateway" {
  statement_id  = "AllowAPIGateway"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.api.function_name
  principal     = "apigateway.amazonaws.com"
  source_arn    = "${aws_apigatewayv2_api.main.execution_arn}/*/*"
}

SQS Queue Processing

# handler.py
import json

def handler(event, context):
    """Consume a batch of SQS records, reporting per-message failures.

    With ReportBatchItemFailures enabled on the event source mapping,
    returning the IDs of failed messages makes SQS redeliver only
    those instead of the entire batch.
    """
    failures = []

    for record in event['Records']:
        try:
            payload = json.loads(record['body'])
            process_message(payload)
        except Exception as e:
            # Any failure (bad JSON or processing error) marks just this
            # message for redelivery; the rest of the batch proceeds.
            print(f"Failed to process message {record['messageId']}: {e}")
            failures.append(record['messageId'])

    # Partial batch failure response (empty list == full success).
    return {
        'batchItemFailures': [{'itemIdentifier': m} for m in failures]
    }
# SQS trigger
resource "aws_lambda_event_source_mapping" "sqs" {
  event_source_arn                   = aws_sqs_queue.main.arn
  function_name                      = aws_lambda_function.worker.arn
  # Up to 10 messages per invocation, waiting at most 5s to fill a batch.
  batch_size                         = 10
  maximum_batching_window_in_seconds = 5

  # Enable partial batch failure
  # Requires the handler to return {"batchItemFailures": [...]}; only
  # the listed messages are retried instead of the whole batch.
  function_response_types = ["ReportBatchItemFailures"]

  # Scaling configuration
  # Caps concurrent invocations driven by this mapping at 50.
  scaling_config {
    maximum_concurrency = 50
  }
}

S3 Event Processing

# handler.py
import boto3
import urllib.parse

# Client created at module scope so warm invocations reuse the connection.
s3 = boto3.client('s3')

def handler(event, context):
    """For each S3 record: fetch the object, transform it, and write the
    result back to the same bucket under a processed/ prefix."""
    for record in event['Records']:
        s3_info = record['s3']
        bucket = s3_info['bucket']['name']
        # Object keys arrive URL-encoded in S3 notifications
        # (e.g. spaces become '+'), so decode before use.
        key = urllib.parse.unquote_plus(s3_info['object']['key'])

        print(f"Processing s3://{bucket}/{key}")

        # Download file
        content = s3.get_object(Bucket=bucket, Key=key)['Body'].read()

        # Process and upload result
        s3.put_object(
            Bucket=bucket,
            Key=f"processed/{key}",
            Body=process_file(content),
        )

EventBridge Patterns

# EventBridge rule
resource "aws_cloudwatch_event_rule" "order_created" {
  name = "order-created"

  # Matches custom events published with this source/detail-type pair.
  event_pattern = jsonencode({
    source      = ["myapp.orders"]
    detail-type = ["Order Created"]
  })
}

resource "aws_cloudwatch_event_target" "process_order" {
  rule = aws_cloudwatch_event_rule.order_created.name
  arn  = aws_lambda_function.process_order.arn

  # DLQ for failed events: events EventBridge cannot deliver (after its
  # built-in retries) land in the queue instead of being dropped.
  # The DLQ is attached via dead_letter_config on the Lambda target —
  # declaring the queue as its own separate target (as the original did)
  # would copy EVERY matched event into the queue, not just failures.
  dead_letter_config {
    arn = aws_sqs_queue.dlq.arn
  }
}

# EventBridge needs an explicit resource policy to invoke the function.
resource "aws_lambda_permission" "eventbridge" {
  statement_id  = "AllowEventBridge"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.process_order.function_name
  principal     = "events.amazonaws.com"
  source_arn    = aws_cloudwatch_event_rule.order_created.arn
}

Cold Start Optimization

Provisioned Concurrency

resource "aws_lambda_alias" "live" {
  name             = "live"
  function_name    = aws_lambda_function.api.function_name
  # NOTE(review): this resolves to "$LATEST" unless the function sets
  # publish = true; provisioned concurrency cannot target $LATEST, so
  # the config below would fail to apply in that case — confirm.
  function_version = aws_lambda_function.api.version
}

resource "aws_lambda_provisioned_concurrency_config" "api" {
  function_name                     = aws_lambda_function.api.function_name
  qualifier                         = aws_lambda_alias.live.name
  # Keeps 5 execution environments warm to avoid cold starts.
  provisioned_concurrent_executions = 5
}

Minimize Package Size

# Use multi-stage build for dependencies
# Stage 1: install Python dependencies into /asset only.
FROM public.ecr.aws/lambda/python:3.12 as builder

COPY requirements.txt .
RUN pip install --target /asset -r requirements.txt

# Stage 2: the final image carries only installed deps + handler code,
# keeping it small (faster pulls and cold starts).
FROM public.ecr.aws/lambda/python:3.12

COPY --from=builder /asset ${LAMBDA_TASK_ROOT}
COPY handler.py ${LAMBDA_TASK_ROOT}

# Entry point: module "handler", function "handler".
CMD ["handler.handler"]

Lazy Loading

# Defer expensive initialization until a request actually needs it.
_heavy_model = None

def get_model():
    """Return the ML model, loading it on first use.

    The loaded model is cached in a module global, so each warm
    container pays the load cost at most once.
    """
    global _heavy_model
    if _heavy_model is None:
        _heavy_model = load_ml_model()  # first real request pays the cost
    return _heavy_model

def handler(event, context):
    """Handle an invocation; warmup pings never touch the model."""
    if event.get('warmup'):
        # Keep warmup invocations cheap: skip model loading entirely.
        return {'statusCode': 200}

    model = get_model()
    # Use model...

Error Handling Patterns

Dead Letter Queues

resource "aws_lambda_function" "worker" {
  # ...

  # NOTE(review): a function-level dead_letter_config applies only to
  # asynchronous (event) invocations. If this worker is SQS-triggered,
  # configure a redrive policy on the source queue instead — confirm
  # which trigger this worker uses.
  dead_letter_config {
    target_arn = aws_sqs_queue.dlq.arn
  }
}

resource "aws_sqs_queue" "dlq" {
  name = "worker-dlq"
  # Retain failed messages as long as SQS allows, for investigation.
  message_retention_seconds = 1209600  # 14 days
}

Retry with Exponential Backoff

import time
import random

def retry_with_backoff(func, max_retries=3, base_delay=1):
    """Call `func`, retrying on any exception with exponential backoff.

    The delay before attempt k+1 is base_delay * 2**k plus up to 1s of
    random jitter (jitter spreads out retries from many concurrent
    callers).

    Returns whatever `func` returns; re-raises the last exception once
    max_retries attempts are exhausted.

    Raises ValueError immediately if max_retries < 1 — the original
    silently returned None in that case, hiding the misconfiguration.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be >= 1")
    for attempt in range(max_retries):
        try:
            return func()
        except Exception:
            # Final attempt: propagate the failure to the caller.
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f"Attempt {attempt + 1} failed, retrying in {delay:.2f}s")
            time.sleep(delay)

Idempotency

import hashlib
import json  # FIX: used by json.dumps below but missing in the original
import time  # FIX: used for the TTL below but missing in the original

import boto3

dynamodb = boto3.resource('dynamodb')
idempotency_table = dynamodb.Table('idempotency')

def handler(event, context):
    """Process `event` at most once per unique payload.

    Lambda triggers can deliver the same event more than once; keying
    results by a content hash in DynamoDB lets repeat deliveries return
    the stored result instead of re-running side effects.
    """
    # Deterministic key: hash of the canonical (key-sorted) JSON payload.
    event_hash = hashlib.sha256(
        json.dumps(event, sort_keys=True).encode()
    ).hexdigest()

    # Best-effort dedupe check; if DynamoDB is unreachable we fall
    # through and process anyway (at-least-once beats not-at-all).
    try:
        response = idempotency_table.get_item(Key={'id': event_hash})
        if 'Item' in response:
            print(f"Already processed: {event_hash}")
            return response['Item']['result']
    except Exception:
        pass

    # NOTE(review): get-then-put is racy under concurrent duplicate
    # deliveries; a conditional PutItem (attribute_not_exists) would
    # close that window. Left as-is to preserve behavior.
    result = process_event(event)

    # Record the result so later duplicates short-circuit above.
    idempotency_table.put_item(
        Item={
            'id': event_hash,
            'result': result,
            'ttl': int(time.time()) + 86400  # 24 hour TTL
        }
    )

    return result

Observability

Structured Logging

import json
import logging

class JsonFormatter(logging.Formatter):
    """Render each log record as a single-line JSON object."""

    def format(self, record):
        entry = {
            'timestamp': self.formatTime(record),
            'level': record.levelname,
            'message': record.getMessage(),
            'function': record.funcName,
        }
        # request_id is attached per-invocation (e.g. via LoggerAdapter);
        # include it whenever the attribute is present on the record.
        _missing = object()
        request_id = getattr(record, 'request_id', _missing)
        if request_id is not _missing:
            entry['request_id'] = request_id
        if record.exc_info:
            entry['exception'] = self.formatException(record.exc_info)
        return json.dumps(entry)

# Install the JSON formatter on the root logger so every log line is
# structured (and queryable in CloudWatch Logs Insights).
logger = logging.getLogger()
# FIX: named log_handler rather than "handler" — the Lambda entry point
# defined later in this file is also called `handler`, and the original
# code let that function definition silently rebind this name.
log_handler = logging.StreamHandler()
log_handler.setFormatter(JsonFormatter())
logger.addHandler(log_handler)
logger.setLevel(logging.INFO)

def handler(event, context):
    """Log the incoming event with the invocation's request ID attached.

    FIX: the original wrapped the logger in logging.LoggerAdapter and
    then passed extra={'event_type': ...} at the call site — but the
    stock LoggerAdapter.process() REPLACES the call-site `extra` with
    the adapter's own dict, so event_type was silently dropped. Passing
    one merged `extra` keeps both fields on the record (where the
    JsonFormatter picks up request_id).
    """
    log = logging.getLogger()
    log.info(
        "Processing event",
        extra={
            'request_id': context.aws_request_id,
            'event_type': event.get('type'),
        },
    )

Custom Metrics

import os    # FIX: used for the dimension values but missing in the original
import time  # FIX: used for duration timing but missing in the original

import boto3

cloudwatch = boto3.client('cloudwatch')

def publish_metric(name, value, unit='Count'):
    """Publish one custom metric to the MyApp CloudWatch namespace,
    dimensioned by function name and environment.

    NOTE(review): one PutMetricData call per metric adds latency and
    cost on every invocation; consider CloudWatch Embedded Metric
    Format (logged metrics) for high-volume functions.
    """
    cloudwatch.put_metric_data(
        Namespace='MyApp',
        MetricData=[{
            'MetricName': name,
            'Value': value,
            'Unit': unit,
            'Dimensions': [
                {'Name': 'FunctionName', 'Value': os.environ['AWS_LAMBDA_FUNCTION_NAME']},
                {'Name': 'Environment', 'Value': os.environ.get('ENVIRONMENT', 'unknown')}
            ]
        }]
    )

def handler(event, context):
    """Process an event while emitting success/failure/duration metrics."""
    start = time.time()

    try:
        result = process_event(event)
        publish_metric('ProcessedEvents', 1)
        return result
    except Exception:
        # Count the failure but let the error propagate to Lambda's
        # own retry/DLQ machinery.
        publish_metric('FailedEvents', 1)
        raise
    finally:
        # Runs on both paths, so duration is always recorded.
        duration = (time.time() - start) * 1000
        publish_metric('ProcessingTime', duration, 'Milliseconds')

Testing Lambda Functions

Unit Tests

# test_handler.py
import pytest
from unittest.mock import patch, MagicMock
from handler import handler

@pytest.fixture
def mock_context():
    """Stand-in for the Lambda context object shared by these tests."""
    fake = MagicMock()
    fake.aws_request_id = 'test-request-id'
    fake.get_remaining_time_in_millis.return_value = 30000
    return fake

def test_handler_success(mock_context):
    """A valid JSON body yields 200 and exactly one DynamoDB write."""
    event = {'body': '{"name": "test"}'}

    with patch('handler.table') as fake_table:
        fake_table.put_item.return_value = {}

        result = handler(event, mock_context)

    assert result['statusCode'] == 200
    fake_table.put_item.assert_called_once()

def test_handler_validation_error(mock_context):
    """A body that is not valid JSON maps to a 400 response."""
    response = handler({'body': 'invalid json'}, mock_context)
    assert response['statusCode'] == 400

Local Testing with SAM

# template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31

# Defaults applied to every function in the template.
Globals:
  Function:
    Timeout: 30
    Runtime: python3.12

Resources:
  ApiFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: handler.handler  # module.function entry point
      CodeUri: ./src
      Events:
        # Proxies every HTTP method under /api/* to this function.
        Api:
          Type: Api
          Properties:
            Path: /api/{proxy+}
            Method: ANY
# Start local API (runs the function in a Docker container,
# emulating API Gateway at http://localhost:3000)
sam local start-api

# Invoke once with a sample event payload from a file
sam local invoke -e event.json

Key Takeaways

  1. Initialize outside handler — DB connections, SDK clients
  2. Use provisioned concurrency for latency-sensitive workloads
  3. Implement idempotency — events can be delivered multiple times
  4. Set appropriate timeouts — default 3s is too short for most workloads
  5. Use DLQs — never lose events silently
  6. Structured logging — JSON logs are queryable
  7. Right-size memory — more memory = more CPU = sometimes cheaper

“Serverless doesn’t mean worry-less. You’re not managing servers, but you’re still managing distributed systems.”