From 4604bc5a3a3e02fd5398bd2351d86397e13fdc69 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Tue, 21 Jul 2015 12:27:27 -0700 Subject: [PATCH] leverage udt::Socket in ice-server --- ice-server/src/IceServer.cpp | 95 +++++++++----------- ice-server/src/IceServer.h | 7 +- libraries/networking/src/LimitedNodeList.cpp | 10 +-- libraries/networking/src/LimitedNodeList.h | 2 +- libraries/networking/src/udt/Socket.cpp | 4 + libraries/networking/src/udt/Socket.h | 2 + 6 files changed, 57 insertions(+), 63 deletions(-) diff --git a/ice-server/src/IceServer.cpp b/ice-server/src/IceServer.cpp index 1d38802f4e..fb7433aa10 100644 --- a/ice-server/src/IceServer.cpp +++ b/ice-server/src/IceServer.cpp @@ -34,8 +34,9 @@ IceServer::IceServer(int argc, char* argv[]) : qDebug() << "monitoring http endpoint is listening on " << ICE_SERVER_MONITORING_PORT; _serverSocket.bind(QHostAddress::AnyIPv4, ICE_SERVER_DEFAULT_PORT); - // call our process datagrams slot when the UDP socket has packets ready - connect(&_serverSocket, &QUdpSocket::readyRead, this, &IceServer::processDatagrams); + // set processPacket as the verified packet callback for the udt::Socket + using std::placeholders::_1; + _serverSocket.setVerifiedPacketCallback(std::bind(&IceServer::processPacket, this, _1)); // setup our timer to clear inactive peers QTimer* inactivePeerTimer = new QTimer(this); @@ -44,58 +45,45 @@ IceServer::IceServer(int argc, char* argv[]) : } -void IceServer::processDatagrams() { - HifiSockAddr sendingSockAddr; - - while (_serverSocket.hasPendingDatagrams()) { - // setup a buffer to read the packet into - int packetSizeWithHeader = _serverSocket.pendingDatagramSize(); - std::unique_ptr buffer = std::unique_ptr(new char[packetSizeWithHeader]); - - _serverSocket.readDatagram(buffer.get(), packetSizeWithHeader, - sendingSockAddr.getAddressPointer(), sendingSockAddr.getPortPointer()); +void IceServer::processPacket(std::unique_ptr packet) { + PacketType::Value packetType = packet->getType(); + + if (packetType == PacketType::ICEServerHeartbeat) { + SharedNetworkPeer peer = addOrUpdateHeartbeatingPeer(*packet); - auto packet = udt::Packet::fromReceivedPacket(std::move(buffer), packetSizeWithHeader, sendingSockAddr); - - PacketType::Value packetType = packet->getType(); - - if (packetType == PacketType::ICEServerHeartbeat) { - SharedNetworkPeer peer = addOrUpdateHeartbeatingPeer(*packet); - - // so that we can send packets to the heartbeating peer when we need, we need to activate a socket now - peer->activateMatchingOrNewSymmetricSocket(sendingSockAddr); - } else if (packetType == PacketType::ICEServerQuery) { - QDataStream heartbeatStream(packet.get()); - - // this is a node hoping to connect to a heartbeating peer - do we have the heartbeating peer? - QUuid senderUUID; - heartbeatStream >> senderUUID; - - // pull the public and private sock addrs for this peer - HifiSockAddr publicSocket, localSocket; - heartbeatStream >> publicSocket >> localSocket; - - // check if this node also included a UUID that they would like to connect to - QUuid connectRequestID; - heartbeatStream >> connectRequestID; + // so that we can send packets to the heartbeating peer when we need, we need to activate a socket now + peer->activateMatchingOrNewSymmetricSocket(packet->getSenderSockAddr()); + } else if (packetType == PacketType::ICEServerQuery) { + QDataStream heartbeatStream(packet.get()); + + // this is a node hoping to connect to a heartbeating peer - do we have the heartbeating peer? + QUuid senderUUID; + heartbeatStream >> senderUUID; + + // pull the public and private sock addrs for this peer + HifiSockAddr publicSocket, localSocket; + heartbeatStream >> publicSocket >> localSocket; + + // check if this node also included a UUID that they would like to connect to + QUuid connectRequestID; + heartbeatStream >> connectRequestID; + + SharedNetworkPeer matchingPeer = _activePeers.value(connectRequestID); + + if (matchingPeer) { - SharedNetworkPeer matchingPeer = _activePeers.value(connectRequestID); - - if (matchingPeer) { - - qDebug() << "Sending information for peer" << connectRequestID << "to peer" << senderUUID; - - // we have the peer they want to connect to - send them pack the information for that peer - sendPeerInformationPacket(*(matchingPeer.data()), &sendingSockAddr); - - // we also need to send them to the active peer they are hoping to connect to - // create a dummy peer object we can pass to sendPeerInformationPacket - - NetworkPeer dummyPeer(senderUUID, publicSocket, localSocket); - sendPeerInformationPacket(dummyPeer, matchingPeer->getActiveSocket()); - } else { - qDebug() << "Peer" << senderUUID << "asked for" << connectRequestID << "but no matching peer found"; - } + qDebug() << "Sending information for peer" << connectRequestID << "to peer" << senderUUID; + + // we have the peer they want to connect to - send them pack the information for that peer + sendPeerInformationPacket(*(matchingPeer.data()), &packet->getSenderSockAddr()); + + // we also need to send them to the active peer they are hoping to connect to + // create a dummy peer object we can pass to sendPeerInformationPacket + + NetworkPeer dummyPeer(senderUUID, publicSocket, localSocket); + sendPeerInformationPacket(dummyPeer, matchingPeer->getActiveSocket()); + } else { + qDebug() << "Peer" << senderUUID << "asked for" << connectRequestID << "but no matching peer found"; } } } @@ -139,8 +127,7 @@ void IceServer::sendPeerInformationPacket(const NetworkPeer& peer, const HifiSoc peerPacket->write(peer.toByteArray()); // write the current packet - _serverSocket.writeDatagram(peerPacket->getData(), peerPacket->getDataSize(), - destinationSockAddr->getAddress(), destinationSockAddr->getPort()); + _serverSocket.writeUnreliablePacket(*peerPacket, *destinationSockAddr); } void IceServer::clearInactivePeers() { diff --git a/ice-server/src/IceServer.h b/ice-server/src/IceServer.h index e91dc4e064..092ece6c3f 100644 --- a/ice-server/src/IceServer.h +++ b/ice-server/src/IceServer.h @@ -20,6 +20,7 @@ #include #include #include +#include typedef QHash NetworkPeerHash; @@ -29,15 +30,15 @@ public: IceServer(int argc, char* argv[]); bool handleHTTPRequest(HTTPConnection* connection, const QUrl& url, bool skipSubHandler = false); private slots: - void processDatagrams(); void clearInactivePeers(); private: - + void processPacket(std::unique_ptr packet); + SharedNetworkPeer addOrUpdateHeartbeatingPeer(udt::Packet& incomingPacket); void sendPeerInformationPacket(const NetworkPeer& peer, const HifiSockAddr* destinationSockAddr); QUuid _id; - QUdpSocket _serverSocket; + udt::Socket _serverSocket; NetworkPeerHash _activePeers; HTTPManager _httpManager; }; diff --git a/libraries/networking/src/LimitedNodeList.cpp b/libraries/networking/src/LimitedNodeList.cpp index 4d8b7e48a3..45deb80c16 100644 --- a/libraries/networking/src/LimitedNodeList.cpp +++ b/libraries/networking/src/LimitedNodeList.cpp @@ -97,7 +97,7 @@ LimitedNodeList::LimitedNodeList(unsigned short socketListenPort, unsigned short // check the local socket right now updateLocalSockAddr(); - // set &PacketReceiver::handleVerifiedPacket as the verified packet function for the udt::Socket + // set &PacketReceiver::handleVerifiedPacket as the verified packet callback for the udt::Socket using std::placeholders::_1; _nodeSocket.setVerifiedPacketCallback(std::bind(&PacketReceiver::handleVerifiedPacket, _packetReceiver, _1)); @@ -273,16 +273,16 @@ qint64 LimitedNodeList::writePacket(const NLPacket& packet, const HifiSockAddr& emit dataSent(NodeType::Unassigned, packet.getDataSize()); - return writeDatagram(QByteArray::fromRawData(packet.getData(), packet.getDataSize()), destinationSockAddr); + return writePacketAndCollectStats(packet, destinationSockAddr); } -qint64 LimitedNodeList::writeDatagram(const QByteArray& datagram, const HifiSockAddr& destinationSockAddr) { +qint64 LimitedNodeList::writePacketAndCollectStats(const NLPacket& packet, const HifiSockAddr& destinationSockAddr) { // XXX can BandwidthRecorder be used for this? // stat collection for packets ++_numCollectedPackets; - _numCollectedBytes += datagram.size(); + _numCollectedBytes += packet.getDataSize(); - qint64 bytesWritten = _nodeSocket.writeDatagram(datagram, destinationSockAddr); + qint64 bytesWritten = _nodeSocket.writeUnreliablePacket(packet, destinationSockAddr); return bytesWritten; } diff --git a/libraries/networking/src/LimitedNodeList.h b/libraries/networking/src/LimitedNodeList.h index ebfe77cc8b..67dc707815 100644 --- a/libraries/networking/src/LimitedNodeList.h +++ b/libraries/networking/src/LimitedNodeList.h @@ -251,7 +251,7 @@ protected: qint64 writePacket(const NLPacket& packet, const Node& destinationNode); qint64 writePacket(const NLPacket& packet, const HifiSockAddr& destinationSockAddr, const QUuid& connectionSecret = QUuid()); - qint64 writeDatagram(const QByteArray& datagram, const HifiSockAddr& destinationSockAddr); + qint64 writePacketAndCollectStats(const NLPacket& packet, const HifiSockAddr& destinationSockAddr); PacketSequenceNumber getNextSequenceNumberForPacket(const QUuid& nodeUUID, PacketType::Value packetType); diff --git a/libraries/networking/src/udt/Socket.cpp b/libraries/networking/src/udt/Socket.cpp index 0192e306c3..33ad4159c2 100644 --- a/libraries/networking/src/udt/Socket.cpp +++ b/libraries/networking/src/udt/Socket.cpp @@ -57,6 +57,10 @@ void Socket::setBufferSizes(int numBytes) { } } +qint64 Socket::writeUnreliablePacket(const Packet& packet, const HifiSockAddr& sockAddr) { + return writeDatagram(packet.getData(), packet.getDataSize(), sockAddr); +} + qint64 Socket::writeDatagram(const QByteArray& datagram, const HifiSockAddr& sockAddr) { qint64 bytesWritten = _udpSocket.writeDatagram(datagram, sockAddr.getAddress(), sockAddr.getPort()); diff --git a/libraries/networking/src/udt/Socket.h b/libraries/networking/src/udt/Socket.h index 36f11213ff..6997986e91 100644 --- a/libraries/networking/src/udt/Socket.h +++ b/libraries/networking/src/udt/Socket.h @@ -34,6 +34,8 @@ public: quint16 localPort() const { return _udpSocket.localPort(); } + qint64 writeUnreliablePacket(const Packet& packet, const HifiSockAddr& sockAddr); + qint64 writeDatagram(const char* data, qint64 size, const HifiSockAddr& sockAddr) { return writeDatagram(QByteArray::fromRawData(data, size), sockAddr); } qint64 writeDatagram(const QByteArray& datagram, const HifiSockAddr& sockAddr);