@pjbull (last active April 4, 2021)

Can we detect if a class implementing __fspath__ is called with a writeable mode from open?
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Knowing if `open` called your `__fspath__`: A journey\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Let's write out a file to use"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"!rm -f hello.txt\n",
"!echo \"hi!\" >> hello.txt"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## First pass is getting `code_context`:"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"import inspect\n",
"from pathlib import Path\n",
"\n",
"class C0:\n",
" def __fspath__(self): \n",
" caller_src = inspect.getframeinfo(inspect.stack()[1].frame).code_context\n",
" \n",
" print(caller_src)\n",
" \n",
" return str(Path(\"hello.txt\").resolve())\n"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"['with open(cell_0, \"r\") as f:\\n']\n"
]
}
],
"source": [
"cell_0 = C0()\n",
" \n",
"with open(cell_0, \"r\") as f:\n",
" assert f.read() == \"hi!\\n\"\n",
" "
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[' \"r\"\\n']\n"
]
}
],
"source": [
"with open(\n",
" cell_0,\n",
" \"r\"\n",
") as f:\n",
" assert f.read() == \"hi!\\n\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Well, that's painful.... what else?\n",
"\n",
"\n",
"## Let's try `inspect.getsource` so that we go beyond `code_context`"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"class C1:\n",
" def __fspath__(self): \n",
" caller_src = inspect.getsource(\n",
" inspect.stack()[1].frame\n",
" )\n",
" \n",
" print(caller_src)\n",
" \n",
" return str(Path(\"hello.txt\").resolve())\n"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"cell_1 = C1()\n",
" \n",
"with open(cell_1, \"r\") as f:\n",
" assert f.read() == \"hi!\\n\"\n",
"\n"
]
}
],
"source": [
"cell_1 = C1()\n",
" \n",
"with open(cell_1, \"r\") as f:\n",
" assert f.read() == \"hi!\\n\""
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"# I'm weirdly spaced\n",
"with open(\n",
" cell_1,\n",
" \"r\"\n",
") as f:\n",
" assert f.read() == \"hi!\\n\"\n",
"\n"
]
}
],
"source": [
"# I'm weirdly spaced\n",
"with open(\n",
" cell_1,\n",
" \"r\"\n",
") as f:\n",
" assert f.read() == \"hi!\\n\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Woo! We got some code, now let's build a regex to match the open write modes:"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"import re\n",
"\n",
"# thanks @jayqi for tracking all these down\n",
"WRITE_MODES = {\"r+\", \"w\", \"w+\", \"a\", \"a+\", \"rb+\", \"wb\", \"wb+\", \"ab\", \"ab+\"}\n",
"\n",
"# regex escape `+`\n",
"RE_WRITE_MODES = {s.replace(\"+\", \"\\+\") for s in WRITE_MODES}\n",
"\n",
"\n",
"pattern = re.compile(\n",
" \"open\\(\"\n",
" \"[^,]+\"\n",
" \"[^\\\"]*\"\n",
" \"[\\\"']\" \n",
" \"(?P<mode>\" +\n",
" \"|\".join(RE_WRITE_MODES) +\n",
" \")\"\n",
" \"[\\\"']\"\n",
" \"\\)\"\n",
")\n",
"\n",
"def _write_from_open_call(source):\n",
" m = re.search(\n",
" pattern,\n",
" source, \n",
" )\n",
" \n",
" return m is not None"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"True"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"_write_from_open_call(\n",
"\"\"\"\n",
"with open(cell_1, \"wb+\") as f:\n",
" assert f.read() == \"hi!\\n\"\n",
"\"\"\"\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"False"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"_write_from_open_call(\n",
"\"\"\"\n",
"with open(cell_1, \"r\") as f:\n",
" assert f.read() == \"hi!\\n\"\n",
"\"\"\"\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"False"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"_write_from_open_call(\n",
"\"\"\"\n",
"with close(cell_1, \"r\") as f:\n",
" assert f.read() == \"hi!\\n\"\n",
"\"\"\"\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"True"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"_write_from_open_call(\n",
"\"\"\"\n",
"with close(cell_1, \"r\") as f:\n",
" assert f.read() == \"hi!\\n\"\n",
" \n",
"with open(Path('not_a_C2'), 'w') as f2:\n",
" assert f.read() == \"hi!\\n\"\n",
"\"\"\"\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"That's all well and good, but what if the source context is a lot longer and we have multiple `open` and the `S3Path` version is a read, but some other one is a write? The last `True` above should be `False`...\n",
"\n",
"\n",
"## Down the rabbit-hole: parse the AST"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"import ast\n",
"import inspect\n",
"\n",
"\n",
"def _is_open_call_write_with_var(ast_node, var_names=None, var_type=None):\n",
" \"\"\" For a given AST node, check that the node is a `Call`, and that the\n",
" call is to a function with the name `open`, and that the last argument\n",
" \n",
" If passed, return True if the first argument is a variable with a name in var_names.\n",
" \n",
" If passed, return True if the first arg is a Call to instantiate var_type. \n",
" \"\"\"\n",
" if not isinstance(ast_node, ast.Call):\n",
" return False\n",
" if not hasattr(ast_node, \"func\"):\n",
" return False\n",
" if not hasattr(ast_node.func, \"id\"):\n",
" return False\n",
" if ast_node.func.id != \"open\":\n",
" return False\n",
" \n",
" # we are in an open call, get the path as first arg\n",
" path = ast_node.args[0]\n",
" \n",
" # get the mode as second arg or kwarg where arg==mode\n",
" mode = (\n",
" ast_node.args[1]\n",
" if len(ast_node.args) >= 2 else\n",
" [kwarg for kwarg in ast_node.keywords if kwarg.arg == \"mode\"][0].value\n",
" )\n",
" \n",
" # Ensure the path is either a call to instantiate var_type or\n",
" # the name of a variable we know is of the right type\n",
" path_is_of_type = (\n",
" (isinstance(path, ast.Call)\n",
" and path.func.id == var_type.__name__\n",
" )\n",
" or\n",
" (hasattr(path, \"id\") and (path.id in var_names))\n",
" )\n",
" \n",
" return (mode.s in WRITE_MODES) and path_is_of_type\n",
"\n",
"class C2:\n",
" def __fspath__(self):\n",
" # same getsource\n",
" caller_src = inspect.getsource(\n",
" inspect.stack()[1].frame\n",
" )\n",
"\n",
" # also get local variables in the frame\n",
" caller_local_variables = inspect.stack()[1].frame.f_locals\n",
" \n",
" # get all the instances in the previous frame of our class\n",
" instances_of_type = [\n",
" varname for varname, instance in caller_local_variables.items()\n",
" if isinstance(instance, type(self))\n",
" ]\n",
" \n",
" # Walk the AST of the previous frame source and see if\n",
" # open is called with a variable of our type...\n",
" print(\n",
" any(\n",
" _is_open_call_write_with_var(\n",
" n,\n",
" var_names=instances_of_type,\n",
" var_type=type(self)\n",
" ) for n in ast.walk(ast.parse(caller_src))\n",
" )\n",
" )\n",
"\n",
" return str(Path(\"hello.txt\").resolve())\n"
]
},
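{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before trying it out, here's a quick, purely illustrative peek at the AST nodes the checker walks, using `ast.dump` on a one-line `open` call:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# illustrative sketch: dump the parsed AST of a simple open call to see\n",
"# the ast.Call node (func.id == 'open') and its args (path, then mode),\n",
"# which are the pieces _is_open_call_write_with_var inspects\n",
"print(ast.dump(ast.parse('open(cell_2, \"r+\")')))"
]
},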
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"False\n"
]
}
],
"source": [
"cell_2 = C2()\n",
" \n",
"# False = mode is r\n",
"with open(cell_2, \"r\") as f:\n",
" assert f.read() == \"hi!\\n\""
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"True\n"
]
}
],
"source": [
"# True - with var `cell_2`, which is of type \n",
"with open(cell_2, \"r+\") as f:\n",
" assert f.read() == \"hi!\\n\""
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"True\n"
]
}
],
"source": [
"# True - var `cell_2`, which is of type (mode is a kwarg)\n",
"with open(cell_2, mode=\"r+\") as f:\n",
" assert f.read() == \"hi!\\n\"\n",
" "
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"True\n"
]
}
],
"source": [
"# True - weird spacing\n",
"with open(cell_2,\n",
" mode=\"r+\") as f:\n",
" assert f.read() == \"hi!\\n\""
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"True\n"
]
}
],
"source": [
"# True - weird spacing and direct call to C2\n",
"with open(C2(),\n",
" mode=\"r+\") as f:\n",
" assert f.read() == \"hi!\\n\""
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"False\n",
"False\n"
]
}
],
"source": [
"# False - call variable is path even if there is a C2\n",
"\n",
"cell_2 = C2()\n",
"path = Path(\"hello.txt\")\n",
" \n",
"with open(path, \"r+\") as f:\n",
" assert f.read() == \"hi!\\n\"\n",
" \n",
"# call fspath to make sure we are false for read\n",
"with open(cell_2, \"r\") as f:\n",
" assert f.read() == \"hi!\\n\"\n",
" \n",
"# call fspath to make sure we are false for read\n",
"with open(C2(), \"r\") as f:\n",
" assert f.read() == \"hi!\\n\""
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"False\n",
"False\n"
]
}
],
"source": [
"# False - call variable is path\n",
"\n",
"cell_2 = C2()\n",
" \n",
"with open(Path(\"hello.txt\"), \"r+\") as f:\n",
" assert f.read() == \"hi!\\n\"\n",
" \n",
"# False 1 - call fspath to make sure we are false for read\n",
"with open(cell_2, \"r\") as f:\n",
" assert f.read() == \"hi!\\n\"\n",
" \n",
"# False 2 - call fspath to make sure we are false for read\n",
"with open(C2(), \"r\") as f:\n",
" assert f.read() == \"hi!\\n\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Benchmarking"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [],
"source": [
"# no raising or checking to compare for benchmarking\n",
"class Base:\n",
" def __fspath__(self):\n",
" return str(Path(\"hello.txt\").resolve())"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [],
"source": [
"class C1:\n",
" def __fspath__(self): \n",
" caller_src = inspect.getsource(\n",
" inspect.stack()[1].frame\n",
" )\n",
" \n",
" if _write_from_open_call(caller_src):\n",
" raise Exception(\"No writing!\")\n",
" \n",
" return str(Path(\"hello.txt\").resolve())\n",
"\n",
" "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's redefine to actually raise so that we can compare. We'll add three different frame getting methods:\n",
"\n",
" - `inspect.stack` - known to be slow\n",
" - `inspect.currentframe` - should be faster\n",
" - `sys._getframe` - should be fastest, but CPython only + internal method"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [],
"source": [
"import sys\n",
"\n",
"class C2:\n",
" def __init__(self, method='inspect'):\n",
" self.method = method\n",
" \n",
" def __fspath__(self):\n",
" # different frame fetching methods have different properties\n",
" if self.method == 'inspect':\n",
" frame = inspect.stack()[1].frame\n",
" elif self.method == 'currentframe':\n",
" frame = inspect.currentframe().f_back\n",
" else:\n",
" frame = sys._getframe().f_back\n",
" \n",
" # same getsource\n",
" caller_src = inspect.getsource(frame)\n",
"\n",
" # also get local variables in the frame\n",
" caller_local_variables = frame.f_locals\n",
" \n",
" # get all the instances in the previous frame of our class\n",
" instances_of_type = [\n",
" varname for varname, instance in caller_local_variables.items()\n",
" if isinstance(instance, type(self))\n",
" ]\n",
" \n",
" # Walk the AST of the previous frame source and see if\n",
" # open is called with a variable of our type...\n",
" if any(\n",
" _is_open_call_write_with_var(\n",
" n,\n",
" var_names=instances_of_type,\n",
" var_type=type(self)\n",
" ) for n in ast.walk(ast.parse(caller_src))\n",
" ):\n",
" raise Exception(\"DEFINITELY no writing!\")\n",
" \n",
" \n",
" return str(Path(\"hello.txt\").resolve())\n"
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"95.4 µs ± 1.55 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)\n"
]
}
],
"source": [
"%%timeit\n",
"\n",
"try:\n",
" with open(Base(),\n",
" mode=\"r+\") as f:\n",
" assert f.read() == \"hi!\\n\"\n",
"except:\n",
" pass"
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"6.88 ms ± 155 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)\n"
]
}
],
"source": [
"%%timeit\n",
"\n",
"try:\n",
" with open(C1(),\n",
" mode=\"r+\") as f:\n",
" assert f.read() == \"hi!\\n\"\n",
"except:\n",
" pass"
]
},
{
"cell_type": "code",
"execution_count": 26,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"6.65 ms ± 127 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)\n"
]
}
],
"source": [
"%%timeit\n",
"\n",
"try:\n",
" with open(C2(),\n",
" mode=\"r+\") as f:\n",
" assert f.read() == \"hi!\\n\"\n",
"except:\n",
" pass"
]
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"783 µs ± 11.1 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)\n"
]
}
],
"source": [
"%%timeit\n",
"\n",
"try:\n",
" with open(C2(method=\"currentframe\"),\n",
" mode=\"r+\") as f:\n",
" assert f.read() == \"hi!\\n\"\n",
"except:\n",
" pass"
]
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"777 µs ± 13.7 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)\n"
]
}
],
"source": [
"%%timeit\n",
"\n",
"try:\n",
" with open(C2(method=\"_getframe\"),\n",
" mode=\"r+\") as f:\n",
" assert f.read() == \"hi!\\n\"\n",
"except:\n",
" pass"
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"589 µs ± 7.71 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)\n"
]
}
],
"source": [
"%%timeit\n",
"\n",
"with open(Path(\"test.txt\"), \"w\") as f:\n",
" f.write(\"\".join([\"a\"] * 50000))"
]
},
{
"cell_type": "code",
"execution_count": 30,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"48.828125\n"
]
}
],
"source": [
"# writing a 48KB file\n",
"print(Path(\"test.txt\").stat().st_size / (1024))"
]
},
{
"cell_type": "code",
"execution_count": 31,
"metadata": {},
"outputs": [],
"source": [
"# reading is faster, so make larger file\n",
"with open(Path(\"test.txt\"), \"w\") as f:\n",
" f.write(\"\".join([\"a\"] * 1_700_000))"
]
},
{
"cell_type": "code",
"execution_count": 32,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"1.621246337890625\n"
]
}
],
"source": [
"# reading a 1.6MB file\n",
"print(Path(\"test.txt\").stat().st_size / (1024 ** 2))"
]
},
{
"cell_type": "code",
"execution_count": 33,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"734 µs ± 11.6 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)\n"
]
}
],
"source": [
"%%timeit\n",
"with open(Path(\"test.txt\"), \"r\") as f:\n",
" data = f.read()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Compare with scalene to see where is slow"
]
},
{
"cell_type": "code",
"execution_count": 34,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Scalene extension successfully loaded. Note: Scalene currently only\n",
"supports CPU+GPU profiling inside Jupyter notebooks. For full Scalene\n",
"profiling, use the command line version.\n"
]
}
],
"source": [
"%load_ext scalene"
]
},
{
"cell_type": "code",
"execution_count": 35,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<pre style=\"white-space:pre;overflow-x:auto;line-height:normal;font-family:Menlo,'DejaVu Sans Mono',consolas,'Courier New',monospace\"><span style=\"font-style: italic\"> [22]: % of time = 99.87% out of 6.00s. </span>\n",
" ╷ ╷ ╷ ╷ ╷ \n",
" <span style=\"font-weight: bold\"> Line </span>│<span style=\"font-weight: bold\">Time % </span>│<span style=\"font-weight: bold\">Time % </span>│<span style=\"font-weight: bold\">Sys </span>│<span style=\"font-weight: bold\"> </span>│<span style=\"font-weight: bold\"> </span> \n",
" │<span style=\"font-weight: bold\">Python </span>│<span style=\"font-weight: bold\">native </span>│<span style=\"font-weight: bold\">% </span>│<span style=\"font-weight: bold\">[22] </span>│ \n",
"╺━━━━━━┿━━━━━━━┿━━━━━━━┿━━━━┿━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┿━━━╸\n",
" ... │ │ │ │ │ \n",
" 4 │<span style=\"color: #800000; text-decoration-color: #800000; font-weight: bold\"> 89%</span> │<span style=\"color: #800000; text-decoration-color: #800000; font-weight: bold\"> 11%</span> │ │<span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\"> inspect</span><span style=\"color: #666666; text-decoration-color: #666666; background-color: #f8f8f8\">.</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\">stack()[</span><span style=\"color: #666666; text-decoration-color: #666666; background-color: #f8f8f8\">1</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\">]</span><span style=\"color: #666666; text-decoration-color: #666666; background-color: #f8f8f8\">.</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\">frame</span><span style=\"background-color: #f8f8f8\"> </span> │ \n",
" ... │ │ │ │ │ \n",
" │ │ │ │ │ \n",
"╶──────┼───────┼───────┼────┼─────────────────────────────────────────────────────────────────────────────────────────────────┼───╴\n",
" │ │ │ │ │<span style=\"font-weight: bold; font-style: italic\">f…</span> \n",
" │ │ │ │ │<span style=\"font-weight: bold; font-style: italic\">s…</span> \n",
" │ │ │ │ │<span style=\"font-weight: bold; font-style: italic\">f…</span> \n",
" │ │ │ │ │<span style=\"font-weight: bold; font-style: italic\">&lt;…</span> \n",
" 2 │<span style=\"color: #800000; text-decoration-color: #800000; font-weight: bold\"> 89%</span> │<span style=\"color: #800000; text-decoration-color: #800000; font-weight: bold\"> 11%</span> │ │<span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\">C1</span><span style=\"color: #666666; text-decoration-color: #666666; background-color: #f8f8f8\">.</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\">__fspath__</span><span style=\"background-color: #f8f8f8\"> </span> │ \n",
" ╵ ╵ ╵ ╵ ╵ \n",
"</pre>\n"
],
"text/plain": [
"\u001b[3m [22]: % of time = 99.87% out of 6.00s. \u001b[0m\n",
" ╷ ╷ ╷ ╷ ╷ \n",
" \u001b[1m \u001b[0m\u001b[1mLine\u001b[0m\u001b[1m \u001b[0m│\u001b[1mTime %\u001b[0m\u001b[1m \u001b[0m│\u001b[1mTime %\u001b[0m\u001b[1m \u001b[0m│\u001b[1mSys\u001b[0m\u001b[1m \u001b[0m│\u001b[1m \u001b[0m\u001b[1m \u001b[0m│\u001b[1m \u001b[0m\u001b[1m \u001b[0m \n",
" │\u001b[1mPython\u001b[0m\u001b[1m \u001b[0m│\u001b[1mnative\u001b[0m\u001b[1m \u001b[0m│\u001b[1m% \u001b[0m\u001b[1m \u001b[0m│\u001b[1m[22] \u001b[0m\u001b[1m \u001b[0m│ \n",
"╺━━━━━━┿━━━━━━━┿━━━━━━━┿━━━━┿━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┿━━━╸\n",
" ... │ │ │ │ │ \n",
" 4 │\u001b[1;31m 89%\u001b[0m │\u001b[1;31m 11%\u001b[0m │ │\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248minspect\u001b[0m\u001b[38;2;102;102;102;48;2;248;248;248m.\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mstack\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m(\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m)\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m[\u001b[0m\u001b[38;2;102;102;102;48;2;248;248;248m1\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m]\u001b[0m\u001b[38;2;102;102;102;48;2;248;248;248m.\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mframe\u001b[0m\u001b[48;2;248;248;248m \u001b[0m │ \n",
" ... │ │ │ │ │ \n",
" │ │ │ │ │ \n",
"╶──────┼───────┼───────┼────┼─────────────────────────────────────────────────────────────────────────────────────────────────┼───╴\n",
" │ │ │ │ │\u001b[1;3mf…\u001b[0m \n",
" │ │ │ │ │\u001b[1;3ms…\u001b[0m \n",
" │ │ │ │ │\u001b[1;3mf…\u001b[0m \n",
" │ │ │ │ │\u001b[1;3m<…\u001b[0m \n",
" 2 │\u001b[1;31m 89%\u001b[0m │\u001b[1;31m 11%\u001b[0m │ │\u001b[38;2;0;0;0;48;2;248;248;248mC1\u001b[0m\u001b[38;2;102;102;102;48;2;248;248;248m.\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m__fspath__\u001b[0m\u001b[48;2;248;248;248m \u001b[0m │ \n",
" ╵ ╵ ╵ ╵ ╵ \n"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/html": [
"<pre style=\"white-space:pre;overflow-x:auto;line-height:normal;font-family:Menlo,'DejaVu Sans Mono',consolas,'Courier New',monospace\">generated by the <a href=\"https://github.com/plasma-umass/scalene\"><span style=\"color: #0000ff; text-decoration-color: #0000ff\">scalene</span></a> profiler \n",
"</pre>\n"
],
"text/plain": [
"generated by the \u001b]8;id=1617518848.389377-908377;https://github.com/plasma-umass/scalene\u001b\\\u001b[94mscalene\u001b[0m\u001b]8;;\u001b\\ profiler \n"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"%%scalene --reduced-profile --html\n",
"\n",
"for i in range(1000):\n",
" try:\n",
" with open(C1(),\n",
" mode=\"r+\") as f:\n",
" assert f.read() == \"hi!\\n\"\n",
" except:\n",
" pass"
]
},
{
"cell_type": "code",
"execution_count": 36,
"metadata": {
"scrolled": false
},
"outputs": [
{
"data": {
"text/html": [
"<pre style=\"white-space:pre;overflow-x:auto;line-height:normal;font-family:Menlo,'DejaVu Sans Mono',consolas,'Courier New',monospace\"><span style=\"font-style: italic\"> [23]: % of time = 99.56% out of 6.16s. </span>\n",
" ╷ ╷ ╷ ╷ ╷ \n",
" <span style=\"font-weight: bold\"> Line </span>│<span style=\"font-weight: bold\">Time % </span>│<span style=\"font-weight: bold\">Time % </span>│<span style=\"font-weight: bold\">Sys </span>│<span style=\"font-weight: bold\"> </span>│<span style=\"font-weight: bold\"> </span> \n",
" │<span style=\"font-weight: bold\">Python </span>│<span style=\"font-weight: bold\">native </span>│<span style=\"font-weight: bold\">% </span>│<span style=\"font-weight: bold\">[23] </span>│ \n",
"╺━━━━━━┿━━━━━━━┿━━━━━━━┿━━━━┿━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┿━━━╸\n",
" ... │ │ │ │ │ \n",
" 10 │<span style=\"color: #800000; text-decoration-color: #800000; font-weight: bold\"> 79%</span> │<span style=\"color: #800000; text-decoration-color: #800000; font-weight: bold\"> 10%</span> │ │<span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\"> frame </span><span style=\"color: #666666; text-decoration-color: #666666; background-color: #f8f8f8\">=</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\"> inspect</span><span style=\"color: #666666; text-decoration-color: #666666; background-color: #f8f8f8\">.</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\">stack()[</span><span style=\"color: #666666; text-decoration-color: #666666; background-color: #f8f8f8\">1</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\">]</span><span style=\"color: #666666; text-decoration-color: #666666; background-color: #f8f8f8\">.</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\">frame</span><span style=\"background-color: #f8f8f8\"> </span> │ \n",
" ... │ │ │ │ │ \n",
" 17 │ 7% │ │ │<span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\"> caller_src </span><span style=\"color: #666666; text-decoration-color: #666666; background-color: #f8f8f8\">=</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\"> inspect</span><span style=\"color: #666666; text-decoration-color: #666666; background-color: #f8f8f8\">.</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\">getsource(frame)</span><span style=\"background-color: #f8f8f8\"> </span> │ \n",
" ... │ │ │ │ │ \n",
" 35 │ 2% │ │ │<span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\"> ) </span><span style=\"color: #008000; text-decoration-color: #008000; background-color: #f8f8f8; font-weight: bold\">for</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\"> n </span><span style=\"color: #aa22ff; text-decoration-color: #aa22ff; background-color: #f8f8f8; font-weight: bold\">in</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\"> ast</span><span style=\"color: #666666; text-decoration-color: #666666; background-color: #f8f8f8\">.</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\">walk(ast</span><span style=\"color: #666666; text-decoration-color: #666666; background-color: #f8f8f8\">.</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\">parse(caller_src))</span><span style=\"background-color: #f8f8f8\"> </span> │ \n",
" ... │ │ │ │ │ \n",
" │ │ │ │ │ \n",
"╶──────┼───────┼───────┼────┼─────────────────────────────────────────────────────────────────────────────────────────────────┼───╴\n",
" │ │ │ │ │<span style=\"font-weight: bold; font-style: italic\">f…</span> \n",
" │ │ │ │ │<span style=\"font-weight: bold; font-style: italic\">s…</span> \n",
" │ │ │ │ │<span style=\"font-weight: bold; font-style: italic\">f…</span> \n",
" │ │ │ │ │<span style=\"font-weight: bold; font-style: italic\">&lt;…</span> \n",
" 7 │<span style=\"color: #800000; text-decoration-color: #800000; font-weight: bold\"> 88%</span> │<span style=\"color: #800000; text-decoration-color: #800000; font-weight: bold\"> 11%</span> │ │<span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\">C2</span><span style=\"color: #666666; text-decoration-color: #666666; background-color: #f8f8f8\">.</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\">__fspath__</span><span style=\"background-color: #f8f8f8\"> </span> │ \n",
" ╵ ╵ ╵ ╵ ╵ \n",
"</pre>\n"
],
"text/plain": [
"\u001b[3m [23]: % of time = 99.56% out of 6.16s. \u001b[0m\n",
" ╷ ╷ ╷ ╷ ╷ \n",
" \u001b[1m \u001b[0m\u001b[1mLine\u001b[0m\u001b[1m \u001b[0m│\u001b[1mTime %\u001b[0m\u001b[1m \u001b[0m│\u001b[1mTime %\u001b[0m\u001b[1m \u001b[0m│\u001b[1mSys\u001b[0m\u001b[1m \u001b[0m│\u001b[1m \u001b[0m\u001b[1m \u001b[0m│\u001b[1m \u001b[0m\u001b[1m \u001b[0m \n",
" │\u001b[1mPython\u001b[0m\u001b[1m \u001b[0m│\u001b[1mnative\u001b[0m\u001b[1m \u001b[0m│\u001b[1m% \u001b[0m\u001b[1m \u001b[0m│\u001b[1m[23] \u001b[0m\u001b[1m \u001b[0m│ \n",
"╺━━━━━━┿━━━━━━━┿━━━━━━━┿━━━━┿━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┿━━━╸\n",
" ... │ │ │ │ │ \n",
" 10 │\u001b[1;31m 79%\u001b[0m │\u001b[1;31m 10%\u001b[0m │ │\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mframe\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[38;2;102;102;102;48;2;248;248;248m=\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248minspect\u001b[0m\u001b[38;2;102;102;102;48;2;248;248;248m.\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mstack\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m(\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m)\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m[\u001b[0m\u001b[38;2;102;102;102;48;2;248;248;248m1\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m]\u001b[0m\u001b[38;2;102;102;102;48;2;248;248;248m.\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mframe\u001b[0m\u001b[48;2;248;248;248m \u001b[0m │ \n",
" ... │ │ │ │ │ \n",
" 17 │ 7% │ │ │\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mcaller_src\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[38;2;102;102;102;48;2;248;248;248m=\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248minspect\u001b[0m\u001b[38;2;102;102;102;48;2;248;248;248m.\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mgetsource\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m(\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mframe\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m)\u001b[0m\u001b[48;2;248;248;248m \u001b[0m │ \n",
" ... │ │ │ │ │ \n",
" 35 │ 2% │ │ │\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m)\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[1;38;2;0;128;0;48;2;248;248;248mfor\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mn\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[1;38;2;170;34;255;48;2;248;248;248min\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mast\u001b[0m\u001b[38;2;102;102;102;48;2;248;248;248m.\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mwalk\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m(\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mast\u001b[0m\u001b[38;2;102;102;102;48;2;248;248;248m.\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mparse\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m(\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mcaller_src\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m)\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m)\u001b[0m\u001b[48;2;248;248;248m \u001b[0m │ \n",
" ... │ │ │ │ │ \n",
" │ │ │ │ │ \n",
"╶──────┼───────┼───────┼────┼─────────────────────────────────────────────────────────────────────────────────────────────────┼───╴\n",
" │ │ │ │ │\u001b[1;3mf…\u001b[0m \n",
" │ │ │ │ │\u001b[1;3ms…\u001b[0m \n",
" │ │ │ │ │\u001b[1;3mf…\u001b[0m \n",
" │ │ │ │ │\u001b[1;3m<…\u001b[0m \n",
" 7 │\u001b[1;31m 88%\u001b[0m │\u001b[1;31m 11%\u001b[0m │ │\u001b[38;2;0;0;0;48;2;248;248;248mC2\u001b[0m\u001b[38;2;102;102;102;48;2;248;248;248m.\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m__fspath__\u001b[0m\u001b[48;2;248;248;248m \u001b[0m │ \n",
" ╵ ╵ ╵ ╵ ╵ \n"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/html": [
"<pre style=\"white-space:pre;overflow-x:auto;line-height:normal;font-family:Menlo,'DejaVu Sans Mono',consolas,'Courier New',monospace\">generated by the <a href=\"https://github.com/plasma-umass/scalene\"><span style=\"color: #0000ff; text-decoration-color: #0000ff\">scalene</span></a> profiler \n",
"</pre>\n"
],
"text/plain": [
"generated by the \u001b]8;id=1617518854.577183-510445;https://github.com/plasma-umass/scalene\u001b\\\u001b[94mscalene\u001b[0m\u001b]8;;\u001b\\ profiler \n"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"%%scalene --reduced-profile --html\n",
"\n",
"for i in range(1000):\n",
" try:\n",
" with open(C2(),\n",
" mode=\"r+\") as f:\n",
" assert f.read() == \"hi!\\n\"\n",
" except:\n",
" pass"
]
},
{
"cell_type": "code",
"execution_count": 37,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<pre style=\"white-space:pre;overflow-x:auto;line-height:normal;font-family:Menlo,'DejaVu Sans Mono',consolas,'Courier New',monospace\"><span style=\"font-style: italic\"> [23]: % of time = 97.30% out of 0.55s. </span>\n",
" ╷ ╷ ╷ ╷ ╷ \n",
" <span style=\"font-weight: bold\"> Line </span>│<span style=\"font-weight: bold\">Time % </span>│<span style=\"font-weight: bold\">Time % </span>│<span style=\"font-weight: bold\">Sys </span>│<span style=\"font-weight: bold\"> </span>│<span style=\"font-weight: bold\"> </span> \n",
" │<span style=\"font-weight: bold\">Python </span>│<span style=\"font-weight: bold\">native </span>│<span style=\"font-weight: bold\">% </span>│<span style=\"font-weight: bold\">[23] </span>│ \n",
"╺━━━━━━┿━━━━━━━┿━━━━━━━┿━━━━━┿━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┿━━╸\n",
" ... │ │ │ │ │ \n",
" 17 │<span style=\"color: #800000; text-decoration-color: #800000; font-weight: bold\"> 76%</span> │<span style=\"color: #800000; text-decoration-color: #800000; font-weight: bold\"> 10%</span> │ 2% │<span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\"> caller_src </span><span style=\"color: #666666; text-decoration-color: #666666; background-color: #f8f8f8\">=</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\"> inspect</span><span style=\"color: #666666; text-decoration-color: #666666; background-color: #f8f8f8\">.</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\">getsource(frame)</span><span style=\"background-color: #f8f8f8\"> </span> │ \n",
" ... │ │ │ │ │ \n",
" 24 │ 2% │ │ │<span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\"> varname </span><span style=\"color: #008000; text-decoration-color: #008000; background-color: #f8f8f8; font-weight: bold\">for</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\"> varname, instance </span><span style=\"color: #aa22ff; text-decoration-color: #aa22ff; background-color: #f8f8f8; font-weight: bold\">in</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\"> caller_local_variables</span><span style=\"color: #666666; text-decoration-color: #666666; background-color: #f8f8f8\">.</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\">items()</span><span style=\"background-color: #f8f8f8\"> </span> │ \n",
" 25 │ 3% │ │ │<span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\"> </span><span style=\"color: #008000; text-decoration-color: #008000; background-color: #f8f8f8; font-weight: bold\">if</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\"> </span><span style=\"color: #008000; text-decoration-color: #008000; background-color: #f8f8f8\">isinstance</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\">(instance, </span><span style=\"color: #008000; text-decoration-color: #008000; background-color: #f8f8f8\">type</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\">(</span><span style=\"color: #008000; text-decoration-color: #008000; background-color: #f8f8f8\">self</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\">))</span><span style=\"background-color: #f8f8f8\"> </span> │ \n",
" ... │ │ │ │ │ \n",
" 31 │ 4% │ │ │<span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\"> _is_open_call_write_with_var(</span><span style=\"background-color: #f8f8f8\"> </span> │ \n",
" ... │ │ │ │ │ \n",
" 35 │ 2% │ │ │<span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\"> ) </span><span style=\"color: #008000; text-decoration-color: #008000; background-color: #f8f8f8; font-weight: bold\">for</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\"> n </span><span style=\"color: #aa22ff; text-decoration-color: #aa22ff; background-color: #f8f8f8; font-weight: bold\">in</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\"> ast</span><span style=\"color: #666666; text-decoration-color: #666666; background-color: #f8f8f8\">.</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\">walk(ast</span><span style=\"color: #666666; text-decoration-color: #666666; background-color: #f8f8f8\">.</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\">parse(caller_src))</span><span style=\"background-color: #f8f8f8\"> </span> │ \n",
" ... │ │ │ │ │ \n",
" │ │ │ │ │ \n",
"╶──────┼───────┼───────┼─────┼─────────────────────────────────────────────────────────────────────────────────────────────────┼──╴\n",
" │ │ │ │ │<span style=\"font-weight: bold; font-style: italic\">…</span> \n",
" │ │ │ │ │<span style=\"font-weight: bold; font-style: italic\">…</span> \n",
" │ │ │ │ │<span style=\"font-weight: bold; font-style: italic\">…</span> \n",
" │ │ │ │ │<span style=\"font-weight: bold; font-style: italic\">…</span> \n",
" 7 │<span style=\"color: #800000; text-decoration-color: #800000; font-weight: bold\"> 86%</span> │<span style=\"color: #800000; text-decoration-color: #800000; font-weight: bold\"> 11%</span> │ 2% │<span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\">C2</span><span style=\"color: #666666; text-decoration-color: #666666; background-color: #f8f8f8\">.</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\">__fspath__</span><span style=\"background-color: #f8f8f8\"> </span> │ \n",
" ╵ ╵ ╵ ╵ ╵ \n",
"</pre>\n"
],
"text/plain": [
"\u001b[3m [23]: % of time = 97.30% out of 0.55s. \u001b[0m\n",
" ╷ ╷ ╷ ╷ ╷ \n",
" \u001b[1m \u001b[0m\u001b[1mLine\u001b[0m\u001b[1m \u001b[0m│\u001b[1mTime %\u001b[0m\u001b[1m \u001b[0m│\u001b[1mTime %\u001b[0m\u001b[1m \u001b[0m│\u001b[1mSys \u001b[0m\u001b[1m \u001b[0m│\u001b[1m \u001b[0m\u001b[1m \u001b[0m│\u001b[1m \u001b[0m\u001b[1m \u001b[0m \n",
" │\u001b[1mPython\u001b[0m\u001b[1m \u001b[0m│\u001b[1mnative\u001b[0m\u001b[1m \u001b[0m│\u001b[1m% \u001b[0m\u001b[1m \u001b[0m│\u001b[1m[23] \u001b[0m\u001b[1m \u001b[0m│ \n",
"╺━━━━━━┿━━━━━━━┿━━━━━━━┿━━━━━┿━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┿━━╸\n",
" ... │ │ │ │ │ \n",
" 17 │\u001b[1;31m 76%\u001b[0m │\u001b[1;31m 10%\u001b[0m │ 2% │\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mcaller_src\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[38;2;102;102;102;48;2;248;248;248m=\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248minspect\u001b[0m\u001b[38;2;102;102;102;48;2;248;248;248m.\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mgetsource\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m(\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mframe\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m)\u001b[0m\u001b[48;2;248;248;248m \u001b[0m │ \n",
" ... │ │ │ │ │ \n",
" 24 │ 2% │ │ │\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mvarname\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[1;38;2;0;128;0;48;2;248;248;248mfor\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mvarname\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m,\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248minstance\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[1;38;2;170;34;255;48;2;248;248;248min\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mcaller_local_variables\u001b[0m\u001b[38;2;102;102;102;48;2;248;248;248m.\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mitems\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m(\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m)\u001b[0m\u001b[48;2;248;248;248m \u001b[0m │ \n",
" 25 │ 3% │ │ │\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[1;38;2;0;128;0;48;2;248;248;248mif\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[38;2;0;128;0;48;2;248;248;248misinstance\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m(\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248minstance\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m,\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[38;2;0;128;0;48;2;248;248;248mtype\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m(\u001b[0m\u001b[38;2;0;128;0;48;2;248;248;248mself\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m)\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m)\u001b[0m\u001b[48;2;248;248;248m \u001b[0m │ \n",
" ... │ │ │ │ │ \n",
" 31 │ 4% │ │ │\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m_is_open_call_write_with_var\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m(\u001b[0m\u001b[48;2;248;248;248m \u001b[0m │ \n",
" ... │ │ │ │ │ \n",
" 35 │ 2% │ │ │\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m)\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[1;38;2;0;128;0;48;2;248;248;248mfor\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mn\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[1;38;2;170;34;255;48;2;248;248;248min\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mast\u001b[0m\u001b[38;2;102;102;102;48;2;248;248;248m.\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mwalk\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m(\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mast\u001b[0m\u001b[38;2;102;102;102;48;2;248;248;248m.\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mparse\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m(\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mcaller_src\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m)\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m)\u001b[0m\u001b[48;2;248;248;248m \u001b[0m │ \n",
" ... │ │ │ │ │ \n",
" │ │ │ │ │ \n",
"╶──────┼───────┼───────┼─────┼─────────────────────────────────────────────────────────────────────────────────────────────────┼──╴\n",
" │ │ │ │ │\u001b[1;3m…\u001b[0m \n",
" │ │ │ │ │\u001b[1;3m…\u001b[0m \n",
" │ │ │ │ │\u001b[1;3m…\u001b[0m \n",
" │ │ │ │ │\u001b[1;3m…\u001b[0m \n",
" 7 │\u001b[1;31m 86%\u001b[0m │\u001b[1;31m 11%\u001b[0m │ 2% │\u001b[38;2;0;0;0;48;2;248;248;248mC2\u001b[0m\u001b[38;2;102;102;102;48;2;248;248;248m.\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m__fspath__\u001b[0m\u001b[48;2;248;248;248m \u001b[0m │ \n",
" ╵ ╵ ╵ ╵ ╵ \n"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/html": [
"<pre style=\"white-space:pre;overflow-x:auto;line-height:normal;font-family:Menlo,'DejaVu Sans Mono',consolas,'Courier New',monospace\"><span style=\"font-style: italic\"> [13]: % of time = 2.70% out of 0.55s. </span>\n",
" ╷ ╷ ╷ ╷ ╷ \n",
" <span style=\"font-weight: bold\"> Line </span>│<span style=\"font-weight: bold\">Time % </span>│<span style=\"font-weight: bold\">Time % </span>│<span style=\"font-weight: bold\">Sys </span>│<span style=\"font-weight: bold\"> </span>│<span style=\"font-weight: bold\"> </span> \n",
" │<span style=\"font-weight: bold\">Python </span>│<span style=\"font-weight: bold\">native </span>│<span style=\"font-weight: bold\">% </span>│<span style=\"font-weight: bold\">[13] </span>│ \n",
"╺━━━━━━┿━━━━━━━┿━━━━━━━┿━━━━┿━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┿━━━╸\n",
" ... │ │ │ │ │ \n",
" 13 │ 2% │ │ │<span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\"> </span><span style=\"color: #008000; text-decoration-color: #008000; background-color: #f8f8f8; font-weight: bold\">if</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\"> </span><span style=\"color: #aa22ff; text-decoration-color: #aa22ff; background-color: #f8f8f8; font-weight: bold\">not</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\"> </span><span style=\"color: #008000; text-decoration-color: #008000; background-color: #f8f8f8\">isinstance</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\">(ast_node, ast</span><span style=\"color: #666666; text-decoration-color: #666666; background-color: #f8f8f8\">.</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\">Call):</span><span style=\"background-color: #f8f8f8\"> </span> │ \n",
" ... │ │ │ │ │ \n",
" │ │ │ │ │ \n",
"╶──────┼───────┼───────┼────┼─────────────────────────────────────────────────────────────────────────────────────────────────┼───╴\n",
" │ │ │ │ │<span style=\"font-weight: bold; font-style: italic\">f…</span> \n",
" │ │ │ │ │<span style=\"font-weight: bold; font-style: italic\">s…</span> \n",
" │ │ │ │ │<span style=\"font-weight: bold; font-style: italic\">f…</span> \n",
" │ │ │ │ │<span style=\"font-weight: bold; font-style: italic\">&lt;…</span> \n",
" 5 │ 2% │ │ │<span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\">C2</span><span style=\"color: #666666; text-decoration-color: #666666; background-color: #f8f8f8\">.</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\">_is_open_call_write_with_var</span><span style=\"background-color: #f8f8f8\"> </span> │ \n",
" ╵ ╵ ╵ ╵ ╵ \n",
"</pre>\n"
],
"text/plain": [
"\u001b[3m [13]: % of time = 2.70% out of 0.55s. \u001b[0m\n",
" ╷ ╷ ╷ ╷ ╷ \n",
" \u001b[1m \u001b[0m\u001b[1mLine\u001b[0m\u001b[1m \u001b[0m│\u001b[1mTime %\u001b[0m\u001b[1m \u001b[0m│\u001b[1mTime %\u001b[0m\u001b[1m \u001b[0m│\u001b[1mSys\u001b[0m\u001b[1m \u001b[0m│\u001b[1m \u001b[0m\u001b[1m \u001b[0m│\u001b[1m \u001b[0m\u001b[1m \u001b[0m \n",
" │\u001b[1mPython\u001b[0m\u001b[1m \u001b[0m│\u001b[1mnative\u001b[0m\u001b[1m \u001b[0m│\u001b[1m% \u001b[0m\u001b[1m \u001b[0m│\u001b[1m[13] \u001b[0m\u001b[1m \u001b[0m│ \n",
"╺━━━━━━┿━━━━━━━┿━━━━━━━┿━━━━┿━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┿━━━╸\n",
" ... │ │ │ │ │ \n",
" 13 │ 2% │ │ │\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[1;38;2;0;128;0;48;2;248;248;248mif\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[1;38;2;170;34;255;48;2;248;248;248mnot\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[38;2;0;128;0;48;2;248;248;248misinstance\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m(\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mast_node\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m,\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mast\u001b[0m\u001b[38;2;102;102;102;48;2;248;248;248m.\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mCall\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m)\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m:\u001b[0m\u001b[48;2;248;248;248m \u001b[0m │ \n",
" ... │ │ │ │ │ \n",
" │ │ │ │ │ \n",
"╶──────┼───────┼───────┼────┼─────────────────────────────────────────────────────────────────────────────────────────────────┼───╴\n",
" │ │ │ │ │\u001b[1;3mf…\u001b[0m \n",
" │ │ │ │ │\u001b[1;3ms…\u001b[0m \n",
" │ │ │ │ │\u001b[1;3mf…\u001b[0m \n",
" │ │ │ │ │\u001b[1;3m<…\u001b[0m \n",
" 5 │ 2% │ │ │\u001b[38;2;0;0;0;48;2;248;248;248mC2\u001b[0m\u001b[38;2;102;102;102;48;2;248;248;248m.\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m_is_open_call_write_with_var\u001b[0m\u001b[48;2;248;248;248m \u001b[0m │ \n",
" ╵ ╵ ╵ ╵ ╵ \n"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/html": [
"<pre style=\"white-space:pre;overflow-x:auto;line-height:normal;font-family:Menlo,'DejaVu Sans Mono',consolas,'Courier New',monospace\">generated by the <a href=\"https://github.com/plasma-umass/scalene\"><span style=\"color: #0000ff; text-decoration-color: #0000ff\">scalene</span></a> profiler \n",
"</pre>\n"
],
"text/plain": [
"generated by the \u001b]8;id=1617518855.1870432-91386;https://github.com/plasma-umass/scalene\u001b\\\u001b[94mscalene\u001b[0m\u001b]8;;\u001b\\ profiler \n"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"%%scalene --reduced-profile --html\n",
"\n",
"for i in range(1000):\n",
" try:\n",
" with open(C2(method=\"currentfrmae\"),\n",
" mode=\"r+\") as f:\n",
" assert f.read() == \"hi!\\n\"\n",
" except:\n",
" pass"
]
},
{
"cell_type": "code",
"execution_count": 38,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<pre style=\"white-space:pre;overflow-x:auto;line-height:normal;font-family:Menlo,'DejaVu Sans Mono',consolas,'Courier New',monospace\"><span style=\"font-style: italic\"> [23]: % of time = 100.00% out of 0.56s. </span>\n",
" ╷ ╷ ╷ ╷ ╷ \n",
" <span style=\"font-weight: bold\"> Line </span>│<span style=\"font-weight: bold\">Time % </span>│<span style=\"font-weight: bold\">Time % </span>│<span style=\"font-weight: bold\">Sys </span>│<span style=\"font-weight: bold\"> </span>│<span style=\"font-weight: bold\"> </span> \n",
" │<span style=\"font-weight: bold\">Python </span>│<span style=\"font-weight: bold\">native </span>│<span style=\"font-weight: bold\">% </span>│<span style=\"font-weight: bold\">[23] </span>│ \n",
"╺━━━━━━┿━━━━━━━┿━━━━━━━┿━━━━━┿━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┿━━╸\n",
" ... │ │ │ │ │ \n",
" 7 │ 2% │ │ │<span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\"> </span><span style=\"color: #008000; text-decoration-color: #008000; background-color: #f8f8f8; font-weight: bold\">def</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\"> </span><span style=\"color: #0000ff; text-decoration-color: #0000ff; background-color: #f8f8f8\">__fspath__</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\">(</span><span style=\"color: #008000; text-decoration-color: #008000; background-color: #f8f8f8\">self</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\">):</span><span style=\"background-color: #f8f8f8\"> </span> │ \n",
" ... │ │ │ │ │ \n",
" 17 │<span style=\"color: #800000; text-decoration-color: #800000; font-weight: bold\"> 72%</span> │<span style=\"color: #800000; text-decoration-color: #800000; font-weight: bold\"> 9%</span> │ 2% │<span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\"> caller_src </span><span style=\"color: #666666; text-decoration-color: #666666; background-color: #f8f8f8\">=</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\"> inspect</span><span style=\"color: #666666; text-decoration-color: #666666; background-color: #f8f8f8\">.</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\">getsource(frame)</span><span style=\"background-color: #f8f8f8\"> </span> │ \n",
" ... │ │ │ │ │ \n",
" 31 │ 1% │ │ │<span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\"> _is_open_call_write_with_var(</span><span style=\"background-color: #f8f8f8\"> </span> │ \n",
" ... │ │ │ │ │ \n",
" 35 │ 14% │ 2% │ │<span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\"> ) </span><span style=\"color: #008000; text-decoration-color: #008000; background-color: #f8f8f8; font-weight: bold\">for</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\"> n </span><span style=\"color: #aa22ff; text-decoration-color: #aa22ff; background-color: #f8f8f8; font-weight: bold\">in</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\"> ast</span><span style=\"color: #666666; text-decoration-color: #666666; background-color: #f8f8f8\">.</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\">walk(ast</span><span style=\"color: #666666; text-decoration-color: #666666; background-color: #f8f8f8\">.</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\">parse(caller_src))</span><span style=\"background-color: #f8f8f8\"> </span> │ \n",
" ... │ │ │ │ │ \n",
" │ │ │ │ │ \n",
"╶──────┼───────┼───────┼─────┼─────────────────────────────────────────────────────────────────────────────────────────────────┼──╴\n",
" │ │ │ │ │<span style=\"font-weight: bold; font-style: italic\">…</span> \n",
" │ │ │ │ │<span style=\"font-weight: bold; font-style: italic\">…</span> \n",
" │ │ │ │ │<span style=\"font-weight: bold; font-style: italic\">…</span> \n",
" │ │ │ │ │<span style=\"font-weight: bold; font-style: italic\">…</span> \n",
" 7 │<span style=\"color: #800000; text-decoration-color: #800000; font-weight: bold\"> 89%</span> │<span style=\"color: #800000; text-decoration-color: #800000; font-weight: bold\"> 11%</span> │ 2% │<span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\">C2</span><span style=\"color: #666666; text-decoration-color: #666666; background-color: #f8f8f8\">.</span><span style=\"color: #000000; text-decoration-color: #000000; background-color: #f8f8f8\">__fspath__</span><span style=\"background-color: #f8f8f8\"> </span> │ \n",
" ╵ ╵ ╵ ╵ ╵ \n",
"</pre>\n"
],
"text/plain": [
"\u001b[3m [23]: % of time = 100.00% out of 0.56s. \u001b[0m\n",
" ╷ ╷ ╷ ╷ ╷ \n",
" \u001b[1m \u001b[0m\u001b[1mLine\u001b[0m\u001b[1m \u001b[0m│\u001b[1mTime %\u001b[0m\u001b[1m \u001b[0m│\u001b[1mTime %\u001b[0m\u001b[1m \u001b[0m│\u001b[1mSys \u001b[0m\u001b[1m \u001b[0m│\u001b[1m \u001b[0m\u001b[1m \u001b[0m│\u001b[1m \u001b[0m\u001b[1m \u001b[0m \n",
" │\u001b[1mPython\u001b[0m\u001b[1m \u001b[0m│\u001b[1mnative\u001b[0m\u001b[1m \u001b[0m│\u001b[1m% \u001b[0m\u001b[1m \u001b[0m│\u001b[1m[23] \u001b[0m\u001b[1m \u001b[0m│ \n",
"╺━━━━━━┿━━━━━━━┿━━━━━━━┿━━━━━┿━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┿━━╸\n",
" ... │ │ │ │ │ \n",
" 7 │ 2% │ │ │\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[1;38;2;0;128;0;48;2;248;248;248mdef\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[38;2;0;0;255;48;2;248;248;248m__fspath__\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m(\u001b[0m\u001b[38;2;0;128;0;48;2;248;248;248mself\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m)\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m:\u001b[0m\u001b[48;2;248;248;248m \u001b[0m │ \n",
" ... │ │ │ │ │ \n",
" 17 │\u001b[1;31m 72%\u001b[0m │\u001b[1;31m 9%\u001b[0m │ 2% │\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mcaller_src\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[38;2;102;102;102;48;2;248;248;248m=\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248minspect\u001b[0m\u001b[38;2;102;102;102;48;2;248;248;248m.\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mgetsource\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m(\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mframe\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m)\u001b[0m\u001b[48;2;248;248;248m \u001b[0m │ \n",
" ... │ │ │ │ │ \n",
" 31 │ 1% │ │ │\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m_is_open_call_write_with_var\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m(\u001b[0m\u001b[48;2;248;248;248m \u001b[0m │ \n",
" ... │ │ │ │ │ \n",
" 35 │ 14% │ 2% │ │\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m)\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[1;38;2;0;128;0;48;2;248;248;248mfor\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mn\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[1;38;2;170;34;255;48;2;248;248;248min\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m \u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mast\u001b[0m\u001b[38;2;102;102;102;48;2;248;248;248m.\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mwalk\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m(\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mast\u001b[0m\u001b[38;2;102;102;102;48;2;248;248;248m.\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mparse\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m(\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248mcaller_src\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m)\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m)\u001b[0m\u001b[48;2;248;248;248m \u001b[0m │ \n",
" ... │ │ │ │ │ \n",
" │ │ │ │ │ \n",
"╶──────┼───────┼───────┼─────┼─────────────────────────────────────────────────────────────────────────────────────────────────┼──╴\n",
" │ │ │ │ │\u001b[1;3m…\u001b[0m \n",
" │ │ │ │ │\u001b[1;3m…\u001b[0m \n",
" │ │ │ │ │\u001b[1;3m…\u001b[0m \n",
" │ │ │ │ │\u001b[1;3m…\u001b[0m \n",
" 7 │\u001b[1;31m 89%\u001b[0m │\u001b[1;31m 11%\u001b[0m │ 2% │\u001b[38;2;0;0;0;48;2;248;248;248mC2\u001b[0m\u001b[38;2;102;102;102;48;2;248;248;248m.\u001b[0m\u001b[38;2;0;0;0;48;2;248;248;248m__fspath__\u001b[0m\u001b[48;2;248;248;248m \u001b[0m │ \n",
" ╵ ╵ ╵ ╵ ╵ \n"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/html": [
"<pre style=\"white-space:pre;overflow-x:auto;line-height:normal;font-family:Menlo,'DejaVu Sans Mono',consolas,'Courier New',monospace\">generated by the <a href=\"https://github.com/plasma-umass/scalene\"><span style=\"color: #0000ff; text-decoration-color: #0000ff\">scalene</span></a> profiler \n",
"</pre>\n"
],
"text/plain": [
"generated by the \u001b]8;id=1617518855.7806919-754780;https://github.com/plasma-umass/scalene\u001b\\\u001b[94mscalene\u001b[0m\u001b]8;;\u001b\\ profiler \n"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"%%scalene --reduced-profile --html\n",
"\n",
"for i in range(1000):\n",
" try:\n",
" with open(C2(method=\"_getframe\"),\n",
" mode=\"r+\") as f:\n",
" assert f.read() == \"hi!\\n\"\n",
" except:\n",
" pass"
]
},
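{
"cell_type": "markdown",
"metadata": {},
"source": [
"Roughly 80% of the time in that profile lands on line 17, the `inspect.getsource(frame)` call, which has to read the entire file containing the calling frame before it can return the block. Here is a minimal standalone sketch of how that cost grows with the size of the caller's file. It is a hypothetical harness written for illustration (the temp-module helper, module name, and padding sizes are made up), not part of the experiment below.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Hypothetical sketch, not from the original experiment: time\n",
"# inspect.getsource on a frame that lives in files of increasing size.\n",
"import importlib.util\n",
"import tempfile\n",
"import time\n",
"from pathlib import Path\n",
"\n",
"def make_module(n_pad_lines):\n",
"    # Write a module whose one function fetches its own source, then import it.\n",
"    src = (\n",
"        \"import inspect\\n\"\n",
"        \"def f():\\n\"\n",
"        \"    return inspect.getsource(inspect.currentframe())\\n\"\n",
"    ) + \"pad = 0\\n\" * n_pad_lines\n",
"    path = Path(tempfile.mkdtemp()) / \"padded_mod.py\"\n",
"    path.write_text(src)\n",
"    spec = importlib.util.spec_from_file_location(\"padded_mod\", str(path))\n",
"    module = importlib.util.module_from_spec(spec)\n",
"    spec.loader.exec_module(module)\n",
"    return module\n",
"\n",
"for n in (10, 1_000, 100_000):\n",
"    module = make_module(n)\n",
"    start = time.perf_counter()\n",
"    module.f()  # cold call: getsource reads the whole file to find the block\n",
"    elapsed_ms = (time.perf_counter() - start) * 1e3\n",
"    print(f\"{n:>7} pad lines: {elapsed_ms:.2f} ms\")\n"
]
},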
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## What if we make the context huge by adding an entire spellchecker based on Peter Norvig's approach?\n",
"\n",
"https://github.com/barrust/pyspellchecker"
]
},
{
"cell_type": "code",
"execution_count": 39,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"160 µs ± 7.37 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)\n"
]
}
],
"source": [
"%%timeit\n",
"\n",
"\"\"\" Additional utility functions \"\"\"\n",
"import contextlib\n",
"import gzip\n",
"import functools\n",
"import re\n",
"import warnings\n",
"\n",
"\n",
"def fail_after(version):\n",
" \"\"\" Decorator to add to tests to ensure that they fail if a deprecated\n",
" feature is not removed before the specified version\n",
"\n",
" Args:\n",
" version (str): The version to check against \"\"\"\n",
"\n",
" def decorator_wrapper(func):\n",
" @functools.wraps(func)\n",
" def test_inner(*args, **kwargs):\n",
" if [int(x) for x in version.split(\".\")] <= [\n",
" int(x) for x in __version__.split(\".\")\n",
" ]:\n",
" msg = \"The function {} must be fully removed as it is depricated and must be removed by version {}\".format(\n",
" func.__name__, version\n",
" )\n",
" raise AssertionError(msg)\n",
" return func(*args, **kwargs)\n",
"\n",
" return test_inner\n",
"\n",
" return decorator_wrapper\n",
"\n",
"\n",
"def deprecated(message=\"\"):\n",
" \"\"\" A simplistic decorator to mark functions as deprecated. The function\n",
" will pass a message to the user on the first use of the function\n",
"\n",
" Args:\n",
" message (str): The message to display if the function is deprecated\n",
" \"\"\"\n",
"\n",
" def decorator_wrapper(func):\n",
" @functools.wraps(func)\n",
" def function_wrapper(*args, **kwargs):\n",
" func_name = func.__name__\n",
" if func_name not in function_wrapper.deprecated_items:\n",
" msg = \"Function {} is now deprecated! {}\".format(func.__name__, message)\n",
" warnings.warn(msg, category=DeprecationWarning, stacklevel=2)\n",
" function_wrapper.deprecated_items.add(func_name)\n",
"\n",
" return func(*args, **kwargs)\n",
"\n",
" # set this up the first time the decorator is called\n",
" function_wrapper.deprecated_items = set()\n",
"\n",
" return function_wrapper\n",
"\n",
" return decorator_wrapper\n",
"\n",
"\n",
"def ensure_unicode(_str, encoding=\"utf-8\"):\n",
" \"\"\" Simplify checking if passed in data are bytes or a string and decode\n",
" bytes into unicode.\n",
"\n",
" Args:\n",
" _str (str): The input string (possibly bytes)\n",
" encoding (str): The encoding to use if input is bytes\n",
" Returns:\n",
" str: The encoded string\n",
" \"\"\"\n",
" if isinstance(_str, bytes):\n",
" return _str.decode(encoding)\n",
" return _str\n",
"\n",
"\n",
"@contextlib.contextmanager\n",
"def __gzip_read(filename, mode=\"rb\", encoding=\"UTF-8\"):\n",
" \"\"\" Context manager to correctly handle the decoding of the output of \\\n",
" the gzip file\n",
"\n",
" Args:\n",
" filename (str): The filename to open\n",
" mode (str): The mode to read the data\n",
" encoding (str): The file encoding to use\n",
" Yields:\n",
" str: The string data from the gzip file read\n",
" \"\"\"\n",
" with gzip.open(filename, mode=mode, encoding=encoding) as fobj:\n",
" yield fobj.read()\n",
"\n",
"\n",
"@contextlib.contextmanager\n",
"def load_file(filename, encoding):\n",
" \"\"\" Context manager to handle opening a gzip or text file correctly and\n",
" reading all the data\n",
"\n",
" Args:\n",
" filename (str): The filename to open\n",
" encoding (str): The file encoding to use\n",
" Yields:\n",
" str: The string data from the file read\n",
" \"\"\"\n",
" if filename[-3:].lower() == \".gz\":\n",
" with __gzip_read(filename, mode=\"rt\", encoding=encoding) as data:\n",
" yield data\n",
" else:\n",
" with open(filename, mode=\"r\", encoding=encoding) as fobj:\n",
" yield fobj.read()\n",
"\n",
"\n",
"def write_file(filepath, encoding, gzipped, data):\n",
" \"\"\" Write the data to file either as a gzip file or text based on the\n",
" gzipped parameter\n",
"\n",
" Args:\n",
" filepath (str): The filename to open\n",
" encoding (str): The file encoding to use\n",
" gzipped (bool): Whether the file should be gzipped or not\n",
" data (str): The data to be written out\n",
" \"\"\"\n",
" if gzipped:\n",
" with gzip.open(filepath, \"wt\") as fobj:\n",
" fobj.write(data)\n",
" else:\n",
" with open(filepath, \"w\", encoding=encoding) as fobj:\n",
" fobj.write(data)\n",
"\n",
"\n",
"def _parse_into_words(text):\n",
" \"\"\" Parse the text into words; currently removes punctuation except for\n",
" apostrophies.\n",
"\n",
" Args:\n",
" text (str): The text to split into words\n",
" \"\"\"\n",
" # see: https://stackoverflow.com/a/12705513\n",
" return re.findall(r\"(\\w[\\w']*\\w|\\w)\", text)\n",
"\n",
"\n",
"\"\"\" SpellChecker Module; simple, intuitive spell checker based on the post by\n",
" Peter Norvig. See: https://norvig.com/spell-correct.html \"\"\"\n",
"import gzip\n",
"import json\n",
"import pkgutil\n",
"import string\n",
"from collections import Counter\n",
"\n",
"\n",
"class SpellChecker(object):\n",
" \"\"\" The SpellChecker class encapsulates the basics needed to accomplish a\n",
" simple spell checking algorithm. It is based on the work by\n",
" Peter Norvig (https://norvig.com/spell-correct.html)\n",
"\n",
" Args:\n",
" language (str): The language of the dictionary to load or None \\\n",
" for no dictionary. Supported languages are `en`, `es`, `de`, `fr`, \\\n",
" `pt` and `ru`. Defaults to `en`. A list of languages may be \\\n",
" provided and all languages will be loaded.\n",
" local_dictionary (str): The path to a locally stored word \\\n",
" frequency dictionary; if provided, no language will be loaded\n",
" distance (int): The edit distance to use. Defaults to 2.\n",
" case_sensitive (bool): Flag to use a case sensitive dictionary or \\\n",
" not, only available when not using a language dictionary.\n",
" Note:\n",
" Using a case sensitive dictionary can be slow to correct words.\"\"\"\n",
"\n",
" __slots__ = [\"_distance\", \"_word_frequency\", \"_tokenizer\", \"_case_sensitive\"]\n",
"\n",
" def __init__(\n",
" self,\n",
" language=\"en\",\n",
" local_dictionary=None,\n",
" distance=2,\n",
" tokenizer=None,\n",
" case_sensitive=False,\n",
" ):\n",
" self._distance = None\n",
" self.distance = distance # use the setter value check\n",
"\n",
" self._tokenizer = _parse_into_words\n",
" if tokenizer is not None:\n",
" self._tokenizer = tokenizer\n",
"\n",
" self._case_sensitive = case_sensitive if not language else False\n",
" self._word_frequency = WordFrequency(self._tokenizer, self._case_sensitive)\n",
"\n",
" if local_dictionary:\n",
" self._word_frequency.load_dictionary(local_dictionary)\n",
" elif language:\n",
" if not isinstance(language, list):\n",
" language = [language]\n",
" for lang in language:\n",
" filename = \"resources/{}.json.gz\".format(lang.lower())\n",
" try:\n",
" json_open = pkgutil.get_data(\"spellchecker\", filename)\n",
" except FileNotFoundError:\n",
" msg = (\n",
" \"The provided dictionary language ({}) does not \" \"exist!\"\n",
" ).format(lang.lower())\n",
" raise ValueError(msg)\n",
"\n",
" lang_dict = json.loads(gzip.decompress(json_open).decode(\"utf-8\"))\n",
" self._word_frequency.load_json(lang_dict)\n",
"\n",
" def __contains__(self, key):\n",
" \"\"\" setup easier known checks \"\"\"\n",
" key = ensure_unicode(key)\n",
" return key in self._word_frequency\n",
"\n",
" def __getitem__(self, key):\n",
" \"\"\" setup easier frequency checks \"\"\"\n",
" key = ensure_unicode(key)\n",
" return self._word_frequency[key]\n",
"\n",
" def __iter__(self):\n",
" \"\"\" setup iter support \"\"\"\n",
" for word in self._word_frequency.dictionary:\n",
" yield word\n",
"\n",
" @property\n",
" def word_frequency(self):\n",
" \"\"\" WordFrequency: An encapsulation of the word frequency `dictionary`\n",
"\n",
" Note:\n",
" Not settable \"\"\"\n",
" return self._word_frequency\n",
"\n",
" @property\n",
" def distance(self):\n",
" \"\"\" int: The maximum edit distance to calculate\n",
"\n",
" Note:\n",
" Valid values are 1 or 2; if an invalid value is passed, \\\n",
" defaults to 2 \"\"\"\n",
" return self._distance\n",
"\n",
" @distance.setter\n",
" def distance(self, val):\n",
" \"\"\" set the distance parameter \"\"\"\n",
" tmp = 2\n",
" try:\n",
" int(val)\n",
" if val > 0 and val <= 2:\n",
" tmp = val\n",
" except (ValueError, TypeError):\n",
" pass\n",
" self._distance = tmp\n",
"\n",
" def split_words(self, text):\n",
" \"\"\" Split text into individual `words` using either a simple whitespace\n",
" regex or the passed in tokenizer\n",
"\n",
" Args:\n",
" text (str): The text to split into individual words\n",
" Returns:\n",
" list(str): A listing of all words in the provided text \"\"\"\n",
" text = ensure_unicode(text)\n",
" return self._tokenizer(text)\n",
"\n",
" def export(self, filepath, encoding=\"utf-8\", gzipped=True):\n",
" \"\"\" Export the word frequency list for import in the future\n",
"\n",
" Args:\n",
" filepath (str): The filepath to the exported dictionary\n",
" encoding (str): The encoding of the resulting output\n",
" gzipped (bool): Whether to gzip the dictionary or not \"\"\"\n",
" data = json.dumps(self.word_frequency.dictionary, sort_keys=True)\n",
" write_file(filepath, encoding, gzipped, data)\n",
"\n",
" def word_usage_frequency(self, word, total_words=None):\n",
" \"\"\" Calculate the frequency to the `word` provided as seen across the\n",
" entire dictionary\n",
"\n",
" Args:\n",
" word (str): The word for which the word probability is \\\n",
" calculated\n",
" total_words (int): The total number of words to use in the \\\n",
" calculation; use the default for using the whole word \\\n",
" frequency\n",
" Returns:\n",
" float: The probability that the word is the correct word \"\"\"\n",
" if not total_words:\n",
" total_words = self._word_frequency.total_words\n",
" word = ensure_unicode(word)\n",
" return self._word_frequency.dictionary[word] / total_words\n",
"\n",
" @deprecated(\"Deprecated as of version 0.6.1; use word_usage_frequency instead\")\n",
" def word_probability(self, word, total_words=None):\n",
" \"\"\" Calculate the frequency to the `word` provided as seen across the\n",
" entire dictionary; function was a misnomar and is therefore\n",
" deprecated!\n",
"\n",
" Args:\n",
" word (str): The word for which the word probability is \\\n",
" calculated\n",
" total_words (int): The total number of words to use in the \\\n",
" calculation; use the default for using the whole word \\\n",
" frequency\n",
" Returns:\n",
" float: The probability that the word is the correct word\n",
" Note:\n",
" Deprecated as of version 0.6.1; use `word_usage_frequency` \\\n",
" instead\n",
" Note:\n",
" Will be removed in version 0.6.3 \"\"\"\n",
" return self.word_usage_frequency(word, total_words)\n",
"\n",
" def correction(self, word):\n",
" \"\"\" The most probable correct spelling for the word\n",
"\n",
" Args:\n",
" word (str): The word to correct\n",
" Returns:\n",
" str: The most likely candidate \"\"\"\n",
" word = ensure_unicode(word)\n",
" candidates = list(self.candidates(word))\n",
" return max(sorted(candidates), key=self.__getitem__)\n",
"\n",
" def candidates(self, word):\n",
" \"\"\" Generate possible spelling corrections for the provided word up to\n",
" an edit distance of two, if and only when needed\n",
"\n",
" Args:\n",
" word (str): The word for which to calculate candidate spellings\n",
" Returns:\n",
" set: The set of words that are possible candidates \"\"\"\n",
" word = ensure_unicode(word)\n",
" if self.known([word]): # short-cut if word is correct already\n",
" return {word}\n",
"\n",
" if not self._check_if_should_check(word):\n",
" return {word}\n",
"\n",
" # get edit distance 1...\n",
" res = [x for x in self.edit_distance_1(word)]\n",
" tmp = self.known(res)\n",
" if tmp:\n",
" return tmp\n",
" # if still not found, use the edit distance 1 to calc edit distance 2\n",
" if self._distance == 2:\n",
" tmp = self.known([x for x in self.__edit_distance_alt(res)])\n",
" if tmp:\n",
" return tmp\n",
" return {word}\n",
"\n",
" def known(self, words):\n",
" \"\"\" The subset of `words` that appear in the dictionary of words\n",
"\n",
" Args:\n",
" words (list): List of words to determine which are in the \\\n",
" corpus\n",
" Returns:\n",
" set: The set of those words from the input that are in the \\\n",
" corpus \"\"\"\n",
" words = [ensure_unicode(w) for w in words]\n",
" tmp = [w if self._case_sensitive else w.lower() for w in words]\n",
" return set(\n",
" w\n",
" for w in tmp\n",
" if w in self._word_frequency.dictionary and self._check_if_should_check(w)\n",
" )\n",
"\n",
" def unknown(self, words):\n",
" \"\"\" The subset of `words` that do not appear in the dictionary\n",
"\n",
" Args:\n",
" words (list): List of words to determine which are not in the \\\n",
" corpus\n",
" Returns:\n",
" set: The set of those words from the input that are not in \\\n",
" the corpus \"\"\"\n",
" words = [ensure_unicode(w) for w in words]\n",
" tmp = [\n",
" w if self._case_sensitive else w.lower()\n",
" for w in words\n",
" if self._check_if_should_check(w)\n",
" ]\n",
" return set(w for w in tmp if w not in self._word_frequency.dictionary)\n",
"\n",
" def edit_distance_1(self, word):\n",
" \"\"\" Compute all strings that are one edit away from `word` using only\n",
" the letters in the corpus\n",
"\n",
" Args:\n",
" word (str): The word for which to calculate the edit distance\n",
" Returns:\n",
" set: The set of strings that are edit distance one from the \\\n",
" provided word \"\"\"\n",
" word = (\n",
" ensure_unicode(word).lower()\n",
" if not self._case_sensitive\n",
" else ensure_unicode(word)\n",
" )\n",
" if self._check_if_should_check(word) is False:\n",
" return {word}\n",
" letters = self._word_frequency.letters\n",
" splits = [(word[:i], word[i:]) for i in range(len(word) + 1)]\n",
" deletes = [L + R[1:] for L, R in splits if R]\n",
" transposes = [L + R[1] + R[0] + R[2:] for L, R in splits if len(R) > 1]\n",
" replaces = [L + c + R[1:] for L, R in splits if R for c in letters]\n",
" inserts = [L + c + R for L, R in splits for c in letters]\n",
" return set(deletes + transposes + replaces + inserts)\n",
"\n",
" def edit_distance_2(self, word):\n",
" \"\"\" Compute all strings that are two edits away from `word` using only\n",
" the letters in the corpus\n",
"\n",
" Args:\n",
" word (str): The word for which to calculate the edit distance\n",
" Returns:\n",
" set: The set of strings that are edit distance two from the \\\n",
" provided word \"\"\"\n",
" word = (\n",
" ensure_unicode(word).lower()\n",
" if not self._case_sensitive\n",
" else ensure_unicode(word)\n",
" )\n",
" return [\n",
" e2 for e1 in self.edit_distance_1(word) for e2 in self.edit_distance_1(e1)\n",
" ]\n",
"\n",
" def __edit_distance_alt(self, words):\n",
" \"\"\" Compute all strings that are 1 edits away from all the words using\n",
" only the letters in the corpus\n",
"\n",
" Args:\n",
" words (list): The words for which to calculate the edit distance\n",
" Returns:\n",
" set: The set of strings that are edit distance two from the \\\n",
" provided words \"\"\"\n",
" words = [ensure_unicode(w) for w in words]\n",
" tmp = [\n",
" w if self._case_sensitive else w.lower()\n",
" for w in words\n",
" if self._check_if_should_check(w)\n",
" ]\n",
" return [e2 for e1 in tmp for e2 in self.known(self.edit_distance_1(e1))]\n",
"\n",
" def _check_if_should_check(self, word):\n",
" if len(word) == 1 and word in string.punctuation:\n",
" return False\n",
" if (\n",
" len(word) > self._word_frequency.longest_word_length + 3\n",
" ): # magic number to allow removal of up to 2 letters.\n",
" return False\n",
" try: # check if it is a number (int, float, etc)\n",
" float(word)\n",
" return False\n",
" except ValueError:\n",
" pass\n",
"\n",
" return True\n",
"\n",
"\n",
"class WordFrequency(object):\n",
" \"\"\" Store the `dictionary` as a word frequency list while allowing for\n",
" different methods to load the data and update over time \"\"\"\n",
"\n",
" __slots__ = [\n",
" \"_dictionary\",\n",
" \"_total_words\",\n",
" \"_unique_words\",\n",
" \"_letters\",\n",
" \"_tokenizer\",\n",
" \"_case_sensitive\",\n",
" \"_longest_word_length\",\n",
" ]\n",
"\n",
" def __init__(self, tokenizer=None, case_sensitive=False):\n",
" self._dictionary = Counter()\n",
" self._total_words = 0\n",
" self._unique_words = 0\n",
" self._letters = set()\n",
" self._case_sensitive = case_sensitive\n",
" self._longest_word_length = 0\n",
"\n",
" self._tokenizer = _parse_into_words\n",
" if tokenizer is not None:\n",
" self._tokenizer = tokenizer\n",
"\n",
" def __contains__(self, key):\n",
" \"\"\" turn on contains \"\"\"\n",
" key = ensure_unicode(key)\n",
" key = key if self._case_sensitive else key.lower()\n",
" return key in self._dictionary\n",
"\n",
" def __getitem__(self, key):\n",
" \"\"\" turn on getitem \"\"\"\n",
" key = ensure_unicode(key)\n",
" key = key if self._case_sensitive else key.lower()\n",
" return self._dictionary[key]\n",
"\n",
" def __iter__(self):\n",
" \"\"\" turn on iter support \"\"\"\n",
" for word in self._dictionary:\n",
" yield word\n",
"\n",
" def pop(self, key, default=None):\n",
" \"\"\" Remove the key and return the associated value or default if not\n",
" found\n",
"\n",
" Args:\n",
" key (str): The key to remove\n",
" default (obj): The value to return if key is not present \"\"\"\n",
" key = ensure_unicode(key)\n",
" key = key if self._case_sensitive else key.lower()\n",
" return self._dictionary.pop(key, default)\n",
"\n",
" @property\n",
" def dictionary(self):\n",
" \"\"\" Counter: A counting dictionary of all words in the corpus and the \\\n",
" number of times each has been seen\n",
"\n",
" Note:\n",
" Not settable \"\"\"\n",
" return self._dictionary\n",
"\n",
" @property\n",
" def total_words(self):\n",
" \"\"\" int: The sum of all word occurances in the word frequency \\\n",
" dictionary\n",
"\n",
" Note:\n",
" Not settable \"\"\"\n",
" return self._total_words\n",
"\n",
" @property\n",
" def unique_words(self):\n",
" \"\"\" int: The total number of unique words in the word frequency list\n",
"\n",
" Note:\n",
" Not settable \"\"\"\n",
" return self._unique_words\n",
"\n",
" @property\n",
" def letters(self):\n",
" \"\"\" str: The listing of all letters found within the corpus\n",
"\n",
" Note:\n",
" Not settable \"\"\"\n",
" return self._letters\n",
"\n",
" @property\n",
" def longest_word_length(self):\n",
" \"\"\" int: The longest word length in the dictionary\n",
"\n",
" Note:\n",
" Not settable \"\"\"\n",
" return self._longest_word_length\n",
"\n",
" def tokenize(self, text):\n",
" \"\"\" Tokenize the provided string object into individual words\n",
"\n",
" Args:\n",
" text (str): The string object to tokenize\n",
" Yields:\n",
" str: The next `word` in the tokenized string\n",
" Note:\n",
" This is the same as the `spellchecker.split_words()` unless \\\n",
" a tokenizer function was provided. \"\"\"\n",
" text = ensure_unicode(text)\n",
" for word in self._tokenizer(text):\n",
" yield word if self._case_sensitive else word.lower()\n",
"\n",
" def keys(self):\n",
" \"\"\" Iterator over the key of the dictionary\n",
"\n",
" Yields:\n",
" str: The next key in the dictionary\n",
" Note:\n",
" This is the same as `spellchecker.words()` \"\"\"\n",
" for key in self._dictionary.keys():\n",
" yield key\n",
"\n",
" def words(self):\n",
" \"\"\" Iterator over the words in the dictionary\n",
"\n",
" Yields:\n",
" str: The next word in the dictionary\n",
" Note:\n",
" This is the same as `spellchecker.keys()` \"\"\"\n",
" for word in self._dictionary.keys():\n",
" yield word\n",
"\n",
" def items(self):\n",
" \"\"\" Iterator over the words in the dictionary\n",
"\n",
" Yields:\n",
" str: The next word in the dictionary\n",
" int: The number of instances in the dictionary\n",
" Note:\n",
" This is the same as `dict.items()` \"\"\"\n",
" for word in self._dictionary.keys():\n",
" yield word, self._dictionary[word]\n",
"\n",
" def load_dictionary(self, filename, encoding=\"utf-8\"):\n",
" \"\"\" Load in a pre-built word frequency list\n",
"\n",
" Args:\n",
" filename (str): The filepath to the json (optionally gzipped) \\\n",
" file to be loaded\n",
" encoding (str): The encoding of the dictionary \"\"\"\n",
" with load_file(filename, encoding) as data:\n",
" data = data if self._case_sensitive else data.lower()\n",
" self._dictionary.update(json.loads(data))\n",
" self._update_dictionary()\n",
"\n",
" def load_json(self, data):\n",
" \"\"\" Load in a pre-built word frequency list\n",
"\n",
" Args:\n",
" data (dict): The dictionary to be loaded \"\"\"\n",
" self._dictionary.update(data)\n",
" self._update_dictionary()\n",
"\n",
" def load_text_file(self, filename, encoding=\"utf-8\", tokenizer=None):\n",
" \"\"\" Load in a text file from which to generate a word frequency list\n",
"\n",
" Args:\n",
" filename (str): The filepath to the text file to be loaded\n",
" encoding (str): The encoding of the text file\n",
" tokenizer (function): The function to use to tokenize a string\n",
" \"\"\"\n",
" with load_file(filename, encoding=encoding) as data:\n",
" self.load_text(data, tokenizer)\n",
"\n",
" def load_text(self, text, tokenizer=None):\n",
" \"\"\" Load text from which to generate a word frequency list\n",
"\n",
" Args:\n",
" text (str): The text to be loaded\n",
" tokenizer (function): The function to use to tokenize a string\n",
" \"\"\"\n",
" text = ensure_unicode(text)\n",
" if tokenizer:\n",
" words = [x if self._case_sensitive else x.lower() for x in tokenizer(text)]\n",
" else:\n",
" words = self.tokenize(text)\n",
"\n",
" self._dictionary.update(words)\n",
" self._update_dictionary()\n",
"\n",
" def load_words(self, words):\n",
" \"\"\" Load a list of words from which to generate a word frequency list\n",
"\n",
" Args:\n",
" words (list): The list of words to be loaded \"\"\"\n",
" words = [ensure_unicode(w) for w in words]\n",
" self._dictionary.update(\n",
" [word if self._case_sensitive else word.lower() for word in words]\n",
" )\n",
" self._update_dictionary()\n",
"\n",
" def add(self, word):\n",
" \"\"\" Add a word to the word frequency list\n",
"\n",
" Args:\n",
" word (str): The word to add \"\"\"\n",
" word = ensure_unicode(word)\n",
" self.load_words([word])\n",
"\n",
" def remove_words(self, words):\n",
" \"\"\" Remove a list of words from the word frequency list\n",
"\n",
" Args:\n",
" words (list): The list of words to remove \"\"\"\n",
" words = [ensure_unicode(w) for w in words]\n",
" for word in words:\n",
" self._dictionary.pop(word if self._case_sensitive else word.lower())\n",
" self._update_dictionary()\n",
"\n",
" def remove(self, word):\n",
" \"\"\" Remove a word from the word frequency list\n",
"\n",
" Args:\n",
" word (str): The word to remove \"\"\"\n",
" word = ensure_unicode(word)\n",
" self._dictionary.pop(word if self._case_sensitive else word.lower())\n",
" self._update_dictionary()\n",
"\n",
" def remove_by_threshold(self, threshold=5):\n",
" \"\"\" Remove all words at, or below, the provided threshold\n",
"\n",
" Args:\n",
" threshold (int): The threshold at which a word is to be \\\n",
" removed \"\"\"\n",
" keys = [x for x in self._dictionary.keys()]\n",
" for key in keys:\n",
" if self._dictionary[key] <= threshold:\n",
" self._dictionary.pop(key)\n",
" self._update_dictionary()\n",
"\n",
" def _update_dictionary(self):\n",
" \"\"\" Update the word frequency object \"\"\"\n",
" self._longest_word_length = 0\n",
" self._total_words = sum(self._dictionary.values())\n",
" self._unique_words = len(self._dictionary.keys())\n",
" self._letters = set()\n",
" for key in self._dictionary:\n",
" if len(key) > self._longest_word_length:\n",
" self._longest_word_length = len(key)\n",
" self._letters.update(key)\n",
" \n",
"\n",
"try:\n",
" with open(Base(),\n",
" mode=\"r+\") as f:\n",
" assert f.read() == \"hi!\\n\"\n",
"except:\n",
" pass\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": 40,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"7.12 ms ± 37.6 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)\n"
]
}
],
"source": [
"%%timeit\n",
"\n",
"\"\"\" Additional utility functions \"\"\"\n",
"import contextlib\n",
"import gzip\n",
"import functools\n",
"import re\n",
"import warnings\n",
"\n",
"\n",
"def fail_after(version):\n",
" \"\"\" Decorator to add to tests to ensure that they fail if a deprecated\n",
" feature is not removed before the specified version\n",
"\n",
" Args:\n",
" version (str): The version to check against \"\"\"\n",
"\n",
" def decorator_wrapper(func):\n",
" @functools.wraps(func)\n",
" def test_inner(*args, **kwargs):\n",
" if [int(x) for x in version.split(\".\")] <= [\n",
" int(x) for x in __version__.split(\".\")\n",
" ]:\n",
" msg = \"The function {} must be fully removed as it is depricated and must be removed by version {}\".format(\n",
" func.__name__, version\n",
" )\n",
" raise AssertionError(msg)\n",
" return func(*args, **kwargs)\n",
"\n",
" return test_inner\n",
"\n",
" return decorator_wrapper\n",
"\n",
"\n",
"def deprecated(message=\"\"):\n",
" \"\"\" A simplistic decorator to mark functions as deprecated. The function\n",
" will pass a message to the user on the first use of the function\n",
"\n",
" Args:\n",
" message (str): The message to display if the function is deprecated\n",
" \"\"\"\n",
"\n",
" def decorator_wrapper(func):\n",
" @functools.wraps(func)\n",
" def function_wrapper(*args, **kwargs):\n",
" func_name = func.__name__\n",
" if func_name not in function_wrapper.deprecated_items:\n",
" msg = \"Function {} is now deprecated! {}\".format(func.__name__, message)\n",
" warnings.warn(msg, category=DeprecationWarning, stacklevel=2)\n",
" function_wrapper.deprecated_items.add(func_name)\n",
"\n",
" return func(*args, **kwargs)\n",
"\n",
" # set this up the first time the decorator is called\n",
" function_wrapper.deprecated_items = set()\n",
"\n",
" return function_wrapper\n",
"\n",
" return decorator_wrapper\n",
"\n",
"\n",
"def ensure_unicode(_str, encoding=\"utf-8\"):\n",
" \"\"\" Simplify checking if passed in data are bytes or a string and decode\n",
" bytes into unicode.\n",
"\n",
" Args:\n",
" _str (str): The input string (possibly bytes)\n",
" encoding (str): The encoding to use if input is bytes\n",
" Returns:\n",
" str: The encoded string\n",
" \"\"\"\n",
" if isinstance(_str, bytes):\n",
" return _str.decode(encoding)\n",
" return _str\n",
"\n",
"\n",
"@contextlib.contextmanager\n",
"def __gzip_read(filename, mode=\"rb\", encoding=\"UTF-8\"):\n",
" \"\"\" Context manager to correctly handle the decoding of the output of \\\n",
" the gzip file\n",
"\n",
" Args:\n",
" filename (str): The filename to open\n",
" mode (str): The mode to read the data\n",
" encoding (str): The file encoding to use\n",
" Yields:\n",
" str: The string data from the gzip file read\n",
" \"\"\"\n",
" with gzip.open(filename, mode=mode, encoding=encoding) as fobj:\n",
" yield fobj.read()\n",
"\n",
"\n",
"@contextlib.contextmanager\n",
"def load_file(filename, encoding):\n",
" \"\"\" Context manager to handle opening a gzip or text file correctly and\n",
" reading all the data\n",
"\n",
" Args:\n",
" filename (str): The filename to open\n",
" encoding (str): The file encoding to use\n",
" Yields:\n",
" str: The string data from the file read\n",
" \"\"\"\n",
" if filename[-3:].lower() == \".gz\":\n",
" with __gzip_read(filename, mode=\"rt\", encoding=encoding) as data:\n",
" yield data\n",
" else:\n",
" with open(filename, mode=\"r\", encoding=encoding) as fobj:\n",
" yield fobj.read()\n",
"\n",
"\n",
"def write_file(filepath, encoding, gzipped, data):\n",
" \"\"\" Write the data to file either as a gzip file or text based on the\n",
" gzipped parameter\n",
"\n",
" Args:\n",
" filepath (str): The filename to open\n",
" encoding (str): The file encoding to use\n",
" gzipped (bool): Whether the file should be gzipped or not\n",
" data (str): The data to be written out\n",
" \"\"\"\n",
" if gzipped:\n",
" with gzip.open(filepath, \"wt\") as fobj:\n",
" fobj.write(data)\n",
" else:\n",
" with open(filepath, \"w\", encoding=encoding) as fobj:\n",
" fobj.write(data)\n",
"\n",
"\n",
"def _parse_into_words(text):\n",
" \"\"\" Parse the text into words; currently removes punctuation except for\n",
" apostrophies.\n",
"\n",
" Args:\n",
" text (str): The text to split into words\n",
" \"\"\"\n",
" # see: https://stackoverflow.com/a/12705513\n",
" return re.findall(r\"(\\w[\\w']*\\w|\\w)\", text)\n",
"\n",
"\n",
"\"\"\" SpellChecker Module; simple, intuitive spell checker based on the post by\n",
" Peter Norvig. See: https://norvig.com/spell-correct.html \"\"\"\n",
"import gzip\n",
"import json\n",
"import pkgutil\n",
"import string\n",
"from collections import Counter\n",
"\n",
"\n",
"class SpellChecker(object):\n",
" \"\"\" The SpellChecker class encapsulates the basics needed to accomplish a\n",
" simple spell checking algorithm. It is based on the work by\n",
" Peter Norvig (https://norvig.com/spell-correct.html)\n",
"\n",
" Args:\n",
" language (str): The language of the dictionary to load or None \\\n",
" for no dictionary. Supported languages are `en`, `es`, `de`, `fr`, \\\n",
" `pt` and `ru`. Defaults to `en`. A list of languages may be \\\n",
" provided and all languages will be loaded.\n",
" local_dictionary (str): The path to a locally stored word \\\n",
" frequency dictionary; if provided, no language will be loaded\n",
" distance (int): The edit distance to use. Defaults to 2.\n",
" case_sensitive (bool): Flag to use a case sensitive dictionary or \\\n",
" not, only available when not using a language dictionary.\n",
" Note:\n",
" Using a case sensitive dictionary can be slow to correct words.\"\"\"\n",
"\n",
" __slots__ = [\"_distance\", \"_word_frequency\", \"_tokenizer\", \"_case_sensitive\"]\n",
"\n",
" def __init__(\n",
" self,\n",
" language=\"en\",\n",
" local_dictionary=None,\n",
" distance=2,\n",
" tokenizer=None,\n",
" case_sensitive=False,\n",
" ):\n",
" self._distance = None\n",
" self.distance = distance # use the setter value check\n",
"\n",
" self._tokenizer = _parse_into_words\n",
" if tokenizer is not None:\n",
" self._tokenizer = tokenizer\n",
"\n",
" self._case_sensitive = case_sensitive if not language else False\n",
" self._word_frequency = WordFrequency(self._tokenizer, self._case_sensitive)\n",
"\n",
" if local_dictionary:\n",
" self._word_frequency.load_dictionary(local_dictionary)\n",
" elif language:\n",
" if not isinstance(language, list):\n",
" language = [language]\n",
" for lang in language:\n",
" filename = \"resources/{}.json.gz\".format(lang.lower())\n",
" try:\n",
" json_open = pkgutil.get_data(\"spellchecker\", filename)\n",
" except FileNotFoundError:\n",
" msg = (\n",
" \"The provided dictionary language ({}) does not \" \"exist!\"\n",
" ).format(lang.lower())\n",
" raise ValueError(msg)\n",
"\n",
" lang_dict = json.loads(gzip.decompress(json_open).decode(\"utf-8\"))\n",
" self._word_frequency.load_json(lang_dict)\n",
"\n",
" def __contains__(self, key):\n",
" \"\"\" setup easier known checks \"\"\"\n",
" key = ensure_unicode(key)\n",
" return key in self._word_frequency\n",
"\n",
" def __getitem__(self, key):\n",
" \"\"\" setup easier frequency checks \"\"\"\n",
" key = ensure_unicode(key)\n",
" return self._word_frequency[key]\n",
"\n",
" def __iter__(self):\n",
" \"\"\" setup iter support \"\"\"\n",
" for word in self._word_frequency.dictionary:\n",
" yield word\n",
"\n",
" @property\n",
" def word_frequency(self):\n",
" \"\"\" WordFrequency: An encapsulation of the word frequency `dictionary`\n",
"\n",
" Note:\n",
" Not settable \"\"\"\n",
" return self._word_frequency\n",
"\n",
" @property\n",
" def distance(self):\n",
" \"\"\" int: The maximum edit distance to calculate\n",
"\n",
" Note:\n",
" Valid values are 1 or 2; if an invalid value is passed, \\\n",
" defaults to 2 \"\"\"\n",
" return self._distance\n",
"\n",
" @distance.setter\n",
" def distance(self, val):\n",
" \"\"\" set the distance parameter \"\"\"\n",
" tmp = 2\n",
" try:\n",
" int(val)\n",
" if val > 0 and val <= 2:\n",
" tmp = val\n",
" except (ValueError, TypeError):\n",
" pass\n",
" self._distance = tmp\n",
"\n",
" def split_words(self, text):\n",
" \"\"\" Split text into individual `words` using either a simple whitespace\n",
" regex or the passed in tokenizer\n",
"\n",
" Args:\n",
" text (str): The text to split into individual words\n",
" Returns:\n",
" list(str): A listing of all words in the provided text \"\"\"\n",
" text = ensure_unicode(text)\n",
" return self._tokenizer(text)\n",
"\n",
" def export(self, filepath, encoding=\"utf-8\", gzipped=True):\n",
" \"\"\" Export the word frequency list for import in the future\n",
"\n",
" Args:\n",
" filepath (str): The filepath to the exported dictionary\n",
" encoding (str): The encoding of the resulting output\n",
" gzipped (bool): Whether to gzip the dictionary or not \"\"\"\n",
" data = json.dumps(self.word_frequency.dictionary, sort_keys=True)\n",
" write_file(filepath, encoding, gzipped, data)\n",
"\n",
" def word_usage_frequency(self, word, total_words=None):\n",
" \"\"\" Calculate the frequency to the `word` provided as seen across the\n",
" entire dictionary\n",
"\n",
" Args:\n",
" word (str): The word for which the word probability is \\\n",
" calculated\n",
" total_words (int): The total number of words to use in the \\\n",
" calculation; use the default for using the whole word \\\n",
" frequency\n",
" Returns:\n",
" float: The probability that the word is the correct word \"\"\"\n",
" if not total_words:\n",
" total_words = self._word_frequency.total_words\n",
" word = ensure_unicode(word)\n",
" return self._word_frequency.dictionary[word] / total_words\n",
"\n",
" @deprecated(\"Deprecated as of version 0.6.1; use word_usage_frequency instead\")\n",
" def word_probability(self, word, total_words=None):\n",
" \"\"\" Calculate the frequency to the `word` provided as seen across the\n",
" entire dictionary; function was a misnomar and is therefore\n",
" deprecated!\n",
"\n",
" Args:\n",
" word (str): The word for which the word probability is \\\n",
" calculated\n",
" total_words (int): The total number of words to use in the \\\n",
" calculation; use the default for using the whole word \\\n",
" frequency\n",
" Returns:\n",
" float: The probability that the word is the correct word\n",
" Note:\n",
" Deprecated as of version 0.6.1; use `word_usage_frequency` \\\n",
" instead\n",
" Note:\n",
" Will be removed in version 0.6.3 \"\"\"\n",
" return self.word_usage_frequency(word, total_words)\n",
"\n",
" def correction(self, word):\n",
" \"\"\" The most probable correct spelling for the word\n",
"\n",
" Args:\n",
" word (str): The word to correct\n",
" Returns:\n",
" str: The most likely candidate \"\"\"\n",
" word = ensure_unicode(word)\n",
" candidates = list(self.candidates(word))\n",
" return max(sorted(candidates), key=self.__getitem__)\n",
"\n",
" def candidates(self, word):\n",
" \"\"\" Generate possible spelling corrections for the provided word up to\n",
" an edit distance of two, if and only when needed\n",
"\n",
" Args:\n",
" word (str): The word for which to calculate candidate spellings\n",
" Returns:\n",
" set: The set of words that are possible candidates \"\"\"\n",
" word = ensure_unicode(word)\n",
" if self.known([word]): # short-cut if word is correct already\n",
" return {word}\n",
"\n",
" if not self._check_if_should_check(word):\n",
" return {word}\n",
"\n",
" # get edit distance 1...\n",
" res = [x for x in self.edit_distance_1(word)]\n",
" tmp = self.known(res)\n",
" if tmp:\n",
" return tmp\n",
" # if still not found, use the edit distance 1 to calc edit distance 2\n",
" if self._distance == 2:\n",
" tmp = self.known([x for x in self.__edit_distance_alt(res)])\n",
" if tmp:\n",
" return tmp\n",
" return {word}\n",
"\n",
" def known(self, words):\n",
" \"\"\" The subset of `words` that appear in the dictionary of words\n",
"\n",
" Args:\n",
" words (list): List of words to determine which are in the \\\n",
" corpus\n",
" Returns:\n",
" set: The set of those words from the input that are in the \\\n",
" corpus \"\"\"\n",
" words = [ensure_unicode(w) for w in words]\n",
" tmp = [w if self._case_sensitive else w.lower() for w in words]\n",
" return set(\n",
" w\n",
" for w in tmp\n",
" if w in self._word_frequency.dictionary and self._check_if_should_check(w)\n",
" )\n",
"\n",
" def unknown(self, words):\n",
" \"\"\" The subset of `words` that do not appear in the dictionary\n",
"\n",
" Args:\n",
" words (list): List of words to determine which are not in the \\\n",
" corpus\n",
" Returns:\n",
" set: The set of those words from the input that are not in \\\n",
" the corpus \"\"\"\n",
" words = [ensure_unicode(w) for w in words]\n",
" tmp = [\n",
" w if self._case_sensitive else w.lower()\n",
" for w in words\n",
" if self._check_if_should_check(w)\n",
" ]\n",
" return set(w for w in tmp if w not in self._word_frequency.dictionary)\n",
"\n",
" def edit_distance_1(self, word):\n",
" \"\"\" Compute all strings that are one edit away from `word` using only\n",
" the letters in the corpus\n",
"\n",
" Args:\n",
" word (str): The word for which to calculate the edit distance\n",
" Returns:\n",
" set: The set of strings that are edit distance one from the \\\n",
" provided word \"\"\"\n",
" word = (\n",
" ensure_unicode(word).lower()\n",
" if not self._case_sensitive\n",
" else ensure_unicode(word)\n",
" )\n",
" if self._check_if_should_check(word) is False:\n",
" return {word}\n",
" letters = self._word_frequency.letters\n",
" splits = [(word[:i], word[i:]) for i in range(len(word) + 1)]\n",
" deletes = [L + R[1:] for L, R in splits if R]\n",
" transposes = [L + R[1] + R[0] + R[2:] for L, R in splits if len(R) > 1]\n",
" replaces = [L + c + R[1:] for L, R in splits if R for c in letters]\n",
" inserts = [L + c + R for L, R in splits for c in letters]\n",
" return set(deletes + transposes + replaces + inserts)\n",
"\n",
" def edit_distance_2(self, word):\n",
" \"\"\" Compute all strings that are two edits away from `word` using only\n",
" the letters in the corpus\n",
"\n",
" Args:\n",
" word (str): The word for which to calculate the edit distance\n",
" Returns:\n",
" set: The set of strings that are edit distance two from the \\\n",
" provided word \"\"\"\n",
" word = (\n",
" ensure_unicode(word).lower()\n",
" if not self._case_sensitive\n",
" else ensure_unicode(word)\n",
" )\n",
" return [\n",
" e2 for e1 in self.edit_distance_1(word) for e2 in self.edit_distance_1(e1)\n",
" ]\n",
"\n",
" def __edit_distance_alt(self, words):\n",
" \"\"\" Compute all strings that are 1 edits away from all the words using\n",
" only the letters in the corpus\n",
"\n",
" Args:\n",
" words (list): The words for which to calculate the edit distance\n",
" Returns:\n",
" set: The set of strings that are edit distance two from the \\\n",
" provided words \"\"\"\n",
" words = [ensure_unicode(w) for w in words]\n",
" tmp = [\n",
" w if self._case_sensitive else w.lower()\n",
" for w in words\n",
" if self._check_if_should_check(w)\n",
" ]\n",
" return [e2 for e1 in tmp for e2 in self.known(self.edit_distance_1(e1))]\n",
"\n",
" def _check_if_should_check(self, word):\n",
" if len(word) == 1 and word in string.punctuation:\n",
" return False\n",
" if (\n",
" len(word) > self._word_frequency.longest_word_length + 3\n",
" ): # magic number to allow removal of up to 2 letters.\n",
" return False\n",
" try: # check if it is a number (int, float, etc)\n",
" float(word)\n",
" return False\n",
" except ValueError:\n",
" pass\n",
"\n",
" return True\n",
"\n",
"\n",
"class WordFrequency(object):\n",
" \"\"\" Store the `dictionary` as a word frequency list while allowing for\n",
" different methods to load the data and update over time \"\"\"\n",
"\n",
" __slots__ = [\n",
" \"_dictionary\",\n",
" \"_total_words\",\n",
" \"_unique_words\",\n",
" \"_letters\",\n",
" \"_tokenizer\",\n",
" \"_case_sensitive\",\n",
" \"_longest_word_length\",\n",
" ]\n",
"\n",
" def __init__(self, tokenizer=None, case_sensitive=False):\n",
" self._dictionary = Counter()\n",
" self._total_words = 0\n",
" self._unique_words = 0\n",
" self._letters = set()\n",
" self._case_sensitive = case_sensitive\n",
" self._longest_word_length = 0\n",
"\n",
" self._tokenizer = _parse_into_words\n",
" if tokenizer is not None:\n",
" self._tokenizer = tokenizer\n",
"\n",
" def __contains__(self, key):\n",
" \"\"\" turn on contains \"\"\"\n",
" key = ensure_unicode(key)\n",
" key = key if self._case_sensitive else key.lower()\n",
" return key in self._dictionary\n",
"\n",
" def __getitem__(self, key):\n",
" \"\"\" turn on getitem \"\"\"\n",
" key = ensure_unicode(key)\n",
" key = key if self._case_sensitive else key.lower()\n",
" return self._dictionary[key]\n",
"\n",
" def __iter__(self):\n",
" \"\"\" turn on iter support \"\"\"\n",
" for word in self._dictionary:\n",
" yield word\n",
"\n",
" def pop(self, key, default=None):\n",
" \"\"\" Remove the key and return the associated value or default if not\n",
" found\n",
"\n",
" Args:\n",
" key (str): The key to remove\n",
" default (obj): The value to return if key is not present \"\"\"\n",
" key = ensure_unicode(key)\n",
" key = key if self._case_sensitive else key.lower()\n",
" return self._dictionary.pop(key, default)\n",
"\n",
" @property\n",
" def dictionary(self):\n",
" \"\"\" Counter: A counting dictionary of all words in the corpus and the \\\n",
" number of times each has been seen\n",
"\n",
" Note:\n",
" Not settable \"\"\"\n",
" return self._dictionary\n",
"\n",
" @property\n",
" def total_words(self):\n",
" \"\"\" int: The sum of all word occurances in the word frequency \\\n",
" dictionary\n",
"\n",
" Note:\n",
" Not settable \"\"\"\n",
" return self._total_words\n",
"\n",
" @property\n",
" def unique_words(self):\n",
" \"\"\" int: The total number of unique words in the word frequency list\n",
"\n",
" Note:\n",
" Not settable \"\"\"\n",
" return self._unique_words\n",
"\n",
" @property\n",
" def letters(self):\n",
" \"\"\" str: The listing of all letters found within the corpus\n",
"\n",
" Note:\n",
" Not settable \"\"\"\n",
" return self._letters\n",
"\n",
" @property\n",
" def longest_word_length(self):\n",
" \"\"\" int: The longest word length in the dictionary\n",
"\n",
" Note:\n",
" Not settable \"\"\"\n",
" return self._longest_word_length\n",
"\n",
" def tokenize(self, text):\n",
" \"\"\" Tokenize the provided string object into individual words\n",
"\n",
" Args:\n",
" text (str): The string object to tokenize\n",
" Yields:\n",
" str: The next `word` in the tokenized string\n",
" Note:\n",
" This is the same as the `spellchecker.split_words()` unless \\\n",
" a tokenizer function was provided. \"\"\"\n",
" text = ensure_unicode(text)\n",
" for word in self._tokenizer(text):\n",
" yield word if self._case_sensitive else word.lower()\n",
"\n",
" def keys(self):\n",
" \"\"\" Iterator over the key of the dictionary\n",
"\n",
" Yields:\n",
" str: The next key in the dictionary\n",
" Note:\n",
" This is the same as `spellchecker.words()` \"\"\"\n",
" for key in self._dictionary.keys():\n",
" yield key\n",
"\n",
" def words(self):\n",
" \"\"\" Iterator over the words in the dictionary\n",
"\n",
" Yields:\n",
" str: The next word in the dictionary\n",
" Note:\n",
" This is the same as `spellchecker.keys()` \"\"\"\n",
" for word in self._dictionary.keys():\n",
" yield word\n",
"\n",
" def items(self):\n",
" \"\"\" Iterator over the words in the dictionary\n",
"\n",
" Yields:\n",
" str: The next word in the dictionary\n",
" int: The number of instances in the dictionary\n",
" Note:\n",
" This is the same as `dict.items()` \"\"\"\n",
" for word in self._dictionary.keys():\n",
" yield word, self._dictionary[word]\n",
"\n",
" def load_dictionary(self, filename, encoding=\"utf-8\"):\n",
" \"\"\" Load in a pre-built word frequency list\n",
"\n",
" Args:\n",
" filename (str): The filepath to the json (optionally gzipped) \\\n",
" file to be loaded\n",
" encoding (str): The encoding of the dictionary \"\"\"\n",
" with load_file(filename, encoding) as data:\n",
" data = data if self._case_sensitive else data.lower()\n",
" self._dictionary.update(json.loads(data))\n",
" self._update_dictionary()\n",
"\n",
" def load_json(self, data):\n",
" \"\"\" Load in a pre-built word frequency list\n",
"\n",
" Args:\n",
" data (dict): The dictionary to be loaded \"\"\"\n",
" self._dictionary.update(data)\n",
" self._update_dictionary()\n",
"\n",
" def load_text_file(self, filename, encoding=\"utf-8\", tokenizer=None):\n",
" \"\"\" Load in a text file from which to generate a word frequency list\n",
"\n",
" Args:\n",
" filename (str): The filepath to the text file to be loaded\n",
" encoding (str): The encoding of the text file\n",
" tokenizer (function): The function to use to tokenize a string\n",
" \"\"\"\n",
" with load_file(filename, encoding=encoding) as data:\n",
" self.load_text(data, tokenizer)\n",
"\n",
" def load_text(self, text, tokenizer=None):\n",
" \"\"\" Load text from which to generate a word frequency list\n",
"\n",
" Args:\n",
" text (str): The text to be loaded\n",
" tokenizer (function): The function to use to tokenize a string\n",
" \"\"\"\n",
" text = ensure_unicode(text)\n",
" if tokenizer:\n",
" words = [x if self._case_sensitive else x.lower() for x in tokenizer(text)]\n",
" else:\n",
" words = self.tokenize(text)\n",
"\n",
" self._dictionary.update(words)\n",
" self._update_dictionary()\n",
"\n",
" def load_words(self, words):\n",
" \"\"\" Load a list of words from which to generate a word frequency list\n",
"\n",
" Args:\n",
" words (list): The list of words to be loaded \"\"\"\n",
" words = [ensure_unicode(w) for w in words]\n",
" self._dictionary.update(\n",
" [word if self._case_sensitive else word.lower() for word in words]\n",
" )\n",
" self._update_dictionary()\n",
"\n",
" def add(self, word):\n",
" \"\"\" Add a word to the word frequency list\n",
"\n",
" Args:\n",
" word (str): The word to add \"\"\"\n",
" word = ensure_unicode(word)\n",
" self.load_words([word])\n",
"\n",
" def remove_words(self, words):\n",
" \"\"\" Remove a list of words from the word frequency list\n",
"\n",
" Args:\n",
" words (list): The list of words to remove \"\"\"\n",
" words = [ensure_unicode(w) for w in words]\n",
" for word in words:\n",
" self._dictionary.pop(word if self._case_sensitive else word.lower())\n",
" self._update_dictionary()\n",
"\n",
" def remove(self, word):\n",
" \"\"\" Remove a word from the word frequency list\n",
"\n",
" Args:\n",
" word (str): The word to remove \"\"\"\n",
" word = ensure_unicode(word)\n",
" self._dictionary.pop(word if self._case_sensitive else word.lower())\n",
" self._update_dictionary()\n",
"\n",
" def remove_by_threshold(self, threshold=5):\n",
" \"\"\" Remove all words at, or below, the provided threshold\n",
"\n",
" Args:\n",
" threshold (int): The threshold at which a word is to be \\\n",
" removed \"\"\"\n",
" keys = [x for x in self._dictionary.keys()]\n",
" for key in keys:\n",
" if self._dictionary[key] <= threshold:\n",
" self._dictionary.pop(key)\n",
" self._update_dictionary()\n",
"\n",
" def _update_dictionary(self):\n",
" \"\"\" Update the word frequency object \"\"\"\n",
" self._longest_word_length = 0\n",
" self._total_words = sum(self._dictionary.values())\n",
" self._unique_words = len(self._dictionary.keys())\n",
" self._letters = set()\n",
" for key in self._dictionary:\n",
" if len(key) > self._longest_word_length:\n",
" self._longest_word_length = len(key)\n",
" self._letters.update(key)\n",
" \n",
" \n",
"try:\n",
" with open(C1(),\n",
" mode=\"r+\") as f:\n",
" assert f.read() == \"hi!\\n\"\n",
"except:\n",
" pass\n"
]
},
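{
"cell_type": "markdown",
"metadata": {},
"source": [
"Side by side: with the same giant payload in the cell, the `Base` run comes in at about 160 µs per loop while the `C1` run takes about 7.1 ms. Assuming `Base.__fspath__` (defined earlier) skips the source inspection, the `inspect.getsource`-based check is adding roughly 7 ms per `open` call here, a ~45x slowdown.\n"
]
},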
{
"cell_type": "code",
"execution_count": 41,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"7.31 ms ± 164 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)\n"
]
}
],
"source": [
"%%timeit\n",
"\n",
"\n",
"\"\"\" Additional utility functions \"\"\"\n",
"import contextlib\n",
"import gzip\n",
"import functools\n",
"import re\n",
"import warnings\n",
"\n",
"\n",
"def fail_after(version):\n",
" \"\"\" Decorator to add to tests to ensure that they fail if a deprecated\n",
" feature is not removed before the specified version\n",
"\n",
" Args:\n",
" version (str): The version to check against \"\"\"\n",
"\n",
" def decorator_wrapper(func):\n",
" @functools.wraps(func)\n",
" def test_inner(*args, **kwargs):\n",
" if [int(x) for x in version.split(\".\")] <= [\n",
" int(x) for x in __version__.split(\".\")\n",
" ]:\n",
" msg = \"The function {} must be fully removed as it is depricated and must be removed by version {}\".format(\n",
" func.__name__, version\n",
" )\n",
" raise AssertionError(msg)\n",
" return func(*args, **kwargs)\n",
"\n",
" return test_inner\n",
"\n",
" return decorator_wrapper\n",
"\n",
"\n",
"def deprecated(message=\"\"):\n",
" \"\"\" A simplistic decorator to mark functions as deprecated. The function\n",
" will pass a message to the user on the first use of the function\n",
"\n",
" Args:\n",
" message (str): The message to display if the function is deprecated\n",
" \"\"\"\n",
"\n",
" def decorator_wrapper(func):\n",
" @functools.wraps(func)\n",
" def function_wrapper(*args, **kwargs):\n",
" func_name = func.__name__\n",
" if func_name not in function_wrapper.deprecated_items:\n",
" msg = \"Function {} is now deprecated! {}\".format(func.__name__, message)\n",
" warnings.warn(msg, category=DeprecationWarning, stacklevel=2)\n",
" function_wrapper.deprecated_items.add(func_name)\n",
"\n",
" return func(*args, **kwargs)\n",
"\n",
" # set this up the first time the decorator is called\n",
" function_wrapper.deprecated_items = set()\n",
"\n",
" return function_wrapper\n",
"\n",
" return decorator_wrapper\n",
"\n",
"\n",
"def ensure_unicode(_str, encoding=\"utf-8\"):\n",
" \"\"\" Simplify checking if passed in data are bytes or a string and decode\n",
" bytes into unicode.\n",
"\n",
" Args:\n",
" _str (str): The input string (possibly bytes)\n",
" encoding (str): The encoding to use if input is bytes\n",
" Returns:\n",
" str: The encoded string\n",
" \"\"\"\n",
" if isinstance(_str, bytes):\n",
" return _str.decode(encoding)\n",
" return _str\n",
"\n",
"\n",
"@contextlib.contextmanager\n",
"def __gzip_read(filename, mode=\"rb\", encoding=\"UTF-8\"):\n",
" \"\"\" Context manager to correctly handle the decoding of the output of \\\n",
" the gzip file\n",
"\n",
" Args:\n",
" filename (str): The filename to open\n",
" mode (str): The mode to read the data\n",
" encoding (str): The file encoding to use\n",
" Yields:\n",
" str: The string data from the gzip file read\n",
" \"\"\"\n",
" with gzip.open(filename, mode=mode, encoding=encoding) as fobj:\n",
" yield fobj.read()\n",
"\n",
"\n",
"@contextlib.contextmanager\n",
"def load_file(filename, encoding):\n",
" \"\"\" Context manager to handle opening a gzip or text file correctly and\n",
" reading all the data\n",
"\n",
" Args:\n",
" filename (str): The filename to open\n",
" encoding (str): The file encoding to use\n",
" Yields:\n",
" str: The string data from the file read\n",
" \"\"\"\n",
" if filename[-3:].lower() == \".gz\":\n",
" with __gzip_read(filename, mode=\"rt\", encoding=encoding) as data:\n",
" yield data\n",
" else:\n",
" with open(filename, mode=\"r\", encoding=encoding) as fobj:\n",
" yield fobj.read()\n",
"\n",
"\n",
"def write_file(filepath, encoding, gzipped, data):\n",
" \"\"\" Write the data to file either as a gzip file or text based on the\n",
" gzipped parameter\n",
"\n",
" Args:\n",
" filepath (str): The filename to open\n",
" encoding (str): The file encoding to use\n",
" gzipped (bool): Whether the file should be gzipped or not\n",
" data (str): The data to be written out\n",
" \"\"\"\n",
" if gzipped:\n",
" with gzip.open(filepath, \"wt\") as fobj:\n",
" fobj.write(data)\n",
" else:\n",
" with open(filepath, \"w\", encoding=encoding) as fobj:\n",
" fobj.write(data)\n",
"\n",
"\n",
"def _parse_into_words(text):\n",
" \"\"\" Parse the text into words; currently removes punctuation except for\n",
" apostrophies.\n",
"\n",
" Args:\n",
" text (str): The text to split into words\n",
" \"\"\"\n",
" # see: https://stackoverflow.com/a/12705513\n",
" return re.findall(r\"(\\w[\\w']*\\w|\\w)\", text)\n",
"\n",
"\n",
"\"\"\" SpellChecker Module; simple, intuitive spell checker based on the post by\n",
" Peter Norvig. See: https://norvig.com/spell-correct.html \"\"\"\n",
"import gzip\n",
"import json\n",
"import pkgutil\n",
"import string\n",
"from collections import Counter\n",
"\n",
"\n",
"class SpellChecker(object):\n",
" \"\"\" The SpellChecker class encapsulates the basics needed to accomplish a\n",
" simple spell checking algorithm. It is based on the work by\n",
" Peter Norvig (https://norvig.com/spell-correct.html)\n",
"\n",
" Args:\n",
" language (str): The language of the dictionary to load or None \\\n",
" for no dictionary. Supported languages are `en`, `es`, `de`, `fr`, \\\n",
" `pt` and `ru`. Defaults to `en`. A list of languages may be \\\n",
" provided and all languages will be loaded.\n",
" local_dictionary (str): The path to a locally stored word \\\n",
" frequency dictionary; if provided, no language will be loaded\n",
" distance (int): The edit distance to use. Defaults to 2.\n",
" case_sensitive (bool): Flag to use a case sensitive dictionary or \\\n",
" not, only available when not using a language dictionary.\n",
" Note:\n",
" Using a case sensitive dictionary can be slow to correct words.\"\"\"\n",
"\n",
" __slots__ = [\"_distance\", \"_word_frequency\", \"_tokenizer\", \"_case_sensitive\"]\n",
"\n",
" def __init__(\n",
" self,\n",
" language=\"en\",\n",
" local_dictionary=None,\n",
" distance=2,\n",
" tokenizer=None,\n",
" case_sensitive=False,\n",
" ):\n",
" self._distance = None\n",
" self.distance = distance # use the setter value check\n",
"\n",
" self._tokenizer = _parse_into_words\n",
" if tokenizer is not None:\n",
" self._tokenizer = tokenizer\n",
"\n",
" self._case_sensitive = case_sensitive if not language else False\n",
" self._word_frequency = WordFrequency(self._tokenizer, self._case_sensitive)\n",
"\n",
" if local_dictionary:\n",
" self._word_frequency.load_dictionary(local_dictionary)\n",
" elif language:\n",
" if not isinstance(language, list):\n",
" language = [language]\n",
" for lang in language:\n",
" filename = \"resources/{}.json.gz\".format(lang.lower())\n",
" try:\n",
" json_open = pkgutil.get_data(\"spellchecker\", filename)\n",
" except FileNotFoundError:\n",
" msg = (\n",
" \"The provided dictionary language ({}) does not \" \"exist!\"\n",
" ).format(lang.lower())\n",
" raise ValueError(msg)\n",
"\n",
" lang_dict = json.loads(gzip.decompress(json_open).decode(\"utf-8\"))\n",
" self._word_frequency.load_json(lang_dict)\n",
"\n",
" def __contains__(self, key):\n",
" \"\"\" setup easier known checks \"\"\"\n",
" key = ensure_unicode(key)\n",
" return key in self._word_frequency\n",
"\n",
" def __getitem__(self, key):\n",
" \"\"\" setup easier frequency checks \"\"\"\n",
" key = ensure_unicode(key)\n",
" return self._word_frequency[key]\n",
"\n",
" def __iter__(self):\n",
" \"\"\" setup iter support \"\"\"\n",
" for word in self._word_frequency.dictionary:\n",
" yield word\n",
"\n",
" @property\n",
" def word_frequency(self):\n",
" \"\"\" WordFrequency: An encapsulation of the word frequency `dictionary`\n",
"\n",
" Note:\n",
" Not settable \"\"\"\n",
" return self._word_frequency\n",
"\n",
" @property\n",
" def distance(self):\n",
" \"\"\" int: The maximum edit distance to calculate\n",
"\n",
" Note:\n",
" Valid values are 1 or 2; if an invalid value is passed, \\\n",
" defaults to 2 \"\"\"\n",
" return self._distance\n",
"\n",
" @distance.setter\n",
" def distance(self, val):\n",
" \"\"\" set the distance parameter \"\"\"\n",
" tmp = 2\n",
" try:\n",
" int(val)\n",
" if val > 0 and val <= 2:\n",
" tmp = val\n",
" except (ValueError, TypeError):\n",
" pass\n",
" self._distance = tmp\n",
"\n",
" def split_words(self, text):\n",
" \"\"\" Split text into individual `words` using either a simple whitespace\n",
" regex or the passed in tokenizer\n",
"\n",
" Args:\n",
" text (str): The text to split into individual words\n",
" Returns:\n",
" list(str): A listing of all words in the provided text \"\"\"\n",
" text = ensure_unicode(text)\n",
" return self._tokenizer(text)\n",
"\n",
" def export(self, filepath, encoding=\"utf-8\", gzipped=True):\n",
" \"\"\" Export the word frequency list for import in the future\n",
"\n",
" Args:\n",
" filepath (str): The filepath to the exported dictionary\n",
" encoding (str): The encoding of the resulting output\n",
" gzipped (bool): Whether to gzip the dictionary or not \"\"\"\n",
" data = json.dumps(self.word_frequency.dictionary, sort_keys=True)\n",
" write_file(filepath, encoding, gzipped, data)\n",
"\n",
" def word_usage_frequency(self, word, total_words=None):\n",
" \"\"\" Calculate the frequency to the `word` provided as seen across the\n",
" entire dictionary\n",
"\n",
" Args:\n",
" word (str): The word for which the word probability is \\\n",
" calculated\n",
" total_words (int): The total number of words to use in the \\\n",
" calculation; use the default for using the whole word \\\n",
" frequency\n",
" Returns:\n",
" float: The probability that the word is the correct word \"\"\"\n",
" if not total_words:\n",
" total_words = self._word_frequency.total_words\n",
" word = ensure_unicode(word)\n",
" return self._word_frequency.dictionary[word] / total_words\n",
"\n",
" @deprecated(\"Deprecated as of version 0.6.1; use word_usage_frequency instead\")\n",
" def word_probability(self, word, total_words=None):\n",
" \"\"\" Calculate the frequency to the `word` provided as seen across the\n",
" entire dictionary; function was a misnomar and is therefore\n",
" deprecated!\n",
"\n",
" Args:\n",
" word (str): The word for which the word probability is \\\n",
" calculated\n",
" total_words (int): The total number of words to use in the \\\n",
" calculation; use the default for using the whole word \\\n",
" frequency\n",
" Returns:\n",
" float: The probability that the word is the correct word\n",
" Note:\n",
" Deprecated as of version 0.6.1; use `word_usage_frequency` \\\n",
" instead\n",
" Note:\n",
" Will be removed in version 0.6.3 \"\"\"\n",
" return self.word_usage_frequency(word, total_words)\n",
"\n",
" def correction(self, word):\n",
" \"\"\" The most probable correct spelling for the word\n",
"\n",
" Args:\n",
" word (str): The word to correct\n",
" Returns:\n",
" str: The most likely candidate \"\"\"\n",
" word = ensure_unicode(word)\n",
" candidates = list(self.candidates(word))\n",
" return max(sorted(candidates), key=self.__getitem__)\n",
"\n",
" def candidates(self, word):\n",
" \"\"\" Generate possible spelling corrections for the provided word up to\n",
" an edit distance of two, if and only when needed\n",
"\n",
" Args:\n",
" word (str): The word for which to calculate candidate spellings\n",
" Returns:\n",
" set: The set of words that are possible candidates \"\"\"\n",
" word = ensure_unicode(word)\n",
" if self.known([word]): # short-cut if word is correct already\n",
" return {word}\n",
"\n",
" if not self._check_if_should_check(word):\n",
" return {word}\n",
"\n",
" # get edit distance 1...\n",
" res = [x for x in self.edit_distance_1(word)]\n",
" tmp = self.known(res)\n",
" if tmp:\n",
" return tmp\n",
" # if still not found, use the edit distance 1 to calc edit distance 2\n",
" if self._distance == 2:\n",
" tmp = self.known([x for x in self.__edit_distance_alt(res)])\n",
" if tmp:\n",
" return tmp\n",
" return {word}\n",
"\n",
" def known(self, words):\n",
" \"\"\" The subset of `words` that appear in the dictionary of words\n",
"\n",
" Args:\n",
" words (list): List of words to determine which are in the \\\n",
" corpus\n",
" Returns:\n",
" set: The set of those words from the input that are in the \\\n",
" corpus \"\"\"\n",
" words = [ensure_unicode(w) for w in words]\n",
" tmp = [w if self._case_sensitive else w.lower() for w in words]\n",
" return set(\n",
" w\n",
" for w in tmp\n",
" if w in self._word_frequency.dictionary and self._check_if_should_check(w)\n",
" )\n",
"\n",
" def unknown(self, words):\n",
" \"\"\" The subset of `words` that do not appear in the dictionary\n",
"\n",
" Args:\n",
" words (list): List of words to determine which are not in the \\\n",
" corpus\n",
" Returns:\n",
" set: The set of those words from the input that are not in \\\n",
" the corpus \"\"\"\n",
" words = [ensure_unicode(w) for w in words]\n",
" tmp = [\n",
" w if self._case_sensitive else w.lower()\n",
" for w in words\n",
" if self._check_if_should_check(w)\n",
" ]\n",
" return set(w for w in tmp if w not in self._word_frequency.dictionary)\n",
"\n",
" def edit_distance_1(self, word):\n",
" \"\"\" Compute all strings that are one edit away from `word` using only\n",
" the letters in the corpus\n",
"\n",
" Args:\n",
" word (str): The word for which to calculate the edit distance\n",
" Returns:\n",
" set: The set of strings that are edit distance one from the \\\n",
" provided word \"\"\"\n",
" word = (\n",
" ensure_unicode(word).lower()\n",
" if not self._case_sensitive\n",
" else ensure_unicode(word)\n",
" )\n",
" if self._check_if_should_check(word) is False:\n",
" return {word}\n",
" letters = self._word_frequency.letters\n",
" splits = [(word[:i], word[i:]) for i in range(len(word) + 1)]\n",
" deletes = [L + R[1:] for L, R in splits if R]\n",
" transposes = [L + R[1] + R[0] + R[2:] for L, R in splits if len(R) > 1]\n",
" replaces = [L + c + R[1:] for L, R in splits if R for c in letters]\n",
" inserts = [L + c + R for L, R in splits for c in letters]\n",
" return set(deletes + transposes + replaces + inserts)\n",
"\n",
" def edit_distance_2(self, word):\n",
" \"\"\" Compute all strings that are two edits away from `word` using only\n",
" the letters in the corpus\n",
"\n",
" Args:\n",
" word (str): The word for which to calculate the edit distance\n",
" Returns:\n",
" set: The set of strings that are edit distance two from the \\\n",
" provided word \"\"\"\n",
" word = (\n",
" ensure_unicode(word).lower()\n",
" if not self._case_sensitive\n",
" else ensure_unicode(word)\n",
" )\n",
" return [\n",
" e2 for e1 in self.edit_distance_1(word) for e2 in self.edit_distance_1(e1)\n",
" ]\n",
"\n",
" def __edit_distance_alt(self, words):\n",
" \"\"\" Compute all strings that are 1 edits away from all the words using\n",
" only the letters in the corpus\n",
"\n",
" Args:\n",
" words (list): The words for which to calculate the edit distance\n",
" Returns:\n",
" set: The set of strings that are edit distance two from the \\\n",
" provided words \"\"\"\n",
" words = [ensure_unicode(w) for w in words]\n",
" tmp = [\n",
" w if self._case_sensitive else w.lower()\n",
" for w in words\n",
" if self._check_if_should_check(w)\n",
" ]\n",
" return [e2 for e1 in tmp for e2 in self.known(self.edit_distance_1(e1))]\n",
"\n",
" def _check_if_should_check(self, word):\n",
" if len(word) == 1 and word in string.punctuation:\n",
" return False\n",
" if (\n",
" len(word) > self._word_frequency.longest_word_length + 3\n",
" ): # magic number to allow removal of up to 2 letters.\n",
" return False\n",
" try: # check if it is a number (int, float, etc)\n",
" float(word)\n",
" return False\n",
" except ValueError:\n",
" pass\n",
"\n",
" return True\n",
"\n",
"\n",
"class WordFrequency(object):\n",
" \"\"\" Store the `dictionary` as a word frequency list while allowing for\n",
" different methods to load the data and update over time \"\"\"\n",
"\n",
" __slots__ = [\n",
" \"_dictionary\",\n",
" \"_total_words\",\n",
" \"_unique_words\",\n",
" \"_letters\",\n",
" \"_tokenizer\",\n",
" \"_case_sensitive\",\n",
" \"_longest_word_length\",\n",
" ]\n",
"\n",
" def __init__(self, tokenizer=None, case_sensitive=False):\n",
" self._dictionary = Counter()\n",
" self._total_words = 0\n",
" self._unique_words = 0\n",
" self._letters = set()\n",
" self._case_sensitive = case_sensitive\n",
" self._longest_word_length = 0\n",
"\n",
" self._tokenizer = _parse_into_words\n",
" if tokenizer is not None:\n",
" self._tokenizer = tokenizer\n",
"\n",
" def __contains__(self, key):\n",
" \"\"\" turn on contains \"\"\"\n",
" key = ensure_unicode(key)\n",
" key = key if self._case_sensitive else key.lower()\n",
" return key in self._dictionary\n",
"\n",
" def __getitem__(self, key):\n",
" \"\"\" turn on getitem \"\"\"\n",
" key = ensure_unicode(key)\n",
" key = key if self._case_sensitive else key.lower()\n",
" return self._dictionary[key]\n",
"\n",
" def __iter__(self):\n",
" \"\"\" turn on iter support \"\"\"\n",
" for word in self._dictionary:\n",
" yield word\n",
"\n",
" def pop(self, key, default=None):\n",
" \"\"\" Remove the key and return the associated value or default if not\n",
" found\n",
"\n",
" Args:\n",
" key (str): The key to remove\n",
" default (obj): The value to return if key is not present \"\"\"\n",
" key = ensure_unicode(key)\n",
" key = key if self._case_sensitive else key.lower()\n",
" return self._dictionary.pop(key, default)\n",
"\n",
" @property\n",
" def dictionary(self):\n",
" \"\"\" Counter: A counting dictionary of all words in the corpus and the \\\n",
" number of times each has been seen\n",
"\n",
" Note:\n",
" Not settable \"\"\"\n",
" return self._dictionary\n",
"\n",
" @property\n",
" def total_words(self):\n",
" \"\"\" int: The sum of all word occurances in the word frequency \\\n",
" dictionary\n",
"\n",
" Note:\n",
" Not settable \"\"\"\n",
" return self._total_words\n",
"\n",
" @property\n",
" def unique_words(self):\n",
" \"\"\" int: The total number of unique words in the word frequency list\n",
"\n",
" Note:\n",
" Not settable \"\"\"\n",
" return self._unique_words\n",
"\n",
" @property\n",
" def letters(self):\n",
" \"\"\" str: The listing of all letters found within the corpus\n",
"\n",
" Note:\n",
" Not settable \"\"\"\n",
" return self._letters\n",
"\n",
" @property\n",
" def longest_word_length(self):\n",
" \"\"\" int: The longest word length in the dictionary\n",
"\n",
" Note:\n",
" Not settable \"\"\"\n",
" return self._longest_word_length\n",
"\n",
" def tokenize(self, text):\n",
" \"\"\" Tokenize the provided string object into individual words\n",
"\n",
" Args:\n",
" text (str): The string object to tokenize\n",
" Yields:\n",
" str: The next `word` in the tokenized string\n",
" Note:\n",
" This is the same as the `spellchecker.split_words()` unless \\\n",
" a tokenizer function was provided. \"\"\"\n",
" text = ensure_unicode(text)\n",
" for word in self._tokenizer(text):\n",
" yield word if self._case_sensitive else word.lower()\n",
"\n",
" def keys(self):\n",
" \"\"\" Iterator over the key of the dictionary\n",
"\n",
" Yields:\n",
" str: The next key in the dictionary\n",
" Note:\n",
" This is the same as `spellchecker.words()` \"\"\"\n",
" for key in self._dictionary.keys():\n",
" yield key\n",
"\n",
" def words(self):\n",
" \"\"\" Iterator over the words in the dictionary\n",
"\n",
" Yields:\n",
" str: The next word in the dictionary\n",
" Note:\n",
" This is the same as `spellchecker.keys()` \"\"\"\n",
" for word in self._dictionary.keys():\n",
" yield word\n",
"\n",
" def items(self):\n",
" \"\"\" Iterator over the words in the dictionary\n",
"\n",
" Yields:\n",
" str: The next word in the dictionary\n",
" int: The number of instances in the dictionary\n",
" Note:\n",
" This is the same as `dict.items()` \"\"\"\n",
" for word in self._dictionary.keys():\n",
" yield word, self._dictionary[word]\n",
"\n",
" def load_dictionary(self, filename, encoding=\"utf-8\"):\n",
" \"\"\" Load in a pre-built word frequency list\n",
"\n",
" Args:\n",
" filename (str): The filepath to the json (optionally gzipped) \\\n",
" file to be loaded\n",
" encoding (str): The encoding of the dictionary \"\"\"\n",
" with load_file(filename, encoding) as data:\n",
" data = data if self._case_sensitive else data.lower()\n",
" self._dictionary.update(json.loads(data))\n",
" self._update_dictionary()\n",
"\n",
" def load_json(self, data):\n",
" \"\"\" Load in a pre-built word frequency list\n",
"\n",
" Args:\n",
" data (dict): The dictionary to be loaded \"\"\"\n",
" self._dictionary.update(data)\n",
" self._update_dictionary()\n",
"\n",
" def load_text_file(self, filename, encoding=\"utf-8\", tokenizer=None):\n",
" \"\"\" Load in a text file from which to generate a word frequency list\n",
"\n",
" Args:\n",
" filename (str): The filepath to the text file to be loaded\n",
" encoding (str): The encoding of the text file\n",
" tokenizer (function): The function to use to tokenize a string\n",
" \"\"\"\n",
" with load_file(filename, encoding=encoding) as data:\n",
" self.load_text(data, tokenizer)\n",
"\n",
" def load_text(self, text, tokenizer=None):\n",
" \"\"\" Load text from which to generate a word frequency list\n",
"\n",
" Args:\n",
" text (str): The text to be loaded\n",
" tokenizer (function): The function to use to tokenize a string\n",
" \"\"\"\n",
" text = ensure_unicode(text)\n",
" if tokenizer:\n",
" words = [x if self._case_sensitive else x.lower() for x in tokenizer(text)]\n",
" else:\n",
" words = self.tokenize(text)\n",
"\n",
" self._dictionary.update(words)\n",
" self._update_dictionary()\n",
"\n",
" def load_words(self, words):\n",
" \"\"\" Load a list of words from which to generate a word frequency list\n",
"\n",
" Args:\n",
" words (list): The list of words to be loaded \"\"\"\n",
" words = [ensure_unicode(w) for w in words]\n",
" self._dictionary.update(\n",
" [word if self._case_sensitive else word.lower() for word in words]\n",
" )\n",
" self._update_dictionary()\n",
"\n",
" def add(self, word):\n",
" \"\"\" Add a word to the word frequency list\n",
"\n",
" Args:\n",
" word (str): The word to add \"\"\"\n",
" word = ensure_unicode(word)\n",
" self.load_words([word])\n",
"\n",
" def remove_words(self, words):\n",
" \"\"\" Remove a list of words from the word frequency list\n",
"\n",
" Args:\n",
" words (list): The list of words to remove \"\"\"\n",
" words = [ensure_unicode(w) for w in words]\n",
" for word in words:\n",
" self._dictionary.pop(word if self._case_sensitive else word.lower())\n",
" self._update_dictionary()\n",
"\n",
" def remove(self, word):\n",
" \"\"\" Remove a word from the word frequency list\n",
"\n",
" Args:\n",
" word (str): The word to remove \"\"\"\n",
" word = ensure_unicode(word)\n",
" self._dictionary.pop(word if self._case_sensitive else word.lower())\n",
" self._update_dictionary()\n",
"\n",
" def remove_by_threshold(self, threshold=5):\n",
" \"\"\" Remove all words at, or below, the provided threshold\n",
"\n",
" Args:\n",
" threshold (int): The threshold at which a word is to be \\\n",
" removed \"\"\"\n",
" keys = [x for x in self._dictionary.keys()]\n",
" for key in keys:\n",
" if self._dictionary[key] <= threshold:\n",
" self._dictionary.pop(key)\n",
" self._update_dictionary()\n",
"\n",
" def _update_dictionary(self):\n",
" \"\"\" Update the word frequency object \"\"\"\n",
" self._longest_word_length = 0\n",
" self._total_words = sum(self._dictionary.values())\n",
" self._unique_words = len(self._dictionary.keys())\n",
" self._letters = set()\n",
" for key in self._dictionary:\n",
" if len(key) > self._longest_word_length:\n",
" self._longest_word_length = len(key)\n",
" self._letters.update(key)\n",
"\n",
"\n",
"try:\n",
" with open(C2(),\n",
" mode=\"r+\") as f:\n",
" assert f.read() == \"hi!\\n\"\n",
"except:\n",
" pass\n"
]
},
{
"cell_type": "code",
"execution_count": 42,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"936 µs ± 12.9 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)\n"
]
}
],
"source": [
"%%timeit\n",
"\n",
"\n",
"\"\"\" Additional utility functions \"\"\"\n",
"import contextlib\n",
"import gzip\n",
"import functools\n",
"import re\n",
"import warnings\n",
"\n",
"\n",
"def fail_after(version):\n",
" \"\"\" Decorator to add to tests to ensure that they fail if a deprecated\n",
" feature is not removed before the specified version\n",
"\n",
" Args:\n",
" version (str): The version to check against \"\"\"\n",
"\n",
" def decorator_wrapper(func):\n",
" @functools.wraps(func)\n",
" def test_inner(*args, **kwargs):\n",
" if [int(x) for x in version.split(\".\")] <= [\n",
" int(x) for x in __version__.split(\".\")\n",
" ]:\n",
" msg = \"The function {} must be fully removed as it is depricated and must be removed by version {}\".format(\n",
" func.__name__, version\n",
" )\n",
" raise AssertionError(msg)\n",
" return func(*args, **kwargs)\n",
"\n",
" return test_inner\n",
"\n",
" return decorator_wrapper\n",
"\n",
"\n",
"def deprecated(message=\"\"):\n",
" \"\"\" A simplistic decorator to mark functions as deprecated. The function\n",
" will pass a message to the user on the first use of the function\n",
"\n",
" Args:\n",
" message (str): The message to display if the function is deprecated\n",
" \"\"\"\n",
"\n",
" def decorator_wrapper(func):\n",
" @functools.wraps(func)\n",
" def function_wrapper(*args, **kwargs):\n",
" func_name = func.__name__\n",
" if func_name not in function_wrapper.deprecated_items:\n",
" msg = \"Function {} is now deprecated! {}\".format(func.__name__, message)\n",
" warnings.warn(msg, category=DeprecationWarning, stacklevel=2)\n",
" function_wrapper.deprecated_items.add(func_name)\n",
"\n",
" return func(*args, **kwargs)\n",
"\n",
" # set this up the first time the decorator is called\n",
" function_wrapper.deprecated_items = set()\n",
"\n",
" return function_wrapper\n",
"\n",
" return decorator_wrapper\n",
"\n",
"\n",
"def ensure_unicode(_str, encoding=\"utf-8\"):\n",
" \"\"\" Simplify checking if passed in data are bytes or a string and decode\n",
" bytes into unicode.\n",
"\n",
" Args:\n",
" _str (str): The input string (possibly bytes)\n",
" encoding (str): The encoding to use if input is bytes\n",
" Returns:\n",
" str: The encoded string\n",
" \"\"\"\n",
" if isinstance(_str, bytes):\n",
" return _str.decode(encoding)\n",
" return _str\n",
"\n",
"\n",
"@contextlib.contextmanager\n",
"def __gzip_read(filename, mode=\"rb\", encoding=\"UTF-8\"):\n",
" \"\"\" Context manager to correctly handle the decoding of the output of \\\n",
" the gzip file\n",
"\n",
" Args:\n",
" filename (str): The filename to open\n",
" mode (str): The mode to read the data\n",
" encoding (str): The file encoding to use\n",
" Yields:\n",
" str: The string data from the gzip file read\n",
" \"\"\"\n",
" with gzip.open(filename, mode=mode, encoding=encoding) as fobj:\n",
" yield fobj.read()\n",
"\n",
"\n",
"@contextlib.contextmanager\n",
"def load_file(filename, encoding):\n",
" \"\"\" Context manager to handle opening a gzip or text file correctly and\n",
" reading all the data\n",
"\n",
" Args:\n",
" filename (str): The filename to open\n",
" encoding (str): The file encoding to use\n",
" Yields:\n",
" str: The string data from the file read\n",
" \"\"\"\n",
" if filename[-3:].lower() == \".gz\":\n",
" with __gzip_read(filename, mode=\"rt\", encoding=encoding) as data:\n",
" yield data\n",
" else:\n",
" with open(filename, mode=\"r\", encoding=encoding) as fobj:\n",
" yield fobj.read()\n",
"\n",
"\n",
"def write_file(filepath, encoding, gzipped, data):\n",
" \"\"\" Write the data to file either as a gzip file or text based on the\n",
" gzipped parameter\n",
"\n",
" Args:\n",
" filepath (str): The filename to open\n",
" encoding (str): The file encoding to use\n",
" gzipped (bool): Whether the file should be gzipped or not\n",
" data (str): The data to be written out\n",
" \"\"\"\n",
" if gzipped:\n",
" with gzip.open(filepath, \"wt\") as fobj:\n",
" fobj.write(data)\n",
" else:\n",
" with open(filepath, \"w\", encoding=encoding) as fobj:\n",
" fobj.write(data)\n",
"\n",
"\n",
"def _parse_into_words(text):\n",
" \"\"\" Parse the text into words; currently removes punctuation except for\n",
" apostrophies.\n",
"\n",
" Args:\n",
" text (str): The text to split into words\n",
" \"\"\"\n",
" # see: https://stackoverflow.com/a/12705513\n",
" return re.findall(r\"(\\w[\\w']*\\w|\\w)\", text)\n",
"\n",
"\n",
"\"\"\" SpellChecker Module; simple, intuitive spell checker based on the post by\n",
" Peter Norvig. See: https://norvig.com/spell-correct.html \"\"\"\n",
"import gzip\n",
"import json\n",
"import pkgutil\n",
"import string\n",
"from collections import Counter\n",
"\n",
"\n",
"class SpellChecker(object):\n",
" \"\"\" The SpellChecker class encapsulates the basics needed to accomplish a\n",
" simple spell checking algorithm. It is based on the work by\n",
" Peter Norvig (https://norvig.com/spell-correct.html)\n",
"\n",
" Args:\n",
" language (str): The language of the dictionary to load or None \\\n",
" for no dictionary. Supported languages are `en`, `es`, `de`, `fr`, \\\n",
" `pt` and `ru`. Defaults to `en`. A list of languages may be \\\n",
" provided and all languages will be loaded.\n",
" local_dictionary (str): The path to a locally stored word \\\n",
" frequency dictionary; if provided, no language will be loaded\n",
" distance (int): The edit distance to use. Defaults to 2.\n",
" case_sensitive (bool): Flag to use a case sensitive dictionary or \\\n",
" not, only available when not using a language dictionary.\n",
" Note:\n",
" Using a case sensitive dictionary can be slow to correct words.\"\"\"\n",
"\n",
" __slots__ = [\"_distance\", \"_word_frequency\", \"_tokenizer\", \"_case_sensitive\"]\n",
"\n",
" def __init__(\n",
" self,\n",
" language=\"en\",\n",
" local_dictionary=None,\n",
" distance=2,\n",
" tokenizer=None,\n",
" case_sensitive=False,\n",
" ):\n",
" self._distance = None\n",
" self.distance = distance # use the setter value check\n",
"\n",
" self._tokenizer = _parse_into_words\n",
" if tokenizer is not None:\n",
" self._tokenizer = tokenizer\n",
"\n",
" self._case_sensitive = case_sensitive if not language else False\n",
" self._word_frequency = WordFrequency(self._tokenizer, self._case_sensitive)\n",
"\n",
" if local_dictionary:\n",
" self._word_frequency.load_dictionary(local_dictionary)\n",
" elif language:\n",
" if not isinstance(language, list):\n",
" language = [language]\n",
" for lang in language:\n",
" filename = \"resources/{}.json.gz\".format(lang.lower())\n",
" try:\n",
" json_open = pkgutil.get_data(\"spellchecker\", filename)\n",
" except FileNotFoundError:\n",
" msg = (\n",
" \"The provided dictionary language ({}) does not \" \"exist!\"\n",
" ).format(lang.lower())\n",
" raise ValueError(msg)\n",
"\n",
" lang_dict = json.loads(gzip.decompress(json_open).decode(\"utf-8\"))\n",
" self._word_frequency.load_json(lang_dict)\n",
"\n",
" def __contains__(self, key):\n",
" \"\"\" setup easier known checks \"\"\"\n",
" key = ensure_unicode(key)\n",
" return key in self._word_frequency\n",
"\n",
" def __getitem__(self, key):\n",
" \"\"\" setup easier frequency checks \"\"\"\n",
" key = ensure_unicode(key)\n",
" return self._word_frequency[key]\n",
"\n",
" def __iter__(self):\n",
" \"\"\" setup iter support \"\"\"\n",
" for word in self._word_frequency.dictionary:\n",
" yield word\n",
"\n",
" @property\n",
" def word_frequency(self):\n",
" \"\"\" WordFrequency: An encapsulation of the word frequency `dictionary`\n",
"\n",
" Note:\n",
" Not settable \"\"\"\n",
" return self._word_frequency\n",
"\n",
" @property\n",
" def distance(self):\n",
" \"\"\" int: The maximum edit distance to calculate\n",
"\n",
" Note:\n",
" Valid values are 1 or 2; if an invalid value is passed, \\\n",
" defaults to 2 \"\"\"\n",
" return self._distance\n",
"\n",
" @distance.setter\n",
" def distance(self, val):\n",
" \"\"\" set the distance parameter \"\"\"\n",
" tmp = 2\n",
" try:\n",
" int(val)\n",
" if val > 0 and val <= 2:\n",
" tmp = val\n",
" except (ValueError, TypeError):\n",
" pass\n",
" self._distance = tmp\n",
"\n",
" def split_words(self, text):\n",
" \"\"\" Split text into individual `words` using either a simple whitespace\n",
" regex or the passed in tokenizer\n",
"\n",
" Args:\n",
" text (str): The text to split into individual words\n",
" Returns:\n",
" list(str): A listing of all words in the provided text \"\"\"\n",
" text = ensure_unicode(text)\n",
" return self._tokenizer(text)\n",
"\n",
" def export(self, filepath, encoding=\"utf-8\", gzipped=True):\n",
" \"\"\" Export the word frequency list for import in the future\n",
"\n",
" Args:\n",
" filepath (str): The filepath to the exported dictionary\n",
" encoding (str): The encoding of the resulting output\n",
" gzipped (bool): Whether to gzip the dictionary or not \"\"\"\n",
" data = json.dumps(self.word_frequency.dictionary, sort_keys=True)\n",
" write_file(filepath, encoding, gzipped, data)\n",
"\n",
" def word_usage_frequency(self, word, total_words=None):\n",
" \"\"\" Calculate the frequency to the `word` provided as seen across the\n",
" entire dictionary\n",
"\n",
" Args:\n",
" word (str): The word for which the word probability is \\\n",
" calculated\n",
" total_words (int): The total number of words to use in the \\\n",
" calculation; use the default for using the whole word \\\n",
" frequency\n",
" Returns:\n",
" float: The probability that the word is the correct word \"\"\"\n",
" if not total_words:\n",
" total_words = self._word_frequency.total_words\n",
" word = ensure_unicode(word)\n",
" return self._word_frequency.dictionary[word] / total_words\n",
"\n",
" @deprecated(\"Deprecated as of version 0.6.1; use word_usage_frequency instead\")\n",
" def word_probability(self, word, total_words=None):\n",
" \"\"\" Calculate the frequency to the `word` provided as seen across the\n",
" entire dictionary; function was a misnomar and is therefore\n",
" deprecated!\n",
"\n",
" Args:\n",
" word (str): The word for which the word probability is \\\n",
" calculated\n",
" total_words (int): The total number of words to use in the \\\n",
" calculation; use the default for using the whole word \\\n",
" frequency\n",
" Returns:\n",
" float: The probability that the word is the correct word\n",
" Note:\n",
" Deprecated as of version 0.6.1; use `word_usage_frequency` \\\n",
" instead\n",
" Note:\n",
" Will be removed in version 0.6.3 \"\"\"\n",
" return self.word_usage_frequency(word, total_words)\n",
"\n",
" def correction(self, word):\n",
" \"\"\" The most probable correct spelling for the word\n",
"\n",
" Args:\n",
" word (str): The word to correct\n",
" Returns:\n",
" str: The most likely candidate \"\"\"\n",
" word = ensure_unicode(word)\n",
" candidates = list(self.candidates(word))\n",
" return max(sorted(candidates), key=self.__getitem__)\n",
"\n",
" def candidates(self, word):\n",
" \"\"\" Generate possible spelling corrections for the provided word up to\n",
" an edit distance of two, if and only when needed\n",
"\n",
" Args:\n",
" word (str): The word for which to calculate candidate spellings\n",
" Returns:\n",
" set: The set of words that are possible candidates \"\"\"\n",
" word = ensure_unicode(word)\n",
" if self.known([word]): # short-cut if word is correct already\n",
" return {word}\n",
"\n",
" if not self._check_if_should_check(word):\n",
" return {word}\n",
"\n",
" # get edit distance 1...\n",
" res = [x for x in self.edit_distance_1(word)]\n",
" tmp = self.known(res)\n",
" if tmp:\n",
" return tmp\n",
" # if still not found, use the edit distance 1 to calc edit distance 2\n",
" if self._distance == 2:\n",
" tmp = self.known([x for x in self.__edit_distance_alt(res)])\n",
" if tmp:\n",
" return tmp\n",
" return {word}\n",
"\n",
" def known(self, words):\n",
" \"\"\" The subset of `words` that appear in the dictionary of words\n",
"\n",
" Args:\n",
" words (list): List of words to determine which are in the \\\n",
" corpus\n",
" Returns:\n",
" set: The set of those words from the input that are in the \\\n",
" corpus \"\"\"\n",
" words = [ensure_unicode(w) for w in words]\n",
" tmp = [w if self._case_sensitive else w.lower() for w in words]\n",
" return set(\n",
" w\n",
" for w in tmp\n",
" if w in self._word_frequency.dictionary and self._check_if_should_check(w)\n",
" )\n",
"\n",
" def unknown(self, words):\n",
" \"\"\" The subset of `words` that do not appear in the dictionary\n",
"\n",
" Args:\n",
" words (list): List of words to determine which are not in the \\\n",
" corpus\n",
" Returns:\n",
" set: The set of those words from the input that are not in \\\n",
" the corpus \"\"\"\n",
" words = [ensure_unicode(w) for w in words]\n",
" tmp = [\n",
" w if self._case_sensitive else w.lower()\n",
" for w in words\n",
" if self._check_if_should_check(w)\n",
" ]\n",
" return set(w for w in tmp if w not in self._word_frequency.dictionary)\n",
"\n",
" def edit_distance_1(self, word):\n",
" \"\"\" Compute all strings that are one edit away from `word` using only\n",
" the letters in the corpus\n",
"\n",
" Args:\n",
" word (str): The word for which to calculate the edit distance\n",
" Returns:\n",
" set: The set of strings that are edit distance one from the \\\n",
" provided word \"\"\"\n",
" word = (\n",
" ensure_unicode(word).lower()\n",
" if not self._case_sensitive\n",
" else ensure_unicode(word)\n",
" )\n",
" if self._check_if_should_check(word) is False:\n",
" return {word}\n",
" letters = self._word_frequency.letters\n",
" splits = [(word[:i], word[i:]) for i in range(len(word) + 1)]\n",
" deletes = [L + R[1:] for L, R in splits if R]\n",
" transposes = [L + R[1] + R[0] + R[2:] for L, R in splits if len(R) > 1]\n",
" replaces = [L + c + R[1:] for L, R in splits if R for c in letters]\n",
" inserts = [L + c + R for L, R in splits for c in letters]\n",
" return set(deletes + transposes + replaces + inserts)\n",
"\n",
" def edit_distance_2(self, word):\n",
" \"\"\" Compute all strings that are two edits away from `word` using only\n",
" the letters in the corpus\n",
"\n",
" Args:\n",
" word (str): The word for which to calculate the edit distance\n",
" Returns:\n",
" set: The set of strings that are edit distance two from the \\\n",
" provided word \"\"\"\n",
" word = (\n",
" ensure_unicode(word).lower()\n",
" if not self._case_sensitive\n",
" else ensure_unicode(word)\n",
" )\n",
" return [\n",
" e2 for e1 in self.edit_distance_1(word) for e2 in self.edit_distance_1(e1)\n",
" ]\n",
"\n",
" def __edit_distance_alt(self, words):\n",
" \"\"\" Compute all strings that are 1 edits away from all the words using\n",
" only the letters in the corpus\n",
"\n",
" Args:\n",
" words (list): The words for which to calculate the edit distance\n",
" Returns:\n",
" set: The set of strings that are edit distance two from the \\\n",
" provided words \"\"\"\n",
" words = [ensure_unicode(w) for w in words]\n",
" tmp = [\n",
" w if self._case_sensitive else w.lower()\n",
" for w in words\n",
" if self._check_if_should_check(w)\n",
" ]\n",
" return [e2 for e1 in tmp for e2 in self.known(self.edit_distance_1(e1))]\n",
"\n",
" def _check_if_should_check(self, word):\n",
" if len(word) == 1 and word in string.punctuation:\n",
" return False\n",
" if (\n",
" len(word) > self._word_frequency.longest_word_length + 3\n",
" ): # magic number to allow removal of up to 2 letters.\n",
" return False\n",
" try: # check if it is a number (int, float, etc)\n",
" float(word)\n",
" return False\n",
" except ValueError:\n",
" pass\n",
"\n",
" return True\n",
"\n",
"\n",
"class WordFrequency(object):\n",
" \"\"\" Store the `dictionary` as a word frequency list while allowing for\n",
" different methods to load the data and update over time \"\"\"\n",
"\n",
" __slots__ = [\n",
" \"_dictionary\",\n",
" \"_total_words\",\n",
" \"_unique_words\",\n",
" \"_letters\",\n",
" \"_tokenizer\",\n",
" \"_case_sensitive\",\n",
" \"_longest_word_length\",\n",
" ]\n",
"\n",
" def __init__(self, tokenizer=None, case_sensitive=False):\n",
" self._dictionary = Counter()\n",
" self._total_words = 0\n",
" self._unique_words = 0\n",
" self._letters = set()\n",
" self._case_sensitive = case_sensitive\n",
" self._longest_word_length = 0\n",
"\n",
" self._tokenizer = _parse_into_words\n",
" if tokenizer is not None:\n",
" self._tokenizer = tokenizer\n",
"\n",
" def __contains__(self, key):\n",
" \"\"\" turn on contains \"\"\"\n",
" key = ensure_unicode(key)\n",
" key = key if self._case_sensitive else key.lower()\n",
" return key in self._dictionary\n",
"\n",
" def __getitem__(self, key):\n",
" \"\"\" turn on getitem \"\"\"\n",
" key = ensure_unicode(key)\n",
" key = key if self._case_sensitive else key.lower()\n",
" return self._dictionary[key]\n",
"\n",
" def __iter__(self):\n",
" \"\"\" turn on iter support \"\"\"\n",
" for word in self._dictionary:\n",
" yield word\n",
"\n",
" def pop(self, key, default=None):\n",
" \"\"\" Remove the key and return the associated value or default if not\n",
" found\n",
"\n",
" Args:\n",
" key (str): The key to remove\n",
" default (obj): The value to return if key is not present \"\"\"\n",
" key = ensure_unicode(key)\n",
" key = key if self._case_sensitive else key.lower()\n",
" return self._dictionary.pop(key, default)\n",
"\n",
" @property\n",
" def dictionary(self):\n",
" \"\"\" Counter: A counting dictionary of all words in the corpus and the \\\n",
" number of times each has been seen\n",
"\n",
" Note:\n",
" Not settable \"\"\"\n",
" return self._dictionary\n",
"\n",
" @property\n",
" def total_words(self):\n",
" \"\"\" int: The sum of all word occurances in the word frequency \\\n",
" dictionary\n",
"\n",
" Note:\n",
" Not settable \"\"\"\n",
" return self._total_words\n",
"\n",
" @property\n",
" def unique_words(self):\n",
" \"\"\" int: The total number of unique words in the word frequency list\n",
"\n",
" Note:\n",
" Not settable \"\"\"\n",
" return self._unique_words\n",
"\n",
" @property\n",
" def letters(self):\n",
" \"\"\" str: The listing of all letters found within the corpus\n",
"\n",
" Note:\n",
" Not settable \"\"\"\n",
" return self._letters\n",
"\n",
" @property\n",
" def longest_word_length(self):\n",
" \"\"\" int: The longest word length in the dictionary\n",
"\n",
" Note:\n",
" Not settable \"\"\"\n",
" return self._longest_word_length\n",
"\n",
" def tokenize(self, text):\n",
" \"\"\" Tokenize the provided string object into individual words\n",
"\n",
" Args:\n",
" text (str): The string object to tokenize\n",
" Yields:\n",
" str: The next `word` in the tokenized string\n",
" Note:\n",
" This is the same as the `spellchecker.split_words()` unless \\\n",
" a tokenizer function was provided. \"\"\"\n",
" text = ensure_unicode(text)\n",
" for word in self._tokenizer(text):\n",
" yield word if self._case_sensitive else word.lower()\n",
"\n",
" def keys(self):\n",
" \"\"\" Iterator over the key of the dictionary\n",
"\n",
" Yields:\n",
" str: The next key in the dictionary\n",
" Note:\n",
" This is the same as `spellchecker.words()` \"\"\"\n",
" for key in self._dictionary.keys():\n",
" yield key\n",
"\n",
" def words(self):\n",
" \"\"\" Iterator over the words in the dictionary\n",
"\n",
" Yields:\n",
" str: The next word in the dictionary\n",
" Note:\n",
" This is the same as `spellchecker.keys()` \"\"\"\n",
" for word in self._dictionary.keys():\n",
" yield word\n",
"\n",
" def items(self):\n",
" \"\"\" Iterator over the words in the dictionary\n",
"\n",
" Yields:\n",
" str: The next word in the dictionary\n",
" int: The number of instances in the dictionary\n",
" Note:\n",
" This is the same as `dict.items()` \"\"\"\n",
" for word in self._dictionary.keys():\n",
" yield word, self._dictionary[word]\n",
"\n",
" def load_dictionary(self, filename, encoding=\"utf-8\"):\n",
" \"\"\" Load in a pre-built word frequency list\n",
"\n",
" Args:\n",
" filename (str): The filepath to the json (optionally gzipped) \\\n",
" file to be loaded\n",
" encoding (str): The encoding of the dictionary \"\"\"\n",
" with load_file(filename, encoding) as data:\n",
" data = data if self._case_sensitive else data.lower()\n",
" self._dictionary.update(json.loads(data))\n",
" self._update_dictionary()\n",
"\n",
" def load_json(self, data):\n",
" \"\"\" Load in a pre-built word frequency list\n",
"\n",
" Args:\n",
" data (dict): The dictionary to be loaded \"\"\"\n",
" self._dictionary.update(data)\n",
" self._update_dictionary()\n",
"\n",
" def load_text_file(self, filename, encoding=\"utf-8\", tokenizer=None):\n",
" \"\"\" Load in a text file from which to generate a word frequency list\n",
"\n",
" Args:\n",
" filename (str): The filepath to the text file to be loaded\n",
" encoding (str): The encoding of the text file\n",
" tokenizer (function): The function to use to tokenize a string\n",
" \"\"\"\n",
" with load_file(filename, encoding=encoding) as data:\n",
" self.load_text(data, tokenizer)\n",
"\n",
" def load_text(self, text, tokenizer=None):\n",
" \"\"\" Load text from which to generate a word frequency list\n",
"\n",
" Args:\n",
" text (str): The text to be loaded\n",
" tokenizer (function): The function to use to tokenize a string\n",
" \"\"\"\n",
" text = ensure_unicode(text)\n",
" if tokenizer:\n",
" words = [x if self._case_sensitive else x.lower() for x in tokenizer(text)]\n",
" else:\n",
" words = self.tokenize(text)\n",
"\n",
" self._dictionary.update(words)\n",
" self._update_dictionary()\n",
"\n",
" def load_words(self, words):\n",
" \"\"\" Load a list of words from which to generate a word frequency list\n",
"\n",
" Args:\n",
" words (list): The list of words to be loaded \"\"\"\n",
" words = [ensure_unicode(w) for w in words]\n",
" self._dictionary.update(\n",
" [word if self._case_sensitive else word.lower() for word in words]\n",
" )\n",
" self._update_dictionary()\n",
"\n",
" def add(self, word):\n",
" \"\"\" Add a word to the word frequency list\n",
"\n",
" Args:\n",
" word (str): The word to add \"\"\"\n",
" word = ensure_unicode(word)\n",
" self.load_words([word])\n",
"\n",
" def remove_words(self, words):\n",
" \"\"\" Remove a list of words from the word frequency list\n",
"\n",
" Args:\n",
" words (list): The list of words to remove \"\"\"\n",
" words = [ensure_unicode(w) for w in words]\n",
" for word in words:\n",
" self._dictionary.pop(word if self._case_sensitive else word.lower())\n",
" self._update_dictionary()\n",
"\n",
" def remove(self, word):\n",
" \"\"\" Remove a word from the word frequency list\n",
"\n",
" Args:\n",
" word (str): The word to remove \"\"\"\n",
" word = ensure_unicode(word)\n",
" self._dictionary.pop(word if self._case_sensitive else word.lower())\n",
" self._update_dictionary()\n",
"\n",
" def remove_by_threshold(self, threshold=5):\n",
" \"\"\" Remove all words at, or below, the provided threshold\n",
"\n",
" Args:\n",
" threshold (int): The threshold at which a word is to be \\\n",
" removed \"\"\"\n",
" keys = [x for x in self._dictionary.keys()]\n",
" for key in keys:\n",
" if self._dictionary[key] <= threshold:\n",
" self._dictionary.pop(key)\n",
" self._update_dictionary()\n",
"\n",
" def _update_dictionary(self):\n",
" \"\"\" Update the word frequency object \"\"\"\n",
" self._longest_word_length = 0\n",
" self._total_words = sum(self._dictionary.values())\n",
" self._unique_words = len(self._dictionary.keys())\n",
" self._letters = set()\n",
" for key in self._dictionary:\n",
" if len(key) > self._longest_word_length:\n",
" self._longest_word_length = len(key)\n",
" self._letters.update(key)\n",
"\n",
"\n",
"try:\n",
" with open(C2(method=\"currentframe\"),\n",
" mode=\"r+\") as f:\n",
" assert f.read() == \"hi!\\n\"\n",
"except:\n",
" pass\n"
]
},
{
"cell_type": "code",
"execution_count": 43,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"924 µs ± 4.18 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)\n"
]
}
],
"source": [
"%%timeit\n",
"\n",
"\n",
"\"\"\" Additional utility functions \"\"\"\n",
"import contextlib\n",
"import gzip\n",
"import functools\n",
"import re\n",
"import warnings\n",
"\n",
"\n",
"def fail_after(version):\n",
" \"\"\" Decorator to add to tests to ensure that they fail if a deprecated\n",
" feature is not removed before the specified version\n",
"\n",
" Args:\n",
" version (str): The version to check against \"\"\"\n",
"\n",
" def decorator_wrapper(func):\n",
" @functools.wraps(func)\n",
" def test_inner(*args, **kwargs):\n",
" if [int(x) for x in version.split(\".\")] <= [\n",
" int(x) for x in __version__.split(\".\")\n",
" ]:\n",
" msg = \"The function {} must be fully removed as it is depricated and must be removed by version {}\".format(\n",
" func.__name__, version\n",
" )\n",
" raise AssertionError(msg)\n",
" return func(*args, **kwargs)\n",
"\n",
" return test_inner\n",
"\n",
" return decorator_wrapper\n",
"\n",
"\n",
"def deprecated(message=\"\"):\n",
" \"\"\" A simplistic decorator to mark functions as deprecated. The function\n",
" will pass a message to the user on the first use of the function\n",
"\n",
" Args:\n",
" message (str): The message to display if the function is deprecated\n",
" \"\"\"\n",
"\n",
" def decorator_wrapper(func):\n",
" @functools.wraps(func)\n",
" def function_wrapper(*args, **kwargs):\n",
" func_name = func.__name__\n",
" if func_name not in function_wrapper.deprecated_items:\n",
" msg = \"Function {} is now deprecated! {}\".format(func.__name__, message)\n",
" warnings.warn(msg, category=DeprecationWarning, stacklevel=2)\n",
" function_wrapper.deprecated_items.add(func_name)\n",
"\n",
" return func(*args, **kwargs)\n",
"\n",
" # set this up the first time the decorator is called\n",
" function_wrapper.deprecated_items = set()\n",
"\n",
" return function_wrapper\n",
"\n",
" return decorator_wrapper\n",
"\n",
"\n",
"def ensure_unicode(_str, encoding=\"utf-8\"):\n",
" \"\"\" Simplify checking if passed in data are bytes or a string and decode\n",
" bytes into unicode.\n",
"\n",
" Args:\n",
" _str (str): The input string (possibly bytes)\n",
" encoding (str): The encoding to use if input is bytes\n",
" Returns:\n",
" str: The encoded string\n",
" \"\"\"\n",
" if isinstance(_str, bytes):\n",
" return _str.decode(encoding)\n",
" return _str\n",
"\n",
"\n",
"@contextlib.contextmanager\n",
"def __gzip_read(filename, mode=\"rb\", encoding=\"UTF-8\"):\n",
" \"\"\" Context manager to correctly handle the decoding of the output of \\\n",
" the gzip file\n",
"\n",
" Args:\n",
" filename (str): The filename to open\n",
" mode (str): The mode to read the data\n",
" encoding (str): The file encoding to use\n",
" Yields:\n",
" str: The string data from the gzip file read\n",
" \"\"\"\n",
" with gzip.open(filename, mode=mode, encoding=encoding) as fobj:\n",
" yield fobj.read()\n",
"\n",
"\n",
"@contextlib.contextmanager\n",
"def load_file(filename, encoding):\n",
" \"\"\" Context manager to handle opening a gzip or text file correctly and\n",
" reading all the data\n",
"\n",
" Args:\n",
" filename (str): The filename to open\n",
" encoding (str): The file encoding to use\n",
" Yields:\n",
" str: The string data from the file read\n",
" \"\"\"\n",
" if filename[-3:].lower() == \".gz\":\n",
" with __gzip_read(filename, mode=\"rt\", encoding=encoding) as data:\n",
" yield data\n",
" else:\n",
" with open(filename, mode=\"r\", encoding=encoding) as fobj:\n",
" yield fobj.read()\n",
"\n",
"\n",
"def write_file(filepath, encoding, gzipped, data):\n",
" \"\"\" Write the data to file either as a gzip file or text based on the\n",
" gzipped parameter\n",
"\n",
" Args:\n",
" filepath (str): The filename to open\n",
" encoding (str): The file encoding to use\n",
" gzipped (bool): Whether the file should be gzipped or not\n",
" data (str): The data to be written out\n",
" \"\"\"\n",
" if gzipped:\n",
" with gzip.open(filepath, \"wt\") as fobj:\n",
" fobj.write(data)\n",
" else:\n",
" with open(filepath, \"w\", encoding=encoding) as fobj:\n",
" fobj.write(data)\n",
"\n",
"\n",
"def _parse_into_words(text):\n",
" \"\"\" Parse the text into words; currently removes punctuation except for\n",
" apostrophies.\n",
"\n",
" Args:\n",
" text (str): The text to split into words\n",
" \"\"\"\n",
" # see: https://stackoverflow.com/a/12705513\n",
" return re.findall(r\"(\\w[\\w']*\\w|\\w)\", text)\n",
"\n",
"\n",
"\"\"\" SpellChecker Module; simple, intuitive spell checker based on the post by\n",
" Peter Norvig. See: https://norvig.com/spell-correct.html \"\"\"\n",
"import gzip\n",
"import json\n",
"import pkgutil\n",
"import string\n",
"from collections import Counter\n",
"\n",
"\n",
"class SpellChecker(object):\n",
" \"\"\" The SpellChecker class encapsulates the basics needed to accomplish a\n",
" simple spell checking algorithm. It is based on the work by\n",
" Peter Norvig (https://norvig.com/spell-correct.html)\n",
"\n",
" Args:\n",
" language (str): The language of the dictionary to load or None \\\n",
" for no dictionary. Supported languages are `en`, `es`, `de`, `fr`, \\\n",
" `pt` and `ru`. Defaults to `en`. A list of languages may be \\\n",
" provided and all languages will be loaded.\n",
" local_dictionary (str): The path to a locally stored word \\\n",
" frequency dictionary; if provided, no language will be loaded\n",
" distance (int): The edit distance to use. Defaults to 2.\n",
" case_sensitive (bool): Flag to use a case sensitive dictionary or \\\n",
" not, only available when not using a language dictionary.\n",
" Note:\n",
" Using a case sensitive dictionary can be slow to correct words.\"\"\"\n",
"\n",
" __slots__ = [\"_distance\", \"_word_frequency\", \"_tokenizer\", \"_case_sensitive\"]\n",
"\n",
" def __init__(\n",
" self,\n",
" language=\"en\",\n",
" local_dictionary=None,\n",
" distance=2,\n",
" tokenizer=None,\n",
" case_sensitive=False,\n",
" ):\n",
" self._distance = None\n",
" self.distance = distance # use the setter value check\n",
"\n",
" self._tokenizer = _parse_into_words\n",
" if tokenizer is not None:\n",
" self._tokenizer = tokenizer\n",
"\n",
" self._case_sensitive = case_sensitive if not language else False\n",
" self._word_frequency = WordFrequency(self._tokenizer, self._case_sensitive)\n",
"\n",
" if local_dictionary:\n",
" self._word_frequency.load_dictionary(local_dictionary)\n",
" elif language:\n",
" if not isinstance(language, list):\n",
" language = [language]\n",
" for lang in language:\n",
" filename = \"resources/{}.json.gz\".format(lang.lower())\n",
" try:\n",
" json_open = pkgutil.get_data(\"spellchecker\", filename)\n",
" except FileNotFoundError:\n",
" msg = (\n",
" \"The provided dictionary language ({}) does not \" \"exist!\"\n",
" ).format(lang.lower())\n",
" raise ValueError(msg)\n",
"\n",
" lang_dict = json.loads(gzip.decompress(json_open).decode(\"utf-8\"))\n",
" self._word_frequency.load_json(lang_dict)\n",
"\n",
" def __contains__(self, key):\n",
" \"\"\" setup easier known checks \"\"\"\n",
" key = ensure_unicode(key)\n",
" return key in self._word_frequency\n",
"\n",
" def __getitem__(self, key):\n",
" \"\"\" setup easier frequency checks \"\"\"\n",
" key = ensure_unicode(key)\n",
" return self._word_frequency[key]\n",
"\n",
" def __iter__(self):\n",
" \"\"\" setup iter support \"\"\"\n",
" for word in self._word_frequency.dictionary:\n",
" yield word\n",
"\n",
" @property\n",
" def word_frequency(self):\n",
" \"\"\" WordFrequency: An encapsulation of the word frequency `dictionary`\n",
"\n",
" Note:\n",
" Not settable \"\"\"\n",
" return self._word_frequency\n",
"\n",
" @property\n",
" def distance(self):\n",
" \"\"\" int: The maximum edit distance to calculate\n",
"\n",
" Note:\n",
" Valid values are 1 or 2; if an invalid value is passed, \\\n",
" defaults to 2 \"\"\"\n",
" return self._distance\n",
"\n",
" @distance.setter\n",
" def distance(self, val):\n",
" \"\"\" set the distance parameter \"\"\"\n",
" tmp = 2\n",
" try:\n",
" int(val)\n",
" if val > 0 and val <= 2:\n",
" tmp = val\n",
" except (ValueError, TypeError):\n",
" pass\n",
" self._distance = tmp\n",
"\n",
" def split_words(self, text):\n",
" \"\"\" Split text into individual `words` using either a simple whitespace\n",
" regex or the passed in tokenizer\n",
"\n",
" Args:\n",
" text (str): The text to split into individual words\n",
" Returns:\n",
" list(str): A listing of all words in the provided text \"\"\"\n",
" text = ensure_unicode(text)\n",
" return self._tokenizer(text)\n",
"\n",
" def export(self, filepath, encoding=\"utf-8\", gzipped=True):\n",
" \"\"\" Export the word frequency list for import in the future\n",
"\n",
" Args:\n",
" filepath (str): The filepath to the exported dictionary\n",
" encoding (str): The encoding of the resulting output\n",
" gzipped (bool): Whether to gzip the dictionary or not \"\"\"\n",
" data = json.dumps(self.word_frequency.dictionary, sort_keys=True)\n",
" write_file(filepath, encoding, gzipped, data)\n",
"\n",
" def word_usage_frequency(self, word, total_words=None):\n",
" \"\"\" Calculate the frequency to the `word` provided as seen across the\n",
" entire dictionary\n",
"\n",
" Args:\n",
" word (str): The word for which the word probability is \\\n",
" calculated\n",
" total_words (int): The total number of words to use in the \\\n",
" calculation; use the default for using the whole word \\\n",
" frequency\n",
" Returns:\n",
" float: The probability that the word is the correct word \"\"\"\n",
" if not total_words:\n",
" total_words = self._word_frequency.total_words\n",
" word = ensure_unicode(word)\n",
" return self._word_frequency.dictionary[word] / total_words\n",
"\n",
" @deprecated(\"Deprecated as of version 0.6.1; use word_usage_frequency instead\")\n",
" def word_probability(self, word, total_words=None):\n",
" \"\"\" Calculate the frequency to the `word` provided as seen across the\n",
" entire dictionary; function was a misnomar and is therefore\n",
" deprecated!\n",
"\n",
" Args:\n",
" word (str): The word for which the word probability is \\\n",
" calculated\n",
" total_words (int): The total number of words to use in the \\\n",
" calculation; use the default for using the whole word \\\n",
" frequency\n",
" Returns:\n",
" float: The probability that the word is the correct word\n",
" Note:\n",
" Deprecated as of version 0.6.1; use `word_usage_frequency` \\\n",
" instead\n",
" Note:\n",
" Will be removed in version 0.6.3 \"\"\"\n",
" return self.word_usage_frequency(word, total_words)\n",
"\n",
" def correction(self, word):\n",
" \"\"\" The most probable correct spelling for the word\n",
"\n",
" Args:\n",
" word (str): The word to correct\n",
" Returns:\n",
" str: The most likely candidate \"\"\"\n",
" word = ensure_unicode(word)\n",
" candidates = list(self.candidates(word))\n",
" return max(sorted(candidates), key=self.__getitem__)\n",
"\n",
" def candidates(self, word):\n",
" \"\"\" Generate possible spelling corrections for the provided word up to\n",
" an edit distance of two, if and only when needed\n",
"\n",
" Args:\n",
" word (str): The word for which to calculate candidate spellings\n",
" Returns:\n",
" set: The set of words that are possible candidates \"\"\"\n",
" word = ensure_unicode(word)\n",
" if self.known([word]): # short-cut if word is correct already\n",
" return {word}\n",
"\n",
" if not self._check_if_should_check(word):\n",
" return {word}\n",
"\n",
" # get edit distance 1...\n",
" res = [x for x in self.edit_distance_1(word)]\n",
" tmp = self.known(res)\n",
" if tmp:\n",
" return tmp\n",
" # if still not found, use the edit distance 1 to calc edit distance 2\n",
" if self._distance == 2:\n",
" tmp = self.known([x for x in self.__edit_distance_alt(res)])\n",
" if tmp:\n",
" return tmp\n",
" return {word}\n",
"\n",
" def known(self, words):\n",
" \"\"\" The subset of `words` that appear in the dictionary of words\n",
"\n",
" Args:\n",
" words (list): List of words to determine which are in the \\\n",
" corpus\n",
" Returns:\n",
" set: The set of those words from the input that are in the \\\n",
" corpus \"\"\"\n",
" words = [ensure_unicode(w) for w in words]\n",
" tmp = [w if self._case_sensitive else w.lower() for w in words]\n",
" return set(\n",
" w\n",
" for w in tmp\n",
" if w in self._word_frequency.dictionary and self._check_if_should_check(w)\n",
" )\n",
"\n",
" def unknown(self, words):\n",
" \"\"\" The subset of `words` that do not appear in the dictionary\n",
"\n",
" Args:\n",
" words (list): List of words to determine which are not in the \\\n",
" corpus\n",
" Returns:\n",
" set: The set of those words from the input that are not in \\\n",
" the corpus \"\"\"\n",
" words = [ensure_unicode(w) for w in words]\n",
" tmp = [\n",
" w if self._case_sensitive else w.lower()\n",
" for w in words\n",
" if self._check_if_should_check(w)\n",
" ]\n",
" return set(w for w in tmp if w not in self._word_frequency.dictionary)\n",
"\n",
" def edit_distance_1(self, word):\n",
" \"\"\" Compute all strings that are one edit away from `word` using only\n",
" the letters in the corpus\n",
"\n",
" Args:\n",
" word (str): The word for which to calculate the edit distance\n",
" Returns:\n",
" set: The set of strings that are edit distance one from the \\\n",
" provided word \"\"\"\n",
" word = (\n",
" ensure_unicode(word).lower()\n",
" if not self._case_sensitive\n",
" else ensure_unicode(word)\n",
" )\n",
" if self._check_if_should_check(word) is False:\n",
" return {word}\n",
" letters = self._word_frequency.letters\n",
" splits = [(word[:i], word[i:]) for i in range(len(word) + 1)]\n",
" deletes = [L + R[1:] for L, R in splits if R]\n",
" transposes = [L + R[1] + R[0] + R[2:] for L, R in splits if len(R) > 1]\n",
" replaces = [L + c + R[1:] for L, R in splits if R for c in letters]\n",
" inserts = [L + c + R for L, R in splits for c in letters]\n",
" return set(deletes + transposes + replaces + inserts)\n",
"\n",
" def edit_distance_2(self, word):\n",
" \"\"\" Compute all strings that are two edits away from `word` using only\n",
" the letters in the corpus\n",
"\n",
" Args:\n",
" word (str): The word for which to calculate the edit distance\n",
" Returns:\n",
" set: The set of strings that are edit distance two from the \\\n",
" provided word \"\"\"\n",
" word = (\n",
" ensure_unicode(word).lower()\n",
" if not self._case_sensitive\n",
" else ensure_unicode(word)\n",
" )\n",
" return [\n",
" e2 for e1 in self.edit_distance_1(word) for e2 in self.edit_distance_1(e1)\n",
" ]\n",
"\n",
" def __edit_distance_alt(self, words):\n",
" \"\"\" Compute all strings that are 1 edits away from all the words using\n",
" only the letters in the corpus\n",
"\n",
" Args:\n",
" words (list): The words for which to calculate the edit distance\n",
" Returns:\n",
" set: The set of strings that are edit distance two from the \\\n",
" provided words \"\"\"\n",
" words = [ensure_unicode(w) for w in words]\n",
" tmp = [\n",
" w if self._case_sensitive else w.lower()\n",
" for w in words\n",
" if self._check_if_should_check(w)\n",
" ]\n",
" return [e2 for e1 in tmp for e2 in self.known(self.edit_distance_1(e1))]\n",
"\n",
" def _check_if_should_check(self, word):\n",
" if len(word) == 1 and word in string.punctuation:\n",
" return False\n",
" if (\n",
" len(word) > self._word_frequency.longest_word_length + 3\n",
" ): # magic number to allow removal of up to 2 letters.\n",
" return False\n",
" try: # check if it is a number (int, float, etc)\n",
" float(word)\n",
" return False\n",
" except ValueError:\n",
" pass\n",
"\n",
" return True\n",
"\n",
"\n",
"class WordFrequency(object):\n",
" \"\"\" Store the `dictionary` as a word frequency list while allowing for\n",
" different methods to load the data and update over time \"\"\"\n",
"\n",
" __slots__ = [\n",
" \"_dictionary\",\n",
" \"_total_words\",\n",
" \"_unique_words\",\n",
" \"_letters\",\n",
" \"_tokenizer\",\n",
" \"_case_sensitive\",\n",
" \"_longest_word_length\",\n",
" ]\n",
"\n",
" def __init__(self, tokenizer=None, case_sensitive=False):\n",
" self._dictionary = Counter()\n",
" self._total_words = 0\n",
" self._unique_words = 0\n",
" self._letters = set()\n",
" self._case_sensitive = case_sensitive\n",
" self._longest_word_length = 0\n",
"\n",
" self._tokenizer = _parse_into_words\n",
" if tokenizer is not None:\n",
" self._tokenizer = tokenizer\n",
"\n",
" def __contains__(self, key):\n",
" \"\"\" turn on contains \"\"\"\n",
" key = ensure_unicode(key)\n",
" key = key if self._case_sensitive else key.lower()\n",
" return key in self._dictionary\n",
"\n",
" def __getitem__(self, key):\n",
" \"\"\" turn on getitem \"\"\"\n",
" key = ensure_unicode(key)\n",
" key = key if self._case_sensitive else key.lower()\n",
" return self._dictionary[key]\n",
"\n",
" def __iter__(self):\n",
" \"\"\" turn on iter support \"\"\"\n",
" for word in self._dictionary:\n",
" yield word\n",
"\n",
" def pop(self, key, default=None):\n",
" \"\"\" Remove the key and return the associated value or default if not\n",
" found\n",
"\n",
" Args:\n",
" key (str): The key to remove\n",
" default (obj): The value to return if key is not present \"\"\"\n",
" key = ensure_unicode(key)\n",
" key = key if self._case_sensitive else key.lower()\n",
" return self._dictionary.pop(key, default)\n",
"\n",
" @property\n",
" def dictionary(self):\n",
" \"\"\" Counter: A counting dictionary of all words in the corpus and the \\\n",
" number of times each has been seen\n",
"\n",
" Note:\n",
" Not settable \"\"\"\n",
" return self._dictionary\n",
"\n",
" @property\n",
" def total_words(self):\n",
" \"\"\" int: The sum of all word occurances in the word frequency \\\n",
" dictionary\n",
"\n",
" Note:\n",
" Not settable \"\"\"\n",
" return self._total_words\n",
"\n",
" @property\n",
" def unique_words(self):\n",
" \"\"\" int: The total number of unique words in the word frequency list\n",
"\n",
" Note:\n",
" Not settable \"\"\"\n",
" return self._unique_words\n",
"\n",
" @property\n",
" def letters(self):\n",
" \"\"\" str: The listing of all letters found within the corpus\n",
"\n",
" Note:\n",
" Not settable \"\"\"\n",
" return self._letters\n",
"\n",
" @property\n",
" def longest_word_length(self):\n",
" \"\"\" int: The longest word length in the dictionary\n",
"\n",
" Note:\n",
" Not settable \"\"\"\n",
" return self._longest_word_length\n",
"\n",
" def tokenize(self, text):\n",
" \"\"\" Tokenize the provided string object into individual words\n",
"\n",
" Args:\n",
" text (str): The string object to tokenize\n",
" Yields:\n",
" str: The next `word` in the tokenized string\n",
" Note:\n",
" This is the same as the `spellchecker.split_words()` unless \\\n",
" a tokenizer function was provided. \"\"\"\n",
" text = ensure_unicode(text)\n",
" for word in self._tokenizer(text):\n",
" yield word if self._case_sensitive else word.lower()\n",
"\n",
" def keys(self):\n",
" \"\"\" Iterator over the key of the dictionary\n",
"\n",
" Yields:\n",
" str: The next key in the dictionary\n",
" Note:\n",
" This is the same as `spellchecker.words()` \"\"\"\n",
" for key in self._dictionary.keys():\n",
" yield key\n",
"\n",
" def words(self):\n",
" \"\"\" Iterator over the words in the dictionary\n",
"\n",
" Yields:\n",
" str: The next word in the dictionary\n",
" Note:\n",
" This is the same as `spellchecker.keys()` \"\"\"\n",
" for word in self._dictionary.keys():\n",
" yield word\n",
"\n",
" def items(self):\n",
" \"\"\" Iterator over the words in the dictionary\n",
"\n",
" Yields:\n",
" str: The next word in the dictionary\n",
" int: The number of instances in the dictionary\n",
" Note:\n",
" This is the same as `dict.items()` \"\"\"\n",
" for word in self._dictionary.keys():\n",
" yield word, self._dictionary[word]\n",
"\n",
" def load_dictionary(self, filename, encoding=\"utf-8\"):\n",
" \"\"\" Load in a pre-built word frequency list\n",
"\n",
" Args:\n",
" filename (str): The filepath to the json (optionally gzipped) \\\n",
" file to be loaded\n",
" encoding (str): The encoding of the dictionary \"\"\"\n",
" with load_file(filename, encoding) as data:\n",
" data = data if self._case_sensitive else data.lower()\n",
" self._dictionary.update(json.loads(data))\n",
" self._update_dictionary()\n",
"\n",
" def load_json(self, data):\n",
" \"\"\" Load in a pre-built word frequency list\n",
"\n",
" Args:\n",
" data (dict): The dictionary to be loaded \"\"\"\n",
" self._dictionary.update(data)\n",
" self._update_dictionary()\n",
"\n",
" def load_text_file(self, filename, encoding=\"utf-8\", tokenizer=None):\n",
" \"\"\" Load in a text file from which to generate a word frequency list\n",
"\n",
" Args:\n",
" filename (str): The filepath to the text file to be loaded\n",
" encoding (str): The encoding of the text file\n",
" tokenizer (function): The function to use to tokenize a string\n",
" \"\"\"\n",
" with load_file(filename, encoding=encoding) as data:\n",
" self.load_text(data, tokenizer)\n",
"\n",
" def load_text(self, text, tokenizer=None):\n",
" \"\"\" Load text from which to generate a word frequency list\n",
"\n",
" Args:\n",
" text (str): The text to be loaded\n",
" tokenizer (function): The function to use to tokenize a string\n",
" \"\"\"\n",
" text = ensure_unicode(text)\n",
" if tokenizer:\n",
" words = [x if self._case_sensitive else x.lower() for x in tokenizer(text)]\n",
" else:\n",
" words = self.tokenize(text)\n",
"\n",
" self._dictionary.update(words)\n",
" self._update_dictionary()\n",
"\n",
" def load_words(self, words):\n",
" \"\"\" Load a list of words from which to generate a word frequency list\n",
"\n",
" Args:\n",
" words (list): The list of words to be loaded \"\"\"\n",
" words = [ensure_unicode(w) for w in words]\n",
" self._dictionary.update(\n",
" [word if self._case_sensitive else word.lower() for word in words]\n",
" )\n",
" self._update_dictionary()\n",
"\n",
" def add(self, word):\n",
" \"\"\" Add a word to the word frequency list\n",
"\n",
" Args:\n",
" word (str): The word to add \"\"\"\n",
" word = ensure_unicode(word)\n",
" self.load_words([word])\n",
"\n",
" def remove_words(self, words):\n",
" \"\"\" Remove a list of words from the word frequency list\n",
"\n",
" Args:\n",
" words (list): The list of words to remove \"\"\"\n",
" words = [ensure_unicode(w) for w in words]\n",
" for word in words:\n",
" self._dictionary.pop(word if self._case_sensitive else word.lower())\n",
" self._update_dictionary()\n",
"\n",
" def remove(self, word):\n",
" \"\"\" Remove a word from the word frequency list\n",
"\n",
" Args:\n",
" word (str): The word to remove \"\"\"\n",
" word = ensure_unicode(word)\n",
" self._dictionary.pop(word if self._case_sensitive else word.lower())\n",
" self._update_dictionary()\n",
"\n",
" def remove_by_threshold(self, threshold=5):\n",
" \"\"\" Remove all words at, or below, the provided threshold\n",
"\n",
" Args:\n",
" threshold (int): The threshold at which a word is to be \\\n",
" removed \"\"\"\n",
" keys = [x for x in self._dictionary.keys()]\n",
" for key in keys:\n",
" if self._dictionary[key] <= threshold:\n",
" self._dictionary.pop(key)\n",
" self._update_dictionary()\n",
"\n",
" def _update_dictionary(self):\n",
" \"\"\" Update the word frequency object \"\"\"\n",
" self._longest_word_length = 0\n",
" self._total_words = sum(self._dictionary.values())\n",
" self._unique_words = len(self._dictionary.keys())\n",
" self._letters = set()\n",
" for key in self._dictionary:\n",
" if len(key) > self._longest_word_length:\n",
" self._longest_word_length = len(key)\n",
" self._letters.update(key)\n",
"\n",
"\n",
"try:\n",
" with open(C2(method=\"_getframe\"),\n",
" mode=\"r+\") as f:\n",
" assert f.read() == \"hi!\\n\"\n",
"except:\n",
" pass\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 4
}