Last active
December 6, 2022 07:24
-
-
Save LemonPrefect/0275929fe8bb61dd21f298a45aaf2bbc to your computer and use it in GitHub Desktop.
Solution for lena from Xiangyun Cup Quals, 2022
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cv2 | |
import pywt | |
import numpy as np | |
from PIL import Image | |
from numpy import uint8, ndarray | |
from reedsolo import RSCodec | |
def arnoldShuffle(imageData, factor): | |
shuffleTimes, a, b = factor | |
image = np.zeros(imageData.shape) | |
height, width = imageData.shape[:2] | |
for time in range(shuffleTimes): | |
for x in range(height): | |
for y in range(width): | |
_y = (y + a * x) % width | |
_x = (b * y + (a * b + 1) * x) % height | |
image[_x, _y] = imageData[x, y] | |
imageData = image.copy() | |
return image | |
def splitImage(imageData, size): | |
height, width = imageData.shape[:2] | |
heightSize, widthSize = size | |
shapes = (height // heightSize, width // widthSize, heightSize, widthSize) | |
strides = imageData.itemsize * np.array([width * heightSize, widthSize, width, 1]) | |
imagePieces = np.lib.stride_tricks.as_strided(imageData, shapes, strides).astype('float64') | |
imagePieces = np.reshape(imagePieces, (shapes[0] * shapes[1], heightSize, widthSize)) | |
return imagePieces | |
def mergeImage(imageData, shape): | |
height, width = shape[:2] | |
heightSize, widthSize = imageData.shape[-2:] | |
shapes = (height // heightSize, width // widthSize, heightSize, widthSize) | |
imageData = np.reshape(imageData, shapes) | |
imagePieces = [] | |
for imagePiece in imageData: | |
imagePieces.append(np.concatenate(imagePiece, axis=1)) | |
image = np.concatenate(imagePieces, axis=0) | |
return image | |
def binarization(pixel: ndarray) -> uint8: | |
return ((pixel > 128) * 255).astype('uint8') | |
def muLawEncode(x, b, mu): | |
return np.log(1 + mu * (np.abs(x) / b)) / np.log(1 + mu) | |
def muLawDecode(y, b, mu): | |
return (b / mu) * (np.power(1 + mu, np.abs(y)) - 1) | |
def qim(x, m, delta: int): | |
return (np.round(x * 1000 / delta) * delta + (-1) ** (m + 1) * delta / 4.) / 1000 | |
def deQim(x, delta: int): | |
return delta / 2 - x * 1000 % delta | |
class Watermark: | |
def __init__(self, carrier): | |
self.block_shape = 4 | |
self.arnold_factor = (6, 20, 22) | |
self.rsc_factor = 100 | |
self.mu_law_mu = 100 | |
self.mu_law_X_max = 8000 | |
self.delta = 15 | |
self.carrier = carrier.astype('float32') | |
height, width = self.carrier.shape[:2] | |
self.carrier_cA_height = height // 2 | |
self.carrier_cA_width = width // 2 | |
self.watermark_height = self.carrier_cA_height // self.block_shape | |
self.watermark_width = self.carrier_cA_width // self.block_shape | |
self.max_bits_size = self.watermark_height * self.watermark_width | |
self.max_bytes_size = self.max_bits_size // 8 | |
self.rsc_size = len(RSCodec(self.rsc_factor).encode(b'\x00' * self.max_bytes_size)) | |
def fixBinary(self, source): | |
binaries = (source % 2).flatten() | |
if len(binaries) < self.max_bits_size: | |
binaries = np.hstack((binaries, np.zeros(self.max_bits_size - len(binaries)))).astype('uint8') | |
return binaries | |
def binaryPack(self, data) -> bytes: | |
return np.packbits(data).tobytes() | |
def binaryUnpack(self, binary: bytes) -> ndarray: | |
unpackedBinary = np.unpackbits(np.frombuffer(binary, dtype='uint8')) | |
if len(unpackedBinary) < self.max_bits_size: | |
unpackedBinary = np.hstack((unpackedBinary, np.zeros(self.max_bits_size - len(unpackedBinary)))).astype('uint8') | |
return unpackedBinary | |
def dctQimSvdMuShuffle(self, carriers, watermark): | |
carried = carriers.copy() | |
for index, carrier in enumerate(carriers): | |
watermarkPiece = watermark[index] | |
carrierdcted = cv2.dct(carrier) | |
u, s, v = np.linalg.svd(carrierdcted) | |
x = np.max(s) | |
imagePiece = muLawEncode(x, self.mu_law_X_max, self.mu_law_mu) | |
qimedImagePiece = qim(imagePiece, watermarkPiece, self.delta) | |
imagePiece = muLawDecode(qimedImagePiece, self.mu_law_X_max, self.mu_law_mu) | |
for shape in range(self.block_shape): | |
if s[shape] == x: | |
s[shape] = imagePiece | |
carried[index] = cv2.idct(np.dot(np.dot(u, np.diag(s)), v)) | |
return carried | |
def undctQimSvdMuShuffle(self, image): | |
carried = image.copy() | |
result = [] | |
for index, carrier in enumerate(carried): | |
carrierdcted = cv2.dct(carrier) | |
u, s, v = np.linalg.svd(carrierdcted) | |
x = np.max(s) | |
imagePiece = muLawEncode(x, self.mu_law_X_max, self.mu_law_mu) | |
data = deQim(imagePiece, self.delta) | |
result.append(1 & (data > 0)) | |
return result | |
def stego(self, imageData): | |
imageAnorlded = arnoldShuffle(imageData, self.arnold_factor) | |
imageBinarized = binarization(imageAnorlded) | |
imageBinarizedFixed = self.fixBinary(imageBinarized) | |
imagePacked = self.binaryPack(imageBinarizedFixed) | |
imageRSCoded = RSCodec(self.rsc_factor) | |
imageBytes = bytes(imageRSCoded.encode(imagePacked)) | |
imageLastBytes = self.binaryUnpack(imageBytes[:self.max_bytes_size]) | |
imageFirstBytes = self.binaryUnpack(imageBytes[self.max_bytes_size:]) | |
carrierChangedSpace = cv2.cvtColor(self.carrier, cv2.COLOR_BGR2YCrCb) | |
carrierY, carrierCr, carrierCb = cv2.split(carrierChangedSpace) | |
carrierCrA, carrierCrD = pywt.dwt2(carrierCr, 'haar') | |
carrierCbA, carrierCbD = pywt.dwt2(carrierCb, 'haar') | |
imagesCrA = splitImage(carrierCrA, (self.block_shape, self.block_shape)) | |
imagesCbA = splitImage(carrierCbA, (self.block_shape, self.block_shape)) | |
imagesCrAdcted = self.dctQimSvdMuShuffle(imagesCrA, imageLastBytes) | |
carrierCrAdcted = mergeImage(imagesCrAdcted, (self.carrier_cA_height, self.carrier_cA_width)) | |
imagesCbAdcted = self.dctQimSvdMuShuffle(imagesCbA, imageFirstBytes) | |
carrierCbAdcted = mergeImage(imagesCbAdcted, (self.carrier_cA_height, self.carrier_cA_width)) | |
carrierCr = pywt.idwt2((carrierCrAdcted, carrierCrD), 'haar') | |
carrierCb = pywt.idwt2((carrierCbAdcted, carrierCbD), 'haar') | |
carrier = cv2.merge([carrierY, carrierCr.astype('float32'), carrierCb.astype('float32')]) | |
carrierRGB = cv2.cvtColor(carrier, cv2.COLOR_YCrCb2BGR) | |
return carrierRGB | |
def unstego(self): | |
carrier = self.carrier | |
carrierYCrCb = cv2.cvtColor(carrier, cv2.COLOR_BGR2YCrCb) | |
carrierY, carrierCr, carrierCb = cv2.split(carrierYCrCb) | |
carrierCrA, carrierCrD = pywt.dwt2(carrierCr, 'haar') | |
carrierCbA, carrierCbD = pywt.dwt2(carrierCb, 'haar') | |
imagesCrA = splitImage(carrierCrA, (self.block_shape, self.block_shape)) | |
imagesCbA = splitImage(carrierCbA, (self.block_shape, self.block_shape)) | |
imagesCrAdcted = self.undctQimSvdMuShuffle(imagesCrA) | |
imagesCbAdcted = self.undctQimSvdMuShuffle(imagesCbA) | |
imageBytes = self.binaryPack(np.array(imagesCrAdcted + imagesCbAdcted))[:self.rsc_size] | |
imageRSCoded = RSCodec(self.rsc_factor) | |
imageBytes = bytes(imageRSCoded.decode(imageBytes)[0]) | |
image = self.binaryUnpack(imageBytes).reshape((240, 240)) | |
for x in range(19): | |
image = arnoldShuffle(image, self.arnold_factor) | |
return image | |
if __name__ == '__main__': | |
# carrier = cv2.imread('test_images/lena.png') | |
# watermark = cv2.imread('test_images/flag.png', cv2.IMREAD_GRAYSCALE) | |
# wm = Watermark(carrier) | |
# embedded = wm.stego(watermark) | |
# cv2.imwrite('embedded.png', embedded) | |
carried = cv2.imread(r"G:\das XiangyunCup 2022 Quals\lena\embedded.png") | |
wm = Watermark(carried) | |
data = wm.unstego() | |
image = Image.fromarray(np.uint8(data * 255)) | |
image.save(r"G:\das XiangyunCup 2022 Quals\lena\extract.png") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment