断点爬取

This commit is contained in:
ray
2026-04-27 09:16:07 +08:00
parent 5c1d0f3fad
commit 2e4ce07340
6 changed files with 283 additions and 38 deletions

View File

@@ -59,6 +59,17 @@ npm run sync
npm run bills
```
### 基于 checkpoint 断点续爬账单
```powershell
npm run bills -- --resume
```
作用:
- 自动读取 `data/checkpoints/bills/` 下最新 checkpoint。
- 从 checkpoint 记录的月份和页码之后继续抓取。
### 启动定时同步
```powershell
@@ -173,6 +184,12 @@ python aps_db_sync.py --sync-target bills
python aps_db_sync.py --incremental --sync-target bills
```
### 直接将最新 bills checkpoint 入库
```powershell
python aps_db_sync.py --sync-target bills --from-checkpoint
```
### 查询数据库最新账单消费时间
```powershell

View File

@@ -43,6 +43,18 @@ npm run login
npm run sync
```
仅抓账单:
```bash
npm run bills
```
如果要从最新账单 checkpoint 继续:
```bash
npm run bills -- --resume
```
如果需要在增量模式下让订单和订单详情从指定日期开始补抓,可以配置:
```bash

View File

@@ -1,7 +1,9 @@
const args = process.argv.slice(2);
const command = args[0] || 'sync';
const extraArgs = args.slice(1);
const billsResume = extraArgs.includes('--resume');
for (const arg of args.slice(1)) {
for (const arg of extraArgs) {
if (arg.startsWith('--incremental-order-start-date=')) {
process.env.ALIYUN_APS_INCREMENTAL_ORDER_START_DATE = arg.split('=').slice(1).join('=');
}
@@ -21,7 +23,7 @@ if (command === 'sync') {
}
if (command === 'bills') {
const summary = await syncBillsOnly();
const summary = await syncBillsOnly({ resume: billsResume });
console.log(JSON.stringify(summary, null, 2));
process.exit(0);
}

View File

@@ -25,13 +25,42 @@ export const withHash = (record) => ({
__hash: crypto.createHash('sha256').update(JSON.stringify(record)).digest('hex'),
});
export const loadCurrentState = (dataset) =>
readJson(path.join(config.dataDir, 'current', `${dataset}.json`), { records: [], index: {} });
function buildIndex(records, uniqueKey = (record) => record?.__hash) {
const index = {};
for (const record of records || []) {
const key = uniqueKey(record);
if (!key) continue;
index[key] = record;
}
return index;
}
export const loadCurrentState = (dataset, uniqueKey = (record) => record?.__hash) => {
const payload = readJson(path.join(config.dataDir, 'current', `${dataset}.json`), { records: [], stats: {} });
return {
...payload,
records: payload.records || [],
index: buildIndex(payload.records || [], uniqueKey),
};
};
function compactDatasetState(payload) {
return {
records: payload.records || [],
stats: payload.stats || {
total: (payload.records || []).length,
added: 0,
updated: 0,
removed: 0,
},
};
}
export function saveDatasetRun(dataset, payload) {
const stamp = nowStamp();
writeJson(path.join(config.dataDir, 'history', dataset, `${stamp}.json`), payload);
writeJson(path.join(config.dataDir, 'current', `${dataset}.json`), payload);
const compactPayload = compactDatasetState(payload);
writeJson(path.join(config.dataDir, 'history', dataset, `${stamp}.json`), compactPayload);
writeJson(path.join(config.dataDir, 'current', `${dataset}.json`), compactPayload);
return stamp;
}

View File

@@ -154,6 +154,37 @@ async function saveStorageState(context) {
console.log(`[storageState] 已保存登录态快照: ${config.storageStateFile}`);
}
function loadLatestBillsCheckpoint() {
const checkpointDir = path.join(config.dataDir, 'checkpoints', 'bills');
if (!fs.existsSync(checkpointDir)) {
return null;
}
const candidates = fs.readdirSync(checkpointDir)
.filter((fileName) => fileName.endsWith('.json'))
.map((fileName) => {
const filePath = path.join(checkpointDir, fileName);
const stat = fs.statSync(filePath);
return { fileName, filePath, mtimeMs: stat.mtimeMs };
})
.sort((a, b) => b.mtimeMs - a.mtimeMs);
if (candidates.length === 0) {
return null;
}
try {
const latest = JSON.parse(fs.readFileSync(candidates[0].filePath, 'utf-8'));
if (!latest || typeof latest !== 'object') {
return null;
}
return latest;
} catch (error) {
console.warn(`[账单检查点] 读取失败,忽略断点续爬: ${error.message}`);
return null;
}
}
async function getPageBodyPreview(page) {
return page
.evaluate(() => document.body?.innerText?.substring(0, 500) || '(空)')
@@ -237,7 +268,7 @@ export async function syncAll() {
summary.datasets.orders = await syncOrders(page);
// syncOrders 完成后,从最新的 orders.json 读取 orderId 列表
const latestOrders = loadCurrentState('orders');
const latestOrders = loadCurrentState('orders', datasets.orders.uniqueKey);
const orderIdsForDetail = collectValidOrderIds(latestOrders.records || []);
summary.datasets.orderDetails = await syncOrderDetails(page, orderIdsForDetail);
@@ -257,7 +288,7 @@ export async function syncAll() {
}
}
export async function syncBillsOnly() {
export async function syncBillsOnly(options = {}) {
const runtimeController = getRuntimeController();
runtimeController.bind();
const context = await getContext();
@@ -266,7 +297,7 @@ export async function syncBillsOnly() {
const summary = { startedAt: new Date().toISOString(), datasets: {} };
const page = context.pages()[0] || (await context.newPage());
summary.datasets.bills = await syncBills(page);
summary.datasets.bills = await syncBills(page, options);
summary.finishedAt = new Date().toISOString();
const stamp = nowStamp();
@@ -325,7 +356,7 @@ async function syncCustomers(page) {
async function syncCustomerDetails(page) {
await runtimeCheckpoint('同步客户详情');
const dataset = datasets.customerDetails;
const customersState = loadCurrentState('customers');
const customersState = loadCurrentState('customers', datasets.customers.uniqueKey);
const allAccountIds = collectValidAccountIds(customersState.records || []);
if (allAccountIds.length === 0) {
@@ -426,9 +457,10 @@ function normalizeConfiguredDate(value) {
return normalized;
}
async function syncBills(page) {
async function syncBills(page, options = {}) {
await runtimeCheckpoint('同步账单');
const dataset = datasets.bills;
const { resume = false } = options;
let months;
let latestConsumptionDate = null;
@@ -442,7 +474,16 @@ async function syncBills(page) {
console.log(`[增量模式] 账单仅查询: ${incrementalMonth}${latestConsumptionDate ? `, 数据库最新消费时间: ${latestConsumptionDate}` : ''}`);
}
const allRecords = [];
const resumeCheckpoint = resume ? loadLatestBillsCheckpoint() : null;
if (resumeCheckpoint?.month) {
const resumeIndex = months.indexOf(resumeCheckpoint.month);
if (resumeIndex >= 0) {
months = months.slice(resumeIndex);
console.log(`[账单续爬] 从 checkpoint 恢复: month=${resumeCheckpoint.month}, page=${resumeCheckpoint.pageNum || 1}, records=${(resumeCheckpoint.records || []).length}`);
}
}
const allNormalizedRecords = [];
for (const month of months) {
await runtimeCheckpoint(`账单月份 ${month}`);
@@ -451,30 +492,54 @@ async function syncBills(page) {
await setMonthValue(page, month);
await clickQuery(page);
await trySetPageSize(page, dataset.pageSize);
const monthRecords = [];
let records = await scrapePagedTable(page, dataset, { month }, {
onPage: async ({ pageData, pageNum }) => {
monthRecords.push(...pageData.rows.map((row) => ({ ...row, __context: { month } })));
let checkpointRecords = monthRecords;
if (latestConsumptionDate) {
checkpointRecords = monthRecords.filter((record) => isAfterLatestConsumptionDate(record, latestConsumptionDate));
let monthNormalizedRecords = [];
let resumeFromPage = 0;
let shouldContinueScrape = true;
if (resumeCheckpoint?.month === month) {
monthNormalizedRecords = Array.isArray(resumeCheckpoint.records) ? resumeCheckpoint.records : [];
resumeFromPage = Number.parseInt(String(resumeCheckpoint.pageNum || 0), 10) || 0;
if (resumeFromPage > 0) {
const moved = await moveBillsToResumeStart(page, resumeFromPage);
if (!moved) {
console.log(`[账单续爬] checkpoint 已在最后一页,无需继续抓取 month=${month}`);
shouldContinueScrape = false;
}
await saveBillsCheckpoint(dataset, month, pageNum, checkpointRecords);
},
});
if (latestConsumptionDate) {
const before = records.length;
records = records.filter((record) => isAfterLatestConsumptionDate(record, latestConsumptionDate));
console.log(`[增量模式] 账单按消费时间过滤: ${before} -> ${records.length}`);
}
}
allRecords.push(...records);
let rawRecords = [];
if (shouldContinueScrape) {
rawRecords = await scrapePagedTable(page, dataset, { month }, {
onPage: async ({ pageNum, pageRows }) => {
const normalizedPageRows = normalizeDatasetRecords(dataset, pageRows, { month });
monthNormalizedRecords.push(...normalizedPageRows);
let checkpointRecords = monthNormalizedRecords;
if (latestConsumptionDate) {
checkpointRecords = monthNormalizedRecords.filter((record) => isAfterLatestConsumptionDate(record, latestConsumptionDate));
}
await saveBillsCheckpoint(dataset, month, pageNum, checkpointRecords);
},
});
}
if (resumeFromPage === 0) {
monthNormalizedRecords = normalizeDatasetRecords(dataset, rawRecords, { month });
}
if (latestConsumptionDate) {
const before = monthNormalizedRecords.length;
monthNormalizedRecords = monthNormalizedRecords.filter((record) => isAfterLatestConsumptionDate(record, latestConsumptionDate));
console.log(`[增量模式] 账单按消费时间过滤: ${before} -> ${monthNormalizedRecords.length}`);
}
allNormalizedRecords.push(...monthNormalizedRecords);
}
return persistDataset(dataset, dedupeByHash(allRecords), {});
return persistNormalizedDataset(dataset, dedupeByHash(allNormalizedRecords));
}
async function saveBillsCheckpoint(dataset, month, pageNum, rawRecords) {
const normalized = dedupeByHash(rawRecords.map((record) => dataset.normalize(record, record.__context || {})).map(withHash));
async function saveBillsCheckpoint(dataset, month, pageNum, normalizedRecords) {
const normalized = dedupeByHash(normalizedRecords);
const checkpointName = `${month}-latest`;
saveCheckpoint(dataset.name, checkpointName, {
month,
@@ -488,6 +553,24 @@ async function saveBillsCheckpoint(dataset, month, pageNum, rawRecords) {
console.log(`[账单检查点] 已落盘: month=${month}, page=${pageNum}, records=${normalized.length}`);
}
function normalizeDatasetRecords(dataset, records, context) {
return records.map((record) => withHash(dataset.normalize(record, record.__context || context)));
}
async function moveBillsToResumeStart(page, resumeFromPage) {
if (resumeFromPage <= 0) {
return true;
}
const reached = await jumpToPage(page, resumeFromPage);
if (!reached) {
throw new Error(`账单续爬失败:无法定位到 checkpoint 页码 ${resumeFromPage}`);
}
const moved = await gotoNextPage(page);
return moved;
}
function getLatestBillConsumptionDate() {
const scriptPath = path.resolve(config.rootDir, config.dbSyncScript);
try {
@@ -561,9 +644,13 @@ async function syncOrderDetails(page, cachedOrderIds) {
}
function persistDataset(dataset, records, context) {
const normalized = records.map((record) => withHash(dataset.normalize(record, record.__context || context)));
const previousState = loadCurrentState(dataset.name);
const nextState = diffRecords(previousState, normalized, dataset.uniqueKey);
const normalized = normalizeDatasetRecords(dataset, records, context);
return persistNormalizedDataset(dataset, normalized);
}
function persistNormalizedDataset(dataset, normalizedRecords) {
const previousState = loadCurrentState(dataset.name, dataset.uniqueKey);
const nextState = diffRecords(previousState, normalizedRecords, dataset.uniqueKey);
const stamp = saveDatasetRun(dataset.name, nextState);
saveDelta(dataset.name, stamp, nextState.delta);
return {
@@ -629,9 +716,10 @@ async function waitUntilReady(page, heading, timeout = 120000, options = {}) {
}
async function scrapePagedTable(page, dataset, context, options = {}) {
const { onPage } = options;
const { onPage, skipInitialPage = false } = options;
const pages = [];
const visited = new Set();
let shouldSkipCurrentPage = skipInitialPage;
while (true) {
await runtimeCheckpoint(`抓取 ${dataset.name} 分页`);
@@ -640,6 +728,16 @@ async function scrapePagedTable(page, dataset, context, options = {}) {
const pageNum = await currentPageNumber(page);
const pageKey = `${pageNum}-${pageData.rows.length}`;
console.log(`[抓取] 第${pageNum}页, ${pageData.rows.length}行, key="${pageKey}"`);
if (shouldSkipCurrentPage) {
console.log(`[抓取] 跳过 checkpoint 已保存页: ${pageNum}`);
shouldSkipCurrentPage = false;
const moved = await gotoNextPage(page);
if (!moved) {
console.log(`[抓取] checkpoint 已位于最后一页,停止`);
break;
}
continue;
}
if (visited.has(pageKey)) {
console.log(`[抓取] 重复页面key停止翻页`);
break;
@@ -732,6 +830,56 @@ async function currentPageNumber(page) {
return Number.parseInt((await active.first().innerText()).trim(), 10) || 1;
}
async function jumpToPage(page, targetPage) {
if (targetPage <= 1) {
return true;
}
const current = await currentPageNumber(page);
if (current === targetPage) {
return true;
}
const jumpInputCandidates = [
'.next-pagination-jump-input input',
'input[aria-label*="页码"]',
'input[aria-label*="页"]',
];
for (const selector of jumpInputCandidates) {
const input = page.locator(selector).first();
if ((await input.count()) === 0) {
continue;
}
await input.click().catch(() => null);
await sleep(100);
await page.keyboard.press('Control+A').catch(() => null);
await page.keyboard.type(String(targetPage), { delay: 20 }).catch(() => null);
await page.keyboard.press('Enter').catch(() => null);
await sleep(1500);
const afterJump = await currentPageNumber(page);
if (afterJump === targetPage) {
console.log(`[账单续爬] 已跳转到第 ${targetPage}`);
return true;
}
}
console.warn(`[账单续爬] 未找到可用跳页输入框,尝试顺序翻到第 ${targetPage}`);
let guard = 0;
while (guard < targetPage + 5) {
const currentPage = await currentPageNumber(page);
if (currentPage >= targetPage) {
return currentPage === targetPage;
}
const moved = await gotoNextPage(page);
if (!moved) {
return false;
}
guard += 1;
}
return false;
}
async function gotoNextPage(page) {
await runtimeCheckpoint('翻页');
const before = await currentPageNumber(page);

View File

@@ -491,7 +491,27 @@ class APSSyncer:
return [cast(JsonDict, record) for record in data_list if isinstance(record, dict)]
return []
def resolve_data_files(self, data_dir: str, sync_target: SyncTarget = SYNC_TARGET_ALL) -> tuple[Path, Path, Path, Path, Path]:
def resolve_latest_bills_checkpoint(self, data_dir: str) -> Path:
current_root = Path(data_dir)
checkpoint_root = current_root.parent / "checkpoints" / "bills"
if not checkpoint_root.exists() or not checkpoint_root.is_dir():
raise FileNotFoundError(f"Bills checkpoint directory not found: {checkpoint_root}")
candidates = sorted(
checkpoint_root.glob("*.json"),
key=lambda item: item.stat().st_mtime,
reverse=True,
)
if not candidates:
raise FileNotFoundError(f"No bills checkpoint file found in: {checkpoint_root}")
return candidates[0]
def resolve_data_files(
self,
data_dir: str,
sync_target: SyncTarget = SYNC_TARGET_ALL,
from_checkpoint: bool = False,
) -> tuple[Path, Path, Path, Path, Path]:
root = Path(data_dir)
if not root.exists() or not root.is_dir():
raise FileNotFoundError(f"Data directory not found: {root}")
@@ -499,9 +519,12 @@ class APSSyncer:
customers_file = root / "customers.json"
orders_file = root / "orders.json"
order_details_file = root / "orderDetails.json"
bills_file = root / "bills.json"
bills_file = self.resolve_latest_bills_checkpoint(data_dir) if from_checkpoint else root / "bills.json"
customer_details_file = root / "customerDetails.json"
if from_checkpoint and sync_target != SYNC_TARGET_BILLS:
raise ValueError("--from-checkpoint 目前仅支持 --sync-target bills")
required_files_by_target = {
SYNC_TARGET_ALL: (customers_file, orders_file, order_details_file, bills_file),
SYNC_TARGET_CUSTOMER: (customers_file,),
@@ -954,12 +977,19 @@ class APSSyncer:
self.stats["bills"] += 1
# ---- Main sync entry ----
def sync_from_json(self, data_dir: str, incremental: bool = False, sync_target: str = SYNC_TARGET_ALL) -> StatsDict:
def sync_from_json(
self,
data_dir: str,
incremental: bool = False,
sync_target: str = SYNC_TARGET_ALL,
from_checkpoint: bool = False,
) -> StatsDict:
start = datetime.now()
normalized_sync_target = normalize_sync_target(sync_target)
customers_file, orders_file, order_details_file, bills_file, customer_details_file = self.resolve_data_files(
data_dir,
normalized_sync_target,
from_checkpoint,
)
logger.info(
"Loading source files from %s%s%s",
@@ -1129,11 +1159,18 @@ def main() -> None:
default=SYNC_TARGET_ALL,
help="选择同步对象: all/customer/order/orderdetails/bills",
)
_ = parser.add_argument(
"--from-checkpoint",
action="store_true",
default=False,
help="仅对 bills 生效:直接从 data/checkpoints/bills 最新 checkpoint 文件入库",
)
args = parser.parse_args()
data_dir = cast(str, args.dir)
incremental = cast(bool, args.incremental)
latest_bill_consumption_time = cast(bool, args.latest_bill_consumption_time)
sync_target = cast(str, args.sync_target)
from_checkpoint = cast(bool, args.from_checkpoint)
syncer = APSSyncer(db_config=DB_CONFIG)
if latest_bill_consumption_time:
@@ -1145,7 +1182,7 @@ def main() -> None:
return
finally:
syncer.close()
_ = syncer.sync_from_json(data_dir, incremental=incremental, sync_target=sync_target)
_ = syncer.sync_from_json(data_dir, incremental=incremental, sync_target=sync_target, from_checkpoint=from_checkpoint)
if __name__ == "__main__":