Skip to content

Instantly share code, notes, and snippets.

@aquach
Last active March 26, 2020 17:45
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aquach/cd72678d19928fe60619 to your computer and use it in GitHub Desktop.
Save aquach/cd72678d19928fe60619 to your computer and use it in GitHub Desktop.
Exports Amplitude data to MySQL
# ActiveRecord model for one exported Amplitude event (table `amplitude_events`).
class AmplitudeEvent < ActiveRecord::Base
  # The table is created with `id: false` and is keyed by `uuid`, so declare the
  # primary key explicitly instead of relying on schema detection.
  self.primary_key = 'uuid'

  # Parses the stored `event_time` string into a time object so that callers
  # (the importer subtracts a duration from it) can do time arithmetic.
  #
  # NOTE(review): the string is interpreted as UTC when it carries no explicit
  # offset — this assumes Amplitude exports event times in UTC; confirm.
  #
  # @return [ActiveSupport::TimeWithZone, nil] nil when `event_time` is blank
  def parsed_event_time
    return if event_time.blank?

    ActiveSupport::TimeZone['UTC'].parse(event_time)
  end
end
require 'httparty'
require 'json'
require 'multiple_files_gzip_reader' # necessary to work around https://bugs.ruby-lang.org/issues/9790
require 'zipruby'
# Downloads raw event exports from Amplitude and upserts them into the
# `amplitude_events` table through the AmplitudeEvent model.
module AmplitudeImporter
  # Raised when the export endpoint answers with a non-2xx HTTP status.
  class ExportError < StandardError; end

  EXPORT_URL = 'https://amplitude.com/api/2/export'

  # strftime pattern for the `start`/`end` query parameters (hour granularity).
  EXPORT_HOUR_FORMAT = '%Y%m%dT%H'

  # Event keys copied verbatim from the exported JSON into same-named columns.
  SCALAR_FIELDS = %i[
    amplitude_id user_id device_id event_time server_upload_time
    client_event_time client_upload_time event_id session_id event_type
    amplitude_event_type version_name platform os_name os_version
    device_brand device_manufacturer device_model device_carrier device_type
    device_family country language city region dma ip_address paying revenue
  ].freeze

  # Event keys whose values are re-serialized to a JSON string before storing.
  JSON_FIELDS = %i[event_properties user_properties data].freeze

  class << self
    # Downloads data from Amplitude between the times provided, with hour granularity.
    #
    # NOTE(review): with no `time_gte` the range starts at the Unix epoch; the
    # export API may cap the size of a single range — confirm against its docs.
    #
    # @param time_gte [Time, nil] start of the range; defaults to the Unix epoch
    # @param time_lte [Time, nil] end of the range; defaults to now
    # @return [Object] the parsed response body (expected to be the zipped export)
    # @raise [ExportError] when the endpoint does not answer with a 2xx status
    def download(time_gte: nil, time_lte: nil)
      time_gte ||= Time.at(0)
      time_lte ||= Time.now

      response = HTTParty.get(EXPORT_URL, {
        query: {
          start: export_hour(time_gte),
          end: export_hour(time_lte)
        },
        # Placeholders: replace with the project's real credentials.
        basic_auth: {
          username: '<API key>',
          password: '<API key secret>'
        },
        timeout: 1_000_000 # Wait as long as it takes.
      })

      # Fail here with a clear message instead of handing an error body to the
      # zip reader, which would fail in a much less obvious way.
      unless response.success?
        raise ExportError, "Amplitude export request failed with HTTP status #{response.code}"
      end

      response.parsed_response
    end

    # Decompresses a zipped archive of gzipped files and yields each line of each file.
    #
    # @param zipped_resp [String] the raw zip archive bytes
    # @yieldparam line [String] one line of one decompressed file
    # @return [Enumerator] when called without a block
    def decompress_downloaded_data(zipped_resp)
      return enum_for(:decompress_downloaded_data, zipped_resp) unless block_given?

      Zip::Archive.open_buffer(zipped_resp) do |archive|
        archive.each do |entry|
          MultipleFilesGzipReader.new(StringIO.new(entry.read)).each_line do |line|
            yield line
          end
        end
      end
    end

    # Upserts one exported event, looked up by its `uuid`.
    #
    # @param json_str [String] one line of the export: a JSON-encoded event
    # @return [Object, nil] the result of `update!`, or nil for a blank line
    # @raise [JSON::ParserError] when the line is not valid JSON
    def import_event(json_str)
      line = json_str.strip
      return if line.empty? # Nothing to import on a blank line.

      event = JSON.parse(line)
      attributes = {}
      SCALAR_FIELDS.each { |field| attributes[field] = event[field.to_s] }
      JSON_FIELDS.each { |field| attributes[field] = event[field.to_s].to_json }

      sql_event = AmplitudeEvent.find_or_initialize_by(uuid: event['uuid'])
      sql_event.update!(attributes)
    end

    # Imports all events created since the last event. This is your main entry point.
    #
    # Relies on AmplitudeEvent#parsed_event_time returning a time object that
    # supports subtracting a duration.
    def import_new_events!
      last_event = AmplitudeEvent.order('event_time DESC').first
      last_event_time = last_event && last_event.parsed_event_time
      # Two hours of slop, just in case.
      query_time = last_event_time && last_event_time - 2.hours
      decompress_downloaded_data(download(time_gte: query_time)) { |line| import_event(line) }
    end

    private

    # Formats a time as a UTC export hour. Uses `getutc` rather than `utc`
    # because `Time#utc` converts the receiver in place, which would silently
    # change the caller's Time object.
    def export_hour(time)
      time.getutc.strftime(EXPORT_HOUR_FORMAT)
    end
  end
end
# Creates the `amplitude_events` table that stores one row per exported
# Amplitude event, keyed by the event's `uuid`.
class AddAmplitudeFields < ActiveRecord::Migration
  def change
    create_table "amplitude_events", id: false do |t|
      # NOTE(review): `:string_primary_key` is not a built-in Rails column type;
      # presumably it is registered elsewhere in the project — confirm.
      t.column 'uuid', :string_primary_key
      t.integer "created_at", limit: 8, null: false
      t.integer "amplitude_id", limit: 8
      t.string "user_id", limit: 64
      t.string "device_id"
      t.string "event_time", limit: 64
      t.string "server_upload_time"
      t.string "client_event_time"
      t.string "client_upload_time"
      t.integer "event_id", limit: 8
      t.integer "session_id", limit: 8
      t.string "event_type"
      t.string "amplitude_event_type"
      t.string "version_name"
      t.string "platform"
      t.string "os_name"
      t.string "os_version"
      t.string "device_brand"
      t.string "device_manufacturer"
      t.string "device_model"
      t.string "device_carrier"
      t.string "device_type"
      t.string "device_family"
      t.string "country"
      t.string "language"
      t.string "city"
      t.string "region"
      t.string "dma"
      t.string "ip_address"
      t.string "paying"
      t.decimal "revenue", precision: 15, scale: 2
      # These three hold serialized JSON of arbitrary size. A `string` column
      # is VARCHAR(255) on MySQL, which would truncate or reject larger
      # payloads, so they are stored as text.
      t.text "event_properties"
      t.text "user_properties"
      t.text "data"
    end

    add_index "amplitude_events", ["user_id", "event_time"], name: "user_id_event_time", using: :btree
    add_index "amplitude_events", ["event_time"], name: "event_time", using: :btree
    # Whatever other indices you need.
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment