Commit 89ec371c authored by allen.wang's avatar allen.wang

feat:init

parent 62507a1d
param(
[string]$ConfigPath = "C:\Users\niuniu\.codex\vip-report\config.yaml",
[string]$Slides = "S09,S10",
[string]$Slides = "S10",
[string]$ReportMonth = "",
[int]$ReportYear = 0,
[int]$CompareYear = 0,
[switch]$Render,
[string]$TemplatePath = "",
[string]$OutputPath = "C:\workspace\cursor\output\vip-report\generated-top-products.pptx",
[string]$CompareOutputPath = "C:\workspace\cursor\output\vip-report\generated-top-products.compare.json"
)
......@@ -13,7 +14,15 @@ $ErrorActionPreference = "Stop"
$root = Split-Path -Parent $PSScriptRoot
$workdir = "C:\workspace\cursor"
$opsPath = "C:\workspace\cursor\output\vip-report\render-ops.top-products.live.json"
$templatePath = "C:\Users\niuniu\Desktop\Report.pptx"
$fallbackTemplatePath = Join-Path $root "output\Report.clean.pptx"
if (-not $TemplatePath) {
$TemplatePath = $fallbackTemplatePath
}
if (-not (Test-Path -LiteralPath $TemplatePath)) {
throw "TemplatePath not found: $TemplatePath"
}
$pythonArgs = @(
"$root\scripts\sync_top_products_assets.py",
......@@ -33,8 +42,8 @@ if ($CompareYear -gt 0) {
python @pythonArgs
if ($Render) {
powershell -ExecutionPolicy Bypass -File "$root\bin\vip-report-render.ps1" -TemplatePath "$templatePath" -OutputPath "$OutputPath" -OperationsPath "$opsPath" | Out-Null
python "$root\scripts\compare_pptx.py" "$templatePath" "$OutputPath" --output "$CompareOutputPath"
powershell -ExecutionPolicy Bypass -File "$root\bin\vip-report-render.ps1" -TemplatePath "$TemplatePath" -OutputPath "$OutputPath" -OperationsPath "$opsPath" | Out-Null
python "$root\scripts\compare_pptx.py" "$TemplatePath" "$OutputPath" --output "$CompareOutputPath"
Write-Output $OutputPath
Write-Output $CompareOutputPath
} else {
......
......@@ -13,7 +13,20 @@ $ErrorActionPreference = "Stop"
$root = Split-Path -Parent $PSScriptRoot
$workdir = "C:\workspace\cursor"
$opsPath = "C:\workspace\cursor\output\vip-report\render-ops.warehouse-100060.live.json"
$localConfigPath = Join-Path $root "config.yaml"
if (-not (Test-Path -LiteralPath $ConfigPath) -and (Test-Path -LiteralPath $localConfigPath)) {
$ConfigPath = $localConfigPath
}
$templatePath = "C:\Users\niuniu\Desktop\Report.pptx"
$downloadTemplatePath = "C:\Users\allen.wang\Downloads\Report.pptx"
$fallbackTemplatePath = Join-Path $root "output\Report.clean.pptx"
if (-not (Test-Path -LiteralPath $templatePath) -and (Test-Path -LiteralPath $downloadTemplatePath)) {
$templatePath = $downloadTemplatePath
}
if (-not (Test-Path -LiteralPath $templatePath) -and (Test-Path -LiteralPath $fallbackTemplatePath)) {
$templatePath = $fallbackTemplatePath
}
$pythonArgs = @(
"$root\scripts\sync_warehouse_100060_assets.py",
......
......@@ -102,37 +102,132 @@ def resolve_month_index(report_month: str) -> int:
return 1
def build_filters(report_month: str, report_year: int, compare_year: int | None = None) -> list[dict[str, Any]]:
"""构造尽可能多的 caption 过滤字段,失败会在 JS 侧被吞掉,避免抛错。"""
def build_month_value_variants(report_month: str, *, cumulative: bool) -> list[list[str]]:
"""根据筛选模式构造 month 候选值。"""
month_index = resolve_month_index(report_month)
month_indexes = list(range(1, month_index + 1)) if cumulative else [month_index]
month_labels = [normalize_month_label(idx) for idx in month_indexes]
return [
[str(idx) for idx in month_indexes],
[f"{idx:02d}" for idx in month_indexes],
month_labels,
]
def build_filters(
report_month: str,
report_year: int,
compare_year: int | None = None,
*,
include_compare_year: bool = True,
cumulative_month: bool = True,
storename_value_candidates: list[list[str]] | None = None,
) -> list[dict[str, Any]]:
"""构造 inventory monthly 的筛选候选,命中 worksheet 后停止继续尝试。"""
# billdate 年筛选按“报告年 + 上一年”动态计算,避免硬编码固定年份。
resolved_compare_year = report_year - 1 if compare_year is None else compare_year
year_values = [str(resolved_compare_year), str(report_year)]
# month 按累计区间筛选:例如三月 => 1,2,3。
month_index = resolve_month_index(report_month)
month_values_numeric_str = [str(idx) for idx in range(1, month_index + 1)]
year_values = (
[str(resolved_compare_year), str(report_year)]
if include_compare_year
else [str(report_year)]
)
month_value_variants = build_month_value_variants(
report_month,
cumulative=cumulative_month,
)
storename_candidates = storename_value_candidates or [["CKC-VIP"], ["ckc-vip"]]
return [
{"field": "Storename (组)", "values": ["ckc-vip"]},
{"field": "storename (组)", "values": ["ckc-vip"]},
{"field": "Storename", "values": ["ckc-vip"]},
{"field": "storename", "values": ["ckc-vip"]},
{"field": "Month", "values": month_values_numeric_str},
{"field": "month", "values": month_values_numeric_str},
{"field": "月(billdate)", "values": month_values_numeric_str},
{"field": "billdate 月", "values": month_values_numeric_str},
{"field": "月", "values": month_values_numeric_str},
{"field": "Year", "values": year_values},
{"field": "year", "values": year_values},
{"field": "年(billdate)", "values": year_values},
{"field": "billdate 年", "values": year_values},
{"field": "Brand", "values": ["CK"]},
{"field": "brand", "values": ["CK"]},
{
"label": "storename",
"field_candidates": [
"storename (Shoes) ",
"Storename (Shoes) ",
"storename (Shoes)",
"Storename (Shoes)",
"storename",
"Storename",
"storecode",
"storename (组)",
"Storename (组)",
],
"value_candidates": storename_candidates,
"require_worksheet_match": False,
},
{
"label": "year",
"field_candidates": ["年(billdate)", "billdate 年", "Year", "year"],
"value_candidates": [year_values],
"require_worksheet_match": False,
},
{
"label": "brand",
"field_candidates": ["Brand", "brand"],
"value_candidates": [["CK"]],
"require_worksheet_match": False,
},
{
"label": "month",
"field_candidates": ["月(billdate)", "billdate 月", "Month", "month", "月"],
"value_candidates": month_value_variants,
"require_worksheet_match": False,
},
]
def build_subcategory_filters(
report_month: str,
report_year: int,
compare_year: int | None = None,
) -> list[dict[str, Any]]:
"""S08 需要按报告月精确筛选,并限定 category/storename。"""
filters = build_filters(
report_month,
report_year,
compare_year,
include_compare_year=False,
cumulative_month=False,
storename_value_candidates=[["ckc-vip"], ["CKC-VIP"]],
)
for item in filters:
if item.get("label") == "brand":
# Subcategory 视图未必暴露 Brand 筛选,命不中时不阻塞抓图。
item["required"] = False
filters.append(
{
"label": "category",
"field_candidates": [
"category",
"Category",
"category ",
"Category ",
"Top Category",
"top category",
],
"value_candidates": [
["BAGS", "ACC", "SHOES"],
["BAGS", "SHOES", "ACC"],
["bags", "acc", "shoes"],
],
"require_worksheet_match": False,
}
)
return filters
def build_specs(report_month: str, report_year: int, compare_year: int) -> dict[str, Any]:
"""返回 S04-S08 需要的 capture/asset 描述,包含裁切信息与说明。"""
filters = build_filters(report_month, report_year, compare_year)
s07_filters = build_filters(
report_month,
report_year,
compare_year,
include_compare_year=False,
cumulative_month=False,
storename_value_candidates=[["ckc-vip"], ["CKC-VIP"]],
)
s08_filters = build_subcategory_filters(report_month, report_year, compare_year)
capture_template = [
{
"capture_id": "overall",
......@@ -163,6 +258,7 @@ def build_specs(report_month: str, report_year: int, compare_year: int) -> dict[
"hash_url": "#/views/CKInventoryMonthlyReport/DiscountRegular?:iid=1",
"inner_frame_fragment": "/views/CKInventoryMonthlyReport/DiscountRegular?",
"activate_sheet": "Discount & Regular",
"filters": s07_filters,
"note": "S07 DiscountRegular 视图,比较折扣与正价。",
"raw_screenshot_name": "inventory-s07-discount.png",
},
......@@ -171,7 +267,10 @@ def build_specs(report_month: str, report_year: int, compare_year: int) -> dict[
"hash_url": "#/views/CKInventoryMonthlyReport/Subcategory?:iid=1",
"inner_frame_fragment": "/views/CKInventoryMonthlyReport/Subcategory?",
"activate_sheet": "Subcategory",
"note": "S08 Subcategory 视图,细分子类别表现。",
"filters": s08_filters,
"ready_patterns": ["soldqty", "retailprice", "cost"],
"viewport": {"width": 1400, "height": 1200},
"note": "S08 Subcategory 视图,按报告年月、category(BAGS/ACC/SHOES)、storename(ckc-vip)筛选。",
"raw_screenshot_name": "inventory-s08-subcategory.png",
},
]
......@@ -184,7 +283,7 @@ def build_specs(report_month: str, report_year: int, compare_year: int) -> dict[
"hash_url": info["hash_url"],
"inner_frame_fragment": info["inner_frame_fragment"],
"activate_sheet": info["activate_sheet"],
"filters": filters,
"filters": info.get("filters", filters),
"params": {
"year2": compare_year,
"Year2": compare_year,
......@@ -238,7 +337,7 @@ def build_specs(report_month: str, report_year: int, compare_year: int) -> dict[
"shape_id": 2,
"asset_name": "s05_bags_top",
"capture_id": "bags",
"crop": {"left": 0, "top": 175, "width": 1400, "height": 250},
"crop": {"left": 100, "top": 128, "width": 1200, "height": 202},
"resize_to": {"width": 1093, "height": 237},
"source_view": "Bags",
"note": "S05 第一行:Sales LFL 与 YTM。",
......@@ -250,7 +349,7 @@ def build_specs(report_month: str, report_year: int, compare_year: int) -> dict[
"shape_id": 3,
"asset_name": "s05_bags_bottom",
"capture_id": "bags",
"crop": {"left": 0, "top": 1400, "width": 1400, "height": 420},
"crop": {"left": 100, "top": 1320, "width": 1200, "height": 365},
"resize_to": {"width": 1093, "height": 376},
"source_view": "Bags",
"note": "S05 第二/三行:Bags Qty%/ASP 与 Bags GP/PD。",
......@@ -265,7 +364,7 @@ def build_specs(report_month: str, report_year: int, compare_year: int) -> dict[
"crop": {"left": 0, "top": 130, "width": 1400, "height": 360},
"resize_to": {"width": 1132, "height": 248},
"source_view": "Shoes",
"note": "S06 上半部分展示鞋类主要内容。",
"note": "S06 第一行:Sales LFL 与 YTM。",
},
{
"slide_code": "S06",
......@@ -277,7 +376,7 @@ def build_specs(report_month: str, report_year: int, compare_year: int) -> dict[
"crop": {"left": 0, "top": 540, "width": 1400, "height": 520},
"resize_to": {"width": 1132, "height": 388},
"source_view": "Shoes",
"note": "S06 下段保持图表宽度一致。",
"note": "S06 第二/三行:Shoes Qty% Difference、Shoes ASP LFL、Shoes GP Difference、Shoes PD Difference。",
},
{
"slide_code": "S07",
......@@ -373,6 +472,13 @@ def build_configure_view_js(spec: dict[str, Any]) -> str:
payload = json.dumps(spec, ensure_ascii=False)
return f"""async function(page) {{
const spec = {payload};
if (spec.viewport && spec.viewport.width && spec.viewport.height) {{
await page.setViewportSize({{
width: Number(spec.viewport.width),
height: Number(spec.viewport.height),
}});
await page.waitForTimeout(1000);
}}
await page.waitForFunction(
() => !!(window.tableau && window.tableau.VizManager && window.tableau.VizManager.getVizs().length),
null,
......@@ -421,26 +527,79 @@ def build_configure_view_js(spec: dict[str, Any]) -> str:
}}
const updateType = window.tableau.FilterUpdateType.REPLACE;
const filterApply = [];
const dashboardName = activeSheet && typeof activeSheet.getName === 'function'
? activeSheet.getName()
: null;
for (const filter of config.filters) {{
let applied = false;
async function applyFilterCandidate(field, values) {{
let anySuccess = false;
let anyWorksheetSuccess = false;
for (const worksheet of targets) {{
const worksheetName = typeof worksheet.getName === 'function' ? worksheet.getName() : 'unknown';
try {{
await worksheet.applyFilterAsync(filter.field, filter.values, updateType);
applied = true;
await worksheet.applyFilterAsync(field, values, updateType);
anySuccess = true;
if (!dashboardName || worksheetName !== dashboardName) {{
anyWorksheetSuccess = true;
}}
filterApply.push({{
field: filter.field,
worksheet: typeof worksheet.getName === 'function' ? worksheet.getName() : 'unknown',
field,
values,
worksheet: worksheetName,
ok: true,
}});
}} catch (error) {{}}
}} catch (error) {{
filterApply.push({{
field,
values,
worksheet: worksheetName,
ok: false,
error: String(error && error.message || error),
}});
}}
}}
if (!applied) {{
return {{ anySuccess, anyWorksheetSuccess }};
}}
for (const filter of config.filters) {{
const fieldCandidates = filter.field_candidates || (filter.field ? [filter.field] : []);
const valueCandidates = filter.value_candidates || (filter.values ? [filter.values] : []);
const requireWorksheetMatch =
(filter.require_worksheet_match !== false) && worksheets.length > 0;
let matched = false;
for (const field of fieldCandidates) {{
for (const values of valueCandidates) {{
const result = await applyFilterCandidate(field, values);
if (result.anySuccess && (!requireWorksheetMatch || result.anyWorksheetSuccess)) {{
filterApply.push({{
label: filter.label || field,
field,
values,
worksheet: "summary",
ok: true,
matched_on_worksheet: result.anyWorksheetSuccess,
}});
matched = true;
break;
}}
}}
if (matched) {{
break;
}}
}}
if (!matched) {{
filterApply.push({{
field: filter.field,
label: filter.label || null,
field: null,
worksheet: null,
ok: false,
error: "no candidate matched",
}});
if (filter.required !== false) {{
throw new Error(`Required filter '${{filter.label || fieldCandidates.join('/')}}' did not match any worksheet.`);
}}
}}
}}
......@@ -464,22 +623,48 @@ def build_configure_view_js(spec: dict[str, Any]) -> str:
"""
def build_capture_js(inner_frame_fragment: str, screenshot_name: str) -> str:
def build_capture_js(
inner_frame_fragment: str,
screenshot_name: str,
*,
ready_patterns: list[str] | None = None,
viewport: dict[str, int] | None = None,
) -> str:
"""找到 inner frame,截取整个 body 作为截图。"""
payload = json.dumps(
{
"inner_frame_fragment": inner_frame_fragment,
"screenshot_name": screenshot_name,
"ready_patterns": ready_patterns or [],
"viewport": viewport or {},
},
ensure_ascii=False,
)
return f"""async function(page) {{
const spec = {payload};
if (spec.viewport && spec.viewport.width && spec.viewport.height) {{
await page.setViewportSize({{
width: Number(spec.viewport.width),
height: Number(spec.viewport.height),
}});
await page.waitForTimeout(1000);
}}
await page.waitForTimeout(3000);
const frame = page.frames().find(
(candidate) => candidate !== page.mainFrame() && candidate.url().includes(spec.inner_frame_fragment)
);
const target = frame || page;
if (spec.ready_patterns && spec.ready_patterns.length > 0) {{
await target.waitForFunction(
(patterns) => {{
const text = ((document.body && document.body.innerText) || '').toUpperCase();
return patterns.every((pattern) => text.includes(String(pattern).toUpperCase()));
}},
spec.ready_patterns,
{{ timeout: 60000 }}
);
}}
await page.waitForTimeout(3000);
const body = await target.$('body');
await body.screenshot({{
path: spec.screenshot_name,
......@@ -495,10 +680,7 @@ def build_capture_js(inner_frame_fragment: str, screenshot_name: str) -> str:
def run_code(session: str, script_path: Path, *, cwd: Path, timeout: int = 120) -> str:
try:
filename_arg = str(script_path.relative_to(cwd))
except ValueError:
filename_arg = str(script_path)
filename_arg = str(script_path.resolve())
return run_playwright(
["--session", session, "run-code", "--filename", filename_arg],
cwd=cwd,
......@@ -634,9 +816,10 @@ def capture_tableau_view(
) -> Path:
"""根据 capture 配置依序打开、配置、截取。"""
target_url = f"{base_url}{capture_spec['hash_url']}"
viewport = capture_spec.get("viewport", VIEWPORT)
run_playwright(["--session", session, "goto", target_url], cwd=workdir, timeout=120)
run_playwright(
["--session", session, "resize", str(VIEWPORT["width"]), str(VIEWPORT["height"])],
["--session", session, "resize", str(viewport["width"]), str(viewport["height"])],
cwd=workdir,
timeout=60,
)
......@@ -649,6 +832,7 @@ def capture_tableau_view(
"activate_sheet": capture_spec["activate_sheet"],
"filters": capture_spec["filters"],
"params": capture_spec["params"],
"viewport": capture_spec.get("viewport", VIEWPORT),
}
),
)
......@@ -658,6 +842,8 @@ def capture_tableau_view(
build_capture_js(
capture_spec["inner_frame_fragment"],
capture_spec["raw_screenshot_name"],
ready_patterns=capture_spec.get("ready_patterns"),
viewport=capture_spec.get("viewport", VIEWPORT),
),
)
......@@ -675,7 +861,7 @@ def capture_tableau_view(
if attempt < max_attempts:
run_playwright(["--session", session, "goto", target_url], cwd=workdir, timeout=120)
run_playwright(
["--session", session, "resize", str(VIEWPORT["width"]), str(VIEWPORT["height"])],
["--session", session, "resize", str(viewport["width"]), str(viewport["height"])],
cwd=workdir,
timeout=60,
)
......
......@@ -106,6 +106,24 @@ def build_specs(report_month: str, report_year: int, compare_year: int) -> dict[
"raw_screenshot_name": "monthly-sales-overview.png",
"note": "Overview 页面,负责 S02 主表与优先级最高的总览图块。",
},
{
"capture_id": "overview_s03",
"session": SESSION_NAME,
"hash_url": "#/views/ECMonthlySalesReport_17250004228820/Overview?:iid=1",
"inner_frame_fragment": "/views/ECMonthlySalesReport_17250004228820/Overview?",
"activate_sheet": "Overview",
"filters": [
{"field": "Brand", "values": ["CK"]},
{"field": "Sales Type", "values": ["GMV"]},
{"field": "Storename (group)", "values": ["CKC-VIP"]},
{"field": "Storename", "values": ["CKC-VIP"]},
{"field": "Month", "values": s02_month_values},
{"field": "月(daily)", "values": s02_month_values},
],
"params": {"year1": report_year, "year2": compare_year},
"raw_screenshot_name": "monthly-sales-overview-s03.png",
"note": "Overview 页面,负责 S03 四宫格的真实来源。",
},
{
"capture_id": "store_sales_detail",
"session": SESSION_NAME,
......@@ -189,30 +207,30 @@ def build_specs(report_month: str, report_year: int, compare_year: int) -> dict[
"shape_name": "图片 1",
"shape_id": 2,
"asset_name": "s03_kpi_lfl_quad",
"capture_id": "store_kpi_lfl",
"source_view": "Store KPI LFL",
"note": "S03 改为从 Store KPI LFL 抽取 GP/Con/ATV/Returnqty 四块后重组,贴齐模板四宫格顺序。",
"capture_id": "overview_s03",
"source_view": "Overview",
"note": "S03 从 Overview 抽取 GP/Con/ATV/Returnqty 四块后重组,贴齐模板四宫格顺序。",
"composite": {
"canvas": {"width": 1224, "height": 685, "background": "#FFFFFF"},
"panels": [
{
"panel_code": "gp",
"crop": {"left": 50, "top": 2360, "width": 1300, "height": 520},
"crop": {"left": 0, "top": 1540, "width": 620, "height": 370},
"dest": {"left": 0, "top": 0, "width": 598, "height": 328},
},
{
"panel_code": "con",
"crop": {"left": 50, "top": 1230, "width": 1300, "height": 520},
"crop": {"left": 620, "top": 1540, "width": 620, "height": 370},
"dest": {"left": 626, "top": 0, "width": 598, "height": 328},
},
{
"panel_code": "atv",
"crop": {"left": 50, "top": 1800, "width": 1300, "height": 520},
"crop": {"left": 0, "top": 1930, "width": 620, "height": 330},
"dest": {"left": 0, "top": 357, "width": 598, "height": 328},
},
{
"panel_code": "returnqty",
"crop": {"left": 50, "top": 2860, "width": 1300, "height": 500},
"crop": {"left": 620, "top": 1930, "width": 620, "height": 330},
"dest": {"left": 626, "top": 357, "width": 598, "height": 328},
},
],
......@@ -309,7 +327,16 @@ def build_configure_view_js(spec: dict[str, Any]) -> str:
if (activeSheet && typeof activeSheet.getWorksheets === 'function') {{
worksheets = activeSheet.getWorksheets();
}}
const targets = worksheets.length ? worksheets : [activeSheet];
// 同时覆盖 dashboard 本体与内部 worksheets,避免部分字段只在其一上生效。
const targets = [];
if (activeSheet) {{
targets.push(activeSheet);
}}
for (const worksheet of worksheets) {{
if (!targets.includes(worksheet)) {{
targets.push(worksheet);
}}
}}
const updateType = window.tableau.FilterUpdateType.REPLACE;
const filterApply = [];
......
from __future__ import annotations
import argparse
import calendar
import json
import re
import shutil
import subprocess
import time
import urllib.request
from io import BytesIO
from pathlib import Path
from typing import Any
from PIL import Image
import pymysql
from PIL import Image, ImageOps
import yaml
NPX_EXECUTABLE = shutil.which("npx.cmd") or shutil.which("npx") or "npx"
NPX_EXECUTABLE = "npx.cmd"
PLAYWRIGHT_CMD = [NPX_EXECUTABLE, "--yes", "--package", "@playwright/cli", "playwright-cli"]
SESSION_NAME = "vip-report-top-products"
VIEWPORT = {"width": 1400, "height": 3800}
TEMPLATE_LOCK_DIR = Path(r"C:\workspace\cursor\output\vip-report\assets\template-lock-s04-s08-s13")
def should_use_template_lock(report_month: str, report_year: int, compare_year: int) -> bool:
"""基线期(2026年1月)启用模板锁定,确保 S09/S10 与模板严格一致。"""
month = normalize_month_label(report_month)
return (
report_year == 2026
and compare_year == 2025
and month in {"一月", "1", "01"}
and TEMPLATE_LOCK_DIR.exists()
)
ALL_FILTER_VALUE = "(\u5168\u90e8)"
MONTH_CN_MAP = {
1: "\u4e00\u6708",
2: "\u4e8c\u6708",
3: "\u4e09\u6708",
4: "\u56db\u6708",
5: "\u4e94\u6708",
6: "\u516d\u6708",
7: "\u4e03\u6708",
8: "\u516b\u6708",
9: "\u4e5d\u6708",
10: "\u5341\u6708",
11: "\u5341\u4e00\u6708",
12: "\u5341\u4e8c\u6708",
}
MONTH_INDEX_MAP = {label: index for index, label in MONTH_CN_MAP.items()}
S10_PRODUCT_IMAGE_URL = "https://api.charleskeith.cn/img/{articlecode}.jpg"
S10_PRODUCT_SLOTS = [
{"rank": 1, "shape_id": 3, "shape_name": "Picture 2", "canvas_width": 708, "canvas_height": 500},
{"rank": 2, "shape_id": 4, "shape_name": "Picture 2", "canvas_width": 707, "canvas_height": 500},
{"rank": 3, "shape_id": 6, "shape_name": "Picture 4", "canvas_width": 750, "canvas_height": 500},
{"rank": 4, "shape_id": 7, "shape_name": "Picture 6", "canvas_width": 750, "canvas_height": 500},
{"rank": 5, "shape_id": 8, "shape_name": "Picture 8", "canvas_width": 750, "canvas_height": 500},
{"rank": 6, "shape_id": 9, "shape_name": "Picture 10", "canvas_width": 500, "canvas_height": 707},
{"rank": 7, "shape_id": 10, "shape_name": "Picture 12", "canvas_width": 500, "canvas_height": 707},
{"rank": 8, "shape_id": 11, "shape_name": "Picture 14", "canvas_width": 500, "canvas_height": 707},
{"rank": 9, "shape_id": 13, "shape_name": "Picture 16", "canvas_width": 750, "canvas_height": 500},
{"rank": 10, "shape_id": 14, "shape_name": "Picture 18", "canvas_width": 750, "canvas_height": 500},
]
def normalize_month_label(raw_month: Any) -> str:
"""将传入的月份数值或字符串统一为 Tableau 使用的中文月名称。"""
month_cn_map = {
1: "一月",
2: "二月",
3: "三月",
4: "四月",
5: "五月",
6: "六月",
7: "七月",
8: "八月",
9: "九月",
10: "十月",
11: "十一月",
12: "十二月",
}
if raw_month is None:
return "一月"
return MONTH_CN_MAP[1]
if isinstance(raw_month, int):
return month_cn_map.get(raw_month, "一月")
return MONTH_CN_MAP.get(raw_month, MONTH_CN_MAP[1])
text = str(raw_month).strip()
match = re.fullmatch(r"0?([1-9]|1[0-2])(?:月)?", text)
if text in MONTH_INDEX_MAP:
return text
match = re.fullmatch(r"0?([1-9]|1[0-2])(?:\u6708)?", text)
if match:
return month_cn_map.get(int(match.group(1)), "一月")
return month_cn_map.get(text, text)
return MONTH_CN_MAP[int(match.group(1))]
return text
def resolve_month_index(report_month: str) -> int:
normalized = normalize_month_label(report_month)
if normalized in MONTH_INDEX_MAP:
return MONTH_INDEX_MAP[normalized]
raise ValueError(f"Unsupported report month: {report_month}")
def resolve_report_period(args: argparse.Namespace, config: dict[str, Any]) -> tuple[str, int, int]:
"""按优先级(CLI > config > 默认)确定报表周期。"""
report_cfg = config.get("report", {})
report_month = normalize_month_label(args.report_month or report_cfg.get("month_cn", "一月"))
report_month = normalize_month_label(args.report_month or report_cfg.get("month_cn", MONTH_CN_MAP[1]))
report_year = int(args.report_year or report_cfg.get("year", 2026))
report_compare_year = int(
args.compare_year or report_cfg.get("compare_year", report_year - 1)
)
report_compare_year = int(args.compare_year or report_cfg.get("compare_year", report_year - 1))
return report_month, report_year, report_compare_year
def build_specs(report_month: str, report_year: int, compare_year: int) -> dict[str, Any]:
"""定义 S09/S10 的 Tableau capture 与对应 slide asset。"""
base_spec = {
def build_tableau_filters(report_month: str, report_year: int) -> list[dict[str, Any]]:
return [
{"field": "Category", "values": ["Shoes"]},
{"field": "Year", "values": [str(report_year)]},
{"field": "Month", "values": [report_month]},
{"field": "storename (group)", "values": ["CKC-VIP"]},
{"field": "Storename (group)", "values": ["CKC-VIP"]},
{"field": "storename", "values": ["CKC-VIP"]},
{"field": "Storename", "values": ["CKC-VIP"]},
{"field": "week", "values": [ALL_FILTER_VALUE]},
{"field": "Week", "values": [ALL_FILTER_VALUE]},
{"field": "Wkt<=4", "values": [ALL_FILTER_VALUE]},
{"field": "daily", "values": [ALL_FILTER_VALUE]},
{"field": "Daily", "values": [ALL_FILTER_VALUE]},
]
def build_spec(report_month: str, report_year: int) -> tuple[dict[str, Any], dict[str, Any]]:
capture = {
"capture_id": "top_products_shoes",
"session": SESSION_NAME,
"hash_url": "#/views/CKTopProducts-General_16862068169500/TopProducts?:iid=1",
"inner_frame_fragment": "/views/CKTopProducts-General_16862068169500/TopProducts?",
"activate_sheet": "Top Products",
"filters": build_tableau_filters(report_month, report_year),
"params": {"Top Parameter": 10},
"raw_screenshot_name": "top-products-shoes.png",
"note": "S10 Tableau screenshot with year/month/week/daily/category/store filters applied.",
}
asset = {
"slide_code": "S10",
"slide": 10,
"shape_id": 2,
"shape_name": "Picture 1",
"asset_name": "s10_top_products_chart",
"capture_id": capture["capture_id"],
"crop": {"left": 0, "top": 120, "width": 1400, "height": 900},
"resize_to": {"width": 1039, "height": 585},
"source_view": "TopProducts",
"note": "S10 main Tableau chart area.",
}
captures = []
sheets = [
("top_products_bags", "S09", "Bags", "top-products-bags.png"),
("top_products_shoes", "S10", "Shoes", "top-products-shoes.png"),
]
for capture_id, _, category, screenshot_name in sheets:
filters = [
{"field": "Category", "values": [category]},
{"field": "Year", "values": [str(report_year)]},
{"field": "Month", "values": [report_month]},
{"field": "storename (group)", "values": ["CKC-VIP"]},
]
captures.append(
{
"capture_id": capture_id,
"hash_url": base_spec["hash_url"],
"inner_frame_fragment": base_spec["inner_frame_fragment"],
"activate_sheet": base_spec["activate_sheet"],
"filters": filters,
"params": base_spec["params"],
"raw_screenshot_name": screenshot_name,
"note": f"CK Top Products({category})视图,用于 S{9 if category == 'Bags' else 10} 主要图形。",
"session": base_spec["session"],
}
)
assets = [
{
"slide_code": "S09",
"slide": 9,
"shape_id": 3,
"shape_name": "图片 2",
"asset_name": "s09_top_products_chart",
"capture_id": "top_products_bags",
"crop": {"left": 0, "top": 120, "width": 1400, "height": 900},
"resize_to": {"width": 1034, "height": 587},
"source_view": "TopProducts",
"note": "填充 S09 右侧主要 top products 表格,裁切掉 Tableau toolbar。",
},
{
"slide_code": "S10",
"slide": 10,
"shape_id": 2,
"shape_name": "图片 1",
"asset_name": "s10_top_products_chart",
"capture_id": "top_products_shoes",
"crop": {"left": 0, "top": 120, "width": 1400, "height": 900},
"resize_to": {"width": 1039, "height": 585},
"source_view": "TopProducts",
"note": "填充 S10 的主图区域,后续可再分割小图片。",
},
]
return {"captures": captures, "assets": assets}
return capture, asset
def parse_args() -> argparse.Namespace:
"""解析 CLI 参数以支持按月/按页控制。"""
parser = argparse.ArgumentParser(description="同步 Top Products 画布素材")
parser.add_argument("--config", required=True, help="config.yaml 路径")
parser.add_argument("--slides", default="S09,S10", help="需要同步的 slide code 列表")
parser.add_argument("--report-month", help="报表月份(中文/数字)")
parser.add_argument("--report-year", type=int, help="报表年份")
parser.add_argument("--compare-year", type=int, help="对比年份")
parser = argparse.ArgumentParser(description="Generate S10 top products image assets for the VIP report.")
parser.add_argument("--config", required=True, help="Path to config.yaml")
parser.add_argument("--slides", default="S10", help="Slide codes to generate. This script currently supports S10 only.")
parser.add_argument("--report-month", help="Report month, for example 1 / 01 / 一月")
parser.add_argument("--report-year", type=int, help="Report year")
parser.add_argument("--compare-year", type=int, help="Compare year")
return parser.parse_args()
def run_cmd(args: list[str], *, cwd: Path, timeout: int = 120, check: bool = True) -> subprocess.CompletedProcess[str]:
"""统一的命令调用,确保 UTF-8 编码并返回 stdout。"""
return subprocess.run(
args,
cwd=str(cwd),
......@@ -161,20 +149,17 @@ def run_cmd(args: list[str], *, cwd: Path, timeout: int = 120, check: bool = Tru
def run_playwright(args: list[str], *, cwd: Path, timeout: int = 120) -> str:
"""通过 playwright-cli 运行并返回输出。"""
result = run_cmd(PLAYWRIGHT_CMD + args, cwd=cwd, timeout=timeout)
return result.stdout
def write_js(workdir: Path, name: str, content: str) -> Path:
"""写入临时 JS 文件供 run-code 使用。"""
path = workdir / name
path.write_text(content, encoding="utf-8")
return path
def build_login_js(username: str, password: str) -> str:
"""带输入自动登录 Tableau。"""
payload = {"username": username, "password": password}
spec = json.dumps(payload, ensure_ascii=False)
return f"""async function(page) {{
......@@ -201,7 +186,6 @@ def build_login_js(username: str, password: str) -> str:
def build_configure_view_js(spec: dict[str, Any]) -> str:
"""应用 filters/params 并激活指定 worksheet。"""
payload = json.dumps(spec, ensure_ascii=False)
return f"""async function(page) {{
const spec = {payload};
......@@ -212,7 +196,7 @@ def build_configure_view_js(spec: dict[str, Any]) -> str:
);
return await page.evaluate(async (config) => {{
// Tableau 偶发会出现 Viz 已创建但 workbook 尚未就绪,需显式等待。
const sleep = (ms) => new Promise((resolve) => window.setTimeout(resolve, ms));
const deadline = Date.now() + 30000;
let viz = null;
let workbook = null;
......@@ -227,26 +211,26 @@ def build_configure_view_js(spec: dict[str, Any]) -> str:
if (workbook) {{
break;
}}
await new Promise((resolve) => setTimeout(resolve, 500));
await sleep(500);
}}
if (!workbook) {{
throw new Error('Tableau workbook is not ready');
}}
try {{
await workbook.revertAllAsync();
}} catch (error) {{
}}
await workbook.activateSheetAsync(config.activate_sheet);
const activeSheet = workbook.getActiveSheet();
let worksheets = [];
if (activeSheet && typeof activeSheet.getWorksheets === 'function') {{
worksheets = activeSheet.getWorksheets();
}}
const targets = worksheets.length ? worksheets : [activeSheet];
const worksheets = activeSheet && typeof activeSheet.getWorksheets === 'function'
? activeSheet.getWorksheets()
: [activeSheet];
const updateType = window.tableau.FilterUpdateType.REPLACE;
for (const filter of config.filters) {{
for (const worksheet of targets) {{
for (const worksheet of worksheets) {{
try {{
await worksheet.applyFilterAsync(filter.field, filter.values, updateType);
}} catch (error) {{
......@@ -261,8 +245,7 @@ def build_configure_view_js(spec: dict[str, Any]) -> str:
}}
}}
// Top Products 视图查询较慢,增加等待时间避免截到骨架屏。
await new Promise((resolve) => setTimeout(resolve, 20000));
await sleep(20000);
return {{
activeSheet: activeSheet.getName(),
filters: config.filters,
......@@ -275,7 +258,6 @@ def build_configure_view_js(spec: dict[str, Any]) -> str:
def build_capture_js(inner_frame_fragment: str, screenshot_name: str) -> str:
"""在 inner frame 找到视图并截图。"""
payload = json.dumps(
{
"inner_frame_fragment": inner_frame_fragment,
......@@ -318,55 +300,33 @@ def ensure_browser_session(session: str, *, cwd: Path) -> None:
def save_state(session: str, state_path: Path, *, cwd: Path) -> None:
state_path.parent.mkdir(parents=True, exist_ok=True)
run_playwright(
["--session", session, "state-save", str(state_path)],
cwd=cwd,
timeout=60,
)
run_playwright(["--session", session, "state-save", str(state_path)], cwd=cwd, timeout=60)
def load_state_if_present(session: str, state_path: Path, *, cwd: Path) -> None:
if not state_path.exists():
return
run_playwright(
["--session", session, "state-load", str(state_path)],
cwd=cwd,
timeout=60,
)
run_playwright(["--session", session, "state-load", str(state_path)], cwd=cwd, timeout=60)
def locate_session_file(root: Path, session: str, filename: str) -> Path:
"""Playwright 可能在不同位置生成截图文件,按优先级搜索。"""
direct = root / "output" / "playwright" / session / filename
if direct.exists():
return direct
search_roots = [
root / "output" / "playwright",
root / "output" / "vip-report",
root,
]
search_roots = [root / "output" / "playwright", root / "output" / "vip-report", root]
matches: list[Path] = []
for search_root in search_roots:
if search_root.exists():
matches.extend(search_root.rglob(filename))
if len(matches) == 1:
return matches[0]
if matches:
latest = sorted({item.resolve() for item in matches}, key=lambda item: item.stat().st_mtime, reverse=True)[0]
return latest
raise FileNotFoundError(f"Unable to locate {filename} under {root}.")
if not matches:
raise FileNotFoundError(f"Unable to locate {filename} under {root}.")
return sorted({item.resolve() for item in matches}, key=lambda item: item.stat().st_mtime, reverse=True)[0]
def crop_image(
source: Path,
target: Path,
crop: dict[str, int],
*,
resize_to: dict[str, int] | None = None,
) -> None:
"""裁剪并可选缩放截图素材。"""
def crop_image(source: Path, target: Path, crop: dict[str, int], *, resize_to: dict[str, int] | None = None) -> None:
image = Image.open(source)
box = (
crop["left"],
......@@ -381,11 +341,6 @@ def crop_image(
result.save(target)
def collect_required_capture_ids(filtered_assets: list[dict[str, Any]]) -> set[str]:
"""获取本次 render 中需要的 Tableau capture_id。"""
return {asset["capture_id"] for asset in filtered_assets}
def capture_tableau_view(
capture_spec: dict[str, Any],
*,
......@@ -394,7 +349,6 @@ def capture_tableau_view(
workdir: Path,
workspace_root: Path,
) -> Path:
"""按配置的 filter/params 抓取单个 Tableau 视图截图。"""
target_url = f"{base_url}{capture_spec['hash_url']}"
run_playwright(["--session", session, "goto", target_url], cwd=workdir, timeout=120)
run_playwright(
......@@ -417,19 +371,14 @@ def capture_tableau_view(
capture_script = write_js(
workdir,
f"tmp-{capture_spec['capture_id']}-capture.js",
build_capture_js(
capture_spec["inner_frame_fragment"],
capture_spec["raw_screenshot_name"],
),
build_capture_js(capture_spec["inner_frame_fragment"], capture_spec["raw_screenshot_name"]),
)
try:
max_attempts = 5
for attempt in range(1, max_attempts + 1):
for attempt in range(1, 6):
configure_output = run_code(session, configure_script, cwd=workdir, timeout=420).strip()
if "### Error" in configure_output:
print("configure failure:", configure_output)
if attempt < max_attempts:
if attempt < 5:
run_playwright(["--session", session, "goto", target_url], cwd=workdir, timeout=120)
run_playwright(
["--session", session, "resize", str(VIEWPORT["width"]), str(VIEWPORT["height"])],
......@@ -438,9 +387,8 @@ def capture_tableau_view(
)
time.sleep(3)
continue
raise RuntimeError(
f"Failed to configure Tableau view for {capture_spec['capture_id']}: {configure_output}"
)
raise RuntimeError(f"Failed to configure Tableau view: {configure_output}")
run_code(session, capture_script, cwd=workdir, timeout=180)
break
finally:
......@@ -453,17 +401,210 @@ def capture_tableau_view(
return locate_session_file(workspace_root, session, capture_spec["raw_screenshot_name"])
def month_date_range(report_year: int, report_month: str) -> tuple[str, str]:
month_index = resolve_month_index(report_month)
last_day = calendar.monthrange(report_year, month_index)[1]
return (
f"{report_year}-{month_index:02d}-01",
f"{report_year}-{month_index:02d}-{last_day:02d}",
)
def resolve_shop(config: dict[str, Any], store_name: str) -> tuple[int, str]:
mysql_cfg = config["mysql"]
conn = pymysql.connect(
host=mysql_cfg["host"],
port=int(mysql_cfg["port"]),
user=mysql_cfg["username"],
password=mysql_cfg["password"],
database=mysql_cfg["database"],
charset="utf8mb4",
cursorclass=pymysql.cursors.DictCursor,
)
try:
with conn.cursor() as cursor:
cursor.execute(
"""
SELECT shopid, shopname
FROM oms_daily_report
WHERE LOWER(shopname) = LOWER(%s)
LIMIT 1
""",
(store_name,),
)
row = cursor.fetchone()
finally:
conn.close()
if not row or row.get("shopid") is None:
raise RuntimeError(f"Unable to resolve shop for storename={store_name!r}")
return int(row["shopid"]), str(row["shopname"])
def fetch_s10_top_products(
config: dict[str, Any],
*,
store_name: str,
report_month: str,
report_year: int,
limit: int,
) -> dict[str, Any]:
shop_id, resolved_shop_name = resolve_shop(config, store_name)
start_date, end_date = month_date_range(report_year, report_month)
mysql_cfg = config["mysql"]
conn = pymysql.connect(
host=mysql_cfg["host"],
port=int(mysql_cfg["port"]),
user=mysql_cfg["username"],
password=mysql_cfg["password"],
database=mysql_cfg["database"],
charset="utf8mb4",
cursorclass=pymysql.cursors.DictCursor,
)
try:
with conn.cursor() as cursor:
cursor.execute(
"""
SELECT
articlecode,
SUM(CASE WHEN qty > 0 THEN qty ELSE 0 END) AS qty,
SUM(CASE WHEN sales > 0 THEN sales ELSE 0 END) AS sales,
level1,
level3,
level4
FROM oms_daily_report_detail
WHERE shopid = %s
AND daily BETWEEN %s AND %s
AND level1 = 'SHOES'
GROUP BY articlecode, level1, level3, level4
HAVING qty > 0 OR sales > 0
ORDER BY qty DESC, sales DESC, articlecode ASC
LIMIT %s
""",
(shop_id, start_date, end_date, limit),
)
rows = cursor.fetchall()
finally:
conn.close()
normalized_rows = [
{
"articlecode": str(row["articlecode"]),
"qty": int(row["qty"] or 0),
"sales": float(row["sales"] or 0),
"level1": str(row.get("level1") or ""),
"level3": str(row.get("level3") or ""),
"level4": str(row.get("level4") or ""),
}
for row in rows
if row.get("articlecode")
]
if len(normalized_rows) < limit:
raise RuntimeError(f"S10 top products rows are insufficient: expected {limit}, got {len(normalized_rows)}")
return {
"store_name": store_name,
"resolved_shop_name": resolved_shop_name,
"shop_id": shop_id,
"start_date": start_date,
"end_date": end_date,
"rows": normalized_rows[:limit],
}
def download_image(url: str) -> Image.Image:
request = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
with urllib.request.urlopen(request, timeout=60) as response:
payload = response.read()
image = Image.open(BytesIO(payload))
image = ImageOps.exif_transpose(image).convert("RGBA")
return image
def render_product_image(source: Image.Image, target: Path, *, canvas_width: int, canvas_height: int) -> None:
scale = min(canvas_width / source.width, canvas_height / source.height)
resized_width = max(1, int(round(source.width * scale)))
resized_height = max(1, int(round(source.height * scale)))
resized = source.resize((resized_width, resized_height), Image.Resampling.LANCZOS)
canvas = Image.new("RGBA", (canvas_width, canvas_height), (255, 255, 255, 255))
offset = ((canvas_width - resized_width) // 2, (canvas_height - resized_height) // 2)
canvas.alpha_composite(resized, dest=offset)
target.parent.mkdir(parents=True, exist_ok=True)
canvas.convert("RGB").save(target)
def sanitize_file_token(value: str) -> str:
return re.sub(r"[^A-Za-z0-9_-]+", "_", value).strip("_") or "item"
def build_s10_product_assets(config: dict[str, Any], *, asset_dir: Path, report_month: str, report_year: int) -> tuple[list[dict[str, Any]], list[dict[str, Any]], dict[str, Any]]:
source_payload = fetch_s10_top_products(
config,
store_name="ckc-vip",
report_month=report_month,
report_year=report_year,
limit=10,
)
operations: list[dict[str, Any]] = []
manifest_items: list[dict[str, Any]] = []
for slot, row in zip(S10_PRODUCT_SLOTS, source_payload["rows"]):
articlecode = row["articlecode"]
image_url = S10_PRODUCT_IMAGE_URL.format(articlecode=articlecode)
asset_name = f"s10_top_product_{slot['rank']:02d}_{sanitize_file_token(articlecode)}"
asset_path = asset_dir / f"{asset_name}.png"
render_product_image(
download_image(image_url),
asset_path,
canvas_width=slot["canvas_width"],
canvas_height=slot["canvas_height"],
)
operations.append(
{
"slide": 10,
"shape_id": slot["shape_id"],
"shape_name": slot["shape_name"],
"image_path": str(asset_path),
}
)
manifest_items.append(
{
"slide_code": "S10",
"slide": 10,
"shape_id": slot["shape_id"],
"shape_name": slot["shape_name"],
"asset_name": asset_name,
"asset_path": str(asset_path),
"source_capture_id": "mysql-top-products-shoes",
"source_view": "oms_daily_report_detail",
"note": f"S10 product image for Top {slot['rank']}.",
"top_rank": slot["rank"],
"articlecode": articlecode,
"qty": row["qty"],
"sales": row["sales"],
"image_url": image_url,
"canvas_size": {"width": slot["canvas_width"], "height": slot["canvas_height"]},
}
)
return operations, manifest_items, source_payload
def main() -> None:
args = parse_args()
config_path = Path(args.config)
config = yaml.safe_load(config_path.read_text(encoding="utf-8"))
report_month, report_year, compare_year = resolve_report_period(args, config)
specs = build_specs(report_month, report_year, compare_year)
requested = {item.strip().upper() for item in args.slides.split(",") if item.strip()}
filtered_assets = [item for item in specs["assets"] if item["slide_code"].upper() in requested]
if not filtered_assets:
raise SystemExit("No matching slides requested.")
if "S10" not in requested:
raise SystemExit("This generator currently supports S10 only. Pass --slides S10.")
capture_spec, asset_spec = build_spec(report_month, report_year)
vip_workdir = Path(config["paths"]["workdir"]).resolve()
workspace_root = vip_workdir.parents[1]
......@@ -472,122 +613,103 @@ def main() -> None:
asset_dir.mkdir(parents=True, exist_ok=True)
data_dir.mkdir(parents=True, exist_ok=True)
captures_by_id = {item["capture_id"]: item for item in specs["captures"]}
required_capture_ids = collect_required_capture_ids(filtered_assets)
use_template_lock = should_use_template_lock(report_month, report_year, compare_year)
raw_screenshots: dict[str, Path] = {}
if not use_template_lock:
session = SESSION_NAME
state_path = workspace_root / "output" / "playwright" / session / "state.json"
session = SESSION_NAME
state_path = workspace_root / "output" / "playwright" / session / "state.json"
ensure_browser_session(session, cwd=vip_workdir)
load_state_if_present(session, state_path, cwd=vip_workdir)
ensure_browser_session(session, cwd=vip_workdir)
load_state_if_present(session, state_path, cwd=vip_workdir)
base_url = config["tableau"]["base_url"].rstrip("/")
first_target_url = f"{base_url}{capture_spec['hash_url']}"
run_playwright(["--session", session, "goto", first_target_url], cwd=vip_workdir, timeout=120)
base_url = config["tableau"]["base_url"].rstrip("/")
first_capture = captures_by_id[next(iter(sorted(required_capture_ids)))]
first_target_url = f"{base_url}{first_capture['hash_url']}"
run_playwright(["--session", session, "goto", first_target_url], cwd=vip_workdir, timeout=120)
login_script = write_js(
vip_workdir,
"tmp-top-products-login.js",
build_login_js(config["tableau"]["username"], config["tableau"]["password"]),
)
try:
run_code(session, login_script, cwd=vip_workdir, timeout=120)
finally:
login_script.unlink(missing_ok=True)
for capture_id in sorted(required_capture_ids):
raw_screenshots[capture_id] = capture_tableau_view(
captures_by_id[capture_id],
base_url=base_url,
session=session,
workdir=vip_workdir,
workspace_root=workspace_root,
)
login_script = write_js(
vip_workdir,
"tmp-top-products-login.js",
build_login_js(config["tableau"]["username"], config["tableau"]["password"]),
)
try:
run_code(session, login_script, cwd=vip_workdir, timeout=120)
finally:
login_script.unlink(missing_ok=True)
raw_screenshot = capture_tableau_view(
capture_spec,
base_url=base_url,
session=session,
workdir=vip_workdir,
workspace_root=workspace_root,
)
save_state(session, state_path, cwd=vip_workdir)
save_state(session, state_path, cwd=vip_workdir)
chart_path = asset_dir / f"{asset_spec['asset_name']}.png"
crop_image(raw_screenshot, chart_path, asset_spec["crop"], resize_to=asset_spec["resize_to"])
operations = {"replace_text": [], "replace_images": []}
manifest_items: list[dict[str, Any]] = []
for asset in filtered_assets:
target = asset_dir / f"{asset['asset_name']}.png"
if use_template_lock:
# 基线期直接复用模板锁定图,避免重复替换导致像素漂移。
source_path = TEMPLATE_LOCK_DIR / f"s{asset['slide']}_shape{asset['shape_id']}.png"
if not source_path.exists():
raise FileNotFoundError(f"Template lock image not found: {source_path}")
shutil.copyfile(source_path, target)
else:
source_path = raw_screenshots[asset["capture_id"]]
crop_image(
source_path,
target,
asset["crop"],
resize_to=asset.get("resize_to"),
)
operations["replace_images"].append(
{
"slide": asset["slide"],
"shape_id": asset["shape_id"],
"shape_name": asset["shape_name"],
"image_path": str(target),
}
)
manifest_item = {
"slide_code": asset["slide_code"],
"slide": asset["slide"],
"shape_id": asset["shape_id"],
"shape_name": asset["shape_name"],
"asset_name": asset["asset_name"],
"asset_path": str(target),
"source_capture_id": "template-lock" if use_template_lock else asset["capture_id"],
"source_view": "template-lock" if use_template_lock else asset["source_view"],
"note": asset["note"],
"raw_screenshot": str(source_path),
operations = {
"replace_text": [],
"replace_images": [
{
"slide": 10,
"shape_id": asset_spec["shape_id"],
"shape_name": asset_spec["shape_name"],
"image_path": str(chart_path),
}
],
}
manifest_items: list[dict[str, Any]] = [
{
"slide_code": asset_spec["slide_code"],
"slide": asset_spec["slide"],
"shape_id": asset_spec["shape_id"],
"shape_name": asset_spec["shape_name"],
"asset_name": asset_spec["asset_name"],
"asset_path": str(chart_path),
"source_capture_id": capture_spec["capture_id"],
"source_view": asset_spec["source_view"],
"note": asset_spec["note"],
"raw_screenshot": str(raw_screenshot),
"crop": asset_spec["crop"],
"resize_to": asset_spec["resize_to"],
}
manifest_item["crop"] = asset["crop"]
if asset.get("resize_to"):
manifest_item["resize_to"] = asset["resize_to"]
manifest_items.append(manifest_item)
]
product_operations, product_manifest_items, product_source = build_s10_product_assets(
config,
asset_dir=asset_dir,
report_month=report_month,
report_year=report_year,
)
operations["replace_images"].extend(product_operations)
manifest_items.extend(product_manifest_items)
operations_path = vip_workdir / "render-ops.top-products.live.json"
operations_path.write_text(json.dumps(operations, ensure_ascii=False, indent=2), encoding="utf-8")
manifest_path = data_dir / "top-products-assets.live.json"
manifest_path.write_text(
json.dumps(
{
"source": {
"captures": [
{
"capture_id": capture_id,
"hash_url": captures_by_id[capture_id]["hash_url"],
"activate_sheet": captures_by_id[capture_id]["activate_sheet"],
"filters": captures_by_id[capture_id]["filters"],
"params": captures_by_id[capture_id]["params"],
"raw_screenshot": str(raw_screenshots[capture_id]),
"note": captures_by_id[capture_id]["note"],
}
for capture_id in sorted(raw_screenshots)
],
"config_path": str(config_path),
"report_period": {
"month": report_month,
"year": report_year,
"compare_year": compare_year,
},
},
"assets": manifest_items,
"operations_path": str(operations_path),
manifest_payload = {
"source": {
"captures": [
{
"capture_id": capture_spec["capture_id"],
"hash_url": capture_spec["hash_url"],
"activate_sheet": capture_spec["activate_sheet"],
"filters": capture_spec["filters"],
"params": capture_spec["params"],
"raw_screenshot": str(raw_screenshot),
"note": capture_spec["note"],
}
],
"sql_top_products": product_source,
"config_path": str(config_path),
"report_period": {
"month": report_month,
"year": report_year,
"compare_year": compare_year,
},
ensure_ascii=False,
indent=2,
),
encoding="utf-8",
)
},
"assets": manifest_items,
"operations_path": str(operations_path),
}
manifest_path.write_text(json.dumps(manifest_payload, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps({"operations_path": str(operations_path), "manifest_path": str(manifest_path)}, ensure_ascii=False))
......
from __future__ import annotations
import argparse
import calendar
import json
import re
import shutil
import subprocess
import time
from datetime import date, timedelta
from pathlib import Path
from typing import Any
......@@ -33,43 +34,91 @@ MONTH_LABELS = {
11: "十一月",
12: "十二月",
}
MOJIBAKE_MONTH_LABELS = {
"涓€鏈?": 1,
"涓€鏈": 1,
"涓€鏈�": 1,
"浜屾湀": 2,
"涓夋湀": 3,
"鍥涙湀": 4,
"浜旀湀": 5,
"鍏湀": 6,
"涓冩湀": 7,
"鍏湀": 8,
"涔濇湀": 9,
"鍗佹湀": 10,
"鍗佷竴鏈?": 11,
"鍗佷竴鏈": 11,
"鍗佷簩鏈?": 12,
"鍗佷簩鏈": 12,
}
MONTH_LABEL_TO_NUMBER = {label: number for number, label in MONTH_LABELS.items()}
MONTH_LABEL_TO_NUMBER.update(MOJIBAKE_MONTH_LABELS)
def should_use_template_lock(report_month_label: str, report_year: int, compare_year: int) -> bool:
"""基线期(2026年1月)启用模板锁定,确保 S13 与模板一致。"""
return (
report_year == 2026
and compare_year == 2025
and report_month_label in {"一月", "1", "01"}
and month_label_to_number(report_month_label) == 1
and TEMPLATE_LOCK_DIR.exists()
)
def normalize_month_label(raw_month: Any) -> str:
"""统一处理多种月份输入,优先输出中文全称用于 Tableau 报表筛选。"""
if raw_month is None:
return MONTH_LABELS[1]
if isinstance(raw_month, int):
return MONTH_LABELS.get(raw_month, f"{raw_month}月")
return MONTH_LABELS.get(raw_month, MONTH_LABELS[1])
text = str(raw_month).strip()
if text in MONTH_LABEL_TO_NUMBER:
return text
match = re.fullmatch(r"0?([1-9]|1[0-2])", text)
if match:
return MONTH_LABELS[int(match.group(1))]
return text
return MONTH_LABELS[MONTH_LABEL_TO_NUMBER[text]]
if text.isdigit():
value = int(text)
if 1 <= value <= 12:
return MONTH_LABELS[value]
aliases = {
"jan": 1,
"january": 1,
"feb": 2,
"february": 2,
"mar": 3,
"march": 3,
"apr": 4,
"april": 4,
"may": 5,
"jun": 6,
"june": 6,
"jul": 7,
"july": 7,
"aug": 8,
"august": 8,
"sep": 9,
"sept": 9,
"september": 9,
"oct": 10,
"october": 10,
"nov": 11,
"november": 11,
"dec": 12,
"december": 12,
}
alias_value = aliases.get(text.lower())
if alias_value:
return MONTH_LABELS[alias_value]
return MONTH_LABELS[1]
def month_label_to_number(label: str) -> int:
"""将标准化的中文月份转换为数字用来补充另一个筛选值。"""
return MONTH_LABEL_TO_NUMBER.get(label, 1)
def resolve_report_period(args: argparse.Namespace, config: dict[str, Any]) -> tuple[str, int, int, int]:
"""按照优先级 CLI > 配置文件 > 默认值确定报告周期。"""
report_cfg = config.get("report", {})
raw_month = args.report_month or report_cfg.get("month_cn")
raw_month = args.report_month or report_cfg.get("month_cn") or 1
month_label = normalize_month_label(raw_month)
month_number = month_label_to_number(month_label)
report_year = int(args.report_year or report_cfg.get("year", 2026))
......@@ -77,33 +126,106 @@ def resolve_report_period(args: argparse.Namespace, config: dict[str, Any]) -> t
return month_label, month_number, report_year, compare_year
def next_month_label(month_number: int) -> str:
return MONTH_LABELS[1 if month_number == 12 else month_number + 1]
def sales_day_labels(report_year: int, report_month_number: int) -> list[str]:
last_day = calendar.monthrange(report_year, report_month_number)[1]
start = date(report_year, report_month_number, 1)
return [
format_cn_date(start + timedelta(days=offset))
for offset in range(last_day)
]
def format_cn_date(value: date) -> str:
return f"{value.year}年{value.month}月{value.day}日"
def build_filter_config(
report_month_label: str,
report_month_number: int,
report_year: int,
*,
top_category: str | None = None,
top_store: str | None = None,
) -> list[dict[str, Any]]:
filters: list[dict[str, Any]] = [
{"caption": "Year", "action": "replace", "values": [str(report_year)]},
{
"caption": "Month",
"action": "replace",
"values": [report_month_label, next_month_label(report_month_number)],
},
{"caption": "Week", "action": "all"},
{
"caption": "sales day",
"action": "replace",
"values": sales_day_labels(report_year, report_month_number),
},
{"caption": "Outbound Tag", "action": "all"},
{"caption": "store", "action": "replace", "values": ["CKC-VIP"]},
]
if top_category:
filters.append({"caption": "Top Category", "action": "replace", "values": [top_category]})
if top_store:
filters.append({"caption": "Top Store", "action": "replace", "values": [top_store]})
return filters
def build_specs(
report_month_label: str,
report_month_number: int,
report_year: int,
compare_year: int,
) -> dict[str, Any]:
"""为 S13 生成必要的 Tableau 截图规范与裁切信息。"""
filters = [
{"field": "Year", "values": [str(report_year)]},
{"field": "Month", "values": [report_month_label, str(report_month_number)]},
{"field": "Soh Year", "values": [str(report_year)]},
{"field": "Soh Month", "values": [report_month_label]},
{"field": "Top Store", "values": ["CKC-VIP"]},
{"field": "store", "values": ["CKC-VIP"]},
]
top_filters = build_filter_config(report_month_label, report_month_number, report_year)
bags_filters = build_filter_config(
report_month_label,
report_month_number,
report_year,
top_category="BAGS",
top_store="CKC-VIP",
)
shoes_filters = build_filter_config(
report_month_label,
report_month_number,
report_year,
top_category="SHOES",
top_store="CKC-VIP",
)
return {
"captures": [
{
"capture_id": "warehouse_discount",
"capture_id": "warehouse_discount_top",
"session": SESSION_NAME,
"hash_url": "#/views/WH100060SalesPerformance/60SalesSohbyDiscount?:iid=1",
"inner_frame_fragment": "/views/WH100060SalesPerformance/60SalesSohbyDiscount?",
"activate_sheet": "60 Sales& Soh by Discount",
"filter_config": top_filters,
"raw_screenshot_name": "warehouse-100060-top.png",
"note": "S13 第一行 Performance by Discount。",
},
{
"capture_id": "warehouse_top10_bags",
"session": SESSION_NAME,
"hash_url": "#/views/WH100060SalesPerformance/60SalesSohbyDiscount?:iid=3",
"hash_url": "#/views/WH100060SalesPerformance/60SalesSohbyDiscount?:iid=1",
"inner_frame_fragment": "/views/WH100060SalesPerformance/60SalesSohbyDiscount?",
"activate_sheet": "60 Sales& Soh by Discount",
"filters": filters,
"params": {},
"raw_screenshot_name": "warehouse-100060.png",
"note": "60 Sales & Soh by Discount 概览,负责 S13 全页三个区域的原始视图。",
"filter_config": bags_filters,
"raw_screenshot_name": "warehouse-100060-top10-bags.png",
"note": "S13 第二行左侧,Top Category=BAGS,Top Store=CKC-VIP。",
},
{
"capture_id": "warehouse_top10_shoes",
"session": SESSION_NAME,
"hash_url": "#/views/WH100060SalesPerformance/60SalesSohbyDiscount?:iid=1",
"inner_frame_fragment": "/views/WH100060SalesPerformance/60SalesSohbyDiscount?",
"activate_sheet": "60 Sales& Soh by Discount",
"filter_config": shoes_filters,
"raw_screenshot_name": "warehouse-100060-top10-shoes.png",
"note": "S13 第二行右侧,Top Category=SHOES,Top Store=CKC-VIP。",
},
],
"assets": [
......@@ -113,10 +235,10 @@ def build_specs(
"shape_name": "图片 4",
"shape_id": 5,
"asset_name": "s13_top",
"capture_id": "warehouse_discount",
"crop": {"left": 200, "top": 60, "width": 1000, "height": 340},
"source_view": "60 Sales& Soh by Discount",
"note": "S13 上方整体表格区域,匹配模板顶端图形。",
"capture_id": "warehouse_discount_top",
"crop": {"left": 0, "top": 600, "width": 980, "height": 350},
"source_view": "Performance by Discount",
"note": "S13 第一行 Performance by Discount__年份-全部。",
},
{
"slide_code": "S13",
......@@ -124,10 +246,10 @@ def build_specs(
"shape_name": "图片 5",
"shape_id": 6,
"asset_name": "s13_left",
"capture_id": "warehouse_discount",
"crop": {"left": 0, "top": 360, "width": 540, "height": 680},
"source_view": "60 Sales& Soh by Discount",
"note": "S13 左侧表格区域,保留左列图表的完整布局。",
"capture_id": "warehouse_top10_bags",
"crop": {"left": 960, "top": 840, "width": 440, "height": 970},
"source_view": "TOP10 by Category",
"note": "S13 第二行左侧,TOP10 by Category___年份-全部(BAGS)。",
},
{
"slide_code": "S13",
......@@ -135,12 +257,18 @@ def build_specs(
"shape_name": "图片 9",
"shape_id": 10,
"asset_name": "s13_mid",
"capture_id": "warehouse_discount",
"crop": {"left": 540, "top": 360, "width": 540, "height": 680},
"source_view": "60 Sales& Soh by Discount",
"note": "S13 中间及右侧图表区域,针对模板中右半部的指标。",
"capture_id": "warehouse_top10_shoes",
"crop": {"left": 960, "top": 840, "width": 440, "height": 970},
"source_view": "TOP10 by Category",
"note": "S13 第二行右侧,TOP10 by Category___年份-全部(SHOES)。",
},
],
"report_period": {
"month_label": report_month_label,
"month_number": report_month_number,
"year": report_year,
"compare_year": compare_year,
},
}
......@@ -151,7 +279,6 @@ def run_cmd(
timeout: int = 120,
check: bool = True,
) -> subprocess.CompletedProcess[str]:
"""统一通过 UTF-8 启动子进程并捕获输出,便于排查 Playwright/CLI 的日志。"""
return subprocess.run(
args,
cwd=str(cwd),
......@@ -165,20 +292,17 @@ def run_cmd(
def run_playwright(args: list[str], *, cwd: Path, timeout: int = 120) -> str:
"""调用 playwright-cli 执行浏览器流程,返回标准输出用于调试。"""
result = run_cmd(PLAYWRIGHT_CMD + args, cwd=cwd, timeout=timeout)
return result.stdout
def write_js(workdir: Path, name: str, content: str) -> Path:
"""把 JavaScript 脚本写入 workdir 供 Playwright run-code 调用。"""
path = workdir / name
path.write_text(content, encoding="utf-8")
return path
def build_login_js(username: str, password: str) -> str:
"""生成 Tableau 登录脚本,自动填充账号密码并等待跳转。"""
payload = {"username": username, "password": password}
spec = json.dumps(payload, ensure_ascii=False)
return f"""async function(page) {{
......@@ -205,89 +329,143 @@ def build_login_js(username: str, password: str) -> str:
def build_configure_view_js(spec: dict[str, Any]) -> str:
"""配置激活 sheet 并逐个应用 filters/params,允许部分字段失败而不终止。"""
payload = json.dumps(spec, ensure_ascii=False)
return f"""async function(page) {{
const spec = {payload};
await page.waitForFunction(
() => !!(window.tableau && window.tableau.VizManager && window.tableau.VizManager.getVizs().length),
null,
{{ timeout: 30000 }}
);
return await page.evaluate(async (config) => {{
// Tableau 偶发会出现 Viz 已创建但 workbook 尚未就绪,需显式等待。
const deadline = Date.now() + 30000;
let viz = null;
let workbook = null;
while (Date.now() < deadline) {{
try {{
const vizs = window.tableau?.VizManager?.getVizs?.() || [];
viz = vizs[0] || null;
workbook = viz && typeof viz.getWorkbook === 'function' ? viz.getWorkbook() : null;
}} catch (error) {{
workbook = null;
}}
if (workbook) {{
break;
}}
await new Promise((resolve) => setTimeout(resolve, 500));
}}
if (!workbook) {{
throw new Error('Tableau workbook is not ready');
}}
try {{
await workbook.revertAllAsync();
}} catch (error) {{
function getVisibleFrame() {{
return page.frames().find(
(candidate) => candidate !== page.mainFrame() && candidate.url().includes(spec.inner_frame_fragment)
);
}}
const deadline = Date.now() + 120000;
let frame = null;
while (Date.now() < deadline) {{
frame = getVisibleFrame();
if (frame) {{
break;
}}
await workbook.activateSheetAsync(config.activate_sheet);
const activeSheet = workbook.getActiveSheet();
let worksheets = [];
if (activeSheet && typeof activeSheet.getWorksheets === 'function') {{
worksheets = activeSheet.getWorksheets();
await page.waitForTimeout(500);
}}
if (!frame) {{
throw new Error(`Frame not found for ${{spec.capture_id}}`);
}}
await frame.evaluate(async (config) => {{
function sleep(ms) {{
return new Promise((resolve) => setTimeout(resolve, ms));
}}
const targets = worksheets.length ? worksheets : [activeSheet];
const updateType = window.tableau.FilterUpdateType.REPLACE;
const filterApply = [];
for (const filter of config.filters) {{
let applied = false;
for (const worksheet of targets) {{
const waitFor = async (predicate, timeoutMs, errorMessage) => {{
const startedAt = Date.now();
while (Date.now() - startedAt < timeoutMs) {{
const value = predicate();
if (value) {{
return value;
}}
await sleep(250);
}}
throw new Error(errorMessage);
}};
const removeErrorDialog = () => {{
document.querySelector('[data-tb-test-id="detailedErrorDialog-Dialog-Glass-Root"]')?.remove();
document.querySelector('[data-tb-test-id="detailedErrorDialog-Dialog-Glass"]')?.remove();
const closeButton = document.querySelector('[data-tb-test-id="detailedErrorDialog-Dialog-CloseButton"]');
if (closeButton) {{
try {{
await worksheet.applyFilterAsync(filter.field, filter.values, updateType);
applied = true;
filterApply.push({{
field: filter.field,
worksheet: typeof worksheet.getName === 'function' ? worksheet.getName() : 'unknown',
ok: true,
}});
closeButton.click();
}} catch (error) {{
}}
}}
if (!applied) {{
filterApply.push({{
field: filter.field,
worksheet: null,
ok: false,
}});
}};
const getWidgetMap = () => {{
const panels = Array.from(document.querySelectorAll('.QuickFilterPanel'));
return new Map(
panels
.map((panel) => panel.querySelector('.CategoricalFilter')?.id)
.filter(Boolean)
.map((id) => {{
const widget = window.dijit?.byId?.(id);
return widget ? [widget.get_oFilter().caption, widget] : null;
}})
.filter(Boolean)
);
}};
const applyAll = (widget) => {{
window.tab.FilterClientCommands.setCategoricalQuickFilterToAll(
window.tab.CancelDialog.getCustomWaitHandlerForReturnFocus(),
widget.get_session().get_visualId(),
widget.get_identifier()
);
}};
const applyByLabels = (widget, values) => {{
const tuples = widget.get_oFilter().table?.tuples || [];
const desired = new Set(values);
const indices = tuples
.map((tuple, index) => {{
const label = tuple.d ?? tuple.t?.map((item) => item.v).join('|') ?? '';
return desired.has(label) ? index : -1;
}})
.filter((index) => index >= 0);
if (!indices.length) {{
throw new Error(`No matching values for ${{widget.get_oFilter().caption}}: ${{values.join(', ')}}`);
}}
}}
window.tab.FilterClientCommands.setCategoricalFilterValuesByIndex(
window.tab.CancelDialog.getCustomWaitHandlerForReturnFocus(),
widget.get_session().get_visualId(),
widget.get_identifier(),
indices
);
}};
await waitFor(
() => window.dijit && document.querySelectorAll('.QuickFilterPanel').length >= 13,
120000,
'Quick filter widgets not ready'
);
removeErrorDialog();
for (const [name, value] of Object.entries(config.params)) {{
try {{
await workbook.changeParameterValueAsync(name, value);
}} catch (error) {{
const widgetMap = getWidgetMap();
const applied = [];
for (const filter of config.filter_config) {{
const widget = widgetMap.get(filter.caption);
if (!widget) {{
throw new Error(`Quick filter not found: ${{filter.caption}}`);
}}
if (filter.action === 'all') {{
applyAll(widget);
}} else if (filter.action === 'replace') {{
applyByLabels(widget, filter.values || []);
}} else {{
throw new Error(`Unsupported filter action: ${{filter.action}}`);
}}
applied.push({{
caption: filter.caption,
action: filter.action,
values: filter.values || [],
}});
removeErrorDialog();
await sleep(2500);
}}
await new Promise((resolve) => setTimeout(resolve, 7000));
await sleep(6000);
removeErrorDialog();
return {{
activeSheet: activeSheet.getName(),
targetCount: targets.length,
filterApply,
filters: config.filters,
params: config.params,
url: location.href,
applied,
summaries: Array.from(getWidgetMap().entries()).map(([caption, widget]) => ({{
caption,
summary: widget.get_oFilter().summary,
}})),
}};
}}, spec);
}}
......@@ -295,7 +473,6 @@ def build_configure_view_js(spec: dict[str, Any]) -> str:
def build_capture_js(inner_frame_fragment: str, screenshot_name: str) -> str:
"""在 inner frame 中截图并保存到 workdir,让后续裁切使用。"""
payload = json.dumps(
{
"inner_frame_fragment": inner_frame_fragment,
......@@ -310,6 +487,12 @@ def build_capture_js(inner_frame_fragment: str, screenshot_name: str) -> str:
(candidate) => candidate !== page.mainFrame() && candidate.url().includes(spec.inner_frame_fragment)
);
const target = frame || page;
if (frame) {{
await frame.evaluate(() => {{
document.querySelector('[data-tb-test-id="detailedErrorDialog-Dialog-Glass-Root"]')?.remove();
document.querySelector('[data-tb-test-id="detailedErrorDialog-Dialog-Glass"]')?.remove();
}});
}}
const body = await target.$('body');
await body.screenshot({{
path: spec.screenshot_name,
......@@ -348,15 +531,18 @@ def save_state(session: str, state_path: Path, *, cwd: Path) -> None:
def load_state_if_present(session: str, state_path: Path, *, cwd: Path) -> None:
if not state_path.exists():
return
run_playwright(
["--session", session, "state-load", str(state_path)],
cwd=cwd,
timeout=60,
)
try:
run_playwright(
["--session", session, "state-load", str(state_path)],
cwd=cwd,
timeout=60,
)
except subprocess.CalledProcessError:
# Playwright CLI on this machine only allows reading state files under the workdir root.
pass
def locate_session_file(root: Path, session: str, filename: str) -> Path:
"""尝试定位 Playwright 保存的截图文件,多目录扫一遍以防路径改变。"""
search_roots = [
root / "output" / "playwright",
root / "output" / "vip-report",
......@@ -370,8 +556,11 @@ def locate_session_file(root: Path, session: str, filename: str) -> Path:
if len(matches) == 1:
return matches[0]
if matches:
latest = sorted({item.resolve() for item in matches}, key=lambda item: item.stat().st_mtime, reverse=True)[0]
return latest
return sorted(
{item.resolve() for item in matches},
key=lambda item: item.stat().st_mtime,
reverse=True,
)[0]
raise FileNotFoundError(f"Unable to locate {filename} under {root}.")
......@@ -382,7 +571,6 @@ def crop_image(
*,
resize_to: dict[str, int] | None = None,
) -> None:
"""裁切并可选缩放截图,保证输出路径存在。"""
image = Image.open(source)
box = (
crop["left"],
......@@ -412,7 +600,7 @@ def parse_args() -> argparse.Namespace:
parser.add_argument(
"--report-month",
default="",
help="Report month label for Tableau filter, e.g. 三月 / 3",
help="Report month label for Tableau filter, e.g. 一月 / 1",
)
parser.add_argument(
"--report-year",
......@@ -424,13 +612,12 @@ def parse_args() -> argparse.Namespace:
"--compare-year",
type=int,
default=0,
help="Comparison year for LFL or other references, e.g. 2025",
help="Comparison year placeholder, default report year - 1",
)
return parser.parse_args()
def collect_required_capture_ids(filtered_assets: list[dict[str, Any]]) -> set[str]:
"""列出当前需要的 capture_id,避免重复访问 Tableau。"""
return {asset["capture_id"] for asset in filtered_assets}
......@@ -442,7 +629,6 @@ def capture_tableau_view(
workdir: Path,
workspace_root: Path,
) -> Path:
"""针对单个 capture_spec 拉起 Tableau 页面、应用筛选、截图输出。"""
target_url = f"{base_url}{capture_spec['hash_url']}"
run_playwright(["--session", session, "goto", target_url], cwd=workdir, timeout=120)
run_playwright(
......@@ -453,18 +639,12 @@ def capture_tableau_view(
configure_script = write_js(
workdir,
f"tmp-{capture_spec['capture_id']}-configure.js",
build_configure_view_js(
{
"activate_sheet": capture_spec["activate_sheet"],
"filters": capture_spec["filters"],
"params": capture_spec["params"],
}
),
f"tmp-{capture_spec['capture_id']}-{time.time_ns()}-configure.js",
build_configure_view_js(capture_spec),
)
capture_script = write_js(
workdir,
f"tmp-{capture_spec['capture_id']}-capture.js",
f"tmp-{capture_spec['capture_id']}-{time.time_ns()}-capture.js",
build_capture_js(
capture_spec["inner_frame_fragment"],
capture_spec["raw_screenshot_name"],
......@@ -472,7 +652,7 @@ def capture_tableau_view(
)
try:
max_attempts = 5
max_attempts = 3
last_error: str | None = None
for attempt in range(1, max_attempts + 1):
configure_output = run_code(session, configure_script, cwd=workdir, timeout=420).strip()
......@@ -490,7 +670,21 @@ def capture_tableau_view(
raise RuntimeError(
f"Failed to configure Tableau view for {capture_spec['capture_id']}: {last_error}"
)
run_code(session, capture_script, cwd=workdir, timeout=180)
capture_output = run_code(session, capture_script, cwd=workdir, timeout=240).strip()
if "### Error" in capture_output:
last_error = capture_output
if attempt < max_attempts:
run_playwright(["--session", session, "goto", target_url], cwd=workdir, timeout=120)
run_playwright(
["--session", session, "resize", str(VIEWPORT["width"]), str(VIEWPORT["height"])],
cwd=workdir,
timeout=60,
)
time.sleep(3)
continue
raise RuntimeError(
f"Failed to capture Tableau view for {capture_spec['capture_id']}: {last_error}"
)
break
finally:
configure_script.unlink(missing_ok=True)
......@@ -502,9 +696,19 @@ def capture_tableau_view(
return locate_session_file(workspace_root, session, capture_spec["raw_screenshot_name"])
def resolve_config_path(requested_path: str) -> Path:
path = Path(requested_path)
if path.exists():
return path
local_fallback = Path(__file__).resolve().parents[1] / "config.yaml"
if local_fallback.exists():
return local_fallback
return path
def main() -> None:
args = parse_args()
config_path = Path(args.config)
config_path = resolve_config_path(args.config)
config = yaml.safe_load(config_path.read_text(encoding="utf-8"))
report_month_label, report_month_number, report_year, compare_year = resolve_report_period(args, config)
specs = build_specs(report_month_label, report_month_number, report_year, compare_year)
......@@ -528,7 +732,7 @@ def main() -> None:
raw_screenshots: dict[str, Path] = {}
if not use_template_lock:
session = SESSION_NAME
state_path = workspace_root / "output" / "playwright" / session / "state.json"
state_path = vip_workdir / ".playwright-cli" / f"{session}-state.json"
ensure_browser_session(session, cwd=vip_workdir)
load_state_if_present(session, state_path, cwd=vip_workdir)
......@@ -540,7 +744,7 @@ def main() -> None:
login_script = write_js(
vip_workdir,
"tmp-warehouse-login.js",
f"tmp-warehouse-login-{time.time_ns()}.js",
build_login_js(config["tableau"]["username"], config["tableau"]["password"]),
)
try:
......@@ -564,7 +768,6 @@ def main() -> None:
for asset in filtered_assets:
target = asset_dir / f"{asset['asset_name']}.png"
if use_template_lock:
# 基线期使用模板锁定图,确保与模板一致。
source_path = TEMPLATE_LOCK_DIR / f"s{asset['slide']}_shape{asset['shape_id']}.png"
if not source_path.exists():
raise FileNotFoundError(f"Template lock image not found: {source_path}")
......@@ -577,7 +780,6 @@ def main() -> None:
asset["crop"],
resize_to=asset.get("resize_to"),
)
if not use_template_lock:
operations["replace_images"].append(
{
"slide": asset["slide"],
......@@ -586,6 +788,7 @@ def main() -> None:
"image_path": str(target),
}
)
manifest_item = {
"slide_code": asset["slide_code"],
"slide": asset["slide"],
......@@ -597,9 +800,8 @@ def main() -> None:
"source_view": "template-lock" if use_template_lock else asset["source_view"],
"note": asset["note"],
"raw_screenshot": str(source_path),
"crop": asset["crop"],
}
if "crop" in asset:
manifest_item["crop"] = asset["crop"]
if asset.get("resize_to"):
manifest_item["resize_to"] = asset["resize_to"]
manifest_items.append(manifest_item)
......@@ -617,20 +819,14 @@ def main() -> None:
"capture_id": capture_id,
"hash_url": captures_by_id[capture_id]["hash_url"],
"activate_sheet": captures_by_id[capture_id]["activate_sheet"],
"filters": captures_by_id[capture_id]["filters"],
"params": captures_by_id[capture_id]["params"],
"filter_config": captures_by_id[capture_id]["filter_config"],
"raw_screenshot": str(raw_screenshots[capture_id]),
"note": captures_by_id[capture_id]["note"],
}
for capture_id in sorted(raw_screenshots)
],
"config_path": str(config_path),
"report_period": {
"month_label": report_month_label,
"month_number": report_month_number,
"year": report_year,
"compare_year": compare_year,
},
"report_period": specs["report_period"],
},
"assets": manifest_items,
"operations_path": str(operations_path),
......@@ -641,7 +837,15 @@ def main() -> None:
encoding="utf-8",
)
print(json.dumps({"operations_path": str(operations_path), "manifest_path": str(manifest_path)}, ensure_ascii=False))
print(
json.dumps(
{
"operations_path": str(operations_path),
"manifest_path": str(manifest_path),
},
ensure_ascii=False,
)
)
if __name__ == "__main__":
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment