Skip to content

Instantly share code, notes, and snippets.

@majek
Created March 21, 2012 14:48
Show Gist options
  • Save majek/2147792 to your computer and use it in GitHub Desktop.
Save majek/2147792 to your computer and use it in GitHub Desktop.
spring-integration sockjs vs master
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractClientConnectionFactory.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractClientConnectionFactory.java
index 7b4252f..87e462d 100644
--- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractClientConnectionFactory.java
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractClientConnectionFactory.java
@@ -57,6 +57,20 @@ public abstract class AbstractClientConnectionFactory extends AbstractConnection
}
}
+ /**
+ * Get a dedicated long-lived connection - regardless of single-use
+ * setting. Caller is responsible to close.
+ * @return
+ * @throws Exception
+ */
+ public abstract TcpConnection getNewConnection() throws Exception;
+
+
+ /**
+ * Gets the single or new connection.
+ * @return
+ * @throws Exception
+ */
protected abstract TcpConnection getOrMakeConnection() throws Exception;
/**
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractTcpConnectionInterceptor.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractTcpConnectionInterceptor.java
index fa22449..4c1cf52 100644
--- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractTcpConnectionInterceptor.java
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractTcpConnectionInterceptor.java
@@ -16,6 +16,9 @@
package org.springframework.integration.ip.tcp.connection;
+import java.io.IOException;
+import java.io.InputStream;
+
import org.springframework.core.serializer.Deserializer;
import org.springframework.core.serializer.Serializer;
import org.springframework.integration.Message;
@@ -177,4 +180,8 @@ public abstract class AbstractTcpConnectionInterceptor implements TcpConnectionI
return this.realSender;
}
+ public InputStream getInputStream() throws IOException {
+ return theConnection.getInputStream();
+ }
+
}
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnection.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnection.java
index ebc9f79..34891c1 100644
--- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnection.java
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnection.java
@@ -16,6 +16,8 @@
package org.springframework.integration.ip.tcp.connection;
+import java.io.IOException;
+import java.io.InputStream;
import java.net.Socket;
import java.nio.channels.SocketChannel;
@@ -148,5 +150,10 @@ public interface TcpConnection extends Runnable {
* @return the next sequence number for a message received on this socket
*/
long incrementAndGetConnectionSequence();
-
+
+ /**
+ * @return the Inputstream
+ * @throws IOException
+ */
+ InputStream getInputStream() throws IOException;
}
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNetClientConnectionFactory.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNetClientConnectionFactory.java
index 66ea8ec..9c825c9 100644
--- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNetClientConnectionFactory.java
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNetClientConnectionFactory.java
@@ -50,6 +50,11 @@ public class TcpNetClientConnectionFactory extends
if (theConnection != null && theConnection.isOpen()) {
return theConnection;
}
+ return getNewConnection();
+ }
+
+ @Override
+ public TcpConnection getNewConnection() throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("Opening new socket connection to " + this.getHost() + ":" + this.getPort());
}
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNetConnection.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNetConnection.java
index e2a877e..0ea7b7a 100644
--- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNetConnection.java
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNetConnection.java
@@ -16,6 +16,8 @@
package org.springframework.integration.ip.tcp.connection;
+import java.io.IOException;
+import java.io.InputStream;
import java.net.Socket;
import java.net.SocketTimeoutException;
@@ -132,6 +134,7 @@ public class TcpNetConnection extends AbstractTcpConnection {
logger.debug("Message received " + message);
}
try {
+ listener = this.getListener();
if (listener == null) {
logger.warn("Unexpected message - no inbound adapter registered with connection " + message);
continue;
@@ -163,4 +166,11 @@ public class TcpNetConnection extends AbstractTcpConnection {
}
}
+ public InputStream getInputStream() throws IOException {
+ if (this.socket != null) {
+ return socket.getInputStream();
+ }
+ return null;
+ }
+
}
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNioClientConnectionFactory.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNioClientConnectionFactory.java
index ede4db7..2b91e48 100644
--- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNioClientConnectionFactory.java
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNioClientConnectionFactory.java
@@ -64,6 +64,15 @@ public class TcpNioClientConnectionFactory extends
* @throws SocketException
*/
protected TcpConnection getOrMakeConnection() throws Exception {
+ TcpConnection theConnection = this.getTheConnection();
+ if (theConnection != null && theConnection.isOpen()) {
+ return theConnection;
+ }
+ return getNewConnection();
+ }
+
+ @Override
+ public TcpConnection getNewConnection() throws Exception {
int n = 0;
while (this.selector == null) {
try {
@@ -75,10 +84,6 @@ public class TcpNioClientConnectionFactory extends
throw new Exception("Factory failed to start");
}
}
- TcpConnection theConnection = this.getTheConnection();
- if (theConnection != null && theConnection.isOpen()) {
- return theConnection;
- }
if (logger.isDebugEnabled()) {
logger.debug("Opening new socket channel connection to " + this.getHost() + ":" + this.getPort());
}
@@ -94,8 +99,8 @@ public class TcpNioClientConnectionFactory extends
connection.setLastRead(System.currentTimeMillis());
}
this.connections.put(socketChannel, connection);
- newChannels.add(socketChannel);
- selector.wakeup();
+ this.newChannels.add(socketChannel);
+ this.selector.wakeup();
return wrappedConnection;
}
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNioConnection.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNioConnection.java
index fe1b94a..d50c5ba 100644
--- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNioConnection.java
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNioConnection.java
@@ -17,6 +17,7 @@
package org.springframework.integration.ip.tcp.connection;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
@@ -358,6 +359,10 @@ public class TcpNioConnection extends AbstractTcpConnection {
this.lastRead = lastRead;
}
+ public InputStream getInputStream() throws IOException {
+ return pipedInputStream;
+ }
+
/**
* OutputStream to wrap a SocketChannel; implements timeout on write.
*
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/AbstractByteArraySerializer.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/AbstractByteArraySerializer.java
index f630962..83bdcbb 100644
--- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/AbstractByteArraySerializer.java
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/AbstractByteArraySerializer.java
@@ -64,4 +64,20 @@ public abstract class AbstractByteArraySerializer implements
}
}
+ /**
+ * Copy size bytes to a new buffer exactly size bytes long.
+ * @param buffer The buffer containing the data.
+ * @param size The number of bytes to copy.
+ * @return The new buffer, or the buffer parameter if it is
+ * already the correct size.
+ */
+ protected byte[] copyToSizedArray(byte[] buffer, int size) {
+ if (size == buffer.length) {
+ return buffer;
+ }
+ byte[] assembledData = new byte[size];
+ System.arraycopy(buffer, 0, assembledData, 0, size);
+ return assembledData;
+ }
+
}
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/ByteArrayCrLfSerializer.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/ByteArrayCrLfSerializer.java
index 596e2ef..8b015ca 100644
--- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/ByteArrayCrLfSerializer.java
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/ByteArrayCrLfSerializer.java
@@ -38,6 +38,13 @@ public class ByteArrayCrLfSerializer extends AbstractByteArraySerializer {
*/
public byte[] deserialize(InputStream inputStream) throws IOException {
byte[] buffer = new byte[this.maxMessageSize];
+ int n = this.fillToCrLf(inputStream, buffer);
+ byte[] assembledData = this.copyToSizedArray(buffer, n);
+ return assembledData;
+ }
+
+ public int fillToCrLf(InputStream inputStream, byte[] buffer)
+ throws IOException, SoftEndOfStreamException {
int n = 0;
int bite;
if (logger.isDebugEnabled()) {
@@ -59,9 +66,7 @@ public class ByteArrayCrLfSerializer extends AbstractByteArraySerializer {
+ this.maxMessageSize);
}
};
- byte[] assembledData = new byte[n-1];
- System.arraycopy(buffer, 0, assembledData, 0, n-1);
- return assembledData;
+ return n-1; // trim \r
}
/**
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/ByteArrayRawSerializer.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/ByteArrayRawSerializer.java
index 97a2c3e..82df9f4 100644
--- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/ByteArrayRawSerializer.java
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/ByteArrayRawSerializer.java
@@ -61,8 +61,7 @@ public class ByteArrayRawSerializer extends AbstractByteArraySerializer {
+ this.maxMessageSize);
}
};
- byte[] assembledData = new byte[n];
- System.arraycopy(buffer, 0, assembledData, 0, n);
+ byte[] assembledData = this.copyToSizedArray(buffer, n);
return assembledData;
}
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/ByteArrayStxEtxSerializer.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/ByteArrayStxEtxSerializer.java
index 429a393..70d23ca 100644
--- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/ByteArrayStxEtxSerializer.java
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/ByteArrayStxEtxSerializer.java
@@ -62,8 +62,7 @@ public class ByteArrayStxEtxSerializer extends AbstractByteArraySerializer {
+ this.maxMessageSize);
}
}
- byte[] assembledData = new byte[n];
- System.arraycopy(buffer, 0, assembledData, 0, n);
+ byte[] assembledData = this.copyToSizedArray(buffer, n);
return assembledData;
}
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/StatefulDeserializer.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/StatefulDeserializer.java
new file mode 100644
index 0000000..d66f0a5
--- /dev/null
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/StatefulDeserializer.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2002-2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.integration.ip.tcp.serializer;
+
+import java.io.InputStream;
+
+import org.springframework.core.serializer.Deserializer;
+
+/**
+ * @author Gary Russell
+ * @since 2.2
+ *
+ */
+public interface StatefulDeserializer<T> extends Deserializer<T> {
+
+ void removeState(InputStream inputStream);
+}
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsCallback.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsCallback.java
new file mode 100644
index 0000000..f46f998
--- /dev/null
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsCallback.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2002-2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.integration.ip.tcp.sockjs;
+
+import org.springframework.integration.ip.tcp.sockjs.support.SockJsFrame;
+
+/**
+ * @author Gary Russell
+ * @since 2.1
+ *
+ */
+public interface SockJsCallback {
+
+ void data(SockJsFrame frame, String uuid);
+
+ void control(SockJsFrame frame, String uuid);
+
+ void closed(String uuid);
+
+}
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsContext.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsContext.java
new file mode 100644
index 0000000..6190895
--- /dev/null
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsContext.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2002-2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.integration.ip.tcp.sockjs;
+
+/**
+ * @author Gary Russell
+ * @since 2.2
+ *
+ */
+public class SockJsContext {
+
+ private volatile String cookies;
+
+ private final String uuid;
+
+ public SockJsContext(String uuid) {
+ this.uuid = uuid;
+ }
+
+ void setCookies(String cookies) {
+ this.cookies = cookies;
+ }
+
+ String getCookies() {
+ return cookies;
+ }
+
+ String getUuid() {
+ return uuid;
+ }
+
+}
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsOperations.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsOperations.java
new file mode 100644
index 0000000..54f5e1f
--- /dev/null
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsOperations.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2002-2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.integration.ip.tcp.sockjs;
+
+import org.springframework.integration.ip.tcp.sockjs.support.SockJsFrame;
+
+/**
+ * @author Gary Russell
+ * @since 2.2
+ *
+ */
+public interface SockJsOperations {
+
+ public abstract SockJsContext startStream(String baseResource,
+ final SockJsCallback callback);
+
+ public abstract SockJsFrame sendXhr(String baseResource, String uuid,
+ String data, SockJsContext context);
+
+}
\ No newline at end of file
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsTemplate.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsTemplate.java
new file mode 100644
index 0000000..234adc9
--- /dev/null
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsTemplate.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2002-2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.integration.ip.tcp.sockjs;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.core.serializer.Deserializer;
+import org.springframework.integration.Message;
+import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory;
+import org.springframework.integration.ip.tcp.connection.TcpConnection;
+import org.springframework.integration.ip.tcp.connection.TcpListener;
+import org.springframework.integration.ip.tcp.connection.TcpSender;
+import org.springframework.integration.ip.tcp.serializer.StatefulDeserializer;
+import org.springframework.integration.ip.tcp.sockjs.support.SockJsFrame;
+import org.springframework.integration.message.GenericMessage;
+import org.springframework.util.Assert;
+
+/**
+ * @author Gary Russell
+ * @since 2.2
+ *
+ */
+public class SockJsTemplate implements TcpListener, SockJsOperations {
+
+ private final Log logger = LogFactory.getLog(this.getClass());
+
+ private final AbstractClientConnectionFactory connectionFactory;
+
+ private boolean gzipping;
+
+ public SockJsTemplate(AbstractClientConnectionFactory connectionFactory) {
+ this.connectionFactory = connectionFactory;
+ this.connectionFactory.registerListener(this);
+ }
+
+ boolean isGzipping() {
+ return gzipping;
+ }
+
+ void setGzipping(boolean gzipping) {
+ this.gzipping = gzipping;
+ }
+
+ public SockJsContext startStream(String baseResource, final SockJsCallback callback) {
+ final String uuid = UUID.randomUUID().toString();
+ SockJsContext sockJsContext = new SockJsContext(uuid);
+ try {
+ TcpConnection connection = this.connectionFactory.getNewConnection();
+ registerListener(connection, callback, uuid, sockJsContext);
+ registerSender(connection, callback, uuid);
+ connection.send(new GenericMessage<String>(
+ "POST " + baseResource + "/" + uuid + "/xhr_streaming HTTP/1.1\r\n" +
+ "Host: " + this.connectionFactory.getHost() + "\r\n" +
+ "Connection: keep-alive\r\n" +
+ "Accept-Encoding: " + (this.gzipping ? "gzip" : "identity") + "\r\n" +
+ "Content-Length: 0\r\n" +
+ "\r\n"));
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return sockJsContext;
+ }
+
+ private void registerSender(TcpConnection connection,
+ final SockJsCallback callback, final String uuid) {
+ connection.registerSender(new TcpSender() {
+
+ public void addNewConnection(TcpConnection connection) {
+ }
+
+ public void removeDeadConnection(TcpConnection connection) {
+ callback.closed(uuid);
+ Deserializer<?> deserializer = connectionFactory.getDeserializer();
+ if (deserializer instanceof StatefulDeserializer) {
+ try {
+ ((StatefulDeserializer<?>) deserializer).removeState(connection.getInputStream());
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ }
+ });
+ }
+
+ private void registerListener(final TcpConnection connection,
+ final SockJsCallback callback, final String uuid, final SockJsContext sockJsContext) {
+ connection.registerListener(new TcpListener() {
+
+ @SuppressWarnings("unchecked")
+ public boolean onMessage(Message<?> message) {
+ for (SockJsFrame frame : (List<SockJsFrame>) message.getPayload()) {
+ switch (frame.getType()) {
+ case SockJsFrame.TYPE_COOKIES:
+ sockJsContext.setCookies(frame.getPayload());
+ break;
+ case SockJsFrame.TYPE_CLOSE:
+ connection.close();
+ callback.closed(uuid);
+ break;
+ case SockJsFrame.TYPE_HEADERS:
+ case SockJsFrame.TYPE_HEARTBEAT:
+ case SockJsFrame.TYPE_OPEN:
+ case SockJsFrame.TYPE_PING:
+ case SockJsFrame.TYPE_PONG:
+ case SockJsFrame.TYPE_PRELUDE:
+ case SockJsFrame.TYPE_UNEXPECTED:
+ callback.control(frame, uuid);
+ break;
+ case SockJsFrame.TYPE_DATA:
+ callback.data(frame, uuid);
+ }
+ }
+ return false;
+ }
+ });
+ }
+
+ public boolean onMessage(Message<?> message) {
+ logger.error("Should not be called");
+ return false;
+ }
+
+ public SockJsFrame sendXhr(String baseResource, String uuid, String data, SockJsContext context) {
+ try {
+ TcpConnection connection = this.connectionFactory.getNewConnection();
+ final BlockingQueue<SockJsFrame> reply = new LinkedBlockingQueue<SockJsFrame>();
+ connection.registerListener(new TcpListener() {
+ public boolean onMessage(Message<?> message) {
+ @SuppressWarnings("unchecked")
+ SockJsFrame frame = (SockJsFrame) ((List<SockJsFrame>) message.getPayload()).get(0);
+ Assert.isTrue(frame.getType() == SockJsFrame.TYPE_HEADERS);
+ reply.offer(frame);
+ return false;
+ }
+ });
+ connection.send(new GenericMessage<String>(
+ "POST " + baseResource + "/" + uuid + "/xhr_send HTTP/1.1\r\n" +
+ "Host: " + this.connectionFactory.getHost() + "\r\n" +
+ "Accept-Encoding: identity\r\n" +
+ context.getCookies() + "\r\n" +
+ "Content-Length: " + data.length() + "\r\n" +
+ "\r\n" +
+ data));
+ SockJsFrame frame = reply.poll(10, TimeUnit.SECONDS);
+ connection.close();
+ return frame;
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+
+}
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsWebSocketClient.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsWebSocketClient.java
new file mode 100644
index 0000000..7c7f1b5
--- /dev/null
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsWebSocketClient.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2002-2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.integration.ip.tcp.sockjs;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.Socket;
+import java.util.concurrent.Executors;
+
+import javax.net.SocketFactory;
+
+import org.springframework.integration.ip.tcp.sockjs.serializer.WebSocketSerializer;
+import org.springframework.integration.ip.tcp.sockjs.support.SockJsFrame;
+
+/**
+ * @author Gary Russell
+ * @since 2.2
+ *
+ */
+public class SockJsWebSocketClient {
+
+ private WebSocketSerializer serializer = new WebSocketSerializer();
+
+ public static void main(String[] args) throws Exception {
+ new SockJsWebSocketClient().start();
+ }
+
+ public void start() throws Exception {
+ String init =
+ "GET /echo/517/p8e90wok/websocket HTTP/1.1\r\n" +
+ "Upgrade: websocket\r\n" +
+ "Connection: Upgrade\r\n" +
+ "Host: localhost:9999\r\n" +
+ "Origin: http://localhost:9999\r\n" +
+ "Sec-WebSocket-Key: nGahJFI1wv1Vn/QW5TdFvg==\r\n" +
+ "Sec-WebSocket-Version: 13\r\n\r\n";
+ Socket sock = SocketFactory.getDefault().createSocket("localhost", 8081);
+ sock.getOutputStream().write(init.getBytes());
+ handleUpgrade(sock);
+ Executors.newSingleThreadExecutor().execute(new SocksJSWebSocketReader(sock));
+ while(true) {
+ Thread.sleep(10000);
+ send(sock, "Hello, world!");
+ }
+ }
+
+ private void send(Socket sock, String string) throws IOException {
+ String data = "[\"" + string + "\"]";
+ doSend(sock, data);
+ }
+
+ private void doSend(Socket sock, String data) throws IOException {
+ System.out.println("Sending... " + data);
+ this.serializer.serialize(data, sock.getOutputStream());
+ }
+
+ private void handleUpgrade(Socket sock) throws Exception {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(sock.getInputStream()));
+ while (true) {
+ String s = reader.readLine();
+ System.out.println(s);
+ if (s.length() == 0) {
+ break;
+ }
+ }
+ }
+
+ private class SocksJSWebSocketReader implements Runnable {
+
+ private Socket sock;
+
+ private SocksJSWebSocketReader(Socket sock) {
+ this.sock = sock;
+ }
+
+ public void run() {
+ while (true) {
+ try {
+ SockJsFrame frame = serializer.deserialize(this.sock.getInputStream());
+ if (frame.getType() == SockJsFrame.TYPE_CLOSE) {
+ sock.close();
+ }
+ else if (frame.getType() == SockJsFrame.TYPE_PING) {
+ this.sendPong(frame.getPayload());
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ try {
+ this.sock.close();
+ } catch (IOException e1) {
+ e1.printStackTrace();
+ throw new RuntimeException(e1);
+ }
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private void sendPong(String data) throws IOException {
+ doSend(sock, "pong:" + data);
+ }
+ }
+
+}
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsXHRStreamingClient.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsXHRStreamingClient.java
new file mode 100644
index 0000000..11c3828
--- /dev/null
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsXHRStreamingClient.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2002-2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.integration.ip.tcp.sockjs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.net.SocketFactory;
+
+import org.springframework.integration.ip.tcp.serializer.ByteArrayCrLfSerializer;
+import org.springframework.integration.ip.tcp.serializer.SoftEndOfStreamException;
+import org.springframework.integration.ip.tcp.sockjs.serializer.XHRStreamingChunkDeserializer;
+import org.springframework.integration.ip.tcp.sockjs.support.SockJsFrame;
+
+/**
+ * @author Gary Russell
+ * @since 2.2
+ *
+ */
+public class SockJsXHRStreamingClient {
+
+ private XHRStreamingChunkDeserializer deserializer = new XHRStreamingChunkDeserializer();
+
+ private ByteArrayCrLfSerializer crlfDeserializer = new ByteArrayCrLfSerializer();
+
+ Map<String, String> cookies = new ConcurrentHashMap<String, String>(); // TODO: needs to be nicer than this
+
+ public static void main(String[] args) throws Exception {
+ new SockJsXHRStreamingClient().start();
+ }
+
+ public void start() throws Exception {
+ int port = 80;
+// int port = 8081;
+// String host = "localhost";
+ String host = "echo-test.cloudfoundry.com";
+// String host = "192.168.222.132";
+ String uuid = UUID.randomUUID().toString();
+ String init =
+ "POST /echo/000/" + uuid + "/xhr_streaming HTTP/1.1\r\n" +
+ "Host: " + host + "\r\n" +
+ "Connection: keep-alive\r\n" +
+ "Accept-Encoding: identity\r\n" +
+ "Content-Length: 0\r\n" +
+ "\r\n";
+ Socket sock = SocketFactory.getDefault().createSocket(host, port);
+ InputStream inputStream = sock.getInputStream();
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ executor.execute(new SocksJSXHRStreamingReader(sock, uuid));
+ sock.getOutputStream().write(init.getBytes());
+ String statusLine = "HTTP/1.1 204 No Content";
+ int count = 0;
+ do {
+ Thread.sleep(1000);
+ if (sock.isClosed()) {
+ break;
+ }
+ if (cookies.get(uuid) == null) {
+ System.out.println("No cookies yet");
+ continue;
+ }
+ Socket sender = SocketFactory.getDefault().createSocket(host, port);
+ statusLine = send(sender, uuid, host);
+ }
+ while (statusLine.equals("HTTP/1.1 204 No Content") && count++ < 40);
+ this.deserializer.removeState(inputStream);
+ sock.close();
+ executor.shutdown();
+ }
+
+ private String readHeaders(Socket sock) throws IOException {
+ String statusLine = new String(this.crlfDeserializer.deserialize(sock.getInputStream()));
+ while (true) {
+ String s = new String(this.crlfDeserializer.deserialize(sock.getInputStream()));
+// System.out.println(s);
+ if (s.length() == 0) {
+ break;
+ }
+ }
+// System.out.println("Read headers");
+// System.out.println(statusLine);
+ return statusLine;
+ }
+
+ private String send(Socket sock, String uuid, String host) throws IOException {
+ String content = "[\"" + new String(new char[128]).replace('\0', 'x') + "\"]";
+ String sendData =
+ "POST /echo/000/" + uuid + "/xhr_send HTTP/1.1\r\n" +
+ "Host: " + host + "\r\n" +
+ "Accept-Encoding: identity\r\n" +
+ this.cookies.get(uuid) + "\r\n" +
+ "Content-Length: " + content.length() + "\r\n" +
+ "\r\n" +
+ content;
+ System.out.println("Sending... " + content);
+ sock.getOutputStream().write(sendData.getBytes());
+ String statusLine = readHeaders(sock);
+ sock.close();
+ return statusLine;
+ }
+
+ private class SocksJSXHRStreamingReader implements Runnable {
+
+ private final Socket sock;
+
+ private final String uuid;
+
+ private SocksJSXHRStreamingReader(Socket sock, String uuid) {
+ this.sock = sock;
+ this.uuid = uuid;
+ }
+
+ public void run() {
+ try {
+ while (true) {
+ try {
+ List<SockJsFrame> frames = deserializer.deserialize(this.sock.getInputStream());
+ for (SockJsFrame frame : frames) {
+ if (frame.getType() == SockJsFrame.TYPE_CLOSE) {
+ sock.close();
+ }
+ else if (frame.getType() == SockJsFrame.TYPE_COOKIES) {
+ SockJsXHRStreamingClient.this.cookies.put(this.uuid, frame.getPayload());
+ }
+ }
+ }
+ catch (SoftEndOfStreamException seose) {
+ System.out.println("Stream closed");
+ throw new RuntimeException(seose);
+ }
+ catch (IOException e) {
+ if(!("Socket closed".equals(e.getMessage()))) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ return;
+ }
+ }
+ }
+ catch (RuntimeException re) {
+ if (!(re.getCause() instanceof SoftEndOfStreamException)) {
+ re.printStackTrace();
+ }
+ }
+ finally {
+ try {
+ this.sock.close();
+ } catch (IOException e1) {
+ e1.printStackTrace();
+ throw new RuntimeException(e1);
+ }
+ }
+ }
+ }
+
+}
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/serializer/AbstractSockJsDeserializer.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/serializer/AbstractSockJsDeserializer.java
new file mode 100644
index 0000000..192e3af
--- /dev/null
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/serializer/AbstractSockJsDeserializer.java
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2002-2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.integration.ip.tcp.sockjs.serializer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.integration.ip.tcp.serializer.ByteArrayCrLfSerializer;
+import org.springframework.integration.ip.tcp.serializer.StatefulDeserializer;
+import org.springframework.integration.ip.tcp.sockjs.support.SockJsFrame;
+
+/**
+ * @author Gary Russell
+ * @since 2.1
+ *
+ */
+public abstract class AbstractSockJsDeserializer<T> implements StatefulDeserializer<T> {
+
+ protected final Log logger = LogFactory.getLog(this.getClass());
+
+ protected final ByteArrayCrLfSerializer crlfDeserializer = new ByteArrayCrLfSerializer();
+
+ protected volatile int maxMessageSize = 2048;
+
+ private final Map<InputStream, BasicState> streamState = new ConcurrentHashMap<InputStream, BasicState>();
+
+ void setMaxMessageSize(int maxMessageSize) {
+ this.maxMessageSize = maxMessageSize;
+ }
+
+ protected BasicState getStreamState(InputStream inputStream) {
+ return streamState.get(inputStream);
+ }
+
+ protected List<SockJsFrame> checkStreaming(InputStream inputStream) throws IOException {
+ BasicState isStreaming = this.streamState.get(inputStream);
+ if (isStreaming == null) { //Consume the headers - TODO - check status
+ StringBuilder headersBuilder = new StringBuilder();
+ byte[] headers = new byte[this.maxMessageSize];
+ int headersLength;
+ do {
+ headersLength = this.crlfDeserializer.fillToCrLf(inputStream, headers);
+ String header = new String(headers, 0, headersLength, "UTF-8");
+ headersBuilder.append(header).append("\r\n");
+ }
+ while (headersLength > 0);
+ BasicState basicState = new BasicState();
+ List<SockJsFrame> decodedHeaders = decodeHeaders(headersBuilder.toString(), basicState);
+ this.streamState.put(inputStream, basicState);
+ return decodedHeaders;
+ }
+ return null;
+ }
+
+ private List<SockJsFrame> decodeHeaders(String frameData, BasicState state) {
+ // TODO: Full header separation - mvc utils?
+ List<SockJsFrame> dataList = new ArrayList<SockJsFrame>();
+ System.out.println("Received:Headers\r\n" + frameData);
+ String[] headers = frameData.split("\\r\\n");
+ String cookies = "Cookie: ";
+ for (String header : headers) {
+ if (header.startsWith("Set-Cookie")) {
+ String[] bits = header.split(": *");
+ cookies += bits[1] + "; ";
+ }
+ else if (header.startsWith("Content-Encoding:") && header.contains("gzip")) {
+ state.setGzipping(true);
+ }
+ }
+ System.out.println(cookies);
+ dataList.add(new SockJsFrame(SockJsFrame.TYPE_HEADERS, frameData));
+ if (cookies.length() > 8) {
+ dataList.add(new SockJsFrame(SockJsFrame.TYPE_COOKIES, cookies));
+ }
+ return dataList;
+ }
+
+ protected void checkClosure(int bite) throws IOException {
+ if (bite < 0) {
+ logger.debug("Socket closed during message assembly");
+ throw new IOException("Socket closed during message assembly");
+ }
+ }
+
+ public void removeState(InputStream inputStream) {
+ this.streamState.remove(inputStream);
+ }
+
+ public abstract T deserialize(InputStream inputStream) throws IOException;
+
+ protected SockJsFrame decodeToFrame(String data) {
+ if (data.length() == 1 && data.equals("h")) {
+ System.out.println("Received:SockJS-Heartbeat");
+ return new SockJsFrame(SockJsFrame.TYPE_HEARTBEAT, data);
+ }
+ else if (data.length() == 0x800 && data.startsWith("hhhhhhhhhhhhh")) {
+ System.out.println("Received:SockJS-XHR-Prelude");
+ return new SockJsFrame(SockJsFrame.TYPE_PRELUDE, data);
+ }
+ else if (data.length() == 1 && data.equals("o")) {
+ System.out.println("Received:SockJS-Open");
+ return new SockJsFrame(SockJsFrame.TYPE_OPEN, data);
+ }
+ else if (data.length() > 0 && data.startsWith("c")) {
+ System.out.println("Received SockJS-Close:" + data.substring(1));
+ return new SockJsFrame(SockJsFrame.TYPE_CLOSE, data.substring(1));
+ }
+ else if (data.length() > 0 && data.startsWith("a")) {
+ System.out.println("Received data:" + data.substring(1));
+ return new SockJsFrame(SockJsFrame.TYPE_DATA, data.substring(1));
+ }
+ else {
+ System.out.println("Received unexpected:" + new String(data));
+ return new SockJsFrame(SockJsFrame.TYPE_UNEXPECTED, data);
+ }
+ }
+
+ public static class BasicState {
+
+ private volatile boolean gzipping;
+
+ private volatile GZIPInputStream gzipInputStream;
+
+ private volatile GZIPFeederInputStream gzipFeederInputStream;
+
+ boolean isGzipping() {
+ return gzipping;
+ }
+
+ void setGzipping(boolean gzipping) {
+ this.gzipping = gzipping;
+ }
+
+ GZIPInputStream getGzipInputStream() throws IOException {
+ if (this.gzipInputStream == null) {
+ this.gzipInputStream = new GZIPInputStream(this.gzipFeederInputStream);
+ }
+ return this.gzipInputStream;
+ }
+
+ GZIPFeederInputStream getGzipFeederInputStream() {
+ if (this.gzipFeederInputStream == null) {
+ this.gzipFeederInputStream = new GZIPFeederInputStream();
+ }
+ return gzipFeederInputStream;
+ }
+ }
+
+ public static class GZIPFeederInputStream extends InputStream {
+
+ private volatile InputStream inputStream;
+
+
+ void setInputStream(InputStream inputStream) {
+ this.inputStream = inputStream;
+ }
+
+ @Override
+ public int read() throws IOException {
+ return this.inputStream.read();
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/serializer/WebSocketSerializer.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/serializer/WebSocketSerializer.java
new file mode 100644
index 0000000..2efcc48
--- /dev/null
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/serializer/WebSocketSerializer.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright 2002-2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.integration.ip.tcp.sockjs.serializer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.core.serializer.Serializer;
+import org.springframework.integration.ip.tcp.serializer.SoftEndOfStreamException;
+import org.springframework.integration.ip.tcp.sockjs.support.SockJsFrame;
+
+/**
+ * @author Gary Russell
+ * @since 2.2
+ *
+ */
+public class WebSocketSerializer extends AbstractSockJsDeserializer<SockJsFrame> implements Serializer<String> {
+
+ private final Log logger = LogFactory.getLog(this.getClass());
+
+ private Map<InputStream, StringBuilder> fragments = new ConcurrentHashMap<InputStream, StringBuilder>();
+
+ public void removeFragments(InputStream inputStream) {
+ this.fragments.remove(inputStream);
+ }
+
+ public void serialize(final String data, OutputStream outputStream)
+ throws IOException {
+ int lenBytes;
+ int payloadLen = 0x80; //masked
+ boolean pong = data.startsWith("pong:");
+ String theData = pong ? data.substring(5) : data;
+ int length = theData.length();
+ if (length >= Math.pow(2, 16)) {
+ lenBytes = 8;
+ payloadLen |= 127;
+ }
+ else if (length > 125) {
+ lenBytes = 2;
+ payloadLen |= 126;
+ }
+ else {
+ lenBytes = 0;
+ payloadLen |= length;
+ }
+ int mask = (int) System.currentTimeMillis();
+ ByteBuffer buffer = ByteBuffer.allocate(length + 6 + lenBytes);
+ if (pong) {
+ buffer.put((byte) 0x8a);
+ } else {
+ // Final fragment; text
+ buffer.put((byte) 0x81);
+ }
+ buffer.put((byte) payloadLen);
+ if (lenBytes == 2) {
+ buffer.putShort((short) length);
+ }
+ else if (lenBytes == 8) {
+ buffer.putLong(length);
+ }
+
+ buffer.putInt(mask);
+ byte[] maskBytes = new byte[4];
+ buffer.position(buffer.position() - 4);
+ buffer.get(maskBytes);
+ byte[] bytes = theData.getBytes("UTF-8");
+ for (int i = 0; i < bytes.length; i++) {
+ buffer.put((byte) (bytes[i] ^ maskBytes[i % 4]));
+ }
+ outputStream.write(buffer.array());
+ }
+
+ public SockJsFrame deserialize(InputStream inputStream) throws IOException {
+ int bite;
+ if (logger.isDebugEnabled()) {
+ logger.debug("Available to read:" + inputStream.available());
+ }
+ boolean done = false;
+ int len = 0;
+ int n = 0;
+ int m = 0;
+ byte[] buffer = null;
+ boolean fin = false;
+ boolean ping = false;
+ boolean pong = false;
+ int lenBytes = 0;
+ while (!done ) {
+ bite = inputStream.read();
+// logger.debug("Read:" + Integer.toHexString(bite));
+ if (bite < 0 && n == 0) {
+ throw new SoftEndOfStreamException("Stream closed between payloads");
+ }
+ checkClosure(bite);
+ switch (n++) {
+ case 0:
+ fin = (bite & 0x80) > 0;
+ switch (bite) {
+ case 0x01:
+ case 0x81:
+ logger.debug("Text, fin=" + fin);
+ break;
+ case 0x02:
+ case 0x82:
+ logger.debug("Binary, fin=" + fin);
+ throw new IOException("SockJS doesn't support binary");
+ case 0x89:
+ ping = true;
+ logger.debug("Ping, fin=" + fin);
+ break;
+ case 0x8a:
+ pong = true;
+ logger.debug("Pong, fin=" + fin);
+ break;
+ case 0x88:
+ throw new IOException("Connection closed");
+ default:
+ throw new IOException("Unexpected opcode " + Integer.toHexString(bite));
+ }
+ break;
+ case 1:
+ if ((bite & 0x80) > 0) {
+ throw new IOException("Illegal: Received masked data from server");
+ }
+ if (bite < 126) {
+ len = bite;
+ buffer = new byte[len];
+ }
+ else if (bite == 126) {
+ lenBytes = 2;
+ }
+ else {
+ lenBytes = 8;
+ }
+ break;
+ case 2:
+ case 3:
+ case 4:
+ case 5:
+ if (lenBytes > 4 && bite != 0) {
+ throw new IOException("Max supported length exceeded");
+ }
+ case 6:
+ if (lenBytes > 3 && (bite & 0x80) > 0) {
+ throw new IOException("Max supported length exceeded");
+ }
+ case 7:
+ case 8:
+ case 9:
+ if (lenBytes-- > 0) {
+ len = len << 8 | bite;
+ if (lenBytes == 0) {
+ buffer = new byte[len];
+ }
+ break;
+ }
+ default:
+ buffer[m++] = (byte) bite;
+ done = m >= len;
+ }
+ };
+ String data = new String(buffer, "UTF-8");
+ if (!fin) {
+ StringBuilder builder = this.fragments.get(inputStream);
+ if (builder == null) {
+ builder = new StringBuilder();
+ this.fragments.put(inputStream, builder);
+ }
+ builder.append(data);
+ return null;
+ }
+ else if (ping) {
+ return new SockJsFrame(SockJsFrame.TYPE_PING, data);
+ }
+ else if (pong) {
+ return new SockJsFrame(SockJsFrame.TYPE_PONG, data);
+ }
+ else {
+ StringBuilder builder = this.fragments.get(inputStream);
+ if (builder == null) {
+ return decodeToFrame(data);
+ }
+ else {
+ builder.append(data).toString();
+ this.removeFragments(inputStream);
+ return this.decodeToFrame(builder.toString());
+ }
+ }
+ }
+
+ protected void checkClosure(int bite) throws IOException {
+ if (bite < 0) {
+ logger.debug("Socket closed during message assembly");
+ throw new IOException("Socket closed during message assembly");
+ }
+ }
+
+ public void removeState(InputStream inputStream) {
+ super.removeState(inputStream);
+ this.removeFragments(inputStream);
+ }
+
+}
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/serializer/XHRStreamingChunkDeserializer.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/serializer/XHRStreamingChunkDeserializer.java
new file mode 100644
index 0000000..d8b5783
--- /dev/null
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/serializer/XHRStreamingChunkDeserializer.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2002-2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.integration.ip.tcp.sockjs.serializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.zip.GZIPInputStream;
+
+import org.springframework.integration.ip.tcp.serializer.SoftEndOfStreamException;
+import org.springframework.integration.ip.tcp.sockjs.support.SockJsFrame;
+import org.springframework.util.Assert;
+
+/**
+ * @author Gary Russell
+ * @since 2.2
+ *
+ */
+public class XHRStreamingChunkDeserializer extends AbstractSockJsDeserializer<List<SockJsFrame>> {
+
+ public List<SockJsFrame> deserialize(InputStream inputStream) throws IOException {
+ List<SockJsFrame> headers = checkStreaming(inputStream);
+ if (headers != null) {
+ return headers;
+ }
+ BasicState state = this.getStreamState(inputStream);
+ boolean gzipping = state == null ? false : state.isGzipping();
+ boolean complete = false;
+ StringBuilder builder = new StringBuilder();
+ while (!complete) {
+ byte[] chunkLengthInHex = this.crlfDeserializer.deserialize(inputStream);
+ if (chunkLengthInHex.length == 0) {
+ break;
+ }
+ int chunkLength = 0;
+ try {
+ chunkLength = Integer.parseInt(new String(chunkLengthInHex, "UTF-8"), 16);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Chunk size = " + chunkLength);
+ }
+ if (chunkLength <= 0) {
+ throw new SoftEndOfStreamException("0 length chunk received");
+ }
+ byte[] chunk = new byte[chunkLength];
+ for (int i = 0; i < chunkLength; i++) {
+ int c = inputStream.read();
+ checkClosure(c);
+ chunk[i] = (byte) c;
+ }
+ int eom = chunk[chunkLength-1];
+ Assert.isTrue(inputStream.read() == '\r', "Expected \\r");
+ Assert.isTrue(inputStream.read() == '\n', "Expected \\n");
+ int adjust = complete ? 1 : 0;
+ String data;
+ if (gzipping) {
+ byte[] unzipped = new byte[this.maxMessageSize];
+ GZIPFeederInputStream feeder = state.getGzipFeederInputStream();
+ ByteArrayInputStream byteStream = new ByteArrayInputStream(chunk, 0, chunkLength - adjust);
+ feeder.setInputStream(byteStream);
+ GZIPInputStream gzipInputStream = state.getGzipInputStream();
+ while (byteStream.available() > 0) {
+ int decodedLength = gzipInputStream.read(unzipped);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Inflated to " + decodedLength);
+ }
+ data = new String(unzipped, 0, decodedLength, "UTF-8");
+ builder.append(data);
+ }
+ eom = builder.charAt(builder.length()-1);
+ }
+ else {
+ data = new String(chunk, 0, chunkLength - adjust, "UTF-8");
+ builder.append(data);
+ }
+ complete = eom == '\n';
+// System.out.println(data.length() + ":" + data);
+ }
+ return this.decodeFrameData(builder.toString());
+ }
+
+ List<SockJsFrame> decodeFrameData(String frameData) throws IOException {
+ List<SockJsFrame> dataList = new ArrayList<SockJsFrame>();
+ // some servers put multiple frames in the same chunk
+ String[] frames;
+ if (frameData.contains("\n")) {
+ frames = frameData.split("\n");
+ }
+ else {
+ frames = new String[] {frameData};
+ }
+ for (String data : frames) {
+ dataList.add(this.decodeToFrame(data));
+ }
+ return dataList;
+ }
+
+}
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/support/SockJsFrame.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/support/SockJsFrame.java
new file mode 100644
index 0000000..6ba9aa8
--- /dev/null
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/support/SockJsFrame.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2002-2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.integration.ip.tcp.sockjs.support;
+
+/**
+ * @author Gary Russell
+ * @since 2.2
+ *
+ */
+public class SockJsFrame {
+
+ public static final int TYPE_HEADERS = 1;
+
+ public static final int TYPE_HEARTBEAT = 2;
+
+ public static final int TYPE_PRELUDE = 3;
+
+ public static final int TYPE_DATA = 4;
+
+ public static final int TYPE_PING = 5;
+
+ public static final int TYPE_PONG = 6;
+
+ public static final int TYPE_OPEN = 7;
+
+ public static final int TYPE_CLOSE = 8;
+
+ public static final int TYPE_UNEXPECTED = 9;
+
+ public static final int TYPE_COOKIES = 10;
+
+ private static final String[] typeToString = new String[] {
+ "Invalid", "Headers", "HeartBeat", "XHR Prelude", "Data", "Ping", "Pong", "Open", "Close", "Unexpected", "Cookies"
+ };
+
+ private final int type;
+
+ private final String payload;
+
+ public SockJsFrame(int type, String payload) {
+ this.type = type;
+ this.payload = payload;
+ }
+
+ public int getType() {
+ return this.type;
+ }
+
+ public String getPayload() {
+ return this.payload;
+ }
+
+ @Override
+ public String toString() {
+ return "SockJsFrame [type=" + typeToString[type] + ", payload=" + payload + "]";
+ }
+
+}
diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpMessageMapperTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpMessageMapperTests.java
index 7cc0f98..fca1f97 100644
--- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpMessageMapperTests.java
+++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpMessageMapperTests.java
@@ -20,6 +20,8 @@ import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.io.IOException;
+import java.io.InputStream;
import java.net.Socket;
import javax.net.SocketFactory;
@@ -88,6 +90,9 @@ public class TcpMessageMapperTests {
public String getConnectionId() {
return "anId";
}
+ public InputStream getInputStream() throws IOException {
+ return null;
+ }
};
Message<Object> message = mapper.toMessage(connection);
assertEquals(TEST_PAYLOAD, new String((byte[]) message.getPayload()));
@@ -140,6 +145,9 @@ public class TcpMessageMapperTests {
public String getConnectionId() {
return "anId";
}
+ public InputStream getInputStream() throws IOException {
+ return null;
+ }
};
Message<Object> message = mapper.toMessage(connection);
assertEquals(TEST_PAYLOAD, new String((byte[]) message.getPayload()));
diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/serializer/WebSocketDeserializerTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/serializer/WebSocketDeserializerTests.java
new file mode 100644
index 0000000..31425e6
--- /dev/null
+++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/serializer/WebSocketDeserializerTests.java
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2002-2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.integration.ip.tcp.serializer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.junit.Test;
+import org.springframework.integration.ip.tcp.sockjs.serializer.WebSocketSerializer;
+import org.springframework.integration.ip.tcp.sockjs.support.SockJsFrame;
+
+/**
+ * @author Gary Russell
+ * @since 2.2
+ *
+ */
+public class WebSocketDeserializerTests {
+
+ WebSocketSerializer deserializer = new WebSocketSerializer();
+
+ @Test
+ public void testShortLength() throws Exception {
+ byte[] msg = new byte[] {(byte) 0x81, 0x01, 'A'};
+ InputStream is = new ByteArrayInputStream(msg);
+ SockJsFrame result = deserializer.deserialize(is);
+ assertEquals("A", result.getPayload());
+ }
+
+ @Test
+ public void testMediumLength() throws Exception {
+ byte[] msg = new byte[132];
+ msg[0] = (byte) 0x81;
+ msg[1] = 0x7e;
+ msg[2] = 0;
+ msg[3] = (byte) 0x80;
+ msg[4] = 'A';
+ msg[131] = 'Z';
+ InputStream is = new ByteArrayInputStream(msg);
+ String result = deserializer.deserialize(is).getPayload();
+ assertEquals(128, result.length());
+ assertTrue(result.startsWith("A"));
+ assertTrue(result.endsWith("Z"));
+ }
+
+ @Test
+ public void testMediumLengthFull() throws Exception {
+ byte[] msg = new byte[(int) (Math.pow(2, 16) + 3)];
+ msg[0] = (byte) 0x81;
+ msg[1] = 0x7e;
+ msg[2] = (byte) 0xff;
+ msg[3] = (byte) 0xff;
+ msg[4] = 'A';
+ msg[(int) (Math.pow(2, 16) + 2)] = 'Z';
+ InputStream is = new ByteArrayInputStream(msg);
+ String result = deserializer.deserialize(is).getPayload();
+ assertEquals((int) Math.pow(2, 16) - 1, result.length());
+ assertTrue(result.startsWith("A"));
+ assertTrue(result.endsWith("Z"));
+ }
+
+ @Test
+ public void testLongLength() throws Exception {
+ byte[] msg = new byte[(int) (Math.pow(2, 16) + 10)];
+ msg[0] = (byte) 0x81;
+ msg[1] = 0x7f;
+ msg[2] = 0;
+ msg[3] = 0;
+ msg[4] = 0;
+ msg[5] = 0;
+ msg[6] = 0;
+ msg[7] = 1;
+ msg[8] = 0;
+ msg[9] = 0;
+ msg[10] = 'A';
+ msg[(int) (Math.pow(2, 16) + 9)] = 'Z';
+ InputStream is = new ByteArrayInputStream(msg);
+ String result = deserializer.deserialize(is).getPayload();
+ assertEquals((int) Math.pow(2, 16), result.length());
+ assertTrue(result.startsWith("A"));
+ assertTrue(result.endsWith("Z"));
+ }
+
+ @Test(expected=IOException.class)
+ public void testTooLong() throws Exception {
+ byte[] msg = new byte[10];
+ msg[0] = (byte) 0x81;
+ msg[1] = 0x7f;
+ msg[2] = 0;
+ msg[3] = 0;
+ msg[4] = 1;
+ msg[5] = 0;
+ msg[6] = 0;
+ msg[7] = 1;
+ msg[8] = 0;
+ msg[9] = 0;
+ InputStream is = new ByteArrayInputStream(msg);
+ deserializer.deserialize(is);
+ }
+
+ @Test
+ public void testFragmented() throws Exception {
+ byte[] msg = new byte[] {(byte) 0x01, 0x01, 'A'};
+ InputStream is = new ByteArrayInputStream(msg);
+ SockJsFrame frame = deserializer.deserialize(is);
+ assertNull(frame);
+ msg[0] = (byte) 0x81;
+ msg[1] = 0x01;
+ msg[2] = 'B';
+ is.reset();
+ frame = deserializer.deserialize(is);
+ assertEquals("AB", frame.getPayload());
+ }
+}
diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/serializer/WebSocketSserializerTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/serializer/WebSocketSserializerTests.java
new file mode 100644
index 0000000..8a8a156
--- /dev/null
+++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/serializer/WebSocketSserializerTests.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2002-2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.integration.ip.tcp.serializer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayOutputStream;
+
+import org.junit.Test;
+import org.springframework.integration.ip.tcp.sockjs.serializer.WebSocketSerializer;
+
+/**
+ * @author Gary Russell
+ * @since 2.2
+ *
+ */
+public class WebSocketSserializerTests {
+
+ WebSocketSerializer serializer = new WebSocketSerializer();
+
+ @Test
+ public void testShortLength() throws Exception {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ String data = "Hello, world!";
+ serializer.serialize(data, baos);
+ byte[] bytes = baos.toByteArray();
+ assertEquals(0x81, (int) bytes[0] & 0xff);
+ assertEquals(0x8d, (int) bytes[1] & 0xff);
+ int mask = 0;
+ for (int i = 6; i < bytes.length; i++) {
+ bytes[i] ^= bytes[mask++ % 4 + 2];
+ }
+ String result = new String(bytes, 6, bytes.length - 6);
+ assertEquals(data, result);
+ }
+
+ @Test
+ public void testMediumLength() throws Exception {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ byte[] bytes = new byte[150];
+ bytes[0] = 'A';
+ bytes[149] = 'Z';
+ String data = new String(bytes);
+ serializer.serialize(data, baos);
+ bytes = baos.toByteArray();
+ assertEquals(0x81, (int) bytes[0] & 0xff);
+ assertEquals(0xFe, (int) bytes[1] & 0xff);
+ assertEquals(0, bytes[2]);
+ assertEquals(150, (int) bytes[3] & 0xff);
+ int mask = 0;
+ for (int i = 8; i < bytes.length; i++) {
+ bytes[i] ^= bytes[mask++ % 4 + 4];
+ }
+ String result = new String(bytes, 8, bytes.length - 8);
+ assertEquals(data, result);
+ }
+
+ @Test
+ public void testMediumLengthFull() throws Exception {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ byte[] bytes = new byte[(int) Math.pow(2, 16) - 1];
+ bytes[0] = 'A';
+ bytes[(int) Math.pow(2, 16) - 2] = 'Z';
+ String data = new String(bytes);
+ serializer.serialize(data, baos);
+ bytes = baos.toByteArray();
+ assertEquals(0x81, (int) bytes[0] & 0xff);
+ assertEquals(0xFe, (int) bytes[1] & 0xff);
+ assertEquals(0xff, (int) bytes[2] & 0xff);
+ assertEquals(0xff, (int) bytes[3] & 0xff);
+ int mask = 0;
+ for (int i = 8; i < bytes.length; i++) {
+ bytes[i] ^= bytes[mask++ % 4 + 4];
+ }
+ String result = new String(bytes, 8, bytes.length - 8);
+ assertEquals(data, result);
+ }
+
+ @Test
+ public void testLongLength() throws Exception {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ byte[] bytes = new byte[(int) Math.pow(2, 16)];
+ bytes[0] = 'A';
+ bytes[(int) Math.pow(2, 16) - 1] = 'Z';
+ String data = new String(bytes);
+ serializer.serialize(data, baos);
+ bytes = baos.toByteArray();
+ assertEquals(0x81, (int) bytes[0] & 0xff);
+ assertEquals(0xff, (int) bytes[1] & 0xff);
+ assertEquals(0, (int) bytes[2] & 0xff);
+ assertEquals(0, (int) bytes[3] & 0xff);
+ assertEquals(0, (int) bytes[4] & 0xff);
+ assertEquals(0, (int) bytes[5] & 0xff);
+ assertEquals(0, (int) bytes[6] & 0xff);
+ assertEquals(1, (int) bytes[7] & 0xff);
+ assertEquals(0, (int) bytes[8] & 0xff);
+ assertEquals(0, (int) bytes[9] & 0xff);
+ int mask = 0;
+ for (int i = 14; i < bytes.length; i++) {
+ bytes[i] ^= bytes[mask++ % 4 + 10];
+ }
+ String result = new String(bytes, 14, bytes.length - 14);
+ assertEquals(data, result);
+ }
+
+}
diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/sockjs/SockJsTemplateTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/sockjs/SockJsTemplateTests.java
new file mode 100644
index 0000000..5568fb7
--- /dev/null
+++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/sockjs/SockJsTemplateTests.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2002-2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.integration.ip.tcp.sockjs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory;
+import org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory;
+import org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory;
+import org.springframework.integration.ip.tcp.serializer.ByteArrayRawSerializer;
+import org.springframework.integration.ip.tcp.sockjs.serializer.XHRStreamingChunkDeserializer;
+import org.springframework.integration.ip.tcp.sockjs.support.SockJsFrame;
+
+/**
+ * @author Gary Russell
+ * @since 2.2
+ *
+ */
+public class SockJsTemplateTests {
+
+ @Test
+ public void testXHRStream() throws Exception {
+ AbstractClientConnectionFactory ccf = new TcpNetClientConnectionFactory("localhost", 8081);
+// AbstractClientConnectionFactory ccf = new TcpNetClientConnectionFactory("echo-test.cloudfoundry.com", 80);
+ testXHSStreamGuts(ccf);
+ }
+
+ @Test
+ public void testXHRStreamNIO() throws Exception {
+ AbstractClientConnectionFactory ccf = new TcpNioClientConnectionFactory("localhost", 8081);
+ testXHSStreamGuts(ccf);
+ }
+
+ @Test
+ public void testXHRStreamGzip() throws Exception {
+// AbstractClientConnectionFactory ccf = new TcpNetClientConnectionFactory("localhost", 8081);
+ AbstractClientConnectionFactory ccf = new TcpNetClientConnectionFactory("echo-test.cloudfoundry.com", 80);
+ testXHSStreamGuts(ccf, true);
+ }
+
+ private void testXHSStreamGuts(AbstractClientConnectionFactory ccf) throws Exception {
+ testXHSStreamGuts(ccf, false);
+ }
+
+ private void testXHSStreamGuts(AbstractClientConnectionFactory ccf, boolean gzipping) throws Exception {
+ ccf.setDeserializer(new XHRStreamingChunkDeserializer());
+ ccf.setSerializer(new ByteArrayRawSerializer());
+ ccf.setSoTimeout(60000);
+ SockJsTemplate template = new SockJsTemplate(ccf);
+ template.setGzipping(gzipping);
+ ccf.start();
+ final List<String> uuids = new ArrayList<String>();
+ final List<String> results = new ArrayList<String>();
+
+ SockJsContext context = template.startStream("/echo/000", new SockJsCallback() {
+
+ public void data(SockJsFrame frame, String uuid) {
+ System.out.println("Received data: " + frame);
+ results.add(frame.getPayload());
+ }
+
+ public void control(SockJsFrame frame, String uuid) {
+ System.out.println("Received control: " + frame);
+ if (frame.getType() == SockJsFrame.TYPE_OPEN) {
+ uuids.add(uuid);
+ }
+ }
+
+ public void closed(String uuid) {
+ System.out.println("Closed");
+ }
+ });
+
+ int n = 0;
+ while (context.getCookies() == null) {
+ Thread.sleep(100);
+ if (n++ > 100) {
+ fail("Failed to receive cookies");
+ }
+ }
+ n = 0;
+ while (uuids.size() < 1) {
+ Thread.sleep(100);
+ if (n++ > 100) {
+ fail("Open was not received");
+ }
+ }
+ for (int i = 0; i < 4; i++) {
+ Thread.sleep(1000);
+ SockJsFrame frame = template.sendXhr("/echo/000", uuids.get(0),
+ "[\"" + new String(new char[128]).replace('\0', 'x') +
+ "\"]", context);
+ System.out.println(frame.getPayload());
+ }
+ assertEquals(4, results.size());
+ while (results.size() > 0) {
+ assertEquals("[\"" + new String(new char[128]).replace('\0', 'x') + "\"]", results.remove(0));
+ }
+ }
+
+}
diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/sockjs/seralizer/XHRStreamingChunkDeserializerTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/sockjs/seralizer/XHRStreamingChunkDeserializerTests.java
new file mode 100644
index 0000000..888fe52
--- /dev/null
+++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/sockjs/seralizer/XHRStreamingChunkDeserializerTests.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2002-2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.integration.ip.tcp.sockjs.seralizer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.util.List;
+
+import org.junit.Test;
+import org.springframework.integration.ip.tcp.sockjs.serializer.XHRStreamingChunkDeserializer;
+import org.springframework.integration.ip.tcp.sockjs.support.SockJsFrame;
+
+/**
+ * @author Gary Russell
+ * @since 2.1
+ *
+ */
+public class XHRStreamingChunkDeserializerTests {
+
+ @Test
+ public void test() throws Exception {
+ String test = "HTTP\r\nHeaders\r\n\r\nb\r\naabcdefghi\n\r\n";
+ ByteArrayInputStream bais = new ByteArrayInputStream(test.getBytes());
+ XHRStreamingChunkDeserializer xhrStreamingChunkDeserializer = new XHRStreamingChunkDeserializer();
+ List<SockJsFrame> deserialize = xhrStreamingChunkDeserializer.deserialize(bais);
+ assertEquals("HTTP\r\nHeaders\r\n\r\n", deserialize.get(0).getPayload());
+ deserialize = xhrStreamingChunkDeserializer.deserialize(bais);
+ assertEquals("abcdefghi", deserialize.get(0).getPayload());
+ }
+
+}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment