初始化提交

This commit is contained in:
王立帮
2024-07-19 10:16:00 +08:00
parent 4c7b571f20
commit 4a2d56dcc4
7084 changed files with 741212 additions and 63 deletions

View File

@@ -0,0 +1,919 @@
# MicroPython Human Interface Device library
from micropython import const
import struct
import bluetooth
import json
import binascii
from bluetooth import UUID
F_READ = bluetooth.FLAG_READ
F_WRITE = bluetooth.FLAG_WRITE
F_READ_WRITE = bluetooth.FLAG_READ | bluetooth.FLAG_WRITE
F_READ_NOTIFY = bluetooth.FLAG_READ | bluetooth.FLAG_NOTIFY
ATT_F_READ = 0x01
ATT_F_WRITE = 0x02
_ADV_TYPE_FLAGS = const(0x01)
_ADV_TYPE_NAME = const(0x09)
_ADV_TYPE_UUID16_COMPLETE = const(0x3)
_ADV_TYPE_UUID32_COMPLETE = const(0x5)
_ADV_TYPE_UUID128_COMPLETE = const(0x7)
_ADV_TYPE_UUID16_MORE = const(0x2)
_ADV_TYPE_UUID32_MORE = const(0x4)
_ADV_TYPE_UUID128_MORE = const(0x6)
_ADV_TYPE_APPEARANCE = const(0x19)
# IRQ peripheral role event codes
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
_IRQ_GATTS_READ_REQUEST = const(4)
_IRQ_SCAN_RESULT = const(5)
_IRQ_SCAN_DONE = const(6)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_SERVICE_RESULT = const(9)
_IRQ_GATTC_SERVICE_DONE = const(10)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_DESCRIPTOR_RESULT = const(13)
_IRQ_GATTC_DESCRIPTOR_DONE = const(14)
_IRQ_GATTC_READ_RESULT = const(15)
_IRQ_GATTC_READ_DONE = const(16)
_IRQ_GATTC_WRITE_DONE = const(17)
_IRQ_GATTC_NOTIFY = const(18)
_IRQ_GATTC_INDICATE = const(19)
_IRQ_GATTS_INDICATE_DONE = const(20)
_IRQ_MTU_EXCHANGED = const(21)
_IRQ_L2CAP_ACCEPT = const(22)
_IRQ_L2CAP_CONNECT = const(23)
_IRQ_L2CAP_DISCONNECT = const(24)
_IRQ_L2CAP_RECV = const(25)
_IRQ_L2CAP_SEND_READY = const(26)
_IRQ_CONNECTION_UPDATE = const(27)
_IRQ_ENCRYPTION_UPDATE = const(28)
_IRQ_GET_SECRET = const(29)
_IRQ_SET_SECRET = const(30)
_IRQ_PASSKEY_ACTION = const(31)
_IO_CAPABILITY_DISPLAY_ONLY = const(0)
_IO_CAPABILITY_DISPLAY_YESNO = const(1)
_IO_CAPABILITY_KEYBOARD_ONLY = const(2)
_IO_CAPABILITY_NO_INPUT_OUTPUT = const(3)
_IO_CAPABILITY_KEYBOARD_DISPLAY = const(4)
_PASSKEY_ACTION_INPUT = const(2)
_PASSKEY_ACTION_DISP = const(3)
_PASSKEY_ACTION_NUMCMP = const(4)
class Advertiser:
# Generate a payload to be passed to gap_advertise(adv_data=...).
def advertising_payload(self, limited_disc=False, br_edr=False, name=None, services=None, appearance=0):
payload = bytearray()
def _append(adv_type, value):
nonlocal payload
payload += struct.pack("BB", len(value) + 1, adv_type) + value
_append(
_ADV_TYPE_FLAGS,
struct.pack("B", (0x01 if limited_disc else 0x02) + (0x18 if br_edr else 0x04)),
)
if name:
_append(_ADV_TYPE_NAME, name)
if services:
for uuid in services:
b = bytes(uuid)
if len(b) == 2:
_append(_ADV_TYPE_UUID16_COMPLETE, b)
elif len(b) == 4:
_append(_ADV_TYPE_UUID32_COMPLETE, b)
elif len(b) == 16:
_append(_ADV_TYPE_UUID128_COMPLETE, b)
# See org.bluetooth.characteristic.gap.appearance.xml
if appearance:
_append(_ADV_TYPE_APPEARANCE, struct.pack("<h", appearance))
return payload
def decode_field(self, payload, adv_type):
i = 0
result = []
while i + 1 < len(payload):
if payload[i + 1] == adv_type:
result.append(payload[i + 2 : i + payload[i] + 1])
i += 1 + payload[i]
return result
def decode_name(self, payload):
n = self.decode_field(payload, _ADV_TYPE_NAME)
return str(n[0], "utf-8") if n else ""
def decode_services(self, payload):
services = []
for u in self.decode_field(payload, _ADV_TYPE_UUID16_COMPLETE):
services.append(bluetooth.UUID(struct.unpack("<h", u)[0]))
for u in self.decode_field(payload, _ADV_TYPE_UUID32_COMPLETE):
services.append(bluetooth.UUID(struct.unpack("<d", u)[0]))
for u in self.decode_field(payload, _ADV_TYPE_UUID128_COMPLETE):
services.append(bluetooth.UUID(u))
return services
# Init as generic HID device (960 = generic HID appearance value)
def __init__(self, ble, services=[UUID(0x1812)], appearance=const(960), name="Generic HID Device"):
self._ble = ble
self._payload = self.advertising_payload(name=name, services=services, appearance=appearance)
self.advertising = False
print("Advertiser created: ", self.decode_name(self._payload), " with services: ", self.decode_services(self._payload))
# Start advertising at 100000 interval
def start_advertising(self):
if not self.advertising:
self._ble.gap_advertise(100000, adv_data=self._payload)
print("Started advertising")
# Stop advertising by setting interval of 0
def stop_advertising(self):
if self.advertising:
self._ble.gap_advertise(0, adv_data=self._payload)
print("Stopped advertising")
# Class that represents a general HID device services
class HumanInterfaceDevice(object):
DEVICE_STOPPED = const(0)
DEVICE_IDLE = const(1)
DEVICE_ADVERTISING = const(2)
DEVICE_CONNECTED = const(3)
def __init__(self, device_name="Generic HID Device"):
self._ble = bluetooth.BLE()
self.adv = None
self.device_state = HumanInterfaceDevice.DEVICE_STOPPED
self.conn_handle = None
self.state_change_callback = None
self.io_capability = _IO_CAPABILITY_NO_INPUT_OUTPUT
self.bond = False
self.le_secure = False
print("Server created")
self.device_name = device_name
self.service_uuids = [UUID(0x180A), UUID(0x180F), UUID(0x1812)] # Service UUIDs: DIS, BAS, HIDS
self.device_appearance = 960 # Generic HID Appearance
self.battery_level = 100
self.model_number = "1"
self.serial_number = "1"
self.firmware_revision = "1"
self.hardware_revision = "1"
self.software_revision = "1"
self.manufacture_name = "Homebrew"
self.pnp_manufacturer_source = 0x01 # Bluetooth uuid list
self.pnp_manufacturer_uuid = 0xFE61 # 0xFEB2 for Microsoft, 0xFE61 for Logitech, 0xFD65 for Razer
self.pnp_product_id = 0x01 # ID 1
self.pnp_product_version = 0x0123 # Version 1.2.3
self.DIS = ( # Device Information Service description
UUID(0x180A), # Device Information
(
(UUID(0x2A24), F_READ), # Model number string
(UUID(0x2A25), F_READ), # Serial number string
(UUID(0x2A26), F_READ), # Firmware revision string
(UUID(0x2A27), F_READ), # Hardware revision string
(UUID(0x2A28), F_READ), # Software revision string
(UUID(0x2A29), F_READ), # Manufacturer name string
(UUID(0x2A50), F_READ), # PnP ID
),
)
self.BAS = ( # Battery Service description
UUID(0x180F), # Device Information
(
(UUID(0x2A19), F_READ_NOTIFY), # Battery level
),
)
self.services = [self.DIS, self.BAS] # List of service descriptions, append HIDS
self.HID_INPUT_REPORT = None
# Passkey for pairing
# Only used when io capability allows so
self.passkey = 1234
# Key store for bonding
self.keys = {}
# Load known keys
self.load_secrets()
# Interrupt request callback function
def ble_irq(self, event, data):
if event == _IRQ_CENTRAL_CONNECT: # Central connected
self.conn_handle, _, _ = data # Save the handle
print("Central connected: ", self.conn_handle)
self.set_state(HumanInterfaceDevice.DEVICE_CONNECTED) # (HIDS specification only allow one central to be connected)
elif event == _IRQ_CENTRAL_DISCONNECT: # Central disconnected
self.conn_handle = None # Discard old handle
conn_handle, addr_type, addr = data
print("Central disconnected: ", conn_handle)
self.set_state(HumanInterfaceDevice.DEVICE_IDLE)
elif event == _IRQ_MTU_EXCHANGED: # MTU was set
conn_handle, mtu = data
self._ble.config(mtu=mtu)
print("MTU exchanged: ", mtu)
elif event == _IRQ_CONNECTION_UPDATE: # Connection parameters were updated
self.conn_handle, _, _, _, _ = data # The new parameters
print("Connection update")
elif event == _IRQ_ENCRYPTION_UPDATE: # Encryption updated
conn_handle, encrypted, authenticated, bonded, key_size = data
print("encryption update", conn_handle, encrypted, authenticated, bonded, key_size)
elif event == _IRQ_PASSKEY_ACTION: # Passkey actions: accept connection or show/enter passkey
conn_handle, action, passkey = data
print("passkey action", conn_handle, action, passkey)
if action == _PASSKEY_ACTION_NUMCMP: # Do we accept this connection?
accept = False
if self.passkey_callback is not None: # Is callback function set?
accept = self.passkey_callback() # Call callback for input
self._ble.gap_passkey(conn_handle, action, accept)
elif action == _PASSKEY_ACTION_DISP: # Show our passkey
print("displaying passkey")
self._ble.gap_passkey(conn_handle, action, self.passkey)
elif action == _PASSKEY_ACTION_INPUT: # Enter passkey
print("prompting for passkey")
pk = None
if self.passkey_callback is not None: # Is callback function set?
pk = self.passkey_callback() # Call callback for input
self._ble.gap_passkey(conn_handle, action, pk)
else:
print("unknown action")
elif event == _IRQ_GATTS_INDICATE_DONE:
conn_handle, value_handle, status = data
print("gatts done: ", conn_handle)
elif event == _IRQ_SET_SECRET: # Set secret for bonding
sec_type, key, value = data
key = sec_type, bytes(key)
value = bytes(value) if value else None
print("set secret: ", key, value)
if value is None: # If value is empty, and
if key in self.keys: # If key is known then
del self.keys[key] # Forget key
self.save_secrets() # Save bonding information
return True
else:
return False
else:
self.keys[key] = value # Remember key/value
self.save_secrets() # Save bonding information
return True
elif event == _IRQ_GET_SECRET: # Get secret for bonding
sec_type, index, key = data
print("get secret: ", sec_type, index, bytes(key) if key else None)
if key is None:
i = 0
for (t, _key), value in self.keys.items():
if t == sec_type:
if i == index:
return value
i += 1
return None
else:
key = sec_type, bytes(key)
return self.keys.get(key, None)
else:
print("Unhandled IRQ event: ", event, data)
# Start the service
# Must be overwritten by subclass, and called in
# the overwritten function by using super(Subclass, self).start()
# io_capability determines whether and how passkeys are used
def start(self):
if self.device_state is HumanInterfaceDevice.DEVICE_STOPPED:
# Set interrupt request callback function
self._ble.irq(self.ble_irq)
# Turn on BLE radio
self._ble.active(1)
# Configure BLE interface
# Set GAP device name
self._ble.config(gap_name=self.device_name)
# Configure MTU
self._ble.config(mtu=23)
# Allow bonding
if self.bond: # calling this on ESP32 is unsupported
self._ble.config(bond=True)
if self.le_secure: # calling these on ESP32 is unsupported
# Require secure pairing
self._ble.config(le_secure=True)
# Require man in the middle protection
self._ble.config(mitm=True)
# Set our input/output capabilities
self._ble.config(io=self.io_capability)
self.set_state(HumanInterfaceDevice.DEVICE_IDLE)
print("BLE on")
# After registering the DIS and BAS services, write their characteristic values
# Must be overwritten by subclass, and called in
# the overwritten function by using
# super(Subclass, self).write_service_characteristics(handles)
def write_service_characteristics(self, handles):
print("Writing service characteristics")
# Get handles to service characteristics
# These correspond directly to self.DIS and sel.BAS
(h_mod, h_ser, h_fwr, h_hwr, h_swr, h_man, h_pnp) = handles[0]
(self.h_bat,) = handles[1]
def string_pack(in_str):
return struct.pack(str(len(in_str))+"s", in_str.encode('UTF-8'))
# Write service characteristics
print("Writing device information service characteristics")
self._ble.gatts_write(h_mod, string_pack(self.model_number))
self._ble.gatts_write(h_ser, string_pack(self.serial_number))
self._ble.gatts_write(h_fwr, string_pack(self.firmware_revision))
self._ble.gatts_write(h_hwr, string_pack(self.hardware_revision))
self._ble.gatts_write(h_swr, string_pack(self.software_revision))
self._ble.gatts_write(h_man, string_pack(self.manufacture_name))
# "<B" is now "<BHHH" basis https://www.bluetooth.com/wp-content/uploads/Sitecore-Media-Library/Gatt/Xml/Characteristics/org.bluetooth.characteristic.pnp_id.xml
self._ble.gatts_write(h_pnp, struct.pack("<BHHH", self.pnp_manufacturer_source, self.pnp_manufacturer_uuid, self.pnp_product_id, self.pnp_product_version))
print("Writing battery service characteristics")
# Battery level
self._ble.gatts_write(self.h_bat, struct.pack("<B", self.battery_level))
# Stop the service
def stop(self):
if self.device_state is not HumanInterfaceDevice.DEVICE_STOPPED:
if self.device_state is HumanInterfaceDevice.DEVICE_ADVERTISING:
self.adv.stop_advertising()
if self.conn_handle is not None:
self._ble.gap_disconnect(self.conn_handle)
self.conn_handle = None
self._ble.active(0)
self.set_state(HumanInterfaceDevice.DEVICE_STOPPED)
print("Server stopped")
# Load bonding keys from json file
def load_secrets(self):
try:
with open("keys.json", "r") as file:
entries = json.load(file)
for sec_type, key, value in entries:
self.keys[sec_type, binascii.a2b_base64(key)] = binascii.a2b_base64(value)
except:
print("no secrets available")
# Save bonding keys from json file
def save_secrets(self):
try:
with open("keys.json", "w") as file:
json_secrets = [
(sec_type, binascii.b2a_base64(key), binascii.b2a_base64(value))
for (sec_type, key), value in self.keys.items()
]
json.dump(json_secrets, file)
except:
print("failed to save secrets")
def is_running(self):
return self.device_state is not HumanInterfaceDevice.DEVICE_STOPPED
def is_connected(self):
return self.device_state is HumanInterfaceDevice.DEVICE_CONNECTED
def is_advertising(self):
return self.device_state is HumanInterfaceDevice.DEVICE_ADVERTISING
# Set a new state and notify the user's callback function
def set_state(self, state):
self.device_state = state
if self.state_change_callback is not None:
self.state_change_callback()
def get_state(self):
return self.device_state
def set_state_change_callback(self, callback):
self.state_change_callback = callback
def start_advertising(self):
if self.device_state is not HumanInterfaceDevice.DEVICE_STOPPED and self.device_state is not HumanInterfaceDevice.DEVICE_ADVERTISING:
self.adv.start_advertising()
self.set_state(HumanInterfaceDevice.DEVICE_ADVERTISING)
def stop_advertising(self):
if self.device_state is not HumanInterfaceDevice.DEVICE_STOPPED:
self.adv.stop_advertising()
if self.device_state is not HumanInterfaceDevice.DEVICE_CONNECTED:
self.set_state(HumanInterfaceDevice.DEVICE_IDLE)
def get_device_name(self):
return self.device_name
def get_services_uuids(self):
return self.service_uuids
def get_appearance(self):
return self.device_appearance
def get_battery_level(self):
return self.battery_level
# Sets the value for the battery level
def set_battery_level(self, level):
if level > 100:
self.battery_level = 100
elif level < 0:
self.battery_level = 0
else:
self.battery_level = level
# Set device information
# Must be called before calling Start()
# Variables must be Strings
def set_device_information(self, manufacture_name="Homebrew", model_number="1", serial_number="1"):
self.manufacture_name = manufacture_name
self.model_number = model_number
self.serial_number = serial_number
# Set device revision
# Must be called before calling Start()
# Variables must be Strings
def set_device_revision(self, firmware_revision="1", hardware_revision="1", software_revision="1"):
self.firmware_revision = firmware_revision
self.hardware_revision = hardware_revision
self.software_revision = software_revision
# Set device pnp information
# Must be called before calling Start()
# Must use the following format:
# pnp_manufacturer_source: 0x01 for manufacturers uuid from the Bluetooth uuid list OR 0x02 from the USBs id list
# pnp_manufacturer_uuid: 0xFEB2 for Microsoft, 0xFE61 for Logitech, 0xFD65 for Razer with source 0x01
# pnp_product_id: One byte, user defined
# pnp_product_version: Two bytes, user defined, format as 0xJJMN which corresponds to version JJ.M.N
def set_device_pnp_information(self, pnp_manufacturer_source=0x01, pnp_manufacturer_uuid=0xFE61, pnp_product_id=0x01, pnp_product_version=0x0123):
self.pnp_manufacturer_source = pnp_manufacturer_source
self.pnp_manufacturer_uuid = pnp_manufacturer_uuid
self.pnp_product_id = pnp_product_id
self.pnp_product_version = pnp_product_version
# Set whether to use Bluetooth bonding
def set_bonding(self, bond):
self.bond = bond
# Set whether to use LE secure pairing
def set_le_secure(self, le_secure):
self.le_secure = le_secure
# Set input/output capability of this device
def set_io_capability(self, io_capability):
self.io_capability = io_capability
# Set callback function for pairing events
# Depending on the I/O capability used, the callback function should return either a
# - boolean to accept or deny a connection, or a
# - passkey that was displayed by the main
def set_passkey_callback(self, passkey_callback):
self.passkey_callback = passkey_callback
# Set the passkey used during pairing when entering a passkey at the main
def set_passkey(self, passkey):
self.passkey = passkey
# Notifies the central by writing to the battery level handle
def notify_battery_level(self):
if self.is_connected():
print("Notify battery level: ", self.battery_level)
self._ble.gatts_notify(self.conn_handle, self.h_bat, struct.pack("<B", self.battery_level))
# Notifies the central of the HID state
# Must be overwritten by subclass
def notify_hid_report(self):
return
# Class that represents the Joystick service
class Joystick(HumanInterfaceDevice):
def __init__(self, name="Bluetooth Joystick"):
super(Joystick, self).__init__(name) # Set up the general HID services in super
self.device_appearance = 963 # Device appearance ID, 963 = joystick
self.HIDS = ( # Service description: describes the service and how we communicate
UUID(0x1812), # Human Interface Device
(
(UUID(0x2A4A), F_READ), # HID information
(UUID(0x2A4B), F_READ), # HID report map
(UUID(0x2A4C), F_WRITE), # HID control point
(UUID(0x2A4D), F_READ_NOTIFY, ((UUID(0x2908), ATT_F_READ),)), # HID report / reference
(UUID(0x2A4E), F_READ_WRITE), # HID protocol mode
),
)
# fmt: off
self.HID_INPUT_REPORT = bytes([ # Report Description: describes what we communicate
0x05, 0x01, # USAGE_PAGE (Generic Desktop)
0x09, 0x04, # USAGE (Joystick)
0xa1, 0x01, # COLLECTION (Application)
0x85, 0x01, # REPORT_ID (1)
0xa1, 0x00, # COLLECTION (Physical)
0x09, 0x30, # USAGE (X)
0x09, 0x31, # USAGE (Y)
0x15, 0x81, # LOGICAL_MINIMUM (-127)
0x25, 0x7f, # LOGICAL_MAXIMUM (127)
0x75, 0x08, # REPORT_SIZE (8)
0x95, 0x02, # REPORT_COUNT (2)
0x81, 0x02, # INPUT (Data,Var,Abs)
0x05, 0x09, # USAGE_PAGE (Button)
0x29, 0x08, # USAGE_MAXIMUM (Button 8)
0x19, 0x01, # USAGE_MINIMUM (Button 1)
0x95, 0x08, # REPORT_COUNT (8)
0x75, 0x01, # REPORT_SIZE (1)
0x25, 0x01, # LOGICAL_MAXIMUM (1)
0x15, 0x00, # LOGICAL_MINIMUM (0)
0x81, 0x02, # Input (Data, Variable, Absolute)
0xc0, # END_COLLECTION
0xc0 # END_COLLECTION
])
# fmt: on
# Define the initial joystick state
self.x = 0
self.y = 0
self.button1 = 0
self.button2 = 0
self.button3 = 0
self.button4 = 0
self.button5 = 0
self.button6 = 0
self.button7 = 0
self.button8 = 0
self.services = [self.DIS, self.BAS, self.HIDS] # List of service descriptions
# Overwrite super to register HID specific service
# Call super to register DIS and BAS services
def start(self):
super(Joystick, self).start() # Start super
print("Registering services")
# Register services and get read/write handles for all services
handles = self._ble.gatts_register_services(self.services)
# Write the values for the characteristics
self.write_service_characteristics(handles)
# Create an Advertiser
# Only advertise the top level service, i.e., the HIDS
self.adv = Advertiser(self._ble, [UUID(0x1812)], self.device_appearance, self.device_name)
print("Server started")
# Overwrite super to write HID specific characteristics
# Call super to write DIS and BAS characteristics
def write_service_characteristics(self, handles):
super(Joystick, self).write_service_characteristics(handles)
# Get the handles from the hids, the third service after DIS and BAS
# These correspond directly to self.HIDS
(h_info, h_hid, _, self.h_rep, h_d1, h_proto,) = handles[2]
# Pack the initial joystick state as described by the input report
b = self.button1 + self.button2 * 2 + self.button3 * 4 + self.button4 * 8 + self.button5 * 16 + self.button6 * 32 + self.button7 * 64 + self.button8 * 128
state = struct.pack("bbB", self.x, self.y, b)
print("Writing hid service characteristics")
# Write service characteristics
self._ble.gatts_write(h_info, b"\x01\x01\x00\x02") # HID info: ver=1.1, country=0, flags=normal
self._ble.gatts_write(h_hid, self.HID_INPUT_REPORT) # HID input report map
self._ble.gatts_write(self.h_rep, state) # HID report
self._ble.gatts_write(h_d1, struct.pack("<BB", 1, 1)) # HID reference: id=1, type=input
self._ble.gatts_write(h_proto, b"\x01") # HID protocol mode: report
# Overwrite super to notify central of a hid report
def notify_hid_report(self):
if self.is_connected():
# Pack the joystick state as described by the input report
b = self.button1 + self.button2 * 2 + self.button3 * 4 + self.button4 * 8 + self.button5 * 16 + self.button6 * 32 + self.button7 * 64 + self.button8 * 128
state = struct.pack("bbB", self.x, self.y, b)
print("Notify with report: ", struct.unpack("bbB", state))
# Notify central by writing to the report handle
self._ble.gatts_notify(self.conn_handle, self.h_rep, state)
def set_axes(self, x=0, y=0):
if x > 127:
x = 127
elif x < -127:
x = -127
if y > 127:
y = 127
elif y < -127:
y = -127
self.x = x
self.y = y
def set_buttons(self, b1=0, b2=0, b3=0, b4=0, b5=0, b6=0, b7=0, b8=0):
self.button1 = b1
self.button2 = b2
self.button3 = b3
self.button4 = b4
self.button5 = b5
self.button6 = b6
self.button7 = b7
self.button8 = b8
# Class that represents the Mouse service
class Mouse(HumanInterfaceDevice):
def __init__(self, name="Bluetooth Mouse"):
super(Mouse, self).__init__(name) # Set up the general HID services in super
self.device_appearance = 962 # Device appearance ID, 962 = mouse
self.HIDS = ( # Service description: describes the service and how we communicate
UUID(0x1812), # Human Interface Device
(
(UUID(0x2A4A), F_READ), # HID information
(UUID(0x2A4B), F_READ), # HID report map
(UUID(0x2A4C), F_WRITE), # HID control point
(UUID(0x2A4D), F_READ_NOTIFY, ((UUID(0x2908), ATT_F_READ),)), # HID report / reference
(UUID(0x2A4E), F_READ_WRITE), # HID protocol mode
),
)
# fmt: off
self.HID_INPUT_REPORT = bytes([ # Report Description: describes what we communicate
0x05, 0x01, # USAGE_PAGE (Generic Desktop)
0x09, 0x02, # USAGE (Mouse)
0xa1, 0x01, # COLLECTION (Application)
0x85, 0x01, # REPORT_ID (1)
0x09, 0x01, # USAGE (Pointer)
0xa1, 0x00, # COLLECTION (Physical)
0x05, 0x09, # Usage Page (Buttons)
0x19, 0x01, # Usage Minimum (1)
0x29, 0x03, # Usage Maximum (3)
0x15, 0x00, # Logical Minimum (0)
0x25, 0x01, # Logical Maximum (1)
0x95, 0x03, # Report Count (3)
0x75, 0x01, # Report Size (1)
0x81, 0x02, # Input(Data, Variable, Absolute); 3 button bits
0x95, 0x01, # Report Count(1)
0x75, 0x05, # Report Size(5)
0x81, 0x03, # Input(Constant); 5 bit padding
0x05, 0x01, # Usage Page (Generic Desktop)
0x09, 0x30, # Usage (X)
0x09, 0x31, # Usage (Y)
0x09, 0x38, # Usage (Wheel)
0x15, 0x81, # Logical Minimum (-127)
0x25, 0x7F, # Logical Maximum (127)
0x75, 0x08, # Report Size (8)
0x95, 0x03, # Report Count (3)
0x81, 0x06, # Input(Data, Variable, Relative); 3 position bytes (X,Y,Wheel)
0xc0, # END_COLLECTION
0xc0 # END_COLLECTION
])
# fmt: on
# Define the initial mouse state
self.x = 0
self.y = 0
self.w = 0
self.button1 = 0
self.button2 = 0
self.button3 = 0
self.services = [self.DIS, self.BAS, self.HIDS] # List of service descriptions
# Overwrite super to register HID specific service
# Call super to register DIS and BAS services
def start(self):
super(Mouse, self).start() # Start super
print("Registering services")
# Register services and get read/write handles for all services
handles = self._ble.gatts_register_services(self.services)
# Write the values for the characteristics
self.write_service_characteristics(handles)
# Create an Advertiser
# Only advertise the top level service, i.e., the HIDS
self.adv = Advertiser(self._ble, [UUID(0x1812)], self.device_appearance, self.device_name)
print("Server started")
# Overwrite super to write HID specific characteristics
# Call super to write DIS and BAS characteristics
def write_service_characteristics(self, handles):
super(Mouse, self).write_service_characteristics(handles)
# Get the handles from the hids, the third service after DIS and BAS
# These correspond directly to self.HIDS
(h_info, h_hid, _, self.h_rep, h_d1, h_proto,) = handles[2]
# Pack the initial mouse state as described by the input report
b = self.button1 + self.button2 * 2 + self.button3 * 4
state = struct.pack("Bbbb", b, self.x, self.y, self.w)
print("Writing hid service characteristics")
# Write service characteristics
self._ble.gatts_write(h_info, b"\x01\x01\x00\x02") # HID info: ver=1.1, country=0, flags=normal
self._ble.gatts_write(h_hid, self.HID_INPUT_REPORT) # HID input report map
self._ble.gatts_write(self.h_rep, state) # HID report
self._ble.gatts_write(h_d1, struct.pack("<BB", 1, 1)) # HID reference: id=1, type=input
self._ble.gatts_write(h_proto, b"\x01") # HID protocol mode: report
# Overwrite super to notify central of a hid report
def notify_hid_report(self):
if self.is_connected():
# Pack the mouse state as described by the input report
b = self.button1 + self.button2 * 2 + self.button3* 4
state = struct.pack("Bbbb", b, self.x, self.y, self.w)
print("Notify with report: ", struct.unpack("Bbbb", state))
# Notify central by writing to the report handle
self._ble.gatts_notify(self.conn_handle, self.h_rep, state)
def set_axes(self, x=0, y=0):
if x > 127:
x = 127
elif x < -127:
x = -127
if y > 127:
y = 127
elif y < -127:
y = -127
self.x = x
self.y = y
def set_wheel(self, w=0):
if w > 127:
w = 127
elif w < -127:
w = -127
self.w = w
def set_buttons(self, b1=0, b2=0, b3=0):
self.button1 = b1
self.button2 = b2
self.button3 = b3
# Class that represents the Keyboard service
class Keyboard(HumanInterfaceDevice):
def __init__(self, name="Bluetooth Keyboard"):
super(Keyboard, self).__init__(name) # Set up the general HID services in super
self.device_appearance = 961 # Device appearance ID, 961 = keyboard
self.HIDS = ( # Service description: describes the service and how we communicate
UUID(0x1812), # Human Interface Device
(
(UUID(0x2A4A), F_READ), # HID information
(UUID(0x2A4B), F_READ), # HID report map
(UUID(0x2A4C), F_WRITE), # HID control point
(UUID(0x2A4D), F_READ_NOTIFY, ((UUID(0x2908), ATT_F_READ),)), # HID report / reference
(UUID(0x2A4D), F_READ_WRITE, ((UUID(0x2908), ATT_F_READ),)), # HID report / reference
(UUID(0x2A4E), F_READ_WRITE), # HID protocol mode
),
)
# fmt: off
self.HID_INPUT_REPORT = bytes([ # Report Description: describes what we communicate
0x05, 0x01, # USAGE_PAGE (Generic Desktop)
0x09, 0x06, # USAGE (Keyboard)
0xa1, 0x01, # COLLECTION (Application)
0x85, 0x01, # REPORT_ID (1)
0x75, 0x01, # Report Size (1)
0x95, 0x08, # Report Count (8)
0x05, 0x07, # Usage Page (Key Codes)
0x19, 0xE0, # Usage Minimum (224)
0x29, 0xE7, # Usage Maximum (231)
0x15, 0x00, # Logical Minimum (0)
0x25, 0x01, # Logical Maximum (1)
0x81, 0x02, # Input (Data, Variable, Absolute); Modifier byte
0x95, 0x01, # Report Count (1)
0x75, 0x08, # Report Size (8)
0x81, 0x01, # Input (Constant); Reserved byte
0x95, 0x05, # Report Count (5)
0x75, 0x01, # Report Size (1)
0x05, 0x08, # Usage Page (LEDs)
0x19, 0x01, # Usage Minimum (1)
0x29, 0x05, # Usage Maximum (5)
0x91, 0x02, # Output (Data, Variable, Absolute); LED report
0x95, 0x01, # Report Count (1)
0x75, 0x03, # Report Size (3)
0x91, 0x01, # Output (Constant); LED report padding
0x95, 0x06, # Report Count (6)
0x75, 0x08, # Report Size (8)
0x15, 0x00, # Logical Minimum (0)
0x25, 0x65, # Logical Maximum (101)
0x05, 0x07, # Usage Page (Key Codes)
0x19, 0x00, # Usage Minimum (0)
0x29, 0x65, # Usage Maximum (101)
0x81, 0x00, # Input (Data, Array); Key array (6 bytes)
0xc0 # END_COLLECTION
])
# fmt: on
# Define the initial keyboard state
self.modifiers = 0 # 8 bits signifying Right GUI(Win/Command), Right ALT/Option, Right Shift, Right Control, Left GUI, Left ALT, Left Shift, Left Control
self.keypresses = [0x00] * 6 # 6 keys to hold
# Callback function for keyboard messages from central
self.kb_callback = None
self.services = [self.DIS, self.BAS, self.HIDS] # List of service descriptions
# Interrupt request callback function
# Overwrite super to catch keyboard report write events by the central
def ble_irq(self, event, data):
if event == _IRQ_GATTS_WRITE: # If a client has written to a characteristic or descriptor.
print("Keyboard changed by Central")
conn_handle, attr_handle = data # Get the handle to the characteristic that was written
report = self._ble.gatts_read(attr_handle) # Read the report
bytes = struct.unpack("B", report) # Unpack the report
if self.kb_callback is not None: # Call the callback function
self.kb_callback(bytes)
else: # Else let super handle the event
super(Keyboard, self).ble_irq(event, data)
# Overwrite super to register HID specific service
# Call super to register DIS and BAS services
def start(self):
super(Keyboard, self).start() # Start super
print("Registering services")
# Register services and get read/write handles for all services
handles = self._ble.gatts_register_services(self.services)
# Write the values for the characteristics
self.write_service_characteristics(handles)
# Create an Advertiser
# Only advertise the top level service, i.e., the HIDS
self.adv = Advertiser(self._ble, [UUID(0x1812)], self.device_appearance, self.device_name)
print("Server started")
# Overwrite super to write HID specific characteristics
# Call super to write DIS and BAS characteristics
def write_service_characteristics(self, handles):
super(Keyboard, self).write_service_characteristics(handles)
# Get the handles from the hids, the third service after DIS and BAS
# These correspond directly to self.HIDS
(h_info, h_hid, _, self.h_rep, h_d1, self.h_repout, h_d2, h_proto,) = handles[2]
print("Writing hid service characteristics")
# Write service characteristics
self._ble.gatts_write(h_info, b"\x01\x01\x00\x02") # HID info: ver=1.1, country=0, flags=normal
self._ble.gatts_write(h_hid, self.HID_INPUT_REPORT) # HID input report map
self._ble.gatts_write(h_d1, struct.pack("<BB", 1, 1)) # HID reference: id=1, type=input
self._ble.gatts_write(h_d2, struct.pack("<BB", 1, 2)) # HID reference: id=1, type=output
self._ble.gatts_write(h_proto, b"\x01") # HID protocol mode: report
# Overwrite super to notify central of a hid report
def notify_hid_report(self):
if self.is_connected():
# Pack the Keyboard state as described by the input report
state = struct.pack("8B", self.modifiers, 0, self.keypresses[0], self.keypresses[1], self.keypresses[2], self.keypresses[3], self.keypresses[4], self.keypresses[5])
print("Notify with report: ", struct.unpack("8B", state))
# Notify central by writing to the report handle
self._ble.gatts_notify(self.conn_handle, self.h_rep, state)
# Set the modifier bits, notify to send the modifiers to central
def set_modifiers(self, right_gui=0, right_alt=0, right_shift=0, right_control=0, left_gui=0, left_alt=0, left_shift=0, left_control=0):
self.modifiers = (right_gui << 7) + (right_alt << 6) + (right_shift << 5) + (right_control << 4) + (left_gui << 3) + (left_alt << 2) + (left_shift << 1) + left_control
# Press keys, notify to send the keys to central
# This will hold down the keys, call set_keys() without arguments and notify again to release
def set_keys(self, k0=0x00, k1=0x00, k2=0x00, k3=0x00, k4=0x00, k5=0x00):
self.keypresses = [k0, k1, k2, k3, k4, k5]
# Set a callback function that gets notified on keyboard changes
# Should take a tuple with the report bytes
def set_kb_callback(self, kb_callback):
self.kb_callback = kb_callback

View File

@@ -0,0 +1,55 @@
"""
ME G1 -MixGo ME EXT G1
MicroPython library for the ME G1 (Expansion board for MixGo ME)
=======================================================
#Preliminary composition 20230110
dahanzimin From the Mixly Team
"""
import time,gc
from machine import Pin,SoftI2C,ADC
'''i2c-extboard'''
ext_i2c=SoftI2C(scl = Pin(0), sda = Pin(1), freq = 400000)
Pin(0,Pin.OUT)
'''Atmos_Sensor'''
try :
import hp203x
ext_hp203x = hp203x.HP203X(ext_i2c)
except Exception as e:
print("Warning: Failed to communicate with HP203X or",e)
'''T&H_Sensor'''
try :
import ahtx0
ext_ahtx0 = ahtx0.AHTx0(ext_i2c)
except Exception as e:
print("Warning: Failed to communicate with AHTx0 or",e)
'''RFID_Sensor'''
try :
import rc522
ext_rc522 = rc522.RC522(ext_i2c)
except Exception as e:
print("Warning: Failed to communicate with RC522 or",e)
'''Knob_Sensor'''
def varistor():
adc = ADC(Pin(0))
adc.atten(ADC.ATTN_11DB)
time.sleep_ms(1)
values = []
for _ in range(100):
values.append(adc.read_u16())
time.sleep_us(100)
result = sum(sorted(values)[25:75])//50
Pin(0,Pin.OUT)
time.sleep_ms(1)
return max((result-10100),0)*65535//55435
'''Reclaim memory'''
gc.collect()

View File

@@ -0,0 +1,249 @@
"""
ME GO -Onboard resources
MicroPython library for the ME GO (Smart Car base for MixGo ME)
=======================================================
#Preliminary composition 20220625
dahanzimin From the Mixly Team
"""
import time, gc, math
from tm1931 import TM1931
from machine import Pin, SoftI2C, ADC
'''i2c-onboard'''
i2c = SoftI2C(scl = Pin(7), sda = Pin(6), freq = 400000)
i2c_scan = i2c.scan()
'''Version judgment'''
if 0x50 in i2c_scan:
version = 1
else:
version = 0
'''Judging the type of external motor'''
Mi2c = 0
for addr in i2c_scan:
if addr in [0x30, 0x31, 0x32, 0x33]:
Mi2c = addr
break
'''i2c-motor'''
def i2c_motor(speed):
i2c.writeto(Mi2c, b'\x00\x00' + speed.to_bytes(1, 'little') + b'\x00')
'''TM1931-Expand'''
class CAR(TM1931):
'''Infrared line patrol obstacle avoidance mode'''
CL=0 #Turn off infrared to reduce power consumption
OA=1 #Obstacle avoidance mode only
LP=2 #Line patrol mode only
LS=3 #Light seeking mode only
AS=4 #Automatic mode switching
'''TM1931 port corresponding function definition'''
OAOU=5 #obstacle avoidance
LPOU=4 #Line patrol control
LSOU=3 #Light control
WLED=12 #Headlamp port
GLED=[17,8,6,15] #Green LED port
RLED=[16,7,9,18] #Red LED port
UCOU=[1,2] #Typec external port
MOTO=[[13,14],[10,11],[1,2]] #Motor port
def __init__(self, i2c_bus):
super().__init__(i2c_bus)
self._mode = self.CL
self.atten = 0.82 if version else 1
self.adc0 = ADC(Pin(0), atten=ADC.ATTN_11DB)
self.adc1 = ADC(Pin(1), atten=ADC.ATTN_11DB)
self.adc2 = ADC(Pin(2), atten=ADC.ATTN_11DB)
self.adc3 = ADC(Pin(3), atten=ADC.ATTN_11DB)
def ir_mode(self,select=0):
'''Infrared line patrol obstacle avoidance mode'''
self._mode=select
if select==self.CL:
self.pwm(self.OAOU,0)
self.pwm(self.LPOU,0)
self.pwm(self.LSOU,0)
if select==self.OA:
self.pwm(self.OAOU,255)
self.pwm(self.LPOU,0)
self.pwm(self.LSOU,0)
if select==self.LP:
self.pwm(self.OAOU,0)
self.pwm(self.LPOU,255)
self.pwm(self.LSOU,0)
if select==self.LS:
self.pwm(self.OAOU,0)
self.pwm(self.LPOU,0)
self.pwm(self.LSOU,255)
time.sleep_ms(2)
def obstacle(self):
'''Read the obstacle avoidance sensor'''
if self._mode==self.AS:
self.pwm(self.OAOU,255)
self.pwm(self.LPOU,0)
self.pwm(self.LSOU,0)
time.sleep_ms(2)
if self._mode==self.OA or self._mode==self.AS :
return self.adc2.read_u16(),self.adc1.read_u16(),self.adc0.read_u16(),self.adc3.read_u16()
else:
raise ValueError('Mode selection error, obstacle avoidance data cannot be read')
def patrol(self):
'''Read the line patrol sensor'''
if self._mode==self.AS:
self.pwm(self.OAOU,0)
self.pwm(self.LPOU,255)
self.pwm(self.LSOU,0)
time.sleep_ms(2)
if self._mode==self.LP or self._mode==self.AS:
return self.adc3.read_u16(),self.adc2.read_u16(),self.adc1.read_u16(),self.adc0.read_u16()
else:
raise ValueError('Mode selection error, line patrol data cannot be read')
def light(self):
'''Read the light seeking sensor'''
if self._mode==self.AS:
self.pwm(self.OAOU,0)
self.pwm(self.LPOU,0)
self.pwm(self.LSOU,255)
time.sleep_ms(2)
if self._mode==self.LS or self._mode==self.AS:
return self.adc3.read_u16(),self.adc2.read_u16(),self.adc1.read_u16(),self.adc0.read_u16()
else:
raise ValueError('Mode selection error, light seeking data cannot be read')
def motor(self, index, action, speed=0):
speed = round(max(min(speed, 100), -100) * self.atten)
if action=="N":
if (index == [1, 2]) and Mi2c:
i2c_motor(0)
else:
self.pwm(index[0], 255)
self.pwm(index[1], 255)
elif action=="P":
if (index == [1, 2]) and Mi2c:
i2c_motor(0)
else:
self.pwm(index[0], 0)
self.pwm(index[1], 0)
elif action=="CW":
if (index == [1, 2]) and Mi2c:
i2c_motor(speed)
else:
if speed >= 0:
self.pwm(index[0], speed * 255 // 100)
self.pwm(index[1], 0)
else:
self.pwm(index[0], 0)
self.pwm(index[1], - speed * 255 // 100)
elif action=="CCW":
if (index == [1, 2]) and Mi2c:
i2c_motor(- speed)
else:
if speed >= 0:
self.pwm(index[0], 0)
self.pwm(index[1], speed * 255 // 100)
else:
self.pwm(index[0], - speed * 255 // 100)
self.pwm(index[1], 0)
else:
raise ValueError('Invalid input, valid are "N","P","CW","CCW"')
def move(self,action,speed=100):
if action=="N":
self.motor(self.MOTO[0],"N")
self.motor(self.MOTO[1],"N")
elif action=="P":
self.motor(self.MOTO[0],"P")
self.motor(self.MOTO[1],"P")
elif action=="F":
self.motor(self.MOTO[0],"CCW",speed)
self.motor(self.MOTO[1],"CW",speed)
elif action=="B":
self.motor(self.MOTO[0],"CW",speed)
self.motor(self.MOTO[1],"CCW",speed)
elif action=="L":
self.motor(self.MOTO[0],"CW",speed)
self.motor(self.MOTO[1],"CW",speed)
elif action=="R":
self.motor(self.MOTO[0],"CCW",speed)
self.motor(self.MOTO[1],"CCW",speed)
else:
raise ValueError('Invalid input, valid are "N","P","F","B","L","R"')
def setbrightness(self,index,val):
if not 0 <= val <= 100:
raise ValueError("Brightness must be in the range: 0-100%")
self.pwm(index,val)
def getrightness(self,index):
return self.duty(index)
def setonoff(self,index,val):
if(val == -1):
if self.getrightness(index) < 50:
self.setbrightness(index,100)
else:
self.setbrightness(index,0)
elif(val == 1):
self.setbrightness(index,100)
elif(val == 0):
self.setbrightness(index,0)
def getonoff(self,index):
return True if self.getrightness(index)>0 else False
try :
car=CAR(i2c) #Including LED,motor,patrol,obstacle
except Exception as e:
print("Warning: Failed to communicate with TM1931 (ME GO CAR) or", e)
'''2Hall_HEP'''
class HALL:
_pulse_turns=1/480 if version else 1/400 #圈数= 1/(减速比*磁极)
_pulse_distance=_pulse_turns*math.pi*4.4 #距离= 圈数*π*轮胎直径
def __init__(self, pin):
self.turns = 0
self.distance = 0 #cm
self._speed = 0 #cm/s
self._on_receive = None
self._time = time.ticks_ms()
Pin(pin, Pin.IN).irq(handler=self._receive_cb, trigger = (Pin.IRQ_RISING | Pin.IRQ_FALLING))
def _receive_cb(self, event_source):
self.turns += self._pulse_turns
self.distance += self._pulse_distance
self._speed += self._pulse_distance
if self._on_receive:
self._on_receive(round(self.turns,2),round(self.distance,2))
def irq_cb(self, callback):
self._on_receive = callback
def initial(self,turns=None,distance=None):
if not (turns is None):
self.turns = turns
if not (distance is None):
self.distance = distance
@property
def speed(self):
value=self._speed/time.ticks_diff(time.ticks_ms(), self._time)*1000 if self._speed>0 else 0
self._time = time.ticks_ms()
self._speed=0
return round(value, 2)
hall_A = HALL(20)
hall_B = HALL(21)
'''Reclaim memory'''
gc.collect()

View File

@@ -0,0 +1,250 @@
"""
MixGo CC -Onboard resources
MicroPython library for the MixGo CC -Onboard resources
=======================================================
#Preliminary composition 20221010
dahanzimin From the Mixly Team
"""
import time, gc
from machine import Pin, SoftI2C, ADC, PWM, RTC
'''i2c-onboard'''
onboard_i2c=SoftI2C(scl = Pin(7), sda = Pin(6), freq = 400000)
onboard_i2c_scan = onboard_i2c.scan()
'''Version judgment'''
if 0x73 in onboard_i2c_scan:
version=1
elif 0x72 in onboard_i2c_scan:
version=0
else:
print("Warning: Mixgo CC board is not detected, which may cause usage errors")
'''RTC'''
rtc_clock=RTC()
'''ACC-Sensor'''
try :
import mxc6655xa
onboard_acc = mxc6655xa.MXC6655XA(onboard_i2c)
except Exception as e:
print("Warning: Failed to communicate with MXC6655XA (ACC) or",e)
'''ALS_PS-Sensor'''
try :
import ltr553als
onboard_als = ltr553als.LTR_553ALS(onboard_i2c)
except Exception as e:
print("Warning: Failed to communicate with TR_553ALS (ALS&PS) or",e)
'''BPS_Sensor'''
if 0x76 in onboard_i2c_scan:
try :
import hp203x
onboard_bps = hp203x.HP203X(onboard_i2c)
except Exception as e:
print("Warning: Failed to communicate with HP203X (BPS) or",e)
if 0x77 in onboard_i2c_scan:
try :
import spl06_001
onboard_bps = spl06_001.SPL06(onboard_i2c)
except Exception as e:
print("Warning: Failed to communicate with SPL06-001 (BPS) or",e)
'''T&H_Sensor'''
if 0x38 in onboard_i2c_scan:
try :
import ahtx0
onboard_ths = ahtx0.AHTx0(onboard_i2c)
except Exception as e:
print("Warning: Failed to communicate with AHTx0 (THS) or",e)
if 0x70 in onboard_i2c_scan:
try :
import shtc3
onboard_ths = shtc3.SHTC3(onboard_i2c)
except Exception as e:
print("Warning: Failed to communicate with GXHTC3 (THS) or",e)
'''RFID_Sensor'''
try :
import rc522
onboard_rfid = rc522.RC522(onboard_i2c)
except Exception as e:
print("Warning: Failed to communicate with RC522 (RFID) or",e)
'''matrix32x12'''
try :
import matrix32x12
onboard_matrix = matrix32x12.Matrix(onboard_i2c, address=0x73 if version else 0x72)
except Exception as e:
print("Warning: Failed to communicate with Matrix32X12 or",e)
'''Magnetic'''
try :
import mmc5603
onboard_mgs = mmc5603.MMC5603(onboard_i2c)
except Exception as e:
print("Warning: Failed to communicate with MMC5603 (MGS) or",e)
'''2RGB_WS2812'''
from ws2812 import NeoPixel
onboard_rgb = NeoPixel(Pin(8), 4, ORDER=(0, 1, 2, 3))
'''1Buzzer-Music'''
from music import MIDI
onboard_music =MIDI(10)
'''MIC_Sensor'''
class MICSensor:
def __init__(self,pin):
self.adc=ADC(Pin(pin), atten=ADC.ATTN_11DB)
def read(self):
maxloudness = 0
for i in range(5):
loudness = self.sample()
if loudness > maxloudness:
maxloudness = loudness
return maxloudness
def sample(self):
values = []
for i in range(50):
val = self.adc.read_u16()
values.append(val)
return max(values) - min(values)
onboard_sound = MICSensor(pin=4 if version else 3)
'''4,5KEY_Sensor'''
class KEYSensor:
def __init__(self, pin, range):
self.adc = ADC(Pin(pin), atten=ADC.ATTN_11DB)
self.pin = pin
self.range = range
self.flag = True
def _value(self):
values = []
for _ in range(50):
values.append(self.adc.read())
time.sleep_us(2)
return (self.range - 300) < min(sorted(values)[25:]) < (self.range + 300)
def get_presses(self, delay = 1):
last_time,presses = time.time(), 0
while time.time() < last_time + delay:
time.sleep_ms(50)
if self.was_pressed():
presses += 1
return presses
def is_pressed(self):
return self._value()
def was_pressed(self):
if(self._value() != self.flag):
self.flag = self._value()
if self.flag :
return True
else:
return False
def irq(self, handler, trigger):
Pin(self.pin, Pin.IN).irq(handler = handler, trigger = trigger)
'''2,1KEY_Button'''
class Button:
def __init__(self, pin):
self.pin = Pin(pin, Pin.IN)
self.flag = True
def get_presses(self, delay = 1):
last_time,presses = time.time(), 0
while time.time() < last_time + delay:
time.sleep(0.05)
if self.was_pressed():
presses += 1
return presses
def is_pressed(self):
return not self.pin.value()
def was_pressed(self, flag = 0):
if(self.pin.value() != self.flag):
self.flag = self.pin.value()
time.sleep(0.02)
if self.flag:
return False
else:
return True
def irq(self, handler, trigger):
self.pin.irq(handler = handler, trigger = trigger)
if version==0:
B1key = Button(9)
B2key = Button(4)
A1key = KEYSensor(2,20)
A2key = KEYSensor(2,1170)
A3key = KEYSensor(2,2400)
A4key = KEYSensor(2,3610)
else:
B1key = Button(9)
B2key = KEYSensor(5,20)
A1key = KEYSensor(5,800)
A2key = KEYSensor(5,1600)
A3key = KEYSensor(5,2500)
A4key = KEYSensor(5,3500)
'''2-LED''' #Modify indexing method
class LED:
def __init__(self, pins=[]):
self._pins = pins
self._flag = [True] * len(pins)
self._brightness = [0] * len(pins)
def setbrightness(self, index, val):
if not 0 <= val <= 100:
raise ValueError("Brightness must be in the range: 0-100%")
if len(self._pins) == 0:
print("Warning: Old version, without this function")
else:
if self._flag[index-1]:
self._pins[index-1] = PWM(Pin(self._pins[index-1]), duty_u16=65535)
self._flag[index-1] = False
self._brightness[index - 1] = val
self._pins[index - 1].duty_u16(65535 - val * 65535 // 100)
def getbrightness(self, index):
if len(self._pins) == 0:
print("Warning: Old version, without this function")
else:
return self._brightness[index - 1]
def setonoff(self, index, val):
if len(self._pins) == 0:
print("Warning: Old version, without this function")
else:
if val == -1:
self.setbrightness(index, 100) if self.getbrightness(index) < 50 else self.setbrightness(index, 0)
elif val == 1:
self.setbrightness(index, 100)
elif val == 0:
self.setbrightness(index, 0)
def getonoff(self, index):
if len(self._pins) == 0:
print("Warning: Old version, without this function")
else:
return True if self.getbrightness(index) > 50 else False
#LED with function call / L1(IO20),L2(IO21)
onboard_led = LED() if version == 0 else LED(pins=[20, 21])
'''Reclaim memory'''
gc.collect()

View File

@@ -0,0 +1,192 @@
"""
MixGo ME -Onboard resources
MicroPython library for the MixGo ME -Onboard resources
=======================================================
#Preliminary composition 20221010
dahanzimin From the Mixly Team
"""
import time, gc
from machine import Pin, SoftI2C, ADC, PWM, RTC
'''i2c-onboard'''
onboard_i2c=SoftI2C(scl = Pin(7), sda = Pin(6), freq = 400000)
'''RTC'''
rtc_clock=RTC()
'''ACC-Sensor'''
try :
import mxc6655xa
onboard_acc = mxc6655xa.MXC6655XA(onboard_i2c)
except Exception as e:
print("Warning: Failed to communicate with MXC6655XA or",e)
'''ALS_PS-Sensor'''
try :
import ltr553als
onboard_als = ltr553als.LTR_553ALS(onboard_i2c)
except Exception as e:
print("Warning: Failed to communicate with LTR_553ALS or",e)
'''Matrix8x5'''
try :
import matrix8x5
onboard_matrix = matrix8x5.Matrix(8)
except Exception as e:
print("Warning: Failed to communicate with Matrix8x5 or",e)
'''Magnetic'''
try :
import mmc5603
onboard_mgs = mmc5603.MMC5603(onboard_i2c)
except Exception as e:
print("Warning: Failed to communicate with MMC5603 or",e)
'''2RGB_WS2812'''
from ws2812 import NeoPixel
onboard_rgb = NeoPixel(Pin(9), 2, ORDER=(0, 1, 2, 3), multiplex=1)
'''1Buzzer-Music'''
from music import MIDI
onboard_music =MIDI(10)
'''MIC_Sensor'''
class MICSensor:
def __init__(self):
self.adc=ADC(Pin(4), atten=ADC.ATTN_11DB)
def read(self):
maxloudness = 0
for i in range(5):
loudness = self.sample()
if loudness > maxloudness:
maxloudness = loudness
return maxloudness
def sample(self):
values = []
for i in range(50):
val = self.adc.read_u16()
values.append(val)
return max(values) - min(values)
onboard_sound = MICSensor()
'''5KEY_Sensor'''
class KEYSensor:
def __init__(self,range):
self.adc=ADC(Pin(5), atten=ADC.ATTN_11DB)
self.range=range
self.flag = True
def _value(self):
values = []
for _ in range(50):
values.append(self.adc.read())
time.sleep_us(2)
return (self.range - 300) < min(sorted(values)[25:]) < (self.range + 300)
def get_presses(self, delay = 1):
last_time,presses = time.time(), 0
while time.time() < last_time + delay:
time.sleep_ms(50)
if self.was_pressed():
presses += 1
return presses
def is_pressed(self):
return self._value()
def was_pressed(self):
if(self._value() != self.flag):
self.flag = self._value()
if self.flag :
return True
else:
return False
def irq(self, handler, trigger):
Pin(5, Pin.IN).irq(handler = handler, trigger = trigger)
B2key = KEYSensor(20)
A1key = KEYSensor(800)
A2key = KEYSensor(1600)
A3key = KEYSensor(2500)
A4key = KEYSensor(3500)
'''1KEY_Button'''
class Button:
def __init__(self, pin):
self.pin = Pin(pin, Pin.IN)
self.flag = True
def get_presses(self, delay = 1):
last_time,presses = time.time(), 0
while time.time() < last_time + delay:
time.sleep(0.05)
if self.was_pressed():
presses += 1
return presses
def is_pressed(self):
return not self.pin.value()
def was_pressed(self, flag = 0):
if(self.pin.value() != self.flag):
self.flag = self.pin.value()
time.sleep(0.02)
if self.flag:
return False
else:
return True
def irq(self, handler, trigger):
self.pin.irq(handler = handler, trigger = trigger)
B1key = Button(9)
'''2LED-Multiplex RGB'''
class LED:
def __init__(self, rgb, num=2, color=3):
self._rgb = rgb
self._col = [color] * num
self._color = ((0, 0, 0), (1, 0, 0), (0, 1, 0), (0, 0, 1), (1, 1, 0), (0, 1, 1), (1, 0, 1), (1, 1, 1))
def setbrightness(self, index, value):
self._rgb[index - 1] = (value if self._color[self._col[index-1]][0] else 0,
value if self._color[self._col[index-1]][1] else 0,
value if self._color[self._col[index-1]][2] else 0)
self._rgb.write()
def getbrightness(self, index):
color = self._rgb[index - 1]
return color[0] | color[1] | color[2]
def setonoff(self, index, value):
if value == -1:
if self.getbrightness(index) < 50:
self.setbrightness(index, 100)
else:
self.setbrightness(index, 0)
elif value == 1:
self.setbrightness(index, 100)
elif value == 0:
self.setbrightness(index, 0)
def getonoff(self, index):
return True if self.getbrightness(index) > 50 else False
def setcolor(self, index, color):
self._col[index-1] = color
def getcolor(self, index):
return self._col[index-1]
onboard_led=LED(onboard_rgb)
'''Reclaim memory'''
gc.collect()

View File

@@ -0,0 +1,254 @@
"""
MixGo CAR -Onboard resources
MicroPython library for the MixGo CAR (ESP32C3)
=======================================================
#Preliminary composition 20220804
dahanzimin From the Mixly Team
"""
import time,gc,ms32006
from machine import Pin,SoftI2C,ADC,RTC
'''RTC'''
rtc_clock=RTC()
'''i2c-onboard'''
onboard_i2c=SoftI2C(scl = Pin(7), sda = Pin(6), freq = 400000)
'''4RGB_WS2812''' #color_chase(),rainbow_cycle()方法移至类里
from ws2812 import NeoPixel
onboard_rgb = NeoPixel(Pin(8), 4, ORDER=(0, 1, 2, 3))
'''1Buzzer-Music'''
from music import MIDI
onboard_music =MIDI(5)
'''1KEY_Button'''
class Button:
def __init__(self, pin):
self.pin = Pin(pin, Pin.IN)
self.flag = True
def get_presses(self, delay = 1):
last_time, last_state, presses = time.time(), 0, 0
while time.time() < last_time + delay:
time.sleep_ms(50)
if last_state == 0 and self.pin.value() == 1:
last_state = 1
if last_state == 1 and self.pin.value() == 0:
last_state, presses = 0, presses + 1
return presses
def is_pressed(self):
return not self.pin.value()
def was_pressed(self, flag = 0):
if(self.pin.value() != self.flag):
self.flag = self.pin.value()
time.sleep(0.02)
if self.flag:
return False
else:
return True
def irq(self, handler, trigger):
self.pin.irq(handler = handler, trigger = trigger)
button = Button(9)
'''MS32006-Drive'''
class CAR:
MOTO_R =1
MOTO_R1 =0
MOTO_R2 =7
MOTO_L =2
MOTO_L1 =4
MOTO_L2 =3
def __init__(self, i2c_bus):
self.motor_a=ms32006.MS32006(i2c_bus,ms32006.ADDRESS_A)
self.motor_b=ms32006.MS32006(i2c_bus,ms32006.ADDRESS_B)
self.motor_move("P")
def motor(self,index,action,speed=0):
if action=="N":
if index==self.MOTO_R:
self.motor_a.dc_motor(ms32006.MOT_N,speed)
if index==self.MOTO_L:
self.motor_b.dc_motor(ms32006.MOT_N,speed)
elif action=="P":
if index==self.MOTO_R:
self.motor_a.dc_motor(ms32006.MOT_P,speed)
if index==self.MOTO_L:
self.motor_b.dc_motor(ms32006.MOT_P,speed)
elif action=="CW":
if index==self.MOTO_R:
self.motor_a.dc_motor(ms32006.MOT_CW,speed)
if index==self.MOTO_L:
self.motor_b.dc_motor(ms32006.MOT_CW,speed)
elif action=="CCW":
if index==self.MOTO_R:
self.motor_a.dc_motor(ms32006.MOT_CCW,speed)
if index==self.MOTO_L:
self.motor_b.dc_motor(ms32006.MOT_CCW,speed)
else:
raise ValueError('Invalid input, valid are "N","P","CW","CCW"')
def stepper(self,index,action,mot_pps,mot_step):
if action=="N":
if index==self.MOTO_R1 or index==self.MOTO_L1:
self.motor_a.close(index)
if index==self.MOTO_R2 or index==self.MOTO_L2:
self.motor_b.close(index-3)
elif action=="P":
if index==self.MOTO_R1 or index==self.MOTO_L1:
self.motor_a.stop(index)
if index==self.MOTO_R2 or index==self.MOTO_L2:
self.motor_b.stop(index-3)
elif action=="CW":
if index==self.MOTO_R1 or index==self.MOTO_L1:
self.motor_a.move(index,ms32006.MOT_CW,mot_pps,mot_step)
if index==self.MOTO_R2 or index==self.MOTO_L2:
self.motor_b.move(index-3,ms32006.MOT_CW,mot_pps,mot_step)
elif action=="CCW":
if index==self.MOTO_R1 or index==self.MOTO_L1:
self.motor_a.move(index,ms32006.MOT_CCW,mot_pps,mot_step)
if index==self.MOTO_R2 or index==self.MOTO_L2:
self.motor_b.move(index-3,ms32006.MOT_CCW,mot_pps,mot_step)
else:
raise ValueError('Invalid input, valid are "N","P","CW","CCW"')
def stepper_readwork(self,index):
if index==self.MOTO_R1 or index==self.MOTO_L1:
return self.motor_a.readwork(index)
if index==self.MOTO_R2 or index==self.MOTO_L2:
return self.motor_b.readwork(index-3)
def stepper_move(self,action,mot_pps,mot_step):
if action=="N":
self.motor_a.close(self.MOTO_R1)
self.motor_a.close(self.MOTO_R2-3)
self.motor_b.close(self.MOTO_L1)
self.motor_b.close(self.MOTO_L2-3)
elif action=="P":
self.motor_a.stop(self.MOTO_R1)
self.motor_a.stop(self.MOTO_R2-3)
self.motor_b.stop(self.MOTO_L1)
self.motor_b.stop(self.MOTO_L2-3)
elif action=="F":
self.motor_a.move(self.MOTO_R1,ms32006.MOT_CW,mot_pps,mot_step)
self.motor_a.move(self.MOTO_R2-3,ms32006.MOT_CCW,mot_pps,mot_step)
self.motor_b.move(self.MOTO_L1,ms32006.MOT_CW,mot_pps,mot_step)
self.motor_b.move(self.MOTO_L2-3,ms32006.MOT_CCW,mot_pps,mot_step)
elif action=="B":
self.motor_a.move(self.MOTO_R1,ms32006.MOT_CCW,mot_pps,mot_step)
self.motor_a.move(self.MOTO_R2-3,ms32006.MOT_CW,mot_pps,mot_step)
self.motor_b.move(self.MOTO_L1,ms32006.MOT_CCW,mot_pps,mot_step)
self.motor_b.move(self.MOTO_L2-3,ms32006.MOT_CW,mot_pps,mot_step)
elif action=="L":
self.motor_a.move(self.MOTO_R1,ms32006.MOT_CCW,mot_pps,mot_step)
self.motor_a.move(self.MOTO_R2-3,ms32006.MOT_CCW,mot_pps,mot_step)
self.motor_b.move(self.MOTO_L1,ms32006.MOT_CCW,mot_pps,mot_step)
self.motor_b.move(self.MOTO_L2-3,ms32006.MOT_CCW,mot_pps,mot_step)
elif action=="R":
self.motor_a.move(self.MOTO_R1,ms32006.MOT_CW,mot_pps,mot_step)
self.motor_a.move(self.MOTO_R2-3,ms32006.MOT_CW,mot_pps,mot_step)
self.motor_b.move(self.MOTO_L1,ms32006.MOT_CW,mot_pps,mot_step)
self.motor_b.move(self.MOTO_L2-3,ms32006.MOT_CW,mot_pps,mot_step)
else:
raise ValueError('Invalid input, valid are "N","P","F","B","L","R"')
def motor_move(self,action,speed=100):
if action=="N":
self.motor_a.dc_motor(ms32006.MOT_N,speed)
self.motor_b.dc_motor(ms32006.MOT_N,speed)
elif action=="P":
self.motor_a.dc_motor(ms32006.MOT_P,speed)
self.motor_b.dc_motor(ms32006.MOT_P,speed)
elif action=="F":
self.motor_a.dc_motor(ms32006.MOT_CCW,speed)
self.motor_b.dc_motor(ms32006.MOT_CW,speed)
elif action=="B":
self.motor_a.dc_motor(ms32006.MOT_CW,speed)
self.motor_b.dc_motor(ms32006.MOT_CCW,speed)
elif action=="L":
self.motor_a.dc_motor(ms32006.MOT_CCW,speed)
self.motor_b.dc_motor(ms32006.MOT_CCW,speed)
elif action=="R":
self.motor_a.dc_motor(ms32006.MOT_CW,speed)
self.motor_b.dc_motor(ms32006.MOT_CW,speed)
else:
raise ValueError('Invalid input, valid are "N","P","F","B","L","R"')
try :
car=CAR(onboard_i2c) #Including LED,motor,patrol,obstacle
except Exception as e:
print("Warning: Failed to communicate with MS32006 or",e)
'''IRtube-Drive'''
class IRtube:
OA=1 #Obstacle avoidance mode only
LP=2 #Line patrol mode only
AS=3 #Automatic mode switching
def __init__(self):
#auto是否手动切换模拟开关转换
self.adc0 = ADC(Pin(0))
self.adc1 = ADC(Pin(1))
self.adc2 = ADC(Pin(3))
self.adc3 = ADC(Pin(4))
self.adc0.atten(ADC.ATTN_11DB)
self.adc1.atten(ADC.ATTN_11DB)
self.adc2.atten(ADC.ATTN_11DB)
self.adc3.atten(ADC.ATTN_11DB)
self.convert=Pin(2, Pin.OUT)
self._mode=self.AS
def read_bat(self):
#读电池电量,返回电压值
self.convert = ADC(Pin(2))
self.convert.atten(ADC.ATTN_11DB)
time.sleep_ms(5)
bat_adc=self.convert.read()*0.0011
time.sleep_ms(5)
self.convert=Pin(2, Pin.OUT)
return bat_adc
def obstacle(self):
#读避障传感器,返回前左、右后左、右ADC值
if self._mode==self.AS:
self.convert.value(0)
time.sleep_ms(2)
if self._mode==self.OA or self._mode==self.AS :
return self.adc1.read_u16(),self.adc2.read_u16(),self.adc3.read_u16(),self.adc0.read_u16()
else:
raise ValueError('In line patrol mode, obstacle avoidance data cannot be read')
def patrol(self):
#读巡线传感器,返回左、中左、中右、右ADC值
if self._mode==self.AS:
self.convert.value(1)
time.sleep_ms(2)
if self._mode==self.LP or self._mode==self.AS:
return self.adc0.read_u16(),self.adc1.read_u16(),self.adc2.read_u16(),self.adc3.read_u16()
else:
raise ValueError('In obstacle avoidance mode, line patrol data cannot be read')
def ir_mode(self,select=0):
#切换模式
self._mode=select
if select==self.OA:
self.convert.value(0)
if select==self.LP:
self.convert.value(1)
time.sleep_ms(2)
onboard_info = IRtube()
'''Reclaim memory'''
gc.collect()