Skip to content

Instantly share code, notes, and snippets.

@cidrmill
Last active December 24, 2023 23:01
Show Gist options
  • Save cidrmill/d9dfb7473e6ceb56982890653f30c9a7 to your computer and use it in GitHub Desktop.
Save cidrmill/d9dfb7473e6ceb56982890653f30c9a7 to your computer and use it in GitHub Desktop.
LinkedInTel v0.1 - Abuse LinkedIn Sales Navigator API for intelligence collection and SOCMINT purposes
#!/usr/bin/python3
from requests import Session, Request
import argparse
import readline, sys, os, re
import traceback
import pkgutil
import json
import stdiomask
import pickle
import time
from functools import partial
prog_will_exit = False
session = Session()
session.headers.update({
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36",
"X-Restli-Protocol-Version": "2.0.0"
})
account = {
'username': None,
'password': None
}
endpoints = {
'salesApiIdentity': Request('GET', 'https://www.linkedin.com/sales-api/salesApiIdentity', params={'q': 'findLicensesByCurrentMember', 'includeRecentlyInactiveDueToOverallocation': 'true'}, headers={'csrf-token': '{{_csrfToken}}'}),
'salesApiAgnosticAuthentication': Request('POST', 'https://www.linkedin.com/sales-api/salesApiAgnosticAuthentication', json={'identity': { 'agnosticIdentity': { 'com.linkedin.sales.authentication.SalesCapIdentity': { 'contractUrn': '{{contractUrn}}', 'seatUrn': '{{seatUrn}}' }}, 'name': '{{name}}' }, 'viewerDeviceType': 'DESKTOP'}, headers={'Content-Type': 'text/plain;charset=UTF-8', 'csrf-token': '{{_csrfToken}}'}),
'salesApiCompanies': Request('GET', 'https://www.linkedin.com/sales-api/salesApiCompanies/{{id}}', params={'decoration': '(entityUrn,name,account(saved,noteCount,listCount,crmStatus,starred),pictureInfo,companyPictureDisplayImage,description,industry,location,headquarters,website,revenueRange,crmOpportunities,flagshipCompanyUrl,employeeCount,employeeDisplayCount,employeeCountRange,decisionMakersDisplayCount,personaResultCounts,employeeGrowthPercentages,employees*~fs_salesProfile(entityUrn,firstName,lastName,fullName,pictureInfo,profilePictureDisplayImage),specialties,type,yearFounded)'}, headers={'Csrf-Token': '{{_csrfToken}}'}),
'salesLeadSearch': Request('GET', 'https://www.linkedin.com/sales-api/salesApiLeadSearch?q=searchQuery&start={{start}}&count={{count}}&query=(recentSearchParam:(doLogHistory:false),filters:List((type:CURRENT_COMPANY,values:List((id:{{company}},selectionType:INCLUDED)))))&decorationId=com.linkedin.sales.deco.desktop.searchv2.LeadSearchResult-13', headers={'csrf-token': '{{_csrfToken}}'}),
'salesApiProfiles': Request('GET', 'https://www.linkedin.com/sales-api/salesApiProfiles', params={})
}
csrf = {
'csrfToken': '',
'sId': '',
'loginCsrfParam': ''
}
def write_to(file, content, bytes=False):
with open(file, 'w%s' % ('b' if bytes else '')) as f:
f.write(content)
def save_session():
write_to('cookies.bin', pickle.dumps(session.cookies, pickle.HIGHEST_PROTOCOL), True)
def load_session():
try:
with open("cookies.bin", 'rb') as f:
session.cookies.update(pickle.loads(f.read()))
except FileNotFoundError:
print("Session data not found on disk. Creating.")
save_session()
# except TypeError:
# print("Session data seems to've been corrupt. Not loading.")
def has_session():
if session.headers.get('li_a'):
return True
return False
def recursive_replace(d, options):
for k,v in d.items():
if isinstance(d[k], dict):
recursive_replace(d[k], options)
else:
for o in options:
if "{{%s}}" % o in v:
replace_with = options[o]
d[k] = d[k].replace("{{%s}}" % o, str(replace_with))
return d
def api_request(name, options={}):
if not is_logged_in():
sign_in()
if endpoints.get(name):
req = endpoints[name]
for h in req.headers:
if "{{_csrfToken}}" in req.headers[h]:
req.headers[h] = req.headers[h].replace("{{_csrfToken}}", session.cookies.get('JSESSIONID').replace("\"", ''))
for opt in options:
if "{{%s}}" % opt in req.url:
req.url = req.url.replace("{{%s}}" % str(opt), str(options[opt]))
if req.method == 'POST' and req.json:
recursive_replace(req.json, options)
elif req.method == 'GET' and req.params:
recursive_replace(req.params, options)
for h in req.headers:
if "{{%s}}" % opt in req.headers[h]:
req.headers[h] = req.headers[h].replace("{{%s}}" % opt, options[opt])
#req.headers[h] = req.headers[h].replace(':csrfToken:', session.cookies['JSESSIONID'].replace('"', ''))
r = session.send(session.prepare_request(req))
if r.headers.get('Location'):
print("\nYou've been logged out. Attempting sign in.")
sign_in()
return api_request(name, options)
elif not r.status_code == 200:
print(r.status_code, r.reason)
print(name, r.text, r.request.headers, r.request.url)
save_session()
return r
else:
print("No API endpoint with that name.")
def process_cmd(command):
if command:
try:
print(eval(command, {'session': session}))
except:
traceback.print_exc()
return None
def prompt(prompt="LinkedIn~$ "):
command = input(prompt)
readline.add_history(command)
process_cmd(command)
def extract_csrf(text):
match = re.search(r"\"csrfToken\" value=\"(ajax:\d+)\".*\"sIdString\" value=\"([\w\d-]+)\".*\"loginCsrfParam\" value=\"([\w\d-]+)\"", text)
csrf['csrfToken'] = match.group(1)
csrf['sId'] = match.group(2)
csrf['loginCsrfParam'] = match.group(3)
def is_logged_in():
r = session.get('https://www.linkedin.com/uas/login', allow_redirects=False)
if r.status_code == 200:
csrf = extract_csrf(r.text)
return False
else:
return True
def sales_auth():
print("Authenticating with Sales Navigator")
r = api_request('salesApiIdentity')
if r.status_code == 200:
identities = json.loads(r.text)
data = {
'contractUrn': identities['elements'][0]['agnosticIdentity']['com.linkedin.sales.authentication.SalesCapIdentity']['contractUrn'],
'seatUrn': identities['elements'][0]['agnosticIdentity']['com.linkedin.sales.authentication.SalesCapIdentity']['seatUrn'],
'name': identities['elements'][0]['name']
}
r = api_request('salesApiAgnosticAuthentication', data)
if r.status_code == 200 and r.cookies.get('li_a'):
save_session()
def sign_in():
if is_logged_in():
if has_session():
print("Already signed in.")
else:
sales_auth()
if has_session():
return
print("\nPlease sign in.")
if not account['username']:
account['username'] = input("Username: ")
else:
print("Username:", account['username'])
if not account['password']:
account['password'] = stdiomask.getpass(prompt="Password: ", mask="*")
else:
print("Password:", '*' * len(account['password']))
headers = {'Content-Type': "application/x-www-form-urlencoded"}
data = {
'csrfToken': csrf["csrfToken"],
'session_key': account['username'],
'sIdString': csrf["sId"],
'parentPageKey': 'd_checkpoint_lg_consumerLogin_navigator',
'pageInstance': 'urn%3Ali%3Apage%3Acheckpoint_lg_uasLogin%3BXo06wk8DReigzddKwI9vrg%3D%3D',
'trk': 'navigator',
'authUUID': '',
'session_redirect': '/sales',
'loginCsrfParam': csrf['loginCsrfParam'],
'session_password': account['password']
}
loginReq = Request('POST', "https://www.linkedin.com/checkpoint/lg/login-submit", headers=headers, data=data)
prepdReq = session.prepare_request(loginReq)
r = session.send(prepdReq, allow_redirects=False)
if r.headers['Location'].startswith('/checkpoint/challenge'):
print("\nLinkedIn is challenging this attempt. Please sign in on a browser and try again.")
elif r.headers['Location'] != data['session_redirect']:
print("\nInvalid login attempt. Redirected to %s" % r.headers['Location'])
elif r.text == "CSRF check failed.":
print("CSRF check failed.")
else:
sales_auth()
if has_session():
print("Login succeeded")
else:
print("Sales navigator authentication failed.")
def _configure_readline():
autocomp = AutoComplete()
readline.set_completer_delims(' \t\n;')
readline.parse_and_bind("tab: complete")
readline.set_completer(autocomp.complete)
#readline.set_completion_display_matches_hook(autocomp.display_match)
class AutoComplete(object):
def display_match(self):
return None
def complete(self, text, state):
buffer = readline.get_line_buffer()
line = readline.get_line_buffer().split(".")
# show all objects
if not line:
print(c + ' ' for c in globals()[state])
return [c + ' ' for c in globals()[state]]
# resolve command to implementation function
cmd = line[0].strip()
if cmd in globals():
# TODO get submodules
impl = pkgutil.iter_modules(globals()[cmd].__path__) #getattr(self, 'complete_%s' % cmd)
args = line[1:]
if args:
return None #(impl(args) + [None])[state]
return [cmd + ''][state]
results = [c + '' for c in globals() if c.startswith(cmd)] + [None]
return results[state]
def _recurse_dict(d, cb):
for k,v in d.items():
if isinstance(v, dict):
_recurse_dict(v, cb)
if isinstance(v, list):
_recurse_list(v, cb)
elif isinstance(v, str):
cb(d[k])
def _recurse_list(l, cb):
for v in l:
if isinstance(v, dict):
_recurse_dict(v, cb)
elif isinstance(v, list):
_recurse_list(v, cb)
elif isinstance(v, str):
cb(v)
def _search_and_add(ref, pattern, str):
m = re.findall(pattern, str)
for match in (m if m else []):
if not match.startswith("_"):
ref.append(match)
def _add_options(api_method, ref):
pattern = r'{{(\w+)}}'
for attr in dir(api_method): # foreach attribute in api_method object
# check attr, CANNOT be callable or start with "_"
if not callable(getattr(api_method, attr)) and not attr.startswith("_"):
vAttr = getattr(api_method, attr) # get value of attr as within api_method
if isinstance(vAttr, str): # string
_search_and_add(ref, pattern, vAttr)
elif isinstance(vAttr, dict): # dictionary
_recurse_dict(vAttr, partial(_search_and_add, ref, pattern))
elif isinstance(vAttr, list): # list
_recurse_list(vAttr, partial(_search_and_add, ref, pattern))
return ref
def _main():
parser = argparse.ArgumentParser(prog="LinkedInTel",
description="Interact with LinkedIn and Sales Navigator APIs for intelligence gathering.",
epilog="Brought to you with love by J. Matczak.")
parser.add_argument("-i", dest="interactive", action="store_true")
parser.add_argument("-u", dest="username", default=None)
parser.add_argument("-p", dest="password", default=None)
parser.add_argument("-v", help="Show version and exit.", action="version")
subparsers = parser.add_subparsers(title="Commands", dest="command")
for api in endpoints:
method_name = api.replace('salesApi', '')
subparser = subparsers.add_parser(method_name)
subparser.set_defaults(func=api_request)
# `:variable:` within any string (url, params, json, data, headers) dictate a parameter which will be replaced
# later within prog execution. Find these and add their corresponding options to the subparser.
options = []
_add_options(endpoints[api], options)
for o in options:
subparser.add_argument("-%s" % o, help="\"%s\" as it pertains to `%s`." % (o, method_name), dest=o, required=True)
args = parser.parse_args()
load_session()
account['username'] = args.username
account['password'] = args.password
if args.interactive:
print("Interactive mode requested")
_configure_readline()
while not prog_will_exit:
if not session.cookies.get('bscookie'):
print("\nNo session detected. Please sign in.")
sign_in()
else:
prompt()
else:
if not args.command:
parser.print_help()
exit(1)
if args.command == "salesLeadSearch":
# if count-start >= 100, divide the tasks into rounds per 100
start = int(args.start)
count = int(args.count)
step = count - start
if step > 100:
step = 100
for i in range(start, count, step):
iter = (i / step) + 1
total_iter = (count / step) / 1
if step + i > count:
step = count - i
left = abs((i + step) - count)
print("[%i/%i] Fetching leads %i-%i/%i remaining %i" % (iter, total_iter, i, (i + step), count, left))
r = args.func('salesLeadSearch', {
'company': args.company,
'start': i,
'count': (i + step)
})
if r.status_code == 200:
write_to("linkedintel-%s-%i-%i-%s.json" % ('-'.join((args.__dict__[arg].split('@')[0] if arg != 'password' and isinstance(args.__dict__[arg], str) else '') for arg in args.__dict__).strip('-'), i, (i + step), time.time()), r.text)
else:
print("Error occurred on round %i: status code %s %s\n%s" % (iter, r.status_code, r.reason, r.text), r.url)
break
#start += step
#if step >= count - start:
# step = count - start
else:
r = args.func(args.command if args.command.startswith('sales') else ("salesApi%s" % args.command), args.__dict__)
if r.status_code == 200:
write_to("linkedintel-%s-%s.json" % ('-'.join((args.__dict__[arg].split('@')[0] if arg != 'password' and isinstance(args.__dict__[arg], str) else '') for arg in args.__dict__).strip('-'), time.time()), r.text)
_main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment