Skip to content

Instantly share code, notes, and snippets.

@davidhoness
Created May 13, 2019 13:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save davidhoness/b16a666403c46bfbe159fa2ec907df1d to your computer and use it in GitHub Desktop.
Save davidhoness/b16a666403c46bfbe159fa2ec907df1d to your computer and use it in GitHub Desktop.
from __future__ import division
from pysstv.sstv import SSTV
from pysstv.color import PD120
from itertools import islice
from PIL import Image
from shapely.geometry import Polygon, Point
import struct
import serial
import pyaudio
import glob
import time
import qrcode
import uuid
import datetime
import ephem
import math
import os
import sys
# Index positions of the width and height entries in a PIL ``size`` tuple,
# e.g. ``img.size[W]`` / ``img.size[H]``.
W, H = 0, 1
class py_audio_sstv(object):
    """Play the audio samples produced by an SSTV encoder through PyAudio.

    Samples are pulled lazily from ``sstv.gen_samples()`` by a PyAudio stream
    callback, so the full transmission is never held in memory at once.
    """

    def __init__(self, sstv):
        """Open a PyAudio session for the given encoder.

        :param sstv: encoder object exposing ``bits``, ``samples_per_sec``
            and ``gen_samples()``.
        """
        self.pa = pyaudio.PyAudio()
        self.sstv = sstv
        # Little-endian struct format matching the encoder's sample width.
        self.fmt = '<' + SSTV.BITS_TO_STRUCT[sstv.bits]

    def __del__(self):
        # __init__ can fail before ``self.pa`` is assigned (e.g. PyAudio()
        # itself raising); don't turn that into an AttributeError here.
        pa = getattr(self, "pa", None)
        if pa is not None:
            pa.terminate()

    def execute(self, keep_playing=None):
        """Play the encoder's samples until they run out or playback is vetoed.

        :param keep_playing: optional zero-argument callable polled roughly
            every half second; playback stops as soon as it returns a falsy
            value. ``None`` means play to the end.

        The stream is always stopped and closed, even if ``keep_playing`` (or
        anything else in the polling loop) raises.
        """
        self.sampler = self.sstv.gen_samples()
        stream = self.pa.open(
            format=self.pa.get_format_from_width(self.sstv.bits // 8),
            channels=1,
            rate=self.sstv.samples_per_sec,
            output=True,
            stream_callback=self.callback)
        try:
            stream.start_stream()
            while stream.is_active():
                if keep_playing is not None and not keep_playing():
                    break
                time.sleep(0.5)
        finally:
            # Release the audio device no matter how the loop ended.
            try:
                stream.stop_stream()
            finally:
                stream.close()

    def callback(self, in_data, frame_count, time_info, status):
        """PyAudio stream callback: pack the next ``frame_count`` samples.

        When the sampler runs dry fewer frames than requested are returned,
        which PyAudio treats as the end of the stream.
        """
        chunk = islice(self.sampler, frame_count)
        frames = b"".join(struct.pack(self.fmt, sample) for sample in chunk)
        return frames, pyaudio.paContinue
class sstv_event(object):
    """Repeatedly transmit a set of images as SSTV audio.

    Each image is resized to the SSTV mode's frame size, stamped with a column
    of QR codes carrying a per-transmission ID, saved for reference, and then
    played as audio. Transmission is held back while an off-time since the
    previous transmission has not elapsed, or while the satellite described by
    the TLE file is over one of the keep out zones.
    """

    def __init__(self, sstv_mode, off_time_seconds, in_image_path_glob,
                 out_image_path, tle_file, log_file, keep_out_zones=None,
                 shutdown_time=None, begin_transmission_cb=None,
                 end_transmission_cb=None):
        """Validate the configuration and load the orbital elements.

        :param sstv_mode: pysstv mode class (provides ``WIDTH``/``HEIGHT`` and
            is called as ``sstv_mode(image, 48000, 16)``).
        :param off_time_seconds: minimum pause between two transmissions.
        :param in_image_path_glob: glob pattern selecting the source images.
        :param out_image_path: directory for the transmitted reference JPEGs;
            created if missing.
        :param tle_file: text file whose first three lines are a TLE set
            (name, line 1, line 2).
        :param log_file: CSV log path; truncated at the start of ``execute``.
        :param keep_out_zones: shapely ``Polygon`` objects with vertices given
            as (latitude, longitude) in degrees; ``None`` means no zones.
        :param shutdown_time: naive UTC ``datetime`` after which the event
            stops, or ``None`` to run forever.
        :param begin_transmission_cb: called (no arguments) before playback.
        :param end_transmission_cb: called (no arguments) after playback.

        Exits the process with status 1 if no usable image is found or a keep
        out zone is invalid.
        """
        self.sstv_mode = sstv_mode
        self.off_time_seconds = off_time_seconds
        self.in_image_path_glob = in_image_path_glob
        self.out_image_path = out_image_path
        self.tle_file = tle_file
        self.log_file = log_file
        # A fresh list per instance; a ``[]`` default would be shared.
        self.keep_out_zones = [] if keep_out_zones is None else keep_out_zones
        self.shutdown_time = shutdown_time
        self.begin_transmission_cb = begin_transmission_cb
        self.end_transmission_cb = end_transmission_cb
        # Pretend the last transmission ended one full off-time ago so the
        # first image may go out immediately.
        self.last_txing_time_utc = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.off_time_seconds)
        self.validate_images()
        self.validate_keep_out_zones()
        self.validate_path(self.out_image_path)
        self.load_tle()

    def validate_images(self):
        """Collect the files matching the glob that PIL can open.

        Unreadable files are reported and skipped. Exits with status 1 when
        nothing usable remains.
        """
        self.images = []
        for image_file in glob.glob(self.in_image_path_glob):
            try:
                # Only probing that PIL recognises the file; close it again so
                # the handle is not leaked.
                Image.open(image_file).close()
            except Exception as e:
                print(e)
            else:
                self.images.append(image_file)
        if not self.images:
            print("no images found")
            sys.exit(1)

    def validate_keep_out_zones(self):
        """Exit with status 1 if any keep out zone is not a valid Polygon."""
        for zone in self.keep_out_zones:
            if not (isinstance(zone, Polygon) and zone.is_valid):
                print("invalid polygon in keep out zones")
                print(zone)
                sys.exit(1)

    def validate_path(self, p):
        """Make sure directory ``p`` exists, creating it if necessary."""
        try:
            os.makedirs(p)
        except OSError:
            # Already existing is fine (also covers a concurrent creation);
            # anything else, including ``p`` being a file, is a real error.
            if not os.path.isdir(p):
                raise

    def load_tle(self):
        """Read the first three lines of the TLE file into an ephem body."""
        with open(self.tle_file) as f:
            lines = f.readlines()
        self.tle = ephem.readtle(lines[0], lines[1], lines[2])
        self.tle.compute()

    @property
    def running(self):
        """True until ``shutdown_time`` (UTC) has passed; always True if unset."""
        if self.shutdown_time is None:
            return True
        return datetime.datetime.utcnow() < self.shutdown_time

    def load_size_image(self, image_file):
        """Open ``image_file`` as an RGB image of exactly the SSTV frame size."""
        img = Image.open(image_file)
        # The image is later saved as JPEG and has an RGB box pasted onto it,
        # so normalise palette/alpha/greyscale inputs up front.
        if img.mode != "RGB":
            img = img.convert("RGB")
        target = (self.sstv_mode.WIDTH, self.sstv_mode.HEIGHT)
        # Compare the dimensions themselves: equal pixel counts do not imply
        # an equal shape (e.g. 496x640 versus 640x496).
        if tuple(img.size) != target:
            # LANCZOS is the same filter formerly exposed as ANTIALIAS, a name
            # that newer Pillow releases no longer provide.
            img = img.resize(target, Image.LANCZOS)
        return img

    def create_qr_code(self, data):
        """Return a QR code image encoding ``data``."""
        qr = qrcode.QRCode(
            version=2,
            error_correction=qrcode.constants.ERROR_CORRECT_H,  # max redundancy
            box_size=4,  # min for clear contrast in decoded pictures
            border=2)
        qr.add_data(data)
        qr.make(fit=True)
        return qr.make_image()

    def overlay_qr_codes(self, main_img, uid):
        """Stamp a column of QR codes for ``uid`` down the right edge.

        ``main_img`` is modified in place: a white strip as wide as one QR
        code is painted over the right edge, then as many codes as fit are
        pasted into it, evenly spaced.
        """
        qr_img = self.create_qr_code(uid)
        qr_width, qr_height = qr_img.size[W], qr_img.size[H]
        img_width, img_height = main_img.size[W], main_img.size[H]
        qr_count = img_height // qr_height
        spacer = (img_height - (qr_height * qr_count)) // qr_count
        left = img_width - qr_width
        qr_box = Image.new("RGB", (qr_width, img_height), (255, 255, 255))
        main_img.paste(qr_box, (left, 0))
        for i in range(qr_count):
            top = i * (qr_height + spacer) + spacer // 2
            main_img.paste(qr_img, (left, top))
            qr_img = qr_img.transpose(Image.ROTATE_90)  # rotate to mitigate interference

    def clear_to_transmit(self):
        """Return True if transmitting is currently allowed.

        All of the following must hold: the event is still running, the off
        time since the previous transmission has elapsed, and the sub-satellite
        point is outside every keep out zone.
        """
        if not self.running:
            return False
        # off time check
        earliest = self.last_txing_time_utc + datetime.timedelta(seconds=self.off_time_seconds)
        if datetime.datetime.utcnow() < earliest:
            return False
        # keep out zone check
        if self.keep_out_zones:
            self.tle.compute()
            # (lat, lon) in degrees, the same axis order as the zone vertices.
            p = Point(math.degrees(self.tle.sublat), math.degrees(self.tle.sublong))
            if any(zone.contains(p) for zone in self.keep_out_zones):
                return False
        return True

    def get_timestamp_location(self):
        """Return ``(utc_now, latitude, longitude)`` of the sub-satellite point.

        Latitude and longitude are in degrees, rounded to 4 decimals.
        """
        t = datetime.datetime.utcnow()
        self.tle.compute(t)
        lat = round(math.degrees(self.tle.sublat), 4)
        lon = round(math.degrees(self.tle.sublong), 4)
        return t, lat, lon

    def run_call_back(self, cb):
        """Call ``cb`` with no arguments unless it is None."""
        if cb is not None:
            cb()

    def _transmit_image(self, image_file):
        """Prepare, transmit and log a single image."""
        # create unique ID for the image
        uid = str(uuid.uuid4())[:8]
        # load and resize the image if needed
        main_img = self.load_size_image(image_file)
        # repeat QR code down right side of image
        self.overlay_qr_codes(main_img, uid)
        # save reference image to disk
        tx_file = os.path.join(self.out_image_path, "%s.jpg" % uid)
        main_img.save(tx_file, "JPEG")
        # convert image to sstv data (48000 samples/s, 16 bit samples)
        sstv = self.sstv_mode(main_img, 48000, 16)
        sstv.vox_enabled = True
        # capture start time and location
        start_time, start_lat, start_long = self.get_timestamp_location()
        try:
            # turn on radio
            self.run_call_back(self.begin_transmission_cb)
            # play sstv audio and stop if clear to transmit becomes False
            py_audio_sstv(sstv).execute(self.clear_to_transmit)
        finally:
            # turn off radio, even if keying up or playback failed, so the
            # transmitter is never left on by an error
            self.run_call_back(self.end_transmission_cb)
        # capture stop time and location
        stop_time, stop_lat, stop_long = self.get_timestamp_location()
        # write to log file; the last field records whether transmitting was
        # still permitted when playback ended
        fields = (uid, str(start_time), str(start_lat), str(start_long), str(stop_time), str(stop_lat), str(stop_long), tx_file, str(self.clear_to_transmit()))
        self.log.write(",".join(fields))
        self.log.write("\n")
        self.log.flush()
        # enforce off time by updating last_txing_time_utc
        self.last_txing_time_utc = stop_time

    def execute(self):
        """Cycle through the images, transmitting each, until shutdown.

        The log file is (re)created at the start and always closed on exit,
        including when a transmission raises.
        """
        self.log = open(self.log_file, "w")
        try:
            while self.running:
                for image_file in self.images:
                    while not self.clear_to_transmit() and self.running:
                        print("waiting for clear to transmit")
                        time.sleep(1)
                    if not self.running:
                        break
                    self._transmit_image(image_file)
        finally:
            self.log.close()
# Very rough outline used as a keep out zone. Every vertex is a
# (latitude, longitude) pair in degrees, the same axis order as the Point
# that sstv_event.clear_to_transmit() tests against each zone.
_china_outline = [
    (48.74, 87.17), (39.09, 74.05), (33.25, 79.13), (28.24, 86.44),
    (29.58, 95.74), (26.33, 98.82), (24.29, 97.89), (21.85, 100.82),
    (23.57, 105.32), (21.65, 108.14), (23.04, 116.23), (27.10, 120.31),
    (30.63, 122.08), (39.81, 124.09), (46.87, 133.90), (53.37, 121.73),
    (46.57, 119.57), (41.64, 105.26), (42.75, 96.28), (45.30, 90.76),
]
china = Polygon(_china_outline)
def start_radio(port="/dev/ttyAMA0", baudrate=115200, frequency=145800000):
    """Put the radio into FM mode on ``frequency`` and key the transmitter.

    Called with no arguments it behaves exactly as before; the parameters
    allow reuse with a different serial device or channel (for example via
    ``functools.partial`` when passing it as a callback).

    :param port: serial device the radio is attached to.
    :param baudrate: serial line speed.
    :param frequency: integer sent verbatim with ``set_freq``; the default
        145800000 is presumably in Hz (145.800 MHz) -- confirm against the
        radio's command set.
    """
    print("start radio")
    commands = (
        b"set_mode FM\r\n",
        ("set_freq %d\r\n" % frequency).encode("ascii"),
        b"set_ptt TX\r\n",  # keyed last, after mode and frequency are set
    )
    with serial.Serial(port=port, baudrate=baudrate) as ser:
        for command in commands:
            ser.write(command)
        # Push everything out before the port is closed.
        ser.flush()
def stop_radio(port="/dev/ttyAMA0", baudrate=115200):
    """Switch the radio back to receive by sending ``set_ptt RX``.

    Called with no arguments it behaves exactly as before; the parameters
    allow reuse with a different serial device.

    :param port: serial device the radio is attached to.
    :param baudrate: serial line speed.
    """
    print("stop radio")
    with serial.Serial(port=port, baudrate=baudrate) as ser:
        ser.write(b"set_ptt RX\r\n")
        # Push the command out before the port is closed.
        ser.flush()
# Build the event and start transmitting straight away.
ev = sstv_event(
    PD120,                                # sstv_mode
    120,                                  # off_time_seconds
    "/usr/share/rpd-wallpaper/*.jpg",     # in_image_path_glob
    "/home/pi/qsstv/tx_sstv",             # out_image_path
    "iss.tle",                            # tle_file
    "log.csv",                            # log_file
    keep_out_zones=[china],
    # compared against datetime.utcnow(), so this is a UTC time
    shutdown_time=datetime.datetime(2099, 12, 31, 18, 0, 0),
    begin_transmission_cb=start_radio,
    end_transmission_cb=stop_radio,
)
ev.execute()
@davidhoness
Copy link
Author

davidhoness commented May 13, 2019

sudo apt-get install gcc-arm-linux-gnueabi python3-pyaudio python3-qrcode python3-shapely
sudo pip3 install ephem pysstv

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment