Skip to content

Instantly share code, notes, and snippets.

@arivero
Last active October 12, 2016 23:51
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save arivero/1e356ffff5c4cb447965cd47f3baf5b6 to your computer and use it in GitHub Desktop.
Save arivero/1e356ffff5c4cb447965cd47f3baf5b6 to your computer and use it in GitHub Desktop.
pokemon notification bot for Telegram/pkalarm somehow extended
#Setup Logging
import logging
log = logging.getLogger(__name__)
#Python modules
import io
import json
#Local modules
from ..alarm import Alarm
from ..utils import *
#External modules
import telepot
from telepot.namedtuple import InlineKeyboardMarkup,InlineKeyboardButton, ReplyKeyboardMarkup, KeyboardButton, ReplyKeyboardHide, ForceReply
from geopy.distance import VincentyDistance
from s2sphere import LatLng
import motionless
import shapely
import shapely.geometry
from urllib2 import urlopen
import requests
from peewee import *
db = SqliteDatabase('botUsers.db')
class BaseModel(Model):
class Meta:
database = db
#las tres clases deberian ser foreigkeyfield(user) pero no tenemos definida la clase/tabla User :-D
class homebase(BaseModel):
user=CharField(unique=True,primary_key=True)
latitud=DecimalField()
longitud=DecimalField()
class distancia(BaseModel):
user=CharField(unique=True,primary_key=True)
metros=IntegerField()
class favorites(BaseModel):
user=CharField()
pokemon=IntegerField()
class Meta:
primary_key = CompositeKey('user','pokemon')
def investiga(position, previa=None, descartar=False):
Vdist=VincentyDistance(meters=250*1.4142)
radial=VincentyDistance(meters=200)
currentShape=shapely.geometry.Polygon([(p[0],p[1]) for p in [radial.destination(point=(position[0],position[1],230),bearing=b) for b in range(0,360,15)]])
if previa:
if descartar:
newShape=previa.difference(currentShape)
else:
newShape=currentShape.intersection(previa)
if not newShape.is_empty:
currentShape=newShape
center=(currentShape.centroid.x,currentShape.centroid.y,230)
p1=Vdist.destination(point=center,bearing=45+180)
p2=Vdist.destination(point=center,bearing=45)
gmap = motionless.DecoratedMap(size_x=600, size_y=600,scale=2,fillcolor='gray',pathcolor="0x0000EE20",region=True,
lat=center[0],lon=center[1],zoom=17) #17 is 1.1943 m/pixel???
for p in currentShape.boundary.coords:
gmap.add_path_latlon(p[0],p[1])
urlsmall=gmap.generate_url() #or with simplify()
cursor=db.execute_sql('select printf("%.4f",latitude) lat, printf("%.4f",longitude) lon from spawns where latitude > ? and longitude > ? and latitude < ? and longitude < ? order by random()',
[p1[0],p1[1],p2[0],p2[1]])
totalmarkers=0
container=currentShape.buffer(90.0/10000/1000*25.0) #el buffer deberia ser distinto en lat que en long, pero bueno...
reserve=[]
for lat,lon in cursor:
color='red'
size = 'tiny'
marker=motionless.LatLonMarker(lat, lon, color=color, size=size)
if container.contains(shapely.geometry.Point(float(lat),float(lon))):
gmap.add_marker(marker)
totalmarkers+=1
else:
reserve.append(marker)
for m in reserve:
gmap.add_marker(m)
totalmarkers+=1
if totalmarkers>100:
break
try:
url=gmap.generate_url()
except:
url=urlsmall
print "url len is", len(url), len(urlsmall),totalmarkers
return url,currentShape
class Telegram_Alarm(Alarm):
#Gather settings and create alarm
def __init__(self, settings):
db.connect()
db.create_tables([homebase, favorites,distancia], safe=True)
self.bot_token = settings['bot_token']
self.avisados=set()
self.exclusion={} #dict of booleans
self.connect()
self.chat_id = settings['chat_id']
self.send_map = parse_boolean(settings.get('send_map', "True"))
self.title = settings.get('title', "A wild <pkmn> has appeared!")
self.body = settings.get('body', "<gmaps> \n #<pkmn> available until <24h_time> (<time_left>).")
log.info("Telegram Alarm intialized.")
self.client.sendMessage(self.chat_id, 'Relanzando PokeAlarm! Se olvidan posiciones previas')
self.client.message_loop(self.handle)
self.shapes={}
#(Re)establishes Telegram connection
def connect(self):
self.client = telepot.Bot(self.bot_token)
def handle(self,msg):
print "al bot:",msg
if telepot.flavor(msg) == 'callback_query':
cbid,fromuser_id,data = telepot.glance(msg,flavor='callback_query') #ahora tambien hay un chat_instance
chat_id,origin_id =telepot.origin_identifier(msg)
self.exclusion[fromuser_id]= (data=="exclusion")
self.client.answerCallbackQuery(cbid,
"Estas en modo "+data+". Responde al mensaje para dar una nueva posicion con la que operar",
show_alert=not (fromuser_id in self.avisados) )
#reply_markup=ForceReply(selective=True))
self.avisados.add(fromuser_id)
return
print "al bot: 2",msg['chat']
print "al bot: 3",msg['chat']['id'],msg['chat']['id'] < 0
if "location" in msg:
if msg['chat']['id'] < 0 : #es para analizar!!
chat_id=msg['chat']['id']
origen=msg['reply_to_message']['message_id']
self.client.sendChatAction(chat_id,'find_location') #upload_photo upload_document typing
respuesta,shape=investiga((msg['location']['latitude'],msg['location']['longitude']),self.shapes.get(origen),self.exclusion.get(msg['from']['id']))
print respuesta
datafile=io.BytesIO(urlopen(respuesta).read())
datafile.name='mapa.png'
url = 'https://api.telegram.org/bot'+self.bot_token+'/sendPhoto'
markup = {'inline_keyboard':[
[{'text':'Corroborar (lo veo)', 'callback_data':'interseccion'}],
[{'text':'Descartar (no lo veo)', 'callback_data':'exclusion'}]
]}
data={ 'chat_id':msg['chat']['id'],
'caption': "Zona del avistamiento",
'reply_markup':json.dumps(markup)}
#files = {'photo':open('mapa.png', 'rb')}
files = {'photo': datafile}
r = requests.post(url, files=files, data=data)
print r.json()
if r.json()["ok"]:
self.shapes[r.json()["result"]["message_id"]]=shape
#self.client.sendMessage(chat_id,"hecho")
else:
dato,created=homebase.get_or_create(user=msg['from']['id'],defaults={'latitud':41.648088, 'longitud':-0.893720})
dato.latitud=msg['location']['latitude']
dato.longitud=msg['location']['longitude']
dato.save()
self.client.sendMessage(msg['from']['id'],'posicion actualizada')
elif telepot.flavor(msg)=="chat":
content_type, chat_type, chat_id = telepot.glance(msg)
if content_type=="text":
texto=msg['text'].strip().lower()
command=texto.split(" ",1)[0].split("@",1)[0]
line=texto.split(" ")
if command<>"/investiga":
try:
if command in ["/desde","/pokemention","/hasta","/help","/start","/settings","/status"]:
self.client.sendMessage(msg['from']['id'],"Respuesta al comando "+command)
else:
self.client.sendMessage(msg['from']['id'],"Error, comando desconocido. Consulta /help")
except Exception as inst:
print "Error", type(inst)
self.client.sendMessage(chat_id,'no puedo contestarte.\n Abre primero una conversacion en privado.')
try:
print inst
print inst.description()
print inst.args
print inst.json()
except:
print "oh, well"
if command=="/status":
data=""
with open('/home/alejandro/PokemomMap2/donepokes.txt', 'r') as f:
data=f.read()
self.client.sendMessage(chat_id,data)
if command=="/pokemention":
if len(line) < 3 or (line[1] != "add" and line[1]!="remove"):
self.client.sendMessage(chat_id, '/pokemention {add|remove} Pokemon_id_number(s)')
else:
for token in line[2:]:
try:
pokemon=int(token)
except:
pokemon=999
if pokemon > 151 or pokemon <1:
self.client.sendMessage(msg['from']['id'], 'usa numeros entre 1 y 151 separados por espacios')
else:
if line[1]=="add":
favorites.get_or_create(user=msg['from']['id'],pokemon=pokemon)
self.client.sendMessage(msg['from']['id'], 'Te mencionaremos si tienes nick y avistamos algun ' + get_pkmn_name(pokemon))
elif line[1]=="remove":
favorites.delete().where(favorites.user==msg['from']['id'],favorites.pokemon==pokemon).execute()
self.client.sendMessage(msg['from']['id'], get_pkmn_name(pokemon)+' eliminado de tu lista')
self.client.sendMessage(msg['from']['id'], 'Recuerda que puedes usar /desde y /hasta para configurar radio de menciones')
if command=="/desde":
markup = ReplyKeyboardMarkup(keyboard=[
[KeyboardButton(text='Location', request_location=True)],
],one_time_keyboard=True)
self.client.sendMessage(chat_id, 'La posicion solo se puede especificar en chat privado')
self.client.sendMessage(msg['from']['id'], 'me tienes que indicar tu posicion con el boton que sale ahora en el teclado... si consigo sacarlo. Si no, vuelve a enviarme el comando /desde',reply_markup=markup)
if command=="/hasta":
metros=-1
try:
metros=int(line[1])
except:
self.client.sendMessage(msg['from']['id'], 'la distancia debe ser un numero entero, en metros')
if metros>=0:
dato,created=distancia.get_or_create(user=msg['from']['id'],defaults={'metros':5000})
dato.metros=metros
dato.save()
mensaje= 'distancia cambiada, '+str(metros)+' metros\n'
try:
pos=homebase.get(user=msg['from']['id'])
except:
mensaje+= 'Tienes que establecer tu base con el comando /desde en chat privado con el bot'
self.client.sendMessage(msg['from']['id'],mensaje)
if command=="/start":
mensaje="""
Hola. Este bot tiene una funcion que se puede usar en cualquier grupo, /investiga, y otras
que son solo para configurar notificaciones en PokeRaresZgz. Haz /help para mas info"""
self.client.sendMessage(msg['from']['id'],mensaje)
if command=="/help":
mensaje="""
/settings lista los settings personales
/investiga en un grupo, arranca la busqueda de puntos de spawns
/pokemention define los pokemones para los que deseas aparecer en la lista de menciones
/desde punto de origen del radio de menciones
/hasta distancia del radio de menciones"""
self.client.sendMessage(msg['from']['id'],mensaje)
if command=="/settings":
mensaje="Configuracion de menciones en PokeRaresZgz:"
pks=favorites.select().where(favorites.user==msg['from']['id']).execute()
for p in pks:
mensaje+="\n siguiendo "+str(p.pokemon)+" "+get_pkmn_name(int(p.pokemon))
try:
dist=distancia.get(distancia.user==msg['from']['id'])
mensaje+="\n distancia hasta "+str(dist.metros)+" metros"
except:
mensaje+="\n distancia por defecto (5000 metros)"
try:
latlon=homebase.get(homebase.user==msg['from']['id'])
mensaje+="\n base latitud="+str(latlon.latitud)+" longitud="+str(latlon.longitud)
except:
latlon=False
mensaje+="\n base por defecto (Glorieta de los Zagries)"
self.client.sendMessage(msg['from']['id'],mensaje)
if latlon:
self.client.sendLocation(msg['from']['id'],float(latlon.latitud),float(latlon.longitud))
if command=="/investiga":
self.client.sendMessage(chat_id,
'Contesta aqui dando una posicion',
reply_to_message_id=msg['message_id'],
reply_markup=ForceReply(selective=True))
#Send Pokemon Info
def pokemon_alert(self, pkinfo):
print "alert",pkinfo
name = get_pkmn_name(pkinfo["id"])
if pkinfo["spawnpoint_id"]<>'0':
cursor=db.execute_sql('select 100.0*x/n from sample where point_id=? and pokemon_id=?',
[pkinfo["spawnpoint_id"],pkinfo["id"]])
resultcursor=[r for r in cursor]
if len(resultcursor) > 0:
porcentaje=float(resultcursor[0][0])
if porcentaje > 10.0:
print "pokemon de nido",name, pkinfo["spawnpoint_id"]
#self.client.sendMessage(self.chat_id,"Bah, "+name+ "en nido.")
return
title = replace(self.title, pkinfo)
if pkinfo["24h_time"]=="02:00:00":
body=replace("Avistado #<pkmn> en un radio de 200 metros del ojeador" ,pkinfo)
else:
body = replace(self.body, pkinfo)
args = {
'chat_id': self.chat_id,
'text': '<b>' + title + '</b> \n' + body,
'parse_mode': 'HTML',
'disable_web_page_preview': 'False',
}
try_sending(log, self.connect, "Telegram", self.client.sendMessage, args)
concerned= favorites.select(favorites.user,distancia.metros,homebase.latitud,homebase.longitud
).join(distancia,JOIN.LEFT_OUTER,on=(distancia.user==favorites.user)
).join(homebase,JOIN.LEFT_OUTER,on=(homebase.user==distancia.user)
).where((favorites.pokemon==132) | (favorites.pokemon==pkinfo["id"])
).group_by(favorites.user
).dicts().execute()
concernedWithDefaults=[(float(user['latitud']) if user['latitud'] else 41.648088,
float(user['longitud']) if user['longitud'] else -0.893720,
int(user['metros']) if user['metros'] else 5000,
user['user'] ) for user in concerned]
print concernedWithDefaults,pkinfo
concernedDist=[(6371008.8 * LatLng.from_degrees(float(pkinfo['lat']),float(pkinfo['lng'])).get_distance(LatLng.from_degrees(t[0],t[1])).radians,
t[2],t[3]) for t in concernedWithDefaults]
concernedDist.sort()
print concernedDist
concerned=[(t[2],t[0]) for t in concernedDist if t[0]<t[1]]
mensaje="Visto "+name+"! "
if len(concerned)>0:
primero=True
for userid,dist in concerned:
result= self.client.getChatMember(self.chat_id,userid)
print result
if result['status'] in ["creator","administrator","member"]:
try:
username='@'+result['user']['username']
except:
print "falla con ", result['user']['first_name']+ ' @'+str(userid)
username=""
mensaje+= " "+ username
if primero and username<>"" and dist < 500:
primero=False
self.client.sendMessage(self.chat_id,"A menos de 500m, "+username+"!")
else:
mensaje+= "Pero nadie lo quiere :-("
self.client.sendMessage(self.chat_id,mensaje)
if (self.send_map is True) and (len(concerned)>0):
locargs = {
'chat_id': self.chat_id,
'latitude': pkinfo['lat'],
'longitude': pkinfo['lng'],
'disable_notification':'True'
}
try_sending(log, self.connect, "Telegram (loc)", self.client.sendLocation, locargs)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment