Last active
March 26, 2020 17:45
-
-
Save aquach/cd72678d19928fe60619 to your computer and use it in GitHub Desktop.
Exports Amplitude data to MySQL
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ActiveRecord model for a single exported Amplitude event
# (one row of the amplitude_events table).
class AmplitudeEvent < ActiveRecord::Base
  # The table is created with `id: false`; rows are looked up and updated by
  # Amplitude's uuid, so declare it as the primary key explicitly.
  self.primary_key = 'uuid'

  # Returns the event_time string column as a time value, or nil when the
  # column is blank. The importer uses this to decide where the next export
  # window should start.
  #
  # NOTE(review): assumes Amplitude's exported timestamps are UTC strings
  # without a zone suffix (e.g. "2015-08-11 21:50:36.123000") - confirm against
  # the Export API documentation.
  #
  # @return [ActiveSupport::TimeWithZone, nil]
  def parsed_event_time
    return if event_time.blank?

    ActiveSupport::TimeZone['UTC'].parse(event_time)
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'httparty' | |
require 'json' | |
require 'multiple_files_gzip_reader' # necessary to work around https://bugs.ruby-lang.org/issues/9790 | |
require 'zipruby' | |
# Downloads raw event exports from the Amplitude Export API and upserts each
# event into the database through the AmplitudeEvent model.
module AmplitudeImporter
  # Raised when the export endpoint answers with a non-2xx HTTP status.
  class DownloadError < StandardError; end

  EXPORT_URL = 'https://amplitude.com/api/2/export'.freeze

  # The export endpoint takes its window boundaries at hour granularity.
  EXPORT_TIME_FORMAT = '%Y%m%dT%H'.freeze

  # Event keys copied verbatim into the column of the same name.
  PLAIN_FIELDS = %w[
    amplitude_id user_id device_id event_time server_upload_time
    client_event_time client_upload_time event_id session_id event_type
    amplitude_event_type version_name platform os_name os_version
    device_brand device_manufacturer device_model device_carrier device_type
    device_family country language city region dma ip_address paying revenue
  ].freeze

  # Event keys holding nested structures; stored re-serialized as JSON text.
  JSON_FIELDS = %w[event_properties user_properties data].freeze

  class << self
    # Downloads data from Amplitude between the times provided, with hour
    # granularity.
    #
    # Credentials are read from the AMPLITUDE_API_KEY / AMPLITUDE_SECRET_KEY
    # environment variables when set; otherwise the placeholder strings below
    # are used and must be edited in place.
    #
    # @param time_gte [Time, nil] start of the window; defaults to the epoch
    # @param time_lte [Time, nil] end of the window; defaults to now
    # @return [String] the response body (a zip archive of gzipped files)
    # @raise [DownloadError] when the API responds with a non-2xx status
    def download(time_gte: nil, time_lte: nil)
      time_gte ||= Time.at(0)
      time_lte ||= Time.now

      response = HTTParty.get(EXPORT_URL, {
        query: {
          start: time_gte.utc.strftime(EXPORT_TIME_FORMAT),
          end: time_lte.utc.strftime(EXPORT_TIME_FORMAT)
        },
        basic_auth: {
          username: ENV.fetch('AMPLITUDE_API_KEY', '<API key>'),
          password: ENV.fetch('AMPLITUDE_SECRET_KEY', '<API key secret>')
        },
        timeout: 1_000_000 # Wait as long as it takes.
      })

      # Fail here with a clear message instead of handing an error page to the
      # zip reader further down the pipeline.
      status = response.code.to_i
      unless (200..299).cover?(status)
        raise DownloadError, "Amplitude export request failed with HTTP status #{status}"
      end

      response.parsed_response
    end

    # Decompresses a zipped archive of gzipped files and yields each line of
    # each file.
    #
    # @param zipped_resp [String] raw zip archive bytes
    # @yieldparam line [String] one line of one of the archived files
    # @return [Enumerator] when called without a block
    def decompress_downloaded_data(zipped_resp)
      return enum_for(__method__, zipped_resp) unless block_given?

      Zip::Archive.open_buffer(zipped_resp) do |archive|
        archive.each do |entry|
          MultipleFilesGzipReader.new(StringIO.new(entry.read)).each_line do |line|
            yield line
          end
        end
      end
    end

    # Parses one line of exported JSON and creates or updates the
    # AmplitudeEvent row with the same uuid.
    #
    # @param json_str [String] one JSON-encoded event, possibly newline-terminated
    # @return [true, nil] result of update!, or nil when the line is blank
    # @raise [JSON::ParserError] when the line is not valid JSON
    def import_event(json_str)
      payload = json_str.strip
      return if payload.empty? # Blank lines carry no event.

      event = JSON.parse(payload)
      record = AmplitudeEvent.find_or_initialize_by(uuid: event['uuid'])
      record.update!(event_attributes(event))
    end

    # Imports all events created since the last event. This is your main
    # entry point.
    #
    # Relies on AmplitudeEvent#parsed_event_time to turn the stored event_time
    # into a time value.
    def import_new_events!
      last_event = AmplitudeEvent.order('event_time DESC').first
      last_event_time = last_event && last_event.parsed_event_time
      # Two hours of slop, just in case.
      query_time = last_event_time && last_event_time - 2.hours
      decompress_downloaded_data(download(time_gte: query_time), &method(:import_event))
    end

    private

    # Builds the attribute hash for AmplitudeEvent#update! from a parsed event.
    def event_attributes(event)
      attributes = PLAIN_FIELDS.each_with_object({}) do |field, attrs|
        attrs[field.to_sym] = event[field]
      end
      JSON_FIELDS.each_with_object(attributes) do |field, attrs|
        attrs[field.to_sym] = event[field].to_json
      end
    end
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Creates the amplitude_events table that AmplitudeEvent rows are stored in,
# plus the indexes used to look events up by time and by user.
class AddAmplitudeFields < ActiveRecord::Migration
  def change
    # No auto-increment id: rows are keyed by Amplitude's uuid.
    create_table "amplitude_events", id: false do |t|
      # NOTE(review): :string_primary_key is not a built-in Rails column type;
      # presumably it is registered elsewhere in the project - confirm.
      t.column 'uuid', :string_primary_key
      t.integer "created_at", limit: 8, null: false
      t.integer "amplitude_id", limit: 8
      t.string "user_id", limit: 64
      t.string "device_id"
      # Timestamps are stored as the strings Amplitude sends.
      t.string "event_time", limit: 64
      t.string "server_upload_time"
      t.string "client_event_time"
      t.string "client_upload_time"
      t.integer "event_id", limit: 8
      t.integer "session_id", limit: 8
      t.string "event_type"
      t.string "amplitude_event_type"
      t.string "version_name"
      t.string "platform"
      t.string "os_name"
      t.string "os_version"
      t.string "device_brand"
      t.string "device_manufacturer"
      t.string "device_model"
      t.string "device_carrier"
      t.string "device_type"
      t.string "device_family"
      t.string "country"
      t.string "language"
      t.string "city"
      t.string "region"
      t.string "dma"
      t.string "ip_address"
      t.string "paying"
      t.decimal "revenue", precision: 15, scale: 2
      # These three hold serialized JSON of arbitrary size, which does not fit
      # the 255-character limit of a default string column.
      t.text "event_properties"
      t.text "user_properties"
      t.text "data"
    end

    add_index "amplitude_events", ["user_id", "event_time"], name: "user_id_event_time", using: :btree
    add_index "amplitude_events", ["event_time"], name: "event_time", using: :btree
    # Whatever other indices you need.
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment