Skip to content

Instantly share code, notes, and snippets.

@trueroad
Last active December 17, 2023 12:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save trueroad/bb4e2ac32c5006695249cb9d71488420 to your computer and use it in GitHub Desktop.
Save trueroad/bb4e2ac32c5006695249cb9d71488420 to your computer and use it in GitHub Desktop.
Calc time used.
#!/usr/bin/env python3
"""
Calc time used.
https://gist.github.com/trueroad/bb4e2ac32c5006695249cb9d71488420
Copyright (C) 2023 Masamichi Hosoda.
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED.
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
SUCH DAMAGE.
使用方法
以下のような日時列と ID 列を含む利用ログの CSV を入力とする。
UTF-8 BOM 付きなどでよい。改行コードは自動判別される。
複数指定も可能、縦に連結してから集計する。
日時列の名前は `Datetime` ID 列の名前は `ID` としている
(列名はコマンドラインオプションで変更可能)。
名前で見ているため、列の順番は違ってもよい。
利用ログの間隔が 20 分未満ならば利用していたとみなして利用時間に加算し、
20 分以上ならば利用していなかったとみなして利用時間に加算しない
(このスレッショルドはコマンドラインオプションで変更可能)。
```in_example.csv
Datetime,ID,
2023-12-15 10:00:00,1,
2023-12-15 10:10:00,1,
2023-12-15 11:00:00,1,
2023-12-15 11:00:00,1,
2023-12-15 12:00:00,1,
2023-12-15 12:10:00,1,
2023-12-16 10:00:00,1,
2023-12-16 10:00:01,1,
2023-12-15 11:00:00,2,
2023-12-15 11:00:01,2,
```
以下のように実行するとID 毎、日毎の利用時間を集計した CSV が得られる。
```
$ ./calc_time_used.py --output out_example.csv in_example.csv
```
以下のような出力 CSV が得られる。
UTF-8 BOM 付き、改行コードは実行環境依存で出力される。
表頭は入力に含まれていた最初の日から最後の日まで毎日を
YYYY-MM-DD のように表記したもの。
表側(最初の列)は ID 列。
表体は利用時間(秒)。
```out_example.csv
ID,2023-12-15,2023-12-16
1,1200.0,1.0
2,1.0,0.0
```
"""
import os
import sys
from typing import Any, Sequence, Union
import pandas as pd
import numpy as np
import numpy.typing as npt
class calc_time_used:
"""Calc class."""
def __init__(self,
threshold_no_use_period: np.timedelta64 =
np.timedelta64(20, 'm'),
column_datetime: str = 'Datetime',
column_id: str = 'ID') -> None:
"""
__init__.
Args:
threshold_no_use_period (np.Timedelta64):
無利用とみなす利用ログ間隔のスレッショルド
間隔が本スレッショルド
未満なら利用していたとみなして利用時間に加算し、
以上なら利用していなかったとみなして利用時間に加算しない
column_datetime (str):
日時を含む列の名前
column_id (str):
ID を含む列の名前
"""
if threshold_no_use_period > np.timedelta64(1, 'D'):
# 無利用ログ間隔スレッショルドが長すぎる(丸一日より長い)
raise ValueError('Threshold is too long.')
self.threshold_no_use_period: np.timedelta64 = \
threshold_no_use_period
self.column_datetime: str = column_datetime
self.column_id: str = column_id
# 中間処理で追加する日付だけの列の名前
self.column_date: str = \
'added-date-for-processing-in-calc-time-used-class'
def calc(self, df: pd.DataFrame) -> pd.DataFrame:
"""
計算メイン.
Args:
df (pd.DataFrame):
日時列と ID 列を含む利用ログのデータフレーム
Returns:
pd.DataFrame:
ID 毎、日毎の利用時間集計結果
表頭は YYYY-MM-DD 形式の日付、表側は ID、表体は利用時間(秒)
"""
if self.column_date in df.columns:
# 追加したい列名が既に存在している
raise RuntimeError('Column name to be added alreay exists.')
# 作業用のデータフレームにコピー
df_in: pd.DataFrame = df.copy()
# 日付だけの列を追加(日へのキャストは仕様が不明なので避けた)
df_in[self.column_date] = df_in[self.column_datetime].map(
lambda x: pd.Timestamp(year=x.year,
month=x.month,
day=x.day))
# 含まれている日付の一覧
dates: npt.NDArray[np.datetime64] = \
np.sort(df_in[self.column_date].unique())
# 最初の日付
date_begin: np.datetime64 = np.datetime64(dates[0], 'D')
# 最後の日付
date_last: np.datetime64 = np.datetime64(dates[-1], 'D')
print(f'Date: {date_begin} ~ {date_last}')
# 最初から最後まで毎日
date_all: npt.NDArray[np.datetime64] = \
np.arange(date_begin, date_last + np.timedelta64(1, 'D'))
# 出力用データフレーム、列名(表頭)は YYYY-MM-DD 形式
df_out: pd.DataFrame = pd.DataFrame(columns=date_all.astype(str),
dtype=np.float64)
# インデックスのタイトルに入力の ID 列名を設定
df_out.index.name = self.column_id
# ID の一覧
ids: npt.NDArray[Any] = np.sort(df_in[self.column_id].unique())
id: str
for id in ids:
# ID 毎の処理
print(f'Calculating ID: {id} ...')
# 出力用データフレームに ID 行を追加、初期値はすべての日で 0.0 秒
df_out.loc[id] = 0.0
# 入力用データフレームから該当ID のみを抽出
df_id: pd.DataFrame = df_in[df_in[self.column_id] == id]
# 日時の一覧
datetimes: npt.NDArray[np.datetime64] = \
np.sort(df_id[self.column_datetime].unique())
# 日時ループ中で使う一つ前の日時を初期化
t_before: np.datetime64 = datetimes[0]
t: np.datetime64
for t in datetimes:
# 入力中の記録日時毎の処理
# 現在処理中の日時 t と一つ前の日時 t_before を比較する
if t < t_before:
# 順番がおかしい、ソートがおかしい?
raise RuntimeError('Sort failed.')
if t == t_before:
# 同時刻ならスキップ
continue
if (t - t_before) >= self.threshold_no_use_period:
# 間隔が無利用ログ間隔スレッショルド以上
# 利用していないとみなして次の日時の処理へ
t_before = t
continue
# 間隔が無利用ログ間隔スレッショルド未満
# 利用時間に加算する
# 一旦 pandas の Timestamp 型に変換する
p = pd.Timestamp(t)
p_before = pd.Timestamp(t_before)
# 列名(YYYY-MM-DD 形式)を生成
col = str(np.datetime64(pd.Timestamp(year=p.year,
month=p.month,
day=p.day), 'D'))
if p.year == p_before.year and \
p.month == p_before.month and \
p.day == p_before.day:
# 前後で日付が一致
# 差分を該当日付の列へ加算
df_out.at[id, col] += (p - p_before).total_seconds()
else:
# 前後で日を跨いでいる
print('*** Across two days ***')
# 日付境界時刻
boundary: pd.Timestamp = pd.Timestamp(year=p.year,
month=p.month,
day=p.day)
# 前半用の列名(YYYY-MM-DD 形式)を生成
col_before = \
str(np.datetime64(pd.Timestamp(year=p_before.year,
month=p_before.month,
day=p_before.day), 'D'))
# 日付境界時刻までを前半の日付の列へ加算
df_out.at[id, col_before] += \
(boundary - p_before).total_seconds()
# 日付境界時刻以降を後半の日付の列へ加算
df_out.at[id, col] += (p - boundary).total_seconds()
print(' Total : '
f'{(p - p_before).total_seconds()} [s]')
print(f' {col_before}: '
f'{(boundary - p_before).total_seconds()} [s]')
print(f' {col}: '
f'{(p - boundary).total_seconds()} [s]')
# 次の日時の処理へ
t_before = t
return df_out
def process_csv(self,
filename_ins: Sequence[Union[str, os.PathLike[str]]],
filename_out: Union[str, os.PathLike[str]]) -> bool:
"""
CSV ファイルを処理する.
Args:
filename_ins (Sequence[Union[str, os.PathLike[str]]]):
入力 CSV ファイルのリスト
filename_out (Union[str, os.PathLike[str]]):
出力 CSV ファイル
Returns:
bool: True なら成功、False なら失敗
"""
# 空のデータフレームを用意し複数の入力を順次読み込んで結合
df: pd.DataFrame = pd.DataFrame()
filename_in: Union[str, os.PathLike[str]]
for filename_in in filename_ins:
print(f'Loading: {filename_in} ...')
df_load: pd.DataFrame = \
pd.read_csv(filename_in,
parse_dates=[self.column_datetime],
dtype={self.column_id: str})
df = pd.concat([df, df_load])
# 計算する
df_out: pd.DataFrame = self.calc(df)
# 集計結果を出力
print(f'Writing: {filename_out} ...')
# UTF-8 BOM 付き CSVで出力(Excel で開けるように)
df_out.to_csv(filename_out, encoding='utf_8_sig')
return True
class commandline():
"""Commandline option class."""
def __init__(self) -> None:
"""___init__."""
self.default_th_no_use_period: int = 20
self.default_unit_no_use_period: str = 'm'
self.default_col_datetime: str = 'Datetime'
self.default_col_id: str = 'ID'
def parse(self) -> tuple[list[str], str, np.timedelta64, str, str]:
"""
コマンドラインをパースする.
Returns:
tuple:
list[str]: 入力 CSV ファイル名のリスト
str: 出力 CSV ファイル名
np.timedelta64: 無利用とみなす利用ログ間隔のスレッショルド
str: 日時を含む列の名前
str: ID を含む列の名前
"""
import argparse
parser: argparse.ArgumentParser = argparse.ArgumentParser()
parser.add_argument('INPUT.csv',
help='Input CSV filename',
type=str,
nargs='+')
parser.add_argument('--output',
help='Output CSV filename',
type=str,
required=True)
parser.add_argument('--threshold-no-use-period',
help='Threshold of no use period',
type=int, default=self.default_th_no_use_period,
required=False)
parser.add_argument('--unit-no-use-period',
help='Unit of no use period',
type=str, default=self.default_unit_no_use_period,
required=False)
parser.add_argument('--column-datetime',
help='Datetime column name',
type=str, default=self.default_col_datetime,
required=False)
parser.add_argument('--column-id',
help='ID column name',
type=str, default=self.default_col_id,
required=False)
args: argparse.Namespace = parser.parse_args()
vargs: dict[str, Any] = vars(args)
input_filenames: list[str] = vargs['INPUT.csv']
output_filename: str = vargs['output']
th_no_use_period: int = vargs['threshold_no_use_period']
unit_no_use_period: str = vargs['unit_no_use_period']
col_datetime: str = vargs['column_datetime']
col_id: str = vargs['column_id']
print('Filenames\n'
f' Input filenames : {input_filenames}\n'
f' Output filename : {output_filename}\n'
'Threshold parameter\n'
f' No use period : {th_no_use_period}'
f' [{unit_no_use_period}]\n'
'Column names\n'
f' Datetime : {col_datetime}\n'
f' ID : {col_id}\n')
return (input_filenames,
output_filename,
np.timedelta64(th_no_use_period, unit_no_use_period),
col_datetime,
col_id)
def main() -> None:
"""Do main."""
print(f'Calc time used.\n\n'
'https://gist.github.com/trueroad/'
'bb4e2ac32c5006695249cb9d71488420\n\n'
'Copyright (C) 2023 Masamichi Hosoda.\n'
'All rights reserved.\n')
cl: commandline = commandline()
input_filenames: list[str]
output_filename: str
threshold_no_use_period: np.timedelta64
col_datetime: str
col_id: str
input_filenames, output_filename, \
threshold_no_use_period, \
col_datetime, col_id = cl.parse()
ctu: calc_time_used = calc_time_used(
threshold_no_use_period=threshold_no_use_period,
column_datetime=col_datetime,
column_id=col_id)
ctu.process_csv(input_filenames, output_filename)
print('Done.')
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment