Skip to content

Instantly share code, notes, and snippets.

@kevinkm
Created December 30, 2020 04:50
Show Gist options
  • Save kevinkm/de53cd710abe2ff6dedca8b837a357cd to your computer and use it in GitHub Desktop.
Save kevinkm/de53cd710abe2ff6dedca8b837a357cd to your computer and use it in GitHub Desktop.
# Created by .ignore support plugin (hsz.mobi)
# Default ignored files
/shelf/
/workspace.xml
<component name="ProjectDictionaryState">
<dictionary name="KevinFox" />
</component>
<component name="libraryTable">
<library name="MicroPython" type="python">
<CLASSES>
<root url="file://$APPLICATION_PLUGINS_DIR$/intellij-micropython/typehints/stdlib" />
<root url="file://$APPLICATION_PLUGINS_DIR$/intellij-micropython/typehints/micropython" />
<root url="file://$APPLICATION_PLUGINS_DIR$/intellij-micropython/typehints/esp8266" />
</CLASSES>
<SOURCES />
</library>
</component>
<component name="libraryTable">
<library name="MicroPython" type="python">
<CLASSES>
<root url="file://$APPLICATION_PLUGINS_DIR$/intellij-micropython/typehints/stdlib" />
<root url="file://$APPLICATION_PLUGINS_DIR$/intellij-micropython/typehints/micropython" />
<root url="file://$APPLICATION_PLUGINS_DIR$/intellij-micropython/typehints/esp8266" />
</CLASSES>
<SOURCES />
</library>
</component>
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectInspectionProfilesVisibleTreeState">
<entry key="Project Default">
<profile-state>
<expanded-state>
<State>
<id />
</State>
</expanded-state>
<selected-state>
<State>
<id>Buildout</id>
</State>
</selected-state>
</profile-state>
</entry>
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.7" project-jdk-type="Python SDK" />
</project>
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/swordWithMC.iml" filepath="$PROJECT_DIR$/.idea/swordWithMC.iml" />
</modules>
</component>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="FacetManager">
<facet type="MicroPython" name="MicroPython">
<configuration>
<device name="ESP8266" />
</configuration>
</facet>
</component>
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="Python 3.7" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="library" name="MicroPython" level="project" />
</component>
<component name="TestRunnerService">
<option name="PROJECT_TEST_RUNNER" value="Unittests" />
</component>
</module>
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>
# plugged gyro, press connect(it will reboot when connect by uPyCraft) ESP then unplug gyro in 1s after the led flashes ,now connected repl
# uart = uos.dupterm(None, 1) ## Turn off the UART0 (rx tx)on repl by defualt, in order to use it now. But repl will be invaild
from lib.COMMON.timer import Timer
Timer.init()
import network
# from timer import Timer
nic = network.WLAN(network.STA_IF)
if nic.active():
nic.active(False)
ap = network.WLAN(network.AP_IF) # create access-point interface
if ap.active():
ap.active(False) # activate the interface
del (nic,ap)
import gc
gc.collect()
import webrepl
webrepl.start()
exec(open('./boot.py').read(),globals())
# Writes down the codes according to user's tutorial. Thank you for purchasing our modules, have a good day!
from lib.COMMON.MMG_client import microMSG_client
from lib.CLI.gyroctrl_cli.GYROCTRL import GyroCtrl
from lib.COMMON.management import CAR as Main
# from lib.DRV.GYRO.MPU6050_JY61P import MPU6050_JY61P
conn = microMSG_client( apparatus=Main, h_CH=2, Hdatalen='car' )
# mpu6050 = MPU6050_JY61P()
while True:
# print (mpu6050.Str_CarDATA )
GyroCtrl.control( conn, mpu6050.Str_CarDATA )
class GyroCtrl():
def control(conn, data):
conn._send_H( data )
from lib.COMMON.timer import Timer
class Switch( object ): # using timer
def __init__(self, conn):
self.conn = conn
self.CMDing = 0
self.RunAfter = Timer.RunAfter
# a = lambda x,y: int( x.split( '-' )[y])
# self.setgpio = lambda b,bool: conn.slotToGPIO[a(b,0)].value( a(b,1) if bool == 1 else a(b,1) )
# self.setgpio = lambda b,bool: conn.slotToGPIO[a(b,0)].value( bool )
# self.setgpio = lambda b: conn.slotToGPIO[a(b,0)].value( a(b,1) )
self.setgpio = lambda b,bool: conn.slotToGPIO[int(b)].value( bool )
def setstatus(self,i):
for j in self.slots[i]: # Change the status of the list of Slots.
self.setgpio(j, self.status[i])
# print('SET STATUS IS ', self.status[i])
def MSG(self, data):
if self.CMDing: # while in process of CMDing, can receive new data, but can't deal with it
for i in range( len( self.slots ) ):
if self.rounds[i]>0:
if self.RunAfter ( self.interval[i] ):
print('CHANGE status of switch')
# print ('running in CMDing')
self.status[i] = not self.status[i]
self.setstatus(i)
if self.status[i]==0: # when status is OFF, rounds - 1 ??
self.rounds[i] -= 1
if self.rounds[i] == 0:
self.setstatus(i)
self.countTimes -= 1
# print( '\ncountTimes -1 \n' )
if self.countTimes==0:
self.CMDing = 0
print( '\nDone! You can receive data\n' )
# print (self.rounds[i])
# print('\n\n\nswitch to', self.S,'\n\n\n')
elif data!=None:
self.slots, self.interval, self.rounds, self.status, b2, = [], [], [], [], []
# print ('data:',data)
a = data.split(',')
for i in range(len(a)):
aa = a[i].split(':')[0]
if '_' in aa:
for j in aa.split('_'):
b2.append(j)
self.setgpio(j,True)
self.slots.append( b2 )
b2=[]
else:
self.setgpio( aa, True ) # set GPIO high
# print ('set GPIO high')
self.slots.append( aa )
self.interval.append('MSG'+str(i)+'_' + str(int(a[i].split(':')[1].split( '-' )[0]) * 1000))
self.rounds.append(int(a[i].split(':')[1].split('-')[1]))
self.status.append(True) # each of status of switch is ON
self.countTimes = len( self.slots )
self.CMDing = 1
self.conn.switchMSG = None # useless?
# print('slots:', self.slots, ' times:', self.rounds, ' interval:', self.interval, ' status:', self.status )
def recvRepeatMSG_loop(self):
if self.CMDing or self.conn.switch_loop() :
# print ('LOOPing',self.conn.switchMSG,' self.runing', self.running)
self.MSG(self.conn.switchMSG)
class Arduino_func():
__slots__ = ('leftMin', 'leftMax', 'rightMin','rightMax')
def __init__(self, leftMin, leftMax, rightMin, rightMax ):
self.leftMin = leftMin
self.leftMax = leftMax
self.rightMin = rightMin
self.rightMax = rightMax
def map(self, value, order=1):
if value < self.leftMin or value > self.leftMax:
return None
# Figure out how 'wide' each range is
leftSpan = self.leftMax - self.leftMin
rightSpan = self.rightMax - self.rightMin
# Convert the left range into a 0-1 range (float)
valueScaled = float( value - self.leftMin ) / float( leftSpan )
# Convert the 0-1 range into a value in the right range.
if order == 1:
return int( self.rightMin + (valueScaled * rightSpan) )
elif order == -1:
return int( (self.rightMin + self.rightMax) - (self.rightMin + (valueScaled * rightSpan)) )
'''
#Usage:
arduino_map=Arduino_map(0,180,30,120)
arduino_map(150)
#it will return a number between 30 to 120, which due to the value that scaled ascending order by 0 to 180,
#if "-1" as one of parameters
arduino_map.map(150,-1)
# it will be scaled descending order by 180 to 0
'''
from machine import Pin
from lib.COMMON.timer import Timer
class Force(): # using Timer
def __init__(self, IO, funclist, funcP=None, diffi='normal'):
# from lib.management import MCBOX_IOassign
# self.IO={'MC':Pin( IO.MC_IN, Pin.IN ), 'VIBRATE':Pin( MCBOX_IOassign.VIBRATE, Pin.OUT ), 'MCPOWER':Pin( MCBOX_IOassign.MCPOWER, Pin.OUT )}
self.IO={'MC' : Pin( IO.MC_IN, Pin.IN,Pin.PULL_DOWN )}
# self.IO['MCPOWER'].value( 1 )
# self.IO['VIBRATE'].value( 0 )
self.MCtimes, self.round = 0, 0
self.failedRound = 35
self.funclist=funclist
self.funcP=funcP
self.target_i = len(funclist)
self.RunAfter = Timer.RunAfter
self.gotForce = 0
if diffi == 'easy':
self.MCthreshold = 3
elif diffi == 'normal':
self.MCthreshold = 5
elif diffi == 'hard':
self.MCthreshold = 9
def reset_parameters(self):
self.MCtimes = 0
self.round = 0
print ('a round, reset parameters ')
@property
def forceON(self):
# self.conn.Main()
if self.gotForce==0: # when inputDone=1, force_both stop working, but sword_srv start to work, details to see sword_main.py
if self.RunAfter( 'countMC_1000' ):
# if self.IO['MC'].value() == 1 or 1: # for test
if self.IO['MC'].value(): # run officially
self.MCtimes += 1
self.round += 1
if (self.round >= 10):
if self.MCtimes >= self.MCthreshold:
# self.runTrigger()
if self.MCthreshold == 3: # 3 means easy
# run all ID once
for i in range(len(self.funclist)):
self.funclist[i]( self.funcP )
print ('bodyID and targetID')
elif self.MCthreshold > 3: # greater than 3 means normal or hard
if self.target_i == 1:
print( 'Run Final func' )
self.funclist[0]( self.funcP )
self.target_i = len (self.funclist) # reset target_i
self.gotForce = 1 #when inputDone=1, MC stop working, but sword_srv start to work, details to see sword_main.py
elif self.target_i > 1:
self.funclist[self.target_i-1]( self.funcP )
print('The running func is no.: ', self.target_i )
self.target_i -= 1
else:
print ('fail to activate')
self.reset_parameters()
else:
return 1
# class sort by name
class Sword_AP():
ssid = 'Sword_AP'
passwd = '123456789'
IPprefix = '192.168.4.'
class IO():
MC_IN=22
RELAY_OUT=27
class CMD():
Lhand='1:1-1'
Rhand1='1_2:1-1'
Rhand2='1_2:4-1' # S1,S2, begin from ON at once, next running is opposite of beginning, after interval of round = 4s, start by round-cycle, 1 round in all
target='1:10-1'
class SWORD_Lhand(): # touch electrical light, it was foot before
ssid = Sword_AP.ssid
passwd = Sword_AP.passwd
IPprefix = Sword_AP.IPprefix
ID = 17
S_Num = 1
S1 = 4 #d2, GPIO4
# cmd = '1:1-1' # S1, begin from ON immediately, next running is opposite of beginning, after interval of round= 0.5s,, start by round-cycle, 1 round in all.
class SWORD_Rhand(): # hold the swordWithMC
ssid = Sword_AP.ssid
passwd = Sword_AP.passwd
IPprefix = Sword_AP.IPprefix
ID = 18
S_Num = 2
S1 = 14 #d5, GPIO14
S2 = 12 #d6, GPIO12
# cmd_2 = '1_2:4-1' # S1,S2, begin from ON at once, next running is opposite of beginning, after interval of round = 4s, start by round-cycle, 1 round in all
# cmd_2 = '1:8-5,2:2-9' # S1,S2, begin from ON at once, next running is opposite of beginning, after interval of round = 4s, start by round-cycle, 1 round in all
class SWORD_target():
ssid = Sword_AP.ssid
passwd = Sword_AP.passwd
IPprefix = Sword_AP.IPprefix
ID = 19
S_Num = 1
S1 = 0 # d3, GPIO0
# cmd = '1:10-1' # S1, begin from ON immediately, next running is opposite of beginning, after interval of round= 10s,, start by round-cycle, 1 round in all.
class CAR():
ssid = 'CAR_AP'
passwd = '123456789'
IPprefix = '192.168.4.'
ID = 13
D_sensor = False
class Data_AP():
ssid = 'Data_AP'
passwd = '123456789'
IPprefix = '192.168.4.'
class highSpeed():
ssid = 'Data_AP'
passwd = '123456789'
ID = 2
IPprefix = '192.168.4.'
class branch1():
ID = 3
class lowSpeed():
ssid = 'Data_AP'
passwd = '123456789'
ID = 4
IPprefix = '192.168.4.'
class branch1():
ID = 5
class branch2():
ID = 6
class powerStrip_AP(): # CAN'T BE HAVE BRANCH
ssid = 'powerStrip_AP'
passwd = '123456789'
IPprefix = '192.168.4.'
class CMD():
No1 = '2:3-1'
No2 = '2:3-1'
class powerStrip_1():
ssid = powerStrip_AP.ssid
passwd = powerStrip_AP.passwd
IPprefix = powerStrip_AP.IPprefix
ID = 8
S_Num = 6
S1 = 0 # GPIO 0
S2 = 2
S3 = 4
S4 = 5
S5 = 12
S6 = 13
class powerStrip_2():
ssid = powerStrip_AP.ssid
passwd = powerStrip_AP.passwd
IPprefix = powerStrip_AP.IPprefix
ID = 9
S_Num = 6
S1 = 0 # GPIO 0
S2 = 2
S3 = 4
S4 = 5
S5 = 12
S6 = 13
class MCBOX_IOassign(): # not using yet
MC_IN=13
VIBRATE=4
MCPOWER=5
'''
This is microMG, MG = message, go!
'''
import socket
import network
from machine import reset as machinereset
from lib.COMMON.timer import Timer
sta = network.WLAN( network.STA_IF )
sta.active( 1 )
class microMSG_client():
# __slots__ = ['ID', 'switchlen', 'shakeData_port', 'H_port', 'regularData_port', 'H_failCount', 'LData_len','HdataLen_str','Hcheck','shakeData_len','switchOffCount','isHighSpeed','switchConn','']
def __init__(self, apparatus,
h_CH='', Hdatalen=10, Ldatalen=10, Ltime=1000, isSendRepeat=1, switchMode=0): #h_CH=1...5
# program start
self.ID = 'ID' + '%04d' % apparatus.ID
if Hdatalen == 'car':
Hdatalen = 7
if switchMode==1:
from machine import Pin
import time
self.ran_switch = time.ticks_us() % 1000
self.slotToGPIO={}
for i in range( 1, apparatus.S_Num + 1 ):
self.slotToGPIO.update( {i: Pin( apparatus.__dict__.get( 'S' + str( i ) ), Pin.OUT )} )
self.switchlen = 31
self.shakeData_port = 700
self.H_port = [800, 801, 802, 803, 804]
# self.competition_port = 900
self.regularData_port = 1000
self.H_failCount = 0
self.LData_len = Ldatalen
self.HdataLen_str = '%04d' % Hdatalen
self.Hcheck = ''.join( ['*' for i in range( Hdatalen )] )
self.shakeData_len = 6
self.switchOffCount = 0
self.isHighSpeed = 1
# self.tempcount = 1
self.switchConn = None
import time
self.ran_CL = (time.ticks_us() % 100) * 2
# del (time)
self.CLtime_ms = Ltime if Ltime >= 1000 else 1000
self.isHalfduplex_recv = 1
self.Latest_Hdata=None
# self.switchData,self.switchStatus= '','ON'
# Timer.init()
self.RunAfter = Timer.RunAfter
self.isSendRepeat=isSendRepeat
# connect to WIFI , then socket
if sta.isconnected() == 0:
sta.ifconfig( (apparatus.IPprefix + str( apparatus.ID ), '255.255.255.0', apparatus.IPprefix + '1', apparatus.IPprefix + '1') )
self.connectwifi( sta, apparatus.ssid, apparatus.passwd )
else:
print( 'WIFI has already CONNECTED' )
# CREATE SHAKEDATA
if h_CH != '':
self.highSpeed_port = self.H_port[h_CH]
self.shakeData = '%04d' % apparatus.ID + 'H' + str( h_CH )
elif switchMode == 1:
print( 'switch_cli MODE' )
globals()['i' + '%04d' % apparatus.ID + 'runtimes'] = 0
self.shakeData = '%04d' % apparatus.ID + 'S_'
self.isSwitch = 1
else:
self.shakeData = '%04d' % apparatus.ID + 'L_'
# IF HIGH SPEED
if h_CH != '':
# SEND SHAKE DATA
self.sendUDP( port=self.shakeData_port, data=self.shakeData )
print( 'sent self.shakeData : ' + self.shakeData )
# SEND THE LENGTH OF HIGHSPEED, WAIT UNTIL RECEIVE STR: 'OK'
conn, recvData = self.verifyTCP( self.HdataLen_str, 8000 )
if recvData == 'OK':
print( 'trying connect to high speed port: ' + str( self.highSpeed_port ) )
self.H_clientSocket = self.try_connConnnect( int( self.highSpeed_port ) )
if self.H_clientSocket != 1:
self.isHighSpeed = 1
self.H_clientSocket.settimeout( 0.1 )
else:
print( '\n\nH_clientSocket is 1\n\n' )
conn.close()
else:
print( ' can not receive the response from high speed port 8000 ' )
Timer.delayMS(5000)
def connectwifi(self, station, ssid, password):
counter = 0
station.disconnect()
while station.isconnected() == 0:
Timer.delayMS(1000)
print(' is trying to connect WIFI')
station.connect(ssid, password)
Timer.delayMS(5000)
counter += 1
if counter >= 8 or station.isconnected() == 0:
print('can not find the AP , go to sleep now!')
def sendUDP(self, port, data):
count = 0
while 1:
try:
conn = socket.socket()
conn.connect( ('192.168.4.1', port) )
self._send_data( conn, data )
conn.close()
break
except:
count += 1
Timer.delayMS(800)
print( 'shake data can not send.' )
if count > 4:
# uos.dupterm(uart, 1) # for moment when test
machinereset() # for moment when officially start to use
def DoneSwitchconn(self):
try:
self.switchConn.close()
self.switchConn=None
print ('switchConn close succeed')
except:
print ('switchConn close failed')
pass
self.switchOffCount=0
self.isHalfduplex_recv=1
def verifyTCP(self, data, port, mode=''): # halfduplex means: send data in this round, receive data in next round
if self.isHalfduplex_recv == 1: # "isHalfduplex_recv" only for SWITCH MODE
conn = self.try_connConnnect( int( port ) )
if conn != 1:
self._send_data( conn, data )
else:
print ('get conn of verifyTCP failed')
self.DoneSwitchconn()
return 1, 1
else:
conn = self.switchConn
if conn != 1:
if mode == '':
conn.settimeout( 5 )
elif mode == 'halfduplex' and self.isHalfduplex_recv == 1:
conn.settimeout( 0 )
self.isHalfduplex_recv = not self.isHalfduplex_recv
return conn, 'responding'
elif mode == 'halfduplex':
self.isHalfduplex_recv = not self.isHalfduplex_recv
recv = self._recv_data( conn, self.shakeData_len + self.switchlen ) # receive: NO, OFF, OK, CHECK,
return conn, recv
def try_connConnnect(self, port):
count = 0
while count < 5:
try:
conn1 = socket.socket()
conn1.connect( ('192.168.4.1', port) )
return conn1
except:
count += 1
Timer.delayMS(750)
print( 'can not connect to server port: ' + str( port ) )
return 1
def _recv_data(self, conn, length):
try:
recv = conn.recv( length ).decode()
if recv == '':# for test, able to be deleted.
print( 'receive empty, try again' ) # for test, able to be deleted.
except:
recv = '_'
print ('NO data can be received, so it will be _')
return recv
def _send_data(self, conn, str):
conn.send( str.encode() )
def switch_loop(self):
recvData=''
if self.isSwitch == 1:
if self.RunAfter( self.ID + '_' + str( 1000 + self.ran_switch ) ):
while recvData=='':
if self.RunAfter('tryRecv_150'):
if self.switchOffCount > 7: # if can't recv data over 7times, send 'check' to server to prove not offline.
self.switchConn, recvData = self.verifyTCP( self.shakeData + 'check', self.regularData_port,
mode='halfduplex' )
else:
self.switchConn, recvData = self.verifyTCP( self.shakeData ,
self.regularData_port, mode='halfduplex' )
print( 'Non-Responsed-Times is : ' + str( self.switchOffCount ) )
if recvData not in ('responding', '_', 1):
self.switchConn.close()
rawdata = recvData[self.shakeData_len:] # rawdata in switch_cli is 0 or 1
print ('\nGET SWITCH DATA: ', rawdata,'\n')
unique_ID = recvData[:4]
if unique_ID == self.ID[2:]:
self.switchOffCount = 0
if rawdata == 'check':
self.switchOffCount = 0
elif ':' in rawdata:
self.switchMSG = rawdata
return 1 # funclib needs this
else: # when data you receive isn't : ON, OFF , check
b = rawdata.split( ',' )
for i in range( len( b ) ):
self.slotToGPIO[int( b[i].split( '-' )[0] )].value( int( b[i].split( '-' )[1] ) )
else:
self.switchOffCount += 1
if self.switchOffCount > 24:
print ('recvData: ',recvData)
print( ' set switch_cli be offline' )
def _send_L(self, data):
if self.RunAfter( self.ID + '_' + str( self.CLtime_ms + self.ran_CL ) ):
self.sendUDP( data=self.shakeData + data, port=self.regularData_port )
print ('send L')
def _send_H(self, data):
if self.isHighSpeed and self.RunAfter( self.ID + '_50' ):
try:
if self.isSendRepeat or data != self.Latest_Hdata:
self.Latest_Hdata = data
self.H_clientSocket.send( self.Latest_Hdata.encode() )
self.H_failCount = 0
elif self.RunAfter( self.ID + 'HC_14000' ):
self.H_clientSocket.send( self.Hcheck )
except:
Timer.delayMS(700)
self.H_failCount += 1
print( ' Highspeed data can not send.' )
if self.H_failCount > 8:
self.isHighSpeed=1
print ('isHighSpeed 1')
return 1 # for moment when test
# machine.reset() # for moment when officially start to use
def Main(self):
self.switch_loop()
'''
def competition_client(self, checkin):
self.client_socket.settimeout( -1 )
data = self._recv_data( self.client_socket, self.online_len )
while data == 'B':
# not yet, count down, and make a beep for 5s
print( 'warning, MC check start after 5s' )
Timer.delayMS(5000)
while 1:
Timer.delayMS( 1000 ) # save energy than run_after
if checkin:
self._send_data( self.client_socket, '1' )
self.client_socket.settimeout( 0.2 )
if self._recv_data( self.client_socket, self.online_len ) == 'W':
print( 'winner' )
self.client_socket.settimeout( 0 )
data = '_' # jump out the whole loop.
else:
print( 'you out, waiting for next chance' )
self.client_socket.settimeout( -1 )
if self._recv_data( self.client_socket, self.online_len ) == 'B':
break
'''
import socket
import time
import network
from lib.COMMON.timer import Timer
class microMSG_server():
def __init__(self, WIFI, test=0):
ap = network.WLAN(network.AP_IF) # create access-point interface
ap.active(1) # activate the interface
ap.config(essid=WIFI.ssid, password=WIFI.passwd,authmode=4,channel=9) # set the ESSID of the access point
time.sleep_ms(3000)
self.printTest=test # only for test, control by main.py
self.shake_port = 700
# self.competition_port = 900
self.regularData_port = 1000
# self.competition_users_num = 0
self.shake_socket = socket.socket() # get instance
self.shake_socket.bind(('0.0.0.0', self.shake_port)) # bind host address and port together
self.shake_socket.listen(5)
self.shake_socket.settimeout(0)
self.N_socket = socket.socket() # get instance
self.N_socket.bind(('0.0.0.0', self.regularData_port)) # bind host address and port together
self.N_socket.listen(5)
self.N_socket.settimeout(0)
self.data_len = 10
self.Hdata_str = None
self.Ldata_str = None
self.shakeData_len = 6
# self.compRecv_len = 1
self.recv_List = []
self.isHighSpeed = [0, 0, 0, 0, 0]
self.H_conn = ['H_conn0', 'H_conn1', 'H_conn2', 'H_conn3', 'H_conn4']
self.H_checkOnline = {0: 0, 1: 0, 2: 0, 3: 0, 4: 0}
self.H_socket = {'H_socket0': 800, 'H_socket1': 801, 'H_socket2': 802, 'H_socket3': 803, 'H_socket4': 804}
self.isGet_H = 0
self.shakeToLData_dict = {} # i don't remember, but it may be only for competition.
self.switch_dict = {}
self.isSendSwitch = {}
# Timer.init() # run in boot.py
self.RunAfter = Timer.RunAfter
def updateSwitch(self, ID, cmd):
self.switch_dict['%04d' % ID] = [cmd, True]
print ('%04d' % ID,'update switchDict: ',self.switch_dict['%04d' % ID])
# print ('update switchDict: ',self.switch_dict)
def try_connAccept(self, conn1):
try:
conn, addr = conn1.accept()
return conn
except:
pass
return 0
def wait_newconn(self):
if self.RunAfter('newConn_1100'):
try:
# print ('check newConn in every 1.1s')
conn1, addr = self.shake_socket.accept() # just use addr here, addr is ip, it's same like online_socket.
conn1.settimeout(
2.8) # timeout for 3s to receive the shakeData, do not use -1, incase signal just block for awhile
# print ('checked newConn ')
try:
shakeData = self._recv_data(conn1, self.shakeData_len)
except:
print( 'no shakeData, it may a switch_cli' )
pass
conn1.settimeout(0)
conn1.close()
print( 'GOT SHAKE_DATA: ' + shakeData )
if shakeData[0] != '_':
# Add into list
self.shakeToLData_dict[shakeData] = ''
# USAGE: constantly use client, it needs to start after the previous one has been settled down.
if shakeData[-2:-1] == 'H':
i = int(shakeData[-1:]) # i is the number of channel
# print( 'HighSpeed' )
# STEP ONE, create socket in port( 8000) to receive length of H_data
socketLength = socket.socket()
# print( 111111111 )
socketLength.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
socketLength.bind(('0.0.0.0', 8000))
# print( 222222222 )
socketLength.listen(5)
socketLength.settimeout(10)
connLength = self.try_connAccept(socketLength)
if connLength != 0:
connLength.settimeout(5) # keep 5s, very important before create high speed connection.
recv = self._recv_data(connLength, 4)
# print( 3333333333 )
print( 'recv from connLength: ' + recv )
# send length back
if recv != '_':
self.Hdata_len = int(recv)
# self.Hcheck=['*' for i in range( self.Hdata_len )] # no need.
print( 'receive length: ' + recv + ' NOW send OK back' )
self._send_data(connLength, 'OK')
print( 'CLOSE connLength AND socketLength ' )
else:
print( 'can not recv length H_ch' )
pass
else:
print( 'NO request port 8000, done.' )
pass
# STEP TWO, create socket in port(800...804) to receive H_data as such
if connLength != 0:
H_socket = list(sorted(self.H_socket.keys()))[i] # get data from position 0
globals()[H_socket] = socket.socket() # this line must after try.
globals()[H_socket].setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
globals()[H_socket].bind(
('0.0.0.0', self.H_socket[H_socket])) # bind host address and port together
globals()[H_socket].listen(5)
globals()[H_socket].settimeout(5)
print( 'bound port: ' + str( self.H_socket[H_socket] ) )
globals()[self.H_conn[i]] = self.try_connAccept(globals()[H_socket])
print (globals()[self.H_conn[i]])
if globals()[self.H_conn[i]] != 0:
globals()[self.H_conn[i]].settimeout(0.05)
self.isGet_H = 1
self.isHighSpeed[i] = 1
print( 'start to receive high speed data ' )
connLength.close()
socketLength.close()
else:
print( 'NO connections-request on port ' + str( i ) )
pass
# self.competition_users_num += 1
except:
pass
'''
def competition_server(self, Setnum_Users):
# competition_conn = List_NormalCHANNEL_conn
competition_conn = list(self.shakeToLData_dict.keys())
def sendTOALLcompetition(str):
for i in competition_conn:
self._send_data(globals()[competition_conn[i]], str)
def recvTOALLcompetition():
for i in competition_conn:
data = self._recv_data(globals()[competition_conn[i]], self.compRecv_len)
if data[0] == '_':
self.competition_users_num -= 1
competition_conn.pop(i)
def begin_competition():
if self.RunAfter('competition_1000', 4):
sendTOALLcompetition('B')
if self.competition_users_num >= Setnum_Users:
begin_competition()
# confrontation loop start from here
while 1:
# begin at first time
if self.competition_users_num > 1:
if self.RunAfter('competition_1000'):
recvTOALLcompetition()
# Winner is born
elif self.competition_users_num == 1:
globals()[competition_conn[0]].send(b"W")
self._recv_data(globals()[competition_conn[Setnum_Users]],
self.compRecv_len) # important clear buffer
return globals()[competition_conn[0]]
# reset parameters for startover
elif self.competition_users_num == 0:
begin_competition()
'''
def _send_data(self, conn, str):
conn.send( str.encode() )
def _recv_data(self, conn, length):
self.connAccept = conn
try:
if length == self.shakeData_len or length == 4:
recv = self.connAccept.recv(length).decode()
else:
self.connAccept = conn.accept()[0]
recv = '_'
while 1:
try:
recv = self.connAccept.recv(length).decode()
time.sleep(0.01)
# self.recv_List.append(recv)
shakeData = recv[:self.shakeData_len]
self.shakeToLData_dict[shakeData] = recv[self.shakeData_len:]
# globals()['online' + shakeData] = 1
break
except:
pass
except:
# List_CheckOnlineCHANNEL_conn.append( Var_Name_str )
recv = '_'
return recv
def _get_L(self):
recv = self._recv_data(self.N_socket, self.shakeData_len + self.data_len) # recv has included accept()
if recv != '_':
shakedata = recv[:self.shakeData_len]
uniqueid = shakedata[:4]
rawdata = recv[self.shakeData_len:]
if shakedata[-2:] == 'S_': # 'receive a swicth request'
print( 'receive a switch_cli request, id is: ', uniqueid )
# problem:
# 1. if isSendSwitch=Bool, it turns False since updateed, but other switch may need to update
# 2. if isSendSwitch is num, it will add up when
# if self.isSendSwitch > 0 :
try:
a=self.switch_dict[uniqueid][1]
except:
print('No this ID ' + uniqueid + ' in switch_dict')
a=0
if a :
self._send_data(self.connAccept, shakedata + self.switch_dict[uniqueid][0])
self.switch_dict[uniqueid][1] = False
print('\nsending updated data to client for switch_cli:',shakedata + self.switch_dict[uniqueid][0],'\n\n')
elif rawdata == 'check':
self._send_data(self.connAccept, shakedata + 'check')
print( 'check client for switch_cli' )
# else: # this is for test
# print( 'no need to response for switch_cli request! ' )
# pass
self.connAccept.close()
elif shakedata[-2:] == 'L_':
self.Ldata_str = recv[self.shakeData_len:]
if self.printTest == 1:
print( 'Ldata_str: ' + recv )
self.connAccept.close()
def _get_H(self, i, conn):
try:
recv = conn.recv(self.Hdata_len).decode() # don't use _recv_data()
except:
recv = '_'
if recv == '_':
self.Hdata_str = None
self.H_checkOnline[i] += 1
# accumulation greater than 200
if self.H_checkOnline[i] > 100: # 100 = about 10s
self.isHighSpeed[i] = 0
self.H_checkOnline[i] = 0
conn.close() # close accept
globals()[list(sorted(self.H_socket.keys()))[i]].close() # close socket
print( 'high speed CHANNEL-' + str( i ) + ' has got lost: ' )
print( list( sorted( self.H_socket.keys() ) )[i] + ' closed' )
del (globals()[self.H_conn[i]])
print( self.H_conn[i] + ' closed' )
del (globals()[list(sorted(self.H_socket.keys()))[i]])
self.Hdata_str = 'offline'
else:
self.H_checkOnline[i] = 0
self.Hdata_str = recv if '*' not in recv else None
if self.printTest==1:
print( 'Hdata_str: ' + self.Hdata_str )
def start_server(self):
if self.isGet_H and self.RunAfter('HS_50'):
for i in range(5):
if self.isHighSpeed[i]:
self._get_H(i, globals()[self.H_conn[i]])
if self.RunAfter('lowSpeed_350'):
self._get_L()
def Main(self): # put it into "while" statement
self.wait_newconn()
self.start_server()
import time
class Timer():
__slots__ = ['RunAfter', 'ticksAdd', 'ticks_ms_3', 'delayMS', 'ticks_ms', 'ticks_add', 'delay_ms']
def delayMS(t):
time.sleep( (t / 1000) )
def ticks_ms_3():
return int( time.time() * 1000 )
def ticksAdd(a, b):
return a + b
def init():
# global ticks_ms, ticks_add, delay_ms
print ('\nInit timer')
import sys
if 'esp' in sys.platform:
print( '\nESP SYSTEM\n' )
Timer.ticks_ms = time.ticks_ms
# Timer.ticks_add = time.ticks_add
Timer.ticks_add = Timer.ticksAdd
Timer.delay_ms = time.sleep_ms
else:
print( 'NOT ESP SYSTEM' )
Timer.ticks_ms = Timer.ticks_ms_3
Timer.ticks_add = Timer.ticksAdd
Timer.delay_ms = Timer.delayMS
del sys
def RunAfter(str1):
if str1 not in globals().keys():
globals()[str1] = Timer.ticks_add( Timer.ticks_ms(), int( str1.split( '_' )[1] ) )
elif globals()[str1] - Timer.ticks_ms() < 0:
del (globals()[str1])
return True
else:
return False
from machine import UART
from lib.COMMON.Arduino import Arduino_func
from lib.COMMON.timer import Timer
# import uos, time
class MPU6050_JY61P():
def __init__(self, type=('angle')):
'''
EPS32 MODULE
uart=UART( 1, baudrate=115200, bits=8, parity=None, stop=1, tx=12, rx=14, rts=-1, cts=-1, txbuf=256, rxbuf=256,
timeout=5000, timeout_char=2 )
self.uart.init( 115200, bits=8, parity=None, stop=1, pins=(17, 16) )
# but the 1 may be used as pin9 and pin10, try uart2 first
uart = UART( 2, baudrate=115200) # for esp32, it is said that uart2 default tx=17,rx=16
self.uart.init( 115200, bits=8, parity=None, stop=1 )
'''
import sys
s=sys.platform
if s=='esp8266':
p=0
elif s=='esp32': #rx16,tx17
p=2
self.getAngle,self.getPalstance,self.getAccel,=0,0,0
if 'angle' in type:
self.getAngle=1
if 'palstance' in type:
self.getPalstance=1
if 'accel' in type:
self.getAccel=1
self.uart = UART( p, baudrate=115200 )
self.uart.init( 115200, bits=8, parity=None, stop=1 )
self.Re_buf = []
self.angle, self.accel, self.palstance= [], [], []
self.sum_rx_bytes, self.rx_bytes = b'', b''
#########################
self.startLroll = 0
self.endLroll = 60
self.startRroll = 359
self.endRroll = 300
self.startFrotate = 359
self.endFrotate = 300
self.startBrotate = 0
self.endBrotate = 60
self.startSpin=0
self.endSpin=359
self.RunAfter = Timer.RunAfter
self.Lrange = range( self.startLroll, self.endLroll + 1 )
self.Rrange = range( self.endRroll, self.startRroll + 1 )
self.edge_Lrange = range( self.endLroll, 180 + 1 )
self.Frange = range( self.endFrotate, self.startFrotate + 1 )
self.Brange = range( self.startBrotate, self.endBrotate + 1 )
self.edge_Brange = range( self.endBrotate, 180 + 1 )
self.Srange= range(self.startSpin,self.endSpin+1)
def calcGRYO(self, var, basenum):
for i in range(2, 7, 2):
var.append(int((int(self.Re_buf[basenum + 1 + i] << 8 | self.Re_buf[basenum+i])) / 32768.0 * 180))
def current_axis(self):
while (b'US' not in self.rx_bytes): #UQ=51, US=53
# if self.RunAfter('g_3000'):
if self.uart.any():
self.rx_bytes = self.uart.read()
self.sum_rx_bytes = self.sum_rx_bytes + self.rx_bytes
# print (11111)
# else:
# print ('can not get b"UT", so break while ')
# break
if (len(self.sum_rx_bytes))==33:
# print (22222)
self.Re_buf, self.angle, self.palstance, self.accel= [], [], [], []
for i in range( 0, 33 ):
one_byte = self.sum_rx_bytes[i:i + 1]
one_int = int.from_bytes( one_byte, "big" )
self.Re_buf.append( one_int )
if self.getAccel and self.Re_buf[0] == 0x55 and self.Re_buf[1] == 0x51:
self.calcGRYO(self.accel, 0)
if self.getPalstance and self.Re_buf[11] == 0x55 and self.Re_buf[12] == 0x52:
self.calcGRYO( self.palstance, 11 )
if self .getAngle and self.Re_buf[22] == 0x55 and self.Re_buf[23] == 0x53:
self.calcGRYO( self.angle, 22 )
self.sum_rx_bytes, self.rx_bytes = b'', b''
return {'accel':self.accel, 'palstance':self.palstance, 'angle': self.angle}
# function
def A2(self, angle, range1=0, range2=0, edge_range1=0, startMap1=0, endMap1=0, startMap2=0, endMap2=0, name=0, Maxlevel=10): #angle2direction
if angle in range1:
arduino_map = Arduino_func(startMap1, endMap1, 0, Maxlevel)
return name[0] + '%02d' % arduino_map.map( angle )
elif angle in range2:
arduino_map = Arduino_func(endMap2, startMap2, 0, Maxlevel)
return name[1] + '%02d' % arduino_map.map( angle, -1 )
elif angle in edge_range1:
return name[0] + '10'
elif angle < endMap2:
return name[1] + '10'
def A2D(self, angle, Maxlevel=10): #angle2direction
if angle in self.Lrange:
arduino_map = Arduino_func( self.startLroll, self.endLroll, 0, Maxlevel )
return 'L' + '%02d' % arduino_map.map( angle )
elif angle in self.Rrange:
arduino_map = Arduino_func( self.endRroll, self.startRroll, 0, Maxlevel )
return 'R' + '%02d' % arduino_map.map( angle, -1 )
elif angle in self.edge_Lrange:
return 'L10'
elif angle < self.endRroll:
return 'R10'
def A2M(self, angle, Maxlevel=10): #angle2move
if angle in self.Brange:
arduino_map = Arduino_func( self.startBrotate, self.endBrotate, 0, Maxlevel )
return 'B' + '%02d' % arduino_map.map( angle )
elif angle in self.Frange:
arduino_map = Arduino_func( self.endFrotate, self.startFrotate, 0, Maxlevel )
return 'F' + '%02d' % arduino_map.map( angle, -1 )
elif angle in self.edge_Brange:
return 'B10'
elif angle < self.endFrotate:
return 'F10'
def angleSpin(self,angle, Maxlevel=20):
if angle in self.Srange:
arduino_map = Arduino_func( self.startSpin, self.endSpin, 0, Maxlevel )
return 'S' + '%02d' % arduino_map.map( angle )
elif angle in self.endSpin:
return 'S20'
@property
def Str_CarDATA(self):
current_ANGLE = self.current_axis()['angle']
str1=self.A2(current_ANGLE[0], self.Lrange, self.Rrange, self.edge_Lrange, self.startLroll, self.endLroll, self.startRroll, self.endRroll, 'LR')
str2=self.A2(current_ANGLE[1], self.Brange, self.Frange, self.edge_Brange, self.startBrotate, self.endBrotate, self.startFrotate, self.endFrotate, 'BF')
final_str1 = str1 + '-' + str2
return final_str1
@property
def Str_AngleDATA(self):
# print( self.current_axis )
a=self.current_axis()['angle']
# print (a)
if len(a)==3:
current_ANGLE = a
# str1 = self.A2D( current_ANGLE[0] )
# str2 = self.A2M( current_ANGLE[1] )
str1 = self.A2( current_ANGLE[0], self.Lrange, self.Rrange, self.edge_Lrange, self.startLroll,
self.endLroll, self.startRroll, self.endRroll, 'LR' )
str2 = self.A2( current_ANGLE[1], self.Brange, self.Frange, self.edge_Brange, self.startBrotate,
self.endBrotate, self.startFrotate, self.endFrotate, 'BF' )
str3 = self.A2( current_ANGLE[2], self.Srange, edge_range1=self.endSpin, startMap1=self.startSpin,
endMap1=self.endSpin, name='S', Maxlevel=20 )
# str3 = self.angleSpin( current_ANGLE[2] )
final_str1 = str1 + '-' + str2 + '-' + str3
return final_str1
# def Decode_CarDATA(self, bytes):
# return bytes.decode( "uft-8" ).split( '-' ) # format is: ['Lx/Rx','Fx/Bx']
'''
USAGE:
# HOWTO get the three angles from MPU6050
from GYRO import MPU6050_JY61P
mpu6050=MPU6050_JY61P()
print (mpu6050.current_angle)
>>>[num1,num2,num3]
# HOWTO switch_cli angle to direction or move, format of level is L/Rx, eg.L4 or R6
mpu6050.angle2direction(mpu6050.current_angle[0])
mpu6050.angle2move(mpu6050.current_angle[1])
#ADVANCED USAGE:
#HOW TO use in car
from GYRO.MPU6050_JY61P import Decode_CarDATA
'''
from machine import time_pulse_us,Pin
import time
class HCSR04:
# echo_timeout_us is based in chip range limit (400cm)
def __init__(self, direction,trigger_gpio=15, echo_gpio=14, echo_timeout_us=500 * 2 * 30 ): #D8=IO15,D5=IO14
self.echo_timeout_us = echo_timeout_us
# Init trigger pin (out)
self.trigger = Pin( trigger_gpio, mode=Pin.OUT, pull=None )
self.trigger.value(0)
self.direction=direction
# Init echo pin (in)
self.echo = Pin( echo_gpio, mode=Pin.IN, pull=None )
self.sensor=1
self._sensorRead=True
self.A, self.last_D = 3, 0
self.T = 250
self.moveOn=...
def _send_pulse_and_wait(self):
self.trigger.value(0) # Stabilize the sensor
time.sleep_us(5)
self.trigger.value(1)
# Send a 10us pulse.
time.sleep_us(10)
self.trigger.value(0)
try:
pulse_time = time_pulse_us(self.echo, 1, self.echo_timeout_us)
return pulse_time
except OSError as ex:
if ex.args[0] == 110: # 110 = ETIMEDOUT
raise OSError('Out of range')
raise ex
@property
def vector(self):
if self._sensorRead:
pulse_time = self._send_pulse_and_wait()
cms = (pulse_time / 2) / 29.1
return self.direction +'-'+ (int(cms))
def distance_cm(self): # only for test
pulse_time = self._send_pulse_and_wait()
cms = (pulse_time / 2) / 29.1
return cms
def outputP(self, last, current, v):
if last < current:
return 30, 70, 100
else:
return 30 + v ** (self.A / 5), 70 + v ** (self.A / 5), 100 + v ** (self.A / 5)
def outputTime(current, p, NowMove, SetMove): # moveOn and stopcar and driver, need to be parameter
if current < p[0]:
return 100,True
elif current > p[0]:
return 250,False
elif current > p[1]:
return 350,False
elif current > p[2]:
return 1000,False
def outputV(current, last, time):
return abs( current - last ) / time * 1000
def watchCar(self, stopcar, moveOn, driver, M):
# program start
if self._sensorRead == True:
direction = self.vector.split( '-' )[0]
current_D = ( self.vector.split( '-' )[1] )
if current_D < 120:
try: # For the moment, start the car, distance <120, but no D, then occurs error.
self.T,STOP = self.outputTime( current=current_D,
p=self.outputP( self.last_D, current_D,
v=self.outputV( current_D, self.last_D,
self.T ) ) )
if STOP:
stopcar( direction[0] )
if moveOn and M[0] == direction[0]:
driver.move( 'F00' )
self.moveOn = False
except:
print( 'try current_D < 120 got problem' )
pass
else:
self.T = 1000
self.T = 100 if self.T == None else self.T
sensitivity = 'ms_' + str( self.T )
# figure out the distance
self.last_D = current_D
self._sensorRead = not self._sensorRead
return self.moveOn, sensitivity
from machine import PWM,Pin
from lib.COMMON.Arduino import Arduino_func
# IO5= SERVO PWM
# IO12,13 = MOTER DRVIER, when you have four wheels drive, IO14, IO15 occupied by default
# IO4 = MOTER DRVIER PWM (6612)
# IO14, IO15(trig) =HCSR04
class servo_MG996R():
# d1-gpio5 for servo, d1-gpio5 as PWM
__slots__ = 'IO', 'Level_min', 'Level_max','Straight','Left_max','Right_max'
def __init__(self, IO=5, Level_min=0, Level_max=10, Straight=70, Left_max=50, Right_max=105):
self.servo = PWM( Pin( IO ), freq=50 )
self.servo.duty( Straight )
# time.sleep( 1 ) # you may not need it
self.Min_Turn_Level = Level_min
self.Max_Turn_Level = Level_max
self.Go_Straight = Straight
self.Max_Turn_Left = Left_max
self.Max_Turn_Right = Right_max
def turn(self, value):
duty_value = list( value )
duty_value.pop( 0 )
duty_value = int( ''.join( duty_value ) )
if value[0] == 'L':
arduino_map = Arduino_func( self.Min_Turn_Level, self.Max_Turn_Level, self.Max_Turn_Left,
self.Go_Straight )
duty_value = arduino_map.map( duty_value, -1 )
elif value[0] == 'R':
arduino_map = Arduino_func( self.Min_Turn_Level, self.Max_Turn_Level, self.Go_Straight,
self.Max_Turn_Right )
duty_value = arduino_map.map( duty_value )
self.servo.duty( duty_value )
class MD_L298N():
# d6-gpio12, d7-gpio13 for motorA,
# d5-gpio14, d8-gpio15 for motorB.
def __init__(self, IO1=12, IO2=13, IO3=14, IO4=15, type='Single', Min_Level=0, Max_Level=10, Min_duty=0,
Max_duty=1023):
self.DOUBLEMOTOR = False
self.DOUBLEMOTOR_DIRECTIONAL = False
self.servo_IN1 = PWM( Pin( IO1 ), freq=50 )
self.servo_IN2 = PWM( Pin( IO2 ), freq=50 )
self.Min_Move_Level = Min_Level
self.Max_Move_Level = Max_Level
self.Min_DUTY = Min_duty
self.Max_DUTY = Max_duty
if 'Double' in type:
if type == 'Double_directional':
self.DOUBLEMOTOR = True
else:
self.DOUBLEMOTOR_DIRECTIONAL = True
self.servo_IN3 = PWM( Pin( IO3 ), freq=50 )
self.servo_IN4 = PWM( Pin( IO4 ), freq=50 )
# self.motor = self.servo.duty( 500 )
def forward(self, value):
self.servo_IN1.duty( value )
self.servo_IN2.duty( 0 )
if self.DOUBLEMOTOR:
self.servo_IN3.duty( value )
self.servo_IN4.duty( 0 )
def backward(self, value):
self.servo_IN1.duty( 0 )
self.servo_IN2.duty( value )
if self.DOUBLEMOTOR:
self.servo_IN3.duty( 0 )
self.servo_IN4.duty( value )
# OUT1,OUT2 to left motor, OUT3,OUT4 to right motor
def turnleft(self, value):
if self.DOUBLEMOTOR_DIRECTIONAL:
self.servo_IN1.duty( 0 )
self.servo_IN2.duty( value )
self.servo_IN3.duty( value )
self.servo_IN4.duty( 0 )
def turnright(self, value):
if self.DOUBLEMOTOR_DIRECTIONAL:
self.servo_IN1.duty( value )
self.servo_IN2.duty( 0 )
self.servo_IN3.duty( 0 )
self.servo_IN4.duty( value )
def move(self, value):
move_value = list( value )
move_value.pop( 0 )
move_value = int( ''.join( move_value ) )
if value[0] == 'F':
arduino_map = Arduino_func( self.Min_Move_Level, self.Max_Move_Level, self.Min_DUTY, self.Max_DUTY )
move_value = arduino_map.map( move_value )
self.forward( move_value )
elif value[0] == 'B':
arduino_map = Arduino_func( self.Min_Move_Level, self.Max_Move_Level, self.Min_DUTY, self.Max_DUTY )
move_value = arduino_map.map( move_value )
self.backward( move_value )
# print('move_value' + str(move_value))
class MD_TB6612FNG_DRV8833():
# d6-gpio12, d7-gpio13 for motorA, d2-gpio4 as PWMA
# d5-gpio14, d8-gpio15 for motorB. NO SUFFICIENT GPIO, use PWMA.
def __init__(self, in1=12, in2=13, in3=14, in4=15, pwm=4, type='Single', Min_Level=0, Max_Level=10, Min_duty=0,
Max_duty=1023):
self.DOUBLEMOTOR = False
self.DOUBLEMOTOR_DIRECTIONAL = False
self.PWMA = PWM( Pin( pwm ), freq=50 )
self.MOTORA = {'out1': Pin( in1, Pin.OUT ), 'out2': Pin( in2, Pin.OUT )}
self.Min_Move_Level = Min_Level
self.Max_Move_Level = Max_Level
self.Min_DUTY = Min_duty
self.Max_DUTY = Max_duty
if 'Double' in type:
if type == 'Double_directional':
self.DOUBLEMOTOR = True
else:
self.DOUBLEMOTOR_DIRECTIONAL = True
self.MOTORB = {'out1': Pin( in3, Pin.OUT ), 'out2': Pin( in4, Pin.OUT )}
# self.motor = self.servo.duty( 500 )
def motorSpin(self, motor,num):
if num == 1:
motor['out1'].value( 1 )
motor['out2'].value( 0 )
elif num == 0:
motor['out1'].value( 0 )
motor['out2'].value( 1 )
def forward(self, value):
self.PWMA.duty( value )
self.motorSpin( self.MOTORA,1 )
if self.DOUBLEMOTOR:
self.motorSpin(self.MOTORB, 1 )
def backward(self, value):
self.PWMA.duty( value )
self.motorSpin(self.MOTORA, 0 )
if self.DOUBLEMOTOR:
self.motorSpin(self.MOTORB, 0 )
# ONLY FOR DOUBLE MOTORS, motorA = left side wheels, motorB = right side wheels
def motor_turnleft(self, value):
self.PWMA.duty( value )
if self.DOUBLEMOTOR_DIRECTIONAL:
self.motorSpin(self.MOTORA, 0 )
self.motorSpin(self.MOTORB, 1 )
def motor_turnright(self, value):
self.PWMA.duty( value )
if self.DOUBLEMOTOR_DIRECTIONAL:
self.motorSpin(self.MOTORA, 1 )
self.motorSpin(self.MOTORB, 0 )
def move(self, value):
move_value = list( value )
move_value.pop( 0 )
move_value = int( ''.join( move_value ) )
if value[0] == 'F':
arduino_map = Arduino_func( self.Min_Move_Level, self.Max_Move_Level, self.Min_DUTY, self.Max_DUTY )
move_value = arduino_map.map( move_value )
self.forward( move_value )
elif value[0] == 'B':
arduino_map = Arduino_func( self.Min_Move_Level, self.Max_Move_Level, self.Min_DUTY, self.Max_DUTY )
move_value = arduino_map.map( move_value )
self.backward( move_value )
# print('move_value' + str(move_value))
from machine import PWM,Pin
from lib.COMMON.Arduino import Arduino_func
class MD_TB6612FNG_DRV8833():
# d6-gpio12, d7-gpio13 for motorA, d2-gpio4 as PWMA
# d5-gpio14, d8-gpio15 for motorB. NO SUFFICIENT GPIO, use PWMA.
def __init__(self, in1=12, in2=13, in3=14, in4=15, pwm=4, type='Single', Min_Level=0, Max_Level=10, Min_duty=0,
Max_duty=1023):
self.DOUBLEMOTOR = False
self.DOUBLEMOTOR_DIRECTIONAL = False
self.PWMA = PWM( Pin( pwm ), freq=50 )
self.MOTORA = {'out1': Pin( in1, Pin.OUT ), 'out2': Pin( in2, Pin.OUT )}
self.Min_Move_Level = Min_Level
self.Max_Move_Level = Max_Level
self.Min_DUTY = Min_duty
self.Max_DUTY = Max_duty
if 'Double' in type:
if type == 'Double_directional':
self.DOUBLEMOTOR = True
else:
self.DOUBLEMOTOR_DIRECTIONAL = True
self.MOTORB = {'out1': Pin( in3, Pin.OUT ), 'out2': Pin( in4, Pin.OUT )}
# self.motor = self.servo.duty( 500 )
def motorSpin(self, motor,num):
if num == 1:
motor['out1'].value( 1 )
motor['out2'].value( 0 )
elif num == 0:
motor['out1'].value( 0 )
motor['out2'].value( 1 )
def forward(self, value):
self.PWMA.duty( value )
self.motorSpin( self.MOTORA,1 )
if self.DOUBLEMOTOR:
self.motorSpin(self.MOTORB, 1 )
def backward(self, value):
self.PWMA.duty( value )
self.motorSpin(self.MOTORA, 0 )
if self.DOUBLEMOTOR:
self.motorSpin(self.MOTORB, 0 )
# ONLY FOR DOUBLE MOTORS, motorA = left side wheels, motorB = right side wheels
def motor_turnleft(self, value):
self.PWMA.duty( value )
if self.DOUBLEMOTOR_DIRECTIONAL:
self.motorSpin(self.MOTORA, 0 )
self.motorSpin(self.MOTORB, 1 )
def motor_turnright(self, value):
self.PWMA.duty( value )
if self.DOUBLEMOTOR_DIRECTIONAL:
self.motorSpin(self.MOTORA, 1 )
self.motorSpin(self.MOTORB, 0 )
def move(self, value):
move_value = list( value )
move_value.pop( 0 )
move_value = int( ''.join( move_value ) )
if value[0] == 'F':
arduino_map = Arduino_func( self.Min_Move_Level, self.Max_Move_Level, self.Min_DUTY, self.Max_DUTY )
move_value = arduino_map.map( move_value )
self.forward( move_value )
elif value[0] == 'B':
arduino_map = Arduino_func( self.Min_Move_Level, self.Max_Move_Level, self.Min_DUTY, self.Max_DUTY )
move_value = arduino_map.map( move_value )
self.backward( move_value )
# print('move_value' + str(move_value))
from machine import PWM,Pin
from lib.COMMON.Arduino import Arduino_func
class MD_L298N():
# d6-gpio12, d7-gpio13 for motorA,
# d5-gpio14, d8-gpio15 for motorB.
def __init__(self, IO1=12, IO2=13, IO3=14, IO4=15, type='Single', Min_Level=0, Max_Level=10, Min_duty=0,
Max_duty=1023):
self.DOUBLEMOTOR = False
self.DOUBLEMOTOR_DIRECTIONAL = False
self.servo_IN1 = PWM( Pin( IO1 ), freq=50 )
self.servo_IN2 = PWM( Pin( IO2 ), freq=50 )
self.Min_Move_Level = Min_Level
self.Max_Move_Level = Max_Level
self.Min_DUTY = Min_duty
self.Max_DUTY = Max_duty
if 'Double' in type:
if type == 'Double_directional':
self.DOUBLEMOTOR = True
else:
self.DOUBLEMOTOR_DIRECTIONAL = True
self.servo_IN3 = PWM( Pin( IO3 ), freq=50 )
self.servo_IN4 = PWM( Pin( IO4 ), freq=50 )
# self.motor = self.servo.duty( 500 )
def forward(self, value):
self.servo_IN1.duty( value )
self.servo_IN2.duty( 0 )
if self.DOUBLEMOTOR:
self.servo_IN3.duty( value )
self.servo_IN4.duty( 0 )
def backward(self, value):
self.servo_IN1.duty( 0 )
self.servo_IN2.duty( value )
if self.DOUBLEMOTOR:
self.servo_IN3.duty( 0 )
self.servo_IN4.duty( value )
# OUT1,OUT2 to left motor, OUT3,OUT4 to right motor
def turnleft(self, value):
if self.DOUBLEMOTOR_DIRECTIONAL:
self.servo_IN1.duty( 0 )
self.servo_IN2.duty( value )
self.servo_IN3.duty( value )
self.servo_IN4.duty( 0 )
def turnright(self, value):
if self.DOUBLEMOTOR_DIRECTIONAL:
self.servo_IN1.duty( value )
self.servo_IN2.duty( 0 )
self.servo_IN3.duty( 0 )
self.servo_IN4.duty( value )
def move(self, value):
move_value = list( value )
move_value.pop( 0 )
move_value = int( ''.join( move_value ) )
if value[0] == 'F':
arduino_map = Arduino_func( self.Min_Move_Level, self.Max_Move_Level, self.Min_DUTY, self.Max_DUTY )
move_value = arduino_map.map( move_value )
self.forward( move_value )
elif value[0] == 'B':
arduino_map = Arduino_func( self.Min_Move_Level, self.Max_Move_Level, self.Min_DUTY, self.Max_DUTY )
move_value = arduino_map.map( move_value )
self.backward( move_value )
# print('move_value' + str(move_value))
from machine import PWM,Pin
from lib.COMMON.Arduino import Arduino_func
class MD_TB6612FNG():
# d6-gpio12, d7-gpio13 for motorA, d2-gpio4 as PWMA
# d5-gpio14, d8-gpio15 for motorB. NO SUFFICIENT GPIO, use PWMA.
def __init__(self, in1=12, in2=13, in3=14, in4=15, pwm=4, type='Single', Min_Level=0, Max_Level=10, Min_duty=0,
Max_duty=1023):
self.DOUBLEMOTOR = False
self.DOUBLEMOTOR_DIRECTIONAL = False
self.PWMA = PWM( Pin( pwm ), freq=50 )
self.MOTORA = {'out1': Pin( in1, Pin.OUT ), 'out2': Pin( in2, Pin.OUT )}
self.Min_Move_Level = Min_Level
self.Max_Move_Level = Max_Level
self.Min_DUTY = Min_duty
self.Max_DUTY = Max_duty
if 'Double' in type:
if type == 'Double_directional':
self.DOUBLEMOTOR = True
else:
self.DOUBLEMOTOR_DIRECTIONAL = True
self.MOTORB = {'out1': Pin( in3, Pin.OUT ), 'out2': Pin( in4, Pin.OUT )}
# self.motor = self.servo.duty( 500 )
def motorSpin(self, motor,num):
if num == 1:
motor['out1'].value( 1 )
motor['out2'].value( 0 )
elif num == 0:
motor['out1'].value( 0 )
motor['out2'].value( 1 )
def forward(self, value):
self.PWMA.duty( value )
self.motorSpin( self.MOTORA,1 )
if self.DOUBLEMOTOR:
self.motorSpin(self.MOTORB, 1 )
def backward(self, value):
self.PWMA.duty( value )
self.motorSpin(self.MOTORA, 0 )
if self.DOUBLEMOTOR:
self.motorSpin(self.MOTORB, 0 )
# ONLY FOR DOUBLE MOTORS, motorA = left side wheels, motorB = right side wheels
def motor_turnleft(self, value):
self.PWMA.duty( value )
if self.DOUBLEMOTOR_DIRECTIONAL:
self.motorSpin(self.MOTORA, 0 )
self.motorSpin(self.MOTORB, 1 )
def motor_turnright(self, value):
self.PWMA.duty( value )
if self.DOUBLEMOTOR_DIRECTIONAL:
self.motorSpin(self.MOTORA, 1 )
self.motorSpin(self.MOTORB, 0 )
def move(self, value):
move_value = list( value )
move_value.pop( 0 )
move_value = int( ''.join( move_value ) )
if value[0] == 'F':
arduino_map = Arduino_func( self.Min_Move_Level, self.Max_Move_Level, self.Min_DUTY, self.Max_DUTY )
move_value = arduino_map.map( move_value )
self.forward( move_value )
elif value[0] == 'B':
arduino_map = Arduino_func( self.Min_Move_Level, self.Max_Move_Level, self.Min_DUTY, self.Max_DUTY )
move_value = arduino_map.map( move_value )
self.backward( move_value )
# print('move_value' + str(move_value))
from machine import PWM,Pin
from lib.COMMON.Arduino import Arduino_func
class servo_MG996R():
# d1-gpio5 for servo, d1-gpio5 as PWM
__slots__ = 'IO', 'Level_min', 'Level_max','Straight','Left_max','Right_max'
def __init__(self, IO=5, Level_min=0, Level_max=10, Straight=70, Left_max=50, Right_max=105):
self.servo = PWM( Pin( IO ), freq=50 )
self.servo.duty( Straight )
# time.sleep( 1 ) # you may not need it
self.Min_Turn_Level = Level_min
self.Max_Turn_Level = Level_max
self.Go_Straight = Straight
self.Max_Turn_Left = Left_max
self.Max_Turn_Right = Right_max
def turn(self, value):
duty_value = list( value )
duty_value.pop( 0 )
duty_value = int( ''.join( duty_value ) )
if value[0] == 'L':
arduino_map = Arduino_func( self.Min_Turn_Level, self.Max_Turn_Level, self.Max_Turn_Left,
self.Go_Straight )
duty_value = arduino_map.map( duty_value, -1 )
elif value[0] == 'R':
arduino_map = Arduino_func( self.Min_Turn_Level, self.Max_Turn_Level, self.Go_Straight,
self.Max_Turn_Right )
duty_value = arduino_map.map( duty_value )
self.servo.duty( duty_value )
from lib.COMMON.timer import Timer
class Car():
def __init__(self, driver, steering, watchCar=None):
if watchCar!=None:
self.watchCar = watchCar
else:
self.watchCar = 0
self.driver = driver
self.servo = steering
self.RunAfter = Timer.RunAfter
self.engineData = {}
self.lastM = 0
self.stepDown = 0
self.listStep = []
self.lastM1 = ''
self.lastData = ''
self.sensitivity = 'ms_1000'
self.D, self.M = '', ''
self.forwardNO, self.backwardNO = 1, 1
self.moveOn = 1
self.A, self.last_D, self.current_D = 3, 0, 0
self.T = 250
self.delay = 't_400'
def stepnum(self, num1, num2, M1):
list1 = []
j = num1 - num2
if j > 0:
for i in range( abs( j ) ):
list1.append( M1 + str( num2 + i ) )
list1.append( M1 + str( num1 ) )
else:
for i in range( abs( j ) ):
list1.append( M1 + str( num1 + i ) )
list1.append( M1 + str( num2 ) )
return list1
def stopcar(self, direction):
if direction == 'F':
self.forwardNO = 0
elif direction == 'B':
self.backwardNO = 0
def engine(self, data, soloRemote):
D, M = data
self.engineData.update( {'direction': D, 'movement': M} if soloRemote else {
'direction': data} if data[0] in ('L', 'R') else {'movement': data} )
self.servo.turn( self.engineData['direction'] )
if (self.forwardNO and self.engineData['movement'][0] == 'F') or (
self.backwardNO and self.engineData['movement'][0] == 'B') or self.forwardNO == 0 and \
self.engineData['movement'][
0] == 'B' or self.backwardNO == 0 and self.engineData['movement'][0] == 'F':
self.driver.move( self.engineData['movement'] )
self.forwardNO, self.backwardNO, self.moveOn = 1, 1, 1
def go(self, data, soloRemote=1):
if soloRemote == 0:
pass
else:
if data == 'offline':
self.driver.move( 'F00' )
self.servo.turn( 'L00' )
elif data != self.lastData and data != None:
self.D, self.M = data.split( '-' )
self.M, M1 = self.M[1:], self.M[:1] # M1 is F or B
if M1 != self.lastM1:
self.lastM = 0
if int( self.M ) - (self.lastM) > 1:
self.listStep = self.stepnum( int( self.M ) - 1, self.lastM, M1 )
self.listStep.pop( 0 ) # already ranoll
self.M = '%02d' % (self.lastM + 1)
self.stepDown = 1
self.lastM = int( self.M )
self.M = M1 + self.M
self.engine( (self.D, self.M), soloRemote=soloRemote )
else:
self.lastM = int( self.M )
self.M = M1 + self.M
self.stepDown = 0
# print( 'stepped The running is: ', self.D, M )
self.engine( (self.D, self.M), soloRemote=soloRemote )
try:
# del (globals()[delay]) # in order to run again in first
self.delay=...
except:
pass
self.lastM1 = M1
self.lastData = data
elif self.stepDown:
try:
if self.RunAfter( self.delay ): # in order to slow down gryo action.
M1, self.M = self.listStep[0][:1], self.listStep[0][1:]
self.M = M1 + '%02d' % (int( self.M ) + 1)
self.listStep.pop( 0 )
self.engine( (self.D, self.M), soloRemote=soloRemote )
except:
self.stepDown = 0
if self.watchCar != 0:
if self.RunAfter( self.sensitivity ):
self.sensitivity,self.moveOn = self.watchCar( self.stopcar, self.moveOn, self.driver,self.M)
from machine import Pin
from lib.COMMON.timer import Timer
import time
from lib.COMMON.management import Sword_AP
class Sword(): # using Timer
def __init__(self, IO, conn, func,force):
self.conn=conn
self.func=func
self.force=force
from lib.COMMON.management import SWORD_Lhand
self.LhandID = SWORD_Lhand.ID
del (SWORD_Lhand)
from lib.COMMON.management import SWORD_Rhand
self.RhandID = SWORD_Rhand.ID
del (SWORD_Rhand)
from lib.COMMON.management import SWORD_target
self.targetID = SWORD_target.ID
del (SWORD_target)
self.IO = {'RELAY': Pin( IO.RELAY_OUT, Pin.OUT )}
self.RunAfter = Timer.RunAfter
self.L1,self.runTarget= 0, 0
# self.FP,self.BP,self.LP,self.RP=0,0,0,0
self.runON=1 # force will set it 1
self.spinData,self.spinCount=20,0
self.CTRLcar=0
self.LaserON = 0
self.F,self.B,self.L,self.R=0,0,0,0
self.webrepl=0
self.webreplON=1
self.doneTARGET = 0
self.TH1=1 # threshold 1, for activate the car
def runWebrepl(self, v, w, x, y):
# self.spinCount = 0 # reset spin gyro
if self.webreplON:
v += 1
if v >40:
self.runON = 0
self.clickRelay( 1.2 )
self.webrepl = 1
elif (v > 0 and w == 0 and x == 0 and y == 0):
print ('ready for webrepl :',v) # for test
return v
# run the next codes, you always have chance to switch webrepl, otherwise, Once a Chance!
else:
self.webreplON=0
def clickRelay(self,t):
self.IO['RELAY'].value(1) # set relay ON, for 0.2
time.sleep(t)
self.IO['RELAY'].value(0)
print ('clickRelay:',t )
def activate_car(self, GYRO):
data=int(GYRO[9:11])
# if 1: # for test
if self.TH1 and data < self.spinData: # officially
self.spinCount += 1
print ('spinCount',self.spinCount)
# self.F, self.B, self.L, self.R = 0, 0, 0, 0 # no need to reset, because if spin, no chance to activate webrepl
self.spinData = data
if self.TH1 and self.spinCount>45:
self.conn.updateSwitch( ID=self.LhandID, cmd=Sword_AP.CMD.Lhand )
self.conn.updateSwitch( ID=self.RhandID, cmd=Sword_AP.CMD.Rhand2 )
self.TH1 = 0
elif self.TH1==0 and self.RunAfter('wait_3000'):
print ('\nactivate_car\n')
self.clickRelay(1.2)
# self.TH1 = 1
return 1
else:
return 0
def activate_target(self, GYRO):
TURN, GO = GYRO[0], GYRO[4]
i, j = int( GYRO[1:3] ) , int( GYRO[5:7] )
# if self.L3 or 1: # for test
if self.runTarget : # officially
if self.doneTARGET==0 and self.RunAfter( 'ii_3000' ):
print ('run target')
self.conn.updateSwitch( ID=self.targetID, cmd=Sword_AP.CMD.target ) # activate target
# self.conn.updateSwitch(ID=self.LhandID, cmd=Sword_AP.CMD.Lhand) # for test, because i need a hint
print(self.conn.switch_dict, '\n')
self.doneTARGET=1
elif self.doneTARGET and self.RunAfter('ii_10000'):
# self.conn.Main() # To refresh conn.Ldata_str, make it not be 'Activate' , this is for the old code?? YES i think it's useless now, it's in "while" of main.py
self.runTarget = 0
self.clickRelay(1.2) # shutdown the sword.
self.force.gotForce = 0 #MC can be input now
self.doneTARGET = 0
self.LaserON = 0
self.runON=1
self.spinCount=0
# self.FP,self.BP,self.LP,self.RP=0,0,0,0
print ('Target Mission has done, showdown the sword.')
# elif self.FP and self.BP and self.LP and self.RP or 1: # for test
elif self.FP and self.BP and self.LP and self.RP: # officially
self.FP, self.BP, self.LP, self.RP = 0, 0, 0, 0
self.conn.updateSwitch( ID=self.RhandID, cmd=Sword_AP.CMD.Rhand2 ) # the hand which hold the sword_srv, just activate 3s
# self.conn.updateSwitch(ID=self.LhandID, cmd=Sword_AP.CMD.Lhand) # for test, because i need to see the both when im testing
self.runTarget = 1
# elif GO == 'F' and j > 7 or 1: # for test
elif GO == 'F' and j > 7 : # officially
self.F = self.runWebrepl(self.F, self.B, self.L, self.R) # keep gyro in the direction about 50*200ms
if self.FP == 0:
print('\nFP=1')
self.func[0](self.conn)
self.FP = 1
# elif GO == 'B' and j > 7 or 1 : # for test
elif GO == 'B' and j > 7: # officially
self.B = self.runWebrepl(self.B, self.F, self.L, self.R)
if self.BP == 0:
print('\nBP=1')
self.func[1](self.conn)
self.BP = 1
# elif l == 'L' and i > 7 or 1:# for test
elif TURN == 'L' and i > 7:# officially
self.L = self.runWebrepl(self.L, self.B, self.F, self.R)
if self.LP == 0:
print('\nLP=1')
self.func[2](self.conn)
self.LP = 1
# elif l == 'R' and i > 7 or 1:# for test
elif TURN == 'R' and i > 7:# officially
self.R = self.runWebrepl(self.R, self.B, self.L, self.F)
if self.RP == 0:
print('\nRP=1')
self.func[3](self.conn)
self.RP = 1
def run(self,GYRO):
# if 1: # for test
if self.runON: # for officially, why do i need runON, when go for webrepl, self.runON=0
if GYRO==None: # when no install GYRO, test program
GYRO='L00-F00-S00'
if self.LaserON==0:
self.clickRelay(0.2)
# self.clickRelay(0.2, self-locked) # new plan, use globle var to make sure it run for once in time
self.LaserON=1
self.FP, self.BP, self.LP, self.RP = 0, 0, 0, 0
if self.L1:
if self.RunAfter('L1_200'):
self.activate_target(GYRO)
self.CTRLcar=self.activate_car(GYRO)
elif self.L1==0: # the hand which touch electrical light,
self.conn.updateSwitch( ID=self.LhandID, cmd=Sword_AP.CMD.Lhand ) # just activate 1s
self.L1=1
print('L1 activated')
from lib.SRV.sword_srv.SWORD import Sword
from lib.DRV.GYRO.MPU6050_JY61P import MPU6050_JY61P
from lib.COMMON.management import Sword_AP as Main, SWORD_Rhand
from lib.COMMON.MMG_server import microMSG_server
from lib.COMMON.force import Force
def funcSword(P=None):
# print('update 1_2:1-1')
P.updateSwitch( ID=SWORD_Rhand.ID, cmd=Main.CMD.Rhand1 ) # the hand which hold the sword_srv.
def turnONsword(P=None):# set func1,func2,func3,func4, func1 will run as the final target
pass
# sword.runON=1
conn = microMSG_server( WIFI=Main )
mpu6050 = MPU6050_JY61P()
force = Force(IO=Main.IO, funclist=(turnONsword,))
sword = Sword( IO=Main.IO, conn=conn, func=(funcSword, funcSword, funcSword, funcSword), force=force ) # [0]=F,[1]=B,[2]=L,[3]=R
# mpu6050.Str_AngleDATA # cannot cancel this. or it will return "None" afterwards, why ?
while True:
conn.Main()
if 1: # for test
# if force.forceON: # for officially
# mpu6050.Str_AngleDATA
sword.run(mpu6050.Str_AngleDATA)
if sword.CTRLcar or sword.webrepl: #spin clockwise 30 time, car. down over 9 times
print ('While break!')
break
if sword.CTRLcar: # when webrepl=1, program stops for webrepl
print ('\n\n\n\nNOW, control car.')
# del(conn, sword, Sword, force, Force, microMSG_server, Main, SWORD_Rhand, mpu6050, MPU6050_JY61P) # can't delete mpu6050, important!!!
del(conn, sword, Sword, force, Force, microMSG_server, Main, SWORD_Rhand)
import gc
gc.collect()
exec(open('./gyroCTRLcar.py').read(),globals())
'''
1. activate sword
force 7s in 10s, CMD.Lhand
2. activate target
go forward tilt more than level7, CMD.Rhand1
go backward tilt more than level7, CMD.Rhand1
go left tilt more than level7, CMD.Rhand1
go right tilt more than level7, CMD.Rhand1
2.1 activate car
spin clockwise about 31 times, CMD.Rhand2 and CMD.Lhand
3. activate webpel
tilt more than level7 for ten seconds.
'''
import machine
machine.reset()
import network
ap = network.WLAN(network.AP_IF) # create access-point interface
ap.config(essid='sword_ap', password='123456789') # set the ESSID of the access point
ap.active(1) # activate the interface
# POWER STRIP TEST ON ESP8266, format is 'gpio1_0,gpio2_1,gpio3_0,gpio4_1'
from lib.COMMON.management import powerStrip_AP as Main,powerStrip_1 as No1, powerStrip_2 as No2
from lib.COMMON.MMG_server import microMSG_server
conn = microMSG_server( WIFI=Main,test=True )
# CHANGE THE SWITCH STATUS, IT WILL READ CLIENT REQUESTS IN EVERY 300MS, AND SEND THE LATEST STATUS
# conn.updateSwitch( No1, '2,3-1' ) # 1-0 means slot1 is off
# conn.updateSwitch( No2, '2,3-1' ) # 1-0 means slot2 is off
# conn.updateSwitch( Main, (Main.S1=True, Main.S2) ) # 1-0 means slot1 is off
while True:
conn.Main()
if conn.RunAfter('abc_3000'):
conn.updateSwitch( No1.ID, Main.CMD.No1 ) # 1-0 means slot1 is off
conn.updateSwitch( No2.ID, Main.CMD.No2 ) # 1-0 means slot2 is off
print('\n', conn.switch_dict )
# POWER STRIP TEST ON ESP8266, format is 'gpio1_0,gpio2_1,gpio3_0,gpio4_1'
from lib.COMMON.management import Sword_AP as Main, SWORD_Rhand as Rhand,SWORD_Lhand as Lhand, SWORD_target as target
from lib.COMMON.MMG_server import microMSG_server
conn = microMSG_server( WIFI=Main,test=True )
while True:
conn.Main()
if conn.RunAfter('abc_3000'):
conn.updateSwitch( Rhand.ID, Main.CMD.Rhand1 ) # 1-0 means slot1 is off
conn.updateSwitch(Rhand.ID, Main.CMD.Rhand2) # 1-0 means slot1 is off
conn.updateSwitch( Lhand.ID, Main.CMD.Lhand ) # 1-0 means slot2 is off
conn.updateSwitch( target.ID, Main.CMD.target ) # 1-0 means slot2 is off
print('\n', conn.switch_dict )
#HOWTO replace SWORD.py
# 1. upload SWORD.py
# 2. Run the codes in webrepl
os.remove('./lib/SRV/sword_srv/SWORD.py')
os.rename('SWORD.py', './lib/SRV/sword_srv/SWORD.py' )
os.rename('force.py', './lib/COMMON/force.py' )
./lib/DRV/GYRO/MPU6050_JY61P.py
./lib/COMMON/MMG_client.py
os.listdir('./lib/COMMON')
os.listdir('./lib/CLI/gyroctrl_cli')
os.listdir('./lib/CLI/switch_cli')
exec(open('./gyroCTRLcar.py').read(),globals())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment