Skip to content

Instantly share code, notes, and snippets.

@avibryant
Created November 21, 2012 17:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save avibryant/4126443 to your computer and use it in GitHub Desktop.
Save avibryant/4126443 to your computer and use it in GitHub Desktop.
=begin
Adapatation of streaming DISCO algorithm to parallel streams/monoids.
Each instance of Disco maintains @counts which is just #(x)
as well as @h which stores (q,n) for each (x,y) pair.
n corresponds conceptually to the number of emitted values for that pair, q corresponds (as in the paper)
to the probability with which they were emitted.
A separate instance is created for each dimension, these are then merged in any order.
Initialize with a single dimension by setting counts(w) to 1 for each w
in the dimension, and h(q,n) to (1,1) for each (w1,w2).
(base case: one value, emitted with probability 1).
When merging a smaller instance (fewer keys in h) into a larger, combine their counts, then iterate through the
(x,y) pairs in smaller.h, examining their (q,n). Compute a new q' for (x,y) as in the paper, based
on the current #(x) and #(y). Then scale n by q'/q - this
is similar in spirit to the subsampling in the paper. Do the same to any existing (q,n)
for that pair in the larger instance, and record the sum of the resulting n values, recording q' as the new q.
If n falls below 1, then with probability (1-n) delete the key (setting n effectively to 0),
and with probability n set it to 1 - this prunes the key space but keeps the expectation the same.
When estimating a final cos value, perform a similar scaling then multiply by E/P as in the paper.
This simplifies to n/q(sqrt(#(x)#(y))) for a recorded (q,n).
=end
P = 1.0
E = 0.5
class Disco
attr_reader :counts, :h
def initialize(dimension)
@counts = Hash.new(0)
@h = {}
dimension.uniq!
dimension.each{|x| @counts[x] = 1}
dimension.each do |x|
dimension.each do |y|
if x < y
@h[[x,y]] = [1.0,1.0]
end
end
end
end
def merge(smaller)
smaller.counts.each{|k,v| @counts[k] += v}
smaller.h.each do |k,v|
x,y=k
q1,n1 = v
q = P/(E * Math.sqrt(@counts[x] * @counts[y]))
n = q/q1 * n1
if(v2 = @h[k])
q2,n2 = v2
n += q/q2 * n2
end
if(n > 1)
@h[k] = [q,n]
else
if n > rand
@h[k] = [q,1]
else
@h.delete(k)
end
end
end
end
def cos(x,y)
if v = @h[[x,y]]
q1, n1 = v
n1/(Math.sqrt(@counts[x] * @counts[y]) * q1)
else
0
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment