Merge branch 'atp' of github.com:birarda/hifi into atp

Conflicts:
	libraries/audio-client/src/AudioIOStats.cpp
This commit is contained in:
Ryan Huffman 2015-07-07 16:26:22 -07:00
commit 60538d4a82
23 changed files with 196 additions and 203 deletions

View file

@ -213,7 +213,7 @@ void AvatarMixer::broadcastAvatarData() {
} }
// setup a PacketList for the avatarPackets // setup a PacketList for the avatarPackets
PacketList avatarPacketList(PacketType::AvatarData); NLPacketList avatarPacketList(PacketType::AvatarData);
// this is an AGENT we have received head data from // this is an AGENT we have received head data from
// send back a packet with other active node data to this node // send back a packet with other active node data to this node
@ -305,9 +305,9 @@ void AvatarMixer::broadcastAvatarData() {
|| otherNodeData->getBillboardChangeTimestamp() > _lastFrameTimestamp || otherNodeData->getBillboardChangeTimestamp() > _lastFrameTimestamp
|| randFloat() < BILLBOARD_AND_IDENTITY_SEND_PROBABILITY)) { || randFloat() < BILLBOARD_AND_IDENTITY_SEND_PROBABILITY)) {
auto billboardPacket { NLPacket::create(PacketType::AvatarBillboard); } auto billboardPacket = NLPacket::create(PacketType::AvatarBillboard);
billboardPacket.write(otherNode->getUUID().toRfc4122()); billboardPacket->write(otherNode->getUUID().toRfc4122());
billboardPacket.write(otherNodeData->getAvatar().getBillboard()); billboardPacket->write(otherNodeData->getAvatar().getBillboard());
nodeList->sendPacket(std::move(billboardPacket), node); nodeList->sendPacket(std::move(billboardPacket), node);
@ -319,12 +319,12 @@ void AvatarMixer::broadcastAvatarData() {
|| otherNodeData->getIdentityChangeTimestamp() > _lastFrameTimestamp || otherNodeData->getIdentityChangeTimestamp() > _lastFrameTimestamp
|| randFloat() < BILLBOARD_AND_IDENTITY_SEND_PROBABILITY)) { || randFloat() < BILLBOARD_AND_IDENTITY_SEND_PROBABILITY)) {
auto identityPacket { NLPacket::create(PacketType::AvatarIdentity); } auto identityPacket = NLPacket::create(PacketType::AvatarIdentity);
QByteArray individualData = otherNodeData->getAvatar().identityByteArray(); QByteArray individualData = otherNodeData->getAvatar().identityByteArray();
individualData.replace(0, NUM_BYTES_RFC4122_UUID, otherNode->getUUID().toRfc4122()); individualData.replace(0, NUM_BYTES_RFC4122_UUID, otherNode->getUUID().toRfc4122());
identityPacket.write(individualData); identityPacket->write(individualData);
nodeList->sendPacket(std::move(identityPacket), node); nodeList->sendPacket(std::move(identityPacket), node);
@ -361,8 +361,8 @@ void AvatarMixer::nodeKilled(SharedNodePointer killedNode) {
// this was an avatar we were sending to other people // this was an avatar we were sending to other people
// send a kill packet for it to our other nodes // send a kill packet for it to our other nodes
auto killPacket { NLPacket::create(PacketType::KillAvatar); } auto killPacket = NLPacket::create(PacketType::KillAvatar, NUM_BYTES_RFC4122_UUID);
killPacket.write(killedNode->getUUID().toRfc4122()); killPacket->write(killedNode->getUUID().toRfc4122());
nodeList->broadcastToNodes(killPacket, NodeSet() << NodeType::Agent); nodeList->broadcastToNodes(killPacket, NodeSet() << NodeType::Agent);

View file

@ -235,14 +235,13 @@ void OctreeInboundPacketProcessor::trackInboundPacket(const QUuid& nodeUUID, uns
} }
int OctreeInboundPacketProcessor::sendNackPackets() { int OctreeInboundPacketProcessor::sendNackPackets() {
int packetsSent = 0;
if (_shuttingDown) { if (_shuttingDown) {
qDebug() << "OctreeInboundPacketProcessor::sendNackPackets() while shutting down... ignore"; qDebug() << "OctreeInboundPacketProcessor::sendNackPackets() while shutting down... ignore";
return packetsSent; return 0;
} }
auto nackPacket { NLPacket::create(_myServer->getMyEditNackType(); } NLPacketList nackPacketList(_myServer->getMyEditNackType();
auto nodeList = DependencyManager::get<NodeList>();
NodeToSenderStatsMapIterator i = _singleSenderStats.begin(); NodeToSenderStatsMapIterator i = _singleSenderStats.begin();
while (i != _singleSenderStats.end()) { while (i != _singleSenderStats.end()) {
@ -271,37 +270,25 @@ int OctreeInboundPacketProcessor::sendNackPackets() {
// construct nack packet(s) for this node // construct nack packet(s) for this node
const QSet<unsigned short int>& missingSequenceNumbers = sequenceNumberStats.getMissingSet(); const QSet<unsigned short int>& missingSequenceNumbers = sequenceNumberStats.getMissingSet();
int numSequenceNumbersAvailable = missingSequenceNumbers.size();
QSet<unsigned short int>::const_iterator missingSequenceNumberIterator = missingSequenceNumbers.constBegin();
while (numSequenceNumbersAvailable > 0) {
auto nodeList = DependencyManager::get<NodeList>(); auto it = missingSequenceNumbers.constBegin();
nackPacket->reset(); while (it != missingSequenceNumbers.constEnd()) {
unsigned short int sequenceNumber = *it;
// calculate and pack the number of sequence numbers to nack nackPacketList->write(&sequenceNumber, sizeof(sequenceNumber));
int numSequenceNumbersRoomFor = (nackPacket->getCapacity() - sizeof(uint16_t)) / sizeof(unsigned short int); ++it;
uint16_t numSequenceNumbers = std::min(numSequenceNumbersAvailable, numSequenceNumbersRoomFor);
nackPacket->write(&numSequenceNumbers, sizeof(numSequenceNumbers));
// pack sequence numbers to nack
for (uint16_t i = 0; i < numSequenceNumbers; i++) {
unsigned short int sequenceNumber = *missingSequenceNumberIterator;
nackPacket->write(&sequenceNumber, sizeof(sequenceNumber));
missingSequenceNumberIterator++;
}
numSequenceNumbersAvailable -= numSequenceNumbers;
// send it
nodeList->sendUnreliablePacket(nackPacket, destinationNode);
packetsSent++;
qDebug() << "NACK Sent back to editor/client... destinationNode=" << nodeUUID;
} }
i++;
} }
int packetsSent = nackPacketList.getNumPackets();
if (packetsSent) {
qDebug() << "NACK Sent back to editor/client... destinationNode=" << nodeUUID;
}
// send the list of nack packets
nodeList->sendPacketList(nackPacketList, destinationNode);
return packetsSent; return packetsSent;
} }

View file

@ -108,9 +108,9 @@ bool OctreeQueryNode::packetIsDuplicate() const {
// since our packets now include header information, like sequence number, and createTime, we can't just do a memcmp // since our packets now include header information, like sequence number, and createTime, we can't just do a memcmp
// of the entire packet, we need to compare only the packet content... // of the entire packet, we need to compare only the packet content...
if (_lastOctreePacketLength == getPacketLength()) { if (_lastOctreePacketLength == _octreePacket->getSizeUsed()) {
if (memcmp(_lastOctreePayload + OCTREE_PACKET_EXTRA_HEADERS_SIZE, if (memcmp(_lastOctreePayload + OCTREE_PACKET_EXTRA_HEADERS_SIZE,
_octreePacket->getPayload() + OCTREE_PACKET_EXTRA_HEADERS_SIZE, _octreePacket->getPayload() + OCTREE_PACKET_EXTRA_HEADERS_SIZE,
_octreePacket->getSizeUsed() - OCTREE_PACKET_EXTRA_HEADERS_SIZE) == 0) { _octreePacket->getSizeUsed() - OCTREE_PACKET_EXTRA_HEADERS_SIZE) == 0) {
return true; return true;
} }
@ -379,6 +379,8 @@ void OctreeQueryNode::parseNackPacket(const QByteArray& packet) {
int numBytesPacketHeader = numBytesForPacketHeader(packet); int numBytesPacketHeader = numBytesForPacketHeader(packet);
const unsigned char* dataAt = reinterpret_cast<const unsigned char*>(packet.data()) + numBytesPacketHeader; const unsigned char* dataAt = reinterpret_cast<const unsigned char*>(packet.data()) + numBytesPacketHeader;
// TODO: This no longer has the number of sequence numbers - just read to the end of the packet in sequence number blocks
// read number of sequence numbers // read number of sequence numbers
uint16_t numSequenceNumbers = (*(uint16_t*)dataAt); uint16_t numSequenceNumbers = (*(uint16_t*)dataAt);
dataAt += sizeof(uint16_t); dataAt += sizeof(uint16_t);

View file

@ -147,7 +147,7 @@ int OctreeSendThread::handlePacketSend(OctreeQueryNode* nodeData, int& trueBytes
NLPacket& statsPacket = nodeData->stats.getStatsMessage(); NLPacket& statsPacket = nodeData->stats.getStatsMessage();
// If the size of the stats message and the octree message will fit in a packet, then piggyback them // If the size of the stats message and the octree message will fit in a packet, then piggyback them
if (nodeData->getPacket()->getSizeUsed() < statsPacket->bytesAvailable()) { if (nodeData->getPacket()->getSizeWithHeader() < statsPacket->bytesAvailable()) {
// copy octree message to back of stats message // copy octree message to back of stats message
statsPacket->write(nodeData->getPacket()->getData(), nodeData->getPacket()->getSizeWithHeader()); statsPacket->write(nodeData->getPacket()->getData(), nodeData->getPacket()->getSizeWithHeader());

View file

@ -622,8 +622,8 @@ void DomainServer::handleConnectRequest(const QByteArray& packet, const HifiSock
QByteArray utfString = reason.toUtf8(); QByteArray utfString = reason.toUtf8();
int payloadSize = utfString.size(); int payloadSize = utfString.size();
auto connectionDeniedPacket = NLPacket::make(PacketType::DomainConnectionDenied, payloadSize); auto connectionDeniedPacket = NLPacket::create(PacketType::DomainConnectionDenied, payloadSize);
connectionDeniedPacket.write(utfString); connectionDeniedPacket->write(utfString);
// tell client it has been refused. // tell client it has been refused.
limitedNodeList->sendPacket(std::move(connectionDeniedPacket, senderSockAddr); limitedNodeList->sendPacket(std::move(connectionDeniedPacket, senderSockAddr);
@ -927,7 +927,7 @@ void DomainServer::sendDomainListToNode(const SharedNodePointer& node, const Hif
const NodeSet& nodeInterestSet) { const NodeSet& nodeInterestSet) {
auto limitedNodeList = DependencyManager::get<LimitedNodeList>(); auto limitedNodeList = DependencyManager::get<LimitedNodeList>();
PacketList domainListPackets(PacketType::DomainList); NLPacketList domainListPackets(PacketType::DomainList);
// always send the node their own UUID back // always send the node their own UUID back
QDataStream domainListStream(&domainListPackets); QDataStream domainListStream(&domainListPackets);
@ -1448,7 +1448,7 @@ void DomainServer::processDatagram(const QByteArray& receivedPacket, const HifiS
processICEPingReply(receivedPacket, senderSockAddr); processICEPingReply(receivedPacket, senderSockAddr);
break; break;
} }
case PacketTypeIceServerPeerInformation: case PacketType::ICEServerPeerInformation:
processICEPeerInformation(receivedPacket); processICEPeerInformation(receivedPacket);
break; break;
default: default:

View file

@ -127,7 +127,7 @@ SharedNetworkPeer IceServer::addOrUpdateHeartbeatingPeer(const QByteArray& incom
} }
void IceServer::sendPeerInformationPacket(const NetworkPeer& peer, const HifiSockAddr* destinationSockAddr) { void IceServer::sendPeerInformationPacket(const NetworkPeer& peer, const HifiSockAddr* destinationSockAddr) {
auto peerPacket { NLPacket::create(PacketType::IceServerPeerInformation); } auto peerPacket = Packet::create(PacketType::IceServerPeerInformation);
// get the byte array for this peer // get the byte array for this peer
peerPacket->write(peer.toByteArray()); peerPacket->write(peer.toByteArray());

View file

@ -2660,9 +2660,7 @@ int Application::sendNackPackets() {
return 0; return 0;
} }
int packetsSent = 0; NLPacketList nackPacketList(PacketType::OctreeDataNack);
auto nackPacket { NLPacket::create(PacketType::OctreeDataNack); }
// iterates thru all nodes in NodeList // iterates thru all nodes in NodeList
auto nodeList = DependencyManager::get<NodeList>(); auto nodeList = DependencyManager::get<NodeList>();
@ -2676,7 +2674,7 @@ int Application::sendNackPackets() {
// if there are octree packets from this node that are waiting to be processed, // if there are octree packets from this node that are waiting to be processed,
// don't send a NACK since the missing packets may be among those waiting packets. // don't send a NACK since the missing packets may be among those waiting packets.
if (_octreeProcessor.hasPacketsToProcessFrom(nodeUUID)) { if (_octreeProcessor.hasPacketsToProcessFrom(nodeUUID)) {
return; return 0;
} }
_octreeSceneStatsLock.lockForRead(); _octreeSceneStatsLock.lockForRead();
@ -2684,7 +2682,7 @@ int Application::sendNackPackets() {
// retreive octree scene stats of this node // retreive octree scene stats of this node
if (_octreeServerSceneStats.find(nodeUUID) == _octreeServerSceneStats.end()) { if (_octreeServerSceneStats.find(nodeUUID) == _octreeServerSceneStats.end()) {
_octreeSceneStatsLock.unlock(); _octreeSceneStatsLock.unlock();
return; return 0;
} }
// get sequence number stats of node, prune its missing set, and make a copy of the missing set // get sequence number stats of node, prune its missing set, and make a copy of the missing set
@ -2696,34 +2694,23 @@ int Application::sendNackPackets() {
// construct nack packet(s) for this node // construct nack packet(s) for this node
int numSequenceNumbersAvailable = missingSequenceNumbers.size(); int numSequenceNumbersAvailable = missingSequenceNumbers.size();
QSet<OCTREE_PACKET_SEQUENCE>::const_iterator missingSequenceNumbersIterator = missingSequenceNumbers.constBegin();
while (numSequenceNumbersAvailable > 0) {
// reset the position we are writing at and the size we have used auto it = missingSequenceNumbers.constBegin();
nackPacket->seek(0); while (it != missingSequenceNumbers.constEnd()) {
nackPacket->setSizeUsed(0); OCTREE_PACKET_SEQUENCE missingNumber = *it;
nackPacketList->write(&missingNumber, sizeof(OCTREE_PACKET_SEQUENCE));
// calculate and pack the number of sequence numbers ++it;
int numSequenceNumbersRoomFor = (nackPacket->size() - sizeof(uint16_t)) / sizeof(OCTREE_PACKET_SEQUENCE);
uint16_t numSequenceNumbers = min(numSequenceNumbersAvailable, numSequenceNumbersRoomFor);
nackPacket->write(&numSequenceNumbers, sizeof(numSequenceNumbers));
// pack sequence numbers
for (int i = 0; i < numSequenceNumbers; i++) {
OCTREE_PACKET_SEQUENCE missingNumber = *missingSequenceNumbersIterator;
nackPacket->write(&missingNumber, sizeof(OCTREE_PACKET_SEQUENCE));
missingSequenceNumbersIterator++;
}
numSequenceNumbersAvailable -= numSequenceNumbers;
// send the packet
nodeList->sendUnreliablePacket(packet, node);
packetsSent++;
} }
} }
}); });
int packetsSent = nackPacketList.getNumPackets();
if (packetsSent) {
// send the packet list
nodeList->sendPacketList(nackPacketList, node);
}
return packetsSent; return packetsSent;
} }
@ -2817,7 +2804,7 @@ void Application::queryOctree(NodeType_t serverType, PacketType::Value packetTyp
qCDebug(interfaceapp, "perServerPPS: %d perUnknownServer: %d", perServerPPS, perUnknownServer); qCDebug(interfaceapp, "perServerPPS: %d perUnknownServer: %d", perServerPPS, perUnknownServer);
} }
auto queryPacket { NLPacket::create(packetType); } auto queryPacket = NLPacket::create(packetType);
nodeList->eachNode([&](const SharedNodePointer& node){ nodeList->eachNode([&](const SharedNodePointer& node){
// only send to the NodeTypes that are serverType // only send to the NodeTypes that are serverType
@ -2896,7 +2883,7 @@ void Application::queryOctree(NodeType_t serverType, PacketType::Value packetTyp
// encode the query data // encode the query data
int packetSize = _octreeQuery.getBroadcastData(queryPacket.payload()); int packetSize = _octreeQuery.getBroadcastData(queryPacket.payload());
queryPacket.setSizeUsed(packetSize); queryPacket->setSizeUsed(packetSize);
// make sure we still have an active socket // make sure we still have an active socket
nodeList->sendUnreliablePacket(queryPacket, node); nodeList->sendUnreliablePacket(queryPacket, node);

View file

@ -853,11 +853,7 @@ void AudioClient::handleAudioInput() {
} }
} }
// seek to the beginning of the audio packet payload // reset the audio packet so we can start writing
_audioPacket->seek(0);
// reset the size used in this packet so it will be correct once we are done writing
_audioPacket->setSizeUsed(0);
// write sequence number // write sequence number
_audioPacket->write(&_outgoingAvatarAudioSequenceNumber, sizeof(quint16)); _audioPacket->write(&_outgoingAvatarAudioSequenceNumber, sizeof(quint16));
@ -913,14 +909,14 @@ void AudioClient::sendMuteEnvironmentPacket() {
int dataSize = sizeof(glm::vec3) + sizeof(float); int dataSize = sizeof(glm::vec3) + sizeof(float);
NodeList::Packet mutePacket = nodeList->makePacket(PacketType::MuteEnvironment, dataSize); auto mutePacket = NLPacket::create(PacketType::MuteEnvironment, dataSize);
const float MUTE_RADIUS = 50; const float MUTE_RADIUS = 50;
glm::vec3 currentSourcePosition = _positionGetter(); glm::vec3 currentSourcePosition = _positionGetter();
memcpy(mutePacket.payload().data(), &currentSourcePosition, sizeof(glm::vec3)); mutePacket->write(&currentSourcePosition, sizeof(currentSourcePosition));
memcpy(mutePacket.payload() + sizeof(glm::vec3), &MUTE_RADIUS, sizeof(float)); mutePacket->write(&MUTE_RADIUS, sizeof(MUTE_RADIUS));
// grab our audio mixer from the NodeList, if it exists // grab our audio mixer from the NodeList, if it exists
SharedNodePointer audioMixer = nodeList->soloNodeOfType(NodeType::AudioMixer); SharedNodePointer audioMixer = nodeList->soloNodeOfType(NodeType::AudioMixer);

View file

@ -153,7 +153,7 @@ void AudioInjector::injectToMixer() {
// make sure we actually have samples downloaded to inject // make sure we actually have samples downloaded to inject
if (_audioData.size()) { if (_audioData.size()) {
auto audioPacket { NLPacket::create(PacketType::InjectAudio); } auto audioPacket = NLPacket::create(PacketType::InjectAudio);
// setup the packet for injected audio // setup the packet for injected audio
QDataStream audioPacketStream(&audioPacket); QDataStream audioPacketStream(&audioPacket);
@ -177,7 +177,6 @@ void AudioInjector::injectToMixer() {
sizeof(_options.position)); sizeof(_options.position));
// pack our orientation for injected audio // pack our orientation for injected audio
int orientationOptionOffset = audioPacket.pos();
audioPacketStream.writeRawData(reinterpret_cast<const char*>(&_options.orientation), audioPacketStream.writeRawData(reinterpret_cast<const char*>(&_options.orientation),
sizeof(_options.orientation)); sizeof(_options.orientation));
@ -216,23 +215,24 @@ void AudioInjector::injectToMixer() {
} }
_loudness /= (float)(bytesToCopy / sizeof(int16_t)); _loudness /= (float)(bytesToCopy / sizeof(int16_t));
audioPacket.seek(positionOptionOffset); audioPacket->seek(positionOptionOffset);
audioPacket.write(&_options.position, sizeof(_options.position)); audioPacket->write(&_options.position, sizeof(_options.position));
audioPacket.seek(orientationOptionOffset);
audioPacket.write(&_options.orientation, sizeof(_options.orientation)); audioPacket.write(&_options.orientation, sizeof(_options.orientation));
volume = MAX_INJECTOR_VOLUME * _options.volume; volume = MAX_INJECTOR_VOLUME * _options.volume;
audioPacket.seek(volumeOptionOffset); audioPacket->seek(volumeOptionOffset);
audioPacket.write(&volume, sizeof(volume)); audioPacket->write(&volume, sizeof(volume));
audioPacket.seek(audioDataOffset); audioPacket->seek(audioDataOffset);
// pack the sequence number // pack the sequence number
audioPacket.write(&outgoingInjectedAudioSequenceNumber, sizeof(quint16)); audioPacket->write(&outgoingInjectedAudioSequenceNumber, sizeof(quint16));
// copy the next NETWORK_BUFFER_LENGTH_BYTES_PER_CHANNEL bytes to the packet // copy the next NETWORK_BUFFER_LENGTH_BYTES_PER_CHANNEL bytes to the packet
audioPacket.write(_audioData.data() + _currentSendPosition, bytesToCopy); audioPacket->write(_audioData.data() + _currentSendPosition, bytesToCopy);
// set the correct size used for this packet
audioPacket->setSizeUsed(audioPacket->pos());
// grab our audio mixer from the NodeList, if it exists // grab our audio mixer from the NodeList, if it exists
SharedNodePointer audioMixer = nodeList->soloNodeOfType(NodeType::AudioMixer); SharedNodePointer audioMixer = nodeList->soloNodeOfType(NodeType::AudioMixer);

View file

@ -521,11 +521,11 @@ SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, NodeType_t
// return n; // return n;
// } // }
NLPacket&& LimitedNodeList::constructPingPacket(PingType_t pingType) { std::unique_ptr<NLPacket> LimitedNodeList::constructPingPacket(PingType_t pingType) {
int packetSize = sizeof(PingType_t) + sizeof(quint64); int packetSize = sizeof(PingType_t) + sizeof(quint64);
auto pingPacket { NLPacket::create(PacketType::Ping, packetSize); } auto pingPacket = NLPacket::create(PacketType::Ping, packetSize);
QDataStream packetStream(&pingPacket.payload(), QIODevice::Append); QDataStream packetStream(&pingPacket);
packetStream << pingType; packetStream << pingType;
packetStream << usecTimestampNow(); packetStream << usecTimestampNow();
@ -533,7 +533,7 @@ NLPacket&& LimitedNodeList::constructPingPacket(PingType_t pingType) {
return pingPacket; return pingPacket;
} }
NLPacket&& LimitedNodeList::constructPingReplyPacket(const QByteArray& pingPacket) { std::unique_ptr<NLPacket> LimitedNodeList::constructPingReplyPacket(const QByteArray& pingPacket) {
QDataStream pingPacketStream(pingPacket); QDataStream pingPacketStream(pingPacket);
pingPacketStream.skipRawData(numBytesForPacketHeader(pingPacket)); pingPacketStream.skipRawData(numBytesForPacketHeader(pingPacket));
@ -545,37 +545,37 @@ NLPacket&& LimitedNodeList::constructPingReplyPacket(const QByteArray& pingPacke
int packetSize = sizeof(PingType_t) + sizeof(quint64) + sizeof(quint64); int packetSize = sizeof(PingType_t) + sizeof(quint64) + sizeof(quint64);
auto replyPacket { NLPacket::create(PacketType::Ping, packetSize); } auto replyPacket = NLPacket::create(PacketType::Ping, packetSize);
QDataStream packetStream(&replyPacket, QIODevice::Append); QDataStream packetStream(&replyPacket);
packetStream << typeFromOriginalPing << timeFromOriginalPing << usecTimestampNow(); packetStream << typeFromOriginalPing << timeFromOriginalPing << usecTimestampNow();
return replyPacket; return replyPacket;
} }
NLPacket&& constructICEPingPacket(PingType_t pingType, const QUuid& iceID) { std::unique_ptr<NLPacket> constructICEPingPacket(PingType_t pingType, const QUuid& iceID) {
int packetSize = NUM_BYTES_RFC4122_UUID + sizeof(PingType_t); int packetSize = NUM_BYTES_RFC4122_UUID + sizeof(PingType_t);
auto icePingPacket { NLPacket::create(PacketType::ICEPing, packetSize); } auto icePingPacket = NLPacket::create(PacketType::ICEPing, packetSize);
icePingPacket.payload().replace(0, NUM_BYTES_RFC4122_UUID, iceID.toRfc4122().data()); icePingPacket->write(iceID.toRfc4122());
memcpy(icePingPacket.payload() + NUM_BYTES_RFC4122_UUID, &pingType, sizeof(PingType_t)); icePingPacket->write(&pingType, sizeof(pingType));
return icePingPacket; return icePingPacket;
} }
NLPacket&& constructICEPingReplyPacket(const QByteArray& pingPacket, const QUuid& iceID) { std::unique_ptr<NLPacket> constructICEPingReplyPacket(const QByteArray& pingPacket, const QUuid& iceID) {
// pull out the ping type so we can reply back with that // pull out the ping type so we can reply back with that
PingType_t pingType; PingType_t pingType;
memcpy(&pingType, pingPacket.data() + NUM_BYTES_RFC4122_UUID, sizeof(PingType_t)); memcpy(&pingType, pingPacket.data() + NUM_BYTES_RFC4122_UUID, sizeof(PingType_t));
int packetSize = NUM_BYTES_RFC4122_UUID + sizeof(PingType_t); int packetSize = NUM_BYTES_RFC4122_UUID + sizeof(PingType_t);
auto icePingReplyPacket { NLPacket::create(PacketType::ICEPingReply, packetSize); } auto icePingReplyPacket = NLPacket::create(PacketType::ICEPingReply, packetSize);
// pack the ICE ID and then the ping type // pack the ICE ID and then the ping type
memcpy(icePingReplyPacket.payload(), iceID.toRfc4122().data(), NUM_BYTES_RFC4122_UUID); icePingReplyPacket->write(iceID.toRfc4122());
memcpy(icePingReplyPacket.payload() + NUM_BYTES_RFC4122_UUID, &pingType, sizeof(PingType_t)); icePingReplyPacket->write(&pingType, sizeof(pingType));
return icePingReplyPacket; return icePingReplyPacket;
} }

View file

@ -38,6 +38,7 @@
#include "Node.h" #include "Node.h"
#include "NLPacket.h" #include "NLPacket.h"
#include "PacketHeaders.h" #include "PacketHeaders.h"
#include "PacketList.h"
#include "UUIDHasher.h" #include "UUIDHasher.h"
const int MAX_PACKET_SIZE = 1450; const int MAX_PACKET_SIZE = 1450;
@ -67,6 +68,7 @@ Q_DECLARE_METATYPE(SharedNodePointer)
using namespace tbb; using namespace tbb;
typedef std::pair<QUuid, SharedNodePointer> UUIDNodePair; typedef std::pair<QUuid, SharedNodePointer> UUIDNodePair;
typedef concurrent_unordered_map<QUuid, SharedNodePointer, UUIDHasher> NodeHash; typedef concurrent_unordered_map<QUuid, SharedNodePointer, UUIDHasher> NodeHash;
using NLPacketList = PacketList<NLPacket>;
typedef quint8 PingType_t; typedef quint8 PingType_t;
namespace PingType { namespace PingType {
@ -143,12 +145,12 @@ public:
// const HifiSockAddr& overridenSockAddr = HifiSockAddr()); // const HifiSockAddr& overridenSockAddr = HifiSockAddr());
// //
qint64 sendUnreliablePacket(NLPacket& packet, const SharedNodePointer& destinationNode) {}; qint64 sendUnreliablePacket(std::unique_ptr<NLPacket>& packet, const SharedNodePointer& destinationNode) {};
qint64 sendUnreliablePacket(NLPacket& packet, const HifiSockAddr& sockAddr) {}; qint64 sendUnreliablePacket(std::unique_ptr<NLPacket>& packet, const HifiSockAddr& sockAddr) {};
qint64 sendPacket(NLPacket&& packet, const SharedNodePointer& destinationNode) {}; qint64 sendPacket(std::unique_ptr<NLPacket> packet, const SharedNodePointer& destinationNode) {};
qint64 sendPacket(NLPacket&& packet, const HifiSockAddr& sockAddr) {}; qint64 sendPacket(std::unique_ptr<NLPacket> packet, const HifiSockAddr& sockAddr) {};
qint64 sendPacketList(PacketList& packetList, const SharedNodePointer& destinationNode) {}; qint64 sendPacketList(NLPacketList& packetList, const SharedNodePointer& destinationNode) {};
qint64 sendPacketList(PacketList& packetList, const HifiSockAddr& sockAddr) {}; qint64 sendPacketList(NLPacketList& packetList, const HifiSockAddr& sockAddr) {};
void (*linkedDataCreateCallback)(Node *); void (*linkedDataCreateCallback)(Node *);
@ -173,17 +175,17 @@ public:
int updateNodeWithDataFromPacket(const SharedNodePointer& matchingNode, const QByteArray& packet); int updateNodeWithDataFromPacket(const SharedNodePointer& matchingNode, const QByteArray& packet);
int findNodeAndUpdateWithDataFromPacket(const QByteArray& packet); int findNodeAndUpdateWithDataFromPacket(const QByteArray& packet);
unsigned broadcastToNodes(PacketList& packetList, const NodeSet& destinationNodeTypes) {}; unsigned broadcastToNodes(std::unique_ptr<NLPacket> packet, const NodeSet& destinationNodeTypes) {};
SharedNodePointer soloNodeOfType(char nodeType); SharedNodePointer soloNodeOfType(char nodeType);
void getPacketStats(float &packetsPerSecond, float &bytesPerSecond); void getPacketStats(float &packetsPerSecond, float &bytesPerSecond);
void resetPacketStats(); void resetPacketStats();
NLPacket&& constructPingPacket(PingType_t pingType = PingType::Agnostic); std::unique_ptr<NLPacket> constructPingPacket(PingType_t pingType = PingType::Agnostic);
NLPacket&& constructPingReplyPacket(const QByteArray& pingPacket); std::unique_ptr<NLPacket> constructPingReplyPacket(const QByteArray& pingPacket);
NLPacket&& constructICEPingPacket(PingType_t pingType, const QUuid& iceID); std::unique_ptr<NLPacket> constructICEPingPacket(PingType_t pingType, const QUuid& iceID);
NLPacket&& constructICEPingReplyPacket(const QByteArray& pingPacket, const QUuid& iceID); std::unique_ptr<NLPacket> constructICEPingReplyPacket(const QByteArray& pingPacket, const QUuid& iceID);
virtual bool processSTUNResponse(const QByteArray& packet); virtual bool processSTUNResponse(const QByteArray& packet);

View file

@ -24,7 +24,7 @@ NetworkPacket::NetworkPacket(const NetworkPacket& other) :
} }
NetworkPacket::NetworkPacket(const SharedNodePointer& node, const NLPacket& packet) { NetworkPacket::NetworkPacket(const SharedNodePointer& node, const NLPacket& packet) {
if (packet.getSizeWitHeader() && packet.getSizeWithHeader() <= MAX_nlPacket_SIZE) { if (packet.getSizeWithHeader() && packet.getSizeWithHeader() <= MAX_PACKET_SIZE) {
_node = node; _node = node;
_nlPacket = packet; _nlPacket = packet;
} else { } else {

View file

@ -19,7 +19,7 @@
/// Storage of not-yet processed inbound, or not yet sent outbound generic UDP network packet /// Storage of not-yet processed inbound, or not yet sent outbound generic UDP network packet
class NetworkPacket { class NetworkPacket {
public: public:
NetworkPacket() { } NetworkPacket();
NetworkPacket(const NetworkPacket& packet); // copy constructor NetworkPacket(const NetworkPacket& packet); // copy constructor
NetworkPacket& operator= (const NetworkPacket& other); // copy assignment NetworkPacket& operator= (const NetworkPacket& other); // copy assignment

View file

@ -422,10 +422,10 @@ void NodeList::sendDSPathQuery(const QString& newPath) {
if (numPathBytes + sizeof(numPathBytes) < pathQueryPacket.size() ) { if (numPathBytes + sizeof(numPathBytes) < pathQueryPacket.size() ) {
// append the size of the path to the query packet // append the size of the path to the query packet
pathQueryPacket.write(&numPathBytes, sizeof(numPathBytes)); pathQueryPacket->write(&numPathBytes, sizeof(numPathBytes));
// append the path itself to the query packet // append the path itself to the query packet
pathQueryPacket.write(pathQueryUTF8); pathQueryPacket->write(pathQueryUTF8);
qCDebug(networking) << "Sending a path query packet for path" << newPath << "to domain-server at" qCDebug(networking) << "Sending a path query packet for path" << newPath << "to domain-server at"
<< _domainHandler.getSockAddr(); << _domainHandler.getSockAddr();

View file

@ -38,6 +38,12 @@ std::unique_ptr<Packet> Packet::create(PacketType::Value type, qint64 size) {
return std::unique_ptr<Packet>(new Packet(type, size)); return std::unique_ptr<Packet>(new Packet(type, size));
} }
std::unique_ptr<Packet> Packet::createCopy(const std::unique_ptr<Packet>& other) {
Q_ASSERT(other);
return std::unique_ptr<Packet>(new Packet(*other));
}
qint64 Packet::totalHeadersSize() const { qint64 Packet::totalHeadersSize() const {
return localHeaderSize(); return localHeaderSize();
} }
@ -56,11 +62,11 @@ Packet::Packet(PacketType::Value type, qint64 size) :
Q_ASSERT(size <= maxPayloadSize(type)); Q_ASSERT(size <= maxPayloadSize(type));
// copy packet type and version in header // copy packet type and version in header
setPacketTypeAndVersion(type); writePacketTypeAndVersion(type);
// Set control bit and sequence number to 0 if necessary // Set control bit and sequence number to 0 if necessary
if (SEQUENCE_NUMBERED_PACKETS.contains(type)) { if (SEQUENCE_NUMBERED_PACKETS.contains(type)) {
setSequenceNumber(0); writeSequenceNumber(0);
} }
} }
@ -74,7 +80,6 @@ Packet& Packet::operator=(const Packet& other) {
memcpy(_packet.get(), other._packet.get(), _packetSize); memcpy(_packet.get(), other._packet.get(), _packetSize);
_payloadStart = _packet.get() + (other._payloadStart - other._packet.get()); _payloadStart = _packet.get() + (other._payloadStart - other._packet.get());
_position = other._position;
_capacity = other._capacity; _capacity = other._capacity;
_sizeUsed = other._sizeUsed; _sizeUsed = other._sizeUsed;
@ -91,7 +96,6 @@ Packet& Packet::operator=(Packet&& other) {
_packet = std::move(other._packet); _packet = std::move(other._packet);
_payloadStart = other._payloadStart; _payloadStart = other._payloadStart;
_position = other._position;
_capacity = other._capacity; _capacity = other._capacity;
_sizeUsed = other._sizeUsed; _sizeUsed = other._sizeUsed;
@ -99,15 +103,24 @@ Packet& Packet::operator=(Packet&& other) {
return *this; return *this;
} }
PacketType::Value Packet::getPacketType() const { void Packet::setPacketType(PacketType::Value type) {
auto currentHeaderSize = totalHeadersSize();
_type = type;
writePacketTypeAndVersion(_type);
// Setting new packet type with a different header size not currently supported
Q_ASSERT(currentHeaderSize == totalHeadersSize());
}
PacketType::Value Packet::readPacketType() const {
return (PacketType::Value)arithmeticCodingValueFromBuffer(_packet.get()); return (PacketType::Value)arithmeticCodingValueFromBuffer(_packet.get());
} }
PacketVersion Packet::getPacketTypeVersion() const { PacketVersion Packet::readPacketTypeVersion() const {
return *reinterpret_cast<PacketVersion*>(_packet.get() + numBytesForArithmeticCodedPacketType(_type)); return *reinterpret_cast<PacketVersion*>(_packet.get() + numBytesForArithmeticCodedPacketType(_type));
} }
Packet::SequenceNumber Packet::getSequenceNumber() const { Packet::SequenceNumber Packet::readSequenceNumber() const {
if (SEQUENCE_NUMBERED_PACKETS.contains(_type)) { if (SEQUENCE_NUMBERED_PACKETS.contains(_type)) {
SequenceNumber seqNum = *reinterpret_cast<SequenceNumber*>(_packet.get() + SequenceNumber seqNum = *reinterpret_cast<SequenceNumber*>(_packet.get() +
numBytesForArithmeticCodedPacketType(_type) + numBytesForArithmeticCodedPacketType(_type) +
@ -117,7 +130,7 @@ Packet::SequenceNumber Packet::getSequenceNumber() const {
return -1; return -1;
} }
bool Packet::isControlPacket() const { bool Packet::readIsControlPacket() const {
if (SEQUENCE_NUMBERED_PACKETS.contains(_type)) { if (SEQUENCE_NUMBERED_PACKETS.contains(_type)) {
SequenceNumber seqNum = *reinterpret_cast<SequenceNumber*>(_packet.get() + SequenceNumber seqNum = *reinterpret_cast<SequenceNumber*>(_packet.get() +
numBytesForArithmeticCodedPacketType(_type) + numBytesForArithmeticCodedPacketType(_type) +
@ -127,16 +140,16 @@ bool Packet::isControlPacket() const {
return false; return false;
} }
void Packet::setPacketTypeAndVersion(PacketType::Value type) { void Packet::writePacketTypeAndVersion(PacketType::Value type) {
// Pack the packet type // Pack the packet type
auto offset = packArithmeticallyCodedValue(type, _packet.get()); auto offset = packArithmeticallyCodedValue(type, _packet.get());
// Pack the packet version // Pack the packet version
auto version { versionForPacketType(type) }; auto version = versionForPacketType(type);
memcpy(_packet.get() + offset, &version, sizeof(version)); memcpy(_packet.get() + offset, &version, sizeof(version));
} }
void Packet::setSequenceNumber(SequenceNumber seqNum) { void Packet::writeSequenceNumber(SequenceNumber seqNum) {
// Here we are overriding the control bit to 0. // Here we are overriding the control bit to 0.
// But that is not an issue since we should only ever set the seqNum // But that is not an issue since we should only ever set the seqNum
// for data packets going out // for data packets going out
@ -144,14 +157,6 @@ void Packet::setSequenceNumber(SequenceNumber seqNum) {
&seqNum, sizeof(seqNum)); &seqNum, sizeof(seqNum));
} }
bool Packet::seek(qint64 pos) {
bool valid = (pos >= 0) && (pos < size());
if (valid) {
_position = pos;
}
return valid;
}
static const qint64 PACKET_WRITE_ERROR = -1; static const qint64 PACKET_WRITE_ERROR = -1;
qint64 Packet::writeData(const char* data, qint64 maxSize) { qint64 Packet::writeData(const char* data, qint64 maxSize) {
// make sure we have the space required to write this block // make sure we have the space required to write this block

View file

@ -23,6 +23,8 @@ public:
using SequenceNumber = uint16_t; using SequenceNumber = uint16_t;
static std::unique_ptr<Packet> create(PacketType::Value type, int64_t size = -1); static std::unique_ptr<Packet> create(PacketType::Value type, int64_t size = -1);
// Provided for convenience, try to limit use
static std::unique_ptr<Packet> createCopy(const std::unique_ptr<Packet>& other);
static qint64 localHeaderSize(PacketType::Value type); static qint64 localHeaderSize(PacketType::Value type);
static qint64 maxPayloadSize(PacketType::Value type); static qint64 maxPayloadSize(PacketType::Value type);
@ -30,56 +32,59 @@ public:
virtual qint64 totalHeadersSize() const; // Cumulated size of all the headers virtual qint64 totalHeadersSize() const; // Cumulated size of all the headers
virtual qint64 localHeaderSize() const; // Current level's header size virtual qint64 localHeaderSize() const; // Current level's header size
// Payload direct access, use responsibly! // Payload direct access to the payload, use responsibly!
char* getPayload() { return _payloadStart; } char* getPayload() { return _payloadStart; }
const char* getPayload() const { return _payloadStart; } const char* getPayload() const { return _payloadStart; }
// Return direct access to the entire packet, use responsibly!
char* getData() { return _packet.get(); }
const char* getData() const { return _packet.get(); }
PacketType::Value getPacketType() const { return _type; }
void setPacketType(PacketType::Value type);
qint64 getSizeWithHeader() const { return localHeaderSize() + getSizeUsed(); } qint64 getSizeWithHeader() const { return localHeaderSize() + getSizeUsed(); }
qint64 getSizeUsed() const { return _sizeUsed; } qint64 getSizeUsed() const { return _sizeUsed; }
void setSizeUsed(qint64 sizeUsed) { _sizeUsed = sizeUsed; } void setSizeUsed(qint64 sizeUsed) { _sizeUsed = sizeUsed; }
// Header readers // Header readers
PacketType::Value getPacketType() const; PacketType::Value readPacketType() const;
PacketVersion getPacketTypeVersion() const; PacketVersion readPacketTypeVersion() const;
SequenceNumber getSequenceNumber() const; SequenceNumber readSequenceNumber() const;
bool isControlPacket() const; bool readIsControlPacket() const;
// QIODevice virtual functions // QIODevice virtual functions
// WARNING: Those methods all refer to the payload ONLY and NOT the entire packet // WARNING: Those methods all refer to the payload ONLY and NOT the entire packet
virtual bool atEnd() const { return _position == _capacity; }
virtual qint64 bytesAvailable() const { return size() - pos(); }
virtual bool canReadLine() const { return false; }
virtual bool isSequential() const { return false; } virtual bool isSequential() const { return false; }
virtual qint64 pos() const { return _position; } virtual bool reset() { setSizeUsed(0); return QIODevice::reset(); }
virtual bool reset() { return seek(0); }
virtual bool seek(qint64 pos);
virtual qint64 size() const { return _capacity; } virtual qint64 size() const { return _capacity; }
protected: protected:
Packet(PacketType::Value type, int64_t size); Packet(PacketType::Value type, int64_t size);
Packet(const Packet& other);
Packet& operator=(const Packet& other);
Packet(Packet&& other);
Packet& operator=(Packet&& other);
// QIODevice virtual functions // QIODevice virtual functions
virtual qint64 writeData(const char* data, qint64 maxSize); virtual qint64 writeData(const char* data, qint64 maxSize);
virtual qint64 readData(char* data, qint64 maxSize); virtual qint64 readData(char* data, qint64 maxSize);
// Header writers // Header writers
void setPacketTypeAndVersion(PacketType::Value type); void writePacketTypeAndVersion(PacketType::Value type);
void setSequenceNumber(SequenceNumber seqNum); void writeSequenceNumber(SequenceNumber seqNum);
PacketType::Value _type; PacketType::Value _type; // Packet type
qint64 _packetSize = 0; // Total size of the allocated memory qint64 _packetSize = 0; // Total size of the allocated memory
std::unique_ptr<char> _packet; // Allocated memory std::unique_ptr<char> _packet; // Allocated memory
char* _payloadStart = nullptr; // Start of the payload char* _payloadStart = nullptr; // Start of the payload
qint64 _position = 0; // Current position in the payload
qint64 _capacity = 0; // Total capacity of the payload qint64 _capacity = 0; // Total capacity of the payload
qint64 _sizeUsed = 0; // How much of the payload is actually used qint64 _sizeUsed = 0; // How much of the payload is actually used
private:
Packet(const Packet& other);
Packet& operator=(const Packet& other);
Packet(Packet&& other);
Packet& operator=(Packet&& other);
}; };
#endif // hifi_Packet_h #endif // hifi_Packet_h

View file

@ -82,17 +82,23 @@ PacketVersion versionForPacketType(PacketType::Value packetType) {
case DomainList: case DomainList:
case DomainListRequest: case DomainListRequest:
return 5; return 5;
case DomainConnectRequest:
return 1;
case CreateAssignment: case CreateAssignment:
case RequestAssignment: case RequestAssignment:
return 2; return 2;
case OctreeStats: case OctreeStats:
return 1; return 1;
case OctreeDataNack:
return 1;
case StopNode: case StopNode:
return 1; return 1;
case EntityAdd: case EntityAdd:
case EntityEdit: case EntityEdit:
case EntityData: case EntityData:
return VERSION_ENTITIES_HAVE_SIMULATION_OWNER_AND_ACTIONS_OVER_WIRE; return VERSION_ENTITIES_HAVE_SIMULATION_OWNER_AND_ACTIONS_OVER_WIRE;
case EntityEditNack:
return 1;
case EntityErase: case EntityErase:
return 2; return 2;
case AudioStreamStats: case AudioStreamStats:

View file

@ -52,8 +52,8 @@ namespace PacketType {
DomainServerPathQuery, DomainServerPathQuery,
DomainServerPathResponse, DomainServerPathResponse,
DomainServerAddedNode, DomainServerAddedNode,
IceServerPeerInformation, ICEServerPeerInformation,
IceServerQuery, // 25 ICEServerQuery, // 25
OctreeStats, OctreeStats,
Jurisdiction, Jurisdiction,
JurisdictionRequest, JurisdictionRequest,

View file

@ -12,18 +12,17 @@
#include "PacketList.h" #include "PacketList.h"
PacketList::PacketList(PacketType::Value packetType) : PacketList::PacketList(PacketType::Value packetType) :
_packetType(packetType), _packetType(packetType)
_isOrdered(false)
{ {
} }
void PacketList::createPacketWithExtendedHeader() { void PacketList::createPacketWithExtendedHeader() {
// use the static create method to create a new packet // use the static create method to create a new packet
_currentPacket = T::create(_packetType); auto packet = T::create(_packetType);
// add the extended header to the front of the packet // add the extended header to the front of the packet
if (_currentPacket.write(_extendedHeader) == -1) { if (packet->write(_extendedHeader) == -1) {
qDebug() << "Could not write extendedHeader in PacketList::createPacketWithExtendedHeader" qDebug() << "Could not write extendedHeader in PacketList::createPacketWithExtendedHeader"
<< "- make sure that _extendedHeader is not larger than the payload capacity."; << "- make sure that _extendedHeader is not larger than the payload capacity.";
} }
@ -32,7 +31,7 @@ void PacketList::createPacketWithExtendedHeader() {
qint64 writeData(const char* data, qint64 maxSize) { qint64 writeData(const char* data, qint64 maxSize) {
if (!_currentPacket) { if (!_currentPacket) {
// we don't have a current packet, time to set one up // we don't have a current packet, time to set one up
createPacketWithExtendedHeader(); _currentPacket = createPacketWithExtendedHeader();
} }
// check if this block of data can fit into the currentPacket // check if this block of data can fit into the currentPacket
@ -46,7 +45,7 @@ qint64 writeData(const char* data, qint64 maxSize) {
// it does not fit - this may need to be in the next packet // it does not fit - this may need to be in the next packet
if (!_isOrdered) { if (!_isOrdered) {
auto newPacket = T::create(_packetType); auto newPacket = createPacketWithExtendedHeader();
if (_segmentStartIndex >= 0) { if (_segmentStartIndex >= 0) {
// We in the process of writing a segment for an unordered PacketList. // We in the process of writing a segment for an unordered PacketList.
@ -64,7 +63,7 @@ qint64 writeData(const char* data, qint64 maxSize) {
} }
// copy from currentPacket where the segment started to the beginning of the newPacket // copy from currentPacket where the segment started to the beginning of the newPacket
newPacket.write(currentPacket.constData() + _segmentStartIndex, numBytesToEnd); newPacket.write(currentPacket->getPayload() + _segmentStartIndex, numBytesToEnd);
// the current segment now starts at the beginning of the new packet // the current segment now starts at the beginning of the new packet
_segmentStartIndex = 0; _segmentStartIndex = 0;
@ -89,7 +88,7 @@ qint64 writeData(const char* data, qint64 maxSize) {
// into a new packet // into a new packet
int numBytesToEnd = _currentPacket.size() - _currentPacket.sizeUsed(); int numBytesToEnd = _currentPacket.size() - _currentPacket.sizeUsed();
_currentPacket.write(data, numBytesToEnd); _currentPacket->write(data, numBytesToEnd);
// move the current packet to our list of packets // move the current packet to our list of packets
_packets.insert(std::move(_currentPacket)); _packets.insert(std::move(_currentPacket));

View file

@ -12,7 +12,9 @@
#ifndef hifi_PacketList_h #ifndef hifi_PacketList_h
#define hifi_PacketList_h #define hifi_PacketList_h
#pragma once #include <QIODevice>
#include "PacketHeaders.h"
template <class T> class PacketList : public QIODevice { template <class T> class PacketList : public QIODevice {
public: public:
@ -20,7 +22,7 @@ public:
virtual bool isSequential() const { return true; } virtual bool isSequential() const { return true; }
void startSegment() { _segmentStartIndex = currentPacket->payload().pos(); } void startSegment() { _segmentStartIndex = _currentPacket->payload().pos(); }
void endSegment() { _segmentStartIndex = -1; } void endSegment() { _segmentStartIndex = -1; }
int getNumPackets() const { return _packets.size() + (_currentPacket ? 1 : 0); } int getNumPackets() const { return _packets.size() + (_currentPacket ? 1 : 0); }
@ -30,19 +32,19 @@ public:
void setExtendedHeader(const QByteArray& extendedHeader) { _extendedHeader = extendedHeader; } void setExtendedHeader(const QByteArray& extendedHeader) { _extendedHeader = extendedHeader; }
protected: protected:
qint64 writeData(const char* data, qint64 maxSize); qint64 writeData(const char* data, qint64 maxSize);
qint64 readData(const char* data, qint64 maxSize) { return 0 }; qint64 readData(const char* data, qint64 maxSize) { return 0; };
private: private:
void createPacketWithExtendedHeader(); std::unique_ptr<NLPacket> createPacketWithExtendedHeader();
PacketType::Value _packetType; PacketType::Value _packetType;
bool isOrdered; bool isOrdered = false;
std::unique_ptr<T> _currentPacket; std::unique_ptr<T> _currentPacket;
std::list<std::unique_ptr<T>> _packets; std::list<std::unique_ptr<T>> _packets;
int _segmentStartIndex = -1; int _segmentStartIndex = -1;
QByteArray _extendedHeader = extendedHeader; QByteArray _extendedHeader;
} };
#endif // hifi_PacketList_h #endif // hifi_PacketList_h

View file

@ -30,7 +30,7 @@ void SentPacketHistory::packetSent(uint16_t sequenceNumber, const NLPacket& pack
<< "Expected:" << expectedSequenceNumber << "Actual:" << sequenceNumber; << "Expected:" << expectedSequenceNumber << "Actual:" << sequenceNumber;
} }
_newestSequenceNumber = sequenceNumber; _newestSequenceNumber = sequenceNumber;
_sentPackets.insert(new NLPacket(packet)); _sentPackets.insert(NLPacket::createCopy(packet));
} }
const QByteArray* SentPacketHistory::getPacket(uint16_t sequenceNumber) const { const QByteArray* SentPacketHistory::getPacket(uint16_t sequenceNumber) const {

View file

@ -23,10 +23,10 @@ public:
SentPacketHistory(int size = MAX_REASONABLE_SEQUENCE_GAP); SentPacketHistory(int size = MAX_REASONABLE_SEQUENCE_GAP);
void packetSent(uint16_t sequenceNumber, const NLPacket& packet); void packetSent(uint16_t sequenceNumber, const NLPacket& packet);
const NLPacket* getPacket(uint16_t sequenceNumber) const; const std::unique_ptr<NLPacket>& getPacket(uint16_t sequenceNumber) const;
private: private:
RingBufferHistory<NLPacket*> _sentPackets; // circular buffer RingBufferHistory<std::unique_ptr<NLPacket>> _sentPackets; // circular buffer
uint16_t _newestSequenceNumber; uint16_t _newestSequenceNumber;
}; };

View file

@ -30,7 +30,7 @@ OctreeEditPacketSender::OctreeEditPacketSender() :
_maxPacketSize(MAX_PACKET_SIZE), _maxPacketSize(MAX_PACKET_SIZE),
_destinationWalletUUID() _destinationWalletUUID()
{ {
} }
OctreeEditPacketSender::~OctreeEditPacketSender() { OctreeEditPacketSender::~OctreeEditPacketSender() {
@ -52,10 +52,10 @@ OctreeEditPacketSender::~OctreeEditPacketSender() {
bool OctreeEditPacketSender::serversExist() const { bool OctreeEditPacketSender::serversExist() const {
bool hasServers = false; bool hasServers = false;
bool atLeastOneJurisdictionMissing = false; // assume the best bool atLeastOneJurisdictionMissing = false; // assume the best
DependencyManager::get<NodeList>()->eachNodeBreakable([&](const SharedNodePointer& node){ DependencyManager::get<NodeList>()->eachNodeBreakable([&](const SharedNodePointer& node){
if (node->getType() == getMyNodeType() && node->getActiveSocket()) { if (node->getType() == getMyNodeType() && node->getActiveSocket()) {
QUuid nodeUUID = node->getUUID(); QUuid nodeUUID = node->getUUID();
// If we've got Jurisdictions set, then check to see if we know the jurisdiction for this server // If we've got Jurisdictions set, then check to see if we know the jurisdiction for this server
if (_serverJurisdictions) { if (_serverJurisdictions) {
@ -69,7 +69,7 @@ bool OctreeEditPacketSender::serversExist() const {
} }
hasServers = true; hasServers = true;
} }
if (atLeastOneJurisdictionMissing) { if (atLeastOneJurisdictionMissing) {
return false; // no point in looking further - return false from anonymous function return false; // no point in looking further - return false from anonymous function
} else { } else {
@ -91,27 +91,27 @@ void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, unsigned c
if (node->getType() == getMyNodeType() if (node->getType() == getMyNodeType()
&& ((node->getUUID() == nodeUUID) || (nodeUUID.isNull())) && ((node->getUUID() == nodeUUID) || (nodeUUID.isNull()))
&& node->getActiveSocket()) { && node->getActiveSocket()) {
// pack sequence number // pack sequence number
int numBytesPacketHeader = numBytesForPacketHeader(reinterpret_cast<char*>(buffer)); int numBytesPacketHeader = numBytesForPacketHeader(reinterpret_cast<char*>(buffer));
unsigned char* sequenceAt = buffer + numBytesPacketHeader; unsigned char* sequenceAt = buffer + numBytesPacketHeader;
quint16 sequence = _outgoingSequenceNumbers[nodeUUID]++; quint16 sequence = _outgoingSequenceNumbers[nodeUUID]++;
memcpy(sequenceAt, &sequence, sizeof(quint16)); memcpy(sequenceAt, &sequence, sizeof(quint16));
// send packet // send packet
QByteArray packet(reinterpret_cast<const char*>(buffer), length); QByteArray packet(reinterpret_cast<const char*>(buffer), length);
queuePacketForSending(node, packet); queuePacketForSending(node, packet);
if (hasDestinationWalletUUID() && satoshiCost > 0) { if (hasDestinationWalletUUID() && satoshiCost > 0) {
// if we have a destination wallet UUID and a cost associated with this packet, signal that it // if we have a destination wallet UUID and a cost associated with this packet, signal that it
// needs to be sent // needs to be sent
emit octreePaymentRequired(satoshiCost, nodeUUID, _destinationWalletUUID); emit octreePaymentRequired(satoshiCost, nodeUUID, _destinationWalletUUID);
} }
// add packet to history // add packet to history
_sentPacketHistories[nodeUUID].packetSent(sequence, packet); _sentPacketHistories[nodeUUID].packetSent(sequence, packet);
// debugging output... // debugging output...
if (wantDebug) { if (wantDebug) {
int numBytesPacketHeader = numBytesForPacketHeader(reinterpret_cast<const char*>(buffer)); int numBytesPacketHeader = numBytesForPacketHeader(reinterpret_cast<const char*>(buffer));
@ -119,7 +119,7 @@ void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, unsigned c
quint64 createdAt = (*((quint64*)(buffer + numBytesPacketHeader + sizeof(sequence)))); quint64 createdAt = (*((quint64*)(buffer + numBytesPacketHeader + sizeof(sequence))));
quint64 queuedAt = usecTimestampNow(); quint64 queuedAt = usecTimestampNow();
quint64 transitTime = queuedAt - createdAt; quint64 transitTime = queuedAt - createdAt;
qCDebug(octree) << "OctreeEditPacketSender::queuePacketToNode() queued " << buffer[0] << qCDebug(octree) << "OctreeEditPacketSender::queuePacketToNode() queued " << buffer[0] <<
" - command to node bytes=" << length << " - command to node bytes=" << length <<
" satoshiCost=" << satoshiCost << " satoshiCost=" << satoshiCost <<
@ -192,7 +192,7 @@ void OctreeEditPacketSender::queuePacketToNodes(unsigned char* buffer, size_t le
// But we can't really do that with a packed message, since each edit message could be destined // But we can't really do that with a packed message, since each edit message could be destined
// for a different server... So we need to actually manage multiple queued packets... one // for a different server... So we need to actually manage multiple queued packets... one
// for each server // for each server
DependencyManager::get<NodeList>()->eachNode([&](const SharedNodePointer& node){ DependencyManager::get<NodeList>()->eachNode([&](const SharedNodePointer& node){
// only send to the NodeTypes that are getMyNodeType() // only send to the NodeTypes that are getMyNodeType()
if (node->getActiveSocket() && node->getType() == getMyNodeType()) { if (node->getActiveSocket() && node->getType() == getMyNodeType()) {
@ -251,7 +251,7 @@ void OctreeEditPacketSender::queueOctreeEditMessage(PacketType::Value type, unsi
if (node->getActiveSocket() && node->getType() == getMyNodeType()) { if (node->getActiveSocket() && node->getType() == getMyNodeType()) {
QUuid nodeUUID = node->getUUID(); QUuid nodeUUID = node->getUUID();
bool isMyJurisdiction = true; bool isMyJurisdiction = true;
if (type == PacketTypeEntityErase) { if (type == PacketTypeEntityErase) {
isMyJurisdiction = true; // send erase messages to all servers isMyJurisdiction = true; // send erase messages to all servers
} else if (_serverJurisdictions) { } else if (_serverJurisdictions) {
@ -269,19 +269,19 @@ void OctreeEditPacketSender::queueOctreeEditMessage(PacketType::Value type, unsi
if (isMyJurisdiction) { if (isMyJurisdiction) {
EditPacketBuffer& packetBuffer = _pendingEditPackets[nodeUUID]; EditPacketBuffer& packetBuffer = _pendingEditPackets[nodeUUID];
packetBuffer._nodeUUID = nodeUUID; packetBuffer._nodeUUID = nodeUUID;
// If we're switching type, then we send the last one and start over // If we're switching type, then we send the last one and start over
if ((type != packetBuffer._currentType && packetBuffer._currentSize > 0) || if ((type != packetBuffer._currentType && packetBuffer._currentSize > 0) ||
(packetBuffer._currentSize + length >= (size_t)_maxPacketSize)) { (packetBuffer._currentSize + length >= (size_t)_maxPacketSize)) {
releaseQueuedPacket(packetBuffer); releaseQueuedPacket(packetBuffer);
initializePacket(packetBuffer, type, node->getClockSkewUsec()); initializePacket(packetBuffer, type, node->getClockSkewUsec());
} }
// If the buffer is empty and not correctly initialized for our type... // If the buffer is empty and not correctly initialized for our type...
if (type != packetBuffer._currentType && packetBuffer._currentSize == 0) { if (type != packetBuffer._currentType && packetBuffer._currentSize == 0) {
initializePacket(packetBuffer, type, node->getClockSkewUsec()); initializePacket(packetBuffer, type, node->getClockSkewUsec());
} }
// This is really the first time we know which server/node this particular edit message // This is really the first time we know which server/node this particular edit message
// is going to, so we couldn't adjust for clock skew till now. But here's our chance. // is going to, so we couldn't adjust for clock skew till now. But here's our chance.
// We call this virtual function that allows our specific type of EditPacketSender to // We call this virtual function that allows our specific type of EditPacketSender to
@ -289,7 +289,7 @@ void OctreeEditPacketSender::queueOctreeEditMessage(PacketType::Value type, unsi
if (node->getClockSkewUsec() != 0) { if (node->getClockSkewUsec() != 0) {
adjustEditPacketForClockSkew(type, editPacketBuffer, length, node->getClockSkewUsec()); adjustEditPacketForClockSkew(type, editPacketBuffer, length, node->getClockSkewUsec());
} }
memcpy(&packetBuffer._currentBuffer[packetBuffer._currentSize], editPacketBuffer, length); memcpy(&packetBuffer._currentBuffer[packetBuffer._currentSize], editPacketBuffer, length);
packetBuffer._currentSize += length; packetBuffer._currentSize += length;
packetBuffer._satoshiCost += satoshiCost; packetBuffer._satoshiCost += satoshiCost;
@ -341,7 +341,7 @@ void OctreeEditPacketSender::initializePacket(EditPacketBuffer& packetBuffer, Pa
packetBuffer._currentSize += sizeof(quint64); // nudge past timestamp packetBuffer._currentSize += sizeof(quint64); // nudge past timestamp
packetBuffer._currentType = type; packetBuffer._currentType = type;
// reset cost for packet to 0 // reset cost for packet to 0
packetBuffer._satoshiCost = 0; packetBuffer._satoshiCost = 0;
} }
@ -360,20 +360,22 @@ bool OctreeEditPacketSender::process() {
void OctreeEditPacketSender::processNackPacket(const QByteArray& packet) { void OctreeEditPacketSender::processNackPacket(const QByteArray& packet) {
// parse sending node from packet, retrieve packet history for that node // parse sending node from packet, retrieve packet history for that node
QUuid sendingNodeUUID = uuidFromPacketHeader(packet); QUuid sendingNodeUUID = uuidFromPacketHeader(packet);
// if packet history doesn't exist for the sender node (somehow), bail // if packet history doesn't exist for the sender node (somehow), bail
if (!_sentPacketHistories.contains(sendingNodeUUID)) { if (!_sentPacketHistories.contains(sendingNodeUUID)) {
return; return;
} }
const SentPacketHistory& sentPacketHistory = _sentPacketHistories.value(sendingNodeUUID); const SentPacketHistory& sentPacketHistory = _sentPacketHistories.value(sendingNodeUUID);
// TODO: these NAK packets no longer send the number of sequence numbers - just read out sequence numbers in blocks
int numBytesPacketHeader = numBytesForPacketHeader(packet); int numBytesPacketHeader = numBytesForPacketHeader(packet);
const unsigned char* dataAt = reinterpret_cast<const unsigned char*>(packet.data()) + numBytesPacketHeader; const unsigned char* dataAt = reinterpret_cast<const unsigned char*>(packet.data()) + numBytesPacketHeader;
// read number of sequence numbers // read number of sequence numbers
uint16_t numSequenceNumbers = (*(uint16_t*)dataAt); uint16_t numSequenceNumbers = (*(uint16_t*)dataAt);
dataAt += sizeof(uint16_t); dataAt += sizeof(uint16_t);
// read sequence numbers and queue packets for resend // read sequence numbers and queue packets for resend
for (int i = 0; i < numSequenceNumbers; i++) { for (int i = 0; i < numSequenceNumbers; i++) {
unsigned short int sequenceNumber = (*(unsigned short int*)dataAt); unsigned short int sequenceNumber = (*(unsigned short int*)dataAt);