Created
October 11, 2021 09:27
-
-
Save is2js/da4d6aa587763a8696cd841e989b65f1 to your computer and use it in GitHub Desktop.
04_function_api_add_save
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import requests | |
from pymongo import MongoClient | |
from pymongo.errors import BulkWriteError | |
from pymongo.errors import DuplicateKeyError | |
def get_profile(user_id): | |
""" | |
정보 조회 - user_id를 입력하면 백준 사이트에서 해당 user의 프로필 정보 중 일부를 반환해줌. | |
:param str user_id: 사용자id | |
:return: 백준 프로필 정보 | |
:rtype: dict or None | |
""" | |
url = f"https://solved.ac/api/v3/user/show?handle={user_id}" | |
r_profile = requests.get(url) | |
if r_profile.status_code == requests.codes.ok: | |
profile = json.loads(r_profile.content.decode('utf-8')) | |
profile = \ | |
{ | |
"tier" : profile.get("tier"), | |
"rank" : profile.get("rank"), | |
"solvedCount" : profile.get("solvedCount"), | |
"rating" : profile.get("rating"), | |
"exp" : profile.get("exp"), | |
} | |
return profile | |
else: | |
print("프로필 요청 실패") | |
return | |
def get_solved(user_id): | |
""" | |
정보 조회 - user_id를 입력하면 백준 사이트에서 해당 user가 푼 총 문제수, 문제들 정보(level 높은 순)를 튜플(int, list)로 반환해줌. | |
:param str user_id: 사용자id | |
:return: 내가 푼 문제수, 내가 푼 문제들 정보 | |
:rtype: tuple ([0]: int, [1]:list) or None | |
""" | |
url = f"https://solved.ac/api/v3/search/problem?query=solved_by%3A{user_id}&sort=level&direction=desc" | |
r_solved = requests.get(url) | |
# 요기는 user_id가 이상하더라도, 무조건 200로 반환된다. | |
# print(r_solved) | |
# print(r_solved.content.decode('utf-8')) | |
# <Response [200]> | |
# {"count":0,"items":[]} | |
# 길이를 가지고 0일때 한번 return 시켜서 끝내자. | |
if json.loads(r_solved.content.decode('utf-8')).get('count') == 0: | |
print("푼 문제가 없거나 요청이 잘못 됨.") | |
return | |
if r_solved.status_code == requests.codes.ok: | |
solved = json.loads(r_solved.content.decode('utf-8')) | |
count = solved.get("count") | |
items = solved.get("items") | |
solved_problems = [] | |
for item in items: | |
solved_problems.append( | |
{ | |
'problemId': item.get("problemId"), | |
'titleKo': item.get("titleKo"), | |
'level': item.get("level"), | |
} | |
) | |
# print("푼 문제수와 젤 고난이도 문제 1개만 >>>", count, solved_problems[0]) | |
return count, solved_problems | |
else: | |
print("푼 문제들 요청 실패") | |
return | |
def get_count_by_level(user_id): | |
""" | |
정보 조회 - user_id를 입력하면 백준 사이트에서 해당 user가 푼 문제들에 대한 level별 문제수 정보를 level 높은 순으로 반환해줌. | |
:param str user_id: 사용자id | |
:return: level별 총 문제수, 내가 푼 문제수 | |
:rtype: list or None | |
""" | |
url = f"https://solved.ac/api/v3/user/problem_stats?handle={user_id}" | |
r_count_by_level = requests.get(url) | |
if r_count_by_level.status_code == requests.codes.ok: | |
count_by_level = json.loads(r_count_by_level.content.decode('utf-8')) | |
filted_count_by_level = [ {"level":dict_['level'], "total":dict_['total'], "solved":dict_['solved'],} for dict_ in count_by_level if dict_.get('solved') != 0 ] | |
filted_count_by_level = sorted(filted_count_by_level, key=lambda x:x['level'], reverse=True) | |
return filted_count_by_level | |
else: | |
print("레벨별, 전체 문제수, 푼 문제수 요청 실패") | |
return | |
def save_to_db(data_list, collection_names, user_id, my_ip, username, password, db_name): | |
""" | |
3개의 콜레션에, 3개의 api요청 정보를 저장함. | |
:params list data_list: 3개의 api함수 요청으로 인해 들어오는 3개의 데이터 | |
:params list collection_names: 3개의 api요청 데이터를 처리할 3개의 collectin list | |
:params str my_ip: DB IP | |
:params str username: DB 계정 | |
:params str password: DB 계정 비밀번호 | |
:params str db_name: DB 이름 | |
:return result: <DB 저장의 결과(성공, 실패등)> 메세지를 담은 dictionary(3개의 collection마다 result_콜렉션명의 key에 표기) | |
:rtype dict | |
""" | |
#db_result = {'result':'success'} # default sucess -> 중복발생시, Insert and Ignore라는 메세지 넣어줄 것임. | |
db_result = {} | |
client = MongoClient(host=my_ip, port=27017, username=username, password=password) | |
db = client[db_name] | |
# data 와 colletion 1:1 매칭시켜서 zip으로 처리 | |
for data, collection_name in zip(data_list, collection_names): | |
db_result[f'result_{collection_name}'] = 'success' # default sucess -> 중복발생시, Insert and Ignore라는 메세지 넣어줄 것임. | |
collection = db[collection_name] | |
if collection_name == 'user': | |
# data == profile_dict -> tier, rank, solvedCount, rating, exp -> user_id 추가 -> 그대로 저장 | |
data['user_id'] = user_id | |
elif collection_name == 'count_by_level': | |
# data == count_by_level_list -> + level, solved, total -> 개별 user_id 추가 -> 그대로 저장 | |
new_data = [] | |
for x in data: | |
x['user_id'] = user_id | |
new_data.append(x) | |
data = new_data | |
else: | |
data = [ {"problemId" : x.get('problemId', None), "user_id" : user_id} for x in data[1]] | |
try: | |
if collection_name == 'user': | |
try: | |
# 중복방지용 -> 1. index로 지정안해주면, 계속 참. | |
collection.create_index([('user',1)],unique=True) | |
collection.insert_one(data) | |
except DuplicateKeyError as dke: | |
# print("user 이미 존재함.업데이트 시작.", dke) | |
# 2. find조건을 user_id로 찾은 뒤, $set을 들어온 새 데이터 전체로 업데이트 | |
collection.update_one({"user_id":user_id}, { "$set": data }, upsert=True) | |
else: | |
collection.insert_many(data, ordered=False) | |
except BulkWriteError as bwe: | |
db_result[f'result_{collection_name}'] = 'Insert and Ignore duplicated data' | |
return db_result | |
user_id = 'tingstyle1' | |
profile_dict = get_profile(user_id) | |
count_by_level_list = get_count_by_level(user_id) | |
count_and_sovled_list = get_solved(user_id) | |
data_list = [profile_dict, count_by_level_list, count_and_sovled_list] | |
collection_names = ['user', 'count_by_level', 'count_and_sovled'] | |
my_ip = '***.85.181.203' | |
username = 'likelion' | |
password = '***213' | |
db_name = 'likelion' | |
result = save_to_db(data_list, collection_names, user_id, | |
my_ip, username, password, db_name) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment