Skip to content

Instantly share code, notes, and snippets.

@oyi77
Created February 18, 2023 15:02
Show Gist options
  • Save oyi77/7ce8dbf1918586126c58941f6b12b256 to your computer and use it in GitHub Desktop.
Save oyi77/7ce8dbf1918586126c58941f6b12b256 to your computer and use it in GitHub Desktop.
Modem Pool Communication
from serial.tools import list_ports
import serial
import os, sys, inspect
from loguru import logger as logging
import datetime
import re
import json
import time
from gsmmodem.modem import GsmModem
from time import sleep
from pyin import *
# PENTING DAN OJOK DIUBAH
proj_path = resource_path() + "/"
if proj_path not in sys.path:
sys.path.insert(0, proj_path)
portList_file = proj_path+'datastore/portList.json'
simList_file = proj_path+'datastore/simList.json'
portList = []
portListTemp = []
simList = []
simListTemp = []
portDetail = {"portLoc": "", "status": ""}
simDetail = {"nomer": "", "pulsa": "", "port": "", "waktu": ""}
def _connect_modem(proty, mode="init"):
try:
modem = GsmModem(proty, 115200)
modem.connect(None)
modem.waitForNetworkCoverage(10)
portDetail = {"portLoc": proty, "status": "good"}
portListTemp.append(portDetail.copy())
return modem
except Exception as e:
portDetail = {"portLoc": proty, "status": "error"}
portListTemp.append(portDetail.copy())
logging.warning("[" + proty + "][CONNECT MODEM FAILED] " + str(e))
return False
def _connect_modem2(proty, mode="init"):
try:
s = serial.Serial(port=proty, baudrate=115200, timeout=5)
s.write("ATZ\r".encode())
s.write("AT+CPIN?\r".encode())
res = s.readall()
res = res.decode()
res = res.replace(" ", "")
if "ERROR" in res:
portDetail = {"portLoc": proty, "status": "error"}
portListTemp.append(portDetail.copy())
s.close()
return False
else:
if mode == "init":
delSMS(s)
portDetail = {"portLoc": proty, "status": "good"}
portListTemp.append(portDetail.copy())
return s
except Exception as e:
logging.error("["+proty+"][CONNECT MODEM FAILED] "+str(e))
return False
def sendUSSD(ser,ussd):
""" Kirim Message """
try:
waktu = datetime.datetime.now()
format_waktu = waktu.strftime('%Y-%m-%d %H:%M:%S')
balesan = ser.sendUssd(ussd)
needed_answer = balesan.message.replace(" ","")
x = re.search("(\+62|62|0)8[1-9][0-9]{6,9}", needed_answer)
y = re.search(r"PulsaRp(.*?)s/d", needed_answer)
ser.close()
balesan = {"nomer": x.group(), "pulsa": y.group(1), "waktu": str(format_waktu)}
return balesan
except Exception as e:
logging.error(f"Send Ussd failed: {e}")
return False
def readSMS(ser):
""" Read Message """
try:
waktu = datetime.datetime.now()
format_waktu = waktu.strftime('%Y-%m-%d %H:%M:%S')
srlcmd = 'AT+CMGF=1\r'
ser.write(srlcmd.encode())
srlcmd = 'AT+CMGL="ALL"\r'
ser.write(srlcmd.encode())
balesan = (ser.readall()).decode()
needed_answer = balesan.replace(" ","")
y = re.search(r"verifikasi:(.*?)\(berlaku", needed_answer)
ser.close()
balesan = y.group(1)
return balesan
except Exception as e:
logging.error("Send Message Error: "+str(e))
return False
def delSMS(ser,port=""):
try:
if port != "":
ser = _connect_modem2(port, "otp")
ser.write('AT+CMGF=0\r\n'.encode())
ser.write('AT+CMGD=0,4\r\n'.encode())
ser.write('AT+CMGF=1\r\n'.encode())
balesan = (ser.readall()).decode()
return balesan
except Exception as e:
logging.error("Delete message failed"+str(e))
return False
def getInfo(port, mutex=None):
balesan = ""
try:
s = _connect_modem(port, "init")
if s:
balesan = sendUSSD(s,"*888#")
if balesan is not False:
balesan["port"] = port
balesan["status"] = "good"
#balesan["waktu"] = ""
simDetail = balesan
mutex.acquire(1)
simListTemp.append(simDetail.copy())
mutex.release()
else:
balesan = "No simcard available/Error in Port: " + port
raise Exception(balesan)
except Exception as e:
logging.error("[-] ERROR GET CARD INFO : " + str(e) + " [-]")
pass
finally:
return balesan
def getOtp(port):
s = _connect_modem2(port, "otp")
if s:
balesan = readSMS(s)
updByNum(port,"0",balesan)
return balesan
else:
return False
def find_all_port():
all_port_tuples = list(list_ports.comports())
all_ports = []
for port in all_port_tuples:
if "Exar" in port.manufacturer and "UART" in port.description:
if port.device.startswith("COM") or port.device.startswith("ttyACM"):
all_ports.append({"portLoc": port.device, "status": ""})
if len(all_ports) == 0:
logging.error("No valid port detected!. Possibly, device not plugged/detected.")
pass
return all_ports
def _load_exPort():
f = open (portList_file, "r")
data = json.loads(f.read())
f.close()
return data
@logging.catch
def save_exPort(data):
dataL = []
try:
for s_xdata in data:
dataL.append(s_xdata.copy())
with open(portList_file, 'w') as outfile:
a = json.dumps(dataL)
outfile.write(a)
return True
except Exception as e:
logging.error("[-] ERROR SAVING PORT LIST : " + str(e) + " [-]")
return False
@logging.catch
def save_sim(data):
dataL = []
try:
for s_xdata in data:
dataL.append(s_xdata.copy())
with open(simList_file, 'w') as outfile:
a = json.dumps(dataL)
outfile.write(a)
return True
except Exception as e:
logging.error("[-] ERROR SAVING SIM LIST : " + str(e) + " [-]")
return False
@logging.catch
def save_sim_update(data):
dataL = []
try:
for s_xdata in data:
dataL.append(s_xdata.copy())
with open(simList_file, 'w') as outfile:
a = json.dumps(dataL)
outfile.write(a)
return True
except Exception as e:
logging.error("[-] ERROR SAVING SIM LIST : " + str(e) + " [-]")
return False
def chkByNum(port,numb=""):
try:
f = open (simList_file, "r")
datas = json.loads(f.read())
f.close()
for data in datas:
if port != "0":
if data["port"] == str(port):
return data
else:
if data["nomer"] == str(numb):
return data
except Exception as e:
logging.error("Checking Simcard by Number Failed : "+str(e))
return False
def chkFileAge():
x=os.stat(portList_file)
Result=(time.time()-x.st_mtime)
return Result
def showAllSim():
try:
f = open (simList_file, "r")
datas = json.loads(f.read())
f.close()
return datas
except Exception as e:
logging.error("Get Simcard database failed : "+str(e))
return False
def updByNum(port,numb="",otp=""):
try:
f = open (simList_file, "r")
datas = json.loads(f.read())
f.close()
for data in datas:
if port != "0":
if data["port"] == str(port):
if isinstance(otp, str):
data["otp"] = otp
data["status"] = "good"
else:
data["otp"] = otp
data["status"] = "blacklist"
else:
if data["nomer"] == str(numb):
data["otp"] = otp
data["status"] = "good"
simList = datas
save_sim(simList)
except Exception as e:
logging.error("Checking by number failed : "+str(e))
return False
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment