@StefanBogdan
Last active January 31, 2024 07:53
Migrate data between Weaviate instances
{
"cells": [
{
"cell_type": "markdown",
"id": "2257d533-34c3-42df-83b3-e4b19aa7d6da",
"metadata": {},
"source": [
"# Migrate data between Weaviate instances"
]
},
{
"cell_type": "markdown",
"id": "7d049801-dee4-4136-a17d-3340ad8e9588",
"metadata": {},
"source": [
"This function migrates data between two different Weaviate instances or within the same one. It uses the [cursor API](https://weaviate.io/developers/weaviate/api/graphql/additional-operators#cursor-with-after) to do this, so make sure your Source Weaviate version supports this feature: `Weaviate version >=1.18.0`.\n",
"\n",
"**NOTE**: Multi-Tenancy is available in Weaviate versions `>=1.20.0`."
]
},
{
"cell_type": "code",
"execution_count": 13,
"id": "394a4673-27e8-4f08-8cf8-6e559914f596",
"metadata": {},
"outputs": [],
"source": [
"from typing import List, Optional\n",
"from weaviate import Client\n",
"from tqdm import tqdm\n",
"\n",
"def migrate_data_from_weaviate_to_weaviate(\n",
"\t\tsource_wv: Client,\n",
"\t\ttarget_wv: Client,\n",
"\t\tfrom_class_name: str,\n",
"\t\tto_class_name: str,\n",
"\t\tfrom_tenant: Optional[str] = None,\n",
"\t\tto_tenant: Optional[str] = None,\n",
"\t\tlimit: int = 500,\n",
"\t\tbatch_size: int = 50,\n",
"\t\tafter_uuid: Optional[str] = None,\n",
"\t\tcount: int = 0,\n",
"\t) -> None:\n",
"\t\"\"\"\n",
"\tMigrate data from a Source Weaviate to a Target Weaviate. This function\n",
"\tsupports 4 different configs:\n",
"\t\t1. Class -> Class\n",
"\t\t2. Class -> Tenant\n",
"\t\t3. Tenant -> Class\n",
"\t\t4. Tenant -> Tenant\n",
"\tNote that this is meant to migrate data that has no cross-reference properties. If\n",
"\tthe class to migrate has cross-references, this script might need some changes.\n",
"\t\t\n",
"\tParameters\n",
"\t----------\n",
"\tsource_wv: Client\n",
"\t\tThe Source Weaviate Client object instance from which to query the data\n",
"\t\t(including the UUID and the underlying vector, if one is present.)\n",
"\ttarget_wv: Client\n",
"\t\tThe Target Weaviate Client object instance to which to ingest the data.\n",
"\t\tNOTE: The batch config is going to be overridden in this function. If you want\n",
"\t\tto keep your previous config of the batch, you can remove the `batch.configure`\n",
"\t\tcall in this function.\n",
"\tfrom_class_name: str\n",
"\t\tThe Source Weaviate class that should be migrated.\n",
"\tto_class_name: str\n",
"\t\tThe Target Weaviate class that should host the Source Weaviate data.\n",
"\tfrom_tenant: Optional[str] = None\n",
"\t\tThe Source Weaviate class tenant that should be migrated. If it is None,\n",
"\t\tthen it means that the Source class has no Multi-Tenancy enabled and the whole\n",
"\t\tclass needs to be migrated.\n",
"\t\tBy default None\n",
"\tto_tenant: Optional[str] = None\n",
"\t\tThe Target Weaviate class tenant that should host the migrated data. If it is\n",
"\t\tNone then it means that Target Weaviate has no Multi-Tenancy enabled and the\n",
"\t\tdata from the Source Weaviate will be ingested into a non-Multi-Tenancy class.\n",
"\t\tBy default None\n",
"\tlimit: int = 500\n",
"\t\tThe limit (page size) used for querying data from Source Weaviate.\n",
"\t\tNOTE: Do not set it too high, to avoid long requests.\n",
"\tbatch_size: int = 50\n",
"\t\tThe batch size configured for the Target Weaviate.\n",
"\t\tNOTE: Do not set it too high, to avoid long requests.\n",
"\tafter_uuid: Optional[str] = None\n",
"\t\tThe after UUID to be used in the cursor API. It is meant to be used in case the\n",
"\t\tscript failed in the middle of the migration. Leave it as None on the first run.\n",
"\t\tBy default None\n",
"\tcount: int = 0\n",
"\t\tThe number of objects that were already ingested in the Target Weaviate. It is\n",
"\t\tmeant to be used in case the script failed in the middle of the migration,\n",
"\t\tand is used ONLY for the progress bar. Can be ignored.\n",
"\t\"\"\"\n",
"\n",
"\t# get source class properties\n",
"\tproperties = [prop[\"name\"] for prop in source_wv.schema.get(from_class_name)[\"properties\"]]\n",
"\n",
"\t# get number of items in the class/tenant\n",
"\tobj_count_query = (\n",
"\t\tsource_wv\n",
"\t\t.query\n",
"\t\t.aggregate(class_name=from_class_name)\n",
"\t\t.with_meta_count()\n",
"\t)\n",
"\tif from_tenant is not None:\n",
"\t\tobj_count_query = (\n",
"\t\t\tobj_count_query\n",
"\t\t\t.with_tenant(from_tenant)\n",
"\t\t)\n",
"\tnum_objects = (\n",
"\t\tobj_count_query\n",
"\t\t.do()\n",
"\t\t[\"data\"][\"Aggregate\"][from_class_name][0][\"meta\"][\"count\"]\n",
"\t)\n",
"\n",
"\ttry:\n",
"\t\t# configure Target Weaviate Batch\n",
"\t\ttarget_wv.batch.configure(\n",
"\t\t\tbatch_size=batch_size,\n",
"\t\t)\n",
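"\t\t# only pass `tenant` when the Target class actually uses Multi-Tenancy\n",
"\t\tadditional_item_config = {}\n",
"\t\tif to_tenant is not None:\n",
"\t\t\tadditional_item_config[\"tenant\"] = to_tenant\n",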
"\t\twith target_wv.batch as target_batch, tqdm(total=(num_objects - count)) as pbar:\n",
"\n",
"\t\t\t# helper function to ingest data into Target Weaviate\n",
"\t\t\tdef ingest_data_in_batches(objects: List[dict]) -> str:\n",
"\t\t\t\t\"\"\"\n",
"\t\t\t\tIngest data into Target Weaviate using Batch API.\n",
"\n",
"\t\t\t\tParameters\n",
"\t\t\t\t----------\n",
"\t\t\t\tobjects: List[dict]\n",
"\t\t\t\t\tA list of Weaviate objects from the Source Weaviate. The list contains\n",
"\t\t\t\t\tall objects of the current Source Weaviate page.\n",
"\t\t\t\t\tIt must not be empty.\n",
"\n",
"\t\t\t\tReturns\n",
"\t\t\t\t-------\n",
"\t\t\t\tstr\n",
"\t\t\t\t\tThe last UUID in the Page to be used with cursor API feature.\n",
"\t\t\t\t\"\"\"\n",
"\n",
"\t\t\t\tfor obj in objects:\n",
"\t\t\t\t\tweaviate_obj = obj.copy()\n",
"\t\t\t\t\tvector = weaviate_obj[\"_additional\"][\"vector\"]\n",
"\t\t\t\t\tuuid = weaviate_obj[\"_additional\"][\"id\"]\n",
"\t\t\t\t\tdel weaviate_obj[\"_additional\"]\n",
"\n",
"\t\t\t\t\tif not vector:  # covers an empty vector as well as a missing (None) one\n",
"\t\t\t\t\t\ttarget_batch.add_data_object(\n",
"\t\t\t\t\t\t\tdata_object=weaviate_obj,\n",
"\t\t\t\t\t\t\tclass_name=to_class_name,\n",
"\t\t\t\t\t\t\tuuid=uuid,\n",
"\t\t\t\t\t\t\t**additional_item_config\n",
"\t\t\t\t\t\t)\n",
"\t\t\t\t\telse:\n",
"\t\t\t\t\t\ttarget_batch.add_data_object(\n",
"\t\t\t\t\t\t\tdata_object=weaviate_obj,\n",
"\t\t\t\t\t\t\tclass_name=to_class_name,\n",
"\t\t\t\t\t\t\tuuid=uuid,\n",
"\t\t\t\t\t\t\tvector=vector,\n",
"\t\t\t\t\t\t\t**additional_item_config\n",
"\t\t\t\t\t\t)\n",
"\t\t\t\treturn uuid\n",
"\n",
"\t\t\t# migrate data\n",
"\t\t\twhile True:\n",
"\n",
"\t\t\t\tquery = (\n",
"\t\t\t\t\tsource_wv\n",
"\t\t\t\t\t.query\n",
"\t\t\t\t\t.get(class_name=from_class_name, properties=properties)\n",
"\t\t\t\t\t.with_additional(['vector', 'id'])\n",
"\t\t\t\t\t.with_limit(limit)\n",
"\t\t\t\t)\n",
"\t\t\t\tif after_uuid:\n",
"\t\t\t\t\tquery = query.with_after(after_uuid)\n",
"\t\t\t\tif from_tenant:\n",
"\t\t\t\t\tquery = query.with_tenant(from_tenant)\n",
"\t\t\t\tsource_data = query.do()\n",
"\n",
"\t\t\t\tif \"errors\" in source_data:\n",
"\t\t\t\t\traise Exception(\n",
"\t\t\t\t\t\tf\"Failed to get data after object UUID '{after_uuid}' for class '{from_class_name}'\",\n",
"\t\t\t\t\t\tf\" from '{from_tenant}'!\\n\" if from_tenant else \"\\n\",\n",
"\t\t\t\t\t\tsource_data[\"errors\"]\n",
"\t\t\t\t\t)\n",
"\t\t\t\tpage_object = source_data[\"data\"][\"Get\"][from_class_name]\n",
"\n",
"\t\t\t\tif len(page_object) == 0:\n",
"\t\t\t\t\tbreak\n",
"\t\t\t\tafter_uuid = ingest_data_in_batches(objects=page_object)\n",
"\t\t\t\tpbar.update(len(page_object))  # the last page may hold fewer than `limit` objects\n",
"\texcept:\n",
"\t\t# The conditional parts are parenthesized so they do not swallow the rest of the message\n",
"\t\tprint(\n",
"\t\t\tf\"Something went wrong. The last after_uuid was: '{after_uuid}' for Source Weaviate \"\n",
"\t\t\tf\"class {from_class_name}\"\n",
"\t\t\t+ (f\" from tenant {from_tenant}! \" if from_tenant else \". \")\n",
"\t\t\t+ f\"The Target Weaviate class was {to_class_name}\"\n",
"\t\t\t+ (f\" with tenant {to_tenant}!\\n\" if to_tenant else \"!\\n\")\n",
"\t\t)\n",
"\t\traise\n",
"\tfinally:\n",
"\t\t# The migration function uses the batch API in a context manager and when it exits\n",
"\t\t# the context manager it also shuts down the BatchExecutor, so we can re-start it here.\n",
"\t\t# It get automatically started when entering a new context manager but prints a warning.\n",
"\t\t# It is started in 'finally' in case there is a re-try mechanism on errors\n",
"\t\ttarget_wv.batch.start()\t"
]
},
{
"cell_type": "markdown",
"id": "60c131e2-5be6-461d-9166-734cf2d095c6",
"metadata": {},
"source": [
"## Examples"
]
},
{
"cell_type": "markdown",
"id": "3bd49010-d16f-4a61-af34-9841f83ff6f0",
"metadata": {},
"source": [
"### Class -> Class"
]
},
{
"cell_type": "markdown",
"id": "34d134d9-ee96-4b24-aa66-cb45ea7ceabe",
"metadata": {},
"source": [
"This example shows how to do a *class-to-class* data transfer between two Weaviate instances. Here **ALL** the classes from the **Source Weaviate** are transferred to the **Target Weaviate**. You can adjust the example below to meet your needs, such as using different class names.\n",
"\n",
"**NOTE:** The recommended way is to create the schema for the **Target Weaviate** before migration. This way you can set a replication factor or other HNSW configuration that differs from the **Source Weaviate**. This example assumes that the **Target Weaviate's** schema has already been created."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "12868e6b-308a-4854-9b42-b4b48deeac3f",
"metadata": {},
"outputs": [],
"source": [
"from typing import List\n",
"from weaviate import Client\n",
"\n",
"SOURCE_WEAVIATE_URL = \"https://source.weaviate.network\"\n",
"TARGET_WEAVIATE_URL = \"https://target.weaviate.network\"\n",
"\n",
"source_client = Client(\n",
"    url=SOURCE_WEAVIATE_URL,\n",
")\n",
"\n",
"target_client = Client(\n",
"    url=TARGET_WEAVIATE_URL,\n",
")\n",
"\n",
"classes: List[str] = [class_schema[\"class\"] for class_schema in source_client.schema.get()[\"classes\"]]\n",
"\n",
"for cls in classes:\n",
"    print(f\"Start migration for class '{cls}'\")\n",
"    migrate_data_from_weaviate_to_weaviate(\n",
"        source_wv=source_client,\n",
"        target_wv=target_client,\n",
"        from_class_name=cls,\n",
"        to_class_name=cls,\n",
"    )\n",
"    print(f\"Class '{cls}' migrated to '{TARGET_WEAVIATE_URL}'\")"
]
},
{
"cell_type": "markdown",
"id": "53705f33-888d-4ebe-88e8-906d6c5fb7f0",
"metadata": {},
"source": [
"### Tenant -> Tenant"
]
},
{
"cell_type": "markdown",
"id": "56965ec2-4157-4520-b4b5-c439690477a8",
"metadata": {},
"source": [
"This example shows how to do a *tenant-to-tenant* data transfer between two Weaviate instances. Here **ALL** the classes and tenants from the **Source Weaviate** are transferred to the **Target Weaviate**. You can adjust the example below to meet your needs, such as using different class names or tenants.\n",
"\n",
"**NOTE:** The recommended way is to create the schema for the **Target Weaviate** before migration. This way you can set a replication factor or other HNSW configuration that differs from the **Source Weaviate**. This example assumes that the **Target Weaviate's** schema has already been created, but **NOT** the Tenants."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a7ee8660-377e-41dd-b1fd-46bb156ae646",
"metadata": {},
"outputs": [],
"source": [
"from typing import List\n",
"from weaviate import Client, Tenant\n",
"\n",
"SOURCE_WEAVIATE_URL = \"https://source.weaviate.network\"\n",
"TARGET_WEAVIATE_URL = \"https://target.weaviate.network\"\n",
"\n",
"source_client = Client(\n",
"    url=SOURCE_WEAVIATE_URL,\n",
")\n",
"\n",
"target_client = Client(\n",
"    url=TARGET_WEAVIATE_URL,\n",
")\n",
"\n",
"classes: List[str] = [class_schema[\"class\"] for class_schema in source_client.schema.get()[\"classes\"]]\n",
"\n",
"for cls in classes:\n",
"    # create the tenants on the target before ingesting any data into them\n",
"    tenants = source_client.schema.get_class_tenants(class_name=cls)\n",
"    target_client.schema.add_class_tenants(\n",
"        class_name=cls,\n",
"        tenants=tenants,\n",
"    )\n",
"    print(f\"Start migration for class '{cls}'\")\n",
"    for tenant in tenants:\n",
"        migrate_data_from_weaviate_to_weaviate(\n",
"            source_wv=source_client,\n",
"            target_wv=target_client,\n",
"            from_class_name=cls,\n",
"            to_class_name=cls,\n",
"            from_tenant=tenant.name,\n",
"            to_tenant=tenant.name,\n",
"        )\n",
"        print(f\"\\tTenant {tenant.name} from class '{cls}' migrated\")\n",
"    print(f\"All tenants for class '{cls}' migrated to '{TARGET_WEAVIATE_URL}'\")\n"
]
},
{
"cell_type": "markdown",
"id": "a20357bc-db9e-411f-b80d-601bbf369331",
"metadata": {},
"source": [
"### Class -> Tenant"
]
},
{
"cell_type": "markdown",
"id": "b41442ef-c47a-484f-8de2-738295e45acc",
"metadata": {},
"source": [
"This example shows how to do a *class-to-tenant* data transfer between two Weaviate instances. Here **ALL** the classes from the **Source Weaviate** are transferred to separate class tenants in the **Target Weaviate**. You can adjust the example below to meet your needs, such as using different class names, tenants or tenant configuration. It is assumed that each **Source Weaviate** class follows the naming convention **CLASS_NAME + \"_\" + TENANT_ID**.\n",
"\n",
"**NOTE:** The recommended way is to create the schema for the **Target Weaviate** before migration. This way you could set a different replication factor or other HNSW configurations than the **Source Weaviate**.\n",
"\n",
"**Assumptions:**\n",
"- **Target Weaviate** has a class schema created for **CLASS_NAME**.\n",
"- All **Source Weaviate's** classes have the naming convention: **CLASS_NAME + \"_\" + TENANT_ID**, and they are for a single **CLASS_NAME**.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9a13f743-ce66-48fd-aea4-6ba8b5473969",
"metadata": {},
"outputs": [],
"source": [
"from typing import List\n",
"from weaviate import Client, Tenant\n",
"\n",
"SOURCE_WEAVIATE_URL = \"https://source.weaviate.network\"\n",
"TARGET_WEAVIATE_URL = \"https://target.weaviate.network\"\n",
"CLASS_NAME = \"MyClass\"\n",
"\n",
"source_client = Client(\n",
"    url=SOURCE_WEAVIATE_URL,\n",
")\n",
"\n",
"target_client = Client(\n",
"    url=TARGET_WEAVIATE_URL,\n",
")\n",
"\n",
"source_classes: List[str] = [class_schema[\"class\"] for class_schema in source_client.schema.get()[\"classes\"]]\n",
"# removeprefix (Python 3.9+) drops only the leading CLASS_NAME + \"_\"; str.strip would remove a set of characters, not a prefix\n",
"target_tenants: List[Tenant] = [Tenant(name=cls.removeprefix(CLASS_NAME + \"_\")) for cls in source_classes]\n",
"\n",
"for cls, tenant in zip(source_classes, target_tenants):\n",
"\n",
"    # One could create all tenants at once before the for-loop;\n",
"    # it is done this way in case there are thousands of tenants\n",
"    target_client.schema.add_class_tenants(\n",
"        class_name=CLASS_NAME,\n",
"        tenants=[tenant],\n",
"    )\n",
"    print(f\"Start migration for class {cls}\")\n",
"    migrate_data_from_weaviate_to_weaviate(\n",
"        source_wv=source_client,\n",
"        target_wv=target_client,\n",
"        from_class_name=cls,\n",
"        to_class_name=CLASS_NAME,\n",
"        to_tenant=tenant.name,\n",
"    )\n",
"    print(f\"Class {cls} migrated to Tenant {tenant.name}\")"
]
},
{
"cell_type": "markdown",
"id": "ff1128b3-c2e5-4fcf-aac4-a0507ab4ee93",
"metadata": {},
"source": [
"### Tenant -> Class"
]
},
{
"cell_type": "markdown",
"id": "85839cc6-89dd-40e2-b530-c2786944813d",
"metadata": {},
"source": [
"This example shows how to do a *tenant-to-class* data transfer between two Weaviate instances. Here **ALL** the tenants of a single class from the **Source Weaviate** are transferred to separate classes in the **Target Weaviate**. You can adjust the example below to meet your needs, such as using different class names, tenants or tenant configuration. It is assumed that each **Target Weaviate** class follows the naming convention **CLASS_NAME + \"_\" + TENANT_ID**.\n",
"\n",
"**NOTE:** The recommended way is to create the schema for the **Target Weaviate** before migration. This way you could set a different replication factor or other HNSW configurations than the **Source Weaviate**.\n",
"\n",
"**Assumptions:**\n",
"- **Target Weaviate** has the schema created for all classes. (You can add schema creation to this example if needed.)\n",
"- All **Source Weaviate's** tenants are of a single **CLASS_NAME**."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3f152748-c844-4810-8d81-89671f4da9de",
"metadata": {},
"outputs": [],
"source": [
"from typing import List\n",
"from weaviate import Client, Tenant\n",
"\n",
"SOURCE_WEAVIATE_URL = \"https://source.weaviate.network\"\n",
"TARGET_WEAVIATE_URL = \"https://target.weaviate.network\"\n",
"CLASS_NAME = \"MyClass\"\n",
"\n",
"source_client = Client(\n",
"    url=SOURCE_WEAVIATE_URL,\n",
")\n",
"\n",
"target_client = Client(\n",
"    url=TARGET_WEAVIATE_URL,\n",
")\n",
"\n",
"# the tenants are read from the Source Weaviate client\n",
"source_tenants: List[Tenant] = source_client.schema.get_class_tenants(CLASS_NAME)\n",
"target_classes: List[str] = [CLASS_NAME + \"_\" + tenant.name for tenant in source_tenants]\n",
"\n",
"for cls, tenant in zip(target_classes, source_tenants):\n",
"\n",
"    # One could create the target class schema here with the desired config\n",
"\n",
"    print(f\"Start migration for Tenant {tenant.name}\")\n",
"    migrate_data_from_weaviate_to_weaviate(\n",
"        source_wv=source_client,\n",
"        target_wv=target_client,\n",
"        from_class_name=CLASS_NAME,\n",
"        to_class_name=cls,\n",
"        from_tenant=tenant.name,\n",
"    )\n",
"    print(f\"Tenant {tenant.name} migrated to Class {cls}\")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.5"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
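
The resume behaviour described for `after_uuid` can be illustrated without a running Weaviate instance. The sketch below is only an illustration: `SOURCE`, `fetch_page` and `migrate` are hypothetical in-memory stand-ins (not part of the Weaviate client), and it assumes pages are served in ascending UUID order. It shows the cursor pattern the notebook relies on: read a page, copy it, remember the last UUID, and pass that UUID in again to continue after an interruption.

```python
from typing import Dict, List, Optional

# Hypothetical in-memory stand-in for a Source Weaviate class:
# objects keyed by UUID, served in ascending UUID order (an assumption).
SOURCE: Dict[str, dict] = {
    f"00000000-0000-0000-0000-{i:012d}": {"n": i} for i in range(7)
}


def fetch_page(after_uuid: Optional[str], limit: int) -> List[dict]:
    """Return up to `limit` objects whose UUID sorts after `after_uuid`."""
    ids = sorted(SOURCE)
    if after_uuid is not None:
        ids = [i for i in ids if i > after_uuid]
    return [{"id": i, **SOURCE[i]} for i in ids[:limit]]


def migrate(
    target: Dict[str, dict],
    limit: int = 3,
    after_uuid: Optional[str] = None,
) -> Optional[str]:
    """Copy page after page into `target`; return the last UUID copied."""
    while True:
        page = fetch_page(after_uuid, limit)
        if not page:  # an empty page means the cursor reached the end
            return after_uuid
        for obj in page:
            target[obj["id"]] = {"n": obj["n"]}
        after_uuid = page[-1]["id"]  # cursor for the next request


target: Dict[str, dict] = {}

# Simulate a first run that stopped after copying a single page ...
first_page = fetch_page(None, 3)
for obj in first_page:
    target[obj["id"]] = {"n": obj["n"]}

# ... then resume from the last UUID that made it across.
last = migrate(target, limit=3, after_uuid=first_page[-1]["id"])
```

With the real function, the equivalent step would be to call `migrate_data_from_weaviate_to_weaviate` again with the `after_uuid` printed in the error message.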
@paulcalcraft

paulcalcraft commented Nov 23, 2023

Thanks for this! There's a small error that breaks if you're not using tenancy for your target. The following lines:

additional_item_config = {
	"tenant": to_tenant
}

should be:

additional_item_config = {}
if to_tenant is not None:
	additional_item_config["tenant"] = to_tenant
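
A minimal sketch of why building the keyword arguments conditionally helps. It assumes the failure comes from a callee that does not accept a `tenant` keyword at all; `build_item_config` and the `add_data_object` stub below are hypothetical stand-ins, not the Weaviate client API.

```python
from typing import Any, Dict, Optional


def build_item_config(to_tenant: Optional[str]) -> Dict[str, Any]:
    """Include the `tenant` key only when a tenant was actually given."""
    config: Dict[str, Any] = {}
    if to_tenant is not None:
        config["tenant"] = to_tenant
    return config


def add_data_object(data_object: dict, class_name: str, uuid: str) -> str:
    # Hypothetical stand-in for a batch method without a `tenant` parameter.
    return f"{class_name}/{uuid}"


# With no tenant the config is empty, so no unexpected keyword is passed.
result = add_data_object({"a": 1}, "MyClass", "abc", **build_item_config(None))
```

Passing `**{"tenant": None}` to the same stub would raise a `TypeError`, which is the kind of breakage the conditional form avoids.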

@KristianMischke

@StefanBogdan Can you provide a license for this code snippet? It's very helpful, but I need to know the license in order to use it in other software. Thanks!

@litagent

@StefanBogdan Can you also share a migration script that handles cross-references as an example?
