@therokh
Created February 22, 2018 23:08
Script to parse pshitt JSON output and import it into InfluxDB
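
For reference, the script expects the file named by $FILENAME to hold a top-level "attempts" array; only src_ip and timestamp are read from each entry, so any other pshitt fields (username, password, and so on) are simply ignored. A minimal, purely illustrative input file:

{
  "attempts": [
    {"src_ip": "203.0.113.7", "timestamp": "2018-02-20T11:22:33"},
    {"src_ip": "198.51.100.9", "timestamp": "2018-02-20T11:22:41"}
  ]
}
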
import json

import requests
from influxdb import InfluxDBClient
# noinspection PyUnresolvedReferences
from requests.packages.urllib3.exceptions import InsecureRequestWarning

# noinspection PyUnresolvedReferences
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

# Load the JSON file
with open('$FILENAME') as f:
    json_file = json.load(f)

# Variable declarations
count = 0
influx_payload = []
results = {}
failed = []

# Iterate over the entries and insert into InfluxDB
for attempt in json_file['attempts']:
    count += 1
    country_code = ""
    country = ""

    # GeoIP lookups are slow and we don't want to hammer the provider, so insert
    # the result into the 'results' dictionary and query that on each iteration.
    # Check if this IP has already had a lookup.
    if attempt['src_ip'] in results:
        # Positive match, don't do another
        print(str(attempt['src_ip']) + " is cached")
    else:
        print(str(attempt['src_ip']) + " not found, performing lookup")
        # Do a lookup for the geo data and cache the result
        lookup = requests.get('http://geoip.nekudo.com/api/{}'.format(attempt['src_ip'])).json()
        results[attempt['src_ip']] = lookup

    # If we cannot find the geolocation data for an IP, don't add it; report at the end
    data = results.get(attempt['src_ip'])
    if 'country' not in data:
        print("Unable to locate data for " + str(attempt['src_ip']))
        failed.append([attempt['src_ip'], attempt['timestamp']])
    else:
        country_code = data['country']['code']
        country = data['country']['name']
        influx_payload.append(
            {
                "measurement": "login_attempt",
                "tags": {
                    "host": "$insert_your_honeypot_hostname_here",
                    "country_code": country_code,
                    "country": country,
                },
                "time": attempt['timestamp'],
                "fields": {
                    "source_ip": attempt['src_ip'],
                }
            }
        )
        # Console output
        print("Iteration: " + str(count))
        print("Timestamp: " + str(attempt['timestamp']))
        print("Source IP: " + str(attempt['src_ip']))
        print("Country: " + str(country))
        #print("City: " + str(data['city']))

    if (count % 1000) == 0:
        # Write to the DB every 1000 entries; the remainder is caught at the end.
        # This prevents a massive write failing midway when dealing with a huge import.
        # Format: host, port, user, password, database
        influx = InfluxDBClient('$INFLUXDBHOST', 8086, '$DBUSER', '$DBPASSWORD', '$DB')
        influx.write_points(influx_payload)
        print("=== Wrote to DB ===")
        influx_payload = []

    print("=======")

# Write any remaining items to InfluxDB
influx = InfluxDBClient('$INFLUXDBHOST', 8086, '$DBUSER', '$DBPASSWORD', '$DB')
influx.write_points(influx_payload)
print("Wrote to DB")

print("===============")
print("Imported IPs")
for key, val in results.items():
    print(key, "=>", val.get('country'))

# Report any IPs we could not geolocate
if failed:
    print("Failed lookups")
    for src_ip, timestamp in failed:
        print(src_ip, "=>", timestamp)
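
If your pshitt instance writes its usual one-JSON-object-per-line log rather than a single document, a small pre-processing step can produce the {"attempts": [...]} structure this script reads. A minimal sketch, assuming that line-per-object format (file names are placeholders):

import json

def wrap_pshitt_log(in_path, out_path):
    # Collect each non-empty line of the pshitt log as one attempt object
    attempts = []
    with open(in_path) as f:
        for line in f:
            line = line.strip()
            if line:
                attempts.append(json.loads(line))
    # Emit the {"attempts": [...]} document the importer above expects
    with open(out_path, 'w') as out:
        json.dump({"attempts": attempts}, out)

wrap_pshitt_log('ssh_attempts.log', 'attempts.json')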
bgulla commented Feb 23, 2018

do you have this running as a cron job? wouldn't it re-ingest already parsed entries? thanks

therokh (Author) commented Feb 27, 2018

I run it manually. My plan for automation was to fork pshitt and have it talk to Influx directly.
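
A rough sketch of that idea — a writer that pshitt could call per attempt instead of logging to a file. This is untested against pshitt's internals; the connection placeholders and payload shape just mirror the script above:

from influxdb import InfluxDBClient

influx = InfluxDBClient('$INFLUXDBHOST', 8086, '$DBUSER', '$DBPASSWORD', '$DB')

def write_attempt(attempt):
    # Push a single attempt straight to InfluxDB as it happens
    influx.write_points([{
        "measurement": "login_attempt",
        "tags": {"host": "$insert_your_honeypot_hostname_here"},
        "time": attempt['timestamp'],
        "fields": {"source_ip": attempt['src_ip']},
    }])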
