package com.mes.interaction.workstation.transfer.handler;
|
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.mes.device.entity.DeviceConfig;
|
import com.mes.device.entity.GlassInfo;
|
import com.mes.device.mapper.DeviceGlassInfoMapper;
|
import com.mes.device.service.DevicePlcOperationService;
|
import com.mes.device.service.GlassInfoService;
|
import com.mes.device.vo.DevicePlcVO;
|
import com.mes.interaction.workstation.base.WorkstationBaseHandler;
|
import com.mes.interaction.workstation.config.WorkstationLogicConfig;
|
import com.mes.s7.enhanced.EnhancedS7Serializer;
|
import com.mes.s7.provider.S7SerializerProvider;
|
import com.mes.service.PlcDynamicDataService;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Qualifier;
|
import org.springframework.stereotype.Component;
|
import org.springframework.util.StringUtils;
|
import javax.annotation.PreDestroy;
|
|
import java.time.LocalDateTime;
|
import java.util.*;
|
import java.util.concurrent.*;
|
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.stream.Collectors;
|
|
/**
|
* 卧转立主体设备逻辑处理器
|
* 负责玻璃缓冲、容量校验、批次组装、PLC写入等逻辑
|
*/
|
@Slf4j
|
@Component
|
public class HorizontalTransferLogicHandler extends WorkstationBaseHandler {
|
|
private final PlcDynamicDataService plcDynamicDataService;
|
private final GlassInfoService glassInfoService;
|
private final S7SerializerProvider s7SerializerProvider;
|
|
@Autowired(required = false)
|
private DeviceGlassInfoMapper glassInfoMapper;
|
|
// 玻璃缓冲队列:deviceId -> 玻璃信息列表
|
private final Map<String, List<GlassBufferItem>> glassBuffer = new ConcurrentHashMap<>();
|
|
// 最后扫码时间:deviceId -> 最后扫码时间戳
|
private final Map<String, AtomicLong> lastScanTime = new ConcurrentHashMap<>();
|
|
// 监控任务:deviceId -> 监控任务
|
private final Map<String, ScheduledFuture<?>> monitorTasks = new ConcurrentHashMap<>();
|
|
// 监控线程池
|
private final ScheduledExecutorService monitorExecutor = Executors.newScheduledThreadPool(5, r -> {
|
Thread t = new Thread(r, "HorizontalTransferMonitor");
|
t.setDaemon(true);
|
return t;
|
});
|
|
@Autowired
|
public HorizontalTransferLogicHandler(DevicePlcOperationService devicePlcOperationService,
|
PlcDynamicDataService plcDynamicDataService,
|
@Qualifier("deviceGlassInfoService") GlassInfoService glassInfoService,
|
S7SerializerProvider s7SerializerProvider) {
|
super(devicePlcOperationService);
|
this.plcDynamicDataService = plcDynamicDataService;
|
this.glassInfoService = glassInfoService;
|
this.s7SerializerProvider = s7SerializerProvider;
|
}
|
|
@Override
|
public String getDeviceType() {
|
return DeviceConfig.DeviceType.WORKSTATION_TRANSFER;
|
}
|
|
@Override
|
protected DevicePlcVO.OperationResult doExecute(DeviceConfig deviceConfig,
|
String operation,
|
Map<String, Object> params,
|
Map<String, Object> logicParams) {
|
WorkstationLogicConfig config = parseWorkstationConfig(logicParams);
|
|
try {
|
switch (operation) {
|
case "checkAndProcess":
|
case "process":
|
return handleCheckAndProcess(deviceConfig, config, logicParams);
|
case "startMonitor":
|
return handleStartMonitor(deviceConfig, config, logicParams);
|
case "stopMonitor":
|
return handleStopMonitor(deviceConfig);
|
case "clearBuffer":
|
return handleClearBuffer(deviceConfig);
|
default:
|
return buildResult(deviceConfig, operation, false,
|
"不支持的操作: " + operation);
|
}
|
} catch (Exception e) {
|
log.error("卧转立主体处理异常: deviceId={}, operation={}",
|
deviceConfig.getId(), operation, e);
|
return buildResult(deviceConfig, operation, false,
|
"处理异常: " + e.getMessage());
|
}
|
}
|
|
/**
|
* 检查并处理玻璃批次
|
* 从数据库读取最近扫码的玻璃,进行容量判断,组装批次,写入PLC
|
*/
|
private DevicePlcVO.OperationResult handleCheckAndProcess(
|
DeviceConfig deviceConfig,
|
WorkstationLogicConfig config,
|
Map<String, Object> logicParams) {
|
|
String deviceId = deviceConfig.getDeviceId();
|
EnhancedS7Serializer serializer = s7SerializerProvider.getSerializer(deviceConfig);
|
if (serializer == null) {
|
return buildResult(deviceConfig, "checkAndProcess", false,
|
"获取PLC序列化器失败");
|
}
|
|
try {
|
// 1. 从数据库查询最近扫码的玻璃信息(最近1分钟内的记录)
|
List<GlassInfo> recentGlasses = queryRecentScannedGlasses(deviceConfig, logicParams);
|
if (recentGlasses.isEmpty()) {
|
return buildResult(deviceConfig, "checkAndProcess", true,
|
"暂无待处理的玻璃信息");
|
}
|
|
log.info("查询到最近扫码的玻璃: deviceId={}, count={}",
|
deviceId, recentGlasses.size());
|
|
// 2. 更新缓冲队列和最后扫码时间
|
updateBuffer(deviceId, recentGlasses);
|
lastScanTime.put(deviceId, new AtomicLong(System.currentTimeMillis()));
|
|
// 3. 检查是否需要立即处理(容量已满或30s内无新玻璃)
|
List<GlassBufferItem> buffer = glassBuffer.get(deviceId);
|
if (buffer == null || buffer.isEmpty()) {
|
return buildResult(deviceConfig, "checkAndProcess", true,
|
"缓冲队列为空");
|
}
|
|
// 4. 判断是否满足处理条件
|
boolean shouldProcess = shouldProcessBatch(deviceId, buffer, config);
|
if (!shouldProcess) {
|
return buildResult(deviceConfig, "checkAndProcess", true,
|
"等待更多玻璃或30s超时");
|
}
|
|
// 5. 容量判断和批次组装
|
List<GlassInfo> batch = assembleBatch(buffer, config.getVehicleCapacity());
|
if (batch.isEmpty()) {
|
return buildResult(deviceConfig, "checkAndProcess", false,
|
"无法组装有效批次(容量不足)");
|
}
|
|
// 6. 写入PLC
|
DevicePlcVO.OperationResult writeResult = writeBatchToPlc(
|
deviceConfig, batch, serializer, logicParams);
|
|
if (!Boolean.TRUE.equals(writeResult.getSuccess())) {
|
return writeResult;
|
}
|
|
// 7. 从缓冲队列中移除已处理的玻璃
|
removeProcessedGlasses(deviceId, batch);
|
|
String msg = String.format("批次已写入PLC: glassCount=%d, glassIds=%s",
|
batch.size(),
|
batch.stream().map(GlassInfo::getGlassId).collect(Collectors.joining(",")));
|
return buildResult(deviceConfig, "checkAndProcess", true, msg);
|
|
} catch (Exception e) {
|
log.error("检查并处理玻璃批次异常: deviceId={}", deviceId, e);
|
return buildResult(deviceConfig, "checkAndProcess", false,
|
"处理异常: " + e.getMessage());
|
}
|
}
|
|
/**
|
* 查询最近扫码的玻璃信息
|
*/
|
private List<GlassInfo> queryRecentScannedGlasses(
|
DeviceConfig deviceConfig,
|
Map<String, Object> logicParams) {
|
|
if (glassInfoMapper == null) {
|
log.warn("GlassInfoMapper未注入,无法查询最近扫码的玻璃");
|
return Collections.emptyList();
|
}
|
|
try {
|
// 从配置中获取workLine,用于过滤
|
String workLine = getLogicParam(logicParams, "workLine", null);
|
|
// 查询最近2分钟内的玻璃记录(扩大时间窗口,确保不遗漏)
|
Date twoMinutesAgo = new Date(System.currentTimeMillis() - 120000);
|
|
LambdaQueryWrapper<GlassInfo> wrapper = new LambdaQueryWrapper<>();
|
wrapper.eq(GlassInfo::getStatus, GlassInfo.Status.ACTIVE)
|
.ge(GlassInfo::getCreatedTime, twoMinutesAgo)
|
.orderByDesc(GlassInfo::getCreatedTime)
|
.last("LIMIT 20"); // 限制查询数量,避免过多
|
|
// 如果配置了workLine,则过滤description
|
if (workLine != null && !workLine.isEmpty()) {
|
wrapper.like(GlassInfo::getDescription, "workLine=" + workLine);
|
}
|
|
List<GlassInfo> recentGlasses = glassInfoMapper.selectList(wrapper);
|
|
log.debug("查询到最近扫码的玻璃: deviceId={}, workLine={}, count={}",
|
deviceConfig.getId(), workLine, recentGlasses.size());
|
|
return recentGlasses;
|
|
} catch (Exception e) {
|
log.error("查询最近扫码的玻璃信息异常: deviceId={}",
|
deviceConfig.getId(), e);
|
return Collections.emptyList();
|
}
|
}
|
|
/**
|
* 更新缓冲队列
|
*/
|
private void updateBuffer(String deviceId, List<GlassInfo> newGlasses) {
|
List<GlassBufferItem> buffer = glassBuffer.computeIfAbsent(
|
deviceId, k -> new CopyOnWriteArrayList<>());
|
|
Set<String> existingIds = buffer.stream()
|
.map(item -> item.glassInfo.getGlassId())
|
.collect(Collectors.toSet());
|
|
for (GlassInfo glass : newGlasses) {
|
if (!existingIds.contains(glass.getGlassId())) {
|
buffer.add(new GlassBufferItem(glass, System.currentTimeMillis()));
|
log.debug("添加玻璃到缓冲队列: deviceId={}, glassId={}",
|
deviceId, glass.getGlassId());
|
}
|
}
|
}
|
|
/**
|
* 判断是否应该处理批次
|
*/
|
private boolean shouldProcessBatch(String deviceId,
|
List<GlassBufferItem> buffer,
|
WorkstationLogicConfig config) {
|
// 条件1:缓冲队列已满(达到容量限制)
|
int totalLength = buffer.stream()
|
.mapToInt(item -> item.glassInfo.getGlassLength() != null ?
|
item.glassInfo.getGlassLength() : 0)
|
.sum();
|
if (totalLength >= config.getVehicleCapacity()) {
|
log.info("缓冲队列容量已满,触发批次处理: deviceId={}, totalLength={}, capacity={}",
|
deviceId, totalLength, config.getVehicleCapacity());
|
return true;
|
}
|
|
// 条件2:30s内无新玻璃扫码
|
AtomicLong lastTime = lastScanTime.get(deviceId);
|
if (lastTime != null) {
|
long elapsed = System.currentTimeMillis() - lastTime.get();
|
if (elapsed >= config.getTransferDelayMs()) {
|
log.info("30s内无新玻璃扫码,触发批次处理: deviceId={}, elapsed={}ms",
|
deviceId, elapsed);
|
return true;
|
}
|
}
|
|
return false;
|
}
|
|
/**
|
* 组装批次(容量判断)
|
*/
|
private List<GlassInfo> assembleBatch(List<GlassBufferItem> buffer,
|
int vehicleCapacity) {
|
List<GlassInfo> batch = new ArrayList<>();
|
int usedLength = 0;
|
|
for (GlassBufferItem item : buffer) {
|
GlassInfo glass = item.glassInfo;
|
int glassLength = glass.getGlassLength() != null ?
|
glass.getGlassLength() : 0;
|
|
if (usedLength + glassLength <= vehicleCapacity && batch.size() < 6) {
|
batch.add(glass);
|
usedLength += glassLength;
|
} else {
|
break;
|
}
|
}
|
|
return batch;
|
}
|
|
/**
|
* 写入批次到PLC
|
*/
|
private DevicePlcVO.OperationResult writeBatchToPlc(
|
DeviceConfig deviceConfig,
|
List<GlassInfo> batch,
|
EnhancedS7Serializer serializer,
|
Map<String, Object> logicParams) {
|
|
Map<String, Object> payload = new HashMap<>();
|
|
// 写入玻璃ID(最多6个)
|
int count = Math.min(batch.size(), 6);
|
for (int i = 0; i < count; i++) {
|
String fieldName = "plcGlassId" + (i + 1);
|
payload.put(fieldName, batch.get(i).getGlassId());
|
}
|
|
// 写入玻璃数量
|
payload.put("plcGlassCount", count);
|
|
// 写入位置信息(如果有配置)
|
Integer inPosition = getLogicParam(logicParams, "inPosition", null);
|
if (inPosition != null) {
|
payload.put("inPosition", inPosition);
|
}
|
|
// 写入请求字(触发大车)
|
payload.put("plcRequest", 1);
|
|
try {
|
plcDynamicDataService.writePlcData(deviceConfig, payload, serializer);
|
log.info("批次已写入PLC: deviceId={}, glassCount={}",
|
deviceConfig.getId(), count);
|
return buildResult(deviceConfig, "writeBatchToPlc", true,
|
"批次写入成功");
|
} catch (Exception e) {
|
log.error("写入批次到PLC失败: deviceId={}", deviceConfig.getId(), e);
|
return buildResult(deviceConfig, "writeBatchToPlc", false,
|
"写入失败: " + e.getMessage());
|
}
|
}
|
|
/**
|
* 从缓冲队列移除已处理的玻璃
|
*/
|
private void removeProcessedGlasses(String deviceId, List<GlassInfo> processed) {
|
List<GlassBufferItem> buffer = glassBuffer.get(deviceId);
|
if (buffer == null) {
|
return;
|
}
|
|
Set<String> processedIds = processed.stream()
|
.map(GlassInfo::getGlassId)
|
.collect(Collectors.toSet());
|
|
buffer.removeIf(item -> processedIds.contains(item.glassInfo.getGlassId()));
|
}
|
|
/**
|
* 启动监控任务(定期检查并处理)
|
*/
|
private DevicePlcVO.OperationResult handleStartMonitor(
|
DeviceConfig deviceConfig,
|
WorkstationLogicConfig config,
|
Map<String, Object> logicParams) {
|
|
String deviceId = deviceConfig.getDeviceId();
|
|
// 停止旧的监控任务
|
handleStopMonitor(deviceConfig);
|
|
// 获取监控间隔
|
Integer monitorIntervalMs = getLogicParam(logicParams, "monitorIntervalMs",
|
config.getScanIntervalMs());
|
|
// 启动监控任务
|
ScheduledFuture<?> future = monitorExecutor.scheduleWithFixedDelay(() -> {
|
try {
|
handleCheckAndProcess(deviceConfig, config, logicParams);
|
} catch (Exception e) {
|
log.error("监控任务执行异常: deviceId={}", deviceId, e);
|
}
|
}, monitorIntervalMs, monitorIntervalMs, TimeUnit.MILLISECONDS);
|
|
monitorTasks.put(deviceId, future);
|
log.info("已启动卧转立监控任务: deviceId={}, interval={}ms",
|
deviceId, monitorIntervalMs);
|
|
return buildResult(deviceConfig, "startMonitor", true,
|
"监控任务已启动");
|
}
|
|
/**
|
* 停止监控任务
|
*/
|
private DevicePlcVO.OperationResult handleStopMonitor(DeviceConfig deviceConfig) {
|
String deviceId = deviceConfig.getDeviceId();
|
ScheduledFuture<?> future = monitorTasks.remove(deviceId);
|
if (future != null && !future.isCancelled()) {
|
future.cancel(false);
|
log.info("已停止卧转立监控任务: deviceId={}", deviceId);
|
}
|
return buildResult(deviceConfig, "stopMonitor", true, "监控任务已停止");
|
}
|
|
/**
|
* 清空缓冲队列
|
*/
|
private DevicePlcVO.OperationResult handleClearBuffer(DeviceConfig deviceConfig) {
|
String deviceId = deviceConfig.getDeviceId();
|
glassBuffer.remove(deviceId);
|
lastScanTime.remove(deviceId);
|
log.info("已清空缓冲队列: deviceId={}", deviceId);
|
return buildResult(deviceConfig, "clearBuffer", true, "缓冲队列已清空");
|
}
|
|
/**
|
* 构建操作结果
|
*/
|
private DevicePlcVO.OperationResult buildResult(DeviceConfig deviceConfig,
|
String operation,
|
boolean success,
|
String message) {
|
return DevicePlcVO.OperationResult.builder()
|
.deviceId(deviceConfig.getId())
|
.deviceName(deviceConfig.getDeviceName())
|
.deviceCode(deviceConfig.getDeviceCode())
|
.projectId(deviceConfig.getProjectId() != null ?
|
String.valueOf(deviceConfig.getProjectId()) : null)
|
.operation(operation)
|
.success(success)
|
.message(message)
|
.timestamp(LocalDateTime.now())
|
.build();
|
}
|
|
/**
|
* 应用关闭时清理资源
|
*/
|
@PreDestroy
|
public void destroy() {
|
log.info("正在关闭卧转立监控线程池...");
|
|
// 停止所有监控任务
|
for (String deviceId : new ArrayList<>(monitorTasks.keySet())) {
|
ScheduledFuture<?> future = monitorTasks.remove(deviceId);
|
if (future != null && !future.isCancelled()) {
|
future.cancel(false);
|
}
|
}
|
|
// 关闭线程池
|
monitorExecutor.shutdown();
|
try {
|
if (!monitorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
|
monitorExecutor.shutdownNow();
|
if (!monitorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
|
log.warn("卧转立监控线程池未能正常关闭");
|
}
|
}
|
} catch (InterruptedException e) {
|
monitorExecutor.shutdownNow();
|
Thread.currentThread().interrupt();
|
}
|
|
log.info("卧转立监控线程池已关闭");
|
}
|
|
/**
|
* 玻璃缓冲项
|
*/
|
private static class GlassBufferItem {
|
final GlassInfo glassInfo;
|
final long timestamp;
|
|
GlassBufferItem(GlassInfo glassInfo, long timestamp) {
|
this.glassInfo = glassInfo;
|
this.timestamp = timestamp;
|
}
|
}
|
}
|