-
-
Save mahdizojaji/2eb78afd7691ff3d774b1631f614f2a0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from requests import get | |
from redis import Redis | |
from flask import Flask, jsonify | |
from threading import Thread | |
app = Flask(__name__) | |
class JobDB: | |
redis = Redis(decode_responses=True) | |
def save_job(self, id_, type_, url, created_at, company, company_url, location, title, description, company_logo): | |
data = { | |
'type': type_ or '', | |
'url': url or '', | |
'created_at': created_at or '', | |
'company': company or '', | |
'company_url': company_url or '', | |
'location': location or '', | |
'title': title or '', | |
'description': description or '', | |
'company_logo': company_logo or '' | |
} | |
self.redis.hmset(id_, data) | |
self.redis.sadd('jobs', id_) | |
def exist_job(self, id_): | |
return self.redis.sismember('jobs', id_) | |
def get_jobs(self): | |
for job_id in self.redis.smembers('jobs'): | |
yield self.redis.hgetall(job_id) | |
class JobApi(JobDB): | |
__BASE_URL = 'https://jobs.github.com/positions.json' | |
def get_job_positions(self): | |
url = f'{self.__BASE_URL}?search=python' | |
response = get(url) | |
if response.ok: | |
print('ok') | |
for job_position in response.json(): | |
print(job_position) | |
yield job_position | |
def run(self): | |
while True: | |
for job_position in self.get_job_positions(): | |
if self.exist_job(id_=job_position['id']): | |
continue | |
self.save_job( | |
id_=job_position['id'], | |
type_=job_position['type'], | |
url=job_position['url'], | |
created_at=job_position['created_at'], | |
company=job_position['company'], | |
company_url=job_position['company_url'], | |
location=job_position['location'], | |
title=job_position['title'], | |
description=job_position['description'], | |
company_logo=job_position['company_logo'] | |
) | |
class Main: | |
def __init__(self): | |
self.job_api = JobApi() | |
def run(self): | |
self.job_api.run() | |
@app.route('/', methods=['GET', 'POST']) | |
def index(): | |
return jsonify({'ok': True, 'jobs': [job for job in self.job_api.get_jobs()]}) | |
app.run(debug=True) | |
if __name__ == '__main__': | |
main = Main() | |
main.run() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment