Last active
February 25, 2024 03:35
-
-
Save doxy-ai/364d9804d97c8d37285e7b8671d274d4 to your computer and use it in GitHub Desktop.
ZatsuDachi Vstream Plugin
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Requirements: pip install websocket-client cbor2
# Import required packages
import asyncio
import base64
import json
import sys
import threading
import time
import uuid

import cbor2
import requests
import websocket
# NOTE(review): `escape` is not used in the visible code, and `flask.escape` is
# no longer exported by recent Flask releases (markupsafe.escape replaces it) — confirm.
from flask import escape

from api import PluginBase, Message, Color
# Define the plugin class
class Plugin(PluginBase):
    """
    Chat plugin that relays messages from a Vstream livestream chat.

    The plugin scrapes the chat pop-out page of `targetStream`, opens a WebSocket to
    Vstream's chat endpoint and forwards every `ChatCreatedEvent` it receives to
    `recieve_message` as a `Message`.

    NOTE(review): `_keepAlive`, `recieve_message`, `register_emote` and
    `register_badge_if_not_registered` are not defined in this class; they are
    presumably provided by `PluginBase` — confirm against the host application.
    """

    # Set plugin name and target stream
    name = "Vstream Chat Plugin"
    # Set image URL for the plugin
    pluginImageURL = "https://cdn.discordapp.com/attachments/1105538681567203368/1106682813207556187/Icon_white_transparent_1.png"
    # Guards every read/write of the shared `_ws` reference below
    _lock = threading.Lock()
    # Reference to the current websocket (class-level, so shared by all instances)
    _ws = None
    # CONNECTION NOTE: Paste the id of your livestream here!
    targetStream = "i8CRx7HhSPCOvT2BiyhRNQ"

    # Seconds to pause between reconnection attempts, and HTTP timeout for the page scrape
    _reconnectDelay = 1.0
    _requestTimeout = 10
    # Alphabet of Vstream's base64 variant ('-' and '.' are the last two symbols) and its padding
    _B64_ALPHABET = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-.'
    _B64_PADDING = '='
    # Reverse lookup (character -> 6-bit value), built once instead of on every decode
    _B64_CODES = {char: i for i, char in enumerate(_B64_ALPHABET)}

    async def go(self):
        """
        Perform initial setup and keep a WebSocket connection to the chat alive.

        Sends an HTTP GET request to the chat pop-out page of `self.targetStream`, parses the
        `window.__remixContext` JSON embedded in it to obtain the channel ID, derives the video
        UUID from `self.targetStream`, registers the streamer/moderator badges and then calls
        `setup_websocket` in a loop for as long as `self._keepAlive` is truthy.

        Returns immediately if a websocket is already registered on the class.

        Raises:
            requests.HTTPError: If the pop-out page responds with an error status.
            ValueError: If the page does not contain the expected `window.__remixContext`
                block, or the block is not valid JSON.
        """
        with Plugin._lock:
            # If the plugin is already running... don't start it again!
            if Plugin._ws is not None:
                return
            # A timeout keeps a stalled request from holding the class lock forever
            response = requests.get(
                f"https://vstream.com/v/{self.targetStream}/chat-popout",
                timeout=self._requestTimeout,
            )
            # Raise an exception if the response status code is not successful
            response.raise_for_status()
            # The page embeds its state as `window.__remixContext = {...};`
            context = self.extract_string_between(response.text, "window.__remixContext = {", "};")
            if context is None:
                raise ValueError("Could not find window.__remixContext in the Vstream chat-popout page")
            data = json.loads("{" + context + "}")
            # Retrieve the channelID from the data dictionary
            channelID = data["state"]["loaderData"]["routes/v_.$liveStreamID.chat-popout"]["liveStream"]["channelID"]
            # The stream id is either already a UUID or Vstream's base64 form of one
            videoID = self.decode_videoID_if_nessicary(self.targetStream)
            self.register_badge_if_not_registered("streamer", "https://vstream.com/build/_assets/streamer-OL6426YZ.png")
            self.register_badge_if_not_registered("moderator", "https://vstream.com/build/_assets/moderator-RXLWJZX3.png")
        # Whenever the connection closes... restart it!
        while self._keepAlive:
            self.setup_websocket(channelID, videoID)
            if self._keepAlive:
                # Pause so an immediately-failing connection does not become a busy loop.
                # A blocking sleep is used because this coroutine already blocks in run_forever().
                time.sleep(self._reconnectDelay)

    def stop(self):
        """
        Stop the plugin and halt the websocket listener.

        Delegates to the base class first, then closes and forgets the shared websocket.
        """
        super().stop()
        with Plugin._lock:
            if Plugin._ws is not None:
                Plugin._ws.close()
            Plugin._ws = None
        print("Vstream stopped!")

    def setup_websocket(self, channelID, videoID, depth=1):
        """
        Open a WebSocket connection to the chat and block until it closes.

        Any previously registered websocket is closed first. Reconnecting is left to the
        caller (`go` calls this method in a loop); the connection no longer re-enters this
        method from its close handler, which used to nest one `run_forever()` call inside
        another on every reconnect.

        Args:
            channelID (str): The channel ID. Currently unused: the connection URL is built
                from `videoID` only.
            videoID (str): The video UUID used in the WebSocket connection URL.
            depth (int): Retained for backward compatibility. The method still returns
                without connecting when it exceeds `sys.getrecursionlimit() - 10`.
        """
        with Plugin._lock:
            if not self._keepAlive:
                return
            if depth > sys.getrecursionlimit() - 10:
                return
            if Plugin._ws is not None:
                Plugin._ws.close()
                print("Vstream conncetion closed")
            # Create a WebSocketApp object with its event handlers attached
            ws = websocket.WebSocketApp(
                f'wss://vstream.com/suika2/videos/{videoID}',
                on_message=self.on_message,
                on_open=lambda _ws: print("Vstream connection opened!"),
                on_error=lambda _ws, e: print(e),
            )
            Plugin._ws = ws
        # Run on the local reference: stop() may reset Plugin._ws to None once the lock is released
        ws.run_forever()

    def on_message(self, ws, message):
        """
        Process a message received through the WebSocket.

        Decodes the CBOR payload and ignores everything that is not a `ChatCreatedEvent`.
        For chat events it builds a `Message` from the chat's nodes, sender, color and badges
        and passes it to `recieve_message`. Errors while processing a chat propagate to the
        caller.

        NOTE(review): the payload mixes string keys with integer keys; `-4` is read as the
        type name and `5` as a user name — presumably Vstream's compact field ids, confirm.

        Args:
            ws (WebSocket): The WebSocket instance that received the message.
            message: The received CBOR-encoded payload (handed to `cbor2.loads`).
        """
        data = cbor2.loads(message)
        # Frames that are not maps, or carry no/another type name, are not chat messages
        if not isinstance(data, dict) or data.get(-4) != "ChatCreatedEvent":
            return
        chat = data["chat"]
        # Extract the color of the message sender (0-255 channels scaled to 0-1)
        rgb = chat["chatterColor"]
        color = Color(rgb=(rgb[0] / 255, rgb[1] / 255, rgb[2] / 255))
        # Extract the badges from the message
        badges = self._extract_badges(chat)
        print(chat)
        print("message recived")
        # Concatenate the text contributed by every node of the chat message
        msg = "".join([self._render_node(node) for node in chat["nodes"]])
        # Create a Message object with the constructed content, sender information, color, and plugin image URL
        self.recieve_message(Message(content=msg, sender=chat["chatter"][5], senderColor=color, senderBadges=badges, pluginImageUrl=self.pluginImageURL))
        print(msg)

    @staticmethod
    def _extract_badges(chat):
        """Return the chat's badge names, or an empty list if they are missing or malformed."""
        try:
            badges = chat["chatterBadges"]
            if isinstance(badges[0], dict):
                # Dict badges carry a type name such as "StreamerBadge" -> "streamer"
                badges = [badge[-4].replace("Badge", "").lower() for badge in badges]
        except (KeyError, IndexError, TypeError, AttributeError):
            return []
        return badges

    def _render_node(self, node):
        """Return the text one chat node contributes to the message (registering emotes as needed)."""
        kind = node[-4]
        if kind == "TextChatNode":
            return node["text"]
        if kind == "LinkChatNode":
            # No more pretty colors... but it prints properly :D
            return node['href']
        if kind == "MentionChatNode":
            return "@" + node[5]
        if kind == "EmojiChatNode":
            emoji = node["emoji"]
            if emoji["size28Src"] is None:
                # No associated image: fall back to the alt text
                return emoji["altText"]
            # There is an image: register the emote and emit its action text
            self.register_emote(emoji["actionText"], emoji["size28Src"])
            return emoji["actionText"]
        # Unknown node types contribute nothing
        return ""

    # Checks if a string is a valid UUID
    def is_valid_uuid(self, uuid_str):
        """
        Return True only if `uuid_str` is a UUID in canonical form.

        The value must round-trip unchanged through `uuid.UUID`, so uppercase, brace-wrapped
        or un-hyphenated spellings are rejected, as are non-string values.
        """
        try:
            return str(uuid.UUID(uuid_str)) == uuid_str
        except (ValueError, TypeError, AttributeError):
            # ValueError: malformed string; TypeError/AttributeError: not a string at all
            return False

    # Decodes the video stream into a valid UUID if nessicary
    def decode_videoID_if_nessicary(self, uuid_str):
        """Return `uuid_str` unchanged if it is already a canonical UUID, else decode it from Vstream's base64."""
        if self.is_valid_uuid(uuid_str):
            return uuid_str
        return self.vstreamBase64DecodeToUUID(uuid_str)

    # Extract a string between two other strings within a given source string.
    def extract_string_between(self, source, start, end):
        """
        Extract a string between two other strings within a given source string.

        Args:
            source (str): The source string from which to extract the substring.
            start (str): The string marking the beginning of the desired substring.
            end (str): The string marking the end of the desired substring.

        Returns:
            str or None: The text between the first `start` and the first `end` that follows
            it, or None if either marker is not found.
        """
        start_index = source.find(start)
        if start_index == -1:
            return None
        content_index = start_index + len(start)
        # Only look for the end marker after the start marker
        end_index = source.find(end, content_index)
        if end_index == -1:
            return None
        return source[content_index:end_index]

    # Function which decodes Vstream's particular base64 encoding
    # Thank you to @E for the original version of this function!
    def vstreamBase64DecodeToUUID(self, base64URL):
        """
        Decode a Vstream base64 identifier into a canonical UUID string.

        Args:
            base64URL (str): The identifier, optionally followed by '=' padding.

        Returns:
            str: The lowercase, hyphenated UUID.

        Raises:
            SyntaxError: If a character is outside the alphabet, or the trailing bits left
                over after the last whole byte are not zero.
            ValueError: If the decoded data is not exactly 16 bytes long.
        """
        encoded = base64URL.rstrip(self._B64_PADDING)
        decoded = bytearray()
        buffer = 0  # pending bits that have not yet formed a whole byte
        bits = 0    # number of valid bits currently held in `buffer`
        for char in encoded:
            value = self._B64_CODES.get(char)
            if value is None:
                raise SyntaxError(f'Invalid character {char}')
            buffer = (buffer << 6) | value
            bits += 6
            if bits >= 8:
                bits -= 8
                decoded.append((buffer >> bits) & 0xff)
                # Keep only the bits that are still pending
                buffer &= (1 << bits) - 1
        # Leftover bits must be fewer than one symbol and all zero
        if bits >= 6 or (buffer << (8 - bits)) & 0xff:
            raise SyntaxError('Unexpected end of data')
        if len(decoded) != 16:
            raise ValueError(f'Invalid Uint8Array length for uuid: {len(decoded)}')
        return str(uuid.UUID(bytes=bytes(decoded)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment