@YellowRoseCx
Last active October 30, 2023 17:40
Real-Time Steganographic Communication System for Web Browsing using Audio Network Steganography
Let's say you have two laptops connected to the same Wi-Fi network, both running the Python scripts provided below.
One laptop (Laptop A) has a web browser open on a page hosted by a remote server (not controlled by either laptop),
while the other laptop (Laptop B) runs the server script.
To begin the steganographic communication, someone uses Laptop A to visit a particular webpage containing sensitive information
that needs to be transmitted securely to Laptop B. Meanwhile, the person operating Laptop B starts the server
script so it listens for incoming audio streams from Laptop A. When Laptop A accesses the webpage, the
script intercepts the request and prepares an audio stream containing an encoded version of the requested HTML content. It
then sends this audio to Laptop B using the network communication module. Upon receipt, Laptop B
decodes the hidden message from the audio using the steganography module and finally displays the original HTML content in its web browser.
Throughout this entire process, neither laptop exchanges plaintext data over the network, which makes it much
harder for anyone eavesdropping on the traffic to recognize or tamper with the sensitive information being transmitted.
---
Please note that these implementations assume the audio files contain a single (mono) channel with 16-bit samples.
If your use case involves stereo audio or higher bit depths, you may need to adapt these functions accordingly; a sketch of one way to do that follows.
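For instance, a small helper along these lines could down-mix and rescale the input before handing it to the rest of the pipeline (the `load_as_mono_16bit` name is illustrative, not part of the gist's modules):

import numpy as np
import scipy.io.wavfile as wavfile

def load_as_mono_16bit(filename):
    # Read any WAV scipy can parse and normalize it to mono, 16-bit samples.
    rate, data = wavfile.read(filename)
    if data.ndim > 1:
        data = data.mean(axis=1)  # average the channels down to mono
    if data.dtype != np.int16:
        data = np.asarray(data, dtype=np.float64)
        peak = np.max(np.abs(data)) or 1.0
        data = (data / peak * 32767).astype(np.int16)  # rescale into the 16-bit range
    return data, rate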
Also, make sure to install the required third-party libraries (`requests`, `scipy`, `numpy`, and `librosa`; `socket` ships with the Python standard library) before running the scripts.
Note that this code has not been optimized for efficiency yet.
You can run the scripts using the following commands:
On the client machine:
``python client.py https://example.com``
On the server machine:
``python server.py``
This will start an HTTP server (port 3001 in the sample code) that listens for incoming requests and relays them toward the other machine.
The loopback addresses and ports hard-coded in the client's `config` and in `run_server` should be changed to the peer's LAN address (for example `192.168.1.101`) when testing across different networks.
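For example, a two-laptop test on the same LAN might point the client at Laptop B's address and have the server listen on all interfaces (a sketch; the address shown is only the example used above):

# client.py — point the raw-socket config at Laptop B instead of loopback
config = {"ip": "192.168.1.101", "port": 3000}

# server.py — listen on every interface so Laptop A can reach the HTTP endpoint
HOST = "0.0.0.0"
PORT = 3001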
After implementing these methods, an HTTP request sent from one laptop to the server controlled by the other laptop is forwarded to the intended recipient. The receiving laptop plays the received audio, decodes the hidden message using the steganography algorithm, and displays the original data in its web browser.
Throughout this process, no plaintext data is transferred over the network.
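As a point of reference for the core idea of hiding bytes inside audio samples, here is a minimal, self-contained sketch (not part of the gist's modules) that uses least-significant-bit embedding in 16-bit samples and round-trips exactly:

import numpy as np

def lsb_embed(samples, payload):
    # Write one payload bit into the least-significant bit of each sample.
    bits = np.unpackbits(np.frombuffer(payload, dtype=np.uint8))
    if len(bits) > len(samples):
        raise ValueError("carrier is too short for this payload")
    out = samples.copy()
    out[:len(bits)] = (out[:len(bits)] & ~1) | bits
    return out

def lsb_extract(samples, num_bytes):
    # Read the low bit of each sample back and repack the bits into bytes.
    bits = (samples[:num_bytes * 8] & 1).astype(np.uint8)
    return np.packbits(bits).tobytes()

carrier = np.random.default_rng(0).integers(-2000, 2000, 4096).astype(np.int16)
secret = b"hello, laptop B"
stego = lsb_embed(carrier, secret)
assert lsb_extract(stego, len(secret)) == secret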
# audio_processing.py
import scipy.io.wavfile as wavfile

class AudioProcessor:
    """Thin wrapper around scipy's WAV reader/writer used by the other modules."""

    @staticmethod
    def load_audio(filename):
        # scipy returns (rate, data); the rest of the code expects (data, rate)
        rate, data = wavfile.read(filename)
        return data, rate

    @staticmethod
    def save_audio(data, filename, rate=44100):
        # scipy expects (filename, rate, data); default to 44.1 kHz when no rate is given
        wavfile.write(filename, rate, data)
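A quick sanity check of the wrapper above, assuming it is importable as `audio_processing` and writing to a throwaway `tone.wav` path:

import numpy as np
from audio_processing import AudioProcessor

# write one second of a 440 Hz tone, then read it back
sr = 44100
t = np.linspace(0, 1, sr, endpoint=False)
tone = (0.3 * np.sin(2 * np.pi * 440 * t) * 32767).astype(np.int16)
AudioProcessor.save_audio(tone, "tone.wav", rate=sr)
data, rate = AudioProcessor.load_audio("tone.wav")
assert rate == sr and len(data) == len(tone)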
# client.py
import sys

import requests

from audio_processing import AudioProcessor
from network import NetworkCommunicator
from steganography import Steganographer

config = {"ip": "127.0.0.1", "port": 3000}

class Client(object):
    def __init__(self, ip=config["ip"], port=config["port"]):
        self.ip = ip
        self.port = port
        self.ap = AudioProcessor()
        self.nc = NetworkCommunicator()
        self.sg = Steganographer()

    def get_config(self):
        if config["ip"] == "" or config["port"] == 0:
            raise ValueError("IP address and port must be provided.")
        return config

    def send_request(self, url):
        # Fetch the page, hide its content inside an audio buffer, and ship the audio to the server.
        req = requests.get(url)
        audio = self.sg.encode_data_into_audio(req.headers, req.content)
        cfg = self.get_config()
        self.send_audio_to_server(audio, cfg["ip"], cfg["port"])

    def receive_response(self, server_address):
        # Pull an audio buffer back from the server and recover the hidden payload.
        audio = self.receive_audio_from_server(server_address)
        return self.sg.decode_data_from_audio(audio)

    def send_audio_to_server(self, audio, host=None, port=None):
        host = host or self.ip
        port = port or self.port
        cfg = self.get_config()
        if not (host == cfg["ip"] and port == cfg["port"]):
            raise ValueError("Invalid host or port specified.")
        self.nc.send_data_to_remote_host(audio, host, port)

    def receive_audio_from_server(self, server_address):
        host, port = server_address
        cfg = self.get_config()
        if not (host == cfg["ip"] and port == cfg["port"]):
            raise ValueError("Invalid host or port specified.")
        return self.nc.receive_data_from_remote_host(host, port)

if __name__ == "__main__":
    c = Client()
    url = sys.argv[1] if len(sys.argv) > 1 else input("Enter URL: ")
    print("\nSending request...")
    c.send_request(url)
    resp = c.receive_response((config["ip"], config["port"]))
    print(f"\nDecoded response: {resp}")
# network.py
import socket

class NetworkCommunicator:
    def send_data_to_remote_host(self, data, host, port):
        # Open a TCP connection to the peer, push the raw bytes, and close.
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect((host, port))
        sock.sendall(data)
        sock.close()

    def receive_data_from_remote_host(self, host, port):
        # Listen for a single connection and read up to 32 KiB from it.
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((host, port))
        sock.listen(1)
        conn, addr = sock.accept()
        data = conn.recv(2 ** 15)
        conn.close()
        sock.close()
        return data
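`receive_data_from_remote_host` reads at most 32 KiB in a single `recv` call, so a larger audio payload would be truncated. One way around that is a simple length-prefixed framing scheme; a rough sketch (the helper names here are illustrative, not part of the gist):

import socket
import struct

def send_framed(data, host, port):
    # Prefix the payload with its length as a 4-byte big-endian integer, then send everything.
    with socket.create_connection((host, port)) as sock:
        sock.sendall(struct.pack("!I", len(data)) + data)

def recv_exact(conn, n):
    # Keep calling recv until exactly n bytes have arrived (or the peer hangs up).
    buf = b""
    while len(buf) < n:
        chunk = conn.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("peer closed before the full payload arrived")
        buf += chunk
    return buf

def recv_framed(conn):
    # Read the 4-byte length header first, then the payload it announces.
    (length,) = struct.unpack("!I", recv_exact(conn, 4))
    return recv_exact(conn, length)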
# server.py
from http.server import BaseHTTPRequestHandler, HTTPServer

from network import NetworkCommunicator

class Server(BaseHTTPRequestHandler):
    def do_GET(self):
        pass

    def do_POST(self):
        # Read the posted body and relay it to the caller's address over a raw socket.
        content_length = int(self.headers["Content-Length"])
        body = self.rfile.read(content_length).decode("utf-8")
        addr = self.client_address
        nc = NetworkCommunicator()
        nc.send_data_to_remote_host(body.encode(), addr[0], addr[1])
        # Wait until the remote side answers with an audio payload, then return it over HTTP.
        while True:
            remote_data = nc.receive_data_from_remote_host(addr[0], addr[1])
            if len(remote_data) > 0:
                self.send_audio_response(remote_data)
                break

    def send_audio_response(self, data):
        # Reply with the audio bytes; this must not be named send_response, which is
        # already defined on BaseHTTPRequestHandler and is used for the status line below.
        self.send_response(200)
        self.send_header("Content-Type", "audio/x-wave")
        self.send_header("Content-Length", str(len(data)))
        self.end_headers()
        self.wfile.write(data)

def run_server():
    HOST = "127.0.0.1"
    PORT = 3001
    server_address = (HOST, PORT)
    print(f"Serving on {HOST}:{PORT}")
    httpd = HTTPServer(server_address, Server)
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        pass

if __name__ == "__main__":
    run_server()
# steganography.py
import librosa
import numpy as np
from scipy.signal import stft

from audio_processing import AudioProcessor

def get_melspectrogram(y, sr=22050, n_fft=1024, hop_length=512, power=True):
    # librosa expects floating-point audio
    y = np.asarray(y, dtype=float)
    # Periodogram of the whole signal plus a mel-scaled spectrogram from librosa.
    S = np.abs(np.fft.rfft(y, n=n_fft)) ** 2
    mel = librosa.feature.melspectrogram(y=y, sr=sr, n_fft=n_fft, hop_length=hop_length)
    if power:
        # Log-compress the periodogram, scaled by the peak value of the mel spectrogram.
        return np.log(1 + S / np.max(mel))
    else:
        return mel

def embed_data(spectrum, data, threshold=-np.inf):
    # Keep only the bins whose corresponding data values fall below the threshold,
    # then quantize the result to integer levels in [0, 255].
    masked_spectrum = spectrum * (data < threshold)
    embedded_spectrum = np.clip(masked_spectrum + 1e-7, 0, 255)
    return embedded_spectrum.round().astype(int)

def extract_data(spectrum):
    # Zero every bin at or above the threshold and sum what remains.
    threshold = -spectrum.min() // 2
    extracted_data = spectrum.copy()
    extracted_data[extracted_data >= threshold] = 0
    return extracted_data.sum()

class Steganographer(object):
    @staticmethod
    def decode_data_from_audio(audio_bytes):
        # Interpret the raw bytes as mono 16-bit samples and compute a magnitude spectrogram.
        samples = np.frombuffer(audio_bytes, dtype=np.int16).astype(float)
        fft_size = 1024
        stft_window = np.hamming(fft_size)
        hop_length = fft_size // 2
        _, _, Zxx = stft(samples, window=stft_window, nperseg=fft_size,
                         noverlap=fft_size - hop_length)
        spectrogram = np.abs(Zxx)
        # Same thresholding scheme as extract_data: drop high-energy bins, sum the rest.
        threshold = -spectrogram.min() // 2
        extracted_data = spectrogram.copy()
        extracted_data[extracted_data >= threshold] = 0
        return extracted_data.sum()

    @staticmethod
    def encode_data_into_audio(headers, data):
        # Load the carrier audio named by the Content-Location header and add the payload
        # onto it, 16 bits per frame.
        carrier, sample_rate = AudioProcessor.load_audio(headers["content-location"])
        if len(data) % 2:
            data += b"\x00"  # pad to an even length so the bytes map onto 16-bit chunks
        payload = np.frombuffer(data, dtype=np.uint16)
        num_frames = len(payload)
        new_audio_data = np.zeros(num_frames, dtype=np.int32)
        for i in range(num_frames):
            # Add the i-th 16-bit chunk onto the i-th carrier sample (wrap if the carrier is short).
            new_audio_data[i] = int(carrier.flat[i % carrier.size]) + int(payload[i])
        new_audio_data = np.clip(new_audio_data, -2 ** 15, 2 ** 15 - 1).astype(np.int16)
        AudioProcessor.save_audio(new_audio_data, headers["content-location"], rate=sample_rate)
        return new_audio_data.tobytes()
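A small check that the spectrogram helper runs on a synthetic tone (a sketch, assuming librosa is installed):

import numpy as np
from steganography import get_melspectrogram

sr = 22050
t = np.linspace(0, 1, sr, endpoint=False)
tone = np.sin(2 * np.pi * 440 * t)
mel = get_melspectrogram(tone, sr=sr, power=False)
print(mel.shape)  # (n_mels, frames) from librosa's default mel filter bank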