这次要从一个贴图论坛下载赛马娘tag的所有色图,预计要下载6700张图片。
查看网页结构
从网站在查询时的url可以看出网站是PHP搭建的。经过测试发现,页面的每张图片的缩略图链接,稍微修改一下就是完整的图片链接。
查看了分页逻辑的,看到JS代码是手动计算分页。每页显示42张图片,将总数除以42得到页数。可以解析每一页分页的图片链接,点击进去帖子里面。然后获取到图片地址。
使用公开的API
要处理近6700张图片,每页有42张图,就要处理160页面,解析帖子链接。然后从6700个帖子链接中获取图片地址。
这恐怕是个大工程,不行。不过网站除了普通搜索检索功能,还提供了api,我们可以直接批量获取数据。
使用接口测试工具,可以看到服务器返回了xml格式的数据。包含帖子图片的所有信息。
下载脚本
已经实现的功能:
- 使用 BeautifulSoup 解析XML,批量抓取图片数据
- 使用 TheadPool 多线程并发下载图片
- 利用服务器返回的文件大小,验证文件完整性
- 以服务器返回的图片信息重命名图片(创建时间、md5等)
- 图片下载失败重试机制,在网络超时,文件不完整等,指数时间退避再稍后重试
- 显示下载进度,包含总体进度,成功个数,失败个数
- 获取请求时设置随机延迟,模拟正常的请求,防止被服务器屏蔽
- 动态调整并发进程数,在多次失败后降低线程数,成功率高时增加线程
- 设置多轮自动重试下载,任务完成后,重新下载失败的任务。
- 每轮重试下载任务后,将失败任务以json格式写入文本日志
- 从日志读取失败任务,恢复之前的任务,重新下载
可以添加优化的功能
- 异步下载,一个线程可以同时进行多个下载请求,在线程堵塞时仍能下载其他图片,速度更快
- 网络请求和磁盘操作分开,防止某一方出现问题,就阻塞进程。
- 意外中断,在下载时写入未完成任务到日志。异常退出时,下次运行时可恢复之前的任务。
import os
import random
import requests
import time
import json
from tqdm import tqdm
from datetime import datetime
from bs4 import BeautifulSoup
from multiprocessing.pool import ThreadPool
# 配置参数
limit = 1000 # 每页返回的最大条目数
tags = 'umamusume' # 查询的标签
# 动态线程配置
min_workers = 10
max_workers = 50
# 设置请求头
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
# 设置保存路径
save_folder = r"C:/Users/haowa/Desktop/umamusume-rule34"
if not os.path.exists(save_folder):
os.makedirs(save_folder)
# 保存未完成任务
incomplete_tasks_file = os.path.join(save_folder, 'incomplete_tasks.log')
# 下载函数,带重试机制和完整性验证
def download_image_with_retry(image_data, retries=3, delay_range=(0.1, 0.3)):
link = image_data['file_url']
md5 = image_data['md5']
created_at = image_data['created_at']
if not link:
return None
# 解析原始的时间字符串
created_at_obj = datetime.strptime(created_at, '%a %b %d %H:%M:%S %z %Y')
# 格式化为 "yyyy-mm-dd hr-mi-se" 的格式
formatted_created_at = created_at_obj.strftime('%Y-%m-%d %H-%M-%S')
temp_file_path = None # 临时文件路径
for attempt in range(retries):
try:
time.sleep(random.uniform(*delay_range)) # 随机延迟
response = requests.get(link, stream=True, timeout=10)
response.raise_for_status() # 检查请求是否成功
# 获取文件名并设置临时路径(用文件链接中的文件名)
file_name = os.path.basename(link)
temp_file_path = os.path.join(save_folder, file_name)
# 获取文件大小
content_length = int(response.headers.get("Content-Length", 0))
# 下载并保存为临时文件,以二进制保存,会覆盖文件
with open(temp_file_path, 'wb') as img_file:
for chunk in response.iter_content(chunk_size=8192):
img_file.write(chunk)
# 验证文件大小和完整性
if os.path.getsize(temp_file_path) != content_length:
os.remove(temp_file_path)
raise Exception("文件不完整")
else:
# 重命名临时文件到最终路径
final_file_path = os.path.join(save_folder, f"{formatted_created_at} {file_name}")
# 如果程序意外中断,已经下载了,删除防止报错
if os.path.exists(final_file_path):
os.remove(final_file_path)
os.rename(temp_file_path, final_file_path)
return None # 下载成功,不返回失败链接
except Exception as e:
if attempt < retries - 1:
time.sleep(2 ** attempt) # 指数退避重试
else:
print(f"下载失败:{link},错误原因:{e}") # 打印错误信息
return image_data # 返回失败的链接
# 动态调整线程数的函数
def adjust_thread_count(success_count, failed_count, current_workers):
total_count = success_count + failed_count + 1e-6 # 防止分母为零,数字很小不会影响结果
success_rate = success_count / total_count
if success_rate < 0.5: # 成功率低于50% new_workers = max(current_workers - 5, min_workers) # 降低线程数 elif success_rate > 0.8: # 成功率高于80%
new_workers = min(current_workers + 5, max_workers) # 增加线程数
else: # 成功率在50%到80%
new_workers = current_workers # 不变
return new_workers
# 多线程下载函数(带动态线程调整),支持进度条显示和错误日志
def download_images_concurrently(image_datas):
failed_image_datas = [] # 下载失败数据列表
success_count = 0 # 成功计数
failed_count = 0 # 失败计数
current_workers = min_workers # 下载线程数
# 创建一个全局进度条
progress_bar = tqdm(total=len(image_datas), desc="下载进度", position=0)
while image_datas:
# 按批次取出数据
batch_size = min(100, len(image_datas)) # 每次处理 100 张
batch = image_datas[:batch_size]
image_datas = image_datas[batch_size:]
success_count_percent = 0
failed_count_percent = 0
# 创建线程池并处理当前批次
with ThreadPool(current_workers) as pool:
# 执行多线程任务
results = pool.imap(download_image_with_retry, batch)
for result in results:
progress_bar.update(1) # 每完成一个任务,更新进度条
if result: # 如果返回失败链接
failed_image_datas.append(result) # 记录失败信息
failed_count += 1
failed_count_percent += 1
else:
success_count += 1
success_count_percent += 1
progress_bar.set_postfix({"当前线程数": current_workers, "成功": success_count, "失败": failed_count,})
# 动态调整线程数
current_workers = adjust_thread_count(success_count_percent, failed_count_percent, current_workers)
progress_bar.close()
# 将失败下载数据保存
if failed_image_datas:
record_incomplete_tasks(failed_image_datas) # 写入未完成日志
print(f"有未完成任务,已经写入日志:{incomplete_tasks_file}")
print(f"下载图片保存至:{save_folder}")
return failed_image_datas # 返回失败下载数据
# 自动重试下载失败的图片
def retry_failed_downloads(image_datas, max_retries=3):
if len(image_datas) == 0:
print("所有图片已下载成功!")
os.remove(incomplete_tasks_file)
return
retry_count = 0
while retry_count < max_retries:
print(f"一分钟后第{retry_count + 1}轮重试")
time.sleep(60) # 每轮前等待 1 分钟
print(f"\n开始第 {retry_count + 1} 次下载失败的图片...")
image_datas = download_images_concurrently(image_datas)
retry_count += 1
# 下载数据获取函数,从api中获取数据
def get_image_data():
pid = 0 # 初始化分页
has_more = True # 是否还有更多页
image_datas = [] # 下载数据列表
while has_more:
url = f'https://api.rule34.xxx/index.php?page=dapi&s=post&q=index&limit={limit}&pid={pid}&tags={tags}'
response = requests.get(url, headers=headers)
response.raise_for_status()
response.encoding = 'utf-8'
# 解析 XML 数据
soup = BeautifulSoup(response.text, 'html.parser')
posts = soup.find_all('post')
if not posts:
print(f"第 {pid} 页没有更多数据,抓取完成!")
has_more = False
break
for post in posts:
file_url = post.get('file_url') # 提取文件链接
md5 = post.get('md5') # 提取文件的md5
created_at = post.get('created_at') # 提取创建时间
# 保存链接及相关数据
if file_url and md5:
image_datas.append({'file_url': file_url, 'md5': md5, 'created_at': created_at})
print(f"第 {pid} 页抓取完成,共获取 {len(posts)} 条数据")
if len(posts) < limit:
print("已到达最后一页,抓取结束")
has_more = False
else:
pid += 1
return image_datas
# 日志记录函数,将没有下载的任务写入日志
def record_incomplete_tasks(image_datas):
with open(incomplete_tasks_file, 'w', encoding='utf-8') as log_file:
for task in image_datas:
log_file.write(json.dumps(task) + "\n")
# 任务读取函数,从日志中读取没有下载的任务
def restore_incomplete_tasks():
image_datas = []
if os.path.exists(incomplete_tasks_file):
with open(incomplete_tasks_file, 'r', encoding='utf-8') as log_file:
lines = log_file.readlines()
for line in lines:
image_datas.append(json.loads(line.strip()))
print(f"恢复了 {len(image_datas)} 条未完成任务。")
return image_datas
# 主程序入口
if __name__ == "__main__":
image_datas = restore_incomplete_tasks() # 恢复未完成任务
if not image_datas:
image_datas = get_image_data() # 如果没有未完成任务,开始抓取新的数据
print("开始下载图片...")
failed_image_datas = download_images_concurrently(image_datas)
retry_failed_downloads(failed_image_datas, max_retries=3)
文章评论
bravo!