diff --git a/aliyun-sync/aliyun-aps-sync/src/db.js b/aliyun-sync/aliyun-aps-sync/src/db.js index 0a23ad5..4e3b08d 100644 --- a/aliyun-sync/aliyun-aps-sync/src/db.js +++ b/aliyun-sync/aliyun-aps-sync/src/db.js @@ -211,6 +211,19 @@ export async function getLatestOrderTimeFromDb() { return queryLatestValue('SELECT MAX(order_time) AS latest_time FROM aps_order WHERE source_id = ?', [config.sourceId]); } +export async function customerExists(accountId) { + const normalized = safeString(accountId); + if (!normalized) { + return false; + } + await ensureCustomerLifecycleColumns(); + const [rows] = await getPool().execute( + 'SELECT 1 AS matched FROM aps_customer WHERE source_id = ? AND account_id = ? LIMIT 1', + [config.sourceId, normalized], + ); + return Array.isArray(rows) && rows.length > 0; +} + export async function getLatestBillConsumptionTimeFromDb() { await ensureSourceColumn('aps_bill'); return queryLatestValue('SELECT MAX(consumption_time) AS latest_time FROM aps_bill WHERE source_id = ?', [config.sourceId]); diff --git a/aliyun-sync/aliyun-aps-sync/src/sync.js b/aliyun-sync/aliyun-aps-sync/src/sync.js index e8d29d8..e557c69 100644 --- a/aliyun-sync/aliyun-aps-sync/src/sync.js +++ b/aliyun-sync/aliyun-aps-sync/src/sync.js @@ -7,6 +7,7 @@ import { config, datasets } from './config.js'; import { sendLoginAlert, sendRuntimeErrorAlert } from './notify.js'; import { closeDbPool, + customerExists, getExistingMessageIds, getExistingMessageFingerprints, getLatestBillConsumptionTimeFromDb, @@ -861,6 +862,8 @@ export async function syncHot(options = {}) { summary.datasets.orderDetails = await syncOrderDetails(page, orderIdsForDetail, { resume: options.resume === true }); page = await resolveActivePage(context, '/message'); summary.datasets.messages = await syncMessages(page, { incremental: true, resume: options.resume === true, hot: true }); + page = await resolveActivePage(context, '/detail/my_customer/~/customer/list'); + summary.datasets.customerHot = await syncOneCustomerHot(page, { resume: options.resume === true }); summary.finishedAt = new Date().toISOString(); const stamp = nowStamp(); @@ -883,6 +886,48 @@ export async function syncHot(options = {}) { }); } +async function syncOneCustomerHot(page, options = {}) { + await runtimeCheckpoint('高频同步客户'); + const dataset = datasets.customers; + await page.goto(dataset.url, { waitUntil: 'domcontentloaded' }); + await waitUntilReady(page, dataset.heading); + await trySetPageSize(page, dataset.pageSize); + + const pageData = await extractTable(page); + const normalizedRows = normalizeDatasetRecords(dataset, pageData.rows || [], { pageNum: 1 }); + const target = normalizedRows.find((record) => String(record.accountId || '').trim()); + if (!target) { + return { skipped: true, reason: 'no_customer_found' }; + } + + if (await customerExists(target.accountId)) { + console.log(`[客户高频] accountId=${target.accountId} 已存在,停止本轮客户抓取`); + return { skipped: true, reason: 'customer_exists', accountId: target.accountId }; + } + + await upsertCustomers([target]); + + const clicked = await clickCustomerDetailFromListWithRetry(page, target); + if (!clicked) { + return { skipped: false, inserted: true, accountId: target.accountId, detail: 'click_failed' }; + } + + try { + await page.waitForFunction( + (text) => document.body && document.body.innerText.includes(text), + '详情', + { timeout: 15000 }, + ); + await sleep(1000); + const detail = await extractCustomerDetail(page); + const normalizedDetail = normalizeDatasetRecords(datasets.customerDetails, [{ ...detail, accountId: target.accountId, loginName: target.loginName }], { accountId: target.accountId }); + await upsertCustomerDetails(normalizedDetail); + return { skipped: false, inserted: true, accountId: target.accountId, detail: 'ok' }; + } catch (error) { + return { skipped: false, inserted: true, accountId: target.accountId, detail: `extract_failed:${error.message}` }; + } +} + export async function syncAllIncremental() { const runtimeController = getRuntimeController(); runtimeController.bind();