Ismael Celis ismasan
@ismasan
ismasan / params_validator_step.rb
Last active March 24, 2024 22:08
Input validator step using Parametric
# https://github.com/ismasan/parametric
require 'parametric'
class InputValidator
  def initialize(&block)
    @schema = Parametric::Schema.new(&block)
  end
  # @param result [Result]
ismasan / concurrent_processing.rb
Last active March 24, 2024 22:11
Practical Railway-oriented Pipeline for Ruby
# A Pipeline extension to process steps concurrently
# Example
#   class ConcurrentPipeline < Pipeline
#     include ConcurrentProcessing
#   end
#
#   MyPipeline = ConcurrentPipeline.new do |pl|
#     pl.step ValidateInput
#
#     # These steps run concurrently
ismasan / json_to_csv.rb
Created February 9, 2024 11:09
Script to convert JSON array to CSV rows
require 'json'
require 'csv'
# ruby json_to_csv.rb /path/to/data.json > /path/to/data.csv
#
MAP_VALUE = ->(value) {
  case value
  when Array
    value.join('|')
  when Hash
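The preview above is cut off, so the following is only a minimal sketch of the same idea, not the gist's actual code. It assumes the input is a JSON array of flat objects and that the CSV header row is the union of all keys in first-seen order. Arrays are pipe-joined as in the preview; serializing nested hashes back to JSON is an assumption, since the original `when Hash` branch is not visible. The helper names `csv_cell` and `json_array_to_csv` are hypothetical.

```ruby
require 'json'
require 'csv'

# Hypothetical helper: turn one JSON value into a CSV-friendly cell.
# Arrays are pipe-joined as in the preview; dumping Hashes as JSON is an assumption.
def csv_cell(value)
  case value
  when Array then value.join('|')
  when Hash then JSON.generate(value)
  else value
  end
end

# Hypothetical helper: convert the text of a JSON array of objects into CSV text.
# The header row is the union of all keys, in first-seen order.
def json_array_to_csv(json_text)
  rows = JSON.parse(json_text)
  headers = rows.flat_map(&:keys).uniq
  CSV.generate do |csv|
    csv << headers
    rows.each do |row|
      # Keys missing from a row become empty cells.
      csv << headers.map { |key| csv_cell(row[key]) }
    end
  end
end
```

Called as `puts json_array_to_csv(File.read(ARGV[0]))`, this would fit the command-line usage shown in the preview's comment.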
ismasan / hash64.rb
Last active October 16, 2023 20:20
hash64.rb
require 'digest'
module Hash64
  # Turn a string into a well-distributed 64-bit integer.
  # This matches the hash_64() PGSQL function used to partition streams.
  #
  # @param value [String] the string to hash
  # @return [Integer] a 64-bit integer
  def self.call(value)
    binary_string = ('0' + Digest::MD5.hexdigest(value)[0...16]).hex.to_s(2)
    # Convert the binary string to a signed BigInt
    bigint_value = binary_string.to_i(2)
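Since the preview stops before the signed conversion, here is a hedged sketch of one way to finish the idea: take the first 64 bits of the MD5 digest and reinterpret them as a two's-complement signed integer. The module name `Hash64Sketch` is hypothetical, and whether this output agrees with the `hash_64()` PostgreSQL function mentioned in the comment has not been verified.

```ruby
require 'digest'

# Hypothetical module, named differently to avoid implying it is the gist's Hash64.
module Hash64Sketch
  SIGN_LIMIT = 2**63 # first value that no longer fits in a signed 64-bit integer
  MODULUS = 2**64

  # @param value [String] the string to hash
  # @return [Integer] an integer in the signed 64-bit range
  def self.call(value)
    # The first 16 hex characters of the MD5 digest carry 64 bits.
    unsigned = Digest::MD5.hexdigest(value)[0...16].to_i(16)
    # Assumption: reinterpret the unsigned value as two's-complement signed.
    unsigned >= SIGN_LIMIT ? unsigned - MODULUS : unsigned
  end
end
```

The result is deterministic for a given string, so it can be used to assign a key to a partition, for example with `Hash64Sketch.call(key) % partition_count`.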
ismasan / plpgsql_fn_rebalance_consumers.sql
Created September 15, 2023 10:22
A PG function to rebalance consumers in a group
CREATE OR REPLACE FUNCTION public.rebalance_consumers_all_groups()
  RETURNS void
  LANGUAGE plpgsql
AS $function$
DECLARE
  group_ids uuid[];
  min_last_seq bigint;
  consumers_deleted bigint;
  group_min_seq bigint;
  p_group_id uuid;
require 'digest'
class ConsistentHash
  Node = Data.define(:position, :updated_at)
  def initialize(size = 1)
    @hash_ring = {}
    (0...size).each { |i| add(i) }
  end
ismasan / worker_host.rb
Created August 7, 2023 14:15
Host and run concurrent workers as ruby Fibers, using Async library
require 'pg'
require 'connection_pool'
require 'async'
DB = ConnectionPool.new(size: 5, timeout: 5) do
  PG.connect(dbname: 'test_db')
end
class DBClient
  def initialize(db)
# Turn a line-based Enumerator into an Enumerator
# that parses each CSV line and yields it as a Hash row.
# Usage:
#   csv = CSVStream.new(line_stream, headers: { product_name: 0, price: 2 })
#
# With a bit more work it could detect CSV headers automatically, from the first line.
require 'csv'
class CSVStream
  include Enumerable
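The class body is not visible in the preview, so the following is a minimal sketch of how such a wrapper could work, under the assumptions stated in the comment above: the source yields one CSV line at a time, and `headers:` maps field names to column indexes. The class name `CSVStreamSketch` is hypothetical and this is not the gist's implementation.

```ruby
require 'csv'

# Hypothetical stand-in for the truncated CSVStream class.
class CSVStreamSketch
  include Enumerable

  # lines:   any object responding to #each that yields one CSV line at a time
  # headers: Hash of { field_name => column_index }
  def initialize(lines, headers:)
    @lines = lines
    @headers = headers
  end

  # Lazily parse each line and yield it as a Hash keyed by the configured field names.
  def each
    return enum_for(:each) unless block_given?

    @lines.each do |line|
      cols = CSV.parse_line(line)
      next if cols.nil? || cols.empty? # skip blank lines

      yield @headers.transform_values { |index| cols[index] }
    end
  end
end
```

Because it includes `Enumerable`, the sketch composes with `map`, `select`, `lazy` and similar methods. Cell values stay as strings, so a field such as `price` would still need converting by the caller.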
puts "START"
chan = Channel(Nil).new
spawn do
  sleep 6
  chan.send nil
end
select
when chan.receive
  puts "CHAN"
# simple railway-style pipelines for Crystal
class Success(T)
  def initialize(@value : T)
  end
  def then(callable : T -> Success(T) | Failure(T))
    callable.call(@value)
  end