Skip to content

Instantly share code, notes, and snippets.

@jdayllon
Created January 3, 2024 14:08
Show Gist options
  • Save jdayllon/9afdabbcfc90c861aafbc7d313b0bc50 to your computer and use it in GitHub Desktop.
Save jdayllon/9afdabbcfc90c861aafbc7d313b0bc50 to your computer and use it in GitHub Desktop.
output-benthos
input:
label: "load_csv_pagos"
batched:
child:
csv:
paths:
- /tmp/benthos/data/PAGOS.csv
delimiter: '|'
parse_header_row: false
policy:
count: 100
period: "1s"
pipeline:
threads: -1
processors:
- label: "delete_heading"
bloblang: |
root = if this.0 == "Id_Sociedad_Financiera" { deleted() }
- label: "mapping"
bloblang: |
root.data.company_id = this.0.number()
root.data.company_desc = this.1
root.data.budget_section_id = this.2
root.data.budget_section_desc = this.3
root.data.outgoing_payment_year = this.4.number()
root.data.outgoing_payment_id = if this.5 == null { null } else { this.5.number() }
root.data.outgoing_payment_date = this.6.number()
root.data.creditor_desc = this.7
root.data.creditor_id = this.8
root.data.transferee_id = this.9
root.data.budget_line_id = this.10
root.data.budget_line_desc = this.11
root.data.general_ledger_id = this.12
root.data.general_ledger_desc = this.13
root.data.outgoing_payment_treasury = this.14
root.data.accounting_file_year = if this.15 == null {
null
} else { this.15.number() }
root.data.accounting_file_id = match {
this.16 == null => null ,
this.16 == "" => null,
_ => this.16.number()
}
root.data.accounting_procedure_id = this.17
root.data.accounting_procedure_desc = this.18
root.data.outsourcing_accounting_file_class = this.19
root.data.outsourcing_accounting_file_year = if this.20 == null {
null
} else { this.20.number() }
root.data.outsourcing_accounting_file_id = match {
this.21 == null => null,
this.21 == "" => null,
_ => this.21.number()
}
root.data.treasury_id = this.22
root.data.bank_desc = this.23
root.data.bank_id = this.24
root.data.treasury_budget_class_desc = this.25
root.data.amount_gross = if this.26 == null {
null
} else { this.26.replace_all(" ","").number() }
root.data.amount_deductions = if this.27 == null {
null
} else { this.27.replace_all(" ","").number() }
root.data.amount_net = if this.28 == null {
null
} else { this.28.replace_all(" ","").number() }
root.semantized = "false"
root.random_id_0 = this.string().hash("sha1").index(0)
root.random_id_1 = this.string().hash("sha1").index(1)
root.random_id_2 = this.string().hash("sha1").index(2)
root.id = "%04.0f%04.0f%010.0f-%04d%04d%04d".format(root.data.company_id, root.data.outgoing_payment_year, root.data.outgoing_payment_id, root.random_id_0, root.random_id_1, root.random_id_2)
root.random_id = deleted()
- archive:
format: json_array
- bloblang: |
root.contents = this
root.date = now().ts_strftime("%Y-%m-%dT%H:%M:%S.000Z")
root.operation = "create_batch"
root.dataset = "${INDICEPAGOS}"
root.entityType = "node"
root.provider = "${INDICEPAGOS}_batch"
meta uuid = uuid_v4()
- log:
level: INFO
message: Tesoreria-Pagos-a-Kafka
fields_mapping: |
root.length = this.contents.length()
root.topic = env("KAFKATOPIC")
root.dataset = env("INDICEPAGOS")
output:
broker:
outputs:
- label: kafka_target
kafka:
addresses:
- ${KAFKAHOST}
sasl:
mechanism: none
topic: "${KAFKATOPIC}"
client_id: ${KAFKACLIENTID}
target_version: 2.0.0
key: ${! meta("uuid") }
# max_in_flight: 1
# max_msg_bytes: 1000000
timeout: "180s"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment