Skip to content

Instantly share code, notes, and snippets.

@detrin
Last active September 27, 2019 12:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save detrin/16dfb79cd696ff611e4473d97323ae77 to your computer and use it in GitHub Desktop.
Save detrin/16dfb79cd696ff611e4473d97323ae77 to your computer and use it in GitHub Desktop.
Python2.7 parallel computing
# -*- coding: utf-8 -*-
import random
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import Pool
import numpy
from tqdm import tqdm
# Consider a (deliberately inefficient) function that checks whether a given
# number is prime.
def is_prime(n):
    """Return True if `n` is a prime number, using naive trial division.

    The algorithm is intentionally simple and slow: it serves as the
    CPU-bound workload for the timing experiments in this script.

    Args:
        n: integer to test.

    Returns:
        True when `n` is prime, False otherwise. Values below 2
        (including 0, 1 and negative numbers) are not prime.
    """
    if n < 2:
        return False
    # Start at 2: every integer is divisible by 1, so starting the trial
    # division at 1 would reject every candidate immediately.
    num = 2
    # Comparing num*num against n avoids a floating-point square root, which
    # can be off by one for very large integers.
    while num * num <= n:
        if n % num == 0:
            return False
        num += 1
    return True
# Build the input data on which the function will be exercised.
# `generator` is the source range; `args` is a separate list holding a copy
# of all of its values.
generator = range(10**7)
args = list(generator)
# Pozrime sa ako dlho bude trvať takejto funkcii vyhodnotiť toto pole.
timeFlag = time.time()
result = []
for arg in args:
result.append(is_prime(arg))
print "1.", time.time()-timeFlag, "s"
# Pre prípad, že by nás zaujímalo ako táto funkcia napreduje pri výpočte.
timeFlag = time.time()
result = []
for arg in tqdm(args):
result.append(is_prime(arg))
print "2.", time.time()-timeFlag, "s"
# Možno ale použiť pool z knižnice multiprocessing.
timeFlag = time.time()
pool = Pool(processes=4)
result = pool.map(is_prime, args)
print "3.", time.time()-timeFlag, "s"
# Pri použití generátoru (yield) na vstupe sa efektivita poolu zníži, pretože
# thready musia čakať kým dostanú z jedného generátoru ďalší vstup.
# Pri krátkom behu generátoru je toto spomalenie relatívne malé.
timeFlag = time.time()
pool = Pool(processes=4)
result = pool.map(is_prime, generator)
print "4.", time.time()-timeFlag, "s"
# Beware: run like this, we might never find out that our script crashed.
# NOTE(review): the trial division below starts at 1 and n % 1 == 0 holds for
# every integer, so integer inputs return before the faulty increment is ever
# reached -- confirm whether the "unnoticed crash" this demo describes
# actually occurs.
def is_prime_bad(n):
    """Deliberately broken variant of the primality check, kept as a demo.

    Mirrors the trial-division loop (starting at divisor 1) but contains an
    invalid increment that raises TypeError if execution gets that far.

    Args:
        n: number to test.

    Returns:
        True when the loop bound int(n**0.5) is below 1, False as soon as a
        candidate divides `n`.
    """
    limit = int(n ** 0.5)
    candidate = 1
    while True:
        if candidate > limit:
            return True
        if n % candidate == 0:
            return False
        # `candidate` is an int, so adding a str to it is not permitted; this
        # line is the intentional fault.
        candidate += "a"
timeFlag = time.time()
pool = Pool(processes=4)
result = pool.map(is_prime_bad, args)
print "5.", time.time()-timeFlag, "s"
# For longer-running jobs it is quite helpful to have an estimate of how long
# the process is going to keep running.
def parallel(function, array, n_jobs=4, use_kwargs=False):
    """Apply `function` to every element of `array` with a tqdm progress bar.

    Args:
        function: callable to apply to each element.
        array: iterable of inputs. Each element is passed as the single
            positional argument, or unpacked as keyword arguments when
            `use_kwargs` is true (elements must then be mappings).
        n_jobs: number of worker processes. With 1, everything runs serially
            in the calling process and no executor is created.
        use_kwargs: whether to call `function(**element)` instead of
            `function(element)`.

    Returns:
        A list with one entry per input, in input order. In the multi-process
        path an exception raised by a call is stored in that call's slot
        instead of being raised; in the serial path exceptions propagate to
        the caller.
    """
    if n_jobs == 1:
        return [function(**a) if use_kwargs else function(a) for a in tqdm(array)]

    with ProcessPoolExecutor(max_workers=n_jobs) as pool:
        futures = [
            pool.submit(function, **a) if use_kwargs else pool.submit(function, a)
            for a in array
        ]
        progress_options = {
            'total': len(futures),
            'unit': 'it',
            'unit_scale': True,
            'leave': True
        }
        # Drain as_completed only to advance the progress bar; the results
        # themselves are collected below, in submission order.
        for _ in tqdm(as_completed(futures), **progress_options):
            pass

    # Every future has finished by now, so collecting results needs no
    # further progress reporting.
    out = []
    for future in futures:
        try:
            out.append(future.result())
        except Exception as e:
            # Best effort: keep the failure in the result slot so one bad
            # input does not discard the remaining results.
            out.append(e)
    return out
timeFlag = time.time()
result = parallel(is_prime, range(10**4), n_jobs=4)
print "6.", time.time()-timeFlag, "s"
# Example using kwargs.
def GCD(x, y):
    """Return the greatest common divisor of `x` and `y` (Euclid's algorithm).

    Repeatedly replaces the pair (x, y) with (y, x % y) until `y` is falsy
    and then returns `x`.
    """
    while True:
        if not y:
            return x
        remainder = x % y
        x = y
        y = remainder
args = [[random.randint(0, 2**12), random.randint(0, 2**12)]
for x in range(10**4)]
timeFlag = time.time()
result = parallel(GCD, args, n_jobs=4)
print "7.", time.time()-timeFlag, "s"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment