Last active
October 25, 2023 12:56
-
-
Save rafa-br34/6df6ca431d2d373545fa5cb0ac6efc1b to your computer and use it in GitHub Desktop.
A simple Roblox stream sniper that I made overnight.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#< VERSION:0.1.0, REV:10
import argparse
import gzip
import json
import os
import shutil
import threading
import time
import zlib

import requests
from urllib3.exceptions import InsecureRequestWarning
# Helper functions
def ClearLine():
    """Blank out the current terminal line and return the cursor to column 0.

    The width comes from shutil.get_terminal_size(), which falls back to a
    default size instead of raising OSError when stdout is not attached to a
    terminal (for example when the output is piped or redirected).
    """
    Columns = shutil.get_terminal_size().columns
    # One column is left untouched so the write never wraps to the next line.
    print('\r' + (' ' * (Columns - 1)), end='\r')
def LinePrint(*args, sep=' ', end=''):
    """Print a status line padded to the terminal width, then rewind the cursor.

    Every argument is converted with str() and followed by `sep` (including
    the last one). The text is padded with spaces up to one column short of
    the terminal width so leftovers of a previous, longer line are erased,
    and a carriage return is emitted so the next print overwrites this line.

    Parameters:
        *args: values to display.
        sep:   text appended after each value.
        end:   text emitted after the trailing carriage return.

    The width comes from shutil.get_terminal_size(), which falls back to a
    default size instead of raising OSError when stdout is not a terminal.
    """
    ResultString = "".join(f"{Argument}{sep}" for Argument in args)
    Columns = shutil.get_terminal_size().columns
    PadLen = Columns - len(ResultString) - 1
    # A non-positive pad length means the text already fills the line.
    Padding = ' ' * PadLen if PadLen > 0 else ''
    print(ResultString, end=Padding + '\r' + end)
def DeduplicateList(List):
    """Return a new list with duplicates removed, keeping first-seen order.

    Items must be hashable. dict.fromkeys() is used instead of set() so the
    result order is deterministic rather than depending on hash ordering.
    """
    return list(dict.fromkeys(List))
# Default values for the optional command-line arguments
c_DefaultThreads = 20
c_DefaultSession = True
# NOTE(review): certificate verification is off by default; pass --ssl to
# turn it on. Confirm this default is intentional.
c_DefaultSSL = False

# Self-update settings
c_VerifyUpdates = True
c_UpdateLink = "https://gist.githubusercontent.com/rafa-br34/6df6ca431d2d373545fa5cb0ac6efc1b/raw/RobloxStreamSniper.py"

# State shared between the main thread and the paging/verifier/worker threads
g_Shared = dict(
    MessageQueue=[],        # text lines queued by the threads, printed by main()
    Config=None,            # parsed command-line arguments, set by main()
    Run=True,               # cleared to ask every thread loop to stop
    CurrentPage=None,       # most recent server-list page (decoded JSON)
    PlayerCount=[0, 0],     # [sum of "playing", sum of "maxPlayers"]
    TokenCount=[0, 0],      # [tokens discovered by paging, tokens sent for checking]
    ServerList=[[], []],    # [servers with unchecked tokens, fully consumed servers]
    UsersData={},           # headshot image URL -> target user ID
    IsPaging=True,          # set to False once the paging loop ends
    WorkerQueue=[],         # tasks of the form [callable, *arguments]
    StartTime=0,            # time.time() taken when the search starts
)
def _CheckServer(MessageHandler, RequestHandler, TokenToServer, UsersData):
    """Resolve a batch of player tokens to headshot URLs and report any targets.

    Parameters:
        MessageHandler: callable taking one string; receives status text.
        RequestHandler: object with a requests-style post() method (the
                        requests module or a requests.Session).
        TokenToServer:  dict mapping each player token to the server record
                        (a dict holding an "ID" key) the token came from.
        UsersData:      shared dict mapping a target's headshot URL to the
                        target's user ID. Entries are removed once reported.

    Side effects:
        Adds the batch size to g_Shared["TokenCount"][1], whether or not the
        request succeeded.
    """
    Payload = [
        {
            "format": "png",
            "requestId": f"0:{Token}:AvatarHeadshot:150x150:png:regular",
            "size": "150x150",
            "targetId": 0,
            "token": Token,
            "type": "AvatarHeadShot",
        }
        for Token in TokenToServer
    ]

    Result = RequestHandler.post(
        "https://thumbnails.roblox.com/v1/batch",
        headers={ "Accept-Encoding": "gzip, deflate, br", "Content-Encoding": "gzip", "Content-Type": "application/json" },
        data=gzip.compress(bytes(json.dumps(Payload), "utf-8"))
    )

    if Result.status_code == 200:
        for Thumbnail in Result.json()["data"]:
            ImageLink = Thumbnail["imageUrl"]
            if ImageLink not in UsersData:
                continue
            # Several workers share UsersData: pop() claims the entry so the
            # same user is never reported twice, and a worker that loses the
            # race simply moves on instead of raising KeyError.
            try:
                UserID = UsersData.pop(ImageLink)
            except KeyError:
                continue

            # The token is the second field of the requestId built above.
            Token = Thumbnail["requestId"].split(':')[1]
            Server = TokenToServer[Token]
            MessageHandler(
                "User {} found in {}ms\nRoblox.GameLauncher.joinGameInstance({}, \"{}\")".format(
                    UserID, round((time.time() - g_Shared["StartTime"]) * 1000),
                    g_Shared["Config"].gameid, Server["ID"]
                )
            )
    else:
        # These tokens are not retried, so make the gap visible to the user.
        MessageHandler(f"Thumbnail batch request failed, status code {Result.status_code}")

    g_Shared["TokenCount"][1] += len(Payload)
def WorkerLogic(TID):
    """Worker thread body: run tasks from g_Shared["WorkerQueue"] until stopped.

    Each task is a list of the form [callable, *arguments]; it is invoked as
    callable(Message, Handler, *arguments), where Message queues a line of
    text tagged with this worker's ID and Handler is either a
    requests.Session or the requests module, depending on Config.session.

    Parameters:
        TID: identifier of this thread, used only to tag messages.
    """
    Messages = g_Shared["MessageQueue"]
    Config = g_Shared["Config"]

    def Message(Text):
        Messages.append(f"[WORKER]<{TID}> {Text}")

    Handler = requests.Session() if Config.session else requests
    Queue = g_Shared["WorkerQueue"]

    while g_Shared["Run"]:
        # Pop first and handle the empty case: checking the length and then
        # popping lets another worker drain the queue in between, which
        # raised IndexError and killed the thread.
        try:
            Task, *Arguments = Queue.pop()
        except IndexError:
            time.sleep(0.001)
            continue

        # Thread boundary: report a failed task and keep the worker alive.
        try:
            Task(Message, Handler, *Arguments)
        except Exception as Error:
            Message(f"Task failed: {Error!r}")
def VerifierLogic(TID):
    """Verifier thread body: fetch target headshots and schedule token checks.

    First requests the avatar headshot URL of every user in Config.userid
    and stores "image URL -> user ID" in g_Shared["UsersData"]. It then
    drains g_Shared["ServerList"][0], grouping player tokens into batches of
    at most 100 and queueing a _CheckServer task per batch on
    g_Shared["WorkerQueue"]. Fully consumed servers are moved to
    g_Shared["ServerList"][1].

    The loop ends when no target is left in UsersData, when paging has
    finished and nothing is left to check, or when g_Shared["Run"] is
    cleared. g_Shared["Run"] is always cleared on exit, including on an
    unexpected exception, because no search can proceed without this thread.

    Parameters:
        TID: identifier of this thread, used only to tag messages.
    """
    Messages = g_Shared["MessageQueue"]
    Config = g_Shared["Config"]

    def Message(Text):
        Messages.append(f"[VERIFIER]<{TID}> {Text}")

    Handler = requests.Session() if Config.session else requests

    try:
        Message("Acquiring target thumbnails...")
        Result = Handler.get(
            "https://thumbnails.roblox.com/v1/users/avatar-headshot?userIds={}&size=150x150&format=Png&isCircular=false".format(
                ','.join(DeduplicateList(Config.userid))
            ),
            verify=Config.ssl
        )

        if Result.status_code != 200:
            # Was `Messages(...)`, i.e. calling the queue list itself, which
            # raised TypeError instead of reporting the failure.
            Message(f"Failed to acquire thumbnails, status code {Result.status_code}")
            return

        UsersData = g_Shared["UsersData"]
        for User in Result.json()["data"]:
            UserID = User["targetId"]
            State = User["state"]
            if State != "Completed":
                Message(f"Failed to acquire thumbnail for {UserID}, state: {State}")
            else:
                Message(f"Acquired thumbnail for {UserID}, state: {State}")
                UsersData[User["imageUrl"]] = UserID

        InactiveServers = g_Shared["ServerList"][1]
        ActiveServers = g_Shared["ServerList"][0]
        WorkQueue = g_Shared["WorkerQueue"]

        while g_Shared["Run"]:
            if not UsersData:
                # Every target was located (or none could be resolved).
                break

            # Read the paging flag BEFORE looking at the server list: once it
            # is False no more servers will be appended, so an empty list
            # seen afterwards really is final.
            PagingDone = not g_Shared["IsPaging"]
            if not ActiveServers:
                if PagingDone and not WorkQueue:
                    # Nothing left to scan; the targets are not in this game.
                    break
                # Idle: wait for the paging thread instead of spinning.
                time.sleep(0.01)
                continue

            TokenToServer = {}
            while len(TokenToServer) < 100 and ActiveServers:
                Server = ActiveServers.pop(0)
                Tokens = Server["Tokens"]
                # A server may arrive with no tokens at all; this loop
                # tolerates that instead of popping from an empty list.
                while Tokens and len(TokenToServer) < 100:
                    TokenToServer[Tokens.pop()] = Server
                if Tokens:
                    # We still need to check some tokens of this server
                    ActiveServers.append(Server)
                else:
                    InactiveServers.append(Server)

            if TokenToServer:
                WorkQueue.append([_CheckServer, TokenToServer, UsersData])

        Message("No need to continue")
    finally:
        g_Shared["Run"] = False
def PagingLogic(TID):
    """Paging thread body: walk the game's public server list page by page.

    For every server on a page, a record {"Tokens": ..., "ID": ...} is
    appended to g_Shared["ServerList"][0] and the player/token counters in
    g_Shared are updated. The last page fetched is kept in
    g_Shared["CurrentPage"]. Paging stops when the API returns no next
    cursor or when g_Shared["Run"] is cleared.

    g_Shared["IsPaging"] is True while the loop runs and is always reset to
    False afterwards, including when the thread dies from an exception.

    Parameters:
        TID: identifier of this thread, used only to tag messages.
    """
    Messages = g_Shared["MessageQueue"]
    Config = g_Shared["Config"]

    def Message(Text):
        Messages.append(f"[PAGING]<{TID}> {Text}")

    Handler = requests.Session() if Config.session else requests

    Message("Paging started!")
    g_Shared["IsPaging"] = True

    Cursor = ""
    try:
        while Cursor is not None and g_Shared["Run"]:
            Result = Handler.get(
                f"https://games.roblox.com/v1/games/{Config.gameid}/servers/Public?limit=100&cursor={Cursor}",
                verify=Config.ssl,
                headers={ "Accept-Encoding": "gzip;q=1.0, deflate;q=0.8, br;q=0.6, *;q=0.1" }
            )

            if Result.status_code != 200:
                Message(f"Result with invalid status {Result.status_code}")
                # Back off before retrying the same cursor; retrying at once
                # hammers the API and floods the message queue.
                time.sleep(1)
                continue

            g_Shared["CurrentPage"] = Page = Result.json()
            for Server in Page["data"]:
                g_Shared["ServerList"][0].append({
                    "Tokens": Server["playerTokens"],
                    "ID": Server["id"],
                })
                g_Shared["PlayerCount"][0] += Server["playing"]
                g_Shared["PlayerCount"][1] += Server["maxPlayers"]
                g_Shared["TokenCount"][0] += len(Server["playerTokens"])

            Cursor = Page["nextPageCursor"]
    finally:
        g_Shared["IsPaging"] = False

    Message("Paging stopped! (no more servers?)")
# Bootstrap functions
def VerifyUpdates():
    """Offer to replace this script with a newer revision from c_UpdateLink.

    Both the running script and the downloaded copy are expected to carry a
    metadata header of the form "#< NAME:VALUE, NAME:VALUE". When the
    downloaded REV value is greater than the local one, the user is asked
    for confirmation; on 'y'/'yes' this file is overwritten with the
    downloaded source and the process exits.

    A failed download, a non-200 response, or missing or malformed metadata
    only skips the update; none of them prevents the script from running.

    Raises:
        SystemExit: after the script has been updated on disk.
    """
    print("[MAIN] Verifying updates...")

    def GetMetadata(Data):
        # Parse the first "#<" header line into a {name: value} dict.
        Start = Data.find("#<")
        if Start < 0:
            return None
        Header = Data[Start + 2:].split('\n', 1)[0]
        Metadata = {}
        for Tag in Header.split(','):
            Name, Separator, Value = Tag.partition(':')
            if Separator:  # ignore tags without a "name:value" shape
                Metadata[Name.strip()] = Value.strip()
        return Metadata

    def GetRevision(Metadata):
        # Return the REV tag as an int, or None when it is absent or invalid.
        try:
            return int(Metadata["REV"])
        except (TypeError, KeyError, ValueError):
            return None

    try:
        Response = requests.get(c_UpdateLink, timeout=10)
    except requests.RequestException as Error:
        print(f"[MAIN] Unable to check for updates: {Error}")
        return
    if Response.status_code != 200:
        print(f"[MAIN] Unable to check for updates, status code {Response.status_code}")
        return
    UpdatedSource = Response.text

    with open(__file__, "r", encoding="utf-8") as CurrentSourceFile:
        CurrentSource = CurrentSourceFile.read()

    CurrentMeta = GetMetadata(CurrentSource)
    UpdatedMeta = GetMetadata(UpdatedSource)
    print(f"[MAIN] Script metadata(Current/Updated): {CurrentMeta}/{UpdatedMeta}")

    CurrentRevision = GetRevision(CurrentMeta)
    UpdatedRevision = GetRevision(UpdatedMeta)
    if CurrentRevision is None or UpdatedRevision is None:
        return
    if CurrentRevision >= UpdatedRevision:
        return

    if input("[MAIN] Would you like to update(y/n)?\n") not in ['y', "yes"]:
        return

    # The file is reopened for writing only after the user confirmed.
    with open(__file__, "w", encoding="utf-8") as CurrentSourceFile:
        CurrentSourceFile.write(UpdatedSource)
    raise SystemExit(0)
def ParseArguments():
    """Parse the command line and return the resulting argparse namespace.

    Options:
        --gameid/-g   (required) the game to search, kept as a string.
        --userid/-u   (required) one or more user IDs, kept as strings.
        --threads/-t  number of worker threads (int).
        --ssl/-s      enable SSL verification (boolean).
        --session/-S  use a requests session (boolean).

    The boolean options accept an explicit value ("--ssl true",
    "--ssl False", "--session 0", ...) or may be given bare ("--ssl") to
    mean True. They previously used type=bool, which treats every non-empty
    string, "False" included, as True.
    """
    def ParseBool(Value):
        # Convert a command-line word into a real boolean.
        Normalized = str(Value).strip().lower()
        if Normalized in ("1", "true", "t", "yes", "y", "on"):
            return True
        if Normalized in ("0", "false", "f", "no", "n", "off"):
            return False
        raise argparse.ArgumentTypeError(f"expected a boolean value, got {Value!r}")

    Arguments = argparse.ArgumentParser(description="Just another stream sniper for Roblox.")
    Arguments.add_argument(
        "--gameid", "-g", help="The game on which the targets are playing.", required=True, type=str
    )
    Arguments.add_argument(
        "--userid", "-u", help="A list of user IDs to search for.", nargs='+', required=True, type=str
    )
    Arguments.add_argument(
        "--threads", "-t", help=f"The amount of threads to use (defaults to {c_DefaultThreads}).",
        default=c_DefaultThreads,
        required=False,
        type=int
    )
    Arguments.add_argument(
        "--ssl", "-s", help=f"Use SSL verification (defaults to {c_DefaultSSL}).",
        default=c_DefaultSSL,
        required=False,
        nargs='?',
        const=True,
        type=ParseBool
    )
    Arguments.add_argument(
        "--session", "-S", help=f"Use a requests session (defaults to {c_DefaultSession}).",
        default=c_DefaultSession,
        required=False,
        nargs='?',
        const=True,
        type=ParseBool
    )
    return Arguments.parse_args()
def main():
    """Entry point: parse arguments, start the threads and run the status loop.

    Arguments are parsed before the update check so that --help or an
    invalid command line never triggers a network request or an update
    prompt. One paging thread, one verifier thread and Config.threads worker
    threads are then started; the main thread prints queued messages and a
    live status line until g_Shared["Run"] is cleared or Ctrl-C is pressed.
    On exit every started thread is asked to stop and joined, and the
    remaining messages are flushed.
    """
    g_Shared["Config"] = Config = ParseArguments()

    if c_VerifyUpdates:
        VerifyUpdates()

    if not Config.ssl:
        requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)

    Messages = g_Shared["MessageQueue"]
    Threads = []

    def MakeThread(Target):
        # The thread's index in `Threads` doubles as its ID.
        NewThread = threading.Thread(target=Target, args=(len(Threads),))
        NewThread.start()
        Threads.append(NewThread)

    def UpdateMessages():
        # Erase the status line once, then print everything that is queued.
        if Messages:
            ClearLine()
        while Messages:
            print(Messages.pop(0))

    def CursorChecksum(Cursor):
        # CRC32 of a page cursor, or 0 when there is no cursor.
        if not Cursor:
            return 0
        return zlib.crc32(bytes(Cursor, "ascii")) & 0xFFFFFFFF

    g_Shared["StartTime"] = time.time()
    Messages.append("[MAIN] Starting search for users {} on game \"{}\"".format(Config.userid, Config.gameid))

    # Threads are started inside the try block so that an interrupt during
    # start-up still reaches the shutdown code below.
    try:
        MakeThread(PagingLogic)
        MakeThread(VerifierLogic)
        for _ in range(Config.threads):
            MakeThread(WorkerLogic)

        Messages.append("[MAIN] CLI Loop Start")
        while g_Shared["Run"]:
            UpdateMessages()

            Page = g_Shared["CurrentPage"]
            PagePrev = Page and Page["previousPageCursor"]
            PageNext = Page and Page["nextPageCursor"]
            LinePrint("Page(CRC32): {:08X}->{:08X}, Servers: {}/{}, Players: {}/{}, Tokens: {}/{}".format(
                CursorChecksum(PagePrev),
                CursorChecksum(PageNext),
                len(g_Shared["ServerList"][0]), len(g_Shared["ServerList"][1]),
                g_Shared["PlayerCount"][0], g_Shared["PlayerCount"][1],
                g_Shared["TokenCount"][0], g_Shared["TokenCount"][1]
            ))
            time.sleep(0.01)
    except KeyboardInterrupt:
        print("\n[MAIN] KeyboardInterrupt")
    finally:
        g_Shared["Run"] = False
        print("[MAIN] Stopping threads...")
        for Thread in Threads:
            Thread.join()
        UpdateMessages()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment