build(boards): 所有板卡执行 npm run build:prod

This commit is contained in:
王立帮
2025-10-24 22:43:34 +08:00
parent 17ad6d31a0
commit 00d1c46627
176 changed files with 31780 additions and 17439 deletions

View File

@@ -0,0 +1,602 @@
# SPDX-FileCopyrightText: 2009 Kazuhiko Arase
# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries
#
# SPDX-License-Identifier: MIT
# Ported from the Javascript library by Sam Curren
# QRCode for Javascript
# http://d-project.googlecode.com/svn/trunk/misc/qrcode/js/qrcode.js
#
# Minimized for CircuitPython by ladyada for adafruit industries
#
# The word "QR Code" is registered trademark of
# DENSO WAVE INCORPORATED
# http://www.denso-wave.com/qrcode/faqpatent-e.html
"""
`adafruit_miniqr`
====================================================
A non-hardware dependant miniature QR generator library. All native Python!
* Author(s): ladyada
Implementation Notes
--------------------
**Hardware:**
* Any!
**Software and Dependencies:**
* Python 3
"""
# imports
import math
try:
from typing import Dict, List, Optional, Tuple
except ImportError:
pass
__version__ = "0.0.0+auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_miniQR.git"
# Consts!
M = 0
L = 1
H = 2
Q = 3
_MODE_8BIT_BYTE = 1 << 2
_PAD0 = 0xEC
_PAD1 = 0x11
# Optimized polynomial helpers
def _glog(n: int) -> int:
"""Lookup log(n) from pre-calculated byte table"""
if n < 1:
raise ValueError("glog(" + n + ")")
return LOG_TABLE[n]
def _gexp(n: int) -> int:
"""Lookup exp(n) from pre-calculated byte table"""
while n < 0:
n += 255
while n >= 256:
n -= 255
return EXP_TABLE[n]
EXP_TABLE = b"\x01\x02\x04\x08\x10 @\x80\x1d:t\xe8\xcd\x87\x13&L\x98-Z\xb4u\xea\xc9\x8f\x03\x06\x0c\x180`\xc0\x9d'N\x9c%J\x945j\xd4\xb5w\xee\xc1\x9f#F\x8c\x05\n\x14(P\xa0]\xbai\xd2\xb9o\xde\xa1_\xbea\xc2\x99/^\xbce\xca\x89\x0f\x1e<x\xf0\xfd\xe7\xd3\xbbk\xd6\xb1\x7f\xfe\xe1\xdf\xa3[\xb6q\xe2\xd9\xafC\x86\x11\"D\x88\r\x1a4h\xd0\xbdg\xce\x81\x1f>|\xf8\xed\xc7\x93;v\xec\xc5\x973f\xcc\x85\x17.\\\xb8m\xda\xa9O\x9e!B\x84\x15*T\xa8M\x9a)R\xa4U\xaaI\x929r\xe4\xd5\xb7s\xe6\xd1\xbfc\xc6\x91?~\xfc\xe5\xd7\xb3{\xf6\xf1\xff\xe3\xdb\xabK\x961b\xc4\x957n\xdc\xa5W\xaeA\x82\x192d\xc8\x8d\x07\x0e\x1c8p\xe0\xdd\xa7S\xa6Q\xa2Y\xb2y\xf2\xf9\xef\xc3\x9b+V\xacE\x8a\t\x12$H\x90=z\xf4\xf5\xf7\xf3\xfb\xeb\xcb\x8b\x0b\x16,X\xb0}\xfa\xe9\xcf\x83\x1b6l\xd8\xadG\x8e\x01" # noqa: E501
LOG_TABLE = b"\x00\x00\x01\x19\x022\x1a\xc6\x03\xdf3\xee\x1bh\xc7K\x04d\xe0\x0e4\x8d\xef\x81\x1c\xc1i\xf8\xc8\x08Lq\x05\x8ae/\xe1$\x0f!5\x93\x8e\xda\xf0\x12\x82E\x1d\xb5\xc2}j'\xf9\xb9\xc9\x9a\txM\xe4r\xa6\x06\xbf\x8bbf\xdd0\xfd\xe2\x98%\xb3\x10\x91\"\x886\xd0\x94\xce\x8f\x96\xdb\xbd\xf1\xd2\x13\\\x838F@\x1eB\xb6\xa3\xc3H~nk:(T\xfa\x85\xba=\xca^\x9b\x9f\n\x15y+N\xd4\xe5\xacs\xf3\xa7W\x07p\xc0\xf7\x8c\x80c\rgJ\xde\xed1\xc5\xfe\x18\xe3\xa5\x99w&\xb8\xb4|\x11D\x92\xd9# \x89.7?\xd1[\x95\xbc\xcf\xcd\x90\x87\x97\xb2\xdc\xfc\xbea\xf2V\xd3\xab\x14*]\x9e\x84<9SGmA\xa2\x1f-C\xd8\xb7{\xa4v\xc4\x17I\xec\x7f\x0co\xf6l\xa1;R)\x9dU\xaa\xfb`\x86\xb1\xbb\xcc>Z\xcbY_\xb0\x9c\xa9\xa0Q\x0b\xf5\x16\xebzu,\xd7O\xae\xd5\xe9\xe6\xe7\xad\xe8t\xd6\xf4\xea\xa8PX\xaf" # noqa: E501
class QRCode:
"""The generator class for QR code matrices"""
def __init__(self, *, qr_type: Optional[int] = None, error_correct: int = L):
"""Initialize an empty QR code. You can define the `qr_type` (size)
of the code matrix, or have the libary auto-select the smallest
match. Default `error_correct` is type L (7%), but you can select M,
Q or H."""
self.type = qr_type
self.ECC = error_correct
self.matrix = None
self.module_count = 0
self.data_cache = None
self.data_list = []
def add_data(self, data: bytes) -> None:
"""Add more data to the QR code, must be bytestring stype"""
self.data_list.append(data)
datalen = sum(len(x) for x in self.data_list)
if not self.type:
for qr_type in range(1, 10):
rs_blocks = _get_rs_blocks(qr_type, self.ECC)
total_data_count = 0
for block in rs_blocks:
total_data_count += block["data"]
if total_data_count > datalen:
self.type = qr_type
break
self.data_cache = None
def make(self, *, test: bool = False, mask_pattern: int = 0) -> None:
"""Perform the actual generation of the QR matrix. To keep things
small and speedy we don't generate all 8 mask patterns and pick
the best. Instead, please pass in a desired mask_pattern, the
default mask is 0."""
self.module_count = self.type * 4 + 17
self.matrix = QRBitMatrix(self.module_count, self.module_count)
self._setup_position_probe_pattern(0, 0)
self._setup_position_probe_pattern(self.module_count - 7, 0)
self._setup_position_probe_pattern(0, self.module_count - 7)
self._setup_position_adjust_pattern()
self._setup_timing_pattern()
self._setup_type_info(test, mask_pattern)
if self.type >= 7:
self._setup_type_number(test)
if self.data_cache is None:
self.data_cache = QRCode._create_data(self.type, self.ECC, self.data_list)
self._map_data(self.data_cache, mask_pattern)
def _setup_position_probe_pattern(self, row: int, col: int) -> None:
"""Add the positition probe data pixels to the matrix"""
for r in range(-1, 8):
if row + r <= -1 or self.module_count <= row + r:
continue
for c in range(-1, 8):
if col + c <= -1 or self.module_count <= col + c:
continue
test = (
(0 <= r <= 6 and (c in (0, 6)))
or (0 <= c <= 6 and (r in (0, 6)))
or (2 <= r <= 4 and 2 <= c <= 4)
)
self.matrix[row + r, col + c] = test
def _setup_timing_pattern(self) -> None:
"""Add the timing data pixels to the matrix"""
for r in range(8, self.module_count - 8):
if self.matrix[r, 6] is not None:
continue
self.matrix[r, 6] = r % 2 == 0
for c in range(8, self.module_count - 8):
if self.matrix[6, c] is not None:
continue
self.matrix[6, c] = c % 2 == 0
def _setup_position_adjust_pattern(self) -> None:
"""Add the position adjust data pixels to the matrix"""
pos = QRUtil.get_pattern_position(self.type)
for row in pos:
for col in pos:
if self.matrix[row, col] is not None:
continue
for r in range(-2, 3):
for c in range(-2, 3):
test = abs(r) == 2 or abs(c) == 2 or (r == 0 and c == 0)
self.matrix[row + r, col + c] = test
def _setup_type_number(self, test: bool) -> None:
"""Add the type number pixels to the matrix"""
bits = QRUtil.get_BCH_type_number(self.type)
for i in range(18):
mod = not test and ((bits >> i) & 1) == 1
self.matrix[i // 3, i % 3 + self.module_count - 8 - 3] = mod
for i in range(18):
mod = not test and ((bits >> i) & 1) == 1
self.matrix[i % 3 + self.module_count - 8 - 3, i // 3] = mod
def _setup_type_info(self, test: bool, mask_pattern: int) -> None:
"""Add the type info pixels to the matrix"""
data = (self.ECC << 3) | mask_pattern
bits = QRUtil.get_BCH_type_info(data)
# // vertical
for i in range(15):
mod = not test and ((bits >> i) & 1) == 1
if i < 6:
self.matrix[i, 8] = mod
elif i < 8:
self.matrix[i + 1, 8] = mod
else:
self.matrix[self.module_count - 15 + i, 8] = mod
# // horizontal
for i in range(15):
mod = not test and ((bits >> i) & 1) == 1
if i < 8:
self.matrix[8, self.module_count - i - 1] = mod
elif i < 9:
self.matrix[8, 15 - i - 1 + 1] = mod
else:
self.matrix[8, 15 - i - 1] = mod
# // fixed module
self.matrix[self.module_count - 8, 8] = not test
def _map_data(self, data: bytes, mask_pattern: int) -> None:
"""Map the data onto the QR code"""
inc = -1
row = self.module_count - 1
bit_idx = 7
byte_idx = 0
for col in range(self.module_count - 1, 0, -2):
if col == 6:
col -= 1 # noqa: PLW2901 loop variable overwritten
while True:
for c in range(2):
if self.matrix[row, col - c] is None:
dark = False
if byte_idx < len(data):
dark = ((data[byte_idx] >> bit_idx) & 1) == 1
mask = QRUtil.get_mask(mask_pattern, row, col - c)
if mask:
dark = not dark
self.matrix[row, col - c] = dark
bit_idx -= 1
if bit_idx == -1:
byte_idx += 1
bit_idx = 7
row += inc
if row < 0 or self.module_count <= row:
row -= inc
inc = -inc
break
@staticmethod
def _create_data(qr_type: int, ecc: int, data_list: list) -> bytes:
"""Check and format data into bit buffer"""
rs_blocks = _get_rs_blocks(qr_type, ecc)
buffer = QRBitBuffer()
for data in data_list:
if isinstance(data, str):
data = str.encode(data) # noqa: PLW2901 loop variable overwritten
buffer.put(_MODE_8BIT_BYTE, 4)
buffer.put(len(data), 8)
for byte in data:
buffer.put(byte, 8)
# // calc num max data.
total_data_count = 0
for block in rs_blocks:
total_data_count += block["data"]
if buffer.get_length_bits() > total_data_count * 8:
raise RuntimeError(
"Code length overflow: %d > %d" % (buffer.get_length_bits(), total_data_count * 8)
)
# // end code
if buffer.get_length_bits() + 4 <= total_data_count * 8:
buffer.put(0, 4)
# // padding
while buffer.get_length_bits() % 8 != 0:
buffer.put_bit(False)
# // padding
while True:
if buffer.get_length_bits() >= total_data_count * 8:
break
buffer.put(_PAD0, 8)
if buffer.get_length_bits() >= total_data_count * 8:
break
buffer.put(_PAD1, 8)
return QRCode._create_bytes(buffer, rs_blocks)
@staticmethod
def _create_bytes(buffer: bytes, rs_blocks: List[Dict]) -> bytes: # noqa: PLR0912 Too many branches
"""Perform error calculation math on bit buffer"""
offset = 0
max_dc_count = 0
max_ec_count = 0
dcdata = [0] * len(rs_blocks)
ecdata = [0] * len(rs_blocks)
for r, block in enumerate(rs_blocks):
dc_count = block["data"]
ec_count = block["total"] - dc_count
max_dc_count = max(max_dc_count, dc_count)
max_ec_count = max(max_ec_count, ec_count)
dcdata[r] = [0] * dc_count
for i in range(len(dcdata[r])):
dcdata[r][i] = 0xFF & buffer.buffer[i + offset]
offset += dc_count
rs_poly = QRUtil.get_error_correct_polynomial(ec_count)
mod_poly = QRPolynomial(dcdata[r], rs_poly.get_length() - 1)
while True:
if mod_poly.get_length() - rs_poly.get_length() < 0:
break
ratio = _glog(mod_poly.get(0)) - _glog(rs_poly.get(0))
num = [0 for x in range(mod_poly.get_length())]
for i in range(mod_poly.get_length()):
num[i] = mod_poly.get(i)
for i in range(rs_poly.get_length()):
num[i] ^= _gexp(_glog(rs_poly.get(i)) + ratio)
mod_poly = QRPolynomial(num, 0)
ecdata[r] = [0 for x in range(rs_poly.get_length() - 1)]
for i in range(len(ecdata[r])):
mod_index = i + mod_poly.get_length() - len(ecdata[r])
if mod_index >= 0:
ecdata[r][i] = mod_poly.get(mod_index)
else:
ecdata[r][i] = 0
total_code_count = 0
for block in rs_blocks:
total_code_count += block["total"]
data = [None] * total_code_count
index = 0
for i in range(max_dc_count):
for r in range(len(rs_blocks)):
if i < len(dcdata[r]):
data[index] = dcdata[r][i]
index += 1
for i in range(max_ec_count):
for r in range(len(rs_blocks)):
if i < len(ecdata[r]):
data[index] = ecdata[r][i]
index += 1
return data
class QRUtil:
"""A selection of bit manipulation tools for QR generation and BCH encoding"""
PATTERN_POSITION_TABLE = [
b"",
b"\x06\x12",
b"\x06\x16",
b"\x06\x1a",
b"\x06\x1e",
b'\x06"',
b"\x06\x16&",
b"\x06\x18*",
b"\x06\x1a.",
b"\x06\x1c2",
]
G15 = 0b10100110111
G18 = 0b1111100100101
G15_MASK = 0b101010000010010
@staticmethod
def get_BCH_type_info(data: int) -> int:
"""Encode with G15 BCH mask"""
d = data << 10
while QRUtil.get_BCH_digit(d) - QRUtil.get_BCH_digit(QRUtil.G15) >= 0:
d ^= QRUtil.G15 << (QRUtil.get_BCH_digit(d) - QRUtil.get_BCH_digit(QRUtil.G15))
return ((data << 10) | d) ^ QRUtil.G15_MASK
@staticmethod
def get_BCH_type_number(data: int) -> int:
"""Encode with G18 BCH mask"""
d = data << 12
while QRUtil.get_BCH_digit(d) - QRUtil.get_BCH_digit(QRUtil.G18) >= 0:
d ^= QRUtil.G18 << (QRUtil.get_BCH_digit(d) - QRUtil.get_BCH_digit(QRUtil.G18))
return (data << 12) | d
@staticmethod
def get_BCH_digit(data: int) -> int:
"""Count digits in data"""
digit = 0
while data != 0:
digit += 1
data >>= 1
return digit
@staticmethod
def get_pattern_position(qr_type: int) -> bytes:
"""The mask pattern position array for this QR type"""
return QRUtil.PATTERN_POSITION_TABLE[qr_type - 1]
@staticmethod
def get_mask(mask: int, i: int, j: int) -> int: # noqa: PLR0911 Too many return statements
"""Perform matching calculation on two vals for given pattern mask"""
if mask == 0:
return (i + j) % 2 == 0
if mask == 1:
return i % 2 == 0
if mask == 2:
return j % 3 == 0
if mask == 3:
return (i + j) % 3 == 0
if mask == 4:
return (math.floor(i / 2) + math.floor(j / 3)) % 2 == 0
if mask == 5:
return (i * j) % 2 + (i * j) % 3 == 0
if mask == 6:
return ((i * j) % 2 + (i * j) % 3) % 2 == 0
if mask == 7:
return ((i * j) % 3 + (i + j) % 2) % 2 == 0
raise ValueError("Bad mask pattern:" + mask)
@staticmethod
def get_error_correct_polynomial(ecc_length: int) -> "QRPolynomial":
"""Generate a ecc polynomial"""
poly = QRPolynomial([1], 0)
for i in range(ecc_length):
poly = poly.multiply(QRPolynomial([1, _gexp(i)], 0))
return poly
class QRPolynomial:
"""Structure for creating and manipulating error code polynomials"""
def __init__(self, num: int, shift: int):
"""Create a QR polynomial"""
if not num:
raise ValueError(num.length + "/" + shift)
offset = 0
while offset < len(num) and num[offset] == 0:
offset += 1
self.num = [0 for x in range(len(num) - offset + shift)]
for i in range(len(num) - offset):
self.num[i] = num[i + offset]
def get(self, index: int) -> int:
"""The exponent at the index location"""
return self.num[index]
def get_length(self) -> int:
"""Length of the poly"""
return len(self.num)
def multiply(self, other_polynomial: "QRPolynomial") -> "QRPolynomial":
"""Multiply two polynomials, returns a new one"""
num = [0 for x in range(self.get_length() + other_polynomial.get_length() - 1)]
for i in range(self.get_length()):
for j in range(other_polynomial.get_length()):
num[i + j] ^= _gexp(_glog(self.get(i)) + _glog(other_polynomial.get(j)))
return QRPolynomial(num, 0)
_QRRS_BLOCK_TABLE = (
b"\x01\x1a\x10",
b"\x01\x1a\x13",
b"\x01\x1a\t",
b"\x01\x1a\r",
b"\x01,\x1c",
b'\x01,"',
b"\x01,\x10",
b"\x01,\x16",
b"\x01F,",
b"\x01F7",
b"\x02#\r",
b"\x02#\x11",
b"\x022 ",
b"\x01dP",
b"\x04\x19\t",
b"\x022\x18",
b"\x02C+",
b"\x01\x86l",
b'\x02!\x0b\x02"\x0c',
b'\x02!\x0f\x02"\x10',
b"\x04+\x1b",
b"\x02VD",
b"\x04+\x0f",
b"\x04+\x13",
b"\x041\x1f",
b"\x02bN",
b"\x04'\r\x01(\x0e",
b"\x02 \x0e\x04!\x0f",
b"\x02<&\x02='",
b"\x02ya",
b"\x04(\x0e\x02)\x0f",
b"\x04(\x12\x02)\x13",
b"\x03:$\x02;%",
b"\x02\x92t",
b"\x04$\x0c\x04%\r",
b"\x04$\x10\x04%\x11",
)
def _get_rs_blocks(qr_type: int, ecc: int) -> List[Dict]:
rs_block = _QRRS_BLOCK_TABLE[(qr_type - 1) * 4 + ecc]
length = len(rs_block) // 3
blocks = []
for i in range(length):
count = rs_block[i * 3 + 0]
total = rs_block[i * 3 + 1]
data = rs_block[i * 3 + 2]
block = {"total": total, "data": data}
for _ in range(count):
blocks.append(block)
return blocks
class QRBitMatrix:
"""A bit-packed storage class for matrices"""
def __init__(self, width: int, height: int):
self.width = width
self.height = height
if width > 60:
raise ValueError("Max 60 bits wide:", width)
self.buffer = [0] * self.height * 2
self.used = [0] * self.height * 2
def __repr__(self) -> str:
b = ""
for y in range(self.height):
for x in range(self.width):
if self[x, y]:
b += "X"
else:
b += "."
b += "\n"
return b
def __getitem__(self, key: Tuple[int, int]) -> int:
x, y = key
if y > self.width:
raise ValueError()
i = 2 * x + (y // 30)
j = y % 30
if not self.used[i] & (1 << j):
return None
return self.buffer[i] & (1 << j)
def __setitem__(self, key: Tuple[int, int], value: int) -> None:
x, y = key
if y > self.width:
raise ValueError()
i = 2 * x + (y // 30)
j = y % 30
if value:
self.buffer[i] |= 1 << j
else:
self.buffer[i] &= ~(1 << j)
self.used[i] |= 1 << j # buffer item was set
class QRBitBuffer:
"""Storage class for a length of individual bits"""
def __init__(self):
self.buffer = []
self.length = 0
def __repr__(self) -> str:
return ".".join([str(n) for n in self.buffer])
def get(self, index: int) -> int:
"""The bit value at a location"""
i = index // 8
return self.buffer[i] & (1 << (7 - index % 8))
def put(self, num: int, length: int) -> None:
"""Add a number of bits from a single integer value"""
for i in range(length):
self.put_bit(num & (1 << (length - i - 1)))
def get_length_bits(self) -> int:
"""Size of bit buffer"""
return self.length
def put_bit(self, bit: int) -> None:
"""Insert one bit at the end of the bit buffer"""
i = self.length // 8
if len(self.buffer) <= i:
self.buffer.append(0)
if bit:
self.buffer[i] |= 0x80 >> (self.length % 8)
self.length += 1

View File

@@ -0,0 +1,87 @@
# Implements the hmac module from the Python standard library.
class HMAC:
def __init__(self, key, msg=None, digestmod=None):
if not isinstance(key, (bytes, bytearray)):
raise TypeError("key: expected bytes/bytearray")
import hashlib
if digestmod is None:
# TODO: Default hash algorithm is now deprecated.
digestmod = hashlib.md5
if callable(digestmod):
# A hashlib constructor returning a new hash object.
make_hash = digestmod # A
elif isinstance(digestmod, str):
# A hash name suitable for hashlib.new().
make_hash = lambda d=b"": getattr(hashlib, digestmod)(d)
else:
# A module supporting PEP 247.
make_hash = digestmod.new # C
self._outer = make_hash()
self._inner = make_hash()
self.digest_size = getattr(self._inner, "digest_size", None)
# If the provided hash doesn't support block_size (e.g. built-in
# hashlib), 64 is the correct default for all built-in hash
# functions (md5, sha1, sha256).
self.block_size = getattr(self._inner, "block_size", 64)
# Truncate to digest_size if greater than block_size.
if len(key) > self.block_size:
key = make_hash(key).digest()
# Pad to block size.
key = key + bytes(self.block_size - len(key))
self._outer.update(bytes(x ^ 0x5C for x in key))
self._inner.update(bytes(x ^ 0x36 for x in key))
if msg is not None:
self.update(msg)
@property
def name(self):
return "hmac-" + getattr(self._inner, "name", type(self._inner).__name__)
def update(self, msg):
self._inner.update(msg)
def copy(self):
if not hasattr(self._inner, "copy"):
# Not supported for built-in hash functions.
raise NotImplementedError()
# Call __new__ directly to avoid the expensive __init__.
other = self.__class__.__new__(self.__class__)
other.block_size = self.block_size
other.digest_size = self.digest_size
other._inner = self._inner.copy()
other._outer = self._outer.copy()
return other
def _current(self):
h = self._outer
if hasattr(h, "copy"):
# built-in hash functions don't support this, and as a result,
# digest() will finalise the hmac and further calls to
# update/digest will fail.
h = h.copy()
h.update(self._inner.digest())
return h
def digest(self):
h = self._current()
return h.digest()
def hexdigest(self):
import binascii
return str(binascii.hexlify(self.digest()), "utf-8")
def new(key, msg=None, digestmod=None):
return HMAC(key, msg, digestmod)

File diff suppressed because it is too large Load Diff

View File

@@ -72,6 +72,7 @@ class MQTTClient:
port = 8883 if ssl else 1883
self.client_id = client_id
self.sock = None
self.server = server
self.addr = socket.getaddrinfo(server, port)[0][-1]
self.ssl = ssl
self.ssl_params = ssl_params
@@ -341,3 +342,6 @@ class MQTTClient:
self._star_time = time.ticks_ms()
self.ping()
return self.wait_msg()
def get_server_info(self):
return (self.server, self.username, self.password)

View File

@@ -3,14 +3,9 @@ Music buzzer
Micropython library for the Music buzzer
=======================================================
#Based on Author: qiren123(MIDI Music) 20220618
#Make changes to instantiation 20220622
#Increase level reversal selection 20220716
dahanzimin From the Mixly Team
@dahanzimin From the Mixly Team
"""
import _thread, gc
from time import sleep_ms
from machine import Pin, PWM
@@ -30,13 +25,15 @@ Letter = 'ABCDEFG#R'
class MIDI():
def __init__(self, pin, volume=100, invert=0, pa_ctrl=None):
self.reset()
self._invert=invert
self._invert = invert
self._pin = pin
self._volume = volume
self._play = False
self._over = True
self._pwm = None
self._pa_ctrl = pa_ctrl
def set_volume(self,volume):
def set_volume(self, volume):
if not 0 <= volume <= 100:
raise ValueError("Volume value is in the range: 0-100")
self._volume=volume
@@ -97,45 +94,60 @@ class MIDI():
self.set_duration(int(tone[(pos + 1):]))
tone = tone[:pos]
def play(self, tune, duration=None):
if self._pa_ctrl: self._pa_ctrl(1)
def play(self, tune, duration=None, pa_delay=100):
if self._pa_ctrl: self._pa_ctrl(1, pa_delay)
self._pwm = PWM(Pin(self._pin), duty=1023 if self._invert else 0)
self._play = True
self._over = False
if duration is None:
self.set_default(tune[0])
else:
self.set_duration(duration)
for tone in tune:
tone = tone.upper()
if not self._play:
break
if tone[0] not in Letter:
continue
midi = self.midi(tone)
self._pwm.duty(1023-self._volume) if self._invert else self._pwm.duty(self._volume)
self._pwm.freq(midi[0])
if self._play: self._pwm.duty((1023-self._volume) if self._invert else self._volume)
if self._play: self._pwm.freq(midi[0])
sleep_ms(midi[1])
self._pwm.freq(400000)
if self._play: self._pwm.freq(400000)
sleep_ms(1)
if self._pa_ctrl: self._pa_ctrl(0)
if self._pa_ctrl: self._pa_ctrl(0, 0)
self._pwm.deinit()
sleep_ms(10)
self._over = True
def pitch(self, freq):
if self._pa_ctrl: self._pa_ctrl(1)
self._pwm = PWM(Pin(self._pin))
self._pwm.duty(1023-self._volume) if self._invert else self._pwm.duty(self._volume)
self._pwm.freq(int(freq))
def play_thread(self, tune, duration=None, pa_delay=100):
self._play = False
while not self._over:
sleep_ms(10)
if not self._play:
gc.collect()
_thread.start_new_thread(self.play, (tune, duration, pa_delay))
sleep_ms(100)
def pitch_time(self, freq, delay):
if self._pa_ctrl: self._pa_ctrl(1)
def pitch(self, freq, pa_delay=100):
if self._pa_ctrl: self._pa_ctrl(1, pa_delay)
self._pwm = PWM(Pin(self._pin))
self._pwm.duty(1023-self._volume) if self._invert else self._pwm.duty(self._volume)
self._pwm.freq(int(freq))
self._pwm.duty((1023-self._volume) if self._invert else self._volume)
self._pwm.freq(int(freq))
def pitch_time(self, freq, delay, pa_delay=100):
if self._pa_ctrl: self._pa_ctrl(1, pa_delay)
self._pwm = PWM(Pin(self._pin))
self._pwm.duty((1023-self._volume) if self._invert else self._volume)
self._pwm.freq(int(freq))
sleep_ms(delay)
if self._pa_ctrl: self._pa_ctrl(0)
if self._pa_ctrl: self._pa_ctrl(0, 0)
self._pwm.deinit()
sleep_ms(10)
def stop(self):
if self._pa_ctrl: self._pa_ctrl(0)
self._play = False
if self._pa_ctrl: self._pa_ctrl(0, 0)
if self._pwm: self._pwm.deinit()
sleep_ms(10)

View File

@@ -3,12 +3,10 @@ QMI8658
Micropython library for the QMI8658(Accelerometer+Gyroscope)
=======================================================
#Preliminary composition 20220716
dahanzimin From the Mixly Team
@dahanzimin From the Mixly Team
"""
import time
from math import atan, sqrt, degrees
from micropython import const
QMI8658_REG_DEVICE_ID = const(0x00)
@@ -54,7 +52,7 @@ class QMI8658:
raise AttributeError("Cannot find a QMI8658")
self._wreg(QMI8658_REG_Ctrl9,0xA2) #做selftest提高精度
time.sleep(1)
time.sleep(0.1)
self._wreg(QMI8658_REG_Ctrl1,0x60)
self._wreg(QMI8658_REG_Ctrl7,0x03) #启动
self._wreg(QMI8658_REG_Ctrl2,(AccRange<< 4)|Acc_Gyr_Odr) #ACC-500HZ/8G
@@ -78,8 +76,9 @@ class QMI8658:
return self._rreg(QMI8658_REG_StatusInt)
def u2s(self,n):
return n if n < (1 << 15) else n - (1 << 16)
return n if n < (1 << 15) else n - (1 << 16)
@property
def getdata(self):
while self.status() == 0x81:
time.sleep(0.001)
@@ -92,12 +91,21 @@ class QMI8658:
gyr_y=float(self.u2s(_buffer[11]<<8|_buffer[10]))/self.gyr_lsb_div
gyr_z=float(self.u2s(_buffer[13]<<8|_buffer[12]))/self.gyr_lsb_div
return (acc_x,acc_y,acc_z),(gyr_x,gyr_y,gyr_z),round(tmp,2)
def accelerometer(self):
return self.getdata()[0]
def acceleration(self):
return self.getdata[0]
def strength(self):
return sqrt(self.getdata[0][0]**2+self.getdata[0][1]**2+self.getdata[0][2]**2)
def gyroscope(self):
return self.getdata()[1]
return self.getdata[1]
def temperature(self):
return self.getdata()[2]
return self.getdata[2]
def eulerangles(self,upright=False):
x,y,z = self.acceleration()
pitch = degrees(atan(z / sqrt(x ** 2 + y ** 2))) if upright else degrees(atan(y / sqrt(x ** 2 + z ** 2)))
roll = degrees(atan(x / sqrt(y ** 2 + z ** 2)))
return round(pitch,2),round(roll,2)

View File

@@ -111,5 +111,5 @@ class SPL06:
def temperature(self):
return self.getdata[1]
def altitude(self):
return self.getdata[2]
def altitude(self, reference=1013.25):
return round((pow((reference / 33.8639), 0.190255) - pow((self.getdata[0] / 33.8639), 0.190255)) / 0.000013125214, 2)

View File

@@ -0,0 +1,91 @@
import urequests as requests
def url_quote(s):
safe = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_.-"
s = str(s)
res = bytearray()
for b in s.encode('utf-8'):
if b in safe:
res.append(b)
else:
res.extend(b'%' + b'%02X' % b)
return res.decode()
class TinyWebDB:
def __init__(self, *args):
if len(args) == 1:
self.init_with_mqtt(*args)
else:
self.init_with_user(*args)
def init_with_user(self, url, username, password):
self._api_url = ""
self._username = username
self._password = password
self.set_url(url)
def init_with_mqtt(self, mqtt_client):
self._api_url = ""
url, username, password = mqtt_client.get_server_info()
self.set_url('https://{}/tinydb'.format(url))
self._username = username
self._password = password
def update(self, key, value):
key = url_quote(str(key))
value = url_quote(str(value))
result = self._request("update", "tag={}&value={}".format(key, value))
if "status" in result and result["status"] == "error":
raise RuntimeError(result["message"])
def get(self, key):
key = url_quote(str(key))
result = self._request("get", "tag={}".format(key))
if "status" in result and result["status"] == "error":
raise RuntimeError(result["message"])
return result["value"]
def count(self):
result = self._request("count")
if "status" in result and result["status"] == "error":
raise RuntimeError(result["message"])
return int(result["count"])
def search(self, no=1, count=1, tag='', dtype='both'):
no = str(no)
count = str(count)
tag = url_quote(tag)
result = self._request("search", "no={}&count={}&tag={}&type={}".format(no, count, tag, dtype))
if "status" in result and result["status"] == "error":
raise RuntimeError(result["message"])
return result["data"]
def delete(self, key):
key = url_quote(str(key))
result = self._request("delete", "tag={}".format(key))
if "status" in result and result["status"] == "error":
raise RuntimeError(result["message"])
def set_url(self, url):
if url[-1] != '/':
url += '/'
self._api_url = url
def _request(self, op, param=""):
data = "user={}&secret={}&action={}".format(self._username, self._password, op)
if param:
data += '&' + param
try:
headers = {
"Content-Type": "application/x-www-form-urlencoded"
}
response = requests.post(self._api_url, data=data, headers=headers)
result = {}
if response.status_code == 200:
result = response.json()
response.close()
return result
except Exception as e:
raise RuntimeError("API request failed or WiFi is not connected", e)

View File

@@ -163,6 +163,7 @@ class FrameBuffer_Base(FrameBuffer):
self._buffer = buf
self._way = 1
self._speed = 100
self._miniqr = None
def show(self):
print("External inheritance is required to override this method")
@@ -551,3 +552,22 @@ class FrameBuffer_Uincode(FrameBuffer_Base):
x = buffer[1][0] * size + x + space
self.show()
time.sleep_ms(speed)
def qrcode(self, data, x=None, y=None, size=None, bold=0, type=None, correct=0, color=0xffff, bg_color=0x0, sync=True):
if self._miniqr is None:
from adafruit_miniqr import QRCode
self._miniqr = QRCode
_qr = self._miniqr(qr_type=type, error_correct=correct)
_qr.add_data(str(data))
_qr.make()
if sync: self.fill(bg_color, sync=False)
size = min(self.height // _qr.matrix.height, self.width // _qr.matrix.width) if size is None else size
x = (self.width - _qr.matrix.width * size) // 2 if x is None else x
y = (self.height - _qr.matrix.height * size) // 2 if y is None else y
for j in range(_qr.matrix.height):
for i in range(_qr.matrix.width):
if _qr.matrix[i, j]:
self.fill_rect(x + i * size, y + j * size, int(size + bold), int(size + bold), color, sync=False)
del _qr
gc.collect()
if sync: self.show()

View File

@@ -0,0 +1,32 @@
from ucollections import namedtuple
URI = namedtuple('URI', ('cheme', 'netloc', 'path', 'params', 'query', 'fragment'))
def quote(string, safe='_.-~+'):
""" A simple implementation of URL quoting"""
string = string.replace(' ', '+')
result = ""
for char in string:
if ('a' <= char <= 'z') or ('A' <= char <= 'Z') or ('0' <= char<= '9') or (char in safe):
result += char
else:
result += "%{:02X}".format(ord(char))
return result
def urlencode(query, safe='_.-~+'):
"""A simple urlencode function"""
return '&'.join('{}={}'.format(quote(k, safe), quote(v, safe)) for k, v in query.items())
def urlparse(url):
"""A simple urlparse (cheme, netloc, path, params, query, fragment)"""
parts = [''] * 6
for i, sep in enumerate(['://', '#', '?', ';']):
if sep in url:
left, right = url.split(sep, 1)
parts[i], url = left, right
if '/' in url:
parts[1], parts[2] = url.split('/', 1)
parts[2] = '/' + parts[2]
else:
parts[1] = url
return URI(*parts)

View File

@@ -0,0 +1,187 @@
# Characters valid in scheme names
scheme_chars = "abcdefghijklmnopqrstuvwxyz" "ABCDEFGHIJKLMNOPQRSTUVWXYZ" "0123456789" "+-."
# XXX: Consider replacing with functools.lru_cache
MAX_CACHE_SIZE = 20
_ALWAYS_SAFE = frozenset(b"ABCDEFGHIJKLMNOPQRSTUVWXYZ" b"abcdefghijklmnopqrstuvwxyz" b"0123456789" b"_.-")
_ALWAYS_SAFE_BYTES = bytes(_ALWAYS_SAFE)
_safe_quoters = {}
def clear_cache():
"""Clear the parse cache and the quoters cache."""
_safe_quoters.clear()
_hexdig = "0123456789ABCDEFabcdef"
_hextobyte = {(a + b).encode(): bytes([int(a + b, 16)]) for a in _hexdig for b in _hexdig}
def unquote_to_bytes(string):
"""unquote_to_bytes('abc%20def') -> b'abc def'."""
# Note: strings are encoded as UTF-8. This is only an issue if it contains
# unescaped non-ASCII characters, which URIs should not.
if not string:
# Is it a string-like object?
string.split
return b""
if isinstance(string, str):
string = string.encode("utf-8")
bits = string.split(b"%")
if len(bits) == 1:
return string
res = [bits[0]]
append = res.append
for item in bits[1:]:
try:
append(_hextobyte[item[:2]])
append(item[2:])
except KeyError:
append(b"%")
append(item)
return b"".join(res)
def split_on_non_ascii(s):
"""
Splits the input string wherever a character is not ASCII (ord(c) not in 0..127).
Returns a list of substrings and the non-ASCII characters as separate elements.
"""
result = []
current = []
for c in s:
if 0 <= ord(c) <= 127:
current.append(c)
else:
if current:
result.append("".join(current))
current = []
result.append(c)
if current:
result.append("".join(current))
return result
def unquote(string, encoding="utf-8", errors="replace"):
"""Replace %xx escapes by their single-character equivalent. The optional
encoding and errors parameters specify how to decode percent-encoded
sequences into Unicode characters, as accepted by the bytes.decode()
method.
By default, percent-encoded sequences are decoded with UTF-8, and invalid
sequences are replaced by a placeholder character.
unquote('abc%20def') -> 'abc def'.
"""
if "%" not in string:
string.split
return string
if encoding is None:
encoding = "utf-8"
if errors is None:
errors = "replace"
bits = split_on_non_ascii(string)
res = []
append = res.append
for i in range(0, len(bits), 2):
append(unquote_to_bytes(bits[i]).decode(encoding, errors))
if i + 1 < len(bits):
# Append the non-ASCII part as is
append(bits[i + 1])
return "".join(res)
class Quoter:
"""A mapping from bytes (in range(0,256)) to strings.
String values are percent-encoded byte values, unless the key < 128, and
in the "safe" set (either the specified safe set, or default set).
"""
# Keeps a cache internally, using defaultdict, for efficiency (lookups
# of cached keys don't call Python code at all).
def __init__(self, safe):
"""safe: bytes object."""
self.safe = _ALWAYS_SAFE.union(safe)
self.cache = {}
def get(self, b):
try:
return self.cache[b]
except KeyError:
# Handle a cache miss. Store quoted string in cache and return.
res = chr(b) if b in self.safe else "%{:02X}".format(b)
self.cache[b] = res
return res
def quote(string, safe="/", encoding=None, errors=None):
"""quote('abc def') -> 'abc%20def'
Each part of a URL, e.g. the path info, the query, etc., has a
different set of reserved characters that must be quoted.
RFC 2396 Uniform Resource Identifiers (URI): Generic Syntax lists
the following reserved characters.
reserved = ";" | "/" | "?" | ":" | "@" | "&" | "=" | "+" |
"$" | ","
Each of these characters is reserved in some component of a URL,
but not necessarily in all of them.
By default, the quote function is intended for quoting the path
section of a URL. Thus, it will not encode '/'. This character
is reserved, but in typical usage the quote function is being
called on a path where the existing slash characters are used as
reserved characters.
string and safe may be either str or bytes objects. encoding must
not be specified if string is a str.
The optional encoding and errors parameters specify how to deal with
non-ASCII characters, as accepted by the str.encode method.
By default, encoding='utf-8' (characters are encoded with UTF-8), and
errors='strict' (unsupported characters raise a UnicodeEncodeError).
"""
if isinstance(string, str):
if not string:
return string
if encoding is None:
encoding = "utf-8"
if errors is None:
errors = "strict"
string = string.encode(encoding, errors)
else:
if encoding is not None:
raise TypeError("quote() doesn't support 'encoding' for bytes")
if errors is not None:
raise TypeError("quote() doesn't support 'errors' for bytes")
return quote_from_bytes(string, safe)
def quote_from_bytes(bs, safe="/"):
"""Like quote(), but accepts a bytes object rather than a str, and does
not perform string-to-bytes encoding. It always returns an ASCII string.
quote_from_bytes(b'abc def\x3f') -> 'abc%20def%3f'
"""
if not isinstance(bs, (bytes, bytearray)):
raise TypeError("quote_from_bytes() expected bytes")
if not bs:
return ""
if isinstance(safe, str):
# Normalize 'safe' by converting to bytes and removing non-ASCII chars
safe = safe.encode("ascii", "ignore")
else:
safe = bytes([c for c in safe if c < 128])
if not bs.rstrip(_ALWAYS_SAFE_BYTES + safe):
return bs.decode()
try:
quoter = _safe_quoters[safe]
except KeyError as e:
_safe_quoters[safe] = quoter = Quoter(safe)
res = ""
for char in bs:
res += quoter.get(char)
return res

View File

@@ -0,0 +1,229 @@
''''''
import usocket as socket
import ubinascii as binascii
import urandom as random
import ustruct as struct
import urandom as random
from ucollections import namedtuple
# Opcodes
OP_CONT = const(0x0)
OP_TEXT = const(0x1)
OP_BYTES = const(0x2)
OP_CLOSE = const(0x8)
OP_PING = const(0x9)
OP_PONG = const(0xA)
# Close codes
CLOSE_OK = const(1000)
CLOSE_GOING_AWAY = const(1001)
CLOSE_PROTOCOL_ERROR = const(1002)
CLOSE_DATA_NOT_SUPPORTED = const(1003)
CLOSE_BAD_DATA = const(1007)
CLOSE_POLICY_VIOLATION = const(1008)
CLOSE_TOO_BIG = const(1009)
CLOSE_MISSING_EXTN = const(1010)
CLOSE_BAD_CONDITION = const(1011)
URI = namedtuple('URI', ('protocol', 'hostname', 'port', 'path'))
class NoDataException(Exception):
pass
class ConnectionClosed(Exception):
pass
def urlparse(uri):
# Split protocol and the rest
protocol, rest = uri.split('://', 1)
if '/' in rest:
hostname_port, path = rest.split('/', 1)
path = '/' + path
else:
hostname_port, path = rest, ''
if ':' in hostname_port:
hostname, port = hostname_port.rsplit(':', 1)
else:
hostname, port = hostname_port, None
if port is None:
port = 443 if protocol == 'wss' else 80
return URI(protocol, hostname, port, path)
class Websocket:
"""Basis of the Websocket protocol."""
is_client = False
def __init__(self, sock):
self.sock = sock
self.open = True
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
self.close()
def settimeout(self, timeout):
self.sock.settimeout(timeout)
def read_frame(self, max_size=None):
"""Read a frame from the socket"""
# Frame header
two_bytes = self.sock.read(2)
if not two_bytes:
raise NoDataException
byte1, byte2 = struct.unpack('!BB', two_bytes)
# Byte 1: FIN(1) _(1) _(1) _(1) OPCODE(4)
fin = bool(byte1 & 0x80)
opcode = byte1 & 0x0F
# Byte 2: MASK(1) LENGTH(7)
mask = bool(byte2 & (1 << 7))
length = byte2 & 0x7F
if length == 126: # Magic number, length header is 2 bytes
(length,) = struct.unpack('!H', self.sock.read(2))
elif length == 127: # Magic number, length header is 8 bytes
(length,) = struct.unpack('!Q', self.sock.read(8))
if mask: # Mask is 4 bytes
mask_bits = self.sock.read(4)
try:
data = self.sock.read(length)
except MemoryError:
# We can't receive this many bytes, close the socket
self.close(code=CLOSE_TOO_BIG)
return True, OP_CLOSE, None
if mask:
data = bytes(b ^ mask_bits[i % 4] for i, b in enumerate(data))
return fin, opcode, data
def write_frame(self, opcode, data=b''):
"""Write a frame to the socket"""
fin = True
mask = self.is_client # messages sent by client are masked
length = len(data)
# Frame header
# Byte 1: FIN(1) _(1) _(1) _(1) OPCODE(4)
byte1 = 0x80 if fin else 0
byte1 |= opcode
# Byte 2: MASK(1) LENGTH(7)
byte2 = 0x80 if mask else 0
if length < 126: # 126 is magic value to use 2-byte length header
byte2 |= length
self.sock.write(struct.pack('!BB', byte1, byte2))
elif length < (1 << 16): # Length fits in 2-bytes
byte2 |= 126 # Magic code
self.sock.write(struct.pack('!BBH', byte1, byte2, length))
elif length < (1 << 64):
byte2 |= 127 # Magic code
self.sock.write(struct.pack('!BBQ', byte1, byte2, length))
else:
raise ValueError()
if mask: # Mask is 4 bytes
mask_bits = struct.pack('!I', random.getrandbits(32))
self.sock.write(mask_bits)
data = bytes(b ^ mask_bits[i % 4] for i, b in enumerate(data))
self.sock.write(data)
def recv(self):
"""Receive data from the websocket"""
assert self.open
while self.open:
try:
fin, opcode, data = self.read_frame()
except NoDataException:
return ''
except ValueError:
self._close()
raise ConnectionClosed()
if not fin:
raise NotImplementedError()
if opcode == OP_TEXT:
return data.decode('utf-8')
elif opcode == OP_BYTES:
return data
elif opcode == OP_CLOSE:
self._close()
return
elif opcode == OP_PONG:
# Ignore this frame, keep waiting for a data frame
continue
elif opcode == OP_PING:
# We need to send a pong frame
self.write_frame(OP_PONG, data)
# And then wait to receive
continue
elif opcode == OP_CONT:
# This is a continuation of a previous frame
raise NotImplementedError(opcode)
else:
raise ValueError(opcode)
def send(self, buf):
"""Send data to the websocket."""
assert self.open
if isinstance(buf, str):
opcode = OP_TEXT
buf = buf.encode('utf-8')
elif isinstance(buf, bytes):
opcode = OP_BYTES
else:
raise TypeError()
self.write_frame(opcode, buf)
def close(self, code=CLOSE_OK, reason=''):
"""Close the websocket."""
if not self.open:
return
buf = struct.pack('!H', code) + reason.encode('utf-8')
self.write_frame(OP_CLOSE, buf)
self._close()
def _close(self):
self.open = False
self.sock.close()
class WebsocketClient(Websocket):
is_client = True
def connect(uri, headers=None):
"""Connect a websocket."""
uri = urlparse(uri)
assert uri
sock = socket.socket()
addr = socket.getaddrinfo(uri.hostname, uri.port)
sock.connect(addr[0][4])
if uri.protocol == 'wss':
import ssl as ussl
sock = ussl.wrap_socket(sock, server_hostname=uri.hostname)
def send_header(header, *args):
sock.write(header % args + '\r\n')
# Sec-WebSocket-Key is 16 bytes of random base64 encoded
key = binascii.b2a_base64(bytes(random.getrandbits(8) for _ in range(16)))[:-1]
send_header(b'GET %s HTTP/1.1', uri.path or '/')
send_header(b'Host: %s:%s', uri.hostname, uri.port)
send_header(b'Connection: Upgrade')
send_header(b'Upgrade: websocket')
send_header(b'Sec-WebSocket-Key: %s', key)
send_header(b'Sec-WebSocket-Version: 13')
send_header(b'Origin: http://{hostname}:{port}'.format(hostname=uri.hostname, port=uri.port))
if headers: # 注入自定义头
for k, v in headers.items():
send_header((k + ": " + v).encode())
send_header(b'')
header = sock.readline()[:-2]
assert header.startswith(b'HTTP/1.1 101 '), header
# We don't (currently) need these headers
# FIXME: should we check the return key?
while header:
header = sock.readline()[:-2]
return WebsocketClient(sock)