Skip to content

Instantly share code, notes, and snippets.

@poconbhui
Forked from a1ien/play.py
Last active May 22, 2021 18:07
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save poconbhui/ab3ea6c5b827c969bd0cc8d76575a0a2 to your computer and use it in GitHub Desktop.
Save poconbhui/ab3ea6c5b827c969bd0cc8d76575a0a2 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
#
# DS4 report stuff
#
import struct
from sys import stdin, argv
import os
from io import FileIO
import signal
import time
import ctypes
hidraw_file = argv[1] if len(argv) > 1 else "/dev/hidraw5"
print("hidraw_file:", hidraw_file)
hiddev = os.open(hidraw_file, os.O_RDWR | os.O_NONBLOCK)
pf = FileIO(hiddev, "wb+", closefd=False)
#pf=open("ds_my.bin", "wb+")
rumble_weak = 0
rumble_strong = 0
r = 0
g = 0
b = 10
crc = b'\x00\x00\x00\x00'
volume_speaker = 80
volume_l = 60
volume_r = 60
unk2 = 100
unk3 = 100
flash_bright = 0
flash_dark = 0
#audio_header = b'\x24'
audio_header = b'\x24'
def frame_number(inc):
res = struct.pack("<H", frame_number.n)
frame_number.n += inc
if frame_number.n > 0xffff:
frame_number.n = 0
return res
frame_number.n = 0
def joy_data():
data = [0xff,0x4,0x00]
global volume_r,volume_unk2, unk3
data.extend([rumble_weak,rumble_strong,r,g,b,flash_bright,flash_dark])
data.extend([0]*8)
data.extend([volume_l,volume_r,unk2,volume_speaker,unk3])
return data
def _11_report():
data = joy_data()
data.extend([0]*(48))
return b'\x11\xC0\x20' + bytearray(data) + crc
def _14_report(audo_data):
return b'\x14\x40\xA0'+ frame_number(2) + b'\x02'+ audo_data + bytearray(40)
def _15_report(audo_data):
data = joy_data();
data.extend([0]*(52))
return b'\x15\xC0\xA0' + bytearray(data)+ frame_number(2) + b'\x02' + audo_data + bytearray(29)
def _17_report(audo_data):
return (
b'\x17\x40\xA0'
+ frame_number(4)
+ audio_header
+ audo_data
+ bytearray(452 - len(audo_data)) + crc
)
#
# Gstreamer stuff
#
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstBase', '1.0')
gi.require_version('Gtk', '3.0')
from gi.repository import Gst, GstBase, GObject, Gtk
from threading import Thread
from multiprocessing import Pool
import subprocess
import sys
Gst.init(None)
class SBCFrameHeaderParser(object):
MONO = 0
DUAL_CHANNEL = 1
STEREO = 2
JOINT_STEREO = 3
def __init__(self):
pass
def parse(self, raw_header):
# Info in SBC headers from
# https://tools.ietf.org/html/draft-ietf-avt-rtp-sbc-01#section-6.3
# Syncword should be 0x9C
self.syncword = raw_header[0]
self.nrof_subbands = \
SBCFrameHeaderParser.parse_number_of_subbands(
raw_header
)
self.channel_mode = SBCFrameHeaderParser.parse_channel_mode(
raw_header
)
self.nrof_channels = 2
if self.channel_mode == SBCFrameHeaderParser.MONO:
self.nrof_channels = 1
self.nrof_blocks = SBCFrameHeaderParser.parse_number_of_blocks(
raw_header
)
self.join = 0
if self.channel_mode == SBCFrameHeaderParser.JOINT_STEREO:
self.join = 1
self.nrof_subbands = \
SBCFrameHeaderParser.parse_number_of_subbands(
raw_header
)
self.bitpool = SBCFrameHeaderParser.parse_bitpool(raw_header)
self.sampling_frequency = SBCFrameHeaderParser.parse_sampling(
raw_header
)
# Calculate frame length
def ceildiv(a, b):
return -(-a // b)
if (
(self.channel_mode == SBCFrameHeaderParser.MONO)
or (self.channel_mode == SBCFrameHeaderParser.DUAL_CHANNEL)
):
self.frame_length = (
4 + (
4
* self.nrof_subbands
* self.nrof_channels
)//8
+ ceildiv(
self.nrof_blocks
* self.nrof_channels
* self.bitpool,
8
)
)
else:
self.frame_length = (
4 + (
4
* self.nrof_subbands
* self.nrof_channels
)//8
+ ceildiv(
self.join
* self.nrof_subbands
+ self.nrof_blocks
* self.bitpool,
8
)
)
# Calculate bit rate
self.bit_rate = (
8 * self.frame_length * self.sampling_frequency
// self.nrof_subbands // self.nrof_blocks
)
def print_values(self):
# Info in SBC headers from
# https://tools.ietf.org/html/draft-ietf-avt-rtp-sbc-01#section-6.3
print("syncword: ", self.syncword)
print("nrof_subbands", self.nrof_subbands)
print("channel_mode", [
"MONO", "DUAL_CHANNEL", "STEREO", "JOINT_STEREO"
][self.channel_mode]
)
print("nrof_channels", self.nrof_channels)
print("nrof_blocks", self.nrof_blocks)
print("join: ", self.join)
print("nrof_subbands", self.nrof_subbands)
print("bitpool", self.bitpool)
print("sampling_frequency", self.sampling_frequency)
print("frame_length", self.frame_length)
print("bit_rate", self.bit_rate)
@staticmethod
def parse_sampling(raw_header):
sf_word = raw_header[1]
# Find sampling frequency from rightmost 2 bits
if sf_word & 0x80 == 0x80:
bit_0 = 1
else:
bit_0 = 0
if sf_word & 0x40 == 0x40:
bit_1 = 1
else:
bit_1 = 0
if (bit_0 == 0) and (bit_1 == 0):
sampling_frequency = 16000
elif (bit_0 == 0) and (bit_1 == 1):
sampling_frequency = 32000
elif (bit_0 == 1) and (bit_1 == 0):
sampling_frequency = 44100
elif (bit_0 == 1) and (bit_1 == 1):
sampling_frequency = 48000
return sampling_frequency
@staticmethod
def parse_number_of_blocks(raw_header):
nb_word = raw_header[1]
if nb_word & 0x20 == 0x20:
bit_0 = 1
else:
bit_0 = 0
if nb_word & 0x10 == 0x10:
bit_1 = 1
else:
bit_1 = 0
if (bit_0 == 0) and (bit_1 == 0):
number_of_blocks = 4
elif (bit_0 == 0) and (bit_1 == 1):
number_of_blocks = 8
elif (bit_0 == 1) and (bit_1 == 0):
number_of_blocks = 12
elif (bit_0 == 1) and (bit_1 == 1):
number_of_blocks = 16
return number_of_blocks
@staticmethod
def parse_channel_mode(raw_header):
ch_word = raw_header[1]
if ch_word & 0x08 == 0x08:
bit_0 = 1
else:
bit_0 = 0
if ch_word & 0x04 == 0x04:
bit_1 = 1
else:
bit_1 = 0
if (bit_0 == 0) and (bit_1 == 0):
channel_mode = SBCFrameHeaderParser.MONO
elif (bit_0 == 0) and (bit_1 == 1):
channel_mode = SBCFrameHeaderParser.DUAL_CHANNEL
elif (bit_0 == 1) and (bit_1 == 0):
channel_mode = SBCFrameHeaderParser.STEREO
elif (bit_0 == 1) and (bit_1 == 1):
channel_mode = SBCFrameHeaderParser.JOINT_STEREO
return channel_mode
@staticmethod
def parse_number_of_subbands(raw_header):
if raw_header[1] & 0x01 == 0x01:
number_of_subbands = 8
else:
number_of_subbands = 4
return number_of_subbands
@staticmethod
def parse_bitpool(raw_header):
return int(raw_header[2])
class SBCEncoding(object):
def __init__(
self,
rate = 32000,
bitpool = 50,
channel_mode = "stereo",
blocks = 16,
subbands = 8,
channels = 2
):
self.rate = rate
self.bitpool = bitpool
self.channel_mode = channel_mode
self.blocks = blocks
self.subbands = subbands
self.channels = channels
def gst_caps(self):
return (
'audio/x-sbc,'
+ 'channels=' + str(self.channels) + ', '
+ 'rate=' + str(self.rate) + ', '
+ 'channel-mode=' + str(self.channel_mode) + ', '
+ 'blocks=' + str(self.blocks) + ', '
+ 'subbands=' + str(self.subbands) + ', '
+ 'bitpool=' + str(self.bitpool)
)
class CallbackSink(GstBase.BaseSink):
__gstmetadata__ = (
'CustomSink', 'Sink', 'custom test sink element', 'poconbhui'
)
__gsttemplates__ = Gst.PadTemplate.new(
'sink',
Gst.PadDirection.SINK,
Gst.PadPresence.ALWAYS,
Gst.Caps.new_any()
)
def __init__(self, callback, *args, **kwargs):
super().__init__(*args, **kwargs)
self.callback = callback
def do_render(self, buffer):
data = buffer.extract_dup(0, buffer.get_size())
self.callback(buffer, data)
return Gst.FlowReturn.OK
class GtkMainThread(Thread):
def run(self):
Gtk.main()
def close(self):
Gtk.main_quit();
class GstPulseToSBCPipeline(object):
def __init__(self, callback, pulse_sink_name='ds4.monitor'):
self.callback = callback
self.pulse_sink_name = pulse_sink_name
self.player = Gst.Pipeline.new('player')
self.pulse_source = Gst.ElementFactory.make(
'pulsesrc', instance_name='pulse-source'
)
self.pulse_source.set_property('device', self.pulse_sink_name)
self.player.add(self.pulse_source)
self.pulse_resampler = Gst.ElementFactory.make(
'audioresample', 'pulse-resampler'
)
self.player.add(self.pulse_resampler)
self.pulse_source.link(self.pulse_resampler)
self.pulse_resampler_caps = Gst.ElementFactory.make(
'capsfilter', 'pulse-resampler-caps'
)
self.pulse_resampler_caps.set_property(
'caps', Gst.Caps.from_string("audio/x-raw, rate=32000")
)
self.player.add(self.pulse_resampler_caps)
self.pulse_resampler.link(self.pulse_resampler_caps)
self.sbc_encoder = Gst.ElementFactory.make(
'sbcenc', 'sbc-encoder'
)
self.player.add(self.sbc_encoder)
self.pulse_resampler_caps.link(self.sbc_encoder)
self.sbc_encoder_caps = Gst.ElementFactory.make(
'capsfilter', 'sbc-encoder-caps'
)
self.sbc_encoder_caps.set_property(
'caps', Gst.Caps.from_string(SBCEncoding().gst_caps())
)
self.player.add(self.sbc_encoder_caps)
self.sbc_encoder.link(self.sbc_encoder_caps)
self.sink = CallbackSink(self.callback)
self.player.add(self.sink)
self.sbc_encoder_caps.link(self.sink)
self.player.set_state(Gst.State.PLAYING)
self.gtk_main_thread = GtkMainThread()
def start(self):
GObject.threads_init()
self.gtk_main_thread.start()
def stop(self):
self.gtk_main_thread.close()
class Report17CallbackWriter(object):
def __init__(self):
self.sbc_frame_data = b''
self.audio_len = 448
self.write_pool = Pool(processes=1)
def __call__(self, buffer, data):
self.sbc_frame_data += data
self.try_write_report()
@staticmethod
def sigalrm_handler(signum, frame):
raise TimeoutError
@staticmethod
def write_report(audio_data, timeout):
signal.signal(
signal.SIGALRM, Report17CallbackWriter.sigalrm_handler
)
try:
report = _17_report(audio_data)
signal.setitimer(signal.ITIMER_REAL, timeout)
try:
pf.write(report)
except TimeoutError:
#print("Timeout Error")
pass
signal.setitimer(signal.ITIMER_REAL, 0)
except TimeoutError:
pass
except KeyboardInterrupt:
pass
def try_write_report(self):
if len(self.sbc_frame_data) < self.audio_len:
return
audio_data = b''
sbc_header = SBCFrameHeaderParser()
while len(audio_data) < self.audio_len:
sbc_header.parse(self.sbc_frame_data)
frame_length = sbc_header.frame_length
if len(audio_data) + frame_length <= self.audio_len:
audio_data += \
self.sbc_frame_data[0:frame_length]
self.sbc_frame_data = \
self.sbc_frame_data[frame_length:]
else:
break
maxtime = len(audio_data)/sbc_header.bit_rate*10
self.write_pool.apply_async(
Report17CallbackWriter.write_report,
(audio_data, maxtime)
)
# Create pulse source
pulse_sink_name = "ds4"
pulse_sink_id = subprocess.check_output([
'pactl', 'load-module', 'module-null-sink',
'sink_name="{}"'.format(pulse_sink_name),
'sink_properties=device.description="{}"'.format(
"DualShock\ 4"
),
])
pulse_sink_id = int(pulse_sink_id)
print("pulse sink id:", pulse_sink_id)
report_17_callback_writer = Report17CallbackWriter()
pipeline = GstPulseToSBCPipeline(
report_17_callback_writer,
pulse_sink_name = pulse_sink_name + ".monitor"
)
pipeline.start()
try:
# Freeze here until ^C
signal.pause()
except KeyboardInterrupt:
pass
# Cleanup
pipeline.stop()
subprocess.check_output([
'pactl', 'unload-module', str(pulse_sink_id)
])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment