Skip to content

Instantly share code, notes, and snippets.

@LemonPrefect
Last active December 6, 2022 07:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save LemonPrefect/0275929fe8bb61dd21f298a45aaf2bbc to your computer and use it in GitHub Desktop.
Save LemonPrefect/0275929fe8bb61dd21f298a45aaf2bbc to your computer and use it in GitHub Desktop.
Solution for lena from Xiangyun Cup Quals, 2022
import cv2
import pywt
import numpy as np
from PIL import Image
from numpy import uint8, ndarray
from reedsolo import RSCodec
def arnoldShuffle(imageData, factor):
shuffleTimes, a, b = factor
image = np.zeros(imageData.shape)
height, width = imageData.shape[:2]
for time in range(shuffleTimes):
for x in range(height):
for y in range(width):
_y = (y + a * x) % width
_x = (b * y + (a * b + 1) * x) % height
image[_x, _y] = imageData[x, y]
imageData = image.copy()
return image
def splitImage(imageData, size):
height, width = imageData.shape[:2]
heightSize, widthSize = size
shapes = (height // heightSize, width // widthSize, heightSize, widthSize)
strides = imageData.itemsize * np.array([width * heightSize, widthSize, width, 1])
imagePieces = np.lib.stride_tricks.as_strided(imageData, shapes, strides).astype('float64')
imagePieces = np.reshape(imagePieces, (shapes[0] * shapes[1], heightSize, widthSize))
return imagePieces
def mergeImage(imageData, shape):
height, width = shape[:2]
heightSize, widthSize = imageData.shape[-2:]
shapes = (height // heightSize, width // widthSize, heightSize, widthSize)
imageData = np.reshape(imageData, shapes)
imagePieces = []
for imagePiece in imageData:
imagePieces.append(np.concatenate(imagePiece, axis=1))
image = np.concatenate(imagePieces, axis=0)
return image
def binarization(pixel: ndarray) -> uint8:
return ((pixel > 128) * 255).astype('uint8')
def muLawEncode(x, b, mu):
return np.log(1 + mu * (np.abs(x) / b)) / np.log(1 + mu)
def muLawDecode(y, b, mu):
return (b / mu) * (np.power(1 + mu, np.abs(y)) - 1)
def qim(x, m, delta: int):
return (np.round(x * 1000 / delta) * delta + (-1) ** (m + 1) * delta / 4.) / 1000
def deQim(x, delta: int):
return delta / 2 - x * 1000 % delta
class Watermark:
def __init__(self, carrier):
self.block_shape = 4
self.arnold_factor = (6, 20, 22)
self.rsc_factor = 100
self.mu_law_mu = 100
self.mu_law_X_max = 8000
self.delta = 15
self.carrier = carrier.astype('float32')
height, width = self.carrier.shape[:2]
self.carrier_cA_height = height // 2
self.carrier_cA_width = width // 2
self.watermark_height = self.carrier_cA_height // self.block_shape
self.watermark_width = self.carrier_cA_width // self.block_shape
self.max_bits_size = self.watermark_height * self.watermark_width
self.max_bytes_size = self.max_bits_size // 8
self.rsc_size = len(RSCodec(self.rsc_factor).encode(b'\x00' * self.max_bytes_size))
def fixBinary(self, source):
binaries = (source % 2).flatten()
if len(binaries) < self.max_bits_size:
binaries = np.hstack((binaries, np.zeros(self.max_bits_size - len(binaries)))).astype('uint8')
return binaries
def binaryPack(self, data) -> bytes:
return np.packbits(data).tobytes()
def binaryUnpack(self, binary: bytes) -> ndarray:
unpackedBinary = np.unpackbits(np.frombuffer(binary, dtype='uint8'))
if len(unpackedBinary) < self.max_bits_size:
unpackedBinary = np.hstack((unpackedBinary, np.zeros(self.max_bits_size - len(unpackedBinary)))).astype('uint8')
return unpackedBinary
def dctQimSvdMuShuffle(self, carriers, watermark):
carried = carriers.copy()
for index, carrier in enumerate(carriers):
watermarkPiece = watermark[index]
carrierdcted = cv2.dct(carrier)
u, s, v = np.linalg.svd(carrierdcted)
x = np.max(s)
imagePiece = muLawEncode(x, self.mu_law_X_max, self.mu_law_mu)
qimedImagePiece = qim(imagePiece, watermarkPiece, self.delta)
imagePiece = muLawDecode(qimedImagePiece, self.mu_law_X_max, self.mu_law_mu)
for shape in range(self.block_shape):
if s[shape] == x:
s[shape] = imagePiece
carried[index] = cv2.idct(np.dot(np.dot(u, np.diag(s)), v))
return carried
def undctQimSvdMuShuffle(self, image):
carried = image.copy()
result = []
for index, carrier in enumerate(carried):
carrierdcted = cv2.dct(carrier)
u, s, v = np.linalg.svd(carrierdcted)
x = np.max(s)
imagePiece = muLawEncode(x, self.mu_law_X_max, self.mu_law_mu)
data = deQim(imagePiece, self.delta)
result.append(1 & (data > 0))
return result
def stego(self, imageData):
imageAnorlded = arnoldShuffle(imageData, self.arnold_factor)
imageBinarized = binarization(imageAnorlded)
imageBinarizedFixed = self.fixBinary(imageBinarized)
imagePacked = self.binaryPack(imageBinarizedFixed)
imageRSCoded = RSCodec(self.rsc_factor)
imageBytes = bytes(imageRSCoded.encode(imagePacked))
imageLastBytes = self.binaryUnpack(imageBytes[:self.max_bytes_size])
imageFirstBytes = self.binaryUnpack(imageBytes[self.max_bytes_size:])
carrierChangedSpace = cv2.cvtColor(self.carrier, cv2.COLOR_BGR2YCrCb)
carrierY, carrierCr, carrierCb = cv2.split(carrierChangedSpace)
carrierCrA, carrierCrD = pywt.dwt2(carrierCr, 'haar')
carrierCbA, carrierCbD = pywt.dwt2(carrierCb, 'haar')
imagesCrA = splitImage(carrierCrA, (self.block_shape, self.block_shape))
imagesCbA = splitImage(carrierCbA, (self.block_shape, self.block_shape))
imagesCrAdcted = self.dctQimSvdMuShuffle(imagesCrA, imageLastBytes)
carrierCrAdcted = mergeImage(imagesCrAdcted, (self.carrier_cA_height, self.carrier_cA_width))
imagesCbAdcted = self.dctQimSvdMuShuffle(imagesCbA, imageFirstBytes)
carrierCbAdcted = mergeImage(imagesCbAdcted, (self.carrier_cA_height, self.carrier_cA_width))
carrierCr = pywt.idwt2((carrierCrAdcted, carrierCrD), 'haar')
carrierCb = pywt.idwt2((carrierCbAdcted, carrierCbD), 'haar')
carrier = cv2.merge([carrierY, carrierCr.astype('float32'), carrierCb.astype('float32')])
carrierRGB = cv2.cvtColor(carrier, cv2.COLOR_YCrCb2BGR)
return carrierRGB
def unstego(self):
carrier = self.carrier
carrierYCrCb = cv2.cvtColor(carrier, cv2.COLOR_BGR2YCrCb)
carrierY, carrierCr, carrierCb = cv2.split(carrierYCrCb)
carrierCrA, carrierCrD = pywt.dwt2(carrierCr, 'haar')
carrierCbA, carrierCbD = pywt.dwt2(carrierCb, 'haar')
imagesCrA = splitImage(carrierCrA, (self.block_shape, self.block_shape))
imagesCbA = splitImage(carrierCbA, (self.block_shape, self.block_shape))
imagesCrAdcted = self.undctQimSvdMuShuffle(imagesCrA)
imagesCbAdcted = self.undctQimSvdMuShuffle(imagesCbA)
imageBytes = self.binaryPack(np.array(imagesCrAdcted + imagesCbAdcted))[:self.rsc_size]
imageRSCoded = RSCodec(self.rsc_factor)
imageBytes = bytes(imageRSCoded.decode(imageBytes)[0])
image = self.binaryUnpack(imageBytes).reshape((240, 240))
for x in range(19):
image = arnoldShuffle(image, self.arnold_factor)
return image
if __name__ == '__main__':
# carrier = cv2.imread('test_images/lena.png')
# watermark = cv2.imread('test_images/flag.png', cv2.IMREAD_GRAYSCALE)
# wm = Watermark(carrier)
# embedded = wm.stego(watermark)
# cv2.imwrite('embedded.png', embedded)
carried = cv2.imread(r"G:\das XiangyunCup 2022 Quals\lena\embedded.png")
wm = Watermark(carried)
data = wm.unstego()
image = Image.fromarray(np.uint8(data * 255))
image.save(r"G:\das XiangyunCup 2022 Quals\lena\extract.png")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment