Merge branch 'atp' of https://github.com/birarda/hifi into protocol

This commit is contained in:
Atlante45 2015-07-29 17:29:36 -07:00
commit fe48bfdf95
10 changed files with 106 additions and 25 deletions

View file

@ -27,7 +27,8 @@ public:
static const qint64 PACKET_WRITE_ERROR; static const qint64 PACKET_WRITE_ERROR;
static std::unique_ptr<BasePacket> create(qint64 size = -1); static std::unique_ptr<BasePacket> create(qint64 size = -1);
static std::unique_ptr<BasePacket> fromReceivedPacket(std::unique_ptr<char> data, qint64 size, const HifiSockAddr& senderSockAddr); static std::unique_ptr<BasePacket> fromReceivedPacket(std::unique_ptr<char> data, qint64 size,
const HifiSockAddr& senderSockAddr);
static qint64 maxPayloadSize() { return MAX_PACKET_SIZE; } // The maximum payload size this packet can use to fit in MTU static qint64 maxPayloadSize() { return MAX_PACKET_SIZE; } // The maximum payload size this packet can use to fit in MTU
static qint64 localHeaderSize() { return 0; } // Current level's header size static qint64 localHeaderSize() { return 0; } // Current level's header size

View file

@ -225,27 +225,27 @@ SequenceNumber Connection::nextACK() const {
} }
} }
void Connection::processReceivedSequenceNumber(SequenceNumber seq) { bool Connection::processReceivedSequenceNumber(SequenceNumber sequenceNumber) {
// check if this is a packet pair we should estimate bandwidth from, or just a regular packet // check if this is a packet pair we should estimate bandwidth from, or just a regular packet
if (((uint32_t) seq & 0xF) == 0) { if (((uint32_t) sequenceNumber & 0xF) == 0) {
_receiveWindow.onProbePair1Arrival(); _receiveWindow.onProbePair1Arrival();
} else if (((uint32_t) seq & 0xF) == 1) { } else if (((uint32_t) sequenceNumber & 0xF) == 1) {
_receiveWindow.onProbePair2Arrival(); _receiveWindow.onProbePair2Arrival();
} else { } else {
_receiveWindow.onPacketArrival(); _receiveWindow.onPacketArrival();
} }
// If this is not the next sequence number, report loss // If this is not the next sequence number, report loss
if (seq > _lastReceivedSequenceNumber + 1) { if (sequenceNumber > _lastReceivedSequenceNumber + 1) {
if (_lastReceivedSequenceNumber + 1 == seq - 1) { if (_lastReceivedSequenceNumber + 1 == sequenceNumber - 1) {
_lossList.append(_lastReceivedSequenceNumber + 1); _lossList.append(_lastReceivedSequenceNumber + 1);
} else { } else {
_lossList.append(_lastReceivedSequenceNumber + 1, seq - 1); _lossList.append(_lastReceivedSequenceNumber + 1, sequenceNumber - 1);
} }
// Send a NAK packet // Send a NAK packet
sendNAK(seq); sendNAK(sequenceNumber);
// figure out when we should send the next loss report, if we haven't heard anything back // figure out when we should send the next loss report, if we haven't heard anything back
_nakInterval = (_rtt + 4 * _rttVariance); _nakInterval = (_rtt + 4 * _rttVariance);
@ -262,12 +262,14 @@ void Connection::processReceivedSequenceNumber(SequenceNumber seq) {
} }
} }
if (seq > _lastReceivedSequenceNumber) { bool wasDuplicate = false;
if (sequenceNumber > _lastReceivedSequenceNumber) {
// Update largest recieved sequence number // Update largest recieved sequence number
_lastReceivedSequenceNumber = seq; _lastReceivedSequenceNumber = sequenceNumber;
} else { } else {
// Otherwise, it's a resend, remove it from the loss list // Otherwise, it could be a resend, try and remove it from the loss list
_lossList.remove(seq); wasDuplicate = !_lossList.remove(sequenceNumber);
} }
// increment the counters for data packets received // increment the counters for data packets received
@ -280,6 +282,8 @@ void Connection::processReceivedSequenceNumber(SequenceNumber seq) {
} else if (_congestionControl->_lightACKInterval > 0 && _packetsSinceACK >= _congestionControl->_lightACKInterval) { } else if (_congestionControl->_lightACKInterval > 0 && _packetsSinceACK >= _congestionControl->_lightACKInterval) {
sendLightACK(); sendLightACK();
} }
return wasDuplicate;
} }
void Connection::processControl(unique_ptr<ControlPacket> controlPacket) { void Connection::processControl(unique_ptr<ControlPacket> controlPacket) {

View file

@ -42,7 +42,7 @@ public:
void sync(); // rate control method, fired by Socket for all connections on SYN interval void sync(); // rate control method, fired by Socket for all connections on SYN interval
void processReceivedSequenceNumber(SequenceNumber seq); bool processReceivedSequenceNumber(SequenceNumber sequenceNumber); // returns indicates if this packet was a duplicate
void processControl(std::unique_ptr<ControlPacket> controlPacket); void processControl(std::unique_ptr<ControlPacket> controlPacket);
private: private:

View file

@ -14,6 +14,8 @@
#ifndef hifi_udt_Constants_h #ifndef hifi_udt_Constants_h
#define hifi_udt_Constants_h #define hifi_udt_Constants_h
#include "SequenceNumber.h"
namespace udt { namespace udt {
static const int MAX_PACKET_SIZE_WITH_UDP_HEADER = 1500; static const int MAX_PACKET_SIZE_WITH_UDP_HEADER = 1500;
static const int MAX_PACKET_SIZE = MAX_PACKET_SIZE_WITH_UDP_HEADER - 28; static const int MAX_PACKET_SIZE = MAX_PACKET_SIZE_WITH_UDP_HEADER - 28;
@ -23,6 +25,7 @@ namespace udt {
static const int UDP_SEND_BUFFER_SIZE_BYTES = 1048576; static const int UDP_SEND_BUFFER_SIZE_BYTES = 1048576;
static const int UDP_RECEIVE_BUFFER_SIZE_BYTES = 1048576; static const int UDP_RECEIVE_BUFFER_SIZE_BYTES = 1048576;
static const int DEFAULT_SYN_INTERVAL_USECS = 10 * 1000; static const int DEFAULT_SYN_INTERVAL_USECS = 10 * 1000;
static const uint32_t CONTROL_BIT_MASK = 1 << (sizeof(SequenceNumber) - 1);
} }
#endif // hifi_udt_Constants_h #endif // hifi_udt_Constants_h

View file

@ -11,8 +11,26 @@
#include "ControlPacket.h" #include "ControlPacket.h"
#include "Constants.h"
using namespace udt; using namespace udt;
std::unique_ptr<ControlPacket> ControlPacket::fromReceivedPacket(std::unique_ptr<char> data, qint64 size,
const HifiSockAddr &senderSockAddr) {
// Fail with null data
Q_ASSERT(data);
// Fail with invalid size
Q_ASSERT(size >= 0);
// allocate memory
auto packet = std::unique_ptr<ControlPacket>(new ControlPacket(std::move(data), size, senderSockAddr));
packet->open(QIODevice::ReadOnly);
return packet;
}
std::unique_ptr<ControlPacket> ControlPacket::create(Type type, qint64 size) { std::unique_ptr<ControlPacket> ControlPacket::create(Type type, qint64 size) {
std::unique_ptr<ControlPacket> controlPacket; std::unique_ptr<ControlPacket> controlPacket;
@ -57,6 +75,17 @@ ControlPacket::ControlPacket(Type type, qint64 size) :
writeControlBitAndType(); writeControlBitAndType();
} }
ControlPacket::ControlPacket(std::unique_ptr<char> data, qint64 size, const HifiSockAddr& senderSockAddr) :
BasePacket(std::move(data), size, senderSockAddr)
{
// sanity check before we decrease the payloadSize with the payloadCapacity
Q_ASSERT(_payloadSize == _payloadCapacity);
adjustPayloadStartAndCapacity(_payloadSize > 0);
readType();
}
ControlPacket::ControlPacket(ControlPacket&& other) : ControlPacket::ControlPacket(ControlPacket&& other) :
BasePacket(std::move(other)) BasePacket(std::move(other))
{ {
@ -71,8 +100,6 @@ ControlPacket& ControlPacket::operator=(ControlPacket&& other) {
return *this; return *this;
} }
static const uint32_t CONTROL_BIT_MASK = 1 << (sizeof(ControlPacket::ControlBitAndType) - 1);
void ControlPacket::setType(udt::ControlPacket::Type type) { void ControlPacket::setType(udt::ControlPacket::Type type) {
_type = type; _type = type;
@ -94,3 +121,11 @@ void ControlPacket::writeType() {
// write the type by OR'ing the new type with the current value & CONTROL_BIT_MASK // write the type by OR'ing the new type with the current value & CONTROL_BIT_MASK
*bitAndType = (*bitAndType & CONTROL_BIT_MASK) | (_type << sizeof((ControlPacket::Type) - 1)); *bitAndType = (*bitAndType & CONTROL_BIT_MASK) | (_type << sizeof((ControlPacket::Type) - 1));
} }
void ControlPacket::readType() {
ControlBitAndType bitAndType = *reinterpret_cast<ControlBitAndType*>(_packet.get());
// read the type
uint32_t oversizeType = (uint32_t) (bitAndType & ~CONTROL_BIT_MASK);
_type = (Type) oversizeType;
}

View file

@ -33,6 +33,8 @@ public:
TimeoutNAK TimeoutNAK
}; };
static std::unique_ptr<ControlPacket> fromReceivedPacket(std::unique_ptr<char> data, qint64 size,
const HifiSockAddr& senderSockAddr);
static std::unique_ptr<ControlPacket> create(Type type, qint64 size = -1); static std::unique_ptr<ControlPacket> create(Type type, qint64 size = -1);
static qint64 localHeaderSize(); // Current level's header size static qint64 localHeaderSize(); // Current level's header size
@ -44,6 +46,7 @@ public:
private: private:
ControlPacket(Type type); ControlPacket(Type type);
ControlPacket(Type type, qint64 size); ControlPacket(Type type, qint64 size);
ControlPacket(std::unique_ptr<char> data, qint64 size, const HifiSockAddr& senderSockAddr);
ControlPacket(ControlPacket&& other); ControlPacket(ControlPacket&& other);
ControlPacket(const ControlPacket& other) = delete; ControlPacket(const ControlPacket& other) = delete;
@ -54,6 +57,9 @@ private:
void writeControlBitAndType(); void writeControlBitAndType();
void writeType(); void writeType();
// Header readers
void readType();
Type _type; Type _type;
}; };

View file

@ -81,7 +81,7 @@ void LossList::insert(SequenceNumber start, SequenceNumber end) {
} }
} }
void LossList::remove(SequenceNumber seq) { bool LossList::remove(SequenceNumber seq) {
auto it = find_if(_lossList.begin(), _lossList.end(), [&seq](pair<SequenceNumber, SequenceNumber> pair) { auto it = find_if(_lossList.begin(), _lossList.end(), [&seq](pair<SequenceNumber, SequenceNumber> pair) {
return pair.first <= seq && seq <= pair.second; return pair.first <= seq && seq <= pair.second;
}); });
@ -99,6 +99,12 @@ void LossList::remove(SequenceNumber seq) {
_lossList.insert(it, make_pair(seq + 1, temp)); _lossList.insert(it, make_pair(seq + 1, temp));
} }
_length -= 1; _length -= 1;
// this sequence number was found in the loss list, return true
return true;
} else {
// this sequence number was not found in the loss list, return false
return false;
} }
} }

View file

@ -33,7 +33,7 @@ public:
// Inserts anywhere - MUCH slower // Inserts anywhere - MUCH slower
void insert(SequenceNumber start, SequenceNumber end); void insert(SequenceNumber start, SequenceNumber end);
void remove(SequenceNumber seq); bool remove(SequenceNumber seq);
void remove(SequenceNumber start, SequenceNumber end); void remove(SequenceNumber start, SequenceNumber end);
int getLength() const { return _length; } int getLength() const { return _length; }

View file

@ -117,7 +117,6 @@ void Packet::writeSequenceNumber(SequenceNumber sequenceNumber) const {
writeHeader(); writeHeader();
} }
static const uint32_t CONTROL_BIT_MASK = 1 << (sizeof(Packet::SequenceNumberAndBitField) - 1);
static const uint32_t RELIABILITY_BIT_MASK = 1 << (sizeof(Packet::SequenceNumberAndBitField) - 2); static const uint32_t RELIABILITY_BIT_MASK = 1 << (sizeof(Packet::SequenceNumberAndBitField) - 2);
static const uint32_t MESSAGE_BIT_MASK = 1 << (sizeof(Packet::SequenceNumberAndBitField) - 3); static const uint32_t MESSAGE_BIT_MASK = 1 << (sizeof(Packet::SequenceNumberAndBitField) - 3);
static const uint32_t BIT_FIELD_MASK = CONTROL_BIT_MASK | RELIABILITY_BIT_MASK | MESSAGE_BIT_MASK; static const uint32_t BIT_FIELD_MASK = CONTROL_BIT_MASK | RELIABILITY_BIT_MASK | MESSAGE_BIT_MASK;

View file

@ -136,14 +136,41 @@ void Socket::readPendingDatagrams() {
return; return;
} }
// setup a Packet from the data we just read // check if this was a control packet or a data packet
auto packet = Packet::fromReceivedPacket(std::move(buffer), packetSizeWithHeader, senderSockAddr); bool isControlPacket = *buffer & CONTROL_BIT_MASK;
// call our verification operator to see if this packet is verified if (isControlPacket) {
if (!_packetFilterOperator || _packetFilterOperator(*packet)) { // setup a control packet from the data we just read
if (_packetHandler) { auto controlPacket = ControlPacket::fromReceivedPacket(std::move(buffer), packetSizeWithHeader, senderSockAddr);
// call the verified packet callback to let it handle this packet
_packetHandler(std::move(packet)); // move this control packet to the matching connection
auto it = _connectionsHash.find(senderSockAddr);
if (it != _connectionsHash.end()) {
it->second->processControl(move(controlPacket));
}
} else {
// setup a Packet from the data we just read
auto packet = Packet::fromReceivedPacket(std::move(buffer), packetSizeWithHeader, senderSockAddr);
// call our verification operator to see if this packet is verified
if (!_packetFilterOperator || _packetFilterOperator(*packet)) {
if (packet->isReliable()) {
// if this was a reliable packet then signal the matching connection with the sequence number
// assuming it exists
auto it = _connectionsHash.find(senderSockAddr);
if (it != _connectionsHash.end()) {
it->second->processReceivedSequenceNumber(packet->getSequenceNumber());
}
}
if (_packetHandler) {
// call the verified packet callback to let it handle this packet
_packetHandler(std::move(packet));
}
} }
} }
} }