TigerBeetle C sample
@batiati · Created December 23, 2022

sample.c:

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <pthread.h>
#include <sys/time.h>
#include "tb_client.h"
/////////////////////////////////////////////////////////////////////////////////////
// Place this file in the same folder as tb_client.h.
// Shows the basic usage of the TigerBeetle C client.
// This example is implemented as a single-threaded program.
// It isn't optimized for the real use-case scenario, where a single client
// instance must handle multiple concurrent requests.
/////////////////////////////////////////////////////////////////////////////////////
// config.message_size_max - @sizeOf(vsr.Header):
const int MAX_MESSAGE_SIZE = (1024 * 1024) - 128;
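// With the 128-byte tb_transfer_t of this era, that allows
// 1048448 / 128 = 8191 transfers per batch (see the output below).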
// Synchronization context between the callback and the main thread.
typedef struct completion_context {
    // Pointer to the reply's body.
    // In real usage, this memory should be copied to the application's ownership
    // before the packet is returned to be reused in the next request.
    void* reply;
    int size;

    // In this example we synchronize using a mutex + condvar.
    pthread_mutex_t lock;
    pthread_cond_t cv;
} completion_context_t;
// Completion function, called by tb_client to notify that a request has completed.
void on_completion(uintptr_t context, tb_client_t client, tb_packet_t* packet, const uint8_t* data, uint32_t size) {
    // The user_data gives context to a request:
    completion_context_t* ctx = (completion_context_t*)packet->user_data;

    // Signal the main thread that we received the reply:
    pthread_mutex_lock(&ctx->lock);
    ctx->size = size;
    ctx->reply = (void*)data;
    pthread_cond_signal(&ctx->cv);
    pthread_mutex_unlock(&ctx->lock);
}
// For benchmarking purposes.
long long get_time_ms(void) {
    struct timeval tv;
    gettimeofday(&tv, NULL);
    return (((long long)tv.tv_sec) * 1000) + (tv.tv_usec / 1000);
}
int main(int argc, char** argv) {
    printf("TigerBeetle C Sample\n");
    printf("Connecting...\n");

    tb_client_t client;
    tb_packet_list_t packets;
    const char* address = "127.0.0.1:3000";

    TB_STATUS status = tb_client_init(
        &client,         // Output client.
        &packets,        // Output packet list.
        0,               // Cluster ID.
        address,         // Cluster addresses.
        strlen(address), //
        1,               // MaxConcurrency == 1: this is a single-threaded example.
        NULL,            // No need for a global context.
        &on_completion   // Completion callback.
    );

    if (status != TB_STATUS_SUCCESS) {
        printf("Failed to initialize tb_client\n");
        exit(-1);
    }

    // Initializing the mutex and condvar:
    completion_context_t ctx;
    if (pthread_mutex_init(&ctx.lock, NULL) != 0) {
        printf("Failed to initialize mutex\n");
        exit(-1);
    }
    if (pthread_cond_init(&ctx.cv, NULL) != 0) {
        printf("Failed to initialize condition\n");
        exit(-1);
    }
    ////////////////////////////////////////////////////////////
    // Submitting a batch of accounts:
    ////////////////////////////////////////////////////////////

    const int accounts_len = 2;
    const int accounts_size = sizeof(tb_account_t) * accounts_len;
    tb_account_t accounts[accounts_len];

    // Zeroing the memory, so we don't have to initialize every field.
    memset(accounts, 0, accounts_size);

    accounts[0].id = 1;
    accounts[0].code = 2;
    accounts[0].ledger = 777;

    accounts[1].id = 2;
    accounts[1].code = 2;
    accounts[1].ledger = 777;

    // This sample is single-threaded.
    // In real use, this linked list will contain
    // multiple packets, for handling multiple concurrent requests:
    packets.head->operation = TB_OPERATION_CREATE_ACCOUNTS; // The operation to be performed.
    packets.head->data = accounts;                          // The data to be sent.
    packets.head->data_size = accounts_size;                //
    packets.head->user_data = &ctx;                         // User-defined context.
    packets.head->status = TB_PACKET_OK;                    // Will be set when the reply arrives.

    // Lock the mutex:
    pthread_mutex_lock(&ctx.lock);

    // Submit the request asynchronously:
    printf("Creating accounts...\n");
    tb_client_submit(client, &packets);

    // Use a condvar to sync this thread with the callback:
    pthread_cond_wait(&ctx.cv, &ctx.lock);
    pthread_mutex_unlock(&ctx.lock);

    // Check if the request failed:
    if (packets.head->status != TB_PACKET_OK) {
        printf("Error calling create_accounts (ret=%d)\n", packets.head->status);
        exit(-1);
    }

    // Check for errors creating the accounts:
    if (ctx.size != 0) {
        tb_create_accounts_result_t* results = (tb_create_accounts_result_t*)ctx.reply;
        int results_len = ctx.size / sizeof(tb_create_accounts_result_t);
        printf("create_accounts results:\n");
        for (int i = 0; i < results_len; i++) {
            printf("index=%u, ret=%d\n", results[i].index, results[i].result);
        }
        exit(-1);
    }
    printf("Accounts created successfully\n");
    ////////////////////////////////////////////////////////////
    // Submitting multiple batches of transfers:
    ////////////////////////////////////////////////////////////

    printf("Creating transfers...\n");

    const int max_batches = 100;
    const int transfers_per_batch = MAX_MESSAGE_SIZE / sizeof(tb_transfer_t);
    long max_latency_ms = 0;
    long total_time_ms = 0;

    for (int i = 0; i < max_batches; i++) {
        tb_transfer_t transfers[transfers_per_batch];

        // Zeroing the memory, so we don't have to initialize every field.
        memset(transfers, 0, sizeof(transfers));

        for (int j = 0; j < transfers_per_batch; j++) {
            transfers[j].id = j + 1 + (i * transfers_per_batch);
            transfers[j].debit_account_id = accounts[0].id;
            transfers[j].credit_account_id = accounts[1].id;
            transfers[j].code = 2;
            transfers[j].ledger = 777;
            transfers[j].amount = 1;
        }

        // This sample is single-threaded.
        // In real use, this linked list will contain
        // multiple packets, for handling multiple concurrent requests:
        packets.head->operation = TB_OPERATION_CREATE_TRANSFERS; // The operation to be performed.
        packets.head->data = transfers;                          // The data to be sent.
        packets.head->data_size = sizeof(transfers);             //
        packets.head->user_data = &ctx;                          // User-defined context.
        packets.head->status = TB_PACKET_OK;                     // Will be set when the reply arrives.

        long long now = get_time_ms();

        // Lock the mutex:
        pthread_mutex_lock(&ctx.lock);

        // Submit the request asynchronously:
        tb_client_submit(client, &packets);

        // Use a condvar to sync this thread with the callback:
        pthread_cond_wait(&ctx.cv, &ctx.lock);
        pthread_mutex_unlock(&ctx.lock);

        long elapsed_ms = get_time_ms() - now;
        if (elapsed_ms > max_latency_ms) max_latency_ms = elapsed_ms;
        total_time_ms += elapsed_ms;

        // Check if the request failed:
        if (packets.head->status != TB_PACKET_OK) {
            printf("Error calling create_transfers (ret=%d)\n", packets.head->status);
            exit(-1);
        }

        // Check for errors creating the transfers:
        if (ctx.size != 0) {
            tb_create_transfers_result_t* results = (tb_create_transfers_result_t*)ctx.reply;
            int results_len = ctx.size / sizeof(tb_create_transfers_result_t);
            printf("create_transfers results:\n");
            for (int r = 0; r < results_len; r++) {
                printf("index=%u, ret=%d\n", results[r].index, results[r].result);
            }
            exit(-1);
        }
    }

    printf("Transfers created successfully\n");
    printf("============================================\n");
    printf("%ld transfers per second\n", (max_batches * transfers_per_batch * 1000L) / total_time_ms);
    printf("create_transfers max p100 latency per %d transfers = %ldms\n", transfers_per_batch, max_latency_ms);
    printf("total %d transfers in %ldms\n", max_batches * transfers_per_batch, total_time_ms);
    printf("\n");
    ////////////////////////////////////////////////////////////
    // Looking up accounts:
    ////////////////////////////////////////////////////////////

    tb_uint128_t ids[2] = { accounts[0].id, accounts[1].id };
    packets.head->operation = TB_OPERATION_LOOKUP_ACCOUNTS;
    packets.head->data = ids;
    packets.head->data_size = sizeof(tb_uint128_t) * 2;
    packets.head->user_data = &ctx;
    packets.head->status = TB_PACKET_OK;

    pthread_mutex_lock(&ctx.lock);
    tb_client_submit(client, &packets);
    pthread_cond_wait(&ctx.cv, &ctx.lock);
    pthread_mutex_unlock(&ctx.lock);

    // Check if the request failed:
    if (packets.head->status != TB_PACKET_OK) {
        printf("Error calling lookup_accounts (ret=%d)\n", packets.head->status);
        exit(-1);
    }

    if (ctx.size == 0) {
        printf("No accounts found\n");
        exit(-1);
    } else {
        // Print the accounts' balances.
        // Note: tb_uint128_t is a 128-bit value; for simplicity we print only
        // the low 64 bits, which is enough for the small ids used here.
        tb_account_t* results = (tb_account_t*)ctx.reply;
        int results_len = ctx.size / sizeof(tb_account_t);
        printf("Accounts:\n");
        printf("============================================\n");
        for (int i = 0; i < results_len; i++) {
            printf("id=%llu\n", (unsigned long long)results[i].id);
            printf("debits_posted=%llu\n", (unsigned long long)results[i].debits_posted);
            printf("credits_posted=%llu\n", (unsigned long long)results[i].credits_posted);
            printf("\n");
        }
    }

    // Cleanup:
    pthread_cond_destroy(&ctx.cv);
    pthread_mutex_destroy(&ctx.lock);
    tb_client_deinit(client);

    return 0;
}
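
The sample above keeps a single packet in flight. As its comments note, a real application would initialize the client with a larger MaxConcurrency and keep several packets outstanding at once. Below is a rough sketch of how that could look, reusing the includes from sample.c. It assumes that tb_packet_t chains through its next field (as the head/tail layout of tb_packet_list_t suggests) and that tb_client_submit submits every packet in the list; the batch_context_t helper and function names are hypothetical, not part of tb_client.h:

    // Hypothetical context that counts outstanding packets, so the main
    // thread can wait for more than one reply at a time.
    typedef struct batch_context {
        int pending;
        pthread_mutex_t lock;
        pthread_cond_t cv;
    } batch_context_t;

    // Completion callback for this scheme: decrement the counter and signal.
    // Note that the callback may run on tb_client's internal thread.
    void on_batch_completion(uintptr_t context, tb_client_t client,
                             tb_packet_t* packet, const uint8_t* data, uint32_t size) {
        batch_context_t* ctx = (batch_context_t*)packet->user_data;
        pthread_mutex_lock(&ctx->lock);
        // A real application would copy `data` somewhere it owns, per packet,
        // before the packet is reused.
        ctx->pending -= 1;
        pthread_cond_signal(&ctx->cv);
        pthread_mutex_unlock(&ctx->lock);
    }

    // Submits two batches of transfers concurrently. Assumes `packets` came
    // from tb_client_init() with MaxConcurrency == 2, so the list already
    // chains two packets from head to tail.
    void submit_two_batches(tb_client_t client, tb_packet_list_t* packets,
                            batch_context_t* ctx,
                            tb_transfer_t* batch_a, uint32_t size_a,
                            tb_transfer_t* batch_b, uint32_t size_b) {
        tb_packet_t* first = packets->head;
        tb_packet_t* second = first->next;

        first->operation = TB_OPERATION_CREATE_TRANSFERS;
        first->data = batch_a;
        first->data_size = size_a;
        first->user_data = ctx;
        first->status = TB_PACKET_OK;

        second->operation = TB_OPERATION_CREATE_TRANSFERS;
        second->data = batch_b;
        second->data_size = size_b;
        second->user_data = ctx;
        second->status = TB_PACKET_OK;

        pthread_mutex_lock(&ctx->lock);
        ctx->pending = 2;

        // One submit call for the whole list; on_batch_completion fires once
        // per packet as each reply arrives.
        tb_client_submit(client, packets);

        while (ctx->pending > 0) {
            pthread_cond_wait(&ctx->cv, &ctx->lock);
        }
        pthread_mutex_unlock(&ctx->lock);
    }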

How to Run

  1. Clone the tigerbeetle repo;

  2. Copy sample.c to the src/clients/c folder;

  3. Add the build steps to the build.zig file:

    // Add this to build.zig:
    {
        const static_lib = b.addStaticLibrary("tb_client", "src/clients/c/tb_client.zig");
        static_lib.setMainPkgPath("src");    
        static_lib.linkage = .static;
        static_lib.linkLibC();
        static_lib.setBuildMode(mode);
        static_lib.setTarget(target);
        static_lib.pie = true;
        static_lib.bundle_compiler_rt = true;
        static_lib.addOptions("tigerbeetle_build_options", options);
        link_tracer_backend(static_lib, tracer_backend, target);

        const sample = b.addExecutable("c_sample", "src/clients/c/sample.c");
        sample.setBuildMode(mode);
        sample.linkLibrary(static_lib);
        sample.linkLibC();

        const run_cmd = sample.run();
        run_cmd.step.dependOn(b.getInstallStep());

        const c_sample_build = b.step("c_sample", "Run the C client sample");
        c_sample_build.dependOn(&run_cmd.step);
    }
  4. Start a TigerBeetle instance at 127.0.0.1:3000, for example with the commands below (these flags follow the tigerbeetle CLI of this period and may differ in your build; check ./tigerbeetle --help):
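
    ./tigerbeetle format --cluster=0 --replica=0 0_0.tigerbeetle
    ./tigerbeetle start --addresses=127.0.0.1:3000 0_0.tigerbeetle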

  5. Run the C sample program (the step name matches the `c_sample` step defined in build.zig above):

    ./zig/zig build c_sample -Drelease-safe

  6. Output:
TigerBeetle C Sample
Connecting...
Creating accounts...
Accounts created successfully
Creating transfers...
Transfers created successfully
============================================
190488 transfers per second
create_transfers max p100 latency per 8191 transfers = 105ms
total 819100 transfers in 4300ms

Accounts:
============================================
id=1
debits_posted=819100
credits_posted=0

id=2
debits_posted=0
credits_posted=819100