avoid creating multiple send queues from connection

This commit is contained in:
Stephen Birarda 2015-08-28 14:49:27 -07:00
parent d1386fcb14
commit 1e09321b1a
4 changed files with 33 additions and 28 deletions

View file

@ -74,20 +74,22 @@ void Connection::resetRTT() {
SendQueue& Connection::getSendQueue() { SendQueue& Connection::getSendQueue() {
if (!_sendQueue) { if (!_sendQueue) {
// Lasily create send queue std::call_once(_sendQueueCreateFlag, [this](){
_sendQueue = SendQueue::create(_parentSocket, _destination); // Lasily create send queue
_sendQueue = SendQueue::create(_parentSocket, _destination);
qDebug() << "Created SendQueue for connection to" << _destination;
qDebug() << "Created SendQueue for connection to" << _destination;
QObject::connect(_sendQueue.get(), &SendQueue::packetSent, this, &Connection::packetSent);
QObject::connect(_sendQueue.get(), &SendQueue::packetSent, this, &Connection::recordSentPackets); QObject::connect(_sendQueue.get(), &SendQueue::packetSent, this, &Connection::packetSent);
QObject::connect(_sendQueue.get(), &SendQueue::packetRetransmitted, this, &Connection::recordRetransmission); QObject::connect(_sendQueue.get(), &SendQueue::packetSent, this, &Connection::recordSentPackets);
QObject::connect(_sendQueue.get(), &SendQueue::queueInactive, this, &Connection::queueInactive); QObject::connect(_sendQueue.get(), &SendQueue::packetRetransmitted, this, &Connection::recordRetransmission);
QObject::connect(_sendQueue.get(), &SendQueue::queueInactive, this, &Connection::queueInactive);
// set defaults on the send queue from our congestion control object and estimatedTimeout()
_sendQueue->setPacketSendPeriod(_congestionControl->_packetSendPeriod); // set defaults on the send queue from our congestion control object and estimatedTimeout()
_sendQueue->setEstimatedTimeout(estimatedTimeout()); _sendQueue->setPacketSendPeriod(_congestionControl->_packetSendPeriod);
_sendQueue->setFlowWindowSize(std::min(_flowWindowSize, (int) _congestionControl->_congestionWindowSize)); _sendQueue->setEstimatedTimeout(estimatedTimeout());
_sendQueue->setFlowWindowSize(std::min(_flowWindowSize, (int) _congestionControl->_congestionWindowSize));
});
} }
return *_sendQueue; return *_sendQueue;

View file

@ -145,9 +145,10 @@ private:
PacketTimeWindow _receiveWindow { 16, 64 }; // Window of interval between packets (16) and probes (64) for bandwidth and receive speed PacketTimeWindow _receiveWindow { 16, 64 }; // Window of interval between packets (16) and probes (64) for bandwidth and receive speed
std::unique_ptr<CongestionControl> _congestionControl; std::unique_ptr<CongestionControl> _congestionControl;
std::unique_ptr<SendQueue> _sendQueue; std::unique_ptr<SendQueue> _sendQueue;
std::once_flag _sendQueueCreateFlag; // Guards the creation of SendQueue so it only happens once
std::map<MessageNumber, PendingReceivedMessage> _pendingReceivedMessages; std::map<MessageNumber, PendingReceivedMessage> _pendingReceivedMessages;
int _packetsSinceACK { 0 }; // The number of packets that have been received during the current ACK interval int _packetsSinceACK { 0 }; // The number of packets that have been received during the current ACK interval

View file

@ -143,7 +143,7 @@ qint64 Socket::writeDatagram(const QByteArray& datagram, const HifiSockAddr& soc
} }
Connection& Socket::findOrCreateConnection(const HifiSockAddr& sockAddr) { Connection& Socket::findOrCreateConnection(const HifiSockAddr& sockAddr) {
QMutexLocker locker(&_connectionsMutex); QWriteLocker locker(&_connectionsMutex);
auto it = _connectionsHash.find(sockAddr); auto it = _connectionsHash.find(sockAddr);
@ -157,12 +157,7 @@ Connection& Socket::findOrCreateConnection(const HifiSockAddr& sockAddr) {
} }
void Socket::clearConnections() { void Socket::clearConnections() {
if (thread() != QThread::currentThread()) { QWriteLocker locker(&_connectionsMutex);
QMetaObject::invokeMethod(this, "clearConnections", Qt::BlockingQueuedConnection);
return;
}
QMutexLocker locker(&_connectionsMutex);
// clear all of the current connections in the socket // clear all of the current connections in the socket
qDebug() << "Clearing all remaining connections in Socket."; qDebug() << "Clearing all remaining connections in Socket.";
@ -170,7 +165,7 @@ void Socket::clearConnections() {
} }
void Socket::cleanupConnection(HifiSockAddr sockAddr) { void Socket::cleanupConnection(HifiSockAddr sockAddr) {
QMutexLocker locker(&_connectionsMutex); QWriteLocker locker(&_connectionsMutex);
qCDebug(networking) << "Socket::cleanupConnection called for UDT connection to" << sockAddr; qCDebug(networking) << "Socket::cleanupConnection called for UDT connection to" << sockAddr;
_connectionsHash.erase(sockAddr); _connectionsHash.erase(sockAddr);
@ -249,6 +244,8 @@ void Socket::readPendingDatagrams() {
} }
void Socket::connectToSendSignal(const HifiSockAddr& destinationAddr, QObject* receiver, const char* slot) { void Socket::connectToSendSignal(const HifiSockAddr& destinationAddr, QObject* receiver, const char* slot) {
QReadLocker readLocker(&_connectionsMutex);
auto it = _connectionsHash.find(destinationAddr); auto it = _connectionsHash.find(destinationAddr);
if (it != _connectionsHash.end()) { if (it != _connectionsHash.end()) {
connect(it->second.get(), SIGNAL(packetSent()), receiver, slot); connect(it->second.get(), SIGNAL(packetSent()), receiver, slot);
@ -257,11 +254,15 @@ void Socket::connectToSendSignal(const HifiSockAddr& destinationAddr, QObject* r
void Socket::rateControlSync() { void Socket::rateControlSync() {
QReadLocker readLocker(&_connectionsMutex);
// enumerate our list of connections and ask each of them to send off periodic ACK packet for rate control // enumerate our list of connections and ask each of them to send off periodic ACK packet for rate control
for (auto& connection : _connectionsHash) { for (auto& connection : _connectionsHash) {
connection.second->sync(); connection.second->sync();
} }
readLocker.unlock();
if (_synTimer.interval() != _synInterval) { if (_synTimer.interval() != _synInterval) {
// if the _synTimer interval doesn't match the current _synInterval (changes when the CC factory is changed) // if the _synTimer interval doesn't match the current _synInterval (changes when the CC factory is changed)
// then restart it now with the right interval // then restart it now with the right interval
@ -278,9 +279,7 @@ void Socket::setCongestionControlFactory(std::unique_ptr<CongestionControlVirtua
} }
ConnectionStats::Stats Socket::sampleStatsForConnection(const HifiSockAddr& destination) { ConnectionStats::Stats Socket::sampleStatsForConnection(const HifiSockAddr& destination) {
Q_ASSERT_X(thread() == QThread::currentThread(), QReadLocker readLocker(&_connectionsMutex);
"Socket::sampleStatsForConnection",
"Stats sampling for connection must be on socket thread");
auto it = _connectionsHash.find(destination); auto it = _connectionsHash.find(destination);
if (it != _connectionsHash.end()) { if (it != _connectionsHash.end()) {
@ -291,8 +290,11 @@ ConnectionStats::Stats Socket::sampleStatsForConnection(const HifiSockAddr& dest
} }
std::vector<HifiSockAddr> Socket::getConnectionSockAddrs() { std::vector<HifiSockAddr> Socket::getConnectionSockAddrs() {
QReadLocker readLocker(&_connectionsMutex);
std::vector<HifiSockAddr> addr; std::vector<HifiSockAddr> addr;
addr.reserve(_connectionsHash.size()); addr.reserve(_connectionsHash.size());
for (const auto& connectionPair : _connectionsHash) { for (const auto& connectionPair : _connectionsHash) {
addr.push_back(connectionPair.first); addr.push_back(connectionPair.first);
} }

View file

@ -94,7 +94,7 @@ private:
std::unordered_map<HifiSockAddr, SequenceNumber> _unreliableSequenceNumbers; std::unordered_map<HifiSockAddr, SequenceNumber> _unreliableSequenceNumbers;
std::unordered_map<HifiSockAddr, std::unique_ptr<Connection>> _connectionsHash; std::unordered_map<HifiSockAddr, std::unique_ptr<Connection>> _connectionsHash;
QMutex _connectionsMutex; // guards concurrent access to connections hashs QReadWriteLock _connectionsMutex; // guards concurrent access to connections hashs
int _synInterval = 10; // 10ms int _synInterval = 10; // 10ms
QTimer _synTimer; QTimer _synTimer;