Skip to content

Instantly share code, notes, and snippets.

@6mini
Created February 15, 2024 04:40
Show Gist options
  • Save 6mini/44ff7413f1b88bdb377e8f2ca731d5f5 to your computer and use it in GitHub Desktop.
Save 6mini/44ff7413f1b88bdb377e8f2ca731d5f5 to your computer and use it in GitHub Desktop.
크롤링 중 발생할 수 있는 IP 차단 문제를 우회하기 위한 전략을 구현한다. Python의 requests와 BeautifulSoup 라이브러리를 사용하여 웹 페이지에서 제품 정보를 수집하고, 동시에 여러 요청을 관리하기 위해 concurrent.futures와 스레딩을 활용한다. IP 차단 감지 시, AWS Lambda 함수를 트리거하여 EC2 인스턴스를 재부팅하고 새로운 IP 주소를 획득함으로써 연속적인 데이터 수집 작업을 중단 없이 계속할 수 있다. 데이터는 로컬 환경에서 pickle 형태로 상태 관리되며, AWS S3를 통해 작업 완료 혹은 재부팅 필요성을 판단하는 트리거를 생성한다.
import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, as_completed
import pandas as pd
import threading
import datetime
import boto3
from pytz import timezone
import pickle
# 제품 정보를 수집하는 함수
def fetch_product_info(product_id, stop_event):
# 작업 중단 여부를 전달하기 위한 이벤트
# 이벤트가 끝난 경우 그냥 다 패스
if stop_event.is_set():
return None
url = f"https://example.com/{product_id}"
headers = {'User-Agent': 'Mozilla/5.0'}
response = requests.get(url, headers=headers)
# IP 차단 여부 확인
if response.status_code != 200:
print(f"IP 차단 감지: {response.status_code}")
return None
try:
soup = BeautifulSoup(response.text, 'html.parser')
product_name = soup.select_one('h1.product-name').text
data = {
'product_id' : product_id,
'product_name' : product_name,
...
}
return data
# 기타 예외 처리
except Exception as e:
print(f"크롤링 오류: {e}")
return None
# 데이터베이스에 저장하는 함수
def save_to_db(df):
...
def process_id_list(product_ids, error_ids, final_error_ids):
# 연속 에러 체크 카운터
consecutive_error_counter = 0
# 결과 저장 리스트
results = []
# 중단 여부 확인을 위한 이벤트
stop_event = threading.Event()
stop_event.clear()
# 남은 아이템 수 출력
len_product_ids = len(product_ids)
print("남은 아이템 수:", len_product_ids)
with ThreadPoolExecutor(max_workers=10) as executor:
futures = {executor.submit(fetch_product_info, product_id, stop_event): product_id for product_id in product_ids}
for future in as_completed(futures):
product_id = futures[future]
result = future.result()
# 이벤트가 끝난 경우 그냥 다 패스해주기
if stop_event.is_set():
pass
# 5회 이상 에러 발생 시
elif consecutive_error_counter >= 5:
print("5회 연속 에러로 인한 인스턴스 재구동")
# 종료 이벤트 설정
stop_event.set()
# 정상 동작 시
elif result is not None:
# 결과 저장
results.append(result)
# 연속 에러 카운터 초기화
consecutive_error_counter = 0
# 처리 ID 제거: 재실행할 때 다시 돌아가면 안됨
product_ids.remove(product_id)
# 에러 발생
else:
print(f"{product_id}에러 발생")
# 만약 한번 에러난 이력이 있으면
if product_id in error_ids:
# 최종 에러 리스트에 추가
final_error_ids.append(product_id)
# 작업 목록에서 제거
product_ids.remove(product_id)
else:
# 에러 리스트에 추가
error_ids.append(product_id)
# 연속 에러 카운터 증가
consecutive_error_counter += 1
return results
# 트리거용 S3 버킷에 파일을 업로드하는 함수
def s3_trigger(s3_prefix):
now = datetime.datetime.now(timezone('Asia/Seoul'))
date_str = now.strftime('%Y-%m-%d_%H:%M')
s3_bucket = 'my-bucket-name'
s3 = boto3.client('s3')
s3_key = s3_prefix + date_str + '.txt'
s3.put_object(Body='', Bucket=s3_bucket, Key=s3_key)
if __name__ == "__main__":
# 변수 관리용 연월 설정
now = datetime.datetime.now(timezone('Asia/Seoul'))
year_month = now.strftime('%Y-%m')
try:
# 이어서 진행할 경우 파일에서 읽어오기
with open(f"state_{year_month}.pkl", 'rb') as f:
product_ids, error_ids, final_error_ids = pickle.load(f)
except FileNotFoundError:
# 파일이 없는 경우 새로운 월로 간주
product_ids = [1001, 1002, 1003, 1004, 1005, ...]
error_ids = []
final_error_ids = []
# 작업 수행
results = process_id_list(product_ids, error_ids, final_error_ids)
# 결과 저장
if results:
save_to_db(results)
# 작업 완료 확인
if not product_ids:
# 변수 저장
with open(f"state_{year_month}.pkl", 'wb') as f:
pickle.dump((product_ids, error_ids, final_error_ids), f)
# 인스턴스 종료 위한 작업 완료 트리거
s3_prefix = 'complete/'
s3_trigger(s3_prefix)
# 작업 미완료 시
else:
# 변수 저장
with open(f"state_{year_month}.pkl", 'wb') as f:
pickle.dump((product_ids, error_ids, final_error_ids), f)
# 인스턴스 재부팅 위한 트리거
s3_prefix = 'reboot/'
s3_trigger(s3_prefix)
# EC2RebootFunction
import boto3
# EC2 클라이언트 생성
ec2 = boto3.client('ec2')
# 재부팅할 인스턴스 ID
instance_id = 'i-01a23456b7cd8910e'
def lambda_handler(event, context):
# 인스턴스 중지
ec2.stop_instances(InstanceIds=[instance_id])
# 인스턴스 중지 대기
waiter = ec2.get_waiter('instance_stopped')
waiter.wait(InstanceIds=[instance_id])
# 인스턴스 시작
ec2.start_instances(InstanceIds=[instance_id])
print(f'Instance {instance_id} restarted successfully')
# EC2StopFunction
import boto3
# EC2 클라이언트 생성
ec2 = boto3.client('ec2')
# 종료할 인스턴스 ID
instance_id = 'i-01a23456b7cd8910e'
def lambda_handler(event, context):
# 인스턴스 중지
ec2.stop_instances(InstanceIds=[instance_id])
# 인스턴스 중지 대기
waiter = ec2.get_waiter('instance_stopped')
waiter.wait(InstanceIds=[instance_id])
print(f'Instance {instance_id} stopped successfully')
# EC2StartFunction
import boto3
# EC2 클라이언트 생성
ec2 = boto3.client('ec2')
# 시작할 인스턴스 ID
instance_id = 'i-01a23456b7cd8910e'
def lambda_handler(event, context):
# 인스턴스 시작
ec2.start_instances(InstanceIds=[instance_id])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment