@peterhuene
Last active July 28, 2019 06:31
Durable Functions for Rust

Durable Functions for Rust will be a port of the Durable Functions implementation for JavaScript, but with an API that makes sense for Rust.

See the Durable Functions Overview documentation for more information about Durable Functions.

Getting Started

The repository we'll be using for upstream is Azure Functions for Rust. Please fork this repository to your GitHub account.

The branch we'll be using is the durable-functions branch. When implementing work, please branch from the durable-functions branch and submit a pull request to the durable-functions branch on the upstream Azure Functions for Rust repository to move completed work into upstream.

This feature will make use of the unstable async/await feature of Rust. Therefore, a nightly Rust compiler is required. When the implementation of this feature is completed and it's time to move the work into the dev branch, the appropriate feature gating will be performed so that users can continue to use a stable Rust compiler if they do not want to use the Durable Functions feature.

Installation

  1. Install rustup.
  2. rustup install nightly && rustup default nightly
  3. rustup component add rustfmt (for cargo fmt to format your source code)
  4. rustup component add clippy (for cargo clippy to catch linter errors)
  5. Install the requirements listed for Azure Functions for Rust.
  6. Check out the durable-functions branch: git checkout durable-functions.
  7. Install the local Azure Functions for Rust SDK: cargo install -f --path azure-functions-sdk.

Running the Durable Functions Example

Run the "Durable Functions" example:

  1. cd examples/durable-functions
  2. func settings add AzureWebJobsStorage <connection_string> (note: it should be possible to use the Storage Emulator instead; I tend to use an actual Azure Storage account. If you are using an actual storage account, please do not commit changes to local.settings.json, otherwise it'll get flagged as a leaked secret and you'll need to recycle your access key.)
  3. cargo func run -- --features unstable

Note: the sample is a work in progress and is not yet doing much of anything because there is no orchestration client or orchestration context implementation in place.

Debugging the Sample

The sample should have a VS Code launch profile named "Debug" that uses the CodeLLDB extension.

  1. cd examples/durable-functions
  2. code .
  3. Debug -> Start Debugging

Viewing Generated Code

If you're curious to see what code is getting generated when you build an Azure Function, use the Cargo Expand tool.

Implementation

There are three parts of the implementation required to port Durable Functions to Rust:

  • The Bindings for Durable Functions that developers will use in their Azure Functions.
  • The Orchestration Client that interacts with the Durable Functions HTTP API for starting orchestrations and querying their status. This will be ported from the JavaScript implementation.
  • Support for creating orchestration and activity functions in the Azure Functions for Rust SDK (e.g. cargo func new orchestration and cargo func new activity).

Bindings

There are three Durable Functions bindings:

  • DurableOrchestrationClient
  • DurableOrchestrationContext
  • DurableActivityContext

The bindings will be implemented as part of the azure-functions crate.

DurableOrchestrationClient Binding

This binding is responsible for providing an instance of an Orchestration Client to enable users to schedule orchestrations.

The binding data will provide two sets of URLs that will be passed to configure the Orchestration Client:

  • Creation URLs - used to create new orchestrations.
  • Management URLs - used to query the status and manage existing orchestrations.

The binding type will internally keep an OrchestrationClient from the azure-functions-durable crate and forward its methods to the client implementation.

Example

Typically, developers will use the DurableOrchestrationClient binding to start an orchestration:

use azure_functions::{
    bindings::{DurableOrchestrationClient, HttpRequest, HttpResponse},
    func,
};
use serde_json::Value;

#[func]
pub async fn start(_req: HttpRequest, client: DurableOrchestrationClient) -> HttpResponse {
    match client.start_new("hello_world", None, Value::Null).await {
        Ok(_) => "Orchestration started.".into(),
        Err(e) => format!("Failed to start orchestration: {}", e).into(),
    }
}

Methods

See the C# documentation for information about these methods.

fn task_hub(&self) -> &str
async fn query_status(&self, instance_id: &str, show_history: bool, show_history_output: bool, show_input: bool) -> Result<impl Iterator<Item = OrchestrationStatusQueryResult>, OrchestrationClientError>
async fn query_all_statuses(&self) -> Result<impl Iterator<Item = OrchestrationStatusQueryResult>, OrchestrationClientError>
async fn query_statuses<S>(&self, created_time_from: DateTime<Utc>, created_time_to: Option<DateTime<Utc>>, runtime_statuses: S) -> Result<impl Iterator<Item = OrchestrationStatusQueryResult>, OrchestrationClientError> where S: IntoIterator<Item = OrchestrationRuntimeStatus> 
async fn purge_history(&self, instance_id: &str) -> Result<PurgeHistoryResult, OrchestrationClientError>
async fn purge_histories<S>(&self, created_time_from: DateTime<Utc>, created_time_to: Option<DateTime<Utc>>, runtime_statuses: S) -> Result<PurgeHistoryResult, OrchestrationClientError> where S: IntoIterator<Item = OrchestrationRuntimeStatus> 
async fn raise_event<D>(&self, instance_id: &str, event_name: &str, data: D) -> Result<(), OrchestrationClientError> where D: Into<serde_json::Value>
async fn raise_event_for_hub<D>(&self, task_hub: &str, instance_id: &str, event_name: &str, data: D) -> Result<(), OrchestrationClientError> where D: Into<serde_json::Value> 
async fn rewind(&self, instance_id: &str, reason: &str) -> Result<(), OrchestrationClientError>
async fn start_new<D>(&self, function_name: &str, instance_id: Option<String>, data: D) -> Result<String, OrchestrationClientError> where D: Into<serde_json::Value>
async fn terminate(&self, instance_id: &str, reason: &str) -> Result<(), OrchestrationClientError>

DurableOrchestrationContext Binding

This binding is used for triggering orchestration functions.

It will be responsible for deserializing the binding context data, including the execution history, and serializing the execution result.

This implementation will rely heavily on the JavaScript implementation.

Orchestration functions in Rust must:

  • Be an async function.
  • Take only a single parameter of type DurableOrchestrationContext.
  • Have no return type or return OrchestrationOutput.

These requirements will be checked at compile-time.

Additionally, orchestration functions are not permitted to perform asynchronous I/O operations (despite being an async function). This requirement will be checked at runtime when the user's function is polled by having a Waker implementation that will panic if used.

Example

This example runs three activities in parallel and returns an array of the activity outputs:

use azure_functions::{bindings::DurableOrchestrationContext, durable::OrchestrationOutput, func};
use log::warn;
use serde_json::Value;

#[func]
pub async fn hello_world(mut context: DurableOrchestrationContext) -> OrchestrationOutput {
    if !context.is_replaying() {
        warn!("Orchestration started at {}.", context.current_time());
    }

    let activities = vec![
        context.call_activity("say_hello", "Tokyo"),
        context.call_activity("say_hello", "London"),
        context.call_activity("say_hello", "Seattle"),
    ];

    let result: Vec<_> = context
        .join_all(activities)
        .await
        .into_iter()
        .map(|r| r.unwrap_or_else(|e| Value::from(format!("Activity failed: {}", e))))
        .collect();

    if !context.is_replaying() {
        warn!("Orchestration completed at {}.", context.current_time());
    }

    result.into()
}

Input Data

The binding data will be a JSON string containing the current orchestration execution history.

See the JavaScript definitions of the various history types that will need to be ported to Rust for deserialization.

Example Input Data

An example of the input data given at the very start of an orchestration:

{
   "history":[
      {
         "EventType":12,
         "EventId":-1,
         "IsPlayed":false,
         "Timestamp":"2019-07-22T06:20:29.488203Z"
      },
      {
         "OrchestrationInstance":{
            "InstanceId":"5238cdd354f4418687aeb6775294ba27",
            "ExecutionId":"60353e751ebb44268e6230b5dda25b04"
         },
         "EventType":0,
         "ParentInstance":null,
         "Name":"hello_world",
         "Version":"",
         "Input":"{}",
         "Tags":null,
         "EventId":-1,
         "IsPlayed":false,
         "Timestamp":"2019-07-22T06:20:29.101967Z"
      }
   ],
   "input":{

   },
   "instanceId":"5238cdd354f4418687aeb6775294ba27",
   "isReplaying":false,
   "parentInstanceId":null
}
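
As a starting point for porting the history types, the EventType numbers in the input data could be mapped onto a Rust enum. This is a minimal sketch rather than the planned implementation: the variant names and numeric values follow the history event type enum as understood from the JavaScript implementation and should be verified against it, only a subset is shown, and the real types would likely be deserialized (for example with serde) instead of converted by hand.

```rust
use std::convert::TryFrom;

/// Subset of the history event types (names and values are assumptions
/// based on the JavaScript implementation; verify before relying on them).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EventType {
    ExecutionStarted = 0,
    TaskScheduled = 4,
    TaskCompleted = 5,
    TaskFailed = 6,
    OrchestratorStarted = 12,
    OrchestratorCompleted = 13,
}

impl TryFrom<i64> for EventType {
    /// The unrecognized numeric value is returned as the error.
    type Error = i64;

    fn try_from(value: i64) -> Result<Self, Self::Error> {
        match value {
            0 => Ok(EventType::ExecutionStarted),
            4 => Ok(EventType::TaskScheduled),
            5 => Ok(EventType::TaskCompleted),
            6 => Ok(EventType::TaskFailed),
            12 => Ok(EventType::OrchestratorStarted),
            13 => Ok(EventType::OrchestratorCompleted),
            other => Err(other),
        }
    }
}
```

Under these assumptions, the two events in the example input above would map to OrchestratorStarted (12) followed by ExecutionStarted (0).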

Output Data

The DurableOrchestrationContext binding should serialize into a JSON string representing the execution result of the current invocation of the orchestration function.

See the execution result schema document for the expected output data format.

Methods

See the C# documentation for information about these methods.

fn current_time(&self) -> DateTime<Utc>
fn instance_id(&self) -> &str
fn parent_instance_id(&self) -> Option<&str>
fn is_replaying(&self) -> bool
fn call_activity<D>(&mut self, activity_name: &str, data: D) -> impl Future<Output = Result<serde_json::Value, String>> where D: Into<serde_json::Value>
fn call_activity_with_retry<D>(&mut self, activity_name: &str, retry_options: RetryOptions, data: D) -> impl Future<Output = Result<serde_json::Value, String>> where D: Into<serde_json::Value>
fn call_sub_orchestrator<D>(&mut self, function_name: &str, instance_id: Option<String>, data: D) -> impl Future<Output = Result<serde_json::Value, String>> where D: Into<serde_json::Value>
fn call_sub_orchestrator_with_retry<D>(&mut self, function_name: &str, instance_id: Option<String>, retry_options: RetryOptions, data: D) -> impl Future<Output = Result<serde_json::Value, String>> where D: Into<serde_json::Value>
fn continue_as_new<D>(&mut self, data: D, preserve_unprocessed_events: bool) -> impl Future<Output = ()> where D: Into<serde_json::Value>
fn create_timer<S>(&mut self, fire_at: DateTime<Utc>, state: S) -> impl Future<Output = Result<serde_json::Value, String>> where S: Into<serde_json::Value>
fn input(&self) -> serde_json::Value
fn new_guid(&self) -> uuid::Uuid
fn set_custom_status<S>(&mut self, status: S) where S: Into<serde_json::Value> 
fn wait_for_event(&mut self, name: &str) -> impl Future<Output = Result<serde_json::Value, String>>
fn wait_for_event_with_timeout(&mut self, name: &str) -> impl Future<Output = Result<Option<serde_json::Value>, String>>
fn join_all<I>(&self, iter: I) -> JoinAll<I::Item>
fn select_all<I>(&self, iter: I) -> SelectAll<<<I as IntoIterator>::Item as IntoFuture>::Future> 

Porting from JavaScript

For implementing most of these methods, consult the JavaScript implementation.

The generator loop from the JavaScript implementation will be driven by a Rust Future instead of a generator. The Future-returning methods of the DurableOrchestrationContext should return Poll::Ready when the requested action was previously scheduled and has completed (whether it succeeded or failed). They should return Poll::Pending if the action has not yet been scheduled or has not completed.
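
The Ready/Pending rule for these methods could be captured by a small future type. The following is a simplified sketch, not the actual implementation: this ActionFuture only holds an optional, already-resolved result, and the generic parameter T stands in for Result<serde_json::Value, String> so the example needs nothing beyond the standard library.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Simplified stand-in for the future returned by the context methods.
/// `result` is `Some` only when the execution history already contains
/// the outcome (success or failure) of the requested action.
struct ActionFuture<T> {
    result: Option<T>,
}

impl<T> ActionFuture<T> {
    fn new(result: Option<T>) -> Self {
        ActionFuture { result }
    }
}

impl<T: Unpin> Future for ActionFuture<T> {
    type Output = T;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<T> {
        match self.result.take() {
            // The action completed in a previous invocation.
            Some(value) => Poll::Ready(value),
            // Not scheduled or not completed yet. The waker is never
            // registered because nothing will wake this future later.
            None => Poll::Pending,
        }
    }
}
```

Because the waker is deliberately ignored, a pending ActionFuture never resumes within the same invocation; progress only happens when the function is invoked again with a longer history.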

The Future representing the user's orchestration function will be polled exactly once and the provided std::task::Context will not support asynchronous I/O operations; the underlying Waker implementation will panic if used. The Future will progress as far as it can given the current execution history, building a list of the actions being performed as it goes (including those that have already completed). Whenever the user's function awaits a Future returned from the DurableOrchestrationContext that has not yet been scheduled, Poll::Pending will be returned, causing Azure Functions for Rust to return the execution result for the current invocation of the orchestration function. If the Future representing the user's function returns Poll::Ready, the execution result is marked as "done".
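
The single poll described above might be sketched as follows. Everything here is illustrative: ExecutionResult, PanicOnWake, and run_once are hypothetical names, actions are plain strings rather than real action types, and the output is a String instead of a JSON value. The waker is built with the standard library's Wake trait and panics if it is ever woken.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical, simplified execution result for one invocation.
#[derive(Debug, PartialEq)]
struct ExecutionResult {
    done: bool,
    actions: Vec<String>,
    output: Option<String>,
}

/// Waker that panics when woken, since waking implies asynchronous I/O.
struct PanicOnWake;

impl Wake for PanicOnWake {
    fn wake(self: Arc<Self>) {
        panic!("orchestration functions must not perform asynchronous I/O");
    }
}

/// Polls the orchestration exactly once and reports how far it got.
/// `actions` is the list the context methods append to while the
/// orchestration runs.
fn run_once<F>(orchestration: F, actions: Rc<RefCell<Vec<String>>>) -> ExecutionResult
where
    F: Future<Output = String>,
{
    let waker = Waker::from(Arc::new(PanicOnWake));
    let mut cx = Context::from_waker(&waker);
    let mut orchestration = Box::pin(orchestration);

    let output = match orchestration.as_mut().poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        // Suspended on an action that is not in the history yet.
        Poll::Pending => None,
    };

    ExecutionResult {
        done: output.is_some(),
        actions: actions.borrow().clone(),
        output,
    }
}
```

An orchestration that suspends on an unscheduled action yields done = false together with the actions recorded so far; one that runs to completion yields done = true and its output.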

Porting Example

Here is the callActivity implementation in JavaScript:

private callActivity(state: HistoryEvent[], name: string, input?: unknown): Task {
    const newAction = new CallActivityAction(name, input);

    const taskScheduled = this.findTaskScheduled(state, name);
    const taskCompleted = this.findTaskCompleted(state, taskScheduled);
    const taskFailed = this.findTaskFailed(state, taskScheduled);
    this.setProcessed([taskScheduled, taskCompleted, taskFailed]);

    if (taskCompleted) {
        const result = this.parseHistoryEvent(taskCompleted);

        return new Task(true, false, newAction, result, taskCompleted.Timestamp, taskCompleted.TaskScheduledId);
    } else if (taskFailed) {
        return new Task(
            true,
            true,
            newAction,
            taskFailed.Reason,
            taskFailed.Timestamp,
            taskFailed.TaskScheduledId,
            new Error(taskFailed.Reason),
        );
    } else {
        return new Task(
            false,
            false,
            newAction,
        );
    }
}

For Rust, a port of this method might look like:

/// Schedules an activity function for execution.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub fn call_activity<D>(
    &mut self,
    activity_name: &str,
    data: D,
) -> impl Future<Output = Result<Value, String>> + OrchestrationFuture
where
    D: Into<Value>,
{
    let mut state = self.state.borrow_mut();

    // Push the action on the execution result
    state.result.borrow_mut().push_action(Action::CallActivity {
        function_name: activity_name.to_string(),
        input: data.into(),
    });

    let mut result: Option<Result<Value, String>> = None;

    // Attempt to resolve the activity
    if let Some(scheduled) = state.find_scheduled_activity(activity_name) {
        scheduled.is_processed = true;

        let id = scheduled.event_id;
        if let Some(completed) = state.find_completed_activity(id) {
            completed.is_processed = true;
            result = Some(Ok(completed
                .result
                .as_ref()
                .map(|s| from_str(&s).unwrap_or_default())
                .unwrap_or(Value::Null)));
        } else if let Some(failed) = state.find_failed_activity(id) {
            failed.is_processed = true;
            result = Some(Err(failed.reason.clone().unwrap_or_default()));
        }
    }

    ActionFuture::new(result, self.state.clone())
}

DurableActivityContext Binding

This binding is used for triggering activity functions.

Unlike orchestration functions, activity functions are essentially normal Azure Functions that can have any number of additional input and output bindings.

Activity functions in Rust must:

  • Not have a return value, return () in the $return binding position, or return ActivityOutput in the $return binding position. Additional output bindings are supported.

These requirements will be checked at compile-time.

The ActivityOutput type will not be a "real" binding; it will not produce a binding on $return, but will set the return value of the function.

Example

This example sets the output of the activity to a "hello" message:

use azure_functions::{bindings::DurableActivityContext, durable::ActivityOutput, func};

#[func]
pub fn say_hello(context: DurableActivityContext) -> ActivityOutput {
    format!(
        "Hello {}!",
        context.input.as_str().expect("expected a string input")
    )
    .into()
}
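
The .into() call in the example relies on ActivityOutput being constructible from common value types. A minimal sketch of how such conversions could look is shown below; this ActivityOutput is a hypothetical stand-in that wraps a String, whereas the real type would presumably wrap a JSON value and accept more input types.

```rust
/// Hypothetical stand-in for the activity return value.
#[derive(Debug, PartialEq)]
struct ActivityOutput(String);

impl From<String> for ActivityOutput {
    fn from(value: String) -> Self {
        ActivityOutput(value)
    }
}

impl From<&str> for ActivityOutput {
    fn from(value: &str) -> Self {
        ActivityOutput(value.to_owned())
    }
}

/// Same shape as the activity above, with the input passed directly
/// instead of through a binding.
fn say_hello(input: &str) -> ActivityOutput {
    format!("Hello {}!", input).into()
}
```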

Methods

See the C# documentation for information about these methods.

fn instance_id(&self) -> &str
fn input(&self) -> serde_json::Value

Orchestration Client

The orchestration client will be implemented by the OrchestrationClient type in the azure-functions-durable crate. It will be responsible for communicating with the Durable Functions extension via the HTTP API.

It is recommended to use hyper for the HTTP communication. However, a version of hyper based on std::future has not yet been released. For the initial implementation of the orchestration client, it is acceptable to reference the master branch of the hyper repository in Cargo.toml rather than the currently released version.

We may use the JavaScript implementation of the client to serve as a basis for the implementation of the Rust OrchestrationClient.

Configuration

The client will be configured with a set of URLs that should be used to communicate with the Durable Functions extension. For Azure Functions for Rust, this information will be passed via the DurableOrchestrationClient binding. The client must manipulate the URLs based on data passed to it before using the URLs with the HTTP client.

Creation URLs

The client will be configured with two URLs used for creating new orchestration instances.

Create New Instance URL

The create_new_instance_url URL will be used for requesting that the Durable Functions extension create a new orchestration.

Example

Here is an example provided by a locally running Durable Function extension:

http://localhost:8080/runtime/webhooks/durabletask/orchestrators/{functionName}[/{instanceId}]?code=TE9M

And when formatted by OrchestrationClient for creating a new instance for the hello_world orchestration function with a default instance id:

http://localhost:8080/runtime/webhooks/durabletask/orchestrators/hello_world?code=TE9M

HTTP Method

POST

Request content

The content type should be application/json and the request body should be the input value to the orchestration function.

Parameters

  • functionName - The name of the orchestration function (required).
  • instanceId - The instance identifier to use for the orchestration (optional).
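
Filling in the URL template could be as simple as two string replacements. The sketch below is an assumption about how OrchestrationClient might do it, and format_create_url is a hypothetical helper; it also does not percent-encode its arguments, which a real implementation would likely need to do.

```rust
/// Expands the `{functionName}` and optional `[/{instanceId}]` parts of
/// the creation URL template.
fn format_create_url(template: &str, function_name: &str, instance_id: Option<&str>) -> String {
    // The bracketed segment is dropped entirely when no id is given,
    // letting the extension choose a default instance id.
    let instance_segment = match instance_id {
        Some(id) => format!("/{}", id),
        None => String::new(),
    };

    template
        .replace("{functionName}", function_name)
        .replace("[/{instanceId}]", &instance_segment)
}
```

With the example template above, passing "hello_world" and no instance id produces the formatted URL shown earlier in this section.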

Create New Instance and Wait URL

The create_new_instance_and_wait_url URL will be used for requesting that the Durable Functions extension create a new orchestration and wait for it to complete. Note: this URL is not used in the JavaScript implementation; it may not be needed for the Rust implementation either.

Example

Here is an example provided by a locally running Durable Function extension:

http://localhost:8080/runtime/webhooks/durabletask/orchestrators/{functionName}[/{instanceId}]?timeout={timeoutInSeconds}&pollingInterval={intervalInSeconds}&code=TE9M

And when formatted by OrchestrationClient for creating a new instance for the hello_world orchestration function with a default instance id and with a timeout of one hour (3600 seconds) that polls the orchestration status every 30 seconds:

http://localhost:8080/runtime/webhooks/durabletask/orchestrators/hello_world?timeout=3600&pollingInterval=30&code=TE9M

HTTP Method

POST

Request content

The content type should be application/json and the request body should be the input value to the orchestration function.

Parameters

  • functionName - The name of the orchestration function (required).
  • instanceId - The instance identifier to use for the orchestration (optional).
  • timeoutInSeconds - The timeout of the wait, in seconds (required).
  • intervalInSeconds - The poll interval, in seconds (required).

Management URLs

The management URLs will be used to manage existing orchestrations.

See the HTTP API URL discovery section of the documentation for an example of the data that the client will need to be configured with.

The data provided in the "example response" is what the client will need to operate.

Status Query URL

The status_query_url URL will be used to query the status of existing orchestration(s).

See the API documentation for parameters and options.

Note: the instanceId parameter will already be populated in the URL given to the client. The client will also be passed the current orchestration instance identifier so that the client can remove the instance identifier from the URL, if needed.

Example

An example status_query_url used to configure an orchestration client (where INSTANCEID is the current orchestration instance identifier):

http://localhost:8080/runtime/webhooks/durabletask/instances/INSTANCEID?taskHub=DurableFunctionsHub&connection=Storage&code=TE9M
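
Since the management URLs arrive with the current instance identifier already filled in, the client has to swap or strip that path segment before use. One possible approach is sketched below; retarget_instance is a hypothetical helper, and matching on the /instances/ prefix is an assumption made so that the identifier is not accidentally replaced elsewhere in the URL.

```rust
/// Replaces the pre-populated instance id in a management URL with
/// `target_id`, or removes the segment entirely when `target_id` is `None`.
fn retarget_instance(url: &str, current_id: &str, target_id: Option<&str>) -> String {
    let current_segment = format!("/instances/{}", current_id);
    let target_segment = match target_id {
        Some(id) => format!("/instances/{}", id),
        None => String::from("/instances"),
    };

    // Only the first occurrence is replaced; the query string is untouched.
    url.replacen(current_segment.as_str(), target_segment.as_str(), 1)
}
```

Passing None leaves a URL ending in /instances followed by the query string, which is the shape a query across all instances would need.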

Send Event URL

The raise_event_url URL will be used to raise an external event for an orchestration.

See the API documentation for parameters and options.

Note: the instanceId parameter will already be populated in the URL given to the client. The client will also be passed the current orchestration instance identifier so that the client can remove the instance identifier from the URL, if needed.

Example

An example raise_event_url used to configure an orchestration client (where INSTANCEID is the current orchestration instance identifier):

http://localhost:8080/runtime/webhooks/durabletask/instances/INSTANCEID/raiseEvent/{eventName}?taskHub=DurableFunctionsHub&connection=Storage&code=TE9M

Terminate URL

The terminate_url URL will be used to terminate an orchestration.

See the API documentation for parameters and options.

Note: the instanceId parameter will already be populated in the URL given to the client. The client will also be passed the current orchestration instance identifier so that the client can remove the instance identifier from the URL, if needed.

Example

An example terminate_url used to configure an orchestration client (where INSTANCEID is the current orchestration instance identifier):

http://localhost:8080/runtime/webhooks/durabletask/instances/INSTANCEID/terminate?reason={text}&taskHub=DurableFunctionsHub&connection=Storage&code=TE9M
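
The reason text ends up in the query string, so it presumably needs to be percent-encoded before it replaces the {text} placeholder. The sketch below uses only the standard library; percent_encode and format_reason_url are hypothetical helpers, and a real implementation might prefer an existing URL-encoding crate.

```rust
/// Percent-encodes every byte except the RFC 3986 unreserved characters.
fn percent_encode(input: &str) -> String {
    let mut encoded = String::with_capacity(input.len());
    for byte in input.bytes() {
        match byte {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                encoded.push(byte as char)
            }
            _ => encoded.push_str(&format!("%{:02X}", byte)),
        }
    }
    encoded
}

/// Substitutes the encoded reason for the `{text}` placeholder.
fn format_reason_url(template: &str, reason: &str) -> String {
    template.replace("{text}", &percent_encode(reason))
}
```

The same substitution would apply to any other management URL that carries a reason={text} parameter.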

Rewind URL

The rewind_url URL will be used to restore a failed orchestration instance into a running state by replaying the most recent failed operations.

See the API documentation for parameters and options.

Note: the instanceId parameter will already be populated in the URL given to the client. The client will also be passed the current orchestration instance identifier so that the client can remove the instance identifier from the URL, if needed.

Example

An example rewind_url used to configure an orchestration client (where INSTANCEID is the current orchestration instance identifier):

http://localhost:8080/runtime/webhooks/durabletask/instances/INSTANCEID/rewind?reason={text}&taskHub=DurableFunctionsHub&connection=Storage&code=TE9M

Purge History URL

The purge_history_url URL will be used to purge the history of an orchestration instance.

See the API documentation for parameters and options.

Note: the instanceId parameter will already be populated in the URL given to the client. The client will also be passed the current orchestration instance identifier so that the client can remove the instance identifier from the URL, if needed.

Example

An example purge_history_url used to configure an orchestration client (where INSTANCEID is the current orchestration instance identifier):

http://localhost:8080/runtime/webhooks/durabletask/instances/INSTANCEID?taskHub=DurableFunctionsHub&connection=Storage&code=TE9M

Methods

The methods on OrchestrationClient will be the same as the DurableOrchestrationClient binding type.

async fn query_status(&self, instance_id: &str, show_history: bool, show_history_output: bool, show_input: bool) -> Result<impl IntoIterator<Item = OrchestrationStatusQueryResult>, OrchestrationClientError>
async fn query_all_statuses(&self) -> Result<impl IntoIterator<Item = OrchestrationStatusQueryResult>, OrchestrationClientError>
async fn query_statuses<S>(&self, created_time_from: DateTime<Utc>, created_time_to: Option<DateTime<Utc>>, runtime_statuses: S) -> Result<impl IntoIterator<Item = OrchestrationStatusQueryResult>, OrchestrationClientError> where S: IntoIterator<Item = OrchestrationRuntimeStatus>
async fn purge_history(&self, instance_id: &str) -> Result<PurgeHistoryResult, OrchestrationClientError>
async fn purge_histories<S>(&self, created_time_from: DateTime<Utc>, created_time_to: Option<DateTime<Utc>>, runtime_statuses: S) -> Result<PurgeHistoryResult, OrchestrationClientError> where S: IntoIterator<Item = OrchestrationRuntimeStatus>
async fn raise_event<D>(&self, instance_id: &str, event_name: &str, data: D) -> Result<(), OrchestrationClientError> where D: Into<serde_json::Value>
async fn raise_event_for_hub<D>(&self, task_hub: &str, instance_id: &str, event_name: &str, data: D) -> Result<(), OrchestrationClientError> where D: Into<serde_json::Value>
async fn rewind(&self, instance_id: &str, reason: &str) -> Result<(), OrchestrationClientError>
async fn start_new<D>(&self, function_name: &str, instance_id: Option<String>, data: D) -> Result<String, OrchestrationClientError> where D: Into<serde_json::Value>
async fn terminate(&self, instance_id: &str, reason: &str) -> Result<(), OrchestrationClientError>