97 lines
3.0 KiB
Python
97 lines
3.0 KiB
Python
"""
|
|
AGS10
|
|
|
|
Micropython library for the AGS10(TVOC)
|
|
=======================================================
|
|
@dahanzimin From the Mixly Team
|
|
"""
|
|
import time
|
|
from micropython import const
|
|
|
|
_AGS10_ADC = const(0x00)
|
|
_AGS10_CAL = const(0x01)
|
|
_AGS10_RES = const(0x20)
|
|
_AGS10_ADD = const(0x21)
|
|
|
|
class AGS10:
|
|
def __init__(self, i2c_bus, address=0x1A, delay=1000):
|
|
self._i2c = i2c_bus
|
|
self._addr = address
|
|
self._voc = None
|
|
self._delay = delay
|
|
self._star = 0
|
|
|
|
def _crc8(self, buf, is_byte=False):
|
|
'''Perform CRC check on the data'''
|
|
crc = 0xff
|
|
for byte in buf:
|
|
crc ^= byte
|
|
for _ in range(8):
|
|
if crc & 0x80:
|
|
crc = (crc << 1) ^ 0x31
|
|
else:
|
|
crc = crc << 1
|
|
return crc.to_bytes(1, 'little') if is_byte else crc & 0xff
|
|
|
|
def _wreg(self, reg, buf):
|
|
'''Write memory address'''
|
|
self._i2c.writeto_mem(self._addr, reg, buf)
|
|
time.sleep_ms(20)
|
|
|
|
def _rreg(self, reg, nbytes=5):
|
|
'''Read memory address'''
|
|
_buf = self._i2c.readfrom_mem(self._addr, reg, nbytes)
|
|
time.sleep_ms(20)
|
|
return _buf[:4] if self._crc8(_buf[:4]) == _buf[4] else None
|
|
|
|
def address(self, addr=None):
|
|
_scan = self._i2c.scan()
|
|
if addr is None:
|
|
if self._addr in _scan:
|
|
return self._addr
|
|
elif len(_scan) == 1:
|
|
self._addr = _scan[0]
|
|
return self._addr
|
|
else:
|
|
return None
|
|
else:
|
|
if self.address():
|
|
if 8<= addr <=119:
|
|
_buf = bytes([addr, (~ addr) & 0xff] * 2)
|
|
self._wreg(_AGS10_ADD, _buf + self._crc8(_buf, True))
|
|
if addr in self._i2c.scan():
|
|
self._addr = addr
|
|
return True
|
|
else:
|
|
raise ValueError("Not within the valid range of 8-119")
|
|
|
|
def calibration(self, zero=1):
|
|
'''0: Factory restoration 1:Sensor resistance'''
|
|
_buf = b'\x00\x0c'
|
|
_buf += b'\x00\x00' if zero else b'\xff\xff'
|
|
self._wreg(_AGS10_ADD, _buf + self._crc8(_buf, True))
|
|
|
|
@property
|
|
def getdata(self):
|
|
if time.ticks_diff(time.ticks_ms(), self._star) >= self._delay:
|
|
self._star = time.ticks_ms()
|
|
_buf = self._rreg(_AGS10_ADC)
|
|
if (_buf[0] & 0x01) == 0:
|
|
self._voc = int.from_bytes(_buf[1:4], 'big')
|
|
return self._voc / 1000
|
|
|
|
def read(self, hcho_mw=30.033 , co2_mv=0.853, co2_base=400):
|
|
'''unit ppm'''
|
|
self.getdata
|
|
_voc = self._voc / 1000 #ppm
|
|
return round( _voc, 2), round( _voc / hcho_mw, 2), co2_base + round( _voc / co2_mv, 2)
|
|
|
|
def tvoc(self):
|
|
return self.read()[0]
|
|
|
|
def hcho(self, tvoc_mw=30.053):
|
|
return self.read(hcho_mw=hcho_mw)[1]
|
|
|
|
def eco2(self, co2_mv=0.853):
|
|
return self.read(co2_mv=co2_mv)[2]
|