Skip to content

Instantly share code, notes, and snippets.

@ryanswanstrom
Created January 24, 2019 01:23
Show Gist options
  • Save ryanswanstrom/0dca9ee88e70ba52ba1ccb5c47cb724c to your computer and use it in GitHub Desktop.
Save ryanswanstrom/0dca9ee88e70ba52ba1ccb5c47cb724c to your computer and use it in GitHub Desktop.
import urequests as requests
#common functions
def mb_safe(func):
    """Decorate ``func`` so that it never raises.

    The returned wrapper forwards all positional and keyword arguments to
    ``func`` and returns its result. If ``func`` raises any ``Exception``,
    the message is printed with a ``===== mb_safe: `` prefix and the empty
    string is returned instead.
    """
    def _wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            # Deliberately best-effort: report the failure on the console
            # and hand back '' so the calling script keeps running.
            print('===== mb_safe: ' + str(e))
            return ''
    return _wrapper
# Create the iot module: write an empty iot.py under /flash/flash so that the
# later `import iot` succeeds; the cloud helper functions defined further down
# are then attached to that module as attributes (iot.list_add, ...).
import uos
uos.chdir('/flash/flash')
with open("iot.py", "w") as f:
    f.write("")
import ujson
import iot
import time
import codey
import urequests
from codey_wlan_board import codey_wlan
# Base URLs of the cloud services used below. The bracketed values look like
# template placeholders -- presumably substituted before the script is
# deployed; TODO confirm.

# Cloud-list request domain
iot_list_request_domain = '[iotCloudListDomain]'
# NOTE: "domin" is a misspelling of "domain"; the name is kept as-is because
# iot_weather reads it.
iot_weather_request_domin = '[iotWeatherDomain]'
iot_air_request_domain = '[iotAirDomain]'
def __iot_get_request_header():
    """Return the HTTP headers sent with the cloud requests.

    A fresh dict is built on every call, so callers may mutate the result.
    The bracketed values look like template placeholders -- presumably
    filled in before deployment; TODO confirm.
    """
    return {
        'content-type': 'application/json; charset=utf-8',
        'uid': '[uId]',
        'devicetype': '[deviceType]',
        'deviceid': '[deviceId]',
    }
def __iot_get(request_url):
    """Send a GET request and return the ``data`` field of the JSON reply.

    Returns ``''`` without touching the network when Wi-Fi is not connected.
    Errors from the request, from JSON decoding, or from a missing ``data``
    key are not handled here and propagate to the caller.
    """
    if not codey.wifi_is_connected():
        return ''
    response = urequests.request('GET', request_url, headers=__iot_get_request_header())
    try:
        return response.json()['data']
    finally:
        # Always release the underlying socket; the device has few of them.
        response.close()
def __iot_post(request_url, post_data):
    """Send a POST request and return the ``data`` field of the JSON reply.

    ``post_data`` is passed through unchanged as the request body. Returns
    ``''`` without touching the network when Wi-Fi is not connected. Errors
    from the request, from JSON decoding, or from a missing ``data`` key are
    not handled here and propagate to the caller.
    """
    if not codey.wifi_is_connected():
        return ''
    response = urequests.request('POST', request_url, headers=__iot_get_request_header(), data=post_data)
    try:
        return response.json()['data']
    finally:
        # Always release the underlying socket; the device has few of them.
        response.close()
# Add a data item to a cloud list
@mb_safe
def iot_list_add(name, data):
    """Post ``data`` to the cloud list called ``name``.

    The payload is the JSON object ``{"listName": name, "data": data}``.
    Returns whatever ``__iot_post`` returns; because of ``mb_safe`` any
    exception is printed and ``''`` is returned instead.
    """
    post_data = ujson.dumps({"listName": name, "data": data})
    return __iot_post(iot_list_request_domain + 'meos/postcloudlist', post_data)
# Expose the function on the iot module.
iot.list_add = iot_list_add
# Get the data item that `index` points at
@mb_safe
def iot_list_index(name, index):
    """Fetch one item of the cloud list called ``name``.

    ``index`` may be the string ``'random'``, the string ``'last'``, or a
    1-based position (anything ``int()`` accepts). A position that is not
    greater than zero yields ``''`` without making a request. The position
    is sent to the service zero-based.

    Because of ``mb_safe`` any exception (including the lookup failing when
    ``__iot_get`` returned ``''``) is printed and ``''`` is returned.
    """
    req_type = 'index'
    req_index = 0
    if index == 'random':
        req_type = 'random'
    elif index == 'last':
        req_type = 'last'
    else:
        index = int(index)
        if index > 0:
            # Callers count from 1, the request parameter counts from 0.
            req_index = index - 1
        else:
            return ''
    res = __iot_get(iot_list_request_domain + 'meos/getCloudListItemByIndex?listName=' + name + '&type=' + req_type + '&index=' + str(req_index))
    return res['itemData']['data']
# Expose the function on the iot module.
iot.list_index = iot_list_index
# Get the length of a cloud list
@mb_safe
def iot_list_length(name):
    """Return the length of the cloud list called ``name`` as an int.

    Reads the ``listLen`` field of the reply. Because of ``mb_safe`` any
    exception is printed and ``''`` is returned instead.
    """
    res = __iot_get(iot_list_request_domain + 'meos/getcloudlistlen?listName=' + name)
    return int(res['listLen'])
# Expose the function on the iot module.
iot.list_length = iot_list_length
# Get weather information
# city_code: city code
# data_type: which kind of data to fetch
@mb_safe
def iot_weather(city_code, data_type):
    """Query the weather service for ``city_code``.

    ``city_code`` is sent as the ``woeid`` query parameter and ``data_type``
    as ``type``. The reply body is returned as an int when
    ``int(data_type) <= 3`` and as text otherwise. Returns ``''`` when Wi-Fi
    is not connected; because of ``mb_safe`` any exception is printed and
    ``''`` is returned as well.
    """
    if not codey.wifi_is_connected():
        return ''
    request_url = iot_weather_request_domin + 'getweather?woeid=' + str(city_code) + '&type=' + str(data_type)
    res = urequests.request('GET', request_url)
    try:
        text = res.text
    finally:
        # Always release the underlying socket; the device has few of them.
        res.close()
    if int(data_type) <= 3:
        return int(text)
    return text
# Expose the function on the iot module.
iot.weather = iot_weather
# Get air-quality information
# cid: id of the monitoring station
# arg: which value to query (pm25, so2, ...)
@mb_safe
def iot_air(cid, arg):
    """Query the air service and return the reply body as a float.

    Posts the JSON object ``{"cid": cid, "arg": arg}`` to ``air/getone``.
    Returns ``''`` when Wi-Fi is not connected; because of ``mb_safe`` any
    exception (for example a non-numeric reply) is printed and ``''`` is
    returned as well.
    """
    if not codey.wifi_is_connected():
        return ''
    post_data = ujson.dumps({"cid": cid, "arg": arg})
    res = urequests.request('POST', iot_air_request_domain + 'air/getone', headers=__iot_get_request_header(), data=post_data)
    try:
        text = res.text
    finally:
        # Always release the underlying socket; the device has few of them.
        res.close()
    return float(text)
# Expose the function on the iot module.
iot.air = iot_air
import codey
import rocky
def on_button_callback2():
    """Button C handler: stop the robot, join Wi-Fi and report the result.

    Shows a face pattern, stops the motors, starts the Wi-Fi connection and
    then plays a sound and shows 'yes' or 'no' depending on
    ``codey.wifi_is_connected()``.
    """
    # The adjacent literals are joined by Python with no separator between
    # rows; the text is kept exactly as authored.
    codey.face(
        '1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1'
        '0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0'
        '0 1 1 1 0 0 0 0 0 0 0 0 0 0 0 0'
        '0 1 0 0 0 1 1 1 0 0 0 0 0 1 1 1'
        '0 1 1 1 0 0 1 0 0 1 1 1 0 1 0 1'
        '0 0 0 1 0 0 1 0 0 1 0 1 0 1 1 1'
        '0 1 1 1 0 0 1 0 0 1 1 1 0 1 0 0'
        '1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1')
    rocky.stop()
    # NOTE(review): SSID and password are hard-coded in the script; consider
    # moving them out of source before sharing it.
    codey.wifi('GooF-phone', '12345678')
    # NOTE(review): the connection state is checked right after the call
    # above -- confirm codey.wifi() blocks until the link is up.
    if codey.wifi_is_connected():
        codey.say('yeah.wav', True)
        codey.show('yes', wait=False)
    else:
        codey.say('angry.wav', True)
        codey.show('no', wait=False)
# Run the handler when button C is pressed.
codey.on_button('C', on_button_callback2)
def on_button_callback():
    """Button A handler: do a short move, then show the MSFT stock price.

    Shows a face pattern, plays a sound, drives forward and backward, and
    then fetches the price from the web API. Shows 'not con' when Wi-Fi is
    down and 'apis bad' when the request fails, answers with a non-200
    status, or returns an empty body.
    """
    # The adjacent literals are joined by Python with no separator between
    # rows; the text is kept exactly as authored.
    codey.face(
        '1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1'
        '0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0'
        '0 1 1 1 0 0 0 0 0 0 0 1 1 1 0 0'
        '0 1 0 1 0 1 1 1 1 1 0 1 0 1 0 0'
        '0 1 1 1 0 0 0 1 0 0 0 1 1 1 0 0'
        '0 1 0 0 0 0 0 1 0 0 0 1 1 0 0 0'
        '0 1 0 0 0 0 0 1 0 0 0 1 0 1 0 0'
        '1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1')
    codey.say('annoyed.wav')
    rocky.forward(60, 0.6)
    rocky.backward(20, 0.5)
    if not codey.wifi_is_connected():
        codey.show('not con')
        return
    price = ''
    try:
        res = requests.get(url='http://ryanstestfuncapp.azurewebsites.net/api/mbot?symbol=msft')
        try:
            # A response object is always truthy, so test the HTTP status
            # explicitly; otherwise an error page would be shown as a price.
            if res.status_code == 200:
                price = res.text
        finally:
            # Always release the underlying socket.
            res.close()
    except OSError:
        # Network-level failure (DNS, connect, read): fall through to the
        # 'apis bad' message instead of aborting the handler.
        price = ''
    if price:
        codey.show('Microsoft stock price is ' + price)
    else:
        codey.show('apis bad')
# Run the handler when button A is pressed.
codey.on_button('A', on_button_callback)
def on_button_callback1():
    """Button B handler: show a face pattern, then beep and drive a pattern.

    Each pass plays note 60 and then turns left, drives forward, turns
    right and drives backward.
    """
    # The adjacent literals are joined by Python with no separator between
    # rows; the text is kept exactly as authored.
    codey.face(
        '1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1'
        '0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0'
        '0 0 1 1 1 0 0 1 0 0 1 1 1 0 0 0'
        '0 0 1 0 1 0 0 0 0 0 1 0 1 0 0 0'
        '0 0 1 1 1 0 0 1 0 0 1 1 1 0 0 0'
        '0 0 1 1 0 0 0 1 0 0 1 0 0 0 0 0'
        '0 0 1 0 1 0 0 1 0 0 1 0 0 0 0 0'
        '1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1')
    # NOTE(review): the original indentation was lost, so the extent of the
    # loop body is inferred. Here the whole note-and-move sequence runs
    # twice; if only the note was meant to repeat, dedent the rocky.* calls.
    for _ in range(2):
        codey.play_note(60, 0.25)
        rocky.turn_left(50, 0.5)
        rocky.forward(50, 0.5)
        rocky.turn_right(50, 0.6)
        rocky.backward(50, 0.5)
# Run the handler when button B is pressed.
codey.on_button('B', on_button_callback1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment