package com.mes.task.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mes.device.entity.DeviceConfig;
import com.mes.device.entity.DeviceGroupConfig;
import com.mes.device.mapper.DeviceGroupRelationMapper;
import com.mes.device.service.DeviceGroupConfigService;
import com.mes.task.dto.MultiDeviceTaskQuery;
import com.mes.task.dto.MultiDeviceTaskRequest;
import com.mes.task.dto.TaskParameters;
import com.mes.task.entity.MultiDeviceTask;
import com.mes.task.entity.TaskStepDetail;
import com.mes.task.mapper.MultiDeviceTaskMapper;
import com.mes.task.mapper.TaskStepDetailMapper;
import com.mes.task.model.TaskExecutionResult;
import com.mes.task.service.MultiDeviceTaskService;
import com.mes.task.service.TaskExecutionEngine;
import com.mes.task.service.TaskStatusNotificationService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/**
|
* 多设备任务服务实现
|
*/
|
@Slf4j
|
@Service
|
public class MultiDeviceTaskServiceImpl extends ServiceImpl<MultiDeviceTaskMapper, MultiDeviceTask>
|
implements MultiDeviceTaskService {
|
|
private final DeviceGroupConfigService deviceGroupConfigService;
|
private final DeviceGroupRelationMapper deviceGroupRelationMapper;
|
private final TaskStepDetailMapper taskStepDetailMapper;
|
private final TaskExecutionEngine taskExecutionEngine;
|
private final TaskStatusNotificationService notificationService;
|
private final ObjectMapper objectMapper;
|
|
public MultiDeviceTaskServiceImpl(
|
DeviceGroupConfigService deviceGroupConfigService,
|
DeviceGroupRelationMapper deviceGroupRelationMapper,
|
TaskStepDetailMapper taskStepDetailMapper,
|
TaskExecutionEngine taskExecutionEngine,
|
TaskStatusNotificationService notificationService,
|
ObjectMapper objectMapper) {
|
this.deviceGroupConfigService = deviceGroupConfigService;
|
this.deviceGroupRelationMapper = deviceGroupRelationMapper;
|
this.taskStepDetailMapper = taskStepDetailMapper;
|
this.taskExecutionEngine = taskExecutionEngine;
|
this.notificationService = notificationService;
|
this.objectMapper = objectMapper;
|
}
|
|
@Override
|
@Transactional(rollbackFor = Exception.class)
|
public MultiDeviceTask startTask(MultiDeviceTaskRequest request) {
|
DeviceGroupConfig groupConfig = deviceGroupConfigService.getDeviceGroupById(request.getGroupId());
|
if (groupConfig == null) {
|
throw new IllegalArgumentException("设备组不存在: " + request.getGroupId());
|
}
|
if (groupConfig.getStatus() != DeviceGroupConfig.Status.ENABLED) {
|
throw new IllegalStateException("设备组未启用,无法执行任务");
|
}
|
|
List<DeviceConfig> devices = deviceGroupRelationMapper.getOrderedDeviceConfigs(groupConfig.getId());
|
if (CollectionUtils.isEmpty(devices)) {
|
throw new IllegalStateException("设备组未配置任何设备,无法执行任务");
|
}
|
|
TaskParameters parameters = request.getParameters();
|
if (parameters == null) {
|
parameters = new TaskParameters();
|
}
|
|
// 默认允许卧转立扫码设备在任务执行阶段获取玻璃信息
|
boolean hasGlassIds = !CollectionUtils.isEmpty(parameters.getGlassIds());
|
if (!hasGlassIds) {
|
log.info("测试任务未提供玻璃ID,将在设备组流程中由卧转立扫码设备采集玻璃信息: groupId={}",
|
groupConfig.getId());
|
}
|
|
// 创建任务记录
|
MultiDeviceTask task = new MultiDeviceTask();
|
task.setTaskId(generateTaskId(groupConfig));
|
task.setGroupId(String.valueOf(groupConfig.getId()));
|
task.setProjectId(String.valueOf(groupConfig.getProjectId()));
|
task.setStatus(MultiDeviceTask.Status.PENDING.name());
|
task.setCurrentStep(0);
|
task.setTotalSteps(devices.size());
|
task.setStartTime(new Date());
|
save(task);
|
|
// 异步执行任务,立即返回任务ID
|
executeTaskAsync(task, groupConfig, devices, parameters);
|
|
log.info("设备组任务已启动(异步执行): taskId={}, groupId={}, groupName={}",
|
task.getTaskId(), groupConfig.getId(), groupConfig.getGroupName());
|
|
return task;
|
}
|
|
/**
|
* 异步执行设备组任务
|
* 每个设备组作为独立线程执行,互不阻塞
|
*/
|
@Async("deviceGroupTaskExecutor")
|
public void executeTaskAsync(MultiDeviceTask task,
|
DeviceGroupConfig groupConfig,
|
List<DeviceConfig> devices,
|
TaskParameters parameters) {
|
try {
|
log.info("开始执行设备组任务: taskId={}, groupId={}, deviceCount={}",
|
task.getTaskId(), groupConfig.getId(), devices.size());
|
|
// 更新任务状态为运行中
|
task.setStatus(MultiDeviceTask.Status.RUNNING.name());
|
updateById(task);
|
|
// 通知任务开始
|
notificationService.notifyTaskStatus(task);
|
|
// 执行任务
|
TaskExecutionResult result = taskExecutionEngine.execute(task, groupConfig, devices, parameters);
|
|
// 检查任务数据中是否包含持续运行的标记
|
Map<String, Object> resultData = result.getData();
|
boolean isContinuousTask = resultData != null && "任务已启动,定时器在后台运行中".equals(resultData.get("message"));
|
|
// 如果是持续运行的任务(定时器模式),保持 RUNNING 状态,不更新为 COMPLETED
|
if (isContinuousTask && result.isSuccess()) {
|
log.info("任务已启动定时器,保持运行状态: taskId={}, message={}",
|
task.getTaskId(), resultData.get("message"));
|
task.setResultData(writeJson(resultData));
|
updateById(task);
|
// 通知任务状态(保持 RUNNING)
|
notificationService.notifyTaskStatus(task);
|
return;
|
}
|
|
// 更新任务结果(非持续运行的任务)
|
task.setStatus(result.isSuccess() ? MultiDeviceTask.Status.COMPLETED.name() : MultiDeviceTask.Status.FAILED.name());
|
task.setErrorMessage(result.isSuccess() ? null : result.getMessage());
|
task.setEndTime(new Date());
|
task.setResultData(writeJson(resultData));
|
updateById(task);
|
|
// 通知任务完成
|
notificationService.notifyTaskStatus(task);
|
|
log.info("设备组任务执行完成: taskId={}, success={}, message={}",
|
task.getTaskId(), result.isSuccess(), result.getMessage());
|
|
} catch (Exception ex) {
|
log.error("设备组任务执行异常: taskId={}, groupId={}", task.getTaskId(), groupConfig.getId(), ex);
|
|
// 更新任务状态为失败
|
task.setStatus(MultiDeviceTask.Status.FAILED.name());
|
task.setErrorMessage(ex.getMessage());
|
task.setEndTime(new Date());
|
updateById(task);
|
|
// 通知任务失败
|
notificationService.notifyTaskStatus(task);
|
}
|
}
|
|
@Override
|
public MultiDeviceTask getTaskByTaskId(String taskId) {
|
LambdaQueryWrapper<MultiDeviceTask> wrapper = new LambdaQueryWrapper<>();
|
wrapper.eq(MultiDeviceTask::getTaskId, taskId);
|
return getOne(wrapper);
|
}
|
|
@Override
|
public List<TaskStepDetail> getTaskSteps(String taskId) {
|
LambdaQueryWrapper<TaskStepDetail> wrapper = new LambdaQueryWrapper<>();
|
wrapper.eq(TaskStepDetail::getTaskId, taskId);
|
wrapper.orderByAsc(TaskStepDetail::getStepOrder);
|
return taskStepDetailMapper.selectList(wrapper);
|
}
|
|
@Override
|
public boolean cancelTask(String taskId) {
|
MultiDeviceTask task = getTaskByTaskId(taskId);
|
if (task == null) {
|
return false;
|
}
|
// 允许在 RUNNING 或 FAILED 状态下执行取消操作
|
String status = task.getStatus();
|
boolean cancellable = MultiDeviceTask.Status.RUNNING.name().equals(status)
|
|| MultiDeviceTask.Status.FAILED.name().equals(status);
|
if (!cancellable) {
|
return false;
|
}
|
// 标记任务取消并停止所有定时器
|
taskExecutionEngine.requestTaskCancellation(taskId);
|
task.setStatus(MultiDeviceTask.Status.CANCELLED.name());
|
task.setEndTime(new Date());
|
boolean updated = updateById(task);
|
if (updated) {
|
notificationService.notifyTaskStatus(task);
|
}
|
return updated;
|
}
|
|
@Override
|
public Page<MultiDeviceTask> queryTasks(MultiDeviceTaskQuery query) {
|
int page = query.getPage() != null && query.getPage() > 0 ? query.getPage() : 1;
|
int size = query.getSize() != null && query.getSize() > 0 ? query.getSize() : 10;
|
Page<MultiDeviceTask> pageParam = new Page<>(page, size);
|
|
LambdaQueryWrapper<MultiDeviceTask> wrapper = new LambdaQueryWrapper<>();
|
if (query.getGroupId() != null) {
|
wrapper.eq(MultiDeviceTask::getGroupId, String.valueOf(query.getGroupId()));
|
}
|
if (StringUtils.hasText(query.getStatus())) {
|
wrapper.eq(MultiDeviceTask::getStatus, query.getStatus().toUpperCase(Locale.ROOT));
|
}
|
wrapper.orderByDesc(MultiDeviceTask::getCreatedTime);
|
return page(pageParam, wrapper);
|
}
|
|
private String generateTaskId(DeviceGroupConfig groupConfig) {
|
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss", Locale.CHINA);
|
return "TASK_" + groupConfig.getId() + "_" + sdf.format(new Date());
|
}
|
|
private String writeJson(Object data) {
|
if (data == null) {
|
return "{}";
|
}
|
try {
|
return objectMapper.writeValueAsString(data);
|
} catch (JsonProcessingException e) {
|
return "{}";
|
}
|
}
|
}
|