Merge branch 'ordered-sending' into atp-server

This commit is contained in:
Ryan Huffman 2015-08-20 11:50:06 -07:00
commit 0a290da529
7 changed files with 101 additions and 101 deletions

View file

@ -97,13 +97,10 @@ AudioMixer::AudioMixer(NLPacket& packet) :
auto& packetReceiver = DependencyManager::get<NodeList>()->getPacketReceiver(); auto& packetReceiver = DependencyManager::get<NodeList>()->getPacketReceiver();
QSet<PacketType> nodeAudioPackets { packetReceiver.registerListenerForTypes({ PacketType::MicrophoneAudioNoEcho, PacketType::MicrophoneAudioWithEcho,
PacketType::MicrophoneAudioNoEcho, PacketType::MicrophoneAudioWithEcho,
PacketType::InjectAudio, PacketType::SilentAudioFrame, PacketType::InjectAudio, PacketType::SilentAudioFrame,
PacketType::AudioStreamStats PacketType::AudioStreamStats },
}; this, "handleNodeAudioPacket");
packetReceiver.registerListenerForTypes(nodeAudioPackets, this, "handleNodeAudioPacket");
packetReceiver.registerListener(PacketType::MuteEnvironment, this, "handleMuteEnvironmentPacket"); packetReceiver.registerListener(PacketType::MuteEnvironment, this, "handleMuteEnvironmentPacket");
} }

View file

@ -110,6 +110,11 @@ DomainServer::DomainServer(int argc, char* argv[]) :
} }
} }
DomainServer::~DomainServer() {
// destroy the LimitedNodeList before the DomainServer QCoreApplication is down
DependencyManager::destroy<LimitedNodeList>();
}
void DomainServer::aboutToQuit() { void DomainServer::aboutToQuit() {
// clear the log handler so that Qt doesn't call the destructor on LogHandler // clear the log handler so that Qt doesn't call the destructor on LogHandler

View file

@ -38,6 +38,7 @@ class DomainServer : public QCoreApplication, public HTTPSRequestHandler {
Q_OBJECT Q_OBJECT
public: public:
DomainServer(int argc, char* argv[]); DomainServer(int argc, char* argv[]);
~DomainServer();
static int const EXIT_CODE_REBOOT; static int const EXIT_CODE_REBOOT;

View file

@ -19,12 +19,9 @@
OctreePacketProcessor::OctreePacketProcessor() { OctreePacketProcessor::OctreePacketProcessor() {
auto& packetReceiver = DependencyManager::get<NodeList>()->getPacketReceiver(); auto& packetReceiver = DependencyManager::get<NodeList>()->getPacketReceiver();
QSet<PacketType> types { packetReceiver.registerDirectListenerForTypes({ PacketType::OctreeStats, PacketType::EntityData,
PacketType::OctreeStats, PacketType::EntityData, PacketType::EntityErase, PacketType::OctreeStats },
PacketType::EntityErase, PacketType::OctreeStats this, "handleOctreePacket");
};
packetReceiver.registerDirectListenerForTypes(types, this, "handleOctreePacket");
} }
void OctreePacketProcessor::handleOctreePacket(QSharedPointer<NLPacket> packet, SharedNodePointer senderNode) { void OctreePacketProcessor::handleOctreePacket(QSharedPointer<NLPacket> packet, SharedNodePointer senderNode) {

View file

@ -19,92 +19,95 @@
#include "NodeList.h" #include "NodeList.h"
#include "SharedUtil.h" #include "SharedUtil.h"
PacketReceiver::PacketReceiver(QObject* parent) : PacketReceiver::PacketReceiver(QObject* parent) : QObject(parent) {
QObject(parent),
_packetListenerMap()
{
qRegisterMetaType<QSharedPointer<NLPacket>>(); qRegisterMetaType<QSharedPointer<NLPacket>>();
} }
bool PacketReceiver::registerListenerForTypes(const QSet<PacketType>& types, QObject* listener, const char* slot) { bool PacketReceiver::registerListenerForTypes(PacketTypeList types, QObject* listener, const char* slot) {
QSet<PacketType> nonSourcedTypes; Q_ASSERT_X(!types.empty(), "PacketReceiver::registerListenerForTypes", "No types to register");
QSet<PacketType> sourcedTypes; Q_ASSERT_X(listener, "PacketReceiver::registerListenerForTypes", "No object to register");
Q_ASSERT_X(slot, "PacketReceiver::registerListenerForTypes", "No slot to register");
foreach(PacketType type, types) { // Partition types based on whether they are sourced or not (non sourced in front)
if (NON_SOURCED_PACKETS.contains(type)) { auto middle = std::partition(std::begin(types), std::end(types), [](PacketType type) {
nonSourcedTypes << type; return NON_SOURCED_PACKETS.contains(type);
} else { });
sourcedTypes << type;
QMetaMethod nonSourcedMethod, sourcedMethod;
// Check we have a valid method for non sourced types if any
if (middle != std::begin(types)) {
nonSourcedMethod = matchingMethodForListener(*std::begin(types), listener, slot);
if (!nonSourcedMethod.isValid()) {
return false;
} }
} }
Q_ASSERT(listener); // Check we have a valid method for sourced types if any
if (middle != std::end(types)) {
sourcedMethod = matchingMethodForListener(*middle, listener, slot);
if (!sourcedMethod.isValid()) {
return false;
}
}
if (nonSourcedTypes.size() > 0) { // Register non sourced types
QMetaMethod nonSourcedMethod = matchingMethodForListener(*nonSourcedTypes.begin(), listener, slot); std::for_each(std::begin(types), middle, [this, &listener, &nonSourcedMethod](PacketType type) {
if (nonSourcedMethod.isValid()) {
foreach(PacketType type, nonSourcedTypes) {
registerVerifiedListener(type, listener, nonSourcedMethod); registerVerifiedListener(type, listener, nonSourcedMethod);
} });
} else {
return false;
}
}
if (sourcedTypes.size() > 0) { // Register sourced types
QMetaMethod sourcedMethod = matchingMethodForListener(*sourcedTypes.begin(), listener, slot); std::for_each(middle, std::end(types), [this, &listener, &sourcedMethod](PacketType type) {
if (sourcedMethod.isValid()) {
foreach(PacketType type, sourcedTypes) {
registerVerifiedListener(type, listener, sourcedMethod); registerVerifiedListener(type, listener, sourcedMethod);
} });
} else {
return false;
}
}
return true; return true;
} }
void PacketReceiver::registerDirectListener(PacketType type, QObject* listener, const char* slot) { void PacketReceiver::registerDirectListener(PacketType type, QObject* listener, const char* slot) {
Q_ASSERT_X(listener, "PacketReceiver::registerDirectListener", "No object to register");
Q_ASSERT_X(slot, "PacketReceiver::registerDirectListener", "No slot to register");
bool success = registerListener(type, listener, slot); bool success = registerListener(type, listener, slot);
if (success) { if (success) {
_directConnectSetMutex.lock(); QMutexLocker locker(&_directConnectSetMutex);
// if we successfully registered, add this object to the set of objects that are directly connected // if we successfully registered, add this object to the set of objects that are directly connected
_directlyConnectedObjects.insert(listener); _directlyConnectedObjects.insert(listener);
_directConnectSetMutex.unlock();
} }
} }
void PacketReceiver::registerDirectListenerForTypes(const QSet<PacketType>& types, void PacketReceiver::registerDirectListenerForTypes(PacketTypeList types,
QObject* listener, const char* slot) { QObject* listener, const char* slot) {
Q_ASSERT_X(listener, "PacketReceiver::registerDirectListenerForTypes", "No object to register");
Q_ASSERT_X(slot, "PacketReceiver::registerDirectListenerForTypes", "No slot to register");
// just call register listener for types to start // just call register listener for types to start
bool success = registerListenerForTypes(types, listener, slot); bool success = registerListenerForTypes(std::move(types), listener, slot);
if (success) { if (success) {
_directConnectSetMutex.lock(); QMutexLocker locker(&_directConnectSetMutex);
// if we successfully registered, add this object to the set of objects that are directly connected // if we successfully registered, add this object to the set of objects that are directly connected
_directlyConnectedObjects.insert(listener); _directlyConnectedObjects.insert(listener);
_directConnectSetMutex.unlock();
} }
} }
bool PacketReceiver::registerMessageListener(PacketType type, QObject* listener, const char* slot) { bool PacketReceiver::registerMessageListener(PacketType type, QObject* listener, const char* slot) {
Q_ASSERT_X(listener, "PacketReceiver::registerMessageListener", "No object to register");
Q_ASSERT_X(slot, "PacketReceiver::registerMessageListener", "No slot to register");
QMetaMethod matchingMethod = matchingMethodForListener(type, listener, slot); QMetaMethod matchingMethod = matchingMethodForListener(type, listener, slot);
if (matchingMethod.isValid()) { if (matchingMethod.isValid()) {
QMutexLocker locker(&_packetListenerLock); QMutexLocker locker(&_packetListenerLock);
if (_packetListListenerMap.contains(type)) { if (_packetListListenerMap.contains(type)) {
qCDebug(networking) << "Warning: Registering a packet listener for packet type" << type qCWarning(networking) << "Registering a packet listener for packet type" << type
<< "that will remove a previously registered listener"; << "that will remove a previously registered listener";
} }
// add the mapping // add the mapping
_packetListListenerMap[type] = ObjectMethodPair(QPointer<QObject>(listener), matchingMethod); _packetListListenerMap[type] = ObjectMethodPair(QPointer<QObject>(listener), matchingMethod);
return true; return true;
} else { } else {
return false; return false;
@ -112,7 +115,8 @@ bool PacketReceiver::registerMessageListener(PacketType type, QObject* listener,
} }
bool PacketReceiver::registerListener(PacketType type, QObject* listener, const char* slot) { bool PacketReceiver::registerListener(PacketType type, QObject* listener, const char* slot) {
Q_ASSERT(listener); Q_ASSERT_X(listener, "PacketReceiver::registerListener", "No object to register");
Q_ASSERT_X(slot, "PacketReceiver::registerListener", "No slot to register");
QMetaMethod matchingMethod = matchingMethodForListener(type, listener, slot); QMetaMethod matchingMethod = matchingMethodForListener(type, listener, slot);
@ -125,16 +129,18 @@ bool PacketReceiver::registerListener(PacketType type, QObject* listener, const
} }
QMetaMethod PacketReceiver::matchingMethodForListener(PacketType type, QObject* object, const char* slot) const { QMetaMethod PacketReceiver::matchingMethodForListener(PacketType type, QObject* object, const char* slot) const {
Q_ASSERT(object); Q_ASSERT_X(object, "PacketReceiver::matchingMethodForListener", "No object to call");
Q_ASSERT_X(slot, "PacketReceiver::matchingMethodForListener", "No slot to call");
// normalize the slot with the expected parameters // normalize the slot with the expected parameters
static const QString SIGNATURE_TEMPLATE("%1(%2)");
static const QString NON_SOURCED_PACKET_LISTENER_PARAMETERS = "QSharedPointer<NLPacket>"; static const QString NON_SOURCED_PACKET_LISTENER_PARAMETERS = "QSharedPointer<NLPacket>";
static const QString NON_SOURCED_PACKETLIST_LISTENER_PARAMETERS = "QSharedPointer<NLPacketList>"; static const QString NON_SOURCED_PACKETLIST_LISTENER_PARAMETERS = "QSharedPointer<NLPacketList>";
QSet<QString> possibleSignatures { QSet<QString> possibleSignatures {
QString("%1(%2)").arg(slot).arg(NON_SOURCED_PACKET_LISTENER_PARAMETERS), SIGNATURE_TEMPLATE.arg(slot, NON_SOURCED_PACKET_LISTENER_PARAMETERS),
QString("%1(%2)").arg(slot).arg(NON_SOURCED_PACKETLIST_LISTENER_PARAMETERS) SIGNATURE_TEMPLATE.arg(slot, NON_SOURCED_PACKETLIST_LISTENER_PARAMETERS)
}; };
if (!NON_SOURCED_PACKETS.contains(type)) { if (!NON_SOURCED_PACKETS.contains(type)) {
@ -146,10 +152,10 @@ QMetaMethod PacketReceiver::matchingMethodForListener(PacketType type, QObject*
// a sourced packet must take the shared pointer to the packet but optionally could include // a sourced packet must take the shared pointer to the packet but optionally could include
// a shared pointer to the node // a shared pointer to the node
possibleSignatures << QString("%1(%2)").arg(slot).arg(TYPEDEF_SOURCED_PACKET_LISTENER_PARAMETERS); possibleSignatures << SIGNATURE_TEMPLATE.arg(slot, TYPEDEF_SOURCED_PACKET_LISTENER_PARAMETERS);
possibleSignatures << QString("%1(%2)").arg(slot).arg(SOURCED_PACKET_LISTENER_PARAMETERS); possibleSignatures << SIGNATURE_TEMPLATE.arg(slot, SOURCED_PACKET_LISTENER_PARAMETERS);
possibleSignatures << QString("%1(%2)").arg(slot).arg(TYPEDEF_SOURCED_PACKETLIST_LISTENER_PARAMETERS); possibleSignatures << SIGNATURE_TEMPLATE.arg(slot, TYPEDEF_SOURCED_PACKETLIST_LISTENER_PARAMETERS);
possibleSignatures << QString("%1(%2)").arg(slot).arg(SOURCED_PACKETLIST_LISTENER_PARAMETERS); possibleSignatures << SIGNATURE_TEMPLATE.arg(slot, SOURCED_PACKETLIST_LISTENER_PARAMETERS);
} }
int methodIndex = -1; int methodIndex = -1;
@ -185,39 +191,30 @@ QMetaMethod PacketReceiver::matchingMethodForListener(PacketType type, QObject*
} }
void PacketReceiver::registerVerifiedListener(PacketType type, QObject* object, const QMetaMethod& slot) { void PacketReceiver::registerVerifiedListener(PacketType type, QObject* object, const QMetaMethod& slot) {
_packetListenerLock.lock(); Q_ASSERT_X(object, "PacketReceiver::registerVerifiedListener", "No object to register");
QMutexLocker locker(&_packetListenerLock);
if (_packetListenerMap.contains(type)) { if (_packetListenerMap.contains(type)) {
qCDebug(networking) << "Warning: Registering a packet listener for packet type" << type qCWarning(networking) << "Registering a packet listener for packet type" << type
<< "that will remove a previously registered listener"; << "that will remove a previously registered listener";
} }
// add the mapping // add the mapping
_packetListenerMap[type] = ObjectMethodPair(QPointer<QObject>(object), slot); _packetListenerMap[type] = ObjectMethodPair(QPointer<QObject>(object), slot);
_packetListenerLock.unlock();
} }
void PacketReceiver::unregisterListener(QObject* listener) { void PacketReceiver::unregisterListener(QObject* listener) {
_packetListenerLock.lock(); Q_ASSERT_X(listener, "PacketReceiver::unregisterListener", "No listener to unregister");
auto it = _packetListenerMap.begin(); QMutexLocker packetListenerLocker(&_packetListenerLock);
std::remove_if(std::begin(_packetListenerMap), std::end(_packetListenerMap),
[&listener](const ObjectMethodPair& pair) {
return pair.first == listener;
});
packetListenerLocker.unlock();
while (it != _packetListenerMap.end()) { QMutexLocker directConnectSetLocker(&_directConnectSetMutex);
if (it.value().first == listener) {
// this listener matches - erase it
it = _packetListenerMap.erase(it);
} else {
++it;
}
}
_packetListenerLock.unlock();
_directConnectSetMutex.lock();
_directlyConnectedObjects.remove(listener); _directlyConnectedObjects.remove(listener);
_directConnectSetMutex.unlock();
} }
void PacketReceiver::handleVerifiedPacketList(std::unique_ptr<udt::PacketList> packetList) { void PacketReceiver::handleVerifiedPacketList(std::unique_ptr<udt::PacketList> packetList) {
@ -338,7 +335,7 @@ void PacketReceiver::handleVerifiedPacketList(std::unique_ptr<udt::PacketList> p
} }
} else if (it == _packetListListenerMap.end()) { } else if (it == _packetListListenerMap.end()) {
qWarning() << "No listener found for packet type" << nlPacketList->getType(); qCWarning(networking) << "No listener found for packet type" << nlPacketList->getType();
// insert a dummy listener so we don't print this again // insert a dummy listener so we don't print this again
_packetListListenerMap.insert(nlPacketList->getType(), { nullptr, QMetaMethod() }); _packetListListenerMap.insert(nlPacketList->getType(), { nullptr, QMetaMethod() });
@ -372,7 +369,7 @@ void PacketReceiver::handleVerifiedPacket(std::unique_ptr<udt::Packet> packet) {
} }
} }
_packetListenerLock.lock(); QMutexLocker packetListenerLocker(&_packetListenerLock);
bool listenerIsDead = false; bool listenerIsDead = false;
@ -387,12 +384,10 @@ void PacketReceiver::handleVerifiedPacket(std::unique_ptr<udt::Packet> packet) {
bool success = false; bool success = false;
// check if this is a directly connected listener // check if this is a directly connected listener
_directConnectSetMutex.lock(); QMutexLocker directConnectSetLocker(&_directConnectSetMutex);
Qt::ConnectionType connectionType = Qt::ConnectionType connectionType =
_directlyConnectedObjects.contains(listener.first) ? Qt::DirectConnection : Qt::AutoConnection; _directlyConnectedObjects.contains(listener.first) ? Qt::DirectConnection : Qt::AutoConnection;
directConnectSetLocker.unlock();
_directConnectSetMutex.unlock();
PacketType packetType = nlPacket->getType(); PacketType packetType = nlPacket->getType();
@ -457,18 +452,18 @@ void PacketReceiver::handleVerifiedPacket(std::unique_ptr<udt::Packet> packet) {
it = _packetListenerMap.erase(it); it = _packetListenerMap.erase(it);
// if it exists, remove the listener from _directlyConnectedObjects // if it exists, remove the listener from _directlyConnectedObjects
_directConnectSetMutex.lock(); QMutexLocker locker(&_directConnectSetMutex);
_directlyConnectedObjects.remove(listener.first); _directlyConnectedObjects.remove(listener.first);
_directConnectSetMutex.unlock(); locker.unlock();
} }
} else if (it == _packetListenerMap.end()) { } else if (it == _packetListenerMap.end()) {
qWarning() << "No listener found for packet type" << nlPacket->getType(); qCWarning(networking) << "No listener found for packet type" << nlPacket->getType();
// insert a dummy listener so we don't print this again // insert a dummy listener so we don't print this again
_packetListenerMap.insert(nlPacket->getType(), { nullptr, QMetaMethod() }); _packetListenerMap.insert(nlPacket->getType(), { nullptr, QMetaMethod() });
} }
_packetListenerLock.unlock(); packetListenerLocker.unlock();
} }

View file

@ -13,6 +13,8 @@
#ifndef hifi_PacketReceiver_h #ifndef hifi_PacketReceiver_h
#define hifi_PacketReceiver_h #define hifi_PacketReceiver_h
#include <vector>
#include <QtCore/QMap> #include <QtCore/QMap>
#include <QtCore/QMetaMethod> #include <QtCore/QMetaMethod>
#include <QtCore/QMutex> #include <QtCore/QMutex>
@ -30,6 +32,8 @@ class OctreePacketProcessor;
class PacketReceiver : public QObject { class PacketReceiver : public QObject {
Q_OBJECT Q_OBJECT
public: public:
using PacketTypeList = std::vector<PacketType>;
PacketReceiver(QObject* parent = 0); PacketReceiver(QObject* parent = 0);
PacketReceiver(const PacketReceiver&) = delete; PacketReceiver(const PacketReceiver&) = delete;
@ -42,8 +46,8 @@ public:
void resetCounters() { _inPacketCount = 0; _inByteCount = 0; } void resetCounters() { _inPacketCount = 0; _inByteCount = 0; }
bool registerListenerForTypes(const QSet<PacketType>& types, QObject* listener, const char* slot); bool registerListenerForTypes(PacketTypeList types, QObject* listener, const char* slot);
bool registerMessageListener(PacketType types, QObject* listener, const char* slot); bool registerMessageListener(PacketType type, QObject* listener, const char* slot);
bool registerListener(PacketType type, QObject* listener, const char* slot); bool registerListener(PacketType type, QObject* listener, const char* slot);
void unregisterListener(QObject* listener); void unregisterListener(QObject* listener);
@ -56,7 +60,7 @@ signals:
private: private:
// these are brutal hacks for now - ideally GenericThread / ReceivedPacketProcessor // these are brutal hacks for now - ideally GenericThread / ReceivedPacketProcessor
// should be changed to have a true event loop and be able to handle our QMetaMethod::invoke // should be changed to have a true event loop and be able to handle our QMetaMethod::invoke
void registerDirectListenerForTypes(const QSet<PacketType>& types, QObject* listener, const char* slot); void registerDirectListenerForTypes(PacketTypeList types, QObject* listener, const char* slot);
void registerDirectListener(PacketType type, QObject* listener, const char* slot); void registerDirectListener(PacketType type, QObject* listener, const char* slot);
QMetaMethod matchingMethodForListener(PacketType type, QObject* object, const char* slot) const; QMetaMethod matchingMethodForListener(PacketType type, QObject* object, const char* slot) const;

View file

@ -59,6 +59,7 @@ Connection::~Connection() {
_sendQueue.release(); _sendQueue.release();
// wait on the send queue thread so we know the send queue is gone // wait on the send queue thread so we know the send queue is gone
sendQueueThread->quit();
sendQueueThread->wait(); sendQueueThread->wait();
} }
} }