本篇指南将教你如何使用Python和Selenium库来构建一个自动化图像引擎,该引擎能够根据指定参数自动截取网页快照,并将生成的图片存储到云端。此工具还可以通过消息队列接收任务指令,非常适合需要批量处理网页截图的应用场景。
确保你已经安装了Python和必要的库:
1 |
pip install selenium oss2 kafka-python-ng |
创建一个简单的config.ini文件来存储你的OSS和Kafka设置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
[oss] access_key_id = YOUR_OSS_ACCESS_KEY_ID access_key_secret = YOUR_OSS_ACCESS_KEY_SECRET bucket_name = YOUR_BUCKET_NAME endpoint = http://oss-cn-hangzhou.aliyuncs.com
[kafka] bootstrap_servers = localhost:9092 topic = your_topic_name notify_topic = your_notify_topic consumer_group = your_consumer_group
[engine] driver_path = path/to/chromedriver image_path = path/to/screenshots param_path = path/to/params site_base_path = https://example.com |
为程序添加基本的日志记录功能,以便于调试:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
import logging from logging.handlers import TimedRotatingFileHandler import os
logger = logging.getLogger('image_engine') logger.setLevel(logging.DEBUG)
log_file = 'logs/image_engine.log' os.makedirs('logs', exist_ok=True) handler = TimedRotatingFileHandler(log_file, when='midnight', backupCount=7, encoding='utf-8') formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') handler.setFormatter(formatter) logger.addHandler(handler) |
初始化Chrome WebDriver,并设置窗口最大化:
1 2 3 4 5 6 7 8 9 10 11 |
from selenium import webdriver from selenium.webdriver.chrome.service import Service
# 读取配置文件 import configparser config = configparser.ConfigParser() config.read('config.ini')
service = Service(config.get('engine', 'driver_path')) driver = webdriver.Chrome(service=service) driver.maximize_window() |
编写一个函数来处理每个Kafka消息,打开指定网页,等待页面加载完成,然后保存截图:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
from kafka import KafkaConsumer, KafkaProducer import json import time from datetime import datetime import oss2
def process_task(msg): task_params = json.loads(msg.value) item_id = task_params['itemId'] param_value = task_params['paramValue']
logger.info(f"开始处理项【{item_id}】对应参数【{param_value}】")
# 构建请求链接 url = f"{config.get('engine', 'site_base_path')}/view?param={param_value}&id={item_id}" driver.get(url)
try: # 简单等待页面加载 time.sleep(3) # 根据需要调整或替换为WebDriverWait
# 生成截图文件名 today = datetime.now().strftime('%Y-%m-%d') screenshot_dir = os.path.join(config.get('engine', 'image_path'), 'images', today) os.makedirs(screenshot_dir, exist_ok=True) fname = os.path.join(screenshot_dir, f"{item_id}_{param_value}.png")
driver.save_screenshot(fname) logger.info(f"保存截图到 {fname}")
# 上传至OSS(省略具体实现,根据实际情况添加) upload_to_oss(fname)
# 发送完成通知 notify_completion(item_id, param_value, fname)
logger.info(f"完成处理项【{item_id}】对应参数【{param_value}】") except Exception as e: logger.error(f"处理项【{item_id}】对应参数【{param_value}】时发生异常: {e}")
def upload_to_oss(file_path): """上传文件到阿里云OSS""" auth = oss2.Auth(config.get('oss', 'access_key_id'), config.get('oss', 'access_key_secret')) bucket = oss2.Bucket(auth, config.get('oss', 'endpoint'), config.get('oss', 'bucket_name')) remote_path = os.path.relpath(file_path, config.get('engine', 'image_path')) bucket.put_object_from_file(remote_path, file_path)
def notify_completion(item_id, param_value, image_path): """发送完成通知""" producer.send(config.get('kafka', 'notify_topic'), { 'itemId': item_id, 'paramValue': param_value, 'imagePath': image_path }) |
启动Kafka消费者,监听消息并调用处理函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
if __name__ == "__main__": consumer = KafkaConsumer( config.get('kafka', 'topic'), bootstrap_servers=config.get('kafka', 'bootstrap_servers').split(','), group_id=config.get('kafka', 'consumer_group'), auto_offset_reset='latest', enable_auto_commit=True, value_deserializer=lambda m: m.decode('utf-8') )
for msg in consumer: try: process_task(msg) except Exception as ex: logger.error(f"消费消息发生异常: {ex}") |
通过上述简化步骤,你可以快速搭建一个基于Python和Selenium的图像引擎。该引擎能够从Kafka接收任务指令,访问指定网站,截取页面快照,并将截图上传到阿里云OSS。此版本去除了不必要的复杂性,专注于核心功能的实现。