commit aa67b0e37e054aedce8840e8930c19a5bf2b0e00
Author: ray <1416431931@qq.com>
Date: Mon Apr 13 18:09:52 2026 +0800
项目初始化
diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 0000000..10b731c
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,5 @@
+# 默认忽略的文件
+/shelf/
+/workspace.xml
+# 基于编辑器的 HTTP 客户端请求
+/httpRequests/
diff --git a/.idea/misc.xml b/.idea/misc.xml
new file mode 100644
index 0000000..639900d
--- /dev/null
+++ b/.idea/misc.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/modules.xml b/.idea/modules.xml
new file mode 100644
index 0000000..614b3c1
--- /dev/null
+++ b/.idea/modules.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/python.iml b/.idea/python.iml
new file mode 100644
index 0000000..d6ebd48
--- /dev/null
+++ b/.idea/python.iml
@@ -0,0 +1,9 @@
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
new file mode 100644
index 0000000..35eb1dd
--- /dev/null
+++ b/.idea/vcs.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/aliyun-sync/aliyun-aps-sync/.env.example b/aliyun-sync/aliyun-aps-sync/.env.example
new file mode 100644
index 0000000..a693edf
--- /dev/null
+++ b/aliyun-sync/aliyun-aps-sync/.env.example
@@ -0,0 +1,6 @@
+ALIYUN_APS_BASE_URL=https://aps.aliyun.com
+ALIYUN_APS_HEADLESS=false
+ALIYUN_APS_TIMEZONE=Asia/Shanghai
+ALIYUN_APS_CRON=0 6 * * *
+ALIYUN_APS_ORDER_START_DATE=2024-01-01
+ALIYUN_APS_BILL_START_MONTH=2024-01
diff --git a/aliyun-sync/aliyun-aps-sync/.gitignore b/aliyun-sync/aliyun-aps-sync/.gitignore
new file mode 100644
index 0000000..d1b9ba5
--- /dev/null
+++ b/aliyun-sync/aliyun-aps-sync/.gitignore
@@ -0,0 +1,5 @@
+node_modules/
+.env
+.browser/
+data/
+downloads/
diff --git a/aliyun-sync/aliyun-aps-sync/README.md b/aliyun-sync/aliyun-aps-sync/README.md
new file mode 100644
index 0000000..aa96930
--- /dev/null
+++ b/aliyun-sync/aliyun-aps-sync/README.md
@@ -0,0 +1,57 @@
+# aliyun-aps-sync
+
+用于抓取阿里云伙伴中心里的 `我的客户`、`订单查询`、`账单查询`,并把结果和本地上一次同步结果做增量对比。
+
+## 功能
+
+- 首次 `login` 用浏览器手动登录,后续复用本地登录态。
+- `sync` 会同步 3 个模块的数据。
+- 同步后会生成:
+ - `data/current/*.json` 当前全量
+ - `data/history/<dataset>/*.json` 每次快照
+ - `data/delta/<dataset>/*.json` 增量变化
+ - `data/runs/*.json` 每次任务汇总
+- `schedule` 支持常驻进程方式按 cron 表达式每天自动同步。
+
+## 安装
+
+```bash
+cd aliyun-sync/aliyun-aps-sync
+npm install
+cp .env.example .env
+```
+
+## 配置
+
+`.env` 里最重要的两个时间范围:
+
+- `ALIYUN_APS_ORDER_START_DATE`: 订单查询的起始日期,会按月滚动抓取直到今天。
+- `ALIYUN_APS_BILL_START_MONTH`: 账单查询的起始佣金月份,会按月滚动抓取直到当前月。
+
+## 使用
+
+1. 首次登录并保存会话
+
+```bash
+npm run login
+```
+
+2. 手动执行一次同步
+
+```bash
+npm run sync
+```
+
+3. 常驻定时同步
+
+```bash
+npm run schedule
+```
+
+默认 cron 是每天早上 6 点,可在 `.env` 里改 `ALIYUN_APS_CRON`。
+
+## 注意
+
+- 脚本现在基于页面表格 DOM 抓取,如果阿里云伙伴中心页面结构改版,需要调整 `src/sync.js` 里的表格和筛选器选择逻辑。
+- 订单和账单的日期输入框是通过页面已有日期值自动识别的,所以首次跑之前建议先在页面确认默认筛选存在。
+- 如果登录态过期,重新执行 `npm run login` 即可。
diff --git a/aliyun-sync/aliyun-aps-sync/package-lock.json b/aliyun-sync/aliyun-aps-sync/package-lock.json
new file mode 100644
index 0000000..f02444a
--- /dev/null
+++ b/aliyun-sync/aliyun-aps-sync/package-lock.json
@@ -0,0 +1,92 @@
+{
+ "name": "aliyun-aps-sync",
+ "version": "1.0.0",
+ "lockfileVersion": 3,
+ "requires": true,
+ "packages": {
+ "": {
+ "name": "aliyun-aps-sync",
+ "version": "1.0.0",
+ "dependencies": {
+ "dotenv": "^16.6.1",
+ "node-cron": "^4.2.1",
+ "nodemailer": "^6.10.1",
+ "playwright": "^1.58.2"
+ }
+ },
+ "node_modules/dotenv": {
+ "version": "16.6.1",
+ "resolved": "https://registry.npmmirror.com/dotenv/-/dotenv-16.6.1.tgz",
+ "integrity": "sha512-uBq4egWHTcTt33a72vpSG0z3HnPuIl6NqYcTrKEg2azoEyl2hpW0zqlxysq2pK9HlDIHyHyakeYaYnSAwd8bow==",
+ "license": "BSD-2-Clause",
+ "engines": {
+ "node": ">=12"
+ },
+ "funding": {
+ "url": "https://dotenvx.com"
+ }
+ },
+ "node_modules/fsevents": {
+ "version": "2.3.2",
+ "resolved": "https://registry.npmmirror.com/fsevents/-/fsevents-2.3.2.tgz",
+ "integrity": "sha512-xiqMQR4xAeHTuB9uWm+fFRcIOgKBMiOBP+eXiyT7jsgVCq1bkVygt00oASowB7EdtpOHaaPgKt812P9ab+DDKA==",
+ "hasInstallScript": true,
+ "license": "MIT",
+ "optional": true,
+ "os": [
+ "darwin"
+ ],
+ "engines": {
+ "node": "^8.16.0 || ^10.6.0 || >=11.0.0"
+ }
+ },
+ "node_modules/node-cron": {
+ "version": "4.2.1",
+ "resolved": "https://registry.npmmirror.com/node-cron/-/node-cron-4.2.1.tgz",
+ "integrity": "sha512-lgimEHPE/QDgFlywTd8yTR61ptugX3Qer29efeyWw2rv259HtGBNn1vZVmp8lB9uo9wC0t/AT4iGqXxia+CJFg==",
+ "license": "ISC",
+ "engines": {
+ "node": ">=6.0.0"
+ }
+ },
+ "node_modules/nodemailer": {
+ "version": "6.10.1",
+ "resolved": "https://registry.npmjs.org/nodemailer/-/nodemailer-6.10.1.tgz",
+ "integrity": "sha512-Z+iLaBGVaSjbIzQ4pX6XV41HrooLsQ10ZWPUehGmuantvzWoDVBnmsdUcOIDM1t+yPor5pDhVlDESgOMEGxhHA==",
+ "license": "MIT-0",
+ "engines": {
+ "node": ">=6.0.0"
+ }
+ },
+ "node_modules/playwright": {
+ "version": "1.58.2",
+ "resolved": "https://registry.npmmirror.com/playwright/-/playwright-1.58.2.tgz",
+ "integrity": "sha512-vA30H8Nvkq/cPBnNw4Q8TWz1EJyqgpuinBcHET0YVJVFldr8JDNiU9LaWAE1KqSkRYazuaBhTpB5ZzShOezQ6A==",
+ "license": "Apache-2.0",
+ "dependencies": {
+ "playwright-core": "1.58.2"
+ },
+ "bin": {
+ "playwright": "cli.js"
+ },
+ "engines": {
+ "node": ">=18"
+ },
+ "optionalDependencies": {
+ "fsevents": "2.3.2"
+ }
+ },
+ "node_modules/playwright-core": {
+ "version": "1.58.2",
+ "resolved": "https://registry.npmmirror.com/playwright-core/-/playwright-core-1.58.2.tgz",
+ "integrity": "sha512-yZkEtftgwS8CsfYo7nm0KE8jsvm6i/PTgVtB8DL726wNf6H2IMsDuxCpJj59KDaxCtSnrWan2AeDqM7JBaultg==",
+ "license": "Apache-2.0",
+ "bin": {
+ "playwright-core": "cli.js"
+ },
+ "engines": {
+ "node": ">=18"
+ }
+ }
+ }
+}
diff --git a/aliyun-sync/aliyun-aps-sync/package.json b/aliyun-sync/aliyun-aps-sync/package.json
new file mode 100644
index 0000000..3b3f5a8
--- /dev/null
+++ b/aliyun-sync/aliyun-aps-sync/package.json
@@ -0,0 +1,18 @@
+{
+ "name": "aliyun-aps-sync",
+ "version": "1.0.0",
+ "private": true,
+ "type": "module",
+ "scripts": {
+ "login": "node src/index.js login",
+ "sync": "node src/index.js sync",
+ "bills": "node src/index.js bills",
+ "schedule": "node src/index.js schedule"
+ },
+ "dependencies": {
+ "dotenv": "^16.6.1",
+ "nodemailer": "^6.10.1",
+ "node-cron": "^4.2.1",
+ "playwright": "^1.58.2"
+ }
+}
diff --git a/aliyun-sync/aliyun-aps-sync/src/config.js b/aliyun-sync/aliyun-aps-sync/src/config.js
new file mode 100644
index 0000000..2d642e7
--- /dev/null
+++ b/aliyun-sync/aliyun-aps-sync/src/config.js
@@ -0,0 +1,188 @@
+import fs from 'node:fs';
+import path from 'node:path';
+import process from 'node:process';
+import dotenv from 'dotenv';
+
+const rootDir = path.resolve(process.cwd());
+dotenv.config({ path: path.join(rootDir, '.env') });
+
+const toBool = (value, fallback) => { // parse an env-style flag; `fallback` is used when the variable is unset
+  if (value == null || String(value).trim() === '') return fallback; // a blank `KEY=` entry counts as unset
+  return ['1', 'true', 'yes', 'y', 'on'].includes(String(value).trim().toLowerCase()); // any other text is false
+};
+
+function ensureDir(dirPath) { // make sure the directory exists, then hand the same path back
+  fs.mkdirSync(dirPath, { recursive: true }); // `recursive` also creates any missing parent directories
+  return dirPath;
+}
+
+export const config = { // runtime settings: ALIYUN_APS_* environment values with built-in defaults
+ rootDir,
+ baseUrl: process.env.ALIYUN_APS_BASE_URL || 'https://aps.aliyun.com',
+ headless: toBool(process.env.ALIYUN_APS_HEADLESS, false),
+ timezone: process.env.ALIYUN_APS_TIMEZONE || 'Asia/Shanghai',
+ cron: process.env.ALIYUN_APS_CRON || '0 6 * * *',
+ orderStartDate: process.env.ALIYUN_APS_ORDER_START_DATE || '2024-01-01',
+ billStartMonth: process.env.ALIYUN_APS_BILL_START_MONTH || '2024-01',
+ smtp: {
+ host: process.env.ALIYUN_APS_SMTP_HOST || 'smtp.qq.com',
+ port: Number.parseInt(process.env.ALIYUN_APS_SMTP_PORT || '465', 10) || 465, // unparsable value -> 465, never NaN
+ secure: toBool(process.env.ALIYUN_APS_SMTP_SECURE, true),
+ user: process.env.ALIYUN_APS_SMTP_USER || '',
+ pass: process.env.ALIYUN_APS_SMTP_PASS || '',
+ },
+ notifyEmail: process.env.ALIYUN_APS_NOTIFY_EMAIL || '',
+ closeBrowser: toBool(process.env.ALIYUN_APS_CLOSE_BROWSER, true),
+ fullSync: toBool(process.env.ALIYUN_APS_FULL_SYNC, true),
+ resumeBillMonth: process.env.ALIYUN_APS_RESUME_BILL_MONTH || '',
+ resumeBillPage: Math.max(1, Number.parseInt(process.env.ALIYUN_APS_RESUME_BILL_PAGE || '1', 10) || 1), // always >= 1
+ dbSyncScript: process.env.ALIYUN_APS_DB_SYNC_SCRIPT || '../aps-aliyun-sync/aps_db_sync.py',
+ userDataDir: ensureDir(path.join(rootDir, '.browser')), // this and the two directories below are created at import time
+ dataDir: ensureDir(path.join(rootDir, 'data')),
+ downloadDir: ensureDir(path.join(rootDir, 'downloads')),
+};
+
+export const datasets = { // per-dataset page metadata plus the mapping from a raw row to a stored record
+ customers: {
+ name: 'customers',
+ url: `${config.baseUrl}/#/detail/my_customer/~/customer/list`,
+ heading: '我的客户', // text the debug scripts wait for in the page body before probing
+ pageSize: 100,
+ uniqueKey: (record) => record.accountId || record.loginName || record.__hash, // account ID, else login name, else content hash
+ normalize: (record) => { // input is keyed by Chinese labels — presumably the page's table column headers
+ const loginAndUid = record['登录名称/账号ID'] || '';
+ const [loginName = '', accountId = ''] = splitLines(loginAndUid); // first non-empty line = login name, second = account ID
+ return {
+ loginName: loginName.replace(/\s+/g, ''),
+ accountId,
+ realName: record['UID实名认证名称'] || '',
+ reportSource: record['报备来源'] || '',
+ reportType: record['报备类型'] || '',
+ tradeMode: record['交易模式'] || '',
+ authStatus: record['实名认证状态'] || '',
+ relationTime: record['关联日期'] || '',
+ owner: record['跟进员工'] || '',
+ cashBalanceCny: record['账户现金余额(CNY)'] || '',
+ invoicePendingCny: record['待开票金额(CNY)'] || '',
+ lastMonthConsumptionCny: record['上月消费金额(CNY)'] || '',
+ thisMonthConsumptionCny: record['本月消费金额(CNY)'] || '',
+ paymentNoticeStatus: record['代为支付告知状态'] || '',
+ inviteType: record['邀约注册类型'] || '',
+ isNewCustomer: record['是否新客户'] || '',
+ isPerformanceQualified: record['是否达成业绩起算点'] || '',
+ customerCategory: record['客户分类'] || '',
+ remark: record['备注'] || '',
+ inactiveMonths: record['客户无消费月数'] || '',
+ releasePlanTime: record['计划释放时间'] || '',
+ releasePlanReason: record['计划释放原因'] || '',
+ };
+ },
+ },
+ orders: {
+ name: 'orders',
+ url: `${config.baseUrl}/#/detail/order/~/costCenter/order`,
+ heading: '订单查询',
+ pageSize: 100,
+ uniqueKey: (record) => record.orderId || record.__hash, // order ID, else content hash
+ normalize: (record, context) => ({ // context.windowStart / context.windowEnd are copied onto every record
+ orderId: record['订单号'] || '',
+ customerAccount: (record['客户账号'] || '').replace(/\s+/g, ''), // all whitespace removed from the account text
+ customerCategory: record['客户分类'] || '',
+ orderType: record['订单类型'] || '',
+ orderOriginalPriceCny: record['订单原价 (CNY)'] || '',
+ actualPaidCny: record['实付金额 (CNY)'] || '',
+ orderStatus: record['订单状态'] || '',
+ createdAt: record['下单时间'] || '',
+ windowStart: context.windowStart || '',
+ windowEnd: context.windowEnd || '',
+ }),
+ },
+ orderDetails: {
+ name: 'orderDetails',
+ url: `${config.baseUrl}/#/detail/order/~/costCenter/order`, // same URL as the orders dataset
+ heading: '订单详情',
+ pageSize: 100,
+ uniqueKey: (record) => record.orderId || record.__hash,
+ normalize: (record, context) => ({ // input already uses English field names; missing values become ''
+ orderId: record.orderId || '',
+ orderType: record.orderType || '',
+ status: record.status || '',
+ tradeType: record.tradeType || '',
+ customerCategory: record.customerCategory || '',
+ dealerName: record.dealerName || '',
+ dealerUid: record.dealerUid || '',
+ customerType: record.customerType || '',
+ opportunityId: record.opportunityId || '',
+ paymentTime: record.paymentTime || '',
+ orderTime: record.orderTime || '',
+ productName: record.productName || '',
+ productCode: record.productCode || '',
+ originalPriceCny: record.originalPriceCny || '',
+ paidAmountCny: record.paidAmountCny || '',
+ discount: record.discount || '',
+ payableAmountCny: record.payableAmountCny || '',
+ couponAmountCny: record.couponAmountCny || '',
+ windowStart: context.windowStart || '',
+ windowEnd: context.windowEnd || '',
+ }),
+ },
+ customerDetails: {
+ name: 'customerDetails',
+ url: `${config.baseUrl}/#/detail/my_customer/~/customer/list`, // same URL as the customers dataset
+ heading: '详情',
+ pageSize: 100,
+ uniqueKey: (record) => record.accountId || record.__hash,
+ normalize: (record, context) => ({
+ accountId: context.accountId || '', // taken from the context, not from the record itself
+ customerAccount: record.customerAccount || '',
+ customerName: record.customerName || '',
+ customerType: record.customerType || '',
+ tradeMode: record.tradeMode || '',
+ customerSource: record.customerSource || '',
+ realNameStatus: record.realNameStatus || '',
+ email: record.email || '',
+ relationDate: record.relationDate || '',
+ phone: record.phone || '',
+ remark: record.remark || '',
+ paymentNoticeStatus: record.paymentNoticeStatus || '',
+ department: record.department || '',
+ lastMonthPayableTotalCny: record.lastMonthPayableTotalCny || '',
+ lastMonthPrepayCny: record.lastMonthPrepayCny || '',
+ lastMonthPostpayCny: record.lastMonthPostpayCny || '',
+ currentMonthPayableTotalCny: record.currentMonthPayableTotalCny || '',
+ currentMonthPrepayCny: record.currentMonthPrepayCny || '',
+ currentMonthPostpayCny: record.currentMonthPostpayCny || '',
+ }),
+ },
+ bills: {
+ name: 'bills',
+ url: `${config.baseUrl}/#/detail/bill/~/costCenter/bill`,
+ heading: '账单查询',
+ pageSize: 100,
+ uniqueKey: (record) => record.__hash, // bills are keyed by content hash only
+ normalize: (record, context) => ({
+ billingMonth: record['账期'] || '',
+ consumeDate: record['消费时间'] || '',
+ customerAccount: (record['客户账号'] || '').replace(/\s+/g, ''),
+ customerCategory: record['客户分类'] || '',
+ productCategory: record['产品分类'] || '',
+ productName: record['产品名称'] || '',
+ originalPriceCny: record['原价 (CNY)'] || '',
+ customerPayableCny: record['客户应付金额 (CNY)'] || '',
+ billType: record['账单类型'] || '',
+ countsForPerformance: record['是否计入业绩'] || '',
+ commissionable: record['是否返佣'] || '',
+ commissionMonth: record['佣金月份'] || context.month || '', // falls back to context.month when the row has no value
+ inviteType: record['邀约注册类型'] || '',
+ serviceStartAt: record['服务开始时间'] || '',
+ serviceEndAt: record['服务结束时间'] || '',
+ }),
+ },
+};
+
+function splitLines(value) { // split text on newlines, trim each piece, drop the empty ones
+  const pieces = [];
+  for (const rawLine of String(value).split('\n')) {
+    if (rawLine.trim()) pieces.push(rawLine.trim());
+  }
+  return pieces; }
diff --git a/aliyun-sync/aliyun-aps-sync/src/debug-inputs.js b/aliyun-sync/aliyun-aps-sync/src/debug-inputs.js
new file mode 100644
index 0000000..dd33259
--- /dev/null
+++ b/aliyun-sync/aliyun-aps-sync/src/debug-inputs.js
@@ -0,0 +1,91 @@
+import { chromium } from 'playwright';
+import { config, datasets } from './config.js';
+
+const sleep = (ms) => new Promise((r) => setTimeout(r, ms));
+
+const target = process.argv[2] || 'orders'; // dataset key from the CLI; defaults to the orders page
+const dataset = datasets[target]; // undefined when the argument is not a key of `datasets`
+if (!dataset) { console.error(`未知页面: ${target},可选: ${Object.keys(datasets).join(', ')}`); process.exit(1); } // stop before dataset.heading is read
+console.log(`正在打开: ${dataset.heading}`);
+
+const context = await chromium.launchPersistentContext(config.userDataDir, { // persistent profile stored in config.userDataDir
+ channel: 'chrome',
+ headless: false,
+ acceptDownloads: true,
+ downloadsPath: config.downloadDir,
+});
+
+const page = context.pages()[0] || (await context.newPage()); // use the first open tab, or open one if there is none
+await page.goto(dataset.url, { waitUntil: 'domcontentloaded' });
+
+// Wait for the page to finish loading (the dataset heading text shows up in the body)
+await page.waitForFunction(
+ (text) => document.body && document.body.innerText.includes(text),
+ dataset.heading,
+ { timeout: 60000 },
+);
+await sleep(3000);
+
+// Collect details of every <input> element on the page
+const inputInfo = await page.evaluate(() => {
+ const inputs = document.querySelectorAll('input');
+ return Array.from(inputs).map((el, i) => {
+ // Nearest ancestor whose class name looks like a date-picker component
+ const pickerParent = el.closest(
+ '[class*="date-picker"], [class*="month-picker"], [class*="range-picker"], [class*="calendar"]'
+ );
+ return {
+ index: i,
+ value: el.value,
+ placeholder: el.placeholder,
+ type: el.type,
+ readOnly: el.readOnly,
+ className: el.className.substring(0, 120),
+ parentClass: el.parentElement?.className?.substring(0, 120) || '',
+ pickerClass: pickerParent?.className?.substring(0, 150) || '(无)',
+ ariaLabel: el.getAttribute('aria-label') || '',
+ role: el.getAttribute('role') || '', // collected but not printed below
+ };
+ });
+});
+
+console.log('\n=== 页面所有 input 元素 ===');
+for (const info of inputInfo) {
+ console.log(`\n[input ${info.index}]`);
+ console.log(` value: "${info.value}"`);
+ console.log(` placeholder: "${info.placeholder}"`);
+ console.log(` type: ${info.type}`);
+ console.log(` readOnly: ${info.readOnly}`);
+ console.log(` className: ${info.className}`);
+ console.log(` parentClass: ${info.parentClass}`);
+ console.log(` pickerClass: ${info.pickerClass}`);
+ console.log(` ariaLabel: "${info.ariaLabel}"`);
+}
+
+// Inspect the RangePicker structure
+const rangeInfo = await page.evaluate(() => {
+ const rangePickers = document.querySelectorAll('[class*="range-picker"], [class*="date-picker2"]');
+ return Array.from(rangePickers).map((el, i) => ({
+ index: i,
+ className: el.className.substring(0, 200),
+ inputCount: el.querySelectorAll('input').length,
+ inputs: Array.from(el.querySelectorAll('input')).map(inp => ({
+ value: inp.value,
+ placeholder: inp.placeholder,
+ readOnly: inp.readOnly,
+ })),
+ }));
+});
+
+console.log('\n=== RangePicker / DatePicker2 组件 ===');
+for (const rp of rangeInfo) {
+ console.log(`\n[picker ${rp.index}]`);
+ console.log(` className: ${rp.className}`);
+ console.log(` inputs (${rp.inputCount}):`);
+ for (const inp of rp.inputs) {
+ console.log(` value="${inp.value}" placeholder="${inp.placeholder}" readOnly=${inp.readOnly}`);
+ }
+}
+
+await context.close();
+process.exit(0);
diff --git a/aliyun-sync/aliyun-aps-sync/src/debug-pagination.js b/aliyun-sync/aliyun-aps-sync/src/debug-pagination.js
new file mode 100644
index 0000000..f9ca9a4
--- /dev/null
+++ b/aliyun-sync/aliyun-aps-sync/src/debug-pagination.js
@@ -0,0 +1,135 @@
+import { chromium } from 'playwright';
+import { config, datasets } from './config.js';
+
+const sleep = (ms) => new Promise((r) => setTimeout(r, ms)); // promise-based delay in milliseconds
+
+console.log('打开订单页面,设置2024-11月,探测分页结构...');
+
+const context = await chromium.launchPersistentContext(config.userDataDir, { // persistent profile stored in config.userDataDir
+ channel: 'chrome',
+ headless: false,
+ acceptDownloads: true,
+ downloadsPath: config.downloadDir,
+});
+
+const page = context.pages()[0] || (await context.newPage());
+await page.goto(datasets.orders.url, { waitUntil: 'domcontentloaded' });
+
+await page.waitForFunction( // wait until the orders heading text appears in the page body
+ (text) => document.body && document.body.innerText.includes(text),
+ datasets.orders.heading,
+ { timeout: 60000 },
+);
+await sleep(3000);
+
+// Set the range to November 2024 (a month with a lot of data)
+const trigger = page.locator('input[placeholder="结束日期"]');
+await trigger.click();
+await sleep(1000);
+const panelEndInput = page.locator('.next-range-picker-panel-input-end-date input');
+await panelEndInput.click();
+await sleep(100);
+await page.keyboard.press('ControlOrMeta+A'); // select-all is Meta+A on macOS; plain Control+A would not replace the old value there
+await page.keyboard.type('2024-11-30', { delay: 30 });
+await sleep(300);
+const panelStartInput = page.locator('.next-range-picker-panel-input-start-date input');
+await panelStartInput.click();
+await sleep(100);
+await page.keyboard.press('ControlOrMeta+A'); // same cross-platform select-all as above
+await page.keyboard.type('2024-11-01', { delay: 30 });
+await sleep(300);
+await page.keyboard.press('Enter');
+await sleep(500);
+await page.mouse.click(0, 0); // click the page's top-left corner, presumably to dismiss the picker overlay
+await sleep(300);
+await page.keyboard.press('Escape');
+await sleep(1000);
+
+// Click the "查询" (query) button
+const queryBtn = page.locator('button:has-text("查询")').first();
+await queryBtn.click();
+await sleep(3000);
+
+// Inspect the pagination component
+const paginationInfo = await page.evaluate(() => {
+ const result = {
+ // Buckets for everything pagination-related that is found below
+ paginationContainers: [],
+ allButtons: [],
+ nextItems: [],
+ };
+
+ // All pagination containers
+ const containers = document.querySelectorAll('[class*="pagination"]');
+ containers.forEach((el, i) => {
+ result.paginationContainers.push({
+ index: i,
+ tag: el.tagName,
+ className: el.className.substring(0, 200),
+ childCount: el.children.length,
+ innerHTML: el.innerHTML.substring(0, 500),
+ });
+ });
+
+ // Buttons inside pagination: <button>, [role="button"] and .next-btn elements
+ const btns = document.querySelectorAll('[class*="pagination"] button, [class*="pagination"] [role="button"], [class*="pagination"] .next-btn');
+ btns.forEach((el, i) => {
+ result.allButtons.push({
+ index: i,
+ tag: el.tagName,
+ text: el.innerText?.trim()?.substring(0, 50) || '',
+ className: el.className.substring(0, 200),
+ disabled: el.hasAttribute('disabled') || el.getAttribute('aria-disabled') === 'true',
+ ariaLabel: el.getAttribute('aria-label') || '',
+ });
+ });
+
+ // Elements carrying a "next-next" class
+ const nextNextEls = document.querySelectorAll('.next-next, [class*="next-next"]');
+ nextNextEls.forEach((el, i) => {
+ result.nextItems.push({
+ index: i,
+ tag: el.tagName,
+ text: el.innerText?.trim()?.substring(0, 50) || '',
+ className: el.className.substring(0, 200),
+ disabled: el.hasAttribute('disabled'),
+ parentClass: el.parentElement?.className?.substring(0, 100) || '',
+ });
+ });
+
+ return result;
+});
+
+console.log('\n=== Pagination 容器 ===');
+for (const c of paginationInfo.paginationContainers) {
+ console.log(`\n[容器 ${c.index}] <${c.tag}> children=${c.childCount}`);
+ console.log(` class: ${c.className}`);
+ console.log(` html: ${c.innerHTML.substring(0, 300)}`);
+}
+
+console.log('\n=== Pagination 内的按钮 ===');
+for (const b of paginationInfo.allButtons) {
+ console.log(`[btn ${b.index}] <${b.tag}> text="${b.text}" disabled=${b.disabled} aria="${b.ariaLabel}" class="${b.className}"`);
+}
+
+console.log('\n=== next-next 元素 ===');
+for (const n of paginationInfo.nextItems) {
+ console.log(`[next ${n.index}] <${n.tag}> text="${n.text}" disabled=${n.disabled} class="${n.className}" parent="${n.parentClass}"`);
+}
+
+// Current page number
+const currentPage = await page.evaluate(() => {
+ const active = document.querySelector('.next-pagination-item.current, .next-pagination-list .current');
+ return active ? { text: active.innerText, class: active.className } : null;
+});
+console.log('\n当前页码:', currentPage);
+
+// Text of the first pagination element (where the total-records hint is expected)
+const totalInfo = await page.evaluate(() => {
+ const el = document.querySelector('[class*="pagination"]');
+ return el ? el.innerText : '(无)';
+});
+console.log('分页文本:', totalInfo);
+
+await context.close();
+process.exit(0);
diff --git a/aliyun-sync/aliyun-aps-sync/src/debug-panel.js b/aliyun-sync/aliyun-aps-sync/src/debug-panel.js
new file mode 100644
index 0000000..f048698
--- /dev/null
+++ b/aliyun-sync/aliyun-aps-sync/src/debug-panel.js
@@ -0,0 +1,91 @@
+import { chromium } from 'playwright';
+import { config, datasets } from './config.js';
+
+const sleep = (ms) => new Promise((r) => setTimeout(r, ms)); // promise-based delay in milliseconds
+
+console.log('打开订单查询页面并探测 RangePicker 面板...');
+
+const context = await chromium.launchPersistentContext(config.userDataDir, { // persistent profile stored in config.userDataDir
+ channel: 'chrome',
+ headless: false,
+ acceptDownloads: true,
+ downloadsPath: config.downloadDir,
+});
+
+const page = context.pages()[0] || (await context.newPage());
+await page.goto(datasets.orders.url, { waitUntil: 'domcontentloaded' });
+
+await page.waitForFunction( // wait until the orders heading text appears in the page body
+ (text) => document.body && document.body.innerText.includes(text),
+ datasets.orders.heading,
+ { timeout: 60000 },
+);
+await sleep(3000);
+
+// Click the start-date trigger to open the panel
+console.log('--- 点击起始日期触发器 ---');
+const startTrigger = page.locator('input[placeholder="起始日期"]');
+await startTrigger.click();
+await sleep(2000);
+
+// Inspect the inputs inside the popup panel
+const panelInfo = await page.evaluate(() => {
+ // Look for the panel (date inputs inside the overlay)
+ const allInputs = document.querySelectorAll('input');
+ const results = [];
+ for (let i = 0; i < allInputs.length; i++) {
+ const el = allInputs[i];
+ // Is this input inside a panel (as opposed to the trigger)?
+ const inPanel = el.closest('.next-range-picker-panel, .next-date-picker-panel, .next-range-picker-body, [class*="overlay"], [class*="popup"]');
+ results.push({
+ index: i,
+ value: el.value,
+ placeholder: el.placeholder,
+ readOnly: el.readOnly,
+ parentClass: el.parentElement?.className?.substring(0, 150) || '',
+ inPanel: !!inPanel,
+ panelClass: inPanel?.className?.substring(0, 150) || '(不在面板内)',
+ });
+ }
+ return results;
+});
+
+console.log('\n=== 面板打开后所有 input ===');
+for (const info of panelInfo) {
+ const marker = info.inPanel ? '📌 面板内' : ' 普通';
+ console.log(`${marker} [input ${info.index}] value="${info.value}" placeholder="${info.placeholder}" readOnly=${info.readOnly}`);
+ if (info.inPanel) {
+ console.log(` parentClass: ${info.parentClass}`);
+ console.log(` panelClass: ${info.panelClass}`);
+ }
+}
+
+// Also dump any panel-level overlay structure
+const panelStructure = await page.evaluate(() => {
+ const overlays = document.querySelectorAll('.next-overlay-wrapper, .next-overlay-inner, [class*="range-picker-panel"], [class*="range-picker-body"]');
+ return Array.from(overlays).map((el, i) => ({
+ index: i,
+ tag: el.tagName,
+ className: el.className.substring(0, 200),
+ inputCount: el.querySelectorAll('input').length,
+ inputs: Array.from(el.querySelectorAll('input')).map(inp => ({
+ value: inp.value,
+ placeholder: inp.placeholder,
+ readOnly: inp.readOnly,
+ parentClass: inp.parentElement?.className?.substring(0, 120) || '',
+ })),
+ }));
+});
+
+console.log('\n=== 弹出面板结构 ===');
+for (const p of panelStructure) {
+ console.log(`\n[overlay ${p.index}] <${p.tag}>`);
+ console.log(` className: ${p.className}`);
+ console.log(` inputs (${p.inputCount}):`);
+ for (const inp of p.inputs) {
+ console.log(` value="${inp.value}" placeholder="${inp.placeholder}" readOnly=${inp.readOnly} parent="${inp.parentClass}"`);
+ }
+}
+
+await context.close();
+process.exit(0);
diff --git a/aliyun-sync/aliyun-aps-sync/src/index.js b/aliyun-sync/aliyun-aps-sync/src/index.js
new file mode 100644
index 0000000..f98193d
--- /dev/null
+++ b/aliyun-sync/aliyun-aps-sync/src/index.js
@@ -0,0 +1,27 @@
+import { login, scheduleSync, syncAll, syncBillsOnly } from './sync.js';
+
+const command = process.argv[2] || 'sync'; // first CLI argument selects the sub-command; default is `sync`
+
+// Print a run summary as indented JSON, then exit with status 0.
+const reportAndExit = (summary) => {
+  console.log(JSON.stringify(summary, null, 2));
+  process.exit(0);
+};
+switch (command) {
+  case 'login':
+    await login();
+    process.exit(0);
+    break;
+  case 'sync':
+    reportAndExit(await syncAll());
+    break;
+  case 'bills':
+    reportAndExit(await syncBillsOnly());
+    break;
+  case 'schedule':
+    await scheduleSync(); // the only command without an explicit process.exit
+    break;
+  default:
+    console.error(`不支持的命令: ${command}`);
+    process.exit(1);
+}
diff --git a/aliyun-sync/aliyun-aps-sync/src/notify.js b/aliyun-sync/aliyun-aps-sync/src/notify.js
new file mode 100644
index 0000000..79a1a37
--- /dev/null
+++ b/aliyun-sync/aliyun-aps-sync/src/notify.js
@@ -0,0 +1,56 @@
+import nodemailer from 'nodemailer';
+import { config } from './config.js';
+
+let lastSentAt = 0;
+const ONE_HOUR_MS = 60 * 60 * 1000;
+
+/**
+ * Email a "session expired, please log in again" alert to the configured recipient.
+ * Skips (with a log line) when an alert went out less than an hour ago, or when the
+ * SMTP credentials or the recipient address are not configured.
+ */
+export async function sendLoginAlert() {
+  // Throttle: at most one alert per hour; the last-sent time is kept in module memory only.
+  const now = Date.now();
+  const sentRecently = now - lastSentAt < ONE_HOUR_MS;
+  if (sentRecently) {
+    console.log('[通知] 登录提醒1小时内已发送过,跳过重复发送');
+    return;
+  }
+
+  const { host, port, secure, user, pass } = config.smtp;
+  const recipient = config.notifyEmail;
+
+  // Without credentials or a recipient there is nothing to send: warn and return.
+  if (!(user && pass)) {
+    console.warn('[通知] SMTP 用户名或密码为空,跳过发送登录提醒邮件');
+    return;
+  }
+  if (!recipient) {
+    console.warn('[通知] 未配置 ALIYUN_APS_NOTIFY_EMAIL,跳过发送登录提醒邮件');
+    return;
+  }
+
+  const transporter = nodemailer.createTransport({ host, port, secure, auth: { user, pass } });
+
+  const signinUrl = `${config.baseUrl}/#/signin`;
+  const sentAt = new Date().toISOString();
+  const bodyLines = [
+    '检测到 APS 页面处于登录页,登录态可能已过期。',
+    `时间: ${sentAt}`,
+    `地址: ${signinUrl}`,
+    '',
+    '请尽快手动执行登录流程(npm run login)以恢复同步。',
+  ];
+
+  await transporter.sendMail({
+    from: user,
+    to: recipient,
+    subject: '[APS同步] 登录态已过期,请手动登录',
+    text: bodyLines.join('\n'),
+  });
+
+  // The throttle is armed only after sendMail resolves, so a failed send does not block the next attempt.
+  lastSentAt = now;
+  console.log(`[通知] 登录提醒邮件已发送至 ${recipient}`);
+}
diff --git a/aliyun-sync/aliyun-aps-sync/src/open-page.js b/aliyun-sync/aliyun-aps-sync/src/open-page.js
new file mode 100644
index 0000000..5c3e532
--- /dev/null
+++ b/aliyun-sync/aliyun-aps-sync/src/open-page.js
@@ -0,0 +1,25 @@
+import { chromium } from 'playwright';
+import { config, datasets } from './config.js';
+
+// Page to open, chosen by the first CLI argument: orders | bills | customers (default: orders).
+const pageKey = process.argv[2] || 'orders';
+const dataset = datasets[pageKey];
+if (!dataset) {
+  console.error(`未知页面: ${pageKey},可选: orders, bills, customers`);
+  process.exit(1);
+}
+const { heading, url } = dataset;
+console.log(`正在打开: ${heading} (${url})`);
+
+// headless is forced off here, regardless of config.headless.
+const launchOptions = {
+  channel: 'chrome', headless: false,
+  acceptDownloads: true, downloadsPath: config.downloadDir,
+};
+const browserContext = await chromium.launchPersistentContext(config.userDataDir, launchOptions);
+const [firstTab] = browserContext.pages();
+const tab = firstTab || (await browserContext.newPage());
+await tab.goto(url, { waitUntil: 'domcontentloaded' });
+console.log('页面已打开,浏览器保持运行。按 Ctrl+C 关闭。');
+// Keep the process running: this promise never settles, so the script never runs to its end.
+await new Promise(() => {});
diff --git a/aliyun-sync/aliyun-aps-sync/src/storage.js b/aliyun-sync/aliyun-aps-sync/src/storage.js
new file mode 100644
index 0000000..5cc1855
--- /dev/null
+++ b/aliyun-sync/aliyun-aps-sync/src/storage.js
@@ -0,0 +1,80 @@
+import crypto from 'node:crypto';
+import fs from 'node:fs';
+import path from 'node:path';
+import { config } from './config.js';
+
+function ensureDir(dirPath) { // make sure the directory exists, then hand the same path back
+  fs.mkdirSync(dirPath, { recursive: true }); // `recursive` also creates any missing parent directories
+  return dirPath;
+}
+
+function readJson(filePath, fallback) { // parsed JSON content of filePath, or `fallback` when the file does not exist
+  const isPresent = fs.existsSync(filePath);
+  return isPresent ? JSON.parse(fs.readFileSync(filePath, 'utf8')) : fallback;
+}
+
+function writeJson(filePath, value) { // serialize `value` as 2-space-indented JSON into filePath
+  ensureDir(path.dirname(filePath)); // the parent directory is created on demand before writing
+  fs.writeFileSync(filePath, JSON.stringify(value, null, 2));
+}
+
+export function nowStamp() { return new Date().toISOString().replace(/[:.]/g, '-'); } // ISO-8601 time with ':' and '.' swapped for '-'
+
+export function withHash(record) { // copy of `record` with `__hash` = SHA-256 hex digest of its JSON serialization
+  const digest = crypto.createHash('sha256').update(JSON.stringify(record)).digest('hex');
+  return { ...record, __hash: digest };
+}
+
+export const loadCurrentState = (dataset) => // reads <dataDir>/current/<dataset>.json; empty records/index when the file is missing
+ readJson(path.join(config.dataDir, 'current', `${dataset}.json`), { records: [], index: {} });
+
+export function saveDatasetRun(dataset, payload) { // store payload as a history snapshot and as the current state; returns the stamp
+  const stamp = nowStamp();
+  const targets = [['history', dataset, `${stamp}.json`], ['current', `${dataset}.json`]]; // history is written first, then current
+  for (const parts of targets) writeJson(path.join(config.dataDir, ...parts), payload);
+  return stamp;
+}
+
+export function saveDelta(dataset, stamp, delta) { // writes `delta` to <dataDir>/delta/<dataset>/<stamp>.json
+ writeJson(path.join(config.dataDir, 'delta', dataset, `${stamp}.json`), delta);
+}
+
+export function saveRunSummary(stamp, summary) { // writes `summary` to <dataDir>/runs/<stamp>.json
+ writeJson(path.join(config.dataDir, 'runs', `${stamp}.json`), summary);
+}
+
+export function diffRecords(previousState, records, uniqueKey) { // compare `records` with the previous index; returns new state, stats and delta
+  const lookup = (index, key) => (Object.hasOwn(index, key) ? index[key] : undefined); // own keys only, so "constructor" etc. never hit Object.prototype
+  const previousIndex = previousState.index || {};
+  const nextIndex = {};
+  const added = [];
+  const updated = [];
+
+  for (const record of records) {
+    const key = uniqueKey(record);
+    if (!key) continue; // records without a usable key are neither indexed nor diffed
+    nextIndex[key] = record; // on duplicate keys the last record wins in the index
+    const before = lookup(previousIndex, key);
+    if (!before) {
+      added.push(record);
+    } else if (before.__hash !== record.__hash) {
+      updated.push({ before, after: record });
+    }
+  }
+
+  const removed = Object.keys(previousIndex)
+    .filter((key) => !lookup(nextIndex, key))
+    .map((key) => previousIndex[key]);
+
+  return {
+    records,
+    index: nextIndex,
+    stats: {
+      total: records.length,
+      added: added.length,
+      updated: updated.length,
+      removed: removed.length,
+    },
+    delta: { added, updated, removed },
+  };
+}
diff --git a/aliyun-sync/aliyun-aps-sync/src/sync.js b/aliyun-sync/aliyun-aps-sync/src/sync.js
new file mode 100644
index 0000000..5a82c29
--- /dev/null
+++ b/aliyun-sync/aliyun-aps-sync/src/sync.js
@@ -0,0 +1,824 @@
+import { chromium } from 'playwright';
+import cron from 'node-cron';
+import path from 'node:path';
+import { execSync } from 'node:child_process';
+import { config, datasets } from './config.js';
+import { sendLoginAlert } from './notify.js';
+import {
+ diffRecords,
+ loadCurrentState,
+ nowStamp,
+ saveDatasetRun,
+ saveDelta,
+ saveRunSummary,
+ withHash,
+} from './storage.js';
+
+const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
+
+let _context = null;
+
+async function getContext() {
+ if (_context) return _context;
+ _context = await chromium.launchPersistentContext(config.userDataDir, {
+ channel: 'chrome',
+ headless: config.headless,
+ acceptDownloads: true,
+ downloadsPath: config.downloadDir,
+ });
+ return _context;
+}
+
+export async function login() {
+ const context = await getContext();
+
+ const page = context.pages()[0] || (await context.newPage());
+ await page.goto(datasets.customers.url, { waitUntil: 'domcontentloaded' });
+ console.log('请在打开的浏览器里完成阿里云伙伴中心登录,然后回到终端按 Ctrl+C 结束。');
+ await waitUntilReady(page, datasets.customers.heading, 10 * 60 * 1000);
+ console.log('登录态已写入 .browser 目录,后续可直接执行 npm run sync。');
+
+ // 必须正常关闭 context,否则登录态不会持久化到磁盘
+ await context.close();
+ _context = null;
+}
+
+export async function syncAll() {
+ const context = await getContext();
+
+ try {
+ const summary = { startedAt: new Date().toISOString(), datasets: {} };
+ const page = context.pages()[0] || (await context.newPage());
+
+ summary.datasets.customers = await syncCustomers(page);
+ summary.datasets.customerDetails = await syncCustomerDetails(page);
+ summary.datasets.orders = await syncOrders(page);
+
+ // syncOrders 完成后,从最新的 orders.json 读取 orderId 列表
+ const latestOrders = loadCurrentState('orders');
+ const orderIdsForDetail = collectValidOrderIds(latestOrders.records || []);
+
+ summary.datasets.orderDetails = await syncOrderDetails(page, orderIdsForDetail);
+ summary.datasets.bills = await syncBills(page);
+ summary.finishedAt = new Date().toISOString();
+
+ const stamp = nowStamp();
+ saveRunSummary(stamp, summary);
+ return summary;
+ } finally {
+ if (config.closeBrowser) {
+ await context.close();
+ _context = null;
+ } else {
+ console.log('浏览器保持运行');
+ }
+ }
+}
+
+export async function scheduleSync() {
+ console.log(`定时任务已启动: ${config.cron} (${config.timezone})`);
+ cron.schedule(
+ config.cron,
+ async () => {
+ try {
+ console.log(`[${new Date().toISOString()}] 开始执行同步`);
+ const summary = await syncAll();
+ console.log(`[${new Date().toISOString()}] 同步完成`, JSON.stringify(summary, null, 2));
+ try {
+ const scriptPath = path.resolve(config.rootDir, config.dbSyncScript);
+ const incrementalFlag = config.fullSync ? '' : ' --incremental';
+ console.log(`[入库] 执行 ${scriptPath}${incrementalFlag ? ' (增量模式)' : ''}`);
+ const output = execSync(`python "${scriptPath}"${incrementalFlag}`, {
+ cwd: path.dirname(scriptPath),
+ encoding: 'utf-8',
+ timeout: 120000,
+ });
+ console.log(output);
+ } catch (e) {
+ console.error('[入库] 失败:', e.message);
+ }
+ } catch (error) {
+ console.error(`[${new Date().toISOString()}] 同步失败`, error);
+ }
+ },
+ { timezone: config.timezone },
+ );
+}
+
+async function syncCustomers(page) {
+ const dataset = datasets.customers;
+ await page.goto(dataset.url, { waitUntil: 'domcontentloaded' });
+ await waitUntilReady(page, dataset.heading);
+ await trySetPageSize(page, dataset.pageSize);
+ const records = await scrapePagedTable(page, dataset, {});
+ return persistDataset(dataset, records, {});
+}
+
+async function syncCustomerDetails(page) {
+ const dataset = datasets.customerDetails;
+ const customersState = loadCurrentState('customers');
+ const allAccountIds = collectValidAccountIds(customersState.records || []);
+
+ if (allAccountIds.length === 0) {
+ console.log('[客户详情] 本地无有效客户 accountId,跳过');
+ return persistDataset(dataset, [], {});
+ }
+
+ console.log(`[客户详情] 共 ${allAccountIds.length} 个客户需要获取详情`);
+ const allDetails = [];
+ const detailBaseUrl =
+ 'https://aps.aliyun.com/?spm=5176.12818093.top-nav.ditem-fx.785716d0LKDpKT#/detail/my_customer/~/customer/';
+
+ for (let index = 0; index < allAccountIds.length; index += 1) {
+ const accountId = allAccountIds[index];
+ console.log(`[客户详情] ${index + 1}/${allAccountIds.length} accountId=${accountId}`);
+
+ // 先跳 about:blank 再跳详情URL(强制 SPA 完整重新加载)
+ await page.goto('about:blank');
+ await sleep(300);
+ await page.goto(`${detailBaseUrl}${accountId}`, { waitUntil: 'domcontentloaded' });
+
+ try {
+ await page.waitForFunction(
+ (text) => document.body && document.body.innerText.includes(text),
+ '详情',
+ { timeout: 15000 },
+ );
+ await sleep(1000);
+ } catch {
+ console.warn(`[客户详情] ${accountId} 详情页加载超时,跳过`);
+ continue;
+ }
+
+ const detail = await extractCustomerDetail(page);
+ allDetails.push({ ...detail, __context: { accountId } });
+ }
+
+ return persistDataset(dataset, dedupeByHash(allDetails), {});
+}
+
+async function syncOrders(page) {
+ const dataset = datasets.orders;
+ let windows;
+
+ if (config.fullSync) {
+ windows = buildMonthlyDateWindows(config.orderStartDate);
+ } else {
+ // 增量模式:只查前一天
+ const yesterday = new Date();
+ yesterday.setDate(yesterday.getDate() - 1);
+ const dateStr = formatDate(yesterday);
+ windows = [{ windowStart: dateStr, windowEnd: dateStr, start: dateStr, end: dateStr }];
+ console.log(`[增量模式] 订单仅查询: ${dateStr}`);
+ }
+
+ const allRecords = [];
+
+ for (const window of windows) {
+ await page.goto(dataset.url, { waitUntil: 'domcontentloaded' });
+ await waitUntilReady(page, dataset.heading);
+ await setDateRange(page, window.start, window.end);
+ await clickQuery(page);
+ await trySetPageSize(page, dataset.pageSize);
+ const records = await scrapePagedTable(page, dataset, window);
+ allRecords.push(...records);
+ }
+
+ return persistDataset(dataset, dedupeByHash(allRecords), {});
+}
+
+async function syncBills(page) {
+ const dataset = datasets.bills;
+ let months;
+ let latestConsumptionDate = null;
+
+ if (config.fullSync) {
+ months = buildMonthList(config.billStartMonth);
+ } else {
+ latestConsumptionDate = getLatestBillConsumptionDate();
+ const incrementalMonth = latestConsumptionDate?.slice(0, 7)
+ || `${new Date().getFullYear()}-${String(new Date().getMonth() + 1).padStart(2, '0')}`;
+ months = [incrementalMonth];
+ console.log(`[增量模式] 账单仅查询: ${incrementalMonth}${latestConsumptionDate ? `, 数据库最新消费时间: ${latestConsumptionDate}` : ''}`);
+ }
+
+ const allRecords = [];
+
+ for (const month of months) {
+ await page.goto(dataset.url, { waitUntil: 'domcontentloaded' });
+ await waitUntilReady(page, dataset.heading);
+ await setMonthValue(page, month);
+ await clickQuery(page);
+ await trySetPageSize(page, dataset.pageSize);
+ let records = await scrapePagedTable(page, dataset, { month });
+ if (latestConsumptionDate) {
+ const before = records.length;
+ records = records.filter((record) => isAfterLatestConsumptionDate(record, latestConsumptionDate));
+ console.log(`[增量模式] 账单按消费时间过滤: ${before} -> ${records.length}`);
+ }
+ allRecords.push(...records);
+ }
+
+ return persistDataset(dataset, dedupeByHash(allRecords), {});
+}
+
+function getLatestBillConsumptionDate() {
+ const scriptPath = path.resolve(config.rootDir, config.dbSyncScript);
+ try {
+ const output = execSync(`python "${scriptPath}" --latest-bill-consumption-time`, {
+ cwd: path.dirname(scriptPath),
+ encoding: 'utf-8',
+ timeout: 120000,
+ }).trim();
+ const latest = output.split(/\r?\n/).map((line) => line.trim()).filter(Boolean).at(-1) || '';
+ return /^\d{4}-\d{2}-\d{2}/.test(latest) ? latest.slice(0, 10) : null;
+ } catch (error) {
+ console.error('[增量模式] 查询数据库最新账单消费时间失败:', error.message);
+ return null;
+ }
+}
+
+function isAfterLatestConsumptionDate(record, latestConsumptionDate) {
+ const consumeDate = String(record['消费时间'] || record.consumeDate || '').trim().slice(0, 10);
+ if (!/^\d{4}-\d{2}-\d{2}$/.test(consumeDate)) {
+ return false;
+ }
+ return consumeDate > latestConsumptionDate;
+}
+
+async function syncOrderDetails(page, cachedOrderIds) {
+ const dataset = datasets.orderDetails;
+
+ // 使用传入的 orderId 列表(在 syncOrders 覆盖 orders.json 之前缓存的)
+ const allOrderIds = cachedOrderIds || [];
+
+ if (allOrderIds.length === 0) {
+ console.log('[订单详情] 本地无订单数据,跳过');
+ return persistDataset(dataset, [], {});
+ }
+
+ console.log(`[订单详情] 共 ${allOrderIds.length} 个订单需要获取详情`);
+ const allDetails = [];
+ const detailBaseUrl = 'https://aps.aliyun.com/?spm=5176.12818093.top-nav.ditem-fx.785716d0LKDpKT#/detail/order/~/costCenter/order/detail/';
+
+ for (let index = 0; index < allOrderIds.length; index += 1) {
+ const orderId = allOrderIds[index];
+ console.log(`[订单详情] ${index + 1}/${allOrderIds.length} orderId=${orderId}`);
+
+ // 先跳 about:blank 再跳详情URL(强制 SPA 完整重新加载)
+ await page.goto('about:blank');
+ await sleep(300);
+ await page.goto(`${detailBaseUrl}${orderId}?projectId=`, { waitUntil: 'domcontentloaded' });
+
+ try {
+ await page.waitForFunction(
+ (text) => document.body && document.body.innerText.includes(text),
+ '订单详情',
+ { timeout: 15000 },
+ );
+ await sleep(1000);
+ } catch {
+ console.warn(`[订单详情] ${orderId} 详情页加载超时,跳过`);
+ continue;
+ }
+
+ const detail = await extractOrderDetail(page);
+ if (!isValidOrderId(detail.orderId)) {
+ detail.orderId = orderId;
+ }
+ allDetails.push({ ...detail, __context: {} });
+ }
+
+ return persistDataset(dataset, dedupeByHash(allDetails), {});
+}
+
+function persistDataset(dataset, records, context) {
+ const normalized = records.map((record) => withHash(dataset.normalize(record, record.__context || context)));
+ const previousState = loadCurrentState(dataset.name);
+ const nextState = diffRecords(previousState, normalized, dataset.uniqueKey);
+ const stamp = saveDatasetRun(dataset.name, nextState);
+ saveDelta(dataset.name, stamp, nextState.delta);
+ return {
+ stamp,
+ stats: nextState.stats,
+ };
+}
+
+async function waitUntilReady(page, heading, timeout = 120000) {
+ await page.waitForLoadState('domcontentloaded');
+ console.log(`[waitUntilReady] 当前URL: ${page.url()}`);
+ console.log(`[waitUntilReady] 等待页面出现: "${heading}"`);
+
+ try {
+ await page.waitForFunction(
+ (text) => document.body && document.body.innerText.includes(text),
+ heading,
+ { timeout },
+ );
+ } catch (err) {
+ // 超时时打印诊断信息
+ const currentUrl = page.url();
+ const bodyText = await page.evaluate(() => document.body?.innerText?.substring(0, 500) || '(空)').catch(() => '(无法获取)');
+ console.error(`[waitUntilReady] 超时!当前URL: ${currentUrl}`);
+ console.error(`[waitUntilReady] 页面内容前500字: ${bodyText}`);
+ if (currentUrl.includes('signin')) {
+ try {
+ await sendLoginAlert();
+ } catch (notifyErr) {
+ console.error('[通知] 发送登录提醒失败:', notifyErr.message);
+ }
+ }
+ throw err;
+ }
+
+ if ((await page.locator('text=登录').count()) > 0 && page.url().includes('login')) {
+ throw new Error('当前未登录,请先执行 npm run login');
+ }
+ await sleep(1500);
+}
+
+async function scrapePagedTable(page, dataset, context) {
+ const pages = [];
+ const visited = new Set();
+
+ while (true) {
+ await waitForTableRows(page);
+ const pageData = await extractTable(page);
+ const pageNum = await currentPageNumber(page);
+ const pageKey = `${pageNum}-${pageData.rows.length}`;
+ console.log(`[抓取] 第${pageNum}页, ${pageData.rows.length}行, key="${pageKey}"`);
+ if (visited.has(pageKey)) {
+ console.log(`[抓取] 重复页面key,停止翻页`);
+ break;
+ }
+ visited.add(pageKey);
+ pages.push(...pageData.rows.map((row) => ({ ...row, __context: context })));
+
+ const moved = await gotoNextPage(page);
+ if (!moved) {
+ console.log(`[抓取] 翻页失败或已到最后一页,停止`);
+ break;
+ }
+ }
+
+ console.log(`[抓取] 共采集 ${pages.length} 条记录`);
+ return pages;
+}
+
+async function extractTable(page) {
+ return page.evaluate(() => {
+ const normalize = (value) =>
+ String(value || '')
+ .replace(/\u00a0/g, ' ')
+ .replace(/\s+\n/g, '\n')
+ .replace(/\n\s+/g, '\n')
+ .trim();
+
+ const headerTables = Array.from(document.querySelectorAll('table')).filter((table) => table.querySelectorAll('thead th').length > 1);
+ const headerTable = headerTables.sort((a, b) => b.querySelectorAll('thead th').length - a.querySelectorAll('thead th').length)[0];
+ if (!headerTable) return { headers: [], rows: [] };
+
+ const headers = Array.from(headerTable.querySelectorAll('thead th')).map((cell) => normalize(cell.textContent));
+ const bodyTables = Array.from(document.querySelectorAll('table')).filter((table) => table.querySelectorAll('tbody tr').length > 0);
+ const bodyTable = bodyTables.sort((a, b) => {
+ const aSize = Math.max(...Array.from(a.querySelectorAll('tbody tr')).map((row) => row.querySelectorAll('td').length), 0);
+ const bSize = Math.max(...Array.from(b.querySelectorAll('tbody tr')).map((row) => row.querySelectorAll('td').length), 0);
+ return bSize - aSize;
+ })[0];
+ if (!bodyTable) return { headers, rows: [] };
+
+ const rows = Array.from(bodyTable.querySelectorAll('tbody tr'))
+ .map((row) => Array.from(row.querySelectorAll('td')).map((cell) => normalize(cell.innerText || cell.textContent)))
+ .filter((cells) => cells.some(Boolean))
+ .map((cells) => {
+ const record = {};
+ headers.forEach((header, index) => {
+ record[header || `column_${index + 1}`] = cells[index] || '';
+ });
+ return record;
+ });
+
+ return { headers, rows };
+ });
+}
+
+async function waitForTableRows(page) {
+ await page.waitForFunction(() => document.querySelectorAll('table tbody tr').length > 0, null, { timeout: 120000 });
+ await sleep(800);
+}
+
+async function currentPageNumber(page) {
+ const active = page.locator('.next-pagination-item.next-current');
+ if ((await active.count()) === 0) return 1;
+ return Number.parseInt((await active.first().innerText()).trim(), 10) || 1;
+}
+
+async function gotoNextPage(page) {
+ const before = await currentPageNumber(page);
+
+ // 用 Playwright locator 定位"下一页"按钮
+ const nextBtn = page.locator('button.next-pagination-item.next-next');
+ if ((await nextBtn.count()) === 0) {
+ console.log('[翻页] 未找到下一页按钮');
+ return false;
+ }
+
+ const disabled = (await nextBtn.getAttribute('disabled')) != null;
+ if (disabled) {
+ console.log('[翻页] 下一页按钮已禁用');
+ return false;
+ }
+
+ // 用 Playwright click(而非 DOM click),确保 React 事件正常触发
+ await nextBtn.click();
+ await sleep(2000);
+
+ const after = await currentPageNumber(page);
+ console.log(`[翻页] ${before} -> ${after}`);
+ return before !== after;
+}
+
+async function trySetPageSize(page, pageSize) {
+ const input = page.locator('input[aria-label="请选择每页显示几条"]').first();
+ if ((await input.count()) === 0) return;
+ await input.click().catch(() => null);
+ await sleep(300);
+ const option = page.locator(`text=${pageSize}`).last();
+ if ((await option.count()) === 0) {
+ await page.keyboard.press('Escape').catch(() => null);
+ return;
+ }
+ await option.click().catch(() => null);
+ await sleep(1200);
+}
+
+async function setDateRange(page, start, end) {
+ console.log(`[订单日期] 设置: ${start} ~ ${end}`);
+
+ await _fillDateRange(page, start, end);
+
+ // 验证
+ const startActual = await page.locator('input[placeholder="起始日期"]').inputValue().catch(() => '');
+ const endActual = await page.locator('input[placeholder="结束日期"]').inputValue().catch(() => '');
+
+ // 如果结果不对,用反向顺序重试(先填开始再填结束)
+ if (startActual !== start || endActual !== end) {
+ console.log(`[订单日期] 首次结果不对: "${startActual}" ~ "${endActual}",反向重试`);
+ await _fillDateRange(page, start, end, true);
+ const s2 = await page.locator('input[placeholder="起始日期"]').inputValue().catch(() => '');
+ const e2 = await page.locator('input[placeholder="结束日期"]').inputValue().catch(() => '');
+ console.log(`[订单日期] 重试结果: "${s2}" ~ "${e2}"`);
+ } else {
+ console.log(`[订单日期] 结果: "${startActual}" ~ "${endActual}"`);
+ }
+}
+
+async function _fillDateRange(page, start, end, startFirst = false) {
+ const trigger = page.locator('input[placeholder="结束日期"]');
+ await trigger.click();
+ await sleep(1000);
+
+ const panelStartInput = page.locator('.next-range-picker-panel-input-start-date input');
+ const panelEndInput = page.locator('.next-range-picker-panel-input-end-date input');
+
+ if (startFirst) {
+ // 先填开始日期
+ await panelStartInput.click();
+ await sleep(100);
+ await page.keyboard.press('Control+A');
+ await page.keyboard.type(start, { delay: 30 });
+ await sleep(300);
+ // 再填结束日期
+ await panelEndInput.click();
+ await sleep(100);
+ await page.keyboard.press('Control+A');
+ await page.keyboard.type(end, { delay: 30 });
+ await sleep(300);
+ } else {
+ // 先填结束日期(默认)
+ await panelEndInput.click();
+ await sleep(100);
+ await page.keyboard.press('Control+A');
+ await page.keyboard.type(end, { delay: 30 });
+ await sleep(300);
+ // 再填开始日期
+ await panelStartInput.click();
+ await sleep(100);
+ await page.keyboard.press('Control+A');
+ await page.keyboard.type(start, { delay: 30 });
+ await sleep(300);
+ }
+
+ await page.keyboard.press('Enter');
+ await sleep(500);
+ await page.mouse.click(0, 0);
+ await sleep(300);
+ await page.keyboard.press('Escape');
+ await sleep(300);
+ await page.locator('.next-overlay-wrapper.opened').waitFor({ state: 'hidden', timeout: 3000 }).catch(() => null);
+ await sleep(300);
+}
+
+async function setMonthValue(page, month) {
+ // 先尝试按 inputValue 匹配 YYYY-MM 格式
+ const inputs = page.locator('input');
+ const total = await inputs.count();
+ const allValues = [];
+
+ for (let index = 0; index < total; index += 1) {
+ const input = inputs.nth(index);
+ const value = await input.inputValue().catch(() => '');
+ const placeholder = await input.getAttribute('placeholder').catch(() => '');
+ allValues.push({ index, value, placeholder });
+
+ if (/^\d{4}-\d{2}$/.test(value)) {
+ console.log(`[账单月份] 通过 value 匹配到 input[${index}], 设置: ${month}`);
+ await typeIntoDateInput(input, month, page);
+ return;
+ }
+ }
+
+ // 如果 value 为空,尝试按 placeholder 匹配月份选择器
+ for (const item of allValues) {
+ if (item.placeholder && /月/.test(item.placeholder)) {
+ console.log(`[账单月份] 通过 placeholder 匹配到 input[${item.index}], 设置: ${month}`);
+ await typeIntoDateInput(inputs.nth(item.index), month, page);
+ return;
+ }
+ }
+
+ // 兜底:找任何看起来像日期/月份选择器的 input(排除搜索框等)
+ for (const item of allValues) {
+ const input = inputs.nth(item.index);
+ const cls = await input.evaluate((el) => el.closest('[class*="date-picker"], [class*="month-picker"], [class*="range-picker"]')?.className || '').catch(() => '');
+ if (cls) {
+ console.log(`[账单月份] 通过父级 class 匹配到 input[${item.index}] (${cls}), 设置: ${month}`);
+ await typeIntoDateInput(input, month, page);
+ return;
+ }
+ }
+
+ console.error('[DEBUG] 账单页面所有 input:', JSON.stringify(allValues, null, 2));
+ throw new Error('未识别到账单佣金月份输入框,请打开页面确认结构是否变化。');
+}
+
+/**
+ * 用键盘输入日期值。
+ * 策略:focus → 全选 → 快速键入 → Tab 移开焦点(触发 blur 提交,但不会像 click 那样打开面板)。
+ * 即使面板弹出,快速键入 + Tab 也能在面板滚动前完成提交并关闭。
+ */
+async function typeIntoDateInput(locator, value, page) {
+ // 移除 readonly
+ await locator.evaluate((node) => node.removeAttribute('readonly'));
+
+ // focus 并全选当前内容
+ await locator.focus();
+ await sleep(100);
+ await page.keyboard.press('Control+A');
+ await sleep(100);
+
+ // 快速逐字符输入新值
+ await page.keyboard.type(value, { delay: 30 });
+ await sleep(200);
+
+ // Tab 移开焦点 → 触发 onBlur 提交值 + 关闭面板
+ await page.keyboard.press('Tab');
+ await sleep(300);
+
+ // 如果面板还在,Escape 兜底关闭
+ await page.keyboard.press('Escape');
+ await sleep(300);
+
+ // 验证
+ const actual = await locator.inputValue().catch(() => '');
+ if (actual !== value) {
+ console.warn(`[WARN] typeIntoDateInput: 期望 "${value}",实际 "${actual}"`);
+ } else {
+ console.log(`[日期设置] 成功: "${value}"`);
+ }
+}
+
+async function clickQuery(page) {
+ const button = page.locator('button:has-text("查询")').first();
+ await button.click();
+ await sleep(1800);
+}
+
+function buildMonthlyDateWindows(startDate) {
+ const start = new Date(`${startDate}T00:00:00+08:00`);
+ const end = new Date();
+ const windows = [];
+ const cursor = new Date(start.getFullYear(), start.getMonth(), 1);
+
+ while (cursor <= end) {
+ const windowStart = new Date(cursor);
+ const windowEnd = new Date(cursor.getFullYear(), cursor.getMonth() + 1, 0);
+ const actualEnd = windowEnd > end ? end : windowEnd;
+ windows.push({
+ windowStart: formatDate(windowStart),
+ windowEnd: formatDate(actualEnd),
+ start: formatDate(windowStart),
+ end: formatDate(actualEnd),
+ });
+ cursor.setMonth(cursor.getMonth() + 1);
+ }
+
+ return windows;
+}
+
+function buildMonthList(startMonth) {
+ const [year, month] = startMonth.split('-').map(Number);
+ const cursor = new Date(year, month - 1, 1);
+ const end = new Date();
+ const months = [];
+
+ while (cursor <= end) {
+ months.push(`${cursor.getFullYear()}-${String(cursor.getMonth() + 1).padStart(2, '0')}`);
+ cursor.setMonth(cursor.getMonth() + 1);
+ }
+
+ return months;
+}
+
+function formatDate(date) {
+ return `${date.getFullYear()}-${String(date.getMonth() + 1).padStart(2, '0')}-${String(date.getDate()).padStart(2, '0')}`;
+}
+
+function dedupeByHash(records) {
+ const seen = new Set();
+ return records.filter((record) => {
+ const key = JSON.stringify(record);
+ if (seen.has(key)) return false;
+ seen.add(key);
+ return true;
+ });
+}
+
+function collectValidOrderIds(records) {
+ const ids = [];
+ const seen = new Set();
+ for (const record of records) {
+ // 支持两种字段名:normalized 后的 orderId 和原始的 订单号
+ const rawOrderId = String(record.orderId || record['订单号'] || '').trim();
+ if (!rawOrderId || rawOrderId.includes('没有数据')) {
+ continue;
+ }
+ if (!isValidOrderId(rawOrderId)) {
+ console.log(`[订单详情] 跳过无效订单号: ${rawOrderId}`);
+ continue;
+ }
+ if (seen.has(rawOrderId)) {
+ continue;
+ }
+ seen.add(rawOrderId);
+ ids.push(rawOrderId);
+ }
+ return ids;
+}
+
+function collectValidAccountIds(records) {
+ const ids = [];
+ const seen = new Set();
+ for (const record of records) {
+ const rawAccountId = String(record.accountId || '').trim();
+ if (!rawAccountId || rawAccountId.includes('没有数据')) {
+ continue;
+ }
+ if (!isValidAccountId(rawAccountId)) {
+ console.log(`[客户详情] 跳过无效 accountId: ${rawAccountId}`);
+ continue;
+ }
+ if (seen.has(rawAccountId)) {
+ continue;
+ }
+ seen.add(rawAccountId);
+ ids.push(rawAccountId);
+ }
+ return ids;
+}
+
+function isValidOrderId(orderId) {
+ const value = String(orderId || '').trim();
+ if (!value) return false;
+ if (value.includes('�')) return false;
+ return /^\d+$/.test(value);
+}
+
+function isValidAccountId(accountId) {
+ const value = String(accountId || '').trim();
+ if (!value) return false;
+ if (value.includes('�')) return false;
+ return /^\d+$/.test(value);
+}
+
+async function extractOrderDetail(page) {
+ return page.evaluate(() => {
+ const text = document.body?.innerText || '';
+
+ const extract = (label) => {
+ const lineBreakPattern = new RegExp(`${label}\\s*(?:\\r?\\n)+\\s*([^\\r\\n]+)`);
+ const lineBreakMatch = text.match(lineBreakPattern);
+ if (lineBreakMatch) return lineBreakMatch[1].trim();
+
+ const inlinePattern = new RegExp(`${label}\\s*[::]?\\s*([^\\r\\n]+)`);
+ const inlineMatch = text.match(inlinePattern);
+ return inlineMatch ? inlineMatch[1].trim() : '';
+ };
+
+ return {
+ orderId: extract('订单号'),
+ orderType: extract('订单类型'),
+ status: extract('状态'),
+ tradeType: extract('交易类型'),
+ customerCategory: extract('客户分类'),
+ dealerName: extract('二级经销商名称'),
+ dealerUid: extract('二级经销商UID'),
+ customerType: extract('客户类型'),
+ opportunityId: extract('商机ID'),
+ paymentTime: extract('支付时间'),
+ orderTime: extract('下单时间'),
+ productName: extract('产品名称'),
+ productCode: extract('产品code'),
+ originalPriceCny: extract('订单原价\\(CNY\\)'),
+ paidAmountCny: extract('实付金额\\(CNY\\)'),
+ discount: extract('订单折扣'),
+ payableAmountCny: extract('应付金额(实付\\+代金券)\\(CNY\\)'),
+ couponAmountCny: extract('代金券金额\\(CNY\\)'),
+ };
+ });
+}
+
+async function extractCustomerDetail(page) {
+ return page.evaluate(() => {
+ const normalize = (value) =>
+ String(value || '')
+ .replace(/\u00a0/g, ' ')
+ .trim();
+
+ const text = normalize(document.body?.innerText || '').replace(/\r/g, '');
+
+ const extract = (label, sourceText = text) => {
+ const lineBreakPattern = new RegExp(`${label}\\s*(?:\\n)+\\s*([^\\n]+)`);
+ const lineBreakMatch = sourceText.match(lineBreakPattern);
+ if (lineBreakMatch) return normalize(lineBreakMatch[1]);
+
+ const inlinePattern = new RegExp(`${label}\\s*[::]?\\s*([^\\n]+)`);
+ const inlineMatch = sourceText.match(inlinePattern);
+ return inlineMatch ? normalize(inlineMatch[1]) : '';
+ };
+
+ const normalizeAmount = (value) => normalize(value).replace(/[¥,]/g, '').trim();
+
+ const buildSection = (startLabel, endLabel = '') => {
+ const start = text.indexOf(startLabel);
+ if (start < 0) return '';
+ const end = endLabel ? text.indexOf(endLabel, start + startLabel.length) : -1;
+ if (end > start) return text.slice(start, end);
+ return text.slice(start);
+ };
+
+ const lastMonthSection = buildSection('上月应付总金额(CNY)', '本月应付总金额(CNY)');
+ const currentMonthSection = buildSection('本月应付总金额(CNY)');
+
+ const extractAmountFromSection = (sectionText, label) => normalizeAmount(extract(label, sectionText));
+
+ let department = '';
+ const table = Array.from(document.querySelectorAll('table')).find((node) =>
+ (node.innerText || '').includes('所属部门'),
+ );
+ if (table) {
+ const rows = table.querySelectorAll('tbody tr');
+ for (const row of rows) {
+ const cells = row.querySelectorAll('td');
+ if (cells.length >= 2) {
+ const value = normalize(cells[1]?.innerText || cells[1]?.textContent || '');
+ if (value) {
+ department = value;
+ break;
+ }
+ }
+ }
+ }
+
+ return {
+ customerAccount: extract('客户账号'),
+ customerName: extract('客户名称'),
+ customerType: extract('客户类型'),
+ tradeMode: extract('交易模式'),
+ customerSource: extract('客户来源'),
+ realNameStatus: extract('实名状态'),
+ email: extract('邮箱'),
+ relationDate: extract('关联日期'),
+ phone: extract('手机号'),
+ remark: extract('备注'),
+ paymentNoticeStatus: extract('代为支付告知状态'),
+ department,
+ lastMonthPayableTotalCny: extractAmountFromSection(lastMonthSection, '上月应付总金额(CNY)'),
+ lastMonthPrepayCny: extractAmountFromSection(lastMonthSection, '预付费金额'),
+ lastMonthPostpayCny: extractAmountFromSection(lastMonthSection, '后付费金额'),
+ currentMonthPayableTotalCny: extractAmountFromSection(currentMonthSection, '本月应付总金额(CNY)'),
+ currentMonthPrepayCny: extractAmountFromSection(currentMonthSection, '预付费金额'),
+ currentMonthPostpayCny: extractAmountFromSection(currentMonthSection, '后付费金额'),
+ };
+ });
+}
diff --git a/aliyun-sync/aps-aliyun-sync/aps_db_sync.py b/aliyun-sync/aps-aliyun-sync/aps_db_sync.py
new file mode 100644
index 0000000..3588897
--- /dev/null
+++ b/aliyun-sync/aps-aliyun-sync/aps_db_sync.py
@@ -0,0 +1,1073 @@
+from __future__ import annotations
+
+"""
+APS Aliyun Data Sync — Database Schema & Sync Pipeline
+=======================================================
+Reads JSON output from aps-aliyun-scraper skill → Syncs to MySQL.
+
+Design decisions:
+ - Customer master: UPSERT on account_id (natural PK from Aliyun).
+ - Customer snapshot: append-only monthly snapshot for historical tracking.
+ - Orders: UPSERT on order_id (natural PK). Status may change (未支付→已支付→作废).
+ - Bills: DELETE+INSERT per (billing_month, customer_account). Bills are immutable within
+ a closed month but may be re-exported with corrections before close.
+ - Product ratio: DELETE+INSERT per (account_id, snapshot_month).
+
+Update strategy rationale:
+ - Customers (~14): tiny, full UPSERT is fine. Snapshot table captures monthly changes.
+ - Orders (~60/mo): small, UPSERT handles status changes cleanly.
+ - Bills (~1000/mo): medium, DELETE+INSERT per customer+month is safest because bills
+ lack a unique row ID and may be corrected before month close.
+"""
+
import hashlib
import json
import logging
import math
import os
from datetime import datetime, date
from pathlib import Path
from typing import TypedDict, cast

import pymysql
from pymysql.connections import Connection
from pymysql.cursors import DictCursor
+
# One decoded JSON object with string keys and not-yet-validated values.
JsonDict = dict[str, object]
# A sequence of decoded JSON objects.
JsonList = list[JsonDict]
# Named integer counters (used for APSSyncer.stats).
StatsDict = dict[str, int]
+
+
class DbConfig(TypedDict):
    """MySQL connection settings; each key is forwarded to ``pymysql.connect``."""

    host: str
    port: int
    user: str
    password: str
    database: str
    charset: str
+
+# ---------------------------------------------------------------------------
+# Config
+# ---------------------------------------------------------------------------
# Connection settings are read from the environment so that credentials never
# live in version control. Non-secret values keep their previous defaults; the
# password has no default and must be supplied through APS_DB_PASSWORD.
DB_CONFIG: DbConfig = {
    "host": os.environ.get("APS_DB_HOST", "172.27.137.236"),
    "port": int(os.environ.get("APS_DB_PORT", "3306")),
    "user": os.environ.get("APS_DB_USER", "ray"),
    "password": os.environ.get("APS_DB_PASSWORD", ""),
    "database": os.environ.get("APS_DB_NAME", "crm-prod"),
    "charset": os.environ.get("APS_DB_CHARSET", "utf8mb4"),
}
+
# Absolute, machine-specific Windows path to a ``data\current`` directory.
# NOTE(review): presumably the default JSON input directory — confirm against the entry point.
JSON_DIR = Path(r"C:\Users\Administrator\Desktop\aliyun-sync\aliyun-aps-sync\data\current")
LOG_FORMAT = "%(asctime)s [%(levelname)s] %(message)s"
# Import-time side effect: configures the root logger at INFO level.
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
logger = logging.getLogger("aps_sync")
+
+# ---------------------------------------------------------------------------
+# Schema DDL
+# ---------------------------------------------------------------------------
+DDL_STATEMENTS = [
+ # 1. Customer master — latest state, UPSERT on each sync
+ """
+ CREATE TABLE IF NOT EXISTS aps_customer (
+ account_id VARCHAR(32) NOT NULL COMMENT '阿里云UID',
+ login_name VARCHAR(128) NOT NULL COMMENT '登录名称',
+ real_name VARCHAR(256) DEFAULT NULL COMMENT '实名认证名称',
+ report_source VARCHAR(64) DEFAULT NULL COMMENT '报备来源',
+ report_type VARCHAR(64) DEFAULT NULL COMMENT '报备类型',
+ trade_mode VARCHAR(64) DEFAULT NULL COMMENT '交易模式',
+ real_name_status VARCHAR(64) DEFAULT NULL COMMENT '实名认证状态',
+ relation_date DATETIME DEFAULT NULL COMMENT '关联日期',
+ follow_staff VARCHAR(64) DEFAULT NULL COMMENT '跟进员工',
+ payment_notice_status VARCHAR(32) DEFAULT NULL COMMENT '代为支付告知状态',
+ invite_register_type VARCHAR(64) DEFAULT NULL COMMENT '邀约注册类型',
+ is_new_customer TINYINT(1) DEFAULT NULL COMMENT '是否新客户 1=是 0=否',
+ performance_start_point_reached TINYINT(1) DEFAULT NULL COMMENT '是否达成业绩起算点',
+ customer_category VARCHAR(64) DEFAULT NULL COMMENT '客户分类',
+ remark VARCHAR(512) DEFAULT NULL COMMENT '备注',
+ no_consumption_months INT DEFAULT NULL COMMENT '客户无消费月数',
+ planned_release_time DATE DEFAULT NULL COMMENT '计划释放时间',
+ planned_release_reason VARCHAR(256) DEFAULT NULL COMMENT '计划释放原因',
+ -- detail fields
+ customer_name VARCHAR(256) DEFAULT NULL COMMENT '客户名称(详情)',
+ customer_type VARCHAR(64) DEFAULT NULL COMMENT '客户类型(详情)',
+ customer_source VARCHAR(64) DEFAULT NULL COMMENT '客户来源',
+ email VARCHAR(256) DEFAULT NULL COMMENT '邮箱',
+ phone VARCHAR(32) DEFAULT NULL COMMENT '手机号',
+ department VARCHAR(128) DEFAULT NULL COMMENT '所属部门',
+ -- sync meta
+ created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+ PRIMARY KEY (account_id),
+ UNIQUE KEY uk_login_name (login_name)
+ ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='APS客户主表(最新状态)'
+ """,
+
+ # 2. Customer monthly snapshot — append-only, tracks financial changes
+ """
+ CREATE TABLE IF NOT EXISTS aps_customer_snapshot (
+ id BIGINT NOT NULL AUTO_INCREMENT,
+ account_id VARCHAR(32) NOT NULL COMMENT '阿里云UID',
+ snapshot_month VARCHAR(7) NOT NULL COMMENT '快照月份 YYYY-MM',
+ account_cash_balance_cny DECIMAL(16,2) DEFAULT NULL COMMENT '账户现金余额',
+ pending_invoice_amount_cny DECIMAL(16,2) DEFAULT NULL COMMENT '待开票金额',
+ last_month_consumption_cny DECIMAL(16,2) DEFAULT NULL COMMENT '上月消费金额',
+ current_month_consumption_cny DECIMAL(16,2) DEFAULT NULL COMMENT '本月消费金额',
+ last_month_payable_total_cny DECIMAL(16,2) DEFAULT NULL COMMENT '上月应付总金额',
+ last_month_prepay_cny DECIMAL(16,2) DEFAULT NULL COMMENT '上月预付费金额',
+ last_month_postpay_cny DECIMAL(16,2) DEFAULT NULL COMMENT '上月后付费金额',
+ current_month_payable_total_cny DECIMAL(16,2) DEFAULT NULL COMMENT '本月应付总金额',
+ current_month_prepay_cny DECIMAL(16,2) DEFAULT NULL COMMENT '本月预付费金额',
+ current_month_postpay_cny DECIMAL(16,2) DEFAULT NULL COMMENT '本月后付费金额',
+ data_hash VARCHAR(64) DEFAULT NULL COMMENT '数据指纹(去重用)',
+ captured_at DATE NOT NULL COMMENT '抓取日期',
+ created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id),
+ UNIQUE KEY uk_account_month (account_id, snapshot_month, captured_at),
+ KEY idx_snapshot_month (snapshot_month),
+ CONSTRAINT fk_snapshot_customer FOREIGN KEY (account_id) REFERENCES aps_customer(account_id) ON UPDATE CASCADE
+ ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='APS客户月度快照(追加)'
+ """,
+
+ # 3. Product ratio per snapshot
+ """
+ CREATE TABLE IF NOT EXISTS aps_customer_product_ratio (
+ id BIGINT NOT NULL AUTO_INCREMENT,
+ account_id VARCHAR(32) NOT NULL,
+ snapshot_month VARCHAR(7) NOT NULL COMMENT 'YYYY-MM',
+ product_name VARCHAR(128) NOT NULL COMMENT '产品分类名称',
+ ratio VARCHAR(16) NOT NULL COMMENT '占比(如 41.73%)',
+ created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id),
+ KEY idx_ratio_account_month (account_id, snapshot_month),
+ CONSTRAINT fk_ratio_customer FOREIGN KEY (account_id) REFERENCES aps_customer(account_id) ON UPDATE CASCADE
+ ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='APS客户产品占比'
+ """,
+
+ # 4. Orders — UPSERT on order_id
+ """
+ CREATE TABLE IF NOT EXISTS aps_order (
+ order_id VARCHAR(32) NOT NULL COMMENT '订单号',
+ customer_account_id VARCHAR(32) DEFAULT NULL COMMENT '客户account_id(FK, 可为空表示未映射)',
+ customer_login_name VARCHAR(128) NOT NULL DEFAULT '' COMMENT '客户账号(冗余,方便查询)',
+ customer_category VARCHAR(64) DEFAULT NULL COMMENT '客户分类',
+ order_type VARCHAR(32) DEFAULT NULL COMMENT '订单类型',
+ original_price_cny DECIMAL(16,2) DEFAULT NULL COMMENT '订单原价',
+ paid_amount_cny DECIMAL(16,2) DEFAULT NULL COMMENT '实付金额',
+ status VARCHAR(32) DEFAULT NULL COMMENT '订单状态',
+ order_time DATETIME DEFAULT NULL COMMENT '下单时间',
+ order_month VARCHAR(7) DEFAULT NULL COMMENT '订单月份 YYYY-MM',
+ created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+ PRIMARY KEY (order_id),
+ KEY idx_order_customer (customer_account_id),
+ KEY idx_order_month (order_month),
+ CONSTRAINT fk_order_customer FOREIGN KEY (customer_account_id) REFERENCES aps_customer(account_id) ON UPDATE CASCADE
+ ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='APS订单表(UPSERT)'
+ """,
+
+ # 5. Bills — DELETE+INSERT per (billing_month, customer)
+ """
+ CREATE TABLE IF NOT EXISTS aps_bill (
+ id BIGINT NOT NULL AUTO_INCREMENT,
+ billing_month VARCHAR(7) NOT NULL COMMENT '账期 YYYY-MM or YYYYMM',
+ customer_account_id VARCHAR(32) DEFAULT NULL COMMENT '客户account_id(FK, 可为空表示未映射)',
+ customer_login_name VARCHAR(128) NOT NULL DEFAULT '' COMMENT '客户账号',
+ bill_type VARCHAR(32) DEFAULT NULL COMMENT '账单类型',
+ payment_type VARCHAR(32) DEFAULT NULL COMMENT '付款类型',
+ consumption_time VARCHAR(32) DEFAULT NULL COMMENT '消费时间',
+ customer_uid VARCHAR(32) DEFAULT NULL COMMENT '客户UID',
+ customer_category VARCHAR(64) DEFAULT NULL COMMENT '客户分类',
+ customer_type VARCHAR(64) DEFAULT NULL COMMENT '客户类型',
+ product_code VARCHAR(128) DEFAULT NULL COMMENT '产品code',
+ product_name VARCHAR(256) DEFAULT NULL COMMENT '产品名称',
+ product_category VARCHAR(64) DEFAULT NULL COMMENT '产品分类',
+ cloud_product_instance_id VARCHAR(256) DEFAULT NULL COMMENT '云产品实例ID',
+ staff_account VARCHAR(64) DEFAULT NULL COMMENT '员工账号',
+ opportunity_id VARCHAR(64) DEFAULT NULL COMMENT '商机ID',
+ original_price_cny DECIMAL(16,6) DEFAULT NULL COMMENT '原价',
+ customer_payable_amount_cny DECIMAL(16,6) DEFAULT NULL COMMENT '客户应付金额',
+ voucher_amount_cny DECIMAL(16,6) DEFAULT NULL COMMENT '代金券金额',
+ stored_value_card_amount_cny DECIMAL(16,6) DEFAULT NULL COMMENT '储值卡金额',
+ cash_paid_amount_cny DECIMAL(16,6) DEFAULT NULL COMMENT '现金支付金额',
+ commission_month VARCHAR(7) DEFAULT NULL COMMENT '佣金月份',
+ non_business_compensation_amount_cny DECIMAL(16,6) DEFAULT NULL COMMENT '非业务类赔偿金额',
+ actual_paid_amount_cny DECIMAL(16,6) DEFAULT NULL COMMENT '实付金额',
+ actual_paid_plus_compensation_cny DECIMAL(16,6) DEFAULT NULL COMMENT '实付+非业务类赔偿金额',
+ included_in_performance VARCHAR(8) DEFAULT NULL COMMENT '是否计入业绩',
+ excluded_from_performance_reason VARCHAR(256) DEFAULT NULL COMMENT '不计业绩原因',
+ rebated VARCHAR(8) DEFAULT NULL COMMENT '是否返佣',
+ non_rebate_reason VARCHAR(256) DEFAULT NULL COMMENT '不返佣原因',
+ main_order_id VARCHAR(64) DEFAULT NULL COMMENT '主订单号',
+ sub_order_id VARCHAR(64) DEFAULT NULL COMMENT '子订单号',
+ order_cycle VARCHAR(32) DEFAULT NULL COMMENT '订单周期',
+ order_type VARCHAR(32) DEFAULT NULL COMMENT '订单类型',
+ original_order_id VARCHAR(64) DEFAULT NULL COMMENT '原订单号',
+ service_start_time VARCHAR(32) DEFAULT NULL COMMENT '服务开始时间',
+ service_end_time VARCHAR(32) DEFAULT NULL COMMENT '服务结束时间',
+ invite_register_type VARCHAR(64) DEFAULT NULL COMMENT '邀约注册类型',
+ staff_account_id_at_close VARCHAR(64) DEFAULT NULL,
+ staff_login_at_close VARCHAR(64) DEFAULT NULL,
+ staff_name_at_close VARCHAR(64) DEFAULT NULL,
+ staff_account_id_at_export VARCHAR(64) DEFAULT NULL,
+ staff_login_at_export VARCHAR(64) DEFAULT NULL,
+ staff_name_at_export VARCHAR(64) DEFAULT NULL,
+ partner_uid VARCHAR(32) DEFAULT NULL COMMENT '伙伴UID',
+ created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ PRIMARY KEY (id),
+ KEY idx_bill_customer (customer_account_id),
+ KEY idx_bill_month (billing_month),
+ KEY idx_bill_commission (commission_month),
+ CONSTRAINT fk_bill_customer FOREIGN KEY (customer_account_id) REFERENCES aps_customer(account_id) ON UPDATE CASCADE
+ ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='APS账单明细(按月全量替换)'
+ """,
+
+ # 7. Order detail — UPSERT on order_id
+ """
+ CREATE TABLE IF NOT EXISTS aps_order_detail (
+ id INT AUTO_INCREMENT PRIMARY KEY,
+ order_id VARCHAR(32) NOT NULL COMMENT '订单号',
+ order_type VARCHAR(32) COMMENT '订单类型',
+ status VARCHAR(32) COMMENT '订单状态',
+ trade_type VARCHAR(32) COMMENT '交易类型',
+ customer_category VARCHAR(64) COMMENT '客户分类',
+ dealer_name VARCHAR(256) COMMENT '二级经销商名称',
+ dealer_uid VARCHAR(64) COMMENT '二级经销商UID',
+ customer_type VARCHAR(64) COMMENT '客户类型',
+ opportunity_id VARCHAR(64) COMMENT '商机ID',
+ payment_time DATETIME COMMENT '支付时间',
+ order_time DATETIME COMMENT '下单时间',
+ product_name VARCHAR(256) COMMENT '产品名称',
+ product_code VARCHAR(64) COMMENT '产品code',
+ original_price_cny DECIMAL(14,2) COMMENT '订单原价(CNY)',
+ paid_amount_cny DECIMAL(14,2) COMMENT '实付金额(CNY)',
+ discount DECIMAL(8,4) COMMENT '订单折扣',
+ payable_amount_cny DECIMAL(14,2) COMMENT '应付金额(实付+代金券)(CNY)',
+ coupon_amount_cny DECIMAL(14,2) COMMENT '代金券金额(CNY)',
+ created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
+ updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
+ UNIQUE KEY uk_order_id (order_id)
+ ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='APS订单详情表(UPSERT)'
+ """,
+
+ # 6. Sync log — tracks each run
+ """
+ CREATE TABLE IF NOT EXISTS aps_sync_log (
+ id BIGINT NOT NULL AUTO_INCREMENT,
+ sync_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ json_file VARCHAR(512) NOT NULL COMMENT 'source json path',
+ billing_month VARCHAR(7) NOT NULL,
+ customer_count INT DEFAULT 0,
+ order_count INT DEFAULT 0,
+ bill_count INT DEFAULT 0,
+ snapshot_count INT DEFAULT 0,
+ status VARCHAR(16) NOT NULL DEFAULT 'success' COMMENT 'success/failed',
+ error_message TEXT DEFAULT NULL,
+ duration_seconds DECIMAL(8,2) DEFAULT NULL,
+ PRIMARY KEY (id),
+ KEY idx_sync_month (billing_month)
+ ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='APS同步执行日志'
+ """,
+]
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
def safe_decimal(v: object) -> float | None:
    """Convert a scraped amount to a finite float, or None when it is unusable.

    Accepts ints, floats and numeric strings; surrounding whitespace and
    thousands separators (``1,234.50``) are tolerated.

    Returns None for None, for the placeholder strings ``''``, ``'- -'``,
    ``'--'`` and ``'null'``, for unparseable text, for any other type, and
    for NaN / infinity regardless of whether they arrive as a float or as
    text such as ``"nan"``.
    """
    if isinstance(v, str):
        text = v.strip()
        if text in ("", "- -", "--", "null"):
            return None
        try:
            number = float(text.replace(",", ""))
        except ValueError:
            return None
    elif isinstance(v, (int, float)):
        try:
            number = float(v)
        except OverflowError:
            # An int too large for a float cannot be a real amount.
            return None
    else:
        return None
    # Non-finite values cannot be written to a DECIMAL column.
    return number if math.isfinite(number) else None
+
+
def safe_bool(v: object) -> int | None:
    """Map a yes/no style value to 1, 0 or None.

    Recognised inputs: ``True``/``False``, the integers 1 and 0, and the
    strings '是' / '否' / 'true' / 'false' (case-insensitive, surrounding
    whitespace ignored). Everything else, including None, yields None.
    """
    # bool must be tested before int because bool is a subclass of int.
    if isinstance(v, bool):
        return 1 if v else 0
    if isinstance(v, int):
        return int(v) if v in (0, 1) else None
    if isinstance(v, str):
        token = v.strip().lower()
        if token in ("是", "true"):
            return 1
        if token in ("否", "false"):
            return 0
    return None
+
+
def safe_str(v: object, max_len: int | None = None) -> str | None:
    """Return ``str(v)`` stripped of surrounding whitespace, or None for "no value".

    None and the placeholder texts ``''``, ``'- -'``, ``'--'``, ``'无'``,
    ``'null'`` and ``'None'`` all collapse to None. When ``max_len`` is a
    positive number, longer results are cut to that many characters.
    """
    placeholders = ("", "- -", "--", "无", "null", "None")
    text = None if v is None else str(v).strip()
    if text is None or text in placeholders:
        return None
    too_long = bool(max_len) and len(text) > max_len
    return text[:max_len] if too_long else text
+
+
def parse_datetime(v: object) -> datetime | str | None:
    """Parse a timestamp value into a ``datetime`` where possible.

    ``datetime`` objects are returned unchanged and ``date`` objects become
    midnight of that day. Text is cleaned with ``safe_str`` (placeholders
    become None) and tried against the known layouts below.

    Returns the parsed ``datetime``, None for missing values, or — as a
    deliberate fallback — the cleaned text itself when no layout matches,
    so the caller can still hand it to the database.
    """
    if v is None:
        return None
    # datetime must be tested before date because it is a subclass of date.
    if isinstance(v, datetime):
        return v
    if isinstance(v, date):
        return datetime(v.year, v.month, v.day)
    s = safe_str(v)
    if s is None:
        return None
    formats = (
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d",
        "%Y/%m/%d %H:%M:%S",
        "%Y/%m/%d",  # date-only slash form, also accepted by parse_date
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%d %H:%M",
    )
    for fmt in formats:
        try:
            return datetime.strptime(s, fmt)
        except ValueError:
            continue
    return s  # fallback: keep the text as-is
+
+
def parse_date(v: object) -> date | None:
    """Parse a calendar date from a date-like value.

    ``date`` objects are returned unchanged and ``datetime`` objects are
    reduced to their date. Text is cleaned with ``safe_str``; a trailing
    time part separated by a space or ``T`` is ignored, and the remaining
    day part must match ``YYYY-MM-DD`` or ``YYYY/MM/DD``.

    Returns None for missing, placeholder or unparseable values.
    """
    if v is None:
        return None
    # datetime must be tested before date because it is a subclass of date.
    if isinstance(v, datetime):
        return v.date()
    if isinstance(v, date):
        return v
    s = safe_str(v)
    if s is None:
        return None
    # "2024-01-02 10:00:00" and "2024-01-02T10:00:00" both carry a usable date.
    day_part = s.replace("T", " ").split(" ", 1)[0]
    for fmt in ("%Y-%m-%d", "%Y/%m/%d"):
        try:
            return datetime.strptime(day_part, fmt).date()
        except ValueError:
            continue
    return None
+
+
def data_hash(obj: JsonDict) -> str:
    """Return a stable MD5 hex fingerprint of ``obj`` for change detection.

    Keys are serialised in sorted order so insertion order does not affect
    the result; values JSON cannot encode are rendered through ``str``.
    """
    serialized = json.dumps(obj, sort_keys=True, default=str)
    digest = hashlib.md5()
    digest.update(serialized.encode())
    return digest.hexdigest()
+
+
def build_login_to_account_map(customers: JsonList) -> dict[str, str]:
    """Build a ``login_name -> account_id`` lookup from normalized customers.

    Every login is stored verbatim. A login that contains spaces is also
    stored with the spaces removed as a fuzzy fallback key. An exact login
    always wins over a fuzzy key with the same text, whatever the input
    order, so one customer's fuzzy alias can never hide another customer's
    real login.

    Raises KeyError if a record lacks ``login_name`` or ``account_id``.
    """
    exact: dict[str, str] = {}
    fuzzy: dict[str, str] = {}
    for c in customers:
        login = cast(str, c["login_name"])
        account_id = cast(str, c["account_id"])
        exact[login] = account_id
        # Also keep a space-free key for fuzzy matching.
        stripped = login.replace(" ", "")
        if stripped != login:
            fuzzy[stripped] = account_id
    # Merge last so that exact entries override colliding fuzzy ones.
    return {**fuzzy, **exact}
+
+
def resolve_account_id(login_to_account: dict[str, str], customer_account: str | None) -> str | None:
    """Look up the account id for a login: exact match first, then with spaces removed.

    Returns None when ``customer_account`` is empty/None or neither lookup hits.
    """
    if not customer_account:
        return None
    exact_hit = login_to_account.get(customer_account)
    # Fall back to the space-free form only when the exact key gave nothing.
    return exact_hit or login_to_account.get(customer_account.replace(" ", ""))
+
+
def is_valid_order_id(order_id: str | None) -> bool:
    """Tell whether ``order_id`` looks like a real APS order number.

    APS order numbers are expected to be all digits; this filters out
    placeholder values such as "没有数据" as well as None and empty text.
    """
    if order_id is None:
        return False
    return order_id.isdigit()
+
+
+# ---------------------------------------------------------------------------
+# Sync logic
+# ---------------------------------------------------------------------------
+class APSSyncer:
+ def __init__(self, db_config: DbConfig | None = None):
+ self.db_config: DbConfig = db_config or DB_CONFIG
+ self.conn: Connection[DictCursor] | None = None
+ self.stats: StatsDict = {"customers": 0, "customer_details": 0, "snapshots": 0, "orders": 0, "order_details": 0, "bills": 0}
+
+ def _require_conn(self) -> Connection[DictCursor]:
+ if self.conn is None:
+ raise RuntimeError("Database connection is not initialized")
+ return self.conn
+
+ def connect(self) -> None:
+ self.conn = pymysql.connect(
+ host=self.db_config["host"],
+ port=self.db_config["port"],
+ user=self.db_config["user"],
+ password=self.db_config["password"],
+ database=self.db_config["database"],
+ charset=self.db_config["charset"],
+ cursorclass=DictCursor,
+ )
+ logger.info("Connected to MySQL %s:%s/%s", self.db_config["host"],
+ self.db_config["port"], self.db_config["database"])
+
+ def close(self) -> None:
+ if self.conn:
+ self.conn.close()
+
+ def ensure_schema(self) -> None:
+ """Create tables if not exist."""
+ conn = self._require_conn()
+ with conn.cursor() as cur:
+ for ddl in DDL_STATEMENTS:
+ _ = cur.execute(ddl)
+ # 兼容历史库:允许订单在 customer 未映射时也可入库。
+ # MySQL 外键允许 NULL,因此这里仅放宽列约束,不移除 FK。
+ _ = cur.execute("""
+ ALTER TABLE aps_order
+ MODIFY COLUMN customer_account_id VARCHAR(32) NULL COMMENT '客户account_id(FK, 可为空表示未映射)',
+ MODIFY COLUMN customer_login_name VARCHAR(128) NOT NULL DEFAULT '' COMMENT '客户账号(冗余,方便查询)'
+ """)
+ _ = cur.execute("""
+ ALTER TABLE aps_bill
+ MODIFY COLUMN customer_account_id VARCHAR(32) NULL COMMENT '客户account_id(FK, 可为空表示未映射)',
+ MODIFY COLUMN customer_login_name VARCHAR(128) NOT NULL DEFAULT '' COMMENT '客户账号'
+ """)
+ conn.commit()
+ logger.info("Schema ensured (%d tables)", len(DDL_STATEMENTS))
+
+ @staticmethod
+ def normalize_month(v: object) -> str | None:
+ s = safe_str(v)
+ if s is None or s == "没有数据":
+ return None
+ if len(s) == 6 and s.isdigit():
+ return f"{s[:4]}-{s[4:]}"
+ if len(s) == 7 and s[4] == "-":
+ return s
+ return None
+
+ @staticmethod
+ def load_json_records(file_path: Path) -> JsonList:
+ with open(file_path, "r", encoding="utf-8") as f:
+ data = cast(object, json.load(f))
+ if isinstance(data, dict):
+ data_dict = cast(JsonDict, data)
+ records = data_dict.get("records")
+ if not isinstance(records, list):
+ return []
+ record_list = cast(list[object], records)
+ normalized_records: JsonList = []
+ for record in record_list:
+ if isinstance(record, dict):
+ normalized_records.append(cast(JsonDict, record))
+ return normalized_records
+ if isinstance(data, list):
+ data_list = cast(list[object], data)
+ return [cast(JsonDict, record) for record in data_list if isinstance(record, dict)]
+ return []
+
+ def resolve_data_files(self, data_dir: str) -> tuple[Path, Path, Path, Path, Path]:
+ root = Path(data_dir)
+ if not root.exists() or not root.is_dir():
+ raise FileNotFoundError(f"Data directory not found: {root}")
+
+ customers_file = root / "customers.json"
+ orders_file = root / "orders.json"
+ order_details_file = root / "orderDetails.json"
+ bills_file = root / "bills.json"
+ customer_details_file = root / "customerDetails.json"
+ for fp in (customers_file, orders_file, order_details_file, bills_file):
+ if not fp.exists():
+ raise FileNotFoundError(f"Required JSON file not found: {fp}")
+ return customers_file, orders_file, order_details_file, bills_file, customer_details_file
+
+ @staticmethod
+ def normalize_customer_record(raw: JsonDict) -> JsonDict | None:
+ account_id = safe_str(raw.get("accountId"), 32)
+ login_name = safe_str(raw.get("loginName"), 128)
+ if not account_id or not login_name:
+ return None
+
+ inactive = safe_str(raw.get("inactiveMonths"))
+ no_consumption_months = int(inactive) if inactive and inactive.isdigit() else None
+
+ return {
+ "account_id": account_id,
+ "login_name": login_name,
+ "real_name": safe_str(raw.get("realName"), 256),
+ "report_source": safe_str(raw.get("reportSource"), 64),
+ "report_type": safe_str(raw.get("reportType"), 64),
+ "trade_mode": safe_str(raw.get("tradeMode"), 64),
+ "real_name_status": safe_str(raw.get("authStatus"), 64),
+ "relation_date": parse_datetime(raw.get("relationTime")),
+ "follow_staff": safe_str(raw.get("owner"), 64),
+ "payment_notice_status": safe_str(raw.get("paymentNoticeStatus"), 32),
+ "invite_register_type": safe_str(raw.get("inviteType"), 64),
+ "is_new_customer": safe_bool(raw.get("isNewCustomer")),
+ "performance_start_point_reached": safe_bool(raw.get("isPerformanceQualified")),
+ "customer_category": safe_str(raw.get("customerCategory"), 64),
+ "remark": safe_str(raw.get("remark"), 512),
+ "no_consumption_months": no_consumption_months,
+ "planned_release_time": parse_date(raw.get("releasePlanTime")),
+ "planned_release_reason": safe_str(raw.get("releasePlanReason"), 256),
+ "account_cash_balance_cny": safe_decimal(raw.get("cashBalanceCny")),
+ "pending_invoice_amount_cny": safe_decimal(raw.get("invoicePendingCny")),
+ "last_month_consumption_cny": safe_decimal(raw.get("lastMonthConsumptionCny")),
+ "current_month_consumption_cny": safe_decimal(raw.get("thisMonthConsumptionCny")),
+ }
+
+ # ---- Customer master UPSERT ----
+ def upsert_customer(self, c: JsonDict) -> None:
+ sql = """
+ INSERT INTO aps_customer (
+ account_id, login_name, real_name,
+ report_source, report_type, trade_mode, real_name_status,
+ relation_date, follow_staff, payment_notice_status,
+ invite_register_type, is_new_customer, performance_start_point_reached,
+ customer_category, remark, no_consumption_months,
+ planned_release_time, planned_release_reason,
+ customer_name, customer_type, customer_source,
+ email, phone, department
+ ) VALUES (
+ %s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
+ ) ON DUPLICATE KEY UPDATE
+ login_name=VALUES(login_name), real_name=VALUES(real_name),
+ report_source=VALUES(report_source), report_type=VALUES(report_type),
+ trade_mode=VALUES(trade_mode), real_name_status=VALUES(real_name_status),
+ relation_date=VALUES(relation_date), follow_staff=VALUES(follow_staff),
+ payment_notice_status=VALUES(payment_notice_status),
+ invite_register_type=VALUES(invite_register_type),
+ is_new_customer=VALUES(is_new_customer),
+ performance_start_point_reached=VALUES(performance_start_point_reached),
+ customer_category=VALUES(customer_category), remark=VALUES(remark),
+ no_consumption_months=VALUES(no_consumption_months),
+ planned_release_time=VALUES(planned_release_time),
+ planned_release_reason=VALUES(planned_release_reason),
+ customer_name=VALUES(customer_name), customer_type=VALUES(customer_type),
+ customer_source=VALUES(customer_source),
+ email=VALUES(email), phone=VALUES(phone), department=VALUES(department)
+ """
+ params = (
+ c["account_id"], c["login_name"], safe_str(c.get("real_name")),
+ safe_str(c.get("report_source")), safe_str(c.get("report_type")),
+ safe_str(c.get("trade_mode")),
+ safe_str(c.get("real_name_status")),
+ c.get("relation_date"),
+ safe_str(c.get("follow_staff")),
+ safe_str(c.get("payment_notice_status")),
+ safe_str(c.get("invite_register_type")),
+ c.get("is_new_customer"),
+ c.get("performance_start_point_reached"),
+ safe_str(c.get("customer_category")),
+ safe_str(c.get("remark")),
+ c.get("no_consumption_months"),
+ c.get("planned_release_time"),
+ safe_str(c.get("planned_release_reason")),
+ None, None, None, None, None, None,
+ )
+ conn = self._require_conn()
+ with conn.cursor() as cur:
+ _ = cur.execute(sql, params)
+ self.stats["customers"] += 1
+
+ # ---- Customer snapshot (append, skip if hash identical) ----
+ def insert_snapshot(self, c: JsonDict, snapshot_month: str, captured_at: str) -> None:
+ financial: JsonDict = {
+ "balance": safe_decimal(c.get("account_cash_balance_cny")),
+ "pending": safe_decimal(c.get("pending_invoice_amount_cny")),
+ "last_consumption": safe_decimal(c.get("last_month_consumption_cny")),
+ "curr_consumption": safe_decimal(c.get("current_month_consumption_cny")),
+ "last_payable": None,
+ "last_pre": None,
+ "last_post": None,
+ "curr_payable": None,
+ "curr_pre": None,
+ "curr_post": None,
+ }
+ h = data_hash(financial)
+ cap_date = parse_date(captured_at)
+
+ # Check if same hash already exists for this account+month+date
+ conn = self._require_conn()
+ with conn.cursor() as cur:
+ _ = cur.execute(
+ "SELECT id FROM aps_customer_snapshot WHERE account_id=%s AND snapshot_month=%s AND captured_at=%s",
+ (c["account_id"], snapshot_month, cap_date),
+ )
+ if cur.fetchone():
+ # Already have snapshot for this date, update it
+ _ = cur.execute("""
+ UPDATE aps_customer_snapshot SET
+ account_cash_balance_cny=%s, pending_invoice_amount_cny=%s,
+ last_month_consumption_cny=%s, current_month_consumption_cny=%s,
+ last_month_payable_total_cny=%s, last_month_prepay_cny=%s, last_month_postpay_cny=%s,
+ current_month_payable_total_cny=%s, current_month_prepay_cny=%s, current_month_postpay_cny=%s,
+ data_hash=%s
+ WHERE account_id=%s AND snapshot_month=%s AND captured_at=%s
+ """, (
+ financial["balance"], financial["pending"],
+ financial["last_consumption"], financial["curr_consumption"],
+ financial["last_payable"], financial["last_pre"], financial["last_post"],
+ financial["curr_payable"], financial["curr_pre"], financial["curr_post"],
+ h, c["account_id"], snapshot_month, cap_date,
+ ))
+ else:
+ _ = cur.execute("""
+ INSERT INTO aps_customer_snapshot (
+ account_id, snapshot_month,
+ account_cash_balance_cny, pending_invoice_amount_cny,
+ last_month_consumption_cny, current_month_consumption_cny,
+ last_month_payable_total_cny, last_month_prepay_cny, last_month_postpay_cny,
+ current_month_payable_total_cny, current_month_prepay_cny, current_month_postpay_cny,
+ data_hash, captured_at
+ ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
+ """, (
+ c["account_id"], snapshot_month,
+ financial["balance"], financial["pending"],
+ financial["last_consumption"], financial["curr_consumption"],
+ financial["last_payable"], financial["last_pre"], financial["last_post"],
+ financial["curr_payable"], financial["curr_pre"], financial["curr_post"],
+ h, cap_date,
+ ))
+ self.stats["snapshots"] += 1
+
+ # ---- Orders UPSERT ----
+ def upsert_orders(self, records: JsonList, login_to_account: dict[str, str]) -> None:
+ sql = """
+ INSERT INTO aps_order (
+ order_id, customer_account_id, customer_login_name,
+ customer_category, order_type, original_price_cny, paid_amount_cny,
+ status, order_time, order_month
+ ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
+ ON DUPLICATE KEY UPDATE
+ customer_category=VALUES(customer_category),
+ order_type=VALUES(order_type),
+ original_price_cny=VALUES(original_price_cny),
+ paid_amount_cny=VALUES(paid_amount_cny),
+ status=VALUES(status),
+ order_time=VALUES(order_time),
+ order_month=VALUES(order_month)
+ """
+ conn = self._require_conn()
+ with conn.cursor() as cur:
+ for o in records:
+ order_id = safe_str(o.get("orderId"), 32)
+ customer_login = safe_str(o.get("customerAccount"), 128)
+ if not order_id:
+ logger.warning("Skipping order without orderId: %s", o)
+ continue
+ if not is_valid_order_id(order_id):
+ logger.warning("Skipping placeholder/invalid orderId '%s'", order_id)
+ continue
+
+ account_id = resolve_account_id(login_to_account, customer_login)
+ if not customer_login:
+ logger.warning("Order %s has empty customerAccount; inserting with NULL customer_account_id", order_id)
+ elif not account_id:
+ logger.warning("Order %s has unresolved customerAccount '%s'; inserting with NULL customer_account_id", order_id, customer_login)
+
+ created_at_raw = o.get("createdAt")
+ order_time = parse_datetime(created_at_raw)
+ order_month = None
+ if order_time and isinstance(order_time, datetime):
+ order_month = order_time.strftime("%Y-%m")
+ else:
+ created_at_prefix = created_at_raw[:7] if isinstance(created_at_raw, str) and len(created_at_raw) >= 7 else None
+ order_month = self.normalize_month(created_at_prefix)
+
+ _ = cur.execute(sql, (
+ order_id, account_id, customer_login or "",
+ safe_str(o.get("customerCategory")), safe_str(o.get("orderType")),
+ safe_decimal(o.get("orderOriginalPriceCny")),
+ safe_decimal(o.get("actualPaidCny")),
+ safe_str(o.get("orderStatus")), order_time, order_month,
+ ))
+ self.stats["orders"] += 1
+
+ # ---- Order details UPSERT ----
+ def upsert_order_details(self, records: JsonList) -> None:
+ sql = """
+ INSERT INTO aps_order_detail (
+ order_id, order_type, status, trade_type, customer_category,
+ dealer_name, dealer_uid, customer_type, opportunity_id,
+ payment_time, order_time, product_name, product_code,
+ original_price_cny, paid_amount_cny, discount,
+ payable_amount_cny, coupon_amount_cny
+ ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
+ ON DUPLICATE KEY UPDATE
+ order_type=VALUES(order_type),
+ status=VALUES(status),
+ trade_type=VALUES(trade_type),
+ customer_category=VALUES(customer_category),
+ dealer_name=VALUES(dealer_name),
+ dealer_uid=VALUES(dealer_uid),
+ customer_type=VALUES(customer_type),
+ opportunity_id=VALUES(opportunity_id),
+ payment_time=VALUES(payment_time),
+ order_time=VALUES(order_time),
+ product_name=VALUES(product_name),
+ product_code=VALUES(product_code),
+ original_price_cny=VALUES(original_price_cny),
+ paid_amount_cny=VALUES(paid_amount_cny),
+ discount=VALUES(discount),
+ payable_amount_cny=VALUES(payable_amount_cny),
+ coupon_amount_cny=VALUES(coupon_amount_cny)
+ """
+ conn = self._require_conn()
+ with conn.cursor() as cur:
+ for d in records:
+ order_id = safe_str(d.get("orderId"), 32)
+ if not order_id:
+ logger.warning("Skipping order detail without orderId: %s", d)
+ continue
+
+ _ = cur.execute(sql, (
+ order_id,
+ safe_str(d.get("orderType"), 32),
+ safe_str(d.get("status"), 32),
+ safe_str(d.get("tradeType"), 32),
+ safe_str(d.get("customerCategory"), 64),
+ safe_str(d.get("dealerName"), 256),
+ safe_str(d.get("dealerUid"), 64),
+ safe_str(d.get("customerType"), 64),
+ safe_str(d.get("opportunityId"), 64),
+ parse_datetime(d.get("paymentTime")),
+ parse_datetime(d.get("orderTime")),
+ safe_str(d.get("productName"), 256),
+ safe_str(d.get("productCode"), 64),
+ safe_decimal(d.get("originalPriceCny")),
+ safe_decimal(d.get("paidAmountCny")),
+ safe_decimal(d.get("discount")),
+ safe_decimal(d.get("payableAmountCny")),
+ safe_decimal(d.get("couponAmountCny")),
+ ))
+ self.stats["order_details"] += 1
+
+ def update_customer_details(self, records: JsonList, snapshot_month: str) -> None:
+ sql_customer = """
+ UPDATE aps_customer SET
+ customer_name=%s, customer_type=%s, customer_source=%s,
+ email=%s, phone=%s, department=%s
+ WHERE account_id=%s
+ """
+ sql_snapshot = """
+ UPDATE aps_customer_snapshot SET
+ last_month_payable_total_cny=%s, last_month_prepay_cny=%s, last_month_postpay_cny=%s,
+ current_month_payable_total_cny=%s, current_month_prepay_cny=%s, current_month_postpay_cny=%s
+ WHERE account_id=%s AND snapshot_month=%s
+ """
+
+ conn = self._require_conn()
+ with conn.cursor() as cur:
+ for r in records:
+ account_id = safe_str(r.get("accountId"), 32)
+ if not account_id:
+ logger.warning("Skipping customer detail without accountId: %s", r)
+ continue
+
+ _ = cur.execute(sql_customer, (
+ safe_str(r.get("customerName"), 256),
+ safe_str(r.get("customerType"), 64),
+ safe_str(r.get("customerSource"), 64),
+ safe_str(r.get("email"), 256),
+ safe_str(r.get("phone"), 32),
+ safe_str(r.get("department"), 128),
+ account_id,
+ ))
+
+ _ = cur.execute(sql_snapshot, (
+ safe_decimal(r.get("lastMonthPayableTotalCny")),
+ safe_decimal(r.get("lastMonthPrepayCny")),
+ safe_decimal(r.get("lastMonthPostpayCny")),
+ safe_decimal(r.get("currentMonthPayableTotalCny")),
+ safe_decimal(r.get("currentMonthPrepayCny")),
+ safe_decimal(r.get("currentMonthPostpayCny")),
+ account_id,
+ snapshot_month,
+ ))
+ self.stats["customer_details"] += 1
+
+ # ---- Bills sync ----
+    def sync_bills(self, records: JsonList, login_to_account: dict[str, str], incremental: bool = False) -> None:
+        """Write bill records into ``aps_bill``.
+
+        Records are grouped by (resolved account id, raw login, commission
+        month). Records without a usable ``billingMonth`` or
+        ``commissionMonth`` are dropped; records whose customer cannot be
+        resolved are still stored, with a NULL ``customer_account_id``.
+
+        Args:
+            records: Raw bill dicts; keys such as ``billingMonth``,
+                ``customerAccount`` and ``productName`` are read here.
+            login_to_account: Lookup handed to ``resolve_account_id``.
+            incremental: If true, insert only rows that are not already in
+                the table. Otherwise replace every customer/month slice that
+                appears in ``records`` (DELETE, then INSERT).
+        """
+        grouped: dict[tuple[str | None, str, str], JsonList] = {}
+        skipped = 0
+        for b in records:
+            billing_month = self.normalize_month(b.get("billingMonth"))
+            commission_month = self.normalize_month(b.get("commissionMonth"))
+            customer_login = safe_str(b.get("customerAccount"), 128)
+            if not billing_month or not commission_month:
+                skipped += 1
+                continue
+
+            account_id = resolve_account_id(login_to_account, customer_login)
+            if not customer_login:
+                logger.warning("Bill row has empty customerAccount; inserting with NULL customer_account_id, commission_month=%s", commission_month)
+            elif not account_id:
+                logger.warning("Bill row has unresolved customerAccount '%s'; inserting with NULL customer_account_id, commission_month=%s", customer_login, commission_month)
+
+            key = (account_id, customer_login or "", commission_month)
+            grouped.setdefault(key, []).append(b)
+
+        if skipped:
+            # Only rows lacking a month are dropped; unresolved customers are
+            # inserted above, so the message must not call them filtered.
+            logger.info("Skipped %d bill rows without a valid billingMonth/commissionMonth", skipped)
+
+        if not grouped:
+            return
+
+        insert_sql = """
+            INSERT INTO aps_bill (
+                billing_month, customer_account_id, customer_login_name,
+                bill_type, consumption_time,
+                customer_category,
+                product_name, product_category,
+                original_price_cny, customer_payable_amount_cny,
+                commission_month,
+                included_in_performance,
+                rebated,
+                invite_register_type,
+                service_start_time, service_end_time
+            ) VALUES (
+                %s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
+            )
+        """
+        # Duplicate probe for incremental mode. ``<=>`` is MySQL's NULL-safe
+        # equality: with a plain ``=`` a row whose product, consumption time or
+        # price is NULL never matches itself and is re-inserted on every run.
+        # ``SELECT 1 ... LIMIT 1`` works for tuple and dict cursors alike.
+        match_tail = (
+            " AND commission_month=%s AND billing_month=%s"
+            " AND product_name <=> %s AND consumption_time <=> %s"
+            " AND original_price_cny <=> %s LIMIT 1"
+        )
+        exists_by_account = "SELECT 1 FROM aps_bill WHERE customer_account_id=%s" + match_tail
+        exists_by_login = (
+            "SELECT 1 FROM aps_bill WHERE customer_account_id IS NULL"
+            " AND customer_login_name=%s" + match_tail
+        )
+
+        conn = self._require_conn()
+        with conn.cursor() as cur:
+            # Slices already deleted during this call (full mode only). Groups
+            # are keyed by login as well, but the DELETE for a resolved account
+            # ignores the login; without this guard a second login resolving to
+            # the same account would wipe the rows just inserted for the first.
+            purged: set[tuple[str | None, str | None, str]] = set()
+
+            for (account_id, login_name, commission_month), rows in grouped.items():
+                if not incremental:
+                    if account_id is None:
+                        slice_key = (None, login_name, commission_month)
+                    else:
+                        slice_key = (account_id, None, commission_month)
+                    if slice_key not in purged:
+                        purged.add(slice_key)
+                        if account_id is None:
+                            _ = cur.execute(
+                                "DELETE FROM aps_bill WHERE customer_account_id IS NULL AND customer_login_name=%s AND commission_month=%s",
+                                (login_name, commission_month),
+                            )
+                        else:
+                            _ = cur.execute(
+                                "DELETE FROM aps_bill WHERE customer_account_id=%s AND commission_month=%s",
+                                (account_id, commission_month),
+                            )
+
+                for b in rows:
+                    billing_month = self.normalize_month(b.get("billingMonth"))
+                    product_name = safe_str(b.get("productName"))
+                    consumption_time = safe_str(b.get("consumeDate"))
+                    original_price = safe_decimal(b.get("originalPriceCny"))
+
+                    if incremental:
+                        # Identity of a bill row: customer + commission month +
+                        # billing month + product + consumption time + price.
+                        owner = login_name if account_id is None else account_id
+                        probe = exists_by_login if account_id is None else exists_by_account
+                        _ = cur.execute(probe, (
+                            owner, commission_month, billing_month,
+                            product_name, consumption_time, original_price,
+                        ))
+                        if cur.fetchone() is not None:
+                            continue  # already stored, keep the existing row
+
+                    _ = cur.execute(insert_sql, (
+                        billing_month, account_id, login_name,
+                        safe_str(b.get("billType")), consumption_time,
+                        safe_str(b.get("customerCategory")),
+                        product_name, safe_str(b.get("productCategory")),
+                        original_price,
+                        safe_decimal(b.get("customerPayableCny")),
+                        commission_month,
+                        safe_str(b.get("countsForPerformance")),
+                        safe_str(b.get("commissionable")),
+                        safe_str(b.get("inviteType")),
+                        safe_str(b.get("serviceStartAt")), safe_str(b.get("serviceEndAt")),
+                    ))
+                    self.stats["bills"] += 1
+
+ # ---- Main sync entry ----
+    def sync_from_json(self, data_dir: str, incremental: bool = False) -> StatsDict:
+        """Load the exported JSON files from ``data_dir`` and sync them to MySQL.
+
+        The source files are read first, then one connection is opened and the
+        customers, snapshots, customer details, orders, order details and
+        bills are written in that order. ``commit()`` is called once, after
+        the success row has been added to ``aps_sync_log``. On any error the
+        connection is rolled back, a ``failed`` row is written to
+        ``aps_sync_log`` (best effort) and the exception is re-raised. The
+        connection is always closed on the way out.
+
+        Args:
+            data_dir: Directory handed to ``resolve_data_files``.
+            incremental: Forwarded to ``sync_bills``.
+
+        Returns:
+            The per-entity counters collected in ``self.stats``.
+        """
+        start = datetime.now()
+        customers_file, orders_file, order_details_file, bills_file, customer_details_file = self.resolve_data_files(data_dir)
+        logger.info("Loading source files from %s%s", data_dir, " (增量模式)" if incremental else "")
+
+        raw_customers = self.load_json_records(customers_file)
+        raw_orders = self.load_json_records(orders_file)
+        raw_order_details = self.load_json_records(order_details_file)
+        raw_bills = self.load_json_records(bills_file)
+
+        # The customer details file is optional: a missing or unreadable file
+        # only disables the detail update, it does not abort the sync.
+        raw_customer_details: JsonList = []
+        try:
+            if customer_details_file.exists():
+                raw_customer_details = self.load_json_records(customer_details_file)
+            else:
+                logger.info("Optional file missing, skip customer details: %s", customer_details_file)
+        except Exception as e:
+            logger.warning("Failed to load optional customer details file %s: %s", customer_details_file, e)
+
+        captured_at = date.today().isoformat()
+
+        # Snapshot month: taken from the first bill that yields a usable
+        # month, falling back to the current calendar month.
+        billing_month = None
+        for b in raw_bills:
+            billing_month = self.normalize_month(b.get("billingMonth")) or self.normalize_month(b.get("commissionMonth"))
+            if billing_month:
+                break
+        if not billing_month:
+            billing_month = datetime.now().strftime("%Y-%m")
+
+        self.stats = {"customers": 0, "customer_details": 0, "snapshots": 0, "orders": 0, "order_details": 0, "bills": 0}
+
+        try:
+            self.connect()
+            self.ensure_schema()
+
+            customers: JsonList = []
+            skipped_customers = 0
+            for raw in raw_customers:
+                c = self.normalize_customer_record(raw)
+                if not c:
+                    skipped_customers += 1
+                    continue
+                customers.append(c)
+                self.upsert_customer(c)
+                self.insert_snapshot(c, billing_month, captured_at)
+
+            if skipped_customers:
+                logger.info("Skipped %d invalid customer rows", skipped_customers)
+
+            login_to_account = build_login_to_account_map(customers)
+            logger.info("Resolved %d customer login_name -> account_id mappings", len(login_to_account))
+
+            if raw_customer_details:
+                self.update_customer_details(raw_customer_details, billing_month)
+
+            self.upsert_orders(raw_orders, login_to_account)
+            self.upsert_order_details(raw_order_details)
+            self.sync_bills(raw_bills, login_to_account, incremental=incremental)
+
+            # Record the successful run in the same transaction as the data.
+            duration = (datetime.now() - start).total_seconds()
+            conn = self._require_conn()
+            with conn.cursor() as cur:
+                _ = cur.execute("""
+                    INSERT INTO aps_sync_log (json_file, billing_month, customer_count, order_count, bill_count, snapshot_count, status, duration_seconds)
+                    VALUES (%s,%s,%s,%s,%s,%s,'success',%s)
+                """, (
+                    str(Path(data_dir)), billing_month,
+                    self.stats["customers"], self.stats["orders"],
+                    self.stats["bills"], self.stats["snapshots"], duration,
+                ))
+
+            conn.commit()
+            logger.info(
+                "Sync complete in %.1fs: %d customers, %d customer_details, %d snapshots, %d orders, %d order_details, %d bills",
+                duration, self.stats["customers"], self.stats["customer_details"],
+                self.stats["snapshots"], self.stats["orders"], self.stats["order_details"], self.stats["bills"],
+            )
+            return self.stats
+
+        except Exception as e:
+            # logger.exception keeps the traceback, which a plain error()
+            # call with only str(e) would drop.
+            logger.exception("Sync failed: %s", e)
+            if self.conn:
+                duration = (datetime.now() - start).total_seconds()
+                try:
+                    conn = self._require_conn()
+                    conn.rollback()
+                    with conn.cursor() as cur:
+                        _ = cur.execute("""
+                            INSERT INTO aps_sync_log (json_file, billing_month, status, error_message, duration_seconds)
+                            VALUES (%s,%s,'failed',%s,%s)
+                        """, (str(Path(data_dir)), billing_month or "unknown", str(e), duration))
+                    conn.commit()
+                except Exception as log_error:
+                    # Best effort: the original error below must win, but a
+                    # failure to record it should still be visible in the log.
+                    logger.warning("Could not record failed sync in aps_sync_log: %s", log_error)
+            raise
+        finally:
+            self.close()
+
+    def find_latest_json(self) -> Path | None:
+        """Return ``JSON_DIR`` if ``resolve_data_files`` accepts it, else ``None``.
+
+        ``None`` is returned only when ``resolve_data_files`` raises
+        ``FileNotFoundError``; any other exception propagates to the caller.
+        """
+        default_dir = JSON_DIR
+        try:
+            _ = self.resolve_data_files(str(default_dir))
+        except FileNotFoundError:
+            return None
+        else:
+            return default_dir
+
+    def get_latest_bill_consumption_time(self) -> str | None:
+        """Return the newest ``consumption_time`` stored in ``aps_bill``.
+
+        A ``datetime`` value is rendered as ``YYYY-MM-DD``; any other non-NULL
+        value is returned via ``str()``. ``None`` is returned when the query
+        yields no row or the maximum is NULL. The row is read with ``.get``,
+        so a dict-style cursor is expected.
+        """
+        connection = self._require_conn()
+        with connection.cursor() as cursor:
+            _ = cursor.execute("SELECT MAX(consumption_time) AS latest_time FROM aps_bill")
+            record = cursor.fetchone()
+
+        newest = record.get("latest_time") if record else None
+        if newest is None:
+            return None
+        if isinstance(newest, datetime):
+            return newest.strftime("%Y-%m-%d")
+        return str(cast(object, newest))
+
+
+# ---------------------------------------------------------------------------
+# CLI
+# ---------------------------------------------------------------------------
+def main() -> None:
+    """Command-line entry point for the JSON-to-MySQL sync.
+
+    With ``--latest-bill-consumption-time`` the newest ``consumption_time``
+    in ``aps_bill`` is printed (nothing is printed when there is none) and
+    no sync runs. Otherwise ``--dir`` is synced, optionally in
+    ``--incremental`` mode.
+    """
+    import argparse
+
+    cli = argparse.ArgumentParser(description="Sync APS Aliyun JSON data to MySQL")
+    _ = cli.add_argument(
+        "--dir",
+        type=str,
+        default=str(JSON_DIR),
+        help="Directory containing customers.json, orders.json, orderDetails.json, bills.json",
+    )
+    for flag, help_text in (
+        ("--incremental", "增量模式: 订单UPSERT, 账单检查存在后跳过(不删除历史数据)"),
+        ("--latest-bill-consumption-time", "仅查询 aps_bill 中最新的 consumption_time 并输出"),
+    ):
+        _ = cli.add_argument(flag, action="store_true", default=False, help=help_text)
+
+    options = cli.parse_args()
+    syncer = APSSyncer(db_config=DB_CONFIG)
+
+    if not cast(bool, options.latest_bill_consumption_time):
+        _ = syncer.sync_from_json(
+            cast(str, options.dir),
+            incremental=cast(bool, options.incremental),
+        )
+        return
+
+    # Query-only mode: open a connection, print the value, always close.
+    syncer.connect()
+    try:
+        newest = syncer.get_latest_bill_consumption_time()
+        if newest:
+            print(newest)
+    finally:
+        syncer.close()
+
+
+# Run the CLI only when this file is executed as a script, not when imported.
+if __name__ == "__main__":
+ main()
diff --git a/aliyun-sync/aps-aliyun-sync/aps_scheduler.py b/aliyun-sync/aps-aliyun-sync/aps_scheduler.py
new file mode 100644
index 0000000..8aafbaf
--- /dev/null
+++ b/aliyun-sync/aps-aliyun-sync/aps_scheduler.py
@@ -0,0 +1,130 @@
+"""
+Long-running scheduler — watches for new JSON files and syncs them to MySQL.
+
+Modes (selected with --mode):
+    watch  : Initial sync, then poll for new JSON files every --interval seconds (default)
+    cron   : One-shot sync, meant to be called by system cron/launchd
+    daemon : Currently runs the same code path as watch (initial sync + polling)
+"""
+
+import time
+import sys
+import signal
+import argparse
+import logging
+from pathlib import Path
+from datetime import datetime
+
+from aps_db_sync import APSSyncer, DB_CONFIG, JSON_DIR
+
+# Log line layout used by the root logger configured below.
+LOG_FORMAT = "%(asctime)s [%(levelname)s] %(message)s"
+logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
+logger = logging.getLogger("aps_scheduler")
+
+# Seconds between two polls of JSON_DIR in watch mode; main() overrides it
+# from --interval through _update_watch_interval().
+WATCH_INTERVAL_SECONDS = 30
+# Directory holding one "<stem>.synced" marker per JSON file already synced.
+PROCESSED_MARKER_DIR = JSON_DIR / ".aps_sync_processed"
+
+
+def _update_watch_interval(value: int) -> None:
+    """Set the module-wide poll interval used by the watch loop.
+
+    Args:
+        value: Seconds to wait between two polls; must be positive.
+
+    Raises:
+        ValueError: If ``value`` is zero or negative. Zero would turn the
+            watch loop into a busy loop and ``time.sleep`` rejects negative
+            values, so the problem is reported here instead of mid-run.
+    """
+    global WATCH_INTERVAL_SECONDS
+    if value <= 0:
+        raise ValueError(f"watch interval must be a positive number of seconds, got {value!r}")
+    WATCH_INTERVAL_SECONDS = value
+
+
+class SyncScheduler:
+    """Find not-yet-synced JSON exports in ``JSON_DIR`` and sync them via ``APSSyncer``.
+
+    A file counts as processed when a marker named ``<stem>.synced`` exists in
+    ``PROCESSED_MARKER_DIR`` and is at least as new as the JSON file, so a
+    file that is rewritten later is picked up again.
+    """
+
+    def __init__(self, db_config: "dict | None" = None):
+        """Store the DB settings, create the marker dir and install signal handlers.
+
+        Args:
+            db_config: Connection settings passed to ``APSSyncer``; falls back
+                to ``DB_CONFIG`` when omitted or empty.
+        """
+        self.db_config = db_config or DB_CONFIG
+        self.running = True
+        # parents=True: JSON_DIR itself may not exist yet, and a bare mkdir
+        # would then fail with FileNotFoundError.
+        PROCESSED_MARKER_DIR.mkdir(parents=True, exist_ok=True)
+        signal.signal(signal.SIGINT, self._shutdown)
+        signal.signal(signal.SIGTERM, self._shutdown)
+
+    def _shutdown(self, signum, frame) -> None:
+        """Signal handler: ask the watch loop to stop after the current step."""
+        logger.info("Shutdown signal received, stopping...")
+        self.running = False
+
+    def _marker_path(self, json_path: Path) -> Path:
+        """Return the marker file that records a successful sync of ``json_path``."""
+        return PROCESSED_MARKER_DIR / f"{json_path.stem}.synced"
+
+    def _is_processed(self, json_path: Path) -> bool:
+        """Tell whether ``json_path`` has a marker at least as new as itself."""
+        marker = self._marker_path(json_path)
+        if not marker.exists():
+            return False
+        marker_mtime = marker.stat().st_mtime
+        json_mtime = json_path.stat().st_mtime
+        return marker_mtime >= json_mtime
+
+    def _mark_processed(self, json_path: Path) -> None:
+        """Create or refresh the marker for ``json_path`` with the current time."""
+        marker = self._marker_path(json_path)
+        marker.write_text(datetime.now().isoformat())
+
+    def find_unprocessed_files(self) -> list[Path]:
+        """Return unprocessed export files, oldest modification time first."""
+        pattern = "aps_aliyun_customers_with_bills_*.json"
+        pending: list[tuple[float, Path]] = []
+        for path in JSON_DIR.glob(pattern):
+            try:
+                if self._is_processed(path):
+                    continue
+                pending.append((path.stat().st_mtime, path))
+            except FileNotFoundError:
+                # The file (or its marker) vanished between glob() and
+                # stat(); skip it instead of crashing the watcher.
+                continue
+        pending.sort(key=lambda item: item[0])
+        return [path for _, path in pending]
+
+    def sync_file(self, json_path: Path) -> bool:
+        """Sync one file and mark it processed; return whether that succeeded."""
+        logger.info("Syncing: %s", json_path.name)
+        try:
+            syncer = APSSyncer(db_config=self.db_config)
+            # NOTE(review): sync_from_json() names this argument data_dir and
+            # hands it to resolve_data_files(), while a single file path is
+            # passed here -- confirm that this is still supported.
+            syncer.sync_from_json(str(json_path))
+            self._mark_processed(json_path)
+            return True
+        except Exception as e:
+            logger.error("Sync failed for %s: %s", json_path.name, e)
+            return False
+
+    def run_once(self) -> int:
+        """Sync every unprocessed file once; return how many succeeded."""
+        unprocessed = self.find_unprocessed_files()
+        if not unprocessed:
+            logger.info("No unprocessed JSON files found")
+            return 0
+        count = 0
+        for f in unprocessed:
+            if self.sync_file(f):
+                count += 1
+        logger.info("Processed %d/%d files", count, len(unprocessed))
+        return count
+
+    def _sleep_until_next_poll(self) -> None:
+        """Wait ``WATCH_INTERVAL_SECONDS``, returning early once shutdown is requested.
+
+        ``time.sleep`` resumes after a signal handler returns, so one long
+        sleep would delay shutdown by up to a whole interval. Sleeping in
+        slices of at most one second keeps the reaction time short.
+        """
+        deadline = time.monotonic() + WATCH_INTERVAL_SECONDS
+        while self.running:
+            remaining = deadline - time.monotonic()
+            if remaining <= 0:
+                break
+            time.sleep(min(remaining, 1.0))
+
+    def run_watch(self) -> None:
+        """Run an initial sync, then poll for new files until shutdown."""
+        logger.info("Watching %s for new JSON files (interval=%ds)", JSON_DIR, WATCH_INTERVAL_SECONDS)
+        self.run_once()
+        while self.running:
+            self._sleep_until_next_poll()
+            if not self.running:
+                break
+            for f in self.find_unprocessed_files():
+                if not self.running:
+                    break
+                self.sync_file(f)
+        logger.info("Watcher stopped")
+
+
+def main() -> None:
+    """Parse the command line and run the scheduler in the selected mode.
+
+    ``cron`` syncs the currently unprocessed files once and exits with
+    status 1 if any of them failed. ``watch`` and ``daemon`` both run the
+    watch loop, which starts with an initial sync.
+    """
+    parser = argparse.ArgumentParser(description="APS Sync Scheduler")
+    parser.add_argument("--mode", choices=["watch", "cron", "daemon"], default="watch",
+                        help="watch=file watcher, cron=one-shot, daemon=watch with initial sync")
+    parser.add_argument("--host", default=DB_CONFIG["host"])
+    parser.add_argument("--port", type=int, default=DB_CONFIG["port"])
+    parser.add_argument("--user", default=DB_CONFIG["user"])
+    # NOTE(review): a password given on the command line is visible in the
+    # process list; prefer the DB_CONFIG default where possible.
+    parser.add_argument("--password", default=DB_CONFIG["password"])
+    parser.add_argument("--database", default=DB_CONFIG["database"])
+    parser.add_argument("--interval", type=int, default=WATCH_INTERVAL_SECONDS,
+                        help="Watch interval in seconds")
+    args = parser.parse_args()
+
+    # Zero would busy-loop the watcher and time.sleep() rejects negatives.
+    if args.interval <= 0:
+        parser.error("--interval must be a positive number of seconds")
+
+    _update_watch_interval(args.interval)
+
+    config = {
+        "host": args.host, "port": args.port, "user": args.user,
+        "password": args.password, "database": args.database, "charset": "utf8mb4",
+    }
+    scheduler = SyncScheduler(db_config=config)
+
+    if args.mode == "cron":
+        # run_once() only reports successes, and that number is never
+        # negative, so compare it with the amount of pending work to give
+        # cron a failing exit status when a sync did not go through.
+        pending = len(scheduler.find_unprocessed_files())
+        synced = scheduler.run_once()
+        sys.exit(0 if synced >= pending else 1)
+    else:
+        scheduler.run_watch()
+
+
+# Start the scheduler only when this file is executed as a script.
+if __name__ == "__main__":
+ main()
diff --git a/aliyun-sync/aps-aliyun-sync/check_db.py b/aliyun-sync/aps-aliyun-sync/check_db.py
new file mode 100644
index 0000000..ad9a07b
--- /dev/null
+++ b/aliyun-sync/aps-aliyun-sync/check_db.py
@@ -0,0 +1,27 @@
+"""Ad-hoc check: print a few rows of aps_customer and aps_customer_snapshot."""
+import os
+
+import pymysql
+
+# Connection settings are read from the environment so that no host, user or
+# password is kept in version control. The credentials that used to be
+# hard-coded here are in the repository history and should be rotated.
+conn = pymysql.connect(
+    host=os.environ["APS_DB_HOST"],
+    port=int(os.environ.get("APS_DB_PORT", "3306")),
+    user=os.environ["APS_DB_USER"],
+    password=os.environ["APS_DB_PASSWORD"],
+    database=os.environ["APS_DB_NAME"],
+    charset='utf8mb4',
+)
+try:
+    with conn.cursor() as cur:
+        # Customer detail columns
+        cur.execute('SELECT account_id, customer_name, phone, department FROM aps_customer LIMIT 5')
+        rows = cur.fetchall()
+        print(f'aps_customer ({len(rows)} rows):')
+        for r in rows:
+            print(f'  {r}')
+
+        # Snapshot amount columns
+        cur.execute('SELECT account_id, snapshot_month, last_month_payable_total_cny, current_month_payable_total_cny FROM aps_customer_snapshot LIMIT 5')
+        rows2 = cur.fetchall()
+        print(f'\naps_customer_snapshot ({len(rows2)} rows):')
+        for r in rows2:
+            print(f'  {r}')
+finally:
+    # Close the connection even when one of the queries fails.
+    conn.close()