Merge pull request from birarda/atp

test for ordered sending in udt-test
This commit is contained in:
Clément Brisset 2015-08-25 21:05:42 +02:00
commit 34b3bd211d
6 changed files with 181 additions and 64 deletions

View file

@ -48,14 +48,14 @@ void ConnectionStats::recordSentPackets(int payload, int total) {
}
void ConnectionStats::recordReceivedPackets(int payload, int total) {
++_currentSample.recievedPackets;
++_total.recievedPackets;
++_currentSample.receivedPackets;
++_total.receivedPackets;
_currentSample.recievedUtilBytes += payload;
_total.recievedUtilBytes += payload;
_currentSample.receivedUtilBytes += payload;
_total.receivedUtilBytes += payload;
_currentSample.recievedBytes += total;
_total.recievedBytes += total;
_currentSample.receivedBytes += total;
_total.receivedBytes += total;
}
void ConnectionStats::recordUnreliableSentPackets(int payload, int total) {
@ -70,14 +70,14 @@ void ConnectionStats::recordUnreliableSentPackets(int payload, int total) {
}
void ConnectionStats::recordUnreliableReceivedPackets(int payload, int total) {
++_currentSample.recievedUnreliablePackets;
++_total.recievedUnreliablePackets;
++_currentSample.receivedUnreliablePackets;
++_total.receivedUnreliablePackets;
_currentSample.recievedUnreliableUtilBytes += payload;
_total.recievedUnreliableUtilBytes += payload;
_currentSample.receivedUnreliableUtilBytes += payload;
_total.receivedUnreliableUtilBytes += payload;
_currentSample.sentUnreliableBytes += total;
_total.recievedUnreliableBytes += total;
_total.receivedUnreliableBytes += total;
}
static const double EWMA_CURRENT_SAMPLE_WEIGHT = 0.125;

View file

@ -44,18 +44,18 @@ public:
// packet counts and sizes
int sentPackets { 0 };
int recievedPackets { 0 };
int receivedPackets { 0 };
int sentUtilBytes { 0 };
int recievedUtilBytes { 0 };
int receivedUtilBytes { 0 };
int sentBytes { 0 };
int recievedBytes { 0 };
int receivedBytes { 0 };
int sentUnreliablePackets { 0 };
int recievedUnreliablePackets { 0 };
int receivedUnreliablePackets { 0 };
int sentUnreliableUtilBytes { 0 };
int recievedUnreliableUtilBytes { 0 };
int receivedUnreliableUtilBytes { 0 };
int sentUnreliableBytes { 0 };
int recievedUnreliableBytes { 0 };
int receivedUnreliableBytes { 0 };
// the following stats are trailing averages in the result, not totals
int sendRate { 0 };

View file

@ -26,9 +26,10 @@ PacketList::PacketList(PacketType packetType, QByteArray extendedHeader, bool is
}
PacketList::PacketList(PacketList&& other) :
_packetType(other._packetType),
_packets(std::move(other._packets))
_packets(std::move(other._packets)),
_packetType(other._packetType)
{
}
void PacketList::startSegment() {
@ -52,6 +53,19 @@ size_t PacketList::getDataSize() const {
return totalBytes;
}
size_t PacketList::getMessageSize() const {
size_t totalBytes = 0;
for (const auto& packet: _packets) {
totalBytes += packet->getPayloadSize();
}
if (_currentPacket) {
totalBytes += _currentPacket->getPayloadSize();
}
return totalBytes;
}
std::unique_ptr<PacketList> PacketList::fromReceivedPackets(std::list<std::unique_ptr<Packet>>&& packets) {
auto packetList = std::unique_ptr<PacketList>(new PacketList(PacketType::Unknown, QByteArray(), true, true));
packetList->_packets = std::move(packets);

View file

@ -47,6 +47,7 @@ public:
QByteArray getExtendedHeader() const { return _extendedHeader; }
size_t getDataSize() const;
size_t getMessageSize() const;
void closeCurrentPacket(bool shouldSendEmpty = false);

View file

@ -29,16 +29,16 @@ const QCommandLineOption PACKET_SIZE {
QString(udt::MAX_PACKET_SIZE_WITH_UDP_HEADER)
};
const QCommandLineOption MIN_PACKET_SIZE {
"min-packet-size", "min size for sent packets in bytes", "min-bytes"
"min-packet-size", "min size for sent packets in bytes", "min bytes"
};
const QCommandLineOption MAX_PACKET_SIZE {
"max-packet-size", "max size for sent packets in bytes", "max-bytes"
"max-packet-size", "max size for sent packets in bytes", "max bytes"
};
const QCommandLineOption MAX_SEND_BYTES {
"max-send-bytes", "number of bytes to send before stopping (default is infinite)", "max-bytes"
"max-send-bytes", "number of bytes to send before stopping (default is infinite)", "max bytes"
};
const QCommandLineOption MAX_SEND_PACKETS {
"max-send-packets", "number of packets to send before stopping (default is infinite)", "max-packets"
"max-send-packets", "number of packets to send before stopping (default is infinite)", "max packets"
};
const QCommandLineOption UNRELIABLE_PACKETS {
"unreliable", "send unreliable packets (default is reliable)"
@ -46,17 +46,26 @@ const QCommandLineOption UNRELIABLE_PACKETS {
const QCommandLineOption ORDERED_PACKETS {
"ordered", "send ordered packets (default is unordered)"
};
const QCommandLineOption MESSAGE_SIZE {
"message-size", "megabytes per message payload for ordered sending (default is 20)", "megabytes"
};
const QCommandLineOption MESSAGE_SEED {
"message-seed", "seed used for random number generation to match ordered messages (default is 742272)", "integer"
};
const QCommandLineOption STATS_INTERVAL {
"stats-interval", "stats output interval (default is 100ms)", "milliseconds"
};
const QStringList CLIENT_STATS_TABLE_HEADERS {
"Send Rate (P/s)", "Bandwidth (P/s)", "RTT(ms)", "CW (P)", "Send Period (us)",
"Received ACK", "Processed ACK", "Received LACK", "Received NAK", "Received TNAK",
"Send (P/s)", "Est. Max (P/s)", "RTT (ms)", "CW (P)", "Period (us)",
"Recv ACK", "Procd ACK", "Recv LACK", "Recv NAK", "Recv TNAK",
"Sent ACK2", "Sent Packets", "Re-sent Packets"
};
const QStringList SERVER_STATS_TABLE_HEADERS {
"Receive Rate (P/s)", "Bandwidth (P/s)", "RTT(ms)", "CW (P)",
"Megabits", "Recv P/s", "Est. Max (P/s)", "RTT (ms)", "CW (P)",
"Sent ACK", "Sent LACK", "Sent NAK", "Sent TNAK",
"Recieved ACK2", "Duplicate Packets"
"Recv ACK2", "Duplicates (P)"
};
UDTTest::UDTTest(int& argc, char** argv) :
@ -138,16 +147,51 @@ UDTTest::UDTTest(int& argc, char** argv) :
_sendOrdered = true;
}
if (_argumentParser.isSet(MESSAGE_SIZE)) {
if (_argumentParser.isSet(ORDERED_PACKETS)) {
static const double BYTES_PER_MEGABYTE = 1000000;
_messageSize = (int) _argumentParser.value(MESSAGE_SIZE).toInt() * BYTES_PER_MEGABYTE;
qDebug() << "Message size for ordered packet sending is" << QString("%1MB").arg(_messageSize / BYTES_PER_MEGABYTE);
} else {
qWarning() << "message-size has no effect if not sending ordered - it will be ignored";
}
}
// in case we're an ordered sender or receiver setup our random number generator now
static const int FIRST_MESSAGE_SEED = 742272;
int messageSeed = FIRST_MESSAGE_SEED;
if (_argumentParser.isSet(MESSAGE_SEED)) {
messageSeed = _argumentParser.value(MESSAGE_SEED).toInt();
}
// seed the generator with a value that the receiver will also use when verifying the ordered message
_generator.seed(messageSeed);
if (!_target.isNull()) {
sendInitialPackets();
} else {
// this is a receiver - in case there are ordered packets (messages) being sent to us make sure that we handle them
// so that they can be verified
using std::placeholders::_1;
_socket.setPacketListHandler(std::bind(&UDTTest::handlePacketList, this, _1));
}
// the sender reports stats every 100 milliseconds
static const int STATS_SAMPLE_INTERVAL = 100;
int statsInterval = STATS_SAMPLE_INTERVAL;
if (_argumentParser.isSet(STATS_INTERVAL)) {
statsInterval = _argumentParser.value(STATS_INTERVAL).toInt();
}
QTimer* statsTimer = new QTimer(this);
connect(statsTimer, &QTimer::timeout, this, &UDTTest::sampleStats);
statsTimer->start(STATS_SAMPLE_INTERVAL);
statsTimer->start(statsInterval);
}
void UDTTest::parseArguments() {
@ -159,7 +203,8 @@ void UDTTest::parseArguments() {
_argumentParser.addOptions({
PORT_OPTION, TARGET_OPTION, PACKET_SIZE, MIN_PACKET_SIZE, MAX_PACKET_SIZE,
MAX_SEND_BYTES, MAX_SEND_PACKETS, UNRELIABLE_PACKETS, ORDERED_PACKETS
MAX_SEND_BYTES, MAX_SEND_PACKETS, UNRELIABLE_PACKETS, ORDERED_PACKETS,
MESSAGE_SIZE, MESSAGE_SEED, STATS_INTERVAL
});
if (!_argumentParser.parse(arguments())) {
@ -216,22 +261,40 @@ void UDTTest::sendPacket() {
}
if (_sendOrdered) {
// check if it is time to add another message - we do this every time 95% of the message size has been sent
static int call = 0;
call = (call + 1) % 4;
if (call == 0) {
auto packetList = std::unique_ptr<udt::PacketList>(new udt::PacketList(PacketType::BulkAvatarData, QByteArray(), true, true));
for (int i = 0; i < 4; i++) {
packetList->writePrimitive(0x1);
packetList->writePrimitive(0x2);
packetList->writePrimitive(0x3);
packetList->writePrimitive(0x4);
packetList->closeCurrentPacket(false);
static int packetSize = udt::Packet::maxPayloadSize(true);
static int messageSizePackets = (int) ceil(_messageSize / udt::Packet::maxPayloadSize(true));
static int refillCount = (int) (messageSizePackets * 0.95);
if (call++ % refillCount == 0) {
// construct a reliable and ordered packet list
auto packetList = std::unique_ptr<udt::PacketList>({
new udt::PacketList(PacketType::BulkAvatarData, QByteArray(), true, true)
});
// fill the packet list with random data according to the constant seed (so receiver can verify)
for (int i = 0; i < messageSizePackets; ++i) {
// setup a QByteArray full of zeros for our random padded data
QByteArray randomPaddedData { packetSize, 0 };
// generate a random integer for the first 8 bytes of the random data
uint64_t randomInt = _distribution(_generator);
randomPaddedData.replace(0, sizeof(randomInt), reinterpret_cast<char*>(&randomInt), sizeof(randomInt));
// write this data to the PacketList
packetList->write(randomPaddedData);
}
packetList->closeCurrentPacket(false);
_totalQueuedBytes += packetList->getDataSize();
_totalQueuedPackets += packetList->getNumPackets();
_socket.writePacketList(std::move(packetList), _target);
}
_totalQueuedPackets += 4;
} else {
auto newPacket = udt::Packet::create(packetPayloadSize, _sendReliable);
newPacket->setPayloadSize(packetPayloadSize);
@ -239,17 +302,43 @@ void UDTTest::sendPacket() {
_totalQueuedBytes += newPacket->getDataSize();
// queue or send this packet by calling write packet on the socket for our target
// if (
if (_sendReliable) {
_socket.writePacket(std::move(newPacket), _target);
} else {
_socket.writePacket(*newPacket, _target);
}
++_totalQueuedPackets;
}
}
void UDTTest::handlePacketList(std::unique_ptr<udt::PacketList> packetList) {
// generate the byte array that should match this message - using the same seed the sender did
int packetSize = udt::Packet::maxPayloadSize(true);
int messageSize = packetList->getMessageSize();
QByteArray messageData(messageSize, 0);
for (int i = 0; i < messageSize; i += packetSize) {
// generate the random 64-bit unsigned integer that should lead this packet
uint64_t randomInt = _distribution(_generator);
messageData.replace(i, sizeof(randomInt), reinterpret_cast<char*>(&randomInt), sizeof(randomInt));
}
bool dataMatch = messageData == packetList->getMessage();
Q_ASSERT_X(dataMatch, "UDTTest::handlePacketList",
"received message did not match expected message (from seeded random number generation).");
if (!dataMatch) {
qCritical() << "UDTTest::handlePacketList" << "received message did not match expected message"
<< "(from seeded random number generation).";
}
}
void UDTTest::sampleStats() {
static bool first = true;
static const double USECS_PER_MSEC = 1000.0;
@ -267,19 +356,19 @@ void UDTTest::sampleStats() {
// setup a list of left justified values
QStringList values {
QString::number(stats.sendRate).leftJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.estimatedBandwith).leftJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.rtt / USECS_PER_MSEC).leftJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.congestionWindowSize).leftJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.packetSendPeriod).leftJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::ReceivedACK]).leftJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::ProcessedACK]).leftJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::ReceivedLightACK]).leftJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::ReceivedNAK]).leftJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::ReceivedTimeoutNAK]).leftJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::SentACK2]).leftJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.sentPackets).leftJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::Retransmission]).leftJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size())
QString::number(stats.sendRate).rightJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.estimatedBandwith).rightJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.rtt / USECS_PER_MSEC, 'f', 2).rightJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.congestionWindowSize).rightJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.packetSendPeriod).rightJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::ReceivedACK]).rightJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::ProcessedACK]).rightJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::ReceivedLightACK]).rightJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::ReceivedNAK]).rightJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::ReceivedTimeoutNAK]).rightJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::SentACK2]).rightJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.sentPackets).rightJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::Retransmission]).rightJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size())
};
// output this line of values
@ -297,18 +386,21 @@ void UDTTest::sampleStats() {
int headerIndex = -1;
static const double MEGABITS_PER_BYTE = 8.0 / 1000000.0;
// setup a list of left justified values
QStringList values {
QString::number(stats.receiveRate).leftJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.estimatedBandwith).leftJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.rtt / USECS_PER_MSEC).leftJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.congestionWindowSize).leftJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::SentACK]).leftJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::SentLightACK]).leftJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::SentNAK]).leftJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::SentTimeoutNAK]).leftJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::ReceivedACK2]).leftJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::Duplicate]).leftJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size())
QString::number(stats.receivedBytes * MEGABITS_PER_BYTE, 'f', 2).rightJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.receiveRate).rightJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.estimatedBandwith).rightJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.rtt / USECS_PER_MSEC, 'f', 2).rightJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.congestionWindowSize).rightJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::SentACK]).rightJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::SentLightACK]).rightJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::SentNAK]).rightJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::SentTimeoutNAK]).rightJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::ReceivedACK2]).rightJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::Duplicate]).rightJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size())
};
// output this line of values

View file

@ -14,6 +14,9 @@
#ifndef hifi_UDTTest_h
#define hifi_UDTTest_h
#include <random>
#include <QtCore/QCoreApplication>
#include <QtCore/QCommandLineParser>
@ -31,6 +34,7 @@ public slots:
private:
void parseArguments();
void handlePacketList(std::unique_ptr<udt::PacketList> packetList);
void sendInitialPackets(); // fills the queue with packets to start
void sendPacket(); // constructs and sends a packet according to the test parameters
@ -48,6 +52,12 @@ private:
bool _sendReliable { true }; // whether packets are sent reliably or unreliably
bool _sendOrdered { false }; // whether to send ordered packets
int _messageSize { 10000000 }; // number of bytes per message while sending ordered
std::random_device _randomDevice;
std::mt19937 _generator { _randomDevice() }; // random number generator for ordered data testing
std::uniform_int_distribution<uint64_t> _distribution { 1, UINT64_MAX }; // producer of random integer values
int _totalQueuedPackets { 0 }; // keeps track of the number of packets we have already queued
int _totalQueuedBytes { 0 }; // keeps track of the number of bytes we have already queued
};