Skip to content

Instantly share code, notes, and snippets.

@hellojinjie
Created October 15, 2012 12:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hellojinjie/3892143 to your computer and use it in GitHub Desktop.
Save hellojinjie/3892143 to your computer and use it in GitHub Desktop.
package com.xindianbao.isv;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.gearman.Gearman;
import org.gearman.GearmanClient;
import org.gearman.GearmanServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.taobao.api.domain.NotifyTrade;
import com.taobao.top.syncserver.Configuration;
import com.taobao.top.syncserver.constants.StatusConstants;
import com.taobao.top.syncserver.dao.ITradePersistentDao;
import com.taobao.top.syncserver.domain.BizMessage;
import com.taobao.top.syncserver.domain.TradeMessage;
import com.taobao.top.syncserver.service.IBizMsgProcessService;
import com.taobao.top.syncserver.service.impl.DefaultTradeMsgProcess;
import com.taobao.top.syncserver.util.Constant;
public class XinDianBaoTradeMessageProcessor implements IBizMsgProcessService{
public static final Logger log = LoggerFactory.getLogger(XinDianBaoTradeMessageProcessor.class);
private IBizMsgProcessService tradeMsgProcessService;
private Gearman gearman = Gearman.createGearman();
private GearmanClient client;
private ExecutorService executor = Executors.newSingleThreadExecutor();
public XinDianBaoTradeMessageProcessor(Configuration configure) {
tradeMsgProcessService = new DefaultTradeMsgProcess();
DefaultTradeMsgProcess defaultTradeMsgProcess = (DefaultTradeMsgProcess)tradeMsgProcessService;
defaultTradeMsgProcess.setTradeDao((ITradePersistentDao)configure.getTradeDataPersistentDao());
defaultTradeMsgProcess.setDataSourceService(configure.getDataSourceService());
defaultTradeMsgProcess.setLogService(configure.getLogService());
defaultTradeMsgProcess.setMsgRetryProcessService(configure.getRetryProcessMsgService());
defaultTradeMsgProcess.setSessionService(configure.getSessionService());
defaultTradeMsgProcess.setTradeFields(configure.getConfig(Constant.TRADE_FIELDS));
defaultTradeMsgProcess.setMultiAppManagerService(configure.getMultiAppManagerService());
defaultTradeMsgProcess.setAlertService(configure.getAlertService());
GearmanServer server = gearman.createGearmanServer("127.0.0.1", 4730);
client = gearman.createGearmanClient();
client.addServer(server);
}
@Override
public void processBizMsg(BizMessage bizMsg) {
if (bizMsg == null || (bizMsg != null && bizMsg.getUserId() == null)) {
return;
}
TradeMessage tradeMsg = (TradeMessage) bizMsg;
NotifyTrade trade = tradeMsg.getBizMsg();
long status = StatusConstants.statusMap.get(trade.getStatus());
if (StatusConstants.TRADE_SUCCESS_STATUS_CODE == status) {
this.executor.execute(new Task(client, tradeMsg));
}
}
public static class Task implements Runnable {
private GearmanClient client;
private TradeMessage tradeMsg;
public Task(GearmanClient client, TradeMessage tradeMsg) {
this.client = client;
this.tradeMsg = tradeMsg;
}
@Override
public void run() {
client.submitBackgroundJob("traderate_worker", tradeMsg.getOriginalMsg().getBytes());
log.error("submit to gearman: " + tradeMsg.getOriginalMsg());
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment