断点爬取

This commit is contained in:
ray
2026-04-24 18:03:26 +08:00
parent 19e8a833ba
commit 5c1d0f3fad
3 changed files with 71 additions and 5 deletions

View File

@@ -160,7 +160,7 @@ export const datasets = {
name: 'bills',
url: `${config.baseUrl}/#/detail/bill/~/costCenter/bill`,
heading: '账单查询',
pageSize: 100,
pageSize: 20,
uniqueKey: (record) => record.__hash,
normalize: (record, context) => ({
billingMonth: record['账期'] || '',

View File

@@ -43,6 +43,10 @@ export function saveRunSummary(stamp, summary) {
writeJson(path.join(config.dataDir, 'runs', `${stamp}.json`), summary);
}
export function saveCheckpoint(dataset, name, payload) {
writeJson(path.join(config.dataDir, 'checkpoints', dataset, `${name}.json`), payload);
}
export function diffRecords(previousState, records, uniqueKey) {
const previousIndex = previousState.index || {};
const nextIndex = {};

View File

@@ -10,6 +10,7 @@ import {
diffRecords,
loadCurrentState,
nowStamp,
saveCheckpoint,
saveDatasetRun,
saveDelta,
saveRunSummary,
@@ -450,7 +451,17 @@ async function syncBills(page) {
await setMonthValue(page, month);
await clickQuery(page);
await trySetPageSize(page, dataset.pageSize);
let records = await scrapePagedTable(page, dataset, { month });
const monthRecords = [];
let records = await scrapePagedTable(page, dataset, { month }, {
onPage: async ({ pageData, pageNum }) => {
monthRecords.push(...pageData.rows.map((row) => ({ ...row, __context: { month } })));
let checkpointRecords = monthRecords;
if (latestConsumptionDate) {
checkpointRecords = monthRecords.filter((record) => isAfterLatestConsumptionDate(record, latestConsumptionDate));
}
await saveBillsCheckpoint(dataset, month, pageNum, checkpointRecords);
},
});
if (latestConsumptionDate) {
const before = records.length;
records = records.filter((record) => isAfterLatestConsumptionDate(record, latestConsumptionDate));
@@ -462,6 +473,21 @@ async function syncBills(page) {
return persistDataset(dataset, dedupeByHash(allRecords), {});
}
async function saveBillsCheckpoint(dataset, month, pageNum, rawRecords) {
const normalized = dedupeByHash(rawRecords.map((record) => dataset.normalize(record, record.__context || {})).map(withHash));
const checkpointName = `${month}-latest`;
saveCheckpoint(dataset.name, checkpointName, {
month,
pageNum,
savedAt: new Date().toISOString(),
stats: {
total: normalized.length,
},
records: normalized,
});
console.log(`[账单检查点] 已落盘: month=${month}, page=${pageNum}, records=${normalized.length}`);
}
function getLatestBillConsumptionDate() {
const scriptPath = path.resolve(config.rootDir, config.dbSyncScript);
try {
@@ -602,7 +628,8 @@ async function waitUntilReady(page, heading, timeout = 120000, options = {}) {
await sleep(1500);
}
async function scrapePagedTable(page, dataset, context) {
async function scrapePagedTable(page, dataset, context, options = {}) {
const { onPage } = options;
const pages = [];
const visited = new Set();
@@ -618,7 +645,11 @@ async function scrapePagedTable(page, dataset, context) {
break;
}
visited.add(pageKey);
pages.push(...pageData.rows.map((row) => ({ ...row, __context: context })));
const pageRows = pageData.rows.map((row) => ({ ...row, __context: context }));
pages.push(...pageRows);
if (onPage) {
await onPage({ pageData, pageNum, pageRows });
}
const moved = await gotoNextPage(page);
if (!moved) {
@@ -631,6 +662,22 @@ async function scrapePagedTable(page, dataset, context) {
return pages;
}
async function raiseIfSessionExpired(page, label) {
const { currentUrl, bodyText, isAuthPage } = await detectAuthRedirect(page);
if (!isAuthPage) {
return;
}
console.error(`[鉴权] ${label} 时检测到登录页/鉴权页: ${currentUrl}`);
console.error(`[鉴权] 页面内容前500字: ${bodyText}`);
try {
await sendLoginAlert(currentUrl);
} catch (notifyErr) {
console.error('[通知] 发送登录提醒失败:', notifyErr.message);
}
throw new Error(`运行过程中登录态失效(${label})。请重新执行 npm run login 后再继续同步。`);
}
async function extractTable(page) {
return page.evaluate(() => {
const normalize = (value) =>
@@ -670,7 +717,12 @@ async function extractTable(page) {
async function waitForTableRows(page) {
await runtimeCheckpoint('等待表格加载');
await page.waitForFunction(() => document.querySelectorAll('table tbody tr').length > 0, null, { timeout: 120000 });
try {
await page.waitForFunction(() => document.querySelectorAll('table tbody tr').length > 0, null, { timeout: 120000 });
} catch (error) {
await raiseIfSessionExpired(page, '等待表格加载');
throw error;
}
await sleep(800);
}
@@ -700,9 +752,19 @@ async function gotoNextPage(page) {
// 用 Playwright click而非 DOM click确保 React 事件正常触发
await nextBtn.click();
await sleep(2000);
await raiseIfSessionExpired(page, `翻页 ${before} -> next`);
const after = await currentPageNumber(page);
console.log(`[翻页] ${before} -> ${after}`);
if (before > 1 && after === 1) {
throw new Error(`分页从第 ${before} 页异常回退到第 1 页,疑似登录态失效或页面会话已重置。请重新执行 npm run login 后再继续同步。`);
}
if (after < before) {
throw new Error(`分页从第 ${before} 页异常回退到第 ${after} 页,疑似登录态失效或页面状态被重置。请重新执行 npm run login 后再继续同步。`);
}
return before !== after;
}