Skip to content

Instantly share code, notes, and snippets.

@jeremyd2019
Last active July 17, 2020 20:33
Show Gist options
  • Save jeremyd2019/73bcdb7ac7caaead392b43639921406d to your computer and use it in GitHub Desktop.
Save jeremyd2019/73bcdb7ac7caaead392b43639921406d to your computer and use it in GitHub Desktop.
Python script to fix up xrandr after Xfce4 messes it up trying to hotplug monitors
#!/usr/bin/python3
#
import sys, subprocess, time
import Xlib
from Xlib import X, display
from Xlib.protocol import rq
from Xlib.ext import randr
from resettable_timer import reset_or_recreate_timer
if Xlib.__version__[:2] < (0, 16):
    # Replace the reply layout python-xlib uses to decode RRGetOutputInfo.
    # NOTE(review): presumably releases before 0.16 ship a layout that does
    # not decode this reply correctly -- confirm against the python-xlib
    # changelog before removing this override.

    # Fixed-size part of the reply, in wire order.
    _reply_fields = [
        rq.ReplyCode(),
        rq.Card8('status'),
        rq.Card16('sequence_number'),
        rq.ReplyLength(),
        rq.Card32('timestamp'),
        rq.Card32('crtc'),
        rq.Card32('mm_width'),
        rq.Card32('mm_height'),
        rq.Card8('connection'),
        rq.Card8('subpixel_order'),
        # Element counts for the variable-length fields that follow.
        rq.LengthOf('crtcs', 2),
        rq.LengthOf('modes', 2),
        rq.Card16('num_preferred'),
        rq.LengthOf('clones', 2),
        rq.LengthOf('name', 2),
    ]
    # Variable-length tail: three lists of 32-bit values, then the name.
    _reply_fields += [
        rq.List('crtcs', rq.Card32Obj),
        rq.List('modes', rq.Card32Obj),
        rq.List('clones', rq.Card32Obj),
        rq.String8('name'),
    ]
    randr.GetOutputInfo._reply = rq.Struct(*_reply_fields)
    # Do not leave the scratch list behind in the module namespace.
    del _reply_fields
class Listener:
    """Watch RandR output-change events and run a command on hotplug.

    When the watched output reports ``randr.Connected`` a (resettable) timer
    is armed; once it expires the configured command is executed with
    ``subprocess.call``. Further connect events that arrive while the timer
    is still pending restart the countdown instead of queueing more runs.

    Parameters:
        display: an open ``Xlib.display.Display``.
        output_name: name of the output to react to (default ``'HDMI-2'``).
        command: argv sequence (or a single program path string) to run
            after the output is connected.
        delay: seconds to wait after the last connect event before running
            ``command``.
    """

    def __init__(self, display, output_name='HDMI-2',
                 command=('/etc/lightdm/screenresolution.sh',), delay=1):
        self.d = display
        self.output_name = output_name
        # A bare string is treated as a single program path, not as a
        # sequence of one-character arguments.
        self.command = [command] if isinstance(command, str) else list(command)
        self.delay = delay

        # Check for the extension *before* touching any RandR-specific
        # attribute, so a server without RandR gets the diagnostic below
        # rather than an AttributeError.
        if not self.d.has_extension(randr.extname):
            sys.stderr.write('%s: server does not have the %s extension\n'
                             % (sys.argv[0], randr.extname))
            print(self.d.query_extension(randr.extname))
            sys.stderr.write("\n".join(self.d.list_extensions()))
            if self.d.query_extension(randr.extname) is None:
                sys.exit(1)

        # (event type, sub code) pair identifying an output-change event.
        # NOTE(review): presumably some python-xlib releases expose
        # OutputChangeNotify as a plain event code instead of a tuple, hence
        # the fallback built from CrtcChangeNotify -- confirm.
        self.OutputChangeNotify = self.d.extension_event.OutputChangeNotify
        if not isinstance(self.OutputChangeNotify, tuple):
            self.OutputChangeNotify = (self.d.extension_event.CrtcChangeNotify,
                                       randr.RRNotify_OutputChange)

        # print version
        r = self.d.xrandr_query_version()
        print('%s version %d.%d' % (randr.extname, r.major_version, r.minor_version))

        # Grab the current screen
        self.screen = self.d.screen()
        self.root = self.screen.root

        # Enable Structure events (notably DestroyNotify)
        self.root.change_attributes(event_mask=X.StructureNotifyMask)

        # Subscribe to RandR output-change notifications only.
        self.root.xrandr_select_input(randr.RROutputChangeNotifyMask)

        # Pending delayed run of ``command``; None until the first trigger.
        self.timer = None

    @staticmethod
    def _connection_name(connection):
        """Return a printable name for a RandR connection state."""
        if connection == randr.Connected:
            return 'connected'
        if connection == randr.Disconnected:
            return 'disconnected'
        return 'unknown'

    def loop(self):
        """Process X events forever.

        Exits the process with status 0 when a DestroyNotify event arrives.
        """
        while True:
            e = self.d.next_event()

            # Window has been destroyed, quit
            if e.type == X.DestroyNotify:
                sys.exit(0)

            # Only RRNotify events carrying the output-change sub code matter.
            if e.type != self.OutputChangeNotify[0]:
                continue
            if e.sub_code != self.OutputChangeNotify[1]:
                continue

            print('%0.4f: Output change' % time.time())
            # Re-parse the raw event bytes as an OutputChangeNotify so that
            # the ``output`` and ``connection`` fields can be read below.
            e = randr.OutputChangeNotify(display=self.d.display, binarydata=e._binary)
            res = self.root.xrandr_get_screen_resources()._data
            output_info = self.d.xrandr_get_output_info(e.output, res['config_timestamp'])._data
            if not output_info:
                continue

            print("%r was %s" % (output_info['name'], self._connection_name(e.connection)))
            if output_info['name'] == self.output_name and e.connection == randr.Connected:
                # Debounce: restart the pending timer if there is one,
                # otherwise start a fresh one.
                self.timer = reset_or_recreate_timer(
                    self.timer, self.delay, subprocess.call, (self.command,))
if __name__ == '__main__':
    # Open an X display connection (no arguments given), build the listener
    # on it and hand control to its event loop.
    listener = Listener(display.Display())
    listener.loop()
#!/usr/bin/env python
import threading as _threading
class ResettableTimer(_threading.Thread):
    """Call a function once after a delay that can be restarted or cancelled.

        t = ResettableTimer(30.0, f, args=[], kwargs={})
        t.start()
        t.reset(10.0)  # restart the countdown with a new interval
        t.cancel()     # stop the timer's action if it's still waiting

    The function runs on the timer thread while the internal lock is held,
    so ``reset()`` / ``cancel()`` issued during the call block until it
    returns and then report ``False``.
    """

    def __init__(self, interval, function, args=None, kwargs=None):
        """Prepare (but do not start) the timer.

        interval -- seconds to wait before calling ``function``.
        function -- callable invoked as ``function(*args, **kwargs)``.
        args     -- positional arguments (default: none).
        kwargs   -- keyword arguments (default: none).
        """
        super(ResettableTimer, self).__init__()
        self.interval = interval
        self.function = function
        # None defaults avoid sharing one mutable list/dict between timers.
        self.args = args if args is not None else []
        self.kwargs = kwargs if kwargs is not None else {}
        # don't need recursive lock
        self.updatecond = _threading.Condition(_threading.Lock())
        # New interval requested by reset(); None when no reset is pending.
        self.updatetimeout = None
        # True once the timer was cancelled, has fired, or its thread ended.
        self.finished = False

    def cancel(self):
        """Stop the timer if it hasn't finished yet.

        Returns True if the timer was still pending, False otherwise.
        """
        with self.updatecond:
            if self.finished:
                return False
            self.finished = True
            self.updatecond.notify()
            return True

    def reset(self, newtimeout):
        """Restart the countdown with ``newtimeout`` seconds.

        Returns True if the timer was still pending, False if it already
        finished (in which case nothing is changed).
        """
        with self.updatecond:
            if self.finished:
                return False
            self.updatetimeout = newtimeout
            self.updatecond.notify()
            return True

    def is_finished(self):
        """You'll have a race if you use this"""
        with self.updatecond:
            return self.finished

    def run(self):
        """Thread body: wait out the interval, honouring reset and cancel."""
        try:
            with self.updatecond:
                try:
                    # State is examined *before* every wait: a cancel() or
                    # reset() that happened before this thread started
                    # waiting had its notify() lost, and must not cost a
                    # full extra interval.
                    while not self.finished:
                        if self.updatetimeout is not None:
                            # Consume a pending reset and start over.
                            self.interval = self.updatetimeout
                            self.updatetimeout = None
                        self.updatecond.wait(self.interval)
                        if self.finished:
                            break
                        if self.updatetimeout is None:
                            # Woke up with nothing pending: time is up.
                            self.function(*self.args, **self.kwargs)
                            break
                finally:
                    self.finished = True
        finally:
            # Avoid a refcycle if the thread is running a function or
            # an argument that has a member that points to the thread.
            del self.function, self.args, self.kwargs
            # Just in case acquiring the lock failed
            self.finished = True
def reset_or_recreate_timer(timer, timeout, function, args=None, kwargs=None):
    """Restart ``timer`` if it is still pending, else start a new one.

    timer    -- an existing timer object with a ``reset(timeout)`` method,
                or None.
    timeout  -- seconds until the function should be called.
    function -- callable for a newly created timer.
    args     -- positional arguments for ``function`` (default: none).
    kwargs   -- keyword arguments for ``function`` (default: none).

    Returns the timer that is now pending: the same object when the reset
    succeeded, otherwise a freshly started ``ResettableTimer``. Note that
    ``function``/``args``/``kwargs`` are only used when a new timer is
    created; a successful reset keeps the callable the timer already had.
    """
    if timer is not None and timer.reset(timeout):
        return timer
    # Normalise here rather than passing None through, so no mutable default
    # object is shared between calls.
    timer = ResettableTimer(timeout, function,
                            [] if args is None else args,
                            {} if kwargs is None else kwargs)
    timer.start()
    return timer
if __name__ == "__main__":
    # Manual demo of ResettableTimer / reset_or_recreate_timer.

    def _wait_and_report(event, seconds):
        """Wait up to ``seconds`` for ``event`` and print what happened."""
        event.wait(seconds)
        if not event.is_set():
            print("Timed out")
            return
        print("timer fired")
        event.clear()

    foo = _threading.Event()

    # 1. Start a 5 second timer but only wait half of that.
    print("timer in 5 seconds")
    timer = reset_or_recreate_timer(None, 5, foo.set)
    _wait_and_report(foo, 2.5)

    # 2. Restart the still-pending timer and wait long enough for it.
    print("reset another 5 seconds")
    timer = reset_or_recreate_timer(timer, 5, foo.set)
    _wait_and_report(foo, 10)

    # 3. The previous timer has fired, so the reset fails and a new timer
    #    is created.
    print("try resetting already-fired")
    timer = reset_or_recreate_timer(timer, 5, foo.set)
    _wait_and_report(foo, 10)

    # 4. Same again, but cancel the new timer before it expires.
    print("try resetting already-fired")
    timer = reset_or_recreate_timer(timer, 5, foo.set)
    foo.wait(3)
    print("cancel")
    timer.cancel()
    _wait_and_report(foo, 10)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment