Last active
November 10, 2018 13:01
-
-
Save jerryan999/675a6336af2ebd8f5f5f8ea24bd68ad3 to your computer and use it in GitHub Desktop.
Jobschedule_by_process_and_threading
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from multiprocessing import cpu_count, Process, Queue, Manager | |
import threading | |
import os | |
try: | |
from Queue import Empty | |
except: | |
from queue import Empty | |
from threading import Lock as TL | |
import threading | |
from multiprocessing import Lock as PL | |
from multiprocessing import Pool | |
import time | |
from tqdm import tqdm | |
class JobSchedule(object):
    """Fan a work queue out across multiple processes, each running several
    worker threads, with a tqdm progress bar driven through a message queue.

    An optional "third" consumer (``third_func``) runs in its own thread and
    receives ``(token, content)`` tuples via ``third_queue``; a
    ``('finish', None)`` token tells it to stop.
    """

    def __init__(self, function, queue, prcesscount=None, thread_count=1,
                 max_size=0, third_func=None, third_queue=None):
        # Main worker callable and the task queue it drains.
        # The special token `None` is reserved for stopping a worker.
        self.function = function
        self.queue = queue
        # Parallelism: threads per process, and process count
        # (defaults to the machine's CPU count when not given).
        self.thread_count = thread_count
        self.prcesscount = cpu_count() if prcesscount is None else prcesscount
        # Progress tokens ('progress' / 'finish') feed the tqdm bar thread.
        self.message_queue = Queue()
        self.max_size = max_size
        # Optional third consumer: receives (token, content) tuples,
        # where token is 'progress' or 'finish'.
        self.third_func = third_func
        self.third_queue = third_queue

    def _makethread(self, target, thread_count, args, pid):
        """Run ``thread_count`` threads of ``target`` inside one child process.

        Each thread receives (update_pbar, pid, args[, third_queue]).
        """
        if self.third_queue:
            thread_args = (self.update_pbar, pid, args, self.third_queue)
        else:
            thread_args = (self.update_pbar, pid, args)
        threads = [threading.Thread(target=target, args=thread_args)
                   for _ in range(thread_count)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

    def start(self):
        """Launch the progress-bar thread, the optional third consumer, and
        the worker processes; block until all workers finish, then shut the
        auxiliary threads down."""
        pbar_thread = threading.Thread(target=self.make_pbar_thread)
        pbar_thread.start()
        # BUGFIX: track whether the third thread was actually started.  The
        # original joined it whenever third_queue was set, which raised
        # NameError when third_queue was given without third_func, and leaked
        # the thread when third_func was given without third_queue.
        third_thread = None
        if self.third_func:
            third_thread = threading.Thread(target=self.third_func,
                                            args=(self.third_queue,))
            third_thread.start()
        # One child process per configured slot, each spawning its threads.
        process_pool = [
            Process(target=self._makethread,
                    args=(self.function, self.thread_count, self.queue, x))
            for x in range(self.prcesscount)
        ]
        for process in process_pool:
            process.start()
        for process in process_pool:
            process.join()
        # All workers done: stop the progress-bar thread.
        self.update_pbar('finish')
        pbar_thread.join()
        # Stop the third consumer, if one was started.
        if third_thread is not None:
            if self.third_queue:
                self.third_queue.put(('finish', None))
            third_thread.join()

    def make_pbar_thread(self):
        '''
        Drive a tqdm bar from message_queue.

        Two kinds of tokens:
            `progress` stands for one unit of work done
            `finish`   stands for end
        '''
        pbar = tqdm(total=self.max_size)
        try:
            while True:
                message = self.message_queue.get()
                if message == 'progress':
                    pbar.update()
                if message == 'finish':
                    return
        finally:
            # Ensure the bar is closed even on an unexpected exception.
            pbar.close()

    def update_pbar(self, message):
        """Post a progress token ('progress' or 'finish') to the bar thread."""
        self.message_queue.put(message)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# encoding=utf-8
# Standard library
import json
import threading
import urllib.parse
from datetime import datetime, date, timedelta, time
from multiprocessing import Queue
from queue import Empty

# Third-party
import requests
from tqdm import tqdm

# Project
from wehomeWechatPipeline.common.logger.logger import WechatPipelineLogger
from wehomeWechatPipeline.common.factory.enginefactory import EngineFactory
from wehomeWechatPipeline.config import app_config as config
from wehomeWechatPipeline.common.constant import ua
from wehomeWechatPipeline.common.utils.jobschedule import JobSchedule
from .constant import headers, proxies
logger = WechatPipelineLogger('info', filehandle=config['filehandle']).build() | |
def get_properties():
    """Fetch active listings (status = 2) from new_listings_investment.

    Returns a list of dicts with keys id, addr, city, state, zipcode.
    """
    # NOTE: the original called .format(date.today().isoformat()) on this
    # string, but it contains no placeholder — the call was a no-op and has
    # been removed.
    sql = '''
    SELECT id,addr,city,state,zipcode
    FROM new_listings_investment
    WHERE
    status = 2
    '''
    columns = ('id', 'addr', 'city', 'state', 'zipcode')
    engine = EngineFactory.get_engine('wechatdata')
    with engine.connect() as conn:
        # Rows come back positionally in SELECT order; pair them with names.
        return [dict(zip(columns, row)) for row in conn.execute(sql)]
def get_html_with_auth(url, auth, timeout=None):
    """GET `url` with an Authorization header and return the response body.

    :param url: absolute URL to fetch
    :param auth: value for the ``Authorization`` header (e.g. ``'Basic ...'``)
    :param timeout: seconds before the request is aborted; ``None`` (the
        default, matching the original behavior) means wait indefinitely.
        Callers should pass a finite value to avoid hanging worker threads.
    :return: response body as text
    """
    r = requests.get(url,
                     headers={
                         'user-agent': ua,
                         'Authorization': auth,
                     },
                     timeout=timeout)
    # NOTE(review): no status check — a 4xx/5xx body is returned as-is and
    # will surface later as a JSON parse error; consider raise_for_status().
    return r.text
def get_house_canary_estimate(item, processed_queue):
    '''
    Fetch the HouseCanary value estimate for one listing and push the
    enriched item onto `processed_queue` as an ("insert", item) message.
    '''
    # Build the hyphen-joined address slug the HouseCanary API expects.
    item['slug'] = "{}-{}-{}-{}".format(item['addr'].replace(' ', '-'),
                                        item['city'].replace(' ', '-'),
                                        item['state'],
                                        item['zipcode'])
    item['street_address'] = item['slug'].replace('-', ' ')
    query = urllib.parse.urlencode({
        "add_listing_record": True,
        "address_slug": item['slug'],
        "street_address": item['street_address'],
        "zipcode": item['zipcode']}
    )
    property_details_url = ("https://api.housecanary.com/api/v1/proxy/huell/api/v1/emporium/subject_details/?"
                            + query)
    # NOTE(review): hard-coded Basic-auth credentials embedded in source —
    # these should live in configuration / secrets management.
    property_details = get_html_with_auth(property_details_url,
                                          auth='Basic ZmFmYWZmQGZhZmFsZi5jb206ZmFmYWZmYWZhZg==')
    # Parse the JSON payload; a missing 'value' key leaves hc_estimate None.
    hcestimate_obj = json.loads(property_details)
    item['hc_estimate'] = hcestimate_obj.get('value')
    # Hand the enriched item to the SQL-writer queue.
    processed_queue.put(("insert", item))
def update_one_to_sql(item):
    """Persist one listing's HouseCanary estimate back to the database.

    Does nothing when the item carries no estimate.
    """
    estimate = item.get('hc_estimate')
    if estimate is None:
        return
    # SECURITY: the values are interpolated straight into the SQL string and
    # `hc_estimate` originates from an external API response.  Coerce both
    # values to numbers first so nothing non-numeric can reach the query.
    sql = '''
        UPDATE new_listings_investment
        SET hc_estimate = {}
        WHERE id={}'''.format(float(estimate), int(item['id']))
    engine = EngineFactory.get_engine('wechatdata')
    with engine.connect() as conn:
        conn.execute(sql)
def process():
    """Pull listings from the DB, fan the HouseCanary lookups out across
    processes/threads via JobSchedule, and write results back through a
    single SQL-writer thread."""
    results = get_properties()

    # Enqueue tasks, skipping unit-style addresses and rows with no city.
    property_queue = Queue()
    count = 0
    for item in results:
        if "#" in item['addr'] or item['city'] is None:  # TODO
            continue
        property_queue.put_nowait(item)
        count += 1
    logger.info("Housecanary Property task count:{}".format(count))

    def main_task(update_bar, pid, queue, processed_queue):
        # Worker body run by JobSchedule threads; drains property_queue.
        # BUGFIX: the original `while not queue.empty(): queue.get(True)` is a
        # check-then-act race — another worker can empty the queue between the
        # check and the blocking get, leaving this worker blocked forever.
        # A blocking get with a short timeout terminates cleanly instead.
        while True:
            try:
                item = queue.get(block=True, timeout=1)
            except Empty:
                return
            get_house_canary_estimate(item, processed_queue)
            update_bar('progress')

    def sql_task(queue):
        # Single writer thread: drain (token, item) pairs until 'finish'.
        while True:
            token, item = queue.get(block=True, timeout=None)
            if token == 'finish':
                return
            update_one_to_sql(item)

    # Queue feeding the SQL-writer ("third") thread.
    third_queue = Queue()
    js = JobSchedule(
        function=main_task,
        queue=property_queue,
        thread_count=10,
        max_size=count,
        third_func=sql_task,
        third_queue=third_queue)
    js.start()
def sync(recent):
    """Entry point: run the HouseCanary property scrape end to end.

    NOTE(review): `recent` is currently unused — presumably meant to limit
    the sync window; confirm with callers before removing it.
    """
    logger.info("Housecanary Property scraper start")
    process()
    logger.info("Housecanary Property scraper end")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
`Queue` should be imported from the `multiprocessing` module, not from `queue`.