Skip to content

Instantly share code, notes, and snippets.

@carlessistare
Created October 19, 2015 09:27
Show Gist options
  • Save carlessistare/d92df542c138a13e47e8 to your computer and use it in GitHub Desktop.
Save carlessistare/d92df542c138a13e47e8 to your computer and use it in GitHub Desktop.
Couchbase = require 'couchbase'
fs = require('fs')
readline = require('readline')
stream = require('stream')
async = require('async')
elasticsearch = require('elasticsearch')
host = "xxx"
cluster = new Couchbase.Cluster("couchbase://#{host}")
couchbase = cluster.openBucket("users")
path_in = '/Users/carles/www/mr-algo/data/targeted_users/users.txt'
path_out = '/Users/carles/www/mr-algo/data/targeted_users/users_countries.txt'
instream = fs.createReadStream path_in
outstream = new stream
outstream.readable = true
outstream.writable = true
fd = fs.openSync path_out, 'w+'
i = 0
funci = (line, lineCount) ->
return (cb)->
couchbase.get "odid:#{line}", (err, res) ->
i++
console.log i if i % 1000 is 0
geoipcode = ""
if not err? and res?.value?.geoip?.code?
geoipcode = res.value.geoip.code
fs.writeSync fd, "#{line}\t#{geoipcode}\n"
cb()
rl = readline.createInterface
input: instream
output: outstream
terminal: false
tasks = []
rl.on 'line', (line, lineCount) ->
tasks.push(funci(line, lineCount))
.on 'close', () ->
async.parallelLimit tasks, 2, (err, res) ->
console.log err, res
fs.closeSync(fd)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment