remove senderWithAddress from NodeList

This commit is contained in:
Stephen Birarda 2014-02-06 14:43:46 -08:00
parent a823722d27
commit 97a7369c76
15 changed files with 42 additions and 66 deletions

View file

@ -39,7 +39,7 @@ void OctreeInboundPacketProcessor::resetStats() {
}
void OctreeInboundPacketProcessor::processPacket(const HifiSockAddr& senderSockAddr, const QByteArray& packet) {
void OctreeInboundPacketProcessor::processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet) {
bool debugProcessPacket = _myServer->wantsVerboseDebug();
@ -55,8 +55,6 @@ void OctreeInboundPacketProcessor::processPacket(const HifiSockAddr& senderSockA
if (_myServer->getOctree()->handlesEditPacketType(packetType)) {
PerformanceWarning warn(debugProcessPacket, "processPacket KNOWN TYPE",debugProcessPacket);
_receivedPacketCount++;
SharedNodePointer senderNode = NodeList::getInstance()->nodeWithAddress(senderSockAddr);
const unsigned char* packetData = reinterpret_cast<const unsigned char*>(packet.data());
@ -90,7 +88,7 @@ void OctreeInboundPacketProcessor::processPacket(const HifiSockAddr& senderSockA
int editDataBytesRead = _myServer->getOctree()->processEditPacketData(packetType,
reinterpret_cast<const unsigned char*>(packet.data()),
packet.size(),
editData, maxSize, senderNode.data());
editData, maxSize, sendingNode.data());
_myServer->getOctree()->unlock();
quint64 endProcess = usecTimestampNow();
@ -113,9 +111,9 @@ void OctreeInboundPacketProcessor::processPacket(const HifiSockAddr& senderSockA
// Make sure our Node and NodeList knows we've heard from this node.
QUuid& nodeUUID = DEFAULT_NODE_ID_REF;
if (senderNode) {
senderNode->setLastHeardMicrostamp(usecTimestampNow());
nodeUUID = senderNode->getUUID();
if (sendingNode) {
sendingNode->setLastHeardMicrostamp(usecTimestampNow());
nodeUUID = sendingNode->getUUID();
if (debugProcessPacket) {
qDebug() << "sender has uuid=" << nodeUUID;
}

View file

@ -63,7 +63,7 @@ public:
NodeToSenderStatsMap& getSingleSenderStats() { return _singleSenderStats; }
protected:
virtual void processPacket(const HifiSockAddr& senderSockAddr, const QByteArray& packet);
virtual void processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet);
private:
void trackInboundPackets(const QUuid& nodeUUID, int sequence, quint64 transitTime,

View file

@ -3948,27 +3948,25 @@ void Application::nodeKilled(SharedNodePointer node) {
}
}
void Application::trackIncomingVoxelPacket(const QByteArray& packet, const HifiSockAddr& senderSockAddr, bool wasStatsPacket) {
void Application::trackIncomingVoxelPacket(const QByteArray& packet, const SharedNodePointer& sendingNode, bool wasStatsPacket) {
// Attempt to identify the sender from it's address.
SharedNodePointer serverNode = NodeList::getInstance()->nodeWithAddress(senderSockAddr);
if (serverNode) {
QUuid nodeUUID = serverNode->getUUID();
if (sendingNode) {
QUuid nodeUUID = sendingNode->getUUID();
// now that we know the node ID, let's add these stats to the stats for that node...
_voxelSceneStatsLock.lockForWrite();
if (_octreeServerSceneStats.find(nodeUUID) != _octreeServerSceneStats.end()) {
VoxelSceneStats& stats = _octreeServerSceneStats[nodeUUID];
stats.trackIncomingOctreePacket(packet, wasStatsPacket, serverNode->getClockSkewUsec());
stats.trackIncomingOctreePacket(packet, wasStatsPacket, sendingNode->getClockSkewUsec());
}
_voxelSceneStatsLock.unlock();
}
}
int Application::parseOctreeStats(const QByteArray& packet, const HifiSockAddr& senderSockAddr) {
int Application::parseOctreeStats(const QByteArray& packet, const SharedNodePointer& sendingNode) {
// But, also identify the sender, and keep track of the contained jurisdiction root for this server
SharedNodePointer server = NodeList::getInstance()->nodeWithAddress(senderSockAddr);
// parse the incoming stats datas stick it in a temporary object for now, while we
// determine which server it belongs to
@ -3976,8 +3974,8 @@ int Application::parseOctreeStats(const QByteArray& packet, const HifiSockAddr&
int statsMessageLength = temp.unpackFromMessage(reinterpret_cast<const unsigned char*>(packet.data()), packet.size());
// quick fix for crash... why would voxelServer be NULL?
if (server) {
QUuid nodeUUID = server->getUUID();
if (sendingNode) {
QUuid nodeUUID = sendingNode->getUUID();
// now that we know the node ID, let's add these stats to the stats for that node...
_voxelSceneStatsLock.lockForWrite();
@ -3994,7 +3992,7 @@ int Application::parseOctreeStats(const QByteArray& packet, const HifiSockAddr&
// see if this is the first we've heard of this node...
NodeToJurisdictionMap* jurisdiction = NULL;
if (server->getType() == NodeType::VoxelServer) {
if (sendingNode->getType() == NodeType::VoxelServer) {
jurisdiction = &_voxelServerJurisdictions;
} else {
jurisdiction = &_particleServerJurisdictions;

View file

@ -470,8 +470,8 @@ private:
PieMenu _pieMenu;
int parseOctreeStats(const QByteArray& packet, const HifiSockAddr& senderAddress);
void trackIncomingVoxelPacket(const QByteArray& packet, const HifiSockAddr& senderSockAddr, bool wasStatsPacket);
int parseOctreeStats(const QByteArray& packet, const SharedNodePointer& sendingNode);
void trackIncomingVoxelPacket(const QByteArray& packet, const SharedNodePointer& sendingNode, bool wasStatsPacket);
NodeToJurisdictionMap _voxelServerJurisdictions;
NodeToJurisdictionMap _particleServerJurisdictions;

View file

@ -40,6 +40,9 @@ void DatagramProcessor::processDatagrams() {
_packetCount++;
_byteCount += incomingPacket.size();
QUuid nodeUUID;
deconstructPacketHeader(incomingPacket, nodeUUID);
if (packetVersionMatch(incomingPacket)) {
// only process this packet if we have a match on the packet version
switch (packetTypeForPacket(incomingPacket)) {
@ -84,9 +87,6 @@ void DatagramProcessor::processDatagrams() {
printf("got PacketType_VOXEL_DATA, sequence:%d flightTime:%d\n", sequence, flightTime);
}
QUuid nodeUUID;
deconstructPacketHeader(incomingPacket, nodeUUID);
SharedNodePointer matchedNode = NodeList::getInstance()->nodeWithUUID(nodeUUID);
if (matchedNode) {
@ -103,7 +103,7 @@ void DatagramProcessor::processDatagrams() {
case PacketTypeKillAvatar:
case PacketTypeAvatarIdentity: {
// update having heard from the avatar-mixer and record the bytes received
SharedNodePointer avatarMixer = NodeList::getInstance()->nodeWithAddress(senderSockAddr);
SharedNodePointer avatarMixer = nodeList->nodeWithUUID(nodeUUID);
if (avatarMixer) {
avatarMixer->setLastHeardMicrostamp(usecTimestampNow());

View file

@ -14,7 +14,7 @@
#include "Menu.h"
#include "VoxelPacketProcessor.h"
void VoxelPacketProcessor::processPacket(const HifiSockAddr& senderSockAddr, const QByteArray& packet) {
void VoxelPacketProcessor::processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet) {
PerformanceWarning warn(Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings),
"VoxelPacketProcessor::processPacket()");
@ -44,7 +44,7 @@ void VoxelPacketProcessor::processPacket(const HifiSockAddr& senderSockAddr, con
// then process any remaining bytes as if it was another packet
if (voxelPacketType == PacketTypeOctreeStats) {
int statsMessageLength = app->parseOctreeStats(mutablePacket, senderSockAddr);
int statsMessageLength = app->parseOctreeStats(mutablePacket, sendingNode);
wasStatsPacket = true;
if (messageLength > statsMessageLength) {
mutablePacket = mutablePacket.mid(statsMessageLength);
@ -60,26 +60,25 @@ void VoxelPacketProcessor::processPacket(const HifiSockAddr& senderSockAddr, con
voxelPacketType = packetTypeForPacket(mutablePacket);
if (Menu::getInstance()->isOptionChecked(MenuOption::Voxels)) {
app->trackIncomingVoxelPacket(mutablePacket, senderSockAddr, wasStatsPacket);
app->trackIncomingVoxelPacket(mutablePacket, sendingNode, wasStatsPacket);
SharedNodePointer serverNode = NodeList::getInstance()->nodeWithAddress(senderSockAddr);
if (serverNode && serverNode->getActiveSocket() && *serverNode->getActiveSocket() == senderSockAddr) {
if (sendingNode) {
switch(voxelPacketType) {
case PacketTypeParticleErase: {
app->_particles.processEraseMessage(mutablePacket, senderSockAddr, serverNode.data());
app->_particles.processEraseMessage(mutablePacket, *sendingNode->getActiveSocket(), sendingNode.data());
} break;
case PacketTypeParticleData: {
app->_particles.processDatagram(mutablePacket, senderSockAddr, serverNode.data());
app->_particles.processDatagram(mutablePacket, *sendingNode->getActiveSocket(), sendingNode.data());
} break;
case PacketTypeEnvironmentData: {
app->_environment.parseData(senderSockAddr, mutablePacket);
app->_environment.parseData(*sendingNode->getActiveSocket(), mutablePacket);
} break;
default : {
app->_voxels.setDataSourceUUID(serverNode->getUUID());
app->_voxels.setDataSourceUUID(sendingNode->getUUID());
app->_voxels.parseData(mutablePacket);
app->_voxels.setDataSourceUUID(QUuid());
} break;

View file

@ -17,6 +17,6 @@
/// the user is responsible for reading inbound packets and adding them to the processing queue by calling queueReceivedPacket()
class VoxelPacketProcessor : public ReceivedPacketProcessor {
protected:
virtual void processPacket(const HifiSockAddr& senderSockAddr, const QByteArray& packet);
virtual void processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet);
};
#endif // __shared__VoxelPacketProcessor__

View file

@ -61,16 +61,13 @@ bool JurisdictionListener::queueJurisdictionRequest() {
return isStillRunning();
}
void JurisdictionListener::processPacket(const HifiSockAddr& senderAddress, const QByteArray& packet) {
void JurisdictionListener::processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet) {
//qDebug() << "JurisdictionListener::processPacket()";
if (packetTypeForPacket(packet) == PacketTypeJurisdictionRequest) {
SharedNodePointer node = NodeList::getInstance()->nodeWithAddress(senderAddress);
if (node) {
QUuid nodeUUID = node->getUUID();
JurisdictionMap map;
map.unpackFromMessage(reinterpret_cast<const unsigned char*>(packet.data()), packet.size());
_jurisdictions[nodeUUID] = map;
}
if (packetTypeForPacket(packet) == PacketTypeJurisdictionRequest && sendingNode) {
QUuid nodeUUID = sendingNode->getUUID();
JurisdictionMap map;
map.unpackFromMessage(reinterpret_cast<const unsigned char*>(packet.data()), packet.size());
_jurisdictions[nodeUUID] = map;
}
}

View file

@ -49,7 +49,7 @@ protected:
/// \param packetData pointer to received data
/// \param ssize_t packetLength size of received data
/// \thread "this" individual processing thread
virtual void processPacket(const HifiSockAddr& senderAddress, const QByteArray& packet);
virtual void processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet);
private:
NodeToJurisdictionMap _jurisdictions;

View file

@ -28,13 +28,11 @@ JurisdictionSender::~JurisdictionSender() {
}
void JurisdictionSender::processPacket(const HifiSockAddr& senderAddress, const QByteArray& packet) {
void JurisdictionSender::processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet) {
if (packetTypeForPacket(packet) == PacketTypeJurisdictionRequest) {
QUuid nodeUUID;
deconstructPacketHeader(packet, nodeUUID);
if (!nodeUUID.isNull()) {
if (sendingNode) {
lockRequestingNodes();
_nodesRequestingJurisdictions.push(nodeUUID);
_nodesRequestingJurisdictions.push(sendingNode->getUUID());
unlockRequestingNodes();
}
}

View file

@ -37,7 +37,7 @@ public:
void setNodeType(NodeType_t type) { _nodeType = type; }
protected:
virtual void processPacket(const HifiSockAddr& senderAddress, const QByteArray& packet);
virtual void processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet);
/// Locks all the resources of the thread.
void lockRequestingNodes() { _requestingNodeMutex.lock(); }

View file

@ -246,18 +246,6 @@ int NodeList::updateNodeWithData(Node *node, const HifiSockAddr& senderSockAddr,
}
}
SharedNodePointer NodeList::nodeWithAddress(const HifiSockAddr &senderSockAddr) {
// naively returns the first node that has a matching active HifiSockAddr
// note that there can be multiple nodes that have a matching active socket, so this isn't a good way to uniquely identify
foreach (const SharedNodePointer& node, getNodeHash()) {
if (node->getActiveSocket() && *node->getActiveSocket() == senderSockAddr) {
return node;
}
}
return SharedNodePointer();
}
SharedNodePointer NodeList::nodeWithUUID(const QUuid& nodeUUID) {
QMutexLocker locker(&_nodeHashMutex);
return _nodeHash.value(nodeUUID);

View file

@ -104,7 +104,6 @@ public:
QByteArray constructPingReplyPacket(const QByteArray& pingPacket);
void pingPublicAndLocalSocketsForInactiveNode(Node* node);
SharedNodePointer nodeWithAddress(const HifiSockAddr& senderSockAddr);
SharedNodePointer nodeWithUUID(const QUuid& nodeUUID);
SharedNodePointer addOrUpdateNode(const QUuid& uuid, char nodeType, const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket);

View file

@ -41,8 +41,7 @@ bool ReceivedPacketProcessor::process() {
NetworkPacket temporary = packet; // make a copy of the packet in case the vector is resized on us
_packets.erase(_packets.begin()); // remove the oldest packet
unlock(); // let others add to the packets
processPacket(*temporary.getDestinationNode()->getActiveSocket(),
temporary.getByteArray()); // process our temporary copy
processPacket(temporary.getDestinationNode(), temporary.getByteArray()); // process our temporary copy
}
return isStillRunning(); // keep running till they terminate us
}

View file

@ -38,7 +38,7 @@ protected:
/// \param packetData pointer to received data
/// \param ssize_t packetLength size of received data
/// \thread "this" individual processing thread
virtual void processPacket(const HifiSockAddr& senderAddress, const QByteArray& packet) = 0;
virtual void processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet) = 0;
/// Implements generic processing behavior for this thread.
virtual bool process();