diff --git a/libraries/networking/src/udt/Socket.cpp b/libraries/networking/src/udt/Socket.cpp index 44220df8f8..6de43219e5 100644 --- a/libraries/networking/src/udt/Socket.cpp +++ b/libraries/networking/src/udt/Socket.cpp @@ -37,7 +37,6 @@ Socket::Socket(QObject* parent, bool shouldChangeSocketOptions) : _shouldChangeSocketOptions(shouldChangeSocketOptions) { connect(&_udpSocket, &QUdpSocket::readyRead, this, &Socket::readPendingDatagrams); - connect(this, &Socket::pendingDatagrams, this, &Socket::processPendingDatagrams, Qt::QueuedConnection); // make sure we hear about errors and state changes from the underlying socket connect(&_udpSocket, SIGNAL(error(QAbstractSocket::SocketError)), @@ -316,85 +315,64 @@ void Socket::checkForReadyReadBackup() { } void Socket::readPendingDatagrams() { - int packetsRead = 0; - + using namespace std::chrono; + static const auto MAX_PROCESS_TIME { 100ms }; + const auto abortTime = system_clock::now() + MAX_PROCESS_TIME; int packetSizeWithHeader = -1; - // Max datagrams to read before processing: - static const int MAX_DATAGRAMS_CONSECUTIVELY = 10000; - while (_udpSocket.hasPendingDatagrams() - && (packetSizeWithHeader = _udpSocket.pendingDatagramSize()) != -1 - && packetsRead <= MAX_DATAGRAMS_CONSECUTIVELY) { - // grab a time point we can mark as the receive time of this packet - auto receiveTime = p_high_resolution_clock::now(); - - // setup a buffer to read the packet into - auto buffer = std::unique_ptr(new char[packetSizeWithHeader]); - - QHostAddress senderAddress; - quint16 senderPort; - - // pull the datagram - auto sizeRead = _udpSocket.readDatagram(buffer.get(), packetSizeWithHeader, - &senderAddress, &senderPort); - - // we either didn't pull anything for this packet or there was an error reading (this seems to trigger - // on windows even if there's not a packet available) - if (sizeRead < 0) { - continue; + while (_udpSocket.hasPendingDatagrams() && + (packetSizeWithHeader = _udpSocket.pendingDatagramSize()) != -1) { + if (system_clock::now() > abortTime) { + // We've been running for too long, stop processing packets for now + // Once we've processed the event queue, we'll come back to packet processing + break; } - _incomingDatagrams.push_back({ senderAddress, senderPort, packetSizeWithHeader, - std::move(buffer), receiveTime }); - ++packetsRead; - - } - - if (packetsRead > _maxDatagramsRead) { - _maxDatagramsRead = packetsRead; - qCDebug(networking) << "readPendingDatagrams: Datagrams read:" << packetsRead; - } - emit pendingDatagrams(packetsRead); -} - -void Socket::processPendingDatagrams(int) { - // setup a HifiSockAddr to read into - HifiSockAddr senderSockAddr; - - while (!_incomingDatagrams.empty()) { - auto& datagram = _incomingDatagrams.front(); - senderSockAddr.setAddress(datagram._senderAddress); - senderSockAddr.setPort(datagram._senderPort); - int datagramSize = datagram._datagramLength; - auto receiveTime = datagram._receiveTime; - // we're reading a packet so re-start the readyRead backup timer _readyReadBackupTimer->start(); + // grab a time point we can mark as the receive time of this packet + auto receiveTime = p_high_resolution_clock::now(); + + // setup a HifiSockAddr to read into + HifiSockAddr senderSockAddr; + + // setup a buffer to read the packet into + auto buffer = std::unique_ptr(new char[packetSizeWithHeader]); + + // pull the datagram + auto sizeRead = _udpSocket.readDatagram(buffer.get(), packetSizeWithHeader, + senderSockAddr.getAddressPointer(), senderSockAddr.getPortPointer()); + // save information for this packet, in case it is the one that sticks readyRead - _lastPacketSizeRead = datagramSize; + _lastPacketSizeRead = sizeRead; _lastPacketSockAddr = senderSockAddr; - // Process unfiltered packets first. + if (sizeRead <= 0) { + // we either didn't pull anything for this packet or there was an error reading (this seems to trigger + // on windows even if there's not a packet available) + continue; + } + auto it = _unfilteredHandlers.find(senderSockAddr); + if (it != _unfilteredHandlers.end()) { - // we have a registered unfiltered handler for this HifiSockAddr (eg. STUN packet) - call that and return + // we have a registered unfiltered handler for this HifiSockAddr - call that and return if (it->second) { - auto basePacket = BasePacket::fromReceivedPacket(std::move(datagram._datagram), - datagramSize, senderSockAddr); + auto basePacket = BasePacket::fromReceivedPacket(std::move(buffer), packetSizeWithHeader, senderSockAddr); basePacket->setReceiveTime(receiveTime); it->second(std::move(basePacket)); } - _incomingDatagrams.pop_front(); + continue; } // check if this was a control packet or a data packet - bool isControlPacket = *reinterpret_cast(datagram._datagram.get()) & CONTROL_BIT_MASK; + bool isControlPacket = *reinterpret_cast(buffer.get()) & CONTROL_BIT_MASK; if (isControlPacket) { // setup a control packet from the data we just read - auto controlPacket = ControlPacket::fromReceivedPacket(std::move(datagram._datagram), datagramSize, senderSockAddr); + auto controlPacket = ControlPacket::fromReceivedPacket(std::move(buffer), packetSizeWithHeader, senderSockAddr); controlPacket->setReceiveTime(receiveTime); // move this control packet to the matching connection, if there is one @@ -406,13 +384,13 @@ void Socket::processPendingDatagrams(int) { } else { // setup a Packet from the data we just read - auto packet = Packet::fromReceivedPacket(std::move(datagram._datagram), datagramSize, senderSockAddr); + auto packet = Packet::fromReceivedPacket(std::move(buffer), packetSizeWithHeader, senderSockAddr); packet->setReceiveTime(receiveTime); // save the sequence number in case this is the packet that sticks readyRead _lastReceivedSequenceNumber = packet->getSequenceNumber(); - // call our hash verification operator to see if this packet is verified + // call our verification operator to see if this packet is verified if (!_packetFilterOperator || _packetFilterOperator(*packet)) { if (packet->isReliable()) { // if this was a reliable packet then signal the matching connection with the sequence number @@ -426,7 +404,6 @@ void Socket::processPendingDatagrams(int) { qCDebug(networking) << "Can't process packet: version" << (unsigned int)NLPacket::versionInHeader(*packet) << ", type" << NLPacket::typeInHeader(*packet); #endif - _incomingDatagrams.pop_front(); continue; } } @@ -442,8 +419,6 @@ void Socket::processPendingDatagrams(int) { } } } - - _incomingDatagrams.pop_front(); } } diff --git a/libraries/networking/src/udt/Socket.h b/libraries/networking/src/udt/Socket.h index ef4a457116..30058e1d23 100644 --- a/libraries/networking/src/udt/Socket.h +++ b/libraries/networking/src/udt/Socket.h @@ -95,7 +95,6 @@ public: signals: void clientHandshakeRequestComplete(const HifiSockAddr& sockAddr); - void pendingDatagrams(int datagramCount); public slots: void cleanupConnection(HifiSockAddr sockAddr); @@ -103,7 +102,6 @@ public slots: private slots: void readPendingDatagrams(); - void processPendingDatagrams(int datagramCount); void checkForReadyReadBackup(); void handleSocketError(QAbstractSocket::SocketError socketError); @@ -147,17 +145,6 @@ private: int _lastPacketSizeRead { 0 }; SequenceNumber _lastReceivedSequenceNumber; HifiSockAddr _lastPacketSockAddr; - - struct Datagram { - QHostAddress _senderAddress; - int _senderPort; - int _datagramLength; - std::unique_ptr _datagram; - p_high_resolution_clock::time_point _receiveTime; - }; - - std::list _incomingDatagrams; - int _maxDatagramsRead { 0 }; friend UDTTest; };