complete the initial version of UDTTest

This commit is contained in:
Stephen Birarda 2015-07-30 17:09:25 -07:00
parent 80ef80ec2e
commit d340515ab3
17 changed files with 133 additions and 33 deletions

View file

@ -11,8 +11,10 @@
#include <QtCore/QObject>
#include <QDebug>
#include <BufferParser.h>
#include <udt/PacketHeaders.h>
#include <UUID.h>
#include "RegisteredMetaTypes.h"
#include "EntityItemID.h"

View file

@ -18,11 +18,11 @@
#include <GLMHelpers.h>
#include <SettingHandle.h>
#include <UUID.h>
#include "AddressManager.h"
#include "NodeList.h"
#include "NetworkLogging.h"
#include "AddressManager.h"
#include "UUID.h"
const QString ADDRESS_MANAGER_SETTINGS_GROUP = "AddressManager";
const QString SETTINGS_CURRENT_ADDRESS_KEY = "address";

View file

@ -14,6 +14,7 @@
#include <QtCore/QSharedPointer>
#include "UUID.h"
#include "udt/Packet.h"
class NLPacket : public udt::Packet {

View file

@ -16,11 +16,10 @@
#include <QtCore/QDataStream>
#include <SharedUtil.h>
#include <UUID.h>
#include "NetworkLogging.h"
#include "BandwidthRecorder.h"
#include "NetworkLogging.h"
#include "UUID.h"
NetworkPeer::NetworkPeer(QObject* parent) :
QObject(parent),

View file

@ -12,10 +12,9 @@
#include <cstring>
#include <stdio.h>
#include <UUID.h>
#include "Node.h"
#include "SharedUtil.h"
#include "UUID.h"
#include <QtCore/QDataStream>
#include <QtCore/QDebug>

View file

@ -1,6 +1,6 @@
//
// WalletTransaction.cpp
// domain-server/src
// libraries/networking/src
//
// Created by Stephen Birarda on 2014-05-20.
// Copyright 2014 High Fidelity, Inc.
@ -11,10 +11,10 @@
#include <QtCore/QJsonObject>
#include <UUID.h>
#include "UUID.h"
#include "WalletTransaction.h"
WalletTransaction::WalletTransaction() :
_uuid(),
_destinationUUID(),
@ -64,4 +64,4 @@ void WalletTransaction::loadFromJson(const QJsonObject& jsonObject) {
_uuid = QUuid(transactionObject.value(TRANSACTION_ID_KEY).toString());
_destinationUUID = QUuid(transactionObject.value(TRANSACTION_DESTINATION_WALLET_ID_KEY).toString());
_amount = transactionObject.value(TRANSACTION_AMOUNT_KEY).toInt();
}
}

View file

@ -50,6 +50,8 @@ void Connection::sendReliablePacket(unique_ptr<Packet> packet) {
if (!_sendQueue) {
// Lasily create send queue
_sendQueue = SendQueue::create(_parentSocket, _destination);
QObject::connect(_sendQueue.get(), &SendQueue::packetSent, this, &Connection::packetSent);
}
_sendQueue->queuePacket(move(packet));

View file

@ -15,6 +15,8 @@
#include <chrono>
#include <memory>
#include <QtCore/QObject>
#include "ConnectionStats.h"
#include "Constants.h"
#include "LossList.h"
@ -29,8 +31,8 @@ class ControlPacket;
class Packet;
class Socket;
class Connection {
class Connection : public QObject {
Q_OBJECT
public:
using SequenceNumberTimePair = std::pair<SequenceNumber, std::chrono::high_resolution_clock::time_point>;
using SentACKMap = std::unordered_map<SequenceNumber, SequenceNumberTimePair>;
@ -44,6 +46,9 @@ public:
bool processReceivedSequenceNumber(SequenceNumber sequenceNumber); // returns indicates if this packet was a duplicate
void processControl(std::unique_ptr<ControlPacket> controlPacket);
signals:
void packetSent();
private:
void sendACK(bool wasCausedBySyncTimeout = true);

View file

@ -21,8 +21,6 @@
#include <QtCore/QSet>
#include <QtCore/QUuid>
#include "UUID.h"
// If adding a new packet packetType, you can replace one marked usable or add at the end.
// If you want the name of the packet packetType to be available for debugging or logging, update nameForPacketType() as well
// This enum must hold 256 or fewer packet types (so the value is <= 255) since it is statically typed as a uint8_t

View file

@ -189,6 +189,8 @@ void SendQueue::loop() {
Q_ASSERT_X(!nextPacket,
"SendQueue::sendNextPacket()", "Overriden packet in sent list");
emit packetSent();
if (shouldSendSecondOfPair) {
std::unique_ptr<Packet> pairedPacket;
@ -209,6 +211,8 @@ void SendQueue::loop() {
_sentPackets[pairedPacket->getSequenceNumber()].swap(pairedPacket);
Q_ASSERT_X(!pairedPacket,
"SendQueue::sendNextPacket()", "Overriden packet in sent list");
emit packetSent();
}
}
}

View file

@ -61,6 +61,9 @@ public slots:
void ack(SequenceNumber ack);
void nak(SequenceNumber start, SequenceNumber end);
void overrideNAKListFromPacket(ControlPacket& packet);
signals:
void packetSent();
private slots:
void loop();

View file

@ -85,11 +85,7 @@ qint64 Socket::writePacket(const Packet& packet, const HifiSockAddr& sockAddr) {
qint64 Socket::writePacket(std::unique_ptr<Packet> packet, const HifiSockAddr& sockAddr) {
if (packet->isReliable()) {
auto it = _connectionsHash.find(sockAddr);
if (it == _connectionsHash.end()) {
it = _connectionsHash.insert(it, std::make_pair(sockAddr, new Connection(this, sockAddr, _ccFactory->create())));
}
it->second->sendReliablePacket(std::move(packet));
auto connection = findOrCreateConnection(sockAddr);
return 0;
}
@ -111,6 +107,16 @@ qint64 Socket::writeDatagram(const QByteArray& datagram, const HifiSockAddr& soc
return bytesWritten;
}
Connection* Socket::findOrCreateConnection(const HifiSockAddr& sockAddr) {
auto it = _connectionsHash.find(sockAddr);
if (it == _connectionsHash.end()) {
it = _connectionsHash.insert(it, std::make_pair(sockAddr, new Connection(this, sockAddr, _ccFactory->create())));
}
return it->second;
}
void Socket::readPendingDatagrams() {
while (_udpSocket.hasPendingDatagrams()) {
// setup a HifiSockAddr to read into
@ -144,11 +150,8 @@ void Socket::readPendingDatagrams() {
auto controlPacket = ControlPacket::fromReceivedPacket(std::move(buffer), packetSizeWithHeader, senderSockAddr);
// move this control packet to the matching connection
auto it = _connectionsHash.find(senderSockAddr);
if (it != _connectionsHash.end()) {
it->second->processControl(move(controlPacket));
}
auto connection = findOrCreateConnection(senderSockAddr);
connection->processControl(move(controlPacket));
} else {
// setup a Packet from the data we just read
@ -176,6 +179,13 @@ void Socket::readPendingDatagrams() {
}
}
void Socket::connectToSendSignal(const HifiSockAddr& destinationAddr, QObject* receiver, const char* slot) {
auto it = _connectionsHash.find(destinationAddr);
if (it != _connectionsHash.end()) {
connect(it->second, SIGNAL(packetSent()), receiver, slot);
}
}
void Socket::rateControlSync() {
// enumerate our list of connections and ask each of them to send off periodic ACK packet for rate control

View file

@ -60,6 +60,8 @@ public:
{ _unfilteredHandlers[senderSockAddr] = handler; }
void setCongestionControlFactory(std::unique_ptr<CongestionControlVirtualFactory> ccFactory);
void connectToSendSignal(const HifiSockAddr& destinationAddr, QObject* receiver, const char* slot);
private slots:
void readPendingDatagrams();
@ -67,6 +69,7 @@ private slots:
private:
void setSystemBufferSizes();
Connection* findOrCreateConnection(const HifiSockAddr& sockAddr);
QUdpSocket _udpSocket { this };
PacketFilterOperator _packetFilterOperator;

View file

@ -14,6 +14,7 @@
#include <QtCore/QDebug>
#include <udt/Constants.h>
#include <udt/Packet.h>
const QCommandLineOption PORT_OPTION { "p", "listening port for socket (defaults to random)", "port", 0 };
const QCommandLineOption TARGET_OPTION {
@ -45,6 +46,9 @@ UDTTest::UDTTest(int& argc, char** argv) :
{
parseArguments();
// randomize the seed for packet size randomization
srand(time(NULL));
_socket.bind(QHostAddress::LocalHost, _argumentParser.value(PORT_OPTION).toUInt());
qDebug() << "Test socket is listening on" << _socket.localPort();
@ -53,7 +57,7 @@ UDTTest::UDTTest(int& argc, char** argv) :
QString hostnamePortString = _argumentParser.value(TARGET_OPTION);
QHostAddress address { hostnamePortString.left(hostnamePortString.indexOf(':')) };
quint16 port { (quint16) hostnamePortString.right(hostnamePortString.indexOf(':') + 1).toUInt() };
quint16 port { (quint16) hostnamePortString.mid(hostnamePortString.indexOf(':') + 1).toUInt() };
if (address.isNull() || port == 0) {
qCritical() << "Could not parse an IP address and port combination from" << hostnamePortString << "-" <<
@ -85,9 +89,9 @@ UDTTest::UDTTest(int& argc, char** argv) :
if (_argumentParser.isSet(MAX_PACKET_SIZE)) {
_maxPacketSize = _argumentParser.value(MAX_PACKET_SIZE).toInt();
// if we don't have a min packet size we should make it zero, because we have a max
// if we don't have a min packet size we should make it 1, because we have a max
if (customMinSize) {
_minPacketSize = 0;
_minPacketSize = 1;
}
}
@ -108,6 +112,10 @@ UDTTest::UDTTest(int& argc, char** argv) :
if (_argumentParser.isSet(UNRELIABLE_PACKETS)) {
_sendReliable = false;
}
if (!_target.isNull()) {
sendInitialPackets();
}
}
void UDTTest::parseArguments() {
@ -133,3 +141,58 @@ void UDTTest::parseArguments() {
Q_UNREACHABLE();
}
}
void UDTTest::sendInitialPackets() {
static const int NUM_INITIAL_PACKETS = 500;
int numPackets = std::max(NUM_INITIAL_PACKETS, _maxSendPackets);
for (int i = 0; i < numPackets; ++i) {
sendPacket();
}
if (numPackets == NUM_INITIAL_PACKETS) {
// we've put 500 initial packets in the queue, everytime we hear one has gone out we should add a new one
_socket.connectToSendSignal(_target, this, SLOT(refillPacket()));
}
}
void UDTTest::sendPacket() {
qDebug() << "Sending packet" << _totalQueuedPackets + 1;
if (_maxSendPackets != -1 && _totalQueuedPackets > _maxSendPackets) {
// don't send more packets, we've hit max
return;
}
if (_maxSendBytes != -1 && _totalQueuedBytes > _maxSendBytes) {
// don't send more packets, we've hit max
return;
}
// we're good to send a new packet, construct it now
// figure out what size the packet will be
int packetPayloadSize = 0;
if (_minPacketSize == _maxPacketSize) {
// we know what size we want - figure out the payload size
packetPayloadSize = _maxPacketSize - udt::Packet::localHeaderSize(false);
} else {
// pick a random size in our range
int randomPacketSize = rand() % _maxPacketSize + _minPacketSize;
packetPayloadSize = randomPacketSize - udt::Packet::localHeaderSize(false);
}
auto newPacket = udt::Packet::create(packetPayloadSize, _sendReliable);
// queue or send this packet by calling write packet on the socket for our target
if (_sendReliable) {
_socket.writePacket(std::move(newPacket), _target);
} else {
_socket.writePacket(*newPacket, _target);
}
++_totalQueuedPackets;
}

View file

@ -21,22 +21,33 @@
#include <udt/Socket.h>
class UDTTest : public QCoreApplication {
Q_OBJECT
public:
UDTTest(int& argc, char** argv);
public slots:
void refillPacket() { sendPacket(); } // adds a new packet to the queue when we are told one is sent
private:
void parseArguments();
void sendInitialPackets(); // fills the queue with packets to start
void sendPacket(); // constructs and sends a packet according to the test parameters
QCommandLineParser _argumentParser;
udt::Socket _socket;
HifiSockAddr _target; // the target for sent packets
int _minPacketSize = udt::MAX_PACKET_SIZE_WITH_UDP_HEADER;
int _maxPacketSize = udt::MAX_PACKET_SIZE_WITH_UDP_HEADER;
int _maxSendBytes = -1; // the number of bytes to send to the target before stopping
int _maxSendPackets = -1; // the number of packets to send to the target before stopping
int _minPacketSize { udt::MAX_PACKET_SIZE };
int _maxPacketSize { udt::MAX_PACKET_SIZE };
int _maxSendBytes { -1 }; // the number of bytes to send to the target before stopping
int _maxSendPackets { -1 }; // the number of packets to send to the target before stopping
bool _sendReliable = true; // wether packets are sent reliably or unreliably
bool _sendReliable { true }; // wether packets are sent reliably or unreliably
int _totalQueuedPackets { 0 }; // keeps track of the number of packets we have already queued
int _totalQueuedBytes { 0 }; // keeps track of the number of bytes we have already queued
};
#endif // hifi_UDTTest_h