Merge branch 'atp' of github.com:birarda/hifi into protocol

This commit is contained in:
Ryan Huffman 2015-07-16 12:14:46 -07:00
commit 87f891288f
5 changed files with 90 additions and 29 deletions

View file

@ -24,7 +24,7 @@ OctreePacketProcessor::OctreePacketProcessor() {
PacketType::EntityErase, PacketType::OctreeStats, PacketType::EnvironmentData
};
packetReceiver.registerListenerForTypes(types, this, "handleOctreePacket");
packetReceiver.registerDirectListenerForTypes(types, this, "handleOctreePacket");
}
void OctreePacketProcessor::handleOctreePacket(QSharedPointer<NLPacket> packet, SharedNodePointer senderNode) {

View file

@ -24,7 +24,7 @@ PacketReceiver::PacketReceiver(QObject* parent) :
qRegisterMetaType<QSharedPointer<NLPacket>>();
}
void PacketReceiver::registerListenerForTypes(const QSet<PacketType::Value>& types, PacketListener* listener, const char* slot) {
bool PacketReceiver::registerListenerForTypes(const QSet<PacketType::Value>& types, PacketListener* listener, const char* slot) {
QSet<PacketType::Value> nonSourcedTypes;
QSet<PacketType::Value> sourcedTypes;
@ -41,20 +41,45 @@ void PacketReceiver::registerListenerForTypes(const QSet<PacketType::Value>& typ
if (nonSourcedTypes.size() > 0) {
QMetaMethod nonSourcedMethod = matchingMethodForListener(*nonSourcedTypes.begin(), object, slot);
foreach(PacketType::Value type, nonSourcedTypes) {
registerVerifiedListener(type, object, nonSourcedMethod);
if (nonSourcedMethod.isValid()) {
foreach(PacketType::Value type, nonSourcedTypes) {
registerVerifiedListener(type, object, nonSourcedMethod);
}
} else {
return false;
}
}
if (sourcedTypes.size() > 0) {
QMetaMethod sourcedMethod = matchingMethodForListener(*sourcedTypes.begin(), object, slot);
foreach(PacketType::Value type, sourcedTypes) {
registerVerifiedListener(type, object, sourcedMethod);
if (sourcedMethod.isValid()) {
foreach(PacketType::Value type, sourcedTypes) {
registerVerifiedListener(type, object, sourcedMethod);
}
} else {
return false;
}
}
return true;
}
void PacketReceiver::registerDirectListenerForTypes(const QSet<PacketType::Value>& types,
PacketListener* listener, const char* slot) {
// just call register listener for types to start
bool success = registerListenerForTypes(types, listener, slot);
if (success) {
_directConnectSetMutex.lock();
// if we successfully registered, add this object to the set of objects that are directly connected
_directlyConnectedObjects.insert(dynamic_cast<QObject*>(listener));
_directConnectSetMutex.unlock();
}
}
void PacketReceiver::registerListener(PacketType::Value type, PacketListener* listener, const char* slot) {
bool PacketReceiver::registerListener(PacketType::Value type, PacketListener* listener, const char* slot) {
QObject* object = dynamic_cast<QObject*>(listener);
Q_ASSERT(object);
@ -62,6 +87,9 @@ void PacketReceiver::registerListener(PacketType::Value type, PacketListener* li
if (matchingMethod.isValid()) {
registerVerifiedListener(type, object, matchingMethod);
return true;
} else {
return false;
}
}
@ -134,12 +162,14 @@ void PacketReceiver::registerVerifiedListener(PacketType::Value type, QObject* o
}
void PacketReceiver::unregisterListener(PacketListener* listener) {
QObject* listenerObject = dynamic_cast<QObject*>(listener);
_packetListenerLock.lock();
auto it = _packetListenerMap.begin();
while (it != _packetListenerMap.end()) {
if (it.value().first == dynamic_cast<QObject*>(listener)) {
if (it.value().first == listenerObject) {
// this listener matches - erase it
it = _packetListenerMap.erase(it);
} else {
@ -148,6 +178,10 @@ void PacketReceiver::unregisterListener(PacketListener* listener) {
}
_packetListenerLock.unlock();
_directConnectSetMutex.lock();
_directlyConnectedObjects.remove(listenerObject);
_directConnectSetMutex.unlock();
}
bool PacketReceiver::packetVersionMatch(const NLPacket& packet) {
@ -227,6 +261,15 @@ void PacketReceiver::processDatagrams() {
if (listener.first) {
bool success = false;
// check if this is a directly connected listener
_directConnectSetMutex.lock();
Qt::ConnectionType connectionType =
_directlyConnectedObjects.contains(listener.first) ? Qt::DirectConnection : Qt::AutoConnection;
_directConnectSetMutex.unlock();
PacketType::Value packetType = packet->getType();
if (matchingNode) {
@ -244,17 +287,23 @@ void PacketReceiver::processDatagrams() {
if (metaMethod.parameterTypes().contains(SHARED_NODE_NORMALIZED)) {
success = metaMethod.invoke(listener.first,
Q_ARG(QSharedPointer<NLPacket>, QSharedPointer<NLPacket>(packet.release())),
Q_ARG(SharedNodePointer, matchingNode));
connectionType,
Q_ARG(QSharedPointer<NLPacket>,
QSharedPointer<NLPacket>(packet.release())),
Q_ARG(SharedNodePointer, matchingNode));
} else if (metaMethod.parameterTypes().contains(QSHAREDPOINTER_NODE_NORMALIZED)) {
success = metaMethod.invoke(listener.first,
Q_ARG(QSharedPointer<NLPacket>, QSharedPointer<NLPacket>(packet.release())),
Q_ARG(QSharedPointer<Node>, matchingNode));
connectionType,
Q_ARG(QSharedPointer<NLPacket>,
QSharedPointer<NLPacket>(packet.release())),
Q_ARG(QSharedPointer<Node>, matchingNode));
} else {
success = metaMethod.invoke(listener.first,
Q_ARG(QSharedPointer<NLPacket>, QSharedPointer<NLPacket>(packet.release())));
connectionType,
Q_ARG(QSharedPointer<NLPacket>,
QSharedPointer<NLPacket>(packet.release())));
}
} else {
emit dataReceived(NodeType::Unassigned, packet->getDataSize());

View file

@ -22,6 +22,7 @@
#include "NLPacket.h"
#include "udt/PacketHeaders.h"
class OctreePacketProcessor;
class PacketListener;
class PacketReceiver : public QObject {
@ -39,8 +40,8 @@ public:
void resetCounters() { _inPacketCount = 0; _inByteCount = 0; }
void registerListenerForTypes(const QSet<PacketType::Value>& types, PacketListener* listener, const char* slot);
void registerListener(PacketType::Value type, PacketListener* listener, const char* slot);
bool registerListenerForTypes(const QSet<PacketType::Value>& types, PacketListener* listener, const char* slot);
bool registerListener(PacketType::Value type, PacketListener* listener, const char* slot);
void unregisterListener(PacketListener* listener);
public slots:
@ -52,6 +53,10 @@ signals:
void packetVersionMismatch(PacketType::Value type);
private:
// this is a brutal hack for now - ideally GenericThread / ReceivedPacketProcessor
// should be changed to have a true event loop and be able to handle our QMetaMethod::invoke
void registerDirectListenerForTypes(const QSet<PacketType::Value>& types, PacketListener* listener, const char* slot);
bool packetVersionMatch(const NLPacket& packet);
QMetaMethod matchingMethodForListener(PacketType::Value type, QObject* object, const char* slot) const;
@ -64,6 +69,10 @@ private:
int _inPacketCount = 0;
int _inByteCount = 0;
bool _shouldDropPackets = false;
QMutex _directConnectSetMutex;
QSet<QObject*> _directlyConnectedObjects;
friend class OctreePacketProcessor;
};
#endif // hifi_PacketReceiver_h

View file

@ -49,13 +49,13 @@ PacketSender::~PacketSender() {
void PacketSender::queuePacketForSending(const SharedNodePointer& destinationNode, std::unique_ptr<NLPacket> packet) {
_totalPacketsQueued++;
_totalBytesQueued += packet->getDataSize();
lock();
_packets.push_back({destinationNode, std::move(packet)});
unlock();
_totalPacketsQueued++;
_totalBytesQueued += packet->getDataSize();
// Make sure to wake our actual processing thread because we now have packets for it to process.
_hasPackets.wakeAll();
}

View file

@ -110,7 +110,7 @@ void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, std::uniqu
// add packet to history
_sentPacketHistories[nodeUUID].packetSent(sequence, *packet);
queuePacketForSending(node, std::move(packet));
queuePacketForSending(node, NLPacket::createCopy(*packet));
}
});
}
@ -251,7 +251,7 @@ void OctreeEditPacketSender::queueOctreeEditMessage(PacketType::Value type, QByt
std::unique_ptr<NLPacket>& bufferedPacket = _pendingEditPackets[nodeUUID];
if (!bufferedPacket) {
bufferedPacket = NLPacket::create(type);
bufferedPacket = std::move(NLPacket::create(type));
} else {
// If we're switching type, then we send the last one and start over
if ((type != bufferedPacket->getType() && bufferedPacket->getPayloadSize() > 0) ||
@ -291,15 +291,18 @@ void OctreeEditPacketSender::releaseQueuedMessages() {
_releaseQueuedMessagesPending = true;
} else {
_packetsQueueLock.lock();
for (auto i = _pendingEditPackets.begin(); i != _pendingEditPackets.end(); i++) {
// construct a null unique_ptr to an NL packet
std::unique_ptr<NLPacket> releasedPacket;
// swap the null ptr with the packet we want to release
i->second.swap(releasedPacket);
// move and release the queued packet
releaseQueuedPacket(i->first, std::move(releasedPacket));
for (auto& i : _pendingEditPackets) {
if (i.second) {
// construct a null unique_ptr to an NL packet
std::unique_ptr<NLPacket> releasedPacket;
// swap the null ptr with the packet we want to release
i.second.swap(releasedPacket);
// move and release the queued packet
releaseQueuedPacket(i.first, std::move(releasedPacket));
}
}
_packetsQueueLock.unlock();
}