Serverless Operations, inc

>_cd /blog/id_eao9qtqvxp72

title

Amazon Bedrock AgentCore (5) Browser Tool :AI上でPlaywrightを実行し、ヘッドレスブラウザからスクレイピングさせてみた

今日はAmazon Bedrock AgentCore Browser Tool を試していきます。

AgentCore Browser Tool とは

安全なクラウド上のブラウジング環境をエージェントに提供するためのマネージドツールです。Webブラウザの自動操作・スクレイピング・フォーム入力など、人間が行うような操作をエージェントに代行させることで例えばAPIが提供されていない検索結果を取得して出力、などが可能となります。

CDP (Chrome DevTools Protocol) ベースの操作をサポートしており、Playwright / Puppeteer / Nova Act といった一般的なブラウザ自動化フレームワークとの接続が可能で、ページ遷移・入力・スクリーンショットなどの操作が可能です。browser_session()の呼び出しだけでブラウザセッションを開始でき、利用後は自動でセッションが終了される、サーバレス型で提供されます。以下の様な利用が想定されています。

  • スクリーンショット生成
    サイトの状態を自動キャプチャし、レポートや検証に活用
  • E2Eテスト / UIテスト
    Playwright と連携してフォーム入力や画面遷移を自動化
  • Webスクレイピング
    Amazonやニュースサイトなどの検索結果を解析し、JSON/Markdown形式で整理
  • AIエージェントとの連携
    LLMと組み合わせて「Web検索して結果を整理する」といったタスクを実現

さっそくやってみる

ではここのサンプルをまずはやっていきます。

サンプルにはNova Actを用いた例とPlaywrightを用いた例が紹介されています。記事執筆時点でNova Actはまだ使えませんので、Playwrightを試していきます。

Playwright とは

Microsoftが開発している エンドツーエンド(E2E)テスト自動化フレームワーク です。
ウェブブラウザをプログラムから操作できるライブラリで、Webアプリケーションの動作確認やテストに広く使われています。

以下の様なコードでブラウザをプログラムから操作可能です。

import { chromium } from 'playwright';

(async () => {
  const browser = await chromium.launch({ headless: true });
  const page = await browser.newPage();

  // サイトにアクセス
  await page.goto('https://example.com');

  // ボタンをクリック
  await page.click('text=More information');

  // スクリーンショットを保存
  await page.screenshot({ path: 'screenshot.png' });

  await browser.close();
})();

まずはPlaywrightをインストールします。

pip install playwright

次に browsertool.py を以下の内容で作成し実行します。

import time
import base64
from datetime import datetime
from playwright.sync_api import sync_playwright, Playwright, BrowserType
from bedrock_agentcore.tools.browser_client import browser_session

def capture_cdp_screenshot(context, page, filename_prefix="screenshot", image_format="jpeg"):
    """Capture a screenshot using the CDP API and save to file."""
    cdp_client = context.new_cdp_session(page)
    screenshot_data = cdp_client.send("Page.captureScreenshot", {
        "format": image_format,
        "quality": 80,
        "captureBeyondViewport": True
    })

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"{filename_prefix}_{timestamp}.{image_format}"
    image_bytes = base64.b64decode(screenshot_data['data'])

    with open(filename, "wb") as f:
        f.write(image_bytes)

    print(f"✅ Screenshot saved: {filename}")
    return filename


def main(playwright: Playwright):
    with browser_session("us-west-2") as client:
        print("📡 Browser session started... waiting for readiness")

        ws_url, headers = client.generate_ws_headers()
        chromium: BrowserType = playwright.chromium
        browser = chromium.connect_over_cdp(ws_url, headers=headers)

        try:
            context = browser.contexts[0] if browser.contexts else browser.new_context()
            page = context.pages[0] if context.pages else context.new_page()

            # Step 1: Navigate to Amazon
            print("🌐 Navigating to Amazon...")
            page.goto("https://www.amazon.com", wait_until="domcontentloaded")
            time.sleep(2)
            capture_cdp_screenshot(context, page, "amazon_home")

            # Step 2: Search for "coffee maker"
            print("🔎 Searching for 'coffee maker'...")
            page.fill("input#twotabsearchtextbox", "coffee maker")
            page.keyboard.press("Enter")
            page.wait_for_selector(".s-result-item", timeout=10000)
            time.sleep(2)
            capture_cdp_screenshot(context, page, "coffee_maker_results")

        finally:
            print("🔒 Closing browser session...")
            if not page.is_closed():
                page.close()
            browser.close()


if __name__ == "__main__":
    with sync_playwright() as p:
        main(p)
python3 browsertool.py
📡 Browser session started... waiting for readiness
🌐 Navigating to Amazon...
✅ Screenshot saved: amazon_home_20250823_212837.jpeg
🔎 Searching for 'coffee maker'...
✅ Screenshot saved: coffee_maker_results_20250823_212846.jpeg
🔒 Closing browser session...

Amazon.com に対してcoffee maker で検索を行った結果ブラウザに表示される画面がスクリーンショットとして保存されます。

検索結果のパースとJSON出力

公式のサンプルは以上ですが少し改修を加えてみます。

amazon_search_to_json.py として以下のファイルを作成し実行します。

import argparse
import base64
import json
import os
import re
import time
from datetime import datetime
from urllib.parse import urljoin, quote_plus

from playwright.sync_api import sync_playwright, Playwright, BrowserType
from bedrock_agentcore.tools.browser_client import browser_session


# ========= 共通ユーティリティ =========

def ts() -> str:
    return datetime.now().strftime("%Y%m%d_%H%M%S")


def capture_cdp_screenshot(context, page, filename_prefix="screenshot", image_format="jpeg"):
    """CDP経由でスクリーンショット保存(任意)"""
    cdp = context.new_cdp_session(page)
    data = cdp.send("Page.captureScreenshot", {
        "format": image_format,
        "quality": 80,
        "captureBeyondViewport": True
    })
    filename = f"{filename_prefix}_{ts()}.{image_format}"
    with open(filename, "wb") as f:
        f.write(base64.b64decode(data["data"]))
    print(f"✅ Screenshot saved: {filename}")
    return filename


def try_dismiss_common_modals(page):
    """クッキー/地域確認などの代表的モーダルを可能な範囲で閉じる(存在すれば)"""
    selectors = [
        "#sp-cc-accept",                 # EU cookie accept
        "text=Accept Cookies",
        "text=同意して続行",
        "input[name='accept']",
        "button[name='glowDoneButton']", # 配送先確定
        "input#continue",
    ]
    for sel in selectors:
        try:
            el = page.query_selector(sel)
            if el:
                el.click()
                time.sleep(0.3)
        except Exception:
            pass


# ========= 数値抽出(日本語表記に強い) =========

_num_float_re = re.compile(r"\d+(?:[.,]\d+)?")
_num_int_re = re.compile(r"\d+")

def _first_float(text: str):
    if not text:
        return None
    m = _num_float_re.search(text.replace("\u2009", "").replace(",", "."))
    if not m:
        return None
    try:
        return float(m.group(0))
    except Exception:
        return None

def _first_int(text: str):
    if not text:
        return None
    m = _num_int_re.search(text.replace(",", ""))
    return int(m.group(0)) if m else None


# ========= スクレイピング(高速・安定版) =========

def scrape_results(page, base_url: str, max_items: int):
    # CAPTCHA/本人確認の簡易検知
    content = page.content()
    if any(k in content for k in [
        "Robot Check",
        "ロボットによるアクセス",
        "本人確認を行っています",
        "画像内の文字を入力",
    ]):
        print("⚠️ Robot/CAPTCHA detected. Try later, lower frequency, or change query/TLD.")
        return []

    # まずは data-asin がどれだけ見えているかログ
    try:
        count = page.evaluate("document.querySelectorAll('.s-main-slot [data-asin]:not([data-asin=\"\"])').length")
        print(f"🧩 visible [data-asin] count: {count}")
    except Exception:
        print("🧩 visible [data-asin] count: (eval failed)")

    # ブラウザ内JSで一括抽出(co.jp でも拾えるようセレクタを広めに)
    raw_items = page.evaluate(
        """
        (maxItems) => {
          const toText = (el) => (el ? (el.textContent || "").trim() : null);
          const q = (el, sel) => el.querySelector(sel);
          const qa = (root, sel) => Array.from(root.querySelectorAll(sel));
          const items = qa(document, ".s-main-slot [data-asin]:not([data-asin=''])");
          const out = [];
          for (const it of items) {
            const asin = it.getAttribute("data-asin");

            // タイトル & リンク(h2が無いカードもあるので少し広めに)
            const titleA =
              q(it, "h2 a.a-link-normal") ||
              q(it, "a.a-link-normal.s-underline-text.s-underline-link-text.s-link-style.a-text-normal");
            const title = titleA ? (titleA.textContent || "").trim() : null;
            const href  = titleA ? titleA.getAttribute("href") : null;

            // 価格
            const symbolEl = q(it, ".a-price .a-price-symbol") || q(it, ".a-price .a-price-symbol[aria-hidden='true']");
            const wholeEl  = q(it, ".a-price .a-price-whole");
            const fracEl   = q(it, ".a-price .a-price-fraction");
            const symbol   = toText(symbolEl);
            const whole    = toText(wholeEl);
            const frac     = toText(fracEl);

            // 星評価
            const starEl = q(it, "i.a-icon-star-small span.a-icon-alt") || q(it, "span.a-icon-alt");
            const starsText = toText(starEl);

            // レビュー数(日本語も拾えるように優先)
            const reviewsEl =
              q(it, "span[aria-label$='件の評価']") ||
              q(it, "span[aria-label$='ratings']") ||
              q(it, "span[aria-label$='rating']") ||
              q(it, "span.a-size-base.s-underline-text");
            const reviewsText = toText(reviewsEl);

            // サムネイル
            const imgEl = q(it, "img.s-image");
            const image = imgEl ? imgEl.getAttribute("src") : null;

            if (!title && !href) continue;

            out.push({
              asin, title, href,
              priceSymbol: symbol,
              priceWhole: whole,
              priceFrac: frac,
              starsText,
              reviewsText,
              image
            });
            if (out.length >= maxItems) break;
          }
          return out;
        }
        """,
        max_items
    )

    # 0件のときはHTMLスナップショットを保存(原因の特定に役立つ)
    if not raw_items:
        os.makedirs("out", exist_ok=True)
        snap = f"out/debug_{datetime.now().strftime('%Y%m%d_%H%M%S')}.html"
        with open(snap, "w", encoding="utf-8") as f:
            f.write(content)
        print(f"🪪 no items extracted. saved HTML snapshot: {snap}")

    # Python側で整形(数値化・URL結合など)
    results = []
    for r in raw_items:
        # 価格
        price = None
        if r.get("priceWhole"):
            whole_num = _first_int(r["priceWhole"].replace("\u2009", ""))
            frac_num  = _first_int((r.get("priceFrac") or "00"))
            if whole_num is not None:
                price = float(f"{whole_num}.{(frac_num or 0):02d}")

        # 星評価(小数対応)
        stars = _first_float(r.get("starsText"))

        # レビュー数
        reviews = _first_int(r.get("reviewsText"))

        # 絶対URL
        url = urljoin(base_url, r["href"]) if r.get("href") else None

        results.append({
            "asin": r.get("asin"),
            "title": r.get("title"),
            "price": price,
            "currency": r.get("priceSymbol"),
            "stars": stars,
            "reviews": reviews,
            "url": url,
            "image": r.get("image"),
        })

    print(f"🧩 scrape_results: parsed {len(results)} items total (fast path)")
    return results


# ========= 保存 =========

def save_json_and_markdown(results, query, domain, outdir):
    os.makedirs(outdir, exist_ok=True)
    stamp = ts()
    json_path = os.path.join(outdir, f"results_{stamp}.json")
    md_path = os.path.join(outdir, f"results_{stamp}.md")

    # JSON
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump({
            "query": query,
            "domain": domain,
            "generated_at": stamp,
            "items": results
        }, f, ensure_ascii=False, indent=2)
    print(f"💾 JSON saved: {json_path}")

    # Markdown(Zenn想定)
    lines = []
    lines.append(f"# 検索結果: {query}({domain})")
    lines.append("")
    lines.append("| 画像 | タイトル | 価格 | 評価 | レビュー数 |")
    lines.append("|---|---|---:|---:|---:|")
    for r in results:
        img_md = f"![]({r['image']})" if r.get("image") else ""
        title_md = f"[{r['title']}]({r['url']})" if r.get("title") and r.get("url") else (r.get("title") or "")
        price_md = f"{r['currency'] or ''}{r['price']:.2f}" if r.get("price") is not None else ""
        stars_md = f"{r['stars']:.1f}" if r.get("stars") is not None else ""
        reviews_md = f"{r['reviews']:,}" if r.get("reviews") is not None else ""
        lines.append(f"| {img_md} | {title_md} | {price_md} | {stars_md} | {reviews_md} |")

    with open(md_path, "w", encoding="utf-8") as f:
        f.write("\n".join(lines))
    print(f"📝 Markdown saved: {md_path}")

    return json_path, md_path


# ========= メインフロー =========

def run(playwright: Playwright, args):
    base = f"https://www.amazon.{args.tld}"

    with browser_session(args.region) as client:
        print("📡 Browser session started... waiting for readiness")
        ws_url, headers = client.generate_ws_headers()
        chromium: BrowserType = playwright.chromium
        browser = chromium.connect_over_cdp(ws_url, headers=headers)

        page = None
        try:
            context = browser.contexts[0] if browser.contexts else browser.new_context()
            # デフォルトタイムアウト短め(張り付き防止)
            context.set_default_timeout(8000)

            page = context.pages[0] if context.pages else context.new_page()
            page.set_default_timeout(8000)
            page.set_default_navigation_timeout(20000)

            # 1) 検索結果ページへ直遷移(フォーム入力は使わない)
            search_url = f"{base}/s?k={quote_plus(args.query)}"
            print(f"🌐 Go to: {search_url}")
            page.goto(search_url, wait_until="domcontentloaded")
            try_dismiss_common_modals(page)

            # メインスロットが見えるまで待つ
            page.wait_for_selector(".s-main-slot", timeout=30000)

            # 遅延ロード対策:ゆっくり数回スクロールして要素をロード
            for i in range(5):
                page.evaluate("window.scrollBy(0, document.body.scrollHeight * 0.25)")
                time.sleep(0.6)

            # data-asin が出現するまで軽く待つ
            page.wait_for_selector(".s-main-slot [data-asin]:not([data-asin=''])", timeout=8000)

            if args.screenshot:
                capture_cdp_screenshot(context, page, f"amazon_results_{args.tld}")

            # 2) 解析
            print("🔎 start parsing …")
            results = scrape_results(page, base, args.max_items)
            print("📦 parsing done")

            # 3) 保存
            json_path, md_path = save_json_and_markdown(results, args.query, f"amazon.{args.tld}", args.outdir)
            print("🎉 Done.")
            print(f"   JSON: {json_path}")
            print(f"   MD  : {md_path}")

        finally:
            print("🔒 Closing browser session...")
            try:
                if page and (not page.is_closed()):
                    page.close()
            except Exception:
                pass
            browser.close()


if __name__ == "__main__":
    ap = argparse.ArgumentParser(description="Amazon検索→JSON/Markdown保存(AgentCore Browser + Playwright, fast snapshot parser, JP-friendly)")
    ap.add_argument("--region", default="us-west-2", help="AgentCore Browser のリージョン")
    ap.add_argument("--tld", default="com", help="AmazonドメインのTLD(例: com / co.jp)")
    ap.add_argument("--query", default="coffee maker", help="検索ワード")
    ap.add_argument("--max-items", type=int, default=20, help="抽出上限件数")
    ap.add_argument("--outdir", default="out", help="出力ディレクトリ")
    ap.add_argument("--screenshot", action="store_true", help="スクリーンショットも保存する")
    args = ap.parse_args()

    with sync_playwright() as p:
        run(p, args)
 python3 amazon_search_to_json.py --query "デロンギ コーヒーメーカー" --tld co.jp --max-items 15 --screenshot
📡 Browser session started... waiting for readiness
🌐 Go to: https://www.amazon.co.jp/s?k=%E3%83%87%E3%83%AD%E3%83%B3%E3%82%AE+%E3%82%B3%E3%83%BC%E3%83%92%E3%83%BC%E3%83%A1%E3%83%BC%E3%82%AB%E3%83%BC
✅ Screenshot saved: amazon_results_co.jp_20250823_211004.jpeg
🔎 start parsing …
🧩 visible [data-asin] count: 48
🧩 scrape_results: parsed 14 items total (fast path)
📦 parsing done
💾 JSON saved: out/results_20250823_211005.json
📝 Markdown saved: out/results_20250823_211005.md
🎉 Done.
   JSON: out/results_20250823_211005.json
   MD  : out/results_20250823_211005.md
🔒 Closing browser session...

先ほどのスクリーンショットに加えて、検索結果をパースしたjsonとmdファイルが保存されます。

{
  "query": "デロンギ コーヒーメーカー",
  "domain": "amazon.co.jp",
  "generated_at": "20250823_211005",
  "items": [
    {
      "asin": "B088HJCVDX",
      "title": "¥69,800¥69,800",
      "price": 69800.0,
      "currency": "¥",
      "stars": 4.5,
      "reviews": 6780,
      "url": "https://www.amazon.co.jp/-/en/ECAM22112B-Magnifica-Automatic-Japonaise-Maintenance/dp/B088HJCVDX/ref=sr_1_1?dib=eyJ2IjoiMSJ9.45wIyDC9RVw_r0lA8nnXSwmghmYzNAIc23wDBDKhk224LXXvc0tBKD3duiavIavgYcWKOrNGGvATCXO-1mYFQjJDB6_sS_7HMFGdc4Vb3Zf2r0B_RfK5-_l1di5RHUxD1jjTK4l7-bYbpKg__Uj6KU7S7L-O_ZeqNbOc9BsxhHkfsZdgHzk6JN8-wchFiZJg9WWXYLUakk4jQRhO2zFonx4g076HCTQD32aqkXNl2SNbfL4d-1_VRLjvovUZuLAEO68N5g3x7H7WSSP_lITKpYioRCpDmx_GDZV4q3O-9VM.I___joRlADZOiudE2lV9uCTB6TqhwMqEsKKQ_ME-5Z4&dib_tag=se&keywords=%E3%83%87%E3%83%AD%E3%83%B3%E3%82%AE+%E3%82%B3%E3%83%BC%E3%83%92%E3%83%BC%E3%83%A1%E3%83%BC%E3%82%AB%E3%83%BC&qid=1755950992&sr=8-1",
      "image": "https://m.media-amazon.com/images/I/518ApUrwkqL._AC_UL320_.jpg"
    },

まとめ

このようにAmazon Bedrock AgentCore Browser Toolを使用することで簡単にLLMからヘッドレスブラウザの操作ができるようになります。LLMからE2Eテスト / UIテストを実行してスクリーンキャプチャを撮ってテストの工数を削減したり、ブラウザで行う定型的な作業をRPMのような感覚で実行させて業務効率化が可能になります。今後は、これらの機能を Bedrock のエージェントとワークフローと組み合わせることで、LLM に調査を依頼すると自動でブラウザを立ち上げ → 情報収集 → 整理 → レポート作成まで行ってくれるといった、より実用的な自動化の仕組みの構築も見えてきそうです。

Written by
編集部

亀田 治伸

Kameda Harunobu

  • Facebook->
  • X->
  • GitHub->

Share

Facebook->X->
Back
to list
<-