Created
October 3, 2022 18:52
-
-
Save vicety/16fd017001a891fac21ced617e61ed9b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import os | |
import json | |
import socket | |
import sys | |
# To set your enviornment variables in your terminal run the following line: | |
# export 'BEARER_TOKEN'='<your_bearer_token>' | |
bearer_token = os.environ.get("BEARER_TOKEN") | |
def bearer_oauth(r): | |
""" | |
Method required by bearer token authentication. | |
""" | |
r.headers["Authorization"] = f"Bearer {bearer_token}" | |
r.headers["User-Agent"] = "v2FilteredStreamPython" | |
return r | |
def get_rules(): | |
response = requests.get( | |
"https://api.twitter.com/2/tweets/search/stream/rules", auth=bearer_oauth | |
) | |
if response.status_code != 200: | |
raise Exception( | |
"Cannot get rules (HTTP {}): {}".format(response.status_code, response.text) | |
) | |
print(json.dumps(response.json())) | |
return response.json() | |
def delete_all_rules(rules): | |
if rules is None or "data" not in rules: | |
return None | |
ids = list(map(lambda rule: rule["id"], rules["data"])) | |
payload = {"delete": {"ids": ids}} | |
response = requests.post( | |
"https://api.twitter.com/2/tweets/search/stream/rules", | |
auth=bearer_oauth, | |
json=payload | |
) | |
if response.status_code != 200: | |
raise Exception( | |
"Cannot delete rules (HTTP {}): {}".format( | |
response.status_code, response.text | |
) | |
) | |
print(json.dumps(response.json())) | |
def set_rules(delete): | |
# You can adjust the rules if needed | |
sample_rules = [ | |
{"value": "dog has:images", "tag": "dog pictures"}, | |
{"value": "cat has:images -grumpy", "tag": "cat pictures"}, | |
] | |
payload = {"add": sample_rules} | |
response = requests.post( | |
"https://api.twitter.com/2/tweets/search/stream/rules", | |
auth=bearer_oauth, | |
json=payload, | |
) | |
if response.status_code != 201: | |
raise Exception( | |
"Cannot add rules (HTTP {}): {}".format(response.status_code, response.text) | |
) | |
print(json.dumps(response.json())) | |
def get_stream(set, clientConn): | |
response = requests.get( | |
"https://api.twitter.com/2/tweets/search/stream", auth=bearer_oauth, stream=True, | |
) | |
print(response.status_code) | |
if response.status_code != 200: | |
raise Exception( | |
"Cannot get stream (HTTP {}): {}".format( | |
response.status_code, response.text | |
) | |
) | |
for response_line in response.iter_lines(): | |
if response_line: | |
json_response = json.loads(response_line) | |
print(json_response["data"]["text"]) | |
# print(json.dumps(json_response, indent=4, sort_keys=True)) | |
clientConn.send(str.encode(json_response["data"]["text"] + "\n")) | |
def main(clientfd): | |
rules = get_rules() | |
delete = delete_all_rules(rules) | |
set = set_rules(delete) | |
get_stream(set, clientfd) | |
# if __name__ == "__main__": | |
# main() | |
# initialize tcp server | |
TCP_IP = "localhost" | |
TCP_PORT = 9009 | |
conn = None | |
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
s.bind((TCP_IP, TCP_PORT)) | |
s.listen(1) | |
print("Waiting for TCP connection...") | |
conn, addr = s.accept() | |
print("Connected... Starting getting tweets.") | |
main(conn) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment