Skip to content

Instantly share code, notes, and snippets.

@TheSkorm
Last active June 12, 2021 03:10
Show Gist options
  • Save TheSkorm/55852a5e67300bb581a407b3234f41e2 to your computer and use it in GitHub Desktop.
Save TheSkorm/55852a5e67300bb581a407b3234f41e2 to your computer and use it in GitHub Desktop.
from subprocess import *
import time
import os
from datetime import datetime
import fcntl
import wave
# --- Receiver / recorder settings -------------------------------------------
FREQUENCY = 438_025_000  # value handed to rtl_fm's -f option
BANDWIDTH = 20_000       # rtl_fm -s value; also sox's input rate and the WAV frame rate
SQUELCH = 100            # value handed to rtl_fm's -l option
GAIN = 50                # value handed to rtl_fm's -g option
SLEEP_TIME = 1           # seconds between polls; a poll with no audio ends the current file

# Raw signed 16-bit mono stream description, used for both sox's input and output.
_RAW_PCM_ARGS = ("-t", "raw", "-e", "s", "-b", "16", "-c", "1")

# rtl_fm invocation: demodulated audio is written to stdout ("-").
FM_COMMAND = [
    "rtl_fm",
    "-d", "port4",
    "-l", f"{SQUELCH}",
    "-g", f"{GAIN}",
    "-f", f"{FREQUENCY}",
    "-M", "fm",
    "-s", f"{BANDWIDTH}",
    "-E", "deemp",
    "-",
]

# sox invocation: raw audio in on stdin, raw audio out on stdout, followed by
# the effects chain (a sinc filter, then gain).
SOX_COMMAND = [
    "sox",
    "-r", f"{BANDWIDTH}",
    *_RAW_PCM_ARGS,
    "-V1",
    "-",
    *_RAW_PCM_ARGS,
    "-",
    "sinc", "-n", "2048", "300-6000",
    "gain", "6",
]
def non_block_read(output) -> bytes:
    """Return whatever bytes are immediately available on *output*.

    The underlying file descriptor is switched to non-blocking mode (the
    O_NONBLOCK flag is left set afterwards), then everything currently
    readable is read in one call.

    Args:
        output: A binary file object exposing ``fileno()`` and ``read()``,
            such as a subprocess pipe.

    Returns:
        The bytes read, or ``b""`` when nothing is available, the stream is
        at end-of-file, or the read fails with an OS-level error.
    """
    fd = output.fileno()
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
    try:
        data = output.read()
    except OSError:
        # Includes BlockingIOError; treated the same as "no data right now".
        return b""
    # An unbuffered non-blocking stream returns None when no data is ready;
    # normalise so callers always receive bytes.
    return data or b""
# Recording loop: rtl_fm's output is piped through sox, and sox's output is
# polled every SLEEP_TIME seconds. Each run of consecutive polls that return
# audio is written to its own timestamped WAV file; a poll that returns nothing
# closes the current file.

# Currently open WAV writer, or None while no audio is arriving.
file_object = None
with Popen(FM_COMMAND, shell=False, stdout=PIPE, bufsize=0) as rtl_fm:
    with Popen(SOX_COMMAND, shell=False, stdout=PIPE, stdin=rtl_fm.stdout, bufsize=0) as sox:
        # sox holds its own copy of the pipe; drop ours so rtl_fm receives
        # SIGPIPE if sox exits first.
        rtl_fm.stdout.close()
        try:
            while True:
                # Sample the exit status *before* reading, so anything sox
                # wrote just before exiting is still picked up by this read.
                sox_exited = sox.poll() is not None
                buffer = non_block_read(sox.stdout)
                if buffer:
                    if file_object is None:
                        print("openning new log")
                        file_object = wave.open(f"rtl-fm-monitor-{datetime.now().isoformat().replace(':','')}.wav", "wb")
                        file_object.setnchannels(1)
                        file_object.setsampwidth(2)
                        file_object.setframerate(BANDWIDTH)
                    print("logging")
                    file_object.writeframesraw(buffer)
                else:
                    if file_object is not None:
                        print("stopped")
                        file_object.close()
                        file_object = None
                    if sox_exited:
                        # Pipeline is gone and fully drained: stop polling
                        # instead of spinning forever.
                        break
                time.sleep(SLEEP_TIME)
        finally:
            # Stop both children so leaving the `with` blocks cannot hang
            # waiting on them (a no-op for processes that already exited).
            for process in (sox, rtl_fm):
                process.terminate()
            # writeframesraw() does not fix up the header's frame count;
            # close() does, so always finalise a file that is still open
            # (e.g. after Ctrl-C or a write error).
            if file_object is not None:
                file_object.close()
                file_object = None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment