Skip to content

Instantly share code, notes, and snippets.

@anidotnet
Created May 24, 2014 04:02
Show Gist options
  • Save anidotnet/04b61af241490c4b2e26 to your computer and use it in GitHub Desktop.
Save anidotnet/04b61af241490c4b2e26 to your computer and use it in GitHub Desktop.
Possible memory leak in mod-mysql-postgresql
package com.dizitart;
import org.vertx.java.core.json.JsonObject;
import org.vertx.java.platform.Container;
/**
 * Static accessors for the application's runtime settings.
 *
 * <p>Host and port values are taken from the container's environment variables
 * when they are set, and fall back to fixed defaults otherwise.
 *
 * Created by Anindya Chatterjee on 4/6/14.
 */
public class ApplicationConfig {

    /**
     * Assembles the configuration object for the MySQL module.
     *
     * @param container container whose environment supplies the optional
     *                  {@code OPENSHIFT_MYSQL_DB_HOST} / {@code OPENSHIFT_MYSQL_DB_PORT} overrides
     * @return a new configuration object; host defaults to {@code 127.0.0.1}, port to {@code 3306}
     * @throws NumberFormatException if the port variable is set but is not a valid integer
     */
    public static JsonObject getDbConfig(Container container) {
        final String mysqlHost = envOrDefault(container, "OPENSHIFT_MYSQL_DB_HOST", "127.0.0.1");
        final int mysqlPort = intEnvOrDefault(container, "OPENSHIFT_MYSQL_DB_PORT", 3306);

        return new JsonObject()
                .putString("address", "mysql.async")
                .putString("connection", "MySQL")
                .putString("host", mysqlHost)
                .putNumber("port", mysqlPort)
                .putString("username", "root")
                .putString("password", "*******")
                .putString("database", "blog");
    }

    /**
     * @return the timeout, in the unit expected by the event bus calls that use it, for database requests
     */
    public static long getDbTimeout() {
        return 5000;
    }

    /**
     * Resolves the address the web server should bind to.
     *
     * @param container container whose environment may define {@code OPENSHIFT_VERTX_IP}
     * @return the configured address, or {@code 127.0.0.1} when the variable is absent
     */
    public static String getHostConfig(Container container) {
        return envOrDefault(container, "OPENSHIFT_VERTX_IP", "127.0.0.1");
    }

    /**
     * Resolves the port the web server should listen on.
     *
     * @param container container whose environment may define {@code OPENSHIFT_VERTX_PORT}
     * @return the configured port, or {@code 8080} when the variable is absent
     * @throws NumberFormatException if the variable is set but is not a valid integer
     */
    public static int getPortConfig(Container container) {
        return intEnvOrDefault(container, "OPENSHIFT_VERTX_PORT", 8080);
    }

    /** Returns the named environment value, or {@code fallback} when it is not set. */
    private static String envOrDefault(Container container, String name, String fallback) {
        final String value = container.env().get(name);
        if (value == null) {
            return fallback;
        }
        return value;
    }

    /** Returns the named environment value parsed as an int, or {@code fallback} when it is not set. */
    private static int intEnvOrDefault(Container container, String name, int fallback) {
        final String value = container.env().get(name);
        if (value == null) {
            return fallback;
        }
        return Integer.parseInt(value);
    }
}
package com.dizitart.workers;
import org.vertx.java.core.Handler;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.core.json.JsonArray;
import org.vertx.java.core.json.JsonObject;
import org.vertx.java.platform.Verticle;
/**
 * Verticle that reshapes database payloads on the event bus.
 *
 * <p>It listens on two addresses:
 * <ul>
 *   <li>{@link #BUS_ADDRESS_SELECT}: turns a {@code fields}/{@code results} pair into an
 *       array of objects keyed by field name;</li>
 *   <li>{@link #BUS_ADDRESS_INSERT}: turns a user object into the nested value array used
 *       for an insert.</li>
 * </ul>
 *
 * <p>Payloads are assembled with the JSON object model rather than by string
 * concatenation, so values containing quotes or backslashes are carried over intact.
 *
 * Created by 256441 on 4/9/14.
 */
public class DBMessageConversionVerticle extends Verticle {
    public static final String BUS_ADDRESS_SELECT = "async.db.message.formatting.select";
    public static final String BUS_ADDRESS_INSERT = "async.db.message.formatting.insert";

    /** Keys read from an insert request, in the order the values are emitted. */
    private static final String[] INSERT_COLUMNS = {"login_id", "email_id", "passwd", "last_login"};

    /** Registers the two conversion handlers on the event bus. */
    @Override
    public void start() {
        vertx.eventBus().registerHandler(BUS_ADDRESS_SELECT, new Handler<Message<JsonObject>>() {
            @Override
            public void handle(Message<JsonObject> message) {
                message.reply(formatSelectMessage(message));
            }
        });
        vertx.eventBus().registerHandler(BUS_ADDRESS_INSERT, new Handler<Message<JsonObject>>() {
            @Override
            public void handle(Message<JsonObject> message) {
                message.reply(formatInsertMessage(message));
            }
        });
    }

    /**
     * Converts a select result into an array with one object per row.
     *
     * @param message message whose body may carry a {@code fields} array of column names and a
     *                {@code results} array of row arrays
     * @return one object per entry of {@code results}, mapping each field name to the value at
     *         the same index of the row; an empty array when the body or {@code results} is
     *         missing or empty
     */
    private JsonArray formatSelectMessage(Message<JsonObject> message) {
        final JsonArray converted = new JsonArray();
        final JsonObject body = message.body();
        if (body == null) {
            return converted;
        }

        final JsonArray fields = body.getArray("fields");
        final JsonArray results = body.getArray("results");
        if (results == null) {
            return converted;
        }

        final int fieldCount = fields != null ? fields.size() : 0;
        final int rowCount = results.size();
        for (int i = 0; i < rowCount; i++) {
            final JsonArray row = results.get(i);
            final JsonObject object = new JsonObject();
            for (int j = 0; j < fieldCount; j++) {
                final String key = fields.get(j);
                final Object value = row.get(j);
                // putValue stores the value with its JSON type, so strings need no manual quoting.
                object.putValue(key, value);
            }
            converted.addObject(object);
        }
        return converted;
    }

    /**
     * Converts a user object into the value rows for an insert.
     *
     * @param message message whose body may carry the {@code login_id}, {@code email_id},
     *                {@code passwd} and {@code last_login} string fields
     * @return an array holding a single row with those four values in that order, or an empty
     *         array when the body is missing; a field absent from the body yields a JSON
     *         {@code null} in its position
     */
    private JsonArray formatInsertMessage(Message<JsonObject> message) {
        final JsonArray rows = new JsonArray();
        final JsonObject body = message.body();
        if (body != null) {
            final JsonArray row = new JsonArray();
            for (String column : INSERT_COLUMNS) {
                row.addString(body.getString(column));
            }
            rows.addArray(row);
        }
        return rows;
    }
}
package com.dizitart.handlers;
import org.vertx.java.core.Vertx;
import org.vertx.java.core.http.RouteMatcher;
/**
 * Single shared registry that binds {@link UrlHandler}s to a {@link RouteMatcher}.
 *
 * <p>The one instance is obtained through {@link #createInstance(Vertx)}, which also
 * replaces the {@code Vertx} reference handed to handlers registered afterwards.
 *
 * Created by Anindya Chatterjee on 4/6/14.
 */
public class DispatchHandler {
    private static DispatchHandler dispatchHandler = new DispatchHandler();

    private final RouteMatcher routeMatcher = new RouteMatcher();
    private Vertx vertx;

    private DispatchHandler() {
        // Instances are only created through the static field above.
    }

    /**
     * Stores the given {@code Vertx} on the shared instance and returns that instance.
     *
     * @param vertx the Vertx passed to handlers on registration
     * @return the shared dispatch handler
     */
    public static DispatchHandler createInstance(Vertx vertx) {
        final DispatchHandler shared = dispatchHandler;
        shared.vertx = vertx;
        return shared;
    }

    /**
     * @return the route matcher that registered handlers are attached to
     */
    public RouteMatcher getRouteMatcher() {
        return routeMatcher;
    }

    /**
     * Hands the stored {@code Vertx} to the handler and attaches it to the route matcher
     * under the handler's own URL pattern and request method.
     *
     * @param handler the handler to register; {@code null} is ignored
     */
    public void registerUrlHandler(UrlHandler handler) {
        if (handler == null) {
            return;
        }

        handler.register(vertx);

        final RequestMethod method = handler.getRequestMethod();
        switch (method) {
            case OPTIONS:
                routeMatcher.options(handler.getUrlPattern(), handler);
                break;
            case HEAD:
                routeMatcher.head(handler.getUrlPattern(), handler);
                break;
            case POST:
                routeMatcher.post(handler.getUrlPattern(), handler);
                break;
            case PUT:
                routeMatcher.put(handler.getUrlPattern(), handler);
                break;
            case PATCH:
                routeMatcher.patch(handler.getUrlPattern(), handler);
                break;
            case DELETE:
                routeMatcher.delete(handler.getUrlPattern(), handler);
                break;
            case TRACE:
                routeMatcher.trace(handler.getUrlPattern(), handler);
                break;
            case CONNECT:
                routeMatcher.connect(handler.getUrlPattern(), handler);
                break;
            case GET:
            default:
                // Any method without its own branch is routed as GET.
                routeMatcher.get(handler.getUrlPattern(), handler);
                break;
        }
    }
}
package com.dizitart.handlers;
/**
 * HTTP request methods a URL handler can declare.
 *
 * Created by Anindya Chatterjee on 4/6/14.
 */
public enum RequestMethod {
    /** HTTP {@code OPTIONS}. */
    OPTIONS,

    /** HTTP {@code GET}. */
    GET,

    /** HTTP {@code HEAD}. */
    HEAD,

    /** HTTP {@code POST}. */
    POST,

    /** HTTP {@code PUT}. */
    PUT,

    /** HTTP {@code PATCH}. */
    PATCH,

    /** HTTP {@code DELETE}. */
    DELETE,

    /** HTTP {@code TRACE}. */
    TRACE,

    /** HTTP {@code CONNECT}. */
    CONNECT
}
package com.dizitart;
/*
* Copyright 2013 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* @author <a href="http://tfox.org">Tim Fox</a>
*/
import com.dizitart.handlers.DispatchHandler;
import com.dizitart.handlers.UserDeleteHandler;
import com.dizitart.handlers.UserGetHandler;
import com.dizitart.handlers.UserPostHandler;
import org.vertx.java.core.AsyncResult;
import org.vertx.java.core.AsyncResultHandler;
import org.vertx.java.core.Handler;
import org.vertx.java.core.http.HttpServer;
import org.vertx.java.core.http.HttpServerRequest;
import org.vertx.java.core.http.RouteMatcher;
import org.vertx.java.core.json.JsonObject;
import org.vertx.java.core.streams.Pump;
import org.vertx.java.platform.Verticle;
public class RestVerticle extends Verticle {
private String asyncMysqlModId;
public void start() {
String host = ApplicationConfig.getHostConfig(container);
int port = ApplicationConfig.getPortConfig(container);
final JsonObject mysqlConfig = ApplicationConfig.getDbConfig(container);
container.deployModule("io.vertx~mod-mysql-postgresql~0.3.0-SNAPSHOT", mysqlConfig, new AsyncResultHandler<String>() {
@Override
public void handle(AsyncResult<String> stringAsyncResult) {
if(stringAsyncResult.succeeded()){
asyncMysqlModId = stringAsyncResult.result();
container.logger().error(mysqlConfig.encodePrettily());
} else {
container.logger().fatal(stringAsyncResult.cause());
container.exit();
}
}
});
container.deployWorkerVerticle("com.dizitart.workers.DBMessageConversionVerticle", null, 1, true);
DispatchHandler dispatchHandler = DispatchHandler.createInstance(vertx);
initUrlMapping(dispatchHandler);
RouteMatcher routeMatcher = dispatchHandler.getRouteMatcher();
vertx.createHttpServer()
.setCompressionSupported(true)
.requestHandler(routeMatcher)
.listen(port, host);
container.logger().info("Webserver started, listening on port 8080");
}
private void initUrlMapping(DispatchHandler dispatchHandler) {
dispatchHandler.registerUrlHandler(new UserGetHandler());
dispatchHandler.registerUrlHandler(new UserPostHandler());
dispatchHandler.registerUrlHandler(new UserDeleteHandler());
}
@Override
public void stop() {
container.undeployModule(asyncMysqlModId);
super.stop();
}
}
package com.dizitart.handlers;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.vertx.java.core.Handler;
import org.vertx.java.core.Vertx;
import org.vertx.java.core.http.HttpServerRequest;
/**
 * Base class for HTTP request handlers that declare the URL pattern and request
 * method they serve.
 *
 * Created by Anindya Chatterjee on 4/6/14.
 */
public abstract class UrlHandler implements Handler<HttpServerRequest> {

    /** Logger available to subclasses. */
    protected Logger logger = LogManager.getLogger();

    /** Vertx reference supplied through {@link #register(Vertx)}; unset until then. */
    protected Vertx vertx;

    /**
     * @return the URL pattern this handler serves
     */
    public abstract String getUrlPattern();

    /**
     * @return the request method this handler serves
     */
    public abstract RequestMethod getRequestMethod();

    /**
     * Stores the Vertx reference for later use by the handler.
     *
     * @param vertx the Vertx to keep
     */
    public void register(Vertx vertx) {
        this.vertx = vertx;
    }
}
package com.dizitart.handlers;
import com.dizitart.ApplicationConfig;
import com.dizitart.workers.DBMessageConversionVerticle;
import org.vertx.java.core.AsyncResult;
import org.vertx.java.core.AsyncResultHandler;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.core.http.HttpServerRequest;
import org.vertx.java.core.json.JsonArray;
import org.vertx.java.core.json.JsonObject;
/**
* Created by Anindya Chatterjee on 4/6/14.
*/
public class UserDeleteHandler extends UrlHandler {
private String requestParameter = "userId";
@Override
public void handle(final HttpServerRequest httpServerRequest) {
String userId = httpServerRequest.params().get(requestParameter);
JsonArray values = new JsonArray(new String[]{userId});
JsonObject json = new JsonObject()
.putString("action", "prepared")
.putString("table", "user")
.putString("statement", "delete from user where login_id = ?")
.putArray("values", values);
long dbTimeOut = ApplicationConfig.getDbTimeout();
vertx.eventBus().sendWithTimeout("mysql.async", json, dbTimeOut, new AsyncResultHandler<Message<JsonObject>>() {
@Override
public void handle(AsyncResult<Message<JsonObject>> result) {
if(result.succeeded()) {
httpServerRequest.response().setStatusCode(200).end();
} else {
logger.fatal("Message timeout from DB", result.cause());
httpServerRequest.response().setStatusCode(500).end(result.cause().getLocalizedMessage());
}
}
});
}
@Override
public String getUrlPattern() {
return "/user/delete/:" + requestParameter;
}
@Override
public RequestMethod getRequestMethod() {
return RequestMethod.DELETE;
}
}
package com.dizitart.handlers;
import com.dizitart.ApplicationConfig;
import com.dizitart.workers.DBMessageConversionVerticle;
import org.vertx.java.core.AsyncResult;
import org.vertx.java.core.AsyncResultHandler;
import org.vertx.java.core.Handler;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.core.http.HttpServerRequest;
import org.vertx.java.core.json.JsonArray;
import org.vertx.java.core.json.JsonObject;
/**
* Created by Anindya Chatterjee on 4/6/14.
*/
public class UserGetHandler extends UrlHandler {
private String requestParameter = "userId";
@Override
public void handle(final HttpServerRequest httpServerRequest) {
String userId = httpServerRequest.params().get(requestParameter);
JsonArray values = new JsonArray(new String[]{userId});
JsonObject json = new JsonObject()
.putString("action", "prepared")
.putString("table", "user")
.putString("statement", "select * from user where login_id = ?")
.putArray("values", values);
long dbTimeOut = ApplicationConfig.getDbTimeout();
vertx.eventBus().sendWithTimeout("mysql.async", json, dbTimeOut, new AsyncResultHandler<Message<JsonObject>>() {
@Override
public void handle(AsyncResult<Message<JsonObject>> result) {
if(result.succeeded()) {
Message<JsonObject> message = result.result();
vertx.eventBus().send(DBMessageConversionVerticle.BUS_ADDRESS_SELECT, message.body(), new DBMessageReplyHandler(httpServerRequest));
} else {
logger.fatal("Message timeout from DB", result.cause());
httpServerRequest.response().setStatusCode(500).end(result.cause().getLocalizedMessage());
}
}
});
}
@Override
public String getUrlPattern() {
return "/user/:" + requestParameter;
}
@Override
public RequestMethod getRequestMethod() {
return RequestMethod.GET;
}
private class DBMessageReplyHandler implements Handler<Message<JsonArray>> {
private HttpServerRequest httpServerRequest;
public DBMessageReplyHandler(HttpServerRequest httpServerRequest) {
this.httpServerRequest = httpServerRequest;
}
@Override
public void handle(Message<JsonArray> message) {
JsonArray result = message.body();
if (result != null && result.size() > 0) {
JsonArray rows = message.body();
if(rows != null && rows.size() > 0){
JsonObject row = rows.get(0);
httpServerRequest.response().setStatusCode(200).end(row.encodePrettily());
} else {
httpServerRequest.response().setStatusCode(404).end();
}
} else {
httpServerRequest.response().setStatusCode(404).end();
}
}
}
}
package com.dizitart.handlers;
import com.dizitart.ApplicationConfig;
import com.dizitart.workers.DBMessageConversionVerticle;
import org.vertx.java.core.AsyncResult;
import org.vertx.java.core.AsyncResultHandler;
import org.vertx.java.core.Handler;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.core.http.HttpServerRequest;
import org.vertx.java.core.json.DecodeException;
import org.vertx.java.core.json.JsonArray;
import org.vertx.java.core.json.JsonObject;
/**
 * Handles {@code POST /user/create}: reads a JSON user object from the request body,
 * has it converted into insert values, and inserts it into the {@code user} table
 * through the {@code mysql.async} event bus address.
 *
 * <p>Responds with 200 when the insert was accepted, 400 when the body is not valid
 * JSON, and 500 when the database request timed out or the reply reports an error.
 *
 * Created by Anindya Chatterjee on 4/6/14.
 */
public class UserPostHandler extends UrlHandler {

    /**
     * Defers processing until the whole request body has been received.
     *
     * @param httpServerRequest the incoming request
     */
    @Override
    public void handle(final HttpServerRequest httpServerRequest) {
        httpServerRequest.bodyHandler(new UserBodyHandler(httpServerRequest));
    }

    @Override
    public String getUrlPattern() {
        return "/user/create";
    }

    @Override
    public RequestMethod getRequestMethod() {
        return RequestMethod.POST;
    }

    /** Returns the cause's message, or a generic text when there is none to send. */
    private static String describe(Throwable cause) {
        String text = cause != null ? cause.getLocalizedMessage() : null;
        return text != null ? text : "Database request failed";
    }

    /**
     * Parses the received body and drives the convert-then-insert sequence for one request.
     */
    private class UserBodyHandler implements Handler<Buffer> {
        private final HttpServerRequest httpServerRequest;

        public UserBodyHandler(HttpServerRequest httpServerRequest) {
            this.httpServerRequest = httpServerRequest;
        }

        @Override
        public void handle(Buffer event) {
            final String body = event.getString(0, event.length());

            final JsonObject newObject;
            try {
                newObject = new JsonObject(body);
            } catch (DecodeException e) {
                // Answer the client instead of letting the exception escape and leave the request open.
                logger.warn("Rejected user payload that is not valid JSON", e);
                httpServerRequest.response().setStatusCode(400).end("Request body is not valid JSON");
                return;
            }

            final JsonArray column = new JsonArray(new String[]{"login_id", "email_id", "passwd", "last_login"});
            vertx.eventBus().send(DBMessageConversionVerticle.BUS_ADDRESS_INSERT, newObject, new Handler<Message<JsonArray>>() {
                @Override
                public void handle(Message<JsonArray> message) {
                    JsonArray values = message.body();
                    JsonObject dbObject = new JsonObject()
                            .putString("action", "insert")
                            .putString("table", "user")
                            .putArray("fields", column)
                            .putArray("values", values);
                    long dbTimeOut = ApplicationConfig.getDbTimeout();
                    vertx.eventBus().sendWithTimeout("mysql.async", dbObject, dbTimeOut, new InsertResultHandler());
                }
            });
        }

        /**
         * Translates the database reply for the insert into the HTTP response.
         */
        private class InsertResultHandler implements AsyncResultHandler<Message<JsonObject>> {
            @Override
            public void handle(AsyncResult<Message<JsonObject>> event) {
                if (event.failed()) {
                    logger.error("Insert request to DB failed", event.cause());
                    httpServerRequest.response().setStatusCode(500).end(describe(event.cause()));
                    return;
                }

                // A reply only proves the module answered; the insert itself may have failed.
                JsonObject reply = event.result().body();
                if (reply != null && "error".equals(reply.getString("status"))) {
                    logger.error("DB reported an error on insert: " + reply.getString("message"));
                    httpServerRequest.response().setStatusCode(500).end("Database request failed");
                    return;
                }

                httpServerRequest.response().setStatusCode(200).end();
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment