Files
aliyunApsSkill/aliyun-sync/aps-aliyun-sync/aps_db_sync.py
2026-04-13 18:09:52 +08:00

1074 lines
52 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
"""
APS Aliyun Data Sync — Database Schema & Sync Pipeline
=======================================================
Reads JSON output from aps-aliyun-scraper skill → Syncs to MySQL.
Design decisions:
- Customer master: UPSERT on account_id (natural PK from Aliyun).
- Customer snapshot: append-only monthly snapshot for historical tracking.
- Orders: UPSERT on order_id (natural PK). Status may change (未支付→已支付→作废).
- Bills: DELETE+INSERT per (billing_month, customer_account). Bills are immutable within
a closed month but may be re-exported with corrections before close.
- Product ratio: DELETE+INSERT per (account_id, snapshot_month).
Update strategy rationale:
- Customers (~14): tiny, full UPSERT is fine. Snapshot table captures monthly changes.
- Orders (~60/mo): small, UPSERT handles status changes cleanly.
- Bills (~1000/mo): medium, DELETE+INSERT per customer+month is safest because bills
lack a unique row ID and may be corrected before month close.
"""
import hashlib
import json
import logging
import math
import os
from datetime import date, datetime
from pathlib import Path
from typing import TypedDict, cast

import pymysql
from pymysql.connections import Connection
from pymysql.cursors import DictCursor
# One decoded JSON object: string keys mapped to values that must be narrowed
# before use.
JsonDict = dict[str, object]
# A list of decoded JSON objects, e.g. the records of one export file.
JsonList = list[JsonDict]
# Integer counters keyed by name (used for the per-run sync statistics).
StatsDict = dict[str, int]
class DbConfig(TypedDict):
    """Connection settings read by ``APSSyncer.connect``.

    Each key is forwarded to ``pymysql.connect`` under the same name.
    """

    host: str
    port: int
    user: str
    password: str
    database: str
    charset: str
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
# Connection settings for the target MySQL database. Every value can be
# overridden through an APS_DB_* environment variable.
# NOTE(security): the literal fallbacks (including the password) are kept only
# so existing deployments keep working unchanged. Provide the environment
# variables instead, then rotate the password and remove the literals from
# source control.
DB_CONFIG: DbConfig = {
    "host": os.environ.get("APS_DB_HOST", "172.27.137.236"),
    "port": int(os.environ.get("APS_DB_PORT", "3306")),
    "user": os.environ.get("APS_DB_USER", "ray"),
    "password": os.environ.get("APS_DB_PASSWORD", "GV0C$ErephgQO7RQc7b6"),
    "database": os.environ.get("APS_DB_NAME", "crm-prod"),
    "charset": os.environ.get("APS_DB_CHARSET", "utf8mb4"),
}
# Directory holding the scraper's JSON exports; override with APS_JSON_DIR.
JSON_DIR = Path(
    os.environ.get(
        "APS_JSON_DIR",
        r"C:\Users\Administrator\Desktop\aliyun-sync\aliyun-aps-sync\data\current",
    )
)
LOG_FORMAT = "%(asctime)s [%(levelname)s] %(message)s"
# Root logging is configured at import time, as before.
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
logger = logging.getLogger("aps_sync")
# ---------------------------------------------------------------------------
# Schema DDL
# ---------------------------------------------------------------------------
# CREATE TABLE statements, executed one by one and in this order by
# ``APSSyncer.ensure_schema``. aps_customer comes first because the snapshot,
# product-ratio, order and bill tables each declare a foreign key that
# references aps_customer(account_id).
DDL_STATEMENTS = [
# 1. Customer master — latest state, UPSERT on each sync
"""
CREATE TABLE IF NOT EXISTS aps_customer (
account_id VARCHAR(32) NOT NULL COMMENT '阿里云UID',
login_name VARCHAR(128) NOT NULL COMMENT '登录名称',
real_name VARCHAR(256) DEFAULT NULL COMMENT '实名认证名称',
report_source VARCHAR(64) DEFAULT NULL COMMENT '报备来源',
report_type VARCHAR(64) DEFAULT NULL COMMENT '报备类型',
trade_mode VARCHAR(64) DEFAULT NULL COMMENT '交易模式',
real_name_status VARCHAR(64) DEFAULT NULL COMMENT '实名认证状态',
relation_date DATETIME DEFAULT NULL COMMENT '关联日期',
follow_staff VARCHAR(64) DEFAULT NULL COMMENT '跟进员工',
payment_notice_status VARCHAR(32) DEFAULT NULL COMMENT '代为支付告知状态',
invite_register_type VARCHAR(64) DEFAULT NULL COMMENT '邀约注册类型',
is_new_customer TINYINT(1) DEFAULT NULL COMMENT '是否新客户 1=是 0=否',
performance_start_point_reached TINYINT(1) DEFAULT NULL COMMENT '是否达成业绩起算点',
customer_category VARCHAR(64) DEFAULT NULL COMMENT '客户分类',
remark VARCHAR(512) DEFAULT NULL COMMENT '备注',
no_consumption_months INT DEFAULT NULL COMMENT '客户无消费月数',
planned_release_time DATE DEFAULT NULL COMMENT '计划释放时间',
planned_release_reason VARCHAR(256) DEFAULT NULL COMMENT '计划释放原因',
-- detail fields
customer_name VARCHAR(256) DEFAULT NULL COMMENT '客户名称(详情)',
customer_type VARCHAR(64) DEFAULT NULL COMMENT '客户类型(详情)',
customer_source VARCHAR(64) DEFAULT NULL COMMENT '客户来源',
email VARCHAR(256) DEFAULT NULL COMMENT '邮箱',
phone VARCHAR(32) DEFAULT NULL COMMENT '手机号',
department VARCHAR(128) DEFAULT NULL COMMENT '所属部门',
-- sync meta
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (account_id),
UNIQUE KEY uk_login_name (login_name)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='APS客户主表(最新状态)'
""",
# 2. Customer monthly snapshot — append-only, tracks financial changes
"""
CREATE TABLE IF NOT EXISTS aps_customer_snapshot (
id BIGINT NOT NULL AUTO_INCREMENT,
account_id VARCHAR(32) NOT NULL COMMENT '阿里云UID',
snapshot_month VARCHAR(7) NOT NULL COMMENT '快照月份 YYYY-MM',
account_cash_balance_cny DECIMAL(16,2) DEFAULT NULL COMMENT '账户现金余额',
pending_invoice_amount_cny DECIMAL(16,2) DEFAULT NULL COMMENT '待开票金额',
last_month_consumption_cny DECIMAL(16,2) DEFAULT NULL COMMENT '上月消费金额',
current_month_consumption_cny DECIMAL(16,2) DEFAULT NULL COMMENT '本月消费金额',
last_month_payable_total_cny DECIMAL(16,2) DEFAULT NULL COMMENT '上月应付总金额',
last_month_prepay_cny DECIMAL(16,2) DEFAULT NULL COMMENT '上月预付费金额',
last_month_postpay_cny DECIMAL(16,2) DEFAULT NULL COMMENT '上月后付费金额',
current_month_payable_total_cny DECIMAL(16,2) DEFAULT NULL COMMENT '本月应付总金额',
current_month_prepay_cny DECIMAL(16,2) DEFAULT NULL COMMENT '本月预付费金额',
current_month_postpay_cny DECIMAL(16,2) DEFAULT NULL COMMENT '本月后付费金额',
data_hash VARCHAR(64) DEFAULT NULL COMMENT '数据指纹(去重用)',
captured_at DATE NOT NULL COMMENT '抓取日期',
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id),
UNIQUE KEY uk_account_month (account_id, snapshot_month, captured_at),
KEY idx_snapshot_month (snapshot_month),
CONSTRAINT fk_snapshot_customer FOREIGN KEY (account_id) REFERENCES aps_customer(account_id) ON UPDATE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='APS客户月度快照(追加)'
""",
# 3. Product ratio per snapshot
"""
CREATE TABLE IF NOT EXISTS aps_customer_product_ratio (
id BIGINT NOT NULL AUTO_INCREMENT,
account_id VARCHAR(32) NOT NULL,
snapshot_month VARCHAR(7) NOT NULL COMMENT 'YYYY-MM',
product_name VARCHAR(128) NOT NULL COMMENT '产品分类名称',
ratio VARCHAR(16) NOT NULL COMMENT '占比(如 41.73%)',
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id),
KEY idx_ratio_account_month (account_id, snapshot_month),
CONSTRAINT fk_ratio_customer FOREIGN KEY (account_id) REFERENCES aps_customer(account_id) ON UPDATE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='APS客户产品占比'
""",
# 4. Orders — UPSERT on order_id
"""
CREATE TABLE IF NOT EXISTS aps_order (
order_id VARCHAR(32) NOT NULL COMMENT '订单号',
customer_account_id VARCHAR(32) DEFAULT NULL COMMENT '客户account_id(FK, 可为空表示未映射)',
customer_login_name VARCHAR(128) NOT NULL DEFAULT '' COMMENT '客户账号(冗余,方便查询)',
customer_category VARCHAR(64) DEFAULT NULL COMMENT '客户分类',
order_type VARCHAR(32) DEFAULT NULL COMMENT '订单类型',
original_price_cny DECIMAL(16,2) DEFAULT NULL COMMENT '订单原价',
paid_amount_cny DECIMAL(16,2) DEFAULT NULL COMMENT '实付金额',
status VARCHAR(32) DEFAULT NULL COMMENT '订单状态',
order_time DATETIME DEFAULT NULL COMMENT '下单时间',
order_month VARCHAR(7) DEFAULT NULL COMMENT '订单月份 YYYY-MM',
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (order_id),
KEY idx_order_customer (customer_account_id),
KEY idx_order_month (order_month),
CONSTRAINT fk_order_customer FOREIGN KEY (customer_account_id) REFERENCES aps_customer(account_id) ON UPDATE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='APS订单表(UPSERT)'
""",
# 5. Bills — DELETE+INSERT per (billing_month, customer)
"""
CREATE TABLE IF NOT EXISTS aps_bill (
id BIGINT NOT NULL AUTO_INCREMENT,
billing_month VARCHAR(7) NOT NULL COMMENT '账期 YYYY-MM or YYYYMM',
customer_account_id VARCHAR(32) DEFAULT NULL COMMENT '客户account_id(FK, 可为空表示未映射)',
customer_login_name VARCHAR(128) NOT NULL DEFAULT '' COMMENT '客户账号',
bill_type VARCHAR(32) DEFAULT NULL COMMENT '账单类型',
payment_type VARCHAR(32) DEFAULT NULL COMMENT '付款类型',
consumption_time VARCHAR(32) DEFAULT NULL COMMENT '消费时间',
customer_uid VARCHAR(32) DEFAULT NULL COMMENT '客户UID',
customer_category VARCHAR(64) DEFAULT NULL COMMENT '客户分类',
customer_type VARCHAR(64) DEFAULT NULL COMMENT '客户类型',
product_code VARCHAR(128) DEFAULT NULL COMMENT '产品code',
product_name VARCHAR(256) DEFAULT NULL COMMENT '产品名称',
product_category VARCHAR(64) DEFAULT NULL COMMENT '产品分类',
cloud_product_instance_id VARCHAR(256) DEFAULT NULL COMMENT '云产品实例ID',
staff_account VARCHAR(64) DEFAULT NULL COMMENT '员工账号',
opportunity_id VARCHAR(64) DEFAULT NULL COMMENT '商机ID',
original_price_cny DECIMAL(16,6) DEFAULT NULL COMMENT '原价',
customer_payable_amount_cny DECIMAL(16,6) DEFAULT NULL COMMENT '客户应付金额',
voucher_amount_cny DECIMAL(16,6) DEFAULT NULL COMMENT '代金券金额',
stored_value_card_amount_cny DECIMAL(16,6) DEFAULT NULL COMMENT '储值卡金额',
cash_paid_amount_cny DECIMAL(16,6) DEFAULT NULL COMMENT '现金支付金额',
commission_month VARCHAR(7) DEFAULT NULL COMMENT '佣金月份',
non_business_compensation_amount_cny DECIMAL(16,6) DEFAULT NULL COMMENT '非业务类赔偿金额',
actual_paid_amount_cny DECIMAL(16,6) DEFAULT NULL COMMENT '实付金额',
actual_paid_plus_compensation_cny DECIMAL(16,6) DEFAULT NULL COMMENT '实付+非业务类赔偿金额',
included_in_performance VARCHAR(8) DEFAULT NULL COMMENT '是否计入业绩',
excluded_from_performance_reason VARCHAR(256) DEFAULT NULL COMMENT '不计业绩原因',
rebated VARCHAR(8) DEFAULT NULL COMMENT '是否返佣',
non_rebate_reason VARCHAR(256) DEFAULT NULL COMMENT '不返佣原因',
main_order_id VARCHAR(64) DEFAULT NULL COMMENT '主订单号',
sub_order_id VARCHAR(64) DEFAULT NULL COMMENT '子订单号',
order_cycle VARCHAR(32) DEFAULT NULL COMMENT '订单周期',
order_type VARCHAR(32) DEFAULT NULL COMMENT '订单类型',
original_order_id VARCHAR(64) DEFAULT NULL COMMENT '原订单号',
service_start_time VARCHAR(32) DEFAULT NULL COMMENT '服务开始时间',
service_end_time VARCHAR(32) DEFAULT NULL COMMENT '服务结束时间',
invite_register_type VARCHAR(64) DEFAULT NULL COMMENT '邀约注册类型',
staff_account_id_at_close VARCHAR(64) DEFAULT NULL,
staff_login_at_close VARCHAR(64) DEFAULT NULL,
staff_name_at_close VARCHAR(64) DEFAULT NULL,
staff_account_id_at_export VARCHAR(64) DEFAULT NULL,
staff_login_at_export VARCHAR(64) DEFAULT NULL,
staff_name_at_export VARCHAR(64) DEFAULT NULL,
partner_uid VARCHAR(32) DEFAULT NULL COMMENT '伙伴UID',
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id),
KEY idx_bill_customer (customer_account_id),
KEY idx_bill_month (billing_month),
KEY idx_bill_commission (commission_month),
CONSTRAINT fk_bill_customer FOREIGN KEY (customer_account_id) REFERENCES aps_customer(account_id) ON UPDATE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='APS账单明细(按月全量替换)'
""",
# 6. Order detail — UPSERT on order_id
"""
CREATE TABLE IF NOT EXISTS aps_order_detail (
id INT AUTO_INCREMENT PRIMARY KEY,
order_id VARCHAR(32) NOT NULL COMMENT '订单号',
order_type VARCHAR(32) COMMENT '订单类型',
status VARCHAR(32) COMMENT '订单状态',
trade_type VARCHAR(32) COMMENT '交易类型',
customer_category VARCHAR(64) COMMENT '客户分类',
dealer_name VARCHAR(256) COMMENT '二级经销商名称',
dealer_uid VARCHAR(64) COMMENT '二级经销商UID',
customer_type VARCHAR(64) COMMENT '客户类型',
opportunity_id VARCHAR(64) COMMENT '商机ID',
payment_time DATETIME COMMENT '支付时间',
order_time DATETIME COMMENT '下单时间',
product_name VARCHAR(256) COMMENT '产品名称',
product_code VARCHAR(64) COMMENT '产品code',
original_price_cny DECIMAL(14,2) COMMENT '订单原价(CNY)',
paid_amount_cny DECIMAL(14,2) COMMENT '实付金额(CNY)',
discount DECIMAL(8,4) COMMENT '订单折扣',
payable_amount_cny DECIMAL(14,2) COMMENT '应付金额(实付+代金券)(CNY)',
coupon_amount_cny DECIMAL(14,2) COMMENT '代金券金额(CNY)',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
UNIQUE KEY uk_order_id (order_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='APS订单详情表(UPSERT)'
""",
# 7. Sync log — tracks each run
"""
CREATE TABLE IF NOT EXISTS aps_sync_log (
id BIGINT NOT NULL AUTO_INCREMENT,
sync_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
json_file VARCHAR(512) NOT NULL COMMENT 'source json path',
billing_month VARCHAR(7) NOT NULL,
customer_count INT DEFAULT 0,
order_count INT DEFAULT 0,
bill_count INT DEFAULT 0,
snapshot_count INT DEFAULT 0,
status VARCHAR(16) NOT NULL DEFAULT 'success' COMMENT 'success/failed',
error_message TEXT DEFAULT NULL,
duration_seconds DECIMAL(8,2) DEFAULT NULL,
PRIMARY KEY (id),
KEY idx_sync_month (billing_month)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='APS同步执行日志'
""",
]
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def safe_decimal(v: object) -> float | None:
    """Convert a scraped amount to a finite ``float``, or ``None`` if unusable.

    Strings are stripped first; the empty string and the placeholders
    ``"- -"``, ``"--"`` and ``"null"`` count as missing. Anything that is not
    numeric, or that evaluates to NaN or infinity (whether it arrives as a
    float or as text such as ``"nan"`` / ``"inf"``), also yields ``None`` so
    that a non-finite value is never sent to a DECIMAL column.

    Args:
        v: Raw value taken from the JSON export.

    Returns:
        The value as a finite float, or ``None``.
    """
    if isinstance(v, str):
        text = v.strip()
        if text in ("", "- -", "--", "null"):
            return None
        try:
            number = float(text)
        except ValueError:
            return None
    elif isinstance(v, (int, float)):
        try:
            number = float(v)
        except OverflowError:
            # An int too large for a float cannot be stored as an amount.
            return None
    else:
        # Covers None and every non-numeric type.
        return None
    # float("nan") / float("inf") parse without error, so check afterwards.
    return number if math.isfinite(number) else None
def safe_bool(v: object) -> int | None:
    """Convert a boolean-like value to ``1`` / ``0``, or ``None`` when unknown.

    ``bool`` values map directly. Strings are stripped and compared
    case-insensitively: ``"true"`` and ``"是"`` give ``1``; ``"false"`` and
    ``"否"`` give ``0`` (the target columns are documented as ``1=是 0=否``).
    Everything else, including the empty string, gives ``None``.

    Args:
        v: Raw value taken from the JSON export.

    Returns:
        ``1``, ``0`` or ``None``.
    """
    # NOTE(review): the previous version compared against two literals that
    # read as empty strings, so "" was reported as 1 and the second check
    # could never match. The intended tokens look lost — confirm them against
    # real scraper output and extend the tuples below if needed.
    if isinstance(v, bool):
        return int(v)
    if isinstance(v, str):
        token = v.strip().lower()
        if token in ("true", "是"):
            return 1
        if token in ("false", "否"):
            return 0
    return None
def safe_str(v: object, max_len: int | None = None) -> str | None:
    """Return ``v`` as stripped text, or ``None`` when it carries no value.

    ``None``, the empty string and the placeholder texts listed below are all
    treated as missing. When ``max_len`` is truthy, longer text is cut to that
    many characters.
    """
    # NOTE(review): "" occurs twice in this tuple; the second entry looks like
    # a placeholder character that was lost — confirm against the data.
    placeholders = ("", "- -", "--", "", "null", "None")
    if v is not None:
        text = str(v).strip()
        if text not in placeholders:
            too_long = bool(max_len) and len(text) > max_len
            return text[:max_len] if too_long else text
    return None
def parse_datetime(v: object) -> datetime | str | None:
    """Parse a timestamp written in one of the recognised layouts.

    Returns a ``datetime`` when the cleaned text matches
    ``YYYY-MM-DD HH:MM:SS``, ``YYYY-MM-DD`` or ``YYYY/MM/DD HH:MM:SS``; the
    cleaned text itself when no layout matches; and ``None`` when the value is
    missing.
    """
    if v is None:
        return None
    text = safe_str(v)
    if text is not None:
        for layout in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d", "%Y/%m/%d %H:%M:%S"):
            try:
                return datetime.strptime(text, layout)
            except ValueError:
                pass
    # Either nothing usable was supplied (None) or no layout matched, in which
    # case the text is handed back unchanged for the caller to store.
    return text
def parse_date(v: object) -> date | None:
    """Parse ``YYYY-MM-DD`` or ``YYYY/MM/DD`` text into a ``date``.

    Returns ``None`` when the value is missing or matches neither layout.
    """
    if v is None:
        return None
    text = safe_str(v)
    if text is None:
        return None
    parsed = None
    for layout in ("%Y-%m-%d", "%Y/%m/%d"):
        try:
            parsed = datetime.strptime(text, layout).date()
        except ValueError:
            continue
        break
    return parsed
def data_hash(obj: JsonDict) -> str:
    """Return a fingerprint of ``obj`` that is stable across key order.

    The dict is serialised as JSON with sorted keys (values JSON cannot
    encode are passed through ``str``) and the MD5 hex digest of that text is
    returned. The digest is used for change detection only.
    """
    serialized = json.dumps(obj, sort_keys=True, default=str)
    digest = hashlib.md5(serialized.encode())
    return digest.hexdigest()
def build_login_to_account_map(customers: JsonList) -> dict[str, str]:
    """Build a ``login_name -> account_id`` lookup table.

    Each login is registered under its exact text and, when it contains
    spaces, also under the same text with the spaces removed, so that a later
    lookup can fall back to a space-insensitive match. Later customers
    overwrite earlier ones on a key clash.
    """
    mapping: dict[str, str] = {}
    for customer in customers:
        login = cast(str, customer["login_name"])
        uid = cast(str, customer["account_id"])
        # The exact key is written first; the compact key only differs when
        # the login contains spaces.
        for key in (login, login.replace(" ", "")):
            mapping[key] = uid
    return mapping
def resolve_account_id(login_to_account: dict[str, str], customer_account: str | None) -> str | None:
    """Look up the account id for a login: exact match first, then spaces removed.

    Returns ``None`` when ``customer_account`` is empty or neither form is in
    the map.
    """
    if not customer_account:
        return None
    exact = login_to_account.get(customer_account)
    # Fall back to the space-free spelling only when the exact key gave nothing.
    return exact or login_to_account.get(customer_account.replace(" ", ""))
def is_valid_order_id(order_id: str | None) -> bool:
    """Return True when ``order_id`` is non-empty and consists only of digits.

    APS order numbers are expected to be purely numeric; this filters out
    placeholder values such as "没有数据".
    """
    if not order_id:
        return False
    return order_id.isdigit()
# ---------------------------------------------------------------------------
# Sync logic
# ---------------------------------------------------------------------------
class APSSyncer:
def __init__(self, db_config: DbConfig | None = None):
self.db_config: DbConfig = db_config or DB_CONFIG
self.conn: Connection[DictCursor] | None = None
self.stats: StatsDict = {"customers": 0, "customer_details": 0, "snapshots": 0, "orders": 0, "order_details": 0, "bills": 0}
def _require_conn(self) -> Connection[DictCursor]:
if self.conn is None:
raise RuntimeError("Database connection is not initialized")
return self.conn
def connect(self) -> None:
    """Open the MySQL connection described by ``self.db_config``.

    The connection is stored on ``self.conn`` and hands out ``DictCursor``
    cursors.
    """
    cfg = self.db_config
    host = cfg["host"]
    port = cfg["port"]
    user = cfg["user"]
    password = cfg["password"]
    database = cfg["database"]
    charset = cfg["charset"]
    self.conn = pymysql.connect(
        host=host,
        port=port,
        user=user,
        password=password,
        database=database,
        charset=charset,
        cursorclass=DictCursor,
    )
    logger.info("Connected to MySQL %s:%s/%s", host, port, database)
def close(self) -> None:
if self.conn:
self.conn.close()
def ensure_schema(self) -> None:
    """Create any missing tables, then relax two legacy column definitions.

    Runs every statement in ``DDL_STATEMENTS`` followed by two ALTER TABLE
    statements, commits, and logs the number of table definitions.
    """
    # Compatibility with databases created earlier: orders and bills must be
    # insertable even when the customer could not be mapped. MySQL foreign
    # keys accept NULL, so only the column constraints are relaxed here; the
    # foreign keys themselves stay in place.
    relax_order_columns = """
ALTER TABLE aps_order
MODIFY COLUMN customer_account_id VARCHAR(32) NULL COMMENT '客户account_id(FK, 可为空表示未映射)',
MODIFY COLUMN customer_login_name VARCHAR(128) NOT NULL DEFAULT '' COMMENT '客户账号(冗余,方便查询)'
"""
    relax_bill_columns = """
ALTER TABLE aps_bill
MODIFY COLUMN customer_account_id VARCHAR(32) NULL COMMENT '客户account_id(FK, 可为空表示未映射)',
MODIFY COLUMN customer_login_name VARCHAR(128) NOT NULL DEFAULT '' COMMENT '客户账号'
"""
    conn = self._require_conn()
    with conn.cursor() as cur:
        for statement in (*DDL_STATEMENTS, relax_order_columns, relax_bill_columns):
            _ = cur.execute(statement)
    conn.commit()
    logger.info("Schema ensured (%d tables)", len(DDL_STATEMENTS))
@staticmethod
def normalize_month(v: object) -> str | None:
    """Normalise a month value to ``YYYY-MM`` text.

    Six-digit text (``YYYYMM``) gets a dash inserted; seven-character text
    with a dash in the fifth position is returned unchanged. Missing values,
    the placeholder "没有数据" and any other shape give ``None``.
    """
    text = safe_str(v)
    if text is None or text == "没有数据":
        return None
    if text.isdigit() and len(text) == 6:
        return "-".join((text[:4], text[4:]))
    already_dashed = len(text) == 7 and text[4] == "-"
    return text if already_dashed else None
@staticmethod
def load_json_records(file_path: Path) -> JsonList:
with open(file_path, "r", encoding="utf-8") as f:
data = cast(object, json.load(f))
if isinstance(data, dict):
data_dict = cast(JsonDict, data)
records = data_dict.get("records")
if not isinstance(records, list):
return []
record_list = cast(list[object], records)
normalized_records: JsonList = []
for record in record_list:
if isinstance(record, dict):
normalized_records.append(cast(JsonDict, record))
return normalized_records
if isinstance(data, list):
data_list = cast(list[object], data)
return [cast(JsonDict, record) for record in data_list if isinstance(record, dict)]
return []
def resolve_data_files(self, data_dir: str) -> tuple[Path, Path, Path, Path, Path]:
root = Path(data_dir)
if not root.exists() or not root.is_dir():
raise FileNotFoundError(f"Data directory not found: {root}")
customers_file = root / "customers.json"
orders_file = root / "orders.json"
order_details_file = root / "orderDetails.json"
bills_file = root / "bills.json"
customer_details_file = root / "customerDetails.json"
for fp in (customers_file, orders_file, order_details_file, bills_file):
if not fp.exists():
raise FileNotFoundError(f"Required JSON file not found: {fp}")
return customers_file, orders_file, order_details_file, bills_file, customer_details_file
@staticmethod
def normalize_customer_record(raw: JsonDict) -> JsonDict | None:
    """Map one raw customer record (camelCase keys) to the snake_case row shape.

    Text fields are cleaned and cut to fixed maximum lengths, flags are
    converted to 1/0, and amounts to floats. Returns ``None`` when either the
    account id or the login name is missing.
    """
    field = raw.get
    account_id = safe_str(field("accountId"), 32)
    login_name = safe_str(field("loginName"), 128)
    if not (account_id and login_name):
        return None

    # Only purely numeric text counts as a month count.
    inactive = safe_str(field("inactiveMonths"))
    no_consumption_months = None
    if inactive and inactive.isdigit():
        no_consumption_months = int(inactive)

    return {
        "account_id": account_id,
        "login_name": login_name,
        "real_name": safe_str(field("realName"), 256),
        "report_source": safe_str(field("reportSource"), 64),
        "report_type": safe_str(field("reportType"), 64),
        "trade_mode": safe_str(field("tradeMode"), 64),
        "real_name_status": safe_str(field("authStatus"), 64),
        "relation_date": parse_datetime(field("relationTime")),
        "follow_staff": safe_str(field("owner"), 64),
        "payment_notice_status": safe_str(field("paymentNoticeStatus"), 32),
        "invite_register_type": safe_str(field("inviteType"), 64),
        "is_new_customer": safe_bool(field("isNewCustomer")),
        "performance_start_point_reached": safe_bool(field("isPerformanceQualified")),
        "customer_category": safe_str(field("customerCategory"), 64),
        "remark": safe_str(field("remark"), 512),
        "no_consumption_months": no_consumption_months,
        "planned_release_time": parse_date(field("releasePlanTime")),
        "planned_release_reason": safe_str(field("releasePlanReason"), 256),
        # Financial figures; insert_snapshot reads these keys.
        "account_cash_balance_cny": safe_decimal(field("cashBalanceCny")),
        "pending_invoice_amount_cny": safe_decimal(field("invoicePendingCny")),
        "last_month_consumption_cny": safe_decimal(field("lastMonthConsumptionCny")),
        "current_month_consumption_cny": safe_decimal(field("thisMonthConsumptionCny")),
    }
# ---- Customer master UPSERT ----
def upsert_customer(self, c: JsonDict) -> None:
    """Insert or refresh one ``aps_customer`` row from a normalized record.

    The six detail columns (customer_name, customer_type, customer_source,
    email, phone, department) are written as NULL when the row is first
    inserted and are left untouched when the row already exists. They are
    filled by ``update_customer_details`` from a separate export file that
    ``resolve_data_files`` does not require, so overwriting them here would
    erase known details whenever that file is absent.

    Args:
        c: Record with the keys produced by ``normalize_customer_record``;
            ``account_id`` and ``login_name`` are required.
    """
    sql = """
INSERT INTO aps_customer (
account_id, login_name, real_name,
report_source, report_type, trade_mode, real_name_status,
relation_date, follow_staff, payment_notice_status,
invite_register_type, is_new_customer, performance_start_point_reached,
customer_category, remark, no_consumption_months,
planned_release_time, planned_release_reason,
customer_name, customer_type, customer_source,
email, phone, department
) VALUES (
%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
) ON DUPLICATE KEY UPDATE
login_name=VALUES(login_name), real_name=VALUES(real_name),
report_source=VALUES(report_source), report_type=VALUES(report_type),
trade_mode=VALUES(trade_mode), real_name_status=VALUES(real_name_status),
relation_date=VALUES(relation_date), follow_staff=VALUES(follow_staff),
payment_notice_status=VALUES(payment_notice_status),
invite_register_type=VALUES(invite_register_type),
is_new_customer=VALUES(is_new_customer),
performance_start_point_reached=VALUES(performance_start_point_reached),
customer_category=VALUES(customer_category), remark=VALUES(remark),
no_consumption_months=VALUES(no_consumption_months),
planned_release_time=VALUES(planned_release_time),
planned_release_reason=VALUES(planned_release_reason)
"""
    params = (
        c["account_id"], c["login_name"], safe_str(c.get("real_name")),
        safe_str(c.get("report_source")), safe_str(c.get("report_type")),
        safe_str(c.get("trade_mode")),
        safe_str(c.get("real_name_status")),
        c.get("relation_date"),
        safe_str(c.get("follow_staff")),
        safe_str(c.get("payment_notice_status")),
        safe_str(c.get("invite_register_type")),
        c.get("is_new_customer"),
        c.get("performance_start_point_reached"),
        safe_str(c.get("customer_category")),
        safe_str(c.get("remark")),
        c.get("no_consumption_months"),
        c.get("planned_release_time"),
        safe_str(c.get("planned_release_reason")),
        # Detail columns: NULL on first insert only (see docstring).
        None, None, None, None, None, None,
    )
    conn = self._require_conn()
    with conn.cursor() as cur:
        _ = cur.execute(sql, params)
    self.stats["customers"] += 1
# ---- Customer snapshot (insert; update in place if a row for the same account/month/date exists) ----
def insert_snapshot(self, c: JsonDict, snapshot_month: str, captured_at: str) -> None:
    """Write the financial snapshot of one customer for one capture date.

    If a row already exists for the same account, month and capture date it
    is updated in place; otherwise a new row is inserted. The six
    payable/prepay/postpay columns are written as NULL here. The stored
    ``data_hash`` is the fingerprint of the ten financial values.

    Args:
        c: Normalized customer record (needs ``account_id`` and the four
            ``*_cny`` amounts).
        snapshot_month: Month label stored in ``snapshot_month``.
        captured_at: Capture date text, parsed with ``parse_date``.
    """
    financial: JsonDict = {
        "balance": safe_decimal(c.get("account_cash_balance_cny")),
        "pending": safe_decimal(c.get("pending_invoice_amount_cny")),
        "last_consumption": safe_decimal(c.get("last_month_consumption_cny")),
        "curr_consumption": safe_decimal(c.get("current_month_consumption_cny")),
        "last_payable": None,
        "last_pre": None,
        "last_post": None,
        "curr_payable": None,
        "curr_pre": None,
        "curr_post": None,
    }
    h = data_hash(financial)
    cap_date = parse_date(captured_at)
    account_id = c["account_id"]
    # The ten amounts in the column order shared by both statements below.
    amounts = tuple(
        financial[name]
        for name in (
            "balance", "pending",
            "last_consumption", "curr_consumption",
            "last_payable", "last_pre", "last_post",
            "curr_payable", "curr_pre", "curr_post",
        )
    )
    row_key = (account_id, snapshot_month, cap_date)

    conn = self._require_conn()
    with conn.cursor() as cur:
        _ = cur.execute(
            "SELECT id FROM aps_customer_snapshot WHERE account_id=%s AND snapshot_month=%s AND captured_at=%s",
            row_key,
        )
        if cur.fetchone():
            # A snapshot for this date already exists: overwrite its values.
            _ = cur.execute("""
UPDATE aps_customer_snapshot SET
account_cash_balance_cny=%s, pending_invoice_amount_cny=%s,
last_month_consumption_cny=%s, current_month_consumption_cny=%s,
last_month_payable_total_cny=%s, last_month_prepay_cny=%s, last_month_postpay_cny=%s,
current_month_payable_total_cny=%s, current_month_prepay_cny=%s, current_month_postpay_cny=%s,
data_hash=%s
WHERE account_id=%s AND snapshot_month=%s AND captured_at=%s
""", amounts + (h,) + row_key)
        else:
            _ = cur.execute("""
INSERT INTO aps_customer_snapshot (
account_id, snapshot_month,
account_cash_balance_cny, pending_invoice_amount_cny,
last_month_consumption_cny, current_month_consumption_cny,
last_month_payable_total_cny, last_month_prepay_cny, last_month_postpay_cny,
current_month_payable_total_cny, current_month_prepay_cny, current_month_postpay_cny,
data_hash, captured_at
) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
""", (account_id, snapshot_month) + amounts + (h, cap_date))
    self.stats["snapshots"] += 1
# ---- Orders UPSERT ----
def upsert_orders(self, records: JsonList, login_to_account: dict[str, str]) -> None:
    """Insert or refresh ``aps_order`` rows from raw order records.

    Records without an order id, or with a non-numeric (placeholder) id, are
    skipped with a warning. Orders whose customer login is empty or cannot
    be resolved are still stored, with a NULL ``customer_account_id``. Text
    values are cut to the width of their target column (customer_category
    64, order_type 32, status 32), as ``upsert_order_details`` already does,
    so an over-long value cannot make the statement fail.

    Args:
        records: Raw order records (camelCase keys).
        login_to_account: Lookup built by ``build_login_to_account_map``.
    """
    sql = """
INSERT INTO aps_order (
order_id, customer_account_id, customer_login_name,
customer_category, order_type, original_price_cny, paid_amount_cny,
status, order_time, order_month
) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
ON DUPLICATE KEY UPDATE
customer_category=VALUES(customer_category),
order_type=VALUES(order_type),
original_price_cny=VALUES(original_price_cny),
paid_amount_cny=VALUES(paid_amount_cny),
status=VALUES(status),
order_time=VALUES(order_time),
order_month=VALUES(order_month)
"""
    conn = self._require_conn()
    with conn.cursor() as cur:
        for o in records:
            order_id = safe_str(o.get("orderId"), 32)
            customer_login = safe_str(o.get("customerAccount"), 128)
            if not order_id:
                logger.warning("Skipping order without orderId: %s", o)
                continue
            if not is_valid_order_id(order_id):
                logger.warning("Skipping placeholder/invalid orderId '%s'", order_id)
                continue
            account_id = resolve_account_id(login_to_account, customer_login)
            if not customer_login:
                logger.warning("Order %s has empty customerAccount; inserting with NULL customer_account_id", order_id)
            elif not account_id:
                logger.warning("Order %s has unresolved customerAccount '%s'; inserting with NULL customer_account_id", order_id, customer_login)
            created_at_raw = o.get("createdAt")
            order_time = parse_datetime(created_at_raw)
            if isinstance(order_time, datetime):
                order_month = order_time.strftime("%Y-%m")
            else:
                # The timestamp did not parse: fall back to its first seven
                # characters and let normalize_month decide if they are a month.
                created_at_prefix = created_at_raw[:7] if isinstance(created_at_raw, str) and len(created_at_raw) >= 7 else None
                order_month = self.normalize_month(created_at_prefix)
            _ = cur.execute(sql, (
                order_id, account_id, customer_login or "",
                safe_str(o.get("customerCategory"), 64),
                safe_str(o.get("orderType"), 32),
                safe_decimal(o.get("orderOriginalPriceCny")),
                safe_decimal(o.get("actualPaidCny")),
                safe_str(o.get("orderStatus"), 32), order_time, order_month,
            ))
            self.stats["orders"] += 1
# ---- Order details UPSERT ----
def upsert_order_details(self, records: JsonList) -> None:
    """Insert or refresh ``aps_order_detail`` rows from raw detail records.

    Records without an order id are skipped with a warning. Records whose
    order id is not purely numeric (placeholder rows) are skipped as well,
    matching the filter ``upsert_orders`` applies, so placeholder rows do
    not end up in the detail table.

    Args:
        records: Raw order-detail records (camelCase keys).
    """
    sql = """
INSERT INTO aps_order_detail (
order_id, order_type, status, trade_type, customer_category,
dealer_name, dealer_uid, customer_type, opportunity_id,
payment_time, order_time, product_name, product_code,
original_price_cny, paid_amount_cny, discount,
payable_amount_cny, coupon_amount_cny
) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
ON DUPLICATE KEY UPDATE
order_type=VALUES(order_type),
status=VALUES(status),
trade_type=VALUES(trade_type),
customer_category=VALUES(customer_category),
dealer_name=VALUES(dealer_name),
dealer_uid=VALUES(dealer_uid),
customer_type=VALUES(customer_type),
opportunity_id=VALUES(opportunity_id),
payment_time=VALUES(payment_time),
order_time=VALUES(order_time),
product_name=VALUES(product_name),
product_code=VALUES(product_code),
original_price_cny=VALUES(original_price_cny),
paid_amount_cny=VALUES(paid_amount_cny),
discount=VALUES(discount),
payable_amount_cny=VALUES(payable_amount_cny),
coupon_amount_cny=VALUES(coupon_amount_cny)
"""
    conn = self._require_conn()
    with conn.cursor() as cur:
        for d in records:
            order_id = safe_str(d.get("orderId"), 32)
            if not order_id:
                logger.warning("Skipping order detail without orderId: %s", d)
                continue
            if not is_valid_order_id(order_id):
                logger.warning("Skipping placeholder/invalid orderId '%s'", order_id)
                continue
            _ = cur.execute(sql, (
                order_id,
                safe_str(d.get("orderType"), 32),
                safe_str(d.get("status"), 32),
                safe_str(d.get("tradeType"), 32),
                safe_str(d.get("customerCategory"), 64),
                safe_str(d.get("dealerName"), 256),
                safe_str(d.get("dealerUid"), 64),
                safe_str(d.get("customerType"), 64),
                safe_str(d.get("opportunityId"), 64),
                parse_datetime(d.get("paymentTime")),
                parse_datetime(d.get("orderTime")),
                safe_str(d.get("productName"), 256),
                safe_str(d.get("productCode"), 64),
                safe_decimal(d.get("originalPriceCny")),
                safe_decimal(d.get("paidAmountCny")),
                safe_decimal(d.get("discount")),
                safe_decimal(d.get("payableAmountCny")),
                safe_decimal(d.get("couponAmountCny")),
            ))
            self.stats["order_details"] += 1
def update_customer_details(self, records: JsonList, snapshot_month: str) -> None:
    """Apply customer-detail records to the customer and snapshot tables.

    For each record with an account id, the six detail columns of
    ``aps_customer`` are overwritten, and the six payable/prepay/postpay
    amounts are written to every ``aps_customer_snapshot`` row of that
    account for ``snapshot_month``. Records without an account id are
    skipped with a warning.
    """
    sql_customer = """
UPDATE aps_customer SET
customer_name=%s, customer_type=%s, customer_source=%s,
email=%s, phone=%s, department=%s
WHERE account_id=%s
"""
    sql_snapshot = """
UPDATE aps_customer_snapshot SET
last_month_payable_total_cny=%s, last_month_prepay_cny=%s, last_month_postpay_cny=%s,
current_month_payable_total_cny=%s, current_month_prepay_cny=%s, current_month_postpay_cny=%s
WHERE account_id=%s AND snapshot_month=%s
"""
    # (source key, maximum length) for each detail column, in statement order.
    profile_fields = (
        ("customerName", 256),
        ("customerType", 64),
        ("customerSource", 64),
        ("email", 256),
        ("phone", 32),
        ("department", 128),
    )
    # Source keys of the six snapshot amounts, in statement order.
    amount_fields = (
        "lastMonthPayableTotalCny",
        "lastMonthPrepayCny",
        "lastMonthPostpayCny",
        "currentMonthPayableTotalCny",
        "currentMonthPrepayCny",
        "currentMonthPostpayCny",
    )
    conn = self._require_conn()
    with conn.cursor() as cur:
        for r in records:
            account_id = safe_str(r.get("accountId"), 32)
            if not account_id:
                logger.warning("Skipping customer detail without accountId: %s", r)
                continue
            profile = tuple(safe_str(r.get(key), width) for key, width in profile_fields)
            amounts = tuple(safe_decimal(r.get(key)) for key in amount_fields)
            _ = cur.execute(sql_customer, profile + (account_id,))
            _ = cur.execute(sql_snapshot, amounts + (account_id, snapshot_month))
            self.stats["customer_details"] += 1
# ---- Bills sync ----
def sync_bills(self, records: JsonList, login_to_account: dict[str, str], incremental: bool = False) -> None:
    """Write bill rows into ``aps_bill``.

    Rows are grouped by (resolved account id, customer login, commission
    month). Rows missing either ``billingMonth`` or ``commissionMonth`` are
    dropped and counted; rows whose login is empty or cannot be resolved are
    kept and stored with a NULL ``customer_account_id``.

    Args:
        records: Raw bill dicts keyed by the camelCase names read below.
        login_to_account: Lookup passed to ``resolve_account_id``.
        incremental: When true, each row is inserted only if no row with the
            same customer scope, commission month, billing month, product
            name, consumption time and original price already exists, and
            nothing is deleted. When false, the existing rows of every
            customer scope + commission month present in ``records`` are
            deleted and all rows are re-inserted.
    """
    grouped: dict[tuple[str | None, str, str], JsonList] = {}
    skipped = 0
    for b in records:
        billing_month = self.normalize_month(b.get("billingMonth"))
        commission_month = self.normalize_month(b.get("commissionMonth"))
        customer_login = safe_str(b.get("customerAccount"), 128)
        if not billing_month or not commission_month:
            skipped += 1
            continue
        account_id = resolve_account_id(login_to_account, customer_login)
        if not customer_login:
            logger.warning("Bill row has empty customerAccount; inserting with NULL customer_account_id, commission_month=%s", commission_month)
        elif not account_id:
            logger.warning("Bill row has unresolved customerAccount '%s'; inserting with NULL customer_account_id, commission_month=%s", customer_login, commission_month)
        key = (account_id, customer_login or "", commission_month)
        grouped.setdefault(key, []).append(b)
    if skipped:
        # Only rows lacking a billing/commission month are counted here;
        # unresolved logins were kept above.
        logger.info("Filtered out %d invalid/unresolved bill rows", skipped)
    if not grouped:
        return

    insert_sql = """
        INSERT INTO aps_bill (
            billing_month, customer_account_id, customer_login_name,
            bill_type, consumption_time,
            customer_category,
            product_name, product_category,
            original_price_cny, customer_payable_amount_cny,
            commission_month,
            included_in_performance,
            rebated,
            invite_register_type,
            service_start_time, service_end_time
        ) VALUES (
            %s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s
        )
    """
    # Duplicate probe for incremental mode. The nullable columns are compared
    # with MySQL's NULL-safe "<=>": with plain "=" a NULL product name,
    # consumption time or price never matches, so such rows would be
    # inserted again on every incremental run.
    exists_tail = (
        " AND commission_month=%s AND billing_month=%s"
        " AND product_name <=> %s AND consumption_time <=> %s"
        " AND original_price_cny <=> %s"
    )
    exists_by_login_sql = (
        "SELECT COUNT(*) AS cnt FROM aps_bill"
        " WHERE customer_account_id IS NULL AND customer_login_name=%s" + exists_tail
    )
    exists_by_account_sql = (
        "SELECT COUNT(*) AS cnt FROM aps_bill WHERE customer_account_id=%s" + exists_tail
    )

    def insert_params(b: JsonDict, account_id: str | None, login_name: str) -> tuple[object, ...]:
        # Values in the column order of insert_sql.
        return (
            self.normalize_month(b.get("billingMonth")), account_id, login_name,
            safe_str(b.get("billType")), safe_str(b.get("consumeDate")),
            safe_str(b.get("customerCategory")),
            safe_str(b.get("productName")), safe_str(b.get("productCategory")),
            safe_decimal(b.get("originalPriceCny")),
            safe_decimal(b.get("customerPayableCny")),
            self.normalize_month(b.get("commissionMonth")),
            safe_str(b.get("countsForPerformance")),
            safe_str(b.get("commissionable")),
            safe_str(b.get("inviteType")),
            safe_str(b.get("serviceStartAt")), safe_str(b.get("serviceEndAt")),
        )

    conn = self._require_conn()
    with conn.cursor() as cur:
        if incremental:
            # Incremental mode: probe row by row and skip rows already stored.
            for (account_id, login_name, commission_month), rows in grouped.items():
                if account_id is None:
                    exists_sql, scope_value = exists_by_login_sql, login_name
                else:
                    exists_sql, scope_value = exists_by_account_sql, account_id
                for b in rows:
                    _ = cur.execute(exists_sql, (
                        scope_value, commission_month,
                        self.normalize_month(b.get("billingMonth")),
                        safe_str(b.get("productName")),
                        safe_str(b.get("consumeDate")),
                        safe_decimal(b.get("originalPriceCny")),
                    ))
                    row = cur.fetchone()
                    if row and (row.get("cnt", 0) or row.get("COUNT(*)", 0)) > 0:
                        continue  # already present, skip
                    _ = cur.execute(insert_sql, insert_params(b, account_id, login_name))
                    self.stats["bills"] += 1
        else:
            # Full mode: DELETE + INSERT per customer scope and commission month.
            # All deletes run before any insert. The group key includes the
            # login name, but the resolved-account DELETE is keyed only by
            # (account_id, commission_month); if two groups share that pair,
            # an interleaved delete would wipe rows inserted moments earlier
            # for the previous group.
            purged: set[tuple[str | None, str | None, str]] = set()
            for account_id, login_name, commission_month in grouped:
                if account_id is None:
                    scope = (None, login_name, commission_month)
                else:
                    scope = (account_id, None, commission_month)
                if scope in purged:
                    continue
                purged.add(scope)
                if account_id is None:
                    _ = cur.execute(
                        "DELETE FROM aps_bill WHERE customer_account_id IS NULL AND customer_login_name=%s AND commission_month=%s",
                        (login_name, commission_month),
                    )
                else:
                    _ = cur.execute(
                        "DELETE FROM aps_bill WHERE customer_account_id=%s AND commission_month=%s",
                        (account_id, commission_month),
                    )
            for (account_id, login_name, _commission_month), rows in grouped.items():
                for b in rows:
                    _ = cur.execute(insert_sql, insert_params(b, account_id, login_name))
                    self.stats["bills"] += 1
# ---- Main sync entry ----
def sync_from_json(self, data_dir: str, incremental: bool = False) -> StatsDict:
    """Load the JSON exports in ``data_dir`` and sync them to the database.

    The required files (customers, orders, order details, bills) are loaded
    before a connection is opened, so a load error propagates without a sync
    log entry. The customer-details file is optional: a missing or
    unreadable file is logged and ignored.

    The month used for snapshots and the sync log is taken from the first
    bill row that yields a billing month (falling back to its commission
    month), or from the current month when no bill provides one.

    Args:
        data_dir: Directory handed to ``resolve_data_files``.
        incremental: Forwarded to ``sync_bills``.

    Returns:
        The per-entity counters collected in ``self.stats``.

    Raises:
        Exception: Any error raised after loading is logged, the transaction
            is rolled back, a 'failed' row is written to ``aps_sync_log`` on
            a best-effort basis, and the original error is re-raised.
    """
    start = datetime.now()
    customers_file, orders_file, order_details_file, bills_file, customer_details_file = self.resolve_data_files(data_dir)
    logger.info("Loading source files from %s%s", data_dir, " (增量模式)" if incremental else "")
    raw_customers = self.load_json_records(customers_file)
    raw_orders = self.load_json_records(orders_file)
    raw_order_details = self.load_json_records(order_details_file)
    raw_bills = self.load_json_records(bills_file)
    raw_customer_details: JsonList = []
    try:
        if customer_details_file.exists():
            raw_customer_details = self.load_json_records(customer_details_file)
        else:
            logger.info("Optional file missing, skip customer details: %s", customer_details_file)
    except Exception as e:
        # Optional input: a broken file must not abort the whole sync.
        logger.warning("Failed to load optional customer details file %s: %s", customer_details_file, e)
    captured_at = date.today().isoformat()
    billing_month = None
    for b in raw_bills:
        billing_month = self.normalize_month(b.get("billingMonth")) or self.normalize_month(b.get("commissionMonth"))
        if billing_month:
            break
    if not billing_month:
        billing_month = datetime.now().strftime("%Y-%m")
    source_label = str(Path(data_dir))
    self.stats = {"customers": 0, "customer_details": 0, "snapshots": 0, "orders": 0, "order_details": 0, "bills": 0}
    try:
        self.connect()
        self.ensure_schema()
        customers: JsonList = []
        skipped_customers = 0
        for raw in raw_customers:
            c = self.normalize_customer_record(raw)
            if not c:
                skipped_customers += 1
                continue
            customers.append(c)
            self.upsert_customer(c)
            self.insert_snapshot(c, billing_month, captured_at)
        if skipped_customers:
            logger.info("Skipped %d invalid customer rows", skipped_customers)
        login_to_account = build_login_to_account_map(customers)
        logger.info("Resolved %d customer login_name -> account_id mappings", len(login_to_account))
        if raw_customer_details:
            self.update_customer_details(raw_customer_details, billing_month)
        self.upsert_orders(raw_orders, login_to_account)
        self.upsert_order_details(raw_order_details)
        self.sync_bills(raw_bills, login_to_account, incremental=incremental)
        # Record the successful run, then commit data and log entry together.
        duration = (datetime.now() - start).total_seconds()
        conn = self._require_conn()
        with conn.cursor() as cur:
            _ = cur.execute("""
                INSERT INTO aps_sync_log (json_file, billing_month, customer_count, order_count, bill_count, snapshot_count, status, duration_seconds)
                VALUES (%s,%s,%s,%s,%s,%s,'success',%s)
            """, (
                source_label, billing_month,
                self.stats["customers"], self.stats["orders"],
                self.stats["bills"], self.stats["snapshots"], duration,
            ))
        conn.commit()
        logger.info(
            "Sync complete in %.1fs: %d customers, %d customer_details, %d snapshots, %d orders, %d order_details, %d bills",
            duration, self.stats["customers"], self.stats["customer_details"],
            self.stats["snapshots"], self.stats["orders"], self.stats["order_details"], self.stats["bills"],
        )
        return self.stats
    except Exception as e:
        logger.error("Sync failed: %s", e)
        if self.conn:
            duration = (datetime.now() - start).total_seconds()
            try:
                conn = self._require_conn()
                conn.rollback()
                with conn.cursor() as cur:
                    _ = cur.execute("""
                        INSERT INTO aps_sync_log (json_file, billing_month, status, error_message, duration_seconds)
                        VALUES (%s,%s,'failed',%s,%s)
                    """, (source_label, billing_month or "unknown", str(e), duration))
                conn.commit()
            except Exception as log_err:
                # Best effort only: never mask the original failure, but do
                # not hide a failed rollback or failed log write either.
                logger.warning("Could not roll back / record failed sync in aps_sync_log: %s", log_err)
        raise
    finally:
        self.close()
def find_latest_json(self) -> Path | None:
    """Return ``JSON_DIR`` when ``resolve_data_files`` accepts it, else ``None``.

    ``None`` is returned only when ``resolve_data_files`` raises
    ``FileNotFoundError``; any other error propagates to the caller.
    """
    default_dir = JSON_DIR
    try:
        _ = self.resolve_data_files(str(default_dir))
    except FileNotFoundError:
        return None
    else:
        return default_dir
def get_latest_bill_consumption_time(self) -> str | None:
    """Return the largest ``consumption_time`` stored in ``aps_bill``.

    Returns:
        ``None`` when the query yields no row or a NULL maximum. A
        ``datetime`` value is formatted as ``YYYY-MM-DD``; any other value
        is returned via ``str()``.
    """
    query = "SELECT MAX(consumption_time) AS latest_time FROM aps_bill"
    with self._require_conn().cursor() as cursor:
        _ = cursor.execute(query)
        record = cursor.fetchone()
    newest = record.get("latest_time") if record else None
    if newest is None:
        return None
    if isinstance(newest, datetime):
        # Only the date part is reported, even when a time component exists.
        return newest.strftime("%Y-%m-%d")
    return str(cast(object, newest))
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main() -> None:
    """Command-line entry point.

    With ``--latest-bill-consumption-time`` the newest ``consumption_time``
    in ``aps_bill`` is printed (nothing is printed when there is none) and no
    sync is performed. Otherwise the JSON files in ``--dir`` are synced,
    optionally in ``--incremental`` mode.
    """
    import argparse

    cli = argparse.ArgumentParser(description="Sync APS Aliyun JSON data to MySQL")
    _ = cli.add_argument(
        "--dir",
        type=str,
        default=str(JSON_DIR),
        help="Directory containing customers.json, orders.json, orderDetails.json, bills.json",
    )
    # Both switches are plain boolean flags that default to off.
    flag_help = {
        "--incremental": "增量模式: 订单UPSERT, 账单检查存在后跳过(不删除历史数据)",
        "--latest-bill-consumption-time": "仅查询 aps_bill 中最新的 consumption_time 并输出",
    }
    for flag, help_text in flag_help.items():
        _ = cli.add_argument(flag, action="store_true", default=False, help=help_text)

    options = cli.parse_args()
    data_dir = cast(str, options.dir)
    incremental = cast(bool, options.incremental)
    query_only = cast(bool, options.latest_bill_consumption_time)

    syncer = APSSyncer(db_config=DB_CONFIG)
    if not query_only:
        _ = syncer.sync_from_json(data_dir, incremental=incremental)
        return

    # Query-only mode: open a connection, report, and always close it again.
    syncer.connect()
    try:
        newest = syncer.get_latest_bill_consumption_time()
        if newest:
            print(newest)
    finally:
        syncer.close()
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()