diff --git a/aliyun-sync/COMMANDS.md b/aliyun-sync/COMMANDS.md index d848be1..413bbd0 100644 --- a/aliyun-sync/COMMANDS.md +++ b/aliyun-sync/COMMANDS.md @@ -59,6 +59,17 @@ npm run sync npm run bills ``` +### 基于 checkpoint 断点续爬账单 + +```powershell +npm run bills -- --resume +``` + +作用: + +- 自动读取 `data/checkpoints/bills/` 下最新 checkpoint。 +- 从 checkpoint 记录的月份和页码之后继续抓取。 + ### 启动定时同步 ```powershell @@ -173,6 +184,12 @@ python aps_db_sync.py --sync-target bills python aps_db_sync.py --incremental --sync-target bills ``` +### 直接将最新 bills checkpoint 入库 + +```powershell +python aps_db_sync.py --sync-target bills --from-checkpoint +``` + ### 查询数据库最新账单消费时间 ```powershell diff --git a/aliyun-sync/aliyun-aps-sync/README.md b/aliyun-sync/aliyun-aps-sync/README.md index 66dd376..52596cf 100644 --- a/aliyun-sync/aliyun-aps-sync/README.md +++ b/aliyun-sync/aliyun-aps-sync/README.md @@ -43,6 +43,18 @@ npm run login npm run sync ``` +仅抓账单: + +```bash +npm run bills +``` + +如果要从最新账单 checkpoint 继续: + +```bash +npm run bills -- --resume +``` + 如果需要在增量模式下让订单和订单详情从指定日期开始补抓,可以配置: ```bash diff --git a/aliyun-sync/aliyun-aps-sync/src/index.js b/aliyun-sync/aliyun-aps-sync/src/index.js index 047a77c..04f0e64 100644 --- a/aliyun-sync/aliyun-aps-sync/src/index.js +++ b/aliyun-sync/aliyun-aps-sync/src/index.js @@ -1,7 +1,9 @@ const args = process.argv.slice(2); const command = args[0] || 'sync'; +const extraArgs = args.slice(1); +const billsResume = extraArgs.includes('--resume'); -for (const arg of args.slice(1)) { +for (const arg of extraArgs) { if (arg.startsWith('--incremental-order-start-date=')) { process.env.ALIYUN_APS_INCREMENTAL_ORDER_START_DATE = arg.split('=').slice(1).join('='); } @@ -21,7 +23,7 @@ if (command === 'sync') { } if (command === 'bills') { - const summary = await syncBillsOnly(); + const summary = await syncBillsOnly({ resume: billsResume }); console.log(JSON.stringify(summary, null, 2)); process.exit(0); } diff --git a/aliyun-sync/aliyun-aps-sync/src/storage.js b/aliyun-sync/aliyun-aps-sync/src/storage.js index 04a25f1..75f1088 100644 --- a/aliyun-sync/aliyun-aps-sync/src/storage.js +++ b/aliyun-sync/aliyun-aps-sync/src/storage.js @@ -25,13 +25,42 @@ export const withHash = (record) => ({ __hash: crypto.createHash('sha256').update(JSON.stringify(record)).digest('hex'), }); -export const loadCurrentState = (dataset) => - readJson(path.join(config.dataDir, 'current', `${dataset}.json`), { records: [], index: {} }); +function buildIndex(records, uniqueKey = (record) => record?.__hash) { + const index = {}; + for (const record of records || []) { + const key = uniqueKey(record); + if (!key) continue; + index[key] = record; + } + return index; +} + +export const loadCurrentState = (dataset, uniqueKey = (record) => record?.__hash) => { + const payload = readJson(path.join(config.dataDir, 'current', `${dataset}.json`), { records: [], stats: {} }); + return { + ...payload, + records: payload.records || [], + index: buildIndex(payload.records || [], uniqueKey), + }; +}; + +function compactDatasetState(payload) { + return { + records: payload.records || [], + stats: payload.stats || { + total: (payload.records || []).length, + added: 0, + updated: 0, + removed: 0, + }, + }; +} export function saveDatasetRun(dataset, payload) { const stamp = nowStamp(); - writeJson(path.join(config.dataDir, 'history', dataset, `${stamp}.json`), payload); - writeJson(path.join(config.dataDir, 'current', `${dataset}.json`), payload); + const compactPayload = compactDatasetState(payload); + writeJson(path.join(config.dataDir, 'history', dataset, `${stamp}.json`), compactPayload); + writeJson(path.join(config.dataDir, 'current', `${dataset}.json`), compactPayload); return stamp; } diff --git a/aliyun-sync/aliyun-aps-sync/src/sync.js b/aliyun-sync/aliyun-aps-sync/src/sync.js index cbee3cf..a2af5fd 100644 --- a/aliyun-sync/aliyun-aps-sync/src/sync.js +++ b/aliyun-sync/aliyun-aps-sync/src/sync.js @@ -154,6 +154,37 @@ async function saveStorageState(context) { console.log(`[storageState] 已保存登录态快照: ${config.storageStateFile}`); } +function loadLatestBillsCheckpoint() { + const checkpointDir = path.join(config.dataDir, 'checkpoints', 'bills'); + if (!fs.existsSync(checkpointDir)) { + return null; + } + + const candidates = fs.readdirSync(checkpointDir) + .filter((fileName) => fileName.endsWith('.json')) + .map((fileName) => { + const filePath = path.join(checkpointDir, fileName); + const stat = fs.statSync(filePath); + return { fileName, filePath, mtimeMs: stat.mtimeMs }; + }) + .sort((a, b) => b.mtimeMs - a.mtimeMs); + + if (candidates.length === 0) { + return null; + } + + try { + const latest = JSON.parse(fs.readFileSync(candidates[0].filePath, 'utf-8')); + if (!latest || typeof latest !== 'object') { + return null; + } + return latest; + } catch (error) { + console.warn(`[账单检查点] 读取失败,忽略断点续爬: ${error.message}`); + return null; + } +} + async function getPageBodyPreview(page) { return page .evaluate(() => document.body?.innerText?.substring(0, 500) || '(空)') @@ -237,7 +268,7 @@ export async function syncAll() { summary.datasets.orders = await syncOrders(page); // syncOrders 完成后,从最新的 orders.json 读取 orderId 列表 - const latestOrders = loadCurrentState('orders'); + const latestOrders = loadCurrentState('orders', datasets.orders.uniqueKey); const orderIdsForDetail = collectValidOrderIds(latestOrders.records || []); summary.datasets.orderDetails = await syncOrderDetails(page, orderIdsForDetail); @@ -257,7 +288,7 @@ export async function syncAll() { } } -export async function syncBillsOnly() { +export async function syncBillsOnly(options = {}) { const runtimeController = getRuntimeController(); runtimeController.bind(); const context = await getContext(); @@ -266,7 +297,7 @@ export async function syncBillsOnly() { const summary = { startedAt: new Date().toISOString(), datasets: {} }; const page = context.pages()[0] || (await context.newPage()); - summary.datasets.bills = await syncBills(page); + summary.datasets.bills = await syncBills(page, options); summary.finishedAt = new Date().toISOString(); const stamp = nowStamp(); @@ -325,7 +356,7 @@ async function syncCustomers(page) { async function syncCustomerDetails(page) { await runtimeCheckpoint('同步客户详情'); const dataset = datasets.customerDetails; - const customersState = loadCurrentState('customers'); + const customersState = loadCurrentState('customers', datasets.customers.uniqueKey); const allAccountIds = collectValidAccountIds(customersState.records || []); if (allAccountIds.length === 0) { @@ -426,9 +457,10 @@ function normalizeConfiguredDate(value) { return normalized; } -async function syncBills(page) { +async function syncBills(page, options = {}) { await runtimeCheckpoint('同步账单'); const dataset = datasets.bills; + const { resume = false } = options; let months; let latestConsumptionDate = null; @@ -442,7 +474,16 @@ async function syncBills(page) { console.log(`[增量模式] 账单仅查询: ${incrementalMonth}${latestConsumptionDate ? `, 数据库最新消费时间: ${latestConsumptionDate}` : ''}`); } - const allRecords = []; + const resumeCheckpoint = resume ? loadLatestBillsCheckpoint() : null; + if (resumeCheckpoint?.month) { + const resumeIndex = months.indexOf(resumeCheckpoint.month); + if (resumeIndex >= 0) { + months = months.slice(resumeIndex); + console.log(`[账单续爬] 从 checkpoint 恢复: month=${resumeCheckpoint.month}, page=${resumeCheckpoint.pageNum || 1}, records=${(resumeCheckpoint.records || []).length}`); + } + } + + const allNormalizedRecords = []; for (const month of months) { await runtimeCheckpoint(`账单月份 ${month}`); @@ -451,30 +492,54 @@ async function syncBills(page) { await setMonthValue(page, month); await clickQuery(page); await trySetPageSize(page, dataset.pageSize); - const monthRecords = []; - let records = await scrapePagedTable(page, dataset, { month }, { - onPage: async ({ pageData, pageNum }) => { - monthRecords.push(...pageData.rows.map((row) => ({ ...row, __context: { month } }))); - let checkpointRecords = monthRecords; - if (latestConsumptionDate) { - checkpointRecords = monthRecords.filter((record) => isAfterLatestConsumptionDate(record, latestConsumptionDate)); + + let monthNormalizedRecords = []; + let resumeFromPage = 0; + let shouldContinueScrape = true; + if (resumeCheckpoint?.month === month) { + monthNormalizedRecords = Array.isArray(resumeCheckpoint.records) ? resumeCheckpoint.records : []; + resumeFromPage = Number.parseInt(String(resumeCheckpoint.pageNum || 0), 10) || 0; + if (resumeFromPage > 0) { + const moved = await moveBillsToResumeStart(page, resumeFromPage); + if (!moved) { + console.log(`[账单续爬] checkpoint 已在最后一页,无需继续抓取 month=${month}`); + shouldContinueScrape = false; } - await saveBillsCheckpoint(dataset, month, pageNum, checkpointRecords); - }, - }); - if (latestConsumptionDate) { - const before = records.length; - records = records.filter((record) => isAfterLatestConsumptionDate(record, latestConsumptionDate)); - console.log(`[增量模式] 账单按消费时间过滤: ${before} -> ${records.length}`); + } } - allRecords.push(...records); + + let rawRecords = []; + if (shouldContinueScrape) { + rawRecords = await scrapePagedTable(page, dataset, { month }, { + onPage: async ({ pageNum, pageRows }) => { + const normalizedPageRows = normalizeDatasetRecords(dataset, pageRows, { month }); + monthNormalizedRecords.push(...normalizedPageRows); + let checkpointRecords = monthNormalizedRecords; + if (latestConsumptionDate) { + checkpointRecords = monthNormalizedRecords.filter((record) => isAfterLatestConsumptionDate(record, latestConsumptionDate)); + } + await saveBillsCheckpoint(dataset, month, pageNum, checkpointRecords); + }, + }); + } + + if (resumeFromPage === 0) { + monthNormalizedRecords = normalizeDatasetRecords(dataset, rawRecords, { month }); + } + + if (latestConsumptionDate) { + const before = monthNormalizedRecords.length; + monthNormalizedRecords = monthNormalizedRecords.filter((record) => isAfterLatestConsumptionDate(record, latestConsumptionDate)); + console.log(`[增量模式] 账单按消费时间过滤: ${before} -> ${monthNormalizedRecords.length}`); + } + allNormalizedRecords.push(...monthNormalizedRecords); } - return persistDataset(dataset, dedupeByHash(allRecords), {}); + return persistNormalizedDataset(dataset, dedupeByHash(allNormalizedRecords)); } -async function saveBillsCheckpoint(dataset, month, pageNum, rawRecords) { - const normalized = dedupeByHash(rawRecords.map((record) => dataset.normalize(record, record.__context || {})).map(withHash)); +async function saveBillsCheckpoint(dataset, month, pageNum, normalizedRecords) { + const normalized = dedupeByHash(normalizedRecords); const checkpointName = `${month}-latest`; saveCheckpoint(dataset.name, checkpointName, { month, @@ -488,6 +553,24 @@ async function saveBillsCheckpoint(dataset, month, pageNum, rawRecords) { console.log(`[账单检查点] 已落盘: month=${month}, page=${pageNum}, records=${normalized.length}`); } +function normalizeDatasetRecords(dataset, records, context) { + return records.map((record) => withHash(dataset.normalize(record, record.__context || context))); +} + +async function moveBillsToResumeStart(page, resumeFromPage) { + if (resumeFromPage <= 0) { + return true; + } + + const reached = await jumpToPage(page, resumeFromPage); + if (!reached) { + throw new Error(`账单续爬失败:无法定位到 checkpoint 页码 ${resumeFromPage}`); + } + + const moved = await gotoNextPage(page); + return moved; +} + function getLatestBillConsumptionDate() { const scriptPath = path.resolve(config.rootDir, config.dbSyncScript); try { @@ -561,9 +644,13 @@ async function syncOrderDetails(page, cachedOrderIds) { } function persistDataset(dataset, records, context) { - const normalized = records.map((record) => withHash(dataset.normalize(record, record.__context || context))); - const previousState = loadCurrentState(dataset.name); - const nextState = diffRecords(previousState, normalized, dataset.uniqueKey); + const normalized = normalizeDatasetRecords(dataset, records, context); + return persistNormalizedDataset(dataset, normalized); +} + +function persistNormalizedDataset(dataset, normalizedRecords) { + const previousState = loadCurrentState(dataset.name, dataset.uniqueKey); + const nextState = diffRecords(previousState, normalizedRecords, dataset.uniqueKey); const stamp = saveDatasetRun(dataset.name, nextState); saveDelta(dataset.name, stamp, nextState.delta); return { @@ -629,9 +716,10 @@ async function waitUntilReady(page, heading, timeout = 120000, options = {}) { } async function scrapePagedTable(page, dataset, context, options = {}) { - const { onPage } = options; + const { onPage, skipInitialPage = false } = options; const pages = []; const visited = new Set(); + let shouldSkipCurrentPage = skipInitialPage; while (true) { await runtimeCheckpoint(`抓取 ${dataset.name} 分页`); @@ -640,6 +728,16 @@ async function scrapePagedTable(page, dataset, context, options = {}) { const pageNum = await currentPageNumber(page); const pageKey = `${pageNum}-${pageData.rows.length}`; console.log(`[抓取] 第${pageNum}页, ${pageData.rows.length}行, key="${pageKey}"`); + if (shouldSkipCurrentPage) { + console.log(`[抓取] 跳过 checkpoint 已保存页: ${pageNum}`); + shouldSkipCurrentPage = false; + const moved = await gotoNextPage(page); + if (!moved) { + console.log(`[抓取] checkpoint 已位于最后一页,停止`); + break; + } + continue; + } if (visited.has(pageKey)) { console.log(`[抓取] 重复页面key,停止翻页`); break; @@ -732,6 +830,56 @@ async function currentPageNumber(page) { return Number.parseInt((await active.first().innerText()).trim(), 10) || 1; } +async function jumpToPage(page, targetPage) { + if (targetPage <= 1) { + return true; + } + + const current = await currentPageNumber(page); + if (current === targetPage) { + return true; + } + + const jumpInputCandidates = [ + '.next-pagination-jump-input input', + 'input[aria-label*="页码"]', + 'input[aria-label*="页"]', + ]; + + for (const selector of jumpInputCandidates) { + const input = page.locator(selector).first(); + if ((await input.count()) === 0) { + continue; + } + await input.click().catch(() => null); + await sleep(100); + await page.keyboard.press('Control+A').catch(() => null); + await page.keyboard.type(String(targetPage), { delay: 20 }).catch(() => null); + await page.keyboard.press('Enter').catch(() => null); + await sleep(1500); + const afterJump = await currentPageNumber(page); + if (afterJump === targetPage) { + console.log(`[账单续爬] 已跳转到第 ${targetPage} 页`); + return true; + } + } + + console.warn(`[账单续爬] 未找到可用跳页输入框,尝试顺序翻到第 ${targetPage} 页`); + let guard = 0; + while (guard < targetPage + 5) { + const currentPage = await currentPageNumber(page); + if (currentPage >= targetPage) { + return currentPage === targetPage; + } + const moved = await gotoNextPage(page); + if (!moved) { + return false; + } + guard += 1; + } + return false; +} + async function gotoNextPage(page) { await runtimeCheckpoint('翻页'); const before = await currentPageNumber(page); diff --git a/aliyun-sync/aps-aliyun-sync/aps_db_sync.py b/aliyun-sync/aps-aliyun-sync/aps_db_sync.py index e15b2e8..f56ecc8 100644 --- a/aliyun-sync/aps-aliyun-sync/aps_db_sync.py +++ b/aliyun-sync/aps-aliyun-sync/aps_db_sync.py @@ -491,7 +491,27 @@ class APSSyncer: return [cast(JsonDict, record) for record in data_list if isinstance(record, dict)] return [] - def resolve_data_files(self, data_dir: str, sync_target: SyncTarget = SYNC_TARGET_ALL) -> tuple[Path, Path, Path, Path, Path]: + def resolve_latest_bills_checkpoint(self, data_dir: str) -> Path: + current_root = Path(data_dir) + checkpoint_root = current_root.parent / "checkpoints" / "bills" + if not checkpoint_root.exists() or not checkpoint_root.is_dir(): + raise FileNotFoundError(f"Bills checkpoint directory not found: {checkpoint_root}") + + candidates = sorted( + checkpoint_root.glob("*.json"), + key=lambda item: item.stat().st_mtime, + reverse=True, + ) + if not candidates: + raise FileNotFoundError(f"No bills checkpoint file found in: {checkpoint_root}") + return candidates[0] + + def resolve_data_files( + self, + data_dir: str, + sync_target: SyncTarget = SYNC_TARGET_ALL, + from_checkpoint: bool = False, + ) -> tuple[Path, Path, Path, Path, Path]: root = Path(data_dir) if not root.exists() or not root.is_dir(): raise FileNotFoundError(f"Data directory not found: {root}") @@ -499,9 +519,12 @@ class APSSyncer: customers_file = root / "customers.json" orders_file = root / "orders.json" order_details_file = root / "orderDetails.json" - bills_file = root / "bills.json" + bills_file = self.resolve_latest_bills_checkpoint(data_dir) if from_checkpoint else root / "bills.json" customer_details_file = root / "customerDetails.json" + if from_checkpoint and sync_target != SYNC_TARGET_BILLS: + raise ValueError("--from-checkpoint 目前仅支持 --sync-target bills") + required_files_by_target = { SYNC_TARGET_ALL: (customers_file, orders_file, order_details_file, bills_file), SYNC_TARGET_CUSTOMER: (customers_file,), @@ -954,12 +977,19 @@ class APSSyncer: self.stats["bills"] += 1 # ---- Main sync entry ---- - def sync_from_json(self, data_dir: str, incremental: bool = False, sync_target: str = SYNC_TARGET_ALL) -> StatsDict: + def sync_from_json( + self, + data_dir: str, + incremental: bool = False, + sync_target: str = SYNC_TARGET_ALL, + from_checkpoint: bool = False, + ) -> StatsDict: start = datetime.now() normalized_sync_target = normalize_sync_target(sync_target) customers_file, orders_file, order_details_file, bills_file, customer_details_file = self.resolve_data_files( data_dir, normalized_sync_target, + from_checkpoint, ) logger.info( "Loading source files from %s%s%s", @@ -1129,11 +1159,18 @@ def main() -> None: default=SYNC_TARGET_ALL, help="选择同步对象: all/customer/order/orderdetails/bills", ) + _ = parser.add_argument( + "--from-checkpoint", + action="store_true", + default=False, + help="仅对 bills 生效:直接从 data/checkpoints/bills 最新 checkpoint 文件入库", + ) args = parser.parse_args() data_dir = cast(str, args.dir) incremental = cast(bool, args.incremental) latest_bill_consumption_time = cast(bool, args.latest_bill_consumption_time) sync_target = cast(str, args.sync_target) + from_checkpoint = cast(bool, args.from_checkpoint) syncer = APSSyncer(db_config=DB_CONFIG) if latest_bill_consumption_time: @@ -1145,7 +1182,7 @@ def main() -> None: return finally: syncer.close() - _ = syncer.sync_from_json(data_dir, incremental=incremental, sync_target=sync_target) + _ = syncer.sync_from_json(data_dir, incremental=incremental, sync_target=sync_target, from_checkpoint=from_checkpoint) if __name__ == "__main__":