Skip to content

Instantly share code, notes, and snippets.

@jamieparkinson
Created April 21, 2021 11:53
Show Gist options
  • Save jamieparkinson/88f6d652fd2f7266bb95d931863aae9f to your computer and use it in GitHub Desktop.
Save jamieparkinson/88f6d652fd2f7266bb95d931863aae9f to your computer and use it in GitHub Desktop.
Matcher graphs
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import boto3\n",
"from dynamo import get_graph_table_row, get_graph_component\n",
"from elastic import get_pipeline_storage_es_client, get_nodes_properties\n",
"index_date=\"2021-03-19\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# The developer profile is necessary for reading secrets, so the\n",
"# Elasticsearch client is created from it.\n",
"dev_session = boto3.Session(profile_name=\"platform-developer\")\n",
"es = get_pipeline_storage_es_client(dev_session, index_date=index_date)\n",
"\n",
"# Read-only profile; this is the session passed to the `dynamo` helpers.\n",
"session = boto3.Session(profile_name=\"platform-read_only\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import networkx as nx\n",
"\n",
"component_cache = {}\n",
"nodes_cache = {}\n",
"\n",
"\n",
"def get_graph(work_id):\n",
"    \"\"\"\n",
"    Build an undirected graph of the matcher component containing `work_id`.\n",
"\n",
"    Reads the work's row from the graph table to find its component ID, loads\n",
"    that component, fetches the properties of its nodes from Elasticsearch\n",
"    and links the non-deleted nodes together. Component and node lookups are\n",
"    memoised in `component_cache` and `nodes_cache`.\n",
"\n",
"    :param work_id: ID of the work whose component should be loaded.\n",
"    :return: An `nx.Graph` whose nodes carry a `source_id_type` attribute, or\n",
"        None if the component has fewer than 3 nodes or the node properties\n",
"        could not be fetched.\n",
"    :raises Exception: If the work's row cannot be read from the graph table.\n",
"    \"\"\"\n",
"    try:\n",
"        initial_row = get_graph_table_row(\n",
"            session, index_date=index_date, work_id=work_id\n",
"        )\n",
"    except Exception as err:\n",
"        # Chain the original error so the real cause stays in the traceback.\n",
"        raise Exception(\"Could not find work in graph\") from err\n",
"\n",
"    component_id = initial_row[\"componentId\"]\n",
"    if component_id not in component_cache:\n",
"        component_cache[component_id] = get_graph_component(\n",
"            session, index_date=index_date, component_id=component_id\n",
"        )\n",
"    graph_component = component_cache[component_id]\n",
"\n",
"    # Components with fewer than 3 nodes are skipped.\n",
"    if len(graph_component) < 3:\n",
"        return None\n",
"\n",
"    node_ids = [node[\"id\"] for node in graph_component]\n",
"    links_by_id = {node[\"id\"]: node[\"linkedIds\"] for node in graph_component}\n",
"\n",
"    cache_key = tuple(node_ids)\n",
"    if cache_key not in nodes_cache:\n",
"        try:\n",
"            nodes_cache[cache_key] = get_nodes_properties(\n",
"                es, index_date=index_date, work_ids=node_ids\n",
"            )\n",
"        except Exception:\n",
"            # Best effort: a failed lookup just skips this component. Catching\n",
"            # Exception (not a bare except) lets KeyboardInterrupt through, so\n",
"            # a long-running loop over many works can still be interrupted.\n",
"            return None\n",
"    nodes = nodes_cache[cache_key]\n",
"\n",
"    deleted_node_ids = {node[\"id\"] for node in nodes if node[\"type\"] == \"Deleted\"}\n",
"\n",
"    graph = nx.Graph()\n",
"    for node in nodes:\n",
"        node_id = node[\"id\"]\n",
"        if node_id in deleted_node_ids:\n",
"            continue\n",
"        graph.add_node(node_id, source_id_type=node[\"source_id_type\"])\n",
"        # Links are looked up by node ID rather than by position, so the\n",
"        # edges stay correct even if the fetched nodes are not in the same\n",
"        # order as the component rows (or some are missing).\n",
"        graph.add_edges_from(\n",
"            (node_id, dest)\n",
"            for dest in links_by_id.get(node_id, [])\n",
"            if dest not in deleted_node_ids\n",
"        )\n",
"\n",
"    return graph"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def is_potential_overlinked_miro(graph):\n",
"    \"\"\"\n",
"    Report whether any sink node is linked to from disconnected groups.\n",
"\n",
"    A candidate is a node with no outgoing edges and more than one incoming\n",
"    edge. The graph qualifies as soon as the predecessors of one candidate\n",
"    form more than one weakly connected component.\n",
"\n",
"    NOTE(review): this relies on `out_degree`, `in_degree` and\n",
"    `predecessors`, which only directed graphs provide, while `get_graph`\n",
"    in this notebook builds an undirected `nx.Graph` -- confirm the intended\n",
"    input before using this helper.\n",
"\n",
"    :param graph: Graph to inspect (presumably an `nx.DiGraph`).\n",
"    :return: True if such a node exists, False otherwise.\n",
"    \"\"\"\n",
"    for candidate in graph.nodes:\n",
"        if graph.out_degree(candidate) != 0 or graph.in_degree(candidate) <= 1:\n",
"            continue\n",
"        linkers = graph.subgraph(graph.predecessors(candidate))\n",
"        if nx.number_weakly_connected_components(linkers) > 1:\n",
"            return True\n",
"    return False"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def get_miro_n_links(graph, miro_id):\n",
"    \"\"\"\n",
"    Count the separate groups of Sierra nodes adjacent to `miro_id`.\n",
"\n",
"    :param graph: Undirected graph whose nodes have a `source_id_type`.\n",
"    :param miro_id: Node whose neighbours are inspected.\n",
"    :return: Number of connected components in the subgraph induced by the\n",
"        neighbours whose `source_id_type` is \"sierra-system-number\".\n",
"    \"\"\"\n",
"\n",
"    def _is_sierra(node_id):\n",
"        return graph.nodes[node_id][\"source_id_type\"] == \"sierra-system-number\"\n",
"\n",
"    sierra_ids = [n for n in graph.neighbors(miro_id) if _is_sierra(n)]\n",
"    return nx.number_connected_components(graph.subgraph(sierra_ids))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def scan_table(dynamo_client, *, TableName, **kwargs):\n",
"    \"\"\"\n",
"    Yield every item in a DynamoDB table, following pagination.\n",
"\n",
"    :param dynamo_client: A boto3 client for DynamoDB.\n",
"    :param TableName: The name of the table to scan.\n",
"\n",
"    Any other keyword arguments are forwarded unchanged to the Scan operation.\n",
"    See https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.scan\n",
"    \"\"\"\n",
"    pages = dynamo_client.get_paginator(\"scan\").paginate(\n",
"        TableName=TableName, **kwargs\n",
"    )\n",
"    for page in pages:\n",
"        for item in page[\"Items\"]:\n",
"            yield item"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from tqdm.notebook import tqdm\n",
"from elasticsearch.helpers import scan\n",
"\n",
"# For each Miro work, count how many non-deleted Sierra works list it as a\n",
"# merge candidate.\n",
"miro_links = {}\n",
"hits = scan(\n",
"    es,\n",
"    scroll=\"30m\",\n",
"    _source=[\"state.sourceIdentifier.identifierType.id\", \"type\", \"data.mergeCandidates\"],\n",
"    index=f\"works-identified-{index_date}\",\n",
")\n",
"for hit in tqdm(hits):\n",
"    source = hit[\"_source\"]\n",
"    source_id_type = source[\"state\"][\"sourceIdentifier\"][\"identifierType\"][\"id\"]\n",
"    if source_id_type != \"sierra-system-number\" or source[\"type\"] == \"Deleted\":\n",
"        continue\n",
"\n",
"    for candidate in source[\"data\"][\"mergeCandidates\"]:\n",
"        candidate_id = candidate[\"id\"]\n",
"        if candidate_id[\"sourceIdentifier\"][\"identifierType\"][\"id\"] != \"miro-image-number\":\n",
"            continue\n",
"        canonical_id = candidate_id[\"canonicalId\"]\n",
"        miro_links[canonical_id] = miro_links.get(canonical_id, 0) + 1"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from tqdm.notebook import tqdm\n",
"\n",
"# Only Miro works named by at least two Sierra works can be overlinked.\n",
"potential_overlinks = [\n",
"    work_id for work_id in miro_links if miro_links[work_id] >= 2\n",
"]\n",
"\n",
"# work ID -> number of separate Sierra groups it links, and the graph it\n",
"# was found in; only works linking more than one group are recorded.\n",
"definite_problem_works = {}\n",
"pw_graphs = {}\n",
"for work_id in tqdm(potential_overlinks):\n",
"    graph = get_graph(work_id)\n",
"    if graph is None or not graph.has_node(work_id):\n",
"        continue\n",
"    n_groups = get_miro_n_links(graph, work_id)\n",
"    if n_groups > 1:\n",
"        definite_problem_works[work_id] = n_groups\n",
"        pw_graphs[work_id] = graph"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Problem work IDs, ordered by ascending number of linked groups.\n",
"sorted(definite_problem_works, key=lambda work_id: definite_problem_works[work_id])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Reduce the problem graphs to one representative per distinct topology.\n",
"#   distinct_graphs: representative work ID -> its graph\n",
"#   topology_counts: representative work ID -> number of graphs sharing\n",
"#                    that topology (the representative included)\n",
"distinct_graphs = {}\n",
"topology_counts = {}\n",
"graphs = list(pw_graphs.items())\n",
"for i, (work_id, graph) in enumerate(graphs):\n",
"    print(i, work_id)\n",
"    for rep_id, rep_graph in distinct_graphs.items():\n",
"        if graph.has_node(rep_id):\n",
"            # This graph contains an existing representative, so it is\n",
"            # already covered by that one.\n",
"            break\n",
"        if nx.is_isomorphic(graph, rep_graph):\n",
"            # Credit the representative, so the counts accumulate per\n",
"            # topology instead of being set to 1 on each duplicate.\n",
"            topology_counts[rep_id] += 1\n",
"            break\n",
"    else:\n",
"        # No representative matched. This branch also runs for the very\n",
"        # first graph, which has nothing to be compared against.\n",
"        topology_counts[work_id] = 1\n",
"        distinct_graphs[work_id] = graph"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"len(distinct_graphs)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import matplotlib.pyplot as plt\n",
"%matplotlib inline\n",
"plt.rcParams['figure.figsize'] = [20, 17]\n",
"\n",
"# NOTE: `get_color_map` is defined in a later cell of this notebook, so that\n",
"# cell has to be run before this one.\n",
"\n",
"# Keep the 7x9 layout, but add rows instead of failing in plt.subplot when\n",
"# there are more graphs than the grid has slots.\n",
"n_cols = 9\n",
"n_rows = max(7, (len(distinct_graphs) + n_cols - 1) // n_cols)\n",
"for i, graph in enumerate(distinct_graphs.values()):\n",
"    colors = get_color_map(graph)\n",
"    plt.subplot(n_rows, n_cols, i + 1)\n",
"    nx.draw(graph, node_color=colors, node_size=15)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def get_overlinked_miros(graph):\n",
"    \"\"\"\n",
"    List the Miro nodes whose neighbours fall into more than one group.\n",
"\n",
"    :param graph: Undirected graph whose nodes have a `source_id_type`.\n",
"    :return: The nodes with `source_id_type` \"miro-image-number\" for which\n",
"        the subgraph induced by their neighbours has more than one connected\n",
"        component, in graph node order.\n",
"    \"\"\"\n",
"\n",
"    def _n_linker_groups(node):\n",
"        linker_subgraph = graph.subgraph(graph.neighbors(node))\n",
"        return nx.number_connected_components(linker_subgraph)\n",
"\n",
"    return [\n",
"        node\n",
"        for node, data in graph.nodes(data=True)\n",
"        if data[\"source_id_type\"] == \"miro-image-number\"\n",
"        and _n_linker_groups(node) > 1\n",
"    ]\n",
"\n",
"\n",
"def has_mets_neighbor(graph, node):\n",
"    \"\"\"Return True if any neighbour of `node` has `source_id_type` \"mets\".\"\"\"\n",
"    return any(\n",
"        graph.nodes[n][\"source_id_type\"] == \"mets\" for n in graph.neighbors(node)\n",
"    )\n",
"\n",
"\n",
"def apply_split_routine(old_graph):\n",
"    \"\"\"\n",
"    Cut edges around overlinked Miro nodes on a copy of `old_graph`.\n",
"\n",
"    The overlinked Miro nodes are visited in ascending order of\n",
"    `get_miro_n_links`. For each one:\n",
"\n",
"    * if one of its neighbours has a METS neighbour, only the edge to the\n",
"      first such neighbour is kept;\n",
"    * otherwise the edges to every neighbour whose degree is above the\n",
"      minimum are removed, and if all neighbours share the same degree and\n",
"      that degree is greater than 1, all of the node's edges are removed.\n",
"\n",
"    :param old_graph: Graph to split; it is not modified.\n",
"    :return: Tuple of (split graph, number of Miro nodes still overlinked).\n",
"    \"\"\"\n",
"    graph = old_graph.copy()\n",
"    overlinked = sorted(\n",
"        get_overlinked_miros(graph), key=lambda miro: get_miro_n_links(graph, miro)\n",
"    )\n",
"    for miro in overlinked:\n",
"        neighbors = list(graph.neighbors(miro))\n",
"        if not neighbors:\n",
"            # Cuts made for earlier nodes can leave this one isolated;\n",
"            # there is nothing left to remove (and min() below would raise).\n",
"            continue\n",
"\n",
"        mets_linked = next(\n",
"            (n for n in neighbors if has_mets_neighbor(graph, n)), None\n",
"        )\n",
"        if mets_linked is not None:\n",
"            to_cut = [n for n in neighbors if n != mets_linked]\n",
"        else:\n",
"            # Degrees are taken once, before any edge of this node is cut.\n",
"            degrees = {n: graph.degree(n, weight=\"weight\") for n in neighbors}\n",
"            lowest = min(degrees.values())\n",
"            to_cut = [n for n in neighbors if degrees[n] > lowest]\n",
"            if not to_cut and lowest > 1:\n",
"                to_cut = neighbors\n",
"\n",
"        for n in to_cut:\n",
"            graph.remove_edge(miro, n)\n",
"\n",
"    return graph, len(get_overlinked_miros(graph))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def get_color_map(graph):\n",
"    \"\"\"\n",
"    Return one colour per node, in graph node order.\n",
"\n",
"    Nodes with `source_id_type` \"miro-image-number\" are \"r\", \"mets\" nodes\n",
"    are \"g\" and every other node is \"b\".\n",
"    \"\"\"\n",
"    palette = {\"miro-image-number\": \"r\", \"mets\": \"g\"}\n",
"    return [\n",
"        palette.get(data[\"source_id_type\"], \"b\")\n",
"        for _, data in graph.nodes(data=True)\n",
"    ]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Run the split routine on every distinct graph and file the result by\n",
"# whether any overlinked Miro nodes remain afterwards.\n",
"split_graphs = {}\n",
"ambi_graphs = {}\n",
"\n",
"for ident, graph in distinct_graphs.items():\n",
"    split_graph, n_remaining = apply_split_routine(graph)\n",
"    target = split_graphs if n_remaining == 0 else ambi_graphs\n",
"    target[ident] = split_graph"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"plt.rcParams['figure.figsize'] = [20, 17]\n",
"\n",
"# Keep the 7x7 layout, but add rows instead of failing in plt.subplot when\n",
"# there are more graphs than the grid has slots.\n",
"n_cols = 7\n",
"n_rows = max(7, (len(split_graphs) + n_cols - 1) // n_cols)\n",
"for i, graph in enumerate(split_graphs.values()):\n",
"    colors = get_color_map(graph)\n",
"    plt.subplot(n_rows, n_cols, i + 1)\n",
"    nx.draw(graph, node_color=colors, node_size=15)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"plt.rcParams['figure.figsize'] = [20, 17]\n",
"\n",
"# Keep the 5x5 layout, but add rows instead of failing in plt.subplot when\n",
"# there are more graphs than the grid has slots.\n",
"n_cols = 5\n",
"n_rows = max(5, (len(ambi_graphs) + n_cols - 1) // n_cols)\n",
"for i, graph in enumerate(ambi_graphs.values()):\n",
"    colors = get_color_map(graph)\n",
"    plt.subplot(n_rows, n_cols, i + 1)\n",
"    nx.draw(graph, node_color=colors, node_size=15)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "matcher-nb",
"language": "python",
"name": "matcher-nb"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.4"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment