Event-driven architecture decouples producers from consumers, enabling systems that scale independently and fail gracefully. Lambda + SQS is the workhorse of serverless event processing on AWS. Here’s how to do it right.

Architecture Overview

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Producer  │────►│    SQS      │────►│   Lambda    │
│  (API, S3,  │     │   Queue     │     │  Consumer   │
│   Events)   │     │             │     │             │
└─────────────┘     └──────┬──────┘     └──────┬──────┘
                           │                    │
                           │ Failed msgs        │ Process
                           ▼                    ▼
                    ┌─────────────┐     ┌─────────────┐
                    │   Dead      │     │  DynamoDB/  │
                    │   Letter    │     │    S3/RDS   │
                    │   Queue     │     │             │
                    └─────────────┘     └─────────────┘

Setting Up SQS + Lambda

Terraform Infrastructure

# Main processing queue
resource "aws_sqs_queue" "main" {
  name                       = "orders-queue"
  # AWS recommends visibility timeout >= 6x the consumer Lambda's timeout so
  # a message is not redelivered while a slow invocation is still running.
  visibility_timeout_seconds = 300  # 6x Lambda timeout
  message_retention_seconds  = 1209600  # 14 days (the SQS maximum)
  receive_wait_time_seconds  = 20  # Long polling: fewer empty receives, lower cost

  # Enable server-side encryption (SQS-managed keys, no KMS cost)
  sqs_managed_sse_enabled = true

  # Or use KMS when you need your own key policy / rotation control
  # kms_master_key_id = aws_kms_key.sqs.id

  # After maxReceiveCount failed receives, SQS moves the message to the DLQ
  # instead of retrying forever.
  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.dlq.arn
    maxReceiveCount     = 3
  })

  tags = var.tags
}

# Dead letter queue — messages land here after 3 failed receives (see the
# main queue's redrive_policy) and are kept the full 14 days so they can be
# inspected and redriven manually.
resource "aws_sqs_queue" "dlq" {
  name                      = "orders-queue-dlq"
  message_retention_seconds = 1209600  # 14 days

  tags = var.tags
}

# Lambda function that consumes the queue
resource "aws_lambda_function" "processor" {
  function_name = "order-processor"
  runtime       = "python3.12"
  handler       = "handler.process_order"
  timeout       = 50  # Queue visibility = 6x this
  memory_size   = 512

  filename         = data.archive_file.lambda.output_path
  source_code_hash = data.archive_file.lambda.output_base64sha256
  role             = aws_iam_role.lambda.arn

  environment {
    variables = {
      TABLE_NAME  = aws_dynamodb_table.orders.name
      QUEUE_URL   = aws_sqs_queue.main.url
      LOG_LEVEL   = "INFO"
    }
  }

  # NOTE(review): Lambda's dead_letter_config applies only to asynchronous
  # invocations. SQS event source mappings invoke synchronously, so failures
  # are handled by the queue's own redrive_policy — this block is likely
  # inert for the SQS trigger; confirm before relying on it.
  dead_letter_config {
    target_arn = aws_sqs_queue.dlq.arn
  }

  # X-Ray active tracing for end-to-end request visibility.
  tracing_config {
    mode = "Active"
  }

  reserved_concurrent_executions = 100  # Limit concurrency (protects downstream DB)

  tags = var.tags
}

# Event source mapping — Lambda polls the queue and invokes the function
# with batches of up to 10 messages.
resource "aws_lambda_event_source_mapping" "sqs" {
  event_source_arn                   = aws_sqs_queue.main.arn
  function_name                      = aws_lambda_function.processor.arn
  batch_size                         = 10
  # Wait up to 5s to fill a batch before invoking — trades a little latency
  # for fewer, fuller invocations.
  maximum_batching_window_in_seconds = 5

  # Enable partial batch failure reporting. The handler must return
  # {"batchItemFailures": [...]} for this to take effect; otherwise the
  # whole batch is retried on any error.
  function_response_types = ["ReportBatchItemFailures"]

  # Scaling configuration — caps concurrent invocations for this mapping,
  # independent of the function's reserved concurrency.
  scaling_config {
    maximum_concurrency = 50
  }

  # Filter events (optional). Filtering on `body` requires the message body
  # to be valid JSON. NOTE(review): messages that do not match the filter
  # are dropped by Lambda (deleted from the queue), not left for another
  # consumer — confirm that is acceptable here.
  filter_criteria {
    filter {
      pattern = jsonencode({
        body = {
          type = ["order_created", "order_updated"]
        }
      })
    }
  }
}

# IAM permissions for the consumer role
resource "aws_iam_role_policy" "lambda_sqs" {
  name = "lambda-sqs-policy"
  role = aws_iam_role.lambda.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        # Minimum set for an SQS event source mapping; ChangeMessageVisibility
        # additionally enables the visibility-timeout backoff pattern shown
        # later in this file.
        Action = [
          "sqs:ReceiveMessage",
          "sqs:DeleteMessage",
          "sqs:GetQueueAttributes",
          "sqs:ChangeMessageVisibility"
        ]
        Resource = aws_sqs_queue.main.arn
      },
      {
        Effect = "Allow"
        # Needed for the function's dead_letter_config target; the queue's
        # own redrive to the DLQ is performed by SQS itself and needs no
        # IAM grant on the Lambda role.
        Action = [
          "sqs:SendMessage"
        ]
        Resource = aws_sqs_queue.dlq.arn
      }
    ]
  })
}

Lambda Handler with Batch Processing

Python Implementation

import json
import logging
import os  # BUG FIX: os.environ is read below but `os` was never imported
from dataclasses import dataclass
from typing import List, Optional

import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Module-level client/table are created once per execution environment and
# reused across warm Lambda invocations.
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['TABLE_NAME'])

@dataclass
class ProcessingResult:
    """Outcome of processing a single SQS record."""
    message_id: str  # SQS messageId, echoed back in batchItemFailures on failure
    success: bool  # False => message will be retried (and eventually redriven to the DLQ)
    error: Optional[str] = None  # human-readable failure reason, if any

def handler(event, context):
    """Lambda entry point: process an SQS batch with partial-failure reporting.

    Returns the ``batchItemFailures`` structure that the event source mapping
    expects when ReportBatchItemFailures is enabled, so only the messages
    that actually failed are retried instead of the whole batch.
    """
    records = event['Records']
    logger.info(f"Processing {len(records)} messages")

    outcomes: List[ProcessingResult] = [process_message(r) for r in records]

    # Only unsuccessful message IDs go back to SQS for redelivery.
    failures = [
        {'itemIdentifier': outcome.message_id}
        for outcome in outcomes
        if not outcome.success
    ]

    if failures:
        logger.warning(f"Failed to process {len(failures)} messages")

    return {'batchItemFailures': failures}


def process_message(record: dict) -> ProcessingResult:
    """Handle one SQS record, classifying failures as retryable or not.

    Malformed JSON and validation failures are reported as success so the
    poison message is deleted rather than retried forever; only unexpected
    errors are surfaced for retry.
    """
    message_id = record['messageId']

    # Dispatch table: message type -> handler for its payload.
    dispatch = {
        'order_created': handle_order_created,
        'order_updated': handle_order_updated,
    }

    try:
        body = json.loads(record['body'])
        logger.info(f"Processing message {message_id}: {body.get('type')}")

        validate_message(body)

        msg_type = body['type']
        handler_fn = dispatch.get(msg_type)
        if handler_fn is None:
            raise ValueError(f"Unknown message type: {msg_type}")
        handler_fn(body['data'])

        return ProcessingResult(message_id=message_id, success=True)

    except json.JSONDecodeError as e:
        logger.error(f"Invalid JSON in message {message_id}: {e}")
        # Don't retry malformed messages
        return ProcessingResult(message_id=message_id, success=True)

    except ValueError as e:
        logger.error(f"Validation error for {message_id}: {e}")
        # Don't retry validation errors
        return ProcessingResult(message_id=message_id, success=True)

    except Exception as e:
        logger.error(f"Error processing {message_id}: {e}", exc_info=True)
        return ProcessingResult(
            message_id=message_id,
            success=False,
            error=str(e)
        )


def validate_message(body: dict) -> None:
    """Validate message structure.

    Raises ValueError naming the first missing required field.
    """
    missing = [name for name in ('type', 'data', 'timestamp') if name not in body]
    if missing:
        raise ValueError(f"Missing required field: {missing[0]}")


def handle_order_created(data: dict) -> None:
    """Create a new order record, idempotently.

    SQS standard queues deliver at-least-once, so the same event may arrive
    twice; an existing record is never overwritten.
    """
    order_id = data['order_id']
    key = {'pk': f'ORDER#{order_id}', 'sk': 'METADATA'}

    # Idempotency check.
    # BUG FIX: the lookup previously passed Key={'pk': ...} without 'sk',
    # but the table uses a composite key (handle_order_updated keys on
    # pk + sk) — GetItem without the full primary key raises a
    # ValidationException, which would break the duplicate check.
    existing = table.get_item(Key=key)
    if 'Item' in existing:
        logger.info(f"Order {order_id} already exists, skipping")
        return

    # Create order record
    table.put_item(Item={
        'pk': key['pk'],
        'sk': key['sk'],
        'status': 'pending',
        'created_at': data['timestamp'],
        'customer_id': data['customer_id'],
        'items': data['items'],
        'total': data['total']
    })

    logger.info(f"Created order {order_id}")


def handle_order_updated(data: dict) -> None:
    """Update an existing order's status and timestamp.

    The condition expression makes the update fail (and the message retry)
    rather than silently upserting a phantom order if the create event was
    never processed.
    """
    order_id = data['order_id']
    new_status = data['status']

    table.update_item(
        Key={'pk': f'ORDER#{order_id}', 'sk': 'METADATA'},
        # '#status' alias is required because 'status' is a DynamoDB
        # reserved word.
        UpdateExpression='SET #status = :status, updated_at = :ts',
        ExpressionAttributeNames={'#status': 'status'},
        ExpressionAttributeValues={
            ':status': new_status,
            ':ts': data['timestamp'],
        },
        ConditionExpression='attribute_exists(pk)',
    )

    logger.info(f"Updated order {order_id} to {new_status}")

Sending Messages to SQS

Producer Function

import json
import os  # BUG FIX: os.environ is read below but `os` was never imported
import uuid
from datetime import datetime

import boto3

# Module-level client is reused across warm Lambda invocations.
sqs = boto3.client('sqs')
QUEUE_URL = os.environ['QUEUE_URL']

def send_order_event(order_data: dict, event_type: str) -> str:
    """Wrap order data in an event envelope and publish it to SQS.

    Returns the SQS MessageId of the sent message.

    NOTE(review): MessageGroupId / MessageDeduplicationId are only accepted
    by FIFO queues — confirm QUEUE_URL points at the .fifo queue, since a
    standard queue rejects these parameters.
    """
    envelope = {
        'type': event_type,
        'data': order_data,
        'timestamp': datetime.utcnow().isoformat(),
        'correlation_id': str(uuid.uuid4())
    }

    group_id = order_data.get('customer_id', 'default')  # For FIFO
    dedup_id = f"{order_data['order_id']}-{event_type}"  # For FIFO

    response = sqs.send_message(
        QueueUrl=QUEUE_URL,
        MessageBody=json.dumps(envelope),
        MessageGroupId=group_id,
        MessageDeduplicationId=dedup_id
    )

    return response['MessageId']


def send_batch(messages: list) -> dict:
    """Send messages to SQS, chunking to the API's 10-entry batch limit.

    BUG FIX: SendMessageBatch accepts at most 10 entries per call; the
    previous implementation sent everything in a single call and raised
    for more than 10 messages. Counts are now aggregated across chunks.

    Returns:
        {'successful': int, 'failed': int} totals over all chunks.
    """
    successful = 0
    failed = 0

    for start in range(0, len(messages), 10):
        entries = [
            {
                # Ids must be unique within a single batch request;
                # the global index also lets failures be traced back.
                'Id': str(start + offset),
                'MessageBody': json.dumps(msg),
                'MessageGroupId': msg['data'].get('customer_id', 'default'),
                'MessageDeduplicationId': f"{msg['data']['order_id']}-{msg['type']}"
            }
            for offset, msg in enumerate(messages[start:start + 10])
        ]

        response = sqs.send_message_batch(
            QueueUrl=QUEUE_URL,
            Entries=entries
        )

        successful += len(response.get('Successful', []))
        failed += len(response.get('Failed', []))

    return {'successful': successful, 'failed': failed}

FIFO Queues for Ordering

When to Use FIFO

Standard Queue:
- At-least-once delivery
- Best-effort ordering
- Unlimited throughput
- Use for: Independent events, high volume

FIFO Queue:
- Exactly-once processing
- Strict ordering within message group
- 300 msg/sec per API action (3,000 with batching; far higher in high-throughput mode)
- Use for: Order-sensitive processing, deduplication needed

FIFO Queue Configuration

resource "aws_sqs_queue" "fifo" {
  # FIFO queue names must end in ".fifo"
  name                        = "orders-queue.fifo"
  fifo_queue                  = true
  content_based_deduplication = false  # We provide our own dedup IDs

  visibility_timeout_seconds = 300
  message_retention_seconds  = 1209600

  # High throughput mode: dedup and ordering are scoped per message group,
  # lifting the queue-wide throughput cap.
  deduplication_scope   = "messageGroup"
  fifo_throughput_limit = "perMessageGroupId"

  # A FIFO queue's DLQ must itself be FIFO (see aws_sqs_queue.fifo_dlq).
  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.fifo_dlq.arn
    maxReceiveCount     = 3
  })

  tags = var.tags
}

# Dead letter queue for the FIFO queue — must also be FIFO.
resource "aws_sqs_queue" "fifo_dlq" {
  name       = "orders-queue-dlq.fifo"
  fifo_queue = true

  tags = var.tags
}

Error Handling Patterns

Exponential Backoff with Visibility Timeout

def process_with_backoff(record: dict, context) -> ProcessingResult:
    """Implement exponential backoff using visibility timeout.

    On a retryable failure, the message's visibility timeout is extended so
    SQS redelivers it after a growing delay (30s, 60s, 120s, 240s, capped at
    300s) instead of immediately after the current visibility lapses.

    NOTE(review): assumes `RetryableError`, `sqs`, `QUEUE_URL`, `logger` and
    `process_message` are defined at module level — none are shown in this
    snippet. Any non-RetryableError exception propagates and fails the whole
    invocation; confirm that is intended alongside ReportBatchItemFailures.
    """
    message_id = record['messageId']
    # ApproximateReceiveCount is 1 on first delivery, so the first backoff
    # is 30s.
    receive_count = int(record['attributes'].get('ApproximateReceiveCount', 1))

    try:
        process_message(record)
        return ProcessingResult(message_id=message_id, success=True)

    except RetryableError as e:
        # Calculate backoff: 30s doubled per prior attempt, capped at 5 min
        backoff_seconds = min(300, 30 * (2 ** (receive_count - 1)))

        # Extend visibility timeout so the retry happens after the backoff
        sqs.change_message_visibility(
            QueueUrl=QUEUE_URL,
            ReceiptHandle=record['receiptHandle'],
            VisibilityTimeout=backoff_seconds
        )

        logger.warning(
            f"Retryable error for {message_id}, "
            f"attempt {receive_count}, backoff {backoff_seconds}s"
        )

        return ProcessingResult(
            message_id=message_id,
            success=False,
            error=str(e)
        )

Dead Letter Queue Processing

def process_dlq(event, context):
    """Process messages from DLQ for manual investigation.

    Archives each failed message to S3 and publishes an SNS alert.

    NOTE(review): assumes module-level `s3` and `sns` clients plus `json`,
    `os` and `datetime` imports — none are shown in this snippet; confirm
    they exist in the deployed module.
    """
    for record in event['Records']:
        body = json.loads(record['body'])
        # NOTE(review): presumably populated when SQS redrives the message
        # to the DLQ — verify the attribute name against a real DLQ event.
        original_queue = record['attributes'].get('DeadLetterQueueSourceArn')

        # Store in S3 for analysis (and possible later redrive)
        s3.put_object(
            Bucket=os.environ['DLQ_BUCKET'],
            Key=f"failed/{record['messageId']}.json",
            Body=json.dumps({
                'message': body,
                'attributes': record['attributes'],
                'original_queue': original_queue,
                'processed_at': datetime.utcnow().isoformat()
            })
        )

        # Send alert so a human investigates promptly
        sns.publish(
            TopicArn=os.environ['ALERTS_TOPIC'],
            Subject='DLQ Message Received',
            Message=f"Failed message {record['messageId']} from {original_queue}"
        )

    return {'processed': len(event['Records'])}

Monitoring and Observability

CloudWatch Alarms

# Backlog alarm: consumers are falling behind when more than 1000 messages
# sit visible for two consecutive 5-minute periods.
resource "aws_cloudwatch_metric_alarm" "queue_depth" {
  alarm_name          = "orders-queue-depth"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 2
  metric_name         = "ApproximateNumberOfMessagesVisible"
  namespace           = "AWS/SQS"
  period              = 300
  statistic           = "Average"
  threshold           = 1000
  alarm_description   = "Queue depth is too high"

  dimensions = {
    QueueName = aws_sqs_queue.main.name
  }

  alarm_actions = [aws_sns_topic.alerts.arn]
}

# Any message in the DLQ means a delivery exhausted its retries — alarm on
# the first one (threshold 0, single 1-minute period).
resource "aws_cloudwatch_metric_alarm" "dlq_not_empty" {
  alarm_name          = "orders-dlq-not-empty"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 1
  metric_name         = "ApproximateNumberOfMessagesVisible"
  namespace           = "AWS/SQS"
  period              = 60
  statistic           = "Sum"
  threshold           = 0
  alarm_description   = "Messages in DLQ require attention"

  dimensions = {
    QueueName = aws_sqs_queue.dlq.name
  }

  alarm_actions = [aws_sns_topic.alerts.arn]
}

# Invocation-error alarm: more than 10 Lambda errors across two consecutive
# 5-minute windows. Note: messages handled via batchItemFailures do NOT
# count as Lambda Errors — only invocations that raise do.
resource "aws_cloudwatch_metric_alarm" "processing_errors" {
  alarm_name          = "order-processor-errors"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 2
  metric_name         = "Errors"
  namespace           = "AWS/Lambda"
  period              = 300
  statistic           = "Sum"
  threshold           = 10
  alarm_description   = "High error rate in order processor"

  dimensions = {
    FunctionName = aws_lambda_function.processor.function_name
  }

  alarm_actions = [aws_sns_topic.alerts.arn]
}

Custom Metrics

import boto3

# Module-level client, created once per execution environment and reused
# across warm Lambda invocations.
cloudwatch = boto3.client('cloudwatch')

def publish_metrics(success_count: int, failure_count: int, duration_ms: float):
    """Publish batch-processing counters and latency to CloudWatch.

    Emits MessagesProcessed (dimensioned by Status=Success/Failure) and
    ProcessingDuration under the OrderProcessing namespace.
    """

    def _count_datum(status: str, value: int) -> dict:
        # One MessagesProcessed datapoint per outcome dimension.
        return {
            'MetricName': 'MessagesProcessed',
            'Value': value,
            'Unit': 'Count',
            'Dimensions': [
                {'Name': 'Status', 'Value': status}
            ]
        }

    metric_data = [
        _count_datum('Success', success_count),
        _count_datum('Failure', failure_count),
        {
            'MetricName': 'ProcessingDuration',
            'Value': duration_ms,
            'Unit': 'Milliseconds'
        },
    ]

    cloudwatch.put_metric_data(
        Namespace='OrderProcessing',
        MetricData=metric_data
    )

Key Takeaways

  1. Set visibility timeout = 6x Lambda timeout — prevents duplicate processing
  2. Always use DLQs — never lose messages silently
  3. Implement partial batch failures — don’t fail the whole batch for one bad message
  4. Use FIFO when order matters — but accept the throughput tradeoff
  5. Monitor queue depth and DLQ — early warning for processing issues

“Event-driven architectures are like plumbing — when they work, nobody notices. When they fail, everybody notices.”