Skip to content

Instantly share code, notes, and snippets.

@anonfloppa
Created March 12, 2022 19:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anonfloppa/60022c5b9ed973f7555c5c94d5e14505 to your computer and use it in GitHub Desktop.
Save anonfloppa/60022c5b9ed973f7555c5c94d5e14505 to your computer and use it in GitHub Desktop.
# Delete old devices of inactive users to free up room in device_inbox.
import datetime
import requests
import urllib
import sys
import time
# Homeserver base URL and admin access token used by every API call below.
server = 'https://matrix.SERVER.TLD'
token = 'PUT TOKEN HERE'

# Retention policy: a device last seen more than `device_days` days ago is a
# deletion candidate, but only for users whose most recently seen device is
# itself older than `user_days` days.
device_days = 180
user_days = 45

# Both cutoffs are milliseconds since the Unix epoch, so they can be compared
# directly with the `last_seen_ts` values read from the device list.
now = datetime.datetime.now()

then = now - datetime.timedelta(days=device_days)
device_delete_ts = int(then.timestamp() * 1000)

then = now - datetime.timedelta(days=user_days)
user_inactive_ts = int(then.timestamp() * 1000)
def get_users(ffrom, limit=10):
    """Fetch one page of non-guest accounts from the admin user list.

    Args:
        ffrom: Offset of the first account to return (sent as ``from``).
        limit: Maximum number of accounts to return in this page.

    Returns:
        The decoded JSON body of the response. The calling script reads its
        ``total`` and ``users`` keys.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    api = f'{server}/_synapse/admin/v2/users'
    # Authenticate with a bearer header, exactly as delete_devices does, so
    # the admin token is not embedded in the request URL.
    headers = {
        'Authorization': f'Bearer {token}',
    }
    # Let requests build (and URL-encode) the query string.
    params = {
        'from': ffrom,
        'limit': limit,
        'guests': 'false',
        'order_by': 'creation_ts',
    }
    r = requests.get(api, headers=headers, params=params)
    # Fail loudly here instead of with a KeyError later on an error body.
    r.raise_for_status()
    return r.json()
def get_devices(name):
    """Fetch the device list of one user from the admin API.

    Args:
        name: The user's identifier as found in the ``name`` field of the
            user list.

    Returns:
        The decoded JSON body of the response. The calling script reads its
        ``devices`` key.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    # Imported locally so this function does not rely on the bare
    # `import urllib` at the top of the file having loaded the submodule.
    from urllib.parse import quote

    # Percent-encode the identifier so characters such as '/' cannot change
    # the shape of the URL path.
    user_path = quote(name, safe='')
    api = f'{server}/_synapse/admin/v2/users/{user_path}/devices'
    # Authenticate with a bearer header, exactly as delete_devices does, so
    # the admin token is not embedded in the request URL.
    headers = {
        'Authorization': f'Bearer {token}',
    }
    r = requests.get(api, headers=headers)
    # Fail loudly here instead of with a KeyError later on an error body.
    r.raise_for_status()
    return r.json()
def delete_devices(name, devices):
    """Ask the admin API to delete several devices of one user.

    Args:
        name: The user's identifier as found in the ``name`` field of the
            user list.
        devices: List of device IDs to delete.

    Returns:
        The ``requests.Response`` of the POST request. The status is not
        checked here; that is left to the caller.
    """
    # Imported locally so this function does not rely on the bare
    # `import urllib` at the top of the file having loaded the submodule.
    from urllib.parse import quote

    # Percent-encode the identifier so characters such as '/' cannot change
    # the shape of the URL path.
    user_path = quote(name, safe='')
    api = f'{server}/_synapse/admin/v2/users/{user_path}/delete_devices'
    headers = {
        'Authorization': f'Bearer {token}',
    }
    data = {
        'devices': devices
    }
    # Log what is about to be sent; the token lives in the header, not here.
    print(api, data)
    return requests.post(api, headers=headers, json=data)
def select_stale_devices(devices, device_cutoff_ts):
    """Pick the devices of one user that are older than the cutoff.

    Args:
        devices: Iterable of device dicts. Each is read for ``device_id``,
            ``user_id`` and (optionally) ``last_seen_ts``.
        device_cutoff_ts: Cutoff in the same unit as ``last_seen_ts``;
            devices last seen strictly before it are selected.

    Returns:
        A ``(stale_ids, user_last_seen)`` tuple: the IDs of the selected
        devices in input order, and the largest ``last_seen_ts`` found over
        all devices (0 when there are none).
    """
    stale_ids = []
    user_last_seen = 0
    for device in devices:
        # A missing or null timestamp counts as "never seen" (0).
        device_last_seen = device.get('last_seen_ts', 0) or 0
        user_last_seen = max(user_last_seen, device_last_seen)
        if device_last_seen < device_cutoff_ts:
            print('device', device['device_id'], 'for user', device['user_id'], 'could be deleted')
            stale_ids.append(device['device_id'])
    return stale_ids, user_last_seen


def main():
    """Walk all non-admin users and delete stale devices of inactive ones.

    Users are fetched page by page. For each user whose most recently seen
    device is older than ``user_inactive_ts``, every device older than
    ``device_delete_ts`` is deleted in one request, followed by a pause.
    """
    page_size = 10
    page = get_users(0, page_size)
    # range() below already copes with a total that is not a multiple of the
    # page size, so the real total is used (and printed) as-is.
    total_users = int(page['total'])
    print('total_users:', total_users)

    for offset in range(0, total_users, page_size):
        # The first page was already fetched to learn the total.
        if offset:
            page = get_users(offset, page_size)
        for user in page['users']:
            if user['admin']:
                continue
            name = user['name']
            devices = get_devices(name)
            to_delete, user_last_seen = select_stale_devices(
                devices['devices'], device_delete_ts
            )
            if not to_delete or user_last_seen >= user_inactive_ts:
                continue
            try:
                response = delete_devices(name, to_delete)
            except requests.RequestException as exc:
                # Best effort: a timeout or connection error on one user must
                # not abort the whole run, but it should be visible.
                print('failed to delete devices for', name, '-', exc)
            else:
                if not response.ok:
                    print('failed to delete devices for', name, '- HTTP', response.status_code)
            print('wait a while after deleting devices...')
            time.sleep(10)  # flatten the load


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment