From 366ba040d2447bacd3455299425e3166f1f992bb Mon Sep 17 00:00:00 2001
From: huang <1532065656@qq.com>
Date: 星期四, 20 十一月 2025 14:38:32 +0800
Subject: [PATCH] 添加大车、大理片笼以及多设备串行/并行执行写入基础逻辑
---
mes-processes/mes-plcSend/src/main/java/com/mes/task/service/TaskExecutionEngine.java | 610 +++++++++++++++++++++++++++++++++++++++++++++++++------
1 files changed, 544 insertions(+), 66 deletions(-)
diff --git a/mes-processes/mes-plcSend/src/main/java/com/mes/task/service/TaskExecutionEngine.java b/mes-processes/mes-plcSend/src/main/java/com/mes/task/service/TaskExecutionEngine.java
index 6e6bcbd..2919b0f 100644
--- a/mes-processes/mes-plcSend/src/main/java/com/mes/task/service/TaskExecutionEngine.java
+++ b/mes-processes/mes-plcSend/src/main/java/com/mes/task/service/TaskExecutionEngine.java
@@ -3,6 +3,7 @@
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mes.device.entity.DeviceConfig;
import com.mes.device.entity.DeviceGroupConfig;
@@ -12,14 +13,17 @@
import com.mes.interaction.DeviceLogicHandlerFactory;
import com.mes.interaction.base.InteractionContext;
import com.mes.interaction.base.InteractionResult;
+import com.mes.device.service.DeviceCoordinationService;
import com.mes.device.service.DeviceInteractionService;
import com.mes.task.dto.TaskParameters;
import com.mes.task.entity.MultiDeviceTask;
import com.mes.task.entity.TaskStepDetail;
import com.mes.task.mapper.MultiDeviceTaskMapper;
import com.mes.task.mapper.TaskStepDetailMapper;
+import com.mes.task.model.RetryPolicy;
import com.mes.task.model.TaskExecutionContext;
import com.mes.task.model.TaskExecutionResult;
+import com.mes.task.service.TaskStatusNotificationService;
import com.mes.device.vo.DevicePlcVO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -28,9 +32,11 @@
import org.springframework.util.StringUtils;
import java.util.*;
+import java.util.concurrent.*;
/**
* 澶氳澶囦换鍔℃墽琛屽紩鎿�
+ * 鏀寔涓茶鍜屽苟琛屼袱绉嶆墽琛屾ā寮�
*/
@Slf4j
@Component
@@ -38,6 +44,11 @@
public class TaskExecutionEngine {
private static final Map<String, String> DEFAULT_OPERATIONS = new HashMap<>();
+ private static final TypeReference<Map<String, Object>> MAP_TYPE = new TypeReference<Map<String, Object>>() {};
+
+ // 鎵ц妯″紡甯搁噺
+ private static final String EXECUTION_MODE_SERIAL = "SERIAL";
+ private static final String EXECUTION_MODE_PARALLEL = "PARALLEL";
static {
DEFAULT_OPERATIONS.put(DeviceConfig.DeviceType.LOAD_VEHICLE, "feedGlass");
@@ -50,7 +61,16 @@
private final DeviceInteractionService deviceInteractionService;
private final DeviceInteractionRegistry interactionRegistry;
private final DeviceLogicHandlerFactory handlerFactory;
+ private final DeviceCoordinationService deviceCoordinationService;
+ private final TaskStatusNotificationService notificationService;
private final ObjectMapper objectMapper;
+
+ // 绾跨▼姹犵敤浜庡苟琛屾墽琛�
+ private final ExecutorService executorService = Executors.newCachedThreadPool(r -> {
+ Thread t = new Thread(r, "TaskExecutionEngine-Parallel");
+ t.setDaemon(true);
+ return t;
+ });
public TaskExecutionResult execute(MultiDeviceTask task,
DeviceGroupConfig groupConfig,
@@ -62,24 +82,55 @@
}
TaskExecutionContext context = new TaskExecutionContext(parameters);
+
+ // 璁惧鍗忚皟锛氭鏌ヤ緷璧栧叧绯诲拰鎵ц鏉′欢
+ DeviceCoordinationService.CoordinationResult coordinationResult =
+ deviceCoordinationService.coordinateExecution(groupConfig, devices, context);
+ if (!coordinationResult.canExecute()) {
+ log.warn("璁惧鍗忚皟澶辫触: {}", coordinationResult.getMessage());
+ return TaskExecutionResult.failure(coordinationResult.getMessage(), Collections.emptyMap());
+ }
+ log.info("璁惧鍗忚皟鎴愬姛: {}", coordinationResult.getMessage());
+
task.setTotalSteps(devices.size());
task.setStatus(MultiDeviceTask.Status.RUNNING.name());
multiDeviceTaskMapper.updateById(task);
+
+ // 閫氱煡浠诲姟寮�濮嬫墽琛�
+ notificationService.notifyTaskStatus(task);
+
+ // 纭畾鎵ц妯″紡
+ String executionMode = determineExecutionMode(groupConfig);
+ Integer maxConcurrent = getMaxConcurrentDevices(groupConfig);
- List<Map<String, Object>> stepSummaries = new ArrayList<>();
- boolean success = true;
- String failureMessage = null;
+ log.info("浠诲姟鎵ц妯″紡: {}, 鏈�澶у苟鍙戞暟: {}, 璁惧鏁�: {}", executionMode, maxConcurrent, devices.size());
- for (int i = 0; i < devices.size(); i++) {
- DeviceConfig device = devices.get(i);
- int order = i + 1;
- TaskStepDetail step = createStepRecord(task, device, order);
- StepResult stepResult = executeStep(task, step, device, context);
- stepSummaries.add(stepResult.toSummary());
- if (!stepResult.isSuccess()) {
- success = false;
- failureMessage = stepResult.getMessage();
- break;
+ List<Map<String, Object>> stepSummaries;
+ boolean success;
+ String failureMessage;
+
+ if (EXECUTION_MODE_PARALLEL.equals(executionMode)) {
+ // 骞惰鎵ц妯″紡
+ stepSummaries = new ArrayList<>(Collections.nCopies(devices.size(), null));
+ Pair<Boolean, String> result = executeParallel(task, devices, context, stepSummaries, maxConcurrent);
+ success = result.getFirst();
+ failureMessage = result.getSecond();
+ } else {
+ // 涓茶鎵ц妯″紡锛堥粯璁わ級
+ stepSummaries = new ArrayList<>();
+ success = true;
+ failureMessage = null;
+ for (int i = 0; i < devices.size(); i++) {
+ DeviceConfig device = devices.get(i);
+ int order = i + 1;
+ TaskStepDetail step = createStepRecord(task, device, order);
+ StepResult stepResult = executeStep(task, step, device, context);
+ stepSummaries.add(stepResult.toSummary());
+ if (!stepResult.isSuccess()) {
+ success = false;
+ failureMessage = stepResult.getMessage();
+ break;
+ }
}
}
@@ -87,11 +138,212 @@
payload.put("steps", stepSummaries);
payload.put("groupId", groupConfig.getId());
payload.put("deviceCount", devices.size());
+ payload.put("executionMode", executionMode);
+ // 鏇存柊浠诲姟鏈�缁堢姸鎬�
+ if (success) {
+ task.setStatus(MultiDeviceTask.Status.COMPLETED.name());
+ } else {
+ task.setStatus(MultiDeviceTask.Status.FAILED.name());
+ task.setErrorMessage(failureMessage);
+ }
+ task.setEndTime(new Date());
+ multiDeviceTaskMapper.updateById(task);
+
+ // 閫氱煡浠诲姟瀹屾垚
+ notificationService.notifyTaskStatus(task);
+
if (success) {
return TaskExecutionResult.success(payload);
}
return TaskExecutionResult.failure(failureMessage != null ? failureMessage : "浠诲姟鎵ц澶辫触", payload);
+ }
+
+ /**
+ * 骞惰鎵ц澶氫釜璁惧鎿嶄綔
+ */
+ private Pair<Boolean, String> executeParallel(MultiDeviceTask task,
+ List<DeviceConfig> devices,
+ TaskExecutionContext context,
+ List<Map<String, Object>> stepSummaries,
+ Integer maxConcurrent) {
+ int concurrency = maxConcurrent != null && maxConcurrent > 0
+ ? Math.min(maxConcurrent, devices.size())
+ : devices.size();
+
+ // 鍒涘缓鎵�鏈夋楠よ褰�
+ List<TaskStepDetail> steps = new ArrayList<>();
+ for (int i = 0; i < devices.size(); i++) {
+ DeviceConfig device = devices.get(i);
+ int order = i + 1;
+ TaskStepDetail step = createStepRecord(task, device, order);
+ steps.add(step);
+ }
+
+ // 浣跨敤淇″彿閲忔帶鍒跺苟鍙戞暟
+ Semaphore semaphore = new Semaphore(concurrency);
+ List<CompletableFuture<StepResult>> futures = new ArrayList<>();
+
+ for (int i = 0; i < devices.size(); i++) {
+ final int index = i;
+ final DeviceConfig device = devices.get(index);
+ final TaskStepDetail step = steps.get(index);
+
+ CompletableFuture<StepResult> future = CompletableFuture.supplyAsync(() -> {
+ try {
+ semaphore.acquire();
+ try {
+ return executeStep(task, step, device, context);
+ } finally {
+ semaphore.release();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.error("骞惰鎵ц琚腑鏂�, deviceId={}", device.getId(), e);
+ return StepResult.failure(device.getDeviceName(), "鎵ц琚腑鏂�");
+ } catch (Exception e) {
+ log.error("骞惰鎵ц寮傚父, deviceId={}", device.getId(), e);
+ return StepResult.failure(device.getDeviceName(), e.getMessage());
+ }
+ }, executorService);
+
+ final int finalIndex = index;
+ future.whenComplete((result, throwable) -> {
+ if (throwable != null) {
+ log.error("骞惰鎵ц瀹屾垚鏃跺紓甯�, deviceId={}", device.getId(), throwable);
+ stepSummaries.set(finalIndex, StepResult.failure(device.getDeviceName(),
+ throwable.getMessage()).toSummary());
+ } else if (result != null) {
+ stepSummaries.set(finalIndex, result.toSummary());
+ }
+ });
+
+ futures.add(future);
+ }
+
+ // 绛夊緟鎵�鏈変换鍔″畬鎴�
+ CompletableFuture<Void> allFutures = CompletableFuture.allOf(
+ futures.toArray(new CompletableFuture[0])
+ );
+
+ try {
+ allFutures.get(30, TimeUnit.MINUTES); // 鏈�澶氱瓑寰�30鍒嗛挓
+ } catch (TimeoutException e) {
+ log.error("骞惰鎵ц瓒呮椂, taskId={}", task.getTaskId(), e);
+ return Pair.of(false, "浠诲姟鎵ц瓒呮椂");
+ } catch (Exception e) {
+ log.error("绛夊緟骞惰鎵ц瀹屾垚鏃跺紓甯�, taskId={}", task.getTaskId(), e);
+ return Pair.of(false, "绛夊緟鎵ц瀹屾垚鏃跺彂鐢熷紓甯�: " + e.getMessage());
+ }
+
+ // 妫�鏌ユ墍鏈夋楠ょ殑鎵ц缁撴灉
+ boolean allSuccess = true;
+ String firstFailureMessage = null;
+ for (int i = 0; i < futures.size(); i++) {
+ try {
+ StepResult result = futures.get(i).get();
+ if (result != null && !result.isSuccess()) {
+ allSuccess = false;
+ if (firstFailureMessage == null) {
+ firstFailureMessage = result.getMessage();
+ }
+ }
+ } catch (Exception e) {
+ log.error("鑾峰彇姝ラ鎵ц缁撴灉寮傚父, stepIndex={}", i, e);
+ allSuccess = false;
+ if (firstFailureMessage == null) {
+ firstFailureMessage = "鑾峰彇鎵ц缁撴灉寮傚父: " + e.getMessage();
+ }
+ }
+ }
+
+ return Pair.of(allSuccess, firstFailureMessage);
+ }
+
+ /**
+ * 纭畾鎵ц妯″紡
+ */
+ private String determineExecutionMode(DeviceGroupConfig groupConfig) {
+ if (groupConfig == null) {
+ return EXECUTION_MODE_SERIAL; // 榛樿涓茶
+ }
+
+ // 浠巈xtraConfig涓鍙杄xecutionMode
+ String extraConfig = groupConfig.getExtraConfig();
+ if (StringUtils.hasText(extraConfig)) {
+ try {
+ Map<String, Object> config = objectMapper.readValue(extraConfig, MAP_TYPE);
+ Object mode = config.get("executionMode");
+ if (mode != null) {
+ String modeStr = String.valueOf(mode).toUpperCase();
+ if (EXECUTION_MODE_PARALLEL.equals(modeStr) || EXECUTION_MODE_SERIAL.equals(modeStr)) {
+ return modeStr;
+ }
+ }
+ } catch (Exception e) {
+ log.warn("瑙f瀽璁惧缁勬墽琛屾ā寮忓け璐�, groupId={}", groupConfig.getId(), e);
+ }
+ }
+
+ // 濡傛灉鏈塵axConcurrentDevices涓斿ぇ浜�1锛岄粯璁や娇鐢ㄥ苟琛屾ā寮�
+ if (groupConfig.getMaxConcurrentDevices() != null && groupConfig.getMaxConcurrentDevices() > 1) {
+ return EXECUTION_MODE_PARALLEL;
+ }
+
+ return EXECUTION_MODE_SERIAL; // 榛樿涓茶
+ }
+
+ /**
+ * 鑾峰彇鏈�澶у苟鍙戣澶囨暟
+ */
+ private Integer getMaxConcurrentDevices(DeviceGroupConfig groupConfig) {
+ if (groupConfig == null) {
+ return 1;
+ }
+
+ // 浠巈xtraConfig涓鍙杕axConcurrent
+ String extraConfig = groupConfig.getExtraConfig();
+ if (StringUtils.hasText(extraConfig)) {
+ try {
+ Map<String, Object> config = objectMapper.readValue(extraConfig, MAP_TYPE);
+ Object maxConcurrent = config.get("maxConcurrent");
+ if (maxConcurrent instanceof Number) {
+ return ((Number) maxConcurrent).intValue();
+ }
+ } catch (Exception e) {
+ log.warn("瑙f瀽璁惧缁勬渶澶у苟鍙戞暟澶辫触, groupId={}", groupConfig.getId(), e);
+ }
+ }
+
+ // 浣跨敤瀹炰綋瀛楁
+ return groupConfig.getMaxConcurrentDevices() != null && groupConfig.getMaxConcurrentDevices() > 0
+ ? groupConfig.getMaxConcurrentDevices()
+ : 1;
+ }
+
+ /**
+ * 绠�鍗曠殑Pair绫荤敤浜庤繑鍥炰袱涓��
+ */
+ private static class Pair<T, U> {
+ private final T first;
+ private final U second;
+
+ private Pair(T first, U second) {
+ this.first = first;
+ this.second = second;
+ }
+
+ public static <T, U> Pair<T, U> of(T first, U second) {
+ return new Pair<>(first, second);
+ }
+
+ public T getFirst() {
+ return first;
+ }
+
+ public U getSecond() {
+ return second;
+ }
}
private TaskStepDetail createStepRecord(MultiDeviceTask task, DeviceConfig device, int order) {
@@ -110,13 +362,25 @@
TaskStepDetail step,
DeviceConfig device,
TaskExecutionContext context) {
+ return executeStepWithRetry(task, step, device, context, getRetryPolicy(device));
+ }
+
+ /**
+ * 甯﹂噸璇曠殑姝ラ鎵ц
+ */
+ private StepResult executeStepWithRetry(MultiDeviceTask task,
+ TaskStepDetail step,
+ DeviceConfig device,
+ TaskExecutionContext context,
+ RetryPolicy retryPolicy) {
Date startTime = new Date();
step.setStartTime(startTime);
step.setStatus(TaskStepDetail.Status.RUNNING.name());
+ step.setRetryCount(0);
DeviceInteraction deviceInteraction = interactionRegistry.getInteraction(device.getDeviceType());
if (deviceInteraction != null) {
- return executeInteractionStep(task, step, device, context, deviceInteraction);
+ return executeInteractionStepWithRetry(task, step, device, context, deviceInteraction, retryPolicy);
}
Map<String, Object> params = buildOperationParams(device, context);
@@ -125,65 +389,260 @@
String operation = determineOperation(device, params);
DeviceLogicHandler handler = handlerFactory.getHandler(device.getDeviceType());
- DevicePlcVO.OperationResult result;
+
+ int retryAttempt = 0;
+ Exception lastException = null;
+
+ while (retryAttempt <= retryPolicy.getMaxRetryCount()) {
+ try {
+ if (retryAttempt > 0) {
+ // 閲嶈瘯鍓嶇瓑寰�
+ long waitTime = retryPolicy.calculateRetryInterval(retryAttempt);
+ log.info("姝ラ鎵ц閲嶈瘯: deviceId={}, operation={}, retryAttempt={}/{}, waitTime={}ms",
+ device.getId(), operation, retryAttempt, retryPolicy.getMaxRetryCount(), waitTime);
+ Thread.sleep(waitTime);
+
+ // 鏇存柊姝ラ鐘舵��
+ step.setRetryCount(retryAttempt);
+ step.setStatus(TaskStepDetail.Status.RUNNING.name());
+ step.setStartTime(new Date());
+ taskStepDetailMapper.updateById(step);
+ }
- try {
- if (handler == null) {
- result = deviceInteractionService.executeOperation(device.getId(), operation, params);
- } else {
- result = handler.execute(device, operation, params);
+ DevicePlcVO.OperationResult result;
+ if (handler == null) {
+ result = deviceInteractionService.executeOperation(device.getId(), operation, params);
+ } else {
+ result = handler.execute(device, operation, params);
+ }
+
+ boolean opSuccess = Boolean.TRUE.equals(result.getSuccess());
+ updateStepAfterOperation(step, result, opSuccess);
+ updateTaskProgress(task, step.getStepOrder(), opSuccess);
+
+ // 閫氱煡姝ラ鏇存柊
+ notificationService.notifyStepUpdate(task.getTaskId(), step);
+
+ if (opSuccess) {
+ updateContextAfterSuccess(device, context, params);
+
+ // 鍚屾璁惧鐘舵��
+ deviceCoordinationService.syncDeviceStatus(device,
+ DeviceCoordinationService.DeviceStatus.COMPLETED, context);
+
+ return StepResult.success(device.getDeviceName(), result.getMessage());
+ } else {
+ // 涓氬姟澶辫触锛屽垽鏂槸鍚﹀彲閲嶈瘯
+ if (retryAttempt < retryPolicy.getMaxRetryCount() && isRetryableFailure(result)) {
+ retryAttempt++;
+ lastException = new RuntimeException(result.getMessage());
+ log.warn("姝ラ鎵ц澶辫触锛屽噯澶囬噸璇�: deviceId={}, operation={}, retryAttempt={}, message={}",
+ device.getId(), operation, retryAttempt, result.getMessage());
+ continue;
+ }
+
+ // 鍚屾澶辫触鐘舵��
+ deviceCoordinationService.syncDeviceStatus(device,
+ DeviceCoordinationService.DeviceStatus.FAILED, context);
+
+ return StepResult.failure(device.getDeviceName(), result.getMessage());
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.error("姝ラ鎵ц琚腑鏂�, deviceId={}, operation={}", device.getId(), operation, e);
+ step.setStatus(TaskStepDetail.Status.FAILED.name());
+ step.setErrorMessage("鎵ц琚腑鏂�: " + e.getMessage());
+ step.setEndTime(new Date());
+ step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
+ step.setRetryCount(retryAttempt);
+ taskStepDetailMapper.updateById(step);
+ updateTaskProgress(task, step.getStepOrder(), false);
+ return StepResult.failure(device.getDeviceName(), "鎵ц琚腑鏂�");
+ } catch (Exception e) {
+ lastException = e;
+ log.error("璁惧鎿嶄綔寮傚父, deviceId={}, operation={}, retryAttempt={}",
+ device.getId(), operation, retryAttempt, e);
+
+ // 鍒ゆ柇鏄惁鍙噸璇�
+ if (retryAttempt < retryPolicy.getMaxRetryCount() && retryPolicy.isRetryable(e)) {
+ retryAttempt++;
+ log.warn("姝ラ鎵ц寮傚父锛屽噯澶囬噸璇�: deviceId={}, operation={}, retryAttempt={}, exception={}",
+ device.getId(), operation, retryAttempt, e.getClass().getSimpleName());
+ continue;
+ }
+
+ // 涓嶅彲閲嶈瘯鎴栬揪鍒版渶澶ч噸璇曟鏁�
+ step.setStatus(TaskStepDetail.Status.FAILED.name());
+ step.setErrorMessage(e.getMessage());
+ step.setEndTime(new Date());
+ step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
+ step.setRetryCount(retryAttempt);
+ taskStepDetailMapper.updateById(step);
+ updateTaskProgress(task, step.getStepOrder(), false);
+
+ // 閫氱煡姝ラ鏇存柊
+ notificationService.notifyStepUpdate(task.getTaskId(), step);
+
+ // 鍚屾澶辫触鐘舵��
+ deviceCoordinationService.syncDeviceStatus(device,
+ DeviceCoordinationService.DeviceStatus.FAILED, context);
+
+ String errorMsg = retryAttempt > 0
+ ? String.format("鎵ц澶辫触锛堝凡閲嶈瘯%d娆★級: %s", retryAttempt, e.getMessage())
+ : e.getMessage();
+ return StepResult.failure(device.getDeviceName(), errorMsg);
}
-
- boolean opSuccess = Boolean.TRUE.equals(result.getSuccess());
- updateStepAfterOperation(step, result, opSuccess);
- updateTaskProgress(task, step.getStepOrder(), opSuccess);
-
- if (opSuccess) {
- updateContextAfterSuccess(device, context, params);
- return StepResult.success(device.getDeviceName(), result.getMessage());
- }
- return StepResult.failure(device.getDeviceName(), result.getMessage());
- } catch (Exception e) {
- log.error("璁惧鎿嶄綔寮傚父, deviceId={}, operation={}", device.getId(), operation, e);
- step.setStatus(TaskStepDetail.Status.FAILED.name());
- step.setErrorMessage(e.getMessage());
- step.setEndTime(new Date());
- step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
- taskStepDetailMapper.updateById(step);
- updateTaskProgress(task, step.getStepOrder(), false);
- return StepResult.failure(device.getDeviceName(), e.getMessage());
}
+
+ // 杈惧埌鏈�澶ч噸璇曟鏁�
+ step.setStatus(TaskStepDetail.Status.FAILED.name());
+ step.setErrorMessage(lastException != null ? lastException.getMessage() : "鎵ц澶辫触");
+ step.setEndTime(new Date());
+ step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
+ step.setRetryCount(retryAttempt);
+ taskStepDetailMapper.updateById(step);
+ updateTaskProgress(task, step.getStepOrder(), false);
+
+ // 閫氱煡姝ラ鏇存柊
+ notificationService.notifyStepUpdate(task.getTaskId(), step);
+
+ deviceCoordinationService.syncDeviceStatus(device,
+ DeviceCoordinationService.DeviceStatus.FAILED, context);
+
+ return StepResult.failure(device.getDeviceName(),
+ String.format("鎵ц澶辫触锛堝凡閲嶈瘯%d娆★級", retryAttempt));
}
- private StepResult executeInteractionStep(MultiDeviceTask task,
- TaskStepDetail step,
- DeviceConfig device,
- TaskExecutionContext context,
- DeviceInteraction deviceInteraction) {
- try {
- InteractionContext interactionContext = new InteractionContext(device, context);
- step.setInputData(toJson(context.getParameters()));
- InteractionResult interactionResult = deviceInteraction.execute(interactionContext);
- boolean success = interactionResult != null && interactionResult.isSuccess();
- updateStepAfterInteraction(step, interactionResult);
- updateTaskProgress(task, step.getStepOrder(), success);
+ /**
+ * 甯﹂噸璇曠殑浜や簰姝ラ鎵ц
+ */
+ private StepResult executeInteractionStepWithRetry(MultiDeviceTask task,
+ TaskStepDetail step,
+ DeviceConfig device,
+ TaskExecutionContext context,
+ DeviceInteraction deviceInteraction,
+ RetryPolicy retryPolicy) {
+ int retryAttempt = 0;
+ Exception lastException = null;
+
+ while (retryAttempt <= retryPolicy.getMaxRetryCount()) {
+ try {
+ if (retryAttempt > 0) {
+ long waitTime = retryPolicy.calculateRetryInterval(retryAttempt);
+ log.info("浜や簰姝ラ鎵ц閲嶈瘯: deviceId={}, retryAttempt={}/{}, waitTime={}ms",
+ device.getId(), retryAttempt, retryPolicy.getMaxRetryCount(), waitTime);
+ Thread.sleep(waitTime);
+
+ step.setRetryCount(retryAttempt);
+ step.setStatus(TaskStepDetail.Status.RUNNING.name());
+ step.setStartTime(new Date());
+ taskStepDetailMapper.updateById(step);
+ }
- if (success) {
- return StepResult.success(device.getDeviceName(), interactionResult.getMessage());
+ InteractionContext interactionContext = new InteractionContext(device, context);
+ step.setInputData(toJson(context.getParameters()));
+ InteractionResult interactionResult = deviceInteraction.execute(interactionContext);
+ boolean success = interactionResult != null && interactionResult.isSuccess();
+ updateStepAfterInteraction(step, interactionResult);
+ updateTaskProgress(task, step.getStepOrder(), success);
+
+ if (success) {
+ deviceCoordinationService.syncDeviceStatus(device,
+ DeviceCoordinationService.DeviceStatus.COMPLETED, context);
+ return StepResult.success(device.getDeviceName(), interactionResult.getMessage());
+ } else {
+ if (retryAttempt < retryPolicy.getMaxRetryCount()) {
+ retryAttempt++;
+ continue;
+ }
+ deviceCoordinationService.syncDeviceStatus(device,
+ DeviceCoordinationService.DeviceStatus.FAILED, context);
+ String message = interactionResult != null ? interactionResult.getMessage() : "浜や簰鎵ц澶辫触";
+ return StepResult.failure(device.getDeviceName(), message);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.error("浜や簰姝ラ鎵ц琚腑鏂�, deviceId={}", device.getId(), e);
+ step.setStatus(TaskStepDetail.Status.FAILED.name());
+ step.setErrorMessage("鎵ц琚腑鏂�: " + e.getMessage());
+ step.setEndTime(new Date());
+ step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
+ step.setRetryCount(retryAttempt);
+ taskStepDetailMapper.updateById(step);
+ updateTaskProgress(task, step.getStepOrder(), false);
+ return StepResult.failure(device.getDeviceName(), "鎵ц琚腑鏂�");
+ } catch (Exception e) {
+ lastException = e;
+ log.error("浜や簰鎵ц寮傚父, deviceId={}, retryAttempt={}", device.getId(), retryAttempt, e);
+
+ if (retryAttempt < retryPolicy.getMaxRetryCount() && retryPolicy.isRetryable(e)) {
+ retryAttempt++;
+ continue;
+ }
+
+ step.setStatus(TaskStepDetail.Status.FAILED.name());
+ step.setErrorMessage(e.getMessage());
+ step.setEndTime(new Date());
+ step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
+ step.setRetryCount(retryAttempt);
+ taskStepDetailMapper.updateById(step);
+ updateTaskProgress(task, step.getStepOrder(), false);
+
+ // 閫氱煡姝ラ鏇存柊
+ notificationService.notifyStepUpdate(task.getTaskId(), step);
+
+ deviceCoordinationService.syncDeviceStatus(device,
+ DeviceCoordinationService.DeviceStatus.FAILED, context);
+
+ String errorMsg = retryAttempt > 0
+ ? String.format("鎵ц澶辫触锛堝凡閲嶈瘯%d娆★級: %s", retryAttempt, e.getMessage())
+ : e.getMessage();
+ return StepResult.failure(device.getDeviceName(), errorMsg);
}
- String message = interactionResult != null ? interactionResult.getMessage() : "浜や簰鎵ц澶辫触";
- return StepResult.failure(device.getDeviceName(), message);
- } catch (Exception e) {
- log.error("浜や簰鎵ц寮傚父, deviceId={}", device.getId(), e);
- step.setStatus(TaskStepDetail.Status.FAILED.name());
- step.setErrorMessage(e.getMessage());
- step.setEndTime(new Date());
- step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
- taskStepDetailMapper.updateById(step);
- updateTaskProgress(task, step.getStepOrder(), false);
- return StepResult.failure(device.getDeviceName(), e.getMessage());
}
+
+ step.setStatus(TaskStepDetail.Status.FAILED.name());
+ step.setErrorMessage(lastException != null ? lastException.getMessage() : "浜や簰鎵ц澶辫触");
+ step.setEndTime(new Date());
+ step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
+ step.setRetryCount(retryAttempt);
+ taskStepDetailMapper.updateById(step);
+ updateTaskProgress(task, step.getStepOrder(), false);
+
+ // 閫氱煡姝ラ鏇存柊
+ notificationService.notifyStepUpdate(task.getTaskId(), step);
+
+ deviceCoordinationService.syncDeviceStatus(device,
+ DeviceCoordinationService.DeviceStatus.FAILED, context);
+
+ return StepResult.failure(device.getDeviceName(),
+ String.format("鎵ц澶辫触锛堝凡閲嶈瘯%d娆★級", retryAttempt));
}
+
+ /**
+ * 鑾峰彇閲嶈瘯绛栫暐
+ */
+ private RetryPolicy getRetryPolicy(DeviceConfig device) {
+ // 鍙互浠庤澶囬厤缃腑璇诲彇閲嶈瘯绛栫暐
+ // 鏆傛椂浣跨敤榛樿绛栫暐
+ return RetryPolicy.defaultPolicy();
+ }
+
+ /**
+ * 鍒ゆ柇涓氬姟澶辫触鏄惁鍙噸璇�
+ */
+ private boolean isRetryableFailure(DevicePlcVO.OperationResult result) {
+ if (result == null || result.getMessage() == null) {
+ return false;
+ }
+ String message = result.getMessage().toLowerCase();
+ // 缃戠粶閿欒銆佽秴鏃堕敊璇彲閲嶈瘯
+ return message.contains("timeout") ||
+ message.contains("connection") ||
+ message.contains("缃戠粶") ||
+ message.contains("瓒呮椂");
+ }
+
private void updateStepAfterOperation(TaskStepDetail step,
DevicePlcVO.OperationResult result,
@@ -223,6 +682,9 @@
update.set(MultiDeviceTask::getStatus, MultiDeviceTask.Status.FAILED.name());
}
multiDeviceTaskMapper.update(null, update);
+
+ // 閫氱煡浠诲姟鐘舵�佹洿鏂�
+ notificationService.notifyTaskStatus(task);
}
private String determineOperation(DeviceConfig device, Map<String, Object> params) {
@@ -302,12 +764,28 @@
private void updateContextAfterSuccess(DeviceConfig device,
TaskExecutionContext context,
Map<String, Object> params) {
+ List<String> glassIds = extractGlassIds(params);
+
switch (device.getDeviceType()) {
case DeviceConfig.DeviceType.LOAD_VEHICLE:
- context.setLoadedGlassIds(extractGlassIds(params));
+ context.setLoadedGlassIds(glassIds);
+ // 鏁版嵁浼犻�掞細涓婂ぇ杞� -> 涓嬩竴涓澶�
+ if (!CollectionUtils.isEmpty(glassIds)) {
+ Map<String, Object> transferData = new HashMap<>();
+ transferData.put("glassIds", glassIds);
+ transferData.put("sourceDevice", device.getDeviceCode());
+ // 杩欓噷绠�鍖栧鐞嗭紝瀹為檯搴旇鎵惧埌涓嬩竴涓澶�
+ // 鍦ㄤ覆琛屾ā寮忎笅锛屼笅涓�涓澶囦細鍦ㄥ惊鐜腑鑷姩鑾峰彇
+ }
break;
case DeviceConfig.DeviceType.LARGE_GLASS:
- context.setProcessedGlassIds(extractGlassIds(params));
+ context.setProcessedGlassIds(glassIds);
+ // 鏁版嵁浼犻�掞細澶х悊鐗� -> 涓嬩竴涓澶�
+ if (!CollectionUtils.isEmpty(glassIds)) {
+ Map<String, Object> transferData = new HashMap<>();
+ transferData.put("glassIds", glassIds);
+ transferData.put("sourceDevice", device.getDeviceCode());
+ }
break;
default:
break;
--
Gitblit v1.8.0