@ajbouh
Last active August 29, 2015 13:57
#!/usr/bin/env ruby
require 'json'
require 'uri'
USAGE = <<-EOF
Usage: #{File.basename($0)} file suffixformat prefix_field bson_date_field [limit]
TODO(adamb) make prefix_field detect a constant (prefix) vs a field (.prefix)
TODO(adamb) make bson_date_field detect a constant (date) vs a field (.date)
TODO(adamb) Use optparse
TODO(adamb) Delete split-json-stream
TODO(adamb) Distribute this via refinery somehow, along with mongoimport and jq
Example values
file = ~/Documents/archive.json
suffixformat = -%F
prefix_field = collection
bson_date_field = timestamp
limit = 10
EOF
# limit is optional, so accept either four or five arguments.
abort(USAGE) unless (4..5).cover?(ARGV.length)

file, suffixformat, prefix_field, bson_date_field, limit = ARGV
limit = limit.to_i if limit

# Keeps at most `max` mongoimport processes open, one per collection name.
class CollectionPool
  def initialize(uri, max)
    @uri = uri
    @max = max
    @collections = {}
  end

  # Writes one JSON line to the mongoimport process for the named collection.
  def save(name, line)
    get_io(name).puts(line)
  end

  private

  def get_io(name)
    existing = @collections[name]
    return existing if existing

    # We'll need to open another one...
    cmd = [
      'mongoimport',
      '-h', @uri.host,
      '--port', @uri.port.to_s,
      '-d', @uri.path[1..-1],
      '-c', name
    ]
    $stderr.puts("Starting mongoimport for #{name} with command: #{cmd}\n")
    newest = IO.popen(cmd, 'w')

    # Evict once the pool is full (>=, so it never holds more than @max).
    # Hashes keep insertion order, so the first key is the oldest one opened.
    if @collections.length >= @max
      oldest_name = @collections.keys.first
      oldest = @collections.delete(oldest_name)
      $stderr.puts("Closing mongoimport for #{oldest_name} to make room\n")
      oldest.close
    end

    @collections[name] = newest
  end
end
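
lines = 0
# ENV.fetch raises a KeyError naming MONGODB_URI if the variable is unset.
pool = CollectionPool.new(URI.parse(ENV.fetch('MONGODB_URI')), 16)

# Only pass through documents that carry both the date and the prefix field.
cmd = [
  'jq',
  '-c', '--online-input', '--unbuffered',
  "select(has(#{bson_date_field.inspect})) | select(has(#{prefix_field.inspect}))",
  file
]
$stderr.puts("Using jq command: #{cmd}")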
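
IO.popen(cmd) do |io|
  io.each_line do |line|
    # Stop after exactly `limit` documents (>=, not >, to avoid one extra).
    if limit && lines >= limit
      print "\n"
      exit(0)
    end

    json = JSON.parse(line, max_nesting: false)
    lines += 1

    # Collection name = prefix value + strftime-formatted suffix.
    prefix = json[prefix_field]
    nameformat = prefix + suffixformat
    ts = json[bson_date_field]
    # ts['$date'] holds milliseconds since the epoch; Time.at expects seconds.
    name = Time.at(ts['$date'] / 1000).utc.strftime(nameformat)
    pool.save(name, line)
  end
end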
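
The naming step in the main loop can be tried on its own. The sketch below is a minimal illustration, not part of the original script: `collection_name` is a hypothetical helper, and the sample document assumes the date field is an object with a `$date` key holding milliseconds since the epoch, as the script's `ts['$date'] / 1000` suggests.

```ruby
require 'json'

# Hypothetical helper (not in the original script): derives the target
# collection name the same way the main loop does.
def collection_name(doc, prefix_field, date_field, suffixformat)
  prefix = doc.fetch(prefix_field)
  millis = doc.fetch(date_field).fetch('$date') # assumed: epoch milliseconds
  # The prefix becomes part of the strftime format string.
  Time.at(millis / 1000).utc.strftime(prefix + suffixformat)
end

# Made-up sample line, shaped like one document in the archive.
line = '{"collection":"events","timestamp":{"$date":1395446400000}}'
doc = JSON.parse(line)

puts collection_name(doc, 'collection', 'timestamp', '-%F')
# prints "events-2014-03-22"
```

Because the prefix is concatenated into the format string, a prefix value containing `%` would be read as a strftime directive. That is worth keeping in mind if collection prefixes come from untrusted data.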