Skip to content

Instantly share code, notes, and snippets.

@seanlynch
Created January 22, 2010 23:52
Show Gist options
  • Save seanlynch/284298 to your computer and use it in GitHub Desktop.
Save seanlynch/284298 to your computer and use it in GitHub Desktop.
An implementation of online codes in Python
#!/usr/bin/python2.2
# This is online.py, a Python implementation of online codes.
# Copyright (C) 2003 Sean R. Lynch <http://sean.lynch.tv/>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
#You should have received a copy of the GNU Lesser General Public
#License along with this library; if not, write to the Free Software
#Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# See http://kademlia.scs.cs.nyu.edu/msd.ps
# To run the test:
# python online.py <infile> <outfile> <db file (created)>
# In the future the db file will be used for restarting the decoder
# Changes
# 2003-05-07 seanl Changed aux block list to be indexed by block number rather
# than aux block number, and made the thing disk based
# TODO:
# - Add more/better comments
# - Cache aux blocks on disk to be able to produce
# - Faster XOR function
# - Faster/better PRNG
# - Make the interface make more sense and easier to use
# - Allow decode attempts to take place in batches rather than on the fly
# if the user desires
# - Improve the scheme for assigning message blocks to auxiliary blocks.
# I don't know how much the uniformity of the distribution of message block
# assignments affects the overhead.
import math, bisect, sha, struct, time
from hmac import _strxor as strxor
class Random:
    """Deterministic pseudo-random number generator based on SHA-1.

    Output number k of the stream is the SHA-1 digest of the seed followed
    by k packed as a big-endian 32-bit integer, so two generators created
    with the same seed produce the same sequence.
    """

    # Largest value of the 32-bit words taken from each digest (0xffffffff).
    # Written in decimal so the literal is valid on both Python 2 and 3.
    _MAX_WORD = 4294967295

    def __init__(self, seed):
        """Start a new stream; seed is fed to SHA-1 as the fixed prefix."""
        self.sha = sha.new(seed)
        self.counter = 0

    def next(self):
        """Return the next raw digest of the stream and advance the counter."""
        s = self.sha.copy()
        s.update(struct.pack(">I", self.counter))
        self.counter += 1
        return s.digest()

    def _next_word(self):
        """Return the first four bytes of the next digest as an unsigned int."""
        word, = struct.unpack(">I", self.next()[:4])
        return word

    def random(self):
        """Return a float in the closed range [0.0, 1.0]."""
        return float(self._next_word()) / self._MAX_WORD

    def randrange(self, end):
        """Return an integer in range(end).

        Small draws are rejected and re-drawn to reduce modulo bias.  The
        re-draw must produce an integer again: the previous code assigned
        the raw digest string here, which broke the following "r % end".
        The rejection threshold itself is unchanged so that sequences which
        never hit the rejection path stay exactly as before.

        Raises ZeroDivisionError if end is 0.
        """
        reject_below = self._MAX_WORD % end
        r = self._next_word()
        while r < reject_below:
            r = self._next_word()
        return r % end

    def shuffle(self, l):
        """Shuffle the list l in place, walking from the last index down."""
        for i in range(len(l) - 1, -1, -1):
            j = self.randrange(i + 1)
            l[j], l[i] = l[i], l[j]
class CheckBlockMixin:
    """Behaviour shared by blocks that are the XOR of a set of other blocks.

    The host class has to supply add_data(data) and a "known" attribute;
    both are used here but defined elsewhere.
    """

    def __init__(self, blocks):
        """Split blocks into already-known ones (folded in) and pending ones."""
        # Number of blocks at construction time; only __repr__ reads it.
        self.degree = len(blocks)
        self.blocks = []
        for member in blocks:
            if not member.known:
                # Still unresolved: remember it and register with it so it
                # can notify us once its data is recovered.
                self.blocks.append(member)
                member.add_checkblock(self)
                continue
            # Already recovered: XOR its data into ours straight away.
            self.add_data(member.get_data())

    def remove_block(self, block):
        """Drop block from the pending list.

        Returns 1 when this check block is known and exactly one unknown
        block is left (so that block can now be recovered), else None.
        """
        self.blocks.remove(block)
        if not self.known:
            return None
        if len(self.blocks) != 1:
            return None
        if self.blocks[0].known:
            return None
        return 1

    def __repr__(self):
        return '<%s instance at %x with degree %d>' % (
            self.__class__, id(self), self.degree)
class MessageBlockMixin:
    """Behaviour shared by blocks that live in the decoder's block store."""

    def __init__(self, decoder, blocknum):
        """Create an unknown block number blocknum owned by decoder."""
        # Check blocks that still reference this block.
        self.checkblocks = []
        # 0 until the block's data has been recovered.
        self.known = 0
        self.blocknum = blocknum
        self.decoder = decoder

    def add_checkblock(self, cb):
        """Register a check block that includes this block (at most once)."""
        assert cb not in self.checkblocks
        self.checkblocks.append(cb)

    def get_data(self):
        """Fetch this block's data from the decoder."""
        return self.decoder.get_block(self.blocknum)
class Block:
    """Common base class for message blocks, aux blocks and check blocks.

    It defines no state or behaviour of its own.
    """
class AuxBlock(Block, CheckBlockMixin, MessageBlockMixin):
    """An auxiliary block: a check block over message blocks that is itself
    stored in the decoder like a message block.

    self.temp accumulates the XOR of everything passed to add_data(): the
    data of member blocks as they become known and, once recovered, this
    block's own data.
    """

    def __init__(self, decoder, blocknum, blocks):
        # temp has to exist before CheckBlockMixin.__init__ runs, because
        # that may already call add_data() for known member blocks.
        self.temp = None
        CheckBlockMixin.__init__(self, blocks)
        MessageBlockMixin.__init__(self, decoder, blocknum)

    def add_data(self, data):
        """XOR data into the running accumulator."""
        assert data is not None
        if self.temp is not None:
            data = strxor(self.temp, data)
        self.temp = data

    def set_data(self, data):
        """Record this block's recovered data and mark it known.

        The data is also folded into the accumulator rather than replacing
        it, so what was already gathered from member blocks is not lost.
        """
        assert not self.known
        self.decoder.set_block(self.blocknum, data)
        self.add_data(data)
        self.known = 1

    def remove_checkblock(self, cb):
        """Recover this block from cb and propagate the result.

        Returns the list of check blocks (possibly including this aux block
        itself) that are now left with exactly one unknown block.
        """
        self.checkblocks.remove(cb)
        recovered = cb.get()
        self.set_data(recovered)
        resolvable = []
        # Now that our own data is known we may act as a check block too.
        if len(self.blocks) == 1:
            resolvable.append(self)
        for other in self.checkblocks:
            assert other is not cb and other is not self
            other.add_data(recovered)
            if other.remove_block(self):
                resolvable.append(other)
        self.checkblocks = None
        return resolvable

    def get(self):
        """Return the accumulated XOR value."""
        return self.temp
class CheckBlock(Block, CheckBlockMixin):
    """A received check block; its data lives in the decoder's cbdb mapping
    under the check block id."""

    # A check block's data is available from the moment it is created.
    known = 1

    def __init__(self, decoder, cbid, blocks):
        # decoder and id must be set first: CheckBlockMixin.__init__ may call
        # add_data(), which reads both.  The decoder is expected to have
        # stored our data already.
        self.decoder = decoder
        self.id = cbid
        CheckBlockMixin.__init__(self, blocks)

    def get(self):
        """Return the current (partially reduced) data of this check block."""
        return self.decoder.cbdb[self.id]

    def add_data(self, data):
        """XOR data into the stored check block data.

        No None check is needed since a CheckBlock always starts with data.
        """
        store = self.decoder.cbdb
        store[self.id] = strxor(store[self.id], data)

    def __del__(self):
        # Bookkeeping on the decoder: blocks is None once the check block was
        # used to recover a block, and an empty list means it was discarded.
        remaining = self.blocks
        if remaining is None:
            self.decoder.used += 1
        elif remaining == []:
            self.decoder.discarded += 1
class MessageBlock(Block, MessageBlockMixin):
    """A block of the original message being decoded."""

    def __init__(self, decoder, blocknum):
        MessageBlockMixin.__init__(self, decoder, blocknum)

    def set_data(self, data):
        """Store the recovered data and update the decoder's unknown count."""
        assert not self.known
        owner = self.decoder
        owner.set_block(self.blocknum, data)
        self.known = 1
        owner.unknown -= 1

    def get_data(self):
        """Fetch this block's data from the decoder."""
        return MessageBlockMixin.get_data(self)

    def remove_checkblock(self, cb):
        """Recover this block from cb and propagate the result.

        Returns the list of other check blocks that are now left with
        exactly one unknown block.
        """
        self.checkblocks.remove(cb)
        recovered = cb.get()
        self.set_data(recovered)
        resolvable = []
        for other in self.checkblocks:
            assert other is not cb and other is not self
            other.add_data(recovered)
            if other.remove_block(self):
                resolvable.append(other)
        self.checkblocks = None
        return resolvable
class Parameters:
    """System parameters shared by the encoder and the decoder.

    Attributes set by __init__:
      file, length, blocksize, seed -- the constructor arguments
      n      -- number of message blocks (length / blocksize, rounded up)
      F      -- number of entries in the check block degree distribution
      p      -- cumulative degree distribution; p[0] is 0 and p[i] is the
                probability that a check block has degree <= i
      n_aux  -- number of auxiliary blocks, int(0.55 * q * epsilon * n)
      aux    -- for each message block, the list of aux block numbers
                (0-based, relative to the first aux block) it is assigned to

    NOTE(review): the formulas presumably follow the online codes paper
    referenced in the file header -- confirm against it before changing.
    """

    def __init__(self, file, length, seed="42", blocksize=1024, epsilon=0.01, q=3):
        self.file = file
        self.length = length
        self.blocksize = blocksize
        self.seed = seed

        n = int(math.ceil(float(length) / blocksize))
        self.n = n

        F = int(math.ceil(math.log(epsilon ** 2.0 / 4.0) / math.log(1 - epsilon / 2.0)))
        p = [0] * (F + 1)
        p[1] = 1.0 - (1.0 + 1.0 / F) / (1.0 + epsilon)
        for i in range(2, F + 1):
            # Use total probability: each entry adds its own share to the
            # running sum of the previous ones.
            p[i] = p[i - 1] + (1.0 - p[1]) * F / ((F - 1.0) * i * (i - 1.0))
        self.F = F
        self.p = p

        n_aux = int(0.55 * q * epsilon * n)
        self.n_aux = n_aux

        # A message block cannot be attached to more distinct aux blocks than
        # exist.  Without this cap a small message either divided by zero
        # (n_aux == 0) or searched forever for a q-th distinct aux block
        # (0 < n_aux < q).  For n_aux >= q the draws are unchanged.
        per_block = min(q, n_aux)
        r = Random(seed)

        def choose_blocks():
            # Must be called once per message block, in block order, because
            # all calls share the single generator r.
            aux_assn = []  # Aux blocks this block is assigned to
            for _ in range(per_block):
                x = r.randrange(n_aux)
                while x in aux_assn:
                    x = r.randrange(n_aux)
                aux_assn.append(x)
            return aux_assn

        self.aux = [choose_blocks() for _ in range(n)]

    def select_blocks(self, checkblock_id):
        """Select which blocks to use in a check block.

        The choice is a deterministic function of checkblock_id, which seeds
        the generator.  Returns a list of distinct block numbers in
        range(n + n_aux); numbers >= n refer to aux blocks.
        """
        r = Random(checkblock_id)
        x = r.random()
        # degree must follow probability distribution calculated earlier
        degree = bisect.bisect_right(self.p, x)
        m = self.n + self.n_aux
        # Only m distinct blocks exist; asking for more made the duplicate
        # rejection loop below spin forever.
        if degree > m:
            degree = m
        # Select degree blocks
        temp = []
        for _ in range(degree):
            x = r.randrange(m)
            # Make sure we don't use the same block twice
            while x in temp:
                x = r.randrange(m)
            temp.append(x)
        return temp
class Encoder(Parameters):
    """Generator for encoded blocks.

    Takes the same arguments as Parameters.  On construction every message
    block is read from the file once and XORed into each aux block it is
    assigned to; the aux blocks are kept in memory in self.aux_blocks.
    """

    def __init__(self, *args, **kwargs):
        Parameters.__init__(self, *args, **kwargs)
        aux_blocks = [None] * self.n_aux
        for blocknum in range(self.n):
            targets = self.aux[blocknum]
            if not targets:
                continue
            # Read the message block once, not once per assigned aux block.
            block = self.get_block(blocknum)
            for auxblocknum in targets:
                current = aux_blocks[auxblocknum]
                if current is None:
                    aux_blocks[auxblocknum] = block
                else:
                    aux_blocks[auxblocknum] = strxor(current, block)
        # An aux block that no message block was assigned to is the XOR of
        # nothing, i.e. all zero bytes.  Leaving it as None would break
        # block() as soon as that aux block is selected.
        zero_block = '\0' * self.blocksize
        for auxblocknum in range(len(aux_blocks)):
            if aux_blocks[auxblocknum] is None:
                aux_blocks[auxblocknum] = zero_block
        self.aux_blocks = aux_blocks

    def block(self, checkblock_id):
        """Return the encoded (check) block identified by checkblock_id."""
        l = self.select_blocks(checkblock_id)
        # Return the xor of the blocks indexed by l
        return reduce(strxor, map(self.get_block, l))

    def get_block(self, blocknum):
        """Return a block from the composite message (including aux blocks).

        Block numbers below n are read from the file, with the final short
        block padded with NUL bytes to blocksize; numbers from n upwards
        index the in-memory aux blocks.
        """
        if blocknum < self.n:
            self.file.seek(blocknum * self.blocksize, 0)
            block = self.file.read(self.blocksize)
            missing = self.blocksize - len(block)
            if missing > 0:
                # This is the last block of the file. Pad it.
                block = block + '\0' * missing
        else:
            # Want an auxiliary block
            block = self.aux_blocks[blocknum - self.n]
        return block
class Decoder(Parameters):
    """Decode a set of blocks incrementally.

    file is used as the block store for the composite message (message
    blocks followed by aux blocks); cbdb is a mapping from check block id to
    check block data.  Remaining arguments are passed to Parameters.

    Counters: unknown is the number of message blocks not yet recovered;
    used and discarded are incremented by CheckBlock.__del__.
    """

    def __init__(self, file, length, cbdb=None, *args, **kwargs):
        Parameters.__init__(self, file, length, *args, **kwargs)
        # A fresh dict per decoder.  A literal {} default would be shared by
        # every Decoder built without an explicit cbdb, and add() writes to
        # it.  Test against None, not truthiness: an empty db is valid.
        if cbdb is None:
            cbdb = {}
        self.unknown = self.n  # Keep track of number of unknown blocks
        self.used = 0  # Number of check blocks used in a useful way
        self.discarded = 0  # Keep track of discarded checkblocks
        message_blocks = [MessageBlock(self, blocknum)
                          for blocknum in range(self.n)]
        # Invert self.aux: for each aux block, collect the message blocks
        # that were assigned to it.
        aux_members = [[] for _ in range(self.n_aux)]
        for block, auxblocks in zip(message_blocks, self.aux):
            for auxblocknum in auxblocks:
                aux_members[auxblocknum].append(block)
        # Aux blocks are numbered after the message blocks in the composite
        # message.
        aux_blocks = [AuxBlock(self, self.n + k, aux_members[k])
                      for k in range(self.n_aux)]
        self.blocks = message_blocks + aux_blocks
        self.cbdb = cbdb
        # Feed any existing blocks into the decoder.
        # NOTE(review): CheckBlock.add_data rewrites cbdb entries in place
        # while decoding, so entries left over from an earlier run may no
        # longer hold the original check block data -- confirm before
        # relying on this for restarts.
        for cbid in list(cbdb.keys()):
            self.add(cbid, cbdb[cbid])

    def add(self, id, data):
        """Give a check block to the decoder and incrementally decode.

        Returns the number of message blocks that are still unknown.
        """
        # Store this checkblock's data
        self.cbdb[id] = data
        # Create a checkblock object with the appropriate message/aux blocks
        # assigned
        l = self.select_blocks(id)
        blocks = [self.blocks[blocknum] for blocknum in l]
        cb = CheckBlock(self, id, blocks)
        # Only a check block with exactly one unknown block can make progress
        # right now.
        if len(cb.blocks) != 1:
            return self.unknown
        known = [cb]
        while known:
            cb = known.pop()
            # Skip entries whose last unknown block was resolved some other
            # way in the meantime.
            if not cb.blocks:
                continue
            assert len(cb.blocks) == 1
            assert cb.known
            block = cb.blocks[0]
            # None marks the check block as used (see CheckBlock.__del__).
            cb.blocks = None
            known += block.remove_checkblock(cb)
        # Progress output.
        print(self.unknown)
        return self.unknown

    def get_block(self, blocknum):
        """Return a block from the decoded augmented message."""
        self.file.seek(blocknum * self.blocksize, 0)
        return self.file.read(self.blocksize)

    def set_block(self, blocknum, data):
        """Set a block in the decoded message"""
        self.file.seek(blocknum * self.blocksize, 0)
        self.file.write(data)

    def get_cb(self, cbid):
        """Return the stored data of check block cbid."""
        return self.cbdb[cbid]
if __name__ == "__main__":
import sys, os, anydbm
fp = open(sys.argv[1], 'rb')
fp.seek(0, 2)
length = fp.tell()
e = Encoder(fp, length)
outfp = open(sys.argv[2], 'w+b')
cbdb = anydbm.open(sys.argv[3], 'c')
d = Decoder(outfp, length, cbdb)
assert e.n == d.n
assert e.seed == d.seed
assert e.F == d.F
assert e.p == d.p
assert e.aux == d.aux
u = -1
i = 0
r = Random(str(time.time()))
while u:
i += 1
cbid = r.next()
cb = e.block(cbid)
u = d.add(cbid, cb)
#print i, u
# if i % 1000 == 0: d.test()
fp.close()
# Truncate the file to eliminate all the aux blocks at the end and the
# padding on the last message block
outfp.truncate(length)
outfp.close()
print "Total number of check blocks:", i
print "Number of check blocks used:", d.used
print "Number of check blocks discarded:", d.discarded
assert d.used + d.discarded == i
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment