Skip to content

Instantly share code, notes, and snippets.

@teddokano
Last active March 19, 2021 21:13
Show Gist options
  • Save teddokano/79678d074b12b370ae67c4cbf031accf to your computer and use it in GitHub Desktop.
Save teddokano/79678d074b12b370ae67c4cbf031accf to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# run_match.py
#
# script for finding same course running from .fit files
#
# usage: run_match.py data*.fit ..
#
# Tedd OKANO, Tsukimidai Communications Syndicate 2021
# Version 0.7 20-March-2021
# Copyright (c) 2021 Tedd OKANO
# Released under the MIT license
# https://opensource.org/licenses/mit-license.php
import fitpandas # https://github.com/teddokano/run_coursemap
import fitpandas_util as fu # https://github.com/teddokano/run_coursemap
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import numpy as np
import os.path
import sys
import argparse
import pickle
import datetime
import itertools
import staticmaps
ID = "run_match"
VERSION = "0.7"
ID_VER = ID + " v" + VERSION

# minimum x/y series correlation for two activities to count as the same course
CORR_THRESHOLD = 0.90 # correlation_coefficient
# maximum distance between two routes' center points for a course match
DIST_THRESHOLD = 1000 # meters
# NOTE(review): name looks like a typo of "OFFSET"; constant is not read in this view
PLOT_OFFET_CANCEL = False
DEBUG = False
# background-map image width/height in pixels, keyed by resolution keyword
MAP_RESOLUTION = { "low": 256, "mid": 512, "high": 1024, "off": "" }
# record columns that must be present (non-NaN) for an activity to be analyzed
REQUIRED_DATA_COLUMNS = [
    "distance",
    "position_long",
    "position_lat"
]
def main():
    """Read .fit files (or a saved pickle), group runs by course, report and plot.

    Reads the module-level ``args`` namespace produced by command_line_handling().
    """
    print( ID_VER )

    if not args.load_bin:
        # need at least two activities to compare.
        # fix: count the parsed input file list, not sys.argv — the old
        # len( sys.argv ) check miscounted when option flags were given
        if len( args.input_files ) < 2:
            print( "error: need 2 or more files to compare" )
            sys.exit( 1 )

        print( "file reading.." )
        rec_df_dic, session_df = read_files( args.input_files )
        session_df = update_session_df( rec_df_dic, session_df )
    else:
        # restore previously saved DataFrames instead of parsing .fit files
        with open( args.load_bin, "rb" ) as file:
            id_ver = pickle.load( file )  # writer's version string; kept for provenance only
            rec_df_dic = pickle.load( file )
            session_df = pickle.load( file )

    if args.save_bin:
        with open( args.save_bin + ".run_match.pickle", "wb" ) as file:
            pickle.dump( ID_VER, file )
            pickle.dump( rec_df_dic, file )
            pickle.dump( session_df, file )

    print( "converting coodinates.." )
    dst_df_dic = convert_to_distance_based_coodinates( rec_df_dic, 10 )

    print( "finding course groups.." )
    groups = course_match( dst_df_dic, session_df, CORR_THRESHOLD, DIST_THRESHOLD )
    cm_list = [ get_colormap( len( g[ "fn_list" ] ) ) for g in groups ]

    print( "==== result: course groups with .." )
    print( " correlation_coefficient >= {}, route center eccentricity > {}m ====".format( CORR_THRESHOLD, DIST_THRESHOLD ) )
    result_print( groups, cm_list, session_df )
    n_runs = len( session_df.index )
    print( "==== {} run groups in total {} run{} ====".format( len( groups ), n_runs, "s" if 1 < n_runs else "" ) )

    print( "plotting course grouped by color (z_axis = running distance)" )
    sgi_list = find_super_group( groups )
    print( " super group:", end = "" )
    print( sgi_list )

    cells = cell_check( groups, session_df )
    draw_cell_group_rect( cells )

    if sgi_list:
        # plot each super-group (groups that share at least one run) separately
        for sgis in sgi_list:
            gsd = [ { "num": i, "fn_list": groups[ i ][ "fn_list" ] } for i in sgis ]
            sub_cm_list = [ get_colormap( len( g[ "fn_list" ] ) ) for g in gsd ]
            plot_groups( gsd, dst_df_dic, session_df.T, sub_cm_list )
    else:
        plot_groups( groups, dst_df_dic, session_df.T, cm_list )
def update_session_df( df_dic, session_df ):
    """Join per-record derived attributes (fu.attributes) onto each session row."""
    attr_df = pd.DataFrame( { name: fu.attributes( rec ) for name, rec in df_dic.items() } ).T
    return session_df.join( attr_df )
def result_print( groups, cm_list, s ):
    """Print each course group: distance, sports, and its member activities.

    groups:  list of { "num": int, "fn_list": [ file names ] }
    cm_list: one colormap (list of RGB triples) per group
    s:       session DataFrame indexed by file name
    """
    lb = groups[ 0 ][ "fn_list" ][ 0 ]

    if args.no_cityname:
        start_place = ""
        farend_place = ""
    else:
        # NOTE(review): place names are looked up only for the first group's
        # first activity and reused for every group — confirm this is intended
        # (kept as-is to avoid extra reverse-geocoding lookups)
        start_place = "@ " + fu.get_city_name( s.at[ lb, "start_lat" ], s.at[ lb, "start_long" ] )
        farend_place = "@ " + fu.get_city_name( s.at[ lb, "farthest_lat" ], s.at[ lb, "farthest_long" ] )
        if start_place == farend_place:
            farend_place = ""

    for cc, g in enumerate( groups ):
        # fix: distance of THIS group's course — it was previously computed once
        # from the first group only, mislabeling every other group's distance
        distance = s.at[ g[ "fn_list" ][ 0 ], "fin" ] / 1000

        # unique sports in first-seen order
        sports_type = []
        for fn in g[ "fn_list" ]:
            sport = s.at[ fn, "sport" ]
            if sport not in sports_type:
                sports_type.append( sport )

        cm = cm_list[ cc ]
        print( " group-{} {:.1f}km ({} {} time{}) {} ~ {}:".format( g[ "num" ], distance, "/".join( sports_type ), len( g[ "fn_list" ] ), "s" if 1 < len( g[ "fn_list" ] ) else "", start_place, farend_place ) )

        for count, fn in enumerate( g[ "fn_list" ] ):
            sp = s.at[ fn, "sport" ]
            et = datetime.timedelta( seconds = round( s.at[ fn, "total_elapsed_time" ] ) )
            st = s.at[ fn, "start_time" ].strftime( "%Y-%b-%d %a" )
            c_print( " {} ({:10} for {}, on {})".format( fn, sp, et, st ), cm[ count ] )
def c_print( s, color ):
    """Print *s*; a 24-bit ANSI color escape is prepared but currently disabled."""
    # build the truecolor foreground escape from the 0..1 RGB triple
    r, g, b = ( str( int( c * 255 ) ) for c in color )
    rgb = "\033[38;2;{};{};{}m".format( r, g, b )
    # colored variant kept for easy re-enabling:
    # print( "{}{}{}".format( rgb, s, "\033[0m" ) )
    print( "{}".format( s ) )
def read_files( input_files ):
    """Parse .fit files into per-activity record DataFrames and a session table.

    Skips non-.fit files, indoor activities (no GPS columns) and recordings
    with fewer than 60 usable samples.
    Returns ( { file_name: record_df }, session_df sorted by start_time ).
    """
    rec_df_dic = {}
    session_list = []

    for file_name in input_files:
        print( " reading " + file_name + " .. ", end = "" )

        _, ext = os.path.splitext( file_name )
        if ext.lower() != ".fit":
            print( " cannot read. - .fit format file only" )
            continue

        rec_df, session, units = fitpandas.get_workout( file_name )
        print( "{}/{} ".format( session[ "sport" ], session[ "sub_sport" ] ), end = "" )

        if "position_lat" not in rec_df.columns.values:
            # indoor activity: no GPS track to match against
            print( "<ignored>" )
            continue

        rec_df = rec_df.dropna( subset = REQUIRED_DATA_COLUMNS )
        rec_df.reset_index( inplace = True, drop = True )
        if len( rec_df.index ) < 60:
            # too few samples for a meaningful course comparison
            print( "<ignored>" )
            continue

        print( "{}".format( session[ "start_time" ] ) )

        # FIT stores positions in semicircles; convert to degrees
        rec_df[ "position_lat" ] = rec_df[ "position_lat" ].apply( fu.semicircles2dgree )
        rec_df[ "position_long" ] = rec_df[ "position_long" ].apply( fu.semicircles2dgree )

        rec_df_dic[ file_name ] = rec_df
        session_list.append( session )

    session_df = pd.DataFrame( session_list, index = rec_df_dic.keys() )
    session_df.sort_values( "start_time", inplace = True )
    return rec_df_dic, session_df
def convert_to_distance_based_coodinates( rec_df_dic0, interval ):
    """Resample every record DataFrame to one point per *interval* meters.

    Fix: *interval* was accepted but ignored — a hard-coded 10 was passed to
    get_dist_based_coodinates instead. Behavior is unchanged for the existing
    caller, which passes 10.
    """
    return { k: get_dist_based_coodinates( v, interval ) for k, v in rec_df_dic0.items() }
def course_match( df_dic, s, threshold, distance ):
    """Group activities that appear to run the same course.

    Two activities match when their resampled x and y series each correlate
    at >= *threshold* AND their route centers lie within *distance* meters.
    Returns a list of { "num": group index, "fn_list": [ file names ] }.
    """
    # square matrix of pairwise route-center distances
    dist_df = pd.DataFrame()
    for name in df_dic.keys():
        dist_df.at[ name, name ] = 0
    for a, b in itertools.combinations( df_dic.keys(), 2 ):
        d = fu.p2p_distance( s.at[ a, "v_cntr_deg" ], s.at[ a, "h_cntr_deg" ],
                             s.at[ b, "v_cntr_deg" ], s.at[ b, "h_cntr_deg" ] )
        dist_df.at[ a, b ] = d
        dist_df.at[ b, a ] = d
    dist_df.rename( columns = lambda c: c + "_d", inplace = True )

    # correlation matrices of the x and y coordinate series across activities
    x_corr = pd.DataFrame( { k: v[ "x" ] for k, v in df_dic.items() } ).corr()
    y_corr = pd.DataFrame( { k: v[ "y" ] for k, v in df_dic.items() } ).corr()
    x_corr.rename( columns = lambda c: c + "_x", inplace = True )
    y_corr.rename( columns = lambda c: c + "_y", inplace = True )

    cdf = pd.concat( [ x_corr, y_corr, dist_df ], axis = 1 )

    groups = []
    checked = []
    for k in df_dic.keys():
        if k in checked:
            continue
        match = (cdf[ k + "_x" ] >= threshold) & (cdf[ k + "_y" ] >= threshold) & (cdf[ k + "_d" ] < distance)
        fn_list = cdf[ match ].index.values.tolist()
        checked.extend( fn_list )
        groups.append( { "num": len( groups ), "fn_list": fn_list } )
    return groups
def find_super_group( groups ):
    """Find "super groups": sets of course groups that share an activity.

    Groups sharing a run likely cover similar courses; how many super groups
    appear varies with CORR_THRESHOLD and DIST_THRESHOLD.
    Returns a list of lists of group numbers (empty when nothing is shared).
    """
    # membership matrix: rows = file names, columns = group numbers
    d_list = [ pd.Series( 1, index = g[ "fn_list" ] ) for g in groups ]
    chk_df = pd.concat( d_list, axis = 1 )

    # keep only files that belong to two or more groups
    chk_df[ "dup" ] = chk_df.sum( axis = 1 )
    if DEBUG:
        # fix: use fully-qualified option names — abbreviated keys depend on
        # pandas' pattern matching and are fragile across versions
        pd.set_option( "display.max_columns", 1024 )
        pd.set_option( "display.max_rows", 500 )
        pd.set_option( "display.expand_frame_repr", False )
        print( chk_df )
    chk_df = chk_df[ chk_df[ "dup" ] >= 2 ]
    chk_df.drop( "dup", axis = 1, inplace = True )

    # transpose (rows = groups, columns = shared files) and drop groups
    # that own none of the shared files
    chk_df = chk_df.T
    chk_df[ "pick" ] = chk_df.sum( axis = 1 )
    chk_df = chk_df[ chk_df[ "pick" ] != 0 ]
    chk_df.drop( "pick", axis = 1, inplace = True )
    chk_df.fillna( 0, inplace = True )

    # union the per-file group sets, merging transitively.
    # fix: a set overlapping several existing super-groups previously extended
    # EACH of them (the old `continue` never merged them), which produced
    # duplicated, overlapping super-groups; now all overlapping super-groups
    # are merged into a single one.
    sg_ar = []
    for fn in chk_df.columns:
        merged = set( chk_df[ chk_df[ fn ] != 0 ].index.values )
        remainder = []
        for sg in sg_ar:
            if merged & sg:
                merged |= sg
            else:
                remainder.append( sg )
        remainder.append( merged )
        sg_ar = remainder
    return [ list( sg ) for sg in sg_ar ]
def get_dist_based_coodinates( data, stops ):
    """Resample a track to one point every *stops* meters of run distance.

    Returns a DataFrame with columns "x" (longitude), "y" (latitude) and
    "d" (an even distance grid: 0, stops, 2*stops, ...).
    """
    xs = []
    ys = []
    next_mark = 0
    for dist, lat, lon in zip( data[ "distance" ], data[ "position_lat" ], data[ "position_long" ] ):
        # take the first sample at or beyond each distance mark
        if next_mark <= dist:
            ys.append( lat )
            xs.append( lon )
            next_mark += stops
    ds = [ i * stops for i in range( len( ys ) ) ]
    return pd.DataFrame( { "x": xs, "y": ys, "d": ds } )
def group_attr( fns, s ):
    """Aggregate geographic attributes over the activities in *fns*.

    s: transposed session DataFrame (columns = file names).
    Returns the fu.att_core() stats dict plus the mean start position
    ("start_lat" / "start_long") in degrees.
    """
    s = s[ fns ].T
    s = s[ [ "nec_lat", "nec_long", "swc_lat", "swc_long", "start_position_lat", "start_position_long" ] ]
    for p in s.keys():
        s[ p ] = s[ p ].apply( fu.semicircles2dgree )
    s = s.describe()
    sd = fu.att_core( s )
    # fix: the latitude and longitude means were assigned to swapped keys
    # ("start_long" got the latitude mean and vice versa)
    sd[ "start_long" ] = s[ "start_position_long" ][ "mean" ]
    sd[ "start_lat" ] = s[ "start_position_lat" ][ "mean" ]
    return sd
def plot_groups( groups, df_dic, s, cm_list ):
    """3D-plot every activity of *groups*: x/y = map position, z = run distance."""
    fig = plt.figure( figsize = ( 11, 11 ) )
    ax = fig.add_subplot( 111, projection = "3d" )

    fns = sum( [ g[ "fn_list" ] for g in groups ], [] )
    sd = group_attr( fns, s )

    # shift all coordinates so the mean start point becomes the origin
    start_lat = sd[ "start_lat" ]
    start_long = sd[ "start_long" ]
    v_start_ofst = sd[ "Cv" ] * start_lat
    h_start_ofst = sd[ "Ch" ] * start_long
    sd[ "north" ] -= v_start_ofst
    sd[ "south" ] -= v_start_ofst
    sd[ "east" ] -= h_start_ofst
    sd[ "west" ] -= h_start_ofst

    for list_count, g in enumerate( groups ):
        if list_count == 0:
            # draw the background map once, under the first group
            get_map( ax, "low", sd )
        cm = cm_list[ list_count ]
        for count, fn in enumerate( g[ "fn_list" ] ):
            df = df_dic[ fn ]
            x = df[ "x" ].apply( lambda x: sd[ "Ch" ] * (x - start_long) )
            y = df[ "y" ].apply( lambda y: sd[ "Cv" ] * (y - start_lat) )
            d = df[ "d" ] / 1000
            plt.plot( x, y, d, color = cm[ count ], alpha = .2 )

    # label start/finish using the first activity of each group
    fn1 = { g[ "num" ]: g[ "fn_list" ][ 0 ] for g in groups }
    for list_count, ( i, fn ) in enumerate( fn1.items() ):
        # fix: the original reused the stale `df` left over from the plotting
        # loop above for every label; look up this group's own activity
        df = df_dic[ fn ]
        x = df[ "x" ].apply( lambda x: sd[ "Ch" ] * (x - start_long) ).to_list()
        y = df[ "y" ].apply( lambda y: sd[ "Cv" ] * (y - start_lat) ).to_list()
        d = ( df[ "d" ] / 1000 ).to_list()
        marktext( ax, x[ 0 ], y[ 0 ], d[ 0 ], 10, cm_list[ list_count ][ 0 ], "group{}-start".format( i ) )
        marktext( ax, x[ -1 ], y[ -1 ], d[ -1 ], 10, cm_list[ list_count ][ 0 ], "group{}-fin:{:.1f}km".format( i, d[ -1 ] ) )

    ax.set_xlabel( "longitude [km]" )
    ax.set_ylabel( "latitude [km]" )  # fix: label typo "latiitude"
    ax.set_zlabel( "road distance [km]" )
    plt.show()
def get_colormap( n_lines ):
    """Return a list of RGB triples: solid red for a single line, otherwise a
    blue-to-green gradient (note: n_lines + 1 entries in the gradient case;
    callers index only the first n_lines)."""
    if n_lines == 1:
        return [ [ 1, 0, 0 ] ]
    return [ [ 0, i / n_lines, 1 - i / n_lines ] for i in range( n_lines + 1 ) ]
def marktext( ax, x, y, z, dotsize, color, text ):
    """Place a translucent dot plus a left-aligned label at (x, y, z)."""
    shared = { "color": color, "alpha": 0.2 }
    ax.scatter( x, y, z, s = dotsize, **shared )
    ax.text( x, y, z, text, size = 10, ha = "left", va = "center", **shared )
def command_line_handling():
    """Define and parse the command line; returns the argparse namespace.

    Fix: corrected the "repeadedly" typo in the user-facing description.
    """
    parser = argparse.ArgumentParser( description = "finding route run repeatedly :)" )
    parser.add_argument( "input_files", help = "input file(s) (.fit format)", nargs = '*' )
    parser.add_argument( "--load_bin", help = "load bin file" )
    parser.add_argument( "--save_bin", help = "save bin file which was gathered from loaded .fit files" )
    parser.add_argument( "--no_cityname", help = "no showing reverse geocoding result", action = "store_true" )
    return parser.parse_args()
def get_map( axis, size_idx, lv ):
    """Render an OpenStreetMap image and lay it on the z=0 plane of *axis*.

    axis:     3D axes to draw the map surface onto
    size_idx: key into MAP_RESOLUTION ("low" / "mid" / "high")
    lv:       dict of view attributes (span, center, scale factors) — assumed
              to come from group_attr(); TODO confirm required keys
    Returns the RGBA facecolor array used for the surface.
    """
    # degrees of longitude covered by one tile at each zoom level
    # reference: https://wiki.openstreetmap.org/wiki/Zoom_levels
    # reference: https://wiki.openstreetmap.org/wiki/Tiles
    ZOOM_SCALE = [ 360,
                   180, 90, 45, 22.5, 11.25,
                   5.625, 2.813, 1.406, 0.703, 0.352,
                   0.176, 0.088, 0.044, 0.022, 0.011,
                   0.005, 0.003, 0.001, 0.0005, 0.00025
                   ]
    TILE_SIZE = 256.0

    size = MAP_RESOLUTION[ size_idx ]
    span = lv[ "vh_span" ] * TILE_SIZE / size

    # find the deepest zoom level still covering the requested span.
    # fix: the loop ranged to len( ZOOM_SCALE ) + 1, so a span matching no
    # level raised IndexError at ZOOM_SCALE[ 21 ]; stop at the last valid
    # index and fall through to the deepest zoom instead
    for zoom_level in range( 1, len( ZOOM_SCALE ) ):
        if (span / lv[ "Ch" ]) > ZOOM_SCALE[ zoom_level ]:
            break
    zoom_level -= 1

    # scale the rendered image so the map exactly matches the data span
    map_span_by_size = (fu.K * lv[ "Rcv" ]) / (2 ** zoom_level) * (size / TILE_SIZE)
    size = int( np.ceil( size * (lv[ "vh_span" ] / map_span_by_size) ) )

    context = staticmaps.Context()
    context.set_tile_provider( staticmaps.tile_provider_OSM )
    context.set_zoom( zoom_level )
    context.set_center( staticmaps.create_latlng( lv[ "v_cntr_deg" ], lv[ "h_cntr_deg" ] ) )
    image = context.render_cairo( size, size )

    # cairo delivers BGRA bytes; convert to float RGBA in [0, 1]
    buf = image.get_data()
    arri = np.ndarray( shape = ( size, size, 4 ), dtype = np.uint8, buffer = buf ).astype( np.uint8 )
    ar = np.array( arri / 255.0, dtype = np.float16 )
    arr = np.ndarray( shape = ( size, size, 4 ), dtype = np.float16 )
    for x in range( size ):
        for y in range( size ):
            # swap B<->R, rotate into plot orientation, and set a faint alpha
            arr[ y ][ (size - 1) - x ] = [ ar[ x ][ y ][ 2 ], ar[ x ][ y ][ 1 ], ar[ x ][ y ][ 0 ], 0.1 ]

    surface_x = [ [ x ] for x in np.linspace( lv[ "west" ], lv[ "east" ], size ) ]
    surface_y = [ y for y in np.linspace( lv[ "south" ], lv[ "north" ], size ) ]
    stride = 1
    axis.plot_surface( surface_x, surface_y, np.atleast_2d( 0 ), rstride = stride, cstride = stride, facecolors = arr, shade = False )
    return arr
def cell_check( groups, s ):
    """Cluster course groups into map "cells".

    A "cell" is a rectangle on the map shared by route groups whose bounding
    boxes overlap; overlapping groups grow the cell to cover them all.
    Returns the list of cell dicts (see cell()).
    """
    gr_ss_dic_list = [
        { "num": g[ "num" ], "attr": group_attr( g[ "fn_list" ], s.T ) }
        for g in groups
    ]

    cell_list = []
    for gr_ss_dic in gr_ss_dic_list:
        gr_ss = gr_ss_dic[ "attr" ]
        ts, tn = gr_ss[ "s_deg" ], gr_ss[ "n_deg" ]
        tw, te = gr_ss[ "w_deg" ], gr_ss[ "e_deg" ]

        # first cell whose rectangle overlaps this group's bounding box
        hit_cell = None
        for c in cell_list:
            separated = ( (c[ "e_deg" ] < tw) or (te < c[ "w_deg" ])
                          or (c[ "n_deg" ] < ts) or (tn < c[ "s_deg" ]) )
            if not separated:
                hit_cell = c
                break

        if hit_cell is None:
            cell_list.append( cell( gr_ss, gr_ss_dic ) )
        else:
            # grow the hit cell to also cover this group's bounding box
            hit_cell[ "n_deg" ] = max( hit_cell[ "n_deg" ], tn )
            hit_cell[ "s_deg" ] = min( hit_cell[ "s_deg" ], ts )
            hit_cell[ "e_deg" ] = max( hit_cell[ "e_deg" ], te )
            hit_cell[ "w_deg" ] = min( hit_cell[ "w_deg" ], tw )
            hit_cell[ "gssd_list" ].append( gr_ss_dic )

    print( " {} cells".format( len( cell_list ) ) )
    for c in cell_list:
        print( " {} groups: ".format( len( c[ "gssd_list" ] ) ), end = "" )
        print( [ gssd[ "num" ] for gssd in c[ "gssd_list" ] ] )
    return cell_list
def cell( gs, gssd ):
    """Build a new cell dict from a group's bounding box, seeded with *gssd*."""
    return {
        "n_deg": gs[ "n_deg" ],
        "s_deg": gs[ "s_deg" ],
        "e_deg": gs[ "e_deg" ],
        "w_deg": gs[ "w_deg" ],
        "gssd_list": [ gssd ],
    }
def draw_cell_group_rect( cell_list ):
    """Show a 2D overview plot: red rectangles for cells, blue for their groups."""
    fig = plt.figure()
    ax = plt.axes()
    for c in cell_list:
        ax.add_patch( rect( c, "r" ) )
        for gsd in c[ "gssd_list" ]:
            ax.add_patch( rect( gsd[ "attr" ], "b" ) )
    plt.axis( 'scaled' )
    ax.set_aspect( 'equal' )
    plt.show()
def rect( s, c ):
    """Return an unfilled Rectangle patch for bounding box *s*, edge color *c*."""
    west, south = s[ "w_deg" ], s[ "s_deg" ]
    width = s[ "e_deg" ] - west
    height = s[ "n_deg" ] - south
    return patches.Rectangle( xy = ( west, south ), width = width, height = height, ec = c, fill = False )
if __name__ == "__main__":
    # args is intentionally module-level: main() and result_print() read the
    # parsed command-line flags from it
    args = command_line_handling()
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment