#!/usr/bin/env python3
#
# TootsExporter.py
# - Another Mastodon Toots Exporter in Python 3.
#
# Dependency:
# - pip(3) install "requests[socks]"
#
# License: MIT
|
|
|
import csv
import html
import json
import os
import re
import shutil
from os.path import isfile
from time import sleep
from urllib.parse import urlsplit

import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
|
|
|
# Proxy mapping used for HTTP requests: it is assigned to the shared session
# and also passed explicitly to a few calls. Leave it empty to connect
# directly, or uncomment the entries below to go through a local SOCKS5 proxy.
proxies = {
    # 'http': 'socks5://127.0.0.1:1080',
    # 'https': 'socks5://127.0.0.1:1080',
}  # For the Great Firewall
|
|
|
|
|
def atomic_write(data, filename):
    """Serialize *data* as JSON into *filename* without leaving it half-written.

    The JSON is first written to ``filename + '.tmp'``, flushed and fsynced,
    and only then moved over *filename*, so an existing file is either fully
    replaced or left untouched.

    Args:
        data: Any JSON-serializable object.
        filename: Path of the destination file.

    Raises:
        TypeError: If *data* is not JSON-serializable.
        OSError: If the temporary file cannot be written or moved.
    """
    temp_file = filename + '.tmp'
    try:
        with open(temp_file, 'w', encoding='utf-8') as f:
            json.dump(data, f)
            f.flush()
            os.fsync(f.fileno())
        # os.replace overwrites an existing destination on every platform,
        # whereas os.rename raises on Windows when the target already exists.
        os.replace(temp_file, filename)
    except BaseException:
        # Do not leave a partial temporary file behind on failure.
        try:
            os.remove(temp_file)
        except OSError:
            pass
        raise
|
|
|
|
|
# Retry policy shared by every adapter mounted on the session: retry on
# connection/read errors and on the listed 5xx status codes, with a short
# exponential backoff between attempts.
retries = Retry(total=6, backoff_factor=0.1, read=3, connect=3,
                status_forcelist=[500, 502, 503, 504])


class _TimeoutSession(requests.Session):
    """Session that applies ``self.timeout`` to requests lacking a timeout.

    A plain ``requests.Session`` ignores a ``timeout`` attribute, so calls
    made without an explicit ``timeout=`` could otherwise block forever.
    """

    timeout = None

    def request(self, method, url, **kwargs):
        # An explicit ``timeout=`` passed by the caller still wins.
        kwargs.setdefault('timeout', self.timeout)
        return super().request(method, url, **kwargs)


rq = _TimeoutSession()
rq.mount('https://', HTTPAdapter(max_retries=retries))
rq.proxies = proxies
rq.timeout = 3
|
|
|
# 0. Load local data
#
# Resume from a previous export when `Toots.json` exists: `since_id` is the
# id of the last stored toot, so only newer toots are requested later on.

if isfile('Toots.json'):
    print('Loading Toots.json...')
    with open('Toots.json', encoding='utf-8') as toots_file:
        toots_data = json.load(toots_file)
    # An empty archive has no last toot; start from the very beginning.
    since_id = str(toots_data[-1]['id']) if toots_data else '0'
    print(str(len(toots_data)) + ' toots have been loaded.')
else:
    toots_data = []
    since_id = '0'
|
|
|
# 1. Authentication
#
# Reuse the credentials stored in `Auth.json` when present and check that the
# saved access token is still accepted; otherwise start with empty
# credentials so that the interactive login below runs.

if isfile('Auth.json'):
    print('Loading Auth.json...')
    with open('Auth.json', encoding='utf-8') as auth_file:
        auth = json.load(auth_file)
    client_id = auth['client_id']
    client_secret = auth['client_secret']
    access_token = auth['access_token']
    instance = auth['instance']
    headers = auth['headers']
    print('Checking access token...')
    test = rq.get(instance + '/api/v1/accounts/verify_credentials',
                  proxies=proxies, headers=headers, timeout=3).json()
    if test.get('error', False):
        print('Your access token has been outdated.')
        login = False
    else:
        print('Your access token is still valid!')
        login = True
        # Only a verified token is installed on the shared session.
        rq.mount(instance, HTTPAdapter(max_retries=retries))
        rq.headers.update(headers)
else:
    auth = {}
    client_id = ''
    client_secret = ''
    access_token = ''
    instance = ''
    headers = {}
    login = False
|
|
|
# Interactive login, run only when no valid access token is available.
# Steps already completed in an earlier run (instance chosen, app registered)
# are skipped because their values were loaded from `Auth.json`.
if not login:
    if not instance:
        # 1.1 Connect to instance
        print('Please enter the link to your Mastodon instance.')
        print('e.g. https://pawoo.net , then press Enter to continue.')
        # Keep only the host part of whatever was typed and force HTTPS.
        host = input('Link: ').replace('http://', '') \
            .replace('https://', '').split('/')[0].strip()
        instance = 'https://' + host
        print('Connecting to ' + instance + ' ...')
        try:
            test = rq.get(instance + '/api/v1/instance').json()
        except (requests.RequestException, ValueError):
            # Unreachable host or a non-JSON answer: report it as a failure
            # below instead of crashing with a traceback.
            test = {}
        if test.get('title', False):
            print('Success.')
        else:
            print('Failed, exiting...')
            raise SystemExit(1)
        rq.mount(instance, HTTPAdapter(max_retries=retries))

    if not client_id:
        # 1.2 Apply for a new app
        print('Applying for a new app...')
        payload = {'client_name': 'TootsExporter',
                   'redirect_uris': 'urn:ietf:wg:oauth:2.0:oob',
                   'scopes': 'read'}
        test = rq.post(instance + '/api/v1/apps', data=payload).json()
        # The API reports failures under the lowercase key `error`.
        if test.get('error', False) or not test.get('client_id'):
            print('Failed, exiting...')
            raise SystemExit(1)
        print('Success.')
        client_id = test['client_id']
        client_secret = test['client_secret']

    # 1.3 Login with Authentication Code
    print('Now please open the link below in browser to authorize this app:')
    oauth_uri = instance + '/oauth/authorize' + \
        '?scope=read&response_type=code' + \
        '&redirect_uri=urn:ietf:wg:oauth:2.0:oob' + \
        '&client_id=' + client_id
    print(oauth_uri)
    print('After authentication, please copy the code in the web page')
    print('and paste below, then press Enter to continue.')
    auth_code = input('Authorization Code: ').strip()
    params = {'client_id': client_id,
              'client_secret': client_secret,
              'grant_type': 'authorization_code',
              'code': auth_code,
              'redirect_uri': 'urn:ietf:wg:oauth:2.0:oob'}
    # Send the secret and the code in the request body, not in the URL,
    # so they do not end up in proxy or server access logs.
    test = rq.post(instance + '/oauth/token', data=params).json()
    if test.get('access_token', False):
        print('Login successfully.')
    else:
        print('Failed, exiting...')
        raise SystemExit(1)

    access_token = test['access_token']
    headers = {'Authorization': 'Bearer ' + access_token}
    rq.headers.update(headers)

    # Persist everything needed to skip this whole section next time.
    auth = {'client_id': client_id,
            'client_secret': client_secret,
            'instance': instance,
            'access_token': access_token,
            'headers': headers}
    atomic_write(auth, 'Auth.json')
    print('Auth info has been saved to `Auth.json`.')
|
|
|
# 2. Get the target user
#
# The chosen username is remembered in `User.json`; an empty username means
# "export the authenticated account".

if isfile('User.json'):
    print('Loading User.json...')
    with open('User.json', encoding='utf-8') as user_file:
        user_data = json.load(user_file)
    username = user_data['username']
else:
    print('\nNow please enter the username of the account you want to export,')
    print('e.g. @pixiv, then press Enter to continue.')
    print('If you want to export your toots, just press Enter.')
    print('If you want to export someone else, the amount of toots you can')
    print('export is always smaller than the total count.')
    # Drop only the leading '@' so that a `user@domain` handle keeps the
    # separator between the user and the domain part.
    username = input('Username: ').strip().lstrip('@').strip()
    user_data = {'username': username}
    atomic_write(user_data, 'User.json')
    print('User info has been saved to `User.json`.')
|
|
|
# Resolve the numeric account id (`userid`) and show a short summary of the
# account that is about to be exported.
if username == '':
    # No username given: export the authenticated account itself.
    test = rq.get(instance + '/api/v1/accounts/verify_credentials').json()
    userid = str(test['id'])
else:
    # First try to read the id from the salmon link on the profile page.
    match = re.search(r'/api/salmon/(\d+)',
                      rq.get(instance + '/@' + username).text)
    if match:
        userid = match.group(1)
    else:
        # Profile pages without that link: ask the account lookup endpoint.
        lookup = rq.get(instance + '/api/v1/accounts/lookup',
                        params={'acct': username}).json()
        if not isinstance(lookup, dict) or not lookup.get('id'):
            print('Failed to find the user @' + username + ', exiting...')
            raise SystemExit(1)
        userid = str(lookup['id'])
    test = rq.get(instance + '/api/v1/accounts/' + userid).json()

print('Username: ' + test['username'])
print('ID: ' + str(test['id']))
print('Created at: ' + test['created_at'])
print('Toots:' + str(test['statuses_count']))
|
|
|
# 3. Export all toots
#
# Page through the account's statuses, 40 at a time, restricted to ids above
# `since_id`. Each following page is requested with `max_id` set to the id of
# the last toot of the previous page, until an empty page comes back.

print('\nExporting toots, please wait...')

statuses_url = instance + '/api/v1/accounts/' + userid + '/statuses'
query = {'limit': 40, 'since_id': since_id}
toots = []

while True:
    test = rq.get(statuses_url, params=query).json()
    if not isinstance(test, list):
        # An error object came back instead of a page of toots. Stop before
        # anything is saved: storing a partial batch would advance
        # `since_id` past toots that were never fetched.
        reason = test.get('error', test) if isinstance(test, dict) else test
        print('Failed to fetch toots: ' + str(reason))
        raise SystemExit(1)
    if not test:
        break
    for i in test:
        # The account is the same for every toot; do not store it each time.
        i.pop('account', None)
        toots.append(i)
    print(str(len(toots)) + ' toots have been exported.')
    max_id = str(test[-1]['id'])
    query['max_id'] = max_id
    sleep(1)  # be gentle with the server between pages

# Append the new toots in reverse fetch order.
toots_data.extend(reversed(toots))
print('Total: ' + str(len(toots_data)))
|
|
|
# 4. Save to local files
#
# `Toots.json` keeps the complete toot objects; `Toots.csv` is a flattened,
# plain-text view with one row per toot.

atomic_write(toots_data, 'Toots.json')


def _toot_text(content):
    """Return the HTML *content* of a toot as plain text."""
    # Turn paragraph ends and line breaks into newlines before dropping tags.
    text = content.replace('</p>', '\n').replace('<br', '\n<br')
    text = re.sub(r'<.*?>', '', text)
    # Decode entities such as `&amp;` only after the tags are gone, so that
    # escaped angle brackets in the text are not mistaken for markup.
    return html.unescape(text).strip()


# `newline=''` lets the csv module control line endings (required for fields
# containing newlines); an explicit UTF-8 encoding keeps non-ASCII text
# intact regardless of the system locale.
with open('Toots.csv', 'w', encoding='utf-8', errors='ignore',
          newline='') as toots_csv:
    csv_writer = csv.writer(toots_csv)
    csv_writer.writerow(['id', 'url', 'content', 'visibility', 'date', 'media'])
    for i in toots_data:
        csv_writer.writerow([
            i['id'],
            i['url'],
            _toot_text(i['content']),
            i['visibility'],
            i['created_at'],
            '\n'.join(j['url'] for j in i['media_attachments']),
        ])

print('Success. All toots have been exported to `Toots.json` and `Toots.csv`.')
|
|
|
# Download the media attachments of the toots fetched in this run.
# `all_media.json` lists every media URL seen so far; the position of a URL
# in that list (1-based) is used as the name of the downloaded file.

if isfile('all_media.json'):
    print('Importing from all_media.json...')
    with open('all_media.json', encoding='utf-8') as media_file:
        all_media = json.load(media_file)
else:
    all_media = []

# Entries already present in the list are not downloaded again.
count = len(all_media)
for i in reversed(toots):
    for j in i['media_attachments']:
        if j.get('url'):
            all_media.append(j['url'])

print('Total: ' + str(len(all_media)) + ', ' +
      str(len(all_media) - count) + ' need to be downloaded.')

if len(all_media) > count:
    atomic_write(all_media, 'all_media.json')
    rq.mount('https://' + urlsplit(all_media[-1]).netloc,
             HTTPAdapter(max_retries=retries))
    for i in range(count, len(all_media)):
        media_url = all_media[i]
        # Take the extension from the URL path only, so a query string
        # never ends up in the file name.
        extension = os.path.splitext(urlsplit(media_url).path)[1]
        filename = str(i + 1) + extension
        part = filename + '.part'
        try:
            with rq.get(media_url, stream=True, proxies=proxies,
                        timeout=3) as fb:
                fb.raise_for_status()  # do not save error pages as media
                with open(part, 'wb') as f:
                    for chunk in fb.iter_content(chunk_size=64 * 1024):
                        f.write(chunk)
            # Only a completely downloaded file gets its final name.
            os.replace(part, filename)
        except requests.RequestException as err:
            # Best effort: report the failure and carry on with the rest.
            if isfile(part):
                os.remove(part)
            print(filename + ' FAILED: ' + str(err))
        else:
            print(filename + ' OK')
        sleep(1)

print('Success.')