Skip to content

Instantly share code, notes, and snippets.

@abegong
Created April 30, 2023 17:21
Show Gist options
  • Save abegong/d351edd8c2f5efc86940b9c045e126b8 to your computer and use it in GitHub Desktop.
Save abegong/d351edd8c2f5efc86940b9c045e126b8 to your computer and use it in GitHub Desktop.
Download all posts that an actor has liked using the psychonaut implementation of the ATProtocol
"""
Example code to download all posts that an actor has liked on Bluesky.
Uses the psychonaut implementation of the AT Protocol.
"""
import json
from typing import List
from psychonaut.client import get_simple_client_session
from psychonaut.client.cursors import collect_cursored
from psychonaut.cli.util import as_async, clean_handle
from psychonaut.api.lexicons.com.atproto.identity.resolve_handle import (
ResolveHandleReq, ResolveHandleResp,
)
from psychonaut.api.lexicons.com.atproto.repo.list_records import (
ListRecordsReq, ListRecordsResp,
)
from psychonaut.api.lexicons.app.bsky.feed.get_posts import (
GetPostsReq, GetPostsResp,
)
@as_async
async def resolve_handle(actor: str):
    """Resolve the handle ``actor`` with a ``ResolveHandleReq`` XRPC call.

    Args:
        actor: The handle to resolve; it is passed as ``handle=`` to the
            request.

    Returns:
        The ``ResolveHandleResp`` produced by the call.
    """
    # NOTE(review): the script at the bottom of this file calls this function
    # without ``await``, so ``as_async`` presumably wraps the coroutine for
    # synchronous use -- confirm against psychonaut.cli.util.
    async with get_simple_client_session() as session:
        result = await ResolveHandleReq(handle=actor).do_xrpc(session)
        assert isinstance(result, ResolveHandleResp)
    return result
@as_async
async def list_records(
    user_did: str,
    collection: str,
):
    """Collect the records of one collection in one repo into a list.

    Args:
        user_did: Passed as ``repo=`` to ``ListRecordsReq``.
        collection: Passed as ``collection=`` to ``ListRecordsReq``.

    Returns:
        A list holding every item yielded by ``collect_cursored`` for the
        ``"records"`` field of ``ListRecordsResp``.
    """
    async with get_simple_client_session() as session:
        # ``limit=100`` is the per-request limit; ``collect_cursored``
        # presumably follows the cursor until the listing is exhausted
        # -- TODO confirm against psychonaut.client.cursors.
        first_request = ListRecordsReq(
            repo=user_did,
            collection=collection,
            limit=100,
        )
        stream = collect_cursored(
            session,
            first_request,
            ListRecordsResp,
            "records",
        )
        collected = [item async for item in stream]
    return collected
@as_async
async def get_posts(
    uri_list: List[str],
):
    """Fetch the posts identified by ``uri_list``.

    ``GetPostsReq`` is sent once per batch of at most 25 URIs, and the posts
    of every batch are merged into one response. Previously only the first
    25 URIs were ever requested and the rest were silently dropped.

    Args:
        uri_list: URIs of the posts to fetch.

    Returns:
        The ``GetPostsResp`` of the first batch, with the ``posts`` of all
        later batches appended to it. For 25 or fewer URIs this is exactly
        the single response the original implementation returned.
    """
    batch_size = 25  # same per-request cap the original ``[:25]`` slice used
    merged = None
    async with get_simple_client_session() as sess:
        # ``max(..., 1)`` keeps the previous behaviour for an empty list:
        # exactly one request is still issued, with ``uris=[]``.
        for start in range(0, max(len(uri_list), 1), batch_size):
            req = GetPostsReq(uris=uri_list[start:start + batch_size])
            resp = await req.do_xrpc(sess)
            assert isinstance(resp, GetPostsResp)
            if merged is None:
                merged = resp
            else:
                # NOTE(review): assumes ``GetPostsResp.posts`` is a mutable
                # list, as the app.bsky.feed.getPosts output suggests
                # -- confirm against the psychonaut model.
                merged.posts.extend(resp.posts)
    return merged
# Step 1: look up the handle and show the raw response.
response = resolve_handle("abegong.bsky.social")
print(response)

# Step 2: use the DID from that response to list every record in the
# "app.bsky.feed.like" collection of the repo.
response = list_records(user_did=response.did, collection="app.bsky.feed.like")

# Step 3: echo each like record and remember the URI of its subject.
uris = []
for record in response:
    print(json.dumps(record, indent=2))
    uris.append(record["value"]["subject"]["uri"])

# Step 4: fetch the liked posts by URI and print them as JSON.
posts = get_posts(uri_list=uris)
print(posts.json(indent=2))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment