Skip to content

Instantly share code, notes, and snippets.

@mahdizojaji
Created December 8, 2019 14:38
Show Gist options
  • Save mahdizojaji/2eb78afd7691ff3d774b1631f614f2a0 to your computer and use it in GitHub Desktop.
Save mahdizojaji/2eb78afd7691ff3d774b1631f614f2a0 to your computer and use it in GitHub Desktop.
from requests import get
from redis import Redis
from flask import Flask, jsonify
from threading import Thread
app = Flask(__name__)
class JobDB:
redis = Redis(decode_responses=True)
def save_job(self, id_, type_, url, created_at, company, company_url, location, title, description, company_logo):
data = {
'type': type_ or '',
'url': url or '',
'created_at': created_at or '',
'company': company or '',
'company_url': company_url or '',
'location': location or '',
'title': title or '',
'description': description or '',
'company_logo': company_logo or ''
}
self.redis.hmset(id_, data)
self.redis.sadd('jobs', id_)
def exist_job(self, id_):
return self.redis.sismember('jobs', id_)
def get_jobs(self):
for job_id in self.redis.smembers('jobs'):
yield self.redis.hgetall(job_id)
class JobApi(JobDB):
__BASE_URL = 'https://jobs.github.com/positions.json'
def get_job_positions(self):
url = f'{self.__BASE_URL}?search=python'
response = get(url)
if response.ok:
print('ok')
for job_position in response.json():
print(job_position)
yield job_position
def run(self):
while True:
for job_position in self.get_job_positions():
if self.exist_job(id_=job_position['id']):
continue
self.save_job(
id_=job_position['id'],
type_=job_position['type'],
url=job_position['url'],
created_at=job_position['created_at'],
company=job_position['company'],
company_url=job_position['company_url'],
location=job_position['location'],
title=job_position['title'],
description=job_position['description'],
company_logo=job_position['company_logo']
)
class Main:
def __init__(self):
self.job_api = JobApi()
def run(self):
self.job_api.run()
@app.route('/', methods=['GET', 'POST'])
def index():
return jsonify({'ok': True, 'jobs': [job for job in self.job_api.get_jobs()]})
app.run(debug=True)
if __name__ == '__main__':
main = Main()
main.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment