Skip to content

Instantly share code, notes, and snippets.

@mirkobrombin
Created August 28, 2023 12:23
Show Gist options
  • Save mirkobrombin/3cfa21a44c318e08db29fd27b6252f4a to your computer and use it in GitHub Desktop.
Save mirkobrombin/3cfa21a44c318e08db29fd27b6252f4a to your computer and use it in GitHub Desktop.
# network.py
#
# Copyright 2023 mirkobrombin
# Copyright 2023 matbme
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundationat version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import time
from gettext import gettext as _
from operator import attrgetter
from threading import Lock, Timer
from gi.repository import NM, NMA4, Adw, Gtk, GLib
from vanilla_installer.utils.run_async import RunAsync
logger = logging.getLogger("VanillaInstaller::Network")
# Dictionary mapping security types to a tuple containing
# their pretty name and whether it is a secure protocol.
# If security is None, it means that no padlock icon is shown.
# If security is False, a warning symbol appears instead of a padlock.
AP_SECURITY_TYPES = {
"none": (None, None),
"wep": (False, _("Insecure network (WEP)")),
"wpa": (True, _("Secure network (WPA)")),
"wpa2": (True, _("Secure network (WPA2)")),
"sae": (True, _("Secure network (WPA3)")),
"owe": (None, None),
"owe_tm": (None, None),
}
# PyGObject-libnm doesn't seem to expose these values, so we have redefine them
NM_802_11_AP_FLAGS_PRIVACY = 0x00000001
NM_802_11_AP_SEC_NONE = 0x00000000
NM_802_11_AP_SEC_KEY_MGMT_802_1X = 0x00000200
NM_802_11_AP_SEC_KEY_MGMT_EAP_SUITE_B_192 = 0x00002000
NM_802_11_AP_SEC_KEY_MGMT_OWE = 0x00000800
NM_802_11_AP_SEC_KEY_MGMT_OWE_TM = 0x00001000
NM_802_11_AP_SEC_KEY_MGMT_PSK = 0x00000100
NM_802_11_AP_SEC_KEY_MGMT_SAE = 0x00000400
@Gtk.Template(resource_path="/org/vanillaos/Installer/gtk/wireless-row.ui")
class WirelessRow(Adw.ActionRow):
__gtype_name__ = "WirelessRow"
signal_icon = Gtk.Template.Child()
secure_icon = Gtk.Template.Child()
connected_label = Gtk.Template.Child()
def __init__(self, window, client, device: NM.DeviceWifi, ap, **kwargs):
super().__init__(**kwargs)
self.__window = window
self.client = client
self.ap = ap
self.device = device
self.refresh_ui()
self.set_activatable(True)
self.connect("activated", self.__show_connect_dialog)
@property
def ssid(self):
ssid = self.ap.get_ssid()
if ssid is not None:
ssid = ssid.get_data().decode("utf-8")
else:
ssid = ""
return ssid
@property
def signal_strength(self):
return self.ap.get_strength()
@property
def connected(self):
active_connection = self.device.get_active_connection()
if active_connection is not None:
if active_connection.get_id() == self.ssid:
return True
return False
def refresh_ui(self):
# We use the same strength logic as gnome-control-center
strength = self.signal_strength
if strength < 20:
icon_name = "network-wireless-signal-none-symbolic"
elif strength < 40:
icon_name = "network-wireless-signal-weak-symbolic"
elif strength < 50:
icon_name = "network-wireless-signal-ok-symbolic"
elif strength < 80:
icon_name = "network-wireless-signal-good-symbolic"
else:
icon_name = "network-wireless-signal-excellent-symbolic"
self.set_title(self.ssid)
self.signal_icon.set_from_icon_name(icon_name)
secure, tooltip = self.__get_security()
if secure is not None:
if not secure:
self.secure_icon.set_from_icon_name("warning-small-symbolic")
else:
self.secure_icon.set_from_icon_name(
"network-wireless-encrypted-symbolic"
)
self.secure_icon.set_visible(secure is not None)
if tooltip is not None:
self.secure_icon.set_tooltip_text(tooltip)
self.connected_label.set_visible(self.connected)
def __get_security(self) -> tuple[bool | None, str | None]:
flags = self.ap.get_flags()
rsn_flags = self.ap.get_rsn_flags()
wpa_flags = self.ap.get_wpa_flags()
# Copying logic used in gnome-control-center because this is a mess
if (
not (flags & NM_802_11_AP_FLAGS_PRIVACY)
and wpa_flags == NM_802_11_AP_SEC_NONE
and rsn_flags == NM_802_11_AP_SEC_NONE
):
return AP_SECURITY_TYPES["none"]
elif (
(flags & NM_802_11_AP_FLAGS_PRIVACY)
and wpa_flags == NM_802_11_AP_SEC_NONE
and rsn_flags == NM_802_11_AP_SEC_NONE
):
return AP_SECURITY_TYPES["wep"]
elif (
(flags & NM_802_11_AP_FLAGS_PRIVACY)
and wpa_flags != NM_802_11_AP_SEC_NONE
and rsn_flags != NM_802_11_AP_SEC_NONE
):
return AP_SECURITY_TYPES["wpa"]
elif rsn_flags & NM_802_11_AP_SEC_KEY_MGMT_SAE:
return AP_SECURITY_TYPES["sae"]
elif rsn_flags & NM_802_11_AP_SEC_KEY_MGMT_OWE:
return AP_SECURITY_TYPES["owe"]
elif rsn_flags & NM_802_11_AP_SEC_KEY_MGMT_OWE_TM:
return AP_SECURITY_TYPES["owe_tm"]
else:
return AP_SECURITY_TYPES["wpa2"]
@property
def __key_mgmt(self):
# Key management used for the connection. One of "none" (WEP or no
# password protection), "ieee8021x" (Dynamic WEP), "owe" (Opportunistic
# Wireless Encryption), "wpa-psk" (WPA2 + WPA3 personal), "sae" (WPA3
# personal only), "wpa-eap" (WPA2 + WPA3 enterprise) or
# "wpa-eap-suite-b-192" (WPA3 enterprise only).
rsn_flags = self.ap.get_rsn_flags()
wpa_flags = self.ap.get_wpa_flags()
if wpa_flags == NM_802_11_AP_SEC_NONE and rsn_flags == NM_802_11_AP_SEC_NONE:
return "none"
elif rsn_flags & NM_802_11_AP_SEC_KEY_MGMT_802_1X:
return "ieee8021x"
elif rsn_flags & NM_802_11_AP_SEC_KEY_MGMT_EAP_SUITE_B_192:
return "wpa-eap-suite-b-192"
elif (
rsn_flags & NM_802_11_AP_SEC_KEY_MGMT_OWE
or rsn_flags & NM_802_11_AP_SEC_KEY_MGMT_OWE_TM
):
return "owe"
elif rsn_flags & NM_802_11_AP_SEC_KEY_MGMT_PSK:
return "wpa-psk"
elif rsn_flags & NM_802_11_AP_SEC_KEY_MGMT_SAE:
return "sae"
def __show_connect_dialog(self, data):
dialog = NMA4.WifiDialog.new(
self.client, self.__construct_connection(), self.device, self.ap, False
)
dialog.set_modal(True)
dialog.set_transient_for(self.__window)
dialog.connect("response", self.__on_dialog_response)
dialog.show()
def __on_dialog_response(self, dialog, response_id):
def connect_cb(client, result, data):
try:
ac = client.add_and_activate_connection_finish(result)
logger.debug("ActiveConnection {}".format(ac.get_path()))
except Exception as e:
logger.error("Error:", e)
if response_id == -6:
dialog.close()
elif response_id == -5:
conn, _, _ = dialog.get_connection()
self.client.add_and_activate_connection_async(
conn, self.device, self.ap.get_path(), None, connect_cb, None
)
dialog.close()
def __construct_connection(self):
connection = NM.SimpleConnection.new()
s_con = NM.SettingConnection.new()
s_con.set_property(NM.SETTING_CONNECTION_ID, self.ssid)
s_con.set_property(NM.SETTING_CONNECTION_TYPE, "802-11-wireless")
s_wifi = NM.SettingWireless.new()
s_wifi.set_property(NM.SETTING_WIRELESS_SSID, self.ap.get_ssid())
s_wifi.set_property(NM.SETTING_WIRELESS_MODE, "infrastructure")
s_wsec = NM.SettingWirelessSecurity.new()
s_wsec.set_property(NM.SETTING_WIRELESS_SECURITY_KEY_MGMT, self.__key_mgmt)
s_ip4 = NM.SettingIP4Config.new()
s_ip4.set_property(NM.SETTING_IP_CONFIG_METHOD, "auto")
s_ip6 = NM.SettingIP6Config.new()
s_ip6.set_property(NM.SETTING_IP_CONFIG_METHOD, "auto")
connection.add_setting(s_con)
connection.add_setting(s_wifi)
connection.add_setting(s_wsec)
connection.add_setting(s_ip4)
connection.add_setting(s_ip6)
return connection
@Gtk.Template(resource_path="/org/vanillaos/Installer/gtk/default-network.ui")
class VanillaDefaultNetwork(Adw.Bin):
__gtype_name__ = "VanillaDefaultNetwork"
wired_group = Gtk.Template.Child()
wireless_group = Gtk.Template.Child()
hidden_network_row = Gtk.Template.Child()
proxy_settings_row = Gtk.Template.Child()
advanced_group = Gtk.Template.Child()
btn_next = Gtk.Template.Child()
def __init__(self, window, distro_info, key, step, **kwargs):
super().__init__(**kwargs)
self.__window = window
self.__distro_info = distro_info
self.__key = key
self.__step = step
self.__nm_client = NM.Client.new()
self.__devices = []
self.__wired_children = []
self.__wireless_children = {}
self.__last_wifi_scan = 0
# Prevent concurrency issues when re-scanning Wi-Fi devices.
# Since we reload the list every time there's a state change,
# there's a high change that it coincides with a periodic
# refresh operation.
self.__wifi_lock = Lock()
# Since we have a dedicated page for checking connectivity,
# we only need to make sure the user has some type of
# connection set up, be it wired or wireless.
self.has_eth_connection = False
self.has_wifi_connection = False
self.__get_network_devices()
self.__start_auto_refresh()
# TODO: Remove once implemented
self.advanced_group.set_visible(False)
self.__nm_client.connect("device-added", self.__add_new_device)
self.__nm_client.connect("device-added", self.__remove_device)
self.btn_next.connect("clicked", self.__window.next)
self.connect("realize", self.__try_skip_page)
def __try_skip_page(self, data):
# Skip page if already connected to the internet
if self.has_eth_connection or self.has_wifi_connection:
self.__window.next()
@property
def step_id(self):
return self.__key
def get_finals(self):
return {}
def set_btn_next(self, state: bool):
if state:
if not self.btn_next.has_css_class("suggested-action"):
self.btn_next.add_css_class("suggested-action")
self.btn_next.set_sensitive(True)
else:
if self.btn_next.has_css_class("suggested-action"):
self.btn_next.remove_css_class("suggested-action")
self.btn_next.set_sensitive(False)
def __get_network_devices(self):
devices = self.__nm_client.get_devices()
eth_devices = 0
wifi_devices = 0
for device in devices:
if device.is_real():
device_type = device.get_device_type()
if device_type == NM.DeviceType.ETHERNET:
self.__add_ethernet_connection(device)
eth_devices += 1
elif device_type == NM.DeviceType.WIFI:
device.connect("state-changed", self.__on_state_changed)
self.has_wifi_connection = (
device.get_active_connection() is not None
)
self.__refresh_wifi_list(device)
wifi_devices += 1
else:
continue
self.__devices.append(device)
self.wired_group.set_visible(eth_devices > 0)
self.wireless_group.set_visible(wifi_devices > 0)
def __add_new_device(self, client, device):
self.__devices.append(device)
def __remove_device(self, client, device):
self.__devices.remove(device)
def __on_state_changed(self, device, new_state, old_state, reason):
self.has_wifi_connection = device.get_active_connection() is not None
self.__refresh()
def __refresh(self):
for child in self.__wired_children:
self.wired_group.remove(child)
self.__wired_children = []
for device in self.__devices:
device_type = device.get_device_type()
if device_type == NM.DeviceType.ETHERNET:
self.__add_ethernet_connection(device)
elif device_type == NM.DeviceType.WIFI:
self.__scan_wifi(device)
self.set_btn_next(self.has_eth_connection or self.has_wifi_connection)
def __start_auto_refresh(self):
def run_async():
while True:
GLib.idle_add(self.__refresh)
time.sleep(10)
RunAsync(run_async, None)
def __device_status(self, conn: NM.Device):
connected = False
match conn.get_state():
case NM.DeviceState.ACTIVATED:
status = _("Connected")
connected = True
case NM.DeviceState.NEED_AUTH:
status = _("Authentication required")
case [
NM.DeviceState.PREPARE,
NM.DeviceState.CONFIG,
NM.DeviceState.IP_CONFIG,
NM.DeviceState.IP_CHECK,
NM.DeviceState.SECONDARIES,
]:
status = _("Connecting")
case NM.DeviceState.DISCONNECTED:
status = _("Disconnected")
case NM.DeviceState.DEACTIVATING:
status = _("Disconnecting")
case NM.DeviceState.FAILED:
status = _("Connection Failed")
case NM.DeviceState.UNKNOWN:
status = _("Status Unknown")
case NM.DeviceState.UNMANAGED:
status = _("Unmanaged")
case NM.DeviceState.UNAVAILABLE:
status = _("Unavailable")
return status, connected
def __add_ethernet_connection(self, conn: NM.DeviceEthernet):
status, connected = self.__device_status(conn)
if connected:
status += f" - {conn.get_speed()} Mbps"
self.has_eth_connection = True
else:
self.has_eth_connection = False
# Wired devices with no cable plugged in are shown as unavailable
if conn.get_state() == NM.DeviceState.UNAVAILABLE:
status = _("Cable Unplugged")
eth_conn = Adw.ActionRow(title=status)
self.wired_group.add(eth_conn)
self.__wired_children.append(eth_conn)
def __refresh_wifi_list(self, conn: NM.DeviceWifi):
while conn.get_last_scan() == self.__last_wifi_scan:
time.sleep(0.25)
networks = {}
for ap in conn.get_access_points():
ssid = ap.get_ssid()
if ssid is None:
continue
ssid = ssid.get_data().decode("utf-8")
if ssid in networks.keys():
networks[ssid].append(ap)
else:
networks[ssid] = [ap]
self.__wifi_lock.acquire()
# Invalidate current list
for ssid, (child, clean) in self.__wireless_children.items():
self.__wireless_children[ssid] = (child, True)
for ssid, aps in networks.items():
max_strength = -1
best_ap = None
for ap in aps:
ap_strength = ap.get_strength()
if ap_strength > max_strength:
max_strength = ap_strength
best_ap = ap
# Try to re-use entries with the same SSID
if ssid in self.__wireless_children.keys():
child = self.__wireless_children[ssid][0]
child.ap = best_ap
child.refresh_ui()
self.__wireless_children[ssid] = (child, False)
continue
# Create new row if SSID is new
wifi_network = WirelessRow(self.__window, self.__nm_client, conn, best_ap)
self.wireless_group.add(wifi_network)
self.__wireless_children[ssid] = (wifi_network, False)
# Remove invalid rows
invalid_ssids = []
for ssid, (child, clean) in self.__wireless_children.items():
self.wireless_group.remove(child)
if clean:
invalid_ssids.append(ssid)
for ssid in invalid_ssids:
del self.__wireless_children[ssid]
for row in self.__sorted_wireless_children:
self.wireless_group.add(row)
self.__wifi_lock.release()
def __scan_wifi(self, conn: NM.DeviceWifi):
self.__last_wifi_scan = conn.get_last_scan()
conn.request_scan_async()
t = Timer(1.5, self.__refresh_wifi_list, [conn])
t.start()
@property
def __sorted_wireless_children(self):
def multisort(xs, specs):
for key, reverse in reversed(specs):
xs.sort(key=attrgetter(key), reverse=reverse)
return xs
# 1 - Is connected
# 2 - Signal strength
# 3 - Alphabetically
return multisort(
[it[0] for it in list(self.__wireless_children.values())],
(("connected", True), ("signal_strength", True), ("ssid", True)),
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment