Skip to content

Instantly share code, notes, and snippets.

@csm10495
Created February 11, 2023 05:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save csm10495/65ca17fdb758ecec69c2bb8532a78b3b to your computer and use it in GitHub Desktop.
Save csm10495/65ca17fdb758ecec69c2bb8532a78b3b to your computer and use it in GitHub Desktop.
cosmos2json.py - A quick and dirty script to run queries against a cosmos db's container
'''
Simple script to do a simple query against a cosmosdb in azure
Can use it to dump a db to json.. if ya need that for some reason.
MIT License - Charles Machalow
# pip install azure-cosmos
# pip install azure-identity
'''
import argparse
import io
import json
import sys
from azure.cosmos import CosmosClient
class OutputJSONStreamer:
"""
A hacky way of streaming the output of a query in a json-y format
"""
def __init__(self, output: io.StringIO):
self._initial_items = []
self.output = output
def __enter__(self):
return self
def __exit__(self, *args, **kargs):
if len(self._initial_items) > 2:
self.output.write("\n]")
elif len(self._initial_items) == 1:
self.output.write(json.dumps(self._initial_items[0], indent=4))
elif len(self._initial_items) == 0:
self.output.write('{}')
else:
self.output.write(json.dumps(self._initial_items, indent=4))
def process(self, item):
if len(self._initial_items) < 2:
self._initial_items.append(item)
if len(self._initial_items) == 2:
self.output.write("[\n")
self.output.write(json.dumps(self._initial_items[0], indent=4))
self.output.write(",\n")
self.output.write(json.dumps(self._initial_items[1], indent=4))
self._initial_items.append(None)
return
if len(self._initial_items) > 2:
self.output.write(",\n")
self.output.write(json.dumps(item, indent=4))
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("--endpoint", required=True, help="Cosmos DB endpoint")
parser.add_argument("--key",required=True, help="Cosmos DB key")
parser.add_argument("--database", required=True,help="Cosmos database")
parser.add_argument("--container",required=True, help="Cosmos container")
parser.add_argument("--query",required=True, help="query to execute.. for example: 'select * from c'")
parser.add_argument("-o", "--output",required=False, default=None, help="If given a path to send this output to (otherwise goes to stdout)")
args = parser.parse_args()
with CosmosClient(url=args.endpoint, credential=args.key) as cosmos:
db = cosmos.get_database_client(args.database)
container = db.get_container_client(args.container)
info = container.query_items(args.query, enable_cross_partition_query=True)
output = open(args.output, 'w') if args.output else sys.stdout
try:
with OutputJSONStreamer(output) as streamer:
for q in info:
streamer.process(q)
finally:
if args.output:
output.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment