Last active
June 15, 2018 18:13
-
-
Save Thomascountz/753b6c2d8ed96ac39afeae1980885caa to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class LightBulb | |
attr_reader :state | |
def initialize | |
self.state = :off | |
end | |
def turn_on | |
self.state = :on | |
end | |
private | |
attr_writer :state | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class LightBulb | |
attr_reader :state | |
def initialize | |
self.state = :off | |
end | |
def turn_on | |
self.state = :on | |
end | |
def turn_off | |
self.state = :off | |
end | |
private | |
attr_writer :state | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class LightBulb | |
attr_reader :state | |
def initialize | |
self.state = :off | |
end | |
def turn_on | |
if state == :on | |
false | |
else | |
self.state = :on | |
true | |
end | |
end | |
def turn_off | |
if state == :off | |
false | |
else | |
self.state = :off | |
true | |
end | |
end | |
private | |
attr_writer :state | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# A light bulb that delegates switching to its current state object.
class LightBulb
  # The current state object, as produced by LightBulbState.for.
  attr_reader :state

  # A new bulb starts in the :off state.
  def initialize
    self.state = :off
  end

  # Asks the current state object to switch this bulb on and returns its
  # answer.
  def turn_on
    state.turn_on(self)
  end

  # Asks the current state object to switch this bulb off and returns its
  # answer.
  def turn_off
    state.turn_off(self)
  end

  # Accepts a state name and stores the state object that
  # LightBulbState.for builds for it.
  def state=(new_state)
    @state = LightBulbState.for(new_state)
  end
end
class LightBulbState | |
def self.for(state) | |
if state == :on | |
LightBulbOnState | |
elsif state == :off | |
LightBulbOffState | |
end.new | |
end | |
end | |
# Behaviour of a light bulb that is currently on.
class LightBulbOnState < LightBulbState
  # Builds the state with its name, :on.
  def initialize
    @state = :on
  end

  # Name of this state.
  attr_reader :state

  # The bulb is already on, so nothing changes. Returns false.
  def turn_on(_light_bulb)
    false
  end

  # Moves the bulb to the :off state. Returns true.
  def turn_off(light_bulb)
    light_bulb.state = :off
    true
  end
end
# Behaviour of a light bulb that is currently off.
class LightBulbOffState < LightBulbState
  # Builds the state with its name, :off.
  def initialize
    @state = :off
  end

  # Name of this state.
  attr_reader :state

  # Moves the bulb to the :on state. Returns true.
  def turn_on(light_bulb)
    light_bulb.state = :on
    true
  end

  # The bulb is already off, so nothing changes. Returns false.
  def turn_off(_light_bulb)
    false
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class LightBulb | |
attr_reader :state | |
def initialize | |
self.state = :off | |
end | |
def turn_on | |
if state == :on | |
false | |
else | |
self.state = :on | |
true | |
end | |
end | |
def turn_off | |
if state == :off | |
false | |
else | |
self.state = :off | |
true | |
end | |
end | |
def dim | |
if state == :off | |
false | |
elsif state == :dimmed | |
self.state = :on | |
true | |
else | |
self.state = :dimmed | |
true | |
end | |
end | |
private | |
attr_writer :state | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# A dimmable light bulb that delegates every action to its current state
# object.
class LightBulb
  # The current state object, as produced by LightBulbState.for.
  attr_reader :state

  # A new bulb starts in the :off state.
  def initialize
    self.state = :off
  end

  # Asks the current state object to switch this bulb on and returns its
  # answer.
  def turn_on
    state.turn_on(self)
  end

  # Asks the current state object to switch this bulb off and returns its
  # answer.
  def turn_off
    state.turn_off(self)
  end

  # Asks the current state object to dim this bulb and returns its answer.
  def dim
    state.dim(self)
  end

  # Accepts a state name and stores the state object that
  # LightBulbState.for builds for it.
  def state=(new_state)
    @state = LightBulbState.for(new_state)
  end
end
class LightBulbState | |
def self.for(state) | |
if state == :on | |
LightBulbOnState | |
elsif state == :off | |
LightBulbOffState | |
elsif state == :dimmed | |
LightBulbDimmedState | |
end.new | |
end | |
end | |
# Behaviour of a dimmable light bulb that is currently on.
class LightBulbOnState < LightBulbState
  # Builds the state with its name, :on.
  def initialize
    @state = :on
  end

  # Name of this state.
  attr_reader :state

  # The bulb is already on, so nothing changes. Returns false.
  def turn_on(_light_bulb)
    false
  end

  # Moves the bulb to the :off state. Returns true.
  def turn_off(light_bulb)
    light_bulb.state = :off
    true
  end

  # Moves the bulb to the :dimmed state. Returns true.
  def dim(light_bulb)
    light_bulb.state = :dimmed
    true
  end
end
# Behaviour of a dimmable light bulb that is currently off.
class LightBulbOffState < LightBulbState
  # Builds the state with its name, :off.
  def initialize
    @state = :off
  end

  # Name of this state.
  attr_reader :state

  # Moves the bulb to the :on state. Returns true.
  def turn_on(light_bulb)
    light_bulb.state = :on
    true
  end

  # The bulb is already off, so nothing changes. Returns false.
  def turn_off(_light_bulb)
    false
  end

  # A bulb that is off cannot be dimmed; nothing changes. Returns false.
  def dim(_light_bulb)
    false
  end
end
# Behaviour of a light bulb that is currently dimmed.
class LightBulbDimmedState < LightBulbState
  # Builds the state with its name, :dimmed.
  def initialize
    @state = :dimmed
  end

  # Name of this state.
  attr_reader :state

  # Moves the bulb to the :on state. Returns true.
  def turn_on(light_bulb)
    light_bulb.state = :on
    true
  end

  # Moves the bulb to the :off state. Returns true.
  def turn_off(light_bulb)
    light_bulb.state = :off
    true
  end

  # Dimming an already dimmed bulb moves it back to the :on state.
  # Returns true.
  def dim(light_bulb)
    light_bulb.state = :on
    true
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class WebSocket(object):
    def connect(self, uri, origin=None, protocols=[]):
        """Establishes a new connection to a WebSocket server.

        This method connects to the host specified by uri and
        negotiates a WebSocket connection. origin should be specified
        in accordance with RFC 6454 if known. A list of valid
        sub-protocols can be specified in the protocols argument.

        The data will be sent in the clear if the "ws" scheme is used,
        and encrypted if the "wss" scheme is used.

        Both WebSocketWantReadError and WebSocketWantWriteError can be
        raised whilst negotiating the connection. Repeated calls to
        connect() must retain the same arguments.

        A plain Exception is raised for an unknown URI scheme, when the
        socket closes before a response arrives, when the server's
        handshake response is malformed or rejected, and when the method
        is called in a state that is not part of the connect sequence.
        """

        self.client = True

        uri = urlparse(uri)

        # Fall back to the scheme's default port when the URI has none.
        port = uri.port
        if uri.scheme in ("ws", "http"):
            if not port:
                port = 80
        elif uri.scheme in ("wss", "https"):
            if not port:
                port = 443
        else:
            raise Exception("Unknown scheme '%s'" % uri.scheme)

        # This is a state machine in order to handle
        # WantRead/WantWrite events. Each step falls through to the next
        # one, so a repeated call resumes at the step recorded in
        # self._state.

        if self._state == "new":
            self.socket = socket.create_connection((uri.hostname, port))

            if uri.scheme in ("wss", "https"):
                # NOTE(review): ssl.wrap_socket() is deprecated in newer
                # Python versions -- confirm the supported versions.
                self.socket = ssl.wrap_socket(self.socket)
                self._state = "ssl_handshake"
            else:
                self._state = "headers"

        if self._state == "ssl_handshake":
            self.socket.do_handshake()
            self._state = "headers"

        if self._state == "headers":
            # 16 random bytes, base64 encoded, form the handshake key.
            self._key = ''
            for i in range(16):
                self._key += chr(random.randrange(256))
            if sys.hexversion >= 0x3000000:
                self._key = bytes(self._key, "latin-1")
            self._key = b64encode(self._key).decode("ascii")

            path = uri.path
            if not path:
                path = "/"

            self._queue_str("GET %s HTTP/1.1\r\n" % path)
            self._queue_str("Host: %s\r\n" % uri.hostname)
            self._queue_str("Upgrade: websocket\r\n")
            self._queue_str("Connection: upgrade\r\n")
            self._queue_str("Sec-WebSocket-Key: %s\r\n" % self._key)
            self._queue_str("Sec-WebSocket-Version: 13\r\n")

            if origin is not None:
                self._queue_str("Origin: %s\r\n" % origin)
            if len(protocols) > 0:
                self._queue_str("Sec-WebSocket-Protocol: %s\r\n" % ", ".join(protocols))

            self._queue_str("\r\n")

            self._state = "send_headers"

        if self._state == "send_headers":
            self._flush()
            self._state = "response"

        if self._state == "response":
            if not self._recv():
                raise Exception("Socket closed unexpectedly")

            # Wait until the blank line ending the response headers is in
            # the receive buffer.
            if self._recv_buffer.find('\r\n\r\n'.encode("ascii")) == -1:
                raise WebSocketWantReadError

            (request, self._recv_buffer) = self._recv_buffer.split('\r\n'.encode("ascii"), 1)
            request = request.decode("latin-1")

            words = request.split()
            if (len(words) < 2) or (words[0] != "HTTP/1.1"):
                raise Exception("Invalid response")
            if words[1] != "101":
                raise Exception("WebSocket request denied: %s" % " ".join(words[1:]))

            (headers, self._recv_buffer) = self._recv_buffer.split('\r\n\r\n'.encode("ascii"), 1)
            headers = headers.decode('latin-1') + '\r\n'
            headers = email.message_from_string(headers)

            if headers.get("Upgrade", "").lower() != "websocket":
                raise Exception("Missing or incorrect upgrade header")

            accept = headers.get('Sec-WebSocket-Accept')
            if accept is None:
                raise Exception("Missing Sec-WebSocket-Accept header")

            # The server must answer with base64(sha1(key + GUID)).
            expected = sha1((self._key + self.GUID).encode("ascii")).digest()
            expected = b64encode(expected).decode("ascii")

            del self._key

            if accept != expected:
                raise Exception("Invalid Sec-WebSocket-Accept header")

            self.protocol = headers.get('Sec-WebSocket-Protocol')
            if len(protocols) == 0:
                if self.protocol is not None:
                    raise Exception("Unexpected Sec-WebSocket-Protocol header")
            else:
                if self.protocol not in protocols:
                    raise Exception("Invalid protocol chosen by server")

            self._state = "done"

            return

        raise Exception("WebSocket is in an invalid state")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class WebSocketState(object):
    """Base class of the state objects and factory that creates them."""

    @staticmethod
    def factory(state):
        """Return a new state object for the state name.

        Returns None when the name matches none of the known states.
        """
        # The lambdas delay the class lookups until call time; the
        # concrete state classes are defined after this base class.
        builders = (
            ("new", lambda: WebSocketNewState()),
            ("ssl_handshake", lambda: WebSocketSSLHandshakeState()),
            ("headers", lambda: WebSocketHeadersState()),
            ("send_headers", lambda: WebSocketSendHeadersState()),
            ("response", lambda: WebSocketResponseState()),
            ("done", lambda: WebSocketDoneState()),
            ("flush", lambda: WebSocketFlushState()),
        )
        for name, build in builders:
            if state == name:
                return build()
        return None
class WebSocketNewState(WebSocketState):
    """First connect step: open the TCP connection to the server."""

    @property
    def state(self):
        """Name of this state."""
        return "new"

    @staticmethod
    def connect(web_socket, uri, origin=None, protocols=[]):
        """Open the socket for uri and select the next state.

        Marks web_socket as a client, connects to the host and port
        taken from uri (port 80 for ws/http and 443 for wss/https when
        the URI gives none) and moves on to "ssl_handshake" for the
        secure schemes or to "headers" otherwise. origin and protocols
        are not used in this step.

        Raises Exception when the URI scheme is not one of the four
        above.
        """
        web_socket.client = True

        parsed = urlparse(uri)
        port = parsed.port
        scheme = parsed.scheme
        secure = scheme in ("wss", "https")
        if not secure and scheme not in ("ws", "http"):
            raise Exception("Unknown scheme '%s'" % scheme)
        if not port:
            port = 443 if secure else 80

        web_socket.socket = socket.create_connection((parsed.hostname, port))
        if not secure:
            web_socket.state = "headers"
            return

        web_socket.socket = ssl.wrap_socket(web_socket.socket)
        web_socket.state = "ssl_handshake"
class WebSocketSSLHandshakeState(WebSocketState):
    """Connect step that performs the handshake on the wrapped socket."""

    @property
    def state(self):
        """Name of this state."""
        return "ssl_handshake"

    @staticmethod
    def connect(web_socket, uri, origin=None, protocols=[]):
        """Run the socket handshake, then move on to "headers".

        uri, origin and protocols are not used in this step.
        """
        wrapped = web_socket.socket
        wrapped.do_handshake()
        web_socket.state = "headers"
class WebSocketHeadersState(WebSocketState):
    """Connect step that queues the HTTP upgrade request."""

    @property
    def state(self):
        """Name of this state."""
        return "headers"

    @staticmethod
    def connect(web_socket, uri, origin=None, protocols=[]):
        """Queue the handshake request and move on to "send_headers".

        Generates a fresh key, stores it on web_socket._key and queues
        the request line and headers through web_socket._queue_str().
        An Origin header is added when origin is not None, and a
        Sec-WebSocket-Protocol header when protocols is non-empty.
        """
        # uri is used for its path and hostname below, so it must be
        # parsed here; reading .path off an unparsed string raised
        # AttributeError.
        uri = urlparse(uri)

        # 16 random bytes, base64 encoded, form the handshake key.
        web_socket._key = ''
        for i in range(16):
            web_socket._key += chr(random.randrange(256))
        if sys.hexversion >= 0x3000000:
            web_socket._key = bytes(web_socket._key, "latin-1")
        web_socket._key = b64encode(web_socket._key).decode("ascii")

        path = uri.path
        if not path:
            path = "/"

        web_socket._queue_str("GET %s HTTP/1.1\r\n" % path)
        web_socket._queue_str("Host: %s\r\n" % uri.hostname)
        web_socket._queue_str("Upgrade: websocket\r\n")
        web_socket._queue_str("Connection: upgrade\r\n")
        web_socket._queue_str("Sec-WebSocket-Key: %s\r\n" % web_socket._key)
        web_socket._queue_str("Sec-WebSocket-Version: 13\r\n")

        if origin is not None:
            web_socket._queue_str("Origin: %s\r\n" % origin)
        if len(protocols) > 0:
            web_socket._queue_str(
                "Sec-WebSocket-Protocol: %s\r\n" % ", ".join(protocols))

        web_socket._queue_str("\r\n")

        web_socket.state = "send_headers"
class WebSocketSendHeadersState(WebSocketState):
    """Connect step that flushes the queued request."""

    @property
    def state(self):
        """Name of this state."""
        return "send_headers"

    @staticmethod
    def connect(web_socket, uri, origin=None, protocols=[]):
        """Call web_socket._flush(), then move on to "response".

        uri, origin and protocols are not used in this step.
        """
        flush_queue = web_socket._flush
        flush_queue()
        web_socket.state = "response"
class WebSocketResponseState(WebSocketState):
    """Connect step that validates the server's handshake response."""

    @property
    def state(self):
        """Name of this state."""
        return "response"

    @staticmethod
    def connect(web_socket, uri, origin=None, protocols=[]):
        """Read and check the handshake response, then move on to "done".

        Raises WebSocketWantReadError until the complete response header
        block is in web_socket._recv_buffer. Raises Exception when the
        socket closes, the status line is not "HTTP/1.1 101", the Upgrade
        or Sec-WebSocket-Accept header is missing or wrong, or the
        chosen sub-protocol does not fit the protocols argument. On
        success the negotiated protocol is stored on web_socket.protocol.
        uri and origin are not used in this step.
        """
        if not web_socket._recv():
            raise Exception("Socket closed unexpectedly")

        # Wait until the blank line ending the response headers is in
        # the receive buffer.
        if web_socket._recv_buffer.find('\r\n\r\n'.encode("ascii")) == -1:
            raise WebSocketWantReadError

        (request, web_socket._recv_buffer) = web_socket._recv_buffer.split(
            '\r\n'.encode("ascii"), 1)
        request = request.decode("latin-1")

        words = request.split()
        if (len(words) < 2) or (words[0] != "HTTP/1.1"):
            raise Exception("Invalid response")
        if words[1] != "101":
            raise Exception("WebSocket request denied: %s" %
                            " ".join(words[1:]))

        (headers, web_socket._recv_buffer) = web_socket._recv_buffer.split(
            '\r\n\r\n'.encode("ascii"), 1)
        headers = headers.decode('latin-1') + '\r\n'
        headers = email.message_from_string(headers)

        # A leftover debugging print of type(headers) used to sit on this
        # error path; it has been removed.
        if headers.get("Upgrade", "").lower() != "websocket":
            raise Exception("Missing or incorrect upgrade header")

        accept = headers.get('Sec-WebSocket-Accept')
        if accept is None:
            raise Exception("Missing Sec-WebSocket-Accept header")

        # The server must answer with base64(sha1(key + GUID)).
        expected = sha1(
            (web_socket._key + web_socket.GUID).encode("ascii")).digest()
        expected = b64encode(expected).decode("ascii")

        del web_socket._key

        if accept != expected:
            raise Exception("Invalid Sec-WebSocket-Accept header")

        web_socket.protocol = headers.get('Sec-WebSocket-Protocol')
        if len(protocols) == 0:
            if web_socket.protocol is not None:
                raise Exception("Unexpected Sec-WebSocket-Protocol header")
        else:
            if web_socket.protocol not in protocols:
                raise Exception("Invalid protocol chosen by server")

        web_socket.state = "done"
class WebSocketDoneState(WebSocketState):
    """State named "done"; connecting from it is rejected."""

    @property
    def state(self):
        """Name of this state."""
        return "done"

    @staticmethod
    def connect(web_socket, uri, origin=None, protocols=[]):
        """Always raise: connect() is not valid in this state."""
        message = "WebSocket is in an invalid state"
        raise Exception(message)
class WebSocketFlushState(WebSocketState):
    """State named "flush"; connecting from it is rejected."""

    @property
    def state(self):
        """Name of this state."""
        return "flush"

    @staticmethod
    def connect(web_socket, uri, origin=None, protocols=[]):
        """Always raise: connect() is not valid in this state."""
        message = "WebSocket is in an invalid state"
        raise Exception(message)
class WebSocket(object):
    """WebSocket whose connect() is delegated to its current state object."""

    @property
    def state_obj(self):
        """The current state object."""
        return self._state

    @property
    def state(self):
        """Name of the current state, as reported by the state object."""
        return self._state.state

    @state.setter
    def state(self, value):
        # Callers assign a state name; the object stored is whatever
        # WebSocketState.factory() returns for that name.
        self._state = WebSocketState.factory(value)

    @state.deleter
    def state(self):
        del self._state

    def connect(self, uri, origin=None, protocols=[]):
        """Run the connect step of the current state object.

        uri, origin and protocols are forwarded unchanged. The previous
        code passed origin=None and protocols=[] literally, which
        silently dropped the caller's values. Each call runs only the
        step of the state the socket is currently in.
        """
        self.state_obj.connect(self, uri, origin=origin, protocols=protocols)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment