package com.mes.milo.runner.subscription; import com.mes.milo.utils.CustomUtil; import lombok.extern.slf4j.Slf4j; import org.eclipse.milo.opcua.sdk.client.OpcUaClient; import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription; import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscriptionManager; import org.eclipse.milo.opcua.sdk.client.subscriptions.ManagedDataItem; import org.eclipse.milo.opcua.sdk.client.subscriptions.ManagedSubscription; import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId; import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode; import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; /** * 类 SubscriptionRunner 功能描述: * * @author mes * @version 0.0.1 * @date 2022/01/01 23:49 */ @Slf4j public class SubscriptionRunner { /** * 点位list */ private final List identifiers; private final double samplingInterval; public SubscriptionRunner(List identifiers) { this.identifiers = identifiers; this.samplingInterval = 1000.0D; } public SubscriptionRunner(List identifiers, double samplingInterval) { this.identifiers = identifiers; this.samplingInterval = samplingInterval; } public void run(OpcUaClient opcUaClient, SubscriptionCallback callback) { final CountDownLatch downLatch = new CountDownLatch(1); //添加订阅监听器,用于处理断线重连后的订阅问题 opcUaClient.getSubscriptionManager().addSubscriptionListener(new CustomSubscriptionListener(opcUaClient, callback)); //处理订阅逻辑 handler(opcUaClient, callback); try { //持续监听 downLatch.await(); } catch (Exception e) { log.error("订阅时出现了异常:{}", e.getMessage(), e); } } private void handler(OpcUaClient opcUaClient, SubscriptionCallback callback) { try { //创建订阅 ManagedSubscription subscription = ManagedSubscription.create(opcUaClient, samplingInterval); subscription.setDefaultSamplingInterval(samplingInterval); subscription.setDefaultQueueSize(UInteger.valueOf(10)); List nodeIdList = new ArrayList<>(); for (String identifier : identifiers) { nodeIdList.add(CustomUtil.parseNodeId(identifier)); } List dataItemList = subscription.createDataItems(nodeIdList); for (ManagedDataItem dataItem : dataItemList) { dataItem.addDataValueListener((item) -> { try { callback.onSubscribe (dataItem, item); } catch (Exception e) { e.printStackTrace(); } }); } } catch (Exception e) { log.error("订阅时出现了异常:{}", e.getMessage(), e); } } private class CustomSubscriptionListener implements UaSubscriptionManager.SubscriptionListener { private final OpcUaClient client; private final SubscriptionCallback callback; public CustomSubscriptionListener(OpcUaClient client, SubscriptionCallback callback) { this.client = client; this.callback = callback; } /** * 重连时 尝试恢复之前的订阅失败时 会调用此方法 * * @param uaSubscription 订阅 * @param statusCode 状态 */ @Override public void onSubscriptionTransferFailed(UaSubscription uaSubscription, StatusCode statusCode) { log.debug("恢复订阅失败 需要重新订阅"); //在回调方法中重新订阅 handler(client, callback); } } }