Skip to content

Instantly share code, notes, and snippets.

@tam17aki

tam17aki/watermark_lsb.py

Last active Jul 12, 2020
Embed
What would you like to do?
#!/usr/bin/env python3
"""
A python script to perform steganography on the basis of
least significant bit (LSB) modification method.
"""
# Copyright (c) 2018 Ryan Gibson
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# Copyright (c) 2020 Akira TAMAMORI
#
# Released under the MIT license.
#
# This script borrows codes from https://github.com/ragibson/Steganography
import wave
import os
import math
import numpy as np
HOST_SIGNAL = "host2.wav" # メッセージを隠すwaveファイル
MESSAGE_PATH = "message.txt" # 秘密のメッセージ
STEGO_SIGNAL = "stego2.wav" # メッセージが隠されたwaveファイル
RECOVER_PATH = "recover_message.txt" # 復元された秘密のメッセージ
BYTE_DEPTH_TO_DTYPE = {1: np.uint8, 2: np.uint16, 4: np.uint32, 8: np.uint64}
def roundup(data, base=1):
""" 丸め """
return int(math.ceil(data / base)) * base
def lsb_interleave_bytes(carrier, payload, num_lsb, truncate=False,
byte_depth=1):
"""
Interleave the bytes of payload into the num_lsb LSBs of carrier.
:param carrier: carrier bytes (バイトオブジェクト)
:param payload: payload bytes (バイトオブジェクト)
:param num_lsb: number of least significant bits to use (整数)
:param truncate: if True, will only return the interleaved part
:param byte_depth: byte depth of carrier values (整数)
:return: The interleaved bytes (バイトオブジェクト)
"""
# 埋め込み対象のデータ長
plen = len(payload)
# データサイズ的には(バイト×サンプル数)なので、
# バイトをビットの情報に変換した上で並べる
payload_bits = np.zeros(shape=(plen, 8), dtype=np.uint8)
# frombuffer()は bytes からndarray 1次元配列へ変換する
# unpackbits()はバイトを「ビット」配列に変換する
# reshape()で8ビットごとに配列を「折り曲げて」2次元配列に変換する
temp = np.unpackbits(np.frombuffer(payload, dtype=np.uint8, count=plen))
payload_bits[:plen, :] = np.array(temp).reshape([plen, 8])
# 埋め込みの総ビット数
bit_height = roundup(plen * 8 / num_lsb)
# 「リサイズ」→num_lsbを考慮して端数を切り捨てる
# つまりは payload_bitsを[-1, num_lsb] にしたいので、
# 要素数の整合性をもたせるための処理。
payload_bits.resize(bit_height * num_lsb)
carrier_dtype = BYTE_DEPTH_TO_DTYPE[byte_depth]
# frombuffer()は bytes からndarray 1次元配列へ変換する
# unpackbits()はバイト(uint8)を「ビット」配列に変換する
# reshape()で8ビットごとに配列を「折り曲げて」2次元配列に変換する
carrier_bits = np.unpackbits(np.frombuffer(carrier, dtype=carrier_dtype,
count=bit_height).view(np.uint8))
carrier_bits = np.array(carrier_bits).reshape(bit_height, 8 * byte_depth)
# キャリアの各フレームにおける 下位 num_lsb ビット たち を
# payloadの num_lsbビット たちで一斉に置き換える
carrier_bits[:, 8 - num_lsb: 8] = payload_bits.reshape(bit_height, num_lsb)
# ビット配列をバイト配列に戻す
ret = np.packbits(carrier_bits)
# バイト配列を通常のバイトオブジェクトに戻す
ret = np.array(ret).tobytes()
# 打ち切り (truncate)がTrueなら 埋め込みされた当該データのみを返す
# Falseなら埋め込みされたデータに加えて 未加工の残りのデータも返す
# → "+"はバイトオブジェクトの連結の意味
return ret if truncate else ret + carrier[byte_depth * bit_height:]
def lsb_deinterleave_bytes(carrier, num_bits, num_lsb, byte_depth=1):
"""
Deinterleave num_bits bits from the num_lsb LSBs of carrier.
:param carrier: carrier bytes
:param num_bits: number of num_bits to retrieve
:param num_lsb: number of least significant bits to use
:param byte_depth: byte depth of carrier values
:return: The deinterleaved bytes
"""
# num_bits 1フレームから取り出す総ビット数
# byte_depth 量子化バイト数
# 戻り値 埋め込まれたデータ(バイトオブジェクト), (num_bits // 8) bytes
plen = roundup(num_bits / num_lsb)
carrier_dtype = BYTE_DEPTH_TO_DTYPE[byte_depth]
# キャリアを読み込み「ビット配列」に変換する
# frombuffer()は bytes からndarray 1次元配列へ変換する
# unpackbits()はバイト(uint8)を「ビット」配列に変換する
# reshape()で8ビットごとに配列を「折り曲げて」2次元配列に変換する
carrier_bits = np.unpackbits(
np.frombuffer(carrier, dtype=carrier_dtype, count=plen).view(np.uint8))
carrier_bits = np.array(carrier_bits).reshape(plen, 8 * byte_depth)
# 埋め込まれたpayloadを取り出す
payload_bits = carrier_bits[:, 8 - num_lsb: 8]
# バイトオブジェクトに変換する
payload_bits = np.array(np.packbits(payload_bits)).tobytes()
# 指定バイトサイズで返す(端数bit切り捨て)
return payload_bits[: num_bits // 8]
def hide(sound_path, file_path, output_path, num_lsb=1):
""" 秘密のメッセージを音ファイルの中に隠す """
if sound_path is None:
raise ValueError("WavSteg hiding requires an input sound file path")
if file_path is None:
raise ValueError("WavSteg hiding requires a secret file path")
if output_path is None:
raise ValueError("WavSteg hiding requires an output sound file path")
sound = wave.open(sound_path, "r")
params = sound.getparams()
wav_header = {}
wav_header["num_channels"] = sound.getnchannels() # チャネル数
wav_header["sample_width"] = sound.getsampwidth() # 量子化ビット数(byte)
wav_header["num_frames"] = sound.getnframes() # フレーム数(チャネルあたりのサンプル数)
num_samples = wav_header["num_channels"] * \
wav_header["num_frames"] # トータルのサンプル数
# 当該のwaveに埋め込める最大バイト数
# 8で割るのは 8ビット = 1バイトだから
max_bytes_to_hide = (num_samples * num_lsb) // 8
# これから埋め込むメッセージのサイズ
file_size = os.stat(file_path).st_size
# 最大 num_frames 個のオーディオフレームを読み込み
# bytes オブジェクトを得る
sound_frames = sound.readframes(wav_header["num_frames"])
with open(file_path, "rb") as file:
data = file.read()
print("復元前:\n" + data.decode())
if file_size > max_bytes_to_hide:
required_lsb = math.ceil(file_size * 8 / num_samples)
raise ValueError(
"Input file too large to hide, "
"requires {} LSBs, using {}".format(required_lsb, num_lsb))
# waveモジュールが扱えるのは量子化ビット数が 8bit or 16bit (残念)
# →24bitはダメ !!
if wav_header["sample_width"] not in (1, 2):
raise ValueError("File has an unsupported bit-depth")
# LSBに情報埋め込み
sound_frames = lsb_interleave_bytes(sound_frames, data, num_lsb,
byte_depth=wav_header["sample_width"])
# ファイルに書き込み
sound_steg = wave.open(output_path, "w")
sound_steg.setparams(params)
sound_steg.writeframes(sound_frames)
sound_steg.close()
return file_size
def recover(sound_path, output_path, num_lsb, bytes_to_recover):
""" ステゴ的な音ファイルから埋め込んだメッセージを復元する """
if sound_path is None:
raise ValueError("recovery requires an input sound file path")
if output_path is None:
raise ValueError("recovery requires an output file path")
if bytes_to_recover is None:
raise ValueError("recovery requires the number of bytes to recover")
sound = wave.open(sound_path, "r")
sample_width = sound.getsampwidth()
num_frames = sound.getnframes()
sound_frames = sound.readframes(num_frames)
if sample_width not in (1, 2):
# Python's wave module doesn't support higher sample widths
raise ValueError("File has an unsupported bit-depth")
# メッセージ復元
data = lsb_deinterleave_bytes(
sound_frames, 8 * bytes_to_recover, num_lsb, byte_depth=sample_width)
# 復元されたメッセージ書き込み
output_file = open(output_path, "wb+")
output_file.write(bytes(data))
output_file.close()
print("復元後:\n" + data.decode())
def main():
"""Main routine. """
written_bytes = hide(HOST_SIGNAL, MESSAGE_PATH, STEGO_SIGNAL)
recover(STEGO_SIGNAL, RECOVER_PATH, 1, written_bytes)
if __name__ in '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.