Skip to content

Instantly share code, notes, and snippets.

@djptek
Last active September 26, 2018 13:01
Show Gist options
  • Save djptek/73e3bd31b4157c16af82a17546195f6f to your computer and use it in GitHub Desktop.
Save djptek/73e3bd31b4157c16af82a17546195f6f to your computer and use it in GitHub Desktop.
Training subscription draw at Elastic{ON} 2018
# Ingest pipeline for the draw: use the ticket reference as the document id,
# route employees and leads to separate indices, and redact all other fields.
# The script uses a Painless regex literal to extract the email domain, so
# ensure this setting has been enabled in elasticsearch.yml:
#   script.painless.regex.enabled: true
PUT _ingest/pipeline/email_to_id_route_and_redact
{
  "description": "use Ticket_Reference_ID as _id to dedupe, route by domain and redact ALL potential PII",
  "processors": [
    {
      "script": {
        "source": """
          // set document id to Ticket_Reference_ID so that a repeated ticket
          // maps to the same document
          ctx._id = ctx.Ticket_Reference_ID;

          // extract the domain: drop everything up to and including the last '@',
          // then trim so surrounding whitespace cannot defeat the comparison
          def domain = /.*@/.matcher(ctx.Email).replaceAll('').trim();

          // domain names are case-insensitive, so compare ignoring case:
          // 'Elastic.CO' must be routed to the ineligible index as well
          ctx._index = 'elastic.co'.equalsIgnoreCase(domain) ?
            'employee_ineligible' : 'lead_eligible';

          // redact every field except those prefixed with '_' or '@'
          // (not user supplied) and Ticket_Reference_ID;
          // startsWith also copes with an empty field name
          for (def property : ctx.keySet()) {
            if (!(property.startsWith('_') ||
                  property.startsWith('@') ||
                  property.equals('Ticket_Reference_ID'))) {
              ctx[property] = 'pii_redacted';
            }
          }
        """
      }
    }
  ]
}
# dry-run the pipeline against two sample documents with the simulate API:
# one with an elastic.co address and one with a different domain
POST _ingest/pipeline/email_to_id_route_and_redact/_simulate
{
  "docs": [
    {
      "_index": "index",
      "_type": "_doc",
      "_source": {
        "field1": "test",
        "field2": "test",
        "Email": "ineligible:-(@elastic.co",
        "Ticket_Reference_ID": "ABCDE12345"
      }
    },
    {
      "_index": "index",
      "_type": "_doc",
      "_source": {
        "field1": "test",
        "field2": "test",
        "Email": "eligible:-)@not_elastic.suffix",
        "Ticket_Reference_ID": "FGHIJ67890"
      }
    }
  ]
}
# clean up: delete the two target indices by explicit name so that no
# unrelated index can be caught by a wildcard (wildcard deletes are also
# rejected when action.destructive_requires_name is enabled);
# ignore_unavailable avoids an error when an index does not exist yet
DELETE employee_ineligible,lead_eligible?ignore_unavailable=true
# index template: any index whose name ends in "eligible" is created with
# one primary shard and no replicas
PUT _template/single_shard
{
  "index_patterns": ["*eligible"],
  "order": 1,
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0
  }
}
# count eligible and ineligible candidates: no hits are returned (size 0),
# only a terms aggregation that buckets the documents by index name
GET *eligible/_search
{
  "size": 0,
  "aggs": {
    "eligibility": {
      "terms": {
        "field": "_index"
      }
    }
  }
}
# the draw: score the lead_eligible documents with a seeded random_score on
# _id and return a single hit, showing only source fields that match T*
GET lead_eligible/_search
{
  "size": 1,
  "_source": "T*",
  "query": {
    "function_score": {
      "functions": [
        {
          "random_score": {
            "seed": 30625,
            "field": "_id"
          }
        }
      ]
    }
  }
}
# index some (fake) test content through the email_to_id_route_and_redact
# pipeline; the pipeline script assigns ctx._index, so the empty "_index"
# on each action line is replaced by the routed index name
# (each action line and each document must stay on a single line)
PUT _bulk
{ "index" : { "_index" : "", "_type" : "_doc", "pipeline": "email_to_id_route_and_redact"} }
{ "field1":"test", "field2":"test", "Email":"ineligible:-(@elastic.co", "Ticket_Reference_ID":"ABCDE12345"}
{ "index" : { "_index" : "", "_type" : "_doc", "pipeline": "email_to_id_route_and_redact"} }
{ "field1":"test", "field2":"test", "Email":"eligible:-)@not_elastic.suffix", "Ticket_Reference_ID":"FGHIJ67890"}
# verify the indexed content: fetch up to two documents from the matching
# indices together with a per-index document count
GET *eligible/_search
{
  "size": 2,
  "aggs": {
    "eligibility": {
      "terms": {
        "field": "_index"
      }
    }
  }
}
# expected response to the search above (the "took" value will vary)
{
  "took": 125,
  "timed_out": false,
  "_shards": {
    "total": 2,
    "successful": 2,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 2,
    "max_score": 1,
    "hits": [
      {
        "_index": "employee_ineligible",
        "_type": "_doc",
        "_id": "ABCDE12345",
        "_score": 1,
        "_source": {
          "field1": "pii_redacted",
          "Email": "pii_redacted",
          "field2": "pii_redacted",
          "Ticket_Reference_ID": "ABCDE12345"
        }
      },
      {
        "_index": "lead_eligible",
        "_type": "_doc",
        "_id": "FGHIJ67890",
        "_score": 1,
        "_source": {
          "field1": "pii_redacted",
          "Email": "pii_redacted",
          "field2": "pii_redacted",
          "Ticket_Reference_ID": "FGHIJ67890"
        }
      }
    ]
  },
  "aggregations": {
    "eligibility": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {
          "key": "employee_ineligible",
          "doc_count": 1
        },
        {
          "key": "lead_eligible",
          "doc_count": 1
        }
      ]
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment