订单详情报错提交

This commit is contained in:
ray
2026-05-07 14:45:01 +08:00
parent bbba33cab0
commit a06cdc70f1
4 changed files with 330 additions and 40 deletions

View File

@@ -31,7 +31,7 @@ npm run login
npm run sync
```
如果要从已有 checkpoint 继续全量流程(当前主要覆盖 orders + bills
如果要从已有 checkpoint 继续全量流程(覆盖 customers / customerDetails / orders / orderDetails / bills / messages
```powershell
npm run sync -- --resume

View File

@@ -28,7 +28,7 @@ Node 版阿里云 APS 同步工具。
npm run sync
```
如果要让 full sync 从已有 checkpoint 继续(当前主要覆盖 orders + bills
如果要让 full sync 从已有 checkpoint 继续(覆盖 customers / customerDetails / orders / orderDetails / bills / messages
```bash
npm run sync -- --resume

View File

@@ -5,6 +5,7 @@ const command = args[0] || 'sync';
const extraArgs = args.slice(1);
const billsResume = extraArgs.includes('--resume');
const ordersIncremental = extraArgs.includes('--incremental');
const messagesResume = extraArgs.includes('--resume');
for (const arg of extraArgs) {
if (arg.startsWith('--incremental-order-start-date=')) {
@@ -44,7 +45,7 @@ if (command === 'orders') {
}
if (command === 'messages') {
const summary = await syncMessagesOnly({ incremental: config.scheduleMode === 'incremental' });
const summary = await syncMessagesOnly({ incremental: config.scheduleMode === 'incremental', resume: messagesResume });
console.log(JSON.stringify(summary, null, 2));
process.exit(0);
}

View File

@@ -481,8 +481,8 @@ export async function syncAll(options = {}) {
page = await resolveActivePage(context, '/detail/my_customer/~/customer/list');
if (config.fullSync) {
summary.datasets.customers = await syncCustomers(page);
summary.datasets.customerDetails = await syncCustomerDetails(page);
summary.datasets.customers = await syncCustomers(page, { resume });
summary.datasets.customerDetails = await syncCustomerDetails(page, { resume });
}
summary.datasets.orders = await syncOrders(page, { incremental: !config.fullSync, resume });
@@ -491,9 +491,9 @@ export async function syncAll(options = {}) {
const latestOrders = loadCurrentState('orders', datasets.orders.uniqueKey);
const orderIdsForDetail = collectValidOrderIds(latestOrders.records || []);
summary.datasets.orderDetails = await syncOrderDetails(page, orderIdsForDetail);
summary.datasets.orderDetails = await syncOrderDetails(page, orderIdsForDetail, options);
summary.datasets.bills = await syncBills(page, { incremental: !config.fullSync, resume });
summary.datasets.messages = await syncMessages(page, { incremental: !config.fullSync });
summary.datasets.messages = await syncMessages(page, { incremental: !config.fullSync, resume });
summary.finishedAt = new Date().toISOString();
const stamp = nowStamp();
@@ -632,12 +632,12 @@ export async function syncAllIncremental() {
try {
const summary = { startedAt: new Date().toISOString(), mode: 'incremental', datasets: {} };
page = await resolveActivePage(context, '/detail/order/~/costCenter/order');
summary.datasets.orders = await syncOrders(page, { incremental: true });
summary.datasets.orders = await syncOrders(page, { incremental: true, resume: true });
const latestOrders = loadCurrentState('orders', datasets.orders.uniqueKey);
const orderIdsForDetail = collectValidOrderIds(latestOrders.records || []);
summary.datasets.orderDetails = await syncOrderDetails(page, orderIdsForDetail);
summary.datasets.bills = await syncBills(page, { incremental: true });
summary.datasets.messages = await syncMessages(page, { incremental: true });
summary.datasets.orderDetails = await syncOrderDetails(page, orderIdsForDetail, { resume: true });
summary.datasets.bills = await syncBills(page, { incremental: true, resume: true });
summary.datasets.messages = await syncMessages(page, { incremental: true, resume: true });
summary.finishedAt = new Date().toISOString();
const stamp = nowStamp();
@@ -657,25 +657,59 @@ export async function syncAllIncremental() {
}
}
async function syncCustomers(page) {
async function syncCustomers(page, options = {}) {
await runtimeCheckpoint('同步客户');
const dataset = datasets.customers;
const { resume = false } = options;
await page.goto(dataset.url, { waitUntil: 'domcontentloaded' });
await waitUntilReady(page, dataset.heading);
await trySetPageSize(page, dataset.pageSize);
const records = await scrapePagedTable(page, dataset, {});
if (hasDbConfig()) {
const normalizedRecords = dedupeByHash(normalizeDatasetRecords(dataset, records, {}));
await upsertCustomers(normalizedRecords);
const resumeCheckpoint = resume ? loadLatestCustomersCheckpoint() : null;
let resumeFromPage = Number.parseInt(String(resumeCheckpoint?.pageNum || 0), 10) || 0;
let shouldContinueScrape = true;
let allNormalizedRecords = Array.isArray(resumeCheckpoint?.records) ? resumeCheckpoint.records : [];
if (resumeFromPage > 0) {
console.log(`[客户续爬] 从 checkpoint 恢复: page=${resumeFromPage}, records=${allNormalizedRecords.length}`);
const moved = await moveCustomersToResumeStart(page, resumeFromPage);
if (!moved) {
console.log('[客户续爬] checkpoint 已在最后一页,无需继续抓取');
shouldContinueScrape = false;
}
}
return persistDataset(dataset, records, {});
let records = [];
if (shouldContinueScrape) {
records = await scrapePagedTable(page, dataset, {}, {
onPage: async ({ pageNum, pageRows }) => {
const normalizedPageRows = normalizeDatasetRecords(dataset, pageRows, {});
allNormalizedRecords.push(...normalizedPageRows);
if (hasDbConfig()) {
await upsertCustomers(normalizedPageRows);
}
await saveCustomersCheckpoint(dataset, pageNum, allNormalizedRecords);
},
skipInitialPage: resumeFromPage > 0,
});
}
if (resumeFromPage === 0) {
allNormalizedRecords = normalizeDatasetRecords(dataset, records, {});
if (hasDbConfig()) {
await upsertCustomers(dedupeByHash(allNormalizedRecords));
}
}
return persistDataset(dataset, dedupeByHash(allNormalizedRecords), {});
}
async function syncCustomerDetails(page) {
async function syncCustomerDetails(page, options = {}) {
await runtimeCheckpoint('同步客户详情');
const dataset = datasets.customerDetails;
const customersState = loadCurrentState('customers', datasets.customers.uniqueKey);
const customerTargets = collectCustomerDetailTargets(customersState.records || []);
const resumeCheckpoint = options.resume ? loadLatestCustomerDetailsCheckpoint() : null;
if (customerTargets.length === 0) {
console.log('[客户详情] 本地无有效客户定位信息,跳过');
@@ -683,14 +717,18 @@ async function syncCustomerDetails(page) {
}
console.log(`[客户详情] 共 ${customerTargets.length} 个客户需要获取详情`);
const allDetails = [];
const allDetails = Array.isArray(resumeCheckpoint?.records) ? resumeCheckpoint.records : [];
let currentListPage = 0;
let startIndex = Number.parseInt(String(resumeCheckpoint?.currentIndex || 0), 10) || 0;
if (startIndex > 0) {
console.log(`[客户详情续爬] 从 checkpoint 恢复: index=${startIndex}, records=${allDetails.length}`);
}
await page.goto(datasets.customers.url, { waitUntil: 'domcontentloaded' });
await waitUntilReady(page, datasets.customers.heading);
await trySetPageSize(page, datasets.customers.pageSize);
for (let index = 0; index < customerTargets.length; index += 1) {
for (let index = startIndex; index < customerTargets.length; index += 1) {
await runtimeCheckpoint(`客户详情 ${index + 1}/${customerTargets.length}`);
const target = customerTargets[index];
console.log(`[客户详情] ${index + 1}/${customerTargets.length} accountId=${target.accountId} page=${target.pageNum}`);
@@ -699,16 +737,15 @@ async function syncCustomerDetails(page) {
await sleep(pauseMs);
if (target.pageNum > 0 && currentListPage !== target.pageNum) {
const reached = await jumpToPage(page, target.pageNum);
const reached = await jumpToCustomerPage(page, target.pageNum);
if (!reached) {
console.warn(`[客户详情] 无法跳到第 ${target.pageNum} 页,跳过 ${target.accountId}`);
continue;
}
currentListPage = target.pageNum;
await waitForTableRows(page);
}
const clicked = await clickCustomerDetailFromList(page, target);
const clicked = await clickCustomerDetailFromListWithRetry(page, target);
if (!clicked) {
console.warn(`[客户详情] 列表中未找到 accountId=${target.accountId},跳过`);
continue;
@@ -724,20 +761,20 @@ async function syncCustomerDetails(page) {
} catch {
console.warn(`[客户详情] ${target.accountId} 详情页加载超时,跳过`);
await page.goBack({ waitUntil: 'domcontentloaded' }).catch(() => null);
await waitUntilReady(page, datasets.customers.heading).catch(() => null);
await recoverCustomerListState(page, currentListPage).catch(() => null);
continue;
}
const detail = await extractCustomerDetail(page);
allDetails.push({ ...detail, __context: { accountId: target.accountId } });
await saveCustomerDetailsCheckpoint(dataset, index + 1, allDetails);
if (hasDbConfig()) {
const normalizedDetail = normalizeDatasetRecords(dataset, [{ ...detail, __context: { accountId: target.accountId } }], {});
await upsertCustomerDetails(normalizedDetail);
}
await page.goBack({ waitUntil: 'domcontentloaded' }).catch(() => null);
await waitUntilReady(page, datasets.customers.heading).catch(() => null);
await trySetPageSize(page, datasets.customers.pageSize).catch(() => null);
await recoverCustomerListState(page, currentListPage).catch(() => null);
currentListPage = target.pageNum;
}
@@ -953,19 +990,43 @@ async function syncBills(page, options = {}) {
async function syncMessages(page, options = {}) {
await runtimeCheckpoint('同步消息');
const dataset = datasets.messages;
const { incremental = false } = options;
const { incremental = false, resume = false } = options;
await page.goto(dataset.url, { waitUntil: 'domcontentloaded' });
await waitUntilReady(page, dataset.heading);
await trySetPageSize(page, dataset.pageSize);
let records = await scrapePagedTable(page, dataset, {}, {
onPage: hasDbConfig()
? async ({ pageRows }) => {
const normalizedPageRows = normalizeDatasetRecords(dataset, pageRows, {});
const resumeCheckpoint = resume ? loadLatestMessagesCheckpoint() : null;
let resumeFromPage = Number.parseInt(String(resumeCheckpoint?.pageNum || 0), 10) || 0;
let shouldContinueScrape = true;
let allNormalizedRecords = Array.isArray(resumeCheckpoint?.records) ? resumeCheckpoint.records : [];
if (resumeFromPage > 0) {
console.log(`[消息续爬] 从 checkpoint 恢复: page=${resumeFromPage}, records=${allNormalizedRecords.length}`);
const moved = await moveMessagesToResumeStart(page, resumeFromPage);
if (!moved) {
console.log('[消息续爬] checkpoint 已在最后一页,无需继续抓取');
shouldContinueScrape = false;
}
}
let records = [];
if (shouldContinueScrape) {
records = await scrapePagedTable(page, dataset, {}, {
onPage: async ({ pageNum, pageRows }) => {
const normalizedPageRows = normalizeDatasetRecords(dataset, pageRows, {});
allNormalizedRecords.push(...normalizedPageRows);
if (hasDbConfig()) {
await upsertMessages(normalizedPageRows);
}
: undefined,
});
await saveMessagesCheckpoint(dataset, pageNum, allNormalizedRecords);
},
skipInitialPage: resumeFromPage > 0,
});
}
if (resumeFromPage === 0) {
allNormalizedRecords = normalizeDatasetRecords(dataset, records, {});
}
if (incremental && hasDbConfig()) {
try {
const latestMessageTime = await getLatestMessageTimeFromDb();
@@ -973,9 +1034,9 @@ async function syncMessages(page, options = {}) {
const latest = parseDbDateTime(latestMessageTime);
if (latest) {
const watermark = subtractDays(latest, config.messageIncrementalOverlapDays);
const before = records.length;
records = records.filter((record) => isAfterLatestMessageTime(record, watermark));
console.log(`[增量模式] 消息按时间过滤: ${before} -> ${records.length} (db_last=${latestMessageTime}, overlap=${config.messageIncrementalOverlapDays}d)`);
const before = allNormalizedRecords.length;
allNormalizedRecords = allNormalizedRecords.filter((record) => isAfterLatestMessageTime(record, watermark));
console.log(`[增量模式] 消息按时间过滤: ${before} -> ${allNormalizedRecords.length} (db_last=${latestMessageTime}, overlap=${config.messageIncrementalOverlapDays}d)`);
}
}
} catch (error) {
@@ -983,7 +1044,63 @@ async function syncMessages(page, options = {}) {
}
}
return persistDataset(dataset, dedupeByHash(records), {});
return persistDataset(dataset, dedupeByHash(allNormalizedRecords), {});
}
async function saveMessagesCheckpoint(dataset, pageNum, normalizedRecords) {
const normalized = dedupeByHash(normalizedRecords);
saveCheckpoint(dataset.name, `page-${pageNum}`, {
pageNum,
savedAt: new Date().toISOString(),
stats: { total: normalized.length },
records: normalized,
});
console.log(`[消息检查点] 已落盘: page=${pageNum}, records=${normalized.length}`);
}
function loadLatestMessagesCheckpoint() {
const checkpointDir = path.join(config.dataDir, 'checkpoints', 'messages');
if (!fs.existsSync(checkpointDir)) {
return null;
}
const candidates = fs.readdirSync(checkpointDir)
.filter((fileName) => fileName.endsWith('.json'))
.map((fileName) => {
const filePath = path.join(checkpointDir, fileName);
const stat = fs.statSync(filePath);
return { fileName, filePath, mtimeMs: stat.mtimeMs };
})
.sort((a, b) => b.mtimeMs - a.mtimeMs);
if (candidates.length === 0) {
return null;
}
try {
const latest = JSON.parse(fs.readFileSync(candidates[0].filePath, 'utf-8'));
if (!latest || typeof latest !== 'object') {
return null;
}
return latest;
} catch (error) {
console.warn(`[消息检查点] 读取失败,忽略断点续爬: ${error.message}`);
return null;
}
}
async function moveMessagesToResumeStart(page, resumeFromPage) {
if (resumeFromPage <= 0) {
return true;
}
const reached = await jumpToPage(page, resumeFromPage);
if (!reached) {
throw new Error(`消息续爬失败:无法定位到 checkpoint 页码 ${resumeFromPage}`);
}
const moved = await gotoNextPage(page);
return moved;
}
async function saveBillsCheckpoint(dataset, month, pageNum, normalizedRecords) {
@@ -1017,6 +1134,116 @@ async function saveOrdersCheckpoint(dataset, window, pageNum, normalizedRecords)
console.log(`[订单检查点] 已落盘: ${window.start} ~ ${window.end}, page=${pageNum}, records=${normalized.length}`);
}
async function saveCustomersCheckpoint(dataset, pageNum, normalizedRecords) {
const normalized = dedupeByHash(normalizedRecords);
saveCheckpoint(dataset.name, `page-${pageNum}`, {
pageNum,
savedAt: new Date().toISOString(),
stats: { total: normalized.length },
records: normalized,
});
console.log(`[客户检查点] 已落盘: page=${pageNum}, records=${normalized.length}`);
}
function loadLatestCustomersCheckpoint() {
const checkpointDir = path.join(config.dataDir, 'checkpoints', 'customers');
if (!fs.existsSync(checkpointDir)) {
return null;
}
const candidates = fs.readdirSync(checkpointDir)
.filter((fileName) => fileName.endsWith('.json'))
.map((fileName) => {
const filePath = path.join(checkpointDir, fileName);
const stat = fs.statSync(filePath);
return { fileName, filePath, mtimeMs: stat.mtimeMs };
})
.sort((a, b) => b.mtimeMs - a.mtimeMs);
if (candidates.length === 0) {
return null;
}
try {
const latest = JSON.parse(fs.readFileSync(candidates[0].filePath, 'utf-8'));
if (!latest || typeof latest !== 'object') {
return null;
}
return latest;
} catch (error) {
console.warn(`[客户检查点] 读取失败,忽略断点续爬: ${error.message}`);
return null;
}
}
async function saveCustomerDetailsCheckpoint(dataset, currentIndex, records) {
const normalized = dedupeByHash(records);
saveCheckpoint(dataset.name, 'latest', {
currentIndex,
savedAt: new Date().toISOString(),
stats: { total: normalized.length },
records: normalized,
});
console.log(`[客户详情检查点] 已落盘: index=${currentIndex}, records=${normalized.length}`);
}
function loadLatestCustomerDetailsCheckpoint() {
const checkpointDir = path.join(config.dataDir, 'checkpoints', 'customerDetails');
if (!fs.existsSync(checkpointDir)) {
return null;
}
const latestFile = path.join(checkpointDir, 'latest.json');
if (!fs.existsSync(latestFile)) {
const candidates = fs.readdirSync(checkpointDir).filter((fileName) => fileName.endsWith('.json'));
if (candidates.length === 0) {
return null;
}
return JSON.parse(fs.readFileSync(path.join(checkpointDir, candidates[0]), 'utf-8'));
}
try {
return JSON.parse(fs.readFileSync(latestFile, 'utf-8'));
} catch (error) {
console.warn(`[客户详情检查点] 读取失败: ${error.message}`);
return null;
}
}
async function saveOrderDetailsCheckpoint(dataset, currentIndex, records) {
const normalized = dedupeByHash(records);
saveCheckpoint(dataset.name, 'latest', {
currentIndex,
savedAt: new Date().toISOString(),
stats: { total: normalized.length },
records: normalized,
});
console.log(`[订单详情检查点] 已落盘: index=${currentIndex}, records=${normalized.length}`);
}
function loadLatestOrderDetailsCheckpoint() {
const checkpointDir = path.join(config.dataDir, 'checkpoints', 'orderDetails');
if (!fs.existsSync(checkpointDir)) {
return null;
}
const latestFile = path.join(checkpointDir, 'latest.json');
if (!fs.existsSync(latestFile)) {
const candidates = fs.readdirSync(checkpointDir).filter((fileName) => fileName.endsWith('.json'));
if (candidates.length === 0) {
return null;
}
return JSON.parse(fs.readFileSync(path.join(checkpointDir, candidates[0]), 'utf-8'));
}
try {
return JSON.parse(fs.readFileSync(latestFile, 'utf-8'));
} catch (error) {
console.warn(`[订单详情检查点] 读取失败: ${error.message}`);
return null;
}
}
function normalizeDatasetRecords(dataset, records, context) {
return records.map((record) => withHash(dataset.normalize(record, record.__context || context)));
}
@@ -1049,6 +1276,20 @@ async function moveOrdersToResumeStart(page, resumeFromPage) {
return moved;
}
async function moveCustomersToResumeStart(page, resumeFromPage) {
if (resumeFromPage <= 0) {
return true;
}
const reached = await jumpToPage(page, resumeFromPage);
if (!reached) {
throw new Error(`客户续爬失败:无法定位到 checkpoint 页码 ${resumeFromPage}`);
}
const moved = await gotoNextPage(page);
return moved;
}
async function getLatestBillConsumptionDate() {
if (!hasDbConfig()) {
console.warn('[增量模式] 未配置数据库连接,无法读取账单水位,回退到当前日期');
@@ -1091,9 +1332,10 @@ function isAfterLatestMessageTime(record, watermarkDate) {
return parsed >= watermarkDate;
}
async function syncOrderDetails(page, cachedOrderIds) {
async function syncOrderDetails(page, cachedOrderIds, options = {}) {
await runtimeCheckpoint('同步订单详情');
const dataset = datasets.orderDetails;
const resumeCheckpoint = options.resume ? loadLatestOrderDetailsCheckpoint() : null;
// 使用传入的 orderId 列表(在 syncOrders 覆盖 orders.json 之前缓存的)
const allOrderIds = cachedOrderIds || [];
@@ -1104,10 +1346,14 @@ async function syncOrderDetails(page, cachedOrderIds) {
}
console.log(`[订单详情] 共 ${allOrderIds.length} 个订单需要获取详情`);
const allDetails = [];
const allDetails = Array.isArray(resumeCheckpoint?.records) ? resumeCheckpoint.records : [];
const startIndex = Number.parseInt(String(resumeCheckpoint?.currentIndex || 0), 10) || 0;
if (startIndex > 0) {
console.log(`[订单详情续爬] 从 checkpoint 恢复: index=${startIndex}, records=${allDetails.length}`);
}
const detailBaseUrl = 'https://aps.aliyun.com/?spm=5176.12818093.top-nav.ditem-fx.785716d0LKDpKT#/detail/order/~/costCenter/order/detail/';
for (let index = 0; index < allOrderIds.length; index += 1) {
for (let index = startIndex; index < allOrderIds.length; index += 1) {
await runtimeCheckpoint(`订单详情 ${index + 1}/${allOrderIds.length}`);
const orderId = allOrderIds[index];
console.log(`[订单详情] ${index + 1}/${allOrderIds.length} orderId=${orderId}`);
@@ -1134,6 +1380,7 @@ async function syncOrderDetails(page, cachedOrderIds) {
detail.orderId = orderId;
}
allDetails.push({ ...detail, __context: {} });
await saveOrderDetailsCheckpoint(dataset, index + 1, allDetails);
if (hasDbConfig()) {
const normalizedDetail = normalizeDatasetRecords(dataset, [{ ...detail, __context: {} }], {});
await upsertOrderDetails(normalizedDetail);
@@ -1740,6 +1987,48 @@ async function clickCustomerDetailFromList(page, target) {
return clicked;
}
async function clickCustomerDetailFromListWithRetry(page, target) {
const attempts = [target.pageNum, Math.max(1, target.pageNum - 1), target.pageNum + 1];
for (const pageNum of attempts) {
if (pageNum > 0 && pageNum !== target.pageNum) {
const reached = await jumpToCustomerPage(page, pageNum);
if (!reached) {
continue;
}
await waitForStableCustomerList(page);
}
const clicked = await clickCustomerDetailFromList(page, target);
if (clicked) {
return true;
}
}
return false;
}
async function jumpToCustomerPage(page, pageNum) {
const reached = await jumpToPage(page, pageNum);
if (reached) {
console.log(`[客户详情] 已跳转到第 ${pageNum}`);
}
return reached;
}
async function waitForStableCustomerList(page) {
await waitForTableRows(page).catch(() => null);
await sleep(600);
await waitForTableRows(page).catch(() => null);
}
async function recoverCustomerListState(page, pageNum) {
await waitUntilReady(page, datasets.customers.heading).catch(() => null);
await trySetPageSize(page, datasets.customers.pageSize).catch(() => null);
if (pageNum > 0) {
await jumpToCustomerPage(page, pageNum).catch(() => null);
await waitForStableCustomerList(page).catch(() => null);
}
}
function isValidOrderId(orderId) {
const value = String(orderId || '').trim();
if (!value) return false;