数据5分钟同步一次修改
This commit is contained in:
@@ -35,6 +35,7 @@ let _context = null;
|
||||
let _runtimeController = null;
|
||||
let _browser = null;
|
||||
let _isAttachedBrowser = false;
|
||||
const runningJobs = new Set();
|
||||
|
||||
const AUTH_PAGE_KEYWORDS = [
|
||||
'RAM 用户登录',
|
||||
@@ -334,6 +335,151 @@ function formatDateTime(date) {
|
||||
return `${formatDate(date)} ${String(date.getHours()).padStart(2, '0')}:${String(date.getMinutes()).padStart(2, '0')}:${String(date.getSeconds()).padStart(2, '0')}`;
|
||||
}
|
||||
|
||||
/**
 * Reports whether a stored date-time value lands on the same day as `date`.
 *
 * The comparison is done on the strings produced by `formatDate`, after the
 * raw value has been run through `parseDbDateTime`.
 *
 * @param {*} value - Raw value handed to `parseDbDateTime`.
 * @param {Date} date - Reference date to compare against.
 * @returns {boolean} `false` when `parseDbDateTime` yields a falsy result,
 *   otherwise whether both dates format to the same day string.
 */
function isSameDate(value, date) {
  const parsedValue = parseDbDateTime(value);
  return parsedValue ? formatDate(parsedValue) === formatDate(date) : false;
}
|
||||
|
||||
/**
 * Returns a new Date that is `minutes` minutes away from `date`.
 *
 * The shift is applied to the epoch timestamp rather than to the local
 * wall-clock minute field, so the elapsed time is exact even when the
 * interval crosses a daylight-saving transition in the process time zone.
 * The input is never mutated.
 *
 * @param {Date|number|string} date - Starting instant (anything `new Date()` accepts).
 * @param {number} minutes - Minutes to add; negative values move backwards.
 * @returns {Date} A fresh Date instance (an invalid Date if the input is invalid).
 */
function addMinutes(date, minutes) {
  const millisecondsPerMinute = 60000;
  const base = new Date(date);
  return new Date(base.getTime() + minutes * millisecondsPerMinute);
}
|
||||
|
||||
/**
 * Builds a comparison key from the order fields that are watched for changes.
 *
 * Two records with the same fingerprint are treated as unchanged by the
 * callers in this file. Only `null`/`undefined` collapse to an empty segment;
 * falsy-but-meaningful values such as a numeric `0` are kept, so an amount of
 * 0 is distinguishable from a missing amount and equals the string `'0'`.
 *
 * @param {object} record - Order record; reads `orderStatus`, `actualPaidCny`,
 *   `orderOriginalPriceCny`, `orderType`, `customerCategory` and `createdAt`.
 * @returns {string} The trimmed field values joined with `|`.
 */
function buildOrderFingerprint(record) {
  const toSegment = (value) => (value === null || value === undefined ? '' : String(value).trim());

  return [
    record.orderStatus,
    record.actualPaidCny,
    record.orderOriginalPriceCny,
    record.orderType,
    record.customerCategory,
    record.createdAt,
  ].map((value) => toSegment(value)).join('|');
}
|
||||
|
||||
/**
 * Tells whether an order status is one of the configured final statuses.
 *
 * @param {*} status - Raw status value; it is stringified and trimmed first.
 * @returns {boolean} `false` for an empty status, otherwise whether the
 *   trimmed status strictly equals an entry of `config.hotFinalStatuses`.
 */
function isFinalOrderStatus(status) {
  const trimmedStatus = String(status || '').trim();
  // The empty check comes first so the config list is not consulted for blanks.
  return trimmedStatus !== '' && config.hotFinalStatuses.includes(trimmedStatus);
}
|
||||
|
||||
/**
 * Runs `job` unless a job with the same name is already recorded as running.
 *
 * The name is added to the module-level `runningJobs` set for the duration of
 * the call and removed again whether the job resolves or rejects.
 *
 * @param {string} jobName - Key used in `runningJobs`.
 * @param {Function} job - Callback to invoke; its result is awaited.
 * @returns {Promise<*>} The job's result, or
 *   `{ skipped: true, reason: 'already_running', jobName }` when skipped.
 */
async function runLockedJob(jobName, job) {
  const alreadyRunning = runningJobs.has(jobName);
  if (alreadyRunning) {
    console.log(`[任务锁] ${jobName} 已在运行,跳过本次执行`);
    return { skipped: true, reason: 'already_running', jobName };
  }

  runningJobs.add(jobName);
  try {
    const outcome = await job();
    return outcome;
  } finally {
    // Release the name even when the job throws.
    runningJobs.delete(jobName);
  }
}
|
||||
|
||||
/**
 * Builds the order window list covering only the current day.
 *
 * @returns {*} Whatever `buildSingleDateWindow` returns when both bounds are
 *   today's `formatDate` string.
 */
function buildTodayOrderWindow() {
  const todayLabel = formatDate(new Date());
  const todayWindows = buildSingleDateWindow(todayLabel, todayLabel);
  return todayWindows;
}
|
||||
|
||||
/**
 * Lists the order ids in `nextRecords` that are new or whose fingerprint
 * differs from the record with the same id in `previousRecords`.
 *
 * Records whose trimmed `orderId` is empty are ignored on both sides. When an
 * id occurs several times in `previousRecords`, the last occurrence wins.
 *
 * @param {Array<object>|null|undefined} previousRecords - Earlier snapshot.
 * @param {Array<object>|null|undefined} nextRecords - Fresh snapshot.
 * @returns {string[]} Unique ids in first-seen order.
 */
function computeChangedOrderIds(previousRecords, nextRecords) {
  const readOrderId = (record) => String(record.orderId || '').trim();

  const previousById = new Map();
  for (const previousRecord of previousRecords || []) {
    const id = readOrderId(previousRecord);
    if (id) {
      previousById.set(id, previousRecord);
    }
  }

  // A Set keeps insertion order, so duplicates collapse onto their first hit.
  const changedIds = new Set();
  for (const nextRecord of nextRecords || []) {
    const id = readOrderId(nextRecord);
    if (!id) {
      continue;
    }
    const before = previousById.get(id);
    if (!before || buildOrderFingerprint(before) !== buildOrderFingerprint(nextRecord)) {
      changedIds.add(id);
    }
  }

  return Array.from(changedIds);
}
|
||||
|
||||
/**
 * Picks the order ids whose detail page should be fetched in this run.
 *
 * An order with a non-empty id accepted by `isValidOrderId` is selected when:
 *   1. its id is listed in `changedOrderIds`; or
 *   2. its status is not final (per `isFinalOrderStatus`) and either no detail
 *      record exists for it, the detail's sync timestamp cannot be parsed, or
 *      that timestamp is at or before now minus
 *      `config.hotOrderDetailRefreshMinutes` minutes.
 *
 * @param {Array<object>|null|undefined} orderRecords - Current order records.
 * @param {Array<*>|null|undefined} changedOrderIds - Ids flagged as changed.
 * @param {Array<object>|null|undefined} detailRecords - Stored detail records;
 *   reads `orderId`, `detailSyncedAt` and `__detailSyncedAt`.
 * @returns {string[]} Unique order ids in first-seen order.
 */
function selectOrderDetailCandidates(orderRecords, changedOrderIds, detailRecords) {
  const toId = (value) => String(value || '').trim();

  const changedIds = new Set((changedOrderIds || []).map((item) => toId(item)).filter(Boolean));

  const detailById = new Map();
  for (const detailRecord of detailRecords || []) {
    const id = toId(detailRecord.orderId);
    if (id) {
      detailById.set(id, detailRecord);
    }
  }

  // Details synced at or before this instant count as stale.
  const staleCutoff = addMinutes(new Date(), -config.hotOrderDetailRefreshMinutes);

  const needsDetail = (orderRecord, id) => {
    if (changedIds.has(id)) {
      return true;
    }
    if (isFinalOrderStatus(String(orderRecord.orderStatus || '').trim())) {
      return false;
    }
    const detail = detailById.get(id);
    if (!detail) {
      return true;
    }
    const syncedAt = parseDbDateTime(detail.detailSyncedAt || detail.__detailSyncedAt || '');
    return !syncedAt || syncedAt <= staleCutoff;
  };

  const selectedIds = new Set();
  for (const orderRecord of orderRecords || []) {
    const id = toId(orderRecord.orderId);
    if (!id || !isValidOrderId(id)) {
      continue;
    }
    if (needsDetail(orderRecord, id)) {
      selectedIds.add(id);
    }
  }

  return Array.from(selectedIds);
}
|
||||
|
||||
/**
 * Classifies the rows of one scraped page against the previously known orders.
 *
 * Each row is counted as new (no previous record under its trimmed `orderId`),
 * stable (same fingerprint as the previous record) or changed. Independently,
 * rows whose `createdAt` falls on the current day are counted.
 *
 * "Today" is captured once per call so every row on the page is compared
 * against the same instant. A null/undefined row list is treated as empty.
 *
 * @param {Map<string, object>} previousOrderMap - Previous records keyed by order id.
 * @param {Array<object>|null|undefined} normalizedPageRows - Rows of the page.
 * @returns {{stableCount: number, changedCount: number, newCount: number, todayRowCount: number}}
 */
function summarizeHotPage(previousOrderMap, normalizedPageRows) {
  const today = new Date();
  let stableCount = 0;
  let changedCount = 0;
  let newCount = 0;
  let todayRowCount = 0;

  for (const record of normalizedPageRows || []) {
    if (isSameDate(record.createdAt, today)) {
      todayRowCount += 1;
    }

    const orderId = String(record.orderId || '').trim();
    const previous = previousOrderMap.get(orderId);
    if (!previous) {
      newCount += 1;
      continue;
    }

    if (buildOrderFingerprint(previous) === buildOrderFingerprint(record)) {
      stableCount += 1;
    } else {
      changedCount += 1;
    }
  }

  return { stableCount, changedCount, newCount, todayRowCount };
}
|
||||
|
||||
function buildSingleDateWindow(startDate, endDate) {
|
||||
return [{
|
||||
windowStart: startDate,
|
||||
@@ -552,9 +698,13 @@ export async function syncOrdersOnly(options = {}) {
|
||||
try {
|
||||
const summary = { startedAt: new Date().toISOString(), datasets: {} };
|
||||
page = await resolveActivePage(context, '/detail/order/~/costCenter/order');
|
||||
summary.datasets.orders = await syncOrders(page, options);
|
||||
const orderSyncResult = await syncOrders(page, options);
|
||||
summary.datasets.orders = orderSyncResult;
|
||||
const latestOrders = loadCurrentState('orders', datasets.orders.uniqueKey);
|
||||
const orderIdsForDetail = collectValidOrderIds(latestOrders.records || []);
|
||||
const orderDetailsState = loadCurrentState('orderDetails', datasets.orderDetails.uniqueKey);
|
||||
const orderIdsForDetail = options.hot
|
||||
? selectOrderDetailCandidates(latestOrders.records || [], orderSyncResult.changedOrderIds || [], orderDetailsState.records || [])
|
||||
: collectValidOrderIds(latestOrders.records || []);
|
||||
summary.datasets.orderDetails = await syncOrderDetails(page, orderIdsForDetail);
|
||||
summary.finishedAt = new Date().toISOString();
|
||||
|
||||
@@ -605,10 +755,13 @@ export async function syncMessagesOnly(options = {}) {
|
||||
}
|
||||
|
||||
export async function scheduleSync() {
|
||||
console.log(`定时任务已启动: ${config.cron} (${config.timezone})`);
|
||||
console.log(`定时任务已启动: normal=${config.cron}, hot=${config.hotCron} (${config.timezone})`);
|
||||
cron.schedule(
|
||||
config.cron,
|
||||
async () => {
|
||||
if (config.scheduleMode === 'hot') {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
console.log(`[${new Date().toISOString()}] 开始执行同步 mode=${config.scheduleMode}`);
|
||||
const summary = config.scheduleMode === 'full'
|
||||
@@ -621,6 +774,67 @@ export async function scheduleSync() {
|
||||
},
|
||||
{ timezone: config.timezone },
|
||||
);
|
||||
|
||||
cron.schedule(
|
||||
config.hotCron,
|
||||
async () => {
|
||||
if (config.scheduleMode !== 'hot') {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
console.log(`[${new Date().toISOString()}] 开始执行高频同步 mode=hot`);
|
||||
const summary = await syncHot();
|
||||
console.log(`[${new Date().toISOString()}] 高频同步完成`, JSON.stringify(summary, null, 2));
|
||||
} catch (error) {
|
||||
console.error(`[${new Date().toISOString()}] 高频同步失败`, error);
|
||||
}
|
||||
},
|
||||
{ timezone: config.timezone },
|
||||
);
|
||||
}
|
||||
|
||||
/**
 * Runs one high-frequency ("hot") sync pass: orders, then order details for
 * the selected candidates, then messages.
 *
 * The whole pass is wrapped in `runLockedJob('hot-sync', ...)`, so an
 * overlapping invocation returns that helper's skip result instead of running.
 *
 * Cleanup is exception-safe: `runtimeController.unbind()` runs once `bind()`
 * has been called, even if acquiring the context or a cleanup step throws,
 * and `closeDbPool()` still runs when closing the browser context fails.
 *
 * @param {object} [options] - Forwarded to `syncOrders`; only an explicit
 *   `resume: true` enables resume for the individual sync steps.
 * @returns {Promise<object>} The run summary (`startedAt`, `mode`, `datasets`,
 *   `finishedAt`), or the skip result produced by `runLockedJob`.
 * @throws Rethrows any sync error after passing it to `reportRuntimeError`.
 */
export async function syncHot(options = {}) {
  return runLockedJob('hot-sync', async () => {
    const resume = options.resume === true;
    const runtimeController = getRuntimeController();
    runtimeController.bind();

    try {
      const context = await getContext();
      let page = null;

      try {
        const summary = { startedAt: new Date().toISOString(), mode: 'hot', datasets: {} };

        page = await resolveActivePage(context, '/detail/order/~/costCenter/order');
        const orderSyncResult = await syncOrders(page, { ...options, hot: true, incremental: true, resume });
        summary.datasets.orders = orderSyncResult;

        // Re-read both states after the order sync, then pick which orders
        // need their detail fetched.
        const latestOrders = loadCurrentState('orders', datasets.orders.uniqueKey);
        const orderDetailsState = loadCurrentState('orderDetails', datasets.orderDetails.uniqueKey);
        const orderIdsForDetail = selectOrderDetailCandidates(
          latestOrders.records || [],
          orderSyncResult.changedOrderIds || [],
          orderDetailsState.records || [],
        );

        summary.datasets.orderDetails = await syncOrderDetails(page, orderIdsForDetail, { resume });

        page = await resolveActivePage(context, '/message');
        summary.datasets.messages = await syncMessages(page, { incremental: true, resume, hot: true });
        summary.finishedAt = new Date().toISOString();

        const stamp = nowStamp();
        saveRunSummary(stamp, summary);
        return summary;
      } catch (error) {
        await reportRuntimeError(error, page, { label: 'syncHot', dataset: 'hot', mode: 'hot' });
        throw error;
      } finally {
        try {
          if (config.closeBrowser) {
            await closeContextIfNeeded();
          } else {
            console.log('浏览器保持运行');
          }
        } finally {
          // Must run even when closing the browser context throws.
          await closeDbPool();
        }
      }
    } finally {
      // Always pair bind() with unbind(), whatever failed above.
      runtimeController.unbind();
    }
  });
}
|
||||
|
||||
export async function syncAllIncremental() {
|
||||
@@ -632,7 +846,8 @@ export async function syncAllIncremental() {
|
||||
try {
|
||||
const summary = { startedAt: new Date().toISOString(), mode: 'incremental', datasets: {} };
|
||||
page = await resolveActivePage(context, '/detail/order/~/costCenter/order');
|
||||
summary.datasets.orders = await syncOrders(page, { incremental: true, resume: true });
|
||||
const orderSyncResult = await syncOrders(page, { incremental: true, resume: true });
|
||||
summary.datasets.orders = orderSyncResult;
|
||||
const latestOrders = loadCurrentState('orders', datasets.orders.uniqueKey);
|
||||
const orderIdsForDetail = collectValidOrderIds(latestOrders.records || []);
|
||||
summary.datasets.orderDetails = await syncOrderDetails(page, orderIdsForDetail, { resume: true });
|
||||
@@ -790,10 +1005,12 @@ async function syncCustomerDetails(page, options = {}) {
|
||||
async function syncOrders(page, options = {}) {
|
||||
await runtimeCheckpoint('同步订单');
|
||||
const dataset = datasets.orders;
|
||||
const { incremental = false, resume = false } = options;
|
||||
const { incremental = false, resume = false, hot = false } = options;
|
||||
let windows;
|
||||
|
||||
if (!incremental) {
|
||||
if (hot) {
|
||||
windows = buildTodayOrderWindow();
|
||||
} else if (!incremental) {
|
||||
windows = buildMonthlyDateWindows(config.orderStartDate);
|
||||
} else {
|
||||
windows = await buildIncrementalOrderWindows();
|
||||
@@ -808,7 +1025,18 @@ async function syncOrders(page, options = {}) {
|
||||
}
|
||||
}
|
||||
|
||||
const previousState = loadCurrentState(dataset.name, dataset.uniqueKey);
|
||||
const previousRecords = previousState.records || [];
|
||||
const previousOrderMap = new Map(previousRecords.map((record) => [String(record.orderId || '').trim(), record]));
|
||||
const allNormalizedRecords = [];
|
||||
const hotStats = {
|
||||
pagesScanned: 0,
|
||||
stableRows: 0,
|
||||
newRows: 0,
|
||||
changedRows: 0,
|
||||
stoppedEarly: false,
|
||||
stopReason: '',
|
||||
};
|
||||
|
||||
for (const window of windows) {
|
||||
await runtimeCheckpoint(`订单窗口 ${window.start} ~ ${window.end}`);
|
||||
@@ -833,6 +1061,8 @@ async function syncOrders(page, options = {}) {
|
||||
}
|
||||
|
||||
let records = [];
|
||||
let stableRowsInARow = 0;
|
||||
let stablePagesInARow = 0;
|
||||
if (shouldContinueScrape) {
|
||||
records = await scrapePagedTable(page, dataset, window, {
|
||||
onPage: async ({ pageNum, pageRows }) => {
|
||||
@@ -842,7 +1072,47 @@ async function syncOrders(page, options = {}) {
|
||||
await upsertOrders(normalizedPageRows);
|
||||
}
|
||||
await saveOrdersCheckpoint(dataset, window, pageNum, windowNormalizedRecords);
|
||||
|
||||
if (hot) {
|
||||
hotStats.pagesScanned += 1;
|
||||
const pageSummary = summarizeHotPage(previousOrderMap, normalizedPageRows);
|
||||
hotStats.stableRows += pageSummary.stableCount;
|
||||
hotStats.newRows += pageSummary.newCount;
|
||||
hotStats.changedRows += pageSummary.changedCount;
|
||||
|
||||
if (pageSummary.changedCount === 0 && pageSummary.newCount === 0) {
|
||||
stablePagesInARow += 1;
|
||||
} else {
|
||||
stablePagesInARow = 0;
|
||||
}
|
||||
|
||||
if (pageSummary.stableCount === normalizedPageRows.length && normalizedPageRows.length > 0) {
|
||||
stableRowsInARow += normalizedPageRows.length;
|
||||
} else {
|
||||
stableRowsInARow = 0;
|
||||
}
|
||||
}
|
||||
},
|
||||
shouldStop: hot
|
||||
? async ({ pageNum }) => {
|
||||
if (pageNum >= config.hotOrderMaxPagesPerRun) {
|
||||
hotStats.stoppedEarly = true;
|
||||
hotStats.stopReason = `max_pages:${config.hotOrderMaxPagesPerRun}`;
|
||||
return true;
|
||||
}
|
||||
if (stableRowsInARow >= config.hotOrderStableThreshold) {
|
||||
hotStats.stoppedEarly = true;
|
||||
hotStats.stopReason = `stable_rows:${stableRowsInARow}`;
|
||||
return true;
|
||||
}
|
||||
if (stablePagesInARow >= config.hotOrderStablePageThreshold) {
|
||||
hotStats.stoppedEarly = true;
|
||||
hotStats.stopReason = `stable_pages:${stablePagesInARow}`;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
: undefined,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -856,7 +1126,14 @@ async function syncOrders(page, options = {}) {
|
||||
allNormalizedRecords.push(...windowNormalizedRecords);
|
||||
}
|
||||
|
||||
return persistNormalizedDataset(dataset, dedupeByHash(allNormalizedRecords));
|
||||
const normalizedRecords = dedupeByHash(allNormalizedRecords);
|
||||
const changedOrderIds = computeChangedOrderIds(previousRecords, normalizedRecords);
|
||||
const persisted = persistNormalizedDataset(dataset, normalizedRecords);
|
||||
return {
|
||||
...persisted,
|
||||
changedOrderIds,
|
||||
hot: hot ? hotStats : undefined,
|
||||
};
|
||||
}
|
||||
|
||||
async function buildIncrementalOrderWindows() {
|
||||
@@ -996,7 +1273,7 @@ async function syncBills(page, options = {}) {
|
||||
async function syncMessages(page, options = {}) {
|
||||
await runtimeCheckpoint('同步消息');
|
||||
const dataset = datasets.messages;
|
||||
const { incremental = false, resume = false } = options;
|
||||
const { incremental = false, resume = false, hot = false } = options;
|
||||
await page.goto(dataset.url, { waitUntil: 'domcontentloaded' });
|
||||
await waitUntilReady(page, dataset.heading);
|
||||
await trySetPageSize(page, dataset.pageSize);
|
||||
@@ -1016,6 +1293,14 @@ async function syncMessages(page, options = {}) {
|
||||
}
|
||||
|
||||
let records = [];
|
||||
let hotWatermark = null;
|
||||
if (hot && hasDbConfig()) {
|
||||
const latestMessageTime = await getLatestMessageTimeFromDb();
|
||||
const latest = parseDbDateTime(latestMessageTime);
|
||||
if (latest) {
|
||||
hotWatermark = addMinutes(latest, -config.hotMessageOverlapMinutes);
|
||||
}
|
||||
}
|
||||
if (shouldContinueScrape) {
|
||||
records = await scrapePagedTable(page, dataset, {}, {
|
||||
onPage: async ({ pageNum, pageRows }) => {
|
||||
@@ -1027,26 +1312,41 @@ async function syncMessages(page, options = {}) {
|
||||
await saveMessagesCheckpoint(dataset, pageNum, allNormalizedRecords);
|
||||
},
|
||||
skipInitialPage: resumeFromPage > 0,
|
||||
shouldStop: hot
|
||||
? async ({ pageNum, pageRows }) => {
|
||||
if (pageNum >= config.hotMessageMaxPagesPerRun) {
|
||||
return true;
|
||||
}
|
||||
if (!hotWatermark) {
|
||||
return false;
|
||||
}
|
||||
const normalizedPageRows = normalizeDatasetRecords(dataset, pageRows, {});
|
||||
return normalizedPageRows.length > 0
|
||||
&& normalizedPageRows.every((record) => !isAfterLatestMessageTime(record, hotWatermark));
|
||||
}
|
||||
: undefined,
|
||||
});
|
||||
}
|
||||
|
||||
if (resumeFromPage === 0) {
|
||||
allNormalizedRecords = normalizeDatasetRecords(dataset, records, {});
|
||||
}
|
||||
if (incremental && hasDbConfig()) {
|
||||
if ((incremental || hot) && hasDbConfig()) {
|
||||
try {
|
||||
const latestMessageTime = await getLatestMessageTimeFromDb();
|
||||
if (latestMessageTime) {
|
||||
const latest = parseDbDateTime(latestMessageTime);
|
||||
if (latest) {
|
||||
const watermark = subtractDays(latest, config.messageIncrementalOverlapDays);
|
||||
const watermark = hot
|
||||
? addMinutes(latest, -config.hotMessageOverlapMinutes)
|
||||
: subtractDays(latest, config.messageIncrementalOverlapDays);
|
||||
const before = allNormalizedRecords.length;
|
||||
allNormalizedRecords = allNormalizedRecords.filter((record) => isAfterLatestMessageTime(record, watermark));
|
||||
console.log(`[增量模式] 消息按时间过滤: ${before} -> ${allNormalizedRecords.length} (db_last=${latestMessageTime}, overlap=${config.messageIncrementalOverlapDays}d)`);
|
||||
console.log(`[${hot ? '高频模式' : '增量模式'}] 消息按时间过滤: ${before} -> ${allNormalizedRecords.length} (db_last=${latestMessageTime}, overlap=${hot ? `${config.hotMessageOverlapMinutes}m` : `${config.messageIncrementalOverlapDays}d`})`);
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('[增量模式] 查询数据库最新消息时间失败:', error.message);
|
||||
console.error(`[${hot ? '高频模式' : '增量模式'}] 查询数据库最新消息时间失败:`, error.message);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1412,10 +1712,11 @@ async function syncOrderDetails(page, cachedOrderIds, options = {}) {
|
||||
if (!isValidOrderId(detail.orderId)) {
|
||||
detail.orderId = target.orderId;
|
||||
}
|
||||
allDetails.push({ ...detail, __context: {} });
|
||||
const detailContext = { detailSyncedAt: new Date().toISOString() };
|
||||
allDetails.push({ ...detail, __context: detailContext });
|
||||
await saveOrderDetailsCheckpoint(dataset, index + 1, allDetails);
|
||||
if (hasDbConfig()) {
|
||||
const normalizedDetail = normalizeDatasetRecords(dataset, [{ ...detail, __context: {} }], {});
|
||||
const normalizedDetail = normalizeDatasetRecords(dataset, [{ ...detail, __context: detailContext }], detailContext);
|
||||
await upsertOrderDetails(normalizedDetail);
|
||||
}
|
||||
|
||||
@@ -1500,7 +1801,7 @@ async function waitUntilReady(page, heading, timeout = 120000, options = {}) {
|
||||
}
|
||||
|
||||
async function scrapePagedTable(page, dataset, context, options = {}) {
|
||||
const { onPage, skipInitialPage = false } = options;
|
||||
const { onPage, skipInitialPage = false, shouldStop } = options;
|
||||
const pages = [];
|
||||
const visited = new Set();
|
||||
let shouldSkipCurrentPage = skipInitialPage;
|
||||
@@ -1533,6 +1834,11 @@ async function scrapePagedTable(page, dataset, context, options = {}) {
|
||||
await onPage({ pageData, pageNum, pageRows });
|
||||
}
|
||||
|
||||
if (shouldStop && await shouldStop({ pageData, pageNum, pageRows, pages })) {
|
||||
console.log(`[抓取] 满足停止条件,在第${pageNum}页提前停止`);
|
||||
break;
|
||||
}
|
||||
|
||||
const moved = await gotoNextPage(page);
|
||||
if (!moved) {
|
||||
console.log(`[抓取] 翻页失败或已到最后一页,停止`);
|
||||
|
||||
Reference in New Issue
Block a user