Commit d90a2b4d authored by allen.wang's avatar allen.wang

feat:init

parent 89ec371c
param(
[string]$ConfigPath = "C:\Users\niuniu\.codex\vip-report\config.yaml",
[string]$Slides = "S10",
[string]$Slides = "S09,S10",
[string]$ReportMonth = "",
[int]$ReportYear = 0,
[int]$CompareYear = 0,
......@@ -40,10 +40,16 @@ if ($CompareYear -gt 0) {
}
python @pythonArgs
if ($LASTEXITCODE -ne 0) {
throw "sync_top_products_assets.py failed with exit code $LASTEXITCODE"
}
if ($Render) {
powershell -ExecutionPolicy Bypass -File "$root\bin\vip-report-render.ps1" -TemplatePath "$TemplatePath" -OutputPath "$OutputPath" -OperationsPath "$opsPath" | Out-Null
python "$root\scripts\compare_pptx.py" "$TemplatePath" "$OutputPath" --output "$CompareOutputPath"
if ($LASTEXITCODE -ne 0) {
throw "compare_pptx.py failed with exit code $LASTEXITCODE"
}
Write-Output $OutputPath
Write-Output $CompareOutputPath
} else {
......
......@@ -122,6 +122,7 @@ def build_filters(
include_compare_year: bool = True,
cumulative_month: bool = True,
storename_value_candidates: list[list[str]] | None = None,
storename_field_candidates: list[str] | None = None,
) -> list[dict[str, Any]]:
"""构造 inventory monthly 的筛选候选,命中 worksheet 后停止继续尝试。"""
# billdate 年筛选按“报告年 + 上一年”动态计算,避免硬编码固定年份。
......@@ -136,21 +137,18 @@ def build_filters(
cumulative=cumulative_month,
)
storename_candidates = storename_value_candidates or [["CKC-VIP"], ["ckc-vip"]]
return [
{
"label": "storename",
"field_candidates": [
"storename (Shoes) ",
"Storename (Shoes) ",
"storename (Shoes)",
"Storename (Shoes)",
storename_fields = storename_field_candidates or [
"storename",
"Storename",
"storecode",
"storename (组)",
"Storename (组)",
],
]
return [
{
"label": "storename",
"field_candidates": storename_fields,
"value_candidates": storename_candidates,
"require_worksheet_match": False,
},
......@@ -162,7 +160,7 @@ def build_filters(
},
{
"label": "brand",
"field_candidates": ["Brand", "brand"],
"field_candidates": ["Brand(Shoes)", "Brand", "brand"],
"value_candidates": [["CK"]],
"require_worksheet_match": False,
},
......@@ -180,14 +178,21 @@ def build_subcategory_filters(
report_year: int,
compare_year: int | None = None,
) -> list[dict[str, Any]]:
"""S08 需要按报告月精确筛选,并限定 category/storename。"""
"""S08 按统一年月筛选,并限定 category/storename。"""
filters = build_filters(
report_month,
report_year,
compare_year,
include_compare_year=False,
cumulative_month=False,
storename_value_candidates=[["ckc-vip"], ["CKC-VIP"]],
include_compare_year=True,
cumulative_month=True,
storename_value_candidates=[["CKC-VIP"], ["ckc-vip"]],
storename_field_candidates=[
"storename",
"Storename",
"storecode",
"storename (组)",
"Storename (组)",
],
)
for item in filters:
if item.get("label") == "brand":
......@@ -216,16 +221,78 @@ def build_subcategory_filters(
return filters
def build_bags_filters(
report_month: str,
report_year: int,
compare_year: int | None = None,
) -> list[dict[str, Any]]:
"""S05 需要严格命中 worksheet 级的年月/门店筛选。"""
filters = build_filters(
report_month,
report_year,
compare_year,
include_compare_year=True,
cumulative_month=True,
storename_value_candidates=[["CKC-VIP"], ["ckc-vip"]],
storename_field_candidates=[
"storename",
"Storename",
"storecode",
"storename (组)",
"Storename (组)",
],
)
for item in filters:
if item.get("label") == "storename":
item["require_worksheet_match"] = True
if item.get("label") in {"storename", "year", "month"}:
item["apply_all_fields"] = True
return filters
def build_shoes_filters(
report_month: str,
report_year: int,
compare_year: int | None = None,
) -> list[dict[str, Any]]:
"""S06 需要命中 Shoes 专属门店字段,其余筛选允许 dashboard 级联动。"""
filters = build_filters(
report_month,
report_year,
compare_year,
include_compare_year=True,
cumulative_month=True,
storename_value_candidates=[["CKC-VIP"], ["ckc-vip"]],
storename_field_candidates=[
"storename (Shoes) ",
"Storename (Shoes) ",
"storename (Shoes)",
"Storename (Shoes)",
"storename",
"Storename",
"storecode",
"storename (组)",
"Storename (组)",
],
)
for item in filters:
if item.get("label") == "storename":
item["require_worksheet_match"] = True
return filters
def build_specs(report_month: str, report_year: int, compare_year: int) -> dict[str, Any]:
"""返回 S04-S08 需要的 capture/asset 描述,包含裁切信息与说明。"""
filters = build_filters(report_month, report_year, compare_year)
s05_filters = build_bags_filters(report_month, report_year, compare_year)
s06_filters = build_shoes_filters(report_month, report_year, compare_year)
s07_filters = build_filters(
report_month,
report_year,
compare_year,
include_compare_year=False,
cumulative_month=False,
storename_value_candidates=[["ckc-vip"], ["CKC-VIP"]],
include_compare_year=True,
cumulative_month=True,
storename_value_candidates=[["CKC-VIP"], ["ckc-vip"]],
)
s08_filters = build_subcategory_filters(report_month, report_year, compare_year)
capture_template = [
......@@ -238,20 +305,56 @@ def build_specs(report_month: str, report_year: int, compare_year: int) -> dict[
"raw_screenshot_name": "inventory-s04-overall.png",
},
{
"capture_id": "bags",
"capture_id": "bags_top",
"hash_url": "#/views/CKInventoryMonthlyReport/Bags?:iid=1",
"inner_frame_fragment": "/views/CKInventoryMonthlyReport/Bags?",
"activate_sheet": "Bags",
"filters": s05_filters,
"capture_mode": "page",
"viewport": {"width": 1280, "height": 720},
"capture_wait_ms": 3000,
"note": "S05 Bags 顶部区域,使用页面截图保留 Sales/YTM 图表。",
"raw_screenshot_name": "inventory-s05-bags-page-top.png",
},
{
"capture_id": "bags_bottom",
"hash_url": "#/views/CKInventoryMonthlyReport/Bags?:iid=1",
"inner_frame_fragment": "/views/CKInventoryMonthlyReport/Bags?",
"activate_sheet": "Bags",
"note": "S05 Bags 视图,仅展示包袋分类信息。",
"raw_screenshot_name": "inventory-s05-bags.png",
"filters": s05_filters,
"capture_mode": "page",
"viewport": {"width": 1280, "height": 720},
"scroll_selector": "#dashboard-viewport",
"scroll_top": 1160,
"capture_wait_ms": 3000,
"note": "S05 Bags 下半部分,滚动 dashboard-viewport 后截图。",
"raw_screenshot_name": "inventory-s05-bags-page-bottom.png",
},
{
"capture_id": "shoes_top",
"hash_url": "#/views/CKInventoryMonthlyReport/Shoes?:iid=1",
"inner_frame_fragment": "/views/CKInventoryMonthlyReport/Shoes?",
"activate_sheet": "Shoes",
"filters": s06_filters,
"capture_mode": "page",
"viewport": {"width": 1280, "height": 720},
"capture_wait_ms": 3000,
"note": "S06 Shoes 顶部区域,使用页面截图保留 Sales/YTM 图表。",
"raw_screenshot_name": "inventory-s06-shoes-page-top.png",
},
{
"capture_id": "shoes",
"capture_id": "shoes_bottom",
"hash_url": "#/views/CKInventoryMonthlyReport/Shoes?:iid=1",
"inner_frame_fragment": "/views/CKInventoryMonthlyReport/Shoes?",
"activate_sheet": "Shoes",
"note": "S06 Shoes 视图,展示鞋类趋势。",
"raw_screenshot_name": "inventory-s06-shoes.png",
"filters": s06_filters,
"capture_mode": "page",
"viewport": {"width": 1280, "height": 720},
"scroll_selector": "#dashboard-viewport",
"scroll_top": 1180,
"capture_wait_ms": 3000,
"note": "S06 Shoes 下半部分,滚动 dashboard-viewport 后截图。",
"raw_screenshot_name": "inventory-s06-shoes-page-bottom.png",
},
{
"capture_id": "discount_regular",
......@@ -291,6 +394,16 @@ def build_specs(report_month: str, report_year: int, compare_year: int) -> dict[
"raw_screenshot_name": info["raw_screenshot_name"],
"note": info["note"],
}
for optional_key in [
"capture_mode",
"viewport",
"ready_patterns",
"scroll_selector",
"scroll_top",
"capture_wait_ms",
]:
if optional_key in info:
spec[optional_key] = info[optional_key]
captures.append(spec)
assets = [
......@@ -336,8 +449,8 @@ def build_specs(report_month: str, report_year: int, compare_year: int) -> dict[
"shape_name": "图片 1",
"shape_id": 2,
"asset_name": "s05_bags_top",
"capture_id": "bags",
"crop": {"left": 100, "top": 128, "width": 1200, "height": 202},
"capture_id": "bags_top",
"crop": {"left": 55, "top": 175, "width": 1175, "height": 250},
"resize_to": {"width": 1093, "height": 237},
"source_view": "Bags",
"note": "S05 第一行:Sales LFL 与 YTM。",
......@@ -348,8 +461,8 @@ def build_specs(report_month: str, report_year: int, compare_year: int) -> dict[
"shape_name": "图片 2",
"shape_id": 3,
"asset_name": "s05_bags_bottom",
"capture_id": "bags",
"crop": {"left": 100, "top": 1320, "width": 1200, "height": 365},
"capture_id": "bags_bottom",
"crop": {"left": 55, "top": 265, "width": 1175, "height": 400},
"resize_to": {"width": 1093, "height": 376},
"source_view": "Bags",
"note": "S05 第二/三行:Bags Qty%/ASP 与 Bags GP/PD。",
......@@ -360,8 +473,8 @@ def build_specs(report_month: str, report_year: int, compare_year: int) -> dict[
"shape_name": "图片 2",
"shape_id": 3,
"asset_name": "s06_shoes_top",
"capture_id": "shoes",
"crop": {"left": 0, "top": 130, "width": 1400, "height": 360},
"capture_id": "shoes_top",
"crop": {"left": 55, "top": 175, "width": 1175, "height": 250},
"resize_to": {"width": 1132, "height": 248},
"source_view": "Shoes",
"note": "S06 第一行:Sales LFL 与 YTM。",
......@@ -372,8 +485,8 @@ def build_specs(report_month: str, report_year: int, compare_year: int) -> dict[
"shape_name": "图片 6",
"shape_id": 7,
"asset_name": "s06_shoes_bottom",
"capture_id": "shoes",
"crop": {"left": 0, "top": 540, "width": 1400, "height": 520},
"capture_id": "shoes_bottom",
"crop": {"left": 55, "top": 235, "width": 1175, "height": 405},
"resize_to": {"width": 1132, "height": 388},
"source_view": "Shoes",
"note": "S06 第二/三行:Shoes Qty% Difference、Shoes ASP LFL、Shoes GP Difference、Shoes PD Difference。",
......@@ -566,9 +679,11 @@ def build_configure_view_js(spec: dict[str, Any]) -> str:
const valueCandidates = filter.value_candidates || (filter.values ? [filter.values] : []);
const requireWorksheetMatch =
(filter.require_worksheet_match !== false) && worksheets.length > 0;
const applyAllFields = !!filter.apply_all_fields;
let matched = false;
for (const field of fieldCandidates) {{
let fieldMatched = false;
for (const values of valueCandidates) {{
const result = await applyFilterCandidate(field, values);
if (result.anySuccess && (!requireWorksheetMatch || result.anyWorksheetSuccess)) {{
......@@ -581,12 +696,16 @@ def build_configure_view_js(spec: dict[str, Any]) -> str:
matched_on_worksheet: result.anyWorksheetSuccess,
}});
matched = true;
fieldMatched = true;
break;
}}
}}
if (matched) {{
if (fieldMatched && !applyAllFields) {{
break;
}}
if (fieldMatched) {{
continue;
}}
}}
if (!matched) {{
......@@ -629,14 +748,22 @@ def build_capture_js(
*,
ready_patterns: list[str] | None = None,
viewport: dict[str, int] | None = None,
capture_mode: str = "frame_body",
scroll_selector: str | None = None,
scroll_top: int | None = None,
capture_wait_ms: int = 3000,
) -> str:
"""找到 inner frame,截取整个 body 作为截图。"""
"""支持 frame-body 与 page 两种截图模式。"""
payload = json.dumps(
{
"inner_frame_fragment": inner_frame_fragment,
"screenshot_name": screenshot_name,
"ready_patterns": ready_patterns or [],
"viewport": viewport or {},
"capture_mode": capture_mode,
"scroll_selector": scroll_selector,
"scroll_top": scroll_top,
"capture_wait_ms": capture_wait_ms,
},
ensure_ascii=False,
)
......@@ -664,12 +791,28 @@ def build_capture_js(
{{ timeout: 60000 }}
);
}}
await page.waitForTimeout(3000);
if (frame && spec.scroll_selector) {{
await frame.evaluate((payload) => {{
const node = document.querySelector(payload.selector);
if (!node) {{
throw new Error(`Scroll target not found: ${{payload.selector}}`);
}}
node.scrollTop = Number(payload.scrollTop || 0);
}}, {{ selector: spec.scroll_selector, scrollTop: spec.scroll_top || 0 }});
}}
await page.waitForTimeout(Number(spec.capture_wait_ms || 3000));
if (spec.capture_mode === 'page') {{
await page.screenshot({{
path: spec.screenshot_name,
scale: 'css',
}});
}} else {{
const body = await target.$('body');
await body.screenshot({{
path: spec.screenshot_name,
scale: 'css',
}});
}}
return {{
frameUrl: frame ? frame.url() : null,
pageUrl: page.url(),
......@@ -763,6 +906,30 @@ def crop_image(
result.save(target)
def wait_for_capture_file(
filename: str,
*,
workdir: Path,
workspace_root: Path,
session: str,
timeout_seconds: int = 45,
) -> Path:
"""等待 Playwright 截图真正落盘,避免 run-code 返回早于文件可见。"""
deadline = time.time() + timeout_seconds
local_output = workdir / filename
while time.time() < deadline:
if local_output.exists():
return local_output
try:
located = locate_session_file(workspace_root, session, filename)
if located.exists():
return located
except FileNotFoundError:
pass
time.sleep(1)
raise FileNotFoundError(f"Unable to locate {filename} under {workspace_root}.")
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="采集 CK Inventory Monthly 的 S04-S08 Tableau 视图素材。"
......@@ -844,6 +1011,10 @@ def capture_tableau_view(
capture_spec["raw_screenshot_name"],
ready_patterns=capture_spec.get("ready_patterns"),
viewport=capture_spec.get("viewport", VIEWPORT),
capture_mode=capture_spec.get("capture_mode", "frame_body"),
scroll_selector=capture_spec.get("scroll_selector"),
scroll_top=capture_spec.get("scroll_top"),
capture_wait_ms=int(capture_spec.get("capture_wait_ms", 3000)),
),
)
......@@ -876,10 +1047,13 @@ def capture_tableau_view(
configure_script.unlink(missing_ok=True)
capture_script.unlink(missing_ok=True)
local_output = workdir / capture_spec["raw_screenshot_name"]
if local_output.exists():
return local_output
return locate_session_file(workspace_root, session, capture_spec["raw_screenshot_name"])
return wait_for_capture_file(
capture_spec["raw_screenshot_name"],
workdir=workdir,
workspace_root=workspace_root,
session=session,
timeout_seconds=45,
)
def main() -> None:
......
......@@ -2,6 +2,7 @@ from __future__ import annotations
import argparse
import calendar
import datetime
import json
import re
import subprocess
......@@ -36,8 +37,47 @@ MONTH_CN_MAP = {
12: "\u5341\u4e8c\u6708",
}
MONTH_INDEX_MAP = {label: index for index, label in MONTH_CN_MAP.items()}
S10_PRODUCT_IMAGE_URL = "https://api.charleskeith.cn/img/{articlecode}.jpg"
S10_PRODUCT_SLOTS = [
PRODUCT_IMAGE_URL = "https://api.charleskeith.cn/img/{articlecode}.jpg"
SLIDE_SPECS: dict[str, dict[str, Any]] = {
"S09": {
"category_filter": "Bags",
"level1": "BAGS",
"capture_id": "top_products_bags",
"slide": 9,
"shape_id": 3,
"shape_name": "Picture 2",
"asset_name": "s09_top_products_chart",
"raw_screenshot_name": "top-products-bags.png",
"resize_to": {"width": 1034, "height": 587},
"crop": {"left": 0, "top": 180, "width": 1050, "height": 600},
"note": "S09 main Tableau chart area.",
"product_slots": [
{"rank": 1, "shape_id": 4, "shape_name": "Picture 2", "canvas_width": 707, "canvas_height": 500},
{"rank": 2, "shape_id": 1042, "shape_name": "Picture 18", "canvas_width": 707, "canvas_height": 500},
{"rank": 3, "shape_id": 5, "shape_name": "Picture 4", "canvas_width": 707, "canvas_height": 500},
{"rank": 4, "shape_id": 1034, "shape_name": "Picture 10", "canvas_width": 707, "canvas_height": 472},
{"rank": 5, "shape_id": 1038, "shape_name": "Picture 14", "canvas_width": 707, "canvas_height": 500},
{"rank": 6, "shape_id": 1036, "shape_name": "Picture 12", "canvas_width": 707, "canvas_height": 500},
{"rank": 7, "shape_id": 1032, "shape_name": "Picture 8", "canvas_width": 707, "canvas_height": 500},
{"rank": 8, "shape_id": 7, "shape_name": "Picture 8", "canvas_width": 707, "canvas_height": 500},
{"rank": 9, "shape_id": 6, "shape_name": "Picture 6", "canvas_width": 707, "canvas_height": 500},
{"rank": 10, "shape_id": 8, "shape_name": "Picture 10", "canvas_width": 707, "canvas_height": 500},
],
},
"S10": {
"category_filter": "Shoes",
"level1": "SHOES",
"capture_id": "top_products_shoes",
"slide": 10,
"shape_id": 2,
"shape_name": "Picture 1",
"asset_name": "s10_top_products_chart",
"raw_screenshot_name": "top-products-shoes.png",
"resize_to": {"width": 1039, "height": 585},
"crop": {"left": 0, "top": 120, "width": 1400, "height": 900},
"note": "S10 main Tableau chart area.",
"product_slots": [
{"rank": 1, "shape_id": 3, "shape_name": "Picture 2", "canvas_width": 708, "canvas_height": 500},
{"rank": 2, "shape_id": 4, "shape_name": "Picture 2", "canvas_width": 707, "canvas_height": 500},
{"rank": 3, "shape_id": 6, "shape_name": "Picture 4", "canvas_width": 750, "canvas_height": 500},
......@@ -48,7 +88,9 @@ S10_PRODUCT_SLOTS = [
{"rank": 8, "shape_id": 11, "shape_name": "Picture 14", "canvas_width": 500, "canvas_height": 707},
{"rank": 9, "shape_id": 13, "shape_name": "Picture 16", "canvas_width": 750, "canvas_height": 500},
{"rank": 10, "shape_id": 14, "shape_name": "Picture 18", "canvas_width": 750, "canvas_height": 500},
]
],
},
}
def normalize_month_label(raw_month: Any) -> str:
......@@ -68,9 +110,19 @@ def normalize_month_label(raw_month: Any) -> str:
def resolve_month_index(report_month: str) -> int:
normalized = normalize_month_label(report_month)
if normalized in MONTH_INDEX_MAP:
return MONTH_INDEX_MAP[normalized]
if normalized not in MONTH_INDEX_MAP:
raise ValueError(f"Unsupported report month: {report_month}")
return MONTH_INDEX_MAP[normalized]
def build_month_week_labels(report_year: int, report_month: str) -> list[str]:
month_index = resolve_month_index(report_month)
last_day = calendar.monthrange(report_year, month_index)[1]
week_numbers = {
datetime.date(report_year, month_index, day).isocalendar()[1]
for day in range(1, last_day + 1)
}
return [f"W{week_number}" for week_number in sorted(week_numbers)]
def resolve_report_period(args: argparse.Namespace, config: dict[str, Any]) -> tuple[str, int, int]:
......@@ -81,58 +133,48 @@ def resolve_report_period(args: argparse.Namespace, config: dict[str, Any]) -> t
return report_month, report_year, report_compare_year
def build_tableau_filters(report_month: str, report_year: int) -> list[dict[str, Any]]:
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Generate S09/S10 top-products assets for the VIP report.")
parser.add_argument("--config", required=True, help="Path to config.yaml")
parser.add_argument("--slides", default="S09,S10", help="Slide codes to generate.")
parser.add_argument("--report-month", help="Report month, for example 3 / 03 / 三月")
parser.add_argument("--report-year", type=int, help="Report year")
parser.add_argument("--compare-year", type=int, help="Compare year")
return parser.parse_args()
def build_filter_actions(report_month: str, report_year: int, slide_code: str) -> list[dict[str, Any]]:
level1 = SLIDE_SPECS[slide_code]["level1"]
week_labels = build_month_week_labels(report_year, report_month)
return [
{"field": "Category", "values": ["Shoes"]},
{"field": "Year", "values": [str(report_year)]},
{"field": "Month", "values": [report_month]},
{"field": "storename (group)", "values": ["CKC-VIP"]},
{"field": "Storename (group)", "values": ["CKC-VIP"]},
{"field": "storename", "values": ["CKC-VIP"]},
{"field": "Storename", "values": ["CKC-VIP"]},
{"field": "week", "values": [ALL_FILTER_VALUE]},
{"field": "Week", "values": [ALL_FILTER_VALUE]},
{"field": "Wkt<=4", "values": [ALL_FILTER_VALUE]},
{"field": "daily", "values": [ALL_FILTER_VALUE]},
{"field": "Daily", "values": [ALL_FILTER_VALUE]},
{"kind": "filter", "worksheet": "TOP", "field": "year", "values": [str(report_year)]},
{"kind": "filter", "worksheet": "TOP", "field": "月(Daily)", "values": [report_month]},
{"kind": "filter", "worksheet": "TOP", "field": "ISO周数(Daily)", "values": week_labels},
{"kind": "filter", "worksheet": "category contributions", "field": "月(Daily)", "values": [report_month]},
{"kind": "filter", "worksheet": "Soldqty Trend", "field": "Category", "values": [level1]},
{"kind": "filter", "worksheet": "OVERALL TOP", "field": "Category", "values": [level1]},
{"kind": "filter", "worksheet": "TOP", "field": "Category", "values": [level1]},
{"kind": "filter", "worksheet": "Soldqty and SOH details", "field": "Category", "values": [level1]},
{"kind": "filter", "worksheet": "category contributions", "field": "storename (group)", "values": ["CKC-VIP"]},
{"kind": "filter", "worksheet": "category contributions", "field": "storename", "values": ["CKC-VIP"]},
{"kind": "filter", "worksheet": "Soldqty Trend", "field": "storename (group)", "values": ["CKC-VIP"]},
{"kind": "filter", "worksheet": "Soldqty Trend", "field": "storename", "values": ["CKC-VIP"]},
{"kind": "parameter", "name": "Top Parameter", "value": 10},
]
def build_spec(report_month: str, report_year: int) -> tuple[dict[str, Any], dict[str, Any]]:
capture = {
"capture_id": "top_products_shoes",
def build_capture_spec(slide_code: str, report_month: str, report_year: int) -> dict[str, Any]:
slide_spec = SLIDE_SPECS[slide_code]
return {
"capture_id": slide_spec["capture_id"],
"session": SESSION_NAME,
"hash_url": "#/views/CKTopProducts-General_16862068169500/TopProducts?:iid=1",
"inner_frame_fragment": "/views/CKTopProducts-General_16862068169500/TopProducts?",
"activate_sheet": "Top Products",
"filters": build_tableau_filters(report_month, report_year),
"params": {"Top Parameter": 10},
"raw_screenshot_name": "top-products-shoes.png",
"note": "S10 Tableau screenshot with year/month/week/daily/category/store filters applied.",
}
asset = {
"slide_code": "S10",
"slide": 10,
"shape_id": 2,
"shape_name": "Picture 1",
"asset_name": "s10_top_products_chart",
"capture_id": capture["capture_id"],
"crop": {"left": 0, "top": 120, "width": 1400, "height": 900},
"resize_to": {"width": 1039, "height": 585},
"source_view": "TopProducts",
"note": "S10 main Tableau chart area.",
"filter_actions": build_filter_actions(report_month, report_year, slide_code),
"raw_screenshot_name": slide_spec["raw_screenshot_name"],
"note": f"{slide_code} live Tableau screenshot with requested filters applied.",
}
return capture, asset
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Generate S10 top products image assets for the VIP report.")
parser.add_argument("--config", required=True, help="Path to config.yaml")
parser.add_argument("--slides", default="S10", help="Slide codes to generate. This script currently supports S10 only.")
parser.add_argument("--report-month", help="Report month, for example 1 / 01 / 一月")
parser.add_argument("--report-year", type=int, help="Report year")
parser.add_argument("--compare-year", type=int, help="Compare year")
return parser.parse_args()
def run_cmd(args: list[str], *, cwd: Path, timeout: int = 120, check: bool = True) -> subprocess.CompletedProcess[str]:
......@@ -225,31 +267,36 @@ def build_configure_view_js(spec: dict[str, Any]) -> str:
await workbook.activateSheetAsync(config.activate_sheet);
const activeSheet = workbook.getActiveSheet();
const worksheets = activeSheet && typeof activeSheet.getWorksheets === 'function'
? activeSheet.getWorksheets()
: [activeSheet];
? Object.fromEntries(activeSheet.getWorksheets().map((worksheet) => [worksheet.getName(), worksheet]))
: {{}};
const updateType = window.tableau.FilterUpdateType.REPLACE;
const applyLog = [];
for (const filter of config.filters) {{
for (const worksheet of worksheets) {{
for (const action of config.filter_actions) {{
try {{
await worksheet.applyFilterAsync(filter.field, filter.values, updateType);
}} catch (error) {{
}}
}}
if (action.kind === 'parameter') {{
await workbook.changeParameterValueAsync(action.name, action.value);
applyLog.push({{ kind: action.kind, name: action.name, ok: true }});
continue;
}}
for (const [name, value] of Object.entries(config.params)) {{
try {{
await workbook.changeParameterValueAsync(name, value);
const worksheet = worksheets[action.worksheet];
if (!worksheet) {{
applyLog.push({{ kind: action.kind, worksheet: action.worksheet, field: action.field, ok: false, error: 'worksheet not found' }});
continue;
}}
await worksheet.applyFilterAsync(action.field, action.values, updateType);
applyLog.push({{ kind: action.kind, worksheet: action.worksheet, field: action.field, values: action.values, ok: true }});
}} catch (error) {{
applyLog.push({{ kind: action.kind, worksheet: action.worksheet || null, field: action.field || null, name: action.name || null, ok: false, error: String(error) }});
}}
}}
await sleep(20000);
await sleep(60000);
return {{
activeSheet: activeSheet.getName(),
filters: config.filters,
params: config.params,
filterActions: config.filter_actions,
applyLog,
url: location.href,
}};
}}, spec);
......@@ -287,11 +334,7 @@ def build_capture_js(inner_frame_fragment: str, screenshot_name: str) -> str:
def run_code(session: str, script_path: Path, *, cwd: Path, timeout: int = 120) -> str:
return run_playwright(
["--session", session, "run-code", "--filename", str(script_path)],
cwd=cwd,
timeout=timeout,
)
return run_playwright(["--session", session, "run-code", "--filename", str(script_path)], cwd=cwd, timeout=timeout)
def ensure_browser_session(session: str, *, cwd: Path) -> None:
......@@ -363,8 +406,7 @@ def capture_tableau_view(
build_configure_view_js(
{
"activate_sheet": capture_spec["activate_sheet"],
"filters": capture_spec["filters"],
"params": capture_spec["params"],
"filter_actions": capture_spec["filter_actions"],
}
),
)
......@@ -388,7 +430,6 @@ def capture_tableau_view(
time.sleep(3)
continue
raise RuntimeError(f"Failed to configure Tableau view: {configure_output}")
run_code(session, capture_script, cwd=workdir, timeout=180)
break
finally:
......@@ -404,10 +445,7 @@ def capture_tableau_view(
def month_date_range(report_year: int, report_month: str) -> tuple[str, str]:
month_index = resolve_month_index(report_month)
last_day = calendar.monthrange(report_year, month_index)[1]
return (
f"{report_year}-{month_index:02d}-01",
f"{report_year}-{month_index:02d}-{last_day:02d}",
)
return f"{report_year}-{month_index:02d}-01", f"{report_year}-{month_index:02d}-{last_day:02d}"
def resolve_shop(config: dict[str, Any], store_name: str) -> tuple[int, str]:
......@@ -441,12 +479,13 @@ def resolve_shop(config: dict[str, Any], store_name: str) -> tuple[int, str]:
return int(row["shopid"]), str(row["shopname"])
def fetch_s10_top_products(
def fetch_top_products(
config: dict[str, Any],
*,
store_name: str,
report_month: str,
report_year: int,
level1: str,
limit: int,
) -> dict[str, Any]:
shop_id, resolved_shop_name = resolve_shop(config, store_name)
......@@ -468,20 +507,17 @@ def fetch_s10_top_products(
SELECT
articlecode,
SUM(CASE WHEN qty > 0 THEN qty ELSE 0 END) AS qty,
SUM(CASE WHEN sales > 0 THEN sales ELSE 0 END) AS sales,
level1,
level3,
level4
SUM(CASE WHEN sales > 0 THEN sales ELSE 0 END) AS sales
FROM oms_daily_report_detail
WHERE shopid = %s
AND daily BETWEEN %s AND %s
AND level1 = 'SHOES'
GROUP BY articlecode, level1, level3, level4
AND level1 = %s
GROUP BY articlecode
HAVING qty > 0 OR sales > 0
ORDER BY qty DESC, sales DESC, articlecode ASC
LIMIT %s
""",
(shop_id, start_date, end_date, limit),
(shop_id, start_date, end_date, level1, limit),
)
rows = cursor.fetchall()
finally:
......@@ -492,16 +528,12 @@ def fetch_s10_top_products(
"articlecode": str(row["articlecode"]),
"qty": int(row["qty"] or 0),
"sales": float(row["sales"] or 0),
"level1": str(row.get("level1") or ""),
"level3": str(row.get("level3") or ""),
"level4": str(row.get("level4") or ""),
}
for row in rows
if row.get("articlecode")
]
if len(normalized_rows) < limit:
raise RuntimeError(f"S10 top products rows are insufficient: expected {limit}, got {len(normalized_rows)}")
raise RuntimeError(f"{level1} top products rows are insufficient: expected {limit}, got {len(normalized_rows)}")
return {
"store_name": store_name,
......@@ -509,17 +541,26 @@ def fetch_s10_top_products(
"shop_id": shop_id,
"start_date": start_date,
"end_date": end_date,
"level1": level1,
"rows": normalized_rows[:limit],
}
def download_image(url: str) -> Image.Image:
def download_image(url: str, *, attempts: int = 4, timeout: int = 60) -> Image.Image:
last_error: Exception | None = None
for attempt in range(1, attempts + 1):
try:
request = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
with urllib.request.urlopen(request, timeout=60) as response:
with urllib.request.urlopen(request, timeout=timeout) as response:
payload = response.read()
image = Image.open(BytesIO(payload))
image = ImageOps.exif_transpose(image).convert("RGBA")
return image
return ImageOps.exif_transpose(image).convert("RGBA")
except Exception as exc: # pragma: no cover - network retry path
last_error = exc
if attempt < attempts:
time.sleep(min(12, attempt * 3))
continue
raise RuntimeError(f"Failed to download product image from {url}") from last_error
def render_product_image(source: Image.Image, target: Path, *, canvas_width: int, canvas_height: int) -> None:
......@@ -539,33 +580,30 @@ def sanitize_file_token(value: str) -> str:
return re.sub(r"[^A-Za-z0-9_-]+", "_", value).strip("_") or "item"
def build_s10_product_assets(config: dict[str, Any], *, asset_dir: Path, report_month: str, report_year: int) -> tuple[list[dict[str, Any]], list[dict[str, Any]], dict[str, Any]]:
source_payload = fetch_s10_top_products(
config,
store_name="ckc-vip",
report_month=report_month,
report_year=report_year,
limit=10,
)
def build_product_assets(
slide_code: str,
*,
asset_dir: Path,
source_payload: dict[str, Any],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
slide_spec = SLIDE_SPECS[slide_code]
operations: list[dict[str, Any]] = []
manifest_items: list[dict[str, Any]] = []
for slot, row in zip(S10_PRODUCT_SLOTS, source_payload["rows"]):
for slot, row in zip(slide_spec["product_slots"], source_payload["rows"]):
articlecode = row["articlecode"]
image_url = S10_PRODUCT_IMAGE_URL.format(articlecode=articlecode)
asset_name = f"s10_top_product_{slot['rank']:02d}_{sanitize_file_token(articlecode)}"
image_url = PRODUCT_IMAGE_URL.format(articlecode=articlecode)
asset_name = f"{slide_code.lower()}_top_product_{slot['rank']:02d}_{sanitize_file_token(articlecode)}"
asset_path = asset_dir / f"{asset_name}.png"
render_product_image(
download_image(image_url),
asset_path,
canvas_width=slot["canvas_width"],
canvas_height=slot["canvas_height"],
)
operations.append(
{
"slide": 10,
"slide": slide_spec["slide"],
"shape_id": slot["shape_id"],
"shape_name": slot["shape_name"],
"image_path": str(asset_path),
......@@ -573,15 +611,15 @@ def build_s10_product_assets(config: dict[str, Any], *, asset_dir: Path, report_
)
manifest_items.append(
{
"slide_code": "S10",
"slide": 10,
"slide_code": slide_code,
"slide": slide_spec["slide"],
"shape_id": slot["shape_id"],
"shape_name": slot["shape_name"],
"asset_name": asset_name,
"asset_path": str(asset_path),
"source_capture_id": "mysql-top-products-shoes",
"source_capture_id": f"url-top-products-{slide_code.lower()}",
"source_view": "oms_daily_report_detail",
"note": f"S10 product image for Top {slot['rank']}.",
"note": f"{slide_code} product image for Top {slot['rank']}.",
"top_rank": slot["rank"],
"articlecode": articlecode,
"qty": row["qty"],
......@@ -591,7 +629,7 @@ def build_s10_product_assets(config: dict[str, Any], *, asset_dir: Path, report_
}
)
return operations, manifest_items, source_payload
return operations, manifest_items
def main() -> None:
......@@ -600,11 +638,12 @@ def main() -> None:
config = yaml.safe_load(config_path.read_text(encoding="utf-8"))
report_month, report_year, compare_year = resolve_report_period(args, config)
requested = {item.strip().upper() for item in args.slides.split(",") if item.strip()}
if "S10" not in requested:
raise SystemExit("This generator currently supports S10 only. Pass --slides S10.")
requested = [item.strip().upper() for item in args.slides.split(",") if item.strip()]
requested = [item for item in requested if item in SLIDE_SPECS]
if not requested:
raise SystemExit("No supported slides requested. Use S09 and/or S10.")
capture_spec, asset_spec = build_spec(report_month, report_year)
capture_specs = {slide_code: build_capture_spec(slide_code, report_month, report_year) for slide_code in requested}
vip_workdir = Path(config["paths"]["workdir"]).resolve()
workspace_root = vip_workdir.parents[1]
......@@ -619,7 +658,7 @@ def main() -> None:
load_state_if_present(session, state_path, cwd=vip_workdir)
base_url = config["tableau"]["base_url"].rstrip("/")
first_target_url = f"{base_url}{capture_spec['hash_url']}"
first_target_url = f"{base_url}{capture_specs[requested[0]]['hash_url']}"
run_playwright(["--session", session, "goto", first_target_url], cwd=vip_workdir, timeout=120)
login_script = write_js(
......@@ -632,8 +671,10 @@ def main() -> None:
finally:
login_script.unlink(missing_ok=True)
raw_screenshot = capture_tableau_view(
capture_spec,
raw_screenshots: dict[str, Path] = {}
for slide_code in requested:
raw_screenshots[slide_code] = capture_tableau_view(
capture_specs[slide_code],
base_url=base_url,
session=session,
workdir=vip_workdir,
......@@ -641,42 +682,55 @@ def main() -> None:
)
save_state(session, state_path, cwd=vip_workdir)
chart_path = asset_dir / f"{asset_spec['asset_name']}.png"
crop_image(raw_screenshot, chart_path, asset_spec["crop"], resize_to=asset_spec["resize_to"])
operations = {"replace_text": [], "replace_images": []}
manifest_items: list[dict[str, Any]] = []
sql_sources: dict[str, Any] = {}
operations = {
"replace_text": [],
"replace_images": [
for slide_code in requested:
slide_spec = SLIDE_SPECS[slide_code]
raw_screenshot = raw_screenshots[slide_code]
chart_path = asset_dir / f"{slide_spec['asset_name']}.png"
crop_image(raw_screenshot, chart_path, slide_spec["crop"], resize_to=slide_spec["resize_to"])
operations["replace_images"].append(
{
"slide": 10,
"shape_id": asset_spec["shape_id"],
"shape_name": asset_spec["shape_name"],
"slide": slide_spec["slide"],
"shape_id": slide_spec["shape_id"],
"shape_name": slide_spec["shape_name"],
"image_path": str(chart_path),
}
],
}
manifest_items: list[dict[str, Any]] = [
)
manifest_items.append(
{
"slide_code": asset_spec["slide_code"],
"slide": asset_spec["slide"],
"shape_id": asset_spec["shape_id"],
"shape_name": asset_spec["shape_name"],
"asset_name": asset_spec["asset_name"],
"slide_code": slide_code,
"slide": slide_spec["slide"],
"shape_id": slide_spec["shape_id"],
"shape_name": slide_spec["shape_name"],
"asset_name": slide_spec["asset_name"],
"asset_path": str(chart_path),
"source_capture_id": capture_spec["capture_id"],
"source_view": asset_spec["source_view"],
"note": asset_spec["note"],
"source_capture_id": slide_spec["capture_id"],
"source_view": "TopProducts",
"note": slide_spec["note"],
"raw_screenshot": str(raw_screenshot),
"crop": asset_spec["crop"],
"resize_to": asset_spec["resize_to"],
"crop": slide_spec["crop"],
"resize_to": slide_spec["resize_to"],
}
]
)
product_operations, product_manifest_items, product_source = build_s10_product_assets(
top_source = fetch_top_products(
config,
asset_dir=asset_dir,
store_name="ckc-vip",
report_month=report_month,
report_year=report_year,
level1=slide_spec["level1"],
limit=len(slide_spec["product_slots"]),
)
sql_sources[slide_code] = top_source
product_operations, product_manifest_items = build_product_assets(
slide_code,
asset_dir=asset_dir,
source_payload=top_source,
)
operations["replace_images"].extend(product_operations)
manifest_items.extend(product_manifest_items)
......@@ -689,16 +743,16 @@ def main() -> None:
"source": {
"captures": [
{
"capture_id": capture_spec["capture_id"],
"hash_url": capture_spec["hash_url"],
"activate_sheet": capture_spec["activate_sheet"],
"filters": capture_spec["filters"],
"params": capture_spec["params"],
"raw_screenshot": str(raw_screenshot),
"note": capture_spec["note"],
"capture_id": capture_specs[slide_code]["capture_id"],
"hash_url": capture_specs[slide_code]["hash_url"],
"activate_sheet": capture_specs[slide_code]["activate_sheet"],
"filter_actions": capture_specs[slide_code]["filter_actions"],
"raw_screenshot": str(raw_screenshots[slide_code]),
"note": capture_specs[slide_code]["note"],
}
for slide_code in requested
],
"sql_top_products": product_source,
"url_product_sources": sql_sources,
"config_path": str(config_path),
"report_period": {
"month": report_month,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment