Files
huojv/preview.py
2025-10-30 23:39:27 +08:00

587 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import cv2
import tkinter as tk
from tkinter import Canvas
from PIL import Image, ImageTk
import threading
import numpy as np
import warnings
import os
from config import config_manager
# 抑制OpenCV的警告信息兼容不同版本
import sys
import io
import time as _time
os.environ['OPENCV_LOG_LEVEL'] = 'SILENT'
os.environ['OPENCV_IO_ENABLE_OPENEXR'] = '0'
try:
if hasattr(cv2, 'setLogLevel'):
if hasattr(cv2, 'LOG_LEVEL_SILENT'):
cv2.setLogLevel(cv2.LOG_LEVEL_SILENT)
elif hasattr(cv2, 'LOG_LEVEL_ERROR'):
cv2.setLogLevel(cv2.LOG_LEVEL_ERROR)
elif hasattr(cv2, 'utils'):
cv2.utils.setLogLevel(0)
except Exception:
pass
# 简单日志限流
_last_log_times = {}
def log_throttle(key: str, interval_sec: float, text: str):
now = _time.time()
last = _last_log_times.get(key, 0)
if now - last >= interval_sec:
_last_log_times[key] = now
try:
print(text)
except Exception:
pass
class PreviewWindow:
"""采集卡预览窗口"""
def __init__(self):
self.config = config_manager.config
self.caps = {}
self.frames = {}
self.large_windows = {} # idx -> Toplevel
self.running = True
def init_cameras(self):
"""初始化所有相机"""
print("🔧 开始初始化采集卡...")
loaded_count = 0
# 根据配置选择加载哪些组
display = self.config.get('display', {})
use_all = bool(display.get('preview_use_all_groups', True))
if use_all:
target_groups = self.config['groups']
print(f"📷 预览全部配置组,共 {len(target_groups)}")
else:
target_groups = [g for g in self.config['groups'] if g.get('active', False)]
print(f"📷 仅预览活动配置组,共 {len(target_groups)}")
if not target_groups:
print("⚠️ 没有活动的配置组,将不显示任何采集卡。可在配置中启用‘预览全部配置组’或设置活动组")
# 重定向stderr来抑制OpenCV的错误输出
old_stderr = sys.stderr
suppressed_output = io.StringIO()
try:
sys.stderr = suppressed_output
for i, group in enumerate(target_groups):
try:
cam_idx = group['camera_index']
print(f" 尝试打开采集卡 {cam_idx} ({group['name']})...")
cap = None
# 尝试多种后端打开
backends_to_try = [
(int(cam_idx), cv2.CAP_DSHOW),
(int(cam_idx), cv2.CAP_ANY),
(int(cam_idx), None),
]
for idx, backend in backends_to_try:
try:
with warnings.catch_warnings():
warnings.filterwarnings('ignore')
if backend is not None:
cap = cv2.VideoCapture(idx, backend)
else:
cap = cv2.VideoCapture(idx)
if cap.isOpened():
# 测试读取一帧
ret, test_frame = cap.read()
if ret and test_frame is not None:
break
else:
cap.release()
cap = None
except Exception:
if cap:
try:
cap.release()
except:
pass
cap = None
continue
if cap and cap.isOpened():
try:
cap.set(cv2.CAP_PROP_FRAME_WIDTH, group['camera_width'])
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, group['camera_height'])
except Exception as e:
print(f" ⚠️ 设置分辨率失败: {e}")
self.caps[i] = {
'cap': cap,
'group': group,
'name': group['name']
}
loaded_count += 1
print(f" ✅ 采集卡 {cam_idx} 初始化成功")
else:
print(f" ❌ 采集卡 {cam_idx} 无法打开")
except Exception as e:
print(f" ❌ 采集卡 {group.get('camera_index', '?')} 初始化失败: {e}")
import traceback
traceback.print_exc()
finally:
# 恢复stderr
sys.stderr = old_stderr
if loaded_count == 0:
print("⚠️ 警告:没有成功加载任何采集卡!")
print("请检查:")
print("1. 采集卡是否正确连接")
print("2. 采集卡索引是否正确")
print("3. 是否有活动的配置组")
else:
print(f"✅ 成功加载 {loaded_count} 个采集卡")
def grab_once(self):
"""抓取每個采集卡當前單幀(裁剪後,存入 self.frames抓取结果增强debug输出"""
for idx, data in self.caps.items():
try:
cap = data['cap']
frm_name = data.get('name', str(idx))
if cap is None:
print(f"[抓取] 采集卡{idx}({frm_name}) cap为None")
self.frames[idx] = None
continue
ret, frame = cap.read()
if not ret or frame is None:
print(f"[抓取] 采集卡{idx}({frm_name}) 没抓到帧 ret={ret} frame=None")
self.frames[idx] = None
continue
height, width = frame.shape[:2]
crop_top = 30
crop_bottom = min(crop_top + 720, height)
crop_width = min(1280, width)
if crop_bottom <= crop_top or crop_width <= 0:
print(f"[抓取] 采集卡{idx}({frm_name}) 尺寸异常: H={height} W={width} 裁剪失败")
self.frames[idx] = None
continue
frame = frame[crop_top:crop_bottom, 0:crop_width]
# 额外检查:像素全黑?
if np.mean(frame) < 3:
print(f"[抓取] 采集卡{idx}({frm_name}) 帧抓到但内容接近全黑 shape={frame.shape}")
else:
print(f"[抓取] 采集卡{idx}({frm_name}) 成功 shape={frame.shape} 类型={frame.dtype}")
self.frames[idx] = frame
except Exception as e:
print(f"[抓取] 采集卡{idx}({data.get('name', str(idx))}) 抓取异常: {e}")
self.frames[idx] = None
def capture_frames(self):
"""捕获帧"""
while self.running:
for idx, data in self.caps.items():
try:
ret, frame = data['cap'].read()
if ret and frame is not None:
# 裁剪到配置的区域
height, width = frame.shape[:2]
crop_top = 30
crop_bottom = min(crop_top + 720, height)
crop_width = min(1280, width)
# 确保裁剪范围有效
if crop_bottom > crop_top and crop_width > 0:
frame = frame[crop_top:crop_bottom, 0:crop_width]
# 转换颜色空间
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
self.frames[idx] = frame_rgb
else:
print(f"⚠️ 采集卡 {idx} 裁剪参数无效")
else:
# 读取失败,清除旧帧
if idx in self.frames:
self.frames[idx] = None
except Exception as e:
print(f"捕获帧 {idx} 错误: {e}")
import traceback
traceback.print_exc()
import time
time.sleep(0.01) # 避免CPU占用过高
def create_grid_window(self, master=None):
"""创建网格窗口可挂载在主Tk下作为Toplevel弹窗"""
display = self.config.get('display', {})
preview_width = display.get('preview_width', 1000)
preview_height = display.get('preview_height', 700)
columns = display.get('preview_columns', 2)
rows = display.get('preview_rows', 2)
if master is not None:
root = tk.Toplevel(master)
else:
root = tk.Tk()
root.title("采集卡预览 - 点击放大")
root.geometry(f"{preview_width}x{preview_height}")
root.update_idletasks()
canvas = Canvas(root, bg='black', width=preview_width, height=preview_height)
canvas.pack(fill=tk.BOTH, expand=True)
# 存储图像对象
self.photo_objects = {}
# 用于控制调试输出(只打印前几次)
self.debug_count = 0
def update_frames_once():
"""在主线程中每5秒刷新画面直接显示高频采集线程生成的帧去除grab_once"""
if not self.running:
return
try:
if not self.caps:
canvas.delete("all")
canvas.create_text(
root.winfo_width() // 2,
root.winfo_height() // 2,
text="未找到可用的采集卡\n\n请检查:\n1. 采集卡是否正确连接\n2. 配置组是否有活动的采集卡\n3. 采集卡索引是否正确",
fill='yellow',
font=('Arial', 14),
justify=tk.CENTER
)
root.after(5000, update_frames_once)
return
# 计算每个预览窗口的位置和大小
# 直接使用配置值作为画布尺寸macOS上窗口尺寸可能在显示前返回默认值
canvas_width = preview_width
canvas_height = preview_height
# 尝试获取实际的窗口尺寸,如果有效则使用(大于配置值说明可能被手动调整了)
try:
root.update_idletasks()
actual_width = root.winfo_width()
actual_height = root.winfo_height()
# 只有在获取到合理的尺寸时才使用大于100像素
if actual_width > 100 and actual_height > 100:
canvas_width = actual_width
canvas_height = actual_height
except:
pass # 如果获取失败,使用配置值
cell_width = max(10, canvas_width // columns)
cell_height = max(10, canvas_height // rows)
# 先收集所有需要显示的图像
images_to_draw = []
texts_to_draw = []
frame_idx = 0
for idx in self.caps.keys():
frame = self.frames.get(idx)
has_frame = frame is not None
print(f'[DEBUG] update UI idx={idx}, has_frame={has_frame}, frame_info={getattr(frame, "shape", None)}')
name = self.caps[idx]['name']
row = frame_idx // columns
col = frame_idx % columns
x = col * cell_width
y = row * cell_height
center_x = x + cell_width // 2
center_y = y + cell_height // 2
# 如果没拿到frame临时主动再抓一次并DEBUG
if not has_frame:
print(f'[DEBUG] UI未获得frame尝试临时抓取采集卡{idx}...')
try:
cap_test = self.caps[idx]['cap']
ret, fr2 = cap_test.read()
print(f'[DEBUG] 临时抓取ret={ret}, shape={getattr(fr2, "shape", None)}')
if ret and fr2 is not None:
frame = fr2
has_frame = True
else:
frame = None
has_frame = False
except Exception as e:
print(f'[DEBUG] 临时抓取异常:{e}')
frame = None
has_frame = False
if has_frame:
try:
h, w = frame.shape[:2]
if w <= 0 or h <= 0:
detail = f"采集卡{idx}:{name}\n抓到帧但图像尺寸异常 ({w}x{h})"
texts_to_draw.append((center_x, center_y, detail, 'red'))
frame_idx += 1
continue
scale = min(cell_width / w, cell_height / h, 1.0) * 0.85
new_w = max(1, int(w * scale))
new_h = max(1, int(h * scale))
if new_w <= 0 or new_h <= 0:
detail = f"采集卡{idx}:{name}\n缩放计算异常 ({w}x{h})"
texts_to_draw.append((center_x, center_y, detail, 'red'))
frame_idx += 1
continue
display_frame = frame
if len(display_frame.shape) == 3 and display_frame.shape[2] == 3:
display_frame = cv2.cvtColor(display_frame, cv2.COLOR_BGR2RGB)
resized_frame = cv2.resize(display_frame, (new_w, new_h))
pil_image = Image.fromarray(resized_frame)
photo = ImageTk.PhotoImage(image=pil_image)
if not hasattr(self, 'photo_refs'):
self.photo_refs = {}
self.photo_refs[idx] = photo
canvas.create_image(center_x, center_y, image=photo, anchor='center')
canvas.update() # 保证实时刷新
print(f'[DEBUG] UI idx={idx}, 绘制photo id={id(photo)}, photo_refs={len(self.photo_refs)}')
images_to_draw.append((photo, center_x, center_y))
texts_to_draw.append((center_x, y + 15, name, 'white'))
frame_idx += 1
except Exception as e:
errstr = str(e)
detail = f"采集卡{idx}:{name}\n帧处理异常:{errstr}"
print(f'[DEBUG] idx={idx}, 帧处理异常:{errstr}')
texts_to_draw.append((center_x, center_y, detail, 'red'))
frame_idx += 1
else:
detail = f"采集卡{idx}:{name}\n抓取失败/暂无画面"
print(f'[DEBUG] idx={idx} 依然无可用帧,最终显示错误提示')
texts_to_draw.append((center_x, center_y, detail, 'red'))
frame_idx += 1
# 清空画布
canvas.delete("all")
# 先绘制所有图像(底层)
if images_to_draw and self.debug_count <= 3:
log_throttle("draw_prepare", 2.0, f"✅ 准备绘制 {len(images_to_draw)} 个图像到画布 ({canvas_width}x{canvas_height})")
for photo, x, y in images_to_draw:
try:
# PhotoImage强引用防止被GC
if not hasattr(self, 'photo_refs'):
self.photo_refs = {}
self.photo_refs[idx] = photo
canvas.create_image(x, y, image=photo, anchor='center')
canvas.update() # 保证实时刷新
print(f'[DEBUG] idx={idx}, photo id={id(photo)}, photo refs={len(self.photo_refs)}')
except Exception as e:
if "pyimage" not in str(e).lower():
log_throttle("draw_image_error", 2.0, f"绘制图像错误: {e}")
# 再绘制所有文本(上层)
for x, y, text, color in texts_to_draw:
try:
if color == 'white':
canvas.create_text(x, y, text=text, fill=color, font=('Arial', 10, 'bold'))
else:
canvas.create_text(x, y, text=text, fill=color, font=('Arial', 10))
except Exception as e:
log_throttle("draw_text_error", 2.0, f"绘制文本错误: {e}")
# 绘制分割线,区分不同的采集卡窗口
try:
# 绘制垂直分割线(列之间的分割线)
for col in range(1, columns):
x = col * cell_width
canvas.create_line(
x, 0,
x, canvas_height,
fill='white',
width=2,
dash=(5, 5) # 虚线效果,让分割线更明显
)
# 绘制水平分割线(行之间的分割线)
for row in range(1, rows):
y = row * cell_height
canvas.create_line(
0, y,
canvas_width, y,
fill='white',
width=2,
dash=(5, 5) # 虚线效果
)
except Exception as e:
log_throttle("draw_grid_error", 2.0, f"绘制分割线错误: {e}")
except Exception as e:
log_throttle("update_frame_error", 1.0, f"更新帧错误: {e}")
import traceback
traceback.print_exc()
# 每5秒更新一次
root.after(5000, update_frames_once)
def on_canvas_click(event):
"""点击画布事件"""
window_width = root.winfo_width()
window_height = root.winfo_height()
cell_width = window_width // columns
cell_height = window_height // rows
col = int(event.x // cell_width)
row = int(event.y // cell_height)
index = row * columns + col
# 找到对应的配置
if index < len(self.caps):
idx = list(self.caps.keys())[index]
self.show_large_window(idx)
canvas.bind('<Button-1>', on_canvas_click)
# 等待窗口完全初始化后再开始更新
def start_updates():
"""延迟启动更新,确保窗口已完全显示"""
# 强制更新窗口尺寸
root.update_idletasks()
root.update()
# 等待窗口完全绘制
import time
time.sleep(0.2) # 给窗口更多时间初始化
root.update_idletasks()
root.update()
update_frames_once()
# 使用after在主线程中循环刷新延迟启动给足够时间让窗口初始化
root.after(200, start_updates)
def on_closing():
"""关闭窗口"""
self.running = False
for data in self.caps.values():
data['cap'].release()
root.destroy()
root.protocol("WM_DELETE_WINDOW", on_closing)
# 仅在独立窗口无master时进入事件循环
if master is None:
root.mainloop()
def show_large_window(self, idx):
"""显示指定采集卡的大窗口(可多实例)"""
# 如果已存在该索引窗口,先销毁再创建,确保刷新
if idx in self.large_windows and self.large_windows[idx].winfo_exists():
try:
self.large_windows[idx].destroy()
except Exception:
pass
win = tk.Toplevel()
win.title(f"放大视图 - {self.caps[idx]['name']}")
win.geometry("1280x720")
self.large_windows[idx] = win
canvas = Canvas(win, bg='black')
canvas.pack(fill=tk.BOTH, expand=True)
photo_obj = {}
def update_large_once():
if not self.running or not win.winfo_exists():
return
try:
window_width = win.winfo_width() if win.winfo_width() > 1 else 1280
window_height = win.winfo_height() if win.winfo_height() > 1 else 720
if idx in self.frames and self.frames[idx] is not None:
frame = self.frames[idx]
h, w = frame.shape[:2]
scale = min(window_width / w, window_height / h)
new_w = int(w * scale)
new_h = int(h * scale)
resized_frame = cv2.resize(frame, (new_w, new_h))
if len(resized_frame.shape) == 3 and resized_frame.shape[2] == 3:
resized_frame = cv2.cvtColor(resized_frame, cv2.COLOR_BGR2RGB)
pil_image = Image.fromarray(resized_frame)
photo = ImageTk.PhotoImage(image=pil_image)
photo_obj['img'] = photo
canvas.delete("all")
canvas.create_image(window_width // 2, window_height // 2, image=photo, anchor='center')
else:
canvas.delete("all")
# 显示更详细的调试信息
debug_text = f"等待画面...\n采集卡: {idx}\n帧状态: {idx in self.frames}\n"
if idx in self.frames:
debug_text += f"帧数据: {self.frames[idx] is not None}"
else:
debug_text += "帧数据: 未初始化"
canvas.create_text(
window_width // 2,
window_height // 2,
text=debug_text,
fill='gray',
font=('Arial', 12),
justify='center'
)
except Exception as e:
if "pyimage" not in str(e).lower():
log_throttle("large_update_error", 1.0, f"更新大窗口错误: {e}")
# 放大視窗維持原本高頻刷新
win.after(33, update_large_once)
win.after(33, update_large_once)
def on_close():
try:
if idx in self.large_windows:
del self.large_windows[idx]
finally:
win.destroy()
win.protocol("WM_DELETE_WINDOW", on_close)
def run(self, master=None):
"""运行预览支持主Tk弹窗模式"""
self.init_cameras()
if not self.caps:
print("⚠️ 没有可用的采集卡,将显示错误提示窗口")
display = self.config.get('display', {})
multi_window = bool(display.get('preview_multi_window', False))
if self.caps:
capture_thread = threading.Thread(target=self.capture_frames, daemon=True)
capture_thread.start()
import time
time.sleep(0.5)
if multi_window:
root = tk.Toplevel(master) if master else tk.Tk()
root.title("采集卡预览 - 多窗口模式")
root.geometry("300x80")
tk.Label(root, text="已打开多窗口预览,可单独拖动/调整大小").pack(pady=20)
print("等待采集卡初始化...")
max_wait = 50
wait_count = 0
while wait_count < max_wait and not any(frame is not None for frame in self.frames.values()):
time.sleep(0.1)
wait_count += 1
if wait_count >= max_wait:
print("⚠️ 采集卡初始化超时,可能无法显示画面")
for idx in list(self.caps.keys()):
print(f"打开采集卡 {idx} 的预览窗口...")
self.show_large_window(idx)
def on_root_close():
self.running = False
for data in self.caps.values():
data['cap'].release()
try:
root.destroy()
except Exception:
pass
root.protocol("WM_DELETE_WINDOW", on_root_close)
# 仅在独立窗口无master时进入事件循环
if master is None:
root.mainloop()
else:
self.create_grid_window(master)
if __name__ == "__main__":
preview = PreviewWindow()
preview.run()