Skip to content

Instantly share code, notes, and snippets.

Created February 4, 2023 21:31
Utility to aggregate time series (typically, a log file) in CSV format
import pendulum
import argparse
import csv
# Command-line interface: one positional CSV path plus aggregation options.
# Option values are left as the plain strings argparse hands over.
parser = argparse.ArgumentParser(description='Aggregate time series data')
parser.add_argument('filename', help='a CSV file to process')
parser.add_argument(
    '--column',
    default='time',
    help='in which column is the datetime (default: time)',
)
parser.add_argument(
    '--period',
    default='minute',
    help='time period: second, hour, .. (default: minute)',
)
parser.add_argument(
    '--sum',
    help='optional column to sum-up (default: +1)',
)
parser.add_argument(
    '--name',
    default='sum',
    help='name of sum-up column (default: sum)',
)
args = parser.parse_args()

# Module-level aggregation state, read by print_last_row() and updated by
# the row loop further down: the current period marker, the row kept for
# output, and the running total.
last_time = last_row = None
last_sum = 0
def to_num(val):
    """Convert a numeric string to ``int`` when possible, else ``float``.

    Args:
        val: Text of a number, e.g. ``"3"``, ``"2.5"`` or ``"1e3"``.

    Returns:
        An ``int`` if *val* is an integer literal, otherwise a ``float``.

    Raises:
        ValueError: If *val* is not parseable as a number at all.
    """
    try:
        return int(val)
    except ValueError:
        # Not an integer literal: covers decimals ("2.5") as well as
        # exponent notation without a dot ("1e3"), which a plain
        # "'.' in val" check would wrongly route to int().
        return float(val)
def print_last_row():
    """Print the pending aggregated row as one comma-separated line.

    Reads the module-level ``last_row``, ``last_time``, ``last_sum`` and
    ``args``. Does nothing when no row is pending (``last_row is None``).
    Otherwise the datetime column of ``last_row`` is overwritten with
    ``last_time``, the running total is stored under the ``--name`` column,
    and the row's values are printed joined by commas.

    Note: ``last_row`` is modified in place.
    """
    if last_row is None:
        # Nothing accumulated yet (e.g. empty input) - emit no line.
        return
    # Output with previous row counter values
    last_row[args.column] = last_time
    # The total goes in the column named by --name; for a row that does not
    # already have that key it is appended as the last value.
    last_row[args.name] = last_sum
    print(','.join(str(v) for v in last_row.values()))
# Stream the CSV once, grouping consecutive rows that fall into the same
# time period and printing one aggregated line per period.
with open(args.filename, newline='') as csvfile:
    reader = csv.DictReader(csvfile)
    for row in reader:
        # Parse the datetime column and go back to the start of the
        # selected period; rows sharing this value belong to one group.
        period_start = pendulum.parse(row[args.column]).start_of(args.period)

        if last_time is None:
            # First time through, output the header: the input columns
            # followed by the name of the sum-up column.
            print(','.join(row.keys()) + ',' + args.name)
        elif period_start != last_time:
            # A new period begins: flush the finished one and restart the
            # running total before this row is counted.
            print_last_row()
            last_sum = 0

        last_time = period_start
        last_row = row

        # Every row contributes to its own period, including the first row
        # of the file and the row that opens a new period.
        if args.sum is not None:
            last_sum = last_sum + to_num(row[args.sum])
        else:
            # No --sum column given: count rows instead.
            last_sum = last_sum + 1

# Print the last row's data
print_last_row()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment