Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
# coding=utf-8
import requests
import time
import os
import json
import queue
import threading
import tushare as ts
# --- Script configuration -------------------------------------------------
# Current Unix time in whole seconds, plus the same instant one and two days
# earlier.  These values are multiplied by 1000 when they are placed in the
# request URL further down.
today = int(time.time())
threeDay = [today - 3600 * 24 * 2, today - 3600 * 24, today]
yesterday = threeDay[1]

# URL prefixes; the remaining digits of the stock code are appended per request.
baseURL_day = "https://xueqiu.com/stock/forchartk/stocklist.json?symbol=SH600"
baseURL_now = "https://xueqiu.com/v4/stock/quote.json?code=SH600"

# NOTE(review): this is a hard-coded login session cookie.  Its
# xq_token_expire field reads 12 Jun 2018, so it is presumably stale, and
# credentials should not live in source control -- consider loading it from
# the environment or a config file instead.
headers = {
    "Cookie": "device_id=821d23973d557a1dc60dc5b8733d81d2; s=eh1lzjdjtv; xq_a_token=c18b900ab44e027d3fc6d58a2fd682e30f1bec07; xqat=c18b900ab44e027d3fc6d58a2fd682e30f1bec07; xq_r_token=9b14b6c6181fa6eb651c97ff98f48b0f04c3d6e9; xq_token_expire=Tue%20Jun%2012%202018%2017%3A12%3A07%20GMT%2B0800%20(CST); xq_is_login=1; u=4653963984; aliyungf_tc=AQAAAKDztAjpIQgAhP01biVt3zzy5euT; __utmc=1; bid=b2e914f29c0ca45a29d23889a46d8510_jhh9vt39; __utma=1.629377178.1526634714.1526958087.1526968944.4; __utmz=1.1526968944.4.3.utmcsr=google|utmccn=(organic)|utmcmd=organic|utmctr=(not%20provided); __utmb=1.2.10.1526968944; Hm_lvt_1db88642e346389874251b5a1eded6e3=1526958087,1526968944,1526968946; Hm_lpvt_1db88642e346389874251b5a1eded6e3=1526968946",
    "Host": "xueqiu.com",
    'content-type': 'application/json',
    "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.139 Safari/537.36"
}

# Result file that matching stock symbols are written to.
path = "./stock.txt"
if not os.path.exists(path):
    print("不存在文件")
    # Create the empty file from Python instead of shelling out to `touch`,
    # which only exists on Unix-like systems.
    with open(path, "a"):
        pass
# f = open(path, "w")
# for i in range(0, 1000):
# url = baseURL_day + "%03d" % i + "&period=1day&type=after&begin=" + str(threeDay[0] * 1000) + "&end=" + str(
# threeDay[2] * 1000)
# response = requests.post(url, headers=headers)
# if response.status_code == 200:
# responseJson = json.loads(response.text)
# responseListContent = list(responseJson["chartlist"])
# length = responseListContent.__len__()
#
# # 过滤掉不合格的数据
# if length < 4:
# continue
# if responseListContent[length - 1]["timestamp"] < yesterday * 1000:
# continue
#
# print("当前股票是:SH600" + str("%03d" % i))
# print(url)
# threeDayMax = responseListContent[length - 4]["high"]
# for i in range(length - 4, length - 1):
# print("前三天最高价依次是:" + str(responseListContent[i]["high"]))
# if threeDayMax < responseListContent[i]["high"]:
# threeDayMax = responseListContent[i]["high"]
# print("前三天最高价:" + str(threeDayMax))
# if responseListContent[length - 1]["low"] > threeDayMax:
# print("反打前三")
# f.write(responseJson["stock"]["symbol"] + "\n")
# Scan the stock codes SZ000000 .. SZ002931.  For each code the daily K-line
# series is fetched, then the live quote, and the symbol is appended to the
# result file when all of the following hold:
#   * the current price is above the highest high of the previous three bars,
#   * ma10 < ma20 < ma30 on the latest bar,
#   * the (high - low) / high spread of the previous fifteen bars exceeds 15%.
baseURL_day = "https://xueqiu.com/stock/forchartk/stocklist.json?symbol=SZ00"
baseURL_now = "https://xueqiu.com/v4/stock/quote.json?code=SZ00"

# The context manager guarantees the result file is flushed and closed even
# when a request or a JSON lookup raises part-way through the scan.
with open(path, "a", encoding="utf-8") as f:
    for i in range(0, 2932):
        stockID = "SZ00" + "%04d" % i
        url = (baseURL_day + "%04d" % i + "&period=1day&type=after&begin="
               + str(threeDay[0] * 1000) + "&end=" + str(threeDay[2] * 1000))
        response = requests.post(url, headers=headers)
        if response.status_code != 200:
            continue
        responseJson = json.loads(response.text)
        # A missing or null "chartlist" is treated as an empty series.
        responseListContent = list(responseJson.get("chartlist") or [])
        length = len(responseListContent)

        # Filter out unusable data.  Sixteen bars are required: the latest bar
        # plus the fifteen before it.  With fewer bars the negative indices
        # below would either raise IndexError or silently wrap around.
        if length < 16:
            continue
        # Skip series whose latest bar is older than yesterday.
        if responseListContent[-1]["timestamp"] < yesterday * 1000:
            continue

        print("当前股票是:" + stockID)
        print(url)

        # The fifteen bars preceding the latest one.
        previousFifteen = responseListContent[length - 16:length - 1]
        fifteenDayMin = min(bar["low"] for bar in previousFifteen)
        fifteenDayMax = max(bar["high"] for bar in previousFifteen)
        print("15日最低值:" + str(fifteenDayMin))
        print("15日最高值:" + str(fifteenDayMax))
        if fifteenDayMax == 0:
            # The spread is undefined for a zero high; avoid ZeroDivisionError.
            continue
        rate = (fifteenDayMax - fifteenDayMin) / fifteenDayMax
        print("15差值百分比:" + str(rate * 100) + "%")

        latestBar = responseListContent[-1]
        ma10 = latestBar["ma10"]
        ma20 = latestBar["ma20"]
        ma30 = latestBar["ma30"]

        # The three bars preceding the latest one.
        previousThree = responseListContent[length - 4:length - 1]
        for bar in previousThree:
            print("前三天最高价依次是:" + str(bar["high"]))
        threeDayMax = max(bar["high"] for bar in previousThree)
        print("前三天最高价:" + str(threeDayMax))

        # Live quote request: same base headers plus a per-stock Referer.
        # A copy is used so the shared `headers` dict is not modified.
        url = baseURL_now + "%04d" % i
        quoteHeaders = dict(headers)
        quoteHeaders["Referer"] = "https://xueqiu.com/S/" + stockID
        response = requests.get(url, headers=quoteHeaders)
        if response.status_code != 200:
            continue
        responseJson1 = json.loads(response.text)
        current = float(responseJson1[stockID]["current"])
        if current > threeDayMax and ma20 > ma10 and ma30 > ma20 and rate > 0.15:
            print("反打前三")
            f.write(responseJson["stock"]["symbol"] + "\t" + "15差值百分比:" + str(rate * 100) + "%" + "\n")
# Thread manager
class WorkManager(object):
    """Fill a task queue with jobs and start a pool of worker threads on it.

    Constructing an instance does all of the work: it enqueues the jobs,
    creates the ``Work`` threads and starts them immediately.
    """

    def __init__(self, kw, work_num, thread_num):
        """Create the queue, the jobs and the workers, then start the workers.

        :param kw: value forwarded as the first argument of every
            ``do_job1`` call.
        :param work_num: number of jobs to enqueue.
        :param thread_num: number of ``Work`` threads to create.
        """
        self.task_queue = queue.Queue()
        self.threads = []
        self.kw = kw
        self.__init_task_queue(work_num)
        self.__init_thread_pool(thread_num)
        self.start_task()

    def add_job(self, func):
        """Put one job on the queue."""
        # Queue does its own locking, so no extra synchronisation is needed.
        self.task_queue.put(func)

    def __init_task_queue(self, jobs_num):
        """Enqueue ``jobs_num`` jobs, numbered from 1 to ``jobs_num``.

        ``do_job1`` is called right here and its return value is what gets
        queued.  NOTE(review): ``do_job1`` is not defined in the visible part
        of this file; ``Work.run`` calls ``.start()`` on every queued item, so
        it presumably returns a thread-like object -- confirm.
        """
        for job_index in range(1, jobs_num + 1):
            self.add_job(do_job1(self.kw, job_index))

    def __init_thread_pool(self, thread_num):
        """Create ``thread_num`` workers that all share the task queue."""
        for _ in range(thread_num):
            self.threads.append(Work(self.task_queue))

    def start_task(self):
        """Mark every worker as a daemon thread and start it."""
        for worker in self.threads:
            # The ``daemon`` attribute replaces the deprecated setDaemon().
            worker.daemon = True
            worker.start()

    def wait_allcomplete(self):
        """Join every worker, waiting at most one second for each.

        Because of the timeout this can return while workers are still
        running.
        """
        for worker in self.threads:
            worker.join(1)
# Worker thread
class Work(threading.Thread):
    """Thread that drains a shared queue, calling ``start()`` on each item."""

    def __init__(self, task_queue):
        """Remember the queue to consume.

        :param task_queue: a ``queue.Queue`` whose items expose ``start()``.
        """
        super().__init__()
        self.task_queue = task_queue

    def run(self):
        """Take items off the queue until it is empty, starting each one.

        ``task_done()`` is called as soon as ``start()`` returns, which is
        not necessarily when the started job has finished.
        """
        while True:
            # A non-blocking get avoids the check-then-act race of
            # ``while not empty(): get()``: another worker may take the last
            # item between the check and the blocking get(), which would
            # leave this thread waiting forever.
            try:
                task = self.task_queue.get_nowait()
            except queue.Empty:
                return
            try:
                task.start()
            finally:
                # Keep the queue's unfinished-task counter balanced even if
                # start() raises.
                self.task_queue.task_done()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.