Created
August 10, 2008 05:43
-
-
Save ohac/4729 to your computer and use it in GitHub Desktop.
Simple Distributed File System (SDFS)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/ruby | |
require 'digest' | |
require 'fileutils' | |
require 'tempfile' | |
require 'yaml' | |
require 'uri' | |
# Simple Distributed File System.
#
# Files are stored as content-addressed chunks (named by their SHA-256 hex
# digest) spread over a weighted table of datanodes. A flat text index with one
# "name digest" line per file, and the storage table itself, are kept on the
# first two datanodes of the table (the "namenodes").
class DFS
  INDEXNAME = 'index'
  CONFIGNAME = 'config'
  DIGEST_METHOD = Digest::SHA256
  MD5 = Digest::MD5

  def initialize
    @storagetable = []
  end

  # Returns DataNode objects for the primary and secondary datanode, which
  # hold the index and the config.
  def namenodes
    @storagetable[0, 2].map { |node| DataNode.get(node) }
  end

  # Appends a (non-removable) datanode to the storage table.
  # +size+ is a string such as '512MB', '1GB' or '2TB'.
  def add(url, size = '1GB')
    @storagetable << { :url => url, :size => size, :removable => false }
  end

  # Replaces the storage table with the YAML document in +file+ and fills in
  # the defaults for missing :size / :removable entries.
  def loadconfig(file)
    # NOTE(review): getconfig feeds this with a file fetched from a datanode,
    # so the YAML is only as trustworthy as that node; consider
    # YAML.safe_load with Symbol permitted.
    table = File.open(file) { |f| YAML.load(f) }
    @storagetable = table || [] # an empty document loads as nil/false
    @storagetable.each do |s|
      s[:size] ||= '1GB'
      s[:removable] ||= false
    end
  end

  # Returns up to +n+ datanodes (all of them when +n+ is nil or too large) in
  # the preference order derived from +digeststr+.
  def datanodes(digeststr = '', n = nil)
    digest = MD5.new # use light weight digest
    digest.update(digeststr)
    n = @storagetable.size if n.nil? || n > @storagetable.size
    chooseorder(digest, n).map { |i| DataNode.get(@storagetable[i]) }
  end

  # Dumps the storage table to the local file +configname+ and publishes it
  # on the namenodes under the special name CONFIGNAME.
  def putconfig(configname = CONFIGNAME)
    File.open(configname, 'w') { |f| YAML.dump(@storagetable, f) }
    digest = DIGEST_METHOD.file(configname).to_s
    namenodes.each do |datanode|
      datanode.put(configname, digest)
      datanode.putspecial(digest, CONFIGNAME)
    end
  end

  # Loads the storage table from the first namenode that provides a config.
  def getconfig
    withspecial(CONFIGNAME) { |path| loadconfig(path) }
  end

  def clear
    @storagetable.clear
  end

  # Converts a size string ('10KB', '5MB', '1GB', '2TB', any letter case) to
  # megabytes as a Float.
  # Raises ArgumentError for an unknown unit (the old code returned nil, which
  # blew up later inside the weighting arithmetic).
  def storagesize(str)
    val = str[0...-2].to_f
    case str[-2, 2].to_s.upcase
    when 'TB' then val * (1024 ** 2)
    when 'GB' then val * 1024
    when 'MB' then val
    when 'KB' then val / 1024
    else raise ArgumentError, "unknown storage size unit: #{str.inspect}"
    end
  end

  # Returns the storage table entry that is the (n+1)-th choice for +digest+
  # (a Digest instance, left unmodified).
  def choose(digest, n = 0)
    return @storagetable[0] if n < 0 # historical behaviour for negative n
    @storagetable[chooseorder(digest, n + 1).last]
  end

  # Stores the local file +src+ under the name +dest+ (default: basename of
  # +src+; when +dest+ is an existing directory the basename is appended).
  # Returns false when the parent of +dest+ is a regular file or when no
  # datanode accepted the chunk, true otherwise.
  def put(src, dest = nil)
    dest = File.basename(src) unless dest
    destdir = File.dirname(dest)
    return false if destdir != '.' && file?(destdir)
    dest += '/' + File.basename(src) if dir?(dest)
    digest = DIGEST_METHOD.file(src).to_s
    rep = 2 # replicas still to be written
    datanodes(digest).each do |datanode|
      next unless datanode.put(src, digest)
      # TODO take a snapshot to namenodes if datanode is removable
      rep -= 1
      break if rep == 0
    end
    return false if rep == 2
    puts 'warning: replication' if rep > 0
    # Drop any previous entry for dest, then append the new one. A literal
    # prefix test is used so names containing regexp metacharacters ('.',
    # '+', ...) cannot match, and thereby delete, other entries.
    prefix = "#{dest} "
    index = ls(:raw => true).each_line.reject { |l| l.start_with?(prefix) }.join
    index += "%s %s\n" % [ dest, digest ] # TODO filename contains space
    tmp = Tempfile.new('sdfs')
    begin
      tmp.binmode
      tmp.write(index)
      tmp.close
      indexdigest = DIGEST_METHOD.file(tmp.path).to_s
      namenodes.each do |datanode|
        # TODO take a removable datanode's snapshot(s) here
        datanode.put(tmp.path, indexdigest)
        datanode.putspecial(indexdigest, INDEXNAME)
      end
    ensure
      tmp.close(true)
    end
    true
  end

  # Returns the content of the file +name+, or nil when it is not in the index
  # or no datanode holds an intact copy. Prints 'warning: digest' for each
  # corrupted copy unless +verbose+ is false.
  def get(name, verbose = true)
    prefix = "#{name} "
    line = ls(:raw => true).each_line.find { |l| l.start_with?(prefix) }
    return nil unless line
    digeststr = line.split[1]
    datanodes(digeststr).each do |datanode|
      data = datanode.get(digeststr)
      next unless data
      if DIGEST_METHOD.hexdigest(data) != digeststr
        puts 'warning: digest' if verbose
        next
      end
      return data
    end
    nil
  end

  # Lists the index. With :raw => true the index text itself is returned;
  # otherwise the newline-joined, de-duplicated names directly below +entry+
  # (or below the root when +entry+ is nil), directories marked with a
  # trailing '/'. Returns '' when no namenode provides an index.
  def ls(params = {}, entry = nil)
    index = withspecial(INDEXNAME) { |path| IO.read(path) }
    return '' unless index
    return index if params[:raw]
    dir = entry && entry.chomp('/')
    names = []
    index.each_line do |l|
      f = l.split(' ')[0] # TODO filename contains space
      next unless f # skip blank lines
      if entry
        if f.start_with?("#{dir}/")
          f = f[dir.size + 1, f.size]
        elsif f != entry
          next
        end
      end
      f = f.split('/')[0] + '/' if f.include?('/') # TODO filename contains slash
      names << f
    end
    names.uniq.join("\n")
  end

  # Verifies every online datanode and the replication of every indexed
  # chunk, printing a report unless +quiet+. Returns false when any datanode
  # reports a bad chunk, true otherwise.
  def check(quiet = false)
    result = true
    unbalance = overrep = replication = missing = offline = badchunks = 0
    datanodes.each do |datanode|
      unless datanode.online?
        offline += 1
        next # TODO consider if datanode is removable
      end
      next if datanode.check(quiet)
      badchunks += 1
      result = false
    end
    ls(:raw => true).each_line do |l|
      # first field is the file name (TODO filename contains space),
      # the remaining fields are chunk digests
      l.split.drop(1).each do |chunk|
        rep = 0
        datanodes(chunk).each_with_index do |datanode, i|
          if datanode.exist?(chunk)
            rep += 1
          elsif i < 2 # only the two preferred nodes are expected to hold it
            puts "warning: #{datanode} should have #{chunk}" unless quiet
            unbalance += 1
          end
        end
        if rep == 0
          puts "error: missing #{chunk}" unless quiet
          missing += 1
        elsif rep < 2
          puts "warning: no replication #{chunk}" unless quiet
          replication += 1
        elsif rep > 2
          overrep += 1
        end
      end
    end
    unless quiet
      puts
      puts 'Result:'
      puts "online: #{@storagetable.size - offline}"
      puts "offline: #{offline}"
      puts "unbalance: #{unbalance}"
      puts "under replication: #{replication}"
      puts "over replication: #{overrep}"
      puts "missing: #{missing}"
      puts "datanode has bad chunks: #{badchunks}"
      puts "all chunks: N/A" # TODO
      puts "trash chunks: N/A" # TODO
      puts "expired removable datanodes: N/A" # TODO
    end
    result
  end

  # True when at least one indexed name lies below the directory +dir+.
  def dir?(dir)
    ls(:raw => true).index(/^#{Regexp.escape(dir)}\//) ? true : false
  end

  # True when +file+ is itself an indexed name.
  def file?(file)
    ls(:raw => true).index(/^#{Regexp.escape(file)} /) ? true : false
  end

  private

  # Returns the storage table positions of the first +count+ datanodes chosen
  # for +digest+. Nodes are drawn with probability proportional to their
  # size; the draw sequence is derived from the digest alone, so a longer
  # result always starts with the shorter one.
  def chooseorder(digest, count)
    if count > @storagetable.size
      raise ArgumentError,
            "cannot choose #{count} of #{@storagetable.size} datanodes"
    end
    digest = digest.clone # never disturb the caller's digest state
    sizes = @storagetable.map { |s| storagesize(s[:size]) }
    allsize = sizes.inject(0.0) { |a, b| a + b }
    sum = 0.0
    # upper bound of each node's slice of the 16-bit dice range
    bounds = sizes.map do |size|
      sum += size / allsize
      (0x10000 * sum).to_i
    end
    removable = @storagetable.map { |s| s[:removable] ? true : false }
    # The placement rules below can only be met when both kinds of node
    # exist; applying them to an all-removable table would loop forever.
    mixed = removable.include?(true) && removable.include?(false)
    picks = []
    while picks.size < count
      str = digest.digest
      digest.update('.')
      # last two digest bytes as a big-endian unsigned 16-bit integer
      dice = str[-2, 2].unpack('n')[0]
      # fall back to the last node if float rounding left the top bound short
      num = bounds.index { |v| dice <= v } || bounds.size - 1
      next if picks.include?(num)
      if mixed
        # first datanode must not be removable
        next if picks.empty? && removable[num]
        # second datanode must be removable
        next if picks.size == 1 && !removable[num]
      end
      picks << num
    end
    picks
  end

  # Copies the special file +name+ from the first namenode that has it to a
  # temporary path and yields that path. Returns the block's value, or nil
  # when no namenode provides the file. The temporary file is always removed.
  def withspecial(name)
    namenodes.each do |datanode|
      tmp = Tempfile.new('sdfs')
      tmp.close
      begin
        # remove the placeholder so that existence proves a successful fetch
        FileUtils.rm_f(tmp.path)
        datanode.getspecial(name, tmp.path)
        next unless File.exist?(tmp.path)
        return yield(tmp.path)
      ensure
        tmp.close(true)
      end
    end
    nil
  end
end
# Common base of the storage backends; also acts as their factory.
class DataNode
  # Instantiates the backend matching a storage table entry: SSHDataNode when
  # the entry's :url begins with 'ssh://', FileDataNode otherwise.
  def self.get(param)
    backend = param[:url].start_with?('ssh://') ? SSHDataNode : FileDataNode
    backend.new(param)
  end

  # Remembers the :url and :removable values of the storage table entry.
  def initialize(param)
    @url, @removable = param[:url], param[:removable]
  end

  # A datanode prints as its URL.
  def to_s
    @url
  end
end
# Datanode backed by a local directory: chunks live in "<url>/chunk/<digest>"
# and special files (index, config) are symlinks in "<url>" pointing at chunks.
class FileDataNode < DataNode
  def initialize(param)
    super(param)
    @mount = param[:mount]
  end

  # A node without a configured :mount is always online; otherwise it is
  # online when the output of `mount` lists the mount point.
  def online?
    return true unless @mount
    # literal match, so mount points containing regexp metacharacters work
    `mount`.include?(" on #{@mount} ") # TODO
  end

  # Copies the local file +src+ into the chunk directory under the name
  # +digest+. Returns true on success, false when the directory cannot be
  # created or written (e.g. the node's path is occupied by a regular file).
  def put(src, digest)
    FileUtils.mkdir_p(chunkdir)
    FileUtils.cp(src, chunkpath(digest))
    true
  rescue Errno::EACCES, Errno::EEXIST, Errno::ENOTDIR
    false
  end

  # Points the special file +name+ at the chunk +digest+.
  # Returns true on success, false when the node directory is unusable.
  def putspecial(digest, name)
    # TODO Windows can't handle symlink?
    FileUtils.ln_sf("chunk/#{digest}", "#{@url}/#{name}")
    true
  rescue Errno::ENOTDIR, Errno::ENOENT, Errno::EACCES
    false
  end

  # Returns the raw bytes of the chunk +digest+, or nil when it is absent.
  def get(digest)
    # binary mode: chunks are arbitrary data and are re-hashed by the caller
    File.open(chunkpath(digest), 'rb') { |f| f.read }
  rescue Errno::ENOENT
    nil
  end

  # Copies the special file +name+ to the local path +dest+ (default: +name+).
  # Returns true on success, false when the special file does not exist.
  def getspecial(name, dest = nil)
    dest = name unless dest
    FileUtils.cp("#{@url}/#{name}", dest)
    true
  rescue Errno::ENOENT, Errno::ENOTDIR
    false
  end

  # True when the chunk +digest+ is present. An offline removable node is
  # reported as not having it.
  def exist?(digest)
    if @removable && !online?
      # TODO look at latest snapshot which is not expired
      false
    else
      File.exist?(chunkpath(digest))
    end
  end

  # Re-hashes every chunk and compares the SHA-256 digest with the file name.
  # Reports mismatches unless +quiet+. Returns true when all chunks are
  # intact (or there is no chunk directory), false otherwise.
  #
  # The digests are computed in-process rather than by shelling out to
  # `sha256sum`, so a URL containing shell metacharacters is harmless and no
  # external tool is needed.
  def check(quiet = false)
    dir = chunkdir
    return true unless File.directory?(dir)
    result = true
    Dir.entries(dir).sort.each do |name|
      path = File.join(dir, name)
      next if name.start_with?('.') || !File.file?(path)
      begin
        actual = Digest::SHA256.file(path).hexdigest
      rescue SystemCallError => e
        # unreadable chunk: complain on stderr and carry on, as sha256sum did
        warn "#{path}: #{e.message}"
        next
      end
      next if actual == name
      unless quiet
        puts "#{@url} has a bad chunk." # TODO fix when?
        puts actual
        puts name
      end
      result = false
    end
    result
  end

  private

  # Directory that holds this node's chunks.
  def chunkdir
    "#{@url}/chunk"
  end

  # Path of the chunk named +digest+.
  def chunkpath(digest)
    "#{chunkdir}/#{digest}"
  end
end
# Datanode reached over ssh/scp. The URL has the form ssh://[user@]host/path;
# the leading '/' of the path is dropped, so the path is used as given to the
# remote shell without it.
#
# NOTE(review): host, path, digests and names are interpolated into shell
# command lines unquoted; they must not contain shell metacharacters.
class SSHDataNode < DataNode
  def initialize(param)
    super(param)
    uri = URI.parse(@url)
    user = uri.user
    @host = (user ? user + '@' : '') + uri.host
    @path = uri.path
    @path = @path[1, @path.size]
  end

  def online?
    true # TODO
  end

  # Runs +command+ on the remote host; returns what Kernel#system returns.
  def ssh(command)
    system("ssh #{@host} #{command}")
  end

  # Uploads the local file +src+ as the chunk +digest+. Returns true/false.
  def put(src, digest)
    return false unless ssh("mkdir -p #{@path}/chunk")
    return false unless system("scp #{src} #{@host}:#{@path}/chunk/#{digest}")
    true
  end

  # Points the remote special file +name+ at the chunk +digest+.
  def putspecial(digest, name)
    ssh("ln -sf chunk/#{digest} #{@path}/#{name}")
  end

  # Downloads the chunk +digest+ and returns its raw bytes, or nil when the
  # transfer fails. The temporary download file is always removed.
  def get(digest)
    tmp = Tempfile.new('sdfs')
    tmp.close
    fname = tmp.path
    begin
      FileUtils.rm_f(fname)
      return nil unless system(
        "scp #{@host}:#{@path}/chunk/#{digest} #{fname} >/dev/null")
      # binary mode: chunks are arbitrary data and are re-hashed by the caller
      File.open(fname, 'rb') { |f| f.read }
    rescue Errno::ENOENT
      nil
    ensure
      tmp.close(true)
    end
  end

  # Downloads the special file +name+ to the local path +dest+ (default:
  # +name+); returns what Kernel#system returns.
  def getspecial(name, dest = nil)
    dest = name unless dest
    system("scp #{@host}:#{@path}/#{name} #{dest}")
  end

  # Truthy when the chunk +digest+ exists on the remote host.
  def exist?(digest)
    ssh("ls #{@path}/chunk/#{digest} 2>/dev/null >/dev/null")
  end

  # Runs sha256sum over the remote chunks and compares each digest with the
  # chunk's file name. Reports mismatches unless +quiet+. Returns true when
  # every chunk is intact, false on a mismatch or when the remote command
  # fails.
  def check(quiet = false)
    # Capture the listing directly instead of going through a fixed,
    # world-shared /tmp/sha256.txt.
    listing = `ssh #{@host} sha256sum #{@path}/chunk/*`
    return false unless $?.success?
    result = true
    listing.each_line do |l|
      actual, path = l.split # sha256sum prints "<digest>  <path>"
      # The old test was `path.index(digest) < 0`, which can never be true
      # and raises on nil when the name does not match.
      next if path && File.basename(path) == actual
      unless quiet
        puts "#{@url} has a bad chunk." # TODO fix when?
        puts actual
        puts path
      end
      result = false
    end
    result
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/ruby | |
require 'sdfs' | |
require 'test/unit' | |
# Exercises DFS against four local directory datanodes below tmp/dfs.
class DFSTest < Test::Unit::TestCase
  URLS = 'a b c d'.split.map {|i| "tmp/dfs/#{i}" }

  # Starts from empty datanodes and creates the input files tmp/a ("z\n") and
  # tmp/0 .. tmp/29 (each containing its own number).
  def setup
    @dfs = DFS.new
    URLS.each {|url| FileUtils.rm_rf(url) }
    FileUtils.mkdir_p('tmp')
    File.open('tmp/a', 'w') do |f|
      f.puts('z')
    end
    30.times do |i|
      File.open("tmp/#{i}", 'w') do |f|
        f.puts(i.to_s)
      end
    end
  end

  # put/ls/get round trip, config recovery from a single known node, and
  # survival of any single datanode going away.
  def test_basic
    URLS.each {|url| @dfs.add(url) }
    @dfs.putconfig('tmp/config')
    @dfs.put('tmp/a', 'd/a')
    lsstr = @dfs.ls
    @dfs.clear
    @dfs.add(URLS[0])
    @dfs.getconfig
    lsstr2 = @dfs.ls
    assert_equal(lsstr, lsstr2)
    text = @dfs.get('d/a')
    assert_equal(Digest::MD5.hexdigest("z\n"), Digest::MD5.hexdigest(text))
    assert_nil(@dfs.get('d/b'))
    URLS.each do |url|
      next unless File.exist?(url)
      FileUtils.mv(url, 'tmp/datanode')
      # Fetch again while this node is missing; asserting on the copy read
      # before the loop would pass no matter what get does now.
      text = @dfs.get('d/a')
      assert_equal(Digest::MD5.hexdigest("z\n"), Digest::MD5.hexdigest(text))
      lsstr2 = @dfs.ls
      assert_equal(lsstr, lsstr2)
      FileUtils.mv('tmp/datanode', url)
    end
  end

  EXPECTED_DIR = <<EOF
d c865f6c5ab8d1b0bcd383a5e1e3879d22681c96bf462c269b7581d523fbe70ab
d2/b c865f6c5ab8d1b0bcd383a5e1e3879d22681c96bf462c269b7581d523fbe70ab
d2/a c865f6c5ab8d1b0bcd383a5e1e3879d22681c96bf462c269b7581d523fbe70ab
EOF

  # A file cannot be used as a directory, and putting onto an existing
  # directory stores the file inside it.
  def test_dir
    URLS.each {|url| @dfs.add(url) }
    @dfs.putconfig('tmp/config')
    @dfs.put('tmp/a', 'd')
    assert(@dfs.put('tmp/a', 'd/a') == false)
    @dfs.put('tmp/a', 'd2/b')
    @dfs.put('tmp/a', 'd2')
    assert_equal(EXPECTED_DIR, @dfs.ls(:raw => true))
  end

  # One corrupted replica is detected by check and skipped by get; with both
  # replicas corrupted get returns nil.
  def test_badchunk
    URLS.each {|url| @dfs.add(url) }
    @dfs.putconfig('tmp/config')
    @dfs.put('tmp/a', 'a')
    assert(@dfs.check(true))
    digest = 'c865f6c5ab8d1b0bcd383a5e1e3879d22681c96bf462c269b7581d523fbe70ab'
    File.open("tmp/dfs/d/chunk/#{digest}", 'w') do |f|
      f.write('Z')
    end
    assert(!@dfs.check(true))
    text = @dfs.get('a', false)
    assert_equal(Digest::MD5.hexdigest("z\n"), Digest::MD5.hexdigest(text))
    File.open("tmp/dfs/a/chunk/#{digest}", 'w') do |f|
      f.write('Z')
    end
    assert(!@dfs.check(true))
    assert_nil(@dfs.get('a', false))
  end

  # Puts must not raise while a datanode's directory is replaced by a plain
  # file (simulating an unusable node).
  def test_offline
    URLS.each {|url| @dfs.add(url) }
    @dfs.putconfig('tmp/config')
    URLS.each do |url|
      next unless File.exist?(url)
      FileUtils.mv(url, 'tmp/datanode')
      FileUtils.touch(url)
      30.times do |i|
        @dfs.put("tmp/#{i}")
      end
      FileUtils.rm(url)
      FileUtils.mv('tmp/datanode', url)
    end
  end
end
# End-to-end check of the command-line front end (ui.rb).
class UITest < Test::Unit::TestCase
  EXPECTED_LOG = <<EOF
commands: init, config, ls, cat, get, put, check
commands: init, config, ls, cat, get, put, check
a
z
EOF

  # Gives every run a fresh tmp/ directory that also serves as $HOME (so the
  # config file lands there) and creates the input file tmp/a.
  def setup
    FileUtils.rm_rf('tmp')
    FileUtils.mkdir_p('tmp')
    ENV['HOME'] = 'tmp'
    open('tmp/a', 'w') { |f| f.puts('z') }
  end

  # Runs a short session and compares the accumulated stdout and the
  # downloaded file with the expected results.
  def test_uibasic
    # the first invocation truncates the log, the remaining ones append
    system('ruby ui.rb >tmp/log')
    [
      'help',
      'init tmp/x tmp/y tmp/z',
      'put tmp/a a',
      'ls',
      'cat a',
      'get a tmp/b'
    ].each do |args|
      system("ruby ui.rb #{args} >>tmp/log")
    end
    assert_equal(EXPECTED_LOG, IO.read('tmp/log'))
    assert_equal("z\n", IO.read('tmp/b'))
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/ruby | |
require File.dirname(__FILE__) + '/sdfs' | |
require 'optparse' | |
# Command-line front end:
#   init URL...        create ~/.sdfs.yaml for the given datanodes
#   config get|put     fetch the config from / publish it to the namenodes
#   ls [ENTRY]         list names
#   cat FILE           print a stored file
#   get SRC [DEST]     download a stored file
#   put SRC [DEST]     upload a local file
#   check              verify chunks and replication
opt = OptionParser.new
opt.on('-r [URL]') {|v| p :todo } # TODO
opt.order!(ARGV)
cmd = ARGV.shift
CONFIGFILE = "#{ENV['HOME']}/.sdfs.yaml"
dfs = DFS.new
dfs.loadconfig(CONFIGFILE) if File.exist?(CONFIGFILE)

# Prints +msg+ and terminates with a failure status.
bail = lambda do |msg|
  puts msg
  exit 1
end

case cmd
when 'init'
  urls = ARGV
  bail.call('no URL(s)') if urls.size == 0
  bail.call("remove #{CONFIGFILE} first") if File.exist?(CONFIGFILE)
  urls.each {|url| dfs.add(url) }
  dfs.putconfig(CONFIGFILE)
when 'config'
  configcmd = ARGV.shift
  case configcmd
  when 'get'
    dfs.getconfig
    dfs.putconfig(CONFIGFILE)
  when 'put'
    dfs.putconfig
  else
    puts 'commands: get, put'
  end
when 'ls'
  lstxt = dfs.ls({}, ARGV.shift)
  puts lstxt if lstxt.size > 0
when 'cat'
  data = dfs.get(ARGV.shift)
  bail.call('error') unless data # unknown or unreadable file
  print data
when 'get'
  src = ARGV.shift
  dest = ARGV.shift
  bail.call('error') unless src
  dest = File.basename(src) unless dest
  # fetch before opening dest, so a failed lookup never truncates or
  # creates the destination file
  data = dfs.get(src)
  bail.call('error') unless data
  File.open(dest, 'wb') do |f|
    f.write(data)
  end
when 'put'
  src = ARGV.shift
  dest = ARGV.shift
  bail.call('error') unless src
  # was `put 'error'`, an undefined method that raised NoMethodError
  bail.call('error') unless dfs.put(src, dest)
when 'check'
  dfs.check
else
  puts 'commands: init, config, ls, cat, get, put, check'
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment