Skip to content

Instantly share code, notes, and snippets.

@lukesnow
Created August 16, 2015 03:45
Show Gist options
  • Save lukesnow/f8d5371616d51869ef34 to your computer and use it in GitHub Desktop.
Save lukesnow/f8d5371616d51869ef34 to your computer and use it in GitHub Desktop.
tcp_test_client_v2.py
# credits
# https://github.com/roger-/pyrtlsdr/blob/master/rtlsdr/rtlsdr.py
# https://gist.github.com/simeonmiteff/3792676
# http://stackoverflow.com/questions/23732930/how-to-check-if-all-data-are-received-with-a-tcp-socket-in-python?rq=1
import socket
import struct
import numpy as np
import sound
import wave
import sys
# Command codes for the remote tuner. Each one is packed as the leading byte
# of a command by RtlTCP.__send_command (struct format ">BI": one unsigned
# byte followed by a 4-byte big-endian unsigned parameter).
SET_FREQUENCY = 0x01
SET_SAMPLERATE = 0x02
SET_GAINMODE = 0x03 # Tuner agc; not sent anywhere in this file
SET_GAIN = 0x04
# NOTE(review): the name is misspelled ("FREQENCY"); it is left unchanged
# because other code may reference it. Not sent anywhere in this file.
SET_FREQENCYCORRECTION = 0x05 # presumably in ppm -- TODO confirm
def firwin(M, Fc):
    """Design a low-pass FIR filter as a Blackman-windowed sinc.

    M  -- filter order; the kernel is evaluated at the integer sample
          indices 0..M, so M + 1 coefficients are returned.
    Fc -- cutoff frequency as a fraction of the sample rate
          (e.g. 4000 / 48000.0).

    Returns a 1-D numpy array of coefficients, symmetric about index M/2
    and scaled so they sum to 1 (unity gain at DC).

    Raises ValueError if M is smaller than 1.
    """
    if M < 1:
        raise ValueError("M must be at least 1")
    # One coefficient per integer sample index. np.linspace(0, M) must not be
    # used here: its default of 50 points ignores M and yields fractional
    # indices, which changes both the tap count and the effective cutoff.
    I = np.arange(M + 1, dtype=float)
    # Blackman window over 0..M.
    window = 0.42 - 0.5*np.cos(2*np.pi*I/M) + 0.08*np.cos(4*np.pi*I/M)
    # M/2.0 keeps the sinc centred for odd M under Python 2 integer division.
    h = np.sinc(2*Fc*(I - M/2.0)) * window
    return h/np.sum(h)
# Module-level capture settings read by RtlTCP and by the script section.
# (No `global` declaration is needed for names assigned at module scope.)
audio_sampleRate = 48000.0  # frame rate of the WAV file that gets written
station_freq = 125807277  # this frequency is for KGO, with ham it up turned on. it has been tested.
seconds = 10  # length of the capture
samp_rate = int(2.4e6)  # 2.4 MSPS; 2,400,000/48,000 = 50
class RtlTCP(object):
    """Small TCP client that sends tuner commands and exposes the raw socket.

    The connection is opened in the constructor and the sample rate is sent
    immediately. Callers read sample data directly from ``self.conn``.
    NOTE(review): presumably the peer is an rtl_tcp server -- confirm.
    """

    def __init__(self, remote_host="192.168.1.8", remote_port=10021,
                 sample_rate=None):
        """Connect to the server and configure its sample rate.

        remote_host -- server address (defaults to the original fixed host).
        remote_port -- server TCP port (defaults to the original fixed port).
        sample_rate -- rate to request; when None the module-level
                       ``samp_rate`` is used, as before.
        """
        self.remote_host = remote_host
        self.remote_port = remote_port
        self.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.conn.connect((self.remote_host, self.remote_port))
            if sample_rate is None:
                sample_rate = samp_rate
            self.__send_command(SET_SAMPLERATE, sample_rate)
        except Exception:
            # Do not leak the socket when the connect or first command fails.
            self.conn.close()
            raise

    def tune(self, freq):
        """Send the SET_FREQUENCY command with ``freq`` as its parameter."""
        self.__send_command(SET_FREQUENCY, freq)

    def gain(self, gain):
        """Send the SET_GAIN command with ``gain`` as its parameter."""
        self.__send_command(SET_GAIN, gain)

    def __send_command(self, command, parameter):
        """Pack and transmit one command.

        The wire format is one unsigned byte (command) followed by a 4-byte
        big-endian unsigned integer (parameter).
        """
        cmd = struct.pack(">BI", command, parameter)
        # sendall retries until every byte is written; send may stop early.
        self.conn.sendall(cmd)

    def close(self):
        """Close the underlying socket."""
        # Must not assign to self.close: that would replace this method with
        # the None returned by socket.close().
        self.conn.close()
def _recv_exact(conn, nbytes):
    """Read up to nbytes from conn, looping until full or the peer closes.

    Returns a bytearray. It is shorter than nbytes only when the connection
    ended early, so the caller can still process a partial capture.
    """
    buf = bytearray(nbytes)
    view = memoryview(buf)
    received = 0
    while received < nbytes:
        got = conn.recv_into(view[received:])
        if got == 0:
            break
        received += got
    if received < nbytes:
        return buf[:received]
    return buf


if __name__=="__main__":
    sdr = RtlTCP()
    try:
        sdr.tune(station_freq)
        sdr.gain(15)
        # throw away first few KB
        for _ in range(3):
            sdr.conn.recv(1024)
        # read the raw capture from the connection as unsigned 8 bit ints;
        # 2 bytes per sample (I and Q interleaved).
        raw = np.frombuffer(_recv_exact(sdr.conn, samp_rate*seconds*2),
                            dtype=np.uint8)
    finally:
        # Release the socket even when tuning or the capture fails.
        sdr.close()

    # create complex array, separate I and Q samples
    dsb = int(samp_rate/audio_sampleRate) # Down Sample By this factor
    div = 2*dsb # split IQ array, also downsample by dsb.
    count = len(raw)//div
    if count == 0:
        sys.exit("no IQ samples received")
    iq = np.empty(count, 'complex')
    # Truncate both slices to `count` so a short or odd-length capture cannot
    # cause a shape mismatch.
    iq.real = raw[0::div][:count]
    iq.imag = raw[1::div][:count]
    # normalize to between (-1,1); 127.5 is spelled out because 255/2 is 127
    # under Python 2 integer division.
    iq /= 127.5
    iq -= (1 + 1j)

    # Magnitude of the IQ samples, then low-pass filtered.
    envelope = np.abs(iq)
    h = firwin(140,(4000/audio_sampleRate))
    envelope = np.convolve(envelope, h)

    volume = 100
    numChan = 1
    dataSize = 2 # bytes per sample: 2 matches the 16 bit samples below
    # |iq| can exceed 1.0, so clip before the cast; otherwise int16 wraps.
    signal = np.clip(32767*volume/100.0*envelope, -32768, 32767).astype('<i2')
    print(signal)
    print(len(signal))

    wavfile = wave.open('test.wav', 'w')
    try:
        # The frame rate must be an integer; audio_sampleRate is a float.
        wavfile.setparams((numChan, dataSize, int(audio_sampleRate),
                           len(signal), "NONE", "Uncompressed"))
        # writeframes expects raw bytes, so serialise the array explicitly.
        wavfile.writeframes(signal.tobytes())
    finally:
        wavfile.close()
    sound.play_effect('test.wav')
# generate wav file containing sine waves
#import math, wave, array
#duration = 3 # seconds
#freq = 440 # of cycles per second (Hz) (frequency of the sine waves)
#volume = 100 # percent
#data = array.array('h') # signed short integer (-32768 to 32767) data
#sampleRate = 44100.0 # of samples per second (standard)
#numChan = 1 # of channels (1: mono, 2: stereo)
#dataSize = 2 # 2 bytes because of using signed short integers => bit depth = 16
#numSamplesPerCyc = (sampleRate / freq)
#numSamples = int(sampleRate * duration)
#for i in range(numSamples):
# sample = 32767 * float(volume) / 100
# sample *= math.sin(math.pi * 2 * (i / numSamplesPerCyc))
# data.append(int(sample))
#f = wave.open('440hz.wav', 'w')
#f.setparams((numChan, dataSize, sampleRate, numSamples, "NONE", "Uncompressed"))
#f.writeframes(data.tostring())
#f.close()
#import sound
#sound.play_effect('440hz.wav')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment