Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save roman01la/b73fb8fd8e8908af385f8b1d2d0f6c39 to your computer and use it in GitHub Desktop.
Save roman01la/b73fb8fd8e8908af385f8b1d2d0f6c39 to your computer and use it in GitHub Desktop.
Extract Telegram messages from db_sqlite PostBox – made for Telegram for macOS, but should work with Telegram for iOS
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "entertaining-teens",
"metadata": {},
"outputs": [],
"source": [
"import sqlite3\n",
"import io\n",
"import struct\n",
"import enum\n",
"import mmh3\n",
"import pprint\n",
"import datetime"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "mobile-ability",
"metadata": {},
"outputs": [],
"source": [
"class byteutil:\n",
" def __init__(self, buffer, endian='<'):\n",
" self.endian = endian\n",
" self.buf = buffer\n",
"\n",
" def read_fmt(self, fmt):\n",
" fmt = self.endian + fmt\n",
" data = self.buf.read(struct.calcsize(fmt))\n",
" return struct.unpack(fmt, data)[0]\n",
"\n",
" def read_int8(self):\n",
" return self.read_fmt('b')\n",
" def read_uint8(self):\n",
" return self.read_fmt('B')\n",
"\n",
" def read_int32(self):\n",
" return self.read_fmt('i')\n",
" def read_uint32(self):\n",
" return self.read_fmt('I')\n",
"\n",
" def read_int64(self):\n",
" return self.read_fmt('q')\n",
" def read_uint64(self):\n",
" return self.read_fmt('Q')\n",
"\n",
" def read_bytes(self):\n",
" slen = self.read_int32()\n",
" return self.buf.read(slen)\n",
" def read_str(self):\n",
" return self.read_bytes().decode('utf-8')\n",
" \n",
" def read_short_bytes(self):\n",
" slen = self.read_uint8()\n",
" return self.buf.read(slen)\n",
" def read_short_str(self):\n",
" return self.read_short_bytes().decode('utf-8')\n",
" \n",
" def read_double(self):\n",
" return self.read_fmt('d')"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "modular-reason",
"metadata": {},
"outputs": [],
"source": [
"def murmur(d):\n",
" # seed from telegram\n",
" return mmh3.hash(d, seed=-137723950)"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "homeless-princess",
"metadata": {},
"outputs": [],
"source": [
"class MessageDataFlags(enum.IntFlag):\n",
" GloballyUniqueId = 1 << 0\n",
" GlobalTags = 1 << 1\n",
" GroupingKey = 1 << 2\n",
" GroupInfo = 1 << 3\n",
" LocalTags = 1 << 4\n",
" ThreadId = 1 << 5\n",
"\n",
"class FwdInfoFlags(enum.IntFlag):\n",
" SourceId = 1 << 1\n",
" SourceMessage = 1 << 2\n",
" Signature = 1 << 3\n",
" PsaType = 1 << 4\n",
" Flags = 1 << 5\n",
"\n",
"class MessageFlags(enum.IntFlag):\n",
" Unsent = 1\n",
" Failed = 2\n",
" Incoming = 4\n",
" TopIndexable = 16\n",
" Sending = 32\n",
" CanBeGroupedIntoFeed = 64\n",
" WasScheduled = 128\n",
" CountedAsIncoming = 256\n",
"\n",
"class MessageTags(enum.IntFlag):\n",
" PhotoOrVideo = 1 << 0\n",
" File = 1 << 1\n",
" Music = 1 << 2\n",
" WebPage = 1 << 3\n",
" VoiceOrInstantVideo = 1 << 4\n",
" UnseenPersonalMessage = 1 << 5\n",
" LiveLocation = 1 << 6\n",
" Gif = 1 << 7\n",
" Photo = 1 << 8\n",
" Video = 1 << 9\n",
" Pinned = 1 << 10"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "attached-evidence",
"metadata": {},
"outputs": [],
"source": [
"class MessageIndex:\n",
" def __init__(self, peerId, namespace, mid, timestamp):\n",
" self.peerId = peerId\n",
" self.namespace = namespace\n",
" self.id = mid\n",
" self.timestamp = timestamp\n",
" \n",
" @classmethod\n",
" def from_bytes(cls, b):\n",
" bio = byteutil(io.BytesIO(b), endian='>')\n",
" peerId = bio.read_int64()\n",
" namespace = bio.read_int32()\n",
" timestamp = bio.read_int32()\n",
" mid = bio.read_int32()\n",
" return cls(peerId, namespace, mid, timestamp)\n",
"\n",
" def as_bytes(self):\n",
" return struct.pack('>qiii', self.peerId, self.namespace, self.timestamp, self.id)\n",
" \n",
" def __repr__(self):\n",
" return f'ns:{self.namespace} pr:{self.peerId} id:{self.id} ts:{self.timestamp}'\n"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "endangered-washer",
"metadata": {},
"outputs": [],
"source": [
"def get_peer(peer_id, cache={}):\n",
" if peer_id in cache:\n",
" return cache[peer_id]\n",
" cur = con.cursor() \n",
" try:\n",
" cur.execute(\"SELECT value FROM t2 WHERE key = ? ORDER BY key LIMIT 1\", (peer_id,))\n",
" v = cur.fetchone()\n",
" if v is None:\n",
" cache[peer_id] = None\n",
" return None\n",
" data = PostboxDecoder(v[0]).decodeRootObject()\n",
" cache[peer_id] = data\n",
" return data\n",
" finally:\n",
" cur.close()\n",
"\n",
"def get_ref_media(ns, mid, cache={}):\n",
" key = (ns, mid)\n",
" if key in cache:\n",
" return cache[key]\n",
" rawKey = struct.pack('>iq', ns, mid)\n",
"\n",
" cur = con.cursor() \n",
" try:\n",
" cur.execute(\"SELECT value FROM t6 WHERE key = ? ORDER BY key LIMIT 1\", (rawKey,))\n",
" v = cur.fetchone()\n",
" if v is None:\n",
" cache[key] = None\n",
" return None\n",
"\n",
" data = v[0]\n",
" bio = byteutil(io.BytesIO(data))\n",
" data = read_media_entry(key, bio)\n",
" cache[key] = data\n",
" refcnt = bio.read_int32()\n",
" return data\n",
" finally:\n",
" cur.close()\n",
"\n",
"def get_message(idx: MessageIndex):\n",
" cur = con.cursor() \n",
" try:\n",
" cur.execute(\"SELECT value FROM t7 WHERE key = ? ORDER BY key LIMIT 1\", (idx.as_bytes(),))\n",
" v = cur.fetchone()\n",
" if v is None:\n",
" return None\n",
" return read_intermediate_message(v[0])\n",
" finally:\n",
" cur.close()"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "appropriate-leeds",
"metadata": {},
"outputs": [],
"source": [
"def get_all_messages(f=None, decode=True):\n",
" cur = con.cursor()\n",
" try:\n",
" cur.execute(\"SELECT key, value FROM t7 ORDER BY key\")\n",
" for key, value in cur:\n",
" idx = MessageIndex.from_bytes(key)\n",
"\n",
" # apply filter func\n",
" if f is not None and not f(idx):\n",
" continue\n",
"\n",
" if decode:\n",
" msg = read_intermediate_message(value)\n",
" else:\n",
" msg = value\n",
" yield idx, msg\n",
" finally:\n",
" cur.close()"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "secure-pride",
"metadata": {},
"outputs": [],
"source": [
"class MediaEntryType(enum.Enum):\n",
" Direct = 0\n",
" MessageReference = 1\n",
"\n",
"def read_media_entry(key, bio):\n",
" typ = MediaEntryType(bio.read_uint8())\n",
" if typ == MediaEntryType.Direct:\n",
" data = bio.read_bytes()\n",
" data = PostboxDecoder(data).decodeRootObject()\n",
" return data\n",
" elif typ == MediaEntryType.MessageReference:\n",
" idPeerId = bio.read_int64()\n",
" idNamespace = bio.read_int32()\n",
" idId = bio.read_int32()\n",
" idTimestamp = bio.read_int32()\n",
" idx = MessageIndex(idPeerId, idNamespace, idId, idTimestamp)\n",
" msg = get_message(idx)\n",
" for m in msg['embeddedMedia']:\n",
" if hasattr(m, 'mediaId') and m.mediaId == key:\n",
" return m\n",
" raise Exception(f'refrerenced media not found in message {idx} {key}')\n",
" else:\n",
" raise Exception(f'invalid mediaentrytype {typ}')"
]
},
{
"cell_type": "code",
"execution_count": 9,
"id": "fancy-electricity",
"metadata": {},
"outputs": [],
"source": [
"def peer_str(peerId):\n",
" peer = get_peer(peerId)\n",
" if peer is None:\n",
" return f\"unknown peer {peerId}\"\n",
" if 'fn' in peer:\n",
" peerName = f\"{peer.get('fn', '')} {peer.get('ln', '')} \"\n",
" elif 't' in peer:\n",
" peerName = peer.get('t', '')\n",
" else:\n",
" peerName = 'WARN: UNK NAME'\n",
" return f\"{peerName} (@{peer.get('un', '')} {peerId})\"\n",
"\n",
"def print_media(m, html_mode=False):\n",
" \"\"\" returns - referenced media, hadWarn \"\"\"\n",
" if isinstance(m, TelegramMediaFile):\n",
" res = m.resource\n",
" if not isinstance(res, CloudDocumentMediaResource):\n",
" print(f\"!!! WARN: has file without resource\")\n",
" return None, True\n",
" if html_mode:\n",
" fn = res.fileName or \"\"\n",
" mt = m.mimeType\n",
" if mt.startswith('video/'):\n",
" print(f'<video controls><source src=\"media/{html.escape(res.uniqueId)}\" type=\"{html.escape(mt)}\"/></source></video>')\n",
" elif mt.startswith('image/'):\n",
" print(f'<img src=\"media/{html.escape(res.uniqueId)}\"/>')\n",
" print(f'%%% file <a download=\"{html.escape(fn)}\" href=\"media/{html.escape(res.uniqueId)}\">fn:{res.fileName}</a> mt:{m.mimeType} {res.uniqueId}')\n",
" else:\n",
" print(f\"%%% file fn:{res.fileName} mt:{m.mimeType} {res.uniqueId}\")\n",
" return res.uniqueId, False\n",
" elif isinstance(m, TelegramMediaImage):\n",
" reps = [rep for rep in m.representations if isinstance(rep, TelegramMediaImageRepresentation)]\n",
" reps.sort(key=lambda x: x.height * x.width, reverse=True)\n",
" rep = reps[0] if reps else None\n",
" if rep is None:\n",
" print(f\"!!! WARN: has image without representation[0]\")\n",
" return True\n",
" res = rep.resource\n",
" if not isinstance(res, CloudPhotoSizeMediaResource):\n",
" print(f\"!!! WARN: has image without representation[0].resource\")\n",
" return None, True\n",
" if html_mode:\n",
" print(f'<img src=\"media/{html.escape(res.uniqueId)}\" height=\"{rep.height}\" width=\"{rep.width}\"/>')\n",
" else:\n",
" print(f\"%%% image {res.uniqueId}\")\n",
" return res.uniqueId, False\n",
" elif isinstance(m, TelegramMediaWebpage):\n",
" url = m.url or m.pendingUrl\n",
" if html_mode:\n",
" print(f'%%% webpage for <a href=\"{html.escape(url)}\">{url}</a>')\n",
" else:\n",
" print(f\"%%% webpage for {url}\")\n",
" elif isinstance(m, TelegramMediaAction):\n",
" print(f\"%%% action {m}\")\n",
" else:\n",
" print(f\"%%% unknown media {m}\")\n",
" return None, True\n",
" \n",
" return None, False\n",
"\n",
"def print_message(idx, msg, html_mode=False):\n",
" \"\"\" returns -- set of references, hadWarn \"\"\"\n",
" hadWarn = False\n",
" references = set()\n",
"\n",
" direction = '<-' if MessageFlags.Incoming in msg['flags'] else '->'\n",
" ts = datetime.datetime.fromtimestamp(idx.timestamp).isoformat()\n",
" print(f'=== {direction} {ts} peer:{idx.peerId} id:{idx.id}')\n",
"\n",
" print(f\"=== {peer_str(msg['authorId'])}\")\n",
"\n",
" fwd = msg['fwd']\n",
" if fwd is not None:\n",
" fwdDate = datetime.datetime.fromtimestamp(fwd['date']).isoformat()\n",
" print(f\"=== fwd {fwdDate} from {peer_str(fwd['author'])}\")\n",
"\n",
" for m in msg['embeddedMedia']:\n",
" ref, w = print_media(m, html_mode)\n",
" hadWarn = w or hadWarn\n",
" if ref:\n",
" references.add(ref)\n",
"\n",
" for mref in msg[\"referencedMediaIds\"]:\n",
" m = get_ref_media(*mref)\n",
" if m is None:\n",
" print(f\"!!! WARN: media reference not found\")\n",
" hadWarn = True\n",
" continue\n",
" ref, w = print_media(m, html_mode)\n",
" hadWarn = w or hadWarn\n",
" if ref:\n",
" references.add(ref)\n",
"\n",
" if msg['text']:\n",
" print(msg['text'])\n",
" \n",
" print()\n",
"\n",
" return references, hadWarn\n"
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "parental-integer",
"metadata": {},
"outputs": [],
"source": [
"def read_intermediate_fwd_info(buf):\n",
" infoFlags = FwdInfoFlags(buf.read_int8())\n",
" if infoFlags == 0:\n",
" return None\n",
"\n",
" authorId = buf.read_int64()\n",
" date = buf.read_int32()\n",
"\n",
" sourceId = None\n",
" if FwdInfoFlags.SourceId in infoFlags:\n",
" sourceId = buf.read_int64()\n",
"\n",
" sourceMessagePeerId = None\n",
" sourceMessageNamespace = None\n",
" sourceMessageIdId = None\n",
" if FwdInfoFlags.SourceMessage in infoFlags:\n",
" sourceMessagePeerId = buf.read_int64()\n",
" sourceMessageNamespace = buf.read_int32()\n",
" sourceMessageIdId = buf.read_int32()\n",
" \n",
" signature = None\n",
" if FwdInfoFlags.Signature in infoFlags:\n",
" signature = buf.read_str()\n",
" \n",
" psaType = None\n",
" if FwdInfoFlags.PsaType in infoFlags:\n",
" psaType = buf.read_str()\n",
" \n",
" flags = None\n",
" if FwdInfoFlags.Flags in infoFlags:\n",
" flags = buf.read_int32()\n",
" \n",
" return {\n",
" 'author': authorId,\n",
" 'date': date,\n",
" 'srcId': sourceId,\n",
" 'srcMsgPeer': sourceMessagePeerId,\n",
" 'srcMsgNs': sourceMessageNamespace,\n",
" 'srcMsgId': sourceMessageIdId,\n",
" 'signature': signature,\n",
" 'psaType': psaType,\n",
" 'flags': flags,\n",
" }"
]
},
{
"cell_type": "code",
"execution_count": 11,
"id": "greek-consequence",
"metadata": {},
"outputs": [],
"source": [
"def read_intermediate_message(v: bytes):\n",
" buf = byteutil(io.BytesIO(v))\n",
" typ = buf.read_int8()\n",
" if typ != 0:\n",
" print(f'wtf, type not 0 but {typ}')\n",
" return None\n",
"\n",
" stableId = buf.read_uint32()\n",
" stableVer = buf.read_uint32()\n",
" \n",
" dataFlags = MessageDataFlags(buf.read_uint8()) # int8 in swift\n",
" \n",
" globallyUniqueId = None\n",
" if MessageDataFlags.GloballyUniqueId in dataFlags:\n",
" globallyUniqueId = buf.read_int64()\n",
" \n",
" globalTags = None\n",
" if MessageDataFlags.GlobalTags in dataFlags:\n",
" globalTags = buf.read_uint32()\n",
" \n",
" groupingKey = None\n",
" if MessageDataFlags.GroupingKey in dataFlags:\n",
" groupingKey = buf.read_int64()\n",
" \n",
" groupInfoStableId = None\n",
" if MessageDataFlags.GroupInfo in dataFlags:\n",
" groupInfoStableId = buf.read_uint32()\n",
"\n",
" localTagsVal = None\n",
" if MessageDataFlags.LocalTags in dataFlags:\n",
" localTagsVal = buf.read_uint32()\n",
" \n",
" threadId = None\n",
" if MessageDataFlags.ThreadId in dataFlags:\n",
" threadId = buf.read_int64()\n",
" \n",
" flags = MessageFlags(buf.read_uint32())\n",
" tags = MessageTags(buf.read_uint32())\n",
" \n",
" fwd_info = read_intermediate_fwd_info(buf)\n",
"\n",
" authorId = None\n",
" hasAuthorId = buf.read_int8()\n",
" if hasAuthorId == 1:\n",
" authorId = buf.read_int64()\n",
" \n",
" text = buf.read_str()\n",
"# print(text)\n",
"\n",
" attributesCount = buf.read_int32()\n",
" attributes = [None]*attributesCount\n",
"# print(f'attributesCount: {attributesCount}')\n",
"\n",
" for i in range(attributesCount):\n",
" attributes[i] = PostboxDecoder(buf.read_bytes()).decodeRootObject()\n",
"# print(f'attributes: {len(attributes[i])}', attributes[i])\n",
"\n",
" embeddedMediaCount = buf.read_int32()\n",
" embeddedMedia = [None]*embeddedMediaCount\n",
"# print(f'embeddedMediaCount: {embeddedMediaCount}')\n",
"\n",
" for i in range(embeddedMediaCount):\n",
" embeddedMedia[i] = PostboxDecoder(buf.read_bytes()).decodeRootObject()\n",
"# print(f'embeddedMedia: {len(embeddedMedia[i])}', embeddedMedia[i])\n",
" \n",
" referencedMediaIds = []\n",
" referencedMediaIdsCount = buf.read_int32()\n",
" for _ in range(referencedMediaIdsCount):\n",
" idNamespace = buf.read_int32()\n",
" idId = buf.read_int64()\n",
"\n",
" referencedMediaIds.append((idNamespace, idId))\n",
"\n",
" leftover = buf.buf.read()\n",
" if leftover != b'' and leftover != b'\\0'*4:\n",
" print('huh, y no empty', leftover)\n",
" \n",
" return {\n",
" 'flags': flags,\n",
" 'tags': tags,\n",
" 'authorId': authorId,\n",
" 'fwd': fwd_info,\n",
" 'text': text,\n",
" 'referencedMediaIds': referencedMediaIds,\n",
" 'embeddedMedia': embeddedMedia,\n",
" 'attributes': attributes,\n",
" }\n"
]
},
{
"cell_type": "code",
"execution_count": 12,
"id": "conceptual-eating",
"metadata": {},
"outputs": [],
"source": [
"class PostboxDecoder:\n",
" registry = {}\n",
" \n",
" @classmethod\n",
" def registerDecoder(cls, t):\n",
" cls.registry[murmur(t.__name__)] = t\n",
" return t\n",
"\n",
" class ValueType(enum.Enum):\n",
" Int32 = 0\n",
" Int64 = 1\n",
" Bool = 2\n",
" Double = 3\n",
" String = 4\n",
" Object = 5\n",
" Int32Array = 6\n",
" Int64Array = 7\n",
" ObjectArray = 8\n",
" ObjectDictionary = 9\n",
" Bytes = 10\n",
" Nil = 11\n",
" StringArray = 12\n",
" BytesArray = 13\n",
" \n",
" def __init__(self, data):\n",
" self.bio = byteutil(io.BytesIO(data), endian='<')\n",
" self.size = len(data)\n",
"\n",
" def decodeRootObject(self):\n",
" return self.decodeObjectForKey('_')\n",
"\n",
" def decodeObjectForKey(self, key):\n",
" t, v = self.get(self.ValueType.Object, key)\n",
" if v:\n",
" return v\n",
"\n",
" def get(self, valueType, key, decodeObjects=None):\n",
" for k, t, v in self._iter_kv(decodeObjects=decodeObjects):\n",
" if k != key:\n",
" pass\n",
" elif valueType == None:\n",
" return t, v\n",
" elif t == valueType:\n",
" return t, v\n",
" elif t == self.ValueType.Nil:\n",
" return t, None\n",
" return None, None\n",
" \n",
" def _iter_kv(self, decodeObjects=None, registry=None):\n",
" self.bio.buf.seek(0, io.SEEK_SET)\n",
" while True:\n",
" pos = self.bio.buf.tell()\n",
" if pos >= self.size:\n",
" break\n",
" \n",
" key = self.bio.read_short_str()\n",
" valueType, value = self.readValue(decodeObjects=decodeObjects, registry=registry)\n",
" yield key, valueType, value\n",
"\n",
" def _readObject(self, decode=None, registry=None):\n",
" if decode is None:\n",
" decode = True\n",
" if registry is None:\n",
" registry = self.registry\n",
"\n",
" typeHash = self.bio.read_int32()\n",
" dataLen = self.bio.read_int32()\n",
" data = self.bio.buf.read(dataLen)\n",
"\n",
" if not decode:\n",
" value = {'type': typeHash, 'data': data}\n",
" elif typeHash in self.registry:\n",
" decoder = self.__class__(data)\n",
" value = self.registry[typeHash](decoder)\n",
" else:\n",
" decoder = self.__class__(data)\n",
" value = {k: v for k, t, v in decoder._iter_kv()}\n",
"# value['@raw'] = data\n",
" value['@type'] = typeHash\n",
"\n",
" return value\n",
"\n",
" def readValue(self, decodeObjects=None, registry=None):\n",
" valueType = self.ValueType(self.bio.read_uint8())\n",
" value = None\n",
" \n",
" objectArgs = {'decode': decodeObjects, 'registry': registry}\n",
"\n",
" if valueType == self.ValueType.Int32:\n",
" value = self.bio.read_int32()\n",
" elif valueType == self.ValueType.Int64:\n",
" value = self.bio.read_int64()\n",
" elif valueType == self.ValueType.Bool:\n",
" value = self.bio.read_uint8() != 0\n",
" elif valueType == self.ValueType.Double:\n",
" value = self.bio.read_double()\n",
" elif valueType == self.ValueType.String:\n",
" value = self.bio.read_str()\n",
" elif valueType == self.ValueType.Object:\n",
" value = self._readObject(**objectArgs)\n",
" elif valueType == self.ValueType.Int32Array:\n",
" alen = self.bio.read_int32()\n",
" value = [None]*alen\n",
" for i in range(alen):\n",
" value[i] = self.bio.read_int32()\n",
" elif valueType == self.ValueType.Int64Array:\n",
" alen = self.bio.read_int32()\n",
" value = [None]*alen\n",
" for i in range(alen):\n",
" value[i] = self.bio.read_int64()\n",
" elif valueType == self.ValueType.ObjectArray:\n",
" alen = self.bio.read_int32()\n",
" value = [None]*alen\n",
" for i in range(alen):\n",
" value[i] = self._readObject(**objectArgs)\n",
" elif valueType == self.ValueType.ObjectDictionary:\n",
" dlen = self.bio.read_int32()\n",
" value = [None]*dlen\n",
" for i in range(dlen):\n",
" dkey = self._readObject(**objectArgs)\n",
" dval = self._readObject(**objectArgs)\n",
" value[i] = (dkey, dval)\n",
" elif valueType == self.ValueType.Bytes:\n",
" value = self.bio.read_bytes()\n",
" elif valueType == self.ValueType.Nil:\n",
" pass # Nil is None\n",
" elif valueType == self.ValueType.StringArray:\n",
" alen = self.bio.read_int32()\n",
" value = [None]*alen\n",
" for i in range(alen):\n",
" value[i] = self.bio.read_str()\n",
" elif valueType == self.ValueType.BytesArray:\n",
" alen = self.bio.read_int32()\n",
" value = [None]*alen\n",
" for i in range(alen):\n",
" value[i] = self.bio.read_bytes()\n",
" else:\n",
" raise Exception('unknown value type')\n",
" return valueType, value"
]
},
{
"cell_type": "code",
"execution_count": 13,
"id": "endless-director",
"metadata": {},
"outputs": [],
"source": [
"class Decodeable:\n",
" def __init__(self, dec):\n",
" for field, v in self.FIELDS.items():\n",
" key = v[0]\n",
" typ = v[1]\n",
" _, val = dec.get(typ, key)\n",
" setattr(self, field, val)\n",
"\n",
" def __repr__(self):\n",
" return repr(self.__dict__)\n",
"\n",
"@PostboxDecoder.registerDecoder\n",
"class TelegramMediaImage(Decodeable):\n",
" FIELDS = {\n",
" 'imageId': ('i', PostboxDecoder.ValueType.Bytes),\n",
" 'representations': ('r', PostboxDecoder.ValueType.ObjectArray),\n",
" 'videoRepresentations': ('vr', PostboxDecoder.ValueType.ObjectArray),\n",
" 'immediateThumbnailData': ('itd', PostboxDecoder.ValueType.Bytes),\n",
" 'reference': ('rf', PostboxDecoder.ValueType.Object),\n",
" 'partialReference': ('prf', PostboxDecoder.ValueType.Object),\n",
" 'flags': ('fl', PostboxDecoder.ValueType.Int32),\n",
" }\n",
" \n",
" def __init__(self, dec):\n",
" super().__init__(dec)\n",
" bio = byteutil(io.BytesIO(self.imageId))\n",
" self.imageId = (bio.read_int32(), bio.read_int64())\n",
" \n",
" @property\n",
" def mediaId(self):\n",
" return self.imageId\n",
" \n",
"@PostboxDecoder.registerDecoder\n",
"class TelegramMediaImageRepresentation(Decodeable):\n",
" FIELDS = {\n",
" 'width': ('dx', PostboxDecoder.ValueType.Int32),\n",
" 'height': ('dy', PostboxDecoder.ValueType.Int32),\n",
" 'resource': ('r', PostboxDecoder.ValueType.Object),\n",
" 'progressiveSizes': ('ps', PostboxDecoder.ValueType.Int32Array),\n",
" }\n",
"\n",
"@PostboxDecoder.registerDecoder\n",
"class CloudPhotoSizeMediaResource(Decodeable):\n",
" FIELDS = {\n",
" 'datacenterId': ('d', PostboxDecoder.ValueType.Int32),\n",
" 'photoId': ('i', PostboxDecoder.ValueType.Int64),\n",
" 'accessHash': ('h', PostboxDecoder.ValueType.Int64),\n",
" 'sizeSpec': ('s', PostboxDecoder.ValueType.String),\n",
" 'size': ('n', PostboxDecoder.ValueType.Int32),\n",
" 'fileReference': ('fr', PostboxDecoder.ValueType.Bytes)\n",
" }\n",
"\n",
" @property\n",
" def uniqueId(self):\n",
" return f\"telegram-cloud-photo-size-{self.datacenterId}-{self.photoId}-{self.sizeSpec}\"\n",
"\n",
"@PostboxDecoder.registerDecoder\n",
"class CloudDocumentMediaResource(Decodeable):\n",
" FIELDS = {\n",
" 'datacenterId': ('d', PostboxDecoder.ValueType.Int32),\n",
" 'fileId': ('f', PostboxDecoder.ValueType.Int64),\n",
" 'accessHash': ('a', PostboxDecoder.ValueType.Int64),\n",
" 'size': ('n', PostboxDecoder.ValueType.Int32),\n",
" 'fileReference': ('fr', PostboxDecoder.ValueType.Bytes),\n",
" 'fileName': ('fn', PostboxDecoder.ValueType.String)\n",
" }\n",
"\n",
" @property\n",
" def uniqueId(self):\n",
" return f\"telegram-cloud-document-{self.datacenterId}-{self.fileId}\"\n",
"\n",
"\n",
"@PostboxDecoder.registerDecoder\n",
"class TelegramMediaFile(Decodeable):\n",
" FIELDS = {\n",
" 'fileId': ('i', PostboxDecoder.ValueType.Bytes),\n",
" 'partialReference': ('prf', PostboxDecoder.ValueType.Object),\n",
" 'resource': ('r', PostboxDecoder.ValueType.Object),\n",
" 'previewRepresentations': ('pr', PostboxDecoder.ValueType.ObjectArray),\n",
" 'videoThumbnails': ('vr', PostboxDecoder.ValueType.ObjectArray),\n",
" 'immediateThumbnailData': ('itd', PostboxDecoder.ValueType.Bytes),\n",
" 'mimeType': ('mt', PostboxDecoder.ValueType.String),\n",
" 'size': ('s', PostboxDecoder.ValueType.Int32),\n",
" 'attributes': ('at', PostboxDecoder.ValueType.ObjectArray)\n",
" }\n",
" \n",
" def __init__(self, dec):\n",
" super().__init__(dec)\n",
" bio = byteutil(io.BytesIO(self.fileId))\n",
" self.fileId = (bio.read_int32(), bio.read_int64())\n",
" \n",
" @property\n",
" def mediaId(self):\n",
" return self.fileId\n",
"\n",
"\n",
"@PostboxDecoder.registerDecoder\n",
"class TelegramMediaWebpage(Decodeable):\n",
" FIELDS = {\n",
" 'webpageId': ('i', PostboxDecoder.ValueType.Bytes),\n",
" 'pendingUrl': ('pendingUrl', PostboxDecoder.ValueType.String),\n",
" 'url': ('u', PostboxDecoder.ValueType.String),\n",
" }\n",
" \n",
" def __init__(self, dec):\n",
" super().__init__(dec)\n",
" bio = byteutil(io.BytesIO(self.webpageId))\n",
" self.webpageId = (bio.read_int32(), bio.read_int64())\n",
" \n",
" @property\n",
" def mediaId(self):\n",
" return self.webpageId\n",
"\n",
"@PostboxDecoder.registerDecoder\n",
"class TelegramMediaAction:\n",
" class Type(enum.Enum):\n",
" unknown = 0\n",
" groupCreated = 1\n",
" addedMembers = 2\n",
" removedMembers = 3\n",
" photoUpdated = 4\n",
" titleUpdated = 5\n",
" pinnedMessageUpdated = 6\n",
" joinedByLink = 7\n",
" channelMigratedFromGroup = 8\n",
" groupMigratedToChannel = 9\n",
" historyCleared = 10\n",
" historyScreenshot = 11\n",
" messageAutoremoveTimeoutUpdated = 12\n",
" gameScore = 13\n",
" phoneCall = 14\n",
" paymentSent = 15\n",
" customText = 16\n",
" botDomainAccessGranted = 17\n",
" botSentSecureValues = 18\n",
" peerJoined = 19\n",
" phoneNumberRequest = 20\n",
" geoProximityReached = 21\n",
" groupPhoneCall = 22\n",
" inviteToGroupPhoneCall = 23\n",
" \n",
" def __init__(self, dec):\n",
" raw = {k: v for k, t, v in dec._iter_kv()}\n",
" self.type = self.Type(raw.get('_rawValue', 0))\n",
" if '_rawValue' in raw:\n",
" del raw['_rawValue']\n",
" self.payload = raw\n",
"\n",
" def __repr__(self):\n",
" return f\"{self.type} {self.payload}\"\n"
]
},
{
"cell_type": "markdown",
"id": "antique-strategy",
"metadata": {},
"source": [
"### Examples\n",
"1. Find an incoming message by text and date interval, then list all messages in the chat containing that message\n",
"2. Get peer info by id\n",
"3. Decrypt the tempkey from a file"
]
},
{
"cell_type": "code",
"execution_count": 14,
"id": "sealed-treasurer",
"metadata": {},
"outputs": [],
"source": [
"con = sqlite3.connect('plaintext.db')"
]
},
{
"cell_type": "code",
"execution_count": 15,
"id": "vocational-habitat",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"=== <= 2021-04-28T13:50:54 peer:9596437714 id:159\n",
"=== Durov's Channel (@durov 9596437714)\n",
"%%% file fn:webversion.mp4 mt:video/mp4 telegram-cloud-document-1-4922901968625599114\n",
"Really excited about the recently launched web versions of Telegram https://webk.telegram.org and https://webz.telegram.org 🎉\n",
"\n",
"They are light years ahead of what any other social media service has to offer on the web: fast, slick, fluid, light, feature-rich. To make them 100% complete in features, we are currently testing a functional version of web-based video calls internally, which will be added soon.\n",
"\n",
"WebK and WebZ are by far the most cross-platform versions of Telegram we shipped so far - you can instantly access your chats from both mobile and desktop directly from your web browser. No downloads, no installs. \n",
"\n",
"This is particularly good for corporate environments where installing native apps is not always allowed, but also good for users who like the instant nature of web sites.\n",
"\n"
]
}
],
"source": [
"for idx, msg in get_all_messages(f=lambda idx: idx.timestamp > 1619557200):\n",
" if MessageFlags.Incoming in msg['flags'] and 'web versions of Telegram' in msg['text']:\n",
" print_message(idx, msg)\n",
" break"
]
},
{
"cell_type": "code",
"execution_count": 16,
"id": "twelve-windows",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"=== <= 2021-04-03T16:02:02 peer:9596437714 id:156\n",
"=== Durov's Channel (@durov 9596437714)\n",
"🎂 My Mom’s turning 70 today. She is the main reason I am who I am today. In school I was a self-willed kid that often clashed with teachers. My mom always supported me - she never sided with anybody but her sons. \n",
"\n",
"She is kind and full of energy, but also one of the smartest and wisest people I know. Born in a princely family that had been deported to Siberia from Kiev during the October Revolution, she studied in Russia's best universities, lived in Germany and then Italy where she educated students. \n",
"\n",
"Happy birthday, Mom! We love you ❤️\n",
"\n",
"=== <= 2021-04-03T16:03:51 peer:9596437714 id:157\n",
"=== Durov's Channel (@durov 9596437714)\n",
"%%% image telegram-cloud-photo-size-1-5134133047724189882-y\n",
"My Mom Albina a few decades ago and myself in the early 90s. Do I look like Mom?\n",
"\n",
"=== <= 2021-04-03T16:03:51 peer:9596437714 id:158\n",
"=== Durov's Channel (@durov 9596437714)\n",
"%%% image telegram-cloud-photo-size-1-5134611897922988248-y\n",
"\n",
"=== <= 2021-04-28T13:50:54 peer:9596437714 id:159\n",
"=== Durov's Channel (@durov 9596437714)\n",
"%%% file fn:webversion.mp4 mt:video/mp4 telegram-cloud-document-1-4922901968625599114\n",
"Really excited about the recently launched web versions of Telegram https://webk.telegram.org and https://webz.telegram.org 🎉\n",
"\n",
"They are light years ahead of what any other social media service has to offer on the web: fast, slick, fluid, light, feature-rich. To make them 100% complete in features, we are currently testing a functional version of web-based video calls internally, which will be added soon.\n",
"\n",
"WebK and WebZ are by far the most cross-platform versions of Telegram we shipped so far - you can instantly access your chats from both mobile and desktop directly from your web browser. No downloads, no installs. \n",
"\n",
"This is particularly good for corporate environments where installing native apps is not always allowed, but also good for users who like the instant nature of web sites.\n",
"\n",
"=== <= 2021-04-28T13:58:33 peer:9596437714 id:160\n",
"=== Durov's Channel (@durov 9596437714)\n",
"%%% file fn:None mt:video/mp4 telegram-cloud-document-1-4922901968625599116\n",
"📹 Speaking of video calls, we will be adding a video dimension to our voice chats in May, making Telegram a powerful platform for group video calls. Screen sharing, encryption, noise-cancellation, desktop and tablet support – everything you can expect from a modern video conferencing tool, but with Telegram-level UI, speed and encryption. Stay tuned!\n",
"\n"
]
}
],
"source": [
"for idx, msg in get_all_messages(f=lambda idx: idx.peerId == 9596437714 and idx.timestamp > 1617224400):\n",
" print_message(idx, msg)"
]
},
{
"cell_type": "code",
"execution_count": 17,
"id": "filled-testimony",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'i': 9596437714,\n",
" 'ah': -202169186454809330,\n",
" 'aht': 0,\n",
" 't': \"Durov's Channel\",\n",
" 'un': 'durov',\n",
" 'ph': [{'width': 80, 'height': 80, 'resource': {'d': 1, 'p': 155759888548607294, 's': 0, 'v': None, 'l': None, '@type': 923090569}, 'progressiveSizes': []},\n",
" {'width': 640, 'height': 640, 'resource': {'d': 1, 'p': 155759888548607294, 's': 1, 'v': None, 'l': None, '@type': 923090569}, 'progressiveSizes': []}],\n",
" 'd': 1449660337,\n",
" 'v': 0,\n",
" 'ps': 0,\n",
" 'i.t': 0,\n",
" 'i.f': 2,\n",
" 'fl': 1,\n",
" 'ri': None,\n",
" 'ar': None,\n",
" 'br': None,\n",
" 'dbr': None,\n",
" '@type': 1667961306}"
]
},
"execution_count": 17,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"get_peer(9596437714)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "19fc8750",
"metadata": {},
"outputs": [],
"source": [
"# html export example\n",
"all_refs = set()\n",
"\n",
"print('<html><head><title>Telegram Export</title></head><body>')\n",
"print('<pre>')\n",
"for idx, msg in get_all_messages():\n",
" break\n",
" r, _ = print_message(idx, msg, html_mode=True)\n",
" if r:\n",
" all_refs.update(r)\n",
"print('</pre>')\n",
"\n",
"print('references: <code>')\n",
"print(html.escape(json.dumps(list(all_refs))))\n",
"print('</code>')\n",
"print('</body></html>')"
]
},
{
"cell_type": "code",
"execution_count": 18,
"id": "reflected-operator",
"metadata": {},
"outputs": [],
"source": [
"con.close()"
]
},
{
"cell_type": "markdown",
"id": "cheap-child",
"metadata": {},
"source": [
"## How to decrypt the db\n",
"\n",
"Open the `db_sqlite` database with sqlcipher:\n",
"```sh\n",
"$ sqlcipher db_sqlite\n",
"```\n",
"\n",
"Run the following statements:\n",
"```sql\n",
"PRAGMA cipher_plaintext_header_size=32;\n",
"PRAGMA cipher_default_plaintext_header_size=32;\n",
"PRAGMA key=\"x'KEY_FROM_TEMPKEY'\";\n",
"\n",
"PRAGMA user_version; -- should be 4 now\n",
"\n",
"-- empty key will disable encryption\n",
"ATTACH DATABASE 'plaintext.db' AS plaintext KEY '';\n",
"SELECT sqlcipher_export('plaintext');\n",
"DETACH DATABASE plaintext;\n",
"```\n",
"\n",
"The statements above create a decrypted SQLite dump (`plaintext.db`).\n",
"Run the cells below to get the value of `KEY_FROM_TEMPKEY`.\n",
"\n",
"This notebook assumes an already decrypted db.\n",
"It was tested with SQLite `PRAGMA user_version` 4\n",
"and metadata UserVersion 25:\n",
"\n",
"```sql\n",
"-- t0 is MetadataTable, key=1 is UserVersion\n",
"select hex(value) from t0 where key = 1;\n",
"-- 19000000 = 0x19 = 25\n",
"```\n",
"\n",
"\n",
"Cached attachments can be retrieved from `postbox/media`:\n",
"```sh\n",
"$ file postbox/media/telegram-cloud-document-1-4922901968625599114\n",
"postbox/media/telegram-cloud-document-1-4922901968625599114: ISO Media, MP4 v2 [ISO 14496-14]\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": 19,
"id": "english-loading",
"metadata": {},
"outputs": [],
"source": [
"# install pycryptodome or pycryptodomex\n",
"\n",
"try:\n",
" from Cryptodome.Hash import SHA512\n",
" from Cryptodome.Cipher import AES\n",
"except ImportError:\n",
" from Cryptodome.Hash import SHA512\n",
" from Cryptodome.Cipher import AES\n",
"\n",
"import binascii"
]
},
{
"cell_type": "code",
"execution_count": 20,
"id": "inner-stuff",
"metadata": {},
"outputs": [],
"source": [
"DEFAULT_PASSWORD = 'no-matter-key'\n",
"\n",
"def tempkey_kdf(password):\n",
" h = SHA512.new()\n",
" h.update(password.encode('utf-8')) # never tried on non-ascii passwords tho\n",
" digest = h.digest()\n",
" key, iv = digest[0:32], digest[-16:]\n",
" return key, iv\n",
"\n",
"def tempkey_parse(dataEnc, pwd):\n",
" aesKey, aesIV = tempkey_kdf(DEFAULT_PASSWORD)\n",
" cipher = AES.new(key=aesKey, iv=aesIV, mode=AES.MODE_CBC)\n",
" data = cipher.decrypt(dataEnc)\n",
"\n",
" dbKey = data[0:32]\n",
" dbSalt = data[32:48]\n",
" dbHash = struct.unpack('<i', data[48:52])[0]\n",
" dbPad = data[52:]\n",
" \n",
" if len(dbPad) != 12 and any(dbPad):\n",
" print('warn: dbPad not 12 zeros')\n",
"\n",
" calcHash = murmur(dbKey+dbSalt)\n",
" if dbHash != calcHash:\n",
" raise Exception(f'hash mismatch: {dbHash} != {calcHash}')\n",
"\n",
" return dbKey, dbSalt\n",
"\n",
"def tempkey_pragma(dbKey, dbSalt):\n",
" key = binascii.hexlify(dbKey+dbSalt).decode('utf-8')\n",
" return '''PRAGMA key=\"x'{}'\"'''.format(key);"
]
},
{
"cell_type": "code",
"execution_count": 21,
"id": "relevant-scholarship",
"metadata": {},
"outputs": [],
"source": [
"with open('tempkeyEncrypted', 'rb') as f:\n",
" tempkeyEnc = f.read()"
]
},
{
"cell_type": "code",
"execution_count": 50,
"id": "initial-healing",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"PRAGMA key=\"x'68747470733a2f2f796f7574752e62652f64517734773957675863512f3f7879796f7576656265656e676e6f6d656421'\"\n"
]
}
],
"source": [
"dbKey, dbSalt = tempkey_parse(tempkeyEnc, DEFAULT_PASSWORD)\n",
"print(tempkey_pragma(dbKey, dbSalt))"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.2"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment