Skip to content

Instantly share code, notes, and snippets.

@eungju
Last active May 11, 2018 00:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save eungju/9a859df374a4ef091fffcbc9340bfb38 to your computer and use it in GitHub Desktop.
Save eungju/9a859df374a4ef091fffcbc9340bfb38 to your computer and use it in GitHub Desktop.
RawHTTP libthrift TTransport
import com.athaydes.rawhttp.core.EagerBodyReader;
import com.athaydes.rawhttp.core.EagerHttpResponse;
import com.athaydes.rawhttp.core.MethodLine;
import com.athaydes.rawhttp.core.RawHttp;
import com.athaydes.rawhttp.core.RawHttpHeaders;
import com.athaydes.rawhttp.core.RawHttpRequest;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.URI;
/**
* HTTP implementation of the TTransport interface. Used for working with a
* Thrift web services implementation (using for example TServlet).
* <p>
* This class offers an implementations of the HTTP transport.
*/
public class TRawHttpSocket extends TTransport {
private static final Logger LOGGER = LoggerFactory.getLogger(TRawHttpSocket.class.getName());
private URI uri;
private int connectTimeout;
private int socketTimeout;
private RawHttp rawHttp = new RawHttp();
private MethodLine requestLine;
private Socket socket;
private InputStream socketIn;
private OutputStream socketOut;
private ByteArrayOutputStream requestBuffer;
private ByteArrayInputStream responseBuffer;
public TRawHttpSocket(URI uri, int connectTimeout, int socketTimeout) {
this.uri = uri;
this.connectTimeout = connectTimeout;
this.socketTimeout = socketTimeout;
requestLine = new MethodLine("POST", uri, "HTTP/1.1");
}
@Override
public boolean isOpen() {
if (socket == null) {
return false;
}
return socket.isConnected();
}
@Override
public void open() throws TTransportException {
if (isOpen()) {
throw new TTransportException(TTransportException.ALREADY_OPEN, "Socket already connected.");
}
if (uri.getScheme() == null || !"http".equals(uri.getScheme())) {
throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open non-http scheme.");
}
if (uri.getHost() == null || uri.getHost().length() == 0) {
throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open null host.");
}
if (uri.getPort() <= 0 || uri.getPort() > 65535) {
throw new TTransportException(TTransportException.NOT_OPEN, "Invalid port " + uri.getPort());
}
if (uri.getRawPath() == null || uri.getRawPath().length() == 0) {
throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open null path.");
}
socket = new Socket();
try {
socket.setSoLinger(false, 0);
socket.setTcpNoDelay(true);
socket.setKeepAlive(true);
socket.setSoTimeout(socketTimeout);
} catch (SocketException sx) {
LOGGER.error("Could not configure socket.", sx);
}
try {
socket.connect(new InetSocketAddress(uri.getHost(), uri.getPort()), connectTimeout);
socketIn = socket.getInputStream();
socketOut = socket.getOutputStream();
requestBuffer = new ByteArrayOutputStream();
responseBuffer = new ByteArrayInputStream(new byte[0]);
} catch (IOException iox) {
close();
throw new TTransportException(TTransportException.NOT_OPEN, iox);
}
}
@Override
public void close() {
closeQuietly(requestBuffer);
requestBuffer = null;
closeQuietly(socket);
socket = null;
}
@Override
public int read(byte[] buf, int off, int len) throws TTransportException {
if (responseBuffer == null) {
throw new TTransportException(TTransportException.NOT_OPEN, "Cannot read from null inputStream");
}
int bytesRead;
try {
bytesRead = responseBuffer.read(buf, off, len);
} catch (/*IO*/Exception iox) {
throw new TTransportException(TTransportException.UNKNOWN, iox);
}
if (bytesRead < 0) {
throw new TTransportException(TTransportException.END_OF_FILE);
}
return bytesRead;
}
@Override
public void write(byte[] buf, int off, int len) throws TTransportException {
if (requestBuffer == null) {
throw new TTransportException(TTransportException.NOT_OPEN, "Cannot write to null outputStream");
}
try {
requestBuffer.write(buf, off, len);
} catch (/*IO*/Exception iox) {
throw new TTransportException(TTransportException.UNKNOWN, iox);
}
}
@Override
public void flush() throws TTransportException {
if (requestBuffer == null) {
throw new TTransportException(TTransportException.NOT_OPEN, "Cannot flush null outputStream");
}
byte[] requestBody = requestBuffer.toByteArray();
requestBuffer.reset();
RawHttpRequest request = new RawHttpRequest(requestLine,
RawHttpHeaders.Builder.newBuilder()
.with("Host", uri.getHost() + ":" + uri.getPort())
.with("User-Agent", "Java/TRawHttpSocket")
.with("Accept", "application/x-thrift")
.with("Content-Type", "application/x-thrift")
.with("Content-Length", String.valueOf(requestBody.length))
.build(),
new EagerBodyReader(requestBody));
EagerHttpResponse<?> response;
try {
request.writeTo(socketOut);
response = rawHttp.parseResponse(socketIn).eagerly(true);
} catch (IOException iox) {
throw new TTransportException(iox);
}
int responseCode = response.getStatusCode();
byte[] responseBody = response.getBody().get().asBytes();
if (responseCode != 200) {
throw new TTransportException("HTTP Response code: " + responseCode);
}
responseBuffer = new ByteArrayInputStream(responseBody);
for (String each : response.getHeaders().get("Connection")) {
if ("close".equals(each)) {
close();
break;
}
}
}
private static void closeQuietly(Closeable resource) {
if (resource != null) {
try {
resource.close();
} catch (IOException iox) {
LOGGER.warn("Could not close resource.", iox);
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment