From 59a1adf0b6df2274bef6a0f213c5e5daaa514040 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E7=AB=8B=E5=B8=AE?= <3294713004@qq.com> Date: Sun, 28 Sep 2025 16:40:36 +0800 Subject: [PATCH] =?UTF-8?q?feat(boards):=20micropython=E4=B8=8B=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0TinyWebDB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../micropython/origin/build/lib/mixiot.py | 62 ++++++++++++++++++- 1 file changed, 60 insertions(+), 2 deletions(-) diff --git a/boards/default_src/micropython/origin/build/lib/mixiot.py b/boards/default_src/micropython/origin/build/lib/mixiot.py index 5d2abcfd..36112150 100644 --- a/boards/default_src/micropython/origin/build/lib/mixiot.py +++ b/boards/default_src/micropython/origin/build/lib/mixiot.py @@ -1,5 +1,6 @@ import time import usocket as socket +import urequests as requests import ustruct as struct from machine import unique_id from ubinascii import hexlify @@ -40,9 +41,8 @@ def image_base64(path="mixly.jpg"): return b'data:image/jpg;base64,' + b64encode(path) def ntp(url='mixio.mixly.cn'): - import urequests try: - results=eval(urequests.get('http://{}/time.php'.format(url)).text) + results=eval(requests.get('http://{}/time.php'.format(url)).text) except Exception as e: raise RuntimeError("API request failed or WiFi is not connected",e) return results @@ -341,3 +341,61 @@ class MQTTClient: self._star_time = time.ticks_ms() self.ping() return self.wait_msg() + +class TinyWebDB: + def __init__(self, url, username, password): + if url[-1] != '/': + url += '/' + self._api_url = url + self._username = username + self._password = password + + def update(self, key, value): + key = str(key) + value = str(value) + result = self._request("update", "tag={}&value={}".format(key, value)) + if result["status"] == "error": + raise RuntimeError(result["message"]) + + def get(self, key): + key = str(key) + result = self._request("get", "tag={}".format(key)) + if result["status"] == "error": + raise RuntimeError(result["message"]) + return result["value"] + + def count(self): + result = self._request("count") + if result["status"] == "error": + raise RuntimeError(result["message"]) + return int(result["count"]) + + def search(self, count): + count = str(count) + result = self._request("search", "count={}".format(count)) + if result["status"] == "error": + raise RuntimeError(result["message"]) + return result["data"] + + def delete(self, key): + key = str(key) + result = self._request("delete", "tag={}".format(key)) + if result["status"] == "error": + raise RuntimeError(result["message"]) + + def _request(self, op, param=""): + data = "user={}&secret={}&action={}".format(self._username, self._password, op) + if param: + data += '&' + param + try: + headers = { + "Content-Type": "application/x-www-form-urlencoded" + } + response = requests.post(self._api_url, data=data, headers=headers) + result = {} + if response.status_code == 200: + result = response.json() + response.close() + return result + except Exception as e: + raise RuntimeError("API request failed or WiFi is not connected", e) \ No newline at end of file