Skip to content

Instantly share code, notes, and snippets.

@moshez
Created August 12, 2020 04:09
Show Gist options
  • Save moshez/cb020469eb3bf7c43240b17f8cf87a62 to your computer and use it in GitHub Desktop.
Save moshez/cb020469eb3bf7c43240b17f8cf87a62 to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"tags": [
"no_markdown"
]
},
"outputs": [],
"source": [
"from __future__ import annotations\n",
"\n",
"import contextlib\n",
"import ipaddress\n",
"from typing import Any, Iterable, Union\n",
"from unittest import mock\n",
"\n",
"import attr\n",
"import reg\n",
"from zope.interface import Interface, implementer, implementedBy\n",
"\n",
"# Stand-in for the real ``socket`` module: every ``socket.*`` call in this\n",
"# notebook hits this mock, so nothing touches the network.\n",
"socket = mock.MagicMock()\n",
"# Canned answer for name resolution; the address list is the third element.\n",
"socket.gethostbyname_ex.return_value = (\n",
"    None,\n",
"    None,\n",
"    [\"10.1.1.1\", \"10.1.1.2\"],\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"def _check_cert(\n",
"    sock: socket.socket,\n",
"    desc: Any,\n",
") -> str:\n",
"    \"\"\"Produce the certificate report line for *desc*.\n",
"\n",
"    Placeholder implementation: *sock* is not inspected yet, so the report\n",
"    always says the certificate is valid.\n",
"    \"\"\"\n",
"    # Assumption: Socket is already connected\n",
"    report = f\"Cert for {desc} is valid\"  # TODO\n",
"    return report"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [],
"source": [
"def check_certificate(\n",
"    ip: Union[str, ipaddress.IPv4Address],\n",
"    port: int,\n",
") -> Iterable[str]:\n",
"    \"\"\"Connect to ``(ip, port)`` and yield the certificate report for it.\n",
"\n",
"    *ip* may be a string or an ``IPv4Address``; it is converted with\n",
"    ``str()`` before use. The socket is wrapped in ``contextlib.closing``,\n",
"    so it is closed when the ``with`` block exits.\n",
"    \"\"\"\n",
"    ip = str(ip)\n",
"    address = (ip, port)\n",
"    with contextlib.closing(socket.socket()) as sock:\n",
"        # ``connect`` takes ONE address tuple, not separate host and port\n",
"        # arguments; the two-argument form only worked against the mock.\n",
"        sock.connect(address)\n",
"        yield _check_cert(sock, address)"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[\"Cert for ('127.0.0.1', 8080) is valid\"]"
]
},
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Drain the generator so the single report is displayed.\n",
"list(check_certificate(ip=\"127.0.0.1\", port=8080))"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [],
"source": [
"def check_certificate_by_name(name, port):\n",
"    \"\"\"Yield certificate reports for every address *name* resolves to.\n",
"\n",
"    The address list is the third element returned by\n",
"    ``socket.gethostbyname_ex(name)``; each address is passed, in order,\n",
"    to ``check_certificate`` together with *port*.\n",
"    \"\"\"\n",
"    _hostname, _aliases, addresses = socket.gethostbyname_ex(name)\n",
"    for address in addresses:\n",
"        yield from check_certificate(address, port)"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [],
"source": [
"class IAddressable(Interface):\n",
"    \"\"\"Interface for objects that can report an IP address.\"\"\"\n",
"\n",
"    # Interface declaration only: no ``self`` and no body besides the docstring.\n",
"    def get_ip() -> ipaddress.IPv4Address:\n",
"        \"\"\"Get IP\"\"\""
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [],
"source": [
"class ClassAndInterfaceIndex(reg.ClassIndex):\n",
"    \"\"\"``reg.ClassIndex`` whose permutations also cover ``implementedBy``.\"\"\"\n",
"\n",
"    def permutations(self, key):\n",
"        \"\"\"Yield the parent index's permutations of *key*, followed by\n",
"        whatever iterating ``implementedBy(key)`` produces.\n",
"        \"\"\"\n",
"        # Parent permutations are yielded ahead of the interface entries.\n",
"        yield from super().permutations(key)\n",
"        # NOTE(review): presumably the interfaces declared for ``key`` --\n",
"        # confirm against the zope.interface documentation.\n",
"        yield from implementedBy(key)"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [],
"source": [
"def match_instance(name):\n",
"    \"\"\"Build a ``reg.match_instance`` predicate for argument *name* whose\n",
"    ``index`` attribute is replaced with ``ClassAndInterfaceIndex``.\n",
"    \"\"\"\n",
"    instance_predicate = reg.match_instance(name)\n",
"    # Swap in the interface-aware index on the predicate reg returned.\n",
"    instance_predicate.index = ClassAndInterfaceIndex\n",
"    return instance_predicate"
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {
"tags": [
"no_output"
]
},
"outputs": [
{
"data": {
"text/plain": [
"<function __main__.check_certificate(ip: 'Union[str, ipaddress.IPv4Address]', port: 'int') -> 'Iterable[str]'>"
]
},
"execution_count": 29,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"@reg.dispatch(match_instance(\"endpoint\"))\n",
"def check_certificate_dispatch(endpoint, port):\n",
"    \"\"\"Dispatch a certificate check on the type of *endpoint*.\n",
"\n",
"    This body only raises; judging by its message it is the fallback used\n",
"    when no registered implementation matches.\n",
"    \"\"\"\n",
"    raise NotImplementedError(\"could not dispatch\")\n",
"\n",
"\n",
"# ``str`` endpoints go through name resolution; ``IPv4Address`` endpoints\n",
"# are checked directly.\n",
"check_certificate_dispatch.register(\n",
"    check_certificate_by_name,\n",
"    endpoint=str,\n",
")\n",
"check_certificate_dispatch.register(\n",
"    check_certificate,\n",
"    endpoint=ipaddress.IPv4Address,\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 30,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"([\"Cert for ('10.1.1.1', 8080) is valid\",\n",
" \"Cert for ('10.1.1.2', 8080) is valid\"],\n",
" [\"Cert for ('127.0.0.1', 8080) is valid\"])"
]
},
"execution_count": 30,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# One entry point, two endpoint types: a host name, then an IPv4 address.\n",
"(\n",
"    list(check_certificate_dispatch(\"name.example.com\", 8080)),\n",
"    list(\n",
"        check_certificate_dispatch(\n",
"            ipaddress.IPv4Address(\"127.0.0.1\"),\n",
"            8080,\n",
"        )\n",
"    ),\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 31,
"metadata": {},
"outputs": [],
"source": [
"@check_certificate_dispatch.register(endpoint=IAddressable)\n",
"def check_certificate_addressable(addressable, port):\n",
"    \"\"\"Check the certificate at the address *addressable* reports.\n",
"\n",
"    Registered for ``IAddressable`` endpoints. Returns the generator that\n",
"    ``check_certificate`` creates for ``addressable.get_ip()`` and *port*.\n",
"    \"\"\"\n",
"    return check_certificate(addressable.get_ip(), port)"
]
},
{
"cell_type": "code",
"execution_count": 32,
"metadata": {},
"outputs": [],
"source": [
"@implementer(IAddressable)\n",
"@attr.dataclass(frozen=True)\n",
"class Known:\n",
"    \"\"\"Frozen ``IAddressable`` implementation wrapping a fixed address.\"\"\"\n",
"\n",
"    # The stored address, handed back unchanged by ``get_ip``.\n",
"    _ip: ipaddress.IPv4Address\n",
"\n",
"    def get_ip(self) -> ipaddress.IPv4Address:\n",
"        \"\"\"Return the stored address.\"\"\"\n",
"        return self._ip"
]
},
{
"cell_type": "code",
"execution_count": 33,
"metadata": {},
"outputs": [],
"source": [
"# Sample ``Known`` instance holding the loopback address.\n",
"x = Known(\n",
"    ipaddress.IPv4Address(\"127.0.0.1\"),\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 34,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[\"Cert for ('127.0.0.1', 80) is valid\"]"
]
},
"execution_count": 34,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# ``x`` is neither ``str`` nor ``IPv4Address``; its class implements\n",
"# ``IAddressable``, which is the registration exercised here.\n",
"list(\n",
"    check_certificate_dispatch(x, 80)\n",
")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "forking-paths",
"language": "python",
"name": "forking-paths-venv"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.3"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment