mirror of
https://github.com/HifiExperiments/overte.git
synced 2025-06-15 19:48:47 +02:00
Merge pull request #1011 from ZappoMan/voxeleditsender_improvements
VoxelEditPacketSender class improvements
This commit is contained in:
commit
e9a21e284a
8 changed files with 336 additions and 62 deletions
|
@ -6,6 +6,7 @@
|
||||||
// Copyright (c) 2012 High Fidelity, Inc. All rights reserved.
|
// Copyright (c) 2012 High Fidelity, Inc. All rights reserved.
|
||||||
//
|
//
|
||||||
|
|
||||||
|
#include <algorithm>
|
||||||
#include <cmath>
|
#include <cmath>
|
||||||
#include <cstdlib>
|
#include <cstdlib>
|
||||||
#include <cstring>
|
#include <cstring>
|
||||||
|
@ -39,12 +40,14 @@ bool includeMovingBug = true;
|
||||||
bool includeBlinkingVoxel = false;
|
bool includeBlinkingVoxel = false;
|
||||||
bool includeDanceFloor = true;
|
bool includeDanceFloor = true;
|
||||||
bool buildStreet = false;
|
bool buildStreet = false;
|
||||||
|
bool nonThreadedPacketSender = false;
|
||||||
|
|
||||||
|
|
||||||
const int ANIMATION_LISTEN_PORT = 40107;
|
const int ANIMATION_LISTEN_PORT = 40107;
|
||||||
const int ACTUAL_FPS = 60;
|
const int ACTUAL_FPS = 60;
|
||||||
const double OUR_FPS_IN_MILLISECONDS = 1000.0/ACTUAL_FPS; // determines FPS from our desired FPS
|
const double OUR_FPS_IN_MILLISECONDS = 1000.0/ACTUAL_FPS; // determines FPS from our desired FPS
|
||||||
const int ANIMATE_VOXELS_INTERVAL_USECS = OUR_FPS_IN_MILLISECONDS * 1000.0; // converts from milliseconds to usecs
|
const int FUDGE_USECS = 10; // a little bit of fudge to actually do some processing
|
||||||
|
const int ANIMATE_VOXELS_INTERVAL_USECS = (OUR_FPS_IN_MILLISECONDS * 1000.0) - FUDGE_USECS; // converts from milliseconds to usecs
|
||||||
|
|
||||||
bool wantLocalDomain = false;
|
bool wantLocalDomain = false;
|
||||||
|
|
||||||
|
@ -618,7 +621,10 @@ void* animateVoxels(void* args) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (::voxelEditPacketSender) {
|
if (::voxelEditPacketSender) {
|
||||||
::voxelEditPacketSender->flushQueue();
|
::voxelEditPacketSender->releaseQueuedMessages();
|
||||||
|
if (::nonThreadedPacketSender) {
|
||||||
|
::voxelEditPacketSender->process();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t end = usecTimestampNow();
|
uint64_t end = usecTimestampNow();
|
||||||
|
@ -629,6 +635,9 @@ void* animateVoxels(void* args) {
|
||||||
}
|
}
|
||||||
// dynamically sleep until we need to fire off the next set of voxels
|
// dynamically sleep until we need to fire off the next set of voxels
|
||||||
uint64_t usecToSleep = ANIMATE_VOXELS_INTERVAL_USECS - (usecTimestampNow() - usecTimestamp(&lastSendTime));
|
uint64_t usecToSleep = ANIMATE_VOXELS_INTERVAL_USECS - (usecTimestampNow() - usecTimestamp(&lastSendTime));
|
||||||
|
if (usecToSleep > ANIMATE_VOXELS_INTERVAL_USECS) {
|
||||||
|
usecToSleep = ANIMATE_VOXELS_INTERVAL_USECS;
|
||||||
|
}
|
||||||
|
|
||||||
if (usecToSleep > 0) {
|
if (usecToSleep > 0) {
|
||||||
usleep(usecToSleep);
|
usleep(usecToSleep);
|
||||||
|
@ -648,6 +657,11 @@ int main(int argc, const char * argv[])
|
||||||
NodeList* nodeList = NodeList::createInstance(NODE_TYPE_ANIMATION_SERVER, ANIMATION_LISTEN_PORT);
|
NodeList* nodeList = NodeList::createInstance(NODE_TYPE_ANIMATION_SERVER, ANIMATION_LISTEN_PORT);
|
||||||
setvbuf(stdout, NULL, _IOLBF, 0);
|
setvbuf(stdout, NULL, _IOLBF, 0);
|
||||||
|
|
||||||
|
// Handle Local Domain testing with the --local command line
|
||||||
|
const char* NON_THREADED_PACKETSENDER = "--NonThreadedPacketSender";
|
||||||
|
::nonThreadedPacketSender = cmdOptionExists(argc, argv, NON_THREADED_PACKETSENDER);
|
||||||
|
printf("nonThreadedPacketSender=%s\n", debug::valueOf(::nonThreadedPacketSender));
|
||||||
|
|
||||||
// Handle Local Domain testing with the --local command line
|
// Handle Local Domain testing with the --local command line
|
||||||
const char* NO_BILLBOARD = "--NoBillboard";
|
const char* NO_BILLBOARD = "--NoBillboard";
|
||||||
::includeBillboard = !cmdOptionExists(argc, argv, NO_BILLBOARD);
|
::includeBillboard = !cmdOptionExists(argc, argv, NO_BILLBOARD);
|
||||||
|
@ -702,10 +716,13 @@ int main(int argc, const char * argv[])
|
||||||
// Create out VoxelEditPacketSender
|
// Create out VoxelEditPacketSender
|
||||||
::voxelEditPacketSender = new VoxelEditPacketSender;
|
::voxelEditPacketSender = new VoxelEditPacketSender;
|
||||||
if (::voxelEditPacketSender) {
|
if (::voxelEditPacketSender) {
|
||||||
::voxelEditPacketSender->initialize(true);
|
::voxelEditPacketSender->initialize(!::nonThreadedPacketSender);
|
||||||
if (::jurisdictionListener) {
|
if (::jurisdictionListener) {
|
||||||
::voxelEditPacketSender->setVoxelServerJurisdictions(::jurisdictionListener->getJurisdictions());
|
::voxelEditPacketSender->setVoxelServerJurisdictions(::jurisdictionListener->getJurisdictions());
|
||||||
}
|
}
|
||||||
|
if (::nonThreadedPacketSender) {
|
||||||
|
::voxelEditPacketSender->setProcessCallIntervalHint(ANIMATE_VOXELS_INTERVAL_USECS);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
srand((unsigned)time(0));
|
srand((unsigned)time(0));
|
||||||
|
|
|
@ -94,6 +94,9 @@ void Agent::run() {
|
||||||
engine.globalObject().setProperty("TREE_SCALE", treeScaleValue);
|
engine.globalObject().setProperty("TREE_SCALE", treeScaleValue);
|
||||||
|
|
||||||
const long long VISUAL_DATA_SEND_INTERVAL_USECS = (1 / 60.0f) * 1000 * 1000;
|
const long long VISUAL_DATA_SEND_INTERVAL_USECS = (1 / 60.0f) * 1000 * 1000;
|
||||||
|
|
||||||
|
// let the VoxelPacketSender know how frequently we plan to call it
|
||||||
|
voxelScripter.getVoxelPacketSender()->setProcessCallIntervalHint(VISUAL_DATA_SEND_INTERVAL_USECS);
|
||||||
|
|
||||||
QScriptValue visualSendIntervalValue = engine.newVariant((QVariant(VISUAL_DATA_SEND_INTERVAL_USECS / 1000)));
|
QScriptValue visualSendIntervalValue = engine.newVariant((QVariant(VISUAL_DATA_SEND_INTERVAL_USECS / 1000)));
|
||||||
engine.globalObject().setProperty("VISUAL_DATA_SEND_INTERVAL_MS", visualSendIntervalValue);
|
engine.globalObject().setProperty("VISUAL_DATA_SEND_INTERVAL_MS", visualSendIntervalValue);
|
||||||
|
@ -149,9 +152,11 @@ void Agent::run() {
|
||||||
qDebug() << "Uncaught exception at line" << line << ":" << engine.uncaughtException().toString() << "\n";
|
qDebug() << "Uncaught exception at line" << line << ":" << engine.uncaughtException().toString() << "\n";
|
||||||
}
|
}
|
||||||
|
|
||||||
// flush the queue of packets and then process them so they are all sent off
|
// release the queue of edit voxel messages.
|
||||||
voxelScripter.getVoxelPacketSender()->flushQueue();
|
voxelScripter.getVoxelPacketSender()->releaseQueuedMessages();
|
||||||
voxelScripter.getVoxelPacketSender()->processWithoutSleep();
|
|
||||||
|
// since we're in non-threaded mode, call process so that the packets are sent
|
||||||
|
voxelScripter.getVoxelPacketSender()->process();
|
||||||
}
|
}
|
||||||
|
|
||||||
while (NodeList::getInstance()->getNodeSocket()->receive((sockaddr*) &senderAddress, receivedData, &receivedBytes)) {
|
while (NodeList::getInstance()->getNodeSocket()->receive((sockaddr*) &senderAddress, receivedData, &receivedBytes)) {
|
||||||
|
|
|
@ -1440,7 +1440,7 @@ void Application::pasteVoxels() {
|
||||||
_sharedVoxelSystem.changeTree(&_clipboard);
|
_sharedVoxelSystem.changeTree(&_clipboard);
|
||||||
}
|
}
|
||||||
|
|
||||||
_voxelEditSender.flushQueue();
|
_voxelEditSender.releaseQueuedMessages();
|
||||||
|
|
||||||
if (calculatedOctCode) {
|
if (calculatedOctCode) {
|
||||||
delete[] calculatedOctCode;
|
delete[] calculatedOctCode;
|
||||||
|
@ -3354,6 +3354,10 @@ void Application::deleteVoxelUnderCursor() {
|
||||||
if (_mouseVoxel.s != 0) {
|
if (_mouseVoxel.s != 0) {
|
||||||
// sending delete to the server is sufficient, server will send new version so we see updates soon enough
|
// sending delete to the server is sufficient, server will send new version so we see updates soon enough
|
||||||
_voxelEditSender.sendVoxelEditMessage(PACKET_TYPE_ERASE_VOXEL, _mouseVoxel);
|
_voxelEditSender.sendVoxelEditMessage(PACKET_TYPE_ERASE_VOXEL, _mouseVoxel);
|
||||||
|
|
||||||
|
// delete it locally to see the effect immediately (and in case no voxel server is present)
|
||||||
|
_voxels.deleteVoxelAt(_mouseVoxel.x, _mouseVoxel.y, _mouseVoxel.z, _mouseVoxel.s);
|
||||||
|
|
||||||
AudioInjector* voxelInjector = AudioInjectionManager::injectorWithCapacity(5000);
|
AudioInjector* voxelInjector = AudioInjectionManager::injectorWithCapacity(5000);
|
||||||
|
|
||||||
if (voxelInjector) {
|
if (voxelInjector) {
|
||||||
|
|
|
@ -42,13 +42,15 @@ protected:
|
||||||
void unlock() { pthread_mutex_unlock(&_mutex); }
|
void unlock() { pthread_mutex_unlock(&_mutex); }
|
||||||
|
|
||||||
bool isStillRunning() const { return !_stopThread; }
|
bool isStillRunning() const { return !_stopThread; }
|
||||||
|
|
||||||
|
bool isThreaded() const { return _isThreaded; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
pthread_mutex_t _mutex;
|
pthread_mutex_t _mutex;
|
||||||
|
|
||||||
bool _stopThread;
|
bool _stopThread;
|
||||||
bool _isThreaded;
|
bool _isThreaded;
|
||||||
pthread_t _thread;
|
pthread_t _thread;
|
||||||
};
|
};
|
||||||
|
|
||||||
extern "C" void* GenericThreadEntry(void* arg);
|
extern "C" void* GenericThreadEntry(void* arg);
|
||||||
|
|
|
@ -8,6 +8,7 @@
|
||||||
// Threaded or non-threaded packet sender.
|
// Threaded or non-threaded packet sender.
|
||||||
//
|
//
|
||||||
|
|
||||||
|
#include <math.h>
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
|
|
||||||
#include "NodeList.h"
|
#include "NodeList.h"
|
||||||
|
@ -17,9 +18,13 @@
|
||||||
const int PacketSender::DEFAULT_PACKETS_PER_SECOND = 200;
|
const int PacketSender::DEFAULT_PACKETS_PER_SECOND = 200;
|
||||||
const int PacketSender::MINIMUM_PACKETS_PER_SECOND = 1;
|
const int PacketSender::MINIMUM_PACKETS_PER_SECOND = 1;
|
||||||
|
|
||||||
|
const int AVERAGE_CALL_TIME_SAMPLES = 10;
|
||||||
|
|
||||||
PacketSender::PacketSender(PacketSenderNotify* notify, int packetsPerSecond) :
|
PacketSender::PacketSender(PacketSenderNotify* notify, int packetsPerSecond) :
|
||||||
_packetsPerSecond(packetsPerSecond),
|
_packetsPerSecond(packetsPerSecond),
|
||||||
|
_usecsPerProcessCallHint(0),
|
||||||
|
_lastProcessCallTime(usecTimestampNow()),
|
||||||
|
_averageProcessCallTime(AVERAGE_CALL_TIME_SAMPLES),
|
||||||
_lastSendTime(usecTimestampNow()),
|
_lastSendTime(usecTimestampNow()),
|
||||||
_notify(notify)
|
_notify(notify)
|
||||||
{
|
{
|
||||||
|
@ -37,16 +42,64 @@ bool PacketSender::process() {
|
||||||
uint64_t USECS_PER_SECOND = 1000 * 1000;
|
uint64_t USECS_PER_SECOND = 1000 * 1000;
|
||||||
uint64_t SEND_INTERVAL_USECS = (_packetsPerSecond == 0) ? USECS_PER_SECOND : (USECS_PER_SECOND / _packetsPerSecond);
|
uint64_t SEND_INTERVAL_USECS = (_packetsPerSecond == 0) ? USECS_PER_SECOND : (USECS_PER_SECOND / _packetsPerSecond);
|
||||||
|
|
||||||
|
// keep track of our process call times, so we have a reliable account of how often our caller calls us
|
||||||
|
uint64_t now = usecTimestampNow();
|
||||||
|
uint64_t elapsedSinceLastCall = now - _lastProcessCallTime;
|
||||||
|
_lastProcessCallTime = now;
|
||||||
|
_averageProcessCallTime.updateAverage(elapsedSinceLastCall);
|
||||||
|
|
||||||
if (_packets.size() == 0) {
|
if (_packets.size() == 0) {
|
||||||
usleep(SEND_INTERVAL_USECS);
|
if (isThreaded()) {
|
||||||
|
usleep(SEND_INTERVAL_USECS);
|
||||||
|
} else {
|
||||||
|
return isStillRunning(); // in non-threaded mode, if there's nothing to do, just return, keep running till they terminate us
|
||||||
|
}
|
||||||
}
|
}
|
||||||
while (_packets.size() > 0) {
|
|
||||||
|
int packetsPerCall = _packets.size(); // in threaded mode, we just empty this!
|
||||||
|
int packetsThisCall = 0;
|
||||||
|
|
||||||
|
// if we're in non-threaded mode, then we actually need to determine how many packets to send per call to process
|
||||||
|
// based on how often we get called... We do this by keeping a running average of our call times, and we determine
|
||||||
|
// how many packets to send per call
|
||||||
|
if (!isThreaded()) {
|
||||||
|
int averageCallTime;
|
||||||
|
const int TRUST_AVERAGE_AFTER = AVERAGE_CALL_TIME_SAMPLES * 2;
|
||||||
|
if (_usecsPerProcessCallHint == 0 || _averageProcessCallTime.getSampleCount() > TRUST_AVERAGE_AFTER) {
|
||||||
|
averageCallTime = _averageProcessCallTime.getAverage();
|
||||||
|
} else {
|
||||||
|
averageCallTime = _usecsPerProcessCallHint;
|
||||||
|
}
|
||||||
|
|
||||||
|
// we can determine how many packets we need to send per call to achieve our desired
|
||||||
|
// packets per second send rate.
|
||||||
|
int callsPerSecond = USECS_PER_SECOND / averageCallTime;
|
||||||
|
packetsPerCall = ceil(_packetsPerSecond / callsPerSecond);
|
||||||
|
|
||||||
|
// send at least one packet per call, if we have it
|
||||||
|
if (packetsPerCall < 1) {
|
||||||
|
packetsPerCall = 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool keepGoing = _packets.size() > 0;
|
||||||
|
while (keepGoing) {
|
||||||
|
|
||||||
|
// in threaded mode, we go till we're empty
|
||||||
|
if (isThreaded()) {
|
||||||
|
keepGoing = _packets.size() > 0;
|
||||||
|
} else {
|
||||||
|
// in non-threaded mode, we send as many packets as we need per expected call to process()
|
||||||
|
keepGoing = (packetsThisCall < packetsPerCall) && (_packets.size() > 0);
|
||||||
|
}
|
||||||
|
|
||||||
NetworkPacket& packet = _packets.front();
|
NetworkPacket& packet = _packets.front();
|
||||||
|
|
||||||
// send the packet through the NodeList...
|
// send the packet through the NodeList...
|
||||||
UDPSocket* nodeSocket = NodeList::getInstance()->getNodeSocket();
|
UDPSocket* nodeSocket = NodeList::getInstance()->getNodeSocket();
|
||||||
|
|
||||||
nodeSocket->send(&packet.getAddress(), packet.getData(), packet.getLength());
|
nodeSocket->send(&packet.getAddress(), packet.getData(), packet.getLength());
|
||||||
|
packetsThisCall++;
|
||||||
|
|
||||||
if (_notify) {
|
if (_notify) {
|
||||||
_notify->packetSentNotification(packet.getLength());
|
_notify->packetSentNotification(packet.getLength());
|
||||||
|
@ -56,34 +109,18 @@ bool PacketSender::process() {
|
||||||
_packets.erase(_packets.begin());
|
_packets.erase(_packets.begin());
|
||||||
unlock();
|
unlock();
|
||||||
|
|
||||||
uint64_t now = usecTimestampNow();
|
// dynamically sleep until we need to fire off the next set of voxels we only sleep in threaded mode
|
||||||
// dynamically sleep until we need to fire off the next set of voxels
|
if (isThreaded()) {
|
||||||
uint64_t elapsed = now - _lastSendTime;
|
uint64_t elapsed = now - _lastSendTime;
|
||||||
int usecToSleep = SEND_INTERVAL_USECS - elapsed;
|
int usecToSleep = std::max(SEND_INTERVAL_USECS, SEND_INTERVAL_USECS - elapsed);
|
||||||
_lastSendTime = now;
|
|
||||||
if (usecToSleep > 0) {
|
// we only sleep in non-threaded mode
|
||||||
usleep(usecToSleep);
|
if (usecToSleep > 0) {
|
||||||
|
usleep(usecToSleep);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
_lastSendTime = now;
|
||||||
}
|
}
|
||||||
|
|
||||||
return isStillRunning(); // keep running till they terminate us
|
return isStillRunning(); // keep running till they terminate us
|
||||||
}
|
}
|
||||||
|
|
||||||
void PacketSender::processWithoutSleep() {
|
|
||||||
while (_packets.size() > 0) {
|
|
||||||
NetworkPacket& packet = _packets.front();
|
|
||||||
|
|
||||||
// send the packet through the NodeList...
|
|
||||||
UDPSocket* nodeSocket = NodeList::getInstance()->getNodeSocket();
|
|
||||||
|
|
||||||
nodeSocket->send(&packet.getAddress(), packet.getData(), packet.getLength());
|
|
||||||
|
|
||||||
if (_notify) {
|
|
||||||
_notify->packetSentNotification(packet.getLength());
|
|
||||||
}
|
|
||||||
|
|
||||||
lock();
|
|
||||||
_packets.erase(_packets.begin());
|
|
||||||
unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -43,14 +43,24 @@ public:
|
||||||
PacketSenderNotify* getPacketSenderNotify() const { return _notify; }
|
PacketSenderNotify* getPacketSenderNotify() const { return _notify; }
|
||||||
|
|
||||||
virtual bool process();
|
virtual bool process();
|
||||||
virtual void processWithoutSleep();
|
|
||||||
|
/// are there packets waiting in the send queue to be sent
|
||||||
|
bool hasPacketsToSend() const { return _packets.size() > 0; }
|
||||||
|
|
||||||
|
/// how many packets are there in the send queue waiting to be sent
|
||||||
|
int packetsToSendCount() const { return _packets.size(); }
|
||||||
|
|
||||||
|
/// If you're running in non-threaded mode, call this to give us a hint as to how frequently you will call process.
|
||||||
|
/// This has no effect in threaded mode. This is only considered a hint in non-threaded mode.
|
||||||
|
/// \param int usecsPerProcessCall expected number of usecs between calls to process in non-threaded mode.
|
||||||
|
void setProcessCallIntervalHint(int usecsPerProcessCall) { _usecsPerProcessCallHint = usecsPerProcessCall; }
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
int _packetsPerSecond;
|
int _packetsPerSecond;
|
||||||
|
int _usecsPerProcessCallHint;
|
||||||
|
uint64_t _lastProcessCallTime;
|
||||||
|
SimpleMovingAverage _averageProcessCallTime;
|
||||||
|
|
||||||
bool hasPacketsToSend() const { return _packets.size() > 0; }
|
|
||||||
int packetsToSendCount() const { return _packets.size(); }
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::vector<NetworkPacket> _packets;
|
std::vector<NetworkPacket> _packets;
|
||||||
uint64_t _lastSendTime;
|
uint64_t _lastSendTime;
|
||||||
|
|
|
@ -8,6 +8,8 @@
|
||||||
// Threaded or non-threaded voxel packet Sender for the Application
|
// Threaded or non-threaded voxel packet Sender for the Application
|
||||||
//
|
//
|
||||||
|
|
||||||
|
#include <assert.h>
|
||||||
|
|
||||||
#include <PerfStat.h>
|
#include <PerfStat.h>
|
||||||
|
|
||||||
#include <OctalCode.h>
|
#include <OctalCode.h>
|
||||||
|
@ -15,12 +17,38 @@
|
||||||
#include "VoxelEditPacketSender.h"
|
#include "VoxelEditPacketSender.h"
|
||||||
|
|
||||||
|
|
||||||
|
EditPacketBuffer::EditPacketBuffer(PACKET_TYPE type, unsigned char* buffer, ssize_t length, uint16_t nodeID) {
|
||||||
|
_nodeID = nodeID;
|
||||||
|
_currentType = type;
|
||||||
|
_currentSize = length;
|
||||||
|
memcpy(_currentBuffer, buffer, length);
|
||||||
|
};
|
||||||
|
|
||||||
|
const int VoxelEditPacketSender::DEFAULT_MAX_PENDING_MESSAGES = PacketSender::DEFAULT_PACKETS_PER_SECOND;
|
||||||
|
|
||||||
|
|
||||||
VoxelEditPacketSender::VoxelEditPacketSender(PacketSenderNotify* notify) :
|
VoxelEditPacketSender::VoxelEditPacketSender(PacketSenderNotify* notify) :
|
||||||
PacketSender(notify),
|
PacketSender(notify),
|
||||||
_shouldSend(true),
|
_shouldSend(true),
|
||||||
|
_maxPendingMessages(DEFAULT_MAX_PENDING_MESSAGES),
|
||||||
|
_releaseQueuedMessagesPending(false),
|
||||||
_voxelServerJurisdictions(NULL) {
|
_voxelServerJurisdictions(NULL) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
VoxelEditPacketSender::~VoxelEditPacketSender() {
|
||||||
|
while (!_preServerSingleMessagePackets.empty()) {
|
||||||
|
EditPacketBuffer* packet = _preServerSingleMessagePackets.front();
|
||||||
|
delete packet;
|
||||||
|
_preServerSingleMessagePackets.erase(_preServerSingleMessagePackets.begin());
|
||||||
|
}
|
||||||
|
while (!_preServerPackets.empty()) {
|
||||||
|
EditPacketBuffer* packet = _preServerPackets.front();
|
||||||
|
delete packet;
|
||||||
|
_preServerPackets.erase(_preServerPackets.begin());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void VoxelEditPacketSender::sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail& detail) {
|
void VoxelEditPacketSender::sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail& detail) {
|
||||||
// allows app to disable sending if for example voxels have been disabled
|
// allows app to disable sending if for example voxels have been disabled
|
||||||
if (!_shouldSend) {
|
if (!_shouldSend) {
|
||||||
|
@ -30,13 +58,49 @@ void VoxelEditPacketSender::sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail&
|
||||||
unsigned char* bufferOut;
|
unsigned char* bufferOut;
|
||||||
int sizeOut;
|
int sizeOut;
|
||||||
|
|
||||||
|
// This encodes the voxel edit message into a buffer...
|
||||||
if (createVoxelEditMessage(type, 0, 1, &detail, bufferOut, sizeOut)){
|
if (createVoxelEditMessage(type, 0, 1, &detail, bufferOut, sizeOut)){
|
||||||
actuallySendMessage(UNKNOWN_NODE_ID, bufferOut, sizeOut); // sends to all servers... not ideal!
|
|
||||||
|
// If we don't have voxel jurisdictions, then we will simply queue up these packets and wait till we have
|
||||||
|
// jurisdictions for processing
|
||||||
|
if (!voxelServersExist()) {
|
||||||
|
|
||||||
|
// If we're asked to save messages while waiting for voxel servers to arrive, then do so...
|
||||||
|
if (_maxPendingMessages > 0) {
|
||||||
|
EditPacketBuffer* packet = new EditPacketBuffer(type, bufferOut, sizeOut);
|
||||||
|
_preServerSingleMessagePackets.push_back(packet);
|
||||||
|
// if we've saved MORE than out max, then clear out the oldest packet...
|
||||||
|
int allPendingMessages = _preServerSingleMessagePackets.size() + _preServerPackets.size();
|
||||||
|
if (allPendingMessages > _maxPendingMessages) {
|
||||||
|
EditPacketBuffer* packet = _preServerSingleMessagePackets.front();
|
||||||
|
delete packet;
|
||||||
|
_preServerSingleMessagePackets.erase(_preServerSingleMessagePackets.begin());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return; // bail early
|
||||||
|
} else {
|
||||||
|
queuePacketToNode(UNKNOWN_NODE_ID, bufferOut, sizeOut); // sends to all servers... not ideal!
|
||||||
|
}
|
||||||
|
|
||||||
|
// either way, clean up the created buffer
|
||||||
delete[] bufferOut;
|
delete[] bufferOut;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void VoxelEditPacketSender::actuallySendMessage(uint16_t nodeID, unsigned char* bufferOut, ssize_t sizeOut) {
|
bool VoxelEditPacketSender::voxelServersExist() const {
|
||||||
|
NodeList* nodeList = NodeList::getInstance();
|
||||||
|
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
|
||||||
|
// only send to the NodeTypes that are NODE_TYPE_VOXEL_SERVER
|
||||||
|
if (node->getActiveSocket() != NULL && node->getType() == NODE_TYPE_VOXEL_SERVER) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// This method is called when the edit packet layer has determined that it has a fully formed packet destined for
|
||||||
|
// a known nodeID. However, we also want to handle the case where the
|
||||||
|
void VoxelEditPacketSender::queuePacketToNode(uint16_t nodeID, unsigned char* bufferOut, ssize_t sizeOut) {
|
||||||
NodeList* nodeList = NodeList::getInstance();
|
NodeList* nodeList = NodeList::getInstance();
|
||||||
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
|
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
|
||||||
// only send to the NodeTypes that are NODE_TYPE_VOXEL_SERVER
|
// only send to the NodeTypes that are NODE_TYPE_VOXEL_SERVER
|
||||||
|
@ -63,12 +127,87 @@ void VoxelEditPacketSender::queueVoxelEditMessages(PACKET_TYPE type, int numberO
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void VoxelEditPacketSender::queueVoxelEditMessage(PACKET_TYPE type, unsigned char* codeColorBuffer, ssize_t length) {
|
void VoxelEditPacketSender::processPreServerExistsPackets() {
|
||||||
|
assert(voxelServersExist()); // we should only be here if we have jurisdictions
|
||||||
|
|
||||||
|
// First send out all the single message packets...
|
||||||
|
while (!_preServerSingleMessagePackets.empty()) {
|
||||||
|
EditPacketBuffer* packet = _preServerSingleMessagePackets.front();
|
||||||
|
queuePacketToNode(UNKNOWN_NODE_ID, &packet->_currentBuffer[0], packet->_currentSize);
|
||||||
|
delete packet;
|
||||||
|
_preServerSingleMessagePackets.erase(_preServerSingleMessagePackets.begin());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Then "process" all the packable messages...
|
||||||
|
while (!_preServerPackets.empty()) {
|
||||||
|
EditPacketBuffer* packet = _preServerPackets.front();
|
||||||
|
queueVoxelEditMessage(packet->_currentType, &packet->_currentBuffer[0], packet->_currentSize);
|
||||||
|
delete packet;
|
||||||
|
_preServerPackets.erase(_preServerPackets.begin());
|
||||||
|
}
|
||||||
|
|
||||||
|
// if while waiting for the jurisdictions the caller called releaseQueuedMessages()
|
||||||
|
// then we want to honor that request now.
|
||||||
|
if (_releaseQueuedMessagesPending) {
|
||||||
|
releaseQueuedMessages();
|
||||||
|
_releaseQueuedMessagesPending = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void VoxelEditPacketSender::queueVoxelEditMessageToNodes(unsigned char* codeColorBuffer, ssize_t length) {
|
||||||
if (!_shouldSend) {
|
if (!_shouldSend) {
|
||||||
return; // bail early
|
return; // bail early
|
||||||
}
|
}
|
||||||
|
|
||||||
|
assert(voxelServersExist()); // we must have jurisdictions to be here!!
|
||||||
|
|
||||||
|
// We want to filter out edit messages for voxel servers based on the server's Jurisdiction
|
||||||
|
// But we can't really do that with a packed message, since each edit message could be destined
|
||||||
|
// for a different voxel server... So we need to actually manage multiple queued packets... one
|
||||||
|
// for each voxel server
|
||||||
|
NodeList* nodeList = NodeList::getInstance();
|
||||||
|
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
|
||||||
|
// only send to the NodeTypes that are NODE_TYPE_VOXEL_SERVER
|
||||||
|
if (node->getActiveSocket() != NULL && node->getType() == NODE_TYPE_VOXEL_SERVER) {
|
||||||
|
uint16_t nodeID = node->getNodeID();
|
||||||
|
bool isMyJurisdiction = true;
|
||||||
|
|
||||||
|
// we need to get the jurisdiction for this
|
||||||
|
// here we need to get the "pending packet" for this server
|
||||||
|
const JurisdictionMap& map = (*_voxelServerJurisdictions)[nodeID];
|
||||||
|
isMyJurisdiction = (map.isMyJurisdiction(codeColorBuffer, CHECK_NODE_ONLY) == JurisdictionMap::WITHIN);
|
||||||
|
|
||||||
|
if (isMyJurisdiction) {
|
||||||
|
queuePacketToNode(nodeID, codeColorBuffer, length);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void VoxelEditPacketSender::queueVoxelEditMessage(PACKET_TYPE type, unsigned char* codeColorBuffer, ssize_t length) {
|
||||||
|
if (!_shouldSend) {
|
||||||
|
return; // bail early
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we don't have voxel jurisdictions, then we will simply queue up all of these packets and wait till we have
|
||||||
|
// jurisdictions for processing
|
||||||
|
if (!voxelServersExist()) {
|
||||||
|
if (_maxPendingMessages > 0) {
|
||||||
|
EditPacketBuffer* packet = new EditPacketBuffer(type, codeColorBuffer, length);
|
||||||
|
_preServerPackets.push_back(packet);
|
||||||
|
|
||||||
|
// if we've saved MORE than out max, then clear out the oldest packet...
|
||||||
|
int allPendingMessages = _preServerSingleMessagePackets.size() + _preServerPackets.size();
|
||||||
|
if (allPendingMessages > _maxPendingMessages) {
|
||||||
|
EditPacketBuffer* packet = _preServerPackets.front();
|
||||||
|
delete packet;
|
||||||
|
_preServerPackets.erase(_preServerPackets.begin());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return; // bail early
|
||||||
|
}
|
||||||
|
|
||||||
// We want to filter out edit messages for voxel servers based on the server's Jurisdiction
|
// We want to filter out edit messages for voxel servers based on the server's Jurisdiction
|
||||||
// But we can't really do that with a packed message, since each edit message could be destined
|
// But we can't really do that with a packed message, since each edit message could be destined
|
||||||
// for a different voxel server... So we need to actually manage multiple queued packets... one
|
// for a different voxel server... So we need to actually manage multiple queued packets... one
|
||||||
|
@ -93,7 +232,7 @@ void VoxelEditPacketSender::queueVoxelEditMessage(PACKET_TYPE type, unsigned cha
|
||||||
// If we're switching type, then we send the last one and start over
|
// If we're switching type, then we send the last one and start over
|
||||||
if ((type != packetBuffer._currentType && packetBuffer._currentSize > 0) ||
|
if ((type != packetBuffer._currentType && packetBuffer._currentSize > 0) ||
|
||||||
(packetBuffer._currentSize + length >= MAX_PACKET_SIZE)) {
|
(packetBuffer._currentSize + length >= MAX_PACKET_SIZE)) {
|
||||||
flushQueue(packetBuffer);
|
releaseQueuedPacket(packetBuffer);
|
||||||
initializePacket(packetBuffer, type);
|
initializePacket(packetBuffer, type);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -109,14 +248,21 @@ void VoxelEditPacketSender::queueVoxelEditMessage(PACKET_TYPE type, unsigned cha
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void VoxelEditPacketSender::flushQueue() {
|
void VoxelEditPacketSender::releaseQueuedMessages() {
|
||||||
for (std::map<uint16_t,EditPacketBuffer>::iterator i = _pendingEditPackets.begin(); i != _pendingEditPackets.end(); i++) {
|
// if we don't yet have jurisdictions then we can't actually release messages yet because we don't
|
||||||
flushQueue(i->second);
|
// know where to send them to. Instead, just remember this request and when we eventually get jurisdictions
|
||||||
|
// call release again at that time.
|
||||||
|
if (!voxelServersExist()) {
|
||||||
|
_releaseQueuedMessagesPending = true;
|
||||||
|
} else {
|
||||||
|
for (std::map<uint16_t,EditPacketBuffer>::iterator i = _pendingEditPackets.begin(); i != _pendingEditPackets.end(); i++) {
|
||||||
|
releaseQueuedPacket(i->second);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void VoxelEditPacketSender::flushQueue(EditPacketBuffer& packetBuffer) {
|
void VoxelEditPacketSender::releaseQueuedPacket(EditPacketBuffer& packetBuffer) {
|
||||||
actuallySendMessage(packetBuffer._nodeID, &packetBuffer._currentBuffer[0], packetBuffer._currentSize);
|
queuePacketToNode(packetBuffer._nodeID, &packetBuffer._currentBuffer[0], packetBuffer._currentSize);
|
||||||
packetBuffer._currentSize = 0;
|
packetBuffer._currentSize = 0;
|
||||||
packetBuffer._currentType = PACKET_TYPE_UNKNOWN;
|
packetBuffer._currentType = PACKET_TYPE_UNKNOWN;
|
||||||
}
|
}
|
||||||
|
@ -128,3 +274,14 @@ void VoxelEditPacketSender::initializePacket(EditPacketBuffer& packetBuffer, PAC
|
||||||
packetBuffer._currentSize += sizeof(unsigned short int); // set to command + sequence
|
packetBuffer._currentSize += sizeof(unsigned short int); // set to command + sequence
|
||||||
packetBuffer._currentType = type;
|
packetBuffer._currentType = type;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool VoxelEditPacketSender::process() {
|
||||||
|
// if we have server jurisdiction details, and we have pending pre-jurisdiction packets, then process those
|
||||||
|
// before doing our normal process step. This processPreJurisdictionPackets()
|
||||||
|
if (voxelServersExist() && (!_preServerPackets.empty() || !_preServerSingleMessagePackets.empty() )) {
|
||||||
|
processPreServerExistsPackets();
|
||||||
|
}
|
||||||
|
|
||||||
|
// base class does most of the work.
|
||||||
|
return PacketSender::process();
|
||||||
|
}
|
||||||
|
|
|
@ -20,45 +20,87 @@
|
||||||
class EditPacketBuffer {
|
class EditPacketBuffer {
|
||||||
public:
|
public:
|
||||||
EditPacketBuffer() { _currentSize = 0; _currentType = PACKET_TYPE_UNKNOWN; _nodeID = UNKNOWN_NODE_ID; }
|
EditPacketBuffer() { _currentSize = 0; _currentType = PACKET_TYPE_UNKNOWN; _nodeID = UNKNOWN_NODE_ID; }
|
||||||
|
EditPacketBuffer(PACKET_TYPE type, unsigned char* codeColorBuffer, ssize_t length, uint16_t nodeID = UNKNOWN_NODE_ID);
|
||||||
uint16_t _nodeID;
|
uint16_t _nodeID;
|
||||||
PACKET_TYPE _currentType;
|
PACKET_TYPE _currentType;
|
||||||
unsigned char _currentBuffer[MAX_PACKET_SIZE];
|
unsigned char _currentBuffer[MAX_PACKET_SIZE];
|
||||||
ssize_t _currentSize;
|
ssize_t _currentSize;
|
||||||
};
|
};
|
||||||
|
|
||||||
/// Threaded processor for queueing and sending of outbound edit voxel packets.
|
/// Utility for processing, packing, queueing and sending of outbound edit voxel messages.
|
||||||
class VoxelEditPacketSender : public PacketSender {
|
class VoxelEditPacketSender : public virtual PacketSender {
|
||||||
public:
|
public:
|
||||||
VoxelEditPacketSender(PacketSenderNotify* notify = NULL);
|
VoxelEditPacketSender(PacketSenderNotify* notify = NULL);
|
||||||
|
~VoxelEditPacketSender();
|
||||||
|
|
||||||
/// Send voxel edit message immediately
|
/// Send voxel edit message immediately
|
||||||
void sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail& detail);
|
void sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail& detail);
|
||||||
|
|
||||||
/// Queues a voxel edit message. Will potentially sends a pending multi-command packet. Determines which voxel-server
|
/// Queues a single voxel edit message. Will potentially send a pending multi-command packet. Determines which voxel-server
|
||||||
/// node or nodes the packet should be sent to.
|
/// node or nodes the packet should be sent to. Can be called even before voxel servers are known, in which case up to
|
||||||
|
/// MaxPendingMessages will be buffered and processed when voxel servers are known.
|
||||||
void queueVoxelEditMessage(PACKET_TYPE type, unsigned char* codeColorBuffer, ssize_t length);
|
void queueVoxelEditMessage(PACKET_TYPE type, unsigned char* codeColorBuffer, ssize_t length);
|
||||||
|
|
||||||
/// Queues an array of several voxel edit messages. Will potentially send a pending multi-command packet. Determines
|
/// Queues an array of several voxel edit messages. Will potentially send a pending multi-command packet. Determines
|
||||||
/// which voxel-server node or nodes the packet should be sent to.
|
/// which voxel-server node or nodes the packet should be sent to. Can be called even before voxel servers are known, in
|
||||||
|
/// which case up to MaxPendingMessages will be buffered and processed when voxel servers are known.
|
||||||
void queueVoxelEditMessages(PACKET_TYPE type, int numberOfDetails, VoxelDetail* details);
|
void queueVoxelEditMessages(PACKET_TYPE type, int numberOfDetails, VoxelDetail* details);
|
||||||
|
|
||||||
/// flushes all queued packets for all nodes
|
/// Releases all queued messages even if those messages haven't filled an MTU packet. This will move the packed message
|
||||||
void flushQueue();
|
/// packets onto the send queue. If running in threaded mode, the caller does not need to do any further processing to
|
||||||
|
/// have these packets get sent. If running in non-threaded mode, the caller must still call process() on a regular
|
||||||
|
/// interval to ensure that the packets are actually sent. Can be called even before voxel servers are known, in
|
||||||
|
/// which case up to MaxPendingMessages of the released messages will be buffered and actually released when
|
||||||
|
/// voxel servers are known.
|
||||||
|
void releaseQueuedMessages();
|
||||||
|
|
||||||
|
/// are we in sending mode. If we're not in sending mode then all packets and messages will be ignored and
|
||||||
|
/// not queued and not sent
|
||||||
bool getShouldSend() const { return _shouldSend; }
|
bool getShouldSend() const { return _shouldSend; }
|
||||||
|
|
||||||
|
/// set sending mode. By default we are set to shouldSend=TRUE and packets will be sent. If shouldSend=FALSE, then we'll
|
||||||
|
/// switch to not sending mode, and all packets and messages will be ignored, not queued, and not sent. This might be used
|
||||||
|
/// in an application like interface when all voxel features are disabled.
|
||||||
void setShouldSend(bool shouldSend) { _shouldSend = shouldSend; }
|
void setShouldSend(bool shouldSend) { _shouldSend = shouldSend; }
|
||||||
|
|
||||||
|
/// call this to inform the VoxelEditPacketSender of the voxel server jurisdictions. This is required for normal operation.
|
||||||
|
/// The internal contents of the jurisdiction map may change throughout the lifetime of the VoxelEditPacketSender. This map
|
||||||
|
/// can be set prior to voxel servers being present, so long as the contents of the map accurately reflect the current
|
||||||
|
/// known jurisdictions.
|
||||||
void setVoxelServerJurisdictions(NodeToJurisdictionMap* voxelServerJurisdictions) {
|
void setVoxelServerJurisdictions(NodeToJurisdictionMap* voxelServerJurisdictions) {
|
||||||
_voxelServerJurisdictions = voxelServerJurisdictions;
|
_voxelServerJurisdictions = voxelServerJurisdictions;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// if you're running in non-threaded mode, you must call this method regularly
|
||||||
|
virtual bool process();
|
||||||
|
|
||||||
|
/// Set the desired number of pending messages that the VoxelEditPacketSender should attempt to queue even if voxel
|
||||||
|
/// servers are not present. This only applies to how the VoxelEditPacketSender will manage messages when no voxel
|
||||||
|
/// servers are present. By default, this value is the same as the default packets that will be sent in one second.
|
||||||
|
/// Which means the VoxelEditPacketSender will not buffer all messages given to it if no voxel servers are present.
|
||||||
|
/// This is the maximum number of queued messages and single messages.
|
||||||
|
void setMaxPendingMessages(int maxPendingMessages) { _maxPendingMessages = maxPendingMessages; }
|
||||||
|
|
||||||
|
// the default number of pending messages we will store if no voxel servers are available
|
||||||
|
static const int DEFAULT_MAX_PENDING_MESSAGES;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
bool _shouldSend;
|
bool _shouldSend;
|
||||||
void actuallySendMessage(uint16_t nodeID, unsigned char* bufferOut, ssize_t sizeOut);
|
void queuePacketToNode(uint16_t nodeID, unsigned char* bufferOut, ssize_t sizeOut);
|
||||||
|
void queueVoxelEditMessageToNodes(unsigned char* codeColorBuffer, ssize_t length);
|
||||||
void initializePacket(EditPacketBuffer& packetBuffer, PACKET_TYPE type);
|
void initializePacket(EditPacketBuffer& packetBuffer, PACKET_TYPE type);
|
||||||
void flushQueue(EditPacketBuffer& packetBuffer); // flushes specific queued packet
|
void releaseQueuedPacket(EditPacketBuffer& packetBuffer); // releases specific queued packet
|
||||||
|
bool voxelServersExist() const;
|
||||||
|
void processPreServerExistsPackets();
|
||||||
|
|
||||||
|
// These are packets which are destined from know servers but haven't been released because they're still too small
|
||||||
std::map<uint16_t,EditPacketBuffer> _pendingEditPackets;
|
std::map<uint16_t,EditPacketBuffer> _pendingEditPackets;
|
||||||
|
|
||||||
|
// These are packets that are waiting to be processed because we don't yet know if there are voxel servers
|
||||||
|
int _maxPendingMessages;
|
||||||
|
bool _releaseQueuedMessagesPending;
|
||||||
|
std::vector<EditPacketBuffer*> _preServerPackets; // these will get packed into other larger packets
|
||||||
|
std::vector<EditPacketBuffer*> _preServerSingleMessagePackets; // these will go out as is
|
||||||
|
|
||||||
NodeToJurisdictionMap* _voxelServerJurisdictions;
|
NodeToJurisdictionMap* _voxelServerJurisdictions;
|
||||||
};
|
};
|
||||||
|
|
Loading…
Reference in a new issue