@ysbaddaden
Created June 9, 2016 08:59
Asynchronous DNS resolver for Crystal (call getaddrinfo in thread + socket pair communication)
# # Asynchronous DNS resolver for Crystal (sort of)
#
# It consists of starting a thread that runs the blocking `getaddrinfo` call,
# so that the main thread (the one running the event loop) doesn't stall while
# resolving domains, which can take from milliseconds to seconds.
# Communication with the thread goes through a socket pair: one end for the
# main thread and one for the resolver thread. Since we rely on a socket, the
# event loop will pause or resume fibers according to the messages sent to or
# received from the socket.
#
# Since it still relies on the libc `getaddrinfo` function, it keeps the
# specificities of each libc, thus allowing mDNS, NSSwitch extensions or
# per-domain resolvers (Darwin) among other things, which a custom DNS resolver
# usually won't support (on purpose).
#
# FIXME: Reliability!
#
# The protocol isn't reliable. It's quite possible that it will eventually
# become corrupt (e.g. a request or response may not be fully written or read
# because an error occurred). Since nothing checks message integrity, the
# resolver may start resolving and returning garbage, when it should instead
# clean up and restart a worker and a socket pair.
#
# Messages should be sent in one chunk, with a header that would tell the
# payload length and an integrity check (eg: HMAC of the payload).
#
# The socket pair (a UNIX socket) could also be replaced with UDP sockets. A
# pseudo header may then no longer be required: just sendto/recvfrom the
# payload.
#
# TODO: Thread Pool
#
# Some applications may not need a DNS resolver at all. We should thus start the
# thread on demand. Alternatively some applications may require a lot of
# requests to be processed, and we should thus start a configurable number of
# threads to resolve domains in parallel.
#
#
# ## Communication Protocol
#
# ### Request:
#
# +---------------------------------------------------------------+
# |                          Family (32)                          |
# +---------------------------------------------------------------+
# |                         Socktype (32)                         |
# +---------------------------------------------------------------+
# |                         Protocol (32)                         |
# +---------------------------------------------------------------+
# |                          Flags (32)                           |
# +---------------------------------------------------------------+
# |                          Length (8)                           |
# +===============================================================+
# |                       Hostname (Length)                    ...
# +---------------------------------------------------------------+
#
# ### Response
#
# In order to read a response, one must first read the first 4 bytes, and
# check whether the resulting number is positive (addrinfo family), equal to
# zero (DONE) or negative (ERROR).
#
# The requester must read all addrinfo responses until a DONE or ERROR frame
# is received. When an ERROR frame is received, the code is the `EAI` error
# code, and can be passed to `gai_strerror` for a human-readable message.
#
# ADDRINFO:
#
# The `addr` is `Addrlen` bytes long by default, or the size of
# `SockaddrIn6` if Family is `AF_INET6`.
#
# +---------------------------------------------------------------+
# |                          Family (32)                          |
# +---------------------------------------------------------------+
# |                         Socktype (32)                         |
# +---------------------------------------------------------------+
# |                         Protocol (32)                         |
# +---------------------------------------------------------------+
# |                          Flags (32)                           |
# +---------------------------------------------------------------+
# |                         Addrlen (32)                          |
# +===============================================================+
# |                          Addr (...)                        ...
# +---------------------------------------------------------------+
#
# DONE:
#
# +---------------------------------------------------------------+
# |                         Code (32) = 0                         |
# +---------------------------------------------------------------+
#
# ERROR:
#
# +---------------------------------------------------------------+
# |                         Code (32) < 0                         |
# +---------------------------------------------------------------+
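
# Sketch (not part of the resolver): the request layout above, built in memory.
# Assumptions: a current Crystal where `IO::Memory` exists (it was `MemoryIO`
# when this gist was written) and where `write_bytes` defaults to native byte
# order, which matches the raw `Int32*` reads done by the worker below.
# `encode_request` is a hypothetical helper used only for illustration.
def encode_request(family : Int32, socktype : Int32, protocol : Int32,
                   flags : Int32, hostname : String) : Bytes
  # Length is a single byte, so the hostname cannot exceed 255 bytes.
  raise ArgumentError.new("hostname too long") if hostname.bytesize > 255

  io = IO::Memory.new
  io.write_bytes(family)                 # Family (32)
  io.write_bytes(socktype)               # Socktype (32)
  io.write_bytes(protocol)               # Protocol (32)
  io.write_bytes(flags)                  # Flags (32)
  io.write_byte(hostname.bytesize.to_u8) # Length (8)
  io.write(hostname.to_slice)            # Hostname (Length)
  io.to_slice
end

# Usage: encode_request(LibC::AF_INET, LibC::SOCK_STREAM, LibC::IPPROTO_TCP, 0, "localhost")
# returns a 26-byte frame: 4 * 4 header bytes, 1 length byte, 9 hostname bytes.
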
require "socket"
require "thread"
class DNS
  class Error < Exception
    enum Code
      AGAIN    = LibC::EAI_AGAIN
      BADFLAGS = LibC::EAI_BADFLAGS
      FAIL     = LibC::EAI_FAIL
      FAMILY   = LibC::EAI_FAMILY
      MEMORY   = LibC::EAI_MEMORY
      NONAME   = LibC::EAI_NONAME
      SERVICE  = LibC::EAI_SERVICE
      SOCKTYPE = LibC::EAI_SOCKTYPE
      SYSTEM   = LibC::EAI_SYSTEM
      OVERFLOW = LibC::EAI_OVERFLOW
    end

    getter code : Code

    def initialize(code)
      @code = Code.new(code)
      message = String.new(LibC.gai_strerror(code))
      super "EAI_#{@code}: #{message}"
    end
  end

  class Worker
    def self.run(fd) : Worker
      new(fd).tap(&.run)
    end

    def initialize(@fd : Int32)
    end

    # Starts the resolver thread, which serves requests in an endless loop.
    def run
      Thread.new do
        loop do
          begin
            hints, hostname = read_request
            getaddrinfo(hints, hostname) { |message| write(message) }
          rescue ex
            puts "THREAD ERROR: #{ex.message} (#{ex.class.name})\n"
          end
        end
      end
    end

    private def getaddrinfo(hints, host)
      code = LibC.getaddrinfo(host, nil, pointerof(hints), out addrinfo)
      begin
        # dispatch on the return code (a bare `case` would always take `when 0`)
        case code
        when 0
          ai = addrinfo
          until ai.null?
            yield ai.value
            ai = ai.value.ai_next
          end
          yield 0 # done
        when LibC::EAI_SYSTEM
          raise Errno.new("getaddrinfo")
        else # EAI_*
          yield code
        end
      ensure
        # the list is only allocated (and only needs freeing) on success
        LibC.freeaddrinfo(addrinfo) if code == 0
      end
    end

    private def read_request
      hints = LibC::Addrinfo.new
      hints.ai_family = read_int32
      hints.ai_socktype = read_int32
      hints.ai_protocol = read_int32
      hints.ai_flags = read_int32
      {hints, String.new(read(read_byte))}
    end

    private def read_byte
      read(1).first
    end

    private def read_int32
      read(4).to_unsafe.as(Int32*).value
    end

    private def read(n)
      buf = Slice(UInt8).new(n)
      read = 0
      while read < n
        ptr = (buf.to_unsafe + read).as(Void*)
        # only request the bytes still missing, so a short read can't overrun buf
        case ret = LibC.read(@fd, ptr, n - read)
        when -1
          raise Errno.new("read")
        else
          read += ret
        end
      end
      buf
    end

    private def write(addrinfo : LibC::Addrinfo)
      write(addrinfo.ai_family)
      write(addrinfo.ai_socktype)
      write(addrinfo.ai_protocol)
      write(addrinfo.ai_flags)
      write(addrinfo.ai_addrlen)
      if addrinfo.ai_family == LibC::AF_INET6
        write(addrinfo.ai_addr.as(UInt8*).to_slice(sizeof(LibC::SockaddrIn6)))
      else
        write(addrinfo.ai_addr.as(UInt8*).to_slice(addrinfo.ai_addrlen))
      end
    end

    private def write(message : Int32 | UInt32)
      LibC.write @fd, pointerof(message).as(Void*), 4
    end

    private def write(message : UInt8)
      write Slice(UInt8).new(1) { message.to_u8 }
    end

    private def write(message : String)
      LibC.write(@fd, message.to_unsafe.as(Void*), message.bytesize)
    end

    private def write(message : Slice(UInt8))
      LibC.write(@fd, message.to_unsafe.as(Void*), message.size)
    end
  end

  @socket : UNIXSocket
  @__thread_socket : UNIXSocket # memoized so GC won't collect and close it

  def initialize
    @socket, @__thread_socket = UNIXSocket.pair
    @__thread_socket.blocking = true
    @worker = Worker.run(@__thread_socket.fd)
  end

  def getaddrinfo(hints, host)
    send_request(hints, host)
    addrinfos = [] of LibC::Addrinfo
    loop do
      code = @socket.read_bytes(Int32)
      case code <=> 0
      when 1
        addrinfos << read_addrinfo(code)
      when 0
        break
      else
        # ERROR frame: the exception must be raised, not merely instantiated
        raise Error.new(code)
      end
    end
    addrinfos
  end

  private def send_request(hints, host)
    @socket.write_bytes(hints.ai_family)
    @socket.write_bytes(hints.ai_socktype)
    @socket.write_bytes(hints.ai_protocol)
    @socket.write_bytes(hints.ai_flags)
    @socket.write_byte(host.bytesize.to_u8)
    @socket.write(host.to_slice)
  end

  private def read_addrinfo(family)
    addrinfo = LibC::Addrinfo.new
    addrinfo.ai_family = family
    addrinfo.ai_socktype = @socket.read_bytes(Int32)
    addrinfo.ai_protocol = @socket.read_bytes(Int32)
    addrinfo.ai_flags = @socket.read_bytes(Int32)
    addrinfo.ai_addrlen = @socket.read_bytes(Int32)
    if family == LibC::AF_INET6
      addr = Slice(UInt8).new(sizeof(LibC::SockaddrIn6))
    else
      addr = Slice(UInt8).new(addrinfo.ai_addrlen)
    end
    @socket.read_fully(addr)
    addrinfo.ai_addr = addr.to_unsafe.as(LibC::Sockaddr*)
    addrinfo
  end
end

dns = DNS.new
puts "PID: #{Process.pid}"

# Stress test: resolve "localhost" in a tight loop and check every result.
loop do
  hints = LibC::Addrinfo.new
  hints.ai_family = LibC::AF_INET
  hints.ai_socktype = LibC::SOCK_STREAM
  hints.ai_protocol = LibC::IPPROTO_TCP
  addrinfos = dns.getaddrinfo(hints, "localhost")
  unless addrinfos.size == 1
    raise Exception.new("expected 1 addrinfo but got #{addrinfos.size}")
  end
  addrinfo = addrinfos.first
  addr = addrinfo.ai_addr.as(LibC::SockaddrIn*).value
  # 16777343 is 0x0100007F: 127.0.0.1 in network byte order, read on a little-endian host
  unless addr.sin_addr.s_addr == 16777343
    raise Exception.new("expected addr: 16777343 but got #{addr.sin_addr.s_addr}")
  end
end

#puts "====="
#hints = LibC::Addrinfo.new
#hints.ai_family = LibC::AF_UNSPEC
#dns.getaddrinfo(hints, "localhost").each do |addrinfo|
# p addrinfo
#end
#
#puts "====="
#hints = LibC::Addrinfo.new
#hints.ai_family = LibC::AF_INET6
#dns.getaddrinfo(hints, "ip6-localhost").each do |addrinfo|
# p addrinfo
#end
#
#puts "====="
#hints = LibC::Addrinfo.new
#hints.ai_family = LibC::AF_INET
#hints.ai_socktype = LibC::SOCK_STREAM
#hints.ai_protocol = LibC::IPPROTO_TCP
#dns.getaddrinfo(hints, "localhost").each do |addrinfo|
# p addrinfo
#end
@ysbaddaden

Boehm GC behaves correctly: memory usage is constant when issuing requests in a tight loop. There isn't much to do anyway: within the thread, we merely create a C struct (which lives in the HEAP?) and manually free the struct pointer returned by getaddrinfo.
