Sample connections stats every seconds

This commit is contained in:
Clement 2018-12-12 12:15:44 -08:00
parent c5b60594b6
commit f163bbc0d5
22 changed files with 204 additions and 455 deletions

View file

@ -915,59 +915,52 @@ void AssetServer::handleAssetUpload(QSharedPointer<ReceivedMessage> message, Sha
void AssetServer::sendStatsPacket() {
QJsonObject serverStats;
auto stats = DependencyManager::get<NodeList>()->sampleStatsForAllConnections();
auto nodeList = DependencyManager::get<NodeList>();
nodeList->eachNode([&](auto& node) {
auto& stats = node->getConnectionStats();
for (const auto& stat : stats) {
QJsonObject nodeStats;
auto endTimeMs = std::chrono::duration_cast<std::chrono::milliseconds>(stat.second.endTime);
auto endTimeMs = std::chrono::duration_cast<std::chrono::milliseconds>(stats.endTime);
QDateTime date = QDateTime::fromMSecsSinceEpoch(endTimeMs.count());
static const float USEC_PER_SEC = 1000000.0f;
static const float MEGABITS_PER_BYTE = 8.0f / 1000000.0f; // Bytes => Mbits
float elapsed = (float)(stat.second.endTime - stat.second.startTime).count() / USEC_PER_SEC; // sec
float elapsed = (float)(stats.endTime - stats.startTime).count() / USEC_PER_SEC; // sec
float megabitsPerSecPerByte = MEGABITS_PER_BYTE / elapsed; // Bytes => Mb/s
QJsonObject connectionStats;
connectionStats["1. Last Heard"] = date.toString();
connectionStats["2. Est. Max (P/s)"] = stat.second.estimatedBandwith;
connectionStats["3. RTT (ms)"] = stat.second.rtt;
connectionStats["4. CW (P)"] = stat.second.congestionWindowSize;
connectionStats["5. Period (us)"] = stat.second.packetSendPeriod;
connectionStats["6. Up (Mb/s)"] = stat.second.sentBytes * megabitsPerSecPerByte;
connectionStats["7. Down (Mb/s)"] = stat.second.receivedBytes * megabitsPerSecPerByte;
connectionStats["2. Est. Max (P/s)"] = stats.estimatedBandwith;
connectionStats["3. RTT (ms)"] = stats.rtt;
connectionStats["4. CW (P)"] = stats.congestionWindowSize;
connectionStats["5. Period (us)"] = stats.packetSendPeriod;
connectionStats["6. Up (Mb/s)"] = stats.sentBytes * megabitsPerSecPerByte;
connectionStats["7. Down (Mb/s)"] = stats.receivedBytes * megabitsPerSecPerByte;
nodeStats["Connection Stats"] = connectionStats;
using Events = udt::ConnectionStats::Stats::Event;
const auto& events = stat.second.events;
const auto& events = stats.events;
QJsonObject upstreamStats;
upstreamStats["1. Sent (P/s)"] = stat.second.sendRate;
upstreamStats["2. Sent Packets"] = stat.second.sentPackets;
upstreamStats["1. Sent (P/s)"] = stats.sendRate;
upstreamStats["2. Sent Packets"] = (int)stats.sentPackets;
upstreamStats["3. Recvd ACK"] = events[Events::ReceivedACK];
upstreamStats["4. Procd ACK"] = events[Events::ProcessedACK];
upstreamStats["5. Retransmitted"] = stat.second.retransmittedPackets;
upstreamStats["5. Retransmitted"] = (int)stats.retransmittedPackets;
nodeStats["Upstream Stats"] = upstreamStats;
QJsonObject downstreamStats;
downstreamStats["1. Recvd (P/s)"] = stat.second.receiveRate;
downstreamStats["2. Recvd Packets"] = stat.second.receivedPackets;
downstreamStats["1. Recvd (P/s)"] = stats.receiveRate;
downstreamStats["2. Recvd Packets"] = (int)stats.receivedPackets;
downstreamStats["3. Sent ACK"] = events[Events::SentACK];
downstreamStats["4. Duplicates"] = stat.second.duplicatePackets;
downstreamStats["4. Duplicates"] = (int)stats.duplicatePackets;
nodeStats["Downstream Stats"] = downstreamStats;
QString uuid;
auto nodelist = DependencyManager::get<NodeList>();
if (stat.first == nodelist->getDomainHandler().getSockAddr()) {
uuid = uuidStringWithoutCurlyBraces(nodelist->getDomainHandler().getUUID());
nodeStats[USERNAME_UUID_REPLACEMENT_STATS_KEY] = "DomainServer";
} else {
auto node = nodelist->findNodeWithAddr(stat.first);
uuid = uuidStringWithoutCurlyBraces(node ? node->getUUID() : QUuid());
nodeStats[USERNAME_UUID_REPLACEMENT_STATS_KEY] = uuid;
}
QString uuid = uuidStringWithoutCurlyBraces(node->getUUID());
nodeStats[USERNAME_UUID_REPLACEMENT_STATS_KEY] = uuid;
serverStats[uuid] = nodeStats;
}
});
// send off the stats packets
ThreadedAssignment::addPacketStatsAndSendStatsPacket(serverStats);

View file

@ -338,7 +338,7 @@ void AudioMixer::sendStatsPacket() {
QJsonObject nodeStats;
QString uuidString = uuidStringWithoutCurlyBraces(node->getUUID());
nodeStats["outbound_kbps"] = node->getOutboundBandwidth();
nodeStats["outbound_kbps"] = node->getOutboundKbps();
nodeStats[USERNAME_UUID_REPLACEMENT_STATS_KEY] = uuidString;
nodeStats["jitter"] = clientData->getAudioStreamStats();

View file

@ -839,8 +839,8 @@ void AvatarMixer::sendStatsPacket() {
// add the key to ask the domain-server for a username replacement, if it has it
avatarStats[USERNAME_UUID_REPLACEMENT_STATS_KEY] = uuidStringWithoutCurlyBraces(node->getUUID());
avatarStats[NODE_OUTBOUND_KBPS_STAT_KEY] = node->getOutboundBandwidth();
avatarStats[NODE_INBOUND_KBPS_STAT_KEY] = node->getInboundBandwidth();
avatarStats[NODE_OUTBOUND_KBPS_STAT_KEY] = node->getOutboundKbps();
avatarStats[NODE_INBOUND_KBPS_STAT_KEY] = node->getInboundKbps();
AvatarMixerClientData* clientData = static_cast<AvatarMixerClientData*>(node->getLinkedData());
if (clientData) {

View file

@ -75,8 +75,8 @@ void MessagesMixer::sendStatsPacket() {
DependencyManager::get<NodeList>()->eachNode([&](const SharedNodePointer& node) {
QJsonObject clientStats;
clientStats[USERNAME_UUID_REPLACEMENT_STATS_KEY] = uuidStringWithoutCurlyBraces(node->getUUID());
clientStats["outbound_kbps"] = node->getOutboundBandwidth();
clientStats["inbound_kbps"] = node->getInboundBandwidth();
clientStats["outbound_kbps"] = node->getOutboundKbps();
clientStats["inbound_kbps"] = node->getInboundKbps();
messagesMixerObject[uuidStringWithoutCurlyBraces(node->getUUID())] = clientStats;
});

View file

@ -192,13 +192,13 @@ Item {
}
StatText {
visible: root.expanded;
text: "Audio In Audio: " + root.audioAudioInboundPPS + " pps, " +
"Silent: " + root.audioSilentInboundPPS + " pps";
text: "Audio Mixer Out: " + root.audioMixerOutKbps + " kbps, " +
root.audioMixerOutPps + "pps";
}
StatText {
visible: root.expanded;
text: "Audio Mixer Out: " + root.audioMixerOutKbps + " kbps, " +
root.audioMixerOutPps + "pps";
text: "Audio In Audio: " + root.audioAudioInboundPPS + " pps, " +
"Silent: " + root.audioSilentInboundPPS + " pps";
}
StatText {
visible: root.expanded;

View file

@ -210,13 +210,13 @@ Item {
}
StatText {
visible: root.expanded;
text: "Audio In Audio: " + root.audioAudioInboundPPS + " pps, " +
"Silent: " + root.audioSilentInboundPPS + " pps";
text: "Audio Mixer Out: " + root.audioMixerOutKbps + " kbps, " +
root.audioMixerOutPps + "pps";
}
StatText {
visible: root.expanded;
text: "Audio Mixer Out: " + root.audioMixerOutKbps + " kbps, " +
root.audioMixerOutPps + "pps";
text: "Audio In Audio: " + root.audioAudioInboundPPS + " pps, " +
"Silent: " + root.audioSilentInboundPPS + " pps";
}
StatText {
visible: root.expanded;

View file

@ -859,7 +859,6 @@ bool setupEssentials(int& argc, char** argv, bool runningMarkerExisted) {
DependencyManager::set<LODManager>();
DependencyManager::set<StandAloneJSConsole>();
DependencyManager::set<DialogsManager>();
DependencyManager::set<BandwidthRecorder>();
DependencyManager::set<ResourceCacheSharedItems>();
DependencyManager::set<DesktopScriptingInterface>();
DependencyManager::set<EntityScriptingInterface>(true);
@ -1574,13 +1573,6 @@ Application::Application(int& argc, char** argv, QElapsedTimer& startupTimer, bo
connect(this, SIGNAL(aboutToQuit()), this, SLOT(onAboutToQuit()));
// hook up bandwidth estimator
QSharedPointer<BandwidthRecorder> bandwidthRecorder = DependencyManager::get<BandwidthRecorder>();
connect(nodeList.data(), &LimitedNodeList::dataSent,
bandwidthRecorder.data(), &BandwidthRecorder::updateOutboundData);
connect(nodeList.data(), &LimitedNodeList::dataReceived,
bandwidthRecorder.data(), &BandwidthRecorder::updateInboundData);
// FIXME -- I'm a little concerned about this.
connect(myAvatar->getSkeletonModel().get(), &SkeletonModel::skeletonLoaded,
this, &Application::checkSkeleton, Qt::QueuedConnection);
@ -2046,15 +2038,12 @@ Application::Application(int& argc, char** argv, QElapsedTimer& startupTimer, bo
properties["deadlock_watchdog_maxElapsed"] = (int)DeadlockWatchdogThread::_maxElapsed;
properties["deadlock_watchdog_maxElapsedAverage"] = (int)DeadlockWatchdogThread::_maxElapsedAverage;
auto bandwidthRecorder = DependencyManager::get<BandwidthRecorder>();
properties["packet_rate_in"] = bandwidthRecorder->getCachedTotalAverageInputPacketsPerSecond();
properties["packet_rate_out"] = bandwidthRecorder->getCachedTotalAverageOutputPacketsPerSecond();
properties["kbps_in"] = bandwidthRecorder->getCachedTotalAverageInputKilobitsPerSecond();
properties["kbps_out"] = bandwidthRecorder->getCachedTotalAverageOutputKilobitsPerSecond();
properties["atp_in_kbps"] = bandwidthRecorder->getAverageInputKilobitsPerSecond(NodeType::AssetServer);
auto nodeList = DependencyManager::get<NodeList>();
properties["packet_rate_in"] = nodeList->getInboundPPS();
properties["packet_rate_out"] = nodeList->getOutboundPPS();
properties["kbps_in"] = nodeList->getInboundKbps();
properties["kbps_out"] = nodeList->getOutboundKbps();
SharedNodePointer entityServerNode = nodeList->soloNodeOfType(NodeType::EntityServer);
SharedNodePointer audioMixerNode = nodeList->soloNodeOfType(NodeType::AudioMixer);
SharedNodePointer avatarMixerNode = nodeList->soloNodeOfType(NodeType::AvatarMixer);
@ -2065,6 +2054,7 @@ Application::Application(int& argc, char** argv, QElapsedTimer& startupTimer, bo
properties["avatar_ping"] = avatarMixerNode ? avatarMixerNode->getPingMs() : -1;
properties["asset_ping"] = assetServerNode ? assetServerNode->getPingMs() : -1;
properties["messages_ping"] = messagesMixerNode ? messagesMixerNode->getPingMs() : -1;
properties["atp_in_kbps"] = messagesMixerNode ? assetServerNode->getInboundKbps() : 0.0f;
auto loadingRequests = ResourceCache::getLoadingRequests();

View file

@ -30,7 +30,6 @@
#include <gl/Context.h>
#include "BandwidthRecorder.h"
#include "Menu.h"
#include "Util.h"
#include "SequenceNumberStats.h"
@ -166,20 +165,25 @@ void Stats::updateStats(bool force) {
STAT_UPDATE(collisionPicksUpdated, updatedPicks[PickQuery::Collision]);
}
auto bandwidthRecorder = DependencyManager::get<BandwidthRecorder>();
STAT_UPDATE(packetInCount, (int)bandwidthRecorder->getCachedTotalAverageInputPacketsPerSecond());
STAT_UPDATE(packetOutCount, (int)bandwidthRecorder->getCachedTotalAverageOutputPacketsPerSecond());
STAT_UPDATE_FLOAT(mbpsIn, (float)bandwidthRecorder->getCachedTotalAverageInputKilobitsPerSecond() / 1000.0f, 0.01f);
STAT_UPDATE_FLOAT(mbpsOut, (float)bandwidthRecorder->getCachedTotalAverageOutputKilobitsPerSecond() / 1000.0f, 0.01f);
STAT_UPDATE(packetInCount, nodeList->getInboundPPS());
STAT_UPDATE(packetOutCount, nodeList->getOutboundPPS());
STAT_UPDATE_FLOAT(mbpsIn, nodeList->getInboundKbps() / 1000.0f, 0.01f);
STAT_UPDATE_FLOAT(mbpsOut, nodeList->getOutboundKbps() / 1000.0f, 0.01f);
STAT_UPDATE_FLOAT(assetMbpsIn, (float)bandwidthRecorder->getAverageInputKilobitsPerSecond(NodeType::AssetServer) / 1000.0f, 0.01f);
STAT_UPDATE_FLOAT(assetMbpsOut, (float)bandwidthRecorder->getAverageOutputKilobitsPerSecond(NodeType::AssetServer) / 1000.0f, 0.01f);
// Second column: ping
SharedNodePointer audioMixerNode = nodeList->soloNodeOfType(NodeType::AudioMixer);
SharedNodePointer avatarMixerNode = nodeList->soloNodeOfType(NodeType::AvatarMixer);
SharedNodePointer assetServerNode = nodeList->soloNodeOfType(NodeType::AssetServer);
SharedNodePointer messageMixerNode = nodeList->soloNodeOfType(NodeType::MessagesMixer);
if (assetServerNode) {
STAT_UPDATE_FLOAT(assetMbpsIn, assetServerNode->getInboundKbps() / 1000.0f, 0.01f);
STAT_UPDATE_FLOAT(assetMbpsOut, assetServerNode->getOutboundKbps() / 1000.0f, 0.01f);
} else {
STAT_UPDATE_FLOAT(assetMbpsIn, 0.0f, 0.01f);
STAT_UPDATE_FLOAT(assetMbpsOut, 0.0f, 0.01f);
}
// Second column: ping
STAT_UPDATE(audioPing, audioMixerNode ? audioMixerNode->getPingMs() : -1);
const int mixerLossRate = (int)roundf(_audioStats->data()->getMixerStream()->lossRateWindow() * 100.0f);
const int clientLossRate = (int)roundf(_audioStats->data()->getClientStream()->lossRateWindow() * 100.0f);
@ -198,7 +202,7 @@ void Stats::updateStats(bool force) {
// TODO: this should also support entities
if (node->getType() == NodeType::EntityServer) {
totalPingOctree += node->getPingMs();
totalEntityKbps += node->getInboundBandwidth();
totalEntityKbps += node->getInboundKbps();
octreeServerCount++;
if (pingOctreeMax < node->getPingMs()) {
pingOctreeMax = node->getPingMs();
@ -218,10 +222,10 @@ void Stats::updateStats(bool force) {
if (_expanded || force) {
SharedNodePointer avatarMixer = nodeList->soloNodeOfType(NodeType::AvatarMixer);
if (avatarMixer) {
STAT_UPDATE(avatarMixerInKbps, (int)roundf(bandwidthRecorder->getAverageInputKilobitsPerSecond(NodeType::AvatarMixer)));
STAT_UPDATE(avatarMixerInPps, (int)roundf(bandwidthRecorder->getAverageInputPacketsPerSecond(NodeType::AvatarMixer)));
STAT_UPDATE(avatarMixerOutKbps, (int)roundf(bandwidthRecorder->getAverageOutputKilobitsPerSecond(NodeType::AvatarMixer)));
STAT_UPDATE(avatarMixerOutPps, (int)roundf(bandwidthRecorder->getAverageOutputPacketsPerSecond(NodeType::AvatarMixer)));
STAT_UPDATE(avatarMixerInKbps, (int)roundf(avatarMixer->getInboundKbps()));
STAT_UPDATE(avatarMixerInPps, avatarMixer->getInboundPPS());
STAT_UPDATE(avatarMixerOutKbps, (int)roundf(avatarMixer->getOutboundKbps()));
STAT_UPDATE(avatarMixerOutPps, avatarMixer->getOutboundPPS());
} else {
STAT_UPDATE(avatarMixerInKbps, -1);
STAT_UPDATE(avatarMixerInPps, -1);
@ -233,17 +237,15 @@ void Stats::updateStats(bool force) {
SharedNodePointer audioMixerNode = nodeList->soloNodeOfType(NodeType::AudioMixer);
auto audioClient = DependencyManager::get<AudioClient>().data();
if (audioMixerNode || force) {
STAT_UPDATE(audioMixerKbps, (int)roundf(
bandwidthRecorder->getAverageInputKilobitsPerSecond(NodeType::AudioMixer) +
bandwidthRecorder->getAverageOutputKilobitsPerSecond(NodeType::AudioMixer)));
STAT_UPDATE(audioMixerPps, (int)roundf(
bandwidthRecorder->getAverageInputPacketsPerSecond(NodeType::AudioMixer) +
bandwidthRecorder->getAverageOutputPacketsPerSecond(NodeType::AudioMixer)));
STAT_UPDATE(audioMixerKbps, (int)roundf(audioMixerNode->getInboundKbps() +
audioMixerNode->getOutboundKbps()));
STAT_UPDATE(audioMixerPps, audioMixerNode->getInboundPPS() +
audioMixerNode->getOutboundPPS());
STAT_UPDATE(audioMixerInKbps, (int)roundf(bandwidthRecorder->getAverageInputKilobitsPerSecond(NodeType::AudioMixer)));
STAT_UPDATE(audioMixerInPps, (int)roundf(bandwidthRecorder->getAverageInputPacketsPerSecond(NodeType::AudioMixer)));
STAT_UPDATE(audioMixerOutKbps, (int)roundf(bandwidthRecorder->getAverageOutputKilobitsPerSecond(NodeType::AudioMixer)));
STAT_UPDATE(audioMixerOutPps, (int)roundf(bandwidthRecorder->getAverageOutputPacketsPerSecond(NodeType::AudioMixer)));
STAT_UPDATE(audioMixerInKbps, (int)roundf(audioMixerNode->getInboundKbps()));
STAT_UPDATE(audioMixerInPps, audioMixerNode->getInboundPPS());
STAT_UPDATE(audioMixerOutKbps, (int)roundf(audioMixerNode->getOutboundKbps()));
STAT_UPDATE(audioMixerOutPps, audioMixerNode->getOutboundPPS());
STAT_UPDATE(audioAudioInboundPPS, (int)audioClient->getAudioInboundPPS());
STAT_UPDATE(audioSilentInboundPPS, (int)audioClient->getSilentInboundPPS());
STAT_UPDATE(audioOutboundPPS, (int)audioClient->getAudioOutboundPPS());

View file

@ -1,190 +0,0 @@
//
// BandwidthMeter.cpp
// interface/src/ui
//
// Created by Seth Alves on 2015-1-30
// Copyright 2015 High Fidelity, Inc.
//
// Based on code by Tobias Schwinger
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#include "BandwidthRecorder.h"
#include <QDateTime>
BandwidthRecorder::Channel::Channel() {
}
float BandwidthRecorder::Channel::getAverageInputPacketsPerSecond() const {
float averageTimeBetweenPackets = _input.getEventDeltaAverage();
if (averageTimeBetweenPackets > 0.0f) {
return (1.0f / averageTimeBetweenPackets);
}
return 0.0f;
}
float BandwidthRecorder::Channel::getAverageOutputPacketsPerSecond() const {
float averageTimeBetweenPackets = _output.getEventDeltaAverage();
if (averageTimeBetweenPackets > 0.0f) {
return (1.0f / averageTimeBetweenPackets);
}
return 0.0f;
}
float BandwidthRecorder::Channel::getAverageInputKilobitsPerSecond() const {
return (_input.getAverageSampleValuePerSecond() * (8.0f / 1000));
}
float BandwidthRecorder::Channel::getAverageOutputKilobitsPerSecond() const {
return (_output.getAverageSampleValuePerSecond() * (8.0f / 1000));
}
void BandwidthRecorder::Channel::updateInputAverage(const float sample) {
_input.updateAverage(sample);
}
void BandwidthRecorder::Channel::updateOutputAverage(const float sample) {
_output.updateAverage(sample);
}
BandwidthRecorder::BandwidthRecorder() {
for (uint i=0; i<CHANNEL_COUNT; i++) {
_channels[ i ] = NULL;
}
}
BandwidthRecorder::~BandwidthRecorder() {
for (uint i=0; i<CHANNEL_COUNT; i++) {
delete _channels[ i ];
}
}
void BandwidthRecorder::updateInboundData(const quint8 channelType, const int sample) {
if (! _channels[channelType]) {
_channels[channelType] = new Channel();
}
_channels[channelType]->updateInputAverage(sample);
}
void BandwidthRecorder::updateOutboundData(const quint8 channelType, const int sample) {
if (! _channels[channelType]) {
_channels[channelType] = new Channel();
}
_channels[channelType]->updateOutputAverage(sample);
}
float BandwidthRecorder::getAverageInputPacketsPerSecond(const quint8 channelType) const {
if (! _channels[channelType]) {
return 0.0f;
}
return _channels[channelType]->getAverageInputPacketsPerSecond();
}
float BandwidthRecorder::getAverageOutputPacketsPerSecond(const quint8 channelType) const {
if (! _channels[channelType]) {
return 0.0f;
}
return _channels[channelType]->getAverageOutputPacketsPerSecond();
}
float BandwidthRecorder::getAverageInputKilobitsPerSecond(const quint8 channelType) const {
if (! _channels[channelType]) {
return 0.0f;
}
return _channels[channelType]->getAverageInputKilobitsPerSecond();
}
float BandwidthRecorder::getAverageOutputKilobitsPerSecond(const quint8 channelType) const {
if (! _channels[channelType]) {
return 0.0f;
}
return _channels[channelType]->getAverageOutputKilobitsPerSecond();
}
float BandwidthRecorder::getTotalAverageInputPacketsPerSecond() const {
float result = 0.0f;
for (uint i=0; i<CHANNEL_COUNT; i++) {
if (_channels[i]) {
result += _channels[i]->getAverageInputPacketsPerSecond();
}
}
return result;
}
float BandwidthRecorder::getTotalAverageOutputPacketsPerSecond() const {
float result = 0.0f;
for (uint i=0; i<CHANNEL_COUNT; i++) {
if (_channels[i]) {
result += _channels[i]->getAverageOutputPacketsPerSecond();
}
}
return result;
}
float BandwidthRecorder::getTotalAverageInputKilobitsPerSecond() const {
float result = 0.0f;
for (uint i=0; i<CHANNEL_COUNT; i++) {
if (_channels[i]) {
result += _channels[i]->getAverageInputKilobitsPerSecond();
}
}
return result;
}
float BandwidthRecorder::getTotalAverageOutputKilobitsPerSecond() const {
float result = 0.0f;
for (uint i=0; i<CHANNEL_COUNT; i++) {
if (_channels[i]) {
result += _channels[i]->getAverageOutputKilobitsPerSecond();
}
}
return result;
}
float BandwidthRecorder::getCachedTotalAverageInputPacketsPerSecond() const {
static qint64 lastCalculated = 0;
static float cachedValue = 0.0f;
qint64 now = QDateTime::currentMSecsSinceEpoch();
if (now - lastCalculated > 1000.0f) {
lastCalculated = now;
cachedValue = getTotalAverageInputPacketsPerSecond();
}
return cachedValue;
}
float BandwidthRecorder::getCachedTotalAverageOutputPacketsPerSecond() const {
static qint64 lastCalculated = 0;
static float cachedValue = 0.0f;
qint64 now = QDateTime::currentMSecsSinceEpoch();
if (now - lastCalculated > 1000.0f) {
lastCalculated = now;
cachedValue = getTotalAverageOutputPacketsPerSecond();
}
return cachedValue;
}
float BandwidthRecorder::getCachedTotalAverageInputKilobitsPerSecond() const {
static qint64 lastCalculated = 0;
static float cachedValue = 0.0f;
qint64 now = QDateTime::currentMSecsSinceEpoch();
if (now - lastCalculated > 1000.0f) {
lastCalculated = now;
cachedValue = getTotalAverageInputKilobitsPerSecond();
}
return cachedValue;
}
float BandwidthRecorder::getCachedTotalAverageOutputKilobitsPerSecond() const {
static qint64 lastCalculated = 0;
static float cachedValue = 0.0f;
qint64 now = QDateTime::currentMSecsSinceEpoch();
if (now - lastCalculated > 1000.0f) {
lastCalculated = now;
cachedValue = getTotalAverageOutputKilobitsPerSecond();
}
return cachedValue;
}

View file

@ -1,75 +0,0 @@
//
// BandwidthRecorder.h
//
// Created by Seth Alves on 2015-1-30
// Copyright 2015 High Fidelity, Inc.
//
// Based on code by Tobias Schwinger
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#ifndef hifi_BandwidthRecorder_h
#define hifi_BandwidthRecorder_h
#include <QObject>
#include <QElapsedTimer>
#include "DependencyManager.h"
#include "SimpleMovingAverage.h"
class BandwidthRecorder : public QObject, public Dependency {
Q_OBJECT
SINGLETON_DEPENDENCY
public:
BandwidthRecorder();
~BandwidthRecorder();
// keep track of data rate in two directions as well as units and style to use during display
class Channel {
public:
Channel();
float getAverageInputPacketsPerSecond() const;
float getAverageOutputPacketsPerSecond() const;
float getAverageInputKilobitsPerSecond() const;
float getAverageOutputKilobitsPerSecond() const;
void updateInputAverage(const float sample);
void updateOutputAverage(const float sample);
private:
SimpleMovingAverage _input;
SimpleMovingAverage _output;
};
float getAverageInputPacketsPerSecond(const quint8 channelType) const;
float getAverageOutputPacketsPerSecond(const quint8 channelType) const;
float getAverageInputKilobitsPerSecond(const quint8 channelType) const;
float getAverageOutputKilobitsPerSecond(const quint8 channelType) const;
float getTotalAverageInputPacketsPerSecond() const;
float getTotalAverageOutputPacketsPerSecond() const;
float getTotalAverageInputKilobitsPerSecond() const;
float getTotalAverageOutputKilobitsPerSecond() const;
float getCachedTotalAverageInputPacketsPerSecond() const;
float getCachedTotalAverageOutputPacketsPerSecond() const;
float getCachedTotalAverageInputKilobitsPerSecond() const;
float getCachedTotalAverageOutputKilobitsPerSecond() const;
private:
// one for each possible Node type
static const unsigned int CHANNEL_COUNT = 256;
Channel* _channels[CHANNEL_COUNT];
public slots:
void updateInboundData(const quint8 channelType, const int bytes);
void updateOutboundData(const quint8 channelType, const int bytes);
};
#endif

View file

@ -83,6 +83,11 @@ LimitedNodeList::LimitedNodeList(int socketListenPort, int dtlsListenPort) :
connect(silentNodeTimer, &QTimer::timeout, this, &LimitedNodeList::removeSilentNodes);
silentNodeTimer->start(NODE_SILENCE_THRESHOLD_MSECS);
const int CONNECTION_STATS_SAMPLE_INTERVAL_MSECS = 1000;
QTimer* statsSampleTimer = new QTimer(this);
connect(statsSampleTimer, &QTimer::timeout, this, &LimitedNodeList::sampleConnectionStats);
statsSampleTimer->start(CONNECTION_STATS_SAMPLE_INTERVAL_MSECS);
// check the local socket right now
updateLocalSocket();
@ -295,7 +300,6 @@ bool LimitedNodeList::packetSourceAndHashMatchAndTrackBandwidth(const udt::Packe
});
if (sendingNodeType != NodeType::Unassigned) {
emit dataReceived(sendingNodeType, packet.getPayloadSize());
return true;
} else {
HIFI_FCDEBUG(networking(), "Replicated packet of type" << headerType
@ -303,9 +307,7 @@ bool LimitedNodeList::packetSourceAndHashMatchAndTrackBandwidth(const udt::Packe
return false;
}
} else {
emit dataReceived(NodeType::Unassigned, packet.getPayloadSize());
return true;
}
} else {
@ -328,8 +330,6 @@ bool LimitedNodeList::packetSourceAndHashMatchAndTrackBandwidth(const udt::Packe
packet.getSenderSockAddr() == getDomainSockAddr() &&
PacketTypeEnum::getDomainSourcedPackets().contains(headerType)) {
// This is a packet sourced by the domain server
emit dataReceived(NodeType::Unassigned, packet.getPayloadSize());
return true;
}
@ -367,8 +367,6 @@ bool LimitedNodeList::packetSourceAndHashMatchAndTrackBandwidth(const udt::Packe
// from this sending node
sourceNode->setLastHeardMicrostamp(usecTimestampNow());
emit dataReceived(sourceNode->getType(), packet.getPayloadSize());
return true;
} else {
@ -407,9 +405,6 @@ qint64 LimitedNodeList::sendUnreliablePacket(const NLPacket& packet, const Node&
return 0;
}
emit dataSent(destinationNode.getType(), packet.getDataSize());
destinationNode.recordBytesSent(packet.getDataSize());
return sendUnreliablePacket(packet, *destinationNode.getActiveSocket(), destinationNode.getAuthenticateHash());
}
@ -430,9 +425,6 @@ qint64 LimitedNodeList::sendPacket(std::unique_ptr<NLPacket> packet, const Node&
auto activeSocket = destinationNode.getActiveSocket();
if (activeSocket) {
emit dataSent(destinationNode.getType(), packet->getDataSize());
destinationNode.recordBytesSent(packet->getDataSize());
return sendPacket(std::move(packet), *activeSocket, destinationNode.getAuthenticateHash());
} else {
qCDebug(networking) << "LimitedNodeList::sendPacket called without active socket for node" << destinationNode << "- not sending";
@ -470,8 +462,6 @@ qint64 LimitedNodeList::sendUnreliableUnorderedPacketList(NLPacketList& packetLi
bytesSent += sendPacket(packetList.takeFront<NLPacket>(), *activeSocket,
connectionHash);
}
emit dataSent(destinationNode.getType(), bytesSent);
return bytesSent;
} else {
qCDebug(networking) << "LimitedNodeList::sendPacketList called without active socket for node" << destinationNode
@ -887,10 +877,56 @@ void LimitedNodeList::removeSilentNodes() {
}
}
void LimitedNodeList::sampleConnectionStats() {
uint32_t packetsIn { 0 };
uint32_t packetsOut { 0 };
uint64_t bytesIn { 0 };
uint64_t bytesOut { 0 };
int elapsedSum { 0 };
int elapsedCount { 0 };
auto allStats = _nodeSocket.sampleStatsForAllConnections();
for (const auto& stats : allStats) {
auto node = findNodeWithAddr(stats.first);
if (node && node->getActiveSocket() &&
*node->getActiveSocket() == stats.first) {
node->updateStats(stats.second);
}
packetsIn += stats.second.receivedPackets;
packetsIn += stats.second.receivedUnreliablePackets;
packetsOut += stats.second.sentPackets;
packetsOut += stats.second.sentUnreliablePackets;
bytesIn += stats.second.receivedBytes;
bytesIn += stats.second.receivedUnreliableBytes;
bytesOut += stats.second.sentBytes;
bytesOut += stats.second.sentUnreliableBytes;
elapsedSum += (stats.second.endTime - stats.second.startTime).count();
elapsedCount++;
}
if (elapsedCount > 0) {
float elapsedAvg = (float)elapsedSum / elapsedCount;
float factor = USECS_PER_SECOND / elapsedAvg;
float kilobitsReceived = (float)bytesIn * BITS_IN_BYTE / BYTES_PER_KILOBYTE;
float kilobitsSent = (float)bytesOut * BITS_IN_BYTE / BYTES_PER_KILOBYTE;
_inboundPPS = packetsIn * factor;
_outboundPPS = packetsOut * factor;
_inboundKbps = kilobitsReceived * factor;
_outboundKbps = kilobitsSent * factor;
} else {
_inboundPPS = 0;
_outboundPPS = 0;
_inboundKbps = 0.0f;
_outboundKbps = 0.0f;
}
}
const uint32_t RFC_5389_MAGIC_COOKIE = 0x2112A442;
const int NUM_BYTES_STUN_HEADER = 20;
void LimitedNodeList::makeSTUNRequestPacket(char* stunRequestPacket) {
int packetIndex = 0;

View file

@ -319,6 +319,11 @@ public:
void sendFakedHandshakeRequestToNode(SharedNodePointer node);
#endif
int getInboundPPS() const { return _inboundPPS; }
int getOutboundPPS() const { return _outboundPPS; }
float getInboundKbps() const { return _inboundKbps; }
float getOutboundKbps() const { return _outboundKbps; }
public slots:
void reset();
void eraseAllNodes();
@ -332,10 +337,10 @@ public slots:
bool killNodeWithUUID(const QUuid& nodeUUID, ConnectionID newConnectionID = NULL_CONNECTION_ID);
signals:
void dataSent(quint8 channelType, int bytes);
void dataReceived(quint8 channelType, int bytes);
private slots:
void sampleConnectionStats();
signals:
// QUuid might be zero for non-sourced packet types.
void packetVersionMismatch(PacketType type, const HifiSockAddr& senderSockAddr, const QUuid& senderUUID);
@ -442,6 +447,11 @@ private:
LocalIDMapping _localIDMap;
Node::LocalID _sessionLocalID { 0 };
bool _flagTimeForConnectionStep { false }; // only keep track in interface
int _inboundPPS { 0 };
int _outboundPPS { 0 };
float _inboundKbps { 0.0f };
float _outboundKbps { 0.0f };
};
#endif // hifi_LimitedNodeList_h

View file

@ -228,19 +228,3 @@ QDebug operator<<(QDebug debug, const NetworkPeer &peer) {
<< "- local:" << peer.getLocalSocket();
return debug;
}
void NetworkPeer::recordBytesSent(int count) const {
_bandwidthRecorder.updateOutboundData(0, count);
}
void NetworkPeer::recordBytesReceived(int count) const {
_bandwidthRecorder.updateInboundData(0, count);
}
float NetworkPeer::getOutboundBandwidth() const {
return _bandwidthRecorder.getAverageOutputKilobitsPerSecond(0);
}
float NetworkPeer::getInboundBandwidth() const {
return _bandwidthRecorder.getAverageInputKilobitsPerSecond(0);
}

View file

@ -18,7 +18,6 @@
#include <QtCore/QTimer>
#include <QtCore/QUuid>
#include "BandwidthRecorder.h"
#include "HifiSockAddr.h"
#include "UUID.h"
@ -78,12 +77,6 @@ public:
void incrementConnectionAttempts() { ++_connectionAttempts; }
void resetConnectionAttempts() { _connectionAttempts = 0; }
void recordBytesSent(int count) const;
void recordBytesReceived(int count) const;
float getOutboundBandwidth() const; // in kbps
float getInboundBandwidth() const; // in kbps
// Typically the LimitedNodeList removes nodes after they are "silent"
// meaning that we have not received any packets (including simple keepalive pings) from them for a set interval.
// The _isForcedNeverSilent flag tells the LimitedNodeList that a Node should never be killed by removeSilentNodes()
@ -114,8 +107,6 @@ protected:
HifiSockAddr _symmetricSocket;
HifiSockAddr* _activeSocket;
mutable BandwidthRecorder _bandwidthRecorder;
quint64 _wakeTimestamp;
std::atomic_ullong _lastHeardMicrostamp;

View file

@ -219,3 +219,37 @@ void Node::setConnectionSecret(const QUuid& connectionSecret) {
_connectionSecret = connectionSecret;
_authenticateHash->setKey(_connectionSecret);
}
void Node::updateStats(Stats stats) {
_stats = stats;
}
const Node::Stats& Node::getConnectionStats() const {
return _stats;
}
float Node::getInboundKbps() const {
float bitsReceived = (_stats.receivedBytes + _stats.receivedUnreliableBytes) * BITS_IN_BYTE;
auto elapsed = _stats.endTime - _stats.startTime;
auto bps = (bitsReceived * USECS_PER_SECOND) / elapsed.count();
return bps / BYTES_PER_KILOBYTE;
}
float Node::getOutboundKbps() const {
float bitsSent = (_stats.sentBytes + _stats.sentUnreliableBytes) * BITS_IN_BYTE;
auto elapsed = _stats.endTime - _stats.startTime;
auto bps = (bitsSent * USECS_PER_SECOND) / elapsed.count();
return bps / BYTES_PER_KILOBYTE;
}
int Node::getInboundPPS() const {
float packetsReceived = _stats.receivedPackets + _stats.receivedUnreliablePackets;
auto elapsed = _stats.endTime - _stats.startTime;
return (packetsReceived * USECS_PER_SECOND) / elapsed.count();
}
int Node::getOutboundPPS() const {
float packetsSent = _stats.sentPackets + _stats.sentUnreliablePackets;
auto elapsed = _stats.endTime - _stats.startTime;
return (packetsSent * USECS_PER_SECOND) / elapsed.count();
}

View file

@ -35,10 +35,13 @@
#include "MovingPercentile.h"
#include "NodePermissions.h"
#include "HMACAuth.h"
#include "udt/ConnectionStats.h"
#include "NumericalConstants.h"
class Node : public NetworkPeer {
Q_OBJECT
public:
using Stats = udt::ConnectionStats::Stats;
Node(const QUuid& uuid, NodeType_t type,
const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket,
@ -94,6 +97,14 @@ public:
friend QDataStream& operator<<(QDataStream& out, const Node& node);
friend QDataStream& operator>>(QDataStream& in, Node& node);
void updateStats(Stats stats);
const Stats& getConnectionStats() const;
int getInboundPPS() const;
int getOutboundPPS() const;
float getInboundKbps() const;
float getOutboundKbps() const;
private:
// privatize copy and assignment operator to disallow Node copying
Node(const Node &otherNode);
@ -115,6 +126,8 @@ private:
IgnoredNodeIDs _ignoredNodeIDs;
mutable QReadWriteLock _ignoredNodeIDSetLock;
std::vector<QString> _replicatedUsernames { };
Stats _stats;
};
Q_DECLARE_METATYPE(Node*)

View file

@ -284,10 +284,6 @@ void PacketReceiver::handleVerifiedMessage(QSharedPointer<ReceivedMessage> recei
connectionType = _directlyConnectedObjects.contains(listener.object) ? Qt::DirectConnection : Qt::AutoConnection;
}
if (matchingNode) {
matchingNode->recordBytesReceived(receivedMessage->getSize());
}
QMetaMethod metaMethod = listener.method;
static const QByteArray QSHAREDPOINTER_NODE_NORMALIZED = QMetaObject::normalizedType("QSharedPointer<Node>");

View file

@ -199,6 +199,14 @@ void Connection::recordRetransmission(int wireSize, int payloadSize,
_congestionControl->onPacketReSent(wireSize, seqNum, timePoint);
}
void Connection::recordSentUnreliablePackets(int wireSize, int payloadSize) {
_stats.recordUnreliableSentPackets(payloadSize, wireSize);
}
void Connection::recordRecievedUnreliablePackets(int wireSize, int payloadSize) {
_stats.recordUnreliableReceivedPackets(payloadSize, wireSize);
}
void Connection::sendACK() {
SequenceNumber nextACKNumber = nextACK();

View file

@ -73,6 +73,9 @@ public:
void setMaxBandwidth(int maxBandwidth);
void sendHandshakeRequest();
void recordSentUnreliablePackets(int wireSize, int payloadSize);
void recordRecievedUnreliablePackets(int wireSize, int payloadSize);
signals:
void packetSent();
@ -81,6 +84,7 @@ signals:
private slots:
void recordSentPackets(int wireSize, int payloadSize, SequenceNumber seqNum, p_high_resolution_clock::time_point timePoint);
void recordRetransmission(int wireSize, int payloadSize, SequenceNumber sequenceNumber, p_high_resolution_clock::time_point timePoint);
void queueInactive();
void queueTimeout();

View file

@ -19,7 +19,6 @@ using namespace std::chrono;
ConnectionStats::ConnectionStats() {
auto now = duration_cast<microseconds>(system_clock::now().time_since_epoch());
_currentSample.startTime = now;
_total.startTime = now;
}
ConnectionStats::Stats ConnectionStats::sample() {
@ -35,101 +34,50 @@ ConnectionStats::Stats ConnectionStats::sample() {
void ConnectionStats::record(Stats::Event event) {
++_currentSample.events[(int) event];
++_total.events[(int) event];
}
void ConnectionStats::recordSentPackets(int payload, int total) {
++_currentSample.sentPackets;
++_total.sentPackets;
_currentSample.sentUtilBytes += payload;
_total.sentUtilBytes += payload;
_currentSample.sentBytes += total;
_total.sentBytes += total;
}
void ConnectionStats::recordReceivedPackets(int payload, int total) {
++_currentSample.receivedPackets;
++_total.receivedPackets;
_currentSample.receivedUtilBytes += payload;
_total.receivedUtilBytes += payload;
_currentSample.receivedBytes += total;
_total.receivedBytes += total;
}
void ConnectionStats::recordRetransmittedPackets(int payload, int total) {
++_currentSample.retransmittedPackets;
++_total.retransmittedPackets;
_currentSample.retransmittedUtilBytes += payload;
_total.retransmittedUtilBytes += payload;
_currentSample.retransmittedBytes += total;
_total.retransmittedBytes += total;
}
void ConnectionStats::recordDuplicatePackets(int payload, int total) {
++_currentSample.duplicatePackets;
++_total.duplicatePackets;
_currentSample.duplicateUtilBytes += payload;
_total.duplicateUtilBytes += payload;
_currentSample.duplicateBytes += total;
_total.duplicateBytes += total;
}
void ConnectionStats::recordUnreliableSentPackets(int payload, int total) {
++_currentSample.sentUnreliablePackets;
++_total.sentUnreliablePackets;
_currentSample.sentUnreliableUtilBytes += payload;
_total.sentUnreliableUtilBytes += payload;
_currentSample.sentUnreliableBytes += total;
_total.sentUnreliableBytes += total;
}
void ConnectionStats::recordUnreliableReceivedPackets(int payload, int total) {
++_currentSample.receivedUnreliablePackets;
++_total.receivedUnreliablePackets;
_currentSample.receivedUnreliableUtilBytes += payload;
_total.receivedUnreliableUtilBytes += payload;
_currentSample.sentUnreliableBytes += total;
_total.receivedUnreliableBytes += total;
}
static const double EWMA_CURRENT_SAMPLE_WEIGHT = 0.125;
static const double EWMA_PREVIOUS_SAMPLES_WEIGHT = 1.0 - EWMA_CURRENT_SAMPLE_WEIGHT;
void ConnectionStats::recordSendRate(int sample) {
_currentSample.sendRate = sample;
_total.sendRate = (int)((_total.sendRate * EWMA_PREVIOUS_SAMPLES_WEIGHT) + (sample * EWMA_CURRENT_SAMPLE_WEIGHT));
}
void ConnectionStats::recordReceiveRate(int sample) {
_currentSample.receiveRate = sample;
_total.receiveRate = (int)((_total.receiveRate * EWMA_PREVIOUS_SAMPLES_WEIGHT) + (sample * EWMA_CURRENT_SAMPLE_WEIGHT));
}
void ConnectionStats::recordRTT(int sample) {
_currentSample.rtt = sample;
_total.rtt = (int)((_total.rtt * EWMA_PREVIOUS_SAMPLES_WEIGHT) + (sample * EWMA_CURRENT_SAMPLE_WEIGHT));
_currentSample.receivedUnreliableBytes += total;
}
void ConnectionStats::recordCongestionWindowSize(int sample) {
_currentSample.congestionWindowSize = sample;
_total.congestionWindowSize = (int)((_total.congestionWindowSize * EWMA_PREVIOUS_SAMPLES_WEIGHT) + (sample * EWMA_CURRENT_SAMPLE_WEIGHT));
}
void ConnectionStats::recordPacketSendPeriod(int sample) {
_currentSample.packetSendPeriod = sample;
_total.packetSendPeriod = (int)((_total.packetSendPeriod * EWMA_PREVIOUS_SAMPLES_WEIGHT) + (sample * EWMA_CURRENT_SAMPLE_WEIGHT));
}
QDebug& operator<<(QDebug&& debug, const udt::ConnectionStats::Stats& stats) {

View file

@ -76,7 +76,6 @@ public:
ConnectionStats();
Stats sample();
Stats getTotalStats();
void record(Stats::Event event);
@ -88,16 +87,12 @@ public:
void recordUnreliableSentPackets(int payload, int total);
void recordUnreliableReceivedPackets(int payload, int total);
void recordSendRate(int sample);
void recordReceiveRate(int sample);
void recordRTT(int sample);
void recordCongestionWindowSize(int sample);
void recordPacketSendPeriod(int sample);
private:
Stats _currentSample;
Stats _total;
};
}

View file

@ -129,6 +129,12 @@ qint64 Socket::writePacket(const Packet& packet, const HifiSockAddr& sockAddr) {
sequenceNumber = ++_unreliableSequenceNumbers[sockAddr];
}
auto connection = findOrCreateConnection(sockAddr, true);
if (connection) {
connection->recordSentUnreliablePackets(packet.getWireSize(),
packet.getPayloadSize());
}
// write the correct sequence number to the Packet here
packet.writeSequenceNumber(sequenceNumber);
@ -392,9 +398,10 @@ void Socket::readPendingDatagrams() {
// call our verification operator to see if this packet is verified
if (!_packetFilterOperator || _packetFilterOperator(*packet)) {
auto connection = findOrCreateConnection(senderSockAddr, true);
if (packet->isReliable()) {
// if this was a reliable packet then signal the matching connection with the sequence number
auto connection = findOrCreateConnection(senderSockAddr, true);
if (!connection || !connection->processReceivedSequenceNumber(packet->getSequenceNumber(),
packet->getDataSize(),
@ -406,6 +413,9 @@ void Socket::readPendingDatagrams() {
#endif
continue;
}
} else if (connection) {
connection->recordRecievedUnreliablePackets(packet->getWireSize(),
packet->getPayloadSize());
}
if (packet->isPartOfMessage()) {