Skip to content

Instantly share code, notes, and snippets.

@taylor224
Created April 28, 2015 16:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save taylor224/0c63e8bc2262ae4a60f1 to your computer and use it in GitHub Desktop.
Save taylor224/0c63e8bc2262ae4a60f1 to your computer and use it in GitHub Desktop.
WiFi Client Service
# /usr/local/lib/python2.7/dist-packages/wifi/scheme.py
import re
import itertools
import wifi.subprocess_compat as subprocess
from pbkdf2 import PBKDF2
from wifi.utils import ensure_file_exists
from wifi.exceptions import ConnectionError
def configuration(cell, passkey=None):
"""
Returns a dictionary of configuration options for cell
Asks for a password if necessary
"""
if not cell.encrypted:
return {
'wireless-essid': cell.ssid,
'wireless-channel': 'auto',
}
else:
if cell.encryption_type.startswith('wpa'):
if len(passkey) != 64:
passkey = PBKDF2(passkey, cell.ssid, 4096).hexread(32)
return {
'wpa-ssid': cell.ssid,
'wpa-psk': passkey,
'wireless-channel': 'auto',
}
elif cell.encryption_type == 'wep':
# Pass key lengths in bytes for WEP depend on type of key and key length:
#
# 64bit 128bit 152bit 256bit
# hex 10 26 32 58
# ASCII 5 13 16 29
#
# (source: https://en.wikipedia.org/wiki/Wired_Equivalent_Privacy)
#
# ASCII keys need to be prefixed with an s: in the interfaces file in order to work with linux' wireless
# tools
ascii_lengths = (5, 13, 16, 29)
if len(passkey) in ascii_lengths:
# we got an ASCII passkey here (otherwise the key length wouldn't match), we'll need to prefix that
# with s: in our config for the wireless tools to pick it up properly
passkey = "s:" + passkey
return {
'wireless-essid': cell.ssid,
'wireless-key': passkey,
}
else:
raise NotImplementedError
bound_ip_re = re.compile(r'^bound to (?P<ip_address>\S+)', flags=re.MULTILINE)
class Scheme(object):
"""
Saved configuration for connecting to a wireless network. This
class provides a Python interface to the /etc/network/interfaces
file.
"""
interfaces = '/etc/network/interfaces'
@classmethod
def for_file(cls, interfaces):
"""
A class factory for providing a nice way to specify the interfaces file
that you want to use. Use this instead of directly overwriting the
interfaces Class attribute if you care about thread safety.
"""
return type(cls)(cls.__name__, (cls,), {
'interfaces': interfaces,
})
def __init__(self, interface, name, options=None):
self.interface = interface
self.name = name.replace(" ", "__0__")
self.options = options or {}
def __str__(self):
"""
Returns the representation of a scheme that you would need
in the /etc/network/interfaces file.
"""
iface = "iface {interface}-{name} inet dhcp".format(**vars(self))
options = ''.join("\n {k} {v}".format(k=k, v=v) for k, v in self.options.items())
return iface + options + '\n'
def __repr__(self):
return 'Scheme(interface={interface!r}, name={name!r}, options={options!r}'.format(**vars(self))
@classmethod
def all(cls):
"""
Returns an generator of saved schemes.
"""
ensure_file_exists(cls.interfaces)
with open(cls.interfaces, 'r') as f:
return extract_schemes(f.read(), scheme_class=cls)
@classmethod
def where(cls, fn):
return list(filter(fn, cls.all()))
@classmethod
def find(cls, interface, name):
"""
Returns a :class:`Scheme` or `None` based on interface and
name.
"""
try:
return cls.where(lambda s: s.interface == interface and s.name == name.replace(" ", "__0__"))[0]
except IndexError:
return None
@classmethod
def for_cell(cls, interface, name, cell, passkey=None):
"""
Intuits the configuration needed for a specific
:class:`Cell` and creates a :class:`Scheme` for it.
"""
return cls(interface, name, configuration(cell, passkey))
def save(self):
"""
Writes the configuration to the :attr:`interfaces` file.
"""
assert not self.find(self.interface, self.name), "This scheme already exists"
with open(self.interfaces, 'a') as f:
f.write('\n')
f.write(str(self))
def delete(self):
"""
Deletes the configuration from the :attr:`interfaces` file.
"""
iface = "iface %s-%s inet dhcp" % (self.interface, self.name)
content = ''
with open(self.interfaces, 'r') as f:
skip = False
for line in f:
if not line.strip():
skip = False
elif line.strip() == iface:
skip = True
if not skip:
content += line
with open(self.interfaces, 'w') as f:
f.write(content)
@property
def iface(self):
return '{0}-{1}'.format(self.interface, self.name)
def as_args(self):
args = list(itertools.chain.from_iterable(
('-o', '{k}={v}'.format(k=k, v=v)) for k, v in self.options.items()))
return [self.interface + '=' + self.iface] + args
def activate(self):
"""
Connects to the network as configured in this scheme.
"""
subprocess.check_output(['/sbin/ifdown', self.interface], stderr=subprocess.STDOUT)
ifup_output = subprocess.check_output(['/sbin/ifup'] + self.as_args(), stderr=subprocess.STDOUT)
ifup_output = ifup_output.decode('utf-8')
return self.parse_ifup_output(ifup_output)
def parse_ifup_output(self, output):
matches = bound_ip_re.search(output)
if matches:
return Connection(scheme=self, ip_address=matches.group('ip_address'))
else:
raise ConnectionError("Failed to connect to %r" % self)
class Connection(object):
"""
The connection object returned when connecting to a Scheme.
"""
def __init__(self, scheme, ip_address):
self.scheme = scheme
self.ip_address = ip_address
# TODO: support other interfaces
scheme_re = re.compile(r'iface\s+(?P<interface>wlan\d?)(?:-(?P<name>\w+))?')
def extract_schemes(interfaces, scheme_class=Scheme):
lines = interfaces.splitlines()
while lines:
line = lines.pop(0)
if line.startswith('#') or not line:
continue
match = scheme_re.match(line)
if match:
options = {}
interface, scheme = match.groups()
if not scheme or not interface:
continue
while lines and lines[0].startswith(' '):
key, value = re.sub(r'\s{2,}', ' ', lines.pop(0).strip()).split(' ', 1)
options[key] = value
scheme = scheme_class(interface, scheme, options)
yield scheme
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import wifi
import bluetooth
import subprocess
import json
import time
def Search():
wifilist = []
cells = None
cells = wifi.Cell.all('wlan0')
return cells
def SearchForCommand():
wifilist = []
cells = None
try:
cells = wifi.Cell.all('wlan0')
except wifi.exceptions.InterfaceError:
return '<busy>'
except:
return '<exception>'
for cell in cells:
celldata = {
'ssid': cell.ssid,
'signal': cell.signal,
'quality': cell.quality,
'frequency': cell.frequency,
'bitrates': cell.bitrates,
'encrypted': cell.encrypted,
'channel': cell.channel,
'address': cell.address,
'mode': cell.mode
}
wifilist.append(celldata)
return wifilist
def FindFromSearchList(ssid):
wifilist = Search()
for cell in wifilist:
if cell.ssid == ssid:
return cell
return False
def FindFromSavedList(ssid):
cell = wifi.Scheme.find('wlan0', ssid)
if cell:
return cell
return False
def Connect(ssid, password=None):
cell = FindFromSearchList(ssid)
if cell:
savedcell = FindFromSavedList(cell.ssid)
# Already Saved from Setting
if savedcell:
savedcell.activate()
return cell
# First time to conenct
else:
if cell.encrypted:
if password:
scheme = Add(cell, password)
try:
scheme.activate()
# Wrong Password
except wifi.exceptions.ConnectionError:
Delete(ssid)
return '<wrong_password>'
return cell
else:
return '<require_password>'
else:
scheme = Add(cell)
try:
scheme.activate()
except wifi.exceptions.ConnectionError:
Delete(ssid)
return '<connect_error>'
return cell
else:
return '<not_found>'
def Add(cell, password=None):
if not cell:
return False
scheme = wifi.Scheme.for_cell('wlan0', cell.ssid, cell, password)
scheme.save()
return scheme
def Delete(ssid):
if not ssid:
return False
cell = FindFromSavedList(ssid)
if cell:
cell.delete()
connectedAP = GetConnectedAP()
if connectedAP == ssid:
subprocess.check_output('ifdown wlan0', shell=True)
subprocess.check_output('ifup wlan0', shell=True)
return True
return False
def GetConnectedAP():
popresult = None
try:
popresult = subprocess.check_output('iwconfig | grep wlan0', shell=True)
except:
return False
if not popresult:
return False
if 'ESSID:off/any' in popresult:
return '<not_connected>'
if not len(popresult.split('\"')) > 2:
return False
ssid = popresult.split('\"')[1]
if ssid == '' or not ssid:
return False
else:
return ssid
def CommandProcess(data):
command = ''
try:
command = data.get('command')
print '[ALERT] Command : ', command
# Connect WiFi
if command == 'connect':
ssid = data.get('ssid')
password = data.get('password')
if not ssid:
return ResultProcess(command, 'no_argument')
result = Connect(ssid, password)
if type(result) is str:
return ResultProcess(command, result.replace('<', '').replace('>', ''), ssid)
if type(result) is wifi.Cell:
return ResultProcess(command, 'success', ssid)
if not result:
return ResultProcess(command, 'connect_fail', ssid)
# Delete WiFi
if command == 'delete':
ssid = data.get('ssid')
if not ssid:
return ResultProcess(command, 'no_argument')
if Delete(ssid):
return ResultProcess(command, 'success', ssid)
else:
return ResultProcess(command, 'delete_fail', ssid)
# Search WiFi
if command == 'search':
searchlist = SearchForCommand()
if searchlist == '<busy>':
return ResultProcess(command, 'busy')
if searchlist == '<exception>':
return ResultProcess(command, 'exception')
if not searchlist:
return ResultProcess(command, 'no_result')
return ResultProcess(command, 'success', searchlist)
# Get Current WiFi
if command == 'current':
ssid = GetConnectedAP()
if not ssid:
return ResultProcess(command, 'exception')
if ssid == '<not_connected>':
return ResultProcess(command, 'not_connected')
return ResultProcess(command, 'success', ssid)
except wifi.exceptions.InterfaceError, e:
return ResultProcess(command, str(e))
except Exception, e:
print '[ALERT] Command Process Exception : ', e
return ResultProcess(command, str(e))
return ResultProcess(command, 'program_fault')
def ResultProcess(command, result, data=None):
jsonresult = ''
if not command or not result:
return False
if data:
jsonresult = {
'command' : command,
'result' : result,
'data' : data
}
else:
jsonresult = {
'command' : command,
'result' : result
}
return json.dumps(jsonresult)
if __name__ == '__main__':
print '[INFO] Sleep 5 second to Initialize'
time.sleep(5)
blueSocket=bluetooth.BluetoothSocket(bluetooth.RFCOMM)
blueSocket.bind(("", 1))
blueSocket.listen(1)
port = blueSocket.getsockname()[1]
uuid = "94f39d29-7d6d-437d-973b-fba39e49d4ee"
bluetooth.advertise_service( blueSocket, "WiFiAssistance",
service_id = uuid,
service_classes = [uuid, bluetooth.SERIAL_PORT_CLASS],
profiles = [bluetooth.SERIAL_PORT_PROFILE]
)
print "[INFO] Waiting for connection on RFCOMM channel %d" % port
clientSocket, clientInfo = blueSocket.accept()
print "[ALERT] Accepted connection from ", clientInfo
try:
while True:
data = clientSocket.recv(4096)
if len(data) == 0: break
print "received [%s]" % data
try:
data = json.loads(data)
result = CommandProcess(data)
if result:
clientSocket.send(result)
else:
pass
except Exception, e:
print e
ResultProcess('error', str(e))
except IOError:
pass
print "[INFO] Bluetooth Disconnected"
clientSocket.close()
blueSocket.close()
print "Program Finished"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment