Files
huojv/main_multi.py
2025-11-04 11:32:16 +08:00

164 lines
5.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
多配置组启动器
支持同时启动多个配置组的自动化程序
"""
import multiprocessing
import sys
from config import config_manager
from main_single import run_automation_for_group
from utils.logger import logger, throttle
import logging
import time
def main():
"""主函数"""
# 重新加载配置
config_manager.load_config()
# 获取所有配置组
groups = config_manager.config.get('groups', [])
if not groups:
logger.error("❌ 没有找到任何配置组")
logger.info("请先运行 gui_config.py 创建配置组")
return
# 询问要启动哪些配置组
logger.info("=" * 60)
logger.info("🔥 多配置组启动器")
logger.info("=" * 60)
logger.info("\n可用配置组:")
for i, group in enumerate(groups):
active_mark = "" if group.get('active', False) else " "
logger.info(f" [{i}] {active_mark} {group['name']}")
logger.info(f" 串口: {group['serial_port']} | 采集卡: {group['camera_index']}")
logger.info("\n选择启动方式:")
logger.info(" 1. 启动所有活动配置组")
logger.info(" 2. 启动所有配置组")
logger.info(" 3. 选择特定配置组")
logger.info(" 0. 退出")
choice = input("\n请选择 (0-3): ").strip()
selected_indices = []
if choice == "0":
logger.info("👋 退出")
return
elif choice == "1":
# 启动所有活动配置组
selected_indices = [i for i, g in enumerate(groups) if g.get('active', False)]
if not selected_indices:
logger.error("❌ 没有活动的配置组")
return
logger.info(f"\n✅ 将启动 {len(selected_indices)} 个活动配置组")
elif choice == "2":
# 启动所有配置组
selected_indices = list(range(len(groups)))
logger.info(f"\n✅ 将启动所有 {len(selected_indices)} 个配置组")
elif choice == "3":
# 选择特定配置组
indices_input = input("请输入要启动的配置组索引(用逗号分隔,如: 0,1,2): ").strip()
try:
selected_indices = [int(x.strip()) for x in indices_input.split(',')]
# 验证索引有效性
selected_indices = [i for i in selected_indices if 0 <= i < len(groups)]
if not selected_indices:
logger.error("❌ 没有有效的配置组索引")
return
logger.info(f"\n✅ 将启动 {len(selected_indices)} 个配置组")
except ValueError:
logger.error("❌ 输入格式错误")
return
else:
logger.error("❌ 无效选择")
return
# 显示将要启动的配置组
logger.info("\n将要启动的配置组:")
for idx in selected_indices:
group = groups[idx]
logger.info(f"{group['name']} (串口:{group['serial_port']}, 采集卡:{group['camera_index']})")
# 串口冲突预检:同一串口被多个组占用通常会导致仅一路成功
port_to_groups = {}
for idx in selected_indices:
g = groups[idx]
port_to_groups.setdefault(g['serial_port'], []).append(g['name'])
conflicts = {p: names for p, names in port_to_groups.items() if p and len(names) > 1}
if conflicts:
logger.warning("⚠️ 检测到串口冲突同一COM被多个组使用")
for p, names in conflicts.items():
logger.warning(f" {p}: {', '.join(names)}")
go_on = input("上述冲突很可能导致仅一组成功,其它失败。仍要继续? (y/n): ").strip().lower()
if go_on != 'y':
logger.info("已取消启动以避免串口冲突")
return
confirm = input("\n确认启动? (y/n): ").strip().lower()
if confirm != 'y':
logger.info("❌ 取消启动")
return
# 启动多进程
processes = []
for idx in selected_indices:
group = groups[idx]
logger.info(f"启动进程: {group['name']}...")
process = multiprocessing.Process(
target=run_automation_for_group,
args=(idx,),
name=f"Group-{idx}-{group['name']}"
)
process.start()
processes.append((idx, group['name'], process))
logger.info(f"{group['name']} 已启动 (PID: {process.pid})")
logger.info(f"\n✅ 成功启动 {len(processes)} 个配置组进程")
logger.info("\n" + "=" * 60)
logger.info("运行状态:")
logger.info("=" * 60)
# 监控进程状态
try:
while True:
alive_count = 0
for idx, name, proc in processes:
if proc.is_alive():
alive_count += 1
else:
logger.warning(f"⚠️ {name} 进程已退出 (退出码: {proc.exitcode})")
if alive_count == 0:
logger.info("\n所有进程已退出")
break
time.sleep(2)
# 打印存活状态
alive_names = [name for idx, name, proc in processes if proc.is_alive()]
if alive_names:
# 每2秒节流一次状态行
throttle("multi_alive", 2.0, logging.INFO, f"📊 运行中: {', '.join(alive_names)} ({alive_count}/{len(processes)})")
except KeyboardInterrupt:
logger.warning("\n\n🛑 收到停止信号,正在关闭所有进程...")
for idx, name, proc in processes:
if proc.is_alive():
logger.info(f"正在停止 {name}...")
proc.terminate()
proc.join(timeout=5)
if proc.is_alive():
logger.warning(f"强制停止 {name}...")
proc.kill()
logger.info(f"{name} 已停止")
logger.info("\n👋 所有进程已停止")
if __name__ == "__main__":
multiprocessing.freeze_support() # Windows下需要
main()