From 24520c5856dd7b2d3e89b41b330a652d45fd42cc Mon Sep 17 00:00:00 2001 From: Atlante45 Date: Tue, 28 Jul 2015 15:32:00 -0700 Subject: [PATCH] LNL send cleanup / Added connection hash --- ice-server/src/IceServer.cpp | 2 +- libraries/networking/src/LimitedNodeList.cpp | 74 +++++++++++--------- libraries/networking/src/LimitedNodeList.h | 10 +-- libraries/networking/src/udt/Connection.cpp | 18 +++-- libraries/networking/src/udt/Connection.h | 7 +- libraries/networking/src/udt/SendQueue.cpp | 4 +- libraries/networking/src/udt/SendQueue.h | 6 +- libraries/networking/src/udt/Socket.cpp | 28 ++++++-- libraries/networking/src/udt/Socket.h | 10 +-- 9 files changed, 97 insertions(+), 62 deletions(-) diff --git a/ice-server/src/IceServer.cpp b/ice-server/src/IceServer.cpp index 9ddb67677d..ec6f5ef825 100644 --- a/ice-server/src/IceServer.cpp +++ b/ice-server/src/IceServer.cpp @@ -145,7 +145,7 @@ void IceServer::sendPeerInformationPacket(const NetworkPeer& peer, const HifiSoc peerPacket->write(peer.toByteArray()); // write the current packet - _serverSocket.writeUnreliablePacket(*peerPacket, *destinationSockAddr); + _serverSocket.writePacket(*peerPacket, *destinationSockAddr); } void IceServer::clearInactivePeers() { diff --git a/libraries/networking/src/LimitedNodeList.cpp b/libraries/networking/src/LimitedNodeList.cpp index edfdd682e9..c705542bf5 100644 --- a/libraries/networking/src/LimitedNodeList.cpp +++ b/libraries/networking/src/LimitedNodeList.cpp @@ -244,18 +244,13 @@ bool LimitedNodeList::packetSourceAndHashMatch(const udt::Packet& packet) { return false; } -qint64 LimitedNodeList::writePacket(const NLPacket& packet, const Node& destinationNode) { - if (!destinationNode.getActiveSocket()) { - return 0; - } - - emit dataSent(destinationNode.getType(), packet.getDataSize()); - - return writePacket(packet, *destinationNode.getActiveSocket(), destinationNode.getConnectionSecret()); +void LimitedNodeList::collectPacketStats(const NLPacket& packet) { + // stat collection for packets + ++_numCollectedPackets; + _numCollectedBytes += packet.getDataSize(); } -qint64 LimitedNodeList::writePacket(const NLPacket& packet, const HifiSockAddr& destinationSockAddr, - const QUuid& connectionSecret) { +void LimitedNodeList::fillPacketHeader(const NLPacket& packet, const QUuid& connectionSecret) { if (!NON_SOURCED_PACKETS.contains(packet.getType())) { const_cast(packet).writeSourceID(getSessionUUID()); } @@ -265,55 +260,66 @@ qint64 LimitedNodeList::writePacket(const NLPacket& packet, const HifiSockAddr& && !NON_VERIFIED_PACKETS.contains(packet.getType())) { const_cast(packet).writeVerificationHashGivenSecret(connectionSecret); } - - emit dataSent(NodeType::Unassigned, packet.getDataSize()); - - return writePacketAndCollectStats(packet, destinationSockAddr); -} - -qint64 LimitedNodeList::writePacketAndCollectStats(const NLPacket& packet, const HifiSockAddr& destinationSockAddr) { - // XXX can BandwidthRecorder be used for this? - // stat collection for packets - ++_numCollectedPackets; - _numCollectedBytes += packet.getDataSize(); - - qint64 bytesWritten = _nodeSocket.writeUnreliablePacket(packet, destinationSockAddr); - - return bytesWritten; } qint64 LimitedNodeList::sendUnreliablePacket(const NLPacket& packet, const Node& destinationNode) { - return writePacket(packet, destinationNode); + if (!destinationNode.getActiveSocket()) { + return 0; + } + emit dataSent(destinationNode.getType(), packet.getDataSize()); + return sendUnreliablePacket(packet, *destinationNode.getActiveSocket(), destinationNode.getConnectionSecret()); } qint64 LimitedNodeList::sendUnreliablePacket(const NLPacket& packet, const HifiSockAddr& sockAddr, const QUuid& connectionSecret) { - return writePacket(packet, sockAddr, connectionSecret); + Q_ASSERT_X(!packet.isReliable(), "LimitedNodeList::sendUnreliablePacket", + "Trying to send a reliable packet unreliably."); + + collectPacketStats(packet); + fillPacketHeader(packet, connectionSecret); + + return _nodeSocket.writePacket(packet, sockAddr); } qint64 LimitedNodeList::sendPacket(std::unique_ptr packet, const Node& destinationNode) { - // Keep unique_ptr alive during write - auto result = writePacket(*packet, destinationNode); - return result; + if (!destinationNode.getActiveSocket()) { + return 0; + } + emit dataSent(destinationNode.getType(), packet->getDataSize()); + return sendPacket(std::move(packet), *destinationNode.getActiveSocket(), destinationNode.getConnectionSecret()); } qint64 LimitedNodeList::sendPacket(std::unique_ptr packet, const HifiSockAddr& sockAddr, const QUuid& connectionSecret) { - // Keep unique_ptr alive during write - auto result = writePacket(*packet, sockAddr, connectionSecret); - return result; + if (packet->isReliable()) { + collectPacketStats(*packet); + fillPacketHeader(*packet, connectionSecret); + + auto size = packet->getDataSize(); + _nodeSocket.writePacket(std::move(packet), sockAddr); + + return size; + } else { + return sendUnreliablePacket(*packet, sockAddr, connectionSecret); + } } qint64 LimitedNodeList::sendPacketList(NLPacketList& packetList, const Node& destinationNode) { + auto activeSocket = destinationNode.getActiveSocket(); + if (!activeSocket) { + return 0; + } qint64 bytesSent = 0; + auto connectionSecret = destinationNode.getConnectionSecret(); // close the last packet in the list packetList.closeCurrentPacket(); while (!packetList._packets.empty()) { - bytesSent += sendPacket(packetList.takeFront(), destinationNode); + bytesSent += sendPacket(packetList.takeFront(), *activeSocket, connectionSecret); } + emit dataSent(destinationNode.getType(), bytesSent); return bytesSent; } diff --git a/libraries/networking/src/LimitedNodeList.h b/libraries/networking/src/LimitedNodeList.h index e09617b88f..9c3ff058fc 100644 --- a/libraries/networking/src/LimitedNodeList.h +++ b/libraries/networking/src/LimitedNodeList.h @@ -247,11 +247,13 @@ protected: LimitedNodeList(LimitedNodeList const&); // Don't implement, needed to avoid copies of singleton void operator=(LimitedNodeList const&); // Don't implement, needed to avoid copies of singleton - qint64 writePacket(const NLPacket& packet, const Node& destinationNode); + qint64 sendPacket(std::unique_ptr packet, const Node& destinationNode, + const HifiSockAddr& overridenSockAddr); qint64 writePacket(const NLPacket& packet, const HifiSockAddr& destinationSockAddr, const QUuid& connectionSecret = QUuid()); - qint64 writePacketAndCollectStats(const NLPacket& packet, const HifiSockAddr& destinationSockAddr); - + void collectPacketStats(const NLPacket& packet); + void fillPacketHeader(const NLPacket& packet, const QUuid& connectionSecret); + bool isPacketVerified(const udt::Packet& packet); bool packetVersionMatch(const udt::Packet& packet); bool packetSourceAndHashMatch(const udt::Packet& packet); @@ -264,8 +266,6 @@ protected: void sendPacketToIceServer(PacketType packetType, const HifiSockAddr& iceServerSockAddr, const QUuid& clientID, const QUuid& peerRequestID = QUuid()); - qint64 sendPacket(std::unique_ptr packet, const Node& destinationNode, - const HifiSockAddr& overridenSockAddr); QUuid _sessionUUID; diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index 8e72a53c69..a46af2bbde 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -20,14 +20,21 @@ using namespace udt; using namespace std; using namespace std::chrono; -Connection::Connection(Socket* parentSocket, HifiSockAddr destination) { - +Connection::Connection(Socket* parentSocket, HifiSockAddr destination) : + _parentSocket(parentSocket), + _destination(destination) +{ } -void Connection::send(unique_ptr packet) { - if (_sendQueue) { - _sendQueue->queuePacket(move(packet)); +void Connection::sendReliablePacket(unique_ptr packet) { + Q_ASSERT_X(packet->isReliable(), "Connection::send", "Trying to send an unreliable packet reliably."); + + if (!_sendQueue) { + // Lasily create send queue + _sendQueue = SendQueue::create(_parentSocket, _destination); } + + _sendQueue->queuePacket(move(packet)); } void Connection::sendACK(bool wasCausedBySyncTimeout) { @@ -163,7 +170,6 @@ void Connection::processControl(unique_ptr controlPacket) { processACK(move(controlPacket)); } break; - } case ControlPacket::ACK2: processACK2(move(controlPacket)); break; diff --git a/libraries/networking/src/udt/Connection.h b/libraries/networking/src/udt/Connection.h index 5bf15f3031..bd3e6b82e4 100644 --- a/libraries/networking/src/udt/Connection.h +++ b/libraries/networking/src/udt/Connection.h @@ -17,8 +17,7 @@ #include "LossList.h" #include "SendQueue.h" - -class HifiSockAddr; +#include "../HifiSockAddr.h" namespace udt { @@ -34,7 +33,7 @@ public: Connection(Socket* parentSocket, HifiSockAddr destination); - void send(std::unique_ptr packet); + void sendReliablePacket(std::unique_ptr packet); void sendACK(bool wasCausedBySyncTimeout = true); void sendLightACK() const; @@ -74,6 +73,8 @@ private: SentACKMap _sentACKs; // Map of ACK sub-sequence numbers to ACKed sequence number and sent time + Socket* _parentSocket { nullptr }; + HifiSockAddr _destination; std::unique_ptr _sendQueue; }; diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index b69b643ec8..ef37bf7f9a 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -80,9 +80,9 @@ void SendQueue::stop() { _running = false; } -void SendQueue::sendPacket(const Packet& packet) { +void SendQueue::sendPacket(const BasePacket& packet) { if (_socket) { - _socket->writePacket(packet, _destination); + _socket->writeDatagram(packet.getData(), packet.getDataSize(), _destination); } } diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index 9e9c5cc024..1c30d1580a 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -47,6 +47,9 @@ public: int getPacketSendPeriod() const { return _packetSendPeriod; } void setPacketSendPeriod(int newPeriod) { _packetSendPeriod = newPeriod; } + // Send a packet through the socket + void sendPacket(const BasePacket& packet); + public slots: void start(); void stop(); @@ -67,9 +70,6 @@ private: // Increments current sequence number and return it SequenceNumber getNextSequenceNumber(); - - // Send a packet through the socket - void sendPacket(const Packet& packet); mutable QReadWriteLock _packetsLock; // Protects the packets to be sent list. std::list> _packets; // List of packets to be sent diff --git a/libraries/networking/src/udt/Socket.cpp b/libraries/networking/src/udt/Socket.cpp index 6923879201..a5719da9a4 100644 --- a/libraries/networking/src/udt/Socket.cpp +++ b/libraries/networking/src/udt/Socket.cpp @@ -14,6 +14,7 @@ #include #include "../NetworkLogging.h" +#include "ControlPacket.h" #include "Packet.h" using namespace udt; @@ -66,17 +67,36 @@ void Socket::setBufferSizes(int numBytes) { } } -qint64 Socket::writeUnreliablePacket(const BasePacket& packet, const HifiSockAddr& sockAddr) { +qint64 Socket::writePacket(const Packet& packet, const HifiSockAddr& sockAddr) { + Q_ASSERT_X(!packet.isReliable(), "Socket::writePacket", "Cannot send a reliable packet unreliably"); + + // TODO: write the correct sequence number to the Packet here + // const_cast(packet).writeSequenceNumber(sequenceNumber); + return writeDatagram(packet.getData(), packet.getDataSize(), sockAddr); } +qint64 Socket::writePacket(std::unique_ptr packet, const HifiSockAddr& sockAddr) { + if (packet->isReliable()) { + auto it = _connectionsHash.find(sockAddr); + if (it == _connectionsHash.end()) { + it = _connectionsHash.insert(it, std::make_pair(sockAddr, new Connection(this, sockAddr))); + } + it->second->sendReliablePacket(std::move(packet)); + return 0; + } + + return writePacket(*packet, sockAddr); +} + +qint64 Socket::writeDatagram(const char* data, qint64 size, const HifiSockAddr& sockAddr) { + return writeDatagram(QByteArray::fromRawData(data, size), sockAddr); +} + qint64 Socket::writeDatagram(const QByteArray& datagram, const HifiSockAddr& sockAddr) { qint64 bytesWritten = _udpSocket.writeDatagram(datagram, sockAddr.getAddress(), sockAddr.getPort()); - // TODO: write the correct sequence number to the Packet here - // const_cast(packet).writeSequenceNumber(sequenceNumber); - if (bytesWritten < 0) { qCDebug(networking) << "ERROR in writeDatagram:" << _udpSocket.error() << "-" << _udpSocket.errorString(); } diff --git a/libraries/networking/src/udt/Socket.h b/libraries/networking/src/udt/Socket.h index 5b8ebb5d0f..24fa8c8404 100644 --- a/libraries/networking/src/udt/Socket.h +++ b/libraries/networking/src/udt/Socket.h @@ -22,6 +22,7 @@ #include #include "../HifiSockAddr.h" +#include "Connection.h" namespace udt { @@ -42,10 +43,10 @@ public: quint16 localPort() const { return _udpSocket.localPort(); } - qint64 writeUnreliablePacket(const BasePacket& packet, const HifiSockAddr& sockAddr); - - qint64 writeDatagram(const char* data, qint64 size, const HifiSockAddr& sockAddr) - { return writeDatagram(QByteArray::fromRawData(data, size), sockAddr); } + // Simple functions writing to the socket with no processing + qint64 writePacket(const Packet& packet, const HifiSockAddr& sockAddr); + qint64 writePacket(std::unique_ptr packet, const HifiSockAddr& sockAddr); + qint64 writeDatagram(const char* data, qint64 size, const HifiSockAddr& sockAddr); qint64 writeDatagram(const QByteArray& datagram, const HifiSockAddr& sockAddr); void bind(const QHostAddress& address, quint16 port = 0) { _udpSocket.bind(address, port); } @@ -71,6 +72,7 @@ private: std::unordered_map _unfilteredHandlers; std::unordered_map _packetSequenceNumbers; + std::unordered_map _connectionsHash; int32_t _synInterval = 10; // 10ms QTimer _synTimer;