Last active
September 23, 2019 15:43
-
-
Save oprietop/d815bd1b74a65355ae92dea24aca11cc to your computer and use it in GitHub Desktop.
Playing with influxdb's http API
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/python | |
# -*- coding: utf8 -*- | |
import os | |
import json | |
import time | |
import requests | |
from requests.adapters import HTTPAdapter | |
class influxWrapper(object): | |
def __init__(self): | |
# Variables | |
self.influxdb_url = os.environ.get('INFLUXDB_URL', 'http://influxdb:8086') | |
self.influxdb_host = os.environ.get('INFLUXDB_HOST', 'influx') | |
self.influxdb_db = os.environ.get('INFLUXDB_DB', 'events') | |
self.influxdb_user = os.environ.get('INFLUXDB_USER', 'user') | |
self.influxdb_pass = os.environ.get('INFLUXDB_PASS', 'pass') | |
self.http_proxy = os.environ.get('HTTP_PROXY', "http://proxy:8080") | |
self.https_proxy = os.environ.get('HTTPS_PROXY', "http://proxy:8080") | |
# Init requests object | |
self.session = requests.Session() | |
self.session.auth = (self.influxdb_user, self.influxdb_pass) | |
self.session.proxies = {"http": self.http_proxy, "https": self.https_proxy} | |
self.session.mount('http://', HTTPAdapter(max_retries=5)) | |
self.session.mount('https://', HTTPAdapter(max_retries=5)) | |
self.ping() | |
def ping(self): | |
response = self.session.get("{}/ping".format(self.influxdb_url)) | |
print("Pinging '{}':\n>> {} {}".format(self.influxdb_url, response.status_code, response.text)) | |
def create_db(self): | |
payload = {"q": "CREATE DATABASE " + self.influxdb_db} | |
response = self.session.post("{}/query".format(self.influxdb_url), params=payload) | |
print('Creating the {} Database:\n>> {} {}'.format(self.influxdb_db, response.status_code, response.json())) | |
def dump(self): | |
payload = {"q": "SELECT * FROM /.*/", "db": self.influxdb_db} | |
response = self.session.post("{}/query".format(self.influxdb_url), params=payload) | |
print('Dumping all series on {}:\n>> {} {}'.format(self.influxdb_db, response.status_code, response.json())) | |
def drop_all(self): | |
payload = {"q": "DROP series FROM /.*/", "db": self.influxdb_db} | |
response = self.session.post("{}/query".format(self.influxdb_url), params=payload) | |
print('Dropping all series on {}:\n>> {} {}'.format(self.influxdb_db, response.status_code, response.json())) | |
def event(self, **kwargs): | |
payload = { "db": self.influxdb_db } | |
data = 'events,sysg2r="{sysg2r}",type="{type}" text="{text}",title="{title}"'.format(**kwargs) | |
response = self.session.post("{}/write".format(self.influxdb_url), params=payload, data=data.encode('utf-8')) | |
print("Sending event: '{}'\n>> {} {}".format(data, response.status_code, response.text)) | |
def query(self, q='SELECT * FROM /.*/'): | |
response = self.session.post("{}/query".format(self.influxdb_url), params={"q": q}) | |
return response.text | |
if __name__ == '__main__': | |
# Initialize | |
iw = influxWrapper() | |
# Create the Database | |
iw.create_db() | |
time.sleep(1) | |
# Write an Event | |
iw.event( text = '<a href="http://bd.vg" target="_blank">This is a URL</a>' | |
, title = 'Testing Anotations...(日本語)' | |
, sysg2r = 'SYS' | |
, type = 'TEST' | |
) | |
time.sleep(1) | |
# Dump all series | |
iw.dump() | |
# Delele all series | |
iw.drop_all() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment