avoid various data-races related to stats collection and delivery

This commit is contained in:
Seth Alves 2016-10-18 13:05:53 -07:00
parent 0cda01f105
commit 0c6dedff45
6 changed files with 63 additions and 25 deletions

View file

@ -636,7 +636,7 @@ QString AudioMixer::percentageForMixStats(int counter) {
} }
void AudioMixer::sendStatsPacket() { void AudioMixer::sendStatsPacket() {
static QJsonObject statsObject; QJsonObject statsObject;
statsObject["useDynamicJitterBuffers"] = _numStaticJitterFrames == -1; statsObject["useDynamicJitterBuffers"] = _numStaticJitterFrames == -1;
statsObject["trailing_sleep_percentage"] = _trailingSleepRatio * 100.0f; statsObject["trailing_sleep_percentage"] = _trailingSleepRatio * 100.0f;

View file

@ -321,9 +321,8 @@ protected:
PacketReceiver* _packetReceiver; PacketReceiver* _packetReceiver;
// XXX can BandwidthRecorder be used for this? std::atomic<int> _numCollectedPackets;
int _numCollectedPackets; std::atomic<int> _numCollectedBytes;
int _numCollectedBytes;
QElapsedTimer _packetStatTimer; QElapsedTimer _packetStatTimer;
NodePermissions _permissions; NodePermissions _permissions;

View file

@ -127,19 +127,30 @@ NodeList::NodeList(char newOwnerType, int socketListenPort, int dtlsListenPort)
packetReceiver.registerListener(PacketType::DomainServerRemovedNode, this, "processDomainServerRemovedNode"); packetReceiver.registerListener(PacketType::DomainServerRemovedNode, this, "processDomainServerRemovedNode");
} }
qint64 NodeList::sendStats(const QJsonObject& statsObject, const HifiSockAddr& destination) { qint64 NodeList::sendStats(QJsonObject statsObject, HifiSockAddr destination) {
if (thread() != QThread::currentThread()) {
QMetaObject::invokeMethod(this, "sendStats", Qt::QueuedConnection,
Q_ARG(QJsonObject, statsObject),
Q_ARG(HifiSockAddr, destination));
return 0;
}
auto statsPacketList = NLPacketList::create(PacketType::NodeJsonStats, QByteArray(), true, true); auto statsPacketList = NLPacketList::create(PacketType::NodeJsonStats, QByteArray(), true, true);
QJsonDocument jsonDocument(statsObject); QJsonDocument jsonDocument(statsObject);
statsPacketList->write(jsonDocument.toBinaryData()); statsPacketList->write(jsonDocument.toBinaryData());
sendPacketList(std::move(statsPacketList), destination); sendPacketList(std::move(statsPacketList), destination);
// enumerate the resulting strings, breaking them into MTU sized packets
return 0; return 0;
} }
qint64 NodeList::sendStatsToDomainServer(const QJsonObject& statsObject) { qint64 NodeList::sendStatsToDomainServer(QJsonObject statsObject) {
if (thread() != QThread::currentThread()) {
QMetaObject::invokeMethod(this, "sendStatsToDomainServer", Qt::QueuedConnection,
Q_ARG(QJsonObject, statsObject));
return 0;
}
return sendStats(statsObject, _domainHandler.getSockAddr()); return sendStats(statsObject, _domainHandler.getSockAddr());
} }

View file

@ -54,8 +54,8 @@ public:
NodeType_t getOwnerType() const { return _ownerType; } NodeType_t getOwnerType() const { return _ownerType; }
void setOwnerType(NodeType_t ownerType) { _ownerType = ownerType; } void setOwnerType(NodeType_t ownerType) { _ownerType = ownerType; }
qint64 sendStats(const QJsonObject& statsObject, const HifiSockAddr& destination); Q_INVOKABLE qint64 sendStats(QJsonObject statsObject, HifiSockAddr destination);
qint64 sendStatsToDomainServer(const QJsonObject& statsObject); Q_INVOKABLE qint64 sendStatsToDomainServer(QJsonObject statsObject);
int getNumNoReplyDomainCheckIns() const { return _numNoReplyDomainCheckIns; } int getNumNoReplyDomainCheckIns() const { return _numNoReplyDomainCheckIns; }
DomainHandler& getDomainHandler() { return _domainHandler; } DomainHandler& getDomainHandler() { return _domainHandler; }

View file

@ -19,15 +19,29 @@ SimpleMovingAverage::SimpleMovingAverage(int numSamplesToAverage) :
_eventDeltaAverage(0.0f), _eventDeltaAverage(0.0f),
WEIGHTING(1.0f / numSamplesToAverage), WEIGHTING(1.0f / numSamplesToAverage),
ONE_MINUS_WEIGHTING(1 - WEIGHTING) { ONE_MINUS_WEIGHTING(1 - WEIGHTING) {
} }
SimpleMovingAverage::SimpleMovingAverage(const SimpleMovingAverage& other) {
*this = other;
}
SimpleMovingAverage& SimpleMovingAverage::operator=(const SimpleMovingAverage& other) {
_numSamples = (int)other._numSamples;
_lastEventTimestamp = (uint64_t)other._lastEventTimestamp;
_average = (float)other._average;
_eventDeltaAverage = (float)other._eventDeltaAverage;
WEIGHTING = other.WEIGHTING;
ONE_MINUS_WEIGHTING = other.ONE_MINUS_WEIGHTING;
return *this;
}
int SimpleMovingAverage::updateAverage(float sample) { int SimpleMovingAverage::updateAverage(float sample) {
if (_numSamples > 0) { if (_numSamples > 0) {
_average = (ONE_MINUS_WEIGHTING * _average) + (WEIGHTING * sample); _average = (ONE_MINUS_WEIGHTING * _average) + (WEIGHTING * sample);
float eventDelta = (usecTimestampNow() - _lastEventTimestamp) / 1000000.0f; float eventDelta = (usecTimestampNow() - _lastEventTimestamp) / 1000000.0f;
if (_numSamples > 1) { if (_numSamples > 1) {
_eventDeltaAverage = (ONE_MINUS_WEIGHTING * _eventDeltaAverage) + _eventDeltaAverage = (ONE_MINUS_WEIGHTING * _eventDeltaAverage) +
(WEIGHTING * eventDelta); (WEIGHTING * eventDelta);

View file

@ -16,27 +16,31 @@
#include <mutex> #include <mutex>
#include <stdint.h> #include <stdint.h>
#include <atomic>
class SimpleMovingAverage { class SimpleMovingAverage {
public: public:
SimpleMovingAverage(int numSamplesToAverage = 100); SimpleMovingAverage(int numSamplesToAverage = 100);
SimpleMovingAverage(const SimpleMovingAverage& other);
SimpleMovingAverage& operator=(const SimpleMovingAverage& other);
int updateAverage(float sample); int updateAverage(float sample);
void reset(); void reset();
int getSampleCount() const { return _numSamples; }; int getSampleCount() const { return _numSamples; };
float getAverage() const { return _average; }; float getAverage() const { return _average; };
float getEventDeltaAverage() const; // returned in seconds float getEventDeltaAverage() const; // returned in seconds
float getAverageSampleValuePerSecond() const { return _average * (1.0f / getEventDeltaAverage()); } float getAverageSampleValuePerSecond() const { return _average * (1.0f / getEventDeltaAverage()); }
uint64_t getUsecsSinceLastEvent() const; uint64_t getUsecsSinceLastEvent() const;
private: private:
int _numSamples; std::atomic<int> _numSamples;
uint64_t _lastEventTimestamp; std::atomic<uint64_t> _lastEventTimestamp;
float _average; std::atomic<float> _average;
float _eventDeltaAverage; std::atomic<float> _eventDeltaAverage;
float WEIGHTING; float WEIGHTING;
float ONE_MINUS_WEIGHTING; float ONE_MINUS_WEIGHTING;
}; };
@ -44,10 +48,20 @@ private:
template <class T, int MAX_NUM_SAMPLES> class MovingAverage { template <class T, int MAX_NUM_SAMPLES> class MovingAverage {
public: public:
MovingAverage<T, MAX_NUM_SAMPLES>() {}
MovingAverage<T, MAX_NUM_SAMPLES>(const MovingAverage<T, MAX_NUM_SAMPLES>& other) {
*this = other;
}
MovingAverage<T, MAX_NUM_SAMPLES>& operator=(const MovingAverage<T, MAX_NUM_SAMPLES>& other) {
numSamples = (int)other.numSamples;
average = (T)other.average;
return *this;
}
const float WEIGHTING = 1.0f / (float)MAX_NUM_SAMPLES; const float WEIGHTING = 1.0f / (float)MAX_NUM_SAMPLES;
const float ONE_MINUS_WEIGHTING = 1.0f - WEIGHTING; const float ONE_MINUS_WEIGHTING = 1.0f - WEIGHTING;
int numSamples{ 0 }; std::atomic<int> numSamples{ 0 };
T average; std::atomic<T> average;
void clear() { void clear() {
numSamples = 0; numSamples = 0;
@ -72,7 +86,7 @@ public:
_samples = 0; _samples = 0;
} }
bool isAverageValid() const { bool isAverageValid() const {
std::unique_lock<std::mutex> lock(_lock); std::unique_lock<std::mutex> lock(_lock);
return (_samples > 0); return (_samples > 0);
} }
@ -87,7 +101,7 @@ public:
_samples++; _samples++;
} }
T getAverage() const { T getAverage() const {
std::unique_lock<std::mutex> lock(_lock); std::unique_lock<std::mutex> lock(_lock);
return _average; return _average;
} }