Skip to content

Instantly share code, notes, and snippets.

@jrudolph
Forked from sirthias/API-proposal.scala
Last active August 29, 2015 14:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jrudolph/43151f78de51c687018d to your computer and use it in GitHub Desktop.
Save jrudolph/43151f78de51c687018d to your computer and use it in GitHub Desktop.
/**
 * Proposed stream-based TCP API (sketch; `...` marks elided parameters).
 */
object Tcp {

  /** Exposes the remote and local socket address of a connection. */
  trait ConnectionDescriptor {
    def remoteAddress: InetSocketAddress
    def localAddress: InetSocketAddress
  }

  /**
   * A `Flow[ByteString, ByteString]` that additionally offers address lookup
   * through a `MaterializedMap` and can be closed over a handler flow.
   */
  sealed trait ConnectionFlow extends Flow[ByteString, ByteString] {
    def remoteAddress(mMap: MaterializedMap): InetSocketAddress
    def localAddress(mMap: MaterializedMap): InetSocketAddress

    /** Takes a byte-level `handler` flow and yields a `RunnableFlow`. */
    def handleWith(handler: Flow[ByteString, ByteString]): RunnableFlow
  }

  /**
   * Passive server-side bind; sugar over [[generalBind]] for the most frequent case.
   *
   * The user-specified `handler` flow is materialized once per accepted connection.
   * Acceptance can be throttled by applying back-pressure to the produced
   * `Source[ConnectionDescriptor]`.
   */
  def bind(endpoint: InetSocketAddress, ...)
          (handler: Flow[ByteString, ByteString]): Source[ConnectionDescriptor] =
    generalBind(endpoint, ...)(connection => connection.handleWith(handler))

  /**
   * Server-side bind covering both the active and the passive case.
   *
   * Assumes availability of an akka-stream level vehicle for getting access to the
   * `MaterializedMap` from within the stream
   * (https://github.com/akka/akka/issues/16168).
   */
  def generalBind(endpoint: InetSocketAddress, ...)
                 (handler: ConnectionFlow => RunnableFlow): Source[ConnectionDescriptor]

  /**
   * Client-side connection, usable for both the active and the passive case.
   *
   * The `ConnectionFlow` result is produced synchronously, but the actual connection
   * is not attempted before the flow is materialized on the user side.
   */
  def connectionFlow(remoteAddress: InetSocketAddress, ...): ConnectionFlow
}
// example API usage
/**
 * Example usage of the proposed `Tcp` API, covering the four combinations of
 * client/server side and active/passive conversation start.
 * `Source(...)` is a placeholder for whatever data source the caller supplies.
 */
object Echo {

  // Handler that maps every incoming ByteString through `identity`.
  val echoFlow = Flow[ByteString].map(identity)

  // active client-side (the usual mode),
  // i.e. the client connects and starts the conversation
  def activeClient(remoteAddress: InetSocketAddress): RunnableFlow =
    // Uses the `remoteAddress` parameter (the sketch previously referenced an undefined `addr`).
    Source(...).connect(Tcp.connectionFlow(remoteAddress)).connect(ForeachSink(println))

  // passive server-side (the usual mode),
  // i.e. the client connects and starts the conversation
  def passiveServer(endpoint: InetSocketAddress): Source[ConnectionDescriptor] =
    // `Tcp.bind` takes the handler in a second parameter list.
    Tcp.bind(endpoint)(echoFlow)

  // passive client-side (the unusual mode),
  // i.e. the client connects but the server starts the conversation
  def passiveClient(remoteAddress: InetSocketAddress): RunnableFlow =
    Tcp.connectionFlow(remoteAddress).handleWith(echoFlow)

  // active server-side (the unusual mode),
  // i.e. the client connects but the server starts the conversation
  def activeServer(endpoint: InetSocketAddress): Source[ConnectionDescriptor] =
    Tcp.generalBind(endpoint) { connectionFlow =>
      Source(...).connect(connectionFlow).connect(ForeachSink(println))
    }
}
// SSL upgrade entry points of the proposed API (signatures only, no implementation shown).
// Each takes a ConnectionFlow plus further elided parameters (`...`) and returns a ConnectionFlow.
// NOTE(review): presumably the result is the SSL-wrapped variant of the input flow — confirm.
object SSL {
// upgrade for the client side of a connection
def upgradeClientSide(connectionFlow: ConnectionFlow, ...): ConnectionFlow
// upgrade for the server side of a connection
def upgradeServerSide(connectionFlow: ConnectionFlow, ...): ConnectionFlow
}
/**
 * Proposed HTTP layer on top of `Tcp` (sketch; `...` marks elided parameters).
 */
object Http {

  // ---- server-side ----

  /** Turns a request/response flow into a byte-level transport flow (signature only). */
  def serverFlowToTransport(serverFlow: Flow[HttpRequest, HttpResponse]): Flow[ByteString, ByteString]

  /**
   * Binds `endpoint` via `Tcp.bind`, using the transport-level form of `handler`
   * obtained from [[serverFlowToTransport]].
   */
  def bind(endpoint: InetSocketAddress, ...)
          (handler: Flow[HttpRequest, HttpResponse]): Source[ConnectionDescriptor] = {
    val transportHandler = serverFlowToTransport(handler)
    Tcp.bind(endpoint)(transportHandler)
  }

  // ---- client-side ----
  // (assumes the availability of a context-providing stream combinator for 1-to-1 streams)

  /**
   * A `Flow[HttpRequest, HttpResponse]` that additionally offers address lookup
   * through a `MaterializedMap`.
   */
  trait ClientFlow extends Flow[HttpRequest, HttpResponse] {
    def remoteAddress(mMap: MaterializedMap): InetSocketAddress
    def localAddress(mMap: MaterializedMap): InetSocketAddress
  }

  /** Lifts a byte-level transport flow into a [[ClientFlow]] (signature only). */
  def transportToClientFlow(transport: Flow[ByteString, ByteString]): ClientFlow

  /**
   * Creates a [[ClientFlow]] by passing `Tcp.connectionFlow(remoteAddress)` to
   * [[transportToClientFlow]].
   */
  def clientFlow(remoteAddress: InetSocketAddress, ...): ClientFlow = {
    val transport = Tcp.connectionFlow(remoteAddress)
    transportToClientFlow(transport)
  }
}
// Usage sketches for the proposed Http / SSL / Tcp API.
// `...` arguments are placeholders, and `address` is not defined within this object.
object HttpExample {
// simple server
// The handler is a Flow[HttpRequest] mapped with a partial-function literal from request to response.
Http.bind(...) {
Flow[HttpRequest].map {
case HttpRequest(...) => HttpResponse(...)
}
}
// simple server with ssl
// Nesting, inside out: HTTP handler -> Http.serverFlowToTransport -> SSL.upgradeServerSide -> Tcp.bind.
// NOTE(review): SSL.upgradeServerSide is declared with a ConnectionFlow parameter, while
// Http.serverFlowToTransport is declared to return Flow[ByteString, ByteString] — confirm the
// intended signature.
Tcp.bind(...) {
SSL.upgradeServerSide {
Http.serverFlowToTransport {
Flow[HttpRequest].map {
case HttpRequest(...) => HttpResponse(...)
}
}
}
}
// simple client flow (can be reused, but needs to be run)
// The trailing `: RunnableFlow` is a type ascription documenting the type of the whole expression.
Source.singleton(HttpRequest("ping"))
.connect(Http.clientFlow(address))
.connect(ForeachSink(println)): RunnableFlow
// simple client flow with SSL (can be reused, but needs to be run)
// Nesting, inside out: Tcp.connectionFlow -> SSL.upgradeClientSide -> Http.transportToClientFlow.
Source.singleton(HttpRequest("ping"))
.connect {
Http.transportToClientFlow {
SSL.upgradeClientSide {
Tcp.connectionFlow(address)
}
}
}
.connect(ForeachSink(println)): RunnableFlow
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment