Skip to content

Instantly share code, notes, and snippets.

@nadeesha5814
Last active December 15, 2015 04:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nadeesha5814/1eff600c53c57dd57d1a to your computer and use it in GitHub Desktop.
Save nadeesha5814/1eff600c53c57dd57d1a to your computer and use it in GitHub Desktop.
package org.wso2.carbon.sample.apimgt.usage.publisher;
/**
* Created by NADEESHA on 12/14/2015.
*/
import org.wso2.carbon.apimgt.usage.publisher.APIMgtUsageDataBridgeDataPublisher;
import org.wso2.carbon.apimgt.usage.publisher.APIMgtUsageDataPublisher;
import org.wso2.carbon.apimgt.usage.publisher.DataPublisherUtil;
import org.wso2.carbon.apimgt.usage.publisher.dto.FaultPublisherDTO;
import org.wso2.carbon.apimgt.usage.publisher.dto.RequestPublisherDTO;
import org.wso2.carbon.apimgt.usage.publisher.dto.ResponsePublisherDTO;
import org.wso2.carbon.apimgt.usage.publisher.dto.ThrottlePublisherDTO;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.apimgt.usage.publisher.dto.DataBridgeFaultPublisherDTO;
import org.wso2.carbon.apimgt.usage.publisher.dto.DataBridgeRequestPublisherDTO;
import org.wso2.carbon.sample.apimgt.usage.publisher.dto.CustomDataBridgeResponsePublisherDTO;
import org.wso2.carbon.apimgt.usage.publisher.dto.DataBridgeThrottlePublisherDTO;
import org.wso2.carbon.apimgt.usage.publisher.internal.DataPublisherAlreadyExistsException;
import org.wso2.carbon.apimgt.usage.publisher.internal.UsageComponent;
import org.wso2.carbon.context.CarbonContext;
import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
import org.wso2.carbon.databridge.agent.thrift.lb.DataPublisherHolder;
import org.wso2.carbon.databridge.agent.thrift.lb.LoadBalancingDataPublisher;
import org.wso2.carbon.databridge.agent.thrift.lb.ReceiverGroup;
import org.wso2.carbon.databridge.commons.exception.*;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.List;
/**
 * Usage data publisher that forwards API Manager request, response, fault and throttle
 * events to the configured analytics receivers through a {@link LoadBalancingDataPublisher}.
 *
 * <p>Response events are serialized with {@link CustomDataBridgeResponsePublisherDTO};
 * the remaining event types use the stock data-bridge DTOs. Stream names and versions are
 * always read from {@code DataPublisherUtil.getApiManagerAnalyticsConfiguration()}.</p>
 *
 * <p>{@link #init()} must complete successfully before events can be published. If it
 * fails, the failure is logged and subsequent events are dropped (and logged) rather than
 * raising a {@link NullPointerException} in the caller.</p>
 */
public class CustomAPIMgtUsageDataBridgeDataPublisher implements APIMgtUsageDataPublisher {

    // Log under this class's own category so custom-publisher messages can be told apart
    // from those of the stock APIMgtUsageDataBridgeDataPublisher.
    private static final Log log = LogFactory.getLog(CustomAPIMgtUsageDataBridgeDataPublisher.class);

    /** Publisher obtained in {@link #init()}; stays {@code null} if initialization failed. */
    private LoadBalancingDataPublisher dataPublisher;

    /**
     * Obtains the data publisher for the current tenant and registers the request,
     * response, fault and throttle stream definitions that are not yet known to it.
     *
     * <p>Any exception is logged and swallowed; in that case {@code dataPublisher} may
     * remain unset, or some stream definitions may be missing.</p>
     */
    public void init() {
        try {
            if (log.isDebugEnabled()) {
                log.debug("Initializing APIMgtUsageDataBridgeDataPublisher");
            }

            this.dataPublisher = getDataPublisher();

            // Request stream: add the definition only when it has not been added yet.
            if (!dataPublisher.isStreamDefinitionAdded(
                    DataPublisherUtil.getApiManagerAnalyticsConfiguration().getRequestStreamName(),
                    DataPublisherUtil.getApiManagerAnalyticsConfiguration().getRequestStreamVersion())) {
                String requestStreamDefinition = DataBridgeRequestPublisherDTO.getStreamDefinition();
                dataPublisher.addStreamDefinition(requestStreamDefinition,
                        DataPublisherUtil.getApiManagerAnalyticsConfiguration().getRequestStreamName(),
                        DataPublisherUtil.getApiManagerAnalyticsConfiguration().getRequestStreamVersion());
            }

            // Response stream: uses the custom DTO's stream definition.
            if (!dataPublisher.isStreamDefinitionAdded(
                    DataPublisherUtil.getApiManagerAnalyticsConfiguration().getResponseStreamName(),
                    DataPublisherUtil.getApiManagerAnalyticsConfiguration().getResponseStreamVersion())) {
                String responseStreamDefinition = CustomDataBridgeResponsePublisherDTO.getStreamDefinition();
                dataPublisher.addStreamDefinition(responseStreamDefinition,
                        DataPublisherUtil.getApiManagerAnalyticsConfiguration().getResponseStreamName(),
                        DataPublisherUtil.getApiManagerAnalyticsConfiguration().getResponseStreamVersion());
            }

            // Fault stream.
            if (!dataPublisher.isStreamDefinitionAdded(
                    DataPublisherUtil.getApiManagerAnalyticsConfiguration().getFaultStreamName(),
                    DataPublisherUtil.getApiManagerAnalyticsConfiguration().getFaultStreamVersion())) {
                String faultStreamDefinition = DataBridgeFaultPublisherDTO.getStreamDefinition();
                dataPublisher.addStreamDefinition(faultStreamDefinition,
                        DataPublisherUtil.getApiManagerAnalyticsConfiguration().getFaultStreamName(),
                        DataPublisherUtil.getApiManagerAnalyticsConfiguration().getFaultStreamVersion());
            }

            // Throttle stream.
            if (!dataPublisher.isStreamDefinitionAdded(
                    DataPublisherUtil.getApiManagerAnalyticsConfiguration().getThrottleStreamName(),
                    DataPublisherUtil.getApiManagerAnalyticsConfiguration().getThrottleStreamVersion())) {
                String throttleStreamDefinition = DataBridgeThrottlePublisherDTO.getStreamDefinition();
                dataPublisher.addStreamDefinition(throttleStreamDefinition,
                        DataPublisherUtil.getApiManagerAnalyticsConfiguration().getThrottleStreamName(),
                        DataPublisherUtil.getApiManagerAnalyticsConfiguration().getThrottleStreamVersion());
            }
        } catch (Exception e) {
            // Boundary catch: initialization problems are reported but never propagated.
            log.error("Error initializing APIMgtUsageDataBridgeDataPublisher", e);
        }
    }

    /**
     * Publishes a request event to the configured request stream.
     *
     * @param requestPublisherDTO request data to wrap in a {@link DataBridgeRequestPublisherDTO}
     */
    public void publishEvent(RequestPublisherDTO requestPublisherDTO) {
        DataBridgeRequestPublisherDTO dataBridgeRequestPublisherDTO =
                new DataBridgeRequestPublisherDTO(requestPublisherDTO);
        publishToStream(
                DataPublisherUtil.getApiManagerAnalyticsConfiguration().getRequestStreamName(),
                DataPublisherUtil.getApiManagerAnalyticsConfiguration().getRequestStreamVersion(),
                dataBridgeRequestPublisherDTO.createPayload(),
                "Error while publishing Request event");
    }

    /**
     * Publishes a response event to the configured response stream, using the custom
     * response DTO to build the payload.
     *
     * @param customResponsePublisherDTO response data to wrap in a
     *                                   {@link CustomDataBridgeResponsePublisherDTO}
     */
    public void publishEvent(ResponsePublisherDTO customResponsePublisherDTO) {
        CustomDataBridgeResponsePublisherDTO dataBridgeResponsePublisherDTO =
                new CustomDataBridgeResponsePublisherDTO(customResponsePublisherDTO);
        // The payload is built exactly once per event.
        publishToStream(
                DataPublisherUtil.getApiManagerAnalyticsConfiguration().getResponseStreamName(),
                DataPublisherUtil.getApiManagerAnalyticsConfiguration().getResponseStreamVersion(),
                dataBridgeResponsePublisherDTO.createPayload(),
                "Error while publishing Response event");
    }

    /**
     * Publishes a fault event to the configured fault stream.
     *
     * @param faultPublisherDTO fault data to wrap in a {@link DataBridgeFaultPublisherDTO}
     */
    public void publishEvent(FaultPublisherDTO faultPublisherDTO) {
        DataBridgeFaultPublisherDTO dataBridgeFaultPublisherDTO =
                new DataBridgeFaultPublisherDTO(faultPublisherDTO);
        publishToStream(
                DataPublisherUtil.getApiManagerAnalyticsConfiguration().getFaultStreamName(),
                DataPublisherUtil.getApiManagerAnalyticsConfiguration().getFaultStreamVersion(),
                dataBridgeFaultPublisherDTO.createPayload(),
                "Error while publishing Fault event");
    }

    /**
     * Publishes a throttle-exceeded event to the configured throttle stream.
     *
     * @param throttPublisherDTO throttle data to wrap in a {@link DataBridgeThrottlePublisherDTO}
     */
    public void publishEvent(ThrottlePublisherDTO throttPublisherDTO) {
        DataBridgeThrottlePublisherDTO dataBridgeThrottlePublisherDTO =
                new DataBridgeThrottlePublisherDTO(throttPublisherDTO);
        publishToStream(
                DataPublisherUtil.getApiManagerAnalyticsConfiguration().getThrottleStreamName(),
                DataPublisherUtil.getApiManagerAnalyticsConfiguration().getThrottleStreamVersion(),
                dataBridgeThrottlePublisherDTO.createPayload(),
                "Error while publishing Throttle exceed event");
    }

    /**
     * Sends one event to the given stream, stamped with the current time.
     *
     * @param streamName     name of the target stream
     * @param streamVersion  version of the target stream
     * @param payload        value returned by a DTO's {@code createPayload()}; it is cast to
     *                       {@code Object[]} before being handed to the publisher
     * @param failureMessage message logged when the event cannot be published
     */
    private void publishToStream(String streamName, String streamVersion, Object payload,
                                 String failureMessage) {
        if (dataPublisher == null) {
            // init() was not called or failed; drop the event instead of throwing an NPE.
            log.error(failureMessage + ": data publisher has not been initialized");
            return;
        }
        try {
            // Fourth argument carries the single value "external" and the fifth is left null
            // (presumably the meta-data and correlation-data arrays; confirm against the agent API).
            dataPublisher.publish(streamName, streamVersion, System.currentTimeMillis(),
                    new Object[]{"external"}, null, (Object[]) payload);
        } catch (AgentException e) {
            log.error(failureMessage, e);
        }
    }

    /**
     * Returns the data publisher registered with {@link UsageComponent} for the current
     * tenant, creating and registering a new one from the analytics configuration when
     * none exists yet.
     *
     * @return the tenant's data publisher; if registration of a newly created publisher
     *         fails because one already exists, whatever {@link UsageComponent} returns
     *         for the tenant
     * @throws AgentException          declared for the underlying data-bridge agent calls
     * @throws MalformedURLException   declared for the underlying data-bridge agent calls
     * @throws AuthenticationException declared for the underlying data-bridge agent calls
     * @throws TransportException      declared for the underlying data-bridge agent calls
     */
    private static LoadBalancingDataPublisher getDataPublisher()
            throws AgentException, MalformedURLException, AuthenticationException,
            TransportException {

        String tenantDomain = CarbonContext.getThreadLocalCarbonContext().getTenantDomain();

        // Reuse the publisher already registered for this tenant, if any.
        LoadBalancingDataPublisher loadBalancingDataPublisher = UsageComponent.getDataPublisher(tenantDomain);
        if (loadBalancingDataPublisher != null) {
            return loadBalancingDataPublisher;
        }

        List<String> receiverGroups = org.wso2.carbon.databridge.agent.thrift.util.DataPublisherUtil.
                getReceiverGroups(DataPublisherUtil.getApiManagerAnalyticsConfiguration().getBamServerUrlGroups());
        String serverUser = DataPublisherUtil.getApiManagerAnalyticsConfiguration().getBamServerUser();
        String serverPassword = DataPublisherUtil.getApiManagerAnalyticsConfiguration().getBamServerPassword();

        // Declared as ArrayList so they can be handed to the agent constructors without raw casts.
        ArrayList<ReceiverGroup> allReceiverGroups = new ArrayList<ReceiverGroup>();
        for (String receiverGroupString : receiverGroups) {
            // Each group string is a comma-separated list of receiver URLs.
            ArrayList<DataPublisherHolder> dataPublisherHolders = new ArrayList<DataPublisherHolder>();
            for (String serverURL : receiverGroupString.split(",")) {
                dataPublisherHolders.add(new DataPublisherHolder(null, serverURL, serverUser, serverPassword));
            }
            allReceiverGroups.add(new ReceiverGroup(dataPublisherHolders));
        }

        // Create a new LoadBalancingDataPublisher for the tenant and register it.
        loadBalancingDataPublisher = new LoadBalancingDataPublisher(allReceiverGroups);
        try {
            UsageComponent.addDataPublisher(tenantDomain, loadBalancingDataPublisher);
        } catch (DataPublisherAlreadyExistsException e) {
            // A publisher was registered between the lookup above and this call; use that one.
            log.warn("Attempting to register a data publisher for the tenant " + tenantDomain +
                    " when one already exists. Returning existing data publisher");
            return UsageComponent.getDataPublisher(tenantDomain);
        }
        return loadBalancingDataPublisher;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment