Cleanup PacketReceier and ReceivedMessage

This commit is contained in:
Ryan Huffman 2015-10-16 09:21:10 -07:00
parent 7682ab00e3
commit 9ddcfdf94d
21 changed files with 83 additions and 113 deletions

View file

@ -63,12 +63,12 @@ void Agent::handleOctreePacket(QSharedPointer<ReceivedMessage> message, SharedNo
if (packetType == PacketType::OctreeStats) { if (packetType == PacketType::OctreeStats) {
int statsMessageLength = OctreeHeadlessViewer::parseOctreeStats(message, senderNode); int statsMessageLength = OctreeHeadlessViewer::parseOctreeStats(message, senderNode);
if (message->getPayloadSize() > statsMessageLength) { if (message->getSize() > statsMessageLength) {
// pull out the piggybacked packet and create a new QSharedPointer<NLPacket> for it // pull out the piggybacked packet and create a new QSharedPointer<NLPacket> for it
int piggyBackedSizeWithHeader = message->getPayloadSize() - statsMessageLength; int piggyBackedSizeWithHeader = message->getSize() - statsMessageLength;
auto buffer = std::unique_ptr<char[]>(new char[piggyBackedSizeWithHeader]); auto buffer = std::unique_ptr<char[]>(new char[piggyBackedSizeWithHeader]);
memcpy(buffer.get(), message->getPayload() + statsMessageLength, piggyBackedSizeWithHeader); memcpy(buffer.get(), message->getRawMessage() + statsMessageLength, piggyBackedSizeWithHeader);
auto newPacket = NLPacket::fromReceivedPacket(std::move(buffer), piggyBackedSizeWithHeader, message->getSenderSockAddr()); auto newPacket = NLPacket::fromReceivedPacket(std::move(buffer), piggyBackedSizeWithHeader, message->getSenderSockAddr());
message = QSharedPointer<ReceivedMessage>(new ReceivedMessage(*newPacket)); message = QSharedPointer<ReceivedMessage>(new ReceivedMessage(*newPacket));

View file

@ -40,7 +40,7 @@ AssetServer::AssetServer(ReceivedMessage& message) :
auto& packetReceiver = DependencyManager::get<NodeList>()->getPacketReceiver(); auto& packetReceiver = DependencyManager::get<NodeList>()->getPacketReceiver();
packetReceiver.registerListener(PacketType::AssetGet, this, "handleAssetGet"); packetReceiver.registerListener(PacketType::AssetGet, this, "handleAssetGet");
packetReceiver.registerListener(PacketType::AssetGetInfo, this, "handleAssetGetInfo"); packetReceiver.registerListener(PacketType::AssetGetInfo, this, "handleAssetGetInfo");
packetReceiver.registerMessageListener(PacketType::AssetUpload, this, "handleAssetUpload"); packetReceiver.registerListener(PacketType::AssetUpload, this, "handleAssetUpload");
} }
void AssetServer::run() { void AssetServer::run() {
@ -89,7 +89,7 @@ void AssetServer::handleAssetGetInfo(QSharedPointer<ReceivedMessage> message, Sh
MessageID messageID; MessageID messageID;
uint8_t extensionLength; uint8_t extensionLength;
if (message->getPayloadSize() < qint64(SHA256_HASH_LENGTH + sizeof(messageID) + sizeof(extensionLength))) { if (message->getSize() < qint64(SHA256_HASH_LENGTH + sizeof(messageID) + sizeof(extensionLength))) {
qDebug() << "ERROR bad file request"; qDebug() << "ERROR bad file request";
return; return;
} }
@ -126,7 +126,7 @@ void AssetServer::handleAssetGet(QSharedPointer<ReceivedMessage> message, Shared
auto minSize = qint64(sizeof(MessageID) + SHA256_HASH_LENGTH + sizeof(uint8_t) + sizeof(DataOffset) + sizeof(DataOffset)); auto minSize = qint64(sizeof(MessageID) + SHA256_HASH_LENGTH + sizeof(uint8_t) + sizeof(DataOffset) + sizeof(DataOffset));
if (message->getPayloadSize() < minSize) { if (message->getSize() < minSize) {
qDebug() << "ERROR bad file request"; qDebug() << "ERROR bad file request";
return; return;
} }

View file

@ -549,9 +549,9 @@ void AudioMixer::handleMuteEnvironmentPacket(QSharedPointer<ReceivedMessage> mes
auto nodeList = DependencyManager::get<NodeList>(); auto nodeList = DependencyManager::get<NodeList>();
if (sendingNode->getCanAdjustLocks()) { if (sendingNode->getCanAdjustLocks()) {
auto newPacket = NLPacket::create(PacketType::MuteEnvironment, message->getPayloadSize()); auto newPacket = NLPacket::create(PacketType::MuteEnvironment, message->getSize());
// Copy payload // Copy payload
newPacket->write(message->getPayload(), message->getPayloadSize()); newPacket->write(message->getRawMessage(), message->getSize());
nodeList->eachNode([&](const SharedNodePointer& node){ nodeList->eachNode([&](const SharedNodePointer& node){
if (node->getType() == NodeType::Agent && node->getActiveSocket() && if (node->getType() == NodeType::Agent && node->getActiveSocket() &&

View file

@ -60,7 +60,7 @@ int AudioMixerClientData::parseData(ReceivedMessage& message) {
// read the downstream audio stream stats // read the downstream audio stream stats
message.readPrimitive(&_downstreamAudioStreamStats); message.readPrimitive(&_downstreamAudioStreamStats);
return message.pos(); return message.getPosition();
} else { } else {
PositionalAudioStream* matchingStream = NULL; PositionalAudioStream* matchingStream = NULL;

View file

@ -84,8 +84,8 @@ void OctreeInboundPacketProcessor::processPacket(QSharedPointer<ReceivedMessage>
if (debugProcessPacket) { if (debugProcessPacket) {
qDebug("OctreeInboundPacketProcessor::processPacket() payload=%p payloadLength=%lld", qDebug("OctreeInboundPacketProcessor::processPacket() payload=%p payloadLength=%lld",
message->getPayload(), message->getRawMessage(),
message->getPayloadSize()); message->getSize());
} }
// Ask our tree subclass if it can handle the incoming packet... // Ask our tree subclass if it can handle the incoming packet...
@ -117,7 +117,7 @@ void OctreeInboundPacketProcessor::processPacket(QSharedPointer<ReceivedMessage>
if (debugProcessPacket || _myServer->wantsDebugReceiving()) { if (debugProcessPacket || _myServer->wantsDebugReceiving()) {
qDebug() << "PROCESSING THREAD: got '" << packetType << "' packet - " << _receivedPacketCount << " command from client"; qDebug() << "PROCESSING THREAD: got '" << packetType << "' packet - " << _receivedPacketCount << " command from client";
qDebug() << " receivedBytes=" << message->getDataSize(); qDebug() << " receivedBytes=" << message->getSize();
qDebug() << " sequence=" << sequence; qDebug() << " sequence=" << sequence;
qDebug() << " sentAt=" << sentAt << " usecs"; qDebug() << " sentAt=" << sentAt << " usecs";
qDebug() << " arrivedAt=" << arrivedAt << " usecs"; qDebug() << " arrivedAt=" << arrivedAt << " usecs";
@ -131,8 +131,8 @@ void OctreeInboundPacketProcessor::processPacket(QSharedPointer<ReceivedMessage>
qDebug() << " numBytesPacketHeader=" << NLPacket::totalHeaderSize(packetType); qDebug() << " numBytesPacketHeader=" << NLPacket::totalHeaderSize(packetType);
qDebug() << " sizeof(sequence)=" << sizeof(sequence); qDebug() << " sizeof(sequence)=" << sizeof(sequence);
qDebug() << " sizeof(sentAt)=" << sizeof(sentAt); qDebug() << " sizeof(sentAt)=" << sizeof(sentAt);
qDebug() << " atByte (in payload)=" << message->pos(); qDebug() << " atByte (in payload)=" << message->getPosition();
qDebug() << " payload size=" << message->getPayloadSize(); qDebug() << " payload size=" << message->getSize();
if (!message->getBytesLeftToRead()) { if (!message->getBytesLeftToRead()) {
qDebug() << " ----- UNEXPECTED ---- got a packet without any edit details!!!! --------"; qDebug() << " ----- UNEXPECTED ---- got a packet without any edit details!!!! --------";
@ -143,7 +143,7 @@ void OctreeInboundPacketProcessor::processPacket(QSharedPointer<ReceivedMessage>
while (message->getBytesLeftToRead() > 0) { while (message->getBytesLeftToRead() > 0) {
editData = reinterpret_cast<const unsigned char*>(message->getPayload() + message->pos()); editData = reinterpret_cast<const unsigned char*>(message->getRawMessage() + message->getPosition());
int maxSize = message->getBytesLeftToRead(); int maxSize = message->getBytesLeftToRead();
@ -152,8 +152,8 @@ void OctreeInboundPacketProcessor::processPacket(QSharedPointer<ReceivedMessage>
qDebug() << " maxSize=" << maxSize; qDebug() << " maxSize=" << maxSize;
qDebug("OctreeInboundPacketProcessor::processPacket() %hhu " qDebug("OctreeInboundPacketProcessor::processPacket() %hhu "
"payload=%p payloadLength=%lld editData=%p payloadPosition=%lld maxSize=%d", "payload=%p payloadLength=%lld editData=%p payloadPosition=%lld maxSize=%d",
packetType, message->getPayload(), message->getPayloadSize(), editData, packetType, message->getRawMessage(), message->getSize(), editData,
message->pos(), maxSize); message->getPosition(), maxSize);
} }
quint64 startProcess, startLock = usecTimestampNow(); quint64 startProcess, startLock = usecTimestampNow();
@ -177,12 +177,12 @@ void OctreeInboundPacketProcessor::processPacket(QSharedPointer<ReceivedMessage>
lockWaitTime += thisLockWaitTime; lockWaitTime += thisLockWaitTime;
// skip to next edit record in the packet // skip to next edit record in the packet
message->seek(message->pos() + editDataBytesRead); message->seek(message->getPosition() + editDataBytesRead);
if (debugProcessPacket) { if (debugProcessPacket) {
qDebug() << " editDataBytesRead=" << editDataBytesRead; qDebug() << " editDataBytesRead=" << editDataBytesRead;
qDebug() << " AFTER processEditPacketData payload position=" << message->pos(); qDebug() << " AFTER processEditPacketData payload position=" << message->getPosition();
qDebug() << " AFTER processEditPacketData payload size=" << message->getPayloadSize(); qDebug() << " AFTER processEditPacketData payload size=" << message->getSize();
} }
} }
@ -190,7 +190,7 @@ void OctreeInboundPacketProcessor::processPacket(QSharedPointer<ReceivedMessage>
if (debugProcessPacket) { if (debugProcessPacket) {
qDebug("OctreeInboundPacketProcessor::processPacket() DONE LOOPING FOR %hhu " qDebug("OctreeInboundPacketProcessor::processPacket() DONE LOOPING FOR %hhu "
"payload=%p payloadLength=%lld editData=%p payloadPosition=%lld", "payload=%p payloadLength=%lld editData=%p payloadPosition=%lld",
packetType, message->getPayload(), message->getPayloadSize(), editData, message->pos()); packetType, message->getRawMessage(), message->getSize(), editData, message->getPosition());
} }
// Make sure our Node and NodeList knows we've heard from this node. // Make sure our Node and NodeList knows we've heard from this node.

View file

@ -51,7 +51,7 @@ const NodeSet STATICALLY_ASSIGNED_NODES = NodeSet() << NodeType::AudioMixer
<< NodeType::AssetServer; << NodeType::AssetServer;
void DomainGatekeeper::processConnectRequestPacket(QSharedPointer<ReceivedMessage> message) { void DomainGatekeeper::processConnectRequestPacket(QSharedPointer<ReceivedMessage> message) {
if (message->getPayloadSize() == 0) { if (message->getSize() == 0) {
return; return;
} }

View file

@ -267,7 +267,7 @@ void DomainServer::setupNodeListAndAssignments(const QUuid& sessionUUID) {
packetReceiver.registerListener(PacketType::RequestAssignment, this, "processRequestAssignmentPacket"); packetReceiver.registerListener(PacketType::RequestAssignment, this, "processRequestAssignmentPacket");
packetReceiver.registerListener(PacketType::DomainListRequest, this, "processListRequestPacket"); packetReceiver.registerListener(PacketType::DomainListRequest, this, "processListRequestPacket");
packetReceiver.registerListener(PacketType::DomainServerPathQuery, this, "processPathQueryPacket"); packetReceiver.registerListener(PacketType::DomainServerPathQuery, this, "processPathQueryPacket");
packetReceiver.registerMessageListener(PacketType::NodeJsonStats, this, "processNodeJSONStatsPacket"); packetReceiver.registerListener(PacketType::NodeJsonStats, this, "processNodeJSONStatsPacket");
// NodeList won't be available to the settings manager when it is created, so call registerListener here // NodeList won't be available to the settings manager when it is created, so call registerListener here
packetReceiver.registerListener(PacketType::DomainSettingsRequest, &_settingsManager, "processSettingsRequestPacket"); packetReceiver.registerListener(PacketType::DomainSettingsRequest, &_settingsManager, "processSettingsRequestPacket");
@ -1763,7 +1763,7 @@ void DomainServer::processPathQueryPacket(QSharedPointer<ReceivedMessage> messag
if (numPathBytes <= message->getBytesLeftToRead()) { if (numPathBytes <= message->getBytesLeftToRead()) {
// the number of path bytes makes sense for the sent packet - pull out the path // the number of path bytes makes sense for the sent packet - pull out the path
QString pathQuery = QString::fromUtf8(message->getPayload() + message->pos(), numPathBytes); QString pathQuery = QString::fromUtf8(message->getRawMessage() + message->getPosition(), numPathBytes);
// our settings contain paths that start with a leading slash, so make sure this query has that // our settings contain paths that start with a leading slash, so make sure this query has that
if (!pathQuery.startsWith("/")) { if (!pathQuery.startsWith("/")) {

View file

@ -48,12 +48,12 @@ void OctreePacketProcessor::processPacket(QSharedPointer<ReceivedMessage> messag
int statsMessageLength = qApp->processOctreeStats(*message, sendingNode); int statsMessageLength = qApp->processOctreeStats(*message, sendingNode);
wasStatsPacket = true; wasStatsPacket = true;
int piggybackBytes = message->getPayloadSize() - statsMessageLength; int piggybackBytes = message->getSize() - statsMessageLength;
if (piggybackBytes) { if (piggybackBytes) {
// construct a new packet from the piggybacked one // construct a new packet from the piggybacked one
auto buffer = std::unique_ptr<char[]>(new char[piggybackBytes]); auto buffer = std::unique_ptr<char[]>(new char[piggybackBytes]);
memcpy(buffer.get(), message->getPayload() + statsMessageLength, piggybackBytes); memcpy(buffer.get(), message->getRawMessage() + statsMessageLength, piggybackBytes);
qDebug() << "Got piggyback, read " << piggybackBytes << " bytes"; qDebug() << "Got piggyback, read " << piggybackBytes << " bytes";
auto newPacket = NLPacket::fromReceivedPacket(std::move(buffer), piggybackBytes, message->getSenderSockAddr()); auto newPacket = NLPacket::fromReceivedPacket(std::move(buffer), piggybackBytes, message->getSenderSockAddr());

View file

@ -111,7 +111,7 @@ int InboundAudioStream::parseData(ReceivedMessage& message) {
int networkSamples; int networkSamples;
// parse the info after the seq number and before the audio data (the stream properties) // parse the info after the seq number and before the audio data (the stream properties)
int prePropertyPosition = message.pos(); int prePropertyPosition = message.getPosition();
int propertyBytes = parseStreamProperties(message.getType(), message.readWithoutCopy(message.getBytesLeftToRead()), networkSamples); int propertyBytes = parseStreamProperties(message.getType(), message.readWithoutCopy(message.getBytesLeftToRead()), networkSamples);
message.seek(prePropertyPosition + propertyBytes); message.seek(prePropertyPosition + propertyBytes);
@ -161,7 +161,7 @@ int InboundAudioStream::parseData(ReceivedMessage& message) {
framesAvailableChanged(); framesAvailableChanged();
return message.pos(); return message.getPosition();
} }
int InboundAudioStream::parseStreamProperties(PacketType type, const QByteArray& packetAfterSeqNum, int& numAudioSamples) { int InboundAudioStream::parseStreamProperties(PacketType type, const QByteArray& packetAfterSeqNum, int& numAudioSamples) {

View file

@ -55,7 +55,7 @@ void AvatarHashMap::processAvatarDataPacket(QSharedPointer<ReceivedMessage> mess
while (message->getBytesLeftToRead()) { while (message->getBytesLeftToRead()) {
QUuid sessionUUID = QUuid::fromRfc4122(message->readWithoutCopy(NUM_BYTES_RFC4122_UUID)); QUuid sessionUUID = QUuid::fromRfc4122(message->readWithoutCopy(NUM_BYTES_RFC4122_UUID));
int positionBeforeRead = message->pos(); int positionBeforeRead = message->getPosition();
QByteArray byteArray = message->readWithoutCopy(message->getBytesLeftToRead()); QByteArray byteArray = message->readWithoutCopy(message->getBytesLeftToRead());

View file

@ -877,7 +877,7 @@ int EntityTree::processEraseMessage(ReceivedMessage& message, const SharedNodePo
deleteEntities(entityItemIDsToDelete, true, true); deleteEntities(entityItemIDsToDelete, true, true);
} }
}); });
return message.pos(); return message.getPosition();
} }
// This version skips over the header // This version skips over the header

View file

@ -38,9 +38,9 @@ AssetClient::AssetClient() {
auto nodeList = DependencyManager::get<NodeList>(); auto nodeList = DependencyManager::get<NodeList>();
auto& packetReceiver = nodeList->getPacketReceiver(); auto& packetReceiver = nodeList->getPacketReceiver();
packetReceiver.registerMessageListener(PacketType::AssetGetInfoReply, this, "handleAssetGetInfoReply"); packetReceiver.registerListener(PacketType::AssetGetInfoReply, this, "handleAssetGetInfoReply");
packetReceiver.registerMessageListener(PacketType::AssetGetReply, this, "handleAssetGetReply", true); packetReceiver.registerListener(PacketType::AssetGetReply, this, "handleAssetGetReply", true);
packetReceiver.registerMessageListener(PacketType::AssetUploadReply, this, "handleAssetUploadReply"); packetReceiver.registerListener(PacketType::AssetUploadReply, this, "handleAssetUploadReply");
connect(nodeList.data(), &LimitedNodeList::nodeKilled, this, &AssetClient::handleNodeKilled); connect(nodeList.data(), &LimitedNodeList::nodeKilled, this, &AssetClient::handleNodeKilled);
} }
@ -242,7 +242,7 @@ void AssetClient::handleAssetGetReply(QSharedPointer<ReceivedMessage> message, S
} else { } else {
connect(message.data(), &ReceivedMessage::progress, this, [this, length, message, callbacks](ReceivedMessage* msg) { connect(message.data(), &ReceivedMessage::progress, this, [this, length, message, callbacks](ReceivedMessage* msg) {
//qDebug() << "Progress: " << msg->getDataSize(); //qDebug() << "Progress: " << msg->getDataSize();
callbacks.progressCallback(msg->getDataSize(), length); callbacks.progressCallback(msg->getSize(), length);
}); });
connect(message.data(), &ReceivedMessage::completed, this, [this, message, error, callbacks](ReceivedMessage* msg) { connect(message.data(), &ReceivedMessage::completed, this, [this, message, error, callbacks](ReceivedMessage* msg) {
callbacks.completeCallback(true, error, message->readAll()); callbacks.completeCallback(true, error, message->readAll());

View file

@ -100,11 +100,6 @@ LimitedNodeList::LimitedNodeList(unsigned short socketListenPort, unsigned short
_packetReceiver->handleVerifiedMessagePacket(std::move(packet)); _packetReceiver->handleVerifiedMessagePacket(std::move(packet));
} }
); );
_nodeSocket.setPacketListHandler(
[this](std::unique_ptr<udt::PacketList> packetList) {
_packetReceiver->handleVerifiedPacketList(std::move(packetList));
}
);
// set our isPacketVerified method as the verify operator for the udt::Socket // set our isPacketVerified method as the verify operator for the udt::Socket
using std::placeholders::_1; using std::placeholders::_1;
@ -554,7 +549,7 @@ std::unique_ptr<NLPacket> LimitedNodeList::constructICEPingReplyPacket(ReceivedM
// pull out the ping type so we can reply back with that // pull out the ping type so we can reply back with that
PingType_t pingType; PingType_t pingType;
memcpy(&pingType, message.getPayload() + NUM_BYTES_RFC4122_UUID, sizeof(PingType_t)); memcpy(&pingType, message.getRawMessage() + NUM_BYTES_RFC4122_UUID, sizeof(PingType_t));
int packetSize = NUM_BYTES_RFC4122_UUID + sizeof(PingType_t); int packetSize = NUM_BYTES_RFC4122_UUID + sizeof(PingType_t);
auto icePingReplyPacket = NLPacket::create(PacketType::ICEPingReply, packetSize); auto icePingReplyPacket = NLPacket::create(PacketType::ICEPingReply, packetSize);

View file

@ -98,7 +98,7 @@ NodeList::NodeList(char newOwnerType, unsigned short socketListenPort, unsigned
packetReceiver.registerListener(PacketType::ICEPing, this, "processICEPingPacket"); packetReceiver.registerListener(PacketType::ICEPing, this, "processICEPingPacket");
packetReceiver.registerListener(PacketType::DomainServerAddedNode, this, "processDomainServerAddedNode"); packetReceiver.registerListener(PacketType::DomainServerAddedNode, this, "processDomainServerAddedNode");
packetReceiver.registerListener(PacketType::DomainServerConnectionToken, this, "processDomainServerConnectionTokenPacket"); packetReceiver.registerListener(PacketType::DomainServerConnectionToken, this, "processDomainServerConnectionTokenPacket");
packetReceiver.registerMessageListener(PacketType::DomainSettings, &_domainHandler, "processSettingsPacketList"); packetReceiver.registerListener(PacketType::DomainSettings, &_domainHandler, "processSettingsPacketList");
packetReceiver.registerListener(PacketType::ICEServerPeerInformation, &_domainHandler, "processICEResponsePacket"); packetReceiver.registerListener(PacketType::ICEServerPeerInformation, &_domainHandler, "processICEResponsePacket");
packetReceiver.registerListener(PacketType::DomainServerRequireDTLS, &_domainHandler, "processDTLSRequirementPacket"); packetReceiver.registerListener(PacketType::DomainServerRequireDTLS, &_domainHandler, "processDTLSRequirementPacket");
packetReceiver.registerListener(PacketType::ICEPingReply, &_domainHandler, "processICEPingReplyPacket"); packetReceiver.registerListener(PacketType::ICEPingReply, &_domainHandler, "processICEPingReplyPacket");
@ -378,8 +378,8 @@ void NodeList::processDomainServerPathResponse(QSharedPointer<ReceivedMessage> m
return; return;
} }
QString pathQuery = QString::fromUtf8(message->getPayload() + message->pos(), numPathBytes); QString pathQuery = QString::fromUtf8(message->getRawMessage() + message->getPosition(), numPathBytes);
message->seek(message->pos() + numPathBytes); message->seek(message->getPosition() + numPathBytes);
// figure out how many bytes the viewpoint is // figure out how many bytes the viewpoint is
quint16 numViewpointBytes; quint16 numViewpointBytes;
@ -391,7 +391,7 @@ void NodeList::processDomainServerPathResponse(QSharedPointer<ReceivedMessage> m
} }
// pull the viewpoint from the packet // pull the viewpoint from the packet
QString viewpoint = QString::fromUtf8(message->getPayload() + message->pos(), numViewpointBytes); QString viewpoint = QString::fromUtf8(message->getRawMessage() + message->getPosition(), numViewpointBytes);
// Hand it off to the AddressManager so it can handle it as a relative viewpoint // Hand it off to the AddressManager so it can handle it as a relative viewpoint
if (DependencyManager::get<AddressManager>()->goToViewpointForPath(viewpoint, pathQuery)) { if (DependencyManager::get<AddressManager>()->goToViewpointForPath(viewpoint, pathQuery)) {
@ -500,7 +500,7 @@ void NodeList::processDomainServerList(QSharedPointer<ReceivedMessage> message)
setThisNodeCanRez((bool) thisNodeCanRez); setThisNodeCanRez((bool) thisNodeCanRez);
// pull each node in the packet // pull each node in the packet
while (packetStream.device()->pos() < message->getPayloadSize()) { while (packetStream.device()->pos() < message->getSize()) {
parseNodeFromPacketStream(packetStream); parseNodeFromPacketStream(packetStream);
} }
} }

View file

@ -94,7 +94,7 @@ void PacketReceiver::registerDirectListenerForTypes(PacketTypeList types,
} }
} }
bool PacketReceiver::registerMessageListener(PacketType type, QObject* listener, const char* slot, bool PacketReceiver::registerListener(PacketType type, QObject* listener, const char* slot,
bool deliverPending) { bool deliverPending) {
Q_ASSERT_X(listener, "PacketReceiver::registerMessageListener", "No object to register"); Q_ASSERT_X(listener, "PacketReceiver::registerMessageListener", "No object to register");
Q_ASSERT_X(slot, "PacketReceiver::registerMessageListener", "No slot to register"); Q_ASSERT_X(slot, "PacketReceiver::registerMessageListener", "No slot to register");
@ -102,22 +102,15 @@ bool PacketReceiver::registerMessageListener(PacketType type, QObject* listener,
QMetaMethod matchingMethod = matchingMethodForListener(type, listener, slot); QMetaMethod matchingMethod = matchingMethodForListener(type, listener, slot);
if (matchingMethod.isValid()) { if (matchingMethod.isValid()) {
QMutexLocker locker(&_packetListenerLock); qDebug() << "Found: " << matchingMethod.methodSignature();
registerVerifiedListener(type, listener, matchingMethod);
if (_messageListenerMap.contains(type)) {
qCWarning(networking) << "Registering a packet listener for packet type" << type
<< "that will remove a previously registered listener";
}
// add the mapping
_messageListenerMap[type] = { QPointer<QObject>(listener), matchingMethod, deliverPending };
return true; return true;
} else { } else {
return false; return false;
} }
} }
/*
bool PacketReceiver::registerListener(PacketType type, QObject* listener, const char* slot) { bool PacketReceiver::registerListener(PacketType type, QObject* listener, const char* slot) {
Q_ASSERT_X(listener, "PacketReceiver::registerListener", "No object to register"); Q_ASSERT_X(listener, "PacketReceiver::registerListener", "No object to register");
Q_ASSERT_X(slot, "PacketReceiver::registerListener", "No slot to register"); Q_ASSERT_X(slot, "PacketReceiver::registerListener", "No slot to register");
@ -132,6 +125,7 @@ bool PacketReceiver::registerListener(PacketType type, QObject* listener, const
return false; return false;
} }
} }
*/
QMetaMethod PacketReceiver::matchingMethodForListener(PacketType type, QObject* object, const char* slot) const { QMetaMethod PacketReceiver::matchingMethodForListener(PacketType type, QObject* object, const char* slot) const {
Q_ASSERT_X(object, "PacketReceiver::matchingMethodForListener", "No object to call"); Q_ASSERT_X(object, "PacketReceiver::matchingMethodForListener", "No object to call");
@ -140,27 +134,20 @@ QMetaMethod PacketReceiver::matchingMethodForListener(PacketType type, QObject*
// normalize the slot with the expected parameters // normalize the slot with the expected parameters
static const QString SIGNATURE_TEMPLATE("%1(%2)"); static const QString SIGNATURE_TEMPLATE("%1(%2)");
// static const QString NON_SOURCED_PACKET_LISTENER_PARAMETERS = "QSharedPointer<NLPacket>"; static const QString NON_SOURCED_MESSAGE_LISTENER_PARAMETERS = "QSharedPointer<ReceivedMessage>";
static const QString NON_SOURCED_PACKETLIST_LISTENER_PARAMETERS = "QSharedPointer<ReceivedMessage>";
QSet<QString> possibleSignatures { QSet<QString> possibleSignatures {
// SIGNATURE_TEMPLATE.arg(slot, NON_SOURCED_PACKET_LISTENER_PARAMETERS), SIGNATURE_TEMPLATE.arg(slot, NON_SOURCED_MESSAGE_LISTENER_PARAMETERS)
SIGNATURE_TEMPLATE.arg(slot, NON_SOURCED_PACKETLIST_LISTENER_PARAMETERS)
}; };
if (!NON_SOURCED_PACKETS.contains(type)) { if (!NON_SOURCED_PACKETS.contains(type)) {
// static const QString SOURCED_PACKET_LISTENER_PARAMETERS = "QSharedPointer<NLPacket>,QSharedPointer<Node>"; static const QString SOURCED_MESSAGE_LISTENER_PARAMETERS = "QSharedPointer<ReceivedMessage>,QSharedPointer<Node>";
// static const QString TYPEDEF_SOURCED_PACKET_LISTENER_PARAMETERS = "QSharedPointer<NLPacket>,SharedNodePointer"; static const QString TYPEDEF_SOURCED_MESSAGE_LISTENER_PARAMETERS = "QSharedPointer<ReceivedMessage>,SharedNodePointer";
static const QString SOURCED_PACKETLIST_LISTENER_PARAMETERS = "QSharedPointer<ReceivedMessage>,QSharedPointer<Node>";
static const QString TYPEDEF_SOURCED_PACKETLIST_LISTENER_PARAMETERS = "QSharedPointer<ReceivedMessage>,SharedNodePointer";
// a sourced packet must take the shared pointer to the packet but optionally could include // a sourced packet must take the shared pointer to the ReceivedMessage but optionally could include
// a shared pointer to the node // a shared pointer to the node
possibleSignatures << SIGNATURE_TEMPLATE.arg(slot, TYPEDEF_SOURCED_MESSAGE_LISTENER_PARAMETERS);
// possibleSignatures << SIGNATURE_TEMPLATE.arg(slot, TYPEDEF_SOURCED_PACKET_LISTENER_PARAMETERS); possibleSignatures << SIGNATURE_TEMPLATE.arg(slot, SOURCED_MESSAGE_LISTENER_PARAMETERS);
// possibleSignatures << SIGNATURE_TEMPLATE.arg(slot, SOURCED_PACKET_LISTENER_PARAMETERS);
possibleSignatures << SIGNATURE_TEMPLATE.arg(slot, TYPEDEF_SOURCED_PACKETLIST_LISTENER_PARAMETERS);
possibleSignatures << SIGNATURE_TEMPLATE.arg(slot, SOURCED_PACKETLIST_LISTENER_PARAMETERS);
} }
int methodIndex = -1; int methodIndex = -1;
@ -244,17 +231,22 @@ void PacketReceiver::handleVerifiedPacket(std::unique_ptr<udt::Packet> packet) {
auto nlPacket = NLPacket::fromBase(std::move(packet)); auto nlPacket = NLPacket::fromBase(std::move(packet));
auto receivedMessage = QSharedPointer<ReceivedMessage>(new ReceivedMessage(*nlPacket.get())); auto receivedMessage = QSharedPointer<ReceivedMessage>(new ReceivedMessage(*nlPacket.get()));
_inPacketCount += 1;
_inByteCount += nlPacket->size();
handleVerifiedMessage(receivedMessage, true); handleVerifiedMessage(receivedMessage, true);
} }
void PacketReceiver::handleVerifiedMessagePacket(std::unique_ptr<udt::Packet> packet) { void PacketReceiver::handleVerifiedMessagePacket(std::unique_ptr<udt::Packet> packet) {
qDebug() << "Got message packet";
auto nlPacket = NLPacket::fromBase(std::move(packet)); auto nlPacket = NLPacket::fromBase(std::move(packet));
_inPacketCount += 1; _inPacketCount += 1;
_inByteCount += nlPacket->size(); _inByteCount += nlPacket->size();
auto key = std::pair<HifiSockAddr, udt::Packet::MessageNumber>(nlPacket->getSenderSockAddr(), nlPacket->getMessageNumber()); auto key = std::pair<HifiSockAddr, udt::Packet::MessageNumber>(nlPacket->getSenderSockAddr(), nlPacket->getMessageNumber());
auto it = _pendingMessages.find(key); auto it = _pendingMessages.find(key);
QSharedPointer<ReceivedMessage> message; QSharedPointer<ReceivedMessage> message;
if (it == _pendingMessages.end()) { if (it == _pendingMessages.end()) {
// Create message // Create message
message = QSharedPointer<ReceivedMessage>(new ReceivedMessage(*nlPacket.release())); message = QSharedPointer<ReceivedMessage>(new ReceivedMessage(*nlPacket.release()));
@ -273,26 +265,9 @@ void PacketReceiver::handleVerifiedMessagePacket(std::unique_ptr<udt::Packet> pa
} }
} }
void PacketReceiver::handleVerifiedPacketList(std::unique_ptr<udt::PacketList> packetList) {
// if we're supposed to drop this packet then break out here
if (_shouldDropPackets) {
return;
}
// setup an NLPacketList from the PacketList we were passed
auto nlPacketList = NLPacketList::fromPacketList(std::move(packetList));
auto receivedMessage = QSharedPointer<ReceivedMessage>(new ReceivedMessage(*nlPacketList.release()));
handleVerifiedMessage(receivedMessage, true);
}
void PacketReceiver::handleVerifiedMessage(QSharedPointer<ReceivedMessage> receivedMessage, bool justReceived) { void PacketReceiver::handleVerifiedMessage(QSharedPointer<ReceivedMessage> receivedMessage, bool justReceived) {
auto nodeList = DependencyManager::get<LimitedNodeList>(); auto nodeList = DependencyManager::get<LimitedNodeList>();
_inPacketCount += receivedMessage->getNumPackets();
_inByteCount += receivedMessage->size();
SharedNodePointer matchingNode; SharedNodePointer matchingNode;
if (!receivedMessage->getSourceID().isNull()) { if (!receivedMessage->getSourceID().isNull()) {
@ -328,14 +303,12 @@ void PacketReceiver::handleVerifiedMessage(QSharedPointer<ReceivedMessage> recei
PacketType packetType = receivedMessage->getType(); PacketType packetType = receivedMessage->getType();
if (matchingNode) { if (matchingNode) {
emit dataReceived(matchingNode->getType(), receivedMessage->size()); emit dataReceived(matchingNode->getType(), receivedMessage->getSize());
matchingNode->recordBytesReceived(receivedMessage->size()); matchingNode->recordBytesReceived(receivedMessage->getSize());
Node* n = matchingNode.data(); Node* n = matchingNode.data();
auto addr = n->getActiveSocket(); auto addr = n->getActiveSocket();
QMetaMethod metaMethod = listener.method; QMetaMethod metaMethod = listener.method;
// qDebug() << "Got verified packet list: " << QString(receivedMessage->getMessage());
static const QByteArray QSHAREDPOINTER_NODE_NORMALIZED = QMetaObject::normalizedType("QSharedPointer<Node>"); static const QByteArray QSHAREDPOINTER_NODE_NORMALIZED = QMetaObject::normalizedType("QSharedPointer<Node>");
static const QByteArray SHARED_NODE_NORMALIZED = QMetaObject::normalizedType("SharedNodePointer"); static const QByteArray SHARED_NODE_NORMALIZED = QMetaObject::normalizedType("SharedNodePointer");
@ -364,7 +337,7 @@ void PacketReceiver::handleVerifiedMessage(QSharedPointer<ReceivedMessage> recei
} }
} else { } else {
// qDebug() << "Got verified unsourced packet list: " << QString(nlPacketList->getMessage()); // qDebug() << "Got verified unsourced packet list: " << QString(nlPacketList->getMessage());
emit dataReceived(NodeType::Unassigned, receivedMessage->size()); emit dataReceived(NodeType::Unassigned, receivedMessage->getSize());
// one final check on the QPointer before we invoke // one final check on the QPointer before we invoke
if (listener.object) { if (listener.object) {

View file

@ -57,15 +57,14 @@ public:
void resetCounters() { _inPacketCount = 0; _inByteCount = 0; } void resetCounters() { _inPacketCount = 0; _inByteCount = 0; }
// If deliverPending is false, ReceivedMessage will only be delivered once all packets for the message have
// been received. If deliverPending is true, ReceivedMessage will be delivered as soon as the first packet
// for the message is received.
bool registerListener(PacketType type, QObject* listener, const char* slot, bool deliverPending = false);
bool registerListenerForTypes(PacketTypeList types, QObject* listener, const char* slot); bool registerListenerForTypes(PacketTypeList types, QObject* listener, const char* slot);
bool registerMessageListener(PacketType type, QObject* listener, const char* slot, bool deliverPending = false);
bool registerListener(PacketType type, QObject* listener, const char* slot);
void unregisterListener(QObject* listener); void unregisterListener(QObject* listener);
void handleVerifiedPacket(std::unique_ptr<udt::Packet> packet); void handleVerifiedPacket(std::unique_ptr<udt::Packet> packet);
void handleVerifiedPacketList(std::unique_ptr<udt::PacketList> packetList);
void handleVerifiedMessage(QSharedPointer<ReceivedMessage> message, bool justReceived);
void handleVerifiedMessagePacket(std::unique_ptr<udt::Packet> message); void handleVerifiedMessagePacket(std::unique_ptr<udt::Packet> message);
signals: signals:
@ -78,6 +77,8 @@ private:
bool deliverPending; bool deliverPending;
}; };
void handleVerifiedMessage(QSharedPointer<ReceivedMessage> message, bool justReceived);
// these are brutal hacks for now - ideally GenericThread / ReceivedPacketProcessor // these are brutal hacks for now - ideally GenericThread / ReceivedPacketProcessor
// should be changed to have a true event loop and be able to handle our QMetaMethod::invoke // should be changed to have a true event loop and be able to handle our QMetaMethod::invoke
void registerDirectListenerForTypes(PacketTypeList types, QObject* listener, const char* slot); void registerDirectListenerForTypes(PacketTypeList types, QObject* listener, const char* slot);

View file

@ -27,8 +27,9 @@ public:
ReceivedMessage(const NLPacketList& packetList); ReceivedMessage(const NLPacketList& packetList);
ReceivedMessage(NLPacket& packet); ReceivedMessage(NLPacket& packet);
const char* getPayload() const { return _data.constData(); }
QByteArray getMessage() const { return _data; } QByteArray getMessage() const { return _data; }
const char* getRawMessage() const { return _data.constData(); }
PacketType getType() const { return _packetType; } PacketType getType() const { return _packetType; }
PacketVersion getVersion() const { return _packetVersion; } PacketVersion getVersion() const { return _packetVersion; }
@ -39,18 +40,18 @@ public:
const QUuid& getSourceID() const { return _sourceID; } const QUuid& getSourceID() const { return _sourceID; }
const HifiSockAddr& getSenderSockAddr() { return _senderSockAddr; } const HifiSockAddr& getSenderSockAddr() { return _senderSockAddr; }
void seek(qint64 position) { _position = position; } qint64 getPosition() const { return _position; }
qint64 pos() const { return _position; } //qint64 size() const { return _data.size(); }
qint64 size() const { return _data.size(); }
// Get the number of packets that were used to send this message // Get the number of packets that were used to send this message
qint64 getNumPackets() const { return _numPackets; } qint64 getNumPackets() const { return _numPackets; }
qint64 getDataSize() const { return _data.size(); } qint64 getSize() const { return _data.size(); }
qint64 getPayloadSize() const { return _data.size(); }
qint64 getBytesLeftToRead() const { return _data.size() - _position; } qint64 getBytesLeftToRead() const { return _data.size() - _position; }
void seek(qint64 position) { _position = position; }
qint64 peek(char* data, qint64 size); qint64 peek(char* data, qint64 size);
qint64 read(char* data, qint64 size); qint64 read(char* data, qint64 size);

View file

@ -329,5 +329,5 @@ int JurisdictionMap::unpackFromPacket(ReceivedMessage& message) {
} }
} }
return message.pos(); // excludes header return message.getPosition(); // excludes header
} }

View file

@ -66,7 +66,7 @@ int OctreeQuery::getBroadcastData(unsigned char* destinationBuffer) {
// called on the other nodes - assigns it to my views of the others // called on the other nodes - assigns it to my views of the others
int OctreeQuery::parseData(ReceivedMessage& message) { int OctreeQuery::parseData(ReceivedMessage& message) {
const unsigned char* startPosition = reinterpret_cast<const unsigned char*>(message.getPayload()); const unsigned char* startPosition = reinterpret_cast<const unsigned char*>(message.getRawMessage());
const unsigned char* sourceBuffer = startPosition; const unsigned char* sourceBuffer = startPosition;
// camera details // camera details

View file

@ -83,7 +83,7 @@ void OctreeRenderer::processDatagram(ReceivedMessage& message, SharedNodePointer
qCDebug(octree, "OctreeRenderer::processDatagram() ... Got Packet Section" qCDebug(octree, "OctreeRenderer::processDatagram() ... Got Packet Section"
" color:%s compressed:%s sequence: %u flight:%d usec size:%lld data:%lld", " color:%s compressed:%s sequence: %u flight:%d usec size:%lld data:%lld",
debug::valueOf(packetIsColored), debug::valueOf(packetIsCompressed), debug::valueOf(packetIsColored), debug::valueOf(packetIsCompressed),
sequence, flightTime, message.getDataSize(), message.getBytesLeftToRead()); sequence, flightTime, message.getSize(), message.getBytesLeftToRead());
} }
_packetsInLastWindow++; _packetsInLastWindow++;
@ -125,14 +125,14 @@ void OctreeRenderer::processDatagram(ReceivedMessage& message, SharedNodePointer
startUncompress = usecTimestampNow(); startUncompress = usecTimestampNow();
OctreePacketData packetData(packetIsCompressed); OctreePacketData packetData(packetIsCompressed);
packetData.loadFinalizedContent(reinterpret_cast<const unsigned char*>(message.getPayload() + message.pos()), packetData.loadFinalizedContent(reinterpret_cast<const unsigned char*>(message.getRawMessage() + message.getPosition()),
sectionLength); sectionLength);
if (extraDebugging) { if (extraDebugging) {
qCDebug(octree, "OctreeRenderer::processDatagram() ... Got Packet Section" qCDebug(octree, "OctreeRenderer::processDatagram() ... Got Packet Section"
" color:%s compressed:%s sequence: %u flight:%d usec size:%lld data:%lld" " color:%s compressed:%s sequence: %u flight:%d usec size:%lld data:%lld"
" subsection:%d sectionLength:%d uncompressed:%d", " subsection:%d sectionLength:%d uncompressed:%d",
debug::valueOf(packetIsColored), debug::valueOf(packetIsCompressed), debug::valueOf(packetIsColored), debug::valueOf(packetIsCompressed),
sequence, flightTime, message.getDataSize(), message.getBytesLeftToRead(), subsection, sectionLength, sequence, flightTime, message.getSize(), message.getBytesLeftToRead(), subsection, sectionLength,
packetData.getUncompressedSize()); packetData.getUncompressedSize());
} }
@ -148,7 +148,7 @@ void OctreeRenderer::processDatagram(ReceivedMessage& message, SharedNodePointer
}); });
// seek forwards in packet // seek forwards in packet
message.seek(message.pos() + sectionLength); message.seek(message.getPosition() + sectionLength);
elementsPerPacket += args.elementsPerPacket; elementsPerPacket += args.elementsPerPacket;
entitiesPerPacket += args.entitiesPerPacket; entitiesPerPacket += args.entitiesPerPacket;

View file

@ -550,7 +550,7 @@ int OctreeSceneStats::unpackFromPacket(ReceivedMessage& packet) {
float calculatedBPV = total == 0 ? 0 : (_bytes * 8) / total; float calculatedBPV = total == 0 ? 0 : (_bytes * 8) / total;
_bitsPerOctreeAverage.updateAverage(calculatedBPV); _bitsPerOctreeAverage.updateAverage(calculatedBPV);
return packet.pos(); // excludes header! return packet.getPosition(); // excludes header!
} }
@ -790,8 +790,8 @@ void OctreeSceneStats::trackIncomingOctreePacket(ReceivedMessage& message, bool
// track packets here... // track packets here...
_incomingPacket++; _incomingPacket++;
_incomingBytes += message.getDataSize(); _incomingBytes += message.getSize();
if (!wasStatsPacket) { if (!wasStatsPacket) {
_incomingWastedBytes += (udt::MAX_PACKET_SIZE - message.getDataSize()); _incomingWastedBytes += (udt::MAX_PACKET_SIZE - message.getSize());
} }
} }