在自动化测试或网页数据交互场景中,文件上传与下载是极为常见的操作。Playwright 作为强大的自动化测试工具,不仅能模拟用户触发上传和下载行为,更能精准判断操作是否完成。本文将从原理到实践,全面讲解如何利用 Playwright 实现文件上传、下载的完成判断,附带完整代码示例和最佳实践,帮助开发者高效解决文件交互场景中的自动化难题。?
文件上传的完成判断核心在于 “识别上传成功的标志”—— 可能是页面 DOM 元素变化、网络请求响应,或是成功提示文本。Playwright 提供了多种监听机制,可根据实际场景灵活选择。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
import asyncio from playwright.async_api import async_playwright
async def file_upload_example(): async with async_playwright() as p: browser = await p.chromium.launch(headless=False) page = await browser.new_page()
# 监听文件选择事件 async with page.expect_file_chooser() as fc_info: await page.click('input[type="file"]') # 点击文件上传按钮
file_chooser = await fc_info.value
# 选择要上传的文件 await file_chooser.set_files("path/to/your/file.txt")
# 方法1: 等待上传成功的元素出现 await page.wait_for_selector('.upload-success', timeout=10000)
# 方法2: 等待网络请求完成 async with page.expect_response(lambda response: "upload" in response.url and response.status == 200) as response_info: # 这里可以执行触发上传的操作 await page.click('#submit-upload')
response = await response_info.value print(f"上传完成,状态码: {response.status}")
# 方法3: 等待特定文本出现 await page.wait_for_selector('text=上传成功')
await browser.close()
# 运行示例 asyncio.run(file_upload_example()) |
Playwright 的上传判断本质是 “监听页面状态变化”,三种核心方法对应不同场景:?
page.wait_for_selector(selector, **kwargs)?
page.expect_response(predicate, **kwargs)?
文件下载的判断比上传更复杂 —— 需先监听下载开始,再等待文件写入磁盘,最后验证文件有效性(如是否存在、是否为空)。Playwright 通过download事件和Download对象,实现全流程监控。?
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
import asyncio import os from playwright.async_api import async_playwright
async def file_download_example(): async with async_playwright() as p: browser = await p.chromium.launch(headless=False) page = await browser.new_page()
# 设置下载路径 download_path = "./downloads" if not os.path.exists(download_path): os.makedirs(download_path)
# 监听下载事件 async with page.expect_download() as download_info: # 触发下载操作 await page.click('#download-button')
download = await download_info.value
# 方法1: 等待下载完成并获取文件路径 file_path = await download.path() print(f"文件下载完成: {file_path}")
# 方法2: 保存文件到指定路径 save_path = os.path.join(download_path, download.suggested_filename) await download.save_as(save_path) print(f"文件已保存到: {save_path}")
# 方法3: 监听下载状态变化 def handle_download(download): print(f"下载开始: {download.url}") # 等待下载完成 download.path().then(lambda path: print(f"下载完成,文件路径: {path}"))
# 注册下载监听器 page.on('download', handle_download)
# 验证文件是否存在且大小合理 if os.path.exists(save_path): file_size = os.path.getsize(save_path) print(f"文件大小: {file_size} bytes") if file_size > 0: print("下载文件验证成功") else: print("警告: 下载的文件可能为空") else: print("错误: 文件下载失败")
await browser.close()
# 更完整的下载监控示例 async def advanced_download_monitor(): async with async_playwright() as p: browser = await p.chromium.launch(headless=False) page = await browser.new_page()
download_path = "./downloads" os.makedirs(download_path, exist_ok=True)
# 存储下载信息 downloads = []
def on_download(download): downloads.append(download) print(f"新的下载: {download.suggested_filename}")
page.on('download', on_download)
# 触发下载 await page.goto('https://example.com/download') await page.click('#download-link')
# 等待所有下载完成 for download in downloads: # 等待下载完成(最多等待30秒) try: await download.path() print(f"下载完成: {download.suggested_filename}")
# 保存文件 await download.save_as( os.path.join(download_path, download.suggested_filename) )
except Exception as e: print(f"下载失败: {e}")
await browser.close()
# 运行示例 asyncio.run(file_download_example()) asyncio.run(advanced_download_monitor()) |
Playwright 的下载判断分为三个核心环节:?
page.expect_download(** kwargs)?
download.path()?
download.save_as(path)?
为提高代码复用性,可封装FileHandler工具类,整合上传完成判断、下载完成判断、文件验证等功能,适用于复杂项目场景。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
import asyncio import os from playwright.async_api import async_playwright
class FileHandler: def __init__(self, download_dir="./downloads"): self.download_dir = download_dir os.makedirs(download_dir, exist_ok=True)
async def wait_for_upload_completion(self, page, success_selector=None, timeout=30000): """等待文件上传完成""" completion_indicators = [ lambda: page.wait_for_selector(success_selector, timeout=timeout) if success_selector else None, lambda: page.wait_for_selector('.upload-complete', timeout=timeout), lambda: page.wait_for_selector('text=上传成功', timeout=timeout), lambda: page.wait_for_selector('text=Upload Complete', timeout=timeout) ]
for indicator in completion_indicators: if indicator: try: await indicator() return True except: continue return False
async def wait_for_download_completion(self, page, download_trigger=None, timeout=60000): """等待文件下载完成""" try: # 监听下载开始 async with page.expect_download(timeout=timeout) as download_info: if download_trigger: await download_trigger()
download = await download_info.value print(f"开始下载: {download.suggested_filename}")
# 等待下载完成 file_path = await download.path() print(f"下载完成: {file_path}")
# 保存到指定目录 save_path = os.path.join(self.download_dir, download.suggested_filename) await download.save_as(save_path)
# 验证文件 if self.validate_downloaded_file(save_path): print(f"文件验证成功: {save_path}") return save_path else: print(f"文件验证失败: {save_path}") return None
except Exception as e: print(f"下载过程出错: {e}") return None
def validate_downloaded_file(self, file_path): """验证下载的文件""" if not os.path.exists(file_path): return False
file_size = os.path.getsize(file_path) if file_size == 0: return False
# 可以根据需要添加更多验证逻辑 # 例如文件类型、内容校验等
return True
# 使用示例 async def main(): handler = FileHandler()
async with async_playwright() as p: browser = await p.chromium.launch(headless=False) page = await browser.new_page()
# 文件上传示例 await page.goto('https://example.com/upload') await page.set_input_files('input[type="file"]', 'test_file.txt') await page.click('#upload-button')
# 等待上传完成 upload_success = await handler.wait_for_upload_completion( page, success_selector='.upload-success-message' )
if upload_success: print("文件上传成功") else: print("文件上传失败或超时")
# 文件下载示例 await page.goto('https://example.com/download')
download_path = await handler.wait_for_download_completion( page, download_trigger=lambda: page.click('#download-btn') )
if download_path: print(f"文件下载并保存到: {download_path}")
await browser.close()
if __name__ == "__main__": asyncio.run(main()) |
| 操作类型? | 完成判断核心方法? | 关键注意事项? |
|---|---|---|
| 文件上传? | 1. 等待成功元素(wait_for_selector)2. 监听接口响应(expect_response)3. 匹配成功文本(text=xxx)? | 1. 选择器需精准(避免误判)2. 超时时间合理(10-30 秒)3. 优先验证网络请求(确保后端接收成功)? |
| 文件下载? | 1. 捕获Download对象(expect_download)2. 等待文件写入(download.path())3. 验证文件有效性(存在 + 非空)? | 1. 及时保存文件(避免临时文件被清理)2. 多文件下载需批量监听 |