@leonoel
Created February 18, 2019 20:17
happy eyeballs
{:paths ["."]
 :deps {missionary {:mvn/version "a.3"}}}

(ns happyeyeballs
  (:require [missionary.core :as m])
  (:import (java.net InetAddress InetSocketAddress)
           (java.nio.channels AsynchronousSocketChannel CompletionHandler)
           (java.util.concurrent.atomic AtomicBoolean)))

(defn happyeyeballs
  "Tries to establish a connection to a host associated with multiple endpoints, following the Happy Eyeballs protocol.
  Given a sequence of tasks, each performing the connection to one endpoint, returns a task sequentially performing
  connection attempts, staggered by the given delay (in milliseconds). The first connector is tried; if it is still
  pending after the staggering delay, or if it fails, the second connector is tried, and so on until the connector
  sequence is exhausted.
  The first successful connection attempt triggers cancellation of the other pending connection attempts and makes the
  whole task terminate with the returned socket. If another connection succeeds in the interim, its socket is
  immediately closed with the given close! function. If every attempt fails, the whole task fails.
  Cancelling the task triggers cancellation of all pending connection attempts."
  [delay close! connectors]
  (m/sp
    (try
      (let [chosen (m/dfv)]
        (m/? ((fn attempt [cs]
                (if cs
                  (let [try-next (m/dfv)]
                    (m/race
                      (m/sp (try (let [x (m/? (first cs))
                                       y (chosen x)]
                                   (when-not (identical? x y) (close! x)) y)
                                 (catch Throwable e
                                   (m/?)
                                   (try-next nil)
                                   (throw e))))
                      (m/sp (m/? (m/sleep delay))
                            (try-next nil)
                            (m/? m/never))
                      (m/sp (m/? try-next)
                            (m/? (attempt (next cs))))))
                  (m/race))) (seq connectors))))
      (catch Throwable _
        (throw (ex-info "Unable to reach target." {:connectors connectors}))))))
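
;; A minimal sketch of exercising `happyeyeballs` without any network, using only operators already
;; used above. The two fake connectors and the keywords standing in for sockets are illustrative
;; assumptions, not part of the original gist.
(comment
  (let [slow (m/sp (m/? (m/sleep 500)) :slow)
        fast (m/sp (m/? (m/sleep 50)) :fast)]
    ;; `slow` is still pending after the 100 ms stagger, so `fast` is started as well. Per the docstring,
    ;; the first attempt to succeed (here `fast`) provides the result and `slow` is cancelled.
    (m/? (happyeyeballs 100 (fn [_socket] nil) [slow fast]))))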

(defn connector
  "Given an InetAddress and a port (int), returns a task connecting to this endpoint and returning a connected
  AsynchronousSocketChannel if successful."
  [^InetAddress addr port]
  (fn [s! f!]
    (let [channel (AsynchronousSocketChannel/open)
          pending (AtomicBoolean. true)]
      (.connect channel (InetSocketAddress. addr (int port)) nil
                (reify CompletionHandler
                  (completed [_ _ _] (.set pending false) (s! channel))
                  (failed [_ _ e] (.set pending false) (f! e))))
      #(when (.getAndSet pending false) (.close channel)))))
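
;; A minimal sketch of driving a connector by hand, assuming (as the body of `connector` suggests)
;; that a task is a function of a success callback and a failure callback which returns a
;; zero-argument canceller. The host name and port are placeholders.
(comment
  (let [task   (connector (InetAddress/getByName "example.com") 80)
        cancel (task #(println "connected:" %) #(println "failed:" %))]
    ;; Calling the canceller while the attempt is still pending closes the channel.
    (cancel)))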

(defn close!
  "Closes given AsynchronousSocketChannel."
  [^AsynchronousSocketChannel channel]
  (.close channel))

(defn happy-connect!!
  "Connects to given host/port, synchronously returning an AsynchronousSocketChannel."
  [^String host port]
  (m/? (happyeyeballs 300 close! (map #(connector % port) (InetAddress/getAllByName host)))))
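
;; A usage sketch, assuming network access; "example.com" and port 80 are placeholders.
;; `with-open` closes the channel once the body has run.
(comment
  (with-open [^AsynchronousSocketChannel channel (happy-connect!! "example.com" 80)]
    (str (.getRemoteAddress channel))))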