Skip to content

Instantly share code, notes, and snippets.

@davidski
Created July 16, 2014 20:14
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save davidski/39788b65ac2754c063bf to your computer and use it in GitHub Desktop.
Save davidski/39788b65ac2754c063bf to your computer and use it in GitHub Desktop.
Sample Elasticsearch Scroll Query
from elasticsearch import Elasticsearch, helpers
import csv
import logging
# Install the default root-logger handler so log records emitted while the
# script runs are printed (presumably to surface elasticsearch client
# warnings -- confirm).
logging.basicConfig()

# Destination of the export. Despite the .csv extension the rows are written
# tab-delimited further down.
target_file = "traffic.csv"

# Client for the cluster to query. The generous timeout (presumably seconds
# -- confirm against the client version in use) allows for slow scroll pages.
es = Elasticsearch(
    "your.es.host.example.tld",
    timeout=480,
)
# Search body sent with every scroll request, kept as a JSON string.
#
# * "size" is the number of hits requested per scroll page.
# * "fields" restricts each hit to the four columns written to the output
#   file; keep it in sync with the CSV header list.
# * The filter ANDs together: source IP in the given list, @timestamp within
#   the last three weeks, a dstport value present, and srczone == "external".
#
# All conditions live inside a single "bool" filter: a filter clause accepts
# exactly one filter, so "terms" cannot sit next to "bool" as a sibling key.
query_body = """
{
    "size": 1500,
    "fields": ["@timestamp", "src_geoip.country_name", "protocol", "dstport"],
    "query": {
        "filtered": {
            "query": {
                "match_all": {}
            },
            "filter": {
                "bool": {
                    "must": [
                        {
                            "terms": {
                                "srcip": ["8.8.8.8", "8.8.4.4"]
                            }
                        },
                        {
                            "range": {
                                "@timestamp": {
                                    "gt": "now-3w",
                                    "lt": "now"
                                }
                            }
                        },
                        {
                            "exists": {
                                "field": "dstport"
                            }
                        },
                        {
                            "term": {
                                "srczone": "external"
                            }
                        }
                    ]
                }
            }
        }
    }
}
"""
# Output file for the exported rows. Binary mode ('wb') is what the Python 2
# csv module expects; the handle is closed once all hits have been written.
csvfile = open(target_file, 'wb')
class DictUnicodeProxy(object):
    """Read-only view of a dict whose values are flattened for ``csv.DictWriter``.

    The wrapped mapping is expected to hold the ``fields`` section of a search
    hit (presumably each value is a list of one or more values -- confirm
    against the Elasticsearch version in use). ``get`` reduces such a list to
    its first element and, on Python 2, encodes ``unicode`` text as UTF-8 so
    the byte-oriented csv writer can emit it.

    Only the parts of the mapping protocol that ``csv.DictWriter`` relies on
    are provided: iteration, ``keys`` and ``get``.
    """

    # ``unicode`` exists only on Python 2. On Python 3 text is already ``str``
    # and must reach the csv writer unencoded, so no text type is registered.
    try:
        _text_type = unicode  # noqa: F821 -- Python 2 builtin
    except NameError:
        _text_type = None

    def __init__(self, d):
        """Wrap the mapping *d*; it is stored by reference, not copied."""
        self.d = d

    def __iter__(self):
        """Iterate over the keys of the wrapped mapping."""
        return iter(self.d)

    def keys(self):
        """Return the wrapped mapping's keys.

        Python 3's ``csv.DictWriter`` calls ``rowdict.keys()`` to detect
        unexpected fields, so the proxy has to expose it.
        """
        return self.d.keys()

    def get(self, item, default=None):
        """Return the flattened value stored under *item*.

        A list value is replaced by its first element; an empty list is
        treated as a missing value and yields *default*. On Python 2 a
        ``unicode`` result is returned UTF-8 encoded. Any other value is
        returned unchanged.
        """
        value = self.d.get(item, default)
        if isinstance(value, list):
            # Guard against an empty list, which has no first element.
            value = value[0] if value else default
        if self._text_type is not None and isinstance(value, self._text_type):
            return value.encode('utf-8')
        return value
# Column order of the output file; mirrors the "fields" list in query_body.
headers = ['@timestamp', 'src_geoip.country_name', 'protocol', 'dstport']

# Everything that can fail after the output file was opened runs inside the
# try block so the file handle is closed even when the scan or a write raises.
try:
    # helpers.scan pages through the whole result set via the scroll API and
    # hands back the hits one at a time. index="" is passed through as-is
    # (presumably meaning "all indices" -- confirm for the client in use).
    scanResp = helpers.scan(client=es, query=query_body, scroll="90m", index="", timeout="10m")

    # Tab-delimited output with a header row.
    csvwriter = csv.DictWriter(csvfile, delimiter='\t', fieldnames=headers)
    csvwriter.writeheader()

    for resp in scanResp:
        # Each hit carries its requested values under 'fields'; the proxy
        # flattens them into plain cells for the writer.
        csvwriter.writerow(DictUnicodeProxy(resp['fields']))
finally:
    csvfile.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment