增量
This commit is contained in:
@@ -35,6 +35,7 @@ from pymysql.cursors import DictCursor
|
||||
JsonDict = dict[str, object]
|
||||
JsonList = list[JsonDict]
|
||||
StatsDict = dict[str, int]
|
||||
SyncTarget = str
|
||||
|
||||
|
||||
class DbConfig(TypedDict):
|
||||
@@ -48,7 +49,7 @@ class DbConfig(TypedDict):
|
||||
# ---------------------------------------------------------------------------
|
||||
# Config
|
||||
# ---------------------------------------------------------------------------
|
||||
DB_CONFIG = {
|
||||
DB_CONFIG: DbConfig = {
|
||||
"host": "172.27.137.236",
|
||||
"port": 3306,
|
||||
"user": "ray",
|
||||
@@ -62,6 +63,19 @@ LOG_FORMAT = "%(asctime)s [%(levelname)s] %(message)s"
|
||||
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
|
||||
logger = logging.getLogger("aps_sync")
|
||||
|
||||
SYNC_TARGET_ALL = "all"
|
||||
SYNC_TARGET_CUSTOMER = "customer"
|
||||
SYNC_TARGET_ORDER = "order"
|
||||
SYNC_TARGET_ORDERDETAILS = "orderdetails"
|
||||
SYNC_TARGET_BILLS = "bills"
|
||||
VALID_SYNC_TARGETS = {
|
||||
SYNC_TARGET_ALL,
|
||||
SYNC_TARGET_CUSTOMER,
|
||||
SYNC_TARGET_ORDER,
|
||||
SYNC_TARGET_ORDERDETAILS,
|
||||
SYNC_TARGET_BILLS,
|
||||
}
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Schema DDL
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -382,6 +396,18 @@ def is_valid_order_id(order_id: str | None) -> bool:
|
||||
return bool(order_id and order_id.isdigit())
|
||||
|
||||
|
||||
def normalize_sync_target(sync_target: str | None) -> SyncTarget:
|
||||
if sync_target is None:
|
||||
return SYNC_TARGET_ALL
|
||||
normalized = sync_target.strip().lower()
|
||||
if not normalized:
|
||||
return SYNC_TARGET_ALL
|
||||
if normalized not in VALID_SYNC_TARGETS:
|
||||
valid_targets = ", ".join(sorted(VALID_SYNC_TARGETS))
|
||||
raise ValueError(f"Invalid sync target: {sync_target}. Expected one of: {valid_targets}")
|
||||
return normalized
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Sync logic
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -465,7 +491,7 @@ class APSSyncer:
|
||||
return [cast(JsonDict, record) for record in data_list if isinstance(record, dict)]
|
||||
return []
|
||||
|
||||
def resolve_data_files(self, data_dir: str) -> tuple[Path, Path, Path, Path, Path]:
|
||||
def resolve_data_files(self, data_dir: str, sync_target: SyncTarget = SYNC_TARGET_ALL) -> tuple[Path, Path, Path, Path, Path]:
|
||||
root = Path(data_dir)
|
||||
if not root.exists() or not root.is_dir():
|
||||
raise FileNotFoundError(f"Data directory not found: {root}")
|
||||
@@ -475,11 +501,33 @@ class APSSyncer:
|
||||
order_details_file = root / "orderDetails.json"
|
||||
bills_file = root / "bills.json"
|
||||
customer_details_file = root / "customerDetails.json"
|
||||
for fp in (customers_file, orders_file, order_details_file, bills_file):
|
||||
|
||||
required_files_by_target = {
|
||||
SYNC_TARGET_ALL: (customers_file, orders_file, order_details_file, bills_file),
|
||||
SYNC_TARGET_CUSTOMER: (customers_file,),
|
||||
SYNC_TARGET_ORDER: (orders_file,),
|
||||
SYNC_TARGET_ORDERDETAILS: (order_details_file,),
|
||||
SYNC_TARGET_BILLS: (bills_file,),
|
||||
}
|
||||
for fp in required_files_by_target[sync_target]:
|
||||
if not fp.exists():
|
||||
raise FileNotFoundError(f"Required JSON file not found: {fp}")
|
||||
return customers_file, orders_file, order_details_file, bills_file, customer_details_file
|
||||
|
||||
def fetch_login_to_account_map(self) -> dict[str, str]:
|
||||
conn = self._require_conn()
|
||||
with conn.cursor() as cur:
|
||||
_ = cur.execute("SELECT login_name, account_id FROM aps_customer")
|
||||
rows = cur.fetchall()
|
||||
|
||||
login_to_account: dict[str, str] = {}
|
||||
for row in rows:
|
||||
login_name = safe_str(row.get("login_name"), 128)
|
||||
account_id = safe_str(row.get("account_id"), 32)
|
||||
if login_name and account_id:
|
||||
login_to_account[login_name] = account_id
|
||||
return login_to_account
|
||||
|
||||
@staticmethod
|
||||
def normalize_customer_record(raw: JsonDict) -> JsonDict | None:
|
||||
account_id = safe_str(raw.get("accountId"), 32)
|
||||
@@ -906,20 +954,38 @@ class APSSyncer:
|
||||
self.stats["bills"] += 1
|
||||
|
||||
# ---- Main sync entry ----
|
||||
def sync_from_json(self, data_dir: str, incremental: bool = False) -> StatsDict:
|
||||
def sync_from_json(self, data_dir: str, incremental: bool = False, sync_target: str = SYNC_TARGET_ALL) -> StatsDict:
|
||||
start = datetime.now()
|
||||
customers_file, orders_file, order_details_file, bills_file, customer_details_file = self.resolve_data_files(data_dir)
|
||||
logger.info("Loading source files from %s%s", data_dir, " (增量模式)" if incremental else "")
|
||||
normalized_sync_target = normalize_sync_target(sync_target)
|
||||
customers_file, orders_file, order_details_file, bills_file, customer_details_file = self.resolve_data_files(
|
||||
data_dir,
|
||||
normalized_sync_target,
|
||||
)
|
||||
logger.info(
|
||||
"Loading source files from %s%s%s",
|
||||
data_dir,
|
||||
" (增量模式)" if incremental else "",
|
||||
"" if normalized_sync_target == SYNC_TARGET_ALL else f" (sync_target={normalized_sync_target})",
|
||||
)
|
||||
|
||||
raw_customers: JsonList = []
|
||||
raw_orders: JsonList = []
|
||||
raw_order_details: JsonList = []
|
||||
raw_bills: JsonList = []
|
||||
if normalized_sync_target in {SYNC_TARGET_ALL, SYNC_TARGET_CUSTOMER}:
|
||||
raw_customers = self.load_json_records(customers_file)
|
||||
if normalized_sync_target in {SYNC_TARGET_ALL, SYNC_TARGET_ORDER}:
|
||||
raw_orders = self.load_json_records(orders_file)
|
||||
if normalized_sync_target in {SYNC_TARGET_ALL, SYNC_TARGET_ORDERDETAILS}:
|
||||
raw_order_details = self.load_json_records(order_details_file)
|
||||
if normalized_sync_target in {SYNC_TARGET_ALL, SYNC_TARGET_BILLS}:
|
||||
raw_bills = self.load_json_records(bills_file)
|
||||
|
||||
raw_customers = self.load_json_records(customers_file)
|
||||
raw_orders = self.load_json_records(orders_file)
|
||||
raw_order_details = self.load_json_records(order_details_file)
|
||||
raw_bills = self.load_json_records(bills_file)
|
||||
raw_customer_details: JsonList = []
|
||||
try:
|
||||
if customer_details_file.exists():
|
||||
if normalized_sync_target in {SYNC_TARGET_ALL, SYNC_TARGET_CUSTOMER} and customer_details_file.exists():
|
||||
raw_customer_details = self.load_json_records(customer_details_file)
|
||||
else:
|
||||
elif normalized_sync_target in {SYNC_TARGET_ALL, SYNC_TARGET_CUSTOMER}:
|
||||
logger.info("Optional file missing, skip customer details: %s", customer_details_file)
|
||||
except Exception as e:
|
||||
logger.warning("Failed to load optional customer details file %s: %s", customer_details_file, e)
|
||||
@@ -941,27 +1007,33 @@ class APSSyncer:
|
||||
|
||||
customers: JsonList = []
|
||||
skipped_customers = 0
|
||||
for raw in raw_customers:
|
||||
c = self.normalize_customer_record(raw)
|
||||
if not c:
|
||||
skipped_customers += 1
|
||||
continue
|
||||
customers.append(c)
|
||||
self.upsert_customer(c)
|
||||
self.insert_snapshot(c, billing_month, captured_at)
|
||||
if normalized_sync_target in {SYNC_TARGET_ALL, SYNC_TARGET_CUSTOMER}:
|
||||
for raw in raw_customers:
|
||||
c = self.normalize_customer_record(raw)
|
||||
if not c:
|
||||
skipped_customers += 1
|
||||
continue
|
||||
customers.append(c)
|
||||
self.upsert_customer(c)
|
||||
self.insert_snapshot(c, billing_month, captured_at)
|
||||
|
||||
if skipped_customers:
|
||||
logger.info("Skipped %d invalid customer rows", skipped_customers)
|
||||
|
||||
login_to_account = build_login_to_account_map(customers)
|
||||
if not login_to_account and normalized_sync_target in {SYNC_TARGET_ORDER, SYNC_TARGET_BILLS}:
|
||||
login_to_account = self.fetch_login_to_account_map()
|
||||
logger.info("Resolved %d customer login_name -> account_id mappings", len(login_to_account))
|
||||
|
||||
if raw_customer_details:
|
||||
self.update_customer_details(raw_customer_details, billing_month)
|
||||
|
||||
self.upsert_orders(raw_orders, login_to_account)
|
||||
self.upsert_order_details(raw_order_details)
|
||||
self.sync_bills(raw_bills, login_to_account, incremental=incremental)
|
||||
if normalized_sync_target in {SYNC_TARGET_ALL, SYNC_TARGET_ORDER}:
|
||||
self.upsert_orders(raw_orders, login_to_account)
|
||||
if normalized_sync_target in {SYNC_TARGET_ALL, SYNC_TARGET_ORDERDETAILS}:
|
||||
self.upsert_order_details(raw_order_details)
|
||||
if normalized_sync_target in {SYNC_TARGET_ALL, SYNC_TARGET_BILLS}:
|
||||
self.sync_bills(raw_bills, login_to_account, incremental=incremental)
|
||||
|
||||
# Log sync
|
||||
duration = (datetime.now() - start).total_seconds()
|
||||
@@ -1051,10 +1123,17 @@ def main() -> None:
|
||||
default=False,
|
||||
help="仅查询 aps_bill 中最新的 consumption_time 并输出",
|
||||
)
|
||||
_ = parser.add_argument(
|
||||
"--sync-target",
|
||||
choices=sorted(VALID_SYNC_TARGETS),
|
||||
default=SYNC_TARGET_ALL,
|
||||
help="选择同步对象: all/customer/order/orderdetails/bills",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
data_dir = cast(str, args.dir)
|
||||
incremental = cast(bool, args.incremental)
|
||||
latest_bill_consumption_time = cast(bool, args.latest_bill_consumption_time)
|
||||
sync_target = cast(str, args.sync_target)
|
||||
|
||||
syncer = APSSyncer(db_config=DB_CONFIG)
|
||||
if latest_bill_consumption_time:
|
||||
@@ -1066,7 +1145,7 @@ def main() -> None:
|
||||
return
|
||||
finally:
|
||||
syncer.close()
|
||||
_ = syncer.sync_from_json(data_dir, incremental=incremental)
|
||||
_ = syncer.sync_from_json(data_dir, incremental=incremental, sync_target=sync_target)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -12,33 +12,46 @@ import sys
|
||||
import signal
|
||||
import argparse
|
||||
import logging
|
||||
import importlib
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
from typing import Any, cast
|
||||
|
||||
from aps_db_sync import APSSyncer, DB_CONFIG, JSON_DIR
|
||||
try:
|
||||
from . import aps_db_sync as aps_db_sync_module
|
||||
except ImportError:
|
||||
aps_db_sync_module = importlib.import_module("aps_db_sync")
|
||||
|
||||
APSSyncer = aps_db_sync_module.APSSyncer
|
||||
db_config_default = cast(dict[str, str | int], aps_db_sync_module.DB_CONFIG)
|
||||
json_dir = cast(Path, aps_db_sync_module.JSON_DIR)
|
||||
default_sync_target = cast(str, aps_db_sync_module.SYNC_TARGET_ALL)
|
||||
valid_sync_targets = cast(set[str], aps_db_sync_module.VALID_SYNC_TARGETS)
|
||||
|
||||
LOG_FORMAT = "%(asctime)s [%(levelname)s] %(message)s"
|
||||
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
|
||||
logger = logging.getLogger("aps_scheduler")
|
||||
|
||||
WATCH_INTERVAL_SECONDS = 30
|
||||
PROCESSED_MARKER_DIR = JSON_DIR / ".aps_sync_processed"
|
||||
DEFAULT_WATCH_INTERVAL_SECONDS = 30
|
||||
watch_interval_seconds = DEFAULT_WATCH_INTERVAL_SECONDS
|
||||
PROCESSED_MARKER_DIR = json_dir / ".aps_sync_processed"
|
||||
|
||||
|
||||
def _update_watch_interval(value: int):
|
||||
global WATCH_INTERVAL_SECONDS
|
||||
WATCH_INTERVAL_SECONDS = value
|
||||
global watch_interval_seconds
|
||||
watch_interval_seconds = value
|
||||
|
||||
|
||||
class SyncScheduler:
|
||||
def __init__(self, db_config: dict = None):
|
||||
self.db_config = db_config or DB_CONFIG
|
||||
self.running = True
|
||||
def __init__(self, db_config: dict[str, str | int] | None = None, sync_target: str = default_sync_target):
|
||||
self.db_config: dict[str, str | int] = db_config or db_config_default
|
||||
self.sync_target: str = sync_target
|
||||
self.running: bool = True
|
||||
PROCESSED_MARKER_DIR.mkdir(exist_ok=True)
|
||||
signal.signal(signal.SIGINT, self._shutdown)
|
||||
signal.signal(signal.SIGTERM, self._shutdown)
|
||||
_ = signal.signal(signal.SIGINT, self._shutdown)
|
||||
_ = signal.signal(signal.SIGTERM, self._shutdown)
|
||||
|
||||
def _shutdown(self, signum, frame):
|
||||
def _shutdown(self, signum: int, frame: object | None):
|
||||
logger.info("Shutdown signal received, stopping...")
|
||||
self.running = False
|
||||
|
||||
@@ -55,18 +68,18 @@ class SyncScheduler:
|
||||
|
||||
def _mark_processed(self, json_path: Path):
|
||||
marker = self._marker_path(json_path)
|
||||
marker.write_text(datetime.now().isoformat())
|
||||
_ = marker.write_text(datetime.now().isoformat())
|
||||
|
||||
def find_unprocessed_files(self) -> list[Path]:
|
||||
pattern = "aps_aliyun_customers_with_bills_*.json"
|
||||
all_files = sorted(JSON_DIR.glob(pattern), key=lambda p: p.stat().st_mtime)
|
||||
all_files = sorted(json_dir.glob(pattern), key=lambda p: p.stat().st_mtime)
|
||||
return [f for f in all_files if not self._is_processed(f)]
|
||||
|
||||
def sync_file(self, json_path: Path) -> bool:
|
||||
logger.info("Syncing: %s", json_path.name)
|
||||
try:
|
||||
syncer = APSSyncer(db_config=self.db_config)
|
||||
syncer.sync_from_json(str(json_path))
|
||||
syncer = APSSyncer(db_config=cast(Any, self.db_config))
|
||||
_ = syncer.sync_from_json(str(json_path), sync_target=self.sync_target)
|
||||
self._mark_processed(json_path)
|
||||
return True
|
||||
except Exception as e:
|
||||
@@ -86,38 +99,40 @@ class SyncScheduler:
|
||||
return count
|
||||
|
||||
def run_watch(self):
|
||||
logger.info("Watching %s for new JSON files (interval=%ds)", JSON_DIR, WATCH_INTERVAL_SECONDS)
|
||||
self.run_once()
|
||||
logger.info("Watching %s for new JSON files (interval=%ds)", json_dir, watch_interval_seconds)
|
||||
_ = self.run_once()
|
||||
while self.running:
|
||||
time.sleep(WATCH_INTERVAL_SECONDS)
|
||||
time.sleep(watch_interval_seconds)
|
||||
unprocessed = self.find_unprocessed_files()
|
||||
for f in unprocessed:
|
||||
if not self.running:
|
||||
break
|
||||
self.sync_file(f)
|
||||
_ = self.sync_file(f)
|
||||
logger.info("Watcher stopped")
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="APS Sync Scheduler")
|
||||
parser.add_argument("--mode", choices=["watch", "cron", "daemon"], default="watch",
|
||||
help="watch=file watcher, cron=one-shot, daemon=watch with initial sync")
|
||||
parser.add_argument("--host", default=DB_CONFIG["host"])
|
||||
parser.add_argument("--port", type=int, default=DB_CONFIG["port"])
|
||||
parser.add_argument("--user", default=DB_CONFIG["user"])
|
||||
parser.add_argument("--password", default=DB_CONFIG["password"])
|
||||
parser.add_argument("--database", default=DB_CONFIG["database"])
|
||||
parser.add_argument("--interval", type=int, default=WATCH_INTERVAL_SECONDS,
|
||||
help="Watch interval in seconds")
|
||||
_ = parser.add_argument("--mode", choices=["watch", "cron", "daemon"], default="watch",
|
||||
help="watch=file watcher, cron=one-shot, daemon=watch with initial sync")
|
||||
_ = parser.add_argument("--host", default=db_config_default["host"])
|
||||
_ = parser.add_argument("--port", type=int, default=db_config_default["port"])
|
||||
_ = parser.add_argument("--user", default=db_config_default["user"])
|
||||
_ = parser.add_argument("--password", default=db_config_default["password"])
|
||||
_ = parser.add_argument("--database", default=db_config_default["database"])
|
||||
_ = parser.add_argument("--interval", type=int, default=watch_interval_seconds,
|
||||
help="Watch interval in seconds")
|
||||
_ = parser.add_argument("--sync-target", choices=sorted(valid_sync_targets), default=default_sync_target,
|
||||
help="选择同步对象: all/customer/order/orderdetails/bills")
|
||||
args = parser.parse_args()
|
||||
|
||||
_update_watch_interval(args.interval)
|
||||
|
||||
config = {
|
||||
config: dict[str, str | int] = {
|
||||
"host": args.host, "port": args.port, "user": args.user,
|
||||
"password": args.password, "database": args.database, "charset": "utf8mb4",
|
||||
}
|
||||
scheduler = SyncScheduler(db_config=config)
|
||||
scheduler = SyncScheduler(db_config=config, sync_target=args.sync_target)
|
||||
|
||||
if args.mode == "cron":
|
||||
count = scheduler.run_once()
|
||||
|
||||
Reference in New Issue
Block a user