Skip to content

Instantly share code, notes, and snippets.

@felipeborges
Created June 27, 2015 17:47
Show Gist options
  • Save felipeborges/3717d0e888c28fd3e82e to your computer and use it in GitHub Desktop.
Save felipeborges/3717d0e888c28fd3e82e to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import threading
from time import sleep
import requests
import simplejson
# Base endpoint of the Telegram Bot API; Api.get_updates appends the bot token
# and the method path ("/getUpdates") to this prefix.
TELEGRAM_URL = "https://api.telegram.org/bot"
# NOTE(review): the bot credential is hard-coded in source. Consider loading it
# from the environment or a config file and revoking this value.
TOKEN = "123196750:AAFEMEjN7S-Fc-J283jZ8JQqZnZ5xTSCW6c"
# Initial value of Stream.timeout (presumably seconds -- TODO confirm; no
# visible code reads it).
DEFAULT_TIMEOUT = 300.0
# Initial value of Stream.retry_time: seconds slept between callback runs.
DEFAULT_RETRY_TIME = 3.0
class Stream(object):
    """Invoke a callback repeatedly on a background thread.

    ``run`` starts a worker thread that calls the callback, sleeps for
    ``retry_time`` seconds, and repeats for as long as ``running`` is true.
    """

    def __init__(self):
        # Loop flag: the worker keeps iterating while this is true.
        self.running = False
        # Not read by any method of this class; kept for callers that use it.
        self.timeout = DEFAULT_TIMEOUT
        # Not read by any method of this class; kept for callers that use it.
        self.retry_count = 0
        # Seconds to sleep between two consecutive callback invocations.
        self.retry_time = DEFAULT_RETRY_TIME
        # Worker thread created by run(); None until run() is called.
        self._thread = None

    def _start(self, callback, **args):
        """Worker loop: call ``callback(**args)`` until ``running`` is cleared.

        Args:
            callback: Callable invoked once per iteration.
            **args: Keyword arguments forwarded to every invocation.
        """
        try:
            while self.running:
                callback(**args)
                sleep(self.retry_time)
        finally:
            # Also reached when the callback raises, so the flag never claims
            # a loop is alive after the worker thread has died.
            self.running = False

    def run(self, callback, **args):
        """Start polling ``callback`` on a new thread and return immediately.

        Args:
            callback: Callable invoked repeatedly by the worker thread.
            **args: Keyword arguments passed to ``callback`` on every call.
        """
        self.running = True
        # kwargs must be handed to the thread explicitly; otherwise the
        # keyword arguments given to run() never reach the callback.
        self._thread = threading.Thread(
            target=self._start, args=(callback,), kwargs=args
        )
        self._thread.start()

    def stop(self):
        """Ask the worker loop to exit after its current iteration."""
        self.running = False
class Api(object):
    """Small Telegram Bot API client that prints the newest incoming text.

    Each ``get_updates`` call requests ``getUpdates`` and prints the text of
    the last update in the response, once per distinct ``update_id``.
    """

    def __init__(self, token=None):
        """Remember the bot token.

        Args:
            token: Bot token appended to ``TELEGRAM_URL``. When it is None a
                warning is printed and ``get_updates`` becomes a no-op.
        """
        # Always define both attributes so an instance built without a token
        # is still a consistent object rather than failing later with
        # AttributeError.
        self.token = token
        # just for testing: update_id of the last update already printed
        self.last_msg = None
        if token is None:
            print("Missing API Token")

    def get_updates(self):
        """Poll ``getUpdates`` once and print the newest unseen message text.

        Network failures, undecodable bodies, responses without any update,
        and updates without a text message are skipped instead of raising, so
        a polling loop calling this method keeps running.

        Returns:
            None.
        """
        if self.token is None:
            print("Missing API Token")
            return

        url = TELEGRAM_URL + self.token + "/getUpdates"
        try:
            response = requests.get(url, timeout=150)
        except requests.exceptions.RequestException:
            print("Request failed")
            return

        try:
            json_obj = simplejson.loads(response.text)
        except ValueError:
            # simplejson's JSONDecodeError is a ValueError subclass.
            print("Invalid response")
            return

        # Error payloads carry no "result"; an idle bot returns an empty list.
        results = json_obj.get("result") if isinstance(json_obj, dict) else None
        if not results:
            return

        newest = results[-1]
        latest_msg = newest.get("update_id")
        if latest_msg is None or latest_msg == self.last_msg:
            return
        self.last_msg = latest_msg

        # Not every update has a message, and not every message has text.
        message = newest.get("message") or {}
        text = message.get("text")
        if text is not None:
            print(text)
def main():
    """Start a background Stream that polls the Telegram API for updates."""
    poller = Stream()
    client = Api(TOKEN)
    # The bound method is the callback the stream invokes on each iteration.
    poller.run(client.get_updates)
# Run the poller only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment