From 66c308b43682a1b8df364148c12bcc5c9a758a9c Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Wed, 29 Jul 2015 16:48:14 -0700 Subject: [PATCH] actually process control and sequence numbers from Socket --- libraries/networking/src/udt/BasePacket.h | 3 +- libraries/networking/src/udt/Connection.cpp | 26 +++++++----- libraries/networking/src/udt/Connection.h | 2 +- libraries/networking/src/udt/Constants.h | 3 ++ .../networking/src/udt/ControlPacket.cpp | 39 +++++++++++++++++- libraries/networking/src/udt/ControlPacket.h | 6 +++ libraries/networking/src/udt/LossList.cpp | 8 +++- libraries/networking/src/udt/LossList.h | 2 +- libraries/networking/src/udt/Packet.cpp | 1 - libraries/networking/src/udt/Socket.cpp | 41 +++++++++++++++---- 10 files changed, 106 insertions(+), 25 deletions(-) diff --git a/libraries/networking/src/udt/BasePacket.h b/libraries/networking/src/udt/BasePacket.h index 8dcc3ab9be..7ebf2d1e65 100644 --- a/libraries/networking/src/udt/BasePacket.h +++ b/libraries/networking/src/udt/BasePacket.h @@ -27,7 +27,8 @@ public: static const qint64 PACKET_WRITE_ERROR; static std::unique_ptr create(qint64 size = -1); - static std::unique_ptr fromReceivedPacket(std::unique_ptr data, qint64 size, const HifiSockAddr& senderSockAddr); + static std::unique_ptr fromReceivedPacket(std::unique_ptr data, qint64 size, + const HifiSockAddr& senderSockAddr); static qint64 maxPayloadSize() { return MAX_PACKET_SIZE; } // The maximum payload size this packet can use to fit in MTU static qint64 localHeaderSize() { return 0; } // Current level's header size diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index ec175d8f39..9867584e96 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -230,27 +230,27 @@ SequenceNumber Connection::nextACK() const { } } -void Connection::processReceivedSequenceNumber(SequenceNumber seq) { +bool Connection::processReceivedSequenceNumber(SequenceNumber sequenceNumber) { // check if this is a packet pair we should estimate bandwidth from, or just a regular packet - if (((uint32_t) seq & 0xF) == 0) { + if (((uint32_t) sequenceNumber & 0xF) == 0) { _receiveWindow.onProbePair1Arrival(); - } else if (((uint32_t) seq & 0xF) == 1) { + } else if (((uint32_t) sequenceNumber & 0xF) == 1) { _receiveWindow.onProbePair2Arrival(); } else { _receiveWindow.onPacketArrival(); } // If this is not the next sequence number, report loss - if (seq > _lastReceivedSequenceNumber + 1) { - if (_lastReceivedSequenceNumber + 1 == seq - 1) { + if (sequenceNumber > _lastReceivedSequenceNumber + 1) { + if (_lastReceivedSequenceNumber + 1 == sequenceNumber - 1) { _lossList.append(_lastReceivedSequenceNumber + 1); } else { - _lossList.append(_lastReceivedSequenceNumber + 1, seq - 1); + _lossList.append(_lastReceivedSequenceNumber + 1, sequenceNumber - 1); } // Send a NAK packet - sendNAK(seq); + sendNAK(sequenceNumber); // figure out when we should send the next loss report, if we haven't heard anything back _nakInterval = (_rtt + 4 * _rttVariance); @@ -267,12 +267,14 @@ void Connection::processReceivedSequenceNumber(SequenceNumber seq) { } } - if (seq > _lastReceivedSequenceNumber) { + bool wasDuplicate = false; + + if (sequenceNumber > _lastReceivedSequenceNumber) { // Update largest recieved sequence number - _lastReceivedSequenceNumber = seq; + _lastReceivedSequenceNumber = sequenceNumber; } else { - // Otherwise, it's a resend, remove it from the loss list - _lossList.remove(seq); + // Otherwise, it could be a resend, try and remove it from the loss list + wasDuplicate = !_lossList.remove(sequenceNumber); } // increment the counters for data packets received @@ -283,6 +285,8 @@ void Connection::processReceivedSequenceNumber(SequenceNumber seq) { if (_congestionControl->_ackInterval > 0 && _packetsSinceACK >= _congestionControl->_ackInterval) { sendACK(false); } + + return wasDuplicate; } void Connection::processControl(unique_ptr controlPacket) { diff --git a/libraries/networking/src/udt/Connection.h b/libraries/networking/src/udt/Connection.h index eaa55797d9..2e6d665e4f 100644 --- a/libraries/networking/src/udt/Connection.h +++ b/libraries/networking/src/udt/Connection.h @@ -44,7 +44,7 @@ public: SequenceNumber nextACK() const; - void processReceivedSequenceNumber(SequenceNumber seq); + bool processReceivedSequenceNumber(SequenceNumber sequenceNumber); // returns indicates if this packet was a duplicate void processControl(std::unique_ptr controlPacket); private: diff --git a/libraries/networking/src/udt/Constants.h b/libraries/networking/src/udt/Constants.h index e8a7658bf9..5e0f130f13 100644 --- a/libraries/networking/src/udt/Constants.h +++ b/libraries/networking/src/udt/Constants.h @@ -14,6 +14,8 @@ #ifndef hifi_udt_Constants_h #define hifi_udt_Constants_h +#include "SequenceNumber.h" + namespace udt { static const int MAX_PACKET_SIZE_WITH_UDP_HEADER = 1500; static const int MAX_PACKET_SIZE = MAX_PACKET_SIZE_WITH_UDP_HEADER - 28; @@ -23,6 +25,7 @@ namespace udt { static const int UDP_SEND_BUFFER_SIZE_BYTES = 1048576; static const int UDP_RECEIVE_BUFFER_SIZE_BYTES = 1048576; static const int DEFAULT_SYN_INTERVAL_USECS = 10 * 1000; + static const uint32_t CONTROL_BIT_MASK = 1 << (sizeof(SequenceNumber) - 1); } #endif // hifi_udt_Constants_h diff --git a/libraries/networking/src/udt/ControlPacket.cpp b/libraries/networking/src/udt/ControlPacket.cpp index 057f3633f9..2bc081e197 100644 --- a/libraries/networking/src/udt/ControlPacket.cpp +++ b/libraries/networking/src/udt/ControlPacket.cpp @@ -11,8 +11,26 @@ #include "ControlPacket.h" +#include "Constants.h" + using namespace udt; +std::unique_ptr ControlPacket::fromReceivedPacket(std::unique_ptr data, qint64 size, + const HifiSockAddr &senderSockAddr) { + // Fail with null data + Q_ASSERT(data); + + // Fail with invalid size + Q_ASSERT(size >= 0); + + // allocate memory + auto packet = std::unique_ptr(new ControlPacket(std::move(data), size, senderSockAddr)); + + packet->open(QIODevice::ReadOnly); + + return packet; +} + std::unique_ptr ControlPacket::create(Type type, qint64 size) { std::unique_ptr controlPacket; @@ -57,6 +75,17 @@ ControlPacket::ControlPacket(Type type, qint64 size) : writeControlBitAndType(); } +ControlPacket::ControlPacket(std::unique_ptr data, qint64 size, const HifiSockAddr& senderSockAddr) : + BasePacket(std::move(data), size, senderSockAddr) +{ + // sanity check before we decrease the payloadSize with the payloadCapacity + Q_ASSERT(_payloadSize == _payloadCapacity); + + adjustPayloadStartAndCapacity(_payloadSize > 0); + + readType(); +} + ControlPacket::ControlPacket(ControlPacket&& other) : BasePacket(std::move(other)) { @@ -71,8 +100,6 @@ ControlPacket& ControlPacket::operator=(ControlPacket&& other) { return *this; } -static const uint32_t CONTROL_BIT_MASK = 1 << (sizeof(ControlPacket::ControlBitAndType) - 1); - void ControlPacket::setType(udt::ControlPacket::Type type) { _type = type; @@ -94,3 +121,11 @@ void ControlPacket::writeType() { // write the type by OR'ing the new type with the current value & CONTROL_BIT_MASK *bitAndType = (*bitAndType & CONTROL_BIT_MASK) | (_type << sizeof((ControlPacket::Type) - 1)); } + +void ControlPacket::readType() { + ControlBitAndType bitAndType = *reinterpret_cast(_packet.get()); + + // read the type + uint32_t oversizeType = (uint32_t) (bitAndType & ~CONTROL_BIT_MASK); + _type = (Type) oversizeType; +} diff --git a/libraries/networking/src/udt/ControlPacket.h b/libraries/networking/src/udt/ControlPacket.h index fdd0b65e36..6d13c06d33 100644 --- a/libraries/networking/src/udt/ControlPacket.h +++ b/libraries/networking/src/udt/ControlPacket.h @@ -33,6 +33,8 @@ public: TimeoutNAK }; + static std::unique_ptr fromReceivedPacket(std::unique_ptr data, qint64 size, + const HifiSockAddr& senderSockAddr); static std::unique_ptr create(Type type, qint64 size = -1); static qint64 localHeaderSize(); // Current level's header size @@ -44,6 +46,7 @@ public: private: ControlPacket(Type type); ControlPacket(Type type, qint64 size); + ControlPacket(std::unique_ptr data, qint64 size, const HifiSockAddr& senderSockAddr); ControlPacket(ControlPacket&& other); ControlPacket(const ControlPacket& other) = delete; @@ -54,6 +57,9 @@ private: void writeControlBitAndType(); void writeType(); + // Header readers + void readType(); + Type _type; }; diff --git a/libraries/networking/src/udt/LossList.cpp b/libraries/networking/src/udt/LossList.cpp index 0bf08fbc1d..32812be10b 100644 --- a/libraries/networking/src/udt/LossList.cpp +++ b/libraries/networking/src/udt/LossList.cpp @@ -81,7 +81,7 @@ void LossList::insert(SequenceNumber start, SequenceNumber end) { } } -void LossList::remove(SequenceNumber seq) { +bool LossList::remove(SequenceNumber seq) { auto it = find_if(_lossList.begin(), _lossList.end(), [&seq](pair pair) { return pair.first <= seq && seq <= pair.second; }); @@ -99,6 +99,12 @@ void LossList::remove(SequenceNumber seq) { _lossList.insert(it, make_pair(seq + 1, temp)); } _length -= 1; + + // this sequence number was found in the loss list, return true + return true; + } else { + // this sequence number was not found in the loss list, return false + return false; } } diff --git a/libraries/networking/src/udt/LossList.h b/libraries/networking/src/udt/LossList.h index 5b5fb9f0c8..fc97206282 100644 --- a/libraries/networking/src/udt/LossList.h +++ b/libraries/networking/src/udt/LossList.h @@ -33,7 +33,7 @@ public: // Inserts anywhere - MUCH slower void insert(SequenceNumber start, SequenceNumber end); - void remove(SequenceNumber seq); + bool remove(SequenceNumber seq); void remove(SequenceNumber start, SequenceNumber end); int getLength() const { return _length; } diff --git a/libraries/networking/src/udt/Packet.cpp b/libraries/networking/src/udt/Packet.cpp index 4223986aee..96cf5210b4 100644 --- a/libraries/networking/src/udt/Packet.cpp +++ b/libraries/networking/src/udt/Packet.cpp @@ -117,7 +117,6 @@ void Packet::writeSequenceNumber(SequenceNumber sequenceNumber) const { writeHeader(); } -static const uint32_t CONTROL_BIT_MASK = 1 << (sizeof(Packet::SequenceNumberAndBitField) - 1); static const uint32_t RELIABILITY_BIT_MASK = 1 << (sizeof(Packet::SequenceNumberAndBitField) - 2); static const uint32_t MESSAGE_BIT_MASK = 1 << (sizeof(Packet::SequenceNumberAndBitField) - 3); static const uint32_t BIT_FIELD_MASK = CONTROL_BIT_MASK | RELIABILITY_BIT_MASK | MESSAGE_BIT_MASK; diff --git a/libraries/networking/src/udt/Socket.cpp b/libraries/networking/src/udt/Socket.cpp index 650bdd7906..5ded97135b 100644 --- a/libraries/networking/src/udt/Socket.cpp +++ b/libraries/networking/src/udt/Socket.cpp @@ -135,14 +135,41 @@ void Socket::readPendingDatagrams() { return; } - // setup a Packet from the data we just read - auto packet = Packet::fromReceivedPacket(std::move(buffer), packetSizeWithHeader, senderSockAddr); + // check if this was a control packet or a data packet + bool isControlPacket = *buffer & CONTROL_BIT_MASK; - // call our verification operator to see if this packet is verified - if (!_packetFilterOperator || _packetFilterOperator(*packet)) { - if (_packetHandler) { - // call the verified packet callback to let it handle this packet - _packetHandler(std::move(packet)); + if (isControlPacket) { + // setup a control packet from the data we just read + auto controlPacket = ControlPacket::fromReceivedPacket(std::move(buffer), packetSizeWithHeader, senderSockAddr); + + // move this control packet to the matching connection + auto it = _connectionsHash.find(senderSockAddr); + + if (it != _connectionsHash.end()) { + it->second->processControl(move(controlPacket)); + } + + } else { + // setup a Packet from the data we just read + auto packet = Packet::fromReceivedPacket(std::move(buffer), packetSizeWithHeader, senderSockAddr); + + // call our verification operator to see if this packet is verified + if (!_packetFilterOperator || _packetFilterOperator(*packet)) { + + if (packet->isReliable()) { + // if this was a reliable packet then signal the matching connection with the sequence number + // assuming it exists + auto it = _connectionsHash.find(senderSockAddr); + + if (it != _connectionsHash.end()) { + it->second->processReceivedSequenceNumber(packet->getSequenceNumber()); + } + } + + if (_packetHandler) { + // call the verified packet callback to let it handle this packet + _packetHandler(std::move(packet)); + } } } }