From a008d416a3ae8e43d08d8b38692a4acc0d9aa2a1 Mon Sep 17 00:00:00 2001 From: ray <1416431931@qq.com> Date: Mon, 27 Apr 2026 14:29:09 +0800 Subject: [PATCH] =?UTF-8?q?checkpoint=E6=96=AD=E7=82=B9=E7=88=AC=E5=8F=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- aliyun-sync/aps-aliyun-sync/aps_db_sync.py | 38 +++++++++++++++++----- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/aliyun-sync/aps-aliyun-sync/aps_db_sync.py b/aliyun-sync/aps-aliyun-sync/aps_db_sync.py index f56ecc8..37c3bd3 100644 --- a/aliyun-sync/aps-aliyun-sync/aps_db_sync.py +++ b/aliyun-sync/aps-aliyun-sync/aps_db_sync.py @@ -491,20 +491,40 @@ class APSSyncer: return [cast(JsonDict, record) for record in data_list if isinstance(record, dict)] return [] - def resolve_latest_bills_checkpoint(self, data_dir: str) -> Path: + def resolve_bills_checkpoint_dir(self, data_dir: str) -> Path: current_root = Path(data_dir) checkpoint_root = current_root.parent / "checkpoints" / "bills" if not checkpoint_root.exists() or not checkpoint_root.is_dir(): raise FileNotFoundError(f"Bills checkpoint directory not found: {checkpoint_root}") - candidates = sorted( - checkpoint_root.glob("*.json"), - key=lambda item: item.stat().st_mtime, - reverse=True, - ) + candidates = sorted(checkpoint_root.glob("*.json")) if not candidates: raise FileNotFoundError(f"No bills checkpoint file found in: {checkpoint_root}") - return candidates[0] + return checkpoint_root + + def load_bills_checkpoint_records(self, checkpoint_root: Path) -> JsonList: + candidates = sorted( + checkpoint_root.glob("*.json"), + key=lambda item: item.name, + ) + logger.info("[from-checkpoint] using checkpoint dir: %s", checkpoint_root) + logger.info("[from-checkpoint] found %d checkpoint files", len(candidates)) + + merged: JsonList = [] + seen_hashes: set[str] = set() + for checkpoint_file in candidates: + records = self.load_json_records(checkpoint_file) + logger.info("[from-checkpoint] loading %s (%d records)", checkpoint_file.name, len(records)) + for record in records: + record_hash = safe_str(record.get("__hash"), 64) + if record_hash: + if record_hash in seen_hashes: + continue + seen_hashes.add(record_hash) + merged.append(record) + + logger.info("[from-checkpoint] merged %d unique bill records", len(merged)) + return merged def resolve_data_files( self, @@ -519,7 +539,7 @@ class APSSyncer: customers_file = root / "customers.json" orders_file = root / "orders.json" order_details_file = root / "orderDetails.json" - bills_file = self.resolve_latest_bills_checkpoint(data_dir) if from_checkpoint else root / "bills.json" + bills_file = self.resolve_bills_checkpoint_dir(data_dir) if from_checkpoint else root / "bills.json" customer_details_file = root / "customerDetails.json" if from_checkpoint and sync_target != SYNC_TARGET_BILLS: @@ -1009,7 +1029,7 @@ class APSSyncer: if normalized_sync_target in {SYNC_TARGET_ALL, SYNC_TARGET_ORDERDETAILS}: raw_order_details = self.load_json_records(order_details_file) if normalized_sync_target in {SYNC_TARGET_ALL, SYNC_TARGET_BILLS}: - raw_bills = self.load_json_records(bills_file) + raw_bills = self.load_bills_checkpoint_records(bills_file) if from_checkpoint else self.load_json_records(bills_file) raw_customer_details: JsonList = [] try: