Skip to content

Instantly share code, notes, and snippets.

@peko
Last active April 17, 2016 14:37
Show Gist options
  • Save peko/605238bd08f6d39304fc9d62b26be73f to your computer and use it in GitHub Desktop.
Save peko/605238bd08f6d39304fc9d62b26be73f to your computer and use it in GitHub Desktop.
Hashtag aggregator for Instagram, VK & Twitter
pg = require "pg"
hag = require "./hashtag-aggregator"
# Connection string for the PostgreSQL server (placeholder value).
connectionString = "postgres://"
client = new pg.Client(connectionString)

# Wait for the connection attempt to finish; give up when it fails.
await client.connect defer connectError
if connectError
    console.error 'could not connect to postgres', connectError
    process.exit 1
console.log 'Connected to postgres'

# Settings shared by every aggregator created below (placeholder values).
hashtag    = ""
cache_url  = ""
cache_path = ""

# Each aggregator is handed the same database client plus its own options.
insta = new hag.Instagram(client, {
    name         : "insta_test"
    hashtag      : hashtag
    cache_url    : cache_url
    cache_path   : cache_path
    instagram_cid: ""
    instagram_sec: ""
})

vk = new hag.Vkontakte(client, {
    name      : "vk_test"
    hashtag   : hashtag
    cache_url : cache_url
    cache_path: cache_path
})

twitter = new hag.Twitter(client, {
    name      : "twitter_test"
    hashtag   : hashtag
    cache_url : cache_url
    cache_path: cache_path
    oauth     : {
        consumer_key       : ''
        consumer_secret    : ''
        access_token_key   : ''
        access_token_secret: ''
    }
})
mkdirp = require "mkdirp"
colors = require "colors"
query = require "querystring"
request = require "request"
md5 = require "md5"
fs = require "fs"
https = require "https"
http = require "http"
# SQL fragment: the epoch as a timestamp with time zone plus a one-second
# interval. storePost appends " * $7" to it so the interval is multiplied by
# the post's utime parameter.
TS = "TIMESTAMP WITH TIME ZONE 'epoch' + INTERVAL '1 second'"
# Base class for the hashtag aggregators.
#
# On a timer it calls `grab` to fetch a raw response, parses it as JSON, lets
# `extract` push normalised posts onto @new_posts, then stores every post not
# seen before in the table named by cfg.name and downloads its image into the
# cache directory. Subclasses supply `grab` and `extract`.
class HashtagAggregator

    # Number of posts stored so far; updated at the end of every iteration.
    totalPostCounter: 0

    # pc  - database client; only its `query` method is used here.
    # cfg - options object; missing entries are filled in with defaults
    #       (the object passed by the caller is modified in place).
    constructor: (@pc, @cfg)->
        @cfg.name       ?= "dummy"
        @cfg.interval   ?= 5000
        @cfg.cache_path ?= "./cache"
        @cfg.cache_mem  ?= 1000
        @cfg.cache_url  ?= "http://localhost:8000"
        mkdirp.sync "#{@cfg.cache_path}/#{@cfg.name}"

        # Per-instance buffer filled by `extract`. It must not live on the
        # prototype: a shared array would let posts left behind by one
        # aggregator be stored by another one under the wrong table.
        @new_posts = []
        @posts = {}   # pid -> post, for posts already handled
        @queue = []   # pids in insertion order, used to evict from @posts

        do @createScheme
        @iid = setInterval @iterate, @cfg.interval

    # Creates the table for this aggregator when it does not exist yet.
    # Exits the process when the query fails.
    createScheme: =>
        # To start from scratch: DROP TABLE IF EXISTS <name> CASCADE;
        qry =
            """
            CREATE TABLE IF NOT EXISTS #{@cfg.name} (
            id SERIAL,
            pid character varying,
            owner character varying,
            text character varying,
            cache character varying,
            signed character varying,
            image character varying,
            hashtag character varying,
            utime timestamp(6) without time zone,
            moderated boolean DEFAULT false,
            posted boolean DEFAULT false,
            CONSTRAINT #{@cfg.name}_pid UNIQUE(pid)
            );
            """
        await @pc.query qry, defer(err, data)
        if err
            console.log err
            process.exit 1

    # Logs a post and inserts it into this aggregator's table.
    # Insert failures are logged, not raised.
    storePost: (post)=>
        console.error "--- #{post.pid} ---".blue
        console.log "owner: ", post.owner
        # A post may come without text; do not let the preview crash the run.
        console.log "text: ", (post.text ? "")[..140]
        console.log "image: ", post.image
        console.log "cache: ", post.cache
        console.log "hashtag: ", post.hashtag
        console.log "utime: ", post.utime
        qry = """
            INSERT INTO #{@cfg.name}
            ( pid, owner, text, image, cache, hashtag, utime )
            VALUES ( $1, $2, $3, $4, $5, $6, #{TS} * $7 );
            """
        params = [
            post.pid
            post.owner
            post.text
            post.image
            post.cache
            post.hashtag
            post.utime
        ]
        @pc.query qry, params, (err, data)-> console.error err if err

    # Stops the periodic iteration started by the constructor.
    stop: =>
        clearInterval @iid

    # One polling cycle: grab, parse, extract, then store whatever is new.
    # A response that cannot be parsed or extracted is logged and skipped.
    iterate: =>
        await @grab defer data
        try
            @extract JSON.parse data
        catch err
            return console.error err
        @newPostCounter = 0
        console.error "=== #{@cfg.name} ===".green
        @processNewPosts()
        console.error "=== #{@cfg.name} #{@totalPostCounter}+#{@newPostCounter} ===".green
        @totalPostCounter += @newPostCounter

    # Streams `url` into the cache file `cache` of this aggregator.
    # Failures are logged; without the handlers below a refused connection or
    # an unwritable file would raise an unhandled 'error' event.
    download: (url, cache)=>
        target = "#{@cfg.cache_path}/#{@cfg.name}/#{cache}"
        file = fs.createWriteStream target
        file.on 'error', (err)->
            console.error "cache write failed for #{target}", err
        transport = if url.indexOf("https://") is 0 then https else http
        req = transport.get url, (res)-> res.pipe file
        req.on 'error', (err)->
            console.error "download failed for #{url}", err
            file.end()

    # Drains @new_posts: every post whose pid is not remembered yet gets its
    # image cached, is stored, and is remembered. At most cfg.cache_mem pids
    # are kept; the oldest one is forgotten first.
    processNewPosts: =>
        while post = @new_posts.shift()
            # A plain conditional: the former `until` loop only terminated
            # because its body registered the pid, and never did so when
            # cache_mem was 0.
            continue if post.pid of @posts

            if post.image?
                # Drop the query string so the same picture hashes identically.
                qpos = post.image.indexOf "?"
                post.image = post.image[...qpos] if qpos > 0
                cache = "#{md5 post.image}.jpg"
                @download post.image, cache
                post.cache = "#{@cfg.cache_url}/#{@cfg.name}/#{cache}"
            else
                post.cache = null

            post.hashtag = @cfg.hashtag
            @storePost post
            @newPostCounter++

            @posts[post.pid] = post
            @queue.push post.pid
            delete @posts[@queue.shift()] if @queue.length > @cfg.cache_mem
        console.error "queue: #{@queue.length}"

    # INTERFACE

    # Fetches the raw response and passes it to cb. To be overridden.
    grab: (cb)=> console.error "GRAB NOT IMPLEMENTED".red

    # Pushes normalised posts from the parsed response onto @new_posts.
    # To be overridden.
    extract: (json)=> console.error "extract NOT IMPLEMENTED".red
###
INSTAGRAM IMPLEMENTATION
###
# Aggregator that reads recent media for a tag from the Instagram API.
# Uses cfg.hashtag, cfg.instagram_cid and cfg.instagram_sec.
class Instagram extends HashtagAggregator

    # Requests the recent media for the hashtag and hands the raw response
    # body to cb. cb is always called, even when the request failed, so the
    # caller is never left waiting.
    grab: (cb)=>
        params =
            client_id    : @cfg.instagram_cid
            client_secret: @cfg.instagram_sec
            min_tag_id   : 0
        # The tag is part of the path, so it has to be escaped.
        tag = encodeURIComponent(@cfg.hashtag)
        url = "https://api.instagram.com/v1/tags/#{tag}/media/recent?#{query.stringify params}"
        await request url, defer(err, res, body)
        # Report the real cause instead of leaving only a JSON parse error.
        console.error "#{@cfg.name}: request failed", err if err
        cb body

    # Converts every entry of json.data into a post and queues it.
    extract: (json)=>
        for post in json.data
            p =
                pid  : post.id
                # Media without a caption would otherwise throw here.
                text : post.caption?.text ? ""
                owner: post.user.username
                image: post.images.standard_resolution.url
                utime: post.created_time
            @new_posts.push p
###
VKONTAKTE IMPLEMENTATION
###
# Aggregator that searches the VK news feed for "#<cfg.hashtag>".
class Vkontakte extends HashtagAggregator

    # Runs the newsfeed.search request and hands the raw response body to cb.
    # cb is always called, even when the request failed.
    grab: (cb)=>
        params =
            v    : "5.50"
            q    : "##{@cfg.hashtag}"
            count: 200
        url = "https://api.vk.com/method/newsfeed.search?#{query.stringify(params)}"
        await request url, defer(err, res, body)
        # Report the real cause instead of leaving only a JSON parse error.
        console.error "#{@cfg.name}: request failed", err if err
        cb body

    # Converts every entry of json.response.items into a post and queues it.
    # The image is taken from the first photo attachment, largest size first;
    # it is null when the post has no photo.
    extract: (json)=>
        for post in json.response.items
            image = null
            attachments = post.attachments or []
            for attach in attachments when attach.type is "photo"
                photo = attach.photo
                image = photo.photo_1280 ? photo.photo_807 ? photo.photo_604 ? photo.photo_130 ? null
                break
            p =
                pid  : "#{post.owner_id}_#{post.id}"
                text : post.text
                owner: post.owner_id
                image: image
                utime: post.date
            @new_posts.push p
###
TWITTER IMPLEMENTATION
###
# Aggregator that searches tweets for "#<cfg.hashtag>".
# cfg.oauth is passed to the request as its OAuth credentials.
class Twitter extends HashtagAggregator

    # Runs the search request and hands the raw response body to cb.
    # cb is always called, even when the request failed.
    grab: (cb)=>
        params =
            q: "##{@cfg.hashtag}"
        url = "https://api.twitter.com/1.1/search/tweets.json?#{query.stringify(params)}"
        await request {url: url, oauth: @cfg.oauth}, defer(err, res, body)
        # Report the real cause instead of leaving only a JSON parse error.
        console.error "#{@cfg.name}: request failed", err if err
        cb body

    # Converts every entry of json.statuses into a post and queues it.
    # The image is the first media entity of type "photo", or null.
    extract: (json)=>
        for post in json.statuses
            image = null
            media = post.entities?.media ? []
            for m in media when m.type is "photo"
                image = m.media_url_https ? null
                break
            # Date.parse yields milliseconds; the table is fed whole seconds.
            seconds = Math.floor(Date.parse(post.created_at) / 1000)
            p =
                pid  : post.id_str
                text : post.text
                owner: post.user.screen_name
                image: image
                utime: "#{seconds}"
            @new_posts.push p
# Public interface of the module: the three concrete aggregators.
module.exports = {Instagram, Vkontakte, Twitter}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment