Skip to content

Instantly share code, notes, and snippets.

@anthonylouisbsb
Last active June 10, 2020 12:06
Show Gist options
  • Save anthonylouisbsb/324b9b8a00ff800c194e7d2cfad9c159 to your computer and use it in GitHub Desktop.
Save anthonylouisbsb/324b9b8a00ff800c194e7d2cfad9c159 to your computer and use it in GitHub Desktop.
async def create_columns_in_bulk(client, columns):
"""
Create a new column if it does not exist.
"""
for query in columns:
validate_column(query)
query = {'query': columns}
await create_single_column(client, query, True)
async def create_single_column(client, query, validated=False):
"""
Create a new column if it does not exist.
"""
if not validated:
validate_column(query)
response = []
def callback(result):
if result['status'] == 'error':
response.append(exceptions.ColumnException(
result['code'], result.get('msg', '')))
retry_count = 0
while True:
try:
node_id = get_node_to_send_column_command(client)
logging.info("sending column creation command to node: %s", node_id.id)
tmp_result = await client.execute_single_node(node_id, SCHEMA_CREATION_COMMAND,
query)
callback(tmp_result)
logging.info(
"sending command msg {} to node {}".format(query, node_id.id))
break
except Exception as error:
if retry_count < client.get_max_retry_count():
logging.info('got error {}, trying again...'.format(error))
await client.reconnect()
retry_count += 1
else:
raise error
if len(response) > 0:
raise response[0]
async def update_relation(client, query):
"""
Create a new column if it does not exist.
"""
response = []
def callback(result):
if result['status'] == 'error':
response.append(exceptions.ColumnException(
result['code'], result.get('msg', '')))
retry_count = 0
while True:
try:
node_id = get_node_to_send_column_command(client)
logging.info("sending relation update command to node: %s",
node_id.id)
tmp_result = await client.execute_single_node(node_id,
RELATION_CREATION_COMMAND, query)
callback(tmp_result)
logging.info(
"sending command msg {} to node {}".format(query, node_id.id))
break
except Exception as error:
if retry_count < client.get_max_retry_count():
logging.warning('got error {}, trying again...'.format(error))
await client.reconnect()
retry_count += 1
else:
raise error
if len(response) > 0:
raise response[0]
def get_node_to_send_column_command(client):
up_nodes = list(client.node_manager.get_online_nodes())
up_nodes_set = set([k.id for k in up_nodes])
data_center = client.get_preferred_datacenter_for_column_creation()
primary_nodes = set(int(n) for n in client.data_centers[data_center])
up_primary_nodes = sorted(list(up_nodes_set & primary_nodes))
try:
elected = up_primary_nodes[
client.next_column_creation_node_index % len(up_primary_nodes)]
except ZeroDivisionError:
if up_nodes:
message = (
"No nodes available in data center %s (preferred for column creation), "
"will try another node in other data center..."
% data_center)
logging.warning(message)
elected = random.choice(up_nodes)
else:
message = "No up nodes available for column creation."
logging.error(message)
raise exceptions.ColumnException("ColumnException", message)
client.next_column_creation_node_index += 1
for up_node in up_nodes:
if up_node.id == elected:
return up_node
return elected
async def get_list_of_columns(client, full=False):
column_list = []
columns = {}
def callback(result):
if result.get('status') == 'error':
raise exceptions.SearchException(
result.get('code'), result.get('msg'))
if full:
column_list.extend(result["result"])
else:
for r in result["result"]:
hash_ = r["hash"]
if hash_ in columns:
columns[hash_]["__count"] += 1
else:
r["__count"] = 1
columns[hash_] = r
retry_count = 0
while True:
try:
_, responses = await client.execute_on_cluster(COLUMN_LIST_COMMAND,
{"column_list": {}})
for response in responses:
response = response.result()
if isinstance(response, Exception):
raise Exception("Unavailable node, error: %s" % response)
callback(response)
break
except Exception as error:
logging.debug('Error happening while trying'
' to map column {}'.format(error))
if retry_count < client.get_max_retry_count():
await client.reconnect()
retry_count += 1
else:
raise
if full:
return column_list
return [m for m in columns.values()
if m["__count"] == len(client.node_manager.get_registered_nodes())]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment