HStreamDB clients and HStreamDB servers communicate over gRPC, exchanging messages encoded with Protocol Buffers.
The format of the request and response messages is specified in the hstream.proto file.
To implement an HStreamDB client library, developers should have basic knowledge of how to implement a gRPC client.
The gRPC documentation has sections covering the gRPC basics.
Client libraries should support the URL schemes hstream:// and hstreams:// (HStream with SSL/TLS).
When a URL does not specify a port, the client is expected to use the default HStream server port 6570.
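The URL handling described above can be sketched as follows. This is a minimal illustration using only the Python standard library; the function name parse_server_url and the returned tuple shape are assumptions made for this example, not part of any HStreamDB client API.

```python
from urllib.parse import urlsplit

DEFAULT_PORT = 6570  # default HStream server port, as stated in the text


def parse_server_url(url):
    """Return (host, port, use_tls) for an hstream:// or hstreams:// URL.

    Hypothetical helper for illustration only.
    """
    parts = urlsplit(url)
    if parts.scheme not in ("hstream", "hstreams"):
        raise ValueError(f"unsupported URL scheme: {parts.scheme!r}")
    if not parts.hostname:
        raise ValueError(f"missing host in URL: {url!r}")
    # Fall back to the default port when the URL does not carry one.
    port = parts.port if parts.port is not None else DEFAULT_PORT
    # hstreams:// means the connection should use SSL/TLS.
    return parts.hostname, port, parts.scheme == "hstreams"
```

A client would then open a plain or TLS-secured gRPC channel to the returned host and port, depending on the use_tls value.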
To send data to HStreamDB:
- the client needs to handle basic stream management, since streams are the unit to which data is sent in HStreamDB
- the client needs to provide producers, which usually buffer incoming records
The hstream.proto file defines the basic stream operations that client libraries should expose: rpc CreateStream, rpc DeleteStream, and rpc ListStreams.
The API exposed to the user should take three arguments: the stream to write to, the data record, and an optional partition key.
A producer is an abstraction for appending to streams. An append request always packs many records into a batch and sends them at once, so an object (in programming-language terms) with a buffer for incoming records is used for this task.
It is recommended that a producer be bound to a single stream at initialization, so that it always writes to the same stream during its lifetime.
All data in streams are in the form of an HStream Record. There are two kinds of HStream Records:
- HRecord: one can think of an HRecord as a piece of JSON data, like a document in some NoSQL databases
- Raw Record: arbitrary binary data
In Protocol Buffers, they are serialized as google.protobuf.Struct and bytes, respectively.
Partition keys are optional. If none is given, the server automatically assigns a default key. Buffered producers should guarantee that records with the same partition key are written in order.
The buffer of a buffered producer is of the type map<partition_key, list<list<record>>>.
That is, a map of lists of lists of records, indexed by partition key.
The action of sending records to the HStreamDB server with rpc Append and clearing those records from the buffer is called a flush.
Each inner list of records is the unit sent as one batch by rpc Append. If the head buffer of the list is full, the producer pushes subsequent records onto the tail of the next buffer.
When the head element of the list of lists is full, the producer should flush the head buffer of the list asynchronously.
Buffers of different partition keys can be flushed concurrently, but for the same partition key, the producer should wait for the previous request to complete before sending the next rpc Append request, so that records with the same partition key are guaranteed to be written in order.
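The buffering and flush ordering described above can be sketched as follows. This is a simplified model under stated assumptions, not a reference implementation: the class name BufferedProducer, the batch_size parameter, and the send_batch callable (which stands in for the real rpc Append call) are all hypothetical, and flush here drains every batch for a key rather than only a full head buffer.

```python
import asyncio
from collections import defaultdict, deque


class BufferedProducer:
    """Sketch of a producer buffer of type map<partition_key, list<list<record>>>."""

    def __init__(self, send_batch, batch_size=100):
        # send_batch is a hypothetical async callable (key, records) that
        # stands in for sending one rpc Append request.
        self._send_batch = send_batch
        self._batch_size = batch_size
        self._buffer = defaultdict(deque)         # key -> deque of batches
        self._locks = defaultdict(asyncio.Lock)   # one lock per partition key

    def write(self, record, key=""):
        batches = self._buffer[key]
        # Start a new inner list when there is none yet or the last one is full.
        if not batches or len(batches[-1]) >= self._batch_size:
            batches.append([])
        batches[-1].append(record)

    async def flush_key(self, key):
        # The per-key lock ensures a batch is sent only after the previous
        # batch with the same partition key has completed.
        async with self._locks[key]:
            batches = self._buffer.get(key)
            while batches:
                await self._send_batch(key, batches.popleft())

    async def flush(self):
        # Different partition keys are flushed concurrently.
        await asyncio.gather(*(self.flush_key(k) for k in list(self._buffer)))
```

Holding one lock per partition key is one way to serialize appends for a key while leaving other keys free to proceed; a per-key task queue would be an equally reasonable design.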
The user-facing record written to a producer consists only of a payload in the form of an HStream Record, but the record sent to the HStreamDB server also carries some metadata.
In the hstream.proto file, a message HStreamRecord consists of a payload (of the type bytes) and a header (of the type HStreamRecordHeader) for metadata.
The HRecord (serialized as google.protobuf.Struct) and the Raw Record (serialized as bytes) are unified into the same bytes representation at this stage by the encoding algorithm that Protocol Buffers provides for various languages.
In the metadata HStreamRecordHeader, the field flag is used to restore the right representation. The field key stores the partition key of the HStreamRecord.
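The role of the flag and key header fields can be illustrated with a small model. This sketch makes several assumptions: the Python classes only mirror the message names from the text, the Flag member names and values are illustrative, and json.dumps is used as a stand-in for the Protocol Buffers encoding of google.protobuf.Struct so that the example runs with the standard library alone. A real client would use the classes generated from hstream.proto.

```python
import json
from dataclasses import dataclass
from enum import Enum


class Flag(Enum):
    # Illustrative names and values; check hstream.proto for the real enum.
    JSON = 0
    RAW = 1


@dataclass
class HStreamRecordHeader:
    flag: Flag  # tells the reader how to restore the payload
    key: str    # partition key of the record


@dataclass
class HStreamRecord:
    header: HStreamRecordHeader
    payload: bytes


def to_hstream_record(payload, key=""):
    """Unify an HRecord (dict) or a Raw Record (bytes) into one bytes payload."""
    if isinstance(payload, (bytes, bytearray)):
        return HStreamRecord(HStreamRecordHeader(Flag.RAW, key), bytes(payload))
    if isinstance(payload, dict):
        # Stand-in for serializing a google.protobuf.Struct to bytes.
        encoded = json.dumps(payload).encode("utf-8")
        return HStreamRecord(HStreamRecordHeader(Flag.JSON, key), encoded)
    raise TypeError("payload must be a dict (HRecord) or bytes (Raw Record)")


def from_hstream_record(record):
    """Use the header flag to restore the original representation."""
    if record.header.flag is Flag.RAW:
        return record.payload
    return json.loads(record.payload.decode("utf-8"))
```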
The next step is to encode the HStreamRecords as a batch. An auxiliary type, BatchHStreamRecords, is used for the serialization of the HStreamRecords.
A list of HStreamRecords (the unit of a flush) is first transformed into BatchHStreamRecords and then serialized into bytes using the algorithm provided by Protocol Buffers. If the CompressionType of the producer is set to anything other than CompressionType::None, the producer should compress the serialized bytes according to the compression type. The processed payload is set as the payload field of the message BatchedRecord. The message BatchedRecord also contains a field named publishTime, which is NOT set by the client side. The time at which the message was published is populated by the server.
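The compression step can be sketched as follows. The sketch assumes the batch has already been serialized to bytes by Protocol Buffers, and it covers only a no-compression case and gzip, because gzip is available in the Python standard library. The Python enum, the BatchedRecord dataclass, and the function name are hypothetical stand-ins for the types generated from hstream.proto.

```python
import gzip
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class CompressionType(Enum):
    # Hypothetical subset of the compression types a producer may support.
    NONE = "none"
    GZIP = "gzip"


@dataclass
class BatchedRecord:
    compression_type: CompressionType
    payload: bytes
    # Left unset on the client side; the server populates the publish time.
    publish_time: Optional[int] = None


def make_batched_record(serialized_batch, compression):
    """Compress the serialized batch according to the compression type."""
    if compression is CompressionType.NONE:
        payload = serialized_batch
    elif compression is CompressionType.GZIP:
        payload = gzip.compress(serialized_batch)
    else:
        raise ValueError(f"unsupported compression type: {compression}")
    return BatchedRecord(compression_type=compression, payload=payload)
```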
Apart from the stream name and the BatchedRecord, there is one more field in the message AppendRequest: the shardId.
An algorithm for calculating the shard id can be described as follows:
- call rpc ListShards to get all available shards
- apply the MD5 algorithm to the value of the partition key
- turn the result into a value of BigInt by parsing the decimal string
- for each shard in the result of rpc ListShards, get the BigInt values by parsing the decimal strings startHashRangeKey and endHashRangeKey; if the predicate start <= x && x <= end is satisfied, return the shard id. (See https://github.com/hstreamdb/hstreamdb-rust/commit/610e060557a715e16e790adaac19adecc63f1f78)
- if the predicate is not satisfied after the traversal, return an error
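The steps above can be sketched as follows. The Shard dataclass stands in for the entries returned by rpc ListShards, with field names adapted from startHashRangeKey and endHashRangeKey. Interpreting the 16-byte MD5 digest as a big-endian unsigned integer is an assumption of this sketch; the linked commit should be consulted for the exact conversion a compatible client has to use.

```python
import hashlib
from dataclasses import dataclass


@dataclass
class Shard:
    # Hypothetical stand-in for one entry in the rpc ListShards result.
    shard_id: int
    start_hash_range_key: str  # decimal string
    end_hash_range_key: str    # decimal string


def key_hash(partition_key):
    """MD5 of the partition key as an arbitrary-precision integer."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    # Assumption: the digest is read as a big-endian unsigned integer.
    return int.from_bytes(digest, "big")


def find_shard_id(partition_key, shards):
    """Return the id of the shard whose hash range contains the key hash."""
    x = key_hash(partition_key)
    for shard in shards:
        start = int(shard.start_hash_range_key)
        end = int(shard.end_hash_range_key)
        if start <= x <= end:
            return shard.shard_id
    # No shard range matched after traversing all shards.
    raise LookupError(f"no shard found for partition key {partition_key!r}")
```

Python integers are arbitrary precision, so no separate BigInt type is needed here; in other languages a big-integer library is required because the hash is 128 bits wide.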
A shard lookup must be performed each time a client needs to append to a shard. The append request should then be sent to the server URL returned by the lookup.
The lookup is done with rpc LookupShard, as described in the hstream.proto file.
Once the lookup has returned, the client can send the rpc Append request.
The hstream.proto file defines the basic subscription operations that client libraries should expose: rpc CreateSubscription, rpc DeleteSubscription, and rpc ListSubscriptions.
The rpc StreamingFetch is a bidirectional streaming RPC used for fetching records from subscriptions.
A subscription lookup must be performed each time a client needs to start a streaming fetch from a subscription or delete a subscription. Those requests should then be sent to the server URL returned by the lookup.
The lookup is done with rpc LookupSubscription, as described in the hstream.proto file.
To establish a fetching stream, the client should send an initialization request to the HStreamDB server. The initialization request consists of the subscription id, the new consumer name (the identifier of the fetching stream), and an empty list of ack ids. The stream opened by this request is then used for the subsequent fetching and acks.
The client library should provide an ack() method for each received record to acknowledge that the record has been successfully processed. The method is implemented by sending, on the request stream, a request with the same subscription id and consumer name as the initialization request, with the ids of the successfully processed records in the field ackIds.
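The request side of the fetching stream can be sketched as follows. The StreamingFetchRequest dataclass is a stand-in for the generated message type, the FetchSession class is hypothetical, and a queue.Queue models the gRPC request stream; a real client would feed the generated request objects into the bidirectional stream instead.

```python
import queue
from dataclasses import dataclass, field


@dataclass
class StreamingFetchRequest:
    # Stand-in for the request message of rpc StreamingFetch.
    subscription_id: str
    consumer_name: str
    ack_ids: list = field(default_factory=list)


class FetchSession:
    """Hypothetical holder for the request stream of one fetching stream."""

    def __init__(self, subscription_id, consumer_name):
        self.subscription_id = subscription_id
        self.consumer_name = consumer_name
        self.requests = queue.Queue()  # models the gRPC request stream
        # Initialization request: subscription id, consumer name, no ack ids.
        self.requests.put(StreamingFetchRequest(subscription_id, consumer_name))

    def ack(self, record_id):
        # An ack reuses the same subscription id and consumer name and
        # carries the id of the successfully processed record.
        self.requests.put(
            StreamingFetchRequest(
                self.subscription_id, self.consumer_name, [record_id]
            )
        )
```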
The response of a streaming fetch is a list of record ids and a batch of received records. Decoding the received records mirrors the encoding described in the sections above:
- match the compression type, choose the corresponding decompression algorithm, and decompress the payload
- use the method provided by Protocol Buffers to decode the bytes into BatchHStreamRecords
- zip the received record ids with the decoded records, and provide the ack() method for each record to the user
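The decoding steps above can be sketched as follows. Every name here is hypothetical: parse_batch stands in for the Protocol Buffers decoding of BatchHStreamRecords, send_ack stands in for writing an ack request to the request stream, and the compression type is modeled as a plain string with only the "none" and "gzip" cases handled.

```python
import gzip
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class ReceivedRecord:
    record_id: Any
    record: Any
    send_ack: Callable[[Any], None]

    def ack(self):
        # Acknowledge this record by handing its id to the ack sender.
        self.send_ack(self.record_id)


def decode_response(record_ids, payload, compression, parse_batch, send_ack):
    """Decompress, decode, and pair each record with its id."""
    # Step 1: pick the decompression algorithm from the compression type.
    if compression == "gzip":
        payload = gzip.decompress(payload)
    elif compression != "none":
        raise ValueError(f"unsupported compression type: {compression!r}")
    # Step 2: decode the bytes into a list of records (stand-in callable).
    records = parse_batch(payload)
    if len(records) != len(record_ids):
        raise ValueError("number of record ids does not match decoded records")
    # Step 3: zip ids with records and expose ack() on each result.
    return [ReceivedRecord(i, r, send_ack) for i, r in zip(record_ids, records)]
```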