Last active
March 19, 2021 21:13
-
-
Save teddokano/79678d074b12b370ae67c4cbf031accf to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# run_match.py | |
# | |
# script for finding same-course runs from .fit files
#
# usage: run_match.py data*.fit ..
# | |
# Tedd OKANO, Tsukimidai Communications Syndicate 2021 | |
# Version 0.7 20-March-2021 | |
# Copyright (c) 2021 Tedd OKANO | |
# Released under the MIT license | |
# https://opensource.org/licenses/mit-license.php | |
import fitpandas # https://github.com/teddokano/run_coursemap | |
import fitpandas_util as fu # https://github.com/teddokano/run_coursemap | |
import pandas as pd | |
import matplotlib.pyplot as plt | |
import matplotlib.patches as patches | |
import numpy as np | |
import os.path | |
import sys | |
import argparse | |
import pickle | |
import datetime | |
import itertools | |
import staticmaps | |
ID = "run_match"
VERSION = "0.7"
ID_VER = ID + " v" + VERSION

CORR_THRESHOLD = 0.90  # correlation_coefficient: minimum x/y trace correlation to treat two runs as the same course
DIST_THRESHOLD = 1000  # meters: maximum distance between two routes' center points
PLOT_OFFET_CANCEL = False  # NOTE(review): unused in this file; name looks like a typo of PLOT_OFFSET_CANCEL
DEBUG = False  # enables verbose DataFrame dumps in find_super_group()
MAP_RESOLUTION = { "low": 256, "mid": 512, "high": 1024, "off": "" }  # base-map pixel size per quality key
# record columns that must be present (non-NaN) for an activity to be analyzed
REQUIRED_DATA_COLUMNS = [
    "distance",
    "position_long",
    "position_lat"
]
def main():
    """
    Entry point: read .fit files (or a pickled cache), group runs by course,
    print the grouping result and plot each group in 3D.

    Relies on the module-level ``args`` namespace created in the
    ``if __name__ == "__main__"`` block.
    """
    print( ID_VER )

    if not args.load_bin:
        # at least two activities are needed to find matching courses
        if 3 > len( sys.argv ):
            print( "error: need 2 or more files to compare" )
            sys.exit( 1 )
        print( "file reading.." )
        rec_df_dic, session_df = read_files( args.input_files )
        session_df = update_session_df( rec_df_dic, session_df )
    else:
        # restore previously saved DataFrames
        # NOTE(review): pickle.load() must only be used on trusted files
        with open( args.load_bin, "rb" ) as file:
            id_ver = pickle.load( file )  # saved version string; read but currently not checked
            rec_df_dic = pickle.load( file )
            session_df = pickle.load( file )

    if args.save_bin:
        with open( args.save_bin + ".run_match.pickle", "wb" ) as file:
            catalog = args.input_files  # NOTE(review): assigned but never used or saved
            pickle.dump( ID_VER, file )
            pickle.dump( rec_df_dic, file )
            pickle.dump( session_df, file )

    print( "converting coodinates.." )
    # resample each track every 10 meters of travelled distance
    dst_df_dic = convert_to_distance_based_coodinates( rec_df_dic, 10 )

    print( "finding course groups.." )
    groups = course_match( dst_df_dic, session_df, CORR_THRESHOLD, DIST_THRESHOLD )
    cm_list = [ get_colormap( len( g[ "fn_list" ] ) ) for g in groups ]

    print( "==== result: course groups with .." )
    print( " correlation_coefficient >= {}, route center eccentricity > {}m ====".format( CORR_THRESHOLD, DIST_THRESHOLD ) )
    result_print( groups, cm_list, session_df )
    print( "==== {} run groups in total {} run{} ====".format( len( groups ), len( session_df.index ), "s" if 1 < len( session_df.index ) else "" ) )

    print( "plotting course grouped by color (z_axis = running distance)" )
    sgi_list = find_super_group( groups )
    print( " super group:", end = "" )
    print( sgi_list )

    cells = cell_check( groups, session_df )
    draw_cell_group_rect( cells )
    # quit()

    if sgi_list:
        # plot each super-group (sets of groups sharing runs) in its own figure
        for sgis in sgi_list:
            gsd = []
            for i in sgis:
                gsd.append( { "num": i, "fn_list": groups[ i ][ "fn_list" ] } )
            cm_list = [ get_colormap( len( g[ "fn_list" ] ) ) for g in gsd ]
            plot_groups( gsd, dst_df_dic, session_df.T, cm_list )
    else:
        plot_groups( groups, dst_df_dic, session_df.T, cm_list )
def update_session_df( df_dic, session_df ):
    """
    Extend the session DataFrame with per-track attributes.

    For every activity in `df_dic`, fu.attributes() is evaluated on its record
    DataFrame and the resulting values are joined onto `session_df` (rows are
    matched by file name).
    """
    attr_by_file = { name: fu.attributes( df ) for name, df in df_dic.items() }
    return session_df.join( pd.DataFrame( attr_by_file ).T )
def result_print( groups, cm_list, s ):
    """
    Print a human-readable summary of every course group.

    groups  : list of dicts { "num": int, "fn_list": [file names] }
    cm_list : per-group color lists (parallel to `groups`)
    s       : session DataFrame indexed by file name
    """
    # NOTE(review): `distance`, `start_place` and `farend_place` are taken from
    # the FIRST file of the FIRST group only, yet printed for EVERY group below —
    # presumably each group should use its own representative file; confirm intent.
    lb = groups[ 0 ][ "fn_list" ][ 0 ]

    if args.no_cityname:
        start_place = ""
        farend_place = ""
    else:
        # reverse-geocode the start and farthest points into city names
        start_place = "@ " + fu.get_city_name( s.at[ lb, "start_lat" ], s.at[ lb, "start_long" ] )
        farend_place = "@ " + fu.get_city_name( s.at[ lb, "farthest_lat" ], s.at[ lb, "farthest_long" ] )
        if start_place == farend_place:
            farend_place = ""

    distance = s.at[ lb, "fin" ] / 1000  # meters -> km

    cc = 0
    for g in groups:
        # collect the distinct sport types appearing in this group
        sports_type = []
        for fn in g[ "fn_list" ]:
            sport = s.at[ fn, "sport" ]
            if not sport in sports_type:
                sports_type.append( sport )

        cm = cm_list[ cc ]
        print( " group-{} {:.1f}km ({} {} time{}) {} ~ {}:".format( g[ "num" ], distance, "/".join( sports_type ), len( g[ "fn_list" ] ), "s" if 1 < len( g[ "fn_list" ] ) else "", start_place, farend_place ) )
        count = 0
        for fn in g[ "fn_list" ]:
            sp = s.at[ fn, "sport" ]
            et = datetime.timedelta( seconds = round( s.at[ fn, "total_elapsed_time" ] ) )
            st = s.at[ fn, "start_time" ].strftime( "%Y-%b-%d %a" )
            c_print( " {} ({:10} for {}, on {})".format( fn, sp, et, st ), cm[ count ] )
            count += 1
        cc += 1
def c_print( s, color ):
    """
    Print `s` on its own line.

    An ANSI 24-bit color escape sequence is prepared from `color` (an RGB
    triple of floats in 0..1), but the colored output path is currently
    disabled — the plain string is printed instead.
    """
    r, g, b = ( str( int( channel * 255 ) ) for channel in color )
    rgb = "\033[38;2;{};{};{}m".format( r, g, b )
    # print( "{}{}{}".format( rgb, s, "\033[0m" ) )   # colored variant, disabled
    print( "{}".format( s ) )
def read_files( input_files ):
    """
    Read .fit activity files into DataFrames.

    Files are skipped (with a console note) when they are not .fit, lack GPS
    positions (indoor activities), or have fewer than 60 valid records.

    Returns
    -------
    rec_df_dic : dict mapping file name -> per-record DataFrame
                 (position_lat/position_long converted to degrees)
    session_df : DataFrame of session summaries indexed by file name,
                 sorted by start time
    """
    rec_df_dic = {}
    sesion_ar = []
    for file_name in input_files:
        print( " reading " + file_name + " .. ", end = "" )
        fn, file_ext = os.path.splitext( file_name )
        if ".fit" != file_ext.lower():
            print( " cannot read. - .fit format file only" )
            continue

        rec_df, session, units = fitpandas.get_workout( file_name )
        print( "{}/{} ".format( session[ "sport" ], session[ "sub_sport" ] ), end = "" )

        if not "position_lat" in rec_df.columns.values:
            print( "<ignored>" )
            continue # don't add indoor activity to DataFrame array and session DataFrame

        # drop records missing any required column, then renumber 0..n-1
        # (later code indexes records positionally)
        rec_df = rec_df.dropna( subset = REQUIRED_DATA_COLUMNS )
        rec_df.reset_index( inplace = True, drop = True )

        if len( rec_df.index ) < 60:
            print( "<ignored>" )
            continue # data too short

        print( "{}".format( session[ "start_time" ] ) )

        # convert FIT "semicircle" units to degrees
        rec_df[ "position_lat" ] = rec_df[ "position_lat" ].apply( fu.semicircles2dgree )
        rec_df[ "position_long" ] = rec_df[ "position_long" ].apply( fu.semicircles2dgree )
        # session[ "nec_lat" ] = fu.semicircles2dgree( session[ "nec_lat" ] )
        # session[ "nec_long" ] = fu.semicircles2dgree( session[ "nec_long" ] )
        # session[ "swc_lat" ] = fu.semicircles2dgree( session[ "swc_lat" ] )
        # session[ "swc_long" ] = fu.semicircles2dgree( session[ "swc_long" ] )

        rec_df_dic[ file_name ] = rec_df
        sesion_ar.append( session )

    session_df = pd.DataFrame( sesion_ar, index = rec_df_dic.keys() )
    session_df.sort_values( "start_time", inplace = True )

    return rec_df_dic, session_df
def convert_to_distance_based_coodinates( rec_df_dic0, interval ):
    """
    Resample every activity's track at a fixed travelled-distance interval.

    rec_df_dic0 : dict of file name -> record DataFrame
    interval    : resampling interval in meters

    Returns a dict of file name -> DataFrame with columns x, y, d
    (see get_dist_based_coodinates).
    """
    df_dic = {}
    for k, v in rec_df_dic0.items():
        # BUGFIX: pass `interval` through — it was hard-coded to 10 here,
        # silently ignoring the parameter (the only caller passes 10, so
        # behavior for existing callers is unchanged).
        df_dic[ k ] = get_dist_based_coodinates( v, interval )
    return df_dic
def course_match( df_dic, s, threshold, distance ):
    ####
    #### course_match finds runs on the same course.
    #### Matching runs are gathered into a group and the list of groups is
    #### returned; each group is { "num": group index, "fn_list": [file names] }.
    ####
    # pairwise distance (meters) between the routes' center points
    ddf = pd.DataFrame()
    for i in df_dic.keys():
        ddf.at[ i, i ] = 0
    cmb = list( itertools.combinations( df_dic.keys(), 2 ) )
    for i, j in cmb:
        ddf.at[ i, j ] = fu.p2p_distance( s.at[ i, "v_cntr_deg" ], s.at[ i, "h_cntr_deg" ], s.at[ j, "v_cntr_deg" ], s.at[ j, "h_cntr_deg" ] )
        ddf.at[ j, i ] = ddf.at[ i, j ]  # the distance matrix is symmetric
    ddf.rename( columns = lambda s: s + "_d", inplace = True )

    # cross-correlation of the distance-resampled x (longitude) and y (latitude) traces
    xd = {}
    yd = {}
    for k, v in df_dic.items():
        xd[ k ] = v[ "x" ]
        yd[ k ] = v[ "y" ]
    xcdf = pd.DataFrame( xd ).corr()
    ycdf = pd.DataFrame( yd ).corr()
    xcdf.rename( columns = lambda s: s + "_x", inplace = True )
    ycdf.rename( columns = lambda s: s + "_y", inplace = True )

    # one row per file: "<fn>_x"/"<fn>_y" correlations and "<fn>_d" center distances
    cdf = pd.concat( [ xcdf, ycdf, ddf ], axis=1 )

    keys = [ k for k in df_dic.keys() ]
    checked = []
    groups = []
    count = 0
    for k in keys:
        if k in checked:
            continue
        # files whose traces correlate with k and whose route centers are close enough
        # NOTE(review): a file already assigned to an earlier group can appear
        # again here if it also matches k — confirm whether groups are meant to
        # be strictly disjoint.
        fn_list = cdf[ (cdf[ k + "_x" ] >= threshold) & (cdf[ k + "_y" ] >= threshold) & (cdf[ k + "_d" ] < distance) ].index.values.tolist()
        checked.extend( fn_list )
        groups.append( { "num": count, "fn_list": fn_list } )
        count += 1

    return groups
def find_super_group( groups ):
    ####
    #### find_super_group finds groups which share at least one run.
    #### Groups within one super-group are likely to have similar courses.
    #### The result varies with the CORR_THRESHOLD and DIST_THRESHOLD settings.
    #### Returns a list of super-groups, each a list of group indices.
    ####
    # membership matrix: rows = file names, columns = group numbers, 1 = member
    d_list = [ pd.Series( 1, index = g[ "fn_list" ] ) for g in groups ]
    chk_df = pd.concat( d_list, axis = 1 )
    chk_df[ "dup" ] = chk_df.sum( axis = 1 )  # how many groups each file belongs to

    if DEBUG:
        # NOTE(review): the bare 'max_columns'/'max_rows' option keys were
        # removed in pandas >= 2.0; use 'display.max_columns'/'display.max_rows'.
        pd.set_option( 'max_columns', 1024 )
        pd.set_option( 'max_rows', 500 )
        pd.set_option('display.expand_frame_repr', False)
        print( chk_df )

    # keep only the files that appear in two or more groups
    chk_df = chk_df[ chk_df[ "dup" ] >= 2 ]
    chk_df.drop( "dup", axis=1, inplace = True )

    # transpose (rows = group numbers) and drop groups with no shared file
    chk_df = chk_df.T
    chk_df[ "pick" ] = chk_df.sum( axis = 1 )
    chk_df = chk_df[ chk_df[ "pick" ] != 0 ]
    chk_df.drop( "pick", axis=1, inplace = True )
    chk_df.fillna( 0, inplace = True )

    # merge overlapping group-index lists into super-groups
    sg_ar = []
    for fn in chk_df.columns:
        t = list( chk_df[ chk_df[ fn ] != 0 ].index.values )
        append_flag = True
        for sg in sg_ar:
            if 0 != len( set( t ) & set( sg ) ):
                # NOTE(review): `continue` (not `break`) lets `t` be appended to
                # several existing super-groups without consolidating them, so
                # transitively-linked super-groups may remain separate — confirm
                # this is the intended behavior.
                sg.extend( t )
                append_flag = False
                continue
        if append_flag:
            sg_ar.append( t )

    # deduplicate group indices within each super-group
    r_ar = []
    for sg in sg_ar:
        r_ar.append( list( set( sg ) ) )

    return r_ar
def get_dist_based_coodinates( data, stops ):
    """
    Resample a track at every `stops` meters of travelled distance.

    data  : DataFrame with "distance", "position_lat" and "position_long"
            columns (records in travel order, integer index 0..n-1)
    stops : resampling interval in meters

    Returns a DataFrame with columns x (longitude), y (latitude) and
    d (nominal distance of each sample = index * stops).
    """
    lons = []
    lats = []
    next_mark = 0
    for dist, lat, lon in zip( data[ "distance" ], data[ "position_lat" ], data[ "position_long" ] ):
        if next_mark <= dist:
            lats.append( lat )
            lons.append( lon )
            next_mark += stops
    marks = [ i * stops for i in range( len( lats ) ) ]
    return pd.DataFrame( { "x": lons, "y": lats, "d": marks } )
def group_attr( fns, s ):
    """
    Aggregate geographic attributes (bounding box, start point) for a set of runs.

    fns : list of file names (column labels in `s`)
    s   : transposed session DataFrame (columns = file names)

    Returns the dict produced by fu.att_core() extended with the mean start
    coordinates in degrees ("start_lat", "start_long").
    """
    s = s[ fns ].T
    s = s[ [ "nec_lat", "nec_long", "swc_lat", "swc_long", "start_position_lat", "start_position_long" ] ]
    for p in s.keys():
        s[ p ] = s[ p ].apply( fu.semicircles2dgree )  # semicircles -> degrees
    s = s.describe()  # per-column stats; only "mean" is used below
    sd = fu.att_core( s )
    # BUGFIX: lat/long were swapped here ("start_long" took the latitude mean
    # and vice versa); plot_groups() multiplies start_lat by the vertical scale
    # Cv, so it must hold the latitude. TODO(review): confirm no caller relied
    # on the swapped values.
    sd[ "start_lat" ] = s[ "start_position_lat" ][ "mean" ]
    sd[ "start_long" ] = s[ "start_position_long" ][ "mean" ]
    return sd
def plot_groups( groups, df_dic, s, cm_list ):
    """
    Plot the grouped runs as 3D traces over a base map.

    x/y are positions relative to the mean start point (scaled by Ch/Cv),
    z is the travelled distance in kilometers; one color list per group.

    groups  : list of { "num": int, "fn_list": [file names] }
    df_dic  : file name -> distance-based coordinate DataFrame (x, y, d)
    s       : transposed session DataFrame (columns = file names)
    cm_list : per-group color lists (parallel to `groups`)
    """
    fig = plt.figure( figsize=( 11, 11 ) )
    ax = fig.add_subplot( 111, projection = "3d" )

    # attributes over all plotted files; shift the view so the start is the origin
    fns = sum( [ g[ "fn_list" ] for g in groups ], [] )
    sd = group_attr( fns, s )
    start_lat = sd[ "start_lat" ]
    start_long = sd[ "start_long" ]
    v_start_ofst = sd[ "Cv" ] * start_lat
    h_start_ofst = sd[ "Ch" ] * start_long
    sd[ "north" ] -= v_start_ofst
    sd[ "south" ] -= v_start_ofst
    sd[ "east" ] -= h_start_ofst
    sd[ "west" ] -= h_start_ofst

    list_count = 0
    for g in groups:
        if list_count == 0:
            get_map( ax, "low", sd )  # draw the base map once, before the first group
        cm = cm_list[ list_count ]
        count = 0
        for fn in g[ "fn_list" ]:
            df = df_dic[ fn ]
            x = df[ "x" ].apply( lambda x: sd[ "Ch" ] * (x - start_long) )
            y = df[ "y" ].apply( lambda y: sd[ "Cv" ] * (y - start_lat ) )
            d = df[ "d" ] / 1000  # meters -> km
            plt.plot( x, y, d, color = cm[ count ], alpha = .2 )
            count += 1
        list_count += 1

    # label the start / finish of the first run of each group
    fn1 = { g[ "num" ]: g[ "fn_list" ][ 0 ] for g in groups } # get a first file name from each group
    list_count = 0
    for i, fn in fn1.items():
        # BUGFIX: look up this group's own DataFrame — previously the stale `df`
        # left over from the plotting loop above was reused for every label.
        df = df_dic[ fn ]
        x = df[ "x" ].apply( lambda x: sd[ "Ch" ] * (x - start_long) ).to_list()
        y = df[ "y" ].apply( lambda y: sd[ "Cv" ] * (y - start_lat ) ).to_list()
        d = (df[ "d" ] / 1000).to_list()
        marktext( ax, x[ 0 ], y[ 0 ], d[ 0 ], 10, cm_list[ list_count ][ 0 ], "group{}-start".format( i ) )
        marktext( ax, x[ -1 ], y[ -1 ], d[ -1 ], 10, cm_list[ list_count ][ 0 ], "group{}-fin:{:.1f}km".format( i, d[ -1 ] ) )
        list_count += 1

    ax.set_xlabel( "longitude [km]" )
    ax.set_ylabel( "latitude [km]" )  # typo fix: was "latiitude"
    ax.set_zlabel( "road distance [km]" )
    plt.show()
def get_colormap( n_lines ):
    """
    Build a color list for `n_lines` traces.

    A single trace gets solid red; otherwise a blue-to-green gradient with
    n_lines + 1 entries (both endpoints included) is returned — callers use
    only the first n_lines of them.
    """
    if n_lines == 1:
        return [ [ 1, 0, 0 ] ]  # red
    return [ [ 0, i / n_lines, 1 - i / n_lines ] for i in range( n_lines + 1 ) ]
def marktext( ax, x, y, z, dotsize, color, text ):
    """Place a small translucent dot plus a text label at (x, y, z) on a 3D axis."""
    common = dict( color = color, alpha = 0.2 )
    ax.scatter( x, y, z, s = dotsize, **common )
    ax.text( x, y, z, text, size = 10, ha = "left", va = "center", **common )
def command_line_handling():
    """
    Parse the command line.

    Positional arguments are the input .fit files; --load_bin / --save_bin
    handle a pickled cache of already-parsed data, and --no_cityname disables
    reverse geocoding in the printed summary.
    """
    ap = argparse.ArgumentParser( description = "finding route run repeadedly :)" )
    ap.add_argument( "input_files", nargs = '*', help = "input file(s) (.fit format)" )
    ap.add_argument( "--load_bin", help = "load bin file" )
    ap.add_argument( "--save_bin", help = "save bin file which was gathered from loaded .fit files" )
    ap.add_argument( "--no_cityname", action = "store_true", help = "no showing reverse geocoding result" )
    return ap.parse_args()
def get_map( axis, size_idx, lv ):
    """
    Render an OpenStreetMap image as a flat surface at z = 0 on a 3D axis.

    axis     : 3D axis to draw on
    size_idx : key into MAP_RESOLUTION ("low" / "mid" / "high")
    lv       : dict of view attributes (span, center, scale factors)

    Returns the RGBA pixel array used as the surface's facecolors.
    """
    # finding zoom level
    # reference: https://wiki.openstreetmap.org/wiki/Zoom_levels
    # reference: https://wiki.openstreetmap.org/wiki/Tiles
    ZOOM_SCALE = [ 360,
        180, 90, 45, 22.5, 11.25,
        5.625, 2.813, 1.406, 0.703, 0.352,
        0.176, 0.088, 0.044, 0.022, 0.011,
        0.005, 0.003, 0.001, 0.0005, 0.00025
    ]
    TILE_SIZE = 256.0
    size = MAP_RESOLUTION[ size_idx ]
    span = lv[ "vh_span" ] * TILE_SIZE / size
    # pick the deepest zoom level whose tile span still covers the view
    # NOTE(review): if the break condition is never met, ZOOM_SCALE is read at
    # index len(ZOOM_SCALE), raising IndexError — confirm span lower bound.
    for zoom_level in range( 1, len( ZOOM_SCALE) + 1 ):
        if (span / lv[ "Ch" ]) > ZOOM_SCALE[ zoom_level ]:
            break
    zoom_level -= 1
    # scale the requested image size so the rendered map span matches the data span
    map_span_by_size = (fu.K * lv[ "Rcv" ]) / (2 ** zoom_level) * (size / TILE_SIZE)
    size = int( np.ceil( size * (lv[ "vh_span" ] / map_span_by_size) ) )

    # render the map image with py-staticmaps (cairo backend)
    context = staticmaps.Context()
    context.set_tile_provider( staticmaps.tile_provider_OSM )
    context.set_zoom( zoom_level )
    context.set_center( staticmaps.create_latlng( lv[ "v_cntr_deg" ], lv[ "h_cntr_deg" ] ) )
    image = context.render_cairo( size, size )
    # image.write_to_png("_map_img.png")

    # convert the cairo byte buffer to a float array, reordering the channels
    # (BGRA -> RGBA order) and remapping pixel positions, with alpha forced to 0.1
    buf = image.get_data()
    arri = np.ndarray( shape=( size, size, 4 ), dtype = np.uint8, buffer = buf ).astype(np.uint8)
    ar = np.array( arri / 255.0, dtype = np.float16 )
    arr = np.ndarray( shape=( size, size, 4 ), dtype = np.float16 )
    # NOTE(review): this per-pixel Python loop is O(size^2) and slow; it could
    # likely be replaced with numpy slicing if performance matters.
    for x in range( size ):
        for y in range( size ):
            arr[ y ][ (size - 1) - x ] = [ ar[ x ][ y ][ 2 ], ar[ x ][ y ][ 1 ], ar[ x ][ y ][ 0 ], 0.1 ]

    # draw the image as the floor of the 3D plot
    surface_x = [ [x] for x in np.linspace( lv[ "west" ], lv[ "east" ], size ) ]
    surface_y = [ y for y in np.linspace( lv[ "south" ], lv[ "north" ] , size ) ]
    stride = 1
    axis.plot_surface( surface_x, surface_y, np.atleast_2d( 0 ), rstride = stride, cstride = stride, facecolors = arr, shade = False )

    return arr
def cell_check( groups, s ):
    ####
    #### cell_check finds "cells".
    #### A "cell" is a rectangular area on the map shared by multiple route
    #### groups: when a group's bounding box overlaps an existing cell, the
    #### cell grows to the union bounding box and records the group.
    ####
    # compute per-group attributes (bounding box etc.) up front
    gr_ss_dic_list = []
    for g in groups:
        gr_ss_dic_list.append( { "num": g[ "num" ], "attr": group_attr( g[ "fn_list" ], s.T ) } )

    cell_list = []
    for gr_ss_dic in gr_ss_dic_list:
        gr_ss = gr_ss_dic[ "attr" ]

        # this group's bounding box
        ts, tn = gr_ss[ "s_deg" ], gr_ss[ "n_deg" ]
        tw, te = gr_ss[ "w_deg" ], gr_ss[ "e_deg" ]

        # CLEANUP: the original evaluated the same overlap test twice (once to
        # set a dead `hit` flag and break, once again after the loop); a single
        # for/else keeps the exact same merge behavior.
        for c in cell_list:
            cs, cn = c[ "s_deg" ], c[ "n_deg" ]
            cw, ce = c[ "w_deg" ], c[ "e_deg" ]
            # boxes overlap unless one lies entirely to one side of the other
            if not ((ce < tw) or (te < cw) or (cn < ts) or (tn < cs)):
                # grow the cell to the union bounding box and record the group
                c[ "n_deg" ] = max( cn, tn )
                c[ "s_deg" ] = min( cs, ts )
                c[ "e_deg" ] = max( ce, te )
                c[ "w_deg" ] = min( cw, tw )
                c[ "gssd_list" ].append( gr_ss_dic )
                # NOTE(review): the grown cell is not re-checked against other
                # cells, so two cells that now overlap stay separate (unchanged
                # from the original behavior).
                break
        else:
            # no overlap with any existing cell: start a new one
            cell_list.append( cell( gr_ss, gr_ss_dic ) )

    print( " {} cells".format( len( cell_list ) ) )
    for c in cell_list:
        print( " {} groups: ".format( len( c[ "gssd_list" ] ) ), end = "" )
        print( [ gssd[ "num" ] for gssd in c[ "gssd_list" ] ] )

    return cell_list
def cell( gs, gssd ):
    """
    Create a new cell from one group's attributes.

    gs   : group attribute dict holding the bounding box (n/s/e/w in degrees)
    gssd : the group's { "num", "attr" } record, stored as the first member
    """
    return {
        "n_deg": gs[ "n_deg" ],
        "s_deg": gs[ "s_deg" ],
        "e_deg": gs[ "e_deg" ],
        "w_deg": gs[ "w_deg" ],
        "gssd_list": [ gssd ],
    }
def draw_cell_group_rect( cell_list ):
    """
    Show a 2D overview of the cells: each cell's bounding box in red, with the
    bounding boxes of its member groups in blue.
    """
    fig = plt.figure()
    ax = plt.axes()
    for c in cell_list:
        ax.add_patch( rect( c, "r" ) )
        for gsd in c[ "gssd_list" ]:
            ax.add_patch( rect( gsd[ "attr" ], "b" ) )
    plt.axis('scaled')
    ax.set_aspect('equal')
    plt.show()
def rect( s, c ):
    """Build an unfilled Rectangle patch from a bounding-box dict (n/s/e/w degrees), edge color `c`."""
    west = s[ "w_deg" ]
    south = s[ "s_deg" ]
    width = s[ "e_deg" ] - west
    height = s[ "n_deg" ] - south
    return patches.Rectangle( xy = ( west, south ), width = width, height = height, ec = c, fill = False )
if __name__ == "__main__":
    # `args` is module-level on purpose: main(), result_print() etc. read it directly
    args = command_line_handling()
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment