diff --git a/aliyun-sync/aliyun-aps-sync/.env.example b/aliyun-sync/aliyun-aps-sync/.env.example index 9821f39..370991f 100644 --- a/aliyun-sync/aliyun-aps-sync/.env.example +++ b/aliyun-sync/aliyun-aps-sync/.env.example @@ -6,6 +6,7 @@ ALIYUN_APS_BROWSER_EXECUTABLE_PATH= ALIYUN_APS_CDP_URL=http://127.0.0.1:9222 ALIYUN_APS_TIMEZONE=Asia/Shanghai ALIYUN_APS_CRON=0 6 * * * +ALIYUN_APS_HOT_CRON=*/5 * * * * ALIYUN_APS_SCHEDULE_MODE=incremental ALIYUN_APS_CLOSE_BROWSER=true ALIYUN_APS_FULL_SYNC=true @@ -15,6 +16,13 @@ ALIYUN_APS_BILL_START_MONTH=2024-01 ALIYUN_APS_ORDER_INCREMENTAL_OVERLAP_DAYS=2 ALIYUN_APS_BILL_INCREMENTAL_OVERLAP_DAYS=7 ALIYUN_APS_MESSAGE_INCREMENTAL_OVERLAP_DAYS=7 +ALIYUN_APS_HOT_MESSAGE_OVERLAP_MINUTES=15 +ALIYUN_APS_HOT_ORDER_STABLE_THRESHOLD=100 +ALIYUN_APS_HOT_ORDER_STABLE_PAGE_THRESHOLD=2 +ALIYUN_APS_HOT_ORDER_MAX_PAGES=20 +ALIYUN_APS_HOT_MESSAGE_MAX_PAGES=10 +ALIYUN_APS_HOT_ORDER_DETAIL_REFRESH_MINUTES=30 +ALIYUN_APS_HOT_FINAL_STATUSES=已完成,已关闭,已取消,已退款完成 ALIYUN_APS_DB_HOST= ALIYUN_APS_DB_PORT=3306 ALIYUN_APS_DB_USER= diff --git a/aliyun-sync/aliyun-aps-sync/.github/workflows/playwright.yml b/aliyun-sync/aliyun-aps-sync/.github/workflows/playwright.yml new file mode 100644 index 0000000..3eb1314 --- /dev/null +++ b/aliyun-sync/aliyun-aps-sync/.github/workflows/playwright.yml @@ -0,0 +1,27 @@ +name: Playwright Tests +on: + push: + branches: [ main, master ] + pull_request: + branches: [ main, master ] +jobs: + test: + timeout-minutes: 60 + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-node@v4 + with: + node-version: lts/* + - name: Install dependencies + run: npm ci + - name: Install Playwright Browsers + run: npx playwright install --with-deps + - name: Run Playwright tests + run: npx playwright test + - uses: actions/upload-artifact@v4 + if: ${{ !cancelled() }} + with: + name: playwright-report + path: playwright-report/ + retention-days: 30 diff --git a/aliyun-sync/aliyun-aps-sync/.gitignore b/aliyun-sync/aliyun-aps-sync/.gitignore index d1b9ba5..261731f 100644 --- a/aliyun-sync/aliyun-aps-sync/.gitignore +++ b/aliyun-sync/aliyun-aps-sync/.gitignore @@ -3,3 +3,11 @@ node_modules/ .browser/ data/ downloads/ +.sisyphus/ + +# Playwright +/test-results/ +/playwright-report/ +/blob-report/ +/playwright/.cache/ +/playwright/.auth/ diff --git a/aliyun-sync/aliyun-aps-sync/README.md b/aliyun-sync/aliyun-aps-sync/README.md index 45759c5..2ed5f4e 100644 --- a/aliyun-sync/aliyun-aps-sync/README.md +++ b/aliyun-sync/aliyun-aps-sync/README.md @@ -54,6 +54,28 @@ npm run incremental - 抓 orders / orderDetails / bills / messages - 以数据库 watermark + overlap 为增量窗口 +### Hot 模式 + +执行: + +```bash +npm run hot +``` + +行为: + +- 每次只抓**当天订单** +- 从订单第一页开始扫描 +- 订单列表按“连续稳定行 / 连续稳定页 / 最大页数”提前停止 +- 订单详情只抓:新增订单、列表有变化订单、缺失详情订单、非终态且到达兜底刷新时间的订单 +- 消息按数据库最新时间回退分钟 overlap 后抓取,并在旧页提前停止 + +适用场景: + +- 白天高频追当天订单 +- 订单量较大,不希望每 5 分钟重复扫完整个当天分页 +- 需要兼顾详情完整性和抓取效率 + ## 登录 ```bash @@ -117,6 +139,26 @@ npm run orders -- --resume npm run messages ``` +## 高频同步 + +手动执行一次高频同步: + +```bash +npm run hot +``` + +如果 PowerShell 禁止 `npm.ps1`,可以直接执行: + +```bash +node src/index.js hot +``` + +说明: + +- `hot` 只覆盖当天订单、订单详情、消息 +- 不抓 customer / customerDetails / bills +- 适合作为工作时间内的高频轮询任务 + ## 定时任务 ```bash @@ -131,6 +173,26 @@ ALIYUN_APS_SCHEDULE_MODE=incremental 执行日增量。 +如果要执行 5 分钟高频同步,可以设置: + +```env +ALIYUN_APS_SCHEDULE_MODE=hot +ALIYUN_APS_HOT_CRON=*/5 * * * * +``` + +然后执行: + +```bash +npm run schedule +``` + +说明: + +- `incremental`:按现有增量策略抓 orders / orderDetails / bills / messages +- `full`:按全量策略执行 +- `hot`:每轮只抓当天 orders / orderDetails / messages +- hot 模式内置任务锁;如果上一轮还没结束,会跳过下一轮,避免重叠执行 + ## 增量窗口 ### orders @@ -157,6 +219,41 @@ ALIYUN_APS_BILL_INCREMENTAL_OVERLAP_DAYS=7 ALIYUN_APS_MESSAGE_INCREMENTAL_OVERLAP_DAYS=7 ``` +## 高频模式配置 + +推荐配置: + +```env +ALIYUN_APS_SCHEDULE_MODE=hot +ALIYUN_APS_HOT_CRON=*/5 * * * * +ALIYUN_APS_HOT_MESSAGE_OVERLAP_MINUTES=15 +ALIYUN_APS_HOT_ORDER_STABLE_THRESHOLD=100 +ALIYUN_APS_HOT_ORDER_STABLE_PAGE_THRESHOLD=2 +ALIYUN_APS_HOT_ORDER_MAX_PAGES=20 +ALIYUN_APS_HOT_MESSAGE_MAX_PAGES=10 +ALIYUN_APS_HOT_ORDER_DETAIL_REFRESH_MINUTES=30 +ALIYUN_APS_HOT_FINAL_STATUSES=已完成,已关闭,已取消,已退款完成 +``` + +含义: + +- `ALIYUN_APS_HOT_CRON`:高频任务 cron,默认每 5 分钟一次 +- `ALIYUN_APS_HOT_MESSAGE_OVERLAP_MINUTES`:消息高频模式的回扫分钟数 +- `ALIYUN_APS_HOT_ORDER_STABLE_THRESHOLD`:订单扫描中连续多少条稳定记录后停止 +- `ALIYUN_APS_HOT_ORDER_STABLE_PAGE_THRESHOLD`:订单扫描中连续多少页无新增/变更后停止 +- `ALIYUN_APS_HOT_ORDER_MAX_PAGES`:订单每轮最多扫描页数,防止高峰期跑太久 +- `ALIYUN_APS_HOT_MESSAGE_MAX_PAGES`:消息每轮最多扫描页数 +- `ALIYUN_APS_HOT_ORDER_DETAIL_REFRESH_MINUTES`:非终态订单详情兜底刷新间隔 +- `ALIYUN_APS_HOT_FINAL_STATUSES`:视为终态的订单状态,终态订单在无变化时会尽量跳过详情抓取 + +默认策略: + +- 订单按最新到最旧扫描 +- 新订单或列表字段变化的订单会进入详情抓取 +- 已抓过且无变化的终态订单会直接跳过详情 +- 非终态订单会按兜底刷新时间周期性重抓详情 +- 消息使用 watermark + overlap,避免 5 分钟轮询时漏边界消息 + ## 数据库配置 `.env` 需要配置: diff --git a/aliyun-sync/aliyun-aps-sync/package-lock.json b/aliyun-sync/aliyun-aps-sync/package-lock.json index 9c831cf..80fdc1d 100644 --- a/aliyun-sync/aliyun-aps-sync/package-lock.json +++ b/aliyun-sync/aliyun-aps-sync/package-lock.json @@ -13,14 +13,31 @@ "node-cron": "^4.2.1", "nodemailer": "^6.10.1", "playwright": "^1.58.2" + }, + "devDependencies": { + "@playwright/test": "^1.59.1", + "@types/node": "^25.6.2" + } + }, + "node_modules/@playwright/test": { + "version": "1.59.1", + "resolved": "https://registry.npmjs.org/@playwright/test/-/test-1.59.1.tgz", + "integrity": "sha512-PG6q63nQg5c9rIi4/Z5lR5IVF7yU5MqmKaPOe0HSc0O2cX1fPi96sUQu5j7eo4gKCkB2AnNGoWt7y4/Xx3Kcqg==", + "dev": true, + "dependencies": { + "playwright": "1.59.1" + }, + "bin": { + "playwright": "cli.js" + }, + "engines": { + "node": ">=18" } }, "node_modules/@types/node": { - "version": "25.6.0", - "resolved": "https://registry.npmmirror.com/@types/node/-/node-25.6.0.tgz", - "integrity": "sha512-+qIYRKdNYJwY3vRCZMdJbPLJAtGjQBudzZzdzwQYkEPQd+PJGixUL5QfvCLDaULoLv+RhT3LDkwEfKaAkgSmNQ==", - "license": "MIT", - "peer": true, + "version": "25.6.2", + "resolved": "https://registry.npmjs.org/@types/node/-/node-25.6.2.tgz", + "integrity": "sha512-sokuT28dxf9JT5Kady1fsXOvI4HVpjZa95NKT5y9PNTIrs2AsobR4GFAA90ZG8M+nxVRLysCXsVj6eGC7Vbrlw==", "dependencies": { "undici-types": "~7.19.0" } @@ -174,12 +191,11 @@ } }, "node_modules/playwright": { - "version": "1.58.2", - "resolved": "https://registry.npmmirror.com/playwright/-/playwright-1.58.2.tgz", - "integrity": "sha512-vA30H8Nvkq/cPBnNw4Q8TWz1EJyqgpuinBcHET0YVJVFldr8JDNiU9LaWAE1KqSkRYazuaBhTpB5ZzShOezQ6A==", - "license": "Apache-2.0", + "version": "1.59.1", + "resolved": "https://registry.npmjs.org/playwright/-/playwright-1.59.1.tgz", + "integrity": "sha512-C8oWjPR3F81yljW9o5OxcWzfh6avkVwDD2VYdwIGqTkl+OGFISgypqzfu7dOe4QNLL2aqcWBmI3PMtLIK233lw==", "dependencies": { - "playwright-core": "1.58.2" + "playwright-core": "1.59.1" }, "bin": { "playwright": "cli.js" @@ -192,10 +208,9 @@ } }, "node_modules/playwright-core": { - "version": "1.58.2", - "resolved": "https://registry.npmmirror.com/playwright-core/-/playwright-core-1.58.2.tgz", - "integrity": "sha512-yZkEtftgwS8CsfYo7nm0KE8jsvm6i/PTgVtB8DL726wNf6H2IMsDuxCpJj59KDaxCtSnrWan2AeDqM7JBaultg==", - "license": "Apache-2.0", + "version": "1.59.1", + "resolved": "https://registry.npmjs.org/playwright-core/-/playwright-core-1.59.1.tgz", + "integrity": "sha512-HBV/RJg81z5BiiZ9yPzIiClYV/QMsDCKUyogwH9p3MCP6IYjUFu/MActgYAvK0oWyV9NlwM3GLBjADyWgydVyg==", "bin": { "playwright-core": "cli.js" }, @@ -228,8 +243,7 @@ "version": "7.19.2", "resolved": "https://registry.npmmirror.com/undici-types/-/undici-types-7.19.2.tgz", "integrity": "sha512-qYVnV5OEm2AW8cJMCpdV20CDyaN3g0AjDlOGf1OW4iaDEx8MwdtChUp4zu4H0VP3nDRF/8RKWH+IPp9uW0YGZg==", - "license": "MIT", - "peer": true + "license": "MIT" } } } diff --git a/aliyun-sync/aliyun-aps-sync/package.json b/aliyun-sync/aliyun-aps-sync/package.json index 83e818d..35a0139 100644 --- a/aliyun-sync/aliyun-aps-sync/package.json +++ b/aliyun-sync/aliyun-aps-sync/package.json @@ -7,6 +7,7 @@ "login": "node src/index.js login", "sync": "node src/index.js sync", "incremental": "node src/index.js incremental", + "hot": "node src/index.js hot", "bills": "node src/index.js bills", "orders": "node src/index.js orders", "messages": "node src/index.js messages", @@ -15,8 +16,12 @@ "dependencies": { "dotenv": "^16.6.1", "mysql2": "^3.15.2", - "nodemailer": "^6.10.1", "node-cron": "^4.2.1", + "nodemailer": "^6.10.1", "playwright": "^1.58.2" + }, + "devDependencies": { + "@playwright/test": "^1.59.1", + "@types/node": "^25.6.2" } } diff --git a/aliyun-sync/aliyun-aps-sync/playwright.config.ts b/aliyun-sync/aliyun-aps-sync/playwright.config.ts new file mode 100644 index 0000000..6dfc0d9 --- /dev/null +++ b/aliyun-sync/aliyun-aps-sync/playwright.config.ts @@ -0,0 +1,79 @@ +import { defineConfig, devices } from '@playwright/test'; + +/** + * Read environment variables from file. + * https://github.com/motdotla/dotenv + */ +// import dotenv from 'dotenv'; +// import path from 'path'; +// dotenv.config({ path: path.resolve(__dirname, '.env') }); + +/** + * See https://playwright.dev/docs/test-configuration. + */ +export default defineConfig({ + testDir: './tests', + /* Run tests in files in parallel */ + fullyParallel: true, + /* Fail the build on CI if you accidentally left test.only in the source code. */ + forbidOnly: !!process.env.CI, + /* Retry on CI only */ + retries: process.env.CI ? 2 : 0, + /* Opt out of parallel tests on CI. */ + workers: process.env.CI ? 1 : undefined, + /* Reporter to use. See https://playwright.dev/docs/test-reporters */ + reporter: 'html', + /* Shared settings for all the projects below. See https://playwright.dev/docs/api/class-testoptions. */ + use: { + /* Base URL to use in actions like `await page.goto('')`. */ + // baseURL: 'http://localhost:3000', + + /* Collect trace when retrying the failed test. See https://playwright.dev/docs/trace-viewer */ + trace: 'on-first-retry', + }, + + /* Configure projects for major browsers */ + projects: [ + { + name: 'chromium', + use: { ...devices['Desktop Chrome'] }, + }, + + { + name: 'firefox', + use: { ...devices['Desktop Firefox'] }, + }, + + { + name: 'webkit', + use: { ...devices['Desktop Safari'] }, + }, + + /* Test against mobile viewports. */ + // { + // name: 'Mobile Chrome', + // use: { ...devices['Pixel 5'] }, + // }, + // { + // name: 'Mobile Safari', + // use: { ...devices['iPhone 12'] }, + // }, + + /* Test against branded browsers. */ + // { + // name: 'Microsoft Edge', + // use: { ...devices['Desktop Edge'], channel: 'msedge' }, + // }, + // { + // name: 'Google Chrome', + // use: { ...devices['Desktop Chrome'], channel: 'chrome' }, + // }, + ], + + /* Run your local dev server before starting the tests */ + // webServer: { + // command: 'npm run start', + // url: 'http://localhost:3000', + // reuseExistingServer: !process.env.CI, + // }, +}); diff --git a/aliyun-sync/aliyun-aps-sync/src/config.js b/aliyun-sync/aliyun-aps-sync/src/config.js index 674cfe3..fcdfdf9 100644 --- a/aliyun-sync/aliyun-aps-sync/src/config.js +++ b/aliyun-sync/aliyun-aps-sync/src/config.js @@ -11,6 +11,24 @@ const toBool = (value, fallback) => { return ['1', 'true', 'yes', 'y', 'on'].includes(String(value).trim().toLowerCase()); }; +const toInt = (value, fallback, min = Number.MIN_SAFE_INTEGER) => { + const parsed = Number.parseInt(String(value ?? ''), 10); + if (Number.isNaN(parsed)) { + return fallback; + } + return Math.max(min, parsed); +}; + +const toList = (value, fallback = []) => { + if (value == null || String(value).trim() === '') { + return fallback; + } + return String(value) + .split(',') + .map((item) => item.trim()) + .filter(Boolean); +}; + const ensureDir = (dirPath) => { fs.mkdirSync(dirPath, { recursive: true }); return dirPath; @@ -26,12 +44,20 @@ export const config = { cdpUrl: (process.env.ALIYUN_APS_CDP_URL || 'http://127.0.0.1:9222').trim(), timezone: process.env.ALIYUN_APS_TIMEZONE || 'Asia/Shanghai', cron: process.env.ALIYUN_APS_CRON || '0 6 * * *', + hotCron: process.env.ALIYUN_APS_HOT_CRON || '*/5 * * * *', orderStartDate: process.env.ALIYUN_APS_ORDER_START_DATE || '2024-01-01', incrementalOrderStartDate: process.env.ALIYUN_APS_INCREMENTAL_ORDER_START_DATE || '', billStartMonth: process.env.ALIYUN_APS_BILL_START_MONTH || '2024-01', - orderIncrementalOverlapDays: Math.max(0, Number.parseInt(process.env.ALIYUN_APS_ORDER_INCREMENTAL_OVERLAP_DAYS || '2', 10) || 2), - billIncrementalOverlapDays: Math.max(0, Number.parseInt(process.env.ALIYUN_APS_BILL_INCREMENTAL_OVERLAP_DAYS || '7', 10) || 7), - messageIncrementalOverlapDays: Math.max(0, Number.parseInt(process.env.ALIYUN_APS_MESSAGE_INCREMENTAL_OVERLAP_DAYS || '7', 10) || 7), + orderIncrementalOverlapDays: toInt(process.env.ALIYUN_APS_ORDER_INCREMENTAL_OVERLAP_DAYS || '2', 2, 0), + billIncrementalOverlapDays: toInt(process.env.ALIYUN_APS_BILL_INCREMENTAL_OVERLAP_DAYS || '7', 7, 0), + messageIncrementalOverlapDays: toInt(process.env.ALIYUN_APS_MESSAGE_INCREMENTAL_OVERLAP_DAYS || '7', 7, 0), + hotMessageOverlapMinutes: toInt(process.env.ALIYUN_APS_HOT_MESSAGE_OVERLAP_MINUTES || '15', 15, 0), + hotOrderStableThreshold: toInt(process.env.ALIYUN_APS_HOT_ORDER_STABLE_THRESHOLD || '100', 100, 1), + hotOrderStablePageThreshold: toInt(process.env.ALIYUN_APS_HOT_ORDER_STABLE_PAGE_THRESHOLD || '2', 2, 1), + hotOrderMaxPagesPerRun: toInt(process.env.ALIYUN_APS_HOT_ORDER_MAX_PAGES || '20', 20, 1), + hotMessageMaxPagesPerRun: toInt(process.env.ALIYUN_APS_HOT_MESSAGE_MAX_PAGES || '10', 10, 1), + hotOrderDetailRefreshMinutes: toInt(process.env.ALIYUN_APS_HOT_ORDER_DETAIL_REFRESH_MINUTES || '30', 30, 1), + hotFinalStatuses: toList(process.env.ALIYUN_APS_HOT_FINAL_STATUSES, ['已完成', '已关闭', '已取消', '已退款完成']), scheduleMode: process.env.ALIYUN_APS_SCHEDULE_MODE || 'incremental', smtp: { host: process.env.ALIYUN_APS_SMTP_HOST || 'smtp.qq.com', @@ -146,6 +172,7 @@ export const datasets = { couponAmountCny: record.couponAmountCny || '', windowStart: context.windowStart || '', windowEnd: context.windowEnd || '', + detailSyncedAt: context.detailSyncedAt || record.detailSyncedAt || '', }), }, customerDetails: { diff --git a/aliyun-sync/aliyun-aps-sync/src/index.js b/aliyun-sync/aliyun-aps-sync/src/index.js index fed6335..7f55b1d 100644 --- a/aliyun-sync/aliyun-aps-sync/src/index.js +++ b/aliyun-sync/aliyun-aps-sync/src/index.js @@ -6,6 +6,7 @@ const extraArgs = args.slice(1); const billsResume = extraArgs.includes('--resume'); const ordersIncremental = extraArgs.includes('--incremental'); const messagesResume = extraArgs.includes('--resume'); +const hotResume = extraArgs.includes('--resume'); for (const arg of extraArgs) { if (arg.startsWith('--incremental-order-start-date=')) { @@ -13,7 +14,7 @@ for (const arg of extraArgs) { } } -const { login, scheduleSync, syncAll, syncAllIncremental, syncBillsOnly, syncMessagesOnly, syncOrdersOnly } = await import('./sync.js'); +const { login, scheduleSync, syncAll, syncAllIncremental, syncBillsOnly, syncMessagesOnly, syncOrdersOnly, syncHot } = await import('./sync.js'); if (command === 'login') { await login(); @@ -50,6 +51,12 @@ if (command === 'messages') { process.exit(0); } +if (command === 'hot') { + const summary = await syncHot({ resume: hotResume }); + console.log(JSON.stringify(summary, null, 2)); + process.exit(0); +} + if (command === 'schedule') { await scheduleSync(); } else { diff --git a/aliyun-sync/aliyun-aps-sync/src/sync.js b/aliyun-sync/aliyun-aps-sync/src/sync.js index 7a4a8cb..df1c095 100644 --- a/aliyun-sync/aliyun-aps-sync/src/sync.js +++ b/aliyun-sync/aliyun-aps-sync/src/sync.js @@ -35,6 +35,7 @@ let _context = null; let _runtimeController = null; let _browser = null; let _isAttachedBrowser = false; +const runningJobs = new Set(); const AUTH_PAGE_KEYWORDS = [ 'RAM 用户登录', @@ -334,6 +335,151 @@ function formatDateTime(date) { return `${formatDate(date)} ${String(date.getHours()).padStart(2, '0')}:${String(date.getMinutes()).padStart(2, '0')}:${String(date.getSeconds()).padStart(2, '0')}`; } +function isSameDate(value, date) { + const parsed = parseDbDateTime(value); + if (!parsed) { + return false; + } + return formatDate(parsed) === formatDate(date); +} + +function addMinutes(date, minutes) { + const next = new Date(date); + next.setMinutes(next.getMinutes() + minutes); + return next; +} + +function buildOrderFingerprint(record) { + return [ + String(record.orderStatus || '').trim(), + String(record.actualPaidCny || '').trim(), + String(record.orderOriginalPriceCny || '').trim(), + String(record.orderType || '').trim(), + String(record.customerCategory || '').trim(), + String(record.createdAt || '').trim(), + ].join('|'); +} + +function isFinalOrderStatus(status) { + const normalized = String(status || '').trim(); + if (!normalized) { + return false; + } + return config.hotFinalStatuses.some((item) => item === normalized); +} + +async function runLockedJob(jobName, job) { + if (runningJobs.has(jobName)) { + console.log(`[任务锁] ${jobName} 已在运行,跳过本次执行`); + return { skipped: true, reason: 'already_running', jobName }; + } + runningJobs.add(jobName); + try { + return await job(); + } finally { + runningJobs.delete(jobName); + } +} + +function buildTodayOrderWindow() { + const today = formatDate(new Date()); + return buildSingleDateWindow(today, today); +} + +function computeChangedOrderIds(previousRecords, nextRecords) { + const previousMap = new Map(); + for (const record of previousRecords || []) { + const orderId = String(record.orderId || '').trim(); + if (!orderId) { + continue; + } + previousMap.set(orderId, record); + } + + const changedOrderIds = []; + for (const record of nextRecords || []) { + const orderId = String(record.orderId || '').trim(); + if (!orderId) { + continue; + } + const previous = previousMap.get(orderId); + if (!previous) { + changedOrderIds.push(orderId); + continue; + } + if (buildOrderFingerprint(previous) !== buildOrderFingerprint(record)) { + changedOrderIds.push(orderId); + } + } + return Array.from(new Set(changedOrderIds)); +} + +function selectOrderDetailCandidates(orderRecords, changedOrderIds, detailRecords) { + const changedSet = new Set((changedOrderIds || []).map((item) => String(item || '').trim()).filter(Boolean)); + const detailMap = new Map(); + for (const record of detailRecords || []) { + const orderId = String(record.orderId || '').trim(); + if (!orderId) { + continue; + } + detailMap.set(orderId, record); + } + + const now = new Date(); + const refreshBefore = addMinutes(now, -config.hotOrderDetailRefreshMinutes); + const candidateIds = []; + for (const record of orderRecords || []) { + const orderId = String(record.orderId || '').trim(); + if (!orderId || !isValidOrderId(orderId)) { + continue; + } + if (changedSet.has(orderId)) { + candidateIds.push(orderId); + continue; + } + const status = String(record.orderStatus || '').trim(); + if (isFinalOrderStatus(status)) { + continue; + } + const detail = detailMap.get(orderId); + if (!detail) { + candidateIds.push(orderId); + continue; + } + const lastSyncedAt = parseDbDateTime(detail.detailSyncedAt || detail.__detailSyncedAt || ''); + if (!lastSyncedAt || lastSyncedAt <= refreshBefore) { + candidateIds.push(orderId); + } + } + return Array.from(new Set(candidateIds)); +} + +function summarizeHotPage(previousOrderMap, normalizedPageRows) { + let stableCount = 0; + let changedCount = 0; + let newCount = 0; + let todayRowCount = 0; + + for (const record of normalizedPageRows) { + if (isSameDate(record.createdAt, new Date())) { + todayRowCount += 1; + } + const orderId = String(record.orderId || '').trim(); + const previous = previousOrderMap.get(orderId); + if (!previous) { + newCount += 1; + continue; + } + if (buildOrderFingerprint(previous) === buildOrderFingerprint(record)) { + stableCount += 1; + } else { + changedCount += 1; + } + } + + return { stableCount, changedCount, newCount, todayRowCount }; +} + function buildSingleDateWindow(startDate, endDate) { return [{ windowStart: startDate, @@ -552,9 +698,13 @@ export async function syncOrdersOnly(options = {}) { try { const summary = { startedAt: new Date().toISOString(), datasets: {} }; page = await resolveActivePage(context, '/detail/order/~/costCenter/order'); - summary.datasets.orders = await syncOrders(page, options); + const orderSyncResult = await syncOrders(page, options); + summary.datasets.orders = orderSyncResult; const latestOrders = loadCurrentState('orders', datasets.orders.uniqueKey); - const orderIdsForDetail = collectValidOrderIds(latestOrders.records || []); + const orderDetailsState = loadCurrentState('orderDetails', datasets.orderDetails.uniqueKey); + const orderIdsForDetail = options.hot + ? selectOrderDetailCandidates(latestOrders.records || [], orderSyncResult.changedOrderIds || [], orderDetailsState.records || []) + : collectValidOrderIds(latestOrders.records || []); summary.datasets.orderDetails = await syncOrderDetails(page, orderIdsForDetail); summary.finishedAt = new Date().toISOString(); @@ -605,10 +755,13 @@ export async function syncMessagesOnly(options = {}) { } export async function scheduleSync() { - console.log(`定时任务已启动: ${config.cron} (${config.timezone})`); + console.log(`定时任务已启动: normal=${config.cron}, hot=${config.hotCron} (${config.timezone})`); cron.schedule( config.cron, async () => { + if (config.scheduleMode === 'hot') { + return; + } try { console.log(`[${new Date().toISOString()}] 开始执行同步 mode=${config.scheduleMode}`); const summary = config.scheduleMode === 'full' @@ -621,6 +774,67 @@ export async function scheduleSync() { }, { timezone: config.timezone }, ); + + cron.schedule( + config.hotCron, + async () => { + if (config.scheduleMode !== 'hot') { + return; + } + try { + console.log(`[${new Date().toISOString()}] 开始执行高频同步 mode=hot`); + const summary = await syncHot(); + console.log(`[${new Date().toISOString()}] 高频同步完成`, JSON.stringify(summary, null, 2)); + } catch (error) { + console.error(`[${new Date().toISOString()}] 高频同步失败`, error); + } + }, + { timezone: config.timezone }, + ); +} + +export async function syncHot(options = {}) { + return runLockedJob('hot-sync', async () => { + const runtimeController = getRuntimeController(); + runtimeController.bind(); + const context = await getContext(); + let page = null; + + try { + const summary = { startedAt: new Date().toISOString(), mode: 'hot', datasets: {} }; + page = await resolveActivePage(context, '/detail/order/~/costCenter/order'); + const orderSyncResult = await syncOrders(page, { ...options, hot: true, incremental: true, resume: options.resume === true }); + summary.datasets.orders = orderSyncResult; + + const latestOrders = loadCurrentState('orders', datasets.orders.uniqueKey); + const orderDetailsState = loadCurrentState('orderDetails', datasets.orderDetails.uniqueKey); + const orderIdsForDetail = selectOrderDetailCandidates( + latestOrders.records || [], + orderSyncResult.changedOrderIds || [], + orderDetailsState.records || [], + ); + + summary.datasets.orderDetails = await syncOrderDetails(page, orderIdsForDetail, { resume: options.resume === true }); + page = await resolveActivePage(context, '/message'); + summary.datasets.messages = await syncMessages(page, { incremental: true, resume: options.resume === true, hot: true }); + summary.finishedAt = new Date().toISOString(); + + const stamp = nowStamp(); + saveRunSummary(stamp, summary); + return summary; + } catch (error) { + await reportRuntimeError(error, page, { label: 'syncHot', dataset: 'hot', mode: 'hot' }); + throw error; + } finally { + if (config.closeBrowser) { + await closeContextIfNeeded(); + } else { + console.log('浏览器保持运行'); + } + await closeDbPool(); + runtimeController.unbind(); + } + }); } export async function syncAllIncremental() { @@ -632,7 +846,8 @@ export async function syncAllIncremental() { try { const summary = { startedAt: new Date().toISOString(), mode: 'incremental', datasets: {} }; page = await resolveActivePage(context, '/detail/order/~/costCenter/order'); - summary.datasets.orders = await syncOrders(page, { incremental: true, resume: true }); + const orderSyncResult = await syncOrders(page, { incremental: true, resume: true }); + summary.datasets.orders = orderSyncResult; const latestOrders = loadCurrentState('orders', datasets.orders.uniqueKey); const orderIdsForDetail = collectValidOrderIds(latestOrders.records || []); summary.datasets.orderDetails = await syncOrderDetails(page, orderIdsForDetail, { resume: true }); @@ -790,10 +1005,12 @@ async function syncCustomerDetails(page, options = {}) { async function syncOrders(page, options = {}) { await runtimeCheckpoint('同步订单'); const dataset = datasets.orders; - const { incremental = false, resume = false } = options; + const { incremental = false, resume = false, hot = false } = options; let windows; - if (!incremental) { + if (hot) { + windows = buildTodayOrderWindow(); + } else if (!incremental) { windows = buildMonthlyDateWindows(config.orderStartDate); } else { windows = await buildIncrementalOrderWindows(); @@ -808,7 +1025,18 @@ async function syncOrders(page, options = {}) { } } + const previousState = loadCurrentState(dataset.name, dataset.uniqueKey); + const previousRecords = previousState.records || []; + const previousOrderMap = new Map(previousRecords.map((record) => [String(record.orderId || '').trim(), record])); const allNormalizedRecords = []; + const hotStats = { + pagesScanned: 0, + stableRows: 0, + newRows: 0, + changedRows: 0, + stoppedEarly: false, + stopReason: '', + }; for (const window of windows) { await runtimeCheckpoint(`订单窗口 ${window.start} ~ ${window.end}`); @@ -833,6 +1061,8 @@ async function syncOrders(page, options = {}) { } let records = []; + let stableRowsInARow = 0; + let stablePagesInARow = 0; if (shouldContinueScrape) { records = await scrapePagedTable(page, dataset, window, { onPage: async ({ pageNum, pageRows }) => { @@ -842,7 +1072,47 @@ async function syncOrders(page, options = {}) { await upsertOrders(normalizedPageRows); } await saveOrdersCheckpoint(dataset, window, pageNum, windowNormalizedRecords); + + if (hot) { + hotStats.pagesScanned += 1; + const pageSummary = summarizeHotPage(previousOrderMap, normalizedPageRows); + hotStats.stableRows += pageSummary.stableCount; + hotStats.newRows += pageSummary.newCount; + hotStats.changedRows += pageSummary.changedCount; + + if (pageSummary.changedCount === 0 && pageSummary.newCount === 0) { + stablePagesInARow += 1; + } else { + stablePagesInARow = 0; + } + + if (pageSummary.stableCount === normalizedPageRows.length && normalizedPageRows.length > 0) { + stableRowsInARow += normalizedPageRows.length; + } else { + stableRowsInARow = 0; + } + } }, + shouldStop: hot + ? async ({ pageNum }) => { + if (pageNum >= config.hotOrderMaxPagesPerRun) { + hotStats.stoppedEarly = true; + hotStats.stopReason = `max_pages:${config.hotOrderMaxPagesPerRun}`; + return true; + } + if (stableRowsInARow >= config.hotOrderStableThreshold) { + hotStats.stoppedEarly = true; + hotStats.stopReason = `stable_rows:${stableRowsInARow}`; + return true; + } + if (stablePagesInARow >= config.hotOrderStablePageThreshold) { + hotStats.stoppedEarly = true; + hotStats.stopReason = `stable_pages:${stablePagesInARow}`; + return true; + } + return false; + } + : undefined, }); } @@ -856,7 +1126,14 @@ async function syncOrders(page, options = {}) { allNormalizedRecords.push(...windowNormalizedRecords); } - return persistNormalizedDataset(dataset, dedupeByHash(allNormalizedRecords)); + const normalizedRecords = dedupeByHash(allNormalizedRecords); + const changedOrderIds = computeChangedOrderIds(previousRecords, normalizedRecords); + const persisted = persistNormalizedDataset(dataset, normalizedRecords); + return { + ...persisted, + changedOrderIds, + hot: hot ? hotStats : undefined, + }; } async function buildIncrementalOrderWindows() { @@ -996,7 +1273,7 @@ async function syncBills(page, options = {}) { async function syncMessages(page, options = {}) { await runtimeCheckpoint('同步消息'); const dataset = datasets.messages; - const { incremental = false, resume = false } = options; + const { incremental = false, resume = false, hot = false } = options; await page.goto(dataset.url, { waitUntil: 'domcontentloaded' }); await waitUntilReady(page, dataset.heading); await trySetPageSize(page, dataset.pageSize); @@ -1016,6 +1293,14 @@ async function syncMessages(page, options = {}) { } let records = []; + let hotWatermark = null; + if (hot && hasDbConfig()) { + const latestMessageTime = await getLatestMessageTimeFromDb(); + const latest = parseDbDateTime(latestMessageTime); + if (latest) { + hotWatermark = addMinutes(latest, -config.hotMessageOverlapMinutes); + } + } if (shouldContinueScrape) { records = await scrapePagedTable(page, dataset, {}, { onPage: async ({ pageNum, pageRows }) => { @@ -1027,26 +1312,41 @@ async function syncMessages(page, options = {}) { await saveMessagesCheckpoint(dataset, pageNum, allNormalizedRecords); }, skipInitialPage: resumeFromPage > 0, + shouldStop: hot + ? async ({ pageNum, pageRows }) => { + if (pageNum >= config.hotMessageMaxPagesPerRun) { + return true; + } + if (!hotWatermark) { + return false; + } + const normalizedPageRows = normalizeDatasetRecords(dataset, pageRows, {}); + return normalizedPageRows.length > 0 + && normalizedPageRows.every((record) => !isAfterLatestMessageTime(record, hotWatermark)); + } + : undefined, }); } if (resumeFromPage === 0) { allNormalizedRecords = normalizeDatasetRecords(dataset, records, {}); } - if (incremental && hasDbConfig()) { + if ((incremental || hot) && hasDbConfig()) { try { const latestMessageTime = await getLatestMessageTimeFromDb(); if (latestMessageTime) { const latest = parseDbDateTime(latestMessageTime); if (latest) { - const watermark = subtractDays(latest, config.messageIncrementalOverlapDays); + const watermark = hot + ? addMinutes(latest, -config.hotMessageOverlapMinutes) + : subtractDays(latest, config.messageIncrementalOverlapDays); const before = allNormalizedRecords.length; allNormalizedRecords = allNormalizedRecords.filter((record) => isAfterLatestMessageTime(record, watermark)); - console.log(`[增量模式] 消息按时间过滤: ${before} -> ${allNormalizedRecords.length} (db_last=${latestMessageTime}, overlap=${config.messageIncrementalOverlapDays}d)`); + console.log(`[${hot ? '高频模式' : '增量模式'}] 消息按时间过滤: ${before} -> ${allNormalizedRecords.length} (db_last=${latestMessageTime}, overlap=${hot ? `${config.hotMessageOverlapMinutes}m` : `${config.messageIncrementalOverlapDays}d`})`); } } } catch (error) { - console.error('[增量模式] 查询数据库最新消息时间失败:', error.message); + console.error(`[${hot ? '高频模式' : '增量模式'}] 查询数据库最新消息时间失败:`, error.message); } } @@ -1412,10 +1712,11 @@ async function syncOrderDetails(page, cachedOrderIds, options = {}) { if (!isValidOrderId(detail.orderId)) { detail.orderId = target.orderId; } - allDetails.push({ ...detail, __context: {} }); + const detailContext = { detailSyncedAt: new Date().toISOString() }; + allDetails.push({ ...detail, __context: detailContext }); await saveOrderDetailsCheckpoint(dataset, index + 1, allDetails); if (hasDbConfig()) { - const normalizedDetail = normalizeDatasetRecords(dataset, [{ ...detail, __context: {} }], {}); + const normalizedDetail = normalizeDatasetRecords(dataset, [{ ...detail, __context: detailContext }], detailContext); await upsertOrderDetails(normalizedDetail); } @@ -1500,7 +1801,7 @@ async function waitUntilReady(page, heading, timeout = 120000, options = {}) { } async function scrapePagedTable(page, dataset, context, options = {}) { - const { onPage, skipInitialPage = false } = options; + const { onPage, skipInitialPage = false, shouldStop } = options; const pages = []; const visited = new Set(); let shouldSkipCurrentPage = skipInitialPage; @@ -1533,6 +1834,11 @@ async function scrapePagedTable(page, dataset, context, options = {}) { await onPage({ pageData, pageNum, pageRows }); } + if (shouldStop && await shouldStop({ pageData, pageNum, pageRows, pages })) { + console.log(`[抓取] 满足停止条件,在第${pageNum}页提前停止`); + break; + } + const moved = await gotoNextPage(page); if (!moved) { console.log(`[抓取] 翻页失败或已到最后一页,停止`); diff --git a/aliyun-sync/aliyun-aps-sync/tests/example.spec.ts b/aliyun-sync/aliyun-aps-sync/tests/example.spec.ts new file mode 100644 index 0000000..54a906a --- /dev/null +++ b/aliyun-sync/aliyun-aps-sync/tests/example.spec.ts @@ -0,0 +1,18 @@ +import { test, expect } from '@playwright/test'; + +test('has title', async ({ page }) => { + await page.goto('https://playwright.dev/'); + + // Expect a title "to contain" a substring. + await expect(page).toHaveTitle(/Playwright/); +}); + +test('get started link', async ({ page }) => { + await page.goto('https://playwright.dev/'); + + // Click the get started link. + await page.getByRole('link', { name: 'Get started' }).click(); + + // Expects page to have a heading with the name of Installation. + await expect(page.getByRole('heading', { name: 'Installation' })).toBeVisible(); +});