Skip to content

Instantly share code, notes, and snippets.

@ohac
Created August 10, 2008 05:43
Show Gist options
  • Save ohac/4729 to your computer and use it in GitHub Desktop.
Save ohac/4729 to your computer and use it in GitHub Desktop.
Simple Distributed File System (SDFS)
#!/usr/bin/ruby
require 'digest'
require 'fileutils'
require 'tempfile'
require 'yaml'
require 'uri'
# Front end of the Simple Distributed File System.
#
# The storage table is an ordered list of datanode entries, each a Hash with
# :url, :size (e.g. '1GB') and :removable. The first two entries serve as
# namenodes: they hold the config and the file index. A file's content is
# stored as a chunk named by its SHA-256 hex digest, on datanodes picked by a
# size-weighted hash of that digest (see #choose). The index is text with one
# "<name> <digest>" line per file.
class DFS
  INDEXNAME = 'index'
  CONFIGNAME = 'config'
  DIGEST_METHOD = Digest::SHA256
  MD5 = Digest::MD5
  # Upper bound on dice rolls in #choose. It turns an unsatisfiable request
  # (e.g. every node removable, or a node whose weighted slice is empty)
  # into an error instead of an endless loop.
  MAX_CHOOSE_ROLLS = 0x100000

  def initialize
    @storagetable = []
  end

  # DataNode objects for the namenodes (primary and secondary entries).
  def namenodes
    @storagetable[0, 2].map { |node| DataNode.get(node) }
  end

  # Appends a non-removable datanode entry with the given URL and size.
  def add(url, size = '1GB')
    @storagetable << { :url => url, :size => size, :removable => false }
  end

  # Replaces the storage table with the YAML list stored in +file+. Each
  # entry gets the default :size and :removable values if they are missing.
  def loadconfig(file)
    text = File.read(file)
    # The config may also come from a datanode (#getconfig). Psych 4 made
    # YAML.load reject the Symbol keys used here. So, on Psych versions that
    # have unsafe_load, use the safe loader and permit only Symbol.
    table =
      if YAML.respond_to?(:unsafe_load)
        YAML.safe_load(text, :permitted_classes => [Symbol])
      else
        YAML.load(text)
      end
    @storagetable = table || []
    @storagetable.each do |s|
      s[:size] ||= '1GB'
      s[:removable] ||= false
    end
  end

  # The first +n+ datanodes (default and maximum: all of them) in the order
  # chosen for +digeststr+. Entries are distinct (see #choose).
  def datanodes(digeststr = '', n = nil)
    digest = MD5.new # use light weight digest
    digest.update(digeststr)
    n = @storagetable.size if n.nil? || n > @storagetable.size
    (0...n).map { |i| DataNode.get(choose(digest, i)) }
  end

  # Writes the storage table as YAML to +configname+. It then uploads that
  # file as a chunk to each namenode and points the namenode's "config"
  # name at it.
  def putconfig(configname = CONFIGNAME)
    File.open(configname, 'w') { |f| YAML.dump(@storagetable, f) }
    digeststr = DIGEST_METHOD.file(configname).to_s
    namenodes.each do |datanode|
      datanode.put(configname, digeststr)
      datanode.putspecial(digeststr, CONFIGNAME)
    end
  end

  # Loads the storage table from the first namenode that has a config.
  def getconfig
    namenodes.each do |datanode|
      loaded = with_special_copy(datanode, CONFIGNAME) do |path|
        loadconfig(path)
        true
      end
      break if loaded
    end
  end

  # Forgets every datanode entry.
  def clear
    @storagetable.clear
  end

  # Converts a size string such as '1GB' or '512kb' to megabytes (Float).
  # Raises ArgumentError for an unknown unit suffix.
  def storagesize(str)
    val = str.chop.chop.to_f
    case str[-2, 2]
    when 'TB', 'tb' then val * (1024 ** 2)
    when 'GB', 'gb' then val * 1024
    when 'MB', 'mb' then val
    when 'KB', 'kb' then val / 1024
    else raise ArgumentError, "unknown storage size unit: #{str.inspect}"
    end
  end

  # Returns the storage-table entry picked as the (n+1)-th distinct datanode
  # for +digest+ (a Digest object; it is cloned, not modified). Each dice
  # roll is 16 bits of the digest. The digest is then extended with '.' to
  # reroll. A node is hit with probability proportional to its size. When
  # any node is removable, the first pick must be non-removable and the
  # second must be removable.
  # Raises ArgumentError when the table is empty or when no eligible node
  # turns up within MAX_CHOOSE_ROLLS rolls.
  def choose(digest, n = 0)
    raise ArgumentError, 'storage table is empty' if @storagetable.empty?
    digest = digest.clone
    allsize = @storagetable.inject(0.0) { |a, b| a + storagesize(b[:size]) }
    # map[i] is the inclusive upper bound of node i's slice of 0..0xffff.
    map = []
    sum = 0.0
    @storagetable.each do |s|
      sum += storagesize(s[:size]) / allsize
      map << (0x10000 * sum).to_i
    end
    removablef = @storagetable.any? { |s| s[:removable] }
    num = 0
    nums = []
    rolls = 0
    while n >= 0
      rolls += 1
      if rolls > MAX_CHOOSE_ROLLS
        raise ArgumentError, "no eligible datanode after #{MAX_CHOOSE_ROLLS} rolls"
      end
      str = digest.digest
      digest.update('.')
      # Big-endian value of the last two digest bytes. String#unpack yields
      # integers on every Ruby, whereas str[-1] is a String on Ruby 1.9+.
      dice = str[-2, 2].unpack('n').first
      # Float rounding can leave the last bound just below 0xffff.
      num = map.index { |v| dice <= v } || map.size - 1
      next if nums.include?(num)
      if removablef
        # first datanode must not be removable
        next if nums.empty? && @storagetable[num][:removable]
        # second datanode must be removable
        next if nums.size == 1 && !@storagetable[num][:removable]
      end
      n -= 1
      nums << num
    end
    @storagetable[num]
  end

  # Stores local file +src+ as +dest+ (default: its basename; an existing
  # directory +dest+ gets the basename appended). The chunk goes to two
  # datanodes, and the updated index is published to the namenodes.
  # Returns false if +dest+'s parent is a file or no datanode took the chunk.
  def put(src, dest = nil)
    dest ||= File.basename(src)
    destdir = File.dirname(dest)
    return false if destdir != '.' && file?(destdir)
    dest += '/' + File.basename(src) if dir?(dest)
    digeststr = DIGEST_METHOD.file(src).to_s
    rep = 2
    datanodes(digeststr).each do |datanode|
      next unless datanode.put(src, digeststr)
      # TODO take a snapshot to namenodes if datanode is removable
      rep -= 1
      break if rep == 0
    end
    return false if rep == 2
    puts 'warning: replication' if rep > 0
    index = remove_entry(ls(:raw => true), dest)
    index += "%s %s\n" % [dest, digeststr] # TODO filename contains space
    publish_index(index)
    true
  end

  # Returns the content of +name+ from the first datanode whose copy matches
  # the recorded digest, or nil. With +verbose+, it warns about bad copies.
  def get(name, verbose = true)
    index = ls(:raw => true)
    pos = index.index(entry_pattern(name))
    return nil unless pos
    pos2 = index.index("\n", pos) || index.size
    digeststr = index[pos...pos2].split[1]
    return nil unless digeststr
    datanodes(digeststr).each do |datanode|
      data = datanode.get(digeststr)
      next unless data
      if DIGEST_METHOD.hexdigest(data) != digeststr
        puts 'warning: digest' if verbose
        next
      end
      return data
    end
    nil
  end

  # Reads the index from the first namenode that has one. With :raw it
  # returns the index text unchanged. Otherwise it returns a newline-joined
  # listing: paths inside a directory collapse to "dir/", and listings can
  # be made relative to +entry+. Returns '' when no namenode has an index.
  def ls(params = {}, entry = nil)
    namenodes.each do |datanode|
      index = with_special_copy(datanode, INDEXNAME) { |path| File.read(path) }
      next unless index
      return index if params[:raw]
      return list_entries(index, entry)
    end
    ''
  end

  # Verifies every datanode's chunks and the replication of every indexed
  # chunk. Unless +quiet+, it prints findings and a summary. Returns false
  # when a datanode has corrupt chunks or an indexed chunk is missing.
  def check(quiet = false)
    result = true
    unbalance = overrep = replication = missing = offline = badchunks = 0
    datanodes.each do |datanode|
      unless datanode.online?
        offline += 1
        next # TODO consider if datanode is removable
      end
      next if datanode.check(quiet)
      badchunks += 1
      result = false
    end
    ls(:raw => true).each_line do |l|
      chunks = l.split
      chunks.shift # drop the file name (TODO filename contains space)
      chunks.each do |chunk|
        count = 2 # the first two nodes chosen for a chunk should hold it
        rep = 0
        datanodes(chunk).each do |datanode|
          if datanode.exist?(chunk)
            rep += 1
          elsif count > 0
            puts "warning: #{datanode} should have #{chunk}" unless quiet
            unbalance += 1
          end
          count -= 1
        end
        if rep == 0
          puts "error: missing #{chunk}" unless quiet
          missing += 1
          result = false # no copy left: the file's data is lost
        elsif rep < 2
          puts "warning: no replication #{chunk}" unless quiet
          replication += 1
        elsif rep > 2
          overrep += 1
        end
      end
    end
    unless quiet
      puts
      puts 'Result:'
      puts "online: #{@storagetable.size - offline}"
      puts "offline: #{offline}"
      puts "unbalance: #{unbalance}"
      puts "under replication: #{replication}"
      puts "over replication: #{overrep}"
      puts "missing: #{missing}"
      puts "datanode has bad chunks: #{badchunks}"
      puts "all chunks: N/A" # TODO
      puts "trash chunks: N/A" # TODO
      puts "expired removable datanodes: N/A" # TODO
    end
    result
  end

  # True when the index has an entry below directory +dir+.
  def dir?(dir)
    ls(:raw => true).index(/^#{Regexp.escape(dir.to_s)}\//) ? true : false
  end

  # True when the index has an entry named exactly +file+.
  def file?(file)
    ls(:raw => true).index(entry_pattern(file)) ? true : false
  end

  private

  # Anchored pattern for the index line that records +name+. Regex
  # metacharacters in the name are matched literally.
  def entry_pattern(name)
    /^#{Regexp.escape(name.to_s)} /
  end

  # Returns +index+ without the line recording +name+ (if present).
  def remove_entry(index, name)
    pos = index.index(entry_pattern(name))
    return index unless pos
    pos2 = index.index("\n", pos)
    index[0, pos] + (pos2 ? index[(pos2 + 1)..-1] : '')
  end

  # Uploads +index+ text as a chunk to each namenode and points "index" at it.
  def publish_index(index)
    tmp = Tempfile.new('sdfs')
    begin
      tmp.write(index)
      tmp.close
      digeststr = DIGEST_METHOD.file(tmp.path).to_s
      namenodes.each do |datanode|
        # TODO take a removable datanode's snapshot(s) here
        datanode.put(tmp.path, digeststr)
        datanode.putspecial(digeststr, INDEXNAME)
      end
    ensure
      tmp.close(true)
    end
  end

  # Copies special file +name+ from +datanode+ to a fresh temporary path and
  # yields that path, returning the block's value. If nothing was copied it
  # returns nil without yielding. The copy is always removed afterwards.
  def with_special_copy(datanode, name)
    tmp = Tempfile.new('sdfs')
    path = tmp.path
    tmp.close(true) # only a unique path is needed; getspecial creates it
    datanode.getspecial(name, path)
    return nil unless File.exist?(path)
    yield path
  ensure
    FileUtils.rm_f(path) if path
  end

  # Turns raw index text into the listing returned by #ls.
  def list_entries(index, entry)
    names = []
    index.each_line do |l|
      f = l.split(' ')[0] # TODO filename contains space
      next unless f
      if entry
        dir = entry.end_with?('/') ? entry.chop : entry
        if f.start_with?(dir + '/')
          f = f[dir.size + 1, f.size]
        elsif f != entry
          next
        end
      end
      f = f.split('/')[0] + '/' if f.include?('/') # TODO filename contains slash
      names << f
    end
    names.uniq.join("\n")
  end
end
# Common base for datanode backends. It remembers the node's URL and
# removable flag, and picks the concrete backend for a storage-table entry.
class DataNode
  # Builds the backend for +param+ (a storage-table entry). URLs beginning
  # with "ssh://" get an SSHDataNode; anything else gets a FileDataNode.
  def self.get(param)
    backend = param[:url].start_with?('ssh://') ? SSHDataNode : FileDataNode
    backend.new(param)
  end

  def initialize(param)
    @url, @removable = param[:url], param[:removable]
  end

  # A node is shown by its URL (used in check's messages).
  def to_s
    @url
  end
end
# Datanode backed by a local (or locally mounted) directory at the URL.
# Chunks live in "<url>/chunk/<digest>". Special names such as "index" and
# "config" are symlinks in "<url>" that point into the chunk directory.
class FileDataNode < DataNode
  # +param+ may carry :mount, a mount point that must appear in `mount`
  # output for the node to count as online.
  def initialize(param)
    super(param)
    @mount = param[:mount]
  end

  # True unless a :mount was configured and `mount` does not list it.
  def online?
    return true unless @mount
    `mount`.index(/ on #{Regexp.escape(@mount)} /) != nil # TODO
  end

  # Copies +src+ into the chunk directory under the name +digest+.
  # Returns false instead of raising when the node cannot be written, e.g.
  # its URL is a plain file, permission is denied, or the disk is full.
  def put(src, digest)
    FileUtils.mkdir_p("#{@url}/chunk")
    FileUtils.cp(src, "#{@url}/chunk/#{digest}")
    true
  rescue Errno::EACCES, Errno::EEXIST, Errno::ENOTDIR, Errno::ENOSPC
    false
  end

  # Points special file +name+ at chunk +digest+ through a relative symlink.
  # Returns false when the node directory is missing, is not a directory,
  # or is not writable.
  def putspecial(digest, name)
    # TODO Windows can't handle symlink?
    FileUtils.ln_sf("chunk/#{digest}", "#{@url}/#{name}")
    true
  rescue Errno::ENOTDIR, Errno::ENOENT, Errno::EACCES
    false
  end

  # Returns the raw bytes of chunk +digest+, or nil if it cannot be found.
  # Binary mode keeps chunk bytes intact (no newline or encoding changes).
  def get(digest)
    File.open("#{@url}/chunk/#{digest}", 'rb') { |f| f.read }
  rescue Errno::ENOENT, Errno::ENOTDIR
    nil
  end

  # Copies special file +name+ to +dest+ (default: +name+), following its
  # symlink. Returns false when it does not exist.
  def getspecial(name, dest = nil)
    FileUtils.cp("#{@url}/#{name}", dest || name)
    true
  rescue Errno::ENOENT, Errno::ENOTDIR
    false
  end

  # Whether chunk +digest+ is present. An offline removable node reports
  # false.
  def exist?(digest)
    if @removable and !online?
      # TODO look at latest snapshot which is not expired
      false
    else
      File.exist?("#{@url}/chunk/#{digest}")
    end
  end

  # Re-hashes every chunk and reports those whose SHA-256 no longer matches
  # their file name. Returns true when all chunks are intact, or when there
  # is no chunk directory yet. It hashes in Ruby instead of shelling out to
  # sha256sum, so the check works wherever that tool is unavailable and for
  # URLs that contain shell metacharacters.
  def check(quiet = false)
    chunkdir = "#{@url}/chunk"
    return true unless File.directory?(chunkdir)
    result = true
    Dir.entries(chunkdir).sort.each do |name|
      path = File.join(chunkdir, name)
      next if name.start_with?('.') || !File.file?(path)
      actual = Digest::SHA256.file(path).hexdigest
      next if actual == name
      unless quiet
        puts "#{@url} has a bad chunk." # TODO fix when?
        puts actual
        puts name
      end
      result = false
    end
    result
  end
end
# Datanode reached over ssh/scp, configured as ssh://[user@]host/path.
# The URL path's leading "/" is dropped. The remaining path is therefore
# interpreted on the remote side (relative to the login directory unless it
# starts with another "/").
class SSHDataNode < DataNode
  def initialize(param)
    super(param)
    uri = URI.parse(@url)
    user = uri.user
    @host = (user ? user + '@' : '') + uri.host
    @path = uri.path
    @path = @path[1, @path.size]
  end

  def online?
    true # TODO
  end

  # Runs +command+ on the host via ssh. The whole line passes through the
  # local shell, so redirections in +command+ apply locally.
  def ssh(command)
    system("ssh #{@host} #{command}")
  end

  # Uploads +src+ as chunk +digest+. Returns true on success.
  def put(src, digest)
    return false unless ssh("mkdir -p #{@path}/chunk")
    # Argument-vector form: no local shell, so spaces in +src+ are safe.
    system('scp', src, "#{@host}:#{@path}/chunk/#{digest}") ? true : false
  end

  # Points special file +name+ at chunk +digest+ on the remote side.
  def putspecial(digest, name)
    ssh("ln -sf chunk/#{digest} #{@path}/#{name}")
  end

  # Downloads chunk +digest+ and returns its raw bytes, or nil on failure.
  # The local temporary copy is always removed.
  def get(digest)
    tmp = Tempfile.new('sdfs')
    fname = tmp.path
    tmp.close(true) # only a unique local path is needed; scp creates it
    remote = "#{@host}:#{@path}/chunk/#{digest}"
    return nil unless system('scp', remote, fname, :out => '/dev/null')
    File.open(fname, 'rb') { |f| f.read }
  rescue Errno::ENOENT
    nil
  ensure
    FileUtils.rm_f(fname) if fname
  end

  # Downloads special file +name+ to +dest+ (default: +name+).
  def getspecial(name, dest = nil)
    system('scp', "#{@host}:#{@path}/#{name}", dest || name)
  end

  # Whether chunk +digest+ exists on the host.
  def exist?(digest)
    ssh("ls #{@path}/chunk/#{digest} 2>/dev/null >/dev/null")
  end

  # Runs sha256sum over the remote chunks and reports every chunk whose
  # digest differs from its file name. Returns false when any chunk is bad
  # or when the remote command fails. Output is read through a pipe rather
  # than a fixed, shared /tmp file. The glob is expanded by the remote shell.
  def check(quiet = false)
    output = IO.popen(['ssh', @host, "sha256sum #{@path}/chunk/*"]) { |io| io.read }
    return false unless $?.success?
    result = true
    output.each_line do |l|
      next if l.strip.empty?
      sum_hex, path = l.split
      next if path && File.basename(path) == sum_hex
      unless quiet
        puts "#{@url} has a bad chunk." # TODO fix when?
        puts sum_hex
        puts path
      end
      result = false
    end
    result
  rescue Errno::ENOENT # ssh itself could not be run
    false
  end
end
#!/usr/bin/ruby
require 'sdfs'
require 'test/unit'
# Unit tests for DFS, using four local directory datanodes under tmp/dfs.
class DFSTest < Test::Unit::TestCase
  URLS = 'a b c d'.split.map { |i| "tmp/dfs/#{i}" }

  # Fresh datanode directories plus small source files: tmp/a holds "z\n"
  # and tmp/0 .. tmp/29 each hold their own number.
  def setup
    @dfs = DFS.new
    URLS.each { |url| FileUtils.rm_rf(url) }
    # If an interrupted run left a datanode parked here, FileUtils.mv below
    # would move nodes *into* this directory instead of renaming them.
    FileUtils.rm_rf('tmp/datanode')
    FileUtils.mkdir_p('tmp')
    File.open('tmp/a', 'w') { |f| f.puts('z') }
    30.times do |i|
      File.open("tmp/#{i}", 'w') { |f| f.puts(i.to_s) }
    end
  end

  def test_basic
    URLS.each { |url| @dfs.add(url) }
    @dfs.putconfig('tmp/config')
    @dfs.put('tmp/a', 'd/a')
    lsstr = @dfs.ls
    # Rebuild the storage table from the config held by the first node.
    @dfs.clear
    @dfs.add(URLS[0])
    @dfs.getconfig
    assert_equal(lsstr, @dfs.ls)
    text = @dfs.get('d/a')
    assert_equal(Digest::MD5.hexdigest("z\n"), Digest::MD5.hexdigest(text))
    assert_nil(@dfs.get('d/b'))
    # Take each datanode away in turn: listing and reading must survive.
    URLS.each do |url|
      next unless File.exist?(url)
      FileUtils.mv(url, 'tmp/datanode')
      begin
        text = @dfs.get('d/a') # re-read while this node is away
        assert_equal(Digest::MD5.hexdigest("z\n"), Digest::MD5.hexdigest(text))
        assert_equal(lsstr, @dfs.ls)
      ensure
        FileUtils.mv('tmp/datanode', url)
      end
    end
  end

  EXPECTED_DIR = <<EOF
d c865f6c5ab8d1b0bcd383a5e1e3879d22681c96bf462c269b7581d523fbe70ab
d2/b c865f6c5ab8d1b0bcd383a5e1e3879d22681c96bf462c269b7581d523fbe70ab
d2/a c865f6c5ab8d1b0bcd383a5e1e3879d22681c96bf462c269b7581d523fbe70ab
EOF

  def test_dir
    URLS.each { |url| @dfs.add(url) }
    @dfs.putconfig('tmp/config')
    @dfs.put('tmp/a', 'd')
    # 'd' is a file, so it cannot also act as a directory.
    assert_equal(false, @dfs.put('tmp/a', 'd/a'))
    @dfs.put('tmp/a', 'd2/b')
    @dfs.put('tmp/a', 'd2') # 'd2' is a directory: stored as 'd2/a'
    assert_equal(EXPECTED_DIR, @dfs.ls(:raw => true))
  end

  def test_badchunk
    URLS.each { |url| @dfs.add(url) }
    @dfs.putconfig('tmp/config')
    @dfs.put('tmp/a', 'a')
    assert(@dfs.check(true))
    digest = 'c865f6c5ab8d1b0bcd383a5e1e3879d22681c96bf462c269b7581d523fbe70ab'
    # Corrupt the copy on node d: check fails, but get still finds a good one.
    File.open("tmp/dfs/d/chunk/#{digest}", 'w') { |f| f.write('Z') }
    assert(!@dfs.check(true))
    text = @dfs.get('a', false)
    assert_equal(Digest::MD5.hexdigest("z\n"), Digest::MD5.hexdigest(text))
    # Corrupt the copy on node a as well; the test expects no valid copy left.
    File.open("tmp/dfs/a/chunk/#{digest}", 'w') { |f| f.write('Z') }
    assert(!@dfs.check(true))
    assert_nil(@dfs.get('a', false))
  end

  def test_offline
    URLS.each { |url| @dfs.add(url) }
    @dfs.putconfig('tmp/config')
    URLS.each do |url|
      next unless File.exist?(url)
      # Replace the datanode directory with a plain file so writes to it fail.
      FileUtils.mv(url, 'tmp/datanode')
      FileUtils.touch(url)
      begin
        30.times do |i|
          assert(@dfs.put("tmp/#{i}"), "put tmp/#{i} with #{url} offline")
        end
      ensure
        FileUtils.rm_f(url)
        FileUtils.mv('tmp/datanode', url)
      end
    end
  end
end
# End-to-end test of the ui.rb command-line front end, run as subprocesses.
class UITest < Test::Unit::TestCase
  EXPECTED_LOG = <<EOF
commands: init, config, ls, cat, get, put, check
commands: init, config, ls, cat, get, put, check
a
z
EOF

  def setup
    FileUtils.rm_rf('tmp')
    FileUtils.mkdir_p('tmp')
    # ui.rb keeps its config in $HOME/.sdfs.yaml; point HOME at the sandbox.
    @saved_home = ENV['HOME']
    ENV['HOME'] = 'tmp'
    File.open('tmp/a', 'w') { |f| f.puts('z') }
  end

  # Undo the HOME override so later code in this process sees the real value.
  def teardown
    ENV['HOME'] = @saved_home
  end

  def test_uibasic
    system('ruby ui.rb >tmp/log')
    ['help', 'init tmp/x tmp/y tmp/z', 'put tmp/a a', 'ls', 'cat a', 'get a tmp/b'].each do |args|
      system("ruby ui.rb #{args} >>tmp/log")
    end
    assert_equal(EXPECTED_LOG, File.read('tmp/log'))
    assert_equal("z\n", File.read('tmp/b'))
  end
end
#!/usr/bin/ruby
require File.dirname(__FILE__) + '/sdfs'
require 'optparse'
# Command-line front end for SDFS. The local config lives in
# $HOME/.sdfs.yaml. Failures print 'error' (or a specific message) and exit
# with status 1.

# Prints the generic error marker and exits with status 1.
def error_exit
  puts 'error'
  exit 1
end

opt = OptionParser.new
opt.on('-r [URL]') { |_url| p :todo } # TODO
opt.order!(ARGV)
cmd = ARGV.shift
CONFIGFILE = "#{ENV['HOME']}/.sdfs.yaml"
dfs = DFS.new
dfs.loadconfig(CONFIGFILE) if File.exist?(CONFIGFILE)

case cmd
when 'init'
  urls = ARGV
  if urls.empty?
    puts 'no URL(s)'
    exit 1
  end
  if File.exist?(CONFIGFILE)
    puts "remove #{CONFIGFILE} first"
    exit 1
  end
  urls.each { |url| dfs.add(url) }
  dfs.putconfig(CONFIGFILE)
when 'config'
  case ARGV.shift
  when 'get'
    dfs.getconfig
    dfs.putconfig(CONFIGFILE)
  when 'put'
    # Upload via the local config file instead of leaving a stray ./config.
    dfs.putconfig(CONFIGFILE)
  else
    puts 'commands: get, put'
  end
when 'ls'
  lstxt = dfs.ls({}, ARGV.shift)
  puts lstxt unless lstxt.empty?
when 'cat'
  file = ARGV.shift
  error_exit if file.nil?
  data = dfs.get(file)
  error_exit if data.nil?
  print data
when 'get'
  src = ARGV.shift
  dest = ARGV.shift
  error_exit if src.nil?
  data = dfs.get(src)
  error_exit if data.nil? # do not leave an empty file behind
  File.open(dest || File.basename(src), 'wb') { |f| f.write(data) }
when 'put'
  src = ARGV.shift
  dest = ARGV.shift
  error_exit if src.nil?
  error_exit unless dfs.put(src, dest)
when 'check'
  exit 1 unless dfs.check
else
  puts 'commands: init, config, ls, cat, get, put, check'
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment