Skip to content

Instantly share code, notes, and snippets.

@garyrussell
Last active December 1, 2015 14:07
Show Gist options
  • Save garyrussell/fa29537d30b037e24fea to your computer and use it in GitHub Desktop.
Save garyrussell/fa29537d30b037e24fea to your computer and use it in GitHub Desktop.
Workaround for unwanted MQTT unsubscribe: proxy the client
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.foo;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.aop.framework.ProxyFactoryBean;
/**
 * A Paho client factory whose async clients silently ignore {@code unsubscribe} calls.
 *
 * <p>Every {@link MqttAsyncClient} obtained from the superclass is wrapped in a class-based
 * proxy. Any method named {@code unsubscribe} is intercepted: a warning is logged, the real
 * client is not invoked, and a shared stand-in {@link IMqttToken} is returned. All other
 * methods are passed through to the real client unchanged.
 *
 * @author Gary Russell
 * @since 4.3
 *
 */
public class IgnoreUnsubscribePahoClientFactory extends DefaultMqttPahoClientFactory {

    private static final Log logger = LogFactory.getLog(IgnoreUnsubscribePahoClientFactory.class);

    /** Shared stand-in token returned for every skipped unsubscribe; created on first use. */
    private IMqttToken alwaysComplete;

    /**
     * Obtains an async client from the superclass and returns it wrapped in the
     * unsubscribe-ignoring proxy.
     *
     * @param uri the server URI, passed through to the superclass
     * @param clientId the client id, passed through to the superclass
     * @return a proxy around the client created by the superclass
     * @throws MqttException if the superclass fails to create the client
     */
    @Override
    public MqttAsyncClient getAsyncClientInstance(String uri, String clientId) throws MqttException {
        MqttAsyncClient asyncClientInstance = super.getAsyncClientInstance(uri, clientId);
        buildTokenIfNeeded();
        return proxy(asyncClientInstance);
    }

    /**
     * Wraps the given client in a proxy that short-circuits every method named
     * {@code unsubscribe} and delegates everything else to the target.
     *
     * @param asyncClientInstance the real client to wrap
     * @return the proxied client
     */
    private MqttAsyncClient proxy(MqttAsyncClient asyncClientInstance) {
        ProxyFactoryBean pfb = new ProxyFactoryBean();
        pfb.addAdvice(new MethodInterceptor() {

            @Override
            public Object invoke(MethodInvocation invocation) throws Throwable {
                if ("unsubscribe".equals(invocation.getMethod().getName())) {
                    logger.warn("Skipping unsubscribe");
                    return alwaysComplete;
                }
                return invocation.proceed();
            }

        });
        // MqttAsyncClient is a concrete class and must be returned as such,
        // so a subclass proxy is required rather than an interface proxy.
        pfb.setProxyTargetClass(true);
        pfb.setTarget(asyncClientInstance);
        return (MqttAsyncClient) pfb.getObject();
    }

    /**
     * Lazily creates the shared stand-in token. The token has no target: the advice answers
     * every call itself. {@code isComplete()} reports {@code true}; other methods with a
     * primitive return type yield that type's zero value; everything else yields {@code null}.
     */
    private void buildTokenIfNeeded() {
        if (this.alwaysComplete != null) {
            return;
        }
        ProxyFactoryBean pfb = new ProxyFactoryBean();
        pfb.addAdvice(new MethodInterceptor() {

            @Override
            public Object invoke(MethodInvocation invocation) throws Throwable {
                if ("isComplete".equals(invocation.getMethod().getName())) {
                    return Boolean.TRUE;
                }
                // Returning null for a primitive return type is rejected by the AOP proxy
                // (AopInvocationException), so substitute the primitive's zero value.
                return primitiveDefault(invocation.getMethod().getReturnType());
            }

        });
        pfb.setInterfaces(IMqttToken.class);
        this.alwaysComplete = (IMqttToken) pfb.getObject();
    }

    /**
     * Returns the boxed zero value for a primitive type, or {@code null} for
     * {@code void} and reference types.
     *
     * @param type the declared return type of the invoked method
     * @return a value that is safe to return from advice for that return type
     */
    private static Object primitiveDefault(Class<?> type) {
        if (type == boolean.class) {
            return Boolean.FALSE;
        }
        if (type == int.class) {
            return Integer.valueOf(0);
        }
        if (type == long.class) {
            return Long.valueOf(0L);
        }
        if (type == short.class) {
            return Short.valueOf((short) 0);
        }
        if (type == byte.class) {
            return Byte.valueOf((byte) 0);
        }
        if (type == char.class) {
            return Character.valueOf('\0');
        }
        if (type == float.class) {
            return Float.valueOf(0f);
        }
        if (type == double.class) {
            return Double.valueOf(0d);
        }
        return null;
    }

}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment