Azure Functions: Durable Functions for Stateful Workflows
Build stateful, long-running workflows with Azure Durable Functions. Learn orchestration patterns, error handling, and production deployment.
Azure Durable Functions extends Azure Functions with stateful workflow orchestration. It handles checkpointing, retries, and long-running processes — turning serverless into a workflow engine. This guide covers the patterns that make complex workflows manageable.
Durable Functions Concepts
Standard Function: Request → Function → Response (stateless)
Durable Functions:
├── Client Function → Starts orchestrations
├── Orchestrator → Coordinates activities (checkpointed)
├── Activity Functions → Do actual work (stateless)
└── Entity Functions → Stateful actors (optional)
Your First Durable Function
Project Setup
# Create the function-app project with the Python worker runtime
func init DurableDemo --worker-runtime python
cd DurableDemo

# Install the Durable Functions SDK into the local dev environment
pip install azure-functions-durable

# Pin the dependency so it ships with the deployment package
echo "azure-functions-durable>=1.2.0" >> requirements.txt
Orchestrator
# orchestrator/__init__.py
import azure.durable_functions as df


def orchestrator_function(context: df.DurableOrchestrationContext):
    """Orchestrates a multi-step order workflow.

    Steps: validate -> reserve inventory (with retry) -> process payment
    (compensating on failure) -> ship.  Every ``yield`` is a checkpoint:
    the framework persists progress and replays this generator
    deterministically after restarts.
    """
    # Input supplied by the client function via start_new().
    order = context.get_input()
    order_id = order["order_id"]

    # Step 1: Validate order — a rejection ends the orchestration early.
    validation = yield context.call_activity("validate_order", order)
    if not validation["valid"]:
        return {"status": "rejected", "reason": validation["reason"]}

    # Step 2: Reserve inventory, retrying transient failures
    # (up to 3 attempts, first retry after 5 seconds).
    retry_options = df.RetryOptions(
        first_retry_interval_in_milliseconds=5000,
        max_number_of_attempts=3
    )
    inventory = yield context.call_activity_with_retry(
        "reserve_inventory", retry_options, order
    )

    # Step 3: Process payment.  On failure, compensate by releasing the
    # inventory reserved in step 2, then re-raise so the orchestration fails.
    try:
        payment = yield context.call_activity("process_payment", order)
    except Exception as e:
        yield context.call_activity("release_inventory", order_id)
        raise

    # Step 4: Ship order and surface the tracking number to the caller.
    shipment = yield context.call_activity("ship_order", order)

    return {
        "status": "completed",
        "order_id": order_id,
        "tracking": shipment["tracking_number"]
    }


# Register the generator as this function's orchestrator entry point.
main = df.Orchestrator.create(orchestrator_function)
Activity Functions
# validate_order/__init__.py
def main(order: dict) -> dict:
    """Validate an incoming order payload.

    Returns ``{"valid": True}`` when the order has at least one item and
    a positive total; otherwise ``{"valid": False, "reason": ...}``.
    """
    items = order.get("items")
    total = order.get("total", 0)

    if not items:
        return {"valid": False, "reason": "No items in order"}
    if total <= 0:
        return {"valid": False, "reason": "Invalid total"}
    return {"valid": True}
# reserve_inventory/__init__.py
import logging

# Module-level logger keyed to this module's name (standard logging practice).
logger = logging.getLogger(__name__)


def main(order: dict) -> dict:
    """Reserve inventory for every item in *order*.

    Returns a confirmation dict echoing the reserved items.
    """
    # Lazy %-style args defer string formatting until the record is emitted,
    # avoiding f-string work when the log level is disabled.
    logger.info("Reserving inventory for order %s", order["order_id"])
    # Call inventory service...
    return {"reserved": True, "items": order["items"]}
# process_payment/__init__.py
def main(order: dict) -> dict:
    """Charge the customer for *order* via the payment provider (stubbed)."""
    # Placeholder response; a real implementation would call the provider SDK.
    response = {"success": True, "transaction_id": "txn_123"}
    return response
# ship_order/__init__.py
def main(order: dict) -> dict:
    """Create a shipment for *order* and return its tracking number (stubbed)."""
    # Placeholder: a real implementation would call the carrier's API.
    tracking = "1Z999AA10123456784"
    return {"tracking_number": tracking}
HTTP Trigger (Client)
# http_start/__init__.py
import azure.functions as func
import azure.durable_functions as df


async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    """HTTP entry point: start the orchestrator and return status-query URLs."""
    durable_client = df.DurableOrchestrationClient(starter)

    # The request body is the order document handed to the orchestration.
    payload = req.get_json()

    # start_new(orchestrator_name, instance_id=None, input) — None lets the
    # runtime generate a fresh instance id.
    instance_id = await durable_client.start_new("orchestrator", None, payload)

    # 202 Accepted carrying the standard Durable Functions management URLs.
    return durable_client.create_check_status_response(req, instance_id)
function.json Files
// orchestrator/function.json
{
"bindings": [
{
"name": "context",
"type": "orchestrationTrigger",
"direction": "in"
}
]
}
// http_start/function.json
{
"bindings": [
{
"authLevel": "function",
"name": "req",
"type": "httpTrigger",
"direction": "in",
"methods": ["post"]
},
{
"name": "$return",
"type": "http",
"direction": "out"
},
{
"name": "starter",
"type": "durableClient",
"direction": "in"
}
]
}
Common Patterns
Fan-Out/Fan-In (Parallel Processing)
def orchestrator_function(context: df.DurableOrchestrationContext):
    """Fan out one activity per input item, then aggregate the results."""
    items = context.get_input()

    # Fan out: schedule every activity *before* yielding so they all run in
    # parallel rather than sequentially.
    pending = [context.call_activity("process_item", item) for item in items]

    # Fan in: a single yield on task_all checkpoints once everything finishes.
    results = yield context.task_all(pending)

    # Aggregate the per-item values into one summary.
    total = sum(entry["value"] for entry in results)
    return {"total": total, "count": len(results)}
Human Interaction (Approval Workflow)
import datetime


def orchestrator_function(context: df.DurableOrchestrationContext):
    """Wait for human approval of a request, with a 24-hour timeout.

    Races an external "ApprovalEvent" against a durable timer; whichever
    completes first decides the outcome.
    """
    request = context.get_input()

    # Notify the approver (the activity does the actual email/message send).
    yield context.call_activity("send_approval_request", request)

    # current_utc_datetime is replay-safe; datetime.now() would differ on replay.
    expiration = context.current_utc_datetime + datetime.timedelta(hours=24)
    timeout_task = context.create_timer(expiration)
    approval_task = context.wait_for_external_event("ApprovalEvent")

    # Race the two tasks; task_any yields the first one to complete.
    winner = yield context.task_any([approval_task, timeout_task])

    if winner == timeout_task:
        # No decision within 24 h: notify and end the orchestration as timed out.
        yield context.call_activity("send_timeout_notification", request)
        return {"status": "timeout"}

    # The approval event won; .result holds the payload sent via raise_event.
    approval = approval_task.result
    if approval["approved"]:
        yield context.call_activity("execute_request", request)
        return {"status": "approved"}
    else:
        return {"status": "rejected", "reason": approval.get("reason")}
# Raise event from external system
async def approve(req: func.HttpRequest, starter: str):
    """HTTP endpoint delivering an approval decision to a waiting orchestration."""
    durable_client = df.DurableOrchestrationClient(starter)
    target_instance = req.route_params["instance_id"]
    decision = req.get_json()
    # Wakes the orchestrator's wait_for_external_event("ApprovalEvent").
    await durable_client.raise_event(target_instance, "ApprovalEvent", decision)
    return func.HttpResponse(status_code=202)
Chaining with Error Handling
def orchestrator_function(context: df.DurableOrchestrationContext):
    """Chain three activities; on failure, run compensations in reverse.

    Implements a simple saga: each successfully completed step is recorded,
    and if a later step raises, every completed step's ``compensate_<step>``
    activity is invoked in reverse order before reporting failure.
    """
    order = context.get_input()
    completed_steps = []  # step names that finished, in execution order
    try:
        # Step 1 — consumes the original order
        result1 = yield context.call_activity("step1", order)
        completed_steps.append("step1")
        # Step 2 — consumes step 1's output
        result2 = yield context.call_activity("step2", result1)
        completed_steps.append("step2")
        # Step 3 — consumes step 2's output
        result3 = yield context.call_activity("step3", result2)
        completed_steps.append("step3")
        return {"status": "success", "result": result3}
    except Exception as e:
        # Saga rollback: compensate in reverse, then return a structured
        # failure instead of re-raising so callers get a usable result.
        for step in reversed(completed_steps):
            yield context.call_activity(f"compensate_{step}", order)
        return {"status": "failed", "error": str(e)}


main = df.Orchestrator.create(orchestrator_function)
Eternal Orchestrations (Monitoring)
def orchestrator_function(context: df.DurableOrchestrationContext):
    """Monitor a resource with periodic health checks (eternal orchestration).

    Each orchestration "generation" performs one pass: check health, alert
    if unhealthy, sleep on a durable timer, then restart via
    ``continue_as_new`` so the replay history never grows unboundedly.
    """
    config = context.get_input()

    # One health check per generation.
    status = yield context.call_activity("check_health", config["endpoint"])
    if not status["healthy"]:
        yield context.call_activity("send_alert", {
            "endpoint": config["endpoint"],
            "status": status
        })

    # Durable timer: sleeps without holding compute; replay-safe clock.
    next_check = context.current_utc_datetime + datetime.timedelta(minutes=5)
    yield context.create_timer(next_check)

    # Restart with a fresh history and return immediately.  FIX: the original
    # wrapped this body in `while True`, which kept executing and scheduling
    # work *after* continue_as_new — no work may be scheduled once a restart
    # has been signaled.
    context.continue_as_new(config)
Terraform Deployment
# Storage account required by the Functions runtime; Durable Functions also
# persists its task-hub state here (history tables, instance tables, queues).
resource "azurerm_storage_account" "functions" {
  name                     = "funcstorageacct"
  resource_group_name      = azurerm_resource_group.main.name
  location                 = azurerm_resource_group.main.location
  account_tier             = "Standard"
  account_replication_type = "LRS"
}

# Elastic Premium plan (EP1): pre-warmed instances avoid cold starts and
# support the longer executions Durable orchestrations typically need.
resource "azurerm_service_plan" "functions" {
  name                = "functions-plan"
  resource_group_name = azurerm_resource_group.main.name
  location            = azurerm_resource_group.main.location
  os_type             = "Linux"
  sku_name            = "EP1" # Elastic Premium for Durable Functions
}

resource "azurerm_linux_function_app" "durable" {
  name                       = "durable-functions-app"
  resource_group_name        = azurerm_resource_group.main.name
  location                   = azurerm_resource_group.main.location
  storage_account_name       = azurerm_storage_account.functions.name
  storage_account_access_key = azurerm_storage_account.functions.primary_access_key
  service_plan_id            = azurerm_service_plan.functions.id

  site_config {
    application_stack {
      python_version = "3.11"
    }
    # Keep at least one worker warm to process task-hub messages promptly.
    elastic_instance_minimum = 1
  }

  app_settings = {
    FUNCTIONS_WORKER_RUNTIME = "python"
    AzureWebJobsFeatureFlags = "EnableWorkerIndexing"
    WEBSITE_RUN_FROM_PACKAGE = "1"
    # Durable Functions settings — a named task hub isolates this app's
    # orchestration state within the shared storage account.
    DURABLE_TASK_HUB_NAME = "ProductionHub"
  }

  identity {
    type = "SystemAssigned"
  }
}
Monitoring and Debugging
# Check one orchestration's status ({instanceId} comes from the start
# response; {functionKey} is the durabletask extension's system key)
curl "https://my-functions.azurewebsites.net/runtime/webhooks/durabletask/instances/{instanceId}?code={functionKey}"

# Query all currently running orchestrations
curl "https://my-functions.azurewebsites.net/runtime/webhooks/durabletask/instances?runtimeStatus=Running&code={functionKey}"

# Purge history of completed instances created since 2024-01-01 (cleanup)
curl -X POST "https://my-functions.azurewebsites.net/runtime/webhooks/durabletask/instances/purge?createdTimeFrom=2024-01-01&runtimeStatus=Completed&code={functionKey}"
# Query orchestrations programmatically
async def list_instances(req: func.HttpRequest, starter: str):
    """Return the IDs and statuses of all currently Running orchestrations.

    NOTE(review): this snippet relies on ``json``, ``func``, and ``df``
    being imported at module level — confirm the enclosing file imports them.
    """
    client = df.DurableOrchestrationClient(starter)
    # get_status_all() fetches the status of every instance in the task hub.
    instances = await client.get_status_all()
    running = [i for i in instances if i.runtime_status == df.OrchestrationRuntimeStatus.Running]
    return func.HttpResponse(
        json.dumps([{"id": i.instance_id, "status": i.runtime_status.name} for i in running]),
        mimetype="application/json"
    )
Best Practices
1. Keep Orchestrators Deterministic
# ❌ BAD: Non-deterministic code in orchestrator
def orchestrator_function(context):
current_time = datetime.datetime.now() # Different on replay!
random_value = random.random() # Different on replay!
# ✅ GOOD: Use context methods
def orchestrator_function(context):
current_time = context.current_utc_datetime # Deterministic
random_value = yield context.call_activity("get_random") # Deterministic
2. Handle Large Payloads
# ❌ BAD: Large data in orchestrator state
result = yield context.call_activity("get_large_data") # 10MB payload
# ✅ GOOD: Use blob storage
blob_url = yield context.call_activity("store_data_in_blob")
result = yield context.call_activity("process_blob", blob_url)
3. Use Sub-Orchestrations for Modularity
def main_orchestrator(context):
    """Compose payment and shipping sub-orchestrations for one order."""
    order = context.get_input()

    # Each sub-orchestration runs as its own instance with its own
    # checkpointed history, keeping this parent lightweight.
    payment = yield context.call_sub_orchestrator(
        "payment_orchestrator", order["payment"]
    )
    shipping = yield context.call_sub_orchestrator(
        "shipping_orchestrator", order["shipping"]
    )

    return {"payment": payment, "shipping": shipping}
Key Takeaways
- Orchestrators coordinate, activities work — keep orchestrators lightweight
- Automatic checkpointing — workflows survive function restarts
- Retry policies are built in — configure per activity
- Fan-out/fan-in enables massive parallelism
- External events support human-in-the-loop workflows
- Continue-as-new prevents history bloat in eternal orchestrations
- Determinism is critical — never call random() or datetime.now() directly in orchestrators; use context.current_utc_datetime or move the call into an activity
“Durable Functions turns serverless from ‘request-response’ into ‘workflow engine’. When you need state machines, saga patterns, or long-running processes — this is the answer.”