Skip to content

Instantly share code, notes, and snippets.

@ball-hayden
Last active October 21, 2021 10:26
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ball-hayden/02223827ae380044dabccdfbb97cd115 to your computer and use it in GitHub Desktop.
Save ball-hayden/02223827ae380044dabccdfbb97cd115 to your computer and use it in GitHub Desktop.
ECG Capture and Analysis using Photon, Biosppy and InfluxDB
This was a test for a bit of fun one evening.
Data is captured using https://www.sparkfun.com/products/12650 connected to one of the Photon's ADC pins.
Samples are sent (in bursts) over TCP to a Python script, which forwards them to InfluxDB over UDP.
BioSPPy is run once a second on the most recently received data to extract the heart rate, which is also sent to InfluxDB.
# InfluxDB server configuration. All on-disk state in this file lives under
# /home/hayden/.influxdb (see the dir / wal-dir keys in each section).
reporting-disabled = false
bind-address = ":8088"
# NOTE(review): the [meta] bind-address below repeats the top-level ":8088".
# Confirm against the InfluxDB version in use that these are not two listeners
# competing for the same port.
[meta]
enabled = true
dir = "/home/hayden/.influxdb/meta"
bind-address = ":8088"
http-bind-address = ":8091"
https-enabled = false
https-certificate = ""
retention-autocreate = true
election-timeout = "1s"
heartbeat-timeout = "1s"
leader-lease-timeout = "500ms"
commit-timeout = "50ms"
cluster-tracing = false
raft-promotion-enabled = true
logging-enabled = true
pprof-enabled = false
lease-duration = "1m0s"
# Storage settings: the tsm1 engine with data and WAL directories kept side by
# side under /home/hayden/.influxdb. Sizes are plain byte counts
# (e.g. max-wal-size 104857600 = 100 * 1024 * 1024).
[data]
enabled = true
dir = "/home/hayden/.influxdb/data"
engine = "tsm1"
max-wal-size = 104857600
wal-flush-interval = "10m0s"
wal-partition-flush-delay = "2s"
wal-dir = "/home/hayden/.influxdb/wal"
wal-logging-enabled = true
wal-ready-series-size = 30720
wal-compaction-threshold = 0.5
wal-max-series-size = 1048576
wal-flush-cold-interval = "5s"
wal-partition-size-threshold = 52428800
query-log-enabled = true
cache-max-memory-size = 524288000
cache-snapshot-memory-size = 26214400
cache-snapshot-write-cold-duration = "1h0m0s"
compact-full-write-cold-duration = "24h0m0s"
max-points-per-block = 0
data-logging-enabled = true
[cluster]
force-remote-mapping = false
write-timeout = "5s"
shard-writer-timeout = "5s"
max-remote-write-connections = 3
shard-mapper-timeout = "5s"
[retention]
enabled = true
check-interval = "30m0s"
[shard-precreation]
enabled = true
check-interval = "10m0s"
advance-period = "30m0s"
# Admin UI listener on :8083, served without HTTPS.
[admin]
enabled = true
bind-address = ":8083"
https-enabled = false
https-certificate = "/etc/ssl/influxdb.pem"
[monitor]
store-enabled = true
store-database = "_internal"
store-interval = "10s"
[subscriber]
enabled = true
# HTTP API listener on :8086. Authentication and HTTPS are both switched off,
# so the certificate path below is unused while https-enabled = false.
[http]
enabled = true
bind-address = ":8086"
auth-enabled = false
log-enabled = true
write-tracing = false
pprof-enabled = false
https-enabled = false
https-certificate = "/etc/ssl/influxdb.pem"
# collectd input: disabled (enabled = false).
[collectd]
enabled = false
bind-address = ":25826"
database = "collectd"
retention-policy = ""
batch-size = 5000
batch-pending = 10
batch-timeout = "10s"
read-buffer = 0
typesdb = "/usr/share/collectd/types.db"
# OpenTSDB input: disabled (enabled = false).
[opentsdb]
enabled = false
bind-address = ":4242"
database = "opentsdb"
retention-policy = ""
consistency-level = "one"
tls-enabled = false
certificate = "/etc/ssl/influxdb.pem"
batch-size = 1000
batch-pending = 5
batch-timeout = "1s"
log-point-errors = true
# UDP input on :8089, writing into the database named "udp".
# The gist's Python bridge script sends its datagrams to localhost:8089,
# so this is the listener that receives the ECG samples and bpm values.
[[udp]]
enabled = true
bind-address = ":8089" # the bind address
database = "udp" # Name of the database that will be written to
batch-size = 5000 # will flush if this many points get buffered
batch-timeout = "1s" # will flush at least this often even if the batch-size is not reached
batch-pending = 100 # number of batches that may be pending in memory
read-buffer = 8388608 # (8*1024*1024) UDP read buffer size
# Continuous queries are enabled and logged, with run-interval set to 1s.
[continuous_queries]
log-enabled = true
enabled = true
run-interval = "1s"
# Hinted-handoff queue stored under the same home directory as the data;
# max-size is a byte count (1073741824 = 1024 * 1024 * 1024).
[hinted-handoff]
enabled = true
dir = "/home/hayden/.influxdb/hh"
max-size = 1073741824
max-age = "168h0m0s"
retry-rate-limit = 0
retry-interval = "1s"
retry-max-interval = "1m0s"
purge-interval = "1h0m0s"
#include "application.h"
// Forward declarations. The Timer objects below take these functions as
// callbacks before their definitions appear; without prototypes this only
// compiles when the Particle .ino preprocessor generates them automatically.
void takeSample();
void transmitSamples();
void connect();

// Pin assignments.
int aIn = A0;   // analogue input sampled by takeSample()
int aOut = A5;  // driven with analogWrite(aOut, 255) in setup()
int led0 = D7;  // HIGH while the TCP connection is up (see transmitSamples())
int led1 = D0;  // HIGH once the sample buffer has wrapped; cleared on transmit

// Address of the machine running the TCP receiver (port is set in connect()).
byte serverIp[] = { 192, 168, 88, 240 };
TCPClient tcpClient;

// Software timers: take a sample every 5 ms and flush the buffer every 500 ms.
Timer sampleTimer(5, takeSample);
Timer transmitTimer(500, transmitSamples);

// millis() reading captured in setup(); takeSample() derives the
// sub-second part of each timestamp relative to it.
unsigned long startTimeMillis;
// One ADC reading together with the timestamp parts recorded by takeSample().
struct Sample {
// Raw analogRead() result.
unsigned long value;
// Time.now() at capture.
unsigned int time;
// (millis() - startTimeMillis) % 1000 at capture, i.e. a value in 0..999.
unsigned int timeMillis;
};
// Capacity of sampleBuffer; takeSample() wraps back to index 0 when it is reached.
const unsigned int SAMPLE_BUFFER_SIZE = 500;
Sample sampleBuffer[SAMPLE_BUFFER_SIZE];
// Index of the next free slot in sampleBuffer, which is also the number of
// samples waiting to be transmitted.
unsigned int sampleBufferLocation = 0;
// One-time initialisation: configures the ADC and pins, opens the TCP
// connection, records the reference millis() value and starts both timers.
void setup()
{
// NOTE(review): presumably the shortest available ADC sample time, chosen to
// keep each analogRead() fast — confirm against the Particle ADC documentation.
setADCSampleTime(ADC_SampleTime_3Cycles);
// Setup LEDs
pinMode(led0, OUTPUT);
pinMode(led1, OUTPUT);
// Setup analogue inputs
pinMode(aOut, OUTPUT);
pinMode(aIn, INPUT);
// Drive aOut at full scale. NOTE(review): looks like this feeds the sensor
// board (supply or enable line) — confirm against the wiring.
analogWrite(aOut, 255);
connect();
// Reference point for the millisecond part of every sample timestamp.
startTimeMillis = millis();
sampleTimer.start();
transmitTimer.start();
}
void takeSample()
{
int timeMillis = (millis() - startTimeMillis) % 1000;
Sample sample;
sample.value = analogRead(aIn);
sample.time = Time.now();
sample.timeMillis = timeMillis;
sampleBuffer[sampleBufferLocation] = sample;
sampleBufferLocation += 1;
if(sampleBufferLocation >= SAMPLE_BUFFER_SIZE) {
digitalWrite(led1, HIGH);
sampleBufferLocation = 0;
}
}
// Opens the TCP connection to the receiver at serverIp. The result of the
// connection attempt is not inspected here; transmitSamples() checks
// tcpClient.connected() on every tick and calls this again when needed.
void connect() {
    const int receiverPort = 2000;
    tcpClient.connect(serverIp, receiverPort);
}
const char TRANSMIT_FORMAT[] = "analogue value=%i %i%03i000000\n";
unsigned int TRANSMIT_LENGTH = sizeof(TRANSMIT_FORMAT) * 2;
void transmitSamples() {
if (tcpClient.connected()) {
digitalWrite(led0, HIGH);
char *buffer = (char *) malloc(TRANSMIT_LENGTH * SAMPLE_BUFFER_SIZE);
char line[TRANSMIT_LENGTH];
strcpy(buffer, "");
for (int i = 0; i <= SAMPLE_BUFFER_SIZE; i++) {
if(i >= sampleBufferLocation) {
break;
}
Sample sample = sampleBuffer[i];
sprintf(line, TRANSMIT_FORMAT, sample.value, sample.time, sample.timeMillis);
strcat(buffer, line);
}
digitalWrite(led1, LOW);
sampleBufferLocation = 0;
tcpClient.print(buffer);
free(buffer);
} else {
digitalWrite(led0, LOW);
connect();
}
}
import csv
import numpy as np
from biosppy.signals import ecg
import SocketServer
import socket
import re
import threading
from time import sleep
# UDP socket used for every datagram sent to InfluxDB (raw sample lines and
# the computed bpm lines).
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Destination of those datagrams.
INFLUX_HOST = 'localhost'
INFLUX_PORT = 8089
# Parallel lists holding the most recent sample values and their timestamp
# strings. MyTCPHandler appends to them; the main loop reads them.
biosppy_data = []
biosppy_time = []
class MyTCPHandler(SocketServer.StreamRequestHandler):
    """Receives sample lines over TCP, relays them to InfluxDB and buffers them.

    Each line read from the connection is forwarded unchanged as a UDP
    datagram to (INFLUX_HOST, INFLUX_PORT). Lines that match the expected
    "<measurement> value=<int> <timestamp>" layout are also appended to the
    module-level biosppy_data / biosppy_time lists, which are trimmed to the
    newest MAX_SAMPLES entries.
    """

    # Number of samples kept for heart-rate analysis.
    MAX_SAMPLES = 2000

    @staticmethod
    def _parse_line(data):
        """Return (value, time) extracted from one line, or None if it does not match.

        value is the integer after "value="; time is the digit string that
        follows it, returned as text exactly as received.
        """
        extracted = re.match(r'(\w+?) value=(\d+) (\d+)', data)
        if extracted is None:
            return None
        return int(extracted.group(2)), extracted.group(3)

    def handle(self):
        """Process lines until the peer closes the connection."""
        while 1:
            data = self.rfile.readline()
            if not data:
                # readline() returns an empty string at EOF; stop instead of
                # spinning on a closed connection.
                break

            # Forward to influx
            udp_socket.sendto(data, (INFLUX_HOST, INFLUX_PORT))

            # Extract time and value; skip lines that do not parse rather
            # than letting one malformed line kill the handler.
            parsed = self._parse_line(data)
            if parsed is None:
                continue
            value, time = parsed

            biosppy_data.append(value)
            biosppy_time.append(time)

            # Remove the oldest entry once more than MAX_SAMPLES are held
            if len(biosppy_data) > self.MAX_SAMPLES:
                biosppy_data.pop(0)
                biosppy_time.pop(0)
if __name__ == "__main__":
    # Listen on all interfaces; the device connects to port 2000.
    HOST, PORT = "0.0.0.0", 2000
    server = SocketServer.TCPServer((HOST, PORT), MyTCPHandler)
    server_thread = threading.Thread(target=server.serve_forever)
    # Exit the server thread when the main thread terminates
    server_thread.daemon = True
    server_thread.start()

    try:
        # Once a second, run the ECG analysis over the buffered samples and
        # publish the latest heart rate to InfluxDB.
        while 1:
            sleep(1)

            # Work on copies: the server thread keeps appending to and
            # trimming the shared lists while the analysis runs.
            signal = list(biosppy_data)
            times = list(biosppy_time)
            if not signal or not times:
                # Nothing received yet.
                continue

            try:
                out = ecg.ecg(signal=signal, sampling_rate=200., show=False)
                rates = out['heart_rate']
                rate = rates[-1]
                # Stamp the bpm point with the time of the newest sample.
                influx_line = "bpm value=%i %s" % (rate, times[-1])
                udp_socket.sendto(influx_line, (INFLUX_HOST, INFLUX_PORT))
                print("%i" % rate)
            except Exception as ex:
                # Best effort: report the failure (e.g. too little data for
                # the analysis) and try again next second. str(ex) is used
                # because the .message attribute is deprecated and often empty.
                print(ex)
    except KeyboardInterrupt:
        # Ctrl-C: fall through to the clean shutdown below.
        pass
    finally:
        server.shutdown()
        server.server_close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment