@AlLongley
Created November 13, 2019 00:55
# -*- coding: future_fstrings -*-
import os
import traceback
import sys
import time
import select
import itertools
from concurrent import futures

from sumoappclient.sumoclient.base import BaseCollector
from sumoappclient.omnistorage.factory import ProviderFactory
from sumoappclient.sumoclient.factory import OutputHandlerFactory
from sumoappclient.sumoclient.httputils import ClientMixin, SessionPool
from sumoappclient.common.utils import get_current_timestamp, convert_epoch_to_utc_date


def stdinWait(text, time, default=None):
    print(text)
    i, o, e = select.select([sys.stdin], [], [], time)
    if i:
        return sys.stdin.readline().strip()
    else:
        return default

class NetskopeCollector(BaseCollector):
    SINGLE_PROCESS_LOCK_KEY = 'is_netskopecollector_running'
    CONFIG_FILENAME = "netskope.yaml"

    def __init__(self):
        self.project_dir = self.get_current_dir()
        super(NetskopeCollector, self).__init__(self.project_dir)
        self.api_config = self.config['Netskope']
        self.netskope_conn = SessionPool(self.collection_config['MAX_RETRY'], self.collection_config['BACKOFF_FACTOR'], logger=self.log)
        self.netskope_session = self.netskope_conn.get_request_session()
        if self.collection_config['BACKFILL_DAYS'] > 90:
            raise Exception('BACKFILL_DAYS cannot be more than 90 days')
        self.DEFAULT_START_TIME_EPOCH = get_current_timestamp() - self.collection_config['BACKFILL_DAYS'] * 24 * 60 * 60

    def get_current_dir(self):
        cur_dir = os.path.dirname(__file__)
        return cur_dir
    def get_endpoint_url(self, event_type):
        if event_type in self.api_config.get('ALERT_TYPES', []):
            return self.api_config['NETSKOPE_ALERT_ENDPOINT']
        else:
            return self.api_config['NETSKOPE_EVENT_ENDPOINT']

    def set_fetch_state(self, event_type, start_time_epoch, end_time_epoch, last_record_epoch=None):
        if end_time_epoch:  # end time epoch could be none in cases where no event is present
            assert start_time_epoch <= end_time_epoch
        obj = {
            'url': self.get_endpoint_url(event_type),
            "event_type": event_type,
            "start_time_epoch": start_time_epoch,
            "end_time_epoch": end_time_epoch,
            "last_record_epoch": last_record_epoch
        }
        self.kvstore.set(event_type, obj)
        return obj
    def set_new_end_epoch_time(self, event_type, start_time_epoch):
        params = {
            'token': self.api_config['TOKEN'],
            'limit': 1,
            'starttime': start_time_epoch,
            'endtime': get_current_timestamp(),
            'skip': 0,
            'type': event_type
        }
        url = self.get_endpoint_url(event_type)
        success, respjson = ClientMixin.make_request(url, method=self.api_config['FETCH_METHOD'], session=self.netskope_session, params=params, logger=self.log, TIMEOUT=self.collection_config['TIMEOUT'], MAX_RETRY=self.collection_config['MAX_RETRY'], BACKOFF_FACTOR=self.collection_config['BACKOFF_FACTOR'])
        start_date = convert_epoch_to_utc_date(params['starttime'])
        end_date = convert_epoch_to_utc_date(params['endtime'])
        if success and respjson["status"] == "success" and len(respjson["data"]) > 0:
            obj = self.set_fetch_state(event_type, start_time_epoch, respjson["data"][0]["timestamp"], respjson["data"][0]["timestamp"])
            self.log.info(f'''Creating task for {event_type} from {start_date} to {end_date}''')
            return obj
        else:
            self.log.info(f'''No events are available for {event_type} from {start_date} to {end_date}''')
            return None
    def is_running(self):
        self.log.info("Acquiring single instance lock")
        return self.kvstore.acquire_lock(self.SINGLE_PROCESS_LOCK_KEY)

    def stop_running(self):
        self.log.info("Releasing single instance lock")
        return self.kvstore.release_lock(self.SINGLE_PROCESS_LOCK_KEY)

    def transform_data(self, data):
        return data
    def get_last_record_epoch(self, obj):
        params = {
            'token': self.api_config['TOKEN'],
            'limit': self.api_config['PAGINATION_LIMIT'],
            'starttime': obj['start_time_epoch'],
            'endtime': obj['end_time_epoch'],
            'skip': obj['skip'],
            'type': obj['event_type']
        }
        params['skip'] -= 1
        success, respjson = ClientMixin.make_request(obj['url'], method=self.api_config['FETCH_METHOD'], session=self.netskope_session, params=params, logger=self.log, TIMEOUT=self.collection_config['TIMEOUT'], MAX_RETRY=self.collection_config['MAX_RETRY'], BACKOFF_FACTOR=self.collection_config['BACKOFF_FACTOR'])
        start_date = convert_epoch_to_utc_date(params['starttime'])
        end_date = convert_epoch_to_utc_date(params['endtime'])
        if success and respjson["status"] == "success" and len(respjson["data"]) > 0:
            last_record_epoch = respjson["data"][0]["timestamp"]
            last_record_date = convert_epoch_to_utc_date(last_record_epoch)
            self.log.info(f'''last record for {obj['event_type']} from {params['starttime']} to {params['endtime']} skip: {params['skip']} is {last_record_date}''')
            return last_record_epoch
        else:
            self.log.info("Taking last_record_epoch as input from user")
            last_record_epoch = stdinWait("Enter last record timestamp for %s > " % obj['event_type'], 15)
            if not last_record_epoch:
                raise Exception(f'''last record for {obj['event_type']} from {params['starttime']} to {params['endtime']} skip: {params['skip']} not found''')
            else:
                last_record_epoch = int(last_record_epoch)
                return last_record_epoch
    def fetch(self, url, event_type, start_time_epoch, end_time_epoch, last_record_epoch):
        params = {
            'token': self.api_config['TOKEN'],
            'limit': self.api_config['PAGINATION_LIMIT'],
            'starttime': start_time_epoch,
            'endtime': end_time_epoch,
            'skip': 0,
            'type': event_type
        }
        if last_record_epoch:
            params['endtime'] = last_record_epoch  # logs of same timestamp may be repeated
        output_handler = OutputHandlerFactory.get_handler(self.config['Collection']['OUTPUT_HANDLER'], config=self.config)
        next_request = send_success = True
        page_count = total_records = 0
        move_window = False
        sess = self.netskope_conn.get_request_session()
        last_record_epoch = None
        try:
            while next_request:
                page_count += 1
                fetch_success, respjson = ClientMixin.make_request(url, method=self.api_config['FETCH_METHOD'], session=sess, params=params, logger=self.log, TIMEOUT=self.collection_config['TIMEOUT'], MAX_RETRY=self.collection_config['MAX_RETRY'], BACKOFF_FACTOR=self.collection_config['BACKOFF_FACTOR'])
                fetch_success = fetch_success and respjson["status"] == "success"  # netskope sends 200 for errors
                if fetch_success:
                    data = respjson["data"]
                    if len(data) > 0:
                        data = self.transform_data(data)
                        send_success = output_handler.send(data)
                        if send_success:
                            params['skip'] += len(data)
                            total_records += len(data)
                            last_record_epoch = data[-1]["timestamp"]
                            self.log.info(f'''Successfully Sent Page: {page_count} Event Type: {event_type} Datalen: {len(data)} starttime: {convert_epoch_to_utc_date(params['starttime'])} endtime: {convert_epoch_to_utc_date(params['endtime'])} skip: {params['skip']} last_record_epoch: {convert_epoch_to_utc_date(last_record_epoch)}''')
                    else:  # no data so moving window
                        move_window = True
                next_request = fetch_success and send_success and (not move_window)
                if move_window:
                    self.log.debug(f'''Moving starttime window for {event_type} to {convert_epoch_to_utc_date(params["endtime"] + 1)}''')
                    self.set_fetch_state(event_type, end_time_epoch + 1, None, None)
                elif not (fetch_success and send_success):  # saving state in case of failures for restarting in future
                    self.set_fetch_state(event_type, start_time_epoch, end_time_epoch, last_record_epoch)
                    self.log.error(f'''Failed to send Event Type: {event_type} Page: {page_count} starttime: {convert_epoch_to_utc_date(params['starttime'])} endtime: {convert_epoch_to_utc_date(params['endtime'])} fetch_success: {fetch_success} send_success: {send_success} skip: {params['skip']} last_record_epoch: {last_record_epoch}''')
        except Exception as e:
            self.set_fetch_state(event_type, start_time_epoch, end_time_epoch, last_record_epoch)
            raise e
        finally:
            self.netskope_conn.close()
            output_handler.close()
            self.log.info(f'''Total messages fetched {total_records} for Event Type: {event_type} starttime: {convert_epoch_to_utc_date(params['starttime'])} endtime: {convert_epoch_to_utc_date(params['endtime'])}''')
    def build_task_params(self):
        tasks = []
        for et in itertools.chain(self.api_config.get('ALERT_TYPES', []), self.api_config.get('EVENT_TYPES', [])):
            if self.kvstore.has_key(et):
                obj = self.kvstore.get(et)
                if obj["end_time_epoch"] is None:
                    obj = self.set_new_end_epoch_time(et, obj["start_time_epoch"])
            else:
                self.set_fetch_state(et, self.DEFAULT_START_TIME_EPOCH, None)  # setting start time initially otherwise it always fetches in range(cur time, cur time)
                obj = self.set_new_end_epoch_time(et, self.DEFAULT_START_TIME_EPOCH)
            if obj is None:  # no new events so continue
                continue
            if "skip" in obj:
                # for backward compatibility
                if ("last_record_epoch" not in obj) or (obj["last_record_epoch"] is None and obj['skip'] > 0):
                    obj["last_record_epoch"] = self.get_last_record_epoch(obj)
                    self.log.info("setting last_record_epoch to %d" % obj["last_record_epoch"])
                del obj['skip']
            tasks.append(obj)
        self.log.info(f'''Building tasks {len(tasks)}''')
        return tasks
    def run(self):
        if self.is_running():
            try:
                self.log.info('Starting Netskope Event Forwarder...')
                task_params = self.build_task_params()
                all_futures = {}
                log_types = ",".join([t['event_type'] for t in task_params])
                if log_types:
                    self.log.info("spawning %d workers for log types: %s" % (self.config['Collection']['NUM_WORKERS'], log_types))
                with futures.ThreadPoolExecutor(max_workers=self.config['Collection']['NUM_WORKERS']) as executor:
                    results = {executor.submit(self.fetch, **param): param for param in task_params}
                    all_futures.update(results)
                    for future in futures.as_completed(all_futures):
                        param = all_futures[future]
                        event_type = param["event_type"]
                        try:
                            future.result()
                            obj = self.kvstore.get(event_type)
                        except Exception as exc:
                            self.log.error(f'''Event Type: {event_type} thread generated an exception: {exc}''', exc_info=True)
                        else:
                            self.log.info(f'''Event Type: {event_type} thread completed {obj}''')
            finally:
                self.stop_running()
                self.netskope_conn.closeall()
        else:
            if not self.is_process_running(["sumonetskopecollector", "netskope.py"]):
                self.kvstore.release_lock_on_expired_key(self.SINGLE_PROCESS_LOCK_KEY)
    def test(self):
        params = {
            "start_time_epoch": 1505228760,
            "end_time_epoch": int(time.time()),
            'url': self.api_config['NETSKOPE_EVENT_ENDPOINT'],
            "event_type": "Application",
            "last_record_epoch": None
        }
        self.fetch(**params)


def main(*args, **kwargs):
    try:
        ns = NetskopeCollector()
        ns.run()
        # ns.test()
    except BaseException as e:
        traceback.print_exc()


if __name__ == '__main__':
    main()
AlLongley commented Nov 13, 2019

Line 28 (the CONFIG_FILENAME = "netskope.yaml" line) - this is the absolute or relative path to your YAML config (without testing, I would say the safest option is to place it in the same directory as this script).
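
For reference, here's a rough sketch of the shape netskope.yaml seems to take, inferred purely from the keys the script reads out of self.config['Netskope'] and self.config['Collection'] (I'm assuming collection_config maps to the Collection section). Every value below is a placeholder or guess, so check your Netskope tenant and the collector docs for the real endpoints and values:

Netskope:
  TOKEN: YOUR_NETSKOPE_API_TOKEN                                           # placeholder
  NETSKOPE_EVENT_ENDPOINT: https://YOUR-TENANT.goskope.com/api/v1/events   # guessed URL shape
  NETSKOPE_ALERT_ENDPOINT: https://YOUR-TENANT.goskope.com/api/v1/alerts   # guessed URL shape
  FETCH_METHOD: get          # whichever HTTP method the API expects
  PAGINATION_LIMIT: 1000     # page size per request
  EVENT_TYPES:               # event types route to the event endpoint
    - application            # hypothetical value
  ALERT_TYPES: []            # alert types route to the alert endpoint

Collection:
  OUTPUT_HANDLER: CONSOLE    # placeholder; use whatever handler your sumoappclient build supports
  NUM_WORKERS: 2
  TIMEOUT: 60
  MAX_RETRY: 3
  BACKOFF_FACTOR: 1
  BACKFILL_DAYS: 7           # the script rejects anything over 90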

Line 152 - Changing it to the following will still fetch the data (from line 146) and "transform" it (which appears to do nothing here), but no send request will actually be made; the script just carries on as though the send succeeded:
send_success = True#output_handler.send(data)

(send_success is now always True, and everything past the # is a comment)
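
Incidentally, transform_data is the natural hook if you do want to reshape events before they're sent; right now it just returns the data unchanged. A minimal sketch of overriding it (the field name is hypothetical, and I'm assuming each record is a plain dict, as the rest of the code implies):

def transform_data(self, data):
    # drop a (hypothetical) noisy field from every record before sending
    for record in data:
        record.pop('some_noisy_field', None)
    return data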

If you would like it to spam the raw JSON data it has fetched to your console for debugging purposes, you can place the following between lines 151 and 152. Make sure to indent with spaces instead of tabs, as Python is strict about indentation errors.

print(data)

Note that this will still connect to the "OUTPUT_HANDLER", it just won't send anything. If you want to absolutely ensure no connection is made to the receiver, place a # at the beginning of the code on lines

137 #output_handler = OutputHandlerFactory.get_handler
and 178 #output_handler.close()
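
Alternatively, rather than editing three separate lines, you could swap in a tiny no-op handler. This is just a sketch of mine, not part of the original script; it only needs the send() and close() methods that fetch() calls:

class NullOutputHandler:
    """Debug stand-in: prints fetched JSON instead of sending it anywhere."""
    def send(self, data):
        print(data)   # dump the raw records to the console
        return True   # report success so pagination keeps advancing
    def close(self):
        pass          # nothing to clean up

# then in fetch(), replace the OutputHandlerFactory.get_handler(...) line with:
# output_handler = NullOutputHandler()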

AlLongley commented Nov 13, 2019

For logging all input to a file, the quick-and-dirty approach is to open a file for appending and write each batch as a line.

If you replace your print(data) between 151 and 152 with the following, it will open a log file and append to it (data is a list of dicts, so it has to be converted to a string before writing):

with open("netskope.log", 'a+') as fd:
    fd.write(str(data))
    fd.write('\n')
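
If you'd rather keep the log machine-parseable, a variant of mine writes one JSON object per line instead (data here is the list of event dicts from respjson["data"], and json would need importing at the top of the script):

import json

with open("netskope.log", 'a+') as fd:
    for record in data:
        fd.write(json.dumps(record))
        fd.write('\n')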
