abarringer (owner)

Fork Of

gist: 87908 by tmm1 async dns + cache for EM

Revisions

gist: 110510 Download_button fork
public
Public Clone URL: git://gist.github.com/110510.git
Embed All Files: show embed
lib/em/dns_cache.rb #
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
# $Id: dns_cache.rb 5040 2007-10-05 17:31:04Z francis $
#
#
 
require 'rubygems'
require 'eventmachine'
require 'resolv'
 
 
module EventMachine
module DnsCache
 
# Small in-memory store mapping a domain to a value with an optional expiry.
# Entries are kept as [deadline, value] pairs, where deadline is either a
# Time or the symbol :none (never expires).
class Cache
  def initialize
    @hash = {}
  end

  # Stores +value+ under +domain+. A negative +expiration+ means the entry
  # never expires; otherwise it expires +expiration+ seconds from now.
  def add(domain, value, expiration)
    deadline = expiration < 0 ? :none : Time.now + expiration
    @hash[domain] = [deadline, value]
  end

  # Returns the stored value for +domain+, or nil when there is no entry or
  # the entry has expired. Expired entries are removed as a side effect.
  def retrieve(domain)
    return nil unless @hash.has_key?(domain)

    deadline, value = @hash[domain]
    return value if deadline == :none || deadline >= Time.now

    @hash.delete(domain)
    nil
  end
end
 
# Module-level state for DnsCache.
# Separate caches back A lookups (resolve) and MX lookups (resolve_mx).
@a_cache = Cache.new
@mx_cache = Cache.new
# Nameserver addresses that every query is sent to; filled by add_nameserver.
@nameservers = []
# Running counter used as the DNS message ID of each outgoing query
# (resolve/resolve_mx advance it modulo 60000).
@message_ix = 0
 
# Registers +ns+ as a nameserver to query. Adding an address that is already
# registered is a no-op (returns nil); otherwise the nameserver list is returned.
def self.add_nameserver(ns)
  return if @nameservers.include?(ns)

  @nameservers << ns
end
 
# Turns diagnostic output on (the default when called without an argument)
# or off. Returns the value that was set.
def self.verbose(v = true)
  @verbose = v
end
 
 
# Seeds one of the caches with a locally supplied answer.
#
# cache_type:: :mx or :a; anything else raises "bad cache type".
# domain::     key to store the entry under.
# value::      the answer; a frozen shallow copy is what gets stored.
# expiration:: passed through to Cache#add.
#
# The value is wrapped in an already-succeeded deferrable so that cached
# entries look the same to callers as entries fulfilled from the network.
def self.add_cache_entry(cache_type, domain, value, expiration)
  cache = case cache_type
          when :mx then @mx_cache
          when :a then @a_cache
          else raise "bad cache type"
          end

  deferred = EM::DefaultDeferrable.new
  deferred.succeed(value.dup.freeze)
  cache.add(domain, deferred, expiration)
end
 
# Resolves the A records of +domain+.
#
# Returns a deferrable. If the A cache holds an unexpired entry for +domain+
# that deferrable is returned as-is. Otherwise a new deferrable is created,
# given a 5-second timeout, stored in the cache for 300 seconds, and one
# query datagram is sent to every registered nameserver on port 53.
#
# Callbacks on the returned deferrable receive a frozen Array of address
# strings (possibly empty).
#
# Raises whatever lazy_initialize raises (e.g. when the reactor is not running).
#
# Needs to be DRYed up with resolve_mx.
#
def self.resolve domain
  cached = @a_cache.retrieve(domain)
  return cached if cached

  $>.puts "Fulfilling #{domain} from network" if @verbose
  d = EM::DefaultDeferrable.new
  d.timeout(5)
  @a_cache.add domain, d, 300 # Hard-code a 5 minute expiration

  # The network response is delivered to an intermediate deferrable rather
  # than to the cached one. Its callback is consumed by the first response,
  # so an answer arriving later from another nameserver can no longer
  # overwrite the cached address list with a raw DNS message.
  raw = EM::DefaultDeferrable.new
  raw.callback {|resp|
    r = []
    resp.each_answer {|name,ttl,data|
      # The answer section may carry records without an address (e.g. the
      # CNAME preceding the A records of an aliased name); skip those.
      r << data.address.to_s if data.respond_to?(:address)
    }

    # Freeze the array since we'll be keeping it in cache and passing it
    # around to multiple users. An alternative would have been to dup it.
    r.freeze
    d.succeed r
  }

  lazy_initialize
  m = Resolv::DNS::Message.new
  m.rd = 1
  m.add_question domain, Resolv::DNS::Resource::IN::A
  m = m.encode
  @nameservers.each {|ns|
    @message_ix = (@message_ix + 1) % 60000
    Request.new raw, @message_ix
    msg = m.dup
    # The first two bytes of the encoded message are its ID; stamp in ours
    # so the response can be matched to the Request registered above.
    msg[0,2] = [@message_ix].pack("n")
    @u.send_datagram msg, ns, 53
  }

  d
end
 
 
# Resolves the MX records of +domain+.
#
# Returns the cached deferrable when the MX cache has an unexpired entry.
# Otherwise creates a deferrable with a 5-second timeout, caches it for
# 300 seconds, wraps it in an MxQuery, and sends an MX question to every
# registered nameserver on port 53.
#
# Needs to be DRYed up with resolve.
#
def self.resolve_mx(domain)
  cached = @mx_cache.retrieve(domain)
  return cached if cached

  $>.puts "Fulfilling #{domain} from network" if @verbose
  deferred = EM::DefaultDeferrable.new
  deferred.timeout(5)
  @mx_cache.add(domain, deferred, 300) # Hard-code a 5 minute expiration

  query = MxQuery.new(deferred)

  lazy_initialize
  question = Resolv::DNS::Message.new
  question.rd = 1
  question.add_question(domain, Resolv::DNS::Resource::IN::MX)
  wire = question.encode
  @nameservers.each do |ns|
    @message_ix = (@message_ix + 1) % 60000
    Request.new(query, @message_ix)
    packet = wire.dup
    # Overwrite the leading two-byte message ID with the one just registered.
    packet[0, 2] = [@message_ix].pack("n")
    @u.send_datagram(packet, ns, 53)
  end

  deferred
end
 
 
# Opens the shared datagram socket on first use.
#
# Raises "EventMachine reactor not running" unless EM.reactor_running?.
# When no socket is held, one is opened on 0.0.0.0 with port 0 using the
# Socket handler, and the handler is given a signaller that clears our
# reference when called, so the next call opens a fresh socket. (The socket
# can go away, among other things, if the reactor stops and restarts.)
def self.lazy_initialize
  raise "EventMachine reactor not running" unless EM.reactor_running?
  return if @u

  @u = EM.open_datagram_socket("0.0.0.0", 0, Socket) { |conn|
    conn.unbind_signaller = proc { @u = nil }
  }
end
 
 
# Loads locally defined MX records into the MX cache as permanent entries.
#
# +txt+ is a String or any object responding to each_line. The format is a
# sequence of sections:
#
#   domain.name:
#   host-or-address preference
#   host-or-address preference
#
# A line ending in ":" starts a new domain; each following "host number"
# line adds a host for that domain. Host lines seen before any domain line,
# and lines matching neither form, are ignored. For every domain the hosts
# are ordered by ascending preference and stored with add_cache_entry using
# expiration -1. A domain with no host lines is stored with an empty list.
def self.parse_local_mx_records txt
  domain = nil
  addrs = []

  # Flushes the hosts collected for the current domain into the cache.
  add_it = proc {
    hosts = addrs.sort {|m,n| m.last <=> n.last}.map {|host, _pref| host}
    add_cache_entry :mx, domain, hosts, -1
  }

  # String, IO and StringIO all provide each_line, so the input is iterated
  # directly; wrapping strings in StringIO would need a require this file
  # does not have.
  txt.each_line {|ln|
    if ln =~ /\A\s*([\d\w\.\-\_]+)\s+(\d+)\s*\Z/
      addrs << [$1, $2.to_i] if domain
    elsif ln =~ /\A\s*([^\s\:]+)\s*\:\s*\Z/
      add_it.call if domain
      domain = $1
      addrs.clear
    end
  }

  add_it.call if domain
end
 
 
# Turns the DNS response to an MX question into a flat list of addresses.
#
# An MxQuery is itself a deferrable: it is succeeded with the decoded DNS
# response, and in turn succeeds the result deferrable it was constructed
# with once every mail exchanger has an address list.
class MxQuery
  include EM::Deferrable

  # rslt:: deferrable that will be succeeded with the final address list.
  def initialize rslt
    @result = rslt # Deferrable
    @n_a_lookups = 0

    self.callback {|resp|
      # Addresses supplied in the additional section, keyed by record name.
      addrs = {}
      resp.each_additional {|name,ttl,data|
        # Ignore additional records that carry no address.
        next unless data.respond_to?(:address)
        (addrs[name] ||= []) << data.address.to_s
      }

      # Only MX records have a preference/exchange; the answer section can
      # also contain other records (e.g. a CNAME for an aliased domain).
      mx_records = resp.answer.select {|name,ttl,data| data.respond_to?(:preference)}

      # One slot per exchanger, in preference order: either the addresses
      # from the additional section or a deferrable A lookup.
      @addresses = mx_records.
        sort {|a,b| a[2].preference <=> b[2].preference}.
        map {|name,ttl,data|
          ex = data.exchange
          addrs[ex] or EM::DnsCache.resolve(ex.to_s)
        }

      # Count every pending lookup BEFORE attaching callbacks. A lookup
      # served from cache may already be complete, in which case its
      # callback runs immediately; counting while attaching would let the
      # counter reach zero and publish a result that still contains
      # unresolved deferrables.
      pending = []
      @addresses.each_with_index {|a,ix|
        pending << [a, ix] if a.respond_to?(:set_deferred_status)
      }
      @n_a_lookups = pending.length

      pending.each {|a,ix|
        a.callback {|r|
          @addresses[ix] = r
          @n_a_lookups -= 1
          succeed_result if @n_a_lookups == 0
        }
      }

      succeed_result if pending.empty?
    }
  end

  # Succeeds the result deferrable with the flattened, de-duplicated and
  # frozen address list.
  def succeed_result
    # Questionable whether we should uniq if it perturbs the sort order.
    # Also freeze it so some user can't wipe it out on us.
    @result.succeed @addresses.flatten.uniq.freeze
  end

end
 
# A single in-flight DNS query, tracked by its message ID so that an
# incoming response can be routed back to whoever is waiting for it.
class Request
  include EM::Deferrable

  # Message ID => Request for every query still awaiting a response.
  @@outstanding = {}

  # Hands a decoded response to the request registered under the response's
  # ID, removing it from the table. Responses with an unknown ID are ignored.
  def self.post(response)
    waiting = @@outstanding.delete(response.id)
    waiting.succeed(response) if waiting
  end

  # rslt:: deferrable to succeed with the response.
  # m_id:: message ID this request is registered under.
  #
  # Raises "request-queue overflow" if the ID is already in use.
  def initialize(rslt, m_id)
    @result = rslt
    @msgid = m_id
    if @@outstanding.key?(@msgid)
      raise "request-queue overflow"
    end
    @@outstanding[@msgid] = self

    self.timeout(10)
    # On failure, release the ID so it can be reused.
    errback { @@outstanding.delete(@msgid) }
    callback { |resp| @result.succeed(resp) }
  end
end
 
# Connection handler for the shared datagram socket.
class Socket < EM::Connection
  # Optional callable invoked from unbind.
  attr_accessor :unbind_signaller

  # Decodes an incoming datagram and posts it to the matching Request.
  # Datagrams that fail to decode are silently dropped.
  def receive_data(dg)
    message = begin
      Resolv::DNS::Message.decode(dg)
    rescue
      nil
    end
    Request.post(message) if message
  end

  # Notifies the signaller, if one was set.
  def unbind
    signaller = @unbind_signaller
    signaller.call if signaller
  end
end
 
end
end
 
 
test/test_basic.rb #
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# $Id: test_basic.rb 5040 2007-10-05 17:31:04Z francis $
#
#
 
$:.unshift(File.dirname(__FILE__) + '/../lib')
require 'dns_cache'
 
 
 
 
#--------------------------------------
 
 
# Tests for EM::DnsCache. The tests that register TestNameserver send real
# queries to it; the local-definition tests are answered from entries put
# into the cache beforehand.
class TestBasic < Test::Unit::TestCase

  TestNameserver = "151.202.0.85"
  TestNameserver2 = "151.202.0.86"

  # Sample input for parse_local_mx_records: "domain:" headers followed by
  # "host preference" lines.
  LocalMxRecords = %Q(
boondoggle.zzz:
65.66.67.68 10
55.56.57.58 5
esmtp.someone.zzz 4
 
boondoggle.yyy:
)

  # Resolves the same name twice in sequence; the second lookup is issued
  # from inside the first one's callback.
  def test_a
    EM::DnsCache.add_nameserver TestNameserver
    EM::DnsCache.verbose

    out = nil

    EM.run {
      d = EM::DnsCache.resolve "bayshorenetworks.com"
      d.errback {EM.stop}
      d.callback {|r|
        d = EM::DnsCache.resolve "bayshorenetworks.com"
        d.errback { EM.stop }
        d.callback {|r| puts r; out = r; EM.stop }
      }
    }

    assert out
  end

  # Expects a name to resolve to an Array of exactly two addresses.
  def test_a_pair
    EM::DnsCache.add_nameserver TestNameserver
    EM::DnsCache.verbose

    out = nil

    EM.run {
      d = EM::DnsCache.resolve "maila.microsoft.com"
      d.errback {EM.stop}
      d.callback {|r|
        out = r
        EM.stop
      }
    }

    assert_equal( Array, out.class )
    assert_equal( 2, out.length )
  end


  # This test causes each request to hit the network because they're all scheduled
  # before the first one can come back and load the cache. Although a nice mis-feature for
  # stress testing, it would be nice to fix it someday, perhaps by not kicking off a
  # request for a particular domain if one is already pending.
  # Without epoll, this test gets really slow and usually doesn't complete.
  def test_lots_of_a
    EM.epoll
    EM::DnsCache.add_nameserver TestNameserver
    EM::DnsCache.add_nameserver TestNameserver2
    EM::DnsCache.verbose

    n = 250
    e = 0
    s = 0
    EM.run {
      n.times {
        d = EM::DnsCache.resolve "ibm.com"
        d.errback {e+=1; n -= 1; EM.stop if n == 0}
        d.callback {s+=1; n -= 1; EM.stop if n == 0}
      }
    }
    assert_equal( 0, n)
    assert_equal( 250, s)
  end




  # Resolves the same MX name twice in sequence.
  def test_mx
    EM::DnsCache.add_nameserver TestNameserver
    EM::DnsCache.verbose

    out = nil

    EM.run {
      d = EM::DnsCache.resolve_mx "steamheat.net"
      d.errback {EM.stop}
      d.callback {|r|
        p r
        d = EM::DnsCache.resolve_mx "steamheat.net"
        d.errback {EM.stop}
        d.callback {|r|
          out = r
          p r
          EM.stop
        }
      }
    }

    assert out
  end


  # The arrays of addresses we get back from the DnsCache are FROZEN.
  # That's because the same objects get passed around to any caller that
  # asks for them. If you need to modify the array, dup it.
  #
  def test_freeze
    EM::DnsCache.add_nameserver TestNameserver
    EM::DnsCache.verbose

    out = nil

    EM.run {
      d = EM::DnsCache.resolve_mx "steamheat.net"
      d.errback {EM.stop}
      d.callback {|r|
        out = r
        EM.stop
      }
    }

    assert out
    assert( out.length > 0)
    # Check frozenness directly: the exception raised when a frozen array
    # is modified depends on the Ruby version (TypeError on 1.8,
    # RuntimeError on later versions, FrozenError from 2.5 on).
    assert( out.frozen? )
  end


  # A permanent (expiration -1) MX entry added by hand is returned as-is.
  def test_local_defs
    EM::DnsCache.add_nameserver TestNameserver
    EM::DnsCache.verbose

    EM::DnsCache.add_cache_entry( :mx, "example.zzz", ["1.2.3.4"], -1 )
    out = nil
    EM.run {
      d = EM::DnsCache.resolve_mx "example.zzz"
      d.errback {EM.stop}
      d.callback {|r|
        out = r
        EM.stop
      }
    }
    assert_equal( ["1.2.3.4"], out )
  end

  # Same as test_local_defs, with more than one address in the entry.
  def test_multiple_local_defs
    EM::DnsCache.verbose

    EM::DnsCache.add_cache_entry( :mx, "example.zzz", ["1.2.3.4", "5.6.7.8"], -1 )
    out = nil
    EM.run {
      d = EM::DnsCache.resolve_mx "example.zzz"
      d.errback {EM.stop}
      d.callback {|r|
        out = r
        EM.stop
      }
    }
    assert_equal( ["1.2.3.4","5.6.7.8"], out )
  end

  # Adding cache entries where they already exist causes them to be REPLACED.
  #
  def test_replace
    EM::DnsCache.verbose

    EM::DnsCache.add_cache_entry( :mx, "example.zzz", ["1.2.3.4", "5.6.7.8"], -1 )
    EM::DnsCache.add_cache_entry( :mx, "example.zzz", ["10.11.12.13"], -1 )
    out = nil
    EM.run {
      d = EM::DnsCache.resolve_mx "example.zzz"
      d.errback {EM.stop}
      d.callback {|r|
        out = r
        EM.stop
      }
    }
    assert_equal( ["10.11.12.13"], out )
  end

  # We have a facility for storing locally-defined MX records.
  # The DNS cache has a way to parse and process these values.
  # The expected list is ordered by ascending preference.
  #
  def test_external_mx_defs
    EM::DnsCache.verbose

    EM::DnsCache.parse_local_mx_records LocalMxRecords

    out = nil
    EM.run {
      d = EM::DnsCache.resolve_mx "boondoggle.zzz"
      d.errback {EM.stop}
      d.callback {|r|
        out = r
        EM.stop
      }
    }
    assert_equal( ["esmtp.someone.zzz", "55.56.57.58", "65.66.67.68"], out )
  end

end