Merge pull request #870 from ZappoMan/animation_server_jurisdictions

Improve Animation Server to support Jurisdictions in Multi-voxel-server environment
This commit is contained in:
Stephen Birarda 2013-08-21 13:18:19 -07:00
commit 1995788c2b
26 changed files with 737 additions and 188 deletions

View file

@ -17,8 +17,10 @@
#include <NodeTypes.h>
#include <OctalCode.h>
#include <PacketHeaders.h>
#include <JurisdictionListener.h>
#include <SceneUtils.h>
#include <SharedUtil.h>
#include <VoxelEditPacketSender.h>
#include <VoxelTree.h>
#ifdef _WIN32
@ -49,23 +51,10 @@ bool wantLocalDomain = false;
unsigned long packetsSent = 0;
unsigned long bytesSent = 0;
static void sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail& detail) {
unsigned char* bufferOut;
int sizeOut;
if (createVoxelEditMessage(type, 0, 1, &detail, bufferOut, sizeOut)){
::packetsSent++;
::bytesSent += sizeOut;
JurisdictionListener* jurisdictionListener = NULL;
VoxelEditPacketSender* voxelEditPacketSender = NULL;
if (::shouldShowPacketsPerSecond) {
printf("sending packet of size=%d\n",sizeOut);
}
NodeList::getInstance()->broadcastToNodes(bufferOut, sizeOut, &NODE_TYPE_VOXEL_SERVER, 1);
delete[] bufferOut;
}
}
glm::vec3 rotatePoint(glm::vec3 point, float angle) {
// First, create the quaternion based on this angle of rotation
@ -141,8 +130,6 @@ const BugPart bugParts[VOXELS_PER_BUG] = {
static void renderMovingBug() {
VoxelDetail details[VOXELS_PER_BUG];
unsigned char* bufferOut;
int sizeOut;
// Generate voxels for where bug used to be
for (int i = 0; i < VOXELS_PER_BUG; i++) {
@ -163,17 +150,7 @@ static void renderMovingBug() {
// send the "erase message" first...
PACKET_TYPE message = PACKET_TYPE_ERASE_VOXEL;
if (createVoxelEditMessage(message, 0, VOXELS_PER_BUG, (VoxelDetail*)&details, bufferOut, sizeOut)){
::packetsSent++;
::bytesSent += sizeOut;
if (::shouldShowPacketsPerSecond) {
printf("sending packet of size=%d\n", sizeOut);
}
NodeList::getInstance()->broadcastToNodes(bufferOut, sizeOut, &NODE_TYPE_VOXEL_SERVER, 1);
delete[] bufferOut;
}
::voxelEditPacketSender->queueVoxelEditMessages(message, VOXELS_PER_BUG, (VoxelDetail*)&details);
// Move the bug...
if (moveBugInLine) {
@ -233,17 +210,7 @@ static void renderMovingBug() {
// send the "create message" ...
message = PACKET_TYPE_SET_VOXEL_DESTRUCTIVE;
if (createVoxelEditMessage(message, 0, VOXELS_PER_BUG, (VoxelDetail*)&details, bufferOut, sizeOut)){
::packetsSent++;
::bytesSent += sizeOut;
if (::shouldShowPacketsPerSecond) {
printf("sending packet of size=%d\n", sizeOut);
}
NodeList::getInstance()->broadcastToNodes(bufferOut, sizeOut, &NODE_TYPE_VOXEL_SERVER, 1);
delete[] bufferOut;
}
::voxelEditPacketSender->queueVoxelEditMessages(message, VOXELS_PER_BUG, (VoxelDetail*)&details);
}
@ -279,7 +246,7 @@ static void sendVoxelBlinkMessage() {
PACKET_TYPE message = PACKET_TYPE_SET_VOXEL_DESTRUCTIVE;
sendVoxelEditMessage(message, detail);
::voxelEditPacketSender->sendVoxelEditMessage(message, detail);
}
bool stringOfLightsInitialized = false;
@ -297,8 +264,6 @@ static void sendBlinkingStringOfLights() {
PACKET_TYPE message = PACKET_TYPE_SET_VOXEL_DESTRUCTIVE; // we're a bully!
float lightScale = STRING_OF_LIGHTS_SIZE;
static VoxelDetail details[LIGHTS_PER_SEGMENT];
unsigned char* bufferOut;
int sizeOut;
// first initialized the string of lights if needed...
if (!stringOfLightsInitialized) {
@ -338,19 +303,7 @@ static void sendBlinkingStringOfLights() {
details[indexInSegment].blue = offColor[2];
}
// send entire segment at once
if (createVoxelEditMessage(message, 0, LIGHTS_PER_SEGMENT, (VoxelDetail*)&details, bufferOut, sizeOut)){
::packetsSent++;
::bytesSent += sizeOut;
if (::shouldShowPacketsPerSecond) {
printf("sending packet of size=%d\n",sizeOut);
}
NodeList::getInstance()->broadcastToNodes(bufferOut, sizeOut, &NODE_TYPE_VOXEL_SERVER, 1);
delete[] bufferOut;
}
::voxelEditPacketSender->queueVoxelEditMessages(message, LIGHTS_PER_SEGMENT, (VoxelDetail*)&details);
}
stringOfLightsInitialized = true;
} else {
@ -381,17 +334,7 @@ static void sendBlinkingStringOfLights() {
details[1].blue = onColor[2];
// send both changes in same message
if (createVoxelEditMessage(message, 0, 2, (VoxelDetail*)&details, bufferOut, sizeOut)){
::packetsSent++;
::bytesSent += sizeOut;
if (::shouldShowPacketsPerSecond) {
printf("sending packet of size=%d\n",sizeOut);
}
NodeList::getInstance()->broadcastToNodes(bufferOut, sizeOut, &NODE_TYPE_VOXEL_SERVER, 1);
delete[] bufferOut;
}
::voxelEditPacketSender->queueVoxelEditMessages(message, 2, (VoxelDetail*)&details);
}
}
@ -427,8 +370,6 @@ void sendDanceFloor() {
PACKET_TYPE message = PACKET_TYPE_SET_VOXEL_DESTRUCTIVE; // we're a bully!
float lightScale = DANCE_FLOOR_LIGHT_SIZE;
static VoxelDetail details[DANCE_FLOOR_VOXELS_PER_PACKET];
unsigned char* bufferOut;
int sizeOut;
// first initialized the billboard of lights if needed...
if (!::danceFloorInitialized) {
@ -505,16 +446,7 @@ void sendDanceFloor() {
}
if (item == DANCE_FLOOR_VOXELS_PER_PACKET - 1) {
if (createVoxelEditMessage(message, 0, DANCE_FLOOR_VOXELS_PER_PACKET,
(VoxelDetail*)&details, bufferOut, sizeOut)){
::packetsSent++;
::bytesSent += sizeOut;
if (::shouldShowPacketsPerSecond) {
printf("sending packet of size=%d\n", sizeOut);
}
NodeList::getInstance()->broadcastToNodes(bufferOut, sizeOut, &NODE_TYPE_VOXEL_SERVER, 1);
delete[] bufferOut;
}
::voxelEditPacketSender->queueVoxelEditMessages(message, DANCE_FLOOR_VOXELS_PER_PACKET, (VoxelDetail*)&details);
}
}
}
@ -554,8 +486,6 @@ static void sendBillboard() {
PACKET_TYPE message = PACKET_TYPE_SET_VOXEL_DESTRUCTIVE; // we're a bully!
float lightScale = BILLBOARD_LIGHT_SIZE;
static VoxelDetail details[VOXELS_PER_PACKET];
unsigned char* bufferOut;
int sizeOut;
// first initialized the billboard of lights if needed...
if (!billboardInitialized) {
@ -603,15 +533,7 @@ static void sendBillboard() {
}
if (item == VOXELS_PER_PACKET - 1) {
if (createVoxelEditMessage(message, 0, VOXELS_PER_PACKET, (VoxelDetail*)&details, bufferOut, sizeOut)){
::packetsSent++;
::bytesSent += sizeOut;
if (::shouldShowPacketsPerSecond) {
printf("sending packet of size=%d\n", sizeOut);
}
NodeList::getInstance()->broadcastToNodes(bufferOut, sizeOut, &NODE_TYPE_VOXEL_SERVER, 1);
delete[] bufferOut;
}
::voxelEditPacketSender->queueVoxelEditMessages(message, VOXELS_PER_PACKET, (VoxelDetail*)&details);
}
}
}
@ -634,8 +556,6 @@ void doBuildStreet() {
PACKET_TYPE message = PACKET_TYPE_SET_VOXEL_DESTRUCTIVE; // we're a bully!
static VoxelDetail details[BRICKS_PER_PACKET];
unsigned char* bufferOut;
int sizeOut;
for (int z = 0; z < ROAD_LENGTH; z++) {
for (int x = 0; x < ROAD_WIDTH; x++) {
@ -656,15 +576,7 @@ void doBuildStreet() {
details[item].blue = randomTone;
if (item == BRICKS_PER_PACKET - 1) {
if (createVoxelEditMessage(message, 0, BRICKS_PER_PACKET, (VoxelDetail*)&details, bufferOut, sizeOut)){
::packetsSent++;
::bytesSent += sizeOut;
if (true || ::shouldShowPacketsPerSecond) {
printf("building road sending packet of size=%d\n", sizeOut);
}
NodeList::getInstance()->broadcastToNodes(bufferOut, sizeOut, &NODE_TYPE_VOXEL_SERVER, 1);
delete[] bufferOut;
}
::voxelEditPacketSender->queueVoxelEditMessages(message, BRICKS_PER_PACKET, (VoxelDetail*)&details);
}
}
}
@ -705,6 +617,10 @@ void* animateVoxels(void* args) {
doBuildStreet();
}
if (::voxelEditPacketSender) {
::voxelEditPacketSender->flushQueue();
}
uint64_t end = usecTimestampNow();
uint64_t elapsedSeconds = (end - ::start) / 1000000;
if (::shouldShowPacketsPerSecond) {
@ -735,21 +651,27 @@ int main(int argc, const char * argv[])
// Handle Local Domain testing with the --local command line
const char* NO_BILLBOARD = "--NoBillboard";
::includeBillboard = !cmdOptionExists(argc, argv, NO_BILLBOARD);
printf("includeBillboard=%s\n", debug::valueOf(::includeBillboard));
const char* NO_BORDER_TRACER = "--NoBorderTracer";
::includeBorderTracer = !cmdOptionExists(argc, argv, NO_BORDER_TRACER);
printf("includeBorderTracer=%s\n", debug::valueOf(::includeBorderTracer));
const char* NO_MOVING_BUG = "--NoMovingBug";
::includeMovingBug = !cmdOptionExists(argc, argv, NO_MOVING_BUG);
printf("includeMovingBug=%s\n", debug::valueOf(::includeMovingBug));
const char* INCLUDE_BLINKING_VOXEL = "--includeBlinkingVoxel";
::includeBlinkingVoxel = cmdOptionExists(argc, argv, INCLUDE_BLINKING_VOXEL);
printf("includeBlinkingVoxel=%s\n", debug::valueOf(::includeBlinkingVoxel));
const char* NO_DANCE_FLOOR = "--NoDanceFloor";
::includeDanceFloor = !cmdOptionExists(argc, argv, NO_DANCE_FLOOR);
printf("includeDanceFloor=%s\n", debug::valueOf(::includeDanceFloor));
const char* BUILD_STREET = "--BuildStreet";
::buildStreet = cmdOptionExists(argc, argv, BUILD_STREET);
printf("buildStreet=%s\n", debug::valueOf(::buildStreet));
// Handle Local Domain testing with the --local command line
const char* showPPS = "--showPPS";
@ -771,6 +693,21 @@ int main(int argc, const char * argv[])
nodeList->linkedDataCreateCallback = NULL; // do we need a callback?
nodeList->startSilentNodeRemovalThread();
// Create our JurisdictionListener so we'll know where to send edit packets
::jurisdictionListener = new JurisdictionListener();
if (::jurisdictionListener) {
::jurisdictionListener->initialize(true);
}
// Create out VoxelEditPacketSender
::voxelEditPacketSender = new VoxelEditPacketSender;
if (::voxelEditPacketSender) {
::voxelEditPacketSender->initialize(true);
if (::jurisdictionListener) {
::voxelEditPacketSender->setVoxelServerJurisdictions(::jurisdictionListener->getJurisdictions());
}
}
srand((unsigned)time(0));
pthread_t animateVoxelThread;
@ -795,11 +732,27 @@ int main(int argc, const char * argv[])
// Nodes sending messages to us...
if (nodeList->getNodeSocket()->receive(&nodePublicAddress, packetData, &receivedBytes) &&
packetVersionMatch(packetData)) {
if (packetData[0] == PACKET_TYPE_VOXEL_JURISDICTION) {
if (::jurisdictionListener) {
::jurisdictionListener->queueReceivedPacket(nodePublicAddress, packetData, receivedBytes);
}
}
NodeList::getInstance()->processNodeData(&nodePublicAddress, packetData, receivedBytes);
}
}
pthread_join(animateVoxelThread, NULL);
if (::jurisdictionListener) {
::jurisdictionListener->terminate();
delete ::jurisdictionListener;
}
if (::voxelEditPacketSender) {
::voxelEditPacketSender->terminate();
delete ::voxelEditPacketSender;
}
return 0;
}

View file

@ -136,7 +136,7 @@ int main(int argc, const char * argv[])
// this is not the node themselves
// and this is an node of a type in the passed node types of interest
// or the node did not pass us any specific types they are interested in
if (memchr(SOLO_NODE_TYPES, node->getType(), sizeof(SOLO_NODE_TYPES)) == NULL) {
// this is an node of which there can be multiple, just add them to the packet
// don't send avatar nodes to other avatars, that will come from avatar mixer

View file

@ -123,7 +123,7 @@ Application::Application(int& argc, char** argv, timeval &startup_time) :
_audio(&_audioScope, STARTUP_JITTER_SAMPLES),
#endif
_stopNetworkReceiveThread(false),
_voxelProcessor(this),
_voxelProcessor(),
_voxelEditSender(this),
_packetCount(0),
_packetsPerSecond(0),
@ -229,6 +229,9 @@ Application::Application(int& argc, char** argv, timeval &startup_time) :
_glWidget->setMouseTracking(true);
// initialization continues in initializeGL when OpenGL context is ready
// Tell our voxel edit sender about our known jurisdictions
_voxelEditSender.setVoxelServerJurisdictions(&_voxelServerJurisdictions);
}
Application::~Application() {
@ -1112,6 +1115,7 @@ void Application::setFullscreen(bool fullscreen) {
}
void Application::setRenderVoxels(bool voxelRender) {
_voxelEditSender.setShouldSend(voxelRender);
if (!voxelRender) {
doKillLocalVoxels();
}
@ -3154,7 +3158,7 @@ void* Application::networkReceive(void* args) {
case PACKET_TYPE_VOXEL_STATS:
case PACKET_TYPE_ENVIRONMENT_DATA: {
// add this packet to our list of voxel packets and process them on the voxel processing
app->_voxelProcessor.queuePacket(senderAddress, app->_incomingPacket, bytesReceived);
app->_voxelProcessor.queueReceivedPacket(senderAddress, app->_incomingPacket, bytesReceived);
break;
}
case PACKET_TYPE_BULK_AVATAR_DATA:
@ -3184,3 +3188,7 @@ void* Application::networkReceive(void* args) {
}
return NULL;
}
void Application::packetSentNotification(ssize_t length) {
_bandwidthMeter.outputStream(BandwidthMeter::VOXELS).updateValue(length);
}

View file

@ -74,7 +74,7 @@ static const float NODE_KILLED_RED = 1.0f;
static const float NODE_KILLED_GREEN = 0.0f;
static const float NODE_KILLED_BLUE = 0.0f;
class Application : public QApplication, public NodeListHook {
class Application : public QApplication, public NodeListHook, public PacketSenderNotify {
Q_OBJECT
friend class VoxelPacketProcessor;
@ -105,8 +105,6 @@ public:
const glm::vec3 getMouseVoxelWorldCoordinates(const VoxelDetail _mouseVoxel);
void updateParticleSystem(float deltaTime);
QGLWidget* getGLWidget() { return _glWidget; }
Avatar* getAvatar() { return &_myAvatar; }
Audio* getAudio() { return &_audio; }
@ -134,6 +132,7 @@ public:
virtual void nodeAdded(Node* node);
virtual void nodeKilled(Node* node);
virtual void packetSentNotification(ssize_t length);
public slots:
void sendAvatarFaceVideoMessage(int frameCount, const QByteArray& data);
@ -351,7 +350,7 @@ private:
VoxelSceneStats _voxelSceneStats;
int parseVoxelStats(unsigned char* messageData, ssize_t messageLength, sockaddr senderAddress);
std::map<uint16_t, JurisdictionMap> _voxelServerJurisdictions;
NodeToJurisdictionMap _voxelServerJurisdictions;
std::vector<VoxelFade> _voxelFades;
};

View file

@ -14,19 +14,17 @@
#include "Menu.h"
#include "VoxelPacketProcessor.h"
VoxelPacketProcessor::VoxelPacketProcessor(Application* app) :
_app(app) {
}
void VoxelPacketProcessor::processPacket(sockaddr& senderAddress, unsigned char* packetData, ssize_t packetLength) {
PerformanceWarning warn(Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings),
"VoxelPacketProcessor::processPacket()");
ssize_t messageLength = packetLength;
Application* app = Application::getInstance();
// check to see if the UI thread asked us to kill the voxel tree. since we're the only thread allowed to do that
if (_app->_wantToKillLocalVoxels) {
_app->_voxels.killLocalVoxels();
_app->_wantToKillLocalVoxels = false;
if (app->_wantToKillLocalVoxels) {
app->_voxels.killLocalVoxels();
app->_wantToKillLocalVoxels = false;
}
// note: PACKET_TYPE_VOXEL_STATS can have PACKET_TYPE_VOXEL_DATA or PACKET_TYPE_VOXEL_DATA_MONOCHROME
@ -34,7 +32,7 @@ void VoxelPacketProcessor::processPacket(sockaddr& senderAddress, unsigned char*
// then process any remaining bytes as if it was another packet
if (packetData[0] == PACKET_TYPE_VOXEL_STATS) {
int statsMessageLength = _app->parseVoxelStats(packetData, messageLength, senderAddress);
int statsMessageLength = app->parseVoxelStats(packetData, messageLength, senderAddress);
if (messageLength > statsMessageLength) {
packetData += statsMessageLength;
messageLength -= statsMessageLength;
@ -51,11 +49,11 @@ void VoxelPacketProcessor::processPacket(sockaddr& senderAddress, unsigned char*
if (voxelServer && socketMatch(voxelServer->getActiveSocket(), &senderAddress)) {
voxelServer->lock();
if (packetData[0] == PACKET_TYPE_ENVIRONMENT_DATA) {
_app->_environment.parseData(&senderAddress, packetData, messageLength);
app->_environment.parseData(&senderAddress, packetData, messageLength);
} else {
_app->_voxels.setDataSourceID(voxelServer->getNodeID());
_app->_voxels.parseData(packetData, messageLength);
_app->_voxels.setDataSourceID(UNKNOWN_NODE_ID);
app->_voxels.setDataSourceID(voxelServer->getNodeID());
app->_voxels.parseData(packetData, messageLength);
app->_voxels.setDataSourceID(UNKNOWN_NODE_ID);
}
voxelServer->unlock();
}

View file

@ -13,17 +13,10 @@
#include <ReceivedPacketProcessor.h>
class Application;
/// Handles processing of incoming voxel packets for the interface application.
/// Handles processing of incoming voxel packets for the interface application. As with other ReceivedPacketProcessor classes
/// the user is responsible for reading inbound packets and adding them to the processing queue by calling queueReceivedPacket()
class VoxelPacketProcessor : public ReceivedPacketProcessor {
public:
VoxelPacketProcessor(Application* app);
protected:
virtual void processPacket(sockaddr& senderAddress, unsigned char* packetData, ssize_t packetLength);
private:
Application* _app;
};
#endif // __shared__VoxelPacketProcessor__

View file

@ -30,9 +30,10 @@ VoxelStatsDialog::VoxelStatsDialog(QWidget* parent, VoxelSceneStats* model) :
this->QDialog::setLayout(form);
// Setup labels
for (int i = 0; i < VoxelSceneStats::ITEM_COUNT; ++i) {
VoxelSceneStats::ItemInfo& itemInfo = _model->getItemInfo(i);
QLabel* label = _labels[i] = new QLabel();
for (int i = 0; i < (int)VoxelSceneStats::ITEM_COUNT; i++) {
VoxelSceneStats::Item item = (VoxelSceneStats::Item)(i);
VoxelSceneStats::ItemInfo& itemInfo = _model->getItemInfo(item);
QLabel* label = _labels[item] = new QLabel();
label->setAlignment(Qt::AlignRight);
// Set foreground color to 62.5% brightness of the meter (otherwise will be hard to read on the bright background)
@ -56,9 +57,10 @@ void VoxelStatsDialog::paintEvent(QPaintEvent* event) {
// Update labels
char strBuf[256];
for (int i = 0; i < VoxelSceneStats::ITEM_COUNT; ++i) {
QLabel* label = _labels[i];
snprintf(strBuf, sizeof(strBuf), "%s", _model->getItemValue(i));
for (int i = 0; i < (int)VoxelSceneStats::ITEM_COUNT; i++) {
VoxelSceneStats::Item item = (VoxelSceneStats::Item)(i);
QLabel* label = _labels[item];
snprintf(strBuf, sizeof(strBuf), "%s", _model->getItemValue(item));
label->setText(strBuf);
}

View file

@ -30,16 +30,19 @@ public:
/// If you're running in non-threaded mode, you must call this regularly
void* threadRoutine();
protected:
/// Override this function to do whatever your class actually does, return false to exit thread early.
virtual bool process() = 0;
protected:
/// Locks all the resources of the thread.
void lock() { pthread_mutex_lock(&_mutex); }
/// Unlocks all the resources of the thread.
void unlock() { pthread_mutex_unlock(&_mutex); }
bool isStillRunning() const { return !_stopThread; }
private:
pthread_mutex_t _mutex;

View file

@ -38,6 +38,8 @@ const PACKET_TYPE PACKET_TYPE_DOMAIN_REPORT_FOR_DUTY = 'C';
const PACKET_TYPE PACKET_TYPE_REQUEST_ASSIGNMENT = 'r';
const PACKET_TYPE PACKET_TYPE_SEND_ASSIGNMENT = 's';
const PACKET_TYPE PACKET_TYPE_VOXEL_STATS = '#';
const PACKET_TYPE PACKET_TYPE_VOXEL_JURISDICTION = 'J';
const PACKET_TYPE PACKET_TYPE_VOXEL_JURISDICTION_REQUEST = 'j';
typedef char PACKET_VERSION;

View file

@ -10,18 +10,23 @@
#include <stdint.h>
const uint64_t SEND_INTERVAL_USECS = 1000 * 5; // no more than 200pps... should be settable
#include "NodeList.h"
#include "PacketSender.h"
#include "SharedUtil.h"
PacketSender::PacketSender() {
_lastSendTime = usecTimestampNow();
const int PacketSender::DEFAULT_PACKETS_PER_SECOND = 200;
const int PacketSender::MINIMUM_PACKETS_PER_SECOND = 1;
PacketSender::PacketSender(PacketSenderNotify* notify, int packetsPerSecond) :
_packetsPerSecond(packetsPerSecond),
_lastSendTime(usecTimestampNow()),
_notify(notify)
{
}
void PacketSender::queuePacket(sockaddr& address, unsigned char* packetData, ssize_t packetLength) {
void PacketSender::queuePacketForSending(sockaddr& address, unsigned char* packetData, ssize_t packetLength) {
NetworkPacket packet(address, packetData, packetLength);
lock();
_packets.push_back(packet);
@ -29,9 +34,11 @@ void PacketSender::queuePacket(sockaddr& address, unsigned char* packetData, ssi
}
bool PacketSender::process() {
uint64_t USECS_PER_SECOND = 1000 * 1000;
uint64_t SEND_INTERVAL_USECS = (_packetsPerSecond == 0) ? USECS_PER_SECOND : (USECS_PER_SECOND / _packetsPerSecond);
if (_packets.size() == 0) {
const uint64_t SEND_THREAD_SLEEP_INTERVAL = (1000 * 1000)/60; // check at 60fps
usleep(SEND_THREAD_SLEEP_INTERVAL);
usleep(SEND_INTERVAL_USECS);
}
while (_packets.size() > 0) {
NetworkPacket& packet = _packets.front();
@ -40,6 +47,10 @@ bool PacketSender::process() {
UDPSocket* nodeSocket = NodeList::getInstance()->getNodeSocket();
nodeSocket->send(&packet.getAddress(), packet.getData(), packet.getLength());
if (_notify) {
_notify->packetSentNotification(packet.getLength());
}
lock();
_packets.erase(_packets.begin());
@ -55,5 +66,5 @@ bool PacketSender::process() {
}
}
return true; // keep running till they terminate us
return isStillRunning(); // keep running till they terminate us
}

View file

@ -14,25 +14,46 @@
#include "GenericThread.h"
#include "NetworkPacket.h"
/// Generalized threaded processor for queueing and sending of outbound packets.
class PacketSender : public GenericThread {
/// Notification Hook for packets being sent by a PacketSender
class PacketSenderNotify {
public:
virtual void packetSentNotification(ssize_t length) = 0;
};
PacketSender();
/// Generalized threaded processor for queueing and sending of outbound packets.
class PacketSender : public virtual GenericThread {
public:
static const int DEFAULT_PACKETS_PER_SECOND;
static const int MINIMUM_PACKETS_PER_SECOND;
PacketSender(PacketSenderNotify* notify = NULL, int packetsPerSecond = DEFAULT_PACKETS_PER_SECOND);
/// Add packet to outbound queue.
/// \param sockaddr& address the destination address
/// \param packetData pointer to data
/// \param ssize_t packetLength size of data
/// \thread any thread, typically the application thread
void queuePacket(sockaddr& address, unsigned char* packetData, ssize_t packetLength);
void queuePacketForSending(sockaddr& address, unsigned char* packetData, ssize_t packetLength);
private:
void setPacketsPerSecond(int packetsPerSecond) { _packetsPerSecond = std::min(MINIMUM_PACKETS_PER_SECOND, packetsPerSecond); }
int getPacketsPerSecond() const { return _packetsPerSecond; }
void setPacketSenderNotify(PacketSenderNotify* notify) { _notify = notify; }
PacketSenderNotify* getPacketSenderNotify() const { return _notify; }
virtual bool process();
protected:
int _packetsPerSecond;
bool hasPacketsToSend() const { return _packets.size() > 0; }
int packetsToSendCount() const { return _packets.size(); }
private:
std::vector<NetworkPacket> _packets;
uint64_t _lastSendTime;
PacketSenderNotify* _notify;
};
#endif // __shared__PacketSender__

View file

@ -8,9 +8,17 @@
// Threaded or non-threaded packet receiver.
//
#include "NodeList.h"
#include "ReceivedPacketProcessor.h"
#include "SharedUtil.h"
void ReceivedPacketProcessor::queueReceivedPacket(sockaddr& address, unsigned char* packetData, ssize_t packetLength) {
// Make sure our Node and NodeList knows we've heard from this node.
Node* node = NodeList::getInstance()->nodeWithAddress(&address);
if (node) {
node->setLastHeardMicrostamp(usecTimestampNow());
}
void ReceivedPacketProcessor::queuePacket(sockaddr& address, unsigned char* packetData, ssize_t packetLength) {
NetworkPacket packet(address, packetData, packetLength);
lock();
_packets.push_back(packet);
@ -30,5 +38,5 @@ bool ReceivedPacketProcessor::process() {
_packets.erase(_packets.begin());
unlock();
}
return true; // keep running till they terminate us
return isStillRunning(); // keep running till they terminate us
}

View file

@ -15,7 +15,7 @@
#include "NetworkPacket.h"
/// Generalized threaded processor for handling received inbound packets.
class ReceivedPacketProcessor : public GenericThread {
class ReceivedPacketProcessor : public virtual GenericThread {
public:
/// Add packet from network receive thread to the processing queue.
@ -23,7 +23,7 @@ public:
/// \param packetData pointer to received data
/// \param ssize_t packetLength size of received data
/// \thread network receive thread
void queuePacket(sockaddr& senderAddress, unsigned char* packetData, ssize_t packetLength);
void queueReceivedPacket(sockaddr& senderAddress, unsigned char* packetData, ssize_t packetLength);
protected:
/// Callback for processing of recieved packets. Implement this to process the incoming packets.
@ -35,6 +35,13 @@ protected:
/// Implements generic processing behavior for this thread.
virtual bool process();
/// Are there received packets waiting to be processed
bool hasPacketsToProcess() const { return _packets.size() > 0; }
/// How many received packets waiting are to be processed
int packetsToProcessCount() const { return _packets.size(); }
private:
std::vector<NetworkPacket> _packets;

View file

@ -255,6 +255,38 @@ bool createVoxelEditMessage(unsigned char command, short int sequence,
return success;
}
/// encodes the voxel details portion of a voxel edit message
bool encodeVoxelEditMessageDetails(unsigned char command, int voxelCount, VoxelDetail* voxelDetails,
unsigned char* bufferOut, int sizeIn, int& sizeOut) {
bool success = true; // assume the best
unsigned char* copyAt = bufferOut;
sizeOut = 0;
for (int i = 0; i < voxelCount && success; i++) {
// get the coded voxel
unsigned char* voxelData = pointToVoxel(voxelDetails[i].x,voxelDetails[i].y,voxelDetails[i].z,
voxelDetails[i].s,voxelDetails[i].red,voxelDetails[i].green,voxelDetails[i].blue);
int lengthOfVoxelData = bytesRequiredForCodeLength(*voxelData)+SIZE_OF_COLOR_DATA;
// make sure we have room to copy this voxel
if (sizeOut + lengthOfVoxelData > sizeIn) {
success = false;
} else {
// add it to our message
memcpy(copyAt, voxelData, lengthOfVoxelData);
copyAt += lengthOfVoxelData;
sizeOut += lengthOfVoxelData;
}
// cleanup
delete[] voxelData;
}
return success;
}
//////////////////////////////////////////////////////////////////////////////////////////
// Function: pointToVoxel()
// Description: Given a universal point with location x,y,z this will return the voxel

View file

@ -81,9 +81,15 @@ struct VoxelDetail {
};
unsigned char* pointToVoxel(float x, float y, float z, float s, unsigned char r = 0, unsigned char g = 0, unsigned char b = 0);
// Creates a full Voxel edit message, including command header, sequence, and details
bool createVoxelEditMessage(unsigned char command, short int sequence,
int voxelCount, VoxelDetail* voxelDetails, unsigned char*& bufferOut, int& sizeOut);
/// encodes the voxel details portion of a voxel edit message
bool encodeVoxelEditMessageDetails(unsigned char command, int voxelCount, VoxelDetail* voxelDetails,
unsigned char* bufferOut, int sizeIn, int& sizeOut);
#ifdef _WIN32
void usleep(int waitTime);
#endif

View file

@ -0,0 +1,90 @@
//
// JurisdictionListener.cpp
// shared
//
// Created by Brad Hefta-Gaub on 8/12/13.
// Copyright (c) 2013 High Fidelity, Inc. All rights reserved.
//
// Threaded or non-threaded jurisdiction Sender for the Application
//
#include <PerfStat.h>
#include <OctalCode.h>
#include <SharedUtil.h>
#include <PacketHeaders.h>
#include "JurisdictionListener.h"
JurisdictionListener::JurisdictionListener(PacketSenderNotify* notify) :
PacketSender(notify, JurisdictionListener::DEFAULT_PACKETS_PER_SECOND)
{
NodeList* nodeList = NodeList::getInstance();
nodeList->addHook(this);
}
JurisdictionListener::~JurisdictionListener() {
NodeList* nodeList = NodeList::getInstance();
nodeList->removeHook(this);
}
void JurisdictionListener::nodeAdded(Node* node) {
// nothing to do. But need to implement it.
}
void JurisdictionListener::nodeKilled(Node* node) {
_jurisdictions.erase(_jurisdictions.find(node->getNodeID()));
}
bool JurisdictionListener::queueJurisdictionRequest() {
static unsigned char buffer[MAX_PACKET_SIZE];
unsigned char* bufferOut = &buffer[0];
ssize_t sizeOut = populateTypeAndVersion(bufferOut, PACKET_TYPE_VOXEL_JURISDICTION_REQUEST);
int nodeCount = 0;
NodeList* nodeList = NodeList::getInstance();
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
// only send to the NodeTypes that are interested in our jurisdiction details
const int numNodeTypes = 1;
const NODE_TYPE nodeTypes[numNodeTypes] = { NODE_TYPE_VOXEL_SERVER };
if (node->getActiveSocket() != NULL && memchr(nodeTypes, node->getType(), numNodeTypes)) {
sockaddr* nodeAddress = node->getActiveSocket();
PacketSender::queuePacketForSending(*nodeAddress, bufferOut, sizeOut);
nodeCount++;
}
}
// set our packets per second to be the number of nodes
setPacketsPerSecond(nodeCount);
// keep going if still running
return isStillRunning();
}
void JurisdictionListener::processPacket(sockaddr& senderAddress, unsigned char* packetData, ssize_t packetLength) {
if (packetData[0] == PACKET_TYPE_VOXEL_JURISDICTION) {
Node* node = NodeList::getInstance()->nodeWithAddress(&senderAddress);
if (node) {
uint16_t nodeID = node->getNodeID();
JurisdictionMap map;
map.unpackFromMessage(packetData, packetLength);
_jurisdictions[nodeID] = map;
}
}
}
bool JurisdictionListener::process() {
bool continueProcessing = isStillRunning();
// If we're still running, and we don't have any requests waiting to be sent, then queue our jurisdiction requests
if (continueProcessing && !hasPacketsToSend()) {
queueJurisdictionRequest();
continueProcessing = PacketSender::process();
}
if (continueProcessing) {
// NOTE: This will sleep if there are no pending packets to process
continueProcessing = ReceivedPacketProcessor::process();
}
return continueProcessing;
}

View file

@ -0,0 +1,55 @@
//
// JurisdictionListener.h
// shared
//
// Created by Brad Hefta-Gaub on 8/12/13.
// Copyright (c) 2013 High Fidelity, Inc. All rights reserved.
//
// Voxel Packet Sender
//
#ifndef __shared__JurisdictionListener__
#define __shared__JurisdictionListener__
#include <NodeList.h>
#include <PacketSender.h>
#include <ReceivedPacketProcessor.h>
#include "JurisdictionMap.h"
/// Sends out PACKET_TYPE_VOXEL_JURISDICTION_REQUEST packets to all voxel servers and then listens for and processes
/// the PACKET_TYPE_VOXEL_JURISDICTION packets it receives in order to maintain an accurate state of all jurisidictions
/// within the domain. As with other ReceivedPacketProcessor classes the user is responsible for reading inbound packets
/// and adding them to the processing queue by calling queueReceivedPacket()
class JurisdictionListener : public NodeListHook, public PacketSender, public ReceivedPacketProcessor {
public:
static const int DEFAULT_PACKETS_PER_SECOND = 1;
JurisdictionListener(PacketSenderNotify* notify = NULL);
~JurisdictionListener();
virtual bool process();
NodeToJurisdictionMap* getJurisdictions() { return &_jurisdictions; };
/// Called by NodeList to inform us that a node has been added.
void nodeAdded(Node* node);
/// Called by NodeList to inform us that a node has been killed.
void nodeKilled(Node* node);
protected:
/// Callback for processing of received packets. Will process any queued PACKET_TYPE_VOXEL_JURISDICTION and update the
/// jurisdiction map member variable
/// \param sockaddr& senderAddress the address of the sender
/// \param packetData pointer to received data
/// \param ssize_t packetLength size of received data
/// \thread "this" individual processing thread
virtual void processPacket(sockaddr& senderAddress, unsigned char* packetData, ssize_t packetLength);
private:
NodeToJurisdictionMap _jurisdictions;
bool queueJurisdictionRequest();
};
#endif // __shared__JurisdictionListener__

View file

@ -9,6 +9,9 @@
#include <QtCore/QSettings>
#include <QtCore/QString>
#include <QtCore/QStringList>
#include <QDebug>
#include <PacketHeaders.h>
#include "JurisdictionMap.h"
#include "VoxelNode.h"
@ -179,6 +182,18 @@ bool JurisdictionMap::readFromFile(const char* filename) {
return true;
}
void JurisdictionMap::displayDebugDetails() {
QString rootNodeValue = octalCodeToHexString(_rootOctalCode);
qDebug() << "root:" << rootNodeValue << "\n";
for (int i = 0; i < _endNodes.size(); i++) {
QString value = octalCodeToHexString(_endNodes[i]);
qDebug() << "End node[" << i << "]: " << rootNodeValue << "\n";
}
}
bool JurisdictionMap::writeToFile(const char* filename) {
QString settingsFile(filename);
QSettings settings(settingsFile, QSettings::IniFormat);
@ -197,3 +212,95 @@ bool JurisdictionMap::writeToFile(const char* filename) {
settings.endGroup();
return true;
}
int JurisdictionMap::packEmptyJurisdictionIntoMessage(unsigned char* destinationBuffer, int availableBytes) {
unsigned char* bufferStart = destinationBuffer;
int headerLength = populateTypeAndVersion(destinationBuffer, PACKET_TYPE_VOXEL_JURISDICTION);
destinationBuffer += headerLength;
// No root or end node details to pack!
int bytes = 0;
memcpy(destinationBuffer, &bytes, sizeof(bytes));
destinationBuffer += sizeof(bytes);
return destinationBuffer - bufferStart; // includes header!
}
int JurisdictionMap::packIntoMessage(unsigned char* destinationBuffer, int availableBytes) {
unsigned char* bufferStart = destinationBuffer;
int headerLength = populateTypeAndVersion(destinationBuffer, PACKET_TYPE_VOXEL_JURISDICTION);
destinationBuffer += headerLength;
// add the root jurisdiction
if (_rootOctalCode) {
int bytes = bytesRequiredForCodeLength(numberOfThreeBitSectionsInCode(_rootOctalCode));
memcpy(destinationBuffer, &bytes, sizeof(bytes));
destinationBuffer += sizeof(bytes);
memcpy(destinationBuffer, _rootOctalCode, bytes);
destinationBuffer += bytes;
// if and only if there's a root jurisdiction, also include the end nodes
int endNodeCount = _endNodes.size();
memcpy(destinationBuffer, &endNodeCount, sizeof(endNodeCount));
destinationBuffer += sizeof(endNodeCount);
for (int i=0; i < endNodeCount; i++) {
unsigned char* endNodeCode = _endNodes[i];
int bytes = 0;
if (endNodeCode) {
bytes = bytesRequiredForCodeLength(numberOfThreeBitSectionsInCode(endNodeCode));
}
memcpy(destinationBuffer, &bytes, sizeof(bytes));
destinationBuffer += sizeof(bytes);
memcpy(destinationBuffer, endNodeCode, bytes);
destinationBuffer += bytes;
}
} else {
int bytes = 0;
memcpy(destinationBuffer, &bytes, sizeof(bytes));
destinationBuffer += sizeof(bytes);
}
return destinationBuffer - bufferStart; // includes header!
}
int JurisdictionMap::unpackFromMessage(unsigned char* sourceBuffer, int availableBytes) {
clear();
unsigned char* startPosition = sourceBuffer;
// increment to push past the packet header
int numBytesPacketHeader = numBytesForPacketHeader(sourceBuffer);
sourceBuffer += numBytesPacketHeader;
// read the root jurisdiction
int bytes = 0;
memcpy(&bytes, sourceBuffer, sizeof(bytes));
sourceBuffer += sizeof(bytes);
if (bytes > 0) {
_rootOctalCode = new unsigned char[bytes];
memcpy(_rootOctalCode, sourceBuffer, bytes);
sourceBuffer += bytes;
// if and only if there's a root jurisdiction, also include the end nodes
int endNodeCount = 0;
memcpy(&endNodeCount, sourceBuffer, sizeof(endNodeCount));
sourceBuffer += sizeof(endNodeCount);
for (int i=0; i < endNodeCount; i++) {
int bytes = 0;
memcpy(&bytes, sourceBuffer, sizeof(bytes));
sourceBuffer += sizeof(bytes);
unsigned char* endNodeCode = new unsigned char[bytes];
memcpy(endNodeCode, sourceBuffer, bytes);
sourceBuffer += bytes;
// if the endNodeCode was 0 length then don't add it
if (bytes > 0) {
_endNodes.push_back(endNodeCode);
}
}
}
return sourceBuffer - startPosition; // includes header!
}

View file

@ -9,6 +9,8 @@
#ifndef __hifi__JurisdictionMap__
#define __hifi__JurisdictionMap__
#include <map>
#include <stdint.h>
#include <vector>
#include <QtCore/QString>
@ -49,6 +51,14 @@ public:
int getEndNodeCount() const { return _endNodes.size(); }
void copyContents(unsigned char* rootCodeIn, const std::vector<unsigned char*>& endNodesIn);
int unpackFromMessage(unsigned char* sourceBuffer, int availableBytes);
int packIntoMessage(unsigned char* destinationBuffer, int availableBytes);
/// Available to pack an empty or unknown jurisdiction into a network packet, used when no JurisdictionMap is available
static int packEmptyJurisdictionIntoMessage(unsigned char* destinationBuffer, int availableBytes);
void displayDebugDetails();
private:
void copyContents(const JurisdictionMap& other); // use assignment instead
@ -59,6 +69,11 @@ private:
std::vector<unsigned char*> _endNodes;
};
/// Map between node IDs and their reported JurisdictionMap. Typically used by classes that need to know which nodes are
/// managing which jurisdictions.
typedef std::map<uint16_t, JurisdictionMap> NodeToJurisdictionMap;
#endif /* defined(__hifi__JurisdictionMap__) */

View file

@ -0,0 +1,76 @@
//
// JurisdictionSender.cpp
// shared
//
// Created by Brad Hefta-Gaub on 8/12/13.
// Copyright (c) 2013 High Fidelity, Inc. All rights reserved.
//
// Threaded or non-threaded jurisdiction Sender for the Application
//
#include <PerfStat.h>
#include <OctalCode.h>
#include <SharedUtil.h>
#include <PacketHeaders.h>
#include "JurisdictionSender.h"
JurisdictionSender::JurisdictionSender(JurisdictionMap* map, PacketSenderNotify* notify) :
PacketSender(notify, JurisdictionSender::DEFAULT_PACKETS_PER_SECOND),
ReceivedPacketProcessor(),
_jurisdictionMap(map)
{
}
void JurisdictionSender::processPacket(sockaddr& senderAddress, unsigned char* packetData, ssize_t packetLength) {
if (packetData[0] == PACKET_TYPE_VOXEL_JURISDICTION_REQUEST) {
Node* node = NodeList::getInstance()->nodeWithAddress(&senderAddress);
if (node) {
uint16_t nodeID = node->getNodeID();
lock();
_nodesRequestingJurisdictions.insert(nodeID);
unlock();
}
}
}
bool JurisdictionSender::process() {
bool continueProcessing = isStillRunning();
// call our ReceivedPacketProcessor base class process so we'll get any pending packets
if (continueProcessing && (continueProcessing = ReceivedPacketProcessor::process())) {
// add our packet to our own queue, then let the PacketSender class do the rest of the work.
static unsigned char buffer[MAX_PACKET_SIZE];
unsigned char* bufferOut = &buffer[0];
ssize_t sizeOut = 0;
if (_jurisdictionMap) {
sizeOut = _jurisdictionMap->packIntoMessage(bufferOut, MAX_PACKET_SIZE);
} else {
sizeOut = JurisdictionMap::packEmptyJurisdictionIntoMessage(bufferOut, MAX_PACKET_SIZE);
}
int nodeCount = 0;
for (std::set<uint16_t>::iterator nodeIterator = _nodesRequestingJurisdictions.begin();
nodeIterator != _nodesRequestingJurisdictions.end(); nodeIterator++) {
uint16_t nodeID = *nodeIterator;
Node* node = NodeList::getInstance()->nodeWithID(nodeID);
if (node->getActiveSocket() != NULL) {
sockaddr* nodeAddress = node->getActiveSocket();
queuePacketForSending(*nodeAddress, bufferOut, sizeOut);
nodeCount++;
// remove it from the set
_nodesRequestingJurisdictions.erase(nodeIterator);
}
}
// set our packets per second to be the number of nodes
setPacketsPerSecond(nodeCount);
continueProcessing = PacketSender::process();
}
return continueProcessing;
}

View file

@ -0,0 +1,40 @@
//
// JurisdictionSender.h
// shared
//
// Created by Brad Hefta-Gaub on 8/12/13.
// Copyright (c) 2013 High Fidelity, Inc. All rights reserved.
//
// Voxel Packet Sender
//
#ifndef __shared__JurisdictionSender__
#define __shared__JurisdictionSender__
#include <set>
#include <PacketSender.h>
#include <ReceivedPacketProcessor.h>
#include "JurisdictionMap.h"
/// Will process PACKET_TYPE_VOXEL_JURISDICTION_REQUEST packets and send out PACKET_TYPE_VOXEL_JURISDICTION packets
/// to requesting parties. As with other ReceivedPacketProcessor classes the user is responsible for reading inbound packets
/// and adding them to the processing queue by calling queueReceivedPacket()
class JurisdictionSender : public PacketSender, public ReceivedPacketProcessor {
public:
static const int DEFAULT_PACKETS_PER_SECOND = 1;
JurisdictionSender(JurisdictionMap* map, PacketSenderNotify* notify = NULL);
void setJurisdiction(JurisdictionMap* map) { _jurisdictionMap = map; }
virtual bool process();
protected:
virtual void processPacket(sockaddr& senderAddress, unsigned char* packetData, ssize_t packetLength);
private:
JurisdictionMap* _jurisdictionMap;
std::set<uint16_t> _nodesRequestingJurisdictions;
};
#endif // __shared__JurisdictionSender__

View file

@ -10,33 +10,30 @@
#include <PerfStat.h>
#include "Application.h"
#include "Menu.h"
#include <OctalCode.h>
#include <PacketHeaders.h>
#include "VoxelEditPacketSender.h"
VoxelEditPacketSender::VoxelEditPacketSender(Application* app) :
_app(app)
{
VoxelEditPacketSender::VoxelEditPacketSender(PacketSenderNotify* notify) :
PacketSender(notify),
_shouldSend(true),
_voxelServerJurisdictions(NULL) {
}
void VoxelEditPacketSender::sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail& detail) {
// if the app has Voxels disabled, we don't do any of this...
if (!Menu::getInstance()->isOptionChecked(MenuOption::Voxels)) {
// allows app to disable sending if for example voxels have been disabled
if (!_shouldSend) {
return; // bail early
}
unsigned char* bufferOut;
int sizeOut;
int totalBytesSent = 0;
if (createVoxelEditMessage(type, 0, 1, &detail, bufferOut, sizeOut)){
actuallySendMessage(UNKNOWN_NODE_ID, bufferOut, sizeOut); // sends to all servers... not ideal!
delete[] bufferOut;
}
// Tell the application's bandwidth meters about what we've sent
_app->_bandwidthMeter.outputStream(BandwidthMeter::VOXELS).updateValue(totalBytesSent);
}
void VoxelEditPacketSender::actuallySendMessage(uint16_t nodeID, unsigned char* bufferOut, ssize_t sizeOut) {
@ -46,12 +43,32 @@ void VoxelEditPacketSender::actuallySendMessage(uint16_t nodeID, unsigned char*
if (node->getActiveSocket() != NULL && node->getType() == NODE_TYPE_VOXEL_SERVER &&
((node->getNodeID() == nodeID) || (nodeID == (uint16_t)UNKNOWN_NODE_ID)) ) {
sockaddr* nodeAddress = node->getActiveSocket();
queuePacket(*nodeAddress, bufferOut, sizeOut);
queuePacketForSending(*nodeAddress, bufferOut, sizeOut);
}
}
}
void VoxelEditPacketSender::queueVoxelEditMessages(PACKET_TYPE type, int numberOfDetails, VoxelDetail* details) {
if (!_shouldSend) {
return; // bail early
}
for (int i = 0; i < numberOfDetails; i++) {
static unsigned char bufferOut[MAX_PACKET_SIZE];
int sizeOut = 0;
if (encodeVoxelEditMessageDetails(type, 1, &details[i], &bufferOut[0], MAX_PACKET_SIZE, sizeOut)) {
queueVoxelEditMessage(type, bufferOut, sizeOut);
}
}
}
void VoxelEditPacketSender::queueVoxelEditMessage(PACKET_TYPE type, unsigned char* codeColorBuffer, ssize_t length) {
if (!_shouldSend) {
return; // bail early
}
// We want to filter out edit messages for voxel servers based on the server's Jurisdiction
// But we can't really do that with a packed message, since each edit message could be destined
// for a different voxel server... So we need to actually manage multiple queued packets... one
@ -60,12 +77,16 @@ void VoxelEditPacketSender::queueVoxelEditMessage(PACKET_TYPE type, unsigned cha
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
// only send to the NodeTypes that are NODE_TYPE_VOXEL_SERVER
if (node->getActiveSocket() != NULL && node->getType() == NODE_TYPE_VOXEL_SERVER) {
// we need to get the jurisdiction for this
// here we need to get the "pending packet" for this server
uint16_t nodeID = node->getNodeID();
const JurisdictionMap& map = _app->_voxelServerJurisdictions[nodeID];
if (map.isMyJurisdiction(codeColorBuffer, CHECK_NODE_ONLY) == JurisdictionMap::WITHIN) {
bool isMyJurisdiction = true;
if (_voxelServerJurisdictions) {
// we need to get the jurisdiction for this
// here we need to get the "pending packet" for this server
const JurisdictionMap& map = (*_voxelServerJurisdictions)[nodeID];
isMyJurisdiction = (map.isMyJurisdiction(codeColorBuffer, CHECK_NODE_ONLY) == JurisdictionMap::WITHIN);
}
if (isMyJurisdiction) {
EditPacketBuffer& packetBuffer = _pendingEditPackets[nodeID];
packetBuffer._nodeID = nodeID;

View file

@ -1,6 +1,6 @@
//
// VoxelEditPacketSender.h
// interface
// shared
//
// Created by Brad Hefta-Gaub on 8/12/13.
// Copyright (c) 2013 High Fidelity, Inc. All rights reserved.
@ -13,8 +13,7 @@
#include <PacketSender.h>
#include <SharedUtil.h> // for VoxelDetail
class Application;
#include "JurisdictionMap.h"
/// Used for construction of edit voxel packets
class EditPacketBuffer {
@ -29,7 +28,7 @@ public:
/// Threaded processor for queueing and sending of outbound edit voxel packets.
class VoxelEditPacketSender : public PacketSender {
public:
VoxelEditPacketSender(Application* app);
VoxelEditPacketSender(PacketSenderNotify* notify = NULL);
/// Send voxel edit message immediately
void sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail& detail);
@ -38,15 +37,28 @@ public:
/// node or nodes the packet should be sent to.
void queueVoxelEditMessage(PACKET_TYPE type, unsigned char* codeColorBuffer, ssize_t length);
/// Queues an array of several voxel edit messages. Will potentially send a pending multi-command packet. Determines
/// which voxel-server node or nodes the packet should be sent to.
void queueVoxelEditMessages(PACKET_TYPE type, int numberOfDetails, VoxelDetail* details);
/// flushes all queued packets for all nodes
void flushQueue();
bool getShouldSend() const { return _shouldSend; }
void setShouldSend(bool shouldSend) { _shouldSend = shouldSend; }
void setVoxelServerJurisdictions(NodeToJurisdictionMap* voxelServerJurisdictions) {
_voxelServerJurisdictions = voxelServerJurisdictions;
}
private:
bool _shouldSend;
void actuallySendMessage(uint16_t nodeID, unsigned char* bufferOut, ssize_t sizeOut);
void initializePacket(EditPacketBuffer& packetBuffer, PACKET_TYPE type);
void flushQueue(EditPacketBuffer& packetBuffer); // flushes specific queued packet
Application* _app;
std::map<uint16_t,EditPacketBuffer> _pendingEditPackets;
NodeToJurisdictionMap* _voxelServerJurisdictions;
};
#endif // __shared__VoxelEditPacketSender__

View file

@ -543,7 +543,7 @@ VoxelSceneStats::ItemInfo VoxelSceneStats::_ITEMS[] = {
{ "Mode" , greenish },
};
char* VoxelSceneStats::getItemValue(int item) {
char* VoxelSceneStats::getItemValue(Item item) {
const uint64_t USECS_PER_SECOND = 1000 * 1000;
int calcFPS, calcAverageFPS, calculatedKBPS;
switch(item) {

View file

@ -16,42 +16,83 @@
class VoxelNode;
/// Collects statistics for calculating and sending a scene from a voxel server to an interface client
class VoxelSceneStats {
public:
VoxelSceneStats();
~VoxelSceneStats();
void reset();
/// Call when beginning the computation of a scene. Initializes internal structures
void sceneStarted(bool fullScene, bool moving, VoxelNode* root, JurisdictionMap* jurisdictionMap);
/// Call when the computation of a scene is completed. Finalizes internal structures
void sceneCompleted();
void printDebugDetails();
/// Track that a packet was sent as part of the scene.
void packetSent(int bytes);
/// Tracks the beginning of an encode pass during scene calculation.
void encodeStarted();
/// Tracks the ending of an encode pass during scene calculation.
void encodeStopped();
/// Track that a node was traversed as part of computation of a scene.
void traversed(const VoxelNode* node);
/// Track that a node was skipped as part of computation of a scene due to being beyond the LOD distance.
void skippedDistance(const VoxelNode* node);
/// Track that a node was skipped as part of computation of a scene due to being out of view.
void skippedOutOfView(const VoxelNode* node);
/// Track that a node was skipped as part of computation of a scene due to previously being in view while in delta sending
void skippedWasInView(const VoxelNode* node);
/// Track that a node was skipped as part of computation of a scene due to not having changed since last full scene sent
void skippedNoChange(const VoxelNode* node);
/// Track that a node was skipped as part of computation of a scene due to being occluded
void skippedOccluded(const VoxelNode* node);
/// Track that a node's color was was sent as part of computation of a scene
void colorSent(const VoxelNode* node);
/// Track that a node was due to be sent, but didn't fit in the packet and was moved to next packet
void didntFit(const VoxelNode* node);
/// Track that the color bitmask was was sent as part of computation of a scene
void colorBitsWritten();
/// Track that the exists in tree bitmask was was sent as part of computation of a scene
void existsBitsWritten();
/// Track that the exists in packet bitmask was was sent as part of computation of a scene
void existsInPacketBitsWritten();
/// Fix up tracking statistics in case where bitmasks were removed for some reason
void childBitsRemoved(bool includesExistsBits, bool includesColors);
/// Pack the details of the statistics into a buffer for sending as a network packet
int packIntoMessage(unsigned char* destinationBuffer, int availableBytes);
/// Unpack the details of the statistics from a buffer typically received as a network packet
int unpackFromMessage(unsigned char* sourceBuffer, int availableBytes);
/// Indicates that a scene has been completed and the statistics are ready to be sent
bool isReadyToSend() const { return _isReadyToSend; }
/// Mark that the scene statistics have been sent
void markAsSent() { _isReadyToSend = false; }
unsigned char* getStatsMessage() { return &_statsMessage[0]; }
int getStatsMessageLength() const { return _statsMessageLength; }
enum {
/// List of various items tracked by VoxelSceneStats which can be accessed via getItemInfo() and getItemValue()
enum Item {
ITEM_ELAPSED,
ITEM_ENCODE,
ITEM_PACKETS,
@ -71,16 +112,24 @@ public:
ITEM_COUNT
};
// Meta information about each stats item
/// Meta information about each stats item
struct ItemInfo {
char const* const caption;
unsigned colorRGBA;
};
ItemInfo& getItemInfo(int item) { return _ITEMS[item]; };
char* getItemValue(int item);
/// Returns details about items tracked by VoxelSceneStats
/// \param Item item The item from the stats you're interested in.
ItemInfo& getItemInfo(Item item) { return _ITEMS[item]; };
/// Returns a UI formatted value of an item tracked by VoxelSceneStats
/// \param Item item The item from the stats you're interested in.
char* getItemValue(Item item);
/// Returns OctCode for root node of the jurisdiction of this particular voxel server
unsigned char* getJurisdictionRoot() const { return _jurisdictionRoot; }
/// Returns list of OctCodes for end nodes of the jurisdiction of this particular voxel server
const std::vector<unsigned char*>& getJurisdictionEndNodes() const { return _jurisdictionEndNodes; }
private:

View file

@ -21,6 +21,8 @@
#include <SceneUtils.h>
#include <PerfStat.h>
#include <JurisdictionSender.h>
#ifdef _WIN32
#include "Syssocket.h"
#include "Systime.h"
@ -73,6 +75,7 @@ EnvironmentData environmentData[3];
int receivedPacketCount = 0;
JurisdictionMap* jurisdiction = NULL;
JurisdictionSender* jurisdictionSender = NULL;
void randomlyFillVoxelTree(int levelsToGo, VoxelNode *currentRootNode) {
// randomly generate children for this node
@ -400,7 +403,7 @@ void persistVoxelsWhenDirty() {
}
}
void *distributeVoxelsToListeners(void *args) {
void* distributeVoxelsToListeners(void* args) {
NodeList* nodeList = NodeList::getInstance();
timeval lastSendTime;
@ -408,7 +411,6 @@ void *distributeVoxelsToListeners(void *args) {
while (true) {
gettimeofday(&lastSendTime, NULL);
// enumerate the nodes to send 3 packets to each
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
VoxelNodeData* nodeData = (VoxelNodeData*) node->getLinkedData();
@ -482,11 +484,10 @@ int main(int argc, const char * argv[]) {
}
if (jurisdictionRoot || jurisdictionEndNodes) {
jurisdiction = new JurisdictionMap(jurisdictionRoot, jurisdictionEndNodes);
::jurisdiction = new JurisdictionMap(jurisdictionRoot, jurisdictionEndNodes);
}
}
// should we send environments? Default is yes, but this command line suppresses sending
const char* DUMP_VOXELS_ON_MOVE = "--dumpVoxelsOnMove";
::dumpVoxelsOnMove = cmdOptionExists(argc, argv, DUMP_VOXELS_ON_MOVE);
@ -509,6 +510,9 @@ int main(int argc, const char * argv[]) {
NodeList* nodeList = NodeList::createInstance(NODE_TYPE_VOXEL_SERVER, listenPort);
setvbuf(stdout, NULL, _IOLBF, 0);
const NODE_TYPE nodeTypes[] = { NODE_TYPE_AGENT, NODE_TYPE_ANIMATION_SERVER };
NodeList::getInstance()->setNodeTypesOfInterest(&nodeTypes[0], sizeof(nodeTypes));
// Handle Local Domain testing with the --local command line
const char* local = "--local";
::wantLocalDomain = cmdOptionExists(argc, argv,local);
@ -655,8 +659,13 @@ int main(int argc, const char * argv[]) {
timeval lastDomainServerCheckIn = {};
// set up our jurisdiction broadcaster...
::jurisdictionSender = new JurisdictionSender(::jurisdiction);
if (::jurisdictionSender) {
::jurisdictionSender->initialize(true);
}
// loop to send to nodes requesting data
while (true) {
// send a check in packet to the domain server if DOMAIN_SERVER_CHECK_IN_USECS has elapsed
@ -735,12 +744,25 @@ int main(int argc, const char * argv[]) {
voxelData += voxelDataSize;
atByte += voxelDataSize;
}
// Make sure our Node and NodeList knows we've heard from this node.
Node* node = NodeList::getInstance()->nodeWithAddress(&nodePublicAddress);
if (node) {
node->setLastHeardMicrostamp(usecTimestampNow());
}
} else if (packetData[0] == PACKET_TYPE_ERASE_VOXEL) {
// Send these bits off to the VoxelTree class to process them
pthread_mutex_lock(&::treeLock);
serverTree.processRemoveVoxelBitstream((unsigned char*)packetData, receivedBytes);
pthread_mutex_unlock(&::treeLock);
// Make sure our Node and NodeList knows we've heard from this node.
Node* node = NodeList::getInstance()->nodeWithAddress(&nodePublicAddress);
if (node) {
node->setLastHeardMicrostamp(usecTimestampNow());
}
} else if (packetData[0] == PACKET_TYPE_Z_COMMAND) {
// the Z command is a special command that allows the sender to send the voxel server high level semantic
@ -774,6 +796,12 @@ int main(int argc, const char * argv[]) {
printf("rebroadcasting Z message to connected nodes... nodeList.broadcastToNodes()\n");
nodeList->broadcastToNodes(packetData, receivedBytes, &NODE_TYPE_AGENT, 1);
}
// Make sure our Node and NodeList knows we've heard from this node.
Node* node = NodeList::getInstance()->nodeWithAddress(&nodePublicAddress);
if (node) {
node->setLastHeardMicrostamp(usecTimestampNow());
}
} else if (packetData[0] == PACKET_TYPE_HEAD_DATA) {
// If we got a PACKET_TYPE_HEAD_DATA, then we're talking to an NODE_TYPE_AVATAR, and we
// need to make sure we have it in our nodeList.
@ -789,6 +817,14 @@ int main(int argc, const char * argv[]) {
} else if (packetData[0] == PACKET_TYPE_PING) {
// If the packet is a ping, let processNodeData handle it.
nodeList->processNodeData(&nodePublicAddress, packetData, receivedBytes);
} else if (packetData[0] == PACKET_TYPE_DOMAIN) {
nodeList->processNodeData(&nodePublicAddress, packetData, receivedBytes);
} else if (packetData[0] == PACKET_TYPE_VOXEL_JURISDICTION_REQUEST) {
if (::jurisdictionSender) {
jurisdictionSender->queueReceivedPacket(nodePublicAddress, packetData, receivedBytes);
}
} else {
printf("unknown packet ignored... packetData[0]=%c\n", packetData[0]);
}
}
}
@ -796,8 +832,13 @@ int main(int argc, const char * argv[]) {
pthread_join(sendVoxelThread, NULL);
pthread_mutex_destroy(&::treeLock);
if (jurisdiction) {
delete jurisdiction;
if (::jurisdiction) {
delete ::jurisdiction;
}
if (::jurisdictionSender) {
jurisdictionSender->terminate();
delete ::jurisdictionSender;
}
return 0;