package com.mes.task.service.impl; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.mes.device.entity.DeviceConfig; import com.mes.device.entity.DeviceGroupConfig; import com.mes.device.mapper.DeviceGroupRelationMapper; import com.mes.device.service.DeviceGroupConfigService; import com.mes.task.dto.MultiDeviceTaskQuery; import com.mes.task.dto.MultiDeviceTaskRequest; import com.mes.task.dto.TaskParameters; import com.mes.task.entity.MultiDeviceTask; import com.mes.task.entity.TaskStepDetail; import com.mes.task.mapper.MultiDeviceTaskMapper; import com.mes.task.mapper.TaskStepDetailMapper; import com.mes.task.model.TaskExecutionResult; import com.mes.task.service.MultiDeviceTaskService; import com.mes.task.service.TaskExecutionEngine; import com.mes.task.service.TaskStatusNotificationService; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; import java.util.Locale; import java.util.Map; /** * 多设备任务服务实现 */ @Slf4j @Service public class MultiDeviceTaskServiceImpl extends ServiceImpl implements MultiDeviceTaskService { private final DeviceGroupConfigService deviceGroupConfigService; private final DeviceGroupRelationMapper deviceGroupRelationMapper; private final TaskStepDetailMapper taskStepDetailMapper; private final TaskExecutionEngine taskExecutionEngine; private final TaskStatusNotificationService notificationService; private final ObjectMapper objectMapper; public MultiDeviceTaskServiceImpl( DeviceGroupConfigService deviceGroupConfigService, DeviceGroupRelationMapper deviceGroupRelationMapper, TaskStepDetailMapper taskStepDetailMapper, TaskExecutionEngine taskExecutionEngine, TaskStatusNotificationService notificationService, ObjectMapper objectMapper) { this.deviceGroupConfigService = deviceGroupConfigService; this.deviceGroupRelationMapper = deviceGroupRelationMapper; this.taskStepDetailMapper = taskStepDetailMapper; this.taskExecutionEngine = taskExecutionEngine; this.notificationService = notificationService; this.objectMapper = objectMapper; } @Override @Transactional(rollbackFor = Exception.class) public MultiDeviceTask startTask(MultiDeviceTaskRequest request) { DeviceGroupConfig groupConfig = deviceGroupConfigService.getDeviceGroupById(request.getGroupId()); if (groupConfig == null) { throw new IllegalArgumentException("设备组不存在: " + request.getGroupId()); } if (groupConfig.getStatus() != DeviceGroupConfig.Status.ENABLED) { throw new IllegalStateException("设备组未启用,无法执行任务"); } List devices = deviceGroupRelationMapper.getOrderedDeviceConfigs(groupConfig.getId()); if (CollectionUtils.isEmpty(devices)) { throw new IllegalStateException("设备组未配置任何设备,无法执行任务"); } TaskParameters parameters = request.getParameters(); if (parameters == null) { parameters = new TaskParameters(); } // 默认允许卧转立扫码设备在任务执行阶段获取玻璃信息 boolean hasGlassIds = !CollectionUtils.isEmpty(parameters.getGlassIds()); if (!hasGlassIds) { log.info("测试任务未提供玻璃ID,将在设备组流程中由卧转立扫码设备采集玻璃信息: groupId={}", groupConfig.getId()); } // 创建任务记录 MultiDeviceTask task = new MultiDeviceTask(); task.setTaskId(generateTaskId(groupConfig)); task.setGroupId(String.valueOf(groupConfig.getId())); task.setProjectId(String.valueOf(groupConfig.getProjectId())); task.setStatus(MultiDeviceTask.Status.PENDING.name()); task.setCurrentStep(0); task.setTotalSteps(devices.size()); task.setStartTime(new Date()); save(task); // 异步执行任务,立即返回任务ID executeTaskAsync(task, groupConfig, devices, parameters); log.info("设备组任务已启动(异步执行): taskId={}, groupId={}, groupName={}", task.getTaskId(), groupConfig.getId(), groupConfig.getGroupName()); return task; } /** * 异步执行设备组任务 * 每个设备组作为独立线程执行,互不阻塞 */ @Async("deviceGroupTaskExecutor") public void executeTaskAsync(MultiDeviceTask task, DeviceGroupConfig groupConfig, List devices, TaskParameters parameters) { try { log.info("开始执行设备组任务: taskId={}, groupId={}, deviceCount={}", task.getTaskId(), groupConfig.getId(), devices.size()); // 更新任务状态为运行中 task.setStatus(MultiDeviceTask.Status.RUNNING.name()); updateById(task); // 通知任务开始 notificationService.notifyTaskStatus(task); // 执行任务 TaskExecutionResult result = taskExecutionEngine.execute(task, groupConfig, devices, parameters); // 检查任务数据中是否包含持续运行的标记 Map resultData = result.getData(); boolean isContinuousTask = resultData != null && "任务已启动,定时器在后台运行中".equals(resultData.get("message")); // 如果是持续运行的任务(定时器模式),保持 RUNNING 状态,不更新为 COMPLETED if (isContinuousTask && result.isSuccess()) { log.info("任务已启动定时器,保持运行状态: taskId={}, message={}", task.getTaskId(), resultData.get("message")); task.setResultData(writeJson(resultData)); updateById(task); // 通知任务状态(保持 RUNNING) notificationService.notifyTaskStatus(task); return; } // 更新任务结果(非持续运行的任务) task.setStatus(result.isSuccess() ? MultiDeviceTask.Status.COMPLETED.name() : MultiDeviceTask.Status.FAILED.name()); task.setErrorMessage(result.isSuccess() ? null : result.getMessage()); task.setEndTime(new Date()); task.setResultData(writeJson(resultData)); updateById(task); // 通知任务完成 notificationService.notifyTaskStatus(task); log.info("设备组任务执行完成: taskId={}, success={}, message={}", task.getTaskId(), result.isSuccess(), result.getMessage()); } catch (Exception ex) { log.error("设备组任务执行异常: taskId={}, groupId={}", task.getTaskId(), groupConfig.getId(), ex); // 更新任务状态为失败 task.setStatus(MultiDeviceTask.Status.FAILED.name()); task.setErrorMessage(ex.getMessage()); task.setEndTime(new Date()); updateById(task); // 通知任务失败 notificationService.notifyTaskStatus(task); } } @Override public MultiDeviceTask getTaskByTaskId(String taskId) { LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); wrapper.eq(MultiDeviceTask::getTaskId, taskId); return getOne(wrapper); } @Override public List getTaskSteps(String taskId) { LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); wrapper.eq(TaskStepDetail::getTaskId, taskId); wrapper.orderByAsc(TaskStepDetail::getStepOrder); return taskStepDetailMapper.selectList(wrapper); } @Override public boolean cancelTask(String taskId) { MultiDeviceTask task = getTaskByTaskId(taskId); if (task == null) { return false; } if (!MultiDeviceTask.Status.RUNNING.name().equals(task.getStatus())) { return false; } taskExecutionEngine.requestTaskCancellation(taskId); task.setStatus(MultiDeviceTask.Status.CANCELLED.name()); task.setEndTime(new Date()); boolean updated = updateById(task); if (updated) { notificationService.notifyTaskStatus(task); } return updated; } @Override public Page queryTasks(MultiDeviceTaskQuery query) { int page = query.getPage() != null && query.getPage() > 0 ? query.getPage() : 1; int size = query.getSize() != null && query.getSize() > 0 ? query.getSize() : 10; Page pageParam = new Page<>(page, size); LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); if (query.getGroupId() != null) { wrapper.eq(MultiDeviceTask::getGroupId, String.valueOf(query.getGroupId())); } if (StringUtils.hasText(query.getStatus())) { wrapper.eq(MultiDeviceTask::getStatus, query.getStatus().toUpperCase(Locale.ROOT)); } wrapper.orderByDesc(MultiDeviceTask::getCreatedTime); return page(pageParam, wrapper); } private String generateTaskId(DeviceGroupConfig groupConfig) { SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss", Locale.CHINA); return "TASK_" + groupConfig.getId() + "_" + sdf.format(new Date()); } private String writeJson(Object data) { if (data == null) { return "{}"; } try { return objectMapper.writeValueAsString(data); } catch (JsonProcessingException e) { return "{}"; } } }