更新 mini支持讯飞中英文识别大模型,sant增加image对图像处理

This commit is contained in:
dahanzimin
2025-10-27 10:32:44 +08:00
parent f9d3427b20
commit d98b9fb0ed
4 changed files with 465 additions and 425 deletions

View File

@@ -45,7 +45,6 @@ class ASR_WebSocket(Ws_Param):
"domain": "iat", "domain": "iat",
"language": "zh_cn", "language": "zh_cn",
"accent": "mandarin", "accent": "mandarin",
"vinfo": 1,
"vad_eos": 1000, "vad_eos": 1000,
"nbest": 1, "nbest": 1,
"wbest": 1, "wbest": 1,
@@ -56,7 +55,10 @@ class ASR_WebSocket(Ws_Param):
self.ws.settimeout(1000) self.ws.settimeout(1000)
def _frame(self, status, buf): def _frame(self, status, buf):
return {"status": status, "format": "audio/L16;rate=8000", "audio": str(b64encode(buf), 'utf-8'), "encoding": "raw"} if status == 0:
return {"common": {"app_id": self.APPID}, "business": self.business, "data": {"status": status, "format": "audio/L16;rate=8000", "audio": str(b64encode(buf), 'utf-8'), "encoding": "raw"}}
else:
return {"data": {"status": status, "format": "audio/L16;rate=8000", "audio": str(b64encode(buf), 'utf-8'), "encoding": "raw"}}
def on_message(self, message): def on_message(self, message):
result = "" result = ""
@@ -98,17 +100,16 @@ class ASR_WebSocket(Ws_Param):
if pace: print('=',end ="") if pace: print('=',end ="")
# 第一帧处理 # 第一帧处理
if _state == 0: if _state == 0:
d = {"common": {"app_id": self.APPID}, "business": self.business, "data": self._frame(_state, buf)} d = self._frame(_state, buf)
_state = 1 _state = 1
# 中间帧处理 # 中间帧处理
else: else:
d = {"data": self._frame(_state, buf)} d = self._frame(_state, buf)
self.ws.send(json.dumps(d)) self.ws.send(json.dumps(d))
#print("------",len(buf), time.ticks_diff(time.ticks_ms(), _star))
if time.ticks_diff(time.ticks_ms(), _star) > timeout: if time.ticks_diff(time.ticks_ms(), _star) > timeout:
raise OSError("Timeout pcm read error") raise OSError("Timeout pcm read error")
# 最后一帧处理 # 最后一帧处理
d = {"data": self._frame(2, b'\x00')} d = self._frame(2, b'\x00')
self.ws.send(json.dumps(d)) self.ws.send(json.dumps(d))
onboard_bot.pcm_en(False) #PCM关闭 onboard_bot.pcm_en(False) #PCM关闭
if pace: print(']') if pace: print(']')
@@ -122,6 +123,45 @@ class ASR_WebSocket(Ws_Param):
else: else:
print("Run error: %s" % (e)) print("Run error: %s" % (e))
#中英识别大模型
class IAT_WebSocket(ASR_WebSocket):
    """Speech-to-text client for the Chinese/English recognition large model.

    Reuses the connection handling of ASR_WebSocket and overrides only the
    request frame layout (header / parameter / payload) and the reply parser.
    """

    def __init__(self, APPID, APIKey, APISecret, url='ws://iat.xf-yun.com/v1', accent="mandarin", res_id=None):
        """Store credentials through the parent class and build the request parameters.

        accent is placed into the "accent" request field; res_id is sent in
        the header of every frame (None when not supplied).
        """
        super().__init__(APPID, APIKey, APISecret, url)
        self.res_id = res_id
        result_options = {
            "encoding": "utf8",
            "compress": "raw",
            "format": "plain"
        }
        self.business = {
            "domain": "slm",
            "language": "zh_cn",
            "accent": accent,
            "result": result_options
        }

    def _frame(self, status, buf):
        """Build one request frame carrying the base64-encoded audio in buf.

        Only the first frame (status == 0) carries the "parameter" section.
        """
        head = {"status": status, "app_id": self.APPID, "res_id": self.res_id}
        audio = {"audio": str(b64encode(buf), 'utf-8'), "sample_rate": 8000, "encoding": "raw"}
        if status != 0:
            return {"header": head, "payload": {"audio": audio}}
        return {"header": head, "parameter": {"iat": self.business}, "payload": {"audio": audio}}

    def on_message(self, message):
        """Parse one server reply.

        Returns (text, more) where text is the concatenation of every "w"
        entry found in the reply and more is False once the header status
        equals 2, True otherwise.
        Raises AttributeError when the header reports a non-zero code.
        """
        reply = json.loads(message)
        head = reply['header']
        code = head["code"]
        if code != 0:
            raise AttributeError("%s Code:%s" % (head["message"], code))
        words = []
        if "payload" in reply:
            # The recognition result is itself a base64-encoded JSON document.
            encoded = reply["payload"]["result"]["text"]
            for segment in json.loads(b64decode(encoded).decode())['ws']:
                for candidate in segment["cw"]:
                    words.append(candidate["w"])
        return "".join(words), head["status"] != 2
#大模型 #大模型
class LLM_WebSocket(Ws_Param): class LLM_WebSocket(Ws_Param):
Model_url = { Model_url = {

View File

@@ -5,18 +5,23 @@ MicroPython library for the Camera(Inherit C module)
======================================================= =======================================================
@dahanzimin From the Mixly Team @dahanzimin From the Mixly Team
""" """
import time import time
import base64
import jpeg
from _camera import * from _camera import *
from jpeg import Encoder from base64 import b64encode
from machine import SoftI2C, Pin from machine import SoftI2C, Pin
from mixgo_sant import onboard_bot from jpeg import Encoder, Decoder
class IMG:
    """Lightweight container pairing a raw pixel buffer with its geometry.

    image        -- the pixel buffer object (stored as given, not copied)
    width/height -- dimensions reported by the producer of the buffer
    pixel_format -- label stored in ``format``; defaults to "RGB565" so
                    existing three-argument callers keep the same result
    """

    def __init__(self, image, width, height, pixel_format="RGB565"):
        self.image = image
        self.width = width
        self.height = height
        # Previously hard-coded; now overridable so a producer using another
        # pixel format can label its buffer correctly.
        self.format = pixel_format
class Camera(Camera): class Camera(Camera):
def __init__(self, frame_size=FrameSize.R240X240, pixel_format=PixelFormat.RGB565, hmirror=False, vflip=False, **kwargs): def __init__(self, frame_size=FrameSize.R240X240, pixel_format=PixelFormat.RGB565, hmirror=False, vflip=False, **kwargs):
from mixgo_sant import onboard_bot
onboard_bot.cam_reset(1, 0)
onboard_bot.cam_en(1, 150) onboard_bot.cam_en(1, 150)
super().__init__(frame_size=frame_size, pixel_format=pixel_format, **kwargs) super().__init__(frame_size=frame_size, pixel_format=pixel_format, **kwargs)
self.set_hmirror(not hmirror) self.set_hmirror(not hmirror)
@@ -27,21 +32,42 @@ class Camera(Camera):
def deinit(self): def deinit(self):
super().deinit() super().deinit()
onboard_bot.cam_reset(0, 0)
onboard_bot.cam_en(0, 100) onboard_bot.cam_en(0, 100)
def snapshot(self, path=None, formats=0, quality=90, rotation=0): def snapshot(self, path=None, quality=90, rotation=0):
if formats == 0 and path is None: if path is None:
return self.capture() return self.capture()
else: else:
_encoder = Encoder(pixel_format="RGB565_BE", quality=quality, rotation=rotation, width=self.get_pixel_width(), height=self.get_pixel_height()) Image.save(self.capture(), path, quality, rotation)
_jpeg = _encoder.encode(self.capture())
del _encoder def capture(self):
if path is None: return IMG(super().capture(), self.get_pixel_width(), self.get_pixel_height())
if formats == 1:
return _jpeg class Image:
else: def save(self, img, path="mixly.jpg", quality=90, rotation=0, **kwargs):
return b'data:image/jpg;base64,' + base64.b64encode(_jpeg) _encoder = Encoder(pixel_format="RGB565_BE", quality=quality, rotation=rotation, width=img.width, height=img.height, **kwargs)
else: _jpeg = _encoder.encode(img.image)
with open(path, 'wb') as f: del _encoder
f.write(_jpeg) if isinstance(path, str):
return True with open(path, 'wb') as f:
f.write(_jpeg)
else:
return _jpeg
def open(self, path="mixly.jpg", rotation=0, **kwargs):
with open(path, "rb") as f:
_jpeg = f.read()
_decoder = Decoder(pixel_format="RGB565_BE", rotation=rotation, **kwargs)
_info = _decoder.get_img_info(_jpeg)
_image = IMG(_decoder.decode(_jpeg), _info[0], _info[1])
del _decoder
return _image
def convert(self, img, formats=0, **kwargs):
if formats == 0:
return self.save(img, None, **kwargs)
elif formats == 1:
return b'data:image/jpg;base64,' + b64encode(self.save(img, None, **kwargs))
#图像处理
Image = Image()

View File

@@ -1,18 +1,16 @@
""" """
mixgo_sant Onboard resources(v1.9) mixgo_sant Onboard resources(v2.0)
Micropython library for the mixgo_sant Onboard resources Micropython library for the mixgo_sant Onboard resources
======================================================= =======================================================
@dahanzimin From the Mixly Team @dahanzimin From the Mixly Team
""" """
import gc
import time
import math
from machine import *
from music import MIDI from music import MIDI
from ws2812x import NeoPixel from ws2812x import NeoPixel
from machine import *
import time
import gc
import st7789_cf
import math
'''RTC''' '''RTC'''
rtc_clock = RTC() rtc_clock = RTC()
@@ -33,6 +31,7 @@ except Exception as e:
print("Warning: Failed to communicate with BOT035 (Coprocessor) or", e) print("Warning: Failed to communicate with BOT035 (Coprocessor) or", e)
'''TFT/240*240''' '''TFT/240*240'''
import st7789_cf
onboard_tft = st7789_cf.ST7789(onboard_spi, 240, 240, dc_pin=45, reset=onboard_bot.tft_reset, backlight=onboard_bot.tft_brightness, font_address=0xF00000) onboard_tft = st7789_cf.ST7789(onboard_spi, 240, 240, dc_pin=45, reset=onboard_bot.tft_reset, backlight=onboard_bot.tft_brightness, font_address=0xF00000)
'''ACC-Sensor''' '''ACC-Sensor'''
@@ -83,8 +82,6 @@ onboard_rgb = NeoPixel(onboard_bot.rgb_sync, 4)
onboard_music = MIDI(46, pa_ctrl=onboard_bot.spk_en) onboard_music = MIDI(46, pa_ctrl=onboard_bot.spk_en)
'''5KEY_Sensor''' '''5KEY_Sensor'''
class KEYSensor: class KEYSensor:
def __init__(self, pin, range): def __init__(self, pin, range):
self.pin = pin self.pin = pin
@@ -122,10 +119,7 @@ class KEYSensor:
def irq(self, handler, trigger): def irq(self, handler, trigger):
Pin(self.pin, Pin.IN).irq(handler=handler, trigger=trigger) Pin(self.pin, Pin.IN).irq(handler=handler, trigger=trigger)
'''1KEY_Button''' '''1KEY_Button'''
class Button(KEYSensor): class Button(KEYSensor):
def __init__(self, pin): def __init__(self, pin):
self.pin = pin self.pin = pin
@@ -135,7 +129,6 @@ class Button(KEYSensor):
def _value(self): def _value(self):
return not self.key.value() return not self.key.value()
B1key = Button(0) B1key = Button(0)
B2key = KEYSensor(17, 0) B2key = KEYSensor(17, 0)
A1key = KEYSensor(17, 1600) A1key = KEYSensor(17, 1600)
@@ -144,8 +137,6 @@ A3key = KEYSensor(17, 550)
A4key = KEYSensor(17, 2100) A4key = KEYSensor(17, 2100)
'''2-LED''' '''2-LED'''
class LED: class LED:
def __init__(self, func): def __init__(self, func):
self._func = func self._func = func
@@ -168,10 +159,8 @@ class LED:
def getonoff(self, index): def getonoff(self, index):
return True if self.getbrightness(index) > 50 else False return True if self.getbrightness(index) > 50 else False
onboard_led = LED(onboard_bot.led_pwm) onboard_led = LED(onboard_bot.led_pwm)
class Voice_Energy: class Voice_Energy:
def read(self, samples=10): def read(self, samples=10):
values = [] values = []
@@ -180,10 +169,8 @@ class Voice_Energy:
0x08, 3)[:2], 'little')) # 在语音识别里获取 0x08, 3)[:2], 'little')) # 在语音识别里获取
return sorted(values)[samples // 2] return sorted(values)[samples // 2]
onboard_sound = Voice_Energy() onboard_sound = Voice_Energy()
class Clock: class Clock:
def __init__(self, x, y, radius, color, oled=onboard_tft): # 定义时钟中心点和半径 def __init__(self, x, y, radius, color, oled=onboard_tft): # 定义时钟中心点和半径
self.display = oled self.display = oled
@@ -254,6 +241,5 @@ class Clock:
def clear(self, color=0): # 清除 def clear(self, color=0): # 清除
self.display.ellipse(self.xc, self.yc, self.r, self.r, color, True) self.display.ellipse(self.xc, self.yc, self.r, self.r, color, True)
'''Reclaim memory''' '''Reclaim memory'''
gc.collect() gc.collect()

View File

@@ -8,24 +8,16 @@ MicroPython library for the ST7789(TFT-SPI)
import time import time
import uframebuf import uframebuf
from machine import Pin from machine import Pin
from jpeg import Decoder from camera import Image, IMG
from micropython import const from micropython import const
_CMD_SWRESET = const(0x01) _CMD_SWRESET = const(0x01)
_CMD_SLPIN = const(0x10)
_CMD_SLPOUT = const(0x11) _CMD_SLPOUT = const(0x11)
_CMD_PTLON = const(0x12)
_CMD_NORON = const(0x13)
_CMD_INVOFF = const(0x20)
_CMD_INVON = const(0x21) _CMD_INVON = const(0x21)
_CMD_DISPOFF = const(0x28)
_CMD_DISPON = const(0x29) _CMD_DISPON = const(0x29)
_CMD_CASET = const(0x2A) _CMD_CASET = const(0x2A)
_CMD_RASET = const(0x2B) _CMD_RASET = const(0x2B)
_CMD_RAMWR = const(0x2C) _CMD_RAMWR = const(0x2C)
_CMD_RAMRD = const(0x2E)
_CMD_PTLAR = const(0x30)
_CMD_VSCRDEF = const(0x33)
_CMD_COLMOD = const(0x3A) _CMD_COLMOD = const(0x3A)
_CMD_MADCTL = const(0x36) _CMD_MADCTL = const(0x36)
@@ -44,16 +36,12 @@ class ST7789(uframebuf.FrameBuffer_Uincode):
def display(self, data=None, rotation=0, sync=True): def display(self, data=None, rotation=0, sync=True):
if type(data) is str: if type(data) is str:
with open(data, "rb") as f: data = Image.open(data, rotation)
_jpeg = f.read() self._buffer[:] = data.image # 后期做图像尺寸匹配处理
_decoder = Decoder(pixel_format="RGB565_BE", rotation=rotation) if sync: self.show()
self._buffer[:] = _decoder.decode(_jpeg)
del _decoder def screenshot(self):
else: return IMG(memoryview(self._buffer), self.width, self.height)
self._buffer[:] = data # 后期做图像大小处理
if sync:
self.show()
return self._buffer
def _write(self, cmd, dat=None): def _write(self, cmd, dat=None):
self.dc.off() self.dc.off()