Skip to content

Instantly share code, notes, and snippets.

@caiquearaujo
Created October 27, 2024 08:21
Show Gist options
  • Save caiquearaujo/a7599da5801f1163776d274e46e012d1 to your computer and use it in GitHub Desktop.
Save caiquearaujo/a7599da5801f1163776d274e46e012d1 to your computer and use it in GitHub Desktop.
Preparador de banco de dados para CPFs
import re
import time
import csv
import mysql.connector
from mysql.connector import Error
from multiprocessing import Pool
def create_connection():
print("Conectando ao banco de dados...")
try:
connection = mysql.connector.connect(
host='localhost',
database='cpfs_data',
user='user',
password='pass'
)
if connection.is_connected():
print("Conexão com banco de dados concluída.")
return connection
except Error as e:
print(f"Erro: {e}")
return None
def create_tables(connection):
print("Criando tabelas...")
cursor = connection.cursor()
table_schemas = {
'table_a': '''
CREATE TABLE IF NOT EXISTS table_a (
cpf BIGINT
);
''',
'table_b': '''
CREATE TABLE IF NOT EXISTS table_b (
cpf BIGINT,
INDEX idx_cpf (cpf)
);
''',
'table_c': '''
CREATE TABLE IF NOT EXISTS table_c (
cpf CHAR(11)
);
''',
'table_d': '''
CREATE TABLE IF NOT EXISTS table_d (
cpf CHAR(11),
INDEX idx_cpf (cpf)
);
''',
'table_e': '''
CREATE TABLE IF NOT EXISTS table_e (
cpf VARCHAR(11)
);
''',
'table_f': '''
CREATE TABLE IF NOT EXISTS table_f (
cpf VARCHAR(11),
INDEX idx_cpf (cpf)
);
''',
'table_g': '''
CREATE TABLE IF NOT EXISTS table_g (
cpf CHAR(11),
crc32 BIGINT GENERATED ALWAYS AS (crc32(cpf)) STORED
);
''',
'table_h': '''
CREATE TABLE IF NOT EXISTS table_h (
cpf CHAR(11),
crc32 BIGINT GENERATED ALWAYS AS (crc32(cpf)) STORED,
INDEX idx_crc32 (crc32)
);
'''
}
for table_name, schema in table_schemas.items():
print(f"Criando tabela {table_name}...")
cursor.execute(f"DROP TABLE IF EXISTS {table_name};")
cursor.execute(schema)
connection.commit()
cursor.close()
print("Tabelas criadas com sucesso.")
def insert_cpfs_process(args):
table_name, insert_sql, cpf_data_chunk = args
try:
connection = mysql.connector.connect(
host='localhost',
database='cpfs_data',
user='user',
password='pass'
)
cursor = connection.cursor()
cursor.executemany(insert_sql, cpf_data_chunk)
connection.commit()
cursor.close()
connection.close()
except Error as e:
print(f"Erro no processo ao inserir na {table_name}: {e}")
def insert_cpfs(connection, num_processes=20):
print("Iniciando a inserção de CPFs nas tabelas com multiprocessing...")
insert_statements = {
'table_a': 'INSERT INTO table_a (cpf) VALUES (%s)',
'table_b': 'INSERT INTO table_b (cpf) VALUES (%s)',
'table_c': 'INSERT INTO table_c (cpf) VALUES (%s)',
'table_d': 'INSERT INTO table_d (cpf) VALUES (%s)',
'table_e': 'INSERT INTO table_e (cpf) VALUES (%s)',
'table_f': 'INSERT INTO table_f (cpf) VALUES (%s)',
'table_g': 'INSERT INTO table_g (cpf) VALUES (%s)',
'table_h': 'INSERT INTO table_h (cpf) VALUES (%s)',
}
batch_size = 10000
for table_name, insert_sql in insert_statements.items():
print(f"Inserindo CPFs na {table_name} com multiprocessing...")
start_time = time.time()
data_chunks = []
data = []
idx = 0
total_cpfs = 0
with open('cpfs.csv', 'r') as csv_file:
csv_reader = csv.reader(csv_file)
for row in csv_reader:
cpf = row[0]
idx += 1
total_cpfs += 1
if table_name in ['table_a', 'table_b']:
cpf_value = int(cpf)
else:
cpf_value = cpf
data.append((cpf_value,))
if idx % batch_size == 0:
data_chunks.append(data)
data = []
if idx % (batch_size * 100) == 0:
percent = (idx / total_cpfs) * 100
print(f"Preparados {idx} CPFs para {table_name} ({percent:.2f}%)")
if data:
data_chunks.append(data)
args_list = [(table_name, insert_sql, chunk) for chunk in data_chunks]
with Pool(processes=num_processes) as pool:
pool.map(insert_cpfs_process, args_list)
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Concluída a inserção na {table_name} em {elapsed_time:.2f} segundos.")
print("Inserção de CPFs nas tabelas concluída.")
def main():
connection = create_connection()
if connection is None:
return
create_tables(connection)
insert_cpfs(None)
connection.close()
print("Tudo pronto.")
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment