Last active
July 17, 2020 20:33
-
-
Save jeremyd2019/73bcdb7ac7caaead392b43639921406d to your computer and use it in GitHub Desktop.
Python script to fix up xrandr after Xfce4 messes it up trying to hotplug monitors
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# | |
import sys, subprocess, time | |
import Xlib | |
from Xlib import X, display | |
from Xlib.protocol import rq | |
from Xlib.ext import randr | |
from resettable_timer import reset_or_recreate_timer | |
# For python-xlib releases below 0.16, install our own description of the
# RRGetOutputInfo reply (presumably the bundled one is incomplete in those
# releases -- TODO confirm against the python-xlib changelog).
if Xlib.__version__[:2] < (0, 16):
    randr.GetOutputInfo._reply = rq.Struct(*(
        # Reply header.
        (
            rq.ReplyCode(),
            rq.Card8('status'),
            rq.Card16('sequence_number'),
            rq.ReplyLength(),
        )
        # Fixed-size 32-bit fields.
        + tuple(
            rq.Card32(field_name)
            for field_name in ('timestamp', 'crtc', 'mm_width', 'mm_height')
        )
        # Connection state plus the lengths of the variable-size fields.
        + (
            rq.Card8('connection'),
            rq.Card8('subpixel_order'),
            rq.LengthOf('crtcs', 2),
            rq.LengthOf('modes', 2),
            rq.Card16('num_preferred'),
            rq.LengthOf('clones', 2),
            rq.LengthOf('name', 2),
        )
        # Variable-size payload, in the same order as the lengths above.
        + tuple(
            rq.List(list_name, rq.Card32Obj)
            for list_name in ('crtcs', 'modes', 'clones')
        )
        + (rq.String8('name'),)
    ))
class Listener:
    """Listen for RandR output changes and re-run the screen setup script.

    When the output named 'HDMI-2' is reported as connected, the script
    /etc/lightdm/screenresolution.sh is scheduled to run one second later.
    Scheduling goes through reset_or_recreate_timer, so an event that arrives
    while the timer is still pending restarts the countdown instead of
    scheduling an additional run.
    """

    def __init__(self, display):
        """Verify that RandR is usable and subscribe to the needed events.

        display -- an open Xlib ``display.Display`` connection.

        Exits the process with status 1 when has_extension() reports RandR
        as missing and query_extension() returns None for it.
        """
        self.d = display

        # Check for the extension before touching any RandR-specific
        # attribute, so that a server without RandR reaches the diagnostic
        # below instead of failing on the attribute lookup further down.
        if not self.d.has_extension(randr.extname):
            sys.stderr.write('%s: server does not have the %s extension\n'
                             % (sys.argv[0], randr.extname))
            # Query once and reuse the answer for both the dump and the test.
            ext_info = self.d.query_extension(randr.extname)
            print(ext_info)
            sys.stderr.write("\n".join(self.d.list_extensions()))
            if ext_info is None:
                sys.exit(1)

        # (event code, sub code) pair identifying an output-change event.
        # When the library does not expose it as a tuple (presumably older
        # python-xlib releases -- TODO confirm), build the pair by hand.
        self.OutputChangeNotify = self.d.extension_event.OutputChangeNotify
        if not isinstance(self.OutputChangeNotify, tuple):
            self.OutputChangeNotify = (
                self.d.extension_event.CrtcChangeNotify,
                randr.RRNotify_OutputChange,
            )

        # print version
        r = self.d.xrandr_query_version()
        print('%s version %d.%d' % (randr.extname, r.major_version, r.minor_version))

        # Grab the current screen
        self.screen = self.d.screen()
        self.root = self.screen.root

        # Enable Structure events (notably DestroyNotify)
        self.root.change_attributes(event_mask=X.StructureNotifyMask)

        # Subscribe to RandR output-change events only.
        self.root.xrandr_select_input(randr.RROutputChangeNotifyMask)

        # Pending ResettableTimer for the setup script, if any.
        self.timer = None

    def loop(self):
        """Main loop: handle X events until a DestroyNotify arrives.

        Never returns normally; calls sys.exit(0) on DestroyNotify.
        """
        while True:
            e = self.d.next_event()

            # Window has been destroyed, quit
            if e.type == X.DestroyNotify:
                sys.exit(0)

            # Only RRNotify events with the output-change sub code matter.
            if e.type != self.OutputChangeNotify[0]:
                continue
            if e.sub_code != self.OutputChangeNotify[1]:
                continue

            # Output information has changed
            print('%0.4f: Output change' % time.time())
            # Re-parse the raw event bytes as an OutputChangeNotify so the
            # output and connection fields are available.
            e = randr.OutputChangeNotify(display=self.d.display, binarydata=e._binary)
            res = self.root.xrandr_get_screen_resources()._data
            output_info = self.d.xrandr_get_output_info(e.output, res['config_timestamp'])._data
            if not output_info:
                continue

            if e.connection == randr.Connected:
                state = 'connected'
            elif e.connection == randr.Disconnected:
                state = 'disconnected'
            else:
                state = 'unknown'
            print ("%r was %s" % (output_info['name'], state))

            if output_info['name'] == 'HDMI-2' and e.connection == randr.Connected:
                # Delay the script by one second; repeated events restart
                # the countdown while the timer is still pending.
                self.timer = reset_or_recreate_timer(
                    self.timer, 1, subprocess.call,
                    (['/etc/lightdm/screenresolution.sh'],))
if __name__ == '__main__':
    # Open the display (no display name is passed) and process events
    # until Listener.loop() exits the process.
    listener = Listener(display.Display())
    listener.loop()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import threading as _threading | |
class ResettableTimer(_threading.Thread):
    """Call a function after a specified number of seconds, with reset support:

        t = ResettableTimer(30.0, f, args=[], kwargs={})
        t.start()
        t.reset(10.0)  # restart the countdown with a new timeout
        t.cancel()     # stop the timer's action if it's still waiting

    The function runs on the timer thread while the internal (non-recursive)
    lock is held. cancel(), reset() and is_finished() therefore block until
    the function returns, and the function must not call them itself.
    """

    def __init__(self, interval, function, args=None, kwargs=None):
        """Create the timer; call start() to begin the countdown.

        interval -- seconds to wait before calling ``function``.
        function -- callable invoked as ``function(*args, **kwargs)``.
        args     -- positional arguments (default: a fresh empty list).
        kwargs   -- keyword arguments (default: a fresh empty dict).
        """
        super(ResettableTimer, self).__init__()
        self.interval = interval
        self.function = function
        # Fresh containers per instance; a literal default would be shared
        # by every timer created without explicit arguments.
        self.args = [] if args is None else args
        self.kwargs = {} if kwargs is None else kwargs
        # don't need recursive lock
        self.updatecond = _threading.Condition(_threading.Lock())
        # New timeout posted by reset(); None means "no reset pending".
        self.updatetimeout = None
        self.finished = False

    def cancel(self):
        """Stop the timer if it hasn't finished yet.

        Returns True if the timer was cancelled by this call, False if it
        had already finished (fired or been cancelled).
        """
        with self.updatecond:
            if self.finished:
                return False
            self.finished = True
            self.updatecond.notify()
            return True

    def reset(self, newtimeout):
        """Reset the timer if it hasn't finished yet.

        newtimeout -- seconds to wait, counted from when the timer thread
        picks up the reset. None is not supported: it is indistinguishable
        from "no reset pending" and makes a waiting timer fire right away.

        Returns True if the countdown was restarted, False if the timer had
        already finished.
        """
        with self.updatecond:
            if self.finished:
                return False
            self.updatetimeout = newtimeout
            self.updatecond.notify()
            return True

    def is_finished(self):
        """You'll have a race if you use this"""
        with self.updatecond:
            return self.finished

    def run(self):
        """Thread body: wait, honour resets/cancel, then call the function."""
        try:
            with self.updatecond:
                try:
                    # Checking the state before every wait also covers a
                    # cancel() or reset() issued before this thread started
                    # waiting, whose notify() nobody was there to receive.
                    while not self.finished:
                        if self.updatetimeout is not None:
                            self.interval = self.updatetimeout
                            self.updatetimeout = None
                        self.updatecond.wait(self.interval)
                        if self.finished:
                            break
                        if self.updatetimeout is None:
                            # Woke up with no reset pending: time is up.
                            self.function(*self.args, **self.kwargs)
                            break
                        # Otherwise a reset arrived; loop to apply it.
                finally:
                    self.finished = True
        finally:
            # Avoid a refcycle if the thread is running a function or
            # an argument that has a member that points to the thread.
            del self.function, self.args, self.kwargs
            # Just in case acquiring the lock failed
            self.finished = True
def reset_or_recreate_timer(timer, timeout, function, args=None, kwargs=None):
    """Restart ``timer`` with ``timeout``, or start a new timer if that fails.

    timer    -- an existing timer object with a reset() method, or None.
    timeout  -- seconds until the function should be called.
    function -- callable used only when a new timer has to be created.
    args     -- positional arguments for ``function`` (default: empty list).
    kwargs   -- keyword arguments for ``function`` (default: empty dict).

    Returns the timer that is now pending: ``timer`` itself when its reset()
    succeeded (its original function and arguments are kept), otherwise a
    newly created and started ResettableTimer.
    """
    if timer is not None and timer.reset(timeout):
        return timer

    # Build fresh containers per call so separate timers never share one
    # default list/dict object.
    fresh = ResettableTimer(
        timeout,
        function,
        [] if args is None else args,
        {} if kwargs is None else kwargs,
    )
    fresh.start()
    return fresh
if __name__ == "__main__":
    # Manual demo: every stage arms (or re-arms) a 5 second timer that sets
    # an Event, waits on the Event, and reports whether the timer fired.

    def _wait_and_report(event, seconds):
        # Wait up to `seconds`, then print the outcome and re-arm the event.
        event.wait(seconds)
        if event.is_set():
            print("timer fired")
            event.clear()
        else:
            print("Timed out")

    foo = _threading.Event()
    timer = None

    # (banner, how long to wait for the event) for the plain stages.
    stages = (
        ("timer in 5 seconds", 2.5),
        ("reset another 5 seconds", 10),
        ("try resetting already-fired", 10),
    )
    for banner, patience in stages:
        print(banner)
        timer = reset_or_recreate_timer(timer, 5, foo.set)
        _wait_and_report(foo, patience)

    # Final stage: cancel the timer part-way through its countdown.
    print("try resetting already-fired")
    timer = reset_or_recreate_timer(timer, 5, foo.set)
    foo.wait(3)
    print("cancel")
    timer.cancel()
    _wait_and_report(foo, 10)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment