diff --git a/aliyun-sync/COMMANDS.md b/aliyun-sync/COMMANDS.md
index 413bbd0..3aeb78e 100644
--- a/aliyun-sync/COMMANDS.md
+++ b/aliyun-sync/COMMANDS.md
@@ -1,25 +1,13 @@
-# 阿里云 APS 同步命令清单
+# 阿里云 APS Node 同步命令清单
-本文档整理项目中常用的爬取、同步、入库、调度、增量和运行时控制命令。
-
-## 1. 项目目录
-
-### 前端爬取项目
+## 1. 进入项目目录
 ```powershell
 cd D:\project\python\aliyun-sync\aliyun-aps-sync
 ```
-### 后端入库项目
-
-```powershell
-cd D:\project\python\aliyun-sync\aps-aliyun-sync
-```
-
 ## 2. 安装依赖
-在前端爬取项目目录执行:
-
 ```powershell
 npm install
 ```
@@ -32,234 +20,125 @@ npm run login
 作用:
-- 打开浏览器。
-- 手动完成阿里云 / RAM 登录。
-- 自动验证“我的客户”和“账单查询”页面。
-- 保存登录态到 `.browser/` 和 `.browser/storage-state.json`。
+- 打开浏览器
+- 手动完成阿里云 / RAM 登录
+- 自动验证我的客户和账单查询
+- 保存登录态
-## 4. 爬取 / 同步命令
-
-### 全量同步全部模块
+## 4. 全量同步
 ```powershell
 npm run sync
 ```
-默认同步:
+默认包含:
 - customers
 - customerDetails
 - orders
 - orderDetails
 - bills
+- messages
-### 仅爬取账单
+并在同步过程中直接写入数据库。
+
+## 5. 日增量同步
+
+```powershell
+npm run incremental
+```
+
+默认包含:
+
+- orders
+- orderDetails
+- bills
+- messages
+
+说明:
+
+- 不抓 customer
+- 按数据库 watermark + overlap 抓取
+
+## 6. 单独抓账单
 ```powershell
 npm run bills
 ```
-### 基于 checkpoint 断点续爬账单
+## 7. 单独抓消息
+
+```powershell
+npm run messages
+```
+
+## 8. 从最新 checkpoint 继续抓账单
 ```powershell
 npm run bills -- --resume
 ```
-作用:
-
-- 自动读取 `data/checkpoints/bills/` 下最新 checkpoint。
-- 从 checkpoint 记录的月份和页码之后继续抓取。
-
-### 启动定时同步
+## 9. 定时任务
 ```powershell
 npm run schedule
 ```
-默认 cron:
+默认读取:
 ```env
-ALIYUN_APS_CRON=0 6 * * *
+ALIYUN_APS_SCHEDULE_MODE=incremental
 ```
-表示每天早上 6 点执行。
-
-## 5. 增量同步
-
-### 默认增量同步
-
-在 `.env` 中设置:
-
-```env
-ALIYUN_APS_FULL_SYNC=false
-```
-
-然后执行:
-
-```powershell
-npm run sync
-```
-
-默认行为:
-
-- 订单只查昨天。
-- 订单详情跟随本次订单结果。
-- 账单按数据库最新消费时间增量。
-
-### 指定订单 / 订单详情增量起始日期
-
-临时命令方式:
-
-```powershell
-npm run sync -- --incremental-order-start-date=2026-01-01
-```
-
-`.env` 固定配置方式:
-
-```env
-ALIYUN_APS_FULL_SYNC=false
-ALIYUN_APS_INCREMENTAL_ORDER_START_DATE=2026-01-01
-```
-
-然后执行:
-
-```powershell
-npm run sync
-```
-
-行为:
-
-- 订单从指定日期开始补抓到今天。
-- 订单详情跟随这些订单抓取。
-
-### 指定账单起始月份
-
-在 `.env` 中设置:
-
-```env
-ALIYUN_APS_BILL_START_MONTH=2024-01
-```
-
-然后执行:
-
-```powershell
-npm run bills
-```
-
-说明:当前账单爬取支持按月份开始;如需按具体日期开始,需要新增账单日期过滤参数。
-
-## 6. 后端入库命令
-
-进入后端入库项目目录:
-
-```powershell
-cd D:\project\python\aliyun-sync\aps-aliyun-sync
-```
-
-### 全量入库
-
-```powershell
-python aps_db_sync.py
-```
-
-### 增量入库
-
-```powershell
-python aps_db_sync.py --incremental
-```
-
-### 指定同步对象入库
-
-```powershell
-python aps_db_sync.py --sync-target all
-python aps_db_sync.py --sync-target customer
-python aps_db_sync.py --sync-target order
-python aps_db_sync.py --sync-target orderdetails
-python aps_db_sync.py --sync-target bills
-```
-
-### 增量只同步账单入库
-
-```powershell
-python aps_db_sync.py --incremental --sync-target bills
-```
-
-### 直接将最新 bills checkpoint 入库
-
-```powershell
-python aps_db_sync.py --sync-target bills --from-checkpoint
-```
-
-### 查询数据库最新账单消费时间
-
-```powershell
-python aps_db_sync.py --latest-bill-consumption-time
-```
-
-## 7. 后端调度器命令
-
-进入后端入库项目目录:
-
-```powershell
-cd D:\project\python\aliyun-sync\aps-aliyun-sync
-```
-
-### 启动调度器
-
-```powershell
-python aps_scheduler.py
-```
-
-### 指定同步对象启动调度器
-
-```powershell
-python aps_scheduler.py --sync-target all
-python aps_scheduler.py --sync-target customer
-python aps_scheduler.py --sync-target order
-python aps_scheduler.py --sync-target orderdetails
-python aps_scheduler.py --sync-target bills
-```
-
-## 8. 常用 `.env` 示例
-
-文件位置:
-
-```powershell
-D:\project\python\aliyun-sync\aliyun-aps-sync\.env
-```
-
-### 全量模式
+## 10. 常用 `.env` 配置
 ```env
 ALIYUN_APS_BASE_URL=https://aps.aliyun.com
 ALIYUN_APS_HEADLESS=false
+ALIYUN_APS_BROWSER_CHANNEL=
+ALIYUN_APS_BROWSER_EXECUTABLE_PATH=
 ALIYUN_APS_TIMEZONE=Asia/Shanghai
 ALIYUN_APS_CRON=0 6 * * *
+ALIYUN_APS_SCHEDULE_MODE=incremental
+ALIYUN_APS_CLOSE_BROWSER=true
+
 ALIYUN_APS_FULL_SYNC=true
 ALIYUN_APS_ORDER_START_DATE=2024-01-01
 ALIYUN_APS_INCREMENTAL_ORDER_START_DATE=
 ALIYUN_APS_BILL_START_MONTH=2024-01
+
+ALIYUN_APS_ORDER_INCREMENTAL_OVERLAP_DAYS=2
+ALIYUN_APS_BILL_INCREMENTAL_OVERLAP_DAYS=7
+ALIYUN_APS_MESSAGE_INCREMENTAL_OVERLAP_DAYS=7
+
+ALIYUN_APS_DB_HOST=
+ALIYUN_APS_DB_PORT=3306
+ALIYUN_APS_DB_USER=
+ALIYUN_APS_DB_PASSWORD=
+ALIYUN_APS_DB_NAME=
+ALIYUN_APS_DB_CHARSET=utf8mb4
+ALIYUN_APS_DB_CONNECTION_LIMIT=5
+
+ALIYUN_APS_SMTP_HOST=
+ALIYUN_APS_SMTP_PORT=465
+ALIYUN_APS_SMTP_SECURE=true
+ALIYUN_APS_SMTP_USER=
+ALIYUN_APS_SMTP_PASS=
+ALIYUN_APS_NOTIFY_EMAIL=
+
 ALIYUN_APS_CLOSE_BROWSER=true
-ALIYUN_APS_DB_SYNC_SCRIPT=../aps-aliyun-sync/aps_db_sync.py
 ```
-### 增量模式:默认只查昨天订单
+浏览器选择规则:
-```env
-ALIYUN_APS_FULL_SYNC=false
-ALIYUN_APS_INCREMENTAL_ORDER_START_DATE=
-```
+- 两项都留空:使用 Playwright 自带 Chromium
+- `ALIYUN_APS_BROWSER_CHANNEL=chrome`:使用 Chrome
+- `ALIYUN_APS_BROWSER_CHANNEL=msedge`:使用 Edge
+- `ALIYUN_APS_BROWSER_EXECUTABLE_PATH=...`:使用指定浏览器路径
-### 增量模式:指定订单起始日期
+## 11. 推荐执行顺序
-```env
-ALIYUN_APS_FULL_SYNC=false
-ALIYUN_APS_INCREMENTAL_ORDER_START_DATE=2026-01-01
-```
-
-## 9. 常用组合
-
-### 首次使用
+### 首次初始化
 ```powershell
 cd D:\project\python\aliyun-sync\aliyun-aps-sync
@@ -268,49 +147,44 @@ npm run login
 npm run sync
 ```
-### 只抓账单
+### 日常增量
+
+```powershell
+cd D:\project\python\aliyun-sync\aliyun-aps-sync
+npm run incremental
+```
+
+### 单独同步消息
+
+```powershell
+cd D:\project\python\aliyun-sync\aliyun-aps-sync
+npm run messages
+```
+
+### 账单长任务恢复
 ```powershell
 cd D:\project\python\aliyun-sync\aliyun-aps-sync
 npm run login
-npm run bills
+npm run bills -- --resume
 ```
-### 订单 / 订单详情从指定日期补抓
+### 常驻定时任务
 ```powershell
 cd D:\project\python\aliyun-sync\aliyun-aps-sync
-npm run login
-npm run sync -- --incremental-order-start-date=2026-01-01
+npm run schedule
 ```
-### 抓完后只同步账单入库
+## 12. 错误文件
-```powershell
-cd D:\project\python\aliyun-sync\aps-aliyun-sync
-python aps_db_sync.py --sync-target bills
+运行异常时会保存:
+
+```text
+data/errors//
 ```
-### 抓完后增量同步账单入库
-
-```powershell
-cd D:\project\python\aliyun-sync\aps-aliyun-sync
-python aps_db_sync.py --incremental --sync-target bills
-```
-
-## 10. 清理登录态
-
-如果登录态异常,可以删除 `.browser` 后重新登录:
-
-```powershell
-cd D:\project\python\aliyun-sync\aliyun-aps-sync
-Remove-Item -Recurse -Force .browser
-npm run login
-```
-
-## 11. 运行时热键
-
-脚本运行时可在当前终端中使用:
+## 13. 运行时热键
 | 按键 | 功能 |
 | --- | --- |
@@ -318,4 +192,13 @@ npm run login
 | F8 | 继续 |
 | F9 | 终止 |
-注意:这是当前终端进程内热键,不是系统级全局热键。
+## 14. 本地数据目录
+
+```text
+data/current/
+data/history/
+data/delta/
+data/checkpoints/
+data/runs/
+data/errors/
+```
diff --git a/aliyun-sync/aliyun-aps-sync/.env.example b/aliyun-sync/aliyun-aps-sync/.env.example
index 86e8c54..dccfd35 100644
--- a/aliyun-sync/aliyun-aps-sync/.env.example
+++ b/aliyun-sync/aliyun-aps-sync/.env.example
@@ -1,7 +1,28 @@
 ALIYUN_APS_BASE_URL=https://aps.aliyun.com
 ALIYUN_APS_HEADLESS=false
+ALIYUN_APS_BROWSER_CHANNEL=
+ALIYUN_APS_BROWSER_EXECUTABLE_PATH=
 ALIYUN_APS_TIMEZONE=Asia/Shanghai
 ALIYUN_APS_CRON=0 6 * * *
+ALIYUN_APS_SCHEDULE_MODE=incremental
+ALIYUN_APS_CLOSE_BROWSER=true
+ALIYUN_APS_FULL_SYNC=true
 ALIYUN_APS_ORDER_START_DATE=2024-01-01
 ALIYUN_APS_INCREMENTAL_ORDER_START_DATE=
 ALIYUN_APS_BILL_START_MONTH=2024-01
+ALIYUN_APS_ORDER_INCREMENTAL_OVERLAP_DAYS=2
+ALIYUN_APS_BILL_INCREMENTAL_OVERLAP_DAYS=7
+ALIYUN_APS_MESSAGE_INCREMENTAL_OVERLAP_DAYS=7
+ALIYUN_APS_DB_HOST=
+ALIYUN_APS_DB_PORT=3306
+ALIYUN_APS_DB_USER=
+ALIYUN_APS_DB_PASSWORD=
+ALIYUN_APS_DB_NAME=
+ALIYUN_APS_DB_CHARSET=utf8mb4
+ALIYUN_APS_DB_CONNECTION_LIMIT=5
+ALIYUN_APS_SMTP_HOST=smtp.163.com
+ALIYUN_APS_SMTP_PORT=465
+ALIYUN_APS_SMTP_SECURE=true
+ALIYUN_APS_SMTP_USER=
+ALIYUN_APS_SMTP_PASS=
+ALIYUN_APS_NOTIFY_EMAIL=
diff --git a/aliyun-sync/aliyun-aps-sync/README.md b/aliyun-sync/aliyun-aps-sync/README.md
index 52596cf..d7606cb 100644
--- a/aliyun-sync/aliyun-aps-sync/README.md
+++ b/aliyun-sync/aliyun-aps-sync/README.md
@@ -1,82 +1,223 @@
 # aliyun-aps-sync
-用于抓取阿里云伙伴中心里的 `我的客户`、`订单查询`、`账单查询`,并把结果和本地上一次同步结果做增量对比。
+Node 版阿里云 APS 同步工具。
-## 功能
+当前主流程已经统一为:
-- 首次 `login` 用浏览器手动登录,后续复用本地登录态。
-- `sync` 会同步 3 个模块的数据。
-- 同步后会生成:
-  - `data/current/*.json` 当前全量
-  - `data/history//*.json` 每次快照
-  - `data/delta//*.json` 增量变化
-  - `data/runs/*.json` 每次任务汇总
-- `schedule` 支持常驻进程方式按 cron 表达式每天自动同步。
+- Playwright 抓取 APS 页面
+- 本地保留 `current/history/delta/checkpoints/errors` 数据
+- 同步过程中直接写入 MySQL
+- 定时任务默认执行日增量
-## 安装
+## 同步范围
-```bash
-cd /Users/qiangredhad/aliyun-aps-sync
-npm install
-cp .env.example .env
-```
+- customers
+- customerDetails
+- orders
+- orderDetails
+- bills
+- messages
-## 配置
+## 模式说明
-`.env` 里最重要的时间范围:
+### Full 模式
-- `ALIYUN_APS_ORDER_START_DATE`: 订单查询的起始日期,会按月滚动抓取直到今天。
-- `ALIYUN_APS_INCREMENTAL_ORDER_START_DATE`: 订单/订单详情在增量模式下的指定起始日期;留空时仍默认只查昨天。
-- `ALIYUN_APS_BILL_START_MONTH`: 账单查询的起始佣金月份,会按月滚动抓取直到当前月。
-
-## 使用
-
-1. 首次登录并保存会话
-
-```bash
-npm run login
-```
-
-2. 手动执行一次同步
+执行:
 ```bash
 npm run sync
 ```
-仅抓账单:
+行为:
+
+- 抓全量 customer + customerDetails
+- 抓 orders / orderDetails / bills / messages
+- 同步过程中直接写数据库
+
+### Incremental 模式
+
+执行:
+
+```bash
+npm run incremental
+```
+
+行为:
+
+- 不抓 customer
+- 抓 orders / orderDetails / bills / messages
+- 以数据库 watermark + overlap 为增量窗口
+
+## 登录
+
+```bash
+npm run login
+```
+
+会自动验证:
+
+- 我的客户
+- 账单查询
+
+并保存登录态到:
+
+- `.browser/`
+- `.browser/storage-state.json`
+
+## 账单
+
+### 单独抓账单
 ```bash
 npm run bills
 ```
-如果要从最新账单 checkpoint 继续:
+### 从最新 checkpoint 继续抓账单
 ```bash
 npm run bills -- --resume
 ```
-如果需要在增量模式下让订单和订单详情从指定日期开始补抓,可以配置:
+## 消息
+
+单独抓消息:
 ```bash
-ALIYUN_APS_INCREMENTAL_ORDER_START_DATE=2026-01-01
+npm run messages
 ```
-或临时执行:
-
-```bash
-npm run sync -- --incremental-order-start-date=2026-01-01
-```
-
-3. 常驻定时同步
+## 定时任务
 ```bash
 npm run schedule
 ```
-默认 cron 是每天早上 6 点,可在 `.env` 里改 `ALIYUN_APS_CRON`。
+默认按 `.env` 中:
-## 注意
+```env
+ALIYUN_APS_SCHEDULE_MODE=incremental
+```
-- 脚本现在基于页面表格 DOM 抓取,如果阿里云伙伴中心页面结构改版,需要调整 `src/sync.js` 里的表格和筛选器选择逻辑。
-- 订单和账单的日期输入框是通过页面已有日期值自动识别的,所以首次跑之前建议先在页面确认默认筛选存在。
-- 如果登录态过期,重新执行 `npm run login` 即可。
+执行日增量。
+
+## 增量窗口
+
+### orders
+
+由数据库中 `aps_order.order_time` 最大值决定,回退:
+
+```env
+ALIYUN_APS_ORDER_INCREMENTAL_OVERLAP_DAYS=2
+```
+
+### bills
+
+由数据库中 `aps_bill.consumption_time` 最大值决定,回退:
+
+```env
+ALIYUN_APS_BILL_INCREMENTAL_OVERLAP_DAYS=7
+```
+
+### messages
+
+由数据库中 `aliyun_aps_messages.gmt_modified/gmt_created` 最大值决定,回退:
+
+```env
+ALIYUN_APS_MESSAGE_INCREMENTAL_OVERLAP_DAYS=7
+```
+
+## 数据库配置
+
+`.env` 需要配置:
+
+```env
+ALIYUN_APS_DB_HOST=
+ALIYUN_APS_DB_PORT=3306
+ALIYUN_APS_DB_USER=
+ALIYUN_APS_DB_PASSWORD=
+ALIYUN_APS_DB_NAME=
+ALIYUN_APS_DB_CHARSET=utf8mb4
+ALIYUN_APS_DB_CONNECTION_LIMIT=5
+```
+
+## 浏览器配置
+
+默认不再强制使用 Google Chrome。
+
+可选配置:
+
+```env
+ALIYUN_APS_BROWSER_CHANNEL=
+ALIYUN_APS_BROWSER_EXECUTABLE_PATH=
+```
+
+说明:
+
+- 两项都留空:使用 Playwright 自带 Chromium。
+- `ALIYUN_APS_BROWSER_CHANNEL=chrome`:使用本机 Chrome。
+- `ALIYUN_APS_BROWSER_CHANNEL=msedge`:使用本机 Edge。
+- `ALIYUN_APS_BROWSER_EXECUTABLE_PATH=...`:指定本地浏览器可执行文件路径。
+
+## 邮件告警
+
+任意运行异常会尝试:
+
+- 保存错误上下文 JSON
+- 截图当前页面
+- 发送告警邮件
+
+`.env` 配置:
+
+```env
+ALIYUN_APS_SMTP_HOST=
+ALIYUN_APS_SMTP_PORT=465
+ALIYUN_APS_SMTP_SECURE=true
+ALIYUN_APS_SMTP_USER=
+ALIYUN_APS_SMTP_PASS=
+ALIYUN_APS_NOTIFY_EMAIL=
+```
+
+错误文件目录:
+
+```text
+data/errors//
+```
+
+## 本地数据目录
+
+```text
+data/current/
+data/history/
+data/delta/
+data/checkpoints/
+data/runs/
+data/errors/
+```
+
+## customer 状态规则
+
+- full 抓到 customer 时,默认写为 `active=1`
+- messages 中如果明确识别到“释放”,则标记:
+  - `active=0`
+  - `status='released'`
+- messages 中如果明确识别到“关联/报备成功/新增客户/绑定客户”,则恢复:
+  - `active=1`
+  - `status='active'`
+
+## 安装
+
+```bash
+npm install
+```
+
+## 运行时热键
+
+- `F7` 暂停
+- `F8` 继续
+- `F9` 终止
+
+## 说明
+
+- Python 入库脚本已不再是主流程依赖。
+- bills 仍保留 checkpoint/resume 能力。
+- messages 当前先按列表分页抓取,如后续页面需要详情抓取,再补 detail flow。
diff --git a/aliyun-sync/aliyun-aps-sync/package-lock.json b/aliyun-sync/aliyun-aps-sync/package-lock.json
index f02444a..9c831cf 100644
--- a/aliyun-sync/aliyun-aps-sync/package-lock.json
+++ b/aliyun-sync/aliyun-aps-sync/package-lock.json
@@ -9,11 +9,40 @@
       "version": "1.0.0",
       "dependencies": {
         "dotenv": "^16.6.1",
+        "mysql2": "^3.15.2",
         "node-cron": "^4.2.1",
         "nodemailer": "^6.10.1",
         "playwright": "^1.58.2"
       }
     },
+    "node_modules/@types/node": {
+      "version": "25.6.0",
+      "resolved": "https://registry.npmmirror.com/@types/node/-/node-25.6.0.tgz",
+      "integrity": "sha512-+qIYRKdNYJwY3vRCZMdJbPLJAtGjQBudzZzdzwQYkEPQd+PJGixUL5QfvCLDaULoLv+RhT3LDkwEfKaAkgSmNQ==",
+      "license": "MIT",
+      "peer": true,
+      "dependencies": {
+        "undici-types": "~7.19.0"
+      }
+    },
+    "node_modules/aws-ssl-profiles": {
+      "version": "1.1.2",
+      "resolved": "https://registry.npmmirror.com/aws-ssl-profiles/-/aws-ssl-profiles-1.1.2.tgz",
+      "integrity": "sha512-NZKeq9AfyQvEeNlN0zSYAaWrmBffJh3IELMZfRpJVWgrpEbtEpnjvzqBPf+mxoI287JohRDoa+/nsfqqiZmF6g==",
+      "license": "MIT",
+      "engines": {
+        "node": ">= 6.0.0"
+      }
+    },
+    "node_modules/denque": {
+      "version": "2.1.0",
+      "resolved": "https://registry.npmmirror.com/denque/-/denque-2.1.0.tgz",
+      "integrity": "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==",
+      "license": "Apache-2.0",
+      "engines": {
+        "node": ">=0.10"
+      }
+    },
     "node_modules/dotenv": {
       "version": "16.6.1",
       "resolved": "https://registry.npmmirror.com/dotenv/-/dotenv-16.6.1.tgz",
@@ -40,6 +69,92 @@
         "node": "^8.16.0 || ^10.6.0 || >=11.0.0"
       }
     },
+    "node_modules/generate-function": {
+      "version": "2.3.1",
+      "resolved": "https://registry.npmmirror.com/generate-function/-/generate-function-2.3.1.tgz",
+      "integrity": "sha512-eeB5GfMNeevm/GRYq20ShmsaGcmI81kIX2K9XQx5miC8KdHaC6Jm0qQ8ZNeGOi7wYB8OsdxKs+Y2oVuTFuVwKQ==",
+      "license": "MIT",
+      "dependencies": {
+        "is-property": "^1.0.2"
+      }
+    },
+    "node_modules/iconv-lite": {
+      "version": "0.7.2",
+      "resolved": "https://registry.npmmirror.com/iconv-lite/-/iconv-lite-0.7.2.tgz",
+      "integrity": "sha512-im9DjEDQ55s9fL4EYzOAv0yMqmMBSZp6G0VvFyTMPKWxiSBHUj9NW/qqLmXUwXrrM7AvqSlTCfvqRb0cM8yYqw==",
+      "license": "MIT",
+      "dependencies": {
+        "safer-buffer": ">= 2.1.2 < 3.0.0"
+      },
+      "engines": {
+        "node": ">=0.10.0"
+      },
+      "funding": {
+        "type": "opencollective",
+        "url": "https://opencollective.com/express"
+      }
+    },
+    "node_modules/is-property": {
+      "version": "1.0.2",
+      "resolved": "https://registry.npmmirror.com/is-property/-/is-property-1.0.2.tgz",
+      "integrity": "sha512-Ks/IoX00TtClbGQr4TWXemAnktAQvYB7HzcCxDGqEZU6oCmb2INHuOoKxbtR+HFkmYWBKv/dOZtGRiAjDhj92g==",
+      "license": "MIT"
+    },
+    "node_modules/long": {
+      "version": "5.3.2",
+      "resolved": "https://registry.npmmirror.com/long/-/long-5.3.2.tgz",
+      "integrity": "sha512-mNAgZ1GmyNhD7AuqnTG3/VQ26o760+ZYBPKjPvugO8+nLbYfX6TVpJPseBvopbdY+qpZ/lKUnmEc1LeZYS3QAA==",
+      "license": "Apache-2.0"
+    },
+    "node_modules/lru.min": {
+      "version": "1.1.4",
+      "resolved": "https://registry.npmmirror.com/lru.min/-/lru.min-1.1.4.tgz",
+      "integrity": "sha512-DqC6n3QQ77zdFpCMASA1a3Jlb64Hv2N2DciFGkO/4L9+q/IpIAuRlKOvCXabtRW6cQf8usbmM6BE/TOPysCdIA==",
+      "license": "MIT",
+      "engines": {
+        "bun": ">=1.0.0",
+        "deno": ">=1.30.0",
+        "node": ">=8.0.0"
+      },
+      "funding": {
+        "type": "github",
+        "url": "https://github.com/sponsors/wellwelwel"
+      }
+    },
+    "node_modules/mysql2": {
+      "version": "3.22.3",
+      "resolved": "https://registry.npmmirror.com/mysql2/-/mysql2-3.22.3.tgz",
+      "integrity": "sha512-uWWxvZSRvRhtBdh2CdcuK83YcOfPdmEeEYB069bAmPnV93QApDGVPuvCQOLjlh7tYHEWdgQPrn6kosDxHBVLkA==",
+      "license": "MIT",
+      "dependencies": {
+        "aws-ssl-profiles": "^1.1.2",
+        "denque": "^2.1.0",
+        "generate-function": "^2.3.1",
+        "iconv-lite": "^0.7.2",
+        "long": "^5.3.2",
+        "lru.min": "^1.1.4",
+        "named-placeholders": "^1.1.6",
+        "sql-escaper": "^1.3.3"
+      },
+      "engines": {
+        "node": ">= 8.0"
+      },
+      "peerDependencies": {
+        "@types/node": ">= 8"
+      }
+    },
+    "node_modules/named-placeholders": {
+      "version": "1.1.6",
+      "resolved": "https://registry.npmmirror.com/named-placeholders/-/named-placeholders-1.1.6.tgz",
+      "integrity": "sha512-Tz09sEL2EEuv5fFowm419c1+a/jSMiBjI9gHxVLrVdbUkkNUUfjsVYs9pVZu5oCon/kmRh9TfLEObFtkVxmY0w==",
+      "license": "MIT",
+      "dependencies": {
+        "lru.min": "^1.1.0"
+      },
+      "engines": {
+        "node": ">=8.0.0"
+      }
+    },
     "node_modules/node-cron": {
       "version": "4.2.1",
       "resolved": "https://registry.npmmirror.com/node-cron/-/node-cron-4.2.1.tgz",
@@ -87,6 +202,34 @@
       "engines": {
         "node": ">=18"
       }
+    },
+    "node_modules/safer-buffer": {
+      "version": "2.1.2",
+      "resolved": "https://registry.npmmirror.com/safer-buffer/-/safer-buffer-2.1.2.tgz",
+      "integrity": "sha512-YZo3K82SD7Riyi0E1EQPojLz7kpepnSQI9IyPbHHg1XXXevb5dJI7tpyN2ADxGcQbHG7vcyRHk0cbwqcQriUtg==",
+      "license": "MIT"
+    },
+    "node_modules/sql-escaper": {
+      "version": "1.3.3",
+      "resolved": "https://registry.npmmirror.com/sql-escaper/-/sql-escaper-1.3.3.tgz",
+      "integrity": "sha512-BsTCV265VpTp8tm1wyIm1xqQCS+Q9NHx2Sr+WcnUrgLrQ6yiDIvHYJV5gHxsj1lMBy2zm5twLaZao8Jd+S8JJw==",
+      "license": "MIT",
+      "engines": {
+        "bun": ">=1.0.0",
+        "deno": ">=2.0.0",
+        "node": ">=12.0.0"
+      },
+      "funding": {
+        "type": "github",
+        "url": "https://github.com/mysqljs/sql-escaper?sponsor=1"
+      }
+    },
+    "node_modules/undici-types": {
+      "version": "7.19.2",
+      "resolved": "https://registry.npmmirror.com/undici-types/-/undici-types-7.19.2.tgz",
+      "integrity": "sha512-qYVnV5OEm2AW8cJMCpdV20CDyaN3g0AjDlOGf1OW4iaDEx8MwdtChUp4zu4H0VP3nDRF/8RKWH+IPp9uW0YGZg==",
+      "license": "MIT",
+      "peer": true
     }
   }
 }
diff --git a/aliyun-sync/aliyun-aps-sync/package.json b/aliyun-sync/aliyun-aps-sync/package.json
index 3b3f5a8..92b7a9c 100644
--- a/aliyun-sync/aliyun-aps-sync/package.json
+++ b/aliyun-sync/aliyun-aps-sync/package.json
@@ -6,11 +6,14 @@
   "scripts": {
     "login": "node src/index.js login",
     "sync": "node src/index.js sync",
+    "incremental": "node src/index.js incremental",
     "bills": "node src/index.js bills",
+    "messages": "node src/index.js messages",
     "schedule": "node src/index.js schedule"
   },
   "dependencies": {
     "dotenv": "^16.6.1",
+    "mysql2": "^3.15.2",
     "nodemailer": "^6.10.1",
     "node-cron": "^4.2.1",
     "playwright": "^1.58.2"
diff --git a/aliyun-sync/aliyun-aps-sync/src/config.js b/aliyun-sync/aliyun-aps-sync/src/config.js
index 8fc22af..5a2049d 100644
--- a/aliyun-sync/aliyun-aps-sync/src/config.js
+++ b/aliyun-sync/aliyun-aps-sync/src/config.js
@@ -20,11 +20,17 @@ export const config = {
   rootDir,
   baseUrl: process.env.ALIYUN_APS_BASE_URL || 'https://aps.aliyun.com',
   headless: toBool(process.env.ALIYUN_APS_HEADLESS, false),
+  browserChannel: (process.env.ALIYUN_APS_BROWSER_CHANNEL || '').trim(),
+  browserExecutablePath: (process.env.ALIYUN_APS_BROWSER_EXECUTABLE_PATH || '').trim(),
   timezone: process.env.ALIYUN_APS_TIMEZONE || 'Asia/Shanghai',
   cron: process.env.ALIYUN_APS_CRON || '0 6 * * *',
   orderStartDate: process.env.ALIYUN_APS_ORDER_START_DATE || '2024-01-01',
   incrementalOrderStartDate: process.env.ALIYUN_APS_INCREMENTAL_ORDER_START_DATE || '',
   billStartMonth: process.env.ALIYUN_APS_BILL_START_MONTH || '2024-01',
+  orderIncrementalOverlapDays: Math.max(0, Number.parseInt(process.env.ALIYUN_APS_ORDER_INCREMENTAL_OVERLAP_DAYS || '2', 10) || 2),
+  billIncrementalOverlapDays: Math.max(0, Number.parseInt(process.env.ALIYUN_APS_BILL_INCREMENTAL_OVERLAP_DAYS || '7', 10) || 7),
+  messageIncrementalOverlapDays: Math.max(0, Number.parseInt(process.env.ALIYUN_APS_MESSAGE_INCREMENTAL_OVERLAP_DAYS || '7', 10) || 7),
+  scheduleMode: process.env.ALIYUN_APS_SCHEDULE_MODE || 'incremental',
   smtp: {
     host: process.env.ALIYUN_APS_SMTP_HOST || 'smtp.qq.com',
     port: parseInt(process.env.ALIYUN_APS_SMTP_PORT || '465', 10),
@@ -37,11 +43,20 @@ export const config = {
   fullSync: toBool(process.env.ALIYUN_APS_FULL_SYNC, true),
   resumeBillMonth: process.env.ALIYUN_APS_RESUME_BILL_MONTH || '',
   resumeBillPage: Math.max(1, Number.parseInt(process.env.ALIYUN_APS_RESUME_BILL_PAGE || '1', 10) || 1),
-  dbSyncScript: process.env.ALIYUN_APS_DB_SYNC_SCRIPT || '../aps-aliyun-sync/aps_db_sync.py',
   userDataDir: ensureDir(path.join(rootDir, '.browser')),
   storageStateFile: path.join(rootDir, '.browser', 'storage-state.json'),
   dataDir: ensureDir(path.join(rootDir, 'data')),
   downloadDir: ensureDir(path.join(rootDir, 'downloads')),
+  errorDir: ensureDir(path.join(rootDir, 'data', 'errors')),
+  db: {
+    host: process.env.ALIYUN_APS_DB_HOST || '',
+    port: parseInt(process.env.ALIYUN_APS_DB_PORT || '3306', 10),
+    user: process.env.ALIYUN_APS_DB_USER || '',
+    password: process.env.ALIYUN_APS_DB_PASSWORD || '',
+    database: process.env.ALIYUN_APS_DB_NAME || '',
+    charset: process.env.ALIYUN_APS_DB_CHARSET || 'utf8mb4',
+    connectionLimit: Math.max(1, Number.parseInt(process.env.ALIYUN_APS_DB_CONNECTION_LIMIT || '5', 10) || 5),
+  },
 };
 export const datasets = {
@@ -180,6 +195,34 @@ export const datasets = {
       serviceEndAt: record['服务结束时间'] || '',
     }),
   },
+  messages: {
+    name: 'messages',
+    url: `${config.baseUrl}/#/message`,
+    heading: '消息',
+    pageSize: 20,
+    uniqueKey: (record) => record.msgId || record.__hash,
+    normalize: (record) => ({
+      msgId: pickFirst(record, ['消息ID', 'msg_id', '消息id', 'ID', 'id']),
+      title: pickFirst(record, ['消息标题', '标题', 'title']),
+      content: pickFirst(record, ['消息内容', '内容', 'content']),
+      msgType: pickFirst(record, ['消息类型', 'type', 'msg_type']),
+      fromApp: pickFirst(record, ['来源应用', 'from_app', '应用']),
+      bizCode: pickFirst(record, ['业务编码', 'biz_code']),
+      msgChannel: pickFirst(record, ['消息通道', 'msg_channel']),
+      categoryId: pickFirst(record, ['分类ID', 'category_id']),
+      categoryName: pickFirst(record, ['分类名称', 'category_name']),
+      lv1CategoryId: pickFirst(record, ['一级分类ID', 'lv1_category_id']),
+      lv2CategoryId: pickFirst(record, ['二级分类ID', 'lv2_category_id']),
+      lv3CategoryId: pickFirst(record, ['三级分类ID', 'lv3_category_id']),
+      messageClassification: pickFirst(record, ['归类结果', 'message_classification']),
+      customerName: pickFirst(record, ['客户名称', 'customer_name']),
+      orderNo: pickFirst(record, ['订单号', 'order_no']),
+      status: pickFirst(record, ['消息状态', '状态', 'status']),
+      gmtCreated: pickFirst(record, ['消息创建时间', '创建时间', 'gmt_created']),
+      gmtModified: pickFirst(record, ['消息修改时间', '修改时间', 'gmt_modified']),
+      extraData: record,
+    }),
+  },
 };
 function splitLines(value) {
@@ -188,3 +231,13 @@ function splitLines(value) {
     .map((part) => part.trim())
     .filter(Boolean);
 }
+
+function pickFirst(record, keys) {
+  for (const key of keys) {
+    const value = record[key];
+    if (value != null && String(value).trim()) {
+      return String(value).trim();
+    }
+  }
+  return '';
+}
diff --git a/aliyun-sync/aliyun-aps-sync/src/db.js b/aliyun-sync/aliyun-aps-sync/src/db.js
new file mode 100644
index 0000000..cad83bc
--- /dev/null
+++ b/aliyun-sync/aliyun-aps-sync/src/db.js
@@ -0,0 +1,596 @@
+import mysql from 'mysql2/promise';
+import { config } from './config.js';
+
+let pool = null;
+
+const MESSAGE_TABLE_DDL = `
+CREATE TABLE IF NOT EXISTS aliyun_aps_messages (
+  id bigint NOT NULL AUTO_INCREMENT,
+  msg_id varchar(128) NULL DEFAULT NULL COMMENT '消息原始ID',
+  title text NULL COMMENT '消息标题',
+  content text NULL COMMENT '消息内容',
+  msg_type varchar(64) NULL DEFAULT NULL COMMENT '消息类型',
+  from_app varchar(128) NULL DEFAULT NULL COMMENT '来源应用',
+  biz_code varchar(128) NULL DEFAULT NULL COMMENT '业务编码',
+  msg_channel varchar(64) NULL DEFAULT NULL COMMENT '消息通道',
+  category_id varchar(64) NULL DEFAULT NULL COMMENT '分类ID',
+  category_name varchar(128) NULL DEFAULT NULL COMMENT '分类名称',
+  lv1_category_id varchar(64) NULL DEFAULT NULL COMMENT '一级分类ID',
+  lv2_category_id varchar(64) NULL DEFAULT NULL COMMENT '二级分类ID',
+  lv3_category_id varchar(64) NULL DEFAULT NULL COMMENT '三级分类ID',
+  message_classification varchar(255) NULL DEFAULT NULL COMMENT '归类结果',
+  customer_name varchar(255) NULL DEFAULT NULL COMMENT '客户名称',
+  order_no varchar(128) NULL DEFAULT NULL COMMENT '订单号',
+  status varchar(32) NULL DEFAULT NULL COMMENT '消息状态(已读/未读)',
+  gmt_created varchar(64) NULL DEFAULT NULL COMMENT '消息创建时间',
+  gmt_modified varchar(64) NULL DEFAULT NULL COMMENT '消息修改时间',
+  extra_data json NULL COMMENT '其他字段(原始JSON)',
+  crawl_time datetime NULL DEFAULT CURRENT_TIMESTAMP COMMENT '爬取时间',
+  PRIMARY KEY (id),
+  UNIQUE KEY uk_msg_id (msg_id)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='阿里云APS站内消息'
+`;
+
+let customerLifecycleEnsured = false;
+
+function hasDbConfig() {
+  return Boolean(config.db.host && config.db.user && config.db.database);
+}
+
+function getPool() {
+  if (!hasDbConfig()) {
+    throw new Error('未配置数据库连接信息,请设置 ALIYUN_APS_DB_HOST / USER / PASSWORD / NAME');
+  }
+  if (pool) {
+    return pool;
+  }
+  pool = mysql.createPool({
+    host: config.db.host,
+    port: config.db.port,
+    user: config.db.user,
+    password: config.db.password,
+    database: config.db.database,
+    charset: config.db.charset,
+    connectionLimit: config.db.connectionLimit,
+    waitForConnections: true,
+  });
+  return pool;
+}
+
+function normalizeMonth(value) {
+  const normalized = String(value || '').trim();
+  if (!normalized) {
+    return null;
+  }
+  if (/^\d{4}-\d{2}$/.test(normalized)) {
+    return normalized;
+  }
+  if (/^\d{6}$/.test(normalized)) {
+    return `${normalized.slice(0, 4)}-${normalized.slice(4, 6)}`;
+  }
+  return null;
+}
+
+function safeString(value) {
+  if (value == null) {
+    return null;
+  }
+  const normalized = String(value).trim();
+  return normalized ? normalized : null;
+}
+
+function safeNumber(value) {
+  if (value == null || value === '') {
+    return null;
+  }
+  const normalized = Number.parseFloat(String(value).replace(/,/g, '').trim());
+  return Number.isFinite(normalized) ? normalized : null;
+}
+
+async function getCustomerMap() {
+  const [rows] = await getPool().query('SELECT account_id, login_name FROM aps_customer');
+  const map = new Map();
+  for (const row of rows) {
+    const loginName = safeString(row.login_name);
+    const accountId = safeString(row.account_id);
+    if (!loginName || !accountId) {
+      continue;
+    }
+    map.set(loginName, accountId);
+    map.set(loginName.replace(/\s+/g, ''), accountId);
+  }
+  return map;
+}
+
+function resolveCustomerAccountId(customerMap, customerAccount) {
+  const normalized = safeString(customerAccount);
+  if (!normalized) {
+    return null;
+  }
+  return customerMap.get(normalized) || customerMap.get(normalized.replace(/\s+/g, '')) || null;
+}
+
+async function queryLatestValue(sql) {
+  const [rows] = await getPool().query(sql);
+  const row = Array.isArray(rows) ? rows[0] : null;
+  if (!row) {
+    return null;
+  }
+  const latest = row.latest_time;
+  if (!latest) {
+    return null;
+  }
+  if (latest instanceof Date) {
+    return latest.toISOString().slice(0, 19).replace('T', ' ');
+  }
+  return String(latest);
+}
+
+export async function getLatestOrderTimeFromDb() {
+  return queryLatestValue('SELECT MAX(order_time) AS latest_time FROM aps_order');
+}
+
+export async function getLatestBillConsumptionTimeFromDb() {
+  return queryLatestValue('SELECT MAX(consumption_time) AS latest_time FROM aps_bill');
+}
+
+export async function getLatestMessageTimeFromDb() {
+  return queryLatestValue("SELECT MAX(COALESCE(NULLIF(gmt_modified, ''), NULLIF(gmt_created, ''))) AS latest_time FROM aliyun_aps_messages");
+}
+
+export async function closeDbPool() {
+  if (!pool) {
+    return;
+  }
+  await pool.end();
+  pool = null;
+}
+
+export async function ensureMessagesTable() {
+  await getPool().query(MESSAGE_TABLE_DDL);
+}
+
+export async function ensureCustomerLifecycleColumns() {
+  if (customerLifecycleEnsured) {
+    return;
+  }
+  await getPool().query("ALTER TABLE aps_customer ADD COLUMN IF NOT EXISTS active TINYINT(1) NOT NULL DEFAULT 1 COMMENT '是否有效 1=有效 0=释放'");
+  await getPool().query("ALTER TABLE aps_customer ADD COLUMN IF NOT EXISTS status VARCHAR(32) DEFAULT 'active' COMMENT '客户状态'");
+  await getPool().query("ALTER TABLE aps_customer ADD COLUMN IF NOT EXISTS released_at DATETIME NULL COMMENT '释放时间'");
+  await getPool().query("ALTER TABLE aps_customer ADD COLUMN IF NOT EXISTS release_reason VARCHAR(255) NULL COMMENT '释放原因'");
+  customerLifecycleEnsured = true;
+}
+
+export async function upsertCustomers(records) {
+  if (!records?.length) {
+    return { inserted: 0 };
+  }
+  await ensureCustomerLifecycleColumns();
+  const sql = `
+    INSERT INTO aps_customer (
+      account_id, login_name, real_name, report_source, report_type, trade_mode,
+      real_name_status, relation_date, follow_staff, payment_notice_status,
+      invite_register_type, is_new_customer, performance_start_point_reached,
+      customer_category, remark, no_consumption_months,
+      planned_release_time, planned_release_reason,
+      active, status, released_at, release_reason
+    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 1, 'active', NULL, NULL)
+    ON DUPLICATE KEY UPDATE
+      login_name=VALUES(login_name),
+      real_name=VALUES(real_name),
+      report_source=VALUES(report_source),
+      report_type=VALUES(report_type),
+      trade_mode=VALUES(trade_mode),
+      real_name_status=VALUES(real_name_status),
+      relation_date=VALUES(relation_date),
+      follow_staff=VALUES(follow_staff),
+      payment_notice_status=VALUES(payment_notice_status),
+      invite_register_type=VALUES(invite_register_type),
+      is_new_customer=VALUES(is_new_customer),
+      performance_start_point_reached=VALUES(performance_start_point_reached),
+      customer_category=VALUES(customer_category),
+      remark=VALUES(remark),
+      no_consumption_months=VALUES(no_consumption_months),
+      planned_release_time=VALUES(planned_release_time),
+      planned_release_reason=VALUES(planned_release_reason),
+      active=1,
+      status='active',
+      released_at=NULL,
+      release_reason=NULL
+  `;
+
+  for (const record of records) {
+    const accountId = safeString(record.accountId);
+    const loginName = safeString(record.loginName);
+    if (!accountId || !loginName) {
+      continue;
+    }
+    await getPool().execute(sql, [
+      accountId,
+      loginName,
+      safeString(record.realName),
+      safeString(record.reportSource),
+      safeString(record.reportType),
+      safeString(record.tradeMode),
+      safeString(record.authStatus),
+      safeString(record.relationTime),
+      safeString(record.owner),
+      safeString(record.paymentNoticeStatus),
+      safeString(record.inviteType),
+      safeString(record.isNewCustomer) === '是' ? 1 : safeString(record.isNewCustomer) === '否' ? 0 : null,
+      safeString(record.isPerformanceQualified) === '是' ? 1 : safeString(record.isPerformanceQualified) === '否' ? 0 : null,
+      safeString(record.customerCategory),
+      safeString(record.remark),
+      safeNumber(record.inactiveMonths),
+      safeString(record.releasePlanTime),
+      safeString(record.releasePlanReason),
+    ]);
+  }
+  return { inserted: records.length };
+}
+
+export async function upsertCustomerDetails(records) {
+  if (!records?.length) {
+    return { inserted: 0 };
+  }
+  await ensureCustomerLifecycleColumns();
+  const sql = `
+    UPDATE aps_customer SET
+      customer_name = ?,
+      customer_type = ?,
+      customer_source = ?,
+      email = ?,
+      phone = ?,
+      department = ?,
+      payment_notice_status = COALESCE(?, payment_notice_status),
+      updated_at = CURRENT_TIMESTAMP
+    WHERE account_id = ?
+  `;
+  for (const record of records) {
+    const accountId = safeString(record.accountId);
+    if (!accountId) {
+      continue;
+    }
+    await getPool().execute(sql, [
+      safeString(record.customerName),
+      safeString(record.customerType),
+      safeString(record.customerSource),
+      safeString(record.email),
+      safeString(record.phone),
+      safeString(record.department),
+      safeString(record.paymentNoticeStatus),
+      accountId,
+    ]);
+  }
+  return { inserted: records.length };
+}
+
+async function findCustomerAccountIdByName(customerName) {
+  const normalized = safeString(customerName);
+  if (!normalized) {
+    return null;
+  }
+  const [rows] = await getPool().execute(
+    'SELECT account_id FROM aps_customer WHERE customer_name = ? OR real_name = ? OR login_name = ? LIMIT 1',
+    [normalized, normalized, normalized],
+  );
+  return Array.isArray(rows) && rows.length > 0 ? safeString(rows[0].account_id) : null;
+}
+
+async function markCustomerReleased(customerName, reason, releasedAt) {
+  const accountId = await findCustomerAccountIdByName(customerName);
+  if (!accountId) {
+    return false;
+  }
+  await ensureCustomerLifecycleColumns();
+  await getPool().execute(
+    'UPDATE aps_customer SET active = 0, status = ?, released_at = ?, release_reason = ? WHERE account_id = ?',
+    ['released', releasedAt || new Date().toISOString().slice(0, 19).replace('T', ' '), safeString(reason), accountId],
+  );
+  return true;
+}
+
+async function markCustomerActive(customerName) {
+  const accountId = await findCustomerAccountIdByName(customerName);
+  if (!accountId) {
+    return false;
+  }
+  await ensureCustomerLifecycleColumns();
+  await getPool().execute(
+    'UPDATE aps_customer SET active = 1, status = ?, released_at = NULL, release_reason = NULL WHERE account_id = ?',
+    ['active', accountId],
+  );
+  return true;
+}
+
+async function applyCustomerLifecycleFromMessage(record) {
+  const title = safeString(record.title) || '';
+  const content = safeString(record.content) || '';
+  const customerName = safeString(record.customerName) || '';
+  if (!customerName) {
+    return;
+  }
+  const text = `${title}\n${content}`;
+  if (/释放/.test(text)) {
+    await markCustomerReleased(customerName, title || content, safeString(record.gmtModified) || safeString(record.gmtCreated));
+    return;
+  }
+  if (/(关联|报备成功|新增客户|绑定客户)/.test(text)) {
+    await markCustomerActive(customerName);
+  }
+}
+
+export async function upsertOrders(records) {
+  if (!records?.length) {
+    return { inserted: 0 };
+  }
+  const customerMap = await getCustomerMap();
+  const sql = `
+    INSERT INTO aps_order (
+      order_id, customer_account_id, customer_login_name,
+      customer_category, order_type, original_price_cny, paid_amount_cny,
+      status, order_time, order_month
+    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+    ON DUPLICATE KEY UPDATE
+      customer_account_id=VALUES(customer_account_id),
+      customer_login_name=VALUES(customer_login_name),
+      customer_category=VALUES(customer_category),
+      order_type=VALUES(order_type),
+      original_price_cny=VALUES(original_price_cny),
+      paid_amount_cny=VALUES(paid_amount_cny),
+      status=VALUES(status),
+      order_time=VALUES(order_time),
+      order_month=VALUES(order_month)
+  `;
+  for (const record of records) {
+    const orderId = safeString(record.orderId);
+    if (!orderId) {
+      continue;
+    }
+    const customerLoginName = safeString(record.customerAccount) || '';
+    const accountId = resolveCustomerAccountId(customerMap, customerLoginName);
+    await getPool().execute(sql, [
+      orderId,
+      accountId,
+      customerLoginName,
+      safeString(record.customerCategory),
+      safeString(record.orderType),
+      safeNumber(record.orderOriginalPriceCny),
+      safeNumber(record.actualPaidCny),
+      safeString(record.orderStatus),
+      safeString(record.createdAt),
+      safeString(record.createdAt)?.slice(0, 7) || null,
+    ]);
+  }
+  return { inserted: records.length };
+}
+
+export async function upsertOrderDetails(records) {
+  if (!records?.length) {
+    return { inserted: 0 };
+  }
+  const sql = `
+    INSERT INTO aps_order_detail (
+      order_id, order_type, status, trade_type, customer_category,
+      dealer_name, dealer_uid, customer_type, opportunity_id,
+      payment_time, order_time, product_name, product_code,
+      original_price_cny, paid_amount_cny, discount,
+      payable_amount_cny, coupon_amount_cny
+    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+    ON DUPLICATE KEY UPDATE
+      order_type=VALUES(order_type),
+      status=VALUES(status),
+      trade_type=VALUES(trade_type),
+      customer_category=VALUES(customer_category),
+      dealer_name=VALUES(dealer_name),
+      dealer_uid=VALUES(dealer_uid),
+      customer_type=VALUES(customer_type),
+      opportunity_id=VALUES(opportunity_id),
+      payment_time=VALUES(payment_time),
+      order_time=VALUES(order_time),
+      product_name=VALUES(product_name),
+      product_code=VALUES(product_code),
+      original_price_cny=VALUES(original_price_cny),
+      paid_amount_cny=VALUES(paid_amount_cny),
+      discount=VALUES(discount),
+      payable_amount_cny=VALUES(payable_amount_cny),
+      coupon_amount_cny=VALUES(coupon_amount_cny)
+  `;
+  for (const record of records) {
+    const orderId = safeString(record.orderId);
+    if (!orderId) {
+      continue;
+    }
+    await getPool().execute(sql, [
+      orderId,
+      safeString(record.orderType),
+      safeString(record.status),
+      safeString(record.tradeType),
+      safeString(record.customerCategory),
+      safeString(record.dealerName),
+      safeString(record.dealerUid),
+      safeString(record.customerType),
+      safeString(record.opportunityId),
+      safeString(record.paymentTime),
+      safeString(record.orderTime),
+      safeString(record.productName),
+      safeString(record.productCode),
+      safeNumber(record.originalPriceCny),
+      safeNumber(record.paidAmountCny),
+      safeNumber(record.discount),
+      safeNumber(record.payableAmountCny),
+      safeNumber(record.couponAmountCny),
+    ]);
+  }
+  return { inserted: records.length };
+}
+
+export async function upsertBills(records) {
+  if (!records?.length) {
+    return { inserted: 0 };
+  }
+  const customerMap = await getCustomerMap();
+  const selectSql = `
+    SELECT id FROM aps_bill
+    WHERE billing_month = ?
+      AND commission_month = ?
+      AND customer_login_name = ?
+      AND consumption_time = ?
+      AND product_name = ?
+      AND original_price_cny <=> ?
+ LIMIT 1 + `; + const insertSql = ` + INSERT INTO aps_bill ( + billing_month, customer_account_id, customer_login_name, + bill_type, consumption_time, + customer_category, + product_name, product_category, + original_price_cny, customer_payable_amount_cny, + commission_month, + included_in_performance, + rebated, + invite_register_type, + service_start_time, service_end_time + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + `; + const updateSql = ` + UPDATE aps_bill SET + customer_account_id = ?, + bill_type = ?, + customer_category = ?, + product_category = ?, + customer_payable_amount_cny = ?, + included_in_performance = ?, + rebated = ?, + invite_register_type = ?, + service_start_time = ?, + service_end_time = ? + WHERE id = ? + `; + + for (const record of records) { + const billingMonth = normalizeMonth(record.billingMonth); + const commissionMonth = normalizeMonth(record.commissionMonth); + if (!billingMonth || !commissionMonth) { + continue; + } + const customerLoginName = safeString(record.customerAccount) || ''; + const accountId = resolveCustomerAccountId(customerMap, customerLoginName); + const productName = safeString(record.productName); + const consumptionTime = safeString(record.consumeDate); + const originalPrice = safeNumber(record.originalPriceCny); + const [rows] = await getPool().execute(selectSql, [ + billingMonth, + commissionMonth, + customerLoginName, + consumptionTime, + productName, + originalPrice, + ]); + + if (Array.isArray(rows) && rows.length > 0) { + await getPool().execute(updateSql, [ + accountId, + safeString(record.billType), + safeString(record.customerCategory), + safeString(record.productCategory), + safeNumber(record.customerPayableCny), + safeString(record.countsForPerformance), + safeString(record.commissionable), + safeString(record.inviteType), + safeString(record.serviceStartAt), + safeString(record.serviceEndAt), + rows[0].id, + ]); + continue; + } + + await getPool().execute(insertSql, [ + billingMonth, + 
accountId, + customerLoginName, + safeString(record.billType), + consumptionTime, + safeString(record.customerCategory), + productName, + safeString(record.productCategory), + originalPrice, + safeNumber(record.customerPayableCny), + commissionMonth, + safeString(record.countsForPerformance), + safeString(record.commissionable), + safeString(record.inviteType), + safeString(record.serviceStartAt), + safeString(record.serviceEndAt), + ]); + } + + return { inserted: records.length }; +} + +export async function upsertMessages(records) { + if (!records?.length) { + return { inserted: 0 }; + } + await ensureMessagesTable(); + const sql = ` + INSERT INTO aliyun_aps_messages ( + msg_id, title, content, msg_type, from_app, biz_code, msg_channel, + category_id, category_name, lv1_category_id, lv2_category_id, lv3_category_id, + message_classification, customer_name, order_no, status, + gmt_created, gmt_modified, extra_data + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON DUPLICATE KEY UPDATE + title=VALUES(title), + content=VALUES(content), + msg_type=VALUES(msg_type), + from_app=VALUES(from_app), + biz_code=VALUES(biz_code), + msg_channel=VALUES(msg_channel), + category_id=VALUES(category_id), + category_name=VALUES(category_name), + lv1_category_id=VALUES(lv1_category_id), + lv2_category_id=VALUES(lv2_category_id), + lv3_category_id=VALUES(lv3_category_id), + message_classification=VALUES(message_classification), + customer_name=VALUES(customer_name), + order_no=VALUES(order_no), + status=VALUES(status), + gmt_created=VALUES(gmt_created), + gmt_modified=VALUES(gmt_modified), + extra_data=VALUES(extra_data), + crawl_time=CURRENT_TIMESTAMP + `; + for (const record of records) { + const msgId = safeString(record.msgId) || safeString(record.__hash); + if (!msgId) { + continue; + } + await getPool().execute(sql, [ + msgId, + safeString(record.title), + safeString(record.content), + safeString(record.msgType), + safeString(record.fromApp), + 
safeString(record.bizCode), + safeString(record.msgChannel), + safeString(record.categoryId), + safeString(record.categoryName), + safeString(record.lv1CategoryId), + safeString(record.lv2CategoryId), + safeString(record.lv3CategoryId), + safeString(record.messageClassification), + safeString(record.customerName), + safeString(record.orderNo), + safeString(record.status), + safeString(record.gmtCreated), + safeString(record.gmtModified), + JSON.stringify(record.extraData || record), + ]); + await applyCustomerLifecycleFromMessage(record); + } + return { inserted: records.length }; +} + +export { hasDbConfig }; diff --git a/aliyun-sync/aliyun-aps-sync/src/index.js b/aliyun-sync/aliyun-aps-sync/src/index.js index 04f0e64..7aa242d 100644 --- a/aliyun-sync/aliyun-aps-sync/src/index.js +++ b/aliyun-sync/aliyun-aps-sync/src/index.js @@ -1,3 +1,5 @@ +import { config } from './config.js'; + const args = process.argv.slice(2); const command = args[0] || 'sync'; const extraArgs = args.slice(1); @@ -9,7 +11,7 @@ for (const arg of extraArgs) { } } -const { login, scheduleSync, syncAll, syncBillsOnly } = await import('./sync.js'); +const { login, scheduleSync, syncAll, syncAllIncremental, syncBillsOnly, syncMessagesOnly } = await import('./sync.js'); if (command === 'login') { await login(); @@ -22,12 +24,24 @@ if (command === 'sync') { process.exit(0); } +if (command === 'incremental') { + const summary = await syncAllIncremental(); + console.log(JSON.stringify(summary, null, 2)); + process.exit(0); +} + if (command === 'bills') { const summary = await syncBillsOnly({ resume: billsResume }); console.log(JSON.stringify(summary, null, 2)); process.exit(0); } +if (command === 'messages') { + const summary = await syncMessagesOnly({ incremental: config.scheduleMode === 'incremental' }); + console.log(JSON.stringify(summary, null, 2)); + process.exit(0); +} + if (command === 'schedule') { await scheduleSync(); } else { diff --git a/aliyun-sync/aliyun-aps-sync/src/notify.js 
b/aliyun-sync/aliyun-aps-sync/src/notify.js index b1ba050..c7c479c 100644 --- a/aliyun-sync/aliyun-aps-sync/src/notify.js +++ b/aliyun-sync/aliyun-aps-sync/src/notify.js @@ -4,6 +4,49 @@ import { config } from './config.js'; let lastSentAt = 0; const ONE_HOUR_MS = 60 * 60 * 1000; +function canSendMail() { + if (!config.smtp.user || !config.smtp.pass) { + console.warn('[通知] SMTP 用户名或密码为空,跳过发送邮件'); + return false; + } + + if (!config.notifyEmail) { + console.warn('[通知] 未配置 ALIYUN_APS_NOTIFY_EMAIL,跳过发送邮件'); + return false; + } + + return true; +} + +function createTransporter() { + return nodemailer.createTransport({ + host: config.smtp.host, + port: config.smtp.port, + secure: config.smtp.secure, + auth: { + user: config.smtp.user, + pass: config.smtp.pass, + }, + }); +} + +export async function sendRuntimeErrorAlert({ subject, text, attachments = [] }) { + if (!canSendMail()) { + return; + } + + const transporter = createTransporter(); + await transporter.sendMail({ + from: config.smtp.user, + to: config.notifyEmail, + subject, + text, + attachments, + }); + + console.log(`[通知] 错误提醒邮件已发送至 ${config.notifyEmail}`); +} + export async function sendLoginAlert(loginUrl = '') { const now = Date.now(); if (now - lastSentAt < ONE_HOUR_MS) { @@ -11,27 +54,13 @@ export async function sendLoginAlert(loginUrl = '') { return; } - const { smtp, notifyEmail, baseUrl } = config; + const { baseUrl } = config; - if (!smtp.user || !smtp.pass) { - console.warn('[通知] SMTP 用户名或密码为空,跳过发送登录提醒邮件'); + if (!canSendMail()) { return; } - if (!notifyEmail) { - console.warn('[通知] 未配置 ALIYUN_APS_NOTIFY_EMAIL,跳过发送登录提醒邮件'); - return; - } - - const transporter = nodemailer.createTransport({ - host: smtp.host, - port: smtp.port, - secure: smtp.secure, - auth: { - user: smtp.user, - pass: smtp.pass, - }, - }); + const transporter = createTransporter(); const url = loginUrl || `${baseUrl}/#/signin`; const timestamp = new Date().toISOString(); @@ -45,12 +74,12 @@ export async function 
sendLoginAlert(loginUrl = '') { ].join('\n'); await transporter.sendMail({ - from: smtp.user, - to: notifyEmail, + from: config.smtp.user, + to: config.notifyEmail, subject, text, }); lastSentAt = now; - console.log(`[通知] 登录提醒邮件已发送至 ${notifyEmail}`); + console.log(`[通知] 登录提醒邮件已发送至 ${config.notifyEmail}`); } diff --git a/aliyun-sync/aliyun-aps-sync/src/sync.js b/aliyun-sync/aliyun-aps-sync/src/sync.js index a2af5fd..21230a4 100644 --- a/aliyun-sync/aliyun-aps-sync/src/sync.js +++ b/aliyun-sync/aliyun-aps-sync/src/sync.js @@ -3,9 +3,21 @@ import cron from 'node-cron'; import fs from 'node:fs'; import path from 'node:path'; import readline from 'node:readline'; -import { execSync } from 'node:child_process'; import { config, datasets } from './config.js'; -import { sendLoginAlert } from './notify.js'; +import { sendLoginAlert, sendRuntimeErrorAlert } from './notify.js'; +import { + closeDbPool, + getLatestBillConsumptionTimeFromDb, + getLatestMessageTimeFromDb, + getLatestOrderTimeFromDb, + hasDbConfig, + upsertBills, + upsertCustomerDetails, + upsertCustomers, + upsertMessages, + upsertOrderDetails, + upsertOrders, +} from './db.js'; import { diffRecords, loadCurrentState, @@ -121,14 +133,56 @@ async function runtimeCheckpoint(label) { await controller.waitIfPaused(label); } +function clearStaleBrowserProfileLocks() { + const lockFiles = ['SingletonLock', 'SingletonCookie', 'SingletonSocket']; + const now = Date.now(); + const staleMs = 10 * 60 * 1000; + + for (const fileName of lockFiles) { + const filePath = path.join(config.userDataDir, fileName); + if (!fs.existsSync(filePath)) { + continue; + } + + try { + const stat = fs.statSync(filePath); + const ageMs = now - stat.mtimeMs; + if (ageMs < staleMs) { + console.log(`[浏览器锁] 检测到活跃锁文件,保留: ${fileName}`); + continue; + } + fs.rmSync(filePath, { force: true }); + console.log(`[浏览器锁] 已清理陈旧锁文件: ${fileName}`); + } catch (error) { + console.warn(`[浏览器锁] 清理 ${fileName} 失败: ${error.message}`); + } + } +} + async function 
getContext() { if (_context) return _context; - _context = await chromium.launchPersistentContext(config.userDataDir, { - channel: 'chrome', + clearStaleBrowserProfileLocks(); + const launchOptions = { headless: config.headless, acceptDownloads: true, downloadsPath: config.downloadDir, - }); + }; + if (config.browserChannel) { + launchOptions.channel = config.browserChannel; + } + if (config.browserExecutablePath) { + launchOptions.executablePath = config.browserExecutablePath; + } + try { + _context = await chromium.launchPersistentContext(config.userDataDir, launchOptions); + } catch (error) { + const browserHint = config.browserExecutablePath + ? `executablePath=${config.browserExecutablePath}` + : config.browserChannel + ? `channel=${config.browserChannel}` + : 'bundled-chromium'; + throw new Error(`浏览器启动失败(${browserHint})。请确认没有其他浏览器占用 .browser 目录,或删除 .browser 后重新执行 npm run login。原始错误: ${error.message}`); + } await restoreStorageState(_context); return _context; } @@ -185,6 +239,91 @@ function loadLatestBillsCheckpoint() { } } +function subtractDays(dateValue, days) { + const next = new Date(dateValue); + next.setDate(next.getDate() - days); + return next; +} + +function parseDbDateTime(value) { + const normalized = String(value || '').trim(); + if (!normalized) { + return null; + } + const parsed = new Date(normalized.replace(' ', 'T')); + return Number.isNaN(parsed.getTime()) ? 
null : parsed; +} + +function formatDateTime(date) { + return `${formatDate(date)} ${String(date.getHours()).padStart(2, '0')}:${String(date.getMinutes()).padStart(2, '0')}:${String(date.getSeconds()).padStart(2, '0')}`; +} + +function buildSingleDateWindow(startDate, endDate) { + return [{ + windowStart: startDate, + windowEnd: endDate, + start: startDate, + end: endDate, + }]; +} + +async function captureErrorArtifacts(page, metadata = {}) { + const stamp = nowStamp(); + const artifactDir = path.join(config.errorDir, metadata.dataset || 'general'); + fs.mkdirSync(artifactDir, { recursive: true }); + + const jsonPath = path.join(artifactDir, `${stamp}.json`); + const screenshotPath = path.join(artifactDir, `${stamp}.png`); + const payload = { + ...metadata, + capturedAt: new Date().toISOString(), + pageUrl: page?.url?.() || '', + stack: metadata.error?.stack || metadata.errorMessage || '', + }; + fs.writeFileSync(jsonPath, JSON.stringify(payload, null, 2)); + + let screenshotSaved = false; + if (page) { + try { + await page.screenshot({ path: screenshotPath, fullPage: true }); + screenshotSaved = true; + } catch (error) { + console.error('[错误截图] 保存失败:', error.message); + } + } + + return { + jsonPath, + screenshotPath: screenshotSaved ? screenshotPath : '', + }; +} + +async function reportRuntimeError(error, page, metadata = {}) { + const artifacts = await captureErrorArtifacts(page, { + ...metadata, + errorMessage: error.message, + error, + }); + const subject = `[APS同步异常] ${metadata.label || metadata.dataset || 'sync'} failed`; + const text = [ + `时间: ${new Date().toISOString()}`, + `任务: ${metadata.label || ''}`, + `数据集: ${metadata.dataset || ''}`, + `模式: ${metadata.mode || ''}`, + `URL: ${page?.url?.() || ''}`, + `错误: ${error.message}`, + `JSON: ${artifacts.jsonPath}`, + artifacts.screenshotPath ? 
`截图: ${artifacts.screenshotPath}` : '截图: 保存失败', + ].join('\n'); + + const attachments = [{ filename: path.basename(artifacts.jsonPath), path: artifacts.jsonPath }]; + if (artifacts.screenshotPath) { + attachments.push({ filename: path.basename(artifacts.screenshotPath), path: artifacts.screenshotPath }); + } + + await sendRuntimeErrorAlert({ subject, text, attachments }); +} + async function getPageBodyPreview(page) { return page .evaluate(() => document.body?.innerText?.substring(0, 500) || '(空)') @@ -258,32 +397,41 @@ export async function syncAll() { const runtimeController = getRuntimeController(); runtimeController.bind(); const context = await getContext(); + let page = null; try { const summary = { startedAt: new Date().toISOString(), datasets: {} }; - const page = context.pages()[0] || (await context.newPage()); + page = context.pages()[0] || (await context.newPage()); - summary.datasets.customers = await syncCustomers(page); - summary.datasets.customerDetails = await syncCustomerDetails(page); - summary.datasets.orders = await syncOrders(page); + if (config.fullSync) { + summary.datasets.customers = await syncCustomers(page); + summary.datasets.customerDetails = await syncCustomerDetails(page); + } + + summary.datasets.orders = await syncOrders(page, { incremental: !config.fullSync }); // syncOrders 完成后,从最新的 orders.json 读取 orderId 列表 const latestOrders = loadCurrentState('orders', datasets.orders.uniqueKey); const orderIdsForDetail = collectValidOrderIds(latestOrders.records || []); summary.datasets.orderDetails = await syncOrderDetails(page, orderIdsForDetail); - summary.datasets.bills = await syncBills(page); + summary.datasets.bills = await syncBills(page, { incremental: !config.fullSync }); + summary.datasets.messages = await syncMessages(page, { incremental: !config.fullSync }); summary.finishedAt = new Date().toISOString(); const stamp = nowStamp(); saveRunSummary(stamp, summary); return summary; + } catch (error) { + await reportRuntimeError(error, 
page, { label: 'syncAll', dataset: 'all', mode: config.fullSync ? 'full' : 'incremental' }); + throw error; } finally { if (config.closeBrowser) { await closeContextIfNeeded(); } else { console.log('浏览器保持运行'); } + await closeDbPool(); runtimeController.unbind(); } } @@ -292,10 +440,11 @@ export async function syncBillsOnly(options = {}) { const runtimeController = getRuntimeController(); runtimeController.bind(); const context = await getContext(); + let page = null; try { const summary = { startedAt: new Date().toISOString(), datasets: {} }; - const page = context.pages()[0] || (await context.newPage()); + page = context.pages()[0] || (await context.newPage()); summary.datasets.bills = await syncBills(page, options); summary.finishedAt = new Date().toISOString(); @@ -303,12 +452,45 @@ export async function syncBillsOnly(options = {}) { const stamp = nowStamp(); saveRunSummary(stamp, summary); return summary; + } catch (error) { + await reportRuntimeError(error, page, { label: 'syncBillsOnly', dataset: 'bills', mode: options.incremental ? 'incremental' : 'full' }); + throw error; } finally { if (config.closeBrowser) { await closeContextIfNeeded(); } else { console.log('浏览器保持运行'); } + await closeDbPool(); + runtimeController.unbind(); + } +} + +export async function syncMessagesOnly(options = {}) { + const runtimeController = getRuntimeController(); + runtimeController.bind(); + const context = await getContext(); + let page = null; + + try { + const summary = { startedAt: new Date().toISOString(), datasets: {} }; + page = context.pages()[0] || (await context.newPage()); + summary.datasets.messages = await syncMessages(page, options); + summary.finishedAt = new Date().toISOString(); + + const stamp = nowStamp(); + saveRunSummary(stamp, summary); + return summary; + } catch (error) { + await reportRuntimeError(error, page, { label: 'syncMessagesOnly', dataset: 'messages', mode: options.incremental ? 
'incremental' : 'full' }); + throw error; + } finally { + if (config.closeBrowser) { + await closeContextIfNeeded(); + } else { + console.log('浏览器保持运行'); + } + await closeDbPool(); runtimeController.unbind(); } } @@ -319,22 +501,11 @@ export async function scheduleSync() { config.cron, async () => { try { - console.log(`[${new Date().toISOString()}] 开始执行同步`); - const summary = await syncAll(); + console.log(`[${new Date().toISOString()}] 开始执行同步 mode=${config.scheduleMode}`); + const summary = config.scheduleMode === 'full' + ? await syncAll() + : await syncAllIncremental(); console.log(`[${new Date().toISOString()}] 同步完成`, JSON.stringify(summary, null, 2)); - try { - const scriptPath = path.resolve(config.rootDir, config.dbSyncScript); - const incrementalFlag = config.fullSync ? '' : ' --incremental'; - console.log(`[入库] 执行 ${scriptPath}${incrementalFlag ? ' (增量模式)' : ''}`); - const output = execSync(`python "${scriptPath}"${incrementalFlag}`, { - cwd: path.dirname(scriptPath), - encoding: 'utf-8', - timeout: 120000, - }); - console.log(output); - } catch (e) { - console.error('[入库] 失败:', e.message); - } } catch (error) { console.error(`[${new Date().toISOString()}] 同步失败`, error); } @@ -343,6 +514,40 @@ export async function scheduleSync() { ); } +export async function syncAllIncremental() { + const runtimeController = getRuntimeController(); + runtimeController.bind(); + const context = await getContext(); + let page = null; + + try { + const summary = { startedAt: new Date().toISOString(), mode: 'incremental', datasets: {} }; + page = context.pages()[0] || (await context.newPage()); + summary.datasets.orders = await syncOrders(page, { incremental: true }); + const latestOrders = loadCurrentState('orders', datasets.orders.uniqueKey); + const orderIdsForDetail = collectValidOrderIds(latestOrders.records || []); + summary.datasets.orderDetails = await syncOrderDetails(page, orderIdsForDetail); + summary.datasets.bills = await syncBills(page, { incremental: true }); 
+ summary.datasets.messages = await syncMessages(page, { incremental: true }); + summary.finishedAt = new Date().toISOString(); + + const stamp = nowStamp(); + saveRunSummary(stamp, summary); + return summary; + } catch (error) { + await reportRuntimeError(error, page, { label: 'syncAllIncremental', dataset: 'incremental', mode: 'incremental' }); + throw error; + } finally { + if (config.closeBrowser) { + await closeContextIfNeeded(); + } else { + console.log('浏览器保持运行'); + } + await closeDbPool(); + runtimeController.unbind(); + } +} + async function syncCustomers(page) { await runtimeCheckpoint('同步客户'); const dataset = datasets.customers; @@ -350,6 +555,10 @@ async function syncCustomers(page) { await waitUntilReady(page, dataset.heading); await trySetPageSize(page, dataset.pageSize); const records = await scrapePagedTable(page, dataset, {}); + if (hasDbConfig()) { + const normalizedRecords = dedupeByHash(normalizeDatasetRecords(dataset, records, {})); + await upsertCustomers(normalizedRecords); + } return persistDataset(dataset, records, {}); } @@ -393,20 +602,25 @@ async function syncCustomerDetails(page) { const detail = await extractCustomerDetail(page); allDetails.push({ ...detail, __context: { accountId } }); + if (hasDbConfig()) { + const normalizedDetail = normalizeDatasetRecords(dataset, [{ ...detail, __context: { accountId } }], {}); + await upsertCustomerDetails(normalizedDetail); + } } return persistDataset(dataset, dedupeByHash(allDetails), {}); } -async function syncOrders(page) { +async function syncOrders(page, options = {}) { await runtimeCheckpoint('同步订单'); const dataset = datasets.orders; + const { incremental = false } = options; let windows; - if (config.fullSync) { + if (!incremental) { windows = buildMonthlyDateWindows(config.orderStartDate); } else { - windows = buildIncrementalOrderWindows(); + windows = await buildIncrementalOrderWindows(); } const allRecords = []; @@ -420,12 +634,16 @@ async function syncOrders(page) { await 
trySetPageSize(page, dataset.pageSize); const records = await scrapePagedTable(page, dataset, window); allRecords.push(...records); + if (hasDbConfig()) { + const normalizedWindowRecords = dedupeByHash(normalizeDatasetRecords(dataset, records, window)); + await upsertOrders(normalizedWindowRecords); + } } return persistDataset(dataset, dedupeByHash(allRecords), {}); } -function buildIncrementalOrderWindows() { +async function buildIncrementalOrderWindows() { const configuredStartDate = normalizeConfiguredDate(config.incrementalOrderStartDate); if (configuredStartDate) { const windows = buildMonthlyDateWindows(configuredStartDate); @@ -433,11 +651,27 @@ function buildIncrementalOrderWindows() { return windows; } - const yesterday = new Date(); - yesterday.setDate(yesterday.getDate() - 1); - const dateStr = formatDate(yesterday); - console.log(`[增量模式] 订单仅查询: ${dateStr}`); - return [{ windowStart: dateStr, windowEnd: dateStr, start: dateStr, end: dateStr }]; + if (!hasDbConfig()) { + const yesterday = new Date(); + yesterday.setDate(yesterday.getDate() - 1); + const dateStr = formatDate(yesterday); + console.log(`[增量模式] 未配置数据库,订单仅查询: ${dateStr}`); + return buildSingleDateWindow(dateStr, dateStr); + } + + const latestOrderTime = await getLatestOrderTimeFromDb(); + const runAt = new Date(); + const parsed = parseDbDateTime(latestOrderTime); + if (!parsed) { + const dateStr = formatDate(runAt); + console.log(`[增量模式] 数据库无订单水位,订单仅查询当天: ${dateStr}`); + return buildSingleDateWindow(dateStr, dateStr); + } + + const startDate = formatDate(subtractDays(parsed, config.orderIncrementalOverlapDays)); + const endDate = formatDate(runAt); + console.log(`[增量模式] 订单窗口: ${startDate} ~ ${endDate} (db_last=${latestOrderTime}, overlap=${config.orderIncrementalOverlapDays}d)`); + return buildSingleDateWindow(startDate, endDate); } function normalizeConfiguredDate(value) { @@ -460,18 +694,20 @@ function normalizeConfiguredDate(value) { async function syncBills(page, options = {}) { await 
runtimeCheckpoint('同步账单'); const dataset = datasets.bills; - const { resume = false } = options; + const { resume = false, incremental = false } = options; let months; let latestConsumptionDate = null; - if (config.fullSync) { + if (!incremental) { months = buildMonthList(config.billStartMonth); } else { - latestConsumptionDate = getLatestBillConsumptionDate(); - const incrementalMonth = latestConsumptionDate?.slice(0, 7) - || `${new Date().getFullYear()}-${String(new Date().getMonth() + 1).padStart(2, '0')}`; - months = [incrementalMonth]; - console.log(`[增量模式] 账单仅查询: ${incrementalMonth}${latestConsumptionDate ? `, 数据库最新消费时间: ${latestConsumptionDate}` : ''}`); + latestConsumptionDate = await getLatestBillConsumptionDate(); + const startDate = latestConsumptionDate ? latestConsumptionDate.slice(0, 10) : formatDate(new Date()); + const endDate = formatDate(new Date()); + const startMonth = startDate.slice(0, 7); + const endMonth = endDate.slice(0, 7); + months = buildMonthList(startMonth).filter((month) => month <= endMonth); + console.log(`[增量模式] 账单窗口: ${startDate} ~ ${endDate}${latestConsumptionDate ? `, 数据库最新消费时间: ${latestConsumptionDate}` : ''}`); } const resumeCheckpoint = resume ? 
loadLatestBillsCheckpoint() : null; @@ -514,6 +750,9 @@ async function syncBills(page, options = {}) { onPage: async ({ pageNum, pageRows }) => { const normalizedPageRows = normalizeDatasetRecords(dataset, pageRows, { month }); monthNormalizedRecords.push(...normalizedPageRows); + if (hasDbConfig()) { + await upsertBills(normalizedPageRows); + } let checkpointRecords = monthNormalizedRecords; if (latestConsumptionDate) { checkpointRecords = monthNormalizedRecords.filter((record) => isAfterLatestConsumptionDate(record, latestConsumptionDate)); @@ -538,6 +777,42 @@ async function syncBills(page, options = {}) { return persistNormalizedDataset(dataset, dedupeByHash(allNormalizedRecords)); } +async function syncMessages(page, options = {}) { + await runtimeCheckpoint('同步消息'); + const dataset = datasets.messages; + const { incremental = false } = options; + await page.goto(dataset.url, { waitUntil: 'domcontentloaded' }); + await waitUntilReady(page, dataset.heading); + await trySetPageSize(page, dataset.pageSize); + + let records = await scrapePagedTable(page, dataset, {}, { + onPage: hasDbConfig() + ? 
async ({ pageRows }) => { + const normalizedPageRows = normalizeDatasetRecords(dataset, pageRows, {}); + await upsertMessages(normalizedPageRows); + } + : undefined, + }); + if (incremental && hasDbConfig()) { + try { + const latestMessageTime = await getLatestMessageTimeFromDb(); + if (latestMessageTime) { + const latest = parseDbDateTime(latestMessageTime); + if (latest) { + const watermark = subtractDays(latest, config.messageIncrementalOverlapDays); + const before = records.length; + records = records.filter((record) => isAfterLatestMessageTime(record, watermark)); + console.log(`[增量模式] 消息按时间过滤: ${before} -> ${records.length} (db_last=${latestMessageTime}, overlap=${config.messageIncrementalOverlapDays}d)`); + } + } + } catch (error) { + console.error('[增量模式] 查询数据库最新消息时间失败:', error.message); + } + } + + return persistDataset(dataset, dedupeByHash(records), {}); +} + async function saveBillsCheckpoint(dataset, month, pageNum, normalizedRecords) { const normalized = dedupeByHash(normalizedRecords); const checkpointName = `${month}-latest`; @@ -571,16 +846,22 @@ async function moveBillsToResumeStart(page, resumeFromPage) { return moved; } -function getLatestBillConsumptionDate() { - const scriptPath = path.resolve(config.rootDir, config.dbSyncScript); +async function getLatestBillConsumptionDate() { + if (!hasDbConfig()) { + console.warn('[增量模式] 未配置数据库连接,无法读取账单水位,回退到当前日期'); + return null; + } + try { - const output = execSync(`python "${scriptPath}" --latest-bill-consumption-time`, { - cwd: path.dirname(scriptPath), - encoding: 'utf-8', - timeout: 120000, - }).trim(); - const latest = output.split(/\r?\n/).map((line) => line.trim()).filter(Boolean).at(-1) || ''; - return /^\d{4}-\d{2}-\d{2}/.test(latest) ? 
latest.slice(0, 10) : null; + const latest = await getLatestBillConsumptionTimeFromDb(); + if (!latest || !/^\d{4}-\d{2}-\d{2}/.test(latest)) { + return null; + } + const parsed = parseDbDateTime(latest); + if (!parsed) { + return latest.slice(0, 10); + } + return formatDate(subtractDays(parsed, config.billIncrementalOverlapDays)); } catch (error) { console.error('[增量模式] 查询数据库最新账单消费时间失败:', error.message); return null; @@ -595,6 +876,18 @@ function isAfterLatestConsumptionDate(record, latestConsumptionDate) { return consumeDate > latestConsumptionDate; } +function isAfterLatestMessageTime(record, watermarkDate) { + const value = String(record['消息修改时间'] || record['修改时间'] || record.gmtModified || record['消息创建时间'] || record['创建时间'] || record.gmtCreated || '').trim(); + if (!value) { + return true; + } + const parsed = parseDbDateTime(value); + if (!parsed) { + return true; + } + return parsed >= watermarkDate; +} + async function syncOrderDetails(page, cachedOrderIds) { await runtimeCheckpoint('同步订单详情'); const dataset = datasets.orderDetails; @@ -638,6 +931,10 @@ async function syncOrderDetails(page, cachedOrderIds) { detail.orderId = orderId; } allDetails.push({ ...detail, __context: {} }); + if (hasDbConfig()) { + const normalizedDetail = normalizeDatasetRecords(dataset, [{ ...detail, __context: {} }], {}); + await upsertOrderDetails(normalizedDetail); + } } return persistDataset(dataset, dedupeByHash(allDetails), {});
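
Reviewer note: the incremental paths in this diff (`buildIncrementalOrderWindows`, `getLatestBillConsumptionDate`, and the message filter in `syncMessages`) share one idea: read the latest timestamp stored in the database (the watermark), step back by a configured number of overlap days, and re-fetch from there up to the run date. The sketch below isolates that calculation so it can be reasoned about on its own. It is a minimal illustration under assumptions, not the project's code: `formatLocalDate` and `buildOverlapWindow` are hypothetical names, and `formatLocalDate` stands in for the project's `formatDate`, whose implementation is not shown in this diff.

```javascript
// Hypothetical stand-in for the project's formatDate (not shown in the diff).
// Formats a Date as YYYY-MM-DD in local time.
function formatLocalDate(date) {
  const y = date.getFullYear();
  const m = String(date.getMonth() + 1).padStart(2, '0');
  const d = String(date.getDate()).padStart(2, '0');
  return `${y}-${m}-${d}`;
}

// Hypothetical helper: compute a fetch window from a DB watermark plus overlap.
// dbLastTime is expected in "YYYY-MM-DD HH:mm:ss" form, as parseDbDateTime assumes.
function buildOverlapWindow(dbLastTime, overlapDays, runAt = new Date()) {
  const end = formatLocalDate(runAt);

  // Same normalization as parseDbDateTime: swap the space for "T" so the
  // string parses as a local date-time.
  const parsed = dbLastTime
    ? new Date(String(dbLastTime).trim().replace(' ', 'T'))
    : null;

  // No usable watermark: fall back to a single-day window on the run date.
  if (!parsed || Number.isNaN(parsed.getTime())) {
    return { start: end, end };
  }

  // Step back by the overlap so late-arriving or updated rows are re-fetched.
  const startDate = new Date(parsed);
  startDate.setDate(startDate.getDate() - overlapDays);
  return { start: formatLocalDate(startDate), end };
}

// Usage: watermark 2026-01-10, 2 overlap days, run on 2026-01-15
// yields { start: '2026-01-08', end: '2026-01-15' }.
const exampleWindow = buildOverlapWindow(
  '2026-01-10 08:30:00',
  2,
  new Date(2026, 0, 15, 6, 0, 0),
);
console.log(exampleWindow);
```

Re-fetching the overlap range is only safe because the writes are idempotent: the `upsert*` functions in `db.js` use `ON DUPLICATE KEY UPDATE` (or a select-then-update for bills), so rows seen twice are updated rather than duplicated.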