Skip to content

Instantly share code, notes, and snippets.

@is2js
Created October 11, 2021 09:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save is2js/64b90ed7c89cc5a7fbaec52ae07948dd to your computer and use it in GitHub Desktop.
Save is2js/64b90ed7c89cc5a7fbaec52ae07948dd to your computer and use it in GitHub Desktop.
04_function_api_add_save.py
import json
import requests
from pymongo import MongoClient
from pymongo.errors import BulkWriteError
def get_profile(user_id):
"""
정보 조회 - user_id를 입력하면 백준 사이트에서 해당 user의 프로필 정보 중 일부를 반환해줌.
:param str user_id: 사용자id
:return: 백준 프로필 정보
:rtype: dict or None
"""
url = f"https://solved.ac/api/v3/user/show?handle={user_id}"
r_profile = requests.get(url)
if r_profile.status_code == requests.codes.ok:
profile = json.loads(r_profile.content.decode('utf-8'))
profile = \
{
"tier" : profile.get("tier"),
"rank" : profile.get("rank"),
"solvedCount" : profile.get("solvedCount"),
"rating" : profile.get("rating"),
"exp" : profile.get("exp"),
}
return profile
else:
print("프로필 요청 실패")
return
def get_solved(user_id):
"""
정보 조회 - user_id를 입력하면 백준 사이트에서 해당 user가 푼 총 문제수, 문제들 정보(level 높은 순)를 튜플(int, list)로 반환해줌.
:param str user_id: 사용자id
:return: 내가 푼 문제수, 내가 푼 문제들 정보
:rtype: tuple ([0]: int, [1]:list) or None
"""
url = f"https://solved.ac/api/v3/search/problem?query=solved_by%3A{user_id}&sort=level&direction=desc"
r_solved = requests.get(url)
# 요기는 user_id가 이상하더라도, 무조건 200로 반환된다.
# print(r_solved)
# print(r_solved.content.decode('utf-8'))
# <Response [200]>
# {"count":0,"items":[]}
# 길이를 가지고 0일때 한번 return 시켜서 끝내자.
if json.loads(r_solved.content.decode('utf-8')).get('count') == 0:
print("푼 문제가 없거나 요청이 잘못 됨.")
return
if r_solved.status_code == requests.codes.ok:
solved = json.loads(r_solved.content.decode('utf-8'))
count = solved.get("count")
items = solved.get("items")
solved_problems = []
for item in items:
solved_problems.append(
{
'problemId': item.get("problemId"),
'titleKo': item.get("titleKo"),
'level': item.get("level"),
}
)
# print("푼 문제수와 젤 고난이도 문제 1개만 >>>", count, solved_problems[0])
return count, solved_problems
else:
print("푼 문제들 요청 실패")
return
def get_count_by_level(user_id):
"""
정보 조회 - user_id를 입력하면 백준 사이트에서 해당 user가 푼 문제들에 대한 level별 문제수 정보를 level 높은 순으로 반환해줌.
:param str user_id: 사용자id
:return: level별 총 문제수, 내가 푼 문제수
:rtype: list or None
"""
url = f"https://solved.ac/api/v3/user/problem_stats?handle={user_id}"
r_count_by_level = requests.get(url)
if r_count_by_level.status_code == requests.codes.ok:
count_by_level = json.loads(r_count_by_level.content.decode('utf-8'))
filted_count_by_level = [ {"level":dict_['level'], "total":dict_['total'], "solved":dict_['solved'],} for dict_ in count_by_level if dict_.get('solved') != 0 ]
filted_count_by_level = sorted(filted_count_by_level, key=lambda x:x['level'], reverse=True)
return filted_count_by_level
else:
print("레벨별, 전체 문제수, 푼 문제수 요청 실패")
return
def save_to_db(data_list, collection_names, user_id, my_ip, username, password, db_name):
"""
딕셔너리 리스트를 DB에 저장
:params list data_list: 3개의 api요청으로 인해 들어오는 3개의 데이터
:params list collection_names: 3개의 api요청 데이터를 처리할 collectin list
:params str my_ip: DB IP
:params str username: DB 계정
:params str password: DB 계정 비밀번호
:params str db_name: DB 이름
:return result: <DB 저장의 결과(성공, 실패등)> 메세지를 담은 dictionary
:rtype dict
"""
#db_result = {'result':'success'} # default sucess -> 중복발생시, Insert and Ignore라는 메세지 넣어줄 것임.
db_result = {}
client = MongoClient(host=my_ip, port=27017, username=username, password=password)
db = client[db_name]
# data 와 colletion 1:1 매칭시켜서 zip으로 처리
for data, collection_name in zip(data_list, collection_names):
print(f"{collection_name}")
db_result[f'result_{collection_name}'] = 'success' # default sucess -> 중복발생시, Insert and Ignore라는 메세지 넣어줄 것임.
collection = db[collection_name]
# collection.create_index([('user_id', 1)], unique=True)
# 'user', 'count_by_level', 'count_and_sovled'
# profile_dict, count_by_level_list, count_and_sovled_list
if collection_name == 'user':
# data == profile_dict -> tier, rank, solvedCount, rating, exp -> user_id 추가 -> 그대로 저장
data['user_id'] = user_id
elif collection_name == 'count_by_level':
# data == count_by_level_list -> + level, solved, total -> 개별 user_id 추가 -> 그대로 저장
new_data = []
for x in data:
x['user_id'] = user_id
new_data.append(x)
data = new_data
else:
# data == count_and_sovled_list -> problemId 만 뽑아서 저장 -> user_id추가 -> 나중에 problemId로 전체문제에서 추출해주면 될 듯
# print("수정전:>>>", data) # > (98, [{ }, { }])
data = [ {"problemId" : x.get('problemId', None), "user_id" : user_id} for x in data[1]]
#print("수정후:>>>", data)
# print("넣기전 data :>>", data)
try:
if collection_name == 'user':
collection.insert_one(data)
# 중복방지용
collection.update(data,data,upsert=True)
else:
collection.insert_many(data, ordered=False)
except BulkWriteError as bwe:
db_result[f'result_{collection_name}'] = 'Insert and Ignore duplicated data'
return db_result
user_id = 'tingstyle1'
profile_dict = get_profile(user_id)
count_by_level_list = get_count_by_level(user_id)
count_and_sovled_list = get_solved(user_id)
data_list = [profile_dict, count_by_level_list, count_and_sovled_list]
# print('*****')
# print(data_list[0])
# print('*****')
# # print(data_list[1])
# print('*****')
# print(data_list[2])
# print('*****')
collection_names = ['user', 'count_by_level', 'count_and_sovled']
my_ip = '115.85.181.203'
username = 'likelion'
password = '564213'
db_name = 'likelion'
result = save_to_db(data_list, collection_names, user_id,
my_ip, username, password, db_name)
# user_id = "jigreg" # user_id
# profile_dict = get_profile(user_id)
# print(f"========{user_id}님의 프로필========")
# print(profile_dict)
# # 유저 정보 collection(user)
# # user_id, tier, rank, solvedCount, rating, exp
# print(f"========{user_id}님이 푼 문제들의 레벨별 갯수========")
# count_by_level_list = get_count_by_level(user_id)
# print(count_by_level_list)
# # (유저가 푼) 문제풀이 정보(count_by_level)
# # + level, solved, total + user_id
# count_and_sovled_list = get_solved(user_id)
# print(f"========{user_id}님이 푼 문제들({count_and_sovled_list[0] if count_and_sovled_list else None})========")
# print(count_and_sovled_list[1] if count_and_sovled_list else None)
# # 문제들 collection (count_and_sovled) -> user_id 필드(FK) 포함시킬 것
# # problemId + user_id
# # -> problemId만 있으면, 추가정보는 따로 불러올 수 있다.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment