First cut at JurisdictionListener and PACKET_TYPE_VOXEL_JURISDICTION_REQUEST

This commit is contained in:
ZappoMan 2013-08-19 20:15:46 -07:00
parent 55a025a0ed
commit 5e7e6fc9d7
18 changed files with 255 additions and 47 deletions

View file

@ -17,6 +17,7 @@
#include <NodeTypes.h>
#include <OctalCode.h>
#include <PacketHeaders.h>
#include <JurisdictionListener.h>
#include <SceneUtils.h>
#include <SharedUtil.h>
#include <VoxelTree.h>
@ -49,6 +50,10 @@ bool wantLocalDomain = false;
unsigned long packetsSent = 0;
unsigned long bytesSent = 0;
JurisdictionListener* jurisdictionListener = NULL;
static void sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail& detail) {
unsigned char* bufferOut;
int sizeOut;
@ -777,6 +782,12 @@ int main(int argc, const char * argv[])
nodeList->linkedDataCreateCallback = NULL; // do we need a callback?
nodeList->startSilentNodeRemovalThread();
// Create our JurisdictionListener so we'll know where to send edit packets
::jurisdictionListener = new JurisdictionListener();
if (::jurisdictionListener) {
::jurisdictionListener->initialize(true);
}
srand((unsigned)time(0));
pthread_t animateVoxelThread;
@ -801,11 +812,22 @@ int main(int argc, const char * argv[])
// Nodes sending messages to us...
if (nodeList->getNodeSocket()->receive(&nodePublicAddress, packetData, &receivedBytes) &&
packetVersionMatch(packetData)) {
if (packetData[0] == PACKET_TYPE_VOXEL_JURISDICTION) {
if (::jurisdictionListener) {
::jurisdictionListener->queueReceivedPacket(nodePublicAddress, packetData, receivedBytes);
}
}
NodeList::getInstance()->processNodeData(&nodePublicAddress, packetData, receivedBytes);
}
}
pthread_join(animateVoxelThread, NULL);
if (::jurisdictionListener) {
::jurisdictionListener->terminate();
delete ::jurisdictionListener;
}
return 0;
}

View file

@ -3271,7 +3271,7 @@ void* Application::networkReceive(void* args) {
case PACKET_TYPE_VOXEL_STATS:
case PACKET_TYPE_ENVIRONMENT_DATA: {
// add this packet to our list of voxel packets and process them on the voxel processing
app->_voxelProcessor.queuePacket(senderAddress, app->_incomingPacket, bytesReceived);
app->_voxelProcessor.queueReceivedPacket(senderAddress, app->_incomingPacket, bytesReceived);
break;
}
case PACKET_TYPE_BULK_AVATAR_DATA:

View file

@ -343,7 +343,7 @@ private:
VoxelSceneStats _voxelSceneStats;
int parseVoxelStats(unsigned char* messageData, ssize_t messageLength, sockaddr senderAddress);
std::map<uint16_t, JurisdictionMap> _voxelServerJurisdictions;
NodeToJurisdictionMap _voxelServerJurisdictions;
std::vector<VoxelFade> _voxelFades;
};

View file

@ -13,7 +13,8 @@
#include <ReceivedPacketProcessor.h>
/// Handles processing of incoming voxel packets for the interface application.
/// Handles processing of incoming voxel packets for the interface application. As with other ReceivedPacketProcessor classes
/// the user is responsible for reading inbound packets and adding them to the processing queue by calling queueReceivedPacket()
class VoxelPacketProcessor : public ReceivedPacketProcessor {
protected:
virtual void processPacket(sockaddr& senderAddress, unsigned char* packetData, ssize_t packetLength);

View file

@ -41,6 +41,8 @@ protected:
/// Unlocks all the resources of the thread.
void unlock() { pthread_mutex_unlock(&_mutex); }
bool isStillRunning() const { return !_stopThread; }
private:
pthread_mutex_t _mutex;

View file

@ -39,6 +39,7 @@ const PACKET_TYPE PACKET_TYPE_REQUEST_ASSIGNMENT = 'r';
const PACKET_TYPE PACKET_TYPE_SEND_ASSIGNMENT = 's';
const PACKET_TYPE PACKET_TYPE_VOXEL_STATS = '#';
const PACKET_TYPE PACKET_TYPE_VOXEL_JURISDICTION = 'J';
const PACKET_TYPE PACKET_TYPE_VOXEL_JURISDICTION_REQUEST = 'j';
typedef char PACKET_VERSION;

View file

@ -26,7 +26,7 @@ PacketSender::PacketSender(PacketSenderNotify* notify, int packetsPerSecond) :
}
void PacketSender::queuePacket(sockaddr& address, unsigned char* packetData, ssize_t packetLength) {
void PacketSender::queuePacketForSending(sockaddr& address, unsigned char* packetData, ssize_t packetLength) {
NetworkPacket packet(address, packetData, packetLength);
lock();
_packets.push_back(packet);
@ -34,9 +34,6 @@ void PacketSender::queuePacket(sockaddr& address, unsigned char* packetData, ssi
}
bool PacketSender::process() {
//printf("PacketSender::process() packets pending=%ld _packetsPerSecond=%d\n",_packets.size(),_packetsPerSecond);
uint64_t USECS_PER_SECOND = 1000 * 1000;
uint64_t SEND_INTERVAL_USECS = (_packetsPerSecond == 0) ? USECS_PER_SECOND : (USECS_PER_SECOND / _packetsPerSecond);
@ -49,7 +46,6 @@ bool PacketSender::process() {
// send the packet through the NodeList...
UDPSocket* nodeSocket = NodeList::getInstance()->getNodeSocket();
//printf("sending a packet... length=%lu\n",packet.getLength());
nodeSocket->send(&packet.getAddress(), packet.getData(), packet.getLength());
if (_notify) {

View file

@ -22,7 +22,7 @@ public:
/// Generalized threaded processor for queueing and sending of outbound packets.
class PacketSender : public GenericThread {
class PacketSender : public virtual GenericThread {
public:
static const int DEFAULT_PACKETS_PER_SECOND;
static const int MINIMUM_PACKETS_PER_SECOND;
@ -34,7 +34,7 @@ public:
/// \param packetData pointer to data
/// \param ssize_t packetLength size of data
/// \thread any thread, typically the application thread
void queuePacket(sockaddr& address, unsigned char* packetData, ssize_t packetLength);
void queuePacketForSending(sockaddr& address, unsigned char* packetData, ssize_t packetLength);
void setPacketsPerSecond(int packetsPerSecond) { _packetsPerSecond = std::min(MINIMUM_PACKETS_PER_SECOND, packetsPerSecond); }
int getPacketsPerSecond() const { return _packetsPerSecond; }
@ -46,6 +46,9 @@ public:
protected:
int _packetsPerSecond;
bool hasPacketsToSend() const { return _packets.size() > 0; }
int packetsToSendCount() const { return _packets.size(); }
private:
std::vector<NetworkPacket> _packets;

View file

@ -8,9 +8,20 @@
// Threaded or non-threaded packet receiver.
//
#include "NodeList.h"
#include "ReceivedPacketProcessor.h"
#include "SharedUtil.h"
void ReceivedPacketProcessor::queueReceivedPacket(sockaddr& address, unsigned char* packetData, ssize_t packetLength) {
//printf("ReceivedPacketProcessor::queueReceivedPacket() packetData=%p, packetLength=%lu\n", packetData, packetLength);
// Make sure our Node and NodeList knows we've heard from this node.
Node* node = NodeList::getInstance()->nodeWithAddress(&address);
if (node) {
node->setLastHeardMicrostamp(usecTimestampNow());
}
void ReceivedPacketProcessor::queuePacket(sockaddr& address, unsigned char* packetData, ssize_t packetLength) {
NetworkPacket packet(address, packetData, packetLength);
lock();
_packets.push_back(packet);
@ -19,10 +30,12 @@ void ReceivedPacketProcessor::queuePacket(sockaddr& address, unsigned char* pack
bool ReceivedPacketProcessor::process() {
if (_packets.size() == 0) {
//printf("ReceivedPacketProcessor::process() no packets... sleeping... \n");
const uint64_t RECEIVED_THREAD_SLEEP_INTERVAL = (1000 * 1000)/60; // check at 60fps
usleep(RECEIVED_THREAD_SLEEP_INTERVAL);
}
while (_packets.size() > 0) {
//printf("ReceivedPacketProcessor::process() we got packets!! call processPacket() \n");
NetworkPacket& packet = _packets.front();
processPacket(packet.getAddress(), packet.getData(), packet.getLength());

View file

@ -15,7 +15,7 @@
#include "NetworkPacket.h"
/// Generalized threaded processor for handling received inbound packets.
class ReceivedPacketProcessor : public GenericThread {
class ReceivedPacketProcessor : public virtual GenericThread {
public:
/// Add packet from network receive thread to the processing queue.
@ -23,7 +23,7 @@ public:
/// \param packetData pointer to received data
/// \param ssize_t packetLength size of received data
/// \thread network receive thread
void queuePacket(sockaddr& senderAddress, unsigned char* packetData, ssize_t packetLength);
void queueReceivedPacket(sockaddr& senderAddress, unsigned char* packetData, ssize_t packetLength);
protected:
/// Callback for processing of recieved packets. Implement this to process the incoming packets.
@ -35,6 +35,13 @@ protected:
/// Implements generic processing behavior for this thread.
virtual bool process();
/// Are there received packets waiting to be processed
bool hasPacketsToProcess() const { return _packets.size() > 0; }
/// How many received packets waiting are to be processed
int packetsToProcessCount() const { return _packets.size(); }
private:
std::vector<NetworkPacket> _packets;

View file

@ -0,0 +1,75 @@
//
// JurisdictionListener.cpp
// shared
//
// Created by Brad Hefta-Gaub on 8/12/13.
// Copyright (c) 2013 High Fidelity, Inc. All rights reserved.
//
// Threaded or non-threaded jurisdiction Sender for the Application
//
#include <PerfStat.h>
#include <OctalCode.h>
#include <SharedUtil.h>
#include <PacketHeaders.h>
#include "JurisdictionListener.h"
JurisdictionListener::JurisdictionListener(PacketSenderNotify* notify) :
PacketSender(notify, JurisdictionListener::DEFAULT_PACKETS_PER_SECOND)
{
}
bool JurisdictionListener::queueJurisdictionRequest() {
static unsigned char buffer[MAX_PACKET_SIZE];
unsigned char* bufferOut = &buffer[0];
ssize_t sizeOut = populateTypeAndVersion(bufferOut, PACKET_TYPE_VOXEL_JURISDICTION_REQUEST);
int nodeCount = 0;
NodeList* nodeList = NodeList::getInstance();
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
// only send to the NodeTypes that are interested in our jurisdiction details
const int numNodeTypes = 1;
const NODE_TYPE nodeTypes[numNodeTypes] = { NODE_TYPE_VOXEL_SERVER };
if (node->getActiveSocket() != NULL && memchr(nodeTypes, node->getType(), numNodeTypes)) {
sockaddr* nodeAddress = node->getActiveSocket();
PacketSender::queuePacketForSending(*nodeAddress, bufferOut, sizeOut);
nodeCount++;
}
}
// set our packets per second to be the number of nodes
setPacketsPerSecond(nodeCount);
// keep going if still running
return isStillRunning();
}
void JurisdictionListener::processPacket(sockaddr& senderAddress, unsigned char* packetData, ssize_t packetLength) {
if (packetData[0] == PACKET_TYPE_VOXEL_JURISDICTION) {
Node* node = NodeList::getInstance()->nodeWithAddress(&senderAddress);
if (node) {
uint16_t nodeID = node->getNodeID();
JurisdictionMap map;
map.unpackFromMessage(packetData, packetLength);
_jurisdictions[nodeID] = map;
}
}
}
bool JurisdictionListener::process() {
bool continueProcessing = isStillRunning();
// If we're still running, and we don't have any requests waiting to be sent, then queue our jurisdiction requests
if (continueProcessing && !hasPacketsToSend()) {
queueJurisdictionRequest();
continueProcessing = PacketSender::process();
}
if (continueProcessing) {
// NOTE: This will sleep if there are no pending packets to process
continueProcessing = ReceivedPacketProcessor::process();
}
return continueProcessing;
}

View file

@ -0,0 +1,47 @@
//
// JurisdictionListener.h
// shared
//
// Created by Brad Hefta-Gaub on 8/12/13.
// Copyright (c) 2013 High Fidelity, Inc. All rights reserved.
//
// Voxel Packet Sender
//
#ifndef __shared__JurisdictionListener__
#define __shared__JurisdictionListener__
#include <PacketSender.h>
#include <ReceivedPacketProcessor.h>
#include "JurisdictionMap.h"
/// Sends out PACKET_TYPE_VOXEL_JURISDICTION_REQUEST packets to all voxel servers and then listens for and processes
/// the PACKET_TYPE_VOXEL_JURISDICTION packets it receives in order to maintain an accurate state of all jurisidictions
/// within the domain. As with other ReceivedPacketProcessor classes the user is responsible for reading inbound packets
/// and adding them to the processing queue by calling queueReceivedPacket()
class JurisdictionListener : public PacketSender, public ReceivedPacketProcessor {
public:
static const int DEFAULT_PACKETS_PER_SECOND = 1;
JurisdictionListener(PacketSenderNotify* notify = NULL);
virtual bool process();
const NodeToJurisdictionMap& getJurisdictions() const { return _jurisdictions; };
protected:
/// Callback for processing of received packets. Will process any queued PACKET_TYPE_VOXEL_JURISDICTION and update the
/// jurisdiction map member variable
/// \param sockaddr& senderAddress the address of the sender
/// \param packetData pointer to received data
/// \param ssize_t packetLength size of received data
/// \thread "this" individual processing thread
virtual void processPacket(sockaddr& senderAddress, unsigned char* packetData, ssize_t packetLength);
private:
NodeToJurisdictionMap _jurisdictions;
bool queueJurisdictionRequest();
};
#endif // __shared__JurisdictionListener__

View file

@ -10,6 +10,7 @@
#define __hifi__JurisdictionMap__
#include <vector>
#include <map>
#include <QtCore/QString>
class JurisdictionMap {
@ -62,6 +63,11 @@ private:
std::vector<unsigned char*> _endNodes;
};
/// Map between node IDs and their reported JurisdictionMap. Typically used by classes that need to know which nodes are
/// managing which jurisdictions.
typedef std::map<uint16_t, JurisdictionMap> NodeToJurisdictionMap;
#endif /* defined(__hifi__JurisdictionMap__) */

View file

@ -18,39 +18,59 @@
JurisdictionSender::JurisdictionSender(JurisdictionMap* map, PacketSenderNotify* notify) :
PacketSender(notify, JurisdictionSender::DEFAULT_PACKETS_PER_SECOND),
ReceivedPacketProcessor(),
_jurisdictionMap(map)
{
}
bool JurisdictionSender::process() {
//printf("JurisdictionSender::process() _packetsPerSecond=%d\n", _packetsPerSecond);
// add our packet to our own queue, then let the PacketSender class do the rest of the work.
if (_jurisdictionMap) {
//printf("JurisdictionSender::process() _jurisdictionMap=%p\n",_jurisdictionMap);
unsigned char buffer[MAX_PACKET_SIZE];
unsigned char* bufferOut = &buffer[0];
ssize_t sizeOut = _jurisdictionMap->packIntoMessage(bufferOut, MAX_PACKET_SIZE);
int nodeCount = 0;
NodeList* nodeList = NodeList::getInstance();
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
//printf("JurisdictionSender::process() node loop node=%d type=%c\n",node->getNodeID(), node->getType());
// only send to the NodeTypes that are interested in our jurisdiction details
const int numNodeTypes = 1;
const NODE_TYPE nodeTypes[numNodeTypes] = { NODE_TYPE_ANIMATION_SERVER };
if (node->getActiveSocket() != NULL && memchr(nodeTypes, node->getType(), numNodeTypes)) {
sockaddr* nodeAddress = node->getActiveSocket();
queuePacket(*nodeAddress, bufferOut, sizeOut);
nodeCount++;
}
void JurisdictionSender::processPacket(sockaddr& senderAddress, unsigned char* packetData, ssize_t packetLength) {
if (packetData[0] == PACKET_TYPE_VOXEL_JURISDICTION_REQUEST) {
Node* node = NodeList::getInstance()->nodeWithAddress(&senderAddress);
if (node) {
uint16_t nodeID = node->getNodeID();
lock();
_nodesRequestingJurisdictions.insert(nodeID);
unlock();
}
// set our packets per second to be the number of nodes
setPacketsPerSecond(nodeCount);
printf("loaded %d packets to send\n", nodeCount);
}
}
bool JurisdictionSender::process() {
bool continueProcessing = isStillRunning();
// call our ReceivedPacketProcessor base class process so we'll get any pending packets
if (continueProcessing) {
continueProcessing = ReceivedPacketProcessor::process();
}
return PacketSender::process();
if (continueProcessing) {
// add our packet to our own queue, then let the PacketSender class do the rest of the work.
if (_jurisdictionMap) {
unsigned char buffer[MAX_PACKET_SIZE];
unsigned char* bufferOut = &buffer[0];
ssize_t sizeOut = _jurisdictionMap->packIntoMessage(bufferOut, MAX_PACKET_SIZE);
int nodeCount = 0;
for (std::set<uint16_t>::iterator nodeIterator = _nodesRequestingJurisdictions.begin();
nodeIterator != _nodesRequestingJurisdictions.end(); nodeIterator++) {
uint16_t nodeID = *nodeIterator;
Node* node = NodeList::getInstance()->nodeWithID(nodeID);
if (node->getActiveSocket() != NULL) {
sockaddr* nodeAddress = node->getActiveSocket();
queuePacketForSending(*nodeAddress, bufferOut, sizeOut);
nodeCount++;
// remove it from the set
_nodesRequestingJurisdictions.erase(nodeIterator);
}
}
// set our packets per second to be the number of nodes
setPacketsPerSecond(nodeCount);
}
continueProcessing = PacketSender::process();
}
return continueProcessing;
}

View file

@ -11,11 +11,16 @@
#ifndef __shared__JurisdictionSender__
#define __shared__JurisdictionSender__
#include <set>
#include <PacketSender.h>
#include <ReceivedPacketProcessor.h>
#include "JurisdictionMap.h"
/// Threaded processor for queueing and sending of outbound edit voxel packets.
class JurisdictionSender : public PacketSender {
/// Will process PACKET_TYPE_VOXEL_JURISDICTION_REQUEST packets and send out PACKET_TYPE_VOXEL_JURISDICTION packets
/// to requesting parties. As with other ReceivedPacketProcessor classes the user is responsible for reading inbound packets
/// and adding them to the processing queue by calling queueReceivedPacket()
class JurisdictionSender : public PacketSender, public ReceivedPacketProcessor {
public:
static const int DEFAULT_PACKETS_PER_SECOND = 1;
@ -25,7 +30,11 @@ public:
virtual bool process();
protected:
virtual void processPacket(sockaddr& senderAddress, unsigned char* packetData, ssize_t packetLength);
private:
JurisdictionMap* _jurisdictionMap;
std::set<uint16_t> _nodesRequestingJurisdictions;
};
#endif // __shared__JurisdictionSender__

View file

@ -43,7 +43,7 @@ void VoxelEditPacketSender::actuallySendMessage(uint16_t nodeID, unsigned char*
if (node->getActiveSocket() != NULL && node->getType() == NODE_TYPE_VOXEL_SERVER &&
((node->getNodeID() == nodeID) || (nodeID == (uint16_t)UNKNOWN_NODE_ID)) ) {
sockaddr* nodeAddress = node->getActiveSocket();
queuePacket(*nodeAddress, bufferOut, sizeOut);
queuePacketForSending(*nodeAddress, bufferOut, sizeOut);
}
}
}

View file

@ -43,7 +43,7 @@ public:
bool getShouldSend() const { return _shouldSend; }
void setShouldSend(bool shouldSend) { _shouldSend = shouldSend; }
void setVoxelServerJurisdictions(std::map<uint16_t, JurisdictionMap>* voxelServerJurisdictions) {
void setVoxelServerJurisdictions(NodeToJurisdictionMap* voxelServerJurisdictions) {
_voxelServerJurisdictions = voxelServerJurisdictions;
}
@ -55,6 +55,6 @@ private:
std::map<uint16_t,EditPacketBuffer> _pendingEditPackets;
std::map<uint16_t, JurisdictionMap>* _voxelServerJurisdictions;
NodeToJurisdictionMap* _voxelServerJurisdictions;
};
#endif // __shared__VoxelEditPacketSender__

View file

@ -662,7 +662,9 @@ int main(int argc, const char * argv[]) {
// set up our jurisdiction broadcaster...
::jurisdictionSender = new JurisdictionSender(::jurisdiction);
jurisdictionSender->initialize(true);
if (::jurisdictionSender) {
::jurisdictionSender->initialize(true);
}
// loop to send to nodes requesting data
while (true) {
@ -826,6 +828,10 @@ int main(int argc, const char * argv[]) {
} else if (packetData[0] == PACKET_TYPE_DOMAIN) {
//printf("PACKET_TYPE_DOMAIN packet\n");
nodeList->processNodeData(&nodePublicAddress, packetData, receivedBytes);
} else if (packetData[0] == PACKET_TYPE_VOXEL_JURISDICTION_REQUEST) {
if (::jurisdictionSender) {
jurisdictionSender->queueReceivedPacket(nodePublicAddress, packetData, receivedBytes);
}
} else {
printf("unknown packet ignored... packetData[0]=%c\n", packetData[0]);
}