Compare commits

...

19 Commits

Author SHA1 Message Date
Administrator
f4a5a0a584 配置提交 2025-11-13 11:28:08 +08:00
ray
520b8818cd 修复键鼠冲突 2025-11-12 00:48:26 +08:00
Administrator
5ecc0f2bf5 配置提交 2025-11-10 22:45:13 +08:00
Administrator
809d07256a 配置提交 2025-11-10 22:44:54 +08:00
ray
1d0d6d0b9f 增加配置对应 2025-11-06 09:48:32 +08:00
Administrator
aaa795e00d Merge remote-tracking branch 'origin/ui' into ui 2025-11-05 23:52:44 +08:00
Administrator
316710a24c 1 2025-11-05 23:51:06 +08:00
ray
9db83deab6 靠近npc逻辑调整 2025-11-04 15:07:10 +08:00
ray
6337567ddf Merge remote-tracking branch 'origin/ui' into ui 2025-11-04 14:47:38 +08:00
ray
d0e74f49d0 测试文件提交 2025-11-04 14:47:28 +08:00
Administrator
33b88145bd Merge remote-tracking branch 'origin/ui' into ui 2025-11-04 14:39:52 +08:00
ray
e6cd3658d0 测试文件提交 2025-11-04 14:39:33 +08:00
Administrator
a9e5181bce Merge remote-tracking branch 'origin/ui' into ui 2025-11-04 14:33:08 +08:00
Administrator
84c77ae459 1 2025-11-04 14:32:48 +08:00
ray
c5415156ed 测试文件提交 2025-11-04 11:39:07 +08:00
ray
b10333a308 测试文件提交 2025-11-04 11:32:16 +08:00
Administrator
c9b51f225b 1 2025-11-02 21:12:37 +08:00
Ray
73d1f87ec7 Merge remote-tracking branch 'main/ui' into ui
# Conflicts:
#	preview.py
2025-11-01 23:36:41 +08:00
Ray
4a0549114f 小bug修改 2025-11-01 16:32:07 +08:00
15 changed files with 2018 additions and 153 deletions

2
.idea/huojv.iml generated
View File

@@ -2,7 +2,7 @@
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Python 3.9" jdkType="Python SDK" />
<orderEntry type="jdk" jdkName="Python 3.12 virtualenv at C:\Users\Administrator\Downloads\huojv\huojv\.venv" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

2
.idea/misc.xml generated
View File

@@ -3,5 +3,5 @@
<component name="Black">
<option name="sdkName" value="D:\CONDA\anaconda3" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.9" project-jdk-type="Python SDK" />
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.12 virtualenv at C:\Users\Administrator\Downloads\huojv\huojv\.venv" project-jdk-type="Python SDK" />
</project>

BIN
best0.pt

Binary file not shown.

View File

@@ -1,29 +1,39 @@
{
"groups": [
{
"name": "配置组1",
"serial_port": "COM10",
"serial_baudrate": 9600,
"camera_index": 0,
"camera_width": 1920,
"camera_height": 1080,
"move_velocity": 470,
"active": false
},
{
"name": "火炬游戏",
"serial_port": "COM11",
"name": "服务器",
"serial_port": "COM17",
"serial_baudrate": 9600,
"camera_index": 1,
"camera_width": 1920,
"camera_height": 1080,
"move_velocity": 470,
"active": true
},
{
"name": "2号机",
"serial_port": "COM14",
"serial_baudrate": 9600,
"camera_index": 2,
"camera_width": 1920,
"camera_height": 1080,
"move_velocity": 360,
"active": false
},
{
"name": "3号机",
"serial_port": "COM16",
"serial_baudrate": 9600,
"camera_index": 0,
"camera_width": 1920,
"camera_height": 1080,
"move_velocity": 360,
"active": false
}
],
"display": {
"preview_width": 1000,
"preview_height": 700,
"preview_width": 1920,
"preview_height": 1080,
"preview_columns": 2,
"preview_rows": 2,
"show_preview": true,

View File

@@ -214,13 +214,28 @@ class ConfigGUI:
# 确保串口下拉框列表与当前值一致
if group and 'serial_port' in group:
# 如果当前串口不在选项里,追加
values = list(self.serial_port_cb.cget('values')) if self.serial_port_cb.cget('values') else []
port_value = group['serial_port']
if port_value not in values:
values.append(port_value)
self.serial_port_cb['values'] = values
self.serial_port_cb.set(port_value)
# 如果当前值不在列表中,尝试通过设备名匹配
matched = False
if hasattr(self, 'port_display_map'):
for display_text, device in self.port_display_map.items():
if device == port_value:
if display_text not in values:
values.append(display_text)
self.serial_port_cb.set(display_text)
matched = True
break
if not matched:
# 如果找不到匹配,直接添加原始值
if port_value not in values:
values.append(port_value)
if not hasattr(self, 'port_display_map'):
self.port_display_map = {}
self.port_display_map[port_value] = port_value
self.serial_port_cb.set(port_value)
def scan_cameras(self, max_index: int = 10):
"""扫描系统可用的采集卡索引,并填充下拉框"""
@@ -302,23 +317,59 @@ class ConfigGUI:
self.camera_index_cb.set(found[0])
def scan_ports(self):
"""扫描系统可用的串口,并填充下拉框"""
"""扫描系统可用的串口,并填充下拉框(显示设备描述)"""
from utils.device_scanner import scan_serial_ports_with_info
found_real = []
port_display_map = {} # 显示文本 -> 实际设备名
try:
ports = serial.tools.list_ports.comports()
found_real = [port.device for port in ports]
# 按端口名排序
found_real.sort(key=lambda x: int(x.replace('COM', '')) if x.replace('COM', '').isdigit() else 999)
ports_info = scan_serial_ports_with_info()
for port_info in ports_info:
device = port_info['device']
description = port_info.get('description', '')
hwid = port_info.get('hwid', '')
# 显示格式COM3 (设备描述) [HWID]
if description:
display_text = f"{device} ({description})"
else:
display_text = device
# 如果有HWID添加到显示中较短版本
if hwid:
# 提取VID/PID部分如果存在
if 'VID_' in hwid and 'PID_' in hwid:
import re
vid_match = re.search(r'VID_([0-9A-F]+)', hwid)
pid_match = re.search(r'PID_([0-9A-F]+)', hwid)
if vid_match and pid_match:
display_text += f" [{vid_match.group(1)}:{pid_match.group(1)}]"
found_real.append(display_text)
port_display_map[display_text] = device
except Exception as e:
print(f"扫描串口错误: {e}")
# 回退到简单扫描
try:
ports = serial.tools.list_ports.comports()
found_real = [port.device for port in ports]
found_real.sort(key=lambda x: int(x.replace('COM', '')) if x.replace('COM', '').isdigit() else 999)
port_display_map = {p: p for p in found_real}
except:
pass
# 如果没有发现实际端口,使用默认端口列表
if not found_real:
found = ["COM1", "COM2", "COM3", "COM4", "COM5", "COM6"] # 默认给一些常见端口
found = ["COM1", "COM2", "COM3", "COM4", "COM5", "COM6"]
port_display_map = {p: p for p in found}
messagebox.showwarning("警告", "未发现可用串口设备,已添加常用默认选项")
else:
found = found_real
messagebox.showinfo("扫描完成", f"发现可用串口: {', '.join(found)}")
messagebox.showinfo("扫描完成", f"发现 {len(found)} 个串口设备\n\n设备信息已包含描述和硬件ID")
# 保存映射关系,用于后续获取实际设备名
self.port_display_map = port_display_map
self.serial_port_cb['values'] = found
# 若当前无选择,则选择第一项
@@ -390,6 +441,11 @@ class ConfigGUI:
else:
# 字符串字段
value = value_str if value_str else group.get(key, '')
# 特殊处理串口:从显示文本中提取实际设备名
if key == 'serial_port' and hasattr(self, 'port_display_map'):
# 如果值是显示文本,转换为实际设备名
if value in self.port_display_map:
value = self.port_display_map[value]
group[key] = value
except Exception as e:
@@ -467,6 +523,11 @@ class ConfigGUI:
else:
# 字符串字段
value = value_str if value_str else group.get(key, '')
# 特殊处理串口:从显示文本中提取实际设备名
if key == 'serial_port' and hasattr(self, 'port_display_map'):
# 如果值是显示文本,转换为实际设备名
if value in self.port_display_map:
value = self.port_display_map[value]
group[key] = value
except Exception as e:
@@ -500,7 +561,7 @@ class ConfigGUI:
return result
def start_program(self):
"""启动单个配置组的主程序"""
"""启动单个配置组的主程序并自动弹出采集卡预览窗口"""
# 保存配置(静默)
if not self.save_config_silent():
messagebox.showerror("错误", "配置保存失败")
@@ -527,11 +588,13 @@ class ConfigGUI:
], creationflags=subprocess.CREATE_NEW_CONSOLE if sys.platform == 'win32' else 0)
messagebox.showinfo("成功", f"已启动配置组: {active_group['name']}\n\n请在控制台查看运行状态")
# 成功后弹出预览
self.start_preview()
except Exception as e:
messagebox.showerror("错误", f"启动失败: {e}")
def start_multi_program(self):
"""启动多个配置组的主程序"""
"""启动多个配置组的主程序并自动弹出采集卡预览窗口"""
# 保存配置(静默)
if not self.save_config_silent():
messagebox.showerror("错误", "配置保存失败")
@@ -547,6 +610,8 @@ class ConfigGUI:
], creationflags=subprocess.CREATE_NEW_CONSOLE if sys.platform == 'win32' else 0)
messagebox.showinfo("提示", "多配置组启动器已打开\n\n请在控制台中选择要启动的配置组")
# 成功后弹出预览
self.start_preview()
except Exception as e:
messagebox.showerror("错误", f"启动失败: {e}")
@@ -557,4 +622,3 @@ class ConfigGUI:
if __name__ == "__main__":
app = ConfigGUI()
app.run()

View File

@@ -22,7 +22,7 @@ def create_launcher():
"""创建启动器界面"""
root = tk.Tk()
root.title("火炬之光自动化 - 启动器")
root.geometry("400x380")
root.geometry("400x580")
root.resizable(False, False)
# 标题
@@ -67,7 +67,7 @@ def create_launcher():
# 说明
info_label = tk.Label(root,
text="提示:首次使用请先点击"配置管理"设置参数",
text="提示:首次使用请先点击配置管理设置参数",
font=('Arial', 9),
fg='gray')
info_label.pack(side=tk.BOTTOM, pady=10)

153
main.py
View File

@@ -62,6 +62,18 @@ rw = (632, 378)
v = active_group['move_velocity']
def yolo_shibie(im_PIL, detections, model):
detections = {
'center': None,
'next': None,
'npc1': None,
'npc2': None,
'npc3': None,
'npc4': None,
'boss': None,
'daojv': [],
'gw': [],
'zhaozi': None
}
results = model(im_PIL) # 目标检测
for result in results:
for i in range(len(result.boxes.xyxy)):
@@ -186,7 +198,7 @@ while True:
continue
detections = yolo_shibie(im_opencv[1], detections, model)
if shizi.tuwai(im_opencv[0]): # 进图算法
if shizi.tuwai(im_opencv[0]) or yolo_shibie(im_opencv[1], model0)['npc1'] is not None or yolo_shibie(im_opencv[1], model0)['npc2'] is not None or yolo_shibie(im_opencv[1], model0)['npc3'] is not None or yolo_shibie(im_opencv[1], model0)['npc4'] is not None: # 进图算法 # 进图算法
im_opencv = get_image.get_frame() # [RGB,PIL]
if im_opencv is None:
print("⚠️ 无法获取图像帧,跳过本次循环")
@@ -213,8 +225,12 @@ while True:
move_to(rw, mb)
continue
elif detections['npc4'] is not None:
if sq(detections['npc4'], rw) < 50:
print("离npc4很近 直接进入")
npc4_pos = detections['npc4']
current_distance = sq(npc4_pos, rw)
# 如果已经非常接近,直接进入
if current_distance < 30:
print(f"离npc4很近 (距离={current_distance:.1f}),直接进入")
keyboard.send_data("DD")
time.sleep(0.15)
keyboard.release()
@@ -227,18 +243,132 @@ while True:
mouse_gui.send_data_absolute(rw[0], rw[1] - 110, may=1)
time.sleep(1)
continue
else:
print("离npc4有点远 点击进入")
move_to(rw, detections['npc4'])
# 循环靠近npc4直到足够接近或达到最大尝试次数
print(f"开始靠近npc4当前距离={current_distance:.1f}")
max_attempts = 15 # 最大尝试次数
attempt = 0
last_distance = current_distance
stuck_count = 0 # 卡住计数器
while attempt < max_attempts:
attempt += 1
# 重新获取当前位置和npc4位置
im_opencv = get_image.get_frame()
if im_opencv is None:
print("⚠️ 无法获取图像帧")
break
detections_temp = {
'center': None, 'next': None,
'npc1': None, 'npc2': None, 'npc3': None, 'npc4': None,
'boss': None, 'zhaozi': None,
'daojv': [], 'gw': []
}
detections_temp = yolo_shibie(im_opencv[1], detections_temp, model0)
# 如果npc4丢失重新检测
if detections_temp['npc4'] is None:
print(f"⚠️ npc4丢失重新检测 (尝试 {attempt}/{max_attempts})")
time.sleep(0.5)
continue
npc4_pos = detections_temp['npc4']
current_distance = sq(npc4_pos, rw)
print(f" 尝试 {attempt}/{max_attempts}: 距离npc4={current_distance:.1f}")
# 如果已经足够接近,退出循环
if current_distance < 35:
print(f"✅ 已足够接近npc4 (距离={current_distance:.1f})")
break
# 检测是否卡住(距离不再减小)
if abs(current_distance - last_distance) < 5:
stuck_count += 1
if stuck_count >= 3:
print(f"⚠️ 检测到卡住,尝试微调移动")
# 尝试向npc4方向微调
dx = npc4_pos[0] - rw[0]
dy = npc4_pos[1] - rw[1]
# 计算方向向量,但只移动一小段距离
step_size = 50 # 小步移动
distance_to_move = min(step_size, current_distance * 0.3)
if abs(dx) > abs(dy):
# 主要横向移动
target_x = rw[0] + int(dx / abs(dx) * distance_to_move) if dx != 0 else rw[0]
target_y = rw[1]
else:
# 主要纵向移动
target_x = rw[0]
target_y = rw[1] + int(dy / abs(dy) * distance_to_move) if dy != 0 else rw[1]
mb = (target_x, target_y)
move_to(rw, mb)
time.sleep(0.3)
stuck_count = 0
else:
# 正常移动,但使用小步长
step_size = min(80, current_distance * 0.4) # 移动距离为目标距离的40%最多80像素
dx = npc4_pos[0] - rw[0]
dy = npc4_pos[1] - rw[1]
# 归一化方向向量
if abs(dx) > 0.1 or abs(dy) > 0.1:
direction_mag = math.sqrt(dx*dx + dy*dy)
target_x = rw[0] + int(dx / direction_mag * step_size)
target_y = rw[1] + int(dy / direction_mag * step_size)
mb = (target_x, target_y)
move_to(rw, mb)
time.sleep(0.2) # 短暂等待,让角色移动
else:
# 距离在减小,正常移动
stuck_count = 0
step_size = min(100, current_distance * 0.5) # 移动距离为目标距离的50%最多100像素
dx = npc4_pos[0] - rw[0]
dy = npc4_pos[1] - rw[1]
# 归一化方向向量
if abs(dx) > 0.1 or abs(dy) > 0.1:
direction_mag = math.sqrt(dx*dx + dy*dy)
target_x = rw[0] + int(dx / direction_mag * step_size)
target_y = rw[1] + int(dy / direction_mag * step_size)
mb = (target_x, target_y)
move_to(rw, mb)
time.sleep(0.2) # 短暂等待,让角色移动
last_distance = current_distance
# 短暂延迟,让角色位置更新
time.sleep(0.3)
# 移动完成后,检查是否可以进入
final_distance = sq(npc4_pos, rw)
print(f"靠近完成,最终距离={final_distance:.1f}")
if final_distance < 50:
print("距离足够近,尝试进入")
keyboard.send_data("DD")
time.sleep(0.15)
keyboard.release()
time.sleep(1)
im_opencv = get_image.get_frame() # [RGB,PIL]
im_opencv = get_image.get_frame()
if im_opencv is None:
print("⚠️ 无法获取图像帧")
continue
if shizi.daoying(im_opencv[0]):
mouse_gui.send_data_absolute(rw[0], rw[1] - 110, may=1)
time.sleep(1)
continue
else:
print(f"⚠️ 未能足够接近npc4 (距离={final_distance:.1f}),尝试点击进入")
move_to(rw, npc4_pos)
time.sleep(1)
im_opencv = get_image.get_frame()
if im_opencv is None:
print("⚠️ 无法获取图像帧")
continue
if shizi.daoying(im_opencv[0]):
mouse_gui.send_data_absolute(rw[0], rw[1] - 110, may=1)
time.sleep(1)
continue
elif shizi.tiaozhan(im_opencv[0]): # 开启挑战
print('进入塔4')
mouse_gui.send_data_absolute(left + 1100, top + 600, may=1)
@@ -260,11 +390,8 @@ while True:
if shizi.fuhuo(im_opencv[0]):
print('点击复活')
mouse_gui.send_data_absolute(left + 536, top + 627, may=1)
time.sleep(0.15)
keyboard.release()
time.sleep(0.15)
keyboard.release()
continue
if shizi.tuichu(im_opencv[0]) and detections['next'] is None and len(detections['daojv']) == 0 and len(detections['gw']) == 0 and boss_pd:
print("识别到可以退出挑战!!!!!!!!!!!!!!!!!!")

View File

@@ -82,6 +82,21 @@ def main():
group = groups[idx]
logger.info(f"{group['name']} (串口:{group['serial_port']}, 采集卡:{group['camera_index']})")
# 串口冲突预检:同一串口被多个组占用通常会导致仅一路成功
port_to_groups = {}
for idx in selected_indices:
g = groups[idx]
port_to_groups.setdefault(g['serial_port'], []).append(g['name'])
conflicts = {p: names for p, names in port_to_groups.items() if p and len(names) > 1}
if conflicts:
logger.warning("⚠️ 检测到串口冲突同一COM被多个组使用")
for p, names in conflicts.items():
logger.warning(f" {p}: {', '.join(names)}")
go_on = input("上述冲突很可能导致仅一组成功,其它失败。仍要继续? (y/n): ").strip().lower()
if go_on != 'y':
logger.info("已取消启动以避免串口冲突")
return
confirm = input("\n确认启动? (y/n): ").strip().lower()
if confirm != 'y':
logger.info("❌ 取消启动")

View File

@@ -69,28 +69,42 @@ def run_automation_for_group(group_index):
v = group['move_velocity'] # 从配置读取移动速度
def yolo_shibie(im_PIL, detections, model):
results = model(im_PIL)
for result in results:
for i in range(len(result.boxes.xyxy)):
left, top, right, bottom = result.boxes.xyxy[i]
scalar_tensor = result.boxes.cls[i]
value = scalar_tensor.item()
label = result.names[int(value)]
if label == 'center' or label == 'next' or label == 'boss' or label == 'zhaozi':
player_x = int(left + (right - left) / 2)
player_y = int(top + (bottom - top) / 2) + 30
RW = [player_x, player_y]
detections[label] = RW
elif label == 'daojv' or label == 'gw':
player_x = int(left + (right - left) / 2)
player_y = int(top + (bottom - top) / 2) + 30
RW = [player_x, player_y]
detections[label].append(RW)
elif label == 'npc1' or label == 'npc2' or label == 'npc3' or label == 'npc4':
player_x = int(left + (right - left) / 2)
player_y = int(bottom) + 30
RW = [player_x, player_y]
detections[label] = RW
try:
results = model(im_PIL)
for result in results:
if result.boxes is None or len(result.boxes.xyxy) == 0:
continue
for i in range(len(result.boxes.xyxy)):
try:
left = float(result.boxes.xyxy[i][0])
top = float(result.boxes.xyxy[i][1])
right = float(result.boxes.xyxy[i][2])
bottom = float(result.boxes.xyxy[i][3])
cls_id = int(result.boxes.cls[i])
label = result.names[cls_id]
if label == 'center' or label == 'next' or label == 'boss' or label == 'zhaozi':
player_x = int(left + (right - left) / 2) + 3
player_y = int(top + (bottom - top) / 2) + 40
RW = [player_x, player_y]
detections[label] = RW
elif label == 'daojv' or label == 'gw':
player_x = int(left + (right - left) / 2) + 3
player_y = int(top + (bottom - top) / 2) + 40
RW = [player_x, player_y]
if label not in detections:
detections[label] = []
detections[label].append(RW)
elif label == 'npc1' or label == 'npc2' or label == 'npc3' or label == 'npc4':
player_x = int(left + (right - left) / 2)
player_y = int(bottom) + 30
RW = [player_x, player_y]
detections[label] = RW
except Exception as e:
print(f"⚠️ 处理检测框时出错: {e}")
continue
except Exception as e:
print(f"⚠️ YOLO检测出错: {e}")
return detections
def sq(p1, p2):
@@ -119,25 +133,76 @@ def run_automation_for_group(group_index):
best_point = p
return best_point
def safe_keyboard_send(data, max_retries=3):
"""安全的键盘发送函数,带重试机制"""
nonlocal keyboard
for attempt in range(max_retries):
try:
keyboard.send_data(data)
return True
except (serial.serialutil.SerialException, PermissionError, OSError) as e:
if attempt < max_retries - 1:
print(f"⚠️ 键盘发送失败 (尝试 {attempt + 1}/{max_retries}): {e}")
time.sleep(0.1 * (attempt + 1)) # 递增延迟
# 尝试重新初始化串口
try:
if serial.ser and serial.ser.is_open:
serial.ser.close()
time.sleep(0.2)
init_mouse_keyboard(group)
# 重新创建keyboard对象
keyboard = ch9329Comm.keyboard.DataComm()
except Exception as init_e:
print(f"⚠️ 重新初始化串口失败: {init_e}")
else:
print(f"❌ 键盘发送失败,已重试 {max_retries} 次: {e}")
raise
return False
def safe_keyboard_release(max_retries=3):
"""安全的键盘释放函数,带重试机制"""
nonlocal keyboard
for attempt in range(max_retries):
try:
keyboard.release()
return True
except (serial.serialutil.SerialException, PermissionError, OSError) as e:
if attempt < max_retries - 1:
time.sleep(0.1 * (attempt + 1))
# 尝试重新初始化串口
try:
if serial.ser and serial.ser.is_open:
serial.ser.close()
time.sleep(0.2)
init_mouse_keyboard(group)
keyboard = ch9329Comm.keyboard.DataComm()
except Exception as init_e:
print(f"⚠️ 重新初始化串口失败: {init_e}")
else:
print(f"⚠️ 键盘释放失败: {e}")
# 释放失败不算致命错误,继续执行
return False
return False
def move_randomly(rw, k):
k = k % 4
suiji_t = float(random.randint(10, 13) / 10)
if k == 0:
keyboard.send_data("66")
time.sleep(suiji_t)
keyboard.release()
if safe_keyboard_send("66"):
time.sleep(suiji_t)
safe_keyboard_release()
elif k == 1:
keyboard.send_data("88")
time.sleep(suiji_t)
keyboard.release()
if safe_keyboard_send("88"):
time.sleep(suiji_t)
safe_keyboard_release()
elif k == 2:
keyboard.send_data("44")
time.sleep(suiji_t)
keyboard.release()
if safe_keyboard_send("44"):
time.sleep(suiji_t)
safe_keyboard_release()
elif k == 3:
keyboard.send_data("22")
time.sleep(suiji_t)
keyboard.release()
if safe_keyboard_send("22"):
time.sleep(suiji_t)
safe_keyboard_release()
return k + 1
def move_to(rw, mb):
@@ -145,22 +210,22 @@ def run_automation_for_group(group_index):
nonlocal v
v = group['move_velocity']
if rw[0] >= mb[0]:
keyboard.send_data("44")
time.sleep(float(abs(rw[0] - mb[0]) / v))
keyboard.release()
if safe_keyboard_send("44"):
time.sleep(float(abs(rw[0] - mb[0]) / v))
safe_keyboard_release()
else:
keyboard.send_data("66")
time.sleep(float(abs(rw[0] - mb[0]) / v))
keyboard.release()
if safe_keyboard_send("66"):
time.sleep(float(abs(rw[0] - mb[0]) / v))
safe_keyboard_release()
if rw[1] >= mb[1]:
keyboard.send_data("88")
time.sleep(float(abs(rw[1] - mb[1]) / v))
keyboard.release()
if safe_keyboard_send("88"):
time.sleep(float(abs(rw[1] - mb[1]) / v))
safe_keyboard_release()
else:
keyboard.send_data("22")
time.sleep(float(abs(rw[1] - mb[1]) / v))
keyboard.release()
if safe_keyboard_send("22"):
time.sleep(float(abs(rw[1] - mb[1]) / v))
safe_keyboard_release()
# 主循环
print(f"🔄 配置组 {group['name']} 开始自动化循环...")
@@ -211,9 +276,9 @@ def run_automation_for_group(group_index):
elif detections['npc4'] is not None:
if sq(detections['npc4'], rw) < 50:
print(f"[{group['name']}] 离npc4很近 直接进入")
keyboard.send_data("DD")
time.sleep(0.15)
keyboard.release()
if safe_keyboard_send("DD"):
time.sleep(0.15)
safe_keyboard_release()
time.sleep(1)
im_opencv = get_image.get_frame()
if im_opencv and shizi.daoying(im_opencv[0]):
@@ -257,9 +322,9 @@ def run_automation_for_group(group_index):
if len(detections['daojv']) != 0:
move_to(rw, process_points(detections['daojv']))
for i in range(3 + len(detections['daojv'])):
keyboard.send_data("AA")
time.sleep(0.15)
keyboard.release()
if safe_keyboard_send("AA"):
time.sleep(0.15)
safe_keyboard_release()
continue
if shizi.tuichu(im_opencv[0]) and detections['next'] is None and len(detections['daojv']) == 0 and len(detections['gw']) == 0 and boss_pd:
print(f"[{group['name']}] 识别到可以退出挑战!!!!!!!!!!!!!!!!!!")
@@ -314,9 +379,9 @@ def run_automation_for_group(group_index):
panduan = True
move_to(rw, detections['next'])
for i in range(2):
keyboard.send_data("DD")
time.sleep(0.15)
keyboard.release()
if safe_keyboard_send("DD"):
time.sleep(0.15)
safe_keyboard_release()
continue
else:
k = move_randomly(rw, k)
@@ -334,6 +399,14 @@ def run_automation_for_group(group_index):
traceback.print_exc()
time.sleep(1)
# 清理资源
try:
if serial.ser and serial.ser.is_open:
serial.ser.close()
print(f"✅ 配置组 {group['name']} 串口已关闭")
except Exception as e:
print(f"⚠️ 关闭串口时出错: {e}")
if __name__ == "__main__":
if len(sys.argv) > 1:
group_index = int(sys.argv[1])

796
test_capture_card.py Normal file
View File

@@ -0,0 +1,796 @@
"""
采集卡截图测试类
用于测试采集卡的分辨率和色差
保持与实际代码相同的实现逻辑
"""
import time
import threading
import warnings
import os
import sys
import io
import cv2
import numpy as np
from PIL import Image
from utils.logger import logger, throttle
import logging
# 抑制OpenCV的警告信息兼容不同版本
os.environ['OPENCV_LOG_LEVEL'] = 'SILENT'
os.environ['OPENCV_IO_ENABLE_OPENEXR'] = '0'
try:
if hasattr(cv2, 'setLogLevel'):
if hasattr(cv2, 'LOG_LEVEL_SILENT'):
cv2.setLogLevel(cv2.LOG_LEVEL_SILENT)
elif hasattr(cv2, 'LOG_LEVEL_ERROR'):
cv2.setLogLevel(cv2.LOG_LEVEL_ERROR)
elif hasattr(cv2, 'utils'):
cv2.utils.setLogLevel(0)
except Exception:
pass
class CaptureCardTester:
"""
采集卡测试类
使用与实际代码相同的实现逻辑
"""
def __init__(self, cam_index=0, width=1920, height=1080):
"""
初始化采集卡测试器
:param cam_index: 采集卡索引
:param width: 期望宽度
:param height: 期望高度
"""
logger.info(f"🔧 正在初始化采集卡测试器 {cam_index}...")
self.cap = None
self.frame = None
self.running = True
self.cam_index = cam_index
self.expected_width = width
self.expected_height = height
self.actual_width = None
self.actual_height = None
# 尝试多种方式打开采集卡(与实际代码相同)
backends_to_try = [
(cam_index, cv2.CAP_DSHOW),
(cam_index, cv2.CAP_ANY),
(cam_index, None), # 默认后端
]
# 重定向stderr来抑制OpenCV的错误输出
old_stderr = sys.stderr
suppressed_output = io.StringIO()
try:
sys.stderr = suppressed_output
for idx, backend in backends_to_try:
try:
with warnings.catch_warnings():
warnings.filterwarnings('ignore', category=UserWarning)
if backend is not None:
self.cap = cv2.VideoCapture(idx, backend)
else:
self.cap = cv2.VideoCapture(idx)
if self.cap.isOpened():
# 测试读取一帧
ret, test_frame = self.cap.read()
if ret and test_frame is not None:
logger.info(f"✅ 采集卡 {cam_index} 打开成功")
break
else:
self.cap.release()
self.cap = None
except Exception as e:
if self.cap:
try:
self.cap.release()
except:
pass
self.cap = None
continue
finally:
# 恢复stderr
sys.stderr = old_stderr
if self.cap is None or not self.cap.isOpened():
logger.error(f"❌ 无法打开采集卡 {cam_index}")
logger.error("请检查:\n 1. 采集卡是否正确连接\n 2. 采集卡索引是否正确(尝试扫描采集卡)\n 3. 采集卡驱动是否安装\n 4. 采集卡是否被其他程序占用")
self.cap = None
return
# 设置分辨率(与实际代码相同)
try:
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
# 实际获取设置后的分辨率
self.actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
self.actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
logger.info(f" 分辨率设置: {width}x{height} -> 实际: {self.actual_width}x{self.actual_height}")
except Exception as e:
logger.warning(f"⚠️ 设置分辨率失败: {e}")
# 启动更新线程(与实际代码相同)
threading.Thread(target=self.update, daemon=True).start()
# 等待几帧确保采集卡正常工作
time.sleep(1.0)
logger.info(f"✅ 采集卡 {cam_index} 初始化完成")
def update(self):
"""持续更新帧(与实际代码相同)"""
while self.running and self.cap is not None:
try:
ret, frame = self.cap.read()
if ret and frame is not None:
self.frame = frame
# 限制读取频率避免占满CPU
time.sleep(0.008)
else:
# 读取失败时不打印,避免刷屏
time.sleep(0.02)
except Exception as e:
# 只在异常时打印错误
throttle(f"cap_read_err_{self.cam_index}", 2.0, logging.WARNING, f"⚠️ 采集卡 {self.cam_index} 读取异常: {e}")
time.sleep(0.1) # 出错时短暂延迟
def get_frame(self):
"""
获取处理后的帧(与实际代码相同)
返回: [im_opencv, im_PIL] 或 None
"""
if self.cap is None or self.frame is None:
return None
try:
im_opencv = cv2.cvtColor(self.frame, cv2.COLOR_BGR2RGB)
im_opencv = im_opencv[30:30+720, 0:1280] # 裁剪尺寸(与实际代码相同)
im_PIL = Image.fromarray(im_opencv)
return [im_opencv, im_PIL]
except Exception as e:
throttle(f"img_proc_err_{self.cam_index}", 2.0, logging.WARNING, f"⚠️ 图像处理错误: {e}")
return None
def get_raw_frame(self):
"""
获取原始帧(未裁剪)
返回: numpy array 或 None
"""
if self.cap is None or self.frame is None:
return None
return self.frame.copy()
def test_resolution(self):
"""
测试分辨率
返回分辨率信息字典
"""
if self.cap is None:
logger.error("采集卡未初始化")
return None
result = {
'expected': (self.expected_width, self.expected_height),
'actual_cap': (self.actual_width, self.actual_height),
'actual_frame': None,
'cropped_frame': None,
'match': False
}
# 获取原始帧尺寸
raw_frame = self.get_raw_frame()
if raw_frame is not None:
h, w = raw_frame.shape[:2]
result['actual_frame'] = (w, h)
# 获取裁剪后帧尺寸
processed = self.get_frame()
if processed is not None:
im_opencv = processed[0]
h, w = im_opencv.shape[:2]
result['cropped_frame'] = (w, h)
# 检查分辨率是否匹配
if result['actual_cap'] is not None:
result['match'] = (result['actual_cap'][0] == self.expected_width and
result['actual_cap'][1] == self.expected_height)
return result
def test_color_difference(self, frame1=None, frame2=None):
"""
测试色差
:param frame1: 第一帧(可选,不提供则使用当前帧)
:param frame2: 第二帧(可选,不提供则等待一帧后获取)
:return: 色差信息字典
"""
if frame1 is None:
frame1 = self.get_frame()
if frame1 is None:
logger.error("无法获取第一帧")
return None
frame1 = frame1[0] # 使用opencv格式
if frame2 is None:
# 等待一小段时间获取新帧
time.sleep(0.1)
frame2 = self.get_frame()
if frame2 is None:
logger.error("无法获取第二帧")
return None
frame2 = frame2[0] # 使用opencv格式
# 确保两帧尺寸相同
if frame1.shape != frame2.shape:
logger.warning(f"两帧尺寸不同: {frame1.shape} vs {frame2.shape}")
# 调整尺寸
h, w = min(frame1.shape[0], frame2.shape[0]), min(frame1.shape[1], frame2.shape[1])
frame1 = frame1[:h, :w]
frame2 = frame2[:h, :w]
# 计算色差
diff = cv2.absdiff(frame1, frame2)
diff_gray = cv2.cvtColor(diff, cv2.COLOR_RGB2GRAY)
# 计算统计信息
mean_diff = np.mean(diff)
std_diff = np.std(diff)
max_diff = np.max(diff)
mean_diff_gray = np.mean(diff_gray)
# 计算RGB各通道的平均色差
mean_diff_r = np.mean(diff[:, :, 0])
mean_diff_g = np.mean(diff[:, :, 1])
mean_diff_b = np.mean(diff[:, :, 2])
# 计算PSNR峰值信噪比
mse = np.mean((frame1.astype(float) - frame2.astype(float)) ** 2)
if mse == 0:
psnr = float('inf')
else:
psnr = 20 * np.log10(255.0 / np.sqrt(mse))
result = {
'mean_diff': float(mean_diff),
'std_diff': float(std_diff),
'max_diff': int(max_diff),
'mean_diff_gray': float(mean_diff_gray),
'mean_diff_r': float(mean_diff_r),
'mean_diff_g': float(mean_diff_g),
'mean_diff_b': float(mean_diff_b),
'psnr': float(psnr),
'mse': float(mse),
'diff_image': diff,
'diff_gray': diff_gray
}
return result
def test_color_stability(self, num_frames=10, interval=0.1):
"""
测试颜色稳定性(连续多帧的色差)
:param num_frames: 测试帧数
:param interval: 帧间隔(秒)
:return: 稳定性统计信息
"""
frames = []
logger.info(f"开始采集 {num_frames} 帧用于稳定性测试...")
for i in range(num_frames):
frame = self.get_frame()
if frame is None:
logger.warning(f"无法获取第 {i+1}")
continue
frames.append(frame[0]) # 使用opencv格式
if i < num_frames - 1:
time.sleep(interval)
if len(frames) < 2:
logger.error("采集的帧数不足")
return None
# 计算所有帧之间的平均色差
all_diffs = []
for i in range(len(frames) - 1):
diff_result = self.test_color_difference(frames[i], frames[i+1])
if diff_result:
all_diffs.append(diff_result['mean_diff'])
if not all_diffs:
return None
result = {
'num_frames': len(frames),
'avg_mean_diff': float(np.mean(all_diffs)),
'std_mean_diff': float(np.std(all_diffs)),
'min_mean_diff': float(np.min(all_diffs)),
'max_mean_diff': float(np.max(all_diffs)),
'all_diffs': [float(d) for d in all_diffs]
}
return result
def print_resolution_test(self):
"""打印分辨率测试结果"""
print("\n" + "="*60)
print("分辨率测试结果")
print("="*60)
result = self.test_resolution()
if result is None:
print("❌ 测试失败:采集卡未初始化")
return
print(f"期望分辨率: {result['expected'][0]} x {result['expected'][1]}")
if result['actual_cap']:
print(f"实际分辨率(采集卡): {result['actual_cap'][0]} x {result['actual_cap'][1]}")
match_str = "✅ 匹配" if result['match'] else "❌ 不匹配"
print(f"分辨率匹配: {match_str}")
if result['actual_frame']:
print(f"实际分辨率(原始帧): {result['actual_frame'][0]} x {result['actual_frame'][1]}")
if result['cropped_frame']:
print(f"裁剪后分辨率: {result['cropped_frame'][0]} x {result['cropped_frame'][1]}")
print("="*60 + "\n")
def print_color_test(self):
"""打印色差测试结果"""
print("\n" + "="*60)
print("色差测试结果(两帧对比)")
print("="*60)
result = self.test_color_difference()
if result is None:
print("❌ 测试失败:无法获取帧")
return
print(f"平均色差: {result['mean_diff']:.2f}")
print(f"色差标准差: {result['std_diff']:.2f}")
print(f"最大色差: {result['max_diff']}")
print(f"灰度平均色差: {result['mean_diff_gray']:.2f}")
print(f"\nRGB通道平均色差:")
print(f" R通道: {result['mean_diff_r']:.2f}")
print(f" G通道: {result['mean_diff_g']:.2f}")
print(f" B通道: {result['mean_diff_b']:.2f}")
print(f"\nPSNR (峰值信噪比): {result['psnr']:.2f} dB")
print(f"MSE (均方误差): {result['mse']:.2f}")
print("="*60 + "\n")
def print_stability_test(self, num_frames=10):
"""打印稳定性测试结果"""
print("\n" + "="*60)
print(f"颜色稳定性测试结果({num_frames}帧)")
print("="*60)
result = self.test_color_stability(num_frames)
if result is None:
print("❌ 测试失败:无法获取足够的帧")
return
print(f"测试帧数: {result['num_frames']}")
print(f"平均色差均值: {result['avg_mean_diff']:.2f}")
print(f"色差标准差: {result['std_mean_diff']:.2f}")
print(f"最小色差: {result['min_mean_diff']:.2f}")
print(f"最大色差: {result['max_mean_diff']:.2f}")
print(f"\n各帧间色差: {[f'{d:.2f}' for d in result['all_diffs']]}")
print("="*60 + "\n")
def save_test_images(self, save_dir="test_output"):
"""保存测试图像"""
os.makedirs(save_dir, exist_ok=True)
# 保存原始帧
raw_frame = self.get_raw_frame()
if raw_frame is not None:
cv2.imwrite(os.path.join(save_dir, "raw_frame.jpg"), raw_frame)
logger.info(f"已保存原始帧: {save_dir}/raw_frame.jpg")
# 保存处理后的帧
processed = self.get_frame()
if processed is not None:
cv2.imwrite(os.path.join(save_dir, "processed_frame.jpg"), cv2.cvtColor(processed[0], cv2.COLOR_RGB2BGR))
logger.info(f"已保存处理后帧: {save_dir}/processed_frame.jpg")
# 保存色差图
color_diff = self.test_color_difference()
if color_diff is not None:
cv2.imwrite(os.path.join(save_dir, "color_diff.jpg"), cv2.cvtColor(color_diff['diff_image'], cv2.COLOR_RGB2BGR))
cv2.imwrite(os.path.join(save_dir, "color_diff_gray.jpg"), color_diff['diff_gray'])
logger.info(f"已保存色差图: {save_dir}/color_diff.jpg")
logger.info(f"已保存灰度色差图: {save_dir}/color_diff_gray.jpg")
def release(self):
"""释放资源(与实际代码相同)"""
self.running = False
time.sleep(0.2)
if self.cap is not None:
self.cap.release()
cv2.destroyAllWindows()
logger.info("🔚 采集卡已释放")
def __del__(self):
"""析构函数(与实际代码相同)"""
if hasattr(self, "cap") and self.cap is not None:
try:
self.release()
except:
pass
class MultiCaptureCardTester:
"""
多采集卡测试管理器
支持同时测试多张采集卡
"""
def __init__(self):
"""初始化多采集卡测试管理器"""
self.testers = {} # {cam_index: CaptureCardTester}
self.config = None
def load_from_config(self):
"""从配置文件加载采集卡"""
try:
from config import config_manager
config_manager.load_config()
self.config = config_manager.config
groups = self.config.get('groups', [])
if not groups:
logger.warning("配置文件中没有找到配置组")
return []
# 获取所有配置组中的采集卡
camera_configs = []
for group in groups:
cam_idx = group.get('camera_index')
cam_width = group.get('camera_width', 1920)
cam_height = group.get('camera_height', 1080)
name = group.get('name', f"配置组{groups.index(group)}")
# 检查是否已存在相同索引的采集卡
if cam_idx not in [c['index'] for c in camera_configs]:
camera_configs.append({
'index': cam_idx,
'width': cam_width,
'height': cam_height,
'name': name,
'group': group
})
return camera_configs
except Exception as e:
logger.error(f"从配置加载失败: {e}")
return []
def add_camera(self, cam_index, width=1920, height=1080, name=None):
"""
添加一张采集卡到测试列表
:param cam_index: 采集卡索引
:param width: 宽度
:param height: 高度
:param name: 采集卡名称(可选)
"""
if name is None:
name = f"采集卡{cam_index}"
if cam_index in self.testers:
logger.warning(f"采集卡 {cam_index} 已存在,将重新初始化")
self.testers[cam_index].release()
tester = CaptureCardTester(cam_index=cam_index, width=width, height=height)
if tester.cap is not None:
tester.name = name
self.testers[cam_index] = tester
return True
else:
logger.error(f"无法初始化采集卡 {cam_index}")
return False
def initialize_from_config(self, use_all_groups=True):
"""
从配置初始化所有采集卡
:param use_all_groups: 是否使用所有配置组False则只使用活动配置组
"""
camera_configs = self.load_from_config()
if not camera_configs:
logger.warning("没有找到可用的采集卡配置")
return False
# 筛选配置组
if not use_all_groups:
camera_configs = [c for c in camera_configs if c.get('group', {}).get('active', False)]
if not camera_configs:
logger.warning("没有活动的配置组")
return False
logger.info(f"📷 找到 {len(camera_configs)} 张采集卡配置")
success_count = 0
for config in camera_configs:
if self.add_camera(
cam_index=config['index'],
width=config['width'],
height=config['height'],
name=config['name']
):
success_count += 1
logger.info(f"✅ 成功初始化 {success_count}/{len(camera_configs)} 张采集卡")
return success_count > 0
def test_all_resolution(self):
"""测试所有采集卡的分辨率"""
print("\n" + "="*60)
print("所有采集卡分辨率测试")
print("="*60)
results = {}
for cam_index, tester in self.testers.items():
name = getattr(tester, 'name', f"采集卡{cam_index}")
print(f"\n{name} (索引: {cam_index})】")
result = tester.test_resolution()
results[cam_index] = result
# 汇总结果
print("\n" + "-"*60)
print("分辨率测试汇总")
print("-"*60)
for cam_index, result in results.items():
if result is None:
continue
name = getattr(self.testers[cam_index], 'name', f"采集卡{cam_index}")
match_str = "" if result.get('match', False) else ""
print(f"{match_str} {name}: 期望{result['expected']} -> 实际{result.get('actual_cap', 'N/A')}")
return results
def test_all_color(self):
"""测试所有采集卡的色差"""
print("\n" + "="*60)
print("所有采集卡色差测试")
print("="*60)
results = {}
for cam_index, tester in self.testers.items():
name = getattr(tester, 'name', f"采集卡{cam_index}")
print(f"\n{name} (索引: {cam_index})】")
result = tester.test_color_difference()
results[cam_index] = result
# 汇总结果
print("\n" + "-"*60)
print("色差测试汇总")
print("-"*60)
for cam_index, result in results.items():
if result is None:
continue
name = getattr(self.testers[cam_index], 'name', f"采集卡{cam_index}")
print(f"{name}: 平均色差={result['mean_diff']:.2f}, PSNR={result['psnr']:.2f}dB")
return results
def test_all_stability(self, num_frames=10):
"""测试所有采集卡的稳定性"""
print("\n" + "="*60)
print("所有采集卡稳定性测试")
print("="*60)
results = {}
for cam_index, tester in self.testers.items():
name = getattr(tester, 'name', f"采集卡{cam_index}")
print(f"\n{name} (索引: {cam_index})】")
result = tester.test_color_stability(num_frames=num_frames)
results[cam_index] = result
# 汇总结果
print("\n" + "-"*60)
print("稳定性测试汇总")
print("-"*60)
for cam_index, result in results.items():
if result is None:
continue
name = getattr(self.testers[cam_index], 'name', f"采集卡{cam_index}")
print(f"{name}: 平均色差={result['avg_mean_diff']:.2f}±{result['std_mean_diff']:.2f}")
return results
def save_all_test_images(self, base_dir="test_output"):
"""保存所有采集卡的测试图像"""
for cam_index, tester in self.testers.items():
name = getattr(tester, 'name', f"采集卡{cam_index}")
# 清理名称中的特殊字符,用于目录名
safe_name = "".join(c for c in name if c.isalnum() or c in (' ', '-', '_')).rstrip()
save_dir = os.path.join(base_dir, safe_name)
tester.save_test_images(save_dir=save_dir)
def release_all(self):
"""释放所有采集卡"""
for tester in self.testers.values():
try:
tester.release()
except:
pass
self.testers.clear()
def __del__(self):
"""析构函数"""
self.release_all()
def scan_cameras(max_index=10):
"""
扫描可用的采集卡
:param max_index: 最大扫描索引
:return: 可用采集卡索引列表
"""
print("🔍 正在扫描采集卡...")
available = []
old_stderr = sys.stderr
suppressed_output = io.StringIO()
try:
sys.stderr = suppressed_output
for i in range(max_index):
cap = None
try:
with warnings.catch_warnings():
warnings.filterwarnings('ignore')
cap = cv2.VideoCapture(i, cv2.CAP_DSHOW)
if cap.isOpened():
ret, frame = cap.read()
if ret and frame is not None:
available.append(i)
print(f" ✅ 找到采集卡: 索引 {i}")
if cap:
cap.release()
except:
if cap:
try:
cap.release()
except:
pass
finally:
sys.stderr = old_stderr
if not available:
print(" ❌ 未找到可用的采集卡")
else:
print(f"✅ 共找到 {len(available)} 张采集卡")
return available
def main():
"""主测试函数"""
print("="*60)
print("采集卡截图测试工具(支持多采集卡)")
print("="*60)
multi_tester = MultiCaptureCardTester()
# 选择测试模式
print("\n选择测试模式:")
print(" 1. 从配置文件加载(所有配置组)")
print(" 2. 从配置文件加载(仅活动配置组)")
print(" 3. 手动指定采集卡索引")
print(" 4. 扫描采集卡")
print(" 0. 退出")
choice = input("\n请选择 (0-4): ").strip()
if choice == "0":
print("👋 退出")
return
elif choice == "1":
# 从配置加载所有配置组
if not multi_tester.initialize_from_config(use_all_groups=True):
print("❌ 无法从配置初始化采集卡")
return
elif choice == "2":
# 从配置加载活动配置组
if not multi_tester.initialize_from_config(use_all_groups=False):
print("❌ 无法从配置初始化采集卡")
return
elif choice == "3":
# 手动指定
indices_input = input("请输入采集卡索引(用逗号分隔,如: 0,1,2): ").strip()
try:
indices = [int(x.strip()) for x in indices_input.split(',')]
width_input = input("请输入宽度 (默认1920): ").strip()
height_input = input("请输入高度 (默认1080): ").strip()
width = int(width_input) if width_input else 1920
height = int(height_input) if height_input else 1080
success_count = 0
for idx in indices:
if multi_tester.add_camera(idx, width=width, height=height):
success_count += 1
if success_count == 0:
print("❌ 无法初始化任何采集卡")
return
except ValueError:
print("❌ 输入格式错误")
return
elif choice == "4":
# 扫描采集卡
available = scan_cameras()
if not available:
return
indices_input = input(f"请输入要测试的采集卡索引(用逗号分隔,可用: {available}): ").strip()
try:
indices = [int(x.strip()) for x in indices_input.split(',')]
# 验证索引有效性
indices = [i for i in indices if i in available]
if not indices:
print("❌ 没有有效的采集卡索引")
return
width_input = input("请输入宽度 (默认1920): ").strip()
height_input = input("请输入高度 (默认1080): ").strip()
width = int(width_input) if width_input else 1920
height = int(height_input) if height_input else 1080
success_count = 0
for idx in indices:
if multi_tester.add_camera(idx, width=width, height=height):
success_count += 1
if success_count == 0:
print("❌ 无法初始化任何采集卡")
return
except ValueError:
print("❌ 输入格式错误")
return
else:
print("❌ 无效选择")
return
if not multi_tester.testers:
print("❌ 没有可用的采集卡")
return
try:
# 等待采集卡稳定
print("\n等待采集卡稳定...")
time.sleep(1.0)
# 测试分辨率
multi_tester.test_all_resolution()
# 等待一下确保采集卡稳定
time.sleep(0.5)
# 测试色差
multi_tester.test_all_color()
# 测试稳定性
multi_tester.test_all_stability(num_frames=10)
# 保存测试图像
multi_tester.save_all_test_images()
print("\n✅ 所有测试完成!")
print("按 Enter 键退出...")
input()
except KeyboardInterrupt:
print("\n\n用户中断测试")
except Exception as e:
logger.error(f"测试过程中发生错误: {e}")
import traceback
traceback.print_exc()
finally:
multi_tester.release_all()
if __name__ == "__main__":
main()

197
utils/device_scanner.py Normal file
View File

@@ -0,0 +1,197 @@
"""
设备扫描工具 - 通过唯一标识符而非索引来识别设备
解决重启后设备序号混乱的问题
"""
import cv2
import serial.tools.list_ports
import warnings
import sys
import io
from typing import List, Dict, Optional
def get_camera_info(cam_index: int) -> Optional[Dict]:
"""
获取采集卡的详细信息(包括设备名称)
:param cam_index: 采集卡索引
:return: 包含设备信息的字典如果失败返回None
"""
old_stderr = sys.stderr
suppressed_output = io.StringIO()
try:
sys.stderr = suppressed_output
with warnings.catch_warnings():
warnings.filterwarnings('ignore')
# 尝试多种后端
backends = [
(cam_index, cv2.CAP_DSHOW),
(cam_index, cv2.CAP_ANY),
(cam_index, None),
]
cap = None
for idx, backend in backends:
try:
if backend is not None:
cap = cv2.VideoCapture(idx, backend)
else:
cap = cv2.VideoCapture(idx)
if cap.isOpened():
ret, frame = cap.read()
if ret and frame is not None:
# 尝试获取设备名称(不同后端可能支持不同)
device_name = None
backend_name = None
if backend == cv2.CAP_DSHOW:
backend_name = "DirectShow"
# DirectShow可能支持获取设备名称
try:
# 某些情况下可以通过属性获取
pass # OpenCV限制无法直接获取设备名称
except:
pass
elif backend == cv2.CAP_ANY:
backend_name = "Any"
else:
backend_name = "Default"
# 获取分辨率信息作为辅助标识
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
cap.release()
return {
'index': cam_index,
'backend': backend_name,
'width': width,
'height': height,
'name': f"采集卡-{cam_index}",
'available': True
}
else:
if cap:
cap.release()
cap = None
except Exception:
if cap:
try:
cap.release()
except:
pass
cap = None
continue
return None
finally:
sys.stderr = old_stderr
def scan_cameras_with_info(max_index: int = 10) -> List[Dict]:
"""
扫描所有可用采集卡,返回详细信息列表
:param max_index: 最大扫描索引
:return: 采集卡信息列表
"""
cameras = []
for i in range(max_index + 1):
info = get_camera_info(i)
if info:
cameras.append(info)
return cameras
def get_serial_port_info(port_name: str) -> Optional[Dict]:
"""
获取串口的详细信息包括硬件ID和描述
:param port_name: 串口名称(如 "COM3"
:return: 包含串口信息的字典
"""
try:
ports = serial.tools.list_ports.comports()
for port in ports:
if port.device == port_name:
return {
'device': port.device,
'description': port.description,
'hwid': port.hwid, # 硬件ID唯一标识
'vid': port.vid if hasattr(port, 'vid') else None,
'pid': port.pid if hasattr(port, 'pid') else None,
'serial_number': port.serial_number if hasattr(port, 'serial_number') else None,
'manufacturer': port.manufacturer if hasattr(port, 'manufacturer') else None,
'name': port.description or port.device
}
except Exception as e:
print(f"⚠️ 获取串口信息失败: {e}")
return None
def scan_serial_ports_with_info() -> List[Dict]:
"""
扫描所有可用串口,返回详细信息列表
:return: 串口信息列表
"""
ports_info = []
try:
ports = serial.tools.list_ports.comports()
for port in ports:
info = {
'device': port.device,
'description': port.description,
'hwid': port.hwid,
'vid': port.vid if hasattr(port, 'vid') else None,
'pid': port.pid if hasattr(port, 'pid') else None,
'serial_number': port.serial_number if hasattr(port, 'serial_number') else None,
'manufacturer': port.manufacturer if hasattr(port, 'manufacturer') else None,
'name': port.description or port.device
}
ports_info.append(info)
except Exception as e:
print(f"⚠️ 扫描串口失败: {e}")
# 按设备名排序
ports_info.sort(key=lambda x: int(x['device'].replace('COM', '')) if x['device'].replace('COM', '').isdigit() else 999)
return ports_info
def find_camera_by_index(cameras: List[Dict], index: int) -> Optional[Dict]:
"""通过索引查找采集卡"""
for cam in cameras:
if cam['index'] == index:
return cam
return None
def find_serial_by_hwid(ports: List[Dict], hwid: str) -> Optional[Dict]:
"""通过硬件ID查找串口最可靠"""
for port in ports:
if port['hwid'] == hwid:
return port
return None
def find_serial_by_device(ports: List[Dict], device: str) -> Optional[Dict]:
"""通过设备名查找串口(兼容旧配置)"""
for port in ports:
if port['device'] == device:
return port
return None
if __name__ == "__main__":
# 测试代码
print("=" * 60)
print("采集卡扫描:")
print("=" * 60)
cameras = scan_cameras_with_info()
for cam in cameras:
print(f" 索引 {cam['index']}: {cam['name']} ({cam['width']}x{cam['height']})")
print("\n" + "=" * 60)
print("串口扫描:")
print("=" * 60)
ports = scan_serial_ports_with_info()
for port in ports:
print(f" {port['device']}: {port['name']}")
print(f" 硬件ID: {port['hwid']}")
if port['vid'] and port['pid']:
print(f" VID/PID: {port['vid']:04X}/{port['pid']:04X}")

View File

@@ -10,18 +10,32 @@ mouse = None
def init_mouse_keyboard(config_group):
"""初始化鼠标和串口"""
global serial, mouse
from utils.logger import logger
# 初始化串口
serial.ser = serial.Serial(
config_group['serial_port'],
config_group['serial_baudrate']
)
try:
logger.info(f"🔧 正在打开串口: {config_group['serial_port']} @ {config_group['serial_baudrate']}")
serial.ser = serial.Serial(
config_group['serial_port'],
config_group['serial_baudrate'],
timeout=1
)
logger.info(f"✅ 串口已打开: {config_group['serial_port']} @ {config_group['serial_baudrate']}")
except Exception as e:
logger.error(f"❌ 串口打开失败: {e}")
raise
# 初始化鼠标
mouse = ch9329Comm.mouse.DataComm(
config_group['camera_width'],
config_group['camera_height']
)
print(f"✅ 串口已打开: {config_group['serial_port']} {config_group['serial_baudrate']}")
print(f"✅ 鼠标已初始化: {config_group['camera_width']}x{config_group['camera_height']}")
try:
logger.info(f"🔧 正在初始化鼠标: {config_group['camera_width']}x{config_group['camera_height']}")
mouse = ch9329Comm.mouse.DataComm(
config_group['camera_width'],
config_group['camera_height']
)
logger.info(f"✅ 鼠标已初始化: {config_group['camera_width']}x{config_group['camera_height']}")
except Exception as e:
logger.error(f"❌ 鼠标初始化失败: {e}")
raise
def bezier_point(t, p0, p1, p2, p3):
"""计算三次贝塞尔曲线上的点"""

View File

@@ -68,7 +68,7 @@ def tuwai(image):
img_bytes = img_encoded.tobytes()
result = ocr.classification(img_bytes)
print(result)
if result == "tap" or result=='tqp' or result=='top':
if result == "tap" or result=='tqp' or result=='top' or result=='tp':
return True
else:
return False

View File

@@ -1,55 +1,397 @@
import cv2
from utils.get_image import get_image
from utils.get_image import GetImage
from ultralytics import YOLO
from config import config_manager
from utils.logger import logger
import os
import numpy as np
model = YOLO(r"best0.pt").to('cuda')
# 检查模型文件是否存在
model_path = r"best0.pt"
if not os.path.exists(model_path):
print(f"❌ 模型文件不存在: {model_path}")
exit(1)
def yolo_shibie(im_PIL, detections):
results = model(im_PIL)
result = results[0]
# 加载YOLO模型
try:
model = YOLO(model_path).to('cuda')
print(f"✅ 模型加载成功: {model_path}")
except Exception as e:
print(f"❌ 模型加载失败: {e}")
exit(1)
# ✅ 获取绘制好框的图像
frame_with_boxes = result.plot()
def enhance_sharpness(image, strength=1.5):
"""
增强图像锐度
:param image: 输入图像BGR格式
:param strength: 锐化强度1.0-3.0默认1.5
:return: 锐化后的图像
"""
# 创建锐化核
kernel = np.array([[-1, -1, -1],
[-1, 9*strength, -1],
[-1, -1, -1]]) / (9*strength - 8)
sharpened = cv2.filter2D(image, -1, kernel)
return sharpened
# ✅ 用 OpenCV 动态显示
cv2.imshow("YOLO实时检测", frame_with_boxes)
# ESC 或 Q 键退出
if cv2.waitKey(1) & 0xFF in [27, ord('q')]:
return None
def enhance_contrast(image, alpha=1.2, beta=10):
"""
增强对比度和亮度
:param image: 输入图像
:param alpha: 对比度控制1.0-3.0默认1.2
:param beta: 亮度控制(-100到100默认10
:return: 增强后的图像
"""
return cv2.convertScaleAbs(image, alpha=alpha, beta=beta)
# ✅ 提取检测信息
for i in range(len(result.boxes.xyxy)):
left, top, right, bottom = result.boxes.xyxy[i]
cls_id = int(result.boxes.cls[i])
label = result.names[cls_id]
if label in ['center', 'next', 'npc1', 'npc2', 'npc3', 'npc4', 'boss', 'zhaozi']:
player_x = int(left + (right - left) / 2) + 3
player_y = int(top + (bottom - top) / 2) + 40
detections[label] = [player_x, player_y]
elif label in ['daojv', 'gw']:
player_x = int(left + (right - left) / 2) + 3
player_y = int(top + (bottom - top) / 2) + 40
detections[label].append([player_x, player_y])
def denoise_image(image, method='bilateral'):
"""
去噪处理
:param image: 输入图像
:param method: 去噪方法 ('bilateral', 'gaussian', 'fastNlMeans')
:return: 去噪后的图像
"""
if method == 'bilateral':
# 双边滤波,保留边缘的同时去噪
return cv2.bilateralFilter(image, 9, 75, 75)
elif method == 'gaussian':
# 高斯模糊去噪
return cv2.GaussianBlur(image, (5, 5), 0)
elif method == 'fastNlMeans':
# 非局部均值去噪(效果最好但较慢)
return cv2.fastNlMeansDenoisingColored(image, None, 10, 10, 7, 21)
return image
def apply_enhancements(image, sharpness=True, contrast=True, denoise=True,
sharp_strength=1.5, contrast_alpha=1.2, contrast_beta=10,
denoise_method='bilateral'):
"""
应用所有图像增强
:param image: 输入图像BGR格式
:param sharpness: 是否锐化
:param contrast: 是否增强对比度
:param denoise: 是否去噪
:param sharp_strength: 锐化强度
:param contrast_alpha: 对比度系数
:param contrast_beta: 亮度调整
:param denoise_method: 去噪方法
:return: 增强后的图像
"""
enhanced = image.copy()
if denoise:
enhanced = denoise_image(enhanced, denoise_method)
if contrast:
enhanced = enhance_contrast(enhanced, contrast_alpha, contrast_beta)
if sharpness:
enhanced = enhance_sharpness(enhanced, sharp_strength)
return enhanced
def set_camera_properties(cap, brightness=None, contrast=None, saturation=None,
sharpness=None, gain=None, exposure=None):
"""
设置采集卡硬件参数
:param cap: VideoCapture对象
:param brightness: 亮度 (0-100)
:param contrast: 对比度 (0-100)
:param saturation: 饱和度 (0-100)
:param sharpness: 锐度 (0-100)
:param gain: 增益 (0-100)
:param exposure: 曝光 (通常为负值,如-6)
"""
props = {
cv2.CAP_PROP_BRIGHTNESS: brightness,
cv2.CAP_PROP_CONTRAST: contrast,
cv2.CAP_PROP_SATURATION: saturation,
cv2.CAP_PROP_SHARPNESS: sharpness,
cv2.CAP_PROP_GAIN: gain,
cv2.CAP_PROP_EXPOSURE: exposure,
}
for prop, value in props.items():
if value is not None:
try:
cap.set(prop, value)
actual = cap.get(prop)
logger.info(f" 设置 {prop.name if hasattr(prop, 'name') else prop}: {value} -> 实际: {actual:.2f}")
except Exception as e:
logger.warning(f" ⚠️ 设置参数 {prop} 失败: {e}")
def yolo_shibie(im_PIL, detections, model, enhance_enabled=False, enhance_params=None):
"""
YOLO识别函数
:param im_PIL: PIL图像对象
:param detections: 检测结果字典
:param model: YOLO模型
:param enhance_enabled: 是否启用图像增强
:param enhance_params: 图像增强参数
:return: 更新后的detections字典如果用户退出则返回None
"""
if im_PIL is None:
return detections
try:
results = model(im_PIL)
result = results[0]
# ✅ 获取绘制好框的图像RGB格式
frame_with_boxes_rgb = result.plot()
# ✅ 转换为BGR格式用于OpenCV显示
frame_with_boxes_bgr = cv2.cvtColor(frame_with_boxes_rgb, cv2.COLOR_RGB2BGR)
# 应用图像增强(如果启用)
display_frame = frame_with_boxes_bgr.copy()
if enhance_enabled and enhance_params:
try:
display_frame = apply_enhancements(display_frame, **enhance_params)
except Exception as e:
print(f"⚠️ 图像增强失败: {e}")
# 显示YOLO检测结果
cv2.imshow("YOLO Real-time Detection", display_frame)
# ✅ 提取检测信息
if result.boxes is not None and len(result.boxes.xyxy) > 0:
# 用于存储多个候选npc4如果检测到多个
npc4_candidates = []
for i in range(len(result.boxes.xyxy)):
try:
left = float(result.boxes.xyxy[i][0])
top = float(result.boxes.xyxy[i][1])
right = float(result.boxes.xyxy[i][2])
bottom = float(result.boxes.xyxy[i][3])
cls_id = int(result.boxes.cls[i])
label = result.names[cls_id]
# 获取置信度(如果可用)
confidence = float(result.boxes.conf[i]) if hasattr(result.boxes, 'conf') and len(result.boxes.conf) > i else 1.0
# npc1-npc4 使用底部位置与main.py保持一致
if label in ['npc1', 'npc2', 'npc3', 'npc4']:
player_x = int(left + (right - left) / 2)
player_y = int(bottom) + 30 # 使用底部位置与main.py保持一致
position = [player_x, player_y]
# 特殊处理npc4如果检测到多个收集所有候选
if label == 'npc4':
npc4_candidates.append({
'position': position,
'confidence': confidence,
'box': [left, top, right, bottom],
'area': (right - left) * (bottom - top) # 检测框面积
})
else:
# npc1-npc3直接赋值如果已经有值保留置信度更高的
if detections[label] is None or (hasattr(result.boxes, 'conf') and
confidence > 0.5):
detections[label] = position
# 其他目标使用中心点
elif label in ['center', 'next', 'boss', 'zhaozi']:
player_x = int(left + (right - left) / 2) + 3
player_y = int(top + (bottom - top) / 2) + 40
detections[label] = [player_x, player_y]
# 道具和怪物可以多个
elif label in ['daojv', 'gw']:
player_x = int(left + (right - left) / 2) + 3
player_y = int(top + (bottom - top) / 2) + 40
# 确保列表存在
if label not in detections:
detections[label] = []
detections[label].append([player_x, player_y])
except Exception as e:
print(f"⚠️ 处理检测框时出错: {e}")
continue
# 处理npc4如果检测到多个选择最合适的
if npc4_candidates:
# 按置信度排序,选择置信度最高的
npc4_candidates.sort(key=lambda x: x['confidence'], reverse=True)
# 选择最佳候选(置信度最高且面积合理)
best_npc4 = None
for candidate in npc4_candidates:
# 置信度阈值至少0.3(可根据实际情况调整)
if candidate['confidence'] >= 0.3:
# 检查检测框面积是否合理(避免过小的误检)
area = candidate['area']
if area > 100: # 最小面积阈值
best_npc4 = candidate
break
if best_npc4:
detections['npc4'] = best_npc4['position']
# 可选:输出调试信息
# print(f"✅ 检测到npc4: 位置={best_npc4['position']}, 置信度={best_npc4['confidence']:.2f}")
elif len(npc4_candidates) == 1:
# 如果只有一个候选,即使置信度较低也使用
detections['npc4'] = npc4_candidates[0]['position']
except Exception as e:
print(f"⚠️ YOLO检测出错: {e}")
return detections
while True:
detections = {
'center': None, 'next': None,
'npc1': None, 'npc2': None, 'npc3': None, 'npc4': None,
'boss': None, 'zhaozi': None,
'daojv': [], 'gw': []
}
def main():
"""主函数"""
print("="*60)
print("YOLO实时检测测试")
print("="*60)
im_opencv = get_image.get_frame() # [RGB, PIL]
detections = yolo_shibie(im_opencv[1], detections)
# 从配置加载采集卡设置
active_group = config_manager.get_active_group()
if detections is None: # 用户退出
break
if active_group is None:
print("⚠️ 没有活动的配置组,使用默认设置")
print("提示: 可以运行 python gui_config.py 设置配置")
cam_index = 0
width = 1920
height = 1080
else:
print(f"📋 使用配置组: {active_group['name']}")
cam_index = active_group['camera_index']
width = active_group['camera_width']
height = active_group['camera_height']
print(detections)
print(f" 采集卡索引: {cam_index}")
print(f" 分辨率: {width}x{height}")
print()
cv2.destroyAllWindows()
# 初始化采集卡
print("🔧 正在初始化采集卡...")
get_image = GetImage(
cam_index=cam_index,
width=width,
height=height
)
if get_image.cap is None:
print("❌ 采集卡初始化失败")
print("请检查:")
print("1. 采集卡是否正确连接")
print("2. 采集卡索引是否正确")
print("3. 采集卡驱动是否安装")
return
# 设置采集卡硬件参数以提高清晰度(可选)
print("\n🔧 设置采集卡参数以提高清晰度...")
print("提示: 可以根据实际情况调整这些参数")
set_camera_properties(
get_image.cap,
brightness=50, # 亮度 (0-100)
contrast=50, # 对比度 (0-100)
saturation=55, # 饱和度 (0-100)
sharpness=60, # 锐度 (0-100提高清晰度)
gain=None, # 增益 (根据实际情况调整)
exposure=None # 曝光 (根据实际情况调整,通常为负值)
)
print("✅ 采集卡初始化成功")
print("\n快捷键:")
print(" 'q' 或 ESC - 退出")
print(" 'e' - 切换图像增强")
print(" '1'/'2' - 调整锐化强度 (+/-0.1)")
print(" '3'/'4' - 调整对比度 (+/-0.1)")
print()
try:
frame_count = 0
enhance_enabled = False # 默认关闭图像增强
# 图像增强参数
enhance_params = {
'sharpness': True,
'contrast': True,
'denoise': True,
'sharp_strength': 1.5,
'contrast_alpha': 1.2,
'contrast_beta': 10,
'denoise_method': 'bilateral'
}
while True:
# 获取帧
frame_data = get_image.get_frame()
if frame_data is None:
print("⚠️ 无法获取帧,跳过...")
continue
# frame_data 是 [im_opencv_rgb, im_PIL] 格式
# im_opencv_rgb 已经是RGB格式经过BGR2RGB转换
im_opencv_rgb, im_PIL = frame_data
if im_PIL is None:
print("⚠️ PIL图像为空跳过...")
continue
# 初始化检测结果字典
detections = {
'center': None, 'next': None,
'npc1': None, 'npc2': None, 'npc3': None, 'npc4': None,
'boss': None, 'zhaozi': None,
'daojv': [], 'gw': []
}
# 执行YOLO检测
detections = yolo_shibie(im_PIL, detections, model, enhance_enabled, enhance_params)
# 检查按键
key = cv2.waitKey(1) & 0xFF
if key in [27, ord('q'), ord('Q')]:
print("\n用户退出")
break
elif key == ord('e') or key == ord('E'):
enhance_enabled = not enhance_enabled
status = "开启" if enhance_enabled else "关闭"
print(f"图像增强: {status} (锐化={enhance_params['sharp_strength']:.1f}, "
f"对比度={enhance_params['contrast_alpha']:.1f})")
elif key == ord('1'):
enhance_params['sharp_strength'] = min(3.0, enhance_params['sharp_strength'] + 0.1)
print(f"锐化强度: {enhance_params['sharp_strength']:.1f}")
elif key == ord('2'):
enhance_params['sharp_strength'] = max(0.5, enhance_params['sharp_strength'] - 0.1)
print(f"锐化强度: {enhance_params['sharp_strength']:.1f}")
elif key == ord('3'):
enhance_params['contrast_alpha'] = min(3.0, enhance_params['contrast_alpha'] + 0.1)
print(f"对比度: {enhance_params['contrast_alpha']:.1f}")
elif key == ord('4'):
enhance_params['contrast_alpha'] = max(0.5, enhance_params['contrast_alpha'] - 0.1)
print(f"对比度: {enhance_params['contrast_alpha']:.1f}")
frame_count += 1
if frame_count % 30 == 0: # 每30帧打印一次
print(f"📊 已处理 {frame_count}")
# 打印有检测到的目标
detected_items = {k: v for k, v in detections.items() if v is not None and v != []}
if detected_items:
print(f" 检测到: {detected_items}")
except KeyboardInterrupt:
print("\n\n用户中断测试")
except Exception as e:
print(f"\n❌ 测试过程中发生错误: {e}")
import traceback
traceback.print_exc()
finally:
# 清理资源
get_image.release()
cv2.destroyAllWindows()
print("🔚 测试结束")
if __name__ == "__main__":
main()

227
yolotest2.py Normal file
View File

@@ -0,0 +1,227 @@
"""
从main.py提取的YOLO识别测试文件
使用与main.py相同的识别逻辑
"""
import cv2
from utils.get_image import GetImage
from ultralytics import YOLO
from config import config_manager
import os
# 检查模型文件是否存在
model_path = r"best.pt"
model0_path = r"best0.pt"
if not os.path.exists(model_path):
print(f"❌ 模型文件不存在: {model_path}")
exit(1)
if not os.path.exists(model0_path):
print(f"❌ 模型文件不存在: {model0_path}")
exit(1)
# 加载YOLO模型与main.py保持一致
try:
model = YOLO(model_path).to('cuda')
model0 = YOLO(model0_path).to('cuda')
print(f"✅ 模型加载成功: {model_path}")
print(f"✅ 模型加载成功: {model0_path}")
except Exception as e:
print(f"❌ 模型加载失败: {e}")
exit(1)
def yolo_shibie(im_PIL, detections, model):
"""
YOLO识别函数与main.py中的实现完全一致
:param im_PIL: PIL图像对象
:param detections: 检测结果字典
:param model: YOLO模型
:return: 更新后的detections字典
"""
results = model(im_PIL) # 目标检测
for result in results:
for i in range(len(result.boxes.xyxy)):
left, top, right, bottom = result.boxes.xyxy[i]
scalar_tensor = result.boxes.cls[i]
value = scalar_tensor.item()
label = result.names[int(value)]
if label == 'center' or label == 'next' or label == 'boss' or label == 'zhaozi':
player_x = int(left + (right - left) / 2)
player_y = int(top + (bottom - top) / 2) + 30
RW = [player_x, player_y]
detections[label] = RW
elif label == 'daojv' or label == 'gw':
player_x = int(left + (right - left) / 2)
player_y = int(top + (bottom - top) / 2) + 30
RW = [player_x, player_y]
detections[label].append(RW)
elif label == 'npc1' or label == 'npc2' or label == 'npc3' or label == 'npc4':
player_x = int(left + (right - left) / 2)
player_y = int(bottom) + 30
RW = [player_x, player_y]
detections[label] = RW
return detections
def main():
"""主函数"""
print("="*60)
print("YOLO识别测试main.py逻辑")
print("="*60)
# 从配置加载采集卡设置
active_group = config_manager.get_active_group()
if active_group is None:
print("⚠️ 没有活动的配置组,使用默认设置")
print("提示: 可以运行 python gui_config.py 设置配置")
cam_index = 0
width = 1920
height = 1080
use_model = model # 默认使用model
else:
print(f"📋 使用配置组: {active_group['name']}")
cam_index = active_group['camera_index']
width = active_group['camera_width']
height = active_group['camera_height']
use_model = model0 # 城镇中使用model0
print(f" 使用模型: model0 (best0.pt) - 用于城镇识别")
print(f" 采集卡索引: {cam_index}")
print(f" 分辨率: {width}x{height}")
print()
# 初始化采集卡
print("🔧 正在初始化采集卡...")
get_image = GetImage(
cam_index=cam_index,
width=width,
height=height
)
if get_image.cap is None:
print("❌ 采集卡初始化失败")
print("请检查:")
print("1. 采集卡是否正确连接")
print("2. 采集卡索引是否正确")
print("3. 采集卡驱动是否安装")
return
print("✅ 采集卡初始化成功")
print("\n快捷键:")
print(" 'q' 或 ESC - 退出")
print(" 'm' - 切换模型 (model/model0)")
print(" 'd' - 显示/隐藏检测信息")
print()
try:
frame_count = 0
show_detections = True # 是否显示检测信息
current_model = use_model # 当前使用的模型
current_model_name = "model0" if use_model == model0 else "model"
while True:
# 获取帧
frame_data = get_image.get_frame()
if frame_data is None:
print("⚠️ 无法获取帧,跳过...")
continue
# frame_data 是 [im_opencv_rgb, im_PIL] 格式
im_opencv_rgb, im_PIL = frame_data
if im_PIL is None:
print("⚠️ PIL图像为空跳过...")
continue
# 初始化检测结果字典
detections = {
'center': None, 'next': None,
'npc1': None, 'npc2': None, 'npc3': None, 'npc4': None,
'boss': None, 'zhaozi': None,
'daojv': [], 'gw': []
}
# 执行YOLO检测使用main.py的逻辑
detections = yolo_shibie(im_PIL, detections, current_model)
# 获取绘制好框的图像用于显示
try:
results = current_model(im_PIL)
result = results[0]
frame_with_boxes_rgb = result.plot()
frame_with_boxes_bgr = cv2.cvtColor(frame_with_boxes_rgb, cv2.COLOR_RGB2BGR)
except Exception as e:
print(f"⚠️ 绘制检测框失败: {e}")
frame_with_boxes_bgr = cv2.cvtColor(im_opencv_rgb, cv2.COLOR_RGB2BGR)
# 在图像上显示检测信息
if show_detections:
# 显示模型名称
cv2.putText(frame_with_boxes_bgr, f"Model: {current_model_name}",
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
# 显示检测到的目标
y_offset = 60
detected_items = []
for key, value in detections.items():
if value is not None and value != []:
if key in ['daojv', 'gw']:
detected_items.append(f"{key}: {len(value)}")
else:
detected_items.append(f"{key}: {value}")
if detected_items:
text = f"Detected: {', '.join(detected_items[:5])}" # 最多显示5个
if len(detected_items) > 5:
text += f" ... (+{len(detected_items)-5})"
cv2.putText(frame_with_boxes_bgr, text,
(10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)
# 显示图像
cv2.imshow("YOLO Detection (main.py logic)", frame_with_boxes_bgr)
# 检查按键
key = cv2.waitKey(1) & 0xFF
if key in [27, ord('q'), ord('Q')]:
print("\n用户退出")
break
elif key == ord('m') or key == ord('M'):
# 切换模型
if current_model == model:
current_model = model0
current_model_name = "model0"
else:
current_model = model
current_model_name = "model"
print(f"切换模型: {current_model_name}")
elif key == ord('d') or key == ord('D'):
show_detections = not show_detections
print(f"显示检测信息: {'开启' if show_detections else '关闭'}")
frame_count += 1
if frame_count % 30 == 0: # 每30帧打印一次
print(f"📊 已处理 {frame_count} 帧 (模型: {current_model_name})")
# 打印有检测到的目标
detected_items = {k: v for k, v in detections.items() if v is not None and v != []}
if detected_items:
print(f" 检测到: {detected_items}")
except KeyboardInterrupt:
print("\n\n用户中断测试")
except Exception as e:
print(f"\n❌ 测试过程中发生错误: {e}")
import traceback
traceback.print_exc()
finally:
# 清理资源
get_image.release()
cv2.destroyAllWindows()
print("🔚 测试结束")
if __name__ == "__main__":
main()