Skip to content

Instantly share code, notes, and snippets.

@georgeyjm
Last active November 16, 2023 18:04
Show Gist options
  • Save georgeyjm/dd0225595b8fb6b58b5fdf0d10293092 to your computer and use it in GitHub Desktop.
Script for BiliBili Fans Analysis
import http
import http.cookiejar  # `import http` alone does not load the cookiejar submodule
import json
import math
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests
from tqdm import tqdm
def get_num_fans(uid, session=None) -> tuple[int, int]:
    """Fetch the follower count of a BiliBili user.

    Args:
        uid: BiliBili user ID ("mid") to query.
        session: Optional requests.Session to reuse (connection pooling,
            cookies). Falls back to a one-shot requests.get when None.

    Returns:
        A ``(uid, follower_count)`` tuple. The uid is echoed back so callers
        draining a thread pool with ``as_completed`` can match results to
        the user they belong to.

    Note: the original annotation said ``-> int`` but the function has
    always returned a 2-tuple; the annotation is corrected here.
    """
    url = 'https://api.bilibili.com/x/relation/stat'
    params = {'vmid': uid, 'jsonp': 'jsonp'}
    # Use the shared session when provided, otherwise the module-level API.
    requester = requests if session is None else session
    resp = requester.get(url, params=params)
    data = resp.json()
    return uid, data['data']['follower']
def list_fans_page(uid, page_num, per_page=20, session=None) -> list[dict]:
    """Fetch one page of a user's follower list from the BiliBili API.

    Args:
        uid: BiliBili user ID ("mid") whose followers are listed.
        page_num: 1-based page index.
        per_page: Number of followers per page.
        session: Optional requests.Session to reuse; plain requests.get otherwise.

    Returns:
        The list of follower dicts from the API response (``data.list``).
    """
    url = 'https://api.bilibili.com/x/relation/followers'
    params = {
        'vmid': uid,
        'pn': page_num,
        'ps': per_page,
        'order': 'desc',
        'order_type': 'attention',
        'jsonp': 'jsonp',
        # 'callback': '__jp6', # Adding this will somehow result in 403
    }
    requester = requests if session is None else session
    resp = requester.get(url, params=params)
    # Follower names are non-ASCII; let requests sniff the real encoding.
    resp.encoding = resp.apparent_encoding
    return resp.json()['data']['list']
def get_session(cookies_filename='cookies.txt'):
    """Build a requests session whose cookies come from a Netscape-format file.

    Bug fix: ``MozillaCookieJar(filename)`` only records the path — the file
    is not read until ``.load()`` is called, so the original code always
    produced a cookie-less session. We now load the jar explicitly, and fall
    back to an unauthenticated session when the file is absent or unreadable
    (matching the original's effective behavior in that case).

    Args:
        cookies_filename: Path to a Mozilla/Netscape ``cookies.txt`` export.

    Returns:
        A ``requests.Session`` with the cookie jar attached.
    """
    session = requests.session()
    cookie_jar = http.cookiejar.MozillaCookieJar(cookies_filename)
    try:
        # ignore_discard/ignore_expires keep session cookies that browser
        # exports write without expiry information.
        cookie_jar.load(ignore_discard=True, ignore_expires=True)
    except (OSError, http.cookiejar.LoadError):
        pass  # No usable cookie file: proceed unauthenticated.
    session.cookies = cookie_jar
    return session
def select_top_n(fans_data, fans_followers, n=10):
    """Return the top-n fans by follower count, annotated with display names.

    Args:
        fans_data: Iterable of fan dicts containing at least ``mid`` and
            ``uname`` (as returned by ``list_fans_page``).
        fans_followers: Mapping of uid -> follower count.
        n: How many top entries to return.

    Returns:
        A list of ``{'uid', 'followers', 'name'}`` dicts, sorted by follower
        count descending. Fans absent from ``fans_data`` get ``name=''``
        instead of a missing key (the original nested-loop version left
        ``'name'`` unset, which crashed downstream formatting).
    """
    # Index names by uid once: O(n + m) instead of the nested-loop O(n * m).
    names_by_uid = {fan['mid']: fan.get('uname', '') for fan in fans_data}
    top = sorted(fans_followers.items(), key=lambda item: item[1], reverse=True)[:n]
    return [
        {'uid': uid, 'followers': followers, 'name': names_by_uid.get(uid, '')}
        for uid, followers in top
    ]
# --- Configuration ---
user_id = 6675591
per_page = 50
max_workers = 5
top_n = 10

session = get_session()
_, total_fans_num = get_num_fans(user_id, session=session)
print(f'Total of {total_fans_num} fans.')

all_data = []
fans_followers = {}

# Stage 1: pull every page of the follower list concurrently.
print('[INFO] Fetching all fans info')
total_pages_num = math.ceil(total_fans_num / per_page)
with tqdm(total=total_pages_num) as pbar:
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        page_futures = [
            executor.submit(list_fans_page, user_id, page + 1, per_page, session=session)
            for page in range(total_pages_num)
        ]
        for done in as_completed(page_futures):
            all_data += done.result()
            pbar.update(1)

# Stage 2: fetch the follower count of each individual fan.
print('[INFO] Fetching followers count of each individual fan')
assert total_fans_num == len(all_data)
with tqdm(total=total_fans_num) as pbar:
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        count_futures = [
            executor.submit(get_num_fans, fan['mid'], session=session)
            for fan in all_data
        ]
        for done in as_completed(count_futures):
            uid, num_followers = done.result()
            fans_followers[uid] = num_followers
            pbar.update(1)

# Stage 3: format the leaderboard and write it out.
top_n_result = select_top_n(all_data, fans_followers, top_n)
ranking_lines = [
    f'{i + 1}.\t{fan["name"]} ({fan["uid"]}): {fan["followers"]} 粉丝'
    for i, fan in enumerate(top_n_result)
]
message = f'Top {top_n} followers:\n' + '\n'.join(ranking_lines)
message = '{0}\n\n{1}\n\n{0}'.format('=' * 50, message)
print()
print(message)
print()
with open('result.txt', 'w', encoding='utf-8') as f:
    f.write(message)
# json.dump(all_data, open('output.json', 'w', encoding='utf-8'), indent=2)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment