Files
huojv/preview.py
2025-10-29 16:53:34 +08:00

448 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import cv2
import tkinter as tk
from tkinter import Canvas
from PIL import Image, ImageTk
import threading
import numpy as np
import warnings
import os
from config import config_manager
# 抑制OpenCV的警告信息兼容不同版本
import sys
import io
os.environ['OPENCV_LOG_LEVEL'] = 'SILENT'
os.environ['OPENCV_IO_ENABLE_OPENEXR'] = '0'
try:
if hasattr(cv2, 'setLogLevel'):
if hasattr(cv2, 'LOG_LEVEL_SILENT'):
cv2.setLogLevel(cv2.LOG_LEVEL_SILENT)
elif hasattr(cv2, 'LOG_LEVEL_ERROR'):
cv2.setLogLevel(cv2.LOG_LEVEL_ERROR)
elif hasattr(cv2, 'utils'):
cv2.utils.setLogLevel(0)
except Exception:
pass
class PreviewWindow:
"""采集卡预览窗口"""
def __init__(self):
self.config = config_manager.config
self.caps = {}
self.frames = {}
self.large_window = None
self.running = True
def init_cameras(self):
"""初始化所有相机"""
print("🔧 开始初始化采集卡...")
loaded_count = 0
# 如果没有活动配置,加载所有配置
active_groups = [g for g in self.config['groups'] if g.get('active', True)]
if not active_groups:
print("⚠️ 没有活动的配置组,将尝试加载所有配置组")
active_groups = self.config['groups']
# 重定向stderr来抑制OpenCV的错误输出
old_stderr = sys.stderr
suppressed_output = io.StringIO()
try:
sys.stderr = suppressed_output
for i, group in enumerate(active_groups):
try:
cam_idx = group['camera_index']
print(f" 尝试打开采集卡 {cam_idx} ({group['name']})...")
cap = None
# 尝试多种后端打开
backends_to_try = [
(int(cam_idx), cv2.CAP_DSHOW),
(int(cam_idx), cv2.CAP_ANY),
(int(cam_idx), None),
]
for idx, backend in backends_to_try:
try:
with warnings.catch_warnings():
warnings.filterwarnings('ignore')
if backend is not None:
cap = cv2.VideoCapture(idx, backend)
else:
cap = cv2.VideoCapture(idx)
if cap.isOpened():
# 测试读取一帧
ret, test_frame = cap.read()
if ret and test_frame is not None:
break
else:
cap.release()
cap = None
except Exception:
if cap:
try:
cap.release()
except:
pass
cap = None
continue
if cap and cap.isOpened():
try:
cap.set(cv2.CAP_PROP_FRAME_WIDTH, group['camera_width'])
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, group['camera_height'])
except Exception as e:
print(f" ⚠️ 设置分辨率失败: {e}")
self.caps[i] = {
'cap': cap,
'group': group,
'name': group['name']
}
loaded_count += 1
print(f" ✅ 采集卡 {cam_idx} 初始化成功")
else:
print(f" ❌ 采集卡 {cam_idx} 无法打开")
except Exception as e:
print(f" ❌ 采集卡 {group.get('camera_index', '?')} 初始化失败: {e}")
import traceback
traceback.print_exc()
finally:
# 恢复stderr
sys.stderr = old_stderr
if loaded_count == 0:
print("⚠️ 警告:没有成功加载任何采集卡!")
print("请检查:")
print("1. 采集卡是否正确连接")
print("2. 采集卡索引是否正确")
print("3. 是否有活动的配置组")
else:
print(f"✅ 成功加载 {loaded_count} 个采集卡")
def capture_frames(self):
"""捕获帧"""
while self.running:
for idx, data in self.caps.items():
try:
ret, frame = data['cap'].read()
if ret and frame is not None:
# 裁剪到配置的区域
height, width = frame.shape[:2]
crop_top = 30
crop_bottom = min(crop_top + 720, height)
crop_width = min(1280, width)
# 确保裁剪范围有效
if crop_bottom > crop_top and crop_width > 0:
frame = frame[crop_top:crop_bottom, 0:crop_width]
# 转换颜色空间
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
self.frames[idx] = frame_rgb
else:
print(f"⚠️ 采集卡 {idx} 裁剪参数无效")
else:
# 读取失败,清除旧帧
if idx in self.frames:
self.frames[idx] = None
except Exception as e:
print(f"捕获帧 {idx} 错误: {e}")
import traceback
traceback.print_exc()
import time
time.sleep(0.01) # 避免CPU占用过高
def create_grid_window(self):
"""创建网格窗口"""
root = tk.Tk()
root.title("采集卡预览 - 点击放大")
root.geometry("1000x700")
# 获取显示配置
display = self.config.get('display', {})
preview_width = display.get('preview_width', 640)
preview_height = display.get('preview_height', 360)
columns = display.get('preview_columns', 2)
rows = display.get('preview_rows', 2)
canvas = Canvas(root, bg='black')
canvas.pack(fill=tk.BOTH, expand=True)
# 存储图像对象
self.photo_objects = {}
def update_frames_once():
"""在主线程中更新一帧使用after循环"""
if not self.running:
return
try:
# 如果没有加载任何采集卡,显示提示
if not self.caps:
canvas.delete("all")
canvas.create_text(
root.winfo_width() // 2,
root.winfo_height() // 2,
text="未找到可用的采集卡\n\n请检查:\n1. 采集卡是否正确连接\n2. 配置组是否有活动的采集卡\n3. 采集卡索引是否正确",
fill='yellow',
font=('Arial', 14),
justify=tk.CENTER
)
root.after(33, update_frames_once)
return
# 计算每个预览窗口的位置和大小
# 确保画布已完全初始化
root.update_idletasks()
canvas.update_idletasks()
# 获取画布实际尺寸
canvas_width = canvas.winfo_width()
canvas_height = canvas.winfo_height()
# 如果画布还没初始化,使用默认值
if canvas_width <= 1 or canvas_height <= 1:
canvas_width = 1000
canvas_height = 700
# 确保尺寸有效
if canvas_width < 100 or canvas_height < 100:
canvas_width = 1000
canvas_height = 700
cell_width = max(10, canvas_width // columns)
cell_height = max(10, canvas_height // rows)
# 先收集所有需要显示的图像
images_to_draw = []
texts_to_draw = []
frame_idx = 0
for idx in self.caps.keys():
row = frame_idx // columns
col = frame_idx % columns
x = col * cell_width
y = row * cell_height
center_x = x + cell_width // 2
center_y = y + cell_height // 2
name = self.caps[idx]['name']
# 检查帧数据
has_frame = idx in self.frames and self.frames[idx] is not None
if has_frame:
# 调整图像大小
try:
frame = self.frames[idx]
h, w = frame.shape[:2]
# 确保尺寸有效
if w <= 0 or h <= 0:
texts_to_draw.append((center_x, center_y, f"{name}\n尺寸无效", 'red'))
frame_idx += 1
continue
# 计算缩放比例,留一些边距
scale = min(cell_width / w, cell_height / h, 1.0) * 0.85
new_w = max(1, int(w * scale))
new_h = max(1, int(h * scale))
# 确保缩放后的尺寸有效
if new_w <= 0 or new_h <= 0:
texts_to_draw.append((center_x, center_y, f"{name}\n缩放失败", 'red'))
frame_idx += 1
continue
# 调试输出(仅前几次)
if frame_idx == 0 and len(images_to_draw) == 0:
print(f"🔍 预览调试: 画布={canvas_width}x{canvas_height}, 单元格={cell_width}x{cell_height}, 原始帧={w}x{h}, 缩放后={new_w}x{new_h}")
resized_frame = cv2.resize(frame, (new_w, new_h))
# 转换为PIL图像
# 确保颜色通道正确BGR -> RGB
if len(resized_frame.shape) == 3 and resized_frame.shape[2] == 3:
resized_frame = cv2.cvtColor(resized_frame, cv2.COLOR_BGR2RGB)
pil_image = Image.fromarray(resized_frame)
photo = ImageTk.PhotoImage(image=pil_image)
# 保持引用避免被GC
self.photo_objects[idx] = photo
images_to_draw.append((photo, center_x, center_y))
texts_to_draw.append((center_x, y + 15, name, 'white'))
frame_idx += 1
except Exception as e:
# 忽略pyimage相关错误避免刷屏
if "pyimage" not in str(e).lower():
print(f"处理帧 {idx} 错误: {e}")
import traceback
traceback.print_exc()
# 如果处理失败,显示等待提示
texts_to_draw.append((center_x, center_y, f"{name}\n处理失败", 'red'))
frame_idx += 1
else:
# 显示等待提示
texts_to_draw.append((center_x, center_y, f"{name}\n等待画面...", 'gray'))
frame_idx += 1
# 清空画布
canvas.delete("all")
# 先绘制所有图像(底层)
if images_to_draw:
if frame_idx == len(images_to_draw):
print(f"✅ 准备绘制 {len(images_to_draw)} 个图像")
for photo, x, y in images_to_draw:
try:
canvas.create_image(x, y, image=photo, anchor='center')
except Exception as e:
if "pyimage" not in str(e).lower():
print(f"绘制图像错误: {e}")
# 再绘制所有文本(上层)
for x, y, text, color in texts_to_draw:
try:
if color == 'white':
canvas.create_text(x, y, text=text, fill=color, font=('Arial', 10, 'bold'))
else:
canvas.create_text(x, y, text=text, fill=color, font=('Arial', 10))
except Exception as e:
print(f"绘制文本错误: {e}")
except Exception as e:
print(f"更新帧错误: {e}")
import traceback
traceback.print_exc()
# 约30fps
root.after(33, update_frames_once)
def on_canvas_click(event):
"""点击画布事件"""
window_width = root.winfo_width()
window_height = root.winfo_height()
cell_width = window_width // columns
cell_height = window_height // rows
col = int(event.x // cell_width)
row = int(event.y // cell_height)
index = row * columns + col
# 找到对应的配置
if index < len(self.caps):
idx = list(self.caps.keys())[index]
self.show_large_window(idx)
canvas.bind('<Button-1>', on_canvas_click)
# 等待窗口完全初始化后再开始更新
def start_updates():
"""延迟启动更新,确保窗口已完全显示"""
root.update_idletasks()
root.update()
update_frames_once()
# 使用after在主线程中循环刷新延迟启动
root.after(100, start_updates)
def on_closing():
"""关闭窗口"""
self.running = False
for data in self.caps.values():
data['cap'].release()
root.destroy()
root.protocol("WM_DELETE_WINDOW", on_closing)
root.mainloop()
def show_large_window(self, idx):
"""显示大窗口"""
if self.large_window is not None and self.large_window.winfo_exists():
self.large_window.destroy()
self.large_window = tk.Toplevel()
self.large_window.title(f"放大视图 - {self.caps[idx]['name']}")
self.large_window.geometry("1280x720")
canvas = Canvas(self.large_window, bg='black')
canvas.pack(fill=tk.BOTH, expand=True)
photo_obj = {}
def update_large_once():
if not self.running or not self.large_window.winfo_exists():
return
try:
# 获取窗口大小
window_width = self.large_window.winfo_width() if self.large_window.winfo_width() > 1 else 1280
window_height = self.large_window.winfo_height() if self.large_window.winfo_height() > 1 else 720
if idx in self.frames and self.frames[idx] is not None:
# 先处理图像,再清空画布
frame = self.frames[idx]
h, w = frame.shape[:2]
# 调整到窗口大小
scale = min(window_width / w, window_height / h)
new_w = int(w * scale)
new_h = int(h * scale)
resized_frame = cv2.resize(frame, (new_w, new_h))
# 确保颜色通道正确BGR -> RGB
if len(resized_frame.shape) == 3 and resized_frame.shape[2] == 3:
resized_frame = cv2.cvtColor(resized_frame, cv2.COLOR_BGR2RGB)
pil_image = Image.fromarray(resized_frame)
photo = ImageTk.PhotoImage(image=pil_image)
# 保存引用到字典确保不被GC
photo_obj['img'] = photo
# 清空并绘制新图像
canvas.delete("all")
canvas.create_image(window_width // 2, window_height // 2, image=photo, anchor='center')
else:
# 显示等待提示
canvas.delete("all")
canvas.create_text(
window_width // 2,
window_height // 2,
text="等待画面...",
fill='gray',
font=('Arial', 16)
)
except Exception as e:
if "pyimage" not in str(e).lower(): # 忽略pyimage错误避免刷屏
print(f"更新大窗口错误: {e}")
self.large_window.after(33, update_large_once)
self.large_window.after(33, update_large_once)
def run(self):
"""运行预览"""
self.init_cameras()
if not self.caps:
# 如果没有加载任何采集卡,仍然显示窗口并显示错误信息
print("⚠️ 没有可用的采集卡,将显示错误提示窗口")
# 启动捕获线程
if self.caps:
capture_thread = threading.Thread(target=self.capture_frames, daemon=True)
capture_thread.start()
# 创建并显示网格窗口
import time
time.sleep(0.5) # 等待几帧
self.create_grid_window()
if __name__ == "__main__":
preview = PreviewWindow()
preview.run()