Last active
May 27, 2021 15:50
-
-
Save Yaulendil/f2ef7e16298fad51180030236048178d to your computer and use it in GitHub Desktop.
HexChat plugin for Twitch.tv stream chat IRC bridge; implements channel events, user purges, and badges
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Plugin metadata. __module_name__ is also echoed in the "loaded" banner
# printed by the last statement of this file.
__module_name__ = "Twitch Formatter"
__module_version__ = "2.16.4"
__module_description__ = (
    '(irc.twitch.tv) Subscription notifications, name shortening, user "badges", and more'
)
# NOTE: the string below follows the assignments above, so it is a plain
# expression statement kept for human readers rather than the module docstring.
"""
By @Davarice
Formatting plugin for use with HexChat and Twitch.TV
Parses IRCv3 tags from Twitch, representing subscriptions, raids, etc, and displays them properly in chat. Also prepends badges to the usernames of broadcasters, moderators, etc.
NOTE: This plugin --MUST-- be used with a version of HexChat that exposes IRCv3 to the Python interface. As of this writing, the main branch DOES NOT support this.
"""
# @badges=subscriber/0,bits-leader/2;color=#FF69B4;display-name=Luxurianhat;emotes=;flags=;id=54ab445f-2c39-426f-a48c-ec5dc73f9363;login=luxurianhat;mod=0;msg-id=bitsbadgetier;msg-param-threshold=10000;room-id=23936415;subscriber=1;system-msg=bits\\sbadge\\stier\\snotification;tmi-sent-ts=1552278196475;user-id=65508135;user-type= :tmi.twitch.tv USERNOTICE #jerma985 :*crunch* OW | |
###===--- | |
# CONFIG | | |
###===--- | |
# These two switches only take effect while the /dbug counter (opReturns) is above zero.
debugServer = True  # Display server event info ("PRIVMSG")
debugPrint = True  # Display print event info ("Channel Message")
# Past this length, usernames will be shortened. Badges count towards length.
unameTruncate = 16
longNameFiller = "…"  # Marker appended to a truncated name in place of the characters cut off
# Username replacements, applied as plain substring substitutions before truncation.
# Useful when many users share the same long name prefix and would otherwise
# become indistinguishable once shortened.
nameReplace = {r"phijkchu_": r"phj/",
               r"phijkchu": r"phj/",
               r"ㅡ": r"-"}
## USER BADGES ##
# Maximum number of characters allowed to represent badges
# Default is 3, the same as Twitch web view
prefixLengthCap = 3
# Characters to precede a username, representing their badges.
# A value is either a single icon string, or a dict mapping a minimum badge
# rank (an int) to the icon used from that rank upwards.
# NOTE(review): badgesplit() walks the badges in the order they appear in the
# "badges" tag, so the order of this dict does not decide display order; only
# the first {prefixLengthCap} resulting characters are shown.
badgeIcons = {
    "broadcaster": "🜲",  # Owner of the channel
    # Big Deal™ Badges
    "staff": "⚙",  # Twitch Staff
    "admin": "α",  # Twitch Administrator
    "global-mod": "μ",  # Global Moderator
    # Channel Badges
    "moderator": "🗡",  # Channel moderator
    "subscriber": {  # Subscriber to the channel
        0: "①",
        3: "③",
        6: "⑥",
        9: "⑨",
        12: "ⅰ",
        24: "ⅱ",
        36: "ⅲ",
        48: "ⅳ",
        60: "ⅴ",
    },
    "vip": "⚑",
    "sub-gifter": ":",  # Has given gift subscriptions
    "bits-leader": "❖",
    "bits": {  # Has given bits (number attached)
        0: "▴",
        100: "⬧",
        1000: "⬠",
        5000: "⬡",
        10000: "🟋",
        100000: "🟎"
    },
    # Account Badges
    "partner": "✓",  # "Verified" checkmark badge (Thanks CTD)
    "turbo": "+",  # Twitch Turbo
    "premium": "±",  # Twitch Prime
}
# A short string to be placed between a username and the badge characters preceding it
# Default is a single whitespace so that usernames can still be moused over correctly
badgeSep = " "
# Every print event whose sender gets a badge prefix, mapped to the tab color
# (0-3) handed to colorTabSafely() when such an event arrives. Each key is also
# hooked to cb_parseMessage at the bottom of the file.
eventList = {
    "Channel Message": 2,
    "Channel Action": 2,
    "Channel Msg Hilight": 3,
    "Channel Action Hilight": 3,
    "Your Message": 2,
}
## SUBSCRIPTIONS ##
# Messages to be printed for each USERNOTICE "msg-id", with keyword formatting.
# Placeholders filled in by cb_parse: {subber}, {gifter}, {subNumber}, {streak},
# {cumul}, {s1} (plural suffix for the streak), {s2} (plural suffix for the
# gift count) and {msg}.
sublines = {
    "resub": "<{subber}> resubscribed",
    "sub": "<{subber}> subscribed",
    "subgift": "<{subber}> was giftsubbed by {gifter}",
    "submysterygift": "<{gifter}> gives out ({subNumber}) gift subscription{s2}",
    "giftpaidupgrade": "<{subber}> upgraded their giftsub from {gifter}",
    "primepaidupgrade": "<{subber}> upgraded their Prime Sub",
    "bitsbadgetier": "<{subber}> earned a new tier of Bits Badge",
}
# Additional endings for special subscriptions (these might be chained)
sublineStreak = " for ({streak}) month{s1} in a row"
sublineFirst = ", starting a new streak"
sublineCumul = ", with ({cumul}) months in total"
sublinePrime = ", with Twitch Prime"
sublineMessage = ", saying: {msg}"
# DM conversion: maps an outgoing print event to its private-dialog counterpart.
# NOTE(review): no function in this file reads dmSendEvent — confirm it is still needed.
dmSendEvent = {
    "Your Message": "Private Message to Dialog",
    "Your Action": "Private Action to Dialog",
}
# Template for the tab label of a whisper dialog; dmPost() fills in the user name.
dmDecoration = "=={}=="
###===------ | |
# UTILITIES | | |
###===------ | |
import datetime | |
import os | |
import hexchat | |
# import re | |
# Optional companion module. Any failure while importing it is reported and
# then ignored; the rest of the plugin checks `if mortis:` before using it.
try:
    print("Searching for Mortis in",os.getcwd())
    import mortis
except Exception as e:
    print("Could not load Mortis:",e)
    mortis = None
else:
    print("MORTIS")
# Runtime state shared by the callbacks below.
tabColors = {}  # "network/channel" -> last tab color (0-3) set by colorTabSafely()
userstates = {}  # "network/channel" -> our own badge prefix, taken from USERSTATE
msglast = ""  # Text of our own message most recently re-emitted by cb_parseMessage()
inbox = []  # Tag dicts from PRIVMSG lines awaiting their matching print event
opReturns = 0  # Number of upcoming events to dump debug info for (set by /dbug)
# Text formatting control characters.
# NOTE(review): no function in this file reads F — confirm it is still needed.
F = {
    "b": "\002",  # bold
    "c": "\003",  # color (followed by number)
    "e": "\007",  # sound
    "h": "\010",  # presumably hidden text; the original comment said italics, which is \035 below — confirm
    "x": "\017",  # reset
    "r": "\026",  # reverse color
    "i": "\035",  # italics
    "u": "\037",  # underline
    "t": "\t",  # tab
}
def ech(stuff, mtype="Server Error"):
    """Print `stuff` by emitting a HexChat print event of type `mtype`."""
    hexchat.emit_print(mtype, stuff)
## Return Functions ## | |
# Given grammar and a number, return the appropriate singular or plural form | |
def usePlural(n, p="s", s="", w=""):
    """Return the word stem `w` with the right grammatical ending for `n`.

    The singular ending `s` is used only when `n` equals 1; every other
    count (including 0) gets the plural ending `p`.
    """
    ending = s if n == 1 else p
    return w + ending
# Split a raw IRCv3 string into a dict and return it | |
def tagsplit(tagline):
    """Split a raw IRCv3 tag string into a dict.

    `tagline` looks like ``@key=value;key=value :prefix COMMAND args :text``.
    Everything after the first ``" :"`` is stored, untouched, under the
    ``"MSG"`` key; the part before it is parsed as ``;``-separated
    ``key=value`` pairs. Pairs without an ``=`` are ignored.

    The message is separated from the tags *before* splitting on ``;`` so
    that chat text containing ``;`` or ``=`` can neither truncate ``MSG``
    nor inject bogus tags.

    Returns a dict that always contains ``"MSG"`` (``""`` when the line has
    no message part, or when `tagline` is empty or None).
    """
    tags = {}
    if not tagline:
        tags["MSG"] = ""
        return tags
    # Tag values never contain a raw space, so the first " :" ends the tags.
    head, _, message = tagline.lstrip("@").partition(" :")
    for pair in head.split(";"):
        key, sep, value = pair.partition("=")
        if not sep:
            continue  # Valueless or empty entry: nothing to record
        tags[key] = value
    tags["MSG"] = message
    return tags
def price_is_right(seq, limit):
    """Return the largest item of `seq` that does not exceed `limit`.

    When no item qualifies (including an empty `seq`), `limit` itself is
    returned. `seq` may be any iterable of comparable values; iterating a
    dict uses its keys.
    """
    return max((item for item in seq if item <= limit), default=limit)
# Given a set of comma separated badges, match them to the symbol dict and return the result | |
def badgesplit(bline):
    """Turn a comma-separated ``type/rank`` badge list into a name prefix.

    Each badge is looked up in `badgeIcons`. If the entry is a dict, its
    keys are minimum ranks and the icon for the highest key not above the
    badge's rank is used. Icons are concatenated in the order the badges
    appear in `bline`, cut to `prefixLengthCap` characters, and followed by
    `badgeSep`.

    Badges that are unknown, lack a ``/rank`` part, or carry a non-numeric
    rank where a number is needed are skipped. Returns ``""`` when `bline`
    is empty or None, or when no badge produced an icon.
    """
    if not bline:
        return ""
    icons = []
    for badge in bline.split(","):
        btype, sep, rank = badge.partition("/")
        if not sep:
            continue  # Malformed entry without a rank
        icon = badgeIcons.get(btype)
        if icon is None:
            continue  # No icon configured for this badge type
        if isinstance(icon, dict):
            try:
                level = int(rank)
            except ValueError:
                continue  # Tiered badge with a non-numeric rank
            icon = icon.get(price_is_right(icon, level), "")
        icons.append(str(icon))
    prefix = "".join(icons)
    if not prefix:
        return ""
    return prefix[:prefixLengthCap] + badgeSep
# Make sure the given username, with its badges attached, fits within the username character limit | |
# Also perform necessary replacements, while we have it here | |
def nameShorten(name, badges=""):
    """Return `name`, lower-cased and cut to fit next to its badge prefix.

    Every substitution in `nameReplace` is applied first. The room left for
    the name is `unameTruncate` minus the length of `badges` and of
    `badgeSep`; a name longer than that is cut so that it ends in
    `longNameFiller`.
    """
    for needle, replacement in nameReplace.items():
        name = name.replace(needle, replacement)
    room = unameTruncate - len(badges) - len(badgeSep)
    if len(name) <= room:
        return name.lower()
    kept = name[: room - len(longNameFiller)]
    return (kept + longNameFiller).lower()
# Test whether the passed context is focused | |
def Focused(t):
    """Return True when context `t` has the same channel name as the context
    returned by ``hexchat.find_context()``."""
    return hexchat.find_context().get_info("channel") == t.get_info("channel")
# Get rid of \x01 at the ends of a string | |
def simplify(m):
    """Return the text after the first ``:`` in `m`, without the ``\\x01``
    markers at either end.

    Raises IndexError when `m` contains no ``:``.
    """
    pieces = m.split(":", 1)
    body = pieces[1]
    return body.strip("\x01")
# Even in dicts, IRCv3 strings are annoying so this will compare them easily | |
def MessageMatchesTagstring(msg, ircv3):
    """Tell whether the print-event text `msg` belongs to the tag dict `ircv3`.

    The text stored under ``ircv3["MSG"]`` is reduced with simplify() and
    compared to `msg` twice: once as is, and once with its first word
    removed (an action arrives as ``ACTION <text>``).

    Returns False, rather than raising, when the dict has no usable
    ``"MSG"`` entry or when the stored text is a single word that does not
    match.
    """
    try:
        im = simplify(ircv3["MSG"])
    except (KeyError, IndexError):
        return False
    if msg == im:
        return True
    _, sep, remainder = im.partition(" ")
    return bool(sep) and msg == remainder
## Void Functions ## | |
# Color a tab (0-3), but only if the current color is lower | |
def colorTabSafely(t, n=0, *, reset=False):
    """Set the tab color of context `t` to `n` (0-3) without downgrading it.

    The color is applied only when `n` is at least the color last recorded
    for the tab in `tabColors`. A tab that Focused() reports as focused is
    left alone. Passing ``reset=True`` overrides both rules.

    This is a best-effort helper: any error raised while querying or
    commanding the context is swallowed so a cosmetic failure never breaks
    the calling callback.
    """
    try:
        if not 0 <= n <= 3:
            return
        if Focused(t) and not reset:
            return  # Do not color a visible tab, unless resetting
        # [t]ab[N]ame is "network/channel", e.g. 'twitch/#davarice'
        tN = t.get_info("network").lower() + "/" + t.get_info("channel")
        if reset or n >= tabColors.get(tN, 0):
            tabColors[tN] = n
            t.command(f"gui color {n}")
    except Exception:
        # Deliberately broad: the exception types hexchat may raise for a
        # stale context are not known here.
        return
# Post a message through a DM tab | |
def dmPost(uname, cname, msg, mtype):
    """Print `msg` from `uname` into the whisper dialog for `cname`.

    The dialog is looked up on the "Twitch" network and opened with
    ``query`` if it does not exist yet. Its tab is labelled using
    `dmDecoration`, the message is emitted there as print event `mtype`,
    and the tab is colored.

    Returns the dialog context, or None when there is no "Twitch" context
    or the dialog could not be opened.
    """
    target = cname.lower()
    c = hexchat.find_context("Twitch", target)
    if c is None:
        server = hexchat.find_context("Twitch")
        if not server:
            return None
        server.command(f"query {target}")
        c = hexchat.find_context("Twitch", target)
        if c is None:
            return None
    c.command(f"settab {dmDecoration.format(cname)}")
    c.emit_print(mtype, uname, msg)
    colorTabSafely(c, 2)
    return c
# When the optional module was imported, open its dialog tab and replace the
# module object bound to `mortis` with a Mortis instance built around that
# tab's context. From here on `mortis` is either None or that instance.
# NOTE(review): dmPost() can return None; confirm Mortis() tolerates that.
if mortis:
    c = dmPost("Mortis", "Mortis", "Initialized", "Private Action to Dialog")
    mortis = mortis.Mortis(c)
# Receive a message through DM/Whisper | |
def dmRecv(ircv3):
    """Show an incoming whisper, described by the tag dict `ircv3`, in its dialog.

    The text is whatever follows the first ``:`` after ``WHISPER`` in
    ``ircv3["MSG"]``. Text starting with ``/me`` or ``.me`` is shown as an
    action, anything else as a normal message. The sender is taken from
    the ``display-name`` tag, falling back to the nick in front of ``!``
    in the message prefix when that tag is missing or empty.

    Does nothing when the line carries no whisper text or no sender can be
    determined.
    """
    prefix, found, rest = ircv3.get("MSG", "").partition("WHISPER")
    _, colon, text = rest.partition(":")
    if not found or not colon:
        return
    uname = (ircv3.get("display-name") or prefix.split("!", 1)[0]).strip().lower()
    if not uname:
        return
    command, _, remainder = text.partition(" ")
    if command.lower() in ("/me", ".me"):
        # A bare "/me" yields an empty action instead of an IndexError
        dmPost(uname, uname, remainder, "Private Action to Dialog")
    else:
        dmPost(uname, uname, text, "Private Message to Dialog")
###===------ | |
# CALLBACKS | | |
###===------ | |
# Send a message to a DM/Whisper | |
def dmSend(word, word_eol, udat):
    """Route our own outgoing text through Twitch's ``.w`` whisper command.

    Hooked both to the ``say`` command and to the "Your Message" /
    "Your Action" print events, so it runs for every message we send and
    bails out early whenever the event is not whisper related:

    * not on the "twitch" network, or invoked as the ``say`` command;
    * typed in a channel tab (name starts with ``#``) without a leading ``.w``.

    Text typed in a dialog tab is re-sent as ``.w <tab name> <text>`` and
    the original event is eaten. Text of the form ``.w <name> <text>`` is
    echoed into that user's dialog via dmPost(), as an action when
    ``<text>`` starts with ``/me`` or ``.me``.
    """
    c = hexchat.get_context()
    network = c.get_info("network") if c else None
    if not network or network.lower() != "twitch" or len(word) < 2:
        return hexchat.EAT_NONE
    myName = word[0]
    if myName.lower() == "say":
        return hexchat.EAT_NONE
    dmName = c.get_info("channel")
    if not dmName:
        return hexchat.EAT_NONE
    wsplit = word[1].split(" ", 2)
    is_whisper = wsplit[0].lower() == ".w"
    if dmName[0] == "#" and not is_whisper:
        return hexchat.EAT_NONE

    if not is_whisper:
        # Plain text typed in a dialog tab: turn it into a whisper command.
        # The resulting ".w ..." line comes back through this callback.
        c.command(f"say .w {dmName} {word[1]}")
        return hexchat.EAT_ALL

    if len(wsplit) < 3:
        return hexchat.EAT_NONE  # ".w name" without any text: nothing to show
    dmName, m = wsplit[1], wsplit[2]
    first, _, rest = m.partition(" ")
    if first.lower() in ("/me", ".me"):
        m = rest
        act = "Private Action to Dialog"
    else:
        act = "Private Message to Dialog"
    dmPost(myName, dmName, m, act)
    return hexchat.EAT_ALL
# Parse and format a message directly from the server | |
def _tag_int(tags, key):
    """Return ``tags[key]`` as an int; 0 when the tag is missing, empty, or not numeric."""
    try:
        return int(tags.get(key) or 0)
    except (TypeError, ValueError):
        return 0


def _handle_usernotice(c, ircv3, word_eol):
    """Print a raid, ritual/charity, or subscription-style USERNOTICE for context `c`."""
    stype = ircv3.get("msg-id", "")
    if stype == "raid":  # A raid
        src = ircv3.get("msg-param-displayName", "?")
        num = ircv3.get("msg-param-viewerCount", "?")
        ech(f"{src} sends {num} raiders to this channel", "Motd")
        colorTabSafely(c, 2)
        return hexchat.EAT_HEXCHAT
    if stype in ("ritual", "charity"):
        # Twitch escapes spaces inside tag values as a literal backslash + "s"
        ech(ircv3.get("system-msg", "").replace("\\s", " "), "Motd")
        return hexchat.EAT_HEXCHAT

    # Anything else is treated as a subscription-type event
    line = sublines.get(stype)
    if not line:
        ech("Unknown stype '{}': {}".format(stype, ircv3))
        return hexchat.EAT_ALL

    gifter = ""
    subber = ""
    num = 0
    streak = 0
    cumul = 0
    if stype == "subgift":  # Gift Sub
        gifter = ircv3.get("login", "")
        subber = ircv3.get("msg-param-recipient-user-name", "")
        streak = _tag_int(ircv3, "msg-param-streak-months") or _tag_int(ircv3, "msg-param-months")
        cumul = _tag_int(ircv3, "msg-param-cumulative-months")
    elif stype == "submysterygift":  # Multi Gift Sub
        gifter = ircv3.get("login", "")
        num = _tag_int(ircv3, "msg-param-mass-gift-count")
    elif stype == "giftpaidupgrade":  # Gift Sub Upgrade
        subber = ircv3.get("login", "")
        gifter = ircv3.get("msg-param-sender-login", "")
    else:  # Regular Sub
        subber = ircv3.get("login", "")
        streak = _tag_int(ircv3, "msg-param-streak-months") or _tag_int(ircv3, "msg-param-months")
        cumul = _tag_int(ircv3, "msg-param-cumulative-months")

    if streak > 1:  # A Streak value is included
        line += sublineStreak
    else:
        line += sublineFirst
    if cumul:  # A Cumulative month count is included
        line += sublineCumul
    if ircv3.get("msg-param-sub-plan") == "Prime":  # Sub is with prime
        line += sublinePrime
    if len(word_eol) > 3:  # Sub includes a message (drop its leading ":")
        msg = word_eol[3][1:]
        line += sublineMessage
    else:
        msg = ""

    try:
        line = line.format(
            subber=subber,
            streak=streak,
            cumul=cumul,
            gifter=gifter,
            subNumber=num,
            s1=usePlural(streak),
            s2=usePlural(num),
            msg=msg,
        )
    except (KeyError, IndexError, ValueError) as e:
        # A user-edited template may use a placeholder that is not supplied
        ech(f"Something happened, but there was an error: {e}")
        return hexchat.EAT_HEXCHAT
    ech(line, "Motd")
    colorTabSafely(c, 2)
    return hexchat.EAT_HEXCHAT


def cb_parse(word, word_eol, userdata, attrs):
    """Handle one raw server line from Twitch, using its IRCv3 tags.

    `attrs.ircv3` is parsed with tagsplit() and the IRC command in
    ``word[1]`` selects the action:

    * ROOMSTATE: hidden.
    * USERSTATE: our own badge prefix is cached in `userstates`.
    * PRIVMSG: the tags are queued in `inbox` for cb_parseMessage().
    * USERNOTICE: raids, rituals and subscriptions are printed.
    * WHISPER: forwarded to dmRecv().
    * HOSTTARGET: a link to the hosted channel is printed.
    * CLEARMSG / CLEARCHAT: deletions, timeouts and bans are announced.

    Lines from other networks and unrecognised commands are passed through
    untouched (``EAT_NONE``).
    """
    global opReturns
    c = hexchat.get_context()
    # Only do anything if this is Twitch
    if (c.get_info("network") or "").lower() != "twitch" or len(word) < 2:
        return hexchat.EAT_NONE
    mtype = word[1]
    ircv3 = tagsplit(attrs.ircv3)
    ct = f'{c.get_info("network")}/{c.get_info("channel")}'

    if debugServer and opReturns > 0:
        ech("CTXT: {}".format(ct))
        ech("TYPE: {}".format(mtype))
        ech("TAGS: '{}'".format(attrs.ircv3))
        opReturns -= 1

    if mtype == "ROOMSTATE":
        return hexchat.EAT_HEXCHAT

    if mtype == "USERSTATE":
        # This contains information about your own badges
        userstates[ct] = badgesplit(ircv3.get("badges", ""))
        return hexchat.EAT_HEXCHAT

    if mtype == "PRIVMSG":
        # Park the tags until the matching print event shows up.
        # This is a HACK to get IRCv3 data for print events from server
        # events; a better tag-forwarding interface would make it unnecessary.
        inbox.append(ircv3)
        return hexchat.EAT_NONE

    if mtype == "USERNOTICE":
        return _handle_usernotice(c, ircv3, word_eol)

    if mtype == "WHISPER":
        dmRecv(ircv3)
        return hexchat.EAT_ALL

    if mtype == "HOSTTARGET":
        if len(word) < 4:
            return hexchat.EAT_NONE
        hostline = word[3].replace(":", "", 1)
        if hostline != "-":  # "-" means hosting has ended
            hexchat.emit_print(
                "Server Notice",
                "Hosting active. Destination URL: https://www.twitch.tv/{}".format(
                    hostline
                ),
            )
        return hexchat.EAT_HEXCHAT

    if mtype == "CLEARMSG":
        ech("A message by <{}> was deleted: '{}'".format(
            ircv3.get("login"),
            ircv3.get("MSG"),
        ))
        colorTabSafely(c, 1)
        return hexchat.EAT_HEXCHAT

    if mtype == "CLEARCHAT":
        # No target user on the line means the whole chat was cleared
        cut = word[3].strip(":") if len(word) > 3 else ""
        rsn = ircv3.get("ban-reason")
        t = " Reason: " + rsn.replace("\\s", " ") if rsn else ""
        dur = ircv3.get("ban-duration")
        if not cut:
            ech("The chat was cleared.")
        elif dur:
            ech(f"{cut} was timed out for {dur}s.{t}")
        else:
            ech(f"{cut} was banned permanently.{t}")
        colorTabSafely(c, 1)
        return hexchat.EAT_HEXCHAT

    return hexchat.EAT_NONE
# Parse and format a message that comes from Hexchat itself | |
def cb_parseMessage(word, word_eol, mtype):
    """Re-emit a chat print event with badge prefix and shortened name.

    `mtype` is the print event name supplied as hook userdata. The event
    text is matched against the tag dicts queued in `inbox` by cb_parse():

    * Match found: the event is re-emitted with the sender's badges and
      shortened display name, the tab is colored, and the original is eaten.
    * No match, and it is one of our own ("Your ...") events: it is
      re-emitted once with our cached badge prefix from `userstates`.
      `msglast` marks the text being re-emitted so that the echo this very
      emission triggers is recognised and passed through; the echo clears
      the mark again so an identical follow-up message is still decorated.
    * Otherwise the event is passed through untouched.
    """
    global msglast, opReturns
    c = hexchat.get_context()
    network = c.get_info("network") if c else None
    # Only do anything if this is Twitch
    if not network or network.lower() != "twitch":
        return hexchat.EAT_NONE
    ct = f'{network}/{c.get_info("channel")}'
    mtext = hexchat.strip(word[1], -1, 3)

    ircv3 = {}
    for candidate in inbox:
        try:
            matched = MessageMatchesTagstring(mtext, candidate)
        except (KeyError, IndexError):
            matched = False  # Entry without comparable text
        if matched:
            ircv3 = candidate
            inbox.remove(candidate)
            break
    del inbox[:-5]  # Keep only the five newest unmatched entries

    if not ircv3:
        if "Your" not in mtype:
            return hexchat.EAT_NONE
        if word[1] == msglast:
            # This is the echo of our own emit_print below
            msglast = ""
            return hexchat.EAT_NONE
        msglast = word[1]
        if mortis:
            try:
                mortis.pargon("", word[1], ircv3, c, mtype)
            except Exception as e:
                ech(str(e))
        name = hexchat.get_info("nick")
        bprefix = userstates.get(ct, "_ ")
        c.emit_print(mtype, name, word[1], "", bprefix)
        return hexchat.EAT_ALL

    msglast = ""
    if debugPrint and opReturns > 0:
        ech("CTXT: {}".format(ct))
        ech("TEXT: {}".format(str(word)))
        ech("TAGS: '{}'".format(str(ircv3)))
        opReturns -= 1

    bprefix = badgesplit(ircv3.get("badges", ""))
    # The display-name tag may be absent or empty; fall back to the event's nick
    display = ircv3.get("display-name") or word[0]
    name = nameShorten(display, bprefix)
    hexchat.emit_print(mtype, name, word[1], bprefix, "")
    colorTabSafely(c, eventList[mtype])
    if mortis:
        try:
            mortis.pargon(display, word[1], ircv3, c, mtype)
        except Exception as e:
            ech(str(e))
    return hexchat.EAT_ALL
# Reset the color of a newly focused tab | |
def cb_focus(word, word_eol, udat):
    """Reset the tab color of the context that just received focus.

    Always returns ``EAT_NONE`` so the focus event proceeds normally.
    """
    try:
        t = hexchat.get_context()
    except Exception:
        # Without a context there is nothing to reset
        return hexchat.EAT_NONE
    colorTabSafely(t, reset=True)
    return hexchat.EAT_NONE
###===----- | |
# COMMANDS | | |
###===----- | |
def set_outputs(word, word_eol, udat):
    """Handle ``/dbug <n>``: set how many upcoming events get debug output.

    Stores the integer argument in `opReturns`. A missing or non-numeric
    argument leaves the counter unchanged and prints the usage line.
    """
    global opReturns
    try:
        opReturns = int(word_eol[1])
    except (IndexError, ValueError):
        ech("Usage: DBUG <n>, prints IRCv3 tags for the next <n> messages")
    return hexchat.EAT_ALL
def printKey(word, word_eol, udat):
    """Handle ``/badges``: list every configured badge icon with a readable name.

    Entries whose icon is the empty string are left out; tiered entries are
    printed as their dict.
    """
    ech("Badge Key:", "Motd")
    for badge, icon in badgeIcons.items():
        if icon == "":
            continue
        label = badge.replace('-', ' ').capitalize()
        ech(f"'{icon}' : {label}", "Server Text")
    return hexchat.EAT_ALL
def printCache(word, word_eol, udat):
    """Handle ``/inbox``: print every tag dict still waiting in `inbox`."""
    ech("Cached IRCv3 strings:", "Motd")
    for pending in inbox:
        ech(str(pending), "Server Text")
    return hexchat.EAT_ALL
def tabcolors(word, word_eol, udat):
    """Handle ``/tabs``: print the color recorded for each tab in `tabColors`."""
    ech("Registered Tab Colors:", "Motd")
    for tab, color in tabColors.items():
        ech(f"'{tab}' = {color}", "Server Text")
    return hexchat.EAT_ALL
def tjoin(word, word_eol, udat):
    """Handle ``/twitchjoin <channel>``: join a channel on the "Twitch" network.

    The JOIN is issued from the "Twitch" context regardless of which tab
    the command was typed in. Prints a usage line when no channel is given
    and does nothing when no "Twitch" context exists.
    """
    if len(word_eol) < 2:
        ech("Usage: TWITCHJOIN <channel>")
        return hexchat.EAT_ALL
    twitch = hexchat.find_context("Twitch")
    if twitch:
        twitch.command(f"JOIN {word_eol[1]}")
    return hexchat.EAT_ALL
# Register the slash commands.
hexchat.hook_command("tabs", tabcolors)
hexchat.hook_command(
    "dbug",
    set_outputs,
    help="Usage: DBUG <n>, prints IRCv3 tags for the next <n> messages",
)
hexchat.hook_command(
    "badges",
    printKey,
    help="Print a key of what badges are represented by username prefix characters",
)
hexchat.hook_command(
    "inbox",
    printCache,
    help="Dump the cache of IRCv3 strings that have yet to be picked up by a matching print event",
)
hexchat.hook_command("twitchjoin", tjoin)
# dmSend is attached to "say" here and to the "Your ..." print events below;
# it returns early when called as the command itself.
hexchat.hook_command("say", dmSend)
# Every raw server line goes through cb_parse together with its attributes.
hexchat.hook_server_attrs("RAW LINE", cb_parse)
hexchat.hook_print("Focus Tab", cb_focus, priority=hexchat.PRI_LOW)
for i in eventList:  # Connect every print event listed for parsing
    # The event name is passed as userdata so the callback knows what to re-emit
    hexchat.hook_print(i, cb_parseMessage, userdata=i, priority=hexchat.PRI_HIGH)
for i in ["Your Message", "Your Action"]:  # Connect print events that can happen in PM
    hexchat.hook_print(i, dmSend, userdata=i, priority=hexchat.PRI_HIGH)
# Load banner; \00304 ... \003 are color control codes wrapped around the plugin name.
ech("\00304{} loaded.\003".format(__module_name__), "Motd")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Correct; This is why a patched version is required. The process is described in the Readme of my Rust version of this plugin.