Skip to content

Instantly share code, notes, and snippets.

@ensarkarabudak
Last active December 6, 2023 17:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ensarkarabudak/3e40b1e8ad17303074295db6412914a1 to your computer and use it in GitHub Desktop.
Save ensarkarabudak/3e40b1e8ad17303074295db6412914a1 to your computer and use it in GitHub Desktop.
Jumpstart 5G Network Data Logger
import time
import subprocess
import psutil
import gspread
import traceback
import requests
from google.auth import exceptions
from google.auth.transport.requests import Request
from google.oauth2 import service_account
# Google Sheets API Authorization
credentials = None
credentials_file = "" # JSON Private Key
spreadsheet_id = "" # Google Sheets File ID
worksheet_title = "" # Page Title to Use
api_key = "" # Opencellid API Key
# Global variables for SINR, RSRP and RSRQ values
latest_sinr, latest_rsrp, latest_rsrq = None, None, None
try:
credentials = service_account.Credentials.from_service_account_file(
credentials_file, scopes=["https://www.googleapis.com/auth/spreadsheets"]
)
except exceptions.GoogleAuthError as e:
print(f"Google Auth Error: {e}")
print("Please check your credentials.")
# Open Google Sheets file
gc = gspread.authorize(credentials)
worksheet = gc.open_by_key(spreadsheet_id).worksheet(worksheet_title)
def send_at_com(command, desired):
try:
cp = subprocess.run(
["/home/pi/.local/bin/atcom", command, "--find", desired],
universal_newlines=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
except Exception as error:
print(f"Message: {error}")
return ("", "", 1)
else:
return (cp.stdout, cp.stderr, cp.returncode)
def read_response(command, desired):
response, error, returncode = send_at_com(command, desired)
return response.strip(), error.strip(), returncode
def clean_response(response):
cleaned_response = response.split("\n", 1)[-1].strip()
cleaned_response = response.split(" ", 1)[-1].strip()
cleaned_response = cleaned_response.rstrip("OK").strip()
return cleaned_response
def extract_values(response):
global latest_sinr, latest_rsrp, latest_rsrq
values = response.split(",")
mode = values[2].strip('"')
if mode == "NR5G-SA":
sinr = values[14].strip()
rsrp = values[15].strip()
rsrq = values[16].strip()
elif mode == "LTE":
sinr = values[13].strip()
rsrp = values[14].strip()
rsrq = values[15].strip()
elif mode == "NR5G-NSA":
sinr = values[13].strip()
rsrp = values[14].strip()
rsrq = values[15].strip()
latest_sinr, latest_rsrp, latest_rsrq = sinr, rsrp, rsrq
def read_geoloc_data():
serving_cell_response_map = {
"nr5g": {"radio_type": 2, "mcc": 4, "mnc": 5, "tac": 8, "cid": 6, "psc": 7},
"lte": {"radio_type": 2, "mcc": 4, "mnc": 5, "tac": 12, "cid": 6, "psc": 7},
"wcdma": {"radio_type": 2, "mcc": 3, "mnc": 4, "lac": 5, "cid": 6, "psc": 8},
"gsm": {"radio_type": 2, "mcc": 3, "mnc": 4, "lac": 5, "cid": 6},
}
geolocation = {}
radio_type_id = 2
output = send_at_com('AT+QENG="servingcell"', "OK")
if output[2] == 0:
data = output[0].split(",")
radio_type = data[radio_type_id].replace('"', "").casefold()
try:
for key in serving_cell_response_map:
if key.find(radio_type) != -1:
temp = serving_cell_response_map.get(key, {})
for key in temp:
geolocation[key] = data[temp[key]].replace('"', "").casefold()
except:
raise ValueError("Geolocation data is broken")
else:
raise RuntimeError(output[0])
# str/hex/int conversation
try:
for key in geolocation:
if key in ["tac", "lac", "psc", "cid"]:
geolocation[key] = int(geolocation[key], 16)
elif key in ["mcc", "mnc"]:
geolocation[key] = int(geolocation[key])
except:
raise ValueError("read_geoloc_data --> error occured converting hex to int")
print("Geolocation Data:", geolocation)
return geolocation
def measure_data_usage(interface='usb0'):
try:
net_stats = psutil.net_io_counters(pernic=True)[interface]
rx = (net_stats.bytes_recv) / (1024.0 * 1024.0)
tx = (net_stats.bytes_sent) / (1024.0 * 1024.0)
print(f'RX Bytes: {rx:.2f} MB')
print(f'TX Bytes: {tx:.2f} MB')
return rx,tx
except KeyError:
print(f"Network interface ({interface}) not found.")
return 0, 0
def measure_ping_latency():
try:
result = subprocess.run(['ping', '-c', '4', '8.8.8.8'], stdout=subprocess.PIPE)
ping_output = result.stdout.decode('utf-8')
latency = None
lines = ping_output.split('\n')
for line in lines:
if "time=" in line:
latency = line.split("time=")[1].split(" ")[0]
print(f'Ping Latency: {latency} ms')
return latency, None
except Exception as e:
print(f"An error occurred: {e}")
traceback.print_exc()
return None, e
def get_cell_location(api_key, mcc, mnc, lac, cid):
url = f'https://www.opencellid.org/cell/get?key={api_key}&mcc={mcc}&mnc={mnc}&lac={lac}&cellid={cid}&format=json'
response = requests.get(url)
if response.status_code == 200:
data = response.json()
location = data.get('lat', None), data.get('lon', None)
return location
else:
return None
try:
header_values = ["Date","ICCID","Operator","RAT","Band","Ping Latency(ms)","Location(lat/lon)","Registration","Download(MB)","Upload(MB)","RSSI","BER","SINR","RSRP","RSRQ"]
existing_headers = worksheet.row_values(1)
# If headers are missing, add them at the top
if set(header_values) != set(existing_headers):
worksheet.insert_row(header_values, 1)
at_commands = [
'AT+CSQ',
'AT+QNWINFO',
'AT+COPS?',
'AT+QENG="servingcell"',
'AT+CREG?',
'AT+QCCID'
]
for command in at_commands:
response, error, returncode = read_response(command, '')
if "CSQ" in response:
# Get RSSI and BER values from response "AT+CSQ"
response = clean_response(response)
values = response.split(",")
rssi = values[0].strip()
ber = values[1].strip() if len(values) > 1 else "N/A"
if "QNWINFO" in response:
# Get Band and RAT values from response "AT+QNWINFO"
response = clean_response(response)
values = response.split(",")
rat = values[0].strip('"')
band = values[2].strip('"')
if "CREG" in response:
# Get the registration value from the response "AT+CREG"
response = clean_response(response)
registration = response
if "COPS" in response:
# Get the operator from the response "AT+COPS"
response = clean_response(response)
values = response.split(",")
operator = values[2].strip("\"")
if "QCCID" in response:
# Get the ICCID from the response "AT+QCCID"
response = clean_response(response)
iccid = response
if "servingcell" in response:
# Get SINR, RSRP, RSRQ
response = clean_response(response)
extract_values(response)
cleaned_response = clean_response(response)
print(f'AT Command: {command}\nOutput: {cleaned_response}\n{"="*30}')
geolocation_data = read_geoloc_data()
location = get_cell_location(api_key, geolocation_data['mcc'], geolocation_data['mnc'], geolocation_data['tac'], geolocation_data['cid'])
if location:
print(f'Cell Location: Latitude {location[0]}, Longitude {location[1]}')
else:
print('Location not found.')
latency, error = measure_ping_latency()
rx,tx = measure_data_usage()
print(f'RSSI: {rssi}, BER: {ber}, SINR: {latest_sinr}, RSRP: {latest_rsrp}, RSRQ: {latest_rsrq}')
worksheet.append_row([time.strftime("%Y-%m-%d %H:%M:%S"),iccid,operator,rat,band,latency,f"{location[0]},{location[1]}",registration,f'{rx:.2f}',f'{tx:.2f}',rssi,ber,latest_sinr,latest_rsrp,latest_rsrq])
except Exception as e:
print(f"An error occurred: {e}")
traceback.print_exc() # Print error traceback information
raise
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment