Skip to content

Instantly share code, notes, and snippets.

@jonatas
Last active Aug 31, 2021
Embed
What would you like to do?
TimescaleDB ActiveRecord Helpers
require 'bundler/setup'
require 'active_record'
require 'pp'
require 'pry'
require 'ostruct'
# set PG_URI=postgres://user:pass@host:port/db_name
ActiveRecord::Base.establish_connection(ENV['PG_URI'])
class Chunk < ActiveRecord::Base
self.table_name = "timescaledb_information.chunks"
def compress!
execute("SELECT compress_chunk(#{chunk_relation})")
end
def decompress!
execute("SELECT decompress_chunk(#{chunk_relation})")
end
def chunk_relation
"('#{chunk_schema}.#{chunk_name}')::regclass"
end
def execute(sql)
self.class.connection.execute(sql)
end
end
# Simple example
class Event < ActiveRecord::Base
self.primary_key = "identifier"
scope :last_month, -> { where('created_at > ?', 1.month.ago) }
scope :last_week, -> { where('created_at > ?', 1.week.ago) }
scope :last_hour, -> { where('created_at > ?', 1.hour.ago) }
scope :yesterday, -> { where('DATE(created_at) = ?', 1.day.ago.to_date) }
scope :today, -> { where('DATE(created_at) = ?', Date.today) }
scope :counts_per, -> (time_dimension) {
select("time_bucket('#{time_dimension}', created_at) as time, identifier, count(1) as total")
.group(:time, :identifier).order(:time)
.map {|result| [result.time, result.identifier, result.total]}
}
scope :chunks, -> () do
Chunk.where(hypertable_name: self.table_name)
end
scope :detailed_size, -> do
self.connection.execute("SELECT * from chunks_detailed_size('#{self.table_name}')")
.map(&OpenStruct.method(:new))
end
scope :compression_stats, -> do
self.connection.execute("SELECT * from hypertable_compression_stats('#{self.table_name}')")
.map(&OpenStruct.method(:new))
end
end
ActiveRecord::Base.connection.instance_exec do
ActiveRecord::Base.logger = Logger.new(STDOUT)
drop_table(:events) if Event.table_exists?
def digest_hypertable_options(table_name, hypertable_options)
execute "SELECT create_hypertable('#{table_name}', '#{hypertable_options[:time_column] || 'created_at'}')"
if segment_name = hypertable_options[:compress_segmentby]
execute <<~SQL
ALTER TABLE events SET (
timescaledb.compress,
timescaledb.compress_segmentby = '#{segment_name}'
)
SQL
end
if interval = hypertable_options[:compression_interval]
execute "SELECT add_compression_policy('#{table_name}', INTERVAL '#{interval}')"
end
end
def create_table(table_name, id: :primary_key, primary_key: nil, force: nil, **options)
super
digest_hypertable_options(table_name, options[:hypertable]) if options.key?(:hypertable)
end
hypertable_options = {
time_column: 'created_at',
time_bucket_interval: '1 min',
compress_segmentby: 'identifier',
compression_interval: '7 days'
}
create_table(:events, id: false, hypertable: hypertable_options) do |t|
t.string :identifier, null: false
t.jsonb :payload
t.timestamps
end
end
1.times do
Event.transaction do
Event.create identifier: "sign_up", payload: {"name" => "Eon"}
Event.create identifier: "login", payload: {"email" => "eon@timescale.com"}
Event.create identifier: "click", payload: {"user" => "eon", "path" => "/install/timescaledb"}
Event.create identifier: "scroll", payload: {"user" => "eon", "path" => "/install/timescaledb"}
Event.create identifier: "logout", payload: {"email" => "eon@timescale.com"}
end
end
puts Event.last_hour.group(:identifier).count # {"login"=>2, "click"=>1, "logout"=>1, "sign_up"=>1, "scroll"=>1}
pp Event.last_week.counts_per('1 min')
puts "compressing #{ Event.chunks.count }"
Event.chunks.first.compress!
pp Event.detailed_size
pp Event.compression_stats
puts "decompressing"
Event.chunks.first.decompress!
# [[2021-08-30 20:03:00 UTC, "logout", 1],
# [2021-08-30 20:03:00 UTC, "login", 2],
# [2021-08-30 20:03:00 UTC, "sign_up", 1],
# [2021-08-30 20:03:00 UTC, "click", 1],
# [2021-08-30 20:03:00 UTC, "scroll", 1]]
Pry.start
source 'https://rubygems.org'
gem "activerecord", "~> 6.1"
gem "composite_primary_keys", "~> 6.0"
gem "pg", "~> 1.2"
gem 'pry'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment