Skip to content

Instantly share code, notes, and snippets.

@lnanase
Last active May 16, 2020 04:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lnanase/575871f1cc1b28ed0db86fb8a207eabb to your computer and use it in GitHub Desktop.
imastodonのワードクラウド(トレンドン4i)を作成するプログラム
02,32 * * * * (cd $HOME/scripts/rb && ruby pull_imastodon_timeline.rb > /tmp/pull_imastodon_timeline.log 2>&1) > /dev/null
06 00,06,12,18 * * * (cd $HOME/scripts/py && $HOME/.pyenv/shims/python make_wordcloud.py > /tmp/make_wordcloud.log 2>&1) > /dev/null
10 00,06,12,18 * * * (cd $HOME/scripts/rb && ruby push_imastodon_trendon.rb >> /tmp/push_imastodon_t.log 2>&1) > /dev/null
# -*- coding: utf-8 -*-
# imastodon用にwordcloudを作成する
from collections import Counter
from dotenv import load_dotenv
from PIL import Image
from wordcloud import WordCloud
import csv
import emoji
import memcache
import numpy as np
import os
load_dotenv()
def make_wordcloud():
    """Build a word-cloud image from the crawled word list.

    Reads /tmp/imastodon_words.txt (TSV: account_id<TAB>word per line),
    drops spammy users via spam_check(), strips emoji, counts word
    frequencies, and renders the cloud to /tmp/wordcloud.png.
    """
    # Collect words per user: {'id<account_id>': [word, ...]}
    dic = {}
    # with-block closes the file even on error (original leaked the handle)
    with open('/tmp/imastodon_words.txt', "r") as f:
        for row in csv.reader(f, delimiter="\t"):
            dic.setdefault('id' + row[0], []).append(row[1])
    # Drop users who posted the same word too many times (blacklist check)
    dic = spam_check(dic)
    text = []
    for val in dic.values():
        text.extend(val)
    # Strip emoji before counting
    text2 = remove_emoji("\n".join(text))
    im = np.array(Image.open('imas_mask.png'))  # mask image; mask= is currently disabled below
    words = Counter(text2.split('\n'))
    # Dump raw frequencies for debugging; close the log handle (original leaked it)
    with open('/tmp/imastodon_words_counter.log', 'w') as log:
        print(words, file=log)
    #wordcloud = WordCloud(background_color="white", font_path="/usr/share/fonts/NotoSansCJKjp/NotoSansCJKjp-Regular.otf", width=1024, height=600, mask=im, stopwords="").generate_from_frequencies(words)
    wordcloud = WordCloud(background_color="white", font_path="/usr/share/fonts/NotoSansCJKjp/NotoSansCJKjp-Regular.otf", width=1024, height=600, stopwords="").generate_from_frequencies(words)
    wordcloud.to_file("/tmp/wordcloud.png")
def remove_emoji(src):
    """Return src with every character found in emoji's Unicode emoji table removed."""
    kept = [ch for ch in src if ch not in emoji.UNICODE_EMOJI]
    return ''.join(kept)
def spam_check(dic):
    """Drop users who posted one identical word more than IMASTODON_SPAM_LIMIT times.

    dic maps 'id<account_id>' -> list of words; flagged keys are deleted in
    place and their bare account ids are pushed to memcache via save_spam_id().
    Returns the (possibly pruned) dict.
    """
    threshold = int(os.getenv('IMASTODON_SPAM_LIMIT'))
    flagged = []
    for user_key, posted in dic.items():
        # Only users with more than `threshold` words can possibly exceed it
        if len(posted) <= threshold:
            continue
        top_word, top_count = Counter(posted).most_common(1)[0]
        if top_count > threshold:
            print(user_key)
            print((top_word, top_count))
            flagged.append(user_key)
    # Handle blacklisted users: remove them and persist the id list
    if flagged:
        for user_key in flagged:
            del dic[user_key]
        save_spam_id(','.join(flagged).replace('id', ''))
    return dic
def save_spam_id(key):
    """Store the comma-separated spam account-id string in memcache.

    Written under the 'wordcloud' key with an 18-hour (64800 s) TTL; the
    Ruby crawler reads the same key to build its blacklist.
    """
    client = memcache.Client(['localhost:11211'])
    client.set('wordcloud', key, 64800)
    print("memcache save. " + key)
if __name__ == '__main__':
    make_wordcloud()
    # After rendering, rotate the word file aside so the next crawl starts fresh
    os.rename('/tmp/imastodon_words.txt', '/tmp/imastodon_words.txt.old')
# -*- coding: utf-8 -*-
# imastodonのLTLを取得してMecabで形態素解析を行う
# require natto
#
# usage: pull_imastodon_timeline.rb
require 'nkf'
require 'rubygems'
require 'bundler'
Bundler.require(:default)
Dotenv.load
# CONST
UA = 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.84 Safari/537.36' #Chrome
# Fetch recent statuses from the local timeline (LTL) and return a Hash of
# account id => array of cleaned toot texts.
# agent:: a configured Mechanize instance (Authorization header already set)
def find_latest_timeline(agent)
contents = {}
# Default window: collect the last 1 hour of statuses
yesterday = Time.now - 1 * 60 * 60
max_id = -1
limit_id = nil
# Resume point: status id recorded by the previous run, if any
if File.exist?('/tmp/imastodon_words_id')
limit_id = IO.read('/tmp/imastodon_words_id').strip
end
puts "limit: #{limit_id}"
# Blacklisted account ids from memcache ('wordcloud' key, written by make_wordcloud.py)
client = Dalli::Client.new('localhost:11211')
value = client.get('wordcloud')
puts "blacklist ids: #{value}"
black_ids = value.nil? ? [] : value.split(/,/)
# Query the public local timeline
get_uri = ENV['MASTODON_HOST'] + 'api/v1/timelines/public'
get_params = {'local': true, 'limit': 40}
# The API returns at most 40 statuses per call, so page backwards with max_id
until max_id.nil? do
res = agent.get(get_uri, get_params)
JSON.parse(res.body).each {|status|
# Remember the newest status id (first status of the first page) for the next run
if max_id == -1
open('/tmp/imastodon_words_id.bak', 'w') {|f| f.puts(status['id']) }
end
created_at = Time.parse(status['created_at'])
created_at.localtime("+09:00")
p status['id']
#p status
# Skip bot accounts and blacklisted accounts
if status['account']['bot'] == false && black_ids.include?(status['account']['id']) == false
content = (status['spoiler_text'].empty? ? '' : status['spoiler_text'] << ' ') << status['content']
# Strip links
content = content.gsub(/<a href="https:\/\/imastodon\.net\/tags\/(.*)" (.*)>#(.*)<\/a>/) { $3 } # keep hashtag text
content = content.gsub(/<a href=".*".*>(.*)<\/a>/, "")
# Strip remaining HTML tags
content = Nokogiri::HTML(content).xpath("//text()").to_s
content = content.gsub(/(&amp;|&lt;|&gt;|&quot;|&copy;)/, "")
# Normalize full-width alphanumerics and half-width kana to standard forms
content = NKF.nkf('-m0XZ1 -W -w', content)
unless contents.has_key?(status['account']['id'])
contents[status['account']['id']] = []
end
contents[status['account']['id']] << content
end
max_id = status['id']
get_params['max_id'] = max_id
# Stop when we reach the previous run's id, or (first run) a status older than 1h
if (limit_id.nil? && yesterday > created_at) || (!limit_id.nil? && limit_id >= max_id)
max_id = nil
break
end
}
end
contents
end
# Run MeCab morphological analysis over each user's toots.
# contents:: Hash of account id => array of toot strings
# returns::  Hash of account id => array of kept surface forms (nouns/adjectives
#            minus the excluded sub-categories and literal tokens)
def parse_in_mecab(contents)
  # Aggregation rules: keep these parts of speech...
  target = ['名詞', '形容詞']
  # ...but drop these sub-categories and these literal tokens
  exclude = ['非自立', '接尾', '代名詞', '数']
  exclude_phrase = ['/', '.', ',', ':', '-', '_', '#', '?', '!', 'http', 'https', '://', '(', ')', '₍', '₎', '()','[', ']','ー','一', '~', 'いい' ,'ない']
  mecab = Natto::MeCab.new
  contents.each_with_object({}) do |(account_id, toots), words|
    words[account_id] = []
    # Join the user's toots into one string and analyze in a single pass
    mecab.enum_parse(toots.join(' ')).each do |node|
      next unless target.any? { |pos| node.feature.start_with?(pos) }
      # Exclusion conditions
      next if exclude.any? { |sub| node.feature.include?(sub) }
      next if exclude_phrase.include?(node.surface)
      words[account_id] << node.surface
    end
  end
end
# imastodon: crawl the local timeline and append parsed words to the TSV
agent = Mechanize.new
agent.user_agent = UA
# Set every header in ONE assignment: `request_headers=` replaces the whole
# hash, so the original's second assignment (Authorization) silently dropped
# the accept-language / accept-encoding headers set just before it.
agent.request_headers = {
  'accept-language' => 'ja, ja-JP',
  'accept-encoding' => 'utf-8',
  'Authorization' => 'Bearer ' + ENV['TRENDON_BEARER']
}
contents = find_latest_timeline(agent)
result = parse_in_mecab(contents)
# Output as TSV (append): one "account_id<TAB>word" row per kept word
open('/tmp/imastodon_words.txt', 'a') {|f| result.each { |k, v| v.each { |s| f.puts(k + "\t" + s) } unless v.empty? } }
# Promote the .bak status-id file written during the crawl.
# File.exists? is deprecated (removed in Ruby 3.2) — use File.exist?.
if File.exist?("/tmp/imastodon_words_id")
  File.delete("/tmp/imastodon_words_id")
end
File.rename("/tmp/imastodon_words_id.bak", "/tmp/imastodon_words_id")
# -*- coding: utf-8 -*-
# Post the generated word-cloud image to imastodon.
#
# usage: push_imastodon_trendon.rb
require 'rubygems'
require 'bundler'
Bundler.require(:default)
Dotenv.load
# CONST
UA = 'Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36' #Chrome
IMAGE_PATH = '/tmp/wordcloud.png'
# mechanize
agent = Mechanize.new
agent.user_agent = UA
# Set every header in ONE assignment: `request_headers=` replaces the whole
# hash, so the original's second assignment (Authorization) silently dropped
# the accept-language / accept-encoding headers set just before it.
agent.request_headers = {
  'accept-language' => 'ja, ja-JP',
  'accept-encoding' => 'utf-8',
  'Authorization' => 'Bearer ' + ENV['TRENDON_BEARER']
}
# 1) upload the image; the block form closes the file handle after the
#    request (the original leaked it)
image_id = File.open(IMAGE_PATH) do |image|
  res = agent.post(ENV['MASTODON_HOST'] + 'api/v1/media', {
    'file': image
  })
  JSON.parse(res.body)['id']
end
p image_id
# 2) toot referencing the uploaded media id
res = agent.post(ENV['MASTODON_HOST'] + 'api/v1/statuses', {
  'status': '直近6時間のワードクラウドです(β版) #トレンドン4i',
  'media_ids[]': [image_id],
  'visibility': 'public'
})
p res.code
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment