hangzhoumesParent/common/servicebase/src/main/java/com/mes/tools/WebSocketServer.java
@@ -10,166 +10,131 @@ import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.io.IOException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Semaphore; @ServerEndpoint(value = "/api/talk/{username}") @Component("webSocketServer") @Component public class WebSocketServer { public static ConfigurableApplicationContext applicationContext; private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class); private List<String> messages; /** * 记录当前在线连接数 */ public static final Map<String, ArrayList<WebSocketServer>> sessionMap = new ConcurrentHashMap<>(); private static final int MAX_MESSAGE_SIZE = 50000; // 单次消息分块阈值 private static final Semaphore semaphore = new Semaphore(100); // 流量控制 public String username; public Session session; // 按用户名分组存储Session(线程安全) private static final Map<String, List<WebSocketServer>> sessionMap = new ConcurrentHashMap<>(); public WebSocketServer() { this.messages = new ArrayList<>(); } // 当前连接的用户名和Session private String webSocketName; private Session session; private final List<String> messages = new CopyOnWriteArrayList<>(); // 线程安全消息记录 /** * 连接建立成功调用的方法 * 连接建立 */ @OnOpen public void onOpen(Session session, @PathParam("username") String username) { this.username = username; public void onOpen(Session session, @PathParam("webSocketName") String webSocketName) { this.webSocketName = webSocketName; this.session = session; List<WebSocketServer> webSocketServers = sessionMap.get(username); if (webSocketServers == null) { ArrayList<WebSocketServer> arrayListwebserver = new ArrayList<WebSocketServer>(); arrayListwebserver.add(this); sessionMap.put(username, arrayListwebserver); } else { webSocketServers.add(this); } log.info("有新用户加入,username={}, 当前在线人数为:{}", username, sessionMap.get(username).size()); sessionMap.computeIfAbsent(webSocketName, k -> new CopyOnWriteArrayList<>()) .add(this); // JSONObject result = new JSONObject(); // JSONArray array = new JSONArray(); // result.set("users", array); // for (Object key : sessionMap.keySet()) { // JSONObject jsonObject = new JSONObject(); // jsonObject.set("username", key); // array.add(jsonObject); // } // sendAllMessage(JSONUtil.toJsonStr(result)); // 后台发送消息给所有的客户端 log.info("用户连接: webSocketName={}, 当前会话数: {}", webSocketName, sessionMap.getOrDefault(webSocketName, Collections.emptyList()).size()); } /** * 连接关闭调用的方法 * 连接关闭 */ @OnClose public void onClose(Session session, @PathParam("username") String username) { List<WebSocketServer> webSocketServers = sessionMap.get(username); ArrayList<WebSocketServer> arrayListwebserver = new ArrayList<WebSocketServer>(); if (webSocketServers.size() > 1) { for (WebSocketServer webSocketServer : webSocketServers) { if (webSocketServer != this) { arrayListwebserver.add(webSocketServer); public void onClose() { List<WebSocketServer> sessions = sessionMap.get(webSocketName); if (sessions != null) { sessions.remove(this); if (sessions.isEmpty()) { sessionMap.remove(webSocketName); } log.info("用户断开: webSocketName={}, 剩余会话数: {}", webSocketName, sessionMap.getOrDefault(webSocketName, Collections.emptyList()).size()); } sessionMap.put(username, arrayListwebserver); log.info("移除username={}一名用户session, {}的当前在线人数为:{}", username, username, sessionMap.get(username).size()); } else { sessionMap.remove(username); log.info("移除username={}一名用户session, {}连接关闭, 当前连接数为:{}", username, username, sessionMap.size()); } } /** * 收到客户端消息后调用的方法 * 后台收到客户端发送过来的消息 * onMessage 是一个消息的中转站 * 接受 浏览器端 socket.send 发送过来的 json数据 * * @param message 客户端发送过来的消息 * 接收消息 */ @OnMessage public void onMessage(String message, Session session, @PathParam("username") String username) { log.info("服务端收到用户username={}的消息:{}", username, message); public void onMessage(String message) { log.info("收到消息: webSocketName={}, content={}", webSocketName, message); JSONObject obj = JSONUtil.parseObj(message); String text = obj.getStr("data"); JSONObject jsonObject = new JSONObject(); jsonObject.set("message", text); this.messages.add(text); } @OnError public void onError(Session session, Throwable error) { log.error("发生错误"); error.printStackTrace(); messages.add(obj.getStr("data")); // 存储消息历史 } /** * 服务端发送消息给客户端 * 错误处理 */ public void sendMessage(String message) { @OnError public void onError(Throwable error) { log.error("WebSocket错误: webSocketName={}", webSocketName, error); } /** * 向当前用户的所有会话发送消息 */ public void sendToWeb(String webSocketName, String message) { List<WebSocketServer> sessions = sessionMap.get(webSocketName); if (sessions == null) return; sessions.forEach(ws -> { try { // log.info("服务端给客户端[{}]发送消息{}", this.session.getId(), message); if(this.session.isOpen()){ int maxChunkSize = 50000; // 定义最大的分块大小 int length = message.length(); if(length>50000){ int chunks = (int) Math.ceil((double) length / maxChunkSize); //分块发送消息 for (int i = 0; i < chunks; i++) { int startIndex = i * maxChunkSize; int endIndex = Math.min(startIndex + maxChunkSize, length); String chunk = message.substring(startIndex, endIndex); // 判断是否是最后一块消息 boolean isLastChunk = (i == chunks - 1); if(isLastChunk==true){ chunk+="<END>"; } // 发送分块消息,并传递是否是最后一块消息的标识 this.session.getBasicRemote().sendText(chunk); } }else{ this.session.getBasicRemote().sendText(message); } } semaphore.acquire(); ws.sendChunkedMessage(message); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (Exception e) { log.error("发送消息给客户端失败:{}", e.getMessage(), e); log.error("推送失败: webSocketName={}", webSocketName, e); } finally { semaphore.release(); } }); } /** * 分块发送大消息 */ private void sendChunkedMessage(String message) { if (!session.isOpen()) return; try { if (message.length() <= MAX_MESSAGE_SIZE) { session.getBasicRemote().sendText(message); return; } // 分块发送 int chunks = (int) Math.ceil((double) message.length() / MAX_MESSAGE_SIZE); for (int i = 0; i < chunks; i++) { int start = i * MAX_MESSAGE_SIZE; int end = Math.min(start + MAX_MESSAGE_SIZE, message.length()); String chunk = message.substring(start, end) + (i == chunks - 1 ? "<END>" : ""); session.getBasicRemote().sendText(chunk); } } catch (IOException e) { log.error("消息发送失败: webSocketName={}", webSocketName, e); } } // /** // * 服务端发送消息给所有客户端 // */ // public void sendAllMessage(String message) { // try { // for (WebSocketServer webSocketServer : sessionMap.values()) { // // log.info("服务端给客户端[{}]发送消息{}", this.session.getId(), message); // webSocketServer.sendMessage(message); // } // } catch (Exception e) { // log.error("服务端发送消息给客户端失败", e); // } // } // --- 工具方法 --- public static Set<String> getOnlineUsers() { return sessionMap.keySet(); } public List<String> getMessages() { return messages; return Collections.unmodifiableList(messages); } public void clearMessages() { messages.clear(); } } hangzhoumesParent/common/servicebase/src/main/java/com/mes/tools/WebSocketUtils.java
New file @@ -0,0 +1,31 @@ package com.mes.tools; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * @Author : zhoush * @Date: 2025/8/22 14:58 * @Description: */ @Slf4j @Component public class WebSocketUtils { @Resource WebSocketServer webSocketServer; @Resource ObjectMapper objectMapper; public <T> void sendToWeb(String webSocketName, T t) { try { String message = objectMapper.writeValueAsString(t); webSocketServer.sendToWeb(webSocketName, message); } catch (JsonProcessingException ex) { log.info("{}发送数据失败:{}", webSocketName, ex.getMessage()); } } } hangzhoumesParent/moduleService/CacheGlassModule/src/main/java/com/mes/job/PushMessageToIndex.java
@@ -18,7 +18,7 @@ import com.mes.opctask.entity.EdgStorageDeviceTaskHistory; import com.mes.opctask.service.EdgStorageDeviceTaskHistoryService; import com.mes.opctask.service.EdgStorageDeviceTaskService; import com.mes.tools.WebSocketServer; import com.mes.tools.WebSocketUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @@ -29,7 +29,6 @@ import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Date; import java.util.List; @@ -57,10 +56,8 @@ @Resource LargenScreenService largenScreenService; private static final String EDG_STORAGE_DEVICE_ONE_TASK = "edg_storage_device_one_task"; private static final String EDG_STORAGE_DEVICE_TWO_TASK = "edg_storage_device_two_task"; @Resource private WebSocketUtils webSocketUtils; @Scheduled(fixedDelay = 1000) public void CacheGlassOneTasks() { @@ -84,22 +81,7 @@ .eq(EdgStorageDeviceTaskHistory::getTaskState, Const.RAW_GLASS_TASK_NEW) .orderByDesc(EdgStorageDeviceTaskHistory::getCreateTime).last("limit 1")); jsonObject.append("taskMessage", taskHistory); ArrayList<WebSocketServer> sendwServer = WebSocketServer.sessionMap.get(webSocketName); if (sendwServer != null) { for (WebSocketServer webserver : sendwServer) { if (webserver != null) { webserver.sendMessage(jsonObject.toString()); List<String> messages = webserver.getMessages(); if (!messages.isEmpty()) { // // 将最后一个消息转换为整数类型的列表 webserver.clearMessages(); } } else { log.info("Home is closed"); } } } webSocketUtils.sendToWeb(webSocketName, jsonObject); } @Scheduled(fixedDelay = 1000) @@ -114,21 +96,7 @@ public void currentCutDrawingTaskChild(String webSocketName, int deviceId, int stationCell) { JSONObject jsonObject = edgStorageCageDetailsService.queryCurrentCutDrawing(deviceId, stationCell); ArrayList<WebSocketServer> sendwServer = WebSocketServer.sessionMap.get(webSocketName); if (sendwServer != null) { for (WebSocketServer webserver : sendwServer) { if (webserver != null) { webserver.sendMessage(jsonObject.toString()); List<String> messages = webserver.getMessages(); if (!messages.isEmpty()) { // // 将最后一个消息转换为整数类型的列表 webserver.clearMessages(); } } else { log.info("Home is closed"); } } } webSocketUtils.sendToWeb(webSocketName, jsonObject); } @Scheduled(fixedDelay = 1000) @@ -137,53 +105,22 @@ //磨边信息 List<EdgGlassTaskInfo> edgTasks = edgGlassTaskInfoService.selectEdgInfo(); jsonObject.append("edgTasks", edgTasks); ArrayList<WebSocketServer> sendwServer = WebSocketServer.sessionMap.get("edgTasks"); if (sendwServer != null) { for (WebSocketServer webserver : sendwServer) { if (webserver != null) { webserver.sendMessage(jsonObject.toString()); List<String> messages = webserver.getMessages(); if (!messages.isEmpty()) { // // 将最后一个消息转换为整数类型的列表 webserver.clearMessages(); } } else { log.info("edgTasks is closed"); } } } webSocketUtils.sendToWeb("edgTasks", jsonObject); } @Scheduled(fixedDelay = 5000) public void querySameDayProductionTask() { JSONObject jsonObject = new JSONObject(); ArrayList<WebSocketServer> sendwServer = WebSocketServer.sessionMap.get("largenScreenProduction"); if (sendwServer != null) { List<DailyProductionVO> productionVO = largenScreenService.querySameDayProduction(new DateRequest()); jsonObject.append("productionVO", productionVO); for (WebSocketServer webserver : sendwServer) { if (webserver != null) { webserver.sendMessage(jsonObject.toString()); List<String> messages = webserver.getMessages(); if (!messages.isEmpty()) { // // 将最后一个消息转换为整数类型的列表 webserver.clearMessages(); } } else { log.info("largenScreenProduction is closed"); } } } webSocketUtils.sendToWeb("largenScreenProduction", jsonObject); } @Scheduled(fixedDelay = 5000) public void largenScreen() { JSONObject jsonObject = new JSONObject(); //磨边信息 ArrayList<WebSocketServer> sendwServer = WebSocketServer.sessionMap.get("largenScreen"); if (sendwServer != null) { Date startOfToday = new Date(LocalDate.now().atStartOfDay(ZoneId.systemDefault()).toInstant().toEpochMilli()); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); String formatted = sdf.format(startOfToday); @@ -222,18 +159,8 @@ jsonObject.append("pieChartVOS", pieChartVOS); List<RunTime> loadRunTimes = edgStorageDeviceTaskHistoryService.queryRunTimes(LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); jsonObject.append("loadRunTimes", loadRunTimes); for (WebSocketServer webserver : sendwServer) { if (webserver != null) { webserver.sendMessage(jsonObject.toString()); List<String> messages = webserver.getMessages(); if (!messages.isEmpty()) { // // 将最后一个消息转换为整数类型的列表 webserver.clearMessages(); } } else { log.info("largenScreen is closed"); } } } webSocketUtils.sendToWeb("largenScreen", jsonObject); } } hangzhoumesParent/moduleService/CacheVerticalGlassModule/src/main/java/com/mes/job/OPCPlcSlicecage.java
@@ -20,7 +20,7 @@ import com.mes.s7.entity.S7DataDLPTwo; import com.mes.temperingglass.entity.TemperingGlassInfo; import com.mes.temperingglass.service.TemperingGlassInfoService; import com.mes.tools.WebSocketServer; import com.mes.tools.WebSocketUtils; import com.mes.utils.RedisUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; @@ -68,6 +68,9 @@ @Autowired @Qualifier("s7SerializerDLPTwo") private S7Serializer s7SerializerDLPTwo; @Resource private WebSocketUtils webSocketUtils; private JSONObject jsonObject = new JSONObject(); @@ -248,58 +251,25 @@ * fixedDelay : 上一个调用结束后再次调用的延时 */ @Scheduled(fixedDelay = 3000) public void plcStorageCageTask() throws InterruptedException { public void plcStorageCageTask() throws Exception { jsonObject = new JSONObject(); try { //查询使用数据源1查询数据 queryDataSource1(); //查询使用数据源2查询数据 // queryDataSource2(); ArrayList<WebSocketServer> sendwServer = WebSocketServer.sessionMap.get("slicecage"); if (sendwServer != null) { for (WebSocketServer webserver : sendwServer) { if (webserver != null) { webserver.sendMessage(jsonObject.toString()); List<String> messages = webserver.getMessages(); if (!messages.isEmpty()) { // // 将最后一个消息转换为整数类型的列表 webserver.clearMessages(); } } else { log.info("Home is closed"); } } } } catch (Exception e) { e.printStackTrace(); } webSocketUtils.sendToWeb("slicecage", jsonObject); } @Scheduled(fixedDelay = 1000) public void largenScreen() { JSONObject jsonObject = new JSONObject(); //理片笼使用情况 ArrayList<WebSocketServer> sendwServer = WebSocketServer.sessionMap.get("largenScreen"); if (sendwServer != null) { List<Map<String, Object>> bigStorageCageUsage = bigStorageCageService.selectBigStorageCageUsage(); jsonObject.append("bigStorageCageUsage", bigStorageCageUsage); List<PieChartVO> pieChartVOS = bigStorageCageService.queryPieChart(); jsonObject.append("pieChartVOS", pieChartVOS); List<RunTime> tempRunTimes = bigStorageCageHistoryTaskService.queryRunTimes(LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))); jsonObject.append("tempRunTimes", tempRunTimes); for (WebSocketServer webserver : sendwServer) { if (webserver != null) { webserver.sendMessage(jsonObject.toString()); List<String> messages = webserver.getMessages(); if (!messages.isEmpty()) { // // 将最后一个消息转换为整数类型的列表 webserver.clearMessages(); } } else { log.info("largenScreen is closed"); } } } webSocketUtils.sendToWeb("largenScreen", jsonObject); } } hangzhoumesParent/moduleService/CacheVerticalGlassModule/src/main/java/com/mes/job/PlcSlicecage.java
@@ -8,7 +8,7 @@ import com.mes.glassinfo.service.GlassInfoService; import com.mes.temperingglass.entity.TemperingGlassInfo; import com.mes.temperingglass.service.TemperingGlassInfoService; import com.mes.tools.WebSocketServer; import com.mes.tools.WebSocketUtils; import com.mes.utils.RedisUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -36,6 +36,9 @@ private RedisUtil redisUtil; @Autowired private GlassInfoService glassInfoService; @Resource private WebSocketUtils webSocketUtils; // @Value("${mes.scan.ip}") // private String scanIp; @@ -119,29 +122,7 @@ queryDataSource1(); //查询使用数据源2查询数据 queryDataSource2(); try { ArrayList<WebSocketServer> sendwServer = WebSocketServer.sessionMap.get("slicecage"); if (sendwServer != null) { for (WebSocketServer webserver : sendwServer) { if (webserver != null) { webserver.sendMessage(jsonObject.toString()); List<String> messages = webserver.getMessages(); if (!messages.isEmpty()) { // // 将最后一个消息转换为整数类型的列表 webserver.clearMessages(); } } else { log.info("Home is closed"); } } } } catch (Exception e) { e.printStackTrace(); // 打印堆栈信息,方便定位问题 System.out.println("webserver出现异常: " + e.getMessage()); // TODO: handle exception } webSocketUtils.sendToWeb("slicecage", jsonObject); } catch (Exception e) { e.printStackTrace(); } @@ -156,16 +137,7 @@ //出片任务数据 List<BigStorageCageDetails> bigStorageCageDetailsOutTask = bigStorageCageDetailsService.selectOutTask(); jsonObject.append("bigStorageCageDetailsOutTask", bigStorageCageDetailsOutTask); ArrayList<WebSocketServer> sendwServer = WebSocketServer.sessionMap.get("isRun"); if (sendwServer != null) { for (WebSocketServer webserver : sendwServer) { if (webserver != null) { webserver.sendMessage(jsonObject.toString()); } else { log.info("Home is closed"); } } } webSocketUtils.sendToWeb("isRun", jsonObject); } // @Scheduled(fixedDelay = Long.MAX_VALUE) hangzhoumesParent/moduleService/LoadGlassModule/src/main/java/com/mes/job/PlcLoadGlassTask.java
@@ -12,6 +12,7 @@ import com.mes.rawglassdetails.entity.RawGlassStorageDetails; import com.mes.rawglassstation.service.RawGlassStorageStationService; import com.mes.tools.WebSocketServer; import com.mes.tools.WebSocketUtils; import com.mes.uppattenusage.entity.UpPattenUsage; import com.mes.uppattenusage.service.UpPattenUsageService; import com.mes.utils.RedisUtil; @@ -54,6 +55,9 @@ private static final String LOAD_GLASS_DEVICE_ONE_TASK = "load_glass_device_one_task"; private static final String LOAD_GLASS_DEVICE_TWO_TASK = "load_glass_device_two_task"; @Resource private WebSocketUtils webSocketUtils; /** * fixedRate : 上一个调用开始后再次调用的延时(不用等待上一次调用完成) @@ -169,16 +173,7 @@ List<RawGlassStorageDetails> stationList = rawGlassStorageDetailList.stream().filter(item -> item.getDeviceId() == stationCell).collect(Collectors.toList()); jsonObject.append("stationList", stationList); ArrayList<WebSocketServer> sendwServer = WebSocketServer.sessionMap.get(webSocketName); if (sendwServer != null) { for (WebSocketServer webserver : sendwServer) { if (webserver != null) { webserver.sendMessage(jsonObject.toString()); } else { log.info("loadGlass is closed"); } } } webSocketUtils.sendToWeb(webSocketName, jsonObject); } catch (Exception e) { e.printStackTrace(); } @@ -192,17 +187,7 @@ String inkageStatus = plcParameterObject.getPlcParameter("InkageStatus").getValue(); // String inkageStatus ="1"; jsonObject.append("InkageStatus", inkageStatus); ArrayList<WebSocketServer> sendwServer = WebSocketServer.sessionMap.get("loadGlass"); if (sendwServer != null) { for (WebSocketServer webserver : sendwServer) { if (webserver != null) { webserver.sendMessage(jsonObject.toString()); } else { log.info("loadGlass is closed"); } } } webSocketUtils.sendToWeb("loadGlass", jsonObject); } hangzhoumesParent/moduleService/hollowGlassModule/src/main/java/com/mes/hollow/service/impl/HollowGlassRelationInfoServiceImpl.java
@@ -69,6 +69,7 @@ @Override public HollowBigStorageDTO queryHollowTargetSlot(String flowCardId, Integer glassType, double width, double height, int totalLayer, int layer) { log.info("玻璃流程卡:{},序号:{},总层数:{},层数:{}", flowCardId, glassType, totalLayer, layer); //按照玻璃信息获取关系表中对应的大理片笼格子号 HollowGlassRelationInfo relationInfoOne = hollowGlassRelationInfoService.getOne(new LambdaQueryWrapper<HollowGlassRelationInfo>() .eq(HollowGlassRelationInfo::getFlowCardId, flowCardId) @@ -102,6 +103,7 @@ .last("limit 1") ); } Assert.isTrue(null != relationInfoOne, "相关流程卡未找到对应的组号信息,玻璃流程卡:{},序号:{},总层数:{},层数:{}", flowCardId, glassType, totalLayer, layer); Integer slotWidth = sysConfigService.queryConfigValue(ConstSysConfig.HOLLOW_SLOT_WIDTH); //详情表内获取本组是否已经有玻璃在笼子内(0表示提前占用) int taskCount = hollowGlassOutRelationInfoService.count(new LambdaQueryWrapper<HollowGlassOutRelationInfo>() hangzhoumesParent/moduleService/hollowGlassModule/src/main/java/com/mes/job/OpcPlcStorageCageHollowTask.java
@@ -307,6 +307,7 @@ hollowBigStorageCageDetailsService.update(new LambdaUpdateWrapper<HollowBigStorageCageDetails>() .set(HollowBigStorageCageDetails::getState, Const.GLASS_STATE_TAKE) .eq(HollowBigStorageCageDetails::getState, Const.GLASS_STATE_NEW)); hollowGlassRelationInfoService.update(new LambdaUpdateWrapper<HollowGlassRelationInfo>() .set(HollowGlassRelationInfo::getGlassId, null) .set(HollowGlassRelationInfo::getTemperingLayoutId, null) @@ -315,6 +316,8 @@ .set(HollowGlassRelationInfo::getState, Const.HOLLOW_RELATION_NEW) .eq(HollowGlassRelationInfo::getState, Const.HOLLOW_RELATION_OCCUPY) ); //将格子尺寸恢复 hollowBigStorageCageService.resetCage(); return; } //历史数据入库