Merge pull request #7217 from huffman/udt-random-seq

Randomize initial send sequence number in UDT
This commit is contained in:
Stephen Birarda 2016-03-01 12:59:10 -08:00
commit b0c11e6562
10 changed files with 75 additions and 39 deletions

View file

@ -540,7 +540,7 @@ Application::Application(int& argc, char** argv, QElapsedTimer& startupTimer) :
const DomainHandler& domainHandler = nodeList->getDomainHandler();
connect(&domainHandler, SIGNAL(hostnameChanged(const QString&)), SLOT(domainChanged(const QString&)));
connect(&domainHandler, SIGNAL(connectedToDomain(const QString&)), SLOT(connectedToDomain(const QString&)));
connect(&domainHandler, SIGNAL(resetting()), SLOT(resettingDomain()));
connect(&domainHandler, SIGNAL(connectedToDomain(const QString&)), SLOT(updateWindowTitle()));
connect(&domainHandler, SIGNAL(disconnectedFromDomain()), SLOT(updateWindowTitle()));
connect(&domainHandler, SIGNAL(disconnectedFromDomain()), SLOT(clearDomainOctreeDetails()));
@ -3979,13 +3979,8 @@ void Application::handleDomainConnectionDeniedPacket(QSharedPointer<ReceivedMess
AccountManager::getInstance().checkAndSignalForAccessToken();
}
void Application::connectedToDomain(const QString& hostname) {
AccountManager& accountManager = AccountManager::getInstance();
const QUuid& domainID = DependencyManager::get<NodeList>()->getDomainHandler().getUUID();
if (accountManager.isLoggedIn() && !domainID.isNull()) {
_notifiedPacketVersionMismatchThisDomain = false;
}
void Application::resettingDomain() {
_notifiedPacketVersionMismatchThisDomain = false;
}
void Application::nodeAdded(SharedNodePointer node) {

View file

@ -290,7 +290,7 @@ private slots:
void idle(uint64_t now);
void aboutToQuit();
void connectedToDomain(const QString& hostname);
void resettingDomain();
void audioMuteToggled();
void faceTrackerMuteToggled();

View file

@ -98,6 +98,8 @@ void DomainHandler::softReset() {
}
void DomainHandler::hardReset() {
emit resetting();
softReset();
qCDebug(networking) << "Hard reset in NodeList DomainHandler.";

View file

@ -104,6 +104,7 @@ signals:
// It means that, either from DNS lookup or ICE, we think we have a socket we can talk to DS on
void completedSocketDiscovery();
void resetting();
void connectedToDomain(const QString& hostname);
void disconnectedFromDomain();

View file

@ -58,7 +58,7 @@ BasePacket::BasePacket(qint64 size) {
Q_ASSERT(size >= 0 || size < maxPayload);
_packetSize = size;
_packet.reset(new char[_packetSize]);
_packet.reset(new char[_packetSize]());
_payloadCapacity = _packetSize;
_payloadSize = 0;
_payloadStart = _packet.get();

View file

@ -87,7 +87,8 @@ SendQueue& Connection::getSendQueue() {
// receiver is getting the sequence numbers it expects (given that the connection must still be active)
// Lasily create send queue
_sendQueue = SendQueue::create(_parentSocket, _destination, _inactiveSendQueueSequenceNumber);
_sendQueue = SendQueue::create(_parentSocket, _destination);
_lastReceivedACK = _sendQueue->getCurrentSequenceNumber();
#ifdef UDT_CONNECTION_DEBUG
qCDebug(networking) << "Created SendQueue for connection to" << _destination;
@ -109,10 +110,6 @@ SendQueue& Connection::getSendQueue() {
}
void Connection::queueInactive() {
// get the current sequence number from the send queue, this is to be re-used if the send
// queue is re-activated for this connection
_inactiveSendQueueSequenceNumber = _sendQueue->getCurrentSequenceNumber();
// tell our current send queue to go down and reset our ptr to it to null
stopSendQueue();
@ -728,15 +725,28 @@ void Connection::processNAK(std::unique_ptr<ControlPacket> controlPacket) {
}
void Connection::processHandshake(std::unique_ptr<ControlPacket> controlPacket) {
SequenceNumber initialSequenceNumber;
controlPacket->readPrimitive(&initialSequenceNumber);
if (!_hasReceivedHandshake || _isReceivingData) {
if (!_hasReceivedHandshake || initialSequenceNumber != _initialReceiveSequenceNumber) {
// server sent us a handshake - we need to assume this means state should be reset
// as long as we haven't received a handshake yet or we have and we've received some data
#ifdef UDT_CONNECTION_DEBUG
if (initialSequenceNumber != _initialReceiveSequenceNumber) {
qCDebug(networking) << "Resetting receive state, received a new initial sequence number in handshake";
}
#endif
resetReceiveState();
_initialReceiveSequenceNumber = initialSequenceNumber;
_lastReceivedSequenceNumber = initialSequenceNumber - 1;
_lastSentACK = initialSequenceNumber - 1;
}
// immediately respond with a handshake ACK
static auto handshakeACK = ControlPacket::create(ControlPacket::HandshakeACK, 0);
static auto handshakeACK = ControlPacket::create(ControlPacket::HandshakeACK, sizeof(SequenceNumber));
handshakeACK->seek(0);
handshakeACK->writePrimitive(initialSequenceNumber);
_parentSocket->writeBasePacket(*handshakeACK, _destination);
// indicate that handshake has been received
@ -746,8 +756,11 @@ void Connection::processHandshake(std::unique_ptr<ControlPacket> controlPacket)
void Connection::processHandshakeACK(std::unique_ptr<ControlPacket> controlPacket) {
// if we've decided to clean up the send queue then this handshake ACK should be ignored, it's useless
if (_sendQueue) {
SequenceNumber initialSequenceNumber;
controlPacket->readPrimitive(&initialSequenceNumber);
// hand off this handshake ACK to the send queue so it knows it can start sending
getSendQueue().handshakeACK();
getSendQueue().handshakeACK(initialSequenceNumber);
// indicate that handshake ACK was received
_hasReceivedHandshakeACK = true;

View file

@ -130,7 +130,9 @@ private:
bool _isReceivingData { false }; // flag used for expiry of receipt portion of connection
bool _isActive { true }; // flag used for inactivity of connection
SequenceNumber _initialReceiveSequenceNumber; // Randomized by peer SendQueue on creation, identifies connection during re-connect requests
LossList _lossList; // List of all missing packets
SequenceNumber _lastReceivedSequenceNumber; // The largest sequence number received from the peer
SequenceNumber _lastReceivedACK; // The last ACK received
@ -140,8 +142,6 @@ private:
SequenceNumber _lastSentACK; // The last sent ACK
SequenceNumber _lastSentACK2; // The last sent ACK sub-sequence number in an ACK2
SequenceNumber _inactiveSendQueueSequenceNumber { 0 };
int _acksDuringSYN { 1 }; // The number of non-SYN ACKs sent during SYN
int _lightACKsDuringSYN { 1 }; // The number of lite ACKs sent during SYN interval

View file

@ -16,6 +16,10 @@
#include <QtCore/QDebug>
#include <QtCore/QMetaEnum>
Q_DECLARE_METATYPE(PacketType);
static int packetTypeMetaTypeId = qRegisterMetaType<PacketType>();
const QSet<PacketType> NON_VERIFIED_PACKETS = QSet<PacketType>()
<< PacketType::NodeJsonStats << PacketType::EntityQuery
<< PacketType::OctreeDataNack << PacketType::EntityEditNack
@ -38,6 +42,8 @@ const QSet<PacketType> RELIABLE_PACKETS = QSet<PacketType>();
PacketVersion versionForPacketType(PacketType packetType) {
switch (packetType) {
case PacketType::DomainList:
return 18;
case PacketType::EntityAdd:
case PacketType::EntityEdit:
case PacketType::EntityData:

View file

@ -12,6 +12,7 @@
#include "SendQueue.h"
#include <algorithm>
#include <random>
#include <thread>
#include <QtCore/QCoreApplication>
@ -53,10 +54,10 @@ private:
Mutex2& _mutex2;
};
std::unique_ptr<SendQueue> SendQueue::create(Socket* socket, HifiSockAddr destination, SequenceNumber currentSequenceNumber) {
std::unique_ptr<SendQueue> SendQueue::create(Socket* socket, HifiSockAddr destination) {
Q_ASSERT_X(socket, "SendQueue::create", "Must be called with a valid Socket*");
auto queue = std::unique_ptr<SendQueue>(new SendQueue(socket, destination, currentSequenceNumber));
auto queue = std::unique_ptr<SendQueue>(new SendQueue(socket, destination));
// Setup queue private thread
QThread* thread = new QThread;
@ -75,12 +76,23 @@ std::unique_ptr<SendQueue> SendQueue::create(Socket* socket, HifiSockAddr destin
return queue;
}
SendQueue::SendQueue(Socket* socket, HifiSockAddr dest, SequenceNumber currentSequenceNumber) :
SendQueue::SendQueue(Socket* socket, HifiSockAddr dest) :
_socket(socket),
_destination(dest),
_currentSequenceNumber(currentSequenceNumber)
_destination(dest)
{
// setup psuedo-random number generation for all instances of SendQueue
static std::random_device rd;
static std::mt19937 generator(rd());
static std::uniform_int_distribution<> distribution(0, SequenceNumber::MAX);
// randomize the intial sequence number
_initialSequenceNumber = SequenceNumber(distribution(generator));
// set our member variables from randomized initial number
_currentSequenceNumber = _initialSequenceNumber - 1;
_atomicCurrentSequenceNumber = uint32_t(_currentSequenceNumber);
_lastACKSequenceNumber = uint32_t(_currentSequenceNumber) - 1;
}
void SendQueue::queuePacket(std::unique_ptr<Packet> packet) {
@ -190,7 +202,11 @@ void SendQueue::sendHandshake() {
std::unique_lock<std::mutex> handshakeLock { _handshakeMutex };
if (!_hasReceivedHandshakeACK) {
// we haven't received a handshake ACK from the client, send another now
static const auto handshakePacket = ControlPacket::create(ControlPacket::Handshake, 0);
static const auto handshakePacket = ControlPacket::create(ControlPacket::Handshake, sizeof(SequenceNumber));
handshakePacket->seek(0);
handshakePacket->writePrimitive(_initialSequenceNumber);
_socket->writeBasePacket(*handshakePacket, _destination);
// we wait for the ACK or the re-send interval to expire
@ -199,14 +215,16 @@ void SendQueue::sendHandshake() {
}
}
void SendQueue::handshakeACK() {
{
std::lock_guard<std::mutex> locker { _handshakeMutex };
_hasReceivedHandshakeACK = true;
void SendQueue::handshakeACK(SequenceNumber initialSequenceNumber) {
if (initialSequenceNumber == _initialSequenceNumber) {
{
std::lock_guard<std::mutex> locker { _handshakeMutex };
_hasReceivedHandshakeACK = true;
}
// Notify on the handshake ACK condition
_handshakeACKCondition.notify_one();
}
// Notify on the handshake ACK condition
_handshakeACKCondition.notify_one();
}
SequenceNumber SendQueue::getNextSequenceNumber() {

View file

@ -50,8 +50,7 @@ public:
Stopped
};
static std::unique_ptr<SendQueue> create(Socket* socket, HifiSockAddr destination,
SequenceNumber currentSequenceNumber = SequenceNumber());
static std::unique_ptr<SendQueue> create(Socket* socket, HifiSockAddr destination);
void queuePacket(std::unique_ptr<Packet> packet);
void queuePacketList(std::unique_ptr<PacketList> packetList);
@ -72,7 +71,7 @@ public slots:
void ack(SequenceNumber ack);
void nak(SequenceNumber start, SequenceNumber end);
void overrideNAKListFromPacket(ControlPacket& packet);
void handshakeACK();
void handshakeACK(SequenceNumber initialSequenceNumber);
signals:
void packetSent(int dataSize, int payloadSize);
@ -84,7 +83,7 @@ private slots:
void run();
private:
SendQueue(Socket* socket, HifiSockAddr dest, SequenceNumber currentSequenceNumber);
SendQueue(Socket* socket, HifiSockAddr dest);
SendQueue(SendQueue& other) = delete;
SendQueue(SendQueue&& other) = delete;
@ -106,6 +105,8 @@ private:
Socket* _socket { nullptr }; // Socket to send packet on
HifiSockAddr _destination; // Destination addr
SequenceNumber _initialSequenceNumber; // Randomized on SendQueue creation, identifies connection during re-connect requests
std::atomic<uint32_t> _lastACKSequenceNumber { 0 }; // Last ACKed sequence number