Merge pull request #10 from birarda/atp

add udt::Socket, leverage in LimitedNodeList
This commit is contained in:
Stephen Birarda 2015-07-22 14:49:35 -07:00
commit f35004d85c
21 changed files with 582 additions and 375 deletions

View file

@ -652,9 +652,6 @@ void AudioMixer::run() {
auto nodeList = DependencyManager::get<NodeList>(); auto nodeList = DependencyManager::get<NodeList>();
// we do not want this event loop to be the handler for UDP datagrams, so disconnect
disconnect(&nodeList->getNodeSocket(), 0, this, 0);
nodeList->addNodeTypeToInterestSet(NodeType::Agent); nodeList->addNodeTypeToInterestSet(NodeType::Agent);
nodeList->linkedDataCreateCallback = [](Node* node) { nodeList->linkedDataCreateCallback = [](Node* node) {

View file

@ -259,7 +259,7 @@ void DomainServer::setupNodeListAndAssignments(const QUuid& sessionUUID) {
auto nodeList = DependencyManager::set<LimitedNodeList>(domainServerPort, domainServerDTLSPort); auto nodeList = DependencyManager::set<LimitedNodeList>(domainServerPort, domainServerDTLSPort);
// no matter the local port, save it to shared mem so that local assignment clients can ask what it is // no matter the local port, save it to shared mem so that local assignment clients can ask what it is
nodeList->putLocalPortIntoSharedMemory(DOMAIN_SERVER_LOCAL_PORT_SMEM_KEY, this, nodeList->getNodeSocket().localPort()); nodeList->putLocalPortIntoSharedMemory(DOMAIN_SERVER_LOCAL_PORT_SMEM_KEY, this, nodeList->getSocketLocalPort());
// store our local http ports in shared memory // store our local http ports in shared memory
quint16 localHttpPort = DOMAIN_SERVER_HTTP_PORT; quint16 localHttpPort = DOMAIN_SERVER_HTTP_PORT;

View file

@ -34,8 +34,12 @@ IceServer::IceServer(int argc, char* argv[]) :
qDebug() << "monitoring http endpoint is listening on " << ICE_SERVER_MONITORING_PORT; qDebug() << "monitoring http endpoint is listening on " << ICE_SERVER_MONITORING_PORT;
_serverSocket.bind(QHostAddress::AnyIPv4, ICE_SERVER_DEFAULT_PORT); _serverSocket.bind(QHostAddress::AnyIPv4, ICE_SERVER_DEFAULT_PORT);
// call our process datagrams slot when the UDP socket has packets ready // set processPacket as the verified packet callback for the udt::Socket
connect(&_serverSocket, &QUdpSocket::readyRead, this, &IceServer::processDatagrams); using std::placeholders::_1;
_serverSocket.setPacketHandler(std::bind(&IceServer::processPacket, this, _1));
// set packetVersionMatch as the verify packet operator for the udt::Socket
_serverSocket.setPacketFilterOperator(std::bind(&IceServer::packetVersionMatch, this, _1));
// setup our timer to clear inactive peers // setup our timer to clear inactive peers
QTimer* inactivePeerTimer = new QTimer(this); QTimer* inactivePeerTimer = new QTimer(this);
@ -44,63 +48,61 @@ IceServer::IceServer(int argc, char* argv[]) :
} }
void IceServer::processDatagrams() { bool IceServer::packetVersionMatch(const udt::Packet& packet) {
HifiSockAddr sendingSockAddr; if (packet.getVersion() == versionForPacketType(packet.getType())) {
return true;
while (_serverSocket.hasPendingDatagrams()) { } else {
// setup a buffer to read the packet into qDebug() << "Packet version mismatch for packet" << packet.getType()
int packetSizeWithHeader = _serverSocket.pendingDatagramSize(); << "(" << nameForPacketType(packet.getType()) << ") from" << packet.getSenderSockAddr();
std::unique_ptr<char> buffer = std::unique_ptr<char>(new char[packetSizeWithHeader]);
_serverSocket.readDatagram(buffer.get(), packetSizeWithHeader,
sendingSockAddr.getAddressPointer(), sendingSockAddr.getPortPointer());
auto packet = Packet::fromReceivedPacket(std::move(buffer), packetSizeWithHeader, sendingSockAddr); return false;
}
}
PacketType::Value packetType = packet->getType(); void IceServer::processPacket(std::unique_ptr<udt::Packet> packet) {
PacketType::Value packetType = packet->getType();
if (packetType == PacketType::ICEServerHeartbeat) {
SharedNetworkPeer peer = addOrUpdateHeartbeatingPeer(*packet); if (packetType == PacketType::ICEServerHeartbeat) {
SharedNetworkPeer peer = addOrUpdateHeartbeatingPeer(*packet);
// so that we can send packets to the heartbeating peer when we need, we need to activate a socket now
peer->activateMatchingOrNewSymmetricSocket(sendingSockAddr); // so that we can send packets to the heartbeating peer when we need, we need to activate a socket now
} else if (packetType == PacketType::ICEServerQuery) { peer->activateMatchingOrNewSymmetricSocket(packet->getSenderSockAddr());
QDataStream heartbeatStream(packet.get()); } else if (packetType == PacketType::ICEServerQuery) {
QDataStream heartbeatStream(packet.get());
// this is a node hoping to connect to a heartbeating peer - do we have the heartbeating peer?
QUuid senderUUID; // this is a node hoping to connect to a heartbeating peer - do we have the heartbeating peer?
heartbeatStream >> senderUUID; QUuid senderUUID;
heartbeatStream >> senderUUID;
// pull the public and private sock addrs for this peer
HifiSockAddr publicSocket, localSocket; // pull the public and private sock addrs for this peer
heartbeatStream >> publicSocket >> localSocket; HifiSockAddr publicSocket, localSocket;
heartbeatStream >> publicSocket >> localSocket;
// check if this node also included a UUID that they would like to connect to
QUuid connectRequestID; // check if this node also included a UUID that they would like to connect to
heartbeatStream >> connectRequestID; QUuid connectRequestID;
heartbeatStream >> connectRequestID;
SharedNetworkPeer matchingPeer = _activePeers.value(connectRequestID);
if (matchingPeer) {
SharedNetworkPeer matchingPeer = _activePeers.value(connectRequestID); qDebug() << "Sending information for peer" << connectRequestID << "to peer" << senderUUID;
if (matchingPeer) { // we have the peer they want to connect to - send them pack the information for that peer
sendPeerInformationPacket(*matchingPeer, &packet->getSenderSockAddr());
qDebug() << "Sending information for peer" << connectRequestID << "to peer" << senderUUID;
// we also need to send them to the active peer they are hoping to connect to
// we have the peer they want to connect to - send them pack the information for that peer // create a dummy peer object we can pass to sendPeerInformationPacket
sendPeerInformationPacket(*(matchingPeer.data()), &sendingSockAddr);
NetworkPeer dummyPeer(senderUUID, publicSocket, localSocket);
// we also need to send them to the active peer they are hoping to connect to sendPeerInformationPacket(dummyPeer, matchingPeer->getActiveSocket());
// create a dummy peer object we can pass to sendPeerInformationPacket } else {
qDebug() << "Peer" << senderUUID << "asked for" << connectRequestID << "but no matching peer found";
NetworkPeer dummyPeer(senderUUID, publicSocket, localSocket);
sendPeerInformationPacket(dummyPeer, matchingPeer->getActiveSocket());
} else {
qDebug() << "Peer" << senderUUID << "asked for" << connectRequestID << "but no matching peer found";
}
} }
} }
} }
SharedNetworkPeer IceServer::addOrUpdateHeartbeatingPeer(Packet& packet) { SharedNetworkPeer IceServer::addOrUpdateHeartbeatingPeer(udt::Packet& packet) {
// pull the UUID, public and private sock addrs for this peer // pull the UUID, public and private sock addrs for this peer
QUuid senderUUID; QUuid senderUUID;
@ -133,14 +135,13 @@ SharedNetworkPeer IceServer::addOrUpdateHeartbeatingPeer(Packet& packet) {
} }
void IceServer::sendPeerInformationPacket(const NetworkPeer& peer, const HifiSockAddr* destinationSockAddr) { void IceServer::sendPeerInformationPacket(const NetworkPeer& peer, const HifiSockAddr* destinationSockAddr) {
auto peerPacket = Packet::create(PacketType::ICEServerPeerInformation); auto peerPacket = udt::Packet::create(PacketType::ICEServerPeerInformation);
// get the byte array for this peer // get the byte array for this peer
peerPacket->write(peer.toByteArray()); peerPacket->write(peer.toByteArray());
// write the current packet // write the current packet
_serverSocket.writeDatagram(peerPacket->getData(), peerPacket->getDataSize(), _serverSocket.writeUnreliablePacket(*peerPacket, *destinationSockAddr);
destinationSockAddr->getAddress(), destinationSockAddr->getPort());
} }
void IceServer::clearInactivePeers() { void IceServer::clearInactivePeers() {

View file

@ -20,6 +20,7 @@
#include <HTTPConnection.h> #include <HTTPConnection.h>
#include <HTTPManager.h> #include <HTTPManager.h>
#include <udt/Packet.h> #include <udt/Packet.h>
#include <udt/Socket.h>
typedef QHash<QUuid, SharedNetworkPeer> NetworkPeerHash; typedef QHash<QUuid, SharedNetworkPeer> NetworkPeerHash;
@ -29,15 +30,16 @@ public:
IceServer(int argc, char* argv[]); IceServer(int argc, char* argv[]);
bool handleHTTPRequest(HTTPConnection* connection, const QUrl& url, bool skipSubHandler = false); bool handleHTTPRequest(HTTPConnection* connection, const QUrl& url, bool skipSubHandler = false);
private slots: private slots:
void processDatagrams();
void clearInactivePeers(); void clearInactivePeers();
private: private:
bool packetVersionMatch(const udt::Packet& packet);
SharedNetworkPeer addOrUpdateHeartbeatingPeer(Packet& incomingPacket); void processPacket(std::unique_ptr<udt::Packet> packet);
SharedNetworkPeer addOrUpdateHeartbeatingPeer(udt::Packet& incomingPacket);
void sendPeerInformationPacket(const NetworkPeer& peer, const HifiSockAddr* destinationSockAddr); void sendPeerInformationPacket(const NetworkPeer& peer, const HifiSockAddr* destinationSockAddr);
QUuid _id; QUuid _id;
QUdpSocket _serverSocket; udt::Socket _serverSocket;
NetworkPeerHash _activePeers; NetworkPeerHash _activePeers;
HTTPManager _httpManager; HTTPManager _httpManager;
}; };

View file

@ -453,8 +453,7 @@ Application::Application(int& argc, char** argv, QElapsedTimer &startup_time) :
connect(nodeList.data(), &NodeList::uuidChanged, _myAvatar, &MyAvatar::setSessionUUID); connect(nodeList.data(), &NodeList::uuidChanged, _myAvatar, &MyAvatar::setSessionUUID);
connect(nodeList.data(), &NodeList::uuidChanged, this, &Application::setSessionUUID); connect(nodeList.data(), &NodeList::uuidChanged, this, &Application::setSessionUUID);
connect(nodeList.data(), &NodeList::limitOfSilentDomainCheckInsReached, nodeList.data(), &NodeList::reset); connect(nodeList.data(), &NodeList::limitOfSilentDomainCheckInsReached, nodeList.data(), &NodeList::reset);
connect(&nodeList->getPacketReceiver(), &PacketReceiver::packetVersionMismatch, connect(nodeList.data(), &NodeList::packetVersionMismatch, this, &Application::notifyPacketVersionMismatch);
this, &Application::notifyPacketVersionMismatch);
// connect to appropriate slots on AccountManager // connect to appropriate slots on AccountManager
AccountManager& accountManager = AccountManager::getInstance(); AccountManager& accountManager = AccountManager::getInstance();

View file

@ -77,8 +77,8 @@ void OctreePacketProcessor::processPacket(QSharedPointer<NLPacket> packet, Share
const QUuid& senderUUID = packet->getSourceID(); const QUuid& senderUUID = packet->getSourceID();
if (!versionDebugSuppressMap.contains(senderUUID, packetType)) { if (!versionDebugSuppressMap.contains(senderUUID, packetType)) {
qDebug() << "Packet version mismatch on" << packetType << "- Sender" qDebug() << "OctreePacketProcessor - piggyback packet version mismatch on" << packetType << "- Sender"
<< senderUUID << "sent" << (int) packetType << "but" << senderUUID << "sent" << (int) packet->getVersion() << "but"
<< (int) versionForPacketType(packetType) << "expected."; << (int) versionForPacketType(packetType) << "expected.";
emit packetVersionMismatch(); emit packetVersionMismatch();

View file

@ -82,7 +82,7 @@ LimitedNodeList::LimitedNodeList(unsigned short socketListenPort, unsigned short
} }
const int LARGER_BUFFER_SIZE = 1048576; const int LARGER_BUFFER_SIZE = 1048576;
changeSocketBufferSizes(LARGER_BUFFER_SIZE); _nodeSocket.setBufferSizes(LARGER_BUFFER_SIZE);
// check for local socket updates every so often // check for local socket updates every so often
const int LOCAL_SOCKET_UPDATE_INTERVAL_MSECS = 5 * 1000; const int LOCAL_SOCKET_UPDATE_INTERVAL_MSECS = 5 * 1000;
@ -96,11 +96,14 @@ LimitedNodeList::LimitedNodeList(unsigned short socketListenPort, unsigned short
// check the local socket right now // check the local socket right now
updateLocalSockAddr(); updateLocalSockAddr();
// set &PacketReceiver::handleVerifiedPacket as the verified packet callback for the udt::Socket
using std::placeholders::_1;
_nodeSocket.setPacketHandler(std::bind(&PacketReceiver::handleVerifiedPacket, _packetReceiver, _1));
// TODO: Create a new thread, and move PacketReceiver to it // set our isPacketVerified method as the verify operator for the udt::Socket
_nodeSocket.setPacketFilterOperator(std::bind(&LimitedNodeList::isPacketVerified, this, _1));
connect(&_nodeSocket, &QUdpSocket::readyRead, _packetReceiver, &PacketReceiver::processDatagrams);
_packetStatTimer.start(); _packetStatTimer.start();
// make sure we handle STUN response packets // make sure we handle STUN response packets
@ -149,53 +152,77 @@ QUdpSocket& LimitedNodeList::getDTLSSocket() {
return *_dtlsSocket; return *_dtlsSocket;
} }
void LimitedNodeList::changeSocketBufferSizes(int numBytes) { bool LimitedNodeList::isPacketVerified(const udt::Packet& packet) {
for (int i = 0; i < 2; i++) { return packetVersionMatch(packet) && packetSourceAndHashMatch(packet);
QAbstractSocket::SocketOption bufferOpt; }
QString bufferTypeString;
if (i == 0) {
bufferOpt = QAbstractSocket::SendBufferSizeSocketOption;
bufferTypeString = "send";
bool LimitedNodeList::packetVersionMatch(const udt::Packet& packet) {
if (packet.getVersion() != versionForPacketType(packet.getType())) {
static QMultiHash<QUuid, PacketType::Value> sourcedVersionDebugSuppressMap;
static QMultiHash<HifiSockAddr, PacketType::Value> versionDebugSuppressMap;
bool hasBeenOutput = false;
QString senderString;
if (NON_SOURCED_PACKETS.contains(packet.getType())) {
const HifiSockAddr& senderSockAddr = packet.getSenderSockAddr();
hasBeenOutput = versionDebugSuppressMap.contains(senderSockAddr, packet.getType());
if (!hasBeenOutput) {
versionDebugSuppressMap.insert(senderSockAddr, packet.getType());
senderString = QString("%1:%2").arg(senderSockAddr.getAddress().toString()).arg(senderSockAddr.getPort());
}
} else { } else {
bufferOpt = QAbstractSocket::ReceiveBufferSizeSocketOption; QUuid sourceID = QUuid::fromRfc4122(QByteArray::fromRawData(packet.getPayload(), NUM_BYTES_RFC4122_UUID));
bufferTypeString = "receive";
hasBeenOutput = sourcedVersionDebugSuppressMap.contains(sourceID, packet.getType());
if (!hasBeenOutput) {
sourcedVersionDebugSuppressMap.insert(sourceID, packet.getType());
senderString = uuidStringWithoutCurlyBraces(sourceID.toString());
}
} }
int oldBufferSize = _nodeSocket.socketOption(bufferOpt).toInt();
if (oldBufferSize < numBytes) { if (!hasBeenOutput) {
int newBufferSize = _nodeSocket.socketOption(bufferOpt).toInt(); qCDebug(networking) << "Packet version mismatch on" << packet.getType() << "- Sender"
<< senderString << "sent" << qPrintable(QString::number(packet.getVersion())) << "but"
qCDebug(networking) << "Changed socket" << bufferTypeString << "buffer size from" << oldBufferSize << "to" << qPrintable(QString::number(versionForPacketType(packet.getType()))) << "expected.";
<< newBufferSize << "bytes";
} else { emit packetVersionMismatch(packet.getType());
// don't make the buffer smaller
qCDebug(networking) << "Did not change socket" << bufferTypeString << "buffer size from" << oldBufferSize
<< "since it is larger than desired size of" << numBytes;
} }
return false;
} else {
return true;
} }
} }
bool LimitedNodeList::packetSourceAndHashMatch(const NLPacket& packet, SharedNodePointer& matchingNode) { bool LimitedNodeList::packetSourceAndHashMatch(const udt::Packet& packet) {
if (NON_SOURCED_PACKETS.contains(packet.getType())) { if (NON_SOURCED_PACKETS.contains(packet.getType())) {
return true; return true;
} else { } else {
QUuid sourceID = NLPacket::sourceIDInHeader(packet);
// figure out which node this is from // figure out which node this is from
matchingNode = nodeWithUUID(packet.getSourceID()); SharedNodePointer matchingNode = nodeWithUUID(sourceID);
if (matchingNode) { if (matchingNode) {
if (!NON_VERIFIED_PACKETS.contains(packet.getType())) { if (!NON_VERIFIED_PACKETS.contains(packet.getType())) {
QByteArray packetHeaderHash = NLPacket::verificationHashInHeader(packet);
QByteArray expectedHash = NLPacket::hashForPacketAndSecret(packet, matchingNode->getConnectionSecret());
// check if the md5 hash in the header matches the hash we would expect // check if the md5 hash in the header matches the hash we would expect
if (packet.getVerificationHash() != packet.payloadHashWithConnectionUUID(matchingNode->getConnectionSecret())) { if (packetHeaderHash != expectedHash) {
static QMultiMap<QUuid, PacketType::Value> hashDebugSuppressMap; static QMultiMap<QUuid, PacketType::Value> hashDebugSuppressMap;
const QUuid& senderID = packet.getSourceID(); if (!hashDebugSuppressMap.contains(sourceID, packet.getType())) {
qCDebug(networking) << "Packet hash mismatch on" << packet.getType() << "- Sender" << sourceID;
if (!hashDebugSuppressMap.contains(senderID, packet.getType())) { hashDebugSuppressMap.insert(sourceID, packet.getType());
qCDebug(networking) << "Packet hash mismatch on" << packet.getType() << "- Sender" << senderID;
hashDebugSuppressMap.insert(senderID, packet.getType());
} }
return false; return false;
@ -209,7 +236,7 @@ bool LimitedNodeList::packetSourceAndHashMatch(const NLPacket& packet, SharedNod
= LogHandler::getInstance().addRepeatedMessageRegex("Packet of type \\d+ \\([\\sa-zA-Z]+\\) received from unknown node with UUID"); = LogHandler::getInstance().addRepeatedMessageRegex("Packet of type \\d+ \\([\\sa-zA-Z]+\\) received from unknown node with UUID");
qCDebug(networking) << "Packet of type" << packet.getType() << "(" << qPrintable(nameForPacketType(packet.getType())) << ")" qCDebug(networking) << "Packet of type" << packet.getType() << "(" << qPrintable(nameForPacketType(packet.getType())) << ")"
<< "received from unknown node with UUID" << qPrintable(uuidStringWithoutCurlyBraces(packet.getSourceID())); << "received from unknown node with UUID" << qPrintable(uuidStringWithoutCurlyBraces(sourceID));
} }
} }
@ -241,26 +268,21 @@ qint64 LimitedNodeList::writePacket(const NLPacket& packet, const HifiSockAddr&
if (!connectionSecret.isNull() if (!connectionSecret.isNull()
&& !NON_SOURCED_PACKETS.contains(packet.getType()) && !NON_SOURCED_PACKETS.contains(packet.getType())
&& !NON_VERIFIED_PACKETS.contains(packet.getType())) { && !NON_VERIFIED_PACKETS.contains(packet.getType())) {
const_cast<NLPacket&>(packet).writeVerificationHash(packet.payloadHashWithConnectionUUID(connectionSecret)); const_cast<NLPacket&>(packet).writeVerificationHashGivenSecret(connectionSecret);
} }
emit dataSent(NodeType::Unassigned, packet.getDataSize()); emit dataSent(NodeType::Unassigned, packet.getDataSize());
return writeDatagram(QByteArray::fromRawData(packet.getData(), packet.getDataSize()), destinationSockAddr); return writePacketAndCollectStats(packet, destinationSockAddr);
} }
qint64 LimitedNodeList::writeDatagram(const QByteArray& datagram, const HifiSockAddr& destinationSockAddr) { qint64 LimitedNodeList::writePacketAndCollectStats(const NLPacket& packet, const HifiSockAddr& destinationSockAddr) {
// XXX can BandwidthRecorder be used for this? // XXX can BandwidthRecorder be used for this?
// stat collection for packets // stat collection for packets
++_numCollectedPackets; ++_numCollectedPackets;
_numCollectedBytes += datagram.size(); _numCollectedBytes += packet.getDataSize();
qint64 bytesWritten = _nodeSocket.writeDatagram(datagram, qint64 bytesWritten = _nodeSocket.writeUnreliablePacket(packet, destinationSockAddr);
destinationSockAddr.getAddress(), destinationSockAddr.getPort());
if (bytesWritten < 0) {
qCDebug(networking) << "ERROR in writeDatagram:" << _nodeSocket.error() << "-" << _nodeSocket.errorString();
}
return bytesWritten; return bytesWritten;
} }
@ -571,7 +593,7 @@ void LimitedNodeList::sendSTUNRequest() {
++_numInitialSTUNRequests; ++_numInitialSTUNRequests;
} }
unsigned char stunRequestPacket[NUM_BYTES_STUN_HEADER]; char stunRequestPacket[NUM_BYTES_STUN_HEADER];
int packetIndex = 0; int packetIndex = 0;
@ -597,15 +619,7 @@ void LimitedNodeList::sendSTUNRequest() {
flagTimeForConnectionStep(ConnectionStep::SendSTUNRequest); flagTimeForConnectionStep(ConnectionStep::SendSTUNRequest);
_nodeSocket.writeDatagram((char*) stunRequestPacket, sizeof(stunRequestPacket), _nodeSocket.writeDatagram(stunRequestPacket, sizeof(stunRequestPacket), _stunSockAddr);
_stunSockAddr.getAddress(), _stunSockAddr.getPort());
}
void LimitedNodeList::rebindNodeSocket() {
quint16 oldPort = _nodeSocket.localPort();
_nodeSocket.close();
_nodeSocket.bind(QHostAddress::AnyIPv4, oldPort);
} }
bool LimitedNodeList::processSTUNResponse(QSharedPointer<NLPacket> packet) { bool LimitedNodeList::processSTUNResponse(QSharedPointer<NLPacket> packet) {
@ -696,6 +710,7 @@ void LimitedNodeList::startSTUNPublicSocketUpdate() {
// if we don't know the STUN IP yet we need to have ourselves be called once it is known // if we don't know the STUN IP yet we need to have ourselves be called once it is known
if (_stunSockAddr.getAddress().isNull()) { if (_stunSockAddr.getAddress().isNull()) {
connect(&_stunSockAddr, &HifiSockAddr::lookupCompleted, this, &LimitedNodeList::startSTUNPublicSocketUpdate); connect(&_stunSockAddr, &HifiSockAddr::lookupCompleted, this, &LimitedNodeList::startSTUNPublicSocketUpdate);
connect(&_stunSockAddr, &HifiSockAddr::lookupCompleted, this, &LimitedNodeList::addSTUNSockAddrToUnfiltered);
// in case we just completely fail to lookup the stun socket - add a 10s timeout that will trigger the fail case // in case we just completely fail to lookup the stun socket - add a 10s timeout that will trigger the fail case
const quint64 STUN_DNS_LOOKUP_TIMEOUT_MSECS = 10 * 1000; const quint64 STUN_DNS_LOOKUP_TIMEOUT_MSECS = 10 * 1000;

View file

@ -37,9 +37,10 @@
#include "DomainHandler.h" #include "DomainHandler.h"
#include "Node.h" #include "Node.h"
#include "NLPacket.h" #include "NLPacket.h"
#include "udt/PacketHeaders.h"
#include "PacketReceiver.h" #include "PacketReceiver.h"
#include "NLPacketList.h" #include "NLPacketList.h"
#include "udt/PacketHeaders.h"
#include "udt/Socket.h"
#include "UUIDHasher.h" #include "UUIDHasher.h"
const quint64 NODE_SILENCE_THRESHOLD_MSECS = 2 * 1000; const quint64 NODE_SILENCE_THRESHOLD_MSECS = 2 * 1000;
@ -109,13 +110,10 @@ public:
bool getThisNodeCanRez() const { return _thisNodeCanRez; } bool getThisNodeCanRez() const { return _thisNodeCanRez; }
void setThisNodeCanRez(bool canRez); void setThisNodeCanRez(bool canRez);
void rebindNodeSocket(); quint16 getSocketLocalPort() const { return _nodeSocket.localPort(); }
QUdpSocket& getNodeSocket() { return _nodeSocket; }
QUdpSocket& getDTLSSocket(); QUdpSocket& getDTLSSocket();
bool packetSourceAndHashMatch(const NLPacket& packet, SharedNodePointer& matchingNode);
PacketReceiver& getPacketReceiver() { return *_packetReceiver; } PacketReceiver& getPacketReceiver() { return *_packetReceiver; }
qint64 sendUnreliablePacket(const NLPacket& packet, const Node& destinationNode); qint64 sendUnreliablePacket(const NLPacket& packet, const Node& destinationNode);
@ -233,6 +231,7 @@ public slots:
signals: signals:
void dataSent(quint8 channelType, int bytes); void dataSent(quint8 channelType, int bytes);
void packetVersionMismatch(PacketType::Value type);
void uuidChanged(const QUuid& ownerUUID, const QUuid& oldUUID); void uuidChanged(const QUuid& ownerUUID, const QUuid& oldUUID);
void nodeAdded(SharedNodePointer); void nodeAdded(SharedNodePointer);
@ -252,11 +251,13 @@ protected:
qint64 writePacket(const NLPacket& packet, const Node& destinationNode); qint64 writePacket(const NLPacket& packet, const Node& destinationNode);
qint64 writePacket(const NLPacket& packet, const HifiSockAddr& destinationSockAddr, qint64 writePacket(const NLPacket& packet, const HifiSockAddr& destinationSockAddr,
const QUuid& connectionSecret = QUuid()); const QUuid& connectionSecret = QUuid());
qint64 writeDatagram(const QByteArray& datagram, const HifiSockAddr& destinationSockAddr); qint64 writePacketAndCollectStats(const NLPacket& packet, const HifiSockAddr& destinationSockAddr);
PacketSequenceNumber getNextSequenceNumberForPacket(const QUuid& nodeUUID, PacketType::Value packetType); PacketSequenceNumber getNextSequenceNumberForPacket(const QUuid& nodeUUID, PacketType::Value packetType);
void changeSocketBufferSizes(int numBytes); bool isPacketVerified(const udt::Packet& packet);
bool packetVersionMatch(const udt::Packet& packet);
bool packetSourceAndHashMatch(const udt::Packet& packet);
void handleNodeKill(const SharedNodePointer& node); void handleNodeKill(const SharedNodePointer& node);
@ -272,7 +273,7 @@ protected:
QUuid _sessionUUID; QUuid _sessionUUID;
NodeHash _nodeHash; NodeHash _nodeHash;
QReadWriteLock _nodeMutex; QReadWriteLock _nodeMutex;
QUdpSocket _nodeSocket; udt::Socket _nodeSocket;
QUdpSocket* _dtlsSocket; QUdpSocket* _dtlsSocket;
HifiSockAddr _localSockAddr; HifiSockAddr _localSockAddr;
HifiSockAddr _publicSockAddr; HifiSockAddr _publicSockAddr;
@ -309,9 +310,11 @@ protected:
functor(it); functor(it);
} }
} }
private slots: private slots:
void flagTimeForConnectionStep(ConnectionStep connectionStep, quint64 timestamp); void flagTimeForConnectionStep(ConnectionStep connectionStep, quint64 timestamp);
void possiblyTimeoutSTUNAddressLookup(); void possiblyTimeoutSTUNAddressLookup();
void addSTUNSockAddrToUnfiltered() { _nodeSocket.addUnfilteredSockAddr(_stunSockAddr); } // called once STUN socket known
}; };
#endif // hifi_LimitedNodeList_h #endif // hifi_LimitedNodeList_h

View file

@ -63,6 +63,14 @@ std::unique_ptr<NLPacket> NLPacket::fromReceivedPacket(std::unique_ptr<char> dat
} }
std::unique_ptr<NLPacket> NLPacket::fromBase(std::unique_ptr<Packet> packet) {
// Fail with null packet
Q_ASSERT(packet);
// call our constructor to create an NLPacket from this Packet
return std::unique_ptr<NLPacket>(new NLPacket(std::move(packet)));
}
std::unique_ptr<NLPacket> NLPacket::createCopy(const NLPacket& other) { std::unique_ptr<NLPacket> NLPacket::createCopy(const NLPacket& other) {
return std::unique_ptr<NLPacket>(new NLPacket(other)); return std::unique_ptr<NLPacket>(new NLPacket(other));
} }
@ -81,37 +89,88 @@ NLPacket::NLPacket(PacketType::Value type) :
adjustPayloadStartAndCapacity(); adjustPayloadStartAndCapacity();
} }
NLPacket::NLPacket(const NLPacket& other) : Packet(other) { NLPacket::NLPacket(std::unique_ptr<Packet> packet) :
Packet(std::move(*packet.release()))
{
adjustPayloadStartAndCapacity(_payloadSize > 0);
readSourceID();
}
NLPacket::NLPacket(const NLPacket& other) : Packet(other) {
_sourceID = other._sourceID;
} }
NLPacket::NLPacket(std::unique_ptr<char> data, qint64 size, const HifiSockAddr& senderSockAddr) : NLPacket::NLPacket(std::unique_ptr<char> data, qint64 size, const HifiSockAddr& senderSockAddr) :
Packet(std::move(data), size, senderSockAddr) Packet(std::move(data), size, senderSockAddr)
{ {
adjustPayloadStartAndCapacity(); // sanity check before we decrease the payloadSize with the payloadCapacity
_payloadSize = _payloadCapacity; Q_ASSERT(_payloadSize == _payloadCapacity);
adjustPayloadStartAndCapacity(_payloadSize > 0);
readSourceID(); readSourceID();
readVerificationHash();
} }
void NLPacket::adjustPayloadStartAndCapacity() { NLPacket::NLPacket(NLPacket&& other) :
Packet(other)
{
_sourceID = std::move(other._sourceID);
}
NLPacket& NLPacket::operator=(const NLPacket& other) {
Packet::operator=(other);
_sourceID = other._sourceID;
return *this;
}
NLPacket& NLPacket::operator=(NLPacket&& other) {
Packet::operator=(std::move(other));
_sourceID = std::move(other._sourceID);
return *this;
}
QUuid NLPacket::sourceIDInHeader(const udt::Packet& packet) {
int offset = packet.Packet::localHeaderSize();
return QUuid::fromRfc4122(QByteArray::fromRawData(packet.getData() + offset, NUM_BYTES_RFC4122_UUID));
}
QByteArray NLPacket::verificationHashInHeader(const udt::Packet& packet) {
int offset = packet.Packet::localHeaderSize() + NUM_BYTES_RFC4122_UUID;
return QByteArray(packet.getData() + offset, NUM_BYTES_MD5_HASH);
}
QByteArray NLPacket::hashForPacketAndSecret(const udt::Packet& packet, const QUuid& connectionSecret) {
QCryptographicHash hash(QCryptographicHash::Md5);
int offset = packet.Packet::localHeaderSize() + NUM_BYTES_RFC4122_UUID + NUM_BYTES_MD5_HASH;
// add the packet payload and the connection UUID
hash.addData(packet.getData() + offset, packet.getDataSize() - offset);
hash.addData(connectionSecret.toRfc4122());
// return the hash
return hash.result();
}
void NLPacket::adjustPayloadStartAndCapacity(bool shouldDecreasePayloadSize) {
qint64 headerSize = localHeaderSize(_type); qint64 headerSize = localHeaderSize(_type);
_payloadStart += headerSize; _payloadStart += headerSize;
_payloadCapacity -= headerSize; _payloadCapacity -= headerSize;
if (shouldDecreasePayloadSize) {
_payloadSize -= headerSize;
}
} }
void NLPacket::readSourceID() { void NLPacket::readSourceID() {
if (!NON_SOURCED_PACKETS.contains(_type)) { if (!NON_SOURCED_PACKETS.contains(_type)) {
auto offset = Packet::localHeaderSize(); _sourceID = sourceIDInHeader(*this);
_sourceID = QUuid::fromRfc4122(QByteArray::fromRawData(_packet.get() + offset, NUM_BYTES_RFC4122_UUID));
}
}
void NLPacket::readVerificationHash() {
if (!NON_SOURCED_PACKETS.contains(_type) && !NON_VERIFIED_PACKETS.contains(_type)) {
auto offset = Packet::localHeaderSize() + NUM_BYTES_RFC4122_UUID;
_verificationHash = QByteArray(_packet.get() + offset, NUM_BYTES_MD5_HASH);
} }
} }
@ -124,22 +183,11 @@ void NLPacket::writeSourceID(const QUuid& sourceID) {
_sourceID = sourceID; _sourceID = sourceID;
} }
void NLPacket::writeVerificationHash(const QByteArray& verificationHash) { void NLPacket::writeVerificationHashGivenSecret(const QUuid& connectionSecret) {
Q_ASSERT(!NON_SOURCED_PACKETS.contains(_type) && !NON_VERIFIED_PACKETS.contains(_type)); Q_ASSERT(!NON_SOURCED_PACKETS.contains(_type) && !NON_VERIFIED_PACKETS.contains(_type));
auto offset = Packet::localHeaderSize() + NUM_BYTES_RFC4122_UUID; auto offset = Packet::localHeaderSize() + NUM_BYTES_RFC4122_UUID;
QByteArray verificationHash = hashForPacketAndSecret(*this, connectionSecret);
memcpy(_packet.get() + offset, verificationHash.data(), verificationHash.size()); memcpy(_packet.get() + offset, verificationHash.data(), verificationHash.size());
_verificationHash = verificationHash;
}
QByteArray NLPacket::payloadHashWithConnectionUUID(const QUuid& connectionUUID) const {
QCryptographicHash hash(QCryptographicHash::Md5);
// add the packet payload and the connection UUID
hash.addData(_payloadStart, _payloadSize);
hash.addData(connectionUUID.toRfc4122());
// return the hash
return hash.result();
} }

View file

@ -16,14 +16,20 @@
#include "udt/Packet.h" #include "udt/Packet.h"
class NLPacket : public Packet { class NLPacket : public udt::Packet {
Q_OBJECT Q_OBJECT
public: public:
static std::unique_ptr<NLPacket> create(PacketType::Value type, qint64 size = -1); static std::unique_ptr<NLPacket> create(PacketType::Value type, qint64 size = -1);
static std::unique_ptr<NLPacket> fromReceivedPacket(std::unique_ptr<char> data, qint64 size, static std::unique_ptr<NLPacket> fromReceivedPacket(std::unique_ptr<char> data, qint64 size,
const HifiSockAddr& senderSockAddr); const HifiSockAddr& senderSockAddr);
static std::unique_ptr<NLPacket> fromBase(std::unique_ptr<Packet> packet);
// Provided for convenience, try to limit use // Provided for convenience, try to limit use
static std::unique_ptr<NLPacket> createCopy(const NLPacket& other); static std::unique_ptr<NLPacket> createCopy(const NLPacket& other);
static QUuid sourceIDInHeader(const udt::Packet& packet);
static QByteArray verificationHashInHeader(const udt::Packet& packet);
static QByteArray hashForPacketAndSecret(const udt::Packet& packet, const QUuid& connectionSecret);
static qint64 localHeaderSize(PacketType::Value type); static qint64 localHeaderSize(PacketType::Value type);
static qint64 maxPayloadSize(PacketType::Value type); static qint64 maxPayloadSize(PacketType::Value type);
@ -32,27 +38,27 @@ public:
virtual qint64 localHeaderSize() const; // Current level's header size virtual qint64 localHeaderSize() const; // Current level's header size
const QUuid& getSourceID() const { return _sourceID; } const QUuid& getSourceID() const { return _sourceID; }
const QByteArray& getVerificationHash() const { return _verificationHash; }
void writeSourceID(const QUuid& sourceID); void writeSourceID(const QUuid& sourceID);
void writeVerificationHash(const QByteArray& verificationHash); void writeVerificationHashGivenSecret(const QUuid& connectionSecret);
QByteArray payloadHashWithConnectionUUID(const QUuid& connectionUUID) const;
protected: protected:
void adjustPayloadStartAndCapacity(); void adjustPayloadStartAndCapacity(bool shouldDecreasePayloadSize = false);
NLPacket(PacketType::Value type); NLPacket(PacketType::Value type);
NLPacket(PacketType::Value type, qint64 size); NLPacket(PacketType::Value type, qint64 size);
NLPacket(std::unique_ptr<char> data, qint64 size, const HifiSockAddr& senderSockAddr); NLPacket(std::unique_ptr<char> data, qint64 size, const HifiSockAddr& senderSockAddr);
NLPacket(std::unique_ptr<Packet> packet);
NLPacket(const NLPacket& other); NLPacket(const NLPacket& other);
NLPacket(NLPacket&& other);
NLPacket& operator=(const NLPacket& other);
NLPacket& operator=(NLPacket&& other);
void readSourceID(); void readSourceID();
void readVerificationHash();
QUuid _sourceID; QUuid _sourceID;
QByteArray _verificationHash;
}; };
#endif // hifi_NLPacket_h #endif // hifi_NLPacket_h

View file

@ -19,7 +19,7 @@ NLPacketList::NLPacketList(PacketType::Value packetType, QByteArray extendedHead
} }
std::unique_ptr<Packet> NLPacketList::createPacket() { std::unique_ptr<udt::Packet> NLPacketList::createPacket() {
return NLPacket::create(getType()); return NLPacket::create(getType());
} }

View file

@ -14,7 +14,7 @@
#include "udt/PacketList.h" #include "udt/PacketList.h"
class NLPacketList : public PacketList { class NLPacketList : public udt::PacketList {
public: public:
NLPacketList(PacketType::Value packetType, QByteArray extendedHeader = QByteArray()); NLPacketList(PacketType::Value packetType, QByteArray extendedHeader = QByteArray());
@ -22,7 +22,7 @@ private:
NLPacketList(const NLPacketList& other) = delete; NLPacketList(const NLPacketList& other) = delete;
NLPacketList& operator=(const NLPacketList& other) = delete; NLPacketList& operator=(const NLPacketList& other) = delete;
virtual std::unique_ptr<Packet> createPacket(); virtual std::unique_ptr<udt::Packet> createPacket();
}; };
#endif // hifi_PacketList_h #endif // hifi_PacketList_h

View file

@ -54,12 +54,14 @@ void NetworkPeer::setPublicSocket(const HifiSockAddr& publicSocket) {
// if the active socket was the public socket then reset it to NULL // if the active socket was the public socket then reset it to NULL
_activeSocket = NULL; _activeSocket = NULL;
} }
if (!_publicSocket.isNull()) { bool wasOldSocketNull = _publicSocket.isNull();
qCDebug(networking) << "Public socket change for node" << *this;
}
_publicSocket = publicSocket; _publicSocket = publicSocket;
if (!wasOldSocketNull) {
qCDebug(networking) << "Public socket change for node" << *this;
}
} }
} }
@ -69,12 +71,14 @@ void NetworkPeer::setLocalSocket(const HifiSockAddr& localSocket) {
// if the active socket was the local socket then reset it to NULL // if the active socket was the local socket then reset it to NULL
_activeSocket = NULL; _activeSocket = NULL;
} }
bool wasOldSocketNull = _localSocket.isNull();
_localSocket = localSocket;
if (!_localSocket.isNull()) { if (!wasOldSocketNull) {
qCDebug(networking) << "Local socket change for node" << *this; qCDebug(networking) << "Local socket change for node" << *this;
} }
_localSocket = localSocket;
} }
} }
@ -84,12 +88,14 @@ void NetworkPeer::setSymmetricSocket(const HifiSockAddr& symmetricSocket) {
// if the active socket was the symmetric socket then reset it to NULL // if the active socket was the symmetric socket then reset it to NULL
_activeSocket = NULL; _activeSocket = NULL;
} }
if (!_symmetricSocket.isNull()) { bool wasOldSocketNull = _symmetricSocket.isNull();
_symmetricSocket = symmetricSocket;
if (!wasOldSocketNull) {
qCDebug(networking) << "Symmetric socket change for node" << *this; qCDebug(networking) << "Symmetric socket change for node" << *this;
} }
_symmetricSocket = symmetricSocket;
} }
} }

View file

@ -192,189 +192,134 @@ void PacketReceiver::unregisterListener(QObject* listener) {
_directConnectSetMutex.unlock(); _directConnectSetMutex.unlock();
} }
bool PacketReceiver::packetVersionMatch(const NLPacket& packet) { void PacketReceiver::handleVerifiedPacket(std::unique_ptr<udt::Packet> packet) {
if (packet.getVersion() != versionForPacketType(packet.getType()) // if we're supposed to drop this packet then break out here
&& packet.getType() != PacketType::StunResponse) { if (_shouldDropPackets) {
return;
static QMultiHash<QUuid, PacketType::Value> sourcedVersionDebugSuppressMap;
static QMultiHash<HifiSockAddr, PacketType::Value> versionDebugSuppressMap;
bool hasBeenOutput = false;
QString senderString;
if (NON_SOURCED_PACKETS.contains(packet.getType())) {
const HifiSockAddr& senderSockAddr = packet.getSenderSockAddr();
hasBeenOutput = versionDebugSuppressMap.contains(senderSockAddr, packet.getType());
if (!hasBeenOutput) {
versionDebugSuppressMap.insert(senderSockAddr, packet.getType());
senderString = QString("%1:%2").arg(senderSockAddr.getAddress().toString()).arg(senderSockAddr.getPort());
}
} else {
hasBeenOutput = sourcedVersionDebugSuppressMap.contains(packet.getSourceID(), packet.getType());
if (!hasBeenOutput) {
sourcedVersionDebugSuppressMap.insert(packet.getSourceID(), packet.getType());
senderString = uuidStringWithoutCurlyBraces(packet.getSourceID().toString());
}
}
if (!hasBeenOutput) {
qCDebug(networking) << "Packet version mismatch on" << packet.getType() << "- Sender"
<< senderString << "sent" << qPrintable(QString::number(packet.getVersion())) << "but"
<< qPrintable(QString::number(versionForPacketType(packet.getType()))) << "expected.";
emit packetVersionMismatch(packet.getType());
}
return false;
} else {
return true;
} }
}
void PacketReceiver::processDatagrams() {
//PerformanceWarning warn(Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings),
//"PacketReceiver::processDatagrams()");
auto nodeList = DependencyManager::get<LimitedNodeList>(); auto nodeList = DependencyManager::get<LimitedNodeList>();
while (nodeList && nodeList->getNodeSocket().hasPendingDatagrams()) { // setup a HifiSockAddr to read into
// setup a buffer to read the packet into HifiSockAddr senderSockAddr;
int packetSizeWithHeader = nodeList->getNodeSocket().pendingDatagramSize();
std::unique_ptr<char> buffer = std::unique_ptr<char>(new char[packetSizeWithHeader]); // setup an NLPacket from the data we just read
auto nlPacket = NLPacket::fromBase(std::move(packet));
// if we're supposed to drop this packet then break out here
if (_shouldDropPackets) { _inPacketCount++;
break; _inByteCount += nlPacket->getDataSize();
}
SharedNodePointer matchingNode;
// setup a HifiSockAddr to read into if (!nlPacket->getSourceID().isNull()) {
HifiSockAddr senderSockAddr; matchingNode = nodeList->nodeWithUUID(nlPacket->getSourceID());
// pull the datagram
nodeList->getNodeSocket().readDatagram(buffer.get(), packetSizeWithHeader,
senderSockAddr.getAddressPointer(), senderSockAddr.getPortPointer());
// setup an NLPacket from the data we just read
auto packet = NLPacket::fromReceivedPacket(std::move(buffer), packetSizeWithHeader, senderSockAddr);
_inPacketCount++; if (matchingNode) {
_inByteCount += packetSizeWithHeader; // No matter if this packet is handled or not, we update the timestamp for the last time we heard
// from this sending node
if (packetVersionMatch(*packet)) { matchingNode->setLastHeardMicrostamp(usecTimestampNow());
SharedNodePointer matchingNode;
if (nodeList->packetSourceAndHashMatch(*packet, matchingNode)) {
if (matchingNode) {
// No matter if this packet is handled or not, we update the timestamp for the last time we heard
// from this sending node
matchingNode->setLastHeardMicrostamp(usecTimestampNow());
}
_packetListenerLock.lock();
bool listenerIsDead = false;
auto it = _packetListenerMap.find(packet->getType());
if (it != _packetListenerMap.end()) {
auto listener = it.value();
if (listener.first) {
bool success = false;
// check if this is a directly connected listener
_directConnectSetMutex.lock();
Qt::ConnectionType connectionType =
_directlyConnectedObjects.contains(listener.first) ? Qt::DirectConnection : Qt::AutoConnection;
_directConnectSetMutex.unlock();
PacketType::Value packetType = packet->getType();
if (matchingNode) {
// if this was a sequence numbered packet we should store the last seq number for
// a packet of this type for this node
if (SEQUENCE_NUMBERED_PACKETS.contains(packet->getType())) {
matchingNode->setLastSequenceNumberForPacketType(packet->readSequenceNumber(), packet->getType());
}
emit dataReceived(matchingNode->getType(), packet->getDataSize());
QMetaMethod metaMethod = listener.second;
static const QByteArray QSHAREDPOINTER_NODE_NORMALIZED = QMetaObject::normalizedType("QSharedPointer<Node>");
static const QByteArray SHARED_NODE_NORMALIZED = QMetaObject::normalizedType("SharedNodePointer");
// one final check on the QPointer before we go to invoke
if (listener.first) {
if (metaMethod.parameterTypes().contains(SHARED_NODE_NORMALIZED)) {
success = metaMethod.invoke(listener.first,
connectionType,
Q_ARG(QSharedPointer<NLPacket>,
QSharedPointer<NLPacket>(packet.release())),
Q_ARG(SharedNodePointer, matchingNode));
} else if (metaMethod.parameterTypes().contains(QSHAREDPOINTER_NODE_NORMALIZED)) {
success = metaMethod.invoke(listener.first,
connectionType,
Q_ARG(QSharedPointer<NLPacket>,
QSharedPointer<NLPacket>(packet.release())),
Q_ARG(QSharedPointer<Node>, matchingNode));
} else {
success = metaMethod.invoke(listener.first,
connectionType,
Q_ARG(QSharedPointer<NLPacket>,
QSharedPointer<NLPacket>(packet.release())));
}
} else {
listenerIsDead = true;
}
} else {
emit dataReceived(NodeType::Unassigned, packet->getDataSize());
success = listener.second.invoke(listener.first,
Q_ARG(QSharedPointer<NLPacket>, QSharedPointer<NLPacket>(packet.release())));
}
if (!success) {
qDebug().nospace() << "Error delivering packet " << packetType
<< " (" << qPrintable(nameForPacketType(packetType)) << ") to listener "
<< listener.first << "::" << qPrintable(listener.second.methodSignature());
}
} else {
listenerIsDead = true;
}
if (listenerIsDead) {
qDebug().nospace() << "Listener for packet" << packet->getType()
<< " (" << qPrintable(nameForPacketType(packet->getType())) << ")"
<< " has been destroyed. Removing from listener map.";
it = _packetListenerMap.erase(it);
// if it exists, remove the listener from _directlyConnectedObjects
_directConnectSetMutex.lock();
_directlyConnectedObjects.remove(listener.first);
_directConnectSetMutex.unlock();
}
} else {
qWarning() << "No listener found for packet type " << nameForPacketType(packet->getType());
// insert a dummy listener so we don't print this again
_packetListenerMap.insert(packet->getType(), { nullptr, QMetaMethod() });
}
_packetListenerLock.unlock();
}
} }
} }
_packetListenerLock.lock();
bool listenerIsDead = false;
auto it = _packetListenerMap.find(nlPacket->getType());
if (it != _packetListenerMap.end()) {
auto listener = it.value();
if (listener.first) {
bool success = false;
// check if this is a directly connected listener
_directConnectSetMutex.lock();
Qt::ConnectionType connectionType =
_directlyConnectedObjects.contains(listener.first) ? Qt::DirectConnection : Qt::AutoConnection;
_directConnectSetMutex.unlock();
PacketType::Value packetType = nlPacket->getType();
if (matchingNode) {
// if this was a sequence numbered packet we should store the last seq number for
// a packet of this type for this node
if (SEQUENCE_NUMBERED_PACKETS.contains(nlPacket->getType())) {
matchingNode->setLastSequenceNumberForPacketType(nlPacket->readSequenceNumber(), nlPacket->getType());
}
emit dataReceived(matchingNode->getType(), nlPacket->getDataSize());
QMetaMethod metaMethod = listener.second;
static const QByteArray QSHAREDPOINTER_NODE_NORMALIZED = QMetaObject::normalizedType("QSharedPointer<Node>");
static const QByteArray SHARED_NODE_NORMALIZED = QMetaObject::normalizedType("SharedNodePointer");
// one final check on the QPointer before we go to invoke
if (listener.first) {
if (metaMethod.parameterTypes().contains(SHARED_NODE_NORMALIZED)) {
success = metaMethod.invoke(listener.first,
connectionType,
Q_ARG(QSharedPointer<NLPacket>,
QSharedPointer<NLPacket>(nlPacket.release())),
Q_ARG(SharedNodePointer, matchingNode));
} else if (metaMethod.parameterTypes().contains(QSHAREDPOINTER_NODE_NORMALIZED)) {
success = metaMethod.invoke(listener.first,
connectionType,
Q_ARG(QSharedPointer<NLPacket>,
QSharedPointer<NLPacket>(nlPacket.release())),
Q_ARG(QSharedPointer<Node>, matchingNode));
} else {
success = metaMethod.invoke(listener.first,
connectionType,
Q_ARG(QSharedPointer<NLPacket>,
QSharedPointer<NLPacket>(nlPacket.release())));
}
} else {
listenerIsDead = true;
}
} else {
emit dataReceived(NodeType::Unassigned, nlPacket->getDataSize());
success = listener.second.invoke(listener.first,
Q_ARG(QSharedPointer<NLPacket>, QSharedPointer<NLPacket>(nlPacket.release())));
}
if (!success) {
qDebug().nospace() << "Error delivering packet " << packetType
<< " (" << qPrintable(nameForPacketType(packetType)) << ") to listener "
<< listener.first << "::" << qPrintable(listener.second.methodSignature());
}
} else {
listenerIsDead = true;
}
if (listenerIsDead) {
qDebug().nospace() << "Listener for packet" << nlPacket->getType()
<< " (" << qPrintable(nameForPacketType(nlPacket->getType())) << ")"
<< " has been destroyed. Removing from listener map.";
it = _packetListenerMap.erase(it);
// if it exists, remove the listener from _directlyConnectedObjects
_directConnectSetMutex.lock();
_directlyConnectedObjects.remove(listener.first);
_directConnectSetMutex.unlock();
}
} else {
qWarning() << "No listener found for packet type " << nameForPacketType(nlPacket->getType());
// insert a dummy listener so we don't print this again
_packetListenerMap.insert(packet->getType(), { nullptr, QMetaMethod() });
}
_packetListenerLock.unlock();
} }

View file

@ -44,21 +44,17 @@ public:
bool registerListenerForTypes(const QSet<PacketType::Value>& types, QObject* listener, const char* slot); bool registerListenerForTypes(const QSet<PacketType::Value>& types, QObject* listener, const char* slot);
bool registerListener(PacketType::Value type, QObject* listener, const char* slot); bool registerListener(PacketType::Value type, QObject* listener, const char* slot);
void unregisterListener(QObject* listener); void unregisterListener(QObject* listener);
public slots: void handleVerifiedPacket(std::unique_ptr<udt::Packet> packet);
void processDatagrams();
signals: signals:
void dataReceived(quint8 channelType, int bytes); void dataReceived(quint8 channelType, int bytes);
void packetVersionMismatch(PacketType::Value type);
private: private:
// these are brutal hacks for now - ideally GenericThread / ReceivedPacketProcessor // these are brutal hacks for now - ideally GenericThread / ReceivedPacketProcessor
// should be changed to have a true event loop and be able to handle our QMetaMethod::invoke // should be changed to have a true event loop and be able to handle our QMetaMethod::invoke
void registerDirectListenerForTypes(const QSet<PacketType::Value>& types, QObject* listener, const char* slot); void registerDirectListenerForTypes(const QSet<PacketType::Value>& types, QObject* listener, const char* slot);
void registerDirectListener(PacketType::Value type, QObject* listener, const char* slot); void registerDirectListener(PacketType::Value type, QObject* listener, const char* slot);
bool packetVersionMatch(const NLPacket& packet);
QMetaMethod matchingMethodForListener(PacketType::Value type, QObject* object, const char* slot) const; QMetaMethod matchingMethodForListener(PacketType::Value type, QObject* object, const char* slot) const;
void registerVerifiedListener(PacketType::Value type, QObject* listener, const QMetaMethod& slot); void registerVerifiedListener(PacketType::Value type, QObject* listener, const QMetaMethod& slot);

View file

@ -11,6 +11,8 @@
#include "Packet.h" #include "Packet.h"
using namespace udt;
const qint64 Packet::PACKET_WRITE_ERROR = -1; const qint64 Packet::PACKET_WRITE_ERROR = -1;
qint64 Packet::localHeaderSize(PacketType::Value type) { qint64 Packet::localHeaderSize(PacketType::Value type) {
@ -98,16 +100,11 @@ Packet::Packet(const Packet& other) :
QIODevice() QIODevice()
{ {
*this = other; *this = other;
if (other.isOpen()) {
this->open(other.openMode());
}
this->seek(other.pos());
} }
Packet& Packet::operator=(const Packet& other) { Packet& Packet::operator=(const Packet& other) {
_type = other._type; _type = other._type;
_version = other._version;
_packetSize = other._packetSize; _packetSize = other._packetSize;
_packet = std::unique_ptr<char>(new char[_packetSize]); _packet = std::unique_ptr<char>(new char[_packetSize]);
@ -117,6 +114,12 @@ Packet& Packet::operator=(const Packet& other) {
_payloadCapacity = other._payloadCapacity; _payloadCapacity = other._payloadCapacity;
_payloadSize = other._payloadSize; _payloadSize = other._payloadSize;
if (other.isOpen() && !isOpen()) {
open(other.openMode());
}
seek(other.pos());
return *this; return *this;
} }
@ -127,6 +130,7 @@ Packet::Packet(Packet&& other) {
Packet& Packet::operator=(Packet&& other) { Packet& Packet::operator=(Packet&& other) {
_type = other._type; _type = other._type;
_version = other._version;
_packetSize = other._packetSize; _packetSize = other._packetSize;
_packet = std::move(other._packet); _packet = std::move(other._packet);
@ -135,6 +139,14 @@ Packet& Packet::operator=(Packet&& other) {
_payloadCapacity = other._payloadCapacity; _payloadCapacity = other._payloadCapacity;
_payloadSize = other._payloadSize; _payloadSize = other._payloadSize;
_senderSockAddr = std::move(other._senderSockAddr);
if (other.isOpen() && !isOpen()) {
open(other.openMode());
}
seek(other.pos());
return *this; return *this;
} }

View file

@ -1,6 +1,6 @@
// //
// Packet.h // Packet.h
// libraries/networking/src // libraries/networking/src/udt
// //
// Created by Clement on 7/2/15. // Created by Clement on 7/2/15.
// Copyright 2015 High Fidelity, Inc. // Copyright 2015 High Fidelity, Inc.
@ -19,6 +19,8 @@
#include "../HifiSockAddr.h" #include "../HifiSockAddr.h"
#include "PacketHeaders.h" #include "PacketHeaders.h"
namespace udt {
class Packet : public QIODevice { class Packet : public QIODevice {
Q_OBJECT Q_OBJECT
public: public:
@ -131,5 +133,7 @@ template<typename T> qint64 Packet::writePrimitive(const T& data) {
static_assert(!std::is_pointer<T>::value, "T must not be a pointer"); static_assert(!std::is_pointer<T>::value, "T must not be a pointer");
return QIODevice::write(reinterpret_cast<const char*>(&data), sizeof(T)); return QIODevice::write(reinterpret_cast<const char*>(&data), sizeof(T));
} }
} // namespace udt
#endif // hifi_Packet_h #endif // hifi_Packet_h

View file

@ -15,6 +15,8 @@
#include "Packet.h" #include "Packet.h"
using namespace udt;
PacketList::PacketList(PacketType::Value packetType, QByteArray extendedHeader) : PacketList::PacketList(PacketType::Value packetType, QByteArray extendedHeader) :
_packetType(packetType), _packetType(packetType),
_extendedHeader(extendedHeader) _extendedHeader(extendedHeader)

View file

@ -18,6 +18,10 @@
#include "PacketHeaders.h" #include "PacketHeaders.h"
class LimitedNodeList;
namespace udt {
class Packet; class Packet;
class PacketList : public QIODevice { class PacketList : public QIODevice {
@ -42,7 +46,7 @@ protected:
virtual qint64 readData(char* data, qint64 maxSize) { return 0; } virtual qint64 readData(char* data, qint64 maxSize) { return 0; }
private: private:
friend class LimitedNodeList; friend class ::LimitedNodeList;
PacketList(const PacketList& other) = delete; PacketList(const PacketList& other) = delete;
PacketList& operator=(const PacketList& other) = delete; PacketList& operator=(const PacketList& other) = delete;
@ -82,5 +86,7 @@ template<typename T> std::unique_ptr<T> PacketList::takeFront() {
_packets.pop_front(); _packets.pop_front();
return std::unique_ptr<T>(dynamic_cast<T*>(packet.release())); return std::unique_ptr<T>(dynamic_cast<T*>(packet.release()));
} }
}
#endif // hifi_PacketList_h #endif // hifi_PacketList_h

View file

@ -0,0 +1,99 @@
//
// Socket.cpp
// libraries/networking/src/udt
//
// Created by Stephen Birarda on 2015-07-20.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#include "Socket.h"
#include "../NetworkLogging.h"
using namespace udt;
Socket::Socket(QObject* parent) :
QObject(parent)
{
connect(&_udpSocket, &QUdpSocket::readyRead, this, &Socket::readPendingDatagrams);
}
void Socket::rebind() {
quint16 oldPort = _udpSocket.localPort();
_udpSocket.close();
_udpSocket.bind(QHostAddress::AnyIPv4, oldPort);
}
void Socket::setBufferSizes(int numBytes) {
for (int i = 0; i < 2; i++) {
QAbstractSocket::SocketOption bufferOpt;
QString bufferTypeString;
if (i == 0) {
bufferOpt = QAbstractSocket::SendBufferSizeSocketOption;
bufferTypeString = "send";
} else {
bufferOpt = QAbstractSocket::ReceiveBufferSizeSocketOption;
bufferTypeString = "receive";
}
int oldBufferSize = _udpSocket.socketOption(bufferOpt).toInt();
if (oldBufferSize < numBytes) {
int newBufferSize = _udpSocket.socketOption(bufferOpt).toInt();
qCDebug(networking) << "Changed socket" << bufferTypeString << "buffer size from" << oldBufferSize << "to"
<< newBufferSize << "bytes";
} else {
// don't make the buffer smaller
qCDebug(networking) << "Did not change socket" << bufferTypeString << "buffer size from" << oldBufferSize
<< "since it is larger than desired size of" << numBytes;
}
}
}
qint64 Socket::writeUnreliablePacket(const Packet& packet, const HifiSockAddr& sockAddr) {
return writeDatagram(packet.getData(), packet.getDataSize(), sockAddr);
}
qint64 Socket::writeDatagram(const QByteArray& datagram, const HifiSockAddr& sockAddr) {
qint64 bytesWritten = _udpSocket.writeDatagram(datagram, sockAddr.getAddress(), sockAddr.getPort());
if (bytesWritten < 0) {
qCDebug(networking) << "ERROR in writeDatagram:" << _udpSocket.error() << "-" << _udpSocket.errorString();
}
return bytesWritten;
}
void Socket::readPendingDatagrams() {
while (_udpSocket.hasPendingDatagrams()) {
// setup a HifiSockAddr to read into
HifiSockAddr senderSockAddr;
// setup a buffer to read the packet into
int packetSizeWithHeader = _udpSocket.pendingDatagramSize();
std::unique_ptr<char> buffer = std::unique_ptr<char>(new char[packetSizeWithHeader]);
// pull the datagram
_udpSocket.readDatagram(buffer.get(), packetSizeWithHeader,
senderSockAddr.getAddressPointer(), senderSockAddr.getPortPointer());
// setup a Packet from the data we just read
auto packet = Packet::fromReceivedPacket(std::move(buffer), packetSizeWithHeader, senderSockAddr);
// call our verification operator to see if this packet is verified
if (_unfilteredSockAddrs.contains(senderSockAddr) || !_packetFilterOperator || _packetFilterOperator(*packet)) {
if (_packetHandler) {
// call the verified packet callback to let it handle this packet
return _packetHandler(std::move(packet));
}
}
}
}

View file

@ -0,0 +1,66 @@
//
// Socket.h
// libraries/networking/src/udt
//
// Created by Stephen Birarda on 2015-07-20.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#pragma once
#ifndef hifi_Socket_h
#define hifi_Socket_h
#include <functional>
#include <QtCore/QObject>
#include <QtNetwork/QUdpSocket>
#include "../HifiSockAddr.h"
#include "Packet.h"
namespace udt {
using PacketFilterOperator = std::function<bool(const Packet&)>;
using PacketHandler = std::function<void(std::unique_ptr<Packet>)>;
class Socket : public QObject {
Q_OBJECT
public:
Socket(QObject* object = 0);
quint16 localPort() const { return _udpSocket.localPort(); }
qint64 writeUnreliablePacket(const Packet& packet, const HifiSockAddr& sockAddr);
qint64 writeDatagram(const char* data, qint64 size, const HifiSockAddr& sockAddr)
{ return writeDatagram(QByteArray::fromRawData(data, size), sockAddr); }
qint64 writeDatagram(const QByteArray& datagram, const HifiSockAddr& sockAddr);
void bind(const QHostAddress& address, quint16 port = 0) { _udpSocket.bind(address, port); }
void rebind();
void setPacketFilterOperator(PacketFilterOperator filterOperator) { _packetFilterOperator = filterOperator; }
void setPacketHandler(PacketHandler handler) { _packetHandler = handler; }
void setBufferSizes(int numBytes);
void addUnfilteredSockAddr(const HifiSockAddr& senderSockAddr) { _unfilteredSockAddrs.insert(senderSockAddr); }
private slots:
void readPendingDatagrams();
private:
QUdpSocket _udpSocket { this };
PacketFilterOperator _packetFilterOperator;
PacketHandler _packetHandler;
QSet<HifiSockAddr> _unfilteredSockAddrs;
};
} // namespace udt
#endif // hifi_Socket_h