Compare commits

...

5 Commits

Author SHA1 Message Date
Administrator
f4a5a0a584 配置提交 2025-11-13 11:28:08 +08:00
ray
520b8818cd 修复键鼠冲突 2025-11-12 00:48:26 +08:00
Administrator
5ecc0f2bf5 配置提交 2025-11-10 22:45:13 +08:00
Administrator
809d07256a 配置提交 2025-11-10 22:44:54 +08:00
ray
1d0d6d0b9f 增加配置对应 2025-11-06 09:48:32 +08:00
8 changed files with 457 additions and 88 deletions

2
.idea/huojv.iml generated
View File

@@ -2,7 +2,7 @@
<module type="PYTHON_MODULE" version="4"> <module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager"> <component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" /> <content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Python 3.9" jdkType="Python SDK" /> <orderEntry type="jdk" jdkName="Python 3.12 virtualenv at C:\Users\Administrator\Downloads\huojv\huojv\.venv" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" /> <orderEntry type="sourceFolder" forTests="false" />
</component> </component>
</module> </module>

2
.idea/misc.xml generated
View File

@@ -3,5 +3,5 @@
<component name="Black"> <component name="Black">
<option name="sdkName" value="D:\CONDA\anaconda3" /> <option name="sdkName" value="D:\CONDA\anaconda3" />
</component> </component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.9" project-jdk-type="Python SDK" /> <component name="ProjectRootManager" version="2" project-jdk-name="Python 3.12 virtualenv at C:\Users\Administrator\Downloads\huojv\huojv\.venv" project-jdk-type="Python SDK" />
</project> </project>

View File

@@ -2,22 +2,42 @@
"groups": [ "groups": [
{ {
"name": "服务器", "name": "服务器",
"serial_port": "COM17",
"serial_baudrate": 9600,
"camera_index": 1,
"camera_width": 1920,
"camera_height": 1080,
"move_velocity": 470,
"active": true
},
{
"name": "2号机",
"serial_port": "COM14",
"serial_baudrate": 9600,
"camera_index": 2,
"camera_width": 1920,
"camera_height": 1080,
"move_velocity": 360,
"active": false
},
{
"name": "3号机",
"serial_port": "COM16", "serial_port": "COM16",
"serial_baudrate": 9600, "serial_baudrate": 9600,
"camera_index": 0, "camera_index": 0,
"camera_width": 1920, "camera_width": 1920,
"camera_height": 1080, "camera_height": 1080,
"move_velocity": 470, "move_velocity": 360,
"active": true "active": false
} }
], ],
"display": { "display": {
"preview_width": 1920, "preview_width": 1920,
"preview_height": 1080, "preview_height": 1080,
"preview_columns": 1, "preview_columns": 2,
"preview_rows": 1, "preview_rows": 2,
"show_preview": true, "show_preview": true,
"preview_multi_window": false, "preview_multi_window": false,
"preview_use_all_groups": false "preview_use_all_groups": true
} }
} }

View File

@@ -214,12 +214,27 @@ class ConfigGUI:
# 确保串口下拉框列表与当前值一致 # 确保串口下拉框列表与当前值一致
if group and 'serial_port' in group: if group and 'serial_port' in group:
# 如果当前串口不在选项里,追加
values = list(self.serial_port_cb.cget('values')) if self.serial_port_cb.cget('values') else [] values = list(self.serial_port_cb.cget('values')) if self.serial_port_cb.cget('values') else []
port_value = group['serial_port'] port_value = group['serial_port']
# 如果当前值不在列表中,尝试通过设备名匹配
matched = False
if hasattr(self, 'port_display_map'):
for display_text, device in self.port_display_map.items():
if device == port_value:
if display_text not in values:
values.append(display_text)
self.serial_port_cb.set(display_text)
matched = True
break
if not matched:
# 如果找不到匹配,直接添加原始值
if port_value not in values: if port_value not in values:
values.append(port_value) values.append(port_value)
self.serial_port_cb['values'] = values if not hasattr(self, 'port_display_map'):
self.port_display_map = {}
self.port_display_map[port_value] = port_value
self.serial_port_cb.set(port_value) self.serial_port_cb.set(port_value)
def scan_cameras(self, max_index: int = 10): def scan_cameras(self, max_index: int = 10):
@@ -302,23 +317,59 @@ class ConfigGUI:
self.camera_index_cb.set(found[0]) self.camera_index_cb.set(found[0])
def scan_ports(self): def scan_ports(self):
"""扫描系统可用的串口,并填充下拉框""" """扫描系统可用的串口,并填充下拉框(显示设备描述)"""
from utils.device_scanner import scan_serial_ports_with_info
found_real = [] found_real = []
port_display_map = {} # 显示文本 -> 实际设备名
try:
ports_info = scan_serial_ports_with_info()
for port_info in ports_info:
device = port_info['device']
description = port_info.get('description', '')
hwid = port_info.get('hwid', '')
# 显示格式COM3 (设备描述) [HWID]
if description:
display_text = f"{device} ({description})"
else:
display_text = device
# 如果有HWID添加到显示中较短版本
if hwid:
# 提取VID/PID部分如果存在
if 'VID_' in hwid and 'PID_' in hwid:
import re
vid_match = re.search(r'VID_([0-9A-F]+)', hwid)
pid_match = re.search(r'PID_([0-9A-F]+)', hwid)
if vid_match and pid_match:
display_text += f" [{vid_match.group(1)}:{pid_match.group(1)}]"
found_real.append(display_text)
port_display_map[display_text] = device
except Exception as e:
print(f"扫描串口错误: {e}")
# 回退到简单扫描
try: try:
ports = serial.tools.list_ports.comports() ports = serial.tools.list_ports.comports()
found_real = [port.device for port in ports] found_real = [port.device for port in ports]
# 按端口名排序
found_real.sort(key=lambda x: int(x.replace('COM', '')) if x.replace('COM', '').isdigit() else 999) found_real.sort(key=lambda x: int(x.replace('COM', '')) if x.replace('COM', '').isdigit() else 999)
except Exception as e: port_display_map = {p: p for p in found_real}
print(f"扫描串口错误: {e}") except:
pass
# 如果没有发现实际端口,使用默认端口列表 # 如果没有发现实际端口,使用默认端口列表
if not found_real: if not found_real:
found = ["COM1", "COM2", "COM3", "COM4", "COM5", "COM6"] # 默认给一些常见端口 found = ["COM1", "COM2", "COM3", "COM4", "COM5", "COM6"]
port_display_map = {p: p for p in found}
messagebox.showwarning("警告", "未发现可用串口设备,已添加常用默认选项") messagebox.showwarning("警告", "未发现可用串口设备,已添加常用默认选项")
else: else:
found = found_real found = found_real
messagebox.showinfo("扫描完成", f"发现可用串口: {', '.join(found)}") messagebox.showinfo("扫描完成", f"发现 {len(found)} 个串口设备\n\n设备信息已包含描述和硬件ID")
# 保存映射关系,用于后续获取实际设备名
self.port_display_map = port_display_map
self.serial_port_cb['values'] = found self.serial_port_cb['values'] = found
# 若当前无选择,则选择第一项 # 若当前无选择,则选择第一项
@@ -390,6 +441,11 @@ class ConfigGUI:
else: else:
# 字符串字段 # 字符串字段
value = value_str if value_str else group.get(key, '') value = value_str if value_str else group.get(key, '')
# 特殊处理串口:从显示文本中提取实际设备名
if key == 'serial_port' and hasattr(self, 'port_display_map'):
# 如果值是显示文本,转换为实际设备名
if value in self.port_display_map:
value = self.port_display_map[value]
group[key] = value group[key] = value
except Exception as e: except Exception as e:
@@ -467,6 +523,11 @@ class ConfigGUI:
else: else:
# 字符串字段 # 字符串字段
value = value_str if value_str else group.get(key, '') value = value_str if value_str else group.get(key, '')
# 特殊处理串口:从显示文本中提取实际设备名
if key == 'serial_port' and hasattr(self, 'port_display_map'):
# 如果值是显示文本,转换为实际设备名
if value in self.port_display_map:
value = self.port_display_map[value]
group[key] = value group[key] = value
except Exception as e: except Exception as e:
@@ -500,7 +561,7 @@ class ConfigGUI:
return result return result
def start_program(self): def start_program(self):
"""启动单个配置组的主程序""" """启动单个配置组的主程序并自动弹出采集卡预览窗口"""
# 保存配置(静默) # 保存配置(静默)
if not self.save_config_silent(): if not self.save_config_silent():
messagebox.showerror("错误", "配置保存失败") messagebox.showerror("错误", "配置保存失败")
@@ -527,11 +588,13 @@ class ConfigGUI:
], creationflags=subprocess.CREATE_NEW_CONSOLE if sys.platform == 'win32' else 0) ], creationflags=subprocess.CREATE_NEW_CONSOLE if sys.platform == 'win32' else 0)
messagebox.showinfo("成功", f"已启动配置组: {active_group['name']}\n\n请在控制台查看运行状态") messagebox.showinfo("成功", f"已启动配置组: {active_group['name']}\n\n请在控制台查看运行状态")
# 成功后弹出预览
self.start_preview()
except Exception as e: except Exception as e:
messagebox.showerror("错误", f"启动失败: {e}") messagebox.showerror("错误", f"启动失败: {e}")
def start_multi_program(self): def start_multi_program(self):
"""启动多个配置组的主程序""" """启动多个配置组的主程序并自动弹出采集卡预览窗口"""
# 保存配置(静默) # 保存配置(静默)
if not self.save_config_silent(): if not self.save_config_silent():
messagebox.showerror("错误", "配置保存失败") messagebox.showerror("错误", "配置保存失败")
@@ -547,6 +610,8 @@ class ConfigGUI:
], creationflags=subprocess.CREATE_NEW_CONSOLE if sys.platform == 'win32' else 0) ], creationflags=subprocess.CREATE_NEW_CONSOLE if sys.platform == 'win32' else 0)
messagebox.showinfo("提示", "多配置组启动器已打开\n\n请在控制台中选择要启动的配置组") messagebox.showinfo("提示", "多配置组启动器已打开\n\n请在控制台中选择要启动的配置组")
# 成功后弹出预览
self.start_preview()
except Exception as e: except Exception as e:
messagebox.showerror("错误", f"启动失败: {e}") messagebox.showerror("错误", f"启动失败: {e}")
@@ -557,4 +622,3 @@ class ConfigGUI:
if __name__ == "__main__": if __name__ == "__main__":
app = ConfigGUI() app = ConfigGUI()
app.run() app.run()

View File

@@ -69,28 +69,42 @@ def run_automation_for_group(group_index):
v = group['move_velocity'] # 从配置读取移动速度 v = group['move_velocity'] # 从配置读取移动速度
def yolo_shibie(im_PIL, detections, model): def yolo_shibie(im_PIL, detections, model):
try:
results = model(im_PIL) results = model(im_PIL)
for result in results: for result in results:
if result.boxes is None or len(result.boxes.xyxy) == 0:
continue
for i in range(len(result.boxes.xyxy)): for i in range(len(result.boxes.xyxy)):
left, top, right, bottom = result.boxes.xyxy[i] try:
scalar_tensor = result.boxes.cls[i] left = float(result.boxes.xyxy[i][0])
value = scalar_tensor.item() top = float(result.boxes.xyxy[i][1])
label = result.names[int(value)] right = float(result.boxes.xyxy[i][2])
bottom = float(result.boxes.xyxy[i][3])
cls_id = int(result.boxes.cls[i])
label = result.names[cls_id]
if label == 'center' or label == 'next' or label == 'boss' or label == 'zhaozi': if label == 'center' or label == 'next' or label == 'boss' or label == 'zhaozi':
player_x = int(left + (right - left) / 2) player_x = int(left + (right - left) / 2) + 3
player_y = int(top + (bottom - top) / 2) + 30 player_y = int(top + (bottom - top) / 2) + 40
RW = [player_x, player_y] RW = [player_x, player_y]
detections[label] = RW detections[label] = RW
elif label == 'daojv' or label == 'gw': elif label == 'daojv' or label == 'gw':
player_x = int(left + (right - left) / 2) player_x = int(left + (right - left) / 2) + 3
player_y = int(top + (bottom - top) / 2) + 30 player_y = int(top + (bottom - top) / 2) + 40
RW = [player_x, player_y] RW = [player_x, player_y]
if label not in detections:
detections[label] = []
detections[label].append(RW) detections[label].append(RW)
elif label == 'npc1' or label == 'npc2' or label == 'npc3' or label == 'npc4': elif label == 'npc1' or label == 'npc2' or label == 'npc3' or label == 'npc4':
player_x = int(left + (right - left) / 2) player_x = int(left + (right - left) / 2)
player_y = int(bottom) + 30 player_y = int(bottom) + 30
RW = [player_x, player_y] RW = [player_x, player_y]
detections[label] = RW detections[label] = RW
except Exception as e:
print(f"⚠️ 处理检测框时出错: {e}")
continue
except Exception as e:
print(f"⚠️ YOLO检测出错: {e}")
return detections return detections
def sq(p1, p2): def sq(p1, p2):
@@ -119,25 +133,76 @@ def run_automation_for_group(group_index):
best_point = p best_point = p
return best_point return best_point
def safe_keyboard_send(data, max_retries=3):
"""安全的键盘发送函数,带重试机制"""
nonlocal keyboard
for attempt in range(max_retries):
try:
keyboard.send_data(data)
return True
except (serial.serialutil.SerialException, PermissionError, OSError) as e:
if attempt < max_retries - 1:
print(f"⚠️ 键盘发送失败 (尝试 {attempt + 1}/{max_retries}): {e}")
time.sleep(0.1 * (attempt + 1)) # 递增延迟
# 尝试重新初始化串口
try:
if serial.ser and serial.ser.is_open:
serial.ser.close()
time.sleep(0.2)
init_mouse_keyboard(group)
# 重新创建keyboard对象
keyboard = ch9329Comm.keyboard.DataComm()
except Exception as init_e:
print(f"⚠️ 重新初始化串口失败: {init_e}")
else:
print(f"❌ 键盘发送失败,已重试 {max_retries} 次: {e}")
raise
return False
def safe_keyboard_release(max_retries=3):
"""安全的键盘释放函数,带重试机制"""
nonlocal keyboard
for attempt in range(max_retries):
try:
keyboard.release()
return True
except (serial.serialutil.SerialException, PermissionError, OSError) as e:
if attempt < max_retries - 1:
time.sleep(0.1 * (attempt + 1))
# 尝试重新初始化串口
try:
if serial.ser and serial.ser.is_open:
serial.ser.close()
time.sleep(0.2)
init_mouse_keyboard(group)
keyboard = ch9329Comm.keyboard.DataComm()
except Exception as init_e:
print(f"⚠️ 重新初始化串口失败: {init_e}")
else:
print(f"⚠️ 键盘释放失败: {e}")
# 释放失败不算致命错误,继续执行
return False
return False
def move_randomly(rw, k): def move_randomly(rw, k):
k = k % 4 k = k % 4
suiji_t = float(random.randint(10, 13) / 10) suiji_t = float(random.randint(10, 13) / 10)
if k == 0: if k == 0:
keyboard.send_data("66") if safe_keyboard_send("66"):
time.sleep(suiji_t) time.sleep(suiji_t)
keyboard.release() safe_keyboard_release()
elif k == 1: elif k == 1:
keyboard.send_data("88") if safe_keyboard_send("88"):
time.sleep(suiji_t) time.sleep(suiji_t)
keyboard.release() safe_keyboard_release()
elif k == 2: elif k == 2:
keyboard.send_data("44") if safe_keyboard_send("44"):
time.sleep(suiji_t) time.sleep(suiji_t)
keyboard.release() safe_keyboard_release()
elif k == 3: elif k == 3:
keyboard.send_data("22") if safe_keyboard_send("22"):
time.sleep(suiji_t) time.sleep(suiji_t)
keyboard.release() safe_keyboard_release()
return k + 1 return k + 1
def move_to(rw, mb): def move_to(rw, mb):
@@ -145,22 +210,22 @@ def run_automation_for_group(group_index):
nonlocal v nonlocal v
v = group['move_velocity'] v = group['move_velocity']
if rw[0] >= mb[0]: if rw[0] >= mb[0]:
keyboard.send_data("44") if safe_keyboard_send("44"):
time.sleep(float(abs(rw[0] - mb[0]) / v)) time.sleep(float(abs(rw[0] - mb[0]) / v))
keyboard.release() safe_keyboard_release()
else: else:
keyboard.send_data("66") if safe_keyboard_send("66"):
time.sleep(float(abs(rw[0] - mb[0]) / v)) time.sleep(float(abs(rw[0] - mb[0]) / v))
keyboard.release() safe_keyboard_release()
if rw[1] >= mb[1]: if rw[1] >= mb[1]:
keyboard.send_data("88") if safe_keyboard_send("88"):
time.sleep(float(abs(rw[1] - mb[1]) / v)) time.sleep(float(abs(rw[1] - mb[1]) / v))
keyboard.release() safe_keyboard_release()
else: else:
keyboard.send_data("22") if safe_keyboard_send("22"):
time.sleep(float(abs(rw[1] - mb[1]) / v)) time.sleep(float(abs(rw[1] - mb[1]) / v))
keyboard.release() safe_keyboard_release()
# 主循环 # 主循环
print(f"🔄 配置组 {group['name']} 开始自动化循环...") print(f"🔄 配置组 {group['name']} 开始自动化循环...")
@@ -211,9 +276,9 @@ def run_automation_for_group(group_index):
elif detections['npc4'] is not None: elif detections['npc4'] is not None:
if sq(detections['npc4'], rw) < 50: if sq(detections['npc4'], rw) < 50:
print(f"[{group['name']}] 离npc4很近 直接进入") print(f"[{group['name']}] 离npc4很近 直接进入")
keyboard.send_data("DD") if safe_keyboard_send("DD"):
time.sleep(0.15) time.sleep(0.15)
keyboard.release() safe_keyboard_release()
time.sleep(1) time.sleep(1)
im_opencv = get_image.get_frame() im_opencv = get_image.get_frame()
if im_opencv and shizi.daoying(im_opencv[0]): if im_opencv and shizi.daoying(im_opencv[0]):
@@ -257,9 +322,9 @@ def run_automation_for_group(group_index):
if len(detections['daojv']) != 0: if len(detections['daojv']) != 0:
move_to(rw, process_points(detections['daojv'])) move_to(rw, process_points(detections['daojv']))
for i in range(3 + len(detections['daojv'])): for i in range(3 + len(detections['daojv'])):
keyboard.send_data("AA") if safe_keyboard_send("AA"):
time.sleep(0.15) time.sleep(0.15)
keyboard.release() safe_keyboard_release()
continue continue
if shizi.tuichu(im_opencv[0]) and detections['next'] is None and len(detections['daojv']) == 0 and len(detections['gw']) == 0 and boss_pd: if shizi.tuichu(im_opencv[0]) and detections['next'] is None and len(detections['daojv']) == 0 and len(detections['gw']) == 0 and boss_pd:
print(f"[{group['name']}] 识别到可以退出挑战!!!!!!!!!!!!!!!!!!") print(f"[{group['name']}] 识别到可以退出挑战!!!!!!!!!!!!!!!!!!")
@@ -314,9 +379,9 @@ def run_automation_for_group(group_index):
panduan = True panduan = True
move_to(rw, detections['next']) move_to(rw, detections['next'])
for i in range(2): for i in range(2):
keyboard.send_data("DD") if safe_keyboard_send("DD"):
time.sleep(0.15) time.sleep(0.15)
keyboard.release() safe_keyboard_release()
continue continue
else: else:
k = move_randomly(rw, k) k = move_randomly(rw, k)
@@ -334,6 +399,14 @@ def run_automation_for_group(group_index):
traceback.print_exc() traceback.print_exc()
time.sleep(1) time.sleep(1)
# 清理资源
try:
if serial.ser and serial.ser.is_open:
serial.ser.close()
print(f"✅ 配置组 {group['name']} 串口已关闭")
except Exception as e:
print(f"⚠️ 关闭串口时出错: {e}")
if __name__ == "__main__": if __name__ == "__main__":
if len(sys.argv) > 1: if len(sys.argv) > 1:
group_index = int(sys.argv[1]) group_index = int(sys.argv[1])

197
utils/device_scanner.py Normal file
View File

@@ -0,0 +1,197 @@
"""
设备扫描工具 - 通过唯一标识符而非索引来识别设备
解决重启后设备序号混乱的问题
"""
import cv2
import serial.tools.list_ports
import warnings
import sys
import io
from typing import List, Dict, Optional
def get_camera_info(cam_index: int) -> Optional[Dict]:
"""
获取采集卡的详细信息(包括设备名称)
:param cam_index: 采集卡索引
:return: 包含设备信息的字典如果失败返回None
"""
old_stderr = sys.stderr
suppressed_output = io.StringIO()
try:
sys.stderr = suppressed_output
with warnings.catch_warnings():
warnings.filterwarnings('ignore')
# 尝试多种后端
backends = [
(cam_index, cv2.CAP_DSHOW),
(cam_index, cv2.CAP_ANY),
(cam_index, None),
]
cap = None
for idx, backend in backends:
try:
if backend is not None:
cap = cv2.VideoCapture(idx, backend)
else:
cap = cv2.VideoCapture(idx)
if cap.isOpened():
ret, frame = cap.read()
if ret and frame is not None:
# 尝试获取设备名称(不同后端可能支持不同)
device_name = None
backend_name = None
if backend == cv2.CAP_DSHOW:
backend_name = "DirectShow"
# DirectShow可能支持获取设备名称
try:
# 某些情况下可以通过属性获取
pass # OpenCV限制无法直接获取设备名称
except:
pass
elif backend == cv2.CAP_ANY:
backend_name = "Any"
else:
backend_name = "Default"
# 获取分辨率信息作为辅助标识
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
cap.release()
return {
'index': cam_index,
'backend': backend_name,
'width': width,
'height': height,
'name': f"采集卡-{cam_index}",
'available': True
}
else:
if cap:
cap.release()
cap = None
except Exception:
if cap:
try:
cap.release()
except:
pass
cap = None
continue
return None
finally:
sys.stderr = old_stderr
def scan_cameras_with_info(max_index: int = 10) -> List[Dict]:
"""
扫描所有可用采集卡,返回详细信息列表
:param max_index: 最大扫描索引
:return: 采集卡信息列表
"""
cameras = []
for i in range(max_index + 1):
info = get_camera_info(i)
if info:
cameras.append(info)
return cameras
def get_serial_port_info(port_name: str) -> Optional[Dict]:
"""
获取串口的详细信息包括硬件ID和描述
:param port_name: 串口名称(如 "COM3"
:return: 包含串口信息的字典
"""
try:
ports = serial.tools.list_ports.comports()
for port in ports:
if port.device == port_name:
return {
'device': port.device,
'description': port.description,
'hwid': port.hwid, # 硬件ID唯一标识
'vid': port.vid if hasattr(port, 'vid') else None,
'pid': port.pid if hasattr(port, 'pid') else None,
'serial_number': port.serial_number if hasattr(port, 'serial_number') else None,
'manufacturer': port.manufacturer if hasattr(port, 'manufacturer') else None,
'name': port.description or port.device
}
except Exception as e:
print(f"⚠️ 获取串口信息失败: {e}")
return None
def scan_serial_ports_with_info() -> List[Dict]:
"""
扫描所有可用串口,返回详细信息列表
:return: 串口信息列表
"""
ports_info = []
try:
ports = serial.tools.list_ports.comports()
for port in ports:
info = {
'device': port.device,
'description': port.description,
'hwid': port.hwid,
'vid': port.vid if hasattr(port, 'vid') else None,
'pid': port.pid if hasattr(port, 'pid') else None,
'serial_number': port.serial_number if hasattr(port, 'serial_number') else None,
'manufacturer': port.manufacturer if hasattr(port, 'manufacturer') else None,
'name': port.description or port.device
}
ports_info.append(info)
except Exception as e:
print(f"⚠️ 扫描串口失败: {e}")
# 按设备名排序
ports_info.sort(key=lambda x: int(x['device'].replace('COM', '')) if x['device'].replace('COM', '').isdigit() else 999)
return ports_info
def find_camera_by_index(cameras: List[Dict], index: int) -> Optional[Dict]:
"""通过索引查找采集卡"""
for cam in cameras:
if cam['index'] == index:
return cam
return None
def find_serial_by_hwid(ports: List[Dict], hwid: str) -> Optional[Dict]:
"""通过硬件ID查找串口最可靠"""
for port in ports:
if port['hwid'] == hwid:
return port
return None
def find_serial_by_device(ports: List[Dict], device: str) -> Optional[Dict]:
"""通过设备名查找串口(兼容旧配置)"""
for port in ports:
if port['device'] == device:
return port
return None
if __name__ == "__main__":
# 测试代码
print("=" * 60)
print("采集卡扫描:")
print("=" * 60)
cameras = scan_cameras_with_info()
for cam in cameras:
print(f" 索引 {cam['index']}: {cam['name']} ({cam['width']}x{cam['height']})")
print("\n" + "=" * 60)
print("串口扫描:")
print("=" * 60)
ports = scan_serial_ports_with_info()
for port in ports:
print(f" {port['device']}: {port['name']}")
print(f" 硬件ID: {port['hwid']}")
if port['vid'] and port['pid']:
print(f" VID/PID: {port['vid']:04X}/{port['pid']:04X}")

View File

@@ -10,18 +10,32 @@ mouse = None
def init_mouse_keyboard(config_group): def init_mouse_keyboard(config_group):
"""初始化鼠标和串口""" """初始化鼠标和串口"""
global serial, mouse global serial, mouse
from utils.logger import logger
# 初始化串口 # 初始化串口
try:
logger.info(f"🔧 正在打开串口: {config_group['serial_port']} @ {config_group['serial_baudrate']}")
serial.ser = serial.Serial( serial.ser = serial.Serial(
config_group['serial_port'], config_group['serial_port'],
config_group['serial_baudrate'] config_group['serial_baudrate'],
timeout=1
) )
logger.info(f"✅ 串口已打开: {config_group['serial_port']} @ {config_group['serial_baudrate']}")
except Exception as e:
logger.error(f"❌ 串口打开失败: {e}")
raise
# 初始化鼠标 # 初始化鼠标
try:
logger.info(f"🔧 正在初始化鼠标: {config_group['camera_width']}x{config_group['camera_height']}")
mouse = ch9329Comm.mouse.DataComm( mouse = ch9329Comm.mouse.DataComm(
config_group['camera_width'], config_group['camera_width'],
config_group['camera_height'] config_group['camera_height']
) )
print(f"串口已打开: {config_group['serial_port']} {config_group['serial_baudrate']}") logger.info(f"鼠标已初始化: {config_group['camera_width']}x{config_group['camera_height']}")
print(f"✅ 鼠标已初始化: {config_group['camera_width']}x{config_group['camera_height']}") except Exception as e:
logger.error(f"❌ 鼠标初始化失败: {e}")
raise
def bezier_point(t, p0, p1, p2, p3): def bezier_point(t, p0, p1, p2, p3):
"""计算三次贝塞尔曲线上的点""" """计算三次贝塞尔曲线上的点"""

View File

@@ -224,3 +224,4 @@ def main():
if __name__ == "__main__": if __name__ == "__main__":
main() main()