Skip to content

Instantly share code, notes, and snippets.

@Canorus
Created October 11, 2021 07:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Canorus/85394ec447c65c2d015da71d542c4caf to your computer and use it in GitHub Desktop.
Save Canorus/85394ec447c65c2d015da71d542c4caf to your computer and use it in GitHub Desktop.
Mastodon 인스턴스에서 favourite 한 툿들 중에 키워드를 포함하고 있는 툿을 검색해서 dm으로 보내주는 스크립트
import html
import json
import os
import re
import sys
from time import sleep

import requests
# Directory that contains this script. Joining with '' leaves a trailing path
# separator, so other code can build paths with plain string concatenation.
base = os.path.join(os.path.dirname(os.path.abspath(__file__)), '')

# config.txt is a JSON object with the keys "instance" and "access_key".
# Read it as UTF-8 explicitly so the result does not depend on the locale.
with open(os.path.join(base, 'config.txt'), encoding='utf-8') as f:
    c = json.load(f)

# Base URL of the instance. Request paths are appended as '/api/v1/...', so a
# trailing slash in the config would otherwise produce '//api/...'.
instance = c['instance'].rstrip('/')
# Access token sent as "Authorization: Bearer <key>".
key = c['access_key']
import logging

# Module logger. The level is INFO, so the logger.debug() calls elsewhere in
# this file are dropped unless the level is lowered here.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
formatter = logging.Formatter('[%(asctime)s][%(levelname)s|%(filename)s:%(lineno)s] >> %(message)s')

# Echo every record to the console (stderr, the StreamHandler default).
streamHandler = logging.StreamHandler()
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)

# Append the same records to bot.log next to the script. Logged toots can
# contain any Unicode text, so write the file as UTF-8 instead of relying on
# the platform's default encoding.
fileHandler = logging.FileHandler(os.path.join(base, 'bot.log'), mode='a', encoding='utf-8')
fileHandler.setFormatter(formatter)
logger.addHandler(fileHandler)
def get_id(s):  # s can be blank
    """Return the first ``max_id=<digits>`` or ``min_id=<digits>`` token in *s*.

    The previous pattern (``m.*?_id=``) started matching at the first "m"
    anywhere in the string, so a URL such as ``...?limit=20&max_id=5`` yielded
    ``mit=20&max_id=5``. The pattern now names the two cursors explicitly.

    Args:
        s: Text to scan, e.g. the value of an HTTP ``Link`` header. May be
            empty or ``None``.

    Returns:
        The matched token including its ``max_id=`` / ``min_id=`` prefix, or
        ``None`` when *s* is empty or contains no such token.
    """
    if not s:
        return None
    match = re.search(r'(?:max|min)_id=\d*', s)
    return match.group() if match else None
def get_max_id(s):
    """Return the ``max_id=<digits>`` token found in *s*, if any.

    The token keeps its ``max_id=`` prefix so it can be appended to a URL as
    a ready-made query string.

    Args:
        s: Text to scan, typically the value of an HTTP ``Link`` header.

    Returns:
        The first ``max_id=...`` token, or ``None`` when *s* is not a string
        or holds no such token. This function never raises.
    """
    # Explicit checks replace the former bare ``except:``, which also hid
    # unrelated errors such as KeyboardInterrupt.
    if not isinstance(s, str):
        return None
    match = re.search(r'max_id=\d*', s)
    return match.group() if match else None
def parser(r):
    """Split an API response into its JSON payload and next-page cursor.

    Args:
        r: A response object exposing ``content`` (bytes holding a JSON
            document) and ``headers`` (a mapping), such as the object
            returned by ``requests.get``.

    Returns:
        A ``(content, max_id)`` tuple. ``content`` is the decoded JSON body.
        ``max_id`` is the ``max_id=...`` token taken from the ``Link`` header,
        or ``None`` when the header is missing, empty, or has no such token.

    Raises:
        ValueError: If the body is not valid JSON or not valid UTF-8.
    """
    content = json.loads(r.content.decode())  # a list for the favourites endpoint
    # A missing header is looked up explicitly rather than hidden behind a
    # bare ``except:``, so genuine errors are no longer swallowed.
    link = r.headers.get('link')
    max_id = get_max_id(link) if link else None
    return content, max_id
def strip(t):
    """Convert a status' HTML ``content`` into plain text.

    Paragraph boundaries and ``<br>`` tags become newlines, ``<p>``, ``<a>``
    and ``<span>`` tags are removed, and HTML entities are decoded.

    Changes from the previous version:
      * ``&quot;`` decodes to ``"`` (it used to become ``'``).
      * All entities are decoded via ``html.unescape``, including ``&amp;``
        and numeric forms such as ``&#39;``.
      * Entities are decoded last, so an escaped ``&lt;br&gt;`` typed by the
        author stays literal text instead of turning into a newline.

    Args:
        t: HTML fragment as a string.

    Returns:
        The plain-text rendering of *t*.
    """
    t = re.sub(r'</p><p>', '\n', t)
    t = re.sub(r'<br.*?/?>', '\n', t)
    t = re.sub(r'(<.?p>|<.?a.*?>|<.?span.*?>)', '', t)
    # Decode entities only after every real tag is gone; see docstring.
    return html.unescape(t)
def send_query(*max_id):
    """Fetch one page of the authenticated user's favourites.

    Args:
        *max_id: Optionally, a single ready-made query string such as
            ``'max_id=123'``. It is appended after ``?`` to request the page
            that follows that cursor. Omitted, ``None`` or empty means the
            first page.

    Returns:
        The ``requests.Response`` of ``GET <instance>/api/v1/favourites``.

    Raises:
        requests.RequestException: On connection problems or when the server
            does not answer within the timeout.
    """
    url = instance + '/api/v1/favourites'
    if max_id and max_id[0]:
        url += '?' + max_id[0]
    # requests waits indefinitely by default; bound the wait so an
    # unresponsive server cannot hang the script.
    return requests.get(url, headers={'Authorization': 'Bearer ' + key}, timeout=30)
def search(kw, l):
    """Send a DM for every status in *l* that mentions one of the keywords.

    Changes from the previous version:
      * Keywords are split on any run of whitespace. ``kw.split(' ')``
        produced empty strings for blank or double-spaced input, and the
        empty string is "in" every text, so every status matched.
      * Keywords are matched against the plain text from ``strip`` rather
        than the raw HTML, so markup words such as "span" or "class" no
        longer trigger a match.
      * A status whose ``url`` is null falls back to its ``uri`` instead of
        crashing the string concatenation.

    Args:
        kw: Whitespace-separated keywords. A status matches when it contains
            at least one of them (case-sensitive substring test).
        l: Iterable of status dicts, each with a ``content`` (HTML) and a
            ``url`` entry.

    Returns:
        The list of matching status dicts, in input order.
    """
    res = []
    keywords = kw.split()
    if not keywords:
        return res
    for status in l:
        text = strip(status['content'])
        if not any(word in text for word in keywords):
            continue
        logger.debug(str(status))
        logger.debug('')
        res.append(status)
        logger.debug(text)
        url = status['url'] or status.get('uri', '')
        logger.debug(url)
        logger.info('sending dm:\n%s\n\n%s', text, url)
        send_dm(text + '\n\n' + url)
    return res
def send_dm(m):
    """Post *m* as a status with ``direct`` visibility.

    Failures are logged as warnings rather than raised, so one rejected
    message does not abort the remaining ones.

    Args:
        m: Message text; converted with ``str()`` before sending.

    Raises:
        requests.RequestException: On connection problems or when the server
            does not answer within the timeout.
    """
    response = requests.post(
        instance + '/api/v1/statuses',
        data={'status': str(m), 'visibility': 'direct'},
        headers={'Authorization': 'Bearer ' + key},
        timeout=30,  # requests never times out unless told to
    )
    if not response.ok:
        # Previously a rejected post went unnoticed.
        logger.warning('failed to send dm: HTTP %s %s', response.status_code, response.text)
def main():
    """Search every page of favourites for the keywords given on the command line.

    The previous version downloaded each page but never passed it to
    ``search``, so no DM was ever sent. Each page is now searched as soon as
    it arrives.

    Command line:
        One or more keywords. Several arguments are joined with spaces, so
        ``script.py "foo bar"`` and ``script.py foo bar`` are equivalent.

    Returns:
        ``0`` on success, ``1`` when no keyword was supplied.
    """
    if len(sys.argv) < 2:
        logger.error('usage: %s KEYWORD [KEYWORD ...]', sys.argv[0])
        return 1
    kw = ' '.join(sys.argv[1:])

    r = send_query()
    content, max_id = parser(r)
    res = search(kw, content)
    while max_id is not None:
        # Pause between page requests. Sleeping before each request avoids a
        # pointless wait after the last page.
        sleep(10)
        r = send_query(max_id)
        content, max_id = parser(r)
        res.extend(search(kw, content))
    logger.info('done: %d matching status(es)', len(res))
    return 0


if __name__ == '__main__':
    # Propagate main()'s return value as the process exit status.
    sys.exit(main())
{"instance":"INSTANCE HERE", "access_key":"ACCESS_KEY HERE"}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment