""" 单配置组运行的主程序 用于在独立进程中运行单个配置组的自动化 """ import cv2 from utils.get_image import GetImage from utils.mouse import init_mouse_keyboard, Mouse_guiji from ultralytics import YOLO import time import serial import ch9329Comm import random import math from utils import shizi from config import config_manager import sys def run_automation_for_group(group_index): """为单个配置组运行自动化""" # 重新加载配置 config_manager.load_config() group = config_manager.get_group_by_index(group_index) if group is None: print(f"❌ 配置组索引 {group_index} 不存在") return print(f"🚀 启动配置组: {group['name']} (索引: {group_index})") print(f" 串口: {group['serial_port']}") print(f" 采集卡: {group['camera_index']}") # 加载YOLO模型 model = YOLO(r"best.pt").to('cuda') model0 = YOLO(r"best0.pt").to('cuda') # 初始化串口和鼠标 try: init_mouse_keyboard(group) except Exception as e: print(f"❌ 初始化串口失败: {e}") return # 初始化键盘和鼠标 keyboard = ch9329Comm.keyboard.DataComm() from utils.mouse import mouse mouse_gui = Mouse_guiji() # 初始化采集卡 get_image = GetImage( cam_index=group['camera_index'], width=group['camera_width'], height=group['camera_height'] ) # 检查采集卡是否初始化成功 if get_image.cap is None: print(f"❌ 采集卡 {group['camera_index']} 初始化失败") return print(f"✅ 配置组 {group['name']} 初始化完成") # 全局变量 left = 0 top = 30 k = 0 # 控制转圈的方向 panduan = False # 是否在图内 boss_pd = False # 是否到boss关卡 rw = (632, 378) v = group['move_velocity'] # 从配置读取移动速度 def yolo_shibie(im_PIL, detections, model): try: results = model(im_PIL) for result in results: if result.boxes is None or len(result.boxes.xyxy) == 0: continue for i in range(len(result.boxes.xyxy)): try: left = float(result.boxes.xyxy[i][0]) top = float(result.boxes.xyxy[i][1]) right = float(result.boxes.xyxy[i][2]) bottom = float(result.boxes.xyxy[i][3]) cls_id = int(result.boxes.cls[i]) label = result.names[cls_id] if label == 'center' or label == 'next' or label == 'boss' or label == 'zhaozi': player_x = int(left + (right - left) / 2) + 3 player_y = int(top + (bottom - top) / 2) + 40 RW = [player_x, player_y] detections[label] = RW elif label == 'daojv' or label == 'gw': player_x = int(left + (right - left) / 2) + 3 player_y = int(top + (bottom - top) / 2) + 40 RW = [player_x, player_y] if label not in detections: detections[label] = [] detections[label].append(RW) elif label == 'npc1' or label == 'npc2' or label == 'npc3' or label == 'npc4': player_x = int(left + (right - left) / 2) player_y = int(bottom) + 30 RW = [player_x, player_y] detections[label] = RW except Exception as e: print(f"⚠️ 处理检测框时出错: {e}") continue except Exception as e: print(f"⚠️ YOLO检测出错: {e}") return detections def sq(p1, p2): """计算两点之间的欧式距离""" return math.sqrt((p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2) def process_points(points): if not points: return None n = len(points) if n == 1: return points[0] elif n == 2: x = (points[0][0] + points[1][0]) / 2 y = (points[0][1] + points[1][1]) / 2 return [x, y] else: sample_points = random.sample(points, 3) min_sum = float('inf') best_point = None for p in points: dist_sum = sum(sq(p, sp) for sp in sample_points) if dist_sum < min_sum: min_sum = dist_sum best_point = p return best_point def safe_keyboard_send(data, max_retries=3): """安全的键盘发送函数,带重试机制""" nonlocal keyboard for attempt in range(max_retries): try: keyboard.send_data(data) return True except (serial.serialutil.SerialException, PermissionError, OSError) as e: if attempt < max_retries - 1: print(f"⚠️ 键盘发送失败 (尝试 {attempt + 1}/{max_retries}): {e}") time.sleep(0.1 * (attempt + 1)) # 递增延迟 # 尝试重新初始化串口 try: if serial.ser and serial.ser.is_open: serial.ser.close() time.sleep(0.2) init_mouse_keyboard(group) # 重新创建keyboard对象 keyboard = ch9329Comm.keyboard.DataComm() except Exception as init_e: print(f"⚠️ 重新初始化串口失败: {init_e}") else: print(f"❌ 键盘发送失败,已重试 {max_retries} 次: {e}") raise return False def safe_keyboard_release(max_retries=3): """安全的键盘释放函数,带重试机制""" nonlocal keyboard for attempt in range(max_retries): try: keyboard.release() return True except (serial.serialutil.SerialException, PermissionError, OSError) as e: if attempt < max_retries - 1: time.sleep(0.1 * (attempt + 1)) # 尝试重新初始化串口 try: if serial.ser and serial.ser.is_open: serial.ser.close() time.sleep(0.2) init_mouse_keyboard(group) keyboard = ch9329Comm.keyboard.DataComm() except Exception as init_e: print(f"⚠️ 重新初始化串口失败: {init_e}") else: print(f"⚠️ 键盘释放失败: {e}") # 释放失败不算致命错误,继续执行 return False return False def move_randomly(rw, k): k = k % 4 suiji_t = float(random.randint(10, 13) / 10) if k == 0: if safe_keyboard_send("66"): time.sleep(suiji_t) safe_keyboard_release() elif k == 1: if safe_keyboard_send("88"): time.sleep(suiji_t) safe_keyboard_release() elif k == 2: if safe_keyboard_send("44"): time.sleep(suiji_t) safe_keyboard_release() elif k == 3: if safe_keyboard_send("22"): time.sleep(suiji_t) safe_keyboard_release() return k + 1 def move_to(rw, mb): """使用配置的v值""" nonlocal v v = group['move_velocity'] if rw[0] >= mb[0]: if safe_keyboard_send("44"): time.sleep(float(abs(rw[0] - mb[0]) / v)) safe_keyboard_release() else: if safe_keyboard_send("66"): time.sleep(float(abs(rw[0] - mb[0]) / v)) safe_keyboard_release() if rw[1] >= mb[1]: if safe_keyboard_send("88"): time.sleep(float(abs(rw[1] - mb[1]) / v)) safe_keyboard_release() else: if safe_keyboard_send("22"): time.sleep(float(abs(rw[1] - mb[1]) / v)) safe_keyboard_release() # 主循环 print(f"🔄 配置组 {group['name']} 开始自动化循环...") while True: try: detections = { 'center': None, 'next': None, 'npc1': None, 'npc2': None, 'npc3': None, 'npc4': None, 'boss': None, 'daojv': [], 'gw': [], 'zhaozi': None } im_opencv = get_image.get_frame() if im_opencv is None: time.sleep(0.1) continue detections = yolo_shibie(im_opencv[1], detections, model) if shizi.tuwai(im_opencv[0]): im_opencv = get_image.get_frame() if im_opencv is None: time.sleep(0.1) continue detections = yolo_shibie(im_opencv[1], detections, model0) print(f'[{group["name"]}] 当前在城镇中') if detections['npc1'] is not None and sq(rw, detections['npc1']) > 80: print(f"[{group['name']}] 向npc1靠近") move_to(rw, detections['npc1']) continue elif detections['npc1'] is not None and sq(rw, detections['npc1']) <= 80: print(f"[{group['name']}] 在npc旁边,向上走") mb = (detections['npc1'][0], detections['npc1'][1] - 1010) move_to(rw, mb) continue elif detections['npc3'] is not None and detections['npc4'] is None: print(f"[{group['name']}] 在npc3旁边,向右走") mb = (rw[0], detections['npc3'][1] - 50) move_to(rw, mb) mb = (rw[0] + 700, rw[1]) move_to(rw, mb) continue elif detections['npc4'] is not None: if sq(detections['npc4'], rw) < 50: print(f"[{group['name']}] 离npc4很近 直接进入") if safe_keyboard_send("DD"): time.sleep(0.15) safe_keyboard_release() time.sleep(1) im_opencv = get_image.get_frame() if im_opencv and shizi.daoying(im_opencv[0]): mouse_gui.send_data_absolute(rw[0], rw[1] - 110, may=1) time.sleep(1) continue else: print(f"[{group['name']}] 离npc4有点远 点击进入") move_to(rw, detections['npc4']) time.sleep(1) im_opencv = get_image.get_frame() if im_opencv and shizi.daoying(im_opencv[0]): mouse_gui.send_data_absolute(rw[0], rw[1] - 110, may=1) time.sleep(1) continue elif shizi.tiaozhan(im_opencv[0]): print(f'[{group["name"]}] 进入塔4') mouse_gui.send_data_absolute(left + 1100, top + 600, may=1) time.sleep(0.3) mouse_gui.send_data_absolute(left + 433, top + 455, may=1) panduan = True continue elif shizi.jieshu(im_opencv[0]): print(f'[{group["name"]}] 结束挑战') mouse_gui.send_data_absolute(left + 542, top + 644, may=1) time.sleep(0.8) mouse_gui.send_data_absolute(left + 706, top + 454, may=1) continue elif panduan: if shizi.shuzi(im_opencv[0]): boss_pd = True print(f"[{group['name']}] 进入到boss!!!!!!!!!!!!!!!!!!") if shizi.fuhuo(im_opencv[0]): print(f'[{group["name"]}] 点击复活') mouse_gui.send_data_absolute(left + 536, top + 627, may=1) mouse_gui.send_data_absolute(rw[0], rw[1], may=0) continue if detections['zhaozi'] is not None: move_to(rw, detections['zhaozi']) continue if len(detections['daojv']) != 0: move_to(rw, process_points(detections['daojv'])) for i in range(3 + len(detections['daojv'])): if safe_keyboard_send("AA"): time.sleep(0.15) safe_keyboard_release() continue if shizi.tuichu(im_opencv[0]) and detections['next'] is None and len(detections['daojv']) == 0 and len(detections['gw']) == 0 and boss_pd: print(f"[{group['name']}] 识别到可以退出挑战!!!!!!!!!!!!!!!!!!") for i in range(3): time.sleep(0.5) im_opencv = get_image.get_frame() if im_opencv is None: continue detections = { 'center': None, 'next': None, 'npc1': None, 'npc2': None, 'npc3': None, 'npc4': None, 'boss': None, 'zhaozi': None, 'daojv': [], 'gw': [] } detections = yolo_shibie(im_opencv[1], detections, model) if detections['next'] is not None or len(detections['daojv']) != 0 or len(detections['gw']) != 0 or detections['boss'] is not None: break else: mouse_gui.send_data_absolute(left + 640, top + 40, may=1) panduan = False boss_pd = False time.sleep(2.0) continue if detections['center'] is None and detections['next'] is None and (len(detections['gw']) != 0 or detections["boss"] is not None): if len(detections['gw']) != 0: move_to(rw, detections['gw'][0]) time.sleep(0.26) elif detections['boss'] is not None: boss_suiji1 = random.randint(-30, 30) boss_suiji2 = random.randint(-30, 30) detections['boss'] = (detections['boss'][0] + boss_suiji1, detections['boss'][1] + boss_suiji2) move_to(rw, detections['boss']) time.sleep(0.7) continue elif (detections['center'] is not None and detections['next'] is None and len(detections['gw']) != 0) or (boss_pd == True and detections['center'] is not None and detections['next'] is None): if detections['center'][0] >= rw[0] and detections['center'][1] < rw[1]: mb = (rw[0] + 100, rw[1]) elif detections['center'][0] <= rw[0] and detections['center'][1] < rw[1]: mb = (rw[0], rw[1] - 100) elif detections['center'][0] <= rw[0] and detections['center'][1] > rw[1]: mb = (rw[0] - 100, rw[1]) elif detections['center'][0] >= rw[0] and detections['center'][1] > rw[1]: mb = (rw[0], rw[1] + 100) move_to(rw, mb) time.sleep(0.25) continue elif boss_pd == True and detections['center'] is None and detections['next'] is None: k = move_randomly(rw, k) continue elif detections['next'] is not None: print(f'[{group["name"]}] 进入下一层啦啦啦啦啦啦') panduan = True move_to(rw, detections['next']) for i in range(2): if safe_keyboard_send("DD"): time.sleep(0.15) safe_keyboard_release() continue else: k = move_randomly(rw, k) continue elif shizi.daoying(im_opencv[0]): mouse_gui.send_data_absolute(rw[0], rw[1] - 110, may=1) time.sleep(1) continue except KeyboardInterrupt: print(f"🛑 配置组 {group['name']} 停止运行") break except Exception as e: print(f"❌ 配置组 {group['name']} 运行错误: {e}") import traceback traceback.print_exc() time.sleep(1) # 清理资源 try: if serial.ser and serial.ser.is_open: serial.ser.close() print(f"✅ 配置组 {group['name']} 串口已关闭") except Exception as e: print(f"⚠️ 关闭串口时出错: {e}") if __name__ == "__main__": if len(sys.argv) > 1: group_index = int(sys.argv[1]) run_automation_for_group(group_index) else: # 默认运行活动配置组 active_group = config_manager.get_active_group() if active_group is None: print("❌ 没有活动的配置组") sys.exit(1) group_index = config_manager.config['groups'].index(active_group) run_automation_for_group(group_index)