Skip to content

Instantly share code, notes, and snippets.

@xssfox
Last active September 18, 2022 06:56
Show Gist options
  • Save xssfox/3fc32fca37e91478b3a25610c84511aa to your computer and use it in GitHub Desktop.
Save xssfox/3fc32fca37e91478b3a25610c84511aa to your computer and use it in GitHub Desktop.
SSTV Stream
"""
Requires an RTSP stream (I used rtsp-simple-server) and a running rigctld instance.
Requirements: pyaudio, pillow, numpy, pysstv, opencv-python.
Set output_device_index to the index of your audio sink (I use pulse).
"""
import io
import logging
import socket
import sys
import time
import traceback
import wave

import cv2
import numpy as np
import pyaudio
from PIL import Image, ImageOps, ImageDraw, ImageFont
from pysstv import color, grayscale
# SSTV mode class from pysstv: generate_sstv() instantiates it to encode an
# image, and the main loop reads its WIDTH/HEIGHT to size each frame.
mode = color.Robot36
class Rigctld:
    """Minimal TCP client for a rigctld (hamlib) daemon.

    Commands are sent as single newline-terminated lines and the reply is
    whatever arrives in one ``recv`` call.  Only what this script needs is
    implemented: a connection probe and PTT on/off.
    """

    # Note: This is a massive hack.

    def __init__(self, hostname="localhost", port=4532, poll_rate=5, timeout=5):
        """Open a connection to rigctld, and test it for validity.

        Args:
            hostname: Host the daemon listens on.
            port: TCP port the daemon listens on.
            poll_rate: Unused; kept so existing callers keep working.
            timeout: Socket timeout in seconds for connect/send/receive.

        Raises:
            OSError: The TCP connection could not be established.
            Exception: The daemon did not answer the probe before the timeout.
        """
        self.hostname = hostname
        self.port = port
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(timeout)
        self.connect()
        logging.debug("Rigctl intialized")

    def get_model(self):
        """Send the ``_`` query and return the raw reply bytes.

        Returns None when no reply arrives (see send_command).
        NOTE(review): ``_`` is presumably rigctld's "get info" command --
        confirm against the hamlib documentation.
        """
        return self.send_command(b"_")

    def connect(self):
        """Connect to the rigctld instance and probe it.

        Returns:
            The raw reply to the probe command.

        Raises:
            OSError: Connecting or sending the probe failed.
            Exception: No reply to the probe was received ("Timeout!").
        """
        try:
            self.sock.connect((self.hostname, self.port))
            model = self.get_model()
        except OSError:
            # Do not leak the socket when the daemon is unreachable.
            self.close()
            raise
        if model is None:
            # Timeout!
            self.close()
            raise Exception("Timeout!")
        return model

    def close(self):
        """Close the underlying socket."""
        self.sock.close()

    def send_command(self, command):
        """Send a command to the connected rigctld instance.

        Args:
            command: Command bytes without the trailing newline.

        Returns:
            Up to 1024 bytes of reply, or None if receiving failed
            (for example because the socket timed out).

        Raises:
            OSError: The command could not be sent.
        """
        self.sock.sendall(command + b"\n")
        try:
            return self.sock.recv(1024)
        except OSError:
            # socket.timeout is an OSError; callers treat None as "no reply".
            return None

    def ptt_enable(self):
        """Key the transmitter (send ``T 1``)."""
        logging.debug("PTT enabled")
        self.send_command(b"T 1")

    def ptt_disable(self):
        """Unkey the transmitter (send ``T 0``)."""
        logging.debug("PTT disabled")
        self.send_command(b"T 0")
def get_image(url='rtsp://streamip:8554/mystream'):
    """Grab a single frame from a video stream and return it as a PIL image.

    Args:
        url: Source passed to ``cv2.VideoCapture``; defaults to the RTSP
            stream this script was written for.

    Returns:
        A ``PIL.Image.Image`` built from the frame after converting it from
        OpenCV's BGR channel order to RGB.

    Raises:
        RuntimeError: No frame could be read from the stream.
    """
    cap = cv2.VideoCapture(url)
    try:
        ok, frame = cap.read()
    finally:
        # This function runs once per transmission; release the capture so
        # stream connections do not pile up.
        cap.release()
    if not ok or frame is None:
        raise RuntimeError(f"Could not read a frame from {url}")
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    return Image.fromarray(rgb)
def generate_sstv(image):
    """Encode *image* with the configured SSTV mode into an in-memory WAV.

    The module-level ``mode`` class is instantiated with the image and the
    values 48000 and 16 (presumably sample rate and bits per sample; they
    match the 48 kHz / 16-bit output stream opened by the main script).

    Returns:
        An ``io.BytesIO`` holding the ``write_wav`` output, rewound to the
        start so it can be read immediately.
    """
    buffer = io.BytesIO()
    encoder = mode(image, 48000, 16)
    encoder.write_wav(buffer)
    buffer.seek(0)
    return buffer
# Output stream settings; rate/format must match what generate_sstv() encodes.
SAMPLE_RATE = 48000
OUTPUT_DEVICE_INDEX = 13  # host-specific: set to your audio sink's index
FRAMES_PER_CHUNK = 1024   # audio frames handed to PyAudio per write
PAUSE_SECONDS = 25        # idle time between transmissions

rig = Rigctld()
p = pyaudio.PyAudio()
stream = p.open(
    format=pyaudio.paInt16,
    channels=1,
    rate=SAMPLE_RATE,
    output=True,
    output_device_index=OUTPUT_DEVICE_INDEX,
)
try:
    while True:
        image = get_image()
        # pad() scales the frame to fit the mode's resolution (keeping the
        # aspect ratio) and fills the remainder, so no separate resize or
        # contain step is needed.
        image = ImageOps.pad(image, (mode.WIDTH, mode.HEIGHT))
        audio = generate_sstv(image)
        try:
            rig.ptt_enable()
            # Read through the wave module so only PCM frames reach the
            # sound card; the WAV header is not played as audio.
            with wave.open(audio, "rb") as wav:
                for chunk in iter(lambda: wav.readframes(FRAMES_PER_CHUNK), b""):
                    stream.write(chunk)
        finally:
            # Never leave the transmitter keyed, whatever happened above.
            rig.ptt_disable()
        time.sleep(PAUSE_SECONDS)
except KeyboardInterrupt:
    # Ctrl-C is the normal way to stop the loop.
    pass
except Exception:
    traceback.print_exc(file=sys.stderr)
finally:
    try:
        stream.stop_stream()
        stream.close()
        p.terminate()
    finally:
        rig.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment