Last active
November 30, 2021 07:45
-
-
Save mlzxgzy/c1085d0e140184489968a747282d00df to your computer and use it in GitHub Desktop.
国家统计局地区爬虫 - 仅省市区 - Python3
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import time | |
import bs4 | |
import requests | |
from bs4 import BeautifulSoup as bs, Tag | |
from urllib.parse import urljoin | |
import json | |
BASE_URL = 'http://www.stats.gov.cn/tjsj/tjbz/tjyqhdmhcxhfdm/' | |
def URL2ADCode(url):
    """Derive a 6-digit administrative-division code from a page URL.

    The last path segment of *url*, minus its extension, is the code
    prefix; it is right-padded with zeros to 6 characters (longer codes
    are returned unchanged).

    :param url: relative or absolute URL whose filename encodes the code
    :return: 6-character (or longer) division code string
    """
    stem = url.rsplit('/', 1)[-1].split('.', 1)[0]
    return stem.ljust(6, '0')
def GetAvailbleYearCode():
    """Fetch all available census years and their index-page URLs.

    Downloads BASE_URL and scrapes the year list
    (``ul.center_list_contlist``).

    :return: list of dicts like ``{'year': '2020', 'link': '<abs url>'}``
    :rtype: list
    """
    def _plain_items(year_list: bs4.element.Tag):
        # <li> elements without a CSS class are the selectable years.
        # FIX: the original read the enclosing-scope variable instead of
        # its own parameter, silently ignoring whatever was passed in.
        return [li for li in year_list.find_all('li')
                if li.get('class') is None]

    ret = []
    resp: requests.models.Response = requests.get(BASE_URL)
    html: bs4.BeautifulSoup = bs(resp.content, features="html.parser")
    all_years: bs4.element.Tag = html.find(
        'ul', attrs={'class': 'center_list_contlist'})
    for item in _plain_items(all_years):
        link = item.find('a')
        if link is None:
            # Defensive: skip list items that carry no hyperlink.
            continue
        ret.append({
            'link': urljoin(BASE_URL, link.get('href')),
            # The label ends with the "年" suffix; [:-1] strips it.
            'year': item.find('font', attrs={'class': 'cont_tit03'}).string[:-1],
        })
    return ret
def PrintAvailbleYear():
    """Print every available year and determine the newest one.

    :return: (latest year as int, list of year dicts)
    """
    years = GetAvailbleYearCode()
    print("当前可用年份:")
    for entry in years:
        print(f" - {entry['year']}")
    # Newest year; 0 when the scraped list is empty (matches original).
    latest = max((int(entry['year']) for entry in years), default=0)
    return latest, years
def InputTargetYear(last, years: list) -> dict:
    """Prompt for a year and validate it against the available list.

    Re-prompts until the answer matches one of *years*; an empty answer
    selects *last*.

    :param last: newest available year (used as the default)
    :param years: year dicts as produced by GetAvailbleYearCode()
    :return: the chosen year's dict
    """
    while True:
        answer = input(f"请输入想获取的年份:[{last}] ") or str(last)
        matches = [y for y in years if y['year'] == answer]
        if matches:
            return matches[0]
        print('[!] 请输入正确的年份')
def GetAvailbleProvince(link) -> list:
    """Scrape the province list from a year's index page.

    :param link: URL of the year's index page
    :return: list of dicts {'name', 'adcode', 'url', 'districts': None}
    """
    provinces = []
    page: bs4.BeautifulSoup = bs(requests.get(link).content,
                                 features="html.parser")
    table: bs4.element.Tag = page.find('table',
                                       attrs={'class': "provincetable"})
    for row in table.find_all('tr', attrs={'class': "provincetr"}):
        for anchor in row.find_all('a'):
            href = anchor.get('href')
            provinces.append({
                'name': anchor.text,
                'adcode': URL2ADCode(href),
                'url': urljoin(link, href),
                'districts': None,  # filled in later by GetAvailbleCity
            })
    print(f"省级完毕,共{len(provinces)}条记录")
    return provinces
def GetAvailbleCity(p):
    """Fetch each province's city list into its 'districts' slot.

    Mutates the province dicts in *p* in place; prints progress.

    :param p: province list from GetAvailbleProvince()
    """
    def _fetch_cities(url):
        cities = []
        page: bs4.BeautifulSoup = bs(requests.get(url).content,
                                     features="html.parser")
        table: bs4.element.Tag = page.find('table',
                                           attrs={'class': "citytable"})
        for row in table.find_all('tr', attrs={'class': "citytr"}):
            # Each row has two anchors; the second carries the city name.
            anchor = row.find_all('a')[1]
            href = anchor.get('href')
            cities.append({
                "name": anchor.text,
                "adcode": URL2ADCode(href),
                'url': urljoin(url, href),
                'districts': None,  # filled in later by GetAvailbleCounty
            })
        print(f"完毕,共{len(cities)}条数据")
        return cities

    for province in p:
        print(province['name'], end='')
        province['districts'] = _fetch_cities(province['url'])
def GetAvailbleCounty(c):
    """Fetch each city's county list into its 'districts' slot.

    Mutates *c* in place: every city dict gains a 'districts' list of
    ``{'name', 'adcode', 'districts': None}`` entries.

    :param c: province list already populated by GetAvailbleCity()
    """
    # Raw string: '\d' in a plain literal is an invalid escape sequence
    # (DeprecationWarning, error in future Python versions).
    recomp = re.compile(r'(\d{6})\d{6}(.+?)$')

    def GetCounty(url):
        # Retry (with a 1s pause) until the page downloads with HTTP 200.
        # Loop instead of recursion: no RecursionError on long outages.
        while True:
            try:
                resp = requests.get(url, timeout=2)
                if resp.status_code == 200:
                    break
                status = resp.status_code
            except requests.RequestException as exc:
                # FIX: the original printed htmllink.status_code here, which
                # raised AttributeError when the request itself failed and
                # htmllink was still None.
                status = type(exc).__name__
            time.sleep(1)
            print(f"*!*!*!*错误 {status} *!*!*!*")
        resp.encoding = 'GBK'  # county pages are GBK-encoded
        html: bs4.BeautifulSoup = bs(resp.text, features="html.parser")
        # Class names vary by level (countytable/towntable...), so match
        # any '*table' / '*tr'.
        table: bs4.element.Tag = html.find(
            'table', attrs={'class': re.compile(r'.+?table')})
        ret = []
        for row in table.find_all('tr', attrs={'class': re.compile(r'.+?tr')}):
            # Row text is "<12-digit code><name>"; keep the 6-digit prefix.
            adcode, name = recomp.findall(row.text)[0]
            ret.append({"name": name, "adcode": adcode, 'districts': None})
        print(f"完成,共{len(ret)}条数据")
        return ret

    for province in c:
        print(province['name'])
        for city in province['districts']:
            print(f'\t{city["name"]}', end='')
            city['districts'] = GetCounty(city['url'])
def Formatter(data):
    '''
    Post-process the scraped tree; customize freely.

    Structure:
        root list - province1 - city1 - county1
                                      - county2
                             - city2
                  - province2 ...
    Each node:
        {'name': 'name', 'adcode': 'division code', 'districts': children}

    Strips the scraping-only 'url' key from every node, in place.

    :param data: list of province dicts
    :return: the same list, cleaned (also mutated in place)
    '''
    def _strip(node: dict):
        # Renamed from 'format' — the original shadowed the builtin.
        node.pop('url', None)
        if node.get('districts'):
            for child in node['districts']:
                _strip(child)

    for province in data:
        _strip(province)
    # The original returned None despite documenting a return value;
    # returning data honors the docstring and stays backward-compatible
    # (main() ignores the return).
    return data
def Save(filename, data): | |
file = open(filename, 'w', encoding='UTF-8') | |
json.dump(data, file, ensure_ascii=False) | |
file.close() | |
def main(FILENAME):
    """Run the full scrape pipeline and write the result to FILENAME."""
    latest, years = PrintAvailbleYear()
    chosen = InputTargetYear(latest, years)
    provinces = GetAvailbleProvince(chosen["link"])
    GetAvailbleCity(provinces)       # level 2: cities
    GetAvailbleCounty(provinces)     # level 3: counties/districts
    Formatter(provinces)             # strip scraping-only fields
    Save(FILENAME, provinces)
    print("-=-=-=-=-=-=-=-全部完成-=-=-=-=-=-=-=-")


if __name__ == '__main__':
    main("area.json")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment