Skip to content

Instantly share code, notes, and snippets.

@rafa-br34
Last active October 25, 2023 12:56
Show Gist options
  • Save rafa-br34/6df6ca431d2d373545fa5cb0ac6efc1b to your computer and use it in GitHub Desktop.
Save rafa-br34/6df6ca431d2d373545fa5cb0ac6efc1b to your computer and use it in GitHub Desktop.
A simple Roblox stream sniper that I made overnight.
#< VERSION:0.1.0, REV:10
import threading
import requests
import argparse
import json
import zlib
import gzip
import time
import os
from urllib3.exceptions import InsecureRequestWarning
# Helper functions
def ClearLine():
print('\r' + (' ' * (os.get_terminal_size()[0] - 1)), end='\r')
def LinePrint(*args, sep=' ', end=''):
ResultString = "".join([str(Argument) + sep for Argument in args])
PadLen = (os.get_terminal_size()[0] - len(ResultString)) - 1
Padding = (PadLen > 0 and (' ' * PadLen)) or ''
print(ResultString, end=Padding + '\r' + end)
def DeduplicateList(List):
return list(set(List))
# Default arguments
c_DefaultThreads = 20
c_DefaultSession = True
c_DefaultSSL = False
# Other constants
c_VerifyUpdates = True
c_UpdateLink = "https://gist.githubusercontent.com/rafa-br34/6df6ca431d2d373545fa5cb0ac6efc1b/raw/RobloxStreamSniper.py"
g_Shared = {
"MessageQueue": [],
"Config": None,
"Run": True,
"CurrentPage": None,
"PlayerCount": [0, 0],
"TokenCount": [0, 0],
"ServerList": [[], []],
"UsersData": {},
"IsPaging": True,
"WorkerQueue": [],
"StartTime": 0
}
def _CheckServer(MessageHandler, RequestHandler, TokenToServer, UsersData):
Payload = []
for Token in TokenToServer:
Payload.append({
"format": "png",
"requestId": f"0:{Token}:AvatarHeadshot:150x150:png:regular",
"size": "150x150",
"targetId": 0,
"token": Token,
"type": "AvatarHeadShot",
})
Result = RequestHandler.post(
"https://thumbnails.roblox.com/v1/batch",
headers={ "Accept-Encoding": "gzip, deflate, br", "Content-Encoding": "gzip", "Content-Type": "application/json" },
data=gzip.compress(bytes(json.dumps(Payload), "utf-8"))
)
if Result.status_code == 200:
for Thumbnail in Result.json()["data"]:
ImageLink = Thumbnail["imageUrl"]
if ImageLink in UsersData:
Token = Thumbnail["requestId"].split(':')[1]
Server = TokenToServer[Token]
UserID = UsersData[ImageLink]
MessageHandler(
"User {} found in {}ms\nRoblox.GameLauncher.joinGameInstance({}, \"{}\")".format(
UserID, round((time.time() - g_Shared["StartTime"]) * 1000),
g_Shared["Config"].gameid, Server["ID"]
)
)
del UsersData[ImageLink]
g_Shared["TokenCount"][1] += len(Payload)
def WorkerLogic(TID):
global g_Shared
Messages = g_Shared["MessageQueue"]
Config = g_Shared["Config"]
def Message(Text):
Messages.append(f"[WORKER]<{TID}> {Text}")
Handler = Config.session and requests.Session() or requests
Queue = g_Shared["WorkerQueue"]
while g_Shared["Run"]:
if len(Queue):
Method = Queue.pop()
Method.pop(0)(Message, Handler, *Method)
else:
time.sleep(0.001)
def VerifierLogic(TID):
global g_Shared
Messages = g_Shared["MessageQueue"]
Config = g_Shared["Config"]
def Message(Text):
Messages.append(f"[VERIFIER]<{TID}> {Text}")
Handler = Config.session and requests.Session() or requests
Message("Acquiring target thumbnails...")
Result = Handler.get(
"https://thumbnails.roblox.com/v1/users/avatar-headshot?userIds={}&size=150x150&format=Png&isCircular=false".format(
''.join([ID + ',' for ID in DeduplicateList(Config.userid)])
),
verify=Config.ssl
)
if Result.status_code != 200:
Messages(f"Failed to acquire thumbnails, status code {Result.status_code}")
g_Shared["Run"] = False
return
UsersData = g_Shared["UsersData"]
for User in Result.json()["data"]:
UserID = User["targetId"]
State = User["state"]
if State != "Completed":
Message(f"Failed to acquire thumbnail for {UserID}, state: {State}")
else:
Message(f"Acquired thumbnail for {UserID}, state: {State}")
UsersData[User["imageUrl"]] = UserID
InactiveServers = g_Shared["ServerList"][1]
ActiveServers = g_Shared["ServerList"][0]
WorkQueue = g_Shared["WorkerQueue"]
while g_Shared["Run"]:
if len(WorkQueue) >= Config.threads and len(ActiveServers) <= 0:
time.sleep(0.01)
continue
if len(ActiveServers) > 0:
TokenToServer = {}
Server = ActiveServers.pop(0)
while len(TokenToServer) < 100 and Server:
TokenToServer[Server["Tokens"].pop()] = Server
if len(Server["Tokens"]) <= 0:
InactiveServers.append(Server)
Server = (len(ActiveServers) > 0) and ActiveServers.pop(0) or False
# We still need to check some tokens of this server
if Server and len(Server["Tokens"]) > 0:
ActiveServers.append(Server)
WorkQueue.append([_CheckServer, TokenToServer, UsersData])
elif len(g_Shared["UsersData"]) <= 0:
break
Message("No need to continue")
g_Shared["Run"] = False
def PagingLogic(TID):
global g_Shared
Messages = g_Shared["MessageQueue"]
Config = g_Shared["Config"]
def Message(Text):
Messages.append(f"[PAGING]<{TID}> {Text}")
Handler = Config.session and requests.Session() or requests
Message("Paging started!")
g_Shared["IsPaging"] = True
Cursor = ""
while Cursor != None and g_Shared["Run"]:
Result = Handler.get(
f"https://games.roblox.com/v1/games/{Config.gameid}/servers/Public?limit=100&cursor={Cursor}",
verify=Config.ssl,
headers={ "Accept-Encoding": "gzip;q=1.0, deflate;q=0.8, br;q=0.6, *;q=0.1" }
)
if Result.status_code == 200:
g_Shared["CurrentPage"] = Page = Result.json()
for Server in Page["data"]:
g_Shared["ServerList"][0].append({
"Tokens": Server["playerTokens"],
"ID": Server["id"],
})
g_Shared["PlayerCount"][0] += Server["playing"]
g_Shared["PlayerCount"][1] += Server["maxPlayers"]
g_Shared["TokenCount"][0] += len(Server["playerTokens"])
Cursor = Page["nextPageCursor"]
else:
Message(f"Result with invalid status {Result.status_code}")
g_Shared["IsPaging"] = False
Message("Paging stopped! (no more servers?)")
# Bootstrap functions
def VerifyUpdates():
print("[MAIN] Verifying updates...")
CurrentSourceFile = open(__file__, "r+")
CurrentSource = CurrentSourceFile.read()
UpdatedSource = requests.get(c_UpdateLink).text
def GetMetadata(Data):
Index = Data.find("#<")
if Index < 0:
return None
RawMeta = Data[Index:]
if (Index := RawMeta.find('\n')) >= 0:
RawMeta = RawMeta[:Index]
Metadata = {}
for Tag in RawMeta[2:].split(','):
NameData = Tag.split(':')
Metadata[NameData[0].strip()] = NameData[1].strip()
return Metadata
CurrentMeta = GetMetadata(CurrentSource)
UpdatedMeta = GetMetadata(UpdatedSource)
CurrentRevision = CurrentMeta and ("REV" in CurrentMeta and int(CurrentMeta["REV"]))
UpdatedRevision = UpdatedMeta and ("REV" in UpdatedMeta and int(UpdatedMeta["REV"]))
print(f"[MAIN] Script metadata(Current/Updated): {CurrentMeta}/{UpdatedMeta}")
if ((CurrentRevision and UpdatedRevision) and CurrentRevision < UpdatedRevision):
if input("[MAIN] Would you like to update(y/n)?\n") not in ['y', "yes"]:
return
CurrentSourceFile.truncate(0)
CurrentSourceFile.seek(0)
CurrentSourceFile.write(UpdatedSource)
exit()
def ParseArguments():
Arguments = argparse.ArgumentParser(description="Just another stream sniper for Roblox.")
Arguments.add_argument(
"--gameid", "-g", help="The game on which the targets are playing.", required=True, type=str
)
Arguments.add_argument(
"--userid", "-u", help="A list of user IDs to search for.", nargs='+', required=True, type=str
)
Arguments.add_argument(
"--threads", "-t", help=f"The amount of threads to use (defaults to {c_DefaultThreads}).",
default=c_DefaultThreads,
required=False,
type=int
)
Arguments.add_argument(
"--ssl", "-s", help=f"Use SSL verification (defaults to {c_DefaultSSL}).",
default=c_DefaultSSL,
required=False,
type=bool
)
Arguments.add_argument(
"--session", "-S", help=f"Use a requests session (defaults to {c_DefaultSession}).",
default=c_DefaultSession,
required=False,
type=bool
)
return Arguments.parse_args()
def main():
if c_VerifyUpdates:
VerifyUpdates()
global g_Shared
g_Shared["Config"] = Config = ParseArguments()
if not Config.ssl:
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
Messages = g_Shared["MessageQueue"]
Threads = []
def MakeThread(Target):
NewThread = threading.Thread(target=Target, args=(len(Threads),))
NewThread.start()
Threads.append(NewThread)
def UpdateMessages():
if len(Messages):
ClearLine()
while len(Messages):
print(Messages.pop(0))
g_Shared["StartTime"] = time.time()
Messages.append("[MAIN] Starting search for users {} on game \"{}\"".format(Config.userid, Config.gameid))
MakeThread(PagingLogic)
MakeThread(VerifierLogic)
for _ in range(Config.threads):
MakeThread(WorkerLogic)
Messages.append("[MAIN] CLI Loop Start")
try:
while g_Shared["Run"]:
UpdateMessages()
Page = g_Shared["CurrentPage"]
PagePrev = Page and Page["previousPageCursor"]
PageNext = Page and Page["nextPageCursor"]
LinePrint("Page(CRC32): {:08X}->{:08X}, Servers: {}/{}, Players: {}/{}, Tokens: {}/{}".format(
PagePrev and (zlib.crc32(bytes(PagePrev, "ascii")) & 0xFFFFFFFF) or 0,
PageNext and (zlib.crc32(bytes(PageNext, "ascii")) & 0xFFFFFFFF) or 0,
len(g_Shared["ServerList"][0]), len(g_Shared["ServerList"][1]),
g_Shared["PlayerCount"][0], g_Shared["PlayerCount"][1],
g_Shared["TokenCount"][0], g_Shared["TokenCount"][1]
))
time.sleep(0.01)
except KeyboardInterrupt:
print("\n[MAIN] KeyboardInterrupt")
finally:
g_Shared["Run"] = False
print("[MAIN] Stopping threads...")
for Thread in Threads:
Thread.join()
UpdateMessages()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment