Skip to content

Instantly share code, notes, and snippets.

@huguesbr
Created April 3, 2014 16:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save huguesbr/9958369 to your computer and use it in GitHub Desktop.
Save huguesbr/9958369 to your computer and use it in GitHub Desktop.
#!/bin/ruby
# import data from keen to keen
# can also delete data from source
require 'keen'
require 'awesome_print'
require 'active_support/core_ext'
require 'i18n'
# Maximum number of events sent to the destination in one publish_batch call.
EVENTS_IMPORT_SLICE_SIZE = 200

# Copies the events of one collection from a source Keen client to a
# destination Keen client, optionally deleting them from the source.
#
# The extracted events are always dumped to a JSON file in the current
# directory before anything is published, so a local backup exists even when
# +write+ is false.
#
# @param collection [String] name of the event collection to migrate
# @param client_from [#count, #extraction, #delete, #project_id] source client
# @param client_to [#publish_batch, #project_id] destination client
# @param options [Hash] query options passed to count/extraction; must hold
#   ISO 8601 strings under options[:timeframe]['start'] and ['end']
# @param filter unused; accepted only so existing callers keep working
# @param write [Boolean] publish the extracted events to the destination
# @param delete [Boolean] delete each slice from the source once the
#   destination returned one result per published event
# @return [Integer, false] 0 when the source has nothing to import, false
#   when +write+ is false, otherwise the number of publish results returned
#   by the destination
def import(collection: nil, client_from: nil, client_to: nil, options: nil, filter: nil, write: false, delete: false)
  starting_date = DateTime.iso8601(options[:timeframe]['start']).strftime('%Y-%m-%d-%H')
  ending_date = DateTime.iso8601(options[:timeframe]['end']).strftime('%Y-%m-%d-%H')
  timestamp = Time.now.getutc.to_i.to_s
  ap "Processing #{collection} from #{starting_date} to #{ending_date} "

  # count source
  # NOTE(review): options is cloned before each client call, presumably
  # because the client may modify the hash it receives - confirm.
  count = client_from.count(collection, options.clone)
  ap "#{count} #{collection} on source client (#{client_from.project_id}) to be imported"
  return 0 if count == 0

  # extract source
  events = client_from.extraction(collection, options.clone)
  ap "#{events.count} #{collection} on source client (#{client_from.project_id}) extracted"
  return 0 if events.empty?

  # save extraction to file
  backup_path = "#{client_to.project_id}-events-#{collection}-#{ending_date}-#{timestamp}.json"
  File.write(backup_path, events.to_json)

  # write
  return false unless write

  ap "Publishing #{events.count} #{collection} to destination"
  total_imported = 0
  total_deleted = 0
  events.each_slice(EVENTS_IMPORT_SLICE_SIZE) do |slice|
    # Remember the source ids BEFORE they are stripped from the payload:
    # the delete filter below needs them.
    source_ids = slice.map { |event| event['keen']['id'] }

    # trim keen keys from events (the destination assigns its own)
    payload = slice.map do |event|
      trimmed_keen = event['keen'].reject { |key, _| %w[id created_at].include?(key) }
      event.merge('keen' => trimmed_keen)
    end

    # import
    results = client_to.publish_batch(collection => payload)
    imported = results[collection].count
    ap "#{imported} #{collection} imported to destination client (#{client_to.project_id})"
    total_imported += imported

    # delete only if the whole slice was imported
    next unless delete && imported == slice.count

    # delete events based on keen.id key
    deleted = client_from.delete(
      collection,
      filters: [{
        property_name: 'keen.id',
        operator: 'in',
        property_value: source_ids
      }]
    )
    total_deleted += slice.count if deleted
    ap "#{total_deleted} #{collection} deleted on source client (#{client_from.project_id})"
  end

  # stats
  ap "TOTAL: #{total_imported} #{collection} imported to destination client (#{client_to.project_id})"
  ap "DELETED #{total_deleted} from #{collection} on source client (#{client_from.project_id})"
  total_imported
end
# Driver script: migrates the configured collections from one Keen project to
# another, 6 hours at a time, deleting each imported slice from the source.

# Source project. A master key is supplied here because this script deletes
# events from the source (delete: true below).
from_keen = Keen::Client.new(
  project_id: 'xx',
  write_key: 'xx',
  read_key: 'xx',
  master_key: 'xx'
)

# Destination project. Events are only published to it, no master key is set.
to_keen = Keen::Client.new(
  project_id: 'xx',
  write_key: 'xx',
  read_key: 'xx',
  master_key: nil
)

from_date = DateTime.iso8601('2014-03-14T00:00:00Z')
to_date = from_date + 9.days

# Only events matching these filters are migrated.
import_filters = [{
  property_name: 'app.build',
  operator: 'eq',
  property_value: '3247'
}]
import_collections = ['xxx']

# The timeframe below is only the initial value; it is overwritten for every
# slice inside the loop.
options = {
  timeframe: {
    'start' => from_date.to_s,
    'end' => to_date.to_s
  },
  filters: import_filters
}

# collection name => total number of events imported
collections_counts = Hash.new(0)

# import with slice of 6 hours: 6 days x 4 windows per day
6.times do |day|
  4.times do |slot|
    # Consecutive, non-overlapping windows, so no event is extracted twice
    # even when deletion from the source is skipped or fails.
    window_start = from_date + day.days + (slot * 6).hours
    window_end = window_start + 6.hours

    import_collections.each do |collection|
      options[:timeframe]['start'] = window_start.to_s
      options[:timeframe]['end'] = window_end.to_s
      imported = import(
        collection: collection,
        client_from: from_keen,
        client_to: to_keen,
        options: options,
        filter: nil,
        write: true,
        delete: true
      )
      # import returns false when nothing was written; skip those
      collections_counts[collection] += imported if imported
    end
  end
end

ap collections_counts
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment