diff --git a/gui_config.py b/gui_config.py index 1df64be..bc69884 100644 --- a/gui_config.py +++ b/gui_config.py @@ -10,7 +10,8 @@ class ConfigGUI: def __init__(self): self.root = tk.Tk() self.root.title("配置管理 - 火炬之光自动化") - self.root.geometry("800x600") + self.root.geometry("1000x720") + self.root.minsize(900, 600) # 设置最小尺寸 self.selected_index = 0 self.setup_ui() @@ -102,7 +103,13 @@ class ConfigGUI: ttk.Button(save_frame, text="保存配置", command=self.save_config).pack(side=tk.LEFT, padx=5) ttk.Button(save_frame, text="启动预览", command=self.start_preview).pack(side=tk.LEFT, padx=5) - ttk.Button(save_frame, text="启动程序", command=self.start_program).pack(side=tk.LEFT, padx=5) + + # 启动程序按钮组 + start_frame = ttk.Frame(right_frame) + start_frame.pack(fill=tk.X, pady=5) + + ttk.Button(start_frame, text="启动单个配置组", command=self.start_program).pack(side=tk.LEFT, padx=5) + ttk.Button(start_frame, text="启动多个配置组", command=self.start_multi_program).pack(side=tk.LEFT, padx=5) def create_entry(self, parent, key, label, prefix=None): """创建输入框""" @@ -192,22 +199,57 @@ class ConfigGUI: def scan_cameras(self, max_index: int = 10): """扫描系统可用的采集卡索引,并填充下拉框""" + import warnings found = [] - for idx in range(max_index + 1): - cap = cv2.VideoCapture(idx, cv2.CAP_DSHOW) - if not cap.isOpened(): - # 回退默认后端再试 - cap = cv2.VideoCapture(idx) - if cap.isOpened(): - found.append(str(idx)) - cap.release() + + # 临时设置OpenCV日志级别 + import os + old_level = os.environ.get('OPENCV_LOG_LEVEL', '') + os.environ['OPENCV_LOG_LEVEL'] = 'ERROR' + cv2.setLogLevel(cv2.LOG_LEVEL_ERROR) + + try: + for idx in range(max_index + 1): + cap = None + try: + with warnings.catch_warnings(): + warnings.filterwarnings('ignore') + # 尝试DSHOW后端 + cap = cv2.VideoCapture(idx, cv2.CAP_DSHOW) + if not cap.isOpened(): + # 回退默认后端再试 + cap = cv2.VideoCapture(idx) + + if cap.isOpened(): + # 测试读取一帧,确保真正可用 + ret, test_frame = cap.read() + if ret and test_frame is not None: + found.append(str(idx)) + cap.release() + except Exception: + if cap: + try: + cap.release() + except: + pass + continue + finally: + # 恢复原来的日志级别 + if old_level: + os.environ['OPENCV_LOG_LEVEL'] = old_level + else: + os.environ.pop('OPENCV_LOG_LEVEL', None) + if not found: found = ["0"] # 至少给一个默认项,避免为空 + messagebox.showwarning("扫描完成", "未发现可用采集卡,已添加默认选项 0") + else: + messagebox.showinfo("扫描完成", f"发现可用采集卡索引: {', '.join(found)}") + self.camera_index_cb['values'] = found # 若当前无选择,则选择第一项 if not self.camera_index_var.get() and found: self.camera_index_cb.set(found[0]) - messagebox.showinfo("扫描完成", f"发现可用采集卡索引: {', '.join(found)}") def scan_ports(self): """扫描系统可用的串口,并填充下拉框""" @@ -375,12 +417,55 @@ class ConfigGUI: return result def start_program(self): - """启动主程序""" - # 保存配置 - if not self.save_config(): + """启动单个配置组的主程序""" + # 保存配置(静默) + if not self.save_config_silent(): + messagebox.showerror("错误", "配置保存失败") return - messagebox.showinfo("提示", "配置已保存,请运行主程序") + # 检查是否有活动配置组 + active_groups = [g for g in config_manager.config['groups'] if g.get('active', False)] + if not active_groups: + messagebox.showwarning("警告", "没有活动的配置组\n\n请先选择一个配置组并设置为活动") + return + + # 启动单个配置组 + import subprocess + import sys + try: + # 找到活动配置组的索引 + active_group = active_groups[0] + group_index = config_manager.config['groups'].index(active_group) + + subprocess.Popen([ + sys.executable, + "main_single.py", + str(group_index) + ], creationflags=subprocess.CREATE_NEW_CONSOLE if sys.platform == 'win32' else 0) + + messagebox.showinfo("成功", f"已启动配置组: {active_group['name']}\n\n请在控制台查看运行状态") + except Exception as e: + messagebox.showerror("错误", f"启动失败: {e}") + + def start_multi_program(self): + """启动多个配置组的主程序""" + # 保存配置(静默) + if not self.save_config_silent(): + messagebox.showerror("错误", "配置保存失败") + return + + # 启动多配置组管理器 + import subprocess + import sys + try: + subprocess.Popen([ + sys.executable, + "main_multi.py" + ], creationflags=subprocess.CREATE_NEW_CONSOLE if sys.platform == 'win32' else 0) + + messagebox.showinfo("提示", "多配置组启动器已打开\n\n请在控制台中选择要启动的配置组") + except Exception as e: + messagebox.showerror("错误", f"启动失败: {e}") def run(self): """运行GUI""" diff --git a/launcher.py b/launcher.py index d515ef7..c1512a9 100644 --- a/launcher.py +++ b/launcher.py @@ -22,7 +22,7 @@ def create_launcher(): """创建启动器界面""" root = tk.Tk() root.title("火炬之光自动化 - 启动器") - root.geometry("400x300") + root.geometry("400x380") root.resizable(False, False) # 标题 @@ -49,10 +49,21 @@ def create_launcher(): preview_btn.pack(pady=10) # 主程序按钮 - main_btn = tk.Button(button_frame, text="🚀 启动自动化", + main_btn = tk.Button(button_frame, text="🚀 启动单个配置组", command=start_main_program, - width=30, height=3, font=('Arial', 12)) - main_btn.pack(pady=10) + width=30, height=2, font=('Arial', 11)) + main_btn.pack(pady=5) + + def start_multi_program(): + """启动多配置组程序""" + subprocess.Popen([sys.executable, "main_multi.py"], + creationflags=subprocess.CREATE_NEW_CONSOLE if sys.platform == 'win32' else 0) + + # 多配置组启动按钮 + multi_btn = tk.Button(button_frame, text="🔥 启动多个配置组", + command=start_multi_program, + width=30, height=2, font=('Arial', 11)) + multi_btn.pack(pady=5) # 说明 info_label = tk.Label(root, diff --git a/main_multi.py b/main_multi.py new file mode 100644 index 0000000..c421d33 --- /dev/null +++ b/main_multi.py @@ -0,0 +1,145 @@ +""" +多配置组启动器 +支持同时启动多个配置组的自动化程序 +""" +import multiprocessing +import sys +from config import config_manager +from main_single import run_automation_for_group + +def main(): + """主函数""" + # 重新加载配置 + config_manager.load_config() + + # 获取所有配置组 + groups = config_manager.config.get('groups', []) + + if not groups: + print("❌ 没有找到任何配置组") + print("请先运行 gui_config.py 创建配置组") + return + + # 询问要启动哪些配置组 + print("=" * 60) + print("🔥 多配置组启动器") + print("=" * 60) + print("\n可用配置组:") + for i, group in enumerate(groups): + active_mark = "✓" if group.get('active', False) else " " + print(f" [{i}] {active_mark} {group['name']}") + print(f" 串口: {group['serial_port']} | 采集卡: {group['camera_index']}") + + print("\n选择启动方式:") + print(" 1. 启动所有活动配置组") + print(" 2. 启动所有配置组") + print(" 3. 选择特定配置组") + print(" 0. 退出") + + choice = input("\n请选择 (0-3): ").strip() + + selected_indices = [] + + if choice == "0": + print("👋 退出") + return + elif choice == "1": + # 启动所有活动配置组 + selected_indices = [i for i, g in enumerate(groups) if g.get('active', False)] + if not selected_indices: + print("❌ 没有活动的配置组") + return + print(f"\n✅ 将启动 {len(selected_indices)} 个活动配置组") + elif choice == "2": + # 启动所有配置组 + selected_indices = list(range(len(groups))) + print(f"\n✅ 将启动所有 {len(selected_indices)} 个配置组") + elif choice == "3": + # 选择特定配置组 + indices_input = input("请输入要启动的配置组索引(用逗号分隔,如: 0,1,2): ").strip() + try: + selected_indices = [int(x.strip()) for x in indices_input.split(',')] + # 验证索引有效性 + selected_indices = [i for i in selected_indices if 0 <= i < len(groups)] + if not selected_indices: + print("❌ 没有有效的配置组索引") + return + print(f"\n✅ 将启动 {len(selected_indices)} 个配置组") + except ValueError: + print("❌ 输入格式错误") + return + else: + print("❌ 无效选择") + return + + # 显示将要启动的配置组 + print("\n将要启动的配置组:") + for idx in selected_indices: + group = groups[idx] + print(f" • {group['name']} (串口:{group['serial_port']}, 采集卡:{group['camera_index']})") + + confirm = input("\n确认启动? (y/n): ").strip().lower() + if confirm != 'y': + print("❌ 取消启动") + return + + # 启动多进程 + print("\n🚀 开始启动多个配置组...") + processes = [] + + for idx in selected_indices: + group = groups[idx] + print(f"启动进程: {group['name']}...") + process = multiprocessing.Process( + target=run_automation_for_group, + args=(idx,), + name=f"Group-{idx}-{group['name']}" + ) + process.start() + processes.append((idx, group['name'], process)) + print(f"✅ {group['name']} 已启动 (PID: {process.pid})") + + print(f"\n✅ 成功启动 {len(processes)} 个配置组进程") + print("\n" + "=" * 60) + print("运行状态:") + print("=" * 60) + + # 监控进程状态 + try: + while True: + alive_count = 0 + for idx, name, proc in processes: + if proc.is_alive(): + alive_count += 1 + else: + print(f"⚠️ {name} 进程已退出 (退出码: {proc.exitcode})") + + if alive_count == 0: + print("\n所有进程已退出") + break + + import time + time.sleep(2) + + # 打印存活状态 + alive_names = [name for idx, name, proc in processes if proc.is_alive()] + if alive_names: + print(f"\r📊 运行中: {', '.join(alive_names)} ({alive_count}/{len(processes)})", end='', flush=True) + + except KeyboardInterrupt: + print("\n\n🛑 收到停止信号,正在关闭所有进程...") + for idx, name, proc in processes: + if proc.is_alive(): + print(f"正在停止 {name}...") + proc.terminate() + proc.join(timeout=5) + if proc.is_alive(): + print(f"强制停止 {name}...") + proc.kill() + print(f"✅ {name} 已停止") + print("\n👋 所有进程已停止") + +if __name__ == "__main__": + multiprocessing.freeze_support() # Windows下需要 + main() + diff --git a/main_single.py b/main_single.py new file mode 100644 index 0000000..edc6320 --- /dev/null +++ b/main_single.py @@ -0,0 +1,349 @@ +""" +单配置组运行的主程序 +用于在独立进程中运行单个配置组的自动化 +""" +import cv2 +from utils.get_image import GetImage +from utils.mouse import init_mouse_keyboard, Mouse_guiji +from ultralytics import YOLO +import time +import serial +import ch9329Comm +import random +import math +from utils import shizi +from config import config_manager +import sys + +def run_automation_for_group(group_index): + """为单个配置组运行自动化""" + # 重新加载配置 + config_manager.load_config() + group = config_manager.get_group_by_index(group_index) + + if group is None: + print(f"❌ 配置组索引 {group_index} 不存在") + return + + print(f"🚀 启动配置组: {group['name']} (索引: {group_index})") + print(f" 串口: {group['serial_port']}") + print(f" 采集卡: {group['camera_index']}") + + # 加载YOLO模型 + model = YOLO(r"best.pt").to('cuda') + model0 = YOLO(r"best0.pt").to('cuda') + + # 初始化串口和鼠标 + try: + init_mouse_keyboard(group) + except Exception as e: + print(f"❌ 初始化串口失败: {e}") + return + + # 初始化键盘和鼠标 + keyboard = ch9329Comm.keyboard.DataComm() + from utils.mouse import mouse + mouse_gui = Mouse_guiji() + + # 初始化采集卡 + get_image = GetImage( + cam_index=group['camera_index'], + width=group['camera_width'], + height=group['camera_height'] + ) + + # 检查采集卡是否初始化成功 + if get_image.cap is None: + print(f"❌ 采集卡 {group['camera_index']} 初始化失败") + return + + print(f"✅ 配置组 {group['name']} 初始化完成") + + # 全局变量 + left = 0 + top = 30 + k = 0 # 控制转圈的方向 + panduan = False # 是否在图内 + boss_pd = False # 是否到boss关卡 + rw = (632, 378) + v = group['move_velocity'] # 从配置读取移动速度 + + def yolo_shibie(im_PIL, detections, model): + results = model(im_PIL) + for result in results: + for i in range(len(result.boxes.xyxy)): + left, top, right, bottom = result.boxes.xyxy[i] + scalar_tensor = result.boxes.cls[i] + value = scalar_tensor.item() + label = result.names[int(value)] + if label == 'center' or label == 'next' or label == 'boss' or label == 'zhaozi': + player_x = int(left + (right - left) / 2) + player_y = int(top + (bottom - top) / 2) + 30 + RW = [player_x, player_y] + detections[label] = RW + elif label == 'daojv' or label == 'gw': + player_x = int(left + (right - left) / 2) + player_y = int(top + (bottom - top) / 2) + 30 + RW = [player_x, player_y] + detections[label].append(RW) + elif label == 'npc1' or label == 'npc2' or label == 'npc3' or label == 'npc4': + player_x = int(left + (right - left) / 2) + player_y = int(bottom) + 30 + RW = [player_x, player_y] + detections[label] = RW + return detections + + def sq(p1, p2): + """计算两点之间的欧式距离""" + return math.sqrt((p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2) + + def process_points(points): + if not points: + return None + + n = len(points) + if n == 1: + return points[0] + elif n == 2: + x = (points[0][0] + points[1][0]) / 2 + y = (points[0][1] + points[1][1]) / 2 + return [x, y] + else: + sample_points = random.sample(points, 3) + min_sum = float('inf') + best_point = None + for p in points: + dist_sum = sum(sq(p, sp) for sp in sample_points) + if dist_sum < min_sum: + min_sum = dist_sum + best_point = p + return best_point + + def move_randomly(rw, k): + k = k % 4 + suiji_t = float(random.randint(10, 13) / 10) + if k == 0: + keyboard.send_data("66") + time.sleep(suiji_t) + keyboard.release() + elif k == 1: + keyboard.send_data("88") + time.sleep(suiji_t) + keyboard.release() + elif k == 2: + keyboard.send_data("44") + time.sleep(suiji_t) + keyboard.release() + elif k == 3: + keyboard.send_data("22") + time.sleep(suiji_t) + keyboard.release() + return k + 1 + + def move_to(rw, mb): + """使用配置的v值""" + nonlocal v + v = group['move_velocity'] + if rw[0] >= mb[0]: + keyboard.send_data("44") + time.sleep(float(abs(rw[0] - mb[0]) / v)) + keyboard.release() + else: + keyboard.send_data("66") + time.sleep(float(abs(rw[0] - mb[0]) / v)) + keyboard.release() + + if rw[1] >= mb[1]: + keyboard.send_data("88") + time.sleep(float(abs(rw[1] - mb[1]) / v)) + keyboard.release() + else: + keyboard.send_data("22") + time.sleep(float(abs(rw[1] - mb[1]) / v)) + keyboard.release() + + # 主循环 + print(f"🔄 配置组 {group['name']} 开始自动化循环...") + while True: + try: + detections = { + 'center': None, + 'next': None, + 'npc1': None, + 'npc2': None, + 'npc3': None, + 'npc4': None, + 'boss': None, + 'daojv': [], + 'gw': [], + 'zhaozi': None + } + im_opencv = get_image.get_frame() + if im_opencv is None: + time.sleep(0.1) + continue + + detections = yolo_shibie(im_opencv[1], detections, model) + + if shizi.tuwai(im_opencv[0]): + im_opencv = get_image.get_frame() + if im_opencv is None: + time.sleep(0.1) + continue + detections = yolo_shibie(im_opencv[1], detections, model0) + print(f'[{group["name"]}] 当前在城镇中') + if detections['npc1'] is not None and sq(rw, detections['npc1']) > 80: + print(f"[{group['name']}] 向npc1靠近") + move_to(rw, detections['npc1']) + continue + elif detections['npc1'] is not None and sq(rw, detections['npc1']) <= 80: + print(f"[{group['name']}] 在npc旁边,向上走") + mb = (detections['npc1'][0], detections['npc1'][1] - 1010) + move_to(rw, mb) + continue + elif detections['npc3'] is not None and detections['npc4'] is None: + print(f"[{group['name']}] 在npc3旁边,向右走") + mb = (rw[0], detections['npc3'][1] - 50) + move_to(rw, mb) + mb = (rw[0] + 700, rw[1]) + move_to(rw, mb) + continue + elif detections['npc4'] is not None: + if sq(detections['npc4'], rw) < 50: + print(f"[{group['name']}] 离npc4很近 直接进入") + keyboard.send_data("DD") + time.sleep(0.15) + keyboard.release() + time.sleep(1) + im_opencv = get_image.get_frame() + if im_opencv and shizi.daoying(im_opencv[0]): + mouse_gui.send_data_absolute(rw[0], rw[1] - 110, may=1) + time.sleep(1) + continue + else: + print(f"[{group['name']}] 离npc4有点远 点击进入") + move_to(rw, detections['npc4']) + time.sleep(1) + im_opencv = get_image.get_frame() + if im_opencv and shizi.daoying(im_opencv[0]): + mouse_gui.send_data_absolute(rw[0], rw[1] - 110, may=1) + time.sleep(1) + continue + elif shizi.tiaozhan(im_opencv[0]): + print(f'[{group["name"]}] 进入塔4') + mouse_gui.send_data_absolute(left + 1100, top + 600, may=1) + time.sleep(0.3) + mouse_gui.send_data_absolute(left + 433, top + 455, may=1) + panduan = True + continue + elif shizi.jieshu(im_opencv[0]): + print(f'[{group["name"]}] 结束挑战') + mouse_gui.send_data_absolute(left + 542, top + 644, may=1) + time.sleep(0.8) + mouse_gui.send_data_absolute(left + 706, top + 454, may=1) + continue + elif panduan: + if shizi.shuzi(im_opencv[0]): + boss_pd = True + print(f"[{group['name']}] 进入到boss!!!!!!!!!!!!!!!!!!") + if shizi.fuhuo(im_opencv[0]): + print(f'[{group["name"]}] 点击复活') + mouse_gui.send_data_absolute(left + 536, top + 627, may=1) + mouse_gui.send_data_absolute(rw[0], rw[1], may=0) + continue + if detections['zhaozi'] is not None: + move_to(rw, detections['zhaozi']) + continue + if len(detections['daojv']) != 0: + move_to(rw, process_points(detections['daojv'])) + for i in range(3 + len(detections['daojv'])): + keyboard.send_data("AA") + time.sleep(0.15) + keyboard.release() + continue + if shizi.tuichu(im_opencv[0]) and detections['next'] is None and len(detections['daojv']) == 0 and len(detections['gw']) == 0 and boss_pd: + print(f"[{group['name']}] 识别到可以退出挑战!!!!!!!!!!!!!!!!!!") + for i in range(3): + time.sleep(0.5) + im_opencv = get_image.get_frame() + if im_opencv is None: + continue + detections = { + 'center': None, 'next': None, + 'npc1': None, 'npc2': None, 'npc3': None, 'npc4': None, + 'boss': None, 'zhaozi': None, + 'daojv': [], 'gw': [] + } + detections = yolo_shibie(im_opencv[1], detections, model) + if detections['next'] is not None or len(detections['daojv']) != 0 or len(detections['gw']) != 0 or detections['boss'] is not None: + break + else: + mouse_gui.send_data_absolute(left + 640, top + 40, may=1) + panduan = False + boss_pd = False + time.sleep(2.0) + continue + if detections['center'] is None and detections['next'] is None and (len(detections['gw']) != 0 or detections["boss"] is not None): + if len(detections['gw']) != 0: + move_to(rw, detections['gw'][0]) + time.sleep(0.26) + elif detections['boss'] is not None: + boss_suiji1 = random.randint(-30, 30) + boss_suiji2 = random.randint(-30, 30) + detections['boss'] = (detections['boss'][0] + boss_suiji1, detections['boss'][1] + boss_suiji2) + move_to(rw, detections['boss']) + time.sleep(0.7) + continue + elif (detections['center'] is not None and detections['next'] is None and len(detections['gw']) != 0) or (boss_pd == True and detections['center'] is not None and detections['next'] is None): + if detections['center'][0] >= rw[0] and detections['center'][1] < rw[1]: + mb = (rw[0] + 100, rw[1]) + elif detections['center'][0] <= rw[0] and detections['center'][1] < rw[1]: + mb = (rw[0], rw[1] - 100) + elif detections['center'][0] <= rw[0] and detections['center'][1] > rw[1]: + mb = (rw[0] - 100, rw[1]) + elif detections['center'][0] >= rw[0] and detections['center'][1] > rw[1]: + mb = (rw[0], rw[1] + 100) + move_to(rw, mb) + time.sleep(0.25) + continue + elif boss_pd == True and detections['center'] is None and detections['next'] is None: + k = move_randomly(rw, k) + continue + elif detections['next'] is not None: + print(f'[{group["name"]}] 进入下一层啦啦啦啦啦啦') + panduan = True + move_to(rw, detections['next']) + for i in range(2): + keyboard.send_data("DD") + time.sleep(0.15) + keyboard.release() + continue + else: + k = move_randomly(rw, k) + continue + elif shizi.daoying(im_opencv[0]): + mouse_gui.send_data_absolute(rw[0], rw[1] - 110, may=1) + time.sleep(1) + continue + except KeyboardInterrupt: + print(f"🛑 配置组 {group['name']} 停止运行") + break + except Exception as e: + print(f"❌ 配置组 {group['name']} 运行错误: {e}") + import traceback + traceback.print_exc() + time.sleep(1) + +if __name__ == "__main__": + if len(sys.argv) > 1: + group_index = int(sys.argv[1]) + run_automation_for_group(group_index) + else: + # 默认运行活动配置组 + active_group = config_manager.get_active_group() + if active_group is None: + print("❌ 没有活动的配置组") + sys.exit(1) + group_index = config_manager.config['groups'].index(active_group) + run_automation_for_group(group_index) + diff --git a/preview.py b/preview.py index c3694f4..251cf35 100644 --- a/preview.py +++ b/preview.py @@ -4,8 +4,14 @@ from tkinter import Canvas from PIL import Image, ImageTk import threading import numpy as np +import warnings +import os from config import config_manager +# 抑制OpenCV的警告信息 +os.environ['OPENCV_LOG_LEVEL'] = 'ERROR' +cv2.setLogLevel(cv2.LOG_LEVEL_ERROR) + class PreviewWindow: """采集卡预览窗口""" def __init__(self): @@ -31,31 +37,60 @@ class PreviewWindow: cam_idx = group['camera_index'] print(f" 尝试打开采集卡 {cam_idx} ({group['name']})...") - cap = cv2.VideoCapture(int(cam_idx), cv2.CAP_DSHOW) - if not cap.isOpened(): - print(f" DSHOW模式失败,尝试默认模式...") - cap = cv2.VideoCapture(int(cam_idx)) + cap = None + # 尝试多种后端打开 + backends_to_try = [ + (int(cam_idx), cv2.CAP_DSHOW), + (int(cam_idx), cv2.CAP_ANY), + (int(cam_idx), None), + ] - if cap.isOpened(): - cap.set(cv2.CAP_PROP_FRAME_WIDTH, group['camera_width']) - cap.set(cv2.CAP_PROP_FRAME_HEIGHT, group['camera_height']) - # 测试读取一帧 - ret, test_frame = cap.read() - if ret: - self.caps[i] = { - 'cap': cap, - 'group': group, - 'name': group['name'] - } - loaded_count += 1 - print(f" ✅ 采集卡 {cam_idx} 初始化成功") - else: - cap.release() - print(f" ❌ 采集卡 {cam_idx} 无法读取帧") + for idx, backend in backends_to_try: + try: + with warnings.catch_warnings(): + warnings.filterwarnings('ignore') + if backend is not None: + cap = cv2.VideoCapture(idx, backend) + else: + cap = cv2.VideoCapture(idx) + + if cap.isOpened(): + # 测试读取一帧 + ret, test_frame = cap.read() + if ret and test_frame is not None: + break + else: + cap.release() + cap = None + except Exception: + if cap: + try: + cap.release() + except: + pass + cap = None + continue + + if cap and cap.isOpened(): + try: + cap.set(cv2.CAP_PROP_FRAME_WIDTH, group['camera_width']) + cap.set(cv2.CAP_PROP_FRAME_HEIGHT, group['camera_height']) + except Exception as e: + print(f" ⚠️ 设置分辨率失败: {e}") + + self.caps[i] = { + 'cap': cap, + 'group': group, + 'name': group['name'] + } + loaded_count += 1 + print(f" ✅ 采集卡 {cam_idx} 初始化成功") else: print(f" ❌ 采集卡 {cam_idx} 无法打开") except Exception as e: print(f" ❌ 采集卡 {group.get('camera_index', '?')} 初始化失败: {e}") + import traceback + traceback.print_exc() if loaded_count == 0: print("⚠️ 警告:没有成功加载任何采集卡!") diff --git a/utils/get_image.py b/utils/get_image.py index 31f66b4..727c343 100644 --- a/utils/get_image.py +++ b/utils/get_image.py @@ -41,26 +41,76 @@ import cv2 # cv2.destroyAllWindows() import threading +import warnings + +# 抑制OpenCV的警告信息 +import os +os.environ['OPENCV_LOG_LEVEL'] = 'ERROR' +cv2.setLogLevel(cv2.LOG_LEVEL_ERROR) class GetImage: def __init__(self, cam_index=0, width=1920, height=1080): print(f"🔧 正在初始化采集卡 {cam_index}...") - self.cap = cv2.VideoCapture(cam_index, cv2.CAP_DSHOW) - if not self.cap.isOpened(): - print(f"⚠️ DSHOW模式失败,尝试默认模式...") - self.cap = cv2.VideoCapture(cam_index) - - if not self.cap.isOpened(): - print(f"❌ 无法打开采集卡 {cam_index}") - self.cap = None - return - - self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width) - self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height) + self.cap = None self.frame = None self.running = True self.cam_index = cam_index + # 尝试多种方式打开采集卡 + backends_to_try = [ + (cam_index, cv2.CAP_DSHOW), + (cam_index, cv2.CAP_ANY), + (cam_index, None), # 默认后端 + ] + + for idx, backend in backends_to_try: + try: + with warnings.catch_warnings(): + warnings.filterwarnings('ignore', category=UserWarning) + if backend is not None: + self.cap = cv2.VideoCapture(idx, backend) + else: + self.cap = cv2.VideoCapture(idx) + + if self.cap.isOpened(): + # 测试读取一帧 + ret, test_frame = self.cap.read() + if ret and test_frame is not None: + print(f"✅ 采集卡 {cam_index} 打开成功") + break + else: + self.cap.release() + self.cap = None + except Exception as e: + if self.cap: + try: + self.cap.release() + except: + pass + self.cap = None + continue + + if self.cap is None or not self.cap.isOpened(): + print(f"❌ 无法打开采集卡 {cam_index}") + print("请检查:") + print(" 1. 采集卡是否正确连接") + print(" 2. 采集卡索引是否正确(尝试扫描采集卡)") + print(" 3. 采集卡驱动是否安装") + print(" 4. 采集卡是否被其他程序占用") + self.cap = None + return + + # 设置分辨率 + try: + self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width) + self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height) + # 实际获取设置后的分辨率 + actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + print(f" 分辨率设置: {width}x{height} -> 实际: {actual_width}x{actual_height}") + except Exception as e: + print(f"⚠️ 设置分辨率失败: {e}") + # 启动更新线程 threading.Thread(target=self.update, daemon=True).start() @@ -71,11 +121,18 @@ class GetImage: def update(self): while self.running and self.cap is not None: - ret, frame = self.cap.read() - if ret: - self.frame = frame - else: - print(f"⚠️ 采集卡 {self.cam_index} 读取失败") + try: + ret, frame = self.cap.read() + if ret and frame is not None: + self.frame = frame + else: + # 读取失败时不打印,避免刷屏 + pass + except Exception as e: + # 只在异常时打印错误 + print(f"⚠️ 采集卡 {self.cam_index} 读取异常: {e}") + import time + time.sleep(0.1) # 出错时短暂延迟 def get_frame(self): if self.cap is None or self.frame is None: