-
-
Save theaeolianmachine/4e4e2df58e3cd402d2f07a3a79fba32e to your computer and use it in GitHub Desktop.
Tonic Data Generation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Note that we do not guarantee API backward compatibility. Please look at the
# Swagger API documentation that comes with your instance for your exact
# version's endpoints and parameters. If you are using hosted Tonic, you can
# find the API documentation here: https://app.tonic.ai/apidocs/index.html
# Tested 2020.06.19 with Tonic API v87
import json

# Be sure to install requests via pip, pipenv, or poetry before running.
import requests
# Parameters: replace every <<...>> placeholder below before running.
# API key sent to Tonic in the "Authorization: Apikey <key>" header.
TONIC_API_KEY = "<<TONIC API KEY>>"
# Base URL of the Tonic instance; API paths such as /api/workspace are appended.
TONIC_URL = "https://<<TONIC HOSTNAME>>"
# Compared against each workspace's "fingerprintName" to pick the workspace.
WORKSPACE_NAME = "<<TONIC WORKSPACE>>"
class TonicSession:
    """Small client for the Tonic REST API, authenticated with an API key.

    A single ``requests.Session`` is reused for every call and carries the
    ``Authorization: Apikey <key>`` header.

    Args:
        base_url: Root URL of the Tonic instance, with or without a
            trailing slash.
        api_key: Tonic API key placed in the ``Authorization`` header.
        timeout: Seconds to wait on each HTTP request, passed straight to
            ``requests``. ``None`` waits indefinitely, which was the
            behaviour before this parameter existed.
    """

    def __init__(self, base_url, api_key, timeout=60):
        self._base_url = base_url
        self._session = requests.Session()
        self._api_key = api_key
        self._timeout = timeout
        self._session.headers.update({"Authorization": "Apikey {}".format(api_key)})

    def _get_url(self, api_snippet):
        """Join the base URL and an API path with exactly one slash."""
        # Without normalising, a base URL ending in "/" yields "//api/...".
        return "{}/{}".format(self._base_url.rstrip("/"), api_snippet.lstrip("/"))

    @staticmethod
    def _json_body(response):
        """Decode a response body as JSON, explaining the failure if it is not.

        A successful status code does not guarantee a JSON body (it may be
        empty or, for example, an HTML page). The bare decoder error
        ("Expecting value: line 1 column 1") gives no hint of that, so the
        URL, status and content type are included in the message.

        Raises:
            json.JSONDecodeError: The body is not valid JSON. This is a
                ``ValueError`` subclass, as the original decode error was.
        """
        try:
            return response.json()
        except ValueError as err:
            message = (
                "Expected a JSON response from {} but received HTTP {} with "
                "Content-Type {!r}; check the Tonic base URL and API key"
            ).format(
                response.url,
                response.status_code,
                response.headers.get("Content-Type"),
            )
            raise json.JSONDecodeError(message, response.text, 0) from err

    def generate_data(self, workspace_id):
        """Start a data generation job for ``workspace_id``.

        Prints a confirmation on success.

        Raises:
            requests.HTTPError: Tonic answered with an error status.
        """
        generate_data_url = self._get_url("/api/generateData/start")
        params = {"workspaceId": workspace_id}
        r = self._session.post(generate_data_url, params=params, timeout=self._timeout)
        r.raise_for_status()
        print("Data generation started")

    def generate_data_status(self, workspace_id):
        """Print the data generation status of ``workspace_id`` as indented JSON.

        Raises:
            requests.HTTPError: Tonic answered with an error status.
            json.JSONDecodeError: The response body is not JSON.
        """
        generate_data_status_url = self._get_url("/api/generateData")
        params = {"workspaceId": workspace_id}
        r = self._session.get(
            generate_data_status_url, params=params, timeout=self._timeout
        )
        r.raise_for_status()
        print(json.dumps(self._json_body(r), indent=2))

    def get_workspaces(self):
        """Return the decoded JSON body of ``GET /api/workspace``.

        Raises:
            requests.HTTPError: Tonic answered with an error status.
            json.JSONDecodeError: The response body is not JSON.
        """
        workspace_list_url = self._get_url("/api/workspace")
        r = self._session.get(workspace_list_url, timeout=self._timeout)
        r.raise_for_status()
        return self._json_body(r)
def find_workspace_id(workspaces_json, workspace_name):
    """Return the ``"id"`` of the first workspace named ``workspace_name``.

    Args:
        workspaces_json: Iterable of workspace mappings, each read through
            its ``"fingerprintName"`` and ``"id"`` keys. ``None`` or an
            empty sequence is treated as "no workspaces".
        workspace_name: Name compared for equality with ``"fingerprintName"``.

    Returns:
        The ``"id"`` value of the first matching workspace.

    Raises:
        RuntimeError: No workspace has the requested name.
    """
    # `or ()` turns a None/empty listing into the "not found" error below
    # rather than a TypeError from iterating None.
    for workspace in workspaces_json or ():
        if workspace["fingerprintName"] == workspace_name:
            return workspace["id"]
    raise RuntimeError("No Workspace found with name: {}".format(workspace_name))
def main():
    """Look up the configured workspace and start a data generation for it.

    Uses the module-level TONIC_URL, TONIC_API_KEY and WORKSPACE_NAME
    settings. Errors raised by the session or by the workspace lookup
    propagate to the caller.
    """
    session = TonicSession(TONIC_URL, TONIC_API_KEY)
    workspace_id = find_workspace_id(session.get_workspaces(), WORKSPACE_NAME)
    # Starts a new data generation
    session.generate_data(workspace_id)
    # Prints out the current status of the workspace
    # (optional step: uncomment to use it)
    # session.generate_data_status(workspace_id)
if __name__ == "__main__":
    # Run the workflow only when executed as a script, not when imported.
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
For this code segment:
def get_workspaces(self):
workspace_list_url = self._get_url("/api/workspace")
r = self._session.get(workspace_list_url)
if r.ok:
return r.json()
else:
r.raise_for_status()
The following error occurs, probably because of `return r.json()`:
JSONDecodeError Traceback (most recent call last)
C:\Users\320166~1\AppData\Local\Temp/ipykernel_5532/2699010662.py in
5 session.generate_data_status(workspace_id)
6 if __name__ == "__main__":
----> 7 main()
C:\Users\320166~1\AppData\Local\Temp/ipykernel_5532/2699010662.py in main()
1 def main():
2 session = TonicSession(TONIC_URL, TONIC_API_KEY)
----> 3 workspace_id = find_workspace_id(session.get_workspaces(), WORKSPACE_NAME)
4 session.generate_data(workspace_id)
5 session.generate_data_status(workspace_id)
C:\Users\320166~1\AppData\Local\Temp/ipykernel_5532/2800021434.py in get_workspaces(self)
37
38 if r.ok:
---> 39 return r.json()
40 else:
41 r.raise_for_status()
~\Anaconda3\lib\site-packages\requests\models.py in json(self, **kwargs)
908 # used.
909 pass
--> 910 return complexjson.loads(self.text, **kwargs)
911
912 @property
~\Anaconda3\lib\json\__init__.py in loads(s, cls, object_hook, parse_float, parse_int, parse_constant, object_pairs_hook, **kw)
344 parse_int is None and parse_float is None and
345 parse_constant is None and object_pairs_hook is None and not kw):
--> 346 return _default_decoder.decode(s)
347 if cls is None:
348 cls = JSONDecoder
~\Anaconda3\lib\json\decoder.py in decode(self, s, _w)
335
336 """
--> 337 obj, end = self.raw_decode(s, idx=_w(s, 0).end())
338 end = _w(s, end).end()
339 if end != len(s):
~\Anaconda3\lib\json\decoder.py in raw_decode(self, s, idx)
353 obj, end = self.scan_once(s, idx)
354 except StopIteration as err:
--> 355 raise JSONDecodeError("Expecting value", s, err.value) from None
356 return obj, end
JSONDecodeError: Expecting value: line 1 column 1 (char 0)