From 4af439a6c9d5185e81c1c14995f00311efd676a0 Mon Sep 17 00:00:00 2001 From: ray Date: Mon, 1 Jun 2026 11:25:41 +0800 Subject: [PATCH] =?UTF-8?q?=E6=BA=90=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- aliyun-sync/aliyun-aps-sync/.env.example | 1 + aliyun-sync/aliyun-aps-sync/README.md | 34 ++++++++++ aliyun-sync/aliyun-aps-sync/src/config.js | 1 + aliyun-sync/aliyun-aps-sync/src/db.js | 78 +++++++++++++++-------- 4 files changed, 89 insertions(+), 25 deletions(-) diff --git a/aliyun-sync/aliyun-aps-sync/.env.example b/aliyun-sync/aliyun-aps-sync/.env.example index 370991f..739f60f 100644 --- a/aliyun-sync/aliyun-aps-sync/.env.example +++ b/aliyun-sync/aliyun-aps-sync/.env.example @@ -1,4 +1,5 @@ ALIYUN_APS_BASE_URL=https://aps.aliyun.com +ALIYUN_APS_SOURCE_ID=default ALIYUN_APS_HEADLESS=false ALIYUN_APS_BROWSER_MODE=launch ALIYUN_APS_BROWSER_CHANNEL= diff --git a/aliyun-sync/aliyun-aps-sync/README.md b/aliyun-sync/aliyun-aps-sync/README.md index 2ed5f4e..ed00595 100644 --- a/aliyun-sync/aliyun-aps-sync/README.md +++ b/aliyun-sync/aliyun-aps-sync/README.md @@ -259,6 +259,7 @@ ALIYUN_APS_HOT_FINAL_STATUSES=已完成,已关闭,已取消,已退款完成 `.env` 需要配置: ```env +ALIYUN_APS_SOURCE_ID=default ALIYUN_APS_DB_HOST= ALIYUN_APS_DB_PORT=3306 ALIYUN_APS_DB_USER= @@ -268,6 +269,39 @@ ALIYUN_APS_DB_CHARSET=utf8mb4 ALIYUN_APS_DB_CONNECTION_LIMIT=5 ``` +### 多账号 source_id + +如果两个 APS 账号写入同一个数据库,每个账号必须配置不同的 `ALIYUN_APS_SOURCE_ID`: + +```env +# 账号 A +ALIYUN_APS_SOURCE_ID=aliyun_account_a + +# 账号 B +ALIYUN_APS_SOURCE_ID=aliyun_account_b +``` + +同步写库时会把 `source_id` 写入: + +- `aps_customer` +- `aps_order` +- `aps_order_detail` +- `aps_bill` +- `aliyun_aps_messages` + +增量水位也会按 `source_id` 查询,避免两个账号互相影响。 + +建议两个账号使用不同项目目录或不同 `data/.browser` 目录,避免本地登录态和 checkpoint 互相覆盖。 + +生产库建议把唯一键调整为 `source_id + 业务唯一键`,例如: + +```sql +-- 示例,实际约束名以生产库为准 +-- aps_order: UNIQUE(source_id, order_id) +-- aps_order_detail: UNIQUE(source_id, order_id) +-- aliyun_aps_messages: UNIQUE(source_id, msg_id) +``` + ## 浏览器配置 默认不再强制使用 Google Chrome。 diff --git a/aliyun-sync/aliyun-aps-sync/src/config.js b/aliyun-sync/aliyun-aps-sync/src/config.js index fcdfdf9..6371207 100644 --- a/aliyun-sync/aliyun-aps-sync/src/config.js +++ b/aliyun-sync/aliyun-aps-sync/src/config.js @@ -36,6 +36,7 @@ const ensureDir = (dirPath) => { export const config = { rootDir, + sourceId: process.env.ALIYUN_APS_SOURCE_ID || 'default', baseUrl: process.env.ALIYUN_APS_BASE_URL || 'https://aps.aliyun.com', headless: toBool(process.env.ALIYUN_APS_HEADLESS, false), browserMode: (process.env.ALIYUN_APS_BROWSER_MODE || 'launch').trim().toLowerCase(), diff --git a/aliyun-sync/aliyun-aps-sync/src/db.js b/aliyun-sync/aliyun-aps-sync/src/db.js index ab74fd0..6f1bf6e 100644 --- a/aliyun-sync/aliyun-aps-sync/src/db.js +++ b/aliyun-sync/aliyun-aps-sync/src/db.js @@ -7,6 +7,7 @@ let customerMapCache = null; const MESSAGE_TABLE_DDL = ` CREATE TABLE IF NOT EXISTS aliyun_aps_messages ( id bigint NOT NULL AUTO_INCREMENT, + source_id varchar(64) NOT NULL DEFAULT 'default' COMMENT '数据来源账号标识', msg_id varchar(128) NULL DEFAULT NULL COMMENT '消息原始ID', title text NULL COMMENT '消息标题', content text NULL COMMENT '消息内容', @@ -28,11 +29,13 @@ CREATE TABLE IF NOT EXISTS aliyun_aps_messages ( extra_data json NULL COMMENT '其他字段(原始JSON)', crawl_time datetime NULL DEFAULT CURRENT_TIMESTAMP COMMENT '爬取时间', PRIMARY KEY (id), - UNIQUE KEY uk_msg_id (msg_id) + UNIQUE KEY uk_source_msg_id (source_id, msg_id), + KEY idx_source_message_time (source_id, gmt_modified, gmt_created) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='阿里云APS站内消息' `; let customerLifecycleEnsured = false; +const ensuredSourceTables = new Set(); function hasDbConfig() { return Boolean(config.db.host && config.db.user && config.db.database); @@ -135,7 +138,8 @@ async function getCustomerMap() { if (customerMapCache) { return customerMapCache; } - const [rows] = await getPool().query('SELECT account_id, login_name FROM aps_customer'); + await ensureSourceColumn('aps_customer'); + const [rows] = await getPool().execute('SELECT account_id, login_name FROM aps_customer WHERE source_id = ?', [config.sourceId]); const map = new Map(); for (const row of rows) { const loginName = safeString(row.login_name); @@ -158,8 +162,8 @@ function resolveCustomerAccountId(customerMap, customerAccount) { return customerMap.get(normalized) || customerMap.get(normalized.replace(/\s+/g, '')) || null; } -async function queryLatestValue(sql) { - const [rows] = await getPool().query(sql); +async function queryLatestValue(sql, params = []) { + const [rows] = await getPool().execute(sql, params); const row = Array.isArray(rows) ? rows[0] : null; if (!row) { return null; @@ -175,15 +179,18 @@ async function queryLatestValue(sql) { } export async function getLatestOrderTimeFromDb() { - return queryLatestValue('SELECT MAX(order_time) AS latest_time FROM aps_order'); + await ensureSourceColumn('aps_order'); + return queryLatestValue('SELECT MAX(order_time) AS latest_time FROM aps_order WHERE source_id = ?', [config.sourceId]); } export async function getLatestBillConsumptionTimeFromDb() { - return queryLatestValue('SELECT MAX(consumption_time) AS latest_time FROM aps_bill'); + await ensureSourceColumn('aps_bill'); + return queryLatestValue('SELECT MAX(consumption_time) AS latest_time FROM aps_bill WHERE source_id = ?', [config.sourceId]); } export async function getLatestMessageTimeFromDb() { - return queryLatestValue("SELECT MAX(COALESCE(NULLIF(gmt_modified, ''), NULLIF(gmt_created, ''))) AS latest_time FROM aliyun_aps_messages"); + await ensureMessagesTable(); + return queryLatestValue("SELECT MAX(COALESCE(NULLIF(gmt_modified, ''), NULLIF(gmt_created, ''))) AS latest_time FROM aliyun_aps_messages WHERE source_id = ?", [config.sourceId]); } export async function closeDbPool() { @@ -197,12 +204,14 @@ export async function closeDbPool() { export async function ensureMessagesTable() { await getPool().query(MESSAGE_TABLE_DDL); + await ensureSourceColumn('aliyun_aps_messages'); } export async function ensureCustomerLifecycleColumns() { if (customerLifecycleEnsured) { return; } + await ensureSourceColumn('aps_customer'); await ensureColumnExists('aps_customer', 'active', "ALTER TABLE aps_customer ADD COLUMN active TINYINT(1) NOT NULL DEFAULT 1 COMMENT '是否有效 1=有效 0=释放'"); await ensureColumnExists('aps_customer', 'status', "ALTER TABLE aps_customer ADD COLUMN status VARCHAR(32) DEFAULT 'active' COMMENT '客户状态'"); await ensureColumnExists('aps_customer', 'released_at', "ALTER TABLE aps_customer ADD COLUMN released_at DATETIME NULL COMMENT '释放时间'"); @@ -210,6 +219,14 @@ export async function ensureCustomerLifecycleColumns() { customerLifecycleEnsured = true; } +async function ensureSourceColumn(tableName) { + if (ensuredSourceTables.has(tableName)) { + return; + } + await ensureColumnExists(tableName, 'source_id', `ALTER TABLE ${tableName} ADD COLUMN source_id VARCHAR(64) NOT NULL DEFAULT 'default' COMMENT '数据来源账号标识'`); + ensuredSourceTables.add(tableName); +} + async function ensureColumnExists(tableName, columnName, alterSql) { const [rows] = await getPool().execute( `SELECT COUNT(*) AS cnt @@ -233,13 +250,13 @@ export async function upsertCustomers(records) { await ensureCustomerLifecycleColumns(); const sql = ` INSERT INTO aps_customer ( - account_id, login_name, real_name, report_source, report_type, trade_mode, + source_id, account_id, login_name, real_name, report_source, report_type, trade_mode, real_name_status, relation_date, follow_staff, payment_notice_status, invite_register_type, is_new_customer, performance_start_point_reached, customer_category, remark, no_consumption_months, planned_release_time, planned_release_reason, active, status, released_at, release_reason - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 1, 'active', NULL, NULL) + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 1, 'active', NULL, NULL) ON DUPLICATE KEY UPDATE login_name=VALUES(login_name), real_name=VALUES(real_name), @@ -271,6 +288,7 @@ export async function upsertCustomers(records) { continue; } await getPool().execute(sql, [ + config.sourceId, accountId, loginName, safeString(record.realName), @@ -309,7 +327,7 @@ export async function upsertCustomerDetails(records) { department = ?, payment_notice_status = COALESCE(?, payment_notice_status), updated_at = CURRENT_TIMESTAMP - WHERE account_id = ? + WHERE source_id = ? AND account_id = ? `; for (const record of records) { const accountId = safeString(record.accountId); @@ -324,6 +342,7 @@ export async function upsertCustomerDetails(records) { safeString(record.phone), safeString(record.department), safeString(record.paymentNoticeStatus), + config.sourceId, accountId, ]); } @@ -336,8 +355,8 @@ async function findCustomerAccountIdByName(customerName) { return null; } const [rows] = await getPool().execute( - 'SELECT account_id FROM aps_customer WHERE customer_name = ? OR real_name = ? OR login_name = ? LIMIT 1', - [normalized, normalized, normalized], + 'SELECT account_id FROM aps_customer WHERE source_id = ? AND (customer_name = ? OR real_name = ? OR login_name = ?) LIMIT 1', + [config.sourceId, normalized, normalized, normalized], ); return Array.isArray(rows) && rows.length > 0 ? safeString(rows[0].account_id) : null; } @@ -349,8 +368,8 @@ async function markCustomerReleased(customerName, reason, releasedAt) { } await ensureCustomerLifecycleColumns(); await getPool().execute( - 'UPDATE aps_customer SET active = 0, status = ?, released_at = ?, release_reason = ? WHERE account_id = ?', - ['released', releasedAt || new Date().toISOString().slice(0, 19).replace('T', ' '), safeString(reason), accountId], + 'UPDATE aps_customer SET active = 0, status = ?, released_at = ?, release_reason = ? WHERE source_id = ? AND account_id = ?', + ['released', releasedAt || new Date().toISOString().slice(0, 19).replace('T', ' '), safeString(reason), config.sourceId, accountId], ); return true; } @@ -362,8 +381,8 @@ async function markCustomerActive(customerName) { } await ensureCustomerLifecycleColumns(); await getPool().execute( - 'UPDATE aps_customer SET active = 1, status = ?, released_at = NULL, release_reason = NULL WHERE account_id = ?', - ['active', accountId], + 'UPDATE aps_customer SET active = 1, status = ?, released_at = NULL, release_reason = NULL WHERE source_id = ? AND account_id = ?', + ['active', config.sourceId, accountId], ); return true; } @@ -389,13 +408,14 @@ export async function upsertOrders(records) { if (!records?.length) { return { inserted: 0 }; } + await ensureSourceColumn('aps_order'); const customerMap = await getCustomerMap(); const sql = ` INSERT INTO aps_order ( - order_id, customer_account_id, customer_login_name, + source_id, order_id, customer_account_id, customer_login_name, customer_category, order_type, original_price_cny, paid_amount_cny, status, order_time, order_month - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE customer_account_id=VALUES(customer_account_id), customer_login_name=VALUES(customer_login_name), @@ -415,6 +435,7 @@ export async function upsertOrders(records) { const customerLoginName = safeString(record.customerAccount) || ''; const accountId = resolveCustomerAccountId(customerMap, customerLoginName); await getPool().execute(sql, [ + config.sourceId, orderId, accountId, customerLoginName, @@ -434,14 +455,15 @@ export async function upsertOrderDetails(records) { if (!records?.length) { return { inserted: 0 }; } + await ensureSourceColumn('aps_order_detail'); const sql = ` INSERT INTO aps_order_detail ( - order_id, order_type, status, trade_type, customer_category, + source_id, order_id, order_type, status, trade_type, customer_category, dealer_name, dealer_uid, customer_type, opportunity_id, payment_time, order_time, product_name, product_code, original_price_cny, paid_amount_cny, discount, payable_amount_cny, coupon_amount_cny - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE order_type=VALUES(order_type), status=VALUES(status), @@ -467,6 +489,7 @@ export async function upsertOrderDetails(records) { continue; } await getPool().execute(sql, [ + config.sourceId, orderId, safeString(record.orderType), safeString(record.status), @@ -494,10 +517,12 @@ export async function upsertBills(records) { if (!records?.length) { return { inserted: 0 }; } + await ensureSourceColumn('aps_bill'); const customerMap = await getCustomerMap(); const selectSql = ` SELECT id FROM aps_bill - WHERE billing_month = ? + WHERE source_id = ? + AND billing_month = ? AND commission_month = ? AND customer_login_name = ? AND consumption_time = ? @@ -507,7 +532,7 @@ export async function upsertBills(records) { `; const insertSql = ` INSERT INTO aps_bill ( - billing_month, customer_account_id, customer_login_name, + source_id, billing_month, customer_account_id, customer_login_name, bill_type, consumption_time, customer_category, product_name, product_category, @@ -517,7 +542,7 @@ export async function upsertBills(records) { rebated, invite_register_type, service_start_time, service_end_time - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) `; const updateSql = ` UPDATE aps_bill SET @@ -546,6 +571,7 @@ export async function upsertBills(records) { const consumptionTime = safeString(record.consumeDate); const originalPrice = safeNumber(record.originalPriceCny); const [rows] = await getPool().execute(selectSql, [ + config.sourceId, billingMonth, commissionMonth, customerLoginName, @@ -572,6 +598,7 @@ export async function upsertBills(records) { } await getPool().execute(insertSql, [ + config.sourceId, billingMonth, accountId, customerLoginName, @@ -601,11 +628,11 @@ export async function upsertMessages(records) { await ensureMessagesTable(); const sql = ` INSERT INTO aliyun_aps_messages ( - msg_id, title, content, msg_type, from_app, biz_code, msg_channel, + source_id, msg_id, title, content, msg_type, from_app, biz_code, msg_channel, category_id, category_name, lv1_category_id, lv2_category_id, lv3_category_id, message_classification, customer_name, order_no, status, gmt_created, gmt_modified, extra_data - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE title=VALUES(title), content=VALUES(content), @@ -633,6 +660,7 @@ export async function upsertMessages(records) { continue; } await getPool().execute(sql, [ + config.sourceId, msgId, safeString(record.title), safeString(record.content),