Skip to content

Instantly share code, notes, and snippets.

@jschoch
Created March 14, 2024 22:31
Show Gist options
  • Save jschoch/55bb6335aa1433e28a0df91a7144c3a7 to your computer and use it in GitHub Desktop.
Save jschoch/55bb6335aa1433e28a0df91a7144c3a7 to your computer and use it in GitHub Desktop.
now_pendant.py
// Encoder objects; each one is pointed to by the `encoder` field of the
// matching EncInputData record (e1..e4) defined further down.
ESP32Encoder encoder1;
ESP32Encoder encoder2;
ESP32Encoder encoder3;
ESP32Encoder encoder4;
// Ticker objects; no callback is attached to them in this part of the file.
Ticker readEncoders;
Ticker slowReadEncoders;
// Kind of physical input an EncInputData record describes.
// Only Encoder is used by the initializers visible in this file.
enum class InputType{
Encoder,
Button
};
// State tracked for one input source.
// Field order matters: e1..e4 are brace-initialized positionally, so members
// must not be reordered without updating those initializers.
struct EncInputData{
// Kind of input this record describes.
InputType type;
// Identifier sent as the first element of each update message.
int id;
// Most recent count read from the encoder via getCount().
int64_t v;
// Count recorded after the last update was sent; compared with v to detect changes.
int64_t prev_v;
// Value-type tag. Every visible initializer sets INT64 and no visible code reads it.
enum class ValueType { FLOAT, INT32, INT64, INT8, BOOL };
ValueType vtype;
// Encoder object whose count is polled for this record (not owned here).
ESP32Encoder *encoder;
};
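// Initialize the structs with aggregate initializers. The commented-out form
// below does not compile because an assignment statement such as
// `e1.encoder = &encoder1;` is not allowed at namespace scope; only
// declarations (with initializers) are.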
//EncInputData e1;
//e1.encoder = &encoder1;
// One record per encoder. Initializer order follows the EncInputData members:
// type, id, v, prev_v, vtype, encoder.
EncInputData e1{InputType::Encoder, 1, 0, 0, EncInputData::ValueType::INT64, &encoder1};
EncInputData e2{InputType::Encoder, 2, 0, 0, EncInputData::ValueType::INT64, &encoder2};
EncInputData e3{InputType::Encoder, 3, 0, 0, EncInputData::ValueType::INT64, &encoder3};
EncInputData e4{InputType::Encoder, 4, 0, 0, EncInputData::ValueType::INT64, &encoder4};

// Number of entries in `encoders`.
#define NUM_ENC 4

// Working table polled by doReadEncoders(). The elements are COPIES of
// e1..e4, so updates to v/prev_v made through this array are not reflected in
// the e1..e4 objects themselves (the encoder pointers are shared).
EncInputData encoders[NUM_ENC] = {e1, e2, e3, e4};
/// Sends one encoder change to the remote peer as a MsgPack message over ESP-NOW.
///
/// Payload layout: {"h": false, "e": [id, previous count, current count]}.
/// Prints "." to Serial when esp_now_send() reports ESP_OK; otherwise prints
/// an error line followed by the numeric esp_err_t code.
///
/// @param encoderData  Snapshot of the encoder record to report (taken by value).
void sendUpdate(EncInputData encoderData){
  // The counts are int64_t, so the array element type is int64_t as well.
  // The previous arr_t<int> forced a narrowing conversion inside the braced
  // initializer, silently truncating counts that do not fit in an int.
  // NOTE(review): assumes MsgPack serializes arr_t<int64_t> like any other
  // integer array — confirm against the MsgPack library in use.
  MsgPack::arr_t<int64_t> enc1_list {
    static_cast<int64_t>(encoderData.id),
    encoderData.prev_v,
    encoderData.v
  };

  packer.serialize(MsgPack::map_size_t(2), "h", false, "e", enc1_list);
  const esp_err_t result = esp_now_send(remotePeerAddress, packer.data(), packer.size());

  // Empty the file-level packer so the next message starts from a clean buffer,
  // whether or not this send succeeded.
  packer.clear();

  if (result == ESP_OK) {
    Serial.print(".");
  } else {
    Serial.println(" couldn't send msg");
    Serial.println(result);
  }
}
void doReadEncoders(bool print){
for(int i = 0;i < NUM_ENC;i++){
encoders[i].v = encoders[i].encoder->getCount();
if(encoders[i].prev_v != encoders[i].v){
sendUpdate(encoders[i]);
// set previous value
encoders[i].prev_v = encoders[i].v;
}
}
}
#!/usr/bin/env python3
import gramme
import os
from gi.repository import GObject
from gi.repository import GLib
import hal
from hal_glib import GStat
GSTAT = GStat()
import sys
import time
#print("sup")
print("Loading now pendant")
print(f"Python version: {sys.version}")
# Pin numbers used to build the digital HAL pin names: the first `nout`
# entries name digital-out-NN pins, the remaining entries digital-in-NN pins.
pinmap = [4, 5, 6, 7, 8, 9]
# Pin number used to build the analog-out HAL pin name ("DAC fixed").
dacpinmap = [3]
# How many leading pinmap entries are exposed as digital outputs.
nout = 5
# Jog-scale value published for each `ticker` position (keys 0..5).
# NOTE(review): scale_ticker wraps at 4, so key 5 looks unreachable — confirm.
encoder_map = dict(enumerate((0.0, 1.0, 0.5, 0.1, 0.05, 0.1)))
# Create the "now_pendant" HAL component and declare all of its pins and
# parameters. Pin/parameter names are part of the HAL-facing interface.
c = hal.component("now_pendant")

# Single analog output, named after dacpinmap[0]; scale defaults to 1.0.
c.newpin("analog-out-%02d" % dacpinmap[0], hal.HAL_FLOAT, hal.HAL_IN)
c.newparam("analog-out-%02d-offset" % dacpinmap[0], hal.HAL_FLOAT, hal.HAL_RW)
c.newparam("analog-out-%02d-scale" % dacpinmap[0], hal.HAL_FLOAT, hal.HAL_RW)
c['analog-out-%02d-scale' % dacpinmap[0]] = 1.0

# Six analog inputs (00..05), each with offset/gain parameters; gain defaults to 1.0.
for port in range(6):
    c.newpin("analog-in-%02d" % port, hal.HAL_FLOAT, hal.HAL_OUT)
    c.newparam("analog-in-%02d-offset" % port, hal.HAL_FLOAT, hal.HAL_RW)
    c.newparam("analog-in-%02d-gain" % port, hal.HAL_FLOAT, hal.HAL_RW)
    c['analog-in-%02d-gain' % port] = 1.0

# Digital outputs: the first `nout` entries of pinmap.
for port in range(nout):
    c.newpin("digital-out-%02d" % pinmap[port], hal.HAL_BIT, hal.HAL_IN)
    c.newparam("digital-out-%02d-invert" % pinmap[port], hal.HAL_BIT, hal.HAL_RW)

# Digital inputs: the remaining pinmap entries.
for port in range(nout, 6):
    c.newpin("digital-in-%02d" % pinmap[port], hal.HAL_BIT, hal.HAL_OUT)
    c.newpin("digital-in-%02d-not" % pinmap[port], hal.HAL_BIT, hal.HAL_OUT)
    c.newparam("digital-in-%02d-pullup" % pinmap[port], hal.HAL_BIT, hal.HAL_RW)

c.newpin("network-error", hal.HAL_BIT, hal.HAL_IN)

# Pendant pins: five MPG counters (00..04), the jog scale, and a test bit.
# NOTE(review): these are declared HAL_IN, yet this script assigns to
# mpg-count-NN, jog-scale and thetest-1 elsewhere — confirm HAL_OUT was not intended.
for i in range(5):
    c.newpin("mpg-count-%02d" % i, hal.HAL_S32, hal.HAL_IN)
c.newpin("jog-scale", hal.HAL_FLOAT, hal.HAL_IN)
c.newpin("thetest-1", hal.HAL_BIT, hal.HAL_IN)

# All pins declared: mark the component ready.
c.ready()
def test_changed(obj, data):
    """Callback that prints a marker line followed by the received payload.

    Args:
        obj: accepted but not used.
        data: value printed on its own line after the marker.
    """
    print(" got a flip flop", data, sep="\n")
# Current index into encoder_map; update_encoders() advances it via scale_ticker().
ticker = 0
def scale_ticker(ticker, dir):
    """Step the jog-scale index by one position, wrapping within 0..4.

    Args:
        ticker: current index (expected to be in 0..4).
        dir: truthy to step up, falsy to step down.

    Returns:
        The new index. Stepping up from 4 (or anything above it) wraps to 0;
        stepping down from 0 (or anything below it) wraps to 4.
    """
    if dir:
        # Fixed: this used to test `ticker > 4`, which reset every in-range
        # index to 0 (the scale could never step up) and let an out-of-range
        # index grow without bound. The bound now mirrors the down branch.
        return ticker + 1 if ticker < 4 else 0
    return ticker - 1 if ticker > 0 else 4
# Register test_changed as the callback for the GSTAT "mode_manual" message.
GSTAT.connect("mode_manual",test_changed)
# NOTE(review): presumably makes GStat emit its current state once so the
# callbacks fire at startup — confirm against the hal_glib documentation.
GSTAT.forced_update()
def update_encoders(encoder_list):
    """Publish one encoder update to the HAL component.

    Args:
        encoder_list: sequence read as [encoder id, previous count, current
            count]. The current count is written to the matching
            mpg-count-NN pin. Encoder id 4 additionally steps the global
            `ticker` and publishes the corresponding jog scale.
    """
    global ticker
    print(encoder_list)

    enc_id = encoder_list[0]
    count = encoder_list[2]
    c['mpg-count-%02d' % enc_id] = count

    # Only encoder 4 acts as the jog-scale selector.
    if enc_id != 4:
        return

    print("update scale")
    print("ticker %d" % ticker)
    # Direction flag is 1 when the previous count is greater than the new one.
    step = 1 if encoder_list[1] > count else 0
    ticker = scale_ticker(ticker, step)
    c['jog-scale'] = encoder_map[ticker]
    print("ticker %d" % ticker)
    print(encoder_map[ticker])
@gramme.server(8080)
def my_awsome_data_handler(data):
print( data)
#print( bin(data))
match data:
case {b'h':True}:
c['thetest-1'] = not c['thetest-1']
print( "true")
case {b'h':False}:
print ('got some data')
if b'e' in data:
#encoder_list = data[b'e']
update_encoders( data[b'e'])
else:
print( ' didnt find encoders')
case _:
print( "doh")
def nothing():
    """Do nothing and return None."""
    return None
################################################################
# MAIN LOOP #
################################################################
if __name__ == "__main__":
    # Run a GLib main loop forever; if run() ever returns, start a new loop.
    try:
        while True:
            GLib.MainLoop().run()
    except KeyboardInterrupt:
        # Turn Ctrl-C into a plain interpreter exit.
        raise SystemExit
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment