Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@jtschichold
Last active April 25, 2018 20:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jtschichold/45c4e4099e857a53c713eaa381bc452d to your computer and use it in GitHub Desktop.
Save jtschichold/45c4e4099e857a53c713eaa381bc452d to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Script to clean up an Event Service channel by polling and acknowledging its pending events. Based on pancloud."""
import os
import sys
import json
import logging
from datetime import datetime
from time import time
curpath = os.path.dirname(os.path.abspath(__file__))
sys.path[:0] = [os.path.join(curpath, os.pardir)]
from pancloud import EventService
# logging.basicConfig(level=logging.DEBUG)
# Base URL handed to EventService.
url = 'https://apigw-stg4.us.paloaltonetworks.com'
# Name of the Event Service channel that gets polled and acknowledged.
channel_id = 'EventFilter'


def scan_records(records, t0, last_time_generated=0):
    """Count the events of one poll result and track the newest timestamp.

    Args:
        records: iterable of record dicts decoded from a poll response;
            each may hold a list of event dicts under the ``'event'`` key.
        t0: cutoff timestamp. Scanning stops at the first event whose
            ``'time_generated'`` is greater than or equal to it.
        last_time_generated: newest ``'time_generated'`` seen in earlier
            polls, carried forward so the maximum spans the whole run.

    Returns:
        A ``(nevents, last_time_generated, caught_up)`` tuple.
        ``caught_up`` is True when an event at or past ``t0`` was found;
        in that case ``nevents`` only covers the records that were fully
        scanned before the stop. Otherwise ``nevents`` is the total number
        of events in ``records``.

    Raises:
        KeyError: if an event has no ``'time_generated'`` field.
    """
    nevents = 0
    for record in records:
        # A missing or null 'event' entry counts as zero events.
        events = record.get('event') or []
        for event in events:
            generated = event['time_generated']
            if generated >= t0:
                return nevents, last_time_generated, True
            if generated > last_time_generated:
                last_time_generated = generated
        nevents += len(events)
    return nevents, last_time_generated, False


def main():
    """Poll and acknowledge the channel until it is empty or caught up.

    Reads the bearer token from the ``ACCESS_TOKEN`` environment variable
    (``export ACCESS_TOKEN=<access token>``), then repeatedly polls
    ``channel_id`` and acknowledges each batch. The process exits with
    status 0 when a poll returns no events, or when an event generated at
    or after the script's start time shows up (that last batch is not
    acknowledged). It exits with an error message when the token is
    missing or the poll payload is not a JSON array.
    """
    access_token = os.environ.get('ACCESS_TOKEN')
    if not access_token:
        sys.exit('ACCESS_TOKEN environment variable is not set')

    # Create Event Service instance
    es = EventService(
        url=url,
        headers={
            'Authorization': 'Bearer {}'.format(access_token),
            "Content-Type": "application/json",
            "Accept": "application/json"
        }
    )

    # Events are compared against the start time so the script stops once
    # it reaches events produced while it is running.
    t0 = time()
    last_time_generated = 0

    while True:
        p = es.poll(channel_id)
        result = json.loads(p.text)
        # The scan below needs a JSON array of record objects; any other
        # non-empty payload (e.g. an error object) cannot be processed.
        if result and not isinstance(result, list):
            sys.exit('Unexpected poll response: {}'.format(p.text))

        nevents, last_time_generated, caught_up = scan_records(
            result or [], t0, last_time_generated
        )
        if caught_up:
            print('Current space-time continuum coordinates reached')
            sys.exit(0)
        if nevents == 0:
            print('No more events')
            sys.exit(0)

        # time_generated is treated as epoch seconds here, the same unit
        # as the time() value it is compared with.
        dt = datetime.fromtimestamp(last_time_generated)
        print('ACK {} ({})'.format(nevents, dt.isoformat()))
        es.ack(channel_id)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment