public
Created

Records DataSift Twitter interaction stream to Google Fusion Table

  • Download Gist
ds_to_gft.rb
Ruby
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
# Description: Consume a DataSift stream and save Twitter interactions to a Google Fusion Table
# Author: Paul M. Watson <paul.watson@storyful.com>
# Date: 2011/11/28
# Usage:
# ruby ds_to_gft.rb <DataSift stream id hash>
# config.yml should contain:
#   datasift:
#     username: datasift username
#     api_key: datasift api key
#   googlefusiontables:
#     username: google account login
#     password: google account password
# Code credit:
# datasift: https://github.com/datasift/datasift-ruby/blob/master/examples/consume-stream.rb
# tokumine: https://github.com/tokumine/fusion_tables/blob/master/examples/boris_bikes.rb
 
require 'rubygems'
require 'json'
require 'time'
require 'yaml'
require 'fusion_tables'
require 'datasift'
 
# The DataSift stream hash is the script's only (mandatory) argument.
# `abort` writes the message to STDERR and exits with a failure status,
# running at_exit handlers on the way out (which `exit!` skips).
abort 'ERR: Please specify the DataSift stream hash ID to consume' if ARGV.empty?

streamhashid = ARGV.first
 
# Read settings from the config.yml that sits next to this script.
# YAML.load_file opens and closes the file itself, so no handle is left open.
config = YAML.load_file(File.join(File.dirname(__FILE__), 'config.yml'))
 
# Setup Google Fusion Table
# Log in with the configured Google account, then reuse the table named after
# this stream if show_tables lists one; otherwise create it.
puts 'Google Fusion Tables: Creating...'
ft = GData::Client::FusionTables.new
gft_config = config['googlefusiontables']
ft.clientlogin(gft_config['username'], gft_config['password'])
table_name = 'datasift_stream_' + streamhashid

# Column name/type pairs, in column order, expanded into the hashes that
# create_table is given.
cols = [
  %w[screen_name string],
  %w[content string],
  %w[created_at datetime],
  %w[id number],
  %w[location_user location],
  %w[location_tweet location]
].map { |name, type| { :name => name, :type => type } }

tables = ft.show_tables
table = tables.find { |t| t.name == table_name }
table ||= ft.create_table(table_name, cols)
 
# Consume DataSift stream
# Build the DataSift user from the configured credentials and ask it for an
# HTTP-type consumer bound to the stream hash given on the command line.
puts 'DataSift: Authenticating...'
datasift_config = config['datasift']
user = DataSift::User.new(datasift_config['username'], datasift_config['api_key'])
consumer = user.getConsumer(DataSift::StreamConsumer::TYPE_HTTP, streamhashid)

# Callback registered through onStopped: prints the reason it is handed.
consumer.onStopped { |reason| puts 'DataSift: Stopped: ' + reason }
 
# Consume the stream, buffering one row per Twitter interaction and writing
# the buffer to the Fusion Table every `batchrows` rows. Whatever is still
# buffered when consumption ends is written too, so no rows are dropped.
puts 'DataSift: Consuming...'
batchrows = 10
data = []

# Inserts the buffered rows into the table and starts a fresh buffer.
# Does nothing when the buffer is empty.
flush_rows = lambda do
  unless data.empty?
    puts "Google Fusion Tables: Inserting rows"
    table.insert data
    data = []
  end
end

consumer.consume(true) do |interaction|
  # Skip empty payloads and anything whose interaction type is not 'twitter'.
  next unless interaction && interaction['interaction']['type'] == 'twitter'

  tweet = interaction['twitter']
  screen_name = tweet['user']['screen_name']
  content = interaction['interaction']['content']
  created_at = interaction['interaction']['created_at']
  id = tweet['id']
  location_user = tweet['user']['location']

  # 'geo' may be absent; the tweet location is left blank in that case.
  geo = tweet['geo']
  location_tweet = geo ? "#{geo['latitude']} #{geo['longitude']}" : ''

  puts "#{id} >> @#{screen_name} at #{created_at}"
  puts interaction.to_json

  data << {
    "screen_name" => screen_name,
    "created_at" => Time::parse(created_at),
    "id" => id,
    "content" => content,
    "location_user" => location_user,
    "location_tweet" => location_tweet
  }

  # The buffer size is the row count; flush as soon as a full batch is ready
  # (the previous `>` comparison let batches grow to batchrows + 1).
  flush_rows.call if data.size >= batchrows
end

# Write out the final partial batch before finishing.
flush_rows.call
puts 'DataSift: Finished consuming'

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.