From 19e8a833ba0efd1b80968afc8d09d06b88fcafda Mon Sep 17 00:00:00 2001 From: ray <1416431931@qq.com> Date: Tue, 21 Apr 2026 21:16:56 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E9=87=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- aliyun-sync/COMMANDS.md | 304 ++++++++++++++++++ aliyun-sync/aliyun-aps-sync/.env.example | 1 + aliyun-sync/aliyun-aps-sync/README.md | 15 +- aliyun-sync/aliyun-aps-sync/src/config.js | 2 + aliyun-sync/aliyun-aps-sync/src/index.js | 11 +- aliyun-sync/aliyun-aps-sync/src/notify.js | 4 +- aliyun-sync/aliyun-aps-sync/src/sync.js | 320 +++++++++++++++++-- aliyun-sync/aps-aliyun-sync/aps_db_sync.py | 127 ++++++-- aliyun-sync/aps-aliyun-sync/aps_scheduler.py | 75 +++-- 9 files changed, 777 insertions(+), 82 deletions(-) create mode 100644 aliyun-sync/COMMANDS.md diff --git a/aliyun-sync/COMMANDS.md b/aliyun-sync/COMMANDS.md new file mode 100644 index 0000000..d848be1 --- /dev/null +++ b/aliyun-sync/COMMANDS.md @@ -0,0 +1,304 @@ +# 阿里云 APS 同步命令清单 + +本文档整理项目中常用的爬取、同步、入库、调度、增量和运行时控制命令。 + +## 1. 项目目录 + +### 前端爬取项目 + +```powershell +cd D:\project\python\aliyun-sync\aliyun-aps-sync +``` + +### 后端入库项目 + +```powershell +cd D:\project\python\aliyun-sync\aps-aliyun-sync +``` + +## 2. 安装依赖 + +在前端爬取项目目录执行: + +```powershell +npm install +``` + +## 3. 登录 + +```powershell +npm run login +``` + +作用: + +- 打开浏览器。 +- 手动完成阿里云 / RAM 登录。 +- 自动验证“我的客户”和“账单查询”页面。 +- 保存登录态到 `.browser/` 和 `.browser/storage-state.json`。 + +## 4. 爬取 / 同步命令 + +### 全量同步全部模块 + +```powershell +npm run sync +``` + +默认同步: + +- customers +- customerDetails +- orders +- orderDetails +- bills + +### 仅爬取账单 + +```powershell +npm run bills +``` + +### 启动定时同步 + +```powershell +npm run schedule +``` + +默认 cron: + +```env +ALIYUN_APS_CRON=0 6 * * * +``` + +表示每天早上 6 点执行。 + +## 5. 增量同步 + +### 默认增量同步 + +在 `.env` 中设置: + +```env +ALIYUN_APS_FULL_SYNC=false +``` + +然后执行: + +```powershell +npm run sync +``` + +默认行为: + +- 订单只查昨天。 +- 订单详情跟随本次订单结果。 +- 账单按数据库最新消费时间增量。 + +### 指定订单 / 订单详情增量起始日期 + +临时命令方式: + +```powershell +npm run sync -- --incremental-order-start-date=2026-01-01 +``` + +`.env` 固定配置方式: + +```env +ALIYUN_APS_FULL_SYNC=false +ALIYUN_APS_INCREMENTAL_ORDER_START_DATE=2026-01-01 +``` + +然后执行: + +```powershell +npm run sync +``` + +行为: + +- 订单从指定日期开始补抓到今天。 +- 订单详情跟随这些订单抓取。 + +### 指定账单起始月份 + +在 `.env` 中设置: + +```env +ALIYUN_APS_BILL_START_MONTH=2024-01 +``` + +然后执行: + +```powershell +npm run bills +``` + +说明:当前账单爬取支持按月份开始;如需按具体日期开始,需要新增账单日期过滤参数。 + +## 6. 后端入库命令 + +进入后端入库项目目录: + +```powershell +cd D:\project\python\aliyun-sync\aps-aliyun-sync +``` + +### 全量入库 + +```powershell +python aps_db_sync.py +``` + +### 增量入库 + +```powershell +python aps_db_sync.py --incremental +``` + +### 指定同步对象入库 + +```powershell +python aps_db_sync.py --sync-target all +python aps_db_sync.py --sync-target customer +python aps_db_sync.py --sync-target order +python aps_db_sync.py --sync-target orderdetails +python aps_db_sync.py --sync-target bills +``` + +### 增量只同步账单入库 + +```powershell +python aps_db_sync.py --incremental --sync-target bills +``` + +### 查询数据库最新账单消费时间 + +```powershell +python aps_db_sync.py --latest-bill-consumption-time +``` + +## 7. 后端调度器命令 + +进入后端入库项目目录: + +```powershell +cd D:\project\python\aliyun-sync\aps-aliyun-sync +``` + +### 启动调度器 + +```powershell +python aps_scheduler.py +``` + +### 指定同步对象启动调度器 + +```powershell +python aps_scheduler.py --sync-target all +python aps_scheduler.py --sync-target customer +python aps_scheduler.py --sync-target order +python aps_scheduler.py --sync-target orderdetails +python aps_scheduler.py --sync-target bills +``` + +## 8. 常用 `.env` 示例 + +文件位置: + +```powershell +D:\project\python\aliyun-sync\aliyun-aps-sync\.env +``` + +### 全量模式 + +```env +ALIYUN_APS_BASE_URL=https://aps.aliyun.com +ALIYUN_APS_HEADLESS=false +ALIYUN_APS_TIMEZONE=Asia/Shanghai +ALIYUN_APS_CRON=0 6 * * * +ALIYUN_APS_FULL_SYNC=true +ALIYUN_APS_ORDER_START_DATE=2024-01-01 +ALIYUN_APS_INCREMENTAL_ORDER_START_DATE= +ALIYUN_APS_BILL_START_MONTH=2024-01 +ALIYUN_APS_CLOSE_BROWSER=true +ALIYUN_APS_DB_SYNC_SCRIPT=../aps-aliyun-sync/aps_db_sync.py +``` + +### 增量模式:默认只查昨天订单 + +```env +ALIYUN_APS_FULL_SYNC=false +ALIYUN_APS_INCREMENTAL_ORDER_START_DATE= +``` + +### 增量模式:指定订单起始日期 + +```env +ALIYUN_APS_FULL_SYNC=false +ALIYUN_APS_INCREMENTAL_ORDER_START_DATE=2026-01-01 +``` + +## 9. 常用组合 + +### 首次使用 + +```powershell +cd D:\project\python\aliyun-sync\aliyun-aps-sync +npm install +npm run login +npm run sync +``` + +### 只抓账单 + +```powershell +cd D:\project\python\aliyun-sync\aliyun-aps-sync +npm run login +npm run bills +``` + +### 订单 / 订单详情从指定日期补抓 + +```powershell +cd D:\project\python\aliyun-sync\aliyun-aps-sync +npm run login +npm run sync -- --incremental-order-start-date=2026-01-01 +``` + +### 抓完后只同步账单入库 + +```powershell +cd D:\project\python\aliyun-sync\aps-aliyun-sync +python aps_db_sync.py --sync-target bills +``` + +### 抓完后增量同步账单入库 + +```powershell +cd D:\project\python\aliyun-sync\aps-aliyun-sync +python aps_db_sync.py --incremental --sync-target bills +``` + +## 10. 清理登录态 + +如果登录态异常,可以删除 `.browser` 后重新登录: + +```powershell +cd D:\project\python\aliyun-sync\aliyun-aps-sync +Remove-Item -Recurse -Force .browser +npm run login +``` + +## 11. 运行时热键 + +脚本运行时可在当前终端中使用: + +| 按键 | 功能 | +| --- | --- | +| F7 | 暂停 | +| F8 | 继续 | +| F9 | 终止 | + +注意:这是当前终端进程内热键,不是系统级全局热键。 diff --git a/aliyun-sync/aliyun-aps-sync/.env.example b/aliyun-sync/aliyun-aps-sync/.env.example index a693edf..86e8c54 100644 --- a/aliyun-sync/aliyun-aps-sync/.env.example +++ b/aliyun-sync/aliyun-aps-sync/.env.example @@ -3,4 +3,5 @@ ALIYUN_APS_HEADLESS=false ALIYUN_APS_TIMEZONE=Asia/Shanghai ALIYUN_APS_CRON=0 6 * * * ALIYUN_APS_ORDER_START_DATE=2024-01-01 +ALIYUN_APS_INCREMENTAL_ORDER_START_DATE= ALIYUN_APS_BILL_START_MONTH=2024-01 diff --git a/aliyun-sync/aliyun-aps-sync/README.md b/aliyun-sync/aliyun-aps-sync/README.md index aa96930..66dd376 100644 --- a/aliyun-sync/aliyun-aps-sync/README.md +++ b/aliyun-sync/aliyun-aps-sync/README.md @@ -23,9 +23,10 @@ cp .env.example .env ## 配置 -`.env` 里最重要的两个时间范围: +`.env` 里最重要的时间范围: - `ALIYUN_APS_ORDER_START_DATE`: 订单查询的起始日期,会按月滚动抓取直到今天。 +- `ALIYUN_APS_INCREMENTAL_ORDER_START_DATE`: 订单/订单详情在增量模式下的指定起始日期;留空时仍默认只查昨天。 - `ALIYUN_APS_BILL_START_MONTH`: 账单查询的起始佣金月份,会按月滚动抓取直到当前月。 ## 使用 @@ -42,6 +43,18 @@ npm run login npm run sync ``` +如果需要在增量模式下让订单和订单详情从指定日期开始补抓,可以配置: + +```bash +ALIYUN_APS_INCREMENTAL_ORDER_START_DATE=2026-01-01 +``` + +或临时执行: + +```bash +npm run sync -- --incremental-order-start-date=2026-01-01 +``` + 3. 常驻定时同步 ```bash diff --git a/aliyun-sync/aliyun-aps-sync/src/config.js b/aliyun-sync/aliyun-aps-sync/src/config.js index 2d642e7..10b5177 100644 --- a/aliyun-sync/aliyun-aps-sync/src/config.js +++ b/aliyun-sync/aliyun-aps-sync/src/config.js @@ -23,6 +23,7 @@ export const config = { timezone: process.env.ALIYUN_APS_TIMEZONE || 'Asia/Shanghai', cron: process.env.ALIYUN_APS_CRON || '0 6 * * *', orderStartDate: process.env.ALIYUN_APS_ORDER_START_DATE || '2024-01-01', + incrementalOrderStartDate: process.env.ALIYUN_APS_INCREMENTAL_ORDER_START_DATE || '', billStartMonth: process.env.ALIYUN_APS_BILL_START_MONTH || '2024-01', smtp: { host: process.env.ALIYUN_APS_SMTP_HOST || 'smtp.qq.com', @@ -38,6 +39,7 @@ export const config = { resumeBillPage: Math.max(1, Number.parseInt(process.env.ALIYUN_APS_RESUME_BILL_PAGE || '1', 10) || 1), dbSyncScript: process.env.ALIYUN_APS_DB_SYNC_SCRIPT || '../aps-aliyun-sync/aps_db_sync.py', userDataDir: ensureDir(path.join(rootDir, '.browser')), + storageStateFile: path.join(rootDir, '.browser', 'storage-state.json'), dataDir: ensureDir(path.join(rootDir, 'data')), downloadDir: ensureDir(path.join(rootDir, 'downloads')), }; diff --git a/aliyun-sync/aliyun-aps-sync/src/index.js b/aliyun-sync/aliyun-aps-sync/src/index.js index f98193d..047a77c 100644 --- a/aliyun-sync/aliyun-aps-sync/src/index.js +++ b/aliyun-sync/aliyun-aps-sync/src/index.js @@ -1,6 +1,13 @@ -import { login, scheduleSync, syncAll, syncBillsOnly } from './sync.js'; +const args = process.argv.slice(2); +const command = args[0] || 'sync'; -const command = process.argv[2] || 'sync'; +for (const arg of args.slice(1)) { + if (arg.startsWith('--incremental-order-start-date=')) { + process.env.ALIYUN_APS_INCREMENTAL_ORDER_START_DATE = arg.split('=').slice(1).join('='); + } +} + +const { login, scheduleSync, syncAll, syncBillsOnly } = await import('./sync.js'); if (command === 'login') { await login(); diff --git a/aliyun-sync/aliyun-aps-sync/src/notify.js b/aliyun-sync/aliyun-aps-sync/src/notify.js index 79a1a37..b1ba050 100644 --- a/aliyun-sync/aliyun-aps-sync/src/notify.js +++ b/aliyun-sync/aliyun-aps-sync/src/notify.js @@ -4,7 +4,7 @@ import { config } from './config.js'; let lastSentAt = 0; const ONE_HOUR_MS = 60 * 60 * 1000; -export async function sendLoginAlert() { +export async function sendLoginAlert(loginUrl = '') { const now = Date.now(); if (now - lastSentAt < ONE_HOUR_MS) { console.log('[通知] 登录提醒1小时内已发送过,跳过重复发送'); @@ -33,7 +33,7 @@ export async function sendLoginAlert() { }, }); - const url = `${baseUrl}/#/signin`; + const url = loginUrl || `${baseUrl}/#/signin`; const timestamp = new Date().toISOString(); const subject = '[APS同步] 登录态已过期,请手动登录'; const text = [ diff --git a/aliyun-sync/aliyun-aps-sync/src/sync.js b/aliyun-sync/aliyun-aps-sync/src/sync.js index 5a82c29..30e4cfd 100644 --- a/aliyun-sync/aliyun-aps-sync/src/sync.js +++ b/aliyun-sync/aliyun-aps-sync/src/sync.js @@ -1,6 +1,8 @@ import { chromium } from 'playwright'; import cron from 'node-cron'; +import fs from 'node:fs'; import path from 'node:path'; +import readline from 'node:readline'; import { execSync } from 'node:child_process'; import { config, datasets } from './config.js'; import { sendLoginAlert } from './notify.js'; @@ -17,6 +19,106 @@ import { const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)); let _context = null; +let _runtimeController = null; + +const AUTH_PAGE_KEYWORDS = [ + 'RAM 用户登录', + '主账号登录', + '钉钉扫码登录', + '用户名', + '下一步', + '登录并使用 RAM', +]; + +async function closeContextIfNeeded() { + if (!_context) return; + await _context.close(); + _context = null; +} + +function getRuntimeController() { + if (_runtimeController) return _runtimeController; + + let paused = false; + let terminated = false; + let keypressBound = false; + + const onKeypress = (_str, key = {}) => { + if (key.name === 'f7') { + if (!paused) { + paused = true; + console.log('[控制] 已暂停(F7)。按 F8 继续,按 F9 终止。'); + } + return; + } + if (key.name === 'f8') { + if (paused) { + paused = false; + console.log('[控制] 已继续(F8)。'); + } + return; + } + if (key.name === 'f9') { + terminated = true; + paused = false; + console.log('[控制] 已请求终止(F9),将在安全检查点停止。'); + } + }; + + const bind = () => { + if (keypressBound || !process.stdin.isTTY) return; + readline.emitKeypressEvents(process.stdin); + if (typeof process.stdin.setRawMode === 'function') { + process.stdin.setRawMode(true); + } + process.stdin.resume(); + process.stdin.on('keypress', onKeypress); + keypressBound = true; + console.log('[控制] 热键已启用:F7 暂停 / F8 继续 / F9 终止'); + }; + + const unbind = () => { + if (!keypressBound) return; + process.stdin.off('keypress', onKeypress); + if (process.stdin.isTTY && typeof process.stdin.setRawMode === 'function') { + process.stdin.setRawMode(false); + } + keypressBound = false; + }; + + const waitIfPaused = async (label = '任务') => { + if (terminated) { + throw new Error(`[控制] 已终止:${label}`); + } + while (paused) { + await sleep(300); + if (terminated) { + throw new Error(`[控制] 已终止:${label}`); + } + } + }; + + const throwIfTerminated = (label = '任务') => { + if (terminated) { + throw new Error(`[控制] 已终止:${label}`); + } + }; + + _runtimeController = { + bind, + unbind, + waitIfPaused, + throwIfTerminated, + }; + + return _runtimeController; +} + +async function runtimeCheckpoint(label) { + const controller = getRuntimeController(); + controller.throwIfTerminated(label); + await controller.waitIfPaused(label); +} async function getContext() { if (_context) return _context; @@ -26,24 +128,103 @@ async function getContext() { acceptDownloads: true, downloadsPath: config.downloadDir, }); + await restoreStorageState(_context); return _context; } +async function restoreStorageState(context) { + if (!fs.existsSync(config.storageStateFile)) { + return; + } + + try { + const state = JSON.parse(fs.readFileSync(config.storageStateFile, 'utf-8')); + if (Array.isArray(state.cookies) && state.cookies.length > 0) { + await context.addCookies(state.cookies); + console.log(`[storageState] 已恢复 ${state.cookies.length} 个 cookie`); + } + } catch (error) { + console.warn(`[storageState] 恢复失败,继续使用 .browser profile: ${error.message}`); + } +} + +async function saveStorageState(context) { + await context.storageState({ path: config.storageStateFile }); + console.log(`[storageState] 已保存登录态快照: ${config.storageStateFile}`); +} + +async function getPageBodyPreview(page) { + return page + .evaluate(() => document.body?.innerText?.substring(0, 500) || '(空)') + .catch(() => '(无法获取)'); +} + +function isAuthUrl(url) { + return /account\.aliyun\.com|signin\.aliyun\.com/.test(url) + || url.includes('login.htm') + || url.includes('/#/signin'); +} + +function hasAuthKeywords(text) { + return AUTH_PAGE_KEYWORDS.some((keyword) => text.includes(keyword)); +} + +async function detectAuthRedirect(page) { + const currentUrl = page.url(); + const bodyText = await getPageBodyPreview(page); + return { + currentUrl, + bodyText, + isAuthPage: isAuthUrl(currentUrl) || hasAuthKeywords(bodyText), + }; +} + +async function ensureDatasetAccessible(page, dataset, timeout = 120000, options = {}) { + await page.goto(dataset.url, { waitUntil: 'domcontentloaded' }); + await waitUntilReady(page, dataset.heading, timeout, options); +} + export async function login() { + const runtimeController = getRuntimeController(); + runtimeController.bind(); const context = await getContext(); + const cleanupAndExit = async (signal) => { + console.log(`[login] 收到 ${signal},正在保存登录态并关闭浏览器...`); + await closeContextIfNeeded(); + process.exit(130); + }; - const page = context.pages()[0] || (await context.newPage()); - await page.goto(datasets.customers.url, { waitUntil: 'domcontentloaded' }); - console.log('请在打开的浏览器里完成阿里云伙伴中心登录,然后回到终端按 Ctrl+C 结束。'); - await waitUntilReady(page, datasets.customers.heading, 10 * 60 * 1000); - console.log('登录态已写入 .browser 目录,后续可直接执行 npm run sync。'); + const onSigint = () => { + void cleanupAndExit('SIGINT'); + }; + const onSigterm = () => { + void cleanupAndExit('SIGTERM'); + }; - // 必须正常关闭 context,否则登录态不会持久化到磁盘 - await context.close(); - _context = null; + process.once('SIGINT', onSigint); + process.once('SIGTERM', onSigterm); + + try { + const page = context.pages()[0] || (await context.newPage()); + await page.goto(datasets.customers.url, { waitUntil: 'domcontentloaded' }); + console.log('请在打开的浏览器里完成阿里云伙伴中心登录。检测到进入“我的客户”和“账单查询”页面后,脚本会自动保存登录态并关闭浏览器。'); + await waitUntilReady(page, datasets.customers.heading, 10 * 60 * 1000, { allowInteractiveAuth: true }); + console.log('[login] 我的客户页验证通过,继续验证账单页登录态...'); + await ensureDatasetAccessible(page, datasets.bills, 60 * 1000, { allowInteractiveAuth: true }); + await sleep(1000); + await saveStorageState(context); + console.log('登录态已写入 .browser 目录,且已验证“我的客户”和“账单查询”页面可访问,后续可直接执行 npm run sync 或 npm run bills。'); + } finally { + process.off('SIGINT', onSigint); + process.off('SIGTERM', onSigterm); + await closeContextIfNeeded(); + runtimeController.unbind(); + } } export async function syncAll() { + const runtimeController = getRuntimeController(); + runtimeController.bind(); const context = await getContext(); try { @@ -67,11 +248,36 @@ export async function syncAll() { return summary; } finally { if (config.closeBrowser) { - await context.close(); - _context = null; + await closeContextIfNeeded(); } else { console.log('浏览器保持运行'); } + runtimeController.unbind(); + } +} + +export async function syncBillsOnly() { + const runtimeController = getRuntimeController(); + runtimeController.bind(); + const context = await getContext(); + + try { + const summary = { startedAt: new Date().toISOString(), datasets: {} }; + const page = context.pages()[0] || (await context.newPage()); + + summary.datasets.bills = await syncBills(page); + summary.finishedAt = new Date().toISOString(); + + const stamp = nowStamp(); + saveRunSummary(stamp, summary); + return summary; + } finally { + if (config.closeBrowser) { + await closeContextIfNeeded(); + } else { + console.log('浏览器保持运行'); + } + runtimeController.unbind(); } } @@ -106,6 +312,7 @@ export async function scheduleSync() { } async function syncCustomers(page) { + await runtimeCheckpoint('同步客户'); const dataset = datasets.customers; await page.goto(dataset.url, { waitUntil: 'domcontentloaded' }); await waitUntilReady(page, dataset.heading); @@ -115,6 +322,7 @@ async function syncCustomers(page) { } async function syncCustomerDetails(page) { + await runtimeCheckpoint('同步客户详情'); const dataset = datasets.customerDetails; const customersState = loadCurrentState('customers'); const allAccountIds = collectValidAccountIds(customersState.records || []); @@ -130,6 +338,7 @@ async function syncCustomerDetails(page) { 'https://aps.aliyun.com/?spm=5176.12818093.top-nav.ditem-fx.785716d0LKDpKT#/detail/my_customer/~/customer/'; for (let index = 0; index < allAccountIds.length; index += 1) { + await runtimeCheckpoint(`客户详情 ${index + 1}/${allAccountIds.length}`); const accountId = allAccountIds[index]; console.log(`[客户详情] ${index + 1}/${allAccountIds.length} accountId=${accountId}`); @@ -158,23 +367,20 @@ async function syncCustomerDetails(page) { } async function syncOrders(page) { + await runtimeCheckpoint('同步订单'); const dataset = datasets.orders; let windows; if (config.fullSync) { windows = buildMonthlyDateWindows(config.orderStartDate); } else { - // 增量模式:只查前一天 - const yesterday = new Date(); - yesterday.setDate(yesterday.getDate() - 1); - const dateStr = formatDate(yesterday); - windows = [{ windowStart: dateStr, windowEnd: dateStr, start: dateStr, end: dateStr }]; - console.log(`[增量模式] 订单仅查询: ${dateStr}`); + windows = buildIncrementalOrderWindows(); } const allRecords = []; for (const window of windows) { + await runtimeCheckpoint(`订单窗口 ${window.start} ~ ${window.end}`); await page.goto(dataset.url, { waitUntil: 'domcontentloaded' }); await waitUntilReady(page, dataset.heading); await setDateRange(page, window.start, window.end); @@ -187,7 +393,40 @@ async function syncOrders(page) { return persistDataset(dataset, dedupeByHash(allRecords), {}); } +function buildIncrementalOrderWindows() { + const configuredStartDate = normalizeConfiguredDate(config.incrementalOrderStartDate); + if (configuredStartDate) { + const windows = buildMonthlyDateWindows(configuredStartDate); + console.log(`[增量模式] 订单从指定日期开始查询: ${configuredStartDate}`); + return windows; + } + + const yesterday = new Date(); + yesterday.setDate(yesterday.getDate() - 1); + const dateStr = formatDate(yesterday); + console.log(`[增量模式] 订单仅查询: ${dateStr}`); + return [{ windowStart: dateStr, windowEnd: dateStr, start: dateStr, end: dateStr }]; +} + +function normalizeConfiguredDate(value) { + const normalized = String(value || '').trim(); + if (!normalized) { + return ''; + } + if (!/^\d{4}-\d{2}-\d{2}$/.test(normalized)) { + throw new Error(`ALIYUN_APS_INCREMENTAL_ORDER_START_DATE 格式无效: ${normalized},期望 YYYY-MM-DD`); + } + + const parsed = new Date(`${normalized}T00:00:00+08:00`); + if (Number.isNaN(parsed.getTime())) { + throw new Error(`ALIYUN_APS_INCREMENTAL_ORDER_START_DATE 不是有效日期: ${normalized}`); + } + + return normalized; +} + async function syncBills(page) { + await runtimeCheckpoint('同步账单'); const dataset = datasets.bills; let months; let latestConsumptionDate = null; @@ -205,6 +444,7 @@ async function syncBills(page) { const allRecords = []; for (const month of months) { + await runtimeCheckpoint(`账单月份 ${month}`); await page.goto(dataset.url, { waitUntil: 'domcontentloaded' }); await waitUntilReady(page, dataset.heading); await setMonthValue(page, month); @@ -247,6 +487,7 @@ function isAfterLatestConsumptionDate(record, latestConsumptionDate) { } async function syncOrderDetails(page, cachedOrderIds) { + await runtimeCheckpoint('同步订单详情'); const dataset = datasets.orderDetails; // 使用传入的 orderId 列表(在 syncOrders 覆盖 orders.json 之前缓存的) @@ -262,6 +503,7 @@ async function syncOrderDetails(page, cachedOrderIds) { const detailBaseUrl = 'https://aps.aliyun.com/?spm=5176.12818093.top-nav.ditem-fx.785716d0LKDpKT#/detail/order/~/costCenter/order/detail/'; for (let index = 0; index < allOrderIds.length; index += 1) { + await runtimeCheckpoint(`订单详情 ${index + 1}/${allOrderIds.length}`); const orderId = allOrderIds[index]; console.log(`[订单详情] ${index + 1}/${allOrderIds.length} orderId=${orderId}`); @@ -304,11 +546,30 @@ function persistDataset(dataset, records, context) { }; } -async function waitUntilReady(page, heading, timeout = 120000) { +async function waitUntilReady(page, heading, timeout = 120000, options = {}) { + await runtimeCheckpoint(`等待页面 ${heading}`); + const { allowInteractiveAuth = false } = options; await page.waitForLoadState('domcontentloaded'); console.log(`[waitUntilReady] 当前URL: ${page.url()}`); console.log(`[waitUntilReady] 等待页面出现: "${heading}"`); + const initialState = await detectAuthRedirect(page); + if (initialState.isAuthPage) { + console.error(`[waitUntilReady] 检测到登录页/鉴权页: ${initialState.currentUrl}`); + console.error(`[waitUntilReady] 页面内容前500字: ${initialState.bodyText}`); + if (!allowInteractiveAuth && isAuthUrl(initialState.currentUrl)) { + try { + await sendLoginAlert(initialState.currentUrl); + } catch (notifyErr) { + console.error('[通知] 发送登录提醒失败:', notifyErr.message); + } + } + if (!allowInteractiveAuth) { + throw new Error(`当前页面仍处于登录/鉴权页,无法进入「${heading}」。请重新执行 npm run login,并确认该账号对该页面有访问权限。`); + } + console.log(`[waitUntilReady] 允许交互式登录,等待用户完成认证后进入「${heading}」...`); + } + try { await page.waitForFunction( (text) => document.body && document.body.innerText.includes(text), @@ -317,22 +578,26 @@ async function waitUntilReady(page, heading, timeout = 120000) { ); } catch (err) { // 超时时打印诊断信息 - const currentUrl = page.url(); - const bodyText = await page.evaluate(() => document.body?.innerText?.substring(0, 500) || '(空)').catch(() => '(无法获取)'); + const { currentUrl, bodyText, isAuthPage } = await detectAuthRedirect(page); console.error(`[waitUntilReady] 超时!当前URL: ${currentUrl}`); console.error(`[waitUntilReady] 页面内容前500字: ${bodyText}`); - if (currentUrl.includes('signin')) { + if (isAuthPage && !allowInteractiveAuth) { try { - await sendLoginAlert(); + await sendLoginAlert(currentUrl); } catch (notifyErr) { console.error('[通知] 发送登录提醒失败:', notifyErr.message); } + throw new Error(`当前页面停留在登录/鉴权页,未能进入「${heading}」。请重新执行 npm run login,并确认该账号对该页面有访问权限。`); + } + if (isAuthPage && allowInteractiveAuth) { + throw new Error(`交互式登录超时,仍未进入「${heading}」。请确认已在浏览器中完成 RAM/阿里云登录,并且当前账号有访问该页面的权限。`); } throw err; } - if ((await page.locator('text=登录').count()) > 0 && page.url().includes('login')) { - throw new Error('当前未登录,请先执行 npm run login'); + const finalState = await detectAuthRedirect(page); + if (finalState.isAuthPage && !allowInteractiveAuth) { + throw new Error(`当前页面仍处于登录/鉴权页,未成功进入「${heading}」。请重新执行 npm run login,并确认该账号对该页面有访问权限。`); } await sleep(1500); } @@ -342,6 +607,7 @@ async function scrapePagedTable(page, dataset, context) { const visited = new Set(); while (true) { + await runtimeCheckpoint(`抓取 ${dataset.name} 分页`); await waitForTableRows(page); const pageData = await extractTable(page); const pageNum = await currentPageNumber(page); @@ -403,6 +669,7 @@ async function extractTable(page) { } async function waitForTableRows(page) { + await runtimeCheckpoint('等待表格加载'); await page.waitForFunction(() => document.querySelectorAll('table tbody tr').length > 0, null, { timeout: 120000 }); await sleep(800); } @@ -414,6 +681,7 @@ async function currentPageNumber(page) { } async function gotoNextPage(page) { + await runtimeCheckpoint('翻页'); const before = await currentPageNumber(page); // 用 Playwright locator 定位"下一页"按钮 @@ -439,6 +707,7 @@ async function gotoNextPage(page) { } async function trySetPageSize(page, pageSize) { + await runtimeCheckpoint(`设置每页 ${pageSize}`); const input = page.locator('input[aria-label="请选择每页显示几条"]').first(); if ((await input.count()) === 0) return; await input.click().catch(() => null); @@ -453,6 +722,7 @@ async function trySetPageSize(page, pageSize) { } async function setDateRange(page, start, end) { + await runtimeCheckpoint(`设置订单日期 ${start} ~ ${end}`); console.log(`[订单日期] 设置: ${start} ~ ${end}`); await _fillDateRange(page, start, end); @@ -474,6 +744,7 @@ async function setDateRange(page, start, end) { } async function _fillDateRange(page, start, end, startFirst = false) { + await runtimeCheckpoint('填写订单日期'); const trigger = page.locator('input[placeholder="结束日期"]'); await trigger.click(); await sleep(1000); @@ -520,6 +791,7 @@ async function _fillDateRange(page, start, end, startFirst = false) { } async function setMonthValue(page, month) { + await runtimeCheckpoint(`设置账单月份 ${month}`); // 先尝试按 inputValue 匹配 YYYY-MM 格式 const inputs = page.locator('input'); const total = await inputs.count(); @@ -568,6 +840,7 @@ async function setMonthValue(page, month) { * 即使面板弹出,快速键入 + Tab 也能在面板滚动前完成提交并关闭。 */ async function typeIntoDateInput(locator, value, page) { + await runtimeCheckpoint(`填写日期输入 ${value}`); // 移除 readonly await locator.evaluate((node) => node.removeAttribute('readonly')); @@ -599,6 +872,7 @@ async function typeIntoDateInput(locator, value, page) { } async function clickQuery(page) { + await runtimeCheckpoint('点击查询'); const button = page.locator('button:has-text("查询")').first(); await button.click(); await sleep(1800); diff --git a/aliyun-sync/aps-aliyun-sync/aps_db_sync.py b/aliyun-sync/aps-aliyun-sync/aps_db_sync.py index 3588897..e15b2e8 100644 --- a/aliyun-sync/aps-aliyun-sync/aps_db_sync.py +++ b/aliyun-sync/aps-aliyun-sync/aps_db_sync.py @@ -35,6 +35,7 @@ from pymysql.cursors import DictCursor JsonDict = dict[str, object] JsonList = list[JsonDict] StatsDict = dict[str, int] +SyncTarget = str class DbConfig(TypedDict): @@ -48,7 +49,7 @@ class DbConfig(TypedDict): # --------------------------------------------------------------------------- # Config # --------------------------------------------------------------------------- -DB_CONFIG = { +DB_CONFIG: DbConfig = { "host": "172.27.137.236", "port": 3306, "user": "ray", @@ -62,6 +63,19 @@ LOG_FORMAT = "%(asctime)s [%(levelname)s] %(message)s" logging.basicConfig(level=logging.INFO, format=LOG_FORMAT) logger = logging.getLogger("aps_sync") +SYNC_TARGET_ALL = "all" +SYNC_TARGET_CUSTOMER = "customer" +SYNC_TARGET_ORDER = "order" +SYNC_TARGET_ORDERDETAILS = "orderdetails" +SYNC_TARGET_BILLS = "bills" +VALID_SYNC_TARGETS = { + SYNC_TARGET_ALL, + SYNC_TARGET_CUSTOMER, + SYNC_TARGET_ORDER, + SYNC_TARGET_ORDERDETAILS, + SYNC_TARGET_BILLS, +} + # --------------------------------------------------------------------------- # Schema DDL # --------------------------------------------------------------------------- @@ -382,6 +396,18 @@ def is_valid_order_id(order_id: str | None) -> bool: return bool(order_id and order_id.isdigit()) +def normalize_sync_target(sync_target: str | None) -> SyncTarget: + if sync_target is None: + return SYNC_TARGET_ALL + normalized = sync_target.strip().lower() + if not normalized: + return SYNC_TARGET_ALL + if normalized not in VALID_SYNC_TARGETS: + valid_targets = ", ".join(sorted(VALID_SYNC_TARGETS)) + raise ValueError(f"Invalid sync target: {sync_target}. Expected one of: {valid_targets}") + return normalized + + # --------------------------------------------------------------------------- # Sync logic # --------------------------------------------------------------------------- @@ -465,7 +491,7 @@ class APSSyncer: return [cast(JsonDict, record) for record in data_list if isinstance(record, dict)] return [] - def resolve_data_files(self, data_dir: str) -> tuple[Path, Path, Path, Path, Path]: + def resolve_data_files(self, data_dir: str, sync_target: SyncTarget = SYNC_TARGET_ALL) -> tuple[Path, Path, Path, Path, Path]: root = Path(data_dir) if not root.exists() or not root.is_dir(): raise FileNotFoundError(f"Data directory not found: {root}") @@ -475,11 +501,33 @@ class APSSyncer: order_details_file = root / "orderDetails.json" bills_file = root / "bills.json" customer_details_file = root / "customerDetails.json" - for fp in (customers_file, orders_file, order_details_file, bills_file): + + required_files_by_target = { + SYNC_TARGET_ALL: (customers_file, orders_file, order_details_file, bills_file), + SYNC_TARGET_CUSTOMER: (customers_file,), + SYNC_TARGET_ORDER: (orders_file,), + SYNC_TARGET_ORDERDETAILS: (order_details_file,), + SYNC_TARGET_BILLS: (bills_file,), + } + for fp in required_files_by_target[sync_target]: if not fp.exists(): raise FileNotFoundError(f"Required JSON file not found: {fp}") return customers_file, orders_file, order_details_file, bills_file, customer_details_file + def fetch_login_to_account_map(self) -> dict[str, str]: + conn = self._require_conn() + with conn.cursor() as cur: + _ = cur.execute("SELECT login_name, account_id FROM aps_customer") + rows = cur.fetchall() + + login_to_account: dict[str, str] = {} + for row in rows: + login_name = safe_str(row.get("login_name"), 128) + account_id = safe_str(row.get("account_id"), 32) + if login_name and account_id: + login_to_account[login_name] = account_id + return login_to_account + @staticmethod def normalize_customer_record(raw: JsonDict) -> JsonDict | None: account_id = safe_str(raw.get("accountId"), 32) @@ -906,20 +954,38 @@ class APSSyncer: self.stats["bills"] += 1 # ---- Main sync entry ---- - def sync_from_json(self, data_dir: str, incremental: bool = False) -> StatsDict: + def sync_from_json(self, data_dir: str, incremental: bool = False, sync_target: str = SYNC_TARGET_ALL) -> StatsDict: start = datetime.now() - customers_file, orders_file, order_details_file, bills_file, customer_details_file = self.resolve_data_files(data_dir) - logger.info("Loading source files from %s%s", data_dir, " (增量模式)" if incremental else "") + normalized_sync_target = normalize_sync_target(sync_target) + customers_file, orders_file, order_details_file, bills_file, customer_details_file = self.resolve_data_files( + data_dir, + normalized_sync_target, + ) + logger.info( + "Loading source files from %s%s%s", + data_dir, + " (增量模式)" if incremental else "", + "" if normalized_sync_target == SYNC_TARGET_ALL else f" (sync_target={normalized_sync_target})", + ) + + raw_customers: JsonList = [] + raw_orders: JsonList = [] + raw_order_details: JsonList = [] + raw_bills: JsonList = [] + if normalized_sync_target in {SYNC_TARGET_ALL, SYNC_TARGET_CUSTOMER}: + raw_customers = self.load_json_records(customers_file) + if normalized_sync_target in {SYNC_TARGET_ALL, SYNC_TARGET_ORDER}: + raw_orders = self.load_json_records(orders_file) + if normalized_sync_target in {SYNC_TARGET_ALL, SYNC_TARGET_ORDERDETAILS}: + raw_order_details = self.load_json_records(order_details_file) + if normalized_sync_target in {SYNC_TARGET_ALL, SYNC_TARGET_BILLS}: + raw_bills = self.load_json_records(bills_file) - raw_customers = self.load_json_records(customers_file) - raw_orders = self.load_json_records(orders_file) - raw_order_details = self.load_json_records(order_details_file) - raw_bills = self.load_json_records(bills_file) raw_customer_details: JsonList = [] try: - if customer_details_file.exists(): + if normalized_sync_target in {SYNC_TARGET_ALL, SYNC_TARGET_CUSTOMER} and customer_details_file.exists(): raw_customer_details = self.load_json_records(customer_details_file) - else: + elif normalized_sync_target in {SYNC_TARGET_ALL, SYNC_TARGET_CUSTOMER}: logger.info("Optional file missing, skip customer details: %s", customer_details_file) except Exception as e: logger.warning("Failed to load optional customer details file %s: %s", customer_details_file, e) @@ -941,27 +1007,33 @@ class APSSyncer: customers: JsonList = [] skipped_customers = 0 - for raw in raw_customers: - c = self.normalize_customer_record(raw) - if not c: - skipped_customers += 1 - continue - customers.append(c) - self.upsert_customer(c) - self.insert_snapshot(c, billing_month, captured_at) + if normalized_sync_target in {SYNC_TARGET_ALL, SYNC_TARGET_CUSTOMER}: + for raw in raw_customers: + c = self.normalize_customer_record(raw) + if not c: + skipped_customers += 1 + continue + customers.append(c) + self.upsert_customer(c) + self.insert_snapshot(c, billing_month, captured_at) if skipped_customers: logger.info("Skipped %d invalid customer rows", skipped_customers) login_to_account = build_login_to_account_map(customers) + if not login_to_account and normalized_sync_target in {SYNC_TARGET_ORDER, SYNC_TARGET_BILLS}: + login_to_account = self.fetch_login_to_account_map() logger.info("Resolved %d customer login_name -> account_id mappings", len(login_to_account)) if raw_customer_details: self.update_customer_details(raw_customer_details, billing_month) - self.upsert_orders(raw_orders, login_to_account) - self.upsert_order_details(raw_order_details) - self.sync_bills(raw_bills, login_to_account, incremental=incremental) + if normalized_sync_target in {SYNC_TARGET_ALL, SYNC_TARGET_ORDER}: + self.upsert_orders(raw_orders, login_to_account) + if normalized_sync_target in {SYNC_TARGET_ALL, SYNC_TARGET_ORDERDETAILS}: + self.upsert_order_details(raw_order_details) + if normalized_sync_target in {SYNC_TARGET_ALL, SYNC_TARGET_BILLS}: + self.sync_bills(raw_bills, login_to_account, incremental=incremental) # Log sync duration = (datetime.now() - start).total_seconds() @@ -1051,10 +1123,17 @@ def main() -> None: default=False, help="仅查询 aps_bill 中最新的 consumption_time 并输出", ) + _ = parser.add_argument( + "--sync-target", + choices=sorted(VALID_SYNC_TARGETS), + default=SYNC_TARGET_ALL, + help="选择同步对象: all/customer/order/orderdetails/bills", + ) args = parser.parse_args() data_dir = cast(str, args.dir) incremental = cast(bool, args.incremental) latest_bill_consumption_time = cast(bool, args.latest_bill_consumption_time) + sync_target = cast(str, args.sync_target) syncer = APSSyncer(db_config=DB_CONFIG) if latest_bill_consumption_time: @@ -1066,7 +1145,7 @@ def main() -> None: return finally: syncer.close() - _ = syncer.sync_from_json(data_dir, incremental=incremental) + _ = syncer.sync_from_json(data_dir, incremental=incremental, sync_target=sync_target) if __name__ == "__main__": diff --git a/aliyun-sync/aps-aliyun-sync/aps_scheduler.py b/aliyun-sync/aps-aliyun-sync/aps_scheduler.py index 8aafbaf..61f2b82 100644 --- a/aliyun-sync/aps-aliyun-sync/aps_scheduler.py +++ b/aliyun-sync/aps-aliyun-sync/aps_scheduler.py @@ -12,33 +12,46 @@ import sys import signal import argparse import logging +import importlib from pathlib import Path from datetime import datetime +from typing import Any, cast -from aps_db_sync import APSSyncer, DB_CONFIG, JSON_DIR +try: + from . import aps_db_sync as aps_db_sync_module +except ImportError: + aps_db_sync_module = importlib.import_module("aps_db_sync") + +APSSyncer = aps_db_sync_module.APSSyncer +db_config_default = cast(dict[str, str | int], aps_db_sync_module.DB_CONFIG) +json_dir = cast(Path, aps_db_sync_module.JSON_DIR) +default_sync_target = cast(str, aps_db_sync_module.SYNC_TARGET_ALL) +valid_sync_targets = cast(set[str], aps_db_sync_module.VALID_SYNC_TARGETS) LOG_FORMAT = "%(asctime)s [%(levelname)s] %(message)s" logging.basicConfig(level=logging.INFO, format=LOG_FORMAT) logger = logging.getLogger("aps_scheduler") -WATCH_INTERVAL_SECONDS = 30 -PROCESSED_MARKER_DIR = JSON_DIR / ".aps_sync_processed" +DEFAULT_WATCH_INTERVAL_SECONDS = 30 +watch_interval_seconds = DEFAULT_WATCH_INTERVAL_SECONDS +PROCESSED_MARKER_DIR = json_dir / ".aps_sync_processed" def _update_watch_interval(value: int): - global WATCH_INTERVAL_SECONDS - WATCH_INTERVAL_SECONDS = value + global watch_interval_seconds + watch_interval_seconds = value class SyncScheduler: - def __init__(self, db_config: dict = None): - self.db_config = db_config or DB_CONFIG - self.running = True + def __init__(self, db_config: dict[str, str | int] | None = None, sync_target: str = default_sync_target): + self.db_config: dict[str, str | int] = db_config or db_config_default + self.sync_target: str = sync_target + self.running: bool = True PROCESSED_MARKER_DIR.mkdir(exist_ok=True) - signal.signal(signal.SIGINT, self._shutdown) - signal.signal(signal.SIGTERM, self._shutdown) + _ = signal.signal(signal.SIGINT, self._shutdown) + _ = signal.signal(signal.SIGTERM, self._shutdown) - def _shutdown(self, signum, frame): + def _shutdown(self, signum: int, frame: object | None): logger.info("Shutdown signal received, stopping...") self.running = False @@ -55,18 +68,18 @@ class SyncScheduler: def _mark_processed(self, json_path: Path): marker = self._marker_path(json_path) - marker.write_text(datetime.now().isoformat()) + _ = marker.write_text(datetime.now().isoformat()) def find_unprocessed_files(self) -> list[Path]: pattern = "aps_aliyun_customers_with_bills_*.json" - all_files = sorted(JSON_DIR.glob(pattern), key=lambda p: p.stat().st_mtime) + all_files = sorted(json_dir.glob(pattern), key=lambda p: p.stat().st_mtime) return [f for f in all_files if not self._is_processed(f)] def sync_file(self, json_path: Path) -> bool: logger.info("Syncing: %s", json_path.name) try: - syncer = APSSyncer(db_config=self.db_config) - syncer.sync_from_json(str(json_path)) + syncer = APSSyncer(db_config=cast(Any, self.db_config)) + _ = syncer.sync_from_json(str(json_path), sync_target=self.sync_target) self._mark_processed(json_path) return True except Exception as e: @@ -86,38 +99,40 @@ class SyncScheduler: return count def run_watch(self): - logger.info("Watching %s for new JSON files (interval=%ds)", JSON_DIR, WATCH_INTERVAL_SECONDS) - self.run_once() + logger.info("Watching %s for new JSON files (interval=%ds)", json_dir, watch_interval_seconds) + _ = self.run_once() while self.running: - time.sleep(WATCH_INTERVAL_SECONDS) + time.sleep(watch_interval_seconds) unprocessed = self.find_unprocessed_files() for f in unprocessed: if not self.running: break - self.sync_file(f) + _ = self.sync_file(f) logger.info("Watcher stopped") def main(): parser = argparse.ArgumentParser(description="APS Sync Scheduler") - parser.add_argument("--mode", choices=["watch", "cron", "daemon"], default="watch", - help="watch=file watcher, cron=one-shot, daemon=watch with initial sync") - parser.add_argument("--host", default=DB_CONFIG["host"]) - parser.add_argument("--port", type=int, default=DB_CONFIG["port"]) - parser.add_argument("--user", default=DB_CONFIG["user"]) - parser.add_argument("--password", default=DB_CONFIG["password"]) - parser.add_argument("--database", default=DB_CONFIG["database"]) - parser.add_argument("--interval", type=int, default=WATCH_INTERVAL_SECONDS, - help="Watch interval in seconds") + _ = parser.add_argument("--mode", choices=["watch", "cron", "daemon"], default="watch", + help="watch=file watcher, cron=one-shot, daemon=watch with initial sync") + _ = parser.add_argument("--host", default=db_config_default["host"]) + _ = parser.add_argument("--port", type=int, default=db_config_default["port"]) + _ = parser.add_argument("--user", default=db_config_default["user"]) + _ = parser.add_argument("--password", default=db_config_default["password"]) + _ = parser.add_argument("--database", default=db_config_default["database"]) + _ = parser.add_argument("--interval", type=int, default=watch_interval_seconds, + help="Watch interval in seconds") + _ = parser.add_argument("--sync-target", choices=sorted(valid_sync_targets), default=default_sync_target, + help="选择同步对象: all/customer/order/orderdetails/bills") args = parser.parse_args() _update_watch_interval(args.interval) - config = { + config: dict[str, str | int] = { "host": args.host, "port": args.port, "user": args.user, "password": args.password, "database": args.database, "charset": "utf8mb4", } - scheduler = SyncScheduler(db_config=config) + scheduler = SyncScheduler(db_config=config, sync_target=args.sync_target) if args.mode == "cron": count = scheduler.run_once()