resolve conflicts on merge with clement/protocol

This commit is contained in:
Stephen Birarda 2015-08-26 12:50:13 -07:00
commit d039930ca3
8 changed files with 199 additions and 154 deletions

View file

@ -442,6 +442,10 @@ void LimitedNodeList::handleNodeKill(const SharedNodePointer& node) {
qCDebug(networking) << "Killed" << *node;
node->stopPingTimer();
emit nodeKilled(node);
if (auto activeSocket = node->getActiveSocket()) {
_nodeSocket.cleanupConnection(*activeSocket);
}
}
SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, NodeType_t nodeType,

View file

@ -208,12 +208,13 @@ void PacketReceiver::registerVerifiedListener(PacketType type, QObject* object,
void PacketReceiver::unregisterListener(QObject* listener) {
Q_ASSERT_X(listener, "PacketReceiver::unregisterListener", "No listener to unregister");
QMutexLocker packetListenerLocker(&_packetListenerLock);
std::remove_if(std::begin(_packetListenerMap), std::end(_packetListenerMap),
[&listener](const ObjectMethodPair& pair) {
return pair.first == listener;
});
packetListenerLocker.unlock();
{
QMutexLocker packetListenerLocker(&_packetListenerLock);
std::remove_if(std::begin(_packetListenerMap), std::end(_packetListenerMap),
[&listener](const ObjectMethodPair& pair) {
return pair.first == listener;
});
}
QMutexLocker directConnectSetLocker(&_directConnectSetMutex);
_directlyConnectedObjects.remove(listener);
@ -456,7 +457,6 @@ void PacketReceiver::handleVerifiedPacket(std::unique_ptr<udt::Packet> packet) {
// if it exists, remove the listener from _directlyConnectedObjects
QMutexLocker locker(&_directConnectSetMutex);
_directlyConnectedObjects.remove(listener.first);
locker.unlock();
}
} else if (it == _packetListenerMap.end()) {
@ -465,7 +465,4 @@ void PacketReceiver::handleVerifiedPacket(std::unique_ptr<udt::Packet> packet) {
// insert a dummy listener so we don't print this again
_packetListenerMap.insert(nlPacket->getType(), { nullptr, QMetaMethod() });
}
packetListenerLocker.unlock();
}

View file

@ -76,6 +76,7 @@ SendQueue& Connection::getSendQueue() {
QObject::connect(_sendQueue.get(), &SendQueue::packetSent, this, &Connection::packetSent);
QObject::connect(_sendQueue.get(), &SendQueue::packetSent, this, &Connection::recordSentPackets);
QObject::connect(_sendQueue.get(), &SendQueue::packetRetransmitted, this, &Connection::recordRetransmission);
QObject::connect(_sendQueue.get(), &SendQueue::queueInactive, this, &Connection::queueInactive);
// set defaults on the send queue from our congestion control object
_sendQueue->setPacketSendPeriod(_congestionControl->_packetSendPeriod);
@ -85,6 +86,10 @@ SendQueue& Connection::getSendQueue() {
return *_sendQueue;
}
void Connection::queueInactive() {
emit connectionInactive(_destination);
}
void Connection::sendReliablePacket(std::unique_ptr<Packet> packet) {
Q_ASSERT_X(packet->isReliable(), "Connection::send", "Trying to send an unreliable packet reliably.");
getSendQueue().queuePacket(std::move(packet));

View file

@ -73,10 +73,12 @@ public:
signals:
void packetSent();
void connectionInactive(HifiSockAddr sockAdrr);
private slots:
void recordSentPackets(int payload, int total);
void recordRetransmission();
void queueInactive();
private:
void sendACK(bool wasCausedBySyncTimeout = true);

View file

@ -12,6 +12,7 @@
#include "SendQueue.h"
#include <algorithm>
#include <thread>
#include <QtCore/QCoreApplication>
#include <QtCore/QThread>
@ -24,7 +25,27 @@
#include "Socket.h"
using namespace udt;
using namespace std::chrono;
class DoubleLock {
public:
DoubleLock(std::mutex& mutex1, std::mutex& mutex2) : _mutex1(mutex1), _mutex2(mutex2) { }
DoubleLock(const DoubleLock&) = delete;
DoubleLock& operator=(const DoubleLock&) = delete;
// Either locks all the mutexes or none of them
bool try_lock() { return (std::try_lock(_mutex1, _mutex2) == -1); }
// Locks all the mutexes
void lock() { std::lock(_mutex1, _mutex2); }
// Undefined behavior if not locked
void unlock() { _mutex1.unlock(); _mutex2.unlock(); }
private:
std::mutex& _mutex1;
std::mutex& _mutex2;
};
std::unique_ptr<SendQueue> SendQueue::create(Socket* socket, HifiSockAddr destination) {
auto queue = std::unique_ptr<SendQueue>(new SendQueue(socket, destination));
@ -200,8 +221,8 @@ void SendQueue::sendNewPacketAndAddToSentList(std::unique_ptr<Packet> newPacket,
// Insert the packet we have just sent in the sent list
QWriteLocker locker(&_sentLock);
_sentPackets[newPacket->getSequenceNumber()].swap(newPacket);
Q_ASSERT_X(!newPacket, "SendQueue::sendNewPacketAndAddToSentList()", "Overriden packet in sent list");
}
Q_ASSERT_X(!newPacket, "SendQueue::sendNewPacketAndAddToSentList()", "Overriden packet in sent list");
emit packetSent(packetSize, payloadSize);
}
@ -210,8 +231,8 @@ void SendQueue::run() {
_isRunning = true;
while (_isRunning) {
// Record timing
_lastSendTimestamp = high_resolution_clock::now();
// Record how long the loop takes to execute
auto loopStartTimestamp = high_resolution_clock::now();
std::unique_lock<std::mutex> handshakeLock { _handshakeMutex };
@ -222,12 +243,13 @@ void SendQueue::run() {
// hold the time of last send in a static
static auto lastSendHandshake = high_resolution_clock::time_point();
static const int HANDSHAKE_RESEND_INTERVAL_MS = 100;
static const auto HANDSHAKE_RESEND_INTERVAL_MS = std::chrono::milliseconds(100);
// calculation the duration since the last handshake send
auto sinceLastHandshake = duration_cast<milliseconds>(high_resolution_clock::now() - lastSendHandshake);
auto sinceLastHandshake = std::chrono::duration_cast<std::chrono::milliseconds>(high_resolution_clock::now()
- lastSendHandshake);
if (sinceLastHandshake.count() >= HANDSHAKE_RESEND_INTERVAL_MS) {
if (sinceLastHandshake >= HANDSHAKE_RESEND_INTERVAL_MS) {
// it has been long enough since last handshake, send another
static auto handshakePacket = ControlPacket::create(ControlPacket::Handshake, 0);
@ -238,7 +260,8 @@ void SendQueue::run() {
// we wait for the ACK or the re-send interval to expire
_handshakeACKCondition.wait_until(handshakeLock,
high_resolution_clock::now() + milliseconds(HANDSHAKE_RESEND_INTERVAL_MS));
high_resolution_clock::now()
+ HANDSHAKE_RESEND_INTERVAL_MS);
// Once we're here we've either received the handshake ACK or it's going to be time to re-send a handshake.
// Either way let's continue processing - no packets will be sent if no handshake ACK has been received.
@ -246,141 +269,151 @@ void SendQueue::run() {
handshakeLock.unlock();
bool naksEmpty = true; // used at the end of processing to see if we should wait for NAKs
bool resentPacket = false;
bool sentAPacket = maybeResendPacket();
bool flowWindowFull = false;
// the following while makes sure that we find a packet to re-send, if there is one
while (!resentPacket) {
std::unique_lock<std::mutex> nakLocker(_naksLock);
if (_naks.getLength() > 0) {
naksEmpty = _naks.getLength() > 1;
// pull the sequence number we need to re-send
SequenceNumber resendNumber = _naks.popFirstSequenceNumber();
nakLocker.unlock();
// pull the packet to re-send from the sent packets list
QReadLocker sentLocker(&_sentLock);
// see if we can find the packet to re-send
auto it = _sentPackets.find(resendNumber);
if (it != _sentPackets.end()) {
// we found the packet - grab it
auto& resendPacket = *(it->second);
// unlock the sent packets
sentLocker.unlock();
// send it off
sendPacket(resendPacket);
emit packetRetransmitted();
// mark that we did resend a packet
resentPacket = true;
// break out of our while now that we have re-sent a packet
break;
} else {
// we didn't find this packet in the sentPackets queue - assume this means it was ACKed
// we'll fire the loop again to see if there is another to re-send
continue;
}
} else {
naksEmpty = true;
}
// break from the while, we didn't resend a packet
break;
}
bool packetsEmpty = false; // used after processing to check if we should wait for packets
bool sentPacket = false;
// if we didn't find a packet to re-send AND we think we can fit a new packet on the wire
// (this is according to the current flow window size) then we send out a new packet
if (_hasReceivedHandshakeACK
&& !resentPacket
&& seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber) <= _flowWindowSize) {
if (_hasReceivedHandshakeACK && !sentAPacket) {
flowWindowFull = (seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber) >
_flowWindowSize);
sentAPacket = maybeSendNewPacket();
}
// Keep track of how long the flow window has been full for
if (flowWindowFull && !_flowWindowWasFull) {
_flowWindowFullSince = loopStartTimestamp;
}
_flowWindowWasFull = flowWindowFull;
if (_hasReceivedHandshakeACK && !sentAPacket) {
static const std::chrono::seconds CONSIDER_INACTIVE_AFTER { 5 };
// we didn't re-send a packet, so time to send a new one
std::unique_lock<std::mutex> locker(_packetsLock);
if (_packets.size() > 0) {
SequenceNumber nextNumber = getNextSequenceNumber();
// grab the first packet we will send
std::unique_ptr<Packet> firstPacket;
firstPacket.swap(_packets.front());
_packets.pop_front();
std::unique_ptr<Packet> secondPacket;
if (((uint32_t) nextNumber & 0xF) == 0) {
// the first packet is the first in a probe pair - every 16 (rightmost 16 bits = 0) packets
// pull off a second packet if we can before we unlock
if (_packets.size() > 0) {
secondPacket.swap(_packets.front());
_packets.pop_front();
}
}
packetsEmpty = _packets.size() == 0;
// unlock the packets, we're done pulling
locker.unlock();
sentPacket = true;
// definitely send the first packet
sendNewPacketAndAddToSentList(move(firstPacket), nextNumber);
// do we have a second in a pair to send as well?
if (secondPacket) {
nextNumber = getNextSequenceNumber();
sendNewPacketAndAddToSentList(move(secondPacket), nextNumber);
}
if (flowWindowFull && (high_resolution_clock::now() - _flowWindowFullSince) > CONSIDER_INACTIVE_AFTER) {
// If the flow window has been full for over CONSIDER_INACTIVE_AFTER,
// then signal the queue is inactive
emit queueInactive();
} else {
packetsEmpty = true;
locker.unlock();
// During our processing above we didn't send any packets and the flow window is not full.
// If that is still the case we should use a condition_variable_any to sleep until we have data to handle.
// To confirm that the queue of packets and the NAKs list are still both empty we'll need to use the DoubleLock
DoubleLock doubleLock(_packetsLock, _naksLock);
// The packets queue and loss list mutexes are now both locked - check if they're still both empty
if (doubleLock.try_lock() && _packets.empty() && _naks.getLength() == 0) {
// both are empty - let's use a condition_variable_any to wait
auto cvStatus = _emptyCondition.wait_for(doubleLock, CONSIDER_INACTIVE_AFTER);
// we have the double lock again - Make sure to unlock it
doubleLock.unlock();
// Check if we've been inactive for too long
if (cvStatus == std::cv_status::timeout) {
emit queueInactive();
}
// skip to the next iteration
continue;
}
}
}
// since we're a while loop, give the thread a chance to process events
QCoreApplication::processEvents();
auto loopEndTimestamp = high_resolution_clock::now();
// we just processed events so check now if we were just told to stop
if (!_isRunning) {
break;
// sleep as long as we need until next packet send, if we can
auto timeToSleep = (loopStartTimestamp + std::chrono::microseconds(_packetSendPeriod)) - loopEndTimestamp;
if (timeToSleep > timeToSleep.zero()) {
std::this_thread::sleep_for(timeToSleep);
}
}
}
bool SendQueue::maybeSendNewPacket() {
// we didn't re-send a packet, so time to send a new one
std::unique_lock<std::mutex> locker(_packetsLock);
if (_packets.size() > 0) {
SequenceNumber nextNumber = getNextSequenceNumber();
// grab the first packet we will send
std::unique_ptr<Packet> firstPacket;
firstPacket.swap(_packets.front());
_packets.pop_front();
std::unique_ptr<Packet> secondPacket;
if (((uint32_t) nextNumber & 0xF) == 0) {
// the first packet is the first in a probe pair - every 16 (rightmost 16 bits = 0) packets
// pull off a second packet if we can before we unlock
if (_packets.size() > 0) {
secondPacket.swap(_packets.front());
_packets.pop_front();
}
}
if (packetsEmpty && naksEmpty) {
// During our processing above the loss list and packet list were both empty.
// unlock the packets, we're done pulling
locker.unlock();
// definitely send the first packet
sendNewPacketAndAddToSentList(move(firstPacket), nextNumber);
// do we have a second in a pair to send as well?
if (secondPacket) {
sendNewPacketAndAddToSentList(move(secondPacket), getNextSequenceNumber());
}
// We sent our packet(s), return here
return true;
}
// No packets were sent
return false;
}
bool SendQueue::maybeResendPacket() {
// the following while makes sure that we find a packet to re-send, if there is one
while (true) {
std::unique_lock<std::mutex> naksLocker(_naksLock);
if (_naks.getLength() > 0) {
// pull the sequence number we need to re-send
SequenceNumber resendNumber = _naks.popFirstSequenceNumber();
naksLocker.unlock();
// If that is still the case we should use a condition_variable_any to sleep until we have data to handle.
// To confirm that the queue of packets and the NAKs list are still both empty we'll need to use the DoubleLock
DoubleLock doubleLock(_packetsLock, _naksLock);
// pull the packet to re-send from the sent packets list
QReadLocker sentLocker(&_sentLock);
// The packets queue and loss list mutexes are now both locked - check if they're still both empty
if (_packets.empty() && _naks.getLength() == 0) {
// both are empty - let's use a condition_variable_any to wait
_emptyCondition.wait(doubleLock);
// see if we can find the packet to re-send
auto it = _sentPackets.find(resendNumber);
if (it != _sentPackets.end()) {
// we found the packet - grab it
auto& resendPacket = *(it->second);
// we have the double lock again - it'll be unlocked once it goes out of scope
// skip to the next iteration
// unlock the sent packets
sentLocker.unlock();
// send it off
sendPacket(resendPacket);
emit packetRetransmitted();
// Signal that we did resend a packet
return true;
} else {
// we didn't find this packet in the sentPackets queue - assume this means it was ACKed
// we'll fire the loop again to see if there is another to re-send
continue;
}
}
// sleep as long as we need until next packet send, if we can
auto now = high_resolution_clock::now();
auto microsecondDuration = duration_cast<microseconds>((_lastSendTimestamp + microseconds(_packetSendPeriod)) - now);
if (microsecondDuration.count() > 0) {
usleep(microsecondDuration.count());
}
// break from the while, we didn't resend a packet
break;
}
// No packet was resent
return false;
}

View file

@ -42,21 +42,8 @@ class SendQueue : public QObject {
Q_OBJECT
public:
class DoubleLock {
public:
DoubleLock(std::mutex& mutex1, std::mutex& mutex2) : _mutex1(mutex1), _mutex2(mutex2) { lock(); }
~DoubleLock() { unlock(); }
DoubleLock(const DoubleLock&) = delete;
DoubleLock& operator=(const DoubleLock&) = delete;
void lock() { std::lock(_mutex1, _mutex2); }
void unlock() { _mutex1.unlock(); _mutex2.unlock(); }
private:
std::mutex& _mutex1;
std::mutex& _mutex2;
};
using high_resolution_clock = std::chrono::high_resolution_clock;
using time_point = high_resolution_clock::time_point;
static std::unique_ptr<SendQueue> create(Socket* socket, HifiSockAddr destination);
@ -82,6 +69,8 @@ signals:
void packetSent(int dataSize, int payloadSize);
void packetRetransmitted();
void queueInactive();
private slots:
void run();
@ -93,6 +82,9 @@ private:
void sendPacket(const Packet& packet);
void sendNewPacketAndAddToSentList(std::unique_ptr<Packet> newPacket, SequenceNumber sequenceNumber);
bool maybeSendNewPacket(); // Figures out what packet to send next
bool maybeResendPacket(); // Determines whether to resend a packet and wich one
// Increments current sequence number and return it
SequenceNumber getNextSequenceNumber();
MessageNumber getNextMessageNumber();
@ -110,11 +102,14 @@ private:
std::atomic<uint32_t> _atomicCurrentSequenceNumber { 0 };// Atomic for last sequence number sent out
std::atomic<int> _packetSendPeriod { 0 }; // Interval between two packet send event in microseconds, set from CC
std::chrono::high_resolution_clock::time_point _lastSendTimestamp; // Record last time of packet departure
std::atomic<bool> _isRunning { false };
std::atomic<int> _flowWindowSize { 0 }; // Flow control window size (number of packets that can be on wire) - set from CC
// Used to detect when the connection becomes inactive for too long
bool _flowWindowWasFull = false;
time_point _flowWindowFullSince;
mutable std::mutex _naksLock; // Protects the naks list.
LossList _naks; // Sequence numbers of packets to resend

View file

@ -147,12 +147,18 @@ Connection& Socket::findOrCreateConnection(const HifiSockAddr& sockAddr) {
if (it == _connectionsHash.end()) {
auto connection = std::unique_ptr<Connection>(new Connection(this, sockAddr, _ccFactory->create()));
QObject::connect(connection.get(), &Connection::connectionInactive, this, &Socket::cleanupConnection);
it = _connectionsHash.insert(it, std::make_pair(sockAddr, std::move(connection)));
}
return *it->second;
}
void Socket::cleanupConnection(HifiSockAddr sockAddr) {
qCDebug(networking) << "Socket::cleanupConnection called for connection to" << sockAddr;
_connectionsHash.erase(sockAddr);
}
void Socket::messageReceived(std::unique_ptr<PacketList> packetList) {
if (_packetListHandler) {
_packetListHandler(std::move(packetList));
@ -205,8 +211,8 @@ void Socket::readPendingDatagrams() {
// if this was a reliable packet then signal the matching connection with the sequence number
auto& connection = findOrCreateConnection(senderSockAddr);
connection.processReceivedSequenceNumber(packet->getSequenceNumber(),
packet->getDataSize(),
packet->getPayloadSize());
packet->getDataSize(),
packet->getPayloadSize());
}
if (packet->isPartOfMessage()) {

View file

@ -73,6 +73,9 @@ public:
ConnectionStats::Stats sampleStatsForConnection(const HifiSockAddr& destination);
std::vector<HifiSockAddr> getConnectionSockAddrs();
public slots:
void cleanupConnection(HifiSockAddr sockAddr);
private slots:
void readPendingDatagrams();
void rateControlSync();