-
-
Save poconbhui/ab3ea6c5b827c969bd0cc8d76575a0a2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# | |
# DS4 report stuff | |
# | |
import struct | |
from sys import stdin, argv | |
import os | |
from io import FileIO | |
import signal | |
import time | |
import ctypes | |
# hidraw node to write reports to: first command-line argument, or a fixed default.
hidraw_file = "/dev/hidraw5"
if len(argv) > 1:
    hidraw_file = argv[1]
print("hidraw_file:", hidraw_file)

# Open the node read/write and non-blocking, then wrap the descriptor in a raw
# FileIO. closefd=False means closing `pf` will not close `hiddev`.
hiddev = os.open(hidraw_file, os.O_NONBLOCK | os.O_RDWR)
pf = FileIO(hiddev, "wb+", closefd=False)
# Alternative target left by the author: pf=open("ds_my.bin", "wb+")

# Settings that joy_data() copies into every control payload.
rumble_weak = rumble_strong = 0
r, g, b = 0, 0, 10  # presumably the light bar colour -- TODO confirm
flash_bright = flash_dark = 0
volume_l = volume_r = 60
volume_speaker = 80
unk2 = unk3 = 100  # meaning unknown, as the names say

# Four zero bytes appended as the trailer of reports 0x11 and 0x17.
crc = bytes(4)

# Single byte placed between the frame counter and the audio data in report 0x17.
audio_header = b'\x24'
def frame_number(inc):
    """Return the current frame counter as two bytes, then advance it.

    The counter is stored on the function itself as ``frame_number.n`` and is
    kept inside the unsigned 16-bit range accepted by ``struct.pack("<H")``.

    Args:
        inc: Amount added to the counter after the current value is packed.

    Returns:
        bytes: The counter value from *before* the increment, little-endian.
    """
    res = struct.pack("<H", frame_number.n)
    # Wrap modulo 2**16 so an overshoot carries over into the next cycle
    # (0xfffe + 4 -> 2) instead of being snapped back to 0.
    frame_number.n = (frame_number.n + inc) & 0xffff
    return res


frame_number.n = 0
def joy_data():
    """Build the control payload shared by the 0x11 and 0x15 reports.

    Reads the module-level settings (rumble, r/g/b, flash and volume values)
    at call time; nothing is modified, so no ``global`` declaration is needed.

    Returns:
        list: 23 integers -- a 3-value prefix, 7 settings, 8 zeros of padding
        and 5 volume/unknown values.
    """
    data = [0xff, 0x4, 0x00]
    data.extend([
        rumble_weak, rumble_strong, r, g, b, flash_bright, flash_dark
    ])
    data.extend([0] * 8)
    data.extend([volume_l, volume_r, unk2, volume_speaker, unk3])
    return data
def _11_report():
    """Assemble a 0x11 report: header, control payload, 48 zero bytes, crc.

    Returns:
        bytes: ``b'\\x11\\xC0\\x20'`` followed by the joy_data() values, the
        zero padding and the module-level ``crc`` trailer.
    """
    payload = joy_data() + [0] * 48
    return b'\x11\xC0\x20' + bytes(payload) + crc
def _14_report(audo_data):
    """Assemble a 0x14 report around a chunk of audio data.

    Args:
        audo_data: Bytes-like audio data placed after the frame counter.

    Returns:
        bytes: Header, frame counter (advanced by 2), a ``0x02`` byte, the
        audio data and 40 zero bytes.
    """
    pieces = (
        b'\x14\x40\xA0',
        frame_number(2),
        b'\x02',
        audo_data,
        bytes(40),
    )
    return b''.join(pieces)
def _15_report(audo_data):
    """Assemble a 0x15 report carrying both control data and audio.

    Args:
        audo_data: Bytes-like audio data placed after the frame counter.

    Returns:
        bytes: Header, the joy_data() payload padded with 52 zeros, the frame
        counter (advanced by 2), a ``0x02`` byte, the audio data and 29 zero
        bytes.
    """
    control = joy_data() + [0] * 52
    pieces = (
        b'\x15\xC0\xA0',
        bytes(control),
        frame_number(2),
        b'\x02',
        audo_data,
        bytes(29),
    )
    return b''.join(pieces)
def _17_report(audo_data):
    """Assemble a 0x17 audio report.

    Args:
        audo_data: Audio data, at most 452 bytes long.

    Returns:
        bytes: Header, frame counter (advanced by 4), the ``audio_header``
        byte, the audio data zero-padded to 452 bytes, and the ``crc`` trailer.

    Raises:
        ValueError: If ``audo_data`` is longer than 452 bytes.
    """
    counter = frame_number(4)
    # Pad so the audio section always occupies exactly 452 bytes.
    padding = bytes(452 - len(audo_data))
    return b''.join((
        b'\x17\x40\xA0', counter, audio_header, audo_data, padding, crc
    ))
# | |
# Gstreamer stuff | |
# | |
import gi | |
gi.require_version('Gst', '1.0') | |
gi.require_version('GstBase', '1.0') | |
gi.require_version('Gtk', '3.0') | |
from gi.repository import Gst, GstBase, GObject, Gtk | |
from threading import Thread | |
from multiprocessing import Pool | |
import subprocess | |
import sys | |
Gst.init(None) | |
class SBCFrameHeaderParser(object):
    """Decode the first three bytes of an SBC frame header.

    After parse() the instance exposes ``syncword``, ``sampling_frequency``,
    ``nrof_blocks``, ``channel_mode``, ``nrof_channels``, ``nrof_subbands``,
    ``join``, ``bitpool`` and the derived ``frame_length`` (bytes) and
    ``bit_rate``.

    Header layout reference:
    https://tools.ietf.org/html/draft-ietf-avt-rtp-sbc-01#section-6.3
    """

    MONO = 0
    DUAL_CHANNEL = 1
    STEREO = 2
    JOINT_STEREO = 3

    # parse() reads bytes 0 (syncword), 1 (packed settings) and 2 (bitpool).
    _HEADER_LENGTH = 3

    # Lookup tables indexed by the value of the corresponding 2-bit field.
    _SAMPLING_FREQUENCIES = (16000, 32000, 44100, 48000)
    _BLOCK_COUNTS = (4, 8, 12, 16)
    _CHANNEL_MODE_NAMES = ("MONO", "DUAL_CHANNEL", "STEREO", "JOINT_STEREO")

    def __init__(self):
        pass

    def parse(self, raw_header):
        """Decode ``raw_header`` and store the results on the instance.

        Args:
            raw_header: Bytes-like object starting at an SBC frame; only the
                first three bytes are read.

        Raises:
            ValueError: If fewer than three bytes are supplied.
        """
        if len(raw_header) < SBCFrameHeaderParser._HEADER_LENGTH:
            raise ValueError(
                "SBC header needs at least {} bytes, got {}".format(
                    SBCFrameHeaderParser._HEADER_LENGTH, len(raw_header)
                )
            )

        # Syncword should be 0x9C; it is stored, not validated.
        self.syncword = raw_header[0]
        self.sampling_frequency = SBCFrameHeaderParser.parse_sampling(
            raw_header
        )
        self.nrof_blocks = SBCFrameHeaderParser.parse_number_of_blocks(
            raw_header
        )
        self.channel_mode = SBCFrameHeaderParser.parse_channel_mode(
            raw_header
        )
        self.nrof_subbands = SBCFrameHeaderParser.parse_number_of_subbands(
            raw_header
        )
        self.bitpool = SBCFrameHeaderParser.parse_bitpool(raw_header)

        mode = self.channel_mode
        self.nrof_channels = 1 if mode == SBCFrameHeaderParser.MONO else 2
        self.join = 1 if mode == SBCFrameHeaderParser.JOINT_STEREO else 0

        # Frame length = 4 fixed bytes + scale factors + audio samples.
        scale_factor_bytes = (
            4 * self.nrof_subbands * self.nrof_channels
        ) // 8
        if mode in (
            SBCFrameHeaderParser.MONO, SBCFrameHeaderParser.DUAL_CHANNEL
        ):
            sample_bits = (
                self.nrof_blocks * self.nrof_channels * self.bitpool
            )
        else:
            # STEREO / JOINT_STEREO: the bitpool covers both channels, and
            # joint stereo adds one join bit per subband.
            sample_bits = (
                self.join * self.nrof_subbands
                + self.nrof_blocks * self.bitpool
            )
        # -(-a // 8) is ceil(a / 8) in integer arithmetic.
        self.frame_length = 4 + scale_factor_bytes + -(-sample_bits // 8)

        self.bit_rate = (
            8 * self.frame_length * self.sampling_frequency
            // self.nrof_subbands // self.nrof_blocks
        )

    def print_values(self):
        """Print every field set by parse(); parse() must have run first."""
        print("syncword: ", self.syncword)
        print("nrof_subbands", self.nrof_subbands)
        print(
            "channel_mode",
            SBCFrameHeaderParser._CHANNEL_MODE_NAMES[self.channel_mode]
        )
        print("nrof_channels", self.nrof_channels)
        print("nrof_blocks", self.nrof_blocks)
        print("join: ", self.join)
        print("bitpool", self.bitpool)
        print("sampling_frequency", self.sampling_frequency)
        print("frame_length", self.frame_length)
        print("bit_rate", self.bit_rate)

    @staticmethod
    def parse_sampling(raw_header):
        """Return the sampling frequency in Hz from the top two bits of byte 1."""
        return SBCFrameHeaderParser._SAMPLING_FREQUENCIES[
            (raw_header[1] >> 6) & 0x03
        ]

    @staticmethod
    def parse_number_of_blocks(raw_header):
        """Return the block count (4, 8, 12 or 16) from bits 5-4 of byte 1."""
        return SBCFrameHeaderParser._BLOCK_COUNTS[
            (raw_header[1] >> 4) & 0x03
        ]

    @staticmethod
    def parse_channel_mode(raw_header):
        """Return the channel-mode constant encoded in bits 3-2 of byte 1."""
        return (
            SBCFrameHeaderParser.MONO,
            SBCFrameHeaderParser.DUAL_CHANNEL,
            SBCFrameHeaderParser.STEREO,
            SBCFrameHeaderParser.JOINT_STEREO,
        )[(raw_header[1] >> 2) & 0x03]

    @staticmethod
    def parse_number_of_subbands(raw_header):
        """Return 8 if the lowest bit of byte 1 is set, otherwise 4."""
        return 8 if raw_header[1] & 0x01 else 4

    @staticmethod
    def parse_bitpool(raw_header):
        """Return the bitpool value, which is byte 2 as an integer."""
        return int(raw_header[2])
class SBCEncoding(object):
    """SBC encoder settings, renderable as a GStreamer caps string."""

    def __init__(
        self,
        rate=32000,
        bitpool=50,
        channel_mode="stereo",
        blocks=16,
        subbands=8,
        channels=2
    ):
        """Store the encoder settings unchanged as attributes."""
        self.rate = rate
        self.bitpool = bitpool
        self.channel_mode = channel_mode
        self.blocks = blocks
        self.subbands = subbands
        self.channels = channels

    def gst_caps(self):
        """Return the settings as an ``audio/x-sbc`` caps description string."""
        fields = (
            ('channels', self.channels),
            ('rate', self.rate),
            ('channel-mode', self.channel_mode),
            ('blocks', self.blocks),
            ('subbands', self.subbands),
            ('bitpool', self.bitpool),
        )
        # The media type is followed by a bare comma; fields use ", ".
        return 'audio/x-sbc,' + ', '.join(
            key + '=' + str(value) for key, value in fields
        )
class CallbackSink(GstBase.BaseSink):
    """Sink element that hands every rendered buffer to a Python callable."""

    __gstmetadata__ = (
        'CustomSink', 'Sink', 'custom test sink element', 'poconbhui'
    )

    # One always-present sink pad that accepts any caps.
    __gsttemplates__ = Gst.PadTemplate.new(
        'sink',
        Gst.PadDirection.SINK,
        Gst.PadPresence.ALWAYS,
        Gst.Caps.new_any()
    )

    def __init__(self, callback, *args, **kwargs):
        """Create the sink.

        Args:
            callback: Callable invoked as ``callback(buffer, data)`` for each
                rendered buffer.
            *args, **kwargs: Forwarded to the base sink constructor.
        """
        super().__init__(*args, **kwargs)
        self.callback = callback

    def do_render(self, buffer):
        """Copy the buffer contents out and pass them to the callback."""
        size = buffer.get_size()
        payload = buffer.extract_dup(0, size)
        self.callback(buffer, payload)
        return Gst.FlowReturn.OK
class GtkMainThread(Thread):
    """Thread whose body is the GTK main loop."""

    def run(self):
        """Run the GTK main loop until it is asked to quit."""
        Gtk.main()

    def close(self):
        """Ask the GTK main loop to quit."""
        Gtk.main_quit()
class GstPulseToSBCPipeline(object):
    """GStreamer pipeline: PulseAudio source -> resample -> SBC -> callback.

    The chain is pulsesrc -> audioresample -> capsfilter -> sbcenc ->
    capsfilter -> CallbackSink. The pipeline is set to PLAYING as soon as it
    is constructed; start()/stop() control the GTK main-loop thread and
    stop() also tears the pipeline down.
    """

    def __init__(self, callback, pulse_sink_name='ds4.monitor'):
        """Build, link and start the pipeline.

        Args:
            callback: Callable invoked as ``callback(buffer, data)`` for each
                encoded buffer that reaches the sink.
            pulse_sink_name: Value for the ``device`` property of pulsesrc.

        Raises:
            RuntimeError: If an element cannot be created or two neighbouring
                elements cannot be linked.
        """
        self.callback = callback
        self.pulse_sink_name = pulse_sink_name

        # One encoding description drives both caps filters so the resample
        # rate cannot drift from the rate requested of the encoder.
        encoding = SBCEncoding()

        self.player = Gst.Pipeline.new('player')

        self.pulse_source = self._add_element('pulsesrc', 'pulse-source')
        self.pulse_source.set_property('device', self.pulse_sink_name)

        self.pulse_resampler = self._add_element(
            'audioresample', 'pulse-resampler'
        )

        self.pulse_resampler_caps = self._add_element(
            'capsfilter', 'pulse-resampler-caps'
        )
        self.pulse_resampler_caps.set_property(
            'caps',
            Gst.Caps.from_string(
                "audio/x-raw, rate={}".format(encoding.rate)
            )
        )

        self.sbc_encoder = self._add_element('sbcenc', 'sbc-encoder')

        self.sbc_encoder_caps = self._add_element(
            'capsfilter', 'sbc-encoder-caps'
        )
        self.sbc_encoder_caps.set_property(
            'caps', Gst.Caps.from_string(encoding.gst_caps())
        )

        self.sink = CallbackSink(self.callback)
        self.player.add(self.sink)

        chain = [
            self.pulse_source,
            self.pulse_resampler,
            self.pulse_resampler_caps,
            self.sbc_encoder,
            self.sbc_encoder_caps,
            self.sink,
        ]
        for upstream, downstream in zip(chain, chain[1:]):
            # link() reports failure through its return value only.
            if not upstream.link(downstream):
                raise RuntimeError(
                    "could not link {} to {}".format(
                        upstream.get_name(), downstream.get_name()
                    )
                )

        self.player.set_state(Gst.State.PLAYING)
        self.gtk_main_thread = GtkMainThread()

    def _add_element(self, factory_name, element_name):
        """Create a named element from a factory and add it to the pipeline."""
        element = Gst.ElementFactory.make(factory_name, element_name)
        if element is None:
            # make() returns None when the factory/plugin is unavailable.
            raise RuntimeError(
                "GStreamer element '{}' is not available".format(
                    factory_name
                )
            )
        self.player.add(element)
        return element

    def start(self):
        """Start the GTK main-loop thread."""
        # Kept for old PyGObject releases -- NOTE(review): believed to be a
        # deprecated no-op on current ones; confirm before removing.
        GObject.threads_init()
        self.gtk_main_thread.start()

    def stop(self):
        """Shut the pipeline down and ask the GTK main loop to quit."""
        # Release the elements' resources before the main loop goes away.
        self.player.set_state(Gst.State.NULL)
        self.gtk_main_thread.close()
class Report17CallbackWriter(object):
    """Accumulate SBC data and send it out in 0x17 reports.

    Instances are callable as ``writer(buffer, data)``. Incoming bytes are
    buffered; once at least ``audio_len`` bytes are pending, as many whole SBC
    frames as fit into ``audio_len`` bytes are cut off the front and handed to
    a single-process pool that writes the report.
    """

    # Bytes SBCFrameHeaderParser.parse() reads from the start of a frame.
    _SBC_HEADER_LEN = 3

    def __init__(self):
        self.sbc_frame_data = b''   # SBC bytes not yet sent
        self.audio_len = 448        # maximum audio bytes per report
        self.write_pool = Pool(processes=1)

    def __call__(self, buffer, data):
        """Buffer ``data`` and write a report if enough is pending.

        Args:
            buffer: Unused; present to match the sink callback signature.
            data: Bytes appended to the pending SBC data.
        """
        self.sbc_frame_data += data
        self.try_write_report()

    @staticmethod
    def sigalrm_handler(signum, frame):
        """SIGALRM handler: abort the interrupted call with TimeoutError."""
        raise TimeoutError

    @staticmethod
    def write_report(audio_data, timeout):
        """Write one 0x17 report, giving up after ``timeout`` seconds.

        Submitted to the worker pool by try_write_report(). TimeoutError and
        KeyboardInterrupt are swallowed so a late report is simply dropped.

        Args:
            audio_data: SBC bytes to wrap with _17_report().
            timeout: Seconds to arm the real-time interval timer for.
        """
        signal.signal(
            signal.SIGALRM, Report17CallbackWriter.sigalrm_handler
        )
        try:
            report = _17_report(audio_data)
            signal.setitimer(signal.ITIMER_REAL, timeout)
            try:
                pf.write(report)
            finally:
                # Always disarm: a timer left running after a failed write
                # would fire later at an arbitrary point in the worker.
                signal.setitimer(signal.ITIMER_REAL, 0)
        except (TimeoutError, KeyboardInterrupt):
            pass

    def try_write_report(self):
        """Cut whole SBC frames off the pending data and queue one report.

        Does nothing until at least ``audio_len`` bytes are pending, and
        queues at most one report per call.
        """
        if len(self.sbc_frame_data) < self.audio_len:
            return

        audio_data = b''
        sbc_header = SBCFrameHeaderParser()
        while len(audio_data) < self.audio_len:
            # A 1-2 byte remainder cannot be parsed yet; wait for more data.
            if len(self.sbc_frame_data) < self._SBC_HEADER_LEN:
                break
            sbc_header.parse(self.sbc_frame_data)
            frame_length = sbc_header.frame_length
            if len(audio_data) + frame_length > self.audio_len:
                break
            audio_data += self.sbc_frame_data[:frame_length]
            self.sbc_frame_data = self.sbc_frame_data[frame_length:]

        if not audio_data:
            # Not even one frame fits; do not send a report with no audio.
            # NOTE(review): the pending data is left in place, so a frame
            # larger than audio_len still stalls the stream -- confirm
            # whether such frames can occur.
            return

        # Allowed write time, scaled from the payload size and the bit rate.
        maxtime = len(audio_data) / sbc_header.bit_rate * 10
        self.write_pool.apply_async(
            Report17CallbackWriter.write_report,
            (audio_data, maxtime)
        )
# Create a PulseAudio null sink; its ".monitor" source feeds the pipeline.
pulse_sink_name = "ds4"
pulse_sink_id = subprocess.check_output([
    'pactl', 'load-module', 'module-null-sink',
    'sink_name="{}"'.format(pulse_sink_name),
    'sink_properties=device.description="{}"'.format(
        # Raw string: the backslash is passed through to pactl literally.
        r"DualShock\ 4"
    ),
])
# pactl prints the index of the loaded module; it is needed to unload it.
pulse_sink_id = int(pulse_sink_id)
print("pulse sink id:", pulse_sink_id)

try:
    report_17_callback_writer = Report17CallbackWriter()
    pipeline = GstPulseToSBCPipeline(
        report_17_callback_writer,
        pulse_sink_name=pulse_sink_name + ".monitor"
    )
    pipeline.start()
    try:
        # Freeze here until ^C
        signal.pause()
    except KeyboardInterrupt:
        pass
    finally:
        pipeline.stop()
finally:
    # Cleanup: remove the null sink even if setting up the pipeline failed.
    subprocess.check_output([
        'pactl', 'unload-module', str(pulse_sink_id)
    ])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment