Skip to content

Instantly share code, notes, and snippets.

@cdot65
Created April 3, 2023 21:51
Show Gist options
  • Save cdot65/3b1e9af7ae1e531efb7dc3912f719b6c to your computer and use it in GitHub Desktop.
Save cdot65/3b1e9af7ae1e531efb7dc3912f719b6c to your computer and use it in GitHub Desktop.
Enrich as subnet and get reputation for IP addresses from AutoFocus
import os
import sys
from typing import Dict, Optional
import requests
import json
import gzip
import boto3
import botocore
class IPAddressIntel:
def __init__(self, source_ip_address: str):
self.source_ip_address = source_ip_address
self.ipstack = None
self.fraud_score = None
self.autofocus = None
self.ipqualityscore = None
def set_ipstack(self, data: Dict):
self.ipstack = data
def set_fraud_score(self, score: int):
self.fraud_score = score
def set_autofocus(self, data: Dict):
self.autofocus = data
def set_ipqualityscore(self, data: Dict):
self.ipqualityscore = data
def ipstack_score(api_key: str, ip: str) -> Optional[Dict]:
try:
return requests.get(f"http://api.ipstack.com/{ip}?access_key={api_key}").json()
except:
return None
def af_score(ip: str) -> Optional[Dict]:
from autofocus import AutoFocusAPI
from autofocus.factories.tic import ThreatIntelFactory
AutoFocusAPI.api_key = os.environ.get('AF_API_KEY')
try:
af = ThreatIntelFactory().get_tic_summary(ipv4=ip).__dict__
af['whois'] = str(af['whois'])
return af
except:
return None
def ipqs_score(api_key: str, ip: str) -> Optional[Dict]:
try:
return requests.get(f"https://www.ipqualityscore.com/api/json/ip/{api_key}/{ip}").json()
except:
return None
def score_risk(i: Dict) -> int:
safe_words = ["List", "Get", "Describe"]
score = 50
try:
if i['ipstack']['country_code'] == 'US':
score -= 10
else:
score += 10
except:
print("GeoIP failed")
try:
if any(x in i['eventName'] for x in safe_words):
score -= 10
else:
score += 20
except:
print("event ranking failed")
return score
def process_records(records: Dict) -> Dict:
intel_bucket = os.environ.get('INTEL_BUCKET')
s3 = boto3.client('s3')
transformed = {'Records': []}
for record in records['Records']:
ip_intel = IPAddressIntel(record['sourceIPAddress'])
if json.loads(os.environ.get('IPSTACK_API_ENABLED').lower()):
try:
obj = s3.get_object(Bucket=intel_bucket, Key=f"{ip_intel.source_ip_address}_ipstack.json")
ip_intel.set_ipstack(json.loads(obj['Body'].read()))
except s3.exceptions.NoSuchKey:
ipstack_data = ipstack_score(os.environ.get('IPSTACK_API_KEY'), ip_intel.source_ip_address)
if ipstack_data is not None:
ip_intel.set_ipstack(ipstack_data)
s3.put_object(
Body=json.dumps(ipstack_data),
Key=f"{ip_intel.source_ip_address}_ipstack.json",
Bucket=intel_bucket)
if json.loads(os.environ.get('NATIVE_RISK_ENABLED').lower()):
ip_intel.set_fraud_score(score_risk(record))
if json.loads(os.environ.get('AF_API_ENABLED').lower()):
try:
obj = s3.get_object(Bucket=intel_bucket, Key=f"{ip_intel.source_ip_address}_autofocus.json")
ip_intel.set_autofocus(json.loads(obj['Body'].read()))
except s3.exceptions.NoSuchKey:
autofocus_data = af_score(ip_intel.source_ip_address)
if autofocus_data is not None:
ip_intel.set_autofocus(autofocus_data)
s3.put_object(
Body=json.dumps(autofocus_data),
Key=f"{ip_intel.source_ip_address}_autofocus.json",
Bucket=intel_bucket)
if json.loads(os.environ.get('IPQS_API_ENABLED').lower()):
try:
obj = s3.get_object(Bucket=intel_bucket, Key=f"{ip_intel.source_ip_address}_ipqualityscore.json")
ip_intel.set_ipqualityscore(json.loads(obj['Body'].read()))
except s3.exceptions.NoSuchKey:
ipqualityscore_data = ipqs_score(os.environ.get('IPQS_API_KEY'), ip_intel.source_ip_address)
if ipqualityscore_data is not None:
ip_intel.set_ipqualityscore(ipqualityscore_data)
s3.put_object(
Body=json.dumps(ipqualityscore_data),
Key=f"{ip_intel.source_ip_address}_ipqualityscore.json",
Bucket=intel_bucket)
transformed_record = record.copy()
transformed_record.update(vars(ip_intel))
transformed['Records'].append(transformed_record)
return transformed
if __name__ == "__main__":
with open("sample_input.json", "r") as f:
input_data = json.load(f)
transformed_data = process_records(input_data)
with open("sample_output.json", "w") as f:
json.dump(transformed_data, f)
@cdot65
Copy link
Author

cdot65 commented Apr 3, 2023

You will need to create a requirements.txt file with the following dependencies:

boto3
botocore
autofocus-client-library

To build the Docker image, run:

docker build -t your-image-name .

To run the container:

docker run -it --rm --name your-container-name \
    -e INTEL_BUCKET=your-intel-bucket \
    -e IPSTACK_API_ENABLED=true \
    -e IPSTACK_API_KEY=your-ipstack-api-key \
    -e NATIVE_RISK_ENABLED=true \
    -e AF_API_ENABLED=true \
    -e AF_API_KEY=your-af-api-key \
    -e IPQS_API_ENABLED=true \
    -e IPQS_API_KEY=your-ipqs-api-key \
    your-image-name

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment