Skip to content

Instantly share code, notes, and snippets.

@tdudgeon
Created June 17, 2019 13:51
Show Gist options
  • Save tdudgeon/0239a8cef404809a2d9157bbe9b3b3ad to your computer and use it in GitHub Desktop.
Save tdudgeon/0239a8cef404809a2d9157bbe9b3b3ad to your computer and use it in GitHub Desktop.
V2 Fragment network searching in Optibrium's Stardrop. Place these two files in the ~/Stardrop/py/fragnet directory and restart Stardrop.
#! /usr/bin/env python
"""A Python wrapper around the Informatics Matters Fragnet service REST API.
Workflow is to create a FragnetSearch instance, authenticate
and then use the FragnetSearch query methods, like search_neighbourhood(): -
fs = FragnetSearch(fragnet_url, username, password)
fs.authenticate()
fs.search_neighbourhood(...)
Authentication obtains a REST API token from the Squonk server.
The token is automatically renewed so, for the lifetime of your FragnetSearch
object you should only need to authenticate once before using any
FragnetSearch methods.
Note: We do not use the Python logging framework
due to an issue with Tim's MacBook.
Instead we use print statements and debug(), warning()
and error() functions.
"""
import datetime
import os
import pprint
import sys
import urllib
from collections import namedtuple
import requests
# The version of this module.
# Modify with every change, complying with
# semantic 2.0.0 rules.
__version__ = '1.2.0'
# The search result.
# A namedtuple.
SearchResult = namedtuple('SearchResult', 'status_code message json')
# Set to DEBUG
DEBUG = False
def debug(msg):
"""Prints a message (if DEBUG is True)"""
if DEBUG:
print('DEBUG: {}'.format(msg))
def warning(msg):
print('WARNING: {}'.format(msg))
def error(msg):
print('ERROR: {}'.format(msg))
class FragnetSearchException(Exception):
"""A basic exception used by the FragnetSearch class.
"""
pass
class FragnetSearch:
"""The FragnetSearch REST API wrapper class.
Provides convenient and auto-refreshed token-based access to
the Fragnet REST API.
"""
SUPPORTED_CALCULATIONS = ['LOGP',
'TPSA',
'SIM_RDKIT_TANIMOTO',
'SIM_MORGAN2_TANIMOTO',
'SIM_MORGAN3_TANIMOTO']
MAX_LIMIT = 5000
MAX_HOPS = 2
INTERNAL_ERROR_CODE = 600
TOKEN_URI = 'https://squonk.it/auth/realms/squonk/protocol/openid-connect/token'
API = 'fragnet-search/rest/v2'
CLIENT_ID = 'fragnet-search'
REQUEST_TIMOUT_S = 20
# The minimum remaining life of a token (or refresh token) (in seconds)
# before an automatic refresh is triggered.
TOKEN_REFRESH_DEADLINE_S = datetime.timedelta(seconds=45)
def __init__(self, fragnet_host, username, password):
"""Initialises the FragnetSearch module.
An API token is collected when you 'authenticate'.
:param fragnet_host: The Fragnet host and designated port,
i.e. http://fragnet.squonk.it:8080
:type fragnet_host: ``str``
:param username: A Fragnet username
:type username: ``str``
:param password: The Fragnet user's password
:type password: ``str``
"""
# We do nothing other then record parameters
# to be used elsewhere in the class...
self._username = username
self._password = password
self._fragnet_host = fragnet_host
self._access_token = None
self._access_token_expiry = None
self._refresh_token = None
self._refresh_token_expiry = None
debug('fragnet={} username={}'.format(self._fragnet_host,
self._username))
def _extract_tokens(self, json):
"""Gets tokens from a valid json response.
The JSON is expected to contain an 'access_token',
'refresh_token', 'expires_in' and 'refresh_expires_in'.
We calculate the token expiry times here,
which is used by '_check_token()' to automatically
renew the token.
:param json: The JSON payload, containing tokens
"""
if 'access_token' not in json:
error('access_token is not in the json')
return False
if 'expires_in' not in json:
error('expires_in is not in the json')
return False
if 'refresh_token' not in json:
error('refresh_token is not in the json')
return False
# The refresh token may not have an expiry...
if 'refresh_expires_in' not in json:
debug('refresh_expires_in is not in the json')
time_now = datetime.datetime.now()
self._access_token =json['access_token']
self._access_token_expiry = time_now + \
datetime.timedelta(seconds=json['expires_in'])
self._refresh_token = json['refresh_token']
if 'refresh_expires_in' in json:
self._refresh_token_expiry = time_now + \
datetime.timedelta(seconds=json['refresh_expires_in'])
else:
debug('Setting _refresh_expires_in to None (no refresh expiry)...')
self._refresh_token_expiry = None
debug('_access_token_expiry={}'.format(self._access_token_expiry))
debug('_refresh_token_expiry={}'.format(self._refresh_token_expiry))
# OK if we get here...
return True
def _get_new_token(self):
"""Gets a (new) API access token.
"""
debug('Getting a new access token...')
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
payload = {'grant_type': 'password',
'client_id': FragnetSearch.CLIENT_ID,
'username': self._username,
'password': self._password}
try:
resp = requests.post(FragnetSearch.TOKEN_URI,
data=payload,
headers=headers,
timeout=FragnetSearch.REQUEST_TIMOUT_S)
except requests.exceptions.ConnectTimeout:
warning('_get_new_token() POST timeout')
return False
if resp.status_code != 200:
warning('_get_new_token() resp.status_code={}'.format(resp.status_code))
return False
# Get the tokens from the response...
debug('Got token.')
return self._extract_tokens(resp.json())
def _refresh_existing_token(self):
"""Refreshes an (existing) API access token.
"""
debug('Refreshing the existing access token...')
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
payload = {'grant_type': 'refresh_token',
'client_id': FragnetSearch.CLIENT_ID,
'refresh_token': self._refresh_token}
try:
resp = requests.post(FragnetSearch.TOKEN_URI,
data=payload,
headers=headers,
timeout=FragnetSearch.REQUEST_TIMOUT_S)
except requests.exceptions.ConnectTimeout:
warning('_refresh_existing_token() POST timeout')
return False
if resp.status_code != 200:
warning('_refresh_existing_token() resp.status_code={}'.format(resp.status_code))
return False
# Get the tokens from the response...
debug('Refreshed token.')
return self._extract_tokens(resp.json())
def _check_token(self):
"""Refreshes the access token if it's close to expiry.
(i.e. if it's within the refresh period). If the refresh token
is about to expire (i.e. there's been a long time between searches)
then we get a new token.
:returns: False if the token could not be refreshed.
"""
debug('Checking token...')
time_now = datetime.datetime.now()
remaining_token_time = self._access_token_expiry - time_now
if remaining_token_time >= FragnetSearch.TOKEN_REFRESH_DEADLINE_S:
# Token's got plenty of time left to live.
# No need to refresh or get a new token.
debug('Token still has plenty of life remaining.')
return True
# If the refresh token is still 'young' (or has no expiry time)
# then we can rely on refreshing the existing token using it. Otherwise
# we should collect a whole new token...
#
# We set the reaming to me to the limit (which means we'll refresh)
# but we replace that with any remaining time in the refresh token).
# So - if there is not expiry time for the refresh token then
# we always refresh.
remaining_refresh_time = FragnetSearch.TOKEN_REFRESH_DEADLINE_S
if self._refresh_token_expiry:
remaining_refresh_time = self._refresh_token_expiry - time_now
if remaining_refresh_time >= FragnetSearch.TOKEN_REFRESH_DEADLINE_S:
# We should be able to refresh the existing token...
debug('Token too old, refreshing...')
status = self._refresh_existing_token()
else:
# The refresh token is too old,
# we need to get a new token...
debug('Refresh token too old, getting a new token...')
status = self._get_new_token()
# Return status (success or failure)
if status:
debug('Got new token.')
return status
def authenticate(self):
"""Authenticates against the server provided in the class initialiser.
Here we obtain a fresh access and refresh token.
:returns: True on success
:raises: FragnetSearchException on error
"""
debug('Authenticating...')
status = self._get_new_token()
if not status:
raise FragnetSearchException('Unsuccessful Authentication')
debug('Authenticated.')
def search_neighbourhood(self, smiles, hac, rac, hops, limit, calculations, suppliers):
"""Runs a 'search/neighbourhood' query on the Fragnet server.
The API token is collected when this class instance has been
created.
:param smiles: The SMILES string on which th base the search,
This will be URL-encoded bnu the method so you provide
the 'raw' smiles.
:type smiles: ``str``
:param hac: Heavy Atom Count.
:type hac: ``int``
:param rac: Ring Atom Count.
:type rac: ``int``
:param hops: Hops (1 or 2)
:type hops: ``int``
:param limit: Search limit.
:type limit: ``int``
:param calculations: The list of calculations (can be empty)
:type calculations: ``list``
:param suppliers: The list of suppliers (if empty molecules from all suppliers are returned)
:type suppliers: ``list``
:returns: A SearchResult namedtuple consisting of the API 'status_code'
(normally 200 on success), a 'message',
and the response 'json' content
(or None if there is no content).
"""
# Sanity check the arguments...
if not smiles.strip():
return SearchResult(FragnetSearch.INTERNAL_ERROR_CODE,
'EmptySmiles', None)
if hac < 0:
return SearchResult(FragnetSearch.INTERNAL_ERROR_CODE,
'InvalidHAC', None)
if rac < 0:
return SearchResult(FragnetSearch.INTERNAL_ERROR_CODE,
'InvalidRAC', None)
if hops < 1 or hops > FragnetSearch.MAX_HOPS:
return SearchResult(FragnetSearch.INTERNAL_ERROR_CODE,
'InvalidHops (%s)' % hops, None)
if limit < 1 or limit > FragnetSearch.MAX_LIMIT:
return SearchResult(FragnetSearch.INTERNAL_ERROR_CODE,
'InvalidLimit (%s)' % limit, None)
if calculations:
for calculation in calculations:
if calculation not in FragnetSearch.SUPPORTED_CALCULATIONS:
return SearchResult(FragnetSearch.INTERNAL_ERROR_CODE,
'InvalidCalculation: %s' % calculation,
None)
# Always try to refresh the access token.
# The token is only refreshed if it is close to expiry.
if not self._check_token():
return SearchResult(FragnetSearch.INTERNAL_ERROR_CODE,
'APITokenRefreshFailure', None)
# Construct the basic URI, which includes the URL-encoded
# version of the provided SMILES string...
search_uri = '{}/{}/search/neighbourhood/{}'. \
format(self._fragnet_host,
FragnetSearch.API,
urllib.quote(smiles))
headers = {'Authorization': 'bearer {}'.format(self._access_token)}
params = {'hac': hac,
'rac': rac,
'hops': hops,
'limit': limit}
if calculations:
params['calcs'] = ','.join(calculations)
if suppliers:
params['suppliers'] = ','.join(suppliers)
# Make the request...
debug('Calling search/neighbourhood for {}...'.format(smiles))
try:
resp = requests.get(search_uri,
params=params,
headers=headers,
timeout=FragnetSearch.REQUEST_TIMOUT_S)
except requests.exceptions.ConnectTimeout:
return SearchResult(FragnetSearch.INTERNAL_ERROR_CODE,
'RequestTimeout', None)
# Try to extract the JSON content
# and return it and the status code.
content = None
try:
content = resp.json()
debug('Returned from search with content.')
except ValueError:
# No json content.
# Nothing to do - content is already 'None'
warning('ValueError getting response json content.')
pass
return SearchResult(resp.status_code, 'Success', content)
def search_suppliers(self):
"""Runs a 'search/suppliers' query on the Fragnet server.
:returns: A SearchResult namedtuple consisting of the API 'status_code'
(normally 200 on success), a 'message',
and the response 'json' content
(or None if there is no content). The content is
a possibly empty list of supplier name strings.
"""
# Always try to refresh the access token.
# The token is only refreshed if it is close to expiry.
if not self._check_token():
return SearchResult(FragnetSearch.INTERNAL_ERROR_CODE,
'APITokenRefreshFailure', None)
# Construct the basic URI, which includes the URL-encoded
# version of the provided SMILES string...
search_uri = '{}/{}/search/suppliers'. \
format(self._fragnet_host, FragnetSearch.API)
headers = {'Authorization': 'bearer {}'.format(self._access_token)}
# Make the request...
debug('Calling search/suppliers...')
try:
resp = requests.get(search_uri,
headers=headers,
timeout=FragnetSearch.REQUEST_TIMOUT_S)
except requests.exceptions.ConnectTimeout:
return SearchResult(FragnetSearch.INTERNAL_ERROR_CODE,
'RequestTimeout', None)
# Try to extract the JSON content
# and return it and the status code.
content = None
try:
content = resp.json()
debug('Returned from search with content.')
except ValueError:
# No json content.
# Nothing to do - content is already 'None'
warning('ValueError getting response json content.')
pass
# Map content to a list of names.
# The input's a dictionary of labels and names:
#
# [{ u'label': u'V_MP', u'name': u'MolPort'},
# { u'label': u'V_EMOLS_BB', u'name': u'eMolecules-BB'}]
#
translated_content = list(map(lambda x: x['name'], content))
return SearchResult(resp.status_code, 'Success', translated_content)
# -----------------------------------------------------------------------------
# MAIN
# -----------------------------------------------------------------------------
if __name__ == '__main__':
fragnet_username = os.environ.get('FRAGNET_USERNAME', None)
if not fragnet_username:
print('You need to set FRAGNET_USERNAME')
sys.exit(1)
fragnet_password = os.environ.get('FRAGNET_PASSWORD', None)
if not fragnet_password:
print('You need to set FRAGNET_PASSWORD')
sys.exit(1)
# Pretty-printer
pp = pprint.PrettyPrinter(indent=2)
# Create a Fragnet object
# then authenticate (checking for success)...
fs = FragnetSearch('http://fragnet.squonk.it:8080',
fragnet_username, fragnet_password)
fs.authenticate()
# Now run a search for suppliers...
#
# The response code from the server is followed by the JSON content
# The JSON value is 'None' if there is no JSON content.
result = fs.search_suppliers()
pp.pprint(result.status_code)
pp.pprint(result.message)
pp.pprint(result.json)
# Now run a basic search...
hac = 3
rac = 1
hops = 2
limit = 10
calculations = ['LOGP', 'SIM_RDKIT_TANIMOTO']
result = fs.search_neighbourhood('c1ccc(Nc2nc3ccccc3o2)cc1',
hac, rac, hops, limit,
calculations)
pp.pprint(result.status_code)
pp.pprint(result.message)
pp.pprint(result.json)
import json
import math
import app
import cardview
import PythonQt
from fragnet.FragnetSearch import FragnetSearch
from collections import OrderedDict
import sys
try:
layoutengine = sys.modules["__main__"].layoutengine
except Exception as e:
print e
def process_json(data):
query_smiles = data['parameters']['smiles']
print('Processing nodes')
nodesMap, query_id = map_nodes(data['nodes'], query_smiles)
print('Processing edges')
edgesMap = map_edges(data['edges'], nodesMap)
print('Processing groups')
groupsMap = map_groups(data['groups'], nodesMap, edgesMap)
return data['refmol'], query_id, nodesMap, edgesMap, groupsMap
def map_nodes(nodes, query):
"""
Map the nodes
:param nodes: An array of nodes
:return: Dictionary of the nodes keyed by the node's id
"""
m = {}
query_id = None
for node in nodes:
id = node['id']
smiles = node['smiles']
molType = node['molType']
#print("id=%s smiles=%s molType=%s" % (id, smiles, molType))
if smiles == query:
query_id = id
print("Found query molecule " + str(query_id))
m[id] = node
return m, query_id
def map_edges(edges, nodesMap):
"""
Map the edges
:param edges: An array of edges
:param nodesMap: Dictionary of mapped nodes
:return: Dictionary of edges keyed by the edge's id
"""
m = {}
for edge in edges:
id = edge['id']
parentId = edge['parentId']
childId = edge['childId']
label = edge['label']
#print("id=%s parent=%s child=%s label=%s" % (id, parentId, childId, label))
m[id] = edge
return m
def map_groups(groupData, nodesMap, edgesMap):
"""
Map the groups
:param groupData: Array of groups
:param nodesMap: The mapped nodes
:param edgesMap: The mapped edges
:return: Dictionary of groups keyed by the group's key property
"""
m = {}
for group in groupData:
if 'key' in group:
key = group['key']
else:
print('WARNING: key not found for group ' + str(group))
key = 'Undefined'
m[key] = group
return m
searcher = None
optHac = 3
optRac = 1
optHops = 2
optLimit = 1000
def display_error(message):
w = PythonQt.Warning()
w.setWindowTitle('Error')
w.setMessage(message)
w.run()
def settings(appui):
selector = PythonQt.ComboBoxSelector()
selector.setWindowTitle("Specify query parameters")
selector.setText("Parameters")
hac = 'Heavy atom count change'
rac = 'Ring atom count change'
hops = 'Number of hops'
limit = 'Max results'
sep = '--'
selector.setOptions(hac, [optHac, sep, '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10'])
selector.addOptions(rac, [optRac, sep, '0', '1', '2', '3', '4', '5', '6'])
selector.addOptions(hops, [optHops, sep, '1', '2'])
selector.addOptions(limit, [optLimit, sep, '100', '500', '1000', '2000', '5000'])
if selector.run():
global optHac
global optRac
global optHops
global optLimit
tmpHac = selector.getSelectedOption(0)
tmpRac = selector.getSelectedOption(1)
tmpHops = selector.getSelectedOption(2)
tmpLimit = selector.getSelectedOption(3)
print(hac, tmpHac)
print(rac, tmpRac)
print(hops, tmpHops)
print(limit, tmpLimit)
if not tmpHac == sep:
optHac = tmpHac
if not tmpRac == sep:
optRac = tmpRac
if not tmpHops == sep:
optHops = tmpHops
if not tmpLimit == sep:
optLimit = tmpLimit
calcs_dict = OrderedDict([
('LogP', 'LOGP'),
('TPSA', 'TPSA'),
('Tanimoto similarity (RDKit)', 'SIM_RDKIT_TANIMOTO'),
('Tanimoto similarity (Morgan2)', 'SIM_MORGAN2_TANIMOTO'),
('Tanimoto similarity (Morgan3)', 'SIM_MORGAN3_TANIMOTO'),
])
calcs_names = {
'LOGP': 'LogP',
'TPSA': 'TPSA',
'SIM_RDKIT_TANIMOTO': 'SimRDKit',
'SIM_MORGAN2_TANIMOTO': 'SimMorgan2',
'SIM_MORGAN3_TANIMOTO': 'SimMorgan3'
}
calcs_selected = ['LOGP', 'TPSA', 'SIM_MORGAN2_TANIMOTO']
def calculations(appui):
global calcs_dict, calcs_selected
sl = PythonQt.SimpleList()
sl.setWindowTitle('Select Calculations')
sl.setLabel('Calculations:')
sl.setAvailableItems(calcs_dict.keys())
sl.setMultipleSelections() # Allow more than one selection
if sl.run(): # True if user clicked OK rather than Cancel
selections = sl.getSelectedItems()
calcs_selected = []
for selection in selections:
calcs_selected.append(calcs_dict[selection])
print('Calcs selected: ' + ','.join(calcs_selected))
suppliers_available = None
suppliers_selected = []
def suppliers(appui):
global suppliers_selected, suppliers_available
if not suppliers_available:
lookup_suppliers()
sl = PythonQt.SimpleList()
sl.setWindowTitle('Select Suppliers')
sl.setLabel('Available suppliers:')
sl.setAvailableItems(suppliers_available)
sl.setMultipleSelections() # Allow more than one selection
if sl.run(): # True if user clicked OK rather than Cancel
suppliers_selected = sl.getSelectedItems()
print('Suppliers selected: ' + ','.join(suppliers_selected))
def get_searcher():
"""Get the searcher, authenticating as needed"""
global searcher
if not searcher:
login = PythonQt.LogInDialog()
if login.run():
user = login.getUser()
pwd = login.getPassword()
print(user)
tmpSearcher = FragnetSearch('http://fragnet.squonk.it:8080', user, pwd)
try:
tmpSearcher.authenticate()
# set the searcher object
searcher = tmpSearcher
except:
msg = 'Authentication failed'
print(msg)
display_error(msg)
searcher = None
return searcher
def lookup_suppliers():
global suppliers_available, searcher
get_searcher()
if not searcher:
return
response = searcher.search_suppliers()
if response.status_code != 200:
msg = 'Suppliers failed: ' + response.message
print(msg)
if response.json:
print(response.json)
display_error(msg)
suppliers_available = []
return
suppliers_available = response.json
print("Suppliers: " + ",".join(suppliers_available))
def runFrag(appui):
global optHac, optRac, optHops, optLimit, searcher
get_searcher()
if not searcher:
return
smiles = appui.getCurrentMolecule()
if not smiles:
msg = 'No structure selected'
print(msg)
display_error(msg)
return
print("Query: smiles=" + smiles + " hac=" + str(optHac) + " rac=" + str(optRac) + " hops=" + str(optHops) +
' limit=' + str(optLimit))
if calcs_selected:
calcs = calcs_selected
print('Calcs: ' + ','.join(calcs))
else:
calcs = None
print('No calcs')
if suppliers_selected:
print('Suppliers: ' + ','.join(suppliers_selected))
else:
print('All suppliers')
# dummy_data = open('py/fragnet/example.json', 'r')
# result = json.loads(dummy_data.read())
# refMol, refMolId, nodesMap, edgesMap, groupsMap = process_json(result)
response = searcher.search_neighbourhood(smiles, int(optHac), int(optRac), int(optHops), int(optLimit), calcs, suppliers_selected)
if response.status_code != 200:
msg = 'Search failed: ' + response.message
print(msg)
if response.json:
print(response.json)
display_error(msg)
return
result = response.json
print("Query executed: " + result['query'])
if 'nodes' not in result or len(result['nodes']) == 0:
msg = 'No results'
print(msg)
display_error(msg)
return
print('Nodes: ' + str(len(result['nodes'])))
refMol, refMolId, nodesMap, edgesMap, groupsMap = process_json(result)
print("RefMol: " + refMol + ' id: ' + str(refMolId))
print(str(len(nodesMap)) + " nodes")
print(str(len(edgesMap)) + " edges")
print(str(len(groupsMap)) + " groups")
structure_column = app.Dataset.meta('Structure', app.mol)
id_column = app.Dataset.meta('MoleculeID')
cmpd_id_column = app.Dataset.meta('VendorIDs')
hac_column = app.Dataset.meta('HAC', app.cont, ((0.0, 0.0), (0, 0)))
rac_column = app.Dataset.meta('RAC', app.cont, ((0.0, 0.0), (0, 0)))
dataset_cols = [structure_column, id_column, cmpd_id_column, hac_column, rac_column]
for calc in calcs_selected:
name = calcs_names[calc]
calc_column = app.Dataset.meta(name, app.cont, ((0.0, 0.0), (0, 0)))
dataset_cols.append(calc_column)
dataset = app.Dataset(dataset_cols)
molecule_ids = []
# handle the reference (query) mol
print('Creating standard columns')
node = nodesMap[refMolId]
add_row_to_data(dataset, node)
molecule_ids.append(refMolId)
# calculated props
print('Creating calculated columns')
add_calculated_props(5, dataset, node['props'])
for key in groupsMap:
group = groupsMap[key]
for member in group['members']:
id = member['id']
if id != refMolId:
node = nodesMap[id]
add_row_to_data(dataset, node)
molecule_ids.append(id)
# now the calc columns
add_calculated_props(5, dataset, node['props'])
print('Packing dataset')
appui.exportDS('fragnet', dataset.pack())
dataset_2 = app.Dataset()
dataset_2.parse(appui.importDS(""))
id_map = {molecule_ids[i]: dataset_2.row_ids[i] for i in range(dataset_2.rowSz())}
cv = cardview.CardViewState()
cv.background = (255,255,255)
pos = 0
for key in groupsMap:
group = groupsMap[key]
classification = group['classification']
x, y = get_stack_position(pos, len(groupsMap))
stack_name = str(pos) + ' ' + classification
cv.createStack(stack_name, int(x), int(y))
print('Created stack ' + stack_name + ' at ' + str(x) + ',' + str(y))
pos += 1
for member in group['members']:
id = member['id']
if id != refMolId:
cv.addCardToStack(id_map[member['id']], stack_name)
cv.addLink(id_map[refMolId], id_map[member['id']], True)
x, y = get_refmol_position(len(groupsMap))
cv.addCard(id_map[refMolId], int(x), int(y))
print('Created refmol card at ' + str(x) + ',' + str(y))
xml = cv.writeXML()
layoutengine.layoutDataSet("", xml)
def add_row_to_data(dataset, node):
dataset.cols[0].append(app.Dataset.molecule(node['smiles']))
dataset.cols[1].append(app.Dataset.string(str(node['id'])))
if 'cmpd_ids' in node['props']:
cpd_ids = node['props']['cmpd_ids']
dataset.cols[2].append(app.Dataset.string(",".join(cpd_ids)))
else:
dataset.cols[2].append(app.Dataset.createInvalidEntry(app.string))
dataset.cols[3].append(app.Dataset.cont(float(node['props']['hac']), 0.0))
dataset.cols[4].append(app.Dataset.cont(float(node['props']['chac']), 0.0))
def add_calculated_props(index, dataset, props):
for calc in calcs_selected:
propname = calc.lower()
# print('Looking for calc ' + propname)
value = None
if propname in props:
value = props[propname]
if value is not None:
# print('Prop ' + str(index) + ' ' + calc + ' = ' + str(value))
dataset.cols[index].append(app.Dataset.cont(float(value), 0.0))
else:
dataset.cols[index].append(app.Dataset.createInvalidEntry(app.cont))
index = index + 1
def get_refmol_position(total):
if total < 9:
return 1, 1
elif total < 11:
return 2, 1
else:
return 3, 1
def get_stack_position(pos, total):
if total < 9:
if pos < 3:
return pos, 0
elif pos == 3:
return 0, 1
elif pos == 4:
return 2, 1
else:
return pos - 5, 2
elif total < 13:
if pos < 5:
return pos, 0
elif pos == 5:
return 0, 1
elif pos == 6:
return 4, 1
else:
return pos - 7, 2
else:
if pos < 7:
return pos, 0
elif pos == 7:
return 0, 1
elif pos == 8:
return 6, 1
else:
x = int((pos - 2) % 7)
y = int(math.floor((pos - 2) / 7) + 1)
return x, y
def get_stardrop_definitions():
definitions = [
{'script_name': 'Fragment Network/Query Settings', 'callback': settings},
{'script_name': 'Fragment Network/Calculation Settings', 'callback': calculations},
{'script_name': 'Fragment Network/Suppliers Settings', 'callback': suppliers},
{'script_name': 'Fragment Network/Run fragmentation', 'callback': runFrag}
]
return definitions
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment