mirror of
https://github.com/overte-org/overte.git
synced 2025-04-20 18:44:01 +02:00
actually process control and sequence numbers from Socket
This commit is contained in:
parent
35f00f9ba1
commit
66c308b436
10 changed files with 106 additions and 25 deletions
|
@ -27,7 +27,8 @@ public:
|
|||
static const qint64 PACKET_WRITE_ERROR;
|
||||
|
||||
static std::unique_ptr<BasePacket> create(qint64 size = -1);
|
||||
static std::unique_ptr<BasePacket> fromReceivedPacket(std::unique_ptr<char> data, qint64 size, const HifiSockAddr& senderSockAddr);
|
||||
static std::unique_ptr<BasePacket> fromReceivedPacket(std::unique_ptr<char> data, qint64 size,
|
||||
const HifiSockAddr& senderSockAddr);
|
||||
|
||||
static qint64 maxPayloadSize() { return MAX_PACKET_SIZE; } // The maximum payload size this packet can use to fit in MTU
|
||||
static qint64 localHeaderSize() { return 0; } // Current level's header size
|
||||
|
|
|
@ -230,27 +230,27 @@ SequenceNumber Connection::nextACK() const {
|
|||
}
|
||||
}
|
||||
|
||||
void Connection::processReceivedSequenceNumber(SequenceNumber seq) {
|
||||
bool Connection::processReceivedSequenceNumber(SequenceNumber sequenceNumber) {
|
||||
|
||||
// check if this is a packet pair we should estimate bandwidth from, or just a regular packet
|
||||
if (((uint32_t) seq & 0xF) == 0) {
|
||||
if (((uint32_t) sequenceNumber & 0xF) == 0) {
|
||||
_receiveWindow.onProbePair1Arrival();
|
||||
} else if (((uint32_t) seq & 0xF) == 1) {
|
||||
} else if (((uint32_t) sequenceNumber & 0xF) == 1) {
|
||||
_receiveWindow.onProbePair2Arrival();
|
||||
} else {
|
||||
_receiveWindow.onPacketArrival();
|
||||
}
|
||||
|
||||
// If this is not the next sequence number, report loss
|
||||
if (seq > _lastReceivedSequenceNumber + 1) {
|
||||
if (_lastReceivedSequenceNumber + 1 == seq - 1) {
|
||||
if (sequenceNumber > _lastReceivedSequenceNumber + 1) {
|
||||
if (_lastReceivedSequenceNumber + 1 == sequenceNumber - 1) {
|
||||
_lossList.append(_lastReceivedSequenceNumber + 1);
|
||||
} else {
|
||||
_lossList.append(_lastReceivedSequenceNumber + 1, seq - 1);
|
||||
_lossList.append(_lastReceivedSequenceNumber + 1, sequenceNumber - 1);
|
||||
}
|
||||
|
||||
// Send a NAK packet
|
||||
sendNAK(seq);
|
||||
sendNAK(sequenceNumber);
|
||||
|
||||
// figure out when we should send the next loss report, if we haven't heard anything back
|
||||
_nakInterval = (_rtt + 4 * _rttVariance);
|
||||
|
@ -267,12 +267,14 @@ void Connection::processReceivedSequenceNumber(SequenceNumber seq) {
|
|||
}
|
||||
}
|
||||
|
||||
if (seq > _lastReceivedSequenceNumber) {
|
||||
bool wasDuplicate = false;
|
||||
|
||||
if (sequenceNumber > _lastReceivedSequenceNumber) {
|
||||
// Update largest recieved sequence number
|
||||
_lastReceivedSequenceNumber = seq;
|
||||
_lastReceivedSequenceNumber = sequenceNumber;
|
||||
} else {
|
||||
// Otherwise, it's a resend, remove it from the loss list
|
||||
_lossList.remove(seq);
|
||||
// Otherwise, it could be a resend, try and remove it from the loss list
|
||||
wasDuplicate = !_lossList.remove(sequenceNumber);
|
||||
}
|
||||
|
||||
// increment the counters for data packets received
|
||||
|
@ -283,6 +285,8 @@ void Connection::processReceivedSequenceNumber(SequenceNumber seq) {
|
|||
if (_congestionControl->_ackInterval > 0 && _packetsSinceACK >= _congestionControl->_ackInterval) {
|
||||
sendACK(false);
|
||||
}
|
||||
|
||||
return wasDuplicate;
|
||||
}
|
||||
|
||||
void Connection::processControl(unique_ptr<ControlPacket> controlPacket) {
|
||||
|
|
|
@ -44,7 +44,7 @@ public:
|
|||
|
||||
SequenceNumber nextACK() const;
|
||||
|
||||
void processReceivedSequenceNumber(SequenceNumber seq);
|
||||
bool processReceivedSequenceNumber(SequenceNumber sequenceNumber); // returns indicates if this packet was a duplicate
|
||||
void processControl(std::unique_ptr<ControlPacket> controlPacket);
|
||||
|
||||
private:
|
||||
|
|
|
@ -14,6 +14,8 @@
|
|||
#ifndef hifi_udt_Constants_h
|
||||
#define hifi_udt_Constants_h
|
||||
|
||||
#include "SequenceNumber.h"
|
||||
|
||||
namespace udt {
|
||||
static const int MAX_PACKET_SIZE_WITH_UDP_HEADER = 1500;
|
||||
static const int MAX_PACKET_SIZE = MAX_PACKET_SIZE_WITH_UDP_HEADER - 28;
|
||||
|
@ -23,6 +25,7 @@ namespace udt {
|
|||
static const int UDP_SEND_BUFFER_SIZE_BYTES = 1048576;
|
||||
static const int UDP_RECEIVE_BUFFER_SIZE_BYTES = 1048576;
|
||||
static const int DEFAULT_SYN_INTERVAL_USECS = 10 * 1000;
|
||||
static const uint32_t CONTROL_BIT_MASK = 1 << (sizeof(SequenceNumber) - 1);
|
||||
}
|
||||
|
||||
#endif // hifi_udt_Constants_h
|
||||
|
|
|
@ -11,8 +11,26 @@
|
|||
|
||||
#include "ControlPacket.h"
|
||||
|
||||
#include "Constants.h"
|
||||
|
||||
using namespace udt;
|
||||
|
||||
std::unique_ptr<ControlPacket> ControlPacket::fromReceivedPacket(std::unique_ptr<char> data, qint64 size,
|
||||
const HifiSockAddr &senderSockAddr) {
|
||||
// Fail with null data
|
||||
Q_ASSERT(data);
|
||||
|
||||
// Fail with invalid size
|
||||
Q_ASSERT(size >= 0);
|
||||
|
||||
// allocate memory
|
||||
auto packet = std::unique_ptr<ControlPacket>(new ControlPacket(std::move(data), size, senderSockAddr));
|
||||
|
||||
packet->open(QIODevice::ReadOnly);
|
||||
|
||||
return packet;
|
||||
}
|
||||
|
||||
std::unique_ptr<ControlPacket> ControlPacket::create(Type type, qint64 size) {
|
||||
|
||||
std::unique_ptr<ControlPacket> controlPacket;
|
||||
|
@ -57,6 +75,17 @@ ControlPacket::ControlPacket(Type type, qint64 size) :
|
|||
writeControlBitAndType();
|
||||
}
|
||||
|
||||
ControlPacket::ControlPacket(std::unique_ptr<char> data, qint64 size, const HifiSockAddr& senderSockAddr) :
|
||||
BasePacket(std::move(data), size, senderSockAddr)
|
||||
{
|
||||
// sanity check before we decrease the payloadSize with the payloadCapacity
|
||||
Q_ASSERT(_payloadSize == _payloadCapacity);
|
||||
|
||||
adjustPayloadStartAndCapacity(_payloadSize > 0);
|
||||
|
||||
readType();
|
||||
}
|
||||
|
||||
ControlPacket::ControlPacket(ControlPacket&& other) :
|
||||
BasePacket(std::move(other))
|
||||
{
|
||||
|
@ -71,8 +100,6 @@ ControlPacket& ControlPacket::operator=(ControlPacket&& other) {
|
|||
return *this;
|
||||
}
|
||||
|
||||
static const uint32_t CONTROL_BIT_MASK = 1 << (sizeof(ControlPacket::ControlBitAndType) - 1);
|
||||
|
||||
void ControlPacket::setType(udt::ControlPacket::Type type) {
|
||||
_type = type;
|
||||
|
||||
|
@ -94,3 +121,11 @@ void ControlPacket::writeType() {
|
|||
// write the type by OR'ing the new type with the current value & CONTROL_BIT_MASK
|
||||
*bitAndType = (*bitAndType & CONTROL_BIT_MASK) | (_type << sizeof((ControlPacket::Type) - 1));
|
||||
}
|
||||
|
||||
void ControlPacket::readType() {
|
||||
ControlBitAndType bitAndType = *reinterpret_cast<ControlBitAndType*>(_packet.get());
|
||||
|
||||
// read the type
|
||||
uint32_t oversizeType = (uint32_t) (bitAndType & ~CONTROL_BIT_MASK);
|
||||
_type = (Type) oversizeType;
|
||||
}
|
||||
|
|
|
@ -33,6 +33,8 @@ public:
|
|||
TimeoutNAK
|
||||
};
|
||||
|
||||
static std::unique_ptr<ControlPacket> fromReceivedPacket(std::unique_ptr<char> data, qint64 size,
|
||||
const HifiSockAddr& senderSockAddr);
|
||||
static std::unique_ptr<ControlPacket> create(Type type, qint64 size = -1);
|
||||
|
||||
static qint64 localHeaderSize(); // Current level's header size
|
||||
|
@ -44,6 +46,7 @@ public:
|
|||
private:
|
||||
ControlPacket(Type type);
|
||||
ControlPacket(Type type, qint64 size);
|
||||
ControlPacket(std::unique_ptr<char> data, qint64 size, const HifiSockAddr& senderSockAddr);
|
||||
ControlPacket(ControlPacket&& other);
|
||||
ControlPacket(const ControlPacket& other) = delete;
|
||||
|
||||
|
@ -54,6 +57,9 @@ private:
|
|||
void writeControlBitAndType();
|
||||
void writeType();
|
||||
|
||||
// Header readers
|
||||
void readType();
|
||||
|
||||
Type _type;
|
||||
};
|
||||
|
||||
|
|
|
@ -81,7 +81,7 @@ void LossList::insert(SequenceNumber start, SequenceNumber end) {
|
|||
}
|
||||
}
|
||||
|
||||
void LossList::remove(SequenceNumber seq) {
|
||||
bool LossList::remove(SequenceNumber seq) {
|
||||
auto it = find_if(_lossList.begin(), _lossList.end(), [&seq](pair<SequenceNumber, SequenceNumber> pair) {
|
||||
return pair.first <= seq && seq <= pair.second;
|
||||
});
|
||||
|
@ -99,6 +99,12 @@ void LossList::remove(SequenceNumber seq) {
|
|||
_lossList.insert(it, make_pair(seq + 1, temp));
|
||||
}
|
||||
_length -= 1;
|
||||
|
||||
// this sequence number was found in the loss list, return true
|
||||
return true;
|
||||
} else {
|
||||
// this sequence number was not found in the loss list, return false
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -33,7 +33,7 @@ public:
|
|||
// Inserts anywhere - MUCH slower
|
||||
void insert(SequenceNumber start, SequenceNumber end);
|
||||
|
||||
void remove(SequenceNumber seq);
|
||||
bool remove(SequenceNumber seq);
|
||||
void remove(SequenceNumber start, SequenceNumber end);
|
||||
|
||||
int getLength() const { return _length; }
|
||||
|
|
|
@ -117,7 +117,6 @@ void Packet::writeSequenceNumber(SequenceNumber sequenceNumber) const {
|
|||
writeHeader();
|
||||
}
|
||||
|
||||
static const uint32_t CONTROL_BIT_MASK = 1 << (sizeof(Packet::SequenceNumberAndBitField) - 1);
|
||||
static const uint32_t RELIABILITY_BIT_MASK = 1 << (sizeof(Packet::SequenceNumberAndBitField) - 2);
|
||||
static const uint32_t MESSAGE_BIT_MASK = 1 << (sizeof(Packet::SequenceNumberAndBitField) - 3);
|
||||
static const uint32_t BIT_FIELD_MASK = CONTROL_BIT_MASK | RELIABILITY_BIT_MASK | MESSAGE_BIT_MASK;
|
||||
|
|
|
@ -135,14 +135,41 @@ void Socket::readPendingDatagrams() {
|
|||
return;
|
||||
}
|
||||
|
||||
// setup a Packet from the data we just read
|
||||
auto packet = Packet::fromReceivedPacket(std::move(buffer), packetSizeWithHeader, senderSockAddr);
|
||||
// check if this was a control packet or a data packet
|
||||
bool isControlPacket = *buffer & CONTROL_BIT_MASK;
|
||||
|
||||
// call our verification operator to see if this packet is verified
|
||||
if (!_packetFilterOperator || _packetFilterOperator(*packet)) {
|
||||
if (_packetHandler) {
|
||||
// call the verified packet callback to let it handle this packet
|
||||
_packetHandler(std::move(packet));
|
||||
if (isControlPacket) {
|
||||
// setup a control packet from the data we just read
|
||||
auto controlPacket = ControlPacket::fromReceivedPacket(std::move(buffer), packetSizeWithHeader, senderSockAddr);
|
||||
|
||||
// move this control packet to the matching connection
|
||||
auto it = _connectionsHash.find(senderSockAddr);
|
||||
|
||||
if (it != _connectionsHash.end()) {
|
||||
it->second->processControl(move(controlPacket));
|
||||
}
|
||||
|
||||
} else {
|
||||
// setup a Packet from the data we just read
|
||||
auto packet = Packet::fromReceivedPacket(std::move(buffer), packetSizeWithHeader, senderSockAddr);
|
||||
|
||||
// call our verification operator to see if this packet is verified
|
||||
if (!_packetFilterOperator || _packetFilterOperator(*packet)) {
|
||||
|
||||
if (packet->isReliable()) {
|
||||
// if this was a reliable packet then signal the matching connection with the sequence number
|
||||
// assuming it exists
|
||||
auto it = _connectionsHash.find(senderSockAddr);
|
||||
|
||||
if (it != _connectionsHash.end()) {
|
||||
it->second->processReceivedSequenceNumber(packet->getSequenceNumber());
|
||||
}
|
||||
}
|
||||
|
||||
if (_packetHandler) {
|
||||
// call the verified packet callback to let it handle this packet
|
||||
_packetHandler(std::move(packet));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue