Last active
July 3, 2018 19:25
-
-
Save Orpheon/fe428ebdede815eefddd55cb36dcb284 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import json | |
import csv | |
import datetime | |
# --- Configuration and shared state ---------------------------------------

# True: all channels of a guild are merged into one series named after the
# guild. False: every guild channel gets its own "<guild> #<channel>" series.
combine_channels = False

# Folder holding the unpacked message export. index.json is loaded as a
# mapping whose keys are the per-conversation folder names under `root`.
root = os.path.join("discorddata", "messages")
with open(os.path.join(root, "index.json"), "r", encoding="UTF-8") as f:
    indexdata = json.load(f)

# Half-width of the sliding window: a sample sums everything written within
# `timewindow` before or after the sample instant.
timewindow = datetime.timedelta(days=15)
resolution = datetime.timedelta(days=1)  # if you change this below a day, the renderer needs to be changed

# Running bounds of all message timestamps, narrowed while the csv files are
# read. They start at the extreme representable values so the first parsed
# timestamp always replaces them. The previous seeds (local `now()` and
# `fromtimestamp(0)`) depended on the machine's clock and timezone, and
# `fromtimestamp` can raise on platforms whose localtime() rejects the value.
earliest_timepoint = datetime.datetime.max
latest_timepoint = datetime.datetime.min
print("Extracting data from csv's..")
# channels: display name -> {message timestamp (second precision): number of
# characters written at that timestamp}.
channels = {}
for foldername, user in sorted(indexdata.items()):
    path = os.path.join(root, foldername)
    # Close the handle deterministically and read it as UTF-8 like every other
    # file here, instead of leaking it and relying on the locale encoding.
    with open(os.path.join(path, "channel.json"), "r", encoding="UTF-8") as channelfile:
        channeldata = json.load(channelfile)

    if "guild" in channeldata:
        if combine_channels:
            username = channeldata["guild"]["name"]
        else:
            username = channeldata["guild"]["name"] + " #" + user
    elif user:
        username = user
        # Mask the four characters following the first "#". Names without a
        # "#" are kept verbatim: find() returns -1 for them, and slicing with
        # -1 would chop the last character and splice "#XXXX" into the name.
        idx = username.find("#")
        if idx != -1:
            username = username[:idx] + "#XXXX" + username[idx + 5:]
    else:
        username = "Deleted accounts [probably garbage data]"

    per_timepoint = channels.setdefault(username, {})

    # newline="" is what the csv module requires so that line breaks inside
    # quoted fields reach the parser untouched.
    with open(os.path.join(path, "messages.csv"), "r", encoding="UTF-8", newline="") as importcsv:
        reader = csv.reader(importcsv)
        # Skip the header through the reader so it is parsed as one csv record.
        next(reader, None)
        for row in reader:
            # Column 1 holds the timestamp; only the leading
            # "YYYY-MM-DD HH:MM:SS" part is parsed, the rest is dropped.
            timepoint = datetime.datetime.strptime(row[1][:19], "%Y-%m-%d %H:%M:%S")
            if timepoint < earliest_timepoint:
                earliest_timepoint = timepoint
            if timepoint > latest_timepoint:
                latest_timepoint = timepoint
            # Column 2 is measured by length only; its text is never stored.
            n_chars = len(row[2])
            per_timepoint[timepoint] = per_timepoint.get(timepoint, 0) + n_chars
print("Calculating sliding window values..")
# interpolated_channels: display name -> {ISO timestamp of the sample: total
# characters written within `timewindow` before or after that sample}.
interpolated_channels = {}
for idx, (channelname, messages) in enumerate(channels.items()):
    print("{}%".format(100*idx/len(channels)))
    series = interpolated_channels[channelname] = {}
    timepointdata = sorted(messages.items())
    n_points = len(timepointdata)

    # Two-pointer sliding window. Samples only move forward (this relies on
    # `resolution` being positive, as the loop below already does in order to
    # terminate), so both window edges only move forward over the sorted
    # messages. Each message is added once and removed once, instead of the
    # whole list being rescanned for every sample. Counts are integers, so
    # the running sum is exact.
    window_begin = 0   # index of the oldest message still inside the window
    window_end = 0     # index one past the newest message inside the window
    accumulator = 0

    timepoint = earliest_timepoint
    while timepoint <= latest_timepoint:
        oldest_allowed = timepoint - timewindow
        newest_allowed = timepoint + timewindow

        # Take in everything up to and including the upper edge...
        while window_end < n_points and timepointdata[window_end][0] <= newest_allowed:
            accumulator += timepointdata[window_end][1]
            window_end += 1
        # ...then drop what fell strictly below the lower edge (both edges
        # of the window are inclusive).
        while window_begin < window_end and timepointdata[window_begin][0] < oldest_allowed:
            accumulator -= timepointdata[window_begin][1]
            window_begin += 1

        series[timepoint.isoformat()] = accumulator
        timepoint += resolution
print("Exporting to json..")
with open("channeltimedata.json", "w", encoding="UTF-8") as f:
    json.dump(interpolated_channels, f, sort_keys=True, indent=4)

print("Exporting to csv..")
# Sort the channels once; the same order is used for the header and every row.
ordered_channels = sorted(interpolated_channels.items())
# All channels are sampled on the same grid, so any one of them supplies the
# timestamps. With no channels at all only the header row is written, rather
# than crashing on an empty list.
timepoints = sorted(ordered_channels[0][1]) if ordered_channels else []
# newline="" stops the text layer from translating the csv writer's own line
# endings, which otherwise yields a blank line after every row on Windows.
with open("channeltimedata.csv", "w", encoding="UTF-8", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["Timestamp"] + [name for name, _ in ordered_channels])
    for timepoint in timepoints:
        writer.writerow([timepoint] + [series[timepoint] for _, series in ordered_channels])
print("Done.")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import matplotlib.pyplot as plt | |
import numpy as np | |
import json | |
# Renderer: draws one line per channel from channeltimedata.json and shows a
# legend for the channels with the highest peaks only.
with open("channeltimedata.json", "r", encoding="UTF-8") as f:
    channeltimedata = json.load(f)

# Maximum number of channels listed in the legend.
n_legends = 12

# Tick labels are the first ten characters of each timestamp key, taken from
# the first channel in the file.
first_channel = list(channeltimedata)[0]
timepoints = sorted(stamp[:10] for stamp in channeltimedata[first_channel])
x = np.arange(len(timepoints))
plt.xticks(x, timepoints)

# Collect (peak value, result of plt.plot) for every channel.
labels = []
for channel, data in channeltimedata.items():
    y = [data[stamp] for stamp in sorted(data)]
    peak = max(y)
    plotted = plt.plot(x, y, label=channel)
    labels.append((peak, plotted))

# Highest peaks first; keep the first line object of the top n_legends entries.
labels.sort(key=lambda entry: entry[0], reverse=True)
labels = [plotted[0] for _, plotted in labels[:n_legends]]

plt.legend(handles=labels)
plt.show()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment