Last active
July 12, 2020 03:30
-
-
Save tam17aki/fa084a9e698ffd255791750b923cdc8c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
A python script to perform steganography on the basis of | |
least significant bit (LSB) modification method. | |
""" | |
# Copyright (c) 2018 Ryan Gibson | |
# | |
# Permission is hereby granted, free of charge, to any person obtaining a copy | |
# of this software and associated documentation files (the "Software"), to deal | |
# in the Software without restriction, including without limitation the rights | |
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
# copies of the Software, and to permit persons to whom the Software is | |
# furnished to do so, subject to the following conditions: | |
# | |
# The above copyright notice and this permission notice shall be included in | |
# all copies or substantial portions of the Software. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
# SOFTWARE. | |
# Copyright (c) 2020 Akira TAMAMORI | |
# | |
# Released under the MIT license. | |
# | |
# This script borrows codes from https://github.com/ragibson/Steganography | |
import wave | |
import os | |
import math | |
import numpy as np | |
HOST_SIGNAL = "host2.wav" # メッセージを隠すwaveファイル | |
MESSAGE_PATH = "message.txt" # 秘密のメッセージ | |
STEGO_SIGNAL = "stego2.wav" # メッセージが隠されたwaveファイル | |
RECOVER_PATH = "recover_message.txt" # 復元された秘密のメッセージ | |
BYTE_DEPTH_TO_DTYPE = {1: np.uint8, 2: np.uint16, 4: np.uint32, 8: np.uint64} | |
def roundup(data, base=1): | |
""" 丸め """ | |
return int(math.ceil(data / base)) * base | |
def lsb_interleave_bytes(carrier, payload, num_lsb, truncate=False, | |
byte_depth=1): | |
""" | |
Interleave the bytes of payload into the num_lsb LSBs of carrier. | |
:param carrier: carrier bytes (バイトオブジェクト) | |
:param payload: payload bytes (バイトオブジェクト) | |
:param num_lsb: number of least significant bits to use (整数) | |
:param truncate: if True, will only return the interleaved part | |
:param byte_depth: byte depth of carrier values (整数) | |
:return: The interleaved bytes (バイトオブジェクト) | |
""" | |
# 埋め込み対象のデータ長 | |
plen = len(payload) | |
# データサイズ的には(バイト×サンプル数)なので、 | |
# バイトをビットの情報に変換した上で並べる | |
payload_bits = np.zeros(shape=(plen, 8), dtype=np.uint8) | |
# frombuffer()は bytes からndarray 1次元配列へ変換する | |
# unpackbits()はバイトを「ビット」配列に変換する | |
# reshape()で8ビットごとに配列を「折り曲げて」2次元配列に変換する | |
temp = np.unpackbits(np.frombuffer(payload, dtype=np.uint8, count=plen)) | |
payload_bits[:plen, :] = np.array(temp).reshape([plen, 8]) | |
# 埋め込みの総ビット数 | |
bit_height = roundup(plen * 8 / num_lsb) | |
# 「リサイズ」→num_lsbを考慮して端数を切り捨てる | |
# つまりは payload_bitsを[-1, num_lsb] にしたいので、 | |
# 要素数の整合性をもたせるための処理。 | |
payload_bits.resize(bit_height * num_lsb) | |
carrier_dtype = BYTE_DEPTH_TO_DTYPE[byte_depth] | |
# frombuffer()は bytes からndarray 1次元配列へ変換する | |
# unpackbits()はバイト(uint8)を「ビット」配列に変換する | |
# reshape()で8ビットごとに配列を「折り曲げて」2次元配列に変換する | |
carrier_bits = np.unpackbits(np.frombuffer(carrier, dtype=carrier_dtype, | |
count=bit_height).view(np.uint8)) | |
carrier_bits = np.array(carrier_bits).reshape(bit_height, 8 * byte_depth) | |
# キャリアの各フレームにおける 下位 num_lsb ビット たち を | |
# payloadの num_lsbビット たちで一斉に置き換える | |
carrier_bits[:, 8 - num_lsb: 8] = payload_bits.reshape(bit_height, num_lsb) | |
# ビット配列をバイト配列に戻す | |
ret = np.packbits(carrier_bits) | |
# バイト配列を通常のバイトオブジェクトに戻す | |
ret = np.array(ret).tobytes() | |
# 打ち切り (truncate)がTrueなら 埋め込みされた当該データのみを返す | |
# Falseなら埋め込みされたデータに加えて 未加工の残りのデータも返す | |
# → "+"はバイトオブジェクトの連結の意味 | |
return ret if truncate else ret + carrier[byte_depth * bit_height:] | |
def lsb_deinterleave_bytes(carrier, num_bits, num_lsb, byte_depth=1): | |
""" | |
Deinterleave num_bits bits from the num_lsb LSBs of carrier. | |
:param carrier: carrier bytes | |
:param num_bits: number of num_bits to retrieve | |
:param num_lsb: number of least significant bits to use | |
:param byte_depth: byte depth of carrier values | |
:return: The deinterleaved bytes | |
""" | |
# num_bits 1フレームから取り出す総ビット数 | |
# byte_depth 量子化バイト数 | |
# 戻り値 埋め込まれたデータ(バイトオブジェクト), (num_bits // 8) bytes | |
plen = roundup(num_bits / num_lsb) | |
carrier_dtype = BYTE_DEPTH_TO_DTYPE[byte_depth] | |
# キャリアを読み込み「ビット配列」に変換する | |
# frombuffer()は bytes からndarray 1次元配列へ変換する | |
# unpackbits()はバイト(uint8)を「ビット」配列に変換する | |
# reshape()で8ビットごとに配列を「折り曲げて」2次元配列に変換する | |
carrier_bits = np.unpackbits( | |
np.frombuffer(carrier, dtype=carrier_dtype, count=plen).view(np.uint8)) | |
carrier_bits = np.array(carrier_bits).reshape(plen, 8 * byte_depth) | |
# 埋め込まれたpayloadを取り出す | |
payload_bits = carrier_bits[:, 8 - num_lsb: 8] | |
# バイトオブジェクトに変換する | |
payload_bits = np.array(np.packbits(payload_bits)).tobytes() | |
# 指定バイトサイズで返す(端数bit切り捨て) | |
return payload_bits[: num_bits // 8] | |
def hide(sound_path, file_path, output_path, num_lsb=1): | |
""" 秘密のメッセージを音ファイルの中に隠す """ | |
if sound_path is None: | |
raise ValueError("WavSteg hiding requires an input sound file path") | |
if file_path is None: | |
raise ValueError("WavSteg hiding requires a secret file path") | |
if output_path is None: | |
raise ValueError("WavSteg hiding requires an output sound file path") | |
sound = wave.open(sound_path, "r") | |
params = sound.getparams() | |
wav_header = {} | |
wav_header["num_channels"] = sound.getnchannels() # チャネル数 | |
wav_header["sample_width"] = sound.getsampwidth() # 量子化ビット数(byte) | |
wav_header["num_frames"] = sound.getnframes() # フレーム数(チャネルあたりのサンプル数) | |
num_samples = wav_header["num_channels"] * \ | |
wav_header["num_frames"] # トータルのサンプル数 | |
# 当該のwaveに埋め込める最大バイト数 | |
# 8で割るのは 8ビット = 1バイトだから | |
max_bytes_to_hide = (num_samples * num_lsb) // 8 | |
# これから埋め込むメッセージのサイズ | |
file_size = os.stat(file_path).st_size | |
# 最大 num_frames 個のオーディオフレームを読み込み | |
# bytes オブジェクトを得る | |
sound_frames = sound.readframes(wav_header["num_frames"]) | |
with open(file_path, "rb") as file: | |
data = file.read() | |
print("復元前:\n" + data.decode()) | |
if file_size > max_bytes_to_hide: | |
required_lsb = math.ceil(file_size * 8 / num_samples) | |
raise ValueError( | |
"Input file too large to hide, " | |
"requires {} LSBs, using {}".format(required_lsb, num_lsb)) | |
# waveモジュールが扱えるのは量子化ビット数が 8bit or 16bit (残念) | |
# →24bitはダメ !! | |
if wav_header["sample_width"] not in (1, 2): | |
raise ValueError("File has an unsupported bit-depth") | |
# LSBに情報埋め込み | |
sound_frames = lsb_interleave_bytes(sound_frames, data, num_lsb, | |
byte_depth=wav_header["sample_width"]) | |
# ファイルに書き込み | |
sound_steg = wave.open(output_path, "w") | |
sound_steg.setparams(params) | |
sound_steg.writeframes(sound_frames) | |
sound_steg.close() | |
return file_size | |
def recover(sound_path, output_path, num_lsb, bytes_to_recover): | |
""" ステゴ的な音ファイルから埋め込んだメッセージを復元する """ | |
if sound_path is None: | |
raise ValueError("recovery requires an input sound file path") | |
if output_path is None: | |
raise ValueError("recovery requires an output file path") | |
if bytes_to_recover is None: | |
raise ValueError("recovery requires the number of bytes to recover") | |
sound = wave.open(sound_path, "r") | |
sample_width = sound.getsampwidth() | |
num_frames = sound.getnframes() | |
sound_frames = sound.readframes(num_frames) | |
if sample_width not in (1, 2): | |
# Python's wave module doesn't support higher sample widths | |
raise ValueError("File has an unsupported bit-depth") | |
# メッセージ復元 | |
data = lsb_deinterleave_bytes( | |
sound_frames, 8 * bytes_to_recover, num_lsb, byte_depth=sample_width) | |
# 復元されたメッセージ書き込み | |
output_file = open(output_path, "wb+") | |
output_file.write(bytes(data)) | |
output_file.close() | |
print("復元後:\n" + data.decode()) | |
def main(): | |
"""Main routine. """ | |
written_bytes = hide(HOST_SIGNAL, MESSAGE_PATH, STEGO_SIGNAL) | |
recover(STEGO_SIGNAL, RECOVER_PATH, 1, written_bytes) | |
if __name__ in '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment