Skip to content

Instantly share code, notes, and snippets.

@milo2012
Created June 20, 2024 04:57
Show Gist options
  • Save milo2012/0da0c406cf9fdf7f4603350feb94fd90 to your computer and use it in GitHub Desktop.
Save milo2012/0da0c406cf9fdf7f4603350feb94fd90 to your computer and use it in GitHub Desktop.
import argparse
import requests
import re
def main(argv=None):
    """Parse the command line and run the selected InfluxDB action.

    Args:
        argv: Optional list of argument strings to parse instead of
            ``sys.argv[1:]``. The default ``None`` makes argparse read the
            real command line, so existing ``main()`` calls behave as before.

    Dispatch order (first match wins): ``--list-databases``,
    ``--list-tables``, ``--list-version``, then a single-table fetch when
    both ``--database`` and ``--table`` are given. Anything else dumps every
    measurement of every database.

    Raises:
        SystemExit: Raised by argparse on ``--help`` or invalid arguments
            (for example a missing ``--url``).
    """
    parser = argparse.ArgumentParser(
        description='Dump data from a specific measurement/table in an InfluxDB instance via HTTP API.'
    )
    parser.add_argument('-u', '--url', type=str, required=True, help='URL of the InfluxDB query endpoint')
    parser.add_argument('-d', '--database', type=str, help='Name of the database')
    parser.add_argument('-t', '--table', type=str, help='Name of the measurement/table')
    parser.add_argument('-n', '--num_rows', type=int, default=50, help='Number of rows to return (default: 50)')
    parser.add_argument('-s', '--search', type=str, help='Search for columns containing this text')
    parser.add_argument('--list-databases', action='store_true', help='List all databases')
    parser.add_argument('--list-tables', action='store_true', help='List all tables (measurements) under a specified database')
    parser.add_argument('--list-version', action='store_true', help='Show InfluxDB version')
    args = parser.parse_args(argv)

    # Every action appends "/query" or "/ping" to this base URL; dropping
    # trailing slashes avoids requesting ".../​/query"-style double-slash paths.
    args.url = args.url.rstrip('/')

    if args.list_databases:
        list_databases(args)
    elif args.list_tables:
        list_tables(args)
    elif args.list_version:
        list_version(args)
    elif args.database and args.table:
        fetch_data(args)
    else:
        # Default behavior: dump all data. Note that a lone --database
        # (without --table) also ends up here and is not used as a filter.
        dump_all_data(args)
def list_databases(args):
    """Print one ``Database: <name>`` line per database on the server.

    Sends ``SHOW DATABASES`` to ``<args.url>/query`` and prints the first
    column of every returned row.

    Args:
        args: Parsed command-line namespace; only ``args.url`` is read.

    Nothing is returned or raised for request problems: transport errors,
    bad HTTP status codes, undecodable JSON and error entries inside the
    reply are all reported on stdout with the "Error fetching databases"
    prefix.
    """
    endpoint = f"{args.url.rstrip('/')}/query"
    try:
        # params= lets requests do the URL encoding; the timeout keeps an
        # unresponsive server from blocking the script forever.
        response = requests.get(
            endpoint,
            params={"db": "", "q": "SHOW DATABASES"},
            timeout=30,
        )
        response.raise_for_status()  # Raise exception for bad status codes
        payload = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers JSON decode failures on requests versions where
        # that error is not a RequestException subclass.
        print(f"Error fetching databases: {e}")
        return

    results = payload.get("results") if isinstance(payload, dict) else None
    for result in results or []:
        # A failed statement is reported inside the JSON body (with an
        # "error" key) rather than through the HTTP status code.
        if "error" in result:
            print(f"Error fetching databases: {result['error']}")
            continue
        for series in result.get("series") or []:
            for row in series.get("values") or []:
                print(f"Database: {row[0]}")
def list_tables(args):
    """Print the measurements ("tables") returned by ``SHOW MEASUREMENTS``.

    Args:
        args: Parsed command-line namespace. ``args.url`` is the base URL and
            ``args.database`` the database to inspect. When ``args.database``
            is empty the query is sent with an empty ``db`` parameter, as the
            original code did.

    Each measurement whose name passes ``is_valid_measurement_name`` is
    printed as ``Database: <db>, Table: <measurement>``. Transport errors,
    bad HTTP status codes, undecodable JSON and error entries inside the
    reply are printed to stdout instead of being raised.
    """
    if args.database:
        failure = f"Error fetching measurements for database '{args.database}'"
    else:
        failure = "Error fetching measurements across databases"

    endpoint = f"{args.url.rstrip('/')}/query"
    try:
        # Passing the database through params= URL-encodes it, so names that
        # contain '&', '#', '+' or spaces no longer corrupt the query string.
        response = requests.get(
            endpoint,
            params={"db": args.database or "", "q": "SHOW MEASUREMENTS"},
            timeout=30,
        )
        response.raise_for_status()  # Raise exception for bad status codes
        payload = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        print(f"{failure}: {e}")
        return

    results = payload.get("results") if isinstance(payload, dict) else None
    for result in results or []:
        # Statement-level failures arrive in the body under "error"; surface
        # them instead of silently printing nothing.
        if "error" in result:
            print(f"{failure}: {result['error']}")
            continue
        for series in result.get("series") or []:
            for row in series.get("values") or []:
                measurement_name = row[0]
                # Skip names with characters outside [A-Za-z0-9_].
                if is_valid_measurement_name(measurement_name):
                    print(f"Database: {args.database}, Table: {measurement_name}")
def list_version(args):
    """Print the InfluxDB version reported by the ``/ping`` endpoint.

    Args:
        args: Parsed command-line namespace; only ``args.url`` is read.

    The version is taken from the ``X-Influxdb-Version`` response header.
    ``/ping`` normally answers ``204 No Content`` with an empty body, so
    decoding the body as JSON (the previous approach) fails on a default
    reply. A JSON body with a ``version`` key is still honoured as a
    fallback when the header is absent.

    Errors are printed to stdout rather than raised.
    """
    url = f"{args.url.rstrip('/')}/ping"
    try:
        # The timeout keeps an unresponsive server from blocking forever.
        response = requests.get(url, timeout=30)
        response.raise_for_status()  # Raise exception for bad status codes
    except requests.exceptions.RequestException as e:
        print(f"Error fetching InfluxDB version: {e}")
        return

    # requests exposes headers through a case-insensitive mapping.
    version = response.headers.get("X-Influxdb-Version")
    if version is None and response.content:
        try:
            body = response.json()
        except ValueError:
            body = None
        if isinstance(body, dict):
            version = body.get("version")

    if version is None:
        print("Error fetching InfluxDB version: server did not report a version")
    else:
        print(f"InfluxDB Version: {version}")
def fetch_data(args):
    """Print up to ``args.num_rows`` rows of one measurement as comma-separated text.

    Args:
        args: Parsed command-line namespace. Reads ``url`` (base URL),
            ``database``, ``table`` (measurement name), ``num_rows`` (row
            limit) and ``search`` (optional case-insensitive substring used
            to select columns).

    For every series in the reply a header line with the selected column
    names is printed, followed by one line per row. When ``args.search`` is
    set, both the header AND the row values are restricted to the matching
    columns (previously only the header was filtered, so header and values
    did not line up). A series with no matching column prints nothing.

    Errors (transport, HTTP status, JSON decoding, error entries inside the
    reply) are printed to stdout instead of being raised.
    """
    failure = f"Error fetching data for measurement '{args.table}' in database '{args.database}'"

    # Escape backslashes and double quotes so the measurement name cannot
    # terminate the double-quoted InfluxQL identifier early.
    quoted_table = args.table.replace('\\', '\\\\').replace('"', '\\"')
    query = f'SELECT * FROM "{quoted_table}" LIMIT {args.num_rows}'

    try:
        # params= URL-encodes both the database name and the query text.
        response = requests.get(
            f"{args.url.rstrip('/')}/query",
            params={"db": args.database, "q": query},
            timeout=30,
        )
        response.raise_for_status()  # Raise exception for bad status codes
        payload = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        print(f"{failure}: {e}")
        return

    needle = args.search.lower() if args.search else None
    results = payload.get("results") if isinstance(payload, dict) else None
    for result in results or []:
        # Statement-level failures are reported in the body under "error".
        if "error" in result:
            print(f"{failure}: {result['error']}")
            continue
        for series in result.get("series") or []:
            if 'columns' not in series or 'values' not in series:
                continue
            columns = series['columns']
            # Indices of the columns to show; all of them without --search.
            if needle is None:
                keep = list(range(len(columns)))
            else:
                keep = [i for i, col in enumerate(columns) if needle in col.lower()]
            if not keep:
                continue
            # Print column names as headers
            print(",".join(columns[i] for i in keep))
            # Print retrieved data points, projected onto the same columns
            for record in series['values']:
                print(",".join(str(record[i]) for i in keep))
def _dump_query_series(base_url, database, query):
    """Run one InfluxQL query and return the list of series dicts in the reply.

    Raises requests.exceptions.RequestException on transport/HTTP failures
    and ValueError when the body is not JSON or a statement reports an error.
    """
    response = requests.get(
        f"{base_url}/query",
        # params= URL-encodes the database name and the query text.
        params={"db": database, "q": query},
        # Keep an unresponsive server from blocking the whole dump.
        timeout=30,
    )
    response.raise_for_status()  # Raise exception for bad status codes
    payload = response.json()
    collected = []
    results = payload.get("results") if isinstance(payload, dict) else None
    for result in results or []:
        # Statement-level failures are reported in the body under "error".
        if "error" in result:
            raise ValueError(result["error"])
        collected.extend(result.get("series") or [])
    return collected


def _dump_first_column(series_list):
    """Return the first value of every row across the given series."""
    return [row[0] for series in series_list for row in series.get("values") or []]


def _dump_print_series(series, search):
    """Print one series as a header line plus rows, limited to columns matching *search*."""
    if 'columns' not in series or 'values' not in series:
        return
    columns = series['columns']
    if search:
        needle = search.lower()
        keep = [i for i, col in enumerate(columns) if needle in col.lower()]
    else:
        keep = list(range(len(columns)))
    # No matching column: nothing meaningful to print for this series.
    if not keep:
        return
    print(",".join(columns[i] for i in keep))
    # Rows are projected onto the same columns as the header so they line up.
    for record in series['values']:
        print(",".join(str(record[i]) for i in keep))
    print("\n")


def dump_all_data(args):
    """Dump up to ``args.num_rows`` rows of every measurement in every database.

    Args:
        args: Parsed command-line namespace. Reads ``url`` (base URL),
            ``num_rows`` (row limit per measurement) and ``search``
            (optional case-insensitive substring used to select columns).

    For each database returned by ``SHOW DATABASES`` and each of its
    measurements whose name passes ``is_valid_measurement_name``, a
    ``Database: <db> Table: <measurement>`` line is printed followed by the
    data. With ``args.search`` set, header and rows are both restricted to
    the matching columns, and series without a match are skipped.

    A failure at one level is printed to stdout and only skips that
    database or measurement; the dump then continues with the next one.
    """
    base_url = args.url.rstrip('/')
    handled = (requests.exceptions.RequestException, ValueError)

    try:
        databases = _dump_first_column(_dump_query_series(base_url, "", "SHOW DATABASES"))
    except handled as e:
        print(f"Error fetching databases: {e}")
        return

    for database_name in databases:
        try:
            measurements = _dump_first_column(
                _dump_query_series(base_url, database_name, "SHOW MEASUREMENTS")
            )
        except handled as e:
            print(f"Error fetching measurements for database '{database_name}': {e}")
            continue

        for measurement_name in measurements:
            # Only names made of [A-Za-z0-9_] are dumped; this also keeps the
            # double-quoted identifier below free of characters needing escapes.
            if not is_valid_measurement_name(measurement_name):
                continue
            print(f"Database: {database_name} Table: {measurement_name}")
            query = f'SELECT * FROM "{measurement_name}" LIMIT {args.num_rows}'
            try:
                series_list = _dump_query_series(base_url, database_name, query)
            except handled as e:
                print(f"Error fetching data for measurement '{measurement_name}' in database '{database_name}': {e}")
                continue
            for series in series_list:
                _dump_print_series(series, args.search)
def is_valid_measurement_name(name):
    """Return True when *name* is a non-empty string of ASCII letters, digits and underscores.

    Args:
        name: Candidate measurement name. Non-string values are rejected
            (return False) instead of raising ``TypeError``.

    Returns:
        bool: True only if the whole string matches ``[a-zA-Z0-9_]+``.
    """
    # fullmatch rather than match with '^...$': '$' also matches just before
    # a trailing newline, so a name such as "cpu\n" used to be accepted.
    return isinstance(name, str) and re.fullmatch(r'[a-zA-Z0-9_]+', name) is not None
# Run the command-line interface only when executed as a script, not when
# the module is imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment