Gist ysbaddaden/80cf59b918c1b0d913c546c9520732a8, created June 9, 2016 08:59
Asynchronous DNS resolver for Crystal (call getaddrinfo in thread + socket pair communication)
# # Asynchronous DNS resolver for Crystal (sort of)
#
# It consists of a thread that runs the blocking `getaddrinfo` libc function,
# so the main thread (the one running the event loop) doesn't stall while
# resolving domains, which can take from milliseconds to seconds.
# Communication with the thread goes through a socket pair: one end for the
# main thread, the other for the resolver thread. Since we rely on a socket,
# the event loop pauses and resumes fibers according to the messages sent to
# or received from it.
#
# Since it still relies on `getaddrinfo`, it keeps the specifics of each libc,
# such as mDNS, NSSwitch extensions or per-domain resolvers (Darwin), which a
# custom DNS resolver usually won't support (on purpose).
#
# FIXME: Reliability!
#
# The protocol isn't reliable. It will likely end up corrupted at some point
# (e.g. a request or response may not be fully written or read because an
# error occurred). Since nothing checks message integrity, the resolver may
# then start resolving and returning garbage, when it should instead clean up
# and restart a worker and a socket pair.
#
# Messages should be sent in one chunk, with a header holding the payload
# length and an integrity check (e.g. an HMAC of the payload).
#
# The socket pair (UNIX sockets) could also be replaced by UDP sockets. A
# pseudo-header may then no longer be required: just sendto/recvfrom the
# payload.
#
# TODO: Thread Pool
#
# Some applications may not need a DNS resolver at all, so the thread should
# be started on demand. Conversely, some applications may need to process many
# requests, so we should start a configurable number of threads to resolve
# domains in parallel.
#
#
# ## Communication Protocol
#
# Integers are sent in the host's native byte order, since both ends of the
# socket pair live in the same process.
#
# ### Request
#
# +---------------------------------------------------------------+
# | Family (32)                                                   |
# +---------------------------------------------------------------+
# | Socktype (32)                                                 |
# +---------------------------------------------------------------+
# | Protocol (32)                                                 |
# +---------------------------------------------------------------+
# | Flags (32)                                                    |
# +---------------------------------------------------------------+
# | Length (8)                                                    |
# +===============================================================+
# | Hostname (Length) ...                                         |
# +---------------------------------------------------------------+
#
# ### Response
#
# To read a response, first read 4 bytes (an Int32) and check whether the
# number is positive (ADDRINFO, the number being the address family), zero
# (DONE) or negative (ERROR).
#
# The client must read all ADDRINFO frames until a DONE or ERROR frame is
# received. In an ERROR frame, the code is the `EAI_*` error code, which can be
# passed to `gai_strerror` for a human-readable message. This assumes `EAI_*`
# codes are negative, as in glibc; some platforms (e.g. Darwin) define them as
# positive numbers, which would be mistaken for an address family.
#
# ADDRINFO:
#
# `Addr` is `Addrlen` bytes long, except when Family is `AF_INET6`, in which
# case it is `sizeof(SockaddrIn6)` bytes long.
#
# +---------------------------------------------------------------+
# | Family (32)                                                   |
# +---------------------------------------------------------------+
# | Socktype (32)                                                 |
# +---------------------------------------------------------------+
# | Protocol (32)                                                 |
# +---------------------------------------------------------------+
# | Flags (32)                                                    |
# +---------------------------------------------------------------+
# | Addrlen (32)                                                  |
# +===============================================================+
# | Addr (...) ...                                                |
# +---------------------------------------------------------------+
#
# DONE:
#
# +---------------------------------------------------------------+
# | Code (32) = 0                                                 |
# +---------------------------------------------------------------+
#
# ERROR:
#
# +---------------------------------------------------------------+
# | Code (32) < 0                                                 |
# +---------------------------------------------------------------+
require "socket"
require "thread"

class DNS
  class Error < Exception
    enum Code
      AGAIN    = LibC::EAI_AGAIN
      BADFLAGS = LibC::EAI_BADFLAGS
      FAIL     = LibC::EAI_FAIL
      FAMILY   = LibC::EAI_FAMILY
      MEMORY   = LibC::EAI_MEMORY
      NONAME   = LibC::EAI_NONAME
      SERVICE  = LibC::EAI_SERVICE
      SOCKTYPE = LibC::EAI_SOCKTYPE
      SYSTEM   = LibC::EAI_SYSTEM
      OVERFLOW = LibC::EAI_OVERFLOW
    end

    getter code : Code

    def initialize(code)
      @code = Code.new(code)
      message = String.new(LibC.gai_strerror(code))
      super "EAI_#{@code}: #{message}"
    end
  end

  # Runs the blocking `getaddrinfo` calls in a dedicated thread, reading
  # requests from and writing responses to its end of the socket pair.
  class Worker
    def self.run(fd) : Worker
      new(fd).tap(&.run)
    end

    def initialize(@fd : Int32)
    end

    def run
      Thread.new do
        loop do
          begin
            hints, hostname = read_request
            getaddrinfo(hints, hostname) { |message| write(message) }
          rescue ex
            puts "THREAD ERROR: #{ex.message} (#{ex.class.name})"
          end
        end
      end
    end

    # Yields each resolved `Addrinfo` followed by 0 (DONE), or yields the
    # `EAI_*` error code (ERROR) when the lookup fails.
    private def getaddrinfo(hints, host)
      code = LibC.getaddrinfo(host, nil, pointerof(hints), out addrinfo)

      if code == 0
        begin
          ai = addrinfo
          until ai.null?
            yield ai.value
            ai = ai.value.ai_next
          end
          yield 0 # DONE
        ensure
          # the list is only valid (and must only be freed) on success
          LibC.freeaddrinfo(addrinfo)
        end
      else
        # always send an ERROR frame, EAI_SYSTEM included, so the client
        # doesn't wait forever for a response
        yield code
      end
    end

    # Request frame: four Int32 hints, then the hostname length (one byte)
    # followed by the hostname itself.
    private def read_request
      hints = LibC::Addrinfo.new
      hints.ai_family = read_int32
      hints.ai_socktype = read_int32
      hints.ai_protocol = read_int32
      hints.ai_flags = read_int32
      {hints, String.new(read(read_byte))}
    end

    private def read_byte
      read(1).first
    end

    private def read_int32
      read(4).to_unsafe.as(Int32*).value
    end

    # Blocks until exactly `n` bytes have been read from the socket.
    private def read(n)
      buf = Slice(UInt8).new(n)
      count = 0
      while count < n
        ptr = (buf.to_unsafe + count).as(Void*)
        # only ask for the missing bytes, or we could overflow `buf`
        case ret = LibC.read(@fd, ptr, n - count)
        when -1
          raise Errno.new("read")
        when 0
          raise "read: unexpected end of file"
        else
          count += ret
        end
      end
      buf
    end

    # ADDRINFO frame
    private def write(addrinfo : LibC::Addrinfo)
      write(addrinfo.ai_family)
      write(addrinfo.ai_socktype)
      write(addrinfo.ai_protocol)
      write(addrinfo.ai_flags)
      write(addrinfo.ai_addrlen)
      if addrinfo.ai_family == LibC::AF_INET6
        write(addrinfo.ai_addr.as(UInt8*).to_slice(sizeof(LibC::SockaddrIn6)))
      else
        write(addrinfo.ai_addr.as(UInt8*).to_slice(addrinfo.ai_addrlen))
      end
    end

    private def write(message : Int32 | UInt32)
      write_fully(pointerof(message).as(UInt8*), 4)
    end

    private def write(message : UInt8)
      write Slice(UInt8).new(1) { message.to_u8 }
    end

    private def write(message : String)
      write_fully(message.to_unsafe, message.bytesize)
    end

    private def write(message : Slice(UInt8))
      write_fully(message.to_unsafe, message.size)
    end

    # write(2) may write fewer bytes than requested: loop until all of them
    # are written, raising on error.
    private def write_fully(ptr : UInt8*, size)
      written = 0
      while written < size
        ret = LibC.write(@fd, (ptr + written).as(Void*), size - written)
        raise Errno.new("write") if ret == -1
        written += ret
      end
    end
  end

  @socket : UNIXSocket
  @__thread_socket : UNIXSocket # kept in an ivar so the GC won't collect (and close) it

  def initialize
    @socket, @__thread_socket = UNIXSocket.pair
    # the worker thread uses plain blocking read(2)/write(2) on its end
    @__thread_socket.blocking = true
    @worker = Worker.run(@__thread_socket.fd)
  end

  # Sends a request to the worker, then reads response frames until DONE
  # (returns the addrinfos) or ERROR (raises `DNS::Error`).
  def getaddrinfo(hints, host)
    send_request(hints, host)
    addrinfos = [] of LibC::Addrinfo

    loop do
      code = @socket.read_bytes(Int32)
      if code > 0
        # ADDRINFO frame: the code is the address family
        addrinfos << read_addrinfo(code)
      elsif code == 0
        # DONE frame
        break
      else
        # ERROR frame: the code is an EAI_* error code
        raise Error.new(code)
      end
    end

    addrinfos
  end

  private def send_request(hints, host)
    # the hostname length is sent as a single byte
    if host.bytesize > UInt8::MAX
      raise ArgumentError.new("hostname is longer than #{UInt8::MAX} bytes")
    end

    @socket.write_bytes(hints.ai_family)
    @socket.write_bytes(hints.ai_socktype)
    @socket.write_bytes(hints.ai_protocol)
    @socket.write_bytes(hints.ai_flags)
    @socket.write_byte(host.bytesize.to_u8)
    @socket.write(host.to_slice)
  end

  private def read_addrinfo(family)
    addrinfo = LibC::Addrinfo.new
    addrinfo.ai_family = family
    addrinfo.ai_socktype = @socket.read_bytes(Int32)
    addrinfo.ai_protocol = @socket.read_bytes(Int32)
    addrinfo.ai_flags = @socket.read_bytes(Int32)
    addrinfo.ai_addrlen = @socket.read_bytes(Int32)

    if family == LibC::AF_INET6
      addr = Slice(UInt8).new(sizeof(LibC::SockaddrIn6))
    else
      addr = Slice(UInt8).new(addrinfo.ai_addrlen)
    end
    @socket.read_fully(addr)

    addrinfo.ai_addr = addr.to_unsafe.as(LibC::Sockaddr*)
    addrinfo
  end
end

dns = DNS.new
puts "PID: #{Process.pid}"

# Resolves "localhost" in a tight loop and checks every answer (the PID above
# makes it easy to watch the process memory usage meanwhile).
loop do
  hints = LibC::Addrinfo.new
  hints.ai_family = LibC::AF_INET
  hints.ai_socktype = LibC::SOCK_STREAM
  hints.ai_protocol = LibC::IPPROTO_TCP

  addrinfos = dns.getaddrinfo(hints, "localhost")
  unless addrinfos.size == 1
    raise Exception.new("expected 1 addrinfo but got #{addrinfos.size}")
  end

  addrinfo = addrinfos.first
  addr = addrinfo.ai_addr.as(LibC::SockaddrIn*).value

  # 16777343 is 0x0100007F, i.e. 127.0.0.1 in network byte order as read on
  # a little-endian host
  unless addr.sin_addr.s_addr == 16777343
    raise Exception.new("expected addr: 16777343 but got #{addr.sin_addr.s_addr}")
  end
end

#puts "====="
#hints = LibC::Addrinfo.new
#hints.ai_family = LibC::AF_UNSPEC
#dns.getaddrinfo(hints, "localhost").each do |addrinfo|
#  p addrinfo
#end
#
#puts "====="
#hints = LibC::Addrinfo.new
#hints.ai_family = LibC::AF_INET6
#dns.getaddrinfo(hints, "ip6-localhost").each do |addrinfo|
#  p addrinfo
#end
#
#puts "====="
#hints = LibC::Addrinfo.new
#hints.ai_family = LibC::AF_INET
#hints.ai_socktype = LibC::SOCK_STREAM
#hints.ai_protocol = LibC::IPPROTO_TCP
#dns.getaddrinfo(hints, "localhost").each do |addrinfo|
#  p addrinfo
#end
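
The request and response frames from the "Communication Protocol" comment can be exercised without threads or LibC structs. What follows is a minimal sketch under a few assumptions. Frames go through any Crystal `IO` (here an `IO::Memory`) in native byte order, which is what `write_bytes`/`read_bytes` default to. `Request`, `encode_request`, `decode_request` and `read_frame_kind` are hypothetical names and are not part of the gist.

```crystal
# Hypothetical stand-in for the hints + hostname carried by a request frame.
record Request, family : Int32, socktype : Int32, protocol : Int32, flags : Int32, hostname : String

# Writes a request frame: four native-endian Int32 hints, then a one-byte
# hostname length followed by the hostname bytes.
def encode_request(io : IO, req : Request) : Nil
  if req.hostname.bytesize > UInt8::MAX
    raise ArgumentError.new("hostname is longer than #{UInt8::MAX} bytes")
  end
  {req.family, req.socktype, req.protocol, req.flags}.each do |value|
    io.write_bytes(value, IO::ByteFormat::SystemEndian)
  end
  io.write_byte(req.hostname.bytesize.to_u8)
  io.write(req.hostname.to_slice)
end

# Reads a request frame back; truncated input raises IO::EOFError.
def decode_request(io : IO) : Request
  family = io.read_bytes(Int32, IO::ByteFormat::SystemEndian)
  socktype = io.read_bytes(Int32, IO::ByteFormat::SystemEndian)
  protocol = io.read_bytes(Int32, IO::ByteFormat::SystemEndian)
  flags = io.read_bytes(Int32, IO::ByteFormat::SystemEndian)
  length = io.read_byte || raise IO::EOFError.new
  Request.new(family, socktype, protocol, flags, io.read_string(length))
end

# Classifies a response frame from its leading Int32.
def read_frame_kind(io : IO) : Symbol
  code = io.read_bytes(Int32, IO::ByteFormat::SystemEndian)
  if code > 0
    :addrinfo # the code is the address family
  elsif code == 0
    :done
  else
    :error # the code is an EAI_* error code (negative, as in glibc)
  end
end

io = IO::Memory.new
# 2, 1, 6 are AF_INET, SOCK_STREAM, IPPROTO_TCP on Linux
encode_request(io, Request.new(2, 1, 6, 0, "localhost"))
io.rewind
puts decode_request(io).hostname # prints "localhost"
```

Decoding a truncated frame raises `IO::EOFError` instead of returning garbage. That is a small step toward the reliability FIXME, but it still can't detect a frame that is complete yet corrupt.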
Boehm GC behaves correctly: memory usage stays constant when issuing requests in a tight loop. There isn't much to do anyway: within the thread, we merely create a C struct (which lives on the heap?) and manually free the list pointer returned by `getaddrinfo`.
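
The FIXME in the header comment suggests sending each message in one chunk, prefixed with its length and an integrity check such as an HMAC. Below is a minimal sketch of that framing. It assumes both ends of the socket pair share a random key and uses HMAC-SHA256 from Crystal's OpenSSL bindings. `write_frame`, `read_frame`, `FrameError` and the 64 KiB `MAX_PAYLOAD` cap are hypothetical choices and are not part of the gist.

```crystal
require "openssl/hmac"
require "random/secure"

# Hypothetical framing: native-endian UInt32 payload length, then a 32-byte
# HMAC-SHA256 of the payload, then the payload itself.
class FrameError < Exception
end

HMAC_SIZE   = 32     # size of an HMAC-SHA256 digest, in bytes
MAX_PAYLOAD = 65_536 # hypothetical cap, so a corrupt length can't trigger a huge allocation

def write_frame(io : IO, key : Bytes, payload : Bytes) : Nil
  raise ArgumentError.new("payload too large") if payload.size > MAX_PAYLOAD
  io.write_bytes(payload.size.to_u32, IO::ByteFormat::SystemEndian)
  io.write(OpenSSL::HMAC.digest(:sha256, key, payload))
  io.write(payload)
end

def read_frame(io : IO, key : Bytes) : Bytes
  size = io.read_bytes(UInt32, IO::ByteFormat::SystemEndian)
  raise FrameError.new("frame too large: #{size} bytes") if size > MAX_PAYLOAD

  mac = Bytes.new(HMAC_SIZE)
  io.read_fully(mac)
  payload = Bytes.new(size)
  io.read_fully(payload)

  # a mismatch means the stream is corrupt: the caller should drop the socket
  # pair and restart the worker rather than keep reading from it
  unless OpenSSL::HMAC.digest(:sha256, key, payload) == mac
    raise FrameError.new("HMAC mismatch")
  end
  payload
end

key = Random::Secure.random_bytes(32)
io = IO::Memory.new
write_frame(io, key, "localhost".to_slice)
io.rewind
puts String.new(read_frame(io, key)) # prints "localhost"
```

Both ends live in the same process, so a plain `==` on the digests is enough here. A constant-time comparison would only matter if frames crossed a trust boundary.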