python切换node的版本

This commit is contained in:
ray
2026-04-28 10:05:38 +08:00
parent a008d416a3
commit 552d840f3f
10 changed files with 1524 additions and 344 deletions

View File

@@ -1,25 +1,13 @@
# 阿里云 APS 同步命令清单
# 阿里云 APS Node 同步命令清单
本文档整理项目中常用的爬取、同步、入库、调度、增量和运行时控制命令。
## 1. 项目目录
### 前端爬取项目
## 1. 进入项目目录
```powershell
cd D:\project\python\aliyun-sync\aliyun-aps-sync
```
### 后端入库项目
```powershell
cd D:\project\python\aliyun-sync\aps-aliyun-sync
```
## 2. 安装依赖
在前端爬取项目目录执行:
```powershell
npm install
```
@@ -32,234 +20,125 @@ npm run login
作用:
- 打开浏览器
- 手动完成阿里云 / RAM 登录
- 自动验证我的客户”和“账单查询”页面。
- 保存登录态`.browser/``.browser/storage-state.json`
- 打开浏览器
- 手动完成阿里云 / RAM 登录
- 自动验证我的客户账单查询
- 保存登录态
## 4. 爬取 / 同步命令
### 全量同步全部模块
## 4. 全量同步
```powershell
npm run sync
```
默认同步
默认包含
- customers
- customerDetails
- orders
- orderDetails
- bills
- messages
### 仅爬取账单
并在同步过程中直接写入数据库。
## 5. 日增量同步
```powershell
npm run incremental
```
默认包含:
- orders
- orderDetails
- bills
- messages
说明:
- 不抓 customer
- 按数据库 watermark + overlap 抓取
## 6. 单独抓账单
```powershell
npm run bills
```
### 基于 checkpoint 断点续爬账单
## 7. 单独抓消息
```powershell
npm run messages
```
## 8. 从最新 checkpoint 继续抓账单
```powershell
npm run bills -- --resume
```
作用:
- 自动读取 `data/checkpoints/bills/` 下最新 checkpoint。
- 从 checkpoint 记录的月份和页码之后继续抓取。
### 启动定时同步
## 9. 定时任务
```powershell
npm run schedule
```
默认 cron
默认读取
```env
ALIYUN_APS_CRON=0 6 * * *
ALIYUN_APS_SCHEDULE_MODE=incremental
```
表示每天早上 6 点执行。
## 5. 增量同步
### 默认增量同步
`.env` 中设置:
```env
ALIYUN_APS_FULL_SYNC=false
```
然后执行:
```powershell
npm run sync
```
默认行为:
- 订单只查昨天。
- 订单详情跟随本次订单结果。
- 账单按数据库最新消费时间增量。
### 指定订单 / 订单详情增量起始日期
临时命令方式:
```powershell
npm run sync -- --incremental-order-start-date=2026-01-01
```
`.env` 固定配置方式:
```env
ALIYUN_APS_FULL_SYNC=false
ALIYUN_APS_INCREMENTAL_ORDER_START_DATE=2026-01-01
```
然后执行:
```powershell
npm run sync
```
行为:
- 订单从指定日期开始补抓到今天。
- 订单详情跟随这些订单抓取。
### 指定账单起始月份
`.env` 中设置:
```env
ALIYUN_APS_BILL_START_MONTH=2024-01
```
然后执行:
```powershell
npm run bills
```
说明:当前账单爬取支持按月份开始;如需按具体日期开始,需要新增账单日期过滤参数。
## 6. 后端入库命令
进入后端入库项目目录:
```powershell
cd D:\project\python\aliyun-sync\aps-aliyun-sync
```
### 全量入库
```powershell
python aps_db_sync.py
```
### 增量入库
```powershell
python aps_db_sync.py --incremental
```
### 指定同步对象入库
```powershell
python aps_db_sync.py --sync-target all
python aps_db_sync.py --sync-target customer
python aps_db_sync.py --sync-target order
python aps_db_sync.py --sync-target orderdetails
python aps_db_sync.py --sync-target bills
```
### 增量只同步账单入库
```powershell
python aps_db_sync.py --incremental --sync-target bills
```
### 直接将最新 bills checkpoint 入库
```powershell
python aps_db_sync.py --sync-target bills --from-checkpoint
```
### 查询数据库最新账单消费时间
```powershell
python aps_db_sync.py --latest-bill-consumption-time
```
## 7. 后端调度器命令
进入后端入库项目目录:
```powershell
cd D:\project\python\aliyun-sync\aps-aliyun-sync
```
### 启动调度器
```powershell
python aps_scheduler.py
```
### 指定同步对象启动调度器
```powershell
python aps_scheduler.py --sync-target all
python aps_scheduler.py --sync-target customer
python aps_scheduler.py --sync-target order
python aps_scheduler.py --sync-target orderdetails
python aps_scheduler.py --sync-target bills
```
## 8. 常用 `.env` 示例
文件位置:
```powershell
D:\project\python\aliyun-sync\aliyun-aps-sync\.env
```
### 全量模式
## 10. 常用 `.env` 配置
```env
ALIYUN_APS_BASE_URL=https://aps.aliyun.com
ALIYUN_APS_HEADLESS=false
ALIYUN_APS_BROWSER_CHANNEL=
ALIYUN_APS_BROWSER_EXECUTABLE_PATH=
ALIYUN_APS_TIMEZONE=Asia/Shanghai
ALIYUN_APS_CRON=0 6 * * *
ALIYUN_APS_SCHEDULE_MODE=incremental
ALIYUN_APS_CLOSE_BROWSER=true
ALIYUN_APS_FULL_SYNC=true
ALIYUN_APS_ORDER_START_DATE=2024-01-01
ALIYUN_APS_INCREMENTAL_ORDER_START_DATE=
ALIYUN_APS_BILL_START_MONTH=2024-01
ALIYUN_APS_ORDER_INCREMENTAL_OVERLAP_DAYS=2
ALIYUN_APS_BILL_INCREMENTAL_OVERLAP_DAYS=7
ALIYUN_APS_MESSAGE_INCREMENTAL_OVERLAP_DAYS=7
ALIYUN_APS_DB_HOST=
ALIYUN_APS_DB_PORT=3306
ALIYUN_APS_DB_USER=
ALIYUN_APS_DB_PASSWORD=
ALIYUN_APS_DB_NAME=
ALIYUN_APS_DB_CHARSET=utf8mb4
ALIYUN_APS_DB_CONNECTION_LIMIT=5
ALIYUN_APS_SMTP_HOST=
ALIYUN_APS_SMTP_PORT=465
ALIYUN_APS_SMTP_SECURE=true
ALIYUN_APS_SMTP_USER=
ALIYUN_APS_SMTP_PASS=
ALIYUN_APS_NOTIFY_EMAIL=
ALIYUN_APS_CLOSE_BROWSER=true
ALIYUN_APS_DB_SYNC_SCRIPT=../aps-aliyun-sync/aps_db_sync.py
```
### 增量模式:默认只查昨天订单
浏览器选择规则:
```env
ALIYUN_APS_FULL_SYNC=false
ALIYUN_APS_INCREMENTAL_ORDER_START_DATE=
```
- 两项都留空:使用 Playwright 自带 Chromium
- `ALIYUN_APS_BROWSER_CHANNEL=chrome`:使用 Chrome
- `ALIYUN_APS_BROWSER_CHANNEL=msedge`:使用 Edge
- `ALIYUN_APS_BROWSER_EXECUTABLE_PATH=...`:使用指定浏览器路径
### 增量模式:指定订单起始日期
## 11. 推荐执行顺序
```env
ALIYUN_APS_FULL_SYNC=false
ALIYUN_APS_INCREMENTAL_ORDER_START_DATE=2026-01-01
```
## 9. 常用组合
### 首次使用
### 首次初始化
```powershell
cd D:\project\python\aliyun-sync\aliyun-aps-sync
@@ -268,49 +147,44 @@ npm run login
npm run sync
```
### 只抓账单
### 日常增量
```powershell
cd D:\project\python\aliyun-sync\aliyun-aps-sync
npm run incremental
```
### 单独同步消息
```powershell
cd D:\project\python\aliyun-sync\aliyun-aps-sync
npm run messages
```
### 账单长任务恢复
```powershell
cd D:\project\python\aliyun-sync\aliyun-aps-sync
npm run login
npm run bills
npm run bills -- --resume
```
### 订单 / 订单详情从指定日期补抓
### 常驻定时任务
```powershell
cd D:\project\python\aliyun-sync\aliyun-aps-sync
npm run login
npm run sync -- --incremental-order-start-date=2026-01-01
npm run schedule
```
### 抓完后只同步账单入库
## 12. 错误文件
```powershell
cd D:\project\python\aliyun-sync\aps-aliyun-sync
python aps_db_sync.py --sync-target bills
运行异常时会保存:
```text
data/errors/<dataset>/
```
### 抓完后增量同步账单入库
```powershell
cd D:\project\python\aliyun-sync\aps-aliyun-sync
python aps_db_sync.py --incremental --sync-target bills
```
## 10. 清理登录态
如果登录态异常,可以删除 `.browser` 后重新登录:
```powershell
cd D:\project\python\aliyun-sync\aliyun-aps-sync
Remove-Item -Recurse -Force .browser
npm run login
```
## 11. 运行时热键
脚本运行时可在当前终端中使用:
## 13. 运行时热键
| 按键 | 功能 |
| --- | --- |
@@ -318,4 +192,13 @@ npm run login
| F8 | 继续 |
| F9 | 终止 |
注意:这是当前终端进程内热键,不是系统级全局热键。
## 14. 本地数据目录
```text
data/current/
data/history/
data/delta/
data/checkpoints/
data/runs/
data/errors/
```

View File

@@ -1,7 +1,28 @@
ALIYUN_APS_BASE_URL=https://aps.aliyun.com
ALIYUN_APS_HEADLESS=false
ALIYUN_APS_BROWSER_CHANNEL=
ALIYUN_APS_BROWSER_EXECUTABLE_PATH=
ALIYUN_APS_TIMEZONE=Asia/Shanghai
ALIYUN_APS_CRON=0 6 * * *
ALIYUN_APS_SCHEDULE_MODE=incremental
ALIYUN_APS_CLOSE_BROWSER=true
ALIYUN_APS_FULL_SYNC=true
ALIYUN_APS_ORDER_START_DATE=2024-01-01
ALIYUN_APS_INCREMENTAL_ORDER_START_DATE=
ALIYUN_APS_BILL_START_MONTH=2024-01
ALIYUN_APS_ORDER_INCREMENTAL_OVERLAP_DAYS=2
ALIYUN_APS_BILL_INCREMENTAL_OVERLAP_DAYS=7
ALIYUN_APS_MESSAGE_INCREMENTAL_OVERLAP_DAYS=7
ALIYUN_APS_DB_HOST=
ALIYUN_APS_DB_PORT=3306
ALIYUN_APS_DB_USER=
ALIYUN_APS_DB_PASSWORD=
ALIYUN_APS_DB_NAME=
ALIYUN_APS_DB_CHARSET=utf8mb4
ALIYUN_APS_DB_CONNECTION_LIMIT=5
ALIYUN_APS_SMTP_HOST=smtp.163.com
ALIYUN_APS_SMTP_PORT=465
ALIYUN_APS_SMTP_SECURE=true
ALIYUN_APS_SMTP_USER=
ALIYUN_APS_SMTP_PASS=
ALIYUN_APS_NOTIFY_EMAIL=

View File

@@ -1,82 +1,223 @@
# aliyun-aps-sync
用于抓取阿里云伙伴中心里的 `我的客户``订单查询``账单查询`,并把结果和本地上一次同步结果做增量对比
Node 版阿里云 APS 同步工具
## 功能
当前主流程已经统一为:
- 首次 `login` 用浏览器手动登录,后续复用本地登录态。
- `sync` 会同步 3 个模块的数据
- 同步后会生成:
- `data/current/*.json` 当前全
- `data/history/<dataset>/*.json` 每次快照
- `data/delta/<dataset>/*.json` 增量变化
- `data/runs/*.json` 每次任务汇总
- `schedule` 支持常驻进程方式按 cron 表达式每天自动同步。
- Playwright 抓取 APS 页面
- 本地保留 `current/history/delta/checkpoints/errors` 数据
- 同步过程中直接写入 MySQL
- 定时任务默认执行日增
## 安装
## 同步范围
```bash
cd /Users/qiangredhad/aliyun-aps-sync
npm install
cp .env.example .env
```
- customers
- customerDetails
- orders
- orderDetails
- bills
- messages
## 配置
## 模式说明
`.env` 里最重要的时间范围:
### Full 模式
- `ALIYUN_APS_ORDER_START_DATE`: 订单查询的起始日期,会按月滚动抓取直到今天。
- `ALIYUN_APS_INCREMENTAL_ORDER_START_DATE`: 订单/订单详情在增量模式下的指定起始日期;留空时仍默认只查昨天。
- `ALIYUN_APS_BILL_START_MONTH`: 账单查询的起始佣金月份,会按月滚动抓取直到当前月。
## 使用
1. 首次登录并保存会话
```bash
npm run login
```
2. 手动执行一次同步
执行:
```bash
npm run sync
```
仅抓账单
行为
- 抓全量 customer + customerDetails
- 抓 orders / orderDetails / bills / messages
- 同步过程中直接写数据库
### Incremental 模式
执行:
```bash
npm run incremental
```
行为:
- 不抓 customer
- 抓 orders / orderDetails / bills / messages
- 以数据库 watermark + overlap 为增量窗口
## 登录
```bash
npm run login
```
会自动验证:
- 我的客户
- 账单查询
并保存登录态到:
- `.browser/`
- `.browser/storage-state.json`
## 账单
### 单独抓账单
```bash
npm run bills
```
如果要从最新账单 checkpoint 继续
### 从最新 checkpoint 继续抓账单
```bash
npm run bills -- --resume
```
如果需要在增量模式下让订单和订单详情从指定日期开始补抓,可以配置:
## 消息
单独抓消息:
```bash
ALIYUN_APS_INCREMENTAL_ORDER_START_DATE=2026-01-01
npm run messages
```
或临时执行:
```bash
npm run sync -- --incremental-order-start-date=2026-01-01
```
3. 常驻定时同步
## 定时任务
```bash
npm run schedule
```
默认 cron 是每天早上 6 点,可在 `.env` 里改 `ALIYUN_APS_CRON`
默认`.env` 中:
## 注意
```env
ALIYUN_APS_SCHEDULE_MODE=incremental
```
- 脚本现在基于页面表格 DOM 抓取,如果阿里云伙伴中心页面结构改版,需要调整 `src/sync.js` 里的表格和筛选器选择逻辑
- 订单和账单的日期输入框是通过页面已有日期值自动识别的,所以首次跑之前建议先在页面确认默认筛选存在。
- 如果登录态过期,重新执行 `npm run login` 即可。
执行日增量
## 增量窗口
### orders
由数据库中 `aps_order.order_time` 最大值决定,回退:
```env
ALIYUN_APS_ORDER_INCREMENTAL_OVERLAP_DAYS=2
```
### bills
由数据库中 `aps_bill.consumption_time` 最大值决定,回退:
```env
ALIYUN_APS_BILL_INCREMENTAL_OVERLAP_DAYS=7
```
### messages
由数据库中 `aliyun_aps_messages.gmt_modified/gmt_created` 最大值决定,回退:
```env
ALIYUN_APS_MESSAGE_INCREMENTAL_OVERLAP_DAYS=7
```
## 数据库配置
`.env` 需要配置:
```env
ALIYUN_APS_DB_HOST=
ALIYUN_APS_DB_PORT=3306
ALIYUN_APS_DB_USER=
ALIYUN_APS_DB_PASSWORD=
ALIYUN_APS_DB_NAME=
ALIYUN_APS_DB_CHARSET=utf8mb4
ALIYUN_APS_DB_CONNECTION_LIMIT=5
```
## 浏览器配置
默认不再强制使用 Google Chrome。
可选配置:
```env
ALIYUN_APS_BROWSER_CHANNEL=
ALIYUN_APS_BROWSER_EXECUTABLE_PATH=
```
说明:
- 两项都留空:使用 Playwright 自带 Chromium。
- `ALIYUN_APS_BROWSER_CHANNEL=chrome`:使用本机 Chrome。
- `ALIYUN_APS_BROWSER_CHANNEL=msedge`:使用本机 Edge。
- `ALIYUN_APS_BROWSER_EXECUTABLE_PATH=...`:指定本地浏览器可执行文件路径。
## 邮件告警
任意运行异常会尝试:
- 保存错误上下文 JSON
- 截图当前页面
- 发送告警邮件
`.env` 配置:
```env
ALIYUN_APS_SMTP_HOST=
ALIYUN_APS_SMTP_PORT=465
ALIYUN_APS_SMTP_SECURE=true
ALIYUN_APS_SMTP_USER=
ALIYUN_APS_SMTP_PASS=
ALIYUN_APS_NOTIFY_EMAIL=
```
错误文件目录:
```text
data/errors/<dataset>/
```
## 本地数据目录
```text
data/current/
data/history/
data/delta/
data/checkpoints/
data/runs/
data/errors/
```
## customer 状态规则
- full 抓到 customer 时,默认写为 `active=1`
- messages 中如果明确识别到“释放”,则标记:
- `active=0`
- `status='released'`
- messages 中如果明确识别到“关联/报备成功/新增客户/绑定客户”,则恢复:
- `active=1`
- `status='active'`
## 安装
```bash
npm install
```
## 运行时热键
- `F7` 暂停
- `F8` 继续
- `F9` 终止
## 说明
- Python 入库脚本已不再是主流程依赖。
- bills 仍保留 checkpoint/resume 能力。
- messages 当前先按列表分页抓取,如后续页面需要详情抓取,再补 detail flow。

View File

@@ -9,11 +9,40 @@
"version": "1.0.0",
"dependencies": {
"dotenv": "^16.6.1",
"mysql2": "^3.15.2",
"node-cron": "^4.2.1",
"nodemailer": "^6.10.1",
"playwright": "^1.58.2"
}
},
"node_modules/@types/node": {
"version": "25.6.0",
"resolved": "https://registry.npmmirror.com/@types/node/-/node-25.6.0.tgz",
"integrity": "sha512-+qIYRKdNYJwY3vRCZMdJbPLJAtGjQBudzZzdzwQYkEPQd+PJGixUL5QfvCLDaULoLv+RhT3LDkwEfKaAkgSmNQ==",
"license": "MIT",
"peer": true,
"dependencies": {
"undici-types": "~7.19.0"
}
},
"node_modules/aws-ssl-profiles": {
"version": "1.1.2",
"resolved": "https://registry.npmmirror.com/aws-ssl-profiles/-/aws-ssl-profiles-1.1.2.tgz",
"integrity": "sha512-NZKeq9AfyQvEeNlN0zSYAaWrmBffJh3IELMZfRpJVWgrpEbtEpnjvzqBPf+mxoI287JohRDoa+/nsfqqiZmF6g==",
"license": "MIT",
"engines": {
"node": ">= 6.0.0"
}
},
"node_modules/denque": {
"version": "2.1.0",
"resolved": "https://registry.npmmirror.com/denque/-/denque-2.1.0.tgz",
"integrity": "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==",
"license": "Apache-2.0",
"engines": {
"node": ">=0.10"
}
},
"node_modules/dotenv": {
"version": "16.6.1",
"resolved": "https://registry.npmmirror.com/dotenv/-/dotenv-16.6.1.tgz",
@@ -40,6 +69,92 @@
"node": "^8.16.0 || ^10.6.0 || >=11.0.0"
}
},
"node_modules/generate-function": {
"version": "2.3.1",
"resolved": "https://registry.npmmirror.com/generate-function/-/generate-function-2.3.1.tgz",
"integrity": "sha512-eeB5GfMNeevm/GRYq20ShmsaGcmI81kIX2K9XQx5miC8KdHaC6Jm0qQ8ZNeGOi7wYB8OsdxKs+Y2oVuTFuVwKQ==",
"license": "MIT",
"dependencies": {
"is-property": "^1.0.2"
}
},
"node_modules/iconv-lite": {
"version": "0.7.2",
"resolved": "https://registry.npmmirror.com/iconv-lite/-/iconv-lite-0.7.2.tgz",
"integrity": "sha512-im9DjEDQ55s9fL4EYzOAv0yMqmMBSZp6G0VvFyTMPKWxiSBHUj9NW/qqLmXUwXrrM7AvqSlTCfvqRb0cM8yYqw==",
"license": "MIT",
"dependencies": {
"safer-buffer": ">= 2.1.2 < 3.0.0"
},
"engines": {
"node": ">=0.10.0"
},
"funding": {
"type": "opencollective",
"url": "https://opencollective.com/express"
}
},
"node_modules/is-property": {
"version": "1.0.2",
"resolved": "https://registry.npmmirror.com/is-property/-/is-property-1.0.2.tgz",
"integrity": "sha512-Ks/IoX00TtClbGQr4TWXemAnktAQvYB7HzcCxDGqEZU6oCmb2INHuOoKxbtR+HFkmYWBKv/dOZtGRiAjDhj92g==",
"license": "MIT"
},
"node_modules/long": {
"version": "5.3.2",
"resolved": "https://registry.npmmirror.com/long/-/long-5.3.2.tgz",
"integrity": "sha512-mNAgZ1GmyNhD7AuqnTG3/VQ26o760+ZYBPKjPvugO8+nLbYfX6TVpJPseBvopbdY+qpZ/lKUnmEc1LeZYS3QAA==",
"license": "Apache-2.0"
},
"node_modules/lru.min": {
"version": "1.1.4",
"resolved": "https://registry.npmmirror.com/lru.min/-/lru.min-1.1.4.tgz",
"integrity": "sha512-DqC6n3QQ77zdFpCMASA1a3Jlb64Hv2N2DciFGkO/4L9+q/IpIAuRlKOvCXabtRW6cQf8usbmM6BE/TOPysCdIA==",
"license": "MIT",
"engines": {
"bun": ">=1.0.0",
"deno": ">=1.30.0",
"node": ">=8.0.0"
},
"funding": {
"type": "github",
"url": "https://github.com/sponsors/wellwelwel"
}
},
"node_modules/mysql2": {
"version": "3.22.3",
"resolved": "https://registry.npmmirror.com/mysql2/-/mysql2-3.22.3.tgz",
"integrity": "sha512-uWWxvZSRvRhtBdh2CdcuK83YcOfPdmEeEYB069bAmPnV93QApDGVPuvCQOLjlh7tYHEWdgQPrn6kosDxHBVLkA==",
"license": "MIT",
"dependencies": {
"aws-ssl-profiles": "^1.1.2",
"denque": "^2.1.0",
"generate-function": "^2.3.1",
"iconv-lite": "^0.7.2",
"long": "^5.3.2",
"lru.min": "^1.1.4",
"named-placeholders": "^1.1.6",
"sql-escaper": "^1.3.3"
},
"engines": {
"node": ">= 8.0"
},
"peerDependencies": {
"@types/node": ">= 8"
}
},
"node_modules/named-placeholders": {
"version": "1.1.6",
"resolved": "https://registry.npmmirror.com/named-placeholders/-/named-placeholders-1.1.6.tgz",
"integrity": "sha512-Tz09sEL2EEuv5fFowm419c1+a/jSMiBjI9gHxVLrVdbUkkNUUfjsVYs9pVZu5oCon/kmRh9TfLEObFtkVxmY0w==",
"license": "MIT",
"dependencies": {
"lru.min": "^1.1.0"
},
"engines": {
"node": ">=8.0.0"
}
},
"node_modules/node-cron": {
"version": "4.2.1",
"resolved": "https://registry.npmmirror.com/node-cron/-/node-cron-4.2.1.tgz",
@@ -87,6 +202,34 @@
"engines": {
"node": ">=18"
}
},
"node_modules/safer-buffer": {
"version": "2.1.2",
"resolved": "https://registry.npmmirror.com/safer-buffer/-/safer-buffer-2.1.2.tgz",
"integrity": "sha512-YZo3K82SD7Riyi0E1EQPojLz7kpepnSQI9IyPbHHg1XXXevb5dJI7tpyN2ADxGcQbHG7vcyRHk0cbwqcQriUtg==",
"license": "MIT"
},
"node_modules/sql-escaper": {
"version": "1.3.3",
"resolved": "https://registry.npmmirror.com/sql-escaper/-/sql-escaper-1.3.3.tgz",
"integrity": "sha512-BsTCV265VpTp8tm1wyIm1xqQCS+Q9NHx2Sr+WcnUrgLrQ6yiDIvHYJV5gHxsj1lMBy2zm5twLaZao8Jd+S8JJw==",
"license": "MIT",
"engines": {
"bun": ">=1.0.0",
"deno": ">=2.0.0",
"node": ">=12.0.0"
},
"funding": {
"type": "github",
"url": "https://github.com/mysqljs/sql-escaper?sponsor=1"
}
},
"node_modules/undici-types": {
"version": "7.19.2",
"resolved": "https://registry.npmmirror.com/undici-types/-/undici-types-7.19.2.tgz",
"integrity": "sha512-qYVnV5OEm2AW8cJMCpdV20CDyaN3g0AjDlOGf1OW4iaDEx8MwdtChUp4zu4H0VP3nDRF/8RKWH+IPp9uW0YGZg==",
"license": "MIT",
"peer": true
}
}
}

View File

@@ -6,11 +6,14 @@
"scripts": {
"login": "node src/index.js login",
"sync": "node src/index.js sync",
"incremental": "node src/index.js incremental",
"bills": "node src/index.js bills",
"messages": "node src/index.js messages",
"schedule": "node src/index.js schedule"
},
"dependencies": {
"dotenv": "^16.6.1",
"mysql2": "^3.15.2",
"nodemailer": "^6.10.1",
"node-cron": "^4.2.1",
"playwright": "^1.58.2"

View File

@@ -20,11 +20,17 @@ export const config = {
rootDir,
baseUrl: process.env.ALIYUN_APS_BASE_URL || 'https://aps.aliyun.com',
headless: toBool(process.env.ALIYUN_APS_HEADLESS, false),
browserChannel: (process.env.ALIYUN_APS_BROWSER_CHANNEL || '').trim(),
browserExecutablePath: (process.env.ALIYUN_APS_BROWSER_EXECUTABLE_PATH || '').trim(),
timezone: process.env.ALIYUN_APS_TIMEZONE || 'Asia/Shanghai',
cron: process.env.ALIYUN_APS_CRON || '0 6 * * *',
orderStartDate: process.env.ALIYUN_APS_ORDER_START_DATE || '2024-01-01',
incrementalOrderStartDate: process.env.ALIYUN_APS_INCREMENTAL_ORDER_START_DATE || '',
billStartMonth: process.env.ALIYUN_APS_BILL_START_MONTH || '2024-01',
orderIncrementalOverlapDays: Math.max(0, Number.parseInt(process.env.ALIYUN_APS_ORDER_INCREMENTAL_OVERLAP_DAYS || '2', 10) || 2),
billIncrementalOverlapDays: Math.max(0, Number.parseInt(process.env.ALIYUN_APS_BILL_INCREMENTAL_OVERLAP_DAYS || '7', 10) || 7),
messageIncrementalOverlapDays: Math.max(0, Number.parseInt(process.env.ALIYUN_APS_MESSAGE_INCREMENTAL_OVERLAP_DAYS || '7', 10) || 7),
scheduleMode: process.env.ALIYUN_APS_SCHEDULE_MODE || 'incremental',
smtp: {
host: process.env.ALIYUN_APS_SMTP_HOST || 'smtp.qq.com',
port: parseInt(process.env.ALIYUN_APS_SMTP_PORT || '465', 10),
@@ -37,11 +43,20 @@ export const config = {
fullSync: toBool(process.env.ALIYUN_APS_FULL_SYNC, true),
resumeBillMonth: process.env.ALIYUN_APS_RESUME_BILL_MONTH || '',
resumeBillPage: Math.max(1, Number.parseInt(process.env.ALIYUN_APS_RESUME_BILL_PAGE || '1', 10) || 1),
dbSyncScript: process.env.ALIYUN_APS_DB_SYNC_SCRIPT || '../aps-aliyun-sync/aps_db_sync.py',
userDataDir: ensureDir(path.join(rootDir, '.browser')),
storageStateFile: path.join(rootDir, '.browser', 'storage-state.json'),
dataDir: ensureDir(path.join(rootDir, 'data')),
downloadDir: ensureDir(path.join(rootDir, 'downloads')),
errorDir: ensureDir(path.join(rootDir, 'data', 'errors')),
db: {
host: process.env.ALIYUN_APS_DB_HOST || '',
port: parseInt(process.env.ALIYUN_APS_DB_PORT || '3306', 10),
user: process.env.ALIYUN_APS_DB_USER || '',
password: process.env.ALIYUN_APS_DB_PASSWORD || '',
database: process.env.ALIYUN_APS_DB_NAME || '',
charset: process.env.ALIYUN_APS_DB_CHARSET || 'utf8mb4',
connectionLimit: Math.max(1, Number.parseInt(process.env.ALIYUN_APS_DB_CONNECTION_LIMIT || '5', 10) || 5),
},
};
export const datasets = {
@@ -180,6 +195,34 @@ export const datasets = {
serviceEndAt: record['服务结束时间'] || '',
}),
},
messages: {
name: 'messages',
url: `${config.baseUrl}/#/message`,
heading: '消息',
pageSize: 20,
uniqueKey: (record) => record.msgId || record.__hash,
normalize: (record) => ({
msgId: pickFirst(record, ['消息ID', 'msg_id', '消息id', 'ID', 'id']),
title: pickFirst(record, ['消息标题', '标题', 'title']),
content: pickFirst(record, ['消息内容', '内容', 'content']),
msgType: pickFirst(record, ['消息类型', 'type', 'msg_type']),
fromApp: pickFirst(record, ['来源应用', 'from_app', '应用']),
bizCode: pickFirst(record, ['业务编码', 'biz_code']),
msgChannel: pickFirst(record, ['消息通道', 'msg_channel']),
categoryId: pickFirst(record, ['分类ID', 'category_id']),
categoryName: pickFirst(record, ['分类名称', 'category_name']),
lv1CategoryId: pickFirst(record, ['一级分类ID', 'lv1_category_id']),
lv2CategoryId: pickFirst(record, ['二级分类ID', 'lv2_category_id']),
lv3CategoryId: pickFirst(record, ['三级分类ID', 'lv3_category_id']),
messageClassification: pickFirst(record, ['归类结果', 'message_classification']),
customerName: pickFirst(record, ['客户名称', 'customer_name']),
orderNo: pickFirst(record, ['订单号', 'order_no']),
status: pickFirst(record, ['消息状态', '状态', 'status']),
gmtCreated: pickFirst(record, ['消息创建时间', '创建时间', 'gmt_created']),
gmtModified: pickFirst(record, ['消息修改时间', '修改时间', 'gmt_modified']),
extraData: record,
}),
},
};
function splitLines(value) {
@@ -188,3 +231,13 @@ function splitLines(value) {
.map((part) => part.trim())
.filter(Boolean);
}
function pickFirst(record, keys) {
for (const key of keys) {
const value = record[key];
if (value != null && String(value).trim()) {
return String(value).trim();
}
}
return '';
}

View File

@@ -0,0 +1,596 @@
import mysql from 'mysql2/promise';
import { config } from './config.js';
let pool = null;
const MESSAGE_TABLE_DDL = `
CREATE TABLE IF NOT EXISTS aliyun_aps_messages (
id bigint NOT NULL AUTO_INCREMENT,
msg_id varchar(128) NULL DEFAULT NULL COMMENT '消息原始ID',
title text NULL COMMENT '消息标题',
content text NULL COMMENT '消息内容',
msg_type varchar(64) NULL DEFAULT NULL COMMENT '消息类型',
from_app varchar(128) NULL DEFAULT NULL COMMENT '来源应用',
biz_code varchar(128) NULL DEFAULT NULL COMMENT '业务编码',
msg_channel varchar(64) NULL DEFAULT NULL COMMENT '消息通道',
category_id varchar(64) NULL DEFAULT NULL COMMENT '分类ID',
category_name varchar(128) NULL DEFAULT NULL COMMENT '分类名称',
lv1_category_id varchar(64) NULL DEFAULT NULL COMMENT '一级分类ID',
lv2_category_id varchar(64) NULL DEFAULT NULL COMMENT '二级分类ID',
lv3_category_id varchar(64) NULL DEFAULT NULL COMMENT '三级分类ID',
message_classification varchar(255) NULL DEFAULT NULL COMMENT '归类结果',
customer_name varchar(255) NULL DEFAULT NULL COMMENT '客户名称',
order_no varchar(128) NULL DEFAULT NULL COMMENT '订单号',
status varchar(32) NULL DEFAULT NULL COMMENT '消息状态(已读/未读)',
gmt_created varchar(64) NULL DEFAULT NULL COMMENT '消息创建时间',
gmt_modified varchar(64) NULL DEFAULT NULL COMMENT '消息修改时间',
extra_data json NULL COMMENT '其他字段(原始JSON)',
crawl_time datetime NULL DEFAULT CURRENT_TIMESTAMP COMMENT '爬取时间',
PRIMARY KEY (id),
UNIQUE KEY uk_msg_id (msg_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='阿里云APS站内消息'
`;
let customerLifecycleEnsured = false;
function hasDbConfig() {
return Boolean(config.db.host && config.db.user && config.db.database);
}
function getPool() {
if (!hasDbConfig()) {
throw new Error('未配置数据库连接信息,请设置 ALIYUN_APS_DB_HOST / USER / PASSWORD / NAME');
}
if (pool) {
return pool;
}
pool = mysql.createPool({
host: config.db.host,
port: config.db.port,
user: config.db.user,
password: config.db.password,
database: config.db.database,
charset: config.db.charset,
connectionLimit: config.db.connectionLimit,
waitForConnections: true,
});
return pool;
}
function normalizeMonth(value) {
const normalized = String(value || '').trim();
if (!normalized) {
return null;
}
if (/^\d{4}-\d{2}$/.test(normalized)) {
return normalized;
}
if (/^\d{6}$/.test(normalized)) {
return `${normalized.slice(0, 4)}-${normalized.slice(4, 6)}`;
}
return null;
}
function safeString(value) {
if (value == null) {
return null;
}
const normalized = String(value).trim();
return normalized ? normalized : null;
}
function safeNumber(value) {
if (value == null || value === '') {
return null;
}
const normalized = Number.parseFloat(String(value).replace(/,/g, '').trim());
return Number.isFinite(normalized) ? normalized : null;
}
async function getCustomerMap() {
const [rows] = await getPool().query('SELECT account_id, login_name FROM aps_customer');
const map = new Map();
for (const row of rows) {
const loginName = safeString(row.login_name);
const accountId = safeString(row.account_id);
if (!loginName || !accountId) {
continue;
}
map.set(loginName, accountId);
map.set(loginName.replace(/\s+/g, ''), accountId);
}
return map;
}
function resolveCustomerAccountId(customerMap, customerAccount) {
const normalized = safeString(customerAccount);
if (!normalized) {
return null;
}
return customerMap.get(normalized) || customerMap.get(normalized.replace(/\s+/g, '')) || null;
}
async function queryLatestValue(sql) {
const [rows] = await getPool().query(sql);
const row = Array.isArray(rows) ? rows[0] : null;
if (!row) {
return null;
}
const latest = row.latest_time;
if (!latest) {
return null;
}
if (latest instanceof Date) {
return latest.toISOString().slice(0, 19).replace('T', ' ');
}
return String(latest);
}
export async function getLatestOrderTimeFromDb() {
return queryLatestValue('SELECT MAX(order_time) AS latest_time FROM aps_order');
}
export async function getLatestBillConsumptionTimeFromDb() {
return queryLatestValue('SELECT MAX(consumption_time) AS latest_time FROM aps_bill');
}
export async function getLatestMessageTimeFromDb() {
return queryLatestValue("SELECT MAX(COALESCE(NULLIF(gmt_modified, ''), NULLIF(gmt_created, ''))) AS latest_time FROM aliyun_aps_messages");
}
export async function closeDbPool() {
if (!pool) {
return;
}
await pool.end();
pool = null;
}
export async function ensureMessagesTable() {
await getPool().query(MESSAGE_TABLE_DDL);
}
export async function ensureCustomerLifecycleColumns() {
if (customerLifecycleEnsured) {
return;
}
await getPool().query("ALTER TABLE aps_customer ADD COLUMN IF NOT EXISTS active TINYINT(1) NOT NULL DEFAULT 1 COMMENT '是否有效 1=有效 0=释放'");
await getPool().query("ALTER TABLE aps_customer ADD COLUMN IF NOT EXISTS status VARCHAR(32) DEFAULT 'active' COMMENT '客户状态'");
await getPool().query("ALTER TABLE aps_customer ADD COLUMN IF NOT EXISTS released_at DATETIME NULL COMMENT '释放时间'");
await getPool().query("ALTER TABLE aps_customer ADD COLUMN IF NOT EXISTS release_reason VARCHAR(255) NULL COMMENT '释放原因'");
customerLifecycleEnsured = true;
}
export async function upsertCustomers(records) {
if (!records?.length) {
return { inserted: 0 };
}
await ensureCustomerLifecycleColumns();
const sql = `
INSERT INTO aps_customer (
account_id, login_name, real_name, report_source, report_type, trade_mode,
real_name_status, relation_date, follow_staff, payment_notice_status,
invite_register_type, is_new_customer, performance_start_point_reached,
customer_category, remark, no_consumption_months,
planned_release_time, planned_release_reason,
active, status, released_at, release_reason
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 1, 'active', NULL, NULL)
ON DUPLICATE KEY UPDATE
login_name=VALUES(login_name),
real_name=VALUES(real_name),
report_source=VALUES(report_source),
report_type=VALUES(report_type),
trade_mode=VALUES(trade_mode),
real_name_status=VALUES(real_name_status),
relation_date=VALUES(relation_date),
follow_staff=VALUES(follow_staff),
payment_notice_status=VALUES(payment_notice_status),
invite_register_type=VALUES(invite_register_type),
is_new_customer=VALUES(is_new_customer),
performance_start_point_reached=VALUES(performance_start_point_reached),
customer_category=VALUES(customer_category),
remark=VALUES(remark),
no_consumption_months=VALUES(no_consumption_months),
planned_release_time=VALUES(planned_release_time),
planned_release_reason=VALUES(planned_release_reason),
active=1,
status='active',
released_at=NULL,
release_reason=NULL
`;
for (const record of records) {
const accountId = safeString(record.accountId);
const loginName = safeString(record.loginName);
if (!accountId || !loginName) {
continue;
}
await getPool().execute(sql, [
accountId,
loginName,
safeString(record.realName),
safeString(record.reportSource),
safeString(record.reportType),
safeString(record.tradeMode),
safeString(record.authStatus),
safeString(record.relationTime),
safeString(record.owner),
safeString(record.paymentNoticeStatus),
safeString(record.inviteType),
safeString(record.isNewCustomer) === '是' ? 1 : safeString(record.isNewCustomer) === '否' ? 0 : null,
safeString(record.isPerformanceQualified) === '是' ? 1 : safeString(record.isPerformanceQualified) === '否' ? 0 : null,
safeString(record.customerCategory),
safeString(record.remark),
safeNumber(record.inactiveMonths),
safeString(record.releasePlanTime),
safeString(record.releasePlanReason),
]);
}
return { inserted: records.length };
}
export async function upsertCustomerDetails(records) {
if (!records?.length) {
return { inserted: 0 };
}
await ensureCustomerLifecycleColumns();
const sql = `
UPDATE aps_customer SET
customer_name = ?,
customer_type = ?,
customer_source = ?,
email = ?,
phone = ?,
department = ?,
payment_notice_status = COALESCE(?, payment_notice_status),
updated_at = CURRENT_TIMESTAMP
WHERE account_id = ?
`;
for (const record of records) {
const accountId = safeString(record.accountId);
if (!accountId) {
continue;
}
await getPool().execute(sql, [
safeString(record.customerName),
safeString(record.customerType),
safeString(record.customerSource),
safeString(record.email),
safeString(record.phone),
safeString(record.department),
safeString(record.paymentNoticeStatus),
accountId,
]);
}
return { inserted: records.length };
}
async function findCustomerAccountIdByName(customerName) {
const normalized = safeString(customerName);
if (!normalized) {
return null;
}
const [rows] = await getPool().execute(
'SELECT account_id FROM aps_customer WHERE customer_name = ? OR real_name = ? OR login_name = ? LIMIT 1',
[normalized, normalized, normalized],
);
return Array.isArray(rows) && rows.length > 0 ? safeString(rows[0].account_id) : null;
}
async function markCustomerReleased(customerName, reason, releasedAt) {
const accountId = await findCustomerAccountIdByName(customerName);
if (!accountId) {
return false;
}
await ensureCustomerLifecycleColumns();
await getPool().execute(
'UPDATE aps_customer SET active = 0, status = ?, released_at = ?, release_reason = ? WHERE account_id = ?',
['released', releasedAt || new Date().toISOString().slice(0, 19).replace('T', ' '), safeString(reason), accountId],
);
return true;
}
async function markCustomerActive(customerName) {
const accountId = await findCustomerAccountIdByName(customerName);
if (!accountId) {
return false;
}
await ensureCustomerLifecycleColumns();
await getPool().execute(
'UPDATE aps_customer SET active = 1, status = ?, released_at = NULL, release_reason = NULL WHERE account_id = ?',
['active', accountId],
);
return true;
}
async function applyCustomerLifecycleFromMessage(record) {
const title = safeString(record.title) || '';
const content = safeString(record.content) || '';
const customerName = safeString(record.customerName) || '';
if (!customerName) {
return;
}
const text = `${title}\n${content}`;
if (/释放/.test(text)) {
await markCustomerReleased(customerName, title || content, safeString(record.gmtModified) || safeString(record.gmtCreated));
return;
}
if (/(关联|报备成功|新增客户|绑定客户)/.test(text)) {
await markCustomerActive(customerName);
}
}
export async function upsertOrders(records) {
if (!records?.length) {
return { inserted: 0 };
}
const customerMap = await getCustomerMap();
const sql = `
INSERT INTO aps_order (
order_id, customer_account_id, customer_login_name,
customer_category, order_type, original_price_cny, paid_amount_cny,
status, order_time, order_month
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
customer_account_id=VALUES(customer_account_id),
customer_login_name=VALUES(customer_login_name),
customer_category=VALUES(customer_category),
order_type=VALUES(order_type),
original_price_cny=VALUES(original_price_cny),
paid_amount_cny=VALUES(paid_amount_cny),
status=VALUES(status),
order_time=VALUES(order_time),
order_month=VALUES(order_month)
`;
for (const record of records) {
const orderId = safeString(record.orderId);
if (!orderId) {
continue;
}
const customerLoginName = safeString(record.customerAccount) || '';
const accountId = resolveCustomerAccountId(customerMap, customerLoginName);
await getPool().execute(sql, [
orderId,
accountId,
customerLoginName,
safeString(record.customerCategory),
safeString(record.orderType),
safeNumber(record.orderOriginalPriceCny),
safeNumber(record.actualPaidCny),
safeString(record.orderStatus),
safeString(record.createdAt),
safeString(record.createdAt)?.slice(0, 7) || null,
]);
}
return { inserted: records.length };
}
export async function upsertOrderDetails(records) {
if (!records?.length) {
return { inserted: 0 };
}
const sql = `
INSERT INTO aps_order_detail (
order_id, order_type, status, trade_type, customer_category,
dealer_name, dealer_uid, customer_type, opportunity_id,
payment_time, order_time, product_name, product_code,
original_price_cny, paid_amount_cny, discount,
payable_amount_cny, coupon_amount_cny
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
order_type=VALUES(order_type),
status=VALUES(status),
trade_type=VALUES(trade_type),
customer_category=VALUES(customer_category),
dealer_name=VALUES(dealer_name),
dealer_uid=VALUES(dealer_uid),
customer_type=VALUES(customer_type),
opportunity_id=VALUES(opportunity_id),
payment_time=VALUES(payment_time),
order_time=VALUES(order_time),
product_name=VALUES(product_name),
product_code=VALUES(product_code),
original_price_cny=VALUES(original_price_cny),
paid_amount_cny=VALUES(paid_amount_cny),
discount=VALUES(discount),
payable_amount_cny=VALUES(payable_amount_cny),
coupon_amount_cny=VALUES(coupon_amount_cny)
`;
for (const record of records) {
const orderId = safeString(record.orderId);
if (!orderId) {
continue;
}
await getPool().execute(sql, [
orderId,
safeString(record.orderType),
safeString(record.status),
safeString(record.tradeType),
safeString(record.customerCategory),
safeString(record.dealerName),
safeString(record.dealerUid),
safeString(record.customerType),
safeString(record.opportunityId),
safeString(record.paymentTime),
safeString(record.orderTime),
safeString(record.productName),
safeString(record.productCode),
safeNumber(record.originalPriceCny),
safeNumber(record.paidAmountCny),
safeNumber(record.discount),
safeNumber(record.payableAmountCny),
safeNumber(record.couponAmountCny),
]);
}
return { inserted: records.length };
}
export async function upsertBills(records) {
if (!records?.length) {
return { inserted: 0 };
}
const customerMap = await getCustomerMap();
const selectSql = `
SELECT id FROM aps_bill
WHERE billing_month = ?
AND commission_month = ?
AND customer_login_name = ?
AND consumption_time = ?
AND product_name = ?
AND original_price_cny <=> ?
LIMIT 1
`;
const insertSql = `
INSERT INTO aps_bill (
billing_month, customer_account_id, customer_login_name,
bill_type, consumption_time,
customer_category,
product_name, product_category,
original_price_cny, customer_payable_amount_cny,
commission_month,
included_in_performance,
rebated,
invite_register_type,
service_start_time, service_end_time
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`;
const updateSql = `
UPDATE aps_bill SET
customer_account_id = ?,
bill_type = ?,
customer_category = ?,
product_category = ?,
customer_payable_amount_cny = ?,
included_in_performance = ?,
rebated = ?,
invite_register_type = ?,
service_start_time = ?,
service_end_time = ?
WHERE id = ?
`;
for (const record of records) {
const billingMonth = normalizeMonth(record.billingMonth);
const commissionMonth = normalizeMonth(record.commissionMonth);
if (!billingMonth || !commissionMonth) {
continue;
}
const customerLoginName = safeString(record.customerAccount) || '';
const accountId = resolveCustomerAccountId(customerMap, customerLoginName);
const productName = safeString(record.productName);
const consumptionTime = safeString(record.consumeDate);
const originalPrice = safeNumber(record.originalPriceCny);
const [rows] = await getPool().execute(selectSql, [
billingMonth,
commissionMonth,
customerLoginName,
consumptionTime,
productName,
originalPrice,
]);
if (Array.isArray(rows) && rows.length > 0) {
await getPool().execute(updateSql, [
accountId,
safeString(record.billType),
safeString(record.customerCategory),
safeString(record.productCategory),
safeNumber(record.customerPayableCny),
safeString(record.countsForPerformance),
safeString(record.commissionable),
safeString(record.inviteType),
safeString(record.serviceStartAt),
safeString(record.serviceEndAt),
rows[0].id,
]);
continue;
}
await getPool().execute(insertSql, [
billingMonth,
accountId,
customerLoginName,
safeString(record.billType),
consumptionTime,
safeString(record.customerCategory),
productName,
safeString(record.productCategory),
originalPrice,
safeNumber(record.customerPayableCny),
commissionMonth,
safeString(record.countsForPerformance),
safeString(record.commissionable),
safeString(record.inviteType),
safeString(record.serviceStartAt),
safeString(record.serviceEndAt),
]);
}
return { inserted: records.length };
}
export async function upsertMessages(records) {
if (!records?.length) {
return { inserted: 0 };
}
await ensureMessagesTable();
const sql = `
INSERT INTO aliyun_aps_messages (
msg_id, title, content, msg_type, from_app, biz_code, msg_channel,
category_id, category_name, lv1_category_id, lv2_category_id, lv3_category_id,
message_classification, customer_name, order_no, status,
gmt_created, gmt_modified, extra_data
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
title=VALUES(title),
content=VALUES(content),
msg_type=VALUES(msg_type),
from_app=VALUES(from_app),
biz_code=VALUES(biz_code),
msg_channel=VALUES(msg_channel),
category_id=VALUES(category_id),
category_name=VALUES(category_name),
lv1_category_id=VALUES(lv1_category_id),
lv2_category_id=VALUES(lv2_category_id),
lv3_category_id=VALUES(lv3_category_id),
message_classification=VALUES(message_classification),
customer_name=VALUES(customer_name),
order_no=VALUES(order_no),
status=VALUES(status),
gmt_created=VALUES(gmt_created),
gmt_modified=VALUES(gmt_modified),
extra_data=VALUES(extra_data),
crawl_time=CURRENT_TIMESTAMP
`;
for (const record of records) {
const msgId = safeString(record.msgId) || safeString(record.__hash);
if (!msgId) {
continue;
}
await getPool().execute(sql, [
msgId,
safeString(record.title),
safeString(record.content),
safeString(record.msgType),
safeString(record.fromApp),
safeString(record.bizCode),
safeString(record.msgChannel),
safeString(record.categoryId),
safeString(record.categoryName),
safeString(record.lv1CategoryId),
safeString(record.lv2CategoryId),
safeString(record.lv3CategoryId),
safeString(record.messageClassification),
safeString(record.customerName),
safeString(record.orderNo),
safeString(record.status),
safeString(record.gmtCreated),
safeString(record.gmtModified),
JSON.stringify(record.extraData || record),
]);
await applyCustomerLifecycleFromMessage(record);
}
return { inserted: records.length };
}
export { hasDbConfig };

View File

@@ -1,3 +1,5 @@
import { config } from './config.js';
const args = process.argv.slice(2);
const command = args[0] || 'sync';
const extraArgs = args.slice(1);
@@ -9,7 +11,7 @@ for (const arg of extraArgs) {
}
}
const { login, scheduleSync, syncAll, syncBillsOnly } = await import('./sync.js');
const { login, scheduleSync, syncAll, syncAllIncremental, syncBillsOnly, syncMessagesOnly } = await import('./sync.js');
if (command === 'login') {
await login();
@@ -22,12 +24,24 @@ if (command === 'sync') {
process.exit(0);
}
if (command === 'incremental') {
const summary = await syncAllIncremental();
console.log(JSON.stringify(summary, null, 2));
process.exit(0);
}
if (command === 'bills') {
const summary = await syncBillsOnly({ resume: billsResume });
console.log(JSON.stringify(summary, null, 2));
process.exit(0);
}
if (command === 'messages') {
const summary = await syncMessagesOnly({ incremental: config.scheduleMode === 'incremental' });
console.log(JSON.stringify(summary, null, 2));
process.exit(0);
}
if (command === 'schedule') {
await scheduleSync();
} else {

View File

@@ -4,6 +4,49 @@ import { config } from './config.js';
let lastSentAt = 0;
const ONE_HOUR_MS = 60 * 60 * 1000;
function canSendMail() {
if (!config.smtp.user || !config.smtp.pass) {
console.warn('[通知] SMTP 用户名或密码为空,跳过发送邮件');
return false;
}
if (!config.notifyEmail) {
console.warn('[通知] 未配置 ALIYUN_APS_NOTIFY_EMAIL跳过发送邮件');
return false;
}
return true;
}
function createTransporter() {
return nodemailer.createTransport({
host: config.smtp.host,
port: config.smtp.port,
secure: config.smtp.secure,
auth: {
user: config.smtp.user,
pass: config.smtp.pass,
},
});
}
export async function sendRuntimeErrorAlert({ subject, text, attachments = [] }) {
if (!canSendMail()) {
return;
}
const transporter = createTransporter();
await transporter.sendMail({
from: config.smtp.user,
to: config.notifyEmail,
subject,
text,
attachments,
});
console.log(`[通知] 错误提醒邮件已发送至 ${config.notifyEmail}`);
}
export async function sendLoginAlert(loginUrl = '') {
const now = Date.now();
if (now - lastSentAt < ONE_HOUR_MS) {
@@ -11,27 +54,13 @@ export async function sendLoginAlert(loginUrl = '') {
return;
}
const { smtp, notifyEmail, baseUrl } = config;
const { baseUrl } = config;
if (!smtp.user || !smtp.pass) {
console.warn('[通知] SMTP 用户名或密码为空,跳过发送登录提醒邮件');
if (!canSendMail()) {
return;
}
if (!notifyEmail) {
console.warn('[通知] 未配置 ALIYUN_APS_NOTIFY_EMAIL跳过发送登录提醒邮件');
return;
}
const transporter = nodemailer.createTransport({
host: smtp.host,
port: smtp.port,
secure: smtp.secure,
auth: {
user: smtp.user,
pass: smtp.pass,
},
});
const transporter = createTransporter();
const url = loginUrl || `${baseUrl}/#/signin`;
const timestamp = new Date().toISOString();
@@ -45,12 +74,12 @@ export async function sendLoginAlert(loginUrl = '') {
].join('\n');
await transporter.sendMail({
from: smtp.user,
to: notifyEmail,
from: config.smtp.user,
to: config.notifyEmail,
subject,
text,
});
lastSentAt = now;
console.log(`[通知] 登录提醒邮件已发送至 ${notifyEmail}`);
console.log(`[通知] 登录提醒邮件已发送至 ${config.notifyEmail}`);
}

View File

@@ -3,9 +3,21 @@ import cron from 'node-cron';
import fs from 'node:fs';
import path from 'node:path';
import readline from 'node:readline';
import { execSync } from 'node:child_process';
import { config, datasets } from './config.js';
import { sendLoginAlert } from './notify.js';
import { sendLoginAlert, sendRuntimeErrorAlert } from './notify.js';
import {
closeDbPool,
getLatestBillConsumptionTimeFromDb,
getLatestMessageTimeFromDb,
getLatestOrderTimeFromDb,
hasDbConfig,
upsertBills,
upsertCustomerDetails,
upsertCustomers,
upsertMessages,
upsertOrderDetails,
upsertOrders,
} from './db.js';
import {
diffRecords,
loadCurrentState,
@@ -121,14 +133,56 @@ async function runtimeCheckpoint(label) {
await controller.waitIfPaused(label);
}
function clearStaleBrowserProfileLocks() {
const lockFiles = ['SingletonLock', 'SingletonCookie', 'SingletonSocket'];
const now = Date.now();
const staleMs = 10 * 60 * 1000;
for (const fileName of lockFiles) {
const filePath = path.join(config.userDataDir, fileName);
if (!fs.existsSync(filePath)) {
continue;
}
try {
const stat = fs.statSync(filePath);
const ageMs = now - stat.mtimeMs;
if (ageMs < staleMs) {
console.log(`[浏览器锁] 检测到活跃锁文件,保留: ${fileName}`);
continue;
}
fs.rmSync(filePath, { force: true });
console.log(`[浏览器锁] 已清理陈旧锁文件: ${fileName}`);
} catch (error) {
console.warn(`[浏览器锁] 清理 ${fileName} 失败: ${error.message}`);
}
}
}
async function getContext() {
if (_context) return _context;
_context = await chromium.launchPersistentContext(config.userDataDir, {
channel: 'chrome',
clearStaleBrowserProfileLocks();
const launchOptions = {
headless: config.headless,
acceptDownloads: true,
downloadsPath: config.downloadDir,
});
};
if (config.browserChannel) {
launchOptions.channel = config.browserChannel;
}
if (config.browserExecutablePath) {
launchOptions.executablePath = config.browserExecutablePath;
}
try {
_context = await chromium.launchPersistentContext(config.userDataDir, launchOptions);
} catch (error) {
const browserHint = config.browserExecutablePath
? `executablePath=${config.browserExecutablePath}`
: config.browserChannel
? `channel=${config.browserChannel}`
: 'bundled-chromium';
throw new Error(`浏览器启动失败(${browserHint})。请确认没有其他浏览器占用 .browser 目录,或删除 .browser 后重新执行 npm run login。原始错误: ${error.message}`);
}
await restoreStorageState(_context);
return _context;
}
@@ -185,6 +239,91 @@ function loadLatestBillsCheckpoint() {
}
}
function subtractDays(dateValue, days) {
const next = new Date(dateValue);
next.setDate(next.getDate() - days);
return next;
}
function parseDbDateTime(value) {
const normalized = String(value || '').trim();
if (!normalized) {
return null;
}
const parsed = new Date(normalized.replace(' ', 'T'));
return Number.isNaN(parsed.getTime()) ? null : parsed;
}
function formatDateTime(date) {
return `${formatDate(date)} ${String(date.getHours()).padStart(2, '0')}:${String(date.getMinutes()).padStart(2, '0')}:${String(date.getSeconds()).padStart(2, '0')}`;
}
function buildSingleDateWindow(startDate, endDate) {
return [{
windowStart: startDate,
windowEnd: endDate,
start: startDate,
end: endDate,
}];
}
async function captureErrorArtifacts(page, metadata = {}) {
const stamp = nowStamp();
const artifactDir = path.join(config.errorDir, metadata.dataset || 'general');
fs.mkdirSync(artifactDir, { recursive: true });
const jsonPath = path.join(artifactDir, `${stamp}.json`);
const screenshotPath = path.join(artifactDir, `${stamp}.png`);
const payload = {
...metadata,
capturedAt: new Date().toISOString(),
pageUrl: page?.url?.() || '',
stack: metadata.error?.stack || metadata.errorMessage || '',
};
fs.writeFileSync(jsonPath, JSON.stringify(payload, null, 2));
let screenshotSaved = false;
if (page) {
try {
await page.screenshot({ path: screenshotPath, fullPage: true });
screenshotSaved = true;
} catch (error) {
console.error('[错误截图] 保存失败:', error.message);
}
}
return {
jsonPath,
screenshotPath: screenshotSaved ? screenshotPath : '',
};
}
async function reportRuntimeError(error, page, metadata = {}) {
const artifacts = await captureErrorArtifacts(page, {
...metadata,
errorMessage: error.message,
error,
});
const subject = `[APS同步异常] ${metadata.label || metadata.dataset || 'sync'} failed`;
const text = [
`时间: ${new Date().toISOString()}`,
`任务: ${metadata.label || ''}`,
`数据集: ${metadata.dataset || ''}`,
`模式: ${metadata.mode || ''}`,
`URL: ${page?.url?.() || ''}`,
`错误: ${error.message}`,
`JSON: ${artifacts.jsonPath}`,
artifacts.screenshotPath ? `截图: ${artifacts.screenshotPath}` : '截图: 保存失败',
].join('\n');
const attachments = [{ filename: path.basename(artifacts.jsonPath), path: artifacts.jsonPath }];
if (artifacts.screenshotPath) {
attachments.push({ filename: path.basename(artifacts.screenshotPath), path: artifacts.screenshotPath });
}
await sendRuntimeErrorAlert({ subject, text, attachments });
}
async function getPageBodyPreview(page) {
return page
.evaluate(() => document.body?.innerText?.substring(0, 500) || '(空)')
@@ -258,32 +397,41 @@ export async function syncAll() {
const runtimeController = getRuntimeController();
runtimeController.bind();
const context = await getContext();
let page = null;
try {
const summary = { startedAt: new Date().toISOString(), datasets: {} };
const page = context.pages()[0] || (await context.newPage());
page = context.pages()[0] || (await context.newPage());
summary.datasets.customers = await syncCustomers(page);
summary.datasets.customerDetails = await syncCustomerDetails(page);
summary.datasets.orders = await syncOrders(page);
if (config.fullSync) {
summary.datasets.customers = await syncCustomers(page);
summary.datasets.customerDetails = await syncCustomerDetails(page);
}
summary.datasets.orders = await syncOrders(page, { incremental: !config.fullSync });
// syncOrders 完成后,从最新的 orders.json 读取 orderId 列表
const latestOrders = loadCurrentState('orders', datasets.orders.uniqueKey);
const orderIdsForDetail = collectValidOrderIds(latestOrders.records || []);
summary.datasets.orderDetails = await syncOrderDetails(page, orderIdsForDetail);
summary.datasets.bills = await syncBills(page);
summary.datasets.bills = await syncBills(page, { incremental: !config.fullSync });
summary.datasets.messages = await syncMessages(page, { incremental: !config.fullSync });
summary.finishedAt = new Date().toISOString();
const stamp = nowStamp();
saveRunSummary(stamp, summary);
return summary;
} catch (error) {
await reportRuntimeError(error, page, { label: 'syncAll', dataset: 'all', mode: config.fullSync ? 'full' : 'incremental' });
throw error;
} finally {
if (config.closeBrowser) {
await closeContextIfNeeded();
} else {
console.log('浏览器保持运行');
}
await closeDbPool();
runtimeController.unbind();
}
}
@@ -292,10 +440,11 @@ export async function syncBillsOnly(options = {}) {
const runtimeController = getRuntimeController();
runtimeController.bind();
const context = await getContext();
let page = null;
try {
const summary = { startedAt: new Date().toISOString(), datasets: {} };
const page = context.pages()[0] || (await context.newPage());
page = context.pages()[0] || (await context.newPage());
summary.datasets.bills = await syncBills(page, options);
summary.finishedAt = new Date().toISOString();
@@ -303,12 +452,45 @@ export async function syncBillsOnly(options = {}) {
const stamp = nowStamp();
saveRunSummary(stamp, summary);
return summary;
} catch (error) {
await reportRuntimeError(error, page, { label: 'syncBillsOnly', dataset: 'bills', mode: options.incremental ? 'incremental' : 'full' });
throw error;
} finally {
if (config.closeBrowser) {
await closeContextIfNeeded();
} else {
console.log('浏览器保持运行');
}
await closeDbPool();
runtimeController.unbind();
}
}
export async function syncMessagesOnly(options = {}) {
const runtimeController = getRuntimeController();
runtimeController.bind();
const context = await getContext();
let page = null;
try {
const summary = { startedAt: new Date().toISOString(), datasets: {} };
page = context.pages()[0] || (await context.newPage());
summary.datasets.messages = await syncMessages(page, options);
summary.finishedAt = new Date().toISOString();
const stamp = nowStamp();
saveRunSummary(stamp, summary);
return summary;
} catch (error) {
await reportRuntimeError(error, page, { label: 'syncMessagesOnly', dataset: 'messages', mode: options.incremental ? 'incremental' : 'full' });
throw error;
} finally {
if (config.closeBrowser) {
await closeContextIfNeeded();
} else {
console.log('浏览器保持运行');
}
await closeDbPool();
runtimeController.unbind();
}
}
@@ -319,22 +501,11 @@ export async function scheduleSync() {
config.cron,
async () => {
try {
console.log(`[${new Date().toISOString()}] 开始执行同步`);
const summary = await syncAll();
console.log(`[${new Date().toISOString()}] 开始执行同步 mode=${config.scheduleMode}`);
const summary = config.scheduleMode === 'full'
? await syncAll()
: await syncAllIncremental();
console.log(`[${new Date().toISOString()}] 同步完成`, JSON.stringify(summary, null, 2));
try {
const scriptPath = path.resolve(config.rootDir, config.dbSyncScript);
const incrementalFlag = config.fullSync ? '' : ' --incremental';
console.log(`[入库] 执行 ${scriptPath}${incrementalFlag ? ' (增量模式)' : ''}`);
const output = execSync(`python "${scriptPath}"${incrementalFlag}`, {
cwd: path.dirname(scriptPath),
encoding: 'utf-8',
timeout: 120000,
});
console.log(output);
} catch (e) {
console.error('[入库] 失败:', e.message);
}
} catch (error) {
console.error(`[${new Date().toISOString()}] 同步失败`, error);
}
@@ -343,6 +514,40 @@ export async function scheduleSync() {
);
}
export async function syncAllIncremental() {
const runtimeController = getRuntimeController();
runtimeController.bind();
const context = await getContext();
let page = null;
try {
const summary = { startedAt: new Date().toISOString(), mode: 'incremental', datasets: {} };
page = context.pages()[0] || (await context.newPage());
summary.datasets.orders = await syncOrders(page, { incremental: true });
const latestOrders = loadCurrentState('orders', datasets.orders.uniqueKey);
const orderIdsForDetail = collectValidOrderIds(latestOrders.records || []);
summary.datasets.orderDetails = await syncOrderDetails(page, orderIdsForDetail);
summary.datasets.bills = await syncBills(page, { incremental: true });
summary.datasets.messages = await syncMessages(page, { incremental: true });
summary.finishedAt = new Date().toISOString();
const stamp = nowStamp();
saveRunSummary(stamp, summary);
return summary;
} catch (error) {
await reportRuntimeError(error, page, { label: 'syncAllIncremental', dataset: 'incremental', mode: 'incremental' });
throw error;
} finally {
if (config.closeBrowser) {
await closeContextIfNeeded();
} else {
console.log('浏览器保持运行');
}
await closeDbPool();
runtimeController.unbind();
}
}
async function syncCustomers(page) {
await runtimeCheckpoint('同步客户');
const dataset = datasets.customers;
@@ -350,6 +555,10 @@ async function syncCustomers(page) {
await waitUntilReady(page, dataset.heading);
await trySetPageSize(page, dataset.pageSize);
const records = await scrapePagedTable(page, dataset, {});
if (hasDbConfig()) {
const normalizedRecords = dedupeByHash(normalizeDatasetRecords(dataset, records, {}));
await upsertCustomers(normalizedRecords);
}
return persistDataset(dataset, records, {});
}
@@ -393,20 +602,25 @@ async function syncCustomerDetails(page) {
const detail = await extractCustomerDetail(page);
allDetails.push({ ...detail, __context: { accountId } });
if (hasDbConfig()) {
const normalizedDetail = normalizeDatasetRecords(dataset, [{ ...detail, __context: { accountId } }], {});
await upsertCustomerDetails(normalizedDetail);
}
}
return persistDataset(dataset, dedupeByHash(allDetails), {});
}
async function syncOrders(page) {
async function syncOrders(page, options = {}) {
await runtimeCheckpoint('同步订单');
const dataset = datasets.orders;
const { incremental = false } = options;
let windows;
if (config.fullSync) {
if (!incremental) {
windows = buildMonthlyDateWindows(config.orderStartDate);
} else {
windows = buildIncrementalOrderWindows();
windows = await buildIncrementalOrderWindows();
}
const allRecords = [];
@@ -420,12 +634,16 @@ async function syncOrders(page) {
await trySetPageSize(page, dataset.pageSize);
const records = await scrapePagedTable(page, dataset, window);
allRecords.push(...records);
if (hasDbConfig()) {
const normalizedWindowRecords = dedupeByHash(normalizeDatasetRecords(dataset, records, window));
await upsertOrders(normalizedWindowRecords);
}
}
return persistDataset(dataset, dedupeByHash(allRecords), {});
}
function buildIncrementalOrderWindows() {
async function buildIncrementalOrderWindows() {
const configuredStartDate = normalizeConfiguredDate(config.incrementalOrderStartDate);
if (configuredStartDate) {
const windows = buildMonthlyDateWindows(configuredStartDate);
@@ -433,11 +651,27 @@ function buildIncrementalOrderWindows() {
return windows;
}
const yesterday = new Date();
yesterday.setDate(yesterday.getDate() - 1);
const dateStr = formatDate(yesterday);
console.log(`[增量模式] 订单仅查询: ${dateStr}`);
return [{ windowStart: dateStr, windowEnd: dateStr, start: dateStr, end: dateStr }];
if (!hasDbConfig()) {
const yesterday = new Date();
yesterday.setDate(yesterday.getDate() - 1);
const dateStr = formatDate(yesterday);
console.log(`[增量模式] 未配置数据库,订单仅查询: ${dateStr}`);
return buildSingleDateWindow(dateStr, dateStr);
}
const latestOrderTime = await getLatestOrderTimeFromDb();
const runAt = new Date();
const parsed = parseDbDateTime(latestOrderTime);
if (!parsed) {
const dateStr = formatDate(runAt);
console.log(`[增量模式] 数据库无订单水位,订单仅查询当天: ${dateStr}`);
return buildSingleDateWindow(dateStr, dateStr);
}
const startDate = formatDate(subtractDays(parsed, config.orderIncrementalOverlapDays));
const endDate = formatDate(runAt);
console.log(`[增量模式] 订单窗口: ${startDate} ~ ${endDate} (db_last=${latestOrderTime}, overlap=${config.orderIncrementalOverlapDays}d)`);
return buildSingleDateWindow(startDate, endDate);
}
function normalizeConfiguredDate(value) {
@@ -460,18 +694,20 @@ function normalizeConfiguredDate(value) {
async function syncBills(page, options = {}) {
await runtimeCheckpoint('同步账单');
const dataset = datasets.bills;
const { resume = false } = options;
const { resume = false, incremental = false } = options;
let months;
let latestConsumptionDate = null;
if (config.fullSync) {
if (!incremental) {
months = buildMonthList(config.billStartMonth);
} else {
latestConsumptionDate = getLatestBillConsumptionDate();
const incrementalMonth = latestConsumptionDate?.slice(0, 7)
|| `${new Date().getFullYear()}-${String(new Date().getMonth() + 1).padStart(2, '0')}`;
months = [incrementalMonth];
console.log(`[增量模式] 账单仅查询: ${incrementalMonth}${latestConsumptionDate ? `, 数据库最新消费时间: ${latestConsumptionDate}` : ''}`);
latestConsumptionDate = await getLatestBillConsumptionDate();
const startDate = latestConsumptionDate ? latestConsumptionDate.slice(0, 10) : formatDate(new Date());
const endDate = formatDate(new Date());
const startMonth = startDate.slice(0, 7);
const endMonth = endDate.slice(0, 7);
months = buildMonthList(startMonth).filter((month) => month <= endMonth);
console.log(`[增量模式] 账单窗口: ${startDate} ~ ${endDate}${latestConsumptionDate ? `, 数据库最新消费时间: ${latestConsumptionDate}` : ''}`);
}
const resumeCheckpoint = resume ? loadLatestBillsCheckpoint() : null;
@@ -514,6 +750,9 @@ async function syncBills(page, options = {}) {
onPage: async ({ pageNum, pageRows }) => {
const normalizedPageRows = normalizeDatasetRecords(dataset, pageRows, { month });
monthNormalizedRecords.push(...normalizedPageRows);
if (hasDbConfig()) {
await upsertBills(normalizedPageRows);
}
let checkpointRecords = monthNormalizedRecords;
if (latestConsumptionDate) {
checkpointRecords = monthNormalizedRecords.filter((record) => isAfterLatestConsumptionDate(record, latestConsumptionDate));
@@ -538,6 +777,42 @@ async function syncBills(page, options = {}) {
return persistNormalizedDataset(dataset, dedupeByHash(allNormalizedRecords));
}
async function syncMessages(page, options = {}) {
await runtimeCheckpoint('同步消息');
const dataset = datasets.messages;
const { incremental = false } = options;
await page.goto(dataset.url, { waitUntil: 'domcontentloaded' });
await waitUntilReady(page, dataset.heading);
await trySetPageSize(page, dataset.pageSize);
let records = await scrapePagedTable(page, dataset, {}, {
onPage: hasDbConfig()
? async ({ pageRows }) => {
const normalizedPageRows = normalizeDatasetRecords(dataset, pageRows, {});
await upsertMessages(normalizedPageRows);
}
: undefined,
});
if (incremental && hasDbConfig()) {
try {
const latestMessageTime = await getLatestMessageTimeFromDb();
if (latestMessageTime) {
const latest = parseDbDateTime(latestMessageTime);
if (latest) {
const watermark = subtractDays(latest, config.messageIncrementalOverlapDays);
const before = records.length;
records = records.filter((record) => isAfterLatestMessageTime(record, watermark));
console.log(`[增量模式] 消息按时间过滤: ${before} -> ${records.length} (db_last=${latestMessageTime}, overlap=${config.messageIncrementalOverlapDays}d)`);
}
}
} catch (error) {
console.error('[增量模式] 查询数据库最新消息时间失败:', error.message);
}
}
return persistDataset(dataset, dedupeByHash(records), {});
}
async function saveBillsCheckpoint(dataset, month, pageNum, normalizedRecords) {
const normalized = dedupeByHash(normalizedRecords);
const checkpointName = `${month}-latest`;
@@ -571,16 +846,22 @@ async function moveBillsToResumeStart(page, resumeFromPage) {
return moved;
}
function getLatestBillConsumptionDate() {
const scriptPath = path.resolve(config.rootDir, config.dbSyncScript);
async function getLatestBillConsumptionDate() {
if (!hasDbConfig()) {
console.warn('[增量模式] 未配置数据库连接,无法读取账单水位,回退到当前日期');
return null;
}
try {
const output = execSync(`python "${scriptPath}" --latest-bill-consumption-time`, {
cwd: path.dirname(scriptPath),
encoding: 'utf-8',
timeout: 120000,
}).trim();
const latest = output.split(/\r?\n/).map((line) => line.trim()).filter(Boolean).at(-1) || '';
return /^\d{4}-\d{2}-\d{2}/.test(latest) ? latest.slice(0, 10) : null;
const latest = await getLatestBillConsumptionTimeFromDb();
if (!latest || !/^\d{4}-\d{2}-\d{2}/.test(latest)) {
return null;
}
const parsed = parseDbDateTime(latest);
if (!parsed) {
return latest.slice(0, 10);
}
return formatDate(subtractDays(parsed, config.billIncrementalOverlapDays));
} catch (error) {
console.error('[增量模式] 查询数据库最新账单消费时间失败:', error.message);
return null;
@@ -595,6 +876,18 @@ function isAfterLatestConsumptionDate(record, latestConsumptionDate) {
return consumeDate > latestConsumptionDate;
}
function isAfterLatestMessageTime(record, watermarkDate) {
const value = String(record['消息修改时间'] || record['修改时间'] || record.gmtModified || record['消息创建时间'] || record['创建时间'] || record.gmtCreated || '').trim();
if (!value) {
return true;
}
const parsed = parseDbDateTime(value);
if (!parsed) {
return true;
}
return parsed >= watermarkDate;
}
async function syncOrderDetails(page, cachedOrderIds) {
await runtimeCheckpoint('同步订单详情');
const dataset = datasets.orderDetails;
@@ -638,6 +931,10 @@ async function syncOrderDetails(page, cachedOrderIds) {
detail.orderId = orderId;
}
allDetails.push({ ...detail, __context: {} });
if (hasDbConfig()) {
const normalizedDetail = normalizeDatasetRecords(dataset, [{ ...detail, __context: {} }], {});
await upsertOrderDetails(normalizedDetail);
}
}
return persistDataset(dataset, dedupeByHash(allDetails), {});