-
-
Save anthonylouisbsb/324b9b8a00ff800c194e7d2cfad9c159 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
async def create_columns_in_bulk(client, columns):
    """
    Validate a batch of column definitions and create them with one command.

    Every item of ``columns`` is passed through ``validate_column`` first.
    The whole batch is then wrapped as ``{'query': columns}`` and handed to
    ``create_single_column`` with ``validated=True`` so it is not validated
    a second time.

    Args:
        client: client object forwarded untouched to ``create_single_column``.
        columns: iterable of column definitions accepted by
            ``validate_column``.
    """
    for column in columns:
        validate_column(column)

    bulk_query = {'query': columns}
    await create_single_column(client, bulk_query, True)
async def create_single_column(client, query, validated=False):
    """
    Send a column creation command to one node, retrying on failure.

    A target node is picked with ``get_node_to_send_column_command`` on every
    attempt. Any exception raised while picking the node, sending the command
    or reading the answer triggers ``client.reconnect()`` and a new attempt,
    up to ``client.get_max_retry_count()`` retries.

    Args:
        client: client exposing ``execute_single_node``, ``reconnect`` and
            ``get_max_retry_count``.
        query: payload sent with ``SCHEMA_CREATION_COMMAND``.
        validated: when false (default) ``query`` is first checked with
            ``validate_column``; callers that already validated pass ``True``.

    Raises:
        exceptions.ColumnException: the node answered with status ``'error'``.
            This is not retried.
        Exception: the last error once the retries are exhausted.
    """
    if not validated:
        validate_column(query)

    rejection = None
    retry_count = 0
    while True:
        try:
            node = get_node_to_send_column_command(client)
            logging.info("sending column creation command to node: %s", node.id)
            result = await client.execute_single_node(
                node, SCHEMA_CREATION_COMMAND, query)
            if result['status'] == 'error':
                # Remember the rejection; it is raised after the loop so the
                # except clause below does not treat it as a retryable error.
                rejection = exceptions.ColumnException(
                    result['code'], result.get('msg', ''))
            logging.info("sending command msg %s to node %s", query, node.id)
            break
        except Exception as error:
            if retry_count >= client.get_max_retry_count():
                # Bare raise keeps the original traceback intact.
                raise
            # Same level as the equivalent retry message in update_relation.
            logging.warning('got error %s, trying again...', error)
            await client.reconnect()
            retry_count += 1

    if rejection is not None:
        raise rejection
async def update_relation(client, query):
    """
    Send a relation update command to one node, retrying on failure.

    The target node comes from ``get_node_to_send_column_command``. Any
    exception raised during an attempt leads to ``client.reconnect()`` and a
    new attempt, up to ``client.get_max_retry_count()`` retries; after that
    the error is re-raised. An answer whose status is ``'error'`` is turned
    into an ``exceptions.ColumnException`` that is raised once the loop ends.
    """
    failures = []
    attempts = 0

    while True:
        try:
            target = get_node_to_send_column_command(client)
            logging.info("sending relation update command to node: %s",
                         target.id)
            reply = await client.execute_single_node(
                target, RELATION_CREATION_COMMAND, query)
            if reply['status'] == 'error':
                failures.append(exceptions.ColumnException(
                    reply['code'], reply.get('msg', '')))
            logging.info(
                "sending command msg {} to node {}".format(query, target.id))
            break
        except Exception as error:
            if not attempts < client.get_max_retry_count():
                raise error
            logging.warning('got error {}, trying again...'.format(error))
            await client.reconnect()
            attempts += 1

    # Only the first recorded rejection is reported.
    if failures:
        raise failures[0]
def get_node_to_send_column_command(client):
    """
    Pick the online node that should receive the next column command.

    Online nodes whose id belongs to the preferred data center (as returned
    by ``client.get_preferred_datacenter_for_column_creation()``) are sorted
    by id and selected in rotation, using
    ``client.next_column_creation_node_index`` as the rotating counter. When
    none of them is online, a random online node is used instead.

    Args:
        client: object exposing ``node_manager.get_online_nodes()``,
            ``get_preferred_datacenter_for_column_creation()``,
            ``data_centers`` (data center -> iterable of node ids convertible
            with ``int``) and ``next_column_creation_node_index``.

    Returns:
        The selected node object (an element of the online node list).

    Raises:
        exceptions.ColumnException: no node is online at all.

    Side effects:
        Increments ``client.next_column_creation_node_index`` whenever a node
        is returned.
    """
    up_nodes = list(client.node_manager.get_online_nodes())

    # Map id -> node once; setdefault keeps the first node for a given id,
    # which is what a front-to-back scan of up_nodes would find.
    nodes_by_id = {}
    for node in up_nodes:
        nodes_by_id.setdefault(node.id, node)

    data_center = client.get_preferred_datacenter_for_column_creation()
    primary_ids = {int(n) for n in client.data_centers[data_center]}
    up_primary_ids = sorted(primary_ids.intersection(nodes_by_id))

    if up_primary_ids:
        index = client.next_column_creation_node_index % len(up_primary_ids)
        elected = nodes_by_id[up_primary_ids[index]]
    elif up_nodes:
        logging.warning(
            "No nodes available in data center %s (preferred for column creation), "
            "will try another node in other data center...",
            data_center)
        elected = random.choice(up_nodes)
    else:
        message = "No up nodes available for column creation."
        logging.error(message)
        raise exceptions.ColumnException("ColumnException", message)

    client.next_column_creation_node_index += 1
    return elected
async def get_list_of_columns(client, full=False):
    """
    Collect the column list reported by the cluster.

    ``COLUMN_LIST_COMMAND`` is sent through ``client.execute_on_cluster`` and
    the ``result()`` of every returned response is merged. Any failure (an
    exception, a response whose result is an exception, or an answer with
    status ``'error'``) triggers ``client.reconnect()`` and a fresh attempt,
    up to ``client.get_max_retry_count()`` retries.

    Args:
        client: client exposing ``execute_on_cluster``, ``reconnect``,
            ``get_max_retry_count`` and ``node_manager``.
        full: when true, return every entry of every answer, duplicates
            included. When false (default), entries are grouped by their
            ``"hash"`` key and only those seen as many times as there are
            registered nodes are returned.

    Returns:
        A list of column entries. In non-full mode each returned entry
        carries an extra ``"__count"`` key holding its number of occurrences.

    Raises:
        exceptions.SearchException: an answer had status ``'error'`` and the
            retries are exhausted.
        Exception: the last error once the retries are exhausted.
    """
    retry_count = 0
    while True:
        # Reset the accumulators on every attempt: a previous attempt may
        # have merged some answers before failing, and keeping them would
        # duplicate entries (full mode) or inflate "__count" so that valid
        # columns no longer match the registered node count.
        column_list = []
        columns = {}
        try:
            _, responses = await client.execute_on_cluster(
                COLUMN_LIST_COMMAND, {"column_list": {}})
            for response in responses:
                result = response.result()
                if isinstance(result, Exception):
                    raise Exception("Unavailable node, error: %s" % result)
                if result.get('status') == 'error':
                    raise exceptions.SearchException(
                        result.get('code'), result.get('msg'))
                if full:
                    column_list.extend(result["result"])
                    continue
                for column in result["result"]:
                    hash_ = column["hash"]
                    if hash_ in columns:
                        columns[hash_]["__count"] += 1
                    else:
                        column["__count"] = 1
                        columns[hash_] = column
            break
        except Exception as error:
            logging.debug('Error happening while trying'
                          ' to map column {}'.format(error))
            if retry_count < client.get_max_retry_count():
                await client.reconnect()
                retry_count += 1
            else:
                raise

    if full:
        return column_list

    expected_count = len(client.node_manager.get_registered_nodes())
    return [column for column in columns.values()
            if column["__count"] == expected_count]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment