import mysql from 'mysql2/promise';
import { config } from './config.js';

/*
 * MySQL persistence helpers for the APS crawler: lazy pool management,
 * on-demand schema patching, value normalisation and upsert routines for
 * customers, orders, bills and in-site messages.
 *
 * All date/time strings produced by this module use the LOCAL wall clock of
 * the Node.js process, so that values written and values read back agree.
 */

/** Lazily created mysql2 pool shared by every helper in this module. */
let pool = null;

/** Cached login-name -> account-id lookup for the current `config.sourceId`. */
let customerMapCache = null;

const MESSAGE_TABLE_DDL = `
  CREATE TABLE IF NOT EXISTS aliyun_aps_messages (
    id bigint NOT NULL AUTO_INCREMENT,
    source_id varchar(64) NOT NULL DEFAULT 'default' COMMENT '数据来源账号标识',
    msg_id varchar(128) NULL DEFAULT NULL COMMENT '消息原始ID',
    title text NULL COMMENT '消息标题',
    content text NULL COMMENT '消息内容',
    msg_type varchar(64) NULL DEFAULT NULL COMMENT '消息类型',
    from_app varchar(128) NULL DEFAULT NULL COMMENT '来源应用',
    biz_code varchar(128) NULL DEFAULT NULL COMMENT '业务编码',
    msg_channel varchar(64) NULL DEFAULT NULL COMMENT '消息通道',
    category_id varchar(64) NULL DEFAULT NULL COMMENT '分类ID',
    category_name varchar(128) NULL DEFAULT NULL COMMENT '分类名称',
    lv1_category_id varchar(64) NULL DEFAULT NULL COMMENT '一级分类ID',
    lv2_category_id varchar(64) NULL DEFAULT NULL COMMENT '二级分类ID',
    lv3_category_id varchar(64) NULL DEFAULT NULL COMMENT '三级分类ID',
    message_classification varchar(255) NULL DEFAULT NULL COMMENT '归类结果',
    customer_name varchar(255) NULL DEFAULT NULL COMMENT '客户名称',
    order_no varchar(128) NULL DEFAULT NULL COMMENT '订单号',
    status varchar(32) NULL DEFAULT NULL COMMENT '消息状态(已读/未读)',
    gmt_created varchar(64) NULL DEFAULT NULL COMMENT '消息创建时间',
    gmt_modified varchar(64) NULL DEFAULT NULL COMMENT '消息修改时间',
    extra_data json NULL COMMENT '其他字段(原始JSON)',
    crawl_time datetime NULL DEFAULT CURRENT_TIMESTAMP COMMENT '爬取时间',
    PRIMARY KEY (id),
    UNIQUE KEY uk_source_msg_id (source_id, msg_id),
    KEY idx_source_message_time (source_id, gmt_modified, gmt_created)
  ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='阿里云APS站内消息'
`;

/** True once the lifecycle columns of `aps_customer` have been verified in this process. */
let customerLifecycleEnsured = false;

/** Tables whose `source_id` column has already been verified in this process. */
const ensuredSourceTables = new Set();

/** Strings the crawler emits for "no date"; treated as NULL. */
const EMPTY_DATE_PLACEHOLDERS = new Set(['- -', '--', '-']);

/** Matches a bare `YYYY-MM-DD` date. */
const DATE_ONLY_PATTERN = /^\d{4}-\d{2}-\d{2}$/;

/**
 * Reports whether the minimum connection settings (host, user, database) are configured.
 *
 * @returns {boolean} `true` when a pool can be created.
 */
function hasDbConfig() {
  return Boolean(config.db.host && config.db.user && config.db.database);
}

/**
 * Returns the shared connection pool, creating it on first use.
 *
 * @returns {import('mysql2/promise').Pool} The module-wide pool.
 * @throws {Error} When the database connection settings are missing.
 */
function getPool() {
  if (!hasDbConfig()) {
    throw new Error('未配置数据库连接信息,请设置 ALIYUN_APS_DB_HOST / USER / PASSWORD / NAME');
  }
  if (pool) {
    return pool;
  }
  pool = mysql.createPool({
    host: config.db.host,
    port: config.db.port,
    user: config.db.user,
    password: config.db.password,
    database: config.db.database,
    charset: config.db.charset,
    connectionLimit: config.db.connectionLimit,
    connectTimeout: config.db.connectTimeout,
    waitForConnections: true,
  });
  return pool;
}

/**
 * Normalises a month given as `YYYY-MM` or `YYYYMM` to `YYYY-MM`.
 *
 * @param {unknown} value - Raw month value.
 * @returns {string|null} `YYYY-MM`, or `null` when the value is empty or in another format.
 */
function normalizeMonth(value) {
  const normalized = String(value || '').trim();
  if (!normalized) {
    return null;
  }
  if (/^\d{4}-\d{2}$/.test(normalized)) {
    return normalized;
  }
  if (/^\d{6}$/.test(normalized)) {
    return `${normalized.slice(0, 4)}-${normalized.slice(4, 6)}`;
  }
  return null;
}

/**
 * Converts a value to a trimmed string.
 *
 * @param {unknown} value - Any value.
 * @returns {string|null} The trimmed text, or `null` for null/undefined/blank input.
 */
function safeString(value) {
  if (value == null) {
    return null;
  }
  const normalized = String(value).trim();
  return normalized ? normalized : null;
}

/**
 * Parses a number, ignoring thousands separators (`,`).
 *
 * @param {unknown} value - Raw numeric value or text.
 * @returns {number|null} The parsed finite number, or `null` when it cannot be parsed.
 */
function safeNumber(value) {
  if (value == null || value === '') {
    return null;
  }
  const normalized = Number.parseFloat(String(value).replace(/,/g, '').trim());
  return Number.isFinite(normalized) ? normalized : null;
}

/**
 * Left-pads a date component to two digits.
 *
 * @param {number} part - Month, day, hour, minute or second.
 * @returns {string} Two-character string.
 */
function pad2(part) {
  return String(part).padStart(2, '0');
}

/**
 * Formats the local calendar date of a `Date` as `YYYY-MM-DD`.
 *
 * @param {Date} date - A valid date.
 * @returns {string} Local date string.
 */
function formatLocalDate(date) {
  return `${date.getFullYear()}-${pad2(date.getMonth() + 1)}-${pad2(date.getDate())}`;
}

/**
 * Formats the local wall-clock time of a `Date` as `YYYY-MM-DD HH:mm:ss`.
 *
 * @param {Date} date - A valid date.
 * @returns {string} Local date-time string.
 */
function formatLocalDateTime(date) {
  const time = `${pad2(date.getHours())}:${pad2(date.getMinutes())}:${pad2(date.getSeconds())}`;
  return `${formatLocalDate(date)} ${time}`;
}

/**
 * Parses crawler date text (`YYYY.MM.DD`, `YYYY-MM-DD HH:mm:ss`, ...) into a `Date`.
 *
 * @param {string} text - Non-empty, already trimmed text.
 * @returns {Date|null} The parsed date, or `null` when the text is not a valid date.
 */
function parseLooseDate(text) {
  const dashed = text.replace(/\./g, '-');
  // A bare `YYYY-MM-DD` is parsed as UTC midnight by `new Date()`; reading it
  // back with local getters would shift the value by the UTC offset. Adding an
  // explicit time makes the engine treat it as local midnight instead.
  const candidate = DATE_ONLY_PATTERN.test(dashed) ? `${dashed}T00:00:00` : dashed.replace(' ', 'T');
  const parsed = new Date(candidate);
  return Number.isNaN(parsed.getTime()) ? null : parsed;
}

/**
 * Normalises a date value to `YYYY-MM-DD`.
 *
 * @param {unknown} value - Raw date value.
 * @returns {string|null} The date, or `null` for empty, placeholder or unparseable input.
 */
function safeDate(value) {
  const normalized = safeString(value);
  if (!normalized || EMPTY_DATE_PLACEHOLDERS.has(normalized)) {
    return null;
  }
  // Already in the target shape: pass through without re-parsing.
  if (DATE_ONLY_PATTERN.test(normalized)) {
    return normalized;
  }
  const parsed = parseLooseDate(normalized);
  return parsed ? formatLocalDate(parsed) : null;
}

/**
 * Normalises a date-time value to `YYYY-MM-DD HH:mm:ss`.
 * Date-only input yields midnight (`00:00:00`) of that day.
 *
 * @param {unknown} value - Raw date-time value.
 * @returns {string|null} The date-time, or `null` for empty, placeholder or unparseable input.
 */
function safeDateTime(value) {
  const normalized = safeString(value);
  if (!normalized || EMPTY_DATE_PLACEHOLDERS.has(normalized)) {
    return null;
  }
  const parsed = parseLooseDate(normalized);
  return parsed ? formatLocalDateTime(parsed) : null;
}

/**
 * Maps the crawler's 是/否 text to a tinyint flag.
 *
 * @param {unknown} value - Raw flag text.
 * @returns {1|0|null} `1` for 是, `0` for 否, `null` otherwise.
 */
function yesNoToFlag(value) {
  const text = safeString(value);
  if (text === '是') {
    return 1;
  }
  if (text === '否') {
    return 0;
  }
  return null;
}

/**
 * Loads (and caches) the login-name -> account-id map for the current source.
 * Each login name is registered both verbatim and with all whitespace removed.
 *
 * @returns {Promise<Map<string, string>>} The lookup map.
 */
async function getCustomerMap() {
  if (customerMapCache) {
    return customerMapCache;
  }
  await ensureSourceColumn('aps_customer');
  const [rows] = await getPool().execute(
    'SELECT account_id, login_name FROM aps_customer WHERE source_id = ?',
    [config.sourceId],
  );
  const map = new Map();
  for (const row of rows) {
    const loginName = safeString(row.login_name);
    const accountId = safeString(row.account_id);
    if (!loginName || !accountId) {
      continue;
    }
    map.set(loginName, accountId);
    map.set(loginName.replace(/\s+/g, ''), accountId);
  }
  customerMapCache = map;
  return customerMapCache;
}

/**
 * Resolves a customer login name to an account id using the map from `getCustomerMap`.
 *
 * @param {Map<string, string>} customerMap - Login-name lookup.
 * @param {unknown} customerAccount - Login name as crawled.
 * @returns {string|null} The account id, or `null` when unknown.
 */
function resolveCustomerAccountId(customerMap, customerAccount) {
  const normalized = safeString(customerAccount);
  if (!normalized) {
    return null;
  }
  return customerMap.get(normalized) || customerMap.get(normalized.replace(/\s+/g, '')) || null;
}

/**
 * Runs a query that selects a single `latest_time` column and returns it as text.
 *
 * @param {string} sql - Query aliasing its result as `latest_time`.
 * @param {Array<unknown>} [params=[]] - Bound parameters.
 * @returns {Promise<string|null>} The latest value, or `null` when there is none.
 */
async function queryLatestValue(sql, params = []) {
  const [rows] = await getPool().execute(sql, params);
  const row = Array.isArray(rows) ? rows[0] : null;
  if (!row) {
    return null;
  }
  const latest = row.latest_time;
  if (!latest) {
    return null;
  }
  if (latest instanceof Date) {
    // Format with local getters, not toISOString(): the latter converts to UTC
    // and would shift the value relative to the local-time strings this module
    // writes. NOTE(review): the pool above sets no `timezone` option, so this
    // relies on mysql2's default Date handling; confirm if that is configured elsewhere.
    return formatLocalDateTime(latest);
  }
  return String(latest);
}

/**
 * @returns {Promise<string|null>} The greatest `order_time` stored for the current source.
 */
export async function getLatestOrderTimeFromDb() {
  await ensureSourceColumn('aps_order');
  return queryLatestValue(
    'SELECT MAX(order_time) AS latest_time FROM aps_order WHERE source_id = ?',
    [config.sourceId],
  );
}

/**
 * @returns {Promise<string|null>} The greatest `consumption_time` stored for the current source.
 */
export async function getLatestBillConsumptionTimeFromDb() {
  await ensureSourceColumn('aps_bill');
  return queryLatestValue(
    'SELECT MAX(consumption_time) AS latest_time FROM aps_bill WHERE source_id = ?',
    [config.sourceId],
  );
}

/**
 * @returns {Promise<string|null>} The greatest message time (modified time, falling back to
 *   created time) stored for the current source.
 */
export async function getLatestMessageTimeFromDb() {
  await ensureMessagesTable();
  return queryLatestValue(
    "SELECT MAX(COALESCE(NULLIF(gmt_modified, ''), NULLIF(gmt_created, ''))) AS latest_time FROM aliyun_aps_messages WHERE source_id = ?",
    [config.sourceId],
  );
}

/**
 * Closes the shared pool (if any) and drops the customer lookup cache.
 *
 * @returns {Promise<void>}
 */
export async function closeDbPool() {
  if (!pool) {
    return;
  }
  await pool.end();
  pool = null;
  customerMapCache = null;
}

/**
 * Creates the `aliyun_aps_messages` table when missing and makes sure it has a `source_id` column.
 *
 * @returns {Promise<void>}
 */
export async function ensureMessagesTable() {
  await getPool().query(MESSAGE_TABLE_DDL);
  await ensureSourceColumn('aliyun_aps_messages');
}

/**
 * Adds the lifecycle columns (`active`, `status`, `released_at`, `release_reason`) to
 * `aps_customer` when they are missing. The check runs at most once per process.
 *
 * @returns {Promise<void>}
 */
export async function ensureCustomerLifecycleColumns() {
  if (customerLifecycleEnsured) {
    return;
  }
  await ensureSourceColumn('aps_customer');
  await ensureColumnExists(
    'aps_customer',
    'active',
    "ALTER TABLE aps_customer ADD COLUMN active TINYINT(1) NOT NULL DEFAULT 1 COMMENT '是否有效 1=有效 0=释放'",
  );
  await ensureColumnExists(
    'aps_customer',
    'status',
    "ALTER TABLE aps_customer ADD COLUMN status VARCHAR(32) DEFAULT 'active' COMMENT '客户状态'",
  );
  await ensureColumnExists(
    'aps_customer',
    'released_at',
    "ALTER TABLE aps_customer ADD COLUMN released_at DATETIME NULL COMMENT '释放时间'",
  );
  await ensureColumnExists(
    'aps_customer',
    'release_reason',
    "ALTER TABLE aps_customer ADD COLUMN release_reason VARCHAR(255) NULL COMMENT '释放原因'",
  );
  customerLifecycleEnsured = true;
}

/**
 * Adds a `source_id` column to the table when missing; verified at most once per table per process.
 *
 * @param {string} tableName - Table to patch. Interpolated into the ALTER statement, so it must
 *   be a trusted identifier (every caller in this module passes a literal).
 * @returns {Promise<void>}
 */
async function ensureSourceColumn(tableName) {
  if (ensuredSourceTables.has(tableName)) {
    return;
  }
  await ensureColumnExists(
    tableName,
    'source_id',
    `ALTER TABLE ${tableName} ADD COLUMN source_id VARCHAR(64) NOT NULL DEFAULT 'default' COMMENT '数据来源账号标识'`,
  );
  ensuredSourceTables.add(tableName);
}

/**
 * Runs `alterSql` when the given column is absent from the configured database schema.
 *
 * @param {string} tableName - Table to inspect.
 * @param {string} columnName - Column that must exist.
 * @param {string} alterSql - Statement that adds the column.
 * @returns {Promise<void>}
 */
async function ensureColumnExists(tableName, columnName, alterSql) {
  const [rows] = await getPool().execute(
    `SELECT COUNT(*) AS cnt FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND COLUMN_NAME = ?`,
    [config.db.database, tableName, columnName],
  );
  const exists = Array.isArray(rows) && Number(rows[0]?.cnt || 0) > 0;
  if (exists) {
    return;
  }
  await getPool().query(alterSql);
}

/**
 * Inserts or updates customer list rows and marks each written customer as active.
 * Records without an account id or login name are skipped.
 *
 * @param {Array<object>} records - Crawled customer list rows.
 * @returns {Promise<{inserted: number}>} Number of rows actually written (skipped records excluded).
 */
export async function upsertCustomers(records) {
  if (!records?.length) {
    return { inserted: 0 };
  }
  await ensureCustomerLifecycleColumns();
  const sql = `
    INSERT INTO aps_customer (
      source_id, account_id, login_name, real_name, report_source, report_type,
      trade_mode, real_name_status, relation_date, follow_staff, payment_notice_status,
      invite_register_type, is_new_customer, performance_start_point_reached,
      customer_category, remark, no_consumption_months, planned_release_time,
      planned_release_reason, active, status, released_at, release_reason
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 1, 'active', NULL, NULL)
    ON DUPLICATE KEY UPDATE
      login_name=VALUES(login_name),
      real_name=VALUES(real_name),
      report_source=VALUES(report_source),
      report_type=VALUES(report_type),
      trade_mode=VALUES(trade_mode),
      real_name_status=VALUES(real_name_status),
      relation_date=VALUES(relation_date),
      follow_staff=VALUES(follow_staff),
      payment_notice_status=VALUES(payment_notice_status),
      invite_register_type=VALUES(invite_register_type),
      is_new_customer=VALUES(is_new_customer),
      performance_start_point_reached=VALUES(performance_start_point_reached),
      customer_category=VALUES(customer_category),
      remark=VALUES(remark),
      no_consumption_months=VALUES(no_consumption_months),
      planned_release_time=VALUES(planned_release_time),
      planned_release_reason=VALUES(planned_release_reason),
      active=1,
      status='active',
      released_at=NULL,
      release_reason=NULL
  `;
  let written = 0;
  for (const record of records) {
    const accountId = safeString(record.accountId);
    const loginName = safeString(record.loginName);
    if (!accountId || !loginName) {
      continue;
    }
    await getPool().execute(sql, [
      config.sourceId,
      accountId,
      loginName,
      safeString(record.realName),
      safeString(record.reportSource),
      safeString(record.reportType),
      safeString(record.tradeMode),
      safeString(record.authStatus),
      safeDateTime(record.relationTime),
      safeString(record.owner),
      safeString(record.paymentNoticeStatus),
      safeString(record.inviteType),
      yesNoToFlag(record.isNewCustomer),
      yesNoToFlag(record.isPerformanceQualified),
      safeString(record.customerCategory),
      safeString(record.remark),
      safeNumber(record.inactiveMonths),
      safeDate(record.releasePlanTime),
      safeString(record.releasePlanReason),
    ]);
    written += 1;
  }
  if (written > 0) {
    // Login names / account ids may have changed: force the next order or bill
    // upsert to reload the lookup instead of resolving against stale data.
    customerMapCache = null;
  }
  return { inserted: written };
}

/**
 * Updates the detail columns of existing customers, matched by source and account id.
 * `payment_notice_status` is only overwritten when the record carries a value.
 *
 * @param {Array<object>} records - Crawled customer detail rows.
 * @returns {Promise<{inserted: number}>} Number of UPDATE statements executed (records without
 *   an account id are skipped and not counted).
 */
export async function upsertCustomerDetails(records) {
  if (!records?.length) {
    return { inserted: 0 };
  }
  await ensureCustomerLifecycleColumns();
  const sql = `
    UPDATE aps_customer
    SET customer_name = ?,
        customer_type = ?,
        customer_source = ?,
        email = ?,
        phone = ?,
        department = ?,
        payment_notice_status = COALESCE(?, payment_notice_status),
        updated_at = CURRENT_TIMESTAMP
    WHERE source_id = ? AND account_id = ?
  `;
  let written = 0;
  for (const record of records) {
    const accountId = safeString(record.accountId);
    if (!accountId) {
      continue;
    }
    await getPool().execute(sql, [
      safeString(record.customerName),
      safeString(record.customerType),
      safeString(record.customerSource),
      safeString(record.email),
      safeString(record.phone),
      safeString(record.department),
      safeString(record.paymentNoticeStatus),
      config.sourceId,
      accountId,
    ]);
    written += 1;
  }
  return { inserted: written };
}

/**
 * Looks up a customer's account id by customer name, real name or login name.
 *
 * @param {unknown} customerName - Name as it appears in a message.
 * @returns {Promise<string|null>} The first matching account id, or `null`.
 */
async function findCustomerAccountIdByName(customerName) {
  const normalized = safeString(customerName);
  if (!normalized) {
    return null;
  }
  const [rows] = await getPool().execute(
    'SELECT account_id FROM aps_customer WHERE source_id = ? AND (customer_name = ? OR real_name = ? OR login_name = ?) LIMIT 1',
    [config.sourceId, normalized, normalized, normalized],
  );
  return Array.isArray(rows) && rows.length > 0 ? safeString(rows[0].account_id) : null;
}

/**
 * Flags a customer as released.
 *
 * @param {unknown} customerName - Name used to locate the customer.
 * @param {unknown} reason - Release reason text.
 * @param {string|null|undefined} releasedAt - Release time; defaults to the current local time.
 * @returns {Promise<boolean>} `false` when no matching customer exists.
 */
async function markCustomerReleased(customerName, reason, releasedAt) {
  const accountId = await findCustomerAccountIdByName(customerName);
  if (!accountId) {
    return false;
  }
  await ensureCustomerLifecycleColumns();
  await getPool().execute(
    'UPDATE aps_customer SET active = 0, status = ?, released_at = ?, release_reason = ? WHERE source_id = ? AND account_id = ?',
    [
      'released',
      // Local time, matching every other date-time string this module writes.
      releasedAt || formatLocalDateTime(new Date()),
      safeString(reason),
      config.sourceId,
      accountId,
    ],
  );
  return true;
}

/**
 * Flags a customer as active again and clears its release information.
 *
 * @param {unknown} customerName - Name used to locate the customer.
 * @returns {Promise<boolean>} `false` when no matching customer exists.
 */
async function markCustomerActive(customerName) {
  const accountId = await findCustomerAccountIdByName(customerName);
  if (!accountId) {
    return false;
  }
  await ensureCustomerLifecycleColumns();
  await getPool().execute(
    'UPDATE aps_customer SET active = 1, status = ?, released_at = NULL, release_reason = NULL WHERE source_id = ? AND account_id = ?',
    ['active', config.sourceId, accountId],
  );
  return true;
}

/**
 * Derives a customer lifecycle change from a message: text containing 释放 releases the
 * customer; text containing 关联 / 报备成功 / 新增客户 / 绑定客户 re-activates it.
 * Messages without a customer name are ignored.
 *
 * @param {object} record - Crawled message.
 * @returns {Promise<void>}
 */
async function applyCustomerLifecycleFromMessage(record) {
  const title = safeString(record.title) || '';
  const content = safeString(record.content) || '';
  const customerName = safeString(record.customerName) || '';
  if (!customerName) {
    return;
  }
  const text = `${title}\n${content}`;
  // Release wins when a message mentions both kinds of keyword.
  if (/释放/.test(text)) {
    await markCustomerReleased(
      customerName,
      title || content,
      safeString(record.gmtModified) || safeString(record.gmtCreated),
    );
    return;
  }
  if (/(关联|报备成功|新增客户|绑定客户)/.test(text)) {
    await markCustomerActive(customerName);
  }
}

/**
 * Inserts or updates order list rows. Records without an order id are skipped.
 *
 * @param {Array<object>} records - Crawled order list rows.
 * @returns {Promise<{inserted: number}>} Number of rows actually written (skipped records excluded).
 */
export async function upsertOrders(records) {
  if (!records?.length) {
    return { inserted: 0 };
  }
  await ensureSourceColumn('aps_order');
  const customerMap = await getCustomerMap();
  const sql = `
    INSERT INTO aps_order (
      source_id, order_id, customer_account_id, customer_login_name, customer_category,
      order_type, original_price_cny, paid_amount_cny, status, order_time, order_month
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    ON DUPLICATE KEY UPDATE
      customer_account_id=VALUES(customer_account_id),
      customer_login_name=VALUES(customer_login_name),
      customer_category=VALUES(customer_category),
      order_type=VALUES(order_type),
      original_price_cny=VALUES(original_price_cny),
      paid_amount_cny=VALUES(paid_amount_cny),
      status=VALUES(status),
      order_time=VALUES(order_time),
      order_month=VALUES(order_month)
  `;
  let written = 0;
  for (const record of records) {
    const orderId = safeString(record.orderId);
    if (!orderId) {
      continue;
    }
    const customerLoginName = safeString(record.customerAccount) || '';
    const accountId = resolveCustomerAccountId(customerMap, customerLoginName);
    const createdAt = safeString(record.createdAt);
    await getPool().execute(sql, [
      config.sourceId,
      orderId,
      accountId,
      customerLoginName,
      safeString(record.customerCategory),
      safeString(record.orderType),
      safeNumber(record.orderOriginalPriceCny),
      safeNumber(record.actualPaidCny),
      safeString(record.orderStatus),
      createdAt,
      // First seven characters of the creation time, i.e. `YYYY-MM` for ISO-like text.
      createdAt?.slice(0, 7) || null,
    ]);
    written += 1;
  }
  return { inserted: written };
}

/**
 * Inserts or updates order detail rows. Records without an order id are skipped.
 *
 * @param {Array<object>} records - Crawled order detail rows.
 * @returns {Promise<{inserted: number}>} Number of rows actually written (skipped records excluded).
 */
export async function upsertOrderDetails(records) {
  if (!records?.length) {
    return { inserted: 0 };
  }
  await ensureSourceColumn('aps_order_detail');
  const sql = `
    INSERT INTO aps_order_detail (
      source_id, order_id, order_type, status, trade_type, customer_category,
      dealer_name, dealer_uid, customer_type, opportunity_id, payment_time, order_time,
      product_name, product_code, original_price_cny, paid_amount_cny, discount,
      payable_amount_cny, coupon_amount_cny
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    ON DUPLICATE KEY UPDATE
      order_type=VALUES(order_type),
      status=VALUES(status),
      trade_type=VALUES(trade_type),
      customer_category=VALUES(customer_category),
      dealer_name=VALUES(dealer_name),
      dealer_uid=VALUES(dealer_uid),
      customer_type=VALUES(customer_type),
      opportunity_id=VALUES(opportunity_id),
      payment_time=VALUES(payment_time),
      order_time=VALUES(order_time),
      product_name=VALUES(product_name),
      product_code=VALUES(product_code),
      original_price_cny=VALUES(original_price_cny),
      paid_amount_cny=VALUES(paid_amount_cny),
      discount=VALUES(discount),
      payable_amount_cny=VALUES(payable_amount_cny),
      coupon_amount_cny=VALUES(coupon_amount_cny)
  `;
  let written = 0;
  for (const record of records) {
    const orderId = safeString(record.orderId);
    if (!orderId) {
      continue;
    }
    await getPool().execute(sql, [
      config.sourceId,
      orderId,
      safeString(record.orderType),
      safeString(record.status),
      safeString(record.tradeType),
      safeString(record.customerCategory),
      safeString(record.dealerName),
      safeString(record.dealerUid),
      safeString(record.customerType),
      safeString(record.opportunityId),
      safeDateTime(record.paymentTime),
      safeDateTime(record.orderTime),
      safeString(record.productName),
      safeString(record.productCode),
      safeNumber(record.originalPriceCny),
      safeNumber(record.paidAmountCny),
      safeNumber(record.discount),
      safeNumber(record.payableAmountCny),
      safeNumber(record.couponAmountCny),
    ]);
    written += 1;
  }
  return { inserted: written };
}

/**
 * Inserts or updates bill rows. A bill is identified by source, billing month, commission
 * month, customer login name, consumption time, product name and original price; a matching
 * row is updated in place, otherwise a new row is inserted. Records whose billing or
 * commission month cannot be normalised are skipped.
 *
 * @param {Array<object>} records - Crawled bill rows.
 * @returns {Promise<{inserted: number}>} Number of rows inserted or updated (skipped records excluded).
 */
export async function upsertBills(records) {
  if (!records?.length) {
    return { inserted: 0 };
  }
  await ensureSourceColumn('aps_bill');
  const customerMap = await getCustomerMap();
  // `<=>` (NULL-safe equality) on every nullable key part: with plain `=` a bill
  // lacking a consumption time or product name would never match its stored row
  // and would be inserted again on every run.
  const selectSql = `
    SELECT id
    FROM aps_bill
    WHERE source_id = ?
      AND billing_month = ?
      AND commission_month = ?
      AND customer_login_name = ?
      AND consumption_time <=> ?
      AND product_name <=> ?
      AND original_price_cny <=> ?
    LIMIT 1
  `;
  const insertSql = `
    INSERT INTO aps_bill (
      source_id, billing_month, customer_account_id, customer_login_name, bill_type,
      consumption_time, customer_category, product_name, product_category,
      original_price_cny, customer_payable_amount_cny, commission_month,
      included_in_performance, rebated, invite_register_type,
      service_start_time, service_end_time
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
  `;
  const updateSql = `
    UPDATE aps_bill
    SET customer_account_id = ?,
        bill_type = ?,
        customer_category = ?,
        product_category = ?,
        customer_payable_amount_cny = ?,
        included_in_performance = ?,
        rebated = ?,
        invite_register_type = ?,
        service_start_time = ?,
        service_end_time = ?
    WHERE id = ?
  `;
  let written = 0;
  for (const record of records) {
    const billingMonth = normalizeMonth(record.billingMonth);
    const commissionMonth = normalizeMonth(record.commissionMonth);
    if (!billingMonth || !commissionMonth) {
      continue;
    }
    const customerLoginName = safeString(record.customerAccount) || '';
    const accountId = resolveCustomerAccountId(customerMap, customerLoginName);
    const productName = safeString(record.productName);
    const consumptionTime = safeString(record.consumeDate);
    const originalPrice = safeNumber(record.originalPriceCny);
    const [rows] = await getPool().execute(selectSql, [
      config.sourceId,
      billingMonth,
      commissionMonth,
      customerLoginName,
      consumptionTime,
      productName,
      originalPrice,
    ]);
    if (Array.isArray(rows) && rows.length > 0) {
      await getPool().execute(updateSql, [
        accountId,
        safeString(record.billType),
        safeString(record.customerCategory),
        safeString(record.productCategory),
        safeNumber(record.customerPayableCny),
        safeString(record.countsForPerformance),
        safeString(record.commissionable),
        safeString(record.inviteType),
        safeString(record.serviceStartAt),
        safeString(record.serviceEndAt),
        rows[0].id,
      ]);
      written += 1;
      continue;
    }
    await getPool().execute(insertSql, [
      config.sourceId,
      billingMonth,
      accountId,
      customerLoginName,
      safeString(record.billType),
      consumptionTime,
      safeString(record.customerCategory),
      productName,
      safeString(record.productCategory),
      originalPrice,
      safeNumber(record.customerPayableCny),
      commissionMonth,
      safeString(record.countsForPerformance),
      safeString(record.commissionable),
      safeString(record.inviteType),
      safeString(record.serviceStartAt),
      safeString(record.serviceEndAt),
    ]);
    written += 1;
  }
  return { inserted: written };
}

/**
 * Inserts or updates in-site messages and applies any customer lifecycle change each message
 * implies. The message key is `msgId`, falling back to `__hash`; records with neither are skipped.
 *
 * @param {Array<object>} records - Crawled messages.
 * @returns {Promise<{inserted: number}>} Number of rows actually written (skipped records excluded).
 */
export async function upsertMessages(records) {
  if (!records?.length) {
    return { inserted: 0 };
  }
  await ensureMessagesTable();
  const sql = `
    INSERT INTO aliyun_aps_messages (
      source_id, msg_id, title, content, msg_type, from_app, biz_code, msg_channel,
      category_id, category_name, lv1_category_id, lv2_category_id, lv3_category_id,
      message_classification, customer_name, order_no, status, gmt_created,
      gmt_modified, extra_data
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    ON DUPLICATE KEY UPDATE
      title=VALUES(title),
      content=VALUES(content),
      msg_type=VALUES(msg_type),
      from_app=VALUES(from_app),
      biz_code=VALUES(biz_code),
      msg_channel=VALUES(msg_channel),
      category_id=VALUES(category_id),
      category_name=VALUES(category_name),
      lv1_category_id=VALUES(lv1_category_id),
      lv2_category_id=VALUES(lv2_category_id),
      lv3_category_id=VALUES(lv3_category_id),
      message_classification=VALUES(message_classification),
      customer_name=VALUES(customer_name),
      order_no=VALUES(order_no),
      status=VALUES(status),
      gmt_created=VALUES(gmt_created),
      gmt_modified=VALUES(gmt_modified),
      extra_data=VALUES(extra_data),
      crawl_time=CURRENT_TIMESTAMP
  `;
  let written = 0;
  for (const record of records) {
    const msgId = safeString(record.msgId) || safeString(record.__hash);
    if (!msgId) {
      continue;
    }
    await getPool().execute(sql, [
      config.sourceId,
      msgId,
      safeString(record.title),
      safeString(record.content),
      safeString(record.msgType),
      safeString(record.fromApp),
      safeString(record.bizCode),
      safeString(record.msgChannel),
      safeString(record.categoryId),
      safeString(record.categoryName),
      safeString(record.lv1CategoryId),
      safeString(record.lv2CategoryId),
      safeString(record.lv3CategoryId),
      safeString(record.messageClassification),
      safeString(record.customerName),
      safeString(record.orderNo),
      safeString(record.status),
      safeString(record.gmtCreated),
      safeString(record.gmtModified),
      // Keep the raw payload; fall back to the whole record when no explicit extra data exists.
      JSON.stringify(record.extraData || record),
    ]);
    written += 1;
    await applyCustomerLifecycleFromMessage(record);
  }
  return { inserted: written };
}

export { hasDbConfig };