package com.mes.task.service.impl; import com.fasterxml.jackson.databind.ObjectMapper; import com.mes.task.entity.MultiDeviceTask; import com.mes.task.entity.TaskStepDetail; import com.mes.task.service.TaskStatusNotificationService; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; /** * 任务状态通知服务实现 * * @author mes * @since 2025-01-XX */ @Slf4j @Service public class TaskStatusNotificationServiceImpl implements TaskStatusNotificationService { private final ObjectMapper objectMapper = new ObjectMapper(); // 存储所有SSE连接:taskId -> List private final Map> connections = new ConcurrentHashMap<>(); // 存储所有任务的连接(taskId为null时使用) private final List allTaskConnections = new CopyOnWriteArrayList<>(); // 连接超时时间(毫秒) private static final long TIMEOUT = 30 * 60 * 1000L; // 30分钟 @Override public SseEmitter createConnection(String taskId) { SseEmitter emitter = new SseEmitter(TIMEOUT); // 设置完成和超时回调 emitter.onCompletion(() -> { log.info("SSE连接完成: taskId={}", taskId); removeConnection(taskId, emitter); }); emitter.onTimeout(() -> { log.info("SSE连接超时: taskId={}", taskId); removeConnection(taskId, emitter); }); emitter.onError((ex) -> { log.error("SSE连接错误: taskId={}", taskId, ex); removeConnection(taskId, emitter); }); // 添加到连接列表 if (taskId != null && !taskId.isEmpty()) { connections.computeIfAbsent(taskId, k -> new CopyOnWriteArrayList<>()).add(emitter); } else { allTaskConnections.add(emitter); } try { // 发送初始连接成功消息 Map initData = new HashMap<>(); if (taskId != null) { initData.put("taskId", taskId); } emitter.send(SseEmitter.event() .name("connected") .data(createMessage("连接成功", initData))); } catch (IOException e) { log.error("发送初始消息失败: taskId={}", taskId, e); removeConnection(taskId, emitter); return null; } log.info("创建SSE连接: taskId={}, 当前连接数={}", taskId, getConnectionCount(taskId)); return emitter; } @Override public void notifyTaskStatus(MultiDeviceTask task) { if (task == null || task.getTaskId() == null) { return; } String taskId = task.getTaskId(); Map data = createTaskStatusData(task); // 发送给指定任务的连接 sendToConnections(taskId, "taskStatus", data); // 发送给所有任务的连接 sendToAllTaskConnections("taskStatus", data); log.debug("推送任务状态: taskId={}, status={}", taskId, task.getStatus()); } @Override public void notifyStepUpdate(String taskId, TaskStepDetail step) { if (taskId == null || step == null) { return; } Map data = createStepData(step); // 发送给指定任务的连接 sendToConnections(taskId, "stepUpdate", data); // 发送给所有任务的连接 Map allTaskData = new HashMap<>(); allTaskData.put("taskId", taskId); allTaskData.put("step", data); sendToAllTaskConnections("stepUpdate", allTaskData); log.debug("推送步骤更新: taskId={}, stepOrder={}, status={}", taskId, step.getStepOrder(), step.getStatus()); } @Override public void notifyStepsUpdate(String taskId, List steps) { if (taskId == null || steps == null) { return; } Map data = new HashMap<>(); data.put("taskId", taskId); data.put("steps", steps); data.put("stepCount", steps.size()); // 发送给指定任务的连接 sendToConnections(taskId, "stepsUpdate", data); // 发送给所有任务的连接 sendToAllTaskConnections("stepsUpdate", data); log.debug("推送步骤列表更新: taskId={}, stepCount={}", taskId, steps.size()); } @Override public void closeConnections(String taskId) { if (taskId == null) { return; } List emitters = connections.remove(taskId); if (emitters != null) { for (SseEmitter emitter : emitters) { try { emitter.complete(); } catch (Exception e) { log.warn("关闭SSE连接失败: taskId={}", taskId, e); } } log.info("关闭任务连接: taskId={}, 连接数={}", taskId, emitters.size()); } } @Override public void closeAllConnections() { // 关闭所有任务连接 for (Map.Entry> entry : connections.entrySet()) { for (SseEmitter emitter : entry.getValue()) { try { emitter.complete(); } catch (Exception e) { log.warn("关闭SSE连接失败: taskId={}", entry.getKey(), e); } } } connections.clear(); // 关闭所有任务监听连接 for (SseEmitter emitter : allTaskConnections) { try { emitter.complete(); } catch (Exception e) { log.warn("关闭SSE连接失败", e); } } allTaskConnections.clear(); log.info("关闭所有SSE连接"); } /** * 发送消息到指定任务的连接 */ private void sendToConnections(String taskId, String eventName, Map data) { List emitters = connections.get(taskId); if (emitters == null || emitters.isEmpty()) { return; } List toRemove = new CopyOnWriteArrayList<>(); for (SseEmitter emitter : emitters) { try { emitter.send(SseEmitter.event() .name(eventName) .data(createMessage("", data))); } catch (IOException e) { // 客户端断开连接是正常情况,使用DEBUG级别 if (e instanceof org.apache.catalina.connector.ClientAbortException) { log.debug("客户端断开SSE连接: taskId={}, event={}", taskId, eventName); } else { log.warn("发送SSE消息失败: taskId={}, event={}", taskId, eventName, e); } toRemove.add(emitter); } } // 移除失败的连接 emitters.removeAll(toRemove); } /** * 发送消息到所有任务监听连接 */ private void sendToAllTaskConnections(String eventName, Map data) { if (allTaskConnections.isEmpty()) { return; } List toRemove = new CopyOnWriteArrayList<>(); for (SseEmitter emitter : allTaskConnections) { try { emitter.send(SseEmitter.event() .name(eventName) .data(createMessage("", data))); } catch (IOException e) { // 客户端断开连接是正常情况,使用DEBUG级别 if (e instanceof org.apache.catalina.connector.ClientAbortException) { log.debug("客户端断开SSE连接: event={}", eventName); } else { log.warn("发送SSE消息失败: event={}", eventName, e); } toRemove.add(emitter); } } // 移除失败的连接 allTaskConnections.removeAll(toRemove); } /** * 移除连接 */ private void removeConnection(String taskId, SseEmitter emitter) { if (taskId != null && !taskId.isEmpty()) { List emitters = connections.get(taskId); if (emitters != null) { emitters.remove(emitter); if (emitters.isEmpty()) { connections.remove(taskId); } } } else { allTaskConnections.remove(emitter); } } /** * 获取连接数 */ private int getConnectionCount(String taskId) { if (taskId != null && !taskId.isEmpty()) { List emitters = connections.get(taskId); return emitters != null ? emitters.size() : 0; } return allTaskConnections.size(); } /** * 创建任务状态数据 */ private Map createTaskStatusData(MultiDeviceTask task) { Map data = new HashMap<>(); data.put("taskId", task.getTaskId() != null ? task.getTaskId() : ""); data.put("groupId", task.getGroupId() != null ? task.getGroupId() : ""); data.put("status", task.getStatus() != null ? task.getStatus() : ""); data.put("currentStep", task.getCurrentStep() != null ? task.getCurrentStep() : 0); data.put("totalSteps", task.getTotalSteps() != null ? task.getTotalSteps() : 0); data.put("startTime", task.getStartTime() != null ? task.getStartTime().getTime() : 0); data.put("endTime", task.getEndTime() != null ? task.getEndTime().getTime() : 0); data.put("errorMessage", task.getErrorMessage() != null ? task.getErrorMessage() : ""); return data; } /** * 创建步骤数据 */ private Map createStepData(TaskStepDetail step) { Map data = new HashMap<>(); data.put("id", step.getId() != null ? step.getId() : 0); data.put("stepOrder", step.getStepOrder() != null ? step.getStepOrder() : 0); data.put("deviceId", step.getDeviceId() != null ? step.getDeviceId() : ""); data.put("stepName", step.getStepName() != null ? step.getStepName() : ""); data.put("status", step.getStatus() != null ? step.getStatus() : ""); data.put("startTime", step.getStartTime() != null ? step.getStartTime().getTime() : 0); data.put("endTime", step.getEndTime() != null ? step.getEndTime().getTime() : 0); data.put("durationMs", step.getDurationMs() != null ? step.getDurationMs() : 0); data.put("retryCount", step.getRetryCount() != null ? step.getRetryCount() : 0); data.put("errorMessage", step.getErrorMessage() != null ? step.getErrorMessage() : ""); return data; } /** * 创建消息对象 */ private String createMessage(String message, Map data) { try { Map result = new java.util.HashMap<>(); result.put("timestamp", System.currentTimeMillis()); if (message != null && !message.isEmpty()) { result.put("message", message); } if (data != null && !data.isEmpty()) { result.putAll(data); } return objectMapper.writeValueAsString(result); } catch (Exception e) { log.error("序列化消息失败", e); return "{}"; } } }