Skip to content

Instantly share code, notes, and snippets.

@kimutansk
Created October 29, 2015 02:34
Show Gist options
  • Save kimutansk/f656a772ad8b369f0964 to your computer and use it in GitHub Desktop.
Save kimutansk/f656a772ad8b369f0964 to your computer and use it in GitHub Desktop.
AWS IoTにJVM系言語(Scala)から接続するには(Subscribe) ref: http://qiita.com/kimutansk/items/37991e59f3cc9c4fd3fa
# mosquitto_pub --cafile rootCA.pem --cert cert.pem --key private.pem -h ******.iot.ap-northeast-1.amazonaws.com -p 8883 -q 1 -d -t test-topic -i mosquitto-publisher -m TestPublishMessage
Client mosquitto-publisher sending CONNECT
Client mosquitto-publisher received CONNACK
Client mosquitto-publisher sending PUBLISH (d0, q1, r0, m1, 'test-topic', ... (18 bytes))
Client mosquitto-publisher received PUBACK (Mid: 1)
Client mosquitto-publisher sending DISCONNECT
(起動コマンド)
Message received. : Topic=test-topic, Payload=TestPublishMessage
import org.eclipse.paho.client.mqttv3.{MqttClient, MqttConnectOptions}
/** MQTT Subscribe Test Class */
object MqttSubscriber {
def main(args: Array[String]) {
// Connect Target
val brokerURI:String = "ssl://******.iot.ap-northeast-1.amazonaws.com:8883"
// SocketFactoryGenerate
val socketFactory = SocketFactoryGenerator.generateFromFilePath("/etc/cert/rootCA.pem", "/etc/cert/cert.pem", "/etc/cert/private.pem")
// MQTT Client generate
val client:MqttClient = new MqttClient(brokerURI, "mqtt-subscriber")
client.setCallback(new SubscribeMqttCallback)
val options:MqttConnectOptions = new MqttConnectOptions()
options.setSocketFactory(socketFactory)
client.connect(options)
client.subscribe("test-topic")
Thread.sleep(60000)
}
}
import java.io.{ByteArrayInputStream, InputStream, InputStreamReader}
import java.nio.file.{Files, Paths}
import java.security.cert.{CertificateFactory, X509Certificate}
import java.security.{KeyPair, KeyStore, Security}
import javax.net.ssl.{KeyManagerFactory, SSLContext, SSLSocketFactory, TrustManagerFactory}
import org.bouncycastle.cert.X509CertificateHolder
import org.bouncycastle.jce.provider.BouncyCastleProvider
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter
import org.bouncycastle.openssl.{PEMKeyPair, PEMParser}
/** Factory for [[javax.net.ssl.SSLSocketFactory]] instances. */
object SocketFactoryGenerator {
/**
* Generate [[javax.net.ssl.SSLSocketFactory]] from pem file paths.
*
* @param rootCaFilePath Root CA file path
* @param certFilePath Certificate file path
* @param keyFilePath Private key file path
* @return Generated [[javax.net.ssl.SSLSocketFactory]]
*/
def generateFromFilePath(rootCaFilePath:String, certFilePath:String, keyFilePath:String):SSLSocketFactory = {
Security.addProvider(new BouncyCastleProvider())
// load Root CA certificate
val rootCaParser:PEMParser = new PEMParser(new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(rootCaFilePath)))))
val rootCaCertHolder:X509CertificateHolder = rootCaParser.readObject().asInstanceOf[X509CertificateHolder]
val rootCaCert:X509Certificate = convertToJavaCertificate(rootCaCertHolder)
rootCaParser.close()
// load Server certificate
val certParser:PEMParser = new PEMParser(new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(certFilePath)))))
val serverCertHolder:X509CertificateHolder = certParser.readObject.asInstanceOf[X509CertificateHolder]
val serverCert:X509Certificate = convertToJavaCertificate(serverCertHolder)
certParser.close()
// load Private Key
val keyParser:PEMParser = new PEMParser(new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(keyFilePath)))))
val pemKeyPair:PEMKeyPair = keyParser.readObject.asInstanceOf[PEMKeyPair]
val keyPair:KeyPair = new JcaPEMKeyConverter().getKeyPair(pemKeyPair)
keyParser.close()
// Root CA certificate is used to authenticate server
val rootCAKeyStore:KeyStore = KeyStore.getInstance(KeyStore.getDefaultType())
rootCAKeyStore.load(null, null)
rootCAKeyStore.setCertificateEntry("ca-certificate", convertToJavaCertificate(rootCaCertHolder))
val tmf:TrustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm())
tmf.init(rootCAKeyStore);
// client key and certificates are sent to server so it can authenticate us
val ks:KeyStore = KeyStore.getInstance(KeyStore.getDefaultType())
ks.load(null, null)
ks.setCertificateEntry("certificate", serverCert)
ks.setKeyEntry("private-key", keyPair.getPrivate(), "DummyPassword".toCharArray, Array(serverCert))
val kmf:KeyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())
kmf.init(ks, "DummyPassword".toCharArray());
// finally, create SSL socket factory
val context:SSLContext = SSLContext.getInstance("TLSv1.2")
context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null)
context.getSocketFactory()
}
def convertToJavaCertificate(certificateHolder:X509CertificateHolder):X509Certificate = {
val is:InputStream = new ByteArrayInputStream(certificateHolder.toASN1Structure.getEncoded);
try {
CertificateFactory.getInstance("X.509").generateCertificate(is).asInstanceOf[X509Certificate]
} finally is.close()
}
}
import org.eclipse.paho.client.mqttv3.{IMqttDeliveryToken, MqttCallback, MqttMessage}
/** Subscribe MqttCallBack */
class SubscribeMqttCallback extends MqttCallback{
override def deliveryComplete(iMqttDeliveryToken: IMqttDeliveryToken): Unit = ???
override def messageArrived(s: String, mqttMessage: MqttMessage): Unit = {
System.out.println("Message received. : Topic=" + s + ", Payload=" + mqttMessage.toString)
}
override def connectionLost(throwable: Throwable): Unit = ???
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment