Calculate length of the list
>>> len(["foo", "bar", "baz"])
3
List concatenation
>>> ["foo", "bar"] + ["baz"]
['foo', 'bar', 'baz']
Add an item to the back of the list
>>> list = ["foo", "bar"]
>>> list.append("baz")
>>> list
['foo', 'bar', 'baz']
Remove an item from the back of the list
>>> list = ["foo", "bar"]
>>> list.pop()
'bar'
>>> list
['foo']
>>>
Remove an item from the list
>>> list = ["foo", "bar", "baz"]
>>> list.remove("bar")
>>> list
['foo', 'baz']
Slicing
>>> list = ["a", "b", "c", "d", "e", "f"]
>>> list[1:3]
['b', 'c']
>>> list[:3]
['a', 'b', 'c']
>>> list[1:]
['b', 'c', 'd', 'e', 'f']
>>> list[:]
['a', 'b', 'c', 'd', 'e', 'f']
Sum of the list
def sum(list):
    """Return the sum of the items of a sequence, or 0 when it is empty.

    The items are combined from right to left, i.e. ``a + (b + (c + 0))``,
    which is the order the recursive definition
    ``list[0] + sum(list[1:])`` evaluates them in. Using a loop avoids
    copying the tail of the sequence on every step and cannot exceed the
    recursion limit on long inputs.

    Args:
        list: A sequence (supports ``len()`` and indexing) of addable items.
            The parameter name shadows the built-in ``list`` inside this
            function only.

    Returns:
        The total of all items, or ``0`` for an empty sequence.
    """
    total = 0
    # Walk backwards so the additions happen in the same order as the
    # recursive formulation (last item is added to 0 first).
    for item in reversed(list):
        total = item + total
    return total
>>> sum([1, 2, 3, 4, 5])
15
Check if an item is in the list
def exists(list, item):
    """Return True if some element of ``list`` equals ``item``.

    Elements are compared with ``==`` from first to last and the search
    stops at the first match. A loop is used instead of recursing on
    ``list[1:]`` so that long inputs neither copy the tail repeatedly nor
    hit the recursion limit.

    Args:
        list: The items to search. The parameter name shadows the built-in
            ``list`` inside this function only.
        item: The value to look for.

    Returns:
        ``True`` when a matching element is found, ``False`` otherwise
        (including for an empty input).
    """
    for candidate in list:
        if candidate == item:
            return True
    return False
Find the maximum item in the list
def maximum(list):
    """Return the largest item of a sequence, or None when it is empty.

    Items are compared with ``>``. When several items compare equal to the
    maximum, the one closest to the end of the sequence is returned, which
    matches the recursive definition (an item only replaces the maximum of
    the items after it when it is strictly greater). A loop is used so that
    long inputs neither copy the tail repeatedly nor hit the recursion
    limit.

    Args:
        list: A sequence (supports ``len()`` and indexing) of mutually
            comparable items. The parameter name shadows the built-in
            ``list`` inside this function only.

    Returns:
        The largest item, or ``None`` for an empty sequence.
    """
    if len(list) == 0:
        return None
    # Scan from the back: the last item is the initial candidate and an
    # earlier item wins only if it is strictly greater.
    remaining = reversed(list)
    best = next(remaining)
    for item in remaining:
        if item > best:
            best = item
    return best
Generate all even numbers from 0 to 99
# Method 1: test every integer in 0..99 and keep those divisible by 2
[ x for x in range(100) if x % 2 == 0 ]
# Method 2: step through 0..99 two at a time, so no filter is needed
[ x for x in range(0, 100, 2) ]
Calculate the first 10 square numbers (0 included)
# Squares of the integers 0 through 9: 0, 1, 4, ..., 81
[ x * x for x in range(10) ]
Spawning threads in Python
import threading
class MyThread(threading.Thread):
    """Thread subclass whose work is defined by overriding ``run``."""

    def __init__(self):
        """Initialize the underlying ``threading.Thread``."""
        super().__init__()
        # Initialization goes here

    def run(self):
        """Code executed in the new thread after ``start()`` is called."""
        # Body of the thread goes here
        pass  # placeholder: a comment alone is not a valid function body
# Instantiate a MyThread object
my_thread = MyThread()
# Start the thread
my_thread.start()
Spawning threads (alternative method)
import threading
def task():
    """Function to be run in a separate thread via ``threading.Thread(target=task)``."""
    # Body of the thread goes here
    pass  # placeholder: a comment alone is not a valid function body
my_task = threading.Thread(target=task)
my_task.start()
File I/O
# Read the entire contents of a text file into a string.
# An explicit encoding avoids depending on the platform default.
with open("data.txt", "r", encoding="utf-8") as f:
    data = f.read()
# Write the string to a file; mode "w" truncates any existing content.
with open("data.txt", "w", encoding="utf-8") as f:
    f.write(data)
Pickle
import pickle
# Write to a file. pickle produces bytes, so the file must be opened in
# binary mode ("wb"); text mode raises TypeError on dump.
with open("my_data.dat", "wb") as f:
    my_data = {"foo": 1}
    pickle.dump(my_data, f)
# Read from a file, again in binary mode ("rb").
# NOTE: only unpickle data you trust -- loading a pickle can execute
# arbitrary code.
with open("my_data.dat", "rb") as f:
    my_data = pickle.load(f)
JSON
import json
# Write to a file. JSON is text, so the file is opened in text mode;
# UTF-8 is given explicitly rather than relying on the platform default.
with open("my_data.dat", "w", encoding="utf-8") as f:
    my_data = {"foo": 1}
    json.dump(my_data, f)
# Read the JSON document back into Python objects.
with open("my_data.dat", "r", encoding="utf-8") as f:
    my_data = json.load(f)
XML using ElementTree
<?xml version="1.0" ?>
<!-- product.xml (the XML declaration must be the very first thing in the file) -->
<shop>
    <product id="1">
        <name>Apple</name>
        <price>10.5</price>
    </product>
    <product id="2">
        <name>Banana</name>
        <price>5.0</price>
    </product>
</shop>
import xml.etree.ElementTree as ET
tree = ET.parse("product.xml")
# Get the tree root (the outermost 'shop' tag)
root = tree.getroot()
# Accessing elements via index
root[0]  # The first 'product' tag with id = 1
root[0][1]  # The 'price' tag of the first product
# Accessing attributes
root[0].attrib["id"]  # => "1", i.e. the id attribute of the 1st product
# Accessing text content
root[0][0].text  # => "Apple", i.e. the name of the first product
# Iterate over all products (the direct children of the root)
for product in root:
    print(product[0].text)  # first child: 'name'
    print(product[1].text)  # second child: 'price'
# Write the document tree out to a new file
tree.write("modified_product.xml")
Using threading.Lock / threading.RLock
import threading
lock = threading.Lock()  # or threading.RLock()
# Method 1: explicit acquire/release
lock.acquire()
try:
    # Perform work inside the critical section
    pass
finally:
    # Always release, even if the critical section raises
    lock.release()
# Method 2: context manager (acquires on entry, releases on exit)
with lock:
    # Perform work inside the critical section
    pass
Using threading.Condition
import threading
cond = threading.Condition()
# Block the calling thread until another thread notifies the condition.
# The condition's lock must be held to call wait(); it is released while
# waiting and re-acquired before wait() returns.
with cond:
    cond.wait()
# Wake up ONE waiting thread
with cond:
    cond.notify()
# Wake up ALL waiting threads
with cond:
    cond.notify_all()
Example: update a shared counter
import threading
counter = 0
lock = threading.Lock()


def task():
    """Increment the shared ``counter`` 10000 times, one locked step at a time."""
    global counter
    for _ in range(10000):
        # ``counter += 1`` is a read-modify-write; the lock keeps the two
        # threads from interleaving inside it.
        with lock:
            counter += 1


thread1 = threading.Thread(target=task)
thread2 = threading.Thread(target=task)
thread1.start()
thread2.start()
# Wait for both threads so that ``counter`` holds its final value (20000)
# before anything reads it.
thread1.join()
thread2.join()
Example: consumer and producer
import threading
buffer = []
max_length = 10
cond = threading.Condition()


def producer_task():
    """Append the integers 0..999 to ``buffer``, waiting while it is full."""
    for item in range(1000):
        with cond:
            # Re-check the predicate after every wake-up: another thread may
            # have changed the buffer before this one re-acquired the lock.
            while len(buffer) >= max_length:
                cond.wait()
            buffer.append(item)
            cond.notify_all()


def consumer_task():
    """Remove 1000 items from ``buffer``, waiting while it is empty, and print their sum."""
    acc = 0
    for _ in range(1000):
        with cond:
            while len(buffer) == 0:
                cond.wait()
            # pop() takes the most recently appended item; the order does
            # not affect the sum.
            acc += buffer.pop()
            cond.notify_all()
    print("acc is %s" % acc)


producer = threading.Thread(target=producer_task)
consumer = threading.Thread(target=consumer_task)
producer.start()
consumer.start()
# Wait for both sides to finish before the main thread continues.
producer.join()
consumer.join()
TCP Server
from socket import socket, AF_INET, SOCK_STREAM
server_socket = socket(AF_INET, SOCK_STREAM)
server_socket.bind(("localhost", 5000))  # (host, port)
server_socket.listen(100)  # backlog
while True:
    client_socket, address = server_socket.accept()
    # The with-block closes the connection even if recv/sendall raises
    with client_socket:
        # Receive message from client
        data = client_socket.recv(1024)  # buffer size
        # Send message to client; sendall() keeps sending until all of the
        # data has been written, whereas send() may transmit only part of it
        client_socket.sendall(data)
TCP Client
from socket import socket, AF_INET, SOCK_STREAM
# The with-block closes the socket even if connect/recv/sendall raises
with socket(AF_INET, SOCK_STREAM) as client_socket:
    client_socket.connect(("localhost", 5000))  # (host, port)
    # Receive message from server
    # NOTE(review): this blocks until the server sends something; a server
    # that itself waits to receive first will never reply -- confirm the
    # intended message order against the server being used.
    data = client_socket.recv(1024)  # buffer size
    # Send message to server; sendall() keeps sending until all of the data
    # has been written, whereas send() may transmit only part of it
    client_socket.sendall(data)
UDP Client / Server
from socket import socket, AF_INET, SOCK_DGRAM
local_address = ("localhost", 5000)
remote_address = ("localhost", 5001)
# The with-block closes the socket even if bind/recvfrom/sendto raises
with socket(AF_INET, SOCK_DGRAM) as sock:
    sock.bind(local_address)
    # Receive a datagram from any sender; addr is the sender's address
    data, addr = sock.recvfrom(1024)  # buffer size
    # Send message to a remote address
    sock.sendto(data, remote_address)
Web Service
from resty import PathDispatcher
from wsgiref.simple_server import make_server
def handler(environ, start_response):
    """WSGI application that answers every request with a fixed HTML body.

    Args:
        environ: The WSGI environment dictionary (not inspected here).
        start_response: The WSGI callable used to send the status line and
            the response headers.

    Yields:
        The response body as bytes.
    """
    # WSGI (PEP 3333) requires a status string of the form "<code> <reason>"
    # and the headers as a list of (name, value) tuples.
    start_response("200 OK", [("Content-Type", "text/html")])
    # Body chunks must be bytes, not str.
    yield b"This is the response body"
dispatcher = PathDispatcher()
dispatcher.register("GET", "/pathname", handler)
httpd = make_server("", 8080, dispatcher)
httpd.serve_forever()