In the first part of the durable function blog, we had seen why to use the durable functions and different types of durable functions including examples. In this part we will have a look on the remaining types of functions.
Types of durable functions
1. Function Chaining (Covered in first part)
2. Fan-Out/Fan-In (Covered in first part)
3. Async HTTP APIs (Covered in this article)
4. Monitoring (Covered in this article)
5. Human interaction (Covered in this article)
6. Aggregator (Stateful entities)
So, we are starting with the remaining functions.
What is function Async HTTP API?
The async HTTP API durable function type helps us to know the status of the long-running process with the outside clients. Rather than waiting to complete the process of the outside client, we can ask for the status of the process. So, we will redirect the client to get the status of the process and the client will polls when the process has been completed. In the below picture, we can co-relate this scenario.
A user will start the long-running process and DoWork long-running process which will return after the process is completed and update the status, with an extra endpoint we can understand the status of the long-running process.
This feature is supported by the Durable function so we as a developer need not worry about the managing the code just using the durable functions and we are done. Also, durable function managing the state of every function whether it is InProgress/Completed/Removed etc.
Let’s consider the scenario, if we need all list of countries from the client and which is taking time to fetch all countries and apply the business logic on and return the result which almost taking more than 2min, to avoid such long running scenarios we will create a Async HTTP API as below.
1. GetAllCountries_Start – as a HTTP trigger method will start the OrchestrationTrigger method and return the instance id and tracking endpoints.
2. GetAllCountries_AsyncCall – Will call the ActivityTrigger method GetAllCountries_DoWork.
3. GetAllCountries_DoWork – Will going to invoke the client and does the job returns the result to the context and context will update the status and the result.
Below is sample of code to fetch the all the countries GetAllCountries from the client.
public static class AsyncHttpApi
{
[FunctionName("GetAllCountries_Start")]
public static async Task<HttpResponseMessage> HttpStart(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestMessage req,
[DurableClient] IDurableOrchestrationClient starter,
ILogger log)
{
// Function input comes from the request content.
string instanceId = await starter.StartNewAsync("GetAllCountries_AsyncCall", null);
log.LogInformation("Started orchestration with ID = '{instanceId}'.", instanceId);
return starter.CreateCheckStatusResponse(req, instanceId);
}
[FunctionName("GetAllCountries_AsyncCall")]
public static async Task<string> RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
return await context.CallActivityAsync<string>(nameof(GetAllCountries_DoWork), null);
}
[FunctionName(nameof(GetAllCountries_DoWork))]
public static string GetAllCountries_DoWork([ActivityTrigger] ILogger log)
{
Thread.Sleep(10000); // Sleep for the 10 seconds Just for making long running.
HttpClient client = new HttpClient();
client.BaseAddress = new Uri("https://api.first.org/data/v1/countries");
HttpRequestMessage httpRequestMessage = new HttpRequestMessage(HttpMethod.Get, "https://api.first.org/data/v1/countries");
// List data response.
HttpResponseMessage response = client.SendAsync(httpRequestMessage).Result;
if (response.IsSuccessStatusCode)
{
var dataObjects = response.Content.ReadAsStringAsync().Result; //Make sure to add a reference to System.Net.Http.Formatting.dll
return dataObjects;
}
else
{
Console.WriteLine("{0} ({1})", (int)response.StatusCode, response.ReasonPhrase);
return string.Empty;
}
}
}
In the above example, we created a GetAllCountries_Start function which will start the long-running async method and returns the different endpoints to the understand the status or to communicate with the long running process, instance id with the http status code 202(Accepted).
To get the status of the execution of the process we can just invoke the statusQueryUri in the postman. If the process is not yet completed, then it will return the response as running with 202 http status code. You can have a look on below screen shot.
After the process has been completed it will return 200 or if any error occurred while execution.
What is function Monitoring?
When we are thinking about our traditional monitor method for cleaning activity or sending an email, we must give an interval or datetime and which is static, Also the managing instance lifetime is quite complex. By using durable functions, we can create flexible recurrence intervals, manage task lifetimes, and create the multiple monitoring process from a single orchestration function.
In async HTTP API we are exposing the endpoint for clients to monitor the long-running process to completed or not but in the Monitor, we need not do that it will consume the endpoint and then wait until the status change.
Let’s consider an employee who applied leave and manager will approve or reject in given period else it needs to auto reject and send an email to the employee in both the case. To achieve this scenario, we will use the monitor functions as below.
1. MonitorJobStatus – Is a OrchestrationTrigger function it will call an activity trigger function GetLeaveStatus after every interval.
2. Will manage the timer start and ends which depends on the polling interval. We can manage the interval dynamically easily.
3. To manage the lifetime of the instance we can specify the expiry time it will keep checking with the CurrentUtcDateTime of the context.
4. After the response from the GetLeaveStatus will filter the response and decide to send an alert or create a next timer. If the timer is renewed it will be continued till it gets the correct status or meets the expiry time.
public class MonitorFunction
{
public readonly ICosmosdbService _cosmosDbService;
public MonitorFunction(ICosmosdbService cosmosDbService)
{
_cosmosDbService = cosmosDbService;
}
[FunctionName("MonitorJobStatus_HttpStart")]
public async Task<HttpResponseMessage> HttpStart([HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestMessage req,
[DurableClient] IDurableOrchestrationClient starter, ILogger log)
{
var content = req.Content;
string instanceId = content.ReadAsAsync<string>().Result;
var leaveReq = new LeaveRequest();
leaveReq.id = instanceId;
// Function input comes from the request content.
string instanceIdNew = await starter.StartNewAsync("MonitorJobStatus", leaveReq);
log.LogInformation($"Started orchestration with ID = '{instanceIdNew}'.");
// To understand the status of the OrchestrationTrigger(MonitorJobStatus) method
return starter.CreateCheckStatusResponse(req, instanceIdNew);
}
[FunctionName("MonitorJobStatus")]
public async Task Run([OrchestrationTrigger] IDurableOrchestrationContext context)
{
LeaveRequest instanceId = context.GetInput<LeaveRequest>();
int pollingInterval = GetPollingInterval();
DateTime expiryTime = DateTime.Now.AddDays(7); // So the job will end up in next 7 days if status not updates according
while (context.CurrentUtcDateTime < expiryTime)
{
var jobStatus = await context.CallActivityAsync<LeaveRequest>(" GetLeaveStatus", instanceId.id);
if (jobStatus.ApprovalStatus == "Approved")
{
// Perform an action when a condition is met.
await context.CallActivityAsync("SendAlert", jobStatus);
context.SetOutput("Leave approved and email has been sent.");
break; // Orchestration will finish the process
}
else if (jobStatus.ApprovalStatus == "Rejected")
{
await context.CallActivityAsync("SendAlert", jobStatus);
context.SetOutput("Leave rejected and email has been sent.");
break;
}
// Orchestration sleeps until this time.
var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval);
await context.CreateTimer(nextCheck, CancellationToken.None);
}
// Perform more work here or let the orchestration end.
}
private int GetPollingInterval()
{
return 600; // 10Min this way we can keep timer dynamic
}
[FunctionName("GetLeaveStatus")]
public async Task<LeaveRequest> GetLeaveStatus([ActivityTrigger] string instanceId)
{
// Will get the status from the database
return await _cosmosDbService.GetItemStatus(instanceId);
}
[FunctionName("SendAlert")]
/// <summary>
/// SendAlert
/// </summary>
/// <returns></returns>
///
public static async Task SendAlert([ActivityTrigger] LeaveRequest leave)
{
//Send an email or process more
Console.WriteLine(leave.ApprovalStatus);
}
}
What is the meaning of Human Interaction functions?
In the world of automation and IT there are multiple scenarios/process are automated but there are still some scenario’s which needs human interactions in between of the process. Involving humans in between the process which is quite tricky as people aren’t highly available as cloud services. But using the durable functions we can manage the human interaction workflows.
In the below diagram, we can see there is a RequestApproval method which is raised by the user and waiting for the people to reply if the person replied then will ProcessApproval, if doesn’t reply in time then it will be escalated.
Let’s understand this scenario with the code.
public class HumanInteractionFunction
{
public readonly ICosmosdbService _cosmosDbService;
public HumanInteractionFunction(ICosmosdbService cosmosDbService)
{
_cosmosDbService = cosmosDbService;
}
[FunctionName("HumanInteractionFunction_HttpStart")]
public async Task<HttpResponseMessage> HttpStart(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestMessage req,
[DurableClient] IDurableOrchestrationClient starter, ILogger log)
{
var input = await req.Content.ReadAsAsync<LeaveRequest>();
input.ApprovalStatus = ApprovalStatus.Applied.ToString();
// Function input comes from the request content.
// it will return the newly created instance id
string instanceId = await starter.StartNewAsync("HumanInteractionFunction", input);
input.id = instanceId;
log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
// Will return the status endpoints
return starter.CreateCheckStatusResponse(req, instanceId);
}
[FunctionName("HumanInteractionFunction")]
public async Task<LeaveRequest> RunOrchestrator([OrchestrationTrigger] IDurableOrchestrationContext context)
{
var leave = context.GetInput<LeaveRequest>();
leave.id = context.InstanceId;
await context.CallActivityAsync("RequestLeave", leave);
using (var timeoutCts = new CancellationTokenSource())
{
DateTime dueTime = context.CurrentUtcDateTime.AddMinutes(60);
Task durableTimeout = context.CreateTimer(dueTime, timeoutCts.Token);
Task<bool> approvalEvent = context.WaitForExternalEvent<bool>("ApprovalEvent"); // Waiting for the event raise by the human
if (durableTimeout == await Task.WhenAny(approvalEvent, durableTimeout) || !approvalEvent.Result)
{
leave.ApprovalStatus = ApprovalStatus.Rejected.ToString(); // Auto rejected after 10 min
}
else
{
leave.ApprovalStatus = ApprovalStatus.Approved.ToString();
}
timeoutCts.Cancel();
await context.CallActivityAsync("ProcessApproval", leave);
}
return leave;
}
[FunctionName("RequestLeave")]
public async Task RequestLeave([ActivityTrigger] LeaveRequest leaveApproval)
{
try
{
await _cosmosDbService.AddItemAsync(leaveApproval);
}
catch (Exception ex)
{
throw new Exception(ex.ToString());
}
}
[FunctionName("ProcessApproval")]
public async Task ProcessApproval([ActivityTrigger] LeaveRequest leaveApproval)
{
try
{
await _cosmosDbService.UpdateItemAsync(leaveApproval.id, leaveApproval);
}
catch (Exception ex)
{
throw new Exception(ex.ToString());
}
}
[FunctionName("RaiseEventToOrchestration")]
public async Task Run([HttpTrigger(new string[] { "post", "get" })] HttpRequest request, [DurableClient] IDurableOrchestrationClient client)
{
try
{
string instanceId = request.Query.FirstOrDefault(a => a.Key == "instanceId").Value;
bool value = bool.TryParse(request.Query.FirstOrDefault(a => a.Key == "isApproved").Value, out bool isApproved);
await client.RaiseEventAsync(instanceId, "ApprovalEvent", isApproved); // Raise the event approve/reject
}
catch (Exception ex)
{
throw new Exception(ex.ToString());
}
}
}
public class LeaveRequest
{
public string id { get; set; }
public string Name { get; set; }
public DateTime FromDate { get; set; }
public DateTime ToDate { get; set; }
public string Reason { get; set; }
public string ApprovalStatus { get; set; }
}
In the above example created a leave approval scenario.
1. An employee will request a leave using the function HumanInteractionFunction_HttpStart and waits for the approval/rejection.
2. By using statusQueryGetUri employee can track the status it is completed or running, and, in the output, attribute can understand it is approved/rejected.
3. HumanInteractionFunction it is a OrchestrationTrigger it will call by the activity trigger function RequestLeave (Just to save the request into the Database) and create a approvalEvent and timer.
4. Now in between how the human/manager will approve/reject the leave, to do that created one more endpoint RaiseEventToOrchestration this http trigger raise an event for the ApprovalEvent.
5. RaiseEventToOrchestration needs two inputs instanceId(string) and isApproved(boolean) by using instanceid it will process the specific instance request and isApproved will decide to approve or reject.
6. ApprovalEvent will raise the event by manager or if timeout happened and the process will be completed. In the above example 60min is a waiting period after that it will be auto rejected, and the process will be completed.
Sample endpoint for the RaiseEventToOrchestration
Sample endpoint to understand the status.
Reference document: Durable-functions-overview
In the next part, we will cover the following topics,
- Aggregator type of Durable functions.
- Durable function timeout varies as per the App plans will see in detail.
- Deployment of durable functions.
Azure Durable Functions – Part 1
A serverless cloud environment for your application to run with less code, less infrastructure maintenance, and save your cost.