Merge pull request #13 from huffman/ordered-sending

Ordered sending
This commit is contained in:
Stephen Birarda 2015-08-24 16:41:50 -07:00
commit 9275b954f4
34 changed files with 699 additions and 157 deletions

View file

@ -96,14 +96,11 @@ AudioMixer::AudioMixer(NLPacket& packet) :
// SOON
auto& packetReceiver = DependencyManager::get<NodeList>()->getPacketReceiver();
QSet<PacketType> nodeAudioPackets {
PacketType::MicrophoneAudioNoEcho, PacketType::MicrophoneAudioWithEcho,
PacketType::InjectAudio, PacketType::SilentAudioFrame,
PacketType::AudioStreamStats
};
packetReceiver.registerListenerForTypes(nodeAudioPackets, this, "handleNodeAudioPacket");
packetReceiver.registerListenerForTypes({ PacketType::MicrophoneAudioNoEcho, PacketType::MicrophoneAudioWithEcho,
PacketType::InjectAudio, PacketType::SilentAudioFrame,
PacketType::AudioStreamStats },
this, "handleNodeAudioPacket");
packetReceiver.registerListener(PacketType::MuteEnvironment, this, "handleMuteEnvironmentPacket");
}

View file

@ -110,6 +110,11 @@ DomainServer::DomainServer(int argc, char* argv[]) :
}
}
DomainServer::~DomainServer() {
// destroy the LimitedNodeList before the DomainServer QCoreApplication is down
DependencyManager::destroy<LimitedNodeList>();
}
void DomainServer::aboutToQuit() {
// clear the log handler so that Qt doesn't call the destructor on LogHandler
@ -289,6 +294,9 @@ void DomainServer::setupNodeListAndAssignments(const QUuid& sessionUUID) {
packetReceiver.registerListener(PacketType::ICEPing, this, "processICEPingPacket");
packetReceiver.registerListener(PacketType::ICEPingReply, this, "processICEPingReplyPacket");
packetReceiver.registerListener(PacketType::ICEServerPeerInformation, this, "processICEPeerInformationPacket");
// NodeList won't be available to the settings manager when it is created, so call registerListener here
packetReceiver.registerListener(PacketType::DomainSettingsRequest, &_settingsManager, "processSettingsRequestPacket");
// add whatever static assignments that have been parsed to the queue
addStaticAssignmentsToQueue();

View file

@ -38,7 +38,8 @@ class DomainServer : public QCoreApplication, public HTTPSRequestHandler {
Q_OBJECT
public:
DomainServer(int argc, char* argv[]);
~DomainServer();
static int const EXIT_CODE_REBOOT;
bool handleHTTPRequest(HTTPConnection* connection, const QUrl& url, bool skipSubHandler = false);

View file

@ -22,6 +22,7 @@
#include <Assignment.h>
#include <HifiConfigVariantMap.h>
#include <HTTPConnection.h>
#include <NLPacketList.h>
#include "DomainServerSettingsManager.h"
@ -66,6 +67,21 @@ DomainServerSettingsManager::DomainServerSettingsManager() :
QMetaObject::invokeMethod(QCoreApplication::instance(), "quit", Qt::QueuedConnection);
}
void DomainServerSettingsManager::processSettingsRequestPacket(QSharedPointer<NLPacket> packet) {
Assignment::Type type;
packet->readPrimitive(&type);
QJsonObject responseObject = responseObjectForType(QString::number(type));
auto json = QJsonDocument(responseObject).toJson();
auto packetList = std::unique_ptr<NLPacketList>(new NLPacketList(PacketType::DomainSettings, QByteArray(), true, true));
packetList->write(json);
auto nodeList = DependencyManager::get<LimitedNodeList>();
nodeList->sendPacketList(std::move(packetList), packet->getSenderSockAddr());
}
void DomainServerSettingsManager::setupConfigMap(const QStringList& argumentList) {
_configMap.loadMasterAndUserConfig(argumentList);

View file

@ -18,6 +18,8 @@
#include <HifiConfigVariantMap.h>
#include <HTTPManager.h>
#include <NLPacket.h>
const QString SETTINGS_PATHS_KEY = "paths";
const QString SETTINGS_PATH = "/settings";
@ -38,6 +40,10 @@ public:
QVariantMap& getUserSettingsMap() { return _configMap.getUserConfig(); }
QVariantMap& getSettingsMap() { return _configMap.getMergedConfig(); }
private slots:
void processSettingsRequestPacket(QSharedPointer<NLPacket> packet);
private:
QJsonObject responseObjectForType(const QString& typeValue, bool isAuthenticated = false);
void recurseJSONObjectAndOverwriteSettings(const QJsonObject& postedObject, QVariantMap& settingsVariant);

View file

@ -18,13 +18,10 @@
OctreePacketProcessor::OctreePacketProcessor() {
auto& packetReceiver = DependencyManager::get<NodeList>()->getPacketReceiver();
QSet<PacketType> types {
PacketType::OctreeStats, PacketType::EntityData,
PacketType::EntityErase, PacketType::OctreeStats
};
packetReceiver.registerDirectListenerForTypes(types, this, "handleOctreePacket");
packetReceiver.registerDirectListenerForTypes({ PacketType::OctreeStats, PacketType::EntityData,
PacketType::EntityErase, PacketType::OctreeStats },
this, "handleOctreePacket");
}
void OctreePacketProcessor::handleOctreePacket(QSharedPointer<NLPacket> packet, SharedNodePointer senderNode) {

View file

@ -159,3 +159,10 @@ QDataStream& operator>>(QDataStream &in, Assignment& assignment) {
return in;
}
uint qHash(const Assignment::Type& key, uint seed) {
// seems odd that Qt couldn't figure out this cast itself, but this fixes a compile error after switch to
// strongly typed enum for PacketType
return qHash((uint8_t) key, seed);
}

View file

@ -25,7 +25,7 @@ class Assignment : public NodeData {
Q_OBJECT
public:
enum Type {
enum Type : uint8_t {
AudioMixerType = 0,
AvatarMixerType = 1,
AgentType = 2,
@ -100,4 +100,6 @@ protected:
QUuid _walletUUID; /// the UUID for the wallet that should be paid for this assignment
};
uint qHash(const Assignment::Type& key, uint seed);
#endif // hifi_Assignment_h

View file

@ -17,7 +17,9 @@
#include "Assignment.h"
#include "HifiSockAddr.h"
#include "NodeList.h"
#include "udt/Packet.h"
#include "udt/PacketHeaders.h"
#include "NLPacket.h"
#include "SharedUtil.h"
#include "UserActivityLogger.h"
#include "NetworkLogging.h"
@ -39,7 +41,7 @@ DomainHandler::DomainHandler(QObject* parent) :
_failedSettingsRequests(0)
{
_sockAddr.setObjectName("DomainServer");
// if we get a socket that make sure our NetworkPeer ping timer stops
connect(this, &DomainHandler::completedSocketDiscovery, &_icePeer, &NetworkPeer::stopPingTimer);
}
@ -233,21 +235,15 @@ void DomainHandler::requestDomainSettings() {
emit settingsReceived(_settingsObject);
} else {
if (_settingsObject.isEmpty()) {
// setup the URL required to grab settings JSON
QUrl settingsJSONURL;
settingsJSONURL.setScheme("http");
settingsJSONURL.setHost(_hostname);
settingsJSONURL.setPort(DOMAIN_SERVER_HTTP_PORT);
settingsJSONURL.setPath("/settings.json");
qCDebug(networking) << "Requesting settings from domain server";
Assignment::Type assignmentType = Assignment::typeForNodeType(DependencyManager::get<NodeList>()->getOwnerType());
settingsJSONURL.setQuery(QString("type=%1").arg(assignmentType));
qCDebug(networking) << "Requesting domain-server settings at" << settingsJSONURL.toString();
auto packet = NLPacket::create(PacketType::DomainSettingsRequest, sizeof(assignmentType), true, false);
packet->writePrimitive(assignmentType);
QNetworkRequest settingsRequest(settingsJSONURL);
settingsRequest.setHeader(QNetworkRequest::UserAgentHeader, HIGH_FIDELITY_USER_AGENT);
QNetworkReply* reply = NetworkAccessManager::getInstance().get(settingsRequest);
connect(reply, &QNetworkReply::finished, this, &DomainHandler::settingsRequestFinished);
auto nodeList = DependencyManager::get<LimitedNodeList>();
nodeList->sendPacket(std::move(packet), _sockAddr);
}
}
}
@ -286,6 +282,19 @@ void DomainHandler::settingsRequestFinished() {
settingsReply->deleteLater();
}
void DomainHandler::processSettingsPacketList(QSharedPointer<NLPacketList> packetList) {
auto data = packetList->getMessage();
_settingsObject = QJsonDocument::fromJson(data).object();
qCDebug(networking) << "Received domain settings: \n" << QString(data);
// reset failed settings requests to 0, we got them
_failedSettingsRequests = 0;
emit settingsReceived(_settingsObject);
}
void DomainHandler::processICEPingReplyPacket(QSharedPointer<NLPacket> packet) {
const HifiSockAddr& senderSockAddr = packet->getSenderSockAddr();
qCDebug(networking) << "Received reply from domain-server on" << senderSockAddr;

View file

@ -22,6 +22,8 @@
#include "HifiSockAddr.h"
#include "NetworkPeer.h"
#include "NLPacket.h"
#include "NLPacketList.h"
#include "Node.h"
const unsigned short DEFAULT_DOMAIN_SERVER_PORT = 40102;
const unsigned short DEFAULT_DOMAIN_SERVER_DTLS_PORT = 40103;
@ -85,6 +87,7 @@ public slots:
void setHostnameAndPort(const QString& hostname, quint16 port = DEFAULT_DOMAIN_SERVER_PORT);
void setIceServerHostnameAndID(const QString& iceServerHostname, const QUuid& id);
void processSettingsPacketList(QSharedPointer<NLPacketList> packetList);
void processICEPingReplyPacket(QSharedPointer<NLPacket> packet);
void processDTLSRequirementPacket(QSharedPointer<NLPacket> dtlsRequirementPacket);
void processICEResponsePacket(QSharedPointer<NLPacket> icePacket);

View file

@ -95,6 +95,7 @@ LimitedNodeList::LimitedNodeList(unsigned short socketListenPort, unsigned short
// set &PacketReceiver::handleVerifiedPacket as the verified packet callback for the udt::Socket
using std::placeholders::_1;
_nodeSocket.setPacketHandler(std::bind(&PacketReceiver::handleVerifiedPacket, _packetReceiver, _1));
_nodeSocket.setPacketListHandler(std::bind(&PacketReceiver::handleVerifiedPacketList, _packetReceiver, _1));
// set our isPacketVerified method as the verify operator for the udt::Socket
_nodeSocket.setPacketFilterOperator(std::bind(&LimitedNodeList::isPacketVerified, this, _1));
@ -258,6 +259,7 @@ void LimitedNodeList::fillPacketHeader(const NLPacket& packet, const QUuid& conn
}
qint64 LimitedNodeList::sendUnreliablePacket(const NLPacket& packet, const Node& destinationNode) {
Q_ASSERT(!packet.isPartOfMessage());
if (!destinationNode.getActiveSocket()) {
return 0;
}
@ -267,6 +269,7 @@ qint64 LimitedNodeList::sendUnreliablePacket(const NLPacket& packet, const Node&
qint64 LimitedNodeList::sendUnreliablePacket(const NLPacket& packet, const HifiSockAddr& sockAddr,
const QUuid& connectionSecret) {
Q_ASSERT(!packet.isPartOfMessage());
Q_ASSERT_X(!packet.isReliable(), "LimitedNodeList::sendUnreliablePacket",
"Trying to send a reliable packet unreliably.");
@ -277,6 +280,7 @@ qint64 LimitedNodeList::sendUnreliablePacket(const NLPacket& packet, const HifiS
}
qint64 LimitedNodeList::sendPacket(std::unique_ptr<NLPacket> packet, const Node& destinationNode) {
Q_ASSERT(!packet->isPartOfMessage());
if (!destinationNode.getActiveSocket()) {
return 0;
}
@ -286,6 +290,7 @@ qint64 LimitedNodeList::sendPacket(std::unique_ptr<NLPacket> packet, const Node&
qint64 LimitedNodeList::sendPacket(std::unique_ptr<NLPacket> packet, const HifiSockAddr& sockAddr,
const QUuid& connectionSecret) {
Q_ASSERT(!packet->isPartOfMessage());
if (packet->isReliable()) {
collectPacketStats(*packet);
fillPacketHeader(*packet, connectionSecret);
@ -332,6 +337,13 @@ qint64 LimitedNodeList::sendPacketList(NLPacketList& packetList, const HifiSockA
return bytesSent;
}
qint64 LimitedNodeList::sendPacketList(std::unique_ptr<NLPacketList> packetList, const HifiSockAddr& sockAddr) {
// close the last packet in the list
packetList->closeCurrentPacket();
return _nodeSocket.writePacketList(std::move(packetList), sockAddr);
}
qint64 LimitedNodeList::sendPacket(std::unique_ptr<NLPacket> packet, const Node& destinationNode,
const HifiSockAddr& overridenSockAddr) {
// use the node's active socket as the destination socket if there is no overriden socket address

View file

@ -127,6 +127,7 @@ public:
qint64 sendPacketList(NLPacketList& packetList, const Node& destinationNode);
qint64 sendPacketList(NLPacketList& packetList, const HifiSockAddr& sockAddr,
const QUuid& connectionSecret = QUuid());
qint64 sendPacketList(std::unique_ptr<NLPacketList> packetList, const HifiSockAddr& sockAddr);
void (*linkedDataCreateCallback)(Node *);

View file

@ -54,7 +54,7 @@ std::unique_ptr<NLPacket> NLPacket::fromBase(std::unique_ptr<Packet> packet) {
Q_ASSERT(packet);
// call our constructor to create an NLPacket from this Packet
return std::unique_ptr<NLPacket>(new NLPacket(std::move(packet)));
return std::unique_ptr<NLPacket>(new NLPacket(std::move(*packet)));
}
std::unique_ptr<NLPacket> NLPacket::createCopy(const NLPacket& other) {
@ -71,8 +71,8 @@ NLPacket::NLPacket(PacketType type, qint64 size, bool isReliable, bool isPartOfM
writeTypeAndVersion();
}
NLPacket::NLPacket(std::unique_ptr<Packet> packet) :
Packet(std::move(*packet.release()))
NLPacket::NLPacket(Packet&& packet) :
Packet(std::move(packet))
{
readType();
readVersion();

View file

@ -63,10 +63,10 @@ protected:
NLPacket(PacketType type, qint64 size = -1, bool forceReliable = false, bool isPartOfMessage = false);
NLPacket(std::unique_ptr<char[]> data, qint64 size, const HifiSockAddr& senderSockAddr);
NLPacket(std::unique_ptr<Packet> packet);
NLPacket(const NLPacket& other);
NLPacket(NLPacket&& other);
NLPacket(Packet&& other);
NLPacket& operator=(const NLPacket& other);
NLPacket& operator=(NLPacket&& other);

View file

@ -11,15 +11,27 @@
#include "NLPacketList.h"
#include "NLPacket.h"
#include "udt/Packet.h"
NLPacketList::NLPacketList(PacketType packetType, QByteArray extendedHeader) :
PacketList(packetType, extendedHeader)
NLPacketList::NLPacketList(PacketType packetType, QByteArray extendedHeader, bool isReliable, bool isOrdered) :
PacketList(packetType, extendedHeader, isReliable, isOrdered)
{
}
NLPacketList::NLPacketList(PacketList&& other) : PacketList(other.getType(), other.getExtendedHeader(), other.isReliable(), other.isOrdered()) {
// Update _packets
for (auto& packet : other._packets) {
auto nlPacket = NLPacket::fromBase(std::move(packet));
_packets.push_back(std::move(nlPacket));
}
if (_packets.size() > 0) {
auto nlPacket = static_cast<const NLPacket*>(_packets.front().get());
_sourceID = nlPacket->getSourceID();
_packetType = nlPacket->getType();
}
}
std::unique_ptr<udt::Packet> NLPacketList::createPacket() {
return NLPacket::create(getType());
return NLPacket::create(getType(), -1, isReliable(), isOrdered());
}

View file

@ -14,15 +14,22 @@
#include "udt/PacketList.h"
#include "NLPacket.h"
class NLPacketList : public udt::PacketList {
public:
NLPacketList(PacketType packetType, QByteArray extendedHeader = QByteArray());
NLPacketList(PacketType packetType, QByteArray extendedHeader = QByteArray(), bool isReliable = false, bool isOrdered = false);
NLPacketList(PacketList&& packetList);
const QUuid& getSourceID() const { return _sourceID; }
private:
NLPacketList(const NLPacketList& other) = delete;
NLPacketList& operator=(const NLPacketList& other) = delete;
virtual std::unique_ptr<udt::Packet> createPacket();
QUuid _sourceID;
};
#endif // hifi_PacketList_h

View file

@ -95,6 +95,7 @@ NodeList::NodeList(char newOwnerType, unsigned short socketListenPort, unsigned
packetReceiver.registerListener(PacketType::ICEPing, this, "processICEPingPacket");
packetReceiver.registerListener(PacketType::DomainServerAddedNode, this, "processDomainServerAddedNode");
packetReceiver.registerListener(PacketType::DomainServerConnectionToken, this, "processDomainServerConnectionTokenPacket");
packetReceiver.registerMessageListener(PacketType::DomainSettings, &_domainHandler, "processSettingsPacketList");
packetReceiver.registerListener(PacketType::ICEServerPeerInformation, &_domainHandler, "processICEResponsePacket");
packetReceiver.registerListener(PacketType::DomainServerRequireDTLS, &_domainHandler, "processDTLSRequirementPacket");
packetReceiver.registerListener(PacketType::ICEPingReply, &_domainHandler, "processICEPingReplyPacket");

View file

@ -12,85 +12,111 @@
#include "PacketReceiver.h"
#include <QMutexLocker>
#include "DependencyManager.h"
#include "NetworkLogging.h"
#include "NodeList.h"
#include "SharedUtil.h"
PacketReceiver::PacketReceiver(QObject* parent) :
QObject(parent),
_packetListenerMap()
{
PacketReceiver::PacketReceiver(QObject* parent) : QObject(parent) {
qRegisterMetaType<QSharedPointer<NLPacket>>();
}
bool PacketReceiver::registerListenerForTypes(const QSet<PacketType>& types, QObject* listener, const char* slot) {
QSet<PacketType> nonSourcedTypes;
QSet<PacketType> sourcedTypes;
foreach(PacketType type, types) {
if (NON_SOURCED_PACKETS.contains(type)) {
nonSourcedTypes << type;
} else {
sourcedTypes << type;
}
}
Q_ASSERT(listener);
if (nonSourcedTypes.size() > 0) {
QMetaMethod nonSourcedMethod = matchingMethodForListener(*nonSourcedTypes.begin(), listener, slot);
if (nonSourcedMethod.isValid()) {
foreach(PacketType type, nonSourcedTypes) {
registerVerifiedListener(type, listener, nonSourcedMethod);
}
} else {
bool PacketReceiver::registerListenerForTypes(PacketTypeList types, QObject* listener, const char* slot) {
Q_ASSERT_X(!types.empty(), "PacketReceiver::registerListenerForTypes", "No types to register");
Q_ASSERT_X(listener, "PacketReceiver::registerListenerForTypes", "No object to register");
Q_ASSERT_X(slot, "PacketReceiver::registerListenerForTypes", "No slot to register");
// Partition types based on whether they are sourced or not (non sourced in front)
auto middle = std::partition(std::begin(types), std::end(types), [](PacketType type) {
return NON_SOURCED_PACKETS.contains(type);
});
QMetaMethod nonSourcedMethod, sourcedMethod;
// Check we have a valid method for non sourced types if any
if (middle != std::begin(types)) {
nonSourcedMethod = matchingMethodForListener(*std::begin(types), listener, slot);
if (!nonSourcedMethod.isValid()) {
return false;
}
}
if (sourcedTypes.size() > 0) {
QMetaMethod sourcedMethod = matchingMethodForListener(*sourcedTypes.begin(), listener, slot);
if (sourcedMethod.isValid()) {
foreach(PacketType type, sourcedTypes) {
registerVerifiedListener(type, listener, sourcedMethod);
}
} else {
// Check we have a valid method for sourced types if any
if (middle != std::end(types)) {
sourcedMethod = matchingMethodForListener(*middle, listener, slot);
if (!sourcedMethod.isValid()) {
return false;
}
}
// Register non sourced types
std::for_each(std::begin(types), middle, [this, &listener, &nonSourcedMethod](PacketType type) {
registerVerifiedListener(type, listener, nonSourcedMethod);
});
// Register sourced types
std::for_each(middle, std::end(types), [this, &listener, &sourcedMethod](PacketType type) {
registerVerifiedListener(type, listener, sourcedMethod);
});
return true;
}
void PacketReceiver::registerDirectListener(PacketType type, QObject* listener, const char* slot) {
Q_ASSERT_X(listener, "PacketReceiver::registerDirectListener", "No object to register");
Q_ASSERT_X(slot, "PacketReceiver::registerDirectListener", "No slot to register");
bool success = registerListener(type, listener, slot);
if (success) {
_directConnectSetMutex.lock();
QMutexLocker locker(&_directConnectSetMutex);
// if we successfully registered, add this object to the set of objects that are directly connected
_directlyConnectedObjects.insert(listener);
_directConnectSetMutex.unlock();
}
}
void PacketReceiver::registerDirectListenerForTypes(const QSet<PacketType>& types,
void PacketReceiver::registerDirectListenerForTypes(PacketTypeList types,
QObject* listener, const char* slot) {
Q_ASSERT_X(listener, "PacketReceiver::registerDirectListenerForTypes", "No object to register");
Q_ASSERT_X(slot, "PacketReceiver::registerDirectListenerForTypes", "No slot to register");
// just call register listener for types to start
bool success = registerListenerForTypes(types, listener, slot);
bool success = registerListenerForTypes(std::move(types), listener, slot);
if (success) {
_directConnectSetMutex.lock();
QMutexLocker locker(&_directConnectSetMutex);
// if we successfully registered, add this object to the set of objects that are directly connected
_directlyConnectedObjects.insert(listener);
_directConnectSetMutex.unlock();
}
}
bool PacketReceiver::registerMessageListener(PacketType type, QObject* listener, const char* slot) {
Q_ASSERT_X(listener, "PacketReceiver::registerMessageListener", "No object to register");
Q_ASSERT_X(slot, "PacketReceiver::registerMessageListener", "No slot to register");
QMetaMethod matchingMethod = matchingMethodForListener(type, listener, slot);
if (matchingMethod.isValid()) {
QMutexLocker locker(&_packetListenerLock);
if (_packetListListenerMap.contains(type)) {
qCWarning(networking) << "Registering a packet listener for packet type" << type
<< "that will remove a previously registered listener";
}
// add the mapping
_packetListListenerMap[type] = ObjectMethodPair(QPointer<QObject>(listener), matchingMethod);
return true;
} else {
return false;
}
}
bool PacketReceiver::registerListener(PacketType type, QObject* listener, const char* slot) {
Q_ASSERT(listener);
Q_ASSERT_X(listener, "PacketReceiver::registerListener", "No object to register");
Q_ASSERT_X(slot, "PacketReceiver::registerListener", "No slot to register");
QMetaMethod matchingMethod = matchingMethodForListener(type, listener, slot);
@ -103,23 +129,33 @@ bool PacketReceiver::registerListener(PacketType type, QObject* listener, const
}
QMetaMethod PacketReceiver::matchingMethodForListener(PacketType type, QObject* object, const char* slot) const {
Q_ASSERT(object);
Q_ASSERT_X(object, "PacketReceiver::matchingMethodForListener", "No object to call");
Q_ASSERT_X(slot, "PacketReceiver::matchingMethodForListener", "No slot to call");
// normalize the slot with the expected parameters
static const QString SIGNATURE_TEMPLATE("%1(%2)");
static const QString NON_SOURCED_PACKET_LISTENER_PARAMETERS = "QSharedPointer<NLPacket>";
static const QString NON_SOURCED_PACKETLIST_LISTENER_PARAMETERS = "QSharedPointer<NLPacketList>";
const QString NON_SOURCED_PACKET_LISTENER_PARAMETERS = "QSharedPointer<NLPacket>";
QSet<QString> possibleSignatures { QString("%1(%2)").arg(slot).arg(NON_SOURCED_PACKET_LISTENER_PARAMETERS) };
QSet<QString> possibleSignatures {
SIGNATURE_TEMPLATE.arg(slot, NON_SOURCED_PACKET_LISTENER_PARAMETERS),
SIGNATURE_TEMPLATE.arg(slot, NON_SOURCED_PACKETLIST_LISTENER_PARAMETERS)
};
if (!NON_SOURCED_PACKETS.contains(type)) {
static const QString SOURCED_PACKET_LISTENER_PARAMETERS = "QSharedPointer<NLPacket>,QSharedPointer<Node>";
static const QString TYPEDEF_SOURCED_PACKET_LISTENER_PARAMETERS = "QSharedPointer<NLPacket>,SharedNodePointer";
static const QString SOURCED_PACKETLIST_LISTENER_PARAMETERS = "QSharedPointer<NLPacketList>,QSharedPointer<Node>";
static const QString TYPEDEF_SOURCED_PACKETLIST_LISTENER_PARAMETERS = "QSharedPointer<NLPacketList>,SharedNodePointer";
// a sourced packet must take the shared pointer to the packet but optionally could include
// a shared pointer to the node
possibleSignatures << QString("%1(%2)").arg(slot).arg(TYPEDEF_SOURCED_PACKET_LISTENER_PARAMETERS);
possibleSignatures << QString("%1(%2)").arg(slot).arg(SOURCED_PACKET_LISTENER_PARAMETERS);
possibleSignatures << SIGNATURE_TEMPLATE.arg(slot, TYPEDEF_SOURCED_PACKET_LISTENER_PARAMETERS);
possibleSignatures << SIGNATURE_TEMPLATE.arg(slot, SOURCED_PACKET_LISTENER_PARAMETERS);
possibleSignatures << SIGNATURE_TEMPLATE.arg(slot, TYPEDEF_SOURCED_PACKETLIST_LISTENER_PARAMETERS);
possibleSignatures << SIGNATURE_TEMPLATE.arg(slot, SOURCED_PACKETLIST_LISTENER_PARAMETERS);
}
int methodIndex = -1;
@ -137,7 +173,7 @@ QMetaMethod PacketReceiver::matchingMethodForListener(PacketType type, QObject*
}
if (methodIndex < 0) {
qDebug() << "PacketReceiver::registerListener expected a slot with one of the following signatures:"
qCDebug(networking) << "PacketReceiver::registerListener expected a slot with one of the following signatures:"
<< possibleSignatures.toList() << "- but such a slot was not found."
<< "Could not complete listener registration for type" << type;
}
@ -155,39 +191,155 @@ QMetaMethod PacketReceiver::matchingMethodForListener(PacketType type, QObject*
}
void PacketReceiver::registerVerifiedListener(PacketType type, QObject* object, const QMetaMethod& slot) {
_packetListenerLock.lock();
Q_ASSERT_X(object, "PacketReceiver::registerVerifiedListener", "No object to register");
QMutexLocker locker(&_packetListenerLock);
if (_packetListenerMap.contains(type)) {
qDebug() << "Warning: Registering a packet listener for packet type" << type
qCWarning(networking) << "Registering a packet listener for packet type" << type
<< "that will remove a previously registered listener";
}
// add the mapping
_packetListenerMap[type] = ObjectMethodPair(QPointer<QObject>(object), slot);
_packetListenerLock.unlock();
}
void PacketReceiver::unregisterListener(QObject* listener) {
_packetListenerLock.lock();
Q_ASSERT_X(listener, "PacketReceiver::unregisterListener", "No listener to unregister");
QMutexLocker packetListenerLocker(&_packetListenerLock);
std::remove_if(std::begin(_packetListenerMap), std::end(_packetListenerMap),
[&listener](const ObjectMethodPair& pair) {
return pair.first == listener;
});
packetListenerLocker.unlock();
QMutexLocker directConnectSetLocker(&_directConnectSetMutex);
_directlyConnectedObjects.remove(listener);
}
auto it = _packetListenerMap.begin();
while (it != _packetListenerMap.end()) {
if (it.value().first == listener) {
// this listener matches - erase it
it = _packetListenerMap.erase(it);
} else {
++it;
}
void PacketReceiver::handleVerifiedPacketList(std::unique_ptr<udt::PacketList> packetList) {
// if we're supposed to drop this packet then break out here
if (_shouldDropPackets) {
return;
}
_packetListenerLock.unlock();
// setup an NLPacketList from the PacketList we were passed
auto nlPacketList = new NLPacketList(std::move(*packetList));
auto nodeList = DependencyManager::get<LimitedNodeList>();
_directConnectSetMutex.lock();
_directlyConnectedObjects.remove(listener);
_directConnectSetMutex.unlock();
_inPacketCount += nlPacketList->getNumPackets();
_inByteCount += nlPacketList->getDataSize();
SharedNodePointer matchingNode;
if (!nlPacketList->getSourceID().isNull()) {
matchingNode = nodeList->nodeWithUUID(nlPacketList->getSourceID());
if (matchingNode) {
// No matter if this packet is handled or not, we update the timestamp for the last time we heard
// from this sending node
matchingNode->setLastHeardMicrostamp(usecTimestampNow());
}
}
QMutexLocker packetListenerLocker(&_packetListenerLock);
bool listenerIsDead = false;
auto it = _packetListListenerMap.find(nlPacketList->getType());
if (it != _packetListListenerMap.end() && it->second.isValid()) {
auto listener = it.value();
if (listener.first) {
bool success = false;
Qt::ConnectionType connectionType;
// check if this is a directly connected listener
{
QMutexLocker directConnectLocker(&_directConnectSetMutex);
connectionType = _directlyConnectedObjects.contains(listener.first) ? Qt::DirectConnection : Qt::AutoConnection;
}
PacketType packetType = nlPacketList->getType();
if (matchingNode) {
emit dataReceived(matchingNode->getType(), nlPacketList->getDataSize());
QMetaMethod metaMethod = listener.second;
static const QByteArray QSHAREDPOINTER_NODE_NORMALIZED = QMetaObject::normalizedType("QSharedPointer<Node>");
static const QByteArray SHARED_NODE_NORMALIZED = QMetaObject::normalizedType("SharedNodePointer");
// one final check on the QPointer before we go to invoke
if (listener.first) {
if (metaMethod.parameterTypes().contains(SHARED_NODE_NORMALIZED)) {
success = metaMethod.invoke(listener.first,
connectionType,
Q_ARG(QSharedPointer<NLPacketList>,
QSharedPointer<NLPacketList>(nlPacketList)),
Q_ARG(SharedNodePointer, matchingNode));
} else if (metaMethod.parameterTypes().contains(QSHAREDPOINTER_NODE_NORMALIZED)) {
success = metaMethod.invoke(listener.first,
connectionType,
Q_ARG(QSharedPointer<NLPacketList>,
QSharedPointer<NLPacketList>(nlPacketList)),
Q_ARG(QSharedPointer<Node>, matchingNode));
} else {
success = metaMethod.invoke(listener.first,
connectionType,
Q_ARG(QSharedPointer<NLPacketList>,
QSharedPointer<NLPacketList>(nlPacketList)));
}
} else {
listenerIsDead = true;
}
} else {
emit dataReceived(NodeType::Unassigned, nlPacketList->getDataSize());
// one final check on the QPointer before we invoke
if (listener.first) {
success = listener.second.invoke(listener.first,
Q_ARG(QSharedPointer<NLPacketList>,
QSharedPointer<NLPacketList>(nlPacketList)));
} else {
listenerIsDead = true;
}
}
if (!success) {
qCDebug(networking).nospace() << "Error delivering packet " << packetType << " to listener "
<< listener.first << "::" << qPrintable(listener.second.methodSignature());
}
} else {
listenerIsDead = true;
}
if (listenerIsDead) {
qCDebug(networking).nospace() << "Listener for packet " << nlPacketList->getType()
<< " has been destroyed. Removing from listener map.";
it = _packetListListenerMap.erase(it);
// if it exists, remove the listener from _directlyConnectedObjects
{
QMutexLocker directConnectLocker(&_directConnectSetMutex);
_directlyConnectedObjects.remove(listener.first);
}
}
} else if (it == _packetListListenerMap.end()) {
qCWarning(networking) << "No listener found for packet type" << nlPacketList->getType();
// insert a dummy listener so we don't print this again
_packetListListenerMap.insert(nlPacketList->getType(), { nullptr, QMetaMethod() });
}
}
void PacketReceiver::handleVerifiedPacket(std::unique_ptr<udt::Packet> packet) {
@ -217,7 +369,7 @@ void PacketReceiver::handleVerifiedPacket(std::unique_ptr<udt::Packet> packet) {
}
}
_packetListenerLock.lock();
QMutexLocker packetListenerLocker(&_packetListenerLock);
bool listenerIsDead = false;
@ -232,12 +384,10 @@ void PacketReceiver::handleVerifiedPacket(std::unique_ptr<udt::Packet> packet) {
bool success = false;
// check if this is a directly connected listener
_directConnectSetMutex.lock();
QMutexLocker directConnectSetLocker(&_directConnectSetMutex);
Qt::ConnectionType connectionType =
_directlyConnectedObjects.contains(listener.first) ? Qt::DirectConnection : Qt::AutoConnection;
_directConnectSetMutex.unlock();
directConnectSetLocker.unlock();
PacketType packetType = nlPacket->getType();
@ -288,7 +438,7 @@ void PacketReceiver::handleVerifiedPacket(std::unique_ptr<udt::Packet> packet) {
}
if (!success) {
qDebug().nospace() << "Error delivering packet " << packetType << " to listener "
qCDebug(networking).nospace() << "Error delivering packet " << packetType << " to listener "
<< listener.first << "::" << qPrintable(listener.second.methodSignature());
}
@ -297,23 +447,23 @@ void PacketReceiver::handleVerifiedPacket(std::unique_ptr<udt::Packet> packet) {
}
if (listenerIsDead) {
qDebug().nospace() << "Listener for packet " << nlPacket->getType()
qCDebug(networking).nospace() << "Listener for packet " << nlPacket->getType()
<< " has been destroyed. Removing from listener map.";
it = _packetListenerMap.erase(it);
// if it exists, remove the listener from _directlyConnectedObjects
_directConnectSetMutex.lock();
QMutexLocker locker(&_directConnectSetMutex);
_directlyConnectedObjects.remove(listener.first);
_directConnectSetMutex.unlock();
locker.unlock();
}
} else if (it == _packetListenerMap.end()) {
qWarning() << "No listener found for packet type" << nlPacket->getType();
qCWarning(networking) << "No listener found for packet type" << nlPacket->getType();
// insert a dummy listener so we don't print this again
_packetListenerMap.insert(nlPacket->getType(), { nullptr, QMetaMethod() });
}
_packetListenerLock.unlock();
packetListenerLocker.unlock();
}

View file

@ -13,6 +13,8 @@
#ifndef hifi_PacketReceiver_h
#define hifi_PacketReceiver_h
#include <vector>
#include <QtCore/QMap>
#include <QtCore/QMetaMethod>
#include <QtCore/QMutex>
@ -21,6 +23,7 @@
#include <QtCore/QSet>
#include "NLPacket.h"
#include "NLPacketList.h"
#include "udt/PacketHeaders.h"
class EntityEditPacketSender;
@ -29,6 +32,8 @@ class OctreePacketProcessor;
class PacketReceiver : public QObject {
Q_OBJECT
public:
using PacketTypeList = std::vector<PacketType>;
PacketReceiver(QObject* parent = 0);
PacketReceiver(const PacketReceiver&) = delete;
@ -41,11 +46,13 @@ public:
void resetCounters() { _inPacketCount = 0; _inByteCount = 0; }
bool registerListenerForTypes(const QSet<PacketType>& types, QObject* listener, const char* slot);
bool registerListenerForTypes(PacketTypeList types, QObject* listener, const char* slot);
bool registerMessageListener(PacketType type, QObject* listener, const char* slot);
bool registerListener(PacketType type, QObject* listener, const char* slot);
void unregisterListener(QObject* listener);
void handleVerifiedPacket(std::unique_ptr<udt::Packet> packet);
void handleVerifiedPacketList(std::unique_ptr<udt::PacketList> packetList);
signals:
void dataReceived(quint8 channelType, int bytes);
@ -53,7 +60,7 @@ signals:
private:
// these are brutal hacks for now - ideally GenericThread / ReceivedPacketProcessor
// should be changed to have a true event loop and be able to handle our QMetaMethod::invoke
void registerDirectListenerForTypes(const QSet<PacketType>& types, QObject* listener, const char* slot);
void registerDirectListenerForTypes(PacketTypeList types, QObject* listener, const char* slot);
void registerDirectListener(PacketType type, QObject* listener, const char* slot);
QMetaMethod matchingMethodForListener(PacketType type, QObject* object, const char* slot) const;
@ -63,6 +70,7 @@ private:
QMutex _packetListenerLock;
QHash<PacketType, ObjectMethodPair> _packetListenerMap;
QHash<PacketType, ObjectMethodPair> _packetListListenerMap;
int _inPacketCount = 0;
int _inByteCount = 0;
bool _shouldDropPackets = false;

View file

@ -16,9 +16,12 @@
#include <NumericalConstants.h>
#include "../HifiSockAddr.h"
#include "../NetworkLogging.h"
#include "CongestionControl.h"
#include "ControlPacket.h"
#include "Packet.h"
#include "PacketList.h"
#include "Socket.h"
using namespace udt;
@ -55,7 +58,8 @@ Connection::~Connection() {
_sendQueue->deleteLater();
_sendQueue.release();
// wait on the send queue thread so we know the send queue is gone
// wait on the send queue thread so we know the send queue is gone
sendQueueThread->quit();
sendQueueThread->wait();
}
}
@ -79,7 +83,32 @@ SendQueue& Connection::getSendQueue() {
void Connection::sendReliablePacket(std::unique_ptr<Packet> packet) {
Q_ASSERT_X(packet->isReliable(), "Connection::send", "Trying to send an unreliable packet reliably.");
getSendQueue().queuePacket(move(packet));
getSendQueue().queuePacket(std::move(packet));
}
void Connection::sendReliablePacketList(std::unique_ptr<PacketList> packetList) {
Q_ASSERT_X(packetList->isReliable(), "Connection::send", "Trying to send an unreliable packet reliably.");
getSendQueue().queuePacketList(std::move(packetList));
}
void Connection::queueReceivedMessagePacket(std::unique_ptr<Packet> packet) {
Q_ASSERT(packet->isPartOfMessage());
auto messageNumber = packet->getMessageNumber();
PendingReceivedMessage& pendingMessage = _pendingReceivedMessages[messageNumber];
pendingMessage.enqueuePacket(std::move(packet));
if (pendingMessage.isComplete()) {
// All messages have been received, create PacketList
auto packetList = PacketList::fromReceivedPackets(std::move(pendingMessage._packets));
_pendingReceivedMessages.erase(messageNumber);
if (_parentSocket) {
_parentSocket->messageReceived(std::move(packetList));
}
}
}
void Connection::sync() {
@ -609,3 +638,37 @@ void Connection::updateCongestionControlAndSendQueue(std::function<void ()> cong
_stats.recordPacketSendPeriod(_congestionControl->_packetSendPeriod);
_stats.recordCongestionWindowSize(_congestionControl->_congestionWindowSize);
}
void PendingReceivedMessage::enqueuePacket(std::unique_ptr<Packet> packet) {
if (_isComplete) {
qCDebug(networking) << "UNEXPECTED: Received packet for a message that is already complete";
return;
}
if (packet->getPacketPosition() == Packet::PacketPosition::FIRST) {
_hasFirstSequenceNumber = true;
_firstSequenceNumber = packet->getSequenceNumber();
} else if (packet->getPacketPosition() == Packet::PacketPosition::LAST) {
_hasLastSequenceNumber = true;
_lastSequenceNumber = packet->getSequenceNumber();
} else if (packet->getPacketPosition() == Packet::PacketPosition::ONLY) {
_hasFirstSequenceNumber = true;
_hasLastSequenceNumber = true;
_firstSequenceNumber = packet->getSequenceNumber();
_lastSequenceNumber = packet->getSequenceNumber();
}
_packets.push_back(std::move(packet));
if (_hasFirstSequenceNumber && _hasLastSequenceNumber) {
auto numPackets = udt::seqlen(_firstSequenceNumber, _lastSequenceNumber);
if (uint64_t(numPackets) == _packets.size()) {
_isComplete = true;
// Sort packets by sequence number
_packets.sort([](std::unique_ptr<Packet>& a, std::unique_ptr<Packet>& b) {
return a->getSequenceNumber() < b->getSequenceNumber();
});
}
}
}

View file

@ -30,8 +30,24 @@ namespace udt {
class CongestionControl;
class ControlPacket;
class Packet;
class PacketList;
class Socket;
class PendingReceivedMessage {
public:
void enqueuePacket(std::unique_ptr<Packet> packet);
bool isComplete() const { return _isComplete; }
std::list<std::unique_ptr<Packet>> _packets;
private:
bool _isComplete { false };
bool _hasFirstSequenceNumber { false };
bool _hasLastSequenceNumber { false };
SequenceNumber _firstSequenceNumber;
SequenceNumber _lastSequenceNumber;
};
class Connection : public QObject {
Q_OBJECT
public:
@ -43,12 +59,15 @@ public:
~Connection();
void sendReliablePacket(std::unique_ptr<Packet> packet);
void sendReliablePacketList(std::unique_ptr<PacketList> packet);
void sync(); // rate control method, fired by Socket for all connections on SYN interval
// return indicates if this packet was a duplicate
bool processReceivedSequenceNumber(SequenceNumber sequenceNumber, int packetSize, int payloadSize);
void processControl(std::unique_ptr<ControlPacket> controlPacket);
void queueReceivedMessagePacket(std::unique_ptr<Packet> packet);
ConnectionStats::Stats sampleStats() { return _stats.sample(); }
@ -117,6 +136,8 @@ private:
std::unique_ptr<CongestionControl> _congestionControl;
std::unique_ptr<SendQueue> _sendQueue;
std::map<MessageNumber, PendingReceivedMessage> _pendingReceivedMessages;
int _packetsSinceACK { 0 }; // The number of packets that have been received during the current ACK interval

View file

@ -26,6 +26,8 @@ namespace udt {
static const int UDP_RECEIVE_BUFFER_SIZE_BYTES = 1048576;
static const int DEFAULT_SYN_INTERVAL_USECS = 10 * 1000;
static const int SEQUENCE_NUMBER_BITS = sizeof(SequenceNumber) * 8;
static const int MESSAGE_LINE_NUMBER_BITS = 32;
static const int MESSAGE_NUMBER_BITS = 30;
static const uint32_t CONTROL_BIT_MASK = uint32_t(1) << (SEQUENCE_NUMBER_BITS - 1);
}

View file

@ -57,7 +57,6 @@ Packet::Packet(qint64 size, bool isReliable, bool isPartOfMessage) :
_isPartOfMessage(isPartOfMessage)
{
adjustPayloadStartAndCapacity(Packet::localHeaderSize(_isPartOfMessage));
// set the UDT header to default values
writeHeader();
}
@ -106,6 +105,12 @@ Packet& Packet::operator=(Packet&& other) {
return *this;
}
void Packet::writeMessageNumber(MessageNumber messageNumber) {
_isPartOfMessage = true;
_messageNumber = messageNumber;
writeHeader();
}
void Packet::writeSequenceNumber(SequenceNumber sequenceNumber) const {
_sequenceNumber = sequenceNumber;
writeHeader();
@ -115,14 +120,23 @@ static const uint32_t RELIABILITY_BIT_MASK = uint32_t(1) << (SEQUENCE_NUMBER_BIT
static const uint32_t MESSAGE_BIT_MASK = uint32_t(1) << (SEQUENCE_NUMBER_BITS - 3);
static const uint32_t BIT_FIELD_MASK = CONTROL_BIT_MASK | RELIABILITY_BIT_MASK | MESSAGE_BIT_MASK;
static const uint32_t PACKET_POSITION_MASK = uint32_t(0x03) << 30;
static const uint32_t MESSAGE_NUMBER_MASK = ~PACKET_POSITION_MASK;
void Packet::readHeader() const {
SequenceNumberAndBitField seqNumBitField = *reinterpret_cast<SequenceNumberAndBitField*>(_packet.get());
SequenceNumberAndBitField* seqNumBitField = reinterpret_cast<SequenceNumberAndBitField*>(_packet.get());
Q_ASSERT_X(!(seqNumBitField & CONTROL_BIT_MASK), "Packet::readHeader()", "This should be a data packet");
Q_ASSERT_X(!(*seqNumBitField & CONTROL_BIT_MASK), "Packet::readHeader()", "This should be a data packet");
_isReliable = (bool) (seqNumBitField & RELIABILITY_BIT_MASK); // Only keep reliability bit
_isPartOfMessage = (bool) (seqNumBitField & MESSAGE_BIT_MASK); // Only keep message bit
_sequenceNumber = SequenceNumber{ seqNumBitField & ~BIT_FIELD_MASK }; // Remove the bit field
_isReliable = (bool) (*seqNumBitField & RELIABILITY_BIT_MASK); // Only keep reliability bit
_isPartOfMessage = (bool) (*seqNumBitField & MESSAGE_BIT_MASK); // Only keep message bit
_sequenceNumber = SequenceNumber{ *seqNumBitField & ~BIT_FIELD_MASK }; // Remove the bit field
if (_isPartOfMessage) {
MessageNumberAndBitField* messageNumberAndBitField = seqNumBitField + 1;
_messageNumber = *messageNumberAndBitField & MESSAGE_NUMBER_MASK;
_packetPosition = static_cast<PacketPosition>(*messageNumberAndBitField >> 30);
}
}
void Packet::writeHeader() const {
@ -140,5 +154,12 @@ void Packet::writeHeader() const {
if (_isPartOfMessage) {
*seqNumBitField |= MESSAGE_BIT_MASK;
Q_ASSERT_X(!(_messageNumber & PACKET_POSITION_MASK),
"Packet::writeHeader()", "Message number is overflowing into bit field");
MessageNumberAndBitField* messageNumberAndBitField = seqNumBitField + 1;
*messageNumberAndBitField = _messageNumber;
*messageNumberAndBitField |= _packetPosition << 30;
}
}

View file

@ -31,6 +31,14 @@ public:
// NOTE: The MessageNumber is only actually 29 bits to leave room for a bit field
using MessageNumber = uint32_t;
using MessageNumberAndBitField = uint32_t;
// Use same size as MessageNumberAndBitField so we can use the enum with bitwise operations
enum PacketPosition : MessageNumberAndBitField {
ONLY = 0x0,
FIRST = 0x2,
MIDDLE = 0x3,
LAST = 0x1
};
static std::unique_ptr<Packet> create(qint64 size = -1, bool isReliable = false, bool isPartOfMessage = false);
static std::unique_ptr<Packet> fromReceivedPacket(std::unique_ptr<char[]> data, qint64 size, const HifiSockAddr& senderSockAddr);
@ -48,7 +56,13 @@ public:
bool isPartOfMessage() const { return _isPartOfMessage; }
bool isReliable() const { return _isReliable; }
SequenceNumber getSequenceNumber() const { return _sequenceNumber; }
MessageNumber getMessageNumber() const { return _messageNumber; }
void setPacketPosition(PacketPosition position) { _packetPosition = position; }
PacketPosition getPacketPosition() const { return _packetPosition; }
void writeMessageNumber(MessageNumber messageNumber);
void writeSequenceNumber(SequenceNumber sequenceNumber) const;
protected:
@ -70,6 +84,8 @@ private:
mutable bool _isReliable { false };
mutable bool _isPartOfMessage { false };
mutable SequenceNumber _sequenceNumber;
mutable PacketPosition _packetPosition { PacketPosition::ONLY };
mutable MessageNumber _messageNumber { 0 };
};
} // namespace udt

View file

@ -26,6 +26,7 @@ const QSet<PacketType> NON_SOURCED_PACKETS = QSet<PacketType>()
<< PacketType::DomainList << PacketType::DomainConnectionDenied
<< PacketType::DomainServerPathQuery << PacketType::DomainServerPathResponse
<< PacketType::DomainServerAddedNode
<< PacketType::DomainSettingsRequest << PacketType::DomainSettings
<< PacketType::ICEServerPeerInformation << PacketType::ICEServerQuery << PacketType::ICEServerHeartbeat
<< PacketType::ICEPing << PacketType::ICEPingReply
<< PacketType::AssignmentClientStatus << PacketType::StopNode;

View file

@ -71,7 +71,9 @@ enum class PacketType : uint8_t {
EntityAdd,
EntityErase,
EntityEdit,
DomainServerConnectionToken
DomainServerConnectionToken,
DomainSettingsRequest,
DomainSettings
};
const int NUM_BYTES_MD5_HASH = 16;

View file

@ -13,17 +13,24 @@
#include <QDebug>
#include "Packet.h"
using namespace udt;
PacketList::PacketList(PacketType packetType, QByteArray extendedHeader) :
PacketList::PacketList(PacketType packetType, QByteArray extendedHeader, bool isReliable, bool isOrdered) :
_packetType(packetType),
_isReliable(isReliable),
_isOrdered(isOrdered),
_extendedHeader(extendedHeader)
{
Q_ASSERT_X(!(!_isReliable && _isOrdered), "PacketList", "Unreliable ordered PacketLists are not currently supported");
QIODevice::open(WriteOnly);
}
PacketList::PacketList(PacketList&& other) :
_packetType(other._packetType),
_packets(std::move(other._packets))
{
}
void PacketList::startSegment() {
_segmentStartIndex = _currentPacket ? _currentPacket->pos() : _extendedHeader.size();
}
@ -32,10 +39,30 @@ void PacketList::endSegment() {
_segmentStartIndex = -1;
}
size_t PacketList::getDataSize() const {
size_t totalBytes = 0;
for (const auto& packet : _packets) {
totalBytes += packet->getDataSize();
}
if (_currentPacket) {
totalBytes += _currentPacket->getDataSize();
}
return totalBytes;
}
std::unique_ptr<PacketList> PacketList::fromReceivedPackets(std::list<std::unique_ptr<Packet>>&& packets) {
auto packetList = std::unique_ptr<PacketList>(new PacketList(PacketType::Unknown, QByteArray(), true, true));
packetList->_packets = std::move(packets);
return packetList;
}
std::unique_ptr<Packet> PacketList::createPacket() {
// use the static create method to create a new packet
// TODO: create a packet with correct reliability and messaging
return Packet::create();
// If this packet list is supposed to be ordered then we consider this to be part of a message
bool isPartOfMessage = _isOrdered;
return Packet::create(-1, _isReliable, isPartOfMessage);
}
std::unique_ptr<Packet> PacketList::createPacketWithExtendedHeader() {
@ -53,6 +80,23 @@ std::unique_ptr<Packet> PacketList::createPacketWithExtendedHeader() {
return packet;
}
QByteArray PacketList::getMessage() {
size_t sizeBytes = 0;
for (const auto& packet : _packets) {
sizeBytes += packet->size();
}
QByteArray data;
data.reserve(sizeBytes);
for (auto& packet : _packets) {
data.append(packet->getPayload(), packet->getPayloadSize());
}
return data;
}
qint64 PacketList::writeData(const char* data, qint64 maxSize) {
if (!_currentPacket) {
// we don't have a current packet, time to set one up

View file

@ -16,6 +16,7 @@
#include <QtCore/QIODevice>
#include "Packet.h"
#include "PacketHeaders.h"
class LimitedNodeList;
@ -27,26 +28,44 @@ class Packet;
class PacketList : public QIODevice {
Q_OBJECT
public:
PacketList(PacketType packetType, QByteArray extendedHeader = QByteArray());
PacketList(PacketType packetType, QByteArray extendedHeader = QByteArray(), bool isReliable = false, bool isOrdered = false);
PacketList(PacketList&& other);
static std::unique_ptr<PacketList> fromReceivedPackets(std::list<std::unique_ptr<Packet>>&& packets);
virtual bool isSequential() const { return true; }
bool isReliable() const { return _isReliable; }
bool isOrdered() const { return _isOrdered; }
void startSegment();
void endSegment();
PacketType getType() const { return _packetType; }
int getNumPackets() const { return _packets.size() + (_currentPacket ? 1 : 0); }
QByteArray getExtendedHeader() const { return _extendedHeader; }
size_t getDataSize() const;
void closeCurrentPacket(bool shouldSendEmpty = false);
QByteArray getMessage();
template<typename T> qint64 readPrimitive(T* data);
template<typename T> qint64 writePrimitive(const T& data);
std::list<std::unique_ptr<Packet>> _packets;
protected:
virtual qint64 writeData(const char* data, qint64 maxSize);
virtual qint64 readData(char* data, qint64 maxSize) { return 0; }
PacketType _packetType;
private:
friend class ::LimitedNodeList;
friend class Socket;
friend class SendQueue;
friend class NLPacketList;
PacketList(const PacketList& other) = delete;
PacketList& operator=(const PacketList& other) = delete;
@ -58,11 +77,11 @@ private:
virtual std::unique_ptr<Packet> createPacket();
std::unique_ptr<Packet> createPacketWithExtendedHeader();
PacketType _packetType;
Packet::MessageNumber _messageNumber;
bool _isReliable = false;
bool _isOrdered = false;
std::unique_ptr<Packet> _currentPacket;
std::list<std::unique_ptr<Packet>> _packets;
int _segmentStartIndex = -1;

View file

@ -20,6 +20,7 @@
#include "ControlPacket.h"
#include "Packet.h"
#include "PacketList.h"
#include "Socket.h"
using namespace udt;
@ -63,6 +64,47 @@ void SendQueue::queuePacket(std::unique_ptr<Packet> packet) {
}
}
void SendQueue::queuePacketList(std::unique_ptr<PacketList> packetList) {
Q_ASSERT(packetList->_packets.size() > 0);
{
auto messageNumber = getNextMessageNumber();
if (packetList->_packets.size() == 1) {
auto& packet = packetList->_packets.front();
packet->setPacketPosition(Packet::PacketPosition::ONLY);
packet->writeMessageNumber(messageNumber);
} else {
bool haveMarkedFirstPacket = false;
auto end = packetList->_packets.end();
auto lastElement = --packetList->_packets.end();
for (auto it = packetList->_packets.begin(); it != end; ++it) {
auto& packet = *it;
if (!haveMarkedFirstPacket) {
packet->setPacketPosition(Packet::PacketPosition::FIRST);
haveMarkedFirstPacket = true;
} else if (it == lastElement) {
packet->setPacketPosition(Packet::PacketPosition::LAST);
} else {
packet->setPacketPosition(Packet::PacketPosition::MIDDLE);
}
packet->writeMessageNumber(messageNumber);
}
}
QWriteLocker locker(&_packetsLock);
_packets.splice(_packets.end(), packetList->_packets);
}
if (!this->thread()->isRunning()) {
this->thread()->start();
}
}
void SendQueue::stop() {
_isRunning = false;
}
@ -121,6 +163,12 @@ SequenceNumber SendQueue::getNextSequenceNumber() {
return _currentSequenceNumber;
}
uint32_t SendQueue::getNextMessageNumber() {
static const MessageNumber MAX_MESSAGE_NUMBER = MessageNumber(1) << MESSAGE_NUMBER_BITS;
_currentMessageNumber = (_currentMessageNumber + 1) % MAX_MESSAGE_NUMBER;
return _currentMessageNumber;
}
void SendQueue::sendNewPacketAndAddToSentList(std::unique_ptr<Packet> newPacket, SequenceNumber sequenceNumber) {
// write the sequence number and send the packet
newPacket->writeSequenceNumber(sequenceNumber);

View file

@ -31,7 +31,10 @@ namespace udt {
class BasePacket;
class ControlPacket;
class Packet;
class PacketList;
class Socket;
using MessageNumber = uint32_t;
class SendQueue : public QObject {
Q_OBJECT
@ -40,6 +43,7 @@ public:
static std::unique_ptr<SendQueue> create(Socket* socket, HifiSockAddr destination);
void queuePacket(std::unique_ptr<Packet> packet);
void queuePacketList(std::unique_ptr<PacketList> packetList);
int getQueueSize() const { QReadLocker locker(&_packetsLock); return _packets.size(); }
SequenceNumber getCurrentSequenceNumber() const { return SequenceNumber(_atomicCurrentSequenceNumber); }
@ -73,6 +77,7 @@ private:
// Increments current sequence number and return it
SequenceNumber getNextSequenceNumber();
MessageNumber getNextMessageNumber();
mutable QReadWriteLock _packetsLock; // Protects the packets to be sent list.
std::list<std::unique_ptr<Packet>> _packets; // List of packets to be sent
@ -82,6 +87,7 @@ private:
std::atomic<uint32_t> _lastACKSequenceNumber; // Last ACKed sequence number
MessageNumber _currentMessageNumber { 0 };
SequenceNumber _currentSequenceNumber; // Last sequence number sent out
std::atomic<uint32_t> _atomicCurrentSequenceNumber;// Atomic for last sequence number sent out

View file

@ -20,6 +20,7 @@
#include "ControlPacket.h"
#include "Packet.h"
#include "../NLPacket.h"
#include "PacketList.h"
using namespace udt;
@ -104,6 +105,23 @@ qint64 Socket::writePacket(std::unique_ptr<Packet> packet, const HifiSockAddr& s
return writePacket(*packet, sockAddr);
}
qint64 Socket::writePacketList(std::unique_ptr<PacketList> packetList, const HifiSockAddr& sockAddr) {
if (packetList->isReliable()) {
// Reliable and Ordered
// Reliable and Unordered
findOrCreateConnection(sockAddr).sendReliablePacketList(move(packetList));
return 0;
}
// Unerliable and Unordered
qint64 totalBytesSent = 0;
while (!packetList->_packets.empty()) {
totalBytesSent += writePacket(packetList->takeFront<Packet>(), sockAddr);
}
return totalBytesSent;
}
qint64 Socket::writeDatagram(const char* data, qint64 size, const HifiSockAddr& sockAddr) {
return writeDatagram(QByteArray::fromRawData(data, size), sockAddr);
}
@ -126,7 +144,7 @@ qint64 Socket::writeDatagram(const QByteArray& datagram, const HifiSockAddr& soc
Connection& Socket::findOrCreateConnection(const HifiSockAddr& sockAddr) {
auto it = _connectionsHash.find(sockAddr);
if (it == _connectionsHash.end()) {
auto connection = std::unique_ptr<Connection>(new Connection(this, sockAddr, _ccFactory->create()));
it = _connectionsHash.insert(it, std::make_pair(sockAddr, std::move(connection)));
@ -135,6 +153,12 @@ Connection& Socket::findOrCreateConnection(const HifiSockAddr& sockAddr) {
return *it->second;
}
void Socket::messageReceived(std::unique_ptr<PacketList> packetList) {
if (_packetListHandler) {
_packetListHandler(std::move(packetList));
}
}
void Socket::readPendingDatagrams() {
int packetSizeWithHeader = -1;
while ((packetSizeWithHeader = _udpSocket.pendingDatagramSize()) != -1) {
@ -177,16 +201,18 @@ void Socket::readPendingDatagrams() {
// call our verification operator to see if this packet is verified
if (!_packetFilterOperator || _packetFilterOperator(*packet)) {
if (packet->isReliable()) {
// if this was a reliable packet then signal the matching connection with the sequence number
auto& connection = findOrCreateConnection(senderSockAddr);
connection.processReceivedSequenceNumber(packet->getSequenceNumber(),
packet->getDataSize(),
packet->getPayloadSize());
packet->getDataSize(),
packet->getPayloadSize());
}
if (_packetHandler) {
if (packet->isPartOfMessage()) {
auto& connection = findOrCreateConnection(senderSockAddr);
connection.queueReceivedMessagePacket(std::move(packet));
} else if (_packetHandler) {
// call the verified packet callback to let it handle this packet
_packetHandler(std::move(packet));
}

View file

@ -30,12 +30,14 @@ namespace udt {
class BasePacket;
class ControlSender;
class Packet;
class PacketList;
class SequenceNumber;
using PacketFilterOperator = std::function<bool(const Packet&)>;
using BasePacketHandler = std::function<void(std::unique_ptr<BasePacket>)>;
using PacketHandler = std::function<void(std::unique_ptr<Packet>)>;
using PacketListHandler = std::function<void(std::unique_ptr<PacketList>)>;
class Socket : public QObject {
Q_OBJECT
@ -48,6 +50,7 @@ public:
qint64 writeBasePacket(const BasePacket& packet, const HifiSockAddr& sockAddr);
qint64 writePacket(const Packet& packet, const HifiSockAddr& sockAddr);
qint64 writePacket(std::unique_ptr<Packet> packet, const HifiSockAddr& sockAddr);
qint64 writePacketList(std::unique_ptr<PacketList> packetList, const HifiSockAddr& sockAddr);
qint64 writeDatagram(const char* data, qint64 size, const HifiSockAddr& sockAddr);
qint64 writeDatagram(const QByteArray& datagram, const HifiSockAddr& sockAddr);
@ -56,6 +59,7 @@ public:
void setPacketFilterOperator(PacketFilterOperator filterOperator) { _packetFilterOperator = filterOperator; }
void setPacketHandler(PacketHandler handler) { _packetHandler = handler; }
void setPacketListHandler(PacketListHandler handler) { _packetListHandler = handler; }
void addUnfilteredHandler(const HifiSockAddr& senderSockAddr, BasePacketHandler handler)
{ _unfilteredHandlers[senderSockAddr] = handler; }
@ -63,6 +67,8 @@ public:
void setCongestionControlFactory(std::unique_ptr<CongestionControlVirtualFactory> ccFactory);
void connectToSendSignal(const HifiSockAddr& destinationAddr, QObject* receiver, const char* slot);
void messageReceived(std::unique_ptr<PacketList> packetList);
ConnectionStats::Stats sampleStatsForConnection(const HifiSockAddr& destination);
std::vector<HifiSockAddr> getConnectionSockAddrs();
@ -78,6 +84,7 @@ private:
QUdpSocket _udpSocket { this };
PacketFilterOperator _packetFilterOperator;
PacketHandler _packetHandler;
PacketListHandler _packetListHandler;
std::unordered_map<HifiSockAddr, BasePacketHandler> _unfilteredHandlers;
std::unordered_map<HifiSockAddr, SequenceNumber> _unreliableSequenceNumbers;

View file

@ -15,6 +15,7 @@
#include <udt/Constants.h>
#include <udt/Packet.h>
#include <udt/PacketList.h>
#include <LogHandler.h>
@ -42,6 +43,9 @@ const QCommandLineOption MAX_SEND_PACKETS {
const QCommandLineOption UNRELIABLE_PACKETS {
"unreliable", "send unreliable packets (default is reliable)"
};
const QCommandLineOption ORDERED_PACKETS {
"ordered", "send ordered packets (default is unordered)"
};
const QStringList CLIENT_STATS_TABLE_HEADERS {
"Send Rate (P/s)", "Bandwidth (P/s)", "RTT(ms)", "CW (P)", "Send Period (us)",
@ -129,6 +133,10 @@ UDTTest::UDTTest(int& argc, char** argv) :
if (_argumentParser.isSet(UNRELIABLE_PACKETS)) {
_sendReliable = false;
}
if (_argumentParser.isSet(ORDERED_PACKETS)) {
_sendOrdered = true;
}
if (!_target.isNull()) {
sendInitialPackets();
@ -151,7 +159,7 @@ void UDTTest::parseArguments() {
_argumentParser.addOptions({
PORT_OPTION, TARGET_OPTION, PACKET_SIZE, MIN_PACKET_SIZE, MAX_PACKET_SIZE,
MAX_SEND_BYTES, MAX_SEND_PACKETS, UNRELIABLE_PACKETS
MAX_SEND_BYTES, MAX_SEND_PACKETS, UNRELIABLE_PACKETS, ORDERED_PACKETS
});
if (!_argumentParser.parse(arguments())) {
@ -206,20 +214,40 @@ void UDTTest::sendPacket() {
int randomPacketSize = rand() % _maxPacketSize + _minPacketSize;
packetPayloadSize = randomPacketSize - udt::Packet::localHeaderSize(false);
}
auto newPacket = udt::Packet::create(packetPayloadSize, _sendReliable);
newPacket->setPayloadSize(packetPayloadSize);
_totalQueuedBytes += newPacket->getDataSize();
// queue or send this packet by calling write packet on the socket for our target
if (_sendReliable) {
_socket.writePacket(std::move(newPacket), _target);
if (_sendOrdered) {
static int call = 0;
call = (call + 1) % 4;
if (call == 0) {
auto packetList = std::unique_ptr<udt::PacketList>(new udt::PacketList(PacketType::BulkAvatarData, QByteArray(), true, true));
for (int i = 0; i < 4; i++) {
packetList->writePrimitive(0x1);
packetList->writePrimitive(0x2);
packetList->writePrimitive(0x3);
packetList->writePrimitive(0x4);
packetList->closeCurrentPacket(false);
}
_totalQueuedBytes += packetList->getDataSize();
_socket.writePacketList(std::move(packetList), _target);
}
_totalQueuedPackets += 4;
} else {
_socket.writePacket(*newPacket, _target);
auto newPacket = udt::Packet::create(packetPayloadSize, _sendReliable);
newPacket->setPayloadSize(packetPayloadSize);
_totalQueuedBytes += newPacket->getDataSize();
// queue or send this packet by calling write packet on the socket for our target
// if (
if (_sendReliable) {
_socket.writePacket(std::move(newPacket), _target);
} else {
_socket.writePacket(*newPacket, _target);
}
++_totalQueuedPackets;
}
++_totalQueuedPackets;
}
void UDTTest::sampleStats() {

View file

@ -45,7 +45,8 @@ private:
int _maxSendBytes { -1 }; // the number of bytes to send to the target before stopping
int _maxSendPackets { -1 }; // the number of packets to send to the target before stopping
bool _sendReliable { true }; // wether packets are sent reliably or unreliably
bool _sendReliable { true }; // whether packets are sent reliably or unreliably
bool _sendOrdered { false }; // whether to send ordered packets
int _totalQueuedPackets { 0 }; // keeps track of the number of packets we have already queued
int _totalQueuedBytes { 0 }; // keeps track of the number of bytes we have already queued