Merge pull request #34 from birarda/atp

fixes for ordered sending via UDT, asset system
Clément Brisset 2015-08-28 20:30:22 +02:00
commit 16d082e7c8
16 changed files with 155 additions and 70 deletions

View file

@@ -42,8 +42,8 @@ const long long ASSIGNMENT_REQUEST_INTERVAL_MSECS = 1 * 1000;
int hifiSockAddrMeta = qRegisterMetaType<HifiSockAddr>("HifiSockAddr");
AssignmentClient::AssignmentClient(Assignment::Type requestAssignmentType, QString assignmentPool,
QUuid walletUUID, QString assignmentServerHostname, quint16 assignmentServerPort,
quint16 assignmentMonitorPort) :
quint16 listenPort, QUuid walletUUID, QString assignmentServerHostname,
quint16 assignmentServerPort, quint16 assignmentMonitorPort) :
_assignmentServerHostname(DEFAULT_ASSIGNMENT_SERVER_HOSTNAME)
{
LogUtils::init();
@@ -53,7 +53,7 @@ AssignmentClient::AssignmentClient(Assignment::Type requestAssignmentType, QStri
auto addressManager = DependencyManager::set<AddressManager>();
// create a NodeList as an unassigned client, must be after addressManager
auto nodeList = DependencyManager::set<NodeList>(NodeType::Unassigned);
auto nodeList = DependencyManager::set<NodeList>(NodeType::Unassigned, listenPort);
auto animationCache = DependencyManager::set<AnimationCache>();
auto entityScriptingInterface = DependencyManager::set<EntityScriptingInterface>();

View file

@@ -23,6 +23,7 @@ class AssignmentClient : public QObject {
Q_OBJECT
public:
AssignmentClient(Assignment::Type requestAssignmentType, QString assignmentPool,
quint16 listenPort,
QUuid walletUUID, QString assignmentServerHostname, quint16 assignmentServerPort,
quint16 assignmentMonitorPort);
~AssignmentClient();

View file

@@ -59,6 +59,10 @@ AssignmentClientApp::AssignmentClientApp(int argc, char* argv[]) :
const QCommandLineOption poolOption(ASSIGNMENT_POOL_OPTION, "set assignment pool", "pool-name");
parser.addOption(poolOption);
const QCommandLineOption portOption(ASSIGNMENT_CLIENT_LISTEN_PORT_OPTION,
"UDP port for this assignment client (or monitor)", "port");
parser.addOption(portOption);
const QCommandLineOption walletDestinationOption(ASSIGNMENT_WALLET_DESTINATION_ID_OPTION,
"set wallet destination", "wallet-uuid");
@@ -158,12 +162,18 @@ AssignmentClientApp::AssignmentClientApp(int argc, char* argv[]) :
// check for an overridden assignment server port
quint16 assignmentServerPort = DEFAULT_DOMAIN_SERVER_PORT;
if (argumentVariantMap.contains(ASSIGNMENT_WALLET_DESTINATION_ID_OPTION)) {
assignmentServerPort = argumentVariantMap.value(CUSTOM_ASSIGNMENT_SERVER_PORT_OPTION).toString().toUInt();
assignmentServerPort = argumentVariantMap.value(CUSTOM_ASSIGNMENT_SERVER_PORT_OPTION).toUInt();
}
if (parser.isSet(assignmentServerPortOption)) {
assignmentServerPort = parser.value(assignmentServerPortOption).toInt();
}
// check for an overridden listen port
quint16 listenPort = 0;
if (argumentVariantMap.contains(ASSIGNMENT_CLIENT_LISTEN_PORT_OPTION)) {
listenPort = argumentVariantMap.value(ASSIGNMENT_CLIENT_LISTEN_PORT_OPTION).toUInt();
}
if (parser.isSet(numChildsOption)) {
if (minForks && minForks > numForks) {
@@ -185,12 +195,12 @@ AssignmentClientApp::AssignmentClientApp(int argc, char* argv[]) :
if (numForks || minForks || maxForks) {
AssignmentClientMonitor* monitor = new AssignmentClientMonitor(numForks, minForks, maxForks,
requestAssignmentType, assignmentPool,
walletUUID, assignmentServerHostname,
listenPort, walletUUID, assignmentServerHostname,
assignmentServerPort);
monitor->setParent(this);
connect(this, &QCoreApplication::aboutToQuit, monitor, &AssignmentClientMonitor::aboutToQuit);
} else {
AssignmentClient* client = new AssignmentClient(requestAssignmentType, assignmentPool,
AssignmentClient* client = new AssignmentClient(requestAssignmentType, assignmentPool, listenPort,
walletUUID, assignmentServerHostname,
assignmentServerPort, monitorPort);
client->setParent(this);

View file

@@ -17,15 +17,15 @@
const QString ASSIGNMENT_TYPE_OVERRIDE_OPTION = "t";
const QString ASSIGNMENT_POOL_OPTION = "pool";
const QString ASSIGNMENT_CLIENT_LISTEN_PORT_OPTION = "p";
const QString ASSIGNMENT_WALLET_DESTINATION_ID_OPTION = "wallet";
const QString CUSTOM_ASSIGNMENT_SERVER_HOSTNAME_OPTION = "a";
const QString CUSTOM_ASSIGNMENT_SERVER_PORT_OPTION = "p";
const QString CUSTOM_ASSIGNMENT_SERVER_HOSTNAME_OPTION = "i";
const QString CUSTOM_ASSIGNMENT_SERVER_PORT_OPTION = "server-port";
const QString ASSIGNMENT_NUM_FORKS_OPTION = "n";
const QString ASSIGNMENT_MIN_FORKS_OPTION = "min";
const QString ASSIGNMENT_MAX_FORKS_OPTION = "max";
const QString ASSIGNMENT_CLIENT_MONITOR_PORT_OPTION = "monitor-port";
class AssignmentClientApp : public QCoreApplication {
Q_OBJECT
public:
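
Taken together with the new portOption above, these constants split the client's local listen port (-p) from the assignment server's port (--server-port) and move the server hostname to -i. A hypothetical invocation under those assumptions (the binary name, host, pool, and port values are illustrative only):

    assignment-client -n 4 -p 48010 -i assignment.example.com --server-port 40102 --pool example-pool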

View file

@@ -28,7 +28,7 @@ AssignmentClientMonitor::AssignmentClientMonitor(const unsigned int numAssignmen
const unsigned int minAssignmentClientForks,
const unsigned int maxAssignmentClientForks,
Assignment::Type requestAssignmentType, QString assignmentPool,
QUuid walletUUID, QString assignmentServerHostname,
quint16 listenPort, QUuid walletUUID, QString assignmentServerHostname,
quint16 assignmentServerPort) :
_numAssignmentClientForks(numAssignmentClientForks),
_minAssignmentClientForks(minAssignmentClientForks),
@@ -50,7 +50,7 @@ AssignmentClientMonitor::AssignmentClientMonitor(const unsigned int numAssignmen
// create a NodeList so we can receive stats from children
DependencyManager::registerInheritance<LimitedNodeList, NodeList>();
auto addressManager = DependencyManager::set<AddressManager>();
auto nodeList = DependencyManager::set<LimitedNodeList>();
auto nodeList = DependencyManager::set<LimitedNodeList>(listenPort);
auto& packetReceiver = DependencyManager::get<NodeList>()->getPacketReceiver();
packetReceiver.registerListener(PacketType::AssignmentClientStatus, this, "handleChildStatusPacket");

View file

@@ -28,7 +28,7 @@ class AssignmentClientMonitor : public QObject {
public:
AssignmentClientMonitor(const unsigned int numAssignmentClientForks, const unsigned int minAssignmentClientForks,
const unsigned int maxAssignmentClientForks, Assignment::Type requestAssignmentType,
QString assignmentPool, QUuid walletUUID, QString assignmentServerHostname,
QString assignmentPool, quint16 listenPort, QUuid walletUUID, QString assignmentServerHostname,
quint16 assignmentServerPort);
~AssignmentClientMonitor();

View file

@@ -15,14 +15,20 @@
void FileResourceRequest::doSend() {
QString filename = _url.toLocalFile();
QFile file(filename);
_state = Finished;
if (file.open(QFile::ReadOnly)) {
_data = file.readAll();
_result = ResourceRequest::Success;
emit finished();
if (file.exists()) {
if (file.open(QFile::ReadOnly)) {
_data = file.readAll();
_result = ResourceRequest::Success;
} else {
_result = ResourceRequest::AccessDenied;
}
} else {
_result = ResourceRequest::AccessDenied;
emit finished();
_result = ResourceRequest::NotFound;
}
emit finished();
}
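
With the new exists() check, a missing local file now reports NotFound instead of the blanket AccessDenied, and finished() is emitted on every path. A caller-side sketch of distinguishing the two results (getResult() is assumed to be the accessor for the _result member set above; it is not shown in this diff):

    // Sketch only - relies on the Success / AccessDenied / NotFound results set in doSend() above
    void handleFinishedRequest(ResourceRequest* request) {
        switch (request->getResult()) {
            case ResourceRequest::Success:
                // _data was filled from the file; hand it off to the consumer
                break;
            case ResourceRequest::NotFound:
                // the local file does not exist - a clearer failure than the old blanket AccessDenied
                break;
            case ResourceRequest::AccessDenied:
                // the file exists but could not be opened for reading
                break;
            default:
                break;
        }
    }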

View file

@@ -84,7 +84,7 @@ void HTTPResourceRequest::onDownloadProgress(qint64 bytesReceived, qint64 bytesT
}
void HTTPResourceRequest::onTimeout() {
Q_ASSERT(_state != Unsent);
Q_ASSERT(_state != NotStarted);
if (_state == InProgress) {
qCDebug(networking) << "Timed out loading " << _url;

View file

@@ -17,7 +17,7 @@ ResourceRequest::ResourceRequest(QObject* parent, const QUrl& url) :
}
void ResourceRequest::send() {
Q_ASSERT(_state == Unsent);
Q_ASSERT(_state == NotStarted);
_state = InProgress;
doSend();

View file

@@ -21,7 +21,7 @@ public:
ResourceRequest(QObject* parent, const QUrl& url);
enum State {
Unsent = 0,
NotStarted = 0,
InProgress,
Finished
};
@@ -51,7 +51,7 @@ protected:
virtual void doSend() = 0;
QUrl _url;
State _state { Unsent };
State _state { NotStarted };
Result _result;
QByteArray _data;
bool _cacheEnabled { true };
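
The Unsent state is renamed to NotStarted throughout; send() still asserts a request is in that initial state before moving it to InProgress, and the concrete doSend() implementations mark it Finished. A minimal lifecycle sketch, assuming FileResourceRequest keeps the (parent, url) constructor signature shown for ResourceRequest and that finished() is a signal (both assumptions based on the surrounding diff):

    // Sketch of the request lifecycle after the rename (subclass choice and file path are illustrative)
    void fetchLocalFile() {
        auto request = new FileResourceRequest(nullptr, QUrl::fromLocalFile("/tmp/example.json"));
        QObject::connect(request, &ResourceRequest::finished, [request] {
            // the request is Finished here; the result is Success, AccessDenied, or NotFound
            request->deleteLater();
        });
        request->send();  // asserts the state is still NotStarted, then moves it to InProgress
    }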

View file

@@ -78,8 +78,9 @@ SendQueue& Connection::getSendQueue() {
QObject::connect(_sendQueue.get(), &SendQueue::packetRetransmitted, this, &Connection::recordRetransmission);
QObject::connect(_sendQueue.get(), &SendQueue::queueInactive, this, &Connection::queueInactive);
// set defaults on the send queue from our congestion control object
// set defaults on the send queue from our congestion control object and estimatedTimeout()
_sendQueue->setPacketSendPeriod(_congestionControl->_packetSendPeriod);
_sendQueue->setEstimatedTimeout(estimatedTimeout());
_sendQueue->setFlowWindowSize(std::min(_flowWindowSize, (int) _congestionControl->_congestionWindowSize));
}
@@ -237,7 +238,7 @@ void Connection::sendLightACK() {
// create the light ACK packet, make it static so we can re-use it
static const int LIGHT_ACK_PACKET_PAYLOAD_BYTES = sizeof(SequenceNumber);
static auto lightACKPacket = ControlPacket::create(ControlPacket::ACK, LIGHT_ACK_PACKET_PAYLOAD_BYTES);
static auto lightACKPacket = ControlPacket::create(ControlPacket::LightACK, LIGHT_ACK_PACKET_PAYLOAD_BYTES);
// reset the lightACKPacket before we go to write the ACK to it
lightACKPacket->reset();
@@ -407,13 +408,13 @@ void Connection::processControl(std::unique_ptr<ControlPacket> controlPacket) {
switch (controlPacket->getType()) {
case ControlPacket::ACK:
if (_hasReceivedHandshakeACK) {
if (controlPacket->getPayloadSize() == sizeof(SequenceNumber)) {
processLightACK(move(controlPacket));
} else {
processACK(move(controlPacket));
}
processACK(move(controlPacket));
}
break;
case ControlPacket::LightACK:
if (_hasReceivedHandshakeACK) {
processLightACK(move(controlPacket));
}
case ControlPacket::ACK2:
if (_hasReceivedHandshake) {
processACK2(move(controlPacket));
@@ -727,9 +728,12 @@ void Connection::updateCongestionControlAndSendQueue(std::function<void ()> cong
// fire congestion control callback
congestionCallback();
auto& sendQueue = getSendQueue();
// now that we've updated the congestion control, update the packet send period and flow window size
getSendQueue().setPacketSendPeriod(_congestionControl->_packetSendPeriod);
getSendQueue().setFlowWindowSize(std::min(_flowWindowSize, (int) _congestionControl->_congestionWindowSize));
sendQueue.setPacketSendPeriod(_congestionControl->_packetSendPeriod);
sendQueue.setEstimatedTimeout(estimatedTimeout());
sendQueue.setFlowWindowSize(std::min(_flowWindowSize, (int) _congestionControl->_congestionWindowSize));
// record connection stats
_stats.recordPacketSendPeriod(_congestionControl->_packetSendPeriod);
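
The send queue now also receives an estimated timeout from the connection, both when the queue is created and whenever congestion control is updated. estimatedTimeout() itself is not shown in this diff; in UDT-style stacks it is conventionally derived from the smoothed RTT and its variance, roughly as sketched here (member names and the exact formula are assumptions):

    // Conventional UDT-style timeout estimate in microseconds (sketch, not the exact implementation)
    int Connection::estimatedTimeout() const {
        return _rtt + _rttVariance * 4;
    }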

View file

@@ -29,6 +29,7 @@ public:
enum Type : uint16_t {
ACK,
ACK2,
LightACK,
NAK,
TimeoutNAK,
Handshake,
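
Note that LightACK is inserted between ACK2 and NAK, so the wire value of every later control packet type shifts by one and both peers must agree on the same protocol revision. Since the diff shows no explicit initializers in the enum, the implied values can be pinned down at compile time, for example:

    // Hypothetical compile-time check of the implied values after this change
    static_assert(ControlPacket::ACK == 0 && ControlPacket::ACK2 == 1 && ControlPacket::LightACK == 2,
                  "LightACK takes the value previously held by NAK");
    static_assert(ControlPacket::NAK == 3, "NAK and the types after it shift up by one");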

View file

@@ -154,6 +154,9 @@ void SendQueue::sendPacket(const Packet& packet) {
}
void SendQueue::ack(SequenceNumber ack) {
// this is a response from the client, re-set our timeout expiry
_timeoutExpiryCount = 0;
if (_lastACKSequenceNumber == (uint32_t) ack) {
return;
}
@@ -177,6 +180,9 @@ void SendQueue::ack(SequenceNumber ack) {
}
void SendQueue::nak(SequenceNumber start, SequenceNumber end) {
// this is a response from the client, re-set our timeout expiry
_timeoutExpiryCount = 0;
std::unique_lock<std::mutex> nakLocker(_naksLock);
_naks.insert(start, end);
@@ -189,6 +195,9 @@ void SendQueue::nak(SequenceNumber start, SequenceNumber end) {
}
void SendQueue::overrideNAKListFromPacket(ControlPacket& packet) {
// this is a response from the client, re-set our timeout expiry
_timeoutExpiryCount = 0;
std::unique_lock<std::mutex> nakLocker(_naksLock);
_naks.clear();
@@ -212,7 +221,7 @@ void SendQueue::overrideNAKListFromPacket(ControlPacket& packet) {
}
void SendQueue::handshakeACK() {
std::unique_lock<std::mutex> locker(_handshakeMutex);
std::unique_lock<std::mutex> locker { _handshakeMutex };
_hasReceivedHandshakeACK = true;
@@ -258,7 +267,7 @@ void SendQueue::run() {
while (_isRunning) {
// Record how long the loop takes to execute
auto loopStartTimestamp = high_resolution_clock::now();
std::unique_lock<std::mutex> handshakeLock { _handshakeMutex };
if (!_hasReceivedHandshakeACK) {
@@ -295,22 +304,15 @@ void SendQueue::run() {
handshakeLock.unlock();
bool sentAPacket = maybeResendPacket();
bool flowWindowFull = false;
// if we didn't find a packet to re-send AND we think we can fit a new packet on the wire
// (this is according to the current flow window size) then we send out a new packet
if (_hasReceivedHandshakeACK && !sentAPacket) {
flowWindowFull = (seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber) >
_flowWindowSize);
sentAPacket = maybeSendNewPacket();
if (seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber) <= _flowWindowSize) {
sentAPacket = maybeSendNewPacket();
}
}
// Keep track of how long the flow window has been full for
if (flowWindowFull && !_flowWindowWasFull) {
_flowWindowFullSince = loopStartTimestamp;
}
_flowWindowWasFull = flowWindowFull;
// since we're a while loop, give the thread a chance to process events
QCoreApplication::processEvents();
@@ -320,38 +322,90 @@ void SendQueue::run() {
}
if (_hasReceivedHandshakeACK && !sentAPacket) {
static const std::chrono::seconds CONSIDER_INACTIVE_AFTER { 5 };
// check if it is time to break this connection
if (flowWindowFull && (high_resolution_clock::now() - _flowWindowFullSince) > CONSIDER_INACTIVE_AFTER) {
// that will be the case if we have had 16 timeouts since hearing back from the client, and it has been
// at least 10 seconds
static const int NUM_TIMEOUTS_BEFORE_INACTIVE = 16;
if (_timeoutExpiryCount >= NUM_TIMEOUTS_BEFORE_INACTIVE) {
// If the flow window has been full for over CONSIDER_INACTIVE_AFTER,
// then signal the queue is inactive and return so it can be cleaned up
qDebug() << "SendQueue to" << _destination << "reached" << NUM_TIMEOUTS_BEFORE_INACTIVE << "timeouts and is"
<< "considered inactive. It is now being stopped.";
emit queueInactive();
_isRunning = false;
return;
} else {
// During our processing above we didn't send any packets and the flow window is not full.
// During our processing above we didn't send any packets
// If that is still the case we should use a condition_variable_any to sleep until we have data to handle.
// To confirm that the queue of packets and the NAKs list are still both empty we'll need to use the DoubleLock
DoubleLock doubleLock(_packetsLock, _naksLock);
// The packets queue and loss list mutexes are now both locked - check if they're still both empty
if (doubleLock.try_lock() && _packets.empty() && _naks.getLength() == 0) {
if (doubleLock.try_lock()) {
// The packets queue and loss list mutexes are now both locked - check if they're still both empty
// both are empty - let's use a condition_variable_any to wait
auto cvStatus = _emptyCondition.wait_for(doubleLock, CONSIDER_INACTIVE_AFTER);
// we have the double lock again - Make sure to unlock it
doubleLock.unlock();
if (cvStatus == std::cv_status::timeout) {
// the wait_for released because we've been inactive for too long
// so emit our inactive signal and return so the send queue can be cleaned up
emit queueInactive();
return;
}
// skip to the next iteration
continue;
if (_packets.empty() && _naks.getLength() == 0) {
if (uint32_t(_lastACKSequenceNumber) == uint32_t(_currentSequenceNumber)) {
// we've sent the client as much data as we have (and they've ACKed it)
// either wait for new data to send or 5 seconds before cleaning up the queue
static const auto EMPTY_QUEUES_INACTIVE_TIMEOUT = std::chrono::seconds(5);
// use our condition_variable_any to wait
auto cvStatus = _emptyCondition.wait_for(doubleLock, EMPTY_QUEUES_INACTIVE_TIMEOUT);
// we have the double lock again - Make sure to unlock it
doubleLock.unlock();
if (cvStatus == std::cv_status::timeout) {
qDebug() << "SendQueue to" << _destination << "has been empty for"
<< EMPTY_QUEUES_INACTIVE_TIMEOUT.count()
<< "seconds and receiver has ACKed all packets."
<< "The queue is considered inactive and will be stopped.";
// this queue is inactive - emit that signal and stop the while
emit queueInactive();
_isRunning = false;
return;
}
} else {
// We think the client is still waiting for data (based on the sequence number gap)
// Let's wait either for a response from the client or until the estimated timeout
auto waitDuration = std::chrono::microseconds(_estimatedTimeout);
// use our condition_variable_any to wait
auto cvStatus = _emptyCondition.wait_for(doubleLock, waitDuration);
if (cvStatus == std::cv_status::timeout) {
// increase the number of timeouts
++_timeoutExpiryCount;
if (SequenceNumber(_lastACKSequenceNumber) < _currentSequenceNumber) {
// after a timeout if we still have sent packets that the client hasn't ACKed we
// add them to the loss list
// Note that thanks to the DoubleLock we have the _naksLock right now
_naks.append(SequenceNumber(_lastACKSequenceNumber) + 1, _currentSequenceNumber);
}
}
// we have the double lock again - Make sure to unlock it
doubleLock.unlock();
// skip to the next iteration
continue;
}
} else {
// we got the try_lock but failed the other conditionals so we need to unlock
doubleLock.unlock();
}
}
}
}
@@ -410,9 +464,10 @@ bool SendQueue::maybeSendNewPacket() {
}
bool SendQueue::maybeResendPacket() {
std::unique_lock<std::mutex> naksLocker(_naksLock);
// the following while makes sure that we find a packet to re-send, if there is one
while (true) {
std::unique_lock<std::mutex> naksLocker(_naksLock);
if (_naks.getLength() > 0) {
// pull the sequence number we need to re-send
@@ -452,4 +507,3 @@ bool SendQueue::maybeResendPacket() {
// No packet was resent
return false;
}
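
Taken together, the flow-window-full timer is replaced by logic driven by _timeoutExpiryCount, which the ack/nak/overrideNAKListFromPacket handlers above reset and which each expired wait increments while un-ACKed data remains. A condensed restatement of when the queue now declares itself inactive (helper and parameter names are illustrative, not part of the diff):

    #include <chrono>

    // Condensed restatement of the inactivity rules introduced above (sketch only)
    bool queueShouldStop(int timeoutExpiryCount, bool allDataACKed, std::chrono::seconds idleFor) {
        static const int NUM_TIMEOUTS_BEFORE_INACTIVE = 16;
        static const std::chrono::seconds EMPTY_QUEUES_INACTIVE_TIMEOUT { 5 };

        // 16 consecutive timeouts without a response from the receiver, or an empty,
        // fully ACKed queue that has stayed idle for 5 seconds, marks the queue inactive
        return timeoutExpiryCount >= NUM_TIMEOUTS_BEFORE_INACTIVE
            || (allDataACKed && idleFor >= EMPTY_QUEUES_INACTIVE_TIMEOUT);
    }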

View file

@@ -57,6 +57,8 @@ public:
int getPacketSendPeriod() const { return _packetSendPeriod; }
void setPacketSendPeriod(int newPeriod) { _packetSendPeriod = newPeriod; }
void setEstimatedTimeout(int estimatedTimeout) { _estimatedTimeout = estimatedTimeout; }
public slots:
void stop();
@@ -104,11 +106,10 @@ private:
std::atomic<int> _packetSendPeriod { 0 }; // Interval between two packet send event in microseconds, set from CC
std::atomic<bool> _isRunning { false };
std::atomic<int> _flowWindowSize { 0 }; // Flow control window size (number of packets that can be on wire) - set from CC
std::atomic<int> _estimatedTimeout { 0 }; // Estimated timeout, set from CC
std::atomic<int> _timeoutExpiryCount { 0 }; // The number of times the timeout has expired without response from client
// Used to detect when the connection becomes inactive for too long
bool _flowWindowWasFull = false;
time_point _flowWindowFullSince;
std::atomic<int> _flowWindowSize { 0 }; // Flow control window size (number of packets that can be on wire) - set from CC
mutable std::mutex _naksLock; // Protects the naks list.
LossList _naks; // Sequence numbers of packets to resend
@@ -117,7 +118,7 @@ private:
std::unordered_map<SequenceNumber, std::unique_ptr<Packet>> _sentPackets; // Packets waiting for ACK.
std::mutex _handshakeMutex; // Protects the handshake ACK condition_variable
bool _hasReceivedHandshakeACK { false }; // flag for receipt of handshake ACK from client
std::atomic<bool> _hasReceivedHandshakeACK { false }; // flag for receipt of handshake ACK from client
std::condition_variable _handshakeACKCondition;
std::condition_variable_any _emptyCondition;

View file

@@ -143,6 +143,8 @@ qint64 Socket::writeDatagram(const QByteArray& datagram, const HifiSockAddr& soc
}
Connection& Socket::findOrCreateConnection(const HifiSockAddr& sockAddr) {
QMutexLocker locker(&_connectionsMutex);
auto it = _connectionsHash.find(sockAddr);
if (it == _connectionsHash.end()) {
@@ -160,12 +162,16 @@ void Socket::clearConnections() {
return;
}
QMutexLocker locker(&_connectionsMutex);
// clear all of the current connections in the socket
qDebug() << "Clearing all remaining connections in Socket.";
_connectionsHash.clear();
}
void Socket::cleanupConnection(HifiSockAddr sockAddr) {
QMutexLocker locker(&_connectionsMutex);
qCDebug(networking) << "Socket::cleanupConnection called for UDT connection to" << sockAddr;
_connectionsHash.erase(sockAddr);
}

View file

@@ -94,6 +94,8 @@ private:
std::unordered_map<HifiSockAddr, SequenceNumber> _unreliableSequenceNumbers;
std::unordered_map<HifiSockAddr, std::unique_ptr<Connection>> _connectionsHash;
QMutex _connectionsMutex; // guards concurrent access to the connections hash
int _synInterval = 10; // 10ms
QTimer _synTimer;
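
The new _connectionsMutex serializes access to _connectionsHash, presumably because cleanupConnection (for example after a connection's send queue reports itself inactive) can now run while findOrCreateConnection is still servicing traffic. Any further accessor would follow the same QMutexLocker pattern; a hypothetical example (hasConnection is not part of this change):

    // Hypothetical helper illustrating the locking pattern used by the Socket methods above
    bool Socket::hasConnection(const HifiSockAddr& sockAddr) {
        QMutexLocker locker(&_connectionsMutex);  // released automatically when locker leaves scope
        return _connectionsHash.find(sockAddr) != _connectionsHash.end();
    }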