diff --git a/.gitignore b/.gitignore index 1d70a47..61ed749 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ dsl.json autodemo/__pycache__/*.pyc dsl_schema.json sessions/* +artifacts/* diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..6b76b4f --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,15 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Python Debugger: Current File", + "type": "debugpy", + "request": "launch", + "program": "${file}", + "console": "integratedTerminal" + } + ] +} \ No newline at end of file diff --git a/README.md b/README.md index e218cdf..991cb73 100644 --- a/README.md +++ b/README.md @@ -63,6 +63,20 @@ python -m autodemo.infer --session-dir "E:\project\audoWin\sessions\26acb7e8-231 python -m autodemo run --dsl flow.yaml --allow-title "记事本|Notepad" --dry-run ``` 去掉 `--dry-run` 即真实执行。 +执行时日志与快照: +- 执行器会将每一步的结果写入 `artifacts/executor_log.jsonl`,包含时间、动作、目标、尝试次数、成功/失败、错误等。 +- 成功/失败都会保存截图到 `artifacts/screenshots/`,并保存 UIA 树摘要到 `artifacts/ui_trees/`,便于排查。 +可选视觉校验(模板匹配): +- 在 DSL step 中添加 `expected_screen`,例如: +```yaml +steps: + - action: click + target: {Name: "确定", ControlType: "Button"} + expected_screen: + template_path: templates/ok_button.png + threshold: 0.8 +``` +执行器会在 UIA 定位后先做模板匹配,未通过则按重试策略重试或报错。 ### 参数覆盖示例 ```bash diff --git a/autodemo/executor.py b/autodemo/executor.py index b85710b..44646e4 100644 --- a/autodemo/executor.py +++ b/autodemo/executor.py @@ -1,12 +1,19 @@ # MIT License # Copyright (c) 2024 -"""执行层:根据 DSL 进行 UI 自动化。""" +"""执行层:基于 DSL 进行 UI 自动化,并支持可选视觉校验与结构化日志""" +from __future__ import annotations + +import json import re import time from dataclasses import dataclass +from pathlib import Path from typing import Any, Dict, List, Optional +import cv2 # type: ignore +import mss # type: ignore +import numpy as np # type: ignore import uiautomation as auto # type: ignore from .schema import DSLSpec @@ -14,18 +21,17 @@ from .schema import DSLSpec @dataclass class ExecContext: - """执行上下文。""" + """执行上下文""" allow_title: str dry_run: bool = False + artifacts_dir: Path = Path("artifacts") def _match_window(allow_title: str) -> Optional[auto.Control]: - """仅在窗口标题匹配白名单时返回前台窗口。""" + """仅在窗口标题匹配白名单时返回前台窗口""" ctrl = auto.GetForegroundControl() - if ctrl is None: - return None - if ctrl.Name is None: + if ctrl is None or ctrl.Name is None: return None if not re.search(allow_title, ctrl.Name): return None @@ -33,33 +39,154 @@ def _match_window(allow_title: str) -> Optional[auto.Control]: def _find_control(root: auto.Control, locator: Dict[str, Any], timeout: float) -> Optional[auto.Control]: - """根据 locator 在 root 下查找控件。""" + """根据 locator 在 root 下查找控件""" start = time.time() while time.time() - start <= timeout: try: conds = [] - if "AutomationId" in locator: - conds.append(auto.Control.AutomationId == locator["AutomationId"]) - if "Name" in locator: - conds.append(auto.Control.Name == locator["Name"]) - if "ClassName" in locator: - conds.append(auto.Control.ClassName == locator["ClassName"]) - if "ControlType" in locator: - conds.append(auto.Control.ControlTypeName == locator["ControlType"]) + name_val = locator.get("Name") + class_val = locator.get("ClassName") + ctrl_type_val = locator.get("ControlType") + auto_id_val = locator.get("AutomationId") + if auto_id_val: + conds.append(auto.Control.AutomationId == auto_id_val) + if name_val: + conds.append(auto.Control.Name == name_val) + if class_val: + conds.append(auto.Control.ClassName == class_val) + if ctrl_type_val: + conds.append(auto.Control.ControlTypeName == ctrl_type_val) + # 先检查 root 自身是否满足 + try: + if ( + (not name_val or root.Name == name_val) + and (not class_val or root.ClassName == class_val) + and (not ctrl_type_val or root.ControlTypeName == ctrl_type_val) + and (not auto_id_val or root.AutomationId == auto_id_val) + ): + return root + except Exception: + pass if conds: ctrl = root.Control(searchDepth=4, condition=auto.AndCondition(*conds)) else: ctrl = root if ctrl: return ctrl - except Exception: - pass + except Exception as exc: + print(f"[warn] 查找控件异常: {exc}") time.sleep(0.5) return None +def _capture_screenshot(ctrl: Optional[auto.Control], out_path: Path) -> Optional[Path]: + """截取控件区域或全屏""" + try: + with mss.mss() as sct: + if ctrl and getattr(ctrl, "BoundingRectangle", None): + rect = ctrl.BoundingRectangle + region = {"left": int(rect.left), "top": int(rect.top), "width": int(rect.right - rect.left), "height": int(rect.bottom - rect.top)} + else: + monitor = sct.monitors[1] if len(sct.monitors) > 1 else sct.monitors[0] + region = {"left": monitor["left"], "top": monitor["top"], "width": monitor["width"], "height": monitor["height"]} + shot = np.array(sct.grab(region)) + frame = cv2.cvtColor(shot, cv2.COLOR_BGRA2BGR) + out_path.parent.mkdir(parents=True, exist_ok=True) + cv2.imwrite(str(out_path), frame) + return out_path + except Exception: + return None + + +def _capture_tree(ctrl: Optional[auto.Control], max_depth: int = 3) -> List[Dict[str, Any]]: + """采集浅层 UIA 树摘要""" + if ctrl is None: + return [] + nodes: List[Dict[str, Any]] = [] + queue: List[Any] = [(ctrl, 0)] + while queue: + node, depth = queue.pop(0) + if depth > max_depth: + continue + nodes.append( + { + "name": node.Name, + "automation_id": node.AutomationId, + "class_name": node.ClassName, + "control_type": node.ControlTypeName, + "depth": depth, + } + ) + try: + children = list(node.GetChildren()) + except Exception: + children = [] + for child in children: + queue.append((child, depth + 1)) + return nodes + + +def _save_tree(ctrl: Optional[auto.Control], out_path: Path) -> Optional[Path]: + try: + data = _capture_tree(ctrl) + out_path.parent.mkdir(parents=True, exist_ok=True) + out_path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") + return out_path + except Exception: + return None + + +def _image_similarity(full_img_path: Path, template_path: Path, threshold: float = 0.8) -> bool: + """简单模板匹配,相似度 >= 阈值视为通过""" + if not full_img_path.exists() or not template_path.exists(): + return False + full = cv2.imread(str(full_img_path), cv2.IMREAD_COLOR) + tmpl = cv2.imread(str(template_path), cv2.IMREAD_COLOR) + if full is None or tmpl is None or full.shape[0] < tmpl.shape[0] or full.shape[1] < tmpl.shape[1]: + return False + res = cv2.matchTemplate(full, tmpl, cv2.TM_CCOEFF_NORMED) + _, max_val, _, _ = cv2.minMaxLoc(res) + return float(max_val) >= threshold + + +def _visual_check(expected: Dict[str, Any], ctrl: Optional[auto.Control], artifacts_dir: Path, step_idx: int, attempt: int) -> bool: + """执行可选视觉校验:模板匹配""" + template_path = expected.get("template_path") + threshold = float(expected.get("threshold", 0.8)) + if not template_path: + return True + snap_path = artifacts_dir / "screenshots" / f"step{step_idx:03d}_attempt{attempt}_visual.png" + snap = _capture_screenshot(ctrl, snap_path) + if not snap: + return False + return _image_similarity(snap, Path(template_path), threshold) + + +def _log_event(log_path: Path, record: Dict[str, Any]) -> None: + log_path.parent.mkdir(parents=True, exist_ok=True) + with log_path.open("a", encoding="utf-8") as f: + f.write(json.dumps(record, ensure_ascii=False)) + f.write("\n") + + +def _render_value(val: Any, params: Dict[str, Any]) -> Any: + """简单占位符替换 ${param}""" + if isinstance(val, str): + out = val + for k, v in params.items(): + placeholder = f"${{{k}}}" + if placeholder in out: + out = out.replace(placeholder, str(v)) + return out + if isinstance(val, dict): + return {k: _render_value(v, params) for k, v in val.items()} + if isinstance(val, list): + return [_render_value(v, params) for v in val] + return val + + def _do_action(ctrl: auto.Control, step: Dict[str, Any], dry_run: bool) -> None: - """执行单步动作。""" + """执行单步动作""" action = step.get("action") text = step.get("text", "") if dry_run: @@ -76,22 +203,94 @@ def _do_action(ctrl: auto.Control, step: Dict[str, Any], dry_run: bool) -> None: except Exception: ctrl.SendKeys(text) elif action == "assert_exists": - assert ctrl is not None, "控件未找到" + if ctrl is None: + raise RuntimeError("控件未找到") elif action == "wait_for": - # wait_for 仅等待存在 time.sleep(float(step.get("waits", {}).get("appear", 1.0))) def execute_spec(spec: DSLSpec, ctx: ExecContext) -> None: - """执行完整的 DSL。""" + """执行完整 DSL""" + # 给前台窗口切换预留时间,避免刚启动命令时窗口还未聚焦 + time.sleep(1.0) root = _match_window(ctx.allow_title) if root is None: raise RuntimeError(f"前台窗口标题未匹配白名单: {ctx.allow_title}") + artifacts = ctx.artifacts_dir + screenshots_dir = artifacts / "screenshots" + trees_dir = artifacts / "ui_trees" + log_path = artifacts / "executor_log.jsonl" + + def _normalize_target(tgt: Dict[str, Any]) -> Dict[str, Any]: + """规范 target 键名到 UIA 期望大小写""" + norm: Dict[str, Any] = {} + for k, v in tgt.items(): + lk = k.lower() + if lk == "name": + norm["Name"] = v + elif lk in ("classname", "class_name"): + norm["ClassName"] = v + elif lk in ("controltype", "control_type"): + norm["ControlType"] = v + elif lk == "automationid": + norm["AutomationId"] = v + else: + norm[k] = v + return norm + + def normalize_step(step: Dict[str, Any]) -> Dict[str, Any]: + """归一化字段,兼容不同 DSL 变体""" + out = _render_value(dict(step), spec.params) + if "target" not in out and "selector" in out: + out["target"] = out.get("selector") + out.pop("selector", None) + if "value" in out and "text" not in out: + out["text"] = out.get("value") + out.pop("value", None) + + tgt = out.get("target") + if isinstance(tgt, dict): + out["target"] = _normalize_target(tgt) + + waits_obj = out.get("waits") + if isinstance(waits_obj, list): + appear = None + for w in waits_obj: + if isinstance(w, dict) and "timeout_ms" in w: + appear = float(w.get("timeout_ms", 0)) / 1000.0 + break + out["waits"] = {"appear": appear or spec.waits.get("appear", 5.0), "disappear": spec.waits.get("disappear", 1.0)} + elif isinstance(waits_obj, dict): + waits_obj = dict(waits_obj) + if "timeout_ms" in waits_obj and "appear" not in waits_obj: + waits_obj["appear"] = float(waits_obj.pop("timeout_ms")) / 1000.0 + out["waits"] = waits_obj + else: + out["waits"] = spec.waits + + if "timeout_ms" in out: + out.setdefault("waits", {}) + out["waits"]["appear"] = float(out.pop("timeout_ms")) / 1000.0 + return out + + def normalize_steps(steps: List[Any]) -> List[Any]: + normed: List[Any] = [] + for st in steps: + if isinstance(st, dict): + st = normalize_step(st) + if "steps" in st and isinstance(st["steps"], list): + st["steps"] = normalize_steps(st["steps"]) + if "else_steps" in st and isinstance(st["else_steps"], list): + st["else_steps"] = normalize_steps(st["else_steps"]) + normed.append(st) + return normed + + normalized_steps = normalize_steps(spec.steps) + def run_steps(steps: List[Any]) -> None: - for step in steps: + for idx, step in enumerate(steps, start=1): if "for_each" in step: - # 简单遍历列表参数 iterable = spec.params.get(step["for_each"], []) for item in iterable: run_steps(step.get("steps", [])) @@ -103,23 +302,64 @@ def execute_spec(spec: DSLSpec, ctx: ExecContext) -> None: run_steps(step.get("else_steps", [])) else: target = step.get("target", {}) - timeout = float(step.get("waits", {}).get("appear", spec.waits.get("appear", 5.0))) + timeout = float(step.get("waits", {}).get("appear", spec.waits.get("appear", 1.0))) retry = step.get("retry_policy", spec.retry_policy) attempts = int(retry.get("max_attempts", 1)) interval = float(retry.get("interval", 1.0)) + expected = step.get("expected_screen") or {} last_err: Optional[Exception] = None - for _ in range(attempts): + + for attempt in range(1, attempts + 1): ctrl = _find_control(root, target, timeout) try: if ctrl is None: raise RuntimeError("控件未找到") + + # 视觉校验(可选) + if expected: + ok = _visual_check(expected, ctrl, artifacts, idx, attempt) + if not ok: + raise RuntimeError("视觉校验未通过") + _do_action(ctrl, step, ctx.dry_run) + snap_path = _capture_screenshot(ctrl, screenshots_dir / f"step{idx:03d}_attempt{attempt}_success.png") + tree_path = _save_tree(ctrl, trees_dir / f"step{idx:03d}_attempt{attempt}_tree.json") + _log_event( + log_path, + { + "ts": time.time(), + "step_index": idx, + "action": step.get("action"), + "target": target, + "attempt": attempt, + "result": "success", + "screenshot": str(snap_path) if snap_path else None, + "tree": str(tree_path) if tree_path else None, + }, + ) last_err = None break except Exception as e: # noqa: BLE001 last_err = e - time.sleep(interval) + snap_path = _capture_screenshot(ctrl, screenshots_dir / f"step{idx:03d}_attempt{attempt}_fail.png") + tree_path = _save_tree(ctrl, trees_dir / f"step{idx:03d}_attempt{attempt}_tree.json") + _log_event( + log_path, + { + "ts": time.time(), + "step_index": idx, + "action": step.get("action"), + "target": target, + "attempt": attempt, + "result": "fail", + "error": str(e), + "screenshot": str(snap_path) if snap_path else None, + "tree": str(tree_path) if tree_path else None, + }, + ) + if attempt < attempts: + time.sleep(interval) if last_err: raise last_err - run_steps(spec.steps) + run_steps(normalized_steps)