Skip to content

Instantly share code, notes, and snippets.

@harjitmoe
Last active November 12, 2016 21:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save harjitmoe/fc864288621cde8bd5af3478110efa4a to your computer and use it in GitHub Desktop.
Save harjitmoe/fc864288621cde8bd5af3478110efa4a to your computer and use it in GitHub Desktop.
Generate HTML output from Google hangouts takeout data
# (c) Thomas Hori 2016.
# Generate HTML output from Google Hangouts Takeout data: hang.html holds every chat, hang_g.html holds group chats only.
# May be redistributed under terms of Zlib license ("The zlib/libpng License" as defined by
# the Open Source Initiative), which should be affixed.
# Clarifying all mentions of my name or copyright notice with a "modified (by)" line not
# using my name shall be sufficient for plainly marking this particular script as altered.
import marshal, json, time
# Parse the Takeout export once, up front; the rest of the script reads from `b`.
print ("Loading JSON")
# Binary mode replaces the old "rU" mode, which Python 3.11 removed (open()
# raises ValueError for it).  json.load accepts a binary file and detects the
# UTF encoding itself.  The `with` block closes the handle after parsing.
with open("Hangouts.json","rb") as takeout_file:
    b=json.load(takeout_file)
print ("Loaded JSON")
# Key names used to index the parsed structure (b[CVS], entry[CVS][CV], ...).
CV="conversation"
CVS=CV+"_state"
CVI=CV+"_id"
def _escape_html(text):
    """Escape ``&``, ``<`` and ``>`` so *text* can be used as HTML element content."""
    return text.replace("&","&amp;").replace("<","&lt;").replace(">","&gt;")


def _escape_attr(text):
    """Escape *text* for use inside a double-quoted HTML attribute value."""
    return _escape_html(text).replace('"',"&quot;")


class Event(object):
    """One record of a conversation's event list, rendered as an HTML fragment.

    Attributes set by the constructor:
      id        -- the record's "event_id"
      sender    -- ``(display name, chat id)`` tuple (name is NOT HTML-escaped)
      timestamp -- the record's "timestamp" parsed as a base-10 int; it is
                   divided by 1000000 before being handed to time.gmtime
      type      -- the record's "event_type"
      content   -- the rendered fragment, UTF-8 encoded
    """

    # (formatting flag, HTML tag) pairs, applied innermost first.
    _FORMAT_TAGS=(
        ("strikethrough","del"),
        ("italics","em"),
        ("bold","strong"),
        ("underline","u"),
    )

    def __init__(self, rec, id2name, defname):
        """Build the event from one parsed JSON record.

        rec     -- dict for a single event
        id2name -- dict mapping chat id -> display name; ids that are not yet
                   known are added to it, using the id itself as the name
        defname -- default conversation name; accepted for interface
                   compatibility, it does not appear in any rendered output
        """
        self.id=rec["event_id"]
        sid=rec["sender_id"]["chat_id"]
        if sid not in id2name:
            id2name[sid]=str(sid)
        self.sender=(id2name[sid], sid)
        self.timestamp=int(rec["timestamp"], 10)
        # Types handled explicitly: 'REGULAR_CHAT_MESSAGE', 'RENAME_CONVERSATION',
        # 'HANGOUT_EVENT', 'ADD_USER', 'REMOVE_USER'
        self.type=rec["event_type"]
        # Names come from the export, so escape them before they reach markup.
        who=_escape_html(self.sender[0])
        if self.type=="HANGOUT_EVENT":
            html="<aside><i>%s dials a hangout call.</i></aside>\n"%who
        elif self.type in ("ADD_USER","REMOVE_USER"):
            html=self._membership_html(rec, id2name, who)
        elif self.type=="RENAME_CONVERSATION":
            html=self._rename_html(rec, who)
        elif self.type=="REGULAR_CHAT_MESSAGE":
            html=self._message_html(rec, who)
        else:
            # Unknown event types are rendered as their bare type name.
            html=self.type
        if not isinstance(html, bytes):
            html=html.encode("utf-8")
        self.content=html

    def _membership_html(self, rec, id2name, who):
        """Render an ADD_USER / REMOVE_USER record; registers unknown ids in id2name."""
        uids=[part["chat_id"] for part in rec["membership_change"]["participant_id"]]
        for uid in uids:
            if uid not in id2name:
                id2name[uid]=str(uid)
        verb="adds" if self.type=="ADD_USER" else "removes"
        names=", ".join([_escape_html(id2name[uid]) for uid in uids])
        return "<aside><i>%s %s %s.</i></aside>\n"%(who, verb, names)

    def _rename_html(self, rec, who):
        """Render a RENAME_CONVERSATION record (an empty new name means the name was removed)."""
        rename=rec["conversation_rename"]
        old=_escape_html(rename["old_name"] or "unnamed")
        if rename["new_name"]:
            new=_escape_html(rename["new_name"])
            return "<aside><i>%s renames chat to <b>%s</b> (was %s)</i></aside>\n"%(who, new, old)
        return "<aside><i>%s denames chat (was %s)</i></aside>\n"%(who, old)

    def _message_html(self, rec, who):
        """Render a REGULAR_CHAT_MESSAGE record: attachments first, then text segments."""
        message=rec["chat_message"]["message_content"]
        parts=[]
        for att in message.get("attachment", ()):
            for key,value in att["embed_item"].items():
                # Only dict values can carry a "url"; the isinstance check keeps
                # string/list values (e.g. "id", "type") from being indexed.
                if isinstance(value, dict) and "url" in value:
                    url=value["url"]
                    parts.append('<a href="%s">%s</a>\n'%(_escape_attr(url), _escape_html(url)))
                elif key not in ("type","id"):
                    # No URL available: show the embed's key name instead.
                    parts.append(_escape_html(key)+"\n")
        # A message may have attachments AND segments, hence no "else" here.
        for seg in message.get("segment", ()):
            parts.append(self._segment_html(seg))
        body="".join(parts)
        try:
            when=time.asctime(time.gmtime(self.timestamp//1000000))
        except (ValueError, OverflowError, OSError):
            # Timestamp outside the range the platform can convert.
            when="???"
        if body.startswith("?OTR"):
            # Presumably an encrypted OTR payload; the text is not shown.
            body="{ChatSecure redacted}"
        return "<p>%s at %s GMT/UTC:</p>\n<blockquote>%s</blockquote>\n"%(who, when, body.replace("\n","<br />"))

    def _segment_html(self, seg):
        """Render one message segment, applying its formatting flags and link target."""
        # LINE_BREAK segments are rendered as a newline regardless of their text.
        text="\n" if seg["type"]=="LINE_BREAK" else seg["text"]
        html=_escape_html(text)
        flags=seg.get("formatting") or {}
        # Flags are independent of each other; a missing flag counts as off.
        for flag,tag in self._FORMAT_TAGS:
            if flags.get(flag):
                html="<%s>%s</%s>"%(tag, html, tag)
        if seg["type"]=="LINK":
            html='<a href="%s">%s</a>'%(_escape_attr(seg["link_data"]["link_target"]), html)
        return html
class Conversation(object):
    """One conversation from the export, with its events rendered to HTML.

    Attributes set by the constructor:
      bcvs     -- the raw conversation entry this object was built from
      is_group -- 1 when the conversation's "type" is "GROUP", else 0
      name     -- the group's "name" when it has one, otherwise `defname`
      id2name  -- dict mapping chat id -> display name (Event may add entries)
      defname  -- sorted participant names joined with ", "
      events   -- list of Event objects sorted by timestamp
      content  -- bytearray: an <h1> title followed by every event's HTML
    """

    def __init__(self, bcvs):
        """Build the conversation from one entry of the export's conversation list."""
        self.bcvs=bcvs
        conv=bcvs[CVS][CV]
        self.is_group=1 if conv["type"]=="GROUP" else 0
        # Only group chats carry their own name; others are named after members.
        self.name=(conv.get("name") or "") if self.is_group else ""
        self.id2name={}
        for ptd in conv["participant_data"]:
            chat_id=ptd["id"]["chat_id"]
            # A participant without "fallback_name" is shown by chat id, the
            # same fallback Event uses for ids it does not know.
            self.id2name[chat_id]=ptd.get("fallback_name") or str(chat_id)
        self.defname=", ".join(sorted(self.id2name.values()))
        if not self.name:
            self.name=self.defname
        print ("Loading events from %s"%self.name)
        self.events=[Event(eve, self.id2name, self.defname) for eve in bcvs[CVS]["event"]]
        self.events.sort(key=lambda event:event.timestamp)
        print ("Agglomerating events from %s"%self.name)
        # The name comes from the export, so escape it before it reaches markup.
        title=self.name.replace("&","&amp;").replace("<","&lt;").replace(">","&gt;")
        chunks=[("<h1>%s</h1>"%title).encode("utf-8")]
        chunks.extend([event.content for event in self.events])
        # A single join avoids many independent concatenations.
        self.content=bytearray(b"".join(chunks))
# Render every conversation, then write two pages: hang.html with all chats
# and hang_g.html with group chats only.
l=[]
for conv in b[CVS]:
    print ("Loading metadata")
    l.append(Conversation(conv))
print ("Agglomerating chats")
# Pages are assembled and written as UTF-8 bytes so the file always matches
# the charset declared in <meta>, whatever the platform's default encoding.
# The markup now opens <html> (it was only ever closed) and no longer emits
# a </meta> end tag, which HTML does not allow.
page_head=b'<!DOCTYPE html SYSTEM "about:legacy-compat">\n<html>\n<head><meta charset="utf-8"></head>\n<body>'
page_tail=b"</body></html>"
content=page_head+b"".join([bytes(i.content) for i in l])+page_tail
gcontent=page_head+b"".join([bytes(i.content) for i in l if i.is_group])+page_tail
print ("Writing HTML")
with open("hang.html","wb") as out:
    out.write(content)
with open("hang_g.html","wb") as out:
    out.write(gcontent)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment