采集卡bug修复

This commit is contained in:
Ray
2025-10-29 15:34:28 +08:00
parent bcc971d528
commit 3a8873acc2
6 changed files with 738 additions and 56 deletions

View File

@@ -10,7 +10,8 @@ class ConfigGUI:
def __init__(self): def __init__(self):
self.root = tk.Tk() self.root = tk.Tk()
self.root.title("配置管理 - 火炬之光自动化") self.root.title("配置管理 - 火炬之光自动化")
self.root.geometry("800x600") self.root.geometry("1000x720")
self.root.minsize(900, 600) # 设置最小尺寸
self.selected_index = 0 self.selected_index = 0
self.setup_ui() self.setup_ui()
@@ -102,7 +103,13 @@ class ConfigGUI:
ttk.Button(save_frame, text="保存配置", command=self.save_config).pack(side=tk.LEFT, padx=5) ttk.Button(save_frame, text="保存配置", command=self.save_config).pack(side=tk.LEFT, padx=5)
ttk.Button(save_frame, text="启动预览", command=self.start_preview).pack(side=tk.LEFT, padx=5) ttk.Button(save_frame, text="启动预览", command=self.start_preview).pack(side=tk.LEFT, padx=5)
ttk.Button(save_frame, text="启动程序", command=self.start_program).pack(side=tk.LEFT, padx=5)
# 启动程序按钮组
start_frame = ttk.Frame(right_frame)
start_frame.pack(fill=tk.X, pady=5)
ttk.Button(start_frame, text="启动单个配置组", command=self.start_program).pack(side=tk.LEFT, padx=5)
ttk.Button(start_frame, text="启动多个配置组", command=self.start_multi_program).pack(side=tk.LEFT, padx=5)
def create_entry(self, parent, key, label, prefix=None): def create_entry(self, parent, key, label, prefix=None):
"""创建输入框""" """创建输入框"""
@@ -192,22 +199,57 @@ class ConfigGUI:
def scan_cameras(self, max_index: int = 10): def scan_cameras(self, max_index: int = 10):
"""扫描系统可用的采集卡索引,并填充下拉框""" """扫描系统可用的采集卡索引,并填充下拉框"""
import warnings
found = [] found = []
for idx in range(max_index + 1):
cap = cv2.VideoCapture(idx, cv2.CAP_DSHOW) # 临时设置OpenCV日志级别
if not cap.isOpened(): import os
# 回退默认后端再试 old_level = os.environ.get('OPENCV_LOG_LEVEL', '')
cap = cv2.VideoCapture(idx) os.environ['OPENCV_LOG_LEVEL'] = 'ERROR'
if cap.isOpened(): cv2.setLogLevel(cv2.LOG_LEVEL_ERROR)
found.append(str(idx))
cap.release() try:
for idx in range(max_index + 1):
cap = None
try:
with warnings.catch_warnings():
warnings.filterwarnings('ignore')
# 尝试DSHOW后端
cap = cv2.VideoCapture(idx, cv2.CAP_DSHOW)
if not cap.isOpened():
# 回退默认后端再试
cap = cv2.VideoCapture(idx)
if cap.isOpened():
# 测试读取一帧,确保真正可用
ret, test_frame = cap.read()
if ret and test_frame is not None:
found.append(str(idx))
cap.release()
except Exception:
if cap:
try:
cap.release()
except:
pass
continue
finally:
# 恢复原来的日志级别
if old_level:
os.environ['OPENCV_LOG_LEVEL'] = old_level
else:
os.environ.pop('OPENCV_LOG_LEVEL', None)
if not found: if not found:
found = ["0"] # 至少给一个默认项,避免为空 found = ["0"] # 至少给一个默认项,避免为空
messagebox.showwarning("扫描完成", "未发现可用采集卡,已添加默认选项 0")
else:
messagebox.showinfo("扫描完成", f"发现可用采集卡索引: {', '.join(found)}")
self.camera_index_cb['values'] = found self.camera_index_cb['values'] = found
# 若当前无选择,则选择第一项 # 若当前无选择,则选择第一项
if not self.camera_index_var.get() and found: if not self.camera_index_var.get() and found:
self.camera_index_cb.set(found[0]) self.camera_index_cb.set(found[0])
messagebox.showinfo("扫描完成", f"发现可用采集卡索引: {', '.join(found)}")
def scan_ports(self): def scan_ports(self):
"""扫描系统可用的串口,并填充下拉框""" """扫描系统可用的串口,并填充下拉框"""
@@ -375,12 +417,55 @@ class ConfigGUI:
return result return result
def start_program(self): def start_program(self):
"""启动主程序""" """启动单个配置组的主程序"""
# 保存配置 # 保存配置(静默)
if not self.save_config(): if not self.save_config_silent():
messagebox.showerror("错误", "配置保存失败")
return return
messagebox.showinfo("提示", "配置已保存,请运行主程序") # 检查是否有活动配置组
active_groups = [g for g in config_manager.config['groups'] if g.get('active', False)]
if not active_groups:
messagebox.showwarning("警告", "没有活动的配置组\n\n请先选择一个配置组并设置为活动")
return
# 启动单个配置组
import subprocess
import sys
try:
# 找到活动配置组的索引
active_group = active_groups[0]
group_index = config_manager.config['groups'].index(active_group)
subprocess.Popen([
sys.executable,
"main_single.py",
str(group_index)
], creationflags=subprocess.CREATE_NEW_CONSOLE if sys.platform == 'win32' else 0)
messagebox.showinfo("成功", f"已启动配置组: {active_group['name']}\n\n请在控制台查看运行状态")
except Exception as e:
messagebox.showerror("错误", f"启动失败: {e}")
def start_multi_program(self):
"""启动多个配置组的主程序"""
# 保存配置(静默)
if not self.save_config_silent():
messagebox.showerror("错误", "配置保存失败")
return
# 启动多配置组管理器
import subprocess
import sys
try:
subprocess.Popen([
sys.executable,
"main_multi.py"
], creationflags=subprocess.CREATE_NEW_CONSOLE if sys.platform == 'win32' else 0)
messagebox.showinfo("提示", "多配置组启动器已打开\n\n请在控制台中选择要启动的配置组")
except Exception as e:
messagebox.showerror("错误", f"启动失败: {e}")
def run(self): def run(self):
"""运行GUI""" """运行GUI"""

View File

@@ -22,7 +22,7 @@ def create_launcher():
"""创建启动器界面""" """创建启动器界面"""
root = tk.Tk() root = tk.Tk()
root.title("火炬之光自动化 - 启动器") root.title("火炬之光自动化 - 启动器")
root.geometry("400x300") root.geometry("400x380")
root.resizable(False, False) root.resizable(False, False)
# 标题 # 标题
@@ -49,10 +49,21 @@ def create_launcher():
preview_btn.pack(pady=10) preview_btn.pack(pady=10)
# 主程序按钮 # 主程序按钮
main_btn = tk.Button(button_frame, text="🚀 启动自动化", main_btn = tk.Button(button_frame, text="🚀 启动单个配置组",
command=start_main_program, command=start_main_program,
width=30, height=3, font=('Arial', 12)) width=30, height=2, font=('Arial', 11))
main_btn.pack(pady=10) main_btn.pack(pady=5)
def start_multi_program():
"""启动多配置组程序"""
subprocess.Popen([sys.executable, "main_multi.py"],
creationflags=subprocess.CREATE_NEW_CONSOLE if sys.platform == 'win32' else 0)
# 多配置组启动按钮
multi_btn = tk.Button(button_frame, text="🔥 启动多个配置组",
command=start_multi_program,
width=30, height=2, font=('Arial', 11))
multi_btn.pack(pady=5)
# 说明 # 说明
info_label = tk.Label(root, info_label = tk.Label(root,

145
main_multi.py Normal file
View File

@@ -0,0 +1,145 @@
"""
多配置组启动器
支持同时启动多个配置组的自动化程序
"""
import multiprocessing
import sys
from config import config_manager
from main_single import run_automation_for_group
def main():
"""主函数"""
# 重新加载配置
config_manager.load_config()
# 获取所有配置组
groups = config_manager.config.get('groups', [])
if not groups:
print("❌ 没有找到任何配置组")
print("请先运行 gui_config.py 创建配置组")
return
# 询问要启动哪些配置组
print("=" * 60)
print("🔥 多配置组启动器")
print("=" * 60)
print("\n可用配置组:")
for i, group in enumerate(groups):
active_mark = "" if group.get('active', False) else " "
print(f" [{i}] {active_mark} {group['name']}")
print(f" 串口: {group['serial_port']} | 采集卡: {group['camera_index']}")
print("\n选择启动方式:")
print(" 1. 启动所有活动配置组")
print(" 2. 启动所有配置组")
print(" 3. 选择特定配置组")
print(" 0. 退出")
choice = input("\n请选择 (0-3): ").strip()
selected_indices = []
if choice == "0":
print("👋 退出")
return
elif choice == "1":
# 启动所有活动配置组
selected_indices = [i for i, g in enumerate(groups) if g.get('active', False)]
if not selected_indices:
print("❌ 没有活动的配置组")
return
print(f"\n✅ 将启动 {len(selected_indices)} 个活动配置组")
elif choice == "2":
# 启动所有配置组
selected_indices = list(range(len(groups)))
print(f"\n✅ 将启动所有 {len(selected_indices)} 个配置组")
elif choice == "3":
# 选择特定配置组
indices_input = input("请输入要启动的配置组索引(用逗号分隔,如: 0,1,2): ").strip()
try:
selected_indices = [int(x.strip()) for x in indices_input.split(',')]
# 验证索引有效性
selected_indices = [i for i in selected_indices if 0 <= i < len(groups)]
if not selected_indices:
print("❌ 没有有效的配置组索引")
return
print(f"\n✅ 将启动 {len(selected_indices)} 个配置组")
except ValueError:
print("❌ 输入格式错误")
return
else:
print("❌ 无效选择")
return
# 显示将要启动的配置组
print("\n将要启动的配置组:")
for idx in selected_indices:
group = groups[idx]
print(f"{group['name']} (串口:{group['serial_port']}, 采集卡:{group['camera_index']})")
confirm = input("\n确认启动? (y/n): ").strip().lower()
if confirm != 'y':
print("❌ 取消启动")
return
# 启动多进程
print("\n🚀 开始启动多个配置组...")
processes = []
for idx in selected_indices:
group = groups[idx]
print(f"启动进程: {group['name']}...")
process = multiprocessing.Process(
target=run_automation_for_group,
args=(idx,),
name=f"Group-{idx}-{group['name']}"
)
process.start()
processes.append((idx, group['name'], process))
print(f"{group['name']} 已启动 (PID: {process.pid})")
print(f"\n✅ 成功启动 {len(processes)} 个配置组进程")
print("\n" + "=" * 60)
print("运行状态:")
print("=" * 60)
# 监控进程状态
try:
while True:
alive_count = 0
for idx, name, proc in processes:
if proc.is_alive():
alive_count += 1
else:
print(f"⚠️ {name} 进程已退出 (退出码: {proc.exitcode})")
if alive_count == 0:
print("\n所有进程已退出")
break
import time
time.sleep(2)
# 打印存活状态
alive_names = [name for idx, name, proc in processes if proc.is_alive()]
if alive_names:
print(f"\r📊 运行中: {', '.join(alive_names)} ({alive_count}/{len(processes)})", end='', flush=True)
except KeyboardInterrupt:
print("\n\n🛑 收到停止信号,正在关闭所有进程...")
for idx, name, proc in processes:
if proc.is_alive():
print(f"正在停止 {name}...")
proc.terminate()
proc.join(timeout=5)
if proc.is_alive():
print(f"强制停止 {name}...")
proc.kill()
print(f"{name} 已停止")
print("\n👋 所有进程已停止")
if __name__ == "__main__":
multiprocessing.freeze_support() # Windows下需要
main()

349
main_single.py Normal file
View File

@@ -0,0 +1,349 @@
"""
单配置组运行的主程序
用于在独立进程中运行单个配置组的自动化
"""
import cv2
from utils.get_image import GetImage
from utils.mouse import init_mouse_keyboard, Mouse_guiji
from ultralytics import YOLO
import time
import serial
import ch9329Comm
import random
import math
from utils import shizi
from config import config_manager
import sys
def run_automation_for_group(group_index):
"""为单个配置组运行自动化"""
# 重新加载配置
config_manager.load_config()
group = config_manager.get_group_by_index(group_index)
if group is None:
print(f"❌ 配置组索引 {group_index} 不存在")
return
print(f"🚀 启动配置组: {group['name']} (索引: {group_index})")
print(f" 串口: {group['serial_port']}")
print(f" 采集卡: {group['camera_index']}")
# 加载YOLO模型
model = YOLO(r"best.pt").to('cuda')
model0 = YOLO(r"best0.pt").to('cuda')
# 初始化串口和鼠标
try:
init_mouse_keyboard(group)
except Exception as e:
print(f"❌ 初始化串口失败: {e}")
return
# 初始化键盘和鼠标
keyboard = ch9329Comm.keyboard.DataComm()
from utils.mouse import mouse
mouse_gui = Mouse_guiji()
# 初始化采集卡
get_image = GetImage(
cam_index=group['camera_index'],
width=group['camera_width'],
height=group['camera_height']
)
# 检查采集卡是否初始化成功
if get_image.cap is None:
print(f"❌ 采集卡 {group['camera_index']} 初始化失败")
return
print(f"✅ 配置组 {group['name']} 初始化完成")
# 全局变量
left = 0
top = 30
k = 0 # 控制转圈的方向
panduan = False # 是否在图内
boss_pd = False # 是否到boss关卡
rw = (632, 378)
v = group['move_velocity'] # 从配置读取移动速度
def yolo_shibie(im_PIL, detections, model):
results = model(im_PIL)
for result in results:
for i in range(len(result.boxes.xyxy)):
left, top, right, bottom = result.boxes.xyxy[i]
scalar_tensor = result.boxes.cls[i]
value = scalar_tensor.item()
label = result.names[int(value)]
if label == 'center' or label == 'next' or label == 'boss' or label == 'zhaozi':
player_x = int(left + (right - left) / 2)
player_y = int(top + (bottom - top) / 2) + 30
RW = [player_x, player_y]
detections[label] = RW
elif label == 'daojv' or label == 'gw':
player_x = int(left + (right - left) / 2)
player_y = int(top + (bottom - top) / 2) + 30
RW = [player_x, player_y]
detections[label].append(RW)
elif label == 'npc1' or label == 'npc2' or label == 'npc3' or label == 'npc4':
player_x = int(left + (right - left) / 2)
player_y = int(bottom) + 30
RW = [player_x, player_y]
detections[label] = RW
return detections
def sq(p1, p2):
"""计算两点之间的欧式距离"""
return math.sqrt((p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2)
def process_points(points):
if not points:
return None
n = len(points)
if n == 1:
return points[0]
elif n == 2:
x = (points[0][0] + points[1][0]) / 2
y = (points[0][1] + points[1][1]) / 2
return [x, y]
else:
sample_points = random.sample(points, 3)
min_sum = float('inf')
best_point = None
for p in points:
dist_sum = sum(sq(p, sp) for sp in sample_points)
if dist_sum < min_sum:
min_sum = dist_sum
best_point = p
return best_point
def move_randomly(rw, k):
k = k % 4
suiji_t = float(random.randint(10, 13) / 10)
if k == 0:
keyboard.send_data("66")
time.sleep(suiji_t)
keyboard.release()
elif k == 1:
keyboard.send_data("88")
time.sleep(suiji_t)
keyboard.release()
elif k == 2:
keyboard.send_data("44")
time.sleep(suiji_t)
keyboard.release()
elif k == 3:
keyboard.send_data("22")
time.sleep(suiji_t)
keyboard.release()
return k + 1
def move_to(rw, mb):
"""使用配置的v值"""
nonlocal v
v = group['move_velocity']
if rw[0] >= mb[0]:
keyboard.send_data("44")
time.sleep(float(abs(rw[0] - mb[0]) / v))
keyboard.release()
else:
keyboard.send_data("66")
time.sleep(float(abs(rw[0] - mb[0]) / v))
keyboard.release()
if rw[1] >= mb[1]:
keyboard.send_data("88")
time.sleep(float(abs(rw[1] - mb[1]) / v))
keyboard.release()
else:
keyboard.send_data("22")
time.sleep(float(abs(rw[1] - mb[1]) / v))
keyboard.release()
# 主循环
print(f"🔄 配置组 {group['name']} 开始自动化循环...")
while True:
try:
detections = {
'center': None,
'next': None,
'npc1': None,
'npc2': None,
'npc3': None,
'npc4': None,
'boss': None,
'daojv': [],
'gw': [],
'zhaozi': None
}
im_opencv = get_image.get_frame()
if im_opencv is None:
time.sleep(0.1)
continue
detections = yolo_shibie(im_opencv[1], detections, model)
if shizi.tuwai(im_opencv[0]):
im_opencv = get_image.get_frame()
if im_opencv is None:
time.sleep(0.1)
continue
detections = yolo_shibie(im_opencv[1], detections, model0)
print(f'[{group["name"]}] 当前在城镇中')
if detections['npc1'] is not None and sq(rw, detections['npc1']) > 80:
print(f"[{group['name']}] 向npc1靠近")
move_to(rw, detections['npc1'])
continue
elif detections['npc1'] is not None and sq(rw, detections['npc1']) <= 80:
print(f"[{group['name']}] 在npc旁边向上走")
mb = (detections['npc1'][0], detections['npc1'][1] - 1010)
move_to(rw, mb)
continue
elif detections['npc3'] is not None and detections['npc4'] is None:
print(f"[{group['name']}] 在npc3旁边向右走")
mb = (rw[0], detections['npc3'][1] - 50)
move_to(rw, mb)
mb = (rw[0] + 700, rw[1])
move_to(rw, mb)
continue
elif detections['npc4'] is not None:
if sq(detections['npc4'], rw) < 50:
print(f"[{group['name']}] 离npc4很近 直接进入")
keyboard.send_data("DD")
time.sleep(0.15)
keyboard.release()
time.sleep(1)
im_opencv = get_image.get_frame()
if im_opencv and shizi.daoying(im_opencv[0]):
mouse_gui.send_data_absolute(rw[0], rw[1] - 110, may=1)
time.sleep(1)
continue
else:
print(f"[{group['name']}] 离npc4有点远 点击进入")
move_to(rw, detections['npc4'])
time.sleep(1)
im_opencv = get_image.get_frame()
if im_opencv and shizi.daoying(im_opencv[0]):
mouse_gui.send_data_absolute(rw[0], rw[1] - 110, may=1)
time.sleep(1)
continue
elif shizi.tiaozhan(im_opencv[0]):
print(f'[{group["name"]}] 进入塔4')
mouse_gui.send_data_absolute(left + 1100, top + 600, may=1)
time.sleep(0.3)
mouse_gui.send_data_absolute(left + 433, top + 455, may=1)
panduan = True
continue
elif shizi.jieshu(im_opencv[0]):
print(f'[{group["name"]}] 结束挑战')
mouse_gui.send_data_absolute(left + 542, top + 644, may=1)
time.sleep(0.8)
mouse_gui.send_data_absolute(left + 706, top + 454, may=1)
continue
elif panduan:
if shizi.shuzi(im_opencv[0]):
boss_pd = True
print(f"[{group['name']}] 进入到boss")
if shizi.fuhuo(im_opencv[0]):
print(f'[{group["name"]}] 点击复活')
mouse_gui.send_data_absolute(left + 536, top + 627, may=1)
mouse_gui.send_data_absolute(rw[0], rw[1], may=0)
continue
if detections['zhaozi'] is not None:
move_to(rw, detections['zhaozi'])
continue
if len(detections['daojv']) != 0:
move_to(rw, process_points(detections['daojv']))
for i in range(3 + len(detections['daojv'])):
keyboard.send_data("AA")
time.sleep(0.15)
keyboard.release()
continue
if shizi.tuichu(im_opencv[0]) and detections['next'] is None and len(detections['daojv']) == 0 and len(detections['gw']) == 0 and boss_pd:
print(f"[{group['name']}] 识别到可以退出挑战!!!!!!!!!!!!!!!!!!")
for i in range(3):
time.sleep(0.5)
im_opencv = get_image.get_frame()
if im_opencv is None:
continue
detections = {
'center': None, 'next': None,
'npc1': None, 'npc2': None, 'npc3': None, 'npc4': None,
'boss': None, 'zhaozi': None,
'daojv': [], 'gw': []
}
detections = yolo_shibie(im_opencv[1], detections, model)
if detections['next'] is not None or len(detections['daojv']) != 0 or len(detections['gw']) != 0 or detections['boss'] is not None:
break
else:
mouse_gui.send_data_absolute(left + 640, top + 40, may=1)
panduan = False
boss_pd = False
time.sleep(2.0)
continue
if detections['center'] is None and detections['next'] is None and (len(detections['gw']) != 0 or detections["boss"] is not None):
if len(detections['gw']) != 0:
move_to(rw, detections['gw'][0])
time.sleep(0.26)
elif detections['boss'] is not None:
boss_suiji1 = random.randint(-30, 30)
boss_suiji2 = random.randint(-30, 30)
detections['boss'] = (detections['boss'][0] + boss_suiji1, detections['boss'][1] + boss_suiji2)
move_to(rw, detections['boss'])
time.sleep(0.7)
continue
elif (detections['center'] is not None and detections['next'] is None and len(detections['gw']) != 0) or (boss_pd == True and detections['center'] is not None and detections['next'] is None):
if detections['center'][0] >= rw[0] and detections['center'][1] < rw[1]:
mb = (rw[0] + 100, rw[1])
elif detections['center'][0] <= rw[0] and detections['center'][1] < rw[1]:
mb = (rw[0], rw[1] - 100)
elif detections['center'][0] <= rw[0] and detections['center'][1] > rw[1]:
mb = (rw[0] - 100, rw[1])
elif detections['center'][0] >= rw[0] and detections['center'][1] > rw[1]:
mb = (rw[0], rw[1] + 100)
move_to(rw, mb)
time.sleep(0.25)
continue
elif boss_pd == True and detections['center'] is None and detections['next'] is None:
k = move_randomly(rw, k)
continue
elif detections['next'] is not None:
print(f'[{group["name"]}] 进入下一层啦啦啦啦啦啦')
panduan = True
move_to(rw, detections['next'])
for i in range(2):
keyboard.send_data("DD")
time.sleep(0.15)
keyboard.release()
continue
else:
k = move_randomly(rw, k)
continue
elif shizi.daoying(im_opencv[0]):
mouse_gui.send_data_absolute(rw[0], rw[1] - 110, may=1)
time.sleep(1)
continue
except KeyboardInterrupt:
print(f"🛑 配置组 {group['name']} 停止运行")
break
except Exception as e:
print(f"❌ 配置组 {group['name']} 运行错误: {e}")
import traceback
traceback.print_exc()
time.sleep(1)
if __name__ == "__main__":
if len(sys.argv) > 1:
group_index = int(sys.argv[1])
run_automation_for_group(group_index)
else:
# 默认运行活动配置组
active_group = config_manager.get_active_group()
if active_group is None:
print("❌ 没有活动的配置组")
sys.exit(1)
group_index = config_manager.config['groups'].index(active_group)
run_automation_for_group(group_index)

View File

@@ -4,8 +4,14 @@ from tkinter import Canvas
from PIL import Image, ImageTk from PIL import Image, ImageTk
import threading import threading
import numpy as np import numpy as np
import warnings
import os
from config import config_manager from config import config_manager
# 抑制OpenCV的警告信息
os.environ['OPENCV_LOG_LEVEL'] = 'ERROR'
cv2.setLogLevel(cv2.LOG_LEVEL_ERROR)
class PreviewWindow: class PreviewWindow:
"""采集卡预览窗口""" """采集卡预览窗口"""
def __init__(self): def __init__(self):
@@ -31,31 +37,60 @@ class PreviewWindow:
cam_idx = group['camera_index'] cam_idx = group['camera_index']
print(f" 尝试打开采集卡 {cam_idx} ({group['name']})...") print(f" 尝试打开采集卡 {cam_idx} ({group['name']})...")
cap = cv2.VideoCapture(int(cam_idx), cv2.CAP_DSHOW) cap = None
if not cap.isOpened(): # 尝试多种后端打开
print(f" DSHOW模式失败尝试默认模式...") backends_to_try = [
cap = cv2.VideoCapture(int(cam_idx)) (int(cam_idx), cv2.CAP_DSHOW),
(int(cam_idx), cv2.CAP_ANY),
(int(cam_idx), None),
]
if cap.isOpened(): for idx, backend in backends_to_try:
cap.set(cv2.CAP_PROP_FRAME_WIDTH, group['camera_width']) try:
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, group['camera_height']) with warnings.catch_warnings():
# 测试读取一帧 warnings.filterwarnings('ignore')
ret, test_frame = cap.read() if backend is not None:
if ret: cap = cv2.VideoCapture(idx, backend)
self.caps[i] = { else:
'cap': cap, cap = cv2.VideoCapture(idx)
'group': group,
'name': group['name'] if cap.isOpened():
} # 测试读取一帧
loaded_count += 1 ret, test_frame = cap.read()
print(f" ✅ 采集卡 {cam_idx} 初始化成功") if ret and test_frame is not None:
else: break
cap.release() else:
print(f" ❌ 采集卡 {cam_idx} 无法读取帧") cap.release()
cap = None
except Exception:
if cap:
try:
cap.release()
except:
pass
cap = None
continue
if cap and cap.isOpened():
try:
cap.set(cv2.CAP_PROP_FRAME_WIDTH, group['camera_width'])
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, group['camera_height'])
except Exception as e:
print(f" ⚠️ 设置分辨率失败: {e}")
self.caps[i] = {
'cap': cap,
'group': group,
'name': group['name']
}
loaded_count += 1
print(f" ✅ 采集卡 {cam_idx} 初始化成功")
else: else:
print(f" ❌ 采集卡 {cam_idx} 无法打开") print(f" ❌ 采集卡 {cam_idx} 无法打开")
except Exception as e: except Exception as e:
print(f" ❌ 采集卡 {group.get('camera_index', '?')} 初始化失败: {e}") print(f" ❌ 采集卡 {group.get('camera_index', '?')} 初始化失败: {e}")
import traceback
traceback.print_exc()
if loaded_count == 0: if loaded_count == 0:
print("⚠️ 警告:没有成功加载任何采集卡!") print("⚠️ 警告:没有成功加载任何采集卡!")

View File

@@ -41,26 +41,76 @@ import cv2
# cv2.destroyAllWindows() # cv2.destroyAllWindows()
import threading import threading
import warnings
# 抑制OpenCV的警告信息
import os
os.environ['OPENCV_LOG_LEVEL'] = 'ERROR'
cv2.setLogLevel(cv2.LOG_LEVEL_ERROR)
class GetImage: class GetImage:
def __init__(self, cam_index=0, width=1920, height=1080): def __init__(self, cam_index=0, width=1920, height=1080):
print(f"🔧 正在初始化采集卡 {cam_index}...") print(f"🔧 正在初始化采集卡 {cam_index}...")
self.cap = cv2.VideoCapture(cam_index, cv2.CAP_DSHOW) self.cap = None
if not self.cap.isOpened():
print(f"⚠️ DSHOW模式失败尝试默认模式...")
self.cap = cv2.VideoCapture(cam_index)
if not self.cap.isOpened():
print(f"❌ 无法打开采集卡 {cam_index}")
self.cap = None
return
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
self.frame = None self.frame = None
self.running = True self.running = True
self.cam_index = cam_index self.cam_index = cam_index
# 尝试多种方式打开采集卡
backends_to_try = [
(cam_index, cv2.CAP_DSHOW),
(cam_index, cv2.CAP_ANY),
(cam_index, None), # 默认后端
]
for idx, backend in backends_to_try:
try:
with warnings.catch_warnings():
warnings.filterwarnings('ignore', category=UserWarning)
if backend is not None:
self.cap = cv2.VideoCapture(idx, backend)
else:
self.cap = cv2.VideoCapture(idx)
if self.cap.isOpened():
# 测试读取一帧
ret, test_frame = self.cap.read()
if ret and test_frame is not None:
print(f"✅ 采集卡 {cam_index} 打开成功")
break
else:
self.cap.release()
self.cap = None
except Exception as e:
if self.cap:
try:
self.cap.release()
except:
pass
self.cap = None
continue
if self.cap is None or not self.cap.isOpened():
print(f"❌ 无法打开采集卡 {cam_index}")
print("请检查:")
print(" 1. 采集卡是否正确连接")
print(" 2. 采集卡索引是否正确(尝试扫描采集卡)")
print(" 3. 采集卡驱动是否安装")
print(" 4. 采集卡是否被其他程序占用")
self.cap = None
return
# 设置分辨率
try:
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
# 实际获取设置后的分辨率
actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
print(f" 分辨率设置: {width}x{height} -> 实际: {actual_width}x{actual_height}")
except Exception as e:
print(f"⚠️ 设置分辨率失败: {e}")
# 启动更新线程 # 启动更新线程
threading.Thread(target=self.update, daemon=True).start() threading.Thread(target=self.update, daemon=True).start()
@@ -71,11 +121,18 @@ class GetImage:
def update(self): def update(self):
while self.running and self.cap is not None: while self.running and self.cap is not None:
ret, frame = self.cap.read() try:
if ret: ret, frame = self.cap.read()
self.frame = frame if ret and frame is not None:
else: self.frame = frame
print(f"⚠️ 采集卡 {self.cam_index} 读取失败") else:
# 读取失败时不打印,避免刷屏
pass
except Exception as e:
# 只在异常时打印错误
print(f"⚠️ 采集卡 {self.cam_index} 读取异常: {e}")
import time
time.sleep(0.1) # 出错时短暂延迟
def get_frame(self): def get_frame(self):
if self.cap is None or self.frame is None: if self.cap is None or self.frame is None: