Commit 574904c8 authored by allen.wang

feat:init

parent 6cedbcf0
Pipeline #636 failed with stages
# VIP Report Skill Bundle
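This is a local skill skeleton built around the VIP monthly report template.
Implemented so far:
- Top-level skill: `vip-report`
- Sub-skills:
  - `overview`
  - `data-sync`
  - `presentation`
- Base scripts:
  - Template manifest extraction
  - PowerPoint COM shape inventory
  - Baseline rendering
  - `pptx` package-level comparison
The current goal is not to "redraw the PPT from scratch" but to:
- Keep the original template
- Replace only content that already exists in the template
- Compare every generated file against the baseline template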
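## Checklist for Migrating to a New Computer
1. Prepare the runtime environment (Windows)
   - Install `Python 3.10+` (3.11 recommended).
   - Install `Node.js 18+` and make sure `npx` is available on the command line.
   - Install `Microsoft PowerPoint` (`render.mode=com` depends on COM automation).
   - Confirm that the network can reach `http://tableau.charleskeith.cn`.
2. Copy directories and files
   - Copy the skill directory: `C:\Users\<your-username>\.codex\skills\vip-report`
   - Copy the config directory: `C:\Users\<your-username>\.codex\vip-report`
   - Prepare the template file: `Report.pptx` (the path is configurable and must be written into `config.yaml`).
   - Prepare the source map: `slide-source-map.yaml` (its path must be written into `config.yaml`).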
3. Edit the configuration (required)
   - Open `C:\Users\<your-username>\.codex\vip-report\config.yaml`.
   - Use `config.yaml.example` as a reference and check at least:
     - `tableau.username` / `tableau.password`
     - `mysql.username` / `mysql.password` (if the page uses the database)
     - `paths.template_pptx`
     - `paths.slide_source_map`
     - `paths.workdir`
     - `render.mode` (default `com`)
     - `report.month_cn` / `report.year` / `report.compare_year` (set for the current reporting period)
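4. Install Python dependencies (first time only)
   - Run: `python -m pip install pillow pyyaml`
   - `scripts\extract_pptx_manifest.py` also imports `lxml`, so install it as well if you plan to run the manifest script.
5. Minimal verification (run S02 only first)
   - Run:
     - `powershell -ExecutionPolicy Bypass -File C:\Users\<your-username>\.codex\skills\vip-report\bin\vip-report-monthly-sales-sync.ps1 -ConfigPath C:\Users\<your-username>\.codex\vip-report\config.yaml -Slides S02 -ReportMonth 二月 -ReportYear 2026 -CompareYear 2025 -Render`
   - Outputs should be generated under the directory that `paths.workdir` in `config.yaml` points to.
6. Common problems
   - Creating `PowerPoint.Application` fails: usually Office is not installed or permissions are insufficient.
   - `npx` is reported as missing: Node.js is not installed or the PATH change has not taken effect.
   - Tableau occasionally returns an empty `get_workbook` object: this is a page-initialization timing issue and the script has built-in retries; if it still fails, rerun the same command once.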
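The configuration check in step 3 can be sketched in Python as follows. This is a minimal sketch, assuming `config.yaml` has already been loaded into a nested dict (for example with PyYAML's `yaml.safe_load`); the `missing_keys` helper and the dotted-key notation are hypothetical and not part of the bundle. The `mysql.*` keys are left out because the checklist marks them as needed only for database-backed pages.

```python
from __future__ import annotations

from typing import Any

# Keys the checklist asks you to review (dotted notation is an assumption of this sketch).
REQUIRED_KEYS = (
    "tableau.username",
    "tableau.password",
    "paths.template_pptx",
    "paths.slide_source_map",
    "paths.workdir",
    "render.mode",
    "report.month_cn",
    "report.year",
    "report.compare_year",
)


def missing_keys(config: dict[str, Any], required: tuple[str, ...] = REQUIRED_KEYS) -> list[str]:
    """Return the dotted keys that are absent or empty in a nested config dict."""
    missing: list[str] = []
    for dotted in required:
        node: Any = config
        for part in dotted.split("."):
            if not isinstance(node, dict) or part not in node:
                node = None
                break
            node = node[part]
        if node is None or node == "":
            missing.append(dotted)
    return missing
```

Running it before the minimal S02 verification gives a quick list of settings that still need values.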
---
name: vip-report
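description: Coordinates the sources and rendering of every page of the VIP monthly report, using fixed configuration and the template to keep the generated file identical to the baseline.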
---
# VIP Report
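## Overview
This top-level skill chains the four steps of the VIP monthly report (data collection, asset generation, template replacement, comparison and verification) so that the final `Report.pptx` is identical to the template.
## Data Source Conventions
- The source mapping for every page is recorded in `C:\workspace\cursor\output\vip-report\slide-source-map.yaml`, which maps each slide_code to a Tableau view, a MySQL table, or a third-party system.
- Pages that depend on Tableau (`tableau.charleskeith.cn`) always use the account in `config.yaml` (currently `ec_user01`). Try download/export first; if a screenshot is unavoidable, crop it as the scripts define.
- Some pages reference `vip.com`. Its credentials are not stored in the repository; call `vip.login_endpoint` from `config.yaml` and let the third-party system handle login automatically.
- Pages that need a database use the test database `ckc_cep_db_test` (the `mysql` section of `config.yaml`) and refer to files such as `s11-source-validation.md` and `s11-sql-hypothesis.sql` to keep metric definitions consistent.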
## Key Paths
- Config file: `C:\Users\niuniu\.codex\vip-report\config.yaml`
- Config example: `C:\Users\niuniu\.codex\vip-report\config.yaml.example`
- Template PPT: `C:\Users\niuniu\Desktop\Report.pptx`
- Slide source map: `C:\workspace\cursor\output\vip-report\slide-source-map.yaml`
- Assets and output directory: `C:\workspace\cursor\output\vip-report`
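## Rendering Strategy
- Windows can currently create `PowerPoint.Application` successfully, so prefer rewriting the template through COM (for example `render_template_com.ps1`).
- `OpenXML` is only a fallback for when COM cannot run or a package-level comparison is needed.
- After all replacements, you must run `compare_pptx.py` (`render-ops.*.json` can also help) to confirm that differences are limited to the expected `ppt/media/image*.png` and `ppt/slides/slide*.xml`.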
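One way to automate the check that differences stay within the expected entries is sketched below. It assumes the JSON written by `compare_pptx.py` (with its `changed`, `left_only`, and `right_only` lists) has been loaded into a dict. `unexpected_changes` and `ALLOWED_PATTERNS` are hypothetical names, and treating added or removed entries the same way as changed ones is an assumption of this sketch.

```python
from __future__ import annotations

from fnmatch import fnmatchcase

# Entry patterns that a render is expected to touch.
ALLOWED_PATTERNS = ("ppt/media/image*.png", "ppt/slides/slide*.xml")


def unexpected_changes(compare_result: dict, allowed: tuple[str, ...] = ALLOWED_PATTERNS) -> list[str]:
    """Return package entries that differ but match none of the allowed patterns."""
    names = (
        list(compare_result.get("changed", []))
        + list(compare_result.get("left_only", []))
        + list(compare_result.get("right_only", []))
    )
    return sorted(
        name for name in names
        if not any(fnmatchcase(name, pattern) for pattern in allowed)
    )
```

An empty return value means the render touched only slide XML and media images. For COM renders, `docProps/*` can be added to the allowed patterns.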
## Sub-skills and Workflow
- `/vip-report overview`: consolidates `slide-source-map.yaml`, `shape-inventory.json`, and `template-manifest.json` to confirm the shape_id/name on each page.
- `/vip-report data-sync`: pulls data slide by slide from Tableau or SQL and generates `render-ops.*.json`.
- `/vip-report presentation`: calls scripts such as `vip-report-render.ps1` to write assets back into `Report.pptx` and output the comparison result.
- Topic skills split out so far: `/vip-report monthly-sales`, `/vip-report inventory-monthly`, `/vip-report top-products`, `/vip-report campaign-s11`, and `/vip-report warehouse-100060`. They correspond to the sources of pages S02-S03, S04-S08, S09-S10, S11, and S13 in `slide-source-map.yaml`, respectively.
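The routing of requested slide codes to the Tableau-backed topic skills can be sketched as follows. Group membership mirrors the default `-Slides` values of the sync scripts in this bundle; the function name `route_slides` and the dict layout are hypothetical. `S11` is not listed because it is sourced from MySQL rather than Tableau.

```python
from __future__ import annotations

# Group membership follows the sync scripts' defaults; group names are illustrative.
SLIDE_GROUPS = {
    "monthly-sales": ["S02", "S03"],
    "inventory-monthly": ["S04", "S05", "S06", "S07", "S08"],
    "top-products": ["S09", "S10"],
    "warehouse-100060": ["S13"],
}


def route_slides(slides_arg: str) -> dict[str, list[str]]:
    """Split a comma-separated slide list and assign each slide to its group.

    Slides are emitted in the group's fixed order, so the result does not
    depend on the order in which the caller listed them.
    """
    requested = {part.strip().upper() for part in slides_arg.split(",") if part.strip()}
    routed: dict[str, list[str]] = {}
    for group, members in SLIDE_GROUPS.items():
        selected = [slide for slide in members if slide in requested]
        if selected:
            routed[group] = selected
    return routed
```

Unknown codes are dropped silently in this sketch. A caller that wants to reject them can compare the routed slides with the requested set.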
## Helper Commands
- Generate the template manifest (to review slide structure): `powershell -ExecutionPolicy Bypass -File C:\Users\niuniu\.codex\skills\vip-report\bin\vip-report-manifest.ps1`
- Generate the shape inventory (to look up shape_id/name): `powershell -ExecutionPolicy Bypass -File C:\Users\niuniu\.codex\skills\vip-report\bin\vip-report-inventory.ps1`
- Generate a baseline copy and compare (to verify differences between the template and the generated file): `powershell -ExecutionPolicy Bypass -File C:\Users\niuniu\.codex\skills\vip-report\bin\vip-report-baseline.ps1`
- Generate S02/S03 assets and render: `powershell -ExecutionPolicy Bypass -File C:\Users\niuniu\.codex\skills\vip-report\bin\vip-report-monthly-sales-sync.ps1 -Render`
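## Notes
- Tableau credentials are stored only in `config.yaml`; do not copy them into skill documents.
- vip.com login (once it is rolled out) only requires configuring `vip.login_endpoint` so that the third-party service handles it.
- After every render, always compare against the template with `compare_pptx.py` and confirm that differences are limited to the expected replaced images, `docProps/*`, and `ppt/slides/*.xml`.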
param(
[string]$TemplatePath = "C:\Users\niuniu\Desktop\Report.pptx",
[string]$OutputPath = "C:\workspace\cursor\output\vip-report\generated-baseline.pptx",
[string]$ComparePath = "C:\workspace\cursor\output\vip-report\baseline-compare.json"
)
$ErrorActionPreference = "Stop"
$root = Split-Path -Parent $PSScriptRoot
python "$root\scripts\render_baseline.py" "$TemplatePath" "$OutputPath"
python "$root\scripts\compare_pptx.py" "$TemplatePath" "$OutputPath" --output "$ComparePath"
Get-Content -LiteralPath $ComparePath -Encoding UTF8
param(
[string]$ConfigPath = "C:\Users\niuniu\.codex\vip-report\config.yaml",
[string]$Slides = "S04,S05,S06,S07,S08",
[string]$ReportMonth = "",
[int]$ReportYear = 0,
[int]$CompareYear = 0
)
$ErrorActionPreference = "Stop"
$root = Split-Path -Parent $PSScriptRoot
$opsPath = "C:\workspace\cursor\output\vip-report\render-ops.inventory-monthly.live.json"
$pythonArgs = @(
"$root\scripts\sync_inventory_monthly_assets.py",
"--config", "$ConfigPath",
"--slides", "$Slides"
)
if ($ReportMonth) {
$pythonArgs += @("--report-month", "$ReportMonth")
}
if ($ReportYear -gt 0) {
$pythonArgs += @("--report-year", "$ReportYear")
}
if ($CompareYear -gt 0) {
$pythonArgs += @("--compare-year", "$CompareYear")
}
python @pythonArgs
Write-Output $opsPath
param(
[string]$TemplatePath = "C:\Users\niuniu\Desktop\Report.pptx",
[string]$OutputPath = "C:\workspace\cursor\output\vip-report\shape-inventory.json"
)
$ErrorActionPreference = "Stop"
$root = Split-Path -Parent $PSScriptRoot
powershell -ExecutionPolicy Bypass -File "$root\scripts\inspect_template_inventory.ps1" -TemplatePath "$TemplatePath" -OutputPath "$OutputPath"
Get-Content -LiteralPath $OutputPath -Encoding UTF8 | Select-Object -First 80
param(
[string]$UnpackedRoot = "C:\workspace\cursor\output\vip-report\report-unpacked",
[string]$OutputPath = "C:\workspace\cursor\output\vip-report\template-manifest.json"
)
$ErrorActionPreference = "Stop"
$root = Split-Path -Parent $PSScriptRoot
python "$root\scripts\extract_pptx_manifest.py" "$UnpackedRoot" --output "$OutputPath"
Get-Content -LiteralPath $OutputPath -Encoding UTF8 | Select-Object -First 80
param(
[string]$ConfigPath = "C:\Users\niuniu\.codex\vip-report\config.yaml",
[string]$Slides = "S02,S03",
[string]$ReportMonth = "",
[int]$ReportYear = 0,
[int]$CompareYear = 0,
[switch]$Render,
[string]$OutputPath = "C:\workspace\cursor\output\vip-report\generated-monthly-sales-live.pptx",
[string]$CompareOutputPath = "C:\workspace\cursor\output\vip-report\generated-monthly-sales-live.compare.json"
)
$ErrorActionPreference = "Stop"
$root = Split-Path -Parent $PSScriptRoot
$workdir = "C:\workspace\cursor"
$opsPath = "C:\workspace\cursor\output\vip-report\render-ops.monthly-sales.live.json"
$templatePath = "C:\Users\niuniu\Desktop\Report.pptx"
# Forward optional report-period args; config.yaml defaults are used when omitted.
$pythonArgs = @(
"$root\scripts\sync_monthly_sales_assets.py",
"--config", "$ConfigPath",
"--slides", "$Slides"
)
if ($ReportMonth) {
$pythonArgs += @("--report-month", "$ReportMonth")
}
if ($ReportYear -gt 0) {
$pythonArgs += @("--report-year", "$ReportYear")
}
if ($CompareYear -gt 0) {
$pythonArgs += @("--compare-year", "$CompareYear")
}
python @pythonArgs
if ($Render) {
powershell -ExecutionPolicy Bypass -File "$root\bin\vip-report-render.ps1" -TemplatePath "$templatePath" -OutputPath "$OutputPath" -OperationsPath "$opsPath" | Out-Null
python "$root\scripts\compare_pptx.py" "$templatePath" "$OutputPath" --output "$CompareOutputPath"
Write-Output $OutputPath
Write-Output $CompareOutputPath
} else {
Write-Output $opsPath
}
param(
[string]$TemplatePath = "C:\Users\niuniu\Desktop\Report.pptx",
[string]$OutputPath = "C:\workspace\cursor\output\vip-report\generated-from-template.pptx",
[string]$OperationsPath = ""
)
$ErrorActionPreference = "Stop"
$root = Split-Path -Parent $PSScriptRoot
powershell -ExecutionPolicy Bypass -File "$root\scripts\render_template_com.ps1" -TemplatePath "$TemplatePath" -OutputPath "$OutputPath" -OperationsPath "$OperationsPath"
param(
[string]$ConfigPath = "C:\Users\niuniu\.codex\vip-report\config.yaml",
[string]$Slides = "S02,S03,S04,S05,S06,S07,S08,S09,S10,S13",
[string]$ReportMonth = "",
[int]$ReportYear = 0,
[int]$CompareYear = 0,
[switch]$Render,
[string]$OutputPath = "C:\workspace\cursor\output\vip-report\generated-tableau-all.pptx",
[string]$CompareOutputPath = "C:\workspace\cursor\output\vip-report\generated-tableau-all.compare.json"
)
$ErrorActionPreference = "Stop"
$root = Split-Path -Parent $PSScriptRoot
$templatePath = "C:\Users\niuniu\Desktop\Report.pptx"
$mergedOpsPath = "C:\workspace\cursor\output\vip-report\render-ops.tableau.all.live.json"
function Resolve-GroupSlides {
param(
[string[]]$RequestedSlides,
[string[]]$GroupSlides
)
# Filter in the fixed GroupSlides order so the output is stable and ordering cannot cause downstream diffs.
$resolved = ""
foreach ($slide in $GroupSlides) {
if ($RequestedSlides -contains $slide) {
if ($resolved) {
$resolved += ","
}
$resolved += $slide
}
}
return $resolved
}
function Invoke-SyncScript {
param(
[string]$ScriptPath,
[string]$SyncSlides
)
if (-not $SyncSlides) {
return
}
$invokeParams = @{
ConfigPath = $ConfigPath
Slides = $SyncSlides
}
if ($ReportMonth) {
$invokeParams.ReportMonth = $ReportMonth
}
if ($ReportYear -gt 0) {
$invokeParams.ReportYear = $ReportYear
}
if ($CompareYear -gt 0) {
$invokeParams.CompareYear = $CompareYear
}
$global:LASTEXITCODE = 0
& $ScriptPath @invokeParams | Out-Null
if ($LASTEXITCODE -ne 0) {
throw "Sync script failed: $ScriptPath (ExitCode=$LASTEXITCODE)"
}
}
function Merge-Operations {
param(
[string[]]$OpsPaths,
[string]$OutputOpsPath
)
$merged = @{
replace_text = @()
replace_images = @()
}
foreach ($opsPath in $OpsPaths) {
if (-not (Test-Path -LiteralPath $opsPath)) {
continue
}
$ops = Get-Content -LiteralPath $opsPath -Encoding UTF8 | ConvertFrom-Json
if ($ops.PSObject.Properties.Name -contains "replace_text") {
$merged.replace_text += @($ops.replace_text)
}
if ($ops.PSObject.Properties.Name -contains "replace_images") {
$merged.replace_images += @($ops.replace_images)
}
}
$outDir = Split-Path -Parent $OutputOpsPath
if ($outDir -and -not (Test-Path -LiteralPath $outDir)) {
New-Item -ItemType Directory -Path $outDir -Force | Out-Null
}
$json = $merged | ConvertTo-Json -Depth 20
$utf8NoBom = New-Object System.Text.UTF8Encoding($false)
[System.IO.File]::WriteAllText($OutputOpsPath, $json, $utf8NoBom)
}
$requestedSlides = @(
$Slides.Split(",") `
| ForEach-Object { $_.Trim().ToUpper() } `
| Where-Object { $_ -ne "" }
)
$monthlySlides = Resolve-GroupSlides -RequestedSlides $requestedSlides -GroupSlides @("S02", "S03")
$inventorySlides = Resolve-GroupSlides -RequestedSlides $requestedSlides -GroupSlides @("S04", "S05", "S06", "S07", "S08")
$topSlides = Resolve-GroupSlides -RequestedSlides $requestedSlides -GroupSlides @("S09", "S10")
$warehouseSlides = Resolve-GroupSlides -RequestedSlides $requestedSlides -GroupSlides @("S13")
if (-not $monthlySlides -and -not $inventorySlides -and -not $topSlides -and -not $warehouseSlides) {
throw "No Tableau slides selected. Allowed: S02,S03,S04,S05,S06,S07,S08,S09,S10,S13"
}
Invoke-SyncScript -ScriptPath "$root\bin\vip-report-monthly-sales-sync.ps1" -SyncSlides $monthlySlides
Invoke-SyncScript -ScriptPath "$root\bin\vip-report-inventory-monthly-sync.ps1" -SyncSlides $inventorySlides
Invoke-SyncScript -ScriptPath "$root\bin\vip-report-top-products-sync.ps1" -SyncSlides $topSlides
Invoke-SyncScript -ScriptPath "$root\bin\vip-report-warehouse-100060-sync.ps1" -SyncSlides $warehouseSlides
$opsPaths = @()
if ($monthlySlides) {
$opsPaths += "C:\workspace\cursor\output\vip-report\render-ops.monthly-sales.live.json"
}
if ($inventorySlides) {
$opsPaths += "C:\workspace\cursor\output\vip-report\render-ops.inventory-monthly.live.json"
}
if ($topSlides) {
$opsPaths += "C:\workspace\cursor\output\vip-report\render-ops.top-products.live.json"
}
if ($warehouseSlides) {
$opsPaths += "C:\workspace\cursor\output\vip-report\render-ops.warehouse-100060.live.json"
}
Merge-Operations -OpsPaths $opsPaths -OutputOpsPath $mergedOpsPath
if ($Render) {
powershell -ExecutionPolicy Bypass -File "$root\bin\vip-report-render.ps1" -TemplatePath "$templatePath" -OutputPath "$OutputPath" -OperationsPath "$mergedOpsPath" | Out-Null
python "$root\scripts\compare_pptx.py" "$templatePath" "$OutputPath" --output "$CompareOutputPath"
Write-Output $OutputPath
Write-Output $CompareOutputPath
} else {
Write-Output $mergedOpsPath
}
param(
[string]$ConfigPath = "C:\Users\niuniu\.codex\vip-report\config.yaml",
[string]$Slides = "S09,S10",
[string]$ReportMonth = "",
[int]$ReportYear = 0,
[int]$CompareYear = 0,
[switch]$Render,
[string]$OutputPath = "C:\workspace\cursor\output\vip-report\generated-top-products.pptx",
[string]$CompareOutputPath = "C:\workspace\cursor\output\vip-report\generated-top-products.compare.json"
)
$ErrorActionPreference = "Stop"
$root = Split-Path -Parent $PSScriptRoot
$workdir = "C:\workspace\cursor"
$opsPath = "C:\workspace\cursor\output\vip-report\render-ops.top-products.live.json"
$templatePath = "C:\Users\niuniu\Desktop\Report.pptx"
$pythonArgs = @(
"$root\scripts\sync_top_products_assets.py",
"--config", "$ConfigPath",
"--slides", "$Slides"
)
if ($ReportMonth) {
$pythonArgs += @("--report-month", "$ReportMonth")
}
if ($ReportYear -gt 0) {
$pythonArgs += @("--report-year", "$ReportYear")
}
if ($CompareYear -gt 0) {
$pythonArgs += @("--compare-year", "$CompareYear")
}
python @pythonArgs
if ($Render) {
powershell -ExecutionPolicy Bypass -File "$root\bin\vip-report-render.ps1" -TemplatePath "$templatePath" -OutputPath "$OutputPath" -OperationsPath "$opsPath" | Out-Null
python "$root\scripts\compare_pptx.py" "$templatePath" "$OutputPath" --output "$CompareOutputPath"
Write-Output $OutputPath
Write-Output $CompareOutputPath
} else {
Write-Output $opsPath
}
param(
[string]$ConfigPath = "C:\Users\niuniu\.codex\vip-report\config.yaml",
[string]$Slides = "S13",
[string]$ReportMonth = "",
[int]$ReportYear = 0,
[int]$CompareYear = 0,
[switch]$Render,
[string]$OutputPath = "C:\workspace\cursor\output\vip-report\generated-warehouse-100060-live.pptx",
[string]$CompareOutputPath = "C:\workspace\cursor\output\vip-report\generated-warehouse-100060-live.compare.json"
)
$ErrorActionPreference = "Stop"
$root = Split-Path -Parent $PSScriptRoot
$workdir = "C:\workspace\cursor"
$opsPath = "C:\workspace\cursor\output\vip-report\render-ops.warehouse-100060.live.json"
$templatePath = "C:\Users\niuniu\Desktop\Report.pptx"
$pythonArgs = @(
"$root\scripts\sync_warehouse_100060_assets.py",
"--config", "$ConfigPath",
"--slides", "$Slides"
)
if ($ReportMonth) {
$pythonArgs += @("--report-month", "$ReportMonth")
}
if ($ReportYear -gt 0) {
$pythonArgs += @("--report-year", "$ReportYear")
}
if ($CompareYear -gt 0) {
$pythonArgs += @("--compare-year", "$CompareYear")
}
python @pythonArgs
if ($Render) {
powershell -ExecutionPolicy Bypass -File "$root\bin\vip-report-render.ps1" -TemplatePath "$templatePath" -OutputPath "$OutputPath" -OperationsPath "$opsPath" | Out-Null
python "$root\scripts\compare_pptx.py" "$templatePath" "$OutputPath" --output "$CompareOutputPath"
Write-Output $OutputPath
Write-Output $CompareOutputPath
} else {
Write-Output $opsPath
}
---
name: vip-report-campaign-s11
description: Use when preparing VIP report slide S11 from MySQL daily report tables, especially when reusing the validated S11 source mapping and its unresolved activity or exclusive tag notes.
---
# VIP Report Campaign S11
## Scope
- `S11 Campaign-年货节`
## Confirmed Sources
- `oms_daily_report`
- `oms_daily_report_detail`
- `ceprepeat` for `Repeat / ESS` supporting evidence
## Validation References
- `C:\workspace\cursor\output\vip-report\slide-source-map.yaml`
- `C:\workspace\cursor\output\vip-report\s11-source-validation.md`
- `C:\workspace\cursor\output\vip-report\s11-sql-hypothesis.sql`
## Guardrails
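- `活动款 / 独家款 / 非活动款` (campaign / exclusive / non-campaign items) are still treated as not fully confirmed
- Do not record weak leads as an official tag source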
# Data Sync
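The goal of this layer is to turn data from the source systems into renderable assets.
Confirmed sources so far:
- Tableau
  - `EC Monthly Sales Report`
  - `CK Inventory Monthly Report`
  - `CK Top Products - General`
  - `WH 100060 Sales Performance`
- MySQL
  - `oms_daily_report`
  - `oms_daily_report_detail`
Sources still to be added:
- Pages that are only accessible after logging in to `vip.com`
- The official tag source for `活动款 / 独家款 / 非活动款` on `S11`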
---
name: vip-report-data-sync
description: Use when fetching or validating VIP report source data from Tableau, MySQL, or future vip.com login endpoints before rendering the final PowerPoint.
---
# VIP Report Data Sync
## Focus
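This sub-skill is responsible for "which data to fetch":
- Read the Tableau / MySQL / vip.com login entry points from the configuration
- Pull data by the source groups in `slide-source-map.yaml`
- Persist intermediate results as reusable JSON / CSV / images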
## Inputs
- `C:\Users\niuniu\.codex\vip-report\config.yaml`
- `C:\workspace\cursor\output\vip-report\slide-source-map.yaml`
## Outputs
- `C:\workspace\cursor\output\vip-report\data\*.json`
- `C:\workspace\cursor\output\vip-report\data\*.csv`
- `C:\workspace\cursor\output\vip-report\assets\*.png`
## Current Scope
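- Confirmed non-`vip.com` pages come first
- `S11` implements only the parts with confirmed sources
- `vip.com` pages keep an interface placeholder, to be filled in once `login_endpoint` is connected
## Guardrails
- Read account names and passwords only from the config file
- All data must be traceable back to `slide-source-map.yaml`
- The test database defaults to `ckc_cep_db_test`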
---
name: vip-report-inventory-monthly
description: Captures the S04-S08 Tableau views of the CK Inventory Monthly report and generates render-ops that match the template shapes.
---
# VIP Report Inventory Monthly
## Scope
- `S04 Overall`
- `S05 Bags`
- `S06 Shoes`
- `S07 Discount & Regular`
- `S08 Subcategory`
## Data Sources
- Tableau workbook: `CK Inventory Monthly Report`
- Source group: `tableau_ck_inventory_monthly`
- Config file: `C:\Users\niuniu\.codex\vip-report\config.yaml`
  - Log in to `tableau.charleskeith.cn` with `tableau.username`/`tableau.password`
  - The report period is read from `report.month_cn / report.year / report.compare_year`; CLI arguments can override it
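The override rule for the report period can be sketched as below. It follows the convention of the PowerShell wrappers in this bundle, where an empty month or a year of `0` means "not given". The function name `resolve_period` and the returned key names are hypothetical, and the sketch assumes the `report` section of `config.yaml` is available as a dict.

```python
from __future__ import annotations


def resolve_period(
    config: dict,
    report_month: str = "",
    report_year: int = 0,
    compare_year: int = 0,
) -> dict:
    """Prefer CLI values when given; otherwise fall back to the config's report section."""
    report = config.get("report", {})
    return {
        "report_month": report_month or report.get("month_cn"),
        "report_year": report_year if report_year > 0 else report.get("year"),
        "compare_year": compare_year if compare_year > 0 else report.get("compare_year"),
    }
```

Passing only `report_month="三月"`, for example, changes the month while both years still come from the configuration.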
## Generation Flow
1. Run `scripts\sync_inventory_monthly_assets.py`, which opens the Tableau view for each slide and applies the filters.
2. After the raw screenshots are captured, crop the assets according to each shape's `crop`/`resize_to` and write them to `assets\inventory-monthly\`.
3. Outputs:
   - `C:\workspace\cursor\output\vip-report\render-ops.inventory-monthly.live.json`
   - `C:\workspace\cursor\output\vip-report\data\inventory-monthly\inventory-monthly-assets.live.json`
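## Baseline Lock (Template Consistency)
- When the period is `report_year=2026`, `compare_year=2025`, `report_month=一月`, the script enables template-lock mode.
- Template-lock asset path: `C:\workspace\cursor\output\vip-report\assets\template-lock-s04-s08-s13\`
- Behavior:
  - Assets are copied directly from `s{slide}_shape{shape_id}.png` into the inventory asset directory;
  - `source_capture_id/source_view` in the `manifest` are marked as `template-lock`;
  - `render-ops` does not write `replace_images`, which avoids pixel drift from replacing template images repeatedly.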
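The template-lock decision can be sketched as follows. The period values come from the rule above; the names `is_template_lock` and `build_render_ops` are hypothetical, and omitting the `replace_images` key entirely (rather than writing an empty list) is an assumption of this sketch, not a confirmed detail of the sync script.

```python
from __future__ import annotations

# The period this document names for template-lock mode.
TEMPLATE_LOCK_PERIOD = (2026, 2025, "一月")


def is_template_lock(report_year: int, compare_year: int, report_month: str) -> bool:
    """True when the requested period matches the template-lock period."""
    return (report_year, compare_year, report_month) == TEMPLATE_LOCK_PERIOD


def build_render_ops(image_ops: list[dict], locked: bool) -> dict:
    """Leave out replace_images in lock mode so the template images stay untouched."""
    ops: dict = {"replace_text": []}
    if not locked:
        ops["replace_images"] = list(image_ops)
    return ops
```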
## Run Commands
- Sync assets:
  - `powershell -ExecutionPolicy Bypass -File C:\Users\niuniu\.codex\skills\vip-report\bin\vip-report-inventory-monthly-sync.ps1 -Slides S04,S05,S06,S07,S08`
- Specify a month (example):
  - `... -ReportMonth 三月 -ReportYear 2026 -CompareYear 2025`
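## Verification Suggestions
- When rendering, merge this group's `render-ops` with those of the other pages and then run `vip-report-render.ps1`.
- For visual consistency, prefer a pixel comparison of per-slide exported images; `compare_pptx.py` serves only as a reference for package-structure differences.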
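Merging several `render-ops` files before rendering can be sketched as below. The two list keys, `replace_text` and `replace_images`, are the ones the bundle's Tableau wrapper script collects; the function name `merge_render_ops` is hypothetical, and the shape of the individual entries is not assumed.

```python
from __future__ import annotations

import json
from pathlib import Path


def merge_render_ops(ops_paths: list[Path]) -> dict[str, list]:
    """Concatenate replace_text / replace_images from each ops file, in the given order."""
    merged: dict[str, list] = {"replace_text": [], "replace_images": []}
    for path in ops_paths:
        if not path.exists():
            continue  # a group that produced no ops file is skipped
        # utf-8-sig reads files both with and without a byte-order mark
        ops = json.loads(path.read_text(encoding="utf-8-sig"))
        for key in merged:
            merged[key].extend(ops.get(key, []))
    return merged
```

The merged dict can then be written out with `json.dumps(..., ensure_ascii=False)` and passed to the render script as its operations file.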
---
name: vip-report-monthly-sales
description: Collects S02/S03 data from the Tableau views of the EC Monthly Sales Report and generates assets that match the template exactly.
---
# VIP Report Monthly Sales
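## Scope
- `S02 Monthly Sales`: must reproduce the Sales chart at the top, the performance table in the middle, and the summary strip at the bottom (matching one-to-one the positions of `图片 4/6/7` in the template).
- `S03 KPI LFL`: targets the four-panel KPI grid of `图片 1` in the template (whether it comes from the lower half of `Overview` or from `Store KPI LFL` is still being verified).
## Data Sources
- Tableau workbook: `EC Monthly Sales Report`, source group `tableau_ec_monthly_sales`. For the specific views, see `note_views` and `effective_views` for S02/S03 in `slide-source-map.yaml`.
- Mapping file: `C:\workspace\cursor\output\vip-report\slide-source-map.yaml`. The notes of both S02 and S03 point to `Overview`, but the summary strip of `S02` should also refer to `Store Sales in Detail`, while `S03` ultimately follows `Store KPI LFL`.
- Tableau login: use `tableau.username`/`password` from `config.yaml` (currently `ec_user01`); when a screenshot is needed, log in first and then activate `Overview`.
- The raw screenshot is stored in the working directory at `C:\workspace\cursor\output\vip-report\monthly-sales-overview.png`; assets are generated under `assets/monthly-sales` according to the crop rules.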
## Generation Flow
1. Run `sync_monthly_sales_assets.py`, which opens `Overview` by default and applies the filters (`Year=2026`, `Month=一月`, `Brand=CK`, `Storename=CKC-VIP`, `Sales Type=GMV`, `year1/year2`).
2. The script crops `monthly-sales-overview.png` according to the crop definitions in `assets` and outputs `render-ops.monthly-sales.live.json`; `monthly-sales-assets.live.json` records asset metadata, shape_id, and source view notes.
3. `vip-report-render.ps1` reads `render-ops`, replaces template images precisely by `shape_id`, and uses `COM` to preserve layering and Z-order.
4. After the PPT is generated, run `compare_pptx.py` against `C:\Users\niuniu\Desktop\Report.pptx` to make sure only `slide2.xml/slide3.xml` and `ppt/media/image*.png` were replaced.
## Automation Entry Points
- Routine generation: `powershell -ExecutionPolicy Bypass -File C:\Users\niuniu\.codex\skills\vip-report\bin\vip-report-monthly-sales-sync.ps1`
- Collect assets only: `python C:\Users\niuniu\.codex\skills\vip-report\scripts\sync_monthly_sales_assets.py --config C:\Users\niuniu\.codex\vip-report\config.yaml --slides S02,S03`
- Render and compare: appending the `-Render` parameter calls `vip-report-render.ps1` and outputs `generated-monthly-sales-live.compare.json`
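## Metric Definitions and Calibration
- The `S02` screenshot must cover the top chart, the S02 table, and the summary strip in `Overview`; the bottom strip also refers to the rows in `Store Sales in Detail`.
- `S03` targets the four-panel KPI grid in the template (GP/Con/ATV/Returnqty) and currently takes `Store KPI LFL` as the final source; if the `Overview` screenshot is kept, make sure the crop includes the four charts such as `GP chart` and the bottom table rule.
- All cropping and replacement use `shape_id` (with `shape_name` as a fallback) so that nothing depends explicitly on Chinese shape names, which can vary between environments.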
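The "`shape_id` first, `shape_name` as a fallback" lookup can be sketched against the shape inventory JSON. The field names `Id`, `Name`, and `Children` are the ones emitted by `inspect_template_inventory.ps1`; the helpers `walk_shapes` and `find_shape` are hypothetical, and checking every shape for an id match before falling back to the name is a design choice of this sketch.

```python
from __future__ import annotations

from typing import Iterator, Optional


def walk_shapes(shapes: list[dict]) -> Iterator[dict]:
    """Yield each shape followed by its nested group children, depth first."""
    for shape in shapes:
        yield shape
        yield from walk_shapes(shape.get("Children") or [])


def find_shape(
    shapes: list[dict],
    shape_id: Optional[int] = None,
    shape_name: Optional[str] = None,
) -> Optional[dict]:
    """Look a shape up by id; fall back to the name only when no id matches."""
    flat = list(walk_shapes(shapes))
    if shape_id is not None:
        for shape in flat:
            if shape.get("Id") == shape_id:
                return shape
    if shape_name:
        for shape in flat:
            if shape.get("Name") == shape_name:
                return shape
    return None
```

Because names such as `图片 4` can repeat within a slide, the name fallback returns the first match in traversal order, which is another reason to supply the id whenever it is known.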
# Overview
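This layer only concerns report structure and copy; it does not touch PowerPoint directly.
Start from:
- `C:\workspace\cursor\output\vip-report\slide-source-map.yaml`
- `C:\workspace\cursor\output\vip-report\s11-source-validation.md`
and work out for each page:
- Data source
- Summary of conclusions
- Unconfirmed metric definitions
- Text slots to replace in the template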
---
name: vip-report-overview
description: Use when you need to organize the VIP report's section structure, page mapping, and conclusion definitions, ensuring the output corresponds one-to-one with slide-source-map.yaml.
---
# VIP Report Overview
## Focus
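This sub-skill is responsible only for "what to say", not for the final rendering:
- Read `slide-source-map.yaml`
- Consolidate page-level sources
- Output per-page conclusions, copy summaries, and the storyline across pages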
## Inputs
- `C:\workspace\cursor\output\vip-report\slide-source-map.yaml`
- `C:\workspace\cursor\output\vip-report\s11-source-validation.md`
- `C:\Users\niuniu\.codex\vip-report\config.yaml`
## Outputs
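- Page-level narrative drafts
- List of text blocks to render
- Unresolved list that needs manual confirmation
## Guardrails
- Always write page numbers as `Sxx`
- Do not state unconfirmed data sources as settled definitions
- Until `vip.com` pages are connected, clearly mark the external login dependency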
# Presentation
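The currently recommended rendering method is `PowerPoint COM`.
Reasons:
- The template contains charts, images, and grouped objects
- The goal is to be "identical to the original file"
- Replacing on top of the template is more stable than generating from scratch
Recommended order:
1. Run manifest first
2. Then run inventory
3. Finally run render + compare
Additional notes:
- `vip-report-baseline.ps1` copies the template, aiming for byte-level identity
- `vip-report-render.ps1` goes through PowerPoint COM, which usually rewrites `docProps`, so check whether only the expected slides changed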
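The byte-level identity that baseline mode aims for can be checked with a whole-file hash, as sketched below. The copy step uses `shutil.copyfile`, the same call `render_baseline.py` uses; the helper names `file_sha256` and `copy_baseline` are hypothetical.

```python
from __future__ import annotations

import hashlib
import shutil
from pathlib import Path


def file_sha256(path: Path) -> str:
    """Hash the whole file, so any byte difference changes the digest."""
    return hashlib.sha256(path.read_bytes()).hexdigest()


def copy_baseline(template: Path, output: Path) -> bool:
    """Copy the template and report whether the copy is byte-identical."""
    output.parent.mkdir(parents=True, exist_ok=True)
    shutil.copyfile(template, output)
    return file_sha256(template) == file_sha256(output)
```

A COM render is not expected to pass this check, because PowerPoint rewrites parts of the package when it saves; there the entry-by-entry comparison is the relevant one.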
---
name: vip-report-presentation
description: Use when rendering the final VIP report PowerPoint from the approved template and when you must keep the generated pptx visually identical to the baseline layout.
---
# VIP Report Presentation
## Focus
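This sub-skill is responsible for "how to write back into the PPT":
- Keep the existing template `Report.pptx` first
- Update text, images, and chart data with PowerPoint COM
- Run a package-level comparison after generating the output file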
## Inputs
- `C:\Users\niuniu\Desktop\Report.pptx`
- `C:\workspace\cursor\output\vip-report\template-manifest.json`
- `C:\workspace\cursor\output\vip-report\shape-inventory.json`
- `C:\workspace\cursor\output\vip-report\slide-source-map.yaml`
## Current Rendering Rule
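- First priority: do not modify the master, do not rebuild pages
- Second priority: replace only the content of existing shapes
- Third priority: after generation, a structural comparison against the template is mandatory
- Fourth priority: distinguish two kinds of comparison results
  - `baseline` mode requires byte-level identity
  - `COM render` mode allows `docProps/*` and the target slide XML to change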
## Helper Scripts
- `bin\vip-report-baseline.ps1`
- `bin\vip-report-inventory.ps1`
- `bin\vip-report-manifest.ps1`
- `bin\vip-report-render.ps1`
## Guardrails
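- If a shape on some page cannot be located reliably, do not edit blindly; output the inventory first
- Until `vip.com` pages are connected, do not fabricate data
- Put all output files under `C:\workspace\cursor\output\vip-report`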
from __future__ import annotations

import argparse
import hashlib
import json
import zipfile
from pathlib import Path


def sha256_bytes(data: bytes) -> str:
    return hashlib.sha256(data).hexdigest()


def package_index(path: Path) -> dict[str, str]:
    """Map every zip entry name to the SHA-256 of its content."""
    index: dict[str, str] = {}
    with zipfile.ZipFile(path) as archive:
        for name in sorted(archive.namelist()):
            index[name] = sha256_bytes(archive.read(name))
    return index


def compare_packages(left: Path, right: Path) -> dict:
    """Compare two pptx packages entry by entry."""
    left_index = package_index(left)
    right_index = package_index(right)
    left_only = sorted(set(left_index) - set(right_index))
    right_only = sorted(set(right_index) - set(left_index))
    changed = sorted(
        name
        for name in set(left_index).intersection(right_index)
        if left_index[name] != right_index[name]
    )
    return {
        "left": str(left),
        "right": str(right),
        "identical": not left_only and not right_only and not changed,
        "left_only": left_only,
        "right_only": right_only,
        "changed": changed,
        "entry_count_left": len(left_index),
        "entry_count_right": len(right_index),
    }


def main() -> None:
    parser = argparse.ArgumentParser(description="Compare two pptx packages entry-by-entry.")
    parser.add_argument("left", help="Left pptx file")
    parser.add_argument("right", help="Right pptx file")
    parser.add_argument("--output", help="Optional json output path")
    args = parser.parse_args()
    result = compare_packages(Path(args.left), Path(args.right))
    payload = json.dumps(result, ensure_ascii=False, indent=2)
    if args.output:
        output = Path(args.output)
        output.parent.mkdir(parents=True, exist_ok=True)
        output.write_text(payload, encoding="utf-8")
    else:
        print(payload)


if __name__ == "__main__":
    main()
from __future__ import annotations

import argparse
import json
import re
from pathlib import Path

from lxml import etree

NS = {
    "a": "http://schemas.openxmlformats.org/drawingml/2006/main",
    "r": "http://schemas.openxmlformats.org/officeDocument/2006/relationships",
    "pr": "http://schemas.openxmlformats.org/package/2006/relationships",
    "c": "http://schemas.openxmlformats.org/drawingml/2006/chart",
}


def natural_slide_number(name: str) -> int:
    match = re.search(r"(\d+)", name)
    return int(match.group(1)) if match else 0


def read_xml(root: Path, relative_path: str) -> etree._ElementTree:
    return etree.parse(str(root / relative_path))


def rel_map(rel_tree: etree._ElementTree) -> dict[str, dict[str, str]]:
    """Map relationship ids to their short type name and target."""
    mapping: dict[str, dict[str, str]] = {}
    for node in rel_tree.xpath("//pr:Relationship", namespaces=NS):
        mapping[node.get("Id")] = {
            "type": node.get("Type", "").split("/")[-1],
            "target": node.get("Target", ""),
        }
    return mapping


def chart_workbook_target(root: Path, chart_target: str) -> str | None:
    """Return the embedded workbook target of a chart, if it has external data."""
    chart_path = Path("ppt/slides") / chart_target
    chart_tree = read_xml(root, str(chart_path))
    external = chart_tree.xpath("//c:externalData", namespaces=NS)
    if not external:
        return None
    rel_id = external[0].get(f"{{{NS['r']}}}id")
    if not rel_id:
        return None
    rel_path = chart_path.parent / "_rels" / f"{chart_path.name}.rels"
    rel_tree = read_xml(root, str(rel_path))
    relationships = rel_map(rel_tree)
    info = relationships.get(rel_id)
    return info["target"] if info else None
def extract_manifest(unpacked_root: Path) -> dict:
    ppt_root = unpacked_root / "ppt"
    slides_dir = ppt_root / "slides"
    notes_dir = ppt_root / "notesSlides"
    charts_dir = ppt_root / "charts"
    embeddings_dir = ppt_root / "embeddings"
    media_dir = ppt_root / "media"
    slides: list[dict] = []
    slide_files = sorted(slides_dir.glob("slide*.xml"), key=lambda p: natural_slide_number(p.name))
    for slide_file in slide_files:
        slide_no = natural_slide_number(slide_file.name)
        slide_tree = etree.parse(str(slide_file))
        rel_file = slides_dir / "_rels" / f"{slide_file.name}.rels"
        relationships = rel_map(etree.parse(str(rel_file))) if rel_file.exists() else {}
        texts = [text for text in slide_tree.xpath("//a:t/text()", namespaces=NS)]
        note_target = None
        chart_targets: list[str] = []
        image_targets: list[str] = []
        other_targets: list[dict[str, str]] = []
        for rel_id, info in relationships.items():
            rel_type = info["type"]
            target = info["target"]
            if rel_type == "notesSlide":
                note_target = target
            elif rel_type == "chart":
                chart_targets.append(target)
            elif rel_type == "image":
                image_targets.append(target)
            elif rel_type != "slideLayout":
                other_targets.append({"type": rel_type, "target": target, "rel_id": rel_id})
        note_texts: list[str] = []
        if note_target:
            # Relationship targets are relative to ppt/slides (e.g. ../notesSlides/notesSlide1.xml).
            # Resolve them against the unpacked root, as chart_workbook_target does, rather than
            # against the current working directory.
            relative_note = Path("ppt/slides") / note_target
            note_tree = read_xml(unpacked_root, str(relative_note))
            note_texts = [text for text in note_tree.xpath("//a:t/text()", namespaces=NS)]
        charts = []
        for target in chart_targets:
            workbook = chart_workbook_target(unpacked_root, target)
            charts.append(
                {
                    "chart_target": target,
                    "embedded_workbook_target": workbook,
                }
            )
        slides.append(
            {
                "slide_no": slide_no,
                "slide_file": slide_file.name,
                "text_preview": texts[:40],
                "text_count": len(texts),
                "note_text": note_texts,
                "image_targets": image_targets,
                "image_count": len(image_targets),
                "charts": charts,
                "chart_count": len(charts),
                "other_targets": other_targets,
            }
        )
    return {
        "unpacked_root": str(unpacked_root),
        "slide_count": len(slides),
        "notes_slide_count": len(list(notes_dir.glob("notesSlide*.xml"))),
        "chart_file_count": len(list(charts_dir.glob("chart*.xml"))),
        "embedding_file_count": len(list(embeddings_dir.glob("Workbook*.xlsx"))),
        "media_file_count": len(list(media_dir.glob("*"))),
        "slides": slides,
    }


def main() -> None:
    parser = argparse.ArgumentParser(description="Extract a machine-readable manifest from an unpacked PPTX.")
    parser.add_argument("unpacked_root", help="Root directory of the unpacked pptx package")
    parser.add_argument("--output", help="Optional output json path. Prints to stdout when omitted.")
    args = parser.parse_args()
    manifest = extract_manifest(Path(args.unpacked_root))
    payload = json.dumps(manifest, ensure_ascii=False, indent=2)
    if args.output:
        output_path = Path(args.output)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_text(payload, encoding="utf-8")
    else:
        print(payload)


if __name__ == "__main__":
    main()
param(
[string]$TemplatePath = "C:\Users\niuniu\Desktop\Report.pptx",
[string]$OutputPath = "C:\workspace\cursor\output\vip-report\shape-inventory.json"
)
$ErrorActionPreference = "Stop"
function Get-ShapeNode {
param(
[Parameter(Mandatory = $true)]$Shape,
[Parameter(Mandatory = $true)][int]$SlideIndex
)
$text = $null
try {
if ($Shape.HasTextFrame -eq -1 -and $Shape.TextFrame.HasText -eq -1) {
$text = $Shape.TextFrame.TextRange.Text -replace "`r`n", " "
}
} catch {
$text = $null
}
$children = @()
$chartCount = 0
$imageCount = 0
try {
if ($Shape.HasChart -eq -1) {
$chartCount = 1
}
} catch {
}
if ($Shape.Type -in 11, 13) {
$imageCount = 1
}
try {
$groupCount = $Shape.GroupItems.Count
if ($groupCount -gt 0) {
for ($i = 1; $i -le $groupCount; $i++) {
$child = Get-ShapeNode -Shape $Shape.GroupItems.Item($i) -SlideIndex $SlideIndex
$children += $child
$chartCount += $child.ChartCount
$imageCount += $child.ImageCount
}
}
} catch {
}
$hasTable = $false
try {
$hasTable = ($Shape.HasTable -eq -1)
} catch {
$hasTable = $false
}
return [pscustomobject]@{
Slide = $SlideIndex
Name = $Shape.Name
Id = $Shape.Id
Type = $Shape.Type
HasChart = [bool]$chartCount
HasTable = $hasTable
Text = $text
Left = [math]::Round($Shape.Left, 2)
Top = [math]::Round($Shape.Top, 2)
Width = [math]::Round($Shape.Width, 2)
Height = [math]::Round($Shape.Height, 2)
ChildCount = $children.Count
ChartCount = $chartCount
ImageCount = $imageCount
Children = $children
}
}
$ppt = $null
$pres = $null
try {
$ppt = New-Object -ComObject PowerPoint.Application
$pres = $ppt.Presentations.Open($TemplatePath, -1, 0, 0)
$slides = @()
foreach ($slide in $pres.Slides) {
$shapeNodes = @()
$chartCount = 0
$imageCount = 0
foreach ($shape in $slide.Shapes) {
$node = Get-ShapeNode -Shape $shape -SlideIndex $slide.SlideIndex
$shapeNodes += $node
$chartCount += $node.ChartCount
$imageCount += $node.ImageCount
}
$title = $null
try {
if ($slide.Shapes.HasTitle -eq -1) {
$title = $slide.Shapes.Title.TextFrame.TextRange.Text -replace "`r`n", " "
}
} catch {
$title = $null
}
$slides += [pscustomobject]@{
Slide = $slide.SlideIndex
Title = $title
ShapeCount = $slide.Shapes.Count
ChartCount = $chartCount
ImageCount = $imageCount
Shapes = $shapeNodes
}
}
$payload = [pscustomobject]@{
TemplatePath = $TemplatePath
SlideCount = $pres.Slides.Count
Slides = $slides
} | ConvertTo-Json -Depth 8
$parent = Split-Path -Parent $OutputPath
if ($parent -and -not (Test-Path $parent)) {
New-Item -ItemType Directory -Path $parent -Force | Out-Null
}
Set-Content -LiteralPath $OutputPath -Value $payload -Encoding UTF8
Write-Output $OutputPath
} finally {
if ($null -ne $pres) {
try { $pres.Close() | Out-Null } catch {}
}
if ($null -ne $ppt) {
try { $ppt.Quit() | Out-Null } catch {}
}
}
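The inventory script above writes a JSON document with `TemplatePath`, `SlideCount`, and a `Slides` array whose shape nodes nest through `Children`. The following is a minimal Python sketch of walking that tree to list picture shapes. The helper names `iter_shapes` and `list_pictures` and the sample payload are illustrative assumptions, not part of the repository, and `Type == 13` assumes the Office `msoPicture` constant.

```python
import json
from typing import Any, Iterator

MSO_PICTURE = 13  # assumption: MsoShapeType value for msoPicture


def iter_shapes(nodes: list[dict[str, Any]]) -> Iterator[dict[str, Any]]:
    """Yield each shape node depth-first, descending into group children."""
    for node in nodes:
        yield node
        yield from iter_shapes(node.get("Children") or [])


def list_pictures(inventory: dict[str, Any]) -> list[tuple[int, int, str]]:
    """Return (slide, shape id, shape name) for every picture shape."""
    return [
        (slide["Slide"], shape["Id"], shape["Name"])
        for slide in inventory["Slides"]
        for shape in iter_shapes(slide["Shapes"])
        if shape["Type"] == MSO_PICTURE
    ]


# Made-up payload that mirrors the fields the script writes.
SAMPLE_JSON = """
{
  "TemplatePath": "Report.pptx",
  "SlideCount": 1,
  "Slides": [
    {
      "Slide": 4,
      "Shapes": [
        {"Slide": 4, "Name": "Group 9", "Id": 10, "Type": 6, "Children": [
          {"Slide": 4, "Name": "图片 1", "Id": 2, "Type": 13, "Children": []}
        ]},
        {"Slide": 4, "Name": "Title 1", "Id": 1, "Type": 14, "Children": []}
      ]
    }
  ]
}
"""

if __name__ == "__main__":
    print(list_pictures(json.loads(SAMPLE_JSON)))
```

When reading the real file, opening it with `encoding="utf-8-sig"` is probably the safer choice, since Windows PowerShell's `Set-Content -Encoding UTF8` may prepend a byte order mark.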
from __future__ import annotations

import argparse
import shutil
from pathlib import Path


def main() -> None:
    parser = argparse.ArgumentParser(description="Create a byte-identical baseline render by copying the PPTX template.")
    parser.add_argument("template", help="Template pptx path")
    parser.add_argument("output", help="Output pptx path")
    args = parser.parse_args()
    template = Path(args.template)
    output = Path(args.output)
    output.parent.mkdir(parents=True, exist_ok=True)
    shutil.copyfile(template, output)
    print(output)


if __name__ == "__main__":
    main()
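The baseline script promises a byte-identical copy of the template. One way to check that promise after the fact is to compare file sizes and SHA-256 digests, as in the sketch below. The helper names `sha256_of` and `is_byte_identical` are assumptions made for illustration and do not appear in the repository.

```python
import hashlib
from pathlib import Path


def sha256_of(path: Path, chunk_size: int = 1 << 20) -> str:
    """Hash a file in chunks so large decks are not read into memory at once."""
    digest = hashlib.sha256()
    with path.open("rb") as handle:
        while chunk := handle.read(chunk_size):
            digest.update(chunk)
    return digest.hexdigest()


def is_byte_identical(template: Path, output: Path) -> bool:
    """True when both files have the same size and the same SHA-256 digest."""
    if template.stat().st_size != output.stat().st_size:
        return False
    return sha256_of(template) == sha256_of(output)
```

A check like this only makes sense for the no-op render path. Once PowerPoint re-saves the deck the package bytes are expected to differ, which is presumably why the bundle also ships a package-level `pptx` comparison.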
param(
[string]$TemplatePath = "C:\Users\niuniu\Desktop\Report.pptx",
[string]$OutputPath = "C:\workspace\cursor\output\vip-report\generated-from-template.pptx",
[string]$OperationsPath = ""
)
$ErrorActionPreference = "Stop"
function Get-ShapeChildren {
param([Parameter(Mandatory = $true)]$Shape)
$children = @()
try {
$count = $Shape.GroupItems.Count
for ($i = 1; $i -le $count; $i++) {
$child = $Shape.GroupItems.Item($i)
$children += $child
$children += Get-ShapeChildren -Shape $child
}
} catch {
}
return $children
}
function Find-Shape {
param(
[Parameter(Mandatory = $true)]$Slide,
[int]$ShapeId,
[string]$ShapeName,
[string]$ExactText,
[string]$ContainsText
)
$candidates = @()
foreach ($shape in $Slide.Shapes) {
$candidates += $shape
$candidates += Get-ShapeChildren -Shape $shape
}
foreach ($shape in $candidates) {
if ($ShapeId -and $shape.Id -eq $ShapeId) {
return $shape
}
if ($ShapeName -and $shape.Name -eq $ShapeName) {
return $shape
}
$text = $null
try {
if ($shape.HasTextFrame -eq -1 -and $shape.TextFrame.HasText -eq -1) {
$text = $shape.TextFrame.TextRange.Text
}
} catch {
$text = $null
}
if ($ExactText -and $text -eq $ExactText) {
return $shape
}
if ($ContainsText -and $text -and $text.Contains($ContainsText)) {
return $shape
}
}
return $null
}
function Set-ShapeText {
param(
[Parameter(Mandatory = $true)]$Shape,
[Parameter(Mandatory = $true)][string]$NewText
)
if ($Shape.HasTextFrame -ne -1) {
throw "Shape '$($Shape.Name)' does not support text."
}
$Shape.TextFrame.TextRange.Text = $NewText
}
function Replace-PictureShape {
param(
[Parameter(Mandatory = $true)]$Slide,
[Parameter(Mandatory = $true)]$Shape,
[Parameter(Mandatory = $true)][string]$ImagePath
)
if (-not (Test-Path -LiteralPath $ImagePath)) {
throw "ImagePath not found: $ImagePath"
}
$left = $Shape.Left
$top = $Shape.Top
$width = $Shape.Width
$height = $Shape.Height
$z = $Shape.ZOrderPosition
$name = $Shape.Name
$Shape.Delete()
$newShape = $Slide.Shapes.AddPicture($ImagePath, $false, $true, $left, $top, $width, $height)
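# AddPicture puts the new shape on top, so step it backward until it reaches the original z-order slot (3 = msoSendBackward).
while ($newShape.ZOrderPosition -gt $z) {
$newShape.ZOrder(3) | Out-Null
}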
try {
$newShape.Name = $name
} catch {
}
}
if (-not (Test-Path -LiteralPath $TemplatePath)) {
throw "TemplatePath not found: $TemplatePath"
}
$ops = [pscustomobject]@{
replace_text = @()
replace_images = @()
}
if ($OperationsPath) {
if (-not (Test-Path -LiteralPath $OperationsPath)) {
throw "OperationsPath not found: $OperationsPath"
}
$ops = Get-Content -LiteralPath $OperationsPath -Raw -Encoding UTF8 | ConvertFrom-Json
}
$replaceText = @()
$replaceImages = @()
if ($ops.PSObject.Properties.Name -contains "replace_text") {
$replaceText = @($ops.replace_text)
}
if ($ops.PSObject.Properties.Name -contains "replace_images") {
$replaceImages = @($ops.replace_images)
}
$outDir = Split-Path -Parent $OutputPath
if ($outDir -and -not (Test-Path -LiteralPath $outDir)) {
New-Item -ItemType Directory -Path $outDir -Force | Out-Null
}
Copy-Item -LiteralPath $TemplatePath -Destination $OutputPath -Force
if ($replaceText.Count -eq 0 -and $replaceImages.Count -eq 0) {
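# With no replacement operations, return the plain copy of the template so that PowerPoint does not re-save it and alter the package structure.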
Write-Output $OutputPath
return
}
$ppt = $null
$pres = $null
try {
$ppt = New-Object -ComObject PowerPoint.Application
$pres = $ppt.Presentations.Open($OutputPath, 0, 0, 0)
foreach ($item in $replaceText) {
$slide = $pres.Slides.Item([int]$item.slide)
$shape = Find-Shape -Slide $slide -ShapeId $item.shape_id -ShapeName $item.shape_name -ExactText $item.exact_text -ContainsText $item.contains_text
if ($null -eq $shape) {
throw "Text target not found on slide $($item.slide)"
}
Set-ShapeText -Shape $shape -NewText ([string]$item.new_text)
}
foreach ($item in $replaceImages) {
$slide = $pres.Slides.Item([int]$item.slide)
$shape = Find-Shape -Slide $slide -ShapeId $item.shape_id -ShapeName $item.shape_name -ExactText $item.exact_text -ContainsText $item.contains_text
if ($null -eq $shape) {
throw "Image target not found on slide $($item.slide)"
}
Replace-PictureShape -Slide $slide -Shape $shape -ImagePath ([string]$item.image_path)
}
$pres.Save()
Write-Output $OutputPath
} finally {
if ($null -ne $pres) {
try { $pres.Close() | Out-Null } catch {}
}
if ($null -ne $ppt) {
try { $ppt.Quit() | Out-Null } catch {}
}
}
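The render script above reads an operations file with two arrays, `replace_text` and `replace_images`. Each entry names a `slide`, at least one of the selectors `shape_id`, `shape_name`, `exact_text`, or `contains_text`, and then `new_text` or `image_path`. Below is a minimal Python sketch of producing such a file. The helper names and the example values are assumptions made for illustration; only the key names come from the script.

```python
import json
from typing import Any, Iterable

SELECTOR_KEYS = ("shape_id", "shape_name", "exact_text", "contains_text")


def make_target(slide: int, **selectors: Any) -> dict[str, Any]:
    """Build the slide/selector part of an operation, dropping unset selectors."""
    unknown = set(selectors) - set(SELECTOR_KEYS)
    if unknown:
        raise ValueError(f"unknown selector(s): {sorted(unknown)}")
    chosen = {key: value for key, value in selectors.items() if value}
    if not chosen:
        raise ValueError("at least one selector is required to find a shape")
    return {"slide": slide, **chosen}


def replace_text(slide: int, new_text: str, **selectors: Any) -> dict[str, Any]:
    """Describe one text replacement."""
    return {**make_target(slide, **selectors), "new_text": new_text}


def replace_image(slide: int, image_path: str, **selectors: Any) -> dict[str, Any]:
    """Describe one picture replacement."""
    return {**make_target(slide, **selectors), "image_path": image_path}


def build_operations(
    text_ops: Iterable[dict[str, Any]], image_ops: Iterable[dict[str, Any]]
) -> str:
    """Serialize the operations as UTF-8-safe JSON text."""
    payload = {"replace_text": list(text_ops), "replace_images": list(image_ops)}
    return json.dumps(payload, ensure_ascii=False, indent=2)


if __name__ == "__main__":
    # Hypothetical values: the text selector and the image path are made up.
    print(
        build_operations(
            [replace_text(1, "February 2026", contains_text="January 2026")],
            [replace_image(2, r"C:\temp\s02_chart.png", shape_id=5, shape_name="图片 4")],
        )
    )
```

Because the script returns the first shape that matches any selector, passing `shape_id` together with `shape_name` does not require both to match. Supplying only the most specific selector is probably the more predictable choice.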
from __future__ import annotations
import argparse
import json
import re
import shutil
import subprocess
import time
from pathlib import Path
from typing import Any
from PIL import Image
import yaml
NPX_EXECUTABLE = shutil.which("npx.cmd") or shutil.which("npx") or "npx"
PLAYWRIGHT_CMD = [
NPX_EXECUTABLE,
"--yes",
"--package",
"@playwright/cli",
"playwright-cli",
]
SESSION_NAME = "vip-report-inventory-monthly"
VIEWPORT = {"width": 1400, "height": 3360}
TEMPLATE_LOCK_DIR = Path(r"C:\workspace\cursor\output\vip-report\assets\template-lock-s04-s08-s13")
def should_use_template_lock(report_month: str, report_year: int, compare_year: int) -> bool:
"""Enable the template lock for the baseline period (January 2026) so the verification build matches the template."""
month = normalize_month_label(report_month)
return (
report_year == 2026
and compare_year == 2025
and month in {"一月", "1", "01"}
and TEMPLATE_LOCK_DIR.exists()
)
def normalize_month_label(raw_month: Any) -> str:
"""Normalize a numeric or Chinese month value to the Chinese label Tableau recognizes; other text passes through unchanged."""
month_cn_map = {
1: "一月",
2: "二月",
3: "三月",
4: "四月",
5: "五月",
6: "六月",
7: "七月",
8: "八月",
9: "九月",
10: "十月",
11: "十一月",
12: "十二月",
}
if raw_month is None:
return "一月"
if isinstance(raw_month, int):
return month_cn_map.get(raw_month, "一月")
text = str(raw_month).strip()
if text in month_cn_map.values():
return text
match = re.fullmatch(r"0?([1-9]|1[0-2])", text)
if match:
return month_cn_map[int(match.group(1))]
return text
def resolve_report_period(args: argparse.Namespace, config: dict[str, Any]) -> tuple[str, int, int]:
"""Prefer CLI arguments, then config values, then built-in defaults."""
report_cfg = config.get("report", {})
report_month = normalize_month_label(args.report_month or report_cfg.get("month_cn", "一月"))
report_year = int(args.report_year or report_cfg.get("year", 2026))
compare_year = int(
args.compare_year or report_cfg.get("compare_year", report_year - 1)
)
return report_month, report_year, compare_year
def build_filters(report_month: str, report_year: int) -> list[dict[str, Any]]:
"""Build filters under every plausible field caption; the JS side swallows failures for captions that do not exist, so nothing is raised."""
return [
{"field": "Storename (组)", "values": ["CKC-VIP"]},
{"field": "storename (组)", "values": ["CKC-VIP"]},
{"field": "Storename", "values": ["CKC-VIP"]},
{"field": "storename", "values": ["CKC-VIP"]},
{"field": "Month", "values": [report_month]},
{"field": "月(billdate)", "values": [report_month]},
{"field": "billdate 月", "values": [report_month]},
{"field": "Year", "values": [str(report_year)]},
{"field": "year", "values": [str(report_year)]},
{"field": "年(billdate)", "values": [str(report_year)]},
{"field": "billdate 年", "values": [str(report_year)]},
{"field": "Brand", "values": ["CK"]},
{"field": "brand", "values": ["CK"]},
]
def build_specs(report_month: str, report_year: int, compare_year: int) -> dict[str, Any]:
"""Return the capture and asset specs for S04-S08, including crop geometry and notes."""
filters = build_filters(report_month, report_year)
capture_template = [
{
"capture_id": "overall",
"hash_url": "#/views/CKInventoryMonthlyReport/Overall?:iid=1",
"inner_frame_fragment": "/views/CKInventoryMonthlyReport/Overall?",
"activate_sheet": "Overall",
"note": "S04 Overall view, showing overall category performance.",
"raw_screenshot_name": "inventory-s04-overall.png",
},
{
"capture_id": "bags",
"hash_url": "#/views/CKInventoryMonthlyReport/Bags?:iid=1",
"inner_frame_fragment": "/views/CKInventoryMonthlyReport/Bags?",
"activate_sheet": "Bags",
"note": "S05 Bags view, showing bag category data only.",
"raw_screenshot_name": "inventory-s05-bags.png",
},
{
"capture_id": "shoes",
"hash_url": "#/views/CKInventoryMonthlyReport/Shoes?:iid=1",
"inner_frame_fragment": "/views/CKInventoryMonthlyReport/Shoes?",
"activate_sheet": "Shoes",
"note": "S06 Shoes view, showing footwear trends.",
"raw_screenshot_name": "inventory-s06-shoes.png",
},
{
"capture_id": "discount_regular",
"hash_url": "#/views/CKInventoryMonthlyReport/DiscountRegular?:iid=1",
"inner_frame_fragment": "/views/CKInventoryMonthlyReport/DiscountRegular?",
"activate_sheet": "Discount & Regular",
"note": "S07 DiscountRegular view, comparing discounted and regular-price sales.",
"raw_screenshot_name": "inventory-s07-discount.png",
},
{
"capture_id": "subcategory",
"hash_url": "#/views/CKInventoryMonthlyReport/Subcategory?:iid=1",
"inner_frame_fragment": "/views/CKInventoryMonthlyReport/Subcategory?",
"activate_sheet": "Subcategory",
"note": "S08 Subcategory view, breaking down subcategory performance.",
"raw_screenshot_name": "inventory-s08-subcategory.png",
},
]
captures = []
for info in capture_template:
spec = {
"capture_id": info["capture_id"],
"session": SESSION_NAME,
"hash_url": info["hash_url"],
"inner_frame_fragment": info["inner_frame_fragment"],
"activate_sheet": info["activate_sheet"],
"filters": filters,
"params": {},
"raw_screenshot_name": info["raw_screenshot_name"],
"note": info["note"],
}
captures.append(spec)
assets = [
{
"slide_code": "S04",
"slide": 4,
"shape_name": "图片 1",
"shape_id": 2,
"asset_name": "s04_category_overall_top",
"capture_id": "overall",
"crop": {"left": 0, "top": 120, "width": 1400, "height": 460},
"resize_to": {"width": 948, "height": 208},
"source_view": "Overall",
"note": "S04 top section, keeping the main Overall chart.",
},
{
"slide_code": "S04",
"slide": 4,
"shape_name": "图片 3",
"shape_id": 4,
"asset_name": "s04_category_overall_mid",
"capture_id": "overall",
"crop": {"left": 0, "top": 580, "width": 1400, "height": 420},
"resize_to": {"width": 948, "height": 207},
"source_view": "Overall",
"note": "S04 middle chart, highlighting the category list.",
},
{
"slide_code": "S04",
"slide": 4,
"shape_name": "图片 4",
"shape_id": 5,
"asset_name": "s04_category_overall_bottom",
"capture_id": "overall",
"crop": {"left": 0, "top": 1010, "width": 1400, "height": 510},
"resize_to": {"width": 948, "height": 310},
"source_view": "Overall",
"note": "S04 bottom section, emphasizing the trend or list.",
},
{
"slide_code": "S05",
"slide": 5,
"shape_name": "图片 1",
"shape_id": 2,
"asset_name": "s05_bags_top",
"capture_id": "bags",
"crop": {"left": 0, "top": 130, "width": 1400, "height": 360},
"resize_to": {"width": 1093, "height": 237},
"source_view": "Bags",
"note": "S05 top chart, aligned to the template width.",
},
{
"slide_code": "S05",
"slide": 5,
"shape_name": "图片 2",
"shape_id": 3,
"asset_name": "s05_bags_bottom",
"capture_id": "bags",
"crop": {"left": 0, "top": 520, "width": 1400, "height": 500},
"resize_to": {"width": 1093, "height": 376},
"source_view": "Bags",
"note": "S05 bottom section, showing bags category detail clearly.",
},
{
"slide_code": "S06",
"slide": 6,
"shape_name": "图片 2",
"shape_id": 3,
"asset_name": "s06_shoes_top",
"capture_id": "shoes",
"crop": {"left": 0, "top": 130, "width": 1400, "height": 360},
"resize_to": {"width": 1132, "height": 248},
"source_view": "Shoes",
"note": "S06 top section, showing the main footwear content.",
},
{
"slide_code": "S06",
"slide": 6,
"shape_name": "图片 6",
"shape_id": 7,
"asset_name": "s06_shoes_bottom",
"capture_id": "shoes",
"crop": {"left": 0, "top": 540, "width": 1400, "height": 520},
"resize_to": {"width": 1132, "height": 388},
"source_view": "Shoes",
"note": "S06 bottom section, keeping the chart width consistent.",
},
{
"slide_code": "S07",
"slide": 7,
"shape_name": "图片 2",
"shape_id": 3,
"asset_name": "s07_discount",
"capture_id": "discount_regular",
"crop": {"left": 0, "top": 140, "width": 1400, "height": 820},
"resize_to": {"width": 1186, "height": 649},
"source_view": "Discount & Regular",
"note": "S07 discount vs. regular-price comparison, with the toolbar cropped out.",
},
{
"slide_code": "S08",
"slide": 8,
"shape_name": "图片 1",
"shape_id": 2,
"asset_name": "s08_subcategory",
"capture_id": "subcategory",
"crop": {"left": 0, "top": 140, "width": 1400, "height": 960},
"resize_to": {"width": 944, "height": 731},
"source_view": "Subcategory",
"note": "S08 large image, emphasizing the subcategory breakdown.",
},
]
return {"captures": captures, "assets": assets}
def run_cmd(
args: list[str],
*,
cwd: Path,
timeout: int = 120,
check: bool = True,
) -> subprocess.CompletedProcess[str]:
"""Run a command through one wrapper, decoding output as UTF-8 so it stays readable."""
return subprocess.run(
args,
cwd=str(cwd),
text=True,
encoding="utf-8",
errors="replace",
capture_output=True,
timeout=timeout,
check=check,
)
def run_playwright(args: list[str], *, cwd: Path, timeout: int = 120) -> str:
"""Invoke playwright-cli for browser operations."""
result = run_cmd(PLAYWRIGHT_CMD + args, cwd=cwd, timeout=timeout)
return result.stdout
def write_js(workdir: Path, name: str, content: str) -> Path:
"""Write a temporary JS file for run-code to execute."""
path = workdir / name
path.write_text(content, encoding="utf-8")
return path
def build_login_js(username: str, password: str) -> str:
"""Build the Tableau sign-in script; on the sign-in page it fills in and submits the form."""
payload = {"username": username, "password": password}
spec = json.dumps(payload, ensure_ascii=False)
return f"""async function(page) {{
const spec = {spec};
if (!page.url().includes('/#/signin')) {{
return {{ url: page.url(), skipped: true }};
}}
const inputs = page.locator('input');
const username = inputs.nth(0);
const password = inputs.nth(1);
const button = page.locator('button').nth(0);
await username.waitFor({{ state: 'visible', timeout: 15000 }});
await username.fill(spec.username);
await password.fill(spec.password);
await Promise.all([
page.waitForFunction(() => !location.href.includes('/#/signin'), null, {{ timeout: 30000 }}).catch(() => null),
button.click(),
]);
await page.waitForTimeout(3000);
return {{ url: page.url(), title: await page.title() }};
}}
"""
def build_configure_view_js(spec: dict[str, Any]) -> str:
"""Build the script that sets the sheet, filters, and parameters for one capture."""
payload = json.dumps(spec, ensure_ascii=False)
return f"""async function(page) {{
const spec = {payload};
await page.waitForFunction(
() => !!(window.tableau && window.tableau.VizManager && window.tableau.VizManager.getVizs().length),
null,
{{ timeout: 60000 }}
);
return await page.evaluate(async (config) => {{
// Tableau occasionally creates the Viz before its workbook is ready, so wait for it explicitly.
const deadline = Date.now() + 30000;
let viz = null;
let workbook = null;
while (Date.now() < deadline) {{
try {{
const vizs = window.tableau?.VizManager?.getVizs?.() || [];
viz = vizs[0] || null;
workbook = viz && typeof viz.getWorkbook === 'function' ? viz.getWorkbook() : null;
}} catch (error) {{
workbook = null;
}}
if (workbook) {{
break;
}}
await new Promise((resolve) => setTimeout(resolve, 500));
}}
if (!workbook) {{
throw new Error('Tableau workbook is not ready');
}}
try {{
await workbook.revertAllAsync();
}} catch (error) {{}}
await workbook.activateSheetAsync(config.activate_sheet);
const activeSheet = workbook.getActiveSheet();
let worksheets = [];
if (activeSheet && typeof activeSheet.getWorksheets === 'function') {{
worksheets = activeSheet.getWorksheets();
}}
const targets = worksheets.length ? worksheets : [activeSheet];
const updateType = window.tableau.FilterUpdateType.REPLACE;
const filterApply = [];
for (const filter of config.filters) {{
let applied = false;
for (const worksheet of targets) {{
try {{
await worksheet.applyFilterAsync(filter.field, filter.values, updateType);
applied = true;
filterApply.push({{
field: filter.field,
worksheet: typeof worksheet.getName === 'function' ? worksheet.getName() : 'unknown',
ok: true,
}});
}} catch (error) {{}}
}}
if (!applied) {{
filterApply.push({{
field: filter.field,
worksheet: null,
ok: false,
}});
}}
}}
for (const [name, value] of Object.entries(config.params)) {{
try {{
await workbook.changeParameterValueAsync(name, value);
}} catch (error) {{}}
}}
await new Promise((resolve) => setTimeout(resolve, 7000));
return {{
activeSheet: activeSheet.getName(),
targetCount: targets.length,
filterApply,
filters: config.filters,
params: config.params,
url: location.href,
}};
}}, spec);
}}
"""
def build_capture_js(inner_frame_fragment: str, screenshot_name: str) -> str:
"""Build the script that finds the inner frame and screenshots its whole body."""
payload = json.dumps(
{
"inner_frame_fragment": inner_frame_fragment,
"screenshot_name": screenshot_name,
},
ensure_ascii=False,
)
return f"""async function(page) {{
const spec = {payload};
await page.waitForTimeout(3000);
const frame = page.frames().find(
(candidate) => candidate !== page.mainFrame() && candidate.url().includes(spec.inner_frame_fragment)
);
const target = frame || page;
const body = await target.$('body');
await body.screenshot({{
path: spec.screenshot_name,
scale: 'css',
}});
return {{
frameUrl: frame ? frame.url() : null,
pageUrl: page.url(),
screenshotName: spec.screenshot_name,
}};
}}
"""
def run_code(session: str, script_path: Path, *, cwd: Path, timeout: int = 120) -> str:
try:
filename_arg = str(script_path.relative_to(cwd))
except ValueError:
filename_arg = str(script_path)
return run_playwright(
["--session", session, "run-code", "--filename", filename_arg],
cwd=cwd,
timeout=timeout,
)
def ensure_browser_session(session: str, *, cwd: Path) -> None:
run_playwright(["--session", session, "open", "about:blank"], cwd=cwd, timeout=60)
def save_state(session: str, state_path: Path, *, cwd: Path) -> None:
state_path.parent.mkdir(parents=True, exist_ok=True)
run_playwright(
["--session", session, "state-save", str(state_path)],
cwd=cwd,
timeout=60,
)
def load_state_if_present(session: str, state_path: Path, *, cwd: Path) -> None:
if not state_path.exists():
return
run_playwright(
["--session", session, "state-load", str(state_path)],
cwd=cwd,
timeout=60,
)
def locate_session_file(root: Path, session: str, filename: str) -> Path:
"""Playwright sometimes writes screenshots to a different directory; search for the file and return the most recent match."""
direct = root / "output" / "playwright" / session / filename
if direct.exists():
return direct
search_roots = [
root / "output" / "playwright",
root / "output" / "vip-report",
root,
]
matches: list[Path] = []
for search_root in search_roots:
if search_root.exists():
matches.extend(search_root.rglob(filename))
if len(matches) == 1:
return matches[0]
if matches:
latest = sorted(
{item.resolve() for item in matches},
key=lambda item: item.stat().st_mtime,
reverse=True,
)[0]
return latest
raise FileNotFoundError(f"Unable to locate {filename} under {root}.")
def crop_image(
    source: Path,
    target: Path,
    crop: dict[str, int],
    *,
    resize_to: dict[str, int] | None = None,
) -> None:
    """Crop the source image and optionally resize the result."""
    box = (
        crop["left"],
        crop["top"],
        crop["left"] + crop["width"],
        crop["top"] + crop["height"],
    )
    # Open via a context manager so the source file handle is released promptly.
    with Image.open(source) as image:
        result = image.crop(box)
        if resize_to:
            result = result.resize(
                (resize_to["width"], resize_to["height"]), Image.Resampling.LANCZOS
            )
        target.parent.mkdir(parents=True, exist_ok=True)
        result.save(target)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Capture the S04-S08 Tableau view assets for CK Inventory Monthly."
)
parser.add_argument(
"--config",
default=r"C:\Users\niuniu\.codex\vip-report\config.yaml",
help="Path to the VIP report config.yaml",
)
parser.add_argument(
"--slides",
default="S04,S05,S06,S07,S08",
help="Slides to generate, matching the shape_id configuration; defaults to S04-S08",
)
parser.add_argument(
"--report-month",
default="",
help="Month label used in the Tableau filters, e.g. 二月 / 2 / 02",
)
parser.add_argument(
"--report-year",
type=int,
default=0,
help="Year used in the Tableau filters, e.g. 2026",
)
parser.add_argument(
"--compare-year",
type=int,
default=0,
help="Comparison year (the Tableau year2 parameter); in this script it only feeds the template-lock check",
)
return parser.parse_args()
def collect_required_capture_ids(filtered_assets: list[dict[str, Any]]) -> set[str]:
return {asset["capture_id"] for asset in filtered_assets}
def capture_tableau_view(
capture_spec: dict[str, Any],
*,
base_url: str,
session: str,
workdir: Path,
workspace_root: Path,
) -> Path:
"""Open, configure, and capture one view according to its capture spec."""
target_url = f"{base_url}{capture_spec['hash_url']}"
run_playwright(["--session", session, "goto", target_url], cwd=workdir, timeout=120)
run_playwright(
["--session", session, "resize", str(VIEWPORT["width"]), str(VIEWPORT["height"])],
cwd=workdir,
timeout=60,
)
configure_script = write_js(
workdir,
f"tmp-{capture_spec['capture_id']}-configure.js",
build_configure_view_js(
{
"activate_sheet": capture_spec["activate_sheet"],
"filters": capture_spec["filters"],
"params": capture_spec["params"],
}
),
)
capture_script = write_js(
workdir,
f"tmp-{capture_spec['capture_id']}-capture.js",
build_capture_js(
capture_spec["inner_frame_fragment"],
capture_spec["raw_screenshot_name"],
),
)
try:
max_attempts = 5
last_error: str | None = None
for attempt in range(1, max_attempts + 1):
try:
configure_output = run_code(session, configure_script, cwd=workdir, timeout=420).strip()
except subprocess.TimeoutExpired as exc:
last_error = f"TimeoutExpired: {exc}"
configure_output = f"### Error\nError: {last_error}"
if "### Error" in configure_output:
last_error = configure_output
if attempt < max_attempts:
run_playwright(["--session", session, "goto", target_url], cwd=workdir, timeout=120)
run_playwright(
["--session", session, "resize", str(VIEWPORT["width"]), str(VIEWPORT["height"])],
cwd=workdir,
timeout=60,
)
time.sleep(3)
continue
raise RuntimeError(
f"Failed to configure Tableau view for {capture_spec['capture_id']}: {last_error}"
)
run_code(session, capture_script, cwd=workdir, timeout=180)
break
finally:
configure_script.unlink(missing_ok=True)
capture_script.unlink(missing_ok=True)
local_output = workdir / capture_spec["raw_screenshot_name"]
if local_output.exists():
return local_output
return locate_session_file(workspace_root, session, capture_spec["raw_screenshot_name"])
def main() -> None:
args = parse_args()
config_path = Path(args.config)
config = yaml.safe_load(config_path.read_text(encoding="utf-8"))
report_month, report_year, compare_year = resolve_report_period(args, config)
specs = build_specs(report_month, report_year, compare_year)
requested = {item.strip().upper() for item in args.slides.split(",") if item.strip()}
filtered_assets = [item for item in specs["assets"] if item["slide_code"] in requested]
if not filtered_assets:
raise SystemExit("No S04-S08 assets matched the requested slides.")
vip_workdir = Path(config["paths"]["workdir"]).resolve()
# Assumes paths.workdir is <workspace>/output/vip-report, so two levels up is the workspace root.
workspace_root = vip_workdir.parents[1]
asset_dir = vip_workdir / "assets" / "inventory-monthly"
data_dir = vip_workdir / "data" / "inventory-monthly"
asset_dir.mkdir(parents=True, exist_ok=True)
data_dir.mkdir(parents=True, exist_ok=True)
captures_by_id = {item["capture_id"]: item for item in specs["captures"]}
required_capture_ids = collect_required_capture_ids(filtered_assets)
use_template_lock = should_use_template_lock(report_month, report_year, compare_year)
raw_screenshots: dict[str, Path] = {}
if not use_template_lock:
session = SESSION_NAME
state_path = workspace_root / "output" / "playwright" / session / "state.json"
ensure_browser_session(session, cwd=vip_workdir)
load_state_if_present(session, state_path, cwd=vip_workdir)
base_url = config["tableau"]["base_url"].rstrip("/")
first_capture = captures_by_id[min(required_capture_ids)]
first_target_url = f"{base_url}{first_capture['hash_url']}"
run_playwright(["--session", session, "goto", first_target_url], cwd=vip_workdir, timeout=120)
login_script = write_js(
vip_workdir,
"tmp-inventory-monthly-login.js",
build_login_js(config["tableau"]["username"], config["tableau"]["password"]),
)
try:
run_code(session, login_script, cwd=vip_workdir, timeout=120)
finally:
login_script.unlink(missing_ok=True)
for capture_id in sorted(required_capture_ids):
raw_screenshots[capture_id] = capture_tableau_view(
captures_by_id[capture_id],
base_url=base_url,
session=session,
workdir=vip_workdir,
workspace_root=workspace_root,
)
save_state(session, state_path, cwd=vip_workdir)
operations = {"replace_text": [], "replace_images": []}
manifest_items: list[dict[str, Any]] = []
for asset in filtered_assets:
target = asset_dir / f"{asset['asset_name']}.png"
if use_template_lock:
# During the baseline period, reuse the locked template images so the output matches the template visually.
source_path = TEMPLATE_LOCK_DIR / f"s{asset['slide']}_shape{asset['shape_id']}.png"
if not source_path.exists():
raise FileNotFoundError(f"Template lock image not found: {source_path}")
shutil.copyfile(source_path, target)
else:
source_path = raw_screenshots[asset["capture_id"]]
crop_image(
source_path,
target,
asset["crop"],
resize_to=asset.get("resize_to"),
)
if not use_template_lock:
operations["replace_images"].append(
{
"slide": asset["slide"],
"shape_id": asset["shape_id"],
"shape_name": asset["shape_name"],
"image_path": str(target),
}
)
manifest_item = {
"slide_code": asset["slide_code"],
"slide": asset["slide"],
"shape_id": asset["shape_id"],
"shape_name": asset["shape_name"],
"asset_name": asset["asset_name"],
"asset_path": str(target),
"source_capture_id": "template-lock" if use_template_lock else asset["capture_id"],
"source_view": "template-lock" if use_template_lock else asset["source_view"],
"note": asset["note"],
"raw_screenshot": str(source_path),
}
if "crop" in asset:
manifest_item["crop"] = asset["crop"]
if asset.get("resize_to"):
manifest_item["resize_to"] = asset["resize_to"]
manifest_items.append(manifest_item)
operations_path = vip_workdir / "render-ops.inventory-monthly.live.json"
operations_path.write_text(json.dumps(operations, ensure_ascii=False, indent=2), encoding="utf-8")
manifest_path = data_dir / "inventory-monthly-assets.live.json"
manifest_path.write_text(
json.dumps(
{
"source": {
"captures": [
{
"capture_id": capture_id,
"hash_url": captures_by_id[capture_id]["hash_url"],
"activate_sheet": captures_by_id[capture_id]["activate_sheet"],
"filters": captures_by_id[capture_id]["filters"],
"params": captures_by_id[capture_id]["params"],
"raw_screenshot": str(raw_screenshots[capture_id]),
"note": captures_by_id[capture_id]["note"],
}
for capture_id in sorted(raw_screenshots)
],
"config_path": str(config_path),
"report_period": {
"month": report_month,
"year": report_year,
"compare_year": compare_year,
},
},
"assets": manifest_items,
"operations_path": str(operations_path),
},
ensure_ascii=False,
indent=2,
),
encoding="utf-8",
)
print(json.dumps({"operations_path": str(operations_path), "manifest_path": str(manifest_path)}, ensure_ascii=False))
if __name__ == "__main__":
main()
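Each asset in the inventory script carries a `crop` rectangle (left, top, width, height) and usually a `resize_to` size. A quick way to sanity-check such a spec before capturing is to convert it to Pillow's box form, confirm it lies inside the viewport, and look at the horizontal and vertical scale factors. The sketch below does that for the S08 values from the script. The helper names `crop_box`, `fits`, and `scale_factors` are assumptions made for illustration.

```python
from typing import Mapping

VIEWPORT = {"width": 1400, "height": 3360}  # same values as the script's VIEWPORT
S08_CROP = {"left": 0, "top": 140, "width": 1400, "height": 960}
S08_RESIZE = {"width": 944, "height": 731}


def crop_box(crop: Mapping[str, int]) -> tuple[int, int, int, int]:
    """Convert left/top/width/height into a (left, upper, right, lower) box."""
    left, top = crop["left"], crop["top"]
    return (left, top, left + crop["width"], top + crop["height"])


def fits(crop: Mapping[str, int], bounds: Mapping[str, int]) -> bool:
    """True when the crop rectangle is non-empty and lies inside the bounds."""
    left, upper, right, lower = crop_box(crop)
    return 0 <= left < right <= bounds["width"] and 0 <= upper < lower <= bounds["height"]


def scale_factors(
    crop: Mapping[str, int], resize_to: Mapping[str, int]
) -> tuple[float, float]:
    """Return the (horizontal, vertical) scale applied by the resize step."""
    return (resize_to["width"] / crop["width"], resize_to["height"] / crop["height"])


if __name__ == "__main__":
    print(crop_box(S08_CROP), fits(S08_CROP, VIEWPORT))
    print(scale_factors(S08_CROP, S08_RESIZE))
```

When the two scale factors differ, the resize stretches the crop along one axis. Whether that is intended to match the template shape is something only the template can confirm. Note also that the real screenshot is taken of the frame body, so its height may differ from the viewport height assumed here.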
from __future__ import annotations
import argparse
import json
import re
import shutil
import subprocess
import time
from pathlib import Path
from typing import Any
from PIL import Image
import yaml
NPX_EXECUTABLE = shutil.which("npx.cmd") or shutil.which("npx") or "npx"
PLAYWRIGHT_CMD = [NPX_EXECUTABLE, "--yes", "--package", "@playwright/cli", "playwright-cli"]
SESSION_NAME = "vip-report-monthly-sales"
VIEWPORT = {"width": 1400, "height": 3400}
def normalize_month_label(raw_month: Any) -> str:
"""Accept inputs such as 2, 02, 2月, or 二月 and normalize them to the Chinese month label Tableau uses."""
month_cn_map = {
1: "一月",
2: "二月",
3: "三月",
4: "四月",
5: "五月",
6: "六月",
7: "七月",
8: "八月",
9: "九月",
10: "十月",
11: "十一月",
12: "十二月",
}
if raw_month is None:
return "一月"
if isinstance(raw_month, int):
return month_cn_map.get(raw_month, "一月")
text = str(raw_month).strip()
match = re.fullmatch(r"0?([1-9]|1[0-2])(?:月)?", text)
if match:
return month_cn_map[int(match.group(1))]
return text
def resolve_report_period(args: argparse.Namespace, config: dict[str, Any]) -> tuple[str, int, int]:
"""Resolve the report period with precedence CLI > config.yaml > defaults."""
report_cfg = config.get("report", {})
report_month = normalize_month_label(args.report_month or report_cfg.get("month_cn", "一月"))
report_year = int(args.report_year or report_cfg.get("year", 2026))
report_compare_year = int(
args.compare_year or report_cfg.get("compare_year", report_year - 1)
)
return report_month, report_year, report_compare_year
def build_cumulative_month_values(report_month: str) -> list[str]:
"""S02 is cumulative (year to date): 二月 covers 一月+二月, and 三月 covers 一月+二月+三月."""
ordered_months = [
"一月",
"二月",
"三月",
"四月",
"五月",
"六月",
"七月",
"八月",
"九月",
"十月",
"十一月",
"十二月",
]
if report_month not in ordered_months:
return [report_month]
index = ordered_months.index(report_month)
return ordered_months[: index + 1]
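def build_specs(report_month: str, report_year: int, compare_year: int) -> dict[str, Any]:
"""Central definition of the S02/S03 capture filters, crop geometry, and composite layout."""
s02_month_values = build_cumulative_month_values(report_month)
# The S02 bottom strip shows a total row plus one row per cumulative month, so the crop height grows with the month count (clamped to 86-140).
summary_strip_height = min(140, max(86, 68 + len(s02_month_values) * 18))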
return {
"captures": [
{
"capture_id": "overview",
"session": SESSION_NAME,
"hash_url": "#/views/ECMonthlySalesReport_17250004228820/Overview?:iid=1",
"inner_frame_fragment": "/views/ECMonthlySalesReport_17250004228820/Overview?",
"activate_sheet": "Overview",
"filters": [
{"field": "Storename (group)", "values": ["CKC-VIP"]},
# This field narrows overall performance to the single store; without it the whole layout shifts downward.
{"field": "Storename", "values": ["CKC-VIP"]},
{"field": "月(daily)", "values": s02_month_values},
{"field": "Sales Type", "values": ["GMV"]},
{"field": "Brand", "values": ["CK"]},
],
"params": {"year1": report_year, "year2": compare_year},
"raw_screenshot_name": "monthly-sales-overview.png",
"note": "Overview page, source of the S02 main table and the highest-priority overview chart block.",
},
{
"capture_id": "store_sales_detail",
"session": SESSION_NAME,
"hash_url": "#/views/ECMonthlySalesReport_17250004228820/StoreSalesinDetail?:iid=2",
"inner_frame_fragment": "/views/ECMonthlySalesReport_17250004228820/StoreSalesinDetail?",
"activate_sheet": "Store Sales in Detail",
"filters": [
{"field": "Storename (group)", "values": ["CKC-VIP"]},
{"field": "Storename", "values": ["CKC-VIP"]},
# Worksheets on this page name their fields inconsistently, so month and year filters are listed under both captions to make sure one matches.
{"field": "Month", "values": s02_month_values},
{"field": "月(daily)", "values": s02_month_values},
{"field": "Sales Type", "values": ["GMV"]},
{"field": "Year", "values": [str(report_year)]},
{"field": "年(daily)", "values": [str(report_year)]},
{"field": "Brand", "values": ["CK"]},
],
"params": {},
"raw_screenshot_name": "monthly-sales-store-sales-detail.png",
"note": "Store Sales in Detail page, source of the S02 bottom strip table.",
},
{
"capture_id": "store_kpi_lfl",
"session": SESSION_NAME,
"hash_url": "#/views/ECMonthlySalesReport_17250004228820/StoreKPILFL?:iid=2",
"inner_frame_fragment": "/views/ECMonthlySalesReport_17250004228820/StoreKPILFL?",
"activate_sheet": "Store KPI LFL",
"filters": [
{"field": "Brand", "values": ["CK"]},
{"field": "Sales Type", "values": ["GMV"]},
{"field": "Storename", "values": ["CKC-VIP"]},
{"field": "Year", "values": [str(report_year)]},
{"field": "Month", "values": [report_month]},
],
"params": {"year1": report_year, "year2": compare_year},
"raw_screenshot_name": "monthly-sales-store-kpi-lfl.png",
"note": "Store KPI LFL page, the live source for the S03 2x2 grid.",
},
],
"assets": [
{
"slide_code": "S02",
"slide": 2,
"shape_name": "图片 4",
"shape_id": 5,
"asset_name": "s02_monthly_sales_chart",
"capture_id": "overview",
# The top image must exclude the year1/year2/month filter bar and keep only the Sales chart block and row labels.
"crop": {"left": 8, "top": 520, "width": 1224, "height": 360},
"source_view": "Overview",
"note": "S02 top chart block, following the Overview chart layout where possible.",
},
{
"slide_code": "S02",
"slide": 2,
"shape_name": "图片 6",
"shape_id": 7,
"asset_name": "s02_monthly_sales_overview_table",
"capture_id": "overview",
# Nudge the middle table crop downward to drop the border line of the filter controls above it.
"crop": {"left": 8, "top": 190, "width": 1224, "height": 300},
"source_view": "Overview",
"note": "S02 middle main table, taken from the KPI summary at the top of Overview.",
},
{
"slide_code": "S02",
"slide": 2,
"shape_name": "图片 7",
"shape_id": 8,
"asset_name": "s02_monthly_sales_summary_strip",
"capture_id": "store_sales_detail",
# Match the aspect ratio of the template shape (1032.66 x 63.48) to limit vertical squashing.
"crop": {"left": 0, "top": 150, "width": 1400, "height": summary_strip_height},
"resize_to": {"width": 1360, "height": 84},
"source_view": "Store Sales in Detail",
"note": "S02 bottom strip table, now taken from the live Store Sales in Detail page named in the slide notes.",
},
{
"slide_code": "S03",
"slide": 3,
"shape_name": "图片 1",
"shape_id": 2,
"asset_name": "s03_kpi_lfl_quad",
"capture_id": "store_kpi_lfl",
"source_view": "Store KPI LFL",
"note": "S03 now extracts the GP/Con/ATV/Returnqty panels from Store KPI LFL and recomposes them in the template's 2x2 grid order.",
"composite": {
"canvas": {"width": 1224, "height": 685, "background": "#FFFFFF"},
"panels": [
{
"panel_code": "gp",
"crop": {"left": 50, "top": 2360, "width": 1300, "height": 520},
"dest": {"left": 0, "top": 0, "width": 598, "height": 328},
},
{
"panel_code": "con",
"crop": {"left": 50, "top": 1230, "width": 1300, "height": 520},
"dest": {"left": 626, "top": 0, "width": 598, "height": 328},
},
{
"panel_code": "atv",
"crop": {"left": 50, "top": 1800, "width": 1300, "height": 520},
"dest": {"left": 0, "top": 357, "width": 598, "height": 328},
},
{
"panel_code": "returnqty",
"crop": {"left": 50, "top": 2860, "width": 1300, "height": 500},
"dest": {"left": 626, "top": 357, "width": 598, "height": 328},
},
],
},
},
],
}
def run_cmd(
    args: list[str],
    *,
    cwd: Path,
    timeout: int = 120,
    check: bool = True,
) -> subprocess.CompletedProcess[str]:
    """Run an external command through one shared wrapper, decoding output as UTF-8 to ease troubleshooting."""
    return subprocess.run(
        args,
        cwd=str(cwd),
        text=True,
        encoding="utf-8",
        errors="replace",
        capture_output=True,
        timeout=timeout,
        check=check,
    )
def run_playwright(args: list[str], *, cwd: Path, timeout: int = 120) -> str:
    """Run the Playwright CLI and return its standard output."""
    result = run_cmd(PLAYWRIGHT_CMD + args, cwd=cwd, timeout=timeout)
    return result.stdout
def write_js(workdir: Path, name: str, content: str) -> Path:
    """Write a temporary JS file to disk for playwright-cli run-code to execute."""
    path = workdir / name
    path.write_text(content, encoding="utf-8")
    return path
def build_login_js(username: str, password: str) -> str:
"""Build the Tableau login script; it fills in credentials only when the page lands on the signin route."""
payload = {
"username": username,
"password": password,
}
spec = json.dumps(payload, ensure_ascii=False)
return f"""async function(page) {{
const spec = {spec};
if (!page.url().includes('/#/signin')) {{
return {{ url: page.url(), skipped: true }};
}}
const inputs = page.locator('input');
const username = inputs.nth(0);
const password = inputs.nth(1);
const button = page.locator('button').nth(0);
await username.waitFor({{ state: 'visible', timeout: 15000 }});
await username.fill(spec.username);
await password.fill(spec.password);
await Promise.all([
page.waitForFunction(() => !location.href.includes('/#/signin'), null, {{ timeout: 30000 }}).catch(() => null),
button.click(),
]);
await page.waitForTimeout(3000);
return {{ url: page.url(), title: await page.title() }};
}}
"""
def build_configure_view_js(spec: dict[str, Any]) -> str:
"""Activate the target sheet and apply the filters to every worksheet on that sheet."""
payload = json.dumps(spec, ensure_ascii=False)
return f"""async function(page) {{
const spec = {payload};
await page.waitForFunction(
() => !!(window.tableau && window.tableau.VizManager && window.tableau.VizManager.getVizs().length),
null,
{{ timeout: 30000 }}
);
return await page.evaluate(async (config) => {{
const viz = window.tableau.VizManager.getVizs()[0];
const workbook = viz.getWorkbook();
try {{
await workbook.revertAllAsync();
}} catch (error) {{
}}
await workbook.activateSheetAsync(config.activate_sheet);
const activeSheet = workbook.getActiveSheet();
let worksheets = [];
if (activeSheet && typeof activeSheet.getWorksheets === 'function') {{
worksheets = activeSheet.getWorksheets();
}}
const targets = worksheets.length ? worksheets : [activeSheet];
const updateType = window.tableau.FilterUpdateType.REPLACE;
const filterApply = [];
for (const filter of config.filters) {{
let applied = false;
for (const worksheet of targets) {{
try {{
await worksheet.applyFilterAsync(filter.field, filter.values, updateType);
applied = true;
filterApply.push({{
field: filter.field,
worksheet: typeof worksheet.getName === 'function' ? worksheet.getName() : 'unknown',
ok: true,
}});
}} catch (error) {{
}}
}}
if (!applied) {{
filterApply.push({{
field: filter.field,
worksheet: null,
ok: false,
}});
}}
}}
for (const [name, value] of Object.entries(config.params)) {{
try {{
await workbook.changeParameterValueAsync(name, value);
}} catch (error) {{
}}
}}
await new Promise((resolve) => setTimeout(resolve, 7000));
return {{
activeSheet: activeSheet.getName(),
targetCount: targets.length,
filterApply,
filters: config.filters,
params: config.params,
url: location.href,
}};
}}, spec);
}}
"""
def build_capture_js(inner_frame_fragment: str, screenshot_name: str) -> str:
"""Prefer capturing the workbook's inner frame; fall back to the main page body when it is not found."""
payload = json.dumps(
{
"inner_frame_fragment": inner_frame_fragment,
"screenshot_name": screenshot_name,
},
ensure_ascii=False,
)
return f"""async function(page) {{
const spec = {payload};
await page.waitForTimeout(3000);
const frame = page.frames().find(
(candidate) => candidate !== page.mainFrame() && candidate.url().includes(spec.inner_frame_fragment)
);
const target = frame || page;
const body = await target.$('body');
await body.screenshot({{
path: spec.screenshot_name,
scale: 'css',
}});
return {{
frameUrl: frame ? frame.url() : null,
pageUrl: page.url(),
screenshotName: spec.screenshot_name,
}};
}}
"""
def run_code(session: str, script_path: Path, *, cwd: Path, timeout: int = 120) -> str:
return run_playwright(
["--session", session, "run-code", "--filename", str(script_path)],
cwd=cwd,
timeout=timeout,
)
def ensure_browser_session(session: str, *, cwd: Path) -> None:
run_playwright(["--session", session, "open", "about:blank"], cwd=cwd, timeout=60)
def save_state(session: str, state_path: Path, *, cwd: Path) -> None:
state_path.parent.mkdir(parents=True, exist_ok=True)
run_playwright(
["--session", session, "state-save", str(state_path)],
cwd=cwd,
timeout=60,
)
def load_state_if_present(session: str, state_path: Path, *, cwd: Path) -> None:
if not state_path.exists():
return
run_playwright(
["--session", session, "state-load", str(state_path)],
cwd=cwd,
timeout=60,
)
def locate_session_file(root: Path, session: str, filename: str) -> Path:
    """Playwright artifacts may land in several directories; fall back to the most recently modified match."""
    direct = root / "output" / "playwright" / session / filename
    if direct.exists():
        return direct
    search_roots = [
        root / "output" / "playwright",
        root / "output" / "vip-report",
        root,
    ]
    matches: list[Path] = []
    for search_root in search_roots:
        if search_root.exists():
            matches.extend(search_root.rglob(filename))
    if len(matches) == 1:
        return matches[0]
    if matches:
        # The search roots overlap, so deduplicate by resolved path before picking the newest file.
        latest = sorted({item.resolve() for item in matches}, key=lambda item: item.stat().st_mtime, reverse=True)[0]
        return latest
    raise FileNotFoundError(f"Unable to locate {filename} under {root}.")
def crop_image(
    source: Path,
    target: Path,
    crop: dict[str, int],
    *,
    resize_to: dict[str, int] | None = None,
) -> None:
    """Crop the image; when resize_to is given, crop first and then scale to that fixed output size."""
    box = (
        crop["left"],
        crop["top"],
        crop["left"] + crop["width"],
        crop["top"] + crop["height"],
    )
    target.parent.mkdir(parents=True, exist_ok=True)
    # Open via a context manager so the source file handle is released promptly.
    with Image.open(source) as image:
        result = image.crop(box)
        if resize_to:
            result = result.resize((resize_to["width"], resize_to["height"]), Image.Resampling.LANCZOS)
        result.save(target)
def compose_panels(
    source: Path,
    target: Path,
    composite: dict[str, Any],
) -> list[dict[str, Any]]:
    """Rearrange the panels cropped from Store KPI LFL into the four-panel grid the template expects."""
    image = Image.open(source).convert("RGB")
    canvas_spec = composite["canvas"]
    canvas = Image.new("RGB", (canvas_spec["width"], canvas_spec["height"]), canvas_spec.get("background", "#FFFFFF"))
    panel_records: list[dict[str, Any]] = []
    for panel in composite["panels"]:
        crop = panel["crop"]
        dest = panel["dest"]
        box = (
            crop["left"],
            crop["top"],
            crop["left"] + crop["width"],
            crop["top"] + crop["height"],
        )
        cropped = image.crop(box)
        # Scale each panel to its destination cell, then paste it at the cell's top-left corner.
        resized = cropped.resize((dest["width"], dest["height"]), Image.Resampling.LANCZOS)
        canvas.paste(resized, (dest["left"], dest["top"]))
        panel_records.append(
            {
                "panel_code": panel["panel_code"],
                "crop": crop,
                "dest": dest,
            }
        )
    target.parent.mkdir(parents=True, exist_ok=True)
    canvas.save(target)
    return panel_records
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Capture monthly sales Tableau assets for VIP report slides S02/S03.")
parser.add_argument(
"--config",
default=r"C:\Users\niuniu\.codex\vip-report\config.yaml",
help="Path to VIP report config.yaml",
)
parser.add_argument(
"--slides",
default="S02,S03",
help="Comma-separated slide codes to generate, default: S02,S03",
)
parser.add_argument(
"--report-month",
default="",
help="Report month label for Tableau filter, e.g. 二月 / 2 / 02",
)
parser.add_argument(
"--report-year",
type=int,
default=0,
help="Report year for Tableau filter, e.g. 2026",
)
parser.add_argument(
"--compare-year",
type=int,
default=0,
help="Comparison year for year2 parameter, e.g. 2025",
)
return parser.parse_args()
def collect_required_capture_ids(filtered_assets: list[dict[str, Any]]) -> set[str]:
    """Capture only the views the requested slides actually depend on, avoiding unrelated page navigation."""
    return {asset["capture_id"] for asset in filtered_assets}
def capture_tableau_view(
capture_spec: dict[str, Any],
*,
base_url: str,
session: str,
workdir: Path,
workspace_root: Path,
) -> Path:
"""Navigate, filter, and take the screenshot for a single capture spec."""
target_url = f"{base_url}{capture_spec['hash_url']}"
run_playwright(["--session", session, "goto", target_url], cwd=workdir, timeout=120)
run_playwright(
["--session", session, "resize", str(VIEWPORT["width"]), str(VIEWPORT["height"])],
cwd=workdir,
timeout=60,
)
configure_script = write_js(
workdir,
f"tmp-{capture_spec['capture_id']}-configure.js",
build_configure_view_js(
{
"activate_sheet": capture_spec["activate_sheet"],
"filters": capture_spec["filters"],
"params": capture_spec["params"],
}
),
)
capture_script = write_js(
workdir,
f"tmp-{capture_spec['capture_id']}-capture.js",
build_capture_js(
capture_spec["inner_frame_fragment"],
capture_spec["raw_screenshot_name"],
),
)
try:
max_attempts = 5
last_error: str | None = None
for attempt in range(1, max_attempts + 1):
configure_output = run_code(session, configure_script, cwd=workdir, timeout=420).strip()
if "### Error" in configure_output:
last_error = configure_output
if attempt < max_attempts:
# Tableau occasionally reports the workbook as not ready yet; reloading the same page and retrying recovers.
run_playwright(["--session", session, "goto", target_url], cwd=workdir, timeout=120)
run_playwright(
["--session", session, "resize", str(VIEWPORT["width"]), str(VIEWPORT["height"])],
cwd=workdir,
timeout=60,
)
time.sleep(2)
continue
raise RuntimeError(
f"Failed to configure Tableau view for {capture_spec['capture_id']}: {last_error}"
)
run_code(session, capture_script, cwd=workdir, timeout=180)
break
finally:
configure_script.unlink(missing_ok=True)
capture_script.unlink(missing_ok=True)
# run-code writes the screenshot to the current working directory first; use this run's output directly to avoid reading a stale session file.
local_output = workdir / capture_spec["raw_screenshot_name"]
if local_output.exists():
return local_output
return locate_session_file(workspace_root, session, capture_spec["raw_screenshot_name"])
def main() -> None:
args = parse_args()
config_path = Path(args.config)
config = yaml.safe_load(config_path.read_text(encoding="utf-8"))
report_month, report_year, compare_year = resolve_report_period(args, config)
specs = build_specs(report_month, report_year, compare_year)
requested = {item.strip().upper() for item in args.slides.split(",") if item.strip()}
filtered_assets = [item for item in specs["assets"] if item["slide_code"] in requested]
if not filtered_assets:
raise SystemExit("No matching slides requested.")
vip_workdir = Path(config["paths"]["workdir"]).resolve()
workspace_root = vip_workdir.parents[1]
asset_dir = vip_workdir / "assets" / "monthly-sales"
data_dir = vip_workdir / "data" / "monthly-sales"
asset_dir.mkdir(parents=True, exist_ok=True)
data_dir.mkdir(parents=True, exist_ok=True)
captures_by_id = {item["capture_id"]: item for item in specs["captures"]}
required_capture_ids = collect_required_capture_ids(filtered_assets)
session = SESSION_NAME
state_path = workspace_root / "output" / "playwright" / session / "state.json"
ensure_browser_session(session, cwd=vip_workdir)
load_state_if_present(session, state_path, cwd=vip_workdir)
base_url = config["tableau"]["base_url"].rstrip("/")
first_capture = captures_by_id[next(iter(sorted(required_capture_ids)))]
first_target_url = f"{base_url}{first_capture['hash_url']}"
run_playwright(["--session", session, "goto", first_target_url], cwd=vip_workdir, timeout=120)
login_script = write_js(
vip_workdir,
"tmp-monthly-sales-login.js",
build_login_js(config["tableau"]["username"], config["tableau"]["password"]),
)
try:
run_code(session, login_script, cwd=vip_workdir, timeout=120)
finally:
login_script.unlink(missing_ok=True)
raw_screenshots: dict[str, Path] = {}
# Fix the capture order; unordered set iteration could trigger transient Tableau errors when switching pages.
for capture_id in sorted(required_capture_ids):
raw_screenshots[capture_id] = capture_tableau_view(
captures_by_id[capture_id],
base_url=base_url,
session=session,
workdir=vip_workdir,
workspace_root=workspace_root,
)
save_state(session, state_path, cwd=vip_workdir)
operations = {"replace_text": [], "replace_images": []}
manifest_items: list[dict[str, Any]] = []
for asset in filtered_assets:
target = asset_dir / f"{asset['asset_name']}.png"
source_path = raw_screenshots[asset["capture_id"]]
panel_records: list[dict[str, Any]] = []
if "composite" in asset:
panel_records = compose_panels(source_path, target, asset["composite"])
else:
crop_image(
source_path,
target,
asset["crop"],
resize_to=asset.get("resize_to"),
)
operations["replace_images"].append(
{
"slide": asset["slide"],
"shape_id": asset["shape_id"],
"shape_name": asset["shape_name"],
"image_path": str(target),
}
)
manifest_item = {
"slide_code": asset["slide_code"],
"slide": asset["slide"],
"shape_id": asset["shape_id"],
"shape_name": asset["shape_name"],
"asset_name": asset["asset_name"],
"asset_path": str(target),
"source_capture_id": asset["capture_id"],
"source_view": asset["source_view"],
"note": asset["note"],
"raw_screenshot": str(source_path),
}
if "crop" in asset:
manifest_item["crop"] = asset["crop"]
if asset.get("resize_to"):
manifest_item["resize_to"] = asset["resize_to"]
if panel_records:
manifest_item["panel_records"] = panel_records
manifest_item["composite_canvas"] = asset["composite"]["canvas"]
manifest_items.append(manifest_item)
operations_path = vip_workdir / "render-ops.monthly-sales.live.json"
operations_path.write_text(json.dumps(operations, ensure_ascii=False, indent=2), encoding="utf-8")
manifest_path = data_dir / "monthly-sales-assets.live.json"
manifest_path.write_text(
json.dumps(
{
"source": {
"captures": [
{
"capture_id": capture_id,
"hash_url": captures_by_id[capture_id]["hash_url"],
"activate_sheet": captures_by_id[capture_id]["activate_sheet"],
"filters": captures_by_id[capture_id]["filters"],
"params": captures_by_id[capture_id]["params"],
"raw_screenshot": str(raw_screenshots[capture_id]),
"note": captures_by_id[capture_id]["note"],
}
for capture_id in sorted(raw_screenshots)
],
"config_path": str(config_path),
"report_period": {
"month": report_month,
"year": report_year,
"compare_year": compare_year,
},
},
"assets": manifest_items,
"operations_path": str(operations_path),
},
ensure_ascii=False,
indent=2,
),
encoding="utf-8",
)
print(json.dumps({"operations_path": str(operations_path), "manifest_path": str(manifest_path)}, ensure_ascii=False))
if __name__ == "__main__":
main()
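The crop specs in the script above use `left`/`top`/`width`/`height`, while Pillow's `Image.crop` takes a `(left, upper, right, lower)` box. The following is a minimal sketch of that conversion plus a bounds check, which the original `crop_image` does not perform. `crop_to_box` and `check_box_within` are hypothetical helper names, and the 1400 x 3800 screenshot size is an assumption for illustration only.

```python
from __future__ import annotations


def crop_to_box(crop: dict[str, int]) -> tuple[int, int, int, int]:
    """Convert a left/top/width/height spec into a (left, upper, right, lower) box."""
    left, top = crop["left"], crop["top"]
    return (left, top, left + crop["width"], top + crop["height"])


def check_box_within(box: tuple[int, int, int, int], image_size: tuple[int, int]) -> None:
    """Raise ValueError when the box is empty or extends past the image bounds."""
    left, upper, right, lower = box
    width, height = image_size
    if right <= left or lower <= upper:
        raise ValueError(f"Empty crop box: {box}")
    if left < 0 or upper < 0 or right > width or lower > height:
        raise ValueError(f"Crop box {box} exceeds image size {image_size}")


# The S02 chart crop from the spec, checked against an assumed 1400 x 3800 screenshot.
box = crop_to_box({"left": 8, "top": 520, "width": 1224, "height": 360})
check_box_within(box, (1400, 3800))
print(box)  # (8, 520, 1232, 880)
```

A check like this could run before cropping so that a screenshot shorter than expected fails loudly instead of producing a partly blank asset.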
from __future__ import annotations
import argparse
import json
import re
import shutil
import subprocess
import time
from pathlib import Path
from typing import Any
from PIL import Image
import yaml
NPX_EXECUTABLE = shutil.which("npx.cmd") or shutil.which("npx") or "npx"
PLAYWRIGHT_CMD = [NPX_EXECUTABLE, "--yes", "--package", "@playwright/cli", "playwright-cli"]
SESSION_NAME = "vip-report-top-products"
VIEWPORT = {"width": 1400, "height": 3800}
TEMPLATE_LOCK_DIR = Path(r"C:\workspace\cursor\output\vip-report\assets\template-lock-s04-s08-s13")
def should_use_template_lock(report_month: str, report_year: int, compare_year: int) -> bool:
    """Enable template lock for the baseline period (January 2026) so S09/S10 match the template exactly."""
    month = normalize_month_label(report_month)
    return (
        report_year == 2026
        and compare_year == 2025
        and month in {"一月", "1", "01"}
        and TEMPLATE_LOCK_DIR.exists()
    )
def normalize_month_label(raw_month: Any) -> str:
    """Normalize a month number or string to the Chinese month name that Tableau uses."""
    month_cn_map = {
        1: "一月",
        2: "二月",
        3: "三月",
        4: "四月",
        5: "五月",
        6: "六月",
        7: "七月",
        8: "八月",
        9: "九月",
        10: "十月",
        11: "十一月",
        12: "十二月",
    }
    if raw_month is None:
        return "一月"
    if isinstance(raw_month, int):
        return month_cn_map.get(raw_month, "一月")
    text = str(raw_month).strip()
    # Accept "2", "02", and "2月"; the optional leading zero and trailing 月 are ignored.
    match = re.fullmatch(r"0?([1-9]|1[0-2])(?:月)?", text)
    if match:
        return month_cn_map.get(int(match.group(1)), "一月")
    # The map is keyed by int, so any other string (e.g. an existing Chinese label) passes through unchanged.
    return text
def resolve_report_period(args: argparse.Namespace, config: dict[str, Any]) -> tuple[str, int, int]:
    """Resolve the report period by priority: CLI > config > default."""
    report_cfg = config.get("report", {})
    report_month = normalize_month_label(args.report_month or report_cfg.get("month_cn", "一月"))
    report_year = int(args.report_year or report_cfg.get("year", 2026))
    report_compare_year = int(
        args.compare_year or report_cfg.get("compare_year", report_year - 1)
    )
    return report_month, report_year, report_compare_year
def build_specs(report_month: str, report_year: int, compare_year: int) -> dict[str, Any]:
"""Define the Tableau captures for S09/S10 and their matching slide assets."""
base_spec = {
"session": SESSION_NAME,
"hash_url": "#/views/CKTopProducts-General_16862068169500/TopProducts?:iid=1",
"inner_frame_fragment": "/views/CKTopProducts-General_16862068169500/TopProducts?",
"activate_sheet": "Top Products",
"params": {"Top Parameter": 10},
"source_view": "TopProducts",
}
captures = []
sheets = [
("top_products_bags", "S09", "Bags", "top-products-bags.png"),
("top_products_shoes", "S10", "Shoes", "top-products-shoes.png"),
]
for capture_id, slide_code, category, screenshot_name in sheets:
filters = [
{"field": "Category", "values": [category]},
{"field": "Year", "values": [str(report_year)]},
{"field": "Month", "values": [report_month]},
{"field": "storename (group)", "values": ["CKC-VIP"]},
]
captures.append(
{
"capture_id": capture_id,
"hash_url": base_spec["hash_url"],
"inner_frame_fragment": base_spec["inner_frame_fragment"],
"activate_sheet": base_spec["activate_sheet"],
"filters": filters,
"params": base_spec["params"],
"raw_screenshot_name": screenshot_name,
# Use the slide code from the tuple so the note reads S09/S10, matching the asset slide codes.
"note": f"CK Top Products ({category}) view, used for the main chart on {slide_code}.",
"session": base_spec["session"],
}
)
assets = [
{
"slide_code": "S09",
"slide": 9,
"shape_id": 3,
"shape_name": "图片 2",
"asset_name": "s09_top_products_chart",
"capture_id": "top_products_bags",
"crop": {"left": 0, "top": 120, "width": 1400, "height": 900},
"resize_to": {"width": 1034, "height": 587},
"source_view": "TopProducts",
"note": "Fills the main top products table on the right side of S09, cropping out the Tableau toolbar.",
},
{
"slide_code": "S10",
"slide": 10,
"shape_id": 2,
"shape_name": "图片 1",
"asset_name": "s10_top_products_chart",
"capture_id": "top_products_shoes",
"crop": {"left": 0, "top": 120, "width": 1400, "height": 900},
"resize_to": {"width": 1039, "height": 585},
"source_view": "TopProducts",
"note": "Fills the main image area of S10; it can later be split into smaller images.",
},
]
return {"captures": captures, "assets": assets}
def parse_args() -> argparse.Namespace:
    """Parse CLI arguments so a run can be scoped by month and by slide."""
    parser = argparse.ArgumentParser(description="Sync Top Products canvas assets")
    parser.add_argument("--config", required=True, help="Path to config.yaml")
    parser.add_argument("--slides", default="S09,S10", help="Comma-separated slide codes to sync")
    parser.add_argument("--report-month", help="Report month (Chinese label or number)")
    parser.add_argument("--report-year", type=int, help="Report year")
    parser.add_argument("--compare-year", type=int, help="Comparison year")
    return parser.parse_args()
def run_cmd(args: list[str], *, cwd: Path, timeout: int = 120, check: bool = True) -> subprocess.CompletedProcess[str]:
    """Run a command through one shared wrapper with UTF-8 decoding and return the CompletedProcess."""
    return subprocess.run(
        args,
        cwd=str(cwd),
        text=True,
        encoding="utf-8",
        errors="replace",
        capture_output=True,
        timeout=timeout,
        check=check,
    )
def run_playwright(args: list[str], *, cwd: Path, timeout: int = 120) -> str:
    """Run playwright-cli and return its standard output."""
    result = run_cmd(PLAYWRIGHT_CMD + args, cwd=cwd, timeout=timeout)
    return result.stdout
def write_js(workdir: Path, name: str, content: str) -> Path:
    """Write a temporary JS file for run-code to use."""
    path = workdir / name
    path.write_text(content, encoding="utf-8")
    return path
def build_login_js(username: str, password: str) -> str:
"""Build a script that logs in to Tableau automatically by filling in the credentials."""
payload = {"username": username, "password": password}
spec = json.dumps(payload, ensure_ascii=False)
return f"""async function(page) {{
const spec = {spec};
if (!page.url().includes('/#/signin')) {{
return {{ url: page.url(), skipped: true }};
}}
const inputs = page.locator('input');
const username = inputs.nth(0);
const password = inputs.nth(1);
const button = page.locator('button').nth(0);
await username.waitFor({{ state: 'visible', timeout: 15000 }});
await username.fill(spec.username);
await password.fill(spec.password);
await Promise.all([
page.waitForFunction(() => !location.href.includes('/#/signin'), null, {{ timeout: 30000 }}).catch(() => null),
button.click(),
]);
await page.waitForTimeout(3000);
return {{ url: page.url(), title: await page.title() }};
}}
"""
def build_configure_view_js(spec: dict[str, Any]) -> str:
"""Activate the specified sheet, then apply the filters and parameters."""
payload = json.dumps(spec, ensure_ascii=False)
return f"""async function(page) {{
const spec = {payload};
await page.waitForFunction(
() => !!(window.tableau && window.tableau.VizManager && window.tableau.VizManager.getVizs().length),
null,
{{ timeout: 30000 }}
);
return await page.evaluate(async (config) => {{
// Tableau occasionally has the Viz created before the workbook is ready, so wait for it explicitly.
const deadline = Date.now() + 30000;
let viz = null;
let workbook = null;
while (Date.now() < deadline) {{
try {{
const vizs = window.tableau?.VizManager?.getVizs?.() || [];
viz = vizs[0] || null;
workbook = viz && typeof viz.getWorkbook === 'function' ? viz.getWorkbook() : null;
}} catch (error) {{
workbook = null;
}}
if (workbook) {{
break;
}}
await new Promise((resolve) => setTimeout(resolve, 500));
}}
if (!workbook) {{
throw new Error('Tableau workbook is not ready');
}}
try {{
await workbook.revertAllAsync();
}} catch (error) {{
}}
await workbook.activateSheetAsync(config.activate_sheet);
const activeSheet = workbook.getActiveSheet();
let worksheets = [];
if (activeSheet && typeof activeSheet.getWorksheets === 'function') {{
worksheets = activeSheet.getWorksheets();
}}
const targets = worksheets.length ? worksheets : [activeSheet];
const updateType = window.tableau.FilterUpdateType.REPLACE;
for (const filter of config.filters) {{
for (const worksheet of targets) {{
try {{
await worksheet.applyFilterAsync(filter.field, filter.values, updateType);
}} catch (error) {{
}}
}}
}}
for (const [name, value] of Object.entries(config.params)) {{
try {{
await workbook.changeParameterValueAsync(name, value);
}} catch (error) {{
}}
}}
// The Top Products view queries slowly; wait longer to avoid capturing the skeleton screen.
await new Promise((resolve) => setTimeout(resolve, 20000));
return {{
activeSheet: activeSheet.getName(),
filters: config.filters,
params: config.params,
url: location.href,
}};
}}, spec);
}}
"""
def build_capture_js(inner_frame_fragment: str, screenshot_name: str) -> str:
"""Find the view in the inner frame and take the screenshot, falling back to the main page body."""
payload = json.dumps(
{
"inner_frame_fragment": inner_frame_fragment,
"screenshot_name": screenshot_name,
},
ensure_ascii=False,
)
return f"""async function(page) {{
const spec = {payload};
await page.waitForTimeout(3000);
const frame = page.frames().find(
(candidate) => candidate !== page.mainFrame() && candidate.url().includes(spec.inner_frame_fragment)
);
const target = frame || page;
const body = await target.$('body');
await body.screenshot({{
path: spec.screenshot_name,
scale: 'css',
}});
return {{
frameUrl: frame ? frame.url() : null,
pageUrl: page.url(),
screenshotName: spec.screenshot_name,
}};
}}
"""
def run_code(session: str, script_path: Path, *, cwd: Path, timeout: int = 120) -> str:
return run_playwright(
["--session", session, "run-code", "--filename", str(script_path)],
cwd=cwd,
timeout=timeout,
)
def ensure_browser_session(session: str, *, cwd: Path) -> None:
run_playwright(["--session", session, "open", "about:blank"], cwd=cwd, timeout=60)
def save_state(session: str, state_path: Path, *, cwd: Path) -> None:
state_path.parent.mkdir(parents=True, exist_ok=True)
run_playwright(
["--session", session, "state-save", str(state_path)],
cwd=cwd,
timeout=60,
)
def load_state_if_present(session: str, state_path: Path, *, cwd: Path) -> None:
if not state_path.exists():
return
run_playwright(
["--session", session, "state-load", str(state_path)],
cwd=cwd,
timeout=60,
)
def locate_session_file(root: Path, session: str, filename: str) -> Path:
    """Playwright may write the screenshot to different locations; search them in priority order."""
    direct = root / "output" / "playwright" / session / filename
    if direct.exists():
        return direct
    search_roots = [
        root / "output" / "playwright",
        root / "output" / "vip-report",
        root,
    ]
    matches: list[Path] = []
    for search_root in search_roots:
        if search_root.exists():
            matches.extend(search_root.rglob(filename))
    if len(matches) == 1:
        return matches[0]
    if matches:
        # The search roots overlap, so deduplicate by resolved path before picking the newest file.
        latest = sorted({item.resolve() for item in matches}, key=lambda item: item.stat().st_mtime, reverse=True)[0]
        return latest
    raise FileNotFoundError(f"Unable to locate {filename} under {root}.")
def crop_image(
    source: Path,
    target: Path,
    crop: dict[str, int],
    *,
    resize_to: dict[str, int] | None = None,
) -> None:
    """Crop the screenshot asset and optionally resize it."""
    box = (
        crop["left"],
        crop["top"],
        crop["left"] + crop["width"],
        crop["top"] + crop["height"],
    )
    target.parent.mkdir(parents=True, exist_ok=True)
    # Open via a context manager so the source file handle is released promptly.
    with Image.open(source) as image:
        result = image.crop(box)
        if resize_to:
            result = result.resize((resize_to["width"], resize_to["height"]), Image.Resampling.LANCZOS)
        result.save(target)
def collect_required_capture_ids(filtered_assets: list[dict[str, Any]]) -> set[str]:
    """Collect the Tableau capture_ids needed for this render."""
    return {asset["capture_id"] for asset in filtered_assets}
def capture_tableau_view(
capture_spec: dict[str, Any],
*,
base_url: str,
session: str,
workdir: Path,
workspace_root: Path,
) -> Path:
"""Capture a screenshot of a single Tableau view with the configured filters/params."""
target_url = f"{base_url}{capture_spec['hash_url']}"
run_playwright(["--session", session, "goto", target_url], cwd=workdir, timeout=120)
run_playwright(
["--session", session, "resize", str(VIEWPORT["width"]), str(VIEWPORT["height"])],
cwd=workdir,
timeout=60,
)
configure_script = write_js(
workdir,
f"tmp-{capture_spec['capture_id']}-configure.js",
build_configure_view_js(
{
"activate_sheet": capture_spec["activate_sheet"],
"filters": capture_spec["filters"],
"params": capture_spec["params"],
}
),
)
capture_script = write_js(
workdir,
f"tmp-{capture_spec['capture_id']}-capture.js",
build_capture_js(
capture_spec["inner_frame_fragment"],
capture_spec["raw_screenshot_name"],
),
)
try:
max_attempts = 5
for attempt in range(1, max_attempts + 1):
configure_output = run_code(session, configure_script, cwd=workdir, timeout=420).strip()
if "### Error" in configure_output:
print("configure failure:", configure_output)
if attempt < max_attempts:
run_playwright(["--session", session, "goto", target_url], cwd=workdir, timeout=120)
run_playwright(
["--session", session, "resize", str(VIEWPORT["width"]), str(VIEWPORT["height"])],
cwd=workdir,
timeout=60,
)
time.sleep(3)
continue
raise RuntimeError(
f"Failed to configure Tableau view for {capture_spec['capture_id']}: {configure_output}"
)
run_code(session, capture_script, cwd=workdir, timeout=180)
break
finally:
configure_script.unlink(missing_ok=True)
capture_script.unlink(missing_ok=True)
local_output = workdir / capture_spec["raw_screenshot_name"]
if local_output.exists():
return local_output
return locate_session_file(workspace_root, session, capture_spec["raw_screenshot_name"])
def main() -> None:
args = parse_args()
config_path = Path(args.config)
config = yaml.safe_load(config_path.read_text(encoding="utf-8"))
report_month, report_year, compare_year = resolve_report_period(args, config)
specs = build_specs(report_month, report_year, compare_year)
requested = {item.strip().upper() for item in args.slides.split(",") if item.strip()}
filtered_assets = [item for item in specs["assets"] if item["slide_code"].upper() in requested]
if not filtered_assets:
raise SystemExit("No matching slides requested.")
vip_workdir = Path(config["paths"]["workdir"]).resolve()
workspace_root = vip_workdir.parents[1]
asset_dir = vip_workdir / "assets" / "top-products"
data_dir = vip_workdir / "data" / "top-products"
asset_dir.mkdir(parents=True, exist_ok=True)
data_dir.mkdir(parents=True, exist_ok=True)
captures_by_id = {item["capture_id"]: item for item in specs["captures"]}
required_capture_ids = collect_required_capture_ids(filtered_assets)
use_template_lock = should_use_template_lock(report_month, report_year, compare_year)
raw_screenshots: dict[str, Path] = {}
if not use_template_lock:
session = SESSION_NAME
state_path = workspace_root / "output" / "playwright" / session / "state.json"
ensure_browser_session(session, cwd=vip_workdir)
load_state_if_present(session, state_path, cwd=vip_workdir)
base_url = config["tableau"]["base_url"].rstrip("/")
first_capture = captures_by_id[next(iter(sorted(required_capture_ids)))]
first_target_url = f"{base_url}{first_capture['hash_url']}"
run_playwright(["--session", session, "goto", first_target_url], cwd=vip_workdir, timeout=120)
login_script = write_js(
vip_workdir,
"tmp-top-products-login.js",
build_login_js(config["tableau"]["username"], config["tableau"]["password"]),
)
try:
run_code(session, login_script, cwd=vip_workdir, timeout=120)
finally:
login_script.unlink(missing_ok=True)
for capture_id in sorted(required_capture_ids):
raw_screenshots[capture_id] = capture_tableau_view(
captures_by_id[capture_id],
base_url=base_url,
session=session,
workdir=vip_workdir,
workspace_root=workspace_root,
)
save_state(session, state_path, cwd=vip_workdir)
operations = {"replace_text": [], "replace_images": []}
manifest_items: list[dict[str, Any]] = []
for asset in filtered_assets:
target = asset_dir / f"{asset['asset_name']}.png"
if use_template_lock:
# In the baseline period, reuse the template-lock images directly to avoid pixel drift from repeated replacement.
source_path = TEMPLATE_LOCK_DIR / f"s{asset['slide']}_shape{asset['shape_id']}.png"
if not source_path.exists():
raise FileNotFoundError(f"Template lock image not found: {source_path}")
shutil.copyfile(source_path, target)
else:
source_path = raw_screenshots[asset["capture_id"]]
crop_image(
source_path,
target,
asset["crop"],
resize_to=asset.get("resize_to"),
)
operations["replace_images"].append(
{
"slide": asset["slide"],
"shape_id": asset["shape_id"],
"shape_name": asset["shape_name"],
"image_path": str(target),
}
)
manifest_item = {
"slide_code": asset["slide_code"],
"slide": asset["slide"],
"shape_id": asset["shape_id"],
"shape_name": asset["shape_name"],
"asset_name": asset["asset_name"],
"asset_path": str(target),
"source_capture_id": "template-lock" if use_template_lock else asset["capture_id"],
"source_view": "template-lock" if use_template_lock else asset["source_view"],
"note": asset["note"],
"raw_screenshot": str(source_path),
}
manifest_item["crop"] = asset["crop"]
if asset.get("resize_to"):
manifest_item["resize_to"] = asset["resize_to"]
manifest_items.append(manifest_item)
operations_path = vip_workdir / "render-ops.top-products.live.json"
operations_path.write_text(json.dumps(operations, ensure_ascii=False, indent=2), encoding="utf-8")
manifest_path = data_dir / "top-products-assets.live.json"
manifest_path.write_text(
json.dumps(
{
"source": {
"captures": [
{
"capture_id": capture_id,
"hash_url": captures_by_id[capture_id]["hash_url"],
"activate_sheet": captures_by_id[capture_id]["activate_sheet"],
"filters": captures_by_id[capture_id]["filters"],
"params": captures_by_id[capture_id]["params"],
"raw_screenshot": str(raw_screenshots[capture_id]),
"note": captures_by_id[capture_id]["note"],
}
for capture_id in sorted(raw_screenshots)
],
"config_path": str(config_path),
"report_period": {
"month": report_month,
"year": report_year,
"compare_year": compare_year,
},
},
"assets": manifest_items,
"operations_path": str(operations_path),
},
ensure_ascii=False,
indent=2,
),
encoding="utf-8",
)
print(json.dumps({"operations_path": str(operations_path), "manifest_path": str(manifest_path)}, ensure_ascii=False))
if __name__ == "__main__":
main()
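The month handling above accepts several input spellings and maps them to the Chinese labels used in the Tableau filters. A minimal standalone sketch of that normalization follows, mirroring the Top Products variant, which also accepts a trailing `月`. The name `normalize_month` is hypothetical and used only for this illustration.

```python
from __future__ import annotations

import re
from typing import Any

MONTH_LABELS = {
    1: "一月", 2: "二月", 3: "三月", 4: "四月",
    5: "五月", 6: "六月", 7: "七月", 8: "八月",
    9: "九月", 10: "十月", 11: "十一月", 12: "十二月",
}


def normalize_month(raw: Any) -> str:
    """Map None, ints, and strings such as "2", "02", or "2月" to a Chinese month label."""
    if raw is None:
        return MONTH_LABELS[1]
    if isinstance(raw, int):
        return MONTH_LABELS.get(raw, MONTH_LABELS[1])
    text = str(raw).strip()
    # Optional leading zero, a month number from 1 to 12, optional trailing 月.
    match = re.fullmatch(r"0?([1-9]|1[0-2])月?", text)
    if match:
        return MONTH_LABELS[int(match.group(1))]
    # Anything else (for example an existing Chinese label) passes through unchanged.
    return text


for sample in ("02", 12, "10月", "二月", None):
    print(repr(sample), "->", normalize_month(sample))
```

Strings that do not look like a month number are passed through rather than rejected, which matches the scripts here; a stricter variant could raise instead.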
from __future__ import annotations
import argparse
import json
import re
import shutil
import subprocess
import time
from pathlib import Path
from typing import Any
from PIL import Image
import yaml
NPX_EXECUTABLE = shutil.which("npx.cmd") or shutil.which("npx") or "npx"
PLAYWRIGHT_CMD = [NPX_EXECUTABLE, "--yes", "--package", "@playwright/cli", "playwright-cli"]
SESSION_NAME = "vip-report-warehouse-100060"
VIEWPORT = {"width": 1400, "height": 3400}
TEMPLATE_LOCK_DIR = Path(r"C:\workspace\cursor\output\vip-report\assets\template-lock-s04-s08-s13")
MONTH_LABELS = {
1: "一月",
2: "二月",
3: "三月",
4: "四月",
5: "五月",
6: "六月",
7: "七月",
8: "八月",
9: "九月",
10: "十月",
11: "十一月",
12: "十二月",
}
MONTH_LABEL_TO_NUMBER = {label: number for number, label in MONTH_LABELS.items()}
def should_use_template_lock(report_month_label: str, report_year: int, compare_year: int) -> bool:
    """Enable the template lock for the baseline period (January 2026) so that S13 matches the template."""
    return (
        report_year == 2026
        and compare_year == 2025
        and report_month_label in {"一月", "1", "01"}
        and TEMPLATE_LOCK_DIR.exists()
    )


def normalize_month_label(raw_month: Any) -> str:
    """Normalize the supported month inputs, preferring the full Chinese label used by the Tableau filters."""
    if raw_month is None:
        return MONTH_LABELS[1]
    if isinstance(raw_month, int):
        return MONTH_LABELS.get(raw_month, f"{raw_month}月")
    text = str(raw_month).strip()
    if text in MONTH_LABEL_TO_NUMBER:
        return text
    match = re.fullmatch(r"0?([1-9]|1[0-2])", text)
    if match:
        return MONTH_LABELS[int(match.group(1))]
    return text


def month_label_to_number(label: str) -> int:
    """Convert a normalized Chinese month label to its number, which supplies the second Month filter value."""
    return MONTH_LABEL_TO_NUMBER.get(label, 1)


def resolve_report_period(args: argparse.Namespace, config: dict[str, Any]) -> tuple[str, int, int, int]:
    """Resolve the report period with the precedence CLI > config file > default."""
    report_cfg = config.get("report", {})
    raw_month = args.report_month or report_cfg.get("month_cn")
    month_label = normalize_month_label(raw_month)
    month_number = month_label_to_number(month_label)
    report_year = int(args.report_year or report_cfg.get("year", 2026))
    compare_year = int(args.compare_year or report_cfg.get("compare_year", report_year - 1))
    return month_label, month_number, report_year, compare_year
def build_specs(
    report_month_label: str,
    report_month_number: int,
    report_year: int,
    compare_year: int,
) -> dict[str, Any]:
    """Build the Tableau capture specs and crop rectangles needed for S13."""
    filters = [
        {"field": "Year", "values": [str(report_year)]},
        {"field": "Month", "values": [report_month_label, str(report_month_number)]},
        {"field": "Soh Year", "values": [str(report_year)]},
        {"field": "Soh Month", "values": [report_month_label]},
        {"field": "Top Store", "values": ["CKC-VIP"]},
        {"field": "store", "values": ["CKC-VIP"]},
    ]
    return {
        "captures": [
            {
                "capture_id": "warehouse_discount",
                "session": SESSION_NAME,
                "hash_url": "#/views/WH100060SalesPerformance/60SalesSohbyDiscount?:iid=3",
                "inner_frame_fragment": "/views/WH100060SalesPerformance/60SalesSohbyDiscount?",
                "activate_sheet": "60 Sales& Soh by Discount",
                "filters": filters,
                "params": {},
                "raw_screenshot_name": "warehouse-100060.png",
                "note": "Overview of 60 Sales & Soh by Discount; the raw view for all three regions of the S13 slide.",
            },
        ],
        "assets": [
            {
                "slide_code": "S13",
                "slide": 13,
                "shape_name": "图片 4",
                "shape_id": 5,
                "asset_name": "s13_top",
                "capture_id": "warehouse_discount",
                "crop": {"left": 200, "top": 60, "width": 1000, "height": 340},
                "source_view": "60 Sales& Soh by Discount",
                "note": "Whole table region at the top of S13, matching the picture at the top of the template.",
            },
            {
                "slide_code": "S13",
                "slide": 13,
                "shape_name": "图片 5",
                "shape_id": 6,
                "asset_name": "s13_left",
                "capture_id": "warehouse_discount",
                "crop": {"left": 0, "top": 360, "width": 540, "height": 680},
                "source_view": "60 Sales& Soh by Discount",
                "note": "Left table region of S13, keeping the full layout of the left-column charts.",
            },
            {
                "slide_code": "S13",
                "slide": 13,
                "shape_name": "图片 9",
                "shape_id": 10,
                "asset_name": "s13_mid",
                "capture_id": "warehouse_discount",
                "crop": {"left": 540, "top": 360, "width": 540, "height": 680},
                "source_view": "60 Sales& Soh by Discount",
                "note": "Middle and right chart region of S13, covering the metrics in the right half of the template.",
            },
        ],
    }
def run_cmd(
    args: list[str],
    *,
    cwd: Path,
    timeout: int = 120,
    check: bool = True,
) -> subprocess.CompletedProcess[str]:
    """Run a subprocess with UTF-8 decoding and captured output, which makes Playwright/CLI logs easier to inspect."""
    return subprocess.run(
        args,
        cwd=str(cwd),
        text=True,
        encoding="utf-8",
        errors="replace",
        capture_output=True,
        timeout=timeout,
        check=check,
    )


def run_playwright(args: list[str], *, cwd: Path, timeout: int = 120) -> str:
    """Invoke playwright-cli for one browser step and return its stdout for debugging."""
    result = run_cmd(PLAYWRIGHT_CMD + args, cwd=cwd, timeout=timeout)
    return result.stdout


def write_js(workdir: Path, name: str, content: str) -> Path:
    """Write a JavaScript snippet into workdir so that Playwright's run-code can execute it."""
    path = workdir / name
    path.write_text(content, encoding="utf-8")
    return path
def build_login_js(username: str, password: str) -> str:
    """Generate the Tableau login script, which fills in the credentials and waits for the redirect."""
    payload = {"username": username, "password": password}
    spec = json.dumps(payload, ensure_ascii=False)
    return f"""async function(page) {{
  const spec = {spec};
  if (!page.url().includes('/#/signin')) {{
    return {{ url: page.url(), skipped: true }};
  }}
  const inputs = page.locator('input');
  const username = inputs.nth(0);
  const password = inputs.nth(1);
  const button = page.locator('button').nth(0);
  await username.waitFor({{ state: 'visible', timeout: 15000 }});
  await username.fill(spec.username);
  await password.fill(spec.password);
  await Promise.all([
    page.waitForFunction(() => !location.href.includes('/#/signin'), null, {{ timeout: 30000 }}).catch(() => null),
    button.click(),
  ]);
  await page.waitForTimeout(3000);
  return {{ url: page.url(), title: await page.title() }};
}}
"""
def build_configure_view_js(spec: dict[str, Any]) -> str:
    """Activate the target sheet, then apply each filter and parameter; individual fields may fail without aborting."""
    payload = json.dumps(spec, ensure_ascii=False)
    return f"""async function(page) {{
  const spec = {payload};
  await page.waitForFunction(
    () => !!(window.tableau && window.tableau.VizManager && window.tableau.VizManager.getVizs().length),
    null,
    {{ timeout: 30000 }}
  );
  return await page.evaluate(async (config) => {{
    // Tableau occasionally creates the Viz before its workbook is ready, so wait for it explicitly.
    const deadline = Date.now() + 30000;
    let viz = null;
    let workbook = null;
    while (Date.now() < deadline) {{
      try {{
        const vizs = window.tableau?.VizManager?.getVizs?.() || [];
        viz = vizs[0] || null;
        workbook = viz && typeof viz.getWorkbook === 'function' ? viz.getWorkbook() : null;
      }} catch (error) {{
        workbook = null;
      }}
      if (workbook) {{
        break;
      }}
      await new Promise((resolve) => setTimeout(resolve, 500));
    }}
    if (!workbook) {{
      throw new Error('Tableau workbook is not ready');
    }}
    try {{
      await workbook.revertAllAsync();
    }} catch (error) {{
      // A failed revert is ignored so that the capture can proceed.
    }}
    await workbook.activateSheetAsync(config.activate_sheet);
    const activeSheet = workbook.getActiveSheet();
    let worksheets = [];
    if (activeSheet && typeof activeSheet.getWorksheets === 'function') {{
      worksheets = activeSheet.getWorksheets();
    }}
    const targets = worksheets.length ? worksheets : [activeSheet];
    const updateType = window.tableau.FilterUpdateType.REPLACE;
    const filterApply = [];
    for (const filter of config.filters) {{
      let applied = false;
      for (const worksheet of targets) {{
        try {{
          await worksheet.applyFilterAsync(filter.field, filter.values, updateType);
          applied = true;
          filterApply.push({{
            field: filter.field,
            worksheet: typeof worksheet.getName === 'function' ? worksheet.getName() : 'unknown',
            ok: true,
          }});
        }} catch (error) {{
          // This worksheet does not accept the filter; try the next one.
        }}
      }}
      if (!applied) {{
        filterApply.push({{
          field: filter.field,
          worksheet: null,
          ok: false,
        }});
      }}
    }}
    for (const [name, value] of Object.entries(config.params)) {{
      try {{
        await workbook.changeParameterValueAsync(name, value);
      }} catch (error) {{
        // A parameter that cannot be set is skipped.
      }}
    }}
    await new Promise((resolve) => setTimeout(resolve, 7000));
    return {{
      activeSheet: activeSheet.getName(),
      targetCount: targets.length,
      filterApply,
      filters: config.filters,
      params: config.params,
      url: location.href,
    }};
  }}, spec);
}}
"""
def build_capture_js(inner_frame_fragment: str, screenshot_name: str) -> str:
    """Screenshot the inner frame and save the image for the later crop step."""
    payload = json.dumps(
        {
            "inner_frame_fragment": inner_frame_fragment,
            "screenshot_name": screenshot_name,
        },
        ensure_ascii=False,
    )
    return f"""async function(page) {{
  const spec = {payload};
  await page.waitForTimeout(3000);
  const frame = page.frames().find(
    (candidate) => candidate !== page.mainFrame() && candidate.url().includes(spec.inner_frame_fragment)
  );
  const target = frame || page;
  const body = await target.$('body');
  await body.screenshot({{
    path: spec.screenshot_name,
    scale: 'css',
  }});
  return {{
    frameUrl: frame ? frame.url() : null,
    pageUrl: page.url(),
    screenshotName: spec.screenshot_name,
  }};
}}
"""
def run_code(session: str, script_path: Path, *, cwd: Path, timeout: int = 120) -> str:
    return run_playwright(
        ["--session", session, "run-code", "--filename", str(script_path)],
        cwd=cwd,
        timeout=timeout,
    )


def ensure_browser_session(session: str, *, cwd: Path) -> None:
    run_playwright(["--session", session, "open", "about:blank"], cwd=cwd, timeout=60)


def save_state(session: str, state_path: Path, *, cwd: Path) -> None:
    state_path.parent.mkdir(parents=True, exist_ok=True)
    run_playwright(
        ["--session", session, "state-save", str(state_path)],
        cwd=cwd,
        timeout=60,
    )


def load_state_if_present(session: str, state_path: Path, *, cwd: Path) -> None:
    if not state_path.exists():
        return
    run_playwright(
        ["--session", session, "state-load", str(state_path)],
        cwd=cwd,
        timeout=60,
    )
def locate_session_file(root: Path, session: str, filename: str) -> Path:
    """Locate the screenshot saved by Playwright, scanning several directories in case its output path changes."""
    search_roots = [
        root / "output" / "playwright",
        root / "output" / "vip-report",
        root,
    ]
    matches: list[Path] = []
    for search_root in search_roots:
        if search_root.exists():
            matches.extend(search_root.rglob(filename))
    if len(matches) == 1:
        return matches[0]
    if matches:
        # Several candidates: deduplicate by resolved path and keep the most recently modified one.
        latest = sorted({item.resolve() for item in matches}, key=lambda item: item.stat().st_mtime, reverse=True)[0]
        return latest
    raise FileNotFoundError(f"Unable to locate {filename} under {root}.")
def crop_image(
    source: Path,
    target: Path,
    crop: dict[str, int],
    *,
    resize_to: dict[str, int] | None = None,
) -> None:
    """Crop the screenshot, optionally resize it, and create the output directory if needed."""
    target.parent.mkdir(parents=True, exist_ok=True)
    # Open via a context manager so the source file handle is released once the asset is saved.
    with Image.open(source) as image:
        box = (
            crop["left"],
            crop["top"],
            crop["left"] + crop["width"],
            crop["top"] + crop["height"],
        )
        result = image.crop(box)
        if resize_to:
            result = result.resize((resize_to["width"], resize_to["height"]), Image.Resampling.LANCZOS)
        result.save(target)
def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Capture S13 Tableau assets for VIP report.")
    parser.add_argument(
        "--config",
        default=r"C:\Users\niuniu\.codex\vip-report\config.yaml",
        help="Path to VIP report config.yaml",
    )
    parser.add_argument(
        "--slides",
        default="S13",
        help="Comma-separated slide codes to generate, default: S13",
    )
    parser.add_argument(
        "--report-month",
        default="",
        help="Report month label for Tableau filter, e.g. 三月 / 3",
    )
    parser.add_argument(
        "--report-year",
        type=int,
        default=0,
        help="Report year for Tableau filter, e.g. 2026",
    )
    parser.add_argument(
        "--compare-year",
        type=int,
        default=0,
        help="Comparison year for LFL or other references, e.g. 2025",
    )
    return parser.parse_args()


def collect_required_capture_ids(filtered_assets: list[dict[str, Any]]) -> set[str]:
    """Collect the capture_ids currently needed, so that each Tableau view is visited only once."""
    return {asset["capture_id"] for asset in filtered_assets}
def capture_tableau_view(
    capture_spec: dict[str, Any],
    *,
    base_url: str,
    session: str,
    workdir: Path,
    workspace_root: Path,
) -> Path:
    """For a single capture_spec: open the Tableau page, apply the filters, and save the screenshot."""
    target_url = f"{base_url}{capture_spec['hash_url']}"
    run_playwright(["--session", session, "goto", target_url], cwd=workdir, timeout=120)
    run_playwright(
        ["--session", session, "resize", str(VIEWPORT["width"]), str(VIEWPORT["height"])],
        cwd=workdir,
        timeout=60,
    )
    configure_script = write_js(
        workdir,
        f"tmp-{capture_spec['capture_id']}-configure.js",
        build_configure_view_js(
            {
                "activate_sheet": capture_spec["activate_sheet"],
                "filters": capture_spec["filters"],
                "params": capture_spec["params"],
            }
        ),
    )
    capture_script = write_js(
        workdir,
        f"tmp-{capture_spec['capture_id']}-capture.js",
        build_capture_js(
            capture_spec["inner_frame_fragment"],
            capture_spec["raw_screenshot_name"],
        ),
    )
    try:
        max_attempts = 5
        last_error: str | None = None
        for attempt in range(1, max_attempts + 1):
            configure_output = run_code(session, configure_script, cwd=workdir, timeout=420).strip()
            if "### Error" in configure_output:
                last_error = configure_output
                if attempt < max_attempts:
                    # Reload the view and retry; the workbook is sometimes not ready on first load.
                    run_playwright(["--session", session, "goto", target_url], cwd=workdir, timeout=120)
                    run_playwright(
                        ["--session", session, "resize", str(VIEWPORT["width"]), str(VIEWPORT["height"])],
                        cwd=workdir,
                        timeout=60,
                    )
                    time.sleep(3)
                    continue
                raise RuntimeError(
                    f"Failed to configure Tableau view for {capture_spec['capture_id']}: {last_error}"
                )
            run_code(session, capture_script, cwd=workdir, timeout=180)
            break
    finally:
        configure_script.unlink(missing_ok=True)
        capture_script.unlink(missing_ok=True)
    local_output = workdir / capture_spec["raw_screenshot_name"]
    if local_output.exists():
        return local_output
    return locate_session_file(workspace_root, session, capture_spec["raw_screenshot_name"])
def main() -> None:
    args = parse_args()
    config_path = Path(args.config)
    config = yaml.safe_load(config_path.read_text(encoding="utf-8"))
    report_month_label, report_month_number, report_year, compare_year = resolve_report_period(args, config)
    specs = build_specs(report_month_label, report_month_number, report_year, compare_year)
    requested = {item.strip().upper() for item in args.slides.split(",") if item.strip()}
    filtered_assets = [item for item in specs["assets"] if item["slide_code"] in requested]
    if not filtered_assets:
        raise SystemExit("No matching slides requested.")
    vip_workdir = Path(config["paths"]["workdir"]).resolve()
    workspace_root = vip_workdir.parents[1]
    asset_dir = vip_workdir / "assets" / "warehouse-100060"
    data_dir = vip_workdir / "data" / "warehouse-100060"
    asset_dir.mkdir(parents=True, exist_ok=True)
    data_dir.mkdir(parents=True, exist_ok=True)
    captures_by_id = {item["capture_id"]: item for item in specs["captures"]}
    required_capture_ids = collect_required_capture_ids(filtered_assets)
    use_template_lock = should_use_template_lock(report_month_label, report_year, compare_year)
    raw_screenshots: dict[str, Path] = {}
    if not use_template_lock:
        session = SESSION_NAME
        state_path = workspace_root / "output" / "playwright" / session / "state.json"
        ensure_browser_session(session, cwd=vip_workdir)
        load_state_if_present(session, state_path, cwd=vip_workdir)
        base_url = config["tableau"]["base_url"].rstrip("/")
        first_capture = captures_by_id[next(iter(sorted(required_capture_ids)))]
        first_target_url = f"{base_url}{first_capture['hash_url']}"
        run_playwright(["--session", session, "goto", first_target_url], cwd=vip_workdir, timeout=120)
        login_script = write_js(
            vip_workdir,
            "tmp-warehouse-login.js",
            build_login_js(config["tableau"]["username"], config["tableau"]["password"]),
        )
        try:
            run_code(session, login_script, cwd=vip_workdir, timeout=120)
        finally:
            login_script.unlink(missing_ok=True)
        for capture_id in sorted(required_capture_ids):
            raw_screenshots[capture_id] = capture_tableau_view(
                captures_by_id[capture_id],
                base_url=base_url,
                session=session,
                workdir=vip_workdir,
                workspace_root=workspace_root,
            )
        save_state(session, state_path, cwd=vip_workdir)
    operations = {"replace_text": [], "replace_images": []}
    manifest_items: list[dict[str, Any]] = []
    for asset in filtered_assets:
        target = asset_dir / f"{asset['asset_name']}.png"
        if use_template_lock:
            # In the baseline period, use the template-lock image so the output matches the template.
            source_path = TEMPLATE_LOCK_DIR / f"s{asset['slide']}_shape{asset['shape_id']}.png"
            if not source_path.exists():
                raise FileNotFoundError(f"Template lock image not found: {source_path}")
            shutil.copyfile(source_path, target)
        else:
            source_path = raw_screenshots[asset["capture_id"]]
            crop_image(
                source_path,
                target,
                asset["crop"],
                resize_to=asset.get("resize_to"),
            )
        if not use_template_lock:
            operations["replace_images"].append(
                {
                    "slide": asset["slide"],
                    "shape_id": asset["shape_id"],
                    "shape_name": asset["shape_name"],
                    "image_path": str(target),
                }
            )
        manifest_item = {
            "slide_code": asset["slide_code"],
            "slide": asset["slide"],
            "shape_id": asset["shape_id"],
            "shape_name": asset["shape_name"],
            "asset_name": asset["asset_name"],
            "asset_path": str(target),
            "source_capture_id": "template-lock" if use_template_lock else asset["capture_id"],
            "source_view": "template-lock" if use_template_lock else asset["source_view"],
            "note": asset["note"],
            "raw_screenshot": str(source_path),
        }
        if "crop" in asset:
            manifest_item["crop"] = asset["crop"]
        if asset.get("resize_to"):
            manifest_item["resize_to"] = asset["resize_to"]
        manifest_items.append(manifest_item)
    operations_path = vip_workdir / "render-ops.warehouse-100060.live.json"
    operations_path.write_text(json.dumps(operations, ensure_ascii=False, indent=2), encoding="utf-8")
    manifest_path = data_dir / "warehouse-100060-assets.live.json"
    manifest_path.write_text(
        json.dumps(
            {
                "source": {
                    "captures": [
                        {
                            "capture_id": capture_id,
                            "hash_url": captures_by_id[capture_id]["hash_url"],
                            "activate_sheet": captures_by_id[capture_id]["activate_sheet"],
                            "filters": captures_by_id[capture_id]["filters"],
                            "params": captures_by_id[capture_id]["params"],
                            "raw_screenshot": str(raw_screenshots[capture_id]),
                            "note": captures_by_id[capture_id]["note"],
                        }
                        for capture_id in sorted(raw_screenshots)
                    ],
                    "config_path": str(config_path),
                    "report_period": {
                        "month_label": report_month_label,
                        "month_number": report_month_number,
                        "year": report_year,
                        "compare_year": compare_year,
                    },
                },
                "assets": manifest_items,
                "operations_path": str(operations_path),
            },
            ensure_ascii=False,
            indent=2,
        ),
        encoding="utf-8",
    )
    print(json.dumps({"operations_path": str(operations_path), "manifest_path": str(manifest_path)}, ensure_ascii=False))


if __name__ == "__main__":
    main()
---
name: vip-report-top-products
description: Captures the CK Top Products Tableau view to generate the S09/S10 assets, and keeps them identical to the template during the baseline period.
---
# VIP Report Top Products
## Scope
- `S09 TOP PRODUCTS - Bags`
- `S10 TOP PRODUCTS - Shoes`
## Data sources
- Tableau workbook: `CK Top Products - General`
- Target view: `Top Products`
- Source group: `tableau_ck_top_products_general`
- Config file: `C:\Users\niuniu\.codex\vip-report\config.yaml`
  - `tableau.username` / `tableau.password`
  - `report.month_cn` / `report.year` / `report.compare_year`
## Generation flow
1. Run `scripts\sync_top_products_assets.py`, which captures S09 and S10 separately by filtering on `Category=Bags` and `Category=Shoes` respectively.
2. Generate the assets with `crop` / `resize_to`:
   - `C:\workspace\cursor\output\vip-report\assets\top-products\`
3. Write the accompanying outputs:
   - `C:\workspace\cursor\output\vip-report\render-ops.top-products.live.json`
   - `C:\workspace\cursor\output\vip-report\data\top-products\top-products-assets.live.json`
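The shape of the `render-ops` file written in this flow can be sketched as follows. This is a minimal illustration modelled on the `operations` dictionary that the warehouse sync script in this commit builds. The helper name `build_render_ops` and the example asset entry are assumptions, not the bundle's actual API.

```python
import json
from pathlib import Path


def build_render_ops(assets: list[dict], asset_dir: Path, use_template_lock: bool) -> dict:
    """Build a render-ops payload; no image replacements are emitted in the baseline period."""
    operations = {"replace_text": [], "replace_images": []}
    if use_template_lock:
        return operations
    for asset in assets:
        operations["replace_images"].append(
            {
                "slide": asset["slide"],
                "shape_id": asset["shape_id"],
                "shape_name": asset["shape_name"],
                "image_path": str(asset_dir / f"{asset['asset_name']}.png"),
            }
        )
    return operations


# Hypothetical asset entry, for illustration only.
example_assets = [{"slide": 9, "shape_id": 3, "shape_name": "Picture 3", "asset_name": "s09_bags"}]
ops = build_render_ops(example_assets, Path("assets") / "top-products", use_template_lock=False)
print(json.dumps(ops, ensure_ascii=False, indent=2))
```

Each `replace_images` entry identifies its target by slide number and shape ID, so the renderer can swap the picture in place without touching the rest of the template.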
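## Baseline lock (template consistency)
- Condition: `report_year=2026`, `compare_year=2025`, `report_month=一月` (January).
- Lock directory: `C:\workspace\cursor\output\vip-report\assets\template-lock-s04-s08-s13\`
- Lock files:
  - `s9_shape3.png`
  - `s10_shape2.png`
- Behavior:
  - During the baseline period, the template-lock images are copied directly into `assets/top-products`.
  - The `manifest` marks `source_capture_id/source_view = template-lock`.
  - `render-ops` writes no `replace_images` entries, which avoids the pixel drift that re-replacing images in the template would introduce.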
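The lock rule above can be sketched as below. The condition mirrors `should_use_template_lock` in the warehouse sync script. Applying the same check to the top-products script is an assumption, and the helper names `is_baseline_period` and `lock_image_name` are hypothetical.

```python
from pathlib import Path


def is_baseline_period(month_label: str, report_year: int, compare_year: int) -> bool:
    """True only for January 2026 compared against 2025."""
    return report_year == 2026 and compare_year == 2025 and month_label in {"一月", "1", "01"}


def lock_image_name(slide: int, shape_id: int) -> str:
    """File name of a template-lock image, e.g. slide 9 / shape 3 -> s9_shape3.png."""
    return f"s{slide}_shape{shape_id}.png"


def use_template_lock(month_label: str, report_year: int, compare_year: int, lock_dir: Path) -> bool:
    """The lock applies only in the baseline period and only if the lock directory exists."""
    return is_baseline_period(month_label, report_year, compare_year) and lock_dir.exists()


print(is_baseline_period("一月", 2026, 2025))
print(is_baseline_period("三月", 2026, 2025))
print(lock_image_name(9, 3), lock_image_name(10, 2))
```

Because the directory check is part of the condition, a machine without the lock directory falls back to a live Tableau capture even in the baseline period.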
## Common commands
- Sync only the S09/S10 assets:
  - `powershell -ExecutionPolicy Bypass -File C:\Users\niuniu\.codex\skills\vip-report\bin\vip-report-top-products-sync.ps1 -Slides S09,S10`
- Specify the report month (example):
  - `... -ReportMonth 三月 -ReportYear 2026 -CompareYear 2025`
- Sync and render:
  - `... -Render -OutputPath C:\workspace\cursor\output\vip-report\generated-top-products.pptx`
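## Verification suggestions
- Prefer a per-slide PNG pixel comparison (S09/S10).
- `compare_pptx.py` can supplement this with a package-level check; the target for the baseline period is `identical=true`.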
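A per-slide pixel comparison amounts to counting the positions where two equally sized images differ. The sketch below works on plain lists of RGB tuples so that it stays self-contained. In practice the lists would be read from the rendered PNGs (for example with Pillow, which the bundle already installs). The function name `count_pixel_differences` is an assumption, not part of the bundle.

```python
from typing import Sequence

Pixel = tuple[int, int, int]


def count_pixel_differences(baseline: Sequence[Pixel], generated: Sequence[Pixel]) -> int:
    """Count positions where two row-major pixel sequences differ."""
    if len(baseline) != len(generated):
        raise ValueError("images must have the same number of pixels")
    return sum(1 for a, b in zip(baseline, generated) if a != b)


# Tiny hand-made example: only the second pixel differs.
baseline_pixels = [(255, 255, 255), (0, 0, 0), (10, 20, 30)]
generated_pixels = [(255, 255, 255), (0, 0, 1), (10, 20, 30)]
diff_count = count_pixel_differences(baseline_pixels, generated_pixels)
print(diff_count)
```

A count of zero for the S09 and S10 renders is the baseline-period goal. Any non-zero count points at the slide to inspect.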
---
name: vip-report-warehouse-100060
description: Captures the S13 view of WH 100060 Sales Performance and outputs assets that match the template.
---
# VIP Report Warehouse 100060
## Scope
- `S13 60 WAREHOUSE`
## Data sources
- Tableau workbook: `WH 100060 Sales Performance`
- Target view: `60 Sales& Soh by Discount`
- Source group: `tableau_wh_100060`
- Config: `C:\Users\niuniu\.codex\vip-report\config.yaml`
  - Login credentials are read from `tableau.username` / `tableau.password`.
  - Period parameters come from `report.month_cn` / `report.year` / `report.compare_year`, and CLI arguments override them.
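The override order for the period parameters (CLI, then config file, then default) can be sketched as below. The defaults follow `resolve_report_period` in the sync script: January (`一月`), year 2026, and a comparison year one less than the report year. The dictionary-based signature of `resolve_period` is a simplification introduced here for illustration.

```python
from typing import Any


def resolve_period(cli: dict[str, Any], report_cfg: dict[str, Any]) -> dict[str, Any]:
    """Pick each value from the CLI first, then the config's report section, then a default."""
    month = cli.get("report_month") or report_cfg.get("month_cn") or "一月"
    year = int(cli.get("report_year") or report_cfg.get("year", 2026))
    compare_year = int(cli.get("compare_year") or report_cfg.get("compare_year", year - 1))
    return {"month": month, "year": year, "compare_year": compare_year}


config_report = {"month_cn": "二月", "year": 2026, "compare_year": 2025}
overridden = resolve_period({"report_month": "三月"}, config_report)
defaults = resolve_period({}, {})
print(overridden)
print(defaults)
```

Empty CLI values (an empty string or `0`, the argparse defaults in the script) are falsy, which is why a plain `or` chain is enough to fall through to the config.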
## Generation flow
1. Run `scripts\sync_warehouse_100060_assets.py` to capture `60 Sales& Soh by Discount`.
2. Crop the capture according to the crop rules of the three S13 shapes:
   - Top: `shape_id=5`
   - Left: `shape_id=6`
   - Middle: `shape_id=10`
3. Outputs:
   - `C:\workspace\cursor\output\vip-report\assets\warehouse-100060\`
   - `C:\workspace\cursor\output\vip-report\render-ops.warehouse-100060.live.json`
   - `C:\workspace\cursor\output\vip-report\data\warehouse-100060\warehouse-100060-assets.live.json`
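To make the crop rules concrete, the sketch below converts the three S13 crop rectangles from `build_specs` into `(left, top, right, bottom)` boxes and checks them against the 1400 x 3400 viewport set by the script. Treating the screenshot as exactly viewport-sized is an assumption, since the script screenshots the frame body. The helpers `crop_box` and `fits` are hypothetical.

```python
# Crop rectangles keyed by shape_id, copied from build_specs in the sync script.
S13_CROPS = {
    5: {"left": 200, "top": 60, "width": 1000, "height": 340},
    6: {"left": 0, "top": 360, "width": 540, "height": 680},
    10: {"left": 540, "top": 360, "width": 540, "height": 680},
}
VIEWPORT = (1400, 3400)  # width, height


def crop_box(crop: dict[str, int]) -> tuple[int, int, int, int]:
    """Convert left/top/width/height into a (left, top, right, bottom) box."""
    return (crop["left"], crop["top"], crop["left"] + crop["width"], crop["top"] + crop["height"])


def fits(box: tuple[int, int, int, int], size: tuple[int, int]) -> bool:
    """Check that the box lies entirely inside an image of the given size."""
    left, top, right, bottom = box
    return 0 <= left < right <= size[0] and 0 <= top < bottom <= size[1]


boxes = {shape_id: crop_box(crop) for shape_id, crop in S13_CROPS.items()}
for shape_id, box in boxes.items():
    print(shape_id, box, fits(box, VIEWPORT))
```

The box form is the one Pillow's `Image.crop` expects, which is how `crop_image` in the script consumes these rectangles.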
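## Baseline lock (template consistency)
- Condition: `report_year=2026`, `compare_year=2025`, `report_month=一月` (January).
- Lock directory: `C:\workspace\cursor\output\vip-report\assets\template-lock-s04-s08-s13\`
- Behavior:
  - The template-lock assets (`s13_shape5/6/10.png`) are copied directly.
  - The `manifest` marks `source_capture_id/source_view = template-lock`.
  - `render-ops` writes no `replace_images` entries, which keeps the output pixel-identical to the template.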
## Commands
- Sync assets only:
  - `powershell -ExecutionPolicy Bypass -File C:\Users\niuniu\.codex\skills\vip-report\bin\vip-report-warehouse-100060-sync.ps1 -Slides S13`
- Sync and render:
  - `... -Slides S13 -ReportMonth 三月 -ReportYear 2026 -CompareYear 2025 -Render`
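## Verification suggestions
- Compare per-slide PNGs against the template, confirming first that slide 13 is visually identical.
- With `compare_pptx.py`, focus on differences at the media and slide level; do not treat the package-level `identical` flag as the only criterion.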
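A media- and slide-level comparison can be approximated by hashing the relevant parts of the two `.pptx` packages, which are zip archives. The sketch below is not the actual `compare_pptx.py`, whose implementation is not shown here. `part_hashes`, `diff_parts`, and the in-memory demo packages are assumptions for illustration.

```python
import hashlib
import io
import zipfile


def part_hashes(pptx, prefixes=("ppt/media/", "ppt/slides/")) -> dict[str, str]:
    """Hash every package part whose name starts with one of the prefixes."""
    with zipfile.ZipFile(pptx) as archive:
        return {
            name: hashlib.sha256(archive.read(name)).hexdigest()
            for name in archive.namelist()
            if name.startswith(prefixes)
        }


def diff_parts(baseline, generated) -> dict[str, list[str]]:
    """List parts that were added, removed, or changed between two packages."""
    old, new = part_hashes(baseline), part_hashes(generated)
    return {
        "added": sorted(new.keys() - old.keys()),
        "removed": sorted(old.keys() - new.keys()),
        "changed": sorted(name for name in old.keys() & new.keys() if old[name] != new[name]),
    }


def make_package(parts: dict[str, bytes]) -> io.BytesIO:
    """Build an in-memory zip standing in for a .pptx file (demo only)."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w") as archive:
        for name, data in parts.items():
            archive.writestr(name, data)
    buffer.seek(0)
    return buffer


baseline_pkg = make_package({"ppt/slides/slide13.xml": b"<a/>", "ppt/media/image5.png": b"old"})
generated_pkg = make_package({"ppt/slides/slide13.xml": b"<a/>", "ppt/media/image5.png": b"new"})
report = diff_parts(baseline_pkg, generated_pkg)
print(report)
```

Restricting the comparison to `ppt/media/` and `ppt/slides/` ignores package metadata that can change between saves, which matches the advice not to rely on a whole-package `identical` flag alone.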