checkpoint断点爬取

This commit is contained in:
ray
2026-04-27 14:29:09 +08:00
parent 2e4ce07340
commit a008d416a3

View File

@@ -491,20 +491,40 @@ class APSSyncer:
return [cast(JsonDict, record) for record in data_list if isinstance(record, dict)]
return []
def resolve_latest_bills_checkpoint(self, data_dir: str) -> Path:
def resolve_bills_checkpoint_dir(self, data_dir: str) -> Path:
current_root = Path(data_dir)
checkpoint_root = current_root.parent / "checkpoints" / "bills"
if not checkpoint_root.exists() or not checkpoint_root.is_dir():
raise FileNotFoundError(f"Bills checkpoint directory not found: {checkpoint_root}")
candidates = sorted(
checkpoint_root.glob("*.json"),
key=lambda item: item.stat().st_mtime,
reverse=True,
)
candidates = sorted(checkpoint_root.glob("*.json"))
if not candidates:
raise FileNotFoundError(f"No bills checkpoint file found in: {checkpoint_root}")
return candidates[0]
return checkpoint_root
def load_bills_checkpoint_records(self, checkpoint_root: Path) -> JsonList:
candidates = sorted(
checkpoint_root.glob("*.json"),
key=lambda item: item.name,
)
logger.info("[from-checkpoint] using checkpoint dir: %s", checkpoint_root)
logger.info("[from-checkpoint] found %d checkpoint files", len(candidates))
merged: JsonList = []
seen_hashes: set[str] = set()
for checkpoint_file in candidates:
records = self.load_json_records(checkpoint_file)
logger.info("[from-checkpoint] loading %s (%d records)", checkpoint_file.name, len(records))
for record in records:
record_hash = safe_str(record.get("__hash"), 64)
if record_hash:
if record_hash in seen_hashes:
continue
seen_hashes.add(record_hash)
merged.append(record)
logger.info("[from-checkpoint] merged %d unique bill records", len(merged))
return merged
def resolve_data_files(
self,
@@ -519,7 +539,7 @@ class APSSyncer:
customers_file = root / "customers.json"
orders_file = root / "orders.json"
order_details_file = root / "orderDetails.json"
bills_file = self.resolve_latest_bills_checkpoint(data_dir) if from_checkpoint else root / "bills.json"
bills_file = self.resolve_bills_checkpoint_dir(data_dir) if from_checkpoint else root / "bills.json"
customer_details_file = root / "customerDetails.json"
if from_checkpoint and sync_target != SYNC_TARGET_BILLS:
@@ -1009,7 +1029,7 @@ class APSSyncer:
if normalized_sync_target in {SYNC_TARGET_ALL, SYNC_TARGET_ORDERDETAILS}:
raw_order_details = self.load_json_records(order_details_file)
if normalized_sync_target in {SYNC_TARGET_ALL, SYNC_TARGET_BILLS}:
raw_bills = self.load_json_records(bills_file)
raw_bills = self.load_bills_checkpoint_records(bills_file) if from_checkpoint else self.load_json_records(bills_file)
raw_customer_details: JsonList = []
try: