Skip to content

Instantly share code, notes, and snippets.

@ExHix
Last active April 30, 2024 07:46
Show Gist options
  • Save ExHix/cc4e27a9143747777a7afec35f7edf81 to your computer and use it in GitHub Desktop.
Save ExHix/cc4e27a9143747777a7afec35f7edf81 to your computer and use it in GitHub Desktop.

update: script2.py is much more automatic
just add accounts via twscrape, run the script, and let time to collect data.

from tweety import Twitter, filters
import tweety.exceptions_
import time, traceback
from loguru import logger
import asyncio, os
from twscrape import API, gather, Account
error_rest_time = 2*60
s = set()
api = API() #proxy="socks5://127.0.0.1:9091")
async def search_test(account: Account):
cnt = 0
app = Twitter(f"cookies_{account.username}") #,proxy="socks5://127.0.0.1:10808")
try:
app.load_cookies(account.cookies)
except tweety.exceptions_.InvalidCredentials:
print("Invalid credential:",account.username)
os.system(f"twscrape relogin {account.username}")
account = await api.pool.get_account(account.username)
app.load_cookies(account.cookies)
print(account.username, "start testing")
from loguru import logger as logger
logger.add(f"log_{account.username}.log", format="{time}|{level}|{message}")
while True:
try:
user_info = app.me
print(user_info)
result = app.search("elon musk", filter_=filters.SearchFilters.Latest())
print(len(result.results))
cnt+=1
logger.info(f"`{account.username}`: ({cnt}) Success, returned results: {len(result.results)}")
await asyncio.sleep(10)
except tweety.exceptions_.RateLimitReached:
cnt = 0
logger.warning(f"`{account.username}`: Rate Limited.")
await asyncio.sleep(error_rest_time)
except tweety.exceptions_.InvalidCredentials:
cnt = 0
logger.warning(f"`{account.username}`: Invalid credential. Relogin...")
os.system(f"twscrape relogin {account.username}")
account = await api.pool.get_account(account.username)
app.load_cookies(account.cookies)
except Exception:
cnt = 0
logger.error(f"`{account.username}`: Unknown exception: " + traceback.format_exc())
await asyncio.sleep(error_rest_time)
async def main():
await api.pool.login_all()
pool = await api.pool.accounts_info()
print(pool)
for account_info in pool:
print(account_info["username"])
account = await api.pool.get_account(account_info["username"])
task = asyncio.create_task(search_test(account))
task.add_done_callback(s.discard)
s.add(task)
async def forever():
while True:
await asyncio.sleep(1000000000)
await forever()
if __name__ == "__main__":
asyncio.run(main())
from tweety import Twitter, filters
import tweety.exceptions_
import time, traceback
from loguru import logger
# Cookies can be a str or a dict
cookies = {}
session_name = "x_username"
error_rest_time = 2*60
logger.add(f"{session_name}.log", format="{time}|{level}|{message}")
app = Twitter(session_name) #,proxy="socks5://127.0.0.1:10808")
app.load_cookies(cookies)
user_info = app.me
cnt = 0
def test():
global cnt
try:
print(user_info)
result = app.search("elon musk", filter_=filters.SearchFilters.Latest())
print(len(result.results))
cnt+=1
logger.info(f"({cnt}) Success, returned results: {len(result.results)}")
time.sleep(10)
except tweety.exceptions_.RateLimitReached:
logger.warning("Rate Limited.")
cnt = 0
time.sleep(error_rest_time)
except Exception:
logger.error("Unknown exception: " + traceback.format_exc())
cnt = 0
time.sleep(error_rest_time)
while True:
test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment