Created
November 13, 2019 00:55
-
-
Save AlLongley/d9f6c612d4cf10e620ed65a29da330f1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: future_fstrings -*- | |
import os | |
import traceback | |
import sys | |
import time | |
import select | |
import itertools | |
from concurrent import futures | |
from sumoappclient.sumoclient.base import BaseCollector | |
from sumoappclient.omnistorage.factory import ProviderFactory | |
from sumoappclient.sumoclient.factory import OutputHandlerFactory | |
from sumoappclient.sumoclient.httputils import ClientMixin, SessionPool | |
from sumoappclient.common.utils import get_current_timestamp, convert_epoch_to_utc_date | |
def stdinWait(text, time, default=None):
    """Show a prompt and wait a bounded time for one line on standard input.

    Args:
        text: Prompt printed before waiting.
        time: Maximum number of seconds to wait (note: shadows the ``time`` module
            inside this function only).
        default: Value returned when nothing becomes readable before the timeout.

    Returns:
        The line read from stdin with surrounding whitespace stripped, or ``default``
        when the wait timed out.

    NOTE(review): ``select.select`` is applied to ``sys.stdin``; on Windows ``select``
    only accepts sockets, so this helper presumably fails there — confirm if needed.
    """
    print(text)
    ready_to_read, _writable, _errored = select.select([sys.stdin], [], [], time)
    if not ready_to_read:
        # Timed out with no input available.
        return default
    return sys.stdin.readline().strip()
class NetskopeCollector(BaseCollector):
    """Collect Netskope alerts/events and forward them through the configured output handler.

    For every configured alert/event type a fetch window is checkpointed in ``self.kvstore``
    (see ``set_fetch_state``); failed or interrupted fetches save their progress so a later
    run can continue from it.

    NOTE(review): ``self.config``, ``self.collection_config``, ``self.kvstore`` and
    ``self.log`` are used but never assigned in this class; presumably
    ``BaseCollector.__init__`` provides them — confirm.
    """

    SINGLE_PROCESS_LOCK_KEY = 'is_netskopecollector_running'
    CONFIG_FILENAME = "netskope.yaml"

    def __init__(self):
        self.project_dir = self.get_current_dir()
        super(NetskopeCollector, self).__init__(self.project_dir)
        self.api_config = self.config['Netskope']
        self.netskope_conn = SessionPool(self.collection_config['MAX_RETRY'], self.collection_config['BACKOFF_FACTOR'], logger=self.log)
        # Shared session used by the single-record lookups done while building tasks.
        self.netskope_session = self.netskope_conn.get_request_session()
        if self.collection_config['BACKFILL_DAYS'] > 90:
            raise Exception('BACKFILL_DAYS cannot be more than 90 days')
        # Start of the very first window for an event type that has no saved state yet.
        self.DEFAULT_START_TIME_EPOCH = get_current_timestamp() - self.collection_config['BACKFILL_DAYS']*24*60*60

    def get_current_dir(self):
        """Return the directory containing this source file."""
        cur_dir = os.path.dirname(__file__)
        return cur_dir

    def get_endpoint_url(self, event_type):
        """Return the alert endpoint for types listed in ALERT_TYPES, otherwise the event endpoint."""
        if event_type in self.api_config.get('ALERT_TYPES', []):
            return self.api_config['NETSKOPE_ALERT_ENDPOINT']
        else:
            return self.api_config['NETSKOPE_EVENT_ENDPOINT']

    def _make_request(self, url, params, session):
        """Issue one Netskope API call using the retry/timeout settings of the Collection config.

        Returns the ``(success, respjson)`` pair produced by ``ClientMixin.make_request``.
        """
        return ClientMixin.make_request(
            url, method=self.api_config['FETCH_METHOD'], session=session, params=params,
            logger=self.log, TIMEOUT=self.collection_config['TIMEOUT'],
            MAX_RETRY=self.collection_config['MAX_RETRY'],
            BACKOFF_FACTOR=self.collection_config['BACKOFF_FACTOR'])

    @staticmethod
    def _is_success(success, respjson):
        """Return True only if the HTTP call succeeded AND the payload reports status "success".

        Netskope answers HTTP 200 for API errors too, so the payload status must be checked.
        """
        return bool(success and respjson["status"] == "success")

    def set_fetch_state(self, event_type, start_time_epoch, end_time_epoch, last_record_epoch=None):
        """Persist the fetch window for ``event_type`` in the kvstore and return the saved dict.

        ``end_time_epoch`` may be None, meaning the window is open and its end is determined on
        the next run (see ``set_new_end_epoch_time``).
        """
        if end_time_epoch:  # end time epoch could be none in cases where no event is present
            assert start_time_epoch <= end_time_epoch
        obj = {
            'url': self.get_endpoint_url(event_type),
            "event_type": event_type,
            "start_time_epoch": start_time_epoch,
            "end_time_epoch": end_time_epoch,
            "last_record_epoch": last_record_epoch
        }
        self.kvstore.set(event_type, obj)
        return obj

    def set_new_end_epoch_time(self, event_type, start_time_epoch):
        """Close the open window for ``event_type`` at the timestamp of the first returned record.

        Requests a single record between ``start_time_epoch`` and now. If one exists, its
        timestamp is saved as both ``end_time_epoch`` and ``last_record_epoch``
        (presumably the newest record, assuming the API returns records newest-first — confirm).

        Returns:
            The saved state dict, or None when no events are available in that range.
        """
        params = {
            'token': self.api_config['TOKEN'],
            'limit': 1,
            'starttime': start_time_epoch,
            'endtime': get_current_timestamp(),
            'skip': 0,
            'type': event_type
        }
        url = self.get_endpoint_url(event_type)
        success, respjson = self._make_request(url, params, self.netskope_session)
        start_date = convert_epoch_to_utc_date(params['starttime'])
        end_date = convert_epoch_to_utc_date(params['endtime'])
        if self._is_success(success, respjson) and len(respjson["data"]) > 0:
            first_record_epoch = respjson["data"][0]["timestamp"]
            obj = self.set_fetch_state(event_type, start_time_epoch, first_record_epoch, first_record_epoch)
            self.log.info(f'''Creating task for {event_type} from {start_date} to {end_date}''')
            return obj
        else:
            self.log.info(f'''No events are available for {event_type} from {start_date} to {end_date}''')
            return None

    def is_running(self):
        """Try to acquire the single-instance lock; ``run`` proceeds only when this is truthy."""
        self.log.info("Acquiring single instance lock")
        return self.kvstore.acquire_lock(self.SINGLE_PROCESS_LOCK_KEY)

    def stop_running(self):
        """Release the single-instance lock."""
        self.log.info("Releasing single instance lock")
        return self.kvstore.release_lock(self.SINGLE_PROCESS_LOCK_KEY)

    def transform_data(self, data):
        """Hook for transforming a page of records before sending; currently returns it unchanged."""
        return data

    def get_last_record_epoch(self, obj):
        """Translate a legacy ``skip`` checkpoint into the timestamp of the last record sent.

        Fetches the record at offset ``obj['skip'] - 1`` within the saved window. If that
        lookup fails, asks for the timestamp on stdin (15 second timeout).

        Raises:
            Exception: if the record is not found and nothing was entered on stdin.
            ValueError: if the entered value is not an integer.
        """
        params = {
            'token': self.api_config['TOKEN'],
            'limit': self.api_config['PAGINATION_LIMIT'],
            'starttime': obj['start_time_epoch'],
            'endtime': obj['end_time_epoch'],
            'skip': obj['skip'],
            'type': obj['event_type']
        }
        # ``skip`` counts records already sent, so the last one sent sits at skip - 1.
        params['skip'] -= 1
        success, respjson = self._make_request(obj['url'], params, self.netskope_session)
        if self._is_success(success, respjson) and len(respjson["data"]) > 0:
            last_record_epoch = respjson["data"][0]["timestamp"]
            last_record_date = convert_epoch_to_utc_date(last_record_epoch)
            self.log.info(f'''last record for {obj['event_type']} from {params['starttime']} to {params['endtime']} skip: {params['skip']} is {last_record_date}''')
            return last_record_epoch
        else:
            self.log.info("Taking last_record_epoch as input from user")
            last_record_epoch = stdinWait("Enter last record timestamp for %s > " % obj['event_type'], 15)
            if not last_record_epoch:
                raise Exception(f'''last record for {obj['event_type']} from {params['starttime']} to {params['endtime']} skip: {params['skip']} not found''')
            else:
                last_record_epoch = int(last_record_epoch)
            return last_record_epoch

    def fetch(self, url, event_type, start_time_epoch, end_time_epoch, last_record_epoch):
        """Page through one fetch window and send every page to the output handler.

        Pages are requested with a growing ``skip`` offset between ``start_time_epoch`` and
        ``end_time_epoch``; when ``last_record_epoch`` is set (a resumed window) it replaces
        the upper bound. An empty page means the window is exhausted, and the saved state is
        advanced to start at ``end_time_epoch + 1`` with an open end. On a fetch/send failure
        or an exception, the window is saved together with the timestamp of the last record of
        the last page sent (or the incoming checkpoint if nothing was sent) so a later run can
        resume from it.

        Raises:
            Whatever the request or output handler raises, after saving the checkpoint.
        """
        params = {
            'token': self.api_config['TOKEN'],
            'limit': self.api_config['PAGINATION_LIMIT'],
            'starttime': start_time_epoch,
            'endtime': end_time_epoch,
            'skip': 0,
            'type': event_type
        }
        if last_record_epoch:
            params['endtime'] = last_record_epoch  # logs of same timestamp may be repeated
        output_handler = OutputHandlerFactory.get_handler(self.config['Collection']['OUTPUT_HANDLER'], config=self.config)
        next_request = send_success = True
        page_count = total_records = 0
        move_window = False
        sess = self.netskope_conn.get_request_session()
        # last_record_epoch is deliberately NOT reset to None here: if the very first page
        # fails, the checkpoint saved below must keep the incoming resume point. Resetting it
        # would restart the window from end_time_epoch and resend already-delivered data.
        try:
            while next_request:
                page_count += 1
                fetch_success, respjson = self._make_request(url, params, sess)
                fetch_success = self._is_success(fetch_success, respjson)  # netskope sends 200 for errors
                if fetch_success:
                    data = respjson["data"]
                    if len(data) > 0:
                        data = self.transform_data(data)
                        send_success = output_handler.send(data)
                        if send_success:
                            params['skip'] += len(data)
                            total_records += len(data)
                            last_record_epoch = data[-1]["timestamp"]
                            self.log.info(f'''Successfully Sent Page: {page_count} Event Type: {event_type} Datalen: {len(data)} starttime: {convert_epoch_to_utc_date(params['starttime'])} endtime: {convert_epoch_to_utc_date(params['endtime'])} skip: {params['skip']} last_record_epoch: {convert_epoch_to_utc_date(last_record_epoch)}''')
                    else:  # no data so moving window
                        move_window = True
                next_request = fetch_success and send_success and (not move_window)
            if move_window:
                # Report the start actually saved below (params["endtime"] differs when resuming).
                self.log.debug(
                    f'''Moving starttime window for {event_type} to {convert_epoch_to_utc_date(end_time_epoch + 1)}''')
                self.set_fetch_state(event_type, end_time_epoch+1, None, None)
            elif not (fetch_success and send_success):  # saving skip in casee of failures for restarting in future
                self.set_fetch_state(event_type, start_time_epoch, end_time_epoch, last_record_epoch)
                self.log.error(
                    f'''Failed to send Event Type: {event_type} Page: {page_count} starttime: {convert_epoch_to_utc_date(params['starttime'])} endtime: {convert_epoch_to_utc_date(params['endtime'])} fetch_success: {fetch_success} send_success: {send_success} skip: {params['skip']} last_record_epoch: {last_record_epoch}''')
        except Exception:
            self.set_fetch_state(event_type, start_time_epoch, end_time_epoch, last_record_epoch)
            raise
        finally:
            self.netskope_conn.close()
            output_handler.close()
        self.log.info(f''' Total messages fetched {total_records} for Event Type: {event_type} starttime: {convert_epoch_to_utc_date(params['starttime'])} endtime: {convert_epoch_to_utc_date(params['endtime'])}''')

    def build_task_params(self):
        """Return the list of ``fetch`` keyword-argument dicts, one per type with pending events.

        Types without saved state get an initial window starting at DEFAULT_START_TIME_EPOCH.
        Open windows (``end_time_epoch`` None) are closed at the current newest record, and
        types with no new events are skipped. Legacy checkpoints that stored ``skip`` are
        converted to ``last_record_epoch``.
        """
        tasks = []
        for et in itertools.chain(self.api_config.get('ALERT_TYPES', []), self.api_config.get('EVENT_TYPES', [])):
            if self.kvstore.has_key(et):
                obj = self.kvstore.get(et)
                if obj["end_time_epoch"] is None:
                    obj = self.set_new_end_epoch_time(et, obj["start_time_epoch"])
            else:
                self.set_fetch_state(et, self.DEFAULT_START_TIME_EPOCH, None)  # setting start time initially otherwise it always fetches in range(cur time, cur time)
                obj = self.set_new_end_epoch_time(et, self.DEFAULT_START_TIME_EPOCH)
            if obj is None:  # no new events so continue
                continue
            if "skip" in obj:
                # for backward compatibility: old checkpoints stored a skip offset
                if obj.get("last_record_epoch") is None and obj['skip'] > 0:
                    obj["last_record_epoch"] = self.get_last_record_epoch(obj)
                    self.log.info("setting last_record_epoch to %d" % obj["last_record_epoch"])
                elif "last_record_epoch" not in obj:
                    # skip == 0 means nothing was sent yet, so there is no last record; looking
                    # one up would request skip=-1.
                    obj["last_record_epoch"] = None
                del obj['skip']
            tasks.append(obj)
        self.log.info(f'''Building tasks {len(tasks)}''')
        return tasks

    def run(self):
        """Run one collection pass if the single-instance lock can be acquired.

        Each task is fetched on a thread pool of ``Collection.NUM_WORKERS`` workers and its
        outcome is logged. The lock is always released and all sessions are closed afterwards.
        If the lock is already held and no collector process is detected, the lock key is
        released via ``release_lock_on_expired_key``.
        """
        if self.is_running():
            try:
                self.log.info('Starting Netskope Event Forwarder...')
                task_params = self.build_task_params()
                all_futures = {}
                log_types = ",".join([t['event_type'] for t in task_params])
                if log_types:
                    self.log.info("spawning %d workers for log types: %s" % (self.config['Collection']['NUM_WORKERS'], log_types))
                    with futures.ThreadPoolExecutor(max_workers=self.config['Collection']['NUM_WORKERS']) as executor:
                        results = {executor.submit(self.fetch, **param): param for param in task_params}
                        all_futures.update(results)
                        for future in futures.as_completed(all_futures):
                            param = all_futures[future]
                            event_type = param["event_type"]
                            try:
                                future.result()
                                obj = self.kvstore.get(event_type)
                            except Exception as exc:
                                self.log.error(f'''Event Type: {event_type} thread generated an exception: {exc}''', exc_info=True)
                            else:
                                self.log.info(f'''Event Type: {event_type} thread completed {obj}''')
            finally:
                self.stop_running()
                self.netskope_conn.closeall()
        else:
            if not self.is_process_running(["sumonetskopecollector", "netskope.py"]):
                self.kvstore.release_lock_on_expired_key(self.SINGLE_PROCESS_LOCK_KEY)

    def test(self):
        """Manual smoke test: fetch "Application" events from a fixed start epoch (1505228760) to now."""
        params = {
            "start_time_epoch": 1505228760,
            "end_time_epoch": int(time.time()),
            'url': self.api_config['NETSKOPE_EVENT_ENDPOINT'],
            "event_type": "Application",
            # fetch() has no ``skip`` parameter (passing it raised TypeError); start a fresh window.
            "last_record_epoch": None
        }
        self.fetch(**params)
def main(*args, **kwargs):
    """Run the Netskope collector once.

    Any exception raised while constructing or running the collector has its traceback
    printed. Ctrl-C (KeyboardInterrupt) and SystemExit are no longer swallowed.

    Returns:
        0 on success, 1 if the run raised an exception. The value is passed to ``sys.exit``
        so schedulers see a failing exit status.
    """
    try:
        ns = NetskopeCollector()
        ns.run()
        # ns.test()  # uncomment for a one-off manual fetch (see NetskopeCollector.test)
    except Exception:
        traceback.print_exc()
        return 1
    return 0


if __name__ == '__main__':
    sys.exit(main())
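To log all fetched data to a file, the quick-and-dirty approach is to open a file for appending and write one line for each page of results.
If you replace your print(data)
between lines 151 and 152 (right after `data = self.transform_data(data)`) with the following, it will open a log file and append to it.
Note that `data` is a list of records, not a string, so convert it before writing (e.g. `str(data)`, or `json.dumps(data)` after adding `import json`):
with open("netskope.log",'a+') as fd:
    fd.write(str(data))
    fd.write('\n')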
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Line 28 (`CONFIG_FILENAME = "netskope.yaml"`) - Set this to the absolute or relative path of your YAML config. (I haven't tested this, but the safest option is probably to place the file in the same directory as this script.)
Line 152 (`send_success = output_handler.send(data)`) - If you change it to the following, the script still receives the data (from line 146, the `ClientMixin.make_request` call) and "transforms" it (which appears to do nothing here). However, no send request is actually made, and the script still behaves as though the send succeeded:
send_success = True#output_handler.send(data)
(send_success is now always True; everything after the # is a comment.)
If you would like it to print the fetched data (the parsed JSON records) to your console for debugging, you can place the following between lines 151 and 152. Indent it to the same level as the surrounding lines and use spaces rather than tabs, because Python is strict about indentation and will raise an error if it is inconsistent:
print(data)
Note that with only these changes the script still sets up the "OUTPUT_HANDLER"; it just doesn't send anything. If you want to be absolutely sure no connection is made to the receiver, also put a # at the start of the code on the following lines (only do this together with the line 152 change above, otherwise `output_handler` will be undefined):
line 137
#output_handler = OutputHandlerFactory.get_handler
and line 178
#output_handler.close()