启动带有 debug 端口及新 profile 的 Chrome 实例
/Applications/Google\ Chrome.app/Contents/MacOS/Google\ Chrome --remote-debugging-port=9222 --user-data-dir="$HOME/ChromeDebugProfile"
查找browser level webSocketDebuggerUrl
访问http://localhost:9222/json/version,从输出中找到webSocketDebuggerUrl
{
  "Browser": "Chrome/138.0.7204.169",
  "Protocol-Version": "1.3",
  "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36",
  "V8-Version": "13.8.258.29",
  "WebKit-Version": "537.36 (@f67687ed433da666b8d134ca10de067bcc94c6fe)",
  "webSocketDebuggerUrl": "ws://localhost:9222/devtools/browser/4f47efe0-043f-49b5-b733-2856232e21be"
}
{
  "Browser": "Chrome/138.0.7204.169",
  "Protocol-Version": "1.3",
  "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36",
  "V8-Version": "13.8.258.29",
  "WebKit-Version": "537.36 (@f67687ed433da666b8d134ca10de067bcc94c6fe)",
  "webSocketDebuggerUrl": "ws://localhost:9222/devtools/browser/4f47efe0-043f-49b5-b733-2856232e21be"
}
用playwright 抓取数据
```python
import logging
import dotenv
import lxml
import requests
from playwright.async_api import async_playwright
def get_browser_websocket_url():
    """Return the browser-level DevTools WebSocket URL, or ``None`` on failure.

    Sends a GET request to ``http://localhost:9222/json/version`` and reads
    the ``webSocketDebuggerUrl`` field from the JSON reply.

    Returns:
        The value stored under ``webSocketDebuggerUrl`` (``None`` when the
        key is missing), or ``None`` when the request fails, the status code
        is not 200, or the body is not a JSON object. Failures are reported
        with ``print`` rather than raised.
    """
    # Without a timeout requests waits indefinitely, so an unresponsive
    # listener on the debug port would hang the whole script.
    timeout_seconds = 5

    try:
        response = requests.get(
            "http://localhost:9222/json/version", timeout=timeout_seconds
        )
        if response.status_code != 200:
            print(f"Failed to get browser version. Status code: {response.status_code}")
            return None
        # A non-JSON body surfaces as a ValueError (sub)class.
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        print(f"Error getting browser version: {e}")
        return None

    if not isinstance(data, dict):
        # Valid JSON but not an object: there is no field to look up.
        print(f"Error getting browser version: unexpected payload {data!r}")
        return None
    return data.get("webSocketDebuggerUrl")
def _fragment_text(fragment_html):
    """Return the text of one HTML fragment with tags and outer whitespace removed."""
    import re

    from lxml.html import fromstring

    try:
        # The fragment is a slice between <br> tags, so it may be bare text
        # or contain unbalanced tags; wrap it so it parses as one element.
        return fromstring(f"<div>{fragment_html}</div>").text_content().strip()
    except Exception:
        # Best-effort fallback: strip anything that looks like a tag.
        # `Exception` (not a bare except) so task cancellation and
        # KeyboardInterrupt are never swallowed here.
        return re.sub(r"<[^>]+>", "", fragment_html).strip()


def _split_name_and_title(card_html):
    """Turn one executive card's HTML into a ``{'name': ..., 'title': ...}`` dict.

    The card is split on ``<br>`` tags; the first two non-empty text pieces
    are taken as name and title. With a single piece, a run of three or more
    whitespace characters is tried as the separator instead. Missing values
    are ``None``.
    """
    import re

    pieces = re.split(r"<br\s*/?>", card_html, flags=re.IGNORECASE)
    texts = [text for text in map(_fragment_text, pieces) if text]

    if len(texts) >= 2:
        return {"name": texts[0], "title": texts[1]}
    if len(texts) == 1:
        columns = re.split(r"\s{3,}", texts[0])
        if len(columns) >= 2:
            return {"name": columns[0].strip(), "title": columns[1].strip()}
        # Single line with no clear separation
        return {"name": texts[0], "title": None}
    return {"name": None, "title": None}


async def interact_with_new_page_in_local_browser(browser_url):
    """Scrape the Carta executive-team page through an already-running browser.

    Connects to the browser over the Chrome DevTools Protocol, opens a new
    page in its first existing context, loads
    ``https://carta.com/sg/en/executive-team/``, extracts every
    ``<div class="ca-card__info-header">`` element, parses each into a
    name/title pair, prints the results and writes them to
    ``executive_team_data.json`` in the current working directory.

    Args:
        browser_url: WebSocket debugger URL passed to ``connect_over_cdp``.

    Returns:
        None. Errors are printed, not raised.
    """
    import json

    from lxml import html as lxml_html

    url = "https://carta.com/sg/en/executive-team/"
    try:
        async with async_playwright() as p:
            # Connect to the browser via Chrome DevTools Protocol
            browser = await p.chromium.connect_over_cdp(browser_url)
            try:
                # Reuse the browser's existing (first) context.
                context = browser.contexts[0]
                page = await context.new_page()
                try:
                    print(f"Navigating to: {url}")
                    response = await page.goto(url)

                    # Wait for the DOM, then for the network to go quiet
                    # twice with short pauses in between.
                    print("Waiting for page to be ready…")
                    await page.wait_for_load_state("domcontentloaded")
                    print("Waiting for network to be idle …")
                    await page.wait_for_load_state("networkidle")
                    print("Waiting for 0.5 seconds timeout …")
                    await page.wait_for_timeout(500)  # 0.5 seconds timeout
                    print("Waiting for network to be idle 2 …")
                    await page.wait_for_load_state("networkidle")
                    print("Waiting for 0.5 seconds timeout 2 …")
                    await page.wait_for_timeout(500)  # 0.5 seconds timeout

                    # goto() may return None (see Playwright docs), so do not
                    # dereference the response unconditionally.
                    status = response.status if response is not None else None
                    print("Page is ready! Response status:", status)
                    print("Response text:")
                    print("=" * 50)

                    page_html = await page.content()
                    cards = lxml_html.fromstring(page_html).xpath(
                        '//div[@class="ca-card__info-header"]'
                    )

                    print("Executive Team Members:")
                    print("-" * 50)
                    parsed_executives = []
                    for i, card in enumerate(cards, 1):
                        # Serialize the element so <br> tags are preserved
                        # for the name/title split.
                        card_html = lxml_html.tostring(card, encoding="unicode")
                        print(card_html)

                        record = _split_name_and_title(card_html)
                        if record["name"] is None:
                            print(f"{i}. (No content found)")
                        else:
                            shown_title = record["title"]
                            if shown_title is None:
                                shown_title = "(No title found)"
                            print(f"{i}. Name: {record['name']}")
                            print(f" Title: {shown_title}")
                        parsed_executives.append(record)
                        print()

                    # Print the structured data
                    print("\nStructured Executive Data:")
                    print("-" * 50)
                    for i, exec_data in enumerate(parsed_executives, 1):
                        print(f"{i}. {exec_data}")

                    # Explicit UTF-8 and ensure_ascii=False keep non-ASCII
                    # names readable regardless of the platform default.
                    with open("executive_team_data.json", "w", encoding="utf-8") as f:
                        json.dump(parsed_executives, f, indent=2, ensure_ascii=False)
                    print("\nData saved to executive_team_data.json")
                    print("=" * 50)
                except Exception as e:
                    print(f"Error navigating to URL: {e}")
                    return
                finally:
                    # Close only the page this function opened. The context
                    # is the attached browser's own first context, not one
                    # created here, so it is deliberately left open.
                    await page.close()
            finally:
                # Runs even if page.close() raised. NOTE(review): for a
                # CDP-attached browser Playwright documents close() as a
                # disconnect rather than a shutdown; confirm.
                await browser.close()
    except Exception as e:
        print(f"Error connecting to browser or fetching data: {e}")
        print("\nTrying to get available browser instances…")
async def main():
    """Configure logging, locate the debug browser and run the scrape.

    Prints a banner, enables DEBUG logging, loads variables from a ``.env``
    file via ``dotenv``, then asks ``get_browser_websocket_url`` for the
    browser endpoint. When one is found it is handed to
    ``interact_with_new_page_in_local_browser``; otherwise a hint on how to
    start Chrome with remote debugging is printed.
    """
    print("🔧 AgentQL Remote Browser Debug Tool")
    print("=" * 50)
    logging.basicConfig(
        level=logging.DEBUG,
        format="%(asctime)s – %(name)s – %(levelname)s – %(message)s",
    )
    dotenv.load_dotenv()

    # Get the browser-level WebSocket URL
    browser_url = get_browser_websocket_url()
    if not browser_url:
        # The flag needs two ASCII hyphens; the en dash that was here made
        # the suggested command unusable when copied.
        print("No browser instances found. Please start Chrome with remote debugging enabled: chrome --remote-debugging-port=9222")
        return

    print(f"Browser WebSocket URL: {browser_url}")
    await interact_with_new_page_in_local_browser(browser_url)
# Run the async entry point only when executed as a script, not on import.
if __name__ == "__main__":
    import asyncio

    asyncio.run(main())