Skip to content

Instantly share code, notes, and snippets.

@kevinherron
Last active August 27, 2019 13:29
Show Gist options
  • Save kevinherron/9e4fc2831332ac37751c57bf7b2dd07d to your computer and use it in GitHub Desktop.
Save kevinherron/9e4fc2831332ac37751c57bf7b2dd07d to your computer and use it in GitHub Desktop.
modified subscription example showing re-creating on transfer failure
/*
* Copyright (c) 2019 the Eclipse Milo Authors
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.milo.examples.client;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscriptionManager;
import org.eclipse.milo.opcua.stack.core.AttributeId;
import org.eclipse.milo.opcua.stack.core.Identifiers;
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
import org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters;
import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.google.common.collect.Lists.newArrayList;
import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
public class SubscriptionExample implements ClientExample {
public static void main(String[] args) throws Exception {
SubscriptionExample example = new SubscriptionExample();
new ClientExampleRunner(example, false).run();
}
private final Logger logger = LoggerFactory.getLogger(getClass());
private final AtomicLong clientHandles = new AtomicLong(1L);
@Override
public void run(OpcUaClient client, CompletableFuture<OpcUaClient> future) throws Exception {
// synchronous connect
client.connect().get();
client.getSubscriptionManager().addSubscriptionListener(new UaSubscriptionManager.SubscriptionListener() {
@Override
public void onSubscriptionTransferFailed(UaSubscription subscription, StatusCode statusCode) {
logger.info("transfer failed, re-creating subscription...");
// re-create a subscription @ 1000ms
try {
UaSubscription newSubscription = client.getSubscriptionManager()
.createSubscription(1000.0)
.get();
// re-create a monitored item
createMonitoredItem(newSubscription);
} catch (InterruptedException | ExecutionException e) {
logger.error("failed to re-create subscription and item!", e);
}
}
});
// create a subscription @ 1000ms
UaSubscription subscription = client.getSubscriptionManager().createSubscription(1000.0).get();
// create a monitored item
createMonitoredItem(subscription);
// let the example run for 5 seconds then terminate
Thread.sleep(60000);
future.complete(client);
}
private void createMonitoredItem(UaSubscription subscription) throws InterruptedException, java.util.concurrent.ExecutionException {
// subscribe to the Value attribute of the server's CurrentTime node
ReadValueId readValueId = new ReadValueId(
Identifiers.Server_ServerStatus_CurrentTime,
AttributeId.Value.uid(), null, QualifiedName.NULL_VALUE
);
// important: client handle must be unique per item
UInteger clientHandle = uint(clientHandles.getAndIncrement());
MonitoringParameters parameters = new MonitoringParameters(
clientHandle,
1000.0, // sampling interval
null, // filter, null means use default
uint(10), // queue size
true // discard oldest
);
MonitoredItemCreateRequest request = new MonitoredItemCreateRequest(
readValueId, MonitoringMode.Reporting, parameters
);
// when creating items in MonitoringMode.Reporting this callback is where each item needs to have its
// value/event consumer hooked up. The alternative is to create the item in sampling mode, hook up the
// consumer after the creation call completes, and then change the mode for all items to reporting.
BiConsumer<UaMonitoredItem, Integer> onItemCreated =
(item, id) -> item.setValueConsumer(this::onSubscriptionValue);
List<UaMonitoredItem> items = subscription.createMonitoredItems(
TimestampsToReturn.Both,
newArrayList(request),
onItemCreated
).get();
for (UaMonitoredItem item : items) {
if (item.getStatusCode().isGood()) {
logger.info("item created for nodeId={}", item.getReadValueId().getNodeId());
} else {
logger.warn(
"failed to create item for nodeId={} (status={})",
item.getReadValueId().getNodeId(), item.getStatusCode());
}
}
}
private void onSubscriptionValue(UaMonitoredItem item, DataValue value) {
logger.info(
"subscription value received: item={}, value={}",
item.getReadValueId().getNodeId(), value.getValue());
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment