changes for new packet receive API

This commit is contained in:
Stephen Birarda 2015-07-10 16:45:44 -07:00
parent 2d1c3e27da
commit 990f7b7332
11 changed files with 258 additions and 165 deletions

View file

@ -168,54 +168,33 @@ void LimitedNodeList::changeSocketBufferSizes(int numBytes) {
}
}
bool LimitedNodeList::packetVersionAndHashMatch(const QByteArray& packet) {
PacketType::Value checkType = packetTypeForPacket(packet);
int numPacketTypeBytes = numBytesArithmeticCodingFromBuffer(packet.data());
if (packet[numPacketTypeBytes] != versionForPacketType(checkType)
&& checkType != PacketType::StunResponse) {
PacketType::Value mismatchType = packetTypeForPacket(packet);
static QMultiMap<QUuid, PacketType::Value> versionDebugSuppressMap;
QUuid senderUUID = uuidFromPacketHeader(packet);
if (!versionDebugSuppressMap.contains(senderUUID, checkType)) {
qCDebug(networking) << "Packet version mismatch on" << packetTypeForPacket(packet) << "- Sender"
<< uuidFromPacketHeader(packet) << "sent" << qPrintable(QString::number(packet[numPacketTypeBytes])) << "but"
<< qPrintable(QString::number(versionForPacketType(mismatchType))) << "expected.";
emit packetVersionMismatch();
versionDebugSuppressMap.insert(senderUUID, checkType);
}
return false;
}
if (!NON_VERIFIED_PACKETS.contains(checkType)) {
bool LimitedNodeList::packetSourceAndHashMatch(const NLPacket& packet, SharedNodePointer& matchingNode) {
if (!NON_VERIFIED_PACKETS.contains(packet.getType()) && !NON_SOURCED_PACKETS.contains(packet.getType())) {
// figure out which node this is from
SharedNodePointer sendingNode = sendingNodeForPacket(packet);
if (sendingNode) {
matchingNode = nodeWithUUID(packet.getSourceID());
if (matchingNode) {
// check if the md5 hash in the header matches the hash we would expect
if (hashFromPacketHeader(packet) == hashForPacketAndConnectionUUID(packet, sendingNode->getConnectionSecret())) {
if (packet.getVerificationHash() == packet.payloadHashWithConnectionUUID(matchingNode->getConnectionSecret())) {
return true;
} else {
static QMultiMap<QUuid, PacketType::Value> hashDebugSuppressMap;
const QUuid& senderID = packet.getSourceID();
QUuid senderUUID = uuidFromPacketHeader(packet);
if (!hashDebugSuppressMap.contains(senderUUID, checkType)) {
qCDebug(networking) << "Packet hash mismatch on" << checkType << "- Sender"
<< uuidFromPacketHeader(packet);
if (!hashDebugSuppressMap.contains(senderID, packet.getType())) {
qCDebug(networking) << "Packet hash mismatch on" << packet.getType() << "- Sender" << senderID;
hashDebugSuppressMap.insert(senderUUID, checkType);
hashDebugSuppressMap.insert(senderID, packet.getType());
}
}
} else {
static QString repeatedMessage
= LogHandler::getInstance().addRepeatedMessageRegex("Packet of type \\d+ received from unknown node with UUID");
qCDebug(networking) << "Packet of type" << checkType << "received from unknown node with UUID"
<< qPrintable(uuidStringWithoutCurlyBraces(uuidFromPacketHeader(packet)));
qCDebug(networking) << "Packet of type" << packet.getType() << "received from unknown node with UUID"
<< qPrintable(uuidStringWithoutCurlyBraces(packet.getSourceID()));
}
} else {
return true;
@ -224,19 +203,6 @@ bool LimitedNodeList::packetVersionAndHashMatch(const QByteArray& packet) {
return false;
}
qint64 LimitedNodeList::readDatagram(QByteArray& incomingPacket, QHostAddress* address = 0, quint16* port = 0) {
qint64 result = getNodeSocket().readDatagram(incomingPacket.data(), incomingPacket.size(), address, port);
SharedNodePointer sendingNode = sendingNodeForPacket(incomingPacket);
if (sendingNode) {
emit dataReceived(sendingNode->getType(), incomingPacket.size());
} else {
emit dataReceived(NodeType::Unassigned, incomingPacket.size());
}
return result;
}
qint64 LimitedNodeList::writeDatagram(const QByteArray& datagram, const HifiSockAddr& destinationSockAddr) {
// XXX can BandwidthRecorder be used for this?
// stat collection for packets
@ -262,25 +228,13 @@ PacketSequenceNumber LimitedNodeList::getNextSequenceNumberForPacket(const QUuid
return _packetSequenceNumbers[nodeUUID][packetType]++;
}
void LimitedNodeList::processNodeData(const HifiSockAddr& senderSockAddr, const QByteArray& packet) {
// the node decided not to do anything with this packet
// if it comes from a known source we should keep that node alive
SharedNodePointer matchingNode = sendingNodeForPacket(packet);
if (matchingNode) {
matchingNode->setLastHeardMicrostamp(usecTimestampNow());
}
}
int LimitedNodeList::updateNodeWithDataFromPacket(const SharedNodePointer& matchingNode, const QByteArray &packet) {
int LimitedNodeList::updateNodeWithDataFromPacket(const SharedNodePointer& matchingNode, QSharedPointer<NLPacket> packet) {
QMutexLocker locker(&matchingNode->getMutex());
matchingNode->setLastHeardMicrostamp(usecTimestampNow());
// if this was a sequence numbered packet we should store the last seq number for
// a packet of this type for this node
PacketType::Value packetType = packetTypeForPacket(packet);
if (SEQUENCE_NUMBERED_PACKETS.contains(packetType)) {
matchingNode->setLastSequenceNumberForPacketType(sequenceNumberFromHeader(packet, packetType), packetType);
if (SEQUENCE_NUMBERED_PACKETS.contains(packet->getType())) {
matchingNode->setLastSequenceNumberForPacketType(packet->readSequenceNumber(), packet->getType());
}
NodeData* linkedData = matchingNode->getLinkedData();
@ -290,13 +244,13 @@ int LimitedNodeList::updateNodeWithDataFromPacket(const SharedNodePointer& match
if (linkedData) {
QMutexLocker linkedDataLocker(&linkedData->getMutex());
return linkedData->parseData(packet);
return linkedData->parseData(QByteArray::fromRawData(packet->getData(), packet->getSizeWithHeader()));
}
return 0;
}
int LimitedNodeList::findNodeAndUpdateWithDataFromPacket(const QByteArray& packet) {
SharedNodePointer matchingNode = sendingNodeForPacket(packet);
int LimitedNodeList::findNodeAndUpdateWithDataFromPacket(QSharedPointer<NLPacket> packet) {
SharedNodePointer matchingNode = nodeWithUUID(packet->getSourceID());
if (matchingNode) {
return updateNodeWithDataFromPacket(matchingNode, packet);
@ -313,13 +267,6 @@ SharedNodePointer LimitedNodeList::nodeWithUUID(const QUuid& nodeUUID) {
return it == _nodeHash.cend() ? SharedNodePointer() : it->second;
}
SharedNodePointer LimitedNodeList::sendingNodeForPacket(const QByteArray& packet) {
QUuid nodeUUID = uuidFromPacketHeader(packet);
// return the matching node, or NULL if there is no match
return nodeWithUUID(nodeUUID);
}
void LimitedNodeList::eraseAllNodes() {
qCDebug(networking) << "Clearing the NodeList. Deleting all nodes in list.";

View file

@ -120,7 +120,8 @@ public:
QUdpSocket& getNodeSocket() { return _nodeSocket; }
QUdpSocket& getDTLSSocket();
bool packetVersionAndHashMatch(const QByteArray& packet);
bool packetVersionMatch(const NLPacket& packet);
bool packetSourceAndHashMatch(const NLPacket& packet, SharedNodePointer& matchingNode);
PacketReceiver& getPacketReceiver() { return _packetReceiver; }
@ -130,7 +131,6 @@ public:
// { return populatePacketHeaderWithUUID(packet, packetType, _sessionUUID); }
// int populatePacketHeader(char* packet, PacketType::Value packetType)
// { return populatePacketHeaderWithUUID(packet, packetType, _sessionUUID); }
qint64 readDatagram(QByteArray& incomingPacket, QHostAddress* address, quint16 * port);
qint64 sendUnreliablePacket(const NLPacket& packet, const SharedNodePointer& destinationNode) { assert(false); return 0; }
qint64 sendUnreliablePacket(const NLPacket& packet, const HifiSockAddr& sockAddr) { assert(false); return 0; }
@ -146,7 +146,6 @@ public:
int size() const { return _nodeHash.size(); }
SharedNodePointer nodeWithUUID(const QUuid& nodeUUID);
SharedNodePointer sendingNodeForPacket(const QByteArray& packet);
SharedNodePointer addOrUpdateNode(const QUuid& uuid, NodeType_t nodeType,
const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket,
@ -158,11 +157,10 @@ public:
const HifiSockAddr& getLocalSockAddr() const { return _localSockAddr; }
const HifiSockAddr& getSTUNSockAddr() const { return _stunSockAddr; }
void processNodeData(const HifiSockAddr& senderSockAddr, const QByteArray& packet);
void processKillNode(const QByteArray& datagram);
int updateNodeWithDataFromPacket(const SharedNodePointer& matchingNode, const QByteArray& packet);
int findNodeAndUpdateWithDataFromPacket(const QByteArray& packet);
int updateNodeWithDataFromPacket(const SharedNodePointer& matchingNode, QSharedPointer<NLPacket> packet);
int findNodeAndUpdateWithDataFromPacket(const QSharedPointer<NLPacket> packet);
unsigned broadcastToNodes(std::unique_ptr<NLPacket> packet, const NodeSet& destinationNodeTypes) { assert(false); return 0; }
SharedNodePointer soloNodeOfType(char nodeType);
@ -256,11 +254,6 @@ signals:
void canAdjustLocksChanged(bool canAdjustLocks);
void canRezChanged(bool canRez);
void dataSent(const quint8 channel_type, const int bytes);
void dataReceived(const quint8 channel_type, const int bytes);
void packetVersionMismatch();
protected:
LimitedNodeList(unsigned short socketListenPort = 0, unsigned short dtlsListenPort = 0);
LimitedNodeList(LimitedNodeList const&); // Don't implement, needed to avoid copies of singleton

View file

@ -13,7 +13,7 @@
qint64 NLPacket::localHeaderSize(PacketType::Value type) {
qint64 size = ((NON_SOURCED_PACKETS.contains(type)) ? 0 : NUM_BYTES_RFC4122_UUID) +
((NON_VERIFIED_PACKETS.contains(type)) ? 0 : NUM_BYTES_RFC4122_UUID);
((NON_VERIFIED_PACKETS.contains(type) || NON_VERIFIED_PACKETS.contains(type)) ? 0 : NUM_BYTES_MD5_HASH);
return size;
}
@ -43,6 +43,19 @@ std::unique_ptr<NLPacket> NLPacket::create(PacketType::Value type, qint64 size)
return std::unique_ptr<NLPacket>(new NLPacket(type, size));
}
std::unique_ptr<NLPacket> NLPacket::fromReceivedPacket(std::unique_ptr<char> data, qint64 size,
const HifiSockAddr& senderSockAddr) {
// Fail with null data
Q_ASSERT(data);
// Fail with invalid size
Q_ASSERT(size >= 0);
// allocate memory
return std::unique_ptr<NLPacket>(new NLPacket(std::move(data), size, senderSockAddr));
}
std::unique_ptr<NLPacket> NLPacket::createCopy(const NLPacket& other) {
return std::unique_ptr<NLPacket>(new NLPacket(other));
}
@ -53,15 +66,52 @@ NLPacket::NLPacket(PacketType::Value type, qint64 size) : Packet(type, localHead
NLPacket::NLPacket(const NLPacket& other) : Packet(other) {
}
void NLPacket::setSourceUuid(QUuid sourceUuid) {
Q_ASSERT(!NON_SOURCED_PACKETS.contains(_type));
auto offset = Packet::totalHeadersSize();
memcpy(_packet.get() + offset, sourceUuid.toRfc4122().constData(), NUM_BYTES_RFC4122_UUID);
NLPacket::NLPacket(std::unique_ptr<char> data, qint64 size, const HifiSockAddr& senderSockAddr) :
Packet(std::move(data), size, senderSockAddr)
{
readSourceID();
readVerificationHash();
}
void NLPacket::setConnectionUuid(QUuid connectionUuid) {
Q_ASSERT(!NON_VERIFIED_PACKETS.contains(_type));
auto offset = Packet::totalHeadersSize() +
((NON_SOURCED_PACKETS.contains(_type)) ? 0 : NUM_BYTES_RFC4122_UUID);
memcpy(_packet.get() + offset, connectionUuid.toRfc4122().constData(), NUM_BYTES_RFC4122_UUID);
void NLPacket::readSourceID() {
if (!NON_SOURCED_PACKETS.contains(_type)) {
auto offset = Packet::totalHeadersSize();
_sourceID = QUuid::fromRfc4122(QByteArray::fromRawData(_packet.get() + offset, NUM_BYTES_RFC4122_UUID));
}
}
void NLPacket::readVerificationHash() {
if (!NON_SOURCED_PACKETS.contains(_type) && !NON_VERIFIED_PACKETS.contains(_type)) {
auto offset = Packet::totalHeadersSize() + NUM_BYTES_RFC4122_UUID;
_verificationHash = QByteArray(_packet.get() + offset, NUM_BYTES_MD5_HASH);
}
}
void NLPacket::setSourceID(const QUuid& sourceID) {
Q_ASSERT(!NON_SOURCED_PACKETS.contains(_type));
auto offset = Packet::totalHeadersSize();
memcpy(_packet.get() + offset, sourceID.toRfc4122().constData(), NUM_BYTES_RFC4122_UUID);
_sourceID = sourceID;
}
void NLPacket::setVerificationHash(const QByteArray& verificationHash) {
Q_ASSERT(!NON_SOURCED_PACKETS.contains(_type) && !NON_VERIFIED_PACKETS.contains(_type));
auto offset = Packet::totalHeadersSize() + NUM_BYTES_RFC4122_UUID;
memcpy(_packet.get() + offset, verificationHash.data(), verificationHash.size());
_verificationHash = verificationHash;
}
QByteArray NLPacket::payloadHashWithConnectionUUID(const QUuid& connectionUUID) const {
QCryptographicHash hash(QCryptographicHash::Md5);
// add the packet payload and the connection UUID
hash.addData(_payloadStart, _sizeUsed);
hash.addData(connectionUUID.toRfc4122());
// return the hash
return hash.result();
}

View file

@ -18,6 +18,8 @@ class NLPacket : public Packet {
Q_OBJECT
public:
static std::unique_ptr<NLPacket> create(PacketType::Value type, qint64 size = -1);
static std::unique_ptr<NLPacket> fromReceivedPacket(std::unique_ptr<char> data, qint64 size,
const HifiSockAddr& senderSockAddr);
// Provided for convenience, try to limit use
static std::unique_ptr<NLPacket> createCopy(const NLPacket& other);
@ -27,15 +29,24 @@ public:
virtual qint64 totalHeadersSize() const; // Cumulated size of all the headers
virtual qint64 localHeaderSize() const; // Current level's header size
// TODO Implement this :)
QUuid getSourceID() const { return QUuid(); }
void readSourceID();
void readVerificationHash();
const QUuid& getSourceID() const { return _sourceID; }
const QByteArray& getVerificationHash() const { return _verificationHash; }
QByteArray payloadHashWithConnectionUUID(const QUuid& connectionUUID) const;
protected:
NLPacket(PacketType::Value type, qint64 size);
NLPacket(std::unique_ptr<char> data, qint64 size, const HifiSockAddr& senderSockAddr);
NLPacket(const NLPacket& other);
void setSourceUuid(QUuid sourceUuid);
void setConnectionUuid(QUuid connectionUuid);
void setSourceID(const QUuid& sourceID);
void setVerificationHash(const QByteArray& verificationHash);
QUuid _sourceID;
QByteArray _verificationHash;
};
#endif // hifi_NLPacket_h

View file

@ -59,8 +59,8 @@ public:
quint64 getWakeTimestamp() const { return _wakeTimestamp; }
void setWakeTimestamp(quint64 wakeTimestamp) { _wakeTimestamp = wakeTimestamp; }
quint64 getLastHeardMicrostamp() const { return _lastHeardMicrostamp; }
void setLastHeardMicrostamp(quint64 lastHeardMicrostamp) { _lastHeardMicrostamp = lastHeardMicrostamp; }
quint64 getLastHeardMicrostamp() const { return _lastHeardMicrostamp.load(); }
void setLastHeardMicrostamp(quint64 lastHeardMicrostamp) { _lastHeardMicrostamp.store(lastHeardMicrostamp); }
QByteArray toByteArray() const;
@ -92,7 +92,7 @@ protected:
HifiSockAddr* _activeSocket;
quint64 _wakeTimestamp;
quint64 _lastHeardMicrostamp;
std::atomic_ullong _lastHeardMicrostamp;
QTimer* _pingTimer = NULL;

View file

@ -37,6 +37,14 @@ std::unique_ptr<Packet> Packet::create(PacketType::Value type, qint64 size) {
return std::unique_ptr<Packet>(new Packet(type, size));
}
std::unique_ptr<Packet> Packet::fromReceivedPacket(std::unique_ptr<char> data, qint64 size, const HifiSockAddr& senderSockAddr) {
// Fail with invalid size
Q_ASSERT(size >= 0);
// allocate memory
return std::unique_ptr<Packet>(new Packet(std::move(data), size, senderSockAddr));
}
std::unique_ptr<Packet> Packet::createCopy(const Packet& other) {
return std::unique_ptr<Packet>(new Packet(other));
}
@ -51,20 +59,34 @@ qint64 Packet::localHeaderSize() const {
Packet::Packet(PacketType::Value type, qint64 size) :
_type(type),
_version(0),
_packetSize(localHeaderSize(_type) + size),
_packet(new char(_packetSize)),
_payloadStart(_packet.get() + localHeaderSize(_type)),
_capacity(size) {
// Sanity check
Q_ASSERT(size <= maxPayloadSize(type));
_capacity(size)
{
// Sanity check
Q_ASSERT(size <= maxPayloadSize(type));
// copy packet type and version in header
writePacketTypeAndVersion(type);
// copy packet type and version in header
writePacketTypeAndVersion(type);
// Set control bit and sequence number to 0 if necessary
if (SEQUENCE_NUMBERED_PACKETS.contains(type)) {
writeSequenceNumber(0);
}
// Set control bit and sequence number to 0 if necessary
if (SEQUENCE_NUMBERED_PACKETS.contains(type)) {
writeSequenceNumber(0);
}
}
Packet::Packet(std::unique_ptr<char> data, qint64 size, const HifiSockAddr& senderSockAddr) :
_packetSize(size),
_packet(std::move(data)),
_senderSockAddr(senderSockAddr)
{
_type = readType();
_version = readVersion();
_capacity = _packetSize - localHeaderSize(_type);
_sizeUsed = _capacity;
_payloadStart = _packet.get() + (_packetSize - _capacity);
}
Packet::Packet(const Packet& other) {

View file

@ -16,6 +16,7 @@
#include <QIODevice>
#include "HifiSockAddr.h"
#include "PacketHeaders.h"
class Packet : public QIODevice {
@ -24,6 +25,8 @@ public:
using SequenceNumber = uint16_t;
static std::unique_ptr<Packet> create(PacketType::Value type, qint64 size = -1);
static std::unique_ptr<Packet> fromReceivedPacket(std::unique_ptr<char> data, qint64 size, const HifiSockAddr& senderSockAddr);
// Provided for convenience, try to limit use
static std::unique_ptr<Packet> createCopy(const Packet& other);
@ -43,7 +46,9 @@ public:
PacketType::Value getType() const { return _type; }
void setType(PacketType::Value type);
PacketVersion getVersion() const { return _version; }
qint64 getSizeWithHeader() const { return localHeaderSize() + getSizeUsed(); }
qint64 getSizeUsed() const { return _sizeUsed; }
void setSizeUsed(qint64 sizeUsed) { _sizeUsed = sizeUsed; }
@ -51,9 +56,6 @@ public:
HifiSockAddr& getSenderSockAddr() { return _senderSockAddr; }
const HifiSockAddr& getSenderSockAddr() const { return _senderSockAddr; }
// Header readers
PacketType::Value readType() const;
PacketVersion readVersion() const;
SequenceNumber readSequenceNumber() const;
bool readIsControlPacket() const;
@ -68,11 +70,16 @@ public:
protected:
Packet(PacketType::Value type, int64_t size);
Packet(std::unique_ptr<char> data, qint64 size, const HifiSockAddr& senderSockAddr);
Packet(const Packet& other);
Packet& operator=(const Packet& other);
Packet(Packet&& other);
Packet& operator=(Packet&& other);
// Header readers
PacketType::Value readType() const;
PacketVersion readVersion() const;
// QIODevice virtual functions
virtual qint64 writeData(const char* data, qint64 maxSize);
virtual qint64 readData(char* data, qint64 maxSize);
@ -82,6 +89,7 @@ protected:
void writeSequenceNumber(SequenceNumber seqNum);
PacketType::Value _type; // Packet type
PacketVersion _version; // Packet version
qint64 _packetSize = 0; // Total size of the allocated memory
std::unique_ptr<char> _packet; // Allocated memory

View file

@ -208,15 +208,6 @@ int sequenceNumberOffsetForPacketType(PacketType::Value packetType) {
return numBytesForPacketHeaderGivenPacketType(packetType) - sizeof(PacketSequenceNumber);
}
QByteArray hashFromPacketHeader(const QByteArray& packet) {
return packet.mid(hashOffsetForPacketType(packetTypeForPacket(packet)), NUM_BYTES_MD5_HASH);
}
QByteArray hashForPacketAndConnectionUUID(const QByteArray& packet, const QUuid& connectionUUID) {
return QCryptographicHash::hash(packet.mid(numBytesForPacketHeader(packet)) + connectionUUID.toRfc4122(),
QCryptographicHash::Md5);
}
PacketSequenceNumber sequenceNumberFromHeader(const QByteArray& packet, PacketType::Value packetType) {
if (packetType == PacketType::Unknown) {
packetType = packetTypeForPacket(packet);

View file

@ -121,9 +121,6 @@ QUuid uuidFromPacketHeader(const QByteArray& packet);
int hashOffsetForPacketType(PacketType::Value packetType);
int sequenceNumberOffsetForPacketType(PacketType::Value packetType);
QByteArray hashFromPacketHeader(const QByteArray& packet);
QByteArray hashForPacketAndConnectionUUID(const QByteArray& packet, const QUuid& connectionUUID);
// NOTE: The following four methods accept a PacketType::Value which defaults to PacketType::Unknown.
// If the caller has already looked at the packet type and can provide it then the methods below won't have to look it up.

View file

@ -13,8 +13,9 @@
#include "PacketReceiver.h"
#include "DependencyManager.h"
#include "NLPacket.h"
#include "NetworkLogging.h"
#include "NodeList.h"
#include "SharedUtil.h"
PacketReceiver::PacketReceiver(QObject* parent) :
QObject(parent),
@ -51,18 +52,18 @@ void PacketReceiver::registerPacketListener(PacketType::Value type, QObject* obj
qDebug() << "PacketReceiver::registerPacketListener expected a method that takes"
<< NON_SOURCED_PACKET_LISTENER_PARAMETERS
<< "but parameter method takes" << signalMethod.parameterTypes();
<< "but parameter method takes" << slotMethod.parameterTypes();
} else {
const QList<QByteArray> SOURCED_PACKET_LISTENER_PARAMETERS = QList<QByteArray>()
<< QMetaObject::normalizedType(("QSharedPointer<NLPacket>")
<< QMetaObject::normalizedType(("QSharedPointer<Node>");
<< QMetaObject::normalizedType("QSharedPointer<NLPacket>")
<< QMetaObject::normalizedType("QSharedPointer<Node>");
parametersMatch = slotMethod.parameterTypes() == SOURCED_PACKET_LISTENER_PARAMETERS;
if (!parametersMatch) {
qDebug() << "PacketReceiver::registerPacketListener expected a method that takes"
<< SOURCED_PACKET_LISTENER_PARAMETERS
<< "but parameter method takes" << signalMethod.parameterTypes();
<< "but parameter method takes" << slotMethod.parameterTypes();
}
}
@ -70,11 +71,37 @@ void PacketReceiver::registerPacketListener(PacketType::Value type, QObject* obj
assert(parametersMatch);
// add the mapping
_packetListenerMap[type] = ObjectMethodPair(object, slotMethod);
_packetListenerMap[type] = ObjectMethodPair(QPointer<QObject>(object), slotMethod);
_packetListenerLock.unlock();
}
bool PacketReceiver::packetVersionMatch(const NLPacket& packet) {
if (packet.getVersion() != versionForPacketType(packet.getType())
&& packet.getType() != PacketType::StunResponse) {
static QMultiMap<QUuid, PacketType::Value> versionDebugSuppressMap;
const QUuid& senderID = packet.getSourceID();
if (!versionDebugSuppressMap.contains(senderID, packet.getType())) {
qCDebug(networking) << "Packet version mismatch on" << packet.getType() << "- Sender"
<< senderID << "sent" << qPrintable(QString::number(packet.getVersion())) << "but"
<< qPrintable(QString::number(versionForPacketType(packet.getType()))) << "expected.";
emit packetVersionMismatch(packet.getType());
versionDebugSuppressMap.insert(senderID, packet.getType());
}
return false;
} else {
return true;
}
}
void PacketReceiver::processDatagrams() {
//PerformanceWarning warn(Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings),
//"PacketReceiver::processDatagrams()");
@ -83,42 +110,83 @@ void PacketReceiver::processDatagrams() {
return; // bail early... we're shutting down.
}
static QByteArray incomingPacket;
auto nodeList = DependencyManager::get<NodeList>();
while (DependencyManager::get<NodeList>()->getNodeSocket().hasPendingDatagrams()) {
incomingPacket.resize(nodeList->getNodeSocket().pendingDatagramSize());
// setup a buffer to read the packet into
int packetSizeWithHeader = nodeList->getNodeSocket().pendingDatagramSize();
std::unique_ptr<char> buffer = std::unique_ptr<char>(new char[packetSizeWithHeader]);
// setup a HifiSockAddr to read into
HifiSockAddr senderSockAddr;
nodeList->readDatagram(incomingPacket, senderSockAddr.getAddressPointer(), senderSockAddr.getPortPointer());
// pull the datagram
nodeList->getNodeSocket().readDatagram(buffer.get(), packetSizeWithHeader,
senderSockAddr.getAddressPointer(), senderSockAddr.getPortPointer());
// setup an NLPacket from the data we just read
auto packet = NLPacket::fromReceivedPacket(std::move(buffer), packetSizeWithHeader, senderSockAddr);
_inPacketCount++;
_inByteCount += incomingPacket.size();
_inByteCount += packetSizeWithHeader;
if (nodeList->packetVersionAndHashMatch(incomingPacket)) {
PacketType::Value incomingType = packetTypeForPacket(incomingPacket);
// TODO What do we do about this?
//nodeList->processNodeData(senderSockAddr, incomingPacket);
_packetListenerLock.lock();
auto& listener = _packetListenerMap[incomingType];
_packetListenerLock.unlock();
if (_packetListenerMap.contains(incomingType)) {
auto& listener = _packetListenerMap[incomingType];
//TODO Update packet
QSharedPointer<NLPacket> packet;
bool success = QMetaObject::invokeMethod(listener.first, listener.second,
Q_ARG(QSharedPointer<NLPacket>, packet),
Q_ARG(HifiSockAddr, senderSockAddr));
if (!success) {
qDebug() << "Error sending packet " << incomingType << " to listener: " << listener.first->objectName() << "::" << listener.second;
if (packetVersionMatch(*packet)) {
SharedNodePointer matchingNode;
if (nodeList->packetSourceAndHashMatch(*packet, matchingNode)) {
if (matchingNode) {
// No matter if this packet is handled or not, we update the timestamp for the last time we heard
// from this sending node
matchingNode->setLastHeardMicrostamp(usecTimestampNow());
}
} else {
qDebug() << "No listener found for packet type: " << incomingType;
}
_packetListenerLock.lock();
auto it = _packetListenerMap.find(packet->getType());
if (it != _packetListenerMap.end()) {
auto listener = it.value();
if (!listener.first.isNull()) {
if (matchingNode) {
emit dataReceived(matchingNode->getType(), packet->getSizeWithHeader());
} else {
emit dataReceived(NodeType::Unassigned, packet->getSizeWithHeader());
}
bool success = false;
if (matchingNode) {
success = listener.second.invoke(listener.first,
Q_ARG(QSharedPointer<NLPacket>, QSharedPointer<NLPacket>(packet.release())));
} else {
success = listener.second.invoke(listener.first,
Q_ARG(QSharedPointer<NLPacket>, QSharedPointer<NLPacket>(packet.release())),
Q_ARG(SharedNodePointer, matchingNode));
}
if (!success) {
qDebug() << "Error delivering packet " << nameForPacketType(packet->getType()) << " to listener: "
<< listener.first->objectName() << "::" << listener.second.name();
}
} else {
// we have a dead listener - remove this mapping from the _packetListenerMap
qDebug() << "Listener for packet type" << nameForPacketType(packet->getType())
<< "has been destroyed - removing mapping.";
_packetListenerMap.erase(it);
}
_packetListenerLock.unlock();
} else {
_packetListenerLock.unlock();
qDebug() << "No listener found for packet type " << nameForPacketType(packet->getType());
}
}
}
}
}

View file

@ -14,12 +14,11 @@
#define hifi_PacketReceiver_h
#include <QtCore/QMap>
#include <QtCore/QMetaMethod>
#include <QtCore/QMutex>
#include <QtCore/QObject>
#include <QtCore/QPair>
#include <memory>
#include "NLPacket.h"
#include "PacketHeaders.h"
class PacketReceiver : public QObject {
@ -36,16 +35,23 @@ public:
void shutdown() { _isShuttingDown = true; }
bool registerPacketListener(PacketType::Value type, QObject* listener, const char* slot);
void registerPacketListener(PacketType::Value type, QObject* listener, const char* slot);
public slots:
void processDatagrams();
signals:
void dataSent(quint8 channel_type, int bytes);
void dataReceived(quint8 channel_type, int bytes);
void packetVersionMismatch(PacketType::Value type);
private:
using ObjectMethodPair = QPair<QObject*, QMetaMethod>;
bool packetVersionMatch(const NLPacket& packet);
using ObjectMethodPair = std::pair<QPointer<QObject>, QMetaMethod>;
QMutex _packetListenerLock;
QMap<PacketType::Value, ObjectMethodPair> _packetListenerMap;
QHash<PacketType::Value, ObjectMethodPair> _packetListenerMap;
int _inPacketCount = 0;
int _outPacketCount = 0;
int _inByteCount = 0;