Skip to content

Instantly share code, notes, and snippets.

@toriningen
Last active January 15, 2020 16:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save toriningen/7c56c262bff38fda40b5cb6a014b87a2 to your computer and use it in GitHub Desktop.
Save toriningen/7c56c262bff38fda40b5cb6a014b87a2 to your computer and use it in GitHub Desktop.
Naive Python implementation of hash-based obfuscation codec

Идея

Несколько лет назад у меня возникла идея, как можно с этим бороться и сделать создание правила для DPI-фильтра невозможным на практике.

Суть идеи заключается в том, чтобы вместо открытых данных пересылать случайные данные, но которые можно по некоторому заранее известному, желательно трудозатратному алгоритму без секретной компоненты, преобразовать обратно в открытые данные.

Таким образом, это не добавит какой-либо секретности в пересылаемые данные, но затруднит их непосредственный анализ без предварительной обработки, которая в масштабах глобального прослушивания повлечет за собой как минимум ложнопозитивные сработки, и как максимум — невозможность справиться с потоком данных с точки зрения требуемых ресурсов.

Для кодирования следует разбить открытые данные на чанки по N бит, затем для каждого чанка нужно брутфорсом подобрать такой случайный блок данных фиксированного размера H, который при декодировании превратится в ожидаемый чанк открытых данных в N бит размером. Пример такой процедуры декодирования — циклически посчитать SHA256 от блока случайных данных 1000 раз, и затем вернуть младшие N бит хэша.


Приведу пример декодирования готовой строки. Допустим, у нас есть следующие бинарные данные:

4b 82 93 6a b5 0f 39 9b 33 18 2a 7d b4 96 c2 f8 | K..j..9.3.*}....
cb 57 49 b7 67 6d f6 c2 9a e2 73 64 54 1f 92 7d | .WI.gm....sdT..}
65 44 12 7e de 07 c2 85 41 15 9e 32 28 3b c5 3a | eD.~....A..2(;.:
0f 16 3a d9                                     | ..:.

Мы заранее знаем, что открытый текст делился на блоки по 8 бит каждый, и что мы использовали блоки "шифротекста" по 32 бита.

Первый блок — 4b 82 93 6a. SHA256 от него = 486b84b02736885238a0d8ba871a78262362f1dfe7422f1d50b257ed6c99bcc3. Младшие 8 байт этого хэша в little endian = 0x48, т.е. "H" в ASCII.

Следующий блок — b5 0f 39 9b. SHA256 от него = 6546ec1ccea61e68c70671b6a04d9a074c0bf024f92d72d9103e78c3641801eb. Младшие 8 байт этого хэша в little endian = 0x65, т.е. "e" в ASCII.

Продолжая декодирование таким же образом, получим строку "Hello, world!".

Обсуждение

Q: Хотя ваш алгоритм и «трудозатратный», но с другой стороны он не увеличивает количество ложнопозитивных срабатываний, а наоборот уменьшает их.

Что я имею в виду. Если мы знаем алгоритм и применяем его ко всем пакетам, то с очень большой вероятностью только «ваши» пакеты на выходе дадут что-то осмысленное, т.к. вероятность что что-то осмысленное получится после 1000 раундов SHA256 очень мала, а умножив это на количество блоков мы получим вообще исчезающе малую величину.

Мне кажется чтобы увеличить количество ложноположительных срабатываний нужно наоборот использовать «нестойкие» алгоритмы которые с большей вероятностью будут выдавать нечто «осмысленное».

A: Вы не совсем правы. В предложенной мной схеме речь не идет о поиске такого прообраза, после хэширования которого мы получим "осмысленный пакет" (т.е. некую фиксированную битовую последовательность) — вероятность угадать такой прообраз, учитывая, что выхлоп сильной хэш-функции можно полагать случайным, составляет 2**(-N), где N — длина искомого "смысла". 100-битный осмысленный пакет можно искать неимоверно долго, и абсолютно впустую — добывать биткоины при таком количестве свободного времени и ресурсов явно выгоднее :)

Предложенную схему не стоит рассматривать как средство сокрытия информации — очевидно же, что если ваш пакет начинается с "Lorem ipsum" (88 бит), то каждый 309485009821345068724781056-й в среднем пакет должен будет "расшировываться" в "Lorem ipsum", и если наблюдаемая частота таких сообщений значительно превышает ожидаемый среднестатистический, то это "бжжж" тут явно неспроста.

Суть же предложенной мной схемы как раз заключается в том, чтобы сделать слежку невозможной именно в глобальных масштабах, основываясь на, скажем, эдаком proof-of-work. Если невозможно заранее, глядя только на мусорные блоки, отличить их от настоящего мусора, то атакующей стороне в любом случае потребуется потратить энное количество ватт (времени, денег) на вычисление хэш-функции от этого мусорного блока.

И если поток сквозных данных превысит возможности атакующей стороны по вычислению хэшей, им придется либо пропускать часть трафика, либо в принципе отказаться от идеи вычислять хэши для произвольных данных. Причем инициатору соединения даже необязательно использовать истинно случайные блоки — можно притворяться чем-то другим, лишь бы все еще было достаточно степеней свободы для подбора хэша с требуемыми свойствами.

Утрируя, мы можем передавать 1 бит данных в 128 битах мусора. Уже даже с такой избыточностью невозможно составить таблицу соответствия мусорных данных истинным данным, значит, DPI-фильтру придется "играть честно", и чем сложнее будет для вычисления хэш-функция (pbkdf2? scrypt? bcrypt?), тем меньшее количество сообщений выйдет декодировать, потратив 1 кВт, и тем быстрее правительству^W атакующей стороне это надоест.

Ну а процент "ложноположительных срабатываний" у этой схемы и так статистически неотличим от шума, куда уж лучше?

import hashlib
import random
from collections import deque
from abc import ABC, abstractmethod
class AbstractHashCodec(ABC):
"""Abstract hash codec.
Implement :meth:`decode_block`.
:param plaintext_block_bits: bit length of chunks to split the plaintext into.
:param ciphertext_block_bits: bit length of chunks to split the ciphertext into.
:param max_pool_size: how many ciphertext block candidates should be stored in the pool for one particular
plaintext block.
"""
def __init__(self, plaintext_block_bits: int, ciphertext_block_bits: int, max_pool_size: int = 8):
if not plaintext_block_bits > 0:
raise ValueError("plaintext_block_bits must be larger than zero")
if not ciphertext_block_bits > 0:
raise ValueError("ciphertext_block_bits must be larger than zero")
if not ciphertext_block_bits % 8 == 0:
raise ValueError("ciphertext_block_bits must be divisible by 8")
if not plaintext_block_bits < ciphertext_block_bits:
raise ValueError("plaintext_block_bits must be less than ciphertext_block_bits")
self._plaintext_block_bits = plaintext_block_bits
self._plaintext_block_mask = (1 << self._plaintext_block_bits) - 1
self._ciphertext_block_bits = ciphertext_block_bits
self._ciphertext_block_mask = (1 << self._ciphertext_block_bits) - 1
self._ciphertext_block_bytes = self._ciphertext_block_bits // 8
self._pool = {}
self._max_pool_size = max_pool_size
def encode(self, plaintext: bytes) -> bytes:
"""Encode bytes.
This splits the plaintext stream into chunks ``plaintext_block_bits`` bits long, and encodes each one by one,
stitching the encoded blocks into the resulting byte string.
"""
ciphertext = []
plaintext_int = int.from_bytes(plaintext, 'little')
while plaintext_int:
plaintext_block = plaintext_int & self._plaintext_block_mask
plaintext_int >>= self._plaintext_block_bits
ciphertext_block = self.encode_block(plaintext_block)
ciphertext_bytes = ciphertext_block.to_bytes(self._ciphertext_block_bytes, 'little')
ciphertext.append(ciphertext_bytes)
return b''.join(ciphertext)
def decode(self, ciphertext: bytes) -> bytes:
"""Decode bytes.
This splits the ciphertext into chunks ``ciphertext_block_bits`` bits long, and then decodes them one by one,
stitching the decoded blocks into the resulting byte string.
The result may be longer than original plaintext, so make sure to include length markers in the payload.
"""
plaintext = 0
total_blocks = 0
while True:
offset = total_blocks * self._ciphertext_block_bytes
offset_next = offset + self._ciphertext_block_bytes
chunk = ciphertext[offset:offset_next]
if not chunk:
break
assert len(chunk) == self._ciphertext_block_bytes
ciphertext_block = int.from_bytes(chunk, 'little')
plaintext_block = self.decode_block(ciphertext_block)
plaintext |= (plaintext_block << (self._plaintext_block_bits * total_blocks))
total_blocks += 1
total_bytes = ((total_blocks * self._plaintext_block_bits) - 1) // 8 + 1
return plaintext.to_bytes(total_bytes, 'little')
def encode_block(self, plaintext_block: int) -> int:
"""Encode plaintext block.
This function searches for a random block that can be decoded into designated plaintext block. It would use
cached blocks if possible, and would cache block candidates for later use.
"""
assert plaintext_block & self._plaintext_block_mask == plaintext_block
pool = self._pool.get(plaintext_block)
if pool:
ciphertext_block = pool.pop()
if not pool:
del self._pool[plaintext_block]
return ciphertext_block
while True:
ciphertext_block = random.getrandbits(self._ciphertext_block_bits)
decoded_plaintext_block = self.decode_block(ciphertext_block)
if decoded_plaintext_block == plaintext_block:
return ciphertext_block
pool = self._pool.get(decoded_plaintext_block)
if pool is None: # not using .setdefault to avoid expensive deque() construction in the loop
pool = self._pool[decoded_plaintext_block] = deque()
if len(pool) < self._max_pool_size:
pool.append(ciphertext_block)
@abstractmethod
def decode_block(self, ciphertext_block: int) -> int:
"""Decode ciphertext block.
This function must statelessly decode the ciphertext block.
"""
class Sha256Codec(AbstractHashCodec):
def decode_block(self, ciphertext_block: int) -> int:
assert ciphertext_block & self._ciphertext_block_mask == ciphertext_block
preimage = ciphertext_block.to_bytes(self._ciphertext_block_bytes, 'little')
digest_bytes = hashlib.sha256(preimage).digest()
digest_int = int.from_bytes(digest_bytes, 'little')
plaintext_block = digest_int & self._plaintext_block_mask
return plaintext_block
g_codec = Sha256Codec(8, 32)
g_plaintext = b'Hello, world!'
g_ciphertext = g_codec.encode(g_plaintext)
g_plaintext2 = g_codec.decode(g_ciphertext)
assert g_plaintext == g_plaintext2[:len(g_plaintext)]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment