diff --git a/.env b/.env new file mode 100644 index 0000000..b1a2f53 --- /dev/null +++ b/.env @@ -0,0 +1,4 @@ +# 环境变量示例,复制为 .env 使用 +# OpenAI 兼容接口的 API Key 与 Base URL(若不需要多模态,可留空) +OPENAI_API_KEY=sk-22WA5NxNePfQIr6ArU3oqO75IrsZNTTakqp1ImZO0uKhhJoy +OPENAI_BASE_URL=https://api.wgetai.com/v1 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1d70a47 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +dsl.json +autodemo/__pycache__/*.pyc +dsl_schema.json +sessions/* diff --git a/@AutomationLog.txt b/@AutomationLog.txt new file mode 100644 index 0000000..1b2d666 --- /dev/null +++ b/@AutomationLog.txt @@ -0,0 +1,5 @@ + +[WinError -2147221008] 尚未调用 CoInitialize。 +Can not load UIAutomationCore.dll. +1, You may need to install Windows Update KB971513 if your OS is Windows XP, see https://github.com/yinkaisheng/WindowsUpdateKB971513ForIUIAutomation +2, You need to use an UIAutomationInitializerInThread object if use uiautomation in a thread, see demos/uiautomation_in_thread.py diff --git a/autodemo/__init__.py b/autodemo/__init__.py new file mode 100644 index 0000000..2034319 --- /dev/null +++ b/autodemo/__init__.py @@ -0,0 +1,11 @@ +# MIT License +# Copyright (c) 2024 +"""轻量级示教式自动化原型。""" + +__all__ = [ + "schema", + "recorder", + "llm", + "dsl", + "executor", +] diff --git a/autodemo/__main__.py b/autodemo/__main__.py new file mode 100644 index 0000000..f33823b --- /dev/null +++ b/autodemo/__main__.py @@ -0,0 +1,8 @@ +# MIT License +# Copyright (c) 2024 +"""允许 python -m autodemo 运行 CLI。""" + +from .cli import main + +if __name__ == "__main__": + main() diff --git a/autodemo/cli.py b/autodemo/cli.py new file mode 100644 index 0000000..c9d7ffd --- /dev/null +++ b/autodemo/cli.py @@ -0,0 +1,91 @@ +# MIT License +# Copyright (c) 2024 +"""Command line entry point.""" + +from __future__ import annotations + +import argparse +import json +from pathlib import Path + +from .dsl import load_dsl, save_dsl +from .executor import ExecContext, execute_spec +from .llm 
def cmd_run(args: argparse.Namespace) -> None:
    """Execute a DSL file, optionally overriding its parameters.

    Args:
        args: Parsed CLI namespace. Reads ``dsl`` (path of the DSL file),
            ``params`` (optional JSON object given as a string; its keys are
            merged over the DSL's own ``params``), ``allow_title`` (regex the
            foreground window title must match) and ``dry_run``.

    Raises:
        SystemExit: If ``--params`` is not valid JSON or is not a JSON object.
    """
    spec = load_dsl(Path(args.dsl))
    if args.params:
        try:
            overrides = json.loads(args.params)
        except json.JSONDecodeError as exc:
            raise SystemExit(f"--params is not valid JSON: {exc}") from exc
        # dict.update() happily accepts lists of pairs and fails obscurely on
        # scalars, so insist on a JSON object before merging.
        if not isinstance(overrides, dict):
            raise SystemExit(
                "--params must be a JSON object, "
                f"got {type(overrides).__name__}"
            )
        spec.params.update(overrides)
    ctx = ExecContext(allow_title=args.allow_title, dry_run=args.dry_run)
    execute_spec(spec, ctx)
    print("Done")
def load_dsl(path: Path) -> DSLSpec:
    """Read a DSL specification from a YAML file.

    JSON is accepted as well, since the YAML loader parses JSON documents.

    Args:
        path: Location of the DSL file, read as UTF-8.

    Returns:
        The parsed and validated ``DSLSpec``.

    Raises:
        ValueError: If the document is empty or its top level is not a
            mapping (``yaml.safe_load`` returns ``None`` for an empty file).
    """
    with path.open("r", encoding="utf-8") as f:
        data: Any = yaml.safe_load(f)
    if not isinstance(data, dict):
        raise ValueError(
            f"DSL file {path} must contain a mapping at the top level, "
            f"got {type(data).__name__}"
        )
    return DSLSpec.parse_obj(data)
_PLACEHOLDER_RE = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def _render(value: Any, variables: Dict[str, Any]) -> Any:
    """Replace ``{{name}}`` placeholders in strings (recursing into dicts).

    Placeholders whose name is not in *variables* are left untouched so a
    missing parameter stays visible instead of silently becoming empty.
    """
    if isinstance(value, str):
        return _PLACEHOLDER_RE.sub(
            lambda m: str(variables[m.group(1)]) if m.group(1) in variables else m.group(0),
            value,
        )
    if isinstance(value, dict):
        return {key: _render(item, variables) for key, item in value.items()}
    return value


def execute_spec(spec: DSLSpec, ctx: ExecContext) -> None:
    """Run every step of a DSL specification against the foreground window.

    Steps are dictionaries. A step containing ``for_each`` repeats its nested
    ``steps`` once per element of the named list parameter; the current
    element is exposed to placeholders as ``{{item}}``. A step containing
    ``if_condition`` runs ``steps`` when the named variable is truthy and
    ``else_steps`` otherwise. Any other step is an action: ``{{name}}``
    placeholders in its ``text`` and ``target`` are filled from the
    parameters, the control is located, and the action is retried according
    to the step's (or the spec's) ``retry_policy``.

    Args:
        spec: The DSL to execute.
        ctx: Execution context (window-title whitelist regex, dry-run flag).

    Raises:
        RuntimeError: If the foreground window title does not match
            ``ctx.allow_title``, or a control cannot be found after all
            attempts.
        Exception: The last error raised by an action once retries are
            exhausted.
    """
    root = _match_window(ctx.allow_title)
    if root is None:
        raise RuntimeError(f"前台窗口标题未匹配白名单: {ctx.allow_title}")

    def run_action(step: Dict[str, Any], variables: Dict[str, Any]) -> None:
        target = _render(step.get("target", {}), variables)
        # Work on a copy so the spec itself keeps its placeholders and can be
        # re-run (e.g. inside for_each) with different values.
        resolved = dict(step)
        if "target" in step:
            resolved["target"] = target
        if "text" in step:
            resolved["text"] = _render(step["text"], variables)

        timeout = float(step.get("waits", {}).get("appear", spec.waits.get("appear", 5.0)))
        retry = step.get("retry_policy", spec.retry_policy)
        # A non-positive max_attempts used to skip the step without any error;
        # always try at least once.
        attempts = max(1, int(retry.get("max_attempts", 1)))
        interval = float(retry.get("interval", 1.0))

        for attempt in range(1, attempts + 1):
            ctrl = _find_control(root, target, timeout)
            try:
                if ctrl is None:
                    raise RuntimeError("控件未找到")
                _do_action(ctrl, resolved, ctx.dry_run)
                return
            except Exception:  # noqa: BLE001
                if attempt == attempts:
                    raise
                # Only pause when another attempt will actually follow.
                time.sleep(interval)

    def run_steps(steps: List[Any], variables: Dict[str, Any]) -> None:
        for step in steps:
            if "for_each" in step:
                for item in spec.params.get(step["for_each"], []):
                    run_steps(step.get("steps", []), {**variables, "item": item})
            elif "if_condition" in step:
                branch = "steps" if variables.get(step["if_condition"]) else "else_steps"
                run_steps(step.get(branch, []), variables)
            else:
                run_action(step, variables)

    run_steps(spec.steps, dict(spec.params))
"""加载项目根目录的 .env,优先使用 python-dotenv,缺失则手工解析""" + env_path = Path(__file__).resolve().parent.parent / ".env" + if load_dotenv: + load_dotenv(env_path) + return + if not env_path.exists(): + return + for line in env_path.read_text(encoding="utf-8").splitlines(): + line = line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + key, val = line.split("=", 1) + os.environ.setdefault(key.strip(), val.strip()) + + +def _coerce_assertions(spec_dict: Dict[str, Any]) -> Dict[str, Any]: + """将 assertions 内的非字符串条目转换为字符串,防止验证失败""" + assertions = spec_dict.get("assertions") + if isinstance(assertions, list): + new_items = [] + for item in assertions: + if isinstance(item, str): + new_items.append(item) + else: + try: + new_items.append(json.dumps(item, ensure_ascii=False)) + except Exception: + new_items.append(str(item)) + spec_dict["assertions"] = new_items + return spec_dict + + +def _strip_code_fences(text: str) -> str: + """去除 ```json ... ``` 或 ``` ... ``` 包裹""" + stripped = text.strip() + if stripped.startswith("```"): + parts = stripped.split("```") + if len(parts) >= 3: + return parts[1].lstrip("json").strip() if parts[1].startswith("json") else parts[1].strip() + return stripped + + +def _normalize_steps(spec_dict: Dict[str, Any]) -> Dict[str, Any]: + """规范化 steps 字段到 schema 支持的动作/字段""" + steps = spec_dict.get("steps") + if not isinstance(steps, list): + return spec_dict + normalized = [] + for step in steps: + if not isinstance(step, dict): + continue + # 将 selector -> target + if "target" not in step and "selector" in step: + step["target"] = step["selector"] + step.pop("selector", None) + + action = step.get("action") + # value -> text 归一化,兼容 set_value/type + if "value" in step and "text" not in step: + step["text"] = step.get("value") + step.pop("value", None) + + # 处理 wait_for_window 自定义动作 + if action == "wait_for_window": + title = step.pop("window_title_part", None) + timeout = step.pop("timeout", None) + step["action"] = "wait_for" + 
class LLMClient:
    """Interface every LLM backend in this module implements."""

    def generate(
        self,
        system_prompt: str,
        user_prompt: str,
        images: Optional[List[Dict[str, Any]]] = None,
    ) -> str:
        """Return the model's raw text reply for the given prompts.

        Args:
            system_prompt: Instructions framing the task.
            user_prompt: The request itself.
            images: Optional image payloads accompanying the request.

        Raises:
            NotImplementedError: Always; subclasses provide the behaviour.
        """
        raise NotImplementedError
class OpenAIVisionClient(LLMClient):
    """Multimodal client for OpenAI-compatible chat-completion endpoints.

    The base URL and model name are configurable so any compatible gateway
    can be used.
    """

    def __init__(
        self,
        api_key: str,
        model: str = "gpt-5.1-high",
        base_url: str = "https://api.wgetai.com/v1",
        timeout: float = 120.0,
        retries: int = 1,
    ) -> None:
        """Store connection settings.

        Args:
            api_key: Bearer token sent in the ``Authorization`` header.
            model: Model identifier placed in the request payload.
            base_url: API root; a trailing slash is removed.
            timeout: Per-request timeout in seconds.
            retries: Number of extra attempts after the first failure
                (negative values are treated as 0).
        """
        self.api_key = api_key
        self.model = model
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self.retries = max(0, retries)

    def generate(self, system_prompt: str, user_prompt: str, images: Optional[List[Dict[str, Any]]] = None) -> str:
        """Send one chat-completion request and return the reply text.

        Args:
            system_prompt: System message. Falls back to the module-level
                ``SYSTEM_PROMPT`` when empty.
            user_prompt: Text part of the user message.
            images: Optional list of ``{"b64": <base64 PNG>}`` items, each
                attached as a ``data:image/png`` URL.

        Returns:
            The ``content`` of the first choice in the response.

        Raises:
            Exception: The error of the final attempt (HTTP failure, timeout,
                or an unexpected response shape) once retries are exhausted.
        """
        headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}
        content: List[Dict[str, Any]] = [{"type": "text", "text": user_prompt}]
        content.extend(
            {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{img['b64']}"}}
            for img in images or []
        )
        payload = {
            "model": self.model,
            "messages": [
                # Honour the caller's system prompt; it used to be ignored in
                # favour of the module constant.
                {"role": "system", "content": system_prompt or SYSTEM_PROMPT},
                {"role": "user", "content": content},
            ],
            "temperature": 0.2,
        }
        url = f"{self.base_url}/chat/completions"
        last_err: Optional[Exception] = None
        for _ in range(self.retries + 1):
            try:
                resp = requests.post(url, headers=headers, json=payload, timeout=self.timeout)
                resp.raise_for_status()
                return resp.json()["choices"][0]["message"]["content"]
            except Exception as exc:  # noqa: BLE001
                last_err = exc
        if last_err is None:
            raise RuntimeError("LLM 调用失败")
        raise last_err
def _compress_tree(snapshot: Optional[UISnapshot], selector: Optional[UISelector]) -> List[Dict[str, Any]]:
    """Reduce a UI snapshot to the nodes worth showing to the model.

    A node is kept when its depth is at most 2, or when it shares the hit
    control's name or control type. Empty or missing selector attributes
    never match: comparing them directly would keep every unnamed deep node
    and defeat the compression.

    Args:
        snapshot: Snapshot whose ``tree`` is filtered; ``None`` yields ``[]``.
        selector: Selector of the control the event hit, if any.

    Returns:
        The kept nodes, in original order, dumped to dictionaries without
        ``None`` fields.
    """
    if not snapshot:
        return []
    wanted_name = selector.name if selector else None
    wanted_type = selector.control_type if selector else None

    kept: List[Dict[str, Any]] = []
    for node in snapshot.tree:
        same_name = bool(wanted_name) and node.name == wanted_name
        same_type = bool(wanted_type) and node.control_type == wanted_type
        if node.depth <= 2 or same_name or same_type:
            kept.append(_model_dump(node, exclude_none=True))
    return kept
def infer_session(
    session_dir: Path,
    api_key: Optional[str] = None,
    base_url: Optional[str] = None,
    model: str = "gpt-5.1-high",
    timeout: float = 120.0,
    retries: int = 1,
) -> DSLSpec:
    """Turn a recorded session directory into a validated ``DSLSpec``.

    With a non-empty *api_key* the events and their screenshots are sent to
    the OpenAI-compatible endpoint; if that call fails, or no key is given,
    the offline ``DummyLLM`` heuristic is used instead.

    Args:
        session_dir: Directory containing ``events.jsonl``.
        api_key: API key; ``None`` or an empty string selects offline mode.
        base_url: Endpoint root; defaults to ``https://api.wgetai.com/v1``.
        model: Model name for the remote call.
        timeout: Remote request timeout in seconds.
        retries: Extra remote attempts after the first failure.

    Returns:
        The DSL parsed from the model output after normalisation.

    Raises:
        RuntimeError: If the model output is empty, is not JSON, or is JSON
            but not an object.
    """
    events = _load_events(session_dir)
    # An empty key (e.g. a blank OPENAI_API_KEY entry) must not enable the
    # remote path; `is not None` used to let it through.
    multimodal = bool(api_key)
    packed = _pack_events(events, multimodal=multimodal)
    images_payload = [{"b64": e["image_base64"]} for e in packed if "image_base64" in e] if multimodal else None
    # Images travel as dedicated image parts; keeping their base64 inside the
    # JSON text prompt as well would duplicate every screenshot.
    prompt_events = [{k: v for k, v in e.items() if k != "image_base64"} for e in packed]
    user_prompt = render_user_prompt(prompt_events)

    client: LLMClient
    raw: str
    if multimodal:
        client = OpenAIVisionClient(
            api_key=api_key,
            base_url=base_url or "https://api.wgetai.com/v1",
            model=model,
            timeout=timeout,
            retries=retries,
        )
        try:
            raw = client.generate(SYSTEM_PROMPT, user_prompt, images=images_payload)
        except Exception as exc:  # noqa: BLE001
            print(f"[warn] 多模态归纳失败,降级为文本-only(原因: {exc})")
            client = DummyLLM()
            raw = client.generate(SYSTEM_PROMPT, user_prompt, images=None)
    else:
        client = DummyLLM()
        raw = client.generate(SYSTEM_PROMPT, user_prompt, images=None)

    if not raw or not raw.strip():
        raise RuntimeError("LLM 返回为空,无法解析为 JSON")
    cleaned = _strip_code_fences(raw)
    try:
        spec_dict = json.loads(cleaned)
    except Exception as exc:
        preview = cleaned[:500]
        raise RuntimeError(f"LLM 返回非 JSON,可见前 500 字符: {preview}") from exc
    if not isinstance(spec_dict, dict):
        # The normalisation helpers below call .get() on the result.
        raise RuntimeError(f"LLM 返回的 JSON 不是对象: {type(spec_dict).__name__}")
    spec_dict = _coerce_assertions(spec_dict)
    spec_dict = _normalize_steps(spec_dict)
    return _model_validate(DSLSpec, spec_dict)
def main() -> None:
    """CLI entry point: infer a DSL from a session directory and write it as JSON.

    The API key comes from ``--api-key`` or, failing that, ``OPENAI_API_KEY``.
    The base URL comes from ``--base-url`` or, failing that,
    ``OPENAI_BASE_URL``; when neither is set ``infer_session`` applies its own
    default. The project ``.env`` file is loaded before the environment is
    consulted.
    """
    parser = argparse.ArgumentParser(description="从 session 目录归纳 DSL(支持多模态)")
    parser.add_argument("--session-dir", type=str, required=True, help="session 目录,包含 events.jsonl / manifest.json / frames / ui_snapshots")
    parser.add_argument("--out", type=str, default="dsl.json", help="输出 DSL JSON 路径")
    parser.add_argument("--api-key", type=str, help="LLM API Key,缺省读取环境变量 OPENAI_API_KEY")
    # No argparse default here: a non-empty default made the `or` below
    # short-circuit, so OPENAI_BASE_URL could never take effect.
    parser.add_argument("--base-url", type=str, default=None, help="LLM Base URL")
    parser.add_argument("--model", type=str, default="gpt-5.1-high", help="LLM 模型名")
    parser.add_argument("--timeout", type=float, default=120.0, help="LLM 请求超时时间(秒)")
    parser.add_argument("--retries", type=int, default=1, help="LLM 请求重试次数(额外重试次数)")
    args = parser.parse_args()

    _load_env_file()

    session_dir = Path(args.session_dir)
    api_key = args.api_key or os.environ.get("OPENAI_API_KEY")
    base_url = args.base_url or os.environ.get("OPENAI_BASE_URL")

    spec = infer_session(
        session_dir,
        api_key=api_key,
        base_url=base_url,
        model=args.model,
        timeout=args.timeout,
        retries=args.retries,
    )
    out_path = Path(args.out)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with out_path.open("w", encoding="utf-8") as f:
        f.write(json.dumps(_model_dump(spec), ensure_ascii=False, indent=2))
    print(f"DSL 写入: {out_path}")
class DummyLLM(LLMClient):
    """Offline stand-in that maps recorded events to DSL steps by fixed rules.

    NOTE(review): the recorder builds events with ``event_type`` / ``uia`` /
    ``text`` while this class originally read ``kind`` / ``control`` /
    ``data``. The event schema is not visible from here, so both spellings
    are accepted -- confirm against ``schema.EventRecord`` and drop the
    unused one.
    """

    def generate(self, events: List[EventRecord]) -> DSLSpec:
        """Build a DSL with one step per click or text/key event.

        Args:
            events: Recorded events in chronological order.

        Returns:
            A ``DSLSpec`` with empty params and a single placeholder
            assertion. When no event yields a step, one ``assert_exists``
            step is emitted so the spec is never empty.
        """
        steps: List[Dict[str, Any]] = []
        for ev in events:
            kind = getattr(ev, "event_type", None) or getattr(ev, "kind", None)
            target = self._target_of(ev)
            if kind == "mouse_click":
                steps.append({"action": "click", "target": target})
            elif kind == "text_input":
                text = getattr(ev, "text", None)
                if text:
                    steps.append({"action": "type", "target": target, "text": text})
            elif kind == "key_down":
                # Legacy event shape: the key name lives in ev.data["name"].
                data = getattr(ev, "data", None) or {}
                if data.get("name"):
                    steps.append({"action": "type", "target": target, "text": data.get("name")})
        if not steps:
            steps.append({"action": "assert_exists", "target": {"Name": "dummy"}})
        return DSLSpec(
            params={},
            steps=steps,
            assertions=["dummy generated"],
        )

    @staticmethod
    def _target_of(ev: Any) -> Dict[str, Any]:
        """Return the locator dictionary for the control an event refers to."""
        control = getattr(ev, "uia", None)
        if control is None:
            control = getattr(ev, "control", None)
        if control is None:
            return {}
        if hasattr(control, "automation_id"):
            # Selector with snake_case fields: emit the key names the
            # executor's locator lookup expects, omitting empty values.
            fields = {
                "AutomationId": control.automation_id,
                "Name": getattr(control, "name", None),
                "ClassName": getattr(control, "class_name", None),
                "ControlType": getattr(control, "control_type", None),
            }
            return {key: value for key, value in fields.items() if value}
        return control.dict(by_alias=True)
def render_user_prompt(packed_events: List[Dict[str, Any]]) -> str:
    """Build the user prompt: fixed guidance followed by the events as JSON.

    Args:
        packed_events: Event summaries to embed; serialised with
            ``ensure_ascii=False`` and a two-space indent.

    Returns:
        The guidance text, a blank line, the ``事件摘要(JSON):`` marker line and
        the JSON document.
    """
    guide_lines = (
        "请阅读以下关键事件,生成符合 dsl_schema.json 的 JSON:",
        "- events 已包含点击/文本输入/窗口切换,附带 UIA selector 摘要与可用截图路径。",
        "- 生成 params:将文件名、文本内容等抽象为参数。",
        "- 生成 steps:click/type/set_value/assert_exists/wait_for;需要等待时填写 waits。",
        "- 生成 assertions:确保关键结果(如窗口标题或保存结果)。",
        "仅输出 JSON,不要解释。",
    )
    events_json = json.dumps(packed_events, ensure_ascii=False, indent=2)
    return "\n".join(guide_lines) + "\n\n事件摘要(JSON):\n" + events_json
def start(self) -> Path:
    """Record until the stop hotkey is pressed, then persist the session.

    Creates the session directories, starts the screen recorder, the
    foreground-window watcher and the mouse/keyboard listeners, and blocks
    until the stop event is set. Pending text is flushed and everything is
    shut down in a ``finally`` block, so an interruption while waiting
    (e.g. ``KeyboardInterrupt``) still writes the events collected so far
    before the exception propagates.

    Returns:
        The directory holding the recorded session.
    """
    for directory in (
        self.session_dir,
        self.frames_dir,
        self.frames_crops_dir,
        self.ui_snapshots_dir,
    ):
        directory.mkdir(parents=True, exist_ok=True)

    self._start_perf = time.perf_counter()
    self._start_ts = time.time()
    with mss.mss() as sct:
        monitors = sct.monitors
        # Out-of-range screen numbers fall back to the first entry.
        if 0 <= self.screen < len(monitors):
            self._monitor = monitors[self.screen]
        else:
            self._monitor = monitors[0]

    self._screen_recorder = ScreenRecorder(self.video_path, fps=self.fps, screen=self.screen)
    self._screen_recorder.start()
    try:
        self._window_thread = threading.Thread(target=self._watch_window, daemon=True)
        self._window_thread.start()

        self._mouse_listener = mouse.Listener(on_click=self._on_click)
        self._keyboard_listener = keyboard.Listener(on_press=self._on_key_press)
        self._mouse_listener.start()
        self._keyboard_listener.start()

        self._stop_event.wait()
    finally:
        # Make sure the watcher loop and the event handlers stand down even
        # when we got here through an exception rather than the hotkey.
        self._stop_event.set()
        try:
            self._flush_text_buffer()
        finally:
            self._shutdown()
    return self.session_dir
bool) -> None: + if not pressed or self._stop_event.is_set(): + return + window_info = self._get_window_info() + selector = self._hit_test(x, y) + mouse_info = MouseInfo(x=int(x), y=int(y), button=str(button).split(".")[-1], action="down") + self._record_event( + event_type="mouse_click", + mouse_info=mouse_info, + text=None, + uia_selector=selector, + window=window_info, + ) + + def _on_key_press(self, key: keyboard.Key | keyboard.KeyCode) -> Optional[bool]: + if self._is_hotkey(key): + self._stop_event.set() + return False + if self._stop_event.is_set(): + return False + ch = self._key_to_char(key) + if ch is None: + return None + self._text_buffer.append(ch) + self._schedule_flush() + return None + + # Background watchers ------------------------------------------------ + def _watch_window(self, interval: float = 0.5) -> None: + while not self._stop_event.is_set(): + info = self._get_window_info() + hwnd = info.hwnd if info else None + if hwnd and hwnd != self._last_hwnd: + self._last_hwnd = hwnd + selector = self._hit_test(*self._current_mouse_position()) + self._record_event( + event_type="window_change", + mouse_info=self._current_mouse_info(), + text=None, + uia_selector=selector, + window=info, + ) + time.sleep(interval) + + # Recording helpers -------------------------------------------------- + def _shutdown(self) -> None: + if self._flush_timer and self._flush_timer.is_alive(): + self._flush_timer.cancel() + if self._mouse_listener: + self._mouse_listener.stop() + if self._keyboard_listener: + self._keyboard_listener.stop() + if self._window_thread and self._window_thread.is_alive(): + self._window_thread.join(timeout=1.0) + if self._screen_recorder: + self._screen_recorder.stop() + self._write_events() + self._write_manifest() + + def _schedule_flush(self) -> None: + if self._flush_timer and self._flush_timer.is_alive(): + self._flush_timer.cancel() + self._flush_timer = threading.Timer(0.8, self._flush_text_buffer) + self._flush_timer.daemon = True + 
def _record_event(
    self,
    event_type: str,
    mouse_info: Optional[MouseInfo],
    text: Optional[str],
    uia_selector: Optional[UISelector],
    window: Optional[WindowInfo],
) -> None:
    """Capture context for one event and append it to the in-memory log.

    Takes a screenshot (with crops) and a UI snapshot for the event, then
    stores an ``EventRecord`` carrying the wall-clock timestamp and the
    offset into the screen recording.

    Args:
        event_type: Kind of event, also used in the frame file name.
        mouse_info: Pointer state at the time of the event, if known.
        text: Text associated with the event, if any.
        uia_selector: Selector of the control the event relates to, if any.
        window: Foreground window information, if available.
    """
    # Events arrive from the mouse listener, the text-flush timer and the
    # window watcher. Reserve the index under the lock so two of them can
    # never share one (and overwrite each other's frame/snapshot files).
    with self._lock:
        self._event_index += 1
        event_index = self._event_index

    ts = time.time()
    offset_ms = int((time.perf_counter() - self._start_perf) * 1000)
    # Capturing is slow; do it outside the lock.
    frame_paths = self._capture_frame(event_type, event_index, mouse_info, uia_selector, window)
    ui_snapshot_path = self._save_ui_snapshot(event_index, uia_selector)

    record = EventRecord(
        ts=ts,
        event_type=event_type,
        window=window,
        mouse=mouse_info,
        text=text,
        uia=uia_selector,
        frame_paths=frame_paths,
        video_time_offset_ms=offset_ms,
        ui_snapshot=ui_snapshot_path,
    )
    with self._lock:
        self.events.append(record)
uia_selector.bounding_rect, event_index) + + return FramePaths( + full=str(full_path), + crop_mouse=str(crop_mouse_path) if crop_mouse_path else None, + crop_element=str(crop_element_path) if crop_element_path else None, + ) + + def _save_mouse_crop(self, frame: np.ndarray, region: dict, mouse_info: MouseInfo, event_index: int) -> Optional[Path]: + width, height = frame.shape[1], frame.shape[0] + center_x = int(mouse_info.x - region["left"]) + center_y = int(mouse_info.y - region["top"]) + crop_w, crop_h = 400, 300 + x0 = max(0, center_x - crop_w // 2) + y0 = max(0, center_y - crop_h // 2) + x1 = min(width, x0 + crop_w) + y1 = min(height, y0 + crop_h) + if x1 <= x0 or y1 <= y0: + return None + crop = frame[y0:y1, x0:x1] + path = self.frames_crops_dir / f"frame_{event_index:05d}_mouse.png" + cv2.imwrite(str(path), crop) + return path + + def _save_element_crop(self, frame: np.ndarray, region: dict, rect: Rect, event_index: int) -> Optional[Path]: + width, height = frame.shape[1], frame.shape[0] + x0 = max(0, int(rect.left - region["left"])) + y0 = max(0, int(rect.top - region["top"])) + x1 = min(width, int(rect.right - region["left"])) + y1 = min(height, int(rect.bottom - region["top"])) + if x1 <= x0 or y1 <= y0: + return None + crop = frame[y0:y1, x0:x1] + path = self.frames_crops_dir / f"frame_{event_index:05d}_element.png" + cv2.imwrite(str(path), crop) + return path + + def _monitor_region(self, window: Optional[WindowInfo]) -> dict: + if window and window.rect and window.rect.width > 0 and window.rect.height > 0: + return { + "left": int(window.rect.left), + "top": int(window.rect.top), + "width": int(window.rect.width), + "height": int(window.rect.height), + } + return { + "left": int(self._monitor["left"]), + "top": int(self._monitor["top"]), + "width": int(self._monitor["width"]), + "height": int(self._monitor["height"]), + } + + def _save_ui_snapshot(self, event_index: int, selector: Optional[UISelector]) -> Optional[str]: + tree = 
self._capture_tree(max_depth=3) + if not tree and selector is None: + return None + path = self.ui_snapshots_dir / f"ui_{event_index:05d}.json" + snapshot = UISnapshot(selector=selector, tree=tree) + with path.open("w", encoding="utf-8") as f: + json.dump(snapshot.dict(exclude_none=True), f, ensure_ascii=False) + return str(path) + + # UI helpers --------------------------------------------------------- + def _capture_tree(self, max_depth: int = 3) -> List[UITreeNode]: + self._ensure_uia_initialized() + root = auto.GetForegroundControl() + if root is None: + return [] + nodes: List[UITreeNode] = [] + queue: List[Tuple[auto.Control, int]] = [(root, 0)] # type: ignore + while queue: + node, depth = queue.pop(0) + if depth > max_depth: + continue + nodes.append( + UITreeNode( + name=node.Name, + automation_id=node.AutomationId, + class_name=node.ClassName, + control_type=node.ControlTypeName, + depth=depth, + ) + ) + try: + children = list(node.GetChildren()) + except Exception: + children = [] + for child in children: + queue.append((child, depth + 1)) + return nodes + + def _hit_test(self, x: int, y: int) -> Optional[UISelector]: + try: + self._ensure_uia_initialized() + ctrl = auto.ControlFromPoint((int(x), int(y))) + except Exception: + ctrl = None + if not ctrl: + return None + return self._build_selector(ctrl) + + def _get_window_info(self) -> Optional[WindowInfo]: + self._ensure_uia_initialized() + ctrl = auto.GetForegroundControl() + if ctrl is None: + return None + rect = getattr(ctrl, "BoundingRectangle", None) + self._ensure_uia_initialized() + rect_model = None + if rect: + rect_model = Rect(left=int(rect.left), top=int(rect.top), right=int(rect.right), bottom=int(rect.bottom)) + process_name = None + try: + process_name = psutil.Process(ctrl.ProcessId).name() + except Exception: + process_name = None + hwnd = getattr(ctrl, "NativeWindowHandle", None) or getattr(ctrl, "Handle", None) + return WindowInfo( + hwnd=int(hwnd) if hwnd else None, + 
title=ctrl.Name, + process_name=process_name, + rect=rect_model, + ) + + def _build_selector(self, ctrl: auto.Control) -> UISelector: # type: ignore + rect = getattr(ctrl, "BoundingRectangle", None) + rect_model = None + if rect: + rect_model = Rect(left=int(rect.left), top=int(rect.top), right=int(rect.right), bottom=int(rect.bottom)) + return UISelector( + automation_id=getattr(ctrl, "AutomationId", None), + name=getattr(ctrl, "Name", None), + class_name=getattr(ctrl, "ClassName", None), + control_type=getattr(ctrl, "ControlTypeName", None), + bounding_rect=rect_model, + ) + + # Utility ------------------------------------------------------------ + def _key_to_char(self, key: keyboard.Key | keyboard.KeyCode) -> Optional[str]: + if isinstance(key, keyboard.KeyCode) and key.char: + return key.char + if key == keyboard.Key.space: + return " " + if key == keyboard.Key.enter: + return "\n" + if key == keyboard.Key.backspace: + if self._text_buffer: + self._text_buffer.pop() + return None + return None + + def _is_hotkey(self, key: keyboard.Key | keyboard.KeyCode) -> bool: + target = self.hotkey.lower() + name = None + if isinstance(key, keyboard.Key): + name = (key.name or "").lower() + elif isinstance(key, keyboard.KeyCode): + name = (key.char or "").lower() + return name == target + + def _current_mouse_position(self) -> Tuple[int, int]: + pos = self._mouse_controller.position + return int(pos[0]), int(pos[1]) + + def _current_mouse_info(self) -> Optional[MouseInfo]: + x, y = self._current_mouse_position() + return MouseInfo(x=int(x), y=int(y), button=None, action=None) + + def _ensure_uia_initialized(self) -> None: + if getattr(self._uia_local, "token", None) is None: + self._uia_local.token = auto.UIAutomationInitializerInThread() + + # Persistence -------------------------------------------------------- + def _write_events(self) -> None: + with self.events_path.open("w", encoding="utf-8") as f: + for event in self.events: + 
f.write(json.dumps(event.dict(exclude_none=True), ensure_ascii=False)) + f.write("\n") + + def _write_manifest(self) -> None: + resolution = self._resolution() + manifest = SessionManifest( + session_id=self.session_id, + start_time=self._start_ts, + end_time=time.time(), + resolution=resolution, + fps=self.fps, + screen=self.screen, + video_path=str(self.video_path), + events_path=str(self.events_path), + frames_dir=str(self.frames_dir), + frames_crops_dir=str(self.frames_crops_dir), + ui_snapshots_dir=str(self.ui_snapshots_dir), + ) + path = self.session_dir / "manifest.json" + with path.open("w", encoding="utf-8") as f: + json.dump(manifest.dict(exclude_none=True), f, ensure_ascii=False, indent=2) + + def _resolution(self) -> str: + if self._monitor: + return f"{self._monitor['width']}x{self._monitor['height']}" + try: + width, height = auto.GetScreenSize() + return f"{width}x{height}" + except Exception: + return "unknown" diff --git a/autodemo/schema.py b/autodemo/schema.py new file mode 100644 index 0000000..8156168 --- /dev/null +++ b/autodemo/schema.py @@ -0,0 +1,120 @@ +# MIT License +# Copyright (c) 2024 +"""Data schemas for recording and DSL components.""" + +from __future__ import annotations + +from typing import Any, Dict, List, Literal, Optional + +from pydantic import BaseModel, Field + + +class Rect(BaseModel): + left: int + top: int + right: int + bottom: int + + @property + def width(self) -> int: + return self.right - self.left + + @property + def height(self) -> int: + return self.bottom - self.top + + +class WindowInfo(BaseModel): + hwnd: Optional[int] = None + title: Optional[str] = None + process_name: Optional[str] = None + rect: Optional[Rect] = None + + +class UISelector(BaseModel): + automation_id: Optional[str] = None + name: Optional[str] = None + class_name: Optional[str] = None + control_type: Optional[str] = None + bounding_rect: Optional[Rect] = None + + +class FramePaths(BaseModel): + full: Optional[str] = None + crop_mouse: 
Optional[str] = None + crop_element: Optional[str] = None + + +class MouseInfo(BaseModel): + x: int + y: int + button: Optional[str] = None + action: Optional[str] = None + + +class UITreeNode(BaseModel): + name: Optional[str] + automation_id: Optional[str] + class_name: Optional[str] + control_type: Optional[str] + depth: int + + +EventType = Literal["mouse_click", "text_input", "window_change"] + + +class EventRecord(BaseModel): + ts: float + event_type: EventType + window: Optional[WindowInfo] = None + mouse: Optional[MouseInfo] = None + text: Optional[str] = None + uia: Optional[UISelector] = None + frame_paths: Optional[FramePaths] = None + video_time_offset_ms: Optional[int] = Field(None, alias="video_time_offset_ms") + ui_snapshot: Optional[str] = None + + +class UISnapshot(BaseModel): + selector: Optional[UISelector] = None + tree: List[UITreeNode] = Field(default_factory=list) + + +class SessionManifest(BaseModel): + session_id: str + start_time: float + end_time: float + resolution: Optional[str] = None + fps: int + screen: int + video_path: str + events_path: str + frames_dir: str + frames_crops_dir: str + ui_snapshots_dir: str + + +# DSL schemas (kept for executor/infer workflow) ------------------------ +class DSLAction(BaseModel): + action: Literal["click", "type", "set_value", "assert_exists", "wait_for"] + target: Dict[str, Any] = Field(default_factory=dict) + text: Optional[str] = None + params: Dict[str, Any] = Field(default_factory=dict) + retry_policy: Optional[Dict[str, Any]] = None + waits: Optional[Dict[str, Any]] = None + + +class DSLBlock(BaseModel): + name: str + steps: List[Any] = Field(default_factory=list) + if_condition: Optional[str] = None + else_steps: Optional[List[Any]] = None + for_each: Optional[str] = None + + +class DSLSpec(BaseModel): + params: Dict[str, Any] = Field(default_factory=dict) + steps: List[Any] + assertions: List[str] = Field(default_factory=list) + retry_policy: Dict[str, Any] = Field(default_factory=lambda: 
{"max_attempts": 2, "interval": 1.0}) + waits: Dict[str, Any] = Field(default_factory=lambda: {"appear": 5.0, "disappear": 5.0}) diff --git a/autodemo/screen_recorder.py b/autodemo/screen_recorder.py new file mode 100644 index 0000000..c9e533c --- /dev/null +++ b/autodemo/screen_recorder.py @@ -0,0 +1,155 @@ +# MIT License +# Copyright (c) 2024 +"""Screen recording helper with ffmpeg primary and mss+cv2 fallback.""" + +from __future__ import annotations + +import shutil +import subprocess +import threading +import time +from pathlib import Path +from typing import Dict, Optional + +import cv2 # type: ignore +import mss # type: ignore +import numpy as np # type: ignore + + +class ScreenRecorder: + """Record the screen to an MP4 file.""" + + def __init__(self, output_path: Path, fps: int = 12, screen: int = 0) -> None: + self.output_path = output_path + self.fps = fps + self.screen = screen + + self._proc: Optional[subprocess.Popen] = None + self._thread: Optional[threading.Thread] = None + self._stop_event = threading.Event() + self._monitor: Optional[Dict[str, int]] = None + self._writer: Optional[cv2.VideoWriter] = None + + @property + def monitor(self) -> Optional[Dict[str, int]]: + return self._monitor + + def start(self) -> None: + """Start recording using ffmpeg if available, otherwise mss+cv2.""" + self.output_path.parent.mkdir(parents=True, exist_ok=True) + if self._start_ffmpeg(): + return + self._start_mss_fallback() + + def stop(self) -> None: + """Stop recording gracefully.""" + self._stop_event.set() + if self._proc: + try: + if self._proc.stdin: + self._proc.stdin.write(b"q") + self._proc.stdin.flush() + except Exception: + pass + try: + self._proc.wait(timeout=5) + except Exception: + self._proc.kill() + self._proc = None + if self._thread and self._thread.is_alive(): + self._thread.join(timeout=5) + self._thread = None + if self._writer: + self._writer.release() + self._writer = None + + def _start_ffmpeg(self) -> bool: + if shutil.which("ffmpeg") is 
None: + return False + + with mss.mss() as sct: + monitors = sct.monitors + if 0 <= self.screen < len(monitors): + self._monitor = monitors[self.screen] + else: + self._monitor = monitors[0] + + width = int(self._monitor["width"]) + height = int(self._monitor["height"]) + offset_x = int(self._monitor["left"]) + offset_y = int(self._monitor["top"]) + + cmd = [ + "ffmpeg", + "-y", + "-f", + "gdigrab", + "-framerate", + str(self.fps), + "-offset_x", + str(offset_x), + "-offset_y", + str(offset_y), + "-video_size", + f"{width}x{height}", + "-draw_mouse", + "1", + "-i", + "desktop", + "-pix_fmt", + "yuv420p", + "-vcodec", + "libx264", + "-preset", + "ultrafast", + str(self.output_path), + ] + + creation_flags = subprocess.CREATE_NO_WINDOW if hasattr(subprocess, "CREATE_NO_WINDOW") else 0 + try: + self._proc = subprocess.Popen( + cmd, + stdin=subprocess.PIPE, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + creationflags=creation_flags, + ) + return True + except Exception: + self._proc = None + return False + + def _start_mss_fallback(self) -> None: + self._stop_event.clear() + self._thread = threading.Thread(target=self._capture_loop, daemon=True) + self._thread.start() + + def _capture_loop(self) -> None: + with mss.mss() as sct: + monitors = sct.monitors + if 0 <= self.screen < len(monitors): + self._monitor = monitors[self.screen] + else: + self._monitor = monitors[0] + + width = int(self._monitor["width"]) + height = int(self._monitor["height"]) + fourcc = cv2.VideoWriter_fourcc(*"mp4v") + self._writer = cv2.VideoWriter(str(self.output_path), fourcc, self.fps, (width, height)) + + frame_interval = 1.0 / max(self.fps, 1) + next_ts = time.perf_counter() + + while not self._stop_event.is_set(): + shot = np.array(sct.grab(self._monitor)) + frame = cv2.cvtColor(shot, cv2.COLOR_BGRA2BGR) + self._writer.write(frame) + + next_ts += frame_interval + sleep_for = max(0.0, next_ts - time.perf_counter()) + if sleep_for: + time.sleep(sleep_for) + + if self._writer: + 
self._writer.release() + self._writer = None diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..05f6b5e --- /dev/null +++ b/requirements.txt @@ -0,0 +1,12 @@ +pydantic>=1.10,<3 +uiautomation>=2.0.20 +pywin32>=306 +pyyaml>=6.0.1 +pytest>=7.4.0 +pynput>=1.7.6 +mss>=9.0.1 +opencv-python>=4.8.0 +psutil>=5.9.6 +numpy>=1.26.0 +requests>=2.31.0 +python-dotenv>=1.0.0 diff --git a/tests/test_dummy_llm.py b/tests/test_dummy_llm.py new file mode 100644 index 0000000..5686b06 --- /dev/null +++ b/tests/test_dummy_llm.py @@ -0,0 +1,21 @@ +# MIT License +# Copyright (c) 2024 +"""最小端到端测试:Dummy LLM 推理。""" + +from autodemo.llm import DummyLLM +from autodemo.schema import ControlSnapshot, EventRecord, Rect + + +def test_dummy_llm_generate() -> None: + llm = DummyLLM() + ev = EventRecord( + kind="mouse_click", + timestamp=1.0, + data={"x": 1, "y": 2}, + control=ControlSnapshot( + AutomationId="btn1", Name="按钮", ClassName="Button", ControlType="Button", BoundingRectangle=Rect(left=0, top=0, right=10, bottom=10) + ), + ) + spec = llm.generate([ev]) + assert spec.steps[0]["action"] == "click" + assert spec.steps[0]["target"]["AutomationId"] == "btn1" diff --git a/tests/test_executor_dry.py b/tests/test_executor_dry.py new file mode 100644 index 0000000..e5b3877 --- /dev/null +++ b/tests/test_executor_dry.py @@ -0,0 +1,29 @@ +# MIT License +# Copyright (c) 2024 +"""最小端到端测试:执行器 dry-run 模式。""" + +from autodemo.executor import ExecContext, execute_spec +from autodemo.schema import DSLSpec + + +def test_executor_dry_run(monkeypatch, capsys) -> None: + # 替换 _match_window 与 _find_control 以避免真实 UI 依赖 + from autodemo import executor + + def fake_match(title: str): + class Dummy: + Name = "Notepad" + + return Dummy() + + def fake_find(root, locator, timeout): + return object() + + monkeypatch.setattr(executor, "_match_window", fake_match) + monkeypatch.setattr(executor, "_find_control", fake_find) + + spec = DSLSpec(steps=[{"action": "click", "target": {"Name": 
"ok"}}]) + ctx = ExecContext(allow_title=".*", dry_run=True) + execute_spec(spec, ctx) + out = capsys.readouterr().out + assert "dry-run" in out diff --git a/tests/test_schema.py b/tests/test_schema.py new file mode 100644 index 0000000..8d87b49 --- /dev/null +++ b/tests/test_schema.py @@ -0,0 +1,11 @@ +# MIT License +# Copyright (c) 2024 +"""最小端到端测试:schema 校验。""" + +from autodemo.schema import DSLSpec + + +def test_dsl_schema_defaults() -> None: + spec = DSLSpec(steps=[{"action": "click", "target": {"Name": "btn"}}]) + assert spec.retry_policy["max_attempts"] == 2 + assert spec.waits["appear"] == 5.0