Skip to content

Instantly share code, notes, and snippets.

@buxx
Created September 2, 2021 10:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save buxx/548e2724ca912209e28bbfbfc506c496 to your computer and use it in GitHub Desktop.
Save buxx/548e2724ca912209e28bbfbfc506c496 to your computer and use it in GitHub Desktop.
Quelques concepts python
import asyncio
"""
asyncio: résoudre les problèmes d'attente
"""
class Progress:
def __init__(self, start: float):
self.value = start
async def telecharger_un_fichier(progression: Progress):
progression.value = 1.0
await asyncio.sleep(1.0)
progression.value = 25.0
await asyncio.sleep(1.0)
progression.value = 55.0
await asyncio.sleep(1.0)
progression.value = 99.0
await asyncio.sleep(1.0)
progression.value = 100.0
async def afficher_progression(progression: Progress):
print(f"{progression.value}%")
previous_progress = progression.value
while progression.value != 100.0:
await asyncio.sleep(0.25)
if previous_progress != progression.value:
print(f"{progression.value}%")
previous_progress = progression.value
print("Done !")
async def main():
progress = Progress(0.0)
await asyncio.gather(
telecharger_un_fichier(progress),
afficher_progression(progress),
)
asyncio.run(main())
"""
Gestionnaire de contexte : appliquer un comportement avant et après
"""
class MonFichier(object):
def __init__(self, file_name: str):
self.file = open(file_name, "w+")
def __enter__(self):
print("ouverture du fichier")
return self.file
def __exit__(self, exception_type, value, traceback):
print(f"fermeture du fichier ({exception_type}, {value}, {traceback})")
self.file.close()
import contextlib
@contextlib.contextmanager
def mon_fichier(file_name: str):
print("ouverture du fichier")
file = open(file_name, "w+")
try:
yield file
finally:
print(f"fermeture du fichier")
file.close()
print("Utilisation sans accro")
with MonFichier("/tmp/toto.txt") as f:
f.write("hello world")
print()
print("Utilisation avec accro")
with MonFichier("/tmp/toto.txt") as f:
1/0
"""
Décorateurs : Remplacement dune fonction par une autre
"""
# Décorateur sans arguments
def mon_decorateur(fonction_decore):
def ma_fonction_de_remplacement(*args, **kwargs):
print("print depuis ma_fonction_de_remplacement")
return fonction_decore(*args, **kwargs)
return ma_fonction_de_remplacement
# Décorateur avec arguments
def mon_constructeur_de_decorateur(un_argument: str):
def mon_decorateur(fonction_decore):
def ma_fonction_de_remplacement(*args, **kwargs):
print(f"print depuis ma_fonction_de_remplacement, j'aime jouer du: {un_argument}")
return fonction_decore(*args, **kwargs)
return ma_fonction_de_remplacement
return mon_decorateur
@mon_decorateur
def ma_fonction_a_decorer(a: int) -> int:
return a + 1
@mon_constructeur_de_decorateur("trombonne")
def ma_fonction_a_decorer2(a: int) -> int:
return a + 1
print("Décorateur sans arguments")
print(ma_fonction_a_decorer)
print(ma_fonction_a_decorer(41))
print()
print("Décorateur avec arguments")
print(ma_fonction_a_decorer2)
print(ma_fonction_a_decorer2(41))
"""
Générateur : un stream
"""
def generator_function():
yield 1
yield 1
yield 2
print("Comme un iterateur")
for i in generator_function():
print(i)
print("Comme un objet gnerateur")
generateur = generator_function()
print(next(generateur))
print(next(generateur))
print(next(generateur))
print()
print("Si on l'appelle une fois de trop")
print(next(generateur))
print(next(generateur))
print(next(generateur))
print(next(generateur))
print()
print("Envoyer une valeur au generateur")
# Recevoir des valeurs dans le generateur
def generateur_pair_ou_impair():
filtre = True
input_ = None
for i in range(20):
if (i % 2 == 0) == filtre:
input_ = yield i
if input_ is not None:
if input_ == "pair":
filtre = True
elif input_ == "impair":
filtre = False
gen = generateur_pair_ou_impair()
for i in gen:
print(i)
if i == 10:
print(gen.send("impair"))
"""
Processing : code concurrent
"""
import multiprocessing
import time
def recevoir_meteo(queue_: multiprocessing.Queue, stop_signal_: multiprocessing.Event):
counter = 0
soleil = True
while not stop_signal_.is_set():
counter += 1
if counter % 5 == 0:
soleil = not soleil
counter = 0
queue_.put(soleil)
time.sleep(1.0)
stop_signal = multiprocessing.Event()
queue = multiprocessing.Queue()
meteo_process = multiprocessing.Process(target=recevoir_meteo, args=(queue, stop_signal))
meteo_process.start()
counter = 0
while True:
soleil = queue.get(block=True)
counter += 1
if soleil:
print("☀")
else:
print("☁")
if counter == 10:
stop_signal.set()
break
"""
Thread : sur deux fils mais pas concurrent
"""
import threading
import time
def mettre_a_jour_meteo(meteo_: dict, stop_signal_: threading.Event) -> None:
counter = 0
while not stop_signal_.is_set():
counter += 1
if counter % 5 == 0:
meteo_["soleil"] = not meteo_["soleil"]
meteo_["nuage"] = not meteo_["nuage"]
counter = 0
time.sleep(1.0)
meteo = {
"soleil": True,
"nuage": False,
}
stop_signal_ = threading.Event()
meteo_thread = threading.Thread(target=mettre_a_jour_meteo, args=(meteo, stop_signal_))
meteo_thread.start()
for _ in range(15):
if meteo["soleil"]:
print("☀")
if meteo["nuage"]:
print("☁")
time.sleep(1)
stop_signal_.set()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment