From 5ead561b94e358cf670e4153885847e74ca459af Mon Sep 17 00:00:00 2001 From: ray <1416431931@qq.com> Date: Tue, 28 Apr 2026 17:06:32 +0800 Subject: [PATCH] =?UTF-8?q?sql=E4=BF=AE=E6=94=B9=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- aliyun-sync/COMMANDS.md | 6 ++ aliyun-sync/aliyun-aps-sync/README.md | 6 ++ aliyun-sync/aliyun-aps-sync/src/config.js | 2 +- aliyun-sync/aliyun-aps-sync/src/index.js | 2 +- aliyun-sync/aliyun-aps-sync/src/sync.js | 124 ++++++++++++++++++++-- 5 files changed, 127 insertions(+), 13 deletions(-) diff --git a/aliyun-sync/COMMANDS.md b/aliyun-sync/COMMANDS.md index 3aeb78e..bfd24a5 100644 --- a/aliyun-sync/COMMANDS.md +++ b/aliyun-sync/COMMANDS.md @@ -31,6 +31,12 @@ npm run login npm run sync ``` +如果要从已有 checkpoint 继续全量流程(当前主要覆盖 orders + bills): + +```powershell +npm run sync -- --resume +``` + 默认包含: - customers diff --git a/aliyun-sync/aliyun-aps-sync/README.md b/aliyun-sync/aliyun-aps-sync/README.md index d7606cb..d8fb171 100644 --- a/aliyun-sync/aliyun-aps-sync/README.md +++ b/aliyun-sync/aliyun-aps-sync/README.md @@ -28,6 +28,12 @@ Node 版阿里云 APS 同步工具。 npm run sync ``` +如果要让 full sync 从已有 checkpoint 继续(当前主要覆盖 orders + bills): + +```bash +npm run sync -- --resume +``` + 行为: - 抓全量 customer + customerDetails diff --git a/aliyun-sync/aliyun-aps-sync/src/config.js b/aliyun-sync/aliyun-aps-sync/src/config.js index 5a2049d..ba56b19 100644 --- a/aliyun-sync/aliyun-aps-sync/src/config.js +++ b/aliyun-sync/aliyun-aps-sync/src/config.js @@ -64,7 +64,7 @@ export const datasets = { name: 'customers', url: `${config.baseUrl}/#/detail/my_customer/~/customer/list`, heading: '我的客户', - pageSize: 100, + pageSize: 20, uniqueKey: (record) => record.accountId || record.loginName || record.__hash, normalize: (record) => { const loginAndUid = record['登录名称/账号ID'] || ''; diff --git a/aliyun-sync/aliyun-aps-sync/src/index.js b/aliyun-sync/aliyun-aps-sync/src/index.js index 7aa242d..486e4f5 100644 --- a/aliyun-sync/aliyun-aps-sync/src/index.js +++ b/aliyun-sync/aliyun-aps-sync/src/index.js @@ -19,7 +19,7 @@ if (command === 'login') { } if (command === 'sync') { - const summary = await syncAll(); + const summary = await syncAll({ resume: billsResume }); console.log(JSON.stringify(summary, null, 2)); process.exit(0); } diff --git a/aliyun-sync/aliyun-aps-sync/src/sync.js b/aliyun-sync/aliyun-aps-sync/src/sync.js index 21230a4..cf25823 100644 --- a/aliyun-sync/aliyun-aps-sync/src/sync.js +++ b/aliyun-sync/aliyun-aps-sync/src/sync.js @@ -239,6 +239,37 @@ function loadLatestBillsCheckpoint() { } } +function loadLatestOrdersCheckpoint() { + const checkpointDir = path.join(config.dataDir, 'checkpoints', 'orders'); + if (!fs.existsSync(checkpointDir)) { + return null; + } + + const candidates = fs.readdirSync(checkpointDir) + .filter((fileName) => fileName.endsWith('.json')) + .map((fileName) => { + const filePath = path.join(checkpointDir, fileName); + const stat = fs.statSync(filePath); + return { fileName, filePath, mtimeMs: stat.mtimeMs }; + }) + .sort((a, b) => b.mtimeMs - a.mtimeMs); + + if (candidates.length === 0) { + return null; + } + + try { + const latest = JSON.parse(fs.readFileSync(candidates[0].filePath, 'utf-8')); + if (!latest || typeof latest !== 'object') { + return null; + } + return latest; + } catch (error) { + console.warn(`[订单检查点] 读取失败,忽略断点续爬: ${error.message}`); + return null; + } +} + function subtractDays(dateValue, days) { const next = new Date(dateValue); next.setDate(next.getDate() - days); @@ -393,11 +424,12 @@ export async function login() { } } -export async function syncAll() { +export async function syncAll(options = {}) { const runtimeController = getRuntimeController(); runtimeController.bind(); const context = await getContext(); let page = null; + const { resume = false } = options; try { const summary = { startedAt: new Date().toISOString(), datasets: {} }; @@ -408,14 +440,14 @@ export async function syncAll() { summary.datasets.customerDetails = await syncCustomerDetails(page); } - summary.datasets.orders = await syncOrders(page, { incremental: !config.fullSync }); + summary.datasets.orders = await syncOrders(page, { incremental: !config.fullSync, resume }); // syncOrders 完成后,从最新的 orders.json 读取 orderId 列表 const latestOrders = loadCurrentState('orders', datasets.orders.uniqueKey); const orderIdsForDetail = collectValidOrderIds(latestOrders.records || []); summary.datasets.orderDetails = await syncOrderDetails(page, orderIdsForDetail); - summary.datasets.bills = await syncBills(page, { incremental: !config.fullSync }); + summary.datasets.bills = await syncBills(page, { incremental: !config.fullSync, resume }); summary.datasets.messages = await syncMessages(page, { incremental: !config.fullSync }); summary.finishedAt = new Date().toISOString(); @@ -614,7 +646,7 @@ async function syncCustomerDetails(page) { async function syncOrders(page, options = {}) { await runtimeCheckpoint('同步订单'); const dataset = datasets.orders; - const { incremental = false } = options; + const { incremental = false, resume = false } = options; let windows; if (!incremental) { @@ -623,7 +655,16 @@ async function syncOrders(page, options = {}) { windows = await buildIncrementalOrderWindows(); } - const allRecords = []; + const resumeCheckpoint = resume ? loadLatestOrdersCheckpoint() : null; + if (resumeCheckpoint?.windowStart) { + const resumeIndex = windows.findIndex((window) => window.start === resumeCheckpoint.windowStart && window.end === resumeCheckpoint.windowEnd); + if (resumeIndex >= 0) { + windows = windows.slice(resumeIndex); + console.log(`[订单续爬] 从 checkpoint 恢复: ${resumeCheckpoint.windowStart} ~ ${resumeCheckpoint.windowEnd}, page=${resumeCheckpoint.pageNum || 1}, records=${(resumeCheckpoint.records || []).length}`); + } + } + + const allNormalizedRecords = []; for (const window of windows) { await runtimeCheckpoint(`订单窗口 ${window.start} ~ ${window.end}`); @@ -632,15 +673,46 @@ async function syncOrders(page, options = {}) { await setDateRange(page, window.start, window.end); await clickQuery(page); await trySetPageSize(page, dataset.pageSize); - const records = await scrapePagedTable(page, dataset, window); - allRecords.push(...records); - if (hasDbConfig()) { - const normalizedWindowRecords = dedupeByHash(normalizeDatasetRecords(dataset, records, window)); - await upsertOrders(normalizedWindowRecords); + let windowNormalizedRecords = []; + let resumeFromPage = 0; + let shouldContinueScrape = true; + if (resumeCheckpoint?.windowStart === window.start && resumeCheckpoint?.windowEnd === window.end) { + windowNormalizedRecords = Array.isArray(resumeCheckpoint.records) ? resumeCheckpoint.records : []; + resumeFromPage = Number.parseInt(String(resumeCheckpoint.pageNum || 0), 10) || 0; + if (resumeFromPage > 0) { + const moved = await moveOrdersToResumeStart(page, resumeFromPage); + if (!moved) { + console.log(`[订单续爬] checkpoint 已在最后一页,无需继续抓取 window=${window.start}~${window.end}`); + shouldContinueScrape = false; + } + } } + + let records = []; + if (shouldContinueScrape) { + records = await scrapePagedTable(page, dataset, window, { + onPage: async ({ pageNum, pageRows }) => { + const normalizedPageRows = normalizeDatasetRecords(dataset, pageRows, window); + windowNormalizedRecords.push(...normalizedPageRows); + if (hasDbConfig()) { + await upsertOrders(normalizedPageRows); + } + await saveOrdersCheckpoint(dataset, window, pageNum, windowNormalizedRecords); + }, + }); + } + + if (resumeFromPage === 0) { + windowNormalizedRecords = normalizeDatasetRecords(dataset, records, window); + if (hasDbConfig()) { + await upsertOrders(dedupeByHash(windowNormalizedRecords)); + } + } + + allNormalizedRecords.push(...windowNormalizedRecords); } - return persistDataset(dataset, dedupeByHash(allRecords), {}); + return persistNormalizedDataset(dataset, dedupeByHash(allNormalizedRecords)); } async function buildIncrementalOrderWindows() { @@ -828,6 +900,22 @@ async function saveBillsCheckpoint(dataset, month, pageNum, normalizedRecords) { console.log(`[账单检查点] 已落盘: month=${month}, page=${pageNum}, records=${normalized.length}`); } +async function saveOrdersCheckpoint(dataset, window, pageNum, normalizedRecords) { + const normalized = dedupeByHash(normalizedRecords); + const checkpointName = `${window.start}_${window.end}`.replace(/[^0-9_-]/g, '-'); + saveCheckpoint(dataset.name, checkpointName, { + windowStart: window.start, + windowEnd: window.end, + pageNum, + savedAt: new Date().toISOString(), + stats: { + total: normalized.length, + }, + records: normalized, + }); + console.log(`[订单检查点] 已落盘: ${window.start} ~ ${window.end}, page=${pageNum}, records=${normalized.length}`); +} + function normalizeDatasetRecords(dataset, records, context) { return records.map((record) => withHash(dataset.normalize(record, record.__context || context))); } @@ -846,6 +934,20 @@ async function moveBillsToResumeStart(page, resumeFromPage) { return moved; } +async function moveOrdersToResumeStart(page, resumeFromPage) { + if (resumeFromPage <= 0) { + return true; + } + + const reached = await jumpToPage(page, resumeFromPage); + if (!reached) { + throw new Error(`订单续爬失败:无法定位到 checkpoint 页码 ${resumeFromPage}`); + } + + const moved = await gotoNextPage(page); + return moved; +} + async function getLatestBillConsumptionDate() { if (!hasDbConfig()) { console.warn('[增量模式] 未配置数据库连接,无法读取账单水位,回退到当前日期');