Python爬虫学习(六)
- 下载视频(简单版)的步骤介绍
- 第一步:在网页上找到.m3u8文件
- 第二步:通过.m3u8文件下载对应的.ts视频文件
- 第三步:依据.m3u8文件合并.ts文件为一个.mp4文件
- 下载视频(复杂版)
- 下载ts文件并合并为mp4
- 使用单线程
- 使用异步协程
下载视频(简单版)的步骤介绍
"""
<video src='视频.mp4"></video:>
一般的视频网站是怎么做的?
用户上传->转码(把视频做处理,2K,1080,标清)->切片处理(把单个的文件进行拆分,形成众多的.ts文件)
需要一个文件记录:1.视频播放顺序,2.视频存放的路径,这个文件就是m3u
m3u以utf-8编码存储就是m3u8文件,本质就是一个文本文件。
M3U8 txt json =>文本
想要抓取一个视频:
1.找到 m3u8(各种手段)
2.通过 m3u8下载到 ts文件(这里先不管.ts是否被加密)
3.通过各种手段(不仅是编程手段)把ts文件合并为一个mp4文件
"""
第一步:在网页上找到.m3u8文件
(这里假设网页对应的.m3u8文件没有进行jia mi、隐藏等处理,真实的.m3u8文件下载链接直接就在页面源码中)
import requests
import re
# 第一步,下载m3u8文件
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36 Edg/127.0.0.0"
}
# 测试网址,已经不能正常打开
url = "https://www.91kanju/vod-play/54812-1-1.html"
# 用来提取找到m3u8的url地址的预加载正则表达式,需要根据具体网页情况编写合适的表达式
obj = re.compile(r"url: '(?P<url>.*?)',", re.S)
resp = requests.get(url)
m3u8_url = obj.search(resp.text).group("url") # 拿到m3u8的地址
# print(m3u8_url)
resp.close()
# 下载m3u8文件
resp2 = requests.get(m3u8_url, headers=headers)
with open("video.m3u8", mode="wb") as f:
f.write(resp2.content)
resp2.close()
print("下载完毕")
第二步:通过.m3u8文件下载对应的.ts视频文件
上一步的网址不能打开,得不到对应的.m3u8文件,可以直接用下面的.m3u8文件进行测试。
测试链接:https://upyun.luckly-mjw/Assets/media-source/example/media/index.m3u8
打开下载好的.m3u8文件如下所示,不带#的行就是.ts视频文件的下载地址,只需要对其进行发送请求就能下载得到对应的.ts视频文件。
注意:这里演示的.m3u8比较特殊,直接就是完整的下载链接,大部分只有部分文件名,需要根据网页通过一些手段找到对应的网站域名或者网址前缀。
# 第二步,解析m3u8文件
with open("data_file/index.m3u8", mode="r", encoding="utf-8") as f:
for line in f:
line = line.strip() # 先去掉空格,空白,换行符
if line.startswith("#"): # 如果以#开头,跳过这一行
continue
# print(line)
ts_name = line.split('/')[-1] # test-1.ts
resp3 = requests.get(line)
f = open(f"data_file/video/{ts_name}", mode="wb")
f.write(resp3.content)
resp3.close()
print(f"{ts_name},下载成功!")
第三步:依据.m3u8文件合并.ts文件为一个.mp4文件
注意:
1、.ts文件的名称要与.m3u8文件记录的名称一样
才可以利用下面的代码进行合并。
2、这个合并代码仅适用于Windows系统
,合并二进制文件使用的是copy命令,对于mac系统应该使用cat命令,具体细节请“百度”。
import os
import subprocess
def merge_ts_to_mp4(m3u8_file, ts_folder_path, merge_video_name):
# 检查m3u8文件路径是否存在
if not os.path.isfile(m3u8_file):
print(f"错误:m3u8文件 '{m3u8_file}' 不存在!")
return
# 检查ts文件夹路径是否存在
if not os.path.isdir(ts_folder_path):
print(f"错误:TS文件夹 '{ts_folder_path}' 不存在!")
return
lst = []
try:
with open(m3u8_file, mode='r', encoding='utf-8') as f:
for line in f:
if line.startswith('#'):
continue
line = line.strip() # 去掉空格和换行
ts_name = line.split('/')[-1] # 提取文件名
ts_path = os.path.join(ts_folder_path, ts_name) # 构建完整路径
# 检查每个ts文件是否存在
if os.path.isfile(ts_path):
lst.append(ts_path)
else:
print(f"警告:TS文件 '{ts_path}' 不存在,将被跳过。")
if not lst:
print("没有有效的TS文件可供合并。")
return
temp_output_path = os.path.join(ts_folder_path, 'temp_output.ts') # 临时文件路径
total_ts_files = len(lst)
print("开始合并视频文件...")
for index, ts_file in enumerate(lst):
command = f'copy /b "{temp_output_path}" + "{ts_file}" "{temp_output_path}"' if os.path.exists(
temp_output_path) else f'copy /b "{ts_file}" "{temp_output_path}"'
result = subprocess.run(command, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
if result.returncode == 0:
percentage = (index + 1) * 100 / total_ts_files
print(f"{percentage:.2f}% - 已合并: {index + 1}/{total_ts_files}")
else:
print(f"合并失败:{ts_file},跳过该文件。")
# 最终重命名
final_output_path = os.path.join(ts_folder_path, merge_video_name)
os.rename(temp_output_path, final_output_path) # 重命名临时文件为最终输出文件名
# 完成合并后提示用户
print(f"所有.ts文件已合并到:\n '{final_output_path}'。")
except Exception as e:
print(f"发生异常:{e}")
if __name__ == '__main__':
# .m3u8文件路径
m3u8_file = r'D:\User_Data\Documents\PycharmProjects\NewFile\data_file\index.m3u8'
# 从.m3u8文件下载的.ts文件的目录路径,该目录下放置下载的众多.ts文件
ts_folder_path = r'D:\User_Data\Documents\PycharmProjects\NewFile\data_file\video'
# 最终合并的视频名称,放在与.ts文件相同的目录下
merge_video_name = 'ts视频合并.mp4'
merge_ts_to_mp4(m3u8_file, ts_folder_path, merge_video_name)
运行结果如下所示:
下载视频(复杂版)
思路:
- 拿到主页面的页面源代码,找到iframe
- 从iframe的页面源代码中拿到m3u8文件
- 下载第一层m3u8文件 -->下载第二层m3u8文件(真实的视频存放路径)
- 下载视频
- 下载mi yao,进行jie mi操作
- 合并所有ts文件为一个mp4文件
注意:演示网址的视频没有jia mi,只是yin cang了真实的.m3u8下载地址,不需要jie mi,对于需要jie mi的也有代码演示,代码不可直接运行,部分存在问题,实际需要具体情况具体分析。
"""
网页播放地址:
https://www.555dy16/vodplay/128103-7-2/
iframe对应的网页地址:
https://www.555dy16/player/?url=https://vip.kuaikan-cdn4/20240808/s1aDNcWE/index.m3u8&dianshiju&next=https://www.555dy16/vodplay/128103-7-3/
iframe里面的m3u8地址:
https://vip.kuaikan-cdn4/20240808/s1aDNcWE/index.m3u8
抓包里面的m3u8地址:
第一个
https://vip.kuaikan-cdn4/20240808/s1aDNcWE/index.m3u8
预览:
#EXTM3U
#EXT-X-STREAM-INF:PROGRAM-ID=1,BANDWIDTH=933000,RESOLUTION=1280x720
/20240808/s1aDNcWE/933kb/hls/index.m3u8
第二个
https://vip.kuaikan-cdn4/20240808/s1aDNcWE/933kb/hls/index.m3u8
预览:
#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:2
#EXT-X-PLAYLIST-TYPE:VOD
#EXT-X-MEDIA-SEQUENCE:0
#EXTINF:1,
/20240808/s1aDNcWE/933kb/hls/8IfxeFcu.ts
#EXTINF:1,
/20240808/s1aDNcWE/933kb/hls/d4F8NT9f.ts
#EXTINF:1,
/20240808/s1aDNcWE/933kb/hls/W8uFMNJv.ts
……
"""
import re
import os
import requests
import asyncio
import aiohttp
import aiofiles
import subprocess
from bs4 import BeautifulSoup
from Crypto.Cipher import AES
# 代码想要正常运行需要对一些位置进行适当修改,切勿直接运行!
# 对某网页的视频进行下载
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36 Edg/127.0.0.0"
}
def get_iframe_src(url):
# resp = requests.get(url, headers=headers)
# resp.encoding = 'utf-8'
# print(resp.text)
# main_page = BeautifulSoup(resp.text, "html.parser")
# iframe_src = main_page.find('iframe').get('src')
# 网页原因不能找到iframe,先直接给定
iframe_src = 'https://www.555dy16/player/?url=https://vip.kuaikan-cdn4/20240808/s1aDNcWE/index.m3u8&dianshiju&next=https://www.555dy16/vodplay/128103-7-3/'
print(iframe_src)
return iframe_src
def get_first_m3u8_url(url):
resp = requests.get(url, headers=headers)
resp.encoding = 'utf-8'
# print(resp.text)
resp_html = resp.text
obj = re.compile(r'"url": "(?P<m3u8_url>.*?)"', re.S)
m3u8_url = obj.search(resp_html).group('m3u8_url')
print(m3u8_url)
return m3u8_url
def download_m3u8_file(url, file_name):
resp = requests.get(url, headers=headers)
with open('data_file/' + file_name, mode='wb') as f:
f.write(resp.content)
print(f"'{file_name}' 下载成功!")
async def download_ts(ts_url, ts_name, session):
async with session.get(ts_url) as resp:
async with aiofiles.open(f'data_file/video_ts/{ts_name}', mode='wb') as f:
await f.write(await resp.content.read()) # 下载到的内容写入到文件
print(f"{ts_name} 下载完成!")
async def aio_download(up_url):
tasks = []
async with aiohttp.ClientSession() as session: # 提前准备好session
async with aiofiles.open('data_file/' + "second_m3u8.txt", mode='r', encoding='utf-8') as f:
async for line in f:
if line.startswith('#'): # 可能pycharm提示高亮,实际运行没有问题,不必理会
continue
line = line.strip() # 去掉没用的空格和换行
# /20240808/s1aDNcWE/933kb/hls/8IfxeFcu.ts
ts_name = line.split('/')[-1]
# 8IfxeFcu.ts
# 拼接得到真正的ts下载路径
ts_url = up_url + line
# https://vip.kuaikan-cdn4/20240808/s1aDNcWE/933kb/hls/8IfxeFcu.ts
task = asyncio.create_task(download_ts(ts_url, ts_name, session)) # 创建任务
tasks.append(task)
await asyncio.wait(tasks) # 等待任务结束
def merge_ts_to_mp4():
# mac: cat 1.ts 2.ts 3.ts > xxx mp4
# windows: copy /b 1.ts+2.ts+3.ts xxx.mp4
lst = []
with open('data_file/' + "second_m3u8.txt", mode='r', encoding='utf-8') as f:
for line in f:
# line = await line # 确保获取的是字符串
if line.startswith('#'):
continue
line = line.strip() # 去掉没用的空格和换行
# /20240808/s1aDNcWE/933kb/hls/8IfxeFcu.ts
ts_name = line.split('/')[-1]
# 8IfxeFcu.ts
ts_path = 'data_file/video_ts/' + ts_name # 构建完整路径
# data_file/video_ts/8IfxeFcu.ts
lst.append(ts_path)
# Windows系统使用copy命令
temp_output_path = os.path.join('data_file/video_ts', 'temp_output.ts') # 临时文件路径
total_ts_files = len(lst) # 所有的ts文件数量
print("开始合并.ts视频文件...")
for index, ts_file in enumerate(lst):
# 使用加号连接文件名,Windows系统使用copy命令
command = f'copy /b "{temp_output_path}" + "{ts_file}" "{temp_output_path}"' if os.path.exists(
temp_output_path) else f'copy /b "{ts_file}" "{temp_output_path}"'
result = subprocess.run(command, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
if result.returncode == 0:
percentage = (index + 1) * 100 / total_ts_files
print(f"{percentage:.2f}% - 已合并: {index + 1}/{total_ts_files}")
else:
print(f"合并失败:{ts_file},跳过该文件。")
# 最终重命名
final_output_path = os.path.join('data_file/video_ts', 'movies.mp4')
os.rename(temp_output_path, final_output_path) # 重命名临时文件为最终输出文件名
def main(url):
# 1.拿到主页面的页面源代码,找到iframe对应的url
iframe_src = get_iframe_src(url)
# https://www.555dy16/player/?url=https://vip.kuaikan-cdn4/20240808/s1aDNcWE/index.m3u8&dianshiju&next=https://www.555dy16/vodplay/128103-7-3/
# 2.拿到第一层的m3u8文件的下载地址,看具体情况对拿到的地址进行拼接处理
first_m3u8_url = get_first_m3u8_url(iframe_src)
# https://vip.kuaikan-cdn4/20240808/s1aDNcWE/index.m3u8
m3u8_domain = first_m3u8_url.split('')[0] + ''
# 3.1.下载第一层m3u8文件
first_txt_filename = "first_m3u8.txt"
download_m3u8_file(first_m3u8_url, first_txt_filename)
# 3.2.下载第二层m3u8文件
with open('data_file/' + first_txt_filename, mode='r', encoding='utf-8') as f:
for line in f:
if line.startswith('#'):
continue
else:
# 去掉空白或者换行符
line = line.strip() # /20240808/s1aDNcWE/933kb/hls/index.m3u8
# 拼接第二层m3u8的下载地址
second_m3u8_url = m3u8_domain + line
# https://vip.kuaikan-cdn4/20240808/s1aDNcWE/933kb/hls/index.m3u8
print(second_m3u8_url)
# 下载第二层m3u8文件
second_txt_filename = "second_m3u8.txt"
download_m3u8_file(second_m3u8_url, second_txt_filename)
# 4.下载视频
ts_domain_url = 'https://vip.kuaikan-cdn4'
# 异步协程
asyncio.run(aio_download(ts_domain_url))
# =======================这一部分看网站m3u8文件具体情况,是否需要jie mi==========================
# 关注.m3u8文件是否包含这一行:#EXT-X-KEY:METHOD=AES-128,URI="Key.Key",
# 有代表不能直接对下载的.ts文件进行合并,合并前需要对.ts文件进行jie mi,对jie mi后的.ts文件进行合并
# 5.1 拿到mi yao (后面内容仅做示范,代码不可运行,需要具体情况具体分析,为了方便理顺流程这部分函数与代码直接写在一起)
def get_key(url):
resp = requests.get(url)
# print(resp.text) # c5878c26baaaac8c,会得到诸如注释类似格式的文本
return resp.text
key_url = 'https://vip.kuaikan-cdn4/……/key.key' # 要从m3u8文件里去获取
key = get_key(key_url)
# 5.2 jie mi(要对下载的每一个.ts文件进行解密,需要使用异步协程提高效率)
async def dec_ts(ts_name, key):
aes = AES.new(key=key, IV=b"0000000000000000", mode=AES.MODE_CBC)
async with aiofiles.open(f'data_file/video_ts/{ts_name}', mode="rb") as f1, \
aiofiles.open(f'data_file/video_ts/temp_{ts_name}', mode="wb") as f2:
bs = await f1.read() # 从源文件读取内容
await f2.write(aes.decrypt(bs)) # 把解密好的内容写入文件
print(f'temp_{ts_name} 处理完毕!')
async def aio_dec(key):
# jie mi
tasks = []
async with aiofiles.open('data_file/' + "second_m3u8.txt", mode='r', encoding='utf-8') as f:
async for line in f:
# line = await line # 确保获取的是字符串
if line.startswith('#'):
continue
line = line.strip() # 去掉没用的空格和换行
# /20240808/s1aDNcWE/933kb/hls/8IfxeFcu.ts
ts_name = line.split('/')[-1]
# 8IfxeFcu.ts
# 开始创建异步任务
task = asyncio.create_task(dec_ts(ts_name, key))
tasks.append(task)
await asyncio.wait(tasks) # 等待任务结束
pass
asyncio.run(aio_dec(key))
# ======================================================================================
# 6.合并ts文件
merge_ts_to_mp4() # 合并ts文件为mp4文件
# 主程序
if __name__ == '__main__':
url = 'https://www.555dy16/vodplay/128103-7-2/'
main(url)
print("所有文件下载完毕!")
下载ts文件并合并为mp4
简单记录一下如何获取想要的m3u8与ts文件的请求地址:
注意:
- 两种方式最主要的区别在于下载ts文件的速度,经测试,对于一个合并为mp4后,大小
1.6G
左右的文件(ts文件数量超过1800个
),仅下载所有ts文件速度而言,使用单线程
下载耗时会超过1个小时
,而使用异步协程
下载耗时仅不到2分钟
,差别巨大。 - 经测试,有部分网址使用异步协程进行下载速度反而更慢甚至会报错,应该是加了fan pa机制,对于这类可以使用单线程进行下载。
- 两种方式的ts合并为mp4的函数一样,对于ts数量较多的情况耗时较长,有待进一步优化。
使用单线程
import os
import time
import requests
import subprocess
from tqdm import tqdm
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36 Edg/127.0.0.0"
}
def download_m3u8(m3u8_name, m3u8_url, files_save_path):
# 下载m3u8文件
resp = requests.get(m3u8_url, headers=headers)
if resp.ok:
if not m3u8_name.endswith('.m3u8'):
m3u8_name += '.m3u8'
m3u8_file = os.path.join(files_save_path, m3u8_name)
print('1.开始下载m3u8文件...')
with open(m3u8_file, mode="wb") as f:
f.write(resp.content)
resp.close()
# print('-------------------->.m3u8文件下载完毕!<--------------------')
return m3u8_file
else:
print(f"{m3u8_name} 文件下载失败!服务器响应码:{resp.status_code}")
return None
def download_ts(m3u8_file_path, ts_url_prefix):
# 获取m3u8文件所在的文件夹路径,将下载的ts文件放到与m3u8文件相同的目录下
m3u8_folder_path = os.path.dirname(m3u8_file_path)
if ts_url_prefix[-1] != '/':
ts_url_prefix = ts_url_prefix + '/'
ts_url_lst = []
with open(m3u8_file_path, mode="r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line.startswith("#"): # 以#开头,跳过这行
continue
ts_name = line.split('/')[-1] # test-1.ts
ts_url = ts_url_prefix + ts_name
ts_url_lst.append(ts_url)
# 所有的.ts文件数量
total_ts_url = len(ts_url_lst)
print('2.开始下载ts文件...')
with tqdm(total=total_ts_url, desc='下载进度', unit='个文件') as pro_bar:
for index, ts_url in enumerate(ts_url_lst):
resp_ts = requests.get(ts_url, headers=headers)
if resp_ts.ok:
ts_name = ts_url.split('/')[-1]
ts_file = os.path.join(m3u8_folder_path, ts_name)
with open(ts_file, mode="wb") as f:
f.write(resp_ts.content)
resp_ts.close()
pro_bar.update(1) # 更新进度条
else:
print(f"下载失败:{ts_url}")
continue
def merge_ts_to_mp4(m3u8_file, ts_folder_path, merge_video_name, delete_ts_flag=False):
# 检查m3u8文件路径是否存在
if not os.path.isfile(m3u8_file):
print(f"错误:m3u8文件 '{m3u8_file}' 不存在!")
return
# 检查ts文件夹路径是否存在
if not os.path.isdir(ts_folder_path):
print(f"错误:TS文件夹 '{ts_folder_path}' 不存在!")
return
lst = []
try:
with open(m3u8_file, mode='r', encoding='utf-8') as f:
for line in f:
if line.startswith('#'):
continue
line = line.strip() # 去掉空格和换行
ts_name = line.split('/')[-1] # 提取文件名
ts_path = os.path.join(ts_folder_path, ts_name) # 构建完整路径
# 检查每个ts文件是否存在
if os.path.isfile(ts_path):
lst.append(ts_path)
else:
print(f"警告:TS文件 '{ts_path}' 不存在,将被跳过。")
if not lst:
print("没有有效的TS文件可供合并。")
return
temp_output_path = os.path.join(ts_folder_path, 'temp_output.ts') # 临时文件路径
total_ts_files = len(lst)
print("3.开始合并视频文件...")
# 使用 tqdm 显示进度条
with tqdm(total=total_ts_files, desc='合并进度', unit='个文件') as progress_bar:
for index, ts_file in enumerate(lst):
command = f'copy /b "{temp_output_path}" + "{ts_file}" "{temp_output_path}"' if os.path.exists(
temp_output_path) else f'copy /b "{ts_file}" "{temp_output_path}"'
result = subprocess.run(command, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
if result.returncode == 0:
progress_bar.update(1) # 更新进度条
else:
print(f"合并失败:{ts_file},跳过该文件。")
# 最终重命名
if not merge_video_name.endswith('.mp4'):
merge_video_name += '.mp4'
final_output_path = os.path.join(ts_folder_path, merge_video_name)
os.rename(temp_output_path, final_output_path) # 重命名临时文件为最终输出文件名
# 完成合并后提示用户
print(f"-------------------->所有.ts文件已合并到:<--------------------\n '{final_output_path}'。")
if delete_ts_flag == True:
# 删除ts文件与m3u8文件
for file in os.listdir(ts_folder_path):
if file.endswith(".ts") or file.endswith(".m3u8"):
file_path = os.path.join(ts_folder_path, file)
os.remove(file_path)
print('已删除文件夹下的ts与m3u8文件!')
except Exception as e:
print(f"发生异常:{e}")
def main(m3u8_url, files_save_path, video_name):
# 1.下载m3u8文件
start0 = time.time()
m3u8_file_path = download_m3u8(video_name, m3u8_url, files_save_path)
# 2.下载ts文件
start1 = time.time()
# 通过抓包工具获取的某个具体的xxx.ts的请求url,结合m3u8文件的ts名称可以得出的所有ts文件url下载地址
ts_url_prefix = m3u8_url[:m3u8_url.rfind('/') + 1]
download_ts(m3u8_file_path, ts_url_prefix)
end1 = time.time()
minute = int((end1 - start1) / 60)
second = round((end1 - start1) - minute * 60, 2)
print(f"下载ts耗时⌛:{minute} m {second} s")
# 3.合并ts文件
start2 = time.time()
ts_folder_path = os.path.dirname(m3u8_file_path)
merge_ts_to_mp4(m3u8_file_path, ts_folder_path, video_name, delete_ts_flag=True)
end2 = time.time()
minute = int((end2 - start2) / 60)
second = round((end2 - start2) - minute * 60, 2)
print(f"合并ts耗时⌛:{minute} m {second} s")
total_minute = int((end2 - start0) / 60)
total_second = round((end2 - start0) - total_minute * 60, 2)
print(f"累计耗时⏱:{total_minute} m {total_second} s")
if __name__ == '__main__':
# 通过抓包工具获取的m3u8_url地址
m3u8_url = 'https://europe.olemovienews/ts3/20240811/kAtnpIai/mp4/kAtnpIai.mp4/index-v1-a1.m3u8'
# 下载的文件保存路径
files_save_path = r'D:\User_Data\Videos\Movies_Download\测试'
# 下载的视频名称,作为下载的.m3u8文件名、最终合并的.mp4文件名,不用加任何后缀
video_name = '某逆 49'
# 执行下载视频程序
main(m3u8_url, files_save_path, video_name)
使用异步协程
import os
import time
import asyncio
import aiohttp
import subprocess
from tqdm import tqdm
# 请求头
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36 Edg/127.0.0.0"
}
async def download_m3u8(m3u8_name, m3u8_url, files_save_path):
async with aiohttp.ClientSession() as session:
async with session.get(m3u8_url, headers=headers) as resp:
if resp.status == 200:
if not m3u8_name.endswith('.m3u8'):
m3u8_name += '.m3u8'
m3u8_file = os.path.join(files_save_path, m3u8_name)
print('1.开始下载m3u8文件...')
with open(m3u8_file, mode="wb") as f:
f.write(await resp.read())
print('-------------------->.m3u8文件下载完毕!<--------------------')
return m3u8_file
else:
print(f"{m3u8_name} 文件下载失败!服务器响应码:{resp.status}")
return None
async def download_ts(session, ts_url, ts_folder_path, retries=3):
for attempt in range(retries):
try:
async with session.get(ts_url, headers=headers) as resp_ts:
resp_ts.raise_for_status() # 抛出 HTTPError
ts_name = ts_url.split('/')[-1]
ts_file = os.path.join(ts_folder_path, ts_name)
with open(ts_file, mode="wb") as f:
f.write(await resp_ts.read())
return True
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
if attempt < retries - 1:
print(f"下载失败,正在重试...({attempt + 1}) {ts_url}")
await asyncio.sleep(1) # 等待一秒再重试
else:
print(f"下载失败,已达到最大重试次数: {ts_url},错误: {e}")
return False
async def download_all_ts(m3u8_file_path, ts_url_prefix):
m3u8_folder_path = os.path.dirname(m3u8_file_path)
# 生成所有TS文件的URL
ts_url_lst = []
with open(m3u8_file_path, mode="r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line.startswith("#"):
continue
ts_name = line.split('/')[-1]
ts_url = ts_url_prefix + ts_name
ts_url_lst.append(ts_url)
total_ts_url = len(ts_url_lst)
print('2.开始下载ts文件...')
# 使用 asyncio.Semaphore 控制并发数
semaphore = asyncio.Semaphore(5) # 限制同时请求的数量
async with aiohttp.ClientSession() as session:
tasks = []
for ts_url in ts_url_lst:
task = download_ts(session, ts_url, m3u8_folder_path)
tasks.append(task)
with tqdm(total=total_ts_url, desc='下载进度', unit='个文件') as pro_bar:
for result in await asyncio.gather(*tasks):
if result:
pro_bar.update(1)
def merge_ts_to_mp4(m3u8_file, ts_folder_path, merge_video_name, delete_ts_flag=False):
# 检查m3u8文件路径是否存在
if not os.path.isfile(m3u8_file):
print(f"错误:m3u8文件 '{m3u8_file}' 不存在!")
return
# 检查ts文件夹路径是否存在
if not os.path.isdir(ts_folder_path):
print(f"错误:TS文件夹 '{ts_folder_path}' 不存在!")
return
lst = []
try:
with open(m3u8_file, mode='r', encoding='utf-8') as f:
for line in f:
if line.startswith('#'):
continue
line = line.strip() # 去掉空格和换行
ts_name = line.split('/')[-1] # 提取文件名
ts_path = os.path.join(ts_folder_path, ts_name) # 构建完整路径
# 检查每个ts文件是否存在
if os.path.isfile(ts_path):
lst.append(ts_path)
else:
print(f"警告:TS文件 '{ts_path}' 不存在,将被跳过。")
if not lst:
print("没有有效的TS文件可供合并。")
return
temp_output_path = os.path.join(ts_folder_path, 'temp_output.ts') # 临时文件路径
total_ts_files = len(lst)
print("3.开始合并视频文件...")
# 使用 tqdm 显示进度条
with tqdm(total=total_ts_files, desc='合并进度', unit='个文件') as progress_bar:
for index, ts_file in enumerate(lst):
command = f'copy /b "{temp_output_path}" + "{ts_file}" "{temp_output_path}"' if os.path.exists(
temp_output_path) else f'copy /b "{ts_file}" "{temp_output_path}"'
result = subprocess.run(command, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
if result.returncode == 0:
progress_bar.update(1) # 更新进度条
else:
print(f"合并失败:{ts_file},跳过该文件。")
# 最终重命名
if not merge_video_name.endswith('.mp4'):
merge_video_name += '.mp4'
final_output_path = os.path.join(ts_folder_path, merge_video_name)
os.rename(temp_output_path, final_output_path) # 重命名临时文件为最终输出文件名
# 完成合并后提示用户
print(f"-------------------->所有.ts文件已合并到:<--------------------\n '{final_output_path}'。")
if delete_ts_flag == True:
# 删除临时ts文件与m3u8文件
for file in os.listdir(ts_folder_path):
if file.endswith(".ts") or file.endswith(".m3u8"):
file_path = os.path.join(ts_folder_path, file)
os.remove(file_path)
print('已删除文件夹下的ts与m3u8文件!')
except Exception as e:
print(f"发生异常:{e}")
def get_non_ad_ts_start_str(m3u8_file_path):
with open(m3u8_file_path, mode="r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line.startswith("#"):
continue
ts_name = line.split('/')[-1]
ts_name_start_str = ts_name[0:3]
return ts_name_start_str
def remove_ad_ts_files(m3u8_file_path, non_ad_ts_start_str):
ts_folder_path = os.path.dirname(m3u8_file_path)
# 删除广告对应的ts文件
for file in os.listdir(ts_folder_path):
if file.endswith(".ts") and not file.startswith(non_ad_ts_start_str):
ad_ts_file_path = os.path.join(ts_folder_path, file)
print(ad_ts_file_path)
os.remove(ad_ts_file_path)
print('已删除对应为广告的ts文件!')
async def main(m3u8_url, files_save_path, video_name):
start_time = time.time()
# 1. 下载 m3u8 文件
m3u8_file_path = await download_m3u8("video", m3u8_url, files_save_path)
if not m3u8_file_path:
return
# 2. 读取 m3u8 文件并下载所有 ts 文件
download_ts_start_time = time.time()
ts_url_prefix = '/'.join(m3u8_url.split('/')[:-1]) + '/'
await download_all_ts(m3u8_file_path, ts_url_prefix)
download_ts_end_time = time.time()
elapsed_time_ts = download_ts_end_time - download_ts_start_time
minute_ts = int(elapsed_time_ts / 60)
second_ts = round(elapsed_time_ts - minute_ts * 60, 2)
print(f"下载ts耗时⌛:{minute_ts} m {second_ts} s")
# 去除广告(可选),视具体情况而定,去除原理为有用的ts文件名开头字符都一样只有末尾存在差别
non_ad_ts_start_str = get_non_ad_ts_start_str(m3u8_file_path)
print(f'非广告的ts文件名开头字符:{non_ad_ts_start_str}')
remove_ad_ts_files(m3u8_file_path, non_ad_ts_start_str)
# 3. 合并 ts 文件到 mp4
merge_ts_to_mp4(m3u8_file_path, os.path.dirname(m3u8_file_path), video_name, delete_ts_flag=True)
end_time = time.time()
elapsed_time = end_time - start_time
minute = int(elapsed_time / 60)
second = round(elapsed_time - minute * 60, 2)
print(f"全部过程耗时⏱:{minute} m {second} s")
if __name__ == "__main__":
# 通过抓包工具获取的m3u8_url地址
m3u8_url = 'https://vip.ffzy-play6/20240817/28179_cddca21d/2000k/hls/mixed.m3u8'
# 下载的文件保存路径
files_save_path = r'D:\User_Data\Videos\Movies_Download\ceshi'
# 下载的视频名称,作为下载的.m3u8文件名、最终合并的.mp4文件名,不用加任何后缀
video_name = 'ceshi'
# 执行下载视频程序
asyncio.run(main(m3u8_url, files_save_path, video_name))
Python爬虫学习(六)
- 下载视频(简单版)的步骤介绍
- 第一步:在网页上找到.m3u8文件
- 第二步:通过.m3u8文件下载对应的.ts视频文件
- 第三步:依据.m3u8文件合并.ts文件为一个.mp4文件
- 下载视频(复杂版)
- 下载ts文件并合并为mp4
- 使用单线程
- 使用异步协程
下载视频(简单版)的步骤介绍
"""
<video src='视频.mp4"></video:>
一般的视频网站是怎么做的?
用户上传->转码(把视频做处理,2K,1080,标清)->切片处理(把单个的文件进行拆分,形成众多的.ts文件)
需要一个文件记录:1.视频播放顺序,2.视频存放的路径,这个文件就是m3u
m3u以utf-8编码存储就是m3u8文件,本质就是一个文本文件。
M3U8 txt json =>文本
想要抓取一个视频:
1.找到 m3u8(各种手段)
2.通过 m3u8下载到 ts文件(这里先不管.ts是否被加密)
3.通过各种手段(不仅是编程手段)把ts文件合并为一个mp4文件
"""
第一步:在网页上找到.m3u8文件
(这里假设网页对应的.m3u8文件没有进行jia mi、隐藏等处理,真实的.m3u8文件下载链接直接就在页面源码中)
import requests
import re
# 第一步,下载m3u8文件
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36 Edg/127.0.0.0"
}
# 测试网址,已经不能正常打开
url = "https://www.91kanju/vod-play/54812-1-1.html"
# 用来提取找到m3u8的url地址的预加载正则表达式,需要根据具体网页情况编写合适的表达式
obj = re.compile(r"url: '(?P<url>.*?)',", re.S)
resp = requests.get(url)
m3u8_url = obj.search(resp.text).group("url") # 拿到m3u8的地址
# print(m3u8_url)
resp.close()
# 下载m3u8文件
resp2 = requests.get(m3u8_url, headers=headers)
with open("video.m3u8", mode="wb") as f:
f.write(resp2.content)
resp2.close()
print("下载完毕")
第二步:通过.m3u8文件下载对应的.ts视频文件
上一步的网址不能打开,得不到对应的.m3u8文件,可以直接用下面的.m3u8文件进行测试。
测试链接:https://upyun.luckly-mjw/Assets/media-source/example/media/index.m3u8
打开下载好的.m3u8文件如下所示,不带#的行就是.ts视频文件的下载地址,只需要对其进行发送请求就能下载得到对应的.ts视频文件。
注意:这里演示的.m3u8比较特殊,直接就是完整的下载链接,大部分只有部分文件名,需要根据网页通过一些手段找到对应的网站域名或者网址前缀。
# 第二步,解析m3u8文件
with open("data_file/index.m3u8", mode="r", encoding="utf-8") as f:
for line in f:
line = line.strip() # 先去掉空格,空白,换行符
if line.startswith("#"): # 如果以#开头,跳过这一行
continue
# print(line)
ts_name = line.split('/')[-1] # test-1.ts
resp3 = requests.get(line)
f = open(f"data_file/video/{ts_name}", mode="wb")
f.write(resp3.content)
resp3.close()
print(f"{ts_name},下载成功!")
第三步:依据.m3u8文件合并.ts文件为一个.mp4文件
注意:
1、.ts文件的名称要与.m3u8文件记录的名称一样
才可以利用下面的代码进行合并。
2、这个合并代码仅适用于Windows系统
,合并二进制文件使用的是copy命令,对于mac系统应该使用cat命令,具体细节请“百度”。
import os
import subprocess
def merge_ts_to_mp4(m3u8_file, ts_folder_path, merge_video_name):
# 检查m3u8文件路径是否存在
if not os.path.isfile(m3u8_file):
print(f"错误:m3u8文件 '{m3u8_file}' 不存在!")
return
# 检查ts文件夹路径是否存在
if not os.path.isdir(ts_folder_path):
print(f"错误:TS文件夹 '{ts_folder_path}' 不存在!")
return
lst = []
try:
with open(m3u8_file, mode='r', encoding='utf-8') as f:
for line in f:
if line.startswith('#'):
continue
line = line.strip() # 去掉空格和换行
ts_name = line.split('/')[-1] # 提取文件名
ts_path = os.path.join(ts_folder_path, ts_name) # 构建完整路径
# 检查每个ts文件是否存在
if os.path.isfile(ts_path):
lst.append(ts_path)
else:
print(f"警告:TS文件 '{ts_path}' 不存在,将被跳过。")
if not lst:
print("没有有效的TS文件可供合并。")
return
temp_output_path = os.path.join(ts_folder_path, 'temp_output.ts') # 临时文件路径
total_ts_files = len(lst)
print("开始合并视频文件...")
for index, ts_file in enumerate(lst):
command = f'copy /b "{temp_output_path}" + "{ts_file}" "{temp_output_path}"' if os.path.exists(
temp_output_path) else f'copy /b "{ts_file}" "{temp_output_path}"'
result = subprocess.run(command, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
if result.returncode == 0:
percentage = (index + 1) * 100 / total_ts_files
print(f"{percentage:.2f}% - 已合并: {index + 1}/{total_ts_files}")
else:
print(f"合并失败:{ts_file},跳过该文件。")
# 最终重命名
final_output_path = os.path.join(ts_folder_path, merge_video_name)
os.rename(temp_output_path, final_output_path) # 重命名临时文件为最终输出文件名
# 完成合并后提示用户
print(f"所有.ts文件已合并到:\n '{final_output_path}'。")
except Exception as e:
print(f"发生异常:{e}")
if __name__ == '__main__':
# .m3u8文件路径
m3u8_file = r'D:\User_Data\Documents\PycharmProjects\NewFile\data_file\index.m3u8'
# 从.m3u8文件下载的.ts文件的目录路径,该目录下放置下载的众多.ts文件
ts_folder_path = r'D:\User_Data\Documents\PycharmProjects\NewFile\data_file\video'
# 最终合并的视频名称,放在与.ts文件相同的目录下
merge_video_name = 'ts视频合并.mp4'
merge_ts_to_mp4(m3u8_file, ts_folder_path, merge_video_name)
运行结果如下所示:
下载视频(复杂版)
思路:
- 拿到主页面的页面源代码,找到iframe
- 从iframe的页面源代码中拿到m3u8文件
- 下载第一层m3u8文件 -->下载第二层m3u8文件(真实的视频存放路径)
- 下载视频
- 下载mi yao,进行jie mi操作
- 合并所有ts文件为一个mp4文件
注意:演示网址的视频没有jia mi,只是yin cang了真实的.m3u8下载地址,不需要jie mi,对于需要jie mi的也有代码演示,代码不可直接运行,部分存在问题,实际需要具体情况具体分析。
"""
网页播放地址:
https://www.555dy16/vodplay/128103-7-2/
iframe对应的网页地址:
https://www.555dy16/player/?url=https://vip.kuaikan-cdn4/20240808/s1aDNcWE/index.m3u8&dianshiju&next=https://www.555dy16/vodplay/128103-7-3/
iframe里面的m3u8地址:
https://vip.kuaikan-cdn4/20240808/s1aDNcWE/index.m3u8
抓包里面的m3u8地址:
第一个
https://vip.kuaikan-cdn4/20240808/s1aDNcWE/index.m3u8
预览:
#EXTM3U
#EXT-X-STREAM-INF:PROGRAM-ID=1,BANDWIDTH=933000,RESOLUTION=1280x720
/20240808/s1aDNcWE/933kb/hls/index.m3u8
第二个
https://vip.kuaikan-cdn4/20240808/s1aDNcWE/933kb/hls/index.m3u8
预览:
#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:2
#EXT-X-PLAYLIST-TYPE:VOD
#EXT-X-MEDIA-SEQUENCE:0
#EXTINF:1,
/20240808/s1aDNcWE/933kb/hls/8IfxeFcu.ts
#EXTINF:1,
/20240808/s1aDNcWE/933kb/hls/d4F8NT9f.ts
#EXTINF:1,
/20240808/s1aDNcWE/933kb/hls/W8uFMNJv.ts
……
"""
import re
import os
import requests
import asyncio
import aiohttp
import aiofiles
import subprocess
from bs4 import BeautifulSoup
from Crypto.Cipher import AES
# 代码想要正常运行需要对一些位置进行适当修改,切勿直接运行!
# 对某网页的视频进行下载
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36 Edg/127.0.0.0"
}
def get_iframe_src(url):
# resp = requests.get(url, headers=headers)
# resp.encoding = 'utf-8'
# print(resp.text)
# main_page = BeautifulSoup(resp.text, "html.parser")
# iframe_src = main_page.find('iframe').get('src')
# 网页原因不能找到iframe,先直接给定
iframe_src = 'https://www.555dy16/player/?url=https://vip.kuaikan-cdn4/20240808/s1aDNcWE/index.m3u8&dianshiju&next=https://www.555dy16/vodplay/128103-7-3/'
print(iframe_src)
return iframe_src
def get_first_m3u8_url(url):
resp = requests.get(url, headers=headers)
resp.encoding = 'utf-8'
# print(resp.text)
resp_html = resp.text
obj = re.compile(r'"url": "(?P<m3u8_url>.*?)"', re.S)
m3u8_url = obj.search(resp_html).group('m3u8_url')
print(m3u8_url)
return m3u8_url
def download_m3u8_file(url, file_name):
resp = requests.get(url, headers=headers)
with open('data_file/' + file_name, mode='wb') as f:
f.write(resp.content)
print(f"'{file_name}' 下载成功!")
async def download_ts(ts_url, ts_name, session):
async with session.get(ts_url) as resp:
async with aiofiles.open(f'data_file/video_ts/{ts_name}', mode='wb') as f:
await f.write(await resp.content.read()) # 下载到的内容写入到文件
print(f"{ts_name} 下载完成!")
async def aio_download(up_url):
tasks = []
async with aiohttp.ClientSession() as session: # 提前准备好session
async with aiofiles.open('data_file/' + "second_m3u8.txt", mode='r', encoding='utf-8') as f:
async for line in f:
if line.startswith('#'): # 可能pycharm提示高亮,实际运行没有问题,不必理会
continue
line = line.strip() # 去掉没用的空格和换行
# /20240808/s1aDNcWE/933kb/hls/8IfxeFcu.ts
ts_name = line.split('/')[-1]
# 8IfxeFcu.ts
# 拼接得到真正的ts下载路径
ts_url = up_url + line
# https://vip.kuaikan-cdn4/20240808/s1aDNcWE/933kb/hls/8IfxeFcu.ts
task = asyncio.create_task(download_ts(ts_url, ts_name, session)) # 创建任务
tasks.append(task)
await asyncio.wait(tasks) # 等待任务结束
def merge_ts_to_mp4():
# mac: cat 1.ts 2.ts 3.ts > xxx mp4
# windows: copy /b 1.ts+2.ts+3.ts xxx.mp4
lst = []
with open('data_file/' + "second_m3u8.txt", mode='r', encoding='utf-8') as f:
for line in f:
# line = await line # 确保获取的是字符串
if line.startswith('#'):
continue
line = line.strip() # 去掉没用的空格和换行
# /20240808/s1aDNcWE/933kb/hls/8IfxeFcu.ts
ts_name = line.split('/')[-1]
# 8IfxeFcu.ts
ts_path = 'data_file/video_ts/' + ts_name # 构建完整路径
# data_file/video_ts/8IfxeFcu.ts
lst.append(ts_path)
# Windows系统使用copy命令
temp_output_path = os.path.join('data_file/video_ts', 'temp_output.ts') # 临时文件路径
total_ts_files = len(lst) # 所有的ts文件数量
print("开始合并.ts视频文件...")
for index, ts_file in enumerate(lst):
# 使用加号连接文件名,Windows系统使用copy命令
command = f'copy /b "{temp_output_path}" + "{ts_file}" "{temp_output_path}"' if os.path.exists(
temp_output_path) else f'copy /b "{ts_file}" "{temp_output_path}"'
result = subprocess.run(command, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
if result.returncode == 0:
percentage = (index + 1) * 100 / total_ts_files
print(f"{percentage:.2f}% - 已合并: {index + 1}/{total_ts_files}")
else:
print(f"合并失败:{ts_file},跳过该文件。")
# 最终重命名
final_output_path = os.path.join('data_file/video_ts', 'movies.mp4')
os.rename(temp_output_path, final_output_path) # 重命名临时文件为最终输出文件名
def main(url):
# 1.拿到主页面的页面源代码,找到iframe对应的url
iframe_src = get_iframe_src(url)
# https://www.555dy16/player/?url=https://vip.kuaikan-cdn4/20240808/s1aDNcWE/index.m3u8&dianshiju&next=https://www.555dy16/vodplay/128103-7-3/
# 2.拿到第一层的m3u8文件的下载地址,看具体情况对拿到的地址进行拼接处理
first_m3u8_url = get_first_m3u8_url(iframe_src)
# https://vip.kuaikan-cdn4/20240808/s1aDNcWE/index.m3u8
m3u8_domain = first_m3u8_url.split('')[0] + ''
# 3.1.下载第一层m3u8文件
first_txt_filename = "first_m3u8.txt"
download_m3u8_file(first_m3u8_url, first_txt_filename)
# 3.2.下载第二层m3u8文件
with open('data_file/' + first_txt_filename, mode='r', encoding='utf-8') as f:
for line in f:
if line.startswith('#'):
continue
else:
# 去掉空白或者换行符
line = line.strip() # /20240808/s1aDNcWE/933kb/hls/index.m3u8
# 拼接第二层m3u8的下载地址
second_m3u8_url = m3u8_domain + line
# https://vip.kuaikan-cdn4/20240808/s1aDNcWE/933kb/hls/index.m3u8
print(second_m3u8_url)
# 下载第二层m3u8文件
second_txt_filename = "second_m3u8.txt"
download_m3u8_file(second_m3u8_url, second_txt_filename)
# 4.下载视频
ts_domain_url = 'https://vip.kuaikan-cdn4'
# 异步协程
asyncio.run(aio_download(ts_domain_url))
# =======================这一部分看网站m3u8文件具体情况,是否需要jie mi==========================
# 关注.m3u8文件是否包含这一行:#EXT-X-KEY:METHOD=AES-128,URI="Key.Key",
# 有代表不能直接对下载的.ts文件进行合并,合并前需要对.ts文件进行jie mi,对jie mi后的.ts文件进行合并
# 5.1 拿到mi yao (后面内容仅做示范,代码不可运行,需要具体情况具体分析,为了方便理顺流程这部分函数与代码直接写在一起)
def get_key(url):
resp = requests.get(url)
# print(resp.text) # c5878c26baaaac8c,会得到诸如注释类似格式的文本
return resp.text
key_url = 'https://vip.kuaikan-cdn4/……/key.key' # 要从m3u8文件里去获取
key = get_key(key_url)
# 5.2 jie mi(要对下载的每一个.ts文件进行解密,需要使用异步协程提高效率)
async def dec_ts(ts_name, key):
aes = AES.new(key=key, IV=b"0000000000000000", mode=AES.MODE_CBC)
async with aiofiles.open(f'data_file/video_ts/{ts_name}', mode="rb") as f1, \
aiofiles.open(f'data_file/video_ts/temp_{ts_name}', mode="wb") as f2:
bs = await f1.read() # 从源文件读取内容
await f2.write(aes.decrypt(bs)) # 把解密好的内容写入文件
print(f'temp_{ts_name} 处理完毕!')
async def aio_dec(key):
# jie mi
tasks = []
async with aiofiles.open('data_file/' + "second_m3u8.txt", mode='r', encoding='utf-8') as f:
async for line in f:
# line = await line # 确保获取的是字符串
if line.startswith('#'):
continue
line = line.strip() # 去掉没用的空格和换行
# /20240808/s1aDNcWE/933kb/hls/8IfxeFcu.ts
ts_name = line.split('/')[-1]
# 8IfxeFcu.ts
# 开始创建异步任务
task = asyncio.create_task(dec_ts(ts_name, key))
tasks.append(task)
await asyncio.wait(tasks) # 等待任务结束
pass
asyncio.run(aio_dec(key))
# ======================================================================================
# 6.合并ts文件
merge_ts_to_mp4() # 合并ts文件为mp4文件
# 主程序
if __name__ == '__main__':
url = 'https://www.555dy16/vodplay/128103-7-2/'
main(url)
print("所有文件下载完毕!")
下载ts文件并合并为mp4
简单记录一下如何获取想要的m3u8与ts文件的请求地址:
注意:
- 两种方式最主要的区别在于下载ts文件的速度,经测试,对于一个合并为mp4后,大小
1.6G
左右的文件(ts文件数量超过1800个
),仅下载所有ts文件速度而言,使用单线程
下载耗时会超过1个小时
,而使用异步协程
下载耗时仅不到2分钟
,差别巨大。 - 经测试,有部分网址使用异步协程进行下载速度反而更慢甚至会报错,应该是加了fan pa机制,对于这类可以使用单线程进行下载。
- 两种方式的ts合并为mp4的函数一样,对于ts数量较多的情况耗时较长,有待进一步优化。
使用单线程
import os
import time
import requests
import subprocess
from tqdm import tqdm
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36 Edg/127.0.0.0"
}
def download_m3u8(m3u8_name, m3u8_url, files_save_path):
# 下载m3u8文件
resp = requests.get(m3u8_url, headers=headers)
if resp.ok:
if not m3u8_name.endswith('.m3u8'):
m3u8_name += '.m3u8'
m3u8_file = os.path.join(files_save_path, m3u8_name)
print('1.开始下载m3u8文件...')
with open(m3u8_file, mode="wb") as f:
f.write(resp.content)
resp.close()
# print('-------------------->.m3u8文件下载完毕!<--------------------')
return m3u8_file
else:
print(f"{m3u8_name} 文件下载失败!服务器响应码:{resp.status_code}")
return None
def download_ts(m3u8_file_path, ts_url_prefix):
# 获取m3u8文件所在的文件夹路径,将下载的ts文件放到与m3u8文件相同的目录下
m3u8_folder_path = os.path.dirname(m3u8_file_path)
if ts_url_prefix[-1] != '/':
ts_url_prefix = ts_url_prefix + '/'
ts_url_lst = []
with open(m3u8_file_path, mode="r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line.startswith("#"): # 以#开头,跳过这行
continue
ts_name = line.split('/')[-1] # test-1.ts
ts_url = ts_url_prefix + ts_name
ts_url_lst.append(ts_url)
# 所有的.ts文件数量
total_ts_url = len(ts_url_lst)
print('2.开始下载ts文件...')
with tqdm(total=total_ts_url, desc='下载进度', unit='个文件') as pro_bar:
for index, ts_url in enumerate(ts_url_lst):
resp_ts = requests.get(ts_url, headers=headers)
if resp_ts.ok:
ts_name = ts_url.split('/')[-1]
ts_file = os.path.join(m3u8_folder_path, ts_name)
with open(ts_file, mode="wb") as f:
f.write(resp_ts.content)
resp_ts.close()
pro_bar.update(1) # 更新进度条
else:
print(f"下载失败:{ts_url}")
continue
def merge_ts_to_mp4(m3u8_file, ts_folder_path, merge_video_name, delete_ts_flag=False):
# 检查m3u8文件路径是否存在
if not os.path.isfile(m3u8_file):
print(f"错误:m3u8文件 '{m3u8_file}' 不存在!")
return
# 检查ts文件夹路径是否存在
if not os.path.isdir(ts_folder_path):
print(f"错误:TS文件夹 '{ts_folder_path}' 不存在!")
return
lst = []
try:
with open(m3u8_file, mode='r', encoding='utf-8') as f:
for line in f:
if line.startswith('#'):
continue
line = line.strip() # 去掉空格和换行
ts_name = line.split('/')[-1] # 提取文件名
ts_path = os.path.join(ts_folder_path, ts_name) # 构建完整路径
# 检查每个ts文件是否存在
if os.path.isfile(ts_path):
lst.append(ts_path)
else:
print(f"警告:TS文件 '{ts_path}' 不存在,将被跳过。")
if not lst:
print("没有有效的TS文件可供合并。")
return
temp_output_path = os.path.join(ts_folder_path, 'temp_output.ts') # 临时文件路径
total_ts_files = len(lst)
print("3.开始合并视频文件...")
# 使用 tqdm 显示进度条
with tqdm(total=total_ts_files, desc='合并进度', unit='个文件') as progress_bar:
for index, ts_file in enumerate(lst):
command = f'copy /b "{temp_output_path}" + "{ts_file}" "{temp_output_path}"' if os.path.exists(
temp_output_path) else f'copy /b "{ts_file}" "{temp_output_path}"'
result = subprocess.run(command, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
if result.returncode == 0:
progress_bar.update(1) # 更新进度条
else:
print(f"合并失败:{ts_file},跳过该文件。")
# 最终重命名
if not merge_video_name.endswith('.mp4'):
merge_video_name += '.mp4'
final_output_path = os.path.join(ts_folder_path, merge_video_name)
os.rename(temp_output_path, final_output_path) # 重命名临时文件为最终输出文件名
# 完成合并后提示用户
print(f"-------------------->所有.ts文件已合并到:<--------------------\n '{final_output_path}'。")
if delete_ts_flag == True:
# 删除ts文件与m3u8文件
for file in os.listdir(ts_folder_path):
if file.endswith(".ts") or file.endswith(".m3u8"):
file_path = os.path.join(ts_folder_path, file)
os.remove(file_path)
print('已删除文件夹下的ts与m3u8文件!')
except Exception as e:
print(f"发生异常:{e}")
def main(m3u8_url, files_save_path, video_name):
# 1.下载m3u8文件
start0 = time.time()
m3u8_file_path = download_m3u8(video_name, m3u8_url, files_save_path)
# 2.下载ts文件
start1 = time.time()
# 通过抓包工具获取的某个具体的xxx.ts的请求url,结合m3u8文件的ts名称可以得出的所有ts文件url下载地址
ts_url_prefix = m3u8_url[:m3u8_url.rfind('/') + 1]
download_ts(m3u8_file_path, ts_url_prefix)
end1 = time.time()
minute = int((end1 - start1) / 60)
second = round((end1 - start1) - minute * 60, 2)
print(f"下载ts耗时⌛:{minute} m {second} s")
# 3.合并ts文件
start2 = time.time()
ts_folder_path = os.path.dirname(m3u8_file_path)
merge_ts_to_mp4(m3u8_file_path, ts_folder_path, video_name, delete_ts_flag=True)
end2 = time.time()
minute = int((end2 - start2) / 60)
second = round((end2 - start2) - minute * 60, 2)
print(f"合并ts耗时⌛:{minute} m {second} s")
total_minute = int((end2 - start0) / 60)
total_second = round((end2 - start0) - total_minute * 60, 2)
print(f"累计耗时⏱:{total_minute} m {total_second} s")
if __name__ == '__main__':
# 通过抓包工具获取的m3u8_url地址
m3u8_url = 'https://europe.olemovienews/ts3/20240811/kAtnpIai/mp4/kAtnpIai.mp4/index-v1-a1.m3u8'
# 下载的文件保存路径
files_save_path = r'D:\User_Data\Videos\Movies_Download\测试'
# 下载的视频名称,作为下载的.m3u8文件名、最终合并的.mp4文件名,不用加任何后缀
video_name = '某逆 49'
# 执行下载视频程序
main(m3u8_url, files_save_path, video_name)
使用异步协程
import os
import time
import asyncio
import aiohttp
import subprocess
from tqdm import tqdm
# 请求头
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36 Edg/127.0.0.0"
}
async def download_m3u8(m3u8_name, m3u8_url, files_save_path):
async with aiohttp.ClientSession() as session:
async with session.get(m3u8_url, headers=headers) as resp:
if resp.status == 200:
if not m3u8_name.endswith('.m3u8'):
m3u8_name += '.m3u8'
m3u8_file = os.path.join(files_save_path, m3u8_name)
print('1.开始下载m3u8文件...')
with open(m3u8_file, mode="wb") as f:
f.write(await resp.read())
print('-------------------->.m3u8文件下载完毕!<--------------------')
return m3u8_file
else:
print(f"{m3u8_name} 文件下载失败!服务器响应码:{resp.status}")
return None
async def download_ts(session, ts_url, ts_folder_path, retries=3):
for attempt in range(retries):
try:
async with session.get(ts_url, headers=headers) as resp_ts:
resp_ts.raise_for_status() # 抛出 HTTPError
ts_name = ts_url.split('/')[-1]
ts_file = os.path.join(ts_folder_path, ts_name)
with open(ts_file, mode="wb") as f:
f.write(await resp_ts.read())
return True
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
if attempt < retries - 1:
print(f"下载失败,正在重试...({attempt + 1}) {ts_url}")
await asyncio.sleep(1) # 等待一秒再重试
else:
print(f"下载失败,已达到最大重试次数: {ts_url},错误: {e}")
return False
async def download_all_ts(m3u8_file_path, ts_url_prefix):
m3u8_folder_path = os.path.dirname(m3u8_file_path)
# 生成所有TS文件的URL
ts_url_lst = []
with open(m3u8_file_path, mode="r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line.startswith("#"):
continue
ts_name = line.split('/')[-1]
ts_url = ts_url_prefix + ts_name
ts_url_lst.append(ts_url)
total_ts_url = len(ts_url_lst)
print('2.开始下载ts文件...')
# 使用 asyncio.Semaphore 控制并发数
semaphore = asyncio.Semaphore(5) # 限制同时请求的数量
async with aiohttp.ClientSession() as session:
tasks = []
for ts_url in ts_url_lst:
task = download_ts(session, ts_url, m3u8_folder_path)
tasks.append(task)
with tqdm(total=total_ts_url, desc='下载进度', unit='个文件') as pro_bar:
for result in await asyncio.gather(*tasks):
if result:
pro_bar.update(1)
def merge_ts_to_mp4(m3u8_file, ts_folder_path, merge_video_name, delete_ts_flag=False):
# 检查m3u8文件路径是否存在
if not os.path.isfile(m3u8_file):
print(f"错误:m3u8文件 '{m3u8_file}' 不存在!")
return
# 检查ts文件夹路径是否存在
if not os.path.isdir(ts_folder_path):
print(f"错误:TS文件夹 '{ts_folder_path}' 不存在!")
return
lst = []
try:
with open(m3u8_file, mode='r', encoding='utf-8') as f:
for line in f:
if line.startswith('#'):
continue
line = line.strip() # 去掉空格和换行
ts_name = line.split('/')[-1] # 提取文件名
ts_path = os.path.join(ts_folder_path, ts_name) # 构建完整路径
# 检查每个ts文件是否存在
if os.path.isfile(ts_path):
lst.append(ts_path)
else:
print(f"警告:TS文件 '{ts_path}' 不存在,将被跳过。")
if not lst:
print("没有有效的TS文件可供合并。")
return
temp_output_path = os.path.join(ts_folder_path, 'temp_output.ts') # 临时文件路径
total_ts_files = len(lst)
print("3.开始合并视频文件...")
# 使用 tqdm 显示进度条
with tqdm(total=total_ts_files, desc='合并进度', unit='个文件') as progress_bar:
for index, ts_file in enumerate(lst):
command = f'copy /b "{temp_output_path}" + "{ts_file}" "{temp_output_path}"' if os.path.exists(
temp_output_path) else f'copy /b "{ts_file}" "{temp_output_path}"'
result = subprocess.run(command, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
if result.returncode == 0:
progress_bar.update(1) # 更新进度条
else:
print(f"合并失败:{ts_file},跳过该文件。")
# 最终重命名
if not merge_video_name.endswith('.mp4'):
merge_video_name += '.mp4'
final_output_path = os.path.join(ts_folder_path, merge_video_name)
os.rename(temp_output_path, final_output_path) # 重命名临时文件为最终输出文件名
# 完成合并后提示用户
print(f"-------------------->所有.ts文件已合并到:<--------------------\n '{final_output_path}'。")
if delete_ts_flag == True:
# 删除临时ts文件与m3u8文件
for file in os.listdir(ts_folder_path):
if file.endswith(".ts") or file.endswith(".m3u8"):
file_path = os.path.join(ts_folder_path, file)
os.remove(file_path)
print('已删除文件夹下的ts与m3u8文件!')
except Exception as e:
print(f"发生异常:{e}")
def get_non_ad_ts_start_str(m3u8_file_path):
with open(m3u8_file_path, mode="r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line.startswith("#"):
continue
ts_name = line.split('/')[-1]
ts_name_start_str = ts_name[0:3]
return ts_name_start_str
def remove_ad_ts_files(m3u8_file_path, non_ad_ts_start_str):
ts_folder_path = os.path.dirname(m3u8_file_path)
# 删除广告对应的ts文件
for file in os.listdir(ts_folder_path):
if file.endswith(".ts") and not file.startswith(non_ad_ts_start_str):
ad_ts_file_path = os.path.join(ts_folder_path, file)
print(ad_ts_file_path)
os.remove(ad_ts_file_path)
print('已删除对应为广告的ts文件!')
async def main(m3u8_url, files_save_path, video_name):
start_time = time.time()
# 1. 下载 m3u8 文件
m3u8_file_path = await download_m3u8("video", m3u8_url, files_save_path)
if not m3u8_file_path:
return
# 2. 读取 m3u8 文件并下载所有 ts 文件
download_ts_start_time = time.time()
ts_url_prefix = '/'.join(m3u8_url.split('/')[:-1]) + '/'
await download_all_ts(m3u8_file_path, ts_url_prefix)
download_ts_end_time = time.time()
elapsed_time_ts = download_ts_end_time - download_ts_start_time
minute_ts = int(elapsed_time_ts / 60)
second_ts = round(elapsed_time_ts - minute_ts * 60, 2)
print(f"下载ts耗时⌛:{minute_ts} m {second_ts} s")
# 去除广告(可选),视具体情况而定,去除原理为有用的ts文件名开头字符都一样只有末尾存在差别
non_ad_ts_start_str = get_non_ad_ts_start_str(m3u8_file_path)
print(f'非广告的ts文件名开头字符:{non_ad_ts_start_str}')
remove_ad_ts_files(m3u8_file_path, non_ad_ts_start_str)
# 3. 合并 ts 文件到 mp4
merge_ts_to_mp4(m3u8_file_path, os.path.dirname(m3u8_file_path), video_name, delete_ts_flag=True)
end_time = time.time()
elapsed_time = end_time - start_time
minute = int(elapsed_time / 60)
second = round(elapsed_time - minute * 60, 2)
print(f"全部过程耗时⏱:{minute} m {second} s")
if __name__ == "__main__":
# 通过抓包工具获取的m3u8_url地址
m3u8_url = 'https://vip.ffzy-play6/20240817/28179_cddca21d/2000k/hls/mixed.m3u8'
# 下载的文件保存路径
files_save_path = r'D:\User_Data\Videos\Movies_Download\ceshi'
# 下载的视频名称,作为下载的.m3u8文件名、最终合并的.mp4文件名,不用加任何后缀
video_name = 'ceshi'
# 执行下载视频程序
asyncio.run(main(m3u8_url, files_save_path, video_name))