-
-
Save moshez/cb020469eb3bf7c43240b17f8cf87a62 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "code", | |
"execution_count": 2, | |
"metadata": { | |
"tags": [ | |
"no_markdown" | |
] | |
}, | |
"outputs": [], | |
"source": [ | |
"from __future__ import annotations\n", | |
"import reg\n", | |
"from zope.interface import Interface, implementer, implementedBy\n", | |
"import attr\n", | |
"import contextlib\n", | |
"from unittest import mock\n", | |
"import ipaddress\n", | |
"socket = mock.MagicMock()\n", | |
"socket.gethostbyname_ex.return_value = None, None, [\"10.1.1.1\", \"10.1.1.2\"]" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 3, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"def _check_cert(\n", | |
" sock: socket.socket,\n", | |
" desc: Any,\n", | |
" ) -> str:\n", | |
" ## Assumption: Socket is already connected\n", | |
" return f\"Cert for {desc} is valid\" # TODO" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 15, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"def check_certificate(ip: Union[str,ipaddress.IPv4Address], port: int\n", | |
"    ) -> Iterable[str]:\n", | |
"    ip = str(ip)\n", | |
"    with contextlib.closing(\n", | |
"        socket.socket()\n", | |
"    ) as sock:\n", | |
"        sock.connect((ip, port))\n", | |
"        yield _check_cert(sock, (ip, port))" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 16, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"[\"Cert for ('127.0.0.1', 8080) is valid\"]" | |
] | |
}, | |
"execution_count": 16, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"list(check_certificate(\"127.0.0.1\", 8080))" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 17, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"def check_certificate_by_name(\n", | |
"        name, \n", | |
"        port\n", | |
"    ):\n", | |
"    _, _, ipaddrs = socket.gethostbyname_ex(\n", | |
"        name\n", | |
"    )\n", | |
"    for ip in ipaddrs:\n", | |
"        yield from check_certificate(\n", | |
"            ip, \n", | |
"            port\n", | |
"        )" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 18, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"class IAddressable(Interface):\n", | |
"    def get_ip() -> ipaddress.IPv4Address:\n", | |
"        \"Get IP\"" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 19, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"class ClassAndInterfaceIndex(reg.ClassIndex):\n", | |
"    def permutations(self, key):\n", | |
"        yield from super().permutations(key)\n", | |
"        yield from implementedBy(key)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 20, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"def match_instance(name):\n", | |
" predicate = reg.match_instance(name)\n", | |
" predicate.index = ClassAndInterfaceIndex\n", | |
" return predicate" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 29, | |
"metadata": { | |
"tags": [ | |
"no_output" | |
] | |
}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"<function __main__.check_certificate(ip: 'Union[str, ipaddress.IPv4Address]', port: 'int') -> 'Iterable[str]'>" | |
] | |
}, | |
"execution_count": 29, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"@reg.dispatch(match_instance(\"endpoint\"))\n", | |
"def check_certificate_dispatch(endpoint, port):\n", | |
" raise NotImplementedError(\n", | |
" \"could not dispatch\"\n", | |
" )\n", | |
"check_certificate_dispatch.register(\n", | |
" check_certificate_by_name,\n", | |
" endpoint=str)\n", | |
"check_certificate_dispatch.register(\n", | |
" check_certificate,\n", | |
" endpoint=ipaddress.IPv4Address)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 30, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"([\"Cert for ('10.1.1.1', 8080) is valid\",\n", | |
" \"Cert for ('10.1.1.2', 8080) is valid\"],\n", | |
" [\"Cert for ('127.0.0.1', 8080) is valid\"])" | |
] | |
}, | |
"execution_count": 30, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"(list(check_certificate_dispatch(\n", | |
" \"name.example.com\",\n", | |
" 8080\n", | |
")),\n", | |
" list(check_certificate_dispatch(\n", | |
" ipaddress.IPv4Address(\"127.0.0.1\"),\n", | |
" 8080\n", | |
" )))" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 31, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"@check_certificate_dispatch.register(\n", | |
" endpoint=IAddressable\n", | |
")\n", | |
"def check_certificate_addressable(\n", | |
" addressable,\n", | |
" port\n", | |
"):\n", | |
" addr = addressable.get_ip()\n", | |
" return check_certificate(\n", | |
" addr,\n", | |
" port\n", | |
" )" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 32, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"@implementer(IAddressable)\n", | |
"@attr.dataclass(frozen=True)\n", | |
"class Known:\n", | |
"    _ip: ipaddress.IPv4Address\n", | |
"    def get_ip(self) -> ipaddress.IPv4Address:\n", | |
"        return self._ip" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 33, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"x = Known(ipaddress.IPv4Address(\"127.0.0.1\"))" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 34, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"[\"Cert for ('127.0.0.1', 80) is valid\"]" | |
] | |
}, | |
"execution_count": 34, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"list(check_certificate_dispatch(x, 80))" | |
] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "forking-paths", | |
"language": "python", | |
"name": "forking-paths-venv" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.8.3" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 4 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment