清科谷体的博客

  • 文章
  • 关于
  • 联系
  • 隐私政策

  1. 首页
  2. 编程
  3. 正文

用 Python 爬虫批量下载色图(二)从网站API获取数据

2025年1月24日 148点热度 0人点赞 1条评论

这次要从一个贴图论坛下载赛马娘tag的所有色图,预计要下载6700张图片。

查看网页结构

从网站在查询时的url可以看出网站是PHP搭建的。经过测试发现,页面的每张图片的缩略图链接,稍微修改一下就是完整的图片链接。

查看了分页逻辑的,看到JS代码是手动计算分页。每页显示42张图片,将总数除以42得到页数。可以解析每一页分页的图片链接,点击进去帖子里面。然后获取到图片地址。

使用公开的API

要处理近6700张图片,每页有42张图,就要处理160页面,解析帖子链接。然后从6700个帖子链接中获取图片地址。

这恐怕是个大工程,不行。不过网站除了普通搜索检索功能,还提供了api,我们可以直接批量获取数据。

 

使用接口测试工具,可以看到服务器返回了xml格式的数据。包含帖子图片的所有信息。

下载脚本

已经实现的功能:

  • 使用 BeautifulSoup 解析XML,批量抓取图片数据
  • 使用 TheadPool 多线程并发下载图片
  • 利用服务器返回的文件大小,验证文件完整性
  • 以服务器返回的图片信息重命名图片(创建时间、md5等)
  • 图片下载失败重试机制,在网络超时,文件不完整等,指数时间退避再稍后重试
  • 显示下载进度,包含总体进度,成功个数,失败个数
  • 获取请求时设置随机延迟,模拟正常的请求,防止被服务器屏蔽
  • 动态调整并发进程数,在多次失败后降低线程数,成功率高时增加线程
  • 设置多轮自动重试下载,任务完成后,重新下载失败的任务。
  • 每轮重试下载任务后,将失败任务以json格式写入文本日志
  • 从日志读取失败任务,恢复之前的任务,重新下载

可以添加优化的功能

  • 异步下载,一个线程可以同时进行多个下载请求,在线程堵塞时仍能下载其他图片,速度更快
  • 网络请求和磁盘操作分开,防止某一方出现问题,就阻塞进程。
  • 意外中断,在下载时写入未完成任务到日志。异常退出时,下次运行时可恢复之前的任务。
import os
import random
import requests
import time
import json
from tqdm import tqdm
from datetime import datetime
from bs4 import BeautifulSoup
from multiprocessing.pool import ThreadPool


# 配置参数
limit = 1000  # 每页返回的最大条目数
tags = 'umamusume'  # 查询的标签
# 动态线程配置
min_workers = 10
max_workers = 50

# 设置请求头
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
    
# 设置保存路径
save_folder = r"C:/Users/haowa/Desktop/umamusume-rule34"
if not os.path.exists(save_folder):
    os.makedirs(save_folder)

# 保存未完成任务
incomplete_tasks_file = os.path.join(save_folder, 'incomplete_tasks.log')


# 下载函数,带重试机制和完整性验证
def download_image_with_retry(image_data, retries=3, delay_range=(0.1, 0.3)):
    link = image_data['file_url']
    md5 = image_data['md5']
    created_at = image_data['created_at']
    
    if not link:
        return None

    # 解析原始的时间字符串
    created_at_obj = datetime.strptime(created_at, '%a %b %d %H:%M:%S %z %Y')
    # 格式化为 "yyyy-mm-dd hr-mi-se" 的格式
    formatted_created_at = created_at_obj.strftime('%Y-%m-%d %H-%M-%S')
    temp_file_path = None  # 临时文件路径
    for attempt in range(retries):
        try:
            time.sleep(random.uniform(*delay_range))  # 随机延迟
            response = requests.get(link, stream=True, timeout=10)
            response.raise_for_status() # 检查请求是否成功

            # 获取文件名并设置临时路径(用文件链接中的文件名)
            file_name = os.path.basename(link)
            temp_file_path = os.path.join(save_folder, file_name)

            # 获取文件大小
            content_length = int(response.headers.get("Content-Length", 0))

            # 下载并保存为临时文件,以二进制保存,会覆盖文件
            with open(temp_file_path, 'wb') as img_file:
                for chunk in response.iter_content(chunk_size=8192):
                    img_file.write(chunk)

            # 验证文件大小和完整性
            if os.path.getsize(temp_file_path) != content_length:
                os.remove(temp_file_path)
                raise Exception("文件不完整")     
            else:
                # 重命名临时文件到最终路径
                final_file_path = os.path.join(save_folder, f"{formatted_created_at} {file_name}")
                # 如果程序意外中断,已经下载了,删除防止报错
                if os.path.exists(final_file_path):
                    os.remove(final_file_path)
                os.rename(temp_file_path, final_file_path)
                return None  # 下载成功,不返回失败链接

        except Exception as e:
            if attempt < retries - 1:
                time.sleep(2 ** attempt)  # 指数退避重试
            else:
                print(f"下载失败:{link},错误原因:{e}") # 打印错误信息
                return image_data  # 返回失败的链接


# 动态调整线程数的函数
def adjust_thread_count(success_count, failed_count, current_workers):
    total_count = success_count + failed_count + 1e-6 # 防止分母为零,数字很小不会影响结果
    success_rate = success_count / total_count

    if success_rate < 0.5: # 成功率低于50% new_workers = max(current_workers - 5, min_workers) # 降低线程数 elif success_rate > 0.8:  # 成功率高于80%
        new_workers = min(current_workers + 5, max_workers)  # 增加线程数
    else: # 成功率在50%到80%
        new_workers = current_workers  # 不变
    return new_workers


# 多线程下载函数(带动态线程调整),支持进度条显示和错误日志
def download_images_concurrently(image_datas):
    failed_image_datas = [] # 下载失败数据列表
    success_count = 0  # 成功计数
    failed_count = 0   # 失败计数
    current_workers = min_workers # 下载线程数

    # 创建一个全局进度条
    progress_bar = tqdm(total=len(image_datas), desc="下载进度", position=0) 

    while image_datas:
        # 按批次取出数据
        batch_size = min(100, len(image_datas))  # 每次处理 100 张
        batch = image_datas[:batch_size]
        image_datas = image_datas[batch_size:]

        success_count_percent = 0
        failed_count_percent = 0
        # 创建线程池并处理当前批次
        with ThreadPool(current_workers) as pool:
            # 执行多线程任务
            results = pool.imap(download_image_with_retry, batch)

            for result in results:
                progress_bar.update(1)  # 每完成一个任务,更新进度条
                if result:  # 如果返回失败链接
                    failed_image_datas.append(result)  # 记录失败信息
                    failed_count += 1
                    failed_count_percent += 1          
                else:
                    success_count += 1
                    success_count_percent += 1

                progress_bar.set_postfix({"当前线程数": current_workers, "成功": success_count, "失败": failed_count,})

        # 动态调整线程数
        current_workers = adjust_thread_count(success_count_percent, failed_count_percent, current_workers)

    progress_bar.close()

    # 将失败下载数据保存
    if failed_image_datas:
        record_incomplete_tasks(failed_image_datas) # 写入未完成日志
        print(f"有未完成任务,已经写入日志:{incomplete_tasks_file}")

    print(f"下载图片保存至:{save_folder}")
    return failed_image_datas  # 返回失败下载数据


# 自动重试下载失败的图片
def retry_failed_downloads(image_datas, max_retries=3):
    if len(image_datas) == 0:
        print("所有图片已下载成功!")
        os.remove(incomplete_tasks_file)
        return
    
    retry_count = 0
    while retry_count < max_retries:
        print(f"一分钟后第{retry_count + 1}轮重试")
        time.sleep(60)  # 每轮前等待 1 分钟
        print(f"\n开始第 {retry_count + 1} 次下载失败的图片...")
        
        image_datas = download_images_concurrently(image_datas)
        retry_count += 1


# 下载数据获取函数,从api中获取数据
def get_image_data():
    pid = 0 # 初始化分页
    has_more = True  # 是否还有更多页
    image_datas = [] # 下载数据列表

    while has_more:
        url = f'https://api.rule34.xxx/index.php?page=dapi&s=post&q=index&limit={limit}&pid={pid}&tags={tags}'
        response = requests.get(url, headers=headers)
        response.raise_for_status()
        response.encoding = 'utf-8'

        # 解析 XML 数据
        soup = BeautifulSoup(response.text, 'html.parser')
        posts = soup.find_all('post')

        if not posts:
            print(f"第 {pid} 页没有更多数据,抓取完成!")
            has_more = False
            break

        for post in posts:
            file_url = post.get('file_url')  # 提取文件链接
            md5 = post.get('md5')  # 提取文件的md5
            created_at = post.get('created_at')  # 提取创建时间
            
            # 保存链接及相关数据
            if file_url and md5:
                image_datas.append({'file_url': file_url, 'md5': md5, 'created_at': created_at})

        print(f"第 {pid} 页抓取完成,共获取 {len(posts)} 条数据")

        if len(posts) < limit:
            print("已到达最后一页,抓取结束")
            has_more = False
        else:
            pid += 1

    return image_datas


# 日志记录函数,将没有下载的任务写入日志
def record_incomplete_tasks(image_datas):
    with open(incomplete_tasks_file, 'w', encoding='utf-8') as log_file:
        for task in image_datas:
            log_file.write(json.dumps(task) + "\n")


# 任务读取函数,从日志中读取没有下载的任务
def restore_incomplete_tasks():
    image_datas = []
    if os.path.exists(incomplete_tasks_file):
        with open(incomplete_tasks_file, 'r', encoding='utf-8') as log_file:
            lines = log_file.readlines()
            for line in lines:
                image_datas.append(json.loads(line.strip()))
        print(f"恢复了 {len(image_datas)} 条未完成任务。")
    return image_datas 


# 主程序入口
if __name__ == "__main__":
    image_datas = restore_incomplete_tasks()  # 恢复未完成任务

    if not image_datas:
        image_datas = get_image_data()  # 如果没有未完成任务,开始抓取新的数据
    print("开始下载图片...")

    failed_image_datas = download_images_concurrently(image_datas)
    retry_failed_downloads(failed_image_datas, max_retries=3)
本作品采用 知识共享署名 4.0 国际许可协议 进行许可
标签: Python 多线程 并发 爬虫 脚本 自动化
最后更新:2025年4月28日

ingker

自娱自乐

点赞
< 上一篇
下一篇 >

文章评论

  • 1793

    bravo!

    2025年5月17日
    回复
  • 取消回复

    COPYRIGHT © 2025 清科谷体's blog. ALL RIGHTS RESERVED.
    THEME KRATOS MADE BY VTROIS | MODIFIED BY INGKER

    正在加载今日诗词....

    本站已运行