Created
January 6, 2018 21:51
-
-
Save kingsleyh/2cbca1f5ee6cfd678045f6e7343de046 to your computer and use it in GitHub Desktop.
Iterator issues
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
p my_cursor | |
#<RethinkDB::Cursor:0x10bead8a0 | |
@index=1, | |
@response= | |
#<RethinkDB::Connection::Response:0x10beb0a50 | |
@b=nil, | |
@e=nil, | |
@n=[], | |
@p=nil, | |
@r= | |
[{"groupId" => "3dc3cc0e-886e-4ec8-96f2-617fa84ab506", "id" => "4c951ac7-0f89-43a8-8298-749ac76d1a8f", "isLanding" => false, "name" => "channel1"}], | |
@t=SUCCESS_SEQUENCE>, | |
@stream= | |
#<RethinkDB::Connection::ResponseStream:0x10bead990 | |
@channel=#<Channel::Unbuffered(String):0x10bea7e40>, | |
@conn= | |
#<RethinkDB::Connection:0x109e04c00 | |
@channels={}, | |
@db="jamtalk", | |
@next_id=20_u64, | |
@next_query_id=1_u64, | |
@open=true, | |
@sock=#<TCPSocket:fd 9>>, | |
@id=0_u64, | |
@runopts={"db" => [14_i64, ["jamtalk"]]}>> | |
p my_cursor.to_a | |
[] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "socket" | |
require "socket/tcp_socket" | |
require "json" | |
require "./serialization" | |
require "./constants" | |
module RethinkDB | |
class Connection | |
def initialize(options) | |
host = options[:host]? || "localhost" | |
port = options[:port]? || 28015 | |
@db = options[:db]? || "test" | |
auth_key = options[:auth_key]? || "" | |
timeout = options[:timeout]? || 20 | |
@next_id = 1u64 | |
@open = true | |
@sock = TCPSocket.new(host, port) | |
@sock.write_bytes(Version::V0_4.to_u32, IO::ByteFormat::LittleEndian) | |
@sock.write_bytes(auth_key.bytesize, IO::ByteFormat::LittleEndian) | |
@sock.write(auth_key.to_slice) | |
@sock.write_bytes(Protocol::JSON.to_u32, IO::ByteFormat::LittleEndian) | |
error = @sock.gets('\0') | |
unless error | |
raise ReqlDriverError.new | |
end | |
unless error.rchop == "SUCCESS" | |
raise ReqlDriverError.new(error.rchop) | |
end | |
@channels = {} of UInt64 => Channel::Unbuffered(String) | |
@next_query_id = 1_u64 | |
spawn do | |
while @open | |
id = @sock.read_bytes(UInt64, IO::ByteFormat::LittleEndian) | |
size = @sock.read_bytes(UInt32, IO::ByteFormat::LittleEndian) | |
slice = Slice(UInt8).new(size) | |
@sock.read(slice) | |
@channels[id]?.try &.send String.new(slice) | |
end | |
@sock.close | |
end | |
end | |
def close | |
@open = false | |
end | |
protected def next_id | |
id = @next_id | |
@next_id += 1 | |
id | |
end | |
class Response | |
JSON.mapping({ | |
t: ResponseType, | |
r: Array(QueryResult), | |
e: {type: ErrorType, nilable: true}, | |
b: {type: Array(JSON::Any), nilable: true}, | |
p: {type: JSON::Any, nilable: true}, | |
n: {type: Array(Int32), nilable: true} | |
}) | |
end | |
class ResponseStream | |
getter id : UInt64 | |
@channel : Channel::Unbuffered(String) | |
@runopts : Hash(String, JSON::Type) | |
def initialize(@conn : Connection, runopts) | |
@id = @conn.next_id | |
@channel = @conn.@channels[id] = Channel(String).new | |
@runopts = {} of String => JSON::Type | |
runopts.each do |key, val| | |
@runopts[key] = val | |
end | |
@runopts["db"] = RethinkDB.db(@conn.@db).to_reql | |
end | |
def query_term(term) | |
send_query [QueryType::START, term.to_reql, @runopts].to_json | |
read_response | |
end | |
def query_continue | |
send_query [QueryType::CONTINUE].to_json | |
read_response | |
end | |
private def send_query(query) | |
if @id == 0 | |
raise ReqlDriverError.new("Bug: Using already finished stream.") | |
end | |
@conn.@sock.write_bytes(@id, IO::ByteFormat::LittleEndian) | |
@conn.@sock.write_bytes(query.bytesize, IO::ByteFormat::LittleEndian) | |
@conn.@sock.write(query.to_slice) | |
end | |
private def read_response | |
response = Response.from_json(@channel.receive) | |
finish unless response.t == ResponseType::SUCCESS_PARTIAL | |
if response.t == ResponseType::CLIENT_ERROR | |
raise ReqlClientError.new(response.r[0].to_s) | |
elsif response.t == ResponseType::COMPILE_ERROR | |
raise ReqlCompileError.new(response.r[0].to_s) | |
elsif response.t == ResponseType::RUNTIME_ERROR | |
msg = response.r[0].to_s | |
case response.e | |
when ErrorType::QUERY_LOGIC | |
raise ReqlQueryLogicError.new(msg) | |
when ErrorType::USER | |
raise ReqlUserError.new(msg) | |
when ErrorType::NON_EXISTENCE | |
raise ReqlNonExistenceError.new(msg) | |
else | |
raise ReqlRunTimeError.new(response.e.to_s + ": " + msg) | |
end | |
end | |
response.r = response.r.map &.transformed( | |
time_format: @runopts["time_format"]? || "native", | |
group_format: @runopts["group_format"]? || "native", | |
binary_format: @runopts["binary_format"]? || "native" | |
) | |
response | |
end | |
private def finish | |
@conn.@channels.delete @id | |
@id = 0u64 | |
end | |
end | |
def query_error(term, runopts) | |
stream = ResponseStream.new(self, runopts) | |
response = stream.query_term(term) | |
raise ReqlDriverError.new("An r.error should never return successfully") | |
end | |
def query_datum(term, runopts) | |
stream = ResponseStream.new(self, runopts) | |
response = stream.query_term(term) | |
unless response.t == ResponseType::SUCCESS_ATOM | |
raise ReqlDriverError.new("Expected SUCCESS_ATOM but got #{response.t}") | |
end | |
response.r[0] | |
end | |
def query_cursor(term, runopts) | |
stream = ResponseStream.new(self, runopts) | |
response = stream.query_term(term) | |
unless response.t == ResponseType::SUCCESS_SEQUENCE || response.t == ResponseType::SUCCESS_PARTIAL | |
raise ReqlDriverError.new("Expected SUCCESS_SEQUENCE or SUCCESS_PARTIAL but got #{response.t}") | |
end | |
Cursor.new(stream, response) | |
end | |
end | |
class Cursor | |
include Iterator(QueryResult) | |
def initialize(@stream : Connection::ResponseStream, @response : Connection::Response) | |
@index = 0 | |
end | |
def fetch_next | |
@response = @stream.query_continue | |
@index = 0 | |
unless @response.t == ResponseType::SUCCESS_SEQUENCE || @response.t == ResponseType::SUCCESS_PARTIAL | |
raise ReqlDriverError.new("Expected SUCCESS_SEQUENCE or SUCCESS_PARTIAL but got #{@response.t}") | |
end | |
end | |
def next | |
while @index == @response.r.size | |
return stop if @response.t == ResponseType::SUCCESS_SEQUENCE | |
fetch_next | |
end | |
value = @response.r[@index] | |
@index += 1 | |
return value | |
end | |
end | |
end | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment