Skip to content

Instantly share code, notes, and snippets.

Last active December 24, 2023 23:01
Show Gist options
  • Save cidrmill/d9dfb7473e6ceb56982890653f30c9a7 to your computer and use it in GitHub Desktop.
Save cidrmill/d9dfb7473e6ceb56982890653f30c9a7 to your computer and use it in GitHub Desktop.
LinkedInTel v0.1 - Abuse LinkedIn Sales Navigator API for intelligence collection and SOCMINT purposes
from requests import Session, Request
import argparse
import readline, sys, os, re
import traceback
import pkgutil
import json
import stdiomask
import pickle
import time
from functools import partial
# Flag polled by the interactive loop in _main (`while not prog_will_exit`).
prog_will_exit = False
# One requests Session shared by every function in this file (cookies, headers, sending).
session = Session()
# NOTE(review): the two header entries below have no enclosing statement in this copy;
# presumably they belong to a call that updates `session.headers` and whose opening and
# closing lines were lost. Confirm against the original.
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/ Safari/537.36",
"X-Restli-Protocol-Version": "2.0.0"
# Sign-in credentials: set from the -u/-p arguments in _main, or prompted for in sign_in.
# NOTE(review): the closing brace of this dict is not visible in this copy.
account = {
'username': None,
'password': None
# Named request templates looked up by api_request. `{{name}}` markers are placeholders
# that api_request substitutes from its `options`; `{{_csrfToken}}` is filled from the
# JSESSIONID cookie.
# NOTE(review): several URL arguments are empty or start mid-path in this copy, and the
# closing brace is missing; presumably stripped during extraction. Confirm.
endpoints = {
'salesApiIdentity': Request('GET', '', params={'q': 'findLicensesByCurrentMember', 'includeRecentlyInactiveDueToOverallocation': 'true'}, headers={'csrf-token': '{{_csrfToken}}'}),
'salesApiAgnosticAuthentication': Request('POST', '', json={'identity': { 'agnosticIdentity': { 'com.linkedin.sales.authentication.SalesCapIdentity': { 'contractUrn': '{{contractUrn}}', 'seatUrn': '{{seatUrn}}' }}, 'name': '{{name}}' }, 'viewerDeviceType': 'DESKTOP'}, headers={'Content-Type': 'text/plain;charset=UTF-8', 'csrf-token': '{{_csrfToken}}'}),
'salesApiCompanies': Request('GET', '{{id}}', params={'decoration': '(entityUrn,name,account(saved,noteCount,listCount,crmStatus,starred),pictureInfo,companyPictureDisplayImage,description,industry,location,headquarters,website,revenueRange,crmOpportunities,flagshipCompanyUrl,employeeCount,employeeDisplayCount,employeeCountRange,decisionMakersDisplayCount,personaResultCounts,employeeGrowthPercentages,employees*~fs_salesProfile(entityUrn,firstName,lastName,fullName,pictureInfo,profilePictureDisplayImage),specialties,type,yearFounded)'}, headers={'Csrf-Token': '{{_csrfToken}}'}),
'salesLeadSearch': Request('GET', '{{start}}&count={{count}}&query=(recentSearchParam:(doLogHistory:false),filters:List((type:CURRENT_COMPANY,values:List((id:{{company}},selectionType:INCLUDED)))))&decorationId=com.linkedin.sales.deco.desktop.searchv2.LeadSearchResult-13', headers={'csrf-token': '{{_csrfToken}}'}),
'salesApiProfiles': Request('GET', '', params={})
# Form tokens read back by sign_in when it builds the login POST; extract_csrf is the
# function that assigns these keys.
# NOTE(review): the closing brace of this dict is not visible in this copy.
csrf = {
'csrfToken': '',
'sId': '',
'loginCsrfParam': ''
def write_to(file, content, bytes=False):
    """Write *content* to *file*, replacing any existing contents.

    Args:
        file: Path of the file to create or overwrite.
        content: Data to write; ``str`` in text mode, a bytes-like object
            when *bytes* is true.
        bytes: When true, open the file in binary mode (``'wb'``) instead
            of text mode. The name shadows the builtin but is part of the
            existing call signature.
    """
    if bytes:
        with open(file, 'wb') as f:
            f.write(content)
    else:
        # Explicit UTF-8 so API responses containing non-ASCII text are
        # written the same way regardless of the platform's locale encoding.
        with open(file, 'w', encoding='utf-8') as f:
            f.write(content)
def save_session():
    """Pickle the shared session's cookie jar and store it in ``cookies.bin``."""
    serialized = pickle.dumps(session.cookies, pickle.HIGHEST_PROTOCOL)
    # Third argument selects binary mode in write_to.
    write_to('cookies.bin', serialized, True)
# Open the cookie file written by save_session ("cookies.bin") and report when it does
# not exist yet.
# NOTE(review): this copy shows no `try:` line and no statement under the `with`, so how
# the file contents are applied to `session` cannot be seen here; confirm against the
# original. save_session stores a pickle, so the loader presumably unpickles it; only
# do that with a cookie file this program wrote itself.
def load_session():
with open("cookies.bin", 'rb') as f:
except FileNotFoundError:
print("Session data not found on disk. Creating.")
# except TypeError:
# print("Session data seems to've been corrupt. Not loading.")
def has_session():
    """Return True when the shared session has a truthy ``li_a`` header entry.

    NOTE(review): sales_auth reads ``li_a`` from the response *cookies*,
    while this check looks in ``session.headers``; confirm which of the
    two is intended.
    """
    return bool(session.headers.get('li_a'))
def recursive_replace(d, options):
    """Substitute ``{{name}}`` placeholders in the string values of *d*, in place.

    Nested dicts are processed recursively. Values that are neither dicts
    nor strings (numbers, ``None``, lists, ...) are left untouched instead
    of raising.

    Args:
        d: Dict whose values may contain ``{{name}}`` markers.
        options: Mapping of placeholder name to replacement value; each
            value is converted with ``str()`` before insertion.

    Returns:
        The same dict object *d*, after modification.
    """
    for key, value in d.items():
        if isinstance(value, dict):
            recursive_replace(value, options)
        elif isinstance(value, str):
            replaced = value
            for name in options:
                token = "{{%s}}" % name
                # Test against the original value so a replacement that itself
                # contains a marker is not expanded a second time.
                if token in value:
                    replaced = replaced.replace(token, str(options[name]))
            # Re-assigning an existing key does not change the dict's size,
            # so this is safe while iterating over items().
            d[key] = replaced
    return d
# Send the request template registered under `name`, after filling its `{{...}}`
# placeholders, and return the response.
#
# name:    key into `endpoints`.
# options: mapping of placeholder name -> value, substituted into the URL, the JSON
#          body (POST), the query params (GET) and the headers.
# Returns the response object `r` for a known endpoint.
#
# NOTE(review): `options={}` is a mutable default; nothing visible here mutates it.
# NOTE(review): statements appear to be missing from this copy (no body under the
# `if not is_logged_in():` line, nothing between the "logged out" print and the retry,
# no `else:` before the final print). Confirm the control flow against the original.
def api_request(name, options={}):
if not is_logged_in():
if endpoints.get(name):
# NOTE(review): `req` is the very object stored in `endpoints`, not a copy, so every
# substitution below permanently rewrites the template; a later call with different
# options would find no placeholders left to replace.
req = endpoints[name]
# Fill `{{_csrfToken}}` in any header from the JSESSIONID cookie, with its double quotes removed.
for h in req.headers:
if "{{_csrfToken}}" in req.headers[h]:
req.headers[h] = req.headers[h].replace("{{_csrfToken}}", session.cookies.get('JSESSIONID').replace("\"", ''))
# Substitute each option into the URL.
for opt in options:
if "{{%s}}" % opt in req.url:
req.url = req.url.replace("{{%s}}" % str(opt), str(options[opt]))
# Substitute options into the JSON body (POST) or the query params (GET).
if req.method == 'POST' and req.json:
recursive_replace(req.json, options)
elif req.method == 'GET' and req.params:
recursive_replace(req.params, options)
# Substitute options into header values.
# NOTE(review): with indentation lost it is unclear whether this loop sits inside
# `for opt in options`; it reads `opt`, and `options[opt]` is not passed through str() here.
for h in req.headers:
if "{{%s}}" % opt in req.headers[h]:
req.headers[h] = req.headers[h].replace("{{%s}}" % opt, options[opt])
#req.headers[h] = req.headers[h].replace(':csrfToken:', session.cookies['JSESSIONID'].replace('"', ''))
r = session.send(session.prepare_request(req))
# A Location header on the response is treated as "logged out"; the request is retried.
if r.headers.get('Location'):
print("\nYou've been logged out. Attempting sign in.")
return api_request(name, options)
# Any other non-200 status is printed with request details for debugging.
elif not r.status_code == 200:
print(r.status_code, r.reason)
print(name, r.text, r.request.headers, r.request.url)
return r
print("No API endpoint with that name.")
def process_cmd(command):
    """Evaluate *command* as a Python expression and print its value.

    The expression is evaluated with ``session`` as its only supplied
    global. Empty or ``None`` commands are ignored.

    Returns:
        Always ``None``.
    """
    if not command:
        return None
    # SECURITY: eval() runs arbitrary code. That is the purpose of this
    # interactive console, but it must only ever receive operator input.
    result = eval(command, {'session': session})
    print(result)
    return None
# Show `prompt` on the console and read one line of input into `command`.
# NOTE(review): nothing is done with `command` in this copy; the rest of the body
# (presumably handing it to process_cmd) is not visible. Confirm against the original.
def prompt(prompt="LinkedIn~$ "):
command = input(prompt)
def extract_csrf(text):
    """Pull the three sign-in form tokens out of *text* and store them in ``csrf``.

    The pattern captures, in order, the values of the ``csrfToken``,
    ``sIdString`` and ``loginCsrfParam`` form fields; they are stored under
    the ``csrfToken``, ``sId`` and ``loginCsrfParam`` keys of the
    module-level ``csrf`` dict.

    NOTE(review): in the reviewed copy the call wrapping the pattern and the
    right-hand sides of the three assignments were missing; ``re.search``
    and capture groups 1-3 are the reconstruction. Confirm against the
    original.

    Args:
        text: HTML of the sign-in page.

    Returns:
        The updated ``csrf`` dict, or ``None`` (leaving ``csrf`` unchanged)
        when the tokens cannot be found.
    """
    # Raw string: the pattern contains \d and \w, which are invalid escape
    # sequences in a normal string literal.
    match = re.search(
        r'"csrfToken" value="(ajax:\d+)".*"sIdString" value="([\w\d-]+)".*"loginCsrfParam" value="([\w\d-]+)"',
        text,
    )
    if match is None:
        # Say so explicitly instead of failing with AttributeError on None.
        print("Could not find the CSRF tokens in the sign-in page.")
        return None
    csrf['csrfToken'], csrf['sId'], csrf['loginCsrfParam'] = match.groups()
    return csrf
def is_logged_in():
    """Probe the sign-in page and report whether the session is authenticated.

    The page is requested without following redirects. A 200 response is
    treated as "not logged in": its body is passed to extract_csrf so the
    form tokens are available for the next sign-in attempt. Any other
    status is treated as "logged in".

    NOTE(review): the URL literal is empty in this copy, presumably
    stripped during extraction. Confirm against the original.

    Returns:
        False on a 200 response, True otherwise.
    """
    response = session.get('', allow_redirects=False)
    if response.status_code != 200:
        return True
    extract_csrf(response.text)
    return False
# Authenticate the signed-in session against Sales Navigator.
# Fetches the member's licence identities ('salesApiIdentity'), takes the first element,
# and posts its contract/seat URNs and name to 'salesApiAgnosticAuthentication'.
# NOTE(review): the closing brace of `data` and everything after the final `if` are
# missing from this copy, so the return value and the failure handling cannot be seen here.
def sales_auth():
print("Authenticating with Sales Navigator")
r = api_request('salesApiIdentity')
if r.status_code == 200:
identities = json.loads(r.text)
# Only the first identity in `elements` is used.
data = {
'contractUrn': identities['elements'][0]['agnosticIdentity']['com.linkedin.sales.authentication.SalesCapIdentity']['contractUrn'],
'seatUrn': identities['elements'][0]['agnosticIdentity']['com.linkedin.sales.authentication.SalesCapIdentity']['seatUrn'],
'name': identities['elements'][0]['name']
r = api_request('salesApiAgnosticAuthentication', data)
# Success is a 200 response that also sets the `li_a` cookie.
if r.status_code == 200 and r.cookies.get('li_a'):
# Sign in to LinkedIn with the credentials in `account`, prompting for any that are
# missing, by posting the login form together with the tokens held in `csrf`.
# NOTE(review): many lines are missing from this copy (branch bodies, `else:` lines, the
# closing brace of `data`, return statements). The comments below describe only what is
# visible; confirm the control flow against the original.
def sign_in():
if is_logged_in():
if has_session():
print("Already signed in.")
if has_session():
print("\nPlease sign in.")
# Ask for the username if none was supplied; the following print echoes it.
if not account['username']:
account['username'] = input("Username: ")
print("Username:", account['username'])
# Ask for the password with masked input; the following print shows one '*' per character.
if not account['password']:
account['password'] = stdiomask.getpass(prompt="Password: ", mask="*")
print("Password:", '*' * len(account['password']))
headers = {'Content-Type': "application/x-www-form-urlencoded"}
# Login form fields, including the three tokens held in `csrf`.
data = {
'csrfToken': csrf["csrfToken"],
'session_key': account['username'],
'sIdString': csrf["sId"],
'parentPageKey': 'd_checkpoint_lg_consumerLogin_navigator',
'pageInstance': 'urn%3Ali%3Apage%3Acheckpoint_lg_uasLogin%3BXo06wk8DReigzddKwI9vrg%3D%3D',
'trk': 'navigator',
'authUUID': '',
'session_redirect': '/sales',
'loginCsrfParam': csrf['loginCsrfParam'],
'session_password': account['password']
# NOTE(review): the login URL literal is empty in this copy.
loginReq = Request('POST', "", headers=headers, data=data)
prepdReq = session.prepare_request(loginReq)
# Redirects are not followed so the Location header can be inspected directly.
r = session.send(prepdReq, allow_redirects=False)
# NOTE(review): r.headers['Location'] is indexed without a presence check; a response
# with no Location header would presumably raise KeyError before the CSRF branch runs.
if r.headers['Location'].startswith('/checkpoint/challenge'):
print("\nLinkedIn is challenging this attempt. Please sign in on a browser and try again.")
# Any redirect target other than the requested '/sales' is reported as a failed login.
elif r.headers['Location'] != data['session_redirect']:
print("\nInvalid login attempt. Redirected to %s" % r.headers['Location'])
elif r.text == "CSRF check failed.":
print("CSRF check failed.")
if has_session():
print("Login succeeded")
print("Sales navigator authentication failed.")
def _configure_readline():
    """Set up tab completion for the interactive prompt.

    Creates an AutoComplete instance, registers its ``complete`` method as
    the readline completer, sets the word delimiters to space, tab, newline
    and ``;``, and binds the Tab key to completion.
    """
    autocomp = AutoComplete()
    readline.set_completer_delims(' \t\n;')
    # Register the completer; without this call the AutoComplete instance
    # is never consulted and Tab uses readline's default completion.
    readline.set_completer(autocomp.complete)
    readline.parse_and_bind("tab: complete")
class AutoComplete(object):
    """Readline completer that offers names from this module's globals."""

    def display_match(self):
        """Placeholder display hook; does nothing and returns ``None``."""
        return None

    def complete(self, text, state):
        """Return the *state*-th completion for the current line, or ``None``.

        Readline calls this repeatedly with ``state`` = 0, 1, 2, ... until
        ``None`` is returned. *text* is not used: the whole line buffer is
        inspected and only the part before the first ``"."`` is completed.

        Args:
            text: Word being completed, as supplied by readline (ignored).
            state: Zero-based index of the candidate to return.

        Returns:
            A candidate name, or ``None`` when there are no more
            candidates or the line already contains a dotted suffix.
        """
        # str.split always yields at least one element, so parts[0] is safe.
        parts = readline.get_line_buffer().split(".")
        cmd = parts[0].strip()
        namespace = globals()

        if cmd in namespace:
            # TODO get submodules
            if parts[1:]:
                # Attribute/submodule completion is not implemented yet.
                return None
            candidates = [cmd]
        else:
            candidates = [name for name in namespace if name.startswith(cmd)]

        # Out-of-range state signals "no more matches" instead of raising.
        if 0 <= state < len(candidates):
            return candidates[state]
        return None
def _recurse_dict(d, cb):
for k,v in d.items():
if isinstance(v, dict):
_recurse_dict(v, cb)
if isinstance(v, list):
_recurse_list(v, cb)
elif isinstance(v, str):
def _recurse_list(l, cb):
for v in l:
if isinstance(v, dict):
_recurse_dict(v, cb)
elif isinstance(v, list):
_recurse_list(v, cb)
elif isinstance(v, str):
def _search_and_add(ref, pattern, str):
m = re.findall(pattern, str)
for match in (m if m else []):
if not match.startswith("_"):
def _add_options(api_method, ref):
    """Collect the ``{{name}}`` placeholders used by *api_method* into *ref*.

    Every attribute of *api_method* that is not callable and whose name
    does not start with ``"_"`` is inspected: strings are scanned directly,
    dicts and lists are walked recursively.

    Args:
        api_method: Object whose attributes are scanned.
        ref: List that receives the placeholder names.

    Returns:
        *ref*, after the scan.
    """
    placeholder = r'{{(\w+)}}'
    collect = partial(_search_and_add, ref, placeholder)

    for attr_name in dir(api_method):
        value = getattr(api_method, attr_name)
        # Skip methods and private/dunder attributes.
        if callable(value) or attr_name.startswith("_"):
            continue
        if isinstance(value, str):
            collect(value)
        elif isinstance(value, dict):
            _recurse_dict(value, collect)
        elif isinstance(value, list):
            _recurse_list(value, collect)

    return ref
# Command-line entry point: builds the argument parser (one sub-command per entry in
# `endpoints`), stores the credentials, then runs either the interactive prompt loop or
# the requested API command and writes the response body to a JSON file.
# NOTE(review): many lines are missing from this copy (closing parentheses and braces,
# `else:` lines, the bodies of several `if`/`while` statements). The comments below
# describe only what is visible; confirm the control flow against the original.
def _main():
parser = argparse.ArgumentParser(prog="LinkedInTel",
description="Interact with LinkedIn and Sales Navigator APIs for intelligence gathering.",
epilog="Brought to you with love by J. Matczak.")
parser.add_argument("-i", dest="interactive", action="store_true")
parser.add_argument("-u", dest="username", default=None)
parser.add_argument("-p", dest="password", default=None)
# NOTE(review): no `version=` string is passed with action="version" in this call.
parser.add_argument("-v", help="Show version and exit.", action="version")
subparsers = parser.add_subparsers(title="Commands", dest="command")
# One sub-command per endpoint, named after the endpoint with any 'salesApi' removed.
for api in endpoints:
method_name = api.replace('salesApi', '')
subparser = subparsers.add_parser(method_name)
# `{{variable}}` within any string (url, params, json, data, headers) marks a parameter which will be
# replaced later during execution. Find these and add their corresponding options to the subparser.
options = []
_add_options(endpoints[api], options)
# Each discovered placeholder becomes a required `-<name>` option of the sub-command.
for o in options:
subparser.add_argument("-%s" % o, help="\"%s\" as it pertains to `%s`." % (o, method_name), dest=o, required=True)
args = parser.parse_args()
account['username'] = args.username
account['password'] = args.password
if args.interactive:
print("Interactive mode requested")
# Loop until `prog_will_exit` is set (loop body not visible in this copy).
while not prog_will_exit:
if not session.cookies.get('bscookie'):
print("\nNo session detected. Please sign in.")
if not args.command:
# Lead search is fetched in rounds of at most 100 results.
if args.command == "salesLeadSearch":
# if count-start >= 100, divide the tasks into rounds per 100
start = int(args.start)
count = int(args.count)
step = count - start
if step > 100:
step = 100
for i in range(start, count, step):
# Round number and total rounds, used only for the progress message (`iter` shadows the builtin).
iter = (i / step) + 1
total_iter = (count / step) / 1
# Shrink the last round so it does not run past `count`.
if step + i > count:
step = count - i
left = abs((i + step) - count)
print("[%i/%i] Fetching leads %i-%i/%i remaining %i" % (iter, total_iter, i, (i + step), count, left))
# NOTE(review): `args.func` is used here and below, but no call that sets it
# (e.g. set_defaults) is visible in this copy.
# NOTE(review): the value passed as 'count' is the round's end index (i + step),
# not the round size; confirm this is what the API expects.
r = args.func('salesLeadSearch', {
'start': i,
'count': (i + step)
# On success the body is saved; the file name joins the string arguments (password
# excluded, text after '@' dropped), the round bounds and the current time.
if r.status_code == 200:
write_to("linkedintel-%s-%i-%i-%s.json" % ('-'.join((args.__dict__[arg].split('@')[0] if arg != 'password' and isinstance(args.__dict__[arg], str) else '') for arg in args.__dict__).strip('-'), i, (i + step), time.time()), r.text)
print("Error occurred on round %i: status code %s %s\n%s" % (iter, r.status_code, r.reason, r.text), r.url)
#start += step
#if step >= count - start:
# step = count - start
# Any other command: restore the 'salesApi' prefix when needed and pass all parsed
# arguments as the placeholder options.
r = args.func(args.command if args.command.startswith('sales') else ("salesApi%s" % args.command), args.__dict__)
if r.status_code == 200:
write_to("linkedintel-%s-%s.json" % ('-'.join((args.__dict__[arg].split('@')[0] if arg != 'password' and isinstance(args.__dict__[arg], str) else '') for arg in args.__dict__).strip('-'), time.time()), r.text)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment