Last active
December 6, 2023 17:54
-
-
Save ensarkarabudak/3e40b1e8ad17303074295db6412914a1 to your computer and use it in GitHub Desktop.
Jumpstart 5G Network Data Logger
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import subprocess | |
import psutil | |
import gspread | |
import traceback | |
import requests | |
from google.auth import exceptions | |
from google.auth.transport.requests import Request | |
from google.oauth2 import service_account | |
# Google Sheets API Authorization
credentials = None
credentials_file = ""  # JSON Private Key
spreadsheet_id = ""  # Google Sheets File ID
worksheet_title = ""  # Page Title to Use
api_key = ""  # Opencellid API Key

# Global variables for SINR, RSRP and RSRQ values
latest_sinr, latest_rsrp, latest_rsrq = None, None, None

try:
    credentials = service_account.Credentials.from_service_account_file(
        credentials_file, scopes=["https://www.googleapis.com/auth/spreadsheets"]
    )
except exceptions.GoogleAuthError as e:
    print(f"Google Auth Error: {e}")
    print("Please check your credentials.")
    # `credentials` is still None at this point, so the Sheets calls below
    # could not authenticate; stop here with the original error instead.
    raise

# Open Google Sheets file
gc = gspread.authorize(credentials)
worksheet = gc.open_by_key(spreadsheet_id).worksheet(worksheet_title)
def send_at_com(command, desired):
    """Run the ``atcom`` CLI for one AT command and capture its output.

    Args:
        command: AT command string handed to ``atcom``.
        desired: Value passed to ``atcom`` after its ``--find`` option.

    Returns:
        A ``(stdout, stderr, returncode)`` tuple. If launching the process
        raises, the error is printed and ``("", "", 1)`` is returned.
    """
    argv = ["/home/pi/.local/bin/atcom", command, "--find", desired]
    try:
        completed = subprocess.run(
            argv,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
    except Exception as error:
        print(f"Message: {error}")
        return ("", "", 1)
    return (completed.stdout, completed.stderr, completed.returncode)
def read_response(command, desired):
    """Send an AT command and return its output with whitespace trimmed.

    Args:
        command: AT command string forwarded to ``send_at_com``.
        desired: Second argument forwarded to ``send_at_com``.

    Returns:
        ``(stdout, stderr, returncode)`` where both text fields have had
        leading and trailing whitespace removed.
    """
    result = send_at_com(command, desired)
    stdout_text = result[0].strip()
    stderr_text = result[1].strip()
    return stdout_text, stderr_text, result[2]
def clean_response(response):
    """Strip the leading prefix token and the trailing ``OK`` from a response.

    Everything up to and including the first space (for example a
    ``+CSQ:`` prefix) is removed. A final ``OK`` is removed only when it is
    a separate, whitespace-delimited token, so payloads that merely end in
    the letters ``O`` or ``K`` are left intact.

    Note that the function is not idempotent: calling it again on a result
    that still contains a space removes another leading token.

    Args:
        response: Raw response text.

    Returns:
        The cleaned payload; an empty string when nothing but ``OK`` remains.
    """
    payload = response.split(" ", 1)[-1].strip()
    if payload == "OK":
        return ""
    # Only drop "OK" when it stands alone; str.rstrip("OK") would instead
    # strip any trailing run of the characters 'O' and 'K'.
    if payload.endswith("OK") and payload[-3].isspace():
        payload = payload[:-2].strip()
    return payload
def extract_values(response):
    """Parse SINR, RSRP and RSRQ out of a comma-separated serving-cell reply.

    The third comma-separated field selects the layout (``NR5G-SA``, ``LTE``
    or ``NR5G-NSA``). The extracted strings are stored in the module-level
    ``latest_sinr``, ``latest_rsrp`` and ``latest_rsrq`` and also returned.

    Args:
        response: Serving-cell response text with fields separated by commas.

    Returns:
        ``(sinr, rsrp, rsrq)`` as whitespace-stripped strings.

    Raises:
        ValueError: If the mode field is missing or unsupported, or the
            response has fewer fields than the selected layout needs.
    """
    global latest_sinr, latest_rsrp, latest_rsrq

    # (SINR, RSRP, RSRQ) positions per mode.
    # NOTE(review): positions are kept as they were; confirm them against the
    # modem's AT command manual for AT+QENG="servingcell".
    field_positions = {
        "NR5G-SA": (14, 15, 16),
        "LTE": (13, 14, 15),
        "NR5G-NSA": (13, 14, 15),
    }

    values = response.split(",")
    if len(values) < 3:
        raise ValueError(f"Serving cell response has no mode field: {response!r}")

    mode = values[2].strip('"')
    positions = field_positions.get(mode)
    if positions is None:
        # Previously this fell through and failed on unbound local variables.
        raise ValueError(f"Unsupported serving cell mode: {mode!r}")
    if len(values) <= max(positions):
        raise ValueError(
            f"Serving cell response for {mode} has too few fields: {len(values)}"
        )

    sinr, rsrp, rsrq = (values[index].strip() for index in positions)
    latest_sinr, latest_rsrp, latest_rsrq = sinr, rsrp, rsrq
    return sinr, rsrp, rsrq
def read_geoloc_data():
    """Query the serving cell and return its identifiers as a dict.

    Sends ``AT+QENG="servingcell"`` through ``send_at_com``, splits the
    output on commas and picks the fields listed for the reported radio
    type. ``tac``/``lac``/``psc``/``cid`` are converted from hexadecimal
    text to ``int``; ``mcc``/``mnc`` from decimal text to ``int``.

    Returns:
        A dict with the keys defined for the matched radio type (including
        ``radio_type`` itself). The dict is empty when the reported radio
        type matches none of the known entries.

    Raises:
        RuntimeError: If the AT command returns a non-zero exit code.
        ValueError: If the response has too few fields or a field cannot
            be converted to an integer.
    """
    # Field positions inside the comma-separated response, per radio type.
    serving_cell_response_map = {
        "nr5g": {"radio_type": 2, "mcc": 4, "mnc": 5, "tac": 8, "cid": 6, "psc": 7},
        "lte": {"radio_type": 2, "mcc": 4, "mnc": 5, "tac": 12, "cid": 6, "psc": 7},
        "wcdma": {"radio_type": 2, "mcc": 3, "mnc": 4, "lac": 5, "cid": 6, "psc": 8},
        "gsm": {"radio_type": 2, "mcc": 3, "mnc": 4, "lac": 5, "cid": 6},
    }
    radio_type_id = 2
    geolocation = {}

    output = send_at_com('AT+QENG="servingcell"', "OK")
    if output[2] != 0:
        raise RuntimeError(output[0])

    data = output[0].split(",")
    try:
        radio_type = data[radio_type_id].replace('"', "").casefold()
        for name, field_map in serving_cell_response_map.items():
            # Match the map key inside the reported type so that values such
            # as "nr5g-sa" and "nr5g-nsa" select the "nr5g" entry.
            if name in radio_type:
                for field, index in field_map.items():
                    geolocation[field] = data[index].replace('"', "").casefold()
    except IndexError as error:
        raise ValueError("Geolocation data is broken") from error

    # str/hex/int conversion
    try:
        for field in geolocation:
            if field in ("tac", "lac", "psc", "cid"):
                geolocation[field] = int(geolocation[field], 16)
            elif field in ("mcc", "mnc"):
                geolocation[field] = int(geolocation[field])
    except (TypeError, ValueError) as error:
        raise ValueError(
            "read_geoloc_data --> error occured converting hex to int"
        ) from error

    print("Geolocation Data:", geolocation)
    return geolocation
def measure_data_usage(interface='usb0'):
    """Report received and sent traffic counters of one network interface.

    Args:
        interface: Name of the interface to look up in psutil's per-NIC
            counters.

    Returns:
        ``(rx, tx)`` in megabytes (bytes divided by 1024 * 1024), or
        ``(0, 0)`` when a ``KeyError`` is raised during the lookup.
    """
    bytes_per_mb = 1024.0 * 1024.0
    try:
        counters = psutil.net_io_counters(pernic=True)[interface]
        received_mb = counters.bytes_recv / bytes_per_mb
        sent_mb = counters.bytes_sent / bytes_per_mb
        print(f'RX Bytes: {received_mb:.2f} MB')
        print(f'TX Bytes: {sent_mb:.2f} MB')
    except KeyError:
        print(f"Network interface ({interface}) not found.")
        return 0, 0
    else:
        return received_mb, sent_mb
def measure_ping_latency():
    """Ping 8.8.8.8 four times and extract a latency figure from the output.

    Returns:
        ``(latency, None)`` when the command ran, where ``latency`` is the
        text following ``time=`` on the last output line that contains it
        (``None`` if no line does); ``(None, exception)`` if anything raised.
    """
    try:
        completed = subprocess.run(['ping', '-c', '4', '8.8.8.8'], stdout=subprocess.PIPE)
        latency = None
        for row in completed.stdout.decode('utf-8').split('\n'):
            if "time=" not in row:
                continue
            # Each matching line overwrites the previous one, so the last wins.
            latency = row.split("time=", 2)[1].split(" ", 1)[0]
        print(f'Ping Latency: {latency} ms')
        return latency, None
    except Exception as e:
        print(f"An error occurred: {e}")
        traceback.print_exc()
        return None, e
def get_cell_location(api_key, mcc, mnc, lac, cid, timeout=10):
    """Look up a cell's coordinates through the OpenCelliD ``cell/get`` endpoint.

    Args:
        api_key: OpenCelliD API key.
        mcc: Mobile country code.
        mnc: Mobile network code.
        lac: Area code of the cell, sent as the ``lac`` query parameter.
        cid: Cell id, sent as the ``cellid`` query parameter.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        ``(lat, lon)`` taken from the JSON body when the status code is 200
        (either element is ``None`` if the key is absent), otherwise ``None``.

    Raises:
        requests.RequestException: On connection errors or when the request
            exceeds ``timeout``.
    """
    # Let requests build and encode the query string, and always bound the
    # wait: without a timeout the call can block indefinitely.
    response = requests.get(
        'https://www.opencellid.org/cell/get',
        params={
            'key': api_key,
            'mcc': mcc,
            'mnc': mnc,
            'lac': lac,
            'cellid': cid,
            'format': 'json',
        },
        timeout=timeout,
    )
    if response.status_code != 200:
        return None
    data = response.json()
    return data.get('lat'), data.get('lon')
# Main logging run: query the modem, look up the cell location, measure
# latency and traffic, then append one row to the worksheet.
try:
    header_values = ["Date","ICCID","Operator","RAT","Band","Ping Latency(ms)","Location(lat/lon)","Registration","Download(MB)","Upload(MB)","RSSI","BER","SINR","RSRP","RSRQ"]
    existing_headers = worksheet.row_values(1)
    # If headers are missing, add them at the top
    if set(header_values) != set(existing_headers):
        worksheet.insert_row(header_values, 1)

    at_commands = [
        'AT+CSQ',
        'AT+QNWINFO',
        'AT+COPS?',
        'AT+QENG="servingcell"',
        'AT+CREG?',
        'AT+QCCID'
    ]

    # Placeholders so that a failed or unexpected AT response still yields a
    # complete row instead of an unbound-variable error further down.
    rssi = ber = rat = band = registration = operator = iccid = "N/A"

    for command in at_commands:
        response, error, returncode = read_response(command, '')
        # Clean exactly once; cleaning an already-cleaned payload would strip
        # another leading token from it.
        cleaned_response = clean_response(response)

        if "CSQ" in response:
            # Get RSSI and BER values from response "AT+CSQ"
            values = cleaned_response.split(",")
            rssi = values[0].strip()
            ber = values[1].strip() if len(values) > 1 else "N/A"
        elif "QNWINFO" in response:
            # Get Band and RAT values from response "AT+QNWINFO"
            values = cleaned_response.split(",")
            rat = values[0].strip('"')
            band = values[2].strip('"') if len(values) > 2 else "N/A"
        elif "CREG" in response:
            # Get the registration value from the response "AT+CREG"
            registration = cleaned_response
        elif "COPS" in response:
            # Get the operator from the response "AT+COPS"
            values = cleaned_response.split(",")
            operator = values[2].strip("\"") if len(values) > 2 else "N/A"
        elif "QCCID" in response:
            # Get the ICCID from the response "AT+QCCID"
            iccid = cleaned_response
        elif "servingcell" in response:
            # Get SINR, RSRP, RSRQ (stored in the latest_* globals)
            extract_values(cleaned_response)

        print(f'AT Command: {command}\nOutput: {cleaned_response}\n{"="*30}')

    geolocation_data = read_geoloc_data()
    # Some radio types report a "tac", others a "lac"; accept whichever exists.
    area_code = geolocation_data.get('tac', geolocation_data.get('lac'))
    location = None
    if area_code is not None and all(
        field in geolocation_data for field in ('mcc', 'mnc', 'cid')
    ):
        location = get_cell_location(
            api_key,
            geolocation_data['mcc'],
            geolocation_data['mnc'],
            area_code,
            geolocation_data['cid'],
        )

    if location and None not in location:
        print(f'Cell Location: Latitude {location[0]}, Longitude {location[1]}')
        location_text = f"{location[0]},{location[1]}"
    else:
        print('Location not found.')
        # Never index into a missing location when building the row.
        location_text = "N/A"

    latency, error = measure_ping_latency()
    rx, tx = measure_data_usage()
    print(f'RSSI: {rssi}, BER: {ber}, SINR: {latest_sinr}, RSRP: {latest_rsrp}, RSRQ: {latest_rsrq}')
    worksheet.append_row([
        time.strftime("%Y-%m-%d %H:%M:%S"),
        iccid,
        operator,
        rat,
        band,
        latency,
        location_text,
        registration,
        f'{rx:.2f}',
        f'{tx:.2f}',
        rssi,
        ber,
        latest_sinr,
        latest_rsrp,
        latest_rsrq,
    ])
except Exception as e:
    print(f"An error occurred: {e}")
    traceback.print_exc()  # Print error traceback information
    raise
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment