Skip to content

Instantly share code, notes, and snippets.

@mlzxgzy
Last active November 30, 2021 07:45
Show Gist options
  • Save mlzxgzy/c1085d0e140184489968a747282d00df to your computer and use it in GitHub Desktop.
Save mlzxgzy/c1085d0e140184489968a747282d00df to your computer and use it in GitHub Desktop.
国家统计局地区爬虫 - 仅省市区 - Python3
import re
import time
import bs4
import requests
from bs4 import BeautifulSoup as bs, Tag
from urllib.parse import urljoin
import json
# Index page that is fetched to discover the available years; the per-year
# links found on it are resolved relative to this URL.
BASE_URL = 'http://www.stats.gov.cn/tjsj/tjbz/tjyqhdmhcxhfdm/'
def URL2ADCode(url):
    """Derive a division code from the file name at the end of *url*.

    The last path segment is cut at its first dot and the remainder is
    right-padded with ``'0'`` up to six characters; longer stems are
    returned unchanged.

    :param url: absolute or relative URL such as ``'13/1301.html'``
    :return: the padded code, e.g. ``'130100'``
    """
    filename = url.rsplit('/', 1)[-1]
    stem = filename.split('.', 1)[0]
    return stem.ljust(6, '0')
def GetAvailbleYearCode():
    r"""Fetch the index page and list every year that has a link on it.

    :return: one ``{'link': <absolute URL>, 'year': <label text>}`` dict per
        usable ``<li>`` entry, in page order
    :rtype: list
    :raises RuntimeError: if the year list cannot be located in the page
    """

    def GetAvalible_li(alldata: bs4.element.Tag):
        # Only <li> elements without a ``class`` attribute are kept.
        return [li for li in alldata.find_all('li') if li.get('class') is None]

    # Bound the wait so a stalled connection cannot hang the script forever.
    htmlret: requests.models.Response = requests.get(BASE_URL, timeout=10)
    html: bs4.BeautifulSoup = bs(htmlret.content, features="html.parser")
    allYears = html.find('ul', attrs={'class': 'center_list_contlist'})
    if allYears is None:
        raise RuntimeError(
            f"year list not found at {BASE_URL} (HTTP {htmlret.status_code})")

    ret = []
    for item in GetAvalible_li(allYears):
        link = item.find('a')
        if link is None:
            continue
        label = item.find('font', attrs={'class': 'cont_tit03'})
        if label is None or label.string is None:
            # Without a label there is no year to offer to the user.
            continue
        ret.append({
            'link': urljoin(BASE_URL, link.get('href')),
            # Drop the label's final character (presumably a trailing
            # year suffix -- confirm against the live page).
            'year': label.string[:-1],
        })
    return ret
def PrintAvailbleYear():
    """Print every available year and work out the largest one.

    :return: ``(largest, records)`` where *largest* is the greatest year as
        an int (0 when nothing exceeds 0) and *records* is the list returned
        by ``GetAvailbleYearCode``
    """
    records = GetAvailbleYearCode()
    print("当前可用年份:")
    newest = 0
    for record in records:
        label = record['year']
        print(f" - {label}")
        newest = max(newest, int(label))
    return newest, records
def InputTargetYear(last, years: list) -> dict:
    """Prompt until the user picks one of the available years.

    An empty answer selects *last*. Surrounding whitespace in the answer is
    ignored.

    :param last: largest available year, offered as the default
    :param years: list of year records, each with a ``'year'`` string
    :return: the record from *years* whose ``'year'`` equals the answer
    """
    # Loop instead of recursing so repeated bad answers cannot exhaust the
    # interpreter's recursion limit.
    while True:
        targetYear = input(f"请输入想获取的年份:[{last}] ").strip()
        if targetYear == "":
            targetYear = str(last)
        for record in years:
            if record['year'] == targetYear:
                return record
        print('[!] 请输入正确的年份')
def GetAvailbleProvince(link) -> list:
    """Fetch the page at *link* and list every province linked from it.

    :param link: URL of the selected year's page
    :return: list of dicts with ``name``, ``adcode``, ``url`` (absolute) and
        ``districts`` (initialised to ``None``)
    :raises RuntimeError: if the page contains no province table
    """
    # Bound the wait so a stalled connection cannot hang the crawl forever.
    htmllink = requests.get(link, timeout=10)
    html: bs4.BeautifulSoup = bs(htmllink.content, features="html.parser")
    tableforAll = html.find('table', attrs={'class': "provincetable"})
    if tableforAll is None:
        raise RuntimeError(
            f"province table not found at {link} (HTTP {htmllink.status_code})")

    ret = []
    for row in tableforAll.find_all('tr', attrs={'class': "provincetr"}):
        # A single row holds several provinces, one per link.
        for anchor in row.find_all('a'):
            adlink = anchor.get('href')
            ret.append({
                'name': anchor.text,
                'adcode': URL2ADCode(adlink),
                'url': urljoin(link, adlink),
                'districts': None,
            })
    print(f"省级完毕,共{len(ret)}条记录")
    return ret
def GetAvailbleCity(p):
    """Fetch the city list of every province in *p* and store it in place.

    Each province dict gets its ``'districts'`` key replaced by a list of
    city dicts (``name``, ``adcode``, absolute ``url``, ``districts=None``).

    :param p: list of province dicts, each with ``'name'`` and ``'url'``
    :return: None; *p* is modified in place
    :raises RuntimeError: if a province page contains no city table
    """

    def GetCity(url):
        # Bound the wait so a stalled connection cannot hang the crawl forever.
        htmllink = requests.get(url, timeout=10)
        html: bs4.BeautifulSoup = bs(htmllink.content, features="html.parser")
        tableforAll = html.find('table', attrs={'class': "citytable"})
        if tableforAll is None:
            raise RuntimeError(
                f"city table not found at {url} (HTTP {htmllink.status_code})")

        ret = []
        for row in tableforAll.find_all('tr', attrs={'class': "citytr"}):
            # The second link of the row supplies the display name.
            data = row.find_all('a')[1]
            adlink = data.get('href')
            ret.append({
                "name": data.text,
                "adcode": URL2ADCode(adlink),
                'url': urljoin(url, adlink),
                'districts': None,
            })
        print(f"完毕,共{len(ret)}条数据")
        return ret

    for province in p:
        # Name first, without newline; GetCity finishes the line with its count.
        print(province['name'], end='')
        province['districts'] = GetCity(province['url'])
def GetAvailbleCounty(c):
    """Fetch the level below every city and store it in place.

    For each province in *c*, every entry of its ``'districts'`` list gets
    its own ``'districts'`` key replaced by the rows scraped from that
    entry's ``'url'``. A failed or non-200 request is retried after a
    one-second pause until it succeeds.

    :param c: list of province dicts whose ``'districts'`` hold city dicts
        with ``'name'`` and ``'url'`` keys
    :return: None; *c* is modified in place
    :raises RuntimeError: if a fetched page contains no matching table
    """
    # Row text is a 12-digit code followed by the name; only the first six
    # digits are kept as the adcode.
    recomp = re.compile(r'(\d{6})\d{6}(.+?)$')

    def GetCounty(url):
        # Retry in a loop rather than recursively so a long outage cannot
        # exhaust the recursion limit.
        while True:
            try:
                htmllink = requests.get(url, timeout=2)
            except requests.RequestException as err:
                # No response object exists here, so report the error itself.
                print(f"*!*!*!*错误 {err} *!*!*!*")
            else:
                if htmllink.status_code == 200:
                    break
                print(f"*!*!*!*错误 {htmllink.status_code} *!*!*!*")
            time.sleep(1)

        htmllink.encoding = 'GBK'
        html: bs4.BeautifulSoup = bs(htmllink.text, features="html.parser")
        # Match by suffix so any "...table" / "...tr" class names are accepted.
        tableforAll = html.find('table', attrs={'class': re.compile(r'.+?table')})
        if tableforAll is None:
            raise RuntimeError(f"no data table found at {url}")

        ret = []
        for row in tableforAll.find_all('tr', attrs={'class': re.compile(r'.+?tr')}):
            found = recomp.search(row.text)
            if found is None:
                # Not a code/name row; nothing to record.
                continue
            ret.append(
                {"name": found.group(2), "adcode": found.group(1), 'districts': None})
        print(f"完成,共{len(ret)}条数据")
        return ret

    for province in c:
        print(province['name'])
        # ``districts`` is still None when the city level was never fetched.
        for city in province['districts'] or []:
            print(f'\t{city["name"]}', end='')
            city['districts'] = GetCounty(city['url'])
def Formatter(data):
    """Post-process the scraped tree; edit freely to change the output format.

    The structure is::

        root list - province 1 - city/county 1 - district 1
                  |            |               |
                  |            |               - district 2
                  |            |
                  |            - city/county 2
                  |
                  - province 2
                  ....

    Every node has the form::

        {'name': 'display name',
         'adcode': 'division code',
         'districts': 'child divisions'
        }

    The current implementation removes the ``'url'`` key from every node.

    :param data: list of province dicts; modified in place
    :return: the same *data* object, after processing
    """

    def strip_url(node: dict):
        node.pop('url', None)
        # ``districts`` may be absent or None on leaf nodes.
        for child in node.get('districts') or ():
            strip_url(child)

    for province in data:
        strip_url(province)
    return data
def Save(filename, data):
    """Write *data* to *filename* as UTF-8 JSON without escaping non-ASCII text.

    :param filename: path of the file to create or overwrite
    :param data: JSON-serialisable object
    """
    # ``with`` closes the file even if serialisation fails part-way.
    with open(filename, 'w', encoding='UTF-8') as file:
        json.dump(data, file, ensure_ascii=False)
def main(FILENAME):
    """Run the whole crawl interactively and write the result to *FILENAME*.

    :param FILENAME: path of the JSON file to write
    """
    newest, available = PrintAvailbleYear()
    chosen = InputTargetYear(newest, available)
    tree = GetAvailbleProvince(chosen["link"])
    # Each stage receives the same province list, in this fixed order.
    for stage in (GetAvailbleCity, GetAvailbleCounty, Formatter):
        stage(tree)
    Save(FILENAME, tree)
    print("-=-=-=-=-=-=-=-全部完成-=-=-=-=-=-=-=-")
# Run the crawl only when executed as a script, writing to area.json.
if __name__ == '__main__':
    main("area.json")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment