@jerryan999
Last active November 10, 2018
Jobschedule_by_process_and_threading
from multiprocessing import cpu_count, Process, Queue
import threading
try:
    from Queue import Empty   # Python 2
except ImportError:
    from queue import Empty   # Python 3
from tqdm import tqdm

class JobSchedule(object):
    def __init__(self, function, queue, process_count=None, thread_count=1,
                 max_size=0, third_func=None, third_queue=None):
        # main worker function and task queue;
        # a special `None` token can be used as a sentinel to stop a worker
        self.function = function
        self.queue = queue
        # process and threading config
        self.thread_count = thread_count
        self.process_count = process_count if process_count is not None else cpu_count()
        # message queue used to update tqdm;
        # message tokens: 'progress' and 'finish'
        self.message_queue = Queue()
        self.max_size = max_size
        # optional third function and queue;
        # queue messages are (token, content) tuples, with 'finish' as the stop token
        self.third_func = third_func
        self.third_queue = third_queue
    def _makethread(self, target, thread_count, args, pid):
        thread_pool = []
        for _ in range(thread_count):
            if self.third_queue:
                t = threading.Thread(target=target, args=(self.update_pbar, pid, args, self.third_queue))
            else:
                t = threading.Thread(target=target, args=(self.update_pbar, pid, args))
            thread_pool.append(t)
        for thread in thread_pool:
            thread.start()
            # print("Process pid:{}, thread {}/active_count:{}".format(pid, thread.name, threading.active_count()))
        for thread in thread_pool:
            thread.join()
    def start(self):
        # start the progress-bar thread
        pbar_thread = threading.Thread(target=self.make_pbar_thread)
        pbar_thread.start()
        # start the third (consumer) thread
        if self.third_func:
            third_thread = threading.Thread(target=self.third_func, args=(self.third_queue,))
            third_thread.start()
        # start the task processes, each running a pool of threads
        process_pool = []
        for x in range(self.process_count):
            p = Process(target=self._makethread, args=(self.function, self.thread_count, self.queue, x))
            process_pool.append(p)
        for process in process_pool:
            process.start()
        for process in process_pool:
            process.join()
        # stop the progress-bar worker
        self.update_pbar('finish')
        pbar_thread.join()
        # stop the third-queue consumer
        if self.third_queue:
            self.third_queue.put(('finish', None))
            third_thread.join()
        # alternative sentinel-based shutdown for task workers:
        # for x in range(self.process_count):
        #     for y in range(self.thread_count):
        #         self.queue.put(None)
    def make_pbar_thread(self):
        '''
        Two kinds of message tokens:
          'progress' means one task was completed,
          'finish'   means all work is done.
        '''
        pbar = tqdm(total=self.max_size)
        while True:
            message = self.message_queue.get()
            if message == 'progress':
                pbar.update()
            elif message == 'finish':
                return

    def update_pbar(self, message):
        self.message_queue.put(message)
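
# --- Minimal usage sketch (an illustration, not part of the original gist). ---
# A hypothetical worker that squares numbers, run under JobSchedule. Assumes the
# 'fork' start method, so the queues and the worker function are inherited by
# the child processes.
if __name__ == '__main__':
    def square_task(update_bar, pid, queue):
        # drain the shared task queue, tolerating the empty()/get() race
        while not queue.empty():
            try:
                n = queue.get(timeout=1)
            except Empty:
                break
            _ = n * n
            update_bar('progress')

    task_queue = Queue()
    for n in range(100):
        task_queue.put(n)
    js = JobSchedule(function=square_task, queue=task_queue,
                     thread_count=4, max_size=100)
    js.start()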
# encoding=utf-8
import urllib.parse
import json
import requests
from queue import Empty
from multiprocessing import Queue
from wehomeWechatPipeline.common.logger.logger import WechatPipelineLogger
from wehomeWechatPipeline.common.factory.enginefactory import EngineFactory
from wehomeWechatPipeline.config import app_config as config
from wehomeWechatPipeline.common.constant import ua
from wehomeWechatPipeline.common.utils.jobschedule import JobSchedule

logger = WechatPipelineLogger('info', filehandle=config['filehandle']).build()
def get_properties():
    # fetch property listings from the database
    sql = '''
        SELECT id, addr, city, state, zipcode
        FROM new_listings_investment
        WHERE status = 2
    '''
    engine = EngineFactory.get_engine('wechatdata')
    results = []
    with engine.connect() as conn:
        for r in conn.execute(sql):
            results.append({
                'id': r[0],
                'addr': r[1],
                'city': r[2],
                'state': r[3],
                'zipcode': r[4],
            })
    return results
def get_html_with_auth(url, auth):
    r = requests.get(url,
                     headers={
                         'user-agent': ua,
                         'Authorization': auth,
                     })
    return r.text
def get_house_canary_estimate(item, processed_queue):
    '''
    Look up property details and the HouseCanary value estimate for one item.
    '''
    # build the address slug, e.g. "123-Main-St-Seattle-WA-98101"
    item['slug'] = "{}-{}-{}-{}".format(item['addr'].replace(' ', '-'),
                                        item['city'].replace(' ', '-'),
                                        item['state'],
                                        item['zipcode'])
    item['street_address'] = item['slug'].replace('-', ' ')
    property_details_url = "https://api.housecanary.com/api/v1/proxy/huell/api/v1/emporium/subject_details/?" + \
        urllib.parse.urlencode({
            "add_listing_record": True,
            "address_slug": item['slug'],
            "street_address": item['street_address'],
            "zipcode": item['zipcode']})
    property_details = get_html_with_auth(property_details_url,
                                          auth='Basic ZmFmYWZmQGZhZmFsZi5jb206ZmFmYWZmYWZhZg==')
    # parse the result
    hcestimate_obj = json.loads(property_details)
    item['hc_estimate'] = hcestimate_obj.get('value')
    # hand the item off to the SQL-writer queue as a (token, content) tuple
    processed_queue.put(("insert", item))
def update_one_to_sql(item):
    engine = EngineFactory.get_engine('wechatdata')
    if item.get('hc_estimate') is not None:
        sql = '''
            UPDATE new_listings_investment
            SET hc_estimate = {}
            WHERE id = {}'''.format(item['hc_estimate'], item['id'])
        with engine.connect() as conn:
            conn.execute(sql)
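
# Note: the UPDATE above interpolates values straight into the SQL string.
# A safer sketch (assuming SQLAlchemy's text() construct is available for
# this engine) would bind parameters instead:
#   from sqlalchemy import text
#   conn.execute(text("UPDATE new_listings_investment SET hc_estimate = :est WHERE id = :id"),
#                est=item['hc_estimate'], id=item['id'])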
def process():
    results = get_properties()
    # enqueue tasks
    property_queue = Queue()
    count = 0
    for item in results:
        if "#" in item['addr'] or item['city'] is None:  # TODO
            continue
        property_queue.put_nowait(item)
        count += 1
    logger.info("Housecanary Property task count:{}".format(count))

    def main_task(update_bar, pid, queue, processed_queue):
        # drains property_queue; empty() then get() can race between workers,
        # so tolerate a late Empty instead of blocking forever
        while not queue.empty():
            try:
                item = queue.get(timeout=1)
            except Empty:
                break
            get_house_canary_estimate(item, processed_queue)
            update_bar('progress')

    def sql_task(queue):
        # consumes the third queue, which holds results waiting to be written to SQL
        while True:
            token, item = queue.get(block=True, timeout=None)
            if token == 'finish':
                return
            update_one_to_sql(item)

    # third queue, filled by the workers and drained by sql_task
    third_queue = Queue()
    # job schedule
    js = JobSchedule(
        function=main_task,
        queue=property_queue,
        thread_count=10,
        max_size=count,
        third_func=sql_task,
        third_queue=third_queue)
    js.start()
def sync(recent):
    logger.info("Housecanary Property scraper start")
    process()
    logger.info("Housecanary Property scraper end")
@jerryan999 (Author):
`Queue` should come from the `multiprocessing` module, not `queue`, so that tasks are shared across worker processes.
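For example (an illustration of the fix):

    from queue import Queue            # wrong here: visible only within one process
    from multiprocessing import Queue  # correct: shared across worker processes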
