Skip to content

Instantly share code, notes, and snippets.

@theY4Kman
Created May 22, 2020 00:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save theY4Kman/49a59bf8e5196e7ab477ba4c30a13bf5 to your computer and use it in GitHub Desktop.
Save theY4Kman/49a59bf8e5196e7ab477ba4c30a13bf5 to your computer and use it in GitHub Desktop.
#include <string.h>
#include <dlfcn.h>
#include <libxfce4util/libxfce4util.h>
#include <libxfce4panel/xfce-panel-plugin.h>
#include <gtk/gtk.h>
#include <Python.h>
#include <canberra.h>
#include <gsound.h>
#include "pygobject.h"
/* prototypes */
/* Plugin entry point; defined at the bottom of this file. */
static void
airhorn_construct(XfcePanelPlugin *plugin);
/* register the plugin */
/* Hands airhorn_construct to libxfce4panel's registration macro. */
XFCE_PANEL_PLUGIN_REGISTER (airhorn_construct);
/* Forward declaration of the Python-callable wrapper defined below. */
static PyObject *
_xfce4panelplugin_menu_insert_item(PyObject *self, PyObject *args, PyObject *kwargs);
/*
 * Local definition of the GSoundContext instance struct, used by
 * _xfce4panelplugin_ca_change_device() to reach the underlying ca_context.
 *
 * NOTE(review): presumably mirrors the private layout used inside libgsound;
 * confirm against the installed libgsound version, since a mismatch would make
 * the `ca` access read the wrong memory.
 */
struct _GSoundContext
{
GObject parent;
ca_context *ca;
};
/*
 * Python: menu_insert_item(plugin, item) -> bool
 *
 * Unwraps the native pointers stored in the `__gpointer__` capsules of the two
 * Python objects and forwards them to xfce_panel_plugin_menu_insert_item().
 *
 * Returns NULL (with the Python exception set) when argument parsing fails.
 * Any later failure is printed with PyErr_Print() and reported as False;
 * success is reported as True.
 */
static PyObject *
_xfce4panelplugin_menu_insert_item(PyObject *self, PyObject *args, PyObject *kwargs) {
    static char *kwlist[] = {"plugin", "item", NULL};
    PyObject *py_plugin;
    PyObject *py_item;
    PyObject *plugin_capsule = NULL;
    PyObject *item_capsule = NULL;
    XfcePanelPlugin *plugin;
    GtkMenuItem *item;
    PyObject *result = Py_False;

    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OO", kwlist, &py_plugin, &py_item)) {
        return NULL;
    }

    // TODO: type-checking plugin and item
    plugin_capsule = PyObject_GetAttrString(py_plugin, "__gpointer__");
    if (plugin_capsule == NULL) {
        goto py_error;
    }
    plugin = (XfcePanelPlugin *)PyCapsule_GetPointer(plugin_capsule, NULL);
    if (PyErr_Occurred()) {
        goto py_error;
    }

    item_capsule = PyObject_GetAttrString(py_item, "__gpointer__");
    if (item_capsule == NULL) {
        goto py_error;
    }
    item = (GtkMenuItem *)PyCapsule_GetPointer(item_capsule, NULL);
    if (PyErr_Occurred()) {
        goto py_error;
    }

    xfce_panel_plugin_menu_insert_item(plugin, item);
    result = Py_True;
    goto done;

py_error:
    PyErr_Print();

done:
    /* PyObject_GetAttrString() returns new references; release them on every path. */
    Py_XDECREF(plugin_capsule);
    Py_XDECREF(item_capsule);
    Py_INCREF(result);
    return result;
}
/*
 * Python: add_action_widget(plugin, widget) -> bool
 *
 * Unwraps the native pointers stored in the `__gpointer__` capsules of the two
 * Python objects and forwards them to xfce_panel_plugin_add_action_widget().
 *
 * Returns NULL (with the Python exception set) when argument parsing fails.
 * Any later failure is printed with PyErr_Print() and reported as False;
 * success is reported as True.
 */
static PyObject *
_xfce4panelplugin_add_action_widget(PyObject *self, PyObject *args, PyObject *kwargs) {
    static char *kwlist[] = {"plugin", "widget", NULL};
    PyObject *py_plugin;
    PyObject *py_widget;
    PyObject *plugin_capsule = NULL;
    PyObject *widget_capsule = NULL;
    XfcePanelPlugin *plugin;
    GtkWidget *widget;
    PyObject *result = Py_False;

    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OO", kwlist, &py_plugin, &py_widget)) {
        return NULL;
    }

    // TODO: type-checking plugin and widget
    plugin_capsule = PyObject_GetAttrString(py_plugin, "__gpointer__");
    if (plugin_capsule == NULL) {
        goto py_error;
    }
    plugin = (XfcePanelPlugin *)PyCapsule_GetPointer(plugin_capsule, NULL);
    if (PyErr_Occurred()) {
        goto py_error;
    }

    widget_capsule = PyObject_GetAttrString(py_widget, "__gpointer__");
    if (widget_capsule == NULL) {
        goto py_error;
    }
    widget = (GtkWidget *)PyCapsule_GetPointer(widget_capsule, NULL);
    if (PyErr_Occurred()) {
        goto py_error;
    }

    xfce_panel_plugin_add_action_widget(plugin, widget);
    result = Py_True;
    goto done;

py_error:
    PyErr_Print();

done:
    /* PyObject_GetAttrString() returns new references; release them on every path. */
    Py_XDECREF(plugin_capsule);
    Py_XDECREF(widget_capsule);
    Py_INCREF(result);
    return result;
}
/*
 * Python: ca_change_device(ctx, device=None) -> int | bool
 *
 * Unwraps the GSoundContext stored in ctx.__gpointer__ and calls
 * ca_context_change_device() on its libcanberra context.
 *
 * Returns the integer result of ca_context_change_device(). Returns NULL (with
 * the Python exception set) when argument parsing fails. A failure to unwrap
 * the context is printed with PyErr_Print() and reported as False.
 */
static PyObject *
_xfce4panelplugin_ca_change_device(PyObject *self, PyObject *args, PyObject *kwargs) {
    static char *kwlist[] = {"ctx", "device", NULL};
    PyObject *py_ctx;
    const char *device = NULL;
    PyObject *ctx_capsule = NULL;
    GSoundContext *gsound_context;
    PyObject *result = NULL;

    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|s", kwlist, &py_ctx, &device)) {
        return NULL;
    }

    // TODO: type-checking ctx
    ctx_capsule = PyObject_GetAttrString(py_ctx, "__gpointer__");
    if (ctx_capsule == NULL) {
        goto py_error;
    }
    gsound_context = (GSoundContext *)PyCapsule_GetPointer(ctx_capsule, NULL);
    if (PyErr_Occurred()) {
        goto py_error;
    }

    result = PyLong_FromLong(ca_context_change_device(gsound_context->ca, device));
    goto done;

py_error:
    PyErr_Print();
    Py_INCREF(Py_False);
    result = Py_False;

done:
    /* PyObject_GetAttrString() returns a new reference; release it on every path. */
    Py_XDECREF(ctx_capsule);
    return result;
}
static PyMethodDef _Xfce4PanelPluginMethods[] = {
{"menu_insert_item", _xfce4panelplugin_menu_insert_item, METH_VARARGS | METH_KEYWORDS,
"Insert a custom menu item to the plugin's right click menu. "
"This item is packed below the \"Move\" menu item."
},
{"add_action_widget", _xfce4panelplugin_add_action_widget, METH_VARARGS | METH_KEYWORDS,
"Attach the plugin menu to this widget. Plugin writers should call this "
"for every widget that can receive mouse events. If you forget to call this "
"the plugin will not have a right-click menu and the user won't be able to "
"remove it."
},
{"ca_change_device", _xfce4panelplugin_ca_change_device, METH_VARARGS | METH_KEYWORDS,
"Specify the backend device to use. This function may be called not be called after "
"ca_context_open() suceeded. This function might suceed even when "
"the specified driver backend is not available. Use "
"ca_context_open() to find out whether the backend is available"
},
{NULL, NULL, 0, NULL} // sentinel
};
static struct PyModuleDef _xfce4panelplugin_module = {
PyModuleDef_HEAD_INIT,
"_xfce4panelplugin",
NULL,
-1,
_Xfce4PanelPluginMethods
};
PyMODINIT_FUNC
PyInit__xfce4panelplugin(void)
{
return PyModule_Create(&_xfce4panelplugin_module);
}
/*
 * Set the string field `attr` of the local PyConfig `config` from a bytes
 * string, jumping to the enclosing function's `fail` label on error.
 *
 * Expects `status` (PyStatus), `config` (PyConfig) and a `fail:` label in the
 * calling scope. Wrapped in do { } while (0) so the expansion is a single
 * statement and stays safe inside unbraced if/else bodies.
 */
#define CONFIG_SET(attr, value) \
    do { \
        status = PyConfig_SetBytesString(&config, &config.attr, (value)); \
        if (PyStatus_Exception(status)) { \
            goto fail; \
        } \
    } while (0)
/*
 * dlopen() a library with RTLD_GLOBAL, warning on stderr when it cannot be
 * loaded. A failure is not fatal here.
 */
static void
airhorn_dlopen_global(const char *library) {
    if (dlopen(library, RTLD_LAZY | RTLD_GLOBAL) == NULL) {
        const char *reason = dlerror();
        fprintf(stderr, "Warning: dlopen(%s) failed: %s\n",
                library, reason != NULL ? reason : "unknown error");
    }
}

/*
 * Panel plugin entry point.
 *
 * Boots an embedded Python interpreter configured for the airhorn virtualenv,
 * registers the built-in `_xfce4panelplugin` module, wraps the panel plugin
 * widget as a PyGObject, passes it to xfce4_airhorn.plugin_load() and then
 * runs Gtk.main() until it returns.
 *
 * Python errors during start-up are printed and the interpreter is finalised.
 * A failure to configure or initialise the interpreter terminates the process
 * via Py_ExitStatusException().
 */
static void
airhorn_construct(XfcePanelPlugin *xpp) {
    PyStatus status;
    PyConfig config;
    PyObject *sys_path = NULL;        /* borrowed reference */
    PyObject *plugin_dir = NULL;
    PyObject *gi = NULL;
    PyObject *gi_internal = NULL;
    PyObject *gi_so_path_obj = NULL;
    PyObject *gobject_api = NULL;
    PyObject *gtk = NULL;
    PyObject *window = NULL;
    PyObject *xfce4_airhorn = NULL;
    PyObject *call_result = NULL;
    const char *gi_so_path = NULL;

    airhorn_dlopen_global("libpython3.8.so");
    airhorn_dlopen_global("libcanberra.so");
    airhorn_dlopen_global("libgsound.so");

    setbuf(stdout, NULL);

    if (PyImport_AppendInittab("_xfce4panelplugin", PyInit__xfce4panelplugin) == -1) {
        fprintf(stderr, "Error: could not extend in-built modules table\n");
        exit(1);
    }

    PyConfig_InitPythonConfig(&config);
    config.site_import = 1;
    CONFIG_SET(home, "/usr");
    CONFIG_SET(base_prefix, "/usr");
    CONFIG_SET(prefix, "/home/they4kman/.virtualenvs/airhorn");
    CONFIG_SET(exec_prefix, "/home/they4kman/.virtualenvs/airhorn");
    CONFIG_SET(base_exec_prefix, "/usr");
    CONFIG_SET(executable, "/home/they4kman/.virtualenvs/airhorn/bin/python");

    status = Py_InitializeFromConfig(&config);
    if (PyStatus_Exception(status)) {
        goto fail;
    }
    /* The interpreter keeps its own copy of the configuration. */
    PyConfig_Clear(&config);

    /* Make the installed plugin directory importable. */
    sys_path = PySys_GetObject("path");
    if (sys_path == NULL) {
        PyErr_SetString(PyExc_RuntimeError, "sys.path is not available");
        goto py_error;
    }
    plugin_dir = PyUnicode_FromString("/usr/lib/xfce4/panel/plugins/airhorn");
    if (plugin_dir == NULL || PyList_Append(sys_path, plugin_dir) == -1) {
        goto py_error;
    }

    gi = PyImport_ImportModule("gi");
    if (gi == NULL) {
        goto py_error;
    }
    call_result = PyObject_CallMethod(gi, "require_version", "ss", "Gtk", "3.0");
    if (call_result == NULL) {
        goto py_error;
    }
    Py_CLEAR(call_result);

    /* Re-open gi's extension module with RTLD_GLOBAL. */
    gi_internal = PyImport_ImportModule("gi._gi");
    if (gi_internal == NULL) {
        goto py_error;
    }
    gi_so_path_obj = PyObject_GetAttrString(gi_internal, "__file__");
    if (gi_so_path_obj == NULL) {
        goto py_error;
    }
    gi_so_path = PyUnicode_AsUTF8(gi_so_path_obj);
    if (gi_so_path == NULL) {
        goto py_error;
    }
    airhorn_dlopen_global(gi_so_path);

    /* Fetch the PyGObject C API table so pygobject_new() can be used below. */
    gobject_api = PyObject_GetAttrString(gi_internal, "_PyGObject_API");
    if (gobject_api == NULL) {
        goto py_error;
    }
    _PyGObject_API = (struct _PyGObject_Functions *)
        PyCapsule_GetPointer(gobject_api, "gobject._PyGObject_API");
    if (_PyGObject_API == NULL) {
        goto py_error;
    }

    // Load Gtk first, to initialize widget classes
    gtk = PyImport_ImportModule("gi.repository.Gtk");
    if (gtk == NULL) {
        goto py_error;
    }

    window = pygobject_new((GObject *)xpp);
    if (window == NULL || PyErr_Occurred()) {
        goto py_error;
    }

    xfce4_airhorn = PyImport_ImportModule("xfce4_airhorn");
    if (xfce4_airhorn == NULL) {
        goto py_error;
    }
    call_result = PyObject_CallMethod(xfce4_airhorn, "plugin_load", "O", window);
    if (call_result == NULL) {
        goto py_error;
    }
    Py_CLEAR(call_result);

    printf("initialized plugin!\n\n");

    /* PyRun_SimpleString() prints and clears any exception by itself. */
    PyRun_SimpleString("from gi.repository import Gtk\nGtk.main()");
    if (PyErr_Occurred()) {
        goto py_error;
    }

    printf("exiting ...\n\n");
    goto done;

py_error:
    if (PyErr_Occurred()) {
        PyErr_Print();
    }

done:
    Py_XDECREF(call_result);
    Py_XDECREF(xfce4_airhorn);
    Py_XDECREF(window);
    Py_XDECREF(gtk);
    Py_XDECREF(gobject_api);
    Py_XDECREF(gi_so_path_obj);
    Py_XDECREF(gi_internal);
    Py_XDECREF(gi);
    Py_XDECREF(plugin_dir);
    Py_Finalize();
    return;

fail:
    /* Only reached before the interpreter was initialised. */
    PyConfig_Clear(&config);
    Py_ExitStatusException(status);
}
pygobject
Cython
Pillow
pulsectl
import os
import shlex
import subprocess
import sysconfig
from distutils import sysconfig
from distutils.core import setup
from typing import Dict, List
from Cython.Build import cythonize
from Cython.Distutils import Extension, build_ext
def parse_cflags(args):
    """Sort compiler/linker flags into distutils ``Extension`` keyword lists.

    :param args: iterable of individual flag strings, e.g. ``['-I/usr/include', '-lm']``
    :return: dict with ``include_dirs`` (from ``-I``), ``library_dirs`` (from ``-L``)
        and ``libraries`` (from ``-l``). Anything else is ignored, including empty
        strings, non-flag arguments and flags that carry no value.
    """
    kwargs = {
        'include_dirs': [],
        'libraries': [],
        'library_dirs': [],
    }
    targets = {
        'I': 'include_dirs',
        'L': 'library_dirs',
        'l': 'libraries',
    }

    for arg in args:
        # startswith() is safe on empty strings, where arg[0] would raise IndexError.
        if not arg.startswith('-') or len(arg) < 3:
            continue

        opt, value = arg[1], arg[2:]
        target = targets.get(opt)
        if target is not None:
            kwargs[target].append(value)

    return kwargs
def get_pkgconfig_cython_kwargs(*packages):
    """Ask ``pkg-config`` for the cflags/libs of ``packages`` and parse them.

    :param packages: pkg-config package names
    :return: the ``parse_cflags`` result for pkg-config's output
    """
    command = ['pkg-config', '--cflags', '--libs', *packages]
    output = subprocess.check_output(command).decode('utf-8').strip()
    return parse_cflags(shlex.split(output))
def get_python_config_cython_kwargs():
    """Build ``Extension`` keyword lists from the interpreter's build configuration.

    Reads the ``LIBS`` and ``BLDLIBRARY`` config vars as linker flags and adds
    ``INCLUDEPY`` as an include directory. Config vars that are unset or empty
    are skipped.

    :return: the ``parse_cflags`` result for the collected flags
    """
    flags = []

    libs = sysconfig.get_config_var('LIBS')
    if libs:
        flags.extend(shlex.split(libs))

    # INCLUDEPY is a bare directory path; parse_cflags only recognises it with -I.
    include_dir = sysconfig.get_config_var('INCLUDEPY')
    if include_dir:
        flags.append('-I' + include_dir)

    bldlibrary = sysconfig.get_config_var('BLDLIBRARY')
    if bldlibrary:
        flags.extend(shlex.split(bldlibrary))

    return parse_cflags(flags)
def merge_cython_kwargs(*kwargses: Dict[str, List[str]], **kwargs) -> Dict[str, List[str]]:
    """Merge several ``Extension`` keyword dicts into one.

    Keyword arguments seed the result. Each positional dict is then merged in
    order: values for keys already present are appended, new keys are added.
    Lists are copied, so the caller's lists and dicts are never mutated.

    :param kwargses: dicts mapping keyword names to lists of values
    :param kwargs: initial values, typically lists such as ``libraries=[...]``
    :return: a new dict that always contains ``include_dirs``, ``libraries``
        and ``library_dirs``
    """
    def _own(value):
        # Copy lists so later extend() calls never touch the caller's objects.
        return list(value) if isinstance(value, list) else value

    res = {
        'include_dirs': [],
        'libraries': [],
        'library_dirs': [],
    }
    for key, value in kwargs.items():
        res[key] = _own(value)

    for extra in kwargses:
        for key, value in extra.items():
            if key not in res:
                res[key] = _own(value)
            else:
                res[key].extend(value)

    return res
class NoSuffixBuilder(build_ext):
    """``build_ext`` variant that strips ``EXT_SUFFIX`` from the output filename."""

    def get_ext_filename(self, ext_name):
        """Return the default extension filename with ``EXT_SUFFIX`` removed.

        The plain file extension of the default name is appended again after
        the suffix has been removed.
        """
        default_name = super().get_ext_filename(ext_name)
        abi_suffix = sysconfig.get_config_var('EXT_SUFFIX')
        extension = os.path.splitext(default_name)[1]
        return default_name.replace(abi_suffix, '') + extension
# Combine the flags reported by pkg-config for libxfce4panel with the
# interpreter's own build flags, seeded with the two sound libraries.
module_kwargs = merge_cython_kwargs(
get_pkgconfig_cython_kwargs('libxfce4panel-2.0'),
get_python_config_cython_kwargs(),
libraries=['canberra', 'gsound'],
)
# Build one extension, libxfce4airhorn, from the Cython source plus plugin.c.
# NoSuffixBuilder controls the filename of the resulting shared object.
setup(
package_dir={'': 'src'},
ext_modules=cythonize(
module_list=[
Extension(
name='libxfce4airhorn',
sources=['src/libxfce4airhorn.pyx', 'src/plugin.c'],
extra_link_args=shlex.split(sysconfig.get_config_var('LINKFORSHARED')),
**module_kwargs,
)
],
gdb_debug=True,
),
cmdclass={'build_ext': NoSuffixBuilder},
)
#!/usr/bin/env python3
import inspect
import logging
import math
import os
from pathlib import Path
from typing import Dict
import gi
import PIL.ImageEnhance
import pulsectl
from PIL import Image
gi.require_version('Gtk', '3.0')
gi.require_version('GSound', '1.0')
gi.require_version('libxfce4panel', '2.0')
from gi.repository import Gtk, Gdk, Gio, GLib, GdkPixbuf, GSound, libxfce4panel as Xfce4Panel
import _xfce4panelplugin
# Log level comes from AIRHORN_LOG_LEVEL (default INFO).
logging.basicConfig(level=os.getenv('AIRHORN_LOG_LEVEL', 'INFO').upper())
logger = logging.getLogger(__name__)

# Any non-empty AIRHORN_REMOTE_DEBUG value enables remote debugging.
DEBUG = bool(os.getenv('AIRHORN_REMOTE_DEBUG'))

if DEBUG:
    try:
        import pydevd_pycharm
    except ImportError:
        # The debugger package is optional; carry on without it.
        pass
    else:
        port = int(os.getenv('AIRHORN_REMOTE_DEBUG_PORT', 57024))
        try:
            pydevd_pycharm.settrace(
                'localhost',
                port=port,
                stdoutToServer=True,
                stderrToServer=True,
                suspend=False,
            )
        except ConnectionRefusedError:
            logger.error('Unable to connect to remote debugging server, port %s', port)
# Locations of the UI definition and stylesheet, resolved relative to this file.
SCRIPT_PATH = Path(__file__)
SCRIPT_DIR = SCRIPT_PATH.parent.absolute()
# Resources live in a `share` directory next to the script.
RESOURCES_DIR = SCRIPT_DIR / 'share'
# Glade file loaded by Xfce4Airhorn.build_ui().
GLADE_PATH = SCRIPT_DIR / 'xfce4-airhorn.glade'
# Stylesheet loaded by Xfce4Airhorn.load_styles().
CSS_PATH = RESOURCES_DIR / 'xfce4-airhorn.css'
class Xfce4Airhorn:
    """Controller for the airhorn panel plugin.

    Builds the button UI from the Glade file, plays the ``airhorn`` sound event
    through GSound, lets the user pick a PulseAudio sink from the plugin's
    right-click menu, and adjusts the playback volume on scroll.
    """

    # Event ID to use with libcanberra, so airhorn sounds can be canceled.
    CA_AIRHORN_ID = 1

    def __init__(self, plugin: Xfce4Panel.PanelPlugin):
        """Set up sound, UI, menu, styles and file monitors for ``plugin``."""
        self.plugin = plugin

        # Volume is a percentage (0-100); each scroll step changes it by volume_delta.
        self.volume = 100
        self.volume_delta = 12

        self._sound_canceler = None
        self.ca_ctx = None
        self.device = None
        self.pulse = pulsectl.Pulse('airhorn')
        self.init_sound()

        self.builder = None
        self.container = None
        self.button = None
        self.volume_overlay = None
        self.build_ui()

        self.device_items: Dict[int, Gtk.ImageMenuItem] = {}
        self.init_menu()

        self.plugin.connect('size-changed', self.on_size_changed)

        # For some reason, I can only get the parent window to accept scroll events
        self.plugin.get_window().set_events(Gdk.EventMask.SCROLL_MASK)
        self.plugin.get_parent().connect('scroll-event', self.on_scroll)

        self.css_provider = None
        self.init_styles()

        self.plugin.show_all()

        self.monitors = []
        self.monitor_ui_source_changes()

        logger.info('Initialized airhorn GUI')

    def init_sound(self):
        """Create a fresh GSound context, select the chosen device, cache the sound."""
        self._sound_canceler = Gio.Cancellable.new()
        self.ca_ctx = GSound.Context.new(self._sound_canceler)

        if self.device is not None:
            # The binding returns libcanberra's integer result (0 on success),
            # or False when it could not unwrap the context.
            ret = _xfce4panelplugin.ca_change_device(self.ca_ctx, str(self.device))
            if ret is False or ret != 0:
                logger.warning('Unable to switch to sound device %s (result: %r)', self.device, ret)

        self.ca_ctx.cache({
            GSound.ATTR_EVENT_ID: 'airhorn',
        })

    def build_ui(self):
        """Load the widgets from the Glade file and attach them to the plugin."""
        self.builder = Gtk.Builder()
        self.builder.add_objects_from_file(str(GLADE_PATH), ('airhorn-icon', 'container'))

        # XXX: for some reason, connect_signals(self) is not working
        self.builder.connect_signals({
            name: method
            for name, method in inspect.getmembers(self, inspect.ismethod)
            if name.startswith('on_')
        })

        self.container: Gtk.Widget = self.builder.get_object('container')
        self.plugin.add(self.container)

        self.button = self.builder.get_object('airhorn-button')
        self.volume_overlay = self.builder.get_object('volume-overlay')

        self.on_size_changed(self.plugin, self.plugin.props.size)

    def rebuild_ui(self):
        """Destroy the plugin's current children and build the UI again."""
        logger.debug('Rebuilding UI ...')

        for widget in self.plugin.get_children():
            widget.destroy()

        self.build_ui()
        self.plugin.show_all()

    def init_menu(self):
        """Add one right-click menu item per PulseAudio sink."""
        for sink in self.pulse.sink_list():
            item = Gtk.ImageMenuItem.new_with_label(sink.description)
            item.set_visible(True)
            item.set_sensitive(True)
            item.connect('activate', self.on_change_device, sink.index)
            _xfce4panelplugin.menu_insert_item(self.plugin, item)
            self.device_items[sink.index] = item

    def on_change_device(self, menu_item: Gtk.MenuItem, device: int):
        """Mark ``device`` as selected in the menu and re-create the sound context."""
        for item_device, item in self.device_items.items():
            if item_device == device:
                # Checkmark
                image = Gtk.Image.new_from_icon_name('emblem-default-symbolic', -1)
            else:
                image = None
            item.set_image(image)

        self.device = device
        self.init_sound()

    def init_styles(self):
        """Register a CSS provider for the default screen and load the stylesheet."""
        self.css_provider = Gtk.CssProvider()

        style_context = Gtk.StyleContext()
        screen = Gdk.Screen.get_default()
        style_context.add_provider_for_screen(screen, self.css_provider, Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION)

        self.load_styles()

    def load_styles(self):
        """Read the stylesheet from CSS_PATH into the CSS provider."""
        with CSS_PATH.open('rb') as fp:
            self.css_provider.load_from_data(fp.read())

    def reload_styles(self):
        """Re-read the stylesheet after it changed on disk."""
        logger.debug('Reloading styles ...')
        self.load_styles()

    def monitor_ui_source_changes(self):
        """Watch the Glade and CSS files and refresh the UI when they change."""
        watches = [
            (GLADE_PATH, self.rebuild_ui),
            (CSS_PATH, self.reload_styles),
        ]

        for path, callback in watches:
            gio_file = Gio.File.new_for_path(str(path))
            monitor = gio_file.monitor_file(Gio.FileMonitorFlags.NONE, None)
            monitor.connect('changed', self.file_changed_handler(callback))
            # Keep a reference so the monitor is not garbage-collected.
            self.monitors.append(monitor)

    def file_changed_handler(self, callback):
        """Return a 'changed' handler that runs ``callback`` on CHANGES_DONE_HINT."""
        def on_file_changed(monitor, gfile, o, event):
            if event == Gio.FileMonitorEvent.CHANGES_DONE_HINT:
                callback()

        return on_file_changed

    @property
    def volume_db(self) -> float:
        """Volume in decibels

        libcanberra accepts its volume inputs in decibels, not nice amplitudes
        using floats or 0-100. So, we must convert.

        Calculations sourced from: https://blog.demofox.org/2015/04/14/decibels-db-and-amplitude/
        """
        if self.volume <= 0:
            return -96
        else:
            return 20 * math.log10(self.volume / 100)

    def play_airhorn_sound(self):
        """Play the airhorn event at the current volume."""
        attrs = {
            GSound.ATTR_EVENT_ID: 'airhorn',
            GSound.ATTR_CANBERRA_VOLUME: f'{self.volume_db:.2f}',
        }
        self.ca_ctx.play_simple(attrs, self._sound_canceler)
        logger.info('Playing airhorn sound')

    def stop_airhorn_sounds(self):
        """Cancel sounds started with the current cancellable and replace it."""
        self._sound_canceler.cancel()
        self._sound_canceler = Gio.Cancellable.new()
        logger.info('Stopped airhorn sounds')

    def on_airhorn_button_pressed(self, airhorn_button: Gtk.Button, event: Gdk.EventButton, *args):
        """Primary click plays the sound; middle click stops it."""
        if event.type == Gdk.EventType.BUTTON_PRESS:
            if event.button == Gdk.BUTTON_PRIMARY:
                if DEBUG and event.state & Gdk.ModifierType.CONTROL_MASK:
                    print('ctrl click')
                else:
                    self.play_airhorn_sound()

            elif event.button == Gdk.BUTTON_MIDDLE:
                self.stop_airhorn_sounds()

    def on_volume_overlay_realize(self, volume_overlay: Gtk.DrawingArea, *args):
        """Let mouse events pass through the volume overlay to the button."""
        # We have to manually set pass-through on the volume overlay's Gdk.Window,
        # or it won't allow mouse events to pass through to the button.
        #
        # This is in addition to setting pass-through on the actual Gtk.DrawingArea,
        # which we do in Glade.
        # (Technically, it calls gtk_overlay.set_overlay_pass_through(volume_overlay, true))
        #
        window = volume_overlay.get_window()
        window.set_pass_through(True)

    def on_scroll(self, widget, event: Gdk.EventScroll):
        """Raise or lower the volume on scroll, clamped to 0-100."""
        prev_volume = self.volume

        # Discrete wheel events carry a direction instead of a delta; treat one
        # notch as one step. Everything else uses the smooth-scroll delta.
        if event.direction == Gdk.ScrollDirection.UP:
            steps = 1
        elif event.direction == Gdk.ScrollDirection.DOWN:
            steps = -1
        else:
            steps = -event.delta_y

        volume_change = self.volume_delta * steps
        self.volume += volume_change
        self.volume = max(min(self.volume, 100), 0)

        if prev_volume != self.volume:
            self.update_button_image()

    def on_size_changed(self, plugin, size: int):
        """Resize the plugin along the panel's orientation and redraw the icon."""
        if size == 0:
            # hell naw
            return

        orientation = plugin.props.orientation
        if orientation == Gtk.Orientation.HORIZONTAL:
            plugin.set_size_request(-1, size)
        else:
            plugin.set_size_request(size, -1)

        self.update_button_image()

    def update_button_image(self):
        """Redraw the button icon with its top portion darkened.

        The darkened fraction of the icon height is ``1 - volume / 100``.
        """
        size = self.plugin.props.size
        icon_theme = Gtk.IconTheme.get_default()
        # Never request a non-positive icon size on very small panels.
        icon = icon_theme.load_icon('airhorn', max(size - 10, 1), Gtk.IconLookupFlags(0))

        volume_frac = self.volume / 100
        gray_frac = 1.0 - volume_frac
        gray_height = int(icon.get_height() * gray_frac)

        im = pixbuf2image(icon)
        if gray_height > 0:
            # Nothing to darken at full volume, so skip the zero-height crop.
            gray_area = PIL.ImageEnhance.Brightness(im.crop((0, 0, icon.get_width(), gray_height))).enhance(0.5)
            im.paste(gray_area)
        grayed_icon = convert_image_to_pixbuf(im)

        button_img = self.button.get_image()
        button_img.set_from_pixbuf(grayed_icon)

    def on_destroy(self, widget, data=None):
        """Quit the GTK main loop."""
        Gtk.main_quit()
def plugin_load(plugin: Xfce4Panel.PanelPlugin):
    """Create the airhorn controller for ``plugin`` and map the plugin widget."""
    airhorn = Xfce4Airhorn(plugin)

    # Without this call, none of the widgets are displayed, and the Xfce4PanelPlugin
    # widget and its children appear grayed out in the GTK inspector.
    plugin.map()
def pixbuf2image(pix):
    """Convert gdkpixbuf to PIL image"""
    pixels = pix.get_pixels()
    props = pix.props
    size = (props.width, props.height)
    # Pixbufs with an alpha channel map to RGBA, the rest to RGB.
    mode = 'RGBA' if props.has_alpha else 'RGB'
    return Image.frombytes(mode, size, pixels, 'raw', mode, props.rowstride)
def convert_image_to_pixbuf(im):
    """Convert Pillow image to GdkPixbuf

    Images in a mode other than RGB or RGBA are converted to RGBA first, since
    the pixbuf is built as 8-bit RGB with an optional alpha channel.
    """
    if im.mode not in ('RGB', 'RGBA'):
        # The rowstride below is only right for 3 or 4 bytes per pixel.
        im = im.convert('RGBA')

    data = im.tobytes()
    width, height = im.size
    data = GLib.Bytes.new(data)
    has_alpha = im.mode == 'RGBA'
    rowstride = width * (4 if has_alpha else 3)
    pix = GdkPixbuf.Pixbuf.new_from_bytes(
        data,
        GdkPixbuf.Colorspace.RGB,
        has_alpha,
        8,
        width, height,
        rowstride,
    )
    return pix.copy()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment