Skip to content

Instantly share code, notes, and snippets.

@jonatas
Last active May 15, 2024 18:29
Show Gist options
  • Save jonatas/418f360d45c890e1d86c30547a0cf6a4 to your computer and use it in GitHub Desktop.
Save jonatas/418f360d45c890e1d86c30547a0cf6a4 to your computer and use it in GitHub Desktop.
POC Make RubyGems track downloads with TimescaleDB
require 'bundler/inline'
gemfile do
source 'https://rubygems.org'
gem 'timescaledb'
gem 'bulk_insert'
gem 'pry'
end
require 'timescaledb'
# ActiveRecord model declared as a hypertable on the `ts` column, plus
# read-only models (nested constants such as Download::PerHour) mapped to the
# `downloads_<view_name>` continuous-aggregate views.
class Download < ActiveRecord::Base
  acts_as_hypertable time_column: 'ts'

  # Named SELECT fragments accepted by the `rollup` / `per_*` scopes of the
  # continuous-aggregate models built by `cagg` below.
  #
  # Defined once at class level: a constant assigned inside the `cagg` lambda
  # is still set on Download (the lexical scope), so it used to be re-assigned
  # on every factory call, emitting an "already initialized constant" warning
  # for each model after the first.
  QUERIES = {
    sum_downloads: "sum(downloads)::bigint as downloads",
    avg_downloads: "avg(downloads)::bigint as avg_downloads",
    rollup: "rollup(stats_agg) as stats_agg",
    rolling_downloads: "rolling(stats_agg)->sum() as downloads",
    rollup_downloads: "rollup(stats_agg)->sum() as downloads",
    rollup_downloads_per_gem: "gem_name,rollup(stats_agg)->sum() as downloads",
    rollup_downloads_per_version: "gem_name, gem_version, rollup(stats_agg)->sum() as downloads"
  }.freeze

  # Selects the time column bucketed by `range` (aliased back to the time
  # column name) followed by the raw SQL fragment `query`. Grouping is left
  # to the caller.
  scope :time_bucket, ->(range = '1m', query = "count(*)") do
    select("time_bucket('#{range}', #{time_column}) as #{time_column}, #{query}")
  end

  # One-minute buckets grouped by the bucket column.
  scope :per_minute, ->(query = "count(*) as downloads") do
    time_bucket('1m', query).group(1)
  end

  # Per minute, gem and version: a stats_agg over the constant 1.0 per row.
  scope :stats_agg_per_minute, -> do
    per_minute("gem_name, gem_version, stats_agg(cast(1.0 as double precision)) as stats_agg")
      .group(1, 2, 3)
  end

  # Download count per minute and gem.
  scope :gems_per_minute, -> do
    per_minute("gem_name, count(*) as downloads").group(1, 2)
  end

  # Download count per minute, gem and version.
  scope :versions_per_minute, -> do
    per_minute("gem_name, gem_version, count(*) as downloads").group(1, 2, 3)
  end

  # Builds an anonymous read-only model for the view `downloads_<view_name>`.
  cagg = ->(view_name) do
    Class.new(ActiveRecord::Base) do
      self.table_name = "downloads_#{view_name}"

      # Re-buckets `ts` by `range` and selects `query`, which is either a key
      # of QUERIES or a raw SQL fragment. Groups by the bucket column only;
      # callers selecting extra dimensions add their own `group`.
      scope :rollup, ->(range = '1d', query = :sum_downloads) do
        select("time_bucket('#{range}', ts) as ts, #{QUERIES[query] || query}")
          .group(1)
      end

      scope :per_hour, ->(query = :sum_downloads) { rollup('1h', query) }
      scope :per_day, ->(query = :sum_downloads) { rollup('1d', query) }
      scope :per_week, ->(query = :sum_downloads) { rollup('1w', query) }
      scope :per_month, ->(query = :sum_downloads) { rollup('1mon', query) }
      scope :per_year, ->(query = :sum_downloads) { rollup('1y', query) }

      # Records of these models are never written through ActiveRecord.
      def readonly?
        true
      end

      # Refreshes the whole continuous aggregate (null start and end bounds).
      def self.refresh!
        ActiveRecord::Base.connection.execute <<~SQL
          CALL refresh_continuous_aggregate('#{table_name}', null, null);
        SQL
      end
    end
  end

  PerMinute = cagg['per_minute']
  PerHour = cagg['per_hour']
  PerDay = cagg['per_day']
  PerMonth = cagg['per_month']

  GemsPerMinute = cagg['gems_per_minute']
  GemsPerHour = cagg['gems_per_hour']
  GemsPerDay = cagg['gems_per_day']
  GemsPerMonth = cagg['gems_per_month']

  VersionsPerMinute = cagg['versions_per_minute']
  VersionsPerHour = cagg['versions_per_hour']
  VersionsPerDay = cagg['versions_per_day']
  VersionsPerMonth = cagg['versions_per_month']

  StatsAggPerMinute = cagg['stats_agg_per_minute']
  StatsAggPerHour = cagg['stats_agg_per_hour']
  StatsAggPerDay = cagg['stats_agg_per_day']
  StatsAggPerMonth = cagg['stats_agg_per_month']
end
# Connect to the database. ENV.fetch raises a KeyError naming the missing
# variable instead of passing nil to establish_connection.
ActiveRecord::Base.establish_connection(ENV.fetch('DATABASE_URL'))

# Rebuild the schema from scratch: drop the continuous aggregates and the
# hypertable, recreate the hypertable, then create every aggregate level.
ActiveRecord::Base.connection.instance_exec do
  ActiveRecord::Base.logger = Logger.new(STDOUT)

  # Drop from the coarsest level down. The month views are listed explicitly
  # rather than relying on the cascade from the day views they are built on.
  %w[month day hour minute].each do |frame|
    [
      "downloads_per_#{frame}",
      "downloads_gems_per_#{frame}",
      "downloads_versions_per_#{frame}",
      "downloads_stats_agg_per_#{frame}"
    ].each do |view|
      execute("DROP MATERIALIZED VIEW IF EXISTS #{view} cascade")
    end
  end

  drop_table(:downloads, force: :cascade) if Download.table_exists?

  hypertable_options = {
    time_column: 'ts',
    chunk_time_interval: '1 day',
    compress_segmentby: 'gem_name, gem_version',
    compress_orderby: 'ts DESC',
    compression_interval: '7 days'
  }

  create_table(:downloads, id: false, hypertable: hypertable_options) do |t|
    t.timestamptz :ts, null: false
    t.text :gem_name, :gem_version, null: false
    t.jsonb :payload
  end

  # Each level above "minute" is defined on top of the level below it, so it
  # must SUM the lower level's `downloads` column. A count(*) there would
  # count the lower level's bucket rows, not the downloads they represent.
  {
    per_minute: Download.per_minute,
    per_hour: Download::PerMinute.per_hour(:sum_downloads).group(1),
    per_day: Download::PerHour.per_day(:sum_downloads).group(1),
    per_month: Download::PerDay.per_month(:sum_downloads).group(1),
    gems_per_minute: Download.gems_per_minute,
    gems_per_hour: Download::GemsPerMinute.per_hour("gem_name, sum(downloads)::bigint as downloads").group(1, 2),
    gems_per_day: Download::GemsPerHour.per_day("gem_name, sum(downloads)::bigint as downloads").group(1, 2),
    gems_per_month: Download::GemsPerDay.per_month("gem_name, sum(downloads)::bigint as downloads").group(1, 2),
    versions_per_minute: Download.versions_per_minute,
    versions_per_hour: Download::VersionsPerMinute.per_hour("gem_name, gem_version, sum(downloads)::bigint as downloads").group(1, 2, 3),
    versions_per_day: Download::VersionsPerHour.per_day("gem_name, gem_version, sum(downloads)::bigint as downloads").group(1, 2, 3),
    versions_per_month: Download::VersionsPerDay.per_month("gem_name, gem_version, sum(downloads)::bigint as downloads").group(1, 2, 3),
    stats_agg_per_minute: Download.stats_agg_per_minute,
    stats_agg_per_hour: Download::StatsAggPerMinute.per_hour(:rollup).group(1),
    stats_agg_per_day: Download::StatsAggPerHour.per_day(:rollup).group(1),
    stats_agg_per_month: Download::StatsAggPerDay.per_month(:rollup).group(1)
  }.each do |name, scope|
    puts "Creating continuous aggregate #{name}", scope.to_sql
    # "gems_per_hour" -> "hour"; reused as the unit of the policy intervals.
    frame = name.to_s.split('per_').last
    create_continuous_aggregate(
      "downloads_#{name}",
      scope.to_sql,
      refresh_policies: {
        schedule_interval: "INTERVAL '1 #{frame}'",
        start_offset: "INTERVAL '3 #{frame}'",
        end_offset: "INTERVAL '1 minute'"
      }
    )
  end
end

# Silence SQL logging for the bulk load that follows.
ActiveRecord::Base.logger = nil
# Captures the gem name and version from a download path, e.g.
# "/gems/racc-1.7.3-java.gem" -> gem_name "racc", gem_version "1.7.3-java".
PATH_PATTERN = /\/gems\/(?<gem_name>.*)-(?<gem_version>\d+.*)\.gem/

# Reads a download log and inserts one row per counted download, flushing to
# insert_downloads in batches.
#
# Each line is split on whitespace: field 3 is the client IP, fields 4..9 the
# timestamp, field 10 the request path, field 11 the response code, and the
# remainder the client environment handed to parse_env.
#
# @param file [String] path of the log file
# @param batch_size [Integer] rows buffered before each insert_downloads call
def parse_file(file, batch_size: 5000)
  downloads = []
  # File.foreach closes the file once iteration finishes; the earlier
  # File.open(file).each_line left the handle open.
  File.foreach(file) do |log_line|
    fragments = log_line.split
    path, response_code = fragments[10, 2]
    # Only count successful downloads
    # NB: we consider a 304 response a download attempt
    next unless [200, 304].include?(response_code.to_i)

    # Successful requests for anything other than /gems/<name>-<version>.gem
    # carry no name/version to record; skip them instead of raising on a nil
    # match.
    match = PATH_PATTERN.match(path)
    next unless match

    gem_name = match[:gem_name]
    gem_version = match[:gem_version]
    ip = fragments[3]
    ts = Time.parse(fragments[4..9].join(' '))
    env = parse_env(fragments[12..-1])
    payload = {ip:, env:}
    downloads << {ts:, gem_name:, gem_version:, payload:}
    next if downloads.size < batch_size

    insert_downloads(downloads)
    downloads.clear
  end
  # Flush the final partial batch.
  insert_downloads(downloads) if downloads.any?
end
# Turns the client-environment tokens of a log line into a Hash with String
# keys and String values.
#
# Before tokenising, everything from "command" onwards, every parenthesised
# group and the literal "Ruby, " prefix are removed. Each remaining token is
# split once on "/" or "-"; a token without a separator maps to "true".
#
#   parse_env(%w[bundler/2.5.9 rubygems/3.3.25 ruby/3.1.0])
#   # => {"bundler"=>"2.5.9", "rubygems"=>"3.3.25", "ruby"=>"3.1.0"}
#   parse_env(%w[jruby])   # => {"jruby"=>"true"}
#   parse_env(["(null)"])  # => {}  (the parenthesised group is stripped)
#
# @param output [Array<String>, nil] whitespace-split tokens; nil gives {}
# @return [Hash{String => String}]
def parse_env(output)
  # \([^)]*\) removes each parenthesised group on its own; a greedy \(.*\)
  # would also delete any tokens sitting between two groups.
  cleaned = Array(output).join(' ').gsub(/command.*|\([^)]*\)|Ruby, /, '')
  cleaned.split.to_h do |info|
    key, value = info.split(%r{[/-]}, 2)
    [key, value || 'true']
  end
end
# Writes one batch of download rows via Download.bulk_insert.
#
# @param downloads [Array<Hash>] rows to insert
def insert_downloads(downloads)
  Download.bulk_insert(values: downloads)
end
# Log files to load, given as paths readable by parse_file.
s3_files = %w[2024-04-26T00_15_00.000-szTpTn9sP1-116Ajwl4N.log]

# For each file, time the parse-and-load step, then time a full refresh of
# every continuous aggregate, finest level first.
Benchmark.bm do |bench|
  s3_files.each do |file|
    bench.report("parse and load #{file}") { parse_file(file) }

    %w[
      PerMinute GemsPerMinute VersionsPerMinute StatsAggPerMinute
      PerHour GemsPerHour VersionsPerHour StatsAggPerHour
      PerDay GemsPerDay VersionsPerDay StatsAggPerDay
      PerMonth GemsPerMonth VersionsPerMonth StatsAggPerMonth
    ].each do |view|
      bench.report("Refresh #{view}") { Download.const_get(view).refresh! }
    end
  end
end

# Drop into an interactive session to explore the loaded data.
require "pry"
binding.pry
=begin
Download::PerHour.first
Download::GemsPerHour.all
Download::VersionsPerHour.where(gem_name: "rails").pluck(:gem_version, :downloads) # => [["1.2.3.4", 6], ["6.1.7", 1], ["7.0.2", 1]]
=end
@jonatas
Copy link
Author

jonatas commented Apr 30, 2024

Cross linking the video walkthrough.

Head of the log output:

head 2024-04-26T00_15_00.000-szTpTn9sP1-116Ajwl4N.log
<134>2024-04-26T00:10:54Z cache-pao-kpao1770049 downloads[262515]: 1.2.3.4 Fri, 26 Apr 2024 00:10:54 GMT /gems/racc-1.7.3-java.gem 200 bundler/2.5.9 rubygems/3.3.25 ruby/3.1.0 (x86_64-Eclipse Adoptium-linux) command/install jruby/1.2.3.4 options/jobs,no_install,path ci/ci,github 47244d1623b8b050
<134>2024-04-26T00:10:54Z cache-iad-kiad7000157 downloads[262515]: 1.2.3.4 Fri, 26 Apr 2024 00:10:54 GMT /gems/aws-sdk-core-3.193.0.gem 200 Ruby, RubyGems/3.1.4 x86_64-linux Ruby/2.7.2 (2020-10-01 patchlevel 137)
<134>2024-04-26T00:10:54Z cache-pao-kpao1770049 downloads[262515]: 1.2.3.4 Fri, 26 Apr 2024 00:10:54 GMT /gems/regexp_parser-2.9.0.gem 200 bundler/2.5.9 rubygems/3.3.25 ruby/3.1.0 (x86_64-Eclipse Adoptium-linux) command/install jruby/1.2.3.4 options/jobs,no_install,path ci/ci,github 47244d1623b8b050
<134>2024-04-26T00:10:54Z cache-iad-kiad7000022 downloads[262515]: 1.2.3.4 Fri, 26 Apr 2024 00:10:54 GMT /gems/debase-ruby_core_source-3.3.1.gem 200 Ruby, RubyGems/3.4.1 x86_64-linux Ruby/2.6.10 (2022-04-12 patchlevel 210)
<134>2024-04-26T00:10:54Z cache-iad-kjyo7100141 downloads[262515]: 1.2.3.4 Fri, 26 Apr 2024 00:10:54 GMT /gems/apollo_upload_server-2.1.5.gem 200 Ruby, RubyGems/3.4.4 x86_64-linux Ruby/3.1.4 (2023-03-30 patchlevel 223)
<134>2024-04-26T00:10:54Z cache-chi-klot8100092 downloads[262515]: 1.2.3.4 Fri, 26 Apr 2024 00:10:54 GMT /gems/database_cleaner-core-2.0.1.gem 200 bundler/2.5.9 rubygems/3.4.4 ruby/3.0.6 (x86_64-pc-linux) command/install options/no_install,path 8d319b4f02ec22c1
<134>2024-04-26T00:10:54Z cache-iad-kiad7000088 downloads[262515]: 1.2.3.4 Fri, 26 Apr 2024 00:10:54 GMT /gems/unicode-display_width-2.5.0.gem 200 Ruby, RubyGems/3.1.6 x86_64-linux Ruby/2.7.8 (2023-03-30 patchlevel 225)
<134>2024-04-26T00:10:54Z 2024-04-26T00:10:54.586614839+00:00 downloads[262515]: 1.2.3.4 Fri, 26 Apr 2024 00:10:54 GMT /gems/tomlrb-2.0.3.gem 200 bundler/2.5.9 rubygems/3.5.3 ruby/3.3.0 (x86_64-pc-linux) command/install options/app_config,ignore_messages,no_install,rubygems.pkg.github.com,silence_root_warning 4d2c9a175ea7b626
<134>2024-04-26T00:10:54Z cache-iad-kcgs7200148 downloads[262515]: 1.2.3.4 Fri, 26 Apr 2024 00:10:54 GMT /gems/rspec-its-1.3.0.gem 200 Ruby, RubyGems/3.1.0 x86_64-linux Ruby/2.5.7 (2019-10-01 patchlevel 206)
<134>2024-04-26T00:10:54Z cache-iad-kcgs7200148 downloads[262515]: 1.2.3.4 Fri, 26 Apr 2024 00:10:54 GMT /gems/rspec-mocks-3.13.0.gem 200 Ruby, RubyGems/3.1.0 x86_64-linux Ruby/2.5.7 (2019-10-01 patchlevel 206)

@jonatas
Copy link
Author

jonatas commented May 15, 2024

Exploring stats_agg with grouping by gem name:

Download::StatsAggPerMinute
  .select("ts, gem_name, rollup(stats_agg)->sum() as downloads")
  .group(1,2).map(&:attributes)
# [{"ts"=>2024-04-26 00:11:00 UTC, "gem_name"=>"rack-session", "downloads"=>3.0},
# {"ts"=>2024-04-26 00:11:00 UTC, "gem_name"=>"aws-sdk-organizations", "downloads"=>1.0},
# {"ts"=>2024-04-26 00:11:00 UTC, "gem_name"=>"aws-sdk-comprehendmedical", "downloads"=>2.0},
# {"ts"=>2024-04-26 00:10:00 UTC, "gem_name"=>"net-pop", "downloads"=>1.0},
# {"ts"=>2024-04-26 00:10:00 UTC, "gem_name"=>"mustermann-grape", "downloads"=>1.0},
# {"ts"=>2024-04-26 00:11:00 UTC, "gem_name"=>"httpclient", "downloads"=>5.0}, ...]

Total downloads per minute (all gems and versions rolled up together):

Download::StatsAggPerMinute
  .select("ts, rollup(stats_agg)->sum() as downloads")
  .group(1).map(&:attributes)
# => [{"ts"=>2024-04-26 00:12:00 UTC, "downloads"=>1461.0},
# {"ts"=>2024-04-26 00:14:00 UTC, "downloads"=>1127.0},
# {"ts"=>2024-04-26 00:13:00 UTC, "downloads"=>1150.0},
# {"ts"=>2024-04-26 00:15:00 UTC, "downloads"=>1005.0},
# {"ts"=>2024-04-26 00:11:00 UTC, "downloads"=>1322.0},
# {"ts"=>2024-04-26 00:10:00 UTC, "downloads"=>110.0}]

Unfolding one gem by version:

Download::StatsAggPerMinute
  .where(gem_name: "rails")
  .select("ts, gem_version, rollup(stats_agg)->sum() as downloads")
  .group(1,2).map(&:attributes)
# => [{"ts"=>2024-04-26 00:14:00 UTC, "gem_version"=>"7.0.2", "downloads"=>1.0},
# {"ts"=>2024-04-26 00:14:00 UTC, "gem_version"=>"6.1.7", "downloads"=>1.0},
# {"ts"=>2024-04-26 00:11:00 UTC, "gem_version"=>"1.2.3.4", "downloads"=>2.0},
# {"ts"=>2024-04-26 00:15:00 UTC, "gem_version"=>"1.2.3.4", "downloads"=>1.0},
# {"ts"=>2024-04-26 00:14:00 UTC, "gem_version"=>"1.2.3.4", "downloads"=>2.0},
# {"ts"=>2024-04-26 00:12:00 UTC, "gem_version"=>"1.2.3.4", "downloads"=>1.0}]

Rolling up to per_hour:

Download::StatsAggPerMinute
 .where(gem_name: "rails")
 .per_hour("gem_version, rollup(stats_agg)->sum() as downloads")
 .group(1,2).map(&:attributes)
# => [{"ts"=>2024-04-26 00:00:00 UTC, "gem_version"=>"1.2.3.4", "downloads"=>6.0},
# {"ts"=>2024-04-26 00:00:00 UTC, "gem_version"=>"6.1.7", "downloads"=>1.0},
# {"ts"=>2024-04-26 00:00:00 UTC, "gem_version"=>"7.0.2", "downloads"=>1.0}]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment