Created
February 15, 2024 04:40
-
-
Save 6mini/44ff7413f1b88bdb377e8f2ca731d5f5 to your computer and use it in GitHub Desktop.
크롤링 중 발생할 수 있는 IP 차단 문제를 우회하기 위한 전략을 구현한다. Python의 requests와 BeautifulSoup 라이브러리를 사용하여 웹 페이지에서 제품 정보를 수집하고, 동시에 여러 요청을 관리하기 위해 concurrent.futures와 스레딩을 활용한다. IP 차단 감지 시, AWS Lambda 함수를 트리거하여 EC2 인스턴스를 재부팅하고 새로운 IP 주소를 획득함으로써 연속적인 데이터 수집 작업을 중단 없이 계속할 수 있다. 데이터는 로컬 환경에서 pickle 형태로 상태 관리되며, AWS S3를 통해 작업 완료 혹은 재부팅 필요성을 판단하는 트리거를 생성한다.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
from bs4 import BeautifulSoup | |
from concurrent.futures import ThreadPoolExecutor, as_completed | |
import pandas as pd | |
import threading | |
import datetime | |
import boto3 | |
from pytz import timezone | |
import pickle | |
# 제품 정보를 수집하는 함수 | |
def fetch_product_info(product_id, stop_event): | |
# 작업 중단 여부를 전달하기 위한 이벤트 | |
# 이벤트가 끝난 경우 그냥 다 패스 | |
if stop_event.is_set(): | |
return None | |
url = f"https://example.com/{product_id}" | |
headers = {'User-Agent': 'Mozilla/5.0'} | |
response = requests.get(url, headers=headers) | |
# IP 차단 여부 확인 | |
if response.status_code != 200: | |
print(f"IP 차단 감지: {response.status_code}") | |
return None | |
try: | |
soup = BeautifulSoup(response.text, 'html.parser') | |
product_name = soup.select_one('h1.product-name').text | |
data = { | |
'product_id' : product_id, | |
'product_name' : product_name, | |
... | |
} | |
return data | |
# 기타 예외 처리 | |
except Exception as e: | |
print(f"크롤링 오류: {e}") | |
return None | |
# 데이터베이스에 저장하는 함수 | |
def save_to_db(df): | |
... | |
def process_id_list(product_ids, error_ids, final_error_ids): | |
# 연속 에러 체크 카운터 | |
consecutive_error_counter = 0 | |
# 결과 저장 리스트 | |
results = [] | |
# 중단 여부 확인을 위한 이벤트 | |
stop_event = threading.Event() | |
stop_event.clear() | |
# 남은 아이템 수 출력 | |
len_product_ids = len(product_ids) | |
print("남은 아이템 수:", len_product_ids) | |
with ThreadPoolExecutor(max_workers=10) as executor: | |
futures = {executor.submit(fetch_product_info, product_id, stop_event): product_id for product_id in product_ids} | |
for future in as_completed(futures): | |
product_id = futures[future] | |
result = future.result() | |
# 이벤트가 끝난 경우 그냥 다 패스해주기 | |
if stop_event.is_set(): | |
pass | |
# 5회 이상 에러 발생 시 | |
elif consecutive_error_counter >= 5: | |
print("5회 연속 에러로 인한 인스턴스 재구동") | |
# 종료 이벤트 설정 | |
stop_event.set() | |
# 정상 동작 시 | |
elif result is not None: | |
# 결과 저장 | |
results.append(result) | |
# 연속 에러 카운터 초기화 | |
consecutive_error_counter = 0 | |
# 처리 ID 제거: 재실행할 때 다시 돌아가면 안됨 | |
product_ids.remove(product_id) | |
# 에러 발생 | |
else: | |
print(f"{product_id}에러 발생") | |
# 만약 한번 에러난 이력이 있으면 | |
if product_id in error_ids: | |
# 최종 에러 리스트에 추가 | |
final_error_ids.append(product_id) | |
# 작업 목록에서 제거 | |
product_ids.remove(product_id) | |
else: | |
# 에러 리스트에 추가 | |
error_ids.append(product_id) | |
# 연속 에러 카운터 증가 | |
consecutive_error_counter += 1 | |
return results | |
# 트리거용 S3 버킷에 파일을 업로드하는 함수 | |
def s3_trigger(s3_prefix): | |
now = datetime.datetime.now(timezone('Asia/Seoul')) | |
date_str = now.strftime('%Y-%m-%d_%H:%M') | |
s3_bucket = 'my-bucket-name' | |
s3 = boto3.client('s3') | |
s3_key = s3_prefix + date_str + '.txt' | |
s3.put_object(Body='', Bucket=s3_bucket, Key=s3_key) | |
if __name__ == "__main__": | |
# 변수 관리용 연월 설정 | |
now = datetime.datetime.now(timezone('Asia/Seoul')) | |
year_month = now.strftime('%Y-%m') | |
try: | |
# 이어서 진행할 경우 파일에서 읽어오기 | |
with open(f"state_{year_month}.pkl", 'rb') as f: | |
product_ids, error_ids, final_error_ids = pickle.load(f) | |
except FileNotFoundError: | |
# 파일이 없는 경우 새로운 월로 간주 | |
product_ids = [1001, 1002, 1003, 1004, 1005, ...] | |
error_ids = [] | |
final_error_ids = [] | |
# 작업 수행 | |
results = process_id_list(product_ids, error_ids, final_error_ids) | |
# 결과 저장 | |
if results: | |
save_to_db(results) | |
# 작업 완료 확인 | |
if not product_ids: | |
# 변수 저장 | |
with open(f"state_{year_month}.pkl", 'wb') as f: | |
pickle.dump((product_ids, error_ids, final_error_ids), f) | |
# 인스턴스 종료 위한 작업 완료 트리거 | |
s3_prefix = 'complete/' | |
s3_trigger(s3_prefix) | |
# 작업 미완료 시 | |
else: | |
# 변수 저장 | |
with open(f"state_{year_month}.pkl", 'wb') as f: | |
pickle.dump((product_ids, error_ids, final_error_ids), f) | |
# 인스턴스 재부팅 위한 트리거 | |
s3_prefix = 'reboot/' | |
s3_trigger(s3_prefix) | |
# EC2RebootFunction | |
import boto3 | |
# EC2 클라이언트 생성 | |
ec2 = boto3.client('ec2') | |
# 재부팅할 인스턴스 ID | |
instance_id = 'i-01a23456b7cd8910e' | |
def lambda_handler(event, context): | |
# 인스턴스 중지 | |
ec2.stop_instances(InstanceIds=[instance_id]) | |
# 인스턴스 중지 대기 | |
waiter = ec2.get_waiter('instance_stopped') | |
waiter.wait(InstanceIds=[instance_id]) | |
# 인스턴스 시작 | |
ec2.start_instances(InstanceIds=[instance_id]) | |
print(f'Instance {instance_id} restarted successfully') | |
# EC2StopFunction | |
import boto3 | |
# EC2 클라이언트 생성 | |
ec2 = boto3.client('ec2') | |
# 종료할 인스턴스 ID | |
instance_id = 'i-01a23456b7cd8910e' | |
def lambda_handler(event, context): | |
# 인스턴스 중지 | |
ec2.stop_instances(InstanceIds=[instance_id]) | |
# 인스턴스 중지 대기 | |
waiter = ec2.get_waiter('instance_stopped') | |
waiter.wait(InstanceIds=[instance_id]) | |
print(f'Instance {instance_id} stopped successfully') | |
# EC2StartFunction | |
import boto3 | |
# EC2 클라이언트 생성 | |
ec2 = boto3.client('ec2') | |
# 시작할 인스턴스 ID | |
instance_id = 'i-01a23456b7cd8910e' | |
def lambda_handler(event, context): | |
# 인스턴스 시작 | |
ec2.start_instances(InstanceIds=[instance_id]) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment