sql修改提交

This commit is contained in:
ray
2026-04-28 17:06:32 +08:00
parent e981b2aa95
commit 5ead561b94
5 changed files with 127 additions and 13 deletions

View File

@@ -31,6 +31,12 @@ npm run login
npm run sync
```
如果要从已有 checkpoint 继续全量流程(当前主要覆盖 orders + bills
```powershell
npm run sync -- --resume
```
默认包含:
- customers

View File

@@ -28,6 +28,12 @@ Node 版阿里云 APS 同步工具。
npm run sync
```
如果要让 full sync 从已有 checkpoint 继续(当前主要覆盖 orders + bills
```bash
npm run sync -- --resume
```
行为:
- 抓全量 customer + customerDetails

View File

@@ -64,7 +64,7 @@ export const datasets = {
name: 'customers',
url: `${config.baseUrl}/#/detail/my_customer/~/customer/list`,
heading: '我的客户',
pageSize: 100,
pageSize: 20,
uniqueKey: (record) => record.accountId || record.loginName || record.__hash,
normalize: (record) => {
const loginAndUid = record['登录名称/账号ID'] || '';

View File

@@ -19,7 +19,7 @@ if (command === 'login') {
}
if (command === 'sync') {
const summary = await syncAll();
const summary = await syncAll({ resume: billsResume });
console.log(JSON.stringify(summary, null, 2));
process.exit(0);
}

View File

@@ -239,6 +239,37 @@ function loadLatestBillsCheckpoint() {
}
}
function loadLatestOrdersCheckpoint() {
const checkpointDir = path.join(config.dataDir, 'checkpoints', 'orders');
if (!fs.existsSync(checkpointDir)) {
return null;
}
const candidates = fs.readdirSync(checkpointDir)
.filter((fileName) => fileName.endsWith('.json'))
.map((fileName) => {
const filePath = path.join(checkpointDir, fileName);
const stat = fs.statSync(filePath);
return { fileName, filePath, mtimeMs: stat.mtimeMs };
})
.sort((a, b) => b.mtimeMs - a.mtimeMs);
if (candidates.length === 0) {
return null;
}
try {
const latest = JSON.parse(fs.readFileSync(candidates[0].filePath, 'utf-8'));
if (!latest || typeof latest !== 'object') {
return null;
}
return latest;
} catch (error) {
console.warn(`[订单检查点] 读取失败,忽略断点续爬: ${error.message}`);
return null;
}
}
function subtractDays(dateValue, days) {
const next = new Date(dateValue);
next.setDate(next.getDate() - days);
@@ -393,11 +424,12 @@ export async function login() {
}
}
export async function syncAll() {
export async function syncAll(options = {}) {
const runtimeController = getRuntimeController();
runtimeController.bind();
const context = await getContext();
let page = null;
const { resume = false } = options;
try {
const summary = { startedAt: new Date().toISOString(), datasets: {} };
@@ -408,14 +440,14 @@ export async function syncAll() {
summary.datasets.customerDetails = await syncCustomerDetails(page);
}
summary.datasets.orders = await syncOrders(page, { incremental: !config.fullSync });
summary.datasets.orders = await syncOrders(page, { incremental: !config.fullSync, resume });
// syncOrders 完成后,从最新的 orders.json 读取 orderId 列表
const latestOrders = loadCurrentState('orders', datasets.orders.uniqueKey);
const orderIdsForDetail = collectValidOrderIds(latestOrders.records || []);
summary.datasets.orderDetails = await syncOrderDetails(page, orderIdsForDetail);
summary.datasets.bills = await syncBills(page, { incremental: !config.fullSync });
summary.datasets.bills = await syncBills(page, { incremental: !config.fullSync, resume });
summary.datasets.messages = await syncMessages(page, { incremental: !config.fullSync });
summary.finishedAt = new Date().toISOString();
@@ -614,7 +646,7 @@ async function syncCustomerDetails(page) {
async function syncOrders(page, options = {}) {
await runtimeCheckpoint('同步订单');
const dataset = datasets.orders;
const { incremental = false } = options;
const { incremental = false, resume = false } = options;
let windows;
if (!incremental) {
@@ -623,7 +655,16 @@ async function syncOrders(page, options = {}) {
windows = await buildIncrementalOrderWindows();
}
const allRecords = [];
const resumeCheckpoint = resume ? loadLatestOrdersCheckpoint() : null;
if (resumeCheckpoint?.windowStart) {
const resumeIndex = windows.findIndex((window) => window.start === resumeCheckpoint.windowStart && window.end === resumeCheckpoint.windowEnd);
if (resumeIndex >= 0) {
windows = windows.slice(resumeIndex);
console.log(`[订单续爬] 从 checkpoint 恢复: ${resumeCheckpoint.windowStart} ~ ${resumeCheckpoint.windowEnd}, page=${resumeCheckpoint.pageNum || 1}, records=${(resumeCheckpoint.records || []).length}`);
}
}
const allNormalizedRecords = [];
for (const window of windows) {
await runtimeCheckpoint(`订单窗口 ${window.start} ~ ${window.end}`);
@@ -632,15 +673,46 @@ async function syncOrders(page, options = {}) {
await setDateRange(page, window.start, window.end);
await clickQuery(page);
await trySetPageSize(page, dataset.pageSize);
const records = await scrapePagedTable(page, dataset, window);
allRecords.push(...records);
if (hasDbConfig()) {
const normalizedWindowRecords = dedupeByHash(normalizeDatasetRecords(dataset, records, window));
await upsertOrders(normalizedWindowRecords);
let windowNormalizedRecords = [];
let resumeFromPage = 0;
let shouldContinueScrape = true;
if (resumeCheckpoint?.windowStart === window.start && resumeCheckpoint?.windowEnd === window.end) {
windowNormalizedRecords = Array.isArray(resumeCheckpoint.records) ? resumeCheckpoint.records : [];
resumeFromPage = Number.parseInt(String(resumeCheckpoint.pageNum || 0), 10) || 0;
if (resumeFromPage > 0) {
const moved = await moveOrdersToResumeStart(page, resumeFromPage);
if (!moved) {
console.log(`[订单续爬] checkpoint 已在最后一页,无需继续抓取 window=${window.start}~${window.end}`);
shouldContinueScrape = false;
}
}
}
let records = [];
if (shouldContinueScrape) {
records = await scrapePagedTable(page, dataset, window, {
onPage: async ({ pageNum, pageRows }) => {
const normalizedPageRows = normalizeDatasetRecords(dataset, pageRows, window);
windowNormalizedRecords.push(...normalizedPageRows);
if (hasDbConfig()) {
await upsertOrders(normalizedPageRows);
}
await saveOrdersCheckpoint(dataset, window, pageNum, windowNormalizedRecords);
},
});
}
if (resumeFromPage === 0) {
windowNormalizedRecords = normalizeDatasetRecords(dataset, records, window);
if (hasDbConfig()) {
await upsertOrders(dedupeByHash(windowNormalizedRecords));
}
}
allNormalizedRecords.push(...windowNormalizedRecords);
}
return persistDataset(dataset, dedupeByHash(allRecords), {});
return persistNormalizedDataset(dataset, dedupeByHash(allNormalizedRecords));
}
async function buildIncrementalOrderWindows() {
@@ -828,6 +900,22 @@ async function saveBillsCheckpoint(dataset, month, pageNum, normalizedRecords) {
console.log(`[账单检查点] 已落盘: month=${month}, page=${pageNum}, records=${normalized.length}`);
}
async function saveOrdersCheckpoint(dataset, window, pageNum, normalizedRecords) {
const normalized = dedupeByHash(normalizedRecords);
const checkpointName = `${window.start}_${window.end}`.replace(/[^0-9_-]/g, '-');
saveCheckpoint(dataset.name, checkpointName, {
windowStart: window.start,
windowEnd: window.end,
pageNum,
savedAt: new Date().toISOString(),
stats: {
total: normalized.length,
},
records: normalized,
});
console.log(`[订单检查点] 已落盘: ${window.start} ~ ${window.end}, page=${pageNum}, records=${normalized.length}`);
}
function normalizeDatasetRecords(dataset, records, context) {
return records.map((record) => withHash(dataset.normalize(record, record.__context || context)));
}
@@ -846,6 +934,20 @@ async function moveBillsToResumeStart(page, resumeFromPage) {
return moved;
}
async function moveOrdersToResumeStart(page, resumeFromPage) {
if (resumeFromPage <= 0) {
return true;
}
const reached = await jumpToPage(page, resumeFromPage);
if (!reached) {
throw new Error(`订单续爬失败:无法定位到 checkpoint 页码 ${resumeFromPage}`);
}
const moved = await gotoNextPage(page);
return moved;
}
async function getLatestBillConsumptionDate() {
if (!hasDbConfig()) {
console.warn('[增量模式] 未配置数据库连接,无法读取账单水位,回退到当前日期');