first cut at JurisdictionSender and related changes

This commit is contained in:
ZappoMan 2013-08-19 11:05:29 -07:00
parent 5a96f51602
commit e64664c0d1
8 changed files with 113 additions and 22 deletions

View file

@ -735,21 +735,27 @@ int main(int argc, const char * argv[])
// Handle Local Domain testing with the --local command line
const char* NO_BILLBOARD = "--NoBillboard";
::includeBillboard = !cmdOptionExists(argc, argv, NO_BILLBOARD);
printf("includeBillboard=%s\n",debug::valueOf(::includeBillboard));
const char* NO_BORDER_TRACER = "--NoBorderTracer";
::includeBorderTracer = !cmdOptionExists(argc, argv, NO_BORDER_TRACER);
printf("includeBorderTracer=%s\n",debug::valueOf(::includeBorderTracer));
const char* NO_MOVING_BUG = "--NoMovingBug";
::includeMovingBug = !cmdOptionExists(argc, argv, NO_MOVING_BUG);
printf("includeMovingBug=%s\n",debug::valueOf(::includeMovingBug));
const char* INCLUDE_BLINKING_VOXEL = "--includeBlinkingVoxel";
::includeBlinkingVoxel = cmdOptionExists(argc, argv, INCLUDE_BLINKING_VOXEL);
printf("includeBlinkingVoxel=%s\n",debug::valueOf(::includeBlinkingVoxel));
const char* NO_DANCE_FLOOR = "--NoDanceFloor";
::includeDanceFloor = !cmdOptionExists(argc, argv, NO_DANCE_FLOOR);
printf("includeDanceFloor=%s\n",debug::valueOf(::includeDanceFloor));
const char* BUILD_STREET = "--BuildStreet";
::buildStreet = cmdOptionExists(argc, argv, BUILD_STREET);
printf("buildStreet=%s\n",debug::valueOf(::buildStreet));
// Handle Local Domain testing with the --local command line
const char* showPPS = "--showPPS";

View file

@ -127,6 +127,14 @@ int main(int argc, const char * argv[])
unsigned char* nodeTypesOfInterest = packetData + numBytesSenderHeader + sizeof(NODE_TYPE)
+ numBytesSocket + sizeof(unsigned char);
int numInterestTypes = *(nodeTypesOfInterest - 1);
/*
printf("line 131 newNode=%d type=%c numInterestTypes=%d\n", newNode->getNodeID(), newNode->getType(), numInterestTypes);
for (int t = 0; t < numInterestTypes; t++) {
printf("line 133 t=%d type=%c\n", t, nodeTypesOfInterest[t]);
}
*/
if (numInterestTypes > 0) {
// if the node has sent no types of interest, assume they want nothing but their own ID back
@ -136,15 +144,19 @@ int main(int argc, const char * argv[])
// this is not the node themselves
// and this is an node of a type in the passed node types of interest
// or the node did not pass us any specific types they are interested in
//printf("line 140 node=%d type=%c\n", node->getNodeID(), node->getType());
if (memchr(SOLO_NODE_TYPES, node->getType(), sizeof(SOLO_NODE_TYPES)) == NULL) {
// this is an node of which there can be multiple, just add them to the packet
// don't send avatar nodes to other avatars, that will come from avatar mixer
//printf("line 144 node=%d type=%c\n", node->getNodeID(), node->getType());
if (nodeType != NODE_TYPE_AGENT || node->getType() != NODE_TYPE_AGENT) {
//printf("line 146 node=%d type=%c\n", node->getNodeID(), node->getType());
currentBufferPos = addNodeToBroadcastPacket(currentBufferPos, &(*node));
}
} else {
//printf("line 151 node=%d type=%c\n", node->getNodeID(), node->getType());
// solo node, we need to only send newest
if (newestSoloNodes[node->getType()] == NULL ||
newestSoloNodes[node->getType()]->getWakeMicrostamp() < node->getWakeMicrostamp()) {

View file

@ -30,10 +30,11 @@ public:
/// If you're running in non-threaded mode, you must call this regularly
void* threadRoutine();
protected:
/// Override this function to do whatever your class actually does, return false to exit thread early.
virtual bool process() = 0;
protected:
/// Locks all the resources of the thread.
void lock() { pthread_mutex_lock(&_mutex); }

View file

@ -411,6 +411,7 @@ Node* NodeList::addOrUpdateNode(sockaddr* publicSocket, sockaddr* localSocket, c
} else {
if (node->getType() == NODE_TYPE_AUDIO_MIXER ||
//node->getType() == NODE_TYPE_ANIMATION_SERVER ||
node->getType() == NODE_TYPE_VOXEL_SERVER) {
// until the Audio class also uses our nodeList, we need to update
// the lastRecvTimeUsecs for the audio mixer so it doesn't get killed and re-added continously

View file

@ -10,14 +10,19 @@
#include <stdint.h>
const uint64_t SEND_INTERVAL_USECS = 1000 * 5; // no more than 200pps... should be settable
#include "NodeList.h"
#include "PacketSender.h"
#include "SharedUtil.h"
PacketSender::PacketSender(PacketSenderNotify* notify) : _notify(notify) {
_lastSendTime = usecTimestampNow();
const int PacketSender::DEFAULT_PACKETS_PER_SECOND = 200;
const int PacketSender::MINIMUM_PACKETS_PER_SECOND = 1;
PacketSender::PacketSender(PacketSenderNotify* notify, int packetsPerSecond) :
_packetsPerSecond(packetsPerSecond),
_lastSendTime(usecTimestampNow()),
_notify(notify)
{
}
@ -29,9 +34,14 @@ void PacketSender::queuePacket(sockaddr& address, unsigned char* packetData, ssi
}
bool PacketSender::process() {
//printf("PacketSender::process() packets pending=%ld _packetsPerSecond=%d\n",_packets.size(),_packetsPerSecond);
uint64_t USECS_PER_SECOND = 1000 * 1000;
uint64_t SEND_INTERVAL_USECS = (_packetsPerSecond == 0) ? USECS_PER_SECOND : (USECS_PER_SECOND / _packetsPerSecond);
if (_packets.size() == 0) {
const uint64_t SEND_THREAD_SLEEP_INTERVAL = (1000 * 1000)/60; // check at 60fps
usleep(SEND_THREAD_SLEEP_INTERVAL);
usleep(SEND_INTERVAL_USECS);
}
while (_packets.size() > 0) {
NetworkPacket& packet = _packets.front();
@ -39,7 +49,9 @@ bool PacketSender::process() {
// send the packet through the NodeList...
UDPSocket* nodeSocket = NodeList::getInstance()->getNodeSocket();
//printf("sending a packet... length=%lu\n",packet.getLength());
nodeSocket->send(&packet.getAddress(), packet.getData(), packet.getLength());
if (_notify) {
_notify->packetSentNotification(packet.getLength());
}

View file

@ -24,8 +24,10 @@ public:
/// Generalized threaded processor for queueing and sending of outbound packets.
class PacketSender : public GenericThread {
public:
static const int DEFAULT_PACKETS_PER_SECOND;
static const int MINIMUM_PACKETS_PER_SECOND;
PacketSender(PacketSenderNotify* notify = NULL);
PacketSender(PacketSenderNotify* notify = NULL, int packetsPerSecond = DEFAULT_PACKETS_PER_SECOND);
/// Add packet to outbound queue.
/// \param sockaddr& address the destination address
@ -34,14 +36,21 @@ public:
/// \thread any thread, typically the application thread
void queuePacket(sockaddr& address, unsigned char* packetData, ssize_t packetLength);
private:
void setPacketsPerSecond(int packetsPerSecond) { _packetsPerSecond = std::min(MINIMUM_PACKETS_PER_SECOND, packetsPerSecond); }
int getPacketsPerSecond() const { return _packetsPerSecond; }
void setPacketSenderNotify(PacketSenderNotify* notify) { _notify = notify; }
PacketSenderNotify* getPacketSenderNotify() const { return _notify; }
virtual bool process();
protected:
int _packetsPerSecond;
private:
std::vector<NetworkPacket> _packets;
uint64_t _lastSendTime;
PacketSenderNotify* _notify;
};
#endif // __shared__PacketSender__

View file

@ -217,13 +217,15 @@ int JurisdictionMap::packIntoMessage(unsigned char* destinationBuffer, int avail
// if and only if there's a root jurisdiction, also include the end nodes
int endNodeCount = _endNodes.size();
memcpy(destinationBuffer, &endNodeCount, sizeof(endNodeCount));
destinationBuffer += sizeof(endNodeCount);
for (int i=0; i < endNodeCount; i++) {
unsigned char* endNodeCode = _endNodes[i];
int bytes = bytesRequiredForCodeLength(numberOfThreeBitSectionsInCode(endNodeCode));
int bytes = 0;
if (endNodeCode) {
bytes = bytesRequiredForCodeLength(numberOfThreeBitSectionsInCode(endNodeCode));
}
memcpy(destinationBuffer, &bytes, sizeof(bytes));
destinationBuffer += sizeof(bytes);
memcpy(destinationBuffer, endNodeCode, bytes);
@ -266,7 +268,11 @@ int JurisdictionMap::unpackFromMessage(unsigned char* sourceBuffer, int availabl
unsigned char* endNodeCode = new unsigned char[bytes];
memcpy(endNodeCode, sourceBuffer, bytes);
sourceBuffer += bytes;
_endNodes.push_back(endNodeCode);
// if the endNodeCode was 0 length then don't add it
if (bytes > 0) {
_endNodes.push_back(endNodeCode);
}
}
}

View file

@ -21,6 +21,8 @@
#include <SceneUtils.h>
#include <PerfStat.h>
#include <JurisdictionSender.h>
#ifdef _WIN32
#include "Syssocket.h"
#include "Systime.h"
@ -73,6 +75,7 @@ EnvironmentData environmentData[3];
int receivedPacketCount = 0;
JurisdictionMap* jurisdiction = NULL;
JurisdictionSender* jurisdictionSender = NULL;
void randomlyFillVoxelTree(int levelsToGo, VoxelNode *currentRootNode) {
// randomly generate children for this node
@ -400,7 +403,7 @@ void persistVoxelsWhenDirty() {
}
}
void *distributeVoxelsToListeners(void *args) {
void* distributeVoxelsToListeners(void* args) {
NodeList* nodeList = NodeList::getInstance();
timeval lastSendTime;
@ -408,7 +411,6 @@ void *distributeVoxelsToListeners(void *args) {
while (true) {
gettimeofday(&lastSendTime, NULL);
// enumerate the nodes to send 3 packets to each
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
VoxelNodeData* nodeData = (VoxelNodeData*) node->getLinkedData();
@ -482,11 +484,10 @@ int main(int argc, const char * argv[]) {
}
if (jurisdictionRoot || jurisdictionEndNodes) {
jurisdiction = new JurisdictionMap(jurisdictionRoot, jurisdictionEndNodes);
::jurisdiction = new JurisdictionMap(jurisdictionRoot, jurisdictionEndNodes);
}
}
// should we send environments? Default is yes, but this command line suppresses sending
const char* DUMP_VOXELS_ON_MOVE = "--dumpVoxelsOnMove";
::dumpVoxelsOnMove = cmdOptionExists(argc, argv, DUMP_VOXELS_ON_MOVE);
@ -509,6 +510,10 @@ int main(int argc, const char * argv[]) {
NodeList* nodeList = NodeList::createInstance(NODE_TYPE_VOXEL_SERVER, listenPort);
setvbuf(stdout, NULL, _IOLBF, 0);
const int numNodeTypes = 2;
const NODE_TYPE nodeTypes[numNodeTypes] = { NODE_TYPE_AGENT, NODE_TYPE_ANIMATION_SERVER };
NodeList::getInstance()->setNodeTypesOfInterest(&nodeTypes[0], numNodeTypes);
// Handle Local Domain testing with the --local command line
const char* local = "--local";
::wantLocalDomain = cmdOptionExists(argc, argv,local);
@ -655,8 +660,11 @@ int main(int argc, const char * argv[]) {
timeval lastDomainServerCheckIn = {};
// set up our jurisdiction broadcaster...
::jurisdictionSender = new JurisdictionSender(::jurisdiction);
jurisdictionSender->initialize(true);
// loop to send to nodes requesting data
while (true) {
// send a check in packet to the domain server if DOMAIN_SERVER_CHECK_IN_USECS has elapsed
@ -735,12 +743,30 @@ int main(int argc, const char * argv[]) {
voxelData += voxelDataSize;
atByte += voxelDataSize;
}
//printf("PACKET_TYPE_SET_VOXEL ...\n");
// Make sure our Node and NodeList knows we've heard from this node.
Node* node = NodeList::getInstance()->nodeWithAddress(&nodePublicAddress);
if (node) {
//printf("PACKET_TYPE_SET_VOXEL node->setLastHeardMicrostamp(usecTimestampNow());\n");
node->setLastHeardMicrostamp(usecTimestampNow());
}
} else if (packetData[0] == PACKET_TYPE_ERASE_VOXEL) {
// Send these bits off to the VoxelTree class to process them
pthread_mutex_lock(&::treeLock);
serverTree.processRemoveVoxelBitstream((unsigned char*)packetData, receivedBytes);
pthread_mutex_unlock(&::treeLock);
// Make sure our Node and NodeList knows we've heard from this node.
Node* node = NodeList::getInstance()->nodeWithAddress(&nodePublicAddress);
if (node) {
//printf("PACKET_TYPE_ERASE_VOXEL node->setLastHeardMicrostamp(usecTimestampNow());\n");
node->setLastHeardMicrostamp(usecTimestampNow());
}
} else if (packetData[0] == PACKET_TYPE_Z_COMMAND) {
// the Z command is a special command that allows the sender to send the voxel server high level semantic
@ -774,6 +800,14 @@ int main(int argc, const char * argv[]) {
printf("rebroadcasting Z message to connected nodes... nodeList.broadcastToNodes()\n");
nodeList->broadcastToNodes(packetData, receivedBytes, &NODE_TYPE_AGENT, 1);
}
// Make sure our Node and NodeList knows we've heard from this node.
Node* node = NodeList::getInstance()->nodeWithAddress(&nodePublicAddress);
if (node) {
printf("PACKET_TYPE_Z_COMMAND node->setLastHeardMicrostamp(usecTimestampNow());\n");
node->setLastHeardMicrostamp(usecTimestampNow());
}
} else if (packetData[0] == PACKET_TYPE_HEAD_DATA) {
// If we got a PACKET_TYPE_HEAD_DATA, then we're talking to an NODE_TYPE_AVATAR, and we
// need to make sure we have it in our nodeList.
@ -789,6 +823,11 @@ int main(int argc, const char * argv[]) {
} else if (packetData[0] == PACKET_TYPE_PING) {
// If the packet is a ping, let processNodeData handle it.
nodeList->processNodeData(&nodePublicAddress, packetData, receivedBytes);
} else if (packetData[0] == PACKET_TYPE_DOMAIN) {
//printf("PACKET_TYPE_DOMAIN packet\n");
nodeList->processNodeData(&nodePublicAddress, packetData, receivedBytes);
} else {
printf("unknown packet ignored... packetData[0]=%c\n", packetData[0]);
}
}
}
@ -796,8 +835,13 @@ int main(int argc, const char * argv[]) {
pthread_join(sendVoxelThread, NULL);
pthread_mutex_destroy(&::treeLock);
if (jurisdiction) {
delete jurisdiction;
if (::jurisdiction) {
delete ::jurisdiction;
}
if (::jurisdictionSender) {
jurisdictionSender->terminate();
delete ::jurisdictionSender;
}
return 0;