Skip to content

Instantly share code, notes, and snippets.

@sjohnr
Last active August 29, 2015 14:02
Show Gist options
  • Save sjohnr/986ad20fcb7690981391 to your computer and use it in GitHub Desktop.
Save sjohnr/986ad20fcb7690981391 to your computer and use it in GitHub Desktop.
GSL Codec
.# codec_java.gsl
.#
.# Generates a codec for a protocol specification
.#
.include "library.gsl"
.resolve_includes()
.global.ClassName = java_class_name($(class.name))
.echo "Generating $(ClassName).java..."
.output "$(ClassName).java"
/* ============================================================================
* $(ClassName).java
*
* Generated codec class for $(ClassName)
* ----------------------------------------------------------------------------
* $(string.trim (class->license.):block )
* ============================================================================
*/
package org.$(switches.package:);
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;
import java.util.HashMap;
import java.nio.ByteBuffer;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Socket;
/**
* $(ClassName) codec.
*
* The specification for this class is as follows:
* <pre class="text">
.for message
. message.name = "$(message.name:c)"
. for field where item() = 1
. for class.header
. for field as hfield
. copy hfield before field
. endfor
. endfor
. else
. for class.header
. for field as hfield
. copy hfield to message
. endfor
. endfor
. endfor
.
* $(NAME) - $(string.trim(.?''):left)
. for field
. if type = "number"
. size ?= 8
* $(name) $(type) $(size)
. if size = 1
. field.ctype = "int"
. elsif size = 2
. field.ctype = "int"
. elsif size = 4
. field.ctype = "long"
. elsif size = 8
. field.ctype = "long"
. else
. echo "E: bad size $(size) for $(name)"
. endif
. elsif type = "octets"
* $(name) $(type) [$(size)]
. else
* $(name) $(type)
. endif
. field.name = "$(field.name:c)"
. for class.field as cfield where cfield.name = field.name
. if cfield.type <> field.type
. echo "E: field '$(name)' must have same type everywhere"
. endif
. else
. copy field to class
. endfor
. endfor
.endfor
* </pre>
*
* @author $(switches.author:)
*/
public class $(ClassName) {
.for define
public static final int $(CLASS.NAME)_$(DEFINE.NAME:C) = $(value);
.endfor
.for message
public static final int $(MESSAGE.NAME) = $(id);
.endfor
.for class.field where type = "octets"
public static final int $(FIELD.NAME)_SIZE = $(size);
.endfor
// Structure of our class
private ZFrame address; // Address of peer if any
private int id; // $(ClassName) message ID
private ByteBuffer needle; // Read/write pointer for serialization
.for class.field
. if type = "number"
private $(ctype) $(name);
. elsif type = "octets"
private byte[] $(name) = new byte[$(size)];
. elsif type = "number"
private long $(name);
. elsif type = "string"
private String $(name);
. elsif type = "strings"
private List<String> $(name);
. elsif type = "dictionary"
private Map<String, String> $(name);
private int $(name)Bytes;
. elsif type = "frame"
private ZFrame $(name);
. endif
.endfor
/**
* Create a new $(ClassName).
*
* @param id The Message ID
*/
public $(ClassName)(int id) {
this.id = id;
}
/**
* Destroy the $(ClassName).
*/
public void destroy() {
// Free class properties
if (address != null)
address.destroy();
address = null;
}
// --------------------------------------------------------------------------
// Network data encoding macros
// Put a 1-byte number to the frame
private final void putNumber1(int value) {
needle.put((byte) value);
}
// Get a 1-byte number to the frame
// then make it unsigned
private int getNumber1() {
int value = needle.get();
if (value < 0)
value =(0xff) & value;
return value;
}
// Put a 2-byte number to the frame
private final void putNumber2(int value) {
needle.putShort((short) value);
}
// Get a 2-byte number to the frame
private int getNumber2() {
int value = needle.getShort();
if (value < 0)
value =(0xffff) & value;
return value;
}
// Put a 4-byte number to the frame
private final void putNumber4(long value) {
needle.putInt((int) value);
}
// Get a 4-byte number to the frame
// then make it unsigned
private long getNumber4() {
long value = needle.getInt();
if (value < 0)
value =(0xffffffff) & value;
return value;
}
// Put a 8-byte number to the frame
public void putNumber8(long value) {
needle.putLong(value);
}
// Get a 8-byte number to the frame
public long getNumber8() {
return needle.getLong();
}
// Put a block to the frame
private void putBlock(byte[] value, int size) {
needle.put(value, 0, size);
}
private byte[] getBlock(int size) {
byte[] value = new byte[size];
needle.get(value);
return value;
}
// Put a string to the frame
public void putString(String value) {
needle.put((byte) value.length());
needle.put(value.getBytes());
}
// Get a string from the frame
public String getString() {
int size = getNumber1();
byte[] value = new byte[size];
needle.get(value);
return new String(value);
}
/**
* Receive and parse a $(ClassName) from the socket. Returns new object or
* null if error. Will block if there's no message waiting.
*
* @param input The socket used to receive this $(ClassName)
*/
public static $(ClassName) recv(Socket input) {
assert (input != null);
$(ClassName) self = new $(ClassName)(0);
ZFrame frame = null;
try {
// Read valid message frame from socket; we loop over any
// garbage data we might receive from badly-connected peers
while (true) {
// If we're reading from a ROUTER socket, get address
if (input.getType() == ZMQ.ROUTER) {
self.address = ZFrame.recvFrame(input);
if (self.address == null)
return null; // Interrupted
if (!input.hasReceiveMore())
throw new IllegalArgumentException();
}
// Read and parse command in frame
frame = ZFrame.recvFrame(input);
if (frame == null)
return null; // Interrupted
// Get and check protocol signature
self.needle = ByteBuffer.wrap(frame.getData());
int signature = self.getNumber2();
if (signature ==(0xAAA0 | $(class.signature)))
break; // Valid signature
// Protocol assertion, drop message
while (input.hasReceiveMore()) {
frame.destroy();
frame = ZFrame.recvFrame(input);
}
frame.destroy();
}
// Get message id, which is first byte in frame
self.id = self.getNumber1();
int listSize;
int hashSize;
switch (self.id) {
.for class.message
case $(MESSAGE.NAME):
. for field
. if type = "number"
self.$(name) = self.getNumber$(size)();
. if defined(field.value)
if (self.$(name) != $(field.value:))
throw new IllegalArgumentException();
. endif
. elsif type = "octets"
self.$(name) = self.getBlock($(size));
. if defined(field.value)
if (self.$(name) != $(field.value:))
throw new IllegalArgumentException();
. endif
. elsif type = "string"
self.$(name) = self.getString();
. if defined(field.value)
if (!self.$(name).equals( "$(field.value:)"))
throw new IllegalArgumentException();
. endif
. elsif type = "strings"
listSize = self.getNumber1();
self.$(name) = new ArrayList<String>();
while (listSize-- > 0) {
String string = self.getString();
self.$(name).add(string);
}
. elsif type = "dictionary"
hashSize = self.getNumber1();
self.$(name) = new HashMap<String, String>();
while (hashSize-- > 0) {
String string = self.getString();
String[] kv = string.split("=");
self.$(name).put(kv[0], kv[1]);
}
. elsif type = "frame"
// Get next frame, leave current untouched
if (!input.hasReceiveMore())
throw new IllegalArgumentException();
self.$(name) = ZFrame.recvFrame(input);
. endif
. endfor
break;
.endfor
default:
throw new IllegalArgumentException();
}
return self;
} catch (Exception e) {
// Error returns
System.out.printf("E: malformed message '%d'\\n", self.id);
self.destroy();
return null;
} finally {
if (frame != null)
frame.destroy();
}
}
.for class.field where type = "dictionary"
// Count size of key=value pair
private static void $(name)Count(final Map.Entry<String, String> entry, $(ClassName) self) {
self.$(name)Bytes += entry.getKey().length() + 1 + entry.getValue().length() + 1;
}
// Serialize $(name) key=value pair
private static void $(name)Write(final Map.Entry<String, String> entry, $(ClassName) self) {
String string = entry.getKey() + "=" + entry.getValue();
self.putString(string);
}
.endfor
/**
* Send the $(ClassName) to the socket, and destroy it.
*
* @param socket The socket used to send this $(ClassName)
*/
public boolean send(Socket socket) {
assert (socket != null);
// Calculate size of serialized data
int frameSize = 2 + 1; // Signature and message ID
switch (id) {
.for class.message
case $(MESSAGE.NAME):
. for field
. if type = "number"
// $(name) is a $(size)-byte integer
frameSize += $(size);
. elsif type = "octets"
// $(name) is a block of $(size) bytes
frameSize += $(size);
. elsif type = "string"
// $(name) is a string with 1-byte length
. if defined(field.value)
frameSize += 1 + "$(field.value:)".length();
. else
frameSize++; // Size is one octet
if ($(name) != null)
frameSize += $(name).length();
. endif
. elsif type = "strings"
// $(name) is an array of strings
frameSize++; // Size is one octet
if ($(name) != null) {
for (String value : $(name))
frameSize += 1 + value.length();
}
. elsif type = "dictionary"
// $(name) is an array of key=value strings
frameSize++; // Size is one octet
if ($(name) != null) {
$(name)Bytes = 0;
for (Map.Entry<String, String> entry: $(name).entrySet()) {
$(name)Count(entry, this);
}
frameSize += $(name)Bytes;
}
. endif
. endfor
break;
.endfor
default:
System.out.printf("E: bad message type '%d', not sent\\n", id);
assert (false);
}
// Now serialize message into the frame
ZFrame frame = new ZFrame(new byte[frameSize]);
needle = ByteBuffer.wrap(frame.getData());
int frameFlags = 0;
putNumber2(0xAAA0 | $(class.signature));
putNumber1((byte) id);
switch (id) {
.for class.message
case $(MESSAGE.NAME):
. for field
. if type = "number"
. if defined(field.value)
putNumber$(size)($(field.value:));
. else
putNumber$(size)($(name));
. endif
. elsif type = "octets"
putBlock($(name), $(size));
. elsif type = "string"
. if defined(field.value)
putString("$(field.value:)");
. else
if ($(name) != null)
putString($(name));
else
putNumber1((byte) 0); // Empty string
. endif
. elsif type = "strings"
if ($(name) != null) {
putNumber1((byte) $(name).size());
for (String value : $(name)) {
putString(value);
}
}
else
putNumber1((byte) 0); // Empty string array
. elsif type = "dictionary"
if ($(name) != null) {
putNumber1((byte) $(name).size());
for (Map.Entry<String, String> entry: $(name).entrySet()) {
$(name)Write(entry, this);
}
}
else
putNumber1((byte) 0); // Empty dictionary
. elsif type = "frame"
frameFlags = ZMQ.SNDMORE;
. endif
. endfor
break;
.endfor
}
// If we're sending to a ROUTER, we send the address first
if (socket.getType() == ZMQ.ROUTER) {
assert (address != null);
if (!address.sendAndDestroy(socket, ZMQ.SNDMORE)) {
destroy();
return false;
}
}
// Now send the data frame
if (!frame.sendAndDestroy(socket, frameFlags)) {
frame.destroy();
destroy();
return false;
}
// Now send any frame fields, in order
switch (id) {
.for class.message where count(field, type = "frame")
case $(MESSAGE.NAME):
. for field where type = "frame"
// If $(name) isn't set, send an empty frame
if ($(field.name) == null)
$(field.name) = new ZFrame("".getBytes());
. if last()
if (!$(field.name).sendAndDestroy(socket, 0)) {
. else
if (!$(field.name).sendAndDestroy(socket, ZMQ.SNDMORE)) {
. endif
frame.destroy();
destroy();
return false;
}
. endfor
break;
.endfor
}
// Destroy $(ClassName) object
destroy();
return true;
}
.for message
/**
* Send the $(message.NAME) to the socket in one step.
*/
public static void send$(Name)(Socket output\
.for field where !defined(value)
,
. if type = "number"
$(ctype) $(name)\
. elsif type = "octets"
byte[] $(name)\
. elsif type = "string"
String $(name)\
. elsif type = "strings"
Collection<String> $(name)\
. elsif type = "dictionary"
Map<String, String> $(name)\
. elsif type = "frame"
ZFrame $(name)\
. endif
.endfor
) {
$(ClassName) self = new $(ClassName)($(ClassName).$(MESSAGE.NAME));
.for field where !defined(value)
. if type = "number" | type = "octets" | type = "string"
self.set$(Name)($(name));
. elsif type = "strings"
self.set$(Name)(new ArrayList<String>($(name)));
. elsif type = "dictionary"
self.set$(Name)(new HashMap<String, String>($(name)));
. elsif type = "frame"
self.set$(Name)($(name).duplicate());
. endif
.endfor
self.send(output);
}
.endfor
/**
* Duplicate the $(ClassName) message.
*
* @param self The instance of $(ClassName) to duplicate
*/
public $(ClassName) dup($(ClassName) self) {
if (self == null)
return null;
$(ClassName) copy = new $(ClassName)(self.id);
if (self.address != null)
copy.address = self.address.duplicate();
switch (self.id) {
.for class.message
case $(MESSAGE.NAME):
. for field
. if type = "number"
copy.$(name) = self.$(name);
. elsif type = "octets"
System.arraycopy(copy.$(name), 0, self.$(name), 0, $(size));
. elsif type = "string"
copy.$(name) = self.$(name);
. elsif type = "strings"
copy.$(name) = new ArrayList<String>(self.$(name));
. elsif type = "dictionary"
copy.$(name) = new HashMap<String, String>(self.$(name));
. elsif type = "frame"
copy.$(name) = self.$(name).duplicate();
. endif
. endfor
break;
.endfor
}
return copy;
}
.for class.field where type = "dictionary"
/**
* Dump $(name) key=value pair to stdout.
*
* @param entry The entry to dump
* @param self The $(ClassName) instance
*/
public static void $(name)Dump(Map.Entry<String, String> entry, $(ClassName) self) {
System.out.printf(" %s=%s\\n", entry.getKey(), entry.getValue());
}
.endfor
/**
* Print contents of message to stdout.
*/
public void dump() {
switch (id) {
.for class.message
case $(MESSAGE.NAME):
System.out.println("$(MESSAGE.NAME):");
. for field
. if type = "number"
. if defined(field.value)
System.out.printf(" $(name)=$(field.value)\\n");
. else
System.out.printf(" $(name)=%d\\n",(long)$(name));
. endif
. elsif type = "octets"
System.out.printf(" $(name)=");
int $(name)Index;
for ($(name)Index = 0; $(name)Index < $(size); $(name)Index++) {
if ($(name)Index != 0 &&($(name)Index % 4 == 0))
System.out.printf("-");
System.out.printf("%02X", $(name)[$(name)Index]);
}
System.out.printf("\\n");
. elsif type = "string"
. if defined(field.value)
System.out.printf(" $(name)=$(field.value)\\n");
. else
if ($(name) != null)
System.out.printf(" $(name)='%s'\\n", $(name));
else
System.out.printf(" $(name)=\\n");
. endif
. elsif type = "strings"
System.out.printf(" $(name)={");
if ($(name) != null) {
for (String value : $(name)) {
System.out.printf(" '%s'", value);
}
}
System.out.printf(" }\\n");
. elsif type = "dictionary"
System.out.printf(" $(name)={\\n");
if ($(name) != null) {
for (Map.Entry<String, String> entry : $(name).entrySet())
$(name)Dump(entry, this);
}
System.out.printf(" }\\n");
. elsif type = "frame"
System.out.printf(" $(name)={\\n");
if ($(name) != null) {
int size = $(name).size();
byte[] data = $(name).getData();
System.out.printf(" size=%d\\n", $(name).size());
if (size > 32)
size = 32;
int $(name)Index;
for ($(name)Index = 0; $(name)Index < size; $(name)Index++) {
if ($(name)Index != 0 &&($(name)Index % 4 == 0))
System.out.printf("-");
System.out.printf("%02X", data[$(name)Index]);
}
}
System.out.printf(" }\\n");
. endif
. endfor
break;
.endfor
}
}
/**
* Get the message address.
*
* @return The message address frame
*/
public ZFrame getAddress() {
return address;
}
/**
* Set the message address.
*
* @param address The new message address
*/
public void setAddress(ZFrame address) {
if (this.address != null)
this.address.destroy();
this.address = address.duplicate();
}
/**
* Get the $(ClassName) id.
*
* @return The $(ClassName) id
*/
public int getId() {
return id;
}
/**
* Set the $(ClassName) id.
*
* @param id The new message id
*/
public void setId(int id) {
this.id = id;
}
.for class.field where !defined(value)
. if type = "number"
/**
* Get the $(name) field.
*
* @return The $(name) field
*/
public $(ctype) get$(Name)() {
return $(name);
}
/**
* Set the $(name) field.
*
* @param $(name) The $(name) field
*/
public void set$(Name)($(ctype) $(name)) {
this.$(name) = $(name);
}
. elsif type = "octets"
/**
* Get the $(name) field.
*
* @return The $(name) field
*/
public byte[] get$(Name)() {
return $(name);
}
/**
* Set the $(name) field.
*
* @param $(name) The $(name) field
*/
public void set$(Name)(byte[] $(name)) {
System.arraycopy($(name), 0, this.$(name), 0, $(size));
}
. elsif type = "string"
/**
* Get the $(name) field.
*
* @return The $(name) field
*/
public String get$(Name)() {
return $(name);
}
/**
* Set the $(name) field.
*
* @param $(name) The $(name) field
*/
public void set$(Name)(String format, Object... args) {
// Format into newly allocated string
$(name) = String.format(format, args);
}
. elsif type = "strings"
/**
* Get the list of $(name) strings.
*
* @return The $(name) strings
*/
public List<String> get$(Name)() {
return $(name);
}
/**
* Iterate through the $(name) field, and append a $(name) value.
*
* @param format The string format
* @param args The arguments used to build the string
*/
public void add$(stem_s(Name))(String format, Object... args) {
// Format into newly allocated string
String string = String.format(format, args);
// Attach string to list
if ($(name) == null)
$(name) = new ArrayList<String>();
$(name).add(string);
}
/**
* Set the list of $(name) strings.
*
* @param value The collection of strings
*/
public void set$(Name)(Collection<String> value) {
$(name) = new ArrayList(value);
}
. elsif type = "dictionary"
/**
* Get the the $(name) dictionary.
*
* @return The $(name) dictionary
*/
public Map<String, String> get$(Name)() {
return $(name);
}
/**
* Get a value in the $(name) dictionary as a string.
*
* @param key The dictionary key
* @param defaultValue the default value if the key does not exist
*/
public String get$(stem_s(Name))String(String key, String defaultValue) {
String value = null;
if ($(name) != null)
value = $(name).get(key);
if (value == null)
value = defaultValue;
return value;
}
/**
* Get a value in the $(name) dictionary as a long.
*
* @param key The dictionary key
* @param defaultValue the default value if the key does not exist
*/
public long get$(stem_s(Name))Number(String key, long defaultValue) {
long value = defaultValue;
String string = null;
if ($(name) != null)
string = $(name).get(key);
if (string != null)
value = Long.valueOf(string);
return value;
}
/**
* Set a value in the $(name) dictionary.
*
* @param key The dictionary key
* @param format The string format
* @param args The arguments used to build the string
*/
public void put$(stem_s(Name))(String key, String format, Object... args) {
// Format string into buffer
String string = String.format(format, args);
// Store string in hash table
if ($(name) == null)
$(name) = new HashMap<String, String>();
$(name).put(key, string);
$(name)Bytes += key.length() + 1 + string.length();
}
/**
* Set the $(name) dictionary.
*
* @param value The new $(name) dictionary
*/
public void set$(Name)(Map<String, String> value) {
if (value != null)
$(name) = new HashMap<String, String>(value);
else
$(name) = value;
}
. elsif type = "frame"
/**
* Get the $(name) field.
*
* @return The $(name) field
*/
public ZFrame get$(Name)() {
return $(name);
}
/**
* Set the $(name) field, and takes ownership of supplied frame.
*
* @param frame The new $(name) frame
*/
public void set$(Name)(ZFrame frame) {
if ($(name) != null)
$(name).destroy();
$(name) = frame;
}
. endif
.endfor
}
.echo "Generating zeromq/zyre/Test$(ClassName).java..."
.output "Test$(ClassName).java"
package org.$(switches.package:);
import static org.junit.Assert.*;
import org.junit.Test;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZFrame;
import org.zeromq.ZContext;
/**
* Test $(ClassName).
*/
public class Test$(ClassName) {
@Test
public void test$(ClassName)() {
System.out.printf(" * $(class.name): ");
// Simple create/destroy test
$(ClassName) self = new $(ClassName)(0);
assert (self != null);
self.destroy();
// Create pair of sockets we can send through
ZContext ctx = new ZContext();
assert (ctx != null);
Socket output = ctx.createSocket(ZMQ.DEALER);
assert (output != null);
output.bind("inproc://selftest");
Socket input = ctx.createSocket(ZMQ.ROUTER);
assert (input != null);
input.connect("inproc://selftest");
// Encode/send/decode and verify each message type
.for class.message
self = new $(ClassName)($(ClassName).$(MESSAGE.NAME));
. for field where !defined(value)
. if type = "number"
self.set$(Name)((byte) 123);
. elsif type = "octets"
byte[] $(name)Data = new byte[$(ClassName).$(FIELD.NAME)_SIZE];
for (int i=0; i < $(ClassName).$(FIELD.NAME)_SIZE; i++)
$(name)Data[i] = 123;
self.set$(Name)($(name)Data);
. elsif type = "string"
self.set$(Name)("Life is short but Now lasts for ever");
. elsif type = "strings"
self.add$(stem_s(Name))("Name: %s", "Brutus");
self.add$(stem_s(Name))("Age: %d", 43);
. elsif type = "dictionary"
self.put$(stem_s(Name))("Name", "Brutus");
self.put$(stem_s(Name))("Age", "%d", 43);
. elsif type = "frame"
self.set$(Name)(new ZFrame("Captcha Diem"));
. endif
. endfor
self.send(output);
self = $(ClassName).recv(input);
assert (self != null);
. for field where !defined(value)
. if type = "number"
assertEquals(self.get$(Name)(), 123);
. elsif type = "octets"
assertEquals(self.get$(Name)()[0], 123);
assertEquals(self.$(name)()[$(ClassName).$(FIELD.NAME)_SIZE - 1], 123);
. elsif type = "string"
assertEquals(self.get$(Name)(), "Life is short but Now lasts for ever");
. elsif type = "strings"
assertEquals(self.get$(Name)().size(), 2);
assertEquals(self.get$(Name)().get(0), "Name: Brutus");
assertEquals(self.get$(Name)().get(1), "Age: 43");
. elsif type = "dictionary"
assertEquals(self.get$(Name)().size(), 2);
assertEquals(self.get$(stem_s(Name))String("Name", "?"), "Brutus");
assertEquals(self.get$(stem_s(Name))Number("Age", 0), 43);
. elsif type = "frame"
assertTrue(self.get$(Name)().streq("Captcha Diem"));
. endif
. endfor
self.destroy();
.endfor
ctx.destroy();
System.out.printf("OK\\n");
}
}
<?xml version="1.0"?>
<deposits script = "interest.gsl" >
<deposit amount = "1000000" rate = "5" years = "20" />
<deposit amount = "500000" rate = "4" years = "10" />
<deposit amount = "2500000" rate = "6" years = "15" />
</deposits>
gsl -q -script:codec_java -package:zyre -author:$USER zre_msg.xml zre_log_msg.xml
echo "hello world"
.template 0
for deposit
year = 1
accumulated = amount
while year < years
accumulated = accumulated * (rate / 100 + 1)
year = year + 1
endwhile
echo "Original amount:" + amount + " becomes: " + accumulated
endfor
.endtemplate
.# Library functions
.#
.function resolve_includes()
for class.include
if defined(include.filename)
my.include_file = class.load_file(filename)
if defined(my.include_file)
move my.include_file after include
else
echo "E: error loading include file: $(filename): $(xml.error?)"
endif
else
echo "E: required attribute 'filename' not defined"
endif
endfor
.endfunction
.function java_class_name(name)
neatName = "$(Name)"
return "$(string.replace(neatName, '_'))"
.endfunction
.function stem_s(name)
my.retval = "$(Name)"
my.last_char = string.substr(my.retval, string.length(my.retval)-1, string.length(my.retval))
if my.last_char = "s"
my.retval = string.substr(my.retval, 0, string.length(my.retval)-2)
endif
return my.retval
.endfunction
<license>
Copyright (c) 1991-2012 iMatix Corporation -- http://www.imatix.com
Copyright other contributors as noted in the AUTHORS file.
This file is part of Zyre, an open-source framework for proximity-based
peer-to-peer applications -- See http://zyre.org.
This is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by the
Free Software Foundation; either version 3 of the License, or (at your
option) any later version.
This software is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTA-
BILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see http://www.gnu.org/licenses/.
</license>
package org.zyre;
import static org.junit.Assert.*;
import org.junit.Test;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZFrame;
import org.zeromq.ZContext;
/**
* Test ZreLogMsg.
*/
public class TestZreLogMsg {
@Test
public void testZreLogMsg() {
System.out.printf(" * zre_log_msg: ");
// Simple create/destroy test
ZreLogMsg self = new ZreLogMsg(0);
assert (self != null);
self.destroy();
// Create pair of sockets we can send through
ZContext ctx = new ZContext();
assert (ctx != null);
Socket output = ctx.createSocket(ZMQ.DEALER);
assert (output != null);
output.bind("inproc://selftest");
Socket input = ctx.createSocket(ZMQ.ROUTER);
assert (input != null);
input.connect("inproc://selftest");
// Encode/send/decode and verify each message type
self = new ZreLogMsg(ZreLogMsg.LOG);
self.setLevel((byte) 123);
self.setEvent((byte) 123);
self.setNode((byte) 123);
self.setPeer((byte) 123);
self.setTime((byte) 123);
self.setData("Life is short but Now lasts for ever");
self.send(output);
self = ZreLogMsg.recv(input);
assert (self != null);
assertEquals(self.getLevel(), 123);
assertEquals(self.getEvent(), 123);
assertEquals(self.getNode(), 123);
assertEquals(self.getPeer(), 123);
assertEquals(self.getTime(), 123);
assertEquals(self.getData(), "Life is short but Now lasts for ever");
self.destroy();
ctx.destroy();
System.out.printf("OK\n");
}
}
package org.zyre;
import static org.junit.Assert.*;
import org.junit.Test;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZFrame;
import org.zeromq.ZContext;
/**
* Test ZreMsg.
*/
public class TestZreMsg {
@Test
public void testZreMsg() {
System.out.printf(" * zre_msg: ");
// Simple create/destroy test
ZreMsg self = new ZreMsg(0);
assert (self != null);
self.destroy();
// Create pair of sockets we can send through
ZContext ctx = new ZContext();
assert (ctx != null);
Socket output = ctx.createSocket(ZMQ.DEALER);
assert (output != null);
output.bind("inproc://selftest");
Socket input = ctx.createSocket(ZMQ.ROUTER);
assert (input != null);
input.connect("inproc://selftest");
// Encode/send/decode and verify each message type
self = new ZreMsg(ZreMsg.HELLO);
self.setSequence((byte) 123);
self.setIpaddress("Life is short but Now lasts for ever");
self.setMailbox((byte) 123);
self.addGroups("Name: %s", "Brutus");
self.addGroups("Age: %d", 43);
self.setStatus((byte) 123);
self.putHeaders("Name", "Brutus");
self.putHeaders("Age", "%d", 43);
self.send(output);
self = ZreMsg.recv(input);
assert (self != null);
assertEquals(self.getSequence(), 123);
assertEquals(self.getIpaddress(), "Life is short but Now lasts for ever");
assertEquals(self.getMailbox(), 123);
assertEquals(self.getGroups().size(), 2);
assertEquals(self.getGroups().get(0), "Name: Brutus");
assertEquals(self.getGroups().get(1), "Age: 43");
assertEquals(self.getStatus(), 123);
assertEquals(self.getHeaders().size(), 2);
assertEquals(self.getHeadersString("Name", "?"), "Brutus");
assertEquals(self.getHeadersNumber("Age", 0), 43);
self.destroy();
self = new ZreMsg(ZreMsg.WHISPER);
self.setSequence((byte) 123);
self.setContent(new ZFrame("Captcha Diem"));
self.send(output);
self = ZreMsg.recv(input);
assert (self != null);
assertEquals(self.getSequence(), 123);
assertTrue(self.getContent().streq("Captcha Diem"));
self.destroy();
self = new ZreMsg(ZreMsg.SHOUT);
self.setSequence((byte) 123);
self.setGroup("Life is short but Now lasts for ever");
self.setContent(new ZFrame("Captcha Diem"));
self.send(output);
self = ZreMsg.recv(input);
assert (self != null);
assertEquals(self.getSequence(), 123);
assertEquals(self.getGroup(), "Life is short but Now lasts for ever");
assertTrue(self.getContent().streq("Captcha Diem"));
self.destroy();
self = new ZreMsg(ZreMsg.JOIN);
self.setSequence((byte) 123);
self.setGroup("Life is short but Now lasts for ever");
self.setStatus((byte) 123);
self.send(output);
self = ZreMsg.recv(input);
assert (self != null);
assertEquals(self.getSequence(), 123);
assertEquals(self.getGroup(), "Life is short but Now lasts for ever");
assertEquals(self.getStatus(), 123);
self.destroy();
self = new ZreMsg(ZreMsg.LEAVE);
self.setSequence((byte) 123);
self.setGroup("Life is short but Now lasts for ever");
self.setStatus((byte) 123);
self.send(output);
self = ZreMsg.recv(input);
assert (self != null);
assertEquals(self.getSequence(), 123);
assertEquals(self.getGroup(), "Life is short but Now lasts for ever");
assertEquals(self.getStatus(), 123);
self.destroy();
self = new ZreMsg(ZreMsg.PING);
self.setSequence((byte) 123);
self.send(output);
self = ZreMsg.recv(input);
assert (self != null);
assertEquals(self.getSequence(), 123);
self.destroy();
self = new ZreMsg(ZreMsg.PING_OK);
self.setSequence((byte) 123);
self.send(output);
self = ZreMsg.recv(input);
assert (self != null);
assertEquals(self.getSequence(), 123);
self.destroy();
ctx.destroy();
System.out.printf("OK\n");
}
}
<class name = "zre_log_msg" signature = "2" title = "work with ZRE logging messages">
This is the ZRE logging protocol raw version.
<include filename = "license.xml" />
<!-- Protocol constants -->
<define name = "VERSION" value = "1" />
<define name = "LEVEL_ERROR" value = "1" />
<define name = "LEVEL_WARNING" value = "2" />
<define name = "LEVEL_INFO" value = "3" />
<define name = "EVENT_JOIN" value = "1" />
<define name = "EVENT_LEAVE" value = "2" />
<define name = "EVENT_ENTER" value = "3" />
<define name = "EVENT_EXIT" value = "4" />
<message name = "LOG" id = "1">
<field name = "level" type = "number" size = "1" />
<field name = "event" type = "number" size = "1" />
<field name = "node" type = "number" size = "2" />
<field name = "peer" type = "number" size = "2" />
<field name = "time" type = "number" size = "8" />
<field name = "data" type = "string" />
Log an event
</message>
</class>
<class name = "zre_msg" signature = "1" title = "work with ZRE messages">
This is the ZRE protocol raw version.
<include filename = "license.xml" />
<!-- Protocol constants -->
<define name = "VERSION" value = "1" />
<!-- Header for all messages -->
<header>
<field name = "sequence" type = "number" size = "2" />
</header>
<message name = "HELLO" id = "1">
<field name = "ipaddress" type = "string" />
<field name = "mailbox" type = "number" size = "2" />
<field name = "groups" type = "strings" />
<field name = "status" type = "number" size = "1" />
<field name = "headers" type = "dictionary" />
Greet a peer so it can connect back to us
</message>
<message name = "WHISPER" id = "2">
<field name = "content" type = "frame" />
Send a message to a peer
</message>
<message name = "SHOUT" id = "3">
<field name = "group" type = "string" />
<field name = "content" type = "frame" />
Send a message to a group
</message>
<message name = "JOIN" id = "4">
<field name = "group" type = "string" />
<field name = "status" type = "number" size = "1" />
Join a group
</message>
<message name = "LEAVE" id = "5">
<field name = "group" type = "string" />
<field name = "status" type = "number" size = "1" />
Leave a group
</message>
<message name = "PING" id = "6">
Ping a peer that has gone silent
</message>
<message name = "PING-OK" id = "7">
Reply to a peer's ping
</message>
</class>
/* ============================================================================
* ZreLogMsg.java
*
* Generated codec class for ZreLogMsg
* ----------------------------------------------------------------------------
* Copyright (c) 1991-2012 iMatix Corporation -- http://www.imatix.com
* Copyright other contributors as noted in the AUTHORS file.
*
* This file is part of Zyre, an open-source framework for proximity-based
* peer-to-peer applications -- See http://zyre.org.
*
* This is free software; you can redistribute it and/or modify it under
* the terms of the GNU Lesser General Public License as published by the
* Free Software Foundation; either version 3 of the License, or (at your
* option) any later version.
*
* This software is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTA-
* BILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
* Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
* ============================================================================
*/
package org.zyre;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;
import java.util.HashMap;
import java.nio.ByteBuffer;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Socket;
/**
* ZreLogMsg codec.
*
* The specification for this class is as follows:
* <pre class="text">
* LOG - Log an event
* level number 1
* event number 1
* node number 2
* peer number 2
* time number 8
* data string
* </pre>
*
* @author sriesenberg
*/
public class ZreLogMsg {
public static final int ZRE_LOG_MSG_VERSION = 1;
public static final int ZRE_LOG_MSG_LEVEL_ERROR = 1;
public static final int ZRE_LOG_MSG_LEVEL_WARNING = 2;
public static final int ZRE_LOG_MSG_LEVEL_INFO = 3;
public static final int ZRE_LOG_MSG_EVENT_JOIN = 1;
public static final int ZRE_LOG_MSG_EVENT_LEAVE = 2;
public static final int ZRE_LOG_MSG_EVENT_ENTER = 3;
public static final int ZRE_LOG_MSG_EVENT_EXIT = 4;
public static final int LOG = 1;
// Structure of our class
private ZFrame address; // Address of peer if any
private int id; // ZreLogMsg message ID
private ByteBuffer needle; // Read/write pointer for serialization
private int level;
private int event;
private int node;
private int peer;
private long time;
private String data;
/**
* Create a new ZreLogMsg.
*
* @param id The Message ID
*/
public ZreLogMsg(int id) {
this.id = id;
}
/**
* Destroy the ZreLogMsg.
*/
public void destroy() {
// Free class properties
if (address != null)
address.destroy();
address = null;
}
// --------------------------------------------------------------------------
// Network data encoding macros
// Put a 1-byte number to the frame
private final void putNumber1(int value) {
needle.put((byte) value);
}
// Get a 1-byte number to the frame
// then make it unsigned
private int getNumber1() {
int value = needle.get();
if (value < 0)
value =(0xff) & value;
return value;
}
// Put a 2-byte number to the frame
private final void putNumber2(int value) {
needle.putShort((short) value);
}
// Get a 2-byte number to the frame
private int getNumber2() {
int value = needle.getShort();
if (value < 0)
value =(0xffff) & value;
return value;
}
// Put a 4-byte number to the frame
private final void putNumber4(long value) {
needle.putInt((int) value);
}
// Get a 4-byte number to the frame
// then make it unsigned
private long getNumber4() {
long value = needle.getInt();
if (value < 0)
value =(0xffffffff) & value;
return value;
}
// Put a 8-byte number to the frame
public void putNumber8(long value) {
needle.putLong(value);
}
// Get a 8-byte number to the frame
public long getNumber8() {
return needle.getLong();
}
// Put a block to the frame
private void putBlock(byte[] value, int size) {
needle.put(value, 0, size);
}
private byte[] getBlock(int size) {
byte[] value = new byte[size];
needle.get(value);
return value;
}
// Put a string to the frame
public void putString(String value) {
needle.put((byte) value.length());
needle.put(value.getBytes());
}
// Get a string from the frame
public String getString() {
int size = getNumber1();
byte[] value = new byte[size];
needle.get(value);
return new String(value);
}
/**
* Receive and parse a ZreLogMsg from the socket. Returns new object or
* null if error. Will block if there's no message waiting.
*
* @param input The socket used to receive this ZreLogMsg
*/
public static ZreLogMsg recv(Socket input) {
assert (input != null);
ZreLogMsg self = new ZreLogMsg(0);
ZFrame frame = null;
try {
// Read valid message frame from socket; we loop over any
// garbage data we might receive from badly-connected peers
while (true) {
// If we're reading from a ROUTER socket, get address
if (input.getType() == ZMQ.ROUTER) {
self.address = ZFrame.recvFrame(input);
if (self.address == null)
return null; // Interrupted
if (!input.hasReceiveMore())
throw new IllegalArgumentException();
}
// Read and parse command in frame
frame = ZFrame.recvFrame(input);
if (frame == null)
return null; // Interrupted
// Get and check protocol signature
self.needle = ByteBuffer.wrap(frame.getData());
int signature = self.getNumber2();
if (signature ==(0xAAA0 | 2))
break; // Valid signature
// Protocol assertion, drop message
while (input.hasReceiveMore()) {
frame.destroy();
frame = ZFrame.recvFrame(input);
}
frame.destroy();
}
// Get message id, which is first byte in frame
self.id = self.getNumber1();
int listSize;
int hashSize;
switch (self.id) {
case LOG:
self.level = self.getNumber1();
self.event = self.getNumber1();
self.node = self.getNumber2();
self.peer = self.getNumber2();
self.time = self.getNumber8();
self.data = self.getString();
break;
default:
throw new IllegalArgumentException();
}
return self;
} catch (Exception e) {
// Error returns
System.out.printf("E: malformed message '%d'\n", self.id);
self.destroy();
return null;
} finally {
if (frame != null)
frame.destroy();
}
}
/**
* Send the ZreLogMsg to the socket, and destroy it.
*
* @param socket The socket used to send this ZreLogMsg
*/
public boolean send(Socket socket) {
assert (socket != null);
// Calculate size of serialized data
int frameSize = 2 + 1; // Signature and message ID
switch (id) {
case LOG:
// level is a 1-byte integer
frameSize += 1;
// event is a 1-byte integer
frameSize += 1;
// node is a 2-byte integer
frameSize += 2;
// peer is a 2-byte integer
frameSize += 2;
// time is a 8-byte integer
frameSize += 8;
// data is a string with 1-byte length
frameSize++; // Size is one octet
if (data != null)
frameSize += data.length();
break;
default:
System.out.printf("E: bad message type '%d', not sent\n", id);
assert (false);
}
// Now serialize message into the frame
ZFrame frame = new ZFrame(new byte[frameSize]);
needle = ByteBuffer.wrap(frame.getData());
int frameFlags = 0;
putNumber2(0xAAA0 | 2);
putNumber1((byte) id);
switch (id) {
case LOG:
putNumber1(level);
putNumber1(event);
putNumber2(node);
putNumber2(peer);
putNumber8(time);
if (data != null)
putString(data);
else
putNumber1((byte) 0); // Empty string
break;
}
// If we're sending to a ROUTER, we send the address first
if (socket.getType() == ZMQ.ROUTER) {
assert (address != null);
if (!address.sendAndDestroy(socket, ZMQ.SNDMORE)) {
destroy();
return false;
}
}
// Now send the data frame
if (!frame.sendAndDestroy(socket, frameFlags)) {
frame.destroy();
destroy();
return false;
}
// Now send any frame fields, in order
switch (id) {
}
// Destroy ZreLogMsg object
destroy();
return true;
}
/**
* Send the LOG to the socket in one step.
*/
public static void sendLog(Socket output,
int level,
int event,
int node,
int peer,
long time,
String data) {
ZreLogMsg self = new ZreLogMsg(ZreLogMsg.LOG);
self.setLevel(level);
self.setEvent(event);
self.setNode(node);
self.setPeer(peer);
self.setTime(time);
self.setData(data);
self.send(output);
}
/**
* Duplicate the ZreLogMsg message.
*
* @param self The instance of ZreLogMsg to duplicate
*/
public ZreLogMsg dup(ZreLogMsg self) {
if (self == null)
return null;
ZreLogMsg copy = new ZreLogMsg(self.id);
if (self.address != null)
copy.address = self.address.duplicate();
switch (self.id) {
case LOG:
copy.level = self.level;
copy.event = self.event;
copy.node = self.node;
copy.peer = self.peer;
copy.time = self.time;
copy.data = self.data;
break;
}
return copy;
}
/**
* Print contents of message to stdout.
*/
public void dump() {
switch (id) {
case LOG:
System.out.println("LOG:");
System.out.printf(" level=%d\n",(long)level);
System.out.printf(" event=%d\n",(long)event);
System.out.printf(" node=%d\n",(long)node);
System.out.printf(" peer=%d\n",(long)peer);
System.out.printf(" time=%d\n",(long)time);
if (data != null)
System.out.printf(" data='%s'\n", data);
else
System.out.printf(" data=\n");
break;
}
}
/**
* Get the message address.
*
* @return The message address frame
*/
public ZFrame getAddress() {
return address;
}
/**
* Set the message address.
*
* @param address The new message address
*/
public void setAddress(ZFrame address) {
if (this.address != null)
this.address.destroy();
this.address = address.duplicate();
}
/**
* Get the ZreLogMsg id.
*
* @return The ZreLogMsg id
*/
public int getId() {
return id;
}
/**
* Set the ZreLogMsg id.
*
* @param id The new message id
*/
public void setId(int id) {
this.id = id;
}
/**
* Get the level field.
*
* @return The level field
*/
public int getLevel() {
return level;
}
/**
* Set the level field.
*
* @param level The level field
*/
public void setLevel(int level) {
this.level = level;
}
/**
* Get the event field.
*
* @return The event field
*/
public int getEvent() {
return event;
}
/**
* Set the event field.
*
* @param event The event field
*/
public void setEvent(int event) {
this.event = event;
}
/**
* Get the node field.
*
* @return The node field
*/
public int getNode() {
return node;
}
/**
* Set the node field.
*
* @param node The node field
*/
public void setNode(int node) {
this.node = node;
}
/**
* Get the peer field.
*
* @return The peer field
*/
public int getPeer() {
return peer;
}
/**
* Set the peer field.
*
* @param peer The peer field
*/
public void setPeer(int peer) {
this.peer = peer;
}
/**
* Get the time field.
*
* @return The time field
*/
public long getTime() {
return time;
}
/**
* Set the time field.
*
* @param time The time field
*/
public void setTime(long time) {
this.time = time;
}
/**
* Get the data field.
*
* @return The data field
*/
public String getData() {
return data;
}
/**
* Set the data field.
*
* @param data The data field
*/
public void setData(String format, Object... args) {
// Format into newly allocated string
data = String.format(format, args);
}
}
/* ============================================================================
* ZreMsg.java
*
* Generated codec class for ZreMsg
* ----------------------------------------------------------------------------
* Copyright (c) 1991-2012 iMatix Corporation -- http://www.imatix.com
* Copyright other contributors as noted in the AUTHORS file.
*
* This file is part of Zyre, an open-source framework for proximity-based
* peer-to-peer applications -- See http://zyre.org.
*
* This is free software; you can redistribute it and/or modify it under
* the terms of the GNU Lesser General Public License as published by the
* Free Software Foundation; either version 3 of the License, or (at your
* option) any later version.
*
* This software is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTA-
* BILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
* Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
* ============================================================================
*/
package org.zyre;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;
import java.util.HashMap;
import java.nio.ByteBuffer;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Socket;
/**
* ZreMsg codec.
*
* The specification for this class is as follows:
* <pre class="text">
* HELLO - Greet a peer so it can connect back to us
* sequence number 2
* ipaddress string
* mailbox number 2
* groups strings
* status number 1
* headers dictionary
* WHISPER - Send a message to a peer
* sequence number 2
* content frame
* SHOUT - Send a message to a group
* sequence number 2
* group string
* content frame
* JOIN - Join a group
* sequence number 2
* group string
* status number 1
* LEAVE - Leave a group
* sequence number 2
* group string
* status number 1
* PING - Ping a peer that has gone silent
* sequence number 2
* PING_OK - Reply to a peer's ping
* sequence number 2
* </pre>
*
* @author sriesenberg
*/
public class ZreMsg {
public static final int ZRE_MSG_VERSION = 1;
public static final int HELLO = 1;
public static final int WHISPER = 2;
public static final int SHOUT = 3;
public static final int JOIN = 4;
public static final int LEAVE = 5;
public static final int PING = 6;
public static final int PING_OK = 7;
// Structure of our class
private ZFrame address; // Address of peer if any
private int id; // ZreMsg message ID
private ByteBuffer needle; // Read/write pointer for serialization
private int sequence;
private String ipaddress;
private int mailbox;
private List<String> groups;
private int status;
private Map<String, String> headers;
private int headersBytes;
private ZFrame content;
private String group;
/**
* Create a new ZreMsg.
*
* @param id The Message ID
*/
public ZreMsg(int id) {
this.id = id;
}
/**
* Destroy the ZreMsg.
*/
public void destroy() {
// Free class properties
if (address != null)
address.destroy();
address = null;
}
// --------------------------------------------------------------------------
// Network data encoding macros
// Put a 1-byte number to the frame
private final void putNumber1(int value) {
needle.put((byte) value);
}
// Get a 1-byte number to the frame
// then make it unsigned
private int getNumber1() {
int value = needle.get();
if (value < 0)
value =(0xff) & value;
return value;
}
// Put a 2-byte number to the frame
private final void putNumber2(int value) {
needle.putShort((short) value);
}
// Get a 2-byte number to the frame
private int getNumber2() {
int value = needle.getShort();
if (value < 0)
value =(0xffff) & value;
return value;
}
// Put a 4-byte number to the frame
private final void putNumber4(long value) {
needle.putInt((int) value);
}
// Get a 4-byte number to the frame
// then make it unsigned
private long getNumber4() {
long value = needle.getInt();
if (value < 0)
value =(0xffffffff) & value;
return value;
}
// Put a 8-byte number to the frame
public void putNumber8(long value) {
needle.putLong(value);
}
// Get a 8-byte number to the frame
public long getNumber8() {
return needle.getLong();
}
// Put a block to the frame
private void putBlock(byte[] value, int size) {
needle.put(value, 0, size);
}
private byte[] getBlock(int size) {
byte[] value = new byte[size];
needle.get(value);
return value;
}
// Put a string to the frame
public void putString(String value) {
needle.put((byte) value.length());
needle.put(value.getBytes());
}
// Get a string from the frame
public String getString() {
int size = getNumber1();
byte[] value = new byte[size];
needle.get(value);
return new String(value);
}
/**
* Receive and parse a ZreMsg from the socket. Returns new object or
* null if error. Will block if there's no message waiting.
*
* @param input The socket used to receive this ZreMsg
*/
public static ZreMsg recv(Socket input) {
assert (input != null);
ZreMsg self = new ZreMsg(0);
ZFrame frame = null;
try {
// Read valid message frame from socket; we loop over any
// garbage data we might receive from badly-connected peers
while (true) {
// If we're reading from a ROUTER socket, get address
if (input.getType() == ZMQ.ROUTER) {
self.address = ZFrame.recvFrame(input);
if (self.address == null)
return null; // Interrupted
if (!input.hasReceiveMore())
throw new IllegalArgumentException();
}
// Read and parse command in frame
frame = ZFrame.recvFrame(input);
if (frame == null)
return null; // Interrupted
// Get and check protocol signature
self.needle = ByteBuffer.wrap(frame.getData());
int signature = self.getNumber2();
if (signature ==(0xAAA0 | 1))
break; // Valid signature
// Protocol assertion, drop message
while (input.hasReceiveMore()) {
frame.destroy();
frame = ZFrame.recvFrame(input);
}
frame.destroy();
}
// Get message id, which is first byte in frame
self.id = self.getNumber1();
int listSize;
int hashSize;
switch (self.id) {
case HELLO:
self.sequence = self.getNumber2();
self.ipaddress = self.getString();
self.mailbox = self.getNumber2();
listSize = self.getNumber1();
self.groups = new ArrayList<String>();
while (listSize-- > 0) {
String string = self.getString();
self.groups.add(string);
}
self.status = self.getNumber1();
hashSize = self.getNumber1();
self.headers = new HashMap<String, String>();
while (hashSize-- > 0) {
String string = self.getString();
String[] kv = string.split("=");
self.headers.put(kv[0], kv[1]);
}
break;
case WHISPER:
self.sequence = self.getNumber2();
// Get next frame, leave current untouched
if (!input.hasReceiveMore())
throw new IllegalArgumentException();
self.content = ZFrame.recvFrame(input);
break;
case SHOUT:
self.sequence = self.getNumber2();
self.group = self.getString();
// Get next frame, leave current untouched
if (!input.hasReceiveMore())
throw new IllegalArgumentException();
self.content = ZFrame.recvFrame(input);
break;
case JOIN:
self.sequence = self.getNumber2();
self.group = self.getString();
self.status = self.getNumber1();
break;
case LEAVE:
self.sequence = self.getNumber2();
self.group = self.getString();
self.status = self.getNumber1();
break;
case PING:
self.sequence = self.getNumber2();
break;
case PING_OK:
self.sequence = self.getNumber2();
break;
default:
throw new IllegalArgumentException();
}
return self;
} catch (Exception e) {
// Error returns
System.out.printf("E: malformed message '%d'\n", self.id);
self.destroy();
return null;
} finally {
if (frame != null)
frame.destroy();
}
}
// Count size of key=value pair
private static void headersCount(final Map.Entry<String, String> entry, ZreMsg self) {
self.headersBytes += entry.getKey().length() + 1 + entry.getValue().length() + 1;
}
// Serialize headers key=value pair
private static void headersWrite(final Map.Entry<String, String> entry, ZreMsg self) {
String string = entry.getKey() + "=" + entry.getValue();
self.putString(string);
}
/**
* Send the ZreMsg to the socket, and destroy it.
*
* @param socket The socket used to send this ZreMsg
*/
public boolean send(Socket socket) {
assert (socket != null);
// Calculate size of serialized data
int frameSize = 2 + 1; // Signature and message ID
switch (id) {
case HELLO:
// sequence is a 2-byte integer
frameSize += 2;
// ipaddress is a string with 1-byte length
frameSize++; // Size is one octet
if (ipaddress != null)
frameSize += ipaddress.length();
// mailbox is a 2-byte integer
frameSize += 2;
// groups is an array of strings
frameSize++; // Size is one octet
if (groups != null) {
for (String value : groups)
frameSize += 1 + value.length();
}
// status is a 1-byte integer
frameSize += 1;
// headers is an array of key=value strings
frameSize++; // Size is one octet
if (headers != null) {
headersBytes = 0;
for (Map.Entry<String, String> entry: headers.entrySet()) {
headersCount(entry, this);
}
frameSize += headersBytes;
}
break;
case WHISPER:
// sequence is a 2-byte integer
frameSize += 2;
break;
case SHOUT:
// sequence is a 2-byte integer
frameSize += 2;
// group is a string with 1-byte length
frameSize++; // Size is one octet
if (group != null)
frameSize += group.length();
break;
case JOIN:
// sequence is a 2-byte integer
frameSize += 2;
// group is a string with 1-byte length
frameSize++; // Size is one octet
if (group != null)
frameSize += group.length();
// status is a 1-byte integer
frameSize += 1;
break;
case LEAVE:
// sequence is a 2-byte integer
frameSize += 2;
// group is a string with 1-byte length
frameSize++; // Size is one octet
if (group != null)
frameSize += group.length();
// status is a 1-byte integer
frameSize += 1;
break;
case PING:
// sequence is a 2-byte integer
frameSize += 2;
break;
case PING_OK:
// sequence is a 2-byte integer
frameSize += 2;
break;
default:
System.out.printf("E: bad message type '%d', not sent\n", id);
assert (false);
}
// Now serialize message into the frame
ZFrame frame = new ZFrame(new byte[frameSize]);
needle = ByteBuffer.wrap(frame.getData());
int frameFlags = 0;
putNumber2(0xAAA0 | 1);
putNumber1((byte) id);
switch (id) {
case HELLO:
putNumber2(sequence);
if (ipaddress != null)
putString(ipaddress);
else
putNumber1((byte) 0); // Empty string
putNumber2(mailbox);
if (groups != null) {
putNumber1((byte) groups.size());
for (String value : groups) {
putString(value);
}
}
else
putNumber1((byte) 0); // Empty string array
putNumber1(status);
if (headers != null) {
putNumber1((byte) headers.size());
for (Map.Entry<String, String> entry: headers.entrySet()) {
headersWrite(entry, this);
}
}
else
putNumber1((byte) 0); // Empty dictionary
break;
case WHISPER:
putNumber2(sequence);
frameFlags = ZMQ.SNDMORE;
break;
case SHOUT:
putNumber2(sequence);
if (group != null)
putString(group);
else
putNumber1((byte) 0); // Empty string
frameFlags = ZMQ.SNDMORE;
break;
case JOIN:
putNumber2(sequence);
if (group != null)
putString(group);
else
putNumber1((byte) 0); // Empty string
putNumber1(status);
break;
case LEAVE:
putNumber2(sequence);
if (group != null)
putString(group);
else
putNumber1((byte) 0); // Empty string
putNumber1(status);
break;
case PING:
putNumber2(sequence);
break;
case PING_OK:
putNumber2(sequence);
break;
}
// If we're sending to a ROUTER, we send the address first
if (socket.getType() == ZMQ.ROUTER) {
assert (address != null);
if (!address.sendAndDestroy(socket, ZMQ.SNDMORE)) {
destroy();
return false;
}
}
// Now send the data frame
if (!frame.sendAndDestroy(socket, frameFlags)) {
frame.destroy();
destroy();
return false;
}
// Now send any frame fields, in order
switch (id) {
case WHISPER:
// If content isn't set, send an empty frame
if (content == null)
content = new ZFrame("".getBytes());
if (!content.sendAndDestroy(socket, 0)) {
frame.destroy();
destroy();
return false;
}
break;
case SHOUT:
// If content isn't set, send an empty frame
if (content == null)
content = new ZFrame("".getBytes());
if (!content.sendAndDestroy(socket, 0)) {
frame.destroy();
destroy();
return false;
}
break;
}
// Destroy ZreMsg object
destroy();
return true;
}
/**
* Send the HELLO to the socket in one step.
*/
public static void sendHello(Socket output,
int sequence,
String ipaddress,
int mailbox,
Collection<String> groups,
int status,
Map<String, String> headers) {
ZreMsg self = new ZreMsg(ZreMsg.HELLO);
self.setSequence(sequence);
self.setIpaddress(ipaddress);
self.setMailbox(mailbox);
self.setGroups(new ArrayList<String>(groups));
self.setStatus(status);
self.setHeaders(new HashMap<String, String>(headers));
self.send(output);
}
/**
* Send the WHISPER to the socket in one step.
*/
public static void sendWhisper(Socket output,
int sequence,
ZFrame content) {
ZreMsg self = new ZreMsg(ZreMsg.WHISPER);
self.setSequence(sequence);
self.setContent(content.duplicate());
self.send(output);
}
/**
* Send the SHOUT to the socket in one step.
*/
public static void sendShout(Socket output,
int sequence,
String group,
ZFrame content) {
ZreMsg self = new ZreMsg(ZreMsg.SHOUT);
self.setSequence(sequence);
self.setGroup(group);
self.setContent(content.duplicate());
self.send(output);
}
/**
* Send the JOIN to the socket in one step.
*/
public static void sendJoin(Socket output,
int sequence,
String group,
int status) {
ZreMsg self = new ZreMsg(ZreMsg.JOIN);
self.setSequence(sequence);
self.setGroup(group);
self.setStatus(status);
self.send(output);
}
/**
* Send the LEAVE to the socket in one step.
*/
public static void sendLeave(Socket output,
int sequence,
String group,
int status) {
ZreMsg self = new ZreMsg(ZreMsg.LEAVE);
self.setSequence(sequence);
self.setGroup(group);
self.setStatus(status);
self.send(output);
}
/**
* Send the PING to the socket in one step.
*/
public static void sendPing(Socket output,
int sequence) {
ZreMsg self = new ZreMsg(ZreMsg.PING);
self.setSequence(sequence);
self.send(output);
}
/**
* Send the PING_OK to the socket in one step.
*/
public static void sendPing_Ok(Socket output,
int sequence) {
ZreMsg self = new ZreMsg(ZreMsg.PING_OK);
self.setSequence(sequence);
self.send(output);
}
/**
* Duplicate the ZreMsg message.
*
* @param self The instance of ZreMsg to duplicate
*/
public ZreMsg dup(ZreMsg self) {
if (self == null)
return null;
ZreMsg copy = new ZreMsg(self.id);
if (self.address != null)
copy.address = self.address.duplicate();
switch (self.id) {
case HELLO:
copy.sequence = self.sequence;
copy.ipaddress = self.ipaddress;
copy.mailbox = self.mailbox;
copy.groups = new ArrayList<String>(self.groups);
copy.status = self.status;
copy.headers = new HashMap<String, String>(self.headers);
break;
case WHISPER:
copy.sequence = self.sequence;
copy.content = self.content.duplicate();
break;
case SHOUT:
copy.sequence = self.sequence;
copy.group = self.group;
copy.content = self.content.duplicate();
break;
case JOIN:
copy.sequence = self.sequence;
copy.group = self.group;
copy.status = self.status;
break;
case LEAVE:
copy.sequence = self.sequence;
copy.group = self.group;
copy.status = self.status;
break;
case PING:
copy.sequence = self.sequence;
break;
case PING_OK:
copy.sequence = self.sequence;
break;
}
return copy;
}
/**
* Dump headers key=value pair to stdout.
*
* @param entry The entry to dump
* @param self The ZreMsg instance
*/
public static void headersDump(Map.Entry<String, String> entry, ZreMsg self) {
System.out.printf(" %s=%s\n", entry.getKey(), entry.getValue());
}
/**
* Print contents of message to stdout.
*/
public void dump() {
switch (id) {
case HELLO:
System.out.println("HELLO:");
System.out.printf(" sequence=%d\n",(long)sequence);
if (ipaddress != null)
System.out.printf(" ipaddress='%s'\n", ipaddress);
else
System.out.printf(" ipaddress=\n");
System.out.printf(" mailbox=%d\n",(long)mailbox);
System.out.printf(" groups={");
if (groups != null) {
for (String value : groups) {
System.out.printf(" '%s'", value);
}
}
System.out.printf(" }\n");
System.out.printf(" status=%d\n",(long)status);
System.out.printf(" headers={\n");
if (headers != null) {
for (Map.Entry<String, String> entry : headers.entrySet())
headersDump(entry, this);
}
System.out.printf(" }\n");
break;
case WHISPER:
System.out.println("WHISPER:");
System.out.printf(" sequence=%d\n",(long)sequence);
System.out.printf(" content={\n");
if (content != null) {
int size = content.size();
byte[] data = content.getData();
System.out.printf(" size=%d\n", content.size());
if (size > 32)
size = 32;
int contentIndex;
for (contentIndex = 0; contentIndex < size; contentIndex++) {
if (contentIndex != 0 &&(contentIndex % 4 == 0))
System.out.printf("-");
System.out.printf("%02X", data[contentIndex]);
}
}
System.out.printf(" }\n");
break;
case SHOUT:
System.out.println("SHOUT:");
System.out.printf(" sequence=%d\n",(long)sequence);
if (group != null)
System.out.printf(" group='%s'\n", group);
else
System.out.printf(" group=\n");
System.out.printf(" content={\n");
if (content != null) {
int size = content.size();
byte[] data = content.getData();
System.out.printf(" size=%d\n", content.size());
if (size > 32)
size = 32;
int contentIndex;
for (contentIndex = 0; contentIndex < size; contentIndex++) {
if (contentIndex != 0 &&(contentIndex % 4 == 0))
System.out.printf("-");
System.out.printf("%02X", data[contentIndex]);
}
}
System.out.printf(" }\n");
break;
case JOIN:
System.out.println("JOIN:");
System.out.printf(" sequence=%d\n",(long)sequence);
if (group != null)
System.out.printf(" group='%s'\n", group);
else
System.out.printf(" group=\n");
System.out.printf(" status=%d\n",(long)status);
break;
case LEAVE:
System.out.println("LEAVE:");
System.out.printf(" sequence=%d\n",(long)sequence);
if (group != null)
System.out.printf(" group='%s'\n", group);
else
System.out.printf(" group=\n");
System.out.printf(" status=%d\n",(long)status);
break;
case PING:
System.out.println("PING:");
System.out.printf(" sequence=%d\n",(long)sequence);
break;
case PING_OK:
System.out.println("PING_OK:");
System.out.printf(" sequence=%d\n",(long)sequence);
break;
}
}
/**
* Get the message address.
*
* @return The message address frame
*/
public ZFrame getAddress() {
return address;
}
/**
* Set the message address.
*
* @param address The new message address
*/
public void setAddress(ZFrame address) {
if (this.address != null)
this.address.destroy();
this.address = address.duplicate();
}
/**
* Get the ZreMsg id.
*
* @return The ZreMsg id
*/
public int getId() {
return id;
}
/**
* Set the ZreMsg id.
*
* @param id The new message id
*/
public void setId(int id) {
this.id = id;
}
/**
* Get the sequence field.
*
* @return The sequence field
*/
public int getSequence() {
return sequence;
}
/**
* Set the sequence field.
*
* @param sequence The sequence field
*/
public void setSequence(int sequence) {
this.sequence = sequence;
}
/**
* Get the ipaddress field.
*
* @return The ipaddress field
*/
public String getIpaddress() {
return ipaddress;
}
/**
* Set the ipaddress field.
*
* @param ipaddress The ipaddress field
*/
public void setIpaddress(String format, Object... args) {
// Format into newly allocated string
ipaddress = String.format(format, args);
}
/**
* Get the mailbox field.
*
* @return The mailbox field
*/
public int getMailbox() {
return mailbox;
}
/**
* Set the mailbox field.
*
* @param mailbox The mailbox field
*/
public void setMailbox(int mailbox) {
this.mailbox = mailbox;
}
/**
* Get the list of groups strings.
*
* @return The groups strings
*/
public List<String> getGroups() {
return groups;
}
/**
* Iterate through the groups field, and append a groups value.
*
* @param format The string format
* @param args The arguments used to build the string
*/
public void addGroup(String format, Object... args) {
// Format into newly allocated string
String string = String.format(format, args);
// Attach string to list
if (groups == null)
groups = new ArrayList<String>();
groups.add(string);
}
/**
* Set the list of groups strings.
*
* @param value The collection of strings
*/
public void setGroups(Collection<String> value) {
groups = new ArrayList(value);
}
/**
* Get the status field.
*
* @return The status field
*/
public int getStatus() {
return status;
}
/**
* Set the status field.
*
* @param status The status field
*/
public void setStatus(int status) {
this.status = status;
}
/**
* Get the the headers dictionary.
*
* @return The headers dictionary
*/
public Map<String, String> getHeaders() {
return headers;
}
/**
* Get a value in the headers dictionary as a string.
*
* @param key The dictionary key
* @param defaultValue the default value if the key does not exist
*/
public String getHeaderString(String key, String defaultValue) {
String value = null;
if (headers != null)
value = headers.get(key);
if (value == null)
value = defaultValue;
return value;
}
/**
* Get a value in the headers dictionary as a long.
*
* @param key The dictionary key
* @param defaultValue the default value if the key does not exist
*/
public long getHeaderNumber(String key, long defaultValue) {
long value = defaultValue;
String string = null;
if (headers != null)
string = headers.get(key);
if (string != null)
value = Long.valueOf(string);
return value;
}
/**
* Set a value in the headers dictionary.
*
* @param key The dictionary key
* @param format The string format
* @param args The arguments used to build the string
*/
public void putHeader(String key, String format, Object... args) {
// Format string into buffer
String string = String.format(format, args);
// Store string in hash table
if (headers == null)
headers = new HashMap<String, String>();
headers.put(key, string);
headersBytes += key.length() + 1 + string.length();
}
/**
* Set the headers dictionary.
*
* @param value The new headers dictionary
*/
public void setHeaders(Map<String, String> value) {
if (value != null)
headers = new HashMap<String, String>(value);
else
headers = value;
}
/**
* Get the content field.
*
* @return The content field
*/
public ZFrame getContent() {
return content;
}
/**
* Set the content field, and takes ownership of supplied frame.
*
* @param frame The new content frame
*/
public void setContent(ZFrame frame) {
if (content != null)
content.destroy();
content = frame;
}
/**
* Get the group field.
*
* @return The group field
*/
public String getGroup() {
return group;
}
/**
* Set the group field.
*
* @param group The group field
*/
public void setGroup(String format, Object... args) {
// Format into newly allocated string
group = String.format(format, args);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment