Browser Automation Agents in Practice
灵阙 teaching and research team
Updated 2026-02-28
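Playwright/Puppeteer agent architecture, DOM understanding and the accessibility tree, action planning, visual grounding, and error recovery
Introduction
Browser automation is one of the most challenging directions in the AI agent field. Unlike an API call, operating a browser means dealing with visual layout, a dynamic DOM, asynchronous loading, intercepting popups, and other complex interactions. At its core, this is the problem of executing multi-step operations in an uncertain environment.
This article uses Playwright as the base framework to build a browser automation agent that can understand a web page, plan operations, execute actions, and recover from errors.
Architecture
The Agent Loop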
┌──────────────────────────────────────────────────────────┐
│                    Browser Agent Loop                    │
│                                                          │
│   ┌───────────┐     ┌───────────┐     ┌───────────┐      │
│   │  Observe  │────▶│   Think   │────▶│    Act    │      │
│   │           │     │           │     │           │      │
│   │ Capture   │     │ LLM picks │     │ Execute,  │      │
│   │ page state│     │ next step │     │ then wait │      │
│   └─────┬─────┘     └───────────┘     └─────┬─────┘      │
│         │                                   │            │
│         └───────────────────────────────────┘            │
│             (loop until the task is done)                │
│                                                          │
│   ┌───────────┐                                          │
│   │  Recover  │ ← triggered on error or exception        │
│   │ from error│                                          │
│   └───────────┘                                          │
└──────────────────────────────────────────────────────────┘
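Core Components
| Component | Responsibility | Technology |
|---|---|---|
| Observer | Capture page state | Accessibility tree + screenshot |
| Planner | Decide the next action | LLM (GPT-4o / Claude Sonnet) |
| Executor | Perform browser operations | Playwright API |
| Recoverer | Detect and recover from errors | Rules + LLM judgment |
| Memory | Action history and state | In-memory + files |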
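To make the division of labor concrete, here is a minimal sketch of how three of these components could plug into one loop. The `Protocol` interfaces, `run_loop`, and the `Fake*` stand-ins are hypothetical names invented for this illustration; they let the loop run without a browser or an LLM. The Recoverer and Memory components are left out to keep the sketch short.

```python
import asyncio
from typing import Optional, Protocol


class Observer(Protocol):
    async def get_snapshot(self) -> dict: ...


class Planner(Protocol):
    async def plan_next_action(
        self, task: str, page_snapshot: dict, error: Optional[str] = None
    ) -> dict: ...


class Executor(Protocol):
    async def execute(self, action: dict) -> dict: ...


async def run_loop(
    task: str,
    observer: Observer,
    planner: Planner,
    executor: Executor,
    max_steps: int = 5,
) -> dict:
    """Observe -> Think -> Act until the planner finishes or steps run out."""
    error: Optional[str] = None
    for _ in range(max_steps):
        snapshot = await observer.get_snapshot()
        action = await planner.plan_next_action(task, snapshot, error)
        if action["action"] in ("done", "fail"):
            return action
        result = await executor.execute(action)
        # Feed a failure back into the next planning step.
        error = result.get("error") if result["status"] == "error" else None
    return {"action": "fail", "reason": "max steps exceeded"}


# Stand-in components (hypothetical) so the loop runs without a browser or LLM.
class FakePage:
    def __init__(self) -> None:
        self.clicked = False


class FakeObserver:
    def __init__(self, page: FakePage) -> None:
        self.page = page

    async def get_snapshot(self) -> dict:
        return {"url": "https://example.com", "clicked": self.page.clicked}


class FakePlanner:
    async def plan_next_action(self, task, page_snapshot, error=None) -> dict:
        if page_snapshot["clicked"]:
            return {"action": "done", "result": "button clicked"}
        return {"action": "click", "selector": "#go"}


class FakeExecutor:
    def __init__(self, page: FakePage) -> None:
        self.page = page

    async def execute(self, action: dict) -> dict:
        self.page.clicked = True
        return {"status": "success"}


page = FakePage()
outcome = asyncio.run(
    run_loop("click the button", FakeObserver(page), FakePlanner(), FakeExecutor(page))
)
print(outcome)  # {'action': 'done', 'result': 'button clicked'}
```

Because each component only has to satisfy a small interface, the same loop can be exercised in unit tests with fakes and then run against real Playwright-backed classes.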
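Understanding the DOM
Accessibility Tree vs. Raw HTML
Problems with raw HTML:
- Too large (often thousands of lines)
- Full of irrelevant information (styles, scripts, hidden elements)
- Consumes a large number of tokens
Advantages of the accessibility tree:
- Compact (contains only interactive elements and semantic structure)
- Structured (a clear tree hierarchy)
- Semantically rich (roles, names, states)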
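The size difference is easy to see on a toy example. The sketch below compares a made-up login form as raw HTML with a hand-written tree in the `role` / `name` / `children` shape; both `RAW_HTML` and `TREE` are illustrative data, not output captured from a real browser.

```python
# Hypothetical login form, as a browser might receive it.
RAW_HTML = """<div class="card shadow-lg" style="padding:24px">
  <script>trackPageView("login")</script>
  <div class="row"><div class="col">
    <h1 class="title text-xl">Sign in</h1>
    <input id="email" class="form-control" type="email" placeholder="Email">
    <button class="btn btn-primary" onclick="submitForm()">Continue</button>
  </div></div>
</div>"""

# Hand-written equivalent in the role/name/children shape.
TREE = {
    "role": "WebArea",
    "name": "Login",
    "children": [
        {"role": "heading", "name": "Sign in"},
        {"role": "textbox", "name": "Email"},
        {"role": "button", "name": "Continue"},
    ],
}


def outline(node: dict, depth: int = 0) -> str:
    """Render one node per line as '[role] "name"', indented by depth."""
    line = f'{"  " * depth}[{node["role"]}] "{node["name"]}"'
    children = [outline(child, depth + 1) for child in node.get("children", [])]
    return "\n".join([line, *children])


compact = outline(TREE)
print(compact)
print(f"raw HTML: {len(RAW_HTML)} chars, outline: {len(compact)} chars")
```

The outline drops class names, inline styles, and scripts, and keeps only what an agent needs to decide where to click or type.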
Extracting a Page Snapshot
# src/browser/observer.py
from playwright.async_api import Page


class PageObserver:
    """Extract structured page state for LLM understanding."""

    def __init__(self, page: Page):
        self.page = page

    async def get_snapshot(self) -> dict:
        """Get a comprehensive page snapshot."""
        return {
            "url": self.page.url,
            "title": await self.page.title(),
            "accessibility_tree": await self._get_accessibility_tree(),
            "interactive_elements": await self._get_interactive_elements(),
            "page_text": await self._get_visible_text(),
        }

    async def _get_accessibility_tree(self) -> str:
        """Get the accessibility tree as a compact text representation."""
        # page.accessibility is deprecated in newer Playwright releases. If your
        # version no longer has it, locator.aria_snapshot() is the documented
        # alternative; it returns YAML text, so _format_tree would not apply.
        snapshot = await self.page.accessibility.snapshot()
        if not snapshot:
            return "Empty page"
        return self._format_tree(snapshot, depth=0)

    def _format_tree(self, node: dict, depth: int) -> str:
        """Format accessibility tree node recursively."""
        indent = "  " * depth
        role = node.get("role", "")
        name = node.get("name", "")
        value = node.get("value", "")

        # Skip decorative/structural elements, hoisting their children
        # to the current depth instead of printing the wrapper.
        skip_roles = {"none", "presentation", "generic"}
        if role in skip_roles and not name:
            parts = [self._format_tree(child, depth) for child in node.get("children", [])]
            return "\n".join(part for part in parts if part)

        line = f"{indent}[{role}]"
        if name:
            line += f' "{name}"'
        if value:
            line += f" value={value}"

        # Add state information
        states = []
        if node.get("disabled"):
            states.append("disabled")
        if node.get("checked") is not None:
            states.append(f"checked={node['checked']}")
        if node.get("expanded") is not None:
            states.append(f"expanded={node['expanded']}")
        if states:
            line += f" ({', '.join(states)})"

        parts = [line]
        for child in node.get("children", []):
            child_text = self._format_tree(child, depth + 1)
            if child_text:
                parts.append(child_text)
        return "\n".join(parts)

    async def _get_interactive_elements(self) -> list[dict]:
        """Get all interactive elements with their properties."""
        elements = await self.page.evaluate("""
            () => {
                const interactive = document.querySelectorAll(
                    'a, button, input, select, textarea, [role="button"], [role="link"], [tabindex]'
                );
                return Array.from(interactive)
                    .filter(el => {
                        const style = window.getComputedStyle(el);
                        // Note: offsetParent is also null for position: fixed
                        // elements, so those are filtered out as well.
                        return style.display !== 'none'
                            && style.visibility !== 'hidden'
                            && el.offsetParent !== null;
                    })
                    .map((el, i) => ({
                        index: i,
                        tag: el.tagName.toLowerCase(),
                        role: el.getAttribute('role') || el.tagName.toLowerCase(),
                        text: (el.textContent || '').trim().slice(0, 100),
                        placeholder: el.getAttribute('placeholder') || '',
                        type: el.getAttribute('type') || '',
                        href: el.getAttribute('href') || '',
                        ariaLabel: el.getAttribute('aria-label') || '',
                        id: el.id || '',
                        name: el.getAttribute('name') || '',
                        value: el.value || '',
                        disabled: el.disabled || false,
                    }));
            }
        """)
        return elements

    async def _get_visible_text(self, max_chars: int = 2000) -> str:
        """Return the page's rendered text, truncated to bound prompt size.

        get_snapshot() calls this method but the original listing did not
        define it; this is an assumed minimal implementation.
        """
        text = await self.page.inner_text("body")
        return text[:max_chars]
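Since the snapshot ends up in an LLM prompt, it usually has to be cut to a budget. A plain character slice such as `tree[:3000]` can stop in the middle of a node and leave a half-written line. One possible refinement, sketched here with a hypothetical helper `truncate_lines`, is to cut at line boundaries and say explicitly that something was dropped.

```python
def truncate_lines(text: str, max_chars: int) -> str:
    """Keep whole lines from the top of `text` until the budget is used up.

    The truncation marker is appended on top of the budget, so the result
    can exceed max_chars by the length of the marker.
    """
    kept: list[str] = []
    used = 0
    for line in text.splitlines():
        cost = len(line) + 1  # +1 for the newline that joins lines
        if used + cost > max_chars:
            kept.append("[... truncated ...]")
            break
        kept.append(line)
        used += cost
    return "\n".join(kept)


tree = '[WebArea] "Shop"\n  [link] "Home"\n  [button] "Add to cart"'
print(truncate_lines(tree, max_chars=35))
```

With a budget of 35 characters the first two lines fit and the third is replaced by the marker, so the model sees complete nodes plus a hint that the page continues.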
Action Planning and Execution
LLM Action Planner
# src/browser/planner.py
import json
from typing import Optional

PLANNER_SYSTEM_PROMPT = """You are a browser automation agent. Given the current page state and task,
decide the next action to take.

Available actions:
- click(selector): Click an element
- fill(selector, text): Type text into an input field
- select(selector, value): Select an option from a dropdown
- navigate(url): Go to a URL
- scroll(direction): Scroll up or down
- wait(seconds): Wait for page to load
- screenshot(): Take a screenshot for visual verification
- done(result): Task is complete, return result
- fail(reason): Task cannot be completed

Rules:
1. Use CSS selectors or text content to identify elements
2. Always wait for page loads after navigation or clicks
3. If an element is not visible, try scrolling first
4. If stuck, try an alternative approach
5. Never enter sensitive data (passwords, credit cards)

Respond with a JSON action object, for example {"action": "click", "selector": "#submit"}."""


def parse_json(text: str) -> dict:
    """Parse the model reply, tolerating extra text around the JSON object."""
    start, end = text.find("{"), text.rfind("}")
    if start == -1 or end < start:
        raise ValueError(f"No JSON object in model reply: {text[:80]!r}")
    return json.loads(text[start:end + 1])


class BrowserPlanner:
    def __init__(self, llm_client):
        # llm_client is assumed to expose an async generate(messages, model=...)
        # method that returns the reply as text.
        self.llm = llm_client
        self.action_history: list[dict] = []

    async def plan_next_action(
        self,
        task: str,
        page_snapshot: dict,
        error: Optional[str] = None,
    ) -> dict:
        """Plan the next browser action."""
        # Build context
        context = f"""## Task
{task}

## Current Page
URL: {page_snapshot['url']}
Title: {page_snapshot['title']}

## Page Structure
{page_snapshot['accessibility_tree'][:3000]}

## Interactive Elements
{self._format_elements(page_snapshot['interactive_elements'][:30])}

## Action History
{self._format_history()}
"""
        if error:
            context += f"\n## Previous Error\n{error}\n"

        messages = [
            {"role": "system", "content": PLANNER_SYSTEM_PROMPT},
            {"role": "user", "content": context},
        ]
        response = await self.llm.generate(messages, model="gpt-4o")
        action = parse_json(response)
        self.action_history.append(action)
        return action

    def _format_elements(self, elements: list[dict]) -> str:
        lines = []
        for el in elements:
            desc = f"[{el['index']}] <{el['tag']}"
            if el['type']:
                desc += f" type={el['type']}"
            if el['text']:
                desc += f'> "{el["text"][:50]}"'
            elif el['placeholder']:
                desc += f'> placeholder="{el["placeholder"]}"'
            elif el['ariaLabel']:
                desc += f'> aria-label="{el["ariaLabel"]}"'
            else:
                desc += ">"
            if el['disabled']:
                desc += " [disabled]"
            lines.append(desc)
        return "\n".join(lines)

    def _format_history(self) -> str:
        if not self.action_history:
            return "(no actions taken yet)"
        # Only the last five actions are shown to keep the prompt short
        return "\n".join([
            f"{i+1}. {json.dumps(a)}"
            for i, a in enumerate(self.action_history[-5:])
        ])
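An LLM will occasionally return an action that is well-formed JSON but unusable, such as a `fill` without `text`. Checking the action before it reaches the browser gives the planner a precise error to react to. The sketch below is one way to do that; `REQUIRED_FIELDS` and `validate_action` are hypothetical names, and the required fields are an assumption based on the action list in the system prompt.

```python
# Fields each action must carry. scroll, wait, done, and fail are treated
# as having optional arguments in this sketch.
REQUIRED_FIELDS = {
    "click": ["selector"],
    "fill": ["selector", "text"],
    "select": ["selector", "value"],
    "navigate": ["url"],
    "scroll": [],
    "wait": [],
    "screenshot": [],
    "done": [],
    "fail": [],
}


def validate_action(action: object) -> list[str]:
    """Return a list of problems; an empty list means the action is usable."""
    if not isinstance(action, dict):
        return ["action must be a JSON object"]
    name = action.get("action")
    if not isinstance(name, str) or name not in REQUIRED_FIELDS:
        return [f"unknown action: {name!r}"]
    problems = [
        f"{name} is missing '{field}'"
        for field in REQUIRED_FIELDS[name]
        if field not in action
    ]
    if name == "scroll" and action.get("direction") not in (None, "up", "down"):
        problems.append("scroll direction must be 'up' or 'down'")
    return problems


print(validate_action({"action": "fill", "selector": "#q"}))
print(validate_action({"action": "click", "selector": "#go"}))
```

A non-empty result can be passed back to the planner as the `error` for the next step instead of being sent to the executor.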
Action Executor
# src/browser/executor.py
from playwright.async_api import Page, TimeoutError as PlaywrightTimeout


class BrowserExecutor:
    """Execute browser actions with error handling."""

    def __init__(self, page: Page):
        self.page = page

    async def execute(self, action: dict) -> dict:
        """Execute a planned action and return result."""
        action_type = action.get("action")
        try:
            if action_type == "click":
                return await self._click(action["selector"])
            elif action_type == "fill":
                return await self._fill(action["selector"], action["text"])
            elif action_type == "select":
                return await self._select(action["selector"], action["value"])
            elif action_type == "navigate":
                return await self._navigate(action["url"])
            elif action_type == "scroll":
                return await self._scroll(action.get("direction", "down"))
            elif action_type == "wait":
                return await self._wait(action.get("seconds", 2))
            elif action_type == "screenshot":
                return await self._screenshot()
            elif action_type == "done":
                return {"status": "completed", "result": action.get("result")}
            elif action_type == "fail":
                return {"status": "failed", "reason": action.get("reason")}
            else:
                return {"status": "error", "error": f"Unknown action: {action_type}"}
        except PlaywrightTimeout:
            return {"status": "error", "error": "Action timed out"}
        except Exception as e:
            # Also covers a KeyError when the planner omits a required field
            return {"status": "error", "error": str(e)}

    async def _click(self, selector: str) -> dict:
        # Try multiple strategies to find the element

        # Strategy 1: CSS selector
        try:
            element = self.page.locator(selector).first
            if await element.is_visible():
                await element.click(timeout=5000)
                return {"status": "success", "action": f"clicked '{selector}'"}
        except Exception:
            pass

        # Strategy 2: Text content (treat the selector string as visible text)
        try:
            element = self.page.get_by_text(selector, exact=False).first
            if await element.is_visible():
                await element.click(timeout=5000)
                return {"status": "success", "action": f"clicked text '{selector}'"}
        except Exception:
            pass

        # Strategy 3: Role + name
        try:
            element = self.page.get_by_role("button", name=selector).first
            await element.click(timeout=5000)
            return {"status": "success", "action": f"clicked button '{selector}'"}
        except Exception:
            pass

        return {"status": "error", "error": f"Element not found: {selector}"}

    async def _fill(self, selector: str, text: str) -> dict:
        locator = self.page.locator(selector).first
        await locator.click()
        await locator.fill(text)
        # The typed text is left out of the result so it does not leak into logs
        return {"status": "success", "action": f"filled '{selector}' with text"}

    async def _select(self, selector: str, value: str) -> dict:
        await self.page.select_option(selector, value)
        return {"status": "success", "action": f"selected '{value}' in '{selector}'"}

    async def _navigate(self, url: str) -> dict:
        await self.page.goto(url, wait_until="domcontentloaded")
        return {"status": "success", "action": f"navigated to {url}"}

    async def _scroll(self, direction: str) -> dict:
        delta = -500 if direction == "up" else 500
        await self.page.mouse.wheel(0, delta)
        await self.page.wait_for_timeout(500)
        return {"status": "success", "action": f"scrolled {direction}"}

    async def _wait(self, seconds: float) -> dict:
        await self.page.wait_for_timeout(int(seconds * 1000))
        return {"status": "success", "action": f"waited {seconds}s"}

    async def _screenshot(self) -> dict:
        # The planner prompt advertises screenshot(), but the original executor
        # had no handler for it; this is an assumed minimal version.
        image = await self.page.screenshot()
        return {"status": "success", "action": "took screenshot", "bytes": len(image)}
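The planner sees element descriptors (tag, id, name, aria-label, text), while the executor needs a selector string. One way to bridge the two is to build the selector deterministically from the descriptor rather than letting the model compose it. The helpers below, `css_quote` and `selector_for`, are hypothetical; the preference order (id, then name, then aria-label, then text) is an assumption about which attributes tend to be most stable.

```python
def css_quote(value: str) -> str:
    """Wrap value in double quotes, escaping backslashes and double quotes."""
    return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'


def selector_for(el: dict) -> str:
    """Pick the most stable selector available for one element descriptor."""
    tag = el.get("tag") or "*"
    if el.get("id"):
        # [id="..."] avoids the escaping rules that #id needs for unusual ids.
        return f"[id={css_quote(el['id'])}]"
    if el.get("name"):
        return f"{tag}[name={css_quote(el['name'])}]"
    if el.get("ariaLabel"):
        return f"{tag}[aria-label={css_quote(el['ariaLabel'])}]"
    if el.get("text"):
        # :has-text() is a Playwright extension, not standard CSS.
        return f"{tag}:has-text({css_quote(el['text'][:50])})"
    raise ValueError("element has no usable attribute for a selector")


print(selector_for({"tag": "input", "id": "email"}))
print(selector_for({"tag": "button", "text": "Sign in"}))
```

With this in place the model could answer with an element index from the prompt, and the agent would translate that index into a selector itself.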
Error Recovery
Automatic Recovery Strategies
# src/browser/recoverer.py
from playwright.async_api import Page, TimeoutError as PlaywrightTimeout


class ErrorRecoverer:
    """Detect and recover from common browser automation errors."""

    def __init__(self, page: Page):
        self.page = page

    async def handle_error(self, error: str, action: dict) -> dict:
        """Attempt to recover from an error."""
        message = error.lower()

        # Strategy 1: Dismiss popups/dialogs
        if "dialog" in message or "popup" in message:
            return await self._dismiss_popups()

        # Strategy 2: Handle cookie consent. Fall through when the banner
        # cannot be dismissed so the later strategies still get a chance.
        if await self._detect_cookie_banner():
            result = await self._dismiss_cookie_banner()
            if result["recovered"]:
                return result

        # Strategy 3: Wait for loading. The executor reports timeouts as
        # "Action timed out", so match that spelling as well as "timeout".
        if "not found" in message or "timeout" in message or "timed out" in message:
            try:
                await self.page.wait_for_load_state("networkidle", timeout=10000)
            except PlaywrightTimeout:
                return {"recovered": False, "error": "Page did not become idle"}
            return {"recovered": True, "action": "waited for page load"}

        # Strategy 4: Scroll element into view
        if "not visible" in message:
            return await self._scroll_to_find(action.get("selector", ""))

        return {"recovered": False, "error": error}

    async def _dismiss_popups(self) -> dict:
        """Close common popup patterns."""
        close_selectors = [
            "[aria-label='Close']",
            "[aria-label='Dismiss']",
            "button:has-text('Close')",
            "button:has-text('No thanks')",
            ".modal-close",
            ".popup-close",
        ]
        for selector in close_selectors:
            try:
                el = self.page.locator(selector).first
                # is_visible() checks the current state and does not wait
                if await el.is_visible():
                    await el.click()
                    return {"recovered": True, "action": f"dismissed popup: {selector}"}
            except Exception:
                continue
        return {"recovered": False, "error": "Could not dismiss popup"}

    async def _detect_cookie_banner(self) -> bool:
        # Broad heuristic: words such as "privacy" also appear in ordinary
        # footers, so expect false positives.
        keywords = ["cookie", "consent", "privacy", "accept all"]
        text = await self.page.text_content("body") or ""
        return any(kw in text.lower() for kw in keywords)

    async def _dismiss_cookie_banner(self) -> dict:
        reject_selectors = [
            "button:has-text('Reject all')",
            "button:has-text('Decline')",
            "button:has-text('Only necessary')",
            "[aria-label='Reject cookies']",
        ]
        for selector in reject_selectors:
            try:
                el = self.page.locator(selector).first
                if await el.is_visible():
                    await el.click()
                    return {"recovered": True, "action": "rejected cookies"}
            except Exception:
                continue

        # Fallback: accept if reject not available
        try:
            accept = self.page.locator("button:has-text('Accept')").first
            if await accept.is_visible():
                await accept.click()
                return {"recovered": True, "action": "accepted cookies (reject unavailable)"}
        except Exception:
            pass
        return {"recovered": False, "error": "Could not handle cookie banner"}

    async def _scroll_to_find(self, selector: str) -> dict:
        if not selector:
            return {"recovered": False, "error": "No selector to scroll to"}
        for _ in range(5):
            await self.page.mouse.wheel(0, 500)
            await self.page.wait_for_timeout(500)
            try:
                el = self.page.locator(selector).first
                if await el.is_visible():
                    return {"recovered": True, "action": "scrolled to element"}
            except Exception:
                continue
        return {"recovered": False, "error": "Element not found after scrolling"}
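The rule part of recovery is plain string matching, so it can be pulled out into a pure function and tested without launching a browser. The sketch below is one possible shape; `RULES` and `classify_error` are hypothetical names, and the substrings are taken from the checks in `handle_error`. Note that the executor's "Action timed out" message contains "timed out" rather than "timeout", so the table lists both spellings.

```python
from typing import Optional

# (strategy name, substrings that trigger it); the first matching rule wins.
RULES: list[tuple[str, tuple[str, ...]]] = [
    ("dismiss_popup", ("dialog", "popup")),
    ("wait_for_load", ("not found", "timeout", "timed out")),
    ("scroll_into_view", ("not visible",)),
]


def classify_error(message: str) -> Optional[str]:
    """Return the name of the first matching recovery strategy, or None."""
    lowered = message.lower()
    for strategy, needles in RULES:
        if any(needle in lowered for needle in needles):
            return strategy
    return None


for msg in ("Action timed out", "Element not found: #buy", "net::ERR_NAME_NOT_RESOLVED"):
    print(f"{msg!r} -> {classify_error(msg)}")
```

Errors that match no rule return `None`, which is the natural point to hand the decision to the LLM, as the "Rules + LLM judgment" row of the component table suggests.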
The Agent Main Loop
# src/browser/agent.py
from typing import Optional

from playwright.async_api import async_playwright

# Relative imports assume the four files above live in one package.
from .executor import BrowserExecutor
from .observer import PageObserver
from .planner import BrowserPlanner
from .recoverer import ErrorRecoverer


class BrowserAgent:
    """Complete browser automation agent."""

    def __init__(self, llm_client, max_steps: int = 20):
        self.llm = llm_client
        self.max_steps = max_steps

    async def run(self, task: str, start_url: Optional[str] = None) -> dict:
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            try:
                page = await browser.new_page()
                if start_url:
                    await page.goto(start_url, wait_until="domcontentloaded")

                observer = PageObserver(page)
                planner = BrowserPlanner(self.llm)
                executor = BrowserExecutor(page)
                recoverer = ErrorRecoverer(page)

                last_error: Optional[str] = None
                for _ in range(self.max_steps):
                    # Observe
                    snapshot = await observer.get_snapshot()

                    # Plan, passing along any unrecovered error from the last step
                    action = await planner.plan_next_action(task, snapshot, error=last_error)
                    last_error = None

                    # Check for completion
                    if action.get("action") in ("done", "fail"):
                        return action

                    # Execute
                    result = await executor.execute(action)

                    # Recover if needed
                    if result.get("status") == "error":
                        recovery = await recoverer.handle_error(result["error"], action)
                        if not recovery.get("recovered"):
                            # Let the planner know about the error on the next step
                            last_error = result["error"]
                            continue

                    # Wait for page state to settle
                    await page.wait_for_timeout(1000)

                return {"status": "failed", "reason": f"Max steps ({self.max_steps}) exceeded"}
            finally:
                # Close the browser on every exit path, including exceptions
                await browser.close()
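The step cap only stops a runaway loop after the whole budget has been spent. A cheaper complementary guard is to notice when the planner keeps issuing the same action. The sketch below uses a hypothetical helper `is_stuck` that compares the most recent actions; the window of three is an arbitrary choice for illustration.

```python
import json


def is_stuck(history: list[dict], window: int = 3) -> bool:
    """True when the last `window` actions are all identical."""
    if window < 2 or len(history) < window:
        return False
    # sort_keys makes the comparison independent of key order
    recent = [json.dumps(action, sort_keys=True) for action in history[-window:]]
    return len(set(recent)) == 1


history = [
    {"action": "navigate", "url": "https://example.com"},
    {"action": "click", "selector": "#next"},
    {"selector": "#next", "action": "click"},
    {"action": "click", "selector": "#next"},
]
print(is_stuck(history))      # True
print(is_stuck(history[:3]))  # False
```

When `is_stuck` fires, the agent could pass a message such as "the same action was repeated three times" to the planner as the error for the next step, or stop early.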
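Summary
- The accessibility tree beats raw HTML: it is compact, semantically rich, and token-efficient, which makes it the representation of choice for LLM page understanding.
- Multi-strategy element location: trying a CSS selector, then text content, then ARIA role and name in turn raises the chance of finding the element.
- Error recovery is a core capability: popups, cookie banners, and loading delays are the most common disruptions and need automatic handling.
- Action history helps planning: letting the LLM see earlier actions and their outcomes helps it avoid repeating a failed path.
- A step limit prevents infinite loops: capping the number of steps is a simple but effective safety measure.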
Maurice | maurice_wen@proton.me