客户5分钟更新一次
This commit is contained in:
@@ -211,6 +211,19 @@ export async function getLatestOrderTimeFromDb() {
|
|||||||
return queryLatestValue('SELECT MAX(order_time) AS latest_time FROM aps_order WHERE source_id = ?', [config.sourceId]);
|
return queryLatestValue('SELECT MAX(order_time) AS latest_time FROM aps_order WHERE source_id = ?', [config.sourceId]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export async function customerExists(accountId) {
|
||||||
|
const normalized = safeString(accountId);
|
||||||
|
if (!normalized) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
await ensureCustomerLifecycleColumns();
|
||||||
|
const [rows] = await getPool().execute(
|
||||||
|
'SELECT 1 AS matched FROM aps_customer WHERE source_id = ? AND account_id = ? LIMIT 1',
|
||||||
|
[config.sourceId, normalized],
|
||||||
|
);
|
||||||
|
return Array.isArray(rows) && rows.length > 0;
|
||||||
|
}
|
||||||
|
|
||||||
export async function getLatestBillConsumptionTimeFromDb() {
|
export async function getLatestBillConsumptionTimeFromDb() {
|
||||||
await ensureSourceColumn('aps_bill');
|
await ensureSourceColumn('aps_bill');
|
||||||
return queryLatestValue('SELECT MAX(consumption_time) AS latest_time FROM aps_bill WHERE source_id = ?', [config.sourceId]);
|
return queryLatestValue('SELECT MAX(consumption_time) AS latest_time FROM aps_bill WHERE source_id = ?', [config.sourceId]);
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import { config, datasets } from './config.js';
|
|||||||
import { sendLoginAlert, sendRuntimeErrorAlert } from './notify.js';
|
import { sendLoginAlert, sendRuntimeErrorAlert } from './notify.js';
|
||||||
import {
|
import {
|
||||||
closeDbPool,
|
closeDbPool,
|
||||||
|
customerExists,
|
||||||
getExistingMessageIds,
|
getExistingMessageIds,
|
||||||
getExistingMessageFingerprints,
|
getExistingMessageFingerprints,
|
||||||
getLatestBillConsumptionTimeFromDb,
|
getLatestBillConsumptionTimeFromDb,
|
||||||
@@ -861,6 +862,8 @@ export async function syncHot(options = {}) {
|
|||||||
summary.datasets.orderDetails = await syncOrderDetails(page, orderIdsForDetail, { resume: options.resume === true });
|
summary.datasets.orderDetails = await syncOrderDetails(page, orderIdsForDetail, { resume: options.resume === true });
|
||||||
page = await resolveActivePage(context, '/message');
|
page = await resolveActivePage(context, '/message');
|
||||||
summary.datasets.messages = await syncMessages(page, { incremental: true, resume: options.resume === true, hot: true });
|
summary.datasets.messages = await syncMessages(page, { incremental: true, resume: options.resume === true, hot: true });
|
||||||
|
page = await resolveActivePage(context, '/detail/my_customer/~/customer/list');
|
||||||
|
summary.datasets.customerHot = await syncOneCustomerHot(page, { resume: options.resume === true });
|
||||||
summary.finishedAt = new Date().toISOString();
|
summary.finishedAt = new Date().toISOString();
|
||||||
|
|
||||||
const stamp = nowStamp();
|
const stamp = nowStamp();
|
||||||
@@ -883,6 +886,48 @@ export async function syncHot(options = {}) {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function syncOneCustomerHot(page, options = {}) {
|
||||||
|
await runtimeCheckpoint('高频同步客户');
|
||||||
|
const dataset = datasets.customers;
|
||||||
|
await page.goto(dataset.url, { waitUntil: 'domcontentloaded' });
|
||||||
|
await waitUntilReady(page, dataset.heading);
|
||||||
|
await trySetPageSize(page, dataset.pageSize);
|
||||||
|
|
||||||
|
const pageData = await extractTable(page);
|
||||||
|
const normalizedRows = normalizeDatasetRecords(dataset, pageData.rows || [], { pageNum: 1 });
|
||||||
|
const target = normalizedRows.find((record) => String(record.accountId || '').trim());
|
||||||
|
if (!target) {
|
||||||
|
return { skipped: true, reason: 'no_customer_found' };
|
||||||
|
}
|
||||||
|
|
||||||
|
if (await customerExists(target.accountId)) {
|
||||||
|
console.log(`[客户高频] accountId=${target.accountId} 已存在,停止本轮客户抓取`);
|
||||||
|
return { skipped: true, reason: 'customer_exists', accountId: target.accountId };
|
||||||
|
}
|
||||||
|
|
||||||
|
await upsertCustomers([target]);
|
||||||
|
|
||||||
|
const clicked = await clickCustomerDetailFromListWithRetry(page, target);
|
||||||
|
if (!clicked) {
|
||||||
|
return { skipped: false, inserted: true, accountId: target.accountId, detail: 'click_failed' };
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
await page.waitForFunction(
|
||||||
|
(text) => document.body && document.body.innerText.includes(text),
|
||||||
|
'详情',
|
||||||
|
{ timeout: 15000 },
|
||||||
|
);
|
||||||
|
await sleep(1000);
|
||||||
|
const detail = await extractCustomerDetail(page);
|
||||||
|
const normalizedDetail = normalizeDatasetRecords(datasets.customerDetails, [{ ...detail, accountId: target.accountId, loginName: target.loginName }], { accountId: target.accountId });
|
||||||
|
await upsertCustomerDetails(normalizedDetail);
|
||||||
|
return { skipped: false, inserted: true, accountId: target.accountId, detail: 'ok' };
|
||||||
|
} catch (error) {
|
||||||
|
return { skipped: false, inserted: true, accountId: target.accountId, detail: `extract_failed:${error.message}` };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
export async function syncAllIncremental() {
|
export async function syncAllIncremental() {
|
||||||
const runtimeController = getRuntimeController();
|
const runtimeController = getRuntimeController();
|
||||||
runtimeController.bind();
|
runtimeController.bind();
|
||||||
|
|||||||
Reference in New Issue
Block a user