mirror of
https://github.com/overte-org/overte.git
synced 2025-04-22 17:33:28 +02:00
some side by side plumbing for NLPackets and NLPacketLists
This commit is contained in:
parent
de9eeff7d2
commit
c6947dd165
5 changed files with 80 additions and 15 deletions
libraries
networking/src
octree/src
|
@ -40,6 +40,9 @@ const quint64 DOMAIN_SERVER_CHECK_IN_MSECS = 1 * 1000;
|
|||
|
||||
const int MAX_SILENT_DOMAIN_SERVER_CHECK_INS = 5;
|
||||
|
||||
using PacketOrPacketList = std::pair<std::unique_ptr<NLPacket>, std::unique_ptr<NLPacketList>>;
|
||||
using NodePacketOrPacketListPair = std::pair<SharedNodePointer, PacketOrPacketList>;
|
||||
|
||||
using NodePacketPair = std::pair<SharedNodePointer, std::unique_ptr<NLPacket>>;
|
||||
using NodeSharedPacketPair = std::pair<SharedNodePointer, QSharedPointer<NLPacket>>;
|
||||
using NodeSharedReceivedMessagePair = std::pair<SharedNodePointer, QSharedPointer<ReceivedMessage>>;
|
||||
|
|
|
@ -53,7 +53,19 @@ void PacketSender::queuePacketForSending(const SharedNodePointer& destinationNod
|
|||
_totalBytesQueued += packet->getDataSize();
|
||||
|
||||
lock();
|
||||
_packets.push_back({destinationNode, std::move(packet)});
|
||||
_packets.push_back({destinationNode, PacketOrPacketList { std::move(packet), nullptr} });
|
||||
unlock();
|
||||
|
||||
// Make sure to wake our actual processing thread because we now have packets for it to process.
|
||||
_hasPackets.wakeAll();
|
||||
}
|
||||
|
||||
void PacketSender::queuePacketListForSending(const SharedNodePointer& destinationNode, std::unique_ptr<NLPacketList> packetList) {
|
||||
_totalPacketsQueued += packetList->getNumPackets();
|
||||
_totalBytesQueued += packetList->getMessageSize();
|
||||
|
||||
lock();
|
||||
_packets.push_back({ destinationNode, PacketOrPacketList { nullptr, std::move(packetList)} });
|
||||
unlock();
|
||||
|
||||
// Make sure to wake our actual processing thread because we now have packets for it to process.
|
||||
|
@ -178,7 +190,7 @@ bool PacketSender::nonThreadedProcess() {
|
|||
|
||||
|
||||
float averagePacketsPerCall = 0; // might be less than 1, if our caller calls us more frequently than the target PPS
|
||||
int packetsSentThisCall = 0;
|
||||
size_t packetsSentThisCall = 0;
|
||||
int packetsToSendThisCall = 0;
|
||||
|
||||
// Since we're in non-threaded mode, we need to determine how many packets to send per call to process
|
||||
|
@ -265,23 +277,31 @@ bool PacketSender::nonThreadedProcess() {
|
|||
while ((packetsSentThisCall < packetsToSendThisCall) && (packetsLeft > 0)) {
|
||||
lock();
|
||||
|
||||
NodePacketPair packetPair = std::move(_packets.front());
|
||||
NodePacketOrPacketListPair packetPair = std::move(_packets.front());
|
||||
_packets.pop_front();
|
||||
packetsLeft = _packets.size();
|
||||
|
||||
unlock();
|
||||
|
||||
// send the packet through the NodeList...
|
||||
DependencyManager::get<NodeList>()->sendUnreliablePacket(*packetPair.second, *packetPair.first);
|
||||
//PacketOrPacketList packetOrList = packetPair.second;
|
||||
bool sendAsPacket = packetPair.second.first.get();
|
||||
if (sendAsPacket) {
|
||||
DependencyManager::get<NodeList>()->sendUnreliablePacket(*packetPair.second.first, *packetPair.first);
|
||||
} else {
|
||||
DependencyManager::get<NodeList>()->sendPacketList(*packetPair.second.second, *packetPair.first);
|
||||
}
|
||||
|
||||
packetsSentThisCall++;
|
||||
_packetsOverCheckInterval++;
|
||||
_totalPacketsSent++;
|
||||
size_t packetSize = sendAsPacket ? packetPair.second.first->getDataSize() : packetPair.second.second->getMessageSize();
|
||||
size_t packetCount = sendAsPacket ? 1 : packetPair.second.second->getNumPackets();
|
||||
|
||||
packetsSentThisCall += packetCount;
|
||||
_packetsOverCheckInterval += packetCount;
|
||||
_totalPacketsSent += packetCount;
|
||||
|
||||
int packetSize = packetPair.second->getDataSize();
|
||||
|
||||
_totalBytesSent += packetSize;
|
||||
emit packetSent(packetSize);
|
||||
emit packetSent(packetSize); // FIXME should include number of packets?
|
||||
|
||||
_lastSendTime = now;
|
||||
}
|
||||
|
|
|
@ -39,6 +39,7 @@ public:
|
|||
|
||||
/// Add packet to outbound queue.
|
||||
void queuePacketForSending(const SharedNodePointer& destinationNode, std::unique_ptr<NLPacket> packet);
|
||||
void queuePacketListForSending(const SharedNodePointer& destinationNode, std::unique_ptr<NLPacketList> packetList);
|
||||
|
||||
void setPacketsPerSecond(int packetsPerSecond);
|
||||
int getPacketsPerSecond() const { return _packetsPerSecond; }
|
||||
|
@ -99,14 +100,14 @@ protected:
|
|||
SimpleMovingAverage _averageProcessCallTime;
|
||||
|
||||
private:
|
||||
std::list<NodePacketPair> _packets;
|
||||
std::list<NodePacketOrPacketListPair> _packets;
|
||||
quint64 _lastSendTime;
|
||||
|
||||
bool threadedProcess();
|
||||
bool nonThreadedProcess();
|
||||
|
||||
quint64 _lastPPSCheck;
|
||||
int _packetsOverCheckInterval;
|
||||
size_t _packetsOverCheckInterval;
|
||||
|
||||
quint64 _started;
|
||||
quint64 _totalPacketsSent;
|
||||
|
|
|
@ -115,6 +115,27 @@ void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, std::uniqu
|
|||
});
|
||||
}
|
||||
|
||||
// This method is called when the edit packet layer has determined that it has a fully formed packet destined for
|
||||
// a known nodeID.
|
||||
void OctreeEditPacketSender::queuePacketListToNode(const QUuid& nodeUUID, std::unique_ptr<NLPacketList> packetList) {
|
||||
|
||||
bool wantDebug = false;
|
||||
DependencyManager::get<NodeList>()->eachNode([&](const SharedNodePointer& node) {
|
||||
// only send to the NodeTypes that are getMyNodeType()
|
||||
if (node->getType() == getMyNodeType()
|
||||
&& ((node->getUUID() == nodeUUID) || (nodeUUID.isNull()))
|
||||
&& node->getActiveSocket()) {
|
||||
|
||||
// NOTE: unlike packets, the packet lists don't get rewritten sequence numbers.
|
||||
|
||||
// add packet to history -- we don't keep track of sent PacketLists
|
||||
//_sentPacketHistories[nodeUUID].packetSent(sequence, *packet);
|
||||
|
||||
queuePacketListForSending(node, std::move(packetList));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void OctreeEditPacketSender::processPreServerExistsPackets() {
|
||||
assert(serversExist()); // we should only be here if we have jurisdictions
|
||||
|
||||
|
@ -247,7 +268,7 @@ void OctreeEditPacketSender::queueOctreeEditMessage(PacketType type, QByteArray&
|
|||
});
|
||||
}
|
||||
if (isMyJurisdiction) {
|
||||
std::unique_ptr<NLPacket>& bufferedPacket = _pendingEditPackets[nodeUUID];
|
||||
std::unique_ptr<NLPacket>& bufferedPacket = _pendingEditPackets[nodeUUID].first; //only a NLPacket for now
|
||||
|
||||
if (!bufferedPacket) {
|
||||
bufferedPacket = initializePacket(type, node->getClockSkewUsec());
|
||||
|
@ -291,15 +312,24 @@ void OctreeEditPacketSender::releaseQueuedMessages() {
|
|||
} else {
|
||||
_packetsQueueLock.lock();
|
||||
for (auto& i : _pendingEditPackets) {
|
||||
if (i.second) {
|
||||
if (i.second.first) {
|
||||
// construct a null unique_ptr to an NL packet
|
||||
std::unique_ptr<NLPacket> releasedPacket;
|
||||
|
||||
// swap the null ptr with the packet we want to release
|
||||
i.second.swap(releasedPacket);
|
||||
i.second.first.swap(releasedPacket);
|
||||
|
||||
// move and release the queued packet
|
||||
releaseQueuedPacket(i.first, std::move(releasedPacket));
|
||||
} else if (i.second.second) {
|
||||
// construct a null unique_ptr to an NLPacketList
|
||||
std::unique_ptr<NLPacketList> releasedPacketList;
|
||||
|
||||
// swap the null ptr with the NLPacketList we want to release
|
||||
i.second.second.swap(releasedPacketList);
|
||||
|
||||
// move and release the queued NLPacketList
|
||||
releaseQueuedPacketList(i.first, std::move(releasedPacketList));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -315,6 +345,14 @@ void OctreeEditPacketSender::releaseQueuedPacket(const QUuid& nodeID, std::uniqu
|
|||
_releaseQueuedPacketMutex.unlock();
|
||||
}
|
||||
|
||||
void OctreeEditPacketSender::releaseQueuedPacketList(const QUuid& nodeID, std::unique_ptr<NLPacketList> packetList) {
|
||||
_releaseQueuedPacketMutex.lock();
|
||||
if (packetList->getMessageSize() > 0 && packetList->getType() != PacketType::Unknown) {
|
||||
queuePacketListToNode(nodeID, std::move(packetList));
|
||||
}
|
||||
_releaseQueuedPacketMutex.unlock();
|
||||
}
|
||||
|
||||
std::unique_ptr<NLPacket> OctreeEditPacketSender::initializePacket(PacketType type, qint64 nodeClockSkew) {
|
||||
auto newPacket = NLPacket::create(type);
|
||||
|
||||
|
|
|
@ -87,15 +87,18 @@ protected:
|
|||
|
||||
bool _shouldSend;
|
||||
void queuePacketToNode(const QUuid& nodeID, std::unique_ptr<NLPacket> packet);
|
||||
void queuePacketListToNode(const QUuid& nodeUUID, std::unique_ptr<NLPacketList> packetList);
|
||||
|
||||
void queuePendingPacketToNodes(std::unique_ptr<NLPacket> packet);
|
||||
void queuePacketToNodes(std::unique_ptr<NLPacket> packet);
|
||||
std::unique_ptr<NLPacket> initializePacket(PacketType type, qint64 nodeClockSkew);
|
||||
void releaseQueuedPacket(const QUuid& nodeUUID, std::unique_ptr<NLPacket> packetBuffer); // releases specific queued packet
|
||||
void releaseQueuedPacketList(const QUuid& nodeID, std::unique_ptr<NLPacketList> packetList);
|
||||
|
||||
void processPreServerExistsPackets();
|
||||
|
||||
// These are packets which are destined from know servers but haven't been released because they're still too small
|
||||
std::unordered_map<QUuid, std::unique_ptr<NLPacket>> _pendingEditPackets;
|
||||
std::unordered_map<QUuid, PacketOrPacketList> _pendingEditPackets;
|
||||
|
||||
// These are packets that are waiting to be processed because we don't yet know if there are servers
|
||||
int _maxPendingMessages;
|
||||
|
|
Loading…
Reference in a new issue