Skip to content

Instantly share code, notes, and snippets.

@mydoghasworms
Last active August 29, 2015 14:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mydoghasworms/19b9ba447c2fbf46c994 to your computer and use it in GitHub Desktop.
Save mydoghasworms/19b9ba447c2fbf46c994 to your computer and use it in GitHub Desktop.
Indexing and searching SAP PI message payloads with a Ruby retrieval script and an HTML page
<!DOCTYPE html>
<html>
<!--
This provides a simple, single-purpose search front-end for an Elasticsearch index containing
SAP PI message payloads that were indexed with the accompanying script. For more information, visit
http://ceronio.net/2014/08/sap-pi-message-payload-search-using-elasticsearch/
-->
<head>
<link rel="stylesheet" type="text/css" href="javascript/webix/webix.css"/>
<script type="text/javascript" src="javascript/webix/webix_debug.js"></script>
<script type="text/javascript" src="javascript/elasticsearch-js/elasticsearch.js"></script>
<style>
.error_row {
background-color: #FFAAAA;
}
</style>
</head>
<body>
<script>
// Number of hits requested per result page
var PAGE_SIZE = 20;

// Elasticsearch client pointed at the host that served this page, with 'trace' logging switched on
var client = elasticsearch.Client({host: location.host, log: 'trace'});

// Search request reused for every payload search; the query text and the 'from' offset are
// overwritten in place by the search functions
var default_query = {
    // IF YOU CHANGE THE NAME OF THE INDEX IN THE INDEXING SCRIPT, REMEMBER TO ADJUST IT HERE AS WELL:
    index: 'abcclnt100',
    type: 'pimsg',
    from: 0,
    size: PAGE_SIZE,
    body: {
        // Sorted descending on time_sent (alternative that was tried: {"timestamp": "desc"})
        sort: {"time_sent": "desc"},
        query: {
            match: {
                payload: {query: "", operator: "and"}
            }
        }
    }
};

// Declared but neither assigned nor read in the visible part of the script
var current_query;
/**
 * Starts a fresh search: stores the given text as the payload query of the shared
 * request and runs it without an offset, i.e. from the first page.
 * @param query text to look for in the message payloads
 */
function newPayloadSearch(query) {
    var payload_match = default_query.body.query.match.payload;
    payload_match.query = query;
    executePayloadSearch();
}
/**
 * Executes the search using the currently set parameters, only allowing you to change the 'from' offset.
 * On success the result collection, the "Showing ..." status line and the paging buttons are refreshed;
 * on failure the error text is shown and both paging buttons are disabled.
 * @param from zero-based offset of the first hit to fetch; a missing or negative value means 0
 */
function executePayloadSearch(from) {
    // Never send a negative offset to the search
    var offset = (typeof from === 'undefined' || from < 0) ? 0 : from;
    default_query.from = offset;
    client.search(default_query).then(function (resp) {
        var hits = resp.hits.hits;
        var has_hits = hits.length > 0;
        $$("search_result_data").clearAll();
        $$("search_result_data").parse(mapSearchData(hits));
        $$("search_result_table").adjust();
        $$("result_detail").parse({
            // An empty page is reported as "0 to 0" rather than "1 to 0"
            from: has_hits ? offset + 1 : 0,
            to: has_hits ? offset + hits.length : 0,
            total: resp.hits.total,
            took: resp.took
        });
        $$("result_detail").show();
        adjustSearchResultColumns();
        // Enable or disable Previous and Next buttons depending on the size/offset of results
        if (offset > 0) {
            $$("prev_button").enable();
        } else {
            $$("prev_button").disable();
        }
        if (resp.hits.total > offset + PAGE_SIZE) {
            $$("next_button").enable();
        } else {
            $$("next_button").disable();
        }
    }, function (err) {
        webix.message({type: "error", expire: -1, text: err.message});
        $$("prev_button").disable();
        $$("next_button").disable();
    });
}
/**
 * Asks the result table to adjust every configured column, one column id at a time.
 */
function adjustSearchResultColumns() {
    var t = $$("search_result_table");
    // 'i' is declared locally; it used to leak as an implicit global
    for (var i = 0; i < t.config.columns.length; i++) {
        t.adjustColumn(t.config.columns[i].id);
    }
}
/**
 * Looks up the message that belongs to the given message and loads it into the detail form.
 * For a message of class "RQ" the index is searched for a message whose ref_to_message_id
 * equals this message's id; for any other class the message with id ref_to_message_id is
 * fetched directly. When nothing can be found an info notice is shown instead.
 * @param message flattened message as held by the detail form (msgid, message_class, ref_to_message_id, ...)
 */
function getReferencedMessage(message) {
    // Look in the same index/type that the payload search uses, so that a change of the
    // index name only has to be made in one place
    var index = default_query.index;
    var type = default_query.type;

    function no_match() {
        webix.message({type: "info", expire: 2000, text: "There is no corresponding message"});
    }

    function response_handler(resp) {
        if (typeof resp.hits === "undefined") {
            // No 'hits' section: the response is treated as the document itself (the get call)
            $$("message_detail_form").parse(flattenHit(resp));
        } else if (resp.hits.hits.length === 0) {
            no_match();
        } else {
            $$("message_detail_form").parse(flattenHit(resp.hits.hits[0]));
        }
    }

    function error_handler(err) {
        webix.message({type: "error", expire: -1, text: err.message});
    }

    if (message.message_class === "RQ") {
        client.search({
            index: index,
            type: type,
            q: 'ref_to_message_id:' + message.msgid
        }).then(response_handler, error_handler);
    } else if (!message.ref_to_message_id) {
        // Nothing to fetch (no message loaded, or it references no other message)
        no_match();
    } else {
        client.get({
            index: index,
            type: type,
            id: message.ref_to_message_id
        }).then(response_handler, error_handler);
    }
}
/**
 * Turns one Elasticsearch hit into a flat record: the hit's _id becomes 'msgid' and a
 * fixed list of fields is copied from the hit's _source.
 * @param hit object with an _id and a _source
 */
function flattenHit(hit) {
    var source_fields = [
        "pid", "timestamp", "message_class", "processing_mode",
        "ref_to_message_id", "conversation_id", "time_sent",
        "interface_name", "interface_namespace",
        "sender_service", "sender_name", "sender_namespace",
        "wsuid", "payload"
    ];
    var flat = {msgid: hit._id};
    var source = hit._source;
    for (var k = 0; k < source_fields.length; k++) {
        flat[source_fields[k]] = source[source_fields[k]];
    }
    return flat;
}
/**
 * Converts a list of search hits into flat records, since webix data collections
 * apparently do not handle nested objects.
 * @param hits resp.hits.hits from search result
 */
function mapSearchData(hits) {
    function toRecord(hit) {
        // Only the hit itself is handed on, not the index/array that map() also supplies
        return flattenHit(hit);
    }
    return hits.map(toRecord);
}
// Component layout: builds the page once webix is ready and wires the result table to the detail form
webix.ready(function () {
    webix.ui({
        rows: [
            // Row with search control
            {
                cols: [
                    {
                        view: "search",
                        placeholder: "Payload Search",
                        id: "payloadSearch",
                        width: 350,
                        on: {
                            // Key code 13 without Ctrl/Shift/Alt starts a new search
                            'onKeyPress': function (code, e) {
                                if (code === 13 && !e.ctrlKey && !e.shiftKey && !e.altKey) {
                                    newPayloadSearch($$('payloadSearch').getValue());
                                }
                            }
                        }
                    },
                    {}
                ]
            },
            // Status line; starts hidden and is shown by executePayloadSearch after a successful search
            {
                view: "template",
                id: "result_detail",
                height: 25,
                template: "Showing #from# to #to# of #total# hits (#took# milliseconds)",
                hidden: true
            },
            // Paging buttons; both start disabled and are toggled by executePayloadSearch
            {
                //width: 200,
                cols: [
                    {
                        view: "button",
                        disabled: true,
                        id: "prev_button",
                        label: "<< Previous",
                        click: function() {
                            executePayloadSearch(default_query.from - PAGE_SIZE);
                        }
                    },
                    {
                        view: "button",
                        disabled: true,
                        id: "next_button",
                        label: "Next >>",
                        click: function() {
                            executePayloadSearch(default_query.from + PAGE_SIZE);
                        }
                    }
                ]
            },
            // Result table on the left, message detail form on the right
            { cols: [
                {
                    view: "datatable",
                    id: "search_result_table",
                    resizeColumn: true,
                    select: "row",
                    scheme: {
                        // Rows with message class "AE" get the error_row CSS class
                        $change: function (item) {
                            if (item.message_class == "AE")
                                item.$css = "error_row";
                        }
                    },
                    height: "auto",
                    autoheight: true,
                    columns: [
                        {id: "msgid", header: "Msg Guid", adjust: true},
                        {id: "timestamp", header: "Exe. time", adjust: true},
                        {id: "pid", header: "Pipeline ID", adjust: true},
                        {id: "message_class", header: "Msg Class", adjust: true},
                        {id: "processing_mode", header: "Proc. Mode", adjust: true},
                        {id: "time_sent", header: "Time Sent", adjust: true},
                        {id: "interface_name", header: "Intf. Name", adjust: true},
                        {id: "interface_namespace", header: "Intf. Namespace", adjust: true}
                    ]
                },
                {view: "resizer"},
                {
                    rows: [
                        {
                            view: "form",
                            editable: false,
                            width: 400,
                            id: "message_detail_form",
                            elements: [
                                {view: "text", name: "msgid", label: "Msg Guid", readonly: true, labelPosition: "top"},
                                {view: "text", name: "timestamp", label: "Timestamp", readonly: true, labelPosition: "top"},
                                {view: "textarea", name: "payload", label: "Payload", height: 200, readonly: true, labelPosition: "top"},
                                {view: "text", name: "message_class", label: "Msg Class", readonly: true, labelPosition: "top"}
                            ]
                        },
                        {view: "button", id: "ref_to_message_button", label: "Referenced Message",
                            click: function () {
                                getReferencedMessage($$("message_detail_form").getValues());
                            }
                        }
                    ]
                }
            ]}
        ]
    });
    // Shared data collection: the table is synced to it and the form is bound to it;
    // selecting a table row moves the collection's cursor to that row
    var search_result_data = new webix.DataCollection({id: "search_result_data"});
    $$("search_result_table").sync($$("search_result_data"));
    $$('search_result_table').attachEvent("onAfterSelect", function (id) {
        search_result_data.setCursor(id);
    });
    $$("message_detail_form").bind($$("search_result_data"));
});
</script>
</body>
</html>
#!/usr/bin/ruby
# encoding: utf-8
# Provides a simple, single-purpose retrieval and indexing script for SAP PI messages using the nwrfc gem
# and Elasticsearch with the ruby-elasticsearch gem.
# For more information, visit http://ceronio.net/2014/08/sap-pi-message-payload-search-using-elasticsearch/
require 'nwrfc'
require 'date'
require 'elasticsearch'
require 'logger'
include NWRFC
# Set up logger (arguments: log file, 10, 4_096_000 - see Logger.new for their meaning)
logger = Logger.new('logfile.log', 10, 4_096_000)

#-- Configuration options
# NetWeaver connection details
NW_LOGON = {
  'user'   => 'user',
  'passwd' => 'secret',
  # 'trace' => 2,
  'client' => '100',
  'ashost' => 'ajax.example.com',
  'sysnr'  => '00'
}

# ElasticSearch Host (previously tried: '192.168.56.101')
ES_HOST = '127.0.0.1'

# ElasticSearch Index
ES_INDEX = 'abcclnt100'

# Message selection options
MSG_COUNT       = 100_000
# NOTE(review): the time bounds look like YYYYMMDDhhmmss written as floats - confirm against the RFC interface
MSG_FROM_TIME   = 20140813000000.0
MSG_TO_TIME     = 20140814000000.0
MSG_ERRORS_ONLY = false

#-- Set up and check connection to ElasticSearch host
es = Elasticsearch::Client.new(host: ES_HOST) # , log: true
es.cluster.health
# Retrieves the list of PI messages matching the selection options, reads the raw version of
# each message and indexes it (attributes plus concatenated payload) in Elasticsearch.
# A failure on a single message is logged and reported as 'E' on stdout; the run continues.
begin
  #-- Connect to NW system
  conn = Connection.new(NW_LOGON)

  #-- Get function definitions and caller instances
  fn_sxmb_query = conn.get_function('SXMB_GET_MESSAGE_LIST')
  fn_sxmb_msgraw = conn.get_function('SXMB_READ_MESSAGE_VERSION_RAW')
  fc_sxmb_query = fn_sxmb_query.get_function_call
  fc_sxmb_msgraw = fn_sxmb_msgraw.get_function_call

  # Renders the bytes of a raw ID as an upper-case hex string, two digits per byte
  to_hex = lambda { |raw| raw.each_byte.map { |b| b.to_s(16).rjust(2, '0') }.join.upcase }

  #-- Retrieve messages
  fc_sxmb_query[:IM_MESSAGE_COUNT] = MSG_COUNT
  fc_sxmb_query[:IM_FILTER][:FROM_TIME] = MSG_FROM_TIME
  fc_sxmb_query[:IM_FILTER][:TO_TIME] = MSG_TO_TIME
  # NOTE(review): '024' is presumably the message state that marks errors - confirm
  fc_sxmb_query[:IM_FILTER][:MSGSTATE_TAB].new_row { |row| row[nil]= '024' } if MSG_ERRORS_ONLY
  logger.info('Retrieving message list')
  fc_sxmb_query.invoke
  logger.info("Got #{fc_sxmb_query[:EX_MESSAGE_DATA_LIST].size} records")

  fc_sxmb_query[:EX_MESSAGE_DATA_LIST].each do |row|
    begin
      fc_sxmb_msgraw[:SELECTION] = 2
      fc_sxmb_msgraw[:VERSION_REQUEST] = '000'
      fc_sxmb_msgraw[:MESSAGEKEY][:MSGID] = row[:MSGGUID]
      fc_sxmb_msgraw[:MESSAGEKEY][:PID] = row[:PID]
      fc_sxmb_msgraw.invoke

      main_attr = fc_sxmb_msgraw[:MAIN_ATTR]
      # Concatenate rows of payload (In case of SE/System Error, there is no payload)
      payload = fc_sxmb_msgraw[:MESSAGEPAYLOAD].map { |entry| entry[:PAYLOAD].force_encoding('UTF-8') }.join

      es.index(
        index: ES_INDEX,
        type: 'pimsg',
        id: row[:MSGGUID],
        body: {
          pid: row[:PID],
          timestamp: DateTime.parse(fc_sxmb_msgraw[:TIMESTAMP].to_s),
          message_class: main_attr[:MESSAGE_CLASS],
          processing_mode: main_attr[:PROCESSING_MODE],
          ref_to_message_id: to_hex.call(main_attr[:REF_TO_MESSAGE_ID]),
          conversation_id: to_hex.call(main_attr[:CONVERSATION_ID]),
          time_sent: DateTime.parse(main_attr[:TIME_SENT].to_s),
          interface_name: main_attr[:INTERFACE][:NAME],
          interface_namespace: main_attr[:INTERFACE][:NAMESPACE],
          sender_service: main_attr[:SENDER][:SERVICE],
          sender_name: main_attr[:SENDER][:NAME],
          sender_namespace: main_attr[:SENDER][:NAMESPACE],
          wsuid: main_attr[:WSUID],
          payload: payload
        }
      )
      print '.'
    rescue NWRFC::NWError => e
      # Log the message key as well, so the failing message can be identified
      logger.error("#{row[:MSGGUID]} #{row[:PID]}")
      logger.error(e)
      logger.error("#{e.group} #{e.code}")
      print 'E'
    rescue => e
      logger.error("#{row[:MSGGUID]} #{row[:PID]}")
      logger.error(e)
      print 'E'
    end
  end
  logger.info('Finished retrieving payloads and indexing')
rescue NWRFC::NWError => e
  logger.error(e)
  puts e
ensure
  # Close the RFC connection on every exit path; conn is nil when the logon itself failed
  conn.disconnect if conn
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment