Created
March 21, 2012 14:48
-
-
Save majek/2147792 to your computer and use it in GitHub Desktop.
spring-integration sockjs vs master
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractClientConnectionFactory.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractClientConnectionFactory.java | |
index 7b4252f..87e462d 100644 | |
--- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractClientConnectionFactory.java | |
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractClientConnectionFactory.java | |
@@ -57,6 +57,20 @@ public abstract class AbstractClientConnectionFactory extends AbstractConnection | |
} | |
} | |
+ /** | |
+ * Get a dedicated long-lived connection - regardless of single-use | |
+ * setting. Caller is responsible to close. | |
+ * @return | |
+ * @throws Exception | |
+ */ | |
+ public abstract TcpConnection getNewConnection() throws Exception; | |
+ | |
+ | |
+ /** | |
+ * Gets the single or new connection. | |
+ * @return | |
+ * @throws Exception | |
+ */ | |
protected abstract TcpConnection getOrMakeConnection() throws Exception; | |
/** | |
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractTcpConnectionInterceptor.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractTcpConnectionInterceptor.java | |
index fa22449..4c1cf52 100644 | |
--- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractTcpConnectionInterceptor.java | |
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractTcpConnectionInterceptor.java | |
@@ -16,6 +16,9 @@ | |
package org.springframework.integration.ip.tcp.connection; | |
+import java.io.IOException; | |
+import java.io.InputStream; | |
+ | |
import org.springframework.core.serializer.Deserializer; | |
import org.springframework.core.serializer.Serializer; | |
import org.springframework.integration.Message; | |
@@ -177,4 +180,8 @@ public abstract class AbstractTcpConnectionInterceptor implements TcpConnectionI | |
return this.realSender; | |
} | |
+ public InputStream getInputStream() throws IOException { | |
+ return theConnection.getInputStream(); | |
+ } | |
+ | |
} | |
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnection.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnection.java | |
index ebc9f79..34891c1 100644 | |
--- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnection.java | |
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnection.java | |
@@ -16,6 +16,8 @@ | |
package org.springframework.integration.ip.tcp.connection; | |
+import java.io.IOException; | |
+import java.io.InputStream; | |
import java.net.Socket; | |
import java.nio.channels.SocketChannel; | |
@@ -148,5 +150,10 @@ public interface TcpConnection extends Runnable { | |
* @return the next sequence number for a message received on this socket | |
*/ | |
long incrementAndGetConnectionSequence(); | |
- | |
+ | |
+ /** | |
+ * @return the Inputstream | |
+ * @throws IOException | |
+ */ | |
+ InputStream getInputStream() throws IOException; | |
} | |
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNetClientConnectionFactory.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNetClientConnectionFactory.java | |
index 66ea8ec..9c825c9 100644 | |
--- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNetClientConnectionFactory.java | |
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNetClientConnectionFactory.java | |
@@ -50,6 +50,11 @@ public class TcpNetClientConnectionFactory extends | |
if (theConnection != null && theConnection.isOpen()) { | |
return theConnection; | |
} | |
+ return getNewConnection(); | |
+ } | |
+ | |
+ @Override | |
+ public TcpConnection getNewConnection() throws Exception { | |
if (logger.isDebugEnabled()) { | |
logger.debug("Opening new socket connection to " + this.getHost() + ":" + this.getPort()); | |
} | |
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNetConnection.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNetConnection.java | |
index e2a877e..0ea7b7a 100644 | |
--- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNetConnection.java | |
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNetConnection.java | |
@@ -16,6 +16,8 @@ | |
package org.springframework.integration.ip.tcp.connection; | |
+import java.io.IOException; | |
+import java.io.InputStream; | |
import java.net.Socket; | |
import java.net.SocketTimeoutException; | |
@@ -132,6 +134,7 @@ public class TcpNetConnection extends AbstractTcpConnection { | |
logger.debug("Message received " + message); | |
} | |
try { | |
+ listener = this.getListener(); | |
if (listener == null) { | |
logger.warn("Unexpected message - no inbound adapter registered with connection " + message); | |
continue; | |
@@ -163,4 +166,11 @@ public class TcpNetConnection extends AbstractTcpConnection { | |
} | |
} | |
+ public InputStream getInputStream() throws IOException { | |
+ if (this.socket != null) { | |
+ return socket.getInputStream(); | |
+ } | |
+ return null; | |
+ } | |
+ | |
} | |
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNioClientConnectionFactory.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNioClientConnectionFactory.java | |
index ede4db7..2b91e48 100644 | |
--- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNioClientConnectionFactory.java | |
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNioClientConnectionFactory.java | |
@@ -64,6 +64,15 @@ public class TcpNioClientConnectionFactory extends | |
* @throws SocketException | |
*/ | |
protected TcpConnection getOrMakeConnection() throws Exception { | |
+ TcpConnection theConnection = this.getTheConnection(); | |
+ if (theConnection != null && theConnection.isOpen()) { | |
+ return theConnection; | |
+ } | |
+ return getNewConnection(); | |
+ } | |
+ | |
+ @Override | |
+ public TcpConnection getNewConnection() throws Exception { | |
int n = 0; | |
while (this.selector == null) { | |
try { | |
@@ -75,10 +84,6 @@ public class TcpNioClientConnectionFactory extends | |
throw new Exception("Factory failed to start"); | |
} | |
} | |
- TcpConnection theConnection = this.getTheConnection(); | |
- if (theConnection != null && theConnection.isOpen()) { | |
- return theConnection; | |
- } | |
if (logger.isDebugEnabled()) { | |
logger.debug("Opening new socket channel connection to " + this.getHost() + ":" + this.getPort()); | |
} | |
@@ -94,8 +99,8 @@ public class TcpNioClientConnectionFactory extends | |
connection.setLastRead(System.currentTimeMillis()); | |
} | |
this.connections.put(socketChannel, connection); | |
- newChannels.add(socketChannel); | |
- selector.wakeup(); | |
+ this.newChannels.add(socketChannel); | |
+ this.selector.wakeup(); | |
return wrappedConnection; | |
} | |
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNioConnection.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNioConnection.java | |
index fe1b94a..d50c5ba 100644 | |
--- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNioConnection.java | |
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNioConnection.java | |
@@ -17,6 +17,7 @@ | |
package org.springframework.integration.ip.tcp.connection; | |
import java.io.IOException; | |
+import java.io.InputStream; | |
import java.io.OutputStream; | |
import java.io.PipedInputStream; | |
import java.io.PipedOutputStream; | |
@@ -358,6 +359,10 @@ public class TcpNioConnection extends AbstractTcpConnection { | |
this.lastRead = lastRead; | |
} | |
+ public InputStream getInputStream() throws IOException { | |
+ return pipedInputStream; | |
+ } | |
+ | |
/** | |
* OutputStream to wrap a SocketChannel; implements timeout on write. | |
* | |
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/AbstractByteArraySerializer.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/AbstractByteArraySerializer.java | |
index f630962..83bdcbb 100644 | |
--- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/AbstractByteArraySerializer.java | |
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/AbstractByteArraySerializer.java | |
@@ -64,4 +64,20 @@ public abstract class AbstractByteArraySerializer implements | |
} | |
} | |
+ /** | |
+ * Copy size bytes to a new buffer exactly size bytes long. | |
+ * @param buffer The buffer containing the data. | |
+ * @param size The number of bytes to copy. | |
+ * @return The new buffer, or the buffer parameter if it is | |
+ * already the correct size. | |
+ */ | |
+ protected byte[] copyToSizedArray(byte[] buffer, int size) { | |
+ if (size == buffer.length) { | |
+ return buffer; | |
+ } | |
+ byte[] assembledData = new byte[size]; | |
+ System.arraycopy(buffer, 0, assembledData, 0, size); | |
+ return assembledData; | |
+ } | |
+ | |
} | |
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/ByteArrayCrLfSerializer.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/ByteArrayCrLfSerializer.java | |
index 596e2ef..8b015ca 100644 | |
--- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/ByteArrayCrLfSerializer.java | |
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/ByteArrayCrLfSerializer.java | |
@@ -38,6 +38,13 @@ public class ByteArrayCrLfSerializer extends AbstractByteArraySerializer { | |
*/ | |
public byte[] deserialize(InputStream inputStream) throws IOException { | |
byte[] buffer = new byte[this.maxMessageSize]; | |
+ int n = this.fillToCrLf(inputStream, buffer); | |
+ byte[] assembledData = this.copyToSizedArray(buffer, n); | |
+ return assembledData; | |
+ } | |
+ | |
+ public int fillToCrLf(InputStream inputStream, byte[] buffer) | |
+ throws IOException, SoftEndOfStreamException { | |
int n = 0; | |
int bite; | |
if (logger.isDebugEnabled()) { | |
@@ -59,9 +66,7 @@ public class ByteArrayCrLfSerializer extends AbstractByteArraySerializer { | |
+ this.maxMessageSize); | |
} | |
}; | |
- byte[] assembledData = new byte[n-1]; | |
- System.arraycopy(buffer, 0, assembledData, 0, n-1); | |
- return assembledData; | |
+ return n-1; // trim \r | |
} | |
/** | |
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/ByteArrayRawSerializer.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/ByteArrayRawSerializer.java | |
index 97a2c3e..82df9f4 100644 | |
--- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/ByteArrayRawSerializer.java | |
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/ByteArrayRawSerializer.java | |
@@ -61,8 +61,7 @@ public class ByteArrayRawSerializer extends AbstractByteArraySerializer { | |
+ this.maxMessageSize); | |
} | |
}; | |
- byte[] assembledData = new byte[n]; | |
- System.arraycopy(buffer, 0, assembledData, 0, n); | |
+ byte[] assembledData = this.copyToSizedArray(buffer, n); | |
return assembledData; | |
} | |
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/ByteArrayStxEtxSerializer.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/ByteArrayStxEtxSerializer.java | |
index 429a393..70d23ca 100644 | |
--- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/ByteArrayStxEtxSerializer.java | |
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/ByteArrayStxEtxSerializer.java | |
@@ -62,8 +62,7 @@ public class ByteArrayStxEtxSerializer extends AbstractByteArraySerializer { | |
+ this.maxMessageSize); | |
} | |
} | |
- byte[] assembledData = new byte[n]; | |
- System.arraycopy(buffer, 0, assembledData, 0, n); | |
+ byte[] assembledData = this.copyToSizedArray(buffer, n); | |
return assembledData; | |
} | |
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/StatefulDeserializer.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/StatefulDeserializer.java | |
new file mode 100644 | |
index 0000000..d66f0a5 | |
--- /dev/null | |
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/serializer/StatefulDeserializer.java | |
@@ -0,0 +1,30 @@ | |
+/* | |
+ * Copyright 2002-2012 the original author or authors. | |
+ * | |
+ * Licensed under the Apache License, Version 2.0 (the "License"); | |
+ * you may not use this file except in compliance with the License. | |
+ * You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+package org.springframework.integration.ip.tcp.serializer; | |
+ | |
+import java.io.InputStream; | |
+ | |
+import org.springframework.core.serializer.Deserializer; | |
+ | |
+/** | |
+ * @author Gary Russell | |
+ * @since 2.2 | |
+ * | |
+ */ | |
+public interface StatefulDeserializer<T> extends Deserializer<T> { | |
+ | |
+ void removeState(InputStream inputStream); | |
+} | |
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsCallback.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsCallback.java | |
new file mode 100644 | |
index 0000000..f46f998 | |
--- /dev/null | |
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsCallback.java | |
@@ -0,0 +1,33 @@ | |
+/* | |
+ * Copyright 2002-2012 the original author or authors. | |
+ * | |
+ * Licensed under the Apache License, Version 2.0 (the "License"); | |
+ * you may not use this file except in compliance with the License. | |
+ * You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+package org.springframework.integration.ip.tcp.sockjs; | |
+ | |
+import org.springframework.integration.ip.tcp.sockjs.support.SockJsFrame; | |
+ | |
+/** | |
+ * @author Gary Russell | |
+ * @since 2.1 | |
+ * | |
+ */ | |
+public interface SockJsCallback { | |
+ | |
+ void data(SockJsFrame frame, String uuid); | |
+ | |
+ void control(SockJsFrame frame, String uuid); | |
+ | |
+ void closed(String uuid); | |
+ | |
+} | |
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsContext.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsContext.java | |
new file mode 100644 | |
index 0000000..6190895 | |
--- /dev/null | |
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsContext.java | |
@@ -0,0 +1,45 @@ | |
+/* | |
+ * Copyright 2002-2012 the original author or authors. | |
+ * | |
+ * Licensed under the Apache License, Version 2.0 (the "License"); | |
+ * you may not use this file except in compliance with the License. | |
+ * You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+package org.springframework.integration.ip.tcp.sockjs; | |
+ | |
+/** | |
+ * @author Gary Russell | |
+ * @since 2.2 | |
+ * | |
+ */ | |
+public class SockJsContext { | |
+ | |
+ private volatile String cookies; | |
+ | |
+ private final String uuid; | |
+ | |
+ public SockJsContext(String uuid) { | |
+ this.uuid = uuid; | |
+ } | |
+ | |
+ void setCookies(String cookies) { | |
+ this.cookies = cookies; | |
+ } | |
+ | |
+ String getCookies() { | |
+ return cookies; | |
+ } | |
+ | |
+ String getUuid() { | |
+ return uuid; | |
+ } | |
+ | |
+} | |
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsOperations.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsOperations.java | |
new file mode 100644 | |
index 0000000..54f5e1f | |
--- /dev/null | |
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsOperations.java | |
@@ -0,0 +1,33 @@ | |
+/* | |
+ * Copyright 2002-2012 the original author or authors. | |
+ * | |
+ * Licensed under the Apache License, Version 2.0 (the "License"); | |
+ * you may not use this file except in compliance with the License. | |
+ * You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+package org.springframework.integration.ip.tcp.sockjs; | |
+ | |
+import org.springframework.integration.ip.tcp.sockjs.support.SockJsFrame; | |
+ | |
+/** | |
+ * @author Gary Russell | |
+ * @since 2.2 | |
+ * | |
+ */ | |
+public interface SockJsOperations { | |
+ | |
+ public abstract SockJsContext startStream(String baseResource, | |
+ final SockJsCallback callback); | |
+ | |
+ public abstract SockJsFrame sendXhr(String baseResource, String uuid, | |
+ String data, SockJsContext context); | |
+ | |
+} | |
\ No newline at end of file | |
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsTemplate.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsTemplate.java | |
new file mode 100644 | |
index 0000000..234adc9 | |
--- /dev/null | |
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsTemplate.java | |
@@ -0,0 +1,176 @@ | |
+/* | |
+ * Copyright 2002-2012 the original author or authors. | |
+ * | |
+ * Licensed under the Apache License, Version 2.0 (the "License"); | |
+ * you may not use this file except in compliance with the License. | |
+ * You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+package org.springframework.integration.ip.tcp.sockjs; | |
+ | |
+import java.io.IOException; | |
+import java.util.List; | |
+import java.util.UUID; | |
+import java.util.concurrent.BlockingQueue; | |
+import java.util.concurrent.LinkedBlockingQueue; | |
+import java.util.concurrent.TimeUnit; | |
+ | |
+import org.apache.commons.logging.Log; | |
+import org.apache.commons.logging.LogFactory; | |
+import org.springframework.core.serializer.Deserializer; | |
+import org.springframework.integration.Message; | |
+import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory; | |
+import org.springframework.integration.ip.tcp.connection.TcpConnection; | |
+import org.springframework.integration.ip.tcp.connection.TcpListener; | |
+import org.springframework.integration.ip.tcp.connection.TcpSender; | |
+import org.springframework.integration.ip.tcp.serializer.StatefulDeserializer; | |
+import org.springframework.integration.ip.tcp.sockjs.support.SockJsFrame; | |
+import org.springframework.integration.message.GenericMessage; | |
+import org.springframework.util.Assert; | |
+ | |
+/** | |
+ * @author Gary Russell | |
+ * @since 2.2 | |
+ * | |
+ */ | |
+public class SockJsTemplate implements TcpListener, SockJsOperations { | |
+ | |
+ private final Log logger = LogFactory.getLog(this.getClass()); | |
+ | |
+ private final AbstractClientConnectionFactory connectionFactory; | |
+ | |
+ private boolean gzipping; | |
+ | |
+ public SockJsTemplate(AbstractClientConnectionFactory connectionFactory) { | |
+ this.connectionFactory = connectionFactory; | |
+ this.connectionFactory.registerListener(this); | |
+ } | |
+ | |
+ boolean isGzipping() { | |
+ return gzipping; | |
+ } | |
+ | |
+ void setGzipping(boolean gzipping) { | |
+ this.gzipping = gzipping; | |
+ } | |
+ | |
+ public SockJsContext startStream(String baseResource, final SockJsCallback callback) { | |
+ final String uuid = UUID.randomUUID().toString(); | |
+ SockJsContext sockJsContext = new SockJsContext(uuid); | |
+ try { | |
+ TcpConnection connection = this.connectionFactory.getNewConnection(); | |
+ registerListener(connection, callback, uuid, sockJsContext); | |
+ registerSender(connection, callback, uuid); | |
+ connection.send(new GenericMessage<String>( | |
+ "POST " + baseResource + "/" + uuid + "/xhr_streaming HTTP/1.1\r\n" + | |
+ "Host: " + this.connectionFactory.getHost() + "\r\n" + | |
+ "Connection: keep-alive\r\n" + | |
+ "Accept-Encoding: " + (this.gzipping ? "gzip" : "identity") + "\r\n" + | |
+ "Content-Length: 0\r\n" + | |
+ "\r\n")); | |
+ } | |
+ catch (Exception e) { | |
+ throw new RuntimeException(e); | |
+ } | |
+ return sockJsContext; | |
+ } | |
+ | |
+ private void registerSender(TcpConnection connection, | |
+ final SockJsCallback callback, final String uuid) { | |
+ connection.registerSender(new TcpSender() { | |
+ | |
+ public void addNewConnection(TcpConnection connection) { | |
+ } | |
+ | |
+ public void removeDeadConnection(TcpConnection connection) { | |
+ callback.closed(uuid); | |
+ Deserializer<?> deserializer = connectionFactory.getDeserializer(); | |
+ if (deserializer instanceof StatefulDeserializer) { | |
+ try { | |
+ ((StatefulDeserializer<?>) deserializer).removeState(connection.getInputStream()); | |
+ } catch (IOException e) { | |
+ // TODO Auto-generated catch block | |
+ e.printStackTrace(); | |
+ } | |
+ } | |
+ } | |
+ }); | |
+ } | |
+ | |
+ private void registerListener(final TcpConnection connection, | |
+ final SockJsCallback callback, final String uuid, final SockJsContext sockJsContext) { | |
+ connection.registerListener(new TcpListener() { | |
+ | |
+ @SuppressWarnings("unchecked") | |
+ public boolean onMessage(Message<?> message) { | |
+ for (SockJsFrame frame : (List<SockJsFrame>) message.getPayload()) { | |
+ switch (frame.getType()) { | |
+ case SockJsFrame.TYPE_COOKIES: | |
+ sockJsContext.setCookies(frame.getPayload()); | |
+ break; | |
+ case SockJsFrame.TYPE_CLOSE: | |
+ connection.close(); | |
+ callback.closed(uuid); | |
+ break; | |
+ case SockJsFrame.TYPE_HEADERS: | |
+ case SockJsFrame.TYPE_HEARTBEAT: | |
+ case SockJsFrame.TYPE_OPEN: | |
+ case SockJsFrame.TYPE_PING: | |
+ case SockJsFrame.TYPE_PONG: | |
+ case SockJsFrame.TYPE_PRELUDE: | |
+ case SockJsFrame.TYPE_UNEXPECTED: | |
+ callback.control(frame, uuid); | |
+ break; | |
+ case SockJsFrame.TYPE_DATA: | |
+ callback.data(frame, uuid); | |
+ } | |
+ } | |
+ return false; | |
+ } | |
+ }); | |
+ } | |
+ | |
+ public boolean onMessage(Message<?> message) { | |
+ logger.error("Should not be called"); | |
+ return false; | |
+ } | |
+ | |
+ public SockJsFrame sendXhr(String baseResource, String uuid, String data, SockJsContext context) { | |
+ try { | |
+ TcpConnection connection = this.connectionFactory.getNewConnection(); | |
+ final BlockingQueue<SockJsFrame> reply = new LinkedBlockingQueue<SockJsFrame>(); | |
+ connection.registerListener(new TcpListener() { | |
+ public boolean onMessage(Message<?> message) { | |
+ @SuppressWarnings("unchecked") | |
+ SockJsFrame frame = (SockJsFrame) ((List<SockJsFrame>) message.getPayload()).get(0); | |
+ Assert.isTrue(frame.getType() == SockJsFrame.TYPE_HEADERS); | |
+ reply.offer(frame); | |
+ return false; | |
+ } | |
+ }); | |
+ connection.send(new GenericMessage<String>( | |
+ "POST " + baseResource + "/" + uuid + "/xhr_send HTTP/1.1\r\n" + | |
+ "Host: " + this.connectionFactory.getHost() + "\r\n" + | |
+ "Accept-Encoding: identity\r\n" + | |
+ context.getCookies() + "\r\n" + | |
+ "Content-Length: " + data.length() + "\r\n" + | |
+ "\r\n" + | |
+ data)); | |
+ SockJsFrame frame = reply.poll(10, TimeUnit.SECONDS); | |
+ connection.close(); | |
+ return frame; | |
+ } | |
+ catch (Exception e) { | |
+ throw new RuntimeException(e); | |
+ } | |
+ } | |
+ | |
+ | |
+} | |
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsWebSocketClient.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsWebSocketClient.java | |
new file mode 100644 | |
index 0000000..7c7f1b5 | |
--- /dev/null | |
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsWebSocketClient.java | |
@@ -0,0 +1,118 @@ | |
+/* | |
+ * Copyright 2002-2012 the original author or authors. | |
+ * | |
+ * Licensed under the Apache License, Version 2.0 (the "License"); | |
+ * you may not use this file except in compliance with the License. | |
+ * You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+package org.springframework.integration.ip.tcp.sockjs; | |
+ | |
+import java.io.BufferedReader; | |
+import java.io.IOException; | |
+import java.io.InputStreamReader; | |
+import java.net.Socket; | |
+import java.util.concurrent.Executors; | |
+ | |
+import javax.net.SocketFactory; | |
+ | |
+import org.springframework.integration.ip.tcp.sockjs.serializer.WebSocketSerializer; | |
+import org.springframework.integration.ip.tcp.sockjs.support.SockJsFrame; | |
+ | |
+/** | |
+ * @author Gary Russell | |
+ * @since 2.2 | |
+ * | |
+ */ | |
+public class SockJsWebSocketClient { | |
+ | |
+ private WebSocketSerializer serializer = new WebSocketSerializer(); | |
+ | |
+ public static void main(String[] args) throws Exception { | |
+ new SockJsWebSocketClient().start(); | |
+ } | |
+ | |
+ public void start() throws Exception { | |
+ String init = | |
+ "GET /echo/517/p8e90wok/websocket HTTP/1.1\r\n" + | |
+ "Upgrade: websocket\r\n" + | |
+ "Connection: Upgrade\r\n" + | |
+ "Host: localhost:9999\r\n" + | |
+ "Origin: http://localhost:9999\r\n" + | |
+ "Sec-WebSocket-Key: nGahJFI1wv1Vn/QW5TdFvg==\r\n" + | |
+ "Sec-WebSocket-Version: 13\r\n\r\n"; | |
+ Socket sock = SocketFactory.getDefault().createSocket("localhost", 8081); | |
+ sock.getOutputStream().write(init.getBytes()); | |
+ handleUpgrade(sock); | |
+ Executors.newSingleThreadExecutor().execute(new SocksJSWebSocketReader(sock)); | |
+ while(true) { | |
+ Thread.sleep(10000); | |
+ send(sock, "Hello, world!"); | |
+ } | |
+ } | |
+ | |
+ private void send(Socket sock, String string) throws IOException { | |
+ String data = "[\"" + string + "\"]"; | |
+ doSend(sock, data); | |
+ } | |
+ | |
+ private void doSend(Socket sock, String data) throws IOException { | |
+ System.out.println("Sending... " + data); | |
+ this.serializer.serialize(data, sock.getOutputStream()); | |
+ } | |
+ | |
+ private void handleUpgrade(Socket sock) throws Exception { | |
+ BufferedReader reader = new BufferedReader(new InputStreamReader(sock.getInputStream())); | |
+ while (true) { | |
+ String s = reader.readLine(); | |
+ System.out.println(s); | |
+ if (s.length() == 0) { | |
+ break; | |
+ } | |
+ } | |
+ } | |
+ | |
+ private class SocksJSWebSocketReader implements Runnable { | |
+ | |
+ private Socket sock; | |
+ | |
+ private SocksJSWebSocketReader(Socket sock) { | |
+ this.sock = sock; | |
+ } | |
+ | |
+ public void run() { | |
+ while (true) { | |
+ try { | |
+ SockJsFrame frame = serializer.deserialize(this.sock.getInputStream()); | |
+ if (frame.getType() == SockJsFrame.TYPE_CLOSE) { | |
+ sock.close(); | |
+ } | |
+ else if (frame.getType() == SockJsFrame.TYPE_PING) { | |
+ this.sendPong(frame.getPayload()); | |
+ } | |
+ } catch (IOException e) { | |
+ e.printStackTrace(); | |
+ try { | |
+ this.sock.close(); | |
+ } catch (IOException e1) { | |
+ e1.printStackTrace(); | |
+ throw new RuntimeException(e1); | |
+ } | |
+ throw new RuntimeException(e); | |
+ } | |
+ } | |
+ } | |
+ | |
+ private void sendPong(String data) throws IOException { | |
+ doSend(sock, "pong:" + data); | |
+ } | |
+ } | |
+ | |
+} | |
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsXHRStreamingClient.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsXHRStreamingClient.java | |
new file mode 100644 | |
index 0000000..11c3828 | |
--- /dev/null | |
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/SockJsXHRStreamingClient.java | |
@@ -0,0 +1,176 @@ | |
+/* | |
+ * Copyright 2002-2012 the original author or authors. | |
+ * | |
+ * Licensed under the Apache License, Version 2.0 (the "License"); | |
+ * you may not use this file except in compliance with the License. | |
+ * You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+package org.springframework.integration.ip.tcp.sockjs; | |
+ | |
+import java.io.IOException; | |
+import java.io.InputStream; | |
+import java.net.Socket; | |
+import java.util.List; | |
+import java.util.Map; | |
+import java.util.UUID; | |
+import java.util.concurrent.ConcurrentHashMap; | |
+import java.util.concurrent.ExecutorService; | |
+import java.util.concurrent.Executors; | |
+ | |
+import javax.net.SocketFactory; | |
+ | |
+import org.springframework.integration.ip.tcp.serializer.ByteArrayCrLfSerializer; | |
+import org.springframework.integration.ip.tcp.serializer.SoftEndOfStreamException; | |
+import org.springframework.integration.ip.tcp.sockjs.serializer.XHRStreamingChunkDeserializer; | |
+import org.springframework.integration.ip.tcp.sockjs.support.SockJsFrame; | |
+ | |
+/** | |
+ * @author Gary Russell | |
+ * @since 2.2 | |
+ * | |
+ */ | |
+public class SockJsXHRStreamingClient { | |
+ | |
+ private XHRStreamingChunkDeserializer deserializer = new XHRStreamingChunkDeserializer(); | |
+ | |
+ private ByteArrayCrLfSerializer crlfDeserializer = new ByteArrayCrLfSerializer(); | |
+ | |
+ Map<String, String> cookies = new ConcurrentHashMap<String, String>(); // TODO: needs to be nicer than this | |
+ | |
+ public static void main(String[] args) throws Exception { | |
+ new SockJsXHRStreamingClient().start(); | |
+ } | |
+ | |
+ public void start() throws Exception { | |
+ int port = 80; | |
+// int port = 8081; | |
+// String host = "localhost"; | |
+ String host = "echo-test.cloudfoundry.com"; | |
+// String host = "192.168.222.132"; | |
+ String uuid = UUID.randomUUID().toString(); | |
+ String init = | |
+ "POST /echo/000/" + uuid + "/xhr_streaming HTTP/1.1\r\n" + | |
+ "Host: " + host + "\r\n" + | |
+ "Connection: keep-alive\r\n" + | |
+ "Accept-Encoding: identity\r\n" + | |
+ "Content-Length: 0\r\n" + | |
+ "\r\n"; | |
+ Socket sock = SocketFactory.getDefault().createSocket(host, port); | |
+ InputStream inputStream = sock.getInputStream(); | |
+ ExecutorService executor = Executors.newSingleThreadExecutor(); | |
+ executor.execute(new SocksJSXHRStreamingReader(sock, uuid)); | |
+ sock.getOutputStream().write(init.getBytes()); | |
+ String statusLine = "HTTP/1.1 204 No Content"; | |
+ int count = 0; | |
+ do { | |
+ Thread.sleep(1000); | |
+ if (sock.isClosed()) { | |
+ break; | |
+ } | |
+ if (cookies.get(uuid) == null) { | |
+ System.out.println("No cookies yet"); | |
+ continue; | |
+ } | |
+ Socket sender = SocketFactory.getDefault().createSocket(host, port); | |
+ statusLine = send(sender, uuid, host); | |
+ } | |
+ while (statusLine.equals("HTTP/1.1 204 No Content") && count++ < 40); | |
+ this.deserializer.removeState(inputStream); | |
+ sock.close(); | |
+ executor.shutdown(); | |
+ } | |
+ | |
+ private String readHeaders(Socket sock) throws IOException { | |
+ String statusLine = new String(this.crlfDeserializer.deserialize(sock.getInputStream())); | |
+ while (true) { | |
+ String s = new String(this.crlfDeserializer.deserialize(sock.getInputStream())); | |
+// System.out.println(s); | |
+ if (s.length() == 0) { | |
+ break; | |
+ } | |
+ } | |
+// System.out.println("Read headers"); | |
+// System.out.println(statusLine); | |
+ return statusLine; | |
+ } | |
+ | |
+ private String send(Socket sock, String uuid, String host) throws IOException { | |
+ String content = "[\"" + new String(new char[128]).replace('\0', 'x') + "\"]"; | |
+ String sendData = | |
+ "POST /echo/000/" + uuid + "/xhr_send HTTP/1.1\r\n" + | |
+ "Host: " + host + "\r\n" + | |
+ "Accept-Encoding: identity\r\n" + | |
+ this.cookies.get(uuid) + "\r\n" + | |
+ "Content-Length: " + content.length() + "\r\n" + | |
+ "\r\n" + | |
+ content; | |
+ System.out.println("Sending... " + content); | |
+ sock.getOutputStream().write(sendData.getBytes()); | |
+ String statusLine = readHeaders(sock); | |
+ sock.close(); | |
+ return statusLine; | |
+ } | |
+ | |
+ private class SocksJSXHRStreamingReader implements Runnable { | |
+ | |
+ private final Socket sock; | |
+ | |
+ private final String uuid; | |
+ | |
+ private SocksJSXHRStreamingReader(Socket sock, String uuid) { | |
+ this.sock = sock; | |
+ this.uuid = uuid; | |
+ } | |
+ | |
+ public void run() { | |
+ try { | |
+ while (true) { | |
+ try { | |
+ List<SockJsFrame> frames = deserializer.deserialize(this.sock.getInputStream()); | |
+ for (SockJsFrame frame : frames) { | |
+ if (frame.getType() == SockJsFrame.TYPE_CLOSE) { | |
+ sock.close(); | |
+ } | |
+ else if (frame.getType() == SockJsFrame.TYPE_COOKIES) { | |
+ SockJsXHRStreamingClient.this.cookies.put(this.uuid, frame.getPayload()); | |
+ } | |
+ } | |
+ } | |
+ catch (SoftEndOfStreamException seose) { | |
+ System.out.println("Stream closed"); | |
+ throw new RuntimeException(seose); | |
+ } | |
+ catch (IOException e) { | |
+ if(!("Socket closed".equals(e.getMessage()))) { | |
+ e.printStackTrace(); | |
+ throw new RuntimeException(e); | |
+ } | |
+ return; | |
+ } | |
+ } | |
+ } | |
+ catch (RuntimeException re) { | |
+ if (!(re.getCause() instanceof SoftEndOfStreamException)) { | |
+ re.printStackTrace(); | |
+ } | |
+ } | |
+ finally { | |
+ try { | |
+ this.sock.close(); | |
+ } catch (IOException e1) { | |
+ e1.printStackTrace(); | |
+ throw new RuntimeException(e1); | |
+ } | |
+ } | |
+ } | |
+ } | |
+ | |
+} | |
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/serializer/AbstractSockJsDeserializer.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/serializer/AbstractSockJsDeserializer.java | |
new file mode 100644 | |
index 0000000..192e3af | |
--- /dev/null | |
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/serializer/AbstractSockJsDeserializer.java | |
@@ -0,0 +1,184 @@ | |
+/* | |
+ * Copyright 2002-2012 the original author or authors. | |
+ * | |
+ * Licensed under the Apache License, Version 2.0 (the "License"); | |
+ * you may not use this file except in compliance with the License. | |
+ * You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+package org.springframework.integration.ip.tcp.sockjs.serializer; | |
+ | |
+import java.io.IOException; | |
+import java.io.InputStream; | |
+import java.util.ArrayList; | |
+import java.util.List; | |
+import java.util.Map; | |
+import java.util.concurrent.ConcurrentHashMap; | |
+import java.util.zip.GZIPInputStream; | |
+ | |
+import org.apache.commons.logging.Log; | |
+import org.apache.commons.logging.LogFactory; | |
+import org.springframework.integration.ip.tcp.serializer.ByteArrayCrLfSerializer; | |
+import org.springframework.integration.ip.tcp.serializer.StatefulDeserializer; | |
+import org.springframework.integration.ip.tcp.sockjs.support.SockJsFrame; | |
+ | |
+/** | |
+ * @author Gary Russell | |
+ * @since 2.1 | |
+ * | |
+ */ | |
+public abstract class AbstractSockJsDeserializer<T> implements StatefulDeserializer<T> { | |
+ | |
+ protected final Log logger = LogFactory.getLog(this.getClass()); | |
+ | |
+ protected final ByteArrayCrLfSerializer crlfDeserializer = new ByteArrayCrLfSerializer(); | |
+ | |
+ protected volatile int maxMessageSize = 2048; | |
+ | |
+ private final Map<InputStream, BasicState> streamState = new ConcurrentHashMap<InputStream, BasicState>(); | |
+ | |
+ void setMaxMessageSize(int maxMessageSize) { | |
+ this.maxMessageSize = maxMessageSize; | |
+ } | |
+ | |
+ protected BasicState getStreamState(InputStream inputStream) { | |
+ return streamState.get(inputStream); | |
+ } | |
+ | |
+ protected List<SockJsFrame> checkStreaming(InputStream inputStream) throws IOException { | |
+ BasicState isStreaming = this.streamState.get(inputStream); | |
+ if (isStreaming == null) { //Consume the headers - TODO - check status | |
+ StringBuilder headersBuilder = new StringBuilder(); | |
+ byte[] headers = new byte[this.maxMessageSize]; | |
+ int headersLength; | |
+ do { | |
+ headersLength = this.crlfDeserializer.fillToCrLf(inputStream, headers); | |
+ String header = new String(headers, 0, headersLength, "UTF-8"); | |
+ headersBuilder.append(header).append("\r\n"); | |
+ } | |
+ while (headersLength > 0); | |
+ BasicState basicState = new BasicState(); | |
+ List<SockJsFrame> decodedHeaders = decodeHeaders(headersBuilder.toString(), basicState); | |
+ this.streamState.put(inputStream, basicState); | |
+ return decodedHeaders; | |
+ } | |
+ return null; | |
+ } | |
+ | |
+ private List<SockJsFrame> decodeHeaders(String frameData, BasicState state) { | |
+ // TODO: Full header separation - mvc utils? | |
+ List<SockJsFrame> dataList = new ArrayList<SockJsFrame>(); | |
+ System.out.println("Received:Headers\r\n" + frameData); | |
+ String[] headers = frameData.split("\\r\\n"); | |
+ String cookies = "Cookie: "; | |
+ for (String header : headers) { | |
+ if (header.startsWith("Set-Cookie")) { | |
+ String[] bits = header.split(": *"); | |
+ cookies += bits[1] + "; "; | |
+ } | |
+ else if (header.startsWith("Content-Encoding:") && header.contains("gzip")) { | |
+ state.setGzipping(true); | |
+ } | |
+ } | |
+ System.out.println(cookies); | |
+ dataList.add(new SockJsFrame(SockJsFrame.TYPE_HEADERS, frameData)); | |
+ if (cookies.length() > 8) { | |
+ dataList.add(new SockJsFrame(SockJsFrame.TYPE_COOKIES, cookies)); | |
+ } | |
+ return dataList; | |
+ } | |
+ | |
+ protected void checkClosure(int bite) throws IOException { | |
+ if (bite < 0) { | |
+ logger.debug("Socket closed during message assembly"); | |
+ throw new IOException("Socket closed during message assembly"); | |
+ } | |
+ } | |
+ | |
+ public void removeState(InputStream inputStream) { | |
+ this.streamState.remove(inputStream); | |
+ } | |
+ | |
+ public abstract T deserialize(InputStream inputStream) throws IOException; | |
+ | |
+ protected SockJsFrame decodeToFrame(String data) { | |
+ if (data.length() == 1 && data.equals("h")) { | |
+ System.out.println("Received:SockJS-Heartbeat"); | |
+ return new SockJsFrame(SockJsFrame.TYPE_HEARTBEAT, data); | |
+ } | |
+ else if (data.length() == 0x800 && data.startsWith("hhhhhhhhhhhhh")) { | |
+ System.out.println("Received:SockJS-XHR-Prelude"); | |
+ return new SockJsFrame(SockJsFrame.TYPE_PRELUDE, data); | |
+ } | |
+ else if (data.length() == 1 && data.equals("o")) { | |
+ System.out.println("Received:SockJS-Open"); | |
+ return new SockJsFrame(SockJsFrame.TYPE_OPEN, data); | |
+ } | |
+ else if (data.length() > 0 && data.startsWith("c")) { | |
+ System.out.println("Received SockJS-Close:" + data.substring(1)); | |
+ return new SockJsFrame(SockJsFrame.TYPE_CLOSE, data.substring(1)); | |
+ } | |
+ else if (data.length() > 0 && data.startsWith("a")) { | |
+ System.out.println("Received data:" + data.substring(1)); | |
+ return new SockJsFrame(SockJsFrame.TYPE_DATA, data.substring(1)); | |
+ } | |
+ else { | |
+ System.out.println("Received unexpected:" + new String(data)); | |
+ return new SockJsFrame(SockJsFrame.TYPE_UNEXPECTED, data); | |
+ } | |
+ } | |
+ | |
+ public static class BasicState { | |
+ | |
+ private volatile boolean gzipping; | |
+ | |
+ private volatile GZIPInputStream gzipInputStream; | |
+ | |
+ private volatile GZIPFeederInputStream gzipFeederInputStream; | |
+ | |
+ boolean isGzipping() { | |
+ return gzipping; | |
+ } | |
+ | |
+ void setGzipping(boolean gzipping) { | |
+ this.gzipping = gzipping; | |
+ } | |
+ | |
+ GZIPInputStream getGzipInputStream() throws IOException { | |
+ if (this.gzipInputStream == null) { | |
+ this.gzipInputStream = new GZIPInputStream(this.gzipFeederInputStream); | |
+ } | |
+ return this.gzipInputStream; | |
+ } | |
+ | |
+ GZIPFeederInputStream getGzipFeederInputStream() { | |
+ if (this.gzipFeederInputStream == null) { | |
+ this.gzipFeederInputStream = new GZIPFeederInputStream(); | |
+ } | |
+ return gzipFeederInputStream; | |
+ } | |
+ } | |
+ | |
+ public static class GZIPFeederInputStream extends InputStream { | |
+ | |
+ private volatile InputStream inputStream; | |
+ | |
+ | |
+ void setInputStream(InputStream inputStream) { | |
+ this.inputStream = inputStream; | |
+ } | |
+ | |
+ @Override | |
+ public int read() throws IOException { | |
+ return this.inputStream.read(); | |
+ } | |
+ | |
+ } | |
+} | |
\ No newline at end of file | |
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/serializer/WebSocketSerializer.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/serializer/WebSocketSerializer.java | |
new file mode 100644 | |
index 0000000..2efcc48 | |
--- /dev/null | |
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/serializer/WebSocketSerializer.java | |
@@ -0,0 +1,221 @@ | |
+/* | |
+ * Copyright 2002-2012 the original author or authors. | |
+ * | |
+ * Licensed under the Apache License, Version 2.0 (the "License"); | |
+ * you may not use this file except in compliance with the License. | |
+ * You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+package org.springframework.integration.ip.tcp.sockjs.serializer; | |
+ | |
+import java.io.IOException; | |
+import java.io.InputStream; | |
+import java.io.OutputStream; | |
+import java.nio.ByteBuffer; | |
+import java.util.Map; | |
+import java.util.concurrent.ConcurrentHashMap; | |
+ | |
+import org.apache.commons.logging.Log; | |
+import org.apache.commons.logging.LogFactory; | |
+import org.springframework.core.serializer.Serializer; | |
+import org.springframework.integration.ip.tcp.serializer.SoftEndOfStreamException; | |
+import org.springframework.integration.ip.tcp.sockjs.support.SockJsFrame; | |
+ | |
+/** | |
+ * @author Gary Russell | |
+ * @since 2.2 | |
+ * | |
+ */ | |
+public class WebSocketSerializer extends AbstractSockJsDeserializer<SockJsFrame> implements Serializer<String> { | |
+ | |
+ private final Log logger = LogFactory.getLog(this.getClass()); | |
+ | |
+ private Map<InputStream, StringBuilder> fragments = new ConcurrentHashMap<InputStream, StringBuilder>(); | |
+ | |
+ public void removeFragments(InputStream inputStream) { | |
+ this.fragments.remove(inputStream); | |
+ } | |
+ | |
+ public void serialize(final String data, OutputStream outputStream) | |
+ throws IOException { | |
+ int lenBytes; | |
+ int payloadLen = 0x80; //masked | |
+ boolean pong = data.startsWith("pong:"); | |
+ String theData = pong ? data.substring(5) : data; | |
+ int length = theData.length(); | |
+ if (length >= Math.pow(2, 16)) { | |
+ lenBytes = 8; | |
+ payloadLen |= 127; | |
+ } | |
+ else if (length > 125) { | |
+ lenBytes = 2; | |
+ payloadLen |= 126; | |
+ } | |
+ else { | |
+ lenBytes = 0; | |
+ payloadLen |= length; | |
+ } | |
+ int mask = (int) System.currentTimeMillis(); | |
+ ByteBuffer buffer = ByteBuffer.allocate(length + 6 + lenBytes); | |
+ if (pong) { | |
+ buffer.put((byte) 0x8a); | |
+ } else { | |
+ // Final fragment; text | |
+ buffer.put((byte) 0x81); | |
+ } | |
+ buffer.put((byte) payloadLen); | |
+ if (lenBytes == 2) { | |
+ buffer.putShort((short) length); | |
+ } | |
+ else if (lenBytes == 8) { | |
+ buffer.putLong(length); | |
+ } | |
+ | |
+ buffer.putInt(mask); | |
+ byte[] maskBytes = new byte[4]; | |
+ buffer.position(buffer.position() - 4); | |
+ buffer.get(maskBytes); | |
+ byte[] bytes = theData.getBytes("UTF-8"); | |
+ for (int i = 0; i < bytes.length; i++) { | |
+ buffer.put((byte) (bytes[i] ^ maskBytes[i % 4])); | |
+ } | |
+ outputStream.write(buffer.array()); | |
+ } | |
+ | |
+ public SockJsFrame deserialize(InputStream inputStream) throws IOException { | |
+ int bite; | |
+ if (logger.isDebugEnabled()) { | |
+ logger.debug("Available to read:" + inputStream.available()); | |
+ } | |
+ boolean done = false; | |
+ int len = 0; | |
+ int n = 0; | |
+ int m = 0; | |
+ byte[] buffer = null; | |
+ boolean fin = false; | |
+ boolean ping = false; | |
+ boolean pong = false; | |
+ int lenBytes = 0; | |
+ while (!done ) { | |
+ bite = inputStream.read(); | |
+// logger.debug("Read:" + Integer.toHexString(bite)); | |
+ if (bite < 0 && n == 0) { | |
+ throw new SoftEndOfStreamException("Stream closed between payloads"); | |
+ } | |
+ checkClosure(bite); | |
+ switch (n++) { | |
+ case 0: | |
+ fin = (bite & 0x80) > 0; | |
+ switch (bite) { | |
+ case 0x01: | |
+ case 0x81: | |
+ logger.debug("Text, fin=" + fin); | |
+ break; | |
+ case 0x02: | |
+ case 0x82: | |
+ logger.debug("Binary, fin=" + fin); | |
+ throw new IOException("SockJS doesn't support binary"); | |
+ case 0x89: | |
+ ping = true; | |
+ logger.debug("Ping, fin=" + fin); | |
+ break; | |
+ case 0x8a: | |
+ pong = true; | |
+ logger.debug("Pong, fin=" + fin); | |
+ break; | |
+ case 0x88: | |
+ throw new IOException("Connection closed"); | |
+ default: | |
+ throw new IOException("Unexpected opcode " + Integer.toHexString(bite)); | |
+ } | |
+ break; | |
+ case 1: | |
+ if ((bite & 0x80) > 0) { | |
+ throw new IOException("Illegal: Received masked data from server"); | |
+ } | |
+ if (bite < 126) { | |
+ len = bite; | |
+ buffer = new byte[len]; | |
+ } | |
+ else if (bite == 126) { | |
+ lenBytes = 2; | |
+ } | |
+ else { | |
+ lenBytes = 8; | |
+ } | |
+ break; | |
+ case 2: | |
+ case 3: | |
+ case 4: | |
+ case 5: | |
+ if (lenBytes > 4 && bite != 0) { | |
+ throw new IOException("Max supported length exceeded"); | |
+ } | |
+ case 6: | |
+ if (lenBytes > 3 && (bite & 0x80) > 0) { | |
+ throw new IOException("Max supported length exceeded"); | |
+ } | |
+ case 7: | |
+ case 8: | |
+ case 9: | |
+ if (lenBytes-- > 0) { | |
+ len = len << 8 | bite; | |
+ if (lenBytes == 0) { | |
+ buffer = new byte[len]; | |
+ } | |
+ break; | |
+ } | |
+ default: | |
+ buffer[m++] = (byte) bite; | |
+ done = m >= len; | |
+ } | |
+ }; | |
+ String data = new String(buffer, "UTF-8"); | |
+ if (!fin) { | |
+ StringBuilder builder = this.fragments.get(inputStream); | |
+ if (builder == null) { | |
+ builder = new StringBuilder(); | |
+ this.fragments.put(inputStream, builder); | |
+ } | |
+ builder.append(data); | |
+ return null; | |
+ } | |
+ else if (ping) { | |
+ return new SockJsFrame(SockJsFrame.TYPE_PING, data); | |
+ } | |
+ else if (pong) { | |
+ return new SockJsFrame(SockJsFrame.TYPE_PONG, data); | |
+ } | |
+ else { | |
+ StringBuilder builder = this.fragments.get(inputStream); | |
+ if (builder == null) { | |
+ return decodeToFrame(data); | |
+ } | |
+ else { | |
+ builder.append(data).toString(); | |
+ this.removeFragments(inputStream); | |
+ return this.decodeToFrame(builder.toString()); | |
+ } | |
+ } | |
+ } | |
+ | |
+ protected void checkClosure(int bite) throws IOException { | |
+ if (bite < 0) { | |
+ logger.debug("Socket closed during message assembly"); | |
+ throw new IOException("Socket closed during message assembly"); | |
+ } | |
+ } | |
+ | |
+ public void removeState(InputStream inputStream) { | |
+ super.removeState(inputStream); | |
+ this.removeFragments(inputStream); | |
+ } | |
+ | |
+} | |
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/serializer/XHRStreamingChunkDeserializer.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/serializer/XHRStreamingChunkDeserializer.java | |
new file mode 100644 | |
index 0000000..d8b5783 | |
--- /dev/null | |
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/serializer/XHRStreamingChunkDeserializer.java | |
@@ -0,0 +1,116 @@ | |
+/* | |
+ * Copyright 2002-2012 the original author or authors. | |
+ * | |
+ * Licensed under the Apache License, Version 2.0 (the "License"); | |
+ * you may not use this file except in compliance with the License. | |
+ * You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+package org.springframework.integration.ip.tcp.sockjs.serializer; | |
+ | |
+import java.io.ByteArrayInputStream; | |
+import java.io.IOException; | |
+import java.io.InputStream; | |
+import java.util.ArrayList; | |
+import java.util.List; | |
+import java.util.zip.GZIPInputStream; | |
+ | |
+import org.springframework.integration.ip.tcp.serializer.SoftEndOfStreamException; | |
+import org.springframework.integration.ip.tcp.sockjs.support.SockJsFrame; | |
+import org.springframework.util.Assert; | |
+ | |
+/** | |
+ * @author Gary Russell | |
+ * @since 2.2 | |
+ * | |
+ */ | |
+public class XHRStreamingChunkDeserializer extends AbstractSockJsDeserializer<List<SockJsFrame>> { | |
+ | |
+ public List<SockJsFrame> deserialize(InputStream inputStream) throws IOException { | |
+ List<SockJsFrame> headers = checkStreaming(inputStream); | |
+ if (headers != null) { | |
+ return headers; | |
+ } | |
+ BasicState state = this.getStreamState(inputStream); | |
+ boolean gzipping = state == null ? false : state.isGzipping(); | |
+ boolean complete = false; | |
+ StringBuilder builder = new StringBuilder(); | |
+ while (!complete) { | |
+ byte[] chunkLengthInHex = this.crlfDeserializer.deserialize(inputStream); | |
+ if (chunkLengthInHex.length == 0) { | |
+ break; | |
+ } | |
+ int chunkLength = 0; | |
+ try { | |
+ chunkLength = Integer.parseInt(new String(chunkLengthInHex, "UTF-8"), 16); | |
+ } | |
+ catch (Exception e) { | |
+ e.printStackTrace(); | |
+ } | |
+ if (logger.isDebugEnabled()) { | |
+ logger.debug("Chunk size = " + chunkLength); | |
+ } | |
+ if (chunkLength <= 0) { | |
+ throw new SoftEndOfStreamException("0 length chunk received"); | |
+ } | |
+ byte[] chunk = new byte[chunkLength]; | |
+ for (int i = 0; i < chunkLength; i++) { | |
+ int c = inputStream.read(); | |
+ checkClosure(c); | |
+ chunk[i] = (byte) c; | |
+ } | |
+ int eom = chunk[chunkLength-1]; | |
+ Assert.isTrue(inputStream.read() == '\r', "Expected \\r"); | |
+ Assert.isTrue(inputStream.read() == '\n', "Expected \\n"); | |
+ int adjust = complete ? 1 : 0; | |
+ String data; | |
+ if (gzipping) { | |
+ byte[] unzipped = new byte[this.maxMessageSize]; | |
+ GZIPFeederInputStream feeder = state.getGzipFeederInputStream(); | |
+ ByteArrayInputStream byteStream = new ByteArrayInputStream(chunk, 0, chunkLength - adjust); | |
+ feeder.setInputStream(byteStream); | |
+ GZIPInputStream gzipInputStream = state.getGzipInputStream(); | |
+ while (byteStream.available() > 0) { | |
+ int decodedLength = gzipInputStream.read(unzipped); | |
+ if (logger.isDebugEnabled()) { | |
+ logger.debug("Inflated to " + decodedLength); | |
+ } | |
+ data = new String(unzipped, 0, decodedLength, "UTF-8"); | |
+ builder.append(data); | |
+ } | |
+ eom = builder.charAt(builder.length()-1); | |
+ } | |
+ else { | |
+ data = new String(chunk, 0, chunkLength - adjust, "UTF-8"); | |
+ builder.append(data); | |
+ } | |
+ complete = eom == '\n'; | |
+// System.out.println(data.length() + ":" + data); | |
+ } | |
+ return this.decodeFrameData(builder.toString()); | |
+ } | |
+ | |
+ List<SockJsFrame> decodeFrameData(String frameData) throws IOException { | |
+ List<SockJsFrame> dataList = new ArrayList<SockJsFrame>(); | |
+ // some servers put multiple frames in the same chunk | |
+ String[] frames; | |
+ if (frameData.contains("\n")) { | |
+ frames = frameData.split("\n"); | |
+ } | |
+ else { | |
+ frames = new String[] {frameData}; | |
+ } | |
+ for (String data : frames) { | |
+ dataList.add(this.decodeToFrame(data)); | |
+ } | |
+ return dataList; | |
+ } | |
+ | |
+} | |
diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/support/SockJsFrame.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/support/SockJsFrame.java | |
new file mode 100644 | |
index 0000000..6ba9aa8 | |
--- /dev/null | |
+++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/sockjs/support/SockJsFrame.java | |
@@ -0,0 +1,71 @@ | |
+/* | |
+ * Copyright 2002-2012 the original author or authors. | |
+ * | |
+ * Licensed under the Apache License, Version 2.0 (the "License"); | |
+ * you may not use this file except in compliance with the License. | |
+ * You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+package org.springframework.integration.ip.tcp.sockjs.support; | |
+ | |
+/** | |
+ * @author Gary Russell | |
+ * @since 2.2 | |
+ * | |
+ */ | |
+public class SockJsFrame { | |
+ | |
+ public static final int TYPE_HEADERS = 1; | |
+ | |
+ public static final int TYPE_HEARTBEAT = 2; | |
+ | |
+ public static final int TYPE_PRELUDE = 3; | |
+ | |
+ public static final int TYPE_DATA = 4; | |
+ | |
+ public static final int TYPE_PING = 5; | |
+ | |
+ public static final int TYPE_PONG = 6; | |
+ | |
+ public static final int TYPE_OPEN = 7; | |
+ | |
+ public static final int TYPE_CLOSE = 8; | |
+ | |
+ public static final int TYPE_UNEXPECTED = 9; | |
+ | |
+ public static final int TYPE_COOKIES = 10; | |
+ | |
+ private static final String[] typeToString = new String[] { | |
+ "Invalid", "Headers", "HeartBeat", "XHR Prelude", "Data", "Ping", "Pong", "Open", "Close", "Unexpected", "Cookies" | |
+ }; | |
+ | |
+ private final int type; | |
+ | |
+ private final String payload; | |
+ | |
+ public SockJsFrame(int type, String payload) { | |
+ this.type = type; | |
+ this.payload = payload; | |
+ } | |
+ | |
+ public int getType() { | |
+ return this.type; | |
+ } | |
+ | |
+ public String getPayload() { | |
+ return this.payload; | |
+ } | |
+ | |
+ @Override | |
+ public String toString() { | |
+ return "SockJsFrame [type=" + typeToString[type] + ", payload=" + payload + "]"; | |
+ } | |
+ | |
+} | |
diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpMessageMapperTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpMessageMapperTests.java | |
index 7cc0f98..fca1f97 100644 | |
--- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpMessageMapperTests.java | |
+++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpMessageMapperTests.java | |
@@ -20,6 +20,8 @@ import static org.junit.Assert.assertNull; | |
import static org.mockito.Mockito.mock; | |
import static org.mockito.Mockito.when; | |
+import java.io.IOException; | |
+import java.io.InputStream; | |
import java.net.Socket; | |
import javax.net.SocketFactory; | |
@@ -88,6 +90,9 @@ public class TcpMessageMapperTests { | |
public String getConnectionId() { | |
return "anId"; | |
} | |
+ public InputStream getInputStream() throws IOException { | |
+ return null; | |
+ } | |
}; | |
Message<Object> message = mapper.toMessage(connection); | |
assertEquals(TEST_PAYLOAD, new String((byte[]) message.getPayload())); | |
@@ -140,6 +145,9 @@ public class TcpMessageMapperTests { | |
public String getConnectionId() { | |
return "anId"; | |
} | |
+ public InputStream getInputStream() throws IOException { | |
+ return null; | |
+ } | |
}; | |
Message<Object> message = mapper.toMessage(connection); | |
assertEquals(TEST_PAYLOAD, new String((byte[]) message.getPayload())); | |
diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/serializer/WebSocketDeserializerTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/serializer/WebSocketDeserializerTests.java | |
new file mode 100644 | |
index 0000000..31425e6 | |
--- /dev/null | |
+++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/serializer/WebSocketDeserializerTests.java | |
@@ -0,0 +1,131 @@ | |
+/* | |
+ * Copyright 2002-2012 the original author or authors. | |
+ * | |
+ * Licensed under the Apache License, Version 2.0 (the "License"); | |
+ * you may not use this file except in compliance with the License. | |
+ * You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+package org.springframework.integration.ip.tcp.serializer; | |
+ | |
+import static org.junit.Assert.assertEquals; | |
+import static org.junit.Assert.assertNull; | |
+import static org.junit.Assert.assertTrue; | |
+ | |
+import java.io.ByteArrayInputStream; | |
+import java.io.IOException; | |
+import java.io.InputStream; | |
+ | |
+import org.junit.Test; | |
+import org.springframework.integration.ip.tcp.sockjs.serializer.WebSocketSerializer; | |
+import org.springframework.integration.ip.tcp.sockjs.support.SockJsFrame; | |
+ | |
+/** | |
+ * @author Gary Russell | |
+ * @since 2.2 | |
+ * | |
+ */ | |
+public class WebSocketDeserializerTests { | |
+ | |
+ WebSocketSerializer deserializer = new WebSocketSerializer(); | |
+ | |
+ @Test | |
+ public void testShortLength() throws Exception { | |
+ byte[] msg = new byte[] {(byte) 0x81, 0x01, 'A'}; | |
+ InputStream is = new ByteArrayInputStream(msg); | |
+ SockJsFrame result = deserializer.deserialize(is); | |
+ assertEquals("A", result.getPayload()); | |
+ } | |
+ | |
+ @Test | |
+ public void testMediumLength() throws Exception { | |
+ byte[] msg = new byte[132]; | |
+ msg[0] = (byte) 0x81; | |
+ msg[1] = 0x7e; | |
+ msg[2] = 0; | |
+ msg[3] = (byte) 0x80; | |
+ msg[4] = 'A'; | |
+ msg[131] = 'Z'; | |
+ InputStream is = new ByteArrayInputStream(msg); | |
+ String result = deserializer.deserialize(is).getPayload(); | |
+ assertEquals(128, result.length()); | |
+ assertTrue(result.startsWith("A")); | |
+ assertTrue(result.endsWith("Z")); | |
+ } | |
+ | |
+ @Test | |
+ public void testMediumLengthFull() throws Exception { | |
+ byte[] msg = new byte[(int) (Math.pow(2, 16) + 3)]; | |
+ msg[0] = (byte) 0x81; | |
+ msg[1] = 0x7e; | |
+ msg[2] = (byte) 0xff; | |
+ msg[3] = (byte) 0xff; | |
+ msg[4] = 'A'; | |
+ msg[(int) (Math.pow(2, 16) + 2)] = 'Z'; | |
+ InputStream is = new ByteArrayInputStream(msg); | |
+ String result = deserializer.deserialize(is).getPayload(); | |
+ assertEquals((int) Math.pow(2, 16) - 1, result.length()); | |
+ assertTrue(result.startsWith("A")); | |
+ assertTrue(result.endsWith("Z")); | |
+ } | |
+ | |
+ @Test | |
+ public void testLongLength() throws Exception { | |
+ byte[] msg = new byte[(int) (Math.pow(2, 16) + 10)]; | |
+ msg[0] = (byte) 0x81; | |
+ msg[1] = 0x7f; | |
+ msg[2] = 0; | |
+ msg[3] = 0; | |
+ msg[4] = 0; | |
+ msg[5] = 0; | |
+ msg[6] = 0; | |
+ msg[7] = 1; | |
+ msg[8] = 0; | |
+ msg[9] = 0; | |
+ msg[10] = 'A'; | |
+ msg[(int) (Math.pow(2, 16) + 9)] = 'Z'; | |
+ InputStream is = new ByteArrayInputStream(msg); | |
+ String result = deserializer.deserialize(is).getPayload(); | |
+ assertEquals((int) Math.pow(2, 16), result.length()); | |
+ assertTrue(result.startsWith("A")); | |
+ assertTrue(result.endsWith("Z")); | |
+ } | |
+ | |
+ @Test(expected=IOException.class) | |
+ public void testTooLong() throws Exception { | |
+ byte[] msg = new byte[10]; | |
+ msg[0] = (byte) 0x81; | |
+ msg[1] = 0x7f; | |
+ msg[2] = 0; | |
+ msg[3] = 0; | |
+ msg[4] = 1; | |
+ msg[5] = 0; | |
+ msg[6] = 0; | |
+ msg[7] = 1; | |
+ msg[8] = 0; | |
+ msg[9] = 0; | |
+ InputStream is = new ByteArrayInputStream(msg); | |
+ deserializer.deserialize(is); | |
+ } | |
+ | |
+ @Test | |
+ public void testFragmented() throws Exception { | |
+ byte[] msg = new byte[] {(byte) 0x01, 0x01, 'A'}; | |
+ InputStream is = new ByteArrayInputStream(msg); | |
+ SockJsFrame frame = deserializer.deserialize(is); | |
+ assertNull(frame); | |
+ msg[0] = (byte) 0x81; | |
+ msg[1] = 0x01; | |
+ msg[2] = 'B'; | |
+ is.reset(); | |
+ frame = deserializer.deserialize(is); | |
+ assertEquals("AB", frame.getPayload()); | |
+ } | |
+} | |
diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/serializer/WebSocketSserializerTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/serializer/WebSocketSserializerTests.java | |
new file mode 100644 | |
index 0000000..8a8a156 | |
--- /dev/null | |
+++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/serializer/WebSocketSserializerTests.java | |
@@ -0,0 +1,119 @@ | |
+/* | |
+ * Copyright 2002-2012 the original author or authors. | |
+ * | |
+ * Licensed under the Apache License, Version 2.0 (the "License"); | |
+ * you may not use this file except in compliance with the License. | |
+ * You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+package org.springframework.integration.ip.tcp.serializer; | |
+ | |
+import static org.junit.Assert.assertEquals; | |
+ | |
+import java.io.ByteArrayOutputStream; | |
+ | |
+import org.junit.Test; | |
+import org.springframework.integration.ip.tcp.sockjs.serializer.WebSocketSerializer; | |
+ | |
+/** | |
+ * @author Gary Russell | |
+ * @since 2.2 | |
+ * | |
+ */ | |
+public class WebSocketSserializerTests { | |
+ | |
+ WebSocketSerializer serializer = new WebSocketSerializer(); | |
+ | |
+ @Test | |
+ public void testShortLength() throws Exception { | |
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(); | |
+ String data = "Hello, world!"; | |
+ serializer.serialize(data, baos); | |
+ byte[] bytes = baos.toByteArray(); | |
+ assertEquals(0x81, (int) bytes[0] & 0xff); | |
+ assertEquals(0x8d, (int) bytes[1] & 0xff); | |
+ int mask = 0; | |
+ for (int i = 6; i < bytes.length; i++) { | |
+ bytes[i] ^= bytes[mask++ % 4 + 2]; | |
+ } | |
+ String result = new String(bytes, 6, bytes.length - 6); | |
+ assertEquals(data, result); | |
+ } | |
+ | |
+ @Test | |
+ public void testMediumLength() throws Exception { | |
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(); | |
+ byte[] bytes = new byte[150]; | |
+ bytes[0] = 'A'; | |
+ bytes[149] = 'Z'; | |
+ String data = new String(bytes); | |
+ serializer.serialize(data, baos); | |
+ bytes = baos.toByteArray(); | |
+ assertEquals(0x81, (int) bytes[0] & 0xff); | |
+ assertEquals(0xFe, (int) bytes[1] & 0xff); | |
+ assertEquals(0, bytes[2]); | |
+ assertEquals(150, (int) bytes[3] & 0xff); | |
+ int mask = 0; | |
+ for (int i = 8; i < bytes.length; i++) { | |
+ bytes[i] ^= bytes[mask++ % 4 + 4]; | |
+ } | |
+ String result = new String(bytes, 8, bytes.length - 8); | |
+ assertEquals(data, result); | |
+ } | |
+ | |
+ @Test | |
+ public void testMediumLengthFull() throws Exception { | |
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(); | |
+ byte[] bytes = new byte[(int) Math.pow(2, 16) - 1]; | |
+ bytes[0] = 'A'; | |
+ bytes[(int) Math.pow(2, 16) - 2] = 'Z'; | |
+ String data = new String(bytes); | |
+ serializer.serialize(data, baos); | |
+ bytes = baos.toByteArray(); | |
+ assertEquals(0x81, (int) bytes[0] & 0xff); | |
+ assertEquals(0xFe, (int) bytes[1] & 0xff); | |
+ assertEquals(0xff, (int) bytes[2] & 0xff); | |
+ assertEquals(0xff, (int) bytes[3] & 0xff); | |
+ int mask = 0; | |
+ for (int i = 8; i < bytes.length; i++) { | |
+ bytes[i] ^= bytes[mask++ % 4 + 4]; | |
+ } | |
+ String result = new String(bytes, 8, bytes.length - 8); | |
+ assertEquals(data, result); | |
+ } | |
+ | |
+ @Test | |
+ public void testLongLength() throws Exception { | |
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(); | |
+ byte[] bytes = new byte[(int) Math.pow(2, 16)]; | |
+ bytes[0] = 'A'; | |
+ bytes[(int) Math.pow(2, 16) - 1] = 'Z'; | |
+ String data = new String(bytes); | |
+ serializer.serialize(data, baos); | |
+ bytes = baos.toByteArray(); | |
+ assertEquals(0x81, (int) bytes[0] & 0xff); | |
+ assertEquals(0xff, (int) bytes[1] & 0xff); | |
+ assertEquals(0, (int) bytes[2] & 0xff); | |
+ assertEquals(0, (int) bytes[3] & 0xff); | |
+ assertEquals(0, (int) bytes[4] & 0xff); | |
+ assertEquals(0, (int) bytes[5] & 0xff); | |
+ assertEquals(0, (int) bytes[6] & 0xff); | |
+ assertEquals(1, (int) bytes[7] & 0xff); | |
+ assertEquals(0, (int) bytes[8] & 0xff); | |
+ assertEquals(0, (int) bytes[9] & 0xff); | |
+ int mask = 0; | |
+ for (int i = 14; i < bytes.length; i++) { | |
+ bytes[i] ^= bytes[mask++ % 4 + 10]; | |
+ } | |
+ String result = new String(bytes, 14, bytes.length - 14); | |
+ assertEquals(data, result); | |
+ } | |
+ | |
+} | |
diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/sockjs/SockJsTemplateTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/sockjs/SockJsTemplateTests.java | |
new file mode 100644 | |
index 0000000..5568fb7 | |
--- /dev/null | |
+++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/sockjs/SockJsTemplateTests.java | |
@@ -0,0 +1,119 @@ | |
+/* | |
+ * Copyright 2002-2012 the original author or authors. | |
+ * | |
+ * Licensed under the Apache License, Version 2.0 (the "License"); | |
+ * you may not use this file except in compliance with the License. | |
+ * You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+package org.springframework.integration.ip.tcp.sockjs; | |
+ | |
+import static org.junit.Assert.assertEquals; | |
+import static org.junit.Assert.fail; | |
+ | |
+import java.util.ArrayList; | |
+import java.util.List; | |
+ | |
+import org.junit.Test; | |
+import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory; | |
+import org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory; | |
+import org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory; | |
+import org.springframework.integration.ip.tcp.serializer.ByteArrayRawSerializer; | |
+import org.springframework.integration.ip.tcp.sockjs.serializer.XHRStreamingChunkDeserializer; | |
+import org.springframework.integration.ip.tcp.sockjs.support.SockJsFrame; | |
+ | |
+/** | |
+ * @author Gary Russell | |
+ * @since 2.2 | |
+ * | |
+ */ | |
+public class SockJsTemplateTests { | |
+ | |
+ @Test | |
+ public void testXHRStream() throws Exception { | |
+ AbstractClientConnectionFactory ccf = new TcpNetClientConnectionFactory("localhost", 8081); | |
+// AbstractClientConnectionFactory ccf = new TcpNetClientConnectionFactory("echo-test.cloudfoundry.com", 80); | |
+ testXHSStreamGuts(ccf); | |
+ } | |
+ | |
+ @Test | |
+ public void testXHRStreamNIO() throws Exception { | |
+ AbstractClientConnectionFactory ccf = new TcpNioClientConnectionFactory("localhost", 8081); | |
+ testXHSStreamGuts(ccf); | |
+ } | |
+ | |
+ @Test | |
+ public void testXHRStreamGzip() throws Exception { | |
+// AbstractClientConnectionFactory ccf = new TcpNetClientConnectionFactory("localhost", 8081); | |
+ AbstractClientConnectionFactory ccf = new TcpNetClientConnectionFactory("echo-test.cloudfoundry.com", 80); | |
+ testXHSStreamGuts(ccf, true); | |
+ } | |
+ | |
+ private void testXHSStreamGuts(AbstractClientConnectionFactory ccf) throws Exception { | |
+ testXHSStreamGuts(ccf, false); | |
+ } | |
+ | |
+ private void testXHSStreamGuts(AbstractClientConnectionFactory ccf, boolean gzipping) throws Exception { | |
+ ccf.setDeserializer(new XHRStreamingChunkDeserializer()); | |
+ ccf.setSerializer(new ByteArrayRawSerializer()); | |
+ ccf.setSoTimeout(60000); | |
+ SockJsTemplate template = new SockJsTemplate(ccf); | |
+ template.setGzipping(gzipping); | |
+ ccf.start(); | |
+ final List<String> uuids = new ArrayList<String>(); | |
+ final List<String> results = new ArrayList<String>(); | |
+ | |
+ SockJsContext context = template.startStream("/echo/000", new SockJsCallback() { | |
+ | |
+ public void data(SockJsFrame frame, String uuid) { | |
+ System.out.println("Received data: " + frame); | |
+ results.add(frame.getPayload()); | |
+ } | |
+ | |
+ public void control(SockJsFrame frame, String uuid) { | |
+ System.out.println("Received control: " + frame); | |
+ if (frame.getType() == SockJsFrame.TYPE_OPEN) { | |
+ uuids.add(uuid); | |
+ } | |
+ } | |
+ | |
+ public void closed(String uuid) { | |
+ System.out.println("Closed"); | |
+ } | |
+ }); | |
+ | |
+ int n = 0; | |
+ while (context.getCookies() == null) { | |
+ Thread.sleep(100); | |
+ if (n++ > 100) { | |
+ fail("Failed to receive cookies"); | |
+ } | |
+ } | |
+ n = 0; | |
+ while (uuids.size() < 1) { | |
+ Thread.sleep(100); | |
+ if (n++ > 100) { | |
+ fail("Open was not received"); | |
+ } | |
+ } | |
+ for (int i = 0; i < 4; i++) { | |
+ Thread.sleep(1000); | |
+ SockJsFrame frame = template.sendXhr("/echo/000", uuids.get(0), | |
+ "[\"" + new String(new char[128]).replace('\0', 'x') + | |
+ "\"]", context); | |
+ System.out.println(frame.getPayload()); | |
+ } | |
+ assertEquals(4, results.size()); | |
+ while (results.size() > 0) { | |
+ assertEquals("[\"" + new String(new char[128]).replace('\0', 'x') + "\"]", results.remove(0)); | |
+ } | |
+ } | |
+ | |
+} | |
diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/sockjs/seralizer/XHRStreamingChunkDeserializerTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/sockjs/seralizer/XHRStreamingChunkDeserializerTests.java | |
new file mode 100644 | |
index 0000000..888fe52 | |
--- /dev/null | |
+++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/sockjs/seralizer/XHRStreamingChunkDeserializerTests.java | |
@@ -0,0 +1,45 @@ | |
+/* | |
+ * Copyright 2002-2012 the original author or authors. | |
+ * | |
+ * Licensed under the Apache License, Version 2.0 (the "License"); | |
+ * you may not use this file except in compliance with the License. | |
+ * You may obtain a copy of the License at | |
+ * | |
+ * http://www.apache.org/licenses/LICENSE-2.0 | |
+ * | |
+ * Unless required by applicable law or agreed to in writing, software | |
+ * distributed under the License is distributed on an "AS IS" BASIS, | |
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
+ * See the License for the specific language governing permissions and | |
+ * limitations under the License. | |
+ */ | |
+package org.springframework.integration.ip.tcp.sockjs.seralizer; | |
+ | |
+import static org.junit.Assert.assertEquals; | |
+ | |
+import java.io.ByteArrayInputStream; | |
+import java.util.List; | |
+ | |
+import org.junit.Test; | |
+import org.springframework.integration.ip.tcp.sockjs.serializer.XHRStreamingChunkDeserializer; | |
+import org.springframework.integration.ip.tcp.sockjs.support.SockJsFrame; | |
+ | |
+/** | |
+ * @author Gary Russell | |
+ * @since 2.1 | |
+ * | |
+ */ | |
+public class XHRStreamingChunkDeserializerTests { | |
+ | |
+ @Test | |
+ public void test() throws Exception { | |
+ String test = "HTTP\r\nHeaders\r\n\r\nb\r\naabcdefghi\n\r\n"; | |
+ ByteArrayInputStream bais = new ByteArrayInputStream(test.getBytes()); | |
+ XHRStreamingChunkDeserializer xhrStreamingChunkDeserializer = new XHRStreamingChunkDeserializer(); | |
+ List<SockJsFrame> deserialize = xhrStreamingChunkDeserializer.deserialize(bais); | |
+ assertEquals("HTTP\r\nHeaders\r\n\r\n", deserialize.get(0).getPayload()); | |
+ deserialize = xhrStreamingChunkDeserializer.deserialize(bais); | |
+ assertEquals("abcdefghi", deserialize.get(0).getPayload()); | |
+ } | |
+ | |
+} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment