Last active
May 6, 2019 15:41
-
-
Save allieus/3f0c709d869c462399e8e0eb1d491fc9 to your computer and use it in GitHub Desktop.
인스타그램 인증 예시 by Ask Company
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
from selenium import webdriver | |
from selenium.webdriver.common.by import By | |
from selenium.webdriver.chrome.options import Options | |
from selenium.webdriver.support import expected_conditions as EC | |
from selenium.webdriver.support.ui import WebDriverWait | |
from bs4 import BeautifulSoup | |
class Browser:
    """Small convenience wrapper around a Selenium Chrome WebDriver.

    Bundles page navigation, element lookup (through Selenium or through
    BeautifulSoup on the current page source), waiting and scrolling helpers.
    Usable as a context manager: the driver is quit when the block exits.
    """

    def __init__(self, driver_path='chromedriver', implicitly_wait_time=0):
        """Start Chrome via the chromedriver binary at ``driver_path``.

        Args:
            driver_path: Path (or name on PATH) of the chromedriver executable.
            implicitly_wait_time: Implicit wait in seconds applied to the
                driver; a falsy value leaves the driver's setting untouched.
        """
        # NOTE(review): `executable_path` is the Selenium 3 style argument;
        # newer Selenium releases presumably need a Service object - confirm
        # against the installed Selenium version.
        self.driver = webdriver.Chrome(executable_path=driver_path)
        if implicitly_wait_time:
            self.driver.implicitly_wait(implicitly_wait_time)

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.quit()

    def quit(self):
        """Quit the underlying driver."""
        self.driver.quit()

    def visit(self, url):
        """Navigate the browser to ``url``."""
        self.driver.get(url)

    def _soup(self):
        """Parse the current page source with BeautifulSoup's html.parser."""
        return BeautifulSoup(self.driver.page_source, 'html.parser')

    def bs4_select(self, selector):
        """Return every BeautifulSoup tag matching the CSS ``selector``."""
        return self._soup().select(selector)

    def bs4_select_one(self, selector):
        """Return the first BeautifulSoup tag matching ``selector`` (or None)."""
        return self._soup().select_one(selector)

    def implicitly_wait(self, wait_time):
        """Set the driver's implicit wait to ``wait_time`` seconds."""
        # The original referenced an undefined name here and always raised
        # NameError; the value to apply is the method's own argument.
        self.driver.implicitly_wait(wait_time)

    def webdriver_select(self, selector, explicitly_wait_time=0):
        """Yield every WebElement matching the CSS ``selector``.

        When ``explicitly_wait_time`` is truthy, wait up to that many seconds
        for at least one matching element to be present before yielding.
        Being a generator, the lookup only happens once iteration starts.
        """
        if explicitly_wait_time:
            element_list = WebDriverWait(self.driver, explicitly_wait_time).until(
                EC.presence_of_all_elements_located((By.CSS_SELECTOR, selector))
            )
        else:
            element_list = self.driver.find_elements(By.CSS_SELECTOR, selector)
        yield from element_list

    def webdriver_select_one(self, selector, explicitly_wait_time=0):
        """Return the first WebElement matching the CSS ``selector``.

        When ``explicitly_wait_time`` is truthy, wait up to that many seconds
        for the element to be present; otherwise look it up immediately.
        """
        if explicitly_wait_time:
            return WebDriverWait(self.driver, explicitly_wait_time).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, selector))
            )
        return self.driver.find_element(By.CSS_SELECTOR, selector)

    def webdriver_until(self, check_fn, explicitly_wait_time):
        """Wait up to ``explicitly_wait_time`` seconds until ``check_fn(driver)`` is truthy.

        Returns whatever ``WebDriverWait.until`` returns for ``check_fn``.
        """
        return WebDriverWait(self.driver, explicitly_wait_time).until(check_fn)

    def scroll_page_down(self, wait=0.5):
        """Scroll to the bottom of the document, then sleep ``wait`` seconds."""
        self.driver.execute_script('window.scrollTo(0, document.body.scrollHeight)')
        time.sleep(wait)

    def is_bottom(self):
        """Return the result of an in-page check that the viewport reached the document end."""
        return self.driver.execute_script('''
            const doc_height = Math.max(
                document.body.scrollHeight,
                document.body.offsetHeight,
                document.documentElement.clientHeight,
                document.documentElement.scrollHeight,
                document.documentElement.offsetHeight);
            return (window.innerHeight + window.scrollY) >= doc_height;
        ''')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
######################################## | |
# | |
# custom exceptions | |
# | |
######################################## | |
class UnexpectedException(Exception):
    """Raised when the Instagram page flow is in a state this script does not handle."""
class RequestSecurityCodeException(Exception):
    """Raised when login lands on a challenge page asking to send a security code."""
######################################## | |
# | |
# | |
######################################## | |
import time | |
import re | |
from browser import Browser | |
class Instagram:
    """Collect posts from an Instagram hashtag page through a ``Browser``.

    Usable as a context manager: the browser is quit when the block exits.
    """

    # Captures the post id from hrefs shaped like "/p/<id>/".
    POST_ID_PATTERN = re.compile(r'\/p\/(.+?)\/')
    TAG_BASE_URL = "https://www.instagram.com/explore/tags/{}/"

    def __init__(self, driver_path):
        """Create the ``Browser`` using the chromedriver at ``driver_path``."""
        self.browser = Browser(driver_path)

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.browser.quit()

    def get_total(self):
        """Return the number shown at the start of the 'main header span' text.

        Thousands separators (``,``) are removed before conversion.

        Raises:
            ValueError: If the text does not start with a number.
        """
        total_str = self.browser.webdriver_select_one('main header span').text
        matched = re.match(r'([\d,]+)', total_str)
        if matched is None:
            raise ValueError(
                'Could not read the post count from {!r}'.format(total_str))
        return int(matched.group(1).replace(',', ''))

    def get_current_post_list(self):
        """Yield ``{'id', 'img_url'}`` dicts for the posts currently in the page.

        ``img_url`` is the last candidate of the image's ``srcset`` attribute,
        or ``None`` when the post link has no image or no ``srcset``. Links
        whose href does not contain a post id are skipped.
        """
        for post_tag in self.browser.bs4_select('.v1Nh3 a'):
            matched = self.POST_ID_PATTERN.search(post_tag.get('href', ''))
            if matched is None:
                continue

            img_tag = post_tag.select_one('img')
            srcset = img_tag.get('srcset') if img_tag is not None else None

            img_url = None
            if srcset:
                # Each srcset candidate is "<url> <descriptor>"; take the last
                # one and drop the descriptor (tolerating a missing one).
                candidate = srcset.split(',')[-1].strip()
                img_url = candidate.rsplit(' ', 1)[0] or None

            yield {'id': matched.group(1), 'img_url': img_url}

    def get_post_dict(self, tag_name, max_size=100):
        """Scroll the tag page for ``tag_name`` and collect posts keyed by id.

        Collection stops once at least ``min(max_size, total)`` posts were
        gathered, or when a pass adds nothing new and the page bottom is
        reached. Every post visible in a pass is added, so the result can
        hold more than ``max_size`` entries.

        Returns:
            dict mapping post id to the post dict from ``get_current_post_list``.
        """
        post_dict = {}
        self.browser.visit(self.TAG_BASE_URL.format(tag_name))

        total = self.get_total()
        limit = min(max_size, total)
        print('total: {} -> limit: {}'.format(total, limit))

        while len(post_dict) < limit:
            self.browser.scroll_page_down(0.5)
            self.browser.scroll_page_down(0.5)

            is_added = False
            for post in self.get_current_post_list():
                if post['id'] not in post_dict:
                    post_dict[post['id']] = post
                    is_added = True

            if not is_added:
                # Nothing new appeared: give the page time to load more
                # before deciding that the end of the feed was reached.
                time.sleep(3)
                if self.browser.is_bottom():
                    break

            print('\r{} / {}'.format(len(post_dict), limit), end='')

        return post_dict
######################################## | |
# | |
# Instagram2 | |
# | |
######################################## | |
import re | |
import time | |
from datetime import datetime, timedelta | |
from urllib.parse import unquote | |
from imbox import Imbox # pip install imbox | |
from selenium.common.exceptions import TimeoutException | |
class Instagram2(Instagram):
    """``Instagram`` extended with login, session-cookie and security-code handling."""

    # A run of exactly six digits not adjacent to other digits. Lookarounds
    # (instead of consuming a neighbouring character) also match a code that
    # sits at the very start or end of the text.
    SECURITY_CODE_PATTERN = r'(?<!\d)(\d{6})(?!\d)'

    def get_session_id(self):
        """Return the URL-decoded ``sessionid`` cookie value, or ``None`` if absent."""
        cookie = self.browser.driver.get_cookie('sessionid')
        if cookie is None:
            # No session cookie: can be treated as the logged-out state.
            return None
        return unquote(cookie['value'])

    def set_session_id(self, session_id):
        """Open instagram.com, install ``session_id`` as the ``sessionid`` cookie and refresh."""
        self.browser.visit("https://www.instagram.com")
        self.browser.driver.add_cookie({
            "name": "sessionid",
            "value": session_id,
        })
        self.browser.driver.refresh()

    def try_login(self, username, password):
        """Submit the login form and validate the page reached afterwards.

        Raises:
            RequestSecurityCodeException: If a challenge page offers to send
                a security code.
            UnexpectedException: If the session is still not logged in.
        """
        self.browser.visit("https://www.instagram.com/accounts/login/")
        self.browser.webdriver_select_one('[name=username]', 1).send_keys(username)
        self.browser.webdriver_select_one('[name=password]', 1).send_keys(password)
        self.browser.webdriver_select_one('[type=submit]', 1).click()

        try:
            self.browser.webdriver_until(
                lambda driver: '/accounts/login/' not in driver.current_url,
                explicitly_wait_time=3)
        except TimeoutException:
            # Still on the login URL; the checks below decide the outcome.
            pass

        current_url = self.browser.driver.current_url

        # Page advertising the Instagram app download.
        if '#reactivated' in current_url:
            self.browser.visit('https://www.instagram.com')
        elif '/challenge/' in current_url:
            html = self.browser.driver.page_source
            if ('보안 코드 보내기' in html) or ('Send Security Code' in html):
                raise RequestSecurityCodeException
            # Other challenge situations may exist.

        if not self.is_logged:
            raise UnexpectedException('Unexpected URL : {}'.format(current_url))
        self.dismiss_notification()

    @property
    def is_logged(self):
        """Whether the ``<html>`` element's class lacks ``not-logged-in``."""
        # get_attribute returns a plain string (unlike BeautifulSoup's
        # Tag['class'], which is a list). Use this instance's browser rather
        # than a module-level global.
        class_str = self.browser.webdriver_select_one('html').get_attribute('class')
        return 'not-logged-in' not in class_str

    def try_security_code(self, gmail_username, gmail_password):
        """Request a security code, read it from Gmail and submit it.

        Raises:
            UnexpectedException: If the send button, the code (after five
                attempts) or the submit button cannot be found.
        """
        for we in self.browser.webdriver_select('button'):
            if we.text in ('보안 코드 보내기', 'Send Security Code'):
                we.click()
                print("보안 코드 전송을 요청했습니다.")
                break
        else:
            raise UnexpectedException('보안 코드 보내기 버튼을 찾을 수 없습니다.')

        for retry in range(1, 6):
            print("Gmail 계정 {}에서 보안코드 확인 : {}번째 시도".format(gmail_username, retry))
            security_code = self.get_security_code_by_gmail(
                gmail_username, gmail_password, minutes=5)
            if security_code:
                print("보안코드 {} 획득 성공".format(security_code))
                break
            time.sleep(5)
        else:
            raise UnexpectedException(
                'Gmail 계정 {}에서 보안코드 확인 실패'.format(gmail_username))

        print("보안코드 기입")
        self.browser.webdriver_select_one('[name=security_code]').send_keys(security_code)

        for we in self.browser.webdriver_select('button'):
            if we.text in ('제출', 'Submit'):
                print("보안코드 제출")
                we.click()
                break
        else:
            raise UnexpectedException()

    def get_security_code_by_gmail(self, username, password, **timedelta_kwargs):
        """Return the first six-digit code found in recent Instagram security mails.

        Args:
            username: Gmail account name.
            password: Gmail (app) password.
            **timedelta_kwargs: ``datetime.timedelta`` arguments giving how
                far back from now to search (e.g. ``minutes=5``).

        Returns:
            The code as a string, or ``None`` when no message contains one.
        """
        with Imbox('imap.gmail.com',
                   username=username, password=password,
                   ssl=True, ssl_context=None, starttls=False) as imbox:
            # Named `since` so the `time` module is not shadowed.
            since = datetime.now() - timedelta(**timedelta_kwargs)
            messages = imbox.messages(
                sent_from='security@mail.instagram.com',
                # subject='Verify Your Account',
                date__gt=since)
            # NOTE(review): the first matching message wins; confirm the
            # order imbox returns messages in if several codes were mailed.
            for uid, message in messages:
                body_list = message.body['html'] or message.body['plain']
                if not body_list:
                    continue
                matched = re.search(self.SECURITY_CODE_PATTERN, ''.join(body_list))
                if matched:
                    return matched.group(1)
            return None

    def dismiss_notification(self):
        """Click the first 'Not Now' button on the page, if there is one."""
        for we in self.browser.webdriver_select('button'):
            if we.text in ('Not Now', '나중에 하기'):
                we.click()
                # Stop here: the click may remove the dialog, after which the
                # remaining button handles may no longer be valid.
                break
######################################## | |
# | |
# main code | |
# | |
######################################## | |
import json | |
# Credentials live in a local JSON file next to the script.
with open("auth.json") as auth_file:
    auth = json.load(auth_file)

instagram_username = auth['instagram_username']  # Instagram account name
instagram_password = auth['instagram_password']  # Instagram account password
gmail_username = auth['gmail_username']
# Gmail app password: https://myaccount.google.com -> Security -> App passwords
gmail_password = auth['gmail_password']

###

driver_path = './drivers/chromedriver-mac64-74.0.3729.6'

with Instagram2(driver_path) as insta:
    try:
        insta.try_login(instagram_username, instagram_password)
    except RequestSecurityCodeException:
        # Login hit the security-code challenge: fetch the code from Gmail.
        insta.try_security_code(gmail_username, gmail_password)

    # Report the final login state before the browser is closed.
    print("is_logged:", insta.is_logged)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment