Last active
November 10, 2021 11:35
-
-
Save yanoshi/af50948d45e5ada02ec0bcdcaae988e3 to your computer and use it in GitHub Desktop.
Filtered Stream Test (Twitter API v2)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# From https://github.com/twitterdev/Twitter-API-v2-sample-code/blob/2fa0bfdb3a62dd4e544b6f4f334b2a557a395f49/Filtered-Stream/filtered_stream.py | |
# This software includes the work that is distributed in the Apache License 2.0(http://www.apache.org/licenses/LICENSE-2.0). | |
import requests | |
import os | |
import json | |
# To set your enviornment variables in your terminal run the following line: | |
# export 'BEARER_TOKEN'='<your_bearer_token>' | |
bearer_token = os.environ.get("BEARER_TOKEN") | |
def bearer_oauth(r): | |
""" | |
Method required by bearer token authentication. | |
""" | |
r.headers["Authorization"] = f"Bearer {bearer_token}" | |
r.headers["User-Agent"] = "v2FilteredStreamPython" | |
return r | |
def get_rules(): | |
response = requests.get( | |
"https://api.twitter.com/2/tweets/search/stream/rules", auth=bearer_oauth | |
) | |
if response.status_code != 200: | |
raise Exception( | |
"Cannot get rules (HTTP {}): {}".format(response.status_code, response.text) | |
) | |
# print(json.dumps(response.json())) | |
return response.json() | |
def delete_all_rules(rules): | |
if rules is None or "data" not in rules: | |
return None | |
ids = list(map(lambda rule: rule["id"], rules["data"])) | |
payload = {"delete": {"ids": ids}} | |
response = requests.post( | |
"https://api.twitter.com/2/tweets/search/stream/rules", | |
auth=bearer_oauth, | |
json=payload | |
) | |
if response.status_code != 200: | |
raise Exception( | |
"Cannot delete rules (HTTP {}): {}".format( | |
response.status_code, response.text | |
) | |
) | |
# print(json.dumps(response.json())) | |
def set_rules(delete): | |
# You can adjust the rules if needed | |
sample_rules = [ | |
{"value": "#異世界食堂", "tag": "anime hash tag"}, | |
{"value": "#鬼滅の刃", "tag": "anime hash tag2"}, | |
] | |
payload = {"add": sample_rules} | |
response = requests.post( | |
"https://api.twitter.com/2/tweets/search/stream/rules", | |
auth=bearer_oauth, | |
json=payload, | |
) | |
if response.status_code != 201: | |
raise Exception( | |
"Cannot add rules (HTTP {}): {}".format(response.status_code, response.text) | |
) | |
# print(json.dumps(response.json())) | |
def get_stream(set): | |
response = requests.get( | |
"https://api.twitter.com/2/tweets/search/stream", auth=bearer_oauth, stream=True, | |
) | |
print(response.status_code) | |
if response.status_code != 200: | |
raise Exception( | |
"Cannot get stream (HTTP {}): {}".format( | |
response.status_code, response.text | |
) | |
) | |
for response_line in response.iter_lines(): | |
if response_line: | |
json_response = json.loads(response_line) | |
print(json.dumps(json_response, indent=4, sort_keys=True)) | |
def main(): | |
rules = get_rules() | |
delete = delete_all_rules(rules) | |
set = set_rules(delete) | |
get_stream(set) | |
if __name__ == "__main__": | |
main() | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash | |
### Please set env: CONSUMER_KEY and CONSUMER_SECRET | |
# ex: | |
# export CONSUMER_KEY=XXXXXXXXXXXXXXXXXX | |
# export CONSUMER_SECRET=XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX | |
export BEARER_TOKEN=$(curl -s -u "${CONSUMER_KEY}:${CONSUMER_SECRET}" --data 'grant_type=client_credentials' 'https://api.twitter.com/oauth2/token' | jq -r .access_token) | |
### Please run pip3 install | |
# pip3 install requests | |
# pip3 install requests-oauthlib | |
curl -s https://gist.githubusercontent.com/yanoshi/af50948d45e5ada02ec0bcdcaae988e3/raw/twitter_api_filtered_stream.py | python3 - | jq .data -S -c |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
curl -s https://gist.githubusercontent.com/yanoshi/af50948d45e5ada02ec0bcdcaae988e3/raw/twitter_api_filtered_stream.sh | bash