Durable Functions for Rust will be a port of the Durable Functions implementation for JavaScript, but with an API that makes sense for Rust.
See the Durable Functions Overview documentation for more information about Durable Functions.
The repository we'll be using for upstream is Azure Functions for Rust. Please fork this repository to your GitHub account.
The branch we'll be using is the durable-functions branch. When implementing work, please branch from the durable-functions branch and submit a pull request to the durable-functions branch on the upstream Azure Functions for Rust repository to move completed work into upstream.
This feature will make use of the unstable async/await feature of Rust. Therefore, a nightly Rust compiler is required. When the implementation of this feature is complete and it's time to move the work into the dev branch, the appropriate feature gating will be performed so that users who do not want to use the Durable Functions feature can continue to use a stable Rust compiler.
- Install rustup.
- rustup install nightly && rustup default nightly
- rustup component add rustfmt (for cargo fmt to format your source code)
- rustup component add clippy (for cargo clippy to catch linter errors)
- Install the requirements listed for Azure Functions for Rust.
- Check out the durable-functions branch: git checkout durable-functions
- Install the local Azure Functions for Rust SDK: cargo install -f --path azure-functions-sdk
Run the "Durable Functions" example:
- cd examples/durable-functions
- func settings add AzureWebJobsStorage <connection_string> (note: it should be possible to use the Storage Emulator instead; I tend to use an actual Azure Storage account. If you are using an actual storage account, please do not commit changes to local.settings.json; otherwise, it will be flagged as a leaked secret and you'll need to recycle your access key.)
- cargo func run -- --features unstable
Note: the sample is a work in progress and is not yet doing much of anything because there is no orchestration client or orchestration context implementation in place.
The sample should have a VS Code launch profile named "Debug" that uses the CodeLLDB extension. To debug the sample:
- cd examples/durable-functions
- code .
- Debug -> Start Debugging
If you're curious to see what code is getting generated when you build an Azure Function, use the Cargo Expand tool.
There are three parts of the implementation required to port Durable Functions to Rust:
- The Bindings for Durable Functions that developers will use in their Azure Functions.
- The Orchestration Client that interacts with the Durable Functions HTTP API for starting orchestrations and querying their status. This will be ported from the JavaScript implementation.
- Support for creating orchestration and activity functions in the Azure Functions for Rust SDK (e.g. cargo func new orchestration and cargo func new activity).
There are three Durable Functions bindings:
- DurableOrchestrationClient - an input binding for orchestration client functions.
- DurableOrchestrationContext - a trigger binding used for orchestrator functions.
- DurableActivityContext - a trigger binding used for activity functions.
The bindings will be implemented as part of the azure-functions crate.
The DurableOrchestrationClient binding is responsible for providing an instance of an orchestration client that enables users to schedule orchestrations.
The binding data will provide two sets of URLs that will be passed to configure the orchestration client:
- Creation URLs - used to create new orchestrations.
- Management URLs - used to query the status of, and manage, existing orchestrations.
The binding type will internally hold an OrchestrationClient from the azure-functions-durable crate and forward its methods to the client implementation.
Typically, developers will use the DurableOrchestrationClient binding to start an orchestration:
use azure_functions::{
bindings::{DurableOrchestrationClient, HttpRequest, HttpResponse},
func,
};
use serde_json::Value;
#[func]
pub async fn start(_req: HttpRequest, client: DurableOrchestrationClient) -> HttpResponse {
match client.start_new("hello_world", None, Value::Null).await {
Ok(_) => "Orchestration started.".into(),
Err(e) => format!("Failed to start orchestration: {}", e).into(),
}
}
See the C# documentation for information about these methods.
fn task_hub(&self) -> &str
async fn query_status(&self, instance_id: &str, show_history: bool, show_history_output: bool, show_input: bool) -> Result<impl Iterator<Item = OrchestrationStatusQueryResult>, OrchestrationClientError>
async fn query_all_statuses(&self) -> Result<impl Iterator<Item = OrchestrationStatusQueryResult>, OrchestrationClientError>
async fn query_statuses<S>(&self, created_time_from: DateTime<Utc>, created_time_to: Option<DateTime<Utc>>, runtime_statuses: S) -> Result<impl Iterator<Item = OrchestrationStatusQueryResult>, OrchestrationClientError> where S: IntoIterator<Item = OrchestrationRuntimeStatus>
async fn purge_history(&self, instance_id: &str) -> Result<PurgeHistoryResult, OrchestrationClientError>
async fn purge_histories<S>(&self, created_time_from: DateTime<Utc>, created_time_to: Option<DateTime<Utc>>, runtime_statuses: S) -> Result<PurgeHistoryResult, OrchestrationClientError> where S: IntoIterator<Item = OrchestrationRuntimeStatus>
async fn raise_event<D>(&self, instance_id: &str, event_name: &str, data: D) -> Result<(), OrchestrationClientError> where D: Into<serde_json::Value>
async fn raise_event_for_hub<D>(&self, task_hub: &str, instance_id: &str, event_name: &str, data: D) -> Result<(), OrchestrationClientError> where D: Into<serde_json::Value>
async fn rewind(&self, instance_id: &str, reason: &str) -> Result<(), OrchestrationClientError>
async fn start_new<D>(&self, function_name: &str, instance_id: Option<String>, data: D) -> Result<String, OrchestrationClientError> where D: Into<serde_json::Value>
async fn terminate(&self, instance_id: &str, reason: &str) -> Result<(), OrchestrationClientError>
The DurableOrchestrationContext binding is used for triggering orchestration functions.
It will be responsible for deserializing the binding context data, including the execution history, and serializing the execution result.
This implementation will rely heavily on the JavaScript implementation.
Orchestration functions in Rust must:
- Be an async function.
- Take only a single parameter of type DurableOrchestrationContext.
- Have no return type or return OrchestrationOutput.
These requirements will be checked at compile time.
Additionally, orchestration functions are not permitted to perform asynchronous I/O operations (despite being async functions). This requirement will be enforced at runtime: the user's function will be polled with a Waker implementation that panics if it is used.
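One way to build such a waker with only the standard library is sketched below. The helpers panicking_waker and poll_once are hypothetical names for illustration; they are not part of the azure-functions crate, and the real implementation may be structured differently. Cloning or waking the waker panics, while dropping it is a no-op so the waker itself can be cleaned up normally.

```rust
use std::future::Future;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Cloning the waker is what a future does when it wants to be woken later
// (for example, after asynchronous I/O completes), so it is forbidden.
unsafe fn forbidden_clone(_: *const ()) -> RawWaker {
    panic!("orchestration functions must not perform asynchronous I/O");
}

// Waking is forbidden for the same reason.
unsafe fn forbidden_wake(_: *const ()) {
    panic!("orchestration functions must not perform asynchronous I/O");
}

// Dropping the waker is harmless and must not panic.
unsafe fn noop_drop(_: *const ()) {}

static PANICKING_VTABLE: RawWakerVTable =
    RawWakerVTable::new(forbidden_clone, forbidden_wake, forbidden_wake, noop_drop);

/// Hypothetical helper: builds a waker that panics if it is cloned or woken.
fn panicking_waker() -> Waker {
    // SAFETY: the vtable functions never dereference the (null) data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &PANICKING_VTABLE)) }
}

/// Hypothetical helper: polls a future exactly once with the panicking waker.
fn poll_once<F: Future>(future: F) -> Poll<F::Output> {
    let waker = panicking_waker();
    let mut cx = Context::from_waker(&waker);
    let mut future = Box::pin(future);
    future.as_mut().poll(&mut cx)
}
```

A future that completes, or stays pending without touching the waker, polls normally; a future that tries to keep the waker for a later wake-up (as an I/O future would) panics instead of silently hanging.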
This example runs three activities in parallel and returns an array of the activity outputs:
use azure_functions::{bindings::DurableOrchestrationContext, durable::OrchestrationOutput, func};
use log::warn;
use serde_json::Value;
#[func]
pub async fn hello_world(mut context: DurableOrchestrationContext) -> OrchestrationOutput {
if !context.is_replaying() {
warn!("Orchestration started at {}.", context.current_time());
}
let activities = vec![
context.call_activity("say_hello", "Tokyo"),
context.call_activity("say_hello", "London"),
context.call_activity("say_hello", "Seattle"),
];
let result: Vec<_> = context
.join_all(activities)
.await
.into_iter()
.map(|r| r.unwrap_or_else(|e| Value::from(format!("Activity failed: {}", e))))
.collect();
if !context.is_replaying() {
warn!("Orchestration completed at {}.", context.current_time());
}
result.into()
}
The binding data will be a JSON string containing the current orchestration execution history.
See the JavaScript definitions of the various history types that will need to be ported to Rust for deserialization.
An example of the input data given at the very start of an orchestration:
{
"history":[
{
"EventType":12,
"EventId":-1,
"IsPlayed":false,
"Timestamp":"2019-07-22T06:20:29.488203Z"
},
{
"OrchestrationInstance":{
"InstanceId":"5238cdd354f4418687aeb6775294ba27",
"ExecutionId":"60353e751ebb44268e6230b5dda25b04"
},
"EventType":0,
"ParentInstance":null,
"Name":"hello_world",
"Version":"",
"Input":"{}",
"Tags":null,
"EventId":-1,
"IsPlayed":false,
"Timestamp":"2019-07-22T06:20:29.101967Z"
}
],
"input":{
},
"instanceId":"5238cdd354f4418687aeb6775294ba27",
"isReplaying":false,
"parentInstanceId":null
}
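The numeric EventType values in this payload follow the HistoryEventType enumeration used by the JavaScript implementation (12 is OrchestratorStarted and 0 is ExecutionStarted). Below is a sketch of porting that enumeration using only the standard library; the Rust type name is an assumption, and a real port would more likely derive serde's Deserialize for it (for example with the serde_repr crate).

```rust
use std::convert::TryFrom;

/// History event types, numbered as in the JavaScript HistoryEventType enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum HistoryEventType {
    ExecutionStarted = 0,
    ExecutionCompleted = 1,
    ExecutionFailed = 2,
    ExecutionTerminated = 3,
    TaskScheduled = 4,
    TaskCompleted = 5,
    TaskFailed = 6,
    SubOrchestrationInstanceCreated = 7,
    SubOrchestrationInstanceCompleted = 8,
    SubOrchestrationInstanceFailed = 9,
    TimerCreated = 10,
    TimerFired = 11,
    OrchestratorStarted = 12,
    OrchestratorCompleted = 13,
    EventSent = 14,
    EventRaised = 15,
    ContinueAsNew = 16,
    GenericEvent = 17,
    HistoryState = 18,
}

impl TryFrom<i32> for HistoryEventType {
    /// An unrecognized "EventType" number is returned unchanged as the error.
    type Error = i32;

    fn try_from(value: i32) -> Result<Self, Self::Error> {
        Ok(match value {
            0 => Self::ExecutionStarted,
            1 => Self::ExecutionCompleted,
            2 => Self::ExecutionFailed,
            3 => Self::ExecutionTerminated,
            4 => Self::TaskScheduled,
            5 => Self::TaskCompleted,
            6 => Self::TaskFailed,
            7 => Self::SubOrchestrationInstanceCreated,
            8 => Self::SubOrchestrationInstanceCompleted,
            9 => Self::SubOrchestrationInstanceFailed,
            10 => Self::TimerCreated,
            11 => Self::TimerFired,
            12 => Self::OrchestratorStarted,
            13 => Self::OrchestratorCompleted,
            14 => Self::EventSent,
            15 => Self::EventRaised,
            16 => Self::ContinueAsNew,
            17 => Self::GenericEvent,
            18 => Self::HistoryState,
            _ => return Err(value),
        })
    }
}
```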
The DurableOrchestrationContext binding should serialize into a JSON string representing the execution result of the current invocation of the orchestration function.
See the execution result schema document for the expected output data format.
See the C# documentation for information about these methods.
fn current_time(&self) -> DateTime<Utc>
fn instance_id(&self) -> &str
fn parent_instance_id(&self) -> Option<&str>
fn is_replaying(&self) -> bool
fn call_activity<D>(&mut self, activity_name: &str, data: D) -> impl Future<Output = Result<serde_json::Value, String>> where D: Into<serde_json::Value>
fn call_activity_with_retry<D>(&mut self, activity_name: &str, retry_options: RetryOptions, data: D) -> impl Future<Output = Result<serde_json::Value, String>> where D: Into<serde_json::Value>
fn call_sub_orchestrator<D>(&mut self, function_name: &str, instance_id: Option<String>, data: D) -> impl Future<Output = Result<serde_json::Value, String>> where D: Into<serde_json::Value>
fn call_sub_orchestrator_with_retry<D>(&mut self, function_name: &str, instance_id: Option<String>, retry_options: RetryOptions, data: D) -> impl Future<Output = Result<serde_json::Value, String>> where D: Into<serde_json::Value>
fn continue_as_new<D>(&mut self, data: D, preserve_unprocessed_events: bool) -> impl Future<Output = ()> where D: Into<serde_json::Value>
fn create_timer<S>(&mut self, fire_at: DateTime<Utc>, state: S) -> impl Future<Output = Result<serde_json::Value, String>> where S: Into<serde_json::Value>
fn input(&self) -> serde_json::Value
fn new_guid(&self) -> uuid::Uuid
fn set_custom_status<S>(&mut self, status: S) where S: Into<serde_json::Value>
fn wait_for_event(&mut self, name: &str) -> impl Future<Output = Result<serde_json::Value, String>>
fn wait_for_event_with_timeout(&mut self, name: &str) -> impl Future<Output = Result<Option<serde_json::Value>, String>>
fn join_all<I>(&self, iter: I) -> JoinAll<I::Item>
fn select_all<I>(&self, iter: I) -> SelectAll<<<I as IntoIterator>::Item as IntoFuture>::Future>
For implementing most of these methods, consult the JavaScript implementation.
Where the JavaScript implementation drives the orchestration with a generator loop, the Rust implementation will drive it with a Future rather than a generator. When polled, the futures returned by the methods of DurableOrchestrationContext should return Poll::Ready if the requested action was previously scheduled and has completed (successfully or unsuccessfully). They should return Poll::Pending if the action has not yet been scheduled or has not yet completed.
The Future representing the user's orchestration function will be polled exactly once, and the provided std::task::Context will not support asynchronous I/O operations; the underlying Waker implementation will panic if used. The Future will progress as far as it can given the current execution history, building a list of the actions being performed as it goes (including those that have already completed). Whenever the user's function awaits a Future returned from the DurableOrchestrationContext whose action has not yet been scheduled or has not yet completed, Poll::Pending will be returned, causing Azure Functions for Rust to return the execution result for the current invocation of the orchestration function. If the Future representing the user's function returns Poll::Ready, the execution result is marked as "done".
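The single-poll model described above can be sketched end to end with only the standard library. Everything here is a hypothetical simplification: OrchestrationContext stands in for DurableOrchestrationContext, actions are recorded as strings rather than serialized actions, and the history is reduced to a list of activity outputs in which the n-th scheduled activity matches the n-th entry (the real implementation matches TaskScheduled and TaskCompleted events, as the JavaScript code does).

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// A waker that panics if the orchestration tries to use it (e.g. for I/O).
unsafe fn forbidden_clone(_: *const ()) -> RawWaker {
    panic!("orchestration functions must not perform asynchronous I/O");
}
unsafe fn forbidden_wake(_: *const ()) {
    panic!("orchestration functions must not perform asynchronous I/O");
}
unsafe fn noop_drop(_: *const ()) {}
static VTABLE: RawWakerVTable =
    RawWakerVTable::new(forbidden_clone, forbidden_wake, forbidden_wake, noop_drop);

/// Hypothetical stand-in for DurableOrchestrationContext.
#[derive(Clone)]
struct OrchestrationContext {
    /// Outputs of completed activities, in scheduling order (simplified history).
    history: Rc<Vec<String>>,
    /// Every action requested during this invocation, including replayed ones.
    actions: Rc<RefCell<Vec<String>>>,
}

impl OrchestrationContext {
    fn call_activity(&self, name: &str, input: &str) -> Pin<Box<dyn Future<Output = String>>> {
        let mut actions = self.actions.borrow_mut();
        let sequence = actions.len();
        actions.push(format!("CallActivity({}, {})", name, input));
        match self.history.get(sequence) {
            // Completed in an earlier invocation: resolves on the first poll.
            Some(output) => Box::pin(std::future::ready(output.clone())),
            // Not completed yet: stays pending, which ends this invocation.
            None => Box::pin(std::future::pending::<String>()),
        }
    }
}

/// Polls the orchestration exactly once and returns (is_done, actions, output).
fn execute<F, Fut>(history: Vec<String>, orchestrator: F) -> (bool, Vec<String>, Option<String>)
where
    F: FnOnce(OrchestrationContext) -> Fut,
    Fut: Future<Output = String>,
{
    let context = OrchestrationContext {
        history: Rc::new(history),
        actions: Rc::new(RefCell::new(Vec::new())),
    };
    let mut future = Box::pin(orchestrator(context.clone()));
    // SAFETY: the vtable functions never dereference the null data pointer.
    let waker = unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) };
    let output = match future.as_mut().poll(&mut Context::from_waker(&waker)) {
        Poll::Ready(output) => Some(output),
        Poll::Pending => None,
    };
    let actions = context.actions.borrow().clone();
    (output.is_some(), actions, output)
}
```

With an empty history, an orchestration that awaits two activities in sequence records one CallActivity action and is not done; with one output in the history it records both actions and is still not done; with both outputs it runs to completion and returns its result.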
Here is the callActivity implementation in JavaScript:
private callActivity(state: HistoryEvent[], name: string, input?: unknown): Task {
const newAction = new CallActivityAction(name, input);
const taskScheduled = this.findTaskScheduled(state, name);
const taskCompleted = this.findTaskCompleted(state, taskScheduled);
const taskFailed = this.findTaskFailed(state, taskScheduled);
this.setProcessed([taskScheduled, taskCompleted, taskFailed]);
if (taskCompleted) {
const result = this.parseHistoryEvent(taskCompleted);
return new Task(true, false, newAction, result, taskCompleted.Timestamp, taskCompleted.TaskScheduledId);
} else if (taskFailed) {
return new Task(
true,
true,
newAction,
taskFailed.Reason,
taskFailed.Timestamp,
taskFailed.TaskScheduledId,
new Error(taskFailed.Reason),
);
} else {
return new Task(
false,
false,
newAction,
);
}
}
For Rust, a port of this method might look like:
/// Schedules an activity function for execution.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub fn call_activity<D>(
&mut self,
activity_name: &str,
data: D,
) -> impl Future<Output = Result<Value, String>> + OrchestrationFuture
where
D: Into<Value>,
{
let mut state = self.state.borrow_mut();
// Push the action on the execution result
state.result.borrow_mut().push_action(Action::CallActivity {
function_name: activity_name.to_string(),
input: data.into(),
});
let mut result: Option<Result<Value, String>> = None;
// Attempt to resolve the activity
if let Some(scheduled) = state.find_scheduled_activity(activity_name) {
scheduled.is_processed = true;
let id = scheduled.event_id;
if let Some(completed) = state.find_completed_activity(id) {
completed.is_processed = true;
result = Some(Ok(completed
.result
.as_ref()
.map(|s| from_str(&s).unwrap_or_default())
.unwrap_or(Value::Null)));
} else if let Some(failed) = state.find_failed_activity(id) {
failed.is_processed = true;
result = Some(Err(failed.reason.clone().unwrap_or_default()));
}
}
ActionFuture::new(result, self.state.clone())
}
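The port above returns an ActionFuture, which this document does not define. Below is a minimal sketch assuming the future only needs the result resolved from the history; the shared state argument and the OrchestrationFuture trait are omitted, and the no-op waker and poll_once helper exist only so the example can be polled.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Hypothetical, simplified ActionFuture: holds the result resolved from the
/// execution history, if any.
struct ActionFuture<T> {
    result: Option<T>,
}

impl<T> ActionFuture<T> {
    fn new(result: Option<T>) -> Self {
        ActionFuture { result }
    }
}

impl<T: Unpin> Future for ActionFuture<T> {
    type Output = T;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        // The waker is never stored: a pending action can only be resolved by
        // a later invocation that replays a longer history.
        match self.result.take() {
            Some(result) => Poll::Ready(result),
            None => Poll::Pending,
        }
    }
}

// A no-op waker, used here only to poll the future in the example.
unsafe fn noop_clone(_: *const ()) -> RawWaker {
    RawWaker::new(std::ptr::null(), &NOOP_VTABLE)
}
unsafe fn noop(_: *const ()) {}
static NOOP_VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);

fn poll_once<F: Future + Unpin>(future: &mut F) -> Poll<F::Output> {
    // SAFETY: the vtable functions ignore the null data pointer.
    let waker = unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &NOOP_VTABLE)) };
    Pin::new(future).poll(&mut Context::from_waker(&waker))
}
```

A completed or failed activity yields Poll::Ready with Ok or Err respectively, mirroring the completed/failed branches of the JavaScript callActivity; an activity with no result in the history stays Poll::Pending.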
The DurableActivityContext binding is used for triggering activity functions.
Unlike orchestration functions, activity functions are essentially normal Azure Functions that can have any number of additional input and output bindings.
Activity functions in Rust must:
- Not have a return value, return () in the $return binding position, or return ActivityOutput in the $return binding position. Additional output bindings are supported.
These requirements will be checked at compile time.
The ActivityOutput type will not be a "real" binding; it will not produce a binding on $return, but will instead set the return value of the function.
This example sets the output of the activity to a "hello" message:
use azure_functions::{bindings::DurableActivityContext, durable::ActivityOutput, func};
#[func]
pub fn say_hello(context: DurableActivityContext) -> ActivityOutput {
format!(
"Hello {}!",
context.input().as_str().expect("expected a string input")
)
.into()
}
See the C# documentation for information about these methods.
fn instance_id(&self) -> &str
fn input(&self) -> serde_json::Value
The orchestration client will be implemented by the OrchestrationClient type in the azure-functions-durable crate. It will be responsible for communicating with the Durable Functions extension via the HTTP API.
It is recommended to use hyper for the HTTP communication. However, a version of hyper based on std::future has not yet been released. For the initial implementation of the orchestration client, it is acceptable to reference the master branch of the hyper repository in Cargo.toml rather than the currently released version.
We may use the JavaScript implementation of the client as a basis for the implementation of the Rust OrchestrationClient.
The client will be configured with a set of URLs that it uses to communicate with the Durable Functions extension. For Azure Functions for Rust, this information will be passed via the DurableOrchestrationClient binding. The client must manipulate the URLs based on the data passed to it before using them with the HTTP client.
The client will be configured with two URLs used for creating new orchestration instances.
The create_new_instance_url URL will be used to request that the Durable Functions extension create a new orchestration.
Here is an example provided by a locally running Durable Functions extension:
http://localhost:8080/runtime/webhooks/durabletask/orchestrators/{functionName}[/{instanceId}]?code=TE9M
And when formatted by OrchestrationClient for creating a new instance of the hello_world orchestration function with a default instance id:
http://localhost:8080/runtime/webhooks/durabletask/orchestrators/hello_world?code=TE9M
HTTP method: POST
The content type should be application/json, and the request body should be the input value to the orchestration function.
Parameters:
- functionName - The name of the orchestration function (required).
- instanceId - The instance identifier to use for the orchestration (optional).
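The formatting step can be sketched with plain string replacement. format_create_url is a hypothetical helper; the sketch assumes the template contains the literal {functionName}[/{instanceId}] text shown in the example and does not percent-encode the substituted values.

```rust
/// Hypothetical helper: fills in the create_new_instance_url template.
fn format_create_url(template: &str, function_name: &str, instance_id: Option<&str>) -> String {
    // The optional segment appears as "[/{instanceId}]" in the template; it is
    // replaced by "/<id>" when an id is given and removed entirely otherwise.
    let instance_segment = match instance_id {
        Some(id) => format!("/{}", id),
        None => String::new(),
    };
    template
        .replace("[/{instanceId}]", &instance_segment)
        .replace("{functionName}", function_name)
}
```

Passing "hello_world" and no instance id yields the hello_world example URL; passing an instance id appends it as an extra path segment.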
The create_new_instance_and_wait_url URL will be used to request that the Durable Functions extension create a new orchestration and wait for it to complete. Note: this URL is not used in the JavaScript implementation; it may not be needed for the Rust implementation either.
Here is an example provided by a locally running Durable Functions extension:
http://localhost:8080/runtime/webhooks/durabletask/orchestrators/{functionName}[/{instanceId}]?timeout={timeoutInSeconds}&pollingInterval={intervalInSeconds}&code=TE9M
And when formatted by OrchestrationClient for creating a new instance of the hello_world orchestration function with a default instance id, a timeout of one hour (3600 seconds), and a status polling interval of 30 seconds:
http://localhost:8080/runtime/webhooks/durabletask/orchestrators/hello_world?timeout=3600&pollingInterval=30&code=TE9M
HTTP method: POST
The content type should be application/json, and the request body should be the input value to the orchestration function.
Parameters:
- functionName - The name of the orchestration function (required).
- instanceId - The instance identifier to use for the orchestration (optional).
- timeoutInSeconds - The timeout of the wait, in seconds (required).
- intervalInSeconds - The polling interval, in seconds (required).
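The wait variant can be formatted by plain string replacement, taking the timeout and polling interval as std::time::Duration values and rendering them in whole seconds. The helper name is hypothetical, any sub-second part is truncated by Duration::as_secs, and the substituted values are not percent-encoded.

```rust
use std::time::Duration;

/// Hypothetical helper: fills in the create_new_instance_and_wait_url template.
fn format_create_and_wait_url(
    template: &str,
    function_name: &str,
    instance_id: Option<&str>,
    timeout: Duration,
    polling_interval: Duration,
) -> String {
    // Same optional-segment handling as for the plain creation URL.
    let instance_segment = instance_id.map(|id| format!("/{}", id)).unwrap_or_default();
    template
        .replace("[/{instanceId}]", &instance_segment)
        .replace("{functionName}", function_name)
        .replace("{timeoutInSeconds}", &timeout.as_secs().to_string())
        .replace("{intervalInSeconds}", &polling_interval.as_secs().to_string())
}
```

Called with "hello_world", no instance id, a 3600-second timeout, and a 30-second interval, it produces the example URL containing timeout=3600&pollingInterval=30.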
The management URLs will be used to manage existing orchestrations.
See the HTTP API URL discovery section of the documentation for an example of the data that the client will need to be configured with.
The data provided in the "example response" is what the client will need to operate.
The status_query_url URL will be used to query the status of existing orchestrations.
See the API documentation for parameters and options.
Note: the instanceId parameter will already be populated in the URL given to the client. The client will also be passed the current orchestration instance identifier so that it can remove the instance identifier from the URL, if needed.
An example status_query_url used to configure an orchestration client (where INSTANCEID is the current orchestration instance identifier):
http://localhost:8080/runtime/webhooks/durabletask/instances/INSTANCEID?taskHub=DurableFunctionsHub&connection=Storage&code=TE9M
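Because the management URLs arrive with the current instance identifier already substituted, the client has to swap that identifier for the one it is addressing, or drop it (for example, to query all instances). A sketch of that rewrite, assuming the identifier always appears as the whole path segment right after /instances/; the helper name rebase_instance_url is hypothetical, and the query string is left untouched.

```rust
/// Hypothetical helper: rewrites a management URL configured for `current_id`.
/// `Some(target)` addresses another instance; `None` removes the identifier.
/// Returns None if the URL does not contain "/instances/{current_id}".
fn rebase_instance_url(url: &str, current_id: &str, target_id: Option<&str>) -> Option<String> {
    let needle = format!("/instances/{}", current_id);
    let start = url.find(&needle)?;
    let end = start + needle.len();
    // Only accept a whole path segment, followed by '/', '?' or end of string.
    match url[end..].chars().next() {
        None | Some('/') | Some('?') => {}
        _ => return None,
    }
    let replacement = match target_id {
        Some(id) => format!("/instances/{}", id),
        None => "/instances".to_string(),
    };
    Some(format!("{}{}{}", &url[..start], replacement, &url[end..]))
}
```

The same helper applies unchanged to the other management URLs, since each of them carries the identifier in the same position.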
The raise_event_url URL will be used to raise an external event for an orchestration.
See the API documentation for parameters and options.
Note: the instanceId parameter will already be populated in the URL given to the client. The client will also be passed the current orchestration instance identifier so that it can remove the instance identifier from the URL, if needed.
An example raise_event_url used to configure an orchestration client (where INSTANCEID is the current orchestration instance identifier):
http://localhost:8080/runtime/webhooks/durabletask/instances/INSTANCEID/raiseEvent/{eventName}?taskHub=DurableFunctionsHub&connection=Storage&code=TE9M
The terminate_url URL will be used to terminate an orchestration.
See the API documentation for parameters and options.
Note: the instanceId parameter will already be populated in the URL given to the client. The client will also be passed the current orchestration instance identifier so that it can remove the instance identifier from the URL, if needed.
An example terminate_url used to configure an orchestration client (where INSTANCEID is the current orchestration instance identifier):
http://localhost:8080/runtime/webhooks/durabletask/instances/INSTANCEID/terminate?reason={text}&taskHub=DurableFunctionsHub&connection=Storage&code=TE9M
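The reason placeholder ({text}) in the terminate_url, and likewise in the rewind_url, is free text, so it needs to be percent-encoded before substitution. A minimal sketch using only the standard library, which encodes every byte outside the RFC 3986 unreserved set; a real implementation would more likely use an existing crate such as percent-encoding or url, and both helper names are hypothetical.

```rust
/// Percent-encodes every byte that is not an RFC 3986 unreserved character.
fn percent_encode(text: &str) -> String {
    let mut encoded = String::with_capacity(text.len());
    for byte in text.bytes() {
        match byte {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                encoded.push(byte as char)
            }
            // Everything else, including spaces and multi-byte UTF-8, becomes %XX.
            _ => encoded.push_str(&format!("%{:02X}", byte)),
        }
    }
    encoded
}

/// Hypothetical helper: fills the {text} placeholder of a terminate or rewind URL.
fn format_reason_url(template: &str, reason: &str) -> String {
    template.replace("{text}", &percent_encode(reason))
}
```

Encoding matters here because a reason such as "a&b" would otherwise be parsed as the start of a new query parameter.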
The rewind_url URL will be used to restore a failed orchestration instance to a running state by replaying the most recent failed operations.
See the API documentation for parameters and options.
Note: the instanceId parameter will already be populated in the URL given to the client. The client will also be passed the current orchestration instance identifier so that it can remove the instance identifier from the URL, if needed.
An example rewind_url used to configure an orchestration client (where INSTANCEID is the current orchestration instance identifier):
http://localhost:8080/runtime/webhooks/durabletask/instances/INSTANCEID/rewind?reason={text}&taskHub=DurableFunctionsHub&connection=Storage&code=TE9M
The purge_history_url URL will be used to purge the history and related data of an orchestration instance.
See the API documentation for parameters and options.
Note: the instanceId parameter will already be populated in the URL given to the client. The client will also be passed the current orchestration instance identifier so that it can remove the instance identifier from the URL, if needed.
An example purge_history_url used to configure an orchestration client (where INSTANCEID is the current orchestration instance identifier):
http://localhost:8080/runtime/webhooks/durabletask/instances/INSTANCEID?taskHub=DurableFunctionsHub&connection=Storage&code=TE9M
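Each management URL is used with a specific HTTP method in the Durable Functions HTTP API: GET for querying status; POST for raising an event, terminating, and rewinding; and DELETE for purging history, on the same URL shape as the status query. Below is a sketch of how the client might pair its configured URLs with those methods; ManagementUrls and ManagementOperation are hypothetical names, not types from the azure-functions-durable crate.

```rust
/// Hypothetical container for the management URLs passed to the client.
struct ManagementUrls {
    status_query_url: String,
    raise_event_url: String,
    terminate_url: String,
    rewind_url: String,
    purge_history_url: String,
}

/// Hypothetical list of the management operations the client performs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ManagementOperation {
    QueryStatus,
    RaiseEvent,
    Terminate,
    Rewind,
    PurgeHistory,
}

impl ManagementUrls {
    /// Returns the HTTP method and the (not yet formatted) URL for an operation.
    fn request(&self, operation: ManagementOperation) -> (&'static str, &str) {
        match operation {
            ManagementOperation::QueryStatus => ("GET", self.status_query_url.as_str()),
            ManagementOperation::RaiseEvent => ("POST", self.raise_event_url.as_str()),
            ManagementOperation::Terminate => ("POST", self.terminate_url.as_str()),
            ManagementOperation::Rewind => ("POST", self.rewind_url.as_str()),
            ManagementOperation::PurgeHistory => ("DELETE", self.purge_history_url.as_str()),
        }
    }
}
```

Keeping the method next to the URL in one place makes it harder to accidentally send a DELETE to the status URL when a GET was intended, since both share the same shape.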
The methods on OrchestrationClient will be the same as those on the DurableOrchestrationClient binding type.
async fn query_status(&self, instance_id: &str, show_history: bool, show_history_output: bool, show_input: bool) -> Result<impl IntoIterator<Item = OrchestrationStatusQueryResult>, OrchestrationClientError>
async fn query_all_statuses(&self) -> Result<impl IntoIterator<Item = OrchestrationStatusQueryResult>, OrchestrationClientError>
async fn query_statuses<S>(&self, created_time_from: DateTime<Utc>, created_time_to: Option<DateTime<Utc>>, runtime_statuses: S) -> Result<impl IntoIterator<Item = OrchestrationStatusQueryResult>, OrchestrationClientError> where S: IntoIterator<Item = OrchestrationRuntimeStatus>
async fn purge_history(&self, instance_id: &str) -> Result<PurgeHistoryResult, OrchestrationClientError>
async fn purge_histories<S>(&self, created_time_from: DateTime<Utc>, created_time_to: Option<DateTime<Utc>>, runtime_statuses: S) -> Result<PurgeHistoryResult, OrchestrationClientError> where S: IntoIterator<Item = OrchestrationRuntimeStatus>
async fn raise_event<D>(&self, instance_id: &str, event_name: &str, data: D) -> Result<(), OrchestrationClientError> where D: Into<serde_json::Value>
async fn raise_event_for_hub<D>(&self, task_hub: &str, instance_id: &str, event_name: &str, data: D) -> Result<(), OrchestrationClientError> where D: Into<serde_json::Value>
async fn rewind(&self, instance_id: &str, reason: &str) -> Result<(), OrchestrationClientError>
async fn start_new<D>(&self, function_name: &str, instance_id: Option<String>, data: D) -> Result<String, OrchestrationClientError> where D: Into<serde_json::Value>
async fn terminate(&self, instance_id: &str, reason: &str) -> Result<(), OrchestrationClientError>