Skip to content

Instantly share code, notes, and snippets.

package com.wso2.test;
import org.apache.synapse.MessageContext;
import org.apache.synapse.mediators.AbstractMediator;
import org.apache.synapse.registry.Registry;
import org.apache.synapse.config.Entry;
import org.apache.axiom.om.impl.llom.OMTextImpl;
public class RecourceFinder extends AbstractMediator {
String REGISTRY_MAPPING_STORAGE_URL = "conf:/repository/new";
-- HTTP request source 'login-request': receives JSON login events at the
-- receiver URL and maps them onto InputStream. messageId is mapped from
-- 'trp:messageId'; the other attributes are read from JSON paths on the payload.
@Source(
    type = 'http-request',
    source.id = 'login-request',
    receiver.url = "https://localhost:8280/analytics",
    @map(
        type = 'json',
        @attributes(
            messageId = 'trp:messageId',
            username = '$.username',
            loginTime = '$.loginTime',
            loginIp = '$.loginIp'
        )
    )
)
define stream InputStream (messageId string, username string, loginIp string, loginTime long);
-- Login event stream fed by the HTTP request source 'login-request'.
-- The JSON mapper fills username, loginTime and loginIp from the payload and
-- messageId from the 'trp:messageId' mapping.
-- NOTE(review): this repeats the InputStream definition that appears directly
-- before it in this file, including the same source.id; confirm whether both
-- copies are meant to be kept.
@Source(
    type = 'http-request',
    source.id = 'login-request',
    receiver.url = "https://localhost:8280/analytics",
    @map(type = 'json',
         @attributes(messageId = 'trp:messageId',
                     username = '$.username',
                     loginTime = '$.loginTime',
                     loginIp = '$.loginIp')))
define stream InputStream (
    messageId string,
    username  string,
    loginIp   string,
    loginTime long
);
-- HTTP request sink 'production-alerts': publishes every ProductionAlertStream
-- event, JSON-mapped, to the alerts endpoint.
@sink(
    type = 'http-request',
    sink.id = 'production-alerts',
    publisher.url = 'http://localhost:8080/alerts',
    @map(type = 'json')
)
define stream ProductionAlertStream (
    id string,
    name string,
    amount double
);
-- HTTP response source tied to sink.id 'production-alerts': JSON responses
-- with HTTP status code 200 are mapped onto ResponseStream.
@source(
    type = 'http-response',
    sink.id = 'production-alerts',
    http.status.code = '200',
    @map(type = 'json')
)
define stream ResponseStream (
    alertId long,
    alertMessage string
);
-- HTTP sink: publishes OutputStream events, XML-mapped, to the publisher URL.
-- The HTTP method is taken per event from the 'method' attribute via the
-- {{method}} template.
-- Fix: the annotation previously ended with ')))', one closing parenthesis more
-- than was opened by @sink( and @map(, which made the definition unparsable.
@sink(type='http',
      publisher.url='http://localhost:8009/OutputStream', method='{{method}}',
      headers="'content-type:application/xml','content-length:94'",
      @map(type='xml'))
define stream OutputStream (id String, volume double, price float, method string);
-- HTTP source: receives JSON events at the receiver URL (worker.count is set
-- to '10') and maps them onto InputStream.
-- Fix: @Source( was never closed; @map(type='json') closed only the mapper, so
-- the 'define stream' statement was parsed as part of the annotation.
@Source(type = 'http',
        receiver.url="https://localhost:8280/Analytics/InputStream", worker.count='10',
        @map(type='json'))
define stream InputStream (id string, price float, volume double);
-- Names this Siddhi app 'AbnormalLoginBehaviour'.
@App:name("AbnormalLoginBehaviour")
-- Please refer to https://docs.wso2.com/display/SP400/Quick+Start+Guide on getting started with SP editor.
-- Login events received by the HTTP request source 'testsource'.
-- The JSON mapper reads username, loginTime and loginIp from under '$.event'
-- and takes messageId from the 'trp:messageId' mapping. The log sink attached
-- to the same stream emits each event with the text mapper.
-- Fix: the definition was cut off after 'messageId string,'. The remaining
-- attributes are the ones named in @attributes above and read by the queries
-- on this stream (username, loginIp, loginTime); loginTime is long because it
-- is used as an epoch-millisecond value (divided by 1000 in the SQL lookup).
@sink(type = 'log', @map(type = 'text'))
@Source(type = 'http-request', source.id='testsource',
        receiver.url="https://localhost:8280/Analytics/InputStream",
        @map(type='json', @attributes(messageId='trp:messageId', username='$.event.username', loginTime='$.event.loginTime', loginIp = '$.event.loginIp')))
define stream InputStream (
    messageId string,
    username string,
    loginIp string,
    loginTime long);
-- HTTP response sink paired with source.id 'testsource': each OutputStream
-- event is sent back as JSON, with message.id filled from the event's
-- messageId attribute through the {{messageId}} template.
@sink(
    type = 'http-response',
    source.id = 'testsource',
    message.id = '{{messageId}}',
    @map(type = 'json')
)
define stream OutputStream (
    messageId string,
    username string,
    risk double
);
-- Step 1: enrich every login event with the login-hour profile from the
-- IS_ANALYTICS_DB datasource. For successful 'overall' auth events on the same
-- day of week within the last 28 days, the SQL returns:
--   loginHour    - hour of day of the current login (loginTime is epoch millis)
--   minLoginTime - smallest hour of day seen in that window
--   maxLoginTime - largest hour of day seen in that window
-- Fixes in the SQL text:
--   * min/max are now taken over the hour of each row. Previously the hour was
--     taken of min(timestamp)/max(timestamp), i.e. the hour of the oldest and
--     newest record, which is not an hour range.
--   * the 28-day cutoff is multiplied by 1000: 'timestamp' is in milliseconds
--     (it is divided by 1000 for FROM_UNIXTIME) while UNIX_TIMESTAMP() returns
--     seconds, so the original comparison was always true.
-- This query is placed first because dayFilterStream has no explicit
-- definition; it is created by this 'insert into' and must exist before the
-- scoring query below reads from it.
-- NOTE(review): the lookup is not restricted to the event's username; confirm
-- whether the profile is meant to be global or per user.
from InputStream#rdbms:query('IS_ANALYTICS_DB', 'select hour(FROM_UNIXTIME(?/1000)) as loginHour, min(hour(FROM_UNIXTIME(timestamp/1000))) as minLoginTime, max(hour(FROM_UNIXTIME(timestamp/1000))) as maxLoginTime from OverallAuthTable where DAYOFWEEK(FROM_UNIXTIME(?/1000)) = DAYOFWEEK(FROM_UNIXTIME(timestamp/1000)) and (timestamp > UNIX_TIMESTAMP(subdate(current_date, 28)) * 1000) and authStepSuccess = 1 and eventType=?;', loginTime, loginTime, 'overall', 'loginHour long, minLoginTime long, maxLoginTime long' )
select username, loginIp, loginHour, minLoginTime, maxLoginTime, messageId
insert into dayFilterStream;

-- Step 2: score the login.
--   0.0 - loginHour lies inside [minLoginTime, maxLoginTime]
--   0.5 - loginHour lies strictly inside (minLoginTime - 2, maxLoginTime + 2)
--   1.0 - otherwise
from dayFilterStream
select messageId, username,
       ifThenElse(minLoginTime <= loginHour and loginHour <= maxLoginTime, 0.0,
           ifThenElse(minLoginTime - 2 < loginHour and loginHour < maxLoginTime + 2, 0.5, 1.0)) as risk
insert into OutputStream;