Files
huojv/preview.py
2025-10-29 18:11:25 +08:00

703 lines
32 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import cv2
import tkinter as tk
from tkinter import Canvas
from PIL import Image, ImageTk
import threading
import numpy as np
import warnings
import os
from config import config_manager
# 抑制OpenCV的警告信息兼容不同版本
import sys
import io
os.environ['OPENCV_LOG_LEVEL'] = 'SILENT'
os.environ['OPENCV_IO_ENABLE_OPENEXR'] = '0'
try:
if hasattr(cv2, 'setLogLevel'):
if hasattr(cv2, 'LOG_LEVEL_SILENT'):
cv2.setLogLevel(cv2.LOG_LEVEL_SILENT)
elif hasattr(cv2, 'LOG_LEVEL_ERROR'):
cv2.setLogLevel(cv2.LOG_LEVEL_ERROR)
elif hasattr(cv2, 'utils'):
cv2.utils.setLogLevel(0)
except Exception:
pass
class PreviewWindow:
"""采集卡预览窗口"""
def __init__(self):
self.config = config_manager.config
self.caps = {}
self.frames = {}
self.large_window = None
self.running = True
def init_cameras(self):
"""初始化所有相机"""
print("🔧 开始初始化采集卡...")
loaded_count = 0
# 如果没有活动配置,加载所有配置
active_groups = [g for g in self.config['groups'] if g.get('active', True)]
if not active_groups:
print("⚠️ 没有活动的配置组,将尝试加载所有配置组")
active_groups = self.config['groups']
# 重定向stderr来抑制OpenCV的错误输出
old_stderr = sys.stderr
suppressed_output = io.StringIO()
try:
sys.stderr = suppressed_output
for i, group in enumerate(active_groups):
try:
cam_idx = group['camera_index']
print(f" 尝试打开采集卡 {cam_idx} ({group['name']})...")
cap = None
# 尝试多种后端打开
backends_to_try = [
(int(cam_idx), cv2.CAP_DSHOW),
(int(cam_idx), cv2.CAP_ANY),
(int(cam_idx), None),
]
for idx, backend in backends_to_try:
try:
with warnings.catch_warnings():
warnings.filterwarnings('ignore')
if backend is not None:
cap = cv2.VideoCapture(idx, backend)
else:
cap = cv2.VideoCapture(idx)
if cap.isOpened():
# 测试读取一帧
ret, test_frame = cap.read()
if ret and test_frame is not None:
break
else:
cap.release()
cap = None
except Exception:
if cap:
try:
cap.release()
except:
pass
cap = None
continue
if cap and cap.isOpened():
try:
cap.set(cv2.CAP_PROP_FRAME_WIDTH, group['camera_width'])
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, group['camera_height'])
except Exception as e:
print(f" ⚠️ 设置分辨率失败: {e}")
self.caps[i] = {
'cap': cap,
'group': group,
'name': group['name']
}
loaded_count += 1
print(f" ✅ 采集卡 {cam_idx} 初始化成功")
else:
print(f" ❌ 采集卡 {cam_idx} 无法打开")
except Exception as e:
print(f" ❌ 采集卡 {group.get('camera_index', '?')} 初始化失败: {e}")
import traceback
traceback.print_exc()
finally:
# 恢复stderr
sys.stderr = old_stderr
if loaded_count == 0:
print("⚠️ 警告:没有成功加载任何采集卡!")
print("请检查:")
print("1. 采集卡是否正确连接")
print("2. 采集卡索引是否正确")
print("3. 是否有活动的配置组")
else:
print(f"✅ 成功加载 {loaded_count} 个采集卡")
def capture_frames(self):
"""每5秒截取一张图"""
import time
first_capture = True # 第一次立即截取
while self.running:
if first_capture:
first_capture = False
# 第一次立即截取,不等待
pass
else:
# 之后每5秒截取一次
time.sleep(5.0)
for idx, data in self.caps.items():
try:
ret, frame = data['cap'].read()
if ret and frame is not None:
# 裁剪到配置的区域
height, width = frame.shape[:2]
crop_top = 30
crop_bottom = min(crop_top + 720, height)
crop_width = min(1280, width)
# 确保裁剪范围有效
if crop_bottom > crop_top and crop_width > 0:
frame = frame[crop_top:crop_bottom, 0:crop_width]
# 转换颜色空间
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
self.frames[idx] = frame_rgb
print(f"📸 采集卡 {idx} 截图已更新")
else:
print(f"⚠️ 采集卡 {idx} 裁剪参数无效")
else:
# 读取失败,清除旧帧
if idx in self.frames:
self.frames[idx] = None
print(f"⚠️ 采集卡 {idx} 读取失败")
except Exception as e:
print(f"捕获帧 {idx} 错误: {e}")
import traceback
traceback.print_exc()
def create_grid_window(self):
"""创建网格窗口"""
# 重新加载配置,确保获取最新值
config_manager.load_config()
self.config = config_manager.config
# 获取显示配置
display = self.config.get('display', {})
preview_width = display.get('preview_width', 1000)
preview_height = display.get('preview_height', 700)
columns = display.get('preview_columns', 2)
rows = display.get('preview_rows', 2)
# 验证配置值是否有效(防止读取到无效的小值)
if preview_width < 100 or preview_height < 100:
print(f"⚠️ 警告: 配置中的预览尺寸无效 ({preview_width}x{preview_height}),使用默认值")
preview_width = 1000
preview_height = 700
if columns < 1 or columns > 10:
columns = 2
if rows < 1 or rows > 10:
rows = 2
# 调试输出配置值
print(f"📐 预览窗口配置: {preview_width}x{preview_height}, 网格: {columns}x{rows}")
root = tk.Tk()
root.title("采集卡预览每5秒更新- 点击放大")
root.geometry(f"{preview_width}x{preview_height}")
root.update_idletasks() # 立即更新窗口尺寸
canvas = Canvas(root, bg='black', width=preview_width, height=preview_height)
canvas.pack(fill=tk.BOTH, expand=True)
# 存储图像对象使用列表保存所有PhotoImage引用防止GC
self.photo_objects_list = []
self.photo_objects = {} # 按索引映射
self.canvas_image_items = {} # 保存canvas中的图像项ID用于更新而不是删除重建
# 用于控制调试输出(只打印前几次)
self.debug_count = 0
def update_frames_once():
"""在主线程中更新一帧使用after循环"""
if not self.running:
return
try:
# 如果没有加载任何采集卡,显示提示
if not self.caps:
canvas.delete("all")
canvas.create_text(
root.winfo_width() // 2,
root.winfo_height() // 2,
text="未找到可用的采集卡\n\n请检查:\n1. 采集卡是否正确连接\n2. 配置组是否有活动的采集卡\n3. 采集卡索引是否正确",
fill='yellow',
font=('Arial', 14),
justify=tk.CENTER
)
root.after(33, update_frames_once)
return
# 重新读取配置,确保获取最新值(修复配置读取问题)
config_manager.load_config()
display = config_manager.config.get('display', {})
current_preview_width = display.get('preview_width', 1000)
current_preview_height = display.get('preview_height', 700)
current_columns = display.get('preview_columns', 2)
current_rows = display.get('preview_rows', 2)
# 验证配置值是否有效
if current_preview_width < 100 or current_preview_height < 100:
current_preview_width = 1000
current_preview_height = 700
if current_columns < 1 or current_columns > 10:
current_columns = 2
if current_rows < 1 or current_rows > 10:
current_rows = 2
# 计算每个预览窗口的位置和大小
# 直接使用配置值作为画布尺寸
canvas_width = current_preview_width
canvas_height = current_preview_height
# 尝试获取实际的窗口尺寸,如果有效则使用(大于配置值说明可能被手动调整了)
try:
root.update_idletasks()
actual_width = root.winfo_width()
actual_height = root.winfo_height()
# 只有在获取到合理的尺寸时才使用大于100像素
if actual_width > 100 and actual_height > 100:
canvas_width = actual_width
canvas_height = actual_height
except:
pass # 如果获取失败,使用配置值
cell_width = max(10, canvas_width // current_columns)
cell_height = max(10, canvas_height // current_rows)
# 先收集所有需要显示的图像
images_to_draw = []
texts_to_draw = []
frame_idx = 0
# 确保至少有一些采集卡数据
if not self.caps:
texts_to_draw.append((canvas_width // 2, canvas_height // 2, "未找到采集卡", 'red'))
for idx in self.caps.keys():
row = frame_idx // current_columns
col = frame_idx % current_columns
x = col * cell_width
y = row * cell_height
center_x = x + cell_width // 2
center_y = y + cell_height // 2
name = self.caps[idx]['name']
# 检查帧数据
has_frame = idx in self.frames and self.frames[idx] is not None
if has_frame:
# 调整图像大小
try:
frame = self.frames[idx]
h, w = frame.shape[:2]
# 确保尺寸有效
if w <= 0 or h <= 0:
texts_to_draw.append((center_x, center_y, f"{name}\n尺寸无效", 'red'))
frame_idx += 1
continue
# 计算缩放比例,留一些边距
scale = min(cell_width / w, cell_height / h, 1.0) * 0.85
new_w = max(1, int(w * scale))
new_h = max(1, int(h * scale))
# 确保缩放后的尺寸有效
if new_w <= 0 or new_h <= 0:
texts_to_draw.append((center_x, center_y, f"{name}\n缩放失败", 'red'))
frame_idx += 1
continue
# 调试输出(仅前几次,避免刷屏)
if frame_idx == 0 and len(images_to_draw) == 0:
self.debug_count += 1
if self.debug_count <= 3: # 只打印前3次
print(f"🔍 预览调试 #{self.debug_count}:")
print(f" 配置尺寸: {current_preview_width}x{current_preview_height}")
print(f" 实际画布: {canvas_width}x{canvas_height}")
print(f" 单元格大小: {cell_width}x{cell_height}")
print(f" 原始帧: {w}x{h}")
print(f" 缩放后: {new_w}x{new_h}")
resized_frame = cv2.resize(frame, (new_w, new_h))
# 转换为PIL图像
# 确保颜色通道正确BGR -> RGB
if len(resized_frame.shape) == 3 and resized_frame.shape[2] == 3:
resized_frame = cv2.cvtColor(resized_frame, cv2.COLOR_BGR2RGB)
pil_image = Image.fromarray(resized_frame)
photo = ImageTk.PhotoImage(image=pil_image)
# 保存引用(按索引保存,方便查找)
self.photo_objects[idx] = photo
# 添加到绘制列表在列表中保存引用防止GC
images_to_draw.append((photo, center_x, center_y, idx))
texts_to_draw.append((center_x, y + 15, name, 'white'))
frame_idx += 1
except Exception as e:
# 忽略pyimage相关错误避免刷屏
if "pyimage" not in str(e).lower():
print(f"处理帧 {idx} 错误: {e}")
import traceback
traceback.print_exc()
# 如果处理失败,显示等待提示
texts_to_draw.append((center_x, center_y, f"{name}\n处理失败", 'red'))
frame_idx += 1
else:
# 显示等待提示
texts_to_draw.append((center_x, center_y, f"{name}\n等待画面...", 'gray'))
frame_idx += 1
# 先收集所有photo对象到列表中确保引用不丢失
photos_current_frame = []
# 先绘制所有图像(底层)
if self.debug_count <= 3:
print(f"📊 绘制状态: {len(images_to_draw)} 个图像, {len(texts_to_draw)} 个文本, 画布={canvas_width}x{canvas_height}")
if images_to_draw:
# 删除旧的文本和分割线但保留图像项这样PhotoImage引用不会被释放
# 只删除文本项和线条项
for item_id in list(canvas.find_all()):
item_tags = canvas.gettags(item_id)
item_type = canvas.type(item_id)
# 删除文本和线条,保留图像
if item_type in ['text', 'line']:
try:
canvas.delete(item_id)
except:
pass
# 现在更新或创建图像项
for i, item in enumerate(images_to_draw):
if len(item) == 4:
photo, x, y, idx = item
else:
photo, x, y = item[:3]
idx = None
try:
# 确保坐标在画布范围内
if 0 <= x <= canvas_width and 0 <= y <= canvas_height:
# 先保存引用到列表防止GC
photos_current_frame.append(photo)
# 更新或创建图像项
if idx in self.canvas_image_items:
# 更新现有图像项
try:
item_id = self.canvas_image_items[idx]
canvas.coords(item_id, x, y)
canvas.itemconfig(item_id, image=photo)
if self.debug_count <= 3 and i == 0:
print(f" 更新图像 #{i} (idx={idx}) 到位置 ({x}, {y})")
except:
# 如果更新失败,删除旧的并创建新的
try:
canvas.delete(self.canvas_image_items[idx])
except:
pass
item_id = canvas.create_image(x, y, image=photo, anchor='center')
self.canvas_image_items[idx] = item_id
if self.debug_count <= 3 and i == 0:
print(f" 重新创建图像 #{i} (idx={idx}) 到位置 ({x}, {y})")
else:
# 创建新图像项
item_id = canvas.create_image(x, y, image=photo, anchor='center')
self.canvas_image_items[idx] = item_id
if self.debug_count <= 3 and i == 0:
print(f" 创建图像 #{i} (idx={idx}) 到位置 ({x}, {y})")
else:
if self.debug_count <= 3:
print(f" ⚠️ 图像 #{i} 位置 ({x}, {y}) 超出画布范围")
except Exception as e:
error_msg = str(e).lower()
print(f" ❌ 绘制图像 #{i} 时出错: {type(e).__name__}: {e}")
import traceback
traceback.print_exc()
# 删除不再存在的图像项
current_indices = set(idx for item in images_to_draw if len(item) >= 4 for idx in [item[3]])
for idx in list(self.canvas_image_items.keys()):
if idx not in current_indices:
try:
canvas.delete(self.canvas_image_items[idx])
del self.canvas_image_items[idx]
except:
pass
# 保存当前帧的所有photo引用
self.photo_objects_list = photos_current_frame[:]
else:
# 如果没有图像,清空所有并显示提示
canvas.delete("all")
self.canvas_image_items.clear()
if self.debug_count <= 3:
print(" ⚠️ 没有图像可绘制")
canvas.create_text(
canvas_width // 2,
canvas_height // 2,
text="等待画面...\n\n如果长时间无画面,请检查:\n1. 采集卡是否正常工作\n2. 配置是否正确",
fill='yellow',
font=('Arial', 14),
justify=tk.CENTER
)
# 再绘制所有文本(上层)
for x, y, text, color in texts_to_draw:
try:
if 0 <= x <= canvas_width and 0 <= y <= canvas_height:
if color == 'white':
canvas.create_text(x, y, text=text, fill=color, font=('Arial', 10, 'bold'))
else:
canvas.create_text(x, y, text=text, fill=color, font=('Arial', 10))
except Exception as e:
print(f"绘制文本错误: {e}")
# 绘制更新时间提示(右上角)
try:
import datetime
update_time = datetime.datetime.now().strftime("%H:%M:%S")
canvas.create_text(
canvas_width - 10, 15,
text=f"最后更新: {update_time}",
fill='gray',
font=('Arial', 8),
anchor='ne'
)
except:
pass
# 强制更新画布显示
canvas.update_idletasks()
# 绘制分割线,区分不同的采集卡窗口
try:
# 绘制垂直分割线(列之间的分割线)
for col in range(1, current_columns):
x = col * cell_width
canvas.create_line(
x, 0,
x, canvas_height,
fill='white',
width=2,
dash=(5, 5) # 虚线效果,让分割线更明显
)
# 绘制水平分割线(行之间的分割线)
for row in range(1, current_rows):
y = row * cell_height
canvas.create_line(
0, y,
canvas_width, y,
fill='white',
width=2,
dash=(5, 5) # 虚线效果
)
except Exception as e:
print(f"绘制分割线错误: {e}")
except Exception as e:
print(f"更新帧错误: {e}")
import traceback
traceback.print_exc()
# 每1秒检查一次是否有新截图截图在另一个线程每5秒更新
root.after(1000, update_frames_once)
def on_canvas_click(event):
"""点击画布事件"""
# 重新读取配置以获取最新的columns和rows
config_manager.load_config()
display = config_manager.config.get('display', {})
click_columns = display.get('preview_columns', 2)
click_rows = display.get('preview_rows', 2)
window_width = root.winfo_width() if root.winfo_width() > 1 else preview_width
window_height = root.winfo_height() if root.winfo_height() > 1 else preview_height
cell_width = window_width // click_columns
cell_height = window_height // click_rows
col = int(event.x // cell_width)
row = int(event.y // cell_height)
index = row * click_columns + col
# 找到对应的配置
if index < len(self.caps):
idx = list(self.caps.keys())[index]
self.show_large_window(idx)
canvas.bind('<Button-1>', on_canvas_click)
# 等待窗口完全初始化后再开始更新
def start_updates():
"""延迟启动更新,确保窗口已完全显示"""
# 强制更新窗口尺寸
root.update_idletasks()
root.update()
# 等待窗口完全绘制
import time
time.sleep(0.2) # 给窗口更多时间初始化
root.update_idletasks()
root.update()
update_frames_once()
# 使用after在主线程中循环刷新延迟启动给足够时间让窗口初始化
root.after(200, start_updates)
def on_closing():
"""关闭窗口"""
self.running = False
for data in self.caps.values():
data['cap'].release()
root.destroy()
root.protocol("WM_DELETE_WINDOW", on_closing)
root.mainloop()
def show_large_window(self, idx):
"""显示大窗口(实时显示指定采集卡)"""
if self.large_window is not None and self.large_window.winfo_exists():
self.large_window.destroy()
self.large_window = tk.Toplevel()
self.large_window.title(f"放大视图 - {self.caps[idx]['name']} (实时)")
self.large_window.geometry("1280x720")
canvas = Canvas(self.large_window, bg='black')
canvas.pack(fill=tk.BOTH, expand=True)
photo_obj = {}
canvas_image_item = None # 保存canvas中的图像项ID
def update_large_once():
if not self.running or not self.large_window.winfo_exists():
return
try:
# 从采集卡实时读取帧(不依赖截图)
if idx not in self.caps:
canvas.delete("all")
canvas.create_text(
640, 360,
text="采集卡已断开",
fill='red',
font=('Arial', 16)
)
self.large_window.after(1000, update_large_once)
return
cap = self.caps[idx]['cap']
ret, frame = cap.read()
# 获取窗口大小
window_width = self.large_window.winfo_width() if self.large_window.winfo_width() > 1 else 1280
window_height = self.large_window.winfo_height() if self.large_window.winfo_height() > 1 else 720
if ret and frame is not None:
# 裁剪到配置的区域
height, width = frame.shape[:2]
crop_top = 30
crop_bottom = min(crop_top + 720, height)
crop_width = min(1280, width)
# 确保裁剪范围有效
if crop_bottom > crop_top and crop_width > 0:
frame = frame[crop_top:crop_bottom, 0:crop_width]
# 转换颜色空间
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
h, w = frame_rgb.shape[:2]
# 调整到窗口大小
scale = min(window_width / w, window_height / h)
new_w = int(w * scale)
new_h = int(h * scale)
resized_frame = cv2.resize(frame_rgb, (new_w, new_h))
pil_image = Image.fromarray(resized_frame)
photo = ImageTk.PhotoImage(image=pil_image)
# 保存引用到字典确保不被GC
photo_obj['img'] = photo
# 更新或创建图像项
if canvas_image_item is not None:
try:
# 更新现有图像项
canvas.coords(canvas_image_item, window_width // 2, window_height // 2)
canvas.itemconfig(canvas_image_item, image=photo)
except:
# 如果更新失败,删除旧的并创建新的
try:
canvas.delete(canvas_image_item)
except:
pass
canvas_image_item = canvas.create_image(
window_width // 2,
window_height // 2,
image=photo,
anchor='center'
)
else:
# 创建新图像项
canvas.delete("all") # 首次创建时清空
canvas_image_item = canvas.create_image(
window_width // 2,
window_height // 2,
image=photo,
anchor='center'
)
else:
canvas.delete("all")
canvas.create_text(
window_width // 2,
window_height // 2,
text="裁剪参数无效",
fill='red',
font=('Arial', 16)
)
else:
# 显示等待提示
canvas.delete("all")
canvas_image_item = None
canvas.create_text(
window_width // 2,
window_height // 2,
text="等待画面...",
fill='gray',
font=('Arial', 16)
)
except Exception as e:
if "pyimage" not in str(e).lower(): # 忽略pyimage错误避免刷屏
print(f"更新大窗口错误: {e}")
# 约30fps实时显示
self.large_window.after(33, update_large_once)
self.large_window.after(33, update_large_once)
def run(self):
"""运行预览"""
self.init_cameras()
if not self.caps:
# 如果没有加载任何采集卡,仍然显示窗口并显示错误信息
print("⚠️ 没有可用的采集卡,将显示错误提示窗口")
# 启动捕获线程
if self.caps:
capture_thread = threading.Thread(target=self.capture_frames, daemon=True)
capture_thread.start()
# 创建并显示网格窗口
import time
time.sleep(0.5) # 等待几帧
self.create_grid_window()
if __name__ == "__main__":
preview = PreviewWindow()
preview.run()