replace mutexes with invoked methods in Socket

This commit is contained in:
Stephen Birarda 2015-08-28 16:34:24 -07:00
parent 20d1244db4
commit 42105dfc33
5 changed files with 68 additions and 45 deletions

View file

@ -513,6 +513,7 @@ void AvatarMixer::run() {
qDebug() << "Waiting for domain settings from domain-server.";
// block until we get the settingsRequestComplete signal
QEventLoop loop;
connect(&domainHandler, &DomainHandler::settingsReceived, &loop, &QEventLoop::quit);
connect(&domainHandler, &DomainHandler::settingsReceiveFail, &loop, &QEventLoop::quit);

View file

@ -77,24 +77,20 @@ void Connection::resetRTT() {
SendQueue& Connection::getSendQueue() {
if (!_sendQueue) {
std::unique_lock<std::mutex> locker { _sendQueueMutex };
// Lasily create send queue
_sendQueue = SendQueue::create(_parentSocket, _destination);
if (!_sendQueue) {
// Lasily create send queue
_sendQueue = SendQueue::create(_parentSocket, _destination);
qDebug() << "Created SendQueue for connection to" << _destination;
QObject::connect(_sendQueue.get(), &SendQueue::packetSent, this, &Connection::packetSent);
QObject::connect(_sendQueue.get(), &SendQueue::packetSent, this, &Connection::recordSentPackets);
QObject::connect(_sendQueue.get(), &SendQueue::packetRetransmitted, this, &Connection::recordRetransmission);
QObject::connect(_sendQueue.get(), &SendQueue::queueInactive, this, &Connection::queueInactive);
// set defaults on the send queue from our congestion control object and estimatedTimeout()
_sendQueue->setPacketSendPeriod(_congestionControl->_packetSendPeriod);
_sendQueue->setEstimatedTimeout(estimatedTimeout());
_sendQueue->setFlowWindowSize(std::min(_flowWindowSize, (int) _congestionControl->_congestionWindowSize));
}
qDebug() << "Created SendQueue for connection to" << _destination;
QObject::connect(_sendQueue.get(), &SendQueue::packetSent, this, &Connection::packetSent);
QObject::connect(_sendQueue.get(), &SendQueue::packetSent, this, &Connection::recordSentPackets);
QObject::connect(_sendQueue.get(), &SendQueue::packetRetransmitted, this, &Connection::recordRetransmission);
QObject::connect(_sendQueue.get(), &SendQueue::queueInactive, this, &Connection::queueInactive);
// set defaults on the send queue from our congestion control object and estimatedTimeout()
_sendQueue->setPacketSendPeriod(_congestionControl->_packetSendPeriod);
_sendQueue->setEstimatedTimeout(estimatedTimeout());
_sendQueue->setFlowWindowSize(std::min(_flowWindowSize, (int) _congestionControl->_congestionWindowSize));
}
return *_sendQueue;

View file

@ -149,7 +149,6 @@ private:
std::unique_ptr<CongestionControl> _congestionControl;
std::unique_ptr<SendQueue> _sendQueue;
std::mutex _sendQueueMutex; // Guards the creation of SendQueue so it only happens once
std::map<MessageNumber, PendingReceivedMessage> _pendingReceivedMessages;

View file

@ -24,9 +24,15 @@
using namespace udt;
Q_DECLARE_METATYPE(Packet*);
Q_DECLARE_METATYPE(PacketList*);
Socket::Socket(QObject* parent) :
QObject(parent)
{
qRegisterMetaType<Packet*>();
qRegisterMetaType<PacketList*>();
connect(&_udpSocket, &QUdpSocket::readyRead, this, &Socket::readPendingDatagrams);
// make sure our synchronization method is called every SYN interval
@ -97,8 +103,20 @@ qint64 Socket::writePacket(const Packet& packet, const HifiSockAddr& sockAddr) {
}
qint64 Socket::writePacket(std::unique_ptr<Packet> packet, const HifiSockAddr& sockAddr) {
if (packet->isReliable()) {
findOrCreateConnection(sockAddr).sendReliablePacket(move(packet));
// hand this packet off to writeReliablePacket
// because Qt can't invoke with the unique_ptr we have to release it here and re-construct in writeReliablePacket
if (QThread::currentThread() != thread()) {
qDebug() << "About to invoke with" << packet.get();
QMetaObject::invokeMethod(this, "writeReliablePacket", Qt::QueuedConnection,
Q_ARG(Packet*, packet.release()),
Q_ARG(HifiSockAddr, sockAddr));
} else {
writeReliablePacket(packet.release(), sockAddr);
}
return 0;
}
@ -107,9 +125,17 @@ qint64 Socket::writePacket(std::unique_ptr<Packet> packet, const HifiSockAddr& s
qint64 Socket::writePacketList(std::unique_ptr<PacketList> packetList, const HifiSockAddr& sockAddr) {
if (packetList->isReliable()) {
// Reliable and Ordered
// Reliable and Unordered
findOrCreateConnection(sockAddr).sendReliablePacketList(move(packetList));
// hand this packetList off to writeReliablePacketList
// because Qt can't invoke with the unique_ptr we have to release it here and re-construct in writeReliablePacketList
if (QThread::currentThread() != thread()) {
QMetaObject::invokeMethod(this, "writeReliablePacketList", Qt::QueuedConnection,
Q_ARG(PacketList*, packetList.release()),
Q_ARG(HifiSockAddr, sockAddr));
} else {
writeReliablePacketList(packetList.release(), sockAddr);
}
return 0;
}
@ -122,6 +148,14 @@ qint64 Socket::writePacketList(std::unique_ptr<PacketList> packetList, const Hif
return totalBytesSent;
}
void Socket::writeReliablePacket(Packet* packet, const HifiSockAddr& sockAddr) {
findOrCreateConnection(sockAddr).sendReliablePacket(std::unique_ptr<Packet>(packet));
}
void Socket::writeReliablePacketList(PacketList* packetList, const HifiSockAddr& sockAddr) {
findOrCreateConnection(sockAddr).sendReliablePacketList(std::unique_ptr<PacketList>(packetList));
}
qint64 Socket::writeDatagram(const char* data, qint64 size, const HifiSockAddr& sockAddr) {
return writeDatagram(QByteArray::fromRawData(data, size), sockAddr);
}
@ -143,8 +177,6 @@ qint64 Socket::writeDatagram(const QByteArray& datagram, const HifiSockAddr& soc
}
Connection& Socket::findOrCreateConnection(const HifiSockAddr& sockAddr) {
QWriteLocker locker(&_connectionsMutex);
auto it = _connectionsHash.find(sockAddr);
if (it == _connectionsHash.end()) {
@ -157,7 +189,10 @@ Connection& Socket::findOrCreateConnection(const HifiSockAddr& sockAddr) {
}
void Socket::clearConnections() {
QWriteLocker locker(&_connectionsMutex);
if (QThread::currentThread() != thread()) {
QMetaObject::invokeMethod(this, "clearConnections", Qt::BlockingQueuedConnection);
return;
}
// clear all of the current connections in the socket
qDebug() << "Clearing all remaining connections in Socket.";
@ -165,8 +200,6 @@ void Socket::clearConnections() {
}
void Socket::cleanupConnection(HifiSockAddr sockAddr) {
QWriteLocker locker(&_connectionsMutex);
qCDebug(networking) << "Socket::cleanupConnection called for UDT connection to" << sockAddr;
_connectionsHash.erase(sockAddr);
}
@ -244,8 +277,6 @@ void Socket::readPendingDatagrams() {
}
void Socket::connectToSendSignal(const HifiSockAddr& destinationAddr, QObject* receiver, const char* slot) {
QReadLocker readLocker(&_connectionsMutex);
auto it = _connectionsHash.find(destinationAddr);
if (it != _connectionsHash.end()) {
connect(it->second.get(), SIGNAL(packetSent()), receiver, slot);
@ -254,15 +285,11 @@ void Socket::connectToSendSignal(const HifiSockAddr& destinationAddr, QObject* r
void Socket::rateControlSync() {
QWriteLocker writeLocker(&_connectionsMutex);
// enumerate our list of connections and ask each of them to send off periodic ACK packet for rate control
for (auto& connection : _connectionsHash) {
connection.second->sync();
}
writeLocker.unlock();
if (_synTimer.interval() != _synInterval) {
// if the _synTimer interval doesn't match the current _synInterval (changes when the CC factory is changed)
// then restart it now with the right interval
@ -279,8 +306,6 @@ void Socket::setCongestionControlFactory(std::unique_ptr<CongestionControlVirtua
}
ConnectionStats::Stats Socket::sampleStatsForConnection(const HifiSockAddr& destination) {
QReadLocker readLocker(&_connectionsMutex);
auto it = _connectionsHash.find(destination);
if (it != _connectionsHash.end()) {
return it->second->sampleStats();
@ -289,9 +314,7 @@ ConnectionStats::Stats Socket::sampleStatsForConnection(const HifiSockAddr& dest
}
}
std::vector<HifiSockAddr> Socket::getConnectionSockAddrs() {
QReadLocker readLocker(&_connectionsMutex);
std::vector<HifiSockAddr> Socket::getConnectionSockAddrs() {
std::vector<HifiSockAddr> addr;
addr.reserve(_connectionsHash.size());

View file

@ -32,6 +32,7 @@ class ControlSender;
class Packet;
class PacketList;
class SequenceNumber;
class UDTTest;
using PacketFilterOperator = std::function<bool(const Packet&)>;
@ -65,13 +66,8 @@ public:
{ _unfilteredHandlers[senderSockAddr] = handler; }
void setCongestionControlFactory(std::unique_ptr<CongestionControlVirtualFactory> ccFactory);
void connectToSendSignal(const HifiSockAddr& destinationAddr, QObject* receiver, const char* slot);
void messageReceived(std::unique_ptr<PacketList> packetList);
ConnectionStats::Stats sampleStatsForConnection(const HifiSockAddr& destination);
std::vector<HifiSockAddr> getConnectionSockAddrs();
public slots:
void cleanupConnection(HifiSockAddr sockAddr);
@ -84,6 +80,14 @@ private slots:
private:
void setSystemBufferSizes();
Connection& findOrCreateConnection(const HifiSockAddr& sockAddr);
// privatized methods used by UDTTest - they are private since they must be called on the Socket thread
ConnectionStats::Stats sampleStatsForConnection(const HifiSockAddr& destination);
std::vector<HifiSockAddr> getConnectionSockAddrs();
void connectToSendSignal(const HifiSockAddr& destinationAddr, QObject* receiver, const char* slot);
Q_INVOKABLE void writeReliablePacket(Packet* packet, const HifiSockAddr& sockAddr);
Q_INVOKABLE void writeReliablePacketList(PacketList* packetList, const HifiSockAddr& sockAddr);
QUdpSocket _udpSocket { this };
PacketFilterOperator _packetFilterOperator;
@ -94,12 +98,12 @@ private:
std::unordered_map<HifiSockAddr, SequenceNumber> _unreliableSequenceNumbers;
std::unordered_map<HifiSockAddr, std::unique_ptr<Connection>> _connectionsHash;
QReadWriteLock _connectionsMutex { QReadWriteLock::Recursive }; // guards concurrent access to connections hashs
int _synInterval = 10; // 10ms
QTimer _synTimer;
std::unique_ptr<CongestionControlVirtualFactory> _ccFactory { new CongestionControlFactory<DefaultCC>() };
friend class UDTTest;
};
} // namespace udt