Skip to content

Instantly share code, notes, and snippets.

@dadoonet
Created June 22, 2016 10:22
Show Gist options
  • Save dadoonet/f76dcc0483e60b3b49a75afd017fd947 to your computer and use it in GitHub Desktop.
Save dadoonet/f76dcc0483e60b3b49a75afd017fd947 to your computer and use it in GitHub Desktop.
Notes for the Ingest Node demo
# 52.35.38.35 - - [19/Apr/2016:12:00:04 +0200] "GET / HTTP/1.1" 200 24
# Start with an empty pipeline: the sample access-log line goes through no processors yet
GET _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "52.35.38.35 - - [19/Apr/2016:12:00:04 +0200] \"GET / HTTP/1.1\" 200 24"
      }
    }
  ]
}
# Add Grok (first entry of "processors")
{
  "grok": {
    "field": "message",
    "patterns": ["%{COMMONAPACHELOG}"]
  }
}
# Add Date
# This snippet and the ones below are pasted just before the final "}" of the
# previous processor: the leading "}," closes that processor, and the brace
# left behind closes the new one.
# The year is written "yyyy" (calendar year); "YYYY" is week-based year in
# java.time patterns and can be off around New Year.
},
{
  "date": {
    "field": "timestamp",
    "formats": ["dd/MMM/yyyy:HH:mm:ss Z"]
  }
# Remove message
},
{
  "remove": {
    "field": "message"
  }
# Remove old date
},
{
  "remove": {
    "field": "timestamp"
  }
# Add Convert
},
{
  "convert": {
    "field": "response",
    "type": "integer"
  }
# Add new convert
},
{
  "convert": {
    "field": "bytes",
    "type": "integer"
  }
## Do it verbosely: append this to the request path (_simulate?verbose)
?verbose
# Full pipeline: grok -> date -> drop raw fields -> convert numeric fields
# The date format uses "yyyy" (calendar year) rather than "YYYY", which is
# week-based year in java.time patterns.
GET _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": ["%{COMMONAPACHELOG}"]
        }
      },
      {
        "date": {
          "field": "timestamp",
          "formats": ["dd/MMM/yyyy:HH:mm:ss Z"]
        }
      },
      {
        "remove": {
          "field": "message"
        }
      },
      {
        "remove": {
          "field": "timestamp"
        }
      },
      {
        "convert": {
          "field": "response",
          "type": "integer"
        }
      },
      {
        "convert": {
          "field": "bytes",
          "type": "integer"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "52.35.38.35 - - [19/Apr/2016:12:00:04 +0200] \"GET / HTTP/1.1\" 200 24"
      }
    }
  ]
}
# Add failure handler for the pipeline (and remove verbose)
# Paste just before the "]" that closes "processors"; that bracket then closes "on_failure".
],
"on_failure": [
  {
    "set": {
      "field": "_index",
      "value": "error-index"
    }
  }
# Same pipeline with a pipeline-level failure handler that redirects the
# document to "error-index".
# The date format uses "yyyy" (calendar year) rather than "YYYY", which is
# week-based year in java.time patterns.
GET _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": ["%{COMMONAPACHELOG}"]
        }
      },
      {
        "date": {
          "field": "timestamp",
          "formats": ["dd/MMM/yyyy:HH:mm:ss Z"]
        }
      },
      {
        "remove": {
          "field": "message"
        }
      },
      {
        "remove": {
          "field": "timestamp"
        }
      },
      {
        "convert": {
          "field": "response",
          "type": "integer"
        }
      },
      {
        "convert": {
          "field": "bytes",
          "type": "integer"
        }
      }
    ],
    "on_failure": [
      {
        "set": {
          "field": "_index",
          "value": "error-index"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "52.35.38.35 - - [19/Apr/2016:12:00:04 +0200] \"GET / HTTP/1.1\" 200 24"
      }
    }
  ]
}
# Add failure handler for date processor
# Paste right after the "formats" line of the date processor.
,
"on_failure": [
  {
    "set": {
      "field": "@timestamp",
      "value": "1971-12-26T23:59:59.000+0100"
    }
  }
]
# Send with date=30/Feb/2016
# Invalid date (30/Feb) on purpose: the date processor's own on_failure sets a
# fallback @timestamp.
# The date format uses "yyyy" (calendar year) rather than "YYYY", which is
# week-based year in java.time patterns.
GET _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": ["%{COMMONAPACHELOG}"]
        }
      },
      {
        "date": {
          "field": "timestamp",
          "formats": ["dd/MMM/yyyy:HH:mm:ss Z"],
          "on_failure": [
            {
              "set": {
                "field": "@timestamp",
                "value": "1971-12-26T23:59:59.000+0100"
              }
            }
          ]
        }
      },
      {
        "remove": {
          "field": "message"
        }
      },
      {
        "remove": {
          "field": "timestamp"
        }
      },
      {
        "convert": {
          "field": "response",
          "type": "integer"
        }
      },
      {
        "convert": {
          "field": "bytes",
          "type": "integer"
        }
      }
    ],
    "on_failure": [
      {
        "set": {
          "field": "_index",
          "value": "error-index"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "52.35.38.35 - - [30/Feb/2016:12:00:04 +0200] \"GET / HTTP/1.1\" 200 24"
      }
    }
  ]
}
# Add failure handler for bytes
# Paste inside the "bytes" convert processor, right after its "type" line
# (the leading comma separates it from "type", as in the date snippet).
,
"on_failure": [
  {
    "set": {
      "field": "bytes",
      "value": -1
    }
  }
]
# Send 2.4 instead of 24
# #####################################################
# ############ STEP 0: Prepare Demo
# #####################################################
# Remove existing data
DELETE .bano-*
# #####################################################
# ############ STEP 1: Install
# #####################################################
# Read existing bano indices
GET _cat/indices?v
GET _bano
# Inject DEPT 17
PUT /_bano/17
GET _cat/indices?v
GET .bano/_search
GET _bano
# Inject DEPT 95
PUT /_bano/95
GET _cat/indices?v
GET _bano
# Full-text match on the street name
GET .bano/_search
{
  "query": {
    "match": { "address.street_name": "myrtilles" }
  }
}
# #####################################################
# ############ STEP 2: Simulate Ingestion
# #####################################################
# From physical address to Geo Point and normalized address
# (inline pipeline with a single "bano" processor, default options)
POST /_ingest/pipeline/_simulate?pretty
{
  "pipeline": {
    "processors": [
      { "bano": {} }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "address": {
          "number": "25",
          "street_name": "georges",
          "zipcode": "17440",
          "city": "Aytré"
        }
      }
    }
  ]
}
# From Geo Point to physical address
# (same inline "bano" pipeline, fed a document that only carries a location)
POST /_ingest/pipeline/_simulate?pretty
{
  "pipeline": {
    "processors": [
      { "bano": {} }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "location": {
          "lat": 49.040872,
          "lon": 2.017811
        }
      }
    }
  ]
}
# Find my neighbor
# "lon": 2.017700
# #####################################################
# ############ STEP 3: More advanced Ingestion pipeline
# #####################################################
# Combine with Geo Ip processor
DELETE /_ingest/pipeline/bano-test
# Stored pipeline: geoip resolves the "ip" field, then bano reads the
# coordinates from geoip.location.lat / geoip.location.lon.
PUT /_ingest/pipeline/bano-test
{
  "processors": [
    {
      "geoip": {
        "field": "ip"
      }
    },
    {
      "bano": {
        "location_lat_field": "geoip.location.lat",
        "location_lon_field": "geoip.location.lon"
      }
    }
  ]
}
# Run the stored bano-test pipeline on a document holding only an IP address,
# with per-processor output (?verbose)
POST /_ingest/pipeline/bano-test/_simulate?verbose
{
  "docs": [
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "ip": "82.229.80.187"
      }
    }
  ]
}
# #####################################################
# ############ STEP 0: Prepare Demo
# #####################################################
# Remove existing data
DELETE person*
# Inject 100k docs with injector.jar
# java -jar injector/injector-5.0.jar 100000 10000
# #####################################################
# ############ STEP 1: Check existing data
# #####################################################
GET person/_search
# Goal is to enrich existing geo points with more advanced address
# For people living in Cergy (95)
# Exact-term filter on the city
GET person/_search
{
  "query": {
    "term": {
      "address.city": { "value": "Cergy" }
    }
  }
}
# #####################################################
# ############ STEP 2: Create Ingestion Pipeline
# #####################################################
DELETE /_ingest/pipeline/bano-person
# For now we use a hack for BANO: address.number and address.street_name are
# set to empty strings before the bano processor runs on address.location.
PUT /_ingest/pipeline/bano-person
{
  "processors": [
    {
      "set": {
        "field": "address.number",
        "value": ""
      }
    },
    {
      "set": {
        "field": "address.street_name",
        "value": ""
      }
    },
    {
      "bano": {
        "location_lat_field": "address.location.lat",
        "location_lon_field": "address.location.lon"
      }
    }
  ]
}
# We check
# The sample document uses "dateOfBirth", the field name declared in the
# person-enriched mapping further down in these notes.
POST /_ingest/pipeline/bano-person/_simulate
{
  "docs": [
    {
      "_index": "index",
      "_type": "type",
      "_id": "id",
      "_source": {
        "name": "David Pilato",
        "dateOfBirth": "1971-12-26",
        "gender": "male",
        "children": 3,
        "address": {
          "country": "France",
          "zipcode": "95800",
          "city": "Cergy",
          "countrycode": "FR",
          "location": {
            "lon": 2.017811,
            "lat": 49.040872
          }
        }
      }
    }
  ]
}
# #####################################################
# ############ STEP 3: Reindex
# #####################################################
DELETE person-enriched
# Destination index for the reindex below: one shard, no replicas, explicit "person" mapping
PUT person-enriched
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0
  },
  "mappings": {
    "person": {
      "properties": {
        "@timestamp": { "type": "date", "format": "dateOptionalTime" },
        "address": {
          "properties": {
            "city": { "type": "keyword" },
            "country": { "type": "keyword" },
            "countrycode": { "type": "keyword" },
            "location": { "type": "geo_point" },
            "zipcode": { "type": "text" },
            "street_name": { "type": "text" },
            "number": { "type": "keyword" },
            "full_address": { "type": "text" }
          }
        },
        "children": { "type": "long" },
        "dateOfBirth": {
          "type": "date",
          "copy_to": ["@timestamp"],
          "format": "dateOptionalTime"
        },
        "gender": { "type": "keyword" },
        "marketing": {
          "properties": {
            "cars": { "type": "long" },
            "electronic": { "type": "long" },
            "fashion": { "type": "long" },
            "food": { "type": "long" },
            "garden": { "type": "long" },
            "hifi": { "type": "long" },
            "music": { "type": "long" },
            "shoes": { "type": "long" },
            "toys": { "type": "long" }
          }
        },
        "name": { "type": "text" }
      }
    }
  }
}
# Copy the Cergy documents from "person" into "person-enriched", running each
# one through the bano-person pipeline
POST _reindex
{
  "source": {
    "index": "person",
    "query": {
      "term": {
        "address.city": { "value": "Cergy" }
      }
    }
  },
  "dest": {
    "index": "person-enriched",
    "pipeline": "bano-person"
  }
}
GET person-enriched/_search
# Match on the street name and bucket the hits by address.number
GET person-enriched/_search
{
  "aggs": {
    "by_number": {
      "terms": { "field": "address.number" }
    }
  },
  "query": {
    "match": { "address.street_name": "fontaines" }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment