Skip to content

Instantly share code, notes, and snippets.

@h3xagn
Created May 24, 2025 14:56
Show Gist options
  • Select an option

  • Save h3xagn/0c31154b3c82f93ac4fd0fc721fa6b2c to your computer and use it in GitHub Desktop.

Select an option

Save h3xagn/0c31154b3c82f93ac4fd0fc721fa6b2c to your computer and use it in GitHub Desktop.
Bulk insert for InfluxDB 3
def WriteInfluxDB(timestamp: datetime, points_data: list) -> None:
    """Send a batch of readings to InfluxDB with one write call.

    A ``Point("homedb")`` is built for every entry of *points_data*; all of
    them are stamped with the same *timestamp*.

    Args:
        timestamp: Time applied to every point (``WritePrecision.NS``).
        points_data: Entries indexable by ``"tag_name"`` (stored as the
            ``sensor`` tag) and ``"tag_value"`` (stored as the ``value``
            field).

    Returns:
        None. Any exception raised while building or writing the points is
        caught and reported via ``print``; nothing is re-raised.
    """

    def _to_point(entry):
        # Read both keys first so a malformed entry fails before a Point
        # object is constructed.
        sensor_name = entry["tag_name"]
        reading = entry["tag_value"]
        builder = Point("homedb")
        builder = builder.tag("sensor", sensor_name)
        builder = builder.field("value", reading)
        return builder.time(timestamp, WritePrecision.NS)

    try:
        batch = [_to_point(entry) for entry in points_data]
        # The whole batch goes out in a single write call.
        client_influx.write(batch)
        print(f"-- InfluxDB: {len(batch)} records inserted.")
    except Exception as err:
        print(f"** InfluxDB: Could not save to database. Error: {err}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment