Merge branch 'master' into boards

This commit is contained in:
王立帮
2024-10-28 09:03:32 +08:00
4 changed files with 223 additions and 223 deletions

View File

@@ -1,217 +1,217 @@
""" """
Bluetooth-Central Bluetooth-Central
Micropython library for the Bluetooth-Central(ESP32-C2) Micropython library for the Bluetooth-Central(ESP32-C2)
======================================================= =======================================================
#https://github.com/micropython/micropython/tree/master/examples/bluetooth #https://github.com/micropython/micropython/tree/master/examples/bluetooth
@dahanzimin From the Mixly Team @dahanzimin From the Mixly Team
""" """
import time,gc import time,gc
import bluetooth import bluetooth
from micropython import const from micropython import const
from ubinascii import hexlify,unhexlify from ubinascii import hexlify,unhexlify
from ble_advertising import decode_services, decode_name from ble_advertising import decode_services, decode_name
_IRQ_CENTRAL_CONNECT = const(1) _IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2) _IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3) _IRQ_GATTS_WRITE = const(3)
_IRQ_GATTS_READ_REQUEST = const(4) _IRQ_GATTS_READ_REQUEST = const(4)
_IRQ_SCAN_RESULT = const(5) _IRQ_SCAN_RESULT = const(5)
_IRQ_SCAN_DONE = const(6) _IRQ_SCAN_DONE = const(6)
_IRQ_PERIPHERAL_CONNECT = const(7) _IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8) _IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_SERVICE_RESULT = const(9) _IRQ_GATTC_SERVICE_RESULT = const(9)
_IRQ_GATTC_SERVICE_DONE = const(10) _IRQ_GATTC_SERVICE_DONE = const(10)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11) _IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12) _IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_DESCRIPTOR_RESULT = const(13) _IRQ_GATTC_DESCRIPTOR_RESULT = const(13)
_IRQ_GATTC_DESCRIPTOR_DONE = const(14) _IRQ_GATTC_DESCRIPTOR_DONE = const(14)
_IRQ_GATTC_READ_RESULT = const(15) _IRQ_GATTC_READ_RESULT = const(15)
_IRQ_GATTC_READ_DONE = const(16) _IRQ_GATTC_READ_DONE = const(16)
_IRQ_GATTC_WRITE_DONE = const(17) _IRQ_GATTC_WRITE_DONE = const(17)
_IRQ_GATTC_NOTIFY = const(18) _IRQ_GATTC_NOTIFY = const(18)
_IRQ_GATTC_INDICATE = const(19) _IRQ_GATTC_INDICATE = const(19)
_ADV_IND = const(0x00) _ADV_IND = const(0x00)
_ADV_DIRECT_IND = const(0x01) _ADV_DIRECT_IND = const(0x01)
_ADV_SCAN_IND = const(0x02) _ADV_SCAN_IND = const(0x02)
_ADV_NONCONN_IND = const(0x03) _ADV_NONCONN_IND = const(0x03)
_UART_SERVICE_UUID = bluetooth.UUID(0x1101) _UART_SERVICE_UUID = bluetooth.UUID(0x1101)
_UART_RX_CHAR_UUID = bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E") _UART_RX_CHAR_UUID = bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E")
_UART_TX_CHAR_UUID = bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E") _UART_TX_CHAR_UUID = bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E")
class BLESimpleCentral: class BLESimpleCentral:
def __init__(self): def __init__(self):
self._ble = bluetooth.BLE() self._ble = bluetooth.BLE()
self._scan_flg = True self._scan_flg = True
self._ble.active(True) self._ble.active(True)
self._ble.irq(self._irq) self._ble.irq(self._irq)
self._reset() self._reset()
self.scan() self.scan()
def _reset(self): def _reset(self):
# Cached name and address from a successful scan. # Cached name and address from a successful scan.
self._name = None self._name = None
self._addr_type = None self._addr_type = None
self._addr = None self._addr = None
# Callbacks for completion of various operations. # Callbacks for completion of various operations.
# These reset back to None after being invoked. # These reset back to None after being invoked.
self._conn_callback = None self._conn_callback = None
self._read_callback = None self._read_callback = None
# Persistent callback for when new data is notified from the device. # Persistent callback for when new data is notified from the device.
self._notify_callback = None self._notify_callback = None
self._write_data=None self._write_data=None
# Connected device. # Connected device.
self._conn_handle = None self._conn_handle = None
self._start_handle = None self._start_handle = None
self._end_handle = None self._end_handle = None
self._tx_handle = None self._tx_handle = None
self._rx_handle = None self._rx_handle = None
def _irq(self, event, data): def _irq(self, event, data):
if event == _IRQ_SCAN_RESULT: if event == _IRQ_SCAN_RESULT:
addr_type, addr, adv_type, rssi, adv_data = data addr_type, addr, adv_type, rssi, adv_data = data
if adv_type in (_ADV_IND, _ADV_DIRECT_IND) and _UART_SERVICE_UUID in decode_services(adv_data): if adv_type in (_ADV_IND, _ADV_DIRECT_IND) and _UART_SERVICE_UUID in decode_services(adv_data):
# Found a potential device, remember it and stop scanning. # Found a potential device, remember it and stop scanning.
self._addr_type = addr_type self._addr_type = addr_type
self._addr = bytes(addr) # Note: addr buffer is owned by caller so need to copy it. self._addr = bytes(addr) # Note: addr buffer is owned by caller so need to copy it.
self._name = decode_name(adv_data) or "?" self._name = decode_name(adv_data) or "?"
if self._addr in self._info[2]: if self._addr in self._info[2]:
#self._ble.gap_scan(None) #self._ble.gap_scan(None)
return None return None
else: else:
self._info[0].append(self._name) self._info[0].append(self._name)
self._info[1].append(self._addr_type) self._info[1].append(self._addr_type)
self._info[2].append(self._addr) self._info[2].append(self._addr)
self._info[3].append(rssi) self._info[3].append(rssi)
elif event == _IRQ_SCAN_DONE: elif event == _IRQ_SCAN_DONE:
self._scan_flg = False self._scan_flg = False
elif event == _IRQ_PERIPHERAL_CONNECT: elif event == _IRQ_PERIPHERAL_CONNECT:
# Connect successful. # Connect successful.
conn_handle, addr_type, addr = data conn_handle, addr_type, addr = data
if addr_type == self._addr_type and addr == self._addr: if addr_type == self._addr_type and addr == self._addr:
self._conn_handle = conn_handle self._conn_handle = conn_handle
self._ble.gattc_discover_services(self._conn_handle) self._ble.gattc_discover_services(self._conn_handle)
elif event == _IRQ_PERIPHERAL_DISCONNECT: elif event == _IRQ_PERIPHERAL_DISCONNECT:
# Disconnect (either initiated by us or the remote end). # Disconnect (either initiated by us or the remote end).
conn_handle, _, _ = data conn_handle, _, _ = data
if conn_handle == self._conn_handle: if conn_handle == self._conn_handle:
# If it was initiated by us, it'll already be reset. # If it was initiated by us, it'll already be reset.
self._reset() self._reset()
elif event == _IRQ_GATTC_SERVICE_RESULT: elif event == _IRQ_GATTC_SERVICE_RESULT:
# Connected device returned a service. # Connected device returned a service.
conn_handle, start_handle, end_handle, uuid = data conn_handle, start_handle, end_handle, uuid = data
print("service", data) print("service", data)
if conn_handle == self._conn_handle and uuid == _UART_SERVICE_UUID: if conn_handle == self._conn_handle and uuid == _UART_SERVICE_UUID:
self._start_handle, self._end_handle = start_handle, end_handle self._start_handle, self._end_handle = start_handle, end_handle
elif event == _IRQ_GATTC_SERVICE_DONE: elif event == _IRQ_GATTC_SERVICE_DONE:
# Service query complete. # Service query complete.
if self._start_handle and self._end_handle: if self._start_handle and self._end_handle:
self._ble.gattc_discover_characteristics( self._ble.gattc_discover_characteristics(
self._conn_handle, self._start_handle, self._end_handle self._conn_handle, self._start_handle, self._end_handle
) )
else: else:
print("Failed to find uart service.") print("Failed to find uart service.")
elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT: elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
# Connected device returned a characteristic. # Connected device returned a characteristic.
conn_handle, def_handle, value_handle, properties, uuid = data conn_handle, def_handle, value_handle, properties, uuid = data
if conn_handle == self._conn_handle and uuid == _UART_RX_CHAR_UUID: if conn_handle == self._conn_handle and uuid == _UART_RX_CHAR_UUID:
self._rx_handle = value_handle self._rx_handle = value_handle
if conn_handle == self._conn_handle and uuid == _UART_TX_CHAR_UUID: if conn_handle == self._conn_handle and uuid == _UART_TX_CHAR_UUID:
self._tx_handle = value_handle self._tx_handle = value_handle
elif event == _IRQ_GATTC_CHARACTERISTIC_DONE: elif event == _IRQ_GATTC_CHARACTERISTIC_DONE:
# Characteristic query complete. # Characteristic query complete.
if self._tx_handle is not None and self._rx_handle is not None: if self._tx_handle is not None and self._rx_handle is not None:
# We've finished connecting and discovering device, fire the connect callback. # We've finished connecting and discovering device, fire the connect callback.
if self._conn_callback: if self._conn_callback:
self._conn_callback() self._conn_callback()
else: else:
print("Failed to find uart rx characteristic.") print("Failed to find uart rx characteristic.")
elif event == _IRQ_GATTC_WRITE_DONE: elif event == _IRQ_GATTC_WRITE_DONE:
conn_handle, value_handle, status = data conn_handle, value_handle, status = data
print("TX complete") print("TX complete")
elif event == _IRQ_GATTC_NOTIFY: elif event == _IRQ_GATTC_NOTIFY:
conn_handle, value_handle, notify_data = data conn_handle, value_handle, notify_data = data
if conn_handle == self._conn_handle and value_handle == self._tx_handle: if conn_handle == self._conn_handle and value_handle == self._tx_handle:
try: try:
self._write_data=bytes(notify_data).decode().strip() self._write_data=bytes(notify_data).decode().strip()
except: except:
self._write_data=bytes(notify_data) self._write_data=bytes(notify_data)
if self._notify_callback: if self._notify_callback:
self._notify_callback(self._write_data) self._notify_callback(self._write_data)
# Returns true if we've successfully connected and discovered characteristics. # Returns true if we've successfully connected and discovered characteristics.
def is_connected(self): def is_connected(self):
return (self._conn_handle is not None and self._tx_handle is not None and self._rx_handle is not None) return (self._conn_handle is not None and self._tx_handle is not None and self._rx_handle is not None)
# Find a device advertising the environmental sensor service. # Find a device advertising the environmental sensor service.
def scan(self): def scan(self):
self._info = [[],[],[],[]] self._info = [[],[],[],[]]
self._ble.gap_scan(5000, 30000, 30000) self._ble.gap_scan(5000, 30000, 30000)
while self._scan_flg: while self._scan_flg:
time.sleep_ms(10) time.sleep_ms(10)
self._scan_flg = True self._scan_flg = True
info=[] info=[]
for i in range(len(self._info[0])): for i in range(len(self._info[0])):
info.append([self._info[0][i],self._info[1][i],hexlify(self._info[2][i]).decode(),self._info[3][i]]) info.append([self._info[0][i],self._info[1][i],hexlify(self._info[2][i]).decode(),self._info[3][i]])
return info return info
# Connect to the specified device (otherwise use cached address from a scan). # Connect to the specified device (otherwise use cached address from a scan).
def connect(self, name=None,mac=None, callback=None): def connect(self, name=None,mac=None, callback=None):
if mac and unhexlify(mac) in self._info[2]: if mac and unhexlify(mac) in self._info[2]:
index=self._info[2].index(unhexlify(mac)) index=self._info[2].index(unhexlify(mac))
self._addr_type=self._info[1][index] self._addr_type=self._info[1][index]
self._addr=unhexlify(mac) self._addr=unhexlify(mac)
elif name and name in self._info[0]: elif name and name in self._info[0]:
index=self._info[0].index(name) index=self._info[0].index(name)
self._addr_type=self._info[1][index] self._addr_type=self._info[1][index]
self._addr=self._info[2][index] self._addr=self._info[2][index]
else: else:
raise ValueError("Bluetooth was not found") raise ValueError("Bluetooth was not found")
self._conn_callback = callback self._conn_callback = callback
self._ble.gap_connect(self._addr_type, self._addr) self._ble.gap_connect(self._addr_type, self._addr)
return True return True
# Disconnect from current device. # Disconnect from current device.
def disconnect(self): def disconnect(self):
if not self._conn_handle: if not self._conn_handle:
return return
self._ble.gap_disconnect(self._conn_handle) self._ble.gap_disconnect(self._conn_handle)
self._reset() self._reset()
gc.collect() gc.collect()
# Send data over the UART # Send data over the UART
def send(self, v, response=False): def send(self, v, response=False):
if not self.is_connected(): if not self.is_connected():
return return
self._ble.gattc_write(self._conn_handle, self._rx_handle, v, 1 if response else 0) self._ble.gattc_write(self._conn_handle, self._rx_handle, v, 1 if response else 0)
# Set handler for when data is received over the UART. # Set handler for when data is received over the UART.
def recv(self, callback= None): def recv(self, callback= None):
if callback: if callback:
self._notify_callback = callback self._notify_callback = callback
else: else:
write_data=self._write_data write_data=self._write_data
self._write_data=None self._write_data=None
return write_data return write_data
@property @property
def mac(self): def mac(self):
'''Get mac address''' '''Get mac address'''
return hexlify(self._ble.config('mac')[1]).decode() return hexlify(self._ble.config('mac')[1]).decode()

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -136,7 +136,7 @@ export const ce_go_hall_attachInterrupt = function (_, generator) {
else if (version == "mixgo_ce") { else if (version == "mixgo_ce") {
generator.definitions_['import_ce_go_hall_' + dropdown_mode] = 'from ce_go import hall_' + dropdown_mode; generator.definitions_['import_ce_go_hall_' + dropdown_mode] = 'from ce_go import hall_' + dropdown_mode;
}else if (version == "mixgo_mini"){ }else if (version == "mixgo_mini"){
generator.definitions_['import_mini_go_hall' + dropdown_mod] = 'from mini_go import hall_'+ dropdown_mode; generator.definitions_['import_mini_go_hall' + dropdown_mode] = 'from mini_go import hall_'+ dropdown_mode;
} }
var atta = generator.valueToCode(this, 'DO', generator.ORDER_ATOMIC); var atta = generator.valueToCode(this, 'DO', generator.ORDER_ATOMIC);
var code = 'hall_' + dropdown_mode + '.irq_cb(' + atta + ')\n' var code = 'hall_' + dropdown_mode + '.irq_cb(' + atta + ')\n'
@@ -153,7 +153,7 @@ export const ce_go_hall_initialize = function (_, generator) {
else if (version == "mixgo_ce") { else if (version == "mixgo_ce") {
generator.definitions_['import_ce_go_hall_' + dropdown_mode] = 'from ce_go import hall_' + dropdown_mode; generator.definitions_['import_ce_go_hall_' + dropdown_mode] = 'from ce_go import hall_' + dropdown_mode;
}else if (version == "mixgo_mini"){ }else if (version == "mixgo_mini"){
generator.definitions_['import_mini_go_hall' + dropdown_mod] = 'from mini_go import hall_'+ dropdown_mode; generator.definitions_['import_mini_go_hall' + dropdown_mode] = 'from mini_go import hall_'+ dropdown_mode;
} }
var num = generator.valueToCode(this, 'num', generator.ORDER_ATOMIC); var num = generator.valueToCode(this, 'num', generator.ORDER_ATOMIC);
if (args == 'all') { if (args == 'all') {
@@ -174,7 +174,7 @@ export const ce_go_hall_data = function (_, generator) {
else if (version == "mixgo_ce") { else if (version == "mixgo_ce") {
generator.definitions_['import_ce_go_hall_' + dropdown_mode] = 'from ce_go import hall_' + dropdown_mode; generator.definitions_['import_ce_go_hall_' + dropdown_mode] = 'from ce_go import hall_' + dropdown_mode;
}else if (version == "mixgo_mini"){ }else if (version == "mixgo_mini"){
generator.definitions_['import_mini_go_hall' + dropdown_mod] = 'from mini_go import hall_'+ dropdown_mode; generator.definitions_['import_mini_go_hall' + dropdown_mode] = 'from mini_go import hall_'+ dropdown_mode;
} }
var code = 'hall_' + dropdown_mode + '.' + args + ''; var code = 'hall_' + dropdown_mode + '.' + args + '';
return [code, generator.ORDER_ATOMIC]; return [code, generator.ORDER_ATOMIC];