async with async_playwright() as p:
# Use regular launch without user data directory
browser = await p.chromium.launch(headless=False)
context = await browser.new_context()
pg = await context.new_page()
async with await agentql.wrap_async(pg) as agentql_page:
await agentql_page.goto("https://www.example.com/")
async with async_playwright() as p:
user_data_dir = os.environ.get("USER_DATA_DIR")
if user_data_dir:
# Convert to absolute path
profile_path = Path(user_data_dir).resolve()
print(f"profile_path: {profile_path}")
# Use launch_persistent_context for user data directory
context = await p.chromium.launch_persistent_context(
user_data_dir=str(profile_path),
executable_path="/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
headless=False,
args=[
"--disable-blink-features=AutomationControlled" # This is key
],
# Remove the default automation flags
ignore_default_args=["--enable-automation"]
)
pg = await context.new_page()
async with await agentql.wrap_async(pg) as agentql_page:
await agentql_page.goto("https://www.example.com/")
def get_browser_websocket_url():
"""Get the browser-level WebSocket URL.
start browser before call this
/Applications/Google\ Chrome.app/Contents/MacOS/Google\ Chrome \
--remote-debugging-port=9222 \
--user-data-dir=/Users/chengxinsun/temp/ChromeDebugProfile \
--lang=en-US \
--timezone=America/Los_Angeles
"""
try:
response = requests.get("http://localhost:9222/json/version")
if response.status_code == 200:
data = response.json()
return data.get('webSocketDebuggerUrl')
else:
print(f"Failed to get browser version. Status code: {response.status_code}")
return None
except Exception as e:
print(f"Error getting browser version: {e}")
return None
async def interact_with_new_page_in_local_browser(browser_url):
"""This function demonstrates how to open and interact with a new page in your local browser using AgentQL QueryParams."""
if not check_agentql_api_key():
return
try:
async with async_playwright() as p:
# Connect to the browser via Chrome DevTools Protocol
browser = await p.chromium.connect_over_cdp(browser_url)
# Get the existing context and create AgentQL wrapper
# Wrap the page with AgentQL using the async wrapper
pg = await browser.contexts[0].new_page()
async with await agentql.wrap_async(pg) as agentql_page:
try:
# Navigate to the URL
await agentql_page.goto("https://www.example.com/")