diff --git a/main_multi.py b/main_multi.py index 41b9efd..b7b2208 100644 --- a/main_multi.py +++ b/main_multi.py @@ -81,6 +81,21 @@ def main(): for idx in selected_indices: group = groups[idx] logger.info(f" • {group['name']} (串口:{group['serial_port']}, 采集卡:{group['camera_index']})") + + # 串口冲突预检:同一串口被多个组占用通常会导致仅一路成功 + port_to_groups = {} + for idx in selected_indices: + g = groups[idx] + port_to_groups.setdefault(g['serial_port'], []).append(g['name']) + conflicts = {p: names for p, names in port_to_groups.items() if p and len(names) > 1} + if conflicts: + logger.warning("⚠️ 检测到串口冲突(同一COM被多个组使用):") + for p, names in conflicts.items(): + logger.warning(f" {p}: {', '.join(names)}") + go_on = input("上述冲突很可能导致仅一组成功,其它失败。仍要继续? (y/n): ").strip().lower() + if go_on != 'y': + logger.info("已取消启动以避免串口冲突") + return confirm = input("\n确认启动? (y/n): ").strip().lower() if confirm != 'y': diff --git a/test_capture_card.py b/test_capture_card.py new file mode 100644 index 0000000..9510d07 --- /dev/null +++ b/test_capture_card.py @@ -0,0 +1,796 @@ +""" +采集卡截图测试类 +用于测试采集卡的分辨率和色差 +保持与实际代码相同的实现逻辑 +""" +import time +import threading +import warnings +import os +import sys +import io +import cv2 +import numpy as np +from PIL import Image +from utils.logger import logger, throttle +import logging + +# 抑制OpenCV的警告信息(兼容不同版本) +os.environ['OPENCV_LOG_LEVEL'] = 'SILENT' +os.environ['OPENCV_IO_ENABLE_OPENEXR'] = '0' + +try: + if hasattr(cv2, 'setLogLevel'): + if hasattr(cv2, 'LOG_LEVEL_SILENT'): + cv2.setLogLevel(cv2.LOG_LEVEL_SILENT) + elif hasattr(cv2, 'LOG_LEVEL_ERROR'): + cv2.setLogLevel(cv2.LOG_LEVEL_ERROR) + elif hasattr(cv2, 'utils'): + cv2.utils.setLogLevel(0) +except Exception: + pass + + +class CaptureCardTester: + """ + 采集卡测试类 + 使用与实际代码相同的实现逻辑 + """ + + def __init__(self, cam_index=0, width=1920, height=1080): + """ + 初始化采集卡测试器 + :param cam_index: 采集卡索引 + :param width: 期望宽度 + :param height: 期望高度 + """ + logger.info(f"🔧 正在初始化采集卡测试器 {cam_index}...") + self.cap = None + self.frame = None + self.running = True + self.cam_index = cam_index + self.expected_width = width + self.expected_height = height + self.actual_width = None + self.actual_height = None + + # 尝试多种方式打开采集卡(与实际代码相同) + backends_to_try = [ + (cam_index, cv2.CAP_DSHOW), + (cam_index, cv2.CAP_ANY), + (cam_index, None), # 默认后端 + ] + + # 重定向stderr来抑制OpenCV的错误输出 + old_stderr = sys.stderr + suppressed_output = io.StringIO() + + try: + sys.stderr = suppressed_output + + for idx, backend in backends_to_try: + try: + with warnings.catch_warnings(): + warnings.filterwarnings('ignore', category=UserWarning) + if backend is not None: + self.cap = cv2.VideoCapture(idx, backend) + else: + self.cap = cv2.VideoCapture(idx) + + if self.cap.isOpened(): + # 测试读取一帧 + ret, test_frame = self.cap.read() + if ret and test_frame is not None: + logger.info(f"✅ 采集卡 {cam_index} 打开成功") + break + else: + self.cap.release() + self.cap = None + except Exception as e: + if self.cap: + try: + self.cap.release() + except: + pass + self.cap = None + continue + finally: + # 恢复stderr + sys.stderr = old_stderr + + if self.cap is None or not self.cap.isOpened(): + logger.error(f"❌ 无法打开采集卡 {cam_index}") + logger.error("请检查:\n 1. 采集卡是否正确连接\n 2. 采集卡索引是否正确(尝试扫描采集卡)\n 3. 采集卡驱动是否安装\n 4. 采集卡是否被其他程序占用") + self.cap = None + return + + # 设置分辨率(与实际代码相同) + try: + self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width) + self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height) + # 实际获取设置后的分辨率 + self.actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + self.actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + logger.info(f" 分辨率设置: {width}x{height} -> 实际: {self.actual_width}x{self.actual_height}") + except Exception as e: + logger.warning(f"⚠️ 设置分辨率失败: {e}") + + # 启动更新线程(与实际代码相同) + threading.Thread(target=self.update, daemon=True).start() + + # 等待几帧确保采集卡正常工作 + time.sleep(1.0) + logger.info(f"✅ 采集卡 {cam_index} 初始化完成") + + def update(self): + """持续更新帧(与实际代码相同)""" + while self.running and self.cap is not None: + try: + ret, frame = self.cap.read() + if ret and frame is not None: + self.frame = frame + # 限制读取频率,避免占满CPU + time.sleep(0.008) + else: + # 读取失败时不打印,避免刷屏 + time.sleep(0.02) + except Exception as e: + # 只在异常时打印错误 + throttle(f"cap_read_err_{self.cam_index}", 2.0, logging.WARNING, f"⚠️ 采集卡 {self.cam_index} 读取异常: {e}") + time.sleep(0.1) # 出错时短暂延迟 + + def get_frame(self): + """ + 获取处理后的帧(与实际代码相同) + 返回: [im_opencv, im_PIL] 或 None + """ + if self.cap is None or self.frame is None: + return None + try: + im_opencv = cv2.cvtColor(self.frame, cv2.COLOR_BGR2RGB) + im_opencv = im_opencv[30:30+720, 0:1280] # 裁剪尺寸(与实际代码相同) + im_PIL = Image.fromarray(im_opencv) + return [im_opencv, im_PIL] + except Exception as e: + throttle(f"img_proc_err_{self.cam_index}", 2.0, logging.WARNING, f"⚠️ 图像处理错误: {e}") + return None + + def get_raw_frame(self): + """ + 获取原始帧(未裁剪) + 返回: numpy array 或 None + """ + if self.cap is None or self.frame is None: + return None + return self.frame.copy() + + def test_resolution(self): + """ + 测试分辨率 + 返回分辨率信息字典 + """ + if self.cap is None: + logger.error("采集卡未初始化") + return None + + result = { + 'expected': (self.expected_width, self.expected_height), + 'actual_cap': (self.actual_width, self.actual_height), + 'actual_frame': None, + 'cropped_frame': None, + 'match': False + } + + # 获取原始帧尺寸 + raw_frame = self.get_raw_frame() + if raw_frame is not None: + h, w = raw_frame.shape[:2] + result['actual_frame'] = (w, h) + + # 获取裁剪后帧尺寸 + processed = self.get_frame() + if processed is not None: + im_opencv = processed[0] + h, w = im_opencv.shape[:2] + result['cropped_frame'] = (w, h) + + # 检查分辨率是否匹配 + if result['actual_cap'] is not None: + result['match'] = (result['actual_cap'][0] == self.expected_width and + result['actual_cap'][1] == self.expected_height) + + return result + + def test_color_difference(self, frame1=None, frame2=None): + """ + 测试色差 + :param frame1: 第一帧(可选,不提供则使用当前帧) + :param frame2: 第二帧(可选,不提供则等待一帧后获取) + :return: 色差信息字典 + """ + if frame1 is None: + frame1 = self.get_frame() + if frame1 is None: + logger.error("无法获取第一帧") + return None + frame1 = frame1[0] # 使用opencv格式 + + if frame2 is None: + # 等待一小段时间获取新帧 + time.sleep(0.1) + frame2 = self.get_frame() + if frame2 is None: + logger.error("无法获取第二帧") + return None + frame2 = frame2[0] # 使用opencv格式 + + # 确保两帧尺寸相同 + if frame1.shape != frame2.shape: + logger.warning(f"两帧尺寸不同: {frame1.shape} vs {frame2.shape}") + # 调整尺寸 + h, w = min(frame1.shape[0], frame2.shape[0]), min(frame1.shape[1], frame2.shape[1]) + frame1 = frame1[:h, :w] + frame2 = frame2[:h, :w] + + # 计算色差 + diff = cv2.absdiff(frame1, frame2) + diff_gray = cv2.cvtColor(diff, cv2.COLOR_RGB2GRAY) + + # 计算统计信息 + mean_diff = np.mean(diff) + std_diff = np.std(diff) + max_diff = np.max(diff) + mean_diff_gray = np.mean(diff_gray) + + # 计算RGB各通道的平均色差 + mean_diff_r = np.mean(diff[:, :, 0]) + mean_diff_g = np.mean(diff[:, :, 1]) + mean_diff_b = np.mean(diff[:, :, 2]) + + # 计算PSNR(峰值信噪比) + mse = np.mean((frame1.astype(float) - frame2.astype(float)) ** 2) + if mse == 0: + psnr = float('inf') + else: + psnr = 20 * np.log10(255.0 / np.sqrt(mse)) + + result = { + 'mean_diff': float(mean_diff), + 'std_diff': float(std_diff), + 'max_diff': int(max_diff), + 'mean_diff_gray': float(mean_diff_gray), + 'mean_diff_r': float(mean_diff_r), + 'mean_diff_g': float(mean_diff_g), + 'mean_diff_b': float(mean_diff_b), + 'psnr': float(psnr), + 'mse': float(mse), + 'diff_image': diff, + 'diff_gray': diff_gray + } + + return result + + def test_color_stability(self, num_frames=10, interval=0.1): + """ + 测试颜色稳定性(连续多帧的色差) + :param num_frames: 测试帧数 + :param interval: 帧间隔(秒) + :return: 稳定性统计信息 + """ + frames = [] + logger.info(f"开始采集 {num_frames} 帧用于稳定性测试...") + + for i in range(num_frames): + frame = self.get_frame() + if frame is None: + logger.warning(f"无法获取第 {i+1} 帧") + continue + frames.append(frame[0]) # 使用opencv格式 + if i < num_frames - 1: + time.sleep(interval) + + if len(frames) < 2: + logger.error("采集的帧数不足") + return None + + # 计算所有帧之间的平均色差 + all_diffs = [] + for i in range(len(frames) - 1): + diff_result = self.test_color_difference(frames[i], frames[i+1]) + if diff_result: + all_diffs.append(diff_result['mean_diff']) + + if not all_diffs: + return None + + result = { + 'num_frames': len(frames), + 'avg_mean_diff': float(np.mean(all_diffs)), + 'std_mean_diff': float(np.std(all_diffs)), + 'min_mean_diff': float(np.min(all_diffs)), + 'max_mean_diff': float(np.max(all_diffs)), + 'all_diffs': [float(d) for d in all_diffs] + } + + return result + + def print_resolution_test(self): + """打印分辨率测试结果""" + print("\n" + "="*60) + print("分辨率测试结果") + print("="*60) + result = self.test_resolution() + if result is None: + print("❌ 测试失败:采集卡未初始化") + return + + print(f"期望分辨率: {result['expected'][0]} x {result['expected'][1]}") + if result['actual_cap']: + print(f"实际分辨率(采集卡): {result['actual_cap'][0]} x {result['actual_cap'][1]}") + match_str = "✅ 匹配" if result['match'] else "❌ 不匹配" + print(f"分辨率匹配: {match_str}") + + if result['actual_frame']: + print(f"实际分辨率(原始帧): {result['actual_frame'][0]} x {result['actual_frame'][1]}") + + if result['cropped_frame']: + print(f"裁剪后分辨率: {result['cropped_frame'][0]} x {result['cropped_frame'][1]}") + print("="*60 + "\n") + + def print_color_test(self): + """打印色差测试结果""" + print("\n" + "="*60) + print("色差测试结果(两帧对比)") + print("="*60) + result = self.test_color_difference() + if result is None: + print("❌ 测试失败:无法获取帧") + return + + print(f"平均色差: {result['mean_diff']:.2f}") + print(f"色差标准差: {result['std_diff']:.2f}") + print(f"最大色差: {result['max_diff']}") + print(f"灰度平均色差: {result['mean_diff_gray']:.2f}") + print(f"\nRGB通道平均色差:") + print(f" R通道: {result['mean_diff_r']:.2f}") + print(f" G通道: {result['mean_diff_g']:.2f}") + print(f" B通道: {result['mean_diff_b']:.2f}") + print(f"\nPSNR (峰值信噪比): {result['psnr']:.2f} dB") + print(f"MSE (均方误差): {result['mse']:.2f}") + print("="*60 + "\n") + + def print_stability_test(self, num_frames=10): + """打印稳定性测试结果""" + print("\n" + "="*60) + print(f"颜色稳定性测试结果({num_frames}帧)") + print("="*60) + result = self.test_color_stability(num_frames) + if result is None: + print("❌ 测试失败:无法获取足够的帧") + return + + print(f"测试帧数: {result['num_frames']}") + print(f"平均色差均值: {result['avg_mean_diff']:.2f}") + print(f"色差标准差: {result['std_mean_diff']:.2f}") + print(f"最小色差: {result['min_mean_diff']:.2f}") + print(f"最大色差: {result['max_mean_diff']:.2f}") + print(f"\n各帧间色差: {[f'{d:.2f}' for d in result['all_diffs']]}") + print("="*60 + "\n") + + def save_test_images(self, save_dir="test_output"): + """保存测试图像""" + os.makedirs(save_dir, exist_ok=True) + + # 保存原始帧 + raw_frame = self.get_raw_frame() + if raw_frame is not None: + cv2.imwrite(os.path.join(save_dir, "raw_frame.jpg"), raw_frame) + logger.info(f"已保存原始帧: {save_dir}/raw_frame.jpg") + + # 保存处理后的帧 + processed = self.get_frame() + if processed is not None: + cv2.imwrite(os.path.join(save_dir, "processed_frame.jpg"), cv2.cvtColor(processed[0], cv2.COLOR_RGB2BGR)) + logger.info(f"已保存处理后帧: {save_dir}/processed_frame.jpg") + + # 保存色差图 + color_diff = self.test_color_difference() + if color_diff is not None: + cv2.imwrite(os.path.join(save_dir, "color_diff.jpg"), cv2.cvtColor(color_diff['diff_image'], cv2.COLOR_RGB2BGR)) + cv2.imwrite(os.path.join(save_dir, "color_diff_gray.jpg"), color_diff['diff_gray']) + logger.info(f"已保存色差图: {save_dir}/color_diff.jpg") + logger.info(f"已保存灰度色差图: {save_dir}/color_diff_gray.jpg") + + def release(self): + """释放资源(与实际代码相同)""" + self.running = False + time.sleep(0.2) + if self.cap is not None: + self.cap.release() + cv2.destroyAllWindows() + logger.info("🔚 采集卡已释放") + + def __del__(self): + """析构函数(与实际代码相同)""" + if hasattr(self, "cap") and self.cap is not None: + try: + self.release() + except: + pass + + +class MultiCaptureCardTester: + """ + 多采集卡测试管理器 + 支持同时测试多张采集卡 + """ + + def __init__(self): + """初始化多采集卡测试管理器""" + self.testers = {} # {cam_index: CaptureCardTester} + self.config = None + + def load_from_config(self): + """从配置文件加载采集卡""" + try: + from config import config_manager + config_manager.load_config() + self.config = config_manager.config + + groups = self.config.get('groups', []) + if not groups: + logger.warning("配置文件中没有找到配置组") + return [] + + # 获取所有配置组中的采集卡 + camera_configs = [] + for group in groups: + cam_idx = group.get('camera_index') + cam_width = group.get('camera_width', 1920) + cam_height = group.get('camera_height', 1080) + name = group.get('name', f"配置组{groups.index(group)}") + + # 检查是否已存在相同索引的采集卡 + if cam_idx not in [c['index'] for c in camera_configs]: + camera_configs.append({ + 'index': cam_idx, + 'width': cam_width, + 'height': cam_height, + 'name': name, + 'group': group + }) + + return camera_configs + except Exception as e: + logger.error(f"从配置加载失败: {e}") + return [] + + def add_camera(self, cam_index, width=1920, height=1080, name=None): + """ + 添加一张采集卡到测试列表 + :param cam_index: 采集卡索引 + :param width: 宽度 + :param height: 高度 + :param name: 采集卡名称(可选) + """ + if name is None: + name = f"采集卡{cam_index}" + + if cam_index in self.testers: + logger.warning(f"采集卡 {cam_index} 已存在,将重新初始化") + self.testers[cam_index].release() + + tester = CaptureCardTester(cam_index=cam_index, width=width, height=height) + if tester.cap is not None: + tester.name = name + self.testers[cam_index] = tester + return True + else: + logger.error(f"无法初始化采集卡 {cam_index}") + return False + + def initialize_from_config(self, use_all_groups=True): + """ + 从配置初始化所有采集卡 + :param use_all_groups: 是否使用所有配置组,False则只使用活动配置组 + """ + camera_configs = self.load_from_config() + + if not camera_configs: + logger.warning("没有找到可用的采集卡配置") + return False + + # 筛选配置组 + if not use_all_groups: + camera_configs = [c for c in camera_configs if c.get('group', {}).get('active', False)] + if not camera_configs: + logger.warning("没有活动的配置组") + return False + + logger.info(f"📷 找到 {len(camera_configs)} 张采集卡配置") + + success_count = 0 + for config in camera_configs: + if self.add_camera( + cam_index=config['index'], + width=config['width'], + height=config['height'], + name=config['name'] + ): + success_count += 1 + + logger.info(f"✅ 成功初始化 {success_count}/{len(camera_configs)} 张采集卡") + return success_count > 0 + + def test_all_resolution(self): + """测试所有采集卡的分辨率""" + print("\n" + "="*60) + print("所有采集卡分辨率测试") + print("="*60) + + results = {} + for cam_index, tester in self.testers.items(): + name = getattr(tester, 'name', f"采集卡{cam_index}") + print(f"\n【{name} (索引: {cam_index})】") + result = tester.test_resolution() + results[cam_index] = result + + # 汇总结果 + print("\n" + "-"*60) + print("分辨率测试汇总") + print("-"*60) + for cam_index, result in results.items(): + if result is None: + continue + name = getattr(self.testers[cam_index], 'name', f"采集卡{cam_index}") + match_str = "✅" if result.get('match', False) else "❌" + print(f"{match_str} {name}: 期望{result['expected']} -> 实际{result.get('actual_cap', 'N/A')}") + + return results + + def test_all_color(self): + """测试所有采集卡的色差""" + print("\n" + "="*60) + print("所有采集卡色差测试") + print("="*60) + + results = {} + for cam_index, tester in self.testers.items(): + name = getattr(tester, 'name', f"采集卡{cam_index}") + print(f"\n【{name} (索引: {cam_index})】") + result = tester.test_color_difference() + results[cam_index] = result + + # 汇总结果 + print("\n" + "-"*60) + print("色差测试汇总") + print("-"*60) + for cam_index, result in results.items(): + if result is None: + continue + name = getattr(self.testers[cam_index], 'name', f"采集卡{cam_index}") + print(f"{name}: 平均色差={result['mean_diff']:.2f}, PSNR={result['psnr']:.2f}dB") + + return results + + def test_all_stability(self, num_frames=10): + """测试所有采集卡的稳定性""" + print("\n" + "="*60) + print("所有采集卡稳定性测试") + print("="*60) + + results = {} + for cam_index, tester in self.testers.items(): + name = getattr(tester, 'name', f"采集卡{cam_index}") + print(f"\n【{name} (索引: {cam_index})】") + result = tester.test_color_stability(num_frames=num_frames) + results[cam_index] = result + + # 汇总结果 + print("\n" + "-"*60) + print("稳定性测试汇总") + print("-"*60) + for cam_index, result in results.items(): + if result is None: + continue + name = getattr(self.testers[cam_index], 'name', f"采集卡{cam_index}") + print(f"{name}: 平均色差={result['avg_mean_diff']:.2f}±{result['std_mean_diff']:.2f}") + + return results + + def save_all_test_images(self, base_dir="test_output"): + """保存所有采集卡的测试图像""" + for cam_index, tester in self.testers.items(): + name = getattr(tester, 'name', f"采集卡{cam_index}") + # 清理名称中的特殊字符,用于目录名 + safe_name = "".join(c for c in name if c.isalnum() or c in (' ', '-', '_')).rstrip() + save_dir = os.path.join(base_dir, safe_name) + tester.save_test_images(save_dir=save_dir) + + def release_all(self): + """释放所有采集卡""" + for tester in self.testers.values(): + try: + tester.release() + except: + pass + self.testers.clear() + + def __del__(self): + """析构函数""" + self.release_all() + + +def scan_cameras(max_index=10): + """ + 扫描可用的采集卡 + :param max_index: 最大扫描索引 + :return: 可用采集卡索引列表 + """ + print("🔍 正在扫描采集卡...") + available = [] + + old_stderr = sys.stderr + suppressed_output = io.StringIO() + + try: + sys.stderr = suppressed_output + + for i in range(max_index): + cap = None + try: + with warnings.catch_warnings(): + warnings.filterwarnings('ignore') + cap = cv2.VideoCapture(i, cv2.CAP_DSHOW) + if cap.isOpened(): + ret, frame = cap.read() + if ret and frame is not None: + available.append(i) + print(f" ✅ 找到采集卡: 索引 {i}") + if cap: + cap.release() + except: + if cap: + try: + cap.release() + except: + pass + finally: + sys.stderr = old_stderr + + if not available: + print(" ❌ 未找到可用的采集卡") + else: + print(f"✅ 共找到 {len(available)} 张采集卡") + + return available + + +def main(): + """主测试函数""" + print("="*60) + print("采集卡截图测试工具(支持多采集卡)") + print("="*60) + + multi_tester = MultiCaptureCardTester() + + # 选择测试模式 + print("\n选择测试模式:") + print(" 1. 从配置文件加载(所有配置组)") + print(" 2. 从配置文件加载(仅活动配置组)") + print(" 3. 手动指定采集卡索引") + print(" 4. 扫描采集卡") + print(" 0. 退出") + + choice = input("\n请选择 (0-4): ").strip() + + if choice == "0": + print("👋 退出") + return + elif choice == "1": + # 从配置加载所有配置组 + if not multi_tester.initialize_from_config(use_all_groups=True): + print("❌ 无法从配置初始化采集卡") + return + elif choice == "2": + # 从配置加载活动配置组 + if not multi_tester.initialize_from_config(use_all_groups=False): + print("❌ 无法从配置初始化采集卡") + return + elif choice == "3": + # 手动指定 + indices_input = input("请输入采集卡索引(用逗号分隔,如: 0,1,2): ").strip() + try: + indices = [int(x.strip()) for x in indices_input.split(',')] + width_input = input("请输入宽度 (默认1920): ").strip() + height_input = input("请输入高度 (默认1080): ").strip() + width = int(width_input) if width_input else 1920 + height = int(height_input) if height_input else 1080 + + success_count = 0 + for idx in indices: + if multi_tester.add_camera(idx, width=width, height=height): + success_count += 1 + + if success_count == 0: + print("❌ 无法初始化任何采集卡") + return + except ValueError: + print("❌ 输入格式错误") + return + elif choice == "4": + # 扫描采集卡 + available = scan_cameras() + if not available: + return + + indices_input = input(f"请输入要测试的采集卡索引(用逗号分隔,可用: {available}): ").strip() + try: + indices = [int(x.strip()) for x in indices_input.split(',')] + # 验证索引有效性 + indices = [i for i in indices if i in available] + if not indices: + print("❌ 没有有效的采集卡索引") + return + + width_input = input("请输入宽度 (默认1920): ").strip() + height_input = input("请输入高度 (默认1080): ").strip() + width = int(width_input) if width_input else 1920 + height = int(height_input) if height_input else 1080 + + success_count = 0 + for idx in indices: + if multi_tester.add_camera(idx, width=width, height=height): + success_count += 1 + + if success_count == 0: + print("❌ 无法初始化任何采集卡") + return + except ValueError: + print("❌ 输入格式错误") + return + else: + print("❌ 无效选择") + return + + if not multi_tester.testers: + print("❌ 没有可用的采集卡") + return + + try: + # 等待采集卡稳定 + print("\n等待采集卡稳定...") + time.sleep(1.0) + + # 测试分辨率 + multi_tester.test_all_resolution() + + # 等待一下确保采集卡稳定 + time.sleep(0.5) + + # 测试色差 + multi_tester.test_all_color() + + # 测试稳定性 + multi_tester.test_all_stability(num_frames=10) + + # 保存测试图像 + multi_tester.save_all_test_images() + + print("\n✅ 所有测试完成!") + print("按 Enter 键退出...") + input() + + except KeyboardInterrupt: + print("\n\n用户中断测试") + except Exception as e: + logger.error(f"测试过程中发生错误: {e}") + import traceback + traceback.print_exc() + finally: + multi_tester.release_all() + + +if __name__ == "__main__": + main() + diff --git a/yolo_test.py b/yolo_test.py index 58c4435..b58c497 100644 --- a/yolo_test.py +++ b/yolo_test.py @@ -1,55 +1,377 @@ import cv2 -from utils.get_image import get_image +from utils.get_image import GetImage from ultralytics import YOLO +from config import config_manager +from utils.logger import logger +import os +import numpy as np -model = YOLO(r"best0.pt").to('cuda') +# 检查模型文件是否存在 +model_path = r"best0.pt" +if not os.path.exists(model_path): + print(f"❌ 模型文件不存在: {model_path}") + exit(1) -def yolo_shibie(im_PIL, detections): - results = model(im_PIL) - result = results[0] +# 加载YOLO模型 +try: + model = YOLO(model_path).to('cuda') + print(f"✅ 模型加载成功: {model_path}") +except Exception as e: + print(f"❌ 模型加载失败: {e}") + exit(1) - # ✅ 获取绘制好框的图像 - frame_with_boxes = result.plot() +def enhance_sharpness(image, strength=1.5): + """ + 增强图像锐度 + :param image: 输入图像(BGR格式) + :param strength: 锐化强度(1.0-3.0,默认1.5) + :return: 锐化后的图像 + """ + # 创建锐化核 + kernel = np.array([[-1, -1, -1], + [-1, 9*strength, -1], + [-1, -1, -1]]) / (9*strength - 8) + sharpened = cv2.filter2D(image, -1, kernel) + return sharpened - # ✅ 用 OpenCV 动态显示 - cv2.imshow("YOLO实时检测", frame_with_boxes) - # ESC 或 Q 键退出 - if cv2.waitKey(1) & 0xFF in [27, ord('q')]: - return None +def enhance_contrast(image, alpha=1.2, beta=10): + """ + 增强对比度和亮度 + :param image: 输入图像 + :param alpha: 对比度控制(1.0-3.0,默认1.2) + :param beta: 亮度控制(-100到100,默认10) + :return: 增强后的图像 + """ + return cv2.convertScaleAbs(image, alpha=alpha, beta=beta) - # ✅ 提取检测信息 - for i in range(len(result.boxes.xyxy)): - left, top, right, bottom = result.boxes.xyxy[i] - cls_id = int(result.boxes.cls[i]) - label = result.names[cls_id] - if label in ['center', 'next', 'npc1', 'npc2', 'npc3', 'npc4', 'boss', 'zhaozi']: - player_x = int(left + (right - left) / 2) + 3 - player_y = int(top + (bottom - top) / 2) + 40 - detections[label] = [player_x, player_y] - elif label in ['daojv', 'gw']: - player_x = int(left + (right - left) / 2) + 3 - player_y = int(top + (bottom - top) / 2) + 40 - detections[label].append([player_x, player_y]) +def denoise_image(image, method='bilateral'): + """ + 去噪处理 + :param image: 输入图像 + :param method: 去噪方法 ('bilateral', 'gaussian', 'fastNlMeans') + :return: 去噪后的图像 + """ + if method == 'bilateral': + # 双边滤波,保留边缘的同时去噪 + return cv2.bilateralFilter(image, 9, 75, 75) + elif method == 'gaussian': + # 高斯模糊去噪 + return cv2.GaussianBlur(image, (5, 5), 0) + elif method == 'fastNlMeans': + # 非局部均值去噪(效果最好但较慢) + return cv2.fastNlMeansDenoisingColored(image, None, 10, 10, 7, 21) + return image + +def apply_enhancements(image, sharpness=True, contrast=True, denoise=True, + sharp_strength=1.5, contrast_alpha=1.2, contrast_beta=10, + denoise_method='bilateral'): + """ + 应用所有图像增强 + :param image: 输入图像(BGR格式) + :param sharpness: 是否锐化 + :param contrast: 是否增强对比度 + :param denoise: 是否去噪 + :param sharp_strength: 锐化强度 + :param contrast_alpha: 对比度系数 + :param contrast_beta: 亮度调整 + :param denoise_method: 去噪方法 + :return: 增强后的图像 + """ + enhanced = image.copy() + + if denoise: + enhanced = denoise_image(enhanced, denoise_method) + + if contrast: + enhanced = enhance_contrast(enhanced, contrast_alpha, contrast_beta) + + if sharpness: + enhanced = enhance_sharpness(enhanced, sharp_strength) + + return enhanced + + +def set_camera_properties(cap, brightness=None, contrast=None, saturation=None, + sharpness=None, gain=None, exposure=None): + """ + 设置采集卡硬件参数 + :param cap: VideoCapture对象 + :param brightness: 亮度 (0-100) + :param contrast: 对比度 (0-100) + :param saturation: 饱和度 (0-100) + :param sharpness: 锐度 (0-100) + :param gain: 增益 (0-100) + :param exposure: 曝光 (通常为负值,如-6) + """ + props = { + cv2.CAP_PROP_BRIGHTNESS: brightness, + cv2.CAP_PROP_CONTRAST: contrast, + cv2.CAP_PROP_SATURATION: saturation, + cv2.CAP_PROP_SHARPNESS: sharpness, + cv2.CAP_PROP_GAIN: gain, + cv2.CAP_PROP_EXPOSURE: exposure, + } + + for prop, value in props.items(): + if value is not None: + try: + cap.set(prop, value) + actual = cap.get(prop) + logger.info(f" 设置 {prop.name if hasattr(prop, 'name') else prop}: {value} -> 实际: {actual:.2f}") + except Exception as e: + logger.warning(f" ⚠️ 设置参数 {prop} 失败: {e}") + + +def yolo_shibie(im_PIL, im_opencv_rgb, raw_frame_bgr, detections, model, show_original=True, + enhance_enabled=False, enhance_params=None): + """ + YOLO识别函数 + :param im_PIL: PIL图像对象 + :param im_opencv_rgb: RGB格式的OpenCV图像(裁剪后) + :param raw_frame_bgr: 原始BGR格式的OpenCV图像(未裁剪,与raw_frame.jpg一致) + :param detections: 检测结果字典 + :param model: YOLO模型 + :param show_original: 是否同时显示原始帧 + :return: 更新后的detections字典,如果用户退出则返回None + """ + if im_PIL is None: + return detections + + try: + results = model(im_PIL) + result = results[0] + + # ✅ 获取绘制好框的图像(RGB格式) + frame_with_boxes_rgb = result.plot() + + # ✅ 转换为BGR格式用于OpenCV显示 + frame_with_boxes_bgr = cv2.cvtColor(frame_with_boxes_rgb, cv2.COLOR_RGB2BGR) + + # 应用图像增强(如果启用) + display_frame = frame_with_boxes_bgr.copy() + if enhance_enabled and enhance_params: + try: + display_frame = apply_enhancements(display_frame, **enhance_params) + except Exception as e: + print(f"⚠️ 图像增强失败: {e}") + + # 显示画面 + if show_original and raw_frame_bgr is not None: + # 同时显示原始帧和检测结果(并排显示) + # 调整原始帧大小以匹配裁剪后的检测结果 + h, w = display_frame.shape[:2] + # 裁剪原始帧(与get_frame的处理一致:30:30+720, 0:1280) + raw_height, raw_width = raw_frame_bgr.shape[:2] + crop_top = 30 + crop_bottom = min(crop_top + h, raw_height) + crop_right = min(w, raw_width) + raw_cropped = raw_frame_bgr[crop_top:crop_bottom, 0:crop_right] + + # 如果尺寸不匹配,调整原始帧大小 + if raw_cropped.shape[:2] != (h, w): + raw_cropped = cv2.resize(raw_cropped, (w, h)) + + # 并排显示:原始帧(左) | 检测结果(右) + # 原始帧已经是BGR格式,检测结果也是BGR格式,可以直接拼接 + combined = cv2.hconcat([raw_cropped, display_frame]) + cv2.imshow("Original BGR (Left) | YOLO Detection (Right)", combined) + else: + # 只显示检测结果 + cv2.imshow("YOLO Real-time Detection", display_frame) + + # ✅ 提取检测信息 + if result.boxes is not None and len(result.boxes.xyxy) > 0: + for i in range(len(result.boxes.xyxy)): + try: + left = float(result.boxes.xyxy[i][0]) + top = float(result.boxes.xyxy[i][1]) + right = float(result.boxes.xyxy[i][2]) + bottom = float(result.boxes.xyxy[i][3]) + cls_id = int(result.boxes.cls[i]) + label = result.names[cls_id] + + if label in ['center', 'next', 'npc1', 'npc2', 'npc3', 'npc4', 'boss', 'zhaozi']: + player_x = int(left + (right - left) / 2) + 3 + player_y = int(top + (bottom - top) / 2) + 40 + detections[label] = [player_x, player_y] + elif label in ['daojv', 'gw']: + player_x = int(left + (right - left) / 2) + 3 + player_y = int(top + (bottom - top) / 2) + 40 + # 确保列表存在 + if label not in detections: + detections[label] = [] + detections[label].append([player_x, player_y]) + except Exception as e: + print(f"⚠️ 处理检测框时出错: {e}") + continue + + except Exception as e: + print(f"⚠️ YOLO检测出错: {e}") + return detections -while True: - detections = { - 'center': None, 'next': None, - 'npc1': None, 'npc2': None, 'npc3': None, 'npc4': None, - 'boss': None, 'zhaozi': None, - 'daojv': [], 'gw': [] - } +def main(): + """主函数""" + print("="*60) + print("YOLO实时检测测试") + print("="*60) + + # 从配置加载采集卡设置 + active_group = config_manager.get_active_group() + + if active_group is None: + print("⚠️ 没有活动的配置组,使用默认设置") + print("提示: 可以运行 python gui_config.py 设置配置") + cam_index = 0 + width = 1920 + height = 1080 + else: + print(f"📋 使用配置组: {active_group['name']}") + cam_index = active_group['camera_index'] + width = active_group['camera_width'] + height = active_group['camera_height'] + + print(f" 采集卡索引: {cam_index}") + print(f" 分辨率: {width}x{height}") + print() + + # 初始化采集卡 + print("🔧 正在初始化采集卡...") + get_image = GetImage( + cam_index=cam_index, + width=width, + height=height + ) + + if get_image.cap is None: + print("❌ 采集卡初始化失败") + print("请检查:") + print("1. 采集卡是否正确连接") + print("2. 采集卡索引是否正确") + print("3. 采集卡驱动是否安装") + return + + # 设置采集卡硬件参数以提高清晰度(可选) + print("\n🔧 设置采集卡参数以提高清晰度...") + print("提示: 可以根据实际情况调整这些参数") + set_camera_properties( + get_image.cap, + brightness=50, # 亮度 (0-100) + contrast=50, # 对比度 (0-100) + saturation=55, # 饱和度 (0-100) + sharpness=60, # 锐度 (0-100,提高清晰度) + gain=None, # 增益 (根据实际情况调整) + exposure=None # 曝光 (根据实际情况调整,通常为负值) + ) + + print("✅ 采集卡初始化成功") + print("\n快捷键:") + print(" 'q' 或 ESC - 退出") + print(" 'o' - 切换原始帧对比模式") + print(" 'e' - 切换图像增强") + print(" '1'/'2' - 调整锐化强度 (+/-0.1)") + print(" '3'/'4' - 调整对比度 (+/-0.1)") + print() + + try: + frame_count = 0 + show_original = True # 默认同时显示原始帧和检测结果 + enhance_enabled = False # 默认关闭图像增强 + + # 图像增强参数 + enhance_params = { + 'sharpness': True, + 'contrast': True, + 'denoise': True, + 'sharp_strength': 1.5, + 'contrast_alpha': 1.2, + 'contrast_beta': 10, + 'denoise_method': 'bilateral' + } + + while True: + # 获取帧 + frame_data = get_image.get_frame() + + if frame_data is None: + print("⚠️ 无法获取帧,跳过...") + continue + + # frame_data 是 [im_opencv_rgb, im_PIL] 格式 + # im_opencv_rgb 已经是RGB格式(经过BGR2RGB转换) + im_opencv_rgb, im_PIL = frame_data + + if im_PIL is None: + print("⚠️ PIL图像为空,跳过...") + continue + + # 获取原始BGR帧(与test_capture_card.py保存的raw_frame.jpg一致) + raw_frame_bgr = None + if get_image.cap is not None and get_image.frame is not None: + raw_frame_bgr = get_image.frame.copy() # 原始BGR格式,未裁剪 + + # 初始化检测结果字典 + detections = { + 'center': None, 'next': None, + 'npc1': None, 'npc2': None, 'npc3': None, 'npc4': None, + 'boss': None, 'zhaozi': None, + 'daojv': [], 'gw': [] + } + + # 执行YOLO检测 + detections = yolo_shibie(im_PIL, im_opencv_rgb, raw_frame_bgr, detections, model, + show_original, enhance_enabled, enhance_params) + + # 检查按键 + key = cv2.waitKey(1) & 0xFF + if key in [27, ord('q'), ord('Q')]: + print("\n用户退出") + break + elif key == ord('o') or key == ord('O'): + show_original = not show_original + print(f"切换显示模式: {'原始帧对比' if show_original else '仅检测结果'}") + elif key == ord('e') or key == ord('E'): + enhance_enabled = not enhance_enabled + status = "开启" if enhance_enabled else "关闭" + print(f"图像增强: {status} (锐化={enhance_params['sharp_strength']:.1f}, " + f"对比度={enhance_params['contrast_alpha']:.1f})") + elif key == ord('1'): + enhance_params['sharp_strength'] = min(3.0, enhance_params['sharp_strength'] + 0.1) + print(f"锐化强度: {enhance_params['sharp_strength']:.1f}") + elif key == ord('2'): + enhance_params['sharp_strength'] = max(0.5, enhance_params['sharp_strength'] - 0.1) + print(f"锐化强度: {enhance_params['sharp_strength']:.1f}") + elif key == ord('3'): + enhance_params['contrast_alpha'] = min(3.0, enhance_params['contrast_alpha'] + 0.1) + print(f"对比度: {enhance_params['contrast_alpha']:.1f}") + elif key == ord('4'): + enhance_params['contrast_alpha'] = max(0.5, enhance_params['contrast_alpha'] - 0.1) + print(f"对比度: {enhance_params['contrast_alpha']:.1f}") + + frame_count += 1 + if frame_count % 30 == 0: # 每30帧打印一次 + print(f"📊 已处理 {frame_count} 帧") + # 打印有检测到的目标 + detected_items = {k: v for k, v in detections.items() if v is not None and v != []} + if detected_items: + print(f" 检测到: {detected_items}") + + except KeyboardInterrupt: + print("\n\n用户中断测试") + except Exception as e: + print(f"\n❌ 测试过程中发生错误: {e}") + import traceback + traceback.print_exc() + finally: + # 清理资源 + get_image.release() + cv2.destroyAllWindows() + print("🔚 测试结束") - im_opencv = get_image.get_frame() # [RGB, PIL] - detections = yolo_shibie(im_opencv[1], detections) - if detections is None: # 用户退出 - break - - print(detections) - -cv2.destroyAllWindows() +if __name__ == "__main__": + main()