Last active
February 22, 2023 21:13
-
-
Save kekemoto/8c56a5245e9a09a2ae425f4649d8d5ec to your computer and use it in GitHub Desktop.
A simple key-value store in Ruby.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This is a simple KVS. | |
# | |
# Feature | |
# | |
# - Easy to use. | |
# - You can write and read Ruby data as it is. (Marshalable only) | |
# - It is thread-safe using an event loop. | |
# - Even separate machines can communicate via TCP. | |
# | |
# Example | |
# | |
# kvs = KeyValueStore.new | |
# kvs.sever | |
# | |
# kvs.client do |client| | |
# client[:foo] = :bar | |
# puts client[:foo] | |
# # => bar | |
# end | |
# | |
# Example: Interprocess communication | |
# | |
# kvs = KeyValueStore.new | |
# kvs.server | |
# | |
# kvs.client do |c| | |
# c[:input] = :input | |
# end | |
# | |
# Process.fork do | |
# kvs.client do |c| | |
# puts c[:input] | |
# # => input | |
# | |
# c[:output] = :output | |
# end | |
# end | |
# | |
# Process.wait | |
# | |
# puts kvs.client[:output] | |
# # => output | |
# | |
# Example: Mutex | |
# | |
# kvs = KeyValueStore.new | |
# kvs.server | |
# | |
# kvs.client { _1[:count] = 0 } | |
# | |
# th1 = Thread.new do | |
# kvs.client do |c| | |
# 50.times do | |
# c.mutex(:count) { |old| old + 1 } | |
# end | |
# end | |
# end | |
# | |
# th2 = Thread.new do | |
# kvs.client do |c| | |
# 50.times do | |
# c.mutex(:count) { |old| old + 1 } | |
# end | |
# end | |
# end | |
# | |
# th1.join | |
# th2.join | |
# | |
# puts kvs.client[:count] | |
# # => 100 | |
# | |
# Example: Advanced use of Mutex. | |
# | |
# kvs = KeyValueStore.new | |
# kvs.server | |
# | |
# kvs.client do |c| | |
# c[:list] = [] | |
# pp c.mutex(:list){ _1 << 1 } | |
# # => [1] | |
# pp c.mutex(:list){ _1 << 2 } | |
# # => [1, 2] | |
# pp c.mutex(:list){ _1 << 3 } | |
# # => [1, 2, 3] | |
# end | |
# | |
# Example: Ruby as it is! | |
# | |
# kvs = KeyValueStore.new | |
# kvs.server | |
# | |
# class Point | |
# def initialize(x, y) | |
# @x = x | |
# @y = y | |
# end | |
# end | |
# | |
# kvs.client do |c| | |
# c[:point] = Point.new(1, 2) | |
# pp c[:point] | |
# # => #<Point:0x00007fc795ec2930 @x=1, @y=2> | |
# end | |
# | |
# Example: Even separate machines can communicate via TCP | |
# | |
# # server.rb | |
# kvs = KeyValueStore.new("example.com", 8769) | |
# thread = kvs.server | |
# thread.join | |
# | |
# # client.rb | |
# kvs = KeyValueStore.new("example.com", 8769) | |
# kvs.client do |c| | |
# pp c[:foo] | |
# c[:foo] = :bar | |
# pp c[:foo] | |
# end | |
# | |
# # Running the client a second time while leaving the server running yields different results. | |
require "socket" | |
class KeyValueStore | |
attr_reader :thread | |
def initialize(host = "localhost", port = 8769) | |
@host = host | |
@port = port | |
end | |
def server | |
Server.new(@host, @port).start | |
end | |
def client | |
if block_given? | |
begin | |
client = Client.new(@host, @port) | |
yield client | |
ensure | |
client.close | |
end | |
else | |
Client.new(@host, @port) | |
end | |
end | |
RequestWrite = Struct.new(:key, :value) | |
ResponseWrite = Struct.new(:value) | |
RequestRead = Struct.new(:key) | |
ResponseRead = Struct.new(:value) | |
RequestClose = Struct.new(:reason) | |
ResponseClose = Struct.new(:reason) | |
RequestMutex = Struct.new(:key) | |
ResponseMutexResume = Struct.new(:old_value) | |
RequestMutexResume = Struct.new(:new_value) | |
ResponseMutex = Struct.new(:new_value) | |
class Client | |
def initialize(host, port) | |
@socket = TCPSocket.open(host, port) | |
rescue Errno::EADDRNOTAVAIL | |
# Even if the client is created after creating the server in the code, the client may connect before the server, so wait 0.1 second. | |
sleep 0.1 | |
@socket = TCPSocket.open(host, port) | |
end | |
def set(key, value) | |
response = request(RequestWrite, key, value) | |
raise KeyValueStore::DevelopError, "This is an unexpected response. response: #{response}" unless response.is_a?(ResponseWrite) | |
response.value | |
end | |
alias []= set | |
def get(key) | |
response = request(RequestRead, key) | |
raise KeyValueStore::DevelopError, "This is an unexpected response. response: #{response}" unless response.is_a?(ResponseRead) | |
response.value | |
end | |
alias [] get | |
def mutex(key) | |
response = request(RequestMutex, key) | |
raise KeyValueStore::DevelopError, "This is an unexpected response. response: #{response}" unless response.is_a?(ResponseMutexResume) | |
new_value = yield response.old_value | |
response = request(RequestMutexResume, new_value) | |
raise KeyValueStore::DevelopError, "This is an unexpected response. response: #{response}" unless response.is_a?(ResponseMutex) | |
response.new_value | |
end | |
def close(reason = nil) | |
response = request(RequestClose, reason) | |
raise KeyValueStore::DevelopError, "This is an unexpected response. response: #{response}" unless response.is_a?(ResponseClose) | |
response.reason | |
ensure | |
@socket&.close | |
end | |
private | |
def request(...) | |
write(...) | |
read | |
end | |
def write(request, *args) | |
raise KeyValueStore::ClosedError, "Connection is closed." if @socket.closed? | |
Marshal.dump(request.new(*args), @socket) | |
end | |
def read | |
raise KeyValueStore::ClosedError, "Connection is closed." if @socket.eof? | |
result = Marshal.load(@socket) | |
result | |
end | |
end | |
class Server | |
def initialize(host, port) | |
@host = host | |
@port = port | |
@store = {} | |
end | |
def start | |
th = Thread.new do | |
server = TCPServer.open(@host, @port) | |
sockets = [server] | |
loop do | |
readable_sockets, _ = IO.select(sockets) | |
readable_sockets.each do |socket| | |
case socket | |
when TCPServer | |
sockets << socket.accept | |
when TCPSocket | |
if socket.eof? | |
sockets.delete(socket) | |
next | |
end | |
is_continue = dispatcher(socket) | |
sockets.delete(socket) unless is_continue | |
else | |
raise KeyValueStore::DevelopError, "This is an unexpected socket. socket: #{socket}" | |
end | |
end | |
end | |
ensure | |
sockets.each(&:close) | |
end | |
th.abort_on_exception = true | |
th | |
end | |
private | |
def dispatcher(socket) | |
request = read(socket) | |
method_name = convert_method_name(request) | |
raise KeyValueStore::DevelopError, "This is an unexpected request. request: #{request}" unless respond_to?(method_name, true) | |
__send__(method_name, request:, socket:) | |
end | |
def request_read(request:, socket:) | |
request => { key: } | |
value = @store[key] | |
write(socket, ResponseRead.new(value)) | |
true | |
end | |
def request_write(request:, socket:) | |
request => { key:, value: } | |
old_value = @store[key] | |
@store[key] = value | |
write(socket, ResponseWrite.new(old_value)) | |
true | |
end | |
def request_mutex(request:, socket:) | |
request => { key: } | |
write(socket, ResponseMutexResume.new(@store[key])) | |
return false if socket.eof? | |
request = read(socket) | |
raise KeyValueStore::DevelopError, "This is an unexpected request. request: #{request}" unless request.is_a?(RequestMutexResume) | |
@store[key] = request.new_value | |
write(socket, ResponseMutex.new(request.new_value)) | |
true | |
end | |
def request_close(request:, socket:) | |
request => reason | |
write(socket, ResponseClose.new(nil)) | |
false | |
end | |
def read(socket) | |
Marshal.load(socket) | |
end | |
def write(socket, value) | |
Marshal.dump(value, socket) | |
end | |
def convert_method_name(request) | |
underscore request.class.name.split("::").last | |
end | |
def underscore(string) | |
result = string[0].downcase | |
string[1..].each_char do |char| | |
if char.match?(/[A-Z]/) | |
result += "_" + char.downcase | |
else | |
result += char.downcase | |
end | |
end | |
result | |
end | |
end | |
class Error < StandardError; end | |
class ClosedError < Error; end | |
class ClientClosed < Error; end | |
class DevelopError < Error; end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment