feat(boards): micropython下添加TinyWebDB

This commit is contained in:
王立帮
2025-09-28 16:40:36 +08:00
parent 851bb60057
commit 59a1adf0b6

View File

@@ -1,5 +1,6 @@
import time import time
import usocket as socket import usocket as socket
import urequests as requests
import ustruct as struct import ustruct as struct
from machine import unique_id from machine import unique_id
from ubinascii import hexlify from ubinascii import hexlify
@@ -40,9 +41,8 @@ def image_base64(path="mixly.jpg"):
return b'data:image/jpg;base64,' + b64encode(path) return b'data:image/jpg;base64,' + b64encode(path)
def ntp(url='mixio.mixly.cn'): def ntp(url='mixio.mixly.cn'):
import urequests
try: try:
results=eval(urequests.get('http://{}/time.php'.format(url)).text) results=eval(requests.get('http://{}/time.php'.format(url)).text)
except Exception as e: except Exception as e:
raise RuntimeError("API request failed or WiFi is not connected",e) raise RuntimeError("API request failed or WiFi is not connected",e)
return results return results
@@ -341,3 +341,61 @@ class MQTTClient:
self._star_time = time.ticks_ms() self._star_time = time.ticks_ms()
self.ping() self.ping()
return self.wait_msg() return self.wait_msg()
class TinyWebDB:
def __init__(self, url, username, password):
if url[-1] != '/':
url += '/'
self._api_url = url
self._username = username
self._password = password
def update(self, key, value):
key = str(key)
value = str(value)
result = self._request("update", "tag={}&value={}".format(key, value))
if result["status"] == "error":
raise RuntimeError(result["message"])
def get(self, key):
key = str(key)
result = self._request("get", "tag={}".format(key))
if result["status"] == "error":
raise RuntimeError(result["message"])
return result["value"]
def count(self):
result = self._request("count")
if result["status"] == "error":
raise RuntimeError(result["message"])
return int(result["count"])
def search(self, count):
count = str(count)
result = self._request("search", "count={}".format(count))
if result["status"] == "error":
raise RuntimeError(result["message"])
return result["data"]
def delete(self, key):
key = str(key)
result = self._request("delete", "tag={}".format(key))
if result["status"] == "error":
raise RuntimeError(result["message"])
def _request(self, op, param=""):
data = "user={}&secret={}&action={}".format(self._username, self._password, op)
if param:
data += '&' + param
try:
headers = {
"Content-Type": "application/x-www-form-urlencoded"
}
response = requests.post(self._api_url, data=data, headers=headers)
result = {}
if response.status_code == 200:
result = response.json()
response.close()
return result
except Exception as e:
raise RuntimeError("API request failed or WiFi is not connected", e)