Skip to content

Instantly share code, notes, and snippets.

@caniko
Last active January 20, 2022 11:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save caniko/c52bc97a3b93e1f2d38b655d7f5c15ae to your computer and use it in GitHub Desktop.
Save caniko/c52bc97a3b93e1f2d38b655d7f5c15ae to your computer and use it in GitHub Desktop.
Create materialized view with Python Datastax Driver
from logging import warning
from typing import Union, Iterable, Optional
from cassandra.cqlengine.models import Model
from cassandra.cluster import Session
def join_null_check_columns(column_keys):
return " ".join(f"AND {key} IS NOT NULL" for key in column_keys)
def create_materialized_view(
cassandra_session: Session,
keyspace_name: str,
cassandra_model: Model,
new_partition_key: Union[str, Iterable[str]],
name_of_materialized_view: Optional[str] = None,
old_partition_keys_as_last_clustering_keys: bool = True,
user_defined_clustering_keys: Optional[list[str]] = None,
):
number_of_additional_primary_keys = 0
if isinstance(new_partition_key, str):
if new_partition_key not in cassandra_model._primary_keys.keys():
number_of_additional_primary_keys += 1
new_partition_key_string = new_partition_key
clustering_keys = [
ck for ck in cassandra_model._primary_keys.keys() if ck != new_partition_key
]
name_of_new_view = (
f"{keyspace_name}.{cassandra_model.__name__}_{new_partition_key}"
)
partition_key_null_check_row = f"WHERE {new_partition_key} IS NOT NULL"
else:
new_partition_key = tuple(new_partition_key)
for single_new_partition_key in new_partition_key:
if single_new_partition_key not in cassandra_model._primary_keys.keys():
number_of_additional_primary_keys += 1
new_partition_key_string = f"({', '.join(new_partition_key)})"
clustering_keys = [
ck
for ck in cassandra_model._primary_keys.keys()
if ck not in new_partition_key
]
name_of_new_view = (
f"{keyspace_name}.{cassandra_model.__name__}_{'_'.join(new_partition_key)}"
)
partition_key_null_check_row = f"WHERE {new_partition_key[0]} IS NOT NULL {join_null_check_columns(new_partition_key[1:])}"
if name_of_materialized_view:
name_of_new_view = name_of_materialized_view
if user_defined_clustering_keys:
for ck in clustering_keys:
if ck not in user_defined_clustering_keys:
user_defined_clustering_keys.append(ck)
clustering_keys = user_defined_clustering_keys
elif old_partition_keys_as_last_clustering_keys:
for old_partition_key in cassandra_model._partition_keys.keys():
if old_partition_key in clustering_keys:
clustering_keys.remove(old_partition_key)
clustering_keys.append(old_partition_key)
for ck in clustering_keys:
if ck not in cassandra_model._primary_keys.keys():
number_of_additional_primary_keys += 1
if number_of_additional_primary_keys > 1:
warning(
f"number_of_additional_primary_keys is higher than 1 ({number_of_additional_primary_keys}). "
f"The generation of the materialized view should fail"
)
cassandra_session.execute(
query=(
f"CREATE MATERIALIZED VIEW {name_of_new_view} AS"
f" SELECT * FROM {keyspace_name}.{cassandra_model.__name__}"
f" {partition_key_null_check_row} {join_null_check_columns(clustering_keys)}"
f" PRIMARY KEY ({new_partition_key_string}, {', '.join(clustering_keys)})"
f" WITH comment='Allow query by {new_partition_key_string} instead of ({''.join(cassandra_model._partition_keys.keys())})';"
)
)
return name_of_new_view
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment