今日はAmazon Bedrock AgentCore Browser Tool を試していきます。
AgentCore Browser Tool とは
安全なクラウド上のブラウジング環境をエージェントに提供するためのマネージドツールです。Webブラウザの自動操作・スクレイピング・フォーム入力など、人間が行うような操作をエージェントに代行させることで例えばAPIが提供されていない検索結果を取得して出力、などが可能となります。
CDP (Chrome DevTools Protocol) ベースの操作をサポートしており、Playwright / Puppeteer / Nova Act といった一般的なブラウザ自動化フレームワークとの接続が可能で、ページ遷移・入力・スクリーンショットなどの操作が可能です。browser_session()
の呼び出しだけでブラウザセッションを開始でき、利用後は自動でセッションが終了される、サーバレス型で提供されます。以下の様な利用が想定されています。
- スクリーンショット生成
サイトの状態を自動キャプチャし、レポートや検証に活用 - E2Eテスト / UIテスト
Playwright と連携してフォーム入力や画面遷移を自動化 - Webスクレイピング
Amazonやニュースサイトなどの検索結果を解析し、JSON/Markdown形式で整理 - AIエージェントとの連携
LLMと組み合わせて「Web検索して結果を整理する」といったタスクを実現
さっそくやってみる
ではここのサンプルをまずはやっていきます。
サンプルにはNova Act
を用いた例とPlaywright
を用いた例が紹介されています。記事執筆時点でNova Actはまだ使えませんので、Playwright
を試していきます。
Playwright とは
Microsoftが開発している エンドツーエンド(E2E)テスト自動化フレームワーク です。
ウェブブラウザをプログラムから操作できるライブラリで、Webアプリケーションの動作確認やテストに広く使われています。
以下の様なコードでブラウザをプログラムから操作可能です。
import { chromium } from 'playwright';
(async () => {
const browser = await chromium.launch({ headless: true });
const page = await browser.newPage();
// サイトにアクセス
await page.goto('https://example.com');
// ボタンをクリック
await page.click('text=More information');
// スクリーンショットを保存
await page.screenshot({ path: 'screenshot.png' });
await browser.close();
})();
まずはPlaywright
をインストールします。
pip install playwright
次に browsertool.py
を以下の内容で作成し実行します。
import time
import base64
from datetime import datetime
from playwright.sync_api import sync_playwright, Playwright, BrowserType
from bedrock_agentcore.tools.browser_client import browser_session
def capture_cdp_screenshot(context, page, filename_prefix="screenshot", image_format="jpeg"):
"""Capture a screenshot using the CDP API and save to file."""
cdp_client = context.new_cdp_session(page)
screenshot_data = cdp_client.send("Page.captureScreenshot", {
"format": image_format,
"quality": 80,
"captureBeyondViewport": True
})
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{filename_prefix}_{timestamp}.{image_format}"
image_bytes = base64.b64decode(screenshot_data['data'])
with open(filename, "wb") as f:
f.write(image_bytes)
print(f"✅ Screenshot saved: {filename}")
return filename
def main(playwright: Playwright):
with browser_session("us-west-2") as client:
print("📡 Browser session started... waiting for readiness")
ws_url, headers = client.generate_ws_headers()
chromium: BrowserType = playwright.chromium
browser = chromium.connect_over_cdp(ws_url, headers=headers)
try:
context = browser.contexts[0] if browser.contexts else browser.new_context()
page = context.pages[0] if context.pages else context.new_page()
# Step 1: Navigate to Amazon
print("🌐 Navigating to Amazon...")
page.goto("https://www.amazon.com", wait_until="domcontentloaded")
time.sleep(2)
capture_cdp_screenshot(context, page, "amazon_home")
# Step 2: Search for "coffee maker"
print("🔎 Searching for 'coffee maker'...")
page.fill("input#twotabsearchtextbox", "coffee maker")
page.keyboard.press("Enter")
page.wait_for_selector(".s-result-item", timeout=10000)
time.sleep(2)
capture_cdp_screenshot(context, page, "coffee_maker_results")
finally:
print("🔒 Closing browser session...")
if not page.is_closed():
page.close()
browser.close()
if __name__ == "__main__":
with sync_playwright() as p:
main(p)
python3 browsertool.py
📡 Browser session started... waiting for readiness
🌐 Navigating to Amazon...
✅ Screenshot saved: amazon_home_20250823_212837.jpeg
🔎 Searching for 'coffee maker'...
✅ Screenshot saved: coffee_maker_results_20250823_212846.jpeg
🔒 Closing browser session...
Amazon.com に対してcoffee maker
で検索を行った結果ブラウザに表示される画面がスクリーンショットとして保存されます。

検索結果のパースとJSON出力
公式のサンプルは以上ですが少し改修を加えてみます。
amazon_search_to_json.py
として以下のファイルを作成し実行します。
import argparse
import base64
import json
import os
import re
import time
from datetime import datetime
from urllib.parse import urljoin, quote_plus
from playwright.sync_api import sync_playwright, Playwright, BrowserType
from bedrock_agentcore.tools.browser_client import browser_session
# ========= 共通ユーティリティ =========
def ts() -> str:
return datetime.now().strftime("%Y%m%d_%H%M%S")
def capture_cdp_screenshot(context, page, filename_prefix="screenshot", image_format="jpeg"):
"""CDP経由でスクリーンショット保存(任意)"""
cdp = context.new_cdp_session(page)
data = cdp.send("Page.captureScreenshot", {
"format": image_format,
"quality": 80,
"captureBeyondViewport": True
})
filename = f"{filename_prefix}_{ts()}.{image_format}"
with open(filename, "wb") as f:
f.write(base64.b64decode(data["data"]))
print(f"✅ Screenshot saved: {filename}")
return filename
def try_dismiss_common_modals(page):
"""クッキー/地域確認などの代表的モーダルを可能な範囲で閉じる(存在すれば)"""
selectors = [
"#sp-cc-accept", # EU cookie accept
"text=Accept Cookies",
"text=同意して続行",
"input[name='accept']",
"button[name='glowDoneButton']", # 配送先確定
"input#continue",
]
for sel in selectors:
try:
el = page.query_selector(sel)
if el:
el.click()
time.sleep(0.3)
except Exception:
pass
# ========= 数値抽出(日本語表記に強い) =========
_num_float_re = re.compile(r"\d+(?:[.,]\d+)?")
_num_int_re = re.compile(r"\d+")
def _first_float(text: str):
if not text:
return None
m = _num_float_re.search(text.replace("\u2009", "").replace(",", "."))
if not m:
return None
try:
return float(m.group(0))
except Exception:
return None
def _first_int(text: str):
if not text:
return None
m = _num_int_re.search(text.replace(",", ""))
return int(m.group(0)) if m else None
# ========= スクレイピング(高速・安定版) =========
def scrape_results(page, base_url: str, max_items: int):
# CAPTCHA/本人確認の簡易検知
content = page.content()
if any(k in content for k in [
"Robot Check",
"ロボットによるアクセス",
"本人確認を行っています",
"画像内の文字を入力",
]):
print("⚠️ Robot/CAPTCHA detected. Try later, lower frequency, or change query/TLD.")
return []
# まずは data-asin がどれだけ見えているかログ
try:
count = page.evaluate("document.querySelectorAll('.s-main-slot [data-asin]:not([data-asin=\"\"])').length")
print(f"🧩 visible [data-asin] count: {count}")
except Exception:
print("🧩 visible [data-asin] count: (eval failed)")
# ブラウザ内JSで一括抽出(co.jp でも拾えるようセレクタを広めに)
raw_items = page.evaluate(
"""
(maxItems) => {
const toText = (el) => (el ? (el.textContent || "").trim() : null);
const q = (el, sel) => el.querySelector(sel);
const qa = (root, sel) => Array.from(root.querySelectorAll(sel));
const items = qa(document, ".s-main-slot [data-asin]:not([data-asin=''])");
const out = [];
for (const it of items) {
const asin = it.getAttribute("data-asin");
// タイトル & リンク(h2が無いカードもあるので少し広めに)
const titleA =
q(it, "h2 a.a-link-normal") ||
q(it, "a.a-link-normal.s-underline-text.s-underline-link-text.s-link-style.a-text-normal");
const title = titleA ? (titleA.textContent || "").trim() : null;
const href = titleA ? titleA.getAttribute("href") : null;
// 価格
const symbolEl = q(it, ".a-price .a-price-symbol") || q(it, ".a-price .a-price-symbol[aria-hidden='true']");
const wholeEl = q(it, ".a-price .a-price-whole");
const fracEl = q(it, ".a-price .a-price-fraction");
const symbol = toText(symbolEl);
const whole = toText(wholeEl);
const frac = toText(fracEl);
// 星評価
const starEl = q(it, "i.a-icon-star-small span.a-icon-alt") || q(it, "span.a-icon-alt");
const starsText = toText(starEl);
// レビュー数(日本語も拾えるように優先)
const reviewsEl =
q(it, "span[aria-label$='件の評価']") ||
q(it, "span[aria-label$='ratings']") ||
q(it, "span[aria-label$='rating']") ||
q(it, "span.a-size-base.s-underline-text");
const reviewsText = toText(reviewsEl);
// サムネイル
const imgEl = q(it, "img.s-image");
const image = imgEl ? imgEl.getAttribute("src") : null;
if (!title && !href) continue;
out.push({
asin, title, href,
priceSymbol: symbol,
priceWhole: whole,
priceFrac: frac,
starsText,
reviewsText,
image
});
if (out.length >= maxItems) break;
}
return out;
}
""",
max_items
)
# 0件のときはHTMLスナップショットを保存(原因の特定に役立つ)
if not raw_items:
os.makedirs("out", exist_ok=True)
snap = f"out/debug_{datetime.now().strftime('%Y%m%d_%H%M%S')}.html"
with open(snap, "w", encoding="utf-8") as f:
f.write(content)
print(f"🪪 no items extracted. saved HTML snapshot: {snap}")
# Python側で整形(数値化・URL結合など)
results = []
for r in raw_items:
# 価格
price = None
if r.get("priceWhole"):
whole_num = _first_int(r["priceWhole"].replace("\u2009", ""))
frac_num = _first_int((r.get("priceFrac") or "00"))
if whole_num is not None:
price = float(f"{whole_num}.{(frac_num or 0):02d}")
# 星評価(小数対応)
stars = _first_float(r.get("starsText"))
# レビュー数
reviews = _first_int(r.get("reviewsText"))
# 絶対URL
url = urljoin(base_url, r["href"]) if r.get("href") else None
results.append({
"asin": r.get("asin"),
"title": r.get("title"),
"price": price,
"currency": r.get("priceSymbol"),
"stars": stars,
"reviews": reviews,
"url": url,
"image": r.get("image"),
})
print(f"🧩 scrape_results: parsed {len(results)} items total (fast path)")
return results
# ========= 保存 =========
def save_json_and_markdown(results, query, domain, outdir):
os.makedirs(outdir, exist_ok=True)
stamp = ts()
json_path = os.path.join(outdir, f"results_{stamp}.json")
md_path = os.path.join(outdir, f"results_{stamp}.md")
# JSON
with open(json_path, "w", encoding="utf-8") as f:
json.dump({
"query": query,
"domain": domain,
"generated_at": stamp,
"items": results
}, f, ensure_ascii=False, indent=2)
print(f"💾 JSON saved: {json_path}")
# Markdown(Zenn想定)
lines = []
lines.append(f"# 検索結果: {query}({domain})")
lines.append("")
lines.append("| 画像 | タイトル | 価格 | 評価 | レビュー数 |")
lines.append("|---|---|---:|---:|---:|")
for r in results:
img_md = f"" if r.get("image") else ""
title_md = f"[{r['title']}]({r['url']})" if r.get("title") and r.get("url") else (r.get("title") or "")
price_md = f"{r['currency'] or ''}{r['price']:.2f}" if r.get("price") is not None else ""
stars_md = f"{r['stars']:.1f}" if r.get("stars") is not None else ""
reviews_md = f"{r['reviews']:,}" if r.get("reviews") is not None else ""
lines.append(f"| {img_md} | {title_md} | {price_md} | {stars_md} | {reviews_md} |")
with open(md_path, "w", encoding="utf-8") as f:
f.write("\n".join(lines))
print(f"📝 Markdown saved: {md_path}")
return json_path, md_path
# ========= メインフロー =========
def run(playwright: Playwright, args):
base = f"https://www.amazon.{args.tld}"
with browser_session(args.region) as client:
print("📡 Browser session started... waiting for readiness")
ws_url, headers = client.generate_ws_headers()
chromium: BrowserType = playwright.chromium
browser = chromium.connect_over_cdp(ws_url, headers=headers)
page = None
try:
context = browser.contexts[0] if browser.contexts else browser.new_context()
# デフォルトタイムアウト短め(張り付き防止)
context.set_default_timeout(8000)
page = context.pages[0] if context.pages else context.new_page()
page.set_default_timeout(8000)
page.set_default_navigation_timeout(20000)
# 1) 検索結果ページへ直遷移(フォーム入力は使わない)
search_url = f"{base}/s?k={quote_plus(args.query)}"
print(f"🌐 Go to: {search_url}")
page.goto(search_url, wait_until="domcontentloaded")
try_dismiss_common_modals(page)
# メインスロットが見えるまで待つ
page.wait_for_selector(".s-main-slot", timeout=30000)
# 遅延ロード対策:ゆっくり数回スクロールして要素をロード
for i in range(5):
page.evaluate("window.scrollBy(0, document.body.scrollHeight * 0.25)")
time.sleep(0.6)
# data-asin が出現するまで軽く待つ
page.wait_for_selector(".s-main-slot [data-asin]:not([data-asin=''])", timeout=8000)
if args.screenshot:
capture_cdp_screenshot(context, page, f"amazon_results_{args.tld}")
# 2) 解析
print("🔎 start parsing …")
results = scrape_results(page, base, args.max_items)
print("📦 parsing done")
# 3) 保存
json_path, md_path = save_json_and_markdown(results, args.query, f"amazon.{args.tld}", args.outdir)
print("🎉 Done.")
print(f" JSON: {json_path}")
print(f" MD : {md_path}")
finally:
print("🔒 Closing browser session...")
try:
if page and (not page.is_closed()):
page.close()
except Exception:
pass
browser.close()
if __name__ == "__main__":
ap = argparse.ArgumentParser(description="Amazon検索→JSON/Markdown保存(AgentCore Browser + Playwright, fast snapshot parser, JP-friendly)")
ap.add_argument("--region", default="us-west-2", help="AgentCore Browser のリージョン")
ap.add_argument("--tld", default="com", help="AmazonドメインのTLD(例: com / co.jp)")
ap.add_argument("--query", default="coffee maker", help="検索ワード")
ap.add_argument("--max-items", type=int, default=20, help="抽出上限件数")
ap.add_argument("--outdir", default="out", help="出力ディレクトリ")
ap.add_argument("--screenshot", action="store_true", help="スクリーンショットも保存する")
args = ap.parse_args()
with sync_playwright() as p:
run(p, args)
python3 amazon_search_to_json.py --query "デロンギ コーヒーメーカー" --tld co.jp --max-items 15 --screenshot
📡 Browser session started... waiting for readiness
🌐 Go to: https://www.amazon.co.jp/s?k=%E3%83%87%E3%83%AD%E3%83%B3%E3%82%AE+%E3%82%B3%E3%83%BC%E3%83%92%E3%83%BC%E3%83%A1%E3%83%BC%E3%82%AB%E3%83%BC
✅ Screenshot saved: amazon_results_co.jp_20250823_211004.jpeg
🔎 start parsing …
🧩 visible [data-asin] count: 48
🧩 scrape_results: parsed 14 items total (fast path)
📦 parsing done
💾 JSON saved: out/results_20250823_211005.json
📝 Markdown saved: out/results_20250823_211005.md
🎉 Done.
JSON: out/results_20250823_211005.json
MD : out/results_20250823_211005.md
🔒 Closing browser session...
先ほどのスクリーンショットに加えて、検索結果をパースしたjsonとmdファイルが保存されます。
{
"query": "デロンギ コーヒーメーカー",
"domain": "amazon.co.jp",
"generated_at": "20250823_211005",
"items": [
{
"asin": "B088HJCVDX",
"title": "¥69,800¥69,800",
"price": 69800.0,
"currency": "¥",
"stars": 4.5,
"reviews": 6780,
"url": "https://www.amazon.co.jp/-/en/ECAM22112B-Magnifica-Automatic-Japonaise-Maintenance/dp/B088HJCVDX/ref=sr_1_1?dib=eyJ2IjoiMSJ9.45wIyDC9RVw_r0lA8nnXSwmghmYzNAIc23wDBDKhk224LXXvc0tBKD3duiavIavgYcWKOrNGGvATCXO-1mYFQjJDB6_sS_7HMFGdc4Vb3Zf2r0B_RfK5-_l1di5RHUxD1jjTK4l7-bYbpKg__Uj6KU7S7L-O_ZeqNbOc9BsxhHkfsZdgHzk6JN8-wchFiZJg9WWXYLUakk4jQRhO2zFonx4g076HCTQD32aqkXNl2SNbfL4d-1_VRLjvovUZuLAEO68N5g3x7H7WSSP_lITKpYioRCpDmx_GDZV4q3O-9VM.I___joRlADZOiudE2lV9uCTB6TqhwMqEsKKQ_ME-5Z4&dib_tag=se&keywords=%E3%83%87%E3%83%AD%E3%83%B3%E3%82%AE+%E3%82%B3%E3%83%BC%E3%83%92%E3%83%BC%E3%83%A1%E3%83%BC%E3%82%AB%E3%83%BC&qid=1755950992&sr=8-1",
"image": "https://m.media-amazon.com/images/I/518ApUrwkqL._AC_UL320_.jpg"
},
まとめ
このようにAmazon Bedrock AgentCore Browser Toolを使用することで簡単にLLMからヘッドレスブラウザの操作ができるようになります。LLMからE2Eテスト / UIテストを実行してスクリーンキャプチャを撮ってテストの工数を削減したり、ブラウザで行う定型的な作業をRPMのような感覚で実行させて業務効率化が可能になります。今後は、これらの機能を Bedrock のエージェントとワークフローと組み合わせることで、「LLM に調査を依頼すると自動でブラウザを立ち上げ → 情報収集 → 整理 → レポート作成まで行ってくれる」といった、より実用的な自動化の仕組みの構築も見えてきそうです。