Last active
December 17, 2023 12:25
-
-
Save trueroad/bb4e2ac32c5006695249cb9d71488420 to your computer and use it in GitHub Desktop.
Calc time used.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Calc time used. | |
https://gist.github.com/trueroad/bb4e2ac32c5006695249cb9d71488420 | |
Copyright (C) 2023 Masamichi Hosoda. | |
All rights reserved. | |
Redistribution and use in source and binary forms, with or without | |
modification, are permitted provided that the following conditions | |
are met: | |
* Redistributions of source code must retain the above copyright notice, | |
this list of conditions and the following disclaimer. | |
* Redistributions in binary form must reproduce the above copyright notice, | |
this list of conditions and the following disclaimer in the documentation | |
and/or other materials provided with the distribution. | |
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" | |
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | |
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE | |
ARE DISCLAIMED. | |
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE | |
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL | |
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS | |
OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) | |
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT | |
LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY | |
OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF | |
SUCH DAMAGE. | |
使用方法 | |
以下のような日時列と ID 列を含む利用ログの CSV を入力とする。 | |
UTF-8 BOM 付きなどでよい。改行コードは自動判別される。 | |
複数指定も可能、縦に連結してから集計する。 | |
日時列の名前は `Datetime` ID 列の名前は `ID` としている | |
(列名はコマンドラインオプションで変更可能)。 | |
名前で見ているため、列の順番は違ってもよい。 | |
利用ログの間隔が 20 分未満ならば利用していたとみなして利用時間に加算し、 | |
20 分以上ならば利用していなかったとみなして利用時間に加算しない | |
(このスレッショルドはコマンドラインオプションで変更可能)。 | |
```in_example.csv | |
Datetime,ID, | |
2023-12-15 10:00:00,1, | |
2023-12-15 10:10:00,1, | |
2023-12-15 11:00:00,1, | |
2023-12-15 11:00:00,1, | |
2023-12-15 12:00:00,1, | |
2023-12-15 12:10:00,1, | |
2023-12-16 10:00:00,1, | |
2023-12-16 10:00:01,1, | |
2023-12-15 11:00:00,2, | |
2023-12-15 11:00:01,2, | |
``` | |
以下のように実行するとID 毎、日毎の利用時間を集計した CSV が得られる。 | |
``` | |
$ ./calc_time_used.py --output out_example.csv in_example.csv | |
``` | |
以下のような出力 CSV が得られる。 | |
UTF-8 BOM 付き、改行コードは実行環境依存で出力される。 | |
表頭は入力に含まれていた最初の日から最後の日まで毎日を | |
YYYY-MM-DD のように表記したもの。 | |
表側(最初の列)は ID 列。 | |
表体は利用時間(秒)。 | |
```out_example.csv | |
ID,2023-12-15,2023-12-16 | |
1,1200.0,1.0 | |
2,1.0,0.0 | |
``` | |
""" | |
import os | |
import sys | |
from typing import Any, Sequence, Union | |
import pandas as pd | |
import numpy as np | |
import numpy.typing as npt | |
class calc_time_used: | |
"""Calc class.""" | |
def __init__(self, | |
threshold_no_use_period: np.timedelta64 = | |
np.timedelta64(20, 'm'), | |
column_datetime: str = 'Datetime', | |
column_id: str = 'ID') -> None: | |
""" | |
__init__. | |
Args: | |
threshold_no_use_period (np.Timedelta64): | |
無利用とみなす利用ログ間隔のスレッショルド | |
間隔が本スレッショルド | |
未満なら利用していたとみなして利用時間に加算し、 | |
以上なら利用していなかったとみなして利用時間に加算しない | |
column_datetime (str): | |
日時を含む列の名前 | |
column_id (str): | |
ID を含む列の名前 | |
""" | |
if threshold_no_use_period > np.timedelta64(1, 'D'): | |
# 無利用ログ間隔スレッショルドが長すぎる(丸一日より長い) | |
raise ValueError('Threshold is too long.') | |
self.threshold_no_use_period: np.timedelta64 = \ | |
threshold_no_use_period | |
self.column_datetime: str = column_datetime | |
self.column_id: str = column_id | |
# 中間処理で追加する日付だけの列の名前 | |
self.column_date: str = \ | |
'added-date-for-processing-in-calc-time-used-class' | |
def calc(self, df: pd.DataFrame) -> pd.DataFrame: | |
""" | |
計算メイン. | |
Args: | |
df (pd.DataFrame): | |
日時列と ID 列を含む利用ログのデータフレーム | |
Returns: | |
pd.DataFrame: | |
ID 毎、日毎の利用時間集計結果 | |
表頭は YYYY-MM-DD 形式の日付、表側は ID、表体は利用時間(秒) | |
""" | |
if self.column_date in df.columns: | |
# 追加したい列名が既に存在している | |
raise RuntimeError('Column name to be added alreay exists.') | |
# 作業用のデータフレームにコピー | |
df_in: pd.DataFrame = df.copy() | |
# 日付だけの列を追加(日へのキャストは仕様が不明なので避けた) | |
df_in[self.column_date] = df_in[self.column_datetime].map( | |
lambda x: pd.Timestamp(year=x.year, | |
month=x.month, | |
day=x.day)) | |
# 含まれている日付の一覧 | |
dates: npt.NDArray[np.datetime64] = \ | |
np.sort(df_in[self.column_date].unique()) | |
# 最初の日付 | |
date_begin: np.datetime64 = np.datetime64(dates[0], 'D') | |
# 最後の日付 | |
date_last: np.datetime64 = np.datetime64(dates[-1], 'D') | |
print(f'Date: {date_begin} ~ {date_last}') | |
# 最初から最後まで毎日 | |
date_all: npt.NDArray[np.datetime64] = \ | |
np.arange(date_begin, date_last + np.timedelta64(1, 'D')) | |
# 出力用データフレーム、列名(表頭)は YYYY-MM-DD 形式 | |
df_out: pd.DataFrame = pd.DataFrame(columns=date_all.astype(str), | |
dtype=np.float64) | |
# インデックスのタイトルに入力の ID 列名を設定 | |
df_out.index.name = self.column_id | |
# ID の一覧 | |
ids: npt.NDArray[Any] = np.sort(df_in[self.column_id].unique()) | |
id: str | |
for id in ids: | |
# ID 毎の処理 | |
print(f'Calculating ID: {id} ...') | |
# 出力用データフレームに ID 行を追加、初期値はすべての日で 0.0 秒 | |
df_out.loc[id] = 0.0 | |
# 入力用データフレームから該当ID のみを抽出 | |
df_id: pd.DataFrame = df_in[df_in[self.column_id] == id] | |
# 日時の一覧 | |
datetimes: npt.NDArray[np.datetime64] = \ | |
np.sort(df_id[self.column_datetime].unique()) | |
# 日時ループ中で使う一つ前の日時を初期化 | |
t_before: np.datetime64 = datetimes[0] | |
t: np.datetime64 | |
for t in datetimes: | |
# 入力中の記録日時毎の処理 | |
# 現在処理中の日時 t と一つ前の日時 t_before を比較する | |
if t < t_before: | |
# 順番がおかしい、ソートがおかしい? | |
raise RuntimeError('Sort failed.') | |
if t == t_before: | |
# 同時刻ならスキップ | |
continue | |
if (t - t_before) >= self.threshold_no_use_period: | |
# 間隔が無利用ログ間隔スレッショルド以上 | |
# 利用していないとみなして次の日時の処理へ | |
t_before = t | |
continue | |
# 間隔が無利用ログ間隔スレッショルド未満 | |
# 利用時間に加算する | |
# 一旦 pandas の Timestamp 型に変換する | |
p = pd.Timestamp(t) | |
p_before = pd.Timestamp(t_before) | |
# 列名(YYYY-MM-DD 形式)を生成 | |
col = str(np.datetime64(pd.Timestamp(year=p.year, | |
month=p.month, | |
day=p.day), 'D')) | |
if p.year == p_before.year and \ | |
p.month == p_before.month and \ | |
p.day == p_before.day: | |
# 前後で日付が一致 | |
# 差分を該当日付の列へ加算 | |
df_out.at[id, col] += (p - p_before).total_seconds() | |
else: | |
# 前後で日を跨いでいる | |
print('*** Across two days ***') | |
# 日付境界時刻 | |
boundary: pd.Timestamp = pd.Timestamp(year=p.year, | |
month=p.month, | |
day=p.day) | |
# 前半用の列名(YYYY-MM-DD 形式)を生成 | |
col_before = \ | |
str(np.datetime64(pd.Timestamp(year=p_before.year, | |
month=p_before.month, | |
day=p_before.day), 'D')) | |
# 日付境界時刻までを前半の日付の列へ加算 | |
df_out.at[id, col_before] += \ | |
(boundary - p_before).total_seconds() | |
# 日付境界時刻以降を後半の日付の列へ加算 | |
df_out.at[id, col] += (p - boundary).total_seconds() | |
print(' Total : ' | |
f'{(p - p_before).total_seconds()} [s]') | |
print(f' {col_before}: ' | |
f'{(boundary - p_before).total_seconds()} [s]') | |
print(f' {col}: ' | |
f'{(p - boundary).total_seconds()} [s]') | |
# 次の日時の処理へ | |
t_before = t | |
return df_out | |
def process_csv(self, | |
filename_ins: Sequence[Union[str, os.PathLike[str]]], | |
filename_out: Union[str, os.PathLike[str]]) -> bool: | |
""" | |
CSV ファイルを処理する. | |
Args: | |
filename_ins (Sequence[Union[str, os.PathLike[str]]]): | |
入力 CSV ファイルのリスト | |
filename_out (Union[str, os.PathLike[str]]): | |
出力 CSV ファイル | |
Returns: | |
bool: True なら成功、False なら失敗 | |
""" | |
# 空のデータフレームを用意し複数の入力を順次読み込んで結合 | |
df: pd.DataFrame = pd.DataFrame() | |
filename_in: Union[str, os.PathLike[str]] | |
for filename_in in filename_ins: | |
print(f'Loading: {filename_in} ...') | |
df_load: pd.DataFrame = \ | |
pd.read_csv(filename_in, | |
parse_dates=[self.column_datetime], | |
dtype={self.column_id: str}) | |
df = pd.concat([df, df_load]) | |
# 計算する | |
df_out: pd.DataFrame = self.calc(df) | |
# 集計結果を出力 | |
print(f'Writing: {filename_out} ...') | |
# UTF-8 BOM 付き CSVで出力(Excel で開けるように) | |
df_out.to_csv(filename_out, encoding='utf_8_sig') | |
return True | |
class commandline(): | |
"""Commandline option class.""" | |
def __init__(self) -> None: | |
"""___init__.""" | |
self.default_th_no_use_period: int = 20 | |
self.default_unit_no_use_period: str = 'm' | |
self.default_col_datetime: str = 'Datetime' | |
self.default_col_id: str = 'ID' | |
def parse(self) -> tuple[list[str], str, np.timedelta64, str, str]: | |
""" | |
コマンドラインをパースする. | |
Returns: | |
tuple: | |
list[str]: 入力 CSV ファイル名のリスト | |
str: 出力 CSV ファイル名 | |
np.timedelta64: 無利用とみなす利用ログ間隔のスレッショルド | |
str: 日時を含む列の名前 | |
str: ID を含む列の名前 | |
""" | |
import argparse | |
parser: argparse.ArgumentParser = argparse.ArgumentParser() | |
parser.add_argument('INPUT.csv', | |
help='Input CSV filename', | |
type=str, | |
nargs='+') | |
parser.add_argument('--output', | |
help='Output CSV filename', | |
type=str, | |
required=True) | |
parser.add_argument('--threshold-no-use-period', | |
help='Threshold of no use period', | |
type=int, default=self.default_th_no_use_period, | |
required=False) | |
parser.add_argument('--unit-no-use-period', | |
help='Unit of no use period', | |
type=str, default=self.default_unit_no_use_period, | |
required=False) | |
parser.add_argument('--column-datetime', | |
help='Datetime column name', | |
type=str, default=self.default_col_datetime, | |
required=False) | |
parser.add_argument('--column-id', | |
help='ID column name', | |
type=str, default=self.default_col_id, | |
required=False) | |
args: argparse.Namespace = parser.parse_args() | |
vargs: dict[str, Any] = vars(args) | |
input_filenames: list[str] = vargs['INPUT.csv'] | |
output_filename: str = vargs['output'] | |
th_no_use_period: int = vargs['threshold_no_use_period'] | |
unit_no_use_period: str = vargs['unit_no_use_period'] | |
col_datetime: str = vargs['column_datetime'] | |
col_id: str = vargs['column_id'] | |
print('Filenames\n' | |
f' Input filenames : {input_filenames}\n' | |
f' Output filename : {output_filename}\n' | |
'Threshold parameter\n' | |
f' No use period : {th_no_use_period}' | |
f' [{unit_no_use_period}]\n' | |
'Column names\n' | |
f' Datetime : {col_datetime}\n' | |
f' ID : {col_id}\n') | |
return (input_filenames, | |
output_filename, | |
np.timedelta64(th_no_use_period, unit_no_use_period), | |
col_datetime, | |
col_id) | |
def main() -> None: | |
"""Do main.""" | |
print(f'Calc time used.\n\n' | |
'https://gist.github.com/trueroad/' | |
'bb4e2ac32c5006695249cb9d71488420\n\n' | |
'Copyright (C) 2023 Masamichi Hosoda.\n' | |
'All rights reserved.\n') | |
cl: commandline = commandline() | |
input_filenames: list[str] | |
output_filename: str | |
threshold_no_use_period: np.timedelta64 | |
col_datetime: str | |
col_id: str | |
input_filenames, output_filename, \ | |
threshold_no_use_period, \ | |
col_datetime, col_id = cl.parse() | |
ctu: calc_time_used = calc_time_used( | |
threshold_no_use_period=threshold_no_use_period, | |
column_datetime=col_datetime, | |
column_id=col_id) | |
ctu.process_csv(input_filenames, output_filename) | |
print('Done.') | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment