Created
June 10, 2022 07:04
-
-
Save xunge/2e45236603b8917c40dd3ab626a880ef to your computer and use it in GitHub Desktop.
Multiprocessing demo: running SELECT queries against a SQLite database from a pool of worker processes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sqlite3 | |
import multiprocessing as mp | |
from itertools import repeat | |
def build_db(db_name):
    """Create the EMPLOYEE table in *db_name* if needed and insert five sample rows.

    The table is only created when it does not exist yet, but the inserts
    always run, so calling this again on the same file appends another copy
    of the sample rows.

    Args:
        db_name: Path of the SQLite database file to open (or create).
    """
    employees = [
        ('Ramya', 'Rama Priya', 27, 'F', 9000),
        ('Vinay', 'Battacharya', 20, 'M', 6000),
        ('Sharukh', 'Sheik', 25, 'M', 8300),
        ('Sarmista', 'Sharma', 26, 'F', 10000),
        ('Tripthi', 'Mishra', 24, 'F', 6000),
    ]
    conn = sqlite3.connect(db_name)
    try:
        # `with conn` commits when the body succeeds and rolls back when a
        # statement raises, so a failed insert cannot leave half the rows.
        with conn:
            conn.execute('''CREATE TABLE IF NOT EXISTS EMPLOYEE (
                ID INTEGER PRIMARY KEY,
                FIRST_NAME VARCHAR(255),
                LAST_NAME VARCHAR(255),
                AGE int,
                SEX VARCHAR(255),
                INCOME int
            )''')
            conn.executemany(
                '''INSERT INTO EMPLOYEE(
                    FIRST_NAME, LAST_NAME, AGE, SEX, INCOME)
                    VALUES (?, ?, ?, ?, ?)''',
                employees,
            )
    finally:
        # Close the connection even when table creation or an insert fails.
        conn.close()
def init_worker(function, db_name):
    """Pool initializer: open *db_name* and attach a cursor to *function*.

    Args:
        function: Object that receives the cursor as its ``cursor`` attribute.
        db_name: Path of the SQLite database to connect to.
    """
    # The cursor keeps a reference to its connection, so the connection stays
    # open for as long as the cursor is attached to `function`.
    function.cursor = sqlite3.connect(db_name).cursor()
def use_db(id):
    """Fetch, print and return the ``(ID, FIRST_NAME)`` row for employee *id*.

    Reads the cursor from the ``use_db.cursor`` attribute, which must have
    been set beforehand (``init_worker`` does this when given ``use_db``).

    Args:
        id: Value matched against the EMPLOYEE.ID column.

    Returns:
        The first matching row as a tuple.

    Raises:
        IndexError: If no row has the requested ID.
    """
    rows = use_db.cursor.execute(
        'SELECT ID, FIRST_NAME FROM EMPLOYEE WHERE ID=?', (id,)
    ).fetchall()
    first_row = rows[0]
    print(first_row)
    return first_row
def use_db_two_args(id, db_name):
    """Fetch, print and return the ``(ID, FIRST_NAME)`` row for employee *id*.

    Unlike ``use_db``, this opens its own connection to *db_name* on every
    call, so it needs no pool initializer.

    Args:
        id: Value matched against the EMPLOYEE.ID column.
        db_name: Path of the SQLite database to query.

    Returns:
        The first matching row as a tuple.

    Raises:
        IndexError: If no row has the requested ID.
    """
    conn = sqlite3.connect(db_name)
    try:
        result = conn.execute(
            'SELECT ID, FIRST_NAME FROM EMPLOYEE WHERE ID=?', (id,)
        ).fetchall()
    finally:
        # One connection is opened per call; close it so repeated calls in a
        # long-lived worker do not accumulate open database handles.
        conn.close()
    print(result[0])
    return result[0]
# Module-level sink filled by log_result.
result_list = []


def log_result(result):
    """Append *result* to the module-level ``result_list``.

    Args:
        result: Value to record; stored as-is.
    """
    result_list.append(result)
def _worker_pool(db_name):
    """Return a two-process pool whose initializer attaches a cursor on *db_name* to ``use_db``."""
    return mp.Pool(processes=2, initializer=init_worker, initargs=(use_db, db_name))


def main():
    """Build the sample database, then query IDs 1-5 through each Pool API.

    Demonstrates, in order: ``apply``, ``apply_async`` (with a callback), a
    deliberately wrong ``apply_async`` usage, ``map``, ``map_async``,
    ``imap``, ``imap_unordered`` (looped and via ``list``), ``starmap`` and
    ``starmap_async``. Each section prints a header and the collected results.
    """
    db_name = 'test.db'
    build_db(db_name)

    pool = _worker_pool(db_name)
    # Local names must not be called `result_list`: assigning that name here
    # would shadow the module-level list that log_result appends to.
    apply_results = []
    print('--- apply ---')
    for i in range(1, 6):
        res = pool.apply(use_db, (i,))
        apply_results.append(res)
    pool.close()
    pool.join()
    print(apply_results)

    pool = _worker_pool(db_name)
    print('--- apply_async ---')
    # log_result appends to the shared module-level list; empty it in place so
    # this section prints only its own results even if main() runs twice.
    result_list.clear()
    for i in range(1, 6):
        pool.apply_async(use_db, (i,), callback=log_result)
    # close() + join() waits for every queued task (and its callback).
    pool.close()
    pool.join()
    print(result_list)

    ## wrong
    # Leaving the `with` block calls pool.terminate(), which stops the workers
    # without waiting for outstanding tasks, so some or all of these lookups
    # may never run. Kept on purpose as a counter-example.
    print('--- apply_async wrong ---')
    with _worker_pool(db_name) as pool:
        for i in range(1, 6):
            pool.apply_async(use_db, (i,), callback=log_result)

    pool = _worker_pool(db_name)
    print('--- map ---')
    res = pool.map(use_db, range(1, 6))
    pool.close()
    pool.join()
    print(res)

    pool = _worker_pool(db_name)
    print('--- map_async ---')
    res = pool.map_async(use_db, range(1, 6))
    pool.close()
    pool.join()
    print(res.get())

    pool = _worker_pool(db_name)
    imap_results = []
    print('--- imap ---')
    for res in pool.imap(use_db, range(1, 6)):
        imap_results.append(res)
    pool.close()
    pool.join()
    print(imap_results)

    pool = _worker_pool(db_name)
    unordered_results = []
    print('--- imap_unordered ---')
    for res in pool.imap_unordered(use_db, range(1, 6)):
        unordered_results.append(res)
    pool.close()
    pool.join()
    print(unordered_results)

    pool = _worker_pool(db_name)
    print('--- imap_unordered2 ---')
    res_iter = pool.imap_unordered(use_db, range(1, 6))
    res = list(res_iter)
    pool.close()
    pool.join()
    print(res)

    # The starmap variants pass the database name with every task, so these
    # pools need no initializer.
    pool = mp.Pool(processes=2)
    print('--- starmap ---')
    args1 = range(1, 6)
    res = pool.starmap(use_db_two_args, zip(args1, repeat(db_name)))
    pool.close()
    pool.join()
    print(res)

    pool = mp.Pool(processes=2)
    print('--- starmap_async ---')
    args1 = range(1, 6)
    res = pool.starmap_async(use_db_two_args, zip(args1, repeat(db_name)))
    pool.close()
    pool.join()
    print(res.get())
# Run the demo only when executed as a script. Without this guard, start
# methods that re-import the main module in each worker would call main() again.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment