Files
huojv/main_single.py
2025-11-06 09:48:32 +08:00

364 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
单配置组运行的主程序
用于在独立进程中运行单个配置组的自动化
"""
import cv2
from utils.get_image import GetImage
from utils.mouse import init_mouse_keyboard, Mouse_guiji
from ultralytics import YOLO
import time
import serial
import ch9329Comm
import random
import math
from utils import shizi
from config import config_manager
import sys
def run_automation_for_group(group_index):
"""Run the automation for a single configuration group.

Reloads the configuration, looks up the group at ``group_index`` and
returns early (after printing an error) when the group does not exist,
when ``init_mouse_keyboard`` raises, or when the capture device has no
``cap``. Otherwise it sets up the YOLO models, keyboard, mouse helper and
frame grabber, initialises the loop state and enters the automation loop
that follows this setup section.

Group keys read in this section: ``name``, ``serial_port``,
``camera_index``, ``camera_width``, ``camera_height``, ``move_velocity``.
"""
# Reload the configuration
config_manager.load_config()
group = config_manager.get_group_by_index(group_index)
if group is None:
print(f"❌ 配置组索引 {group_index} 不存在")
return
print(f"🚀 启动配置组: {group['name']} (索引: {group_index})")
print(f" 串口: {group['serial_port']}")
print(f" 采集卡: {group['camera_index']}")
# Load the YOLO models (both are moved to the 'cuda' device).
# `model` is used for the first detection of every frame, `model0` for the
# second detection done in the town branch of the main loop.
model = YOLO(r"best.pt").to('cuda')
model0 = YOLO(r"best0.pt").to('cuda')
# Initialise the serial port and mouse
try:
init_mouse_keyboard(group)
except Exception as e:
print(f"❌ 初始化串口失败: {e}")
return
# Initialise the keyboard and mouse
keyboard = ch9329Comm.keyboard.DataComm()
# NOTE(review): `mouse` is imported here but not referenced anywhere in the
# visible code - confirm whether the import is needed for its side effects.
from utils.mouse import mouse
mouse_gui = Mouse_guiji()
# Initialise the capture card
get_image = GetImage(
cam_index=group['camera_index'],
width=group['camera_width'],
height=group['camera_height']
)
# Check that the capture card was initialised successfully
if get_image.cap is None:
print(f"❌ 采集卡 {group['camera_index']} 初始化失败")
return
print(f"✅ 配置组 {group['name']} 初始化完成")
# Loop state shared with the nested helpers and the main loop
# (the original comment calls these "global variables").
# `left` / `top` are added to the fixed coordinates sent to the mouse helper.
left = 0
top = 30
k = 0 # controls the direction of the circling movement
panduan = False # whether we are inside the map
boss_pd = False # whether the boss stage has been reached
# Fixed reference point passed as the start position to move_to()/sq();
# presumably the character's on-screen position - TODO confirm.
rw = (632, 378)
v = group['move_velocity'] # movement velocity read from the configuration
def yolo_shibie(im_PIL, detections, model):
    """Run ``model`` on ``im_PIL`` and store label positions in ``detections``.

    ``model`` is called with the image and must return an iterable of result
    objects exposing ``boxes.xyxy``, ``boxes.cls`` and ``names``. For every
    box the label is looked up and a ``[x, y]`` point is derived:

    * 'center', 'next', 'boss', 'zhaozi': box centre shifted by (+3, +40);
      the point overwrites ``detections[label]``.
    * 'daojv', 'gw': same point, appended to the list ``detections[label]``
      (the list is created when the key is missing).
    * 'npc1' .. 'npc4': horizontal centre and 30 below the bottom edge; the
      point overwrites ``detections[label]``.

    Errors for a single box, or for the whole model call, are printed and
    swallowed. The (mutated) ``detections`` dict is always returned.
    """
    centre_single = ('center', 'next', 'boss', 'zhaozi')
    centre_multi = ('daojv', 'gw')
    npc_labels = ('npc1', 'npc2', 'npc3', 'npc4')

    try:
        for result in model(im_PIL):
            boxes = result.boxes
            if boxes is None or len(boxes.xyxy) == 0:
                continue
            for idx in range(len(boxes.xyxy)):
                try:
                    box = boxes.xyxy[idx]
                    x1, y1, x2, y2 = (float(box[j]) for j in range(4))
                    label = result.names[int(boxes.cls[idx])]
                    mid_x = int(x1 + (x2 - x1) / 2)

                    if label in centre_single:
                        detections[label] = [mid_x + 3, int(y1 + (y2 - y1) / 2) + 40]
                    elif label in centre_multi:
                        point = [mid_x + 3, int(y1 + (y2 - y1) / 2) + 40]
                        detections.setdefault(label, []).append(point)
                    elif label in npc_labels:
                        detections[label] = [mid_x, int(y2) + 30]
                except Exception as e:
                    print(f"⚠️ 处理检测框时出错: {e}")
                    continue
    except Exception as e:
        print(f"⚠️ YOLO检测出错: {e}")
    return detections
def sq(p1, p2):
    """Return the Euclidean distance between the 2-D points ``p1`` and ``p2``.

    Only indices 0 (x) and 1 (y) of each point are read.
    """
    dx = p1[0] - p2[0]
    dy = p1[1] - p2[1]
    return math.sqrt(dx ** 2 + dy ** 2)
def process_points(points):
    """Reduce a list of ``[x, y]`` points to a single representative point.

    * empty / falsy input -> ``None``
    * one point           -> that point
    * two points          -> their midpoint as ``[x, y]``
    * three or more       -> three points are sampled at random and the input
      point with the smallest summed distance to the samples is returned
      (the first such point when several tie).
    """
    if not points:
        return None
    if len(points) == 1:
        return points[0]
    if len(points) == 2:
        first, second = points
        return [(first[0] + second[0]) / 2, (first[1] + second[1]) / 2]

    anchors = random.sample(points, 3)
    # min() keeps the first minimal element, matching a strict '<' scan.
    return min(points, key=lambda cand: sum(sq(cand, anchor) for anchor in anchors))
def move_randomly(rw, k):
    """Hold one direction key for a random 1.0-1.3 s and advance the step.

    ``k`` is reduced modulo 4 and selects the key code sent through the
    enclosing scope's ``keyboard``: 0 -> "66", 1 -> "88", 2 -> "44",
    3 -> "22" (presumably right, up, left, down - TODO confirm). ``rw`` is
    accepted but not used. Returns the reduced step plus one so the caller
    can feed it back in on the next call.
    """
    step = k % 4
    hold_seconds = float(random.randint(10, 13) / 10)
    key_code = {0: "66", 1: "88", 2: "44", 3: "22"}.get(step)
    if key_code is not None:
        keyboard.send_data(key_code)
        time.sleep(hold_seconds)
        keyboard.release()
    return step + 1
def move_to(rw, mb):
    """Move from ``rw`` towards ``mb`` by holding direction keys.

    The horizontal move is done first, then the vertical one. For each axis
    the key is held for ``abs(delta) / velocity`` seconds, where the velocity
    is read from ``group['move_velocity']`` on every call:

    * x: "44" when ``rw[0] >= mb[0]``, otherwise "66"
    * y: "88" when ``rw[1] >= mb[1]``, otherwise "22"

    Both hold times are computed before any key is sent, so an invalid
    velocity (e.g. 0 -> ``ZeroDivisionError``) fails without pressing
    anything, and the key is released in a ``finally`` block so that an
    error or interrupt during the wait cannot leave it held down.
    """
    velocity = group['move_velocity']

    # Compute first: a failure here must not happen between press and release.
    hold_x = float(abs(rw[0] - mb[0]) / velocity)
    hold_y = float(abs(rw[1] - mb[1]) / velocity)

    moves = (
        ("44" if rw[0] >= mb[0] else "66", hold_x),
        ("88" if rw[1] >= mb[1] else "22", hold_y),
    )
    for key_code, hold_seconds in moves:
        keyboard.send_data(key_code)
        try:
            time.sleep(hold_seconds)
        finally:
            # Always release, even if the sleep raised or was interrupted.
            keyboard.release()
# Main loop
# NOTE(review): indentation was lost in this copy of the file, so the nesting
# described in the comments below is inferred from the statement order and
# should be confirmed against the original source.
print(f"🔄 配置组 {group['name']} 开始自动化循环...")
while True:
try:
# Fresh detection table for this iteration: single-position labels start
# as None, the multi-instance labels 'daojv' and 'gw' as empty lists.
detections = {
'center': None,
'next': None,
'npc1': None,
'npc2': None,
'npc3': None,
'npc4': None,
'boss': None,
'daojv': [],
'gw': [],
'zhaozi': None
}
# The frame object is indexed below: [0] is passed to the shizi.* screen
# checks and [1] to the YOLO models.
im_opencv = get_image.get_frame()
if im_opencv is None:
time.sleep(0.1)
continue
detections = yolo_shibie(im_opencv[1], detections, model)
# shizi.tuwai(...) true: town handling (the log line says "currently in
# town"). A new frame is grabbed and detected again with model0; the
# results are merged into the same `detections` dict.
if shizi.tuwai(im_opencv[0]):
im_opencv = get_image.get_frame()
if im_opencv is None:
time.sleep(0.1)
continue
detections = yolo_shibie(im_opencv[1], detections, model0)
print(f'[{group["name"]}] 当前在城镇中')
# npc1 detected and farther than 80 from rw: walk towards it.
if detections['npc1'] is not None and sq(rw, detections['npc1']) > 80:
print(f"[{group['name']}] 向npc1靠近")
move_to(rw, detections['npc1'])
continue
# npc1 within 80: target a point 1010 above npc1 (log: "walk up").
elif detections['npc1'] is not None and sq(rw, detections['npc1']) <= 80:
print(f"[{group['name']}] 在npc旁边向上走")
mb = (detections['npc1'][0], detections['npc1'][1] - 1010)
move_to(rw, mb)
continue
# npc3 detected without npc4: move vertically to 50 above npc3's point,
# then 700 to the right.
elif detections['npc3'] is not None and detections['npc4'] is None:
print(f"[{group['name']}] 在npc3旁边向右走")
mb = (rw[0], detections['npc3'][1] - 50)
move_to(rw, mb)
mb = (rw[0] + 700, rw[1])
move_to(rw, mb)
continue
# npc4 detected: closer than 50 -> tap "DD"; otherwise walk to npc4.
# In both cases wait 1 s, grab a frame and, if shizi.daoying(...) is true,
# send a mouse command 110 above rw.
elif detections['npc4'] is not None:
if sq(detections['npc4'], rw) < 50:
print(f"[{group['name']}] 离npc4很近 直接进入")
keyboard.send_data("DD")
time.sleep(0.15)
keyboard.release()
time.sleep(1)
im_opencv = get_image.get_frame()
if im_opencv and shizi.daoying(im_opencv[0]):
mouse_gui.send_data_absolute(rw[0], rw[1] - 110, may=1)
time.sleep(1)
continue
else:
print(f"[{group['name']}] 离npc4有点远 点击进入")
move_to(rw, detections['npc4'])
time.sleep(1)
im_opencv = get_image.get_frame()
if im_opencv and shizi.daoying(im_opencv[0]):
mouse_gui.send_data_absolute(rw[0], rw[1] - 110, may=1)
time.sleep(1)
continue
# shizi.tiaozhan(...) true (log: "entering tower 4"): send two mouse
# commands at fixed offsets from (left, top) and mark panduan.
# Presumably this elif pairs with the shizi.tuwai check above, not with
# the npc chain - TODO confirm.
elif shizi.tiaozhan(im_opencv[0]):
print(f'[{group["name"]}] 进入塔4')
mouse_gui.send_data_absolute(left + 1100, top + 600, may=1)
time.sleep(0.3)
mouse_gui.send_data_absolute(left + 433, top + 455, may=1)
panduan = True
continue
# shizi.jieshu(...) true (log: "end challenge"): two mouse commands.
elif shizi.jieshu(im_opencv[0]):
print(f'[{group["name"]}] 结束挑战')
mouse_gui.send_data_absolute(left + 542, top + 644, may=1)
time.sleep(0.8)
mouse_gui.send_data_absolute(left + 706, top + 454, may=1)
continue
# panduan set: handling while inside the map.
elif panduan:
# shizi.shuzi(...) true -> remember that the boss stage was reached.
if shizi.shuzi(im_opencv[0]):
boss_pd = True
print(f"[{group['name']}] 进入到boss")
# shizi.fuhuo(...) true (log: "click revive").
if shizi.fuhuo(im_opencv[0]):
print(f'[{group["name"]}] 点击复活')
mouse_gui.send_data_absolute(left + 536, top + 627, may=1)
mouse_gui.send_data_absolute(rw[0], rw[1], may=0)
continue
# 'zhaozi' detected: walk to it.
if detections['zhaozi'] is not None:
move_to(rw, detections['zhaozi'])
continue
# 'daojv' detected: walk to the representative point, then tap "AA"
# 3 + number-of-detections times.
if len(detections['daojv']) != 0:
move_to(rw, process_points(detections['daojv']))
for i in range(3 + len(detections['daojv'])):
keyboard.send_data("AA")
time.sleep(0.15)
keyboard.release()
continue
# Exit condition: shizi.tuichu(...) true, nothing left to go to or pick
# up, and the boss stage was reached. Re-check up to three frames, 0.5 s
# apart, and stop re-checking as soon as anything is detected again.
if shizi.tuichu(im_opencv[0]) and detections['next'] is None and len(detections['daojv']) == 0 and len(detections['gw']) == 0 and boss_pd:
print(f"[{group['name']}] 识别到可以退出挑战!!!!!!!!!!!!!!!!!!")
for i in range(3):
time.sleep(0.5)
im_opencv = get_image.get_frame()
if im_opencv is None:
continue
detections = {
'center': None, 'next': None,
'npc1': None, 'npc2': None, 'npc3': None, 'npc4': None,
'boss': None, 'zhaozi': None,
'daojv': [], 'gw': []
}
detections = yolo_shibie(im_opencv[1], detections, model)
if detections['next'] is not None or len(detections['daojv']) != 0 or len(detections['gw']) != 0 or detections['boss'] is not None:
break
# Presumably a for-else (runs only when no re-check hit `break`): send the
# exit mouse command and reset both flags - TODO confirm nesting.
else:
mouse_gui.send_data_absolute(left + 640, top + 40, may=1)
panduan = False
boss_pd = False
time.sleep(2.0)
continue
# No 'center' and no 'next', but 'gw' or 'boss' detected: walk to the
# first 'gw', or to the boss position jittered by up to +/-30 per axis.
if detections['center'] is None and detections['next'] is None and (len(detections['gw']) != 0 or detections["boss"] is not None):
if len(detections['gw']) != 0:
move_to(rw, detections['gw'][0])
time.sleep(0.26)
elif detections['boss'] is not None:
boss_suiji1 = random.randint(-30, 30)
boss_suiji2 = random.randint(-30, 30)
detections['boss'] = (detections['boss'][0] + boss_suiji1, detections['boss'][1] + boss_suiji2)
move_to(rw, detections['boss'])
time.sleep(0.7)
continue
# 'center' detected with no 'next', and either 'gw' detected or the boss
# stage reached: step 100 in a direction chosen from where 'center' lies
# relative to rw.
# NOTE(review): when detections['center'][1] == rw[1] none of the four
# branches assigns `mb`, so move_to() receives a stale value from an
# earlier iteration or raises because `mb` is unbound.
elif (detections['center'] is not None and detections['next'] is None and len(detections['gw']) != 0) or (boss_pd == True and detections['center'] is not None and detections['next'] is None):
if detections['center'][0] >= rw[0] and detections['center'][1] < rw[1]:
mb = (rw[0] + 100, rw[1])
elif detections['center'][0] <= rw[0] and detections['center'][1] < rw[1]:
mb = (rw[0], rw[1] - 100)
elif detections['center'][0] <= rw[0] and detections['center'][1] > rw[1]:
mb = (rw[0] - 100, rw[1])
elif detections['center'][0] >= rw[0] and detections['center'][1] > rw[1]:
mb = (rw[0], rw[1] + 100)
move_to(rw, mb)
time.sleep(0.25)
continue
# Boss stage reached but neither 'center' nor 'next' detected: take one
# step of the rotating random movement.
elif boss_pd == True and detections['center'] is None and detections['next'] is None:
k = move_randomly(rw, k)
continue
# 'next' detected (log: "entering the next floor"): walk to it and tap
# "DD" twice.
elif detections['next'] is not None:
print(f'[{group["name"]}] 进入下一层啦啦啦啦啦啦')
panduan = True
move_to(rw, detections['next'])
for i in range(2):
keyboard.send_data("DD")
time.sleep(0.15)
keyboard.release()
continue
# Nothing matched: take one step of the rotating random movement.
else:
k = move_randomly(rw, k)
continue
# shizi.daoying(...) true: send a mouse command 110 above rw.
elif shizi.daoying(im_opencv[0]):
mouse_gui.send_data_absolute(rw[0], rw[1] - 110, may=1)
time.sleep(1)
continue
# Ctrl+C stops this group's loop.
except KeyboardInterrupt:
print(f"🛑 配置组 {group['name']} 停止运行")
break
# Any other error is printed with its traceback; after a 1 s pause the
# loop carries on with the next iteration.
except Exception as e:
print(f"❌ 配置组 {group['name']} 运行错误: {e}")
import traceback
traceback.print_exc()
time.sleep(1)
if __name__ == "__main__":
    # Script entry point: the group index comes from the first command-line
    # argument when one is given, otherwise from the active configuration
    # group; exits with status 1 when there is no active group.
    if len(sys.argv) <= 1:
        # Default: run the active configuration group.
        active_group = config_manager.get_active_group()
        if active_group is None:
            print("❌ 没有活动的配置组")
            sys.exit(1)
        group_index = config_manager.config['groups'].index(active_group)
    else:
        group_index = int(sys.argv[1])
    run_automation_for_group(group_index)