# Skip to content
#
# Instantly share code, notes, and snippets.
#
# @doxy-ai
# Last active February 25, 2024 03:35
# Show Gist options
# Save doxy-ai/364d9804d97c8d37285e7b8671d274d4 to your computer and use it in GitHub Desktop.
# ZatsuDachi Vstream Plugin
# Requirements: pip install websocket-client cbor2 requests
# Import required packages
from api import PluginBase, Message, Color
from flask import escape
import threading
import asyncio
import json
import websocket
import base64
import uuid
import requests
import cbor2
import sys
# Define the plugin class
class Plugin(PluginBase):
# Set plugin name and target stream
name = "Vstream Chat Plugin"
# Set image URL for the plugin
pluginImageURL = "https://cdn.discordapp.com/attachments/1105538681567203368/1106682813207556187/Icon_white_transparent_1.png"
# Reference to the current websocket
_lock = threading.Lock()
_ws = None
# CONNECTION NOTE: Paste the id of your livestream here!
targetStream = "i8CRx7HhSPCOvT2BiyhRNQ"
async def go(self):
"""
Perform initial setup and establish a WebSocket connection.
This method sends an HTTP GET request to a specific URL based on `self.targetStream` to obtain the necessary information for establishing the WebSocket connection.
It then extracts the required data from the response and generates a `videoID` based on `self.targetStream`.
Finally, it calls the `setup_websocket` method to establish the WebSocket connection with the obtained information.
"""
# If the plugin is already running... don't start it again!
Plugin._lock.acquire()
try:
if Plugin._ws is not None: return
# Send an HTTP GET request to a specific URL based on self.targetStream
response = requests.get(f"https://vstream.com/v/{self.targetStream}/chat-popout")
# Raise an exception if the response status code is not successful (200)
response.raise_for_status()
# Extract a string between two other strings within the response using the extract_string_between method
# and parse it as JSON to obtain a dictionary named data
data = json.loads("{" + self.extract_string_between(response.text, "window.__remixContext = {", "};") + "}")
# Retrieve the channelID from the data dictionary
channelID = data["state"]["loaderData"]["routes/v_.$liveStreamID.chat-popout"]["liveStream"]["channelID"]
# Generate a videoID by decoding self.targetStream using base64 and creating a UUID from it
videoID = self.decode_videoID_if_nessicary(self.targetStream)
self.register_badge_if_not_registered("streamer", "https://vstream.com/build/_assets/streamer-OL6426YZ.png")
self.register_badge_if_not_registered("moderator", "https://vstream.com/build/_assets/moderator-RXLWJZX3.png")
finally: Plugin._lock.release()
# Call the setup_websocket method with the channelID and videoID parameters to establish a WebSocket connection
while self._keepAlive: # Whenever the connection closes... restart it!
self.setup_websocket(channelID, videoID)
def stop(self):
"""
Stop the plugin and halt the websocket listener
"""
PluginBase.stop(self)
Plugin._lock.acquire()
try:
if Plugin._ws is not None: Plugin._ws.close()
Plugin._ws = None
finally:
Plugin._lock.release()
print("Vstream stopped!")
def setup_websocket(self, channelID, videoID, depth = 1):
"""
Initialize a WebSocket connection and set up event handlers for different WebSocket events.
Args:
channelID (str): The channel ID used in the WebSocket connection URL.
videoID (UUID): The video ID used in the WebSocket connection URL.
Depth (int): The recursive depth of the function
"""
Plugin._lock.acquire()
try:
if not self._keepAlive: return
if depth > sys.getrecursionlimit() - 10: return
if Plugin._ws is not None:
Plugin._ws.close()
print("Vstream conncetion closed")
# Create a WebSocketApp object with a specific URL based on channelID and videoID
Plugin._ws = websocket.WebSocketApp(f'wss://vstream.com/suika2/videos/{videoID}')
# Set up an event handler for when a message is received
Plugin._ws.on_message = self.on_message
# Set up an event handler for when the WebSocket connection is opened
Plugin._ws.on_open = lambda ws: print("Vstream connection opened!")
# Set up an event handler for when an error occurs
Plugin._ws.on_error = lambda ws, e: print(e)
# When we lose connection restart the handler
Plugin._ws.on_close = lambda ws, status, msg: self.setup_websocket(channelID, videoID, depth + 1)
finally: Plugin._lock.release()
# Run the WebSocket connection indefinitely
Plugin._ws.run_forever()
def on_message(self, ws, message):
"""
Process a received message through the WebSocket.
This method extracts relevant information from the message, constructs a `Message` object, and passes it to the `recieve_message` method.
Args:
ws (WebSocket): The WebSocket instance that received the message.
message (str): The received message.
"""
data = cbor2.loads(message)
if(data[-4] != "ChatCreatedEvent"): return
chat = data["chat"]
# Process each chat message
try:
# Extract the color of the message sender
color = Color(rgb=(chat["chatterColor"][0]/255, chat["chatterColor"][1]/255, chat["chatterColor"][2]/255))
# Extract the badges from the message
try:
badges = chat["chatterBadges"]
if isinstance(chat["chatterBadges"][0], dict):
badges = [x[-4].replace("Badge", "").lower() for x in badges]
except: badges = []
# print(color)
print(chat)
print("message recived")
# Initialize the message content
msg = ""
# Iterate over the nodes of the chat message
for node in chat["nodes"]:
# Check the type of the node
match(node[-4]):
case "TextChatNode":
# If the node is a TextChatNode, append the text to the message content
msg += node["text"]
case "LinkChatNode":
# If the node is a link append it to the message!
msg += node['href'] # No more pretty colors... but it prints properly :D
case "MentionChatNode":
# If the node is a MentionChatNode, append the username preceded by "@" to the message content
msg += "@" + node[5]
case "EmojiChatNode":
# If the node is an EmojiChatNode
if node["emoji"]["size28Src"] == None:
# If there is no associated image, append the alt text to the message content
msg += node["emoji"]["altText"]
else:
# If there is an associated image, register the emote and append the action text to the message content
self.register_emote(node["emoji"]["actionText"], node["emoji"]["size28Src"])
msg += node["emoji"]["actionText"]
# Create a Message object with the constructed content, sender information, color, and plugin image URL
self.recieve_message(Message(content=msg, sender=chat["chatter"][5], senderColor=color, senderBadges=badges, pluginImageUrl=self.pluginImageURL))
print(msg)
except Exception as e:
# pass # Handle any exceptions that occur during message processing
raise e
# Checks if a string is a valid UUID
def is_valid_uuid(self, uuid_str):
try:
uuid_obj = uuid.UUID(uuid_str)
return str(uuid_obj) == uuid_str
except ValueError:
return False
# Decodes the video stream into a valid UUID if nessicary
def decode_videoID_if_nessicary(self, uuid_str):
if self.is_valid_uuid(uuid_str):
return uuid_str
return self.vstreamBase64DecodeToUUID(uuid_str)
# Extract a string between two other strings within a given source string.
def extract_string_between(self, source, start, end):
"""
Extract a string between two other strings within a given source string.
Args:
source_string (str): The source string from which to extract the substring.
start_string (str): The starting string marking the beginning of the desired substring.
end_string (str): The ending string marking the end of the desired substring.
Returns:
str or None: The extracted substring if found, or None if either the start or end string is not found.
"""
start_index = source.find(start)
# If start_string is not found within source_string, return None
if start_index == -1:
return None
# Search for end_string starting from the index immediately after start_string
end_index = source.find(end, start_index + len(start))
# If end_string is not found within source_string after start_string, return None
if end_index == -1:
return None
# Return the substring between start_string and end_string
return source[start_index + len(start):end_index]
# Function which decodes Vstream's particular base64 encoding
# Thank you to @E for the original version of this function!
def vstreamBase64DecodeToUUID(self, base64URL):
encoding = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-.'
padding = '='
codes = {char: i for i, char in enumerate(encoding)}
end = len(base64URL)
while base64URL[end - 1] == padding:
end -= 1
data = bytearray((end * 6) // 8)
bits = 0
buffer = 0
written = 0
for i in range(end):
value = codes.get(base64URL[i])
if value is None:
raise SyntaxError(f'Invalid character {base64URL[i]}')
buffer = (buffer << 6) | value
bits += 6
if bits >= 8:
bits -= 8
data[written] = 0xff & (buffer >> bits)
written += 1
if bits >= 6 or 0xff & (buffer << (8 - bits)):
raise SyntaxError('Unexpected end of data')
if len(data) != 16:
raise ValueError(f'Invalid Uint8Array length for uuid: {len(data)}')
str_data = ''.join([byte.to_bytes(1, 'big').hex() for byte in data])
return f'{str_data[:8]}-{str_data[8:12]}-{str_data[12:16]}-{str_data[16:20]}-{str_data[20:]}'
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment