Skip to content

Instantly share code, notes, and snippets.

@oprietop
Last active September 23, 2019 15:43
Show Gist options
  • Save oprietop/d815bd1b74a65355ae92dea24aca11cc to your computer and use it in GitHub Desktop.
Save oprietop/d815bd1b74a65355ae92dea24aca11cc to your computer and use it in GitHub Desktop.
Playing with InfluxDB's HTTP API
#! /usr/bin/python
# -*- coding: utf8 -*-
import os
import json
import time
import requests
from requests.adapters import HTTPAdapter
class influxWrapper(object):
    """Small helper around InfluxDB's HTTP API (``/ping``, ``/query``, ``/write``).

    Connection settings are read from environment variables when the object
    is created; every method sends one HTTP request through a shared
    ``requests`` session and, except for ``query``, prints the outcome.
    """

    def __init__(self):
        """Read the configuration, build the HTTP session and ping the server.

        Environment variables (with their fallbacks): INFLUXDB_URL,
        INFLUXDB_HOST, INFLUXDB_DB, INFLUXDB_USER, INFLUXDB_PASS, HTTP_PROXY
        and HTTPS_PROXY.
        """
        # Variables
        self.influxdb_url = os.environ.get('INFLUXDB_URL', 'http://influxdb:8086')
        self.influxdb_host = os.environ.get('INFLUXDB_HOST', 'influx')
        self.influxdb_db = os.environ.get('INFLUXDB_DB', 'events')
        self.influxdb_user = os.environ.get('INFLUXDB_USER', 'user')
        self.influxdb_pass = os.environ.get('INFLUXDB_PASS', 'pass')
        self.http_proxy = os.environ.get('HTTP_PROXY', "http://proxy:8080")
        self.https_proxy = os.environ.get('HTTPS_PROXY', "http://proxy:8080")
        # Init requests object: credentials, proxies and up to 5 retries
        # per request for both URL schemes.
        self.session = requests.Session()
        self.session.auth = (self.influxdb_user, self.influxdb_pass)
        self.session.proxies = {"http": self.http_proxy, "https": self.https_proxy}
        self.session.mount('http://', HTTPAdapter(max_retries=5))
        self.session.mount('https://', HTTPAdapter(max_retries=5))
        self.ping()

    @staticmethod
    def _escape_tag_value(value):
        """Return *value* as text with line-protocol tag delimiters escaped.

        Commas, equals signs and spaces separate the parts of a line-protocol
        record, so inside a tag value they must be backslash-escaped.
        """
        text = '{}'.format(value)
        for special in (',', '=', ' '):
            text = text.replace(special, '\\' + special)
        return text

    @staticmethod
    def _escape_field_string(value):
        """Return *value* as text safe to place inside a quoted string field.

        Backslashes are doubled first so that the backslashes added for the
        double quotes are not themselves escaped again.
        """
        text = '{}'.format(value)
        return text.replace('\\', '\\\\').replace('"', '\\"')

    def ping(self):
        """GET ``/ping`` and print the status code and body of the reply."""
        response = self.session.get("{}/ping".format(self.influxdb_url))
        print("Pinging '{}':\n>> {} {}".format(self.influxdb_url, response.status_code, response.text))

    def create_db(self):
        """Issue ``CREATE DATABASE`` for the configured database and print the reply."""
        payload = {"q": "CREATE DATABASE " + self.influxdb_db}
        response = self.session.post("{}/query".format(self.influxdb_url), params=payload)
        print('Creating the {} Database:\n>> {} {}'.format(self.influxdb_db, response.status_code, response.json()))

    def dump(self):
        """Select everything from every measurement of the database and print it."""
        payload = {"q": "SELECT * FROM /.*/", "db": self.influxdb_db}
        response = self.session.post("{}/query".format(self.influxdb_url), params=payload)
        print('Dumping all series on {}:\n>> {} {}'.format(self.influxdb_db, response.status_code, response.json()))

    def drop_all(self):
        """Drop every series of the configured database and print the reply."""
        payload = {"q": "DROP series FROM /.*/", "db": self.influxdb_db}
        response = self.session.post("{}/query".format(self.influxdb_url), params=payload)
        print('Dropping all series on {}:\n>> {} {}'.format(self.influxdb_db, response.status_code, response.json()))

    def event(self, **kwargs):
        """Write one point to the ``events`` measurement and print the reply.

        Required keyword arguments:
            sysg2r, type: stored as tags.
            text, title: stored as string fields.

        Raises:
            KeyError: if any of the four keyword arguments is missing.
        """
        payload = {"db": self.influxdb_db}
        # Escape every value so that quotes, backslashes, spaces, commas or
        # equals signs in the caller's data cannot break the record apart.
        # NOTE(review): the double quotes around the tag values are kept as
        # they were; line protocol treats quotes in tag values as literal
        # characters, so they end up in the stored tag. Dropping them would
        # write to different series than existing data - confirm before changing.
        data = 'events,sysg2r="{sysg2r}",type="{type}" text="{text}",title="{title}"'.format(
            sysg2r=self._escape_tag_value(kwargs['sysg2r']),
            type=self._escape_tag_value(kwargs['type']),
            text=self._escape_field_string(kwargs['text']),
            title=self._escape_field_string(kwargs['title']),
        )
        response = self.session.post("{}/write".format(self.influxdb_url), params=payload, data=data.encode('utf-8'))
        print("Sending event: '{}'\n>> {} {}".format(data, response.status_code, response.text))

    def query(self, q='SELECT * FROM /.*/'):
        """Run the InfluxQL statement *q* and return the raw response body.

        The configured database is sent along with the statement, as the
        other query methods of this class do, so that statements which do not
        name a database themselves (such as the default) have one to run on.
        """
        payload = {"q": q, "db": self.influxdb_db}
        response = self.session.post("{}/query".format(self.influxdb_url), params=payload)
        return response.text
if __name__ == '__main__':
    # Demo run: connect, make sure the database exists, then pause a second.
    iw = influxWrapper()
    iw.create_db()
    time.sleep(1)

    # Send a single sample event (HTML link as text, non-ASCII title).
    sample_event = dict(
        sysg2r='SYS',
        type='TEST',
        title='Testing Anotations...(日本語)',
        text='<a href="http://bd.vg" target="_blank">This is a URL</a>',
    )
    iw.event(**sample_event)
    time.sleep(1)

    # Print everything stored in the database, then delete all series.
    iw.dump()
    iw.drop_all()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment