Skip to content

Instantly share code, notes, and snippets.

@progandy
Created January 2, 2024 17:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save progandy/87a6f4c7cc6349092731107017a3768c to your computer and use it in GitHub Desktop.
Save progandy/87a6f4c7cc6349092731107017a3768c to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
"""
capo-browser
Copyright (C) 2024 ProgAndy
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
"""
# Portal Browser for Captive Portal Detection
This script launches a minimal web browser inside a user namespace, configuring specific nameservers
to ensure accurate captive portal detection. It automatically detects the presence of a captive portal
using the provided check URL and opens a minimal browser window accordingly.
## Dependencies:
- Python 3.8+
- Either GTK or Qt. Possible combinations are:
- PyGObject, GTK3, WebKit2Gtk 4.0
- PyGObject, GTK3, WebKit2Gtk 4.1
- PyGObject, GTK4, WebKitGtk 6.0
- PySide2, Qt5, QtWebEngine for Qt5
- PySide6, Qt6, QtWebEngine for Qt6
- POSIX sh with
- unshare
- mount
- Linux kernel with support for unprivileged user namespaces and mount namespace
- libc able to parse resolv.conf (directly or configurable with nsswitch.conf)
"""
import gi
import sys
import os
import tempfile
import argparse
from urllib.parse import urlparse
import shlex
import ipaddress
import http.client
import socket
from enum import Enum
class PortalDetectionProvider(Enum):
    """Built-in connectivity-check endpoints.

    Every member's value is a ``(url, expected_text)`` tuple. The second
    element is ``None`` for endpoints that have no body text to compare;
    ``CaptivePortalChecker`` treats ``None`` as "expect an HTTP 204 reply".
    """

    FIREFOX = (
        "http://detectportal.firefox.com/success.txt",
        "success",
    )
    EDGE = (
        "http://www.msftconnecttest.com/connecttest.txt",
        "Microsoft Connect Test",
    )
    CHROME = (
        "http://www.gstatic.com/generate_204",
        None,
    )
    APPLE = (
        "http://captive.apple.com/hotspot-detect.html",
        "Success",
    )
    ANDROID = (
        "http://connectivitycheck.android.com/generate_204",
        None,
    )
    KUKETZ = (
        "http://captiveportal.kuketz.de",
        None,
    )
    GRAPHENEOS = (
        "http://connectivitycheck.grapheneos.network/generate_204",
        None,
    )
    GNOME = (
        "http://nmcheck.gnome.org/check_network_status.txt",
        "NetworkManager is online",
    )
    ARCHLINUX = (
        "http://ping.archlinux.org/",
        "This domain is used for connectivity checking (captive portal detection).",
    )
    STEAM = (
        "http://test.steampowered.com/",
        "success",
    )
    STEAM204 = (
        "http://test.steampowered.com/204",
        None,
    )
class CaptivePortalChecker:
    """Probe a connectivity-check URL to find out whether a captive portal is present.

    The checker is configured either with the name of a member of
    ``PortalDetectionProvider`` (case-insensitive) or with a custom URL and an
    optional expected response text.
    """

    def __init__(self, provider_or_url, text=None):
        """Resolve the check URL and the expected response.

        Parameters:
        - provider_or_url (str): Name of a predefined provider, or a custom URL.
        - text (str or None): Expected body text for a custom URL. Ignored when
          a predefined provider is selected. ``None`` means "expect HTTP 204".
        """
        known_providers = [entity.name.lower() for entity in PortalDetectionProvider]
        if provider_or_url.lower() in known_providers:
            url, default_text = self._get_portal_detection_info(provider_or_url)
            print(f"Using provider {provider_or_url}")
        else:
            url, default_text = provider_or_url, text
        print(f"Detecion url: {url}")
        # The text comparison below is done on a 200 response, so report 200 here.
        print(f"Detection method: status {'204' if default_text is None else '200 with text [' + default_text + ']'}")
        self.url = url
        self.text = default_text

    def _get_portal_detection_info(self, provider_name):
        """Return the ``(url, expected_text)`` tuple of the named provider."""
        entity = PortalDetectionProvider[provider_name.upper()]
        return entity.value

    def check_captive_portal(self):
        """Probe the check URL, first via HTTPS and then via HTTP.

        The host and path of ``self.url`` are requested over HTTPS first. If
        that probe positively reports "no portal", the result is final.
        Otherwise (portal, redirect or error) the HTTP probe decides.

        Returns:
        - False: the expected response was received (no captive portal).
        - True: an unexpected response without a ``Location`` header.
        - str: the value of the ``Location`` header of an unexpected response.
        - None: the HTTP probe failed with an error.
        """
        parsed_url = urlparse(self.url)
        # Always send a valid request target, even for URLs such as
        # "http://host?query" whose path component is empty.
        path = parsed_url.path or "/"
        if parsed_url.query:
            path += '?' + parsed_url.query

        def perform_check(protocol):
            """Run a single probe over the given protocol ('http' or 'https')."""
            connection_cls = http.client.HTTPSConnection if protocol == 'https' else http.client.HTTPConnection
            connection = None
            try:
                connection = connection_cls(parsed_url.netloc, timeout=5)
                connection.request("GET", path)
                response = connection.getresponse()
                if self.text is None and response.status == 204:
                    print(f"No captive portal detected via {protocol}")
                    return False
                if self.text is not None and response.status == 200:
                    # A portal page may not be valid UTF-8; undecodable bytes
                    # must count as a mismatch instead of aborting the probe.
                    body = response.read().decode("utf-8", errors="replace")
                    if self.text.strip() == body.strip():
                        print(f"No captive portal detected via {protocol}")
                        return False
                location_header = response.getheader('Location')
                if location_header:
                    print(f"Redirect detected via {protocol}. Target URL: {location_header}")
                    return location_header
                print(f"Captive portal detected via {protocol}")
                return True
            except Exception as e:
                # Best effort: any failure of a probe is reported as "no result".
                print(f"Error: {e} via {protocol}")
                return None
            finally:
                if connection is not None:
                    connection.close()

        # Perform HTTPS check
        result_https = perform_check("https")
        if result_https is False:
            return False
        # Perform HTTP check unless HTTPS already proved there is no portal
        result_http = perform_check("http")
        return result_http

    @staticmethod
    def add_argument_group(parser):
        """Add the portal detection options to *parser* and return the new group.

        Parameters:
        - parser (argparse.ArgumentParser): Parser that receives the options
          ``--check``, ``--check-url`` and ``--check-text``.
        """
        group = parser.add_argument_group('Captive Portal Detection Options', description='Perform connectivity checks for predefined entities or custom URLs.')
        group.add_argument('--check', choices=[entity.name.lower() for entity in PortalDetectionProvider],
                           help='Provider for connectivity check (choose one)', default=PortalDetectionProvider.GNOME.name.lower())
        group.add_argument('--check-url', help='Custom URL for connectivity check, must be http', metavar='CUSTOM_URL')
        group.add_argument('--check-text', help='Expected text for text check (optional)', dest='check_text', metavar='EXPECTED_TEXT')
        return group
class GObjectIntrospectionHelper:
    """
    Small collection of GObject Introspection queries.
    """

    @staticmethod
    def version_available(namespace, version):
        """
        Tell whether the introspection repository lists a namespace version.
        Parameters:
        - namespace (str): Name of the introspection namespace, e.g. "Gtk".
        - version (str): Version string to look for, e.g. "4.0".
        Returns:
        - bool: True if the repository enumerates exactly this version for the
          namespace, False otherwise (also when no default repository exists).
        """
        repo = gi.Repository.get_default()
        if not repo:
            return False
        return any(candidate == version for candidate in repo.enumerate_versions(namespace))
class GTKBrowserApp:
    """
    GTK Browser Application (GTK 3 with WebKit2, or another GTK version with WebKit).
    """

    def __init__(self, url, gtk_version, webkit_version):
        """
        Select the GTK/WebKit bindings and build the application object.
        Parameters:
        - url (str): The page to load in the browser window.
        - gtk_version (str): Gtk introspection version; "3.0" selects the
          WebKit2 namespace, any other value selects the WebKit namespace.
        - webkit_version (str): Introspection version of the WebKit namespace.
          For the non-GTK3 branch an empty value falls back to "6.0".
        """
        gi.require_version("Gtk", gtk_version)
        is_gtk3 = gtk_version == "3.0"
        if is_gtk3:
            gi.require_version("WebKit2", webkit_version)
            from gi.repository import Gtk, WebKit2 as webkit
        else:
            # Honour the requested version instead of silently ignoring the
            # parameter; "6.0" remains the default when none is given.
            gi.require_version("WebKit", webkit_version or "6.0")
            from gi.repository import Gtk, WebKit as webkit

        class BrowserAppGtk(Gtk.Application):
            """GTK Browser Application"""

            def __init__(self):
                """Initialize the GTK browser application."""
                super().__init__(application_id="org.example.portalbrowser", flags=0)
                self.connect("activate", self.on_activate)
                self.url = url

            def on_activate(self, app):
                """Handler for the 'activate' signal: build and show the window."""
                self.win = Gtk.ApplicationWindow(application=app)
                self.win.set_title("Captive Portal Browser")
                self.view = self.create_webview()
                if is_gtk3:
                    self.win.add(self.view)
                    self.win.show_all()
                else:
                    self.win.set_child(self.view)
                    self.win.present()

            def create_webview(self):
                """Create the web view, set its cache model and start loading the URL."""
                view = webkit.WebView()
                context = view.get_context()
                context.set_cache_model(webkit.CacheModel.DOCUMENT_VIEWER)
                view.load_uri(self.url)
                return view

        self.app = BrowserAppGtk()

    def run(self):
        """Run the GTK browser application."""
        self.app.run()
class QtBrowserApp:
    """
    Qt Browser Application (PySide6 preferred, PySide2 as fallback).
    """

    def __init__(self, url):
        """
        Create the Qt application and a window showing *url*.
        Parameters:
        - url (str): The page to load in the browser window.
        """
        try:
            from PySide6.QtCore import QUrl
            from PySide6.QtWidgets import QApplication, QMainWindow
            from PySide6.QtWebEngineWidgets import QWebEngineView
        except ImportError:
            from PySide2.QtCore import QUrl
            from PySide2.QtWidgets import QApplication, QMainWindow
            from PySide2.QtWebEngineWidgets import QWebEngineView
        self.app = QApplication(sys.argv)
        self.win = QMainWindow()
        self.win.setWindowTitle("Captive Portal Browser")
        self.view = QWebEngineView()
        # Convert explicitly instead of relying on implicit str -> QUrl conversion.
        self.view.setUrl(QUrl(url))
        self.win.setCentralWidget(self.view)
        self.win.show()

    def run(self):
        """Run the Qt event loop and exit the process with its return code."""
        # Use exec() where the binding provides it and fall back to exec_()
        # otherwise (the PySide2 fallback is expected to need exec_()).
        run_event_loop = getattr(self.app, "exec", None) or self.app.exec_
        sys.exit(run_event_loop())
class Utils:
    """
    Helper functions used once the script runs inside the user namespace,
    plus argument validation.
    """

    @staticmethod
    def run_inside_userns(args):
        """
        Run the portal check and, if a portal is found, open the browser.
        Parameters:
        - args (argparse.Namespace): Parsed command line; the attributes
          ``check``, ``check_url`` and ``check_text`` are read.
        """
        print("Inside user namespace")
        checker = CaptivePortalChecker(args.check_url or args.check, text=args.check_text)
        if checker.check_captive_portal():
            print("Captive portal detected.")
            # Open the URL that was actually probed. args.check_url is None
            # whenever a predefined provider (--check) is used.
            url = checker.url
            print(url)
            Utils.create_minimal_browser(url)
        else:
            print("No captive portal detected.")
            sys.exit()

    @staticmethod
    def ip_string(address):
        """
        Validate an IP address given on the command line (argparse ``type``).
        Parameters:
        - address (str): The IP address to validate.
        Returns:
        - str: The unchanged address if it is a valid IPv4 or IPv6 address.
        Raises:
        - argparse.ArgumentTypeError: If the address is not a valid IP.
        """
        try:
            ipaddress.ip_address(address)
            return address
        except ValueError:
            raise argparse.ArgumentTypeError(f"{address} is not a valid ip")

    @staticmethod
    def check_captive_portal(check_url):
        """
        Check for a captive portal using the provided check URL.
        Parameters:
        - check_url (str): The URL used for portal detection.
        Returns:
        - bool: True if the response status is not 204, False if it is 204 or
          the request failed with a socket error or timeout.
        Raises:
        - http.client.InvalidURL: If the host/port part of the URL is malformed.
        """
        parsed_url = urlparse(check_url)
        # Create the connection before entering the try block so the cleanup
        # in ``finally`` never touches an unbound name.
        conn = http.client.HTTPConnection(parsed_url.netloc, timeout=5)
        try:
            query_string = parsed_url.query
            path = parsed_url.path + '?' + query_string if query_string else parsed_url.path
            conn.request("GET", path)
            response = conn.getresponse()
            return response.status != 204
        except (socket.timeout, socket.error):
            return False
        finally:
            conn.close()

    @staticmethod
    def create_minimal_browser(url):
        """
        Create a minimal browser window based on the available libraries.

        Preference order: GTK 4 with WebKit 6.0, GTK 3 with WebKit2 4.1 or 4.0,
        and finally Qt.
        Parameters:
        - url (str): The page to open.
        """
        if GObjectIntrospectionHelper.version_available("Gtk", "4.0") and GObjectIntrospectionHelper.version_available("WebKit", "6.0"):
            gtk4_browser = GTKBrowserApp(url, "4.0", "6.0")
            gtk4_browser.run()
        elif GObjectIntrospectionHelper.version_available("Gtk", "3.0") and (
                GObjectIntrospectionHelper.version_available("WebKit2", "4.1") or GObjectIntrospectionHelper.version_available("WebKit2", "4.0")):
            webkit_version = "4.1" if GObjectIntrospectionHelper.version_available("WebKit2", "4.1") else "4.0"
            gtk3_browser = GTKBrowserApp(url, "3.0", webkit_version)
            gtk3_browser.run()
        else:
            qt_browser = QtBrowserApp(url)
            qt_browser.run()
class DefaultHelpFormatter(argparse.HelpFormatter):
    """Help formatter that appends " (default: VALUE)" to the help of every
    argument whose default is neither ``None`` nor suppressed."""

    def _get_help_string(self, action):
        """Return the help text of *action*, extended by its default value."""
        help_text = super()._get_help_string(action)
        if action.default is argparse.SUPPRESS or action.default is None:
            return help_text
        # argparse %-formats the returned string afterwards, so a literal '%'
        # inside the default value has to be escaped to survive that step.
        default_str = f" (default: {action.default})".replace("%", "%%")
        return help_text + default_str
def parse_command_line_args():
    """
    Build the argument parser and parse the process command line.
    Returns:
    - argparse.Namespace: The parsed arguments: ``nameservers`` (list of IP
      strings, at least one), ``ns`` (bool, internal flag), and the portal
      detection options ``check``, ``check_url`` and ``check_text``.
    """
    arg_parser = argparse.ArgumentParser(
        description="Portal Browser with Captive Portal Detection",
        formatter_class=DefaultHelpFormatter,
    )
    CaptivePortalChecker.add_argument_group(arg_parser)
    arg_parser.add_argument(
        "nameservers",
        nargs="+",
        type=Utils.ip_string,
        help="Nameservers to be used for captive portal detection",
    )
    # --ns is appended by the script itself when it re-executes inside the
    # namespace, so it is kept out of the help output.
    arg_parser.add_argument("--ns", action="store_true", help=argparse.SUPPRESS)
    return arg_parser.parse_args()
def setup_userns(primary_nameserver, secondary_nameserver, check_url):
    """
    Sets up the user namespace and runs the browser in it.

    Writes a resolv.conf and an nsswitch.conf into a temporary directory and
    then replaces the current process with ``unshare -rm sh -c ...``. The shell
    command bind-mounts both files over /etc/resolv.conf and /etc/nsswitch.conf,
    re-runs this script with its original arguments plus ``--ns``, and finally
    removes the temporary directory. On success this function does not return.
    Parameters:
    - primary_nameserver (str): The primary nameserver IP.
    - secondary_nameserver (str): The secondary nameserver IP (if provided, otherwise None).
    - check_url (str): The URL used for portal detection. Not used here; the
      re-executed script parses the original command line again.
    """
    with tempfile.TemporaryDirectory() as mnt_namespace:
        print("Directory for namespace resolver overrides:", mnt_namespace)
        resolv_conf_path = os.path.join(mnt_namespace, "resolv.conf")
        with open(resolv_conf_path, "w") as resolv_conf:
            resolv_conf.write(f"nameserver {primary_nameserver}\n")
            if secondary_nameserver:
                resolv_conf.write(f"nameserver {secondary_nameserver}\n")
        nsswitch_conf_path = os.path.join(mnt_namespace, "nsswitch.conf")
        with open(nsswitch_conf_path, "w") as nsswitch_conf:
            nsswitch_conf.write("passwd: files\n")
            nsswitch_conf.write("group: files\n")
            nsswitch_conf.write("hosts: files dns\n")
        # The uid/gid are read here, in the original process, and handed to the
        # inner unshare call as its --map-user/--map-group values.
        shell_command = (
            f"mount --bind {shlex.quote(resolv_conf_path)} /etc/resolv.conf"
            f" && mount --bind {shlex.quote(nsswitch_conf_path)} /etc/nsswitch.conf"
            f" && unshare -u --map-group={os.getgid()} --map-user={os.getuid()}"
            f" {shlex.quote(sys.executable)} {' '.join(map(shlex.quote, sys.argv))} --ns"
            f" ; rm -r {shlex.quote(mnt_namespace)}"
        )
        # exec replaces the process image without flushing Python's buffered
        # streams, so flush first or the message printed above can be lost.
        sys.stdout.flush()
        sys.stderr.flush()
        # The context manager's cleanup never runs after a successful exec;
        # the trailing "rm -r" in the shell command takes care of the directory.
        os.execlp("unshare", "unshare", "-rm", "sh", "-c", shell_command)
def main():
    """
    Script entry point: dispatch on the parsed command line.

    With the internal ``--ns`` flag the portal check runs directly; otherwise
    the namespace is prepared using the first (and, if given, second)
    nameserver.
    """
    options = parse_command_line_args()

    if options.ns:
        Utils.run_inside_userns(options)
        return

    if not options.nameservers:
        print("Unexpected error in command-line argument parsing.")
        return

    primary, *others = options.nameservers
    secondary = others[0] if others else None
    setup_userns(primary, secondary, options.check_url)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment