Skip to content

Instantly share code, notes, and snippets.

@elwint
Last active December 21, 2017 23:24
Show Gist options
  • Save elwint/b7ab354a6924df820a624710b06ae40a to your computer and use it in GitHub Desktop.
Save elwint/b7ab354a6924df820a624710b06ae40a to your computer and use it in GitHub Desktop.
Messenger between VM's via CPU execution time
#!/usr/bin/python3
#
# Usage: virt_messenger.py [--debug]
# Support for sending text, however chance of receiving correct characters: ~5-15%.
# Chance could maybe still be improved by sending slower and/or changing the execution time.
import os
import time
import threading
import multiprocessing as mp
import random
import psutil
import sys
def read():
bits = [0,0,0, 0,0,0,0,0,0,0, 0,0,0]
t = threading.currentThread()
letter = ''
text = ''
while getattr(t, 'do_run', True):
s = time.time()
random.sample(range(1000000000), n)
e = time.time()
if (e-s)-i < 0.0001:
if len(sys.argv) > 1 and sys.argv[1] == '--debug':
print('\r[reading: 1 ('+str((e-s)-i)+')] ', end='')
bits.pop(0)
bits.append(1)
else:
if len(sys.argv) > 1 and sys.argv[1] == '--debug':
print('\r[reading: 0 ('+str((e-s)-i)+')] ', end='')
bits.pop(0)
bits.append(0)
out = 0
if bits[0] == 1 and bits[1] == 0 and bits[2] == 1 and bits[10] == 1 and bits[11] == 0 and bits[12] == 1:
for bit in bits[3:10]:
out = (out << 1) | bit
#if chr(out).isalpha():
letter = chr(out)
text += letter
print('\r'+"Received: "+text+'\n\n> ', end='')
elif bits[0] == 1 and bits[1] == 1 and bits[2] == 1 and bits[10] == 1 and bits[11] == 0 and bits[12] == 1:
for bit in bits[3:10]:
out = (out << 1) | bit
#if chr(out).isalpha():
if chr(out) != letter:
text += chr(out)
print('\r'+"Received: "+text+'\n\n> ', end='')
elif bits.count(1) == 13:
text = ''
if e-s < 0.1:
time.sleep(0.1-(e-s))
def send(text):
for c in text:
b = bin(ord(c))
print('Sending bits: '+b[2:]+'\n')
send_bits('101'+b[2:]+'101')
send_bits('111'+b[2:]+'101')
send_bits('1111111111111')
def send_bits(bits):
for bit in bits:
if bit == '1':
procs = list()
for x in range(0, psutil.cpu_count()):
p = mp.Process(target=loop, args=([x], 0.1))
p.start()
procs.append(p)
for p in procs:
p.join()
else:
time.sleep(0.1)
def loop(affinity, seconds):
proc = psutil.Process()
proc.cpu_affinity(affinity)
t = time.time() + seconds
while time.time() < t:
pass
def get_input():
while True:
message = input('> ')
if message == 'exit':
os._exit(0)
if message != '':
return message
print('Invalid message\n')
e = 0
s = 0
n = 1
i = 0
while True:
t = threading.Thread(target=read)
t.start()
# Find variable n such that execution time is between about 0.0002 and 0.00025 seconds
while True:
if e-s < 0.0002:
n = n * 2
s = time.time()
random.sample(range(1000000000), n)
e = time.time()
elif e-s > 0.00025:
n -= int(n/1.5)
s = time.time()
random.sample(range(1000000000), n)
e = time.time()
else:
# Checking if n is valid 100 times in a row
for x in range(0,100):
s = time.time()
random.sample(range(1000000000), n)
e = time.time()
if e-s < 0.0002 or e-s > 0.00025:
break
else: # Means "no break", thus succeeded
break
continue
i = e-s
message = get_input()
t.do_run = False
t.join()
send(message)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment