Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hcy9546843522/35914b942f50c1276c96bc3fbf584019 to your computer and use it in GitHub Desktop.
Save hcy9546843522/35914b942f50c1276c96bc3fbf584019 to your computer and use it in GitHub Desktop.
import requests
import json
import re
import urllib3
import aiohttp
import aiofiles
import asyncio
from bs4 import BeautifulSoup
# Silence urllib3 warnings at import time (original note: ignore the
# certificate warnings triggered by requests).
urllib3.disable_warnings()
def main(url, i):
    """Download the m3u8 playlist for one episode page.

    Args:
        url: Address of the episode's play page.
        i: Episode index; forwarded to ``get_m3u8``.

    Returns:
        Whatever ``get_m3u8`` returns (the index of the saved playlist), so
        the caller can pass it on to ``read_m3u8``. The original computed
        this value but dropped it, so callers received ``None``.
    """
    number = get_m3u8(url, i)
    return number
def get_m3u8(url, i):
    """Fetch a play page, locate its m3u8 link and save the playlist to disk.

    Args:
        url: Address of the play page whose HTML contains the m3u8 link.
        i: Index used to name the output file ``{i}.m3u8``.

    Returns:
        ``i``, unchanged, so the caller knows which playlist file was written.

    Raises:
        IndexError: If no ``/index….m3u8`` link is present in the page (same
            exception type the original list lookup raised, now with a
            message).
        requests.RequestException: On network failure, timeout, or an HTTP
            error status for the playlist download.
    """
    # Fetch the HTML source of the play page. A timeout keeps a stalled
    # server from hanging the script forever.
    page_html = requests.get(url, timeout=30).text

    # Extract the variable part of the playlist path. The dot before "m3u8"
    # is escaped so it only matches a literal ".".
    match = re.search(r'/index(.*?)\.m3u8', page_html)
    if match is None:
        raise IndexError(f"no m3u8 link found in page: {url}")
    m3u8_url = "https://m3api.awenhao.com/index" + match.group(1) + ".m3u8"

    # Download the playlist; refuse to save an HTTP error page as a playlist.
    r_m3u8 = requests.get(m3u8_url, timeout=30)
    r_m3u8.raise_for_status()
    with open(f"{i}.m3u8", mode="wb") as f:
        f.write(r_m3u8.content)
    print(f"下载完第{i}个m3u8文件")
    return i
async def read_m3u8(number):
    """Read ``{number}.m3u8`` and download every segment it lists concurrently.

    Each non-comment, non-blank line of the playlist is treated as a segment
    URL and handed to ``download_ts`` in its own task.

    Args:
        number: Index of the playlist file to read (``{number}.m3u8``).
    """
    # The counter is incremented before use, so the first segment is named 2;
    # this matches the original numbering of the output files.
    segment_index = 1
    tasks = []
    async with aiofiles.open(f"{number}.m3u8", mode="r", encoding="utf-8") as f:
        async for line in f:
            # Strip first so blank lines and indented "#" directives are
            # skipped instead of being scheduled as (empty) URLs.
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            segment_index += 1
            tasks.append(asyncio.create_task(download_ts(line, segment_index)))
    # asyncio.wait() raises ValueError on an empty collection, so only wait
    # when the playlist actually contained segments.
    if tasks:
        await asyncio.wait(tasks)
async def download_ts(url, name):
    """Download one segment and write it to ``video/{name}``.

    Args:
        url: Address of the segment to fetch.
        name: File name (inside the ``video`` directory) to write the bytes to.
    """
    import os

    # Make sure the output directory exists before opening a file inside it.
    os.makedirs("video", exist_ok=True)
    # ClientSession is an asynchronous context manager: it must be entered
    # with "async with" (the original plain "with" fails at runtime).
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            async with aiofiles.open(f"video/{name}", mode="wb") as f:
                await f.write(await resp.content.read())
    print(f'{name}下载完毕')
if __name__ == '__main__':
    # Walk the episode pages (pid 1..16): fetch each playlist, then download
    # the segments it lists.
    for i in range(1, 17):
        url = f'https://91kju.com/vod-play-id-61522-sid-1-pid-{i}.html'
        main(url, i)
        # get_m3u8 saves the playlist as "{i}.m3u8", so the episode index
        # identifies the file to read. "async" is a reserved keyword; the
        # event loop is started through the asyncio module.
        asyncio.run(read_m3u8(i))
        # NOTE(review): this break stops after the first episode, as in the
        # original; remove it to process all 16 pages — confirm intent.
        break
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment