Toshtogo

"Aha!"

What is Toshtogo? (the simple version)

A workflow engine for monitoring and orchestrating a graph of processes that depend on each other.

Terms

  • Job
  • Agent
  • Job handler
  • Dependency
  • Graph/"Job Graph"

Context

This is a description of the context from which Toshtogo first emerged as a solution.

If you recognise some of these problems, Toshtogo might be suitable for your environment.

If you're considering another workflow engine, such as Luigi or Airflow, then it's probably also worth looking at Toshtogo.

A graph (rather than a linear sequence) of batch jobs that depend on each other's outputs

We have found Toshtogo to be useful for graphs of dependent processes of any complexity. If you want, it'll work fine with a linear, sequential pipeline of jobs. That said, the first use-case was relatively complex: the daily workflow of a hedge fund.

Steps included gathering, cleansing and transforming data from multiple sources; generating trading signals; sending trades for execution; trade reporting; risk reporting; reconciliations...

The graph of jobs contained relatively complex dependencies:

  • >20 different types of jobs
  • 1,000s of individual jobs run each day as part of the same job graph; the majority were data extraction and transformation jobs
  • Jobs often had multiple dependent jobs waiting for them to complete
  • We had a directed acyclic graph (not a tree) complex enough that we couldn't find a layout algorithm to sensibly visualise it

Dynamic dependencies

It often wasn't known ahead-of-time exactly what dependencies a particular job had.

For example:

  • An easily-understood example from a physical coffee shop (sketched in code after this list): when a customer makes an order, if they order food then we need to pass an order to the kitchen. If they just order coffee, there's no need. The graph of dependencies to complete an order is not known until we've seen the order.
  • In our particular case, when checking our view of our holdings against our broker's view, we sometimes disagreed. In that case, we needed to run a (manual) reconciliation step. If we agreed, there was no manual reconciliation.
  • (with apologies for being esoteric) When calculating VaR using historically simulated price movements, we may need a job to get historical interest rates, but only if our portfolio contains bonds. We don't know whether the historical interest rate job exists until we've looked into our portfolio.
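To make the coffee-shop case concrete, here is a minimal sketch of a handler that only discovers its dependencies once it has seen the order. It is plain Python; `add_dependency` is a hypothetical callback standing in for however a job handler declares a new dependency to the coordinator, not the real Toshtogo client API.

```python
# Minimal sketch of dynamic dependencies, using the coffee-shop example above.
# `add_dependency` is a hypothetical callback standing in for however a job
# handler tells the coordinator it needs another job; not the real Toshtogo API.

def handle_customer_order(order, add_dependency):
    """The dependency graph is only known once we have inspected the order."""
    if order.get("food_items"):
        # Only orders containing food need a kitchen job. Ask the coordinator
        # to create that job and re-run this handler once it has completed.
        return add_dependency({
            "job_type": "kitchen-order",
            "request_body": {"items": order["food_items"]},
        })

    # A coffee-only order has no further dependencies; complete immediately.
    return {"status": "complete", "drinks": order.get("drink_items", [])}
```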

Dynamic dependencies and avoiding duplicate jobs

Dynamic dependencies (which are not known at the time a job is created) introduce a new problem around duplicate jobs. Let's say that we have two nodes in our daily workflow:

  • A.<trade report>, reporting today's trades to our broker at end of day (job type <trade report> with id A)
  • B.<opening positions>, calculating tomorrow's opening positions ready for the morning's run

Both of these operations depend on a third node:

  • C.<today's trades>, the list of the trades that were executed today at market close

With a static computation graph, defined ahead of time in a single operation, we can spot that A.<trade report> and B.<opening positions> both require <today's trades>. We can define a single job, C.<today's trades> and make A. and B. both depend on it.

With a dynamic dependency graph, we don't find out that A. and B. both require <today's trades> until the point at which they are first picked up for execution. Perhaps A. then creates C.<today's trades> and B. creates D.<today's trades>. We now have two <today's trades> jobs, C. and D., which may (but should not) disagree with each other.

We need to provide a way for A. and B. to declare a dependency on the same job without knowing about each other. This needs to be reliable, even in the face of concurrent execution.

(Some) long-running operations

We have found Toshtogo particularly useful for any system with some (not necessarily all) operations lasting more than 10 seconds, at which point:

  • Simple HTTP requests are not a natural fit
  • Recovering from failure by retrying a complete graph of jobs becomes undesirable because it takes a long time. We want continuation of a partially-completed graph.

In our original use case, the duration of our longest job was 1.5 hours. This was a constraint imposed by a 3rd party system that we had no practical way of significantly improving.

Non-transactional, unreliable data stores with no locking or CAS

Most often, the output of our jobs was stored in S3, both for auditing purposes and to move it out of volatile memory: you don't want to re-run a 1.5 hour batch job because some later operation crashes your process.

Often our data stores (S3, FTP sites, POSTs to external websites, etc.) did not support pessimistic locking or compare-and-set.

Some of our data stores were weirdly unreliable: 3rd party (S)FTP sites that lied to us about whether files existed, and which in any case don't support locking; that sort of thing.

This made it impossible to definitively answer the question "is another process already trying to produce the data I am about to produce?". Sometimes we couldn't even answer "does this output already exist?" using the underlying data stores. We needed some other system using a properly transactional datastore to keep track of what outputs had been produced or were currently being produced.

Exactly-once execution

Trades needed to be ready for executing before market open. Because of our 1.5 hour pre-trade batch jobs, it was important that, in normal operation, jobs ran once and only once. Accidentally kicking off multiple copies of a few 1.5 hour jobs would result in missing the deadline for morning trading.

As discussed above, our non-transactional, non-CAS data stores also made it desirable to have a reliable mechanism for exactly-once execution of write operations (since the data stores themselves could not directly support it).

Scheduled execution

Plenty of jobs needed to run at a particular time of day (e.g. trade reporting, executing trades, and so on).

Some of these jobs also had dependencies, which they would wait for even if their scheduled start time had been reached.

A polyglot environment

Clojure code relied on the output of Matlab code, which relied on the output of Python code, and so on.

Forgiving latency requirements

Once your critical path takes 1.5 hours, latency of message passing becomes less significant.

Jobs requiring human intervention

Some jobs required human intelligence to complete. For example, confirming whether to accept our view of our holdings or our broker's view when the two disagreed.

We wanted an easy way to build a UI specific to these manual job types and have it retrieve and complete jobs using an HTTP API into a coordinating system.

Exception handling

In production, we needed a way to be alerted of errors from all systems, with minimal (or zero) boilerplate in individual projects.

We needed to be able to easily trigger retrying a job after fixing whatever problem caused it to fail.

We also wanted as much information available as possible in order to diagnose an issue. Ideally this information would be presented in a more structured manner than might be achievable with a generic UI over ElasticSearch or similar.

It's somewhat in the nature of micro-services- or whatever we're calling them this week- that occasionally we need to make changes that ripple across a few systems. One convenient approach when doing this is to spin up all relevant services locally (job handlers, in our case) and, when errors happen, redeploy the code locally and retry the job. Then rinse and repeat until the whole graph passes.

In this use case, we see a lot of exceptions and a lot of retries. We want to make this feedback cycle easy.

Polling

Some jobs needed to poll data sources until expected data turned up. Logic like "poll this url every 10 minutes from 05:00-07:00, then fail the job if you're still getting a 404 at 07:00".
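A handler implementing that rule might look roughly like the sketch below. It is plain Python; the URL, the retry interval, and the shape of the returned dict are illustrative assumptions, not part of the Toshtogo protocol.

```python
import datetime

import requests  # any HTTP client would do


def poll_for_file(url, deadline, retry_in_minutes=10):
    """Sketch of "poll this url every 10 minutes until the deadline, then fail".

    Returns a dict describing what the agent should report back to the
    coordinator: success, try again later, or error. The dict shape is
    illustrative, not the real Toshtogo protocol."""
    response = requests.get(url)
    if response.status_code == 200:
        return {"outcome": "success", "result": {"url": url}}

    if datetime.datetime.now() >= deadline:
        return {"outcome": "error",
                "error": "Still getting %s from %s at the deadline"
                         % (response.status_code, url)}

    # Not there yet, but we still have time: ask to be retried later.
    return {"outcome": "try-later", "retry_in_minutes": retry_in_minutes}
```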

Heterogeneous agent instances

Many processes required licensed software in order to run, although most did not.

Some processes required large amounts of RAM, some did not.

Because of this, we could not spin up a pool of homogeneous EC2 instances (or similar) and have them just pick the next job from the queue. Different instances needed to subscribe to different job types.

What other solutions did/would we try?

In our original context, we immediately ruled out the standard web of micro-services communicating over HTTP. We knew up-front that we had some very slow-running operations not suitable for synchronous communication (although we did allow our initial UAT spike to run over simple HTTP while we figured out a better solution for go-live).

Our first attempt had batch jobs communicating over queues (SQS, in our case).

We very quickly discovered that:

  • Controlled retry of partially completed operations across several services was hard. In this case you tend to want to replay some but not all of your dead letter queue.
  • Managing the callbacks required when jobs started having several downstream dependents was mind-bending...
  • ...and dynamically adding downstream dependents to existing jobs in the queue was practically impossible
  • Transactionally checking whether a job already exists on a queue so we don't create duplicates is also not something queues are designed for
  • We found we wanted a central UI to monitor progress of the whole graph of operations, including which processes were waiting for which other processes
  • A common root problem we found: queues are not a good fit if we have a requirement to query or change items in the queue.
  • We were replicating the same logic across multiple systems to ensure idempotency and once-only execution of jobs, and to display what jobs were in progress

If we were working exclusively in Python, perhaps we would have tried Luigi, but we did not discover it at the time (late 2013). Even then, it's notable that Luigi author Erik Bernhardsson's description of his ideal workflow manager matches Toshtogo reasonably closely.

We looked at Azkaban, but it didn't support dynamically defined job dependencies and seemed quite Hadoop-specific. We weren't using Hadoop at all.

A colleague also found Chronos, but:

  • It was too bleeding-edge for us at the time
  • We had no other requirement demanding the complexity of Mesos
  • It is solving a somewhat different problem
  • It had loads of other infrastructure dependencies we didn't want to take on

If we had started in 2014, we would have looked at Airflow. Probably we would have had to work around the issue that Airflow does not support fully dynamic dependency graphs.

...also include Pinball

What is Toshtogo? (the long version)

Toshtogo helps you to manage batch jobs where jobs depend upon each other in directed acyclic graphs. Jobs are assumed to run across multiple process boundaries.

Toshtogo provides a JSON-based HTTP API for agents to request and complete work from a horizontally-scaling server, using a Postgres database for synchronisation. All clients are built over the top of this HTTP API. Toshtogo is completely agnostic to the technology choices of its clients.
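As a rough illustration of what this looks like from an agent's point of view, here is a sketch of a poll-and-complete loop. The endpoint paths, payload shapes, and status codes are assumptions made for the sake of the example, not the documented Toshtogo API; a real client library wraps these details.

```python
import time

import requests  # the API is plain JSON over HTTP, so any client works

TOSHTOGO_URL = "http://localhost:3000"  # assumed local server, for illustration


def work_loop(job_type, handler, poll_seconds=5):
    """Repeatedly ask the server for the next job of `job_type`, run the
    handler, and report the result. Paths and body shapes are illustrative."""
    while True:
        resp = requests.put("%s/api/commitments" % TOSHTOGO_URL,
                            json={"job_type": job_type})

        if resp.status_code == 204:
            # Nothing to do right now; wait and ask again.
            time.sleep(poll_seconds)
            continue

        job = resp.json()
        result = handler(job["request_body"])

        requests.put("%s/api/jobs/%s/result" % (TOSHTOGO_URL, job["job_id"]),
                     json={"outcome": "success", "result": result})
```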

Toshtogo offers:

  • Error reporting and controlled manual retry when jobs go wrong
  • Pausing and resuming jobs
  • Dynamically declaring dependencies between jobs. No need to define a dependency graph up-front.
  • Fungibility of jobs. Jobs can be created with a fungibility key (an arbitrary string, probably a url/uri). Toshtogo will ensure that if a job of the same type already exists with the same fungibility key, the two jobs will be collapsed into one (see the sketch after this list).
  • Ensuring jobs are only picked up by one agent at a time
  • Robust messaging between agents processing jobs and a central co-ordinating web service
    • Toshtogo servers can scale horizontally, relying on a Postgres database for coordination
    • If the server is unavailable, agents will retry with an exponential backoff until the server is available again.
    • If no agent is available to process a job at the time it is created, it will be queued until a suitable agent becomes available
  • Job request and response bodies are JSON, but there is no further coupling to what the output of a job is or where it is stored. Typically the JSON response from a job will contain a url to the output, whether in S3, on the file system, or in your own web services.
  • Scheduling jobs for execution at a particular time (including dynamically scheduling the retry of a job in 5 minutes' time)
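For example, the <today's trades> problem from the Context section above can be solved by having both handlers request the dependency with the same job type and fungibility key, so the server collapses the two requests into a single job. The dict shape below is an assumption for illustration, not the documented wire format.

```python
# Sketch: two handlers, started independently, declare the same dependency.
# Because both use the same job type and fungibility key, the server collapses
# them into one <today's trades> job rather than running it twice.
# The dict shape is illustrative, not the documented Toshtogo wire format.

def todays_trades_dependency(trade_date):
    return {
        "job_type": "todays-trades",
        # An arbitrary string; a URI naming the output works well.
        "fungibility_key": "trades://executed/%s" % trade_date,
        "request_body": {"date": trade_date},
    }

# Both the trade report handler (A.) and the opening positions handler (B.)
# declare this dependency without knowing about each other...
trade_report_deps = [todays_trades_dependency("2016-06-29")]
opening_positions_deps = [todays_trades_dependency("2016-06-29")]
# ...and both end up waiting on the same single job.
```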

What is Toshtogo not?

Low latency

You should not use Toshtogo for any scheduling that requires latencies of under 5 seconds. Although we would hope that latency would be lower than that, latency is not something we put a lot of effort into.

Suitable for streaming and real-time operations

We had some streaming, realtime requirements, but those are out-of-scope for Toshtogo, which is primarily concerned with reliable coordination and execution of graphs of batch jobs.

Suitable for 100,000s of jobs a day

...yet.

Responsible for spinning up new instances

Toshtogo may provide information about how many jobs of a particular type are in the queue and how many agents are available to pick them up. But it won't directly create new EC2 instances, for example, as queues fill up.

Able to display dependencies before they are created by a downstream node

We support dynamically adding dependencies to jobs as they run. Hence, unlike Airflow, we cannot display the definitive graph of dependencies ahead of time.

If you really need to see the whole of the graph immediately, it is possible to create the full dependency graph in a single step. This does sacrifice some of the power of Toshtogo, and still doesn't guarantee that no further dependencies will be created down the line.

Dependencies

  • Postgres >9.1
  • Java >8

Design decisions

  • Minimal infrastructure dependencies (only Postgres and a JVM). No deploying ZooKeeper, for example.
  • Minimal complexity in server deployment: a normal, horizontally scaled web app with a database. Don't start mucking around with quorum algorithms and complex deployments for distributed synchronisation of jobs. Leave users to get to a robust deployment using well-understood techniques with existing technologies. RDS offers multi-AZ deployments of Postgres, as well as read replicas, if we really need them (we would need to separate out read and write connections in the server code).
  • Focus first on a clean protocol over HTTP or WebSockets, rather than a client library in any particular language. All client libraries should be simple wrappers over HTTP/WebSockets. For simple operations, maybe you won't even need a proper client: just make an HTTP call. We want to be truly cross-platform.
  • Devolve as much responsibility as possible to agents. Invert the usual control mechanism of the server creating and commanding workers and have workers instead initiate calling the server. This should drive out a tool that is hackable and extensible in directions we cannot anticipate now. The server should aim as far as possible to be a simple API providing features solely related to transactional coordination of a graph of jobs. Agents, rather than the server, should handle:
    • What type of data store/s does this job use and how should I connect?
    • What dependencies should this job have?
    • When should this job run?
    • Should this failing job be retried? How long should we wait?
    • What type of work do I, as an agent, want to do?
    • Authentication details for data sources (these are held in agents; we don't think hooks belong in the orchestrating server)
    • How many agents are working on a particular job type?
    • When should I spin up more agents to cope with a spike in demand?
  • Decouple the server entirely from the choice of data store for job outputs. The core server code should know nothing about S3, Hadoop, email, whatever.
  • Initially, support JSON for cross-process communication. Although EDN is clearly superior, JSON is simple, well understood, very portable, and natively supported by Postgres. Definitely don't use pickling.
  • As far as possible, add features using a plugin architecture. Just like Atom does, even with core features like notifications and spell check. This forces us into a nicely decoupled design that's open for extension.
  • For all exposed API operations, support at-least once delivery of messages to idempotent endpoints (so clients can safely re-send any request on exception or failure to confirm receipt). On the server, use Postgres's guarantees around transactionality to ensure idempotency in the context of horizontally-scaling server and client architectures. This is by far the easiest-to-grok approach to concurrency we know of, if we can get away with it.
  • Structure the database such that operations are inserts, rather than updates. Do not support deletes (although probably we eventually want some way to archive data). This makes concurrency easier to reason about: we never hit upsert problems, concurrency guarantees are as simple as using primary keys to prevent duplicate inserts, and our code can be written on the assumption that most data is immutable (see the sketch after this list). Exceptions can be made for absolutely necessary optimisations, but there should be very few of these.
  • We may well provide libraries of specific job types like Airflow operators (copy files in S3, run SQL, etc.). But we won't ever build these into the server code.
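As a minimal sketch of the insert-only, primary-key approach to idempotency mentioned above (using psycopg2 and an invented jobs table purely for illustration; the real schema will differ):

```python
import psycopg2
from psycopg2 import errorcodes


def create_job_idempotently(conn, job_id, job_type, request_body_json):
    """Insert a job row exactly once. A duplicate insert (e.g. a client
    re-sending the same request after a timeout) violates the primary key on
    job_id and is simply ignored, so the endpoint is safe to call repeatedly.

    The jobs table here is invented for illustration; the real schema differs."""
    try:
        with conn:  # commits on success, rolls back on exception
            with conn.cursor() as cur:
                cur.execute(
                    "INSERT INTO jobs (job_id, job_type, request_body)"
                    " VALUES (%s, %s, %s)",
                    (job_id, job_type, request_body_json))
    except psycopg2.IntegrityError as e:
        if e.pgcode != errorcodes.UNIQUE_VIOLATION:
            raise
        # Already inserted by an earlier (or concurrent) request: fine.
```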

Something about versioning agents
