@wycats
Created February 17, 2010 07:38

The Pipeline Pattern

This pattern is useful for exposing an endpoint with a small number of methods (usually one) that can be trivially wrapped with additional functionality through middleware. Its structure provides a concurrency-friendly architecture with minimal conceptual overhead for creators of middleware.

Its structure is similar to Unix pipes, which provide a single interface to run commands, and then allow other programs to utilize the same interface to modify the inputs and outputs in some way.

Motivation: Dynamic Web Requests

A dynamic web server receives requests from a client (such as a web browser) and sends those requests to an application. When building servers or applications, it is convenient to abstract away the interface between the two, such that a server sends a standard request environment and receives a standard response.

The first such abstraction, CGI, used environment variables (such as PATH_INFO) to communicate information about the request, and expected to receive the response through the standard output. This abstraction, implemented in the Apache web server, allowed programmers to write web applications in any language, using any technique, so long as they followed the contract. However, CGI’s reliance on global state (such as environment variables and standard output) required a new process for each request, which made it impossible for applications to leverage in-process caches and incurred the overhead of starting a process with each request.

Because CGI used the standard Unix process model, it would have been possible to implement middleware using standard Unix piping semantics. However, that middleware would have been restricted to techniques using inter-process communication. Some of the problems with CGI were solved with FastCGI, which implements a similar pattern but allows for persistent processes and multiple concurrent requests. However, because FastCGI uses sockets for communication, FastCGI middleware would also be limited to inter-process communication.

In order to get around these restrictions, a number of languages have created an in-process version of this abstraction, such as the Servlet API in Java, WSGI in Python, and Rack in Ruby. The Ruby version of this, Rack, was motivated by the forces that motivate this pattern.
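
As a rough illustration of the in-process form of this abstraction, the sketch below follows the general shape of the Rack contract (an object that responds to call with an environment Hash and returns a status, headers, and body) without loading the rack gem. The names HelloApp and UpcaseBody are hypothetical, and the environment is built by hand rather than by a real server.

```ruby
# A Rack-style endpoint: responds to call(env) and returns [status, headers, body].
# HelloApp and UpcaseBody are illustrative names, not part of Rack.
class HelloApp
  def call(env)
    [200, { "content-type" => "text/plain" }, ["hello from #{env["PATH_INFO"]}"]]
  end
end

# A middleware wraps an endpoint and exposes the same call(env) interface.
class UpcaseBody
  def initialize(app)
    @app = app
  end

  def call(env)
    status, headers, body = @app.call(env)
    [status, headers, body.map(&:upcase)]
  end
end

app = UpcaseBody.new(HelloApp.new)
status, headers, body = app.call({ "PATH_INFO" => "/patterns" })
```

Because the wrapper and the wrapped object share one interface, the caller cannot tell whether it holds the bare endpoint or a composed one.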

In particular, it should be simple to take an endpoint and wrap it using some technique, creating an object that satisfies the same interface as the original endpoint. It should be simple to do this repeatedly, via configuration, to produce an endpoint for the server that composes a number of aspects that need not be aware of each other.

It should also be simple to inject dependencies into the application, and the application should be concurrency friendly by default, without any additional action required by the application or middleware developer unless they opt into their own concurrency concerns.

Participants

The two primary participants in a pipeline are the Requestor and the Endpoint. The Requestor is responsible for formulating a request, which it sends to the Endpoint. Once the Endpoint has serviced the request, it returns a response to the Requestor. The request itself should be a primitive data structure, such as a Dictionary or Array, with String keys and any values. The Endpoint may specify some expected keys and the expected values for those keys. The Endpoint should be stateless, storing any state directly in the environment. This means that the Requestor may send multiple concurrent requests to the same Endpoint without needing special concurrency semantics.
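
A minimal sketch of this contract, under assumed names: the EchoEndpoint and EchoRequestor classes, the handle method, and the "echo.message" key are all hypothetical. The request is a plain Hash with String keys, the Endpoint declares the key it expects, and it keeps no per-request state of its own.

```ruby
# Hypothetical Endpoint: stateless, reads everything it needs from the request Hash.
class EchoEndpoint
  REQUIRED_KEYS = ["echo.message"].freeze

  def handle(env)
    missing = REQUIRED_KEYS.reject { |key| env.key?(key) }
    raise ArgumentError, "missing keys: #{missing.join(", ")}" unless missing.empty?

    env["echo.message"].reverse
  end
end

# Hypothetical Requestor: formulates the request and hands back the response.
class EchoRequestor
  def initialize(endpoint)
    @endpoint = endpoint
  end

  def send_message(message)
    @endpoint.handle({ "echo.message" => message })
  end
end

requestor = EchoRequestor.new(EchoEndpoint.new)
response  = requestor.send_message("pipeline")
```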

Because the Endpoint expects to receive a valid request, and the Requestor expects to send the request to a valid Endpoint, it is possible to place an object in between that receives the request, forwards it on to the Endpoint, and then forwards the response to the Requestor. From the Requestor’s perspective, this object is an Endpoint. From the Endpoint’s perspective, this object is a Requestor. For the purposes of this pattern, we can call this object a Middleware.

Because the object serves as an Endpoint proxy, it should also be stateless, storing any state in the environment. This means that the new Endpoint, composed of the original Endpoint and the Middleware, has the same concurrency semantics as a bare Endpoint, allowing a Requestor to send multiple requests to the Middleware without needing special concurrency semantics.
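
To make the proxy role concrete, here is a small sketch with hypothetical names (HopReportEndpoint, TraceMiddleware, the handle method, and the "trace.hops" key are assumptions). The Middleware records what it needs in the request environment rather than in instance variables, so a single instance can serve many requests.

```ruby
# Hypothetical Endpoint: reports how many middlewares the request passed through.
class HopReportEndpoint
  def handle(env)
    "hops: #{env.fetch("trace.hops", 0)}"
  end
end

# Hypothetical Middleware: an Endpoint to its caller, a Requestor to what it wraps.
# Per-request state lives in env, never in instance variables.
class TraceMiddleware
  def initialize(endpoint)
    @endpoint = endpoint
  end

  def handle(env)
    env["trace.hops"] = env.fetch("trace.hops", 0) + 1
    @endpoint.handle(env)
  end
end

bare    = HopReportEndpoint.new
wrapped = TraceMiddleware.new(TraceMiddleware.new(bare))

bare_response    = bare.handle({})
wrapped_response = wrapped.handle({})
```

Calling wrapped.handle again with a fresh Hash gives the same answer, since nothing carries over between requests.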

Implementation and Sample Code

The idea of Middleware falls naturally out of the existence of a clear contract between a Requestor and an Endpoint. In order to automate the process of composing an Endpoint and a number of Middlewares, a Middleware should be a Factory that receives an Endpoint and returns a new composed Endpoint. In a language with first-class classes, such as Ruby, the Middleware class itself can serve as its own Factory. Middleware Factories must take an Endpoint as the first parameter, but may take additional parameters in order to
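
One way to picture the Factory role, as a sketch with hypothetical names (BaseEndpoint, PrefixMiddleware, and the handle method are assumptions): anything that accepts an Endpoint as its first argument and returns a composed Endpoint can act as a Middleware Factory, whether it is a class's new method or a plain lambda.

```ruby
class BaseEndpoint
  def handle(env)
    env["message"]
  end
end

# The class acts as its own Factory: new(endpoint, prefix) returns a composed Endpoint.
class PrefixMiddleware
  def initialize(endpoint, prefix)
    @endpoint = endpoint
    @prefix   = prefix
  end

  def handle(env)
    "#{@prefix}#{@endpoint.handle(env)}"
  end
end

# Factories expressed as callables that take an Endpoint as the first parameter.
factories = [
  ->(endpoint) { PrefixMiddleware.new(endpoint, "inner:") },
  ->(endpoint) { PrefixMiddleware.new(endpoint, "outer:") }
]

# Fold the factories over the bare Endpoint; each step yields a new Endpoint.
composed = factories.reduce(BaseEndpoint.new) { |endpoint, factory| factory.call(endpoint) }
result   = composed.handle({ "message" => "hi" })
```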

Further, a library for the Pipeline may provide a builder that allows for automatic composition of an Endpoint and a number of Middlewares. This would be provided as a convenience, and is a consequence of the Pipeline Pattern, not a requirement of it.

The following is an example implementation of a Pipeline for making database queries, written in Ruby. In this case, the Endpoint expects the Requestor to send it an environment containing the connection and query string, and the Requestor expects the Endpoint to return a ResultSet. It also shows an example Middleware that logs the amount of time taken by the query. It assumes a Query class that takes a Connection and query string in its initializer, and whose select method returns a ResultSet.

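require "benchmark"
require "logger"

# The Endpoint: stateless, reading everything it needs from the environment.
class QueryEndpoint
  def query(env)
    Query.new(env["query.connection"], env["query.query_string"]).select
  end
end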

# A Middleware: wraps an Endpoint and exposes the same query(env) interface.
class LogMiddleware
  def initialize(endpoint, logger)
    @endpoint = endpoint
    @logger   = logger
  end

  def query(env)
    result = nil
    time = Benchmark.measure { result = @endpoint.query(env) }.real
    @logger.info("Query #{env["query.query_string"].inspect} took #{time}s")
    result
  end
end

class Requestor
  def initialize(endpoint, connection_pool)
    @endpoint = endpoint
    @connection_pool = connection_pool
  end

  def query(string)
    @connection_pool.with_connection do |connection|
      return @endpoint.query("query.connection"   => connection,
                             "query.query_string" => string)
    end
  end
end

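# TimeoutMiddleware (not shown) is assumed to follow the same contract as LogMiddleware.
# connection_pool is assumed to be an existing pool that responds to with_connection.
endpoint   = LogMiddleware.new(QueryEndpoint.new, Logger.new(STDOUT))
endpoint   = TimeoutMiddleware.new(endpoint, 5)
requestor  = Requestor.new(endpoint, connection_pool)
result_set = requestor.query("select * from patterns")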

An example of an Endpoint Builder and its usage for the above example follows.

class Builder
  def self.configure(endpoint, &block)
    new(endpoint).tap do |builder|
      builder.instance_eval(&block)
    end
  end

  def initialize(endpoint)
    @endpoint    = endpoint
    @middlewares = []
  end

  def use(middleware_class, *args)
    @middlewares << [middleware_class, args]
  end

  def build
    endpoint = @endpoint
    # Wrap in reverse so that the first middleware passed to use ends up outermost.
    @middlewares.reverse_each do |middleware, args|
      endpoint = middleware.new(endpoint, *args)
    end
    endpoint
  end
end

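endpoint = Builder.configure(QueryEndpoint.new) do
  use LogMiddleware, Logger.new(STDOUT)
  use TimeoutMiddleware, 5
end.build

# connection_pool is assumed to be an existing pool that responds to with_connection.
requestor = Requestor.new(endpoint, connection_pool)
requestor.query("select * from patterns")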
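
The examples refer to a TimeoutMiddleware that is not shown. One possible shape for it, offered as an assumption rather than the author's implementation, uses Ruby's standard Timeout module. SlowEndpoint and the "test.delay" key exist only to exercise the sketch.

```ruby
require "timeout"

# Hypothetical TimeoutMiddleware: aborts the wrapped query after a number of seconds.
class TimeoutMiddleware
  def initialize(endpoint, seconds)
    @endpoint = endpoint
    @seconds  = seconds
  end

  def query(env)
    # Timeout.timeout raises Timeout::Error if the block runs too long.
    Timeout.timeout(@seconds) { @endpoint.query(env) }
  end
end

# Stand-in Endpoint used only to exercise the sketch.
class SlowEndpoint
  def query(env)
    sleep(env.fetch("test.delay", 0))
    :result_set
  end
end

endpoint = TimeoutMiddleware.new(SlowEndpoint.new, 0.1)
fast     = endpoint.query({ "test.delay" => 0 })

timed_out = begin
  endpoint.query({ "test.delay" => 1 })
  false
rescue Timeout::Error
  true
end
```

Like the logging Middleware, this one keeps only configuration in its instance variables, so it can be shared across concurrent requests.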

Consequences

In a dynamic language, because the environment is a primitive such as a Dictionary, it’s common to see informal contracts form around interactions between Middlewares or between a Middleware and an Endpoint. For instance, the Ruby community has formalized a rack.session key that provides a Dictionary-like object containing the user’s session. That key is populated by a Middleware, not the Requestor, which need only provide the keys specified by the formal contract. This means that an Endpoint configured as part of an application that includes a Session middleware can expect to have access to the user’s session. Because the session key is defined in terms of a contract, different session strategies may be freely swapped out by configuring the application to use a different Middleware that populates the rack.session key in accordance with the informal contract.
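
The swap described here can be sketched as follows. Only the rack.session key name comes from the text; the GreetingEndpoint, InlineSession, and StoreSession classes and the "demo." keys are hypothetical and do not reflect how any real Rack session middleware is implemented.

```ruby
# Endpoint that relies only on the informal contract: env["rack.session"] is Hash-like.
class GreetingEndpoint
  def call(env)
    "hello, #{env["rack.session"]["user"]}"
  end
end

# Strategy 1 (hypothetical): session data carried in the request itself.
class InlineSession
  def initialize(endpoint)
    @endpoint = endpoint
  end

  def call(env)
    env["rack.session"] = { "user" => env["demo.user_param"] }
    @endpoint.call(env)
  end
end

# Strategy 2 (hypothetical): session data looked up by id in a read-only store.
class StoreSession
  def initialize(endpoint, store)
    @endpoint = endpoint
    @store    = store
  end

  def call(env)
    env["rack.session"] = @store.fetch(env["demo.session_id"], {})
    @endpoint.call(env)
  end
end

store  = { "abc" => { "user" => "alice" } }.freeze
inline = InlineSession.new(GreetingEndpoint.new)
stored = StoreSession.new(GreetingEndpoint.new, store)

from_inline = inline.call({ "demo.user_param" => "alice" })
from_store  = stored.call({ "demo.session_id" => "abc" })
```

The Endpoint is identical in both pipelines; only the Middleware that fills the key differs.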

This also provides a lightweight form of Dependency Injection, without the overhead of needing to define the kinds of dependencies to be injected ahead of time. If a particular kind of dependency is useful, a community of users can informally decide on the dependency’s contract, possibly to be formalized as part of the Endpoint’s API if it achieves enough use. When used for Dependency Injection, Middlewares work with a cooperating Endpoint. This stands in contrast with the typical use of middleware, which allows a developer to create enhanced Endpoints that provide more than the bare minimum required by the Requestor/Endpoint contract. This enables additional functionality, such as stats collection or logging, without any changes to the Requestor or Endpoint themselves.
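
A small sketch of Middleware used for Dependency Injection, with hypothetical names throughout (StampEndpoint, InjectMiddleware, FixedClock, and the "app.clock" key are assumptions). The Endpoint cooperates by looking for the dependency under an agreed key and falling back to a default when it is absent.

```ruby
# Endpoint that cooperates with an injected dependency under the "app.clock" key.
class StampEndpoint
  def call(env)
    clock = env.fetch("app.clock") { Time } # fall back to the real clock
    "stamped at #{clock.now.to_i}"
  end
end

# Middleware that injects a dependency into a copy of the environment.
class InjectMiddleware
  def initialize(endpoint, key, dependency)
    @endpoint   = endpoint
    @key        = key
    @dependency = dependency
  end

  def call(env)
    @endpoint.call(env.merge({ @key => @dependency }))
  end
end

# A fixed clock, convenient in tests; it only needs to respond to now.
FixedClock = Struct.new(:now)

endpoint = InjectMiddleware.new(StampEndpoint.new, "app.clock", FixedClock.new(Time.at(42)))
stamped  = endpoint.call({})
```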

Additionally, Middlewares can be used as routers, choosing between a number of Endpoints on the basis of the elements in the Environment. For instance, a middleware in the Rack pipeline might choose between several Endpoints on the basis of the request path. From the perspective of the Requestor, this Router is still a valid Endpoint, and from the perspective of the Endpoint, the Router is a valid Requestor. Like traditional Middleware, routing Middleware can work with unaltered Requestor and Endpoint objects. Because a Router also serves as a valid Endpoint and Requestor, a Router can also route to another Router. The fact that a Router delegates to another Router is completely transparent. From the perspective of the outer Router, the inner Router is simply a valid Endpoint, and from the perspective of the inner Router, the outer Router is simply a valid Requestor.
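
Routing and nested routing can be sketched as below. PrefixRouter is a hypothetical class that picks an Endpoint by the longest matching prefix of env["PATH_INFO"]; lambdas stand in for Endpoints because they respond to call.

```ruby
# Hypothetical Router: picks an Endpoint by the longest matching path prefix.
class PrefixRouter
  def initialize(routes, fallback)
    # Longest prefixes first so that "/admin/users" wins over "/admin".
    @routes   = routes.sort_by { |prefix, _| -prefix.length }
    @fallback = fallback
  end

  def call(env)
    _, endpoint = @routes.find { |prefix, _| env["PATH_INFO"].start_with?(prefix) }
    (endpoint || @fallback).call(env)
  end
end

# Lambdas respond to call(env), so they serve as Endpoints here.
users     = ->(_env) { "users" }
reports   = ->(_env) { "reports" }
home      = ->(_env) { "home" }
not_found = ->(_env) { "not found" }

# A Router is itself a valid Endpoint, so it can be a route of another Router.
admin  = PrefixRouter.new({ "/admin/users" => users, "/admin/reports" => reports }, not_found)
router = PrefixRouter.new({ "/admin" => admin, "/home" => home }, not_found)

paths     = ["/admin/users", "/admin/reports", "/home", "/missing", "/admin/other"]
responses = paths.map { |path| router.call({ "PATH_INFO" => path }) }
```

The outer Router never learns that "/admin" leads to another Router; it only calls an object that honors the Endpoint contract.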

Finally, because Endpoints (including those composed of an Endpoint and several Middlewares) are stateless, a Requestor can send many requests to a given Endpoint simultaneously without a special concurrency strategy. Each element in the Pipeline makes any state modifications in the request environment, which is not shared across threads. In short, the request environment is used in a single thread only, and the Endpoints share no global state, so the Pipeline has a built-in concurrency strategy that is free of locks or other traditional tools to manage concurrency. Of course, if elements of a Pipeline wish to share state across requests (an uncommon scenario), they must use language-appropriate tools to manage the concurrency.
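
The concurrency claim can be illustrated with a short sketch, again using hypothetical names (SquareEndpoint, DoubleInputMiddleware, and the "demo.input" key). One pipeline object is shared by several threads, while each request carries its own environment Hash, so no locking is involved.

```ruby
# Stateless Endpoint: all per-request state arrives in env.
class SquareEndpoint
  def call(env)
    env["demo.input"] * env["demo.input"]
  end
end

# Stateless Middleware: mutates only the environment of the request it is handling.
class DoubleInputMiddleware
  def initialize(endpoint)
    @endpoint = endpoint
  end

  def call(env)
    env["demo.input"] *= 2
    @endpoint.call(env)
  end
end

pipeline = DoubleInputMiddleware.new(SquareEndpoint.new)

# Each thread builds its own env Hash; the shared pipeline object is never mutated.
threads = (1..8).map do |n|
  Thread.new { pipeline.call({ "demo.input" => n }) }
end
results = threads.map(&:value)
```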
