Skip to content

Instantly share code, notes, and snippets.

@is2js
Created October 11, 2021 09:27
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save is2js/da4d6aa587763a8696cd841e989b65f1 to your computer and use it in GitHub Desktop.
Save is2js/da4d6aa587763a8696cd841e989b65f1 to your computer and use it in GitHub Desktop.
04_function_api_add_save
import json
import requests
from pymongo import MongoClient
from pymongo.errors import BulkWriteError
from pymongo.errors import DuplicateKeyError
def get_profile(user_id):
"""
정보 조회 - user_id를 입력하면 백준 사이트에서 해당 user의 프로필 정보 중 일부를 반환해줌.
:param str user_id: 사용자id
:return: 백준 프로필 정보
:rtype: dict or None
"""
url = f"https://solved.ac/api/v3/user/show?handle={user_id}"
r_profile = requests.get(url)
if r_profile.status_code == requests.codes.ok:
profile = json.loads(r_profile.content.decode('utf-8'))
profile = \
{
"tier" : profile.get("tier"),
"rank" : profile.get("rank"),
"solvedCount" : profile.get("solvedCount"),
"rating" : profile.get("rating"),
"exp" : profile.get("exp"),
}
return profile
else:
print("프로필 요청 실패")
return
def get_solved(user_id):
"""
정보 조회 - user_id를 입력하면 백준 사이트에서 해당 user가 푼 총 문제수, 문제들 정보(level 높은 순)를 튜플(int, list)로 반환해줌.
:param str user_id: 사용자id
:return: 내가 푼 문제수, 내가 푼 문제들 정보
:rtype: tuple ([0]: int, [1]:list) or None
"""
url = f"https://solved.ac/api/v3/search/problem?query=solved_by%3A{user_id}&sort=level&direction=desc"
r_solved = requests.get(url)
# 요기는 user_id가 이상하더라도, 무조건 200로 반환된다.
# print(r_solved)
# print(r_solved.content.decode('utf-8'))
# <Response [200]>
# {"count":0,"items":[]}
# 길이를 가지고 0일때 한번 return 시켜서 끝내자.
if json.loads(r_solved.content.decode('utf-8')).get('count') == 0:
print("푼 문제가 없거나 요청이 잘못 됨.")
return
if r_solved.status_code == requests.codes.ok:
solved = json.loads(r_solved.content.decode('utf-8'))
count = solved.get("count")
items = solved.get("items")
solved_problems = []
for item in items:
solved_problems.append(
{
'problemId': item.get("problemId"),
'titleKo': item.get("titleKo"),
'level': item.get("level"),
}
)
# print("푼 문제수와 젤 고난이도 문제 1개만 >>>", count, solved_problems[0])
return count, solved_problems
else:
print("푼 문제들 요청 실패")
return
def get_count_by_level(user_id):
"""
정보 조회 - user_id를 입력하면 백준 사이트에서 해당 user가 푼 문제들에 대한 level별 문제수 정보를 level 높은 순으로 반환해줌.
:param str user_id: 사용자id
:return: level별 총 문제수, 내가 푼 문제수
:rtype: list or None
"""
url = f"https://solved.ac/api/v3/user/problem_stats?handle={user_id}"
r_count_by_level = requests.get(url)
if r_count_by_level.status_code == requests.codes.ok:
count_by_level = json.loads(r_count_by_level.content.decode('utf-8'))
filted_count_by_level = [ {"level":dict_['level'], "total":dict_['total'], "solved":dict_['solved'],} for dict_ in count_by_level if dict_.get('solved') != 0 ]
filted_count_by_level = sorted(filted_count_by_level, key=lambda x:x['level'], reverse=True)
return filted_count_by_level
else:
print("레벨별, 전체 문제수, 푼 문제수 요청 실패")
return
def save_to_db(data_list, collection_names, user_id, my_ip, username, password, db_name):
"""
3개의 콜레션에, 3개의 api요청 정보를 저장함.
:params list data_list: 3개의 api함수 요청으로 인해 들어오는 3개의 데이터
:params list collection_names: 3개의 api요청 데이터를 처리할 3개의 collectin list
:params str my_ip: DB IP
:params str username: DB 계정
:params str password: DB 계정 비밀번호
:params str db_name: DB 이름
:return result: <DB 저장의 결과(성공, 실패등)> 메세지를 담은 dictionary(3개의 collection마다 result_콜렉션명의 key에 표기)
:rtype dict
"""
#db_result = {'result':'success'} # default sucess -> 중복발생시, Insert and Ignore라는 메세지 넣어줄 것임.
db_result = {}
client = MongoClient(host=my_ip, port=27017, username=username, password=password)
db = client[db_name]
# data 와 colletion 1:1 매칭시켜서 zip으로 처리
for data, collection_name in zip(data_list, collection_names):
db_result[f'result_{collection_name}'] = 'success' # default sucess -> 중복발생시, Insert and Ignore라는 메세지 넣어줄 것임.
collection = db[collection_name]
if collection_name == 'user':
# data == profile_dict -> tier, rank, solvedCount, rating, exp -> user_id 추가 -> 그대로 저장
data['user_id'] = user_id
elif collection_name == 'count_by_level':
# data == count_by_level_list -> + level, solved, total -> 개별 user_id 추가 -> 그대로 저장
new_data = []
for x in data:
x['user_id'] = user_id
new_data.append(x)
data = new_data
else:
data = [ {"problemId" : x.get('problemId', None), "user_id" : user_id} for x in data[1]]
try:
if collection_name == 'user':
try:
# 중복방지용 -> 1. index로 지정안해주면, 계속 참.
collection.create_index([('user',1)],unique=True)
collection.insert_one(data)
except DuplicateKeyError as dke:
# print("user 이미 존재함.업데이트 시작.", dke)
# 2. find조건을 user_id로 찾은 뒤, $set을 들어온 새 데이터 전체로 업데이트
collection.update_one({"user_id":user_id}, { "$set": data }, upsert=True)
else:
collection.insert_many(data, ordered=False)
except BulkWriteError as bwe:
db_result[f'result_{collection_name}'] = 'Insert and Ignore duplicated data'
return db_result
user_id = 'tingstyle1'
profile_dict = get_profile(user_id)
count_by_level_list = get_count_by_level(user_id)
count_and_sovled_list = get_solved(user_id)
data_list = [profile_dict, count_by_level_list, count_and_sovled_list]
collection_names = ['user', 'count_by_level', 'count_and_sovled']
my_ip = '***.85.181.203'
username = 'likelion'
password = '***213'
db_name = 'likelion'
result = save_to_db(data_list, collection_names, user_id,
my_ip, username, password, db_name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment