Skip to content

Instantly share code, notes, and snippets.

@phatboyg
Last active Jan 23, 2020
Embed
What would you like to do?
MassTransit Conductor

Conductor

Conductor is a layer on top of MassTransit for managing, controlling, and monitoring communication between services. Conductor provides a consistent set of capabilities for services and decouples the complexity of routing, failure handling, and monitoring from the service code itself. This allows service visibility and control without requiring services to be modified to apply behaviorial changes related to how those services interact.

Contents

Architecture

Conductor's core architectural principle is extensibility. As a modern architecture built upon the learning from decades of prior art, which includes everything from SOA, the ESB, and even the Atari 2600 game console, Conductor provides a set of atoms which can be composed into a variety of capabilities. These atoms provide the basic building blocks for creating distributed services.

Services that depend on other services are inheritently coupled to those services. Despite the marketecture about microservices being loosely-coupled, they are still coupled to their dependencies. And unlike a monolithic application which is coupled at design-time, development-time, and compile-time, microservices are coupled at run-time, which makes ensuring that all dependencies are available even more complicated than during earlier stages of development.

Conductor consists of several components, each of which has a specific function. The components include:

Service Endpoint

At the base layer, MassTransit has three types of endpoints. Receive Endpoints are used to connect message consumers to the broker and are typically connected to an input queue. Send Endpoints are used to send messages to a specific destination, which is typically a queue but may be another messaging entity supported by the broker. Finally, Publish Endpoints are used to publish messages to connected receive endpoints, creating a copy of the message in each receive endpoints input queue.

Conductor adds a new endpoint type, the Service Endpoint. A service endpoint is a virtual receive endpoint that is implemented by Conductor. The service endpoint details, such as the consumer types, consumed message types, sent and published message types, etc., are communicated by the Steward to the Conductor. This supports service discovery along with the other features.

Steward (Client)

On the client side, the Steward keeps a local copy of the service catalog, and acts as a request client factory. As notifications are published by service endpoints, those services are added to the service catalog, and the state of each service instance (up or down) is cached. As new request clients are created, unknown types are discovered using published events, or via a request to a distributed service catalog (the Conductor Service).

  • The steward address is prefixed with steward- and is generated using the same algorithm as the temporary bus queue name

    If a steward instance loses connection to the broker, but the process does not exist, the previous steward addresses should be sent to the conductor as "now-known-as" redirects to allow the network partition to be resolved automatically.

Conductor (Service)

The conductor service is the coordinator responsible for service discovery as well traffic management, policy, routing, and service endpoint communication.

Steward uses HTTP to communicate with the Conductor Service on port 7580 (default).

  • Steward may use DNS to get initial list of IP addresses that belong to a cluster of Conductors.

    In this case, an address such as conductor.mynetwork.com would resolve to the URI of the conductor's command queue.

    The default address of the conductor is conductor on the same broker instance (host, or host/vhost).

Inspector

The inspector is responsible for authorization of services, endpoints, and access within the data plane of the system. Messages are inspected for content and/or headers, and that data is used to ensure that messages are properly authorized to use requested services.

Service Discovery

The Steward caches every resource used, and actively checks those resources for availability. If a newly discovered resource is found and the service accepts/passes a health check, it is used. Service discovery is managed separately, allowing endpoints to query and maintain health information for each node using request telemetry and active health checks.

Registration Types

When a service is registered, the service’s availability can be specified.

  • Permanent The service remains registered, even when it isn’t available.
  • Transient The service will automatically be unregistered if none of the service endpoints are available.

Availability Check

Before processing a request, a service may do an availability check to see if the response address is still valid. If the response endpoint is no longer available, the service may discard the request if it is no longer required. This is typically used for queries, which do not change data.

For example, GetSendEndpoint() could perform a QueueDeclarePassive, which would only succeed if the exchange or queue is still present, and fail if the queue or exchange has been removed. This would prevent it from being redeclared, and discard the response rather than sending it into the void.

Tombstone Period

When an endpoint is marked unavailable, a default tombstone period is used. If a new endpoint becomes available that replaces the previous endpoint, requests or responses sent to the previous endpoint may be redirected to the newly registered endpoint. This may be useful when a bus instance loses a connection, and a new response endpoint is created. The responses could be reattached to a new endpoint, allowing them to be received.

Treat a newly registered endpoint as on probation, such as a recently closed circuit break, until it has proven itself a viable endpoint.

Steward/Conductor Protocol

The messages used to communicate amongst the smart endpoints, the central configuration services, and the service endpoints.

Service endpoints may communicate directly with the Steward endpoints by maintaining a list of recent endpoints and sending availability events directly to those Steward endpoints, as well as updating the cache. This is probably better done as a publish/subscribe, so that all Steward endpoints get the update, as well as the shared storage/caching engine. Only trust the linear timestamp of the source system, do not use Now anywhere.

Health Monitoring

Each Steward monitors service health, ensuring that when the service is called it will likely succeed.

External Resource Policy

Policies can be assigned to external resources, such as a remote HTTP endpoint (aka, REST service). These policy may include failure condition checks, rate limits, concurrency limits, etc.

  • Timeout
  • Retry
  • Redirect
  • Block / Allow

Resource Availability

The availability of each resource is tracked, allowing stewards to make intelligent decisions about whether or not it should send a request to a service at any point in time.

There are several types of availability, each of which has a different strategy.

Scheduled Availability

When a resource is only available during a known schedule, that schedule is used to indicate service availability.

Toggled Availability

A manual, on/off switch, typically in response to an unexpected event (FIRE!). This could also be a "feature toggle" implemented as an availability flag.

Limited Availability

When a resource has availability constraints, such as a limited number of calls within a time period (requests/second) or a longer than normal response duration (slow).

  • Manually specified, such as during a service upgrade, rolling outage, etc.
  • Automatically, using metrics
    • Slow response time
    • Faults

Transient Availability

When a resource becomes unavailable temporarily. This is typically unexpected, and is triggered by a series of consecutive faults. Usually handled via a circuit breaker, but in this case using that information to consider other options beyond just fast fail.

  • Fault Detection
    • Consecutive Faults
  • Snooze
    • Disable for n minutes
    • Manual

Resource Templates

Resource templates are used to match resources, which can be used to verify and track resource availability without requiring every single resource to be configured.

  • Host name (which can be resolved, or not resolved, and tracked to avoid sending requests to invalid entries) The host name may be invalid, in which case the fact is invalid is tracked.
  • Email Address (which can be pattern match validated, or actually reported as invalid through a bounce)
  • IP Address (okay, not a DNS name, but is the network address available)
  • URI (match a full network address, for a resource)

Custom Resources

A custom resource allows a service to configure a property of a service, and use that property to distinguish requests. For example, consider the command to check the inventory status of an item in a warehouse:

public interface GetItemInventoryStatus
{
    string ItemNumber { get; }
    string WarehouseId { get; }
}

The service would be able to specify, in the endpoint configuration, the WarehouseId property has a distinct meaning in the service's behavior. In this example, a fault talking to one warehouse doesn't mean that every request would fault, so track faults separately using the property.

Fault Injection

Faults can be injected, similar to what Chaos Monkey was built to do, at service endpoints or external resources.

  • HTTP Abort (400, or specified status code)

  • HTTP Slowdown (delay request)

  • Consumer Fault (Fault<T>)

  • Consumer Timeout

  • Consumer Ghost (drop message, silently)

Traffic Management

Traffic management decouples client and server endpoints, allowing requests to be routed according to type, content, or policy.

A steward can also be configured to send a percentage of traffic to a service variation. This makes it possible to test a new service version without taking the whole load. Traffic tagged with a specific variation identifier can also be routed to that variation's service instance, so that developers can test a single service within the larger production and/or test environment.

Traffic management would also allow Courier to direct the flow of routing slips to services, based upon the activity arguments, types, and names, and should be able to register activities and discover them when using the routing slip builder to add activities to the routing slip.

Type Routing

Uses the message type to determine the service endpoint address.

Courier Activity Routing

When sending a routing slip to the next activity, use the routing slip itinerary to determine the destination address for the next activity. If the activity service endpoint is unavailable at that time, allow policy to be applied.

When creating an intinerary using the routing slip builder, use Steward to return the execute address of the activity based upon the activity name and/or argument type.

Steward may also know enough about the activities to determine if the itinerary includes all needed arguments and variables. Steward should be able to learn (or be told):

  • The activity's service endpoint (for execute) details, including destination address
  • The activity's argument type, properties, etc.
  • The variables added to the routing slip by the activity

Using this information, when a builder is used, it can validate the routing slip to ensure that it's in a sufficient order, has all the required variables (or has the activities which add the required variables).

Event Handling

Traffic management also includes event handling when services are unavailable, under stress, or otherwise non-optimal.

Roadblock

A failing resource, identified by a URI, which if in the path of the policy client should be avoided.

Traffic Jam

A resource that is a bottleneck, either due to a rate limit (static), or the response time of the resource (dynamic).

Detour

Akin to a fallback or rescue, allowing an alternate path for traffic in the case of failures. Defining the behavior when the actions requested of the policy client faults, allowing alternatives to be used up to and perhaps including returning a default response. This is preferable to traffic backing up behind a road block or traffic jam, since it allows an alternative to using the resource which avoids creating increased pressure on the resource.

One detour might be the ability to cache a previous retrieved value, which includes some aspects like:

  • Caching The ability to cache the results from previous requests matching a set of arguments would reduce the load on the service.
    • Normal Lifetime How long an item should be used from the cache under normal operating conditions.
    • Extended/Outage Lifetime How long to keep a value around when the service is unavailable, or exceptional

Rate Limiting / Request Check

Upon the first request to send a message to a service, the client will ask for a token from the service, along with carrying the request message. The service will respond with an allocation of tokens (best-effort, if I ask for 10, and you have 5, so be it, I'll take the 5). Use a sliding window to capture how many requests are being performed per second/minute/interval and request enough to satisfy the next interval, decrementing them as requests are sent.

Environment Versioning

Every configuration change in conductor creates a new version, and previous versions are maintained so that broken changes can be reverted quickly. Each change is also tracked so that a complete change history is available, which may be used for audits or change management purposes.

Variations

Create a variation of an existing system, which allows one or more services to be redirected to separate service instance(s). This allows a service to be tested within an existing system, without requiring a complete duplicate of the system.

For example, if there is a service configured on a queue, creating a variation creates a new id, such as 427. Everything stays the same until Frank (your awesome engineer) decides that he wants to debug that service to reproduce an issue that shows up in production.

By default, the service uses the order-service queue. So production requests are passing through normally, being handled by that service instance. It would be foolish to debug the service in production, since it's handling real traffic, however, the bug only shows up in production. However, since a variation was created (427, remember), it can be used instead.

So Frank fires up Rider, and configures his service to run as variation 427 (I don't know how yet, let's just say it's a command line argument or something). When the service starts, instead of connecting to order-service (because we're using the same conductor/configuration as production), the Get-Configuration 427 returns order-service-427 for the receive endpoint.

So now the service is running, but not receiving any traffic. To test, Frank sends a request via HTTP to the API endpoint, with the X-MassTransit-Variation header set to 427. That API endpoint is smart, knows about MT, and copies that header into the request message, which Steward uses to get the endpoint address for the request. Since 427 matches the lookup value of the variation, it returns order-service-427 instead of the default, and the message goes to Frank's service queue.

The nice part is that since Frank's service is running along with the rest of the system, when the order service needs to check pricing, or allocate inventory, it can call the real inventory service - Frank doesn't have to create a separate one. For services that may have side effects, such as an inventory service (compared to a pricing service that just returns the current price without changing anything), it may make sense to allow variations to have tags associated with them so that services can check for those tags and behave differently. In the case of an inventory service, if the request has tag of {test} it would pretend to allocate the inventory, returning success but not actually doing the work.

Variation Details

Some detailed notes on variations:

  • Each service endpoint is uniquely identified, enclosed in the host/process/etc. in which it's running.
  • All messages published/sent by the service include the variation header
  • Service is considered transient?
  • Filter published events at the consumer, so that if a variation header is present and the consumer is not the matching variation, silenty discard the event. The
  • configuration data should be uses to know if the service has a variation active that matches the header.

Slow Rollout

It may be possible to use variations as a form of slow rollout, sending some percentage of traffic, or traffic with a property associated to a particular set of users/customers/etc. This would allow a custom version of a service to handle traffic for that subset, where the rest of the traffic flows through the default services. This would be akin to a fully distributed feature toggle, which would be like Fooidity on steroids.

Smart Service Endpoint

A smart service endpoint is a receive endpoint which informs the conductor about services, messages, events, etc.

Request Tracking

Conductor should track requests, and be able to retry requests based upon a policy.

Tracking Events

The events used to track requests, which include a combination of steward and conductor generated events, includes:

Event Description
RequestCreated A request was created, identified by a unique RequestId
RequestCompleted A request was successfully completed by a service
RequestFaulted A request faulted, and will not be completed in the future
RequestExpired A request did not complete before the deadline was reached
RequestCanceled A request was canceled. The request may or may not continue to execute, fault, or complete.
RequestAlreadyCompleted A request was attempted to execute but has already completed.

Request State

The state of a request is tracked using a state machine.

public sealed class RequestStateMachine :
    MassTransitStateMachine<RequestState>
{
    public RequestStateMachine()
    {
        InstanceState(x => x.CurrentState, Created, Executing, Completed, Faulted, TimedOut, Expired, Canceled);

        SubState(() => TimedOut, Canceled);
        SubState(() => Expired, Canceled);
    }

    /// <summary>
    /// State when the request has been created
    /// </summary>
    public State Created { get; set; }

    /// <summary>
    /// The request is executing within a consumer
    /// </summary>
    public State Executing { get; set; }

    /// <summary>
    /// The request completed end-to-end
    /// </summary>
    public State Completed { get; set; }

    /// <summary>
    /// The request faulted at the consumer
    /// </summary>
    public State Faulted { get; set; }

    /// <summary>
    /// The request timed out before it could be processed
    /// </summary>
    public State TimedOut { get; set; }

    /// <summary>
    /// The deadline for the request passed, making it no longer relevant
    /// </summary>
    public State Expired { get; set; }

    /// <summary>
    /// The request was canceled by the requestor
    /// </summary>
    public State Canceled { get; set; }
}

public class RequestState :
    SagaStateMachineInstance
{
    /// <summary>
    /// The original RequestId from the request client, which may be waiting for the request
    /// </summary>
    public Guid CorrelationId { get; set; }

    /// <summary>
    /// The current request id, which is used to correlate events back to the state instance
    /// </summary>
    public Guid RequestId { get; set; }

    public int CurrentState { get; set; }

    public DateTime? CreatedAt { get; set; }
    public DateTime? StartedAt { get; set; }
    public DateTime? CompletedAt { get; set; }
    public DateTime? FaultedAt { get; set; }
}
@alexeyzimarev
Copy link

alexeyzimarev commented Nov 26, 2019

I see, so it isn't the library docs, it's the service docs, so the proposal is to include some tooling to generate the AsyncApi specs from the Conductor, right?

@michalpenka
Copy link

michalpenka commented Nov 26, 2019

Correct, sorry for not being clear

Now I would say that should you consider my feature proposal and should it be part of Conductor it shouldn't delay the delivery of Conductor/v6 itself

@phatboyg
Copy link
Author

phatboyg commented Nov 26, 2019

Right, it would be 6.1 or later. The metadata is already a core function of Conductor (ConsumerInfo, ObjectInfo, etc.) so it wouldn't be too hard to use that data to expose the details.

@michalpenka
Copy link

michalpenka commented Nov 26, 2019

Sounds good, thanks for pushing this forward, can't wait for Conductor to be out!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment