Created
October 11, 2021 09:04
-
-
Save is2js/64b90ed7c89cc5a7fbaec52ae07948dd to your computer and use it in GitHub Desktop.
04_function_api_add_save.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import requests | |
from pymongo import MongoClient | |
from pymongo.errors import BulkWriteError | |
def get_profile(user_id): | |
""" | |
정보 조회 - user_id를 입력하면 백준 사이트에서 해당 user의 프로필 정보 중 일부를 반환해줌. | |
:param str user_id: 사용자id | |
:return: 백준 프로필 정보 | |
:rtype: dict or None | |
""" | |
url = f"https://solved.ac/api/v3/user/show?handle={user_id}" | |
r_profile = requests.get(url) | |
if r_profile.status_code == requests.codes.ok: | |
profile = json.loads(r_profile.content.decode('utf-8')) | |
profile = \ | |
{ | |
"tier" : profile.get("tier"), | |
"rank" : profile.get("rank"), | |
"solvedCount" : profile.get("solvedCount"), | |
"rating" : profile.get("rating"), | |
"exp" : profile.get("exp"), | |
} | |
return profile | |
else: | |
print("프로필 요청 실패") | |
return | |
def get_solved(user_id): | |
""" | |
정보 조회 - user_id를 입력하면 백준 사이트에서 해당 user가 푼 총 문제수, 문제들 정보(level 높은 순)를 튜플(int, list)로 반환해줌. | |
:param str user_id: 사용자id | |
:return: 내가 푼 문제수, 내가 푼 문제들 정보 | |
:rtype: tuple ([0]: int, [1]:list) or None | |
""" | |
url = f"https://solved.ac/api/v3/search/problem?query=solved_by%3A{user_id}&sort=level&direction=desc" | |
r_solved = requests.get(url) | |
# 요기는 user_id가 이상하더라도, 무조건 200로 반환된다. | |
# print(r_solved) | |
# print(r_solved.content.decode('utf-8')) | |
# <Response [200]> | |
# {"count":0,"items":[]} | |
# 길이를 가지고 0일때 한번 return 시켜서 끝내자. | |
if json.loads(r_solved.content.decode('utf-8')).get('count') == 0: | |
print("푼 문제가 없거나 요청이 잘못 됨.") | |
return | |
if r_solved.status_code == requests.codes.ok: | |
solved = json.loads(r_solved.content.decode('utf-8')) | |
count = solved.get("count") | |
items = solved.get("items") | |
solved_problems = [] | |
for item in items: | |
solved_problems.append( | |
{ | |
'problemId': item.get("problemId"), | |
'titleKo': item.get("titleKo"), | |
'level': item.get("level"), | |
} | |
) | |
# print("푼 문제수와 젤 고난이도 문제 1개만 >>>", count, solved_problems[0]) | |
return count, solved_problems | |
else: | |
print("푼 문제들 요청 실패") | |
return | |
def get_count_by_level(user_id): | |
""" | |
정보 조회 - user_id를 입력하면 백준 사이트에서 해당 user가 푼 문제들에 대한 level별 문제수 정보를 level 높은 순으로 반환해줌. | |
:param str user_id: 사용자id | |
:return: level별 총 문제수, 내가 푼 문제수 | |
:rtype: list or None | |
""" | |
url = f"https://solved.ac/api/v3/user/problem_stats?handle={user_id}" | |
r_count_by_level = requests.get(url) | |
if r_count_by_level.status_code == requests.codes.ok: | |
count_by_level = json.loads(r_count_by_level.content.decode('utf-8')) | |
filted_count_by_level = [ {"level":dict_['level'], "total":dict_['total'], "solved":dict_['solved'],} for dict_ in count_by_level if dict_.get('solved') != 0 ] | |
filted_count_by_level = sorted(filted_count_by_level, key=lambda x:x['level'], reverse=True) | |
return filted_count_by_level | |
else: | |
print("레벨별, 전체 문제수, 푼 문제수 요청 실패") | |
return | |
def save_to_db(data_list, collection_names, user_id, my_ip, username, password, db_name): | |
""" | |
딕셔너리 리스트를 DB에 저장 | |
:params list data_list: 3개의 api요청으로 인해 들어오는 3개의 데이터 | |
:params list collection_names: 3개의 api요청 데이터를 처리할 collectin list | |
:params str my_ip: DB IP | |
:params str username: DB 계정 | |
:params str password: DB 계정 비밀번호 | |
:params str db_name: DB 이름 | |
:return result: <DB 저장의 결과(성공, 실패등)> 메세지를 담은 dictionary | |
:rtype dict | |
""" | |
#db_result = {'result':'success'} # default sucess -> 중복발생시, Insert and Ignore라는 메세지 넣어줄 것임. | |
db_result = {} | |
client = MongoClient(host=my_ip, port=27017, username=username, password=password) | |
db = client[db_name] | |
# data 와 colletion 1:1 매칭시켜서 zip으로 처리 | |
for data, collection_name in zip(data_list, collection_names): | |
print(f"{collection_name}") | |
db_result[f'result_{collection_name}'] = 'success' # default sucess -> 중복발생시, Insert and Ignore라는 메세지 넣어줄 것임. | |
collection = db[collection_name] | |
# collection.create_index([('user_id', 1)], unique=True) | |
# 'user', 'count_by_level', 'count_and_sovled' | |
# profile_dict, count_by_level_list, count_and_sovled_list | |
if collection_name == 'user': | |
# data == profile_dict -> tier, rank, solvedCount, rating, exp -> user_id 추가 -> 그대로 저장 | |
data['user_id'] = user_id | |
elif collection_name == 'count_by_level': | |
# data == count_by_level_list -> + level, solved, total -> 개별 user_id 추가 -> 그대로 저장 | |
new_data = [] | |
for x in data: | |
x['user_id'] = user_id | |
new_data.append(x) | |
data = new_data | |
else: | |
# data == count_and_sovled_list -> problemId 만 뽑아서 저장 -> user_id추가 -> 나중에 problemId로 전체문제에서 추출해주면 될 듯 | |
# print("수정전:>>>", data) # > (98, [{ }, { }]) | |
data = [ {"problemId" : x.get('problemId', None), "user_id" : user_id} for x in data[1]] | |
#print("수정후:>>>", data) | |
# print("넣기전 data :>>", data) | |
try: | |
if collection_name == 'user': | |
collection.insert_one(data) | |
# 중복방지용 | |
collection.update(data,data,upsert=True) | |
else: | |
collection.insert_many(data, ordered=False) | |
except BulkWriteError as bwe: | |
db_result[f'result_{collection_name}'] = 'Insert and Ignore duplicated data' | |
return db_result | |
user_id = 'tingstyle1' | |
profile_dict = get_profile(user_id) | |
count_by_level_list = get_count_by_level(user_id) | |
count_and_sovled_list = get_solved(user_id) | |
data_list = [profile_dict, count_by_level_list, count_and_sovled_list] | |
# print('*****') | |
# print(data_list[0]) | |
# print('*****') | |
# # print(data_list[1]) | |
# print('*****') | |
# print(data_list[2]) | |
# print('*****') | |
collection_names = ['user', 'count_by_level', 'count_and_sovled'] | |
my_ip = '115.85.181.203' | |
username = 'likelion' | |
password = '564213' | |
db_name = 'likelion' | |
result = save_to_db(data_list, collection_names, user_id, | |
my_ip, username, password, db_name) | |
# user_id = "jigreg" # user_id | |
# profile_dict = get_profile(user_id) | |
# print(f"========{user_id}님의 프로필========") | |
# print(profile_dict) | |
# # 유저 정보 collection(user) | |
# # user_id, tier, rank, solvedCount, rating, exp | |
# print(f"========{user_id}님이 푼 문제들의 레벨별 갯수========") | |
# count_by_level_list = get_count_by_level(user_id) | |
# print(count_by_level_list) | |
# # (유저가 푼) 문제풀이 정보(count_by_level) | |
# # + level, solved, total + user_id | |
# count_and_sovled_list = get_solved(user_id) | |
# print(f"========{user_id}님이 푼 문제들({count_and_sovled_list[0] if count_and_sovled_list else None})========") | |
# print(count_and_sovled_list[1] if count_and_sovled_list else None) | |
# # 문제들 collection (count_and_sovled) -> user_id 필드(FK) 포함시킬 것 | |
# # problemId + user_id | |
# # -> problemId만 있으면, 추가정보는 따로 불러올 수 있다. | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment