Skip to content

Instantly share code, notes, and snippets.

@flinox
Last active November 27, 2019 13:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save flinox/ca2301c5ee806f6d628d37fcdd4da8c8 to your computer and use it in GitHub Desktop.
Save flinox/ca2301c5ee806f6d628d37fcdd4da8c8 to your computer and use it in GitHub Desktop.
A python script to to purge inactive records of many different sources from informatica MDM calling the service SIF ExecuteBatchDelete
# ------------------------------------------------------------------------------
# v1.0 - 2019-08-13 - Created by: Fernando Lino Di Tomazzo Silva
# v2.0 - 2019-11-27 - Updated by: Fernando Lino Di Tomazzo Silva
#
# Script para realizar expurgo de registros logicamente inativos das BO's do MDM da INFORMATICA.
# Mais informacoes sobre MDM da INFORMATICA em: https://www.informatica.com/br/products/master-data-management.html
#
# ##### PRE REQS ######
# 1) Ter uma tabela criada que será usada para carregar os ID's dos registros para expurgar, Ex.:
# create table CMX_ORS.TB_PURGE_AUX
# (
# ROWID_XREF CHAR(14) not null
# )
# tablespace CMX_DATA;
#
# 2) Python 3.6.5 ou + e library cx_Oracle instalada
# pip3 install cx_Oracle
#
# 3) No servidor linux que for executar o script, é necessário ter 2 pares de usuario e senha setados em variaveis de ambiente, o primeiro par
# deve ter permissão para realizar consultas nas tabelas do MDM e para insert e delete na tabela criada no
# passo 1, Precisa ter os nomes abaixo:
# SRCUSER e SRCPASS
#
# O segundo par deve ter permissão para executar os serviços do MDM, precisa ser os nomes abaixo de variaveis de ambiente:
# MDMUSER e MDMPASS
#
# 4) Para ajuda ou maiores informações sobre os parametros use:
# python mdm_purge.py -h
#
# 5) Exemplo de uso:
# python mdm_purge.py 192.168.0.1/servicedata http://192.168.0.2:7000/cmx/services/SifService servicedata-CMX_ORS C_B_PESSOA TB_PURGE_AUX 500 false true true false
#
# ------------------------------------------------------------------------------
import cx_Oracle
import requests
import time
import sys
import xml.etree.ElementTree as ET
import datetime
import argparse
import textwrap
import os
# Realiza conexao com banco
def conexao(cs):
# Realiza a conexao
print(">>> Conectando no banco de origem ...")
con = cx_Oracle.connect(cs)
return con
# Conta qtde de registros por origem
def get_qtde_para_expurgo_por_origem(con,bo):
lista = []
try:
# Contando os registros para expurgo
print(">>> Contando os registros para o expurgo ...")
#print(">"*80 +"\n")
registros = "SELECT XRPE.ROWID_SYSTEM,COUNT(1) FROM CMX_ORS.@mdmbo@_XREF XRPE INNER JOIN CMX_ORS.@mdmbo@ BOPE ON (XRPE.ROWID_OBJECT = BOPE.ROWID_OBJECT) WHERE BOPE.HUB_STATE_IND = -1 GROUP BY XRPE.ROWID_SYSTEM ORDER BY 2"
registros = registros.replace('@mdmbo@',bo)
cur = con.cursor()
cur.execute(registros)
time.sleep(1)
for result in cur:
lista.append({result[0].strip(" ") : int(result[1])})
except:
e = sys.exc_info()[0]
print("Error: %s" % e)
print(sys.exc_info())
finally:
cur.close()
if len(lista) > 0:
for item in lista:
for key,val in item.items():
print(">>> Origem %s possui %s registros inativos pendentes para expurgo ..." % (key,str(val)))
else:
print(">>> Nenhum sistema origem possui registros para expurgar.")
return lista
# Define os parametros esperados e obrigatorios
parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,description=textwrap.dedent('''\
Expurgo de registros do MDM da informatica usando o serviço SOAP SIF ExecuteBatchDelete!
Maiores informacoes leia a doc. MDM_XXXHFX_ServicesIntegrationFramework[SIF]Guide_en.pdf
----------------------------------------------------------------------------------------
Created By: Fernando Lino Di Tomazzo Silva ( https://www.linkedin.com/in/flinox )
Version 1.0 - 2019-08-13
'''))
parser.add_argument("connectstring", help="Informe o endereco para o banco oracle onde estão os dados do MDM, Ex.: hostname:port/service")
parser.add_argument("mdmsifurl", help="Informe a url dos serviços SIF do MDM, Ex.: http://hostname:port/cmx/services/SifService")
parser.add_argument("mdmorsid", help="Informe o nome do ORS ID do seu MDM, Ex.: banco-CMX_ORS")
parser.add_argument("mdmbo", help="Informe o nome da BO que quer realizar o expurgo de registros inativos HUB_STATE_IND = -1, Ex.: C_B_PESSOA ")
parser.add_argument("mdmtabaux", help="Informe o nome da tabela auxiliar que deve conter os ROWID_XREF's dos registros que deverão ser expurgados, ela deve conter apenas um campo ROWID_XREF, Ex.: PURGE_AUX")
parser.add_argument("lote", help="Informe a qtde de registros por lote para expurgo", default=500, type=int)
parser.add_argument("recalculatebvt", help="Informe se após o expurgo você deseva recalcular o BVT")
parser.add_argument("cascading", help="Informe se você deseja que o expurgo seja em cascata para todas as tabelas relacionadas aquele registro que será expurgado")
parser.add_argument("overridehistory", help="Informe se você quer que o MDM registre a atividade executada pela exclusão do lote nas tabelas do histórico.")
parser.add_argument("purgehistory", help="Informe se você quer que o MDM exclua todos os registros do histórico de não mesclagem relacionados ao registro de cross reference excluído.")
args = parser.parse_args()
print(">"*80 +"\n")
print(">>> Verificando parametros e variaveis ...")
listaorigens = []
origem = ''
lote = args.lote
origemuser = os.environ['SRCUSER']
origempass = os.environ['SRCPASS']
mdmuser = os.environ['MDMUSER']
mdmpass = os.environ['MDMPASS']
connectstring = origemuser+'/'+origempass+'@'+args.connectstring #args.connectstring
mdmbo = args.mdmbo
mdmtabaux = args.mdmtabaux
mdmorsid = args.mdmorsid
url=args.mdmsifurl
recalculatebvt = args.recalculatebvt
cascading = args.cascading
overridehistory = args.overridehistory
purgehistory = args.purgehistory
print(">>> Inicio do processo de expurgo de inativos %s " % datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S"))
while True:
try:
# Realiza a conexão com o banco ...
con = conexao(connectstring)
listaorigens = get_qtde_para_expurgo_por_origem(con,mdmbo)
for item in listaorigens:
for key,val in item.items():
origem = key
origem_qtde = val
recount = 0
while(origem_qtde > 0):
qtdedeleted = 0
# Deleta tabela auxiliar, insere novos registros selecionados
try:
# Seleciona 100 registros para expurgo
print(">>> Selecionando um lote de até %s registros para expurgo da origem %s ..." % ((lote),origem))
queryselect = "SELECT XRPE.ROWID_XREF FROM CMX_ORS.@mdmbo@_XREF XRPE INNER JOIN CMX_ORS.@mdmbo@ BOPE ON (XRPE.ROWID_OBJECT = BOPE.ROWID_OBJECT) WHERE BOPE.HUB_STATE_IND = -1 AND XRPE.ROWID_SYSTEM = '@origem@' AND ROWNUM < @lote@"
queryselect = queryselect.replace('@origem@',origem)
queryselect = queryselect.replace('@lote@',str(lote+1))
queryselect = queryselect.replace('@mdmbo@',str(mdmbo))
cur = con.cursor()
# Deleta a tabela atual
print(">>> Limpando a tabela CMX_ORS.%s ..." % mdmtabaux)
querydelete = ("DELETE FROM CMX_ORS.%s" % mdmtabaux)
cur.execute(querydelete)
con.commit()
time.sleep(1)
print(">>> Inserindo os rowid_xref dos registros selecionados na CMX_ORS.PURGE_AUX ...")
queryinsert = ("INSERT INTO CMX_ORS.%s %s " % (mdmtabaux,queryselect))
cur.execute(queryinsert)
con.commit()
time.sleep(1)
except:
e = sys.exc_info()[0]
print("Error: %s" % e)
print(sys.exc_info())
finally:
# Encerra a conexao com o banco
print(">>> Commitando as transacoes ...")
time.sleep(1)
cur.close()
con.commit()
# Realiza a chamada do serviço para o expurgo
try:
# Realiza a chamada do serviço para o expurgo
print(">>> Executando o serviço SIF ExecuteBatchDelete do MDM ...")
headers = {'content-type': 'text/xml', 'SOAPAction': 'POST'}
body = """<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:urn="urn:siperian.api">
<soapenv:Header/>
<soapenv:Body>
<urn:executeBatchDelete>
<!--Optional:-->
<urn:username>@mdmuser@</urn:username>
<!--Optional:-->
<urn:password>
<urn:password>@mdmpass@</urn:password>
<urn:encrypted>false</urn:encrypted>
</urn:password>
<!--Optional:-->
<urn:orsId>@mdmorsid@</urn:orsId>
<!--Optional:-->
<urn:interactionId></urn:interactionId>
<!--Optional:-->
<urn:asynchronousOptions>
<urn:isAsynchronous>false</urn:isAsynchronous>
<!--Optional:-->
<urn:jmsReplyTo></urn:jmsReplyTo>
<!--Optional:-->
<urn:jmsCorrelationId></urn:jmsCorrelationId>
</urn:asynchronousOptions>
<urn:tableName>@mdmbo@</urn:tableName>
<urn:sourceTableName>@mdmtabaux@</urn:sourceTableName>
<urn:recalculateBvt>@recalculatebvt@</urn:recalculateBvt>
<urn:cascading>@cascading@</urn:cascading>
<urn:overrideHistory>@overridehistory@</urn:overrideHistory>
<urn:purgeHistory>@purgehistory@</urn:purgeHistory>
</urn:executeBatchDelete>
</soapenv:Body>
</soapenv:Envelope>"""
body = body.replace("@mdmuser@",mdmuser)
body = body.replace("@mdmpass@",mdmpass)
body = body.replace("@mdmbo@",mdmbo)
body = body.replace("@mdmtabaux@",mdmtabaux)
body = body.replace("@mdmorsid@",mdmorsid)
body = body.replace("@recalculatebvt@",recalculatebvt)
body = body.replace("@cascading@",cascading)
body = body.replace("@overridehistory@",overridehistory)
body = body.replace("@purgehistory@",purgehistory)
response = requests.post(url,data=body,headers=headers)
print("")
print(response.content.decode('utf8'))
xml=ET.fromstring(response.content.decode('utf8'))
for child in xml[0][0]:
if child.tag == "{urn:siperian.api}processedXrefsCount":
qtdedeleted = int(child.text)
print("")
print(">>> Registros deletados: %s " % str(qtdedeleted))
print("")
except:
e = sys.exc_info()[0]
print("Error: %s" % e)
finally:
print(">"*80 +"\n")
if qtdedeleted == 0:
recount += 1
if recount > 6:
recountqry = "SELECT COUNT(1) FROM CMX_ORS.@mdmbo@_XREF XRPE INNER JOIN CMX_ORS.@mdmbo@ BOPE ON (XRPE.ROWID_OBJECT = BOPE.ROWID_OBJECT) WHERE BOPE.HUB_STATE_IND = -1 AND XRPE.ROWID_SYSTEM = '@origem@'"
recountqry = recountqry.replace('@origem@',origem)
recountqry = recountqry.replace('@mdmbo@',str(mdmbo))
cur = con.cursor()
cur.execute(recountqry)
for result in cur:
origem_qtde = int(result[0])
if origem_qtde == 0:
print(">>> Não existem mais registros para expurgar da origem %s " % origem)
continue
cur.close()
origem_qtde = origem_qtde - ( qtdedeleted )
print(">>> Processo em execucao %s " % datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S"))
print(">>> Restando %s registros inativos pendentes para %s ..." % (origem_qtde,origem))
print(">>> Lote finalizado para origem %s " % origem)
print(">>> Termino do lote em %s <<< " % datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S"))
# Encerrando a conexão com o banco
con.close()
print(">>> Termino do processo de expurgo de inativos %s " % datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S"))
print("")
print(">"*80 +"\n")
exit(0)
except KeyboardInterrupt:
print(">>> Processo finalizado pelo usuário ( CTRL + C ) <<<")
print(">"*80 +"\n")
exit(888)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment