Skip to content

Instantly share code, notes, and snippets.

@e0en
Created May 1, 2019 07:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save e0en/3f7a5a45999b5e5f649fddcd3086e31e to your computer and use it in GitHub Desktop.
Save e0en/3f7a5a45999b5e5f649fddcd3086e31e to your computer and use it in GitHub Desktop.
A twitter bot that posts any new announcements from SONY korea online store
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
from pathlib import Path
import time
import traceback
from urllib import request
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import twitter
def fetch_content(article_id):
url = 'https://store.sony.co.kr/handler/SonyStoreNotice-Detail'
data = urlencode({'bbsNo': article_id, 'seqNum': 1}).encode('utf-8')
with request.urlopen(url, data=data) as resp:
soup = BeautifulSoup(resp.read(), 'html.parser')
content = soup.find('div', attrs={'class': 'cont'})
if content is None:
content = soup.find('div', attrs={'class': 'wrapper'})
if content is None:
content = soup.find('div', attrs={'class': 'press_cont'})
return content.text.strip()
def alert(api, article_id, title, date):
url = 'https://store.sony.co.kr/handler/SonyStoreNotice-Start'
content = (fetch_content(article_id))
if len(content) > 30:
content = content[:27] + '...'
msg = f'[{date}] {title}\n{content}\n{url}'
api.PostUpdate(msg)
def check_new_noti(prev_last_id):
url = 'https://store.sony.co.kr/handler/SonyStoreNotice-Start'
with request.urlopen(url) as resp:
soup = BeautifulSoup(resp.read(), 'html.parser')
rows = list(soup.find_all("tr"))[1:]
for r in rows:
cells = r.find_all("td")
last_id = cells[0].text
if last_id == prev_last_id:
return None
if last_id != prev_last_id:
title = cells[1].a.text
date = cells[2].text
return (last_id, title, date)
def write_log(log_path, msg):
ts = time.time()
with open(log_path, 'a') as fp:
fp.write(f'{ts}\t{msg}\n')
if __name__ == '__main__':
pwd = Path(__file__).parents[0].resolve()
with open(pwd.joinpath('twitter-api-key.json')) as fp:
twitter_key = json.loads(fp.read())
api = twitter.Api(**twitter_key)
id_path = pwd.joinpath('last_id.txt')
log_path = pwd.joinpath('sony.log')
write_log(log_path, f'started')
with open(id_path) as fp:
last_id = fp.read().strip()
write_log(log_path, f'loaded last_id = {last_id}')
try:
result = check_new_noti(last_id)
if result:
[last_id, title, date] = result
alert(api, last_id, title, date)
write_log(log_path, f'Tweeted: {last_id}\t{title}\t{date}')
else:
print('No new notifications')
with open(id_path, 'w') as fp:
fp.write(last_id)
write_log(log_path, f'wrote {last_id} to last_id.txt')
except Exception:
write_log(log_path, traceback.format_exc())
{
"consumer_key": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
"consumer_secret": "XXXXXXXXXXXXXXXXXXXXXXXXXXXX",
"access_token_key": "XXXXXXXXXXXXXXXX-XXXXXXXXXXXXXXXX",
"access_token_secret": "XXXXXXXXXXXXXXXXXXXXXXXXXXXX"
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment