initial support to get CongestionControl down to Connection

This commit is contained in:
Stephen Birarda 2015-07-28 17:58:49 -07:00
parent 988bd226ca
commit 0e0968f748
6 changed files with 50 additions and 27 deletions

View file

@ -15,10 +15,9 @@
using namespace udt; using namespace udt;
void UdtCC::init() { void DefaultCC::init() {
_rcInterval = _synInterval;
_lastRCTime = usecTimestampNow(); _lastRCTime = usecTimestampNow();
setAckTimer(_rcInterval); setAckTimer(synInterval());
_lastAck = _sendCurrSeqNum; _lastAck = _sendCurrSeqNum;
_lastDecSeq = SequenceNumber{ SequenceNumber::MAX }; _lastDecSeq = SequenceNumber{ SequenceNumber::MAX };
@ -27,7 +26,7 @@ void UdtCC::init() {
_packetSendPeriod = 1.0; _packetSendPeriod = 1.0;
} }
void UdtCC::onACK(SequenceNumber ackNum) { void DefaultCC::onACK(SequenceNumber ackNum) {
int64_t B = 0; int64_t B = 0;
double inc = 0; double inc = 0;
// Note: 1/24/2012 // Note: 1/24/2012
@ -37,7 +36,7 @@ void UdtCC::onACK(SequenceNumber ackNum) {
const double min_inc = 0.01; const double min_inc = 0.01;
uint64_t currtime = usecTimestampNow(); uint64_t currtime = usecTimestampNow();
if (currtime - _lastRCTime < (uint64_t)_rcInterval) { if (currtime - _lastRCTime < (uint64_t)synInterval()) {
return; return;
} }
@ -52,11 +51,11 @@ void UdtCC::onACK(SequenceNumber ackNum) {
if (_recvieveRate > 0) { if (_recvieveRate > 0) {
_packetSendPeriod = 1000000.0 / _recvieveRate; _packetSendPeriod = 1000000.0 / _recvieveRate;
} else { } else {
_packetSendPeriod = (_rtt + _rcInterval) / _congestionWindowSize; _packetSendPeriod = (_rtt + synInterval()) / _congestionWindowSize;
} }
} }
} else { } else {
_congestionWindowSize = _recvieveRate / 1000000.0 * (_rtt + _rcInterval) + 16; _congestionWindowSize = _recvieveRate / 1000000.0 * (_rtt + synInterval()) + 16;
} }
// During Slow Start, no rate increase // During Slow Start, no rate increase
@ -86,10 +85,10 @@ void UdtCC::onACK(SequenceNumber ackNum) {
} }
} }
_packetSendPeriod = (_packetSendPeriod * _rcInterval) / (_packetSendPeriod * inc + _rcInterval); _packetSendPeriod = (_packetSendPeriod * synInterval()) / (_packetSendPeriod * inc + synInterval());
} }
void UdtCC::onLoss(const std::vector<SequenceNumber>& losslist) { void DefaultCC::onLoss(const std::vector<SequenceNumber>& losslist) {
//Slow Start stopped, if it hasn't yet //Slow Start stopped, if it hasn't yet
if (_slowStart) { if (_slowStart) {
_slowStart = false; _slowStart = false;
@ -101,7 +100,7 @@ void UdtCC::onLoss(const std::vector<SequenceNumber>& losslist) {
// If no receiving rate is observed, we have to compute the sending // If no receiving rate is observed, we have to compute the sending
// rate according to the current window size, and decrease it // rate according to the current window size, and decrease it
// using the method below. // using the method below.
_packetSendPeriod = _congestionWindowSize / (_rtt + _rcInterval); _packetSendPeriod = _congestionWindowSize / (_rtt + synInterval());
} }
_loss = true; _loss = true;
@ -128,13 +127,13 @@ void UdtCC::onLoss(const std::vector<SequenceNumber>& losslist) {
} }
} }
void UdtCC::onTimeout() { void DefaultCC::onTimeout() {
if (_slowStart) { if (_slowStart) {
_slowStart = false; _slowStart = false;
if (_recvieveRate > 0) { if (_recvieveRate > 0) {
_packetSendPeriod = 1000000.0 / _recvieveRate; _packetSendPeriod = 1000000.0 / _recvieveRate;
} else { } else {
_packetSendPeriod = _congestionWindowSize / (_rtt + _rcInterval); _packetSendPeriod = _congestionWindowSize / (_rtt + synInterval());
} }
} else { } else {
/* /*

View file

@ -17,15 +17,19 @@
#include "SequenceNumber.h" #include "SequenceNumber.h"
namespace udt { namespace udt {
static const int32_t DEFAULT_SYN_INTERVAL = 10000; // 10 ms
class Packet; class Packet;
class CongestionControl { class CongestionControl {
public: public:
static const int32_t DEFAULT_SYN_INTERVAL = 10000; // 10 ms
CongestionControl() {} CongestionControl() {};
CongestionControl(int synInterval) : _synInterval(synInterval) {}
virtual ~CongestionControl() {} virtual ~CongestionControl() {}
int synInterval() const { return _synInterval; }
virtual void init() {} virtual void init() {}
virtual void close() {} virtual void close() {}
@ -35,12 +39,10 @@ public:
virtual void onPacketReceived(const Packet& packet) {} virtual void onPacketReceived(const Packet& packet) {}
protected: protected:
void setAckTimer(int syn) { _ackPeriod = (syn > _synInterval) ? _synInterval : syn; } void setAckTimer(int period) { _ackPeriod = (period > _synInterval) ? _synInterval : period; }
void setAckInterval(int interval) { _ackInterval = interval; } void setAckInterval(int ackInterval) { _ackInterval = ackInterval; }
void setRto(int rto) { _userDefinedRto = true; _rto = rto; } void setRto(int rto) { _userDefinedRto = true; _rto = rto; }
int32_t _synInterval = DEFAULT_SYN_INTERVAL; // UDT constant parameter, SYN
double _packetSendPeriod = 1.0; // Packet sending period, in microseconds double _packetSendPeriod = 1.0; // Packet sending period, in microseconds
double _congestionWindowSize = 16.0; // Congestion window size, in packets double _congestionWindowSize = 16.0; // Congestion window size, in packets
@ -66,6 +68,8 @@ private:
int _ackPeriod = 0; // Periodical timer to send an ACK, in milliseconds int _ackPeriod = 0; // Periodical timer to send an ACK, in milliseconds
int _ackInterval = 0; // How many packets to send one ACK, in packets int _ackInterval = 0; // How many packets to send one ACK, in packets
int _synInterval { DEFAULT_SYN_INTERVAL };
bool _userDefinedRto = false; // if the RTO value is defined by users bool _userDefinedRto = false; // if the RTO value is defined by users
int _rto = -1; // RTO value, microseconds int _rto = -1; // RTO value, microseconds
}; };
@ -75,6 +79,8 @@ class CongestionControlVirtualFactory {
public: public:
virtual ~CongestionControlVirtualFactory() {} virtual ~CongestionControlVirtualFactory() {}
static int synInterval() { return DEFAULT_SYN_INTERVAL; }
virtual std::unique_ptr<CongestionControl> create() = 0; virtual std::unique_ptr<CongestionControl> create() = 0;
}; };
@ -82,13 +88,12 @@ template <class T> class CongestionControlFactory: public CongestionControlVirtu
{ {
public: public:
virtual ~CongestionControlFactory() {} virtual ~CongestionControlFactory() {}
virtual std::unique_ptr<CongestionControl> create() { return std::unique_ptr<T>(new T()); } virtual std::unique_ptr<CongestionControl> create() { return std::unique_ptr<T>(new T()); }
}; };
class UdtCC: public CongestionControl { class DefaultCC: public CongestionControl {
public: public:
UdtCC() {} DefaultCC() {}
public: public:
virtual void init(); virtual void init();
@ -97,7 +102,6 @@ public:
virtual void onTimeout(); virtual void onTimeout();
private: private:
int _rcInterval = 0; // UDT Rate control interval
uint64_t _lastRCTime = 0; // last rate increase time uint64_t _lastRCTime = 0; // last rate increase time
bool _slowStart = true; // if in slow start phase bool _slowStart = true; // if in slow start phase
SequenceNumber _lastAck; // last ACKed seq num SequenceNumber _lastAck; // last ACKed seq num
@ -112,4 +116,4 @@ private:
} }
#endif // hifi_CongestionControl_h #endif // hifi_CongestionControl_h

View file

@ -16,6 +16,7 @@
#include <NumericalConstants.h> #include <NumericalConstants.h>
#include "../HifiSockAddr.h" #include "../HifiSockAddr.h"
#include "CongestionControl.h"
#include "ControlPacket.h" #include "ControlPacket.h"
#include "Packet.h" #include "Packet.h"
#include "Socket.h" #include "Socket.h"
@ -24,10 +25,12 @@ using namespace udt;
using namespace std; using namespace std;
using namespace std::chrono; using namespace std::chrono;
Connection::Connection(Socket* parentSocket, HifiSockAddr destination) : Connection::Connection(Socket* parentSocket, HifiSockAddr destination, unique_ptr<CongestionControl> congestionControl) :
_parentSocket(parentSocket), _parentSocket(parentSocket),
_destination(destination) _destination(destination),
_congestionControl(move(congestionControl))
{ {
} }
Connection::~Connection() { Connection::~Connection() {
@ -343,6 +346,7 @@ void Connection::processACK(std::unique_ptr<ControlPacket> controlPacket) {
} }
// fire the onACK callback for congestion control // fire the onACK callback for congestion control
_congestionControl->onAck(ack);
// update the total count of received ACKs // update the total count of received ACKs
++_totalReceivedACKs; ++_totalReceivedACKs;

View file

@ -22,6 +22,7 @@
namespace udt { namespace udt {
class CongestionControl;
class ControlPacket; class ControlPacket;
class Packet; class Packet;
class Socket; class Socket;
@ -32,7 +33,7 @@ public:
using SequenceNumberTimePair = std::pair<SequenceNumber, std::chrono::high_resolution_clock::time_point>; using SequenceNumberTimePair = std::pair<SequenceNumber, std::chrono::high_resolution_clock::time_point>;
using SentACKMap = std::unordered_map<SequenceNumber, SequenceNumberTimePair>; using SentACKMap = std::unordered_map<SequenceNumber, SequenceNumberTimePair>;
Connection(Socket* parentSocket, HifiSockAddr destination); Connection(Socket* parentSocket, HifiSockAddr destination, std::unique_ptr<CongestionControl> congestionControl);
~Connection(); ~Connection();
void sendReliablePacket(std::unique_ptr<Packet> packet); void sendReliablePacket(std::unique_ptr<Packet> packet);
@ -89,6 +90,8 @@ private:
std::unique_ptr<SendQueue> _sendQueue; std::unique_ptr<SendQueue> _sendQueue;
std::unique_ptr<CongestionControl> _congestionControl;
// Control Packet stat collection // Control Packet stat collection
int _totalReceivedACKs { 0 }; int _totalReceivedACKs { 0 };
int _totalSentACKs { 0 }; int _totalSentACKs { 0 };

View file

@ -79,7 +79,7 @@ qint64 Socket::writePacket(std::unique_ptr<Packet> packet, const HifiSockAddr& s
if (packet->isReliable()) { if (packet->isReliable()) {
auto it = _connectionsHash.find(sockAddr); auto it = _connectionsHash.find(sockAddr);
if (it == _connectionsHash.end()) { if (it == _connectionsHash.end()) {
it = _connectionsHash.insert(it, std::make_pair(sockAddr, new Connection(this, sockAddr))); it = _connectionsHash.insert(it, std::make_pair(sockAddr, new Connection(this, sockAddr, _ccFactory->create())));
} }
it->second->sendReliablePacket(std::move(packet)); it->second->sendReliablePacket(std::move(packet));
return 0; return 0;
@ -154,3 +154,11 @@ void Socket::rateControlSync() {
_synTimer.start(_synInterval); _synTimer.start(_synInterval);
} }
} }
void Socket::setCongestionControlFactory(std::unique_ptr<CongestionControlVirtualFactory> ccFactory) {
// swap the current unique_ptr for the new factory
_ccFactory.swap(ccFactory);
// update the _synInterval to the value from the factory
_synInterval = _ccFactory->synInterval();
}

View file

@ -22,6 +22,7 @@
#include <QtNetwork/QUdpSocket> #include <QtNetwork/QUdpSocket>
#include "../HifiSockAddr.h" #include "../HifiSockAddr.h"
#include "CongestionControl.h"
#include "Connection.h" #include "Connection.h"
namespace udt { namespace udt {
@ -59,6 +60,8 @@ public:
void addUnfilteredHandler(const HifiSockAddr& senderSockAddr, BasePacketHandler handler) void addUnfilteredHandler(const HifiSockAddr& senderSockAddr, BasePacketHandler handler)
{ _unfilteredHandlers[senderSockAddr] = handler; } { _unfilteredHandlers[senderSockAddr] = handler; }
void setCongestionControlFactory(std::unique_ptr<CongestionControlVirtualFactory> ccFactory);
private slots: private slots:
void readPendingDatagrams(); void readPendingDatagrams();
@ -78,6 +81,8 @@ private:
int32_t _synInterval = 10; // 10ms int32_t _synInterval = 10; // 10ms
QTimer _synTimer; QTimer _synTimer;
std::unique_ptr<CongestionControlVirtualFactory> _ccFactory { new CongestionControlFactory<DefaultCC>() };
}; };
} // namespace udt } // namespace udt