Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
from magniv.core import task
from datetime import datetime
import urllib
import json
import tweepy as tp
# Authenticate against the Twitter API and build the shared `api` client.
# NOTE(review): the credential strings below are placeholders; real keys
# should be supplied at deploy time and never committed to source control.
auth = tp.OAuthHandler('xxxxxxx', 'xxxxxxxx')
auth.set_access_token('xxxxx-xxxxx', 'xxxxxxxx')
api = tp.API(auth, wait_on_rate_limit=False)
try:
    # Some tweepy releases report bad credentials by returning a falsy value
    # instead of raising, so check the return value as well as exceptions.
    if api.verify_credentials():
        print("Authentication done")
    else:
        print("Error during authentication")
except Exception as exc:
    # Best-effort startup check: report the cause but keep the module
    # importable so the scheduler can still load the task.
    print("Error during authentication", repr(exc))
def get_bitcoin_data(url="https://data.messari.io/api/v1/assets/btc/metrics", timeout=10):
    """Fetch headline Bitcoin market metrics from the Messari API.

    Args:
        url: Metrics endpoint to query; defaults to the Messari BTC metrics URL.
        timeout: Seconds to wait for the response before giving up.

    Returns:
        A dict mapping a human-readable label to the metric rounded to two
        decimals. This is best-effort: if the request fails or a field is
        missing, whatever was collected before the failure (possibly an
        empty dict) is returned instead of raising.
    """
    # `import urllib` alone does not load the `request` submodule, so import
    # it explicitly instead of relying on another package having done so.
    import urllib.request

    # Output label -> key under result['data']['market_data'].
    fields = (
        ('bitcoin price usd', 'price_usd'),
        ('real volume last 24 hours', 'real_volume_last_24_hours'),
        ('percent change usd last 1 hour', 'percent_change_usd_last_1_hour'),
    )
    main_result = {}
    try:
        # The context manager closes the connection even when reading fails.
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            result = json.loads(resp.read())
        market_data = result['data']['market_data']
        for label, key in fields:
            main_result[label] = round(market_data[key], 2)
    except (OSError, ValueError, KeyError, TypeError) as exc:
        # OSError covers URLError/HTTPError/timeouts, ValueError covers bad
        # JSON or URLs, KeyError/TypeError cover missing or null fields.
        print("couldn't fetch bitcoin data", repr(exc))
    return main_result
def rephrase_data(reff):
    """Render a mapping as tweet text, one ``key -- value`` entry per item.

    Every entry ends with a blank line and entries are joined by a single
    space; an empty mapping yields an empty string.
    """
    entry_template = "{} -- {}\n\n"
    return " ".join(
        entry_template.format(label, metric) for label, metric in reff.items()
    )
@task(schedule="@hourly")
def post_tweet():
    """Fetch the BTC metrics, format them and post them as a tweet.

    Returns:
        Whatever ``api.update_status`` returns on success, or ``None`` when
        there is nothing to post or posting fails. Failures are reported on
        stdout rather than raised, so a bad run does not propagate an error.
    """
    try:
        result = rephrase_data(get_bitcoin_data())
        if not result:
            # The fetch returned no metrics; do not send an empty status.
            print("couldn't post tweet", "no bitcoin data available")
            return None
        print("gotten tweet, rephrase and ready to shoot")
        return api.update_status(result)
    except Exception as exc:
        # Deliberately best-effort, but surface the cause and let
        # KeyboardInterrupt/SystemExit propagate.
        print("couldn't post tweet", repr(exc))
        return None
# Allow a one-off manual run: post a single tweet when executed as a script.
if "__main__" == __name__:
    post_tweet()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment