Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Turn an array of heartbeats into durations by combining heartbeats that are within Timeout minutes from each other.
/*
BSD 3-Clause License
Copyright (c) 2019 Alan Hamlett.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided
with the distribution.
* Neither the names of WakaTime, nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE AND DOCUMENTATION IS PROVIDED BY THE COPYRIGHT HOLDERS AND
CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT
NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER
OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
$(function() {
new (Backbone.View.extend({
data: {
heartbeats: new (Backbone.Collection.extend({
url: '/api/v1/users/current/heartbeats',
parse: function(response) {
this.start = response.start;
this.end = response.end;
this.yesterday_heartbeat = response.yesterday_heartbeat;
this.tomorrow_heartbeat = response.tomorrow_heartbeat;
return response.data;
},
}))(),
},
processHeartbeatsIntoDurations: function() {
var start = utils.toMoment(this.data.heartbeats.start, bootstrapped.timezone).unix();
var end = utils.toMoment(this.data.heartbeats.end, bootstrapped.timezone).unix();
var heartbeats = this.data.heartbeats.toJSON();
if (heartbeats.length > 0) {
var yesterday = this.data.heartbeats.yesterday_heartbeat;
if (yesterday && heartbeats[0].time - yesterday.time < bootstrapped.timeout * 60) {
yesterday.time = this.round(start, 6);
heartbeats.unshift(yesterday);
}
var tomorrow = this.data.heartbeats.tomorrow_heartbeat;
if (tomorrow && tomorrow.time - _.last(heartbeats).time < bootstrapped.timeout * 60) {
tomorrow.time = this.round(end, 6);
heartbeats.push(tomorrow);
}
}
var mini_durations = this.heartbeatsToMiniDurations(heartbeats);
var durations = this.combineMiniDurations(mini_durations);
this.durations = durations;
console.log(durations);
},
heartbeatsToMiniDurations: function(heartbeats) {
_.each(
heartbeats,
_.bind(function(heartbeat, index) {
var duration = 0;
var next_heartbeat = index < heartbeats.length - 1 ? heartbeats[index + 1] : undefined;
heartbeat.project = heartbeat.project || 'Unknown Project';
heartbeat.language = heartbeat.language || 'Other';
if (next_heartbeat && next_heartbeat.time - heartbeat.time < bootstrapped.timeout * 60) {
duration = next_heartbeat.time - heartbeat.time;
if (duration < 0) duration = 0;
}
heartbeat.duration = duration;
}, this),
);
return heartbeats;
},
combineMiniDurations: function(mini_durations) {
var last = null;
var durations = [];
var filters = {};
_.each(
mini_durations,
function(duration) {
if (duration['is_write']) filters[this.lower(duration['project'])] = true;
},
this,
);
for (var i = 0; i < mini_durations.length; i++) {
var duration = mini_durations[i];
// apply write filter
if (bootstrapped.writes_only && !filters[this.lower(duration['project'])]) {
last = null;
continue;
}
// combine durations
var current = durations.length > 0 ? _.last(durations) : null;
if (current && this.shouldJoinDuration(duration, last, bootstrapped.timeout)) {
var endtime = this.round(duration['time'], 2) + this.round(duration['duration'], 2);
current['duration'] = this.round(endtime - this.round(current['time'], 2));
if (current['duration'] < 0) current['duration'] = 0;
} else {
current = _.clone(duration);
durations.push(current);
}
last = duration;
}
return durations;
},
shouldJoinDuration: function(duration, last) {
if (!last) return false;
if (this.lower(last.project) != this.lower(duration.project)) return false;
var last_time = last['time'] + last['duration'];
var time_span = duration['time'] - last_time;
if (time_span < 0) time_span = 0;
if (time_span > this.timeout * 60) return false;
return true;
},
lower: function(str) {
if (str) return str.toLowerCase();
return str;
},
round: function(number, precision) {
var factor = Math.pow(10, precision);
var tmp = number * factor;
var roundedTmp = Math.round(tmp);
return roundedTmp / factor;
},
error: function(object, response) {
console.log(response);
},
initialize: function() {
this.listenTo(this.data.heartbeats, 'sync', this.processHeartbeatsIntoDurations);
this.listenTo(this.data.heartbeats, 'error', this.error);
this.data.heartbeats.fetch({
data: {
date: bootstrapped.date,
surrounding_heartbeats: 'true',
},
});
},
}))();
});
"""
BSD 3-Clause License
Copyright (c) 2019 Alan Hamlett.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided
with the distribution.
* Neither the names of WakaTime, nor the names of its
contributors may be used to endorse or promote products derived
from this software without specific prior written permission.
THIS SOFTWARE AND DOCUMENTATION IS PROVIDED BY THE COPYRIGHT HOLDERS AND
CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT
NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER
OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
"""
INSTALL: pip install requests pytz
RUN: python ./heartbeats_to_durations.py
"""
import calendar
import pytz
import requests
from decimal import Decimal
from datetime import datetime, timedelta, date as datetime_date
API_KEY = ""
START = "2019-12-01"
END = "2019-12-03"
TIMEZONE = 'America/Los_Angeles'
def get_combined_durations(
start=None,
end=None,
project=None,
branches=[],
timeout=15,
timezone=None,
):
timezone = str(timezone)
start = start_of_day(start.astimezone(pytz.timezone(timezone)))
end = end_of_day(end.astimezone(pytz.timezone(timezone)))
durations = []
current_day = start.replace(hour=11)
while current_day < end:
# calculate combined durations for this day from heartbeats
durations.extend(
combine_durations(
get_mini_durations(
day=current_day,
timeout=timeout,
),
timeout=timeout,
)
)
current_day = increment_one_day(current_day)
durations.sort(key=lambda x: x["time"])
return durations
def get_mini_durations(day=None, timeout=None):
"""Mini durations are just heartbeats with a duration attribute."""
heartbeats, yesterday, tomorrow = get_heartbeats(day=day, timeout=timeout)
if len(heartbeats) > 0:
if yesterday:
start = iso8601_to_timestamp(start_of_day(day).astimezone(pytz.utc))
yesterday['time'] = Decimal(start)
heartbeats.insert(0, yesterday)
if tomorrow:
end = iso8601_to_timestamp(end_of_day(day).astimezone(pytz.utc))
tomorrow['time'] = Decimal(end)
heartbeats.append(tomorrow)
return add_duration_data_to_heartbeats(heartbeats, timeout)
def add_duration_data_to_heartbeats(heartbeats, timeout):
"""Adds attribute 'duration' to each heartbeat."""
index = 0
for heartbeat in heartbeats:
duration = Decimal()
try:
next_heartbeat = heartbeats[index + 1]
except IndexError:
next_heartbeat = None
heartbeat['time'] = Decimal(heartbeat['time'])
if (next_heartbeat is not None and Decimal(next_heartbeat['time']) - Decimal(heartbeat['time']) < timeout * 60):
duration = Decimal(next_heartbeat['time']) - Decimal(heartbeat['time'])
if duration < 0:
duration = Decimal()
heartbeat['duration'] = duration
index += 1
return heartbeats
def combine_durations(mini_durations, timeout=None):
last = None
durations = []
for duration in mini_durations:
combine_mini_duration(durations, duration, last, timeout)
last = duration.copy()
return durations
def combine_mini_duration(durations, duration, last_duration, timeout):
if len(durations) > 0 and should_join_duration(duration, last_duration, timeout):
current = durations[-1]
endtime = Decimal(duration['time']) + Decimal(duration['duration'])
current["duration"] = endtime - Decimal(current["time"])
if current["duration"] < 0:
current["duration"] = Decimal()
else:
durations.append(duration.copy())
def should_join_duration(duration, last, timeout):
if not last:
return False
if last.get("project", '').lower() != duration.get("project", '').lower():
return False
last_time = Decimal(last["time"]) + Decimal(last["duration"])
time_span = Decimal(duration['time']) - last_time
if time_span < 0:
time_span = 0
if time_span > timeout * 60:
return False
return True
def start_of_day(dt):
"""Returns the given `datetime` object converted to 12:01 AM.
Subtracts a `timedelta` so the DST bit of the `datetime` object reflects DST
changes if they occur between the `datetime` and 12AM.
"""
if not isinstance(dt, datetime):
if not isinstance(dt, datetime_date):
return None
dt = date_to_dt(dt)
(year, month, day) = (dt.year, dt.month, dt.day)
dt = dt.replace(microsecond=0)
delta = timedelta(hours=dt.hour, minutes=dt.minute, seconds=dt.second)
dt = dt - delta
if dt.tzinfo is not None:
dt = (
datetime.utcfromtimestamp(int(round(iso8601_to_timestamp(dt))))
.replace(tzinfo=pytz.utc)
.astimezone(dt.tzinfo)
)
return dt.replace(year=year, month=month, day=day, hour=0, minute=0, second=0)
def end_of_day(dt):
"""Returns the given `datetime` object converted to 11:59 PM.
Adds a `timedelta` so the DST bit of the `datetime` object reflects DST
changes if they occur between the `datetime` and midnight.
"""
if not isinstance(dt, datetime):
if not isinstance(dt, datetime_date):
return None
dt = date_to_dt(dt)
(year, month, day) = (dt.year, dt.month, dt.day)
dt = dt.replace(microsecond=0)
delta = timedelta(
hours=(23 - dt.hour), minutes=(59 - dt.minute), seconds=(59 - dt.second)
)
dt = dt + delta
if dt.tzinfo is not None:
dt = (
datetime.utcfromtimestamp(int(round(iso8601_to_timestamp(dt))))
.replace(tzinfo=pytz.utc)
.astimezone(dt.tzinfo)
)
return dt.replace(year=year, month=month, day=day, hour=23, minute=59, second=59)
def increment_one_day(dt):
if not isinstance(dt, datetime):
if not isinstance(dt, datetime_date):
raise Exception("Expecting a date or datetime: {}".format(dt))
return dt + timedelta(days=1)
return dt.replace(hour=11) + timedelta(hours=26)
def decrement_one_day(dt):
if not isinstance(dt, datetime):
if not isinstance(dt, datetime_date):
raise Exception("Expecting a date or datetime: {}".format(dt))
return dt - timedelta(days=1)
return dt.replace(hour=11) - timedelta(hours=26)
def iso8601_to_timestamp(dt):
if not isinstance(dt, datetime):
raise Exception("Expecting a date or datetime: {}".format(dt))
if dt.tzinfo:
dt = dt.astimezone(pytz.utc).replace(tzinfo=None)
if dt.microsecond:
return calendar.timegm(dt.utctimetuple()) + dt.microsecond / 1000000
return calendar.timegm(dt.utctimetuple())
def date_to_dt(date):
if not is_date(date):
raise Exception('Date must be a utc date object: {}'.format(type(date)))
return datetime(year=date.year, month=date.month, day=date.day, hour=11)
def is_date(date):
return isinstance(date, datetime_date) and not isinstance(date, datetime)
def get_heartbeats(day=None, timeout=None):
url = "https://api.wakatime.com/api/v1/users/current/heartbeats?api_key={key}&date={date}&surrounding_heartbeats=true&timeout={timeout}".format(
key=API_KEY,
date=day.strftime('%Y-%m-%d'),
timeout=timeout,
)
return requests.get(url).json()['data'], requests.get(url).json()['yesterday_heartbeat'], requests.get(url).json()['tomorrow_heartbeat']
if __name__ == "__main__":
timezone = pytz.timezone(TIMEZONE)
start = datetime.strptime(START, '%Y-%m-%d').replace(tzinfo=timezone)
end = datetime.strptime(END, '%Y-%m-%d').replace(tzinfo=timezone)
durations = get_combined_durations(start=start, end=end, timezone=timezone)
print(durations)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment