Last active
December 24, 2023 23:01
-
-
Save cidrmill/d9dfb7473e6ceb56982890653f30c9a7 to your computer and use it in GitHub Desktop.
LinkedInTel v0.1 - Abuse LinkedIn Sales Navigator API for intelligence collection and SOCMINT purposes
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
from requests import Session, Request | |
import argparse | |
import readline, sys, os, re | |
import traceback | |
import pkgutil | |
import json | |
import stdiomask | |
import pickle | |
import time | |
from functools import partial | |
# Polled by the interactive loop in _main(); the loop runs while this is False.
prog_will_exit = False

# One shared HTTP session so cookies and default headers persist across
# every request the tool makes.
session = Session()
session.headers["User-Agent"] = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36"
session.headers["X-Restli-Protocol-Version"] = "2.0.0"
# Credentials for the current run; filled in from the command line by _main()
# or interactively by sign_in(). Both start out unset (None).
account = dict.fromkeys(('username', 'password'))
# Request templates keyed by endpoint name. ``{{placeholder}}`` markers in the
# URL, params, JSON body and headers are substituted by api_request() before
# sending; ``{{_csrfToken}}`` is filled from the JSESSIONID cookie there.
endpoints = {
    'salesApiIdentity': Request(
        method='GET',
        url='https://www.linkedin.com/sales-api/salesApiIdentity',
        params={
            'q': 'findLicensesByCurrentMember',
            'includeRecentlyInactiveDueToOverallocation': 'true',
        },
        headers={'csrf-token': '{{_csrfToken}}'},
    ),
    'salesApiAgnosticAuthentication': Request(
        method='POST',
        url='https://www.linkedin.com/sales-api/salesApiAgnosticAuthentication',
        json={
            'identity': {
                'agnosticIdentity': {
                    'com.linkedin.sales.authentication.SalesCapIdentity': {
                        'contractUrn': '{{contractUrn}}',
                        'seatUrn': '{{seatUrn}}',
                    },
                },
                'name': '{{name}}',
            },
            'viewerDeviceType': 'DESKTOP',
        },
        headers={
            'Content-Type': 'text/plain;charset=UTF-8',
            'csrf-token': '{{_csrfToken}}',
        },
    ),
    'salesApiCompanies': Request(
        method='GET',
        url='https://www.linkedin.com/sales-api/salesApiCompanies/{{id}}',
        params={
            'decoration': '(entityUrn,name,account(saved,noteCount,listCount,crmStatus,starred),pictureInfo,companyPictureDisplayImage,description,industry,location,headquarters,website,revenueRange,crmOpportunities,flagshipCompanyUrl,employeeCount,employeeDisplayCount,employeeCountRange,decisionMakersDisplayCount,personaResultCounts,employeeGrowthPercentages,employees*~fs_salesProfile(entityUrn,firstName,lastName,fullName,pictureInfo,profilePictureDisplayImage),specialties,type,yearFounded)',
        },
        headers={'Csrf-Token': '{{_csrfToken}}'},
    ),
    'salesLeadSearch': Request(
        method='GET',
        url='https://www.linkedin.com/sales-api/salesApiLeadSearch?q=searchQuery&start={{start}}&count={{count}}&query=(recentSearchParam:(doLogHistory:false),filters:List((type:CURRENT_COMPANY,values:List((id:{{company}},selectionType:INCLUDED)))))&decorationId=com.linkedin.sales.deco.desktop.searchv2.LeadSearchResult-13',
        headers={'csrf-token': '{{_csrfToken}}'},
    ),
    'salesApiProfiles': Request(
        method='GET',
        url='https://www.linkedin.com/sales-api/salesApiProfiles',
        params={},
    ),
}
# Anti-forgery values scraped from the login page by extract_csrf() and sent
# back in the login form by sign_in(). All start as empty strings.
csrf = dict.fromkeys(('csrfToken', 'sId', 'loginCsrfParam'), '')
def write_to(file, content, bytes=False):
    """Write *content* to *file*, replacing any existing contents.

    Args:
        file: Path of the file to create or overwrite.
        content: ``bytes`` when *bytes* is true, otherwise ``str``.
        bytes: Open the file in binary mode instead of text mode.
    """
    if bytes:
        with open(file, 'wb') as f:
            f.write(content)
    else:
        # Text content here is API response JSON, which may contain
        # non-ASCII characters. Pin the encoding so the write neither fails
        # nor changes meaning depending on the platform's locale.
        with open(file, 'w', encoding='utf-8') as f:
            f.write(content)
def save_session():
    """Pickle the shared session's cookie jar and write it to ``cookies.bin``."""
    jar_bytes = pickle.dumps(session.cookies, pickle.HIGHEST_PROTOCOL)
    write_to('cookies.bin', jar_bytes, True)
def load_session():
    """Restore cookies written by save_session() into the shared session.

    If ``cookies.bin`` does not exist, a fresh one is created from the
    current (empty) cookie jar. If it exists but cannot be unpickled or
    merged, a message is printed and the session is left unchanged instead
    of aborting start-up.
    """
    try:
        with open("cookies.bin", 'rb') as f:
            # NOTE(security): pickle.loads() can execute arbitrary code.
            # This is only acceptable while cookies.bin is a file this tool
            # wrote itself; never point it at a file from another source.
            session.cookies.update(pickle.loads(f.read()))
    except FileNotFoundError:
        print("Session data not found on disk. Creating.")
        save_session()
    except (pickle.UnpicklingError, EOFError, AttributeError, ImportError,
            IndexError, TypeError, ValueError):
        # Truncated or otherwise damaged file: continue with an empty jar.
        print("Session data seems to've been corrupt. Not loading.")
def has_session():
    """Return True when the session holds a ``li_a`` cookie.

    ``li_a`` is delivered as a cookie (sales_auth() looks for it in the
    response cookies), and nothing in this file ever stores it as a request
    header. Looking it up in ``session.headers`` therefore always reported
    "no session"; the cookie jar is the right place to check.
    """
    return bool(session.cookies.get('li_a'))
def recursive_replace(d, options):
    """Substitute ``{{name}}`` placeholders in the string values of *d*, in place.

    Nested dictionaries are processed recursively. Values that are neither
    ``dict`` nor ``str`` (numbers, ``None``, lists, ...) are left untouched
    rather than raising.

    Args:
        d: Dictionary to modify in place.
        options: Mapping of placeholder name to replacement; replacements
            are converted with ``str()``.

    Returns:
        The same dictionary object *d*.
    """
    for key, value in d.items():
        if isinstance(value, dict):
            recursive_replace(value, options)
        elif isinstance(value, str):
            replaced = value
            for name, replacement in options.items():
                token = "{{%s}}" % name
                # Test against the original text so that a replacement which
                # itself looks like a placeholder is not expanded again.
                if token in value:
                    replaced = replaced.replace(token, str(replacement))
            if replaced != value:
                # Re-assigning an existing key is safe while iterating.
                d[key] = replaced
    return d
def api_request(name, options=None):
    """Send the request template registered under *name* and return the response.

    The template from ``endpoints`` is copied, its ``{{placeholder}}``
    markers are filled from *options* (and ``{{_csrfToken}}`` from the
    ``JSESSIONID`` cookie), and the result is sent through the shared
    session. Signs in first when is_logged_in() reports no session.

    Args:
        name: Key into ``endpoints``.
        options: Mapping of placeholder name to value. Values are converted
            with ``str()``; keys with no matching placeholder are ignored.

    Returns:
        The ``requests`` response, or ``None`` (after printing a message)
        when *name* is not a known endpoint.
    """
    import copy  # local import keeps this block self-contained

    options = {} if options is None else options

    if not is_logged_in():
        sign_in()

    template = endpoints.get(name)
    if not template:
        print("No API endpoint with that name.")
        return None

    # Work on a private copy. Substituting into the shared template consumed
    # its placeholders on the first call, so every later call (e.g. the next
    # page of a lead search) silently re-sent the first call's values.
    req = copy.deepcopy(template)

    # The CSRF token is the JSESSIONID cookie value without its quotes. Fall
    # back to an empty token instead of crashing when the cookie is absent.
    token = (session.cookies.get('JSESSIONID') or '').replace("\"", '')
    for h in req.headers:
        if "{{_csrfToken}}" in req.headers[h]:
            req.headers[h] = req.headers[h].replace("{{_csrfToken}}", token)

    for opt in options:
        placeholder = "{{%s}}" % opt
        value = str(options[opt])
        if placeholder in req.url:
            req.url = req.url.replace(placeholder, value)
        for h in req.headers:
            if placeholder in req.headers[h]:
                req.headers[h] = req.headers[h].replace(placeholder, value)

    if req.method == 'POST' and req.json:
        recursive_replace(req.json, options)
    elif req.method == 'GET' and req.params:
        recursive_replace(req.params, options)

    r = session.send(session.prepare_request(req))
    if r.headers.get('Location'):
        # NOTE(review): this retry is unbounded; if sign-in keeps failing
        # and the server keeps redirecting, the recursion never ends.
        print("\nYou've been logged out. Attempting sign in.")
        sign_in()
        return api_request(name, options)
    elif not r.status_code == 200:
        print(r.status_code, r.reason)
        print(name, r.text, r.request.headers, r.request.url)
    save_session()
    return r
def process_cmd(command):
    """Evaluate one line from the interactive prompt and print its result.

    Empty input is ignored. Errors raised by the expression are printed as
    a traceback and do not stop the prompt loop. ``SystemExit`` and
    ``KeyboardInterrupt`` are deliberately NOT caught, so ``exit()`` and
    Ctrl-C can still leave the program.

    Args:
        command: Python expression typed by the operator.

    Returns:
        Always ``None``.
    """
    if not command:
        return None
    try:
        # NOTE(security): eval() runs arbitrary Python. This is acceptable
        # only because the text comes from the local operator's own
        # terminal; never route untrusted input through this function.
        print(eval(command, {'session': session}))
    except Exception:
        traceback.print_exc()
    return None
def prompt(prompt="LinkedIn~$ "):
    """Read one line from the user, record it in readline history, and run it.

    Args:
        prompt: Text displayed before the cursor.
    """
    entered = input(prompt)
    readline.add_history(entered)
    process_cmd(entered)
def extract_csrf(text):
    """Pull the login form's anti-forgery values out of *text* into ``csrf``.

    Each field is searched for independently, so the three inputs may
    appear in any order and on separate lines. The module-level ``csrf``
    dictionary is updated only when all three are found.

    Args:
        text: HTML of the login page.

    Raises:
        ValueError: If any of the three fields cannot be found. Previously
            this surfaced as an opaque ``AttributeError`` on ``None``.
    """
    patterns = {
        'csrfToken': r"\"csrfToken\" value=\"(ajax:\d+)\"",
        'sId': r"\"sIdString\" value=\"([\w\d-]+)\"",
        'loginCsrfParam': r"\"loginCsrfParam\" value=\"([\w\d-]+)\"",
    }
    found = {}
    missing = []
    for key, pattern in patterns.items():
        match = re.search(pattern, text)
        if match is None:
            missing.append(key)
        else:
            found[key] = match.group(1)
    if missing:
        raise ValueError(
            "Could not find login form field(s): %s" % ', '.join(missing))
    csrf.update(found)
def is_logged_in():
    """Probe the login page to decide whether the session is authenticated.

    A 200 response means the login form was served: its CSRF values are
    captured with extract_csrf() and ``False`` is returned. Any other
    status (redirects are not followed) is treated as logged in.
    """
    resp = session.get('https://www.linkedin.com/uas/login', allow_redirects=False)
    if resp.status_code != 200:
        return True
    extract_csrf(resp.text)
    return False
def sales_auth():
    """Authenticate the signed-in member with Sales Navigator.

    Fetches the member's identities via ``salesApiIdentity``, takes the
    first one, and posts it to ``salesApiAgnosticAuthentication``. The
    session is saved when that call returns 200 and sets a ``li_a`` cookie.
    Prints a message and returns without authenticating when the identity
    list is empty.
    """
    print("Authenticating with Sales Navigator")
    r = api_request('salesApiIdentity')
    if r.status_code != 200:
        return

    identities = json.loads(r.text)
    elements = identities.get('elements') or []
    if not elements:
        # Indexing [0] on an empty list used to raise IndexError here.
        print("No Sales Navigator license found for this account.")
        return

    identity = elements[0]
    cap = identity['agnosticIdentity']['com.linkedin.sales.authentication.SalesCapIdentity']
    data = {
        'contractUrn': cap['contractUrn'],
        'seatUrn': cap['seatUrn'],
        'name': identity['name']
    }
    r = api_request('salesApiAgnosticAuthentication', data)
    if r.status_code == 200 and r.cookies.get('li_a'):
        save_session()
def sign_in():
    """Sign in to LinkedIn and then authenticate with Sales Navigator.

    If the session is already logged in, only the Sales Navigator step is
    attempted. Otherwise the username and password are taken from
    ``account`` (prompting for whichever is missing), the login form is
    submitted with the CSRF values held in ``csrf``, and the outcome is
    reported on stdout. Nothing is returned.
    """
    if is_logged_in():
        if has_session():
            print("Already signed in.")
        else:
            sales_auth()
        if has_session():
            return

    print("\nPlease sign in.")
    if not account['username']:
        account['username'] = input("Username: ")
    else:
        print("Username:", account['username'])
    if not account['password']:
        account['password'] = stdiomask.getpass(prompt="Password: ", mask="*")
    else:
        print("Password:", '*' * len(account['password']))

    headers = {'Content-Type': "application/x-www-form-urlencoded"}
    data = {
        'csrfToken': csrf["csrfToken"],
        'session_key': account['username'],
        'sIdString': csrf["sId"],
        'parentPageKey': 'd_checkpoint_lg_consumerLogin_navigator',
        'pageInstance': 'urn%3Ali%3Apage%3Acheckpoint_lg_uasLogin%3BXo06wk8DReigzddKwI9vrg%3D%3D',
        'trk': 'navigator',
        'authUUID': '',
        'session_redirect': '/sales',
        'loginCsrfParam': csrf['loginCsrfParam'],
        'session_password': account['password']
    }
    loginReq = Request('POST', "https://www.linkedin.com/checkpoint/lg/login-submit", headers=headers, data=data)
    prepdReq = session.prepare_request(loginReq)
    r = session.send(prepdReq, allow_redirects=False)

    # A response without a redirect has no Location header; indexing it
    # raised KeyError and made the CSRF branch below unreachable.
    location = r.headers.get('Location', '')
    if r.text == "CSRF check failed.":
        print("CSRF check failed.")
    elif not location:
        print("\nInvalid login attempt. No redirect received (HTTP %s)." % r.status_code)
    elif location.startswith('/checkpoint/challenge'):
        print("\nLinkedIn is challenging this attempt. Please sign in on a browser and try again.")
    elif location != data['session_redirect']:
        print("\nInvalid login attempt. Redirected to %s" % location)
    else:
        sales_auth()
        if has_session():
            print("Login succeeded")
        else:
            print("Sales navigator authentication failed.")
def _configure_readline():
    """Enable Tab completion at the interactive prompt using AutoComplete.

    Words are delimited by space, tab, newline and ``;`` only.
    """
    readline.set_completer_delims(' \t\n;')
    readline.parse_and_bind("tab: complete")
    readline.set_completer(AutoComplete().complete)
class AutoComplete(object):
    """Readline completer that offers the names of this module's globals."""

    def display_match(self):
        """Placeholder for a custom match-display hook; does nothing."""
        return None

    def complete(self, text, state):
        """Return the *state*-th completion for the current input line.

        The part of the line before the first ``.`` is matched against the
        module's global names. An exact match completes to itself, a dotted
        expression on a known name yields no completions (attribute
        completion is not implemented), and anything else is matched as a
        prefix.

        Args:
            text: Word being completed, as passed by readline (unused; the
                whole line buffer is inspected instead).
            state: Zero-based index of the match readline is asking for.

        Returns:
            The matching name, or ``None`` when *state* is past the last
            match. Returning ``None`` is how readline is told to stop;
            previously this path raised instead.
        """
        parts = readline.get_line_buffer().split(".")
        cmd = parts[0].strip()
        names = list(globals())

        if cmd in names:
            # TODO get submodules
            if parts[1:]:
                return None
            matches = [cmd]
        else:
            matches = [name for name in names if name.startswith(cmd)]

        return matches[state] if 0 <= state < len(matches) else None
def _recurse_dict(d, cb): | |
for k,v in d.items(): | |
if isinstance(v, dict): | |
_recurse_dict(v, cb) | |
if isinstance(v, list): | |
_recurse_list(v, cb) | |
elif isinstance(v, str): | |
cb(d[k]) | |
def _recurse_list(l, cb): | |
for v in l: | |
if isinstance(v, dict): | |
_recurse_dict(v, cb) | |
elif isinstance(v, list): | |
_recurse_list(v, cb) | |
elif isinstance(v, str): | |
cb(v) | |
def _search_and_add(ref, pattern, str): | |
m = re.findall(pattern, str) | |
for match in (m if m else []): | |
if not match.startswith("_"): | |
ref.append(match) | |
def _add_options(api_method, ref):
    """Collect the ``{{placeholder}}`` names used by a request template.

    Every public, non-callable attribute of *api_method* is inspected:
    strings are scanned directly, dictionaries and lists recursively.
    Names beginning with ``_`` (such as ``_csrfToken``) are excluded by
    _search_and_add().

    Args:
        api_method: Object whose attributes are scanned.
        ref: List that receives the names; modified in place.

    Returns:
        *ref*, holding each name once, in first-seen order.
    """
    pattern = r'{{(\w+)}}'
    collect = partial(_search_and_add, ref, pattern)

    for attr in dir(api_method):
        if attr.startswith("_"):
            continue
        value = getattr(api_method, attr)
        if callable(value):
            continue
        if isinstance(value, str):
            collect(value)
        elif isinstance(value, dict):
            _recurse_dict(value, collect)
        elif isinstance(value, list):
            _recurse_list(value, collect)

    # A placeholder used in more than one attribute (say URL and header)
    # would otherwise be listed twice, and argparse raises when the same
    # option string is registered twice on one parser.
    ref[:] = dict.fromkeys(ref)
    return ref
def _output_stem(args):
    """Build the file-name stem shared by all result files.

    Joins every string-valued argument except the password with ``-``,
    keeping only the part before any ``@``. Non-string arguments contribute
    an empty segment; leading and trailing dashes are stripped.
    """
    segments = []
    for arg, value in vars(args).items():
        if arg != 'password' and isinstance(value, str):
            segments.append(value.split('@')[0])
        else:
            segments.append('')
    return '-'.join(segments).strip('-')


def _main():
    """Command-line entry point.

    Builds one sub-command per entry in ``endpoints`` (with one required
    option per placeholder in its template), loads saved cookies, then
    either starts the interactive prompt (``-i``) or runs the chosen
    endpoint once and writes each 200 response body to a JSON file.
    ``salesLeadSearch`` is fetched in pages of at most 100 leads.
    """
    parser = argparse.ArgumentParser(prog="LinkedInTel",
                                     description="Interact with LinkedIn and Sales Navigator APIs for intelligence gathering.",
                                     epilog="Brought to you with love by J. Matczak.")
    parser.add_argument("-i", dest="interactive", action="store_true")
    parser.add_argument("-u", dest="username", default=None)
    parser.add_argument("-p", dest="password", default=None)
    # The "version" action needs a version string; without one, -v raised
    # AttributeError instead of printing anything.
    parser.add_argument("-v", help="Show version and exit.", action="version", version="%(prog)s 0.1")
    subparsers = parser.add_subparsers(title="Commands", dest="command")
    for api in endpoints:
        method_name = api.replace('salesApi', '')
        subparser = subparsers.add_parser(method_name)
        subparser.set_defaults(func=api_request)
        # `{{variable}}` within any string (url, params, json, data, headers) marks a parameter that is
        # replaced later during execution. Find these and add their corresponding options to the subparser.
        options = []
        _add_options(endpoints[api], options)
        # dict.fromkeys() drops repeats: argparse rejects duplicate options.
        for o in dict.fromkeys(options):
            subparser.add_argument("-%s" % o, help="\"%s\" as it pertains to `%s`." % (o, method_name), dest=o, required=True)
    args = parser.parse_args()
    load_session()
    account['username'] = args.username
    account['password'] = args.password

    if args.interactive:
        print("Interactive mode requested")
        _configure_readline()
        while not prog_will_exit:
            if not session.cookies.get('bscookie'):
                print("\nNo session detected. Please sign in.")
                sign_in()
            else:
                prompt()
        return

    if not args.command:
        parser.print_help()
        sys.exit(1)

    if args.command == "salesLeadSearch":
        # -start is the first offset and -count the end offset (exclusive);
        # the range is fetched in rounds of at most 100 leads each.
        start = int(args.start)
        count = int(args.count)
        if count <= start:
            # A zero or negative span made range() raise or walk backwards.
            parser.error("-count must be greater than -start")
        page_size = min(count - start, 100)
        offsets = range(start, count, page_size)
        total_rounds = len(offsets)
        for round_no, offset in enumerate(offsets, 1):
            size = min(page_size, count - offset)  # the last round may be short
            end = offset + size
            left = count - end
            print("[%i/%i] Fetching leads %i-%i/%i remaining %i" % (round_no, total_rounds, offset, end, count, left))
            r = args.func('salesLeadSearch', {
                'company': args.company,
                'start': offset,
                # The endpoint's `count` is a page size, not an end offset;
                # sending offset+size asked for ever-larger pages.
                'count': size
            })
            if r.status_code == 200:
                write_to("linkedintel-%s-%i-%i-%s.json" % (_output_stem(args), offset, end, time.time()), r.text)
            else:
                print("Error occurred on round %i: status code %s %s\n%s" % (round_no, r.status_code, r.reason, r.text), r.url)
                break
    else:
        # Sub-commands are registered without the "salesApi" prefix; put it
        # back to recover the key used in `endpoints`.
        endpoint = args.command if args.command.startswith('sales') else ("salesApi%s" % args.command)
        r = args.func(endpoint, vars(args))
        if r.status_code == 200:
            write_to("linkedintel-%s-%s.json" % (_output_stem(args), time.time()), r.text)
# Run the CLI only when executed as a script, so importing this module (for
# reuse or testing) does not parse sys.argv or touch the network.
if __name__ == "__main__":
    _main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment