Database preparer for CPFs
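This script prepares a MySQL database, evidently for benchmarking how CPF storage choices affect lookups: it creates eight single-column tables (BIGINT, CHAR(11), VARCHAR(11), and CHAR(11) with a stored CRC32, each variant with and without an index) and bulk-loads all of them from a cpfs.csv file, one CPF per row, using a pool of worker processes.

The loader assumes cpfs.csv already exists. If you need synthetic input, a minimal sketch along these lines produces checksum-valid CPFs (the check digits follow the standard mod-11 rule; the row count of one million is an arbitrary choice, not something the original specifies):

import csv
import random

def cpf_check_digit(digits):
    # The first check digit weights the 9 base digits with 10..2;
    # the second weights all 10 preceding digits with 11..2.
    # Both reduce to: len(digits) + 1 counting down to 2.
    weights = range(len(digits) + 1, 1, -1)
    remainder = sum(d * w for d, w in zip(digits, weights)) % 11
    return 0 if remainder < 2 else 11 - remainder

def random_cpf():
    digits = [random.randint(0, 9) for _ in range(9)]
    digits.append(cpf_check_digit(digits))  # 10th digit
    digits.append(cpf_check_digit(digits))  # 11th digit
    return ''.join(map(str, digits))

with open('cpfs.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    for _ in range(1_000_000):  # adjust the volume as needed
        writer.writerow([random_cpf()])

With the input file in place, the script below creates the tables and loads them.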
import time
import csv
import mysql.connector
from mysql.connector import Error
from multiprocessing import Pool
def create_connection():
    print("Connecting to the database...")
    try:
        connection = mysql.connector.connect(
            host='localhost',
            database='cpfs_data',
            user='user',
            password='pass'
        )
        if connection.is_connected():
            print("Database connection established.")
            return connection
    except Error as e:
        print(f"Error: {e}")
    return None
def create_tables(connection):
    print("Creating tables...")
    cursor = connection.cursor()
    # Eight variants of the same single-column table: BIGINT, CHAR(11),
    # VARCHAR(11), and CHAR(11) with a stored CRC32, each with and
    # without an index, so lookup strategies can be compared.
    table_schemas = {
        'table_a': '''
            CREATE TABLE IF NOT EXISTS table_a (
                cpf BIGINT
            );
        ''',
        'table_b': '''
            CREATE TABLE IF NOT EXISTS table_b (
                cpf BIGINT,
                INDEX idx_cpf (cpf)
            );
        ''',
        'table_c': '''
            CREATE TABLE IF NOT EXISTS table_c (
                cpf CHAR(11)
            );
        ''',
        'table_d': '''
            CREATE TABLE IF NOT EXISTS table_d (
                cpf CHAR(11),
                INDEX idx_cpf (cpf)
            );
        ''',
        'table_e': '''
            CREATE TABLE IF NOT EXISTS table_e (
                cpf VARCHAR(11)
            );
        ''',
        'table_f': '''
            CREATE TABLE IF NOT EXISTS table_f (
                cpf VARCHAR(11),
                INDEX idx_cpf (cpf)
            );
        ''',
        'table_g': '''
            CREATE TABLE IF NOT EXISTS table_g (
                cpf CHAR(11),
                crc32 BIGINT GENERATED ALWAYS AS (crc32(cpf)) STORED
            );
        ''',
        'table_h': '''
            CREATE TABLE IF NOT EXISTS table_h (
                cpf CHAR(11),
                crc32 BIGINT GENERATED ALWAYS AS (crc32(cpf)) STORED,
                INDEX idx_crc32 (crc32)
            );
        '''
    }
    for table_name, schema in table_schemas.items():
        print(f"Creating table {table_name}...")
        cursor.execute(f"DROP TABLE IF EXISTS {table_name};")
        cursor.execute(schema)
        connection.commit()
    cursor.close()
    print("Tables created successfully.")
def insert_cpfs_process(args):
    # Each worker opens its own connection: MySQL connections
    # cannot be shared across processes.
    table_name, insert_sql, cpf_data_chunk = args
    try:
        connection = mysql.connector.connect(
            host='localhost',
            database='cpfs_data',
            user='user',
            password='pass'
        )
        cursor = connection.cursor()
        cursor.executemany(insert_sql, cpf_data_chunk)
        connection.commit()
        cursor.close()
        connection.close()
    except Error as e:
        print(f"Worker error while inserting into {table_name}: {e}")
def insert_cpfs(num_processes=20):
    print("Starting CPF insertion into the tables with multiprocessing...")
    # table_g and table_h also take a single value: their crc32 column
    # is a generated column, computed by MySQL from the inserted cpf.
    insert_statements = {
        'table_a': 'INSERT INTO table_a (cpf) VALUES (%s)',
        'table_b': 'INSERT INTO table_b (cpf) VALUES (%s)',
        'table_c': 'INSERT INTO table_c (cpf) VALUES (%s)',
        'table_d': 'INSERT INTO table_d (cpf) VALUES (%s)',
        'table_e': 'INSERT INTO table_e (cpf) VALUES (%s)',
        'table_f': 'INSERT INTO table_f (cpf) VALUES (%s)',
        'table_g': 'INSERT INTO table_g (cpf) VALUES (%s)',
        'table_h': 'INSERT INTO table_h (cpf) VALUES (%s)',
    }
    batch_size = 10000
    for table_name, insert_sql in insert_statements.items():
        print(f"Inserting CPFs into {table_name} with multiprocessing...")
        start_time = time.time()
        data_chunks = []
        data = []
        idx = 0
        with open('cpfs.csv', 'r') as csv_file:
            csv_reader = csv.reader(csv_file)
            for row in csv_reader:
                cpf = row[0]
                idx += 1
                # table_a and table_b store the CPF as a number;
                # the remaining tables store the 11-character string.
                if table_name in ['table_a', 'table_b']:
                    cpf_value = int(cpf)
                else:
                    cpf_value = cpf
                data.append((cpf_value,))
                if idx % batch_size == 0:
                    data_chunks.append(data)
                    data = []
                if idx % (batch_size * 100) == 0:
                    print(f"Prepared {idx} CPFs for {table_name}")
        if data:
            data_chunks.append(data)
        args_list = [(table_name, insert_sql, chunk) for chunk in data_chunks]
        with Pool(processes=num_processes) as pool:
            pool.map(insert_cpfs_process, args_list)
        elapsed_time = time.time() - start_time
        print(f"Finished inserting into {table_name} in {elapsed_time:.2f} seconds.")
    print("CPF insertion into the tables complete.")
def main():
    connection = create_connection()
    if connection is None:
        return
    create_tables(connection)
    connection.close()
    # insert_cpfs opens its own connections inside the worker processes.
    insert_cpfs()
    print("All done.")

if __name__ == '__main__':
    main()
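Once the data is loaded, point lookups against the eight tables can be timed with a small sketch like the one below (same connection settings as the script above; the sample CPF is a placeholder and should be replaced with a value known to exist in cpfs.csv). table_g and table_h are probed through the crc32 column, since that is the only indexed path in table_h; the extra equality check on cpf filters out CRC32 collisions:

import time
import mysql.connector

def time_lookup(cursor, query, params, runs=5):
    # Average wall-clock time of a single-row lookup over a few runs.
    start = time.time()
    for _ in range(runs):
        cursor.execute(query, params)
        cursor.fetchall()
    return (time.time() - start) / runs

sample = '12345678901'  # placeholder: use a CPF present in the data
queries = [
    ('table_a', 'SELECT cpf FROM table_a WHERE cpf = %s', (int(sample),)),
    ('table_b', 'SELECT cpf FROM table_b WHERE cpf = %s', (int(sample),)),
    ('table_c', 'SELECT cpf FROM table_c WHERE cpf = %s', (sample,)),
    ('table_d', 'SELECT cpf FROM table_d WHERE cpf = %s', (sample,)),
    ('table_e', 'SELECT cpf FROM table_e WHERE cpf = %s', (sample,)),
    ('table_f', 'SELECT cpf FROM table_f WHERE cpf = %s', (sample,)),
    ('table_g', 'SELECT cpf FROM table_g WHERE crc32 = CRC32(%s) AND cpf = %s', (sample, sample)),
    ('table_h', 'SELECT cpf FROM table_h WHERE crc32 = CRC32(%s) AND cpf = %s', (sample, sample)),
]

connection = mysql.connector.connect(
    host='localhost', database='cpfs_data', user='user', password='pass'
)
cursor = connection.cursor()
for name, query, params in queries:
    print(f'{name}: {time_lookup(cursor, query, params):.4f}s')
cursor.close()
connection.close()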