package com.mes.task.service.impl;
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.mes.task.entity.MultiDeviceTask;
|
import com.mes.task.entity.TaskStepDetail;
|
import com.mes.task.service.TaskStatusNotificationService;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.stereotype.Service;
|
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
|
import java.io.IOException;
|
import java.util.Collections;
|
import java.util.HashMap;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
|
/**
|
* 任务状态通知服务实现
|
*
|
* @author mes
|
* @since 2025-01-XX
|
*/
|
@Slf4j
|
@Service
|
public class TaskStatusNotificationServiceImpl implements TaskStatusNotificationService {
|
|
private final ObjectMapper objectMapper = new ObjectMapper();
|
|
// 存储所有SSE连接:taskId -> List<SseEmitter>
|
private final Map<String, List<SseEmitter>> connections = new ConcurrentHashMap<>();
|
|
// 存储所有任务的连接(taskId为null时使用)
|
private final List<SseEmitter> allTaskConnections = new CopyOnWriteArrayList<>();
|
|
// 连接超时时间(毫秒)
|
private static final long TIMEOUT = 30 * 60 * 1000L; // 30分钟
|
|
@Override
|
public SseEmitter createConnection(String taskId) {
|
SseEmitter emitter = new SseEmitter(TIMEOUT);
|
|
// 设置完成和超时回调
|
emitter.onCompletion(() -> {
|
log.info("SSE连接完成: taskId={}", taskId);
|
removeConnection(taskId, emitter);
|
});
|
|
emitter.onTimeout(() -> {
|
log.info("SSE连接超时: taskId={}", taskId);
|
removeConnection(taskId, emitter);
|
});
|
|
emitter.onError((ex) -> {
|
log.error("SSE连接错误: taskId={}", taskId, ex);
|
removeConnection(taskId, emitter);
|
});
|
|
// 添加到连接列表
|
if (taskId != null && !taskId.isEmpty()) {
|
connections.computeIfAbsent(taskId, k -> new CopyOnWriteArrayList<>()).add(emitter);
|
} else {
|
allTaskConnections.add(emitter);
|
}
|
|
try {
|
// 发送初始连接成功消息
|
Map<String, Object> initData = new HashMap<>();
|
if (taskId != null) {
|
initData.put("taskId", taskId);
|
}
|
emitter.send(SseEmitter.event()
|
.name("connected")
|
.data(createMessage("连接成功", initData)));
|
} catch (IOException e) {
|
log.error("发送初始消息失败: taskId={}", taskId, e);
|
removeConnection(taskId, emitter);
|
return null;
|
}
|
|
log.info("创建SSE连接: taskId={}, 当前连接数={}", taskId, getConnectionCount(taskId));
|
return emitter;
|
}
|
|
@Override
|
public void notifyTaskStatus(MultiDeviceTask task) {
|
if (task == null || task.getTaskId() == null) {
|
return;
|
}
|
|
String taskId = task.getTaskId();
|
Map<String, Object> data = createTaskStatusData(task);
|
|
// 发送给指定任务的连接
|
sendToConnections(taskId, "taskStatus", data);
|
|
// 发送给所有任务的连接
|
sendToAllTaskConnections("taskStatus", data);
|
|
log.debug("推送任务状态: taskId={}, status={}", taskId, task.getStatus());
|
}
|
|
@Override
|
public void notifyStepUpdate(String taskId, TaskStepDetail step) {
|
if (taskId == null || step == null) {
|
return;
|
}
|
|
Map<String, Object> data = createStepData(step);
|
|
// 发送给指定任务的连接
|
sendToConnections(taskId, "stepUpdate", data);
|
|
// 发送给所有任务的连接
|
Map<String, Object> allTaskData = new HashMap<>();
|
allTaskData.put("taskId", taskId);
|
allTaskData.put("step", data);
|
sendToAllTaskConnections("stepUpdate", allTaskData);
|
|
log.debug("推送步骤更新: taskId={}, stepOrder={}, status={}",
|
taskId, step.getStepOrder(), step.getStatus());
|
}
|
|
@Override
|
public void notifyStepsUpdate(String taskId, List<TaskStepDetail> steps) {
|
if (taskId == null || steps == null) {
|
return;
|
}
|
|
Map<String, Object> data = new HashMap<>();
|
data.put("taskId", taskId);
|
data.put("steps", steps);
|
data.put("stepCount", steps.size());
|
|
// 发送给指定任务的连接
|
sendToConnections(taskId, "stepsUpdate", data);
|
|
// 发送给所有任务的连接
|
sendToAllTaskConnections("stepsUpdate", data);
|
|
log.debug("推送步骤列表更新: taskId={}, stepCount={}", taskId, steps.size());
|
}
|
|
@Override
|
public void closeConnections(String taskId) {
|
if (taskId == null) {
|
return;
|
}
|
|
List<SseEmitter> emitters = connections.remove(taskId);
|
if (emitters != null) {
|
for (SseEmitter emitter : emitters) {
|
try {
|
emitter.complete();
|
} catch (Exception e) {
|
log.warn("关闭SSE连接失败: taskId={}", taskId, e);
|
}
|
}
|
log.info("关闭任务连接: taskId={}, 连接数={}", taskId, emitters.size());
|
}
|
}
|
|
@Override
|
public void closeAllConnections() {
|
// 关闭所有任务连接
|
for (Map.Entry<String, List<SseEmitter>> entry : connections.entrySet()) {
|
for (SseEmitter emitter : entry.getValue()) {
|
try {
|
emitter.complete();
|
} catch (Exception e) {
|
log.warn("关闭SSE连接失败: taskId={}", entry.getKey(), e);
|
}
|
}
|
}
|
connections.clear();
|
|
// 关闭所有任务监听连接
|
for (SseEmitter emitter : allTaskConnections) {
|
try {
|
emitter.complete();
|
} catch (Exception e) {
|
log.warn("关闭SSE连接失败", e);
|
}
|
}
|
allTaskConnections.clear();
|
|
log.info("关闭所有SSE连接");
|
}
|
|
/**
|
* 发送消息到指定任务的连接
|
*/
|
private void sendToConnections(String taskId, String eventName, Map<String, Object> data) {
|
List<SseEmitter> emitters = connections.get(taskId);
|
if (emitters == null || emitters.isEmpty()) {
|
return;
|
}
|
|
List<SseEmitter> toRemove = new CopyOnWriteArrayList<>();
|
for (SseEmitter emitter : emitters) {
|
try {
|
emitter.send(SseEmitter.event()
|
.name(eventName)
|
.data(createMessage("", data)));
|
} catch (IOException e) {
|
log.warn("发送SSE消息失败: taskId={}, event={}", taskId, eventName, e);
|
toRemove.add(emitter);
|
}
|
}
|
|
// 移除失败的连接
|
emitters.removeAll(toRemove);
|
}
|
|
/**
|
* 发送消息到所有任务监听连接
|
*/
|
private void sendToAllTaskConnections(String eventName, Map<String, Object> data) {
|
if (allTaskConnections.isEmpty()) {
|
return;
|
}
|
|
List<SseEmitter> toRemove = new CopyOnWriteArrayList<>();
|
for (SseEmitter emitter : allTaskConnections) {
|
try {
|
emitter.send(SseEmitter.event()
|
.name(eventName)
|
.data(createMessage("", data)));
|
} catch (IOException e) {
|
log.warn("发送SSE消息失败: event={}", eventName, e);
|
toRemove.add(emitter);
|
}
|
}
|
|
// 移除失败的连接
|
allTaskConnections.removeAll(toRemove);
|
}
|
|
/**
|
* 移除连接
|
*/
|
private void removeConnection(String taskId, SseEmitter emitter) {
|
if (taskId != null && !taskId.isEmpty()) {
|
List<SseEmitter> emitters = connections.get(taskId);
|
if (emitters != null) {
|
emitters.remove(emitter);
|
if (emitters.isEmpty()) {
|
connections.remove(taskId);
|
}
|
}
|
} else {
|
allTaskConnections.remove(emitter);
|
}
|
}
|
|
/**
|
* 获取连接数
|
*/
|
private int getConnectionCount(String taskId) {
|
if (taskId != null && !taskId.isEmpty()) {
|
List<SseEmitter> emitters = connections.get(taskId);
|
return emitters != null ? emitters.size() : 0;
|
}
|
return allTaskConnections.size();
|
}
|
|
/**
|
* 创建任务状态数据
|
*/
|
private Map<String, Object> createTaskStatusData(MultiDeviceTask task) {
|
Map<String, Object> data = new HashMap<>();
|
data.put("taskId", task.getTaskId() != null ? task.getTaskId() : "");
|
data.put("groupId", task.getGroupId() != null ? task.getGroupId() : "");
|
data.put("status", task.getStatus() != null ? task.getStatus() : "");
|
data.put("currentStep", task.getCurrentStep() != null ? task.getCurrentStep() : 0);
|
data.put("totalSteps", task.getTotalSteps() != null ? task.getTotalSteps() : 0);
|
data.put("startTime", task.getStartTime() != null ? task.getStartTime().getTime() : 0);
|
data.put("endTime", task.getEndTime() != null ? task.getEndTime().getTime() : 0);
|
data.put("errorMessage", task.getErrorMessage() != null ? task.getErrorMessage() : "");
|
return data;
|
}
|
|
/**
|
* 创建步骤数据
|
*/
|
private Map<String, Object> createStepData(TaskStepDetail step) {
|
Map<String, Object> data = new HashMap<>();
|
data.put("id", step.getId() != null ? step.getId() : 0);
|
data.put("stepOrder", step.getStepOrder() != null ? step.getStepOrder() : 0);
|
data.put("deviceId", step.getDeviceId() != null ? step.getDeviceId() : "");
|
data.put("stepName", step.getStepName() != null ? step.getStepName() : "");
|
data.put("status", step.getStatus() != null ? step.getStatus() : "");
|
data.put("startTime", step.getStartTime() != null ? step.getStartTime().getTime() : 0);
|
data.put("endTime", step.getEndTime() != null ? step.getEndTime().getTime() : 0);
|
data.put("durationMs", step.getDurationMs() != null ? step.getDurationMs() : 0);
|
data.put("retryCount", step.getRetryCount() != null ? step.getRetryCount() : 0);
|
data.put("errorMessage", step.getErrorMessage() != null ? step.getErrorMessage() : "");
|
return data;
|
}
|
|
/**
|
* 创建消息对象
|
*/
|
private String createMessage(String message, Map<String, Object> data) {
|
try {
|
Map<String, Object> result = new java.util.HashMap<>();
|
result.put("timestamp", System.currentTimeMillis());
|
if (message != null && !message.isEmpty()) {
|
result.put("message", message);
|
}
|
if (data != null && !data.isEmpty()) {
|
result.putAll(data);
|
}
|
return objectMapper.writeValueAsString(result);
|
} catch (Exception e) {
|
log.error("序列化消息失败", e);
|
return "{}";
|
}
|
}
|
}
|