
@zpoint
Last active August 15, 2020 04:50
es painless nested script aggregation and sum of all buckets

I need to achieve the following in a single ES query:

For every department in a company, count the employees who are active, newly hired, and who have left, then sum these buckets to get the company-wide active/new/leave counts.

The specific conditions can be described as follows (simplified version):

  • Need to search for all employees in the target company (c_company_id field)
  • Need to count employees without a department ("missing": "unknown")
  • Each employee only stores a hire date and a leave date
  • The count of employees active at the beginning of the month (employed_origin field)
    • Those with c_status in [1, 2] (not yet left), with a hire date earlier than 2020-01-01, count as active
    • Those with c_status value 3 (left), with a hire date earlier than 2020-01-01 and a leave date later than 2020-01-01, also count as active
  • New employees (hire field)
    • Hire date between 2020-01-01 and 2020-02-01
  • Employees who left (leave field)
    • c_status value 3 and leave date between 2020-01-01 and 2020-02-01
  • The count of employees active at the end of the month (employed field, the field actually needed)
    • employed_origin + hire - leave
  • Sum all buckets to get the company's employed, hire, and leave counts
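The per-employee classification above can be sketched as a plain-Python reference model (the status codes and date boundaries mirror the query below; this is just a model of the logic, not the ES implementation, and the field names are simplified):

```python
from datetime import date

LOWER = date(2020, 1, 1)   # start of the month
UPPER = date(2020, 2, 1)   # start of the next month

def classify(status, hire_dt, leave_dt=None):
    """Return (employed_origin, hire, leave) contributions for one employee.

    status: 1 or 2 = active, 3 = left (c_status in the description above).
    """
    # Active at the beginning of the month: hired before LOWER, and either
    # still active, or left but only on/after LOWER.
    employed_origin = 0
    if hire_dt is not None and hire_dt < LOWER:
        if status in (1, 2):
            employed_origin = 1
        elif status == 3 and leave_dt is not None and leave_dt >= LOWER:
            employed_origin = 1

    # Newly hired: hire date within [LOWER, UPPER).
    hire = 1 if hire_dt is not None and LOWER <= hire_dt < UPPER else 0

    # Left this month: status 3 with a leave date within [LOWER, UPPER).
    leave = 1 if (status == 3 and leave_dt is not None
                  and LOWER <= leave_dt < UPPER) else 0

    return employed_origin, hire, leave
```

The end-of-month headcount for a bucket is then `employed_origin + hire - leave`, which is exactly what the `bucket_script` aggregation computes.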

After hours with a search engine's help, I finally fit all the conditions into a single ES query, and the performance also meets my requirements.

GET my_index/_search
	{
	  "track_total_hits": 100000,
	  "query": {
	    "bool": {
	      "must": [
	        {
	          "term": {
	            "c_company_id": "xxxxxxxxxxxxx"
	          }
	        }
	      ]
	    }
	  },
	  "size": 0,
	  "aggs": {
	    "department_id": {
	      "terms": {
	        "field": "c_department_id",
	        "size" : 3000,
	        "missing": "unknown"
	      },
	      "aggs": {
	        "employed_origin": {
	          "sum": {
	            "script": {
	              "lang": "painless",
	              "source": """
	                def lower_ms = Instant.parse(params.lower_date + "T00:00:00.00Z").toEpochMilli();
	                if (doc.containsKey('c_work_status') && doc['c_work_status'].size() > 0
	                    && doc.containsKey('c_hire_dt') && doc['c_hire_dt'].size() > 0
	                    && doc['c_hire_dt'].value.toInstant().toEpochMilli() < lower_ms) {
	                  if (1 <= doc['c_work_status'].value && doc['c_work_status'].value <= 2) {
	                    return 1;
	                  } else if (doc['c_work_status'].value == 3
	                             && doc.containsKey('c_leave_dt') && doc['c_leave_dt'].size() > 0
	                             && doc['c_leave_dt'].value.toInstant().toEpochMilli() >= lower_ms) {
	                    return 1;
	                  }
	                  return 0;
	                }
	                return 0;
	              """,
	              "params": {
	                "upper_date": "2020-02-01",
	                "lower_date": "2020-01-01"
	              }
	            }
	          }
	        },
	        "hire": {
	          "sum": {
	            "script": {
	              "lang": "painless",
	              "source": """
	                def lower_ms = Instant.parse(params.lower_date + "T00:00:00.00Z").toEpochMilli();
	                def upper_ms = Instant.parse(params.upper_date + "T00:00:00.00Z").toEpochMilli();
	                if (doc.containsKey('c_hire_dt') && doc['c_hire_dt'].size() > 0
	                    && doc['c_hire_dt'].value.toInstant().toEpochMilli() >= lower_ms
	                    && doc['c_hire_dt'].value.toInstant().toEpochMilli() < upper_ms) {
	                  return 1;
	                }
	                return 0;
	              """,
	              "params": {
	                "upper_date": "2020-02-01",
	                "lower_date": "2020-01-01"
	              }
	            }
	          }
	        },
	        "leave": {
	          "sum": {
	            "script": {
	              "lang": "painless",
	              "source": """
	                def lower_ms = Instant.parse(params.lower_date + "T00:00:00.00Z").toEpochMilli();
	                def upper_ms = Instant.parse(params.upper_date + "T00:00:00.00Z").toEpochMilli();
	                if (doc.containsKey('c_work_status') && doc['c_work_status'].size() > 0
	                    && doc['c_work_status'].value == 3
	                    && doc.containsKey('c_leave_dt') && doc['c_leave_dt'].size() > 0
	                    && doc['c_leave_dt'].value.toInstant().toEpochMilli() >= lower_ms
	                    && doc['c_leave_dt'].value.toInstant().toEpochMilli() < upper_ms) {
	                  return 1;
	                }
	                return 0;
	              """,
	              "params": {
	                "upper_date": "2020-02-01",
	                "lower_date": "2020-01-01"
	              }
	            }
	          }
	        },
	        "employed": {
	          "bucket_script": {
	            "buckets_path": {
	              "employed_origin": "employed_origin",
	              "hire": "hire",
	              "leave": "leave"
	            },
	            "script": "params.employed_origin + params.hire - params.leave"
	          }
	        }
	      }
	    },
	    "employed_origin": {
	      "sum_bucket": {
	        "buckets_path": "department_id>employed_origin"
	      }
	    },
	    "employed": {
	      "sum_bucket": {
	        "buckets_path": "department_id>employed"
	      }
	    },
	    "hire": {
	      "sum_bucket": {
	        "buckets_path": "department_id>hire"
	      }
	    },
	    "leave": {
	      "sum_bucket": {
	        "buckets_path": "department_id>leave"
	      }
	    }
	  }
	}
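The company-wide totals come back at the top level of `aggregations`, and the per-department figures sit inside each `department_id` bucket. A small sketch of pulling them out of the response (the shape is standard ES aggregation output, but the bucket keys and numbers here are made up for illustration):

```python
# A trimmed, hypothetical response (keys and numbers are illustrative only).
response = {
    "aggregations": {
        "department_id": {
            "buckets": [
                {"key": "dept_a", "doc_count": 12,
                 "employed_origin": {"value": 10.0},
                 "hire": {"value": 3.0},
                 "leave": {"value": 1.0},
                 "employed": {"value": 12.0}},
                {"key": "unknown", "doc_count": 2,
                 "employed_origin": {"value": 2.0},
                 "hire": {"value": 0.0},
                 "leave": {"value": 0.0},
                 "employed": {"value": 2.0}},
            ]
        },
        # sum_bucket pipeline results: totals over all department buckets.
        "employed_origin": {"value": 12.0},
        "hire": {"value": 3.0},
        "leave": {"value": 1.0},
        "employed": {"value": 14.0},
    }
}

aggs = response["aggregations"]

# End-of-month headcount per department, keyed by department id
# ("unknown" collects employees without a department).
per_department = {
    b["key"]: int(b["employed"]["value"])
    for b in aggs["department_id"]["buckets"]
}

# Company-wide end-of-month headcount from the sum_bucket aggregation.
company_employed = int(aggs["employed"]["value"])
```

Note that the company-level `employed` equals `employed_origin + hire - leave` at the company level too, since summing the per-bucket `bucket_script` results is the same as applying the formula to the summed buckets.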

