Azure Durable Functions extends Azure Functions with stateful workflow orchestration. It handles checkpointing, retries, and long-running processes — turning serverless into a workflow engine. This guide covers the patterns that make complex workflows manageable.

Durable Functions Concepts

Standard Function: Request → Function → Response (stateless)

Durable Functions:
├── Client Function     → Starts orchestrations
├── Orchestrator        → Coordinates activities (checkpointed)
├── Activity Functions  → Do actual work (stateless)
└── Entity Functions    → Stateful actors (optional)

Your First Durable Function

Project Setup

# Create function app
func init DurableDemo --worker-runtime python
cd DurableDemo

# Install Durable Functions extension
pip install azure-functions-durable

# Add to requirements.txt
echo "azure-functions-durable>=1.2.0" >> requirements.txt

Orchestrator

# orchestrator/__init__.py
import azure.durable_functions as df

def orchestrator_function(context: df.DurableOrchestrationContext):
    """Orchestrate the order workflow: validate -> reserve -> pay -> ship.

    Runs as a replayed generator: every `yield` is a checkpoint, so code
    between yields must stay deterministic.

    Input (context.get_input()): order dict containing at least "order_id".
    Returns a status dict: rejected (with reason) or completed (with tracking).
    Raises: re-raises any payment failure after compensating inventory.
    """
    order = context.get_input()
    order_id = order["order_id"]

    # Step 1: Validate order — reject early, before any side effects happen.
    validation = yield context.call_activity("validate_order", order)
    if not validation["valid"]:
        return {"status": "rejected", "reason": validation["reason"]}

    # Step 2: Reserve inventory, retrying transient failures
    # (up to 3 attempts, first retry after 5000 ms = 5 s).
    retry_options = df.RetryOptions(
        first_retry_interval_in_milliseconds=5000,
        max_number_of_attempts=3
    )
    # Result intentionally discarded: only success/failure matters here.
    yield context.call_activity_with_retry(
        "reserve_inventory", retry_options, order
    )

    # Step 3: Process payment. On failure, compensate by releasing the
    # inventory reserved in step 2, then surface the original error.
    try:
        yield context.call_activity("process_payment", order)
    except Exception:
        yield context.call_activity("release_inventory", order_id)
        raise

    # Step 4: Ship order; the shipment carries the customer-facing tracking id.
    shipment = yield context.call_activity("ship_order", order)

    return {
        "status": "completed",
        "order_id": order_id,
        "tracking": shipment["tracking_number"]
    }

main = df.Orchestrator.create(orchestrator_function)

Activity Functions

# validate_order/__init__.py
def main(order: dict) -> dict:
    """Validate order data.

    Returns {"valid": True} on success, otherwise
    {"valid": False, "reason": <human-readable reason>}.
    """
    if not order.get("items"):
        return {"valid": False, "reason": "No items in order"}
    total = order.get("total", 0)
    # Guard against non-numeric totals: the original comparison raised
    # TypeError on e.g. a string total instead of rejecting the order.
    if not isinstance(total, (int, float)) or total <= 0:
        return {"valid": False, "reason": "Invalid total"}
    return {"valid": True}

# reserve_inventory/__init__.py
import logging

def main(order: dict) -> dict:
    """Reserve stock for every item on the order.

    Logs the reservation attempt and returns a confirmation payload
    echoing the reserved items.
    """
    order_id = order["order_id"]
    logging.info(f"Reserving inventory for order {order_id}")
    # Call inventory service...
    return {"reserved": True, "items": order["items"]}

# process_payment/__init__.py
def main(order: dict) -> dict:
    """Charge the customer for the order.

    Returns the payment outcome together with the provider transaction id.
    """
    # Call payment provider...
    outcome = {"success": True}
    outcome["transaction_id"] = "txn_123"
    return outcome

# ship_order/__init__.py
def main(order: dict) -> dict:
    """Create a shipment for the order and return its tracking number."""
    tracking = "1Z999AA10123456784"
    return {"tracking_number": tracking}

HTTP Trigger (Client)

# http_start/__init__.py
import azure.functions as func
import azure.durable_functions as df

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    """HTTP entry point: start a new orchestration for the posted order."""
    client = df.DurableOrchestrationClient(starter)

    # The request body is the order payload handed straight to the orchestrator.
    payload = req.get_json()

    # None instance id: let the runtime generate one.
    instance_id = await client.start_new("orchestrator", None, payload)

    # 202 response carrying the standard status-query / raise-event URLs.
    return client.create_check_status_response(req, instance_id)

function.json Files

// orchestrator/function.json
{
  "bindings": [
    {
      "name": "context",
      "type": "orchestrationTrigger",
      "direction": "in"
    }
  ]
}

// http_start/function.json
{
  "bindings": [
    {
      "authLevel": "function",
      "name": "req",
      "type": "httpTrigger",
      "direction": "in",
      "methods": ["post"]
    },
    {
      "name": "$return",
      "type": "http",
      "direction": "out"
    },
    {
      "name": "starter",
      "type": "durableClient",
      "direction": "in"
    }
  ]
}

Common Patterns

Fan-Out/Fan-In (Parallel Processing)

def orchestrator_function(context: df.DurableOrchestrationContext):
    """Fan out one activity call per input item, then aggregate the results."""
    items = context.get_input()

    # Fan out: schedule every item without yielding, so all run in parallel.
    parallel_tasks = [context.call_activity("process_item", item) for item in items]

    # Fan in: a single yield waits for the whole batch to finish.
    results = yield context.task_all(parallel_tasks)

    # Aggregate the per-item values into one summary.
    total = sum(result["value"] for result in results)
    return {"total": total, "count": len(results)}

Human Interaction (Approval Workflow)

import datetime

def orchestrator_function(context: df.DurableOrchestrationContext):
    """Request human approval and race the response against a 24h timeout."""
    request = context.get_input()

    # Notify the approver that a decision is needed.
    yield context.call_activity("send_approval_request", request)

    # Race an external "ApprovalEvent" against a durable timer.
    deadline = context.current_utc_datetime + datetime.timedelta(hours=24)
    timeout_task = context.create_timer(deadline)
    approval_task = context.wait_for_external_event("ApprovalEvent")

    winner = yield context.task_any([approval_task, timeout_task])

    # Nobody answered within the window.
    if winner == timeout_task:
        yield context.call_activity("send_timeout_notification", request)
        return {"status": "timeout"}

    approval = approval_task.result
    if not approval["approved"]:
        return {"status": "rejected", "reason": approval.get("reason")}

    # Approved: perform the requested action.
    yield context.call_activity("execute_request", request)
    return {"status": "approved"}
# External systems resume a waiting orchestration by raising the named event.
async def approve(req: func.HttpRequest, starter: str):
    """HTTP endpoint that forwards an approval decision to a waiting instance."""
    client = df.DurableOrchestrationClient(starter)

    # Target instance comes from the route; the decision from the body.
    target_instance = req.route_params["instance_id"]
    decision = req.get_json()

    await client.raise_event(target_instance, "ApprovalEvent", decision)

    # 202: accepted — the orchestrator consumes the event asynchronously.
    return func.HttpResponse(status_code=202)

Chaining with Error Handling

def orchestrator_function(context: df.DurableOrchestrationContext):
    """Run step1 -> step2 -> step3 as a chain; on failure, undo in reverse.

    Each step receives the previous step's output (step1 receives the
    original order). Compensation activities are named "compensate_<step>"
    and each receives the original order.
    """
    order = context.get_input()
    completed_steps = []

    try:
        # Chain the steps: each output feeds the next call.
        payload = order
        for step_name in ("step1", "step2", "step3"):
            payload = yield context.call_activity(step_name, payload)
            completed_steps.append(step_name)

        return {"status": "success", "result": payload}

    except Exception as e:
        # Saga-style compensation: undo the completed steps, newest first.
        for step in reversed(completed_steps):
            yield context.call_activity(f"compensate_{step}", order)

        return {"status": "failed", "error": str(e)}

main = df.Orchestrator.create(orchestrator_function)

Eternal Orchestrations (Monitoring)

def orchestrator_function(context: df.DurableOrchestrationContext):
    """Monitor a resource forever with periodic checks.

    Eternal-orchestration pattern: run ONE check cycle, then restart the
    instance via continue_as_new so the replay history never grows.

    FIX: the original wrapped this body in `while True:` while ALSO calling
    continue_as_new. After continue_as_new the orchestrator must not
    schedule further work, so the explicit loop is removed and the restart
    is the final action — the restart itself provides the "loop".
    """
    config = context.get_input()

    # Check health of the monitored endpoint.
    status = yield context.call_activity("check_health", config["endpoint"])

    if not status["healthy"]:
        yield context.call_activity("send_alert", {
            "endpoint": config["endpoint"],
            "status": status
        })

    # Durable sleep before the next check.
    next_check = context.current_utc_datetime + datetime.timedelta(minutes=5)
    yield context.create_timer(next_check)

    # Restart with the same input: truncates history instead of growing it.
    context.continue_as_new(config)

Terraform Deployment

# Storage account used by the function app; Durable Functions also keeps
# its task-hub state (orchestration history, queues, leases) here.
resource "azurerm_storage_account" "functions" {
  # NOTE(review): storage account names must be globally unique across
  # Azure — replace with a name unique to your deployment.
  name                     = "funcstorageacct"
  resource_group_name      = azurerm_resource_group.main.name
  location                 = azurerm_resource_group.main.location
  account_tier             = "Standard"
  account_replication_type = "LRS"
}

# Hosting plan for the function app.
resource "azurerm_service_plan" "functions" {
  name                = "functions-plan"
  resource_group_name = azurerm_resource_group.main.name
  location            = azurerm_resource_group.main.location
  os_type             = "Linux"
  sku_name            = "EP1"  # Elastic Premium for Durable Functions
}

# The function app itself (Linux, Python 3.11), wired to the storage
# account and plan above.
resource "azurerm_linux_function_app" "durable" {
  name                = "durable-functions-app"
  resource_group_name = azurerm_resource_group.main.name
  location            = azurerm_resource_group.main.location

  storage_account_name       = azurerm_storage_account.functions.name
  storage_account_access_key = azurerm_storage_account.functions.primary_access_key
  service_plan_id            = azurerm_service_plan.functions.id

  site_config {
    application_stack {
      python_version = "3.11"
    }

    # Keep at least one pre-warmed instance available.
    elastic_instance_minimum = 1
  }

  app_settings = {
    FUNCTIONS_WORKER_RUNTIME       = "python"
    AzureWebJobsFeatureFlags       = "EnableWorkerIndexing"
    WEBSITE_RUN_FROM_PACKAGE       = "1"
    
    # Durable Functions settings
    DURABLE_TASK_HUB_NAME = "ProductionHub"
  }

  # System-assigned managed identity for keyless access to other resources.
  identity {
    type = "SystemAssigned"
  }
}

Monitoring and Debugging

# Check orchestration status
curl "https://my-functions.azurewebsites.net/runtime/webhooks/durabletask/instances/{instanceId}?code={functionKey}"

# Query all running orchestrations
curl "https://my-functions.azurewebsites.net/runtime/webhooks/durabletask/instances?runtimeStatus=Running&code={functionKey}"

# Purge completed instances (cleanup)
curl -X POST "https://my-functions.azurewebsites.net/runtime/webhooks/durabletask/instances/purge?createdTimeFrom=2024-01-01&runtimeStatus=Completed&code={functionKey}"
# Query orchestrations programmatically
async def list_instances(req: func.HttpRequest, starter: str):
    """Return the ids and statuses of all currently running orchestrations."""
    # FIX: the original snippet called json.dumps without importing json.
    import json

    client = df.DurableOrchestrationClient(starter)

    instances = await client.get_status_all()
    running = [i for i in instances if i.runtime_status == df.OrchestrationRuntimeStatus.Running]

    return func.HttpResponse(
        json.dumps([{"id": i.instance_id, "status": i.runtime_status.name} for i in running]),
        mimetype="application/json"
    )

Best Practices

1. Keep Orchestrators Deterministic

# Orchestrator bodies are replayed from history; any expression that can
# produce a different value on replay corrupts the workflow state.

# ❌ BAD: Non-deterministic code in orchestrator
def orchestrator_function(context):
    current_time = datetime.datetime.now()  # Different on replay!
    random_value = random.random()  # Different on replay!

# ✅ GOOD: Use context methods
def orchestrator_function(context):
    current_time = context.current_utc_datetime  # Deterministic
    random_value = yield context.call_activity("get_random")  # Deterministic

2. Handle Large Payloads

# ❌ BAD: Large data in orchestrator state
result = yield context.call_activity("get_large_data")  # 10MB payload

# ✅ GOOD: Use blob storage
blob_url = yield context.call_activity("store_data_in_blob")
result = yield context.call_activity("process_blob", blob_url)

3. Use Sub-Orchestrations for Modularity

def main_orchestrator(context):
    """Split the order workflow into payment and shipping sub-orchestrations."""
    order = context.get_input()

    # Each sub-orchestration checkpoints and retries independently.
    payment = yield context.call_sub_orchestrator(
        "payment_orchestrator",
        order["payment"]
    )

    shipping = yield context.call_sub_orchestrator(
        "shipping_orchestrator",
        order["shipping"]
    )

    return {"payment": payment, "shipping": shipping}

Key Takeaways

  1. Orchestrators coordinate, activities work — keep orchestrators lightweight
  2. Automatic checkpointing — workflows survive function restarts
  3. Retry policies are built in — configure per activity
  4. Fan-out/fan-in enables massive parallelism
  5. External events support human-in-the-loop workflows
  6. Continue-as-new prevents history bloat in eternal orchestrations
  7. Determinism is critical — never use random/datetime in orchestrators

“Durable Functions turns serverless from ‘request-response’ into ‘workflow engine’. When you need state machines, saga patterns, or long-running processes — this is the answer.”