diff --git a/libraries/networking/src/udt/CongestionControl.cpp b/libraries/networking/src/udt/CongestionControl.cpp index ebbf86ecd8..4f32be0d06 100644 --- a/libraries/networking/src/udt/CongestionControl.cpp +++ b/libraries/networking/src/udt/CongestionControl.cpp @@ -1,6 +1,6 @@ // // CongestionControl.cpp -// +// libraries/networking/src/udt // // Created by Clement on 7/23/15. // Copyright 2015 High Fidelity, Inc. diff --git a/libraries/networking/src/udt/CongestionControl.h b/libraries/networking/src/udt/CongestionControl.h index fdf8803ec0..fe7f4a15fc 100644 --- a/libraries/networking/src/udt/CongestionControl.h +++ b/libraries/networking/src/udt/CongestionControl.h @@ -1,6 +1,6 @@ // // CongestionControl.h -// +// libraries/networking/src/udt // // Created by Clement on 7/23/15. // Copyright 2015 High Fidelity, Inc. diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index d3b521e054..20d8a9cb92 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -18,6 +18,7 @@ using namespace udt; using namespace std; +using namespace std::chrono; Connection::Connection(Socket* parentSocket, HifiSockAddr destination) { @@ -29,21 +30,105 @@ void Connection::send(unique_ptr packet) { } } +void Connection::sendACK(bool wasCausedBySyncTimeout) { + static const int ACK_PACKET_PAYLOAD_BYTES = sizeof(_lastSentACK) + sizeof(_currentACKSubSequenceNumber) + + sizeof(_rtt) + sizeof(_rttVariance) + sizeof(int32_t) + sizeof(int32_t); + + // setup the ACK packet, make it static so we can re-use it + static auto ackPacket = ControlPacket::create(ControlPacket::ACK, ACK_PACKET_PAYLOAD_BYTES); + + auto currentTime = high_resolution_clock::now(); + + SeqNum nextACKNumber = nextACK(); + + if (nextACKNumber <= _lastReceivedAcknowledgedACK) { + // we already got an ACK2 for this ACK we would be sending, don't bother + return; + } + + if (nextACKNumber >= _lastSentACK) { + // we have received new packets since the last sent ACK + + // update the last sent ACK + _lastSentACK = nextACKNumber; + + // remove the ACKed packets from the receive queue + + } else if (nextACKNumber == _lastSentACK) { + // We already sent this ACK, but check if we should re-send it. + // We will re-send if it has been more than RTT + (4 * RTT variance) since the last ACK + milliseconds sinceLastACK = duration_cast(currentTime - _lastACKTime); + if (sinceLastACK.count() < (_rtt + (4 * _rttVariance))) { + return; + } + } + + // reset the ACK packet so we can fill it up and have it figure out what size it is + ackPacket->reset(); + + // pack in the ACK sub-sequence number + ackPacket->writePrimitive(_currentACKSubSequenceNumber++); + + // pack in the ACK number + ackPacket->writePrimitive(nextACKNumber); + + // pack in the RTT and variance + ackPacket->writePrimitive(_rtt); + ackPacket->writePrimitive(_rttVariance); + + // pack the available buffer size - must be a minimum of 2 + + if (wasCausedBySyncTimeout) { + // pack in the receive speed and bandwidth + + // record this as the last ACK send time + _lastACKTime = high_resolution_clock::now(); + } + + // have the send queue send off our packet + _sendQueue->sendPacket(*ackPacket); +} + +void Connection::sendLightACK() const { + static const int LIGHT_ACK_PACKET_PAYLOAD_BYTES = 4; + + // create the light ACK packet, make it static so we can re-use it + static auto lightACKPacket = ControlPacket::create(ControlPacket::ACK, LIGHT_ACK_PACKET_PAYLOAD_BYTES); + + SeqNum nextACKNumber = nextACK(); + + if (nextACKNumber == _lastReceivedAcknowledgedACK) { + // we already got an ACK2 for this ACK we would be sending, don't bother + return; + } + + // pack in the ACK + memcpy(lightACKPacket->getPayload(), &nextACKNumber, sizeof(nextACKNumber)); + + // have the send queue send off our packet + _sendQueue->sendPacket(*lightACKPacket); +} + +SeqNum Connection::nextACK() const { + // TODO: check if we have a loss list + return _largestReceivedSeqNum + 1; +} + void Connection::processReceivedSeqNum(SeqNum seq) { // If this is not the next sequence number, report loss - if (seq > _largestRecievedSeqNum + 1) { - if (_largestRecievedSeqNum + 1 == seq - 1) { - _lossList.append(_largestRecievedSeqNum + 1); + if (seq > _largestReceivedSeqNum + 1) { + if (_largestReceivedSeqNum + 1 == seq - 1) { + _lossList.append(_largestReceivedSeqNum + 1); } else { - _lossList.append(_largestRecievedSeqNum + 1, seq - 1); + _lossList.append(_largestReceivedSeqNum + 1, seq - 1); } // TODO: Send loss report } - if (seq > _largestRecievedSeqNum) { + if (seq > _largestReceivedSeqNum) { // Update largest recieved sequence number - _largestRecievedSeqNum = seq; + _largestReceivedSeqNum = seq; } else { // Otherwise, it's a resend, remove it from the loss list _lossList.remove(seq); @@ -52,13 +137,13 @@ void Connection::processReceivedSeqNum(SeqNum seq) { void Connection::processControl(unique_ptr controlPacket) { switch (controlPacket->getType()) { - case ControlPacket::Type::ACK: + case ControlPacket::ACK: break; - case ControlPacket::Type::ACK2: + case ControlPacket::ACK2: break; - case ControlPacket::Type::NAK: + case ControlPacket::NAK: break; - case ControlPacket::Type::PacketPair: + case ControlPacket::PacketPair: break; } } diff --git a/libraries/networking/src/udt/Connection.h b/libraries/networking/src/udt/Connection.h index 3b25c7f1b1..f67404a16b 100644 --- a/libraries/networking/src/udt/Connection.h +++ b/libraries/networking/src/udt/Connection.h @@ -12,6 +12,7 @@ #ifndef hifi_Connection_h #define hifi_Connection_h +#include #include #include "LossList.h" @@ -32,13 +33,26 @@ public: void send(std::unique_ptr packet); + void sendACK(bool wasCausedBySyncTimeout = true); + void sendLightACK() const; + + SeqNum nextACK() const; + void processReceivedSeqNum(SeqNum seq); void processControl(std::unique_ptr controlPacket); private: - LossList _lossList; - SeqNum _largestRecievedSeqNum; + SeqNum _largestReceivedSeqNum; // The largest sequence number received from the peer + SeqNum _lastSentACK; // The last sent ACK + SeqNum _lastReceivedAcknowledgedACK; // The last sent ACK that has been acknowledged via an ACK2 from the peer + SeqNum _currentACKSubSequenceNumber; // The current ACK sub-sequence number (used for Acknowledgment of ACKs) + + int32_t _rtt; // RTT, in milliseconds + int32_t _rttVariance; // RTT variance + + std::chrono::high_resolution_clock::time_point _lastACKTime; + std::unique_ptr _sendQueue; }; diff --git a/libraries/networking/src/udt/ControlPacket.cpp b/libraries/networking/src/udt/ControlPacket.cpp index a8688f2cdf..8b863c5a21 100644 --- a/libraries/networking/src/udt/ControlPacket.cpp +++ b/libraries/networking/src/udt/ControlPacket.cpp @@ -13,8 +13,18 @@ using namespace udt; -std::unique_ptr ControlPacket::create(Type type, const SequenceNumberList& sequenceNumbers) { - return ControlPacket::create(type, sequenceNumbers); +std::unique_ptr ControlPacket::create(Type type, qint64 size) { + + std::unique_ptr controlPacket; + + if (size == -1) { + return ControlPacket::create(type); + } else { + // Fail with invalid size + Q_ASSERT(size >= 0); + + return ControlPacket::create(type, size); + } } ControlPacket::ControlPacketPair ControlPacket::createPacketPair(quint64 timestamp) { @@ -25,25 +35,33 @@ ControlPacket::ControlPacketPair ControlPacket::createPacketPair(quint64 timesta } qint64 ControlPacket::localHeaderSize() { - return sizeof(TypeAndSubSequenceNumber); + return sizeof(ControlBitAndType); } qint64 ControlPacket::totalHeadersSize() const { return BasePacket::totalHeadersSize() + localHeaderSize(); } -ControlPacket::ControlPacket(Type type, const SequenceNumberList& sequenceNumbers) : - BasePacket(localHeaderSize() + (sizeof(Packet::SequenceNumber) * sequenceNumbers.size())), +ControlPacket::ControlPacket(Type type) : + BasePacket(-1), _type(type) { adjustPayloadStartAndCapacity(); open(QIODevice::ReadWrite); - // pack in the sequence numbers - for (auto& sequenceNumber : sequenceNumbers) { - writePrimitive(sequenceNumber); - } + writeControlBitAndType(); +} + +ControlPacket::ControlPacket(Type type, qint64 size) : + BasePacket(localHeaderSize() + size), + _type(type) +{ + adjustPayloadStartAndCapacity(); + + open(QIODevice::ReadWrite); + + writeControlBitAndType(); } ControlPacket::ControlPacket(quint64 timestamp) : @@ -54,6 +72,8 @@ ControlPacket::ControlPacket(quint64 timestamp) : open(QIODevice::ReadWrite); + writeControlBitAndType(); + // pack in the timestamp writePrimitive(timestamp); } @@ -71,3 +91,15 @@ ControlPacket& ControlPacket::operator=(ControlPacket&& other) { return *this; } + +static const uint32_t CONTROL_BIT_MASK = 1 << (sizeof(ControlPacket::ControlBitAndType) - 1); + +void ControlPacket::writeControlBitAndType() { + ControlBitAndType* bitAndType = reinterpret_cast(_packet.get()); + + // write the control bit by OR'ing the current value with the CONTROL_BIT_MASK + *bitAndType = (*bitAndType | CONTROL_BIT_MASK); + + // write the type by OR'ing the type with the current value & CONTROL_BIT_MASK + *bitAndType = (*bitAndType & CONTROL_BIT_MASK) | (_type << sizeof((ControlPacket::Type) - 1)); +} diff --git a/libraries/networking/src/udt/ControlPacket.h b/libraries/networking/src/udt/ControlPacket.h index 8b9ccbf073..6d1b7eb5d3 100644 --- a/libraries/networking/src/udt/ControlPacket.h +++ b/libraries/networking/src/udt/ControlPacket.h @@ -21,23 +21,21 @@ namespace udt { -using SequenceNumberList = std::vector; - class ControlPacket : public BasePacket { Q_OBJECT public: - using TypeAndSubSequenceNumber = uint32_t; + using ControlBitAndType = uint32_t; using ControlPacketPair = std::pair, std::unique_ptr>; - enum class Type : uint16_t { + enum Type : uint16_t { ACK, ACK2, NAK, PacketPair }; - std::unique_ptr create(Type type, const SequenceNumberList& sequenceNumbers); - ControlPacketPair createPacketPair(quint64 timestamp); + static std::unique_ptr create(Type type, qint64 size = -1); + static ControlPacketPair createPacketPair(quint64 timestamp); static qint64 localHeaderSize(); // Current level's header size virtual qint64 totalHeadersSize() const; // Cumulated size of all the headers @@ -45,7 +43,8 @@ public: Type getType() const { return _type; } private: - ControlPacket(Type type, const SequenceNumberList& sequenceNumbers); + ControlPacket(Type type); + ControlPacket(Type type, qint64 size); ControlPacket(quint64 timestamp); ControlPacket(ControlPacket&& other); ControlPacket(const ControlPacket& other) = delete; @@ -53,6 +52,9 @@ private: ControlPacket& operator=(ControlPacket&& other); ControlPacket& operator=(const ControlPacket& other) = delete; + // Header writers + void writeControlBitAndType(); + Type _type; }; diff --git a/libraries/networking/src/udt/Packet.cpp b/libraries/networking/src/udt/Packet.cpp index e9a5636172..a70c3bfbef 100644 --- a/libraries/networking/src/udt/Packet.cpp +++ b/libraries/networking/src/udt/Packet.cpp @@ -117,7 +117,6 @@ Packet& Packet::operator=(Packet&& other) { static const uint32_t CONTROL_BIT_MASK = 1 << (sizeof(Packet::SequenceNumberAndBitField) - 1); static const uint32_t RELIABILITY_BIT_MASK = 1 << (sizeof(Packet::SequenceNumberAndBitField) - 2); static const uint32_t MESSAGE_BIT_MASK = 1 << (sizeof(Packet::SequenceNumberAndBitField) - 3); -static const int BIT_FIELD_LENGTH = 3; static const uint32_t BIT_FIELD_MASK = CONTROL_BIT_MASK | RELIABILITY_BIT_MASK | MESSAGE_BIT_MASK; void Packet::readIsReliable() { diff --git a/libraries/networking/src/udt/Packet.h b/libraries/networking/src/udt/Packet.h index 905af15ded..6189be0ef1 100644 --- a/libraries/networking/src/udt/Packet.h +++ b/libraries/networking/src/udt/Packet.h @@ -33,8 +33,6 @@ public: using MessageNumber = uint32_t; using MessageNumberAndBitField = uint32_t; - static const uint32_t DEFAULT_SEQUENCE_NUMBER = 0; - static std::unique_ptr create(qint64 size = -1, bool isReliable = false, bool isPartOfMessage = false); static std::unique_ptr fromReceivedPacket(std::unique_ptr data, qint64 size, const HifiSockAddr& senderSockAddr); diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 7561a7042d..65c1807d19 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -80,8 +80,8 @@ void SendQueue::stop() { _running = false; } -void SendQueue::sendPacket(const Packet& packet) { - _socket->writeDatagram(packet.getData(), packet.getDataSize(), _destination); +void SendQueue::sendPacket(const BasePacket& packet) { + _socket->writeUnreliablePacket(packet, _destination); } void SendQueue::ack(SeqNum ack) { diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index 1a1a2fc90d..9c14c19ede 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -26,6 +26,7 @@ namespace udt { class Socket; +class BasePacket; class Packet; class SendQueue : public QObject { @@ -47,7 +48,7 @@ public: public slots: void start(); void stop(); - void sendPacket(const Packet& packet); + void sendPacket(const BasePacket& packet); void ack(SeqNum ack); void nak(std::list naks); diff --git a/libraries/networking/src/udt/Socket.cpp b/libraries/networking/src/udt/Socket.cpp index 8bf7f973ae..6923879201 100644 --- a/libraries/networking/src/udt/Socket.cpp +++ b/libraries/networking/src/udt/Socket.cpp @@ -11,6 +11,8 @@ #include "Socket.h" +#include + #include "../NetworkLogging.h" #include "Packet.h" @@ -20,6 +22,12 @@ Socket::Socket(QObject* parent) : QObject(parent) { connect(&_udpSocket, &QUdpSocket::readyRead, this, &Socket::readPendingDatagrams); + + // make sure our synchronization method is called every SYN interval + connect(&_synTimer, &QTimer::timeout, this, &Socket::rateControlSync); + + // start our timer for the synchronization time interval + _synTimer.start(_synInterval); } void Socket::rebind() { @@ -58,7 +66,7 @@ void Socket::setBufferSizes(int numBytes) { } } -qint64 Socket::writeUnreliablePacket(const Packet& packet, const HifiSockAddr& sockAddr) { +qint64 Socket::writeUnreliablePacket(const BasePacket& packet, const HifiSockAddr& sockAddr) { return writeDatagram(packet.getData(), packet.getDataSize(), sockAddr); } @@ -113,3 +121,14 @@ void Socket::readPendingDatagrams() { } } } + +void Socket::rateControlSync() { + + // TODO: enumerate our list of connections and ask each of them to send off periodic ACK packet for rate control + + if (_synTimer.interval() != _synInterval) { + // if the _synTimer interval doesn't match the current _synInterval (changes when the CC factory is changed) + // then restart it now with the right interval + _synTimer.start(_synInterval); + } +} diff --git a/libraries/networking/src/udt/Socket.h b/libraries/networking/src/udt/Socket.h index b23903501e..f74ea49f0b 100644 --- a/libraries/networking/src/udt/Socket.h +++ b/libraries/networking/src/udt/Socket.h @@ -18,13 +18,15 @@ #include #include +#include #include #include "../HifiSockAddr.h" namespace udt { - + class BasePacket; +class ControlSender; class Packet; class SeqNum; @@ -40,7 +42,7 @@ public: quint16 localPort() const { return _udpSocket.localPort(); } - qint64 writeUnreliablePacket(const Packet& packet, const HifiSockAddr& sockAddr); + qint64 writeUnreliablePacket(const BasePacket& packet, const HifiSockAddr& sockAddr); qint64 writeDatagram(const char* data, qint64 size, const HifiSockAddr& sockAddr) { return writeDatagram(QByteArray::fromRawData(data, size), sockAddr); } @@ -59,6 +61,7 @@ public: private slots: void readPendingDatagrams(); + void rateControlSync(); private: QUdpSocket _udpSocket { this }; @@ -68,6 +71,9 @@ private: std::unordered_map _unfilteredHandlers; std::unordered_map _packetSequenceNumbers; + + int32_t _synInterval = 10; // 10ms + QTimer _synTimer; }; } // namespace udt