Skip to content

Instantly share code, notes, and snippets.

@MaxAntony
Created October 30, 2022 18:09
Show Gist options
  • Save MaxAntony/7210a9fb60f9a247150afe2248b1e141 to your computer and use it in GitHub Desktop.
Save MaxAntony/7210a9fb60f9a247150afe2248b1e141 to your computer and use it in GitHub Desktop.
conectarse a telegram y encender pc via WOL en una raspberry pi pico w
import struct
import socket
import network
import utime
import time
import gc
import ujson
import urequests
wifi_config = {
'ssid':'nombrewifi',
'password':'contrasenawifi'
}
utelegram_config = {
'token': '5601296679:tokenbot'
}
MAC = 'XX:XX:XX:XX:XX:XX'
BROADCASTADDRESS = '192.168.100.255'
class ubot:
def __init__(self, token, offset=0):
self.url = 'https://api.telegram.org/bot' + token
self.commands = {}
self.default_handler = None
self.message_offset = offset
self.sleep_btw_updates = 3
messages = self.read_messages()
if messages:
if self.message_offset==0:
self.message_offset = messages[-1]['update_id']
else:
for message in messages:
if message['update_id'] >= self.message_offset:
self.message_offset = message['update_id']
break
def send(self, chat_id, text):
data = {'chat_id': chat_id, 'text': text}
try:
headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
response = urequests.post(self.url + '/sendMessage', json=data, headers=headers)
response.close()
return True
except:
return False
def read_messages(self):
result = []
self.query_updates = {
'offset': self.message_offset + 1,
'limit': 1,
'timeout': 30,
'allowed_updates': ['message']}
try:
update_messages = urequests.post(self.url + '/getUpdates', json=self.query_updates).json()
if 'result' in update_messages:
for item in update_messages['result']:
result.append(item)
return result
except (ValueError):
return None
except (OSError):
print("OSError: request timed out")
return None
def listen(self):
while True:
self.read_once()
time.sleep(self.sleep_btw_updates)
gc.collect()
def read_once(self):
messages = self.read_messages()
if messages:
if self.message_offset==0:
self.message_offset = messages[-1]['update_id']
self.message_handler(messages[-1])
else:
for message in messages:
if message['update_id'] >= self.message_offset:
self.message_offset = message['update_id']
self.message_handler(message)
break
def register(self, command, handler):
self.commands[command] = handler
def set_default_handler(self, handler):
self.default_handler = handler
def set_sleep_btw_updates(self, sleep_time):
self.sleep_btw_updates = sleep_time
def message_handler(self, message):
if 'text' in message['message']:
parts = message['message']['text'].split(' ')
if parts[0] in self.commands:
self.commands[parts[0]](message)
else:
if self.default_handler:
self.default_handler(message)
debug = True
sta_if = network.WLAN(network.STA_IF)
sta_if.active(True)
sta_if.scan()
sta_if.connect(wifi_config['ssid'], wifi_config['password'])
if debug: print('WAITING FOR NETWORK - sleep 20')
print(sta_if.isconnected())
while(not sta_if.isconnected()):
print("red no disponible aun")
utime.sleep(1)
def get_message(message):
bot.send(message['message']['chat']['id'], message['message']['text'].upper())
def reply_ping(message):
print(message)
bot.send(message['message']['chat']['id'], 'pong')
def sendWOL():
PORT = '9'
wol_header = b'\xff'*6
split_mac_address = MAC.split(':')
MSGw=struct.pack(b'!BBBBBB',
int(split_mac_address[0], 16),
int(split_mac_address[1], 16),
int(split_mac_address[2], 16),
int(split_mac_address[3], 16),
int(split_mac_address[4], 16),
int(split_mac_address[5], 16))
MSG = wol_header+MSGw*16
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.sendto(MSG, (BROADCASTADDRESS, 1926))
def start_pc(message):
bot.send(message['message']['chat']['id'], 'Encendiendo la computadora')
for _ in range(5):
sendWOL()
bot.send(message['message']['chat']['id'],'Magic packet enviado')
gc.collect()
bot.send(message['message']['chat']['id'], 'finalizado envio de paquetes')
if sta_if.isconnected():
print('conectando a telegram')
bot = ubot(utelegram_config['token'])
print('conexion a bot exitosa')
bot.register('/ping', reply_ping)
bot.register('/startpc', start_pc)
bot.set_default_handler(get_message)
print('BOT LISTENING')
bot.listen()
else:
print('NOT CONNECTED - aborting')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment