fix: prevent browser process leaks and improve robustness

- fix(browser): wrap context lifecycle in try/finally to ensure browser
  closes on exceptions and KeyboardInterrupt (login, sniffer, generator)
- fix(browser): replace time.sleep with Playwright native waits
  (wait_for, wait_for_timeout) for more reliable element interaction
- fix(browser): use parameterized page.evaluate instead of f-string
  JS injection in generator polling
- fix(api): add retry logic in wait_for_completion to survive transient
  network errors
- fix(config): add prepare_profile_dir to copy profile to temp dir,
  preventing Chromium SingletonLock conflicts when tools run concurrently
- fix(sniffer): stream API logs to tempfile instead of unbounded memory
  list to avoid OOM
- fix(api): specify encoding='utf-8' when loading cookies from file

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Claude
2026-05-24 23:08:46 +08:00
parent b2f549af01
commit 734d53dafb
6 changed files with 255 additions and 180 deletions

View File

@@ -190,8 +190,15 @@ class Hunyuan3DAPI:
完成的创作信息
"""
start_time = time.time()
last_error = None
while time.time() - start_time < timeout:
status = self.get_generation_status(creation_id)
try:
status = self.get_generation_status(creation_id)
except Exception as e:
last_error = e
print(f"查询状态失败: {e},将在 {poll_interval} 秒后重试...")
time.sleep(poll_interval)
continue
# Handle both wrapped {code, data} and flat response formats
if "code" in status:
@@ -209,6 +216,8 @@ class Hunyuan3DAPI:
time.sleep(poll_interval)
if last_error:
raise TimeoutError(f"Generation timeout after {timeout} seconds. Last error: {last_error}")
raise TimeoutError(f"Generation timeout after {timeout} seconds")
@@ -217,7 +226,7 @@ from .config import get_cookie_path
def load_cookies_from_file(filepath: Optional[str] = None) -> str:
"""从文件加载Cookie默认读取用户配置目录的 cookies.txt"""
path = filepath or str(get_cookie_path())
with open(path, 'r') as f:
with open(path, 'r', encoding='utf-8') as f:
return f.read().strip()

View File

@@ -411,7 +411,7 @@ from .config import get_cookie_path
def load_cookies_from_file(filepath: Optional[str] = None) -> str:
"""从文件加载Cookie默认读取用户配置目录的 cookies.txt"""
path = filepath or str(get_cookie_path())
with open(path, 'r') as f:
with open(path, 'r', encoding='utf-8') as f:
return f.read().strip()

View File

@@ -4,8 +4,12 @@
使用 cloakbrowser 在浏览器内完成所有操作(包括签名生成)
"""
import contextlib
import json
import os
import shutil
import sys
import tempfile
import time
from datetime import datetime
from cloakbrowser import launch_persistent_context
@@ -15,6 +19,36 @@ from ..config import get_profile_dir
PROFILE_DIR = str(get_profile_dir())
@contextlib.contextmanager
def _temp_persistent_context(headless=True):
"""
将标准 profile 复制到临时目录后启动持久化上下文,
避免与 login 进程发生 Chromium SingletonLock 冲突。
"""
standard = str(get_profile_dir())
if not os.path.exists(standard):
raise FileNotFoundError(f"未找到登录状态目录: {standard}")
temp_root = tempfile.mkdtemp(prefix="hunyuan3dweb-profile-")
temp_profile = os.path.join(temp_root, "profile")
shutil.copytree(standard, temp_profile, dirs_exist_ok=True)
context = None
try:
context = launch_persistent_context(temp_profile, headless=headless)
yield context
finally:
if context is not None:
try:
context.close()
except Exception:
pass
try:
shutil.rmtree(temp_root)
except Exception:
pass
def generate_3d(image_path, wait_for_complete=False, timeout=300):
"""
上传图片并触发生成3D模型
@@ -30,59 +64,55 @@ def generate_3d(image_path, wait_for_complete=False, timeout=300):
if not os.path.exists(image_path):
raise FileNotFoundError(f"图片不存在: {image_path}")
if not os.path.exists(PROFILE_DIR):
raise FileNotFoundError(f"未找到登录状态目录: {PROFILE_DIR}")
with _temp_persistent_context(headless=True) as context:
page = context.new_page()
context = launch_persistent_context(PROFILE_DIR, headless=True)
page = context.new_page()
# 用于存储结果
result = {"creationsId": None, "status": None, "modelUrl": None}
# 用于存储结果
result = {"creationsId": None, "status": None, "modelUrl": None}
# 监听响应
def handle_response(response):
url = response.url
if "creations/generations" in url and response.status == 200:
try:
body = response.json()
if "creationsId" in body:
result["creationsId"] = body["creationsId"]
print(f"✅ 生成已触发creationsId: {body['creationsId']}")
except:
pass
elif "creations/detail" in url and response.status == 200:
try:
body = response.json()
result["status"] = body.get("status")
if "result" in body and isinstance(body["result"], list) and len(body["result"]) > 0:
model_data = body["result"][0]
if "modelUrl" in model_data:
result["modelUrl"] = model_data["modelUrl"]
except:
pass
# 监听响应
def handle_response(response):
url = response.url
if "creations/generations" in url and response.status == 200:
try:
body = response.json()
if "creationsId" in body:
result["creationsId"] = body["creationsId"]
print(f"✅ 生成已触发creationsId: {body['creationsId']}")
except:
pass
elif "creations/detail" in url and response.status == 200:
try:
body = response.json()
result["status"] = body.get("status")
if "result" in body and isinstance(body["result"], list) and len(body["result"]) > 0:
model_data = body["result"][0]
if "modelUrl" in model_data:
result["modelUrl"] = model_data["modelUrl"]
except:
pass
page.on("response", handle_response)
page.on("response", handle_response)
try:
print("[1/6] 打开首页...")
page.goto("https://3d.hunyuan.tencent.com/")
page.goto("https://3d.hunyuan.tencent.com/", timeout=60000)
page.wait_for_timeout(3000)
print("[2/6] 点击 AI创作...")
page.locator("button").filter(has_text="AI创作").first.click()
page.wait_for_timeout(2000)
page.locator("text=图生3D").wait_for(state="visible", timeout=10000)
print("[3/6] 点击 图生3D...")
page.locator("text=图生3D").first.click()
page.wait_for_timeout(2000)
page.locator("text=单张图片").wait_for(state="visible", timeout=10000)
print("[4/6] 点击 单张图片...")
page.locator("text=单张图片").first.click()
page.wait_for_timeout(2000)
page.locator('input[type=file]').first.wait_for(state="visible", timeout=10000)
print("[5/6] 上传图片...")
page.locator('input[type=file]').first.set_input_files(image_path)
page.wait_for_timeout(3000)
page.locator("text=立即生成").wait_for(state="visible", timeout=10000)
print("[6/6] 点击 立即生成...")
page.locator("text=立即生成").first.click()
@@ -100,14 +130,14 @@ def generate_3d(image_path, wait_for_complete=False, timeout=300):
last_status = None
while time.time() - start < timeout:
# 轮询状态
status_result = page.evaluate(f'''
async () => {{
const resp = await fetch('https://3d.hunyuan.tencent.com/api/3d/creations/detail?creationsId={result["creationsId"]}', {{
headers: {{'X-Source': 'web', 'Referer': 'https://3d.hunyuan.tencent.com/'}}
}});
status_result = page.evaluate('''
async (creationsId) => {
const resp = await fetch(`https://3d.hunyuan.tencent.com/api/3d/creations/detail?creationsId=${creationsId}`, {
headers: {'X-Source': 'web', 'Referer': 'https://3d.hunyuan.tencent.com/'}
});
return await resp.json();
}}
''')
}
''', result["creationsId"])
status = status_result.get("status", "unknown")
progress = status_result.get("progress", 0)
@@ -134,17 +164,13 @@ def generate_3d(image_path, wait_for_complete=False, timeout=300):
return result
finally:
context.close()
def get_quota():
"""获取当前配额信息"""
context = launch_persistent_context(PROFILE_DIR, headless=True)
page = context.new_page()
with _temp_persistent_context(headless=True) as context:
page = context.new_page()
try:
page.goto("https://3d.hunyuan.tencent.com/")
page.goto("https://3d.hunyuan.tencent.com/", timeout=60000)
page.wait_for_timeout(3000)
result = page.evaluate('''
@@ -158,17 +184,14 @@ def get_quota():
}
''')
return result
finally:
context.close()
def get_creations_list():
"""获取作品列表"""
context = launch_persistent_context(PROFILE_DIR, headless=True)
page = context.new_page()
with _temp_persistent_context(headless=True) as context:
page = context.new_page()
try:
page.goto("https://3d.hunyuan.tencent.com/")
page.goto("https://3d.hunyuan.tencent.com/", timeout=60000)
page.wait_for_timeout(3000)
result = page.evaluate('''
@@ -182,13 +205,9 @@ def get_creations_list():
}
''')
return result
finally:
context.close()
if __name__ == "__main__":
import sys
if len(sys.argv) < 2:
print("用法:")
print(f" python {sys.argv[0]} quota # 查询配额")

View File

@@ -47,105 +47,104 @@ def main():
headless = not args.no_headless
# 使用持久化上下文cookie 和登录状态会保存到 PROFILE_DIR
context = launch_persistent_context(PROFILE_DIR, headless=headless)
page = context.new_page()
context = None
try:
context = launch_persistent_context(PROFILE_DIR, headless=headless)
page = context.new_page()
print("正在打开腾讯混元3D...")
page.goto("https://3d.hunyuan.tencent.com/")
time.sleep(2)
print("正在打开腾讯混元3D...")
page.goto("https://3d.hunyuan.tencent.com/", timeout=60000)
page.wait_for_timeout(2000)
# 检查是否已经是登录状态(没有登录按钮说明已登录)
login_btn = page.locator("button").filter(has_text="登录").first
if login_btn.count() == 0:
print("检测到已有登录状态,无需重新登录。")
print(f"当前页面: {page.url}")
_extract_and_save_cookies(context)
if headless:
context.close()
return
else:
# 未登录,走登录流程
login_btn.click()
time.sleep(1.5)
# 切换到邮箱登录
email_tab = page.locator("text=邮箱").first
if email_tab.count() > 0:
email_tab.click()
time.sleep(1.5)
else:
print("未找到邮箱登录选项")
context.close()
return
# 定位元素
email_input = page.locator('input[placeholder="请输入邮箱地址"]')
code_input = page.locator('input[type="number"][placeholder="请输入邮箱验证码"]')
send_code_btn = page.locator("a.hyc-email-login__send-code")
login_submit_btn = page.locator("button.hyc-email-login__btn")
checkbox = page.locator(".t-checkbox__former")
# 第一次交互:输入邮箱并发送验证码
print("\n============================================")
email = input("请输入邮箱地址,按回车发送验证码: ").strip()
if not email:
print("邮箱不能为空,退出")
context.close()
return
email_input.fill(email)
time.sleep(0.5)
# 勾选协议(如果未勾选)
if checkbox.count() > 0:
is_checked = checkbox.evaluate("el => el.checked")
if not is_checked:
checkbox.evaluate("el => el.click()")
time.sleep(0.3)
if send_code_btn.count() > 0:
send_code_btn.click()
print("已点击发送验证码,请查收邮件...")
else:
print("未找到发送验证码按钮")
context.close()
return
# 第二次交互:输入验证码并登录
print("\n============================================")
code = input("请输入邮箱验证码,按回车登录: ").strip()
if not code:
print("验证码不能为空,退出")
context.close()
return
code_input.fill(code)
time.sleep(0.5)
# 点击登录
if login_submit_btn.count() > 0:
login_submit_btn.click()
print("已点击登录按钮,等待跳转...")
else:
print("未找到登录提交按钮")
context.close()
return
# 等待登录成功跳转
try:
page.wait_for_url(lambda url: "login" not in url, timeout=30000)
print(f"\n登录成功!当前页面: {page.url}")
print(f"登录状态已保存到: {os.path.abspath(PROFILE_DIR)}")
# 检查是否已经是登录状态(没有登录按钮说明已登录)
login_btn = page.locator("button").filter(has_text="登录").first
if login_btn.count() == 0:
print("检测到已有登录状态,无需重新登录。")
print(f"当前页面: {page.url}")
_extract_and_save_cookies(context)
except Exception:
print("\n登录可能仍在处理中,或出现错误。请检查浏览器状态。")
# 保持浏览器打开(仅非无头模式)
if not headless:
print("\n按 Enter 键关闭浏览器并退出...")
input()
context.close()
if headless:
return
else:
# 未登录,走登录流程
login_btn.click()
page.locator("text=邮箱").wait_for(state="visible", timeout=10000)
# 切换到邮箱登录
email_tab = page.locator("text=邮箱").first
if email_tab.count() > 0:
email_tab.click()
email_input.wait_for(state="visible", timeout=10000)
else:
print("未找到邮箱登录选项")
return
# 定位元素
email_input = page.locator('input[placeholder="请输入邮箱地址"]')
code_input = page.locator('input[type="number"][placeholder="请输入邮箱验证码"]')
send_code_btn = page.locator("a.hyc-email-login__send-code")
login_submit_btn = page.locator("button.hyc-email-login__btn")
checkbox = page.locator(".t-checkbox__former")
# 第一次交互:输入邮箱并发送验证码
print("\n============================================")
email = input("请输入邮箱地址,按回车发送验证码: ").strip()
if not email:
print("邮箱不能为空,退出")
return
email_input.fill(email)
# 勾选协议(如果未勾选)
if checkbox.count() > 0:
is_checked = checkbox.evaluate("el => el.checked")
if not is_checked:
checkbox.evaluate("el => el.click()")
page.wait_for_timeout(300)
if send_code_btn.count() > 0:
send_code_btn.click()
print("已点击发送验证码,请查收邮件...")
else:
print("未找到发送验证码按钮")
return
# 第二次交互:输入验证码并登录
print("\n============================================")
code = input("请输入邮箱验证码,按回车登录: ").strip()
if not code:
print("验证码不能为空,退出")
return
code_input.fill(code)
# 点击登录
if login_submit_btn.count() > 0:
login_submit_btn.click()
print("已点击登录按钮,等待跳转...")
else:
print("未找到登录提交按钮")
return
# 等待登录成功跳转
try:
page.wait_for_url(lambda url: "login" not in url, timeout=30000)
print(f"\n登录成功!当前页面: {page.url}")
print(f"登录状态已保存到: {os.path.abspath(PROFILE_DIR)}")
_extract_and_save_cookies(context)
except Exception:
print("\n登录可能仍在处理中,或出现错误。请检查浏览器状态。")
# 保持浏览器打开(仅非无头模式)
if not headless:
print("\n按 Enter 键关闭浏览器并退出...")
input()
finally:
if context is not None:
try:
context.close()
except Exception:
pass
if __name__ == "__main__":

View File

@@ -6,19 +6,22 @@
import json
import os
import shutil
import sys
import tempfile
import time
from datetime import datetime
from cloakbrowser import launch_persistent_context
from ..config import get_profile_dir
from ..config import get_profile_dir, prepare_profile_dir
PROFILE_DIR = str(get_profile_dir())
API_LOG_FILE = "./api_requests.log.json"
def main():
logs = []
# 使用临时文件缓冲日志,避免长时间嗅探导致内存无限增长
log_buffer = tempfile.NamedTemporaryFile(mode="w+", suffix=".jsonl", delete=False, encoding="utf-8")
def log_request(request):
url = request.url
@@ -37,7 +40,8 @@ def main():
entry["body"] = request.post_data
except Exception:
pass
logs.append(entry)
log_buffer.write(json.dumps(entry, ensure_ascii=False) + "\n")
log_buffer.flush()
print(f"[REQ] {request.method} {url}")
def log_response(response):
@@ -62,7 +66,8 @@ def main():
entry["body_preview"] = text[:500]
except Exception as e:
entry["body_error"] = str(e)
logs.append(entry)
log_buffer.write(json.dumps(entry, ensure_ascii=False) + "\n")
log_buffer.flush()
print(f"[RES] {response.status} {url}")
if not os.path.exists(PROFILE_DIR):
@@ -70,27 +75,51 @@ def main():
print("请先运行 hunyuan3dweb-login 完成登录")
sys.exit(1)
context = launch_persistent_context(PROFILE_DIR, headless=False)
page = context.new_page()
profile_dir, temp_dir = prepare_profile_dir()
page.on("request", log_request)
page.on("response", log_response)
context = None
try:
context = launch_persistent_context(profile_dir, headless=False)
page = context.new_page()
print("正在打开腾讯混元3D并捕获 API...")
page.goto("https://3d.hunyuan.tencent.com/")
time.sleep(3)
page.on("request", log_request)
page.on("response", log_response)
# 检查是否已登录
login_btn = page.locator("button").filter(has_text="登录").first
if login_btn.count() > 0:
print("\n警告: 当前未检测到登录状态API 可能返回未授权")
else:
print("\n已检测到登录状态,开始捕获 API...")
print("正在打开腾讯混元3D并捕获 API...")
page.goto("https://3d.hunyuan.tencent.com/")
time.sleep(3)
print("\n你可以手动在浏览器中操作切换页面、生成3D等")
print("所有 API 请求会实时打印在终端中")
print("按 Enter 停止捕获并保存结果...\n")
input()
# 检查是否已登录
login_btn = page.locator("button").filter(has_text="登录").first
if login_btn.count() > 0:
print("\n警告: 当前未检测到登录状态API 可能返回未授权")
else:
print("\n已检测到登录状态,开始捕获 API...")
print("\n你可以手动在浏览器中操作切换页面、生成3D等")
print("所有 API 请求会实时打印在终端中")
print("按 Enter 停止捕获并保存结果...\n")
input()
finally:
if context is not None:
try:
context.close()
except Exception:
pass
if temp_dir is not None:
try:
shutil.rmtree(temp_dir)
except Exception:
pass
# 从临时文件读取并保存为 JSON 数组
try:
log_buffer.flush()
log_buffer.seek(0)
logs = [json.loads(line) for line in log_buffer if line.strip()]
finally:
log_buffer.close()
os.unlink(log_buffer.name)
# 保存日志
with open(API_LOG_FILE, "w", encoding="utf-8") as f:
@@ -99,8 +128,6 @@ def main():
print(f"\n共捕获 {len([l for l in logs if l['type'] == 'request'])} 个请求")
print(f"结果已保存到: {os.path.abspath(API_LOG_FILE)}")
context.close()
if __name__ == "__main__":
try:

View File

@@ -1,5 +1,8 @@
import os
import shutil
import tempfile
from pathlib import Path
from typing import Optional, Tuple
def get_config_dir() -> Path:
@@ -21,3 +24,21 @@ def get_profile_dir() -> Path:
path = get_config_dir() / "profile"
path.mkdir(parents=True, exist_ok=True)
return path
def prepare_profile_dir() -> Tuple[str, Optional[str]]:
"""
将标准 profile 复制到临时目录,避免 Chromium 锁冲突。
Returns:
(实际使用的 profile 路径, 临时目录路径或 None)。
如果使用了临时副本,调用者应在完成后用 shutil.rmtree 删除。
"""
standard = str(get_profile_dir())
if not os.path.exists(standard):
return standard, None
temp_root = tempfile.mkdtemp(prefix="hunyuan3dweb-profile-")
temp_profile = os.path.join(temp_root, "profile")
shutil.copytree(standard, temp_profile, dirs_exist_ok=True)
return temp_profile, temp_root