Skip to content

Instantly share code, notes, and snippets.

@zebde
Last active June 23, 2016 16:42
Show Gist options
  • Save zebde/8838944546171fa1ecb038c8451435e1 to your computer and use it in GitHub Desktop.
Save zebde/8838944546171fa1ecb038c8451435e1 to your computer and use it in GitHub Desktop.
Sample TAXII polling script
import datetime
import os
import sys
from ConfigParser import SafeConfigParser
from datetime import timedelta
from StringIO import StringIO
import libtaxii as t
import libtaxii.clients as tc
import libtaxii.messages_11 as tm11
import lxml.etree
from dateutil.tz import tzutc
from libtaxii.constants import *
from stix.core import STIXPackage
path = os.path.dirname(os.path.abspath(sys.argv[0]))
parser = SafeConfigParser()
parser.read(path + '/config.ini')
username = parser.get('TAXII', 'Username')
password = parser.get('TAXII', 'Password')
proxy = parser.getboolean('TAXII', 'Proxy')
proxyaddress = parser.get('TAXII', 'ProxyAddress')
ssl = parser.getboolean('TAXII', 'HTTPS')
collection = parser.get('TAXII', 'Collection')
server = parser.get('TAXII', 'Server')
port = parser.get('TAXII', 'Port')
path = parser.get('TAXII', 'Path')
days = parser.get('TAXII', 'Days')
client = tc.HttpClient()
client.set_auth_type(tc.HttpClient.AUTH_BASIC)
client.set_use_https(ssl)
if proxy is True:
client.set_proxy(proxyaddress)
client.set_auth_credentials(
{'username': username, 'password': password})
def main():
discovery_request = tm11.DiscoveryRequest(tm11.generate_message_id())
discovery_xml = discovery_request.to_xml()
poll_params1 = tm11.PollParameters(
allow_asynch=False,
response_type=RT_COUNT_ONLY,
content_bindings=[tm11.ContentBinding(binding_id=CB_STIX_XML_11)],
)
poll_req3 = tm11.PollRequest(
message_id='PollReq03',
collection_name=collection,
poll_parameters=poll_params1,
exclusive_begin_timestamp_label=datetime.datetime.now(
tzutc()) - timedelta(days=int(days)),
inclusive_end_timestamp_label=datetime.datetime.now(tzutc()),
)
poll_xml = poll_req3.to_xml()
http_resp = client.call_taxii_service2(
server, path, VID_TAXII_XML_11,
poll_xml, port=port)
taxii_message = t.get_message_from_http_response(
http_resp, poll_req3.message_id)
if taxii_message.message_type == MSG_POLL_RESPONSE:
try:
for content in taxii_message.content_blocks:
package_io = StringIO(content.content)
pkg = STIXPackage.from_xml(package_io)
title = pkg.id_.split(':', 1)[-1]
with open(title + ".xml", "w") as text_file:
text_file.write(content.content)
print("[+] Successfully generated " + title)
except:
print("[-] Error with TAXII response")
else:
print("[-] Error with TAXII response")
if __name__ == "__main__":
main()
[TAXII]
Proxy: False
ProxyAddress: http://proxy.example.org:8080
HTTPS: False
Username: username
Password: password
Collection: collection.name
Server: taxii.example.com
Port: 80
Path: /taxii-data/
Days: 1

This is a basic TAXII Poll script that has a configuration file for ease of use. Once setup the script will save each content block as an XML file named after the individual package ID.

  • Save the scripts as taxii-poll.py and config.ini.
  • Modify config.ini to accomodate the requirements.
    • Proxy authentication is not currently available.
    • Days is the amount of days from todays date that you would like to poll
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment