Skip to content

Instantly share code, notes, and snippets.

@sudhirpandey
Last active July 31, 2018 09:22
Show Gist options
  • Save sudhirpandey/49890b9f1eee28be3cd10d42b24684bf to your computer and use it in GitHub Desktop.
Save sudhirpandey/49890b9f1eee28be3cd10d42b24684bf to your computer and use it in GitHub Desktop.
Spark code for data stored in Elasticsearch
from pyspark import SparkConf, SparkContext
from collections import namedtuple
import re
# Settings handed to the elasticsearch-hadoop input format.
es_read_conf = {
    "es.nodes": "es-server",
    "es.port": "19200",
    "es.resource": "paasapp-2018.07.31/access_log",
    # Inline alternative to the query file below:
    # "es.query": '{ "range":{ "@timestamp" : {"gte" : "now-15m", "lte" : "now" }}}'
    # NOTE(review): "query.json" is presumably resolved by elasticsearch-hadoop
    # as an external resource holding the query body -- confirm it is shipped
    # with the job.
    "es.query": "query.json",
}

# Spark application named "ESTest".
conf = SparkConf()
conf.setAppName("ESTest")
sc = SparkContext(conf=conf)

# RDD over the configured Elasticsearch resource; the rest of the script reads
# each element as item[0] (key) and item[1] (document fields).
es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_read_conf,
)
def botpresent(item):
    """Classify one log record into a single counting bucket.

    Args:
        item: A (key, document) pair; only ``item[1]["message"]`` is read.

    Returns:
        A one-element list containing a ``(label, 1)`` tuple, so the function
        can be used with ``flatMap`` and summed with ``reduceByKey``.
        Labels are ``"BOT"``, ``"NONBOT"``, ``"<segment>-fetcherror"``,
        ``"NoHIT"``, ``"stateError"`` or ``"Unclassified"``.

    Raises:
        KeyError: If the document has no ``"message"`` field.
    """
    message = item[1]["message"]

    if "BOT: true" in message:
        return [("BOT", 1)]
    if "BOT: false" in message:
        return [("NONBOT", 1)]
    if "FetchError:" in message:
        # Capture the first alphabetic path segment of the first URL in the
        # message. "https?" accepts both http and https (the previous
        # "http?" made the "p" optional and never matched https URLs).
        found = re.search(r"https?://.+?/([a-zA-Z]+)/?.*", message)
        # A FetchError without a recognisable URL must not crash the task;
        # count it under a generic label instead.
        segment = found.group(1) if found else "unknown"
        return [(segment + "-fetcherror", 1)]
    if "No hit:" in message:
        return [("NoHIT", 1)]
    if "catching state when error" in message:
        return [("stateError", 1)]

    # Print unrecognised messages so new error categories can be spotted.
    # (Splitting the message into per-word counts was another way explored
    # for finding clues about the remaining kinds of error message.)
    print(message.encode('utf-8').strip())
    return [("Unclassified", 1)]
def countsignificant(item):
    """Return the whitespace-separated tokens of a record's message.

    Args:
        item: A (key, document) pair; only ``item[1]["message"]`` is read.

    Returns:
        The list produced by calling ``split()`` on the message.
    """
    message = item[1]["message"]
    return message.split()
#for debug
#print(es_rdd.first())
#var=es_rdd.first()
#for key, val in var[1].iteritems():
#print key
# print(var[1]["message"])
# Keys only; used below just to count how many documents were read.
docs = es_rdd.map(lambda record: record[0])

# Tally the BOT / NONBOT / other buckets, largest bucket first.
reqbotclassification_rdd = es_rdd.flatMap(botpresent)
reqbotclassification = (
    reqbotclassification_rdd
    .reduceByKey(lambda left, right: left + right)
    .sortBy(lambda pair: -pair[1])
)

print('Total number of docs: %s' % docs.count())

# Bring every (label, count) pair to the driver and print one per line.
results = reqbotclassification.collect()
for result in results:
    print(result)
from pyspark import SparkConf, SparkContext
from collections import namedtuple
# Record type pairing a country with an operating system.
Access = namedtuple("Access", "country os")

# Settings handed to the elasticsearch-hadoop input format.
es_read_conf = {
    "es.nodes": "servername",
    "es.port": "9200",
    "es.resource": "index/syslog",
}

# Spark application named "ESTest".
conf = SparkConf()
conf.setAppName("ESTest")
sc = SparkContext(conf=conf)

# RDD over the configured Elasticsearch resource; the rest of the script reads
# each element as item[0] (key) and item[1] (document fields).
es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_read_conf,
)
def keytuple(item):
    """Build a "country,os" key string for one record.

    Prints the record, then joins ``item[1]["geoip"]["country_name"]`` and
    ``item[1]["os"]`` with a comma.

    Args:
        item: A (key, document) pair; only ``item[1]`` is read.

    Returns:
        The combined ``"<country_name>,<os>"`` string.

    Raises:
        KeyError: If ``geoip``, ``country_name`` or ``os`` is missing.
    """
    # An Access(country=..., os=...) record was considered as the return value
    # instead of a plain string.
    print(item)
    fields = item[1]
    country = fields["geoip"]["country_name"]
    return country + ',' + fields["os"]
#for debug
#print(es_rdd.first())
#var=es_rdd.first()
#for key, val in var[1].iteritems():
#print key
#print(var[1]["os"])
# Keys only; used below just to count how many documents were read.
docs = es_rdd.map(lambda item: item[0])

# Country name of every document that carries GeoIP data. Documents without
# it are dropped here rather than mapped to None, so the counts and rankings
# below describe real countries only.
countries = (
    es_rdd
    .filter(lambda item: "geoip" in item[1])
    .map(lambda item: item[1]["geoip"]["country_name"])
)

# "<country>-<os>" label of every document that has both fields.
country_os = (
    es_rdd
    .filter(lambda item: all(k in item[1] for k in ("geoip", "os")))
    .map(lambda item: item[1]["geoip"]["country_name"] + '-' + item[1]["os"])
)

# Hits per country, sorted in descending order.
country_total_hit = (
    countries
    .map(lambda x: (x, 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda item: -item[1])
)

# Hits per country/platform combination, sorted in descending order.
country_os_stat = (
    country_os
    .map(lambda x: (x, 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda item: -item[1])
)

# Print out total docs
print('Total number of docs: %s' % docs.count())

# Print out docs that carry country info
print('Total number of docs with CountryInfo: %s' % countries.count())

# Print distinct countries count
print('Number from countries %s' % countries.distinct().count())

# Print the ten countries with the most requests. take(10) fetches only the
# first ten rows of the sorted RDD instead of collecting everything.
print('Top ten countries making request')
results = country_total_hit.take(10)
for result in results:
    print(result)

# Print the ten country/OS combinations with the most requests.
print('Top ten country-OS combinations making request')
results = country_os_stat.take(10)
for result in results:
    print(result)
{
"filtered": {
"query": {
"multi_match": {
"query": "URL",
"fields": ["message"]
}
},
"filter": {
"bool": {
"must": [
{
"range": {
"@timestamp": {
"gte": "now-1h",
"lte": "now"
}
}
},
{
"match": {
"kubernetes.labels.appname": "appname"
}
}
],
"must_not":[
{
"match": {
"message": "U2"
}
}
]
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment