Skip to content

Instantly share code, notes, and snippets.

@mitio
Last active November 14, 2019 14:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mitio/e7b36b4b1f47cda7e503b7b26cedac8e to your computer and use it in GitHub Desktop.
Save mitio/e7b36b4b1f47cda7e503b7b26cedac8e to your computer and use it in GitHub Desktop.
A rudimentary example of a distributed in-memory key-value store (with lots of bugs)

A distributed in-memory key-value store

A rudimentary example of a distributed in-memory key-value store (with lots of bugs)

Usage

In one terminal:

ruby server.rb 4001

In another terminal:

telnet localhost 4001

And use the protocol in spec.txt, e.g. PUT foo bar and then GET foo.

You can then start another server in a third terminal:

ruby server.rb 4002

And using the open telnet session, you can execute ADD_NODE localhost 4002.

Then you can connect to the new node with telnet localhost 4002 and run GET foo there and it'll work.

That's probably everything that will work 😄

require 'socket'
require 'logger'
server_port = (ARGV.first || 4143).to_i
server = TCPServer.new '0.0.0.0', server_port
logger = Logger.new STDOUT
logger.info "Listening on #{server_port}"
database = {}
nodes = {}
EOL = /\r?\n\z/
loop do
Thread.new server.accept do |connection|
_, peer_port, _, peer_ip = connection.peeraddr
client_info = "#{peer_ip}:#{peer_port}"
logger.info "Accepted a connection from #{client_info}"
connection.puts "EHLO"
loop do
begin
message = connection.gets
case message
when /\AGET ([^ \n]+?)#{EOL}/
key = $1
connection.puts database[key]
when /\ADEL ([^ \n]+?)#{EOL}/
key = $1
database.delete key
connection.puts "OK DELETED #{key}"
when /\APUT ([^ \n]+?) ([^\n]+?)#{EOL}/
key, value = $1, $2
logger.info [key, value].inspect
database[key] = value
connection.puts "OK #{key}"
when /\ALIST#{EOL}/
connection.puts database.size
connection.puts database.keys
when /\AQUIT#{EOL}/
connection.puts 'OK BYE'
connection.close
logger.info "Disconnecting #{client_info}"
break
when /\AADD_NODE ([^ ]+) (\d+)#{EOL}/
ip, port = $2, $3
new_node = [ip, port]
new_node_info = "#{ip}:#{port}"
if nodes[new_node]
logger.info "Ignoring known node #{new_node_info}"
connection.puts "NODE EXISTS #{new_node_info}"
next
end
nodes[new_node] = TCPSocket.new ip, port
logger.info "Added node #{new_node_info}"
connection.puts "ADDED NODE #{new_node_info}"
# Push our DB to all nodes
database.each do |key, value|
nodes[new_node].puts "PUT #{key} #{value}"
end
# Announce the new node to all other nodes
nodes.each do |node, node_connection|
next if node == new_node
ip, port = node
logger.info "Announcing #{new_node_info} to #{ip}:#{port}"
node_connection.puts "ADD_NODE #{ip} #{port}"
end
when /\ADEL_NODE ([^ ]+) (\d+)#{EOL}/
ip, port = $1, $2
new_node = [ip, port]
new_node_info = "#{ip}:#{port}"
unless nodes[new_node]
logger.info "Ignoring unknown node #{new_node_info}"
connection.puts "UNKNOWN NODE #{new_node_info}"
next
end
logger.info "Deleted node #{new_node_info}"
connection.puts "DELETED NODE #{new_node_info}"
# Announce the removal to all other nodes
nodes.each do |node, node_connection|
ip, port = node
logger.info "Announcing node #{new_node_info} removal to #{ip}:#{port}"
node_connection.puts "DEL_NODE #{ip} #{port}"
end
when /\ALIST_NODES#{EOL}/
connection.puts nodes.size
connection.puts nodes.keys.map { |ip, port| "#{ip}:#{port}" }
else
connection.puts "UNKNOWN MESSAGE: #{message}"
logger.warn "Unsupported message received from #{client_info}: #{message}"
end
rescue Errno::EPIPE => error
logger.info "Client #{client_info} disconnected (#{error.class}: #{error.message})"
connection.close
break
rescue Exception => e
logger.error e
connection.close
break
end
end
end
end
PUT key value
GET key
DEL key
LIST
QUIT
ADD_NODE 1.2.3.4 1234
DEL_NODE 1.2.3.4 1234
LIST_NODES
> LIST
< 3
< key-1
< key-2
< key-3
key: [^ \n]
value: [^\n]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment