Merge pull request #7159 from Atlante45/fix/atp

Fix packet resend faillure bug
This commit is contained in:
Stephen Birarda 2016-02-23 10:15:36 -08:00
commit 8385d12d9b
9 changed files with 238 additions and 55 deletions

View file

@ -21,6 +21,17 @@
class NLPacket : public udt::Packet {
Q_OBJECT
public:
// 0 1 2 3
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | Packet Type | Packet Version |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | Node UUID | Hash (only if verified) | Optional (only if sourced)
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
//
// NLPacket Header Format
// this is used by the Octree classes - must be known at compile time
static const int MAX_PACKET_HEADER_SIZE =
sizeof(udt::Packet::SequenceNumberAndBitField) + sizeof(udt::Packet::MessageNumberAndBitField) +

View file

@ -26,10 +26,65 @@ namespace udt {
static const int UDP_SEND_BUFFER_SIZE_BYTES = 1048576;
static const int UDP_RECEIVE_BUFFER_SIZE_BYTES = 1048576;
static const int DEFAULT_SYN_INTERVAL_USECS = 10 * 1000;
static const int SEQUENCE_NUMBER_BITS = sizeof(SequenceNumber) * 8;
static const int MESSAGE_LINE_NUMBER_BITS = 32;
static const int MESSAGE_NUMBER_BITS = 30;
static const uint32_t CONTROL_BIT_MASK = uint32_t(1) << (SEQUENCE_NUMBER_BITS - 1);
// Header constants
// Bit sizes (in order)
static const int CONTROL_BIT_SIZE = 1;
static const int RELIABILITY_BIT_SIZE = 1;
static const int MESSAGE_BIT_SIZE = 1;
static const int OBFUSCATION_LEVEL_SIZE = 2;
static const int SEQUENCE_NUMBER_SIZE= 27;
static const int PACKET_POSITION_SIZE = 2;
static const int MESSAGE_NUMBER_SIZE = 30;
static const int MESSAGE_PART_NUMBER_SIZE = 32;
// Offsets
static const int SEQUENCE_NUMBER_OFFSET = 0;
static const int OBFUSCATION_LEVEL_OFFSET = SEQUENCE_NUMBER_OFFSET + SEQUENCE_NUMBER_SIZE;
static const int MESSAGE_BIT_OFFSET = OBFUSCATION_LEVEL_OFFSET + OBFUSCATION_LEVEL_SIZE;
static const int RELIABILITY_BIT_OFFSET = MESSAGE_BIT_OFFSET + MESSAGE_BIT_SIZE;
static const int CONTROL_BIT_OFFSET = RELIABILITY_BIT_OFFSET + RELIABILITY_BIT_SIZE;
static const int MESSAGE_NUMBER_OFFSET = 0;
static const int PACKET_POSITION_OFFSET = MESSAGE_NUMBER_OFFSET + MESSAGE_NUMBER_SIZE;
static const int MESSAGE_PART_NUMBER_OFFSET = 0;
// Masks
static const uint32_t CONTROL_BIT_MASK = uint32_t(1) << CONTROL_BIT_OFFSET;
static const uint32_t RELIABILITY_BIT_MASK = uint32_t(1) << RELIABILITY_BIT_OFFSET;
static const uint32_t MESSAGE_BIT_MASK = uint32_t(1) << MESSAGE_BIT_OFFSET;
static const uint32_t OBFUSCATION_LEVEL_MASK = uint32_t(3) << OBFUSCATION_LEVEL_OFFSET;
static const uint32_t BIT_FIELD_MASK = CONTROL_BIT_MASK | RELIABILITY_BIT_MASK | MESSAGE_BIT_MASK | OBFUSCATION_LEVEL_MASK;
static const uint32_t SEQUENCE_NUMBER_MASK = ~BIT_FIELD_MASK;
static const uint32_t PACKET_POSITION_MASK = uint32_t(3) << PACKET_POSITION_OFFSET;
static const uint32_t MESSAGE_NUMBER_MASK = ~PACKET_POSITION_MASK;
static const uint32_t MESSAGE_PART_NUMBER_MASK = ~uint32_t(0);
// Static checks
static_assert(CONTROL_BIT_SIZE + RELIABILITY_BIT_SIZE + MESSAGE_BIT_SIZE +
OBFUSCATION_LEVEL_SIZE + SEQUENCE_NUMBER_SIZE == 32, "Sequence number line size incorrect");
static_assert(PACKET_POSITION_SIZE + MESSAGE_NUMBER_SIZE == 32, "Message number line size incorrect");
static_assert(MESSAGE_PART_NUMBER_SIZE == 32, "Message part number line size incorrect");
static_assert(CONTROL_BIT_MASK == 0x80000000, "CONTROL_BIT_MASK incorrect");
static_assert(RELIABILITY_BIT_MASK == 0x40000000, "RELIABILITY_BIT_MASK incorrect");
static_assert(MESSAGE_BIT_MASK == 0x20000000, "MESSAGE_BIT_MASK incorrect");
static_assert(OBFUSCATION_LEVEL_MASK == 0x18000000, "OBFUSCATION_LEVEL_MASK incorrect");
static_assert(BIT_FIELD_MASK == 0xF8000000, "BIT_FIELD_MASK incorrect");
static_assert(SEQUENCE_NUMBER_MASK == 0x07FFFFFF, "SEQUENCE_NUMBER_MASK incorrect");
static_assert(PACKET_POSITION_MASK == 0xC0000000, "PACKET_POSITION_MASK incorrect");
static_assert(MESSAGE_NUMBER_MASK == 0x3FFFFFFF, "MESSAGE_NUMBER_MASK incorrect");
static_assert(MESSAGE_PART_NUMBER_MASK == 0xFFFFFFFF, "MESSAGE_PART_NUMBER_MASK incorrect");
}
#endif // hifi_udt_Constants_h

View file

@ -11,10 +11,35 @@
#include "Packet.h"
#include <array>
#include <LogHandler.h>
using namespace udt;
static int packetMetaTypeId = qRegisterMetaType<Packet*>("Packet*");
using Key = uint64_t;
static const std::array<Key, 4> KEYS {{
0x0,
0x6362726973736574,
0x7362697261726461,
0x72687566666d616e,
}};
void xorHelper(char* start, int size, Key key) {
const auto end = start + size;
auto p = start;
for (; p + sizeof(Key) < end; p += sizeof(Key)) {
*reinterpret_cast<Key*>(p) ^= key;
}
for (int i = 0; p < end; ++p || ++i) {
*p ^= *(reinterpret_cast<const char*>(&key) + i);
}
}
int Packet::localHeaderSize(bool isPartOfMessage) {
return sizeof(Packet::SequenceNumberAndBitField) +
(isPartOfMessage ? sizeof(Packet::MessageNumberAndBitField) + sizeof(MessagePartNumber) : 0);
@ -69,44 +94,48 @@ Packet::Packet(std::unique_ptr<char[]> data, qint64 size, const HifiSockAddr& se
readHeader();
adjustPayloadStartAndCapacity(Packet::localHeaderSize(_isPartOfMessage), _payloadSize > 0);
if (getObfuscationLevel() != Packet::NoObfuscation) {
#ifdef UDT_CONNECTION_DEBUG
QString debugString = "Unobfuscating packet %1 with level %2";
debugString = debugString.arg(QString::number((uint32_t)getSequenceNumber()),
QString::number(getObfuscationLevel()));
if (isPartOfMessage()) {
debugString += "\n";
debugString += " Message Number: %1, Part Number: %2.";
debugString = debugString.arg(QString::number(getMessageNumber()),
QString::number(getMessagePartNumber()));
}
static QString repeatedMessage = LogHandler::getInstance().addRepeatedMessageRegex("^Unobfuscating packet .*");
qDebug() << qPrintable(debugString);
#endif
obfuscate(NoObfuscation); // Undo obfuscation
}
}
Packet::Packet(const Packet& other) :
BasePacket(other)
{
_isReliable = other._isReliable;
_isPartOfMessage = other._isPartOfMessage;
_sequenceNumber = other._sequenceNumber;
Packet::Packet(const Packet& other) : BasePacket(other) {
copyMembers(other);
}
Packet& Packet::operator=(const Packet& other) {
BasePacket::operator=(other);
_isReliable = other._isReliable;
_isPartOfMessage = other._isPartOfMessage;
_sequenceNumber = other._sequenceNumber;
copyMembers(other);
return *this;
}
Packet::Packet(Packet&& other) :
BasePacket(std::move(other))
{
_isReliable = other._isReliable;
_isPartOfMessage = other._isPartOfMessage;
_sequenceNumber = other._sequenceNumber;
_packetPosition = other._packetPosition;
_messageNumber = other._messageNumber;
Packet::Packet(Packet&& other) : BasePacket(std::move(other)) {
copyMembers(other);
}
Packet& Packet::operator=(Packet&& other) {
BasePacket::operator=(std::move(other));
_isReliable = other._isReliable;
_isPartOfMessage = other._isPartOfMessage;
_sequenceNumber = other._sequenceNumber;
_packetPosition = other._packetPosition;
_messageNumber = other._messageNumber;
copyMembers(other);
return *this;
}
@ -124,13 +153,27 @@ void Packet::writeSequenceNumber(SequenceNumber sequenceNumber) const {
writeHeader();
}
static const uint32_t RELIABILITY_BIT_MASK = uint32_t(1) << (SEQUENCE_NUMBER_BITS - 2);
static const uint32_t MESSAGE_BIT_MASK = uint32_t(1) << (SEQUENCE_NUMBER_BITS - 3);
static const uint32_t BIT_FIELD_MASK = CONTROL_BIT_MASK | RELIABILITY_BIT_MASK | MESSAGE_BIT_MASK;
void Packet::obfuscate(ObfuscationLevel level) {
auto obfuscationKey = KEYS[getObfuscationLevel()] ^ KEYS[level]; // Undo old and apply new one.
if (obfuscationKey != 0) {
xorHelper(getData() + localHeaderSize(isPartOfMessage()),
getDataSize() - localHeaderSize(isPartOfMessage()), obfuscationKey);
static const uint8_t PACKET_POSITION_OFFSET = 30;
static const uint32_t PACKET_POSITION_MASK = uint32_t(0x03) << PACKET_POSITION_OFFSET;
static const uint32_t MESSAGE_NUMBER_MASK = ~PACKET_POSITION_MASK;
// Update members and header
_obfuscationLevel = level;
writeHeader();
}
}
void Packet::copyMembers(const Packet& other) {
_isReliable = other._isReliable;
_isPartOfMessage = other._isPartOfMessage;
_obfuscationLevel = other._obfuscationLevel;
_sequenceNumber = other._sequenceNumber;
_packetPosition = other._packetPosition;
_messageNumber = other._messageNumber;
_messagePartNumber = other._messagePartNumber;
}
void Packet::readHeader() const {
SequenceNumberAndBitField* seqNumBitField = reinterpret_cast<SequenceNumberAndBitField*>(_packet.get());
@ -139,10 +182,12 @@ void Packet::readHeader() const {
_isReliable = (bool) (*seqNumBitField & RELIABILITY_BIT_MASK); // Only keep reliability bit
_isPartOfMessage = (bool) (*seqNumBitField & MESSAGE_BIT_MASK); // Only keep message bit
_sequenceNumber = SequenceNumber{ *seqNumBitField & ~BIT_FIELD_MASK }; // Remove the bit field
_obfuscationLevel = (ObfuscationLevel)((*seqNumBitField & OBFUSCATION_LEVEL_MASK) >> OBFUSCATION_LEVEL_OFFSET);
_sequenceNumber = SequenceNumber{ *seqNumBitField & SEQUENCE_NUMBER_MASK }; // Remove the bit field
if (_isPartOfMessage) {
MessageNumberAndBitField* messageNumberAndBitField = seqNumBitField + 1;
_messageNumber = *messageNumberAndBitField & MESSAGE_NUMBER_MASK;
_packetPosition = static_cast<PacketPosition>(*messageNumberAndBitField >> PACKET_POSITION_OFFSET);
@ -163,6 +208,10 @@ void Packet::writeHeader() const {
if (_isReliable) {
*seqNumBitField |= RELIABILITY_BIT_MASK;
}
if (_obfuscationLevel != NoObfuscation) {
*seqNumBitField |= (_obfuscationLevel << OBFUSCATION_LEVEL_OFFSET);
}
if (_isPartOfMessage) {
*seqNumBitField |= MESSAGE_BIT_MASK;

View file

@ -25,6 +25,25 @@ namespace udt {
class Packet : public BasePacket {
Q_OBJECT
public:
// Packet Header Format
//
// 0 1 2 3
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// |C|R|M| O | Sequence Number |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | P | Message Number | Optional (only if M = 1)
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | Message Part Number | Optional (only if M = 1)
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
//
// C: Control bit
// R: Reliable bit
// M: Message bit
// O: Obfuscation level
// P: Position bits
// NOTE: The SequenceNumber is only actually 29 bits to leave room for a bit field
using SequenceNumberAndBitField = uint32_t;
@ -35,12 +54,20 @@ public:
// Use same size as MessageNumberAndBitField so we can use the enum with bitwise operations
enum PacketPosition : MessageNumberAndBitField {
ONLY = 0x0,
FIRST = 0x2,
MIDDLE = 0x3,
LAST = 0x1
ONLY = 0x0, // 00
FIRST = 0x2, // 10
MIDDLE = 0x3, // 11
LAST = 0x1 // 01
};
// Use same size as SequenceNumberAndBitField so we can use the enum with bitwise operations
enum ObfuscationLevel : SequenceNumberAndBitField {
NoObfuscation = 0x0, // 00
ObfuscationL1 = 0x1, // 01
ObfuscationL2 = 0x2, // 10
ObfuscationL3 = 0x3, // 11
};
static std::unique_ptr<Packet> create(qint64 size = -1, bool isReliable = false, bool isPartOfMessage = false);
static std::unique_ptr<Packet> fromReceivedPacket(std::unique_ptr<char[]> data, qint64 size, const HifiSockAddr& senderSockAddr);
@ -56,7 +83,8 @@ public:
bool isPartOfMessage() const { return _isPartOfMessage; }
bool isReliable() const { return _isReliable; }
ObfuscationLevel getObfuscationLevel() const { return _obfuscationLevel; }
SequenceNumber getSequenceNumber() const { return _sequenceNumber; }
MessageNumber getMessageNumber() const { return _messageNumber; }
PacketPosition getPacketPosition() const { return _packetPosition; }
@ -64,6 +92,7 @@ public:
void writeMessageNumber(MessageNumber messageNumber, PacketPosition position, MessagePartNumber messagePartNumber);
void writeSequenceNumber(SequenceNumber sequenceNumber) const;
void obfuscate(ObfuscationLevel level);
protected:
Packet(qint64 size, bool isReliable = false, bool isPartOfMessage = false);
@ -76,6 +105,8 @@ protected:
Packet& operator=(Packet&& other);
private:
void copyMembers(const Packet& other);
// Header readers - these read data to member variables after pulling packet off wire
void readHeader() const;
void writeHeader() const;
@ -83,6 +114,7 @@ private:
// Simple holders to prevent multiple reading and bitwise ops
mutable bool _isReliable { false };
mutable bool _isPartOfMessage { false };
mutable ObfuscationLevel _obfuscationLevel { NoObfuscation };
mutable SequenceNumber _sequenceNumber { 0 };
mutable MessageNumber _messageNumber { 0 };
mutable PacketPosition _packetPosition { PacketPosition::ONLY };

View file

@ -16,7 +16,7 @@
using namespace udt;
MessageNumber PacketQueue::getNextMessageNumber() {
static const MessageNumber MAX_MESSAGE_NUMBER = MessageNumber(1) << MESSAGE_NUMBER_BITS;
static const MessageNumber MAX_MESSAGE_NUMBER = MessageNumber(1) << MESSAGE_NUMBER_SIZE;
_currentMessageNumber = (_currentMessageNumber + 1) % MAX_MESSAGE_NUMBER;
return _currentMessageNumber;
}

View file

@ -18,6 +18,7 @@
#include <QtCore/QDateTime>
#include <QtCore/QThread>
#include <LogHandler.h>
#include <SharedUtil.h>
#include "../NetworkLogging.h"
@ -225,7 +226,9 @@ void SendQueue::sendNewPacketAndAddToSentList(std::unique_ptr<Packet> newPacket,
{
// Insert the packet we have just sent in the sent list
QWriteLocker locker(&_sentLock);
_sentPackets[newPacket->getSequenceNumber()].swap(newPacket);
auto& entry = _sentPackets[newPacket->getSequenceNumber()];
entry.first = 0; // No resend
entry.second.swap(newPacket);
}
Q_ASSERT_X(!newPacket, "SendQueue::sendNewPacketAndAddToSentList()", "Overriden packet in sent list");
@ -354,14 +357,46 @@ bool SendQueue::maybeResendPacket() {
auto it = _sentPackets.find(resendNumber);
if (it != _sentPackets.end()) {
auto& entry = it->second;
// we found the packet - grab it
auto& resendPacket = *(it->second);
// send it off
sendPacket(resendPacket);
// unlock the sent packets
sentLocker.unlock();
auto& resendPacket = *(entry.second);
++entry.first; // Add 1 resend
Packet::ObfuscationLevel level = (Packet::ObfuscationLevel)(entry.first < 2 ? 0 : (entry.first - 2) % 4);
if (level != Packet::NoObfuscation) {
#ifdef UDT_CONNECTION_DEBUG
QString debugString = "Obfuscating packet %1 with level %2";
debugString = debugString.arg(QString::number((uint32_t)resendPacket.getSequenceNumber()),
QString::number(level));
if (resendPacket.isPartOfMessage()) {
debugString += "\n";
debugString += " Message Number: %1, Part Number: %2.";
debugString = debugString.arg(QString::number(resendPacket.getMessageNumber()),
QString::number(resendPacket.getMessagePartNumber()));
}
static QString repeatedMessage = LogHandler::getInstance().addRepeatedMessageRegex("^Obfuscating packet .*");
qCritical() << qPrintable(debugString);
#endif
// Create copy of the packet
auto packet = Packet::createCopy(resendPacket);
// unlock the sent packets
sentLocker.unlock();
// Obfuscate packet
packet->obfuscate(level);
// send it off
sendPacket(*packet);
} else {
// send it off
sendPacket(resendPacket);
// unlock the sent packets
sentLocker.unlock();
}
emit packetRetransmitted();

View file

@ -126,7 +126,8 @@ private:
LossList _naks; // Sequence numbers of packets to resend
mutable QReadWriteLock _sentLock; // Protects the sent packet list
std::unordered_map<SequenceNumber, std::unique_ptr<Packet>> _sentPackets; // Packets waiting for ACK.
using PacketResendPair = std::pair<uint8_t, std::unique_ptr<Packet>>; // Number of resend + packet ptr
std::unordered_map<SequenceNumber, PacketResendPair> _sentPackets; // Packets waiting for ACK.
std::mutex _handshakeMutex; // Protects the handshake ACK condition_variable
std::atomic<bool> _hasReceivedHandshakeACK { false }; // flag for receipt of handshake ACK from client

View file

@ -24,9 +24,9 @@ public:
using Type = int32_t;
using UType = uint32_t;
// Values are for 29 bit SequenceNumber
static const Type THRESHOLD = 0x0FFFFFFF; // threshold for comparing sequence numbers
static const Type MAX = 0x1FFFFFFF; // maximum sequence number used in UDT
// Values are for 27 bit SequenceNumber
static const Type THRESHOLD = 0x03FFFFFF; // threshold for comparing sequence numbers
static const Type MAX = 0x07FFFFFF; // maximum sequence number used in UDT
SequenceNumber() = default;
SequenceNumber(const SequenceNumber& other) : _value(other._value) {}

View file

@ -277,7 +277,7 @@ void Socket::readPendingDatagrams() {
if (packet->isReliable()) {
// if this was a reliable packet then signal the matching connection with the sequence number
auto& connection = findOrCreateConnection(senderSockAddr);
if (!connection.processReceivedSequenceNumber(packet->getSequenceNumber(),
packet->getDataSize(),
packet->getPayloadSize())) {