Skip to content

Instantly share code, notes, and snippets.

@krhoyt
Created October 20, 2014 20:18
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save krhoyt/c3894dc63ff4a73ef6b6 to your computer and use it in GitHub Desktop.
Save krhoyt/c3894dc63ff4a73ef6b6 to your computer and use it in GitHub Desktop.
Custom rolled STOMP client for Arduino Yun. Should work with Ethernet client as well. Shows both publishing and subscribing.
// Libraries
#include <Bridge.h>
#include <Console.h>
#include <YunClient.h>
// Connectivity parameters
#define ENDPOINT "kaazing.kevinhoyt.com"
#define LOGIN " "
#define PASSCODE " "
#define PORT 61613
#define TOPIC_IN "/topic/iota.led"
#define TOPIC_OUT "/topic/iota.photocell"
// Pin constants for LED
const int RED = 11;
const int GREEN = 10;
const int BLUE = 9;
// Constants for photocell
const int PHOTOCELL = 0;
// Connectivity
String response;
String session;
YunClient client;
// Light reading
// Decouple from loop
int light = 0;
// Decoupling delay
int clock = 100;
unsigned long start = 0;
// Setup
void setup()
{
// Pin modes for LED
pinMode( RED, OUTPUT );
pinMode( GREEN, OUTPUT );
pinMode( BLUE, OUTPUT );
// Pin mode for photocell
pinMode( PHOTOCELL, INPUT );
// Yun network connectivity
Bridge.begin();
// Debugging
// Console.begin();
// Wait for bridge
// while( !Console );
// Start client
if( connect() )
{
subscribe( TOPIC_IN );
}
}
// Loop
void loop()
{
unsigned long now;
// Process STOMP data
stomp();
// Get analog pin reading
light = analogRead( PHOTOCELL );
if( clock != 0 )
{
// Milliseconds since boot
now = millis();
// Duration passed to send serial
// Used versus delay function
// Pausing would block serial input
if( ( now - start ) > clock )
{
// Reset timer
start = now;
// Debug
// Console.print( "Slow: " );
// Console.println( light );
// Publish to clients
publish( TOPIC_OUT, String( light ) );
}
} else {
// Debug
// Console.print( "Fast: " );
// Console.println( light );
// Publish to clients
publish( TOPIC_OUT, String( light ) );
}
}
// Callback when messages arrive
void callback()
{
// Hold color values
int blue = 0;
int green = 0;
int red = 0;
// Frame data
String frame;
String message;
// Get header
frame = getValue( response, 0, "\n" );
// Receipt messages
if( frame == "RECEIPT" )
{
// Console.print( "Receipt: " );
// Console.println( getHeader( response, "receipt-id" ) );
} else if( frame == "MESSAGE" ) {
// Message arrived
message = getValue( response, 1, "\n\n" );
// Debug
// Console.print( "Message: " );
// Console.println( message );
if( message.indexOf( "," ) > 0 )
{
// Parse RGB values
red = getValue( message, 0, "," ).toInt();
green = getValue( message, 1, "," ).toInt();
blue = getValue( message, 2, "," ).toInt();
// Send the values to their respective pins
analogWrite( RED, red );
analogWrite( GREEN, green );
analogWrite( BLUE, blue );
} else {
if( message.indexOf( "d" ) == 0 )
{
// Console.print( "New clock: " );
// Console.println( message );
clock = message.substring( 1 ).toInt();
} else {
if( message == "0" )
{
analogWrite( RED, 0 );
analogWrite( GREEN, 0 );
analogWrite( BLUE, 0 );
} else {
analogWrite( RED, 255 );
analogWrite( GREEN, 255 );
analogWrite( BLUE, 255 );
}
}
}
}
}
/**
* STOMP implementation
**/
// Connect to broker
boolean connect()
{
int result = 0;
// Not already connected
if( !connected() )
{
// Connect to broker
result = client.connect( ENDPOINT, PORT );
// Negotiate protocol
if( result )
{
client.println( "CONNECT" );
client.println( "accept-version:1.0" );
client.print( "host:" );
client.println( ENDPOINT );
client.print( "login:" );
client.println( LOGIN );
client.print( "passcode:" );
client.println( PASSCODE );
client.println();
client.write( 0x0 );
// Wait for acknowledge
while( !client.available() ) {;}
// Read resulting frame
readFrame();
// Acknowledge handshake
if( getValue( response, 0, "\n" ) == "CONNECTED" )
{
// Store session
session = getHeader( response, "session" );
// Notify of progress
// Console.print( "Connected (" );
// Console.print( session );
// Console.println( ")." );
// Return connected
return true;
}
}
}
// Fail
return false;
}
// Check to see if connected
boolean connected()
{
boolean result;
if( client == NULL )
{
result = false;
} else {
result = ( int )client.connected();
}
return result;
}
// Count the number of parts in a string
// Used to help replace lack of split
int count( String content, String delimeter )
{
int count = 0;
int end;
int start = 0;
// Count occurances of delimeter
do {
end = content.indexOf( delimeter, start );
start = end + 1;
count = count + 1;
} while( end > 0 );
// Return occurance count
return count;
}
// Reads response for specific header
// Extracts and returns header value
String getHeader( String content, String header )
{
int parts;
int start;
String line;
String prefix;
String result;
// How many lines in response
parts = count( content, "\n" );
// Start on line after frame line
// Look for header prefix match
for( int p = 1; p < parts; p++ )
{
// Header line
// Split into parts
line = getValue( content, p, "\n" );
prefix = getValue( line, 0, ":" );
// If prefix matches
if( prefix == header )
{
// Get value for header
start = line.indexOf( ":" ) + 1;
result = line.substring( start );
break;
}
}
// Return result
return result;
}
// Get a specific section of a string
// Based on delimeters
// Used to replace lack of split
String getValue( String content, int part, String delimeter )
{
int end;
int start = 0;
String result;
// Iterate past unwanted values
for( int count = 0; count < part; count++ )
{
end = content.indexOf( delimeter, start );
start = end + delimeter.length();
}
// Get next occurance of delimeter
// May return -1 if not found
end = content.indexOf( delimeter, start );
// If no more occurances
if( end == -1 )
{
// Must be last value in content
// Parse out remainder
result = content.substring( start );
} else {
// Otherwise parse out segment of content
result = content.substring( start, end );
}
// Clean off white space
result.trim();
// Return resulting content
return result;
}
boolean publish( String topic, String message )
{
if( !connected() )
{
return false;
}
// Debug
// Console.print( "Going to send: " );
// Console.println( message );
// Send the message
client.println( "SEND" );
client.print( "destination:" );
client.println( topic );
client.println();
client.print( message );
client.write( 0x0 );
return true;
}
void readFrame()
{
// Read response
response = client.readStringUntil( 0x0 );
// Clear off last NULL byte
client.read();
}
boolean stomp()
{
// Connected to broker
if( connected() )
{
// Bytes available
if( client.available() )
{
// Read incoming frame
readFrame();
// If response is present
if( response.length() > 0 )
{
callback();
}
}
}
}
void subscribe( String topic )
{
if( connected() )
{
client.println( "SUBSCRIBE" );
client.print( "id:" );
client.println( session );
client.print( "destination:" );
client.println( topic );
client.println( "receipt:subscribed" );
client.println();
client.write( 0x0 );
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment