mirror of
https://github.com/overte-org/overte.git
synced 2025-08-09 19:29:47 +02:00
Merge branch 'atp' of github.com:birarda/hifi into receive_packets
Conflicts: libraries/networking/src/PacketReceiver.cpp
This commit is contained in:
commit
bd345cdccd
16 changed files with 98 additions and 199 deletions
|
@ -62,6 +62,17 @@ AssignmentClient::AssignmentClient(Assignment::Type requestAssignmentType, QStri
|
||||||
DependencyManager::registerInheritance<EntityActionFactoryInterface, AssignmentActionFactory>();
|
DependencyManager::registerInheritance<EntityActionFactoryInterface, AssignmentActionFactory>();
|
||||||
auto actionFactory = DependencyManager::set<AssignmentActionFactory>();
|
auto actionFactory = DependencyManager::set<AssignmentActionFactory>();
|
||||||
|
|
||||||
|
// setup a thread for the NodeList and its PacketReceiver
|
||||||
|
QThread* nodeThread = new QThread(this);
|
||||||
|
nodeThread->setObjectName("NodeList Thread");
|
||||||
|
nodeThread->start();
|
||||||
|
|
||||||
|
// make sure the node thread is given highest priority
|
||||||
|
nodeThread->setPriority(QThread::TimeCriticalPriority);
|
||||||
|
|
||||||
|
// put the NodeList on the node thread
|
||||||
|
nodeList->moveToThread(nodeThread);
|
||||||
|
|
||||||
// make up a uuid for this child so the parent can tell us apart. This id will be changed
|
// make up a uuid for this child so the parent can tell us apart. This id will be changed
|
||||||
// when the domain server hands over an assignment.
|
// when the domain server hands over an assignment.
|
||||||
QUuid nodeUUID = QUuid::createUuid();
|
QUuid nodeUUID = QUuid::createUuid();
|
||||||
|
@ -124,7 +135,6 @@ AssignmentClient::AssignmentClient(Assignment::Type requestAssignmentType, QStri
|
||||||
packetReceiver.registerListener(PacketType::CreateAssignment, this, "handleCreateAssignmentPacket");
|
packetReceiver.registerListener(PacketType::CreateAssignment, this, "handleCreateAssignmentPacket");
|
||||||
packetReceiver.registerListener(PacketType::StopNode, this, "handleStopNodePacket");
|
packetReceiver.registerListener(PacketType::StopNode, this, "handleStopNodePacket");
|
||||||
}
|
}
|
||||||
|
|
||||||
void AssignmentClient::stopAssignmentClient() {
|
void AssignmentClient::stopAssignmentClient() {
|
||||||
qDebug() << "Forced stop of assignment-client.";
|
qDebug() << "Forced stop of assignment-client.";
|
||||||
|
|
||||||
|
@ -150,6 +160,16 @@ void AssignmentClient::stopAssignmentClient() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
AssignmentClient::~AssignmentClient() {
|
||||||
|
QThread* nodeThread = DependencyManager::get<NodeList>()->thread();
|
||||||
|
|
||||||
|
// remove the NodeList from the DependencyManager
|
||||||
|
DependencyManager::destroy<NodeList>();
|
||||||
|
|
||||||
|
// ask the node thread to quit and wait until it is done
|
||||||
|
nodeThread->quit();
|
||||||
|
nodeThread->wait();
|
||||||
|
}
|
||||||
|
|
||||||
void AssignmentClient::aboutToQuit() {
|
void AssignmentClient::aboutToQuit() {
|
||||||
stopAssignmentClient();
|
stopAssignmentClient();
|
||||||
|
@ -251,9 +271,6 @@ void AssignmentClient::handleCreateAssignmentPacket(QSharedPointer<NLPacket> pac
|
||||||
|
|
||||||
_currentAssignment->moveToThread(workerThread);
|
_currentAssignment->moveToThread(workerThread);
|
||||||
|
|
||||||
// move the NodeList to the thread used for the _current assignment
|
|
||||||
nodeList->moveToThread(workerThread);
|
|
||||||
|
|
||||||
// Starts an event loop, and emits workerThread->started()
|
// Starts an event loop, and emits workerThread->started()
|
||||||
workerThread->start();
|
workerThread->start();
|
||||||
} else {
|
} else {
|
||||||
|
@ -313,6 +330,9 @@ void AssignmentClient::assignmentCompleted() {
|
||||||
|
|
||||||
auto nodeList = DependencyManager::get<NodeList>();
|
auto nodeList = DependencyManager::get<NodeList>();
|
||||||
|
|
||||||
|
// tell the packet receiver to stop dropping packets
|
||||||
|
nodeList->getPacketReceiver().setShouldDropPackets(false);
|
||||||
|
|
||||||
// reset our NodeList by switching back to unassigned and clearing the list
|
// reset our NodeList by switching back to unassigned and clearing the list
|
||||||
nodeList->setOwnerType(NodeType::Unassigned);
|
nodeList->setOwnerType(NodeType::Unassigned);
|
||||||
nodeList->reset();
|
nodeList->reset();
|
||||||
|
|
|
@ -24,10 +24,10 @@ class QSharedMemory;
|
||||||
class AssignmentClient : public QObject, public PacketListener {
|
class AssignmentClient : public QObject, public PacketListener {
|
||||||
Q_OBJECT
|
Q_OBJECT
|
||||||
public:
|
public:
|
||||||
|
|
||||||
AssignmentClient(Assignment::Type requestAssignmentType, QString assignmentPool,
|
AssignmentClient(Assignment::Type requestAssignmentType, QString assignmentPool,
|
||||||
QUuid walletUUID, QString assignmentServerHostname, quint16 assignmentServerPort,
|
QUuid walletUUID, QString assignmentServerHostname, quint16 assignmentServerPort,
|
||||||
quint16 assignmentMonitorPort);
|
quint16 assignmentMonitorPort);
|
||||||
|
~AssignmentClient();
|
||||||
private slots:
|
private slots:
|
||||||
void sendAssignmentRequest();
|
void sendAssignmentRequest();
|
||||||
void assignmentCompleted();
|
void assignmentCompleted();
|
||||||
|
|
|
@ -52,7 +52,6 @@
|
||||||
|
|
||||||
#include "AudioRingBuffer.h"
|
#include "AudioRingBuffer.h"
|
||||||
#include "AudioMixerClientData.h"
|
#include "AudioMixerClientData.h"
|
||||||
#include "AudioMixerDatagramProcessor.h"
|
|
||||||
#include "AvatarAudioStream.h"
|
#include "AvatarAudioStream.h"
|
||||||
#include "InjectedAudioStream.h"
|
#include "InjectedAudioStream.h"
|
||||||
|
|
||||||
|
@ -656,10 +655,6 @@ void AudioMixer::run() {
|
||||||
// we do not want this event loop to be the handler for UDP datagrams, so disconnect
|
// we do not want this event loop to be the handler for UDP datagrams, so disconnect
|
||||||
disconnect(&nodeList->getNodeSocket(), 0, this, 0);
|
disconnect(&nodeList->getNodeSocket(), 0, this, 0);
|
||||||
|
|
||||||
// setup a QThread with us as parent that will house the AudioMixerDatagramProcessor
|
|
||||||
_datagramProcessingThread = new QThread(this);
|
|
||||||
_datagramProcessingThread->setObjectName("Datagram Processor Thread");
|
|
||||||
|
|
||||||
nodeList->addNodeTypeToInterestSet(NodeType::Agent);
|
nodeList->addNodeTypeToInterestSet(NodeType::Agent);
|
||||||
|
|
||||||
nodeList->linkedDataCreateCallback = [](Node* node) {
|
nodeList->linkedDataCreateCallback = [](Node* node) {
|
||||||
|
|
|
@ -1,47 +0,0 @@
|
||||||
//
|
|
||||||
// AudioMixerDatagramProcessor.cpp
|
|
||||||
// assignment-client/src
|
|
||||||
//
|
|
||||||
// Created by Stephen Birarda on 2014-08-14.
|
|
||||||
// Copyright 2014 High Fidelity, Inc.
|
|
||||||
//
|
|
||||||
// Distributed under the Apache License, Version 2.0.
|
|
||||||
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
|
|
||||||
//
|
|
||||||
|
|
||||||
#include <QDebug>
|
|
||||||
|
|
||||||
#include <HifiSockAddr.h>
|
|
||||||
#include <NodeList.h>
|
|
||||||
|
|
||||||
#include "AudioMixerDatagramProcessor.h"
|
|
||||||
|
|
||||||
AudioMixerDatagramProcessor::AudioMixerDatagramProcessor(QUdpSocket& nodeSocket, QThread* previousNodeSocketThread) :
|
|
||||||
_nodeSocket(nodeSocket),
|
|
||||||
_previousNodeSocketThread(previousNodeSocketThread)
|
|
||||||
{
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
AudioMixerDatagramProcessor::~AudioMixerDatagramProcessor() {
|
|
||||||
// return the node socket to its previous thread
|
|
||||||
_nodeSocket.moveToThread(_previousNodeSocketThread);
|
|
||||||
}
|
|
||||||
|
|
||||||
void AudioMixerDatagramProcessor::readPendingDatagrams() {
|
|
||||||
|
|
||||||
HifiSockAddr senderSockAddr;
|
|
||||||
static QByteArray incomingPacket;
|
|
||||||
|
|
||||||
// read everything that is available
|
|
||||||
while (_nodeSocket.hasPendingDatagrams()) {
|
|
||||||
incomingPacket.resize(_nodeSocket.pendingDatagramSize());
|
|
||||||
|
|
||||||
// just get this packet off the stack
|
|
||||||
_nodeSocket.readDatagram(incomingPacket.data(), incomingPacket.size(),
|
|
||||||
senderSockAddr.getAddressPointer(), senderSockAddr.getPortPointer());
|
|
||||||
|
|
||||||
// emit the signal to tell AudioMixer it needs to process a packet
|
|
||||||
emit packetRequiresProcessing(incomingPacket, senderSockAddr);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,32 +0,0 @@
|
||||||
//
|
|
||||||
// AudioMixerDatagramProcessor.h
|
|
||||||
// assignment-client/src
|
|
||||||
//
|
|
||||||
// Created by Stephen Birarda on 2014-08-14.
|
|
||||||
// Copyright 2014 High Fidelity, Inc.
|
|
||||||
//
|
|
||||||
// Distributed under the Apache License, Version 2.0.
|
|
||||||
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
|
|
||||||
//
|
|
||||||
|
|
||||||
#ifndef hifi_AudioMixerDatagramProcessor_h
|
|
||||||
#define hifi_AudioMixerDatagramProcessor_h
|
|
||||||
|
|
||||||
#include <qobject.h>
|
|
||||||
#include <qudpsocket.h>
|
|
||||||
|
|
||||||
class AudioMixerDatagramProcessor : public QObject {
|
|
||||||
Q_OBJECT
|
|
||||||
public:
|
|
||||||
AudioMixerDatagramProcessor(QUdpSocket& nodeSocket, QThread* previousNodeSocketThread);
|
|
||||||
~AudioMixerDatagramProcessor();
|
|
||||||
public slots:
|
|
||||||
void readPendingDatagrams();
|
|
||||||
signals:
|
|
||||||
void packetRequiresProcessing(const QByteArray& receivedPacket, const HifiSockAddr& senderSockAddr);
|
|
||||||
private:
|
|
||||||
QUdpSocket& _nodeSocket;
|
|
||||||
QThread* _previousNodeSocketThread;
|
|
||||||
};
|
|
||||||
|
|
||||||
#endif // hifi_AudioMixerDatagramProcessor_h
|
|
|
@ -1054,7 +1054,7 @@ void OctreeServer::run() {
|
||||||
beforeRun(); // after payload has been processed
|
beforeRun(); // after payload has been processed
|
||||||
|
|
||||||
connect(nodeList.data(), SIGNAL(nodeAdded(SharedNodePointer)), SLOT(nodeAdded(SharedNodePointer)));
|
connect(nodeList.data(), SIGNAL(nodeAdded(SharedNodePointer)), SLOT(nodeAdded(SharedNodePointer)));
|
||||||
connect(nodeList.data(), SIGNAL(nodeKilled(SharedNodePointer)),SLOT(nodeKilled(SharedNodePointer)));
|
connect(nodeList.data(), SIGNAL(nodeKilled(SharedNodePointer)), SLOT(nodeKilled(SharedNodePointer)));
|
||||||
|
|
||||||
|
|
||||||
// we need to ask the DS about agents so we can ping/reply with them
|
// we need to ask the DS about agents so we can ping/reply with them
|
||||||
|
|
|
@ -145,8 +145,6 @@ protected:
|
||||||
QString getConfiguration();
|
QString getConfiguration();
|
||||||
QString getStatusLink();
|
QString getStatusLink();
|
||||||
|
|
||||||
void setupDatagramProcessingThread();
|
|
||||||
|
|
||||||
int _argc;
|
int _argc;
|
||||||
const char** _argv;
|
const char** _argv;
|
||||||
char** _parsedArgV;
|
char** _parsedArgV;
|
||||||
|
|
|
@ -380,17 +380,12 @@ Application::Application(int& argc, char** argv, QElapsedTimer &startup_time) :
|
||||||
|
|
||||||
// start the nodeThread so its event loop is running
|
// start the nodeThread so its event loop is running
|
||||||
QThread* nodeThread = new QThread(this);
|
QThread* nodeThread = new QThread(this);
|
||||||
nodeThread->setObjectName("Datagram Processor Thread");
|
nodeThread->setObjectName("NodeList Thread");
|
||||||
nodeThread->start();
|
nodeThread->start();
|
||||||
|
|
||||||
// make sure the node thread is given highest priority
|
// make sure the node thread is given highest priority
|
||||||
nodeThread->setPriority(QThread::TimeCriticalPriority);
|
nodeThread->setPriority(QThread::TimeCriticalPriority);
|
||||||
|
|
||||||
// have the NodeList use deleteLater from DM customDeleter
|
|
||||||
nodeList->setCustomDeleter([](Dependency* dependency) {
|
|
||||||
static_cast<NodeList*>(dependency)->deleteLater();
|
|
||||||
});
|
|
||||||
|
|
||||||
// setup a timer for domain-server check ins
|
// setup a timer for domain-server check ins
|
||||||
QTimer* domainCheckInTimer = new QTimer(nodeList.data());
|
QTimer* domainCheckInTimer = new QTimer(nodeList.data());
|
||||||
connect(domainCheckInTimer, &QTimer::timeout, nodeList.data(), &NodeList::sendDomainServerCheckIn);
|
connect(domainCheckInTimer, &QTimer::timeout, nodeList.data(), &NodeList::sendDomainServerCheckIn);
|
||||||
|
@ -668,12 +663,11 @@ void Application::aboutToQuit() {
|
||||||
|
|
||||||
void Application::cleanupBeforeQuit() {
|
void Application::cleanupBeforeQuit() {
|
||||||
|
|
||||||
// stop handling packets we've asked to handle
|
|
||||||
DependencyManager::get<LimitedNodeList>()->getPacketReceiver().unregisterListener(this);
|
|
||||||
|
|
||||||
_entities.clear(); // this will allow entity scripts to properly shutdown
|
_entities.clear(); // this will allow entity scripts to properly shutdown
|
||||||
|
|
||||||
//_datagramProcessor->shutdown(); // tell the datagram processor we're shutting down, so it can short circuit
|
// tell the packet receiver we're shutting down, so it can drop packets
|
||||||
|
DependencyManager::get<NodeList>()->getPacketReceiver().setShouldDropPackets(true);
|
||||||
|
|
||||||
_entities.shutdown(); // tell the entities system we're shutting down, so it will stop running scripts
|
_entities.shutdown(); // tell the entities system we're shutting down, so it will stop running scripts
|
||||||
ScriptEngine::stopAllScripts(this); // stop all currently running global scripts
|
ScriptEngine::stopAllScripts(this); // stop all currently running global scripts
|
||||||
|
|
||||||
|
@ -748,6 +742,8 @@ Application::~Application() {
|
||||||
DependencyManager::destroy<SoundCache>();
|
DependencyManager::destroy<SoundCache>();
|
||||||
|
|
||||||
QThread* nodeThread = DependencyManager::get<NodeList>()->thread();
|
QThread* nodeThread = DependencyManager::get<NodeList>()->thread();
|
||||||
|
|
||||||
|
// remove the NodeList from the DependencyManager
|
||||||
DependencyManager::destroy<NodeList>();
|
DependencyManager::destroy<NodeList>();
|
||||||
|
|
||||||
// ask the node thread to quit and wait until it is done
|
// ask the node thread to quit and wait until it is done
|
||||||
|
@ -1801,7 +1797,6 @@ void Application::checkFPS() {
|
||||||
|
|
||||||
_fps = (float)_frameCount / diffTime;
|
_fps = (float)_frameCount / diffTime;
|
||||||
_frameCount = 0;
|
_frameCount = 0;
|
||||||
//_datagramProcessor->resetCounters();
|
|
||||||
_timerStart.start();
|
_timerStart.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -121,13 +121,6 @@ public:
|
||||||
|
|
||||||
PacketReceiver& getPacketReceiver() { return _packetReceiver; }
|
PacketReceiver& getPacketReceiver() { return _packetReceiver; }
|
||||||
|
|
||||||
// QByteArray byteArrayWithPopulatedHeader(PacketType::Value packetType)
|
|
||||||
// { return byteArrayWithUUIDPopulatedHeader(packetType, _sessionUUID); }
|
|
||||||
// int populatePacketHeader(QByteArray& packet, PacketType::Value packetType)
|
|
||||||
// { return populatePacketHeaderWithUUID(packet, packetType, _sessionUUID); }
|
|
||||||
// int populatePacketHeader(char* packet, PacketType::Value packetType)
|
|
||||||
// { return populatePacketHeaderWithUUID(packet, packetType, _sessionUUID); }
|
|
||||||
|
|
||||||
qint64 sendUnreliablePacket(const NLPacket& packet, const SharedNodePointer& destinationNode) { assert(false); return 0; }
|
qint64 sendUnreliablePacket(const NLPacket& packet, const SharedNodePointer& destinationNode) { assert(false); return 0; }
|
||||||
qint64 sendUnreliablePacket(const NLPacket& packet, const HifiSockAddr& sockAddr) { assert(false); return 0; }
|
qint64 sendUnreliablePacket(const NLPacket& packet, const HifiSockAddr& sockAddr) { assert(false); return 0; }
|
||||||
|
|
||||||
|
|
|
@ -45,6 +45,11 @@ NodeList::NodeList(char newOwnerType, unsigned short socketListenPort, unsigned
|
||||||
qRegisterMetaType<SharedNodePointer>();
|
qRegisterMetaType<SharedNodePointer>();
|
||||||
firstCall = false;
|
firstCall = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
setCustomDeleter([](Dependency* dependency){
|
||||||
|
static_cast<NodeList*>(dependency)->deleteLater();
|
||||||
|
});
|
||||||
|
|
||||||
auto addressManager = DependencyManager::get<AddressManager>();
|
auto addressManager = DependencyManager::get<AddressManager>();
|
||||||
|
|
||||||
// handle domain change signals from AddressManager
|
// handle domain change signals from AddressManager
|
||||||
|
|
|
@ -112,8 +112,6 @@ private:
|
||||||
DomainHandler _domainHandler;
|
DomainHandler _domainHandler;
|
||||||
int _numNoReplyDomainCheckIns;
|
int _numNoReplyDomainCheckIns;
|
||||||
HifiSockAddr _assignmentServerSocket;
|
HifiSockAddr _assignmentServerSocket;
|
||||||
|
|
||||||
friend class Application;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif // hifi_NodeList_h
|
#endif // hifi_NodeList_h
|
||||||
|
|
|
@ -59,7 +59,7 @@ void PacketReceiver::registerListener(PacketType::Value type, PacketListener* li
|
||||||
Q_ASSERT(object);
|
Q_ASSERT(object);
|
||||||
|
|
||||||
QMetaMethod matchingMethod = matchingMethodForListener(type, object, slot);
|
QMetaMethod matchingMethod = matchingMethodForListener(type, object, slot);
|
||||||
|
|
||||||
if (matchingMethod.isValid()) {
|
if (matchingMethod.isValid()) {
|
||||||
registerVerifiedListener(type, object, matchingMethod);
|
registerVerifiedListener(type, object, matchingMethod);
|
||||||
}
|
}
|
||||||
|
@ -69,7 +69,7 @@ QMetaMethod PacketReceiver::matchingMethodForListener(PacketType::Value type, QO
|
||||||
Q_ASSERT(object);
|
Q_ASSERT(object);
|
||||||
|
|
||||||
// normalize the slot with the expected parameters
|
// normalize the slot with the expected parameters
|
||||||
|
|
||||||
const QString NON_SOURCED_PACKET_LISTENER_PARAMETERS = "QSharedPointer<NLPacket>";
|
const QString NON_SOURCED_PACKET_LISTENER_PARAMETERS = "QSharedPointer<NLPacket>";
|
||||||
|
|
||||||
QSet<QString> possibleSignatures { QString("%1(%2)").arg(slot).arg(NON_SOURCED_PACKET_LISTENER_PARAMETERS) };
|
QSet<QString> possibleSignatures { QString("%1(%2)").arg(slot).arg(NON_SOURCED_PACKET_LISTENER_PARAMETERS) };
|
||||||
|
@ -79,7 +79,7 @@ QMetaMethod PacketReceiver::matchingMethodForListener(PacketType::Value type, QO
|
||||||
|
|
||||||
// a sourced packet must take the shared pointer to the packet but optionally could include
|
// a sourced packet must take the shared pointer to the packet but optionally could include
|
||||||
// a shared pointer to the node
|
// a shared pointer to the node
|
||||||
|
|
||||||
possibleSignatures << QString("%1(%2)").arg(slot).arg(SOURCED_PACKET_LISTENER_PARAMETERS);
|
possibleSignatures << QString("%1(%2)").arg(slot).arg(SOURCED_PACKET_LISTENER_PARAMETERS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -94,7 +94,7 @@ QMetaMethod PacketReceiver::matchingMethodForListener(PacketType::Value type, QO
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (methodIndex < 0) {
|
if (methodIndex < 0) {
|
||||||
qDebug() << "PacketReceiver::registerListener expected a method with one of the following signatures:"
|
qDebug() << "PacketReceiver::registerListener expected a method with one of the following signatures:"
|
||||||
<< possibleSignatures << "- but such a method was not found.";
|
<< possibleSignatures << "- but such a method was not found.";
|
||||||
|
@ -122,7 +122,7 @@ void PacketReceiver::registerVerifiedListener(PacketType::Value type, QObject* o
|
||||||
|
|
||||||
// add the mapping
|
// add the mapping
|
||||||
_packetListenerMap[type] = ObjectMethodPair(object, slot);
|
_packetListenerMap[type] = ObjectMethodPair(object, slot);
|
||||||
|
|
||||||
_packetListenerLock.unlock();
|
_packetListenerLock.unlock();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -131,7 +131,7 @@ void PacketReceiver::unregisterListener(PacketListener* listener) {
|
||||||
_packetListenerLock.lock();
|
_packetListenerLock.lock();
|
||||||
|
|
||||||
auto it = _packetListenerMap.begin();
|
auto it = _packetListenerMap.begin();
|
||||||
|
|
||||||
while (it != _packetListenerMap.end()) {
|
while (it != _packetListenerMap.end()) {
|
||||||
if (it.value().first == dynamic_cast<QObject*>(listener)) {
|
if (it.value().first == dynamic_cast<QObject*>(listener)) {
|
||||||
// this listener matches - erase it
|
// this listener matches - erase it
|
||||||
|
@ -152,7 +152,7 @@ bool PacketReceiver::packetVersionMatch(const NLPacket& packet) {
|
||||||
static QMultiMap<QUuid, PacketType::Value> versionDebugSuppressMap;
|
static QMultiMap<QUuid, PacketType::Value> versionDebugSuppressMap;
|
||||||
|
|
||||||
const QUuid& senderID = packet.getSourceID();
|
const QUuid& senderID = packet.getSourceID();
|
||||||
|
|
||||||
if (!versionDebugSuppressMap.contains(senderID, packet.getType())) {
|
if (!versionDebugSuppressMap.contains(senderID, packet.getType())) {
|
||||||
|
|
||||||
qCDebug(networking) << "Packet version mismatch on" << packet.getType() << "- Sender"
|
qCDebug(networking) << "Packet version mismatch on" << packet.getType() << "- Sender"
|
||||||
|
@ -174,17 +174,18 @@ void PacketReceiver::processDatagrams() {
|
||||||
//PerformanceWarning warn(Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings),
|
//PerformanceWarning warn(Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings),
|
||||||
//"PacketReceiver::processDatagrams()");
|
//"PacketReceiver::processDatagrams()");
|
||||||
|
|
||||||
if (_isShuttingDown) {
|
|
||||||
return; // bail early... we're shutting down.
|
|
||||||
}
|
|
||||||
|
|
||||||
auto nodeList = DependencyManager::get<NodeList>();
|
auto nodeList = DependencyManager::get<NodeList>();
|
||||||
|
|
||||||
while (nodeList->getNodeSocket().hasPendingDatagrams()) {
|
while (nodeList->getNodeSocket().hasPendingDatagrams()) {
|
||||||
// setup a buffer to read the packet into
|
// setup a buffer to read the packet into
|
||||||
int packetSizeWithHeader = nodeList->getNodeSocket().pendingDatagramSize();
|
int packetSizeWithHeader = nodeList->getNodeSocket().pendingDatagramSize();
|
||||||
std::unique_ptr<char> buffer = std::unique_ptr<char>(new char[packetSizeWithHeader]);
|
std::unique_ptr<char> buffer = std::unique_ptr<char>(new char[packetSizeWithHeader]);
|
||||||
|
|
||||||
|
// if we're supposed to drop this packet then break out here
|
||||||
|
if (_shouldDropPackets) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
// setup a HifiSockAddr to read into
|
// setup a HifiSockAddr to read into
|
||||||
HifiSockAddr senderSockAddr;
|
HifiSockAddr senderSockAddr;
|
||||||
|
|
||||||
|
@ -194,15 +195,15 @@ void PacketReceiver::processDatagrams() {
|
||||||
|
|
||||||
// setup an NLPacket from the data we just read
|
// setup an NLPacket from the data we just read
|
||||||
auto packet = NLPacket::fromReceivedPacket(std::move(buffer), packetSizeWithHeader, senderSockAddr);
|
auto packet = NLPacket::fromReceivedPacket(std::move(buffer), packetSizeWithHeader, senderSockAddr);
|
||||||
|
|
||||||
_inPacketCount++;
|
_inPacketCount++;
|
||||||
_inByteCount += packetSizeWithHeader;
|
_inByteCount += packetSizeWithHeader;
|
||||||
|
|
||||||
if (packetVersionMatch(*packet)) {
|
if (packetVersionMatch(*packet)) {
|
||||||
|
|
||||||
SharedNodePointer matchingNode;
|
SharedNodePointer matchingNode;
|
||||||
if (nodeList->packetSourceAndHashMatch(*packet, matchingNode)) {
|
if (nodeList->packetSourceAndHashMatch(*packet, matchingNode)) {
|
||||||
|
|
||||||
if (matchingNode) {
|
if (matchingNode) {
|
||||||
// No matter if this packet is handled or not, we update the timestamp for the last time we heard
|
// No matter if this packet is handled or not, we update the timestamp for the last time we heard
|
||||||
// from this sending node
|
// from this sending node
|
||||||
|
@ -210,38 +211,45 @@ void PacketReceiver::processDatagrams() {
|
||||||
}
|
}
|
||||||
|
|
||||||
_packetListenerLock.lock();
|
_packetListenerLock.lock();
|
||||||
|
|
||||||
auto it = _packetListenerMap.find(packet->getType());
|
auto it = _packetListenerMap.find(packet->getType());
|
||||||
|
|
||||||
if (it != _packetListenerMap.end()) {
|
if (it != _packetListenerMap.end()) {
|
||||||
|
|
||||||
auto listener = it.value();
|
auto listener = it.value();
|
||||||
|
|
||||||
if (listener.first) {
|
if (listener.first) {
|
||||||
|
|
||||||
if (matchingNode) {
|
|
||||||
emit dataReceived(matchingNode->getType(), packet->getSizeWithHeader());
|
|
||||||
} else {
|
|
||||||
emit dataReceived(NodeType::Unassigned, packet->getSizeWithHeader());
|
|
||||||
}
|
|
||||||
|
|
||||||
// if this was a sequence numbered packet we should store the last seq number for
|
|
||||||
// a packet of this type for this node
|
|
||||||
if (SEQUENCE_NUMBERED_PACKETS.contains(packet->getType())) {
|
|
||||||
matchingNode->setLastSequenceNumberForPacketType(packet->readSequenceNumber(), packet->getType());
|
|
||||||
}
|
|
||||||
|
|
||||||
bool success = false;
|
bool success = false;
|
||||||
|
|
||||||
if (matchingNode) {
|
if (matchingNode) {
|
||||||
|
// if this was a sequence numbered packet we should store the last seq number for
|
||||||
|
// a packet of this type for this node
|
||||||
|
if (SEQUENCE_NUMBERED_PACKETS.contains(packet->getType())) {
|
||||||
|
matchingNode->setLastSequenceNumberForPacketType(packet->readSequenceNumber(), packet->getType());
|
||||||
|
}
|
||||||
|
|
||||||
|
emit dataReceived(matchingNode->getType(), packet->getSizeWithHeader());
|
||||||
|
QMetaMethod metaMethod = listener.second;
|
||||||
|
|
||||||
|
static const QByteArray SHARED_NODE_NORMALIZED = QMetaObject::normalizedType("QSharedPointer<Node>");
|
||||||
|
|
||||||
|
if (metaMethod.parameterTypes().contains(SHARED_NODE_NORMALIZED)) {
|
||||||
|
success = metaMethod.invoke(listener.first,
|
||||||
|
Q_ARG(QSharedPointer<NLPacket>, QSharedPointer<NLPacket>(packet.release())),
|
||||||
|
Q_ARG(SharedNodePointer, matchingNode));
|
||||||
|
|
||||||
|
} else {
|
||||||
|
success = metaMethod.invoke(listener.first,
|
||||||
|
Q_ARG(QSharedPointer<NLPacket>, QSharedPointer<NLPacket>(packet.release())));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
emit dataReceived(NodeType::Unassigned, packet->getSizeWithHeader());
|
||||||
|
|
||||||
success = listener.second.invoke(listener.first,
|
success = listener.second.invoke(listener.first,
|
||||||
Q_ARG(QSharedPointer<NLPacket>, QSharedPointer<NLPacket>(packet.release())));
|
Q_ARG(QSharedPointer<NLPacket>, QSharedPointer<NLPacket>(packet.release())));
|
||||||
} else {
|
|
||||||
success = listener.second.invoke(listener.first,
|
|
||||||
Q_ARG(QSharedPointer<NLPacket>, QSharedPointer<NLPacket>(packet.release())),
|
|
||||||
Q_ARG(SharedNodePointer, matchingNode));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!success) {
|
if (!success) {
|
||||||
qDebug() << "Error delivering packet " << nameForPacketType(packet->getType()) << " to listener: "
|
qDebug() << "Error delivering packet " << nameForPacketType(packet->getType()) << " to listener: "
|
||||||
<< listener.first->objectName() << "::" << listener.second.name();
|
<< listener.first->objectName() << "::" << listener.second.name();
|
||||||
|
@ -253,13 +261,11 @@ void PacketReceiver::processDatagrams() {
|
||||||
<< "has been destroyed - removing mapping.";
|
<< "has been destroyed - removing mapping.";
|
||||||
_packetListenerMap.erase(it);
|
_packetListenerMap.erase(it);
|
||||||
}
|
}
|
||||||
|
|
||||||
_packetListenerLock.unlock();
|
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
_packetListenerLock.unlock();
|
|
||||||
qDebug() << "No listener found for packet type " << nameForPacketType(packet->getType());
|
qDebug() << "No listener found for packet type " << nameForPacketType(packet->getType());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_packetListenerLock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,11 +34,11 @@ public:
|
||||||
|
|
||||||
int getInPacketCount() const { return _inPacketCount; }
|
int getInPacketCount() const { return _inPacketCount; }
|
||||||
int getInByteCount() const { return _inByteCount; }
|
int getInByteCount() const { return _inByteCount; }
|
||||||
|
|
||||||
|
void setShouldDropPackets(bool shouldDropPackets) { _shouldDropPackets = shouldDropPackets; }
|
||||||
|
|
||||||
void resetCounters() { _inPacketCount = 0; _inByteCount = 0; }
|
void resetCounters() { _inPacketCount = 0; _inByteCount = 0; }
|
||||||
|
|
||||||
void shutdown() { _isShuttingDown = true; }
|
|
||||||
|
|
||||||
void registerListenerForTypes(const QSet<PacketType::Value>& types, PacketListener* listener, const char* slot);
|
void registerListenerForTypes(const QSet<PacketType::Value>& types, PacketListener* listener, const char* slot);
|
||||||
void registerListener(PacketType::Value type, PacketListener* listener, const char* slot);
|
void registerListener(PacketType::Value type, PacketListener* listener, const char* slot);
|
||||||
void unregisterListener(PacketListener* listener);
|
void unregisterListener(PacketListener* listener);
|
||||||
|
@ -47,8 +47,8 @@ public slots:
|
||||||
void processDatagrams();
|
void processDatagrams();
|
||||||
|
|
||||||
signals:
|
signals:
|
||||||
void dataSent(quint8 channel_type, int bytes);
|
void dataSent(quint8 channelType, int bytes);
|
||||||
void dataReceived(quint8 channel_type, int bytes);
|
void dataReceived(quint8 channelType, int bytes);
|
||||||
void packetVersionMismatch(PacketType::Value type);
|
void packetVersionMismatch(PacketType::Value type);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
@ -63,7 +63,7 @@ private:
|
||||||
QHash<PacketType::Value, ObjectMethodPair> _packetListenerMap;
|
QHash<PacketType::Value, ObjectMethodPair> _packetListenerMap;
|
||||||
int _inPacketCount = 0;
|
int _inPacketCount = 0;
|
||||||
int _inByteCount = 0;
|
int _inByteCount = 0;
|
||||||
bool _isShuttingDown = false;
|
bool _shouldDropPackets = false;
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif // hifi_PacketReceiver_h
|
#endif // hifi_PacketReceiver_h
|
||||||
|
|
|
@ -20,8 +20,8 @@
|
||||||
|
|
||||||
ThreadedAssignment::ThreadedAssignment(NLPacket& packet) :
|
ThreadedAssignment::ThreadedAssignment(NLPacket& packet) :
|
||||||
Assignment(packet),
|
Assignment(packet),
|
||||||
_isFinished(false),
|
_isFinished(false)
|
||||||
_datagramProcessingThread(NULL)
|
|
||||||
{
|
{
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -34,6 +34,14 @@ void ThreadedAssignment::setFinished(bool isFinished) {
|
||||||
|
|
||||||
qDebug() << "ThreadedAssignment::setFinished(true) called - finishing up.";
|
qDebug() << "ThreadedAssignment::setFinished(true) called - finishing up.";
|
||||||
|
|
||||||
|
auto& packetReceiver = DependencyManager::get<NodeList>()->getPacketReceiver();
|
||||||
|
|
||||||
|
// we should de-register immediately for any of our packets
|
||||||
|
packetReceiver.unregisterListener(this);
|
||||||
|
|
||||||
|
// we should also tell the packet receiver to drop packets while we're cleaning up
|
||||||
|
packetReceiver.setShouldDropPackets(true);
|
||||||
|
|
||||||
if (_domainServerTimer) {
|
if (_domainServerTimer) {
|
||||||
_domainServerTimer->stop();
|
_domainServerTimer->stop();
|
||||||
}
|
}
|
||||||
|
@ -42,31 +50,9 @@ void ThreadedAssignment::setFinished(bool isFinished) {
|
||||||
_statsTimer->stop();
|
_statsTimer->stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
// stop processing datagrams from the node socket
|
|
||||||
// this ensures we won't process a domain list while we are going down
|
|
||||||
auto nodeList = DependencyManager::get<NodeList>();
|
|
||||||
disconnect(&nodeList->getNodeSocket(), 0, this, 0);
|
|
||||||
|
|
||||||
// call our virtual aboutToFinish method - this gives the ThreadedAssignment subclass a chance to cleanup
|
// call our virtual aboutToFinish method - this gives the ThreadedAssignment subclass a chance to cleanup
|
||||||
aboutToFinish();
|
aboutToFinish();
|
||||||
|
|
||||||
// if we have a datagram processing thread, quit it and wait on it to make sure that
|
|
||||||
// the node socket is back on the same thread as the NodeList
|
|
||||||
|
|
||||||
|
|
||||||
if (_datagramProcessingThread) {
|
|
||||||
// tell the datagram processing thread to quit and wait until it is done,
|
|
||||||
// then return the node socket to the NodeList
|
|
||||||
_datagramProcessingThread->quit();
|
|
||||||
_datagramProcessingThread->wait();
|
|
||||||
|
|
||||||
// set node socket parent back to NodeList
|
|
||||||
nodeList->getNodeSocket().setParent(nodeList.data());
|
|
||||||
}
|
|
||||||
|
|
||||||
// move the NodeList back to the QCoreApplication instance's thread
|
|
||||||
nodeList->moveToThread(QCoreApplication::instance()->thread());
|
|
||||||
|
|
||||||
emit finished();
|
emit finished();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -79,9 +65,6 @@ void ThreadedAssignment::commonInit(const QString& targetName, NodeType_t nodeTy
|
||||||
auto nodeList = DependencyManager::get<NodeList>();
|
auto nodeList = DependencyManager::get<NodeList>();
|
||||||
nodeList->setOwnerType(nodeType);
|
nodeList->setOwnerType(nodeType);
|
||||||
|
|
||||||
// this is a temp fix for Qt 5.3 - rebinding the node socket gives us readyRead for the socket on this thread
|
|
||||||
nodeList->rebindNodeSocket();
|
|
||||||
|
|
||||||
_domainServerTimer = new QTimer();
|
_domainServerTimer = new QTimer();
|
||||||
connect(_domainServerTimer, SIGNAL(timeout()), this, SLOT(checkInWithDomainServerOrExit()));
|
connect(_domainServerTimer, SIGNAL(timeout()), this, SLOT(checkInWithDomainServerOrExit()));
|
||||||
_domainServerTimer->start(DOMAIN_SERVER_CHECK_IN_MSECS);
|
_domainServerTimer->start(DOMAIN_SERVER_CHECK_IN_MSECS);
|
||||||
|
@ -120,16 +103,3 @@ void ThreadedAssignment::checkInWithDomainServerOrExit() {
|
||||||
DependencyManager::get<NodeList>()->sendDomainServerCheckIn();
|
DependencyManager::get<NodeList>()->sendDomainServerCheckIn();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ThreadedAssignment::readAvailableDatagram(QByteArray& destinationByteArray, HifiSockAddr& senderSockAddr) {
|
|
||||||
auto nodeList = DependencyManager::get<NodeList>();
|
|
||||||
|
|
||||||
if (nodeList->getNodeSocket().hasPendingDatagrams()) {
|
|
||||||
destinationByteArray.resize(nodeList->getNodeSocket().pendingDatagramSize());
|
|
||||||
nodeList->getNodeSocket().readDatagram(destinationByteArray.data(), destinationByteArray.size(),
|
|
||||||
senderSockAddr.getAddressPointer(), senderSockAddr.getPortPointer());
|
|
||||||
return true;
|
|
||||||
} else {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -38,10 +38,8 @@ signals:
|
||||||
void finished();
|
void finished();
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
bool readAvailableDatagram(QByteArray& destinationByteArray, HifiSockAddr& senderSockAddr);
|
|
||||||
void commonInit(const QString& targetName, NodeType_t nodeType, bool shouldSendStats = true);
|
void commonInit(const QString& targetName, NodeType_t nodeType, bool shouldSendStats = true);
|
||||||
bool _isFinished;
|
bool _isFinished;
|
||||||
QThread* _datagramProcessingThread;
|
|
||||||
QTimer* _domainServerTimer = nullptr;
|
QTimer* _domainServerTimer = nullptr;
|
||||||
QTimer* _statsTimer = nullptr;
|
QTimer* _statsTimer = nullptr;
|
||||||
|
|
||||||
|
|
|
@ -108,7 +108,7 @@ void OctreeRenderer::processDatagram(NLPacket& packet, SharedNodePointer sourceN
|
||||||
|
|
||||||
bool error = false;
|
bool error = false;
|
||||||
|
|
||||||
while (packet.bytesAvailable() && !error) {
|
while (packet.bytesAvailable() > 0 && !error) {
|
||||||
if (packetIsCompressed) {
|
if (packetIsCompressed) {
|
||||||
if (packet.bytesAvailable() > (qint64) sizeof(OCTREE_PACKET_INTERNAL_SECTION_SIZE)) {
|
if (packet.bytesAvailable() > (qint64) sizeof(OCTREE_PACKET_INTERNAL_SECTION_SIZE)) {
|
||||||
packet.readPrimitive(§ionLength);
|
packet.readPrimitive(§ionLength);
|
||||||
|
|
Loading…
Reference in a new issue