From 336967e580b5923ce44f1ae3661bb60a71cf6f4b Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Thu, 16 Jul 2015 11:17:49 -0700 Subject: [PATCH] fix edit packet sending from client --- .../src/octree/OctreePacketProcessor.cpp | 2 +- libraries/networking/src/PacketReceiver.cpp | 73 ++++++++++++++++--- libraries/networking/src/PacketReceiver.h | 13 +++- libraries/networking/src/PacketSender.cpp | 6 +- .../octree/src/OctreeEditPacketSender.cpp | 25 ++++--- 5 files changed, 90 insertions(+), 29 deletions(-) diff --git a/interface/src/octree/OctreePacketProcessor.cpp b/interface/src/octree/OctreePacketProcessor.cpp index 7b0249138b..9558df196c 100644 --- a/interface/src/octree/OctreePacketProcessor.cpp +++ b/interface/src/octree/OctreePacketProcessor.cpp @@ -24,7 +24,7 @@ OctreePacketProcessor::OctreePacketProcessor() { PacketType::EntityErase, PacketType::OctreeStats, PacketType::EnvironmentData }; - packetReceiver.registerListenerForTypes(types, this, "handleOctreePacket"); + packetReceiver.registerDirectListenerForTypes(types, this, "handleOctreePacket"); } void OctreePacketProcessor::handleOctreePacket(QSharedPointer packet, SharedNodePointer senderNode) { diff --git a/libraries/networking/src/PacketReceiver.cpp b/libraries/networking/src/PacketReceiver.cpp index e9480a4c75..daa1e08d9d 100644 --- a/libraries/networking/src/PacketReceiver.cpp +++ b/libraries/networking/src/PacketReceiver.cpp @@ -24,7 +24,7 @@ PacketReceiver::PacketReceiver(QObject* parent) : qRegisterMetaType>(); } -void PacketReceiver::registerListenerForTypes(const QSet& types, PacketListener* listener, const char* slot) { +bool PacketReceiver::registerListenerForTypes(const QSet& types, PacketListener* listener, const char* slot) { QSet nonSourcedTypes; QSet sourcedTypes; @@ -41,20 +41,45 @@ void PacketReceiver::registerListenerForTypes(const QSet& typ if (nonSourcedTypes.size() > 0) { QMetaMethod nonSourcedMethod = matchingMethodForListener(*nonSourcedTypes.begin(), object, slot); - foreach(PacketType::Value type, nonSourcedTypes) { - registerVerifiedListener(type, object, nonSourcedMethod); + if (nonSourcedMethod.isValid()) { + foreach(PacketType::Value type, nonSourcedTypes) { + registerVerifiedListener(type, object, nonSourcedMethod); + } + } else { + return false; } } if (sourcedTypes.size() > 0) { QMetaMethod sourcedMethod = matchingMethodForListener(*sourcedTypes.begin(), object, slot); - foreach(PacketType::Value type, sourcedTypes) { - registerVerifiedListener(type, object, sourcedMethod); + if (sourcedMethod.isValid()) { + foreach(PacketType::Value type, sourcedTypes) { + registerVerifiedListener(type, object, sourcedMethod); + } + } else { + return false; } } + + return true; +} + + +void PacketReceiver::registerDirectListenerForTypes(const QSet& types, + PacketListener* listener, const char* slot) { + // just call register listener for types to start + bool success = registerListenerForTypes(types, listener, slot); + if (success) { + _directConnectSetMutex.lock(); + + // if we successfully registered, add this object to the set of objects that are directly connected + _directlyConnectedObjects.insert(dynamic_cast(listener)); + + _directConnectSetMutex.unlock(); + } } -void PacketReceiver::registerListener(PacketType::Value type, PacketListener* listener, const char* slot) { +bool PacketReceiver::registerListener(PacketType::Value type, PacketListener* listener, const char* slot) { QObject* object = dynamic_cast(listener); Q_ASSERT(object); @@ -62,6 +87,9 @@ void PacketReceiver::registerListener(PacketType::Value type, PacketListener* li if (matchingMethod.isValid()) { registerVerifiedListener(type, object, matchingMethod); + return true; + } else { + return false; } } @@ -134,12 +162,14 @@ void PacketReceiver::registerVerifiedListener(PacketType::Value type, QObject* o } void PacketReceiver::unregisterListener(PacketListener* listener) { + QObject* listenerObject = dynamic_cast(listener); + _packetListenerLock.lock(); auto it = _packetListenerMap.begin(); while (it != _packetListenerMap.end()) { - if (it.value().first == dynamic_cast(listener)) { + if (it.value().first == listenerObject) { // this listener matches - erase it it = _packetListenerMap.erase(it); } else { @@ -148,6 +178,10 @@ void PacketReceiver::unregisterListener(PacketListener* listener) { } _packetListenerLock.unlock(); + + _directConnectSetMutex.lock(); + _directlyConnectedObjects.remove(listenerObject); + _directConnectSetMutex.unlock(); } bool PacketReceiver::packetVersionMatch(const NLPacket& packet) { @@ -227,6 +261,15 @@ void PacketReceiver::processDatagrams() { if (listener.first) { bool success = false; + + // check if this is a directly connected listener + _directConnectSetMutex.lock(); + + Qt::ConnectionType connectionType = + _directlyConnectedObjects.contains(listener.first) ? Qt::DirectConnection : Qt::AutoConnection; + + _directConnectSetMutex.unlock(); + PacketType::Value packetType = packet->getType(); if (matchingNode) { @@ -244,17 +287,23 @@ void PacketReceiver::processDatagrams() { if (metaMethod.parameterTypes().contains(SHARED_NODE_NORMALIZED)) { success = metaMethod.invoke(listener.first, - Q_ARG(QSharedPointer, QSharedPointer(packet.release())), - Q_ARG(SharedNodePointer, matchingNode)); + connectionType, + Q_ARG(QSharedPointer, + QSharedPointer(packet.release())), + Q_ARG(SharedNodePointer, matchingNode)); } else if (metaMethod.parameterTypes().contains(QSHAREDPOINTER_NODE_NORMALIZED)) { success = metaMethod.invoke(listener.first, - Q_ARG(QSharedPointer, QSharedPointer(packet.release())), - Q_ARG(QSharedPointer, matchingNode)); + connectionType, + Q_ARG(QSharedPointer, + QSharedPointer(packet.release())), + Q_ARG(QSharedPointer, matchingNode)); } else { success = metaMethod.invoke(listener.first, - Q_ARG(QSharedPointer, QSharedPointer(packet.release()))); + connectionType, + Q_ARG(QSharedPointer, + QSharedPointer(packet.release()))); } } else { emit dataReceived(NodeType::Unassigned, packet->getDataSize()); diff --git a/libraries/networking/src/PacketReceiver.h b/libraries/networking/src/PacketReceiver.h index 3d4cd18946..1bf31b30b1 100644 --- a/libraries/networking/src/PacketReceiver.h +++ b/libraries/networking/src/PacketReceiver.h @@ -22,6 +22,7 @@ #include "NLPacket.h" #include "udt/PacketHeaders.h" +class OctreePacketProcessor; class PacketListener; class PacketReceiver : public QObject { @@ -39,8 +40,8 @@ public: void resetCounters() { _inPacketCount = 0; _inByteCount = 0; } - void registerListenerForTypes(const QSet& types, PacketListener* listener, const char* slot); - void registerListener(PacketType::Value type, PacketListener* listener, const char* slot); + bool registerListenerForTypes(const QSet& types, PacketListener* listener, const char* slot); + bool registerListener(PacketType::Value type, PacketListener* listener, const char* slot); void unregisterListener(PacketListener* listener); public slots: @@ -52,6 +53,10 @@ signals: void packetVersionMismatch(PacketType::Value type); private: + // this is a brutal hack for now - ideally GenericThread / ReceivedPacketProcessor + // should be changed to have a true event loop and be able to handle our QMetaMethod::invoke + void registerDirectListenerForTypes(const QSet& types, PacketListener* listener, const char* slot); + bool packetVersionMatch(const NLPacket& packet); QMetaMethod matchingMethodForListener(PacketType::Value type, QObject* object, const char* slot) const; @@ -64,6 +69,10 @@ private: int _inPacketCount = 0; int _inByteCount = 0; bool _shouldDropPackets = false; + QMutex _directConnectSetMutex; + QSet _directlyConnectedObjects; + + friend class OctreePacketProcessor; }; #endif // hifi_PacketReceiver_h diff --git a/libraries/networking/src/PacketSender.cpp b/libraries/networking/src/PacketSender.cpp index ee3d09cb75..14737dfb8d 100644 --- a/libraries/networking/src/PacketSender.cpp +++ b/libraries/networking/src/PacketSender.cpp @@ -49,13 +49,13 @@ PacketSender::~PacketSender() { void PacketSender::queuePacketForSending(const SharedNodePointer& destinationNode, std::unique_ptr packet) { + _totalPacketsQueued++; + _totalBytesQueued += packet->getDataSize(); + lock(); _packets.push_back({destinationNode, std::move(packet)}); unlock(); - _totalPacketsQueued++; - _totalBytesQueued += packet->getDataSize(); - // Make sure to wake our actual processing thread because we now have packets for it to process. _hasPackets.wakeAll(); } diff --git a/libraries/octree/src/OctreeEditPacketSender.cpp b/libraries/octree/src/OctreeEditPacketSender.cpp index e8a618de26..8fc13b440d 100644 --- a/libraries/octree/src/OctreeEditPacketSender.cpp +++ b/libraries/octree/src/OctreeEditPacketSender.cpp @@ -110,7 +110,7 @@ void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, std::uniqu // add packet to history _sentPacketHistories[nodeUUID].packetSent(sequence, *packet); - queuePacketForSending(node, std::move(packet)); + queuePacketForSending(node, NLPacket::createCopy(*packet)); } }); } @@ -251,7 +251,7 @@ void OctreeEditPacketSender::queueOctreeEditMessage(PacketType::Value type, QByt std::unique_ptr& bufferedPacket = _pendingEditPackets[nodeUUID]; if (!bufferedPacket) { - bufferedPacket = NLPacket::create(type); + bufferedPacket = std::move(NLPacket::create(type)); } else { // If we're switching type, then we send the last one and start over if ((type != bufferedPacket->getType() && bufferedPacket->getPayloadSize() > 0) || @@ -291,15 +291,18 @@ void OctreeEditPacketSender::releaseQueuedMessages() { _releaseQueuedMessagesPending = true; } else { _packetsQueueLock.lock(); - for (auto i = _pendingEditPackets.begin(); i != _pendingEditPackets.end(); i++) { - // construct a null unique_ptr to an NL packet - std::unique_ptr releasedPacket; - - // swap the null ptr with the packet we want to release - i->second.swap(releasedPacket); - - // move and release the queued packet - releaseQueuedPacket(i->first, std::move(releasedPacket)); + for (auto& i : _pendingEditPackets) { + if (i.second) { + // construct a null unique_ptr to an NL packet + std::unique_ptr releasedPacket; + + // swap the null ptr with the packet we want to release + i.second.swap(releasedPacket); + + // move and release the queued packet + releaseQueuedPacket(i.first, std::move(releasedPacket)); + } + } _packetsQueueLock.unlock(); }