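Browser Automation Agents in Practice

Playwright/Puppeteer agent architecture, DOM understanding and the accessibility tree, action planning, visual grounding, and error recovery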

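Introduction

Browser automation agents are among the most challenging directions in the AI agent field. Unlike calling an API, operating a browser means dealing with visual layout, a dynamic DOM, asynchronous loading, intercepting popups, and other complex interactions. At its core, this is a problem of executing multi-step operations in an uncertain environment.

This article uses Playwright as the base framework to build a browser automation agent that can understand a web page, plan operations, execute actions, and recover from errors.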

Architecture Design

The Agent Loop

┌──────────────────────────────────────────────────────────┐
│                    Browser Agent Loop                    │
│                                                          │
│  ┌───────────┐     ┌───────────┐     ┌───────────┐       │
│  │  Observe  │────▶│   Think   │────▶│    Act    │       │
│  │           │     │           │     │           │       │
│  │ Snapshot  │     │ LLM picks │     │ Execute,  │       │
│  │ the page  │     │ next step │     │ then wait │       │
│  └─────┬─────┘     └───────────┘     └─────┬─────┘       │
│        │                                   │             │
│        └───────────────────────────────────┘             │
│            (loop until the task is complete)             │
│                                                          │
│  ┌───────────┐                                           │
│  │  Recover  │ ← triggered on an error or exception      │
│  │ on errors │                                           │
│  └───────────┘                                           │
└──────────────────────────────────────────────────────────┘

Core Components

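Component   Responsibility                   Technology
Observer    Capture page state               Accessibility tree + screenshot
Planner     Decide the next action           LLM (GPT-4o / Claude Sonnet)
Executor    Perform browser operations       Playwright API
Recoverer   Detect and recover from errors   Rules + LLM judgment
Memory      Action history and state         In-memory + files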
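
The Memory row is the only component without a listing later in the article. The sketch below shows one way "in-memory + files" could look: recent steps stay in a list, and every step is also appended to a JSON Lines file. The class name `StepMemory`, its methods, and the log format are assumptions made for illustration, not part of the original design.

```python
import json
import tempfile
from dataclasses import dataclass, field
from pathlib import Path


@dataclass
class StepMemory:
    """Hypothetical memory component: a list in RAM plus an append-only log file."""

    log_path: Path
    steps: list = field(default_factory=list)

    def record(self, action: dict, result: dict) -> None:
        entry = {"step": len(self.steps) + 1, "action": action, "result": result}
        self.steps.append(entry)
        # One JSON object per line, so the log can be tailed or replayed later
        with self.log_path.open("a", encoding="utf-8") as f:
            f.write(json.dumps(entry, ensure_ascii=False) + "\n")

    def recent(self, n: int = 5) -> list:
        """Return the last n steps, e.g. for inclusion in a planner prompt."""
        return self.steps[-n:]


# Usage with a throwaway log file
log_file = Path(tempfile.mkdtemp()) / "steps.jsonl"
memory = StepMemory(log_path=log_file)
memory.record({"action": "navigate", "url": "https://example.com"},
              {"status": "success"})
memory.record({"action": "click", "selector": "#login"},
              {"status": "error", "error": "Element not found: #login"})
print(memory.recent(1)[0]["step"])  # 2
```

Storing the result next to each action lets a planner see which attempts failed, not only which were tried.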

DOM Understanding

Accessibility Tree vs. Raw HTML

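Problems with raw HTML:
  - Too large (often thousands of lines)
  - Full of irrelevant information (styles, scripts, hidden elements)
  - Consumes a large number of tokens

Advantages of the accessibility tree:
  - Compact (only interactive elements and semantic structure)
  - Structured (a clear tree hierarchy)
  - Semantically rich (roles, names, and states)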
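
To make the contrast concrete, the sketch below puts a small hand-written HTML fragment next to a hand-written tree for the same form and renders the tree as an indented outline. Both `RAW_HTML` and `TREE` are invented sample data, and `outline` is a simplified stand-in for a real formatter, so the size difference is only indicative.

```python
RAW_HTML = """<div class="form-wrap" style="margin:0 auto">
  <script>trackPageView()</script>
  <form id="login"><div class="row"><span class="hidden">x</span>
    <input type="email" placeholder="Email" class="inp inp-lg">
    <button class="btn btn-primary" type="submit">Sign in</button>
  </div></form>
</div>"""

# Hand-written node dicts using role / name / children keys
TREE = {
    "role": "form",
    "name": "",
    "children": [
        {"role": "textbox", "name": "Email"},
        {"role": "button", "name": "Sign in"},
    ],
}


def outline(node: dict, depth: int = 0) -> list[str]:
    """Render one node per line, indented by depth."""
    line = "  " * depth + f"[{node['role']}]"
    if node.get("name"):
        line += f' "{node["name"]}"'
    lines = [line]
    for child in node.get("children", []):
        lines.extend(outline(child, depth + 1))
    return lines


text = "\n".join(outline(TREE))
print(text)
print(len(text) < len(RAW_HTML))  # True: the outline is the shorter of the two
```

The outline drops styles, scripts, and hidden nodes, and keeps exactly what an agent can act on: a text box named "Email" and a button named "Sign in".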

Extracting a Page Snapshot

# src/browser/observer.py
from playwright.async_api import Page

class PageObserver:
    """Extract structured page state for LLM understanding."""

    def __init__(self, page: Page):
        self.page = page

    async def get_snapshot(self) -> dict:
        """Get a comprehensive page snapshot."""
        return {
            "url": self.page.url,
            "title": await self.page.title(),
            "accessibility_tree": await self._get_accessibility_tree(),
            "interactive_elements": await self._get_interactive_elements(),
            "page_text": await self._get_visible_text(),
        }

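    async def _get_accessibility_tree(self) -> str:
        """Get the accessibility tree as a compact text representation."""
        # Note: page.accessibility is deprecated and may be absent in newer
        # Playwright releases; locator.aria_snapshot() is a possible substitute,
        # although it returns YAML text rather than nested dicts.
        snapshot = await self.page.accessibility.snapshot()
        if not snapshot:
            return "Empty page"
        return self._format_tree(snapshot, depth=0)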

    def _format_tree(self, node: dict, depth: int) -> str:
        """Format accessibility tree node recursively."""
        indent = "  " * depth
        role = node.get("role", "")
        name = node.get("name", "")
        value = node.get("value", "")

        # Skip decorative/structural elements
        skip_roles = {"none", "presentation", "generic"}
        if role in skip_roles and not name:
            parts = []
            for child in node.get("children", []):
                parts.append(self._format_tree(child, depth))
            return "\n".join(parts)

        line = f"{indent}[{role}]"
        if name:
            line += f' "{name}"'
        if value:
            line += f" value={value}"

        # Add state information
        states = []
        if node.get("disabled"):
            states.append("disabled")
        if node.get("checked") is not None:
            states.append(f"checked={node['checked']}")
        if node.get("expanded") is not None:
            states.append(f"expanded={node['expanded']}")
        if states:
            line += f" ({', '.join(states)})"

        parts = [line]
        for child in node.get("children", []):
            child_text = self._format_tree(child, depth + 1)
            if child_text:
                parts.append(child_text)

        return "\n".join(parts)

    async def _get_interactive_elements(self) -> list[dict]:
        """Get all interactive elements with their properties."""
        elements = await self.page.evaluate("""
            () => {
                const interactive = document.querySelectorAll(
                    'a, button, input, select, textarea, [role="button"], [role="link"], [tabindex]'
                );
                return Array.from(interactive)
                    .filter(el => {
                        const style = window.getComputedStyle(el);
                        return style.display !== 'none'
                            && style.visibility !== 'hidden'
                            && el.offsetParent !== null;
                    })
                    .map((el, i) => ({
                        index: i,
                        tag: el.tagName.toLowerCase(),
                        role: el.getAttribute('role') || el.tagName.toLowerCase(),
                        text: (el.textContent || '').trim().slice(0, 100),
                        placeholder: el.getAttribute('placeholder') || '',
                        type: el.getAttribute('type') || '',
                        href: el.getAttribute('href') || '',
                        ariaLabel: el.getAttribute('aria-label') || '',
                        id: el.id || '',
                        name: el.getAttribute('name') || '',
                        value: el.value || '',
                        disabled: el.disabled || false,
                    }));
            }
        """)
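        return elements

    async def _get_visible_text(self, max_chars: int = 2000) -> str:
        """Get the rendered body text, truncated to keep the prompt small."""
        # get_snapshot() calls this method; max_chars is an illustrative budget
        text = await self.page.inner_text("body")
        return text[:max_chars]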
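
The element records carry an `index`, but a selector is what eventually identifies an element to Playwright. The helper below sketches one way to derive a selector from a record with the fields shown above (`tag`, `id`, `name`, `ariaLabel`, `placeholder`, `text`). The function names and the priority order are assumptions for illustration. The resulting selector may match several elements, so callers would still need something like `.first`.

```python
def css_string(value: str) -> str:
    """Quote a value for use inside a CSS attribute selector."""
    return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'


def build_selector(el: dict) -> str:
    """Pick the most specific selector the element record allows (hypothetical helper)."""
    tag = el.get("tag") or "*"
    if el.get("id"):
        # The attribute form avoids escaping problems with unusual id characters
        return f"[id={css_string(el['id'])}]"
    if el.get("name"):
        return f"{tag}[name={css_string(el['name'])}]"
    if el.get("ariaLabel"):
        return f"{tag}[aria-label={css_string(el['ariaLabel'])}]"
    if el.get("placeholder"):
        return f"{tag}[placeholder={css_string(el['placeholder'])}]"
    if el.get("text"):
        # :has-text() is a Playwright selector extension, not standard CSS
        return f"{tag}:has-text({css_string(el['text'][:50])})"
    return tag


print(build_selector({"tag": "input", "id": "email"}))
print(build_selector({"tag": "button", "id": "", "text": "Sign in"}))
```

With the two sample records, this prints `[id="email"]` and `button:has-text("Sign in")`.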

Action Planning and Execution

LLM Action Planner

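# src/browser/planner.py
import json


def parse_json(text: str) -> dict:
    """Parse the model reply as JSON, tolerating a surrounding Markdown code fence."""
    cleaned = text.strip()
    if cleaned.startswith("```"):
        cleaned = cleaned.strip("`").removeprefix("json").strip()
    return json.loads(cleaned)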

PLANNER_SYSTEM_PROMPT = """You are a browser automation agent. Given the current page state and task,
decide the next action to take.

Available actions:
- click(selector): Click an element
- fill(selector, text): Type text into an input field
- select(selector, value): Select an option from a dropdown
- navigate(url): Go to a URL
- scroll(direction): Scroll up or down
- wait(seconds): Wait for page to load
- screenshot(): Take a screenshot for visual verification
- done(result): Task is complete, return result
- fail(reason): Task cannot be completed

Rules:
1. Use CSS selectors or text content to identify elements
2. Always wait for page loads after navigation or clicks
3. If an element is not visible, try scrolling first
4. If stuck, try an alternative approach
5. Never enter sensitive data (passwords, credit cards)

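Respond with a single JSON object whose "action" field names the action and whose
other fields are its arguments, for example: {"action": "click", "selector": "#submit"}"""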

class BrowserPlanner:
    def __init__(self, llm_client):
        self.llm = llm_client
        self.action_history: list[dict] = []

    async def plan_next_action(
        self,
        task: str,
        page_snapshot: dict,
        error: str = None,
    ) -> dict:
        """Plan the next browser action."""

        # Build context
        context = f"""## Task
{task}

## Current Page
URL: {page_snapshot['url']}
Title: {page_snapshot['title']}

## Page Structure
{page_snapshot['accessibility_tree'][:3000]}

## Interactive Elements
{self._format_elements(page_snapshot['interactive_elements'][:30])}

## Action History
{self._format_history()}
"""

        if error:
            context += f"\n## Previous Error\n{error}\n"

        messages = [
            {"role": "system", "content": PLANNER_SYSTEM_PROMPT},
            {"role": "user", "content": context},
        ]

        response = await self.llm.generate(messages, model="gpt-4o")
        action = parse_json(response)
        self.action_history.append(action)
        return action

    def _format_elements(self, elements: list[dict]) -> str:
        lines = []
        for el in elements:
            desc = f"[{el['index']}] <{el['tag']}"
            if el['type']:
                desc += f" type={el['type']}"
            if el['text']:
                desc += f'> "{el["text"][:50]}"'
            elif el['placeholder']:
                desc += f'> placeholder="{el["placeholder"]}"'
            elif el['ariaLabel']:
                desc += f'> aria-label="{el["ariaLabel"]}"'
            else:
                desc += ">"
            if el['disabled']:
                desc += " [disabled]"
            lines.append(desc)
        return "\n".join(lines)

    def _format_history(self) -> str:
        if not self.action_history:
            return "(no actions taken yet)"
        return "\n".join([
            f"{i+1}. {json.dumps(a)}"
            for i, a in enumerate(self.action_history[-5:])
        ])
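
A model reply can be valid JSON and still be unusable, for example a `fill` without `text`. The sketch below checks a parsed action before it reaches the executor. The field names follow the ones the executor in this article reads (`action`, `selector`, `text`, `value`, `url`, `direction`). The function itself and its message wording are assumptions for illustration.

```python
REQUIRED_FIELDS = {
    "click": {"selector"},
    "fill": {"selector", "text"},
    "select": {"selector", "value"},
    "navigate": {"url"},
    "scroll": set(),
    "wait": set(),
    "screenshot": set(),
    "done": set(),
    "fail": set(),
}


def validate_action(action: object) -> list[str]:
    """Return a list of problems; an empty list means the action looks usable."""
    if not isinstance(action, dict):
        return ["action must be a JSON object"]
    name = action.get("action")
    if not isinstance(name, str) or name not in REQUIRED_FIELDS:
        return [f"unknown action: {name!r}"]
    missing = sorted(REQUIRED_FIELDS[name] - set(action))
    problems = [f"missing field: {field_name}" for field_name in missing]
    if name == "scroll" and action.get("direction", "down") not in ("up", "down"):
        problems.append("direction must be 'up' or 'down'")
    return problems


print(validate_action({"action": "click", "selector": "#submit"}))  # []
print(validate_action({"action": "fill", "selector": "#q"}))  # ['missing field: text']
```

Feeding the returned problems back to the model as the previous error gives it a chance to correct the reply instead of failing inside the browser.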

Action Executor

# src/browser/executor.py
from playwright.async_api import Page, TimeoutError as PlaywrightTimeout

class BrowserExecutor:
    """Execute browser actions with error handling."""

    def __init__(self, page: Page):
        self.page = page

    async def execute(self, action: dict) -> dict:
        """Execute a planned action and return result."""
        action_type = action.get("action")

        try:
            if action_type == "click":
                return await self._click(action["selector"])
            elif action_type == "fill":
                return await self._fill(action["selector"], action["text"])
            elif action_type == "select":
                return await self._select(action["selector"], action["value"])
            elif action_type == "navigate":
                return await self._navigate(action["url"])
            elif action_type == "scroll":
                return await self._scroll(action.get("direction", "down"))
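            elif action_type == "wait":
                return await self._wait(action.get("seconds", 2))
            elif action_type == "screenshot":
                # The planner prompt offers screenshot(), so handle it here;
                # the PNG bytes are captured but only their size is reported
                png = await self.page.screenshot()
                return {"status": "success", "action": f"took screenshot ({len(png)} bytes)"}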
            elif action_type == "done":
                return {"status": "completed", "result": action.get("result")}
            elif action_type == "fail":
                return {"status": "failed", "reason": action.get("reason")}
            else:
                return {"status": "error", "error": f"Unknown action: {action_type}"}
        except PlaywrightTimeout:
            return {"status": "error", "error": "Action timed out"}
        except Exception as e:
            return {"status": "error", "error": str(e)}

    async def _click(self, selector: str) -> dict:
        # Try multiple strategies to find the element
        element = None

        # Strategy 1: CSS selector
        try:
            element = self.page.locator(selector).first
            if await element.is_visible():
                await element.click(timeout=5000)
                return {"status": "success", "action": f"clicked '{selector}'"}
        except Exception:
            pass

        # Strategy 2: Text content
        try:
            element = self.page.get_by_text(selector, exact=False).first
            if await element.is_visible():
                await element.click(timeout=5000)
                return {"status": "success", "action": f"clicked text '{selector}'"}
        except Exception:
            pass

        # Strategy 3: Role + name
        try:
            element = self.page.get_by_role("button", name=selector).first
            await element.click(timeout=5000)
            return {"status": "success", "action": f"clicked button '{selector}'"}
        except Exception:
            pass

        return {"status": "error", "error": f"Element not found: {selector}"}

    async def _fill(self, selector: str, text: str) -> dict:
        locator = self.page.locator(selector).first
        await locator.click()
        await locator.fill(text)
        return {"status": "success", "action": f"filled '{selector}' with text"}

    async def _select(self, selector: str, value: str) -> dict:
        await self.page.select_option(selector, value)
        return {"status": "success", "action": f"selected '{value}' in '{selector}'"}

    async def _navigate(self, url: str) -> dict:
        await self.page.goto(url, wait_until="domcontentloaded")
        return {"status": "success", "action": f"navigated to {url}"}

    async def _scroll(self, direction: str) -> dict:
        delta = -500 if direction == "up" else 500
        await self.page.mouse.wheel(0, delta)
        await self.page.wait_for_timeout(500)
        return {"status": "success", "action": f"scrolled {direction}"}

    async def _wait(self, seconds: float) -> dict:
        await self.page.wait_for_timeout(int(seconds * 1000))
        return {"status": "success", "action": f"waited {seconds}s"}
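
The click handler above tries several lookups in turn and discards each failure. The same pattern can be written once as a generic helper that also keeps the reasons, so the planner gets a more useful error. This is a sketch under assumptions: `try_in_order` and the stand-in strategies are hypothetical names, and real strategies would wrap Playwright locator calls.

```python
import asyncio
from typing import Awaitable, Callable

# A labelled, zero-argument coroutine function
Strategy = tuple[str, Callable[[], Awaitable[None]]]


async def try_in_order(strategies: list[Strategy]) -> dict:
    """Run each strategy until one finishes without raising."""
    errors = []
    for label, attempt in strategies:
        try:
            await attempt()
            return {"status": "success", "strategy": label}
        except Exception as exc:
            # Collect the reason instead of discarding it
            errors.append(f"{label}: {exc}")
    return {"status": "error", "error": "; ".join(errors)}


# Stand-in strategies for the demo
async def by_css() -> None:
    raise LookupError("no match for selector")


async def by_text() -> None:
    return None


result = asyncio.run(try_in_order([("css", by_css), ("text", by_text)]))
print(result)  # {'status': 'success', 'strategy': 'text'}
```

When every strategy fails, the combined message names each attempt and its reason, for example `css: no match for selector`.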

Error Recovery

Automatic Recovery Strategies

# src/browser/recoverer.py
from playwright.async_api import Page

class ErrorRecoverer:
    """Detect and recover from common browser automation errors."""

    def __init__(self, page: Page):
        self.page = page

    async def handle_error(self, error: str, action: dict) -> dict:
        """Attempt to recover from an error."""

        # Strategy 1: Dismiss popups/dialogs
        if "dialog" in error.lower() or "popup" in error.lower():
            return await self._dismiss_popups()

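        # Strategy 2: Handle cookie consent; fall through if nothing was dismissed,
        # since the keyword check also matches ordinary footer text
        if await self._detect_cookie_banner():
            result = await self._dismiss_cookie_banner()
            if result.get("recovered"):
                return result

        # Strategy 3: Wait for loading ("timed out" is the wording the executor returns)
        lowered = error.lower()
        if "not found" in lowered or "timeout" in lowered or "timed out" in lowered:
            try:
                await self.page.wait_for_load_state("networkidle", timeout=10000)
            except Exception:
                # networkidle may never arrive on pages that poll in the background
                return {"recovered": False, "error": error}
            return {"recovered": True, "action": "waited for page load"}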

        # Strategy 4: Scroll element into view
        if "not visible" in error.lower():
            return await self._scroll_to_find(action.get("selector", ""))

        return {"recovered": False, "error": error}

    async def _dismiss_popups(self) -> dict:
        """Close common popup patterns."""
        close_selectors = [
            "[aria-label='Close']",
            "[aria-label='Dismiss']",
            "button:has-text('Close')",
            "button:has-text('No thanks')",
            ".modal-close",
            ".popup-close",
        ]

        for selector in close_selectors:
            try:
                el = self.page.locator(selector).first
                if await el.is_visible(timeout=1000):
                    await el.click()
                    return {"recovered": True, "action": f"dismissed popup: {selector}"}
            except Exception:
                continue

        return {"recovered": False, "error": "Could not dismiss popup"}

    async def _detect_cookie_banner(self) -> bool:
        keywords = ["cookie", "consent", "privacy", "accept all"]
        text = await self.page.text_content("body") or ""
        return any(kw in text.lower() for kw in keywords)

    async def _dismiss_cookie_banner(self) -> dict:
        reject_selectors = [
            "button:has-text('Reject all')",
            "button:has-text('Decline')",
            "button:has-text('Only necessary')",
            "[aria-label='Reject cookies']",
        ]

        for selector in reject_selectors:
            try:
                el = self.page.locator(selector).first
                if await el.is_visible(timeout=2000):
                    await el.click()
                    return {"recovered": True, "action": "rejected cookies"}
            except Exception:
                continue

        # Fallback: accept if reject not available
        try:
            accept = self.page.locator("button:has-text('Accept')").first
            if await accept.is_visible(timeout=2000):
                await accept.click()
                return {"recovered": True, "action": "accepted cookies (reject unavailable)"}
        except Exception:
            pass

        return {"recovered": False, "error": "Could not handle cookie banner"}

    async def _scroll_to_find(self, selector: str) -> dict:
        for _ in range(5):
            await self.page.mouse.wheel(0, 500)
            await self.page.wait_for_timeout(500)
            try:
                el = self.page.locator(selector).first
                if await el.is_visible(timeout=1000):
                    return {"recovered": True, "action": "scrolled to element"}
            except Exception:
                continue
        return {"recovered": False, "error": "Element not found after scrolling"}
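
The recovery dispatch above matches substrings of the error message. Moving those substrings into a table makes the rules easier to read and to test without a browser. The sketch below is one possible shape. The category names are invented, and the phrase "intercepts pointer events" is an assumption about how Playwright words an overlay-blocked click; the other phrases come from the listings in this article.

```python
# Ordered rules: the first category with a matching phrase wins
ERROR_RULES = [
    ("popup", ("dialog", "popup", "intercepts pointer events")),
    ("loading", ("not found", "timed out", "timeout")),
    ("offscreen", ("not visible",)),
]


def classify_error(message: str) -> str:
    """Map an error message to a recovery category (hypothetical helper)."""
    text = message.lower()
    for category, phrases in ERROR_RULES:
        if any(phrase in text for phrase in phrases):
            return category
    return "unknown"


print(classify_error("Action timed out"))        # loading
print(classify_error("Element not found: #x"))   # loading
print(classify_error("Unexpected dialog open"))  # popup
print(classify_error("boom"))                    # unknown
```

A recoverer could then map each category to one handler, and an "unknown" result is a natural point to hand the decision to the LLM, matching the "rules + LLM judgment" split in the component table.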

The Agent Main Loop

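# src/browser/agent.py
from playwright.async_api import async_playwright

# Sibling modules from the sections above (package layout assumed from the file paths)
from .executor import BrowserExecutor
from .observer import PageObserver
from .planner import BrowserPlanner
from .recoverer import ErrorRecoverer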

class BrowserAgent:
    """Complete browser automation agent."""

    def __init__(self, llm_client, max_steps: int = 20):
        self.llm = llm_client
        self.max_steps = max_steps

    async def run(self, task: str, start_url: str = None) -> dict:
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            page = await browser.new_page()

            if start_url:
                await page.goto(start_url, wait_until="domcontentloaded")

            observer = PageObserver(page)
            planner = BrowserPlanner(self.llm)
            executor = BrowserExecutor(page)
            recoverer = ErrorRecoverer(page)

            last_error = None
            for step in range(self.max_steps):
                # Observe
                snapshot = await observer.get_snapshot()

                # Plan, passing along any unrecovered error from the previous step
                action = await planner.plan_next_action(task, snapshot, error=last_error)
                last_error = None

                # Check for completion
                if action.get("action") in ("done", "fail"):
                    await browser.close()
                    return action

                # Execute
                result = await executor.execute(action)

                # Recover if needed
                if result.get("status") == "error":
                    recovery = await recoverer.handle_error(
                        result["error"], action
                    )
                    if not recovery.get("recovered"):
                        # Let planner know about the error on the next iteration
                        last_error = result["error"]
                        continue

                # Wait for page state to settle
                await page.wait_for_timeout(1000)

            await browser.close()
            return {"status": "failed", "reason": f"Max steps ({self.max_steps}) exceeded"}
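
The step cap stops a runaway agent only after `max_steps` iterations. A complementary check, sketched here as an assumption and not as part of the original design, is to notice when the planner keeps emitting the same action and to stop early or change approach. The function name `is_stuck` and the default of three repeats are illustrative choices.

```python
import json


def is_stuck(history: list[dict], repeats: int = 3) -> bool:
    """True when the last `repeats` actions are identical."""
    if len(history) < repeats:
        return False
    # Serialize with sorted keys so key order does not affect the comparison
    tail = {json.dumps(action, sort_keys=True) for action in history[-repeats:]}
    return len(tail) == 1


history = [
    {"action": "navigate", "url": "https://example.com"},
    {"action": "click", "selector": "#next"},
    {"action": "click", "selector": "#next"},
]
print(is_stuck(history))  # False: only two identical clicks so far
history.append({"selector": "#next", "action": "click"})
print(is_stuck(history))  # True: three identical actions in a row
```

A loop could call this on the planner's action history before each execution and return a failure, or add a hint to the next prompt, when it fires.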

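Summary

  1. The accessibility tree beats raw HTML: it is compact, semantically rich, and token-efficient, which makes it a strong default representation for LLM page understanding.
  2. Multi-strategy element location: trying CSS selectors, text content, and ARIA roles in turn raises the chance of finding the element.
  3. Error recovery is a core capability: popups, cookie banners, and slow loading are the most common disruptions and need automatic handling.
  4. Action history aids planning: showing the LLM its previous actions and their outcomes helps it avoid retrying paths that already failed.
  5. A step limit prevents infinite loops: capping the number of steps is a simple but effective safety measure.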

Maurice | maurice_wen@proton.me