#! /usr/bin/env python3.6
# Script for scraping linkedin companies data with input csv with companies urls
# Aleksandar Josifoski for Jordan Hollander
# 2017 May 22;
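#
# Usage (a sketch, assuming the layout described in the comments below):
#   1. create linkedin_parameters2.py next to this script (example format below)
#   2. place the input csv with company urls in dir_in
#   3. run the script, e.g.:
#        python3.6 linkedin_scraper2.py
#      (substitute whatever filename this gist was saved under; ffProfilePath
#      should point to a Firefox profile, presumably one already logged in to LinkedIn)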
from pyvirtualdisplay import Display
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from selenium.webdriver.common.keys import Keys
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.support.ui import WebDriverWait, Select
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
import datetime
import random
import time
import html
import os
import re
import sys
import codecs
import csv
# read the parameters file (non-comment lines are collected and eval'd as a dict)
print("loading linkedin_parameters2.py file")
with codecs.open("linkedin_parameters2.py", "r", "utf-8") as fp:
    sparam = ''
    for line in fp:
        if len(line.strip()) > 0:
            if not line.strip()[0] == '#':
                sparam += line
try:
    dparameters = dict(eval(sparam))
except Exception as e:
    # the error log lives in dir_in, which is itself a parameter,
    # so at this point we can only print the error and exit
    print(str(e))
    sys.exit()
dir_in = dparameters["dir_in"].strip()
timeout = dparameters["timeout"]
geckodriverexcecutablePath = dparameters["geckodriverexcecutablePath"].strip()
usegecko = dparameters["usegecko"]
ffProfilePath = dparameters["ffProfilePath"]
ffWidth = dparameters["ffWidth"]
ffHeight = dparameters["ffHeight"]
scrollbypx = dparameters["scrollbypx"]
headlessMode = dparameters["headlessMode"]
input_csv = dparameters["input_csv"]
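# An illustrative linkedin_parameters2.py (values are examples only -- adjust
# the paths for your machine). Since the file is parsed with dict(eval(...))
# above, it must evaluate to a dict literal (or anything dict() accepts):
# {
#     "dir_in": "/home/user/linkedin/",          # working directory for input/output files
#     "timeout": 20,                             # selenium wait timeout in seconds
#     "geckodriverexcecutablePath": "/usr/local/bin/geckodriver",
#     "usegecko": True,                          # drive Firefox via geckodriver/marionette
#     "ffProfilePath": "/home/user/.mozilla/firefox/abcd1234.default",
#     "ffWidth": 1366,
#     "ffHeight": 768,
#     "scrollbypx": 300,                         # pixels per scroll step
#     "headlessMode": True,                      # run inside a virtual display
#     "input_csv": "input_companies_urls.csv",
# }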
# read input from the csv file. You'll have to set correct info here,
# i.e. place/create input_companies_urls.csv in dir_in.
# Note that the first line (header) of the input csv will be skipped.
with open(dir_in + input_csv, 'r') as fcsvinput:
    lurls = fcsvinput.readlines()
lurls = lurls[1:]
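# An illustrative input_companies_urls.csv (the header line is skipped; each
# following line is one LinkedIn company url):
#
#   url
#   https://www.linkedin.com/company/github/
#   https://www.linkedin.com/company/mozilla-corporation/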
# write a header row only if linkedin_companies_output.csv does not exist yet
write_first_row = not os.path.exists(dir_in + 'linkedin_companies_output.csv')
# define the output csv ('#' is used as the delimiter)
csvdelimiter = '#'
csvFile = codecs.open(dir_in + 'linkedin_companies_output.csv', 'a', 'utf-8')
csvl = csv.writer(csvFile, delimiter=csvdelimiter)
if write_first_row:
    csvl.writerow(["logo image url", "company name", "num of employees", "description",
                   "specialities", "location", "website url", "year founded"])
# in headless mode, run Firefox inside a virtual display (Xvfb via pyvirtualdisplay)
if headlessMode:
    display = Display(visible=0, size=(ffWidth, ffHeight))
    display.start()
log = codecs.open(dir_in + "linkedin_errorslog.txt", "a", "utf-8")
time1 = time.time()
counter = 0
def open_tag_by_css(css_selector):
    '''function to click item based on css selector'''
    driver.find_element_by_css_selector(css_selector).click()

def open_tag_by_xpath(xpath):
    '''function to click item based on xpath'''
    driver.find_element_by_xpath(xpath).click()

def enter_in_tag_by_css(css_selector, text):
    '''function to enter text based on css selector'''
    driver.find_element_by_css_selector(css_selector).send_keys(text)

def enter_in_tag_by_xpath(xpath, text):
    '''function to enter text based on xpath'''
    driver.find_element_by_xpath(xpath).send_keys(text)

def save_response_to_file(text):
    '''temporary function to analyse html response'''
    with codecs.open(dir_in + "rawresponse.txt", "w", "utf-8") as fresp:
        fresp.write(html.unescape(text))

def waitForLoadbyCSS(CSS_SELECTOR):
    '''function to wait until web element is available via css check'''
    wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, CSS_SELECTOR)))

def waitForLoadbyXpath(xpath):
    '''function to wait until web element is available via xpath check'''
    try:
        wait.until(EC.presence_of_element_located((By.XPATH, xpath)))
        return True
    except TimeoutException:
        return False
def openurl(url):
    '''function to open url using selenium'''
    global counter
    try:
        counter += 1
        driver.get(url)
        print('%05d' % counter + '-' * 100)
        print("loading " + url)
    except Exception as e:
        now = str(datetime.datetime.now())[:16]
        log.write(now + ' ' + str(e) + os.linesep)
        print(str(e))

def setbrowser():
    '''function for preparing browser for automation'''
    print("Preparing browser")
    global driver
    global wait
    profile = webdriver.FirefoxProfile(profile_directory=ffProfilePath)
    capabilities = DesiredCapabilities.FIREFOX
    if usegecko:
        # marionette is required when driving Firefox through geckodriver
        capabilities["marionette"] = True
    driver = webdriver.Firefox(firefox_profile=profile,
                               capabilities=capabilities,
                               executable_path=geckodriverexcecutablePath)
    driver.set_window_size(ffWidth, ffHeight)
    driver.implicitly_wait(timeout)
    wait = WebDriverWait(driver, timeout)
def scroll_smoothly(sbypx):
    '''function to scroll the page down by sbypx pixels'''
    driver.execute_script("window.scrollBy(0, %d);" % (sbypx))
    time.sleep(0.3)
def is_element_present(xpath):
    '''check whether an element is present based on xpath'''
    try:
        driver.find_element_by_xpath(xpath)
        return True
    except Exception:
        return False
def parse_companies(url):
    '''scrape one company page and append the fields to the output csv'''
    openurl(url)
    # macOS has problems with waiting for element presence, so a generous
    # time.sleep is added as well
    time.sleep(7)
    xpath = "//h1[contains(@dir,'ltr')]"
    is_element_present(xpath)  # lets the implicit wait run for the page heading
    # logo url
    xpath = "//img[contains(@alt,'Logo')]"
    try:
        logo_url = driver.find_element_by_xpath(xpath).get_attribute("outerHTML")
        pattern = re.compile(r'src="(.*?)"')
        logo_url = pattern.search(logo_url).group(1)
    except Exception:
        logo_url = '/'
    print(logo_url)
    # company name
    xpath = "//h1[contains(@dir,'ltr')]"
    try:
        company_name = driver.find_element_by_xpath(xpath).get_attribute("innerHTML")
        company_name = company_name.strip()
    except Exception:
        company_name = "/"
    print(company_name)
    # number of employees
    xpath = "//strong[contains(.,'See all') and contains(.,'employees on LinkedIn')]"
    try:
        num_of_employees = driver.find_element_by_xpath(xpath).get_attribute("innerHTML")
        pattern = re.compile(r'(\d+)')
        num_of_employees = pattern.search(num_of_employees).group(1)
    except Exception:
        num_of_employees = "/"
    print(num_of_employees)
    # description
    xpath = "//p[contains(@class,'org-about-us-organization-description__text description')]"
    try:
        description = driver.find_element_by_xpath(xpath).get_attribute("innerHTML")
        description = description.replace('\r', '').replace('\n', ' ').replace('\t', ' ')
        description = description.strip()
    except Exception:
        description = "/"
    print(description)
    scroll_smoothly(100)
    # if the 'See more' details section is collapsed, click to expand it
    xpath = "//button[contains(@id,'show-details-btn') and contains(@aria-expanded,'false')]"
    try:
        driver.find_element_by_xpath(xpath).click()
        time.sleep(1)
        scroll_smoothly(400)
        time.sleep(0.5)
    except Exception:
        # button absent or already expanded (aria-expanded='true')
        pass
    # specialities
    xpath = "//p[contains(@class,'specialities mb5')]"
    try:
        specialities = driver.find_element_by_xpath(xpath).get_attribute("innerHTML")
        specialities = specialities.replace('\n', ' ')
        specialities = specialities.strip()
    except Exception:
        specialities = "/"
    print(specialities)
    # headquarters
    xpath = "//p[contains(@class,'headquarters')]"
    try:
        headquarters = driver.find_element_by_xpath(xpath).get_attribute("innerHTML")
        headquarters = headquarters.strip()
    except Exception:
        headquarters = "/"
    print(headquarters)
    # website
    xpath = "//a[contains(@class,'website') and contains(@class,'link')]"
    try:
        website = driver.find_element_by_xpath(xpath).get_attribute("innerHTML")
        website = website.strip()
    except Exception:
        website = "/"
    print(website)
    # founded
    xpath = "//p[contains(@class,'org-about-company-module__founded')]"
    try:
        founded = driver.find_element_by_xpath(xpath).get_attribute("innerHTML")
        founded = founded.strip()
    except Exception:
        founded = "/"
    print(founded)
    csvl.writerow([logo_url, company_name, num_of_employees, description,
                   specialities, headquarters, website, founded])
def calculate_time():
    '''function to calculate and print elapsed time'''
    time2 = time.time()
    hours = int((time2 - time1) / 3600)
    minutes = int((time2 - time1 - hours * 3600) / 60)
    sec = time2 - time1 - hours * 3600 - minutes * 60
    print("processed in %dh:%dm:%ds" % (hours, minutes, sec))
if __name__ == '__main__':
    setbrowser()
    for url in lurls:
        url = url.strip()
        parse_companies(url)
    calculate_time()
    log.close()
    # quit() (rather than close()) also terminates the geckodriver process
    driver.quit()
    if headlessMode:
        display.stop()
    csvFile.close()
    print('Done.')