Skip to content

Instantly share code, notes, and snippets.

@kekemoto
Last active February 22, 2023 21:13
Show Gist options
  • Save kekemoto/8c56a5245e9a09a2ae425f4649d8d5ec to your computer and use it in GitHub Desktop.
Save kekemoto/8c56a5245e9a09a2ae425f4649d8d5ec to your computer and use it in GitHub Desktop.
A simple key-value store in Ruby.
# This is a simple KVS.
#
# Feature
#
# - Easy to use.
# - You can write and read Ruby data as it is. (Marshalable only)
# - It is thread-safe using an event loop.
# - Even separate machines can communicate via TCP.
#
# Example
#
# kvs = KeyValueStore.new
# kvs.server
#
# kvs.client do |client|
# client[:foo] = :bar
# puts client[:foo]
# # => bar
# end
#
# Example: Interprocess communication
#
# kvs = KeyValueStore.new
# kvs.server
#
# kvs.client do |c|
# c[:input] = :input
# end
#
# Process.fork do
# kvs.client do |c|
# puts c[:input]
# # => input
#
# c[:output] = :output
# end
# end
#
# Process.wait
#
# puts kvs.client[:output]
# # => output
#
# Example: Mutex
#
# kvs = KeyValueStore.new
# kvs.server
#
# kvs.client { _1[:count] = 0 }
#
# th1 = Thread.new do
# kvs.client do |c|
# 50.times do
# c.mutex(:count) { |old| old + 1 }
# end
# end
# end
#
# th2 = Thread.new do
# kvs.client do |c|
# 50.times do
# c.mutex(:count) { |old| old + 1 }
# end
# end
# end
#
# th1.join
# th2.join
#
# puts kvs.client[:count]
# # => 100
#
# Example: Advanced use of Mutex.
#
# kvs = KeyValueStore.new
# kvs.server
#
# kvs.client do |c|
# c[:list] = []
# pp c.mutex(:list){ _1 << 1 }
# # => [1]
# pp c.mutex(:list){ _1 << 2 }
# # => [1, 2]
# pp c.mutex(:list){ _1 << 3 }
# # => [1, 2, 3]
# end
#
# Example: Ruby as it is!
#
# kvs = KeyValueStore.new
# kvs.server
#
# class Point
# def initialize(x, y)
# @x = x
# @y = y
# end
# end
#
# kvs.client do |c|
# c[:point] = Point.new(1, 2)
# pp c[:point]
# # => #<Point:0x00007fc795ec2930 @x=1, @y=2>
# end
#
# Example: Even separate machines can communicate via TCP
#
# # server.rb
# kvs = KeyValueStore.new("example.com", 8769)
# thread = kvs.server
# thread.join
#
# # client.rb
# kvs = KeyValueStore.new("example.com", 8769)
# kvs.client do |c|
# pp c[:foo]
# c[:foo] = :bar
# pp c[:foo]
# end
#
# # Running the client a second time while leaving the server running yields different results.
require "socket"
# A small key-value store served over TCP.
#
# A single Server thread owns the data and handles one request at a time.
# Client objects talk to it by exchanging Marshal-encoded request/response
# structs over a socket.
#
# SECURITY: both sides call Marshal.load on bytes read from the socket.
# Marshal.load can instantiate arbitrary objects, so only expose the server
# to peers you trust.
class KeyValueStore
  # @return [Thread, nil] the thread started by the most recent call to #server
  attr_reader :thread

  # @param host [String] host the server binds to and clients connect to
  # @param port [Integer] TCP port shared by the server and its clients
  def initialize(host = "localhost", port = 8769)
    @host = host
    @port = port
  end

  # Starts a server for this host/port on a background thread.
  #
  # @return [Thread] the server thread (also available through #thread)
  def server
    @thread = Server.new(@host, @port).start
  end

  # Opens a connection to the server.
  #
  # With a block, yields the client, closes it when the block finishes
  # (normally or not), and returns the block's value.
  # Without a block, returns the client; the caller must close it.
  #
  # @yieldparam client [Client]
  # @return [Object, Client]
  def client
    # Connect outside the begin/ensure: if connecting fails there is nothing
    # to close, and the connection error must reach the caller unchanged.
    connection = Client.new(@host, @port)
    return connection unless block_given?

    begin
      yield connection
    ensure
      connection.close
    end
  end

  # Wire protocol: every message is one Marshal-dumped struct.
  RequestWrite = Struct.new(:key, :value)
  ResponseWrite = Struct.new(:value)
  RequestRead = Struct.new(:key)
  ResponseRead = Struct.new(:value)
  RequestClose = Struct.new(:reason)
  ResponseClose = Struct.new(:reason)
  RequestMutex = Struct.new(:key)
  ResponseMutexResume = Struct.new(:old_value)
  RequestMutexResume = Struct.new(:new_value)
  # Sent instead of RequestMutexResume when the client leaves its critical
  # section without producing a value (for example because the block raised).
  RequestMutexAbort = Struct.new(:key)
  ResponseMutex = Struct.new(:new_value)

  # Client side of the protocol: one TCP connection to a Server.
  class Client
    # Seconds to wait before the single connection retry.
    CONNECT_RETRY_DELAY = 0.1

    # Connects to the server, retrying once after a short pause.
    #
    # @param host [String]
    # @param port [Integer]
    # @raise [SystemCallError] when the second attempt fails as well
    def initialize(host, port)
      @socket = TCPSocket.open(host, port)
    rescue Errno::EADDRNOTAVAIL, Errno::ECONNREFUSED
      # The server may still be starting (e.g. in another process), so give it
      # a moment and try exactly once more.
      sleep CONNECT_RETRY_DELAY
      @socket = TCPSocket.open(host, port)
    end

    # Stores +value+ under +key+.
    #
    # @return [Object, nil] the value previously stored under +key+
    # @raise [ClosedError] when the connection is closed
    def set(key, value)
      expect(request(RequestWrite, key, value), ResponseWrite).value
    end
    alias []= set

    # Reads the value stored under +key+.
    #
    # @return [Object, nil] the stored value, or nil when the key is unset
    # @raise [ClosedError] when the connection is closed
    def get(key)
      expect(request(RequestRead, key), ResponseRead).value
    end
    alias [] get

    # Atomically replaces the value under +key+ with the block's result.
    #
    # The server handles no other request between handing out the old value
    # and receiving the new one. If the block does not complete (it raises,
    # or exits via break/throw), the stored value is left untouched and the
    # server is released.
    #
    # @yieldparam old_value [Object, nil] the value currently stored
    # @return [Object] the value now stored under +key+
    def mutex(key)
      old_value = expect(request(RequestMutex, key), ResponseMutexResume).old_value

      completed = false
      begin
        new_value = yield old_value
        completed = true
      ensure
        abort_mutex(key) unless completed
      end

      expect(request(RequestMutexResume, new_value), ResponseMutex).new_value
    end

    # Tells the server this connection is done and closes the socket.
    # Calling it on an already closed client does nothing.
    #
    # @param reason [Object, nil] sent to the server with the close request
    # @return [Object, nil] the reason reported back by the server
    def close(reason = nil)
      return if @socket.nil? || @socket.closed?

      expect(request(RequestClose, reason), ResponseClose).reason
    ensure
      @socket&.close
    end

    private

    # Sends one request and waits for its response.
    def request(...)
      write(...)
      read
    end

    # Builds +request+ from +args+ and sends it.
    def write(request, *args)
      raise KeyValueStore::ClosedError, "Connection is closed." if @socket.closed?

      Marshal.dump(request.new(*args), @socket)
    end

    # Blocks until the next response arrives and returns it.
    def read
      raise KeyValueStore::ClosedError, "Connection is closed." if @socket.eof?

      Marshal.load(@socket)
    end

    # Returns +response+ when it has the expected type.
    #
    # @raise [DevelopError] when the server answered with something else
    def expect(response, expected_class)
      return response if response.is_a?(expected_class)

      raise KeyValueStore::DevelopError, "This is an unexpected response. response: #{response}"
    end

    # Releases a pending mutex without changing the stored value.
    def abort_mutex(key)
      expect(request(RequestMutexAbort, key), ResponseMutex)
    rescue KeyValueStore::Error, SystemCallError, IOError
      # Best effort only: this runs while the caller's block is already
      # unwinding, and that original error is the one worth reporting.
      nil
    end
  end

  # Server side of the protocol: a single-threaded select loop over all
  # connections, holding the data in an in-memory Hash.
  class Server
    # @param host [String] host to bind to
    # @param port [Integer] port to listen on
    def initialize(host, port)
      @host = host
      @port = port
      @store = {}
    end

    # Binds the listening socket and serves requests on a new thread.
    #
    # The socket is bound before the thread is started, so a client created
    # right after this call can connect, and bind errors are raised here.
    #
    # @return [Thread] the serving thread; an exception in it aborts the process
    # @raise [SystemCallError] when the address cannot be bound
    def start
      listener = TCPServer.open(@host, @port)
      thread = Thread.new { serve(listener) }
      thread.abort_on_exception = true
      thread
    end

    private

    # Event loop: accepts connections and handles one request at a time.
    def serve(listener)
      sockets = [listener]
      loop do
        readable, = IO.select(sockets)
        readable.each do |socket|
          case socket
          when TCPServer # must come first: TCPServer is a TCPSocket subclass
            sockets << socket.accept
          when TCPSocket
            keep_open = !socket.eof? && dispatcher(socket)
            unless keep_open
              # The peer hung up or asked to close; release our end too.
              sockets.delete(socket)
              socket.close
            end
          else
            raise KeyValueStore::DevelopError, "This is an unexpected socket. socket: #{socket}"
          end
        end
      end
    ensure
      sockets.each(&:close)
    end

    # Reads one request and routes it to the matching request_* handler.
    #
    # @return [Boolean] false when the connection should be dropped
    # @raise [DevelopError] when no handler exists for the request
    def dispatcher(socket)
      request = read(socket)
      method_name = convert_method_name(request)
      # Only request_* handlers may be reached from the wire; the name comes
      # from the class of an unmarshaled object.
      unless method_name.start_with?("request_") && respond_to?(method_name, true)
        raise KeyValueStore::DevelopError, "This is an unexpected request. request: #{request}"
      end

      __send__(method_name, request:, socket:)
    end

    # Replies with the value stored under the requested key.
    def request_read(request:, socket:)
      request => { key: }
      write(socket, ResponseRead.new(@store[key]))
      true
    end

    # Stores the value and replies with the one it replaced.
    def request_write(request:, socket:)
      request => { key:, value: }
      old_value = @store[key]
      @store[key] = value
      write(socket, ResponseWrite.new(old_value))
      true
    end

    # Hands out the current value, then waits for the same client to send
    # the replacement. The event loop is blocked in here, which is what makes
    # the read-modify-write exclusive.
    def request_mutex(request:, socket:)
      request => { key: }
      write(socket, ResponseMutexResume.new(@store[key]))
      return false if socket.eof?

      follow_up = read(socket)
      case follow_up
      when RequestMutexResume
        @store[key] = follow_up.new_value
      when RequestMutexAbort
        # The client gave up; leave the stored value as it is.
      else
        raise KeyValueStore::DevelopError, "This is an unexpected request. request: #{follow_up}"
      end
      write(socket, ResponseMutex.new(@store[key]))
      true
    end

    # Acknowledges the close; returning false drops the connection.
    # The client's reason is not echoed: the reply always carries nil.
    def request_close(request:, socket:)
      write(socket, ResponseClose.new(nil))
      false
    end

    def read(socket)
      Marshal.load(socket)
    end

    def write(socket, value)
      Marshal.dump(value, socket)
    end

    # Handler name for a request, e.g. RequestRead -> "request_read".
    def convert_method_name(request)
      underscore(request.class.name.split("::").last)
    end

    # Lower-cases +string+, putting "_" before every capital except a leading one.
    def underscore(string)
      string.gsub(/(?!\A)[A-Z]/) { |capital| "_#{capital}" }.downcase
    end
  end

  class Error < StandardError; end
  class ClosedError < Error; end
  class ClientClosed < Error; end
  class DevelopError < Error; end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment