Skip to content

Instantly share code, notes, and snippets.

@vicety
Created October 3, 2022 18:52
Show Gist options
  • Save vicety/16fd017001a891fac21ced617e61ed9b to your computer and use it in GitHub Desktop.
Save vicety/16fd017001a891fac21ced617e61ed9b to your computer and use it in GitHub Desktop.
import requests
import os
import json
import socket
import sys
# To set your enviornment variables in your terminal run the following line:
# export 'BEARER_TOKEN'='<your_bearer_token>'
bearer_token = os.environ.get("BEARER_TOKEN")
def bearer_oauth(r):
"""
Method required by bearer token authentication.
"""
r.headers["Authorization"] = f"Bearer {bearer_token}"
r.headers["User-Agent"] = "v2FilteredStreamPython"
return r
def get_rules():
response = requests.get(
"https://api.twitter.com/2/tweets/search/stream/rules", auth=bearer_oauth
)
if response.status_code != 200:
raise Exception(
"Cannot get rules (HTTP {}): {}".format(response.status_code, response.text)
)
print(json.dumps(response.json()))
return response.json()
def delete_all_rules(rules):
if rules is None or "data" not in rules:
return None
ids = list(map(lambda rule: rule["id"], rules["data"]))
payload = {"delete": {"ids": ids}}
response = requests.post(
"https://api.twitter.com/2/tweets/search/stream/rules",
auth=bearer_oauth,
json=payload
)
if response.status_code != 200:
raise Exception(
"Cannot delete rules (HTTP {}): {}".format(
response.status_code, response.text
)
)
print(json.dumps(response.json()))
def set_rules(delete):
# You can adjust the rules if needed
sample_rules = [
{"value": "dog has:images", "tag": "dog pictures"},
{"value": "cat has:images -grumpy", "tag": "cat pictures"},
]
payload = {"add": sample_rules}
response = requests.post(
"https://api.twitter.com/2/tweets/search/stream/rules",
auth=bearer_oauth,
json=payload,
)
if response.status_code != 201:
raise Exception(
"Cannot add rules (HTTP {}): {}".format(response.status_code, response.text)
)
print(json.dumps(response.json()))
def get_stream(set, clientConn):
response = requests.get(
"https://api.twitter.com/2/tweets/search/stream", auth=bearer_oauth, stream=True,
)
print(response.status_code)
if response.status_code != 200:
raise Exception(
"Cannot get stream (HTTP {}): {}".format(
response.status_code, response.text
)
)
for response_line in response.iter_lines():
if response_line:
json_response = json.loads(response_line)
print(json_response["data"]["text"])
# print(json.dumps(json_response, indent=4, sort_keys=True))
clientConn.send(str.encode(json_response["data"]["text"] + "\n"))
def main(clientfd):
rules = get_rules()
delete = delete_all_rules(rules)
set = set_rules(delete)
get_stream(set, clientfd)
# if __name__ == "__main__":
# main()
# initialize tcp server
TCP_IP = "localhost"
TCP_PORT = 9009
conn = None
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((TCP_IP, TCP_PORT))
s.listen(1)
print("Waiting for TCP connection...")
conn, addr = s.accept()
print("Connected... Starting getting tweets.")
main(conn)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment