Last active
April 28, 2023 14:39
-
-
Save serializingme/5c1a6fd6c7ea58af77c7b80579737c5a to your computer and use it in GitHub Desktop.
Asyncio Leveraging OBS Python Script
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Simple example how to make use of asyncio in OBS Python scripts.""" | |
from typing import Optional | |
import threading | |
import asyncio | |
# Event loop owned by the background thread; None while no loop is running.
# Written by run_stuff and read by script_unload to request a stop.
_LOOP: Optional[asyncio.AbstractEventLoop] = None
# Background thread that drives the event loop; None while not started.
# Written by script_load and cleared by script_unload.
_THREAD: Optional[threading.Thread] = None
def run_stuff():
    """Create an asyncio event loop and run it until it is stopped.

    Meant to be used as the target of the background thread. The loop is
    published in the module-level ``_LOOP`` so that another thread can ask it
    to stop via ``call_soon_threadsafe``; ``_LOOP`` is reset to ``None`` and
    the loop is closed when this function returns, even on error.
    """
    # Without this declaration the assignments below would create a local
    # variable and the module-level _LOOP would stay None forever.
    global _LOOP

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    _LOOP = loop
    try:
        # Start anything that needs the async loop. The call to run_forever
        # may not be needed, depends on the way the loop dependent components
        # are started.
        loop.run_forever()
        # Stop anything that is running on the loop before closing. Most
        # likely using the loop run_until_complete function.
    finally:
        # Unpublish first so other threads stop seeing a loop that is about
        # to be closed, then release the loop's resources.
        _LOOP = None
        loop.close()
def script_load(settings):
    """Start the background thread that runs the asyncio event loop.

    Script load hook (presumably invoked by OBS when the script is loaded).

    Args:
        settings: Settings object passed to the hook; not used here.
    """
    # Without this declaration the thread handle would be stored in a local
    # variable and script_unload could never join the thread.
    global _THREAD

    # Daemon thread so a stuck loop cannot keep the host process alive.
    _THREAD = threading.Thread(target=run_stuff, daemon=True)
    _THREAD.start()
def script_unload():
    """Ask the event loop to stop and wait briefly for its thread to exit.

    Script unload hook (presumably invoked by OBS when the script is
    unloaded). The module-level ``_THREAD`` is reset to ``None`` afterwards,
    whether or not the thread exited within the timeout.
    """
    # _THREAD is reassigned below; without this declaration it would be a
    # local name and the "is not None" check would raise UnboundLocalError.
    global _THREAD

    # Snapshot the loop: the loop thread may reset _LOOP to None at any time.
    loop = _LOOP
    if loop is not None:
        try:
            # stop() must run on the loop's own thread, hence the
            # thread-safe scheduling call.
            loop.call_soon_threadsafe(loop.stop)
        except RuntimeError:
            # The loop was already closed by its thread; nothing to stop.
            pass

    if _THREAD is not None:
        # Wait for 5 seconds, if it doesn't exit just move on not to block
        # OBS main thread. Logging something about the failure to properly
        # exit is advised.
        _THREAD.join(timeout=5)
        _THREAD = None
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Should work now.