Fix: 修复MicroPython MixGoAI和Microbit下一些py异常同时格式化代码

This commit is contained in:
王立帮
2024-12-03 10:36:49 +08:00
parent e6c9a30bdc
commit ee04dadb66
71 changed files with 3265 additions and 2382 deletions

View File

@@ -1,29 +1,34 @@
import usocket as socket
import ustruct as struct
import network,time,board
#from ubinascii import hexlify
import network, time, board
# from ubinascii import hexlify
import ujson as json
from machine import UART
wifi_en=board.pin(19,board.GPIO.OUT)
board.register(18,board.FPIOA.UART2_TX)
board.register(17,board.FPIOA.UART2_RX)
wifi_en = board.pin(19, board.GPIO.OUT)
board.register(18, board.FPIOA.UART2_TX)
board.register(17, board.FPIOA.UART2_RX)
def wifi_enable(en):
global wifi_en
wifi_en.value(en)
def wifi_reset():
global uart
wifi_enable(0)
time.sleep_ms(200)
wifi_enable(1)
time.sleep(2)
uart = UART(UART.UART2,115200,timeout=1000, read_buf_len=4096)
uart = UART(UART.UART2, 115200, timeout=1000, read_buf_len=4096)
tmp = uart.read()
uart.write("AT+UART_CUR=921600,8,1,0,0\r\n")
print(uart.read())
uart = UART(UART.UART2,921600,timeout=1000, read_buf_len=10240) # important! baudrate too low or read_buf_len too small will loose data
uart = UART(
UART.UART2, 921600, timeout=1000, read_buf_len=10240
) # important! baudrate too low or read_buf_len too small will loose data
uart.write("AT\r\n")
tmp = uart.read()
print(tmp)
@@ -36,46 +41,62 @@ def wifi_reset():
return None
return nic
def get_data_dict(d):
result = {"datastreams":[]}
result = {"datastreams": []}
for x in d:
result["datastreams"].append({"id":x,"datapoints":[{"value":d[x]}]})
result["datastreams"].append({"id": x, "datapoints": [{"value": d[x]}]})
return result
def pubData(value, state):
def pubData(value, state):
value = get_data_dict(value)
jdata = json.dumps(value)
jlen = len(jdata)
bdata = bytearray(jlen+3)
bdata[0] = 1 # publish data in type of json
bdata[1] = int(jlen / 256) # data lenght
bdata[2] = jlen % 256 # data lenght
bdata[3:jlen+4] = jdata.encode('ascii') # json data
bdata = bytearray(jlen + 3)
bdata[0] = 1 # publish data in type of json
bdata[1] = int(jlen / 256) # data lenght
bdata[2] = jlen % 256 # data lenght
bdata[3 : jlen + 4] = jdata.encode("ascii") # json data
if state:
print(value)
print(bdata)
return bdata
def do_connect(account,password):
nic=wifi_reset()
if not nic:
raise Exception("[Cool.AI]:WiFi init fail")
nic.connect(account,password)
nic.ifconfig()
def do_connect(account, password):
nic = wifi_reset()
if not nic:
raise Exception("[Cool.AI]:WiFi init fail")
nic.connect(account, password)
nic.ifconfig()
def init_MQTT_client(sid, address, cid, api, topic, callback):
client = MQTTClient(sid, address, 6002, cid, api)
client.set_callback(callback)
client.connect()
client.subscribe(bytes(topic, 'utf-8'))
client.subscribe(bytes(topic, "utf-8"))
return client
class MQTTException(Exception):
pass
class MQTTClient:
def __init__(self, client_id, server, port=0, user=None, password=None, keepalive=0,ssl=False, ssl_params={}):
def __init__(
self,
client_id,
server,
port=0,
user=None,
password=None,
keepalive=0,
ssl=False,
ssl_params={},
):
if port == 0:
port = 8883 if ssl else 1883
self.client_id = client_id
@@ -102,7 +123,7 @@ class MQTTClient:
sh = 0
while 1:
b = self.sock.read(1)[0]
n |= (b & 0x7f) << sh
n |= (b & 0x7F) << sh
if not b & 0x80:
return n
sh += 7
@@ -124,6 +145,7 @@ class MQTTClient:
print(self.addr)
if self.ssl:
import ussl
self.sock = ussl.wrap_socket(self.sock, **self.ssl_params)
msg = bytearray(b"\x10\0\0\x04MQTT\x04\x02\0\0")
msg[1] = 10 + 2 + len(self.client_id)
@@ -140,7 +162,7 @@ class MQTTClient:
msg[9] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3
msg[9] |= self.lw_retain << 5
self.sock.write(msg)
#print(hex(len(msg)), hexlify(msg, ":"))
# print(hex(len(msg)), hexlify(msg, ":"))
self._send_str(self.client_id)
if self.lw_topic:
self._send_str(self.lw_topic)
@@ -161,7 +183,7 @@ class MQTTClient:
def ping(self):
self.sock.write(b"\xc0\0")
def publish(self, msg, is_print=True, topic='$dp', retain=False, qos=0):
def publish(self, msg, is_print=True, topic="$dp", retain=False, qos=0):
msg = pubData(msg, is_print)
pkt = bytearray(b"\x30\0\0\0")
pkt[0] |= qos << 1 | retain
@@ -170,12 +192,12 @@ class MQTTClient:
sz += 2
assert sz < 2097152
i = 1
while sz > 0x7f:
pkt[i] = (sz & 0x7f) | 0x80
while sz > 0x7F:
pkt[i] = (sz & 0x7F) | 0x80
sz >>= 7
i += 1
pkt[i] = sz
#print(hex(len(pkt)), hexlify(pkt, ":"))
# print(hex(len(pkt)), hexlify(pkt, ":"))
self.sock.write(pkt, i + 1)
self._send_str(topic)
if qos > 0:
@@ -202,7 +224,7 @@ class MQTTClient:
pkt = bytearray(b"\x82\0\0\0")
self.pid += 1
struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid)
#print(hex(len(pkt)), hexlify(pkt, ":"))
# print(hex(len(pkt)), hexlify(pkt, ":"))
self.sock.write(pkt)
self._send_str(topic)
self.sock.write(qos.to_bytes(1, "little"))
@@ -210,7 +232,7 @@ class MQTTClient:
op = self.wait_msg()
if op == 0x90:
resp = self.sock.read(4)
#print(resp)
# print(resp)
assert resp[1] == pkt[2] and resp[2] == pkt[3]
if resp[3] == 0x80:
raise MQTTException(resp[3])
@@ -227,12 +249,12 @@ class MQTTClient:
return None
if res == b"":
raise OSError(-1)
if res == b"\xd0": # PINGRESP
if res == b"\xd0": # PINGRESP
sz = self.sock.read(1)[0]
assert sz == 0
return None
op = res[0]
if op & 0xf0 != 0x30:
if op & 0xF0 != 0x30:
return op
sz = self._recv_len()
topic_len = self.sock.read(2)