Merge pull request #5892 from Atlante45/protocol

Add channels to UDT
This commit is contained in:
Stephen Birarda 2015-10-07 11:06:22 -07:00
commit 022c4276b9
16 changed files with 427 additions and 355 deletions

View file

@ -521,7 +521,7 @@ std::unique_ptr<NLPacket> LimitedNodeList::constructPingPacket(PingType_t pingTy
pingPacket->writePrimitive(pingType);
pingPacket->writePrimitive(usecTimestampNow());
return std::move(pingPacket);
return pingPacket;
}
std::unique_ptr<NLPacket> LimitedNodeList::constructPingReplyPacket(NLPacket& pingPacket) {
@ -536,7 +536,7 @@ std::unique_ptr<NLPacket> LimitedNodeList::constructPingReplyPacket(NLPacket& pi
replyPacket->writePrimitive(timeFromOriginalPing);
replyPacket->writePrimitive(usecTimestampNow());
return std::move(replyPacket);
return replyPacket;
}
std::unique_ptr<NLPacket> LimitedNodeList::constructICEPingPacket(PingType_t pingType, const QUuid& iceID) {
@ -546,7 +546,7 @@ std::unique_ptr<NLPacket> LimitedNodeList::constructICEPingPacket(PingType_t pin
icePingPacket->write(iceID.toRfc4122());
icePingPacket->writePrimitive(pingType);
return std::move(icePingPacket);
return icePingPacket;
}
std::unique_ptr<NLPacket> LimitedNodeList::constructICEPingReplyPacket(NLPacket& pingPacket, const QUuid& iceID) {

View file

@ -52,15 +52,14 @@ Connection::~Connection() {
}
void Connection::stopSendQueue() {
if (_sendQueue) {
if (auto sendQueue = _sendQueue.release()) {
// grab the send queue thread so we can wait on it
QThread* sendQueueThread = _sendQueue->thread();
QThread* sendQueueThread = sendQueue->thread();
// tell the send queue to stop and be deleted
_sendQueue->stop();
_sendQueue->deleteLater();
_sendQueue.release();
sendQueue->stop();
sendQueue->deleteLater();
// since we're stopping the send queue we should consider our handshake ACK not receieved
_hasReceivedHandshakeACK = false;
@ -858,37 +857,22 @@ void PendingReceivedMessage::enqueuePacket(std::unique_ptr<Packet> packet) {
"PendingReceivedMessage::enqueuePacket",
"called with a packet that is not part of a message");
if (_isComplete) {
qCDebug(networking) << "UNEXPECTED: Received packet for a message that is already complete";
return;
}
auto sequenceNumber = packet->getSequenceNumber();
if (packet->getPacketPosition() == Packet::PacketPosition::FIRST) {
_hasFirstSequenceNumber = true;
_firstSequenceNumber = sequenceNumber;
} else if (packet->getPacketPosition() == Packet::PacketPosition::LAST) {
_hasLastSequenceNumber = true;
_lastSequenceNumber = sequenceNumber;
} else if (packet->getPacketPosition() == Packet::PacketPosition::ONLY) {
_hasFirstSequenceNumber = true;
_hasLastSequenceNumber = true;
_firstSequenceNumber = sequenceNumber;
_lastSequenceNumber = sequenceNumber;
if (packet->getPacketPosition() == Packet::PacketPosition::LAST ||
packet->getPacketPosition() == Packet::PacketPosition::ONLY) {
_hasLastPacket = true;
_numPackets = packet->getMessagePartNumber() + 1;
}
// Insert into the packets list in sorted order. Because we generally expect to receive packets in order, begin
// searching from the end of the list.
auto it = find_if(_packets.rbegin(), _packets.rend(),
[&](const std::unique_ptr<Packet>& packet) { return sequenceNumber > packet->getSequenceNumber(); });
auto messagePartNumber = packet->getMessagePartNumber();
auto it = std::find_if(_packets.rbegin(), _packets.rend(),
[&](const std::unique_ptr<Packet>& value) { return messagePartNumber >= value->getMessagePartNumber(); });
_packets.insert(it.base(), std::move(packet));
if (_hasFirstSequenceNumber && _hasLastSequenceNumber) {
auto numPackets = udt::seqlen(_firstSequenceNumber, _lastSequenceNumber);
if (uint64_t(numPackets) == _packets.size()) {
_isComplete = true;
}
if (it != _packets.rend() && ((*it)->getMessagePartNumber() == messagePartNumber)) {
qCDebug(networking) << "PendingReceivedMessage::enqueuePacket: This is a duplicate packet";
return;
}
_packets.insert(it.base(), std::move(packet));
}

View file

@ -37,16 +37,13 @@ class Socket;
class PendingReceivedMessage {
public:
void enqueuePacket(std::unique_ptr<Packet> packet);
bool isComplete() const { return _isComplete; }
bool isComplete() const { return _hasLastPacket && _numPackets == _packets.size(); }
std::list<std::unique_ptr<Packet>> _packets;
private:
bool _isComplete { false };
bool _hasFirstSequenceNumber { false };
bool _hasLastSequenceNumber { false };
SequenceNumber _firstSequenceNumber;
SequenceNumber _lastSequenceNumber;
bool _hasLastPacket { false };
unsigned int _numPackets { 0 };
};
class Connection : public QObject {

View file

@ -13,16 +13,13 @@
#define hifi_ConnectionStats_h
#include <chrono>
#include <vector>
#include <array>
namespace udt {
class ConnectionStats {
public:
struct Stats {
std::chrono::microseconds startTime;
std::chrono::microseconds endTime;
enum Event {
SentACK,
ReceivedACK,
@ -41,8 +38,14 @@ public:
NumEvents
};
using microseconds = std::chrono::microseconds;
using Events = std::array<int, NumEvents>;
microseconds startTime;
microseconds endTime;
// construct a vector for the events of the size of our Enum - default value is zero
std::vector<int> events = std::vector<int>((int) Event::NumEvents, 0);
Events events;
// packet counts and sizes
int sentPackets { 0 };
@ -66,6 +69,9 @@ public:
int rtt { 0 };
int congestionWindowSize { 0 };
int packetSendPeriod { 0 };
// TODO: Remove once Win build supports brace initialization: `Events events {{ 0 }};`
Stats() { events.fill(0); }
};
ConnectionStats();

View file

@ -37,6 +37,7 @@ public:
void remove(SequenceNumber start, SequenceNumber end);
int getLength() const { return _length; }
bool isEmpty() const { return _length == 0; }
SequenceNumber getFirstSequenceNumber() const;
SequenceNumber popFirstSequenceNumber();

View file

@ -15,7 +15,7 @@ using namespace udt;
int Packet::localHeaderSize(bool isPartOfMessage) {
return sizeof(Packet::SequenceNumberAndBitField) +
(isPartOfMessage ? sizeof(Packet::MessageNumberAndBitField) : 0);
(isPartOfMessage ? sizeof(Packet::MessageNumberAndBitField) + sizeof(MessagePartNumber) : 0);
}
int Packet::totalHeaderSize(bool isPartOfMessage) {
@ -109,9 +109,11 @@ Packet& Packet::operator=(Packet&& other) {
return *this;
}
void Packet::writeMessageNumber(MessageNumber messageNumber) {
void Packet::writeMessageNumber(MessageNumber messageNumber, PacketPosition position, MessagePartNumber messagePartNumber) {
_isPartOfMessage = true;
_messageNumber = messageNumber;
_packetPosition = position;
_messagePartNumber = messagePartNumber;
writeHeader();
}
@ -124,7 +126,8 @@ static const uint32_t RELIABILITY_BIT_MASK = uint32_t(1) << (SEQUENCE_NUMBER_BIT
static const uint32_t MESSAGE_BIT_MASK = uint32_t(1) << (SEQUENCE_NUMBER_BITS - 3);
static const uint32_t BIT_FIELD_MASK = CONTROL_BIT_MASK | RELIABILITY_BIT_MASK | MESSAGE_BIT_MASK;
static const uint32_t PACKET_POSITION_MASK = uint32_t(0x03) << 30;
static const uint8_t PACKET_POSITION_OFFSET = 30;
static const uint32_t PACKET_POSITION_MASK = uint32_t(0x03) << PACKET_POSITION_OFFSET;
static const uint32_t MESSAGE_NUMBER_MASK = ~PACKET_POSITION_MASK;
void Packet::readHeader() const {
@ -139,7 +142,10 @@ void Packet::readHeader() const {
if (_isPartOfMessage) {
MessageNumberAndBitField* messageNumberAndBitField = seqNumBitField + 1;
_messageNumber = *messageNumberAndBitField & MESSAGE_NUMBER_MASK;
_packetPosition = static_cast<PacketPosition>(*messageNumberAndBitField >> 30);
_packetPosition = static_cast<PacketPosition>(*messageNumberAndBitField >> PACKET_POSITION_OFFSET);
MessagePartNumber* messagePartNumber = messageNumberAndBitField + 1;
_messagePartNumber = *messagePartNumber;
}
}
@ -164,6 +170,9 @@ void Packet::writeHeader() const {
MessageNumberAndBitField* messageNumberAndBitField = seqNumBitField + 1;
*messageNumberAndBitField = _messageNumber;
*messageNumberAndBitField |= _packetPosition << 30;
*messageNumberAndBitField |= _packetPosition << PACKET_POSITION_OFFSET;
MessagePartNumber* messagePartNumber = messageNumberAndBitField + 1;
*messagePartNumber = _messagePartNumber;
}
}

View file

@ -28,9 +28,10 @@ public:
// NOTE: The SequenceNumber is only actually 29 bits to leave room for a bit field
using SequenceNumberAndBitField = uint32_t;
// NOTE: The MessageNumber is only actually 29 bits to leave room for a bit field
// NOTE: The MessageNumber is only actually 30 bits to leave room for a bit field
using MessageNumber = uint32_t;
using MessageNumberAndBitField = uint32_t;
using MessagePartNumber = uint32_t;
// Use same size as MessageNumberAndBitField so we can use the enum with bitwise operations
enum PacketPosition : MessageNumberAndBitField {
@ -55,14 +56,13 @@ public:
bool isPartOfMessage() const { return _isPartOfMessage; }
bool isReliable() const { return _isReliable; }
SequenceNumber getSequenceNumber() const { return _sequenceNumber; }
MessageNumber getMessageNumber() const { return _messageNumber; }
void setPacketPosition(PacketPosition position) { _packetPosition = position; }
PacketPosition getPacketPosition() const { return _packetPosition; }
void writeMessageNumber(MessageNumber messageNumber);
SequenceNumber getSequenceNumber() const { return _sequenceNumber; }
MessageNumber getMessageNumber() const { return _messageNumber; }
PacketPosition getPacketPosition() const { return _packetPosition; }
MessagePartNumber getMessagePartNumber() const { return _messagePartNumber; }
void writeMessageNumber(MessageNumber messageNumber, PacketPosition position, MessagePartNumber messagePartNumber);
void writeSequenceNumber(SequenceNumber sequenceNumber) const;
protected:
@ -83,9 +83,10 @@ private:
// Simple holders to prevent multiple reading and bitwise ops
mutable bool _isReliable { false };
mutable bool _isPartOfMessage { false };
mutable SequenceNumber _sequenceNumber;
mutable PacketPosition _packetPosition { PacketPosition::ONLY };
mutable SequenceNumber _sequenceNumber { 0 };
mutable MessageNumber _messageNumber { 0 };
mutable PacketPosition _packetPosition { PacketPosition::ONLY };
mutable MessagePartNumber _messagePartNumber { 0 };
};
} // namespace udt

View file

@ -38,12 +38,9 @@ PacketVersion versionForPacketType(PacketType packetType) {
case PacketType::EntityAdd:
case PacketType::EntityEdit:
case PacketType::EntityData:
return VERSION_ENTITIES_PARTICLE_ELLIPSOID_EMITTER;
case PacketType::AvatarData:
case PacketType::BulkAvatarData:
return 15;
return VERSION_ENTITIES_PROTOCOL_CHANNELS;
default:
return 14;
return 16;
}
}

View file

@ -141,5 +141,6 @@ const PacketVersion VERSION_ENTITIES_PARTICLE_RADIUS_PROPERTIES = 41;
const PacketVersion VERSION_ENTITIES_PARTICLE_COLOR_PROPERTIES = 42;
const PacketVersion VERSION_ENTITIES_PROTOCOL_HEADER_SWAP = 43;
const PacketVersion VERSION_ENTITIES_PARTICLE_ELLIPSOID_EMITTER = 44;
const PacketVersion VERSION_ENTITIES_PROTOCOL_CHANNELS = 45;
#endif // hifi_PacketHeaders_h

View file

@ -105,7 +105,7 @@ std::unique_ptr<Packet> PacketList::createPacketWithExtendedHeader() {
}
void PacketList::closeCurrentPacket(bool shouldSendEmpty) {
if (shouldSendEmpty && !_currentPacket) {
if (shouldSendEmpty && !_currentPacket && _packets.empty()) {
_currentPacket = createPacketWithExtendedHeader();
}
@ -132,6 +132,24 @@ QByteArray PacketList::getMessage() {
return data;
}
void PacketList::preparePackets(MessageNumber messageNumber) {
Q_ASSERT(_packets.size() > 0);
if (_packets.size() == 1) {
_packets.front()->writeMessageNumber(messageNumber, Packet::PacketPosition::ONLY, 0);
} else {
const auto second = ++_packets.begin();
const auto last = --_packets.end();
Packet::MessagePartNumber messagePartNumber = 0;
std::for_each(second, last, [&](const PacketPointer& packet) {
packet->writeMessageNumber(messageNumber, Packet::PacketPosition::MIDDLE, ++messagePartNumber);
});
_packets.front()->writeMessageNumber(messageNumber, Packet::PacketPosition::FIRST, 0);
_packets.back()->writeMessageNumber(messageNumber, Packet::PacketPosition::LAST, ++messagePartNumber);
}
}
qint64 PacketList::writeData(const char* data, qint64 maxSize) {
auto sizeRemaining = maxSize;

View file

@ -28,28 +28,29 @@ class Packet;
class PacketList : public QIODevice {
Q_OBJECT
public:
using MessageNumber = uint32_t;
using PacketPointer = std::unique_ptr<Packet>;
static std::unique_ptr<PacketList> create(PacketType packetType, QByteArray extendedHeader = QByteArray(),
bool isReliable = false, bool isOrdered = false);
static std::unique_ptr<PacketList> fromReceivedPackets(std::list<std::unique_ptr<Packet>>&& packets);
PacketType getType() const { return _packetType; }
bool isReliable() const { return _isReliable; }
bool isOrdered() const { return _isOrdered; }
int getNumPackets() const { return _packets.size() + (_currentPacket ? 1 : 0); }
size_t getDataSize() const;
size_t getMessageSize() const;
QByteArray getMessage();
QByteArray getExtendedHeader() const { return _extendedHeader; }
void startSegment();
void endSegment();
PacketType getType() const { return _packetType; }
int getNumPackets() const { return _packets.size() + (_currentPacket ? 1 : 0); }
QByteArray getExtendedHeader() const { return _extendedHeader; }
size_t getDataSize() const;
size_t getMessageSize() const;
void closeCurrentPacket(bool shouldSendEmpty = false);
QByteArray getMessage();
// QIODevice virtual functions
virtual bool isSequential() const { return false; }
virtual qint64 size() const { return getDataSize(); }
@ -60,6 +61,8 @@ public:
protected:
PacketList(PacketType packetType, QByteArray extendedHeader = QByteArray(), bool isReliable = false, bool isOrdered = false);
PacketList(PacketList&& other);
void preparePackets(MessageNumber messageNumber);
virtual qint64 writeData(const char* data, qint64 maxSize);
// Not implemented, added an assert so that it doesn't get used by accident
@ -70,6 +73,7 @@ protected:
private:
friend class ::LimitedNodeList;
friend class PacketQueue;
friend class SendQueue;
friend class Socket;

View file

@ -0,0 +1,72 @@
//
// PacketQueue.cpp
// libraries/networking/src/udt
//
// Created by Clement on 9/16/15.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#include "PacketQueue.h"
#include "PacketList.h"
using namespace udt;
MessageNumber PacketQueue::getNextMessageNumber() {
static const MessageNumber MAX_MESSAGE_NUMBER = MessageNumber(1) << MESSAGE_NUMBER_BITS;
_currentMessageNumber = (_currentMessageNumber + 1) % MAX_MESSAGE_NUMBER;
return _currentMessageNumber;
}
bool PacketQueue::isEmpty() const {
LockGuard locker(_packetsLock);
// Only the main channel and it is empty
return (_channels.size() == 1) && _channels.front().empty();
}
PacketQueue::PacketPointer PacketQueue::takePacket() {
LockGuard locker(_packetsLock);
if (isEmpty()) {
return PacketPointer();
}
// Find next non empty channel
if (_channels[nextIndex()].empty()) {
nextIndex();
}
auto& channel = _channels[_currentIndex];
Q_ASSERT(!channel.empty());
// Take front packet
auto packet = std::move(channel.front());
channel.pop_front();
// Remove now empty channel (Don't remove the main channel)
if (channel.empty() && _currentIndex != 0) {
channel.swap(_channels.back());
_channels.pop_back();
--_currentIndex;
}
return std::move(packet);
}
unsigned int PacketQueue::nextIndex() {
_currentIndex = (++_currentIndex) % _channels.size();
return _currentIndex;
}
void PacketQueue::queuePacket(PacketPointer packet) {
LockGuard locker(_packetsLock);
_channels.front().push_back(std::move(packet));
}
void PacketQueue::queuePacketList(PacketListPointer packetList) {
packetList->preparePackets(getNextMessageNumber());
LockGuard locker(_packetsLock);
_channels.push_back(std::move(packetList->_packets));
}

View file

@ -0,0 +1,59 @@
//
// PacketQueue.h
// libraries/networking/src/udt
//
// Created by Clement on 9/16/15.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#ifndef hifi_PacketQueue_h
#define hifi_PacketQueue_h
#include <list>
#include <vector>
#include <memory>
#include <mutex>
#include "Packet.h"
namespace udt {
class PacketList;
using MessageNumber = uint32_t;
class PacketQueue {
using Mutex = std::recursive_mutex;
using LockGuard = std::lock_guard<Mutex>;
using PacketPointer = std::unique_ptr<Packet>;
using PacketListPointer = std::unique_ptr<PacketList>;
using Channel = std::list<PacketPointer>;
using Channels = std::vector<Channel>;
public:
void queuePacket(PacketPointer packet);
void queuePacketList(PacketListPointer packetList);
bool isEmpty() const;
PacketPointer takePacket();
Mutex& getLock() { return _packetsLock; }
private:
MessageNumber getNextMessageNumber();
unsigned int nextIndex();
MessageNumber _currentMessageNumber { 0 };
mutable Mutex _packetsLock; // Protects the packets to be sent.
Channels _channels = Channels(1); // One channel per packet list + Main channel
unsigned int _currentIndex { 0 };
};
}
#endif // hifi_PacketQueue_h

View file

@ -28,9 +28,12 @@
using namespace udt;
template <typename Mutex1, typename Mutex2>
class DoubleLock {
public:
DoubleLock(std::mutex& mutex1, std::mutex& mutex2) : _mutex1(mutex1), _mutex2(mutex2) { }
using Lock = std::unique_lock<DoubleLock<Mutex1, Mutex2>>;
DoubleLock(Mutex1& mutex1, Mutex2& mutex2) : _mutex1(mutex1), _mutex2(mutex2) { }
DoubleLock(const DoubleLock&) = delete;
DoubleLock& operator=(const DoubleLock&) = delete;
@ -45,15 +48,15 @@ public:
void unlock() { _mutex1.unlock(); _mutex2.unlock(); }
private:
std::mutex& _mutex1;
std::mutex& _mutex2;
Mutex1& _mutex1;
Mutex2& _mutex2;
};
std::unique_ptr<SendQueue> SendQueue::create(Socket* socket, HifiSockAddr destination) {
auto queue = std::unique_ptr<SendQueue>(new SendQueue(socket, destination));
Q_ASSERT_X(socket, "SendQueue::create", "Must be called with a valid Socket*");
auto queue = std::unique_ptr<SendQueue>(new SendQueue(socket, destination));
// Setup queue private thread
QThread* thread = new QThread;
thread->setObjectName("Networking: SendQueue " + destination.objectName()); // Name thread for easier debug
@ -68,28 +71,20 @@ std::unique_ptr<SendQueue> SendQueue::create(Socket* socket, HifiSockAddr destin
thread->start();
return std::move(queue);
return queue;
}
SendQueue::SendQueue(Socket* socket, HifiSockAddr dest) :
_socket(socket),
_destination(dest)
{
}
void SendQueue::queuePacket(std::unique_ptr<Packet> packet) {
{
std::unique_lock<std::mutex> locker(_packetsLock);
_packets.push_back(std::move(packet));
// unlock the mutex before we notify
locker.unlock();
// call notify_one on the condition_variable_any in case the send thread is sleeping waiting for packets
_emptyCondition.notify_one();
}
_packets.queuePacket(std::move(packet));
// call notify_one on the condition_variable_any in case the send thread is sleeping waiting for packets
_emptyCondition.notify_one();
if (!this->thread()->isRunning() && _state == State::NotStarted) {
this->thread()->start();
@ -97,46 +92,10 @@ void SendQueue::queuePacket(std::unique_ptr<Packet> packet) {
}
void SendQueue::queuePacketList(std::unique_ptr<PacketList> packetList) {
Q_ASSERT(packetList->_packets.size() > 0);
{
auto messageNumber = getNextMessageNumber();
if (packetList->_packets.size() == 1) {
auto& packet = packetList->_packets.front();
packet->setPacketPosition(Packet::PacketPosition::ONLY);
packet->writeMessageNumber(messageNumber);
} else {
bool haveMarkedFirstPacket = false;
auto end = packetList->_packets.end();
auto lastElement = --packetList->_packets.end();
for (auto it = packetList->_packets.begin(); it != end; ++it) {
auto& packet = *it;
if (!haveMarkedFirstPacket) {
packet->setPacketPosition(Packet::PacketPosition::FIRST);
haveMarkedFirstPacket = true;
} else if (it == lastElement) {
packet->setPacketPosition(Packet::PacketPosition::LAST);
} else {
packet->setPacketPosition(Packet::PacketPosition::MIDDLE);
}
packet->writeMessageNumber(messageNumber);
}
}
std::unique_lock<std::mutex> locker(_packetsLock);
_packets.splice(_packets.end(), packetList->_packets);
// unlock the mutex so we can notify
locker.unlock();
// call notify_one on the condition_variable_any in case the send thread is sleeping waiting for packets
_emptyCondition.notify_one();
}
_packets.queuePacketList(std::move(packetList));
// call notify_one on the condition_variable_any in case the send thread is sleeping waiting for packets
_emptyCondition.notify_one();
if (!this->thread()->isRunning() && _state == State::NotStarted) {
this->thread()->start();
@ -147,10 +106,8 @@ void SendQueue::stop() {
_state = State::Stopped;
// in case we're waiting to send another handshake, release the condition_variable now so we cleanup sooner
// Notify all conditions in case we're waiting somewhere
_handshakeACKCondition.notify_one();
// in case the empty condition is waiting for packets/loss release it now so that the queue is cleaned up
_emptyCondition.notify_one();
}
@ -178,7 +135,7 @@ void SendQueue::ack(SequenceNumber ack) {
{ // remove any sequence numbers equal to or lower than this ACK in the loss list
std::lock_guard<std::mutex> nakLocker(_naksLock);
if (_naks.getLength() > 0 && _naks.getFirstSequenceNumber() <= ack) {
if (!_naks.isEmpty() && _naks.getFirstSequenceNumber() <= ack) {
_naks.remove(_naks.getFirstSequenceNumber(), ack);
}
}
@ -191,12 +148,10 @@ void SendQueue::nak(SequenceNumber start, SequenceNumber end) {
_timeoutExpiryCount = 0;
_lastReceiverResponse = uint64_t(QDateTime::currentMSecsSinceEpoch());
std::unique_lock<std::mutex> nakLocker(_naksLock);
_naks.insert(start, end);
// unlock the locked mutex before we notify
nakLocker.unlock();
{
std::lock_guard<std::mutex> nakLocker(_naksLock);
_naks.insert(start, end);
}
// call notify_one on the condition_variable_any in case the send thread is sleeping waiting for losses to re-send
_emptyCondition.notify_one();
@ -207,36 +162,47 @@ void SendQueue::overrideNAKListFromPacket(ControlPacket& packet) {
_timeoutExpiryCount = 0;
_lastReceiverResponse = uint64_t(QDateTime::currentMSecsSinceEpoch());
std::unique_lock<std::mutex> nakLocker(_naksLock);
_naks.clear();
SequenceNumber first, second;
while (packet.bytesLeftToRead() >= (qint64)(2 * sizeof(SequenceNumber))) {
packet.readPrimitive(&first);
packet.readPrimitive(&second);
{
std::lock_guard<std::mutex> nakLocker(_naksLock);
_naks.clear();
if (first == second) {
_naks.append(first);
} else {
_naks.append(first, second);
SequenceNumber first, second;
while (packet.bytesLeftToRead() >= (qint64)(2 * sizeof(SequenceNumber))) {
packet.readPrimitive(&first);
packet.readPrimitive(&second);
if (first == second) {
_naks.append(first);
} else {
_naks.append(first, second);
}
}
}
// unlock the mutex before we notify
nakLocker.unlock();
// call notify_one on the condition_variable_any in case the send thread is sleeping waiting for losses to re-send
_emptyCondition.notify_one();
}
void SendQueue::sendHandshake() {
std::unique_lock<std::mutex> handshakeLock { _handshakeMutex };
if (!_hasReceivedHandshakeACK) {
// we haven't received a handshake ACK from the client, send another now
static const auto handshakePacket = ControlPacket::create(ControlPacket::Handshake, 0);
_socket->writeBasePacket(*handshakePacket, _destination);
// we wait for the ACK or the re-send interval to expire
static const auto HANDSHAKE_RESEND_INTERVAL = std::chrono::milliseconds(100);
_handshakeACKCondition.wait_for(handshakeLock, HANDSHAKE_RESEND_INTERVAL);
}
}
void SendQueue::handshakeACK() {
std::unique_lock<std::mutex> locker { _handshakeMutex };
_hasReceivedHandshakeACK = true;
// unlock the mutex and notify on the handshake ACK condition
locker.unlock();
{
std::lock_guard<std::mutex> locker { _handshakeMutex };
_hasReceivedHandshakeACK = true;
}
// Notify on the handshake ACK condition
_handshakeACKCondition.notify_one();
}
@ -245,12 +211,6 @@ SequenceNumber SendQueue::getNextSequenceNumber() {
return _currentSequenceNumber;
}
uint32_t SendQueue::getNextMessageNumber() {
static const MessageNumber MAX_MESSAGE_NUMBER = MessageNumber(1) << MESSAGE_NUMBER_BITS;
_currentMessageNumber = (_currentMessageNumber + 1) % MAX_MESSAGE_NUMBER;
return _currentMessageNumber;
}
void SendQueue::sendNewPacketAndAddToSentList(std::unique_ptr<Packet> newPacket, SequenceNumber sequenceNumber) {
// write the sequence number and send the packet
newPacket->writeSequenceNumber(sequenceNumber);
@ -287,207 +247,88 @@ void SendQueue::run() {
_state = State::Running;
// Wait for handshake to be complete
while (_state == State::Running && !_hasReceivedHandshakeACK) {
sendHandshake();
// Keep processing events
QCoreApplication::sendPostedEvents(this);
// Once we're here we've either received the handshake ACK or it's going to be time to re-send a handshake.
// Either way let's continue processing - no packets will be sent if no handshake ACK has been received.
}
while (_state == State::Running) {
// Record how long the loop takes to execute
auto loopStartTimestamp = p_high_resolution_clock::now();
std::unique_lock<std::mutex> handshakeLock { _handshakeMutex };
if (!_hasReceivedHandshakeACK) {
// we haven't received a handshake ACK from the client
// if it has been at least 100ms since we last sent a handshake, send another now
static const auto HANDSHAKE_RESEND_INTERVAL_MS = std::chrono::milliseconds(100);
// hold the time of last send in a static
static auto lastSendHandshake = p_high_resolution_clock::now() - HANDSHAKE_RESEND_INTERVAL_MS;
if (p_high_resolution_clock::now() - lastSendHandshake >= HANDSHAKE_RESEND_INTERVAL_MS) {
// it has been long enough since last handshake, send another
static auto handshakePacket = ControlPacket::create(ControlPacket::Handshake, 0);
_socket->writeBasePacket(*handshakePacket, _destination);
lastSendHandshake = p_high_resolution_clock::now();
}
// we wait for the ACK or the re-send interval to expire
_handshakeACKCondition.wait_until(handshakeLock, p_high_resolution_clock::now() + HANDSHAKE_RESEND_INTERVAL_MS);
// Once we're here we've either received the handshake ACK or it's going to be time to re-send a handshake.
// Either way let's continue processing - no packets will be sent if no handshake ACK has been received.
}
handshakeLock.unlock();
const auto loopStartTimestamp = p_high_resolution_clock::now();
bool sentAPacket = maybeResendPacket();
// if we didn't find a packet to re-send AND we think we can fit a new packet on the wire
// (this is according to the current flow window size) then we send out a new packet
if (_hasReceivedHandshakeACK && !sentAPacket) {
if (seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber) <= _flowWindowSize) {
sentAPacket = maybeSendNewPacket();
}
if (!sentAPacket) {
sentAPacket = maybeSendNewPacket();
}
// since we're a while loop, give the thread a chance to process events
QCoreApplication::sendPostedEvents(this, 0);
QCoreApplication::sendPostedEvents(this);
// we just processed events so check now if we were just told to stop
if (_state != State::Running) {
// If the send queue has been innactive, skip the sleep for
// Either _isRunning will have been set to false and we'll break
// Or something happened and we'll keep going
if (_state != State::Running || isInactive(sentAPacket)) {
return;
}
if (_hasReceivedHandshakeACK && !sentAPacket) {
// check if it is time to break this connection
// that will be the case if we have had 16 timeouts since hearing back from the client, and it has been
// at least 5 seconds
static const int NUM_TIMEOUTS_BEFORE_INACTIVE = 16;
static const int MIN_SECONDS_BEFORE_INACTIVE_MS = 5 * 1000;
auto sinceEpochNow = QDateTime::currentMSecsSinceEpoch();
if (_timeoutExpiryCount >= NUM_TIMEOUTS_BEFORE_INACTIVE
&& (sinceEpochNow - _lastReceiverResponse) > MIN_SECONDS_BEFORE_INACTIVE_MS) {
// If the flow window has been full for over CONSIDER_INACTIVE_AFTER,
// then signal the queue is inactive and return so it can be cleaned up
#ifdef UDT_CONNECTION_DEBUG
qCDebug(networking) << "SendQueue to" << _destination << "reached" << NUM_TIMEOUTS_BEFORE_INACTIVE << "timeouts"
<< "and 10s before receiving any ACK/NAK and is now inactive. Stopping.";
#endif
deactivate();
return;
} else {
// During our processing above we didn't send any packets
// If that is still the case we should use a condition_variable_any to sleep until we have data to handle.
// To confirm that the queue of packets and the NAKs list are still both empty we'll need to use the DoubleLock
DoubleLock doubleLock(_packetsLock, _naksLock);
if (doubleLock.try_lock()) {
// The packets queue and loss list mutexes are now both locked - check if they're still both empty
if (_packets.empty() && _naks.getLength() == 0) {
if (uint32_t(_lastACKSequenceNumber) == uint32_t(_currentSequenceNumber)) {
// we've sent the client as much data as we have (and they've ACKed it)
// either wait for new data to send or 5 seconds before cleaning up the queue
static const auto EMPTY_QUEUES_INACTIVE_TIMEOUT = std::chrono::seconds(5);
// use our condition_variable_any to wait
auto cvStatus = _emptyCondition.wait_for(doubleLock, EMPTY_QUEUES_INACTIVE_TIMEOUT);
// we have the double lock again - Make sure to unlock it
doubleLock.unlock();
if (cvStatus == std::cv_status::timeout) {
#ifdef UDT_CONNECTION_DEBUG
qCDebug(networking) << "SendQueue to" << _destination << "has been empty for"
<< EMPTY_QUEUES_INACTIVE_TIMEOUT.count()
<< "seconds and receiver has ACKed all packets."
<< "The queue is now inactive and will be stopped.";
#endif
deactivate();
return;
}
} else {
// We think the client is still waiting for data (based on the sequence number gap)
// Let's wait either for a response from the client or until the estimated timeout
auto waitDuration = std::chrono::microseconds(_estimatedTimeout);
// use our condition_variable_any to wait
auto cvStatus = _emptyCondition.wait_for(doubleLock, waitDuration);
if (cvStatus == std::cv_status::timeout) {
// increase the number of timeouts
++_timeoutExpiryCount;
if (SequenceNumber(_lastACKSequenceNumber) < _currentSequenceNumber) {
// after a timeout if we still have sent packets that the client hasn't ACKed we
// add them to the loss list
// Note that thanks to the DoubleLock we have the _naksLock right now
_naks.append(SequenceNumber(_lastACKSequenceNumber) + 1, _currentSequenceNumber);
}
}
// we have the double lock again - Make sure to unlock it
doubleLock.unlock();
// skip to the next iteration
continue;
}
} else {
// we got the try_lock but failed the other conditionals so we need to unlock
doubleLock.unlock();
}
}
}
}
auto loopEndTimestamp = p_high_resolution_clock::now();
// sleep as long as we need until next packet send, if we can
auto timeToSleep = (loopStartTimestamp + std::chrono::microseconds(_packetSendPeriod)) - loopEndTimestamp;
if (timeToSleep > timeToSleep.zero()) {
std::this_thread::sleep_for(timeToSleep);
}
const auto loopEndTimestamp = p_high_resolution_clock::now();
const auto timeToSleep = (loopStartTimestamp + std::chrono::microseconds(_packetSendPeriod)) - loopEndTimestamp;
std::this_thread::sleep_for(timeToSleep);
}
}
bool SendQueue::maybeSendNewPacket() {
// we didn't re-send a packet, so time to send a new one
std::unique_lock<std::mutex> locker(_packetsLock);
if (_packets.size() > 0) {
SequenceNumber nextNumber = getNextSequenceNumber();
if (seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber) <= _flowWindowSize) {
// we didn't re-send a packet, so time to send a new one
// grab the first packet we will send
std::unique_ptr<Packet> firstPacket;
firstPacket.swap(_packets.front());
_packets.pop_front();
std::unique_ptr<Packet> secondPacket;
bool shouldSendPairTail = false;
if (((uint32_t) nextNumber & 0xF) == 0) {
// the first packet is the first in a probe pair - every 16 (rightmost 16 bits = 0) packets
// pull off a second packet if we can before we unlock
shouldSendPairTail = true;
if (!_packets.isEmpty()) {
SequenceNumber nextNumber = getNextSequenceNumber();
if (_packets.size() > 0) {
secondPacket.swap(_packets.front());
_packets.pop_front();
// grab the first packet we will send
std::unique_ptr<Packet> firstPacket = _packets.takePacket();
Q_ASSERT(firstPacket);
std::unique_ptr<Packet> secondPacket;
bool shouldSendPairTail = false;
if (((uint32_t) nextNumber & 0xF) == 0) {
// the first packet is the first in a probe pair - every 16 (rightmost 16 bits = 0) packets
// pull off a second packet if we can before we unlock
shouldSendPairTail = true;
secondPacket = _packets.takePacket();
}
// definitely send the first packet
sendNewPacketAndAddToSentList(move(firstPacket), nextNumber);
// do we have a second in a pair to send as well?
if (secondPacket) {
sendNewPacketAndAddToSentList(move(secondPacket), getNextSequenceNumber());
} else if (shouldSendPairTail) {
// we didn't get a second packet to send in the probe pair
// send a control packet of type ProbePairTail so the receiver can still do
// proper bandwidth estimation
static auto pairTailPacket = ControlPacket::create(ControlPacket::ProbeTail);
_socket->writeBasePacket(*pairTailPacket, _destination);
}
// We sent our packet(s), return here
return true;
}
// unlock the packets, we're done pulling
locker.unlock();
// definitely send the first packet
sendNewPacketAndAddToSentList(move(firstPacket), nextNumber);
// do we have a second in a pair to send as well?
if (secondPacket) {
sendNewPacketAndAddToSentList(move(secondPacket), getNextSequenceNumber());
} else if (shouldSendPairTail) {
// we didn't get a second packet to send in the probe pair
// send a control packet of type ProbePairTail so the receiver can still do
// proper bandwidth estimation
static auto pairTailPacket = ControlPacket::create(ControlPacket::ProbeTail);
_socket->writeBasePacket(*pairTailPacket, _destination);
}
// We sent our packet(s), return here
return true;
}
// No packets were sent
return false;
}
@ -499,7 +340,7 @@ bool SendQueue::maybeResendPacket() {
std::unique_lock<std::mutex> naksLocker(_naksLock);
if (_naks.getLength() > 0) {
if (!_naks.isEmpty()) {
// pull the sequence number we need to re-send
SequenceNumber resendNumber = _naks.popFirstSequenceNumber();
naksLocker.unlock();
@ -538,6 +379,89 @@ bool SendQueue::maybeResendPacket() {
return false;
}
bool SendQueue::isInactive(bool sentAPacket) {
if (!sentAPacket) {
// check if it is time to break this connection
// that will be the case if we have had 16 timeouts since hearing back from the client, and it has been
// at least 5 seconds
static const int NUM_TIMEOUTS_BEFORE_INACTIVE = 16;
static const int MIN_SECONDS_BEFORE_INACTIVE_MS = 5 * 1000;
if (_timeoutExpiryCount >= NUM_TIMEOUTS_BEFORE_INACTIVE &&
(QDateTime::currentMSecsSinceEpoch() - _lastReceiverResponse) > MIN_SECONDS_BEFORE_INACTIVE_MS) {
// If the flow window has been full for over CONSIDER_INACTIVE_AFTER,
// then signal the queue is inactive and return so it can be cleaned up
#ifdef UDT_CONNECTION_DEBUG
qCDebug(networking) << "SendQueue to" << _destination << "reached" << NUM_TIMEOUTS_BEFORE_INACTIVE << "timeouts"
<< "and 5s before receiving any ACK/NAK and is now inactive. Stopping.";
#endif
deactivate();
return true;
}
// During our processing above we didn't send any packets
// If that is still the case we should use a condition_variable_any to sleep until we have data to handle.
// To confirm that the queue of packets and the NAKs list are still both empty we'll need to use the DoubleLock
using DoubleLock = DoubleLock<std::recursive_mutex, std::mutex>;
DoubleLock doubleLock(_packets.getLock(), _naksLock);
DoubleLock::Lock locker(doubleLock, std::try_to_lock);
if (locker.owns_lock() && _packets.isEmpty() && _naks.isEmpty()) {
// The packets queue and loss list mutexes are now both locked and they're both empty
if (uint32_t(_lastACKSequenceNumber) == uint32_t(_currentSequenceNumber)) {
// we've sent the client as much data as we have (and they've ACKed it)
// either wait for new data to send or 5 seconds before cleaning up the queue
static const auto EMPTY_QUEUES_INACTIVE_TIMEOUT = std::chrono::seconds(5);
// use our condition_variable_any to wait
auto cvStatus = _emptyCondition.wait_for(locker, EMPTY_QUEUES_INACTIVE_TIMEOUT);
// we have the lock again - Make sure to unlock it
locker.unlock();
if (cvStatus == std::cv_status::timeout) {
#ifdef UDT_CONNECTION_DEBUG
qCDebug(networking) << "SendQueue to" << _destination << "has been empty for"
<< EMPTY_QUEUES_INACTIVE_TIMEOUT.count()
<< "seconds and receiver has ACKed all packets."
<< "The queue is now inactive and will be stopped.";
#endif
// Deactivate queue
deactivate();
return true;
}
} else {
// We think the client is still waiting for data (based on the sequence number gap)
// Let's wait either for a response from the client or until the estimated timeout
auto waitDuration = std::chrono::microseconds(_estimatedTimeout);
// use our condition_variable_any to wait
auto cvStatus = _emptyCondition.wait_for(locker, waitDuration);
if (cvStatus == std::cv_status::timeout) {
// increase the number of timeouts
++_timeoutExpiryCount;
if (SequenceNumber(_lastACKSequenceNumber) < _currentSequenceNumber) {
// after a timeout if we still have sent packets that the client hasn't ACKed we
// add them to the loss list
// Note that thanks to the DoubleLock we have the _naksLock right now
_naks.append(SequenceNumber(_lastACKSequenceNumber) + 1, _currentSequenceNumber);
}
}
}
}
}
return false;
}
void SendQueue::deactivate() {
// this queue is inactive - emit that signal and stop the while
emit queueInactive();

View file

@ -28,6 +28,7 @@
#include "../HifiSockAddr.h"
#include "Constants.h"
#include "PacketQueue.h"
#include "SequenceNumber.h"
#include "LossList.h"
@ -38,8 +39,6 @@ class ControlPacket;
class Packet;
class PacketList;
class Socket;
using MessageNumber = uint32_t;
class SendQueue : public QObject {
Q_OBJECT
@ -87,29 +86,29 @@ private:
SendQueue(SendQueue& other) = delete;
SendQueue(SendQueue&& other) = delete;
void sendHandshake();
void sendPacket(const Packet& packet);
void sendNewPacketAndAddToSentList(std::unique_ptr<Packet> newPacket, SequenceNumber sequenceNumber);
bool maybeSendNewPacket(); // Figures out what packet to send next
bool maybeResendPacket(); // Determines whether to resend a packet and which one
bool isInactive(bool sentAPacket);
void deactivate(); // makes the queue inactive and cleans it up
// Increments current sequence number and return it
SequenceNumber getNextSequenceNumber();
MessageNumber getNextMessageNumber();
mutable std::mutex _packetsLock; // Protects the packets to be sent list.
std::list<std::unique_ptr<Packet>> _packets; // List of packets to be sent
PacketQueue _packets;
Socket* _socket { nullptr }; // Socket to send packet on
HifiSockAddr _destination; // Destination addr
std::atomic<uint32_t> _lastACKSequenceNumber { 0 }; // Last ACKed sequence number
MessageNumber _currentMessageNumber { 0 };
SequenceNumber _currentSequenceNumber; // Last sequence number sent out
std::atomic<uint32_t> _atomicCurrentSequenceNumber { 0 };// Atomic for last sequence number sent out
std::atomic<uint32_t> _atomicCurrentSequenceNumber { 0 }; // Atomic for last sequence number sent out
std::atomic<int> _packetSendPeriod { 0 }; // Interval between two packet send event in microseconds, set from CC
std::atomic<State> _state { State::NotStarted };

View file

@ -282,7 +282,7 @@ void UDTTest::sendPacket() {
packetList->write(randomPaddedData);
}
packetList->closeCurrentPacket(false);
packetList->closeCurrentPacket();
_totalQueuedBytes += packetList->getDataSize();
_totalQueuedPackets += packetList->getNumPackets();