初始化提交

This commit is contained in:
王立帮
2024-07-20 22:09:06 +08:00
commit c247dd07a6
6876 changed files with 2743096 additions and 0 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,503 @@
/*
MultiDelegate.h - A queue or event multiplexer based on the efficient Delegate
class
Copyright (c) 2019 Dirk O. Kaar. All rights reserved.
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#ifndef __MULTIDELEGATE_H
#define __MULTIDELEGATE_H
#if defined(ESP8266) || defined(ESP32) || !defined(ARDUINO)
#include <atomic>
#else
#include "circular_queue/ghostl.h"
#endif
#if defined(ESP8266)
#include <interrupts.h>
using esp8266::InterruptLock;
#elif defined(ARDUINO)
class InterruptLock {
public:
InterruptLock() {
noInterrupts();
}
~InterruptLock() {
interrupts();
}
};
#else
#include <mutex>
#endif
namespace detail
{
namespace
{
template< typename Delegate, typename R, bool ISQUEUE = false, typename... P>
struct CallP
{
static R execute(Delegate& del, P... args)
{
return del(std::forward<P...>(args...)) ? !ISQUEUE : ISQUEUE;
}
};
template< typename Delegate, bool ISQUEUE, typename... P>
struct CallP<Delegate, void, ISQUEUE, P...>
{
static bool execute(Delegate& del, P... args)
{
del(std::forward<P...>(args...));
return !ISQUEUE;
}
};
template< typename Delegate, typename R, bool ISQUEUE = false>
struct Call
{
static R execute(Delegate& del)
{
return del() ? !ISQUEUE : ISQUEUE;
}
};
template< typename Delegate, bool ISQUEUE>
struct Call<Delegate, void, ISQUEUE>
{
static bool execute(Delegate& del)
{
del();
return !ISQUEUE;
}
};
};
template< typename Delegate, typename R = void, bool ISQUEUE = false, uint32_t QUEUE_CAPACITY = 32, typename... P>
class MultiDelegatePImpl
{
public:
MultiDelegatePImpl() = default;
~MultiDelegatePImpl()
{
*this = nullptr;
}
MultiDelegatePImpl(const MultiDelegatePImpl&) = delete;
MultiDelegatePImpl& operator=(const MultiDelegatePImpl&) = delete;
MultiDelegatePImpl(MultiDelegatePImpl&& md)
{
first = md.first;
last = md.last;
unused = md.unused;
nodeCount = md.nodeCount;
md.first = nullptr;
md.last = nullptr;
md.unused = nullptr;
md.nodeCount = 0;
}
MultiDelegatePImpl(const Delegate& del)
{
add(del);
}
MultiDelegatePImpl(Delegate&& del)
{
add(std::move(del));
}
MultiDelegatePImpl& operator=(MultiDelegatePImpl&& md)
{
first = md.first;
last = md.last;
unused = md.unused;
nodeCount = md.nodeCount;
md.first = nullptr;
md.last = nullptr;
md.unused = nullptr;
md.nodeCount = 0;
return *this;
}
MultiDelegatePImpl& operator=(std::nullptr_t)
{
if (last)
last->mNext = unused;
if (first)
unused = first;
while (unused)
{
auto to_delete = unused;
unused = unused->mNext;
delete(to_delete);
}
return *this;
}
MultiDelegatePImpl& operator+=(const Delegate& del)
{
add(del);
return *this;
}
MultiDelegatePImpl& operator+=(Delegate&& del)
{
add(std::move(del));
return *this;
}
protected:
struct Node_t
{
~Node_t()
{
mDelegate = nullptr; // special overload in Delegate
}
Node_t* mNext = nullptr;
Delegate mDelegate;
};
Node_t* first = nullptr;
Node_t* last = nullptr;
Node_t* unused = nullptr;
uint32_t nodeCount = 0;
// Returns a pointer to an unused Node_t,
// or if none are available allocates a new one,
// or nullptr if limit is reached
Node_t* IRAM_ATTR get_node_unsafe()
{
Node_t* result = nullptr;
// try to get an item from unused items list
if (unused)
{
result = unused;
unused = unused->mNext;
}
// if no unused items, and count not too high, allocate a new one
else if (nodeCount < QUEUE_CAPACITY)
{
#if defined(ESP8266) || defined(ESP32)
result = new (std::nothrow) Node_t;
#else
result = new Node_t;
#endif
if (result)
++nodeCount;
}
return result;
}
void recycle_node_unsafe(Node_t* node)
{
node->mDelegate = nullptr; // special overload in Delegate
node->mNext = unused;
unused = node;
}
#ifndef ARDUINO
std::mutex mutex_unused;
#endif
public:
const Delegate* IRAM_ATTR add(const Delegate& del)
{
return add(Delegate(del));
}
const Delegate* IRAM_ATTR add(Delegate&& del)
{
if (!del)
return nullptr;
#ifdef ARDUINO
InterruptLock lockAllInterruptsInThisScope;
#else
std::lock_guard<std::mutex> lock(mutex_unused);
#endif
Node_t* item = ISQUEUE ? get_node_unsafe() :
#if defined(ESP8266) || defined(ESP32)
new (std::nothrow) Node_t;
#else
new Node_t;
#endif
if (!item)
return nullptr;
item->mDelegate = std::move(del);
item->mNext = nullptr;
if (last)
last->mNext = item;
else
first = item;
last = item;
return &item->mDelegate;
}
bool remove(const Delegate* del)
{
auto current = first;
if (!current)
return false;
Node_t* prev = nullptr;
do
{
if (del == &current->mDelegate)
{
// remove callback from stack
#ifdef ARDUINO
InterruptLock lockAllInterruptsInThisScope;
#else
std::lock_guard<std::mutex> lock(mutex_unused);
#endif
auto to_recycle = current;
// removing rLast
if (last == current)
last = prev;
current = current->mNext;
if (prev)
{
prev->mNext = current;
}
else
{
first = current;
}
if (ISQUEUE)
recycle_node_unsafe(to_recycle);
else
delete to_recycle;
return true;
}
else
{
prev = current;
current = current->mNext;
}
} while (current);
return false;
}
void operator()(P... args)
{
auto current = first;
if (!current)
return;
static std::atomic<bool> fence(false);
// prevent recursive calls
#if defined(ARDUINO) && !defined(ESP32)
if (fence.load()) return;
fence.store(true);
#else
if (fence.exchange(true)) return;
#endif
Node_t* prev = nullptr;
// prevent execution of new callbacks during this run
auto stop = last;
bool done;
do
{
done = current == stop;
if (!CallP<Delegate, R, ISQUEUE, P...>::execute(current->mDelegate, args...))
{
// remove callback from stack
#ifdef ARDUINO
InterruptLock lockAllInterruptsInThisScope;
#else
std::lock_guard<std::mutex> lock(mutex_unused);
#endif
auto to_recycle = current;
// removing rLast
if (last == current)
last = prev;
current = current->mNext;
if (prev)
{
prev->mNext = current;
}
else
{
first = current;
}
if (ISQUEUE)
recycle_node_unsafe(to_recycle);
else
delete to_recycle;
}
else
{
prev = current;
current = current->mNext;
}
#if defined(ESP8266) || defined(ESP32)
// running callbacks might last too long for watchdog etc.
optimistic_yield(10000);
#endif
} while (current && !done);
fence.store(false);
}
};
template< typename Delegate, typename R = void, bool ISQUEUE = false, uint32_t QUEUE_CAPACITY = 32>
class MultiDelegateImpl : public MultiDelegatePImpl<Delegate, R, ISQUEUE, QUEUE_CAPACITY>
{
protected:
using typename MultiDelegatePImpl<Delegate, R, ISQUEUE, QUEUE_CAPACITY>::Node_t;
using MultiDelegatePImpl<Delegate, R, ISQUEUE, QUEUE_CAPACITY>::first;
using MultiDelegatePImpl<Delegate, R, ISQUEUE, QUEUE_CAPACITY>::last;
using MultiDelegatePImpl<Delegate, R, ISQUEUE, QUEUE_CAPACITY>::unused;
using MultiDelegatePImpl<Delegate, R, ISQUEUE, QUEUE_CAPACITY>::nodeCount;
using MultiDelegatePImpl<Delegate, R, ISQUEUE, QUEUE_CAPACITY>::recycle_node_unsafe;
#ifndef ARDUINO
using MultiDelegatePImpl<Delegate, R, ISQUEUE, QUEUE_CAPACITY>::mutex_unused;
#endif
public:
using MultiDelegatePImpl<Delegate, R, ISQUEUE, QUEUE_CAPACITY>::MultiDelegatePImpl;
void operator()()
{
auto current = first;
if (!current)
return;
static std::atomic<bool> fence(false);
// prevent recursive calls
#if defined(ARDUINO) && !defined(ESP32)
if (fence.load()) return;
fence.store(true);
#else
if (fence.exchange(true)) return;
#endif
Node_t* prev = nullptr;
// prevent execution of new callbacks during this run
auto stop = last;
bool done;
do
{
done = current == stop;
if (!Call<Delegate, R, ISQUEUE>::execute(current->mDelegate))
{
// remove callback from stack
#ifdef ARDUINO
InterruptLock lockAllInterruptsInThisScope;
#else
std::lock_guard<std::mutex> lock(mutex_unused);
#endif
auto to_recycle = current;
// removing rLast
if (last == current)
last = prev;
current = current->mNext;
if (prev)
{
prev->mNext = current;
}
else
{
first = current;
}
if (ISQUEUE)
recycle_node_unsafe(to_recycle);
else
delete to_recycle;
}
else
{
prev = current;
current = current->mNext;
}
#if defined(ESP8266) || defined(ESP32)
// running callbacks might last too long for watchdog etc.
optimistic_yield(10000);
#endif
} while (current && !done);
fence.store(false);
}
};
template< typename Delegate, typename R, bool ISQUEUE, uint32_t QUEUE_CAPACITY, typename... P> class MultiDelegate;
template< typename Delegate, typename R, bool ISQUEUE, uint32_t QUEUE_CAPACITY, typename... P>
class MultiDelegate<Delegate, R(P...), ISQUEUE, QUEUE_CAPACITY> : public MultiDelegatePImpl<Delegate, R, ISQUEUE, QUEUE_CAPACITY, P...>
{
public:
using MultiDelegatePImpl<Delegate, R, ISQUEUE, QUEUE_CAPACITY, P...>::MultiDelegatePImpl;
};
template< typename Delegate, typename R, bool ISQUEUE, uint32_t QUEUE_CAPACITY>
class MultiDelegate<Delegate, R(), ISQUEUE, QUEUE_CAPACITY> : public MultiDelegateImpl<Delegate, R, ISQUEUE, QUEUE_CAPACITY>
{
public:
using MultiDelegateImpl<Delegate, R, ISQUEUE, QUEUE_CAPACITY>::MultiDelegateImpl;
};
};
/**
The MultiDelegate class template can be specialized to either a queue or an event multiplexer.
It is designed to be used with Delegate, the efficient runtime wrapper for C function ptr and C++ std::function.
@tparam Delegate specifies the concrete type that MultiDelegate bases the queue or event multiplexer on.
@tparam ISQUEUE modifies the generated MultiDelegate class in subtle ways. In queue mode (ISQUEUE == true),
the value of QUEUE_CAPACITY enforces the maximum number of simultaneous items the queue can contain.
This is exploited to minimize the use of new and delete by reusing already allocated items, thus
reducing heap fragmentation. In event multiplexer mode (ISQUEUE = false), new and delete are
used for allocation of the event handler items.
If the result type of the function call operator of Delegate is void, calling a MultiDelegate queue
removes each item after calling it; a Multidelegate event multiplexer keeps event handlers until
explicitly removed.
If the result type of the function call operator of Delegate is non-void, the type-conversion to bool
of that result determines if the item is immediately removed or kept after each call: a Multidelegate
queue removes an item only if true is returned, but a Multidelegate event multiplexer removes event
handlers that return false.
@tparam QUEUE_CAPACITY is only used if ISQUEUE == true. Then, it sets the maximum capacity that the queue dynamically
allocates from the heap. Unused items are not returned to the heap, but are managed by the MultiDelegate
instance during its own lifetime for efficiency.
*/
template< typename Delegate, bool ISQUEUE = false, uint32_t QUEUE_CAPACITY = 32>
class MultiDelegate : public detail::MultiDelegate<Delegate, typename Delegate::target_type, ISQUEUE, QUEUE_CAPACITY>
{
public:
using detail::MultiDelegate<Delegate, typename Delegate::target_type, ISQUEUE, QUEUE_CAPACITY>::MultiDelegate;
};
#endif // __MULTIDELEGATE_H

View File

@@ -0,0 +1,399 @@
/*
circular_queue.h - Implementation of a lock-free circular queue for EspSoftwareSerial.
Copyright (c) 2019 Dirk O. Kaar. All rights reserved.
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#ifndef __circular_queue_h
#define __circular_queue_h
#ifdef ARDUINO
#include <Arduino.h>
#endif
#if defined(ESP8266) || defined(ESP32) || !defined(ARDUINO)
#include <atomic>
#include <memory>
#include <algorithm>
#include "Delegate.h"
using std::min;
#else
#include "ghostl.h"
#endif
#if !defined(ESP32) && !defined(ESP8266)
#define ICACHE_RAM_ATTR
#define IRAM_ATTR
#endif
/*!
@brief Instance class for a single-producer, single-consumer circular queue / ring buffer (FIFO).
This implementation is lock-free between producer and consumer for the available(), peek(),
pop(), and push() type functions.
*/
template< typename T, typename ForEachArg = void >
class circular_queue
{
public:
/*!
@brief Constructs a valid, but zero-capacity dummy queue.
*/
circular_queue() : m_bufSize(1)
{
m_inPos.store(0);
m_outPos.store(0);
}
/*!
@brief Constructs a queue of the given maximum capacity.
*/
circular_queue(const size_t capacity) : m_bufSize(capacity + 1), m_buffer(new T[m_bufSize])
{
m_inPos.store(0);
m_outPos.store(0);
}
circular_queue(circular_queue&& cq) :
m_bufSize(cq.m_bufSize), m_buffer(cq.m_buffer), m_inPos(cq.m_inPos.load()), m_outPos(cq.m_outPos.load())
{}
~circular_queue()
{
m_buffer.reset();
}
circular_queue(const circular_queue&) = delete;
circular_queue& operator=(circular_queue&& cq)
{
m_bufSize = cq.m_bufSize;
m_buffer = cq.m_buffer;
m_inPos.store(cq.m_inPos.load());
m_outPos.store(cq.m_outPos.load());
}
circular_queue& operator=(const circular_queue&) = delete;
/*!
@brief Get the numer of elements the queue can hold at most.
*/
size_t capacity() const
{
return m_bufSize - 1;
}
/*!
@brief Resize the queue. The available elements in the queue are preserved.
This is not lock-free and concurrent producer or consumer access
will lead to corruption.
@return True if the new capacity could accommodate the present elements in
the queue, otherwise nothing is done and false is returned.
*/
bool capacity(const size_t cap);
/*!
@brief Discard all data in the queue.
*/
void flush()
{
m_outPos.store(m_inPos.load());
}
/*!
@brief Get a snapshot number of elements that can be retrieved by pop.
*/
size_t available() const
{
int avail = static_cast<int>(m_inPos.load() - m_outPos.load());
if (avail < 0) avail += m_bufSize;
return avail;
}
/*!
@brief Get the remaining free elementes for pushing.
*/
size_t available_for_push() const
{
int avail = static_cast<int>(m_outPos.load() - m_inPos.load()) - 1;
if (avail < 0) avail += m_bufSize;
return avail;
}
/*!
@brief Peek at the next element pop will return without removing it from the queue.
@return An rvalue copy of the next element that can be popped. If the queue is empty,
return an rvalue copy of the element that is pending the next push.
*/
T peek() const
{
const auto outPos = m_outPos.load(std::memory_order_relaxed);
std::atomic_thread_fence(std::memory_order_acquire);
return m_buffer[outPos];
}
/*!
@brief Peek at the next pending input value.
@return A reference to the next element that can be pushed.
*/
T& IRAM_ATTR pushpeek()
{
const auto inPos = m_inPos.load(std::memory_order_relaxed);
std::atomic_thread_fence(std::memory_order_acquire);
return m_buffer[inPos];
}
/*!
@brief Release the next pending input value, accessible by pushpeek(), into the queue.
@return true if the queue accepted the value, false if the queue
was full.
*/
bool IRAM_ATTR push();
/*!
@brief Move the rvalue parameter into the queue.
@return true if the queue accepted the value, false if the queue
was full.
*/
bool IRAM_ATTR push(T&& val);
/*!
@brief Push a copy of the parameter into the queue.
@return true if the queue accepted the value, false if the queue
was full.
*/
bool IRAM_ATTR push(const T& val)
{
return push(T(val));
}
#if defined(ESP8266) || defined(ESP32) || !defined(ARDUINO)
/*!
@brief Push copies of multiple elements from a buffer into the queue,
in order, beginning at buffer's head.
@return The number of elements actually copied into the queue, counted
from the buffer head.
*/
size_t push_n(const T* buffer, size_t size);
#endif
/*!
@brief Pop the next available element from the queue.
@return An rvalue copy of the popped element, or a default
value of type T if the queue is empty.
*/
T pop();
#if defined(ESP8266) || defined(ESP32) || !defined(ARDUINO)
/*!
@brief Pop multiple elements in ordered sequence from the queue to a buffer.
If buffer is nullptr, simply discards up to size elements from the queue.
@return The number of elements actually popped from the queue to
buffer.
*/
size_t pop_n(T* buffer, size_t size);
#endif
/*!
@brief Iterate over and remove each available element from queue,
calling back fun with an rvalue reference of every single element.
*/
#if defined(ESP8266) || defined(ESP32) || !defined(ARDUINO)
void for_each(const Delegate<void(T&&), ForEachArg>& fun);
#else
void for_each(Delegate<void(T&&), ForEachArg> fun);
#endif
/*!
@brief In reverse order, iterate over, pop and optionally requeue each available element from the queue,
calling back fun with a reference of every single element.
Requeuing is dependent on the return boolean of the callback function. If it
returns true, the requeue occurs.
*/
#if defined(ESP8266) || defined(ESP32) || !defined(ARDUINO)
bool for_each_rev_requeue(const Delegate<bool(T&), ForEachArg>& fun);
#else
bool for_each_rev_requeue(Delegate<bool(T&), ForEachArg> fun);
#endif
protected:
const T defaultValue = {};
unsigned m_bufSize;
#if defined(ESP8266) || defined(ESP32) || !defined(ARDUINO)
std::unique_ptr<T[]> m_buffer;
#else
std::unique_ptr<T> m_buffer;
#endif
std::atomic<unsigned> m_inPos;
std::atomic<unsigned> m_outPos;
};
template< typename T, typename ForEachArg >
bool circular_queue<T, ForEachArg>::capacity(const size_t cap)
{
if (cap + 1 == m_bufSize) return true;
else if (available() > cap) return false;
std::unique_ptr<T[] > buffer(new T[cap + 1]);
const auto available = pop_n(buffer, cap);
m_buffer.reset(buffer);
m_bufSize = cap + 1;
std::atomic_thread_fence(std::memory_order_release);
m_inPos.store(available, std::memory_order_relaxed);
m_outPos.store(0, std::memory_order_release);
return true;
}
template< typename T, typename ForEachArg >
bool IRAM_ATTR circular_queue<T, ForEachArg>::push()
{
const auto inPos = m_inPos.load(std::memory_order_acquire);
const unsigned next = (inPos + 1) % m_bufSize;
if (next == m_outPos.load(std::memory_order_relaxed)) {
return false;
}
std::atomic_thread_fence(std::memory_order_acquire);
m_inPos.store(next, std::memory_order_release);
return true;
}
template< typename T, typename ForEachArg >
bool IRAM_ATTR circular_queue<T, ForEachArg>::push(T&& val)
{
const auto inPos = m_inPos.load(std::memory_order_acquire);
const unsigned next = (inPos + 1) % m_bufSize;
if (next == m_outPos.load(std::memory_order_relaxed)) {
return false;
}
std::atomic_thread_fence(std::memory_order_acquire);
m_buffer[inPos] = std::move(val);
std::atomic_thread_fence(std::memory_order_release);
m_inPos.store(next, std::memory_order_release);
return true;
}
#if defined(ESP8266) || defined(ESP32) || !defined(ARDUINO)
template< typename T, typename ForEachArg >
size_t circular_queue<T, ForEachArg>::push_n(const T* buffer, size_t size)
{
const auto inPos = m_inPos.load(std::memory_order_acquire);
const auto outPos = m_outPos.load(std::memory_order_relaxed);
size_t blockSize = (outPos > inPos) ? outPos - 1 - inPos : (outPos == 0) ? m_bufSize - 1 - inPos : m_bufSize - inPos;
blockSize = min(size, blockSize);
if (!blockSize) return 0;
int next = (inPos + blockSize) % m_bufSize;
std::atomic_thread_fence(std::memory_order_acquire);
auto dest = m_buffer.get() + inPos;
std::copy_n(std::make_move_iterator(buffer), blockSize, dest);
size = min(size - blockSize, outPos > 1 ? static_cast<size_t>(outPos - next - 1) : 0);
next += size;
dest = m_buffer.get();
std::copy_n(std::make_move_iterator(buffer + blockSize), size, dest);
std::atomic_thread_fence(std::memory_order_release);
m_inPos.store(next, std::memory_order_release);
return blockSize + size;
}
#endif
template< typename T, typename ForEachArg >
T circular_queue<T, ForEachArg>::pop()
{
const auto outPos = m_outPos.load(std::memory_order_acquire);
if (m_inPos.load(std::memory_order_relaxed) == outPos) return defaultValue;
std::atomic_thread_fence(std::memory_order_acquire);
auto val = std::move(m_buffer[outPos]);
std::atomic_thread_fence(std::memory_order_release);
m_outPos.store((outPos + 1) % m_bufSize, std::memory_order_release);
return val;
}
#if defined(ESP8266) || defined(ESP32) || !defined(ARDUINO)
template< typename T, typename ForEachArg >
size_t circular_queue<T, ForEachArg>::pop_n(T* buffer, size_t size) {
size_t avail = size = min(size, available());
if (!avail) return 0;
const auto outPos = m_outPos.load(std::memory_order_acquire);
size_t n = min(avail, static_cast<size_t>(m_bufSize - outPos));
std::atomic_thread_fence(std::memory_order_acquire);
if (buffer) {
buffer = std::copy_n(std::make_move_iterator(m_buffer.get() + outPos), n, buffer);
avail -= n;
std::copy_n(std::make_move_iterator(m_buffer.get()), avail, buffer);
}
std::atomic_thread_fence(std::memory_order_release);
m_outPos.store((outPos + size) % m_bufSize, std::memory_order_release);
return size;
}
#endif
template< typename T, typename ForEachArg >
#if defined(ESP8266) || defined(ESP32) || !defined(ARDUINO)
void circular_queue<T, ForEachArg>::for_each(const Delegate<void(T&&), ForEachArg>& fun)
#else
void circular_queue<T, ForEachArg>::for_each(Delegate<void(T&&), ForEachArg> fun)
#endif
{
auto outPos = m_outPos.load(std::memory_order_acquire);
const auto inPos = m_inPos.load(std::memory_order_relaxed);
std::atomic_thread_fence(std::memory_order_acquire);
while (outPos != inPos)
{
fun(std::move(m_buffer[outPos]));
std::atomic_thread_fence(std::memory_order_release);
outPos = (outPos + 1) % m_bufSize;
m_outPos.store(outPos, std::memory_order_release);
}
}
template< typename T, typename ForEachArg >
#if defined(ESP8266) || defined(ESP32) || !defined(ARDUINO)
bool circular_queue<T, ForEachArg>::for_each_rev_requeue(const Delegate<bool(T&), ForEachArg>& fun)
#else
bool circular_queue<T, ForEachArg>::for_each_rev_requeue(Delegate<bool(T&), ForEachArg> fun)
#endif
{
auto inPos0 = circular_queue<T, ForEachArg>::m_inPos.load(std::memory_order_acquire);
auto outPos = circular_queue<T, ForEachArg>::m_outPos.load(std::memory_order_relaxed);
std::atomic_thread_fence(std::memory_order_acquire);
if (outPos == inPos0) return false;
auto pos = inPos0;
auto outPos1 = inPos0;
const auto posDecr = circular_queue<T, ForEachArg>::m_bufSize - 1;
do {
pos = (pos + posDecr) % circular_queue<T, ForEachArg>::m_bufSize;
T&& val = std::move(circular_queue<T, ForEachArg>::m_buffer[pos]);
if (fun(val))
{
outPos1 = (outPos1 + posDecr) % circular_queue<T, ForEachArg>::m_bufSize;
if (outPos1 != pos) circular_queue<T, ForEachArg>::m_buffer[outPos1] = std::move(val);
}
} while (pos != outPos);
circular_queue<T, ForEachArg>::m_outPos.store(outPos1, std::memory_order_release);
return true;
}
#endif // __circular_queue_h

View File

@@ -0,0 +1,200 @@
/*
circular_queue_mp.h - Implementation of a lock-free circular queue for EspSoftwareSerial.
Copyright (c) 2019 Dirk O. Kaar. All rights reserved.
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#ifndef __circular_queue_mp_h
#define __circular_queue_mp_h
#include "circular_queue.h"
#ifdef ESP8266
#include "interrupts.h"
#else
#include <mutex>
#endif
/*!
@brief Instance class for a multi-producer, single-consumer circular queue / ring buffer (FIFO).
This implementation is lock-free between producers and consumer for the available(), peek(),
pop(), and push() type functions, but is guarded to safely allow only a single producer
at any instant.
*/
template< typename T, typename ForEachArg = void >
class circular_queue_mp : protected circular_queue<T, ForEachArg>
{
public:
circular_queue_mp() = default;
circular_queue_mp(const size_t capacity) : circular_queue<T, ForEachArg>(capacity)
{}
circular_queue_mp(circular_queue<T, ForEachArg>&& cq) : circular_queue<T, ForEachArg>(std::move(cq))
{}
using circular_queue<T, ForEachArg>::operator=;
using circular_queue<T, ForEachArg>::capacity;
using circular_queue<T, ForEachArg>::flush;
using circular_queue<T, ForEachArg>::available;
using circular_queue<T, ForEachArg>::available_for_push;
using circular_queue<T, ForEachArg>::peek;
using circular_queue<T, ForEachArg>::pop;
using circular_queue<T, ForEachArg>::pop_n;
using circular_queue<T, ForEachArg>::for_each;
using circular_queue<T, ForEachArg>::for_each_rev_requeue;
/*!
@brief Resize the queue. The available elements in the queue are preserved.
This is not lock-free, but safe, concurrent producer or consumer access
is guarded.
@return True if the new capacity could accommodate the present elements in
the queue, otherwise nothing is done and false is returned.
*/
bool capacity(const size_t cap)
{
#ifdef ESP8266
esp8266::InterruptLock lock;
#else
std::lock_guard<std::mutex> lock(m_pushMtx);
#endif
return circular_queue<T, ForEachArg>::capacity(cap);
}
bool IRAM_ATTR push() = delete;
/*!
@brief Move the rvalue parameter into the queue, guarded
for multiple concurrent producers.
@return true if the queue accepted the value, false if the queue
was full.
*/
bool IRAM_ATTR push(T&& val)
{
#ifdef ESP8266
esp8266::InterruptLock lock;
#else
std::lock_guard<std::mutex> lock(m_pushMtx);
#endif
return circular_queue<T, ForEachArg>::push(std::move(val));
}
/*!
@brief Push a copy of the parameter into the queue, guarded
for multiple concurrent producers.
@return true if the queue accepted the value, false if the queue
was full.
*/
bool IRAM_ATTR push(const T& val)
{
#ifdef ESP8266
esp8266::InterruptLock lock;
#else
std::lock_guard<std::mutex> lock(m_pushMtx);
#endif
return circular_queue<T, ForEachArg>::push(val);
}
/*!
@brief Push copies of multiple elements from a buffer into the queue,
in order, beginning at buffer's head. This is guarded for
multiple producers, push_n() is atomic.
@return The number of elements actually copied into the queue, counted
from the buffer head.
*/
size_t push_n(const T* buffer, size_t size)
{
#ifdef ESP8266
esp8266::InterruptLock lock;
#else
std::lock_guard<std::mutex> lock(m_pushMtx);
#endif
return circular_queue<T, ForEachArg>::push_n(buffer, size);
}
/*!
@brief Pops the next available element from the queue, requeues
it immediately.
@return A reference to the just requeued element, or the default
value of type T if the queue is empty.
*/
T& pop_requeue();
/*!
@brief Iterate over, pop and optionally requeue each available element from the queue,
calling back fun with a reference of every single element.
Requeuing is dependent on the return boolean of the callback function. If it
returns true, the requeue occurs.
*/
bool for_each_requeue(const Delegate<bool(T&), ForEachArg>& fun);
#ifndef ESP8266
protected:
std::mutex m_pushMtx;
#endif
};
template< typename T, typename ForEachArg >
T& circular_queue_mp<T>::pop_requeue()
{
#ifdef ESP8266
esp8266::InterruptLock lock;
#else
std::lock_guard<std::mutex> lock(m_pushMtx);
#endif
const auto outPos = circular_queue<T, ForEachArg>::m_outPos.load(std::memory_order_acquire);
const auto inPos = circular_queue<T, ForEachArg>::m_inPos.load(std::memory_order_relaxed);
std::atomic_thread_fence(std::memory_order_acquire);
if (inPos == outPos) return circular_queue<T, ForEachArg>::defaultValue;
T& val = circular_queue<T, ForEachArg>::m_buffer[inPos] = std::move(circular_queue<T, ForEachArg>::m_buffer[outPos]);
const auto bufSize = circular_queue<T, ForEachArg>::m_bufSize;
std::atomic_thread_fence(std::memory_order_release);
circular_queue<T, ForEachArg>::m_outPos.store((outPos + 1) % bufSize, std::memory_order_relaxed);
circular_queue<T, ForEachArg>::m_inPos.store((inPos + 1) % bufSize, std::memory_order_release);
return val;
}
template< typename T, typename ForEachArg >
bool circular_queue_mp<T>::for_each_requeue(const Delegate<bool(T&), ForEachArg>& fun)
{
auto inPos0 = circular_queue<T, ForEachArg>::m_inPos.load(std::memory_order_acquire);
auto outPos = circular_queue<T, ForEachArg>::m_outPos.load(std::memory_order_relaxed);
std::atomic_thread_fence(std::memory_order_acquire);
if (outPos == inPos0) return false;
do {
T&& val = std::move(circular_queue<T, ForEachArg>::m_buffer[outPos]);
if (fun(val))
{
#ifdef ESP8266
esp8266::InterruptLock lock;
#else
std::lock_guard<std::mutex> lock(m_pushMtx);
#endif
std::atomic_thread_fence(std::memory_order_release);
auto inPos = circular_queue<T, ForEachArg>::m_inPos.load(std::memory_order_relaxed);
std::atomic_thread_fence(std::memory_order_acquire);
circular_queue<T, ForEachArg>::m_buffer[inPos] = std::move(val);
std::atomic_thread_fence(std::memory_order_release);
circular_queue<T, ForEachArg>::m_inPos.store((inPos + 1) % circular_queue<T, ForEachArg>::m_bufSize, std::memory_order_release);
}
else
{
std::atomic_thread_fence(std::memory_order_release);
}
outPos = (outPos + 1) % circular_queue<T, ForEachArg>::m_bufSize;
circular_queue<T, ForEachArg>::m_outPos.store(outPos, std::memory_order_release);
} while (outPos != inPos0);
return true;
}
#endif // __circular_queue_mp_h

View File

@@ -0,0 +1,92 @@
/*
ghostl.h - Implementation of a bare-bones, mostly no-op, C++ STL shell
that allows building some Arduino ESP8266/ESP32
libraries on Aruduino AVR.
Copyright (c) 2019 Dirk O. Kaar. All rights reserved.
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#ifndef __ghostl_h
#define __ghostl_h
#if defined(ARDUINO_ARCH_SAMD)
#include <atomic>
#endif
namespace std
{
#if !defined(ARDUINO_ARCH_SAMD)
typedef enum memory_order {
memory_order_relaxed,
memory_order_acquire,
memory_order_release,
memory_order_seq_cst
} memory_order;
template< typename T > class atomic {
private:
T value;
public:
atomic() {}
atomic(T desired) { value = desired; }
void store(T desired, std::memory_order = std::memory_order_seq_cst) volatile noexcept { value = desired; }
T load(std::memory_order = std::memory_order_seq_cst) const volatile noexcept { return value; }
};
inline void atomic_thread_fence(std::memory_order order) noexcept {}
template< typename T > T&& move(T& t) noexcept { return static_cast<T&&>(t); }
#endif
template< typename T, unsigned long N > struct array
{
T _M_elems[N];
decltype(sizeof(0)) size() const { return N; }
T& operator[](decltype(sizeof(0)) i) { return _M_elems[i]; }
const T& operator[](decltype(sizeof(0)) i) const { return _M_elems[i]; }
};
template< typename T > class unique_ptr
{
public:
using pointer = T*;
unique_ptr() noexcept : ptr(nullptr) {}
unique_ptr(pointer p) : ptr(p) {}
pointer operator->() const noexcept { return ptr; }
T& operator[](decltype(sizeof(0)) i) const { return ptr[i]; }
void reset(pointer p = pointer()) noexcept
{
delete ptr;
ptr = p;
}
T& operator*() const { return *ptr; }
private:
pointer ptr;
};
template< typename T > using function = T*;
using nullptr_t = decltype(nullptr);
template<typename T>
struct identity {
typedef T type;
};
template <typename T>
inline T&& forward(typename identity<T>::type& t) noexcept
{
return static_cast<typename identity<T>::type&&>(t);
}
}
#endif // __ghostl_h