diff --git a/.gitignore b/.gitignore index 61ed749..13b24c3 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ autodemo/__pycache__/*.pyc dsl_schema.json sessions/* artifacts/* +.vscode/settings.json diff --git a/.vscode/launch.json b/.vscode/launch.json index 6b76b4f..587861f 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -4,12 +4,29 @@ // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 "version": "0.2.0", "configurations": [ + + { "name": "Python Debugger: Current File", "type": "debugpy", "request": "launch", "program": "${file}", "console": "integratedTerminal" + }, + { + "name": "Python: autodemo dry-run", + "type": "debugpy", + "request": "launch", + "module": "autodemo", + "args": [ + "run", + "--dsl", + "dsl.json", + "--allow-title", + "新标签页 - Google Chrome" + ], + "cwd": "${workspaceFolder}", + "console": "integratedTerminal" } ] -} \ No newline at end of file +} diff --git a/README.md b/README.md index 991cb73..4e6c5a8 100644 --- a/README.md +++ b/README.md @@ -51,7 +51,7 @@ python -m autodemo record --out sessions --hotkey F9 --fps 12 --screen 0 2) 归纳 DSL(文本-only 或多模态;多模态需设置 `OPENAI_API_KEY` 环境变量,默认 `OPENAI_BASE_URL=https://api.wgetai.com/v1`、`model=gpt-5.1-high`): ```bash # 示例:对现有录制目录直接归纳 -python -m autodemo.infer --session-dir "E:\project\audoWin\sessions\26acb7e8-2317-4a44-8094-20fef3312d91" --out dsl.json +python -m autodemo.infer --session-dir sessions\4693c5e5-6a29-4edc-9ca4-d0ef88e00e0a --out dsl.json --model gpt-5.1-high --timeout 300 ``` 可选参数: - `--api-key` / `OPENAI_API_KEY`:多模态时的 LLM Key @@ -60,7 +60,7 @@ python -m autodemo.infer --session-dir "E:\project\audoWin\sessions\26acb7e8-231 3) 执行 DSL(白名单标题保护,建议 dry-run 先验证): ```bash -python -m autodemo run --dsl flow.yaml --allow-title "记事本|Notepad" --dry-run +python -m autodemo run --dsl dsl.json --allow-title "新标签页 - Google Chrome" --dry-run ``` 去掉 `--dry-run` 即真实执行。 执行时日志与快照: diff --git a/autodemo/executor.py b/autodemo/executor.py index 
def _match_window(allow_title: str) -> Optional[auto.Control]:
    """Return the top-level window whose title matches the whitelist.

    A title matches when it contains ``allow_title``, contains the part of
    ``allow_title`` before the first ``" - "`` (so ``"Page - App"`` also
    accepts a window titled just ``"Page"``), or is matched by ``allow_title``
    interpreted as a regular expression (``re.search``).  The regex form keeps
    alternation whitelists such as ``"记事本|Notepad"`` working, which plain
    substring matching would silently reject.

    The foreground control is tried first; if it does not match, the desktop
    tree is searched breadth-first down to depth 2.

    Args:
        allow_title: Whitelisted title (substring or regular expression).

    Returns:
        The matching control, promoted to its most likely top-level window,
        or ``None`` when nothing matches or ``allow_title`` is empty.
    """
    import re
    from collections import deque

    if not allow_title:
        # An empty whitelist can never match a title.
        return None

    needles = [allow_title]
    if " - " in allow_title:
        needles.append(allow_title.split(" - ", 1)[0])
    needles = [needle for needle in needles if needle]

    try:
        title_re = re.compile(allow_title)
    except re.error:
        # Not a valid pattern (e.g. unbalanced parenthesis): substring only.
        title_re = None

    def _title_match(name: Optional[str]) -> bool:
        """Check a window title against the substring and regex whitelist."""
        if not name:
            return False
        if any(needle in name for needle in needles):
            return True
        return title_re is not None and title_re.search(name) is not None

    def _ascend_to_top(node: auto.Control) -> auto.Control:
        """Walk up the parents and return the outermost window-like ancestor.

        An ancestor qualifies when its class is ``Chrome_WidgetWin_1`` or its
        control type is ``WindowControl``; ``node`` itself is returned when no
        ancestor qualifies.
        """
        best = node
        cur = node
        while True:
            try:
                parent = cur.GetParent()
            except Exception:
                parent = None
            if not parent:
                return best
            try:
                cls = getattr(parent, "ClassName", None)
                ctype = getattr(parent, "ControlTypeName", None)
            except Exception:
                cls = ctype = None
            if cls == "Chrome_WidgetWin_1" or ctype == "WindowControl":
                best = parent
            cur = parent

    fg = auto.GetForegroundControl()
    if fg and _title_match(getattr(fg, "Name", None)):
        return _ascend_to_top(fg)

    # NOTE(review): this fallback accepts a matching window even when it is
    # not in the foreground - confirm that is the intended whitelist policy.
    root = auto.GetRootControl()
    if not root:
        return None
    pending = deque([(root, 0)])
    while pending:
        node, depth = pending.popleft()
        try:
            name = node.Name
        except Exception:
            name = None
        if _title_match(name):
            return _ascend_to_top(node)
        if depth >= 2:
            # Nodes below depth 2 are never inspected, so do not expand further.
            continue
        try:
            children = list(node.GetChildren())
        except Exception:
            children = []
        pending.extend((child, depth + 1) for child in children)
    return None


def _locator_matches(ctrl: auto.Control, locator: Dict[str, Any]) -> bool:
    """Return True when ``ctrl`` satisfies every constraint in ``locator``.

    Recognised keys: ``Name`` (exact), ``Name__contains`` (either string
    contains the other), ``ClassName``, ``ControlType`` and ``AutomationId``.
    When ``Name__contains`` is given, ``ClassName``/``ControlType`` mismatches
    are tolerated, because the same window may surface with a different
    class or control type (e.g. PaneControl) across Chrome versions.
    """
    try:
        name_val = locator.get("Name")
        name_contains = locator.get("Name__contains")
        class_val = locator.get("ClassName")
        ctrl_type_val = locator.get("ControlType")
        auto_id_val = locator.get("AutomationId")

        if name_val and ctrl.Name != name_val:
            return False
        if name_contains:
            cur = "" if ctrl.Name is None else str(ctrl.Name)
            pat = str(name_contains)
            # An empty name is a substring of every pattern; without the
            # explicit emptiness check every nameless control would match.
            if not cur or (pat not in cur and cur not in pat):
                return False
        if class_val and ctrl.ClassName != class_val and not name_contains:
            return False
        if ctrl_type_val and ctrl.ControlTypeName != ctrl_type_val and not name_contains:
            return False
        if auto_id_val and ctrl.AutomationId != auto_id_val:
            return False
        return True
    except Exception:
        # A control whose properties cannot be read is treated as a non-match.
        return False


def _bfs_find(start: Any, locator: Dict[str, Any], max_depth: int) -> Optional[Any]:
    """Breadth-first search from ``start`` for the first control matching ``locator``.

    ``start`` is depth 0; controls deeper than ``max_depth`` are not visited.
    Returns the first match in breadth-first order, or ``None``.
    """
    from collections import deque

    pending = deque([(start, 0)])
    while pending:
        node, depth = pending.popleft()
        if _locator_matches(node, locator):
            return node
        if depth >= max_depth:
            continue
        try:
            children = list(node.GetChildren())
        except Exception:
            children = []
        pending.extend((child, depth + 1) for child in children)
    return None


def _find_control(root: auto.Control, locator: Dict[str, Any], timeout: float) -> Optional[auto.Control]:
    """Find a control under ``root`` according to ``locator``.

    Search strategy, in order:
      1. Poll ``root`` and its descendants (up to 15 levels) every 0.2 s until
         ``timeout`` seconds have elapsed.
      2. Search once from the desktop root (up to 19 levels deep).
      3. If the locator carried ``ClassName``/``ControlType`` together with a
         name constraint, retry once with only the remaining constraints.

    Args:
        root: Control to search under; returned as-is for an empty locator.
        locator: Property constraints, see ``_locator_matches``.
        timeout: Seconds to keep polling under ``root``.

    Returns:
        The first matching control, or ``None`` when nothing matched.
    """
    start = time.time()
    try:
        print(
            f"[debug] 查找控件 locator={locator} root=({getattr(root, 'Name', None)}, {getattr(root, 'ClassName', None)}, {getattr(root, 'ControlTypeName', None)})"
        )
    except Exception:
        # Diagnostics only: reading root properties must never abort the search.
        pass

    if not locator:
        # No constraints: the root itself is the target.
        return root

    while time.time() - start <= timeout:
        try:
            # Chrome control trees are deep, hence the generous depth limit.
            found = _bfs_find(root, locator, 15)
            if found is not None:
                return found
        except Exception as exc:
            print(f"[warn] find control error: {exc}")
        time.sleep(0.2)

    # Fallback after the timeout: search once from the desktop root.
    # NOTE(review): this can return a control outside the whitelisted window -
    # confirm that escaping `root` is acceptable here.
    try:
        sys_root = auto.GetRootControl()
        if sys_root:
            found = _bfs_find(sys_root, locator, 19)
            if found is not None:
                return found
    except Exception as exc:
        print(f"[warn] find control error: {exc}")

    # Last resort: drop ClassName/ControlType and match by name only.  Retry
    # only when a key was actually removed; otherwise the retry would use the
    # identical locator and recurse without end.
    relaxed = {k: v for k, v in locator.items() if k not in ("ClassName", "ControlType")}
    if len(relaxed) < len(locator) and (relaxed.get("Name") or relaxed.get("Name__contains")):
        print("[debug] 放宽匹配,仅按名称再次查找")
        return _find_control(root, relaxed, max(timeout, 0.5))
    return None
execute_spec(spec: DSLSpec, ctx: ExecContext) -> None: - """执行完整 DSL""" + """执行完整 DSL。 + 流程概览: + 1. 先根据 allow_title 找到当前前台窗口作为根控件 root。 + 2. 逐步标准化 DSL:字段兼容、文本替换、等待策略等。 + 3. 对每个步骤依次查找目标控件 -> 视觉校验(可选)-> 执行动作/记录 dry-run。 + 4. 每次尝试都会落盘截图、UI 树和日志,方便回溯。""" # 给前台窗口切换预留时间,避免刚启动命令时窗口还未聚焦 time.sleep(1.0) root = _match_window(ctx.allow_title) if root is None: raise RuntimeError(f"前台窗口标题未匹配白名单: {ctx.allow_title}") + if ctx.dry_run: + try: + print(f"[debug] root -> name={root.Name} class={root.ClassName} type={root.ControlTypeName}") + except Exception: + pass artifacts = ctx.artifacts_dir screenshots_dir = artifacts / "screenshots" @@ -223,12 +340,20 @@ def execute_spec(spec: DSLSpec, ctx: ExecContext) -> None: log_path = artifacts / "executor_log.jsonl" def _normalize_target(tgt: Dict[str, Any]) -> Dict[str, Any]: - """规范 target 键名到 UIA 期望大小写""" + """规范 target 键名,兼容窗口标题匹配/包含等写法""" norm: Dict[str, Any] = {} for k, v in tgt.items(): lk = k.lower() - if lk == "name": - norm["Name"] = v + if lk in ("name", "window_title"): + if lk == "window_title" and isinstance(v, str) and " - " in v: + norm["Name__contains"] = v + else: + norm["Name"] = v + norm["Name__contains"] = v + elif lk in ("window_title_contains", "name_contains"): + norm["Name__contains"] = v + elif lk == "window_title_contains_param": + norm["Name__contains"] = spec.params.get(str(v), v) elif lk in ("classname", "class_name"): norm["ClassName"] = v elif lk in ("controltype", "control_type"): @@ -248,6 +373,9 @@ def execute_spec(spec: DSLSpec, ctx: ExecContext) -> None: if "value" in out and "text" not in out: out["text"] = out.get("value") out.pop("value", None) + if "text" not in out and out.get("text_param"): + key = str(out.pop("text_param")) + out["text"] = str(spec.params.get(key, "")) tgt = out.get("target") if isinstance(tgt, dict): @@ -289,20 +417,26 @@ def execute_spec(spec: DSLSpec, ctx: ExecContext) -> None: normalized_steps = normalize_steps(spec.steps) def run_steps(steps: List[Any]) -> None: + 
"""按顺序执行步骤,支持 for_each/if_condition 嵌套。""" for idx, step in enumerate(steps, start=1): if "for_each" in step: + # for_each:根据参数数组展开子步骤 iterable = spec.params.get(step["for_each"], []) for item in iterable: run_steps(step.get("steps", [])) elif "if_condition" in step: + # if_condition:依据参数布尔值选择分支 cond = step["if_condition"] if spec.params.get(cond): run_steps(step.get("steps", [])) else: run_steps(step.get("else_steps", [])) else: + # 普通步骤:查找控件 -> 视觉校验(可选) -> 执行动作 target = step.get("target", {}) timeout = float(step.get("waits", {}).get("appear", spec.waits.get("appear", 1.0))) + if ctx.dry_run: + timeout = min(timeout, 1) # 纯 dry-run 场景快速返回,避免长时间等待 retry = step.get("retry_policy", spec.retry_policy) attempts = int(retry.get("max_attempts", 1)) interval = float(retry.get("interval", 1.0)) @@ -312,6 +446,8 @@ def execute_spec(spec: DSLSpec, ctx: ExecContext) -> None: for attempt in range(1, attempts + 1): ctrl = _find_control(root, target, timeout) try: + if ctrl is None and ctx.dry_run: + ctrl = root if ctrl is None: raise RuntimeError("控件未找到")