From aa67b0e37e054aedce8840e8930c19a5bf2b0e00 Mon Sep 17 00:00:00 2001 From: ray <1416431931@qq.com> Date: Mon, 13 Apr 2026 18:09:52 +0800 Subject: [PATCH] =?UTF-8?q?=E9=A1=B9=E7=9B=AE=E5=88=9D=E5=A7=8B=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .idea/.gitignore | 5 + .idea/misc.xml | 6 + .idea/modules.xml | 8 + .idea/python.iml | 9 + .idea/vcs.xml | 6 + aliyun-sync/aliyun-aps-sync/.env.example | 6 + aliyun-sync/aliyun-aps-sync/.gitignore | 5 + aliyun-sync/aliyun-aps-sync/README.md | 57 + aliyun-sync/aliyun-aps-sync/package-lock.json | 92 ++ aliyun-sync/aliyun-aps-sync/package.json | 18 + aliyun-sync/aliyun-aps-sync/src/config.js | 188 +++ .../aliyun-aps-sync/src/debug-inputs.js | 91 ++ .../aliyun-aps-sync/src/debug-pagination.js | 135 +++ .../aliyun-aps-sync/src/debug-panel.js | 91 ++ aliyun-sync/aliyun-aps-sync/src/index.js | 27 + aliyun-sync/aliyun-aps-sync/src/notify.js | 56 + aliyun-sync/aliyun-aps-sync/src/open-page.js | 25 + aliyun-sync/aliyun-aps-sync/src/storage.js | 80 ++ aliyun-sync/aliyun-aps-sync/src/sync.js | 824 +++++++++++++ aliyun-sync/aps-aliyun-sync/aps_db_sync.py | 1073 +++++++++++++++++ aliyun-sync/aps-aliyun-sync/aps_scheduler.py | 130 ++ aliyun-sync/aps-aliyun-sync/check_db.py | 27 + 22 files changed, 2959 insertions(+) create mode 100644 .idea/.gitignore create mode 100644 .idea/misc.xml create mode 100644 .idea/modules.xml create mode 100644 .idea/python.iml create mode 100644 .idea/vcs.xml create mode 100644 aliyun-sync/aliyun-aps-sync/.env.example create mode 100644 aliyun-sync/aliyun-aps-sync/.gitignore create mode 100644 aliyun-sync/aliyun-aps-sync/README.md create mode 100644 aliyun-sync/aliyun-aps-sync/package-lock.json create mode 100644 aliyun-sync/aliyun-aps-sync/package.json create mode 100644 aliyun-sync/aliyun-aps-sync/src/config.js create mode 100644 aliyun-sync/aliyun-aps-sync/src/debug-inputs.js create mode 100644 aliyun-sync/aliyun-aps-sync/src/debug-pagination.js create mode 100644 aliyun-sync/aliyun-aps-sync/src/debug-panel.js create mode 100644 aliyun-sync/aliyun-aps-sync/src/index.js create mode 100644 aliyun-sync/aliyun-aps-sync/src/notify.js create mode 100644 aliyun-sync/aliyun-aps-sync/src/open-page.js create mode 100644 aliyun-sync/aliyun-aps-sync/src/storage.js create mode 100644 aliyun-sync/aliyun-aps-sync/src/sync.js create mode 100644 aliyun-sync/aps-aliyun-sync/aps_db_sync.py create mode 100644 aliyun-sync/aps-aliyun-sync/aps_scheduler.py create mode 100644 aliyun-sync/aps-aliyun-sync/check_db.py diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..10b731c --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,5 @@ +# 默认忽略的文件 +/shelf/ +/workspace.xml +# 基于编辑器的 HTTP 客户端请求 +/httpRequests/ diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..639900d --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..614b3c1 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/python.iml b/.idea/python.iml new file mode 100644 index 0000000..d6ebd48 --- /dev/null +++ b/.idea/python.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/aliyun-sync/aliyun-aps-sync/.env.example b/aliyun-sync/aliyun-aps-sync/.env.example new file mode 100644 index 0000000..a693edf --- /dev/null +++ b/aliyun-sync/aliyun-aps-sync/.env.example @@ -0,0 +1,6 @@ +ALIYUN_APS_BASE_URL=https://aps.aliyun.com +ALIYUN_APS_HEADLESS=false +ALIYUN_APS_TIMEZONE=Asia/Shanghai +ALIYUN_APS_CRON=0 6 * * * +ALIYUN_APS_ORDER_START_DATE=2024-01-01 +ALIYUN_APS_BILL_START_MONTH=2024-01 diff --git a/aliyun-sync/aliyun-aps-sync/.gitignore b/aliyun-sync/aliyun-aps-sync/.gitignore new file mode 100644 index 0000000..d1b9ba5 --- /dev/null +++ b/aliyun-sync/aliyun-aps-sync/.gitignore @@ -0,0 +1,5 @@ +node_modules/ +.env +.browser/ +data/ +downloads/ diff --git a/aliyun-sync/aliyun-aps-sync/README.md b/aliyun-sync/aliyun-aps-sync/README.md new file mode 100644 index 0000000..aa96930 --- /dev/null +++ b/aliyun-sync/aliyun-aps-sync/README.md @@ -0,0 +1,57 @@ +# aliyun-aps-sync + +用于抓取阿里云伙伴中心里的 `我的客户`、`订单查询`、`账单查询`,并把结果和本地上一次同步结果做增量对比。 + +## 功能 + +- 首次 `login` 用浏览器手动登录,后续复用本地登录态。 +- `sync` 会同步 3 个模块的数据。 +- 同步后会生成: + - `data/current/*.json` 当前全量 + - `data/history//*.json` 每次快照 + - `data/delta//*.json` 增量变化 + - `data/runs/*.json` 每次任务汇总 +- `schedule` 支持常驻进程方式按 cron 表达式每天自动同步。 + +## 安装 + +```bash +cd /Users/qiangredhad/aliyun-aps-sync +npm install +cp .env.example .env +``` + +## 配置 + +`.env` 里最重要的两个时间范围: + +- `ALIYUN_APS_ORDER_START_DATE`: 订单查询的起始日期,会按月滚动抓取直到今天。 +- `ALIYUN_APS_BILL_START_MONTH`: 账单查询的起始佣金月份,会按月滚动抓取直到当前月。 + +## 使用 + +1. 首次登录并保存会话 + +```bash +npm run login +``` + +2. 手动执行一次同步 + +```bash +npm run sync +``` + +3. 常驻定时同步 + +```bash +npm run schedule +``` + +默认 cron 是每天早上 6 点,可在 `.env` 里改 `ALIYUN_APS_CRON`。 + +## 注意 + +- 脚本现在基于页面表格 DOM 抓取,如果阿里云伙伴中心页面结构改版,需要调整 `src/sync.js` 里的表格和筛选器选择逻辑。 +- 订单和账单的日期输入框是通过页面已有日期值自动识别的,所以首次跑之前建议先在页面确认默认筛选存在。 +- 如果登录态过期,重新执行 `npm run login` 即可。 diff --git a/aliyun-sync/aliyun-aps-sync/package-lock.json b/aliyun-sync/aliyun-aps-sync/package-lock.json new file mode 100644 index 0000000..f02444a --- /dev/null +++ b/aliyun-sync/aliyun-aps-sync/package-lock.json @@ -0,0 +1,92 @@ +{ + "name": "aliyun-aps-sync", + "version": "1.0.0", + "lockfileVersion": 3, + "requires": true, + "packages": { + "": { + "name": "aliyun-aps-sync", + "version": "1.0.0", + "dependencies": { + "dotenv": "^16.6.1", + "node-cron": "^4.2.1", + "nodemailer": "^6.10.1", + "playwright": "^1.58.2" + } + }, + "node_modules/dotenv": { + "version": "16.6.1", + "resolved": "https://registry.npmmirror.com/dotenv/-/dotenv-16.6.1.tgz", + "integrity": "sha512-uBq4egWHTcTt33a72vpSG0z3HnPuIl6NqYcTrKEg2azoEyl2hpW0zqlxysq2pK9HlDIHyHyakeYaYnSAwd8bow==", + "license": "BSD-2-Clause", + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://dotenvx.com" + } + }, + "node_modules/fsevents": { + "version": "2.3.2", + "resolved": "https://registry.npmmirror.com/fsevents/-/fsevents-2.3.2.tgz", + "integrity": "sha512-xiqMQR4xAeHTuB9uWm+fFRcIOgKBMiOBP+eXiyT7jsgVCq1bkVygt00oASowB7EdtpOHaaPgKt812P9ab+DDKA==", + "hasInstallScript": true, + "license": "MIT", + "optional": true, + "os": [ + "darwin" + ], + "engines": { + "node": "^8.16.0 || ^10.6.0 || >=11.0.0" + } + }, + "node_modules/node-cron": { + "version": "4.2.1", + "resolved": "https://registry.npmmirror.com/node-cron/-/node-cron-4.2.1.tgz", + "integrity": "sha512-lgimEHPE/QDgFlywTd8yTR61ptugX3Qer29efeyWw2rv259HtGBNn1vZVmp8lB9uo9wC0t/AT4iGqXxia+CJFg==", + "license": "ISC", + "engines": { + "node": ">=6.0.0" + } + }, + "node_modules/nodemailer": { + "version": "6.10.1", + "resolved": "https://registry.npmjs.org/nodemailer/-/nodemailer-6.10.1.tgz", + "integrity": "sha512-Z+iLaBGVaSjbIzQ4pX6XV41HrooLsQ10ZWPUehGmuantvzWoDVBnmsdUcOIDM1t+yPor5pDhVlDESgOMEGxhHA==", + "license": "MIT-0", + "engines": { + "node": ">=6.0.0" + } + }, + "node_modules/playwright": { + "version": "1.58.2", + "resolved": "https://registry.npmmirror.com/playwright/-/playwright-1.58.2.tgz", + "integrity": "sha512-vA30H8Nvkq/cPBnNw4Q8TWz1EJyqgpuinBcHET0YVJVFldr8JDNiU9LaWAE1KqSkRYazuaBhTpB5ZzShOezQ6A==", + "license": "Apache-2.0", + "dependencies": { + "playwright-core": "1.58.2" + }, + "bin": { + "playwright": "cli.js" + }, + "engines": { + "node": ">=18" + }, + "optionalDependencies": { + "fsevents": "2.3.2" + } + }, + "node_modules/playwright-core": { + "version": "1.58.2", + "resolved": "https://registry.npmmirror.com/playwright-core/-/playwright-core-1.58.2.tgz", + "integrity": "sha512-yZkEtftgwS8CsfYo7nm0KE8jsvm6i/PTgVtB8DL726wNf6H2IMsDuxCpJj59KDaxCtSnrWan2AeDqM7JBaultg==", + "license": "Apache-2.0", + "bin": { + "playwright-core": "cli.js" + }, + "engines": { + "node": ">=18" + } + } + } +} diff --git a/aliyun-sync/aliyun-aps-sync/package.json b/aliyun-sync/aliyun-aps-sync/package.json new file mode 100644 index 0000000..3b3f5a8 --- /dev/null +++ b/aliyun-sync/aliyun-aps-sync/package.json @@ -0,0 +1,18 @@ +{ + "name": "aliyun-aps-sync", + "version": "1.0.0", + "private": true, + "type": "module", + "scripts": { + "login": "node src/index.js login", + "sync": "node src/index.js sync", + "bills": "node src/index.js bills", + "schedule": "node src/index.js schedule" + }, + "dependencies": { + "dotenv": "^16.6.1", + "nodemailer": "^6.10.1", + "node-cron": "^4.2.1", + "playwright": "^1.58.2" + } +} diff --git a/aliyun-sync/aliyun-aps-sync/src/config.js b/aliyun-sync/aliyun-aps-sync/src/config.js new file mode 100644 index 0000000..2d642e7 --- /dev/null +++ b/aliyun-sync/aliyun-aps-sync/src/config.js @@ -0,0 +1,188 @@ +import fs from 'node:fs'; +import path from 'node:path'; +import process from 'node:process'; +import dotenv from 'dotenv'; + +const rootDir = path.resolve(process.cwd()); +dotenv.config({ path: path.join(rootDir, '.env') }); + +const toBool = (value, fallback) => { + if (value == null) return fallback; + return ['1', 'true', 'yes', 'y', 'on'].includes(String(value).trim().toLowerCase()); +}; + +const ensureDir = (dirPath) => { + fs.mkdirSync(dirPath, { recursive: true }); + return dirPath; +}; + +export const config = { + rootDir, + baseUrl: process.env.ALIYUN_APS_BASE_URL || 'https://aps.aliyun.com', + headless: toBool(process.env.ALIYUN_APS_HEADLESS, false), + timezone: process.env.ALIYUN_APS_TIMEZONE || 'Asia/Shanghai', + cron: process.env.ALIYUN_APS_CRON || '0 6 * * *', + orderStartDate: process.env.ALIYUN_APS_ORDER_START_DATE || '2024-01-01', + billStartMonth: process.env.ALIYUN_APS_BILL_START_MONTH || '2024-01', + smtp: { + host: process.env.ALIYUN_APS_SMTP_HOST || 'smtp.qq.com', + port: parseInt(process.env.ALIYUN_APS_SMTP_PORT || '465', 10), + secure: toBool(process.env.ALIYUN_APS_SMTP_SECURE, true), + user: process.env.ALIYUN_APS_SMTP_USER || '', + pass: process.env.ALIYUN_APS_SMTP_PASS || '', + }, + notifyEmail: process.env.ALIYUN_APS_NOTIFY_EMAIL || '', + closeBrowser: toBool(process.env.ALIYUN_APS_CLOSE_BROWSER, true), + fullSync: toBool(process.env.ALIYUN_APS_FULL_SYNC, true), + resumeBillMonth: process.env.ALIYUN_APS_RESUME_BILL_MONTH || '', + resumeBillPage: Math.max(1, Number.parseInt(process.env.ALIYUN_APS_RESUME_BILL_PAGE || '1', 10) || 1), + dbSyncScript: process.env.ALIYUN_APS_DB_SYNC_SCRIPT || '../aps-aliyun-sync/aps_db_sync.py', + userDataDir: ensureDir(path.join(rootDir, '.browser')), + dataDir: ensureDir(path.join(rootDir, 'data')), + downloadDir: ensureDir(path.join(rootDir, 'downloads')), +}; + +export const datasets = { + customers: { + name: 'customers', + url: `${config.baseUrl}/#/detail/my_customer/~/customer/list`, + heading: '我的客户', + pageSize: 100, + uniqueKey: (record) => record.accountId || record.loginName || record.__hash, + normalize: (record) => { + const loginAndUid = record['登录名称/账号ID'] || ''; + const [loginName = '', accountId = ''] = splitLines(loginAndUid); + return { + loginName: loginName.replace(/\s+/g, ''), + accountId, + realName: record['UID实名认证名称'] || '', + reportSource: record['报备来源'] || '', + reportType: record['报备类型'] || '', + tradeMode: record['交易模式'] || '', + authStatus: record['实名认证状态'] || '', + relationTime: record['关联日期'] || '', + owner: record['跟进员工'] || '', + cashBalanceCny: record['账户现金余额(CNY)'] || '', + invoicePendingCny: record['待开票金额(CNY)'] || '', + lastMonthConsumptionCny: record['上月消费金额(CNY)'] || '', + thisMonthConsumptionCny: record['本月消费金额(CNY)'] || '', + paymentNoticeStatus: record['代为支付告知状态'] || '', + inviteType: record['邀约注册类型'] || '', + isNewCustomer: record['是否新客户'] || '', + isPerformanceQualified: record['是否达成业绩起算点'] || '', + customerCategory: record['客户分类'] || '', + remark: record['备注'] || '', + inactiveMonths: record['客户无消费月数'] || '', + releasePlanTime: record['计划释放时间'] || '', + releasePlanReason: record['计划释放原因'] || '', + }; + }, + }, + orders: { + name: 'orders', + url: `${config.baseUrl}/#/detail/order/~/costCenter/order`, + heading: '订单查询', + pageSize: 100, + uniqueKey: (record) => record.orderId || record.__hash, + normalize: (record, context) => ({ + orderId: record['订单号'] || '', + customerAccount: (record['客户账号'] || '').replace(/\s+/g, ''), + customerCategory: record['客户分类'] || '', + orderType: record['订单类型'] || '', + orderOriginalPriceCny: record['订单原价 (CNY)'] || '', + actualPaidCny: record['实付金额 (CNY)'] || '', + orderStatus: record['订单状态'] || '', + createdAt: record['下单时间'] || '', + windowStart: context.windowStart || '', + windowEnd: context.windowEnd || '', + }), + }, + orderDetails: { + name: 'orderDetails', + url: `${config.baseUrl}/#/detail/order/~/costCenter/order`, + heading: '订单详情', + pageSize: 100, + uniqueKey: (record) => record.orderId || record.__hash, + normalize: (record, context) => ({ + orderId: record.orderId || '', + orderType: record.orderType || '', + status: record.status || '', + tradeType: record.tradeType || '', + customerCategory: record.customerCategory || '', + dealerName: record.dealerName || '', + dealerUid: record.dealerUid || '', + customerType: record.customerType || '', + opportunityId: record.opportunityId || '', + paymentTime: record.paymentTime || '', + orderTime: record.orderTime || '', + productName: record.productName || '', + productCode: record.productCode || '', + originalPriceCny: record.originalPriceCny || '', + paidAmountCny: record.paidAmountCny || '', + discount: record.discount || '', + payableAmountCny: record.payableAmountCny || '', + couponAmountCny: record.couponAmountCny || '', + windowStart: context.windowStart || '', + windowEnd: context.windowEnd || '', + }), + }, + customerDetails: { + name: 'customerDetails', + url: `${config.baseUrl}/#/detail/my_customer/~/customer/list`, + heading: '详情', + pageSize: 100, + uniqueKey: (record) => record.accountId || record.__hash, + normalize: (record, context) => ({ + accountId: context.accountId || '', + customerAccount: record.customerAccount || '', + customerName: record.customerName || '', + customerType: record.customerType || '', + tradeMode: record.tradeMode || '', + customerSource: record.customerSource || '', + realNameStatus: record.realNameStatus || '', + email: record.email || '', + relationDate: record.relationDate || '', + phone: record.phone || '', + remark: record.remark || '', + paymentNoticeStatus: record.paymentNoticeStatus || '', + department: record.department || '', + lastMonthPayableTotalCny: record.lastMonthPayableTotalCny || '', + lastMonthPrepayCny: record.lastMonthPrepayCny || '', + lastMonthPostpayCny: record.lastMonthPostpayCny || '', + currentMonthPayableTotalCny: record.currentMonthPayableTotalCny || '', + currentMonthPrepayCny: record.currentMonthPrepayCny || '', + currentMonthPostpayCny: record.currentMonthPostpayCny || '', + }), + }, + bills: { + name: 'bills', + url: `${config.baseUrl}/#/detail/bill/~/costCenter/bill`, + heading: '账单查询', + pageSize: 100, + uniqueKey: (record) => record.__hash, + normalize: (record, context) => ({ + billingMonth: record['账期'] || '', + consumeDate: record['消费时间'] || '', + customerAccount: (record['客户账号'] || '').replace(/\s+/g, ''), + customerCategory: record['客户分类'] || '', + productCategory: record['产品分类'] || '', + productName: record['产品名称'] || '', + originalPriceCny: record['原价 (CNY)'] || '', + customerPayableCny: record['客户应付金额 (CNY)'] || '', + billType: record['账单类型'] || '', + countsForPerformance: record['是否计入业绩'] || '', + commissionable: record['是否返佣'] || '', + commissionMonth: record['佣金月份'] || context.month || '', + inviteType: record['邀约注册类型'] || '', + serviceStartAt: record['服务开始时间'] || '', + serviceEndAt: record['服务结束时间'] || '', + }), + }, +}; + +function splitLines(value) { + return String(value) + .split('\n') + .map((part) => part.trim()) + .filter(Boolean); +} diff --git a/aliyun-sync/aliyun-aps-sync/src/debug-inputs.js b/aliyun-sync/aliyun-aps-sync/src/debug-inputs.js new file mode 100644 index 0000000..dd33259 --- /dev/null +++ b/aliyun-sync/aliyun-aps-sync/src/debug-inputs.js @@ -0,0 +1,91 @@ +import { chromium } from 'playwright'; +import { config, datasets } from './config.js'; + +const sleep = (ms) => new Promise((r) => setTimeout(r, ms)); + +const target = process.argv[2] || 'orders'; +const dataset = datasets[target]; + +console.log(`正在打开: ${dataset.heading}`); + +const context = await chromium.launchPersistentContext(config.userDataDir, { + channel: 'chrome', + headless: false, + acceptDownloads: true, + downloadsPath: config.downloadDir, +}); + +const page = context.pages()[0] || (await context.newPage()); +await page.goto(dataset.url, { waitUntil: 'domcontentloaded' }); + +// 等页面加载完 +await page.waitForFunction( + (text) => document.body && document.body.innerText.includes(text), + dataset.heading, + { timeout: 60000 }, +); +await sleep(3000); + +// 探测所有 input 元素的详细信息 +const inputInfo = await page.evaluate(() => { + const inputs = document.querySelectorAll('input'); + return Array.from(inputs).map((el, i) => { + // 找到最近的日期选择器父组件 + const pickerParent = el.closest( + '[class*="date-picker"], [class*="month-picker"], [class*="range-picker"], [class*="calendar"]' + ); + return { + index: i, + value: el.value, + placeholder: el.placeholder, + type: el.type, + readOnly: el.readOnly, + className: el.className.substring(0, 120), + parentClass: el.parentElement?.className?.substring(0, 120) || '', + pickerClass: pickerParent?.className?.substring(0, 150) || '(无)', + ariaLabel: el.getAttribute('aria-label') || '', + role: el.getAttribute('role') || '', + }; + }); +}); + +console.log('\n=== 页面所有 input 元素 ==='); +for (const info of inputInfo) { + console.log(`\n[input ${info.index}]`); + console.log(` value: "${info.value}"`); + console.log(` placeholder: "${info.placeholder}"`); + console.log(` type: ${info.type}`); + console.log(` readOnly: ${info.readOnly}`); + console.log(` className: ${info.className}`); + console.log(` parentClass: ${info.parentClass}`); + console.log(` pickerClass: ${info.pickerClass}`); + console.log(` ariaLabel: "${info.ariaLabel}"`); +} + +// 探测 RangePicker 结构 +const rangeInfo = await page.evaluate(() => { + const rangePickers = document.querySelectorAll('[class*="range-picker"], [class*="date-picker2"]'); + return Array.from(rangePickers).map((el, i) => ({ + index: i, + className: el.className.substring(0, 200), + inputCount: el.querySelectorAll('input').length, + inputs: Array.from(el.querySelectorAll('input')).map(inp => ({ + value: inp.value, + placeholder: inp.placeholder, + readOnly: inp.readOnly, + })), + })); +}); + +console.log('\n=== RangePicker / DatePicker2 组件 ==='); +for (const rp of rangeInfo) { + console.log(`\n[picker ${rp.index}]`); + console.log(` className: ${rp.className}`); + console.log(` inputs (${rp.inputCount}):`); + for (const inp of rp.inputs) { + console.log(` value="${inp.value}" placeholder="${inp.placeholder}" readOnly=${inp.readOnly}`); + } +} + +await context.close(); +process.exit(0); diff --git a/aliyun-sync/aliyun-aps-sync/src/debug-pagination.js b/aliyun-sync/aliyun-aps-sync/src/debug-pagination.js new file mode 100644 index 0000000..f9ca9a4 --- /dev/null +++ b/aliyun-sync/aliyun-aps-sync/src/debug-pagination.js @@ -0,0 +1,135 @@ +import { chromium } from 'playwright'; +import { config, datasets } from './config.js'; + +const sleep = (ms) => new Promise((r) => setTimeout(r, ms)); + +console.log('打开订单页面,设置2024-11月,探测分页结构...'); + +const context = await chromium.launchPersistentContext(config.userDataDir, { + channel: 'chrome', + headless: false, + acceptDownloads: true, + downloadsPath: config.downloadDir, +}); + +const page = context.pages()[0] || (await context.newPage()); +await page.goto(datasets.orders.url, { waitUntil: 'domcontentloaded' }); + +await page.waitForFunction( + (text) => document.body && document.body.innerText.includes(text), + datasets.orders.heading, + { timeout: 60000 }, +); +await sleep(3000); + +// 设置日期到2024-11月(数据多的月份) +const trigger = page.locator('input[placeholder="结束日期"]'); +await trigger.click(); +await sleep(1000); +const panelEndInput = page.locator('.next-range-picker-panel-input-end-date input'); +await panelEndInput.click(); +await sleep(100); +await page.keyboard.press('Control+A'); +await page.keyboard.type('2024-11-30', { delay: 30 }); +await sleep(300); +const panelStartInput = page.locator('.next-range-picker-panel-input-start-date input'); +await panelStartInput.click(); +await sleep(100); +await page.keyboard.press('Control+A'); +await page.keyboard.type('2024-11-01', { delay: 30 }); +await sleep(300); +await page.keyboard.press('Enter'); +await sleep(500); +await page.mouse.click(0, 0); +await sleep(300); +await page.keyboard.press('Escape'); +await sleep(1000); + +// 点查询 +const queryBtn = page.locator('button:has-text("查询")').first(); +await queryBtn.click(); +await sleep(3000); + +// 探测分页组件 +const paginationInfo = await page.evaluate(() => { + const result = { + // 找所有 pagination 相关的元素 + paginationContainers: [], + allButtons: [], + nextItems: [], + }; + + // 所有 pagination 容器 + const containers = document.querySelectorAll('[class*="pagination"]'); + containers.forEach((el, i) => { + result.paginationContainers.push({ + index: i, + tag: el.tagName, + className: el.className.substring(0, 200), + childCount: el.children.length, + innerHTML: el.innerHTML.substring(0, 500), + }); + }); + + // 所有带 next-btn 类名的按钮(在 pagination 内) + const btns = document.querySelectorAll('[class*="pagination"] button, [class*="pagination"] [role="button"], [class*="pagination"] .next-btn'); + btns.forEach((el, i) => { + result.allButtons.push({ + index: i, + tag: el.tagName, + text: el.innerText?.trim()?.substring(0, 50) || '', + className: el.className.substring(0, 200), + disabled: el.hasAttribute('disabled') || el.getAttribute('aria-disabled') === 'true', + ariaLabel: el.getAttribute('aria-label') || '', + }); + }); + + // 找 next-next 类 + const nextNextEls = document.querySelectorAll('.next-next, [class*="next-next"]'); + nextNextEls.forEach((el, i) => { + result.nextItems.push({ + index: i, + tag: el.tagName, + text: el.innerText?.trim()?.substring(0, 50) || '', + className: el.className.substring(0, 200), + disabled: el.hasAttribute('disabled'), + parentClass: el.parentElement?.className?.substring(0, 100) || '', + }); + }); + + return result; +}); + +console.log('\n=== Pagination 容器 ==='); +for (const c of paginationInfo.paginationContainers) { + console.log(`\n[容器 ${c.index}] <${c.tag}> children=${c.childCount}`); + console.log(` class: ${c.className}`); + console.log(` html: ${c.innerHTML.substring(0, 300)}`); +} + +console.log('\n=== Pagination 内的按钮 ==='); +for (const b of paginationInfo.allButtons) { + console.log(`[btn ${b.index}] <${b.tag}> text="${b.text}" disabled=${b.disabled} aria="${b.ariaLabel}" class="${b.className}"`); +} + +console.log('\n=== next-next 元素 ==='); +for (const n of paginationInfo.nextItems) { + console.log(`[next ${n.index}] <${n.tag}> text="${n.text}" disabled=${n.disabled} class="${n.className}" parent="${n.parentClass}"`); +} + +// 再看看当前页码 +const currentPage = await page.evaluate(() => { + const active = document.querySelector('.next-pagination-item.current, .next-pagination-list .current'); + return active ? { text: active.innerText, class: active.className } : null; +}); +console.log('\n当前页码:', currentPage); + +// 看看总共多少条记录的提示 +const totalInfo = await page.evaluate(() => { + const el = document.querySelector('[class*="pagination"]'); + return el ? el.innerText : '(无)'; +}); +console.log('分页文本:', totalInfo); + +await context.close(); +process.exit(0); diff --git a/aliyun-sync/aliyun-aps-sync/src/debug-panel.js b/aliyun-sync/aliyun-aps-sync/src/debug-panel.js new file mode 100644 index 0000000..f048698 --- /dev/null +++ b/aliyun-sync/aliyun-aps-sync/src/debug-panel.js @@ -0,0 +1,91 @@ +import { chromium } from 'playwright'; +import { config, datasets } from './config.js'; + +const sleep = (ms) => new Promise((r) => setTimeout(r, ms)); + +console.log('打开订单查询页面并探测 RangePicker 面板...'); + +const context = await chromium.launchPersistentContext(config.userDataDir, { + channel: 'chrome', + headless: false, + acceptDownloads: true, + downloadsPath: config.downloadDir, +}); + +const page = context.pages()[0] || (await context.newPage()); +await page.goto(datasets.orders.url, { waitUntil: 'domcontentloaded' }); + +await page.waitForFunction( + (text) => document.body && document.body.innerText.includes(text), + datasets.orders.heading, + { timeout: 60000 }, +); +await sleep(3000); + +// 点击起始日期 trigger 打开面板 +console.log('--- 点击起始日期触发器 ---'); +const startTrigger = page.locator('input[placeholder="起始日期"]'); +await startTrigger.click(); +await sleep(2000); + +// 探测弹出面板里的 input +const panelInfo = await page.evaluate(() => { + // 查找面板(弹出层里的日期输入) + const allInputs = document.querySelectorAll('input'); + const results = []; + for (let i = 0; i < allInputs.length; i++) { + const el = allInputs[i]; + // 找 panel 里的 input(不是 trigger 里的) + const inPanel = el.closest('.next-range-picker-panel, .next-date-picker-panel, .next-range-picker-body, [class*="overlay"], [class*="popup"]'); + results.push({ + index: i, + value: el.value, + placeholder: el.placeholder, + readOnly: el.readOnly, + parentClass: el.parentElement?.className?.substring(0, 150) || '', + inPanel: !!inPanel, + panelClass: inPanel?.className?.substring(0, 150) || '(不在面板内)', + }); + } + return results; +}); + +console.log('\n=== 面板打开后所有 input ==='); +for (const info of panelInfo) { + const marker = info.inPanel ? '📌 面板内' : ' 普通'; + console.log(`${marker} [input ${info.index}] value="${info.value}" placeholder="${info.placeholder}" readOnly=${info.readOnly}`); + if (info.inPanel) { + console.log(` parentClass: ${info.parentClass}`); + console.log(` panelClass: ${info.panelClass}`); + } +} + +// 再看看有没有 panel 级别的面板结构 +const panelStructure = await page.evaluate(() => { + const overlays = document.querySelectorAll('.next-overlay-wrapper, .next-overlay-inner, [class*="range-picker-panel"], [class*="range-picker-body"]'); + return Array.from(overlays).map((el, i) => ({ + index: i, + tag: el.tagName, + className: el.className.substring(0, 200), + inputCount: el.querySelectorAll('input').length, + inputs: Array.from(el.querySelectorAll('input')).map(inp => ({ + value: inp.value, + placeholder: inp.placeholder, + readOnly: inp.readOnly, + parentClass: inp.parentElement?.className?.substring(0, 120) || '', + })), + })); +}); + +console.log('\n=== 弹出面板结构 ==='); +for (const p of panelStructure) { + console.log(`\n[overlay ${p.index}] <${p.tag}>`); + console.log(` className: ${p.className}`); + console.log(` inputs (${p.inputCount}):`); + for (const inp of p.inputs) { + console.log(` value="${inp.value}" placeholder="${inp.placeholder}" readOnly=${inp.readOnly} parent="${inp.parentClass}"`); + } +} + +await context.close(); +process.exit(0); diff --git a/aliyun-sync/aliyun-aps-sync/src/index.js b/aliyun-sync/aliyun-aps-sync/src/index.js new file mode 100644 index 0000000..f98193d --- /dev/null +++ b/aliyun-sync/aliyun-aps-sync/src/index.js @@ -0,0 +1,27 @@ +import { login, scheduleSync, syncAll, syncBillsOnly } from './sync.js'; + +const command = process.argv[2] || 'sync'; + +if (command === 'login') { + await login(); + process.exit(0); +} + +if (command === 'sync') { + const summary = await syncAll(); + console.log(JSON.stringify(summary, null, 2)); + process.exit(0); +} + +if (command === 'bills') { + const summary = await syncBillsOnly(); + console.log(JSON.stringify(summary, null, 2)); + process.exit(0); +} + +if (command === 'schedule') { + await scheduleSync(); +} else { + console.error(`不支持的命令: ${command}`); + process.exit(1); +} diff --git a/aliyun-sync/aliyun-aps-sync/src/notify.js b/aliyun-sync/aliyun-aps-sync/src/notify.js new file mode 100644 index 0000000..79a1a37 --- /dev/null +++ b/aliyun-sync/aliyun-aps-sync/src/notify.js @@ -0,0 +1,56 @@ +import nodemailer from 'nodemailer'; +import { config } from './config.js'; + +let lastSentAt = 0; +const ONE_HOUR_MS = 60 * 60 * 1000; + +export async function sendLoginAlert() { + const now = Date.now(); + if (now - lastSentAt < ONE_HOUR_MS) { + console.log('[通知] 登录提醒1小时内已发送过,跳过重复发送'); + return; + } + + const { smtp, notifyEmail, baseUrl } = config; + + if (!smtp.user || !smtp.pass) { + console.warn('[通知] SMTP 用户名或密码为空,跳过发送登录提醒邮件'); + return; + } + + if (!notifyEmail) { + console.warn('[通知] 未配置 ALIYUN_APS_NOTIFY_EMAIL,跳过发送登录提醒邮件'); + return; + } + + const transporter = nodemailer.createTransport({ + host: smtp.host, + port: smtp.port, + secure: smtp.secure, + auth: { + user: smtp.user, + pass: smtp.pass, + }, + }); + + const url = `${baseUrl}/#/signin`; + const timestamp = new Date().toISOString(); + const subject = '[APS同步] 登录态已过期,请手动登录'; + const text = [ + '检测到 APS 页面处于登录页,登录态可能已过期。', + `时间: ${timestamp}`, + `地址: ${url}`, + '', + '请尽快手动执行登录流程(npm run login)以恢复同步。', + ].join('\n'); + + await transporter.sendMail({ + from: smtp.user, + to: notifyEmail, + subject, + text, + }); + + lastSentAt = now; + console.log(`[通知] 登录提醒邮件已发送至 ${notifyEmail}`); +} diff --git a/aliyun-sync/aliyun-aps-sync/src/open-page.js b/aliyun-sync/aliyun-aps-sync/src/open-page.js new file mode 100644 index 0000000..5c3e532 --- /dev/null +++ b/aliyun-sync/aliyun-aps-sync/src/open-page.js @@ -0,0 +1,25 @@ +import { chromium } from 'playwright'; +import { config, datasets } from './config.js'; + +const target = process.argv[2] || 'orders'; // orders | bills | customers +const dataset = datasets[target]; +if (!dataset) { + console.error(`未知页面: ${target},可选: orders, bills, customers`); + process.exit(1); +} + +console.log(`正在打开: ${dataset.heading} (${dataset.url})`); + +const context = await chromium.launchPersistentContext(config.userDataDir, { + channel: 'chrome', + headless: false, + acceptDownloads: true, + downloadsPath: config.downloadDir, +}); + +const page = context.pages()[0] || (await context.newPage()); +await page.goto(dataset.url, { waitUntil: 'domcontentloaded' }); +console.log('页面已打开,浏览器保持运行。按 Ctrl+C 关闭。'); + +// 保持进程运行 +await new Promise(() => {}); diff --git a/aliyun-sync/aliyun-aps-sync/src/storage.js b/aliyun-sync/aliyun-aps-sync/src/storage.js new file mode 100644 index 0000000..5cc1855 --- /dev/null +++ b/aliyun-sync/aliyun-aps-sync/src/storage.js @@ -0,0 +1,80 @@ +import crypto from 'node:crypto'; +import fs from 'node:fs'; +import path from 'node:path'; +import { config } from './config.js'; + +const ensureDir = (dirPath) => { + fs.mkdirSync(dirPath, { recursive: true }); + return dirPath; +}; + +const readJson = (filePath, fallback) => { + if (!fs.existsSync(filePath)) return fallback; + return JSON.parse(fs.readFileSync(filePath, 'utf8')); +}; + +const writeJson = (filePath, value) => { + ensureDir(path.dirname(filePath)); + fs.writeFileSync(filePath, JSON.stringify(value, null, 2)); +}; + +export const nowStamp = () => new Date().toISOString().replace(/[:.]/g, '-'); + +export const withHash = (record) => ({ + ...record, + __hash: crypto.createHash('sha256').update(JSON.stringify(record)).digest('hex'), +}); + +export const loadCurrentState = (dataset) => + readJson(path.join(config.dataDir, 'current', `${dataset}.json`), { records: [], index: {} }); + +export function saveDatasetRun(dataset, payload) { + const stamp = nowStamp(); + writeJson(path.join(config.dataDir, 'history', dataset, `${stamp}.json`), payload); + writeJson(path.join(config.dataDir, 'current', `${dataset}.json`), payload); + return stamp; +} + +export function saveDelta(dataset, stamp, delta) { + writeJson(path.join(config.dataDir, 'delta', dataset, `${stamp}.json`), delta); +} + +export function saveRunSummary(stamp, summary) { + writeJson(path.join(config.dataDir, 'runs', `${stamp}.json`), summary); +} + +export function diffRecords(previousState, records, uniqueKey) { + const previousIndex = previousState.index || {}; + const nextIndex = {}; + const added = []; + const updated = []; + + for (const record of records) { + const key = uniqueKey(record); + if (!key) continue; + nextIndex[key] = record; + if (!previousIndex[key]) { + added.push(record); + continue; + } + if (previousIndex[key].__hash !== record.__hash) { + updated.push({ before: previousIndex[key], after: record }); + } + } + + const removed = Object.keys(previousIndex) + .filter((key) => !nextIndex[key]) + .map((key) => previousIndex[key]); + + return { + records, + index: nextIndex, + stats: { + total: records.length, + added: added.length, + updated: updated.length, + removed: removed.length, + }, + delta: { added, updated, removed }, + }; +} diff --git a/aliyun-sync/aliyun-aps-sync/src/sync.js b/aliyun-sync/aliyun-aps-sync/src/sync.js new file mode 100644 index 0000000..5a82c29 --- /dev/null +++ b/aliyun-sync/aliyun-aps-sync/src/sync.js @@ -0,0 +1,824 @@ +import { chromium } from 'playwright'; +import cron from 'node-cron'; +import path from 'node:path'; +import { execSync } from 'node:child_process'; +import { config, datasets } from './config.js'; +import { sendLoginAlert } from './notify.js'; +import { + diffRecords, + loadCurrentState, + nowStamp, + saveDatasetRun, + saveDelta, + saveRunSummary, + withHash, +} from './storage.js'; + +const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)); + +let _context = null; + +async function getContext() { + if (_context) return _context; + _context = await chromium.launchPersistentContext(config.userDataDir, { + channel: 'chrome', + headless: config.headless, + acceptDownloads: true, + downloadsPath: config.downloadDir, + }); + return _context; +} + +export async function login() { + const context = await getContext(); + + const page = context.pages()[0] || (await context.newPage()); + await page.goto(datasets.customers.url, { waitUntil: 'domcontentloaded' }); + console.log('请在打开的浏览器里完成阿里云伙伴中心登录,然后回到终端按 Ctrl+C 结束。'); + await waitUntilReady(page, datasets.customers.heading, 10 * 60 * 1000); + console.log('登录态已写入 .browser 目录,后续可直接执行 npm run sync。'); + + // 必须正常关闭 context,否则登录态不会持久化到磁盘 + await context.close(); + _context = null; +} + +export async function syncAll() { + const context = await getContext(); + + try { + const summary = { startedAt: new Date().toISOString(), datasets: {} }; + const page = context.pages()[0] || (await context.newPage()); + + summary.datasets.customers = await syncCustomers(page); + summary.datasets.customerDetails = await syncCustomerDetails(page); + summary.datasets.orders = await syncOrders(page); + + // syncOrders 完成后,从最新的 orders.json 读取 orderId 列表 + const latestOrders = loadCurrentState('orders'); + const orderIdsForDetail = collectValidOrderIds(latestOrders.records || []); + + summary.datasets.orderDetails = await syncOrderDetails(page, orderIdsForDetail); + summary.datasets.bills = await syncBills(page); + summary.finishedAt = new Date().toISOString(); + + const stamp = nowStamp(); + saveRunSummary(stamp, summary); + return summary; + } finally { + if (config.closeBrowser) { + await context.close(); + _context = null; + } else { + console.log('浏览器保持运行'); + } + } +} + +export async function scheduleSync() { + console.log(`定时任务已启动: ${config.cron} (${config.timezone})`); + cron.schedule( + config.cron, + async () => { + try { + console.log(`[${new Date().toISOString()}] 开始执行同步`); + const summary = await syncAll(); + console.log(`[${new Date().toISOString()}] 同步完成`, JSON.stringify(summary, null, 2)); + try { + const scriptPath = path.resolve(config.rootDir, config.dbSyncScript); + const incrementalFlag = config.fullSync ? '' : ' --incremental'; + console.log(`[入库] 执行 ${scriptPath}${incrementalFlag ? ' (增量模式)' : ''}`); + const output = execSync(`python "${scriptPath}"${incrementalFlag}`, { + cwd: path.dirname(scriptPath), + encoding: 'utf-8', + timeout: 120000, + }); + console.log(output); + } catch (e) { + console.error('[入库] 失败:', e.message); + } + } catch (error) { + console.error(`[${new Date().toISOString()}] 同步失败`, error); + } + }, + { timezone: config.timezone }, + ); +} + +async function syncCustomers(page) { + const dataset = datasets.customers; + await page.goto(dataset.url, { waitUntil: 'domcontentloaded' }); + await waitUntilReady(page, dataset.heading); + await trySetPageSize(page, dataset.pageSize); + const records = await scrapePagedTable(page, dataset, {}); + return persistDataset(dataset, records, {}); +} + +async function syncCustomerDetails(page) { + const dataset = datasets.customerDetails; + const customersState = loadCurrentState('customers'); + const allAccountIds = collectValidAccountIds(customersState.records || []); + + if (allAccountIds.length === 0) { + console.log('[客户详情] 本地无有效客户 accountId,跳过'); + return persistDataset(dataset, [], {}); + } + + console.log(`[客户详情] 共 ${allAccountIds.length} 个客户需要获取详情`); + const allDetails = []; + const detailBaseUrl = + 'https://aps.aliyun.com/?spm=5176.12818093.top-nav.ditem-fx.785716d0LKDpKT#/detail/my_customer/~/customer/'; + + for (let index = 0; index < allAccountIds.length; index += 1) { + const accountId = allAccountIds[index]; + console.log(`[客户详情] ${index + 1}/${allAccountIds.length} accountId=${accountId}`); + + // 先跳 about:blank 再跳详情URL(强制 SPA 完整重新加载) + await page.goto('about:blank'); + await sleep(300); + await page.goto(`${detailBaseUrl}${accountId}`, { waitUntil: 'domcontentloaded' }); + + try { + await page.waitForFunction( + (text) => document.body && document.body.innerText.includes(text), + '详情', + { timeout: 15000 }, + ); + await sleep(1000); + } catch { + console.warn(`[客户详情] ${accountId} 详情页加载超时,跳过`); + continue; + } + + const detail = await extractCustomerDetail(page); + allDetails.push({ ...detail, __context: { accountId } }); + } + + return persistDataset(dataset, dedupeByHash(allDetails), {}); +} + +async function syncOrders(page) { + const dataset = datasets.orders; + let windows; + + if (config.fullSync) { + windows = buildMonthlyDateWindows(config.orderStartDate); + } else { + // 增量模式:只查前一天 + const yesterday = new Date(); + yesterday.setDate(yesterday.getDate() - 1); + const dateStr = formatDate(yesterday); + windows = [{ windowStart: dateStr, windowEnd: dateStr, start: dateStr, end: dateStr }]; + console.log(`[增量模式] 订单仅查询: ${dateStr}`); + } + + const allRecords = []; + + for (const window of windows) { + await page.goto(dataset.url, { waitUntil: 'domcontentloaded' }); + await waitUntilReady(page, dataset.heading); + await setDateRange(page, window.start, window.end); + await clickQuery(page); + await trySetPageSize(page, dataset.pageSize); + const records = await scrapePagedTable(page, dataset, window); + allRecords.push(...records); + } + + return persistDataset(dataset, dedupeByHash(allRecords), {}); +} + +async function syncBills(page) { + const dataset = datasets.bills; + let months; + let latestConsumptionDate = null; + + if (config.fullSync) { + months = buildMonthList(config.billStartMonth); + } else { + latestConsumptionDate = getLatestBillConsumptionDate(); + const incrementalMonth = latestConsumptionDate?.slice(0, 7) + || `${new Date().getFullYear()}-${String(new Date().getMonth() + 1).padStart(2, '0')}`; + months = [incrementalMonth]; + console.log(`[增量模式] 账单仅查询: ${incrementalMonth}${latestConsumptionDate ? `, 数据库最新消费时间: ${latestConsumptionDate}` : ''}`); + } + + const allRecords = []; + + for (const month of months) { + await page.goto(dataset.url, { waitUntil: 'domcontentloaded' }); + await waitUntilReady(page, dataset.heading); + await setMonthValue(page, month); + await clickQuery(page); + await trySetPageSize(page, dataset.pageSize); + let records = await scrapePagedTable(page, dataset, { month }); + if (latestConsumptionDate) { + const before = records.length; + records = records.filter((record) => isAfterLatestConsumptionDate(record, latestConsumptionDate)); + console.log(`[增量模式] 账单按消费时间过滤: ${before} -> ${records.length}`); + } + allRecords.push(...records); + } + + return persistDataset(dataset, dedupeByHash(allRecords), {}); +} + +function getLatestBillConsumptionDate() { + const scriptPath = path.resolve(config.rootDir, config.dbSyncScript); + try { + const output = execSync(`python "${scriptPath}" --latest-bill-consumption-time`, { + cwd: path.dirname(scriptPath), + encoding: 'utf-8', + timeout: 120000, + }).trim(); + const latest = output.split(/\r?\n/).map((line) => line.trim()).filter(Boolean).at(-1) || ''; + return /^\d{4}-\d{2}-\d{2}/.test(latest) ? latest.slice(0, 10) : null; + } catch (error) { + console.error('[增量模式] 查询数据库最新账单消费时间失败:', error.message); + return null; + } +} + +function isAfterLatestConsumptionDate(record, latestConsumptionDate) { + const consumeDate = String(record['消费时间'] || record.consumeDate || '').trim().slice(0, 10); + if (!/^\d{4}-\d{2}-\d{2}$/.test(consumeDate)) { + return false; + } + return consumeDate > latestConsumptionDate; +} + +async function syncOrderDetails(page, cachedOrderIds) { + const dataset = datasets.orderDetails; + + // 使用传入的 orderId 列表(在 syncOrders 覆盖 orders.json 之前缓存的) + const allOrderIds = cachedOrderIds || []; + + if (allOrderIds.length === 0) { + console.log('[订单详情] 本地无订单数据,跳过'); + return persistDataset(dataset, [], {}); + } + + console.log(`[订单详情] 共 ${allOrderIds.length} 个订单需要获取详情`); + const allDetails = []; + const detailBaseUrl = 'https://aps.aliyun.com/?spm=5176.12818093.top-nav.ditem-fx.785716d0LKDpKT#/detail/order/~/costCenter/order/detail/'; + + for (let index = 0; index < allOrderIds.length; index += 1) { + const orderId = allOrderIds[index]; + console.log(`[订单详情] ${index + 1}/${allOrderIds.length} orderId=${orderId}`); + + // 先跳 about:blank 再跳详情URL(强制 SPA 完整重新加载) + await page.goto('about:blank'); + await sleep(300); + await page.goto(`${detailBaseUrl}${orderId}?projectId=`, { waitUntil: 'domcontentloaded' }); + + try { + await page.waitForFunction( + (text) => document.body && document.body.innerText.includes(text), + '订单详情', + { timeout: 15000 }, + ); + await sleep(1000); + } catch { + console.warn(`[订单详情] ${orderId} 详情页加载超时,跳过`); + continue; + } + + const detail = await extractOrderDetail(page); + if (!isValidOrderId(detail.orderId)) { + detail.orderId = orderId; + } + allDetails.push({ ...detail, __context: {} }); + } + + return persistDataset(dataset, dedupeByHash(allDetails), {}); +} + +function persistDataset(dataset, records, context) { + const normalized = records.map((record) => withHash(dataset.normalize(record, record.__context || context))); + const previousState = loadCurrentState(dataset.name); + const nextState = diffRecords(previousState, normalized, dataset.uniqueKey); + const stamp = saveDatasetRun(dataset.name, nextState); + saveDelta(dataset.name, stamp, nextState.delta); + return { + stamp, + stats: nextState.stats, + }; +} + +async function waitUntilReady(page, heading, timeout = 120000) { + await page.waitForLoadState('domcontentloaded'); + console.log(`[waitUntilReady] 当前URL: ${page.url()}`); + console.log(`[waitUntilReady] 等待页面出现: "${heading}"`); + + try { + await page.waitForFunction( + (text) => document.body && document.body.innerText.includes(text), + heading, + { timeout }, + ); + } catch (err) { + // 超时时打印诊断信息 + const currentUrl = page.url(); + const bodyText = await page.evaluate(() => document.body?.innerText?.substring(0, 500) || '(空)').catch(() => '(无法获取)'); + console.error(`[waitUntilReady] 超时!当前URL: ${currentUrl}`); + console.error(`[waitUntilReady] 页面内容前500字: ${bodyText}`); + if (currentUrl.includes('signin')) { + try { + await sendLoginAlert(); + } catch (notifyErr) { + console.error('[通知] 发送登录提醒失败:', notifyErr.message); + } + } + throw err; + } + + if ((await page.locator('text=登录').count()) > 0 && page.url().includes('login')) { + throw new Error('当前未登录,请先执行 npm run login'); + } + await sleep(1500); +} + +async function scrapePagedTable(page, dataset, context) { + const pages = []; + const visited = new Set(); + + while (true) { + await waitForTableRows(page); + const pageData = await extractTable(page); + const pageNum = await currentPageNumber(page); + const pageKey = `${pageNum}-${pageData.rows.length}`; + console.log(`[抓取] 第${pageNum}页, ${pageData.rows.length}行, key="${pageKey}"`); + if (visited.has(pageKey)) { + console.log(`[抓取] 重复页面key,停止翻页`); + break; + } + visited.add(pageKey); + pages.push(...pageData.rows.map((row) => ({ ...row, __context: context }))); + + const moved = await gotoNextPage(page); + if (!moved) { + console.log(`[抓取] 翻页失败或已到最后一页,停止`); + break; + } + } + + console.log(`[抓取] 共采集 ${pages.length} 条记录`); + return pages; +} + +async function extractTable(page) { + return page.evaluate(() => { + const normalize = (value) => + String(value || '') + .replace(/\u00a0/g, ' ') + .replace(/\s+\n/g, '\n') + .replace(/\n\s+/g, '\n') + .trim(); + + const headerTables = Array.from(document.querySelectorAll('table')).filter((table) => table.querySelectorAll('thead th').length > 1); + const headerTable = headerTables.sort((a, b) => b.querySelectorAll('thead th').length - a.querySelectorAll('thead th').length)[0]; + if (!headerTable) return { headers: [], rows: [] }; + + const headers = Array.from(headerTable.querySelectorAll('thead th')).map((cell) => normalize(cell.textContent)); + const bodyTables = Array.from(document.querySelectorAll('table')).filter((table) => table.querySelectorAll('tbody tr').length > 0); + const bodyTable = bodyTables.sort((a, b) => { + const aSize = Math.max(...Array.from(a.querySelectorAll('tbody tr')).map((row) => row.querySelectorAll('td').length), 0); + const bSize = Math.max(...Array.from(b.querySelectorAll('tbody tr')).map((row) => row.querySelectorAll('td').length), 0); + return bSize - aSize; + })[0]; + if (!bodyTable) return { headers, rows: [] }; + + const rows = Array.from(bodyTable.querySelectorAll('tbody tr')) + .map((row) => Array.from(row.querySelectorAll('td')).map((cell) => normalize(cell.innerText || cell.textContent))) + .filter((cells) => cells.some(Boolean)) + .map((cells) => { + const record = {}; + headers.forEach((header, index) => { + record[header || `column_${index + 1}`] = cells[index] || ''; + }); + return record; + }); + + return { headers, rows }; + }); +} + +async function waitForTableRows(page) { + await page.waitForFunction(() => document.querySelectorAll('table tbody tr').length > 0, null, { timeout: 120000 }); + await sleep(800); +} + +async function currentPageNumber(page) { + const active = page.locator('.next-pagination-item.next-current'); + if ((await active.count()) === 0) return 1; + return Number.parseInt((await active.first().innerText()).trim(), 10) || 1; +} + +async function gotoNextPage(page) { + const before = await currentPageNumber(page); + + // 用 Playwright locator 定位"下一页"按钮 + const nextBtn = page.locator('button.next-pagination-item.next-next'); + if ((await nextBtn.count()) === 0) { + console.log('[翻页] 未找到下一页按钮'); + return false; + } + + const disabled = (await nextBtn.getAttribute('disabled')) != null; + if (disabled) { + console.log('[翻页] 下一页按钮已禁用'); + return false; + } + + // 用 Playwright click(而非 DOM click),确保 React 事件正常触发 + await nextBtn.click(); + await sleep(2000); + + const after = await currentPageNumber(page); + console.log(`[翻页] ${before} -> ${after}`); + return before !== after; +} + +async function trySetPageSize(page, pageSize) { + const input = page.locator('input[aria-label="请选择每页显示几条"]').first(); + if ((await input.count()) === 0) return; + await input.click().catch(() => null); + await sleep(300); + const option = page.locator(`text=${pageSize}`).last(); + if ((await option.count()) === 0) { + await page.keyboard.press('Escape').catch(() => null); + return; + } + await option.click().catch(() => null); + await sleep(1200); +} + +async function setDateRange(page, start, end) { + console.log(`[订单日期] 设置: ${start} ~ ${end}`); + + await _fillDateRange(page, start, end); + + // 验证 + const startActual = await page.locator('input[placeholder="起始日期"]').inputValue().catch(() => ''); + const endActual = await page.locator('input[placeholder="结束日期"]').inputValue().catch(() => ''); + + // 如果结果不对,用反向顺序重试(先填开始再填结束) + if (startActual !== start || endActual !== end) { + console.log(`[订单日期] 首次结果不对: "${startActual}" ~ "${endActual}",反向重试`); + await _fillDateRange(page, start, end, true); + const s2 = await page.locator('input[placeholder="起始日期"]').inputValue().catch(() => ''); + const e2 = await page.locator('input[placeholder="结束日期"]').inputValue().catch(() => ''); + console.log(`[订单日期] 重试结果: "${s2}" ~ "${e2}"`); + } else { + console.log(`[订单日期] 结果: "${startActual}" ~ "${endActual}"`); + } +} + +async function _fillDateRange(page, start, end, startFirst = false) { + const trigger = page.locator('input[placeholder="结束日期"]'); + await trigger.click(); + await sleep(1000); + + const panelStartInput = page.locator('.next-range-picker-panel-input-start-date input'); + const panelEndInput = page.locator('.next-range-picker-panel-input-end-date input'); + + if (startFirst) { + // 先填开始日期 + await panelStartInput.click(); + await sleep(100); + await page.keyboard.press('Control+A'); + await page.keyboard.type(start, { delay: 30 }); + await sleep(300); + // 再填结束日期 + await panelEndInput.click(); + await sleep(100); + await page.keyboard.press('Control+A'); + await page.keyboard.type(end, { delay: 30 }); + await sleep(300); + } else { + // 先填结束日期(默认) + await panelEndInput.click(); + await sleep(100); + await page.keyboard.press('Control+A'); + await page.keyboard.type(end, { delay: 30 }); + await sleep(300); + // 再填开始日期 + await panelStartInput.click(); + await sleep(100); + await page.keyboard.press('Control+A'); + await page.keyboard.type(start, { delay: 30 }); + await sleep(300); + } + + await page.keyboard.press('Enter'); + await sleep(500); + await page.mouse.click(0, 0); + await sleep(300); + await page.keyboard.press('Escape'); + await sleep(300); + await page.locator('.next-overlay-wrapper.opened').waitFor({ state: 'hidden', timeout: 3000 }).catch(() => null); + await sleep(300); +} + +async function setMonthValue(page, month) { + // 先尝试按 inputValue 匹配 YYYY-MM 格式 + const inputs = page.locator('input'); + const total = await inputs.count(); + const allValues = []; + + for (let index = 0; index < total; index += 1) { + const input = inputs.nth(index); + const value = await input.inputValue().catch(() => ''); + const placeholder = await input.getAttribute('placeholder').catch(() => ''); + allValues.push({ index, value, placeholder }); + + if (/^\d{4}-\d{2}$/.test(value)) { + console.log(`[账单月份] 通过 value 匹配到 input[${index}], 设置: ${month}`); + await typeIntoDateInput(input, month, page); + return; + } + } + + // 如果 value 为空,尝试按 placeholder 匹配月份选择器 + for (const item of allValues) { + if (item.placeholder && /月/.test(item.placeholder)) { + console.log(`[账单月份] 通过 placeholder 匹配到 input[${item.index}], 设置: ${month}`); + await typeIntoDateInput(inputs.nth(item.index), month, page); + return; + } + } + + // 兜底:找任何看起来像日期/月份选择器的 input(排除搜索框等) + for (const item of allValues) { + const input = inputs.nth(item.index); + const cls = await input.evaluate((el) => el.closest('[class*="date-picker"], [class*="month-picker"], [class*="range-picker"]')?.className || '').catch(() => ''); + if (cls) { + console.log(`[账单月份] 通过父级 class 匹配到 input[${item.index}] (${cls}), 设置: ${month}`); + await typeIntoDateInput(input, month, page); + return; + } + } + + console.error('[DEBUG] 账单页面所有 input:', JSON.stringify(allValues, null, 2)); + throw new Error('未识别到账单佣金月份输入框,请打开页面确认结构是否变化。'); +} + +/** + * 用键盘输入日期值。 + * 策略:focus → 全选 → 快速键入 → Tab 移开焦点(触发 blur 提交,但不会像 click 那样打开面板)。 + * 即使面板弹出,快速键入 + Tab 也能在面板滚动前完成提交并关闭。 + */ +async function typeIntoDateInput(locator, value, page) { + // 移除 readonly + await locator.evaluate((node) => node.removeAttribute('readonly')); + + // focus 并全选当前内容 + await locator.focus(); + await sleep(100); + await page.keyboard.press('Control+A'); + await sleep(100); + + // 快速逐字符输入新值 + await page.keyboard.type(value, { delay: 30 }); + await sleep(200); + + // Tab 移开焦点 → 触发 onBlur 提交值 + 关闭面板 + await page.keyboard.press('Tab'); + await sleep(300); + + // 如果面板还在,Escape 兜底关闭 + await page.keyboard.press('Escape'); + await sleep(300); + + // 验证 + const actual = await locator.inputValue().catch(() => ''); + if (actual !== value) { + console.warn(`[WARN] typeIntoDateInput: 期望 "${value}",实际 "${actual}"`); + } else { + console.log(`[日期设置] 成功: "${value}"`); + } +} + +async function clickQuery(page) { + const button = page.locator('button:has-text("查询")').first(); + await button.click(); + await sleep(1800); +} + +function buildMonthlyDateWindows(startDate) { + const start = new Date(`${startDate}T00:00:00+08:00`); + const end = new Date(); + const windows = []; + const cursor = new Date(start.getFullYear(), start.getMonth(), 1); + + while (cursor <= end) { + const windowStart = new Date(cursor); + const windowEnd = new Date(cursor.getFullYear(), cursor.getMonth() + 1, 0); + const actualEnd = windowEnd > end ? end : windowEnd; + windows.push({ + windowStart: formatDate(windowStart), + windowEnd: formatDate(actualEnd), + start: formatDate(windowStart), + end: formatDate(actualEnd), + }); + cursor.setMonth(cursor.getMonth() + 1); + } + + return windows; +} + +function buildMonthList(startMonth) { + const [year, month] = startMonth.split('-').map(Number); + const cursor = new Date(year, month - 1, 1); + const end = new Date(); + const months = []; + + while (cursor <= end) { + months.push(`${cursor.getFullYear()}-${String(cursor.getMonth() + 1).padStart(2, '0')}`); + cursor.setMonth(cursor.getMonth() + 1); + } + + return months; +} + +function formatDate(date) { + return `${date.getFullYear()}-${String(date.getMonth() + 1).padStart(2, '0')}-${String(date.getDate()).padStart(2, '0')}`; +} + +function dedupeByHash(records) { + const seen = new Set(); + return records.filter((record) => { + const key = JSON.stringify(record); + if (seen.has(key)) return false; + seen.add(key); + return true; + }); +} + +function collectValidOrderIds(records) { + const ids = []; + const seen = new Set(); + for (const record of records) { + // 支持两种字段名:normalized 后的 orderId 和原始的 订单号 + const rawOrderId = String(record.orderId || record['订单号'] || '').trim(); + if (!rawOrderId || rawOrderId.includes('没有数据')) { + continue; + } + if (!isValidOrderId(rawOrderId)) { + console.log(`[订单详情] 跳过无效订单号: ${rawOrderId}`); + continue; + } + if (seen.has(rawOrderId)) { + continue; + } + seen.add(rawOrderId); + ids.push(rawOrderId); + } + return ids; +} + +function collectValidAccountIds(records) { + const ids = []; + const seen = new Set(); + for (const record of records) { + const rawAccountId = String(record.accountId || '').trim(); + if (!rawAccountId || rawAccountId.includes('没有数据')) { + continue; + } + if (!isValidAccountId(rawAccountId)) { + console.log(`[客户详情] 跳过无效 accountId: ${rawAccountId}`); + continue; + } + if (seen.has(rawAccountId)) { + continue; + } + seen.add(rawAccountId); + ids.push(rawAccountId); + } + return ids; +} + +function isValidOrderId(orderId) { + const value = String(orderId || '').trim(); + if (!value) return false; + if (value.includes('�')) return false; + return /^\d+$/.test(value); +} + +function isValidAccountId(accountId) { + const value = String(accountId || '').trim(); + if (!value) return false; + if (value.includes('�')) return false; + return /^\d+$/.test(value); +} + +async function extractOrderDetail(page) { + return page.evaluate(() => { + const text = document.body?.innerText || ''; + + const extract = (label) => { + const lineBreakPattern = new RegExp(`${label}\\s*(?:\\r?\\n)+\\s*([^\\r\\n]+)`); + const lineBreakMatch = text.match(lineBreakPattern); + if (lineBreakMatch) return lineBreakMatch[1].trim(); + + const inlinePattern = new RegExp(`${label}\\s*[::]?\\s*([^\\r\\n]+)`); + const inlineMatch = text.match(inlinePattern); + return inlineMatch ? inlineMatch[1].trim() : ''; + }; + + return { + orderId: extract('订单号'), + orderType: extract('订单类型'), + status: extract('状态'), + tradeType: extract('交易类型'), + customerCategory: extract('客户分类'), + dealerName: extract('二级经销商名称'), + dealerUid: extract('二级经销商UID'), + customerType: extract('客户类型'), + opportunityId: extract('商机ID'), + paymentTime: extract('支付时间'), + orderTime: extract('下单时间'), + productName: extract('产品名称'), + productCode: extract('产品code'), + originalPriceCny: extract('订单原价\\(CNY\\)'), + paidAmountCny: extract('实付金额\\(CNY\\)'), + discount: extract('订单折扣'), + payableAmountCny: extract('应付金额(实付\\+代金券)\\(CNY\\)'), + couponAmountCny: extract('代金券金额\\(CNY\\)'), + }; + }); +} + +async function extractCustomerDetail(page) { + return page.evaluate(() => { + const normalize = (value) => + String(value || '') + .replace(/\u00a0/g, ' ') + .trim(); + + const text = normalize(document.body?.innerText || '').replace(/\r/g, ''); + + const extract = (label, sourceText = text) => { + const lineBreakPattern = new RegExp(`${label}\\s*(?:\\n)+\\s*([^\\n]+)`); + const lineBreakMatch = sourceText.match(lineBreakPattern); + if (lineBreakMatch) return normalize(lineBreakMatch[1]); + + const inlinePattern = new RegExp(`${label}\\s*[::]?\\s*([^\\n]+)`); + const inlineMatch = sourceText.match(inlinePattern); + return inlineMatch ? normalize(inlineMatch[1]) : ''; + }; + + const normalizeAmount = (value) => normalize(value).replace(/[¥,]/g, '').trim(); + + const buildSection = (startLabel, endLabel = '') => { + const start = text.indexOf(startLabel); + if (start < 0) return ''; + const end = endLabel ? text.indexOf(endLabel, start + startLabel.length) : -1; + if (end > start) return text.slice(start, end); + return text.slice(start); + }; + + const lastMonthSection = buildSection('上月应付总金额(CNY)', '本月应付总金额(CNY)'); + const currentMonthSection = buildSection('本月应付总金额(CNY)'); + + const extractAmountFromSection = (sectionText, label) => normalizeAmount(extract(label, sectionText)); + + let department = ''; + const table = Array.from(document.querySelectorAll('table')).find((node) => + (node.innerText || '').includes('所属部门'), + ); + if (table) { + const rows = table.querySelectorAll('tbody tr'); + for (const row of rows) { + const cells = row.querySelectorAll('td'); + if (cells.length >= 2) { + const value = normalize(cells[1]?.innerText || cells[1]?.textContent || ''); + if (value) { + department = value; + break; + } + } + } + } + + return { + customerAccount: extract('客户账号'), + customerName: extract('客户名称'), + customerType: extract('客户类型'), + tradeMode: extract('交易模式'), + customerSource: extract('客户来源'), + realNameStatus: extract('实名状态'), + email: extract('邮箱'), + relationDate: extract('关联日期'), + phone: extract('手机号'), + remark: extract('备注'), + paymentNoticeStatus: extract('代为支付告知状态'), + department, + lastMonthPayableTotalCny: extractAmountFromSection(lastMonthSection, '上月应付总金额(CNY)'), + lastMonthPrepayCny: extractAmountFromSection(lastMonthSection, '预付费金额'), + lastMonthPostpayCny: extractAmountFromSection(lastMonthSection, '后付费金额'), + currentMonthPayableTotalCny: extractAmountFromSection(currentMonthSection, '本月应付总金额(CNY)'), + currentMonthPrepayCny: extractAmountFromSection(currentMonthSection, '预付费金额'), + currentMonthPostpayCny: extractAmountFromSection(currentMonthSection, '后付费金额'), + }; + }); +} diff --git a/aliyun-sync/aps-aliyun-sync/aps_db_sync.py b/aliyun-sync/aps-aliyun-sync/aps_db_sync.py new file mode 100644 index 0000000..3588897 --- /dev/null +++ b/aliyun-sync/aps-aliyun-sync/aps_db_sync.py @@ -0,0 +1,1073 @@ +from __future__ import annotations + +""" +APS Aliyun Data Sync — Database Schema & Sync Pipeline +======================================================= +Reads JSON output from aps-aliyun-scraper skill → Syncs to MySQL. + +Design decisions: + - Customer master: UPSERT on account_id (natural PK from Aliyun). + - Customer snapshot: append-only monthly snapshot for historical tracking. + - Orders: UPSERT on order_id (natural PK). Status may change (未支付→已支付→作废). + - Bills: DELETE+INSERT per (billing_month, customer_account). Bills are immutable within + a closed month but may be re-exported with corrections before close. + - Product ratio: DELETE+INSERT per (account_id, snapshot_month). + +Update strategy rationale: + - Customers (~14): tiny, full UPSERT is fine. Snapshot table captures monthly changes. + - Orders (~60/mo): small, UPSERT handles status changes cleanly. + - Bills (~1000/mo): medium, DELETE+INSERT per customer+month is safest because bills + lack a unique row ID and may be corrected before month close. +""" + +import json +import math +import logging +import hashlib +from datetime import datetime, date +from pathlib import Path +from typing import TypedDict, cast + +import pymysql +from pymysql.connections import Connection +from pymysql.cursors import DictCursor + +JsonDict = dict[str, object] +JsonList = list[JsonDict] +StatsDict = dict[str, int] + + +class DbConfig(TypedDict): + host: str + port: int + user: str + password: str + database: str + charset: str + +# --------------------------------------------------------------------------- +# Config +# --------------------------------------------------------------------------- +DB_CONFIG = { + "host": "172.27.137.236", + "port": 3306, + "user": "ray", + "password": "GV0C$ErephgQO7RQc7b6", + "database": "crm-prod", + "charset": "utf8mb4", +} + +JSON_DIR = Path(r"C:\Users\Administrator\Desktop\aliyun-sync\aliyun-aps-sync\data\current") +LOG_FORMAT = "%(asctime)s [%(levelname)s] %(message)s" +logging.basicConfig(level=logging.INFO, format=LOG_FORMAT) +logger = logging.getLogger("aps_sync") + +# --------------------------------------------------------------------------- +# Schema DDL +# --------------------------------------------------------------------------- +DDL_STATEMENTS = [ + # 1. Customer master — latest state, UPSERT on each sync + """ + CREATE TABLE IF NOT EXISTS aps_customer ( + account_id VARCHAR(32) NOT NULL COMMENT '阿里云UID', + login_name VARCHAR(128) NOT NULL COMMENT '登录名称', + real_name VARCHAR(256) DEFAULT NULL COMMENT '实名认证名称', + report_source VARCHAR(64) DEFAULT NULL COMMENT '报备来源', + report_type VARCHAR(64) DEFAULT NULL COMMENT '报备类型', + trade_mode VARCHAR(64) DEFAULT NULL COMMENT '交易模式', + real_name_status VARCHAR(64) DEFAULT NULL COMMENT '实名认证状态', + relation_date DATETIME DEFAULT NULL COMMENT '关联日期', + follow_staff VARCHAR(64) DEFAULT NULL COMMENT '跟进员工', + payment_notice_status VARCHAR(32) DEFAULT NULL COMMENT '代为支付告知状态', + invite_register_type VARCHAR(64) DEFAULT NULL COMMENT '邀约注册类型', + is_new_customer TINYINT(1) DEFAULT NULL COMMENT '是否新客户 1=是 0=否', + performance_start_point_reached TINYINT(1) DEFAULT NULL COMMENT '是否达成业绩起算点', + customer_category VARCHAR(64) DEFAULT NULL COMMENT '客户分类', + remark VARCHAR(512) DEFAULT NULL COMMENT '备注', + no_consumption_months INT DEFAULT NULL COMMENT '客户无消费月数', + planned_release_time DATE DEFAULT NULL COMMENT '计划释放时间', + planned_release_reason VARCHAR(256) DEFAULT NULL COMMENT '计划释放原因', + -- detail fields + customer_name VARCHAR(256) DEFAULT NULL COMMENT '客户名称(详情)', + customer_type VARCHAR(64) DEFAULT NULL COMMENT '客户类型(详情)', + customer_source VARCHAR(64) DEFAULT NULL COMMENT '客户来源', + email VARCHAR(256) DEFAULT NULL COMMENT '邮箱', + phone VARCHAR(32) DEFAULT NULL COMMENT '手机号', + department VARCHAR(128) DEFAULT NULL COMMENT '所属部门', + -- sync meta + created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (account_id), + UNIQUE KEY uk_login_name (login_name) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='APS客户主表(最新状态)' + """, + + # 2. Customer monthly snapshot — append-only, tracks financial changes + """ + CREATE TABLE IF NOT EXISTS aps_customer_snapshot ( + id BIGINT NOT NULL AUTO_INCREMENT, + account_id VARCHAR(32) NOT NULL COMMENT '阿里云UID', + snapshot_month VARCHAR(7) NOT NULL COMMENT '快照月份 YYYY-MM', + account_cash_balance_cny DECIMAL(16,2) DEFAULT NULL COMMENT '账户现金余额', + pending_invoice_amount_cny DECIMAL(16,2) DEFAULT NULL COMMENT '待开票金额', + last_month_consumption_cny DECIMAL(16,2) DEFAULT NULL COMMENT '上月消费金额', + current_month_consumption_cny DECIMAL(16,2) DEFAULT NULL COMMENT '本月消费金额', + last_month_payable_total_cny DECIMAL(16,2) DEFAULT NULL COMMENT '上月应付总金额', + last_month_prepay_cny DECIMAL(16,2) DEFAULT NULL COMMENT '上月预付费金额', + last_month_postpay_cny DECIMAL(16,2) DEFAULT NULL COMMENT '上月后付费金额', + current_month_payable_total_cny DECIMAL(16,2) DEFAULT NULL COMMENT '本月应付总金额', + current_month_prepay_cny DECIMAL(16,2) DEFAULT NULL COMMENT '本月预付费金额', + current_month_postpay_cny DECIMAL(16,2) DEFAULT NULL COMMENT '本月后付费金额', + data_hash VARCHAR(64) DEFAULT NULL COMMENT '数据指纹(去重用)', + captured_at DATE NOT NULL COMMENT '抓取日期', + created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id), + UNIQUE KEY uk_account_month (account_id, snapshot_month, captured_at), + KEY idx_snapshot_month (snapshot_month), + CONSTRAINT fk_snapshot_customer FOREIGN KEY (account_id) REFERENCES aps_customer(account_id) ON UPDATE CASCADE + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='APS客户月度快照(追加)' + """, + + # 3. Product ratio per snapshot + """ + CREATE TABLE IF NOT EXISTS aps_customer_product_ratio ( + id BIGINT NOT NULL AUTO_INCREMENT, + account_id VARCHAR(32) NOT NULL, + snapshot_month VARCHAR(7) NOT NULL COMMENT 'YYYY-MM', + product_name VARCHAR(128) NOT NULL COMMENT '产品分类名称', + ratio VARCHAR(16) NOT NULL COMMENT '占比(如 41.73%)', + created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id), + KEY idx_ratio_account_month (account_id, snapshot_month), + CONSTRAINT fk_ratio_customer FOREIGN KEY (account_id) REFERENCES aps_customer(account_id) ON UPDATE CASCADE + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='APS客户产品占比' + """, + + # 4. Orders — UPSERT on order_id + """ + CREATE TABLE IF NOT EXISTS aps_order ( + order_id VARCHAR(32) NOT NULL COMMENT '订单号', + customer_account_id VARCHAR(32) DEFAULT NULL COMMENT '客户account_id(FK, 可为空表示未映射)', + customer_login_name VARCHAR(128) NOT NULL DEFAULT '' COMMENT '客户账号(冗余,方便查询)', + customer_category VARCHAR(64) DEFAULT NULL COMMENT '客户分类', + order_type VARCHAR(32) DEFAULT NULL COMMENT '订单类型', + original_price_cny DECIMAL(16,2) DEFAULT NULL COMMENT '订单原价', + paid_amount_cny DECIMAL(16,2) DEFAULT NULL COMMENT '实付金额', + status VARCHAR(32) DEFAULT NULL COMMENT '订单状态', + order_time DATETIME DEFAULT NULL COMMENT '下单时间', + order_month VARCHAR(7) DEFAULT NULL COMMENT '订单月份 YYYY-MM', + created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (order_id), + KEY idx_order_customer (customer_account_id), + KEY idx_order_month (order_month), + CONSTRAINT fk_order_customer FOREIGN KEY (customer_account_id) REFERENCES aps_customer(account_id) ON UPDATE CASCADE + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='APS订单表(UPSERT)' + """, + + # 5. Bills — DELETE+INSERT per (billing_month, customer) + """ + CREATE TABLE IF NOT EXISTS aps_bill ( + id BIGINT NOT NULL AUTO_INCREMENT, + billing_month VARCHAR(7) NOT NULL COMMENT '账期 YYYY-MM or YYYYMM', + customer_account_id VARCHAR(32) DEFAULT NULL COMMENT '客户account_id(FK, 可为空表示未映射)', + customer_login_name VARCHAR(128) NOT NULL DEFAULT '' COMMENT '客户账号', + bill_type VARCHAR(32) DEFAULT NULL COMMENT '账单类型', + payment_type VARCHAR(32) DEFAULT NULL COMMENT '付款类型', + consumption_time VARCHAR(32) DEFAULT NULL COMMENT '消费时间', + customer_uid VARCHAR(32) DEFAULT NULL COMMENT '客户UID', + customer_category VARCHAR(64) DEFAULT NULL COMMENT '客户分类', + customer_type VARCHAR(64) DEFAULT NULL COMMENT '客户类型', + product_code VARCHAR(128) DEFAULT NULL COMMENT '产品code', + product_name VARCHAR(256) DEFAULT NULL COMMENT '产品名称', + product_category VARCHAR(64) DEFAULT NULL COMMENT '产品分类', + cloud_product_instance_id VARCHAR(256) DEFAULT NULL COMMENT '云产品实例ID', + staff_account VARCHAR(64) DEFAULT NULL COMMENT '员工账号', + opportunity_id VARCHAR(64) DEFAULT NULL COMMENT '商机ID', + original_price_cny DECIMAL(16,6) DEFAULT NULL COMMENT '原价', + customer_payable_amount_cny DECIMAL(16,6) DEFAULT NULL COMMENT '客户应付金额', + voucher_amount_cny DECIMAL(16,6) DEFAULT NULL COMMENT '代金券金额', + stored_value_card_amount_cny DECIMAL(16,6) DEFAULT NULL COMMENT '储值卡金额', + cash_paid_amount_cny DECIMAL(16,6) DEFAULT NULL COMMENT '现金支付金额', + commission_month VARCHAR(7) DEFAULT NULL COMMENT '佣金月份', + non_business_compensation_amount_cny DECIMAL(16,6) DEFAULT NULL COMMENT '非业务类赔偿金额', + actual_paid_amount_cny DECIMAL(16,6) DEFAULT NULL COMMENT '实付金额', + actual_paid_plus_compensation_cny DECIMAL(16,6) DEFAULT NULL COMMENT '实付+非业务类赔偿金额', + included_in_performance VARCHAR(8) DEFAULT NULL COMMENT '是否计入业绩', + excluded_from_performance_reason VARCHAR(256) DEFAULT NULL COMMENT '不计业绩原因', + rebated VARCHAR(8) DEFAULT NULL COMMENT '是否返佣', + non_rebate_reason VARCHAR(256) DEFAULT NULL COMMENT '不返佣原因', + main_order_id VARCHAR(64) DEFAULT NULL COMMENT '主订单号', + sub_order_id VARCHAR(64) DEFAULT NULL COMMENT '子订单号', + order_cycle VARCHAR(32) DEFAULT NULL COMMENT '订单周期', + order_type VARCHAR(32) DEFAULT NULL COMMENT '订单类型', + original_order_id VARCHAR(64) DEFAULT NULL COMMENT '原订单号', + service_start_time VARCHAR(32) DEFAULT NULL COMMENT '服务开始时间', + service_end_time VARCHAR(32) DEFAULT NULL COMMENT '服务结束时间', + invite_register_type VARCHAR(64) DEFAULT NULL COMMENT '邀约注册类型', + staff_account_id_at_close VARCHAR(64) DEFAULT NULL, + staff_login_at_close VARCHAR(64) DEFAULT NULL, + staff_name_at_close VARCHAR(64) DEFAULT NULL, + staff_account_id_at_export VARCHAR(64) DEFAULT NULL, + staff_login_at_export VARCHAR(64) DEFAULT NULL, + staff_name_at_export VARCHAR(64) DEFAULT NULL, + partner_uid VARCHAR(32) DEFAULT NULL COMMENT '伙伴UID', + created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id), + KEY idx_bill_customer (customer_account_id), + KEY idx_bill_month (billing_month), + KEY idx_bill_commission (commission_month), + CONSTRAINT fk_bill_customer FOREIGN KEY (customer_account_id) REFERENCES aps_customer(account_id) ON UPDATE CASCADE + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='APS账单明细(按月全量替换)' + """, + + # 7. Order detail — UPSERT on order_id + """ + CREATE TABLE IF NOT EXISTS aps_order_detail ( + id INT AUTO_INCREMENT PRIMARY KEY, + order_id VARCHAR(32) NOT NULL COMMENT '订单号', + order_type VARCHAR(32) COMMENT '订单类型', + status VARCHAR(32) COMMENT '订单状态', + trade_type VARCHAR(32) COMMENT '交易类型', + customer_category VARCHAR(64) COMMENT '客户分类', + dealer_name VARCHAR(256) COMMENT '二级经销商名称', + dealer_uid VARCHAR(64) COMMENT '二级经销商UID', + customer_type VARCHAR(64) COMMENT '客户类型', + opportunity_id VARCHAR(64) COMMENT '商机ID', + payment_time DATETIME COMMENT '支付时间', + order_time DATETIME COMMENT '下单时间', + product_name VARCHAR(256) COMMENT '产品名称', + product_code VARCHAR(64) COMMENT '产品code', + original_price_cny DECIMAL(14,2) COMMENT '订单原价(CNY)', + paid_amount_cny DECIMAL(14,2) COMMENT '实付金额(CNY)', + discount DECIMAL(8,4) COMMENT '订单折扣', + payable_amount_cny DECIMAL(14,2) COMMENT '应付金额(实付+代金券)(CNY)', + coupon_amount_cny DECIMAL(14,2) COMMENT '代金券金额(CNY)', + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', + UNIQUE KEY uk_order_id (order_id) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='APS订单详情表(UPSERT)' + """, + + # 6. Sync log — tracks each run + """ + CREATE TABLE IF NOT EXISTS aps_sync_log ( + id BIGINT NOT NULL AUTO_INCREMENT, + sync_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + json_file VARCHAR(512) NOT NULL COMMENT 'source json path', + billing_month VARCHAR(7) NOT NULL, + customer_count INT DEFAULT 0, + order_count INT DEFAULT 0, + bill_count INT DEFAULT 0, + snapshot_count INT DEFAULT 0, + status VARCHAR(16) NOT NULL DEFAULT 'success' COMMENT 'success/failed', + error_message TEXT DEFAULT NULL, + duration_seconds DECIMAL(8,2) DEFAULT NULL, + PRIMARY KEY (id), + KEY idx_sync_month (billing_month) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='APS同步执行日志' + """, +] + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- +def safe_decimal(v: object) -> float | None: + """Convert to float or None. Handles '- -', NaN, empty.""" + if v is None: + return None + if isinstance(v, str): + v = v.strip() + if v in ("", "- -", "--", "null"): + return None + try: + return float(v) + except ValueError: + return None + if isinstance(v, (int, float)): + if math.isnan(v): + return None + return float(v) + return None + + +def safe_bool(v: object) -> int | None: + """Convert '是'/'否'/True/False to 1/0/None.""" + if v is None: + return None + if isinstance(v, bool): + return 1 if v else 0 + if isinstance(v, str): + if v == "是" or v.lower() == "true": + return 1 + if v == "否" or v.lower() == "false": + return 0 + return None + + +def safe_str(v: object, max_len: int | None = None) -> str | None: + if v is None: + return None + s = str(v).strip() + if s in ("", "- -", "--", "无", "null", "None"): + return None + if max_len and len(s) > max_len: + s = s[:max_len] + return s + + +def parse_datetime(v: object) -> datetime | str | None: + if v is None: + return None + s = safe_str(v) + if s is None: + return s + for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d", "%Y/%m/%d %H:%M:%S"): + try: + return datetime.strptime(s, fmt) + except ValueError: + continue + return s # fallback: store as string + + +def parse_date(v: object) -> date | None: + if v is None: + return None + s = safe_str(v) + if s is None: + return s + for fmt in ("%Y-%m-%d", "%Y/%m/%d"): + try: + return datetime.strptime(s, fmt).date() + except ValueError: + continue + return None + + +def data_hash(obj: JsonDict) -> str: + """Deterministic hash of dict for change detection.""" + raw = json.dumps(obj, sort_keys=True, default=str) + return hashlib.md5(raw.encode()).hexdigest() + + +def build_login_to_account_map(customers: JsonList) -> dict[str, str]: + """login_name -> account_id, with fuzzy key (spaces stripped) as fallback.""" + m: dict[str, str] = {} + for c in customers: + ln = cast(str, c["login_name"]) + account_id = cast(str, c["account_id"]) + m[ln] = account_id + # 同时存一个去空格的 key 做模糊匹配 + stripped = ln.replace(" ", "") + if stripped != ln: + m[stripped] = account_id + return m + + +def resolve_account_id(login_to_account: dict[str, str], customer_account: str | None) -> str | None: + """尝试精确匹配,失败则去空格再匹配。""" + if not customer_account: + return None + account_id = login_to_account.get(customer_account) + if account_id: + return account_id + # 去空格后重试 + stripped = customer_account.replace(" ", "") + return login_to_account.get(stripped) + + +def is_valid_order_id(order_id: str | None) -> bool: + """APS 订单号应为纯数字;过滤“没有数据”等占位值。""" + return bool(order_id and order_id.isdigit()) + + +# --------------------------------------------------------------------------- +# Sync logic +# --------------------------------------------------------------------------- +class APSSyncer: + def __init__(self, db_config: DbConfig | None = None): + self.db_config: DbConfig = db_config or DB_CONFIG + self.conn: Connection[DictCursor] | None = None + self.stats: StatsDict = {"customers": 0, "customer_details": 0, "snapshots": 0, "orders": 0, "order_details": 0, "bills": 0} + + def _require_conn(self) -> Connection[DictCursor]: + if self.conn is None: + raise RuntimeError("Database connection is not initialized") + return self.conn + + def connect(self) -> None: + self.conn = pymysql.connect( + host=self.db_config["host"], + port=self.db_config["port"], + user=self.db_config["user"], + password=self.db_config["password"], + database=self.db_config["database"], + charset=self.db_config["charset"], + cursorclass=DictCursor, + ) + logger.info("Connected to MySQL %s:%s/%s", self.db_config["host"], + self.db_config["port"], self.db_config["database"]) + + def close(self) -> None: + if self.conn: + self.conn.close() + + def ensure_schema(self) -> None: + """Create tables if not exist.""" + conn = self._require_conn() + with conn.cursor() as cur: + for ddl in DDL_STATEMENTS: + _ = cur.execute(ddl) + # 兼容历史库:允许订单在 customer 未映射时也可入库。 + # MySQL 外键允许 NULL,因此这里仅放宽列约束,不移除 FK。 + _ = cur.execute(""" + ALTER TABLE aps_order + MODIFY COLUMN customer_account_id VARCHAR(32) NULL COMMENT '客户account_id(FK, 可为空表示未映射)', + MODIFY COLUMN customer_login_name VARCHAR(128) NOT NULL DEFAULT '' COMMENT '客户账号(冗余,方便查询)' + """) + _ = cur.execute(""" + ALTER TABLE aps_bill + MODIFY COLUMN customer_account_id VARCHAR(32) NULL COMMENT '客户account_id(FK, 可为空表示未映射)', + MODIFY COLUMN customer_login_name VARCHAR(128) NOT NULL DEFAULT '' COMMENT '客户账号' + """) + conn.commit() + logger.info("Schema ensured (%d tables)", len(DDL_STATEMENTS)) + + @staticmethod + def normalize_month(v: object) -> str | None: + s = safe_str(v) + if s is None or s == "没有数据": + return None + if len(s) == 6 and s.isdigit(): + return f"{s[:4]}-{s[4:]}" + if len(s) == 7 and s[4] == "-": + return s + return None + + @staticmethod + def load_json_records(file_path: Path) -> JsonList: + with open(file_path, "r", encoding="utf-8") as f: + data = cast(object, json.load(f)) + if isinstance(data, dict): + data_dict = cast(JsonDict, data) + records = data_dict.get("records") + if not isinstance(records, list): + return [] + record_list = cast(list[object], records) + normalized_records: JsonList = [] + for record in record_list: + if isinstance(record, dict): + normalized_records.append(cast(JsonDict, record)) + return normalized_records + if isinstance(data, list): + data_list = cast(list[object], data) + return [cast(JsonDict, record) for record in data_list if isinstance(record, dict)] + return [] + + def resolve_data_files(self, data_dir: str) -> tuple[Path, Path, Path, Path, Path]: + root = Path(data_dir) + if not root.exists() or not root.is_dir(): + raise FileNotFoundError(f"Data directory not found: {root}") + + customers_file = root / "customers.json" + orders_file = root / "orders.json" + order_details_file = root / "orderDetails.json" + bills_file = root / "bills.json" + customer_details_file = root / "customerDetails.json" + for fp in (customers_file, orders_file, order_details_file, bills_file): + if not fp.exists(): + raise FileNotFoundError(f"Required JSON file not found: {fp}") + return customers_file, orders_file, order_details_file, bills_file, customer_details_file + + @staticmethod + def normalize_customer_record(raw: JsonDict) -> JsonDict | None: + account_id = safe_str(raw.get("accountId"), 32) + login_name = safe_str(raw.get("loginName"), 128) + if not account_id or not login_name: + return None + + inactive = safe_str(raw.get("inactiveMonths")) + no_consumption_months = int(inactive) if inactive and inactive.isdigit() else None + + return { + "account_id": account_id, + "login_name": login_name, + "real_name": safe_str(raw.get("realName"), 256), + "report_source": safe_str(raw.get("reportSource"), 64), + "report_type": safe_str(raw.get("reportType"), 64), + "trade_mode": safe_str(raw.get("tradeMode"), 64), + "real_name_status": safe_str(raw.get("authStatus"), 64), + "relation_date": parse_datetime(raw.get("relationTime")), + "follow_staff": safe_str(raw.get("owner"), 64), + "payment_notice_status": safe_str(raw.get("paymentNoticeStatus"), 32), + "invite_register_type": safe_str(raw.get("inviteType"), 64), + "is_new_customer": safe_bool(raw.get("isNewCustomer")), + "performance_start_point_reached": safe_bool(raw.get("isPerformanceQualified")), + "customer_category": safe_str(raw.get("customerCategory"), 64), + "remark": safe_str(raw.get("remark"), 512), + "no_consumption_months": no_consumption_months, + "planned_release_time": parse_date(raw.get("releasePlanTime")), + "planned_release_reason": safe_str(raw.get("releasePlanReason"), 256), + "account_cash_balance_cny": safe_decimal(raw.get("cashBalanceCny")), + "pending_invoice_amount_cny": safe_decimal(raw.get("invoicePendingCny")), + "last_month_consumption_cny": safe_decimal(raw.get("lastMonthConsumptionCny")), + "current_month_consumption_cny": safe_decimal(raw.get("thisMonthConsumptionCny")), + } + + # ---- Customer master UPSERT ---- + def upsert_customer(self, c: JsonDict) -> None: + sql = """ + INSERT INTO aps_customer ( + account_id, login_name, real_name, + report_source, report_type, trade_mode, real_name_status, + relation_date, follow_staff, payment_notice_status, + invite_register_type, is_new_customer, performance_start_point_reached, + customer_category, remark, no_consumption_months, + planned_release_time, planned_release_reason, + customer_name, customer_type, customer_source, + email, phone, department + ) VALUES ( + %s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s + ) ON DUPLICATE KEY UPDATE + login_name=VALUES(login_name), real_name=VALUES(real_name), + report_source=VALUES(report_source), report_type=VALUES(report_type), + trade_mode=VALUES(trade_mode), real_name_status=VALUES(real_name_status), + relation_date=VALUES(relation_date), follow_staff=VALUES(follow_staff), + payment_notice_status=VALUES(payment_notice_status), + invite_register_type=VALUES(invite_register_type), + is_new_customer=VALUES(is_new_customer), + performance_start_point_reached=VALUES(performance_start_point_reached), + customer_category=VALUES(customer_category), remark=VALUES(remark), + no_consumption_months=VALUES(no_consumption_months), + planned_release_time=VALUES(planned_release_time), + planned_release_reason=VALUES(planned_release_reason), + customer_name=VALUES(customer_name), customer_type=VALUES(customer_type), + customer_source=VALUES(customer_source), + email=VALUES(email), phone=VALUES(phone), department=VALUES(department) + """ + params = ( + c["account_id"], c["login_name"], safe_str(c.get("real_name")), + safe_str(c.get("report_source")), safe_str(c.get("report_type")), + safe_str(c.get("trade_mode")), + safe_str(c.get("real_name_status")), + c.get("relation_date"), + safe_str(c.get("follow_staff")), + safe_str(c.get("payment_notice_status")), + safe_str(c.get("invite_register_type")), + c.get("is_new_customer"), + c.get("performance_start_point_reached"), + safe_str(c.get("customer_category")), + safe_str(c.get("remark")), + c.get("no_consumption_months"), + c.get("planned_release_time"), + safe_str(c.get("planned_release_reason")), + None, None, None, None, None, None, + ) + conn = self._require_conn() + with conn.cursor() as cur: + _ = cur.execute(sql, params) + self.stats["customers"] += 1 + + # ---- Customer snapshot (append, skip if hash identical) ---- + def insert_snapshot(self, c: JsonDict, snapshot_month: str, captured_at: str) -> None: + financial: JsonDict = { + "balance": safe_decimal(c.get("account_cash_balance_cny")), + "pending": safe_decimal(c.get("pending_invoice_amount_cny")), + "last_consumption": safe_decimal(c.get("last_month_consumption_cny")), + "curr_consumption": safe_decimal(c.get("current_month_consumption_cny")), + "last_payable": None, + "last_pre": None, + "last_post": None, + "curr_payable": None, + "curr_pre": None, + "curr_post": None, + } + h = data_hash(financial) + cap_date = parse_date(captured_at) + + # Check if same hash already exists for this account+month+date + conn = self._require_conn() + with conn.cursor() as cur: + _ = cur.execute( + "SELECT id FROM aps_customer_snapshot WHERE account_id=%s AND snapshot_month=%s AND captured_at=%s", + (c["account_id"], snapshot_month, cap_date), + ) + if cur.fetchone(): + # Already have snapshot for this date, update it + _ = cur.execute(""" + UPDATE aps_customer_snapshot SET + account_cash_balance_cny=%s, pending_invoice_amount_cny=%s, + last_month_consumption_cny=%s, current_month_consumption_cny=%s, + last_month_payable_total_cny=%s, last_month_prepay_cny=%s, last_month_postpay_cny=%s, + current_month_payable_total_cny=%s, current_month_prepay_cny=%s, current_month_postpay_cny=%s, + data_hash=%s + WHERE account_id=%s AND snapshot_month=%s AND captured_at=%s + """, ( + financial["balance"], financial["pending"], + financial["last_consumption"], financial["curr_consumption"], + financial["last_payable"], financial["last_pre"], financial["last_post"], + financial["curr_payable"], financial["curr_pre"], financial["curr_post"], + h, c["account_id"], snapshot_month, cap_date, + )) + else: + _ = cur.execute(""" + INSERT INTO aps_customer_snapshot ( + account_id, snapshot_month, + account_cash_balance_cny, pending_invoice_amount_cny, + last_month_consumption_cny, current_month_consumption_cny, + last_month_payable_total_cny, last_month_prepay_cny, last_month_postpay_cny, + current_month_payable_total_cny, current_month_prepay_cny, current_month_postpay_cny, + data_hash, captured_at + ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) + """, ( + c["account_id"], snapshot_month, + financial["balance"], financial["pending"], + financial["last_consumption"], financial["curr_consumption"], + financial["last_payable"], financial["last_pre"], financial["last_post"], + financial["curr_payable"], financial["curr_pre"], financial["curr_post"], + h, cap_date, + )) + self.stats["snapshots"] += 1 + + # ---- Orders UPSERT ---- + def upsert_orders(self, records: JsonList, login_to_account: dict[str, str]) -> None: + sql = """ + INSERT INTO aps_order ( + order_id, customer_account_id, customer_login_name, + customer_category, order_type, original_price_cny, paid_amount_cny, + status, order_time, order_month + ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) + ON DUPLICATE KEY UPDATE + customer_category=VALUES(customer_category), + order_type=VALUES(order_type), + original_price_cny=VALUES(original_price_cny), + paid_amount_cny=VALUES(paid_amount_cny), + status=VALUES(status), + order_time=VALUES(order_time), + order_month=VALUES(order_month) + """ + conn = self._require_conn() + with conn.cursor() as cur: + for o in records: + order_id = safe_str(o.get("orderId"), 32) + customer_login = safe_str(o.get("customerAccount"), 128) + if not order_id: + logger.warning("Skipping order without orderId: %s", o) + continue + if not is_valid_order_id(order_id): + logger.warning("Skipping placeholder/invalid orderId '%s'", order_id) + continue + + account_id = resolve_account_id(login_to_account, customer_login) + if not customer_login: + logger.warning("Order %s has empty customerAccount; inserting with NULL customer_account_id", order_id) + elif not account_id: + logger.warning("Order %s has unresolved customerAccount '%s'; inserting with NULL customer_account_id", order_id, customer_login) + + created_at_raw = o.get("createdAt") + order_time = parse_datetime(created_at_raw) + order_month = None + if order_time and isinstance(order_time, datetime): + order_month = order_time.strftime("%Y-%m") + else: + created_at_prefix = created_at_raw[:7] if isinstance(created_at_raw, str) and len(created_at_raw) >= 7 else None + order_month = self.normalize_month(created_at_prefix) + + _ = cur.execute(sql, ( + order_id, account_id, customer_login or "", + safe_str(o.get("customerCategory")), safe_str(o.get("orderType")), + safe_decimal(o.get("orderOriginalPriceCny")), + safe_decimal(o.get("actualPaidCny")), + safe_str(o.get("orderStatus")), order_time, order_month, + )) + self.stats["orders"] += 1 + + # ---- Order details UPSERT ---- + def upsert_order_details(self, records: JsonList) -> None: + sql = """ + INSERT INTO aps_order_detail ( + order_id, order_type, status, trade_type, customer_category, + dealer_name, dealer_uid, customer_type, opportunity_id, + payment_time, order_time, product_name, product_code, + original_price_cny, paid_amount_cny, discount, + payable_amount_cny, coupon_amount_cny + ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) + ON DUPLICATE KEY UPDATE + order_type=VALUES(order_type), + status=VALUES(status), + trade_type=VALUES(trade_type), + customer_category=VALUES(customer_category), + dealer_name=VALUES(dealer_name), + dealer_uid=VALUES(dealer_uid), + customer_type=VALUES(customer_type), + opportunity_id=VALUES(opportunity_id), + payment_time=VALUES(payment_time), + order_time=VALUES(order_time), + product_name=VALUES(product_name), + product_code=VALUES(product_code), + original_price_cny=VALUES(original_price_cny), + paid_amount_cny=VALUES(paid_amount_cny), + discount=VALUES(discount), + payable_amount_cny=VALUES(payable_amount_cny), + coupon_amount_cny=VALUES(coupon_amount_cny) + """ + conn = self._require_conn() + with conn.cursor() as cur: + for d in records: + order_id = safe_str(d.get("orderId"), 32) + if not order_id: + logger.warning("Skipping order detail without orderId: %s", d) + continue + + _ = cur.execute(sql, ( + order_id, + safe_str(d.get("orderType"), 32), + safe_str(d.get("status"), 32), + safe_str(d.get("tradeType"), 32), + safe_str(d.get("customerCategory"), 64), + safe_str(d.get("dealerName"), 256), + safe_str(d.get("dealerUid"), 64), + safe_str(d.get("customerType"), 64), + safe_str(d.get("opportunityId"), 64), + parse_datetime(d.get("paymentTime")), + parse_datetime(d.get("orderTime")), + safe_str(d.get("productName"), 256), + safe_str(d.get("productCode"), 64), + safe_decimal(d.get("originalPriceCny")), + safe_decimal(d.get("paidAmountCny")), + safe_decimal(d.get("discount")), + safe_decimal(d.get("payableAmountCny")), + safe_decimal(d.get("couponAmountCny")), + )) + self.stats["order_details"] += 1 + + def update_customer_details(self, records: JsonList, snapshot_month: str) -> None: + sql_customer = """ + UPDATE aps_customer SET + customer_name=%s, customer_type=%s, customer_source=%s, + email=%s, phone=%s, department=%s + WHERE account_id=%s + """ + sql_snapshot = """ + UPDATE aps_customer_snapshot SET + last_month_payable_total_cny=%s, last_month_prepay_cny=%s, last_month_postpay_cny=%s, + current_month_payable_total_cny=%s, current_month_prepay_cny=%s, current_month_postpay_cny=%s + WHERE account_id=%s AND snapshot_month=%s + """ + + conn = self._require_conn() + with conn.cursor() as cur: + for r in records: + account_id = safe_str(r.get("accountId"), 32) + if not account_id: + logger.warning("Skipping customer detail without accountId: %s", r) + continue + + _ = cur.execute(sql_customer, ( + safe_str(r.get("customerName"), 256), + safe_str(r.get("customerType"), 64), + safe_str(r.get("customerSource"), 64), + safe_str(r.get("email"), 256), + safe_str(r.get("phone"), 32), + safe_str(r.get("department"), 128), + account_id, + )) + + _ = cur.execute(sql_snapshot, ( + safe_decimal(r.get("lastMonthPayableTotalCny")), + safe_decimal(r.get("lastMonthPrepayCny")), + safe_decimal(r.get("lastMonthPostpayCny")), + safe_decimal(r.get("currentMonthPayableTotalCny")), + safe_decimal(r.get("currentMonthPrepayCny")), + safe_decimal(r.get("currentMonthPostpayCny")), + account_id, + snapshot_month, + )) + self.stats["customer_details"] += 1 + + # ---- Bills sync ---- + def sync_bills(self, records: JsonList, login_to_account: dict[str, str], incremental: bool = False) -> None: + grouped: dict[tuple[str | None, str, str], JsonList] = {} + skipped = 0 + for b in records: + billing_month = self.normalize_month(b.get("billingMonth")) + commission_month = self.normalize_month(b.get("commissionMonth")) + customer_login = safe_str(b.get("customerAccount"), 128) + if not billing_month or not commission_month: + skipped += 1 + continue + + account_id = resolve_account_id(login_to_account, customer_login) + if not customer_login: + logger.warning("Bill row has empty customerAccount; inserting with NULL customer_account_id, commission_month=%s", commission_month) + elif not account_id: + logger.warning("Bill row has unresolved customerAccount '%s'; inserting with NULL customer_account_id, commission_month=%s", customer_login, commission_month) + + key = (account_id, customer_login or "", commission_month) + grouped.setdefault(key, []).append(b) + + if skipped: + logger.info("Filtered out %d invalid/unresolved bill rows", skipped) + + if not grouped: + return + + conn = self._require_conn() + with conn.cursor() as cur: + sql = """ + INSERT INTO aps_bill ( + billing_month, customer_account_id, customer_login_name, + bill_type, consumption_time, + customer_category, + product_name, product_category, + original_price_cny, customer_payable_amount_cny, + commission_month, + included_in_performance, + rebated, + invite_register_type, + service_start_time, service_end_time + ) VALUES ( + %s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s + ) + """ + for (account_id, login_name, commission_month), rows in grouped.items(): + if incremental: + # 增量模式:逐条检查,已存在则跳过 + for b in rows: + bm = self.normalize_month(b.get("billingMonth")) + product_name = safe_str(b.get("productName")) + consumption_time = safe_str(b.get("consumeDate")) + original_price = safe_decimal(b.get("originalPriceCny")) + + # 用 billing_month + customer + product + consumption_time + original_price 联合判断是否重复 + if account_id is None: + _ = cur.execute( + """SELECT COUNT(*) as cnt FROM aps_bill + WHERE customer_account_id IS NULL AND customer_login_name=%s AND commission_month=%s + AND billing_month=%s AND product_name=%s + AND consumption_time=%s AND original_price_cny=%s""", + (login_name, commission_month, bm, + product_name, consumption_time, original_price), + ) + else: + _ = cur.execute( + """SELECT COUNT(*) as cnt FROM aps_bill + WHERE customer_account_id=%s AND commission_month=%s + AND billing_month=%s AND product_name=%s + AND consumption_time=%s AND original_price_cny=%s""", + (account_id, commission_month, bm, + product_name, consumption_time, original_price), + ) + row = cur.fetchone() + if row and (row.get("cnt", 0) or row.get("COUNT(*)", 0)) > 0: + continue # 已存在,跳过 + + _ = cur.execute(sql, ( + bm, account_id, login_name, + safe_str(b.get("billType")), safe_str(b.get("consumeDate")), + safe_str(b.get("customerCategory")), + safe_str(b.get("productName")), safe_str(b.get("productCategory")), + safe_decimal(b.get("originalPriceCny")), + safe_decimal(b.get("customerPayableCny")), + self.normalize_month(b.get("commissionMonth")), + safe_str(b.get("countsForPerformance")), + safe_str(b.get("commissionable")), + safe_str(b.get("inviteType")), + safe_str(b.get("serviceStartAt")), safe_str(b.get("serviceEndAt")), + )) + self.stats["bills"] += 1 + else: + # 全量模式:DELETE+INSERT per customer+month + if account_id is None: + _ = cur.execute( + "DELETE FROM aps_bill WHERE customer_account_id IS NULL AND customer_login_name=%s AND commission_month=%s", + (login_name, commission_month), + ) + else: + _ = cur.execute( + "DELETE FROM aps_bill WHERE customer_account_id=%s AND commission_month=%s", + (account_id, commission_month), + ) + for b in rows: + _ = cur.execute(sql, ( + self.normalize_month(b.get("billingMonth")), account_id, login_name, + safe_str(b.get("billType")), safe_str(b.get("consumeDate")), + safe_str(b.get("customerCategory")), + safe_str(b.get("productName")), safe_str(b.get("productCategory")), + safe_decimal(b.get("originalPriceCny")), + safe_decimal(b.get("customerPayableCny")), + self.normalize_month(b.get("commissionMonth")), + safe_str(b.get("countsForPerformance")), + safe_str(b.get("commissionable")), + safe_str(b.get("inviteType")), + safe_str(b.get("serviceStartAt")), safe_str(b.get("serviceEndAt")), + )) + self.stats["bills"] += 1 + + # ---- Main sync entry ---- + def sync_from_json(self, data_dir: str, incremental: bool = False) -> StatsDict: + start = datetime.now() + customers_file, orders_file, order_details_file, bills_file, customer_details_file = self.resolve_data_files(data_dir) + logger.info("Loading source files from %s%s", data_dir, " (增量模式)" if incremental else "") + + raw_customers = self.load_json_records(customers_file) + raw_orders = self.load_json_records(orders_file) + raw_order_details = self.load_json_records(order_details_file) + raw_bills = self.load_json_records(bills_file) + raw_customer_details: JsonList = [] + try: + if customer_details_file.exists(): + raw_customer_details = self.load_json_records(customer_details_file) + else: + logger.info("Optional file missing, skip customer details: %s", customer_details_file) + except Exception as e: + logger.warning("Failed to load optional customer details file %s: %s", customer_details_file, e) + + captured_at = date.today().isoformat() + billing_month = None + for b in raw_bills: + billing_month = self.normalize_month(b.get("billingMonth")) or self.normalize_month(b.get("commissionMonth")) + if billing_month: + break + if not billing_month: + billing_month = datetime.now().strftime("%Y-%m") + + self.stats = {"customers": 0, "customer_details": 0, "snapshots": 0, "orders": 0, "order_details": 0, "bills": 0} + + try: + self.connect() + self.ensure_schema() + + customers: JsonList = [] + skipped_customers = 0 + for raw in raw_customers: + c = self.normalize_customer_record(raw) + if not c: + skipped_customers += 1 + continue + customers.append(c) + self.upsert_customer(c) + self.insert_snapshot(c, billing_month, captured_at) + + if skipped_customers: + logger.info("Skipped %d invalid customer rows", skipped_customers) + + login_to_account = build_login_to_account_map(customers) + logger.info("Resolved %d customer login_name -> account_id mappings", len(login_to_account)) + + if raw_customer_details: + self.update_customer_details(raw_customer_details, billing_month) + + self.upsert_orders(raw_orders, login_to_account) + self.upsert_order_details(raw_order_details) + self.sync_bills(raw_bills, login_to_account, incremental=incremental) + + # Log sync + duration = (datetime.now() - start).total_seconds() + conn = self._require_conn() + with conn.cursor() as cur: + _ = cur.execute(""" + INSERT INTO aps_sync_log (json_file, billing_month, customer_count, order_count, bill_count, snapshot_count, status, duration_seconds) + VALUES (%s,%s,%s,%s,%s,%s,'success',%s) + """, ( + str(Path(data_dir)), billing_month, + self.stats["customers"], self.stats["orders"], + self.stats["bills"], self.stats["snapshots"], duration, + )) + + conn.commit() + logger.info( + "Sync complete in %.1fs: %d customers, %d customer_details, %d snapshots, %d orders, %d order_details, %d bills", + duration, self.stats["customers"], self.stats["customer_details"], + self.stats["snapshots"], self.stats["orders"], self.stats["order_details"], self.stats["bills"], + ) + return self.stats + + except Exception as e: + logger.error("Sync failed: %s", e) + if self.conn: + duration = (datetime.now() - start).total_seconds() + try: + conn = self._require_conn() + conn.rollback() + with conn.cursor() as cur: + _ = cur.execute(""" + INSERT INTO aps_sync_log (json_file, billing_month, status, error_message, duration_seconds) + VALUES (%s,%s,'failed',%s,%s) + """, (str(Path(data_dir)), billing_month or "unknown", str(e), duration)) + conn.commit() + except Exception: + pass + raise + finally: + self.close() + + def find_latest_json(self) -> Path | None: + """Resolve default data dir only when all required JSON files exist.""" + try: + _ = self.resolve_data_files(str(JSON_DIR)) + return JSON_DIR + except FileNotFoundError: + return None + + def get_latest_bill_consumption_time(self) -> str | None: + conn = self._require_conn() + with conn.cursor() as cur: + _ = cur.execute("SELECT MAX(consumption_time) AS latest_time FROM aps_bill") + row = cur.fetchone() + if not row: + return None + latest_time = row.get("latest_time") + if latest_time is None: + return None + if isinstance(latest_time, datetime): + return latest_time.strftime("%Y-%m-%d") + latest_time_value = cast(object, latest_time) + return str(latest_time_value) + + +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- +def main() -> None: + import argparse + parser = argparse.ArgumentParser(description="Sync APS Aliyun JSON data to MySQL") + _ = parser.add_argument( + "--dir", + type=str, + default=str(JSON_DIR), + help="Directory containing customers.json, orders.json, orderDetails.json, bills.json", + ) + _ = parser.add_argument( + "--incremental", + action="store_true", + default=False, + help="增量模式: 订单UPSERT, 账单检查存在后跳过(不删除历史数据)", + ) + _ = parser.add_argument( + "--latest-bill-consumption-time", + action="store_true", + default=False, + help="仅查询 aps_bill 中最新的 consumption_time 并输出", + ) + args = parser.parse_args() + data_dir = cast(str, args.dir) + incremental = cast(bool, args.incremental) + latest_bill_consumption_time = cast(bool, args.latest_bill_consumption_time) + + syncer = APSSyncer(db_config=DB_CONFIG) + if latest_bill_consumption_time: + syncer.connect() + try: + latest_time = syncer.get_latest_bill_consumption_time() + if latest_time: + print(latest_time) + return + finally: + syncer.close() + _ = syncer.sync_from_json(data_dir, incremental=incremental) + + +if __name__ == "__main__": + main() diff --git a/aliyun-sync/aps-aliyun-sync/aps_scheduler.py b/aliyun-sync/aps-aliyun-sync/aps_scheduler.py new file mode 100644 index 0000000..8aafbaf --- /dev/null +++ b/aliyun-sync/aps-aliyun-sync/aps_scheduler.py @@ -0,0 +1,130 @@ +""" +Long-running scheduler — watches for new JSON files and syncs to MySQL. + +Modes: + --watch : File watcher, syncs immediately when new JSON appears (default) + --cron : One-shot sync, meant to be called by system cron/launchd + --daemon : Combined: runs initial sync + watches for changes +""" + +import time +import sys +import signal +import argparse +import logging +from pathlib import Path +from datetime import datetime + +from aps_db_sync import APSSyncer, DB_CONFIG, JSON_DIR + +LOG_FORMAT = "%(asctime)s [%(levelname)s] %(message)s" +logging.basicConfig(level=logging.INFO, format=LOG_FORMAT) +logger = logging.getLogger("aps_scheduler") + +WATCH_INTERVAL_SECONDS = 30 +PROCESSED_MARKER_DIR = JSON_DIR / ".aps_sync_processed" + + +def _update_watch_interval(value: int): + global WATCH_INTERVAL_SECONDS + WATCH_INTERVAL_SECONDS = value + + +class SyncScheduler: + def __init__(self, db_config: dict = None): + self.db_config = db_config or DB_CONFIG + self.running = True + PROCESSED_MARKER_DIR.mkdir(exist_ok=True) + signal.signal(signal.SIGINT, self._shutdown) + signal.signal(signal.SIGTERM, self._shutdown) + + def _shutdown(self, signum, frame): + logger.info("Shutdown signal received, stopping...") + self.running = False + + def _marker_path(self, json_path: Path) -> Path: + return PROCESSED_MARKER_DIR / f"{json_path.stem}.synced" + + def _is_processed(self, json_path: Path) -> bool: + marker = self._marker_path(json_path) + if not marker.exists(): + return False + marker_mtime = marker.stat().st_mtime + json_mtime = json_path.stat().st_mtime + return marker_mtime >= json_mtime + + def _mark_processed(self, json_path: Path): + marker = self._marker_path(json_path) + marker.write_text(datetime.now().isoformat()) + + def find_unprocessed_files(self) -> list[Path]: + pattern = "aps_aliyun_customers_with_bills_*.json" + all_files = sorted(JSON_DIR.glob(pattern), key=lambda p: p.stat().st_mtime) + return [f for f in all_files if not self._is_processed(f)] + + def sync_file(self, json_path: Path) -> bool: + logger.info("Syncing: %s", json_path.name) + try: + syncer = APSSyncer(db_config=self.db_config) + syncer.sync_from_json(str(json_path)) + self._mark_processed(json_path) + return True + except Exception as e: + logger.error("Sync failed for %s: %s", json_path.name, e) + return False + + def run_once(self): + unprocessed = self.find_unprocessed_files() + if not unprocessed: + logger.info("No unprocessed JSON files found") + return 0 + count = 0 + for f in unprocessed: + if self.sync_file(f): + count += 1 + logger.info("Processed %d/%d files", count, len(unprocessed)) + return count + + def run_watch(self): + logger.info("Watching %s for new JSON files (interval=%ds)", JSON_DIR, WATCH_INTERVAL_SECONDS) + self.run_once() + while self.running: + time.sleep(WATCH_INTERVAL_SECONDS) + unprocessed = self.find_unprocessed_files() + for f in unprocessed: + if not self.running: + break + self.sync_file(f) + logger.info("Watcher stopped") + + +def main(): + parser = argparse.ArgumentParser(description="APS Sync Scheduler") + parser.add_argument("--mode", choices=["watch", "cron", "daemon"], default="watch", + help="watch=file watcher, cron=one-shot, daemon=watch with initial sync") + parser.add_argument("--host", default=DB_CONFIG["host"]) + parser.add_argument("--port", type=int, default=DB_CONFIG["port"]) + parser.add_argument("--user", default=DB_CONFIG["user"]) + parser.add_argument("--password", default=DB_CONFIG["password"]) + parser.add_argument("--database", default=DB_CONFIG["database"]) + parser.add_argument("--interval", type=int, default=WATCH_INTERVAL_SECONDS, + help="Watch interval in seconds") + args = parser.parse_args() + + _update_watch_interval(args.interval) + + config = { + "host": args.host, "port": args.port, "user": args.user, + "password": args.password, "database": args.database, "charset": "utf8mb4", + } + scheduler = SyncScheduler(db_config=config) + + if args.mode == "cron": + count = scheduler.run_once() + sys.exit(0 if count >= 0 else 1) + else: + scheduler.run_watch() + + +if __name__ == "__main__": + main() diff --git a/aliyun-sync/aps-aliyun-sync/check_db.py b/aliyun-sync/aps-aliyun-sync/check_db.py new file mode 100644 index 0000000..ad9a07b --- /dev/null +++ b/aliyun-sync/aps-aliyun-sync/check_db.py @@ -0,0 +1,27 @@ +import pymysql + +conn = pymysql.connect( + host='8.156.34.195', + port=23149, + user='ray', + password='GV0C$ErephgQO7RQc7b6', + database='goonseek-dev', + charset='utf8mb4', +) +cur = conn.cursor() + +# 检查客户详情字段 +cur.execute('SELECT account_id, customer_name, phone, department FROM aps_customer LIMIT 5') +rows = cur.fetchall() +print(f'aps_customer ({len(rows)} rows):') +for r in rows: + print(f' {r}') + +# 检查快照金额字段 +cur.execute('SELECT account_id, snapshot_month, last_month_payable_total_cny, current_month_payable_total_cny FROM aps_customer_snapshot LIMIT 5') +rows2 = cur.fetchall() +print(f'\naps_customer_snapshot ({len(rows2)} rows):') +for r in rows2: + print(f' {r}') + +conn.close()