Skip to content

Instantly share code, notes, and snippets.

@h3xagn
Last active June 5, 2025 03:59
Show Gist options
  • Select an option

  • Save h3xagn/7c2dcaaa2760f13904b7e4e51a97df7e to your computer and use it in GitHub Desktop.

Select an option

Save h3xagn/7c2dcaaa2760f13904b7e4e51a97df7e to your computer and use it in GitHub Desktop.
Telegraf Processor for OPC UA
[[processors.starlark]]
namepass = ["opcua_plc1", "opcua_mplc"]
order = 1
source = '''
def apply(metric):
new_fields = {}
for field_key in list(metric.fields.keys()):
if field_key != "Quality":
tag_value = field_key
metric.tags['tag_name'] = tag_value
value = metric.fields[field_key]
metric.fields.pop(field_key)
new_field_name = tag_value
new_fields[new_field_name] = value
if type(value) == "int" or type(value) == "float":
new_fields["value_real"] = float(value)
elif type(value) == "bool":
new_fields["value_bool"] = value
else:
new_fields["value_str"] = value
for k, v in new_fields.items():
metric.fields[k] = v
for field_key in list(metric.fields.keys()):
if field_key == "Quality" or field_key.startswith("value_"):
continue
metric.fields.pop(field_key)
return metric
'''
[[processors.regex]]
namepass = ["opcua_plc1", "opcua_mplc"]
order = 2
[[processors.regex.fields]]
key = "Quality"
pattern = ".*\\(([^)]*)\\).*"
replacement = "${1}"
[[processors.regex.field_rename]]
pattern = "^Quality$"
replacement = "quality"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment