Skip to content

Instantly share code, notes, and snippets.

@LuxoftAKutsan
Last active September 29, 2017 18:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save LuxoftAKutsan/3505785d1dd5a584329fa30d924b2dfb to your computer and use it in GitHub Desktop.
Save LuxoftAKutsan/3505785d1dd5a584329fa30d924b2dfb to your computer and use it in GitHub Desktop.
SDL log analyse
# coding: utf-8
# In[1]:
# Load the whole log into memory, one list entry per line (line terminators
# are kept).  The context manager closes the file handle as soon as the
# lines have been read instead of leaving it to the garbage collector.
with open("/home/akutsan/Downloads/Mackinaw Ford Edge/25 Jul 2017/36.smartdevicelink.log") as log_file:
    log_lines = log_file.readlines()
# In[2]:
import re

# Thread tags look like "[0x<hex digits>]".  The digit class has to include
# "0"; without it any thread id that contains a zero digit never matches and
# the thread is silently missing from the report.  A raw string keeps the
# backslashes from being read as (invalid) string escapes.
p = re.compile(r"\[0x[0-9a-f]+\]")

# Distinct thread tags, in order of first appearance in the log.
threads = []
for l in log_lines:
    m = p.search(l)
    if m:
        g = m.group()
        if g not in threads:
            threads.append(g)
print("Threads in log : ")
print(threads)
# In[3]:
def extract_thread_line(thread):
    """Return the lines of the module-level ``log_lines`` that contain *thread*.

    Args:
        thread: substring to look for (the callers pass a thread tag).

    Returns:
        A new list with the matching lines, in log order.

    A plain containment test is used so that a match at the very start of a
    line is kept as well; ``find(...) > 0`` would drop it.
    """
    return [line for line in log_lines if thread in line]
# Map every discovered thread tag to the log lines that mention it.
threads_lines = {t: extract_thread_line(t) for t in threads}
# Duplicate tags would collapse into one key; the assert guards against that.
assert len(threads_lines) == len(threads)
for t, per_thread in threads_lines.items():
    print("In thread {} lines : {}".format(t, len(per_thread)))
# In[4]:
def get_breaked_stack(lines):
    """Return the calls that were entered but never exited in *lines*.

    Every line containing ": Enter" is pushed; every line containing
    ": Exit" removes the most recently pushed line.  Whatever is left at
    the end is returned, oldest entry first.

    Args:
        lines: iterable of log lines (strings).

    Returns:
        List of the unmatched ": Enter" lines.

    An ": Exit" seen while nothing is pending (for example when the log
    starts in the middle of a call) is ignored instead of raising
    IndexError from popping an empty list.
    """
    stack = []
    for line in lines:
        if ": Enter" in line:
            stack.append(line)
        if ": Exit" in line and stack:
            stack.pop()
    return stack
def print_stack(stack):
    """Print each entry of *stack* behind an arrow that grows with depth.

    The first entry is prefixed with "->", the second with "-->", and so
    on; surrounding whitespace of every entry is stripped before printing.
    """
    for depth, entry in enumerate(stack, start=1):
        print("-" * depth + ">", entry.strip())
# Report, per thread, the calls that were entered but never exited.
print("Breaked stacks :")
for t, per_thread in threads_lines.items():
    stack = get_breaked_stack(per_thread)
    if not stack:
        # Nothing pending in this thread: print nothing for it.
        continue
    print("==== Thread : {} ====".format(t))
    print_stack(stack)
    print("")
#! /usr/bin/env python3
# coding: utf-8
from argparse import ArgumentParser
# Command-line interface.  The arguments are parsed at module level and the
# resulting ``args`` namespace is read by main().
parser = ArgumentParser(description='Extract messages from sdl logs')
print(parser.description)
parser.add_argument("--path", dest="path", metavar="PATH",
                    help="Path to SmartDeviceLink logs", required=True)
parser.add_argument("--extract_messages", dest="extract_messages", action="store_true",
                    default=False, help="Extract MOBILE/HMI messages from sdl logs")
parser.add_argument("--calculate_input_video_data", dest="calculate_input_video_data", action="store_true",
                    default=False, help="Calculate amount of input videodata")
parser.add_argument("--calculate_output_video_data", dest="calculate_output_video_data", action="store_true",
                    default=False, help="Calculate amount of written videodata to pipe")
parser.add_argument("--streaming_logs", dest="streaming_logs", action="store_true",
                    default=False, help="Extract logs related to videostreaming")
args = parser.parse_args()
def read_lines(path):
    """Parse the log file at *path* into a list of records.

    A line that matches the record header pattern becomes a new record: a
    list of eight strings holding, in order, the fields that ``get`` names
    "type", "timestamp", "thread", "component", "file", "line", "func" and
    "info".  A line that does not match is treated as a continuation and is
    appended verbatim (line terminator included) to the "info" field of the
    preceding record.

    Args:
        path: path of the log file to read.

    Returns:
        List of records in file order.

    Continuation lines that appear before the first record have nothing to
    attach to and are dropped; previously they raised IndexError.  The file
    is closed as soon as it has been read.
    """
    import re
    p = re.compile(r"(\w+) +\[\d\d \w\w\w \d\d\d\d (\d\d:\d\d:\d\d,\d\d\d)\]\[(0x[0123456789abcdef]+)\]\[(\w+)] ([A-Za-z_\/\.]+\.\w+):(\d+) (\w+):(.+)")
    detected = []
    with open(path) as log_file:
        for line in log_file:
            m = p.search(line)
            if m:
                detected.append(list(m.groups()))
            elif detected:
                # Index 7 is the free-text "info" field.
                detected[-1][7] += line
    return detected
# Position of every named field inside a parsed log record.
_FIELD_INDEX = {
    "type": 0,
    "timestamp": 1,
    "thread": 2,
    "component": 3,
    "file": 4,
    "line": 5,
    "func": 6,
    "info": 7,
}


def get(key, group):
    """Return the field named *key* from the parsed log record *group*.

    Args:
        key: one of "type", "timestamp", "thread", "component", "file",
            "line", "func" or "info".
        group: sequence of record fields, indexed as in ``_FIELD_INDEX``.

    Returns:
        The requested field, or "" when *key* is not a known field name.

    The key is matched by value.  Comparing strings with ``is`` tests object
    identity and only works while both strings happen to be interned.
    """
    index = _FIELD_INDEX.get(key)
    if index is None:
        return ""
    return group[index]
def to_one_line(string):
    """Return *string* with every newline, carriage return and space removed."""
    flattened = string
    for unwanted in ('\n', '\r', " "):
        flattened = flattened.replace(unwanted, "")
    return flattened
def extract_logs_by_sec(detected=None):
    """Group parsed log records by the second of the day they were logged.

    Args:
        detected: iterable of parsed log records.  When omitted, a
            module-level ``detected`` is used if one exists (the name this
            function used to read implicitly), otherwise an empty list.

    Returns:
        Dict mapping seconds since midnight (int) to the list of records
        logged during that second, in input order.  Records whose timestamp
        does not contain an ``HH:MM:SS,mmm`` pattern are skipped.
    """
    # Imported locally so the function does not depend on a module-level
    # ``import re`` being present.
    import re

    if detected is None:
        detected = globals().get("detected", [])
    time_reg_exp = re.compile(r"(\d\d):(\d\d):(\d\d),(\d\d\d)")
    logs_in_sec = {}
    for record in detected:
        m = time_reg_exp.search(get("timestamp", record))
        if m is None:
            continue
        hours, minutes, seconds, _millis = m.groups()
        sec = int(hours) * 60 * 60 + int(minutes) * 60 + int(seconds)
        logs_in_sec.setdefault(sec, []).append(record)
    return logs_in_sec
def received_mobile_vide_streaming(detected):
    """Print the summed DataSize of incoming "ServiceType: 11" frames.

    Only records logged from incoming_data_handler.cc line 180 whose info
    text contains "ServiceType: 11" are counted.  The number following
    "DataSize: " is added to the total, and the total divided by 10000 is
    printed.

    Args:
        detected: iterable of parsed log records.

    A record without a parsable "DataSize: " value is printed and skipped,
    the same way the pipe counterpart reports lines it cannot parse.
    Previously a missing marker made the slice start at the last character
    of the text and the lookup crashed.
    """
    received_from_mobile = 0
    for record in detected:
        info = get("info", record)
        if "ServiceType: 11" not in info:
            continue
        if get("file", record) != "incoming_data_handler.cc" or get("line", record) != "180":
            continue
        # tail is empty when the marker is absent, which leaves no tokens.
        _, _, tail = info.partition("DataSize: ")
        tokens = tail.split()
        try:
            # The size token carries one trailing character (presumably a
            # separator such as "," - confirm against the log format).
            received_from_mobile += int(tokens[0][:-1])
        except (IndexError, ValueError):
            print(info)
    print("Received from mobile ", received_from_mobile / 10000)
def send_to_pipe_video_streaming(detected):
    """Print the summed byte counts written to the video stream pipe.

    For every record whose info text contains "bytes of data have been
    written to pipe /fs/tmpfs/video_stream_pipe", the second space-separated
    field of the text is added to the total.  The total divided by 10000 is
    printed.

    Args:
        detected: iterable of parsed log records.

    A matching record whose count cannot be parsed is printed and skipped.
    Only the parse failures (IndexError, ValueError) are caught, so
    KeyboardInterrupt and SystemExit are no longer swallowed.
    """
    sent_to_pipe = 0
    for record in detected:
        info = get("info", record)
        if "bytes of data have been written to pipe /fs/tmpfs/video_stream_pipe" not in info:
            continue
        try:
            sent_to_pipe += int(info.split(" ")[1])
        except (IndexError, ValueError):
            print(info)
    print("Sent to pipe ", sent_to_pipe / 10000)
def received_on_Applink_videostreaming(pasa_log):
    """Print the summed "size_read" values found in *pasa_log*.

    A line counts when "size_read" occurs in it anywhere after the first
    character; its sixth space-separated field is parsed as an integer.
    The sum divided by 10000 is printed.
    """
    total = sum(
        int(entry.split(" ")[5].strip())
        for entry in pasa_log
        if entry.find("size_read") > 0
    )
    print("Received on Applink " , total/10000)
def search_in_info(pattern, lines):
    """Return the first record in *lines* whose info text contains *pattern*.

    Returns None when no record matches.
    """
    return next(
        (record for record in lines if pattern in get("info", record)),
        None,
    )
def extract_messages_until_exit(func, thread, lines):
    """Collect the records of *thread* up to the "Exit" record of *func*.

    Records from other threads are skipped.  Collection stops at the first
    record of *thread* whose function name equals *func* and whose info text
    contains "Exit"; that record is included in the result.

    Returns:
        The collected records, or None when no such exit record is found.
    """
    collected = []
    for record in lines:
        if get("thread", record) != thread:
            continue
        collected.append(record)
        if get("func", record) == func and "Exit" in get("info", record):
            return collected
    return None
def _outgoing_message(direction, func_name, start, detected):
    """Build the message tuple for the *func_name* call entered at *start*.

    Follows the entering thread up to the exit of *func_name* and looks for
    the "Convertion result" record in between.  Returns
    ``(direction, timestamp, text)``, or None (after printing a diagnostic)
    when either the exit or the conversion result cannot be found.
    """
    entry = detected[start]
    call_lines = extract_messages_until_exit(func_name, get("thread", entry), detected[start:])
    if not call_lines:
        print("Unable to find function exit from ", entry)
        return None
    mes = search_in_info("Convertion result", call_lines)
    if not mes:
        print("Not able to find conversion result", entry)
        return None
    # The first 20 characters of the info text are dropped; presumably the
    # fixed "Convertion result" prefix - confirm against the log format.
    return (direction, get("timestamp", mes), to_one_line(get("info", mes)[20:]))


def extract_messaging(detected):
    """Extract the messages exchanged with HMI and mobile from *detected*.

    Incoming messages are the records whose info text contains
    "Message to convert: protocol 0;" (reported as "HMI->SDL") or
    "Message to convert: protocol 3;" (reported as "MOB->SDL").  Outgoing
    messages are found by following each "Enter" record of
    SendMessageToMobile ("SDL->MOB") or SendMessageToHMI ("SDL->HMI") to the
    conversion result logged before that function exits.

    Args:
        detected: list of parsed log records.

    Returns:
        List of ``(direction, timestamp, text)`` tuples in log order.
    """
    incoming = (
        ("HMI->SDL", "Message to convert: protocol 0;"),
        ("MOB->SDL", "Message to convert: protocol 3;"),
    )
    outgoing = (
        ("SDL->MOB", "SendMessageToMobile"),
        ("SDL->HMI", "SendMessageToHMI"),
    )
    messages = []
    for i, record in enumerate(detected):
        info = get("info", record)
        for direction, marker in incoming:
            if marker in info:
                # Drops a fixed-length prefix (marker length + 10); assumes
                # the marker sits at a fixed offset - TODO confirm.
                messages.append((direction, get("timestamp", record), info[len(marker) + 10:]))
        if "Enter" not in info:
            continue
        func = get("func", record)
        for direction, func_name in outgoing:
            if func == func_name:
                message = _outgoing_message(direction, func_name, i, detected)
                if message is not None:
                    messages.append(message)
    return messages
def extract_streaming_info(detected):
    """Collect the records of *detected* that relate to video streaming.

    Every record is checked against a fixed sequence of markers; each marker
    that occurs in the record's info text adds one ``(timestamp, text)``
    tuple, so a record matching several markers is listed several times.

    Returns:
        List of ``(timestamp, text)`` tuples in log order.
    """
    messages = []
    for record in detected:
        info = get("info", record)
        stamp = get("timestamp", record)

        # Stream start/stop and video-data notifications, flattened.
        for marker in ("StartStr", "StopStr", "OnVide"):
            if marker in info:
                messages.append((stamp, to_one_line(info)))

        # Incoming frames, only from incoming_data_handler.cc line 180.
        if "ServiceType: 11" in info:
            from_handler = get("file", record) == "incoming_data_handler.cc"
            if from_handler and get("line", record) == "180":
                # Fixed slice of the info text; presumably trims a constant
                # prefix and suffix - verify against the log format.
                messages.append((stamp, info[63:-15 ]))

        # Pipe writes and HMI status changes, kept verbatim.
        for marker in (
            "bytes of data have been written to pipe /fs/tmpfs/video_stream_pipe",
            "and rpc OnHMIStatus for",
        ):
            if marker in info:
                messages.append((stamp, info))

        if "BasicCommunication.OnAppDeactivated" in info:
            messages.append((stamp, to_one_line(info)))
        if "SDL.ActivateApp" in info:
            messages.append((stamp, info))
    return messages
def main():
    """Run the analyses selected on the command line.

    Reads the log named by ``args.path`` and then, depending on the flags in
    the module-level ``args``:

    * ``extract_messages``: writes the extracted messages to
      ``<path>_messages``, one per paragraph.
    * ``calculate_input_video_data`` / ``calculate_output_video_data``:
      print the corresponding totals.
    * ``streaming_logs``: writes the streaming-related records to
      ``<path>_streaming_info``, one per line.

    Output files are opened with a context manager so that they are closed
    even when writing fails part-way.
    """
    detected = read_lines(args.path)
    if args.extract_messages:
        messages = extract_messaging(detected)
        with open(args.path + "_messages", "w") as f:
            for m in messages:
                f.write("{} {} {}\n\n".format(m[0], m[1], m[2]))
    if args.calculate_input_video_data:
        received_mobile_vide_streaming(detected)
    if args.calculate_output_video_data:
        send_to_pipe_video_streaming(detected)
    if args.streaming_logs:
        messages = extract_streaming_info(detected)
        with open(args.path + "_streaming_info", "w") as f:
            for m in messages:
                f.write("{} {}\n".format(m[0], m[1]))


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment