Merge pull request #12 from birarda/atp

groundwork for UDT reliable sending
This commit is contained in:
Ryan Huffman 2015-08-18 20:25:15 -07:00
commit 4bb1c46ad0
62 changed files with 3175 additions and 359 deletions

View file

@ -95,6 +95,7 @@ AssignmentClient::AssignmentClient(Assignment::Type requestAssignmentType, QStri
}
_assignmentServerSocket = HifiSockAddr(_assignmentServerHostname, assignmentServerPort, true);
_assignmentServerSocket.setObjectName("AssigmentServer");
nodeList->setAssignmentServerSocket(_assignmentServerSocket);
qDebug() << "Assignment server socket is" << _assignmentServerSocket;
@ -119,6 +120,7 @@ AssignmentClient::AssignmentClient(Assignment::Type requestAssignmentType, QStri
// did we get an assignment-client monitor port?
if (assignmentMonitorPort > 0) {
_assignmentClientMonitorSocket = HifiSockAddr(DEFAULT_ASSIGNMENT_CLIENT_MONITOR_HOSTNAME, assignmentMonitorPort);
_assignmentClientMonitorSocket.setObjectName("AssignmentClientMonitor");
qDebug() << "Assignment-client monitor socket is" << _assignmentClientMonitorSocket;

View file

@ -81,7 +81,7 @@ private:
AvatarData _avatar;
uint16_t _lastReceivedSequenceNumber { 0 };
std::unordered_map<QUuid, uint16_t, UUIDHasher> _lastBroadcastSequenceNumbers;
std::unordered_map<QUuid, uint16_t> _lastBroadcastSequenceNumbers;
bool _hasReceivedFirstPackets = false;
quint64 _billboardChangeTimestamp = 0;

View file

@ -128,7 +128,7 @@ void OctreeInboundPacketProcessor::processPacket(QSharedPointer<NLPacket> packet
}
if (debugProcessPacket) {
qDebug() << " numBytesPacketHeader=" << packet->totalHeadersSize();
qDebug() << " numBytesPacketHeader=" << NLPacket::totalHeaderSize(packetType);
qDebug() << " sizeof(sequence)=" << sizeof(sequence);
qDebug() << " sizeof(sentAt)=" << sizeof(sentAt);
}

View file

@ -145,7 +145,7 @@ void IceServer::sendPeerInformationPacket(const NetworkPeer& peer, const HifiSoc
peerPacket->write(peer.toByteArray());
// write the current packet
_serverSocket.writeUnreliablePacket(*peerPacket, *destinationSockAddr);
_serverSocket.writePacket(*peerPacket, *destinationSockAddr);
}
void IceServer::clearInactivePeers() {

View file

@ -89,10 +89,11 @@ BandwidthDialog::BandwidthDialog(QWidget* parent) :
_allChannelDisplays[4] = _otherChannelDisplay =
new BandwidthChannelDisplay({NodeType::Unassigned}, form, "Other", "Kbps", 1.0, COLOR2);
_allChannelDisplays[5] = _totalChannelDisplay =
new BandwidthChannelDisplay({NodeType::DomainServer, NodeType::EntityServer,
NodeType::EnvironmentServer, NodeType::AudioMixer, NodeType::Agent,
NodeType::AvatarMixer, NodeType::Unassigned},
form, "Total", "Kbps", 1.0, COLOR2);
new BandwidthChannelDisplay({
NodeType::DomainServer, NodeType::EntityServer,
NodeType::AudioMixer, NodeType::Agent,
NodeType::AvatarMixer, NodeType::Unassigned
}, form, "Total", "Kbps", 1.0, COLOR2);
connect(averageUpdateTimer, SIGNAL(timeout()), this, SLOT(updateTimerTimeout()));
averageUpdateTimer->start(1000);

View file

@ -11,8 +11,10 @@
#include <QtCore/QObject>
#include <QDebug>
#include <BufferParser.h>
#include <udt/PacketHeaders.h>
#include <UUID.h>
#include "RegisteredMetaTypes.h"
#include "EntityItemID.h"

View file

@ -18,11 +18,11 @@
#include <GLMHelpers.h>
#include <SettingHandle.h>
#include <UUID.h>
#include "AddressManager.h"
#include "NodeList.h"
#include "NetworkLogging.h"
#include "AddressManager.h"
#include "UUID.h"
const QString ADDRESS_MANAGER_SETTINGS_GROUP = "AddressManager";
const QString SETTINGS_CURRENT_ADDRESS_KEY = "address";

View file

@ -150,11 +150,9 @@ QDataStream& operator<<(QDataStream &out, const Assignment& assignment) {
QDataStream& operator>>(QDataStream &in, Assignment& assignment) {
quint8 packedType;
in >> packedType;
in >> packedType >> assignment._uuid >> assignment._pool >> assignment._payload;
assignment._type = (Assignment::Type) packedType;
in >> assignment._uuid >> assignment._pool >> assignment._payload;
if (assignment._command == Assignment::RequestCommand) {
in >> assignment._walletUUID;
}

View file

@ -38,6 +38,8 @@ DomainHandler::DomainHandler(QObject* parent) :
_settingsObject(),
_failedSettingsRequests(0)
{
_sockAddr.setObjectName("DomainServer");
// if we get a socket that make sure our NetworkPeer ping timer stops
connect(this, &DomainHandler::completedSocketDiscovery, &_icePeer, &NetworkPeer::stopPingTimer);
}
@ -147,6 +149,7 @@ void DomainHandler::setIceServerHostnameAndID(const QString& iceServerHostname,
HifiSockAddr* replaceableSockAddr = &_iceServerSockAddr;
replaceableSockAddr->~HifiSockAddr();
replaceableSockAddr = new (replaceableSockAddr) HifiSockAddr(iceServerHostname, ICE_SERVER_DEFAULT_PORT);
_iceServerSockAddr.setObjectName("IceServer");
auto nodeList = DependencyManager::get<NodeList>();

View file

@ -33,16 +33,16 @@ HifiSockAddr::HifiSockAddr(const QHostAddress& address, quint16 port) :
}
HifiSockAddr::HifiSockAddr(const HifiSockAddr& otherSockAddr) :
QObject(),
_address(otherSockAddr._address),
_port(otherSockAddr._port)
{
setObjectName(otherSockAddr.objectName());
}
HifiSockAddr& HifiSockAddr::operator=(const HifiSockAddr& rhsSockAddr) {
HifiSockAddr temp(rhsSockAddr);
swap(temp);
setObjectName(rhsSockAddr.objectName());
_address = rhsSockAddr._address;
_port = rhsSockAddr._port;
return *this;
}
@ -76,9 +76,14 @@ HifiSockAddr::HifiSockAddr(const sockaddr* sockaddr) {
void HifiSockAddr::swap(HifiSockAddr& otherSockAddr) {
using std::swap;
swap(_address, otherSockAddr._address);
swap(_port, otherSockAddr._port);
// Swap objects name
auto temp = otherSockAddr.objectName();
otherSockAddr.setObjectName(objectName());
setObjectName(temp);
}
bool HifiSockAddr::operator==(const HifiSockAddr& rhsSockAddr) const {

View file

@ -70,10 +70,21 @@ uint qHash(const HifiSockAddr& key, uint seed);
template <>
struct std::hash<HifiSockAddr> {
// NOTE: this hashing specifically ignores IPv6 addresses - if we begin to support those we will need
// to conditionally hash the bytes that represent an IPv6 address
std::size_t operator()(const HifiSockAddr& sockAddr) const {
// use XOR of implemented std::hash templates for new hash
return std::hash<std::string>()(sockAddr.getAddress().toString().toStdString())
^ std::hash<uint16_t>()((uint16_t) sockAddr.getPort());
// depending on the type of address we're looking at
if (sockAddr.getAddress().protocol() == QAbstractSocket::IPv4Protocol) {
return std::hash<uint32_t>()((uint32_t) sockAddr.getAddress().toIPv4Address())
^ std::hash<uint16_t>()((uint16_t) sockAddr.getPort());
} else {
// NOTE: if we start to use IPv6 addresses, it's possible their hashing
// can be faster by XORing the hash for each 64 bits in the address
return std::hash<std::string>()(sockAddr.getAddress().toString().toStdString())
^ std::hash<uint16_t>()((uint16_t) sockAddr.getPort());
}
}
};

View file

@ -33,8 +33,6 @@
#include "UUID.h"
#include "NetworkLogging.h"
#include "udt/udt.h"
const char SOLO_NODE_TYPES[2] = {
NodeType::AvatarMixer,
NodeType::AudioMixer
@ -81,9 +79,6 @@ LimitedNodeList::LimitedNodeList(unsigned short socketListenPort, unsigned short
qCDebug(networking) << "NodeList DTLS socket is listening on" << _dtlsSocket->localPort();
}
const int LARGER_BUFFER_SIZE = 1048576;
_nodeSocket.setBufferSizes(LARGER_BUFFER_SIZE);
// check for local socket updates every so often
const int LOCAL_SOCKET_UPDATE_INTERVAL_MSECS = 5 * 1000;
QTimer* localSocketUpdate = new QTimer(this);
@ -244,76 +239,82 @@ bool LimitedNodeList::packetSourceAndHashMatch(const udt::Packet& packet) {
return false;
}
qint64 LimitedNodeList::writePacket(const NLPacket& packet, const Node& destinationNode) {
if (!destinationNode.getActiveSocket()) {
return 0;
}
emit dataSent(destinationNode.getType(), packet.getDataSize());
return writePacket(packet, *destinationNode.getActiveSocket(), destinationNode.getConnectionSecret());
void LimitedNodeList::collectPacketStats(const NLPacket& packet) {
// stat collection for packets
++_numCollectedPackets;
_numCollectedBytes += packet.getDataSize();
}
qint64 LimitedNodeList::writePacket(const NLPacket& packet, const HifiSockAddr& destinationSockAddr,
const QUuid& connectionSecret) {
void LimitedNodeList::fillPacketHeader(const NLPacket& packet, const QUuid& connectionSecret) {
if (!NON_SOURCED_PACKETS.contains(packet.getType())) {
const_cast<NLPacket&>(packet).writeSourceID(getSessionUUID());
packet.writeSourceID(getSessionUUID());
}
if (!connectionSecret.isNull()
&& !NON_SOURCED_PACKETS.contains(packet.getType())
&& !NON_VERIFIED_PACKETS.contains(packet.getType())) {
const_cast<NLPacket&>(packet).writeVerificationHashGivenSecret(connectionSecret);
packet.writeVerificationHashGivenSecret(connectionSecret);
}
emit dataSent(NodeType::Unassigned, packet.getDataSize());
return writePacketAndCollectStats(packet, destinationSockAddr);
}
qint64 LimitedNodeList::writePacketAndCollectStats(const NLPacket& packet, const HifiSockAddr& destinationSockAddr) {
// XXX can BandwidthRecorder be used for this?
// stat collection for packets
++_numCollectedPackets;
_numCollectedBytes += packet.getDataSize();
qint64 bytesWritten = _nodeSocket.writeUnreliablePacket(packet, destinationSockAddr);
return bytesWritten;
}
qint64 LimitedNodeList::sendUnreliablePacket(const NLPacket& packet, const Node& destinationNode) {
return writePacket(packet, destinationNode);
if (!destinationNode.getActiveSocket()) {
return 0;
}
emit dataSent(destinationNode.getType(), packet.getDataSize());
return sendUnreliablePacket(packet, *destinationNode.getActiveSocket(), destinationNode.getConnectionSecret());
}
qint64 LimitedNodeList::sendUnreliablePacket(const NLPacket& packet, const HifiSockAddr& sockAddr,
const QUuid& connectionSecret) {
return writePacket(packet, sockAddr, connectionSecret);
Q_ASSERT_X(!packet.isReliable(), "LimitedNodeList::sendUnreliablePacket",
"Trying to send a reliable packet unreliably.");
collectPacketStats(packet);
fillPacketHeader(packet, connectionSecret);
return _nodeSocket.writePacket(packet, sockAddr);
}
qint64 LimitedNodeList::sendPacket(std::unique_ptr<NLPacket> packet, const Node& destinationNode) {
// Keep unique_ptr alive during write
auto result = writePacket(*packet, destinationNode);
return result;
if (!destinationNode.getActiveSocket()) {
return 0;
}
emit dataSent(destinationNode.getType(), packet->getDataSize());
return sendPacket(std::move(packet), *destinationNode.getActiveSocket(), destinationNode.getConnectionSecret());
}
qint64 LimitedNodeList::sendPacket(std::unique_ptr<NLPacket> packet, const HifiSockAddr& sockAddr,
const QUuid& connectionSecret) {
// Keep unique_ptr alive during write
auto result = writePacket(*packet, sockAddr, connectionSecret);
return result;
if (packet->isReliable()) {
collectPacketStats(*packet);
fillPacketHeader(*packet, connectionSecret);
auto size = packet->getDataSize();
_nodeSocket.writePacket(std::move(packet), sockAddr);
return size;
} else {
return sendUnreliablePacket(*packet, sockAddr, connectionSecret);
}
}
qint64 LimitedNodeList::sendPacketList(NLPacketList& packetList, const Node& destinationNode) {
auto activeSocket = destinationNode.getActiveSocket();
if (!activeSocket) {
return 0;
}
qint64 bytesSent = 0;
auto connectionSecret = destinationNode.getConnectionSecret();
// close the last packet in the list
packetList.closeCurrentPacket();
while (!packetList._packets.empty()) {
bytesSent += sendPacket(packetList.takeFront<NLPacket>(), destinationNode);
bytesSent += sendPacket(packetList.takeFront<NLPacket>(), *activeSocket, connectionSecret);
}
emit dataSent(destinationNode.getType(), bytesSent);
return bytesSent;
}
@ -510,7 +511,7 @@ unsigned int LimitedNodeList::broadcastToNodes(std::unique_ptr<NLPacket> packet,
eachNode([&](const SharedNodePointer& node){
if (node && destinationNodeTypes.contains(node->getType())) {
writePacket(*packet, *node);
sendUnreliablePacket(*packet, *node);
++n;
}
});

View file

@ -247,11 +247,13 @@ protected:
LimitedNodeList(LimitedNodeList const&); // Don't implement, needed to avoid copies of singleton
void operator=(LimitedNodeList const&); // Don't implement, needed to avoid copies of singleton
qint64 writePacket(const NLPacket& packet, const Node& destinationNode);
qint64 sendPacket(std::unique_ptr<NLPacket> packet, const Node& destinationNode,
const HifiSockAddr& overridenSockAddr);
qint64 writePacket(const NLPacket& packet, const HifiSockAddr& destinationSockAddr,
const QUuid& connectionSecret = QUuid());
qint64 writePacketAndCollectStats(const NLPacket& packet, const HifiSockAddr& destinationSockAddr);
void collectPacketStats(const NLPacket& packet);
void fillPacketHeader(const NLPacket& packet, const QUuid& connectionSecret);
bool isPacketVerified(const udt::Packet& packet);
bool packetVersionMatch(const udt::Packet& packet);
bool packetSourceAndHashMatch(const udt::Packet& packet);
@ -264,8 +266,6 @@ protected:
void sendPacketToIceServer(PacketType packetType, const HifiSockAddr& iceServerSockAddr, const QUuid& clientID,
const QUuid& peerRequestID = QUuid());
qint64 sendPacket(std::unique_ptr<NLPacket> packet, const Node& destinationNode,
const HifiSockAddr& overridenSockAddr);
QUuid _sessionUUID;

View file

@ -11,39 +11,21 @@
#include "NLPacket.h"
qint64 NLPacket::maxPayloadSize(PacketType type) {
return Packet::maxPayloadSize(false) - localHeaderSize(type);
}
qint64 NLPacket::localHeaderSize(PacketType type) {
qint64 optionalSize = ((NON_SOURCED_PACKETS.contains(type)) ? 0 : NUM_BYTES_RFC4122_UUID) +
((NON_SOURCED_PACKETS.contains(type) || NON_VERIFIED_PACKETS.contains(type)) ? 0 : NUM_BYTES_MD5_HASH);
int NLPacket::localHeaderSize(PacketType type) {
bool nonSourced = NON_SOURCED_PACKETS.contains(type);
bool nonVerified = NON_VERIFIED_PACKETS.contains(type);
qint64 optionalSize = (nonSourced ? 0 : NUM_BYTES_RFC4122_UUID) + ((nonSourced || nonVerified) ? 0 : NUM_BYTES_MD5_HASH);
return sizeof(PacketType) + sizeof(PacketVersion) + optionalSize;
}
qint64 NLPacket::maxPayloadSize() const {
return Packet::maxPayloadSize() - localHeaderSize();
int NLPacket::totalHeaderSize(PacketType type, bool isPartOfMessage) {
return Packet::totalHeaderSize(isPartOfMessage) + NLPacket::localHeaderSize(type);
}
qint64 NLPacket::totalHeadersSize() const {
return localHeaderSize() + Packet::localHeaderSize();
}
qint64 NLPacket::localHeaderSize() const {
return localHeaderSize(_type);
int NLPacket::maxPayloadSize(PacketType type, bool isPartOfMessage) {
return Packet::maxPayloadSize(isPartOfMessage) - NLPacket::localHeaderSize(type);
}
std::unique_ptr<NLPacket> NLPacket::create(PacketType type, qint64 size, bool isReliable, bool isPartOfMessage) {
std::unique_ptr<NLPacket> packet;
if (size == -1) {
packet = std::unique_ptr<NLPacket>(new NLPacket(type, isReliable, isPartOfMessage));
} else {
// Fail with invalid size
Q_ASSERT(size >= 0);
packet = std::unique_ptr<NLPacket>(new NLPacket(type, size, isReliable, isPartOfMessage));
}
auto packet = std::unique_ptr<NLPacket>(new NLPacket(type, size, isReliable, isPartOfMessage));
packet->open(QIODevice::ReadWrite);
@ -79,24 +61,12 @@ std::unique_ptr<NLPacket> NLPacket::createCopy(const NLPacket& other) {
return std::unique_ptr<NLPacket>(new NLPacket(other));
}
NLPacket::NLPacket(PacketType type, bool isReliable, bool isPartOfMessage) :
Packet(-1, isReliable, isPartOfMessage),
_type(type),
_version(versionForPacketType(type))
{
adjustPayloadStartAndCapacity();
writeTypeAndVersion();
}
NLPacket::NLPacket(PacketType type, qint64 size, bool isReliable, bool isPartOfMessage) :
Packet(localHeaderSize(type) + size, isReliable, isPartOfMessage),
Packet((size == -1) ? -1 : NLPacket::localHeaderSize(type) + size, isReliable, isPartOfMessage),
_type(type),
_version(versionForPacketType(type))
{
Q_ASSERT(size >= 0);
adjustPayloadStartAndCapacity();
adjustPayloadStartAndCapacity(NLPacket::localHeaderSize(_type));
writeTypeAndVersion();
}
@ -108,7 +78,7 @@ NLPacket::NLPacket(std::unique_ptr<Packet> packet) :
readVersion();
readSourceID();
adjustPayloadStartAndCapacity(_payloadSize > 0);
adjustPayloadStartAndCapacity(NLPacket::localHeaderSize(_type), _payloadSize > 0);
}
NLPacket::NLPacket(const NLPacket& other) : Packet(other) {
@ -123,11 +93,11 @@ NLPacket::NLPacket(std::unique_ptr<char[]> data, qint64 size, const HifiSockAddr
// sanity check before we decrease the payloadSize with the payloadCapacity
Q_ASSERT(_payloadSize == _payloadCapacity);
adjustPayloadStartAndCapacity(_payloadSize > 0);
readType();
readVersion();
readSourceID();
adjustPayloadStartAndCapacity(NLPacket::localHeaderSize(_type), _payloadSize > 0);
}
NLPacket::NLPacket(NLPacket&& other) :
@ -160,29 +130,29 @@ NLPacket& NLPacket::operator=(NLPacket&& other) {
}
PacketType NLPacket::typeInHeader(const udt::Packet& packet) {
auto headerOffset = packet.Packet::totalHeadersSize();
auto headerOffset = Packet::totalHeaderSize(packet.isPartOfMessage());
return *reinterpret_cast<const PacketType*>(packet.getData() + headerOffset);
}
PacketVersion NLPacket::versionInHeader(const udt::Packet& packet) {
auto headerOffset = packet.Packet::totalHeadersSize();
auto headerOffset = Packet::totalHeaderSize(packet.isPartOfMessage());
return *reinterpret_cast<const PacketVersion*>(packet.getData() + headerOffset + sizeof(PacketType));
}
QUuid NLPacket::sourceIDInHeader(const udt::Packet& packet) {
int offset = packet.Packet::totalHeadersSize() + sizeof(PacketType) + sizeof(PacketVersion);
int offset = Packet::totalHeaderSize(packet.isPartOfMessage()) + sizeof(PacketType) + sizeof(PacketVersion);
return QUuid::fromRfc4122(QByteArray::fromRawData(packet.getData() + offset, NUM_BYTES_RFC4122_UUID));
}
QByteArray NLPacket::verificationHashInHeader(const udt::Packet& packet) {
int offset = packet.Packet::totalHeadersSize() + sizeof(PacketType) + sizeof(PacketVersion) + NUM_BYTES_RFC4122_UUID;
int offset = Packet::totalHeaderSize(packet.isPartOfMessage()) + sizeof(PacketType) + sizeof(PacketVersion) + NUM_BYTES_RFC4122_UUID;
return QByteArray(packet.getData() + offset, NUM_BYTES_MD5_HASH);
}
QByteArray NLPacket::hashForPacketAndSecret(const udt::Packet& packet, const QUuid& connectionSecret) {
QCryptographicHash hash(QCryptographicHash::Md5);
int offset = packet.Packet::totalHeadersSize() + sizeof(PacketType) + sizeof(PacketVersion)
int offset = Packet::totalHeaderSize(packet.isPartOfMessage()) + sizeof(PacketType) + sizeof(PacketVersion)
+ NUM_BYTES_RFC4122_UUID + NUM_BYTES_MD5_HASH;
// add the packet payload and the connection UUID
@ -194,7 +164,7 @@ QByteArray NLPacket::hashForPacketAndSecret(const udt::Packet& packet, const QUu
}
void NLPacket::writeTypeAndVersion() {
auto headerOffset = Packet::totalHeadersSize();
auto headerOffset = Packet::totalHeaderSize(isPartOfMessage());
// Pack the packet type
memcpy(_packet.get() + headerOffset, &_type, sizeof(PacketType));
@ -204,16 +174,14 @@ void NLPacket::writeTypeAndVersion() {
}
void NLPacket::setType(PacketType type) {
auto currentHeaderSize = totalHeadersSize();
// Setting new packet type with a different header size not currently supported
Q_ASSERT(NLPacket::totalHeaderSize(_type, isPartOfMessage()) ==
NLPacket::totalHeaderSize(type, isPartOfMessage()));
_type = type;
_version = versionForPacketType(_type);
writeTypeAndVersion();
// Setting new packet type with a different header size not currently supported
Q_ASSERT(currentHeaderSize == totalHeadersSize());
Q_UNUSED(currentHeaderSize);
}
void NLPacket::readType() {
@ -230,19 +198,20 @@ void NLPacket::readSourceID() {
}
}
void NLPacket::writeSourceID(const QUuid& sourceID) {
void NLPacket::writeSourceID(const QUuid& sourceID) const {
Q_ASSERT(!NON_SOURCED_PACKETS.contains(_type));
auto offset = Packet::totalHeadersSize() + sizeof(PacketType) + sizeof(PacketVersion);
auto offset = Packet::totalHeaderSize(isPartOfMessage()) + sizeof(PacketType) + sizeof(PacketVersion);
memcpy(_packet.get() + offset, sourceID.toRfc4122().constData(), NUM_BYTES_RFC4122_UUID);
_sourceID = sourceID;
}
void NLPacket::writeVerificationHashGivenSecret(const QUuid& connectionSecret) {
void NLPacket::writeVerificationHashGivenSecret(const QUuid& connectionSecret) const {
Q_ASSERT(!NON_SOURCED_PACKETS.contains(_type) && !NON_VERIFIED_PACKETS.contains(_type));
auto offset = Packet::totalHeadersSize() + sizeof(PacketType) + sizeof(PacketVersion) + NUM_BYTES_RFC4122_UUID;
auto offset = Packet::totalHeaderSize(isPartOfMessage()) + sizeof(PacketType) + sizeof(PacketVersion)
+ NUM_BYTES_RFC4122_UUID;
QByteArray verificationHash = hashForPacketAndSecret(*this, connectionSecret);
memcpy(_packet.get() + offset, verificationHash.data(), verificationHash.size());

View file

@ -14,12 +14,12 @@
#include <QtCore/QSharedPointer>
#include "UUID.h"
#include "udt/Packet.h"
class NLPacket : public udt::Packet {
Q_OBJECT
public:
// this is used by the Octree classes - must be known at compile time
static const int MAX_PACKET_HEADER_SIZE =
sizeof(udt::Packet::SequenceNumberAndBitField) + sizeof(udt::Packet::MessageNumberAndBitField) +
@ -35,6 +35,13 @@ public:
// Provided for convenience, try to limit use
static std::unique_ptr<NLPacket> createCopy(const NLPacket& other);
// Current level's header size
static int localHeaderSize(PacketType type);
// Cumulated size of all the headers
static int totalHeaderSize(PacketType type, bool isPartOfMessage = false);
// The maximum payload size this packet can use to fit in MTU
static int maxPayloadSize(PacketType type, bool isPartOfMessage = false);
static PacketType typeInHeader(const udt::Packet& packet);
static PacketVersion versionInHeader(const udt::Packet& packet);
@ -42,13 +49,6 @@ public:
static QByteArray verificationHashInHeader(const udt::Packet& packet);
static QByteArray hashForPacketAndSecret(const udt::Packet& packet, const QUuid& connectionSecret);
static qint64 maxPayloadSize(PacketType type);
static qint64 localHeaderSize(PacketType type);
virtual qint64 maxPayloadSize() const; // The maximum payload size this packet can use to fit in MTU
virtual qint64 totalHeadersSize() const; // Cumulated size of all the headers
virtual qint64 localHeaderSize() const; // Current level's header size
PacketType getType() const { return _type; }
void setType(PacketType type);
@ -56,13 +56,12 @@ public:
const QUuid& getSourceID() const { return _sourceID; }
void writeSourceID(const QUuid& sourceID);
void writeVerificationHashGivenSecret(const QUuid& connectionSecret);
void writeSourceID(const QUuid& sourceID) const;
void writeVerificationHashGivenSecret(const QUuid& connectionSecret) const;
protected:
NLPacket(PacketType type, bool forceReliable = false, bool isPartOfMessage = false);
NLPacket(PacketType type, qint64 size, bool forceReliable = false, bool isPartOfMessage = false);
NLPacket(PacketType type, qint64 size = -1, bool forceReliable = false, bool isPartOfMessage = false);
NLPacket(std::unique_ptr<char[]> data, qint64 size, const HifiSockAddr& senderSockAddr);
NLPacket(std::unique_ptr<Packet> packet);
@ -82,7 +81,7 @@ protected:
PacketType _type;
PacketVersion _version;
QUuid _sourceID;
mutable QUuid _sourceID;
};
#endif // hifi_NLPacket_h

View file

@ -16,11 +16,10 @@
#include <QtCore/QDataStream>
#include <SharedUtil.h>
#include <UUID.h>
#include "NetworkLogging.h"
#include "BandwidthRecorder.h"
#include "NetworkLogging.h"
#include "UUID.h"
NetworkPeer::NetworkPeer(QObject* parent) :
QObject(parent),
@ -32,7 +31,7 @@ NetworkPeer::NetworkPeer(QObject* parent) :
_wakeTimestamp(QDateTime::currentMSecsSinceEpoch()),
_connectionAttempts(0)
{
_lastHeardMicrostamp.store(usecTimestampNow());
_lastHeardMicrostamp = usecTimestampNow();
}
NetworkPeer::NetworkPeer(const QUuid& uuid, const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket, QObject* parent) :
@ -45,7 +44,7 @@ NetworkPeer::NetworkPeer(const QUuid& uuid, const HifiSockAddr& publicSocket, co
_wakeTimestamp(QDateTime::currentMSecsSinceEpoch()),
_connectionAttempts(0)
{
_lastHeardMicrostamp.store(usecTimestampNow());
_lastHeardMicrostamp = usecTimestampNow();
}
void NetworkPeer::setPublicSocket(const HifiSockAddr& publicSocket) {
@ -57,7 +56,9 @@ void NetworkPeer::setPublicSocket(const HifiSockAddr& publicSocket) {
bool wasOldSocketNull = _publicSocket.isNull();
auto temp = _publicSocket.objectName();
_publicSocket = publicSocket;
_publicSocket.setObjectName(temp);
if (!wasOldSocketNull) {
qCDebug(networking) << "Public socket change for node" << *this;
@ -74,7 +75,9 @@ void NetworkPeer::setLocalSocket(const HifiSockAddr& localSocket) {
bool wasOldSocketNull = _localSocket.isNull();
auto temp = _localSocket.objectName();
_localSocket = localSocket;
_localSocket.setObjectName(temp);
if (!wasOldSocketNull) {
qCDebug(networking) << "Local socket change for node" << *this;
@ -91,7 +94,9 @@ void NetworkPeer::setSymmetricSocket(const HifiSockAddr& symmetricSocket) {
bool wasOldSocketNull = _symmetricSocket.isNull();
auto temp = _symmetricSocket.objectName();
_symmetricSocket = symmetricSocket;
_symmetricSocket.setObjectName(temp);
if (!wasOldSocketNull) {
qCDebug(networking) << "Symmetric socket change for node" << *this;

View file

@ -61,8 +61,8 @@ public:
quint64 getWakeTimestamp() const { return _wakeTimestamp; }
void setWakeTimestamp(quint64 wakeTimestamp) { _wakeTimestamp = wakeTimestamp; }
quint64 getLastHeardMicrostamp() const { return _lastHeardMicrostamp.load(); }
void setLastHeardMicrostamp(quint64 lastHeardMicrostamp) { _lastHeardMicrostamp.store(lastHeardMicrostamp); }
quint64 getLastHeardMicrostamp() const { return _lastHeardMicrostamp; }
void setLastHeardMicrostamp(quint64 lastHeardMicrostamp) { _lastHeardMicrostamp = lastHeardMicrostamp; }
QByteArray toByteArray() const;

View file

@ -12,10 +12,9 @@
#include <cstring>
#include <stdio.h>
#include <UUID.h>
#include "Node.h"
#include "SharedUtil.h"
#include "UUID.h"
#include <QtCore/QDataStream>
#include <QtCore/QDebug>
@ -55,13 +54,23 @@ Node::Node(const QUuid& uuid, NodeType_t type, const HifiSockAddr& publicSocket,
_canAdjustLocks(canAdjustLocks),
_canRez(canRez)
{
// Update socket's object name
setType(_type);
}
Node::~Node() {
delete _linkedData;
}
void Node::setType(char type) {
_type = type;
auto typeString = NodeType::getNodeTypeName(type);
_publicSocket.setObjectName(typeString);
_localSocket.setObjectName(typeString);
_symmetricSocket.setObjectName(typeString);
}
void Node::updateClockSkewUsec(int clockSkewSample) {
_clockSkewMovingPercentile.updatePercentile((float)clockSkewSample);
_clockSkewUsec = (int)_clockSkewMovingPercentile.getValueAtPercentile();

View file

@ -40,7 +40,7 @@ public:
bool operator!=(const Node& otherNode) const { return !(*this == otherNode); }
char getType() const { return _type; }
void setType(char type) { _type = type; }
void setType(char type);
const QUuid& getConnectionSecret() const { return _connectionSecret; }
void setConnectionSecret(const QUuid& connectionSecret) { _connectionSecret = connectionSecret; }

View file

@ -252,6 +252,7 @@ void NodeList::sendDomainServerCheckIn() {
}
auto domainPacket = NLPacket::create(domainPacketType);
QDataStream packetStream(domainPacket.get());
if (domainPacketType == PacketType::DomainConnectRequest) {

View file

@ -19,7 +19,6 @@ typedef quint8 NodeType_t;
namespace NodeType {
const NodeType_t DomainServer = 'D';
const NodeType_t EntityServer = 'o'; // was ModelServer
const NodeType_t EnvironmentServer = 'E';
const NodeType_t Agent = 'I';
const NodeType_t AudioMixer = 'M';
const NodeType_t AvatarMixer = 'W';

View file

@ -70,10 +70,11 @@ void ThreadedAssignment::commonInit(const QString& targetName, NodeType_t nodeTy
_domainServerTimer->start(DOMAIN_SERVER_CHECK_IN_MSECS);
if (shouldSendStats) {
// send a stats packet every 1 second
_statsTimer = new QTimer();
connect(_statsTimer, &QTimer::timeout, this, &ThreadedAssignment::sendStatsPacket);
_statsTimer->start(1000);
// start sending stats packet once we connect to the domain
connect(&nodeList->getDomainHandler(), &DomainHandler::connectedToDomain, this, &ThreadedAssignment::startSendingStats);
// stop sending stats if we disconnect
connect(&nodeList->getDomainHandler(), &DomainHandler::disconnectedFromDomain, this, &ThreadedAssignment::stopSendingStats);
}
}
@ -96,6 +97,21 @@ void ThreadedAssignment::sendStatsPacket() {
addPacketStatsAndSendStatsPacket(statsObject);
}
void ThreadedAssignment::startSendingStats() {
// send the stats packet every 1s
if (!_statsTimer) {
_statsTimer = new QTimer();
connect(_statsTimer, &QTimer::timeout, this, &ThreadedAssignment::sendStatsPacket);
}
_statsTimer->start(1000);
}
void ThreadedAssignment::stopSendingStats() {
// stop sending stats, we just disconnected from domain
_statsTimer->stop();
}
void ThreadedAssignment::checkInWithDomainServerOrExit() {
if (DependencyManager::get<NodeList>()->getNumNoReplyDomainCheckIns() == MAX_SILENT_DOMAIN_SERVER_CHECK_INS) {
setFinished(true);

View file

@ -42,6 +42,8 @@ protected:
QTimer* _statsTimer = nullptr;
private slots:
void startSendingStats();
void stopSendingStats();
void checkInWithDomainServerOrExit();
};

View file

@ -20,9 +20,13 @@
class UUIDHasher {
public:
size_t operator()(const QUuid& uuid) const {
return uuid.data1 ^ uuid.data2 ^ (uuid.data3 << 16)
^ ((uuid.data4[0] << 24) | (uuid.data4[1] << 16) | (uuid.data4[2] << 8) | uuid.data4[3])
^ ((uuid.data4[4] << 24) | (uuid.data4[5] << 16) | (uuid.data4[6] << 8) | uuid.data4[7]);
return qHash(uuid);
}
};
template <> struct std::hash<QUuid> {
size_t operator()(const QUuid& uuid) const {
return qHash(uuid);
}
};

View file

@ -1,6 +1,6 @@
//
// WalletTransaction.cpp
// domain-server/src
// libraries/networking/src
//
// Created by Stephen Birarda on 2014-05-20.
// Copyright 2014 High Fidelity, Inc.
@ -11,10 +11,10 @@
#include <QtCore/QJsonObject>
#include <UUID.h>
#include "UUID.h"
#include "WalletTransaction.h"
WalletTransaction::WalletTransaction() :
_uuid(),
_destinationUUID(),
@ -64,4 +64,4 @@ void WalletTransaction::loadFromJson(const QJsonObject& jsonObject) {
_uuid = QUuid(transactionObject.value(TRANSACTION_ID_KEY).toString());
_destinationUUID = QUuid(transactionObject.value(TRANSACTION_DESTINATION_WALLET_ID_KEY).toString());
_amount = transactionObject.value(TRANSACTION_AMOUNT_KEY).toInt();
}
}

View file

@ -15,6 +15,16 @@ using namespace udt;
const qint64 BasePacket::PACKET_WRITE_ERROR = -1;
int BasePacket::localHeaderSize() {
return 0;
}
int BasePacket::totalHeaderSize() {
return 0;
}
int BasePacket::maxPayloadSize() {
return MAX_PACKET_SIZE;
}
std::unique_ptr<BasePacket> BasePacket::create(qint64 size) {
auto packet = std::unique_ptr<BasePacket>(new BasePacket(size));
@ -37,7 +47,7 @@ std::unique_ptr<BasePacket> BasePacket::fromReceivedPacket(std::unique_ptr<char[
}
BasePacket::BasePacket(qint64 size) {
auto maxPayload = maxPayloadSize();
auto maxPayload = BasePacket::maxPayloadSize();
if (size == -1) {
// default size of -1, means biggest packet possible
@ -73,7 +83,7 @@ BasePacket::BasePacket(const BasePacket& other) :
BasePacket& BasePacket::operator=(const BasePacket& other) {
_packetSize = other._packetSize;
_packet = std::unique_ptr<char>(new char[_packetSize]);
_packet = std::unique_ptr<char[]>(new char[_packetSize]);
memcpy(_packet.get(), other._packet.get(), _packetSize);
_payloadStart = _packet.get() + (other._payloadStart - other._packet.get());
@ -116,6 +126,10 @@ BasePacket& BasePacket::operator=(BasePacket&& other) {
return *this;
}
qint64 BasePacket::getDataSize() const {
return (_payloadStart - _packet.get()) + _payloadSize;
}
void BasePacket::setPayloadSize(qint64 payloadSize) {
if (isWritable()) {
Q_ASSERT(payloadSize <= _payloadCapacity);
@ -142,13 +156,13 @@ bool BasePacket::reset() {
}
qint64 BasePacket::writeData(const char* data, qint64 maxSize) {
Q_ASSERT_X(maxSize <= bytesAvailableForWrite(), "BasePacket::writeData", "not enough space for write");
// make sure we have the space required to write this block
if (maxSize <= bytesAvailableForWrite()) {
qint64 currentPos = pos();
Q_ASSERT(currentPos < _payloadCapacity);
// good to go - write the data
memcpy(_payloadStart + currentPos, data, maxSize);
@ -177,8 +191,7 @@ qint64 BasePacket::readData(char* dest, qint64 maxSize) {
return numBytesToRead;
}
void BasePacket::adjustPayloadStartAndCapacity(bool shouldDecreasePayloadSize) {
qint64 headerSize = localHeaderSize();
void BasePacket::adjustPayloadStartAndCapacity(qint64 headerSize, bool shouldDecreasePayloadSize) {
_payloadStart += headerSize;
_payloadCapacity -= headerSize;

View file

@ -17,23 +17,25 @@
#include <QtCore/QIODevice>
#include "../HifiSockAddr.h"
#include "Constants.h"
namespace udt {
static const int MAX_PACKET_SIZE = 1450;
class BasePacket : public QIODevice {
Q_OBJECT
public:
static const qint64 PACKET_WRITE_ERROR;
static std::unique_ptr<BasePacket> create(qint64 size = -1);
static std::unique_ptr<BasePacket> fromReceivedPacket(std::unique_ptr<char[]> data, qint64 size, const HifiSockAddr& senderSockAddr);
static std::unique_ptr<BasePacket> fromReceivedPacket(std::unique_ptr<char[]> data, qint64 size,
const HifiSockAddr& senderSockAddr);
static qint64 maxPayloadSize() { return MAX_PACKET_SIZE; } // The maximum payload size this packet can use to fit in MTU
static qint64 localHeaderSize() { return 0; } // Current level's header size
virtual qint64 totalHeadersSize() const { return 0; } // Cumulated size of all the headers
// Current level's header size
static int localHeaderSize();
// Cumulated size of all the headers
static int totalHeaderSize();
// The maximum payload size this packet can use to fit in MTU
static int maxPayloadSize();
// Payload direct access to the payload, use responsibly!
char* getPayload() { return _payloadStart; }
@ -44,7 +46,7 @@ public:
const char* getData() const { return _packet.get(); }
// Returns the size of the packet, including the header
qint64 getDataSize() const { return totalHeadersSize() + _payloadSize; }
qint64 getDataSize() const;
// Returns the size of the payload only
qint64 getPayloadSize() const { return _payloadSize; }
@ -86,7 +88,7 @@ protected:
virtual qint64 writeData(const char* data, qint64 maxSize);
virtual qint64 readData(char* data, qint64 maxSize);
virtual void adjustPayloadStartAndCapacity(bool shouldDecreasePayloadSize = false);
void adjustPayloadStartAndCapacity(qint64 headerSize, bool shouldDecreasePayloadSize = false);
qint64 _packetSize = 0; // Total size of the allocated memory
std::unique_ptr<char[]> _packet; // Allocated memory

View file

@ -0,0 +1,207 @@
//
// CongestionControl.cpp
// libraries/networking/src/udt
//
// Created by Clement on 7/23/15.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#include "CongestionControl.h"
#include <random>
#include "Packet.h"
using namespace udt;
using namespace std::chrono;
static const double USECS_PER_SECOND = 1000000.0;
void CongestionControl::setPacketSendPeriod(double newSendPeriod) {
Q_ASSERT_X(newSendPeriod >= 0, "CongestionControl::setPacketPeriod", "Can not set a negative packet send period");
if (_maxBandwidth > 0) {
// anytime the packet send period is about to be increased, make sure it stays below the minimum period,
// calculated based on the maximum desired bandwidth
double minPacketSendPeriod = USECS_PER_SECOND / (((double) _maxBandwidth) / _mss);
_packetSendPeriod = std::max(newSendPeriod, minPacketSendPeriod);
} else {
_packetSendPeriod = newSendPeriod;
}
}
DefaultCC::DefaultCC() :
_lastRCTime(high_resolution_clock::now()),
_slowStartLastAck(_sendCurrSeqNum),
_lastDecreaseMaxSeq(SequenceNumber {SequenceNumber::MAX })
{
_mss = udt::MAX_PACKET_SIZE_WITH_UDP_HEADER;
_congestionWindowSize = 16.0;
_packetSendPeriod = 1.0;
}
void DefaultCC::onACK(SequenceNumber ackNum) {
double increase = 0;
// Note from UDT original code:
// The minimum increase parameter is increased from "1.0 / _mss" to 0.01
// because the original was too small and caused sending rate to stay at low level
// for long time.
const double minimumIncrease = 0.01;
// we will only adjust once per sync interval so check that it has been at least that long now
auto now = high_resolution_clock::now();
if (duration_cast<microseconds>(now - _lastRCTime).count() < synInterval()) {
return;
}
// our last rate increase time is now
_lastRCTime = now;
if (_slowStart) {
// we are in slow start phase - increase the congestion window size by the number of packets just ACKed
_congestionWindowSize += seqlen(_slowStartLastAck, ackNum);
// update the last ACK
_slowStartLastAck = ackNum;
// check if we can get out of slow start (is our new congestion window size bigger than the max)
if (_congestionWindowSize > _maxCongestionWindowSize) {
_slowStart = false;
if (_receiveRate > 0) {
// if we have a valid receive rate we set the send period to whatever the receive rate dictates
_packetSendPeriod = USECS_PER_SECOND / _receiveRate;
} else {
// no valid receive rate, packet send period is dictated by estimated RTT and current congestion window size
_packetSendPeriod = (_rtt + synInterval()) / _congestionWindowSize;
}
}
} else {
// not in slow start - window size should be arrival rate * (RTT + SYN) + 16
_congestionWindowSize = _receiveRate / USECS_PER_SECOND * (_rtt + synInterval()) + 16;
}
// during slow start we perform no rate increases
if (_slowStart) {
return;
}
// if loss has happened since the last rate increase we do not perform another increase
if (_loss) {
_loss = false;
return;
}
double capacitySpeedDelta = (_bandwidth - USECS_PER_SECOND / _packetSendPeriod);
// UDT uses what they call DAIMD - additive increase multiplicative decrease with decreasing increases
// This factor is a protocol parameter that is part of the DAIMD algorithim
static const int AIMD_DECREASING_INCREASE_FACTOR = 9;
if ((_packetSendPeriod > _lastDecreasePeriod) && ((_bandwidth / AIMD_DECREASING_INCREASE_FACTOR) < capacitySpeedDelta)) {
capacitySpeedDelta = _bandwidth / AIMD_DECREASING_INCREASE_FACTOR;
}
if (capacitySpeedDelta <= 0) {
increase = minimumIncrease;
} else {
// use UDTs DAIMD algorithm to figure out what the send period increase factor should be
// inc = max(10 ^ ceil(log10(B * MSS * 8 ) * Beta / MSS, minimumIncrease)
// B = estimated link capacity
// Beta = 1.5 * 10^(-6)
static const double BETA = 0.0000015;
static const double BITS_PER_BYTE = 8.0;
increase = pow(10.0, ceil(log10(capacitySpeedDelta * _mss * BITS_PER_BYTE))) * BETA / _mss;
if (increase < minimumIncrease) {
increase = minimumIncrease;
}
}
setPacketSendPeriod((_packetSendPeriod * synInterval()) / (_packetSendPeriod * increase + synInterval()));
}
void DefaultCC::onLoss(SequenceNumber rangeStart, SequenceNumber rangeEnd) {
// stop the slow start if we haven't yet
if (_slowStart) {
stopSlowStart();
// if the change to send rate was driven by a known receive rate, then we don't continue with the decrease
if (_receiveRate > 0) {
return;
}
}
_loss = true;
static const double INTER_PACKET_ARRIVAL_INCREASE = 1.125;
static const int MAX_DECREASES_PER_CONGESTION_EPOCH = 5;
// check if this NAK starts a new congestion period - this will be the case if the
// NAK received occured for a packet sent after the last decrease
if (rangeStart > _lastDecreaseMaxSeq) {
_lastDecreasePeriod = _packetSendPeriod;
_packetSendPeriod = ceil(_packetSendPeriod * INTER_PACKET_ARRIVAL_INCREASE);
// use EWMA to update the average number of NAKs per congestion
static const double NAK_EWMA_ALPHA = 0.125;
_avgNAKNum = (int)ceil(_avgNAKNum * (1 - NAK_EWMA_ALPHA) + _nakCount * NAK_EWMA_ALPHA);
// update the count of NAKs and count of decreases in this interval
_nakCount = 1;
_decreaseCount = 1;
_lastDecreaseMaxSeq = _sendCurrSeqNum;
// avoid synchronous rate decrease across connections using randomization
std::random_device rd;
std::mt19937 generator(rd());
std::uniform_int_distribution<> distribution(1, _avgNAKNum);
_randomDecreaseThreshold = distribution(generator);
} else if ((_decreaseCount++ < MAX_DECREASES_PER_CONGESTION_EPOCH) && ((++_nakCount % _randomDecreaseThreshold) == 0)) {
// there have been fewer than MAX_DECREASES_PER_CONGESTION_EPOCH AND this NAK matches the random count at which we
// decided we would decrease the packet send period
_packetSendPeriod = ceil(_packetSendPeriod * INTER_PACKET_ARRIVAL_INCREASE);
_lastDecreaseMaxSeq = _sendCurrSeqNum;
}
}
// Note: This isn't currently being called by anything since we, unlike UDT, don't have TTL on our packets
void DefaultCC::onTimeout() {
if (_slowStart) {
stopSlowStart();
} else {
// UDT used to do the following on timeout if not in slow start - we should check if it could be helpful
// _lastDecreasePeriod = _packetSendPeriod;
// _packetSendPeriod = ceil(_packetSendPeriod * 2);
// this seems odd - the last ack they were setting _lastDecreaseMaxSeq to only applies to slow start
// _lastDecreaseMaxSeq = _slowStartLastAck;
}
}
void DefaultCC::stopSlowStart() {
_slowStart = false;
if (_receiveRate > 0) {
// Set the sending rate to the receiving rate.
_packetSendPeriod = USECS_PER_SECOND / _receiveRate;
} else {
// If no receiving rate is observed, we have to compute the sending
// rate according to the current window size, and decrease it
// using the method below.
_packetSendPeriod = _congestionWindowSize / (_rtt + synInterval());
}
}

View file

@ -0,0 +1,122 @@
//
// CongestionControl.h
// libraries/networking/src/udt
//
// Created by Clement on 7/23/15.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#ifndef hifi_CongestionControl_h
#define hifi_CongestionControl_h
#include <chrono>
#include <vector>
#include "LossList.h"
#include "SequenceNumber.h"
namespace udt {
static const int32_t DEFAULT_SYN_INTERVAL = 10000; // 10 ms
class Connection;
class Packet;
class CongestionControl {
friend class Connection;
public:
CongestionControl() {};
CongestionControl(int synInterval) : _synInterval(synInterval) {}
virtual ~CongestionControl() {}
int synInterval() const { return _synInterval; }
virtual void init() {}
virtual void onACK(SequenceNumber ackNum) {}
virtual void onLoss(SequenceNumber rangeStart, SequenceNumber rangeEnd) {}
protected:
void setAckInterval(int ackInterval) { _ackInterval = ackInterval; }
void setRTO(int rto) { _userDefinedRTO = true; _rto = rto; }
void setMSS(int mss) { _mss = mss; }
void setMaxCongestionWindowSize(int window) { _maxCongestionWindowSize = window; }
void setBandwidth(int bandwidth) { _bandwidth = bandwidth; }
void setMaxBandwidth(int maxBandwidth) { _maxBandwidth = maxBandwidth; }
void setSendCurrentSequenceNumber(SequenceNumber seqNum) { _sendCurrSeqNum = seqNum; }
void setReceiveRate(int rate) { _receiveRate = rate; }
void setRTT(int rtt) { _rtt = rtt; }
void setPacketSendPeriod(double newSendPeriod); // call this internally to ensure send period doesn't go past max bandwidth
double _packetSendPeriod { 1.0 }; // Packet sending period, in microseconds
double _congestionWindowSize { 16.0 }; // Congestion window size, in packets
int _bandwidth { 0 }; // estimated bandwidth, packets per second
int _maxBandwidth { -1 }; // Maximum desired bandwidth, packets per second
double _maxCongestionWindowSize { 0.0 }; // maximum cwnd size, in packets
int _mss { 0 }; // Maximum Packet Size, including all packet headers
SequenceNumber _sendCurrSeqNum; // current maximum seq num sent out
int _receiveRate { 0 }; // packet arrive rate at receiver side, packets per second
int _rtt { 0 }; // current estimated RTT, microsecond
private:
CongestionControl(const CongestionControl& other) = delete;
CongestionControl& operator=(const CongestionControl& other) = delete;
int _ackInterval { 0 }; // How many packets to send one ACK, in packets
int _lightACKInterval { 64 }; // How many packets to send one light ACK, in packets
int _synInterval { DEFAULT_SYN_INTERVAL };
bool _userDefinedRTO { false }; // if the RTO value is defined by users
int _rto { -1 }; // RTO value, microseconds
};
class CongestionControlVirtualFactory {
public:
virtual ~CongestionControlVirtualFactory() {}
static int synInterval() { return DEFAULT_SYN_INTERVAL; }
virtual std::unique_ptr<CongestionControl> create() = 0;
};
template <class T> class CongestionControlFactory: public CongestionControlVirtualFactory {
public:
virtual ~CongestionControlFactory() {}
virtual std::unique_ptr<CongestionControl> create() { return std::unique_ptr<T>(new T()); }
};
class DefaultCC: public CongestionControl {
public:
DefaultCC();
public:
virtual void onACK(SequenceNumber ackNum);
virtual void onLoss(SequenceNumber rangeStart, SequenceNumber rangeEnd);
virtual void onTimeout();
private:
void stopSlowStart(); // stops the slow start on loss or timeout
std::chrono::high_resolution_clock::time_point _lastRCTime; // last rate increase time
bool _slowStart { true }; // if in slow start phase
SequenceNumber _slowStartLastAck; // last ACKed seq num
bool _loss { false }; // if loss happened since last rate increase
SequenceNumber _lastDecreaseMaxSeq; // max pkt seq num sent out when last decrease happened
double _lastDecreasePeriod { 1 }; // value of _packetSendPeriod when last decrease happened
int _nakCount { 0 }; // number of NAKs in congestion epoch
int _randomDecreaseThreshold { 1 }; // random threshold on decrease by number of loss events
int _avgNAKNum { 0 }; // average number of NAKs per congestion
int _decreaseCount { 0 }; // number of decreases in a congestion epoch
};
}
#endif // hifi_CongestionControl_h

View file

@ -0,0 +1,611 @@
//
// Connection.cpp
// libraries/networking/src/udt
//
// Created by Clement on 7/27/15.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#include "Connection.h"
#include <QtCore/QThread>
#include <NumericalConstants.h>
#include "../HifiSockAddr.h"
#include "CongestionControl.h"
#include "ControlPacket.h"
#include "Packet.h"
#include "Socket.h"
using namespace udt;
using namespace std::chrono;
Connection::Connection(Socket* parentSocket, HifiSockAddr destination, std::unique_ptr<CongestionControl> congestionControl) :
_parentSocket(parentSocket),
_destination(destination),
_congestionControl(move(congestionControl))
{
Q_ASSERT_X(socket, "Connection::Connection", "Must be called with a valid Socket*");
Q_ASSERT_X(_congestionControl, "Connection::Connection", "Must be called with a valid CongestionControl object");
_congestionControl->init();
// setup default SYN, RTT and RTT Variance based on the SYN interval in CongestionControl object
_synInterval = _congestionControl->synInterval();
_rtt = _synInterval * 10;
_rttVariance = _rtt / 2;
// set the initial RTT and flow window size on congestion control object
_congestionControl->setRTT(_rtt);
_congestionControl->setMaxCongestionWindowSize(_flowWindowSize);
}
Connection::~Connection() {
if (_sendQueue) {
// grab the send queue thread so we can wait on it
QThread* sendQueueThread = _sendQueue->thread();
// tell the send queue to stop and be deleted
_sendQueue->stop();
_sendQueue->deleteLater();
_sendQueue.release();
// wait on the send queue thread so we know the send queue is gone
sendQueueThread->wait();
}
}
SendQueue& Connection::getSendQueue() {
if (!_sendQueue) {
// Lasily create send queue
_sendQueue = SendQueue::create(_parentSocket, _destination);
QObject::connect(_sendQueue.get(), &SendQueue::packetSent, this, &Connection::packetSent);
QObject::connect(_sendQueue.get(), &SendQueue::packetSent, this, &Connection::recordSentPackets);
QObject::connect(_sendQueue.get(), &SendQueue::packetRetransmitted, this, &Connection::recordRetransmission);
// set defaults on the send queue from our congestion control object
_sendQueue->setPacketSendPeriod(_congestionControl->_packetSendPeriod);
_sendQueue->setFlowWindowSize(std::min(_flowWindowSize, (int) _congestionControl->_congestionWindowSize));
}
return *_sendQueue;
}
void Connection::sendReliablePacket(std::unique_ptr<Packet> packet) {
Q_ASSERT_X(packet->isReliable(), "Connection::send", "Trying to send an unreliable packet reliably.");
getSendQueue().queuePacket(move(packet));
}
void Connection::sync() {
if (_hasReceivedFirstPacket) {
// reset the number of light ACKs or non SYN ACKs during this sync interval
_lightACKsDuringSYN = 1;
_acksDuringSYN = 1;
// we send out a periodic ACK every rate control interval
sendACK();
// check if we need to re-transmit a loss list
// we do this if it has been longer than the current nakInterval since we last sent
auto now = high_resolution_clock::now();
if (duration_cast<microseconds>(now - _lastNAKTime).count() >= _nakInterval) {
// Send a timeout NAK packet
sendTimeoutNAK();
}
}
}
void Connection::recordSentPackets(int dataSize, int payloadSize) {
_stats.recordSentPackets(payloadSize, dataSize);
}
void Connection::recordRetransmission() {
_stats.record(ConnectionStats::Stats::Retransmission);
}
void Connection::sendACK(bool wasCausedBySyncTimeout) {
static high_resolution_clock::time_point lastACKSendTime;
auto currentTime = high_resolution_clock::now();
SequenceNumber nextACKNumber = nextACK();
Q_ASSERT_X(nextACKNumber >= _lastSentACK, "Connection::sendACK", "Sending lower ACK, something is wrong");
if (nextACKNumber == _lastSentACK) {
// We already sent this ACK, but check if we should re-send it.
if (nextACKNumber < _lastReceivedAcknowledgedACK) {
// we already got an ACK2 for this ACK we would be sending, don't bother
return;
}
// We will re-send if it has been more than the estimated timeout since the last ACK
microseconds sinceLastACK = duration_cast<microseconds>(currentTime - lastACKSendTime);
if (sinceLastACK.count() < estimatedTimeout()) {
return;
}
}
// we have received new packets since the last sent ACK
// update the last sent ACK
_lastSentACK = nextACKNumber;
// setup the ACK packet, make it static so we can re-use it
static const int ACK_PACKET_PAYLOAD_BYTES = sizeof(_lastSentACK) + sizeof(_currentACKSubSequenceNumber)
+ sizeof(_rtt) + sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t);
static auto ackPacket = ControlPacket::create(ControlPacket::ACK, ACK_PACKET_PAYLOAD_BYTES);
ackPacket->reset(); // We need to reset it every time.
// pack in the ACK sub-sequence number
ackPacket->writePrimitive(++_currentACKSubSequenceNumber);
// pack in the ACK number
ackPacket->writePrimitive(nextACKNumber);
// pack in the RTT and variance
ackPacket->writePrimitive(_rtt);
// pack the available buffer size, in packets
// in our implementation we have no hard limit on receive buffer size, send the default value
ackPacket->writePrimitive((int32_t) udt::CONNECTION_RECEIVE_BUFFER_SIZE_PACKETS);
if (wasCausedBySyncTimeout) {
// grab the up to date packet receive speed and estimated bandwidth
int32_t packetReceiveSpeed = _receiveWindow.getPacketReceiveSpeed();
int32_t estimatedBandwidth = _receiveWindow.getEstimatedBandwidth();
// update those values in our connection stats
_stats.recordReceiveRate(packetReceiveSpeed);
_stats.recordEstimatedBandwidth(estimatedBandwidth);
// pack in the receive speed and estimatedBandwidth
ackPacket->writePrimitive(packetReceiveSpeed);
ackPacket->writePrimitive(estimatedBandwidth);
// record this as the last ACK send time
lastACKSendTime = high_resolution_clock::now();
}
// have the socket send off our packet
_parentSocket->writeBasePacket(*ackPacket, _destination);
Q_ASSERT_X(_sentACKs.empty() || _sentACKs.back().first + 1 == _currentACKSubSequenceNumber,
"Connection::sendACK", "Adding an invalid ACK to _sentACKs");
// write this ACK to the map of sent ACKs
_sentACKs.push_back({ _currentACKSubSequenceNumber, { nextACKNumber, high_resolution_clock::now() }});
// reset the number of data packets received since last ACK
_packetsSinceACK = 0;
_stats.record(ConnectionStats::Stats::SentACK);
}
void Connection::sendLightACK() {
SequenceNumber nextACKNumber = nextACK();
if (nextACKNumber == _lastReceivedAcknowledgedACK) {
// we already got an ACK2 for this ACK we would be sending, don't bother
return;
}
// create the light ACK packet, make it static so we can re-use it
static const int LIGHT_ACK_PACKET_PAYLOAD_BYTES = sizeof(SequenceNumber);
static auto lightACKPacket = ControlPacket::create(ControlPacket::ACK, LIGHT_ACK_PACKET_PAYLOAD_BYTES);
// reset the lightACKPacket before we go to write the ACK to it
lightACKPacket->reset();
// pack in the ACK
lightACKPacket->writePrimitive(nextACKNumber);
// have the socket send off our packet immediately
_parentSocket->writeBasePacket(*lightACKPacket, _destination);
_stats.record(ConnectionStats::Stats::SentLightACK);
}
void Connection::sendACK2(SequenceNumber currentACKSubSequenceNumber) {
// setup a static ACK2 packet we will re-use
static const int ACK2_PAYLOAD_BYTES = sizeof(SequenceNumber);
static auto ack2Packet = ControlPacket::create(ControlPacket::ACK2, ACK2_PAYLOAD_BYTES);
// reset the ACK2 Packet before writing the sub-sequence number to it
ack2Packet->reset();
// write the sub sequence number for this ACK2
ack2Packet->writePrimitive(currentACKSubSequenceNumber);
// send the ACK2 packet
_parentSocket->writeBasePacket(*ack2Packet, _destination);
// update the last sent ACK2 and the last ACK2 send time
_lastSentACK2 = currentACKSubSequenceNumber;
_stats.record(ConnectionStats::Stats::SentACK2);
}
void Connection::sendNAK(SequenceNumber sequenceNumberRecieved) {
// create the loss report packet, make it static so we can re-use it
static const int NAK_PACKET_PAYLOAD_BYTES = 2 * sizeof(SequenceNumber);
static auto lossReport = ControlPacket::create(ControlPacket::NAK, NAK_PACKET_PAYLOAD_BYTES);
lossReport->reset(); // We need to reset it every time.
// pack in the loss report
lossReport->writePrimitive(_lastReceivedSequenceNumber + 1);
if (_lastReceivedSequenceNumber + 1 != sequenceNumberRecieved - 1) {
lossReport->writePrimitive(sequenceNumberRecieved - 1);
}
// have the parent socket send off our packet immediately
_parentSocket->writeBasePacket(*lossReport, _destination);
// record our last NAK time
_lastNAKTime = high_resolution_clock::now();
_stats.record(ConnectionStats::Stats::SentNAK);
}
void Connection::sendTimeoutNAK() {
if (_lossList.getLength() > 0) {
int timeoutPayloadSize = std::min((int) (_lossList.getLength() * 2 * sizeof(SequenceNumber)),
ControlPacket::maxPayloadSize());
// construct a NAK packet that will hold all of the lost sequence numbers
auto lossListPacket = ControlPacket::create(ControlPacket::TimeoutNAK, timeoutPayloadSize);
// Pack in the lost sequence numbers
_lossList.write(*lossListPacket, timeoutPayloadSize / 2);
// have our parent socket send off this control packet
_parentSocket->writeBasePacket(*lossListPacket, _destination);
// record this as the last NAK time
_lastNAKTime = high_resolution_clock::now();
_stats.record(ConnectionStats::Stats::SentTimeoutNAK);
}
}
SequenceNumber Connection::nextACK() const {
if (_lossList.getLength() > 0) {
return _lossList.getFirstSequenceNumber() - 1;
} else {
return _lastReceivedSequenceNumber;
}
}
bool Connection::processReceivedSequenceNumber(SequenceNumber sequenceNumber, int packetSize, int payloadSize) {
_hasReceivedFirstPacket = true;
// check if this is a packet pair we should estimate bandwidth from, or just a regular packet
if (((uint32_t) sequenceNumber & 0xF) == 0) {
_receiveWindow.onProbePair1Arrival();
} else if (((uint32_t) sequenceNumber & 0xF) == 1) {
_receiveWindow.onProbePair2Arrival();
}
_receiveWindow.onPacketArrival();
// If this is not the next sequence number, report loss
if (sequenceNumber > _lastReceivedSequenceNumber + 1) {
if (_lastReceivedSequenceNumber + 1 == sequenceNumber - 1) {
_lossList.append(_lastReceivedSequenceNumber + 1);
} else {
_lossList.append(_lastReceivedSequenceNumber + 1, sequenceNumber - 1);
}
// Send a NAK packet
sendNAK(sequenceNumber);
// figure out when we should send the next loss report, if we haven't heard anything back
_nakInterval = estimatedTimeout();
int receivedPacketsPerSecond = _receiveWindow.getPacketReceiveSpeed();
if (receivedPacketsPerSecond > 0) {
// the NAK interval is at least the _minNAKInterval
// but might be the time required for all lost packets to be retransmitted
_nakInterval += (int) (_lossList.getLength() * (USECS_PER_SECOND / receivedPacketsPerSecond));
}
// the NAK interval is at least the _minNAKInterval but might be the estimated timeout
_nakInterval = std::max(_nakInterval, _minNAKInterval);
}
bool wasDuplicate = false;
if (sequenceNumber > _lastReceivedSequenceNumber) {
// Update largest recieved sequence number
_lastReceivedSequenceNumber = sequenceNumber;
} else {
// Otherwise, it could be a resend, try and remove it from the loss list
wasDuplicate = !_lossList.remove(sequenceNumber);
}
// increment the counters for data packets received
++_packetsSinceACK;
// check if we need to send an ACK, according to CC params
if (_congestionControl->_ackInterval > 0 && _packetsSinceACK >= _congestionControl->_ackInterval * _acksDuringSYN) {
_acksDuringSYN++;
sendACK(false);
} else if (_congestionControl->_lightACKInterval > 0
&& _packetsSinceACK >= _congestionControl->_lightACKInterval * _lightACKsDuringSYN) {
sendLightACK();
++_lightACKsDuringSYN;
}
if (wasDuplicate) {
_stats.record(ConnectionStats::Stats::Duplicate);
} else {
_stats.recordReceivedPackets(payloadSize, packetSize);
}
return wasDuplicate;
}
void Connection::processControl(std::unique_ptr<ControlPacket> controlPacket) {
// Simple dispatch to control packets processing methods based on their type
switch (controlPacket->getType()) {
case ControlPacket::ACK:
if (controlPacket->getPayloadSize() == sizeof(SequenceNumber)) {
processLightACK(move(controlPacket));
} else {
processACK(move(controlPacket));
}
break;
case ControlPacket::ACK2:
processACK2(move(controlPacket));
break;
case ControlPacket::NAK:
processNAK(move(controlPacket));
break;
case ControlPacket::TimeoutNAK:
processTimeoutNAK(move(controlPacket));
break;
}
}
void Connection::processACK(std::unique_ptr<ControlPacket> controlPacket) {
// read the ACK sub-sequence number
SequenceNumber currentACKSubSequenceNumber;
controlPacket->readPrimitive(&currentACKSubSequenceNumber);
// Check if we need send an ACK2 for this ACK
// This will be the case if it has been longer than the sync interval OR
// it looks like they haven't received our ACK2 for this ACK
auto currentTime = high_resolution_clock::now();
static high_resolution_clock::time_point lastACK2SendTime;
microseconds sinceLastACK2 = duration_cast<microseconds>(currentTime - lastACK2SendTime);
if (sinceLastACK2.count() >= _synInterval || currentACKSubSequenceNumber == _lastSentACK2) {
// Send ACK2 packet
sendACK2(currentACKSubSequenceNumber);
lastACK2SendTime = high_resolution_clock::now();
}
// read the ACKed sequence number
SequenceNumber ack;
controlPacket->readPrimitive(&ack);
// update the total count of received ACKs
_stats.record(ConnectionStats::Stats::ReceivedACK);
// validate that this isn't a BS ACK
if (ack > getSendQueue().getCurrentSequenceNumber()) {
// in UDT they specifically break the connection here - do we want to do anything?
Q_ASSERT_X(false, "Connection::processACK", "ACK recieved higher than largest sent sequence number");
return;
}
// read the RTT
int32_t rtt;
controlPacket->readPrimitive(&rtt);
if (ack < _lastReceivedACK) {
// this is an out of order ACK, bail
return;
}
// this is a valid ACKed sequence number - update the flow window size and the last received ACK
int32_t packedFlowWindow;
controlPacket->readPrimitive(&packedFlowWindow);
_flowWindowSize = packedFlowWindow;
if (ack == _lastReceivedACK) {
// processing an already received ACK, bail
return;
}
_lastReceivedACK = ack;
// ACK the send queue so it knows what was received
getSendQueue().ack(ack);
// update the RTT
updateRTT(rtt);
// write this RTT to stats
_stats.recordRTT(rtt);
// set the RTT for congestion control
_congestionControl->setRTT(_rtt);
if (controlPacket->bytesLeftToRead() > 0) {
int32_t receiveRate, bandwidth;
Q_ASSERT_X(controlPacket->bytesLeftToRead() == sizeof(receiveRate) + sizeof(bandwidth),
"Connection::processACK", "sync interval ACK packet does not contain expected data");
controlPacket->readPrimitive(&receiveRate);
controlPacket->readPrimitive(&bandwidth);
// set the delivery rate and bandwidth for congestion control
// these are calculated using an EWMA
static const int EMWA_ALPHA_NUMERATOR = 8;
// record these samples in connection stats
_stats.recordSendRate(receiveRate);
_stats.recordEstimatedBandwidth(bandwidth);
_deliveryRate = (_deliveryRate * (EMWA_ALPHA_NUMERATOR - 1) + receiveRate) / EMWA_ALPHA_NUMERATOR;
_bandwidth = (_bandwidth * (EMWA_ALPHA_NUMERATOR - 1) + bandwidth) / EMWA_ALPHA_NUMERATOR;
_congestionControl->setReceiveRate(_deliveryRate);
_congestionControl->setBandwidth(_bandwidth);
}
// give this ACK to the congestion control and update the send queue parameters
updateCongestionControlAndSendQueue([this, ack](){
_congestionControl->onACK(ack);
});
_stats.record(ConnectionStats::Stats::ProcessedACK);
}
void Connection::processLightACK(std::unique_ptr<ControlPacket> controlPacket) {
// read the ACKed sequence number
SequenceNumber ack;
controlPacket->readPrimitive(&ack);
// must be larger than the last received ACK to be processed
if (ack > _lastReceivedACK) {
// NOTE: the following makes sense in UDT where there is a dynamic receive buffer.
// Since we have a receive buffer that is always of a default size, we don't use this light ACK to
// drop the flow window size.
// decrease the flow window size by the offset between the last received ACK and this ACK
// _flowWindowSize -= seqoff(_lastReceivedACK, ack);
// update the last received ACK to the this one
_lastReceivedACK = ack;
// send light ACK to the send queue
getSendQueue().ack(ack);
}
_stats.record(ConnectionStats::Stats::ReceivedLightACK);
}
void Connection::processACK2(std::unique_ptr<ControlPacket> controlPacket) {
// pull the sub sequence number from the packet
SequenceNumber subSequenceNumber;
controlPacket->readPrimitive(&subSequenceNumber);
// check if we had that subsequence number in our map
auto it = std::find_if_not(_sentACKs.begin(), _sentACKs.end(), [&subSequenceNumber](const ACKListPair& pair){
return pair.first < subSequenceNumber;
});
if (it != _sentACKs.end()) {
if (it->first == subSequenceNumber){
// update the RTT using the ACK window
// calculate the RTT (time now - time ACK sent)
auto now = high_resolution_clock::now();
int rtt = duration_cast<microseconds>(now - it->second.second).count();
updateRTT(rtt);
// write this RTT to stats
_stats.recordRTT(rtt);
// set the RTT for congestion control
_congestionControl->setRTT(_rtt);
// update the last ACKed ACK
if (it->second.first > _lastReceivedAcknowledgedACK) {
_lastReceivedAcknowledgedACK = it->second.first;
}
} else if (it->first < subSequenceNumber) {
Q_UNREACHABLE();
}
}
// erase this sub-sequence number and anything below it now that we've gotten our timing information
_sentACKs.erase(_sentACKs.begin(), it);
_stats.record(ConnectionStats::Stats::ReceivedACK2);
}
void Connection::processNAK(std::unique_ptr<ControlPacket> controlPacket) {
// read the loss report
SequenceNumber start, end;
controlPacket->readPrimitive(&start);
end = start;
if (controlPacket->bytesLeftToRead() >= (qint64)sizeof(SequenceNumber)) {
controlPacket->readPrimitive(&end);
}
// send that off to the send queue so it knows there was loss
getSendQueue().nak(start, end);
// give the loss to the congestion control object and update the send queue parameters
updateCongestionControlAndSendQueue([this, start, end](){
_congestionControl->onLoss(start, end);
});
_stats.record(ConnectionStats::Stats::ReceivedNAK);
}
void Connection::processTimeoutNAK(std::unique_ptr<ControlPacket> controlPacket) {
// Override SendQueue's LossList with the timeout NAK list
getSendQueue().overrideNAKListFromPacket(*controlPacket);
// we don't tell the congestion control object there was loss here - this matches UDTs implementation
// a possible improvement would be to tell it which new loss this timeout packet told us about
_stats.record(ConnectionStats::Stats::ReceivedTimeoutNAK);
}
void Connection::updateRTT(int rtt) {
// This updates the RTT using exponential weighted moving average
// This is the Jacobson's forumla for RTT estimation
// http://www.mathcs.emory.edu/~cheung/Courses/455/Syllabus/7-transport/Jacobson-88.pdf
// Estimated RTT = (1 - x)(estimatedRTT) + (x)(sampleRTT)
// (where x = 0.125 via Jacobson)
// Deviation = (1 - x)(deviation) + x |sampleRTT - estimatedRTT|
// (where x = 0.25 via Jacobson)
static const int RTT_ESTIMATION_ALPHA_NUMERATOR = 8;
static const int RTT_ESTIMATION_VARIANCE_ALPHA_NUMERATOR = 4;
_rtt = (_rtt * (RTT_ESTIMATION_ALPHA_NUMERATOR - 1) + rtt) / RTT_ESTIMATION_ALPHA_NUMERATOR;
_rttVariance = (_rttVariance * (RTT_ESTIMATION_VARIANCE_ALPHA_NUMERATOR - 1)
+ abs(rtt - _rtt)) / RTT_ESTIMATION_VARIANCE_ALPHA_NUMERATOR;
}
int Connection::estimatedTimeout() const {
return _congestionControl->_userDefinedRTO ? _congestionControl->_rto : _rtt + _rttVariance * 4;
}
void Connection::updateCongestionControlAndSendQueue(std::function<void ()> congestionCallback) {
// update the last sent sequence number in congestion control
_congestionControl->setSendCurrentSequenceNumber(getSendQueue().getCurrentSequenceNumber());
// fire congestion control callback
congestionCallback();
// now that we've updated the congestion control, update the packet send period and flow window size
getSendQueue().setPacketSendPeriod(_congestionControl->_packetSendPeriod);
getSendQueue().setFlowWindowSize(std::min(_flowWindowSize, (int) _congestionControl->_congestionWindowSize));
// record connection stats
_stats.recordPacketSendPeriod(_congestionControl->_packetSendPeriod);
_stats.recordCongestionWindowSize(_congestionControl->_congestionWindowSize);
}

View file

@ -0,0 +1,128 @@
//
// Connection.h
// libraries/networking/src/udt
//
// Created by Clement on 7/27/15.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#ifndef hifi_Connection_h
#define hifi_Connection_h
#include <chrono>
#include <list>
#include <memory>
#include <QtCore/QObject>
#include "ConnectionStats.h"
#include "Constants.h"
#include "LossList.h"
#include "PacketTimeWindow.h"
#include "SendQueue.h"
#include "../HifiSockAddr.h"
namespace udt {
class CongestionControl;
class ControlPacket;
class Packet;
class Socket;
class Connection : public QObject {
Q_OBJECT
public:
using SequenceNumberTimePair = std::pair<SequenceNumber, std::chrono::high_resolution_clock::time_point>;
using ACKListPair = std::pair<SequenceNumber, SequenceNumberTimePair>;
using SentACKList = std::list<ACKListPair>;
Connection(Socket* parentSocket, HifiSockAddr destination, std::unique_ptr<CongestionControl> congestionControl);
~Connection();
void sendReliablePacket(std::unique_ptr<Packet> packet);
void sync(); // rate control method, fired by Socket for all connections on SYN interval
// return indicates if this packet was a duplicate
bool processReceivedSequenceNumber(SequenceNumber sequenceNumber, int packetSize, int payloadSize);
void processControl(std::unique_ptr<ControlPacket> controlPacket);
ConnectionStats::Stats sampleStats() { return _stats.sample(); }
signals:
void packetSent();
private slots:
void recordSentPackets(int payload, int total);
void recordRetransmission();
private:
void sendACK(bool wasCausedBySyncTimeout = true);
void sendLightACK();
void sendACK2(SequenceNumber currentACKSubSequenceNumber);
void sendNAK(SequenceNumber sequenceNumberRecieved);
void sendTimeoutNAK();
void processACK(std::unique_ptr<ControlPacket> controlPacket);
void processLightACK(std::unique_ptr<ControlPacket> controlPacket);
void processACK2(std::unique_ptr<ControlPacket> controlPacket);
void processNAK(std::unique_ptr<ControlPacket> controlPacket);
void processTimeoutNAK(std::unique_ptr<ControlPacket> controlPacket);
SendQueue& getSendQueue();
SequenceNumber nextACK() const;
void updateRTT(int rtt);
int estimatedTimeout() const;
void updateCongestionControlAndSendQueue(std::function<void()> congestionCallback);
int _synInterval; // Periodical Rate Control Interval, in microseconds
int _nakInterval; // NAK timeout interval, in microseconds
int _minNAKInterval { 100000 }; // NAK timeout interval lower bound, default of 100ms
std::chrono::high_resolution_clock::time_point _lastNAKTime;
bool _hasReceivedFirstPacket { false };
LossList _lossList; // List of all missing packets
SequenceNumber _lastReceivedSequenceNumber; // The largest sequence number received from the peer
SequenceNumber _lastReceivedACK; // The last ACK received
SequenceNumber _lastReceivedAcknowledgedACK; // The last sent ACK that has been acknowledged via an ACK2 from the peer
SequenceNumber _currentACKSubSequenceNumber; // The current ACK sub-sequence number (used for Acknowledgment of ACKs)
SequenceNumber _lastSentACK; // The last sent ACK
SequenceNumber _lastSentACK2; // The last sent ACK sub-sequence number in an ACK2
int _acksDuringSYN { 1 }; // The number of non-SYN ACKs sent during SYN
int _lightACKsDuringSYN { 1 }; // The number of lite ACKs sent during SYN interval
int32_t _rtt; // RTT, in microseconds
int32_t _rttVariance; // RTT variance
int _flowWindowSize { udt::MAX_PACKETS_IN_FLIGHT }; // Flow control window size
int _bandwidth { 1 }; // Exponential moving average for estimated bandwidth, in packets per second
int _deliveryRate { 16 }; // Exponential moving average for receiver's receive rate, in packets per second
SentACKList _sentACKs; // Map of ACK sub-sequence numbers to ACKed sequence number and sent time
Socket* _parentSocket { nullptr };
HifiSockAddr _destination;
PacketTimeWindow _receiveWindow { 16, 64 }; // Window of interval between packets (16) and probes (64) for bandwidth and receive speed
std::unique_ptr<CongestionControl> _congestionControl;
std::unique_ptr<SendQueue> _sendQueue;
int _packetsSinceACK { 0 }; // The number of packets that have been received during the current ACK interval
ConnectionStats _stats;
};
}
#endif // hifi_Connection_h

View file

@ -0,0 +1,114 @@
//
// ConnectionStats.cpp
// libraries/networking/src/udt
//
// Created by Clement on 7/29/15.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#include "ConnectionStats.h"
using namespace udt;
using namespace std::chrono;
ConnectionStats::ConnectionStats() {
auto now = duration_cast<microseconds>(high_resolution_clock::now().time_since_epoch());
_currentSample.startTime = now;
_total.startTime = now;
}
ConnectionStats::Stats ConnectionStats::sample() {
Stats sample = _currentSample;
_currentSample = Stats();
auto now = duration_cast<microseconds>(high_resolution_clock::now().time_since_epoch());
sample.endTime = now;
_currentSample.startTime = now;
return sample;
}
void ConnectionStats::record(Stats::Event event) {
++_currentSample.events[(int) event];
++_total.events[(int) event];
}
void ConnectionStats::recordSentPackets(int payload, int total) {
++_currentSample.sentPackets;
++_total.sentPackets;
_currentSample.sentUtilBytes += payload;
_total.sentUtilBytes += payload;
_currentSample.sentBytes += total;
_total.sentBytes += total;
}
void ConnectionStats::recordReceivedPackets(int payload, int total) {
++_currentSample.recievedPackets;
++_total.recievedPackets;
_currentSample.recievedUtilBytes += payload;
_total.recievedUtilBytes += payload;
_currentSample.recievedBytes += total;
_total.recievedBytes += total;
}
void ConnectionStats::recordUnreliableSentPackets(int payload, int total) {
++_currentSample.sentUnreliablePackets;
++_total.sentUnreliablePackets;
_currentSample.sentUnreliableUtilBytes += payload;
_total.sentUnreliableUtilBytes += payload;
_currentSample.sentUnreliableBytes += total;
_total.sentUnreliableBytes += total;
}
void ConnectionStats::recordUnreliableReceivedPackets(int payload, int total) {
++_currentSample.recievedUnreliablePackets;
++_total.recievedUnreliablePackets;
_currentSample.recievedUnreliableUtilBytes += payload;
_total.recievedUnreliableUtilBytes += payload;
_currentSample.sentUnreliableBytes += total;
_total.recievedUnreliableBytes += total;
}
static const double EWMA_CURRENT_SAMPLE_WEIGHT = 0.125;
static const double EWMA_PREVIOUS_SAMPLES_WEIGHT = 1.0 - EWMA_CURRENT_SAMPLE_WEIGHT;
void ConnectionStats::recordSendRate(int sample) {
_currentSample.sendRate = sample;
_total.sendRate = (_total.sendRate * EWMA_PREVIOUS_SAMPLES_WEIGHT) + (sample * EWMA_CURRENT_SAMPLE_WEIGHT);
}
void ConnectionStats::recordReceiveRate(int sample) {
_currentSample.receiveRate = sample;
_total.receiveRate = (_total.receiveRate * EWMA_PREVIOUS_SAMPLES_WEIGHT) + (sample * EWMA_CURRENT_SAMPLE_WEIGHT);
}
void ConnectionStats::recordEstimatedBandwidth(int sample) {
_currentSample.estimatedBandwith = sample;
_total.estimatedBandwith = (_total.estimatedBandwith * EWMA_PREVIOUS_SAMPLES_WEIGHT) + (sample * EWMA_CURRENT_SAMPLE_WEIGHT);
}
void ConnectionStats::recordRTT(int sample) {
_currentSample.rtt = sample;
_total.rtt = (_total.rtt * EWMA_PREVIOUS_SAMPLES_WEIGHT) + (sample * EWMA_CURRENT_SAMPLE_WEIGHT);
}
void ConnectionStats::recordCongestionWindowSize(int sample) {
_currentSample.congestionWindowSize = sample;
_total.congestionWindowSize = (_total.congestionWindowSize * EWMA_PREVIOUS_SAMPLES_WEIGHT) + (sample * EWMA_CURRENT_SAMPLE_WEIGHT);
}
void ConnectionStats::recordPacketSendPeriod(int sample) {
_currentSample.packetSendPeriod = sample;
_total.packetSendPeriod = (_total.packetSendPeriod * EWMA_PREVIOUS_SAMPLES_WEIGHT) + (sample * EWMA_CURRENT_SAMPLE_WEIGHT);
}

View file

@ -0,0 +1,96 @@
//
// ConnectionStats.h
// libraries/networking/src/udt
//
// Created by Clement on 7/29/15.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#ifndef hifi_ConnectionStats_h
#define hifi_ConnectionStats_h
#include <chrono>
#include <vector>
namespace udt {
class ConnectionStats {
public:
struct Stats {
std::chrono::microseconds startTime;
std::chrono::microseconds endTime;
enum Event {
SentACK,
ReceivedACK,
ProcessedACK,
SentLightACK,
ReceivedLightACK,
SentACK2,
ReceivedACK2,
SentNAK,
ReceivedNAK,
SentTimeoutNAK,
ReceivedTimeoutNAK,
Retransmission,
Duplicate
};
// construct a vector for the events of the size of our Enum - default value is zero
std::vector<int> events = std::vector<int>((int) Event::Duplicate + 1, 0);
// packet counts and sizes
int sentPackets { 0 };
int recievedPackets { 0 };
int sentUtilBytes { 0 };
int recievedUtilBytes { 0 };
int sentBytes { 0 };
int recievedBytes { 0 };
int sentUnreliablePackets { 0 };
int recievedUnreliablePackets { 0 };
int sentUnreliableUtilBytes { 0 };
int recievedUnreliableUtilBytes { 0 };
int sentUnreliableBytes { 0 };
int recievedUnreliableBytes { 0 };
// the following stats are trailing averages in the result, not totals
int sendRate { 0 };
int receiveRate { 0 };
int estimatedBandwith { 0 };
int rtt { 0 };
int congestionWindowSize { 0 };
int packetSendPeriod { 0 };
};
ConnectionStats();
Stats sample();
Stats getTotalStats();
void record(Stats::Event event);
void recordSentPackets(int payload, int total);
void recordReceivedPackets(int payload, int total);
void recordUnreliableSentPackets(int payload, int total);
void recordUnreliableReceivedPackets(int payload, int total);
void recordSendRate(int sample);
void recordReceiveRate(int sample);
void recordEstimatedBandwidth(int sample);
void recordRTT(int sample);
void recordCongestionWindowSize(int sample);
void recordPacketSendPeriod(int sample);
private:
Stats _currentSample;
Stats _total;
};
}
#endif // hifi_ConnectionStats_h

View file

@ -0,0 +1,32 @@
//
// Constants.h
// libraries/networking/src/udt
//
// Created by Clement on 7/13/15.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#pragma once
#ifndef hifi_udt_Constants_h
#define hifi_udt_Constants_h
#include "SequenceNumber.h"
namespace udt {
static const int MAX_PACKET_SIZE_WITH_UDP_HEADER = 1500;
static const int MAX_PACKET_SIZE = MAX_PACKET_SIZE_WITH_UDP_HEADER - 28;
static const int MAX_PACKETS_IN_FLIGHT = 25600;
static const int CONNECTION_RECEIVE_BUFFER_SIZE_PACKETS = 8192;
static const int CONNECTION_SEND_BUFFER_SIZE_PACKETS = 8192;
static const int UDP_SEND_BUFFER_SIZE_BYTES = 1048576;
static const int UDP_RECEIVE_BUFFER_SIZE_BYTES = 1048576;
static const int DEFAULT_SYN_INTERVAL_USECS = 10 * 1000;
static const int SEQUENCE_NUMBER_BITS = sizeof(SequenceNumber) * 8;
static const uint32_t CONTROL_BIT_MASK = uint32_t(1) << (SEQUENCE_NUMBER_BITS - 1);
}
#endif // hifi_udt_Constants_h

View file

@ -11,51 +11,60 @@
#include "ControlPacket.h"
#include "Constants.h"
using namespace udt;
std::unique_ptr<ControlPacket> ControlPacket::create(Type type, const SequenceNumberList& sequenceNumbers) {
return ControlPacket::create(type, sequenceNumbers);
int ControlPacket::localHeaderSize() {
return sizeof(ControlPacket::ControlBitAndType);
}
int ControlPacket::totalHeaderSize() {
return BasePacket::totalHeaderSize() + ControlPacket::localHeaderSize();
}
int ControlPacket::maxPayloadSize() {
return BasePacket::maxPayloadSize() - ControlPacket::localHeaderSize();
}
ControlPacket::ControlPacketPair ControlPacket::createPacketPair(quint64 timestamp) {
// create each of the two packets in the packet pair
ControlPacketPair packetPair { std::unique_ptr<ControlPacket>(new ControlPacket(timestamp)),
std::unique_ptr<ControlPacket>(new ControlPacket(timestamp)) };
return packetPair;
std::unique_ptr<ControlPacket> ControlPacket::fromReceivedPacket(std::unique_ptr<char[]> data, qint64 size,
const HifiSockAddr &senderSockAddr) {
// Fail with null data
Q_ASSERT(data);
// Fail with invalid size
Q_ASSERT(size >= 0);
// allocate memory
auto packet = std::unique_ptr<ControlPacket>(new ControlPacket(std::move(data), size, senderSockAddr));
packet->open(QIODevice::ReadOnly);
return packet;
}
qint64 ControlPacket::localHeaderSize() {
return sizeof(TypeAndSubSequenceNumber);
std::unique_ptr<ControlPacket> ControlPacket::create(Type type, qint64 size) {
return std::unique_ptr<ControlPacket>(new ControlPacket(type, size));
}
qint64 ControlPacket::totalHeadersSize() const {
return BasePacket::totalHeadersSize() + localHeaderSize();
}
ControlPacket::ControlPacket(Type type, const SequenceNumberList& sequenceNumbers) :
BasePacket(localHeaderSize() + (sizeof(Packet::SequenceNumber) * sequenceNumbers.size())),
ControlPacket::ControlPacket(Type type, qint64 size) :
BasePacket((size == -1) ? -1 : ControlPacket::localHeaderSize() + size),
_type(type)
{
adjustPayloadStartAndCapacity();
adjustPayloadStartAndCapacity(ControlPacket::localHeaderSize());
open(QIODevice::ReadWrite);
// pack in the sequence numbers
for (auto& sequenceNumber : sequenceNumbers) {
writePrimitive(sequenceNumber);
}
writeType();
}
ControlPacket::ControlPacket(quint64 timestamp) :
BasePacket(localHeaderSize() + sizeof(timestamp)),
_type(Type::PacketPair)
ControlPacket::ControlPacket(std::unique_ptr<char[]> data, qint64 size, const HifiSockAddr& senderSockAddr) :
BasePacket(std::move(data), size, senderSockAddr)
{
adjustPayloadStartAndCapacity();
// sanity check before we decrease the payloadSize with the payloadCapacity
Q_ASSERT(_payloadSize == _payloadCapacity);
open(QIODevice::ReadWrite);
adjustPayloadStartAndCapacity(ControlPacket::localHeaderSize(), _payloadSize > 0);
// pack in the timestamp
writePrimitive(timestamp);
readType();
}
ControlPacket::ControlPacket(ControlPacket&& other) :
@ -71,3 +80,28 @@ ControlPacket& ControlPacket::operator=(ControlPacket&& other) {
return *this;
}
void ControlPacket::setType(udt::ControlPacket::Type type) {
_type = type;
writeType();
}
void ControlPacket::writeType() {
ControlBitAndType* bitAndType = reinterpret_cast<ControlBitAndType*>(_packet.get());
// We override the control bit here by writing the type but it's okay, it'll always be 1
*bitAndType = CONTROL_BIT_MASK | (ControlBitAndType(_type) << (8 * sizeof(Type)));
}
void ControlPacket::readType() {
ControlBitAndType bitAndType = *reinterpret_cast<ControlBitAndType*>(_packet.get());
Q_ASSERT_X(bitAndType & CONTROL_BIT_MASK, "ControlPacket::readHeader()", "This should be a control packet");
uint16_t packetType = (bitAndType & ~CONTROL_BIT_MASK) >> (8 * sizeof(Type));
Q_ASSERT_X(packetType <= ControlPacket::Type::TimeoutNAK, "ControlPacket::readType()", "Received a control packet with wrong type");
// read the type
_type = (Type) packetType;
}

View file

@ -21,39 +21,47 @@
namespace udt {
using SequenceNumberList = std::vector<Packet::SequenceNumber>;
class ControlPacket : public BasePacket {
Q_OBJECT
public:
using TypeAndSubSequenceNumber = uint32_t;
using ControlPacketPair = std::pair<std::unique_ptr<ControlPacket>, std::unique_ptr<ControlPacket>>;
using ControlBitAndType = uint32_t;
enum class Type : uint16_t {
enum Type : uint16_t {
ACK,
ACK2,
NAK,
PacketPair
TimeoutNAK
};
std::unique_ptr<ControlPacket> create(Type type, const SequenceNumberList& sequenceNumbers);
ControlPacketPair createPacketPair(quint64 timestamp);
static std::unique_ptr<ControlPacket> create(Type type, qint64 size = -1);
static std::unique_ptr<ControlPacket> fromReceivedPacket(std::unique_ptr<char[]> data, qint64 size,
const HifiSockAddr& senderSockAddr);
// Current level's header size
static int localHeaderSize();
// Cumulated size of all the headers
static int totalHeaderSize();
// The maximum payload size this packet can use to fit in MTU
static int maxPayloadSize();
static qint64 localHeaderSize(); // Current level's header size
virtual qint64 totalHeadersSize() const; // Cumulated size of all the headers
Type getType() const { return _type; }
void setType(Type type);
private:
ControlPacket(Type type, const SequenceNumberList& sequenceNumbers);
ControlPacket(quint64 timestamp);
ControlPacket(Type type, qint64 size = -1);
ControlPacket(std::unique_ptr<char[]> data, qint64 size, const HifiSockAddr& senderSockAddr);
ControlPacket(ControlPacket&& other);
ControlPacket(const ControlPacket& other) = delete;
ControlPacket& operator=(ControlPacket&& other);
ControlPacket& operator=(const ControlPacket& other) = delete;
// Header read/write
void readType();
void writeType();
Type _type;
};
} // namespace udt

View file

@ -0,0 +1,183 @@
//
// LossList.cpp
// libraries/networking/src/udt
//
// Created by Clement on 7/27/15.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#include "LossList.h"
#include "ControlPacket.h"
using namespace udt;
using namespace std;
void LossList::append(SequenceNumber seq) {
Q_ASSERT_X(_lossList.empty() || (_lossList.back().second < seq), "LossList::append(SequenceNumber)",
"SequenceNumber appended is not greater than the last SequenceNumber in the list");
if (getLength() > 0 && _lossList.back().second + 1 == seq) {
++_lossList.back().second;
} else {
_lossList.push_back(make_pair(seq, seq));
}
_length += 1;
}
void LossList::append(SequenceNumber start, SequenceNumber end) {
Q_ASSERT_X(_lossList.empty() || (_lossList.back().second < start),
"LossList::append(SequenceNumber, SequenceNumber)",
"SequenceNumber range appended is not greater than the last SequenceNumber in the list");
Q_ASSERT_X(start <= end,
"LossList::append(SequenceNumber, SequenceNumber)", "Range start greater than range end");
if (getLength() > 0 && _lossList.back().second + 1 == start) {
_lossList.back().second = end;
} else {
_lossList.push_back(make_pair(start, end));
}
_length += seqlen(start, end);
}
void LossList::insert(SequenceNumber start, SequenceNumber end) {
Q_ASSERT_X(start <= end,
"LossList::insert(SequenceNumber, SequenceNumber)", "Range start greater than range end");
auto it = find_if_not(_lossList.begin(), _lossList.end(), [&start](pair<SequenceNumber, SequenceNumber> pair){
return pair.second < start;
});
if (it == _lossList.end() || end < it->first) {
// No overlap, simply insert
_length += seqlen(start, end);
_lossList.insert(it, make_pair(start, end));
} else {
// If it starts before segment, extend segment
if (start < it->first) {
_length += seqlen(start, it->first - 1);
it->first = start;
}
// If it ends after segment, extend segment
if (end > it->second) {
_length += seqlen(it->second + 1, end);
it->second = end;
}
auto it2 = it;
++it2;
// For all ranges touching the current range
while (it2 != _lossList.end() && it->second >= it2->first - 1) {
// extend current range if necessary
if (it->second < it2->second) {
_length += seqlen(it->second + 1, it2->second);
it->second = it2->second;
}
// Remove overlapping range
_length -= seqlen(it2->first, it2->second);
it2 = _lossList.erase(it2);
}
}
}
bool LossList::remove(SequenceNumber seq) {
auto it = find_if(_lossList.begin(), _lossList.end(), [&seq](pair<SequenceNumber, SequenceNumber> pair) {
return pair.first <= seq && seq <= pair.second;
});
if (it != end(_lossList)) {
if (it->first == it->second) {
_lossList.erase(it);
} else if (seq == it->first) {
++it->first;
} else if (seq == it->second) {
--it->second;
} else {
auto temp = it->second;
it->second = seq - 1;
_lossList.insert(++it, make_pair(seq + 1, temp));
}
_length -= 1;
// this sequence number was found in the loss list, return true
return true;
} else {
// this sequence number was not found in the loss list, return false
return false;
}
}
void LossList::remove(SequenceNumber start, SequenceNumber end) {
Q_ASSERT_X(start <= end,
"LossList::remove(SequenceNumber, SequenceNumber)", "Range start greater than range end");
// Find the first segment sharing sequence numbers
auto it = find_if(_lossList.begin(), _lossList.end(), [&start, &end](pair<SequenceNumber, SequenceNumber> pair) {
return (pair.first <= start && start <= pair.second) || (start <= pair.first && pair.first <= end);
});
// If we found one
if (it != _lossList.end()) {
// While the end of the current segment is contained, either shorten it (first one only - sometimes)
// or remove it altogether since it is fully contained it the range
while (it != _lossList.end() && end >= it->second) {
if (start <= it->first) {
// Segment is contained, update new length and erase it.
_length -= seqlen(it->first, it->second);
it = _lossList.erase(it);
} else {
// Beginning of segment not contained, modify end of segment.
// Will only occur sometimes one the first loop
_length -= seqlen(start, it->second);
it->second = start - 1;
++it;
}
}
// There might be more to remove
if (it != _lossList.end() && it->first <= end) {
if (start <= it->first) {
// Truncate beginning of segment
_length -= seqlen(it->first, end);
it->first = end + 1;
} else {
// Cut it in half if the range we are removing is contained within one segment
_length -= seqlen(start, end);
auto temp = it->second;
it->second = start - 1;
_lossList.insert(++it, make_pair(end + 1, temp));
}
}
}
}
SequenceNumber LossList::getFirstSequenceNumber() const {
Q_ASSERT_X(getLength() > 0, "LossList::getFirstSequenceNumber()", "Trying to get first element of an empty list");
return _lossList.front().first;
}
SequenceNumber LossList::popFirstSequenceNumber() {
auto front = getFirstSequenceNumber();
remove(front);
return front;
}
void LossList::write(ControlPacket& packet, int maxPairs) {
int writtenPairs = 0;
for(const auto& pair : _lossList) {
packet.writePrimitive(pair.first);
packet.writePrimitive(pair.second);
++writtenPairs;
// check if we've written the maximum number we were told to write
if (maxPairs != -1 && writtenPairs >= maxPairs) {
break;
}
}
}

View file

@ -0,0 +1,52 @@
//
// LossList.h
// libraries/networking/src/udt
//
// Created by Clement on 7/27/15.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#ifndef hifi_LossList_h
#define hifi_LossList_h
#include <list>
#include "SequenceNumber.h"
namespace udt {
class ControlPacket;
class LossList {
public:
LossList() {}
void clear() { _length = 0; _lossList.clear(); }
// must always add at the end - faster than insert
void append(SequenceNumber seq);
void append(SequenceNumber start, SequenceNumber end);
// inserts anywhere - MUCH slower
void insert(SequenceNumber start, SequenceNumber end);
bool remove(SequenceNumber seq);
void remove(SequenceNumber start, SequenceNumber end);
int getLength() const { return _length; }
SequenceNumber getFirstSequenceNumber() const;
SequenceNumber popFirstSequenceNumber();
void write(ControlPacket& packet, int maxPairs = -1);
private:
std::list<std::pair<SequenceNumber, SequenceNumber>> _lossList;
int _length { 0 };
};
}
#endif // hifi_LossList_h

View file

@ -13,6 +13,20 @@
using namespace udt;
int Packet::localHeaderSize(bool isPartOfMessage) {
return sizeof(Packet::SequenceNumberAndBitField) +
(isPartOfMessage ? sizeof(Packet::MessageNumberAndBitField) : 0);
}
int Packet::totalHeaderSize(bool isPartOfMessage) {
return BasePacket::totalHeaderSize() + Packet::localHeaderSize(isPartOfMessage);
}
int Packet::maxPayloadSize(bool isPartOfMessage) {
return BasePacket::maxPayloadSize() - Packet::localHeaderSize(isPartOfMessage);
}
std::unique_ptr<Packet> Packet::create(qint64 size, bool isReliable, bool isPartOfMessage) {
auto packet = std::unique_ptr<Packet>(new Packet(size, isReliable, isPartOfMessage));
@ -37,45 +51,23 @@ std::unique_ptr<Packet> Packet::createCopy(const Packet& other) {
return std::unique_ptr<Packet>(new Packet(other));
}
qint64 Packet::maxPayloadSize(bool isPartOfMessage) {
return MAX_PACKET_SIZE - localHeaderSize(isPartOfMessage);
}
qint64 Packet::localHeaderSize(bool isPartOfMessage) {
return sizeof(SequenceNumberAndBitField) + (isPartOfMessage ? sizeof(MessageNumberAndBitField) : 0);
}
qint64 Packet::maxPayloadSize() const {
return MAX_PACKET_SIZE - localHeaderSize();
}
qint64 Packet::totalHeadersSize() const {
return BasePacket::localHeaderSize() + localHeaderSize();
}
qint64 Packet::localHeaderSize() const {
return localHeaderSize(_isPartOfMessage);
}
Packet::Packet(qint64 size, bool isReliable, bool isPartOfMessage) :
BasePacket(size),
BasePacket((size == -1) ? -1 : (Packet::localHeaderSize() + size)),
_isReliable(isReliable),
_isPartOfMessage(isPartOfMessage)
{
adjustPayloadStartAndCapacity();
adjustPayloadStartAndCapacity(Packet::localHeaderSize(_isPartOfMessage));
// set the UDT header to default values
writeSequenceNumber(0);
writeHeader();
}
Packet::Packet(std::unique_ptr<char[]> data, qint64 size, const HifiSockAddr& senderSockAddr) :
BasePacket(std::move(data), size, senderSockAddr)
{
readIsReliable();
readIsPartOfMessage();
readSequenceNumber();
readHeader();
adjustPayloadStartAndCapacity(_payloadSize > 0);
adjustPayloadStartAndCapacity(Packet::localHeaderSize(_isPartOfMessage), _payloadSize > 0);
}
Packet::Packet(const Packet& other) :
@ -88,7 +80,7 @@ Packet::Packet(const Packet& other) :
Packet& Packet::operator=(const Packet& other) {
BasePacket::operator=(other);
_isReliable = other._isReliable;
_isPartOfMessage = other._isPartOfMessage;
_sequenceNumber = other._sequenceNumber;
@ -114,36 +106,39 @@ Packet& Packet::operator=(Packet&& other) {
return *this;
}
static const uint32_t CONTROL_BIT_MASK = 1 << (sizeof(Packet::SequenceNumberAndBitField) - 1);
static const uint32_t RELIABILITY_BIT_MASK = 1 << (sizeof(Packet::SequenceNumberAndBitField) - 2);
static const uint32_t MESSAGE_BIT_MASK = 1 << (sizeof(Packet::SequenceNumberAndBitField) - 3);
static const int BIT_FIELD_LENGTH = 3;
void Packet::writeSequenceNumber(SequenceNumber sequenceNumber) const {
_sequenceNumber = sequenceNumber;
writeHeader();
}
static const uint32_t RELIABILITY_BIT_MASK = uint32_t(1) << (SEQUENCE_NUMBER_BITS - 2);
static const uint32_t MESSAGE_BIT_MASK = uint32_t(1) << (SEQUENCE_NUMBER_BITS - 3);
static const uint32_t BIT_FIELD_MASK = CONTROL_BIT_MASK | RELIABILITY_BIT_MASK | MESSAGE_BIT_MASK;
void Packet::readIsReliable() {
void Packet::readHeader() const {
SequenceNumberAndBitField seqNumBitField = *reinterpret_cast<SequenceNumberAndBitField*>(_packet.get());
_isReliable = (bool) (seqNumBitField & RELIABILITY_BIT_MASK); // Only keep reliability bit
}
void Packet::readIsPartOfMessage() {
SequenceNumberAndBitField seqNumBitField = *reinterpret_cast<SequenceNumberAndBitField*>(_packet.get());
_isReliable = (bool) (seqNumBitField & MESSAGE_BIT_MASK); // Only keep message bit
}
void Packet::readSequenceNumber() {
SequenceNumberAndBitField seqNumBitField = *reinterpret_cast<SequenceNumberAndBitField*>(_packet.get());
_sequenceNumber = (seqNumBitField & ~BIT_FIELD_MASK); // Remove the bit field
}
static const uint32_t MAX_SEQUENCE_NUMBER = UINT32_MAX >> BIT_FIELD_LENGTH;
void Packet::writeSequenceNumber(SequenceNumber sequenceNumber) {
// make sure this is a sequence number <= 29 bit unsigned max (536,870,911)
Q_ASSERT(sequenceNumber <= MAX_SEQUENCE_NUMBER);
Q_ASSERT_X(!(seqNumBitField & CONTROL_BIT_MASK), "Packet::readHeader()", "This should be a data packet");
_isReliable = (bool) (seqNumBitField & RELIABILITY_BIT_MASK); // Only keep reliability bit
_isPartOfMessage = (bool) (seqNumBitField & MESSAGE_BIT_MASK); // Only keep message bit
_sequenceNumber = SequenceNumber{ seqNumBitField & ~BIT_FIELD_MASK }; // Remove the bit field
}
void Packet::writeHeader() const {
// grab pointer to current SequenceNumberAndBitField
SequenceNumberAndBitField* seqNumBitField = reinterpret_cast<SequenceNumberAndBitField*>(_packet.get());
// write new value by ORing (old value & BIT_FIELD_MASK) with new seqNum
*seqNumBitField = (*seqNumBitField & BIT_FIELD_MASK) | sequenceNumber;
// Write sequence number and reset bit field
Q_ASSERT_X(!((SequenceNumber::Type)_sequenceNumber & BIT_FIELD_MASK),
"Packet::writeHeader()", "Sequence number is overflowing into bit field");
*seqNumBitField = ((SequenceNumber::Type)_sequenceNumber);
if (_isReliable) {
*seqNumBitField |= RELIABILITY_BIT_MASK;
}
if (_isPartOfMessage) {
*seqNumBitField |= MESSAGE_BIT_MASK;
}
}

View file

@ -18,6 +18,7 @@
#include "BasePacket.h"
#include "PacketHeaders.h"
#include "SequenceNumber.h"
namespace udt {
@ -25,32 +26,30 @@ class Packet : public BasePacket {
Q_OBJECT
public:
// NOTE: The SequenceNumber is only actually 29 bits to leave room for a bit field
using SequenceNumber = uint32_t;
using SequenceNumberAndBitField = uint32_t;
// NOTE: The MessageNumber is only actually 29 bits to leave room for a bit field
using MessageNumber = uint32_t;
using MessageNumberAndBitField = uint32_t;
static const uint32_t DEFAULT_SEQUENCE_NUMBER = 0;
static std::unique_ptr<Packet> create(qint64 size = -1, bool isReliable = false, bool isPartOfMessage = false);
static std::unique_ptr<Packet> fromReceivedPacket(std::unique_ptr<char[]> data, qint64 size, const HifiSockAddr& senderSockAddr);
// Provided for convenience, try to limit use
static std::unique_ptr<Packet> createCopy(const Packet& other);
// The maximum payload size this packet can use to fit in MTU
static qint64 maxPayloadSize(bool isPartOfMessage);
virtual qint64 maxPayloadSize() const;
// Current level's header size
static qint64 localHeaderSize(bool isPartOfMessage);
virtual qint64 localHeaderSize() const;
static int localHeaderSize(bool isPartOfMessage = false);
// Cumulated size of all the headers
static int totalHeaderSize(bool isPartOfMessage = false);
// The maximum payload size this packet can use to fit in MTU
static int maxPayloadSize(bool isPartOfMessage = false);
virtual qint64 totalHeadersSize() const; // Cumulated size of all the headers
void writeSequenceNumber(SequenceNumber sequenceNumber);
bool isPartOfMessage() const { return _isPartOfMessage; }
bool isReliable() const { return _isReliable; }
SequenceNumber getSequenceNumber() const { return _sequenceNumber; }
void writeSequenceNumber(SequenceNumber sequenceNumber) const;
protected:
Packet(qint64 size, bool isReliable = false, bool isPartOfMessage = false);
@ -62,14 +61,15 @@ protected:
Packet& operator=(const Packet& other);
Packet& operator=(Packet&& other);
private:
// Header readers - these read data to member variables after pulling packet off wire
void readIsPartOfMessage();
void readIsReliable();
void readSequenceNumber();
void readHeader() const;
void writeHeader() const;
bool _isReliable { false };
bool _isPartOfMessage { false };
SequenceNumber _sequenceNumber { 0 };
// Simple holders to prevent multiple reading and bitwise ops
mutable bool _isReliable { false };
mutable bool _isPartOfMessage { false };
mutable SequenceNumber _sequenceNumber;
};
} // namespace udt

View file

@ -32,16 +32,14 @@ const QSet<PacketType> NON_SOURCED_PACKETS = QSet<PacketType>()
const QSet<PacketType> RELIABLE_PACKETS = QSet<PacketType>();
PacketVersion versionForPacketType(PacketType::Value packetType) {
PacketVersion versionForPacketType(PacketType packetType) {
switch (packetType) {
case PacketType::EntityAdd:
case PacketType::EntityEdit:
case PacketType::EntityData:
return VERSION_ENTITIES_PROTOCOL_HEADER_SWAP;
case AvatarData:
return 13;
default:
return 12;
return 14;
}
}

View file

@ -21,8 +21,6 @@
#include <QtCore/QSet>
#include <QtCore/QUuid>
#include "UUID.h"
// If adding a new packet packetType, you can replace one marked usable or add at the end.
// If you want the name of the packet packetType to be available for debugging or logging, update nameForPacketType() as well
// This enum must hold 256 or fewer packet types (so the value is <= 255) since it is statically typed as a uint8_t

View file

@ -0,0 +1,121 @@
//
// PacketTimeWindow.cpp
// libraries/networking/src/udt
//
// Created by Stephen Birarda on 2015-07-28.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#include "PacketTimeWindow.h"
#include <numeric>
#include <cmath>
#include <NumericalConstants.h>
using namespace udt;
using namespace std::chrono;
static const int DEFAULT_PACKET_INTERVAL_MICROSECONDS = 1000000; // 1s
static const int DEFAULT_PROBE_INTERVAL_MICROSECONDS = 1000; // 1ms
PacketTimeWindow::PacketTimeWindow(int numPacketIntervals, int numProbeIntervals) :
_numPacketIntervals(numPacketIntervals),
_numProbeIntervals(numProbeIntervals),
_packetIntervals(_numPacketIntervals, DEFAULT_PACKET_INTERVAL_MICROSECONDS),
_probeIntervals(_numProbeIntervals, DEFAULT_PROBE_INTERVAL_MICROSECONDS)
{
}
template <typename Iterator>
int median(Iterator begin, Iterator end) {
// use std::nth_element to grab the middle - for an even number of elements this is the upper middle
Iterator middle = begin + (end - begin) / 2;
std::nth_element(begin, middle, end);
if ((end - begin) % 2 != 0) {
// odd number of elements, just return the middle
return *middle;
} else {
// even number of elements, return the mean of the upper middle and the lower middle
Iterator lowerMiddle = std::max_element(begin, middle);
return (*middle + *lowerMiddle) / 2;
}
}
int32_t meanOfMedianFilteredValues(std::vector<int> intervals, int numValues, int valuesRequired = 0) {
// grab the median value of the intervals vector
int intervalsMedian = median(intervals.begin(), intervals.end());
// figure out our bounds for median filtering
static const int MEDIAN_FILTERING_BOUND_MULTIPLIER = 8;
int upperBound = intervalsMedian * MEDIAN_FILTERING_BOUND_MULTIPLIER;
int lowerBound = intervalsMedian / MEDIAN_FILTERING_BOUND_MULTIPLIER;
int sum = 0;
int count = 0;
// sum the values that are inside the median filtered bounds
for (auto& interval : intervals) {
if ((interval < upperBound) && (interval > lowerBound)) {
++count;
sum += interval;
}
}
// make sure we hit our threshold of values required
if (count >= valuesRequired) {
// return the frequency (per second) for the mean interval
static const double USECS_PER_SEC = 1000000.0;
return (int32_t) ceil(USECS_PER_SEC / (((double) sum) / ((double) count)));
} else {
return 0;
}
}
int32_t PacketTimeWindow::getPacketReceiveSpeed() const {
// return the mean value of median filtered values (per second) - or zero if there are too few filtered values
return meanOfMedianFilteredValues(_packetIntervals, _numPacketIntervals, _numPacketIntervals / 2);
}
int32_t PacketTimeWindow::getEstimatedBandwidth() const {
// return mean value of median filtered values (per second)
return meanOfMedianFilteredValues(_probeIntervals, _numProbeIntervals);
}
void PacketTimeWindow::onPacketArrival() {
// take the current time
auto now = high_resolution_clock::now();
// record the interval between this packet and the last one
_packetIntervals[_currentPacketInterval++] = duration_cast<microseconds>(now - _lastPacketTime).count();
// reset the currentPacketInterval index when it wraps
if (_currentPacketInterval == _numPacketIntervals) {
_currentPacketInterval = 0;
}
// remember this as the last packet arrival time
_lastPacketTime = now;
}
void PacketTimeWindow::onProbePair1Arrival() {
// take the current time as the first probe time
_firstProbeTime = high_resolution_clock::now();
}
void PacketTimeWindow::onProbePair2Arrival() {
// store the interval between the two probes
auto now = high_resolution_clock::now();
_probeIntervals[_currentProbeInterval++] = duration_cast<microseconds>(now - _firstProbeTime).count();
// reset the currentProbeInterval index when it wraps
if (_currentProbeInterval == _numProbeIntervals) {
_currentProbeInterval = 0;
}
}

View file

@ -0,0 +1,48 @@
//
// PacketTimeWindow.h
// libraries/networking/src/udt
//
// Created by Stephen Birarda on 2015-07-28.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#pragma once
#ifndef hifi_PacketTimeWindow_h
#define hifi_PacketTimeWindow_h
#include <chrono>
#include <vector>
namespace udt {
class PacketTimeWindow {
public:
PacketTimeWindow(int numPacketIntervals = 16, int numProbeIntervals = 16);
void onPacketArrival();
void onProbePair1Arrival();
void onProbePair2Arrival();
int32_t getPacketReceiveSpeed() const;
int32_t getEstimatedBandwidth() const;
private:
int _numPacketIntervals { 0 }; // the number of packet intervals to store
int _numProbeIntervals { 0 }; // the number of probe intervals to store
int _currentPacketInterval { 0 }; // index for the current packet interval
int _currentProbeInterval { 0 }; // index for the current probe interval
std::vector<int> _packetIntervals; // vector of microsecond intervals between packet arrivals
std::vector<int> _probeIntervals; // vector of microsecond intervals between probe pair arrivals
std::chrono::high_resolution_clock::time_point _lastPacketTime; // the time_point when last packet arrived
std::chrono::high_resolution_clock::time_point _firstProbeTime; // the time_point when first probe in pair arrived
};
}
#endif // hifi_PacketTimeWindow_h

View file

@ -0,0 +1,254 @@
//
// SendQueue.cpp
// libraries/networking/src/udt
//
// Created by Clement on 7/21/15.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#include "SendQueue.h"
#include <algorithm>
#include <QtCore/QCoreApplication>
#include <QtCore/QThread>
#include <SharedUtil.h>
#include "ControlPacket.h"
#include "Packet.h"
#include "Socket.h"
using namespace udt;
using namespace std::chrono;
std::unique_ptr<SendQueue> SendQueue::create(Socket* socket, HifiSockAddr destination) {
auto queue = std::unique_ptr<SendQueue>(new SendQueue(socket, destination));
Q_ASSERT_X(socket, "SendQueue::create", "Must be called with a valid Socket*");
// Setup queue private thread
QThread* thread = new QThread();
thread->setObjectName("Networking: SendQueue " + destination.objectName()); // Name thread for easier debug
connect(thread, &QThread::started, queue.get(), &SendQueue::run);
connect(queue.get(), &QObject::destroyed, thread, &QThread::quit); // Thread auto cleanup
connect(thread, &QThread::finished, thread, &QThread::deleteLater); // Thread auto cleanup
// Move queue to private thread and start it
queue->moveToThread(thread);
thread->start();
return std::move(queue);
}
SendQueue::SendQueue(Socket* socket, HifiSockAddr dest) :
_socket(socket),
_destination(dest)
{
}
void SendQueue::queuePacket(std::unique_ptr<Packet> packet) {
{
QWriteLocker locker(&_packetsLock);
_packets.push_back(std::move(packet));
}
if (!this->thread()->isRunning()) {
this->thread()->start();
}
}
void SendQueue::stop() {
_isRunning = false;
}
void SendQueue::sendPacket(const Packet& packet) {
_socket->writeDatagram(packet.getData(), packet.getDataSize(), _destination);
}
void SendQueue::ack(SequenceNumber ack) {
if (_lastACKSequenceNumber == (uint32_t) ack) {
return;
}
{ // remove any ACKed packets from the map of sent packets
QWriteLocker locker(&_sentLock);
for (auto seq = SequenceNumber { (uint32_t) _lastACKSequenceNumber }; seq <= ack; ++seq) {
_sentPackets.erase(seq);
}
}
{ // remove any sequence numbers equal to or lower than this ACK in the loss list
QWriteLocker nakLocker(&_naksLock);
if (_naks.getLength() > 0 && _naks.getFirstSequenceNumber() <= ack) {
_naks.remove(_naks.getFirstSequenceNumber(), ack);
}
}
_lastACKSequenceNumber = (uint32_t) ack;
}
void SendQueue::nak(SequenceNumber start, SequenceNumber end) {
QWriteLocker locker(&_naksLock);
_naks.insert(start, end);
}
void SendQueue::overrideNAKListFromPacket(ControlPacket& packet) {
QWriteLocker locker(&_naksLock);
_naks.clear();
SequenceNumber first, second;
while (packet.bytesLeftToRead() >= (qint64)(2 * sizeof(SequenceNumber))) {
packet.readPrimitive(&first);
packet.readPrimitive(&second);
if (first == second) {
_naks.append(first);
} else {
_naks.append(first, second);
}
}
}
SequenceNumber SendQueue::getNextSequenceNumber() {
_atomicCurrentSequenceNumber = (SequenceNumber::Type)++_currentSequenceNumber;
return _currentSequenceNumber;
}
void SendQueue::sendNewPacketAndAddToSentList(std::unique_ptr<Packet> newPacket, SequenceNumber sequenceNumber) {
// write the sequence number and send the packet
newPacket->writeSequenceNumber(sequenceNumber);
sendPacket(*newPacket);
// Save packet/payload size before we move it
auto packetSize = newPacket->getDataSize();
auto payloadSize = newPacket->getPayloadSize();
{
// Insert the packet we have just sent in the sent list
QWriteLocker locker(&_sentLock);
_sentPackets[newPacket->getSequenceNumber()].swap(newPacket);
Q_ASSERT_X(!newPacket, "SendQueue::sendNewPacketAndAddToSentList()", "Overriden packet in sent list");
}
emit packetSent(packetSize, payloadSize);
}
void SendQueue::run() {
_isRunning = true;
while (_isRunning) {
// Record timing
_lastSendTimestamp = high_resolution_clock::now();
bool resentPacket = false;
// the following while makes sure that we find a packet to re-send, if there is one
while (!resentPacket) {
QWriteLocker naksLocker(&_naksLock);
if (_naks.getLength() > 0) {
// pull the sequence number we need to re-send
SequenceNumber resendNumber = _naks.popFirstSequenceNumber();
naksLocker.unlock();
// pull the packet to re-send from the sent packets list
QReadLocker sentLocker(&_sentLock);
// see if we can find the packet to re-send
auto it = _sentPackets.find(resendNumber);
if (it != _sentPackets.end()) {
// we found the packet - grab it
auto& resendPacket = *(it->second);
// unlock the sent packets
sentLocker.unlock();
// send it off
sendPacket(resendPacket);
emit packetRetransmitted();
// mark that we did resend a packet
resentPacket = true;
// break out of our while now that we have re-sent a packet
break;
} else {
// we didn't find this packet in the sentPackets queue - assume this means it was ACKed
// we'll fire the loop again to see if there is another to re-send
continue;
}
}
// break from the while, we didn't resend a packet
break;
}
// if we didn't find a packet to re-send AND we think we can fit a new packet on the wire
// (this is according to the current flow window size) then we send out a new packet
if (!resentPacket
&& seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber) <= _flowWindowSize) {
// we didn't re-send a packet, so time to send a new one
QWriteLocker locker(&_packetsLock);
if (_packets.size() > 0) {
SequenceNumber nextNumber = getNextSequenceNumber();
// grab the first packet we will send
std::unique_ptr<Packet> firstPacket;
firstPacket.swap(_packets.front());
_packets.pop_front();
std::unique_ptr<Packet> secondPacket;
if (((uint32_t) nextNumber & 0xF) == 0) {
// the first packet is the first in a probe pair - every 16 (rightmost 16 bits = 0) packets
// pull off a second packet if we can before we unlock
if (_packets.size() > 0) {
secondPacket.swap(_packets.front());
_packets.pop_front();
}
}
// unlock the packets, we're done pulling
locker.unlock();
// definitely send the first packet
sendNewPacketAndAddToSentList(move(firstPacket), nextNumber);
// do we have a second in a pair to send as well?
if (secondPacket) {
nextNumber = getNextSequenceNumber();
sendNewPacketAndAddToSentList(move(secondPacket), nextNumber);
}
} else {
locker.unlock();
}
}
// since we're a while loop, give the thread a chance to process events
QCoreApplication::processEvents();
// we just processed events so check now if we were just told to stop
if (!_isRunning) {
break;
}
// sleep as long as we need until next packet send, if we can
auto now = high_resolution_clock::now();
auto microsecondDuration = duration_cast<microseconds>((_lastSendTimestamp + microseconds(_packetSendPeriod)) - now);
if (microsecondDuration.count() > 0) {
usleep(microsecondDuration.count());
}
}
}

View file

@ -0,0 +1,103 @@
//
// SendQueue.h
// libraries/networking/src/udt
//
// Created by Clement on 7/21/15.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#ifndef hifi_SendQueue_h
#define hifi_SendQueue_h
#include <chrono>
#include <list>
#include <unordered_map>
#include <QtCore/QObject>
#include <QtCore/QReadWriteLock>
#include <QtCore/QTimer>
#include "../HifiSockAddr.h"
#include "Constants.h"
#include "SequenceNumber.h"
#include "LossList.h"
namespace udt {
class BasePacket;
class ControlPacket;
class Packet;
class Socket;
class SendQueue : public QObject {
Q_OBJECT
public:
static std::unique_ptr<SendQueue> create(Socket* socket, HifiSockAddr destination);
void queuePacket(std::unique_ptr<Packet> packet);
int getQueueSize() const { QReadLocker locker(&_packetsLock); return _packets.size(); }
SequenceNumber getCurrentSequenceNumber() const { return SequenceNumber(_atomicCurrentSequenceNumber); }
void setFlowWindowSize(int flowWindowSize) { _flowWindowSize = flowWindowSize; }
int getPacketSendPeriod() const { return _packetSendPeriod; }
void setPacketSendPeriod(int newPeriod) { _packetSendPeriod = newPeriod; }
public slots:
void stop();
void ack(SequenceNumber ack);
void nak(SequenceNumber start, SequenceNumber end);
void overrideNAKListFromPacket(ControlPacket& packet);
signals:
void packetSent(int dataSize, int payloadSize);
void packetRetransmitted();
private slots:
void run();
private:
SendQueue(Socket* socket, HifiSockAddr dest);
SendQueue(SendQueue& other) = delete;
SendQueue(SendQueue&& other) = delete;
void sendPacket(const Packet& packet);
void sendNewPacketAndAddToSentList(std::unique_ptr<Packet> newPacket, SequenceNumber sequenceNumber);
// Increments current sequence number and return it
SequenceNumber getNextSequenceNumber();
mutable QReadWriteLock _packetsLock; // Protects the packets to be sent list.
std::list<std::unique_ptr<Packet>> _packets; // List of packets to be sent
Socket* _socket { nullptr }; // Socket to send packet on
HifiSockAddr _destination; // Destination addr
std::atomic<uint32_t> _lastACKSequenceNumber; // Last ACKed sequence number
SequenceNumber _currentSequenceNumber; // Last sequence number sent out
std::atomic<uint32_t> _atomicCurrentSequenceNumber;// Atomic for last sequence number sent out
std::atomic<int> _packetSendPeriod; // Interval between two packet send event in microseconds, set from CC
std::chrono::high_resolution_clock::time_point _lastSendTimestamp; // Record last time of packet departure
std::atomic<bool> _isRunning { false };
std::atomic<int> _flowWindowSize; // Flow control window size (number of packets that can be on wire) - set from CC
mutable QReadWriteLock _naksLock; // Protects the naks list.
LossList _naks; // Sequence numbers of packets to resend
mutable QReadWriteLock _sentLock; // Protects the sent packet list
std::unordered_map<SequenceNumber, std::unique_ptr<Packet>> _sentPackets; // Packets waiting for ACK.
};
}
#endif // hifi_SendQueue_h

View file

@ -0,0 +1,29 @@
//
// SequenceNumber.cpp
// libraries/networking/src/udt
//
// Created by Clement on 7/23/15.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#include "SequenceNumber.h"
int udt::seqlen(const SequenceNumber& seq1, const SequenceNumber& seq2) {
return (seq1._value <= seq2._value) ? (seq2._value - seq1._value + 1)
: (seq2._value - seq1._value + SequenceNumber::MAX + 2);
}
int udt::seqoff(const SequenceNumber& seq1, const SequenceNumber& seq2) {
if (glm::abs(seq1._value - seq2._value) < SequenceNumber::THRESHOLD) {
return seq2._value - seq1._value;
}
if (seq1._value < seq2._value) {
return seq2._value - seq1._value - SequenceNumber::MAX - 1;
}
return seq2._value - seq1._value + SequenceNumber::MAX + 1;
}

View file

@ -0,0 +1,149 @@
//
// SequenceNumber.h
// libraries/networking/src/udt
//
// Created by Clement on 7/23/15.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#ifndef hifi_SequenceNumber_h
#define hifi_SequenceNumber_h
#include <functional>
#include <glm/detail/func_common.hpp>
namespace udt {
class SequenceNumber {
public:
// Base type of sequence numbers
using Type = int32_t;
using UType = uint32_t;
// Values are for 29 bit SequenceNumber
static const Type THRESHOLD = 0x0FFFFFFF; // threshold for comparing sequence numbers
static const Type MAX = 0x1FFFFFFF; // maximum sequence number used in UDT
SequenceNumber() = default;
SequenceNumber(const SequenceNumber& other) : _value(other._value) {}
// Only explicit conversions
explicit SequenceNumber(char* value) { _value = (*reinterpret_cast<int32_t*>(value)) & MAX; }
explicit SequenceNumber(Type value) { _value = (value <= MAX) ? ((value >= 0) ? value : 0) : MAX; }
explicit SequenceNumber(UType value) { _value = (value <= MAX) ? value : MAX; }
explicit operator Type() { return _value; }
explicit operator UType() { return static_cast<UType>(_value); }
inline SequenceNumber& operator++() {
_value = (_value + 1) % (MAX + 1);
return *this;
}
inline SequenceNumber& operator--() {
_value = (_value == 0) ? MAX : --_value;
return *this;
}
inline SequenceNumber operator++(int) {
SequenceNumber before = *this;
++(*this);
return before;
}
inline SequenceNumber operator--(int) {
SequenceNumber before = *this;
--(*this);
return before;
}
inline SequenceNumber& operator=(const SequenceNumber& other) {
_value = other._value;
return *this;
}
inline SequenceNumber& operator+=(Type inc) {
_value = (_value + inc > MAX) ? _value + inc - (MAX + 1) : _value + inc;
return *this;
}
inline SequenceNumber& operator-=(Type dec) {
_value = (_value < dec) ? MAX - (dec - _value + 1) : _value - dec;
return *this;
}
inline bool operator==(const SequenceNumber& other) const {
return _value == other._value;
}
inline bool operator!=(const SequenceNumber& other) const {
return _value != other._value;
}
friend bool operator<(const SequenceNumber& a, const SequenceNumber& b);
friend bool operator>(const SequenceNumber& a, const SequenceNumber& b);
friend bool operator<=(const SequenceNumber& a, const SequenceNumber& b);
friend bool operator>=(const SequenceNumber& a, const SequenceNumber& b);
friend SequenceNumber operator+(const SequenceNumber a, const Type& b);
friend SequenceNumber operator+(const Type& a, const SequenceNumber b);
friend SequenceNumber operator-(const SequenceNumber a, const Type& b);
friend SequenceNumber operator-(const Type& a, const SequenceNumber b);
friend int seqlen(const SequenceNumber& seq1, const SequenceNumber& seq2);
friend int seqoff(const SequenceNumber& seq1, const SequenceNumber& seq2);
private:
Type _value { 0 };
friend struct std::hash<SequenceNumber>;
};
static_assert(sizeof(SequenceNumber) == sizeof(uint32_t), "SequenceNumber invalid size");
inline bool operator<(const SequenceNumber& a, const SequenceNumber& b) {
return (glm::abs(a._value - b._value) < SequenceNumber::THRESHOLD) ? a._value < b._value : b._value < a._value;
}
inline bool operator>(const SequenceNumber& a, const SequenceNumber& b) {
return (glm::abs(a._value - b._value) < SequenceNumber::THRESHOLD) ? a._value > b._value : b._value > a._value;
}
inline bool operator<=(const SequenceNumber& a, const SequenceNumber& b) {
return (glm::abs(a._value - b._value) < SequenceNumber::THRESHOLD) ? a._value <= b._value : b._value <= a._value;
}
inline bool operator>=(const SequenceNumber& a, const SequenceNumber& b) {
return (glm::abs(a._value - b._value) < SequenceNumber::THRESHOLD) ? a._value >= b._value : b._value >= a._value;
}
inline SequenceNumber operator+(SequenceNumber a, const SequenceNumber::Type& b) {
a += b;
return a;
}
inline SequenceNumber operator+(const SequenceNumber::Type& a, SequenceNumber b) {
b += a;
return b;
}
inline SequenceNumber operator-(SequenceNumber a, const SequenceNumber::Type& b) {
a -= b;
return a;
}
inline SequenceNumber operator-(const SequenceNumber::Type& a, SequenceNumber b) {
b -= a;
return b;
}
int seqlen(const SequenceNumber& seq1, const SequenceNumber& seq2);
int seqoff(const SequenceNumber& seq1, const SequenceNumber& seq2);
}
template<> struct std::hash<udt::SequenceNumber> {
size_t operator()(const udt::SequenceNumber& SequenceNumber) const {
return std::hash<unsigned long>()(SequenceNumber._value);
}
};
#endif // hifi_SequenceNumber_h

View file

@ -11,7 +11,15 @@
#include "Socket.h"
#include <QtCore/QThread>
#include <LogHandler.h>
#include "../NetworkLogging.h"
#include "Connection.h"
#include "ControlPacket.h"
#include "Packet.h"
#include "../NLPacket.h"
using namespace udt;
@ -19,32 +27,43 @@ Socket::Socket(QObject* parent) :
QObject(parent)
{
connect(&_udpSocket, &QUdpSocket::readyRead, this, &Socket::readPendingDatagrams);
// make sure our synchronization method is called every SYN interval
connect(&_synTimer, &QTimer::timeout, this, &Socket::rateControlSync);
// start our timer for the synchronization time interval
_synTimer.start(_synInterval);
}
void Socket::rebind() {
quint16 oldPort = _udpSocket.localPort();
_udpSocket.close();
_udpSocket.bind(QHostAddress::AnyIPv4, oldPort);
bind(QHostAddress::AnyIPv4, oldPort);
}
void Socket::setBufferSizes(int numBytes) {
void Socket::setSystemBufferSizes() {
for (int i = 0; i < 2; i++) {
QAbstractSocket::SocketOption bufferOpt;
QString bufferTypeString;
int numBytes = 0;
if (i == 0) {
bufferOpt = QAbstractSocket::SendBufferSizeSocketOption;
numBytes = udt::UDP_SEND_BUFFER_SIZE_BYTES;
bufferTypeString = "send";
} else {
bufferOpt = QAbstractSocket::ReceiveBufferSizeSocketOption;
numBytes = udt::UDP_RECEIVE_BUFFER_SIZE_BYTES;
bufferTypeString = "receive";
}
int oldBufferSize = _udpSocket.socketOption(bufferOpt).toInt();
if (oldBufferSize < numBytes) {
_udpSocket.setSocketOption(bufferOpt, QVariant(numBytes));
int newBufferSize = _udpSocket.socketOption(bufferOpt).toInt();
qCDebug(networking) << "Changed socket" << bufferTypeString << "buffer size from" << oldBufferSize << "to"
@ -57,32 +76,73 @@ void Socket::setBufferSizes(int numBytes) {
}
}
qint64 Socket::writeUnreliablePacket(const Packet& packet, const HifiSockAddr& sockAddr) {
qint64 Socket::writeBasePacket(const udt::BasePacket& packet, const HifiSockAddr &sockAddr) {
// Since this is a base packet we have no way to know if this is reliable or not - we just fire it off
// this should not be called with an instance of Packet
Q_ASSERT_X(!dynamic_cast<const Packet*>(&packet),
"Socket::writeBasePacket", "Cannot send a Packet/NLPacket via writeBasePacket");
return writeDatagram(packet.getData(), packet.getDataSize(), sockAddr);
}
qint64 Socket::writePacket(const Packet& packet, const HifiSockAddr& sockAddr) {
Q_ASSERT_X(!packet.isReliable(), "Socket::writePacket", "Cannot send a reliable packet unreliably");
// write the correct sequence number to the Packet here
packet.writeSequenceNumber(++_unreliableSequenceNumbers[sockAddr]);
return writeDatagram(packet.getData(), packet.getDataSize(), sockAddr);
}
qint64 Socket::writePacket(std::unique_ptr<Packet> packet, const HifiSockAddr& sockAddr) {
if (packet->isReliable()) {
findOrCreateConnection(sockAddr).sendReliablePacket(move(packet));
return 0;
}
return writePacket(*packet, sockAddr);
}
qint64 Socket::writeDatagram(const char* data, qint64 size, const HifiSockAddr& sockAddr) {
return writeDatagram(QByteArray::fromRawData(data, size), sockAddr);
}
qint64 Socket::writeDatagram(const QByteArray& datagram, const HifiSockAddr& sockAddr) {
qint64 bytesWritten = _udpSocket.writeDatagram(datagram, sockAddr.getAddress(), sockAddr.getPort());
// TODO: write the correct sequence number to the Packet here
// const_cast<NLPacket&>(packet).writeSequenceNumber(sequenceNumber);
if (bytesWritten < 0) {
qCDebug(networking) << "ERROR in writeDatagram:" << _udpSocket.error() << "-" << _udpSocket.errorString();
// when saturating a link this isn't an uncommon message - suppress it so it doesn't bomb the debug
static const QString WRITE_ERROR_REGEX = "Socket::writeDatagram QAbstractSocket::NetworkError - Unable to send a message";
static QString repeatedMessage
= LogHandler::getInstance().addRepeatedMessageRegex(WRITE_ERROR_REGEX);
qCDebug(networking) << "Socket::writeDatagram" << _udpSocket.error() << "-" << qPrintable(_udpSocket.errorString());
}
return bytesWritten;
}
Connection& Socket::findOrCreateConnection(const HifiSockAddr& sockAddr) {
auto it = _connectionsHash.find(sockAddr);
if (it == _connectionsHash.end()) {
auto connection = std::unique_ptr<Connection>(new Connection(this, sockAddr, _ccFactory->create()));
it = _connectionsHash.insert(it, std::make_pair(sockAddr, std::move(connection)));
}
return *it->second;
}
void Socket::readPendingDatagrams() {
while (_udpSocket.hasPendingDatagrams()) {
int packetSizeWithHeader = -1;
while ((packetSizeWithHeader = _udpSocket.pendingDatagramSize()) != -1) {
// setup a HifiSockAddr to read into
HifiSockAddr senderSockAddr;
// setup a buffer to read the packet into
int packetSizeWithHeader = _udpSocket.pendingDatagramSize();
std::unique_ptr<char> buffer = std::unique_ptr<char>(new char[packetSizeWithHeader]);
auto buffer = std::unique_ptr<char[]>(new char[packetSizeWithHeader]);
// pull the datagram
_udpSocket.readDatagram(buffer.get(), packetSizeWithHeader,
@ -100,15 +160,88 @@ void Socket::readPendingDatagrams() {
return;
}
// setup a Packet from the data we just read
auto packet = Packet::fromReceivedPacket(std::move(buffer), packetSizeWithHeader, senderSockAddr);
// check if this was a control packet or a data packet
bool isControlPacket = *reinterpret_cast<uint32_t*>(buffer.get()) & CONTROL_BIT_MASK;
// call our verification operator to see if this packet is verified
if (!_packetFilterOperator || _packetFilterOperator(*packet)) {
if (_packetHandler) {
// call the verified packet callback to let it handle this packet
return _packetHandler(std::move(packet));
if (isControlPacket) {
// setup a control packet from the data we just read
auto controlPacket = ControlPacket::fromReceivedPacket(std::move(buffer), packetSizeWithHeader, senderSockAddr);
// move this control packet to the matching connection
auto& connection = findOrCreateConnection(senderSockAddr);
connection.processControl(move(controlPacket));
} else {
// setup a Packet from the data we just read
auto packet = Packet::fromReceivedPacket(std::move(buffer), packetSizeWithHeader, senderSockAddr);
// call our verification operator to see if this packet is verified
if (!_packetFilterOperator || _packetFilterOperator(*packet)) {
if (packet->isReliable()) {
// if this was a reliable packet then signal the matching connection with the sequence number
auto& connection = findOrCreateConnection(senderSockAddr);
connection.processReceivedSequenceNumber(packet->getSequenceNumber(),
packet->getDataSize(),
packet->getPayloadSize());
}
if (_packetHandler) {
// call the verified packet callback to let it handle this packet
_packetHandler(std::move(packet));
}
}
}
}
}
void Socket::connectToSendSignal(const HifiSockAddr& destinationAddr, QObject* receiver, const char* slot) {
auto it = _connectionsHash.find(destinationAddr);
if (it != _connectionsHash.end()) {
connect(it->second.get(), SIGNAL(packetSent()), receiver, slot);
}
}
void Socket::rateControlSync() {
// enumerate our list of connections and ask each of them to send off periodic ACK packet for rate control
for (auto& connection : _connectionsHash) {
connection.second->sync();
}
if (_synTimer.interval() != _synInterval) {
// if the _synTimer interval doesn't match the current _synInterval (changes when the CC factory is changed)
// then restart it now with the right interval
_synTimer.start(_synInterval);
}
}
void Socket::setCongestionControlFactory(std::unique_ptr<CongestionControlVirtualFactory> ccFactory) {
// swap the current unique_ptr for the new factory
_ccFactory.swap(ccFactory);
// update the _synInterval to the value from the factory
_synInterval = _ccFactory->synInterval();
}
ConnectionStats::Stats Socket::sampleStatsForConnection(const HifiSockAddr& destination) {
Q_ASSERT_X(thread() == QThread::currentThread(),
"Socket::sampleStatsForConnection",
"Stats sampling for connection must be on socket thread");
auto it = _connectionsHash.find(destination);
if (it != _connectionsHash.end()) {
return it->second->sampleStats();
} else {
return ConnectionStats::Stats();
}
}
std::vector<HifiSockAddr> Socket::getConnectionSockAddrs() {
std::vector<HifiSockAddr> addr;
addr.reserve(_connectionsHash.size());
for (const auto& connectionPair : _connectionsHash) {
addr.push_back(connectionPair.first);
}
return addr;
}

View file

@ -18,13 +18,20 @@
#include <unordered_map>
#include <QtCore/QObject>
#include <QtCore/QTimer>
#include <QtNetwork/QUdpSocket>
#include "../HifiSockAddr.h"
#include "Packet.h"
#include "CongestionControl.h"
#include "Connection.h"
namespace udt {
class BasePacket;
class ControlSender;
class Packet;
class SequenceNumber;
using PacketFilterOperator = std::function<bool(const Packet&)>;
using BasePacketHandler = std::function<void(std::unique_ptr<BasePacket>)>;
@ -37,34 +44,49 @@ public:
quint16 localPort() const { return _udpSocket.localPort(); }
qint64 writeUnreliablePacket(const Packet& packet, const HifiSockAddr& sockAddr);
qint64 writeDatagram(const char* data, qint64 size, const HifiSockAddr& sockAddr)
{ return writeDatagram(QByteArray::fromRawData(data, size), sockAddr); }
// Simple functions writing to the socket with no processing
qint64 writeBasePacket(const BasePacket& packet, const HifiSockAddr& sockAddr);
qint64 writePacket(const Packet& packet, const HifiSockAddr& sockAddr);
qint64 writePacket(std::unique_ptr<Packet> packet, const HifiSockAddr& sockAddr);
qint64 writeDatagram(const char* data, qint64 size, const HifiSockAddr& sockAddr);
qint64 writeDatagram(const QByteArray& datagram, const HifiSockAddr& sockAddr);
void bind(const QHostAddress& address, quint16 port = 0) { _udpSocket.bind(address, port); }
void bind(const QHostAddress& address, quint16 port = 0) { _udpSocket.bind(address, port); setSystemBufferSizes(); }
void rebind();
void setPacketFilterOperator(PacketFilterOperator filterOperator) { _packetFilterOperator = filterOperator; }
void setPacketHandler(PacketHandler handler) { _packetHandler = handler; }
void setBufferSizes(int numBytes);
void addUnfilteredHandler(const HifiSockAddr& senderSockAddr, BasePacketHandler handler)
{ _unfilteredHandlers[senderSockAddr] = handler; }
void setCongestionControlFactory(std::unique_ptr<CongestionControlVirtualFactory> ccFactory);
void connectToSendSignal(const HifiSockAddr& destinationAddr, QObject* receiver, const char* slot);
ConnectionStats::Stats sampleStatsForConnection(const HifiSockAddr& destination);
std::vector<HifiSockAddr> getConnectionSockAddrs();
private slots:
void readPendingDatagrams();
void rateControlSync();
private:
void setSystemBufferSizes();
Connection& findOrCreateConnection(const HifiSockAddr& sockAddr);
QUdpSocket _udpSocket { this };
PacketFilterOperator _packetFilterOperator;
PacketHandler _packetHandler;
std::unordered_map<HifiSockAddr, BasePacketHandler> _unfilteredHandlers;
std::unordered_map<HifiSockAddr, Packet::SequenceNumber> _packetSequenceNumbers;
std::unordered_map<HifiSockAddr, SequenceNumber> _unreliableSequenceNumbers;
std::unordered_map<HifiSockAddr, std::unique_ptr<Connection>> _connectionsHash;
int _synInterval = 10; // 10ms
QTimer _synTimer;
std::unique_ptr<CongestionControlVirtualFactory> _ccFactory { new CongestionControlFactory<DefaultCC>() };
};
} // namespace udt

View file

@ -1,12 +0,0 @@
//
// udt.cpp
//
//
// Created by Clement on 7/13/15.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#include "udt.h"

View file

@ -1,15 +0,0 @@
//
// udt.h
//
//
// Created by Clement on 7/13/15.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#ifndef hifi_udt_h
#define hifi_udt_h
#endif // hifi_udt_h

View file

@ -12,20 +12,14 @@
#ifndef hifi_OctreeEditPacketSender_h
#define hifi_OctreeEditPacketSender_h
#include <unordered_map>
#include <PacketSender.h>
#include <udt/PacketHeaders.h>
#include "JurisdictionMap.h"
#include "SentPacketHistory.h"
namespace std {
template <> struct hash<QUuid> {
size_t operator()(const QUuid& uuid) const {
return qHash(uuid);
}
};
}
/// Utility for processing, packing, queueing and sending of outbound edit messages.
class OctreeEditPacketSender : public PacketSender {
Q_OBJECT

View file

@ -18,7 +18,7 @@ QTEST_MAIN(PacketTests)
std::unique_ptr<Packet> copyToReadPacket(std::unique_ptr<Packet>& packet) {
auto size = packet->getDataSize();
auto data = std::unique_ptr<char>(new char[size]);
auto data = std::unique_ptr<char[]>(new char[size]);
memcpy(data.get(), packet->getData(), size);
return Packet::fromReceivedPacket(std::move(data), size, HifiSockAddr());
}

View file

@ -28,7 +28,6 @@
#include <QQmlContext>
#include <QtQml/QQmlApplicationEngine>
#include <PathUtils.h>
#include <unordered_map>
#include <memory>
#include <glm/glm.hpp>
#include <PathUtils.h>

View file

@ -5,6 +5,8 @@ set_target_properties(mtc PROPERTIES FOLDER "Tools")
add_subdirectory(scribe)
set_target_properties(scribe PROPERTIES FOLDER "Tools")
add_subdirectory(udt-test)
set_target_properties(udt-test PROPERTIES FOLDER "Tools")
add_subdirectory(vhacd-util)
set_target_properties(vhacd-util PROPERTIES FOLDER "Tools")

View file

@ -0,0 +1,6 @@
set(TARGET_NAME udt-test)
setup_hifi_project()
link_hifi_libraries(networking shared)
copy_dlls_beside_windows_executable()

View file

@ -0,0 +1,290 @@
//
// UDTTest.cpp
// tools/udt-test/src
//
// Created by Stephen Birarda on 2015-07-30.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#include "UDTTest.h"
#include <QtCore/QDebug>
#include <udt/Constants.h>
#include <udt/Packet.h>
#include <LogHandler.h>
const QCommandLineOption PORT_OPTION { "p", "listening port for socket (defaults to random)", "port", 0 };
const QCommandLineOption TARGET_OPTION {
"target", "target for sent packets (default is listen only)",
"IP:PORT or HOSTNAME:PORT"
};
const QCommandLineOption PACKET_SIZE {
"packet-size", "size for sent packets in bytes (defaults to 1500)", "bytes",
QString(udt::MAX_PACKET_SIZE_WITH_UDP_HEADER)
};
const QCommandLineOption MIN_PACKET_SIZE {
"min-packet-size", "min size for sent packets in bytes", "min-bytes"
};
const QCommandLineOption MAX_PACKET_SIZE {
"max-packet-size", "max size for sent packets in bytes", "max-bytes"
};
const QCommandLineOption MAX_SEND_BYTES {
"max-send-bytes", "number of bytes to send before stopping (default is infinite)", "max-bytes"
};
const QCommandLineOption MAX_SEND_PACKETS {
"max-send-packets", "number of packets to send before stopping (default is infinite)", "max-packets"
};
const QCommandLineOption UNRELIABLE_PACKETS {
"unreliable", "send unreliable packets (default is reliable)"
};
const QStringList CLIENT_STATS_TABLE_HEADERS {
"Send Rate (P/s)", "Bandwidth (P/s)", "RTT(ms)", "CW (P)", "Send Period (us)",
"Received ACK", "Processed ACK", "Received LACK", "Received NAK", "Received TNAK",
"Sent ACK2", "Sent Packets", "Re-sent Packets"
};
const QStringList SERVER_STATS_TABLE_HEADERS {
"Receive Rate (P/s)", "Bandwidth (P/s)", "RTT(ms)", "CW (P)",
"Sent ACK", "Sent LACK", "Sent NAK", "Sent TNAK",
"Recieved ACK2", "Duplicate Packets"
};
UDTTest::UDTTest(int& argc, char** argv) :
QCoreApplication(argc, argv)
{
qInstallMessageHandler(LogHandler::verboseMessageHandler);
parseArguments();
// randomize the seed for packet size randomization
srand(time(NULL));
_socket.bind(QHostAddress::AnyIPv4, _argumentParser.value(PORT_OPTION).toUInt());
qDebug() << "Test socket is listening on" << _socket.localPort();
if (_argumentParser.isSet(TARGET_OPTION)) {
// parse the IP and port combination for this target
QString hostnamePortString = _argumentParser.value(TARGET_OPTION);
QHostAddress address { hostnamePortString.left(hostnamePortString.indexOf(':')) };
quint16 port { (quint16) hostnamePortString.mid(hostnamePortString.indexOf(':') + 1).toUInt() };
if (address.isNull() || port == 0) {
qCritical() << "Could not parse an IP address and port combination from" << hostnamePortString << "-" <<
"The parsed IP was" << address.toString() << "and the parsed port was" << port;
QMetaObject::invokeMethod(this, "quit", Qt::QueuedConnection);
} else {
_target = HifiSockAddr(address, port);
qDebug() << "Packets will be sent to" << _target;
}
}
if (_argumentParser.isSet(PACKET_SIZE)) {
// parse the desired packet size
_minPacketSize = _maxPacketSize = _argumentParser.value(PACKET_SIZE).toInt();
if (_argumentParser.isSet(MIN_PACKET_SIZE) || _argumentParser.isSet(MAX_PACKET_SIZE)) {
qCritical() << "Cannot set a min packet size or max packet size AND a specific packet size.";
QMetaObject::invokeMethod(this, "quit", Qt::QueuedConnection);
}
} else {
bool customMinSize = false;
if (_argumentParser.isSet(MIN_PACKET_SIZE)) {
_minPacketSize = _argumentParser.value(MIN_PACKET_SIZE).toInt();
customMinSize = true;
}
if (_argumentParser.isSet(MAX_PACKET_SIZE)) {
_maxPacketSize = _argumentParser.value(MAX_PACKET_SIZE).toInt();
// if we don't have a min packet size we should make it 1, because we have a max
if (customMinSize) {
_minPacketSize = 1;
}
}
if (_maxPacketSize < _minPacketSize) {
qCritical() << "Cannot set a max packet size that is smaller than the min packet size.";
QMetaObject::invokeMethod(this, "quit", Qt::QueuedConnection);
}
}
if (_argumentParser.isSet(MAX_SEND_BYTES)) {
_maxSendBytes = _argumentParser.value(MAX_SEND_BYTES).toInt();
}
if (_argumentParser.isSet(MAX_SEND_PACKETS)) {
_maxSendPackets = _argumentParser.value(MAX_SEND_PACKETS).toInt();
}
if (_argumentParser.isSet(UNRELIABLE_PACKETS)) {
_sendReliable = false;
}
if (!_target.isNull()) {
sendInitialPackets();
}
// the sender reports stats every 100 milliseconds
static const int STATS_SAMPLE_INTERVAL = 100;
QTimer* statsTimer = new QTimer(this);
connect(statsTimer, &QTimer::timeout, this, &UDTTest::sampleStats);
statsTimer->start(STATS_SAMPLE_INTERVAL);
}
void UDTTest::parseArguments() {
// use a QCommandLineParser to setup command line arguments and give helpful output
_argumentParser.setApplicationDescription("High Fidelity UDT Protocol Test Client");
_argumentParser.addHelpOption();
const QCommandLineOption helpOption = _argumentParser.addHelpOption();
_argumentParser.addOptions({
PORT_OPTION, TARGET_OPTION, PACKET_SIZE, MIN_PACKET_SIZE, MAX_PACKET_SIZE,
MAX_SEND_BYTES, MAX_SEND_PACKETS, UNRELIABLE_PACKETS
});
if (!_argumentParser.parse(arguments())) {
qCritical() << _argumentParser.errorText();
_argumentParser.showHelp();
Q_UNREACHABLE();
}
if (_argumentParser.isSet(helpOption)) {
_argumentParser.showHelp();
Q_UNREACHABLE();
}
}
void UDTTest::sendInitialPackets() {
static const int NUM_INITIAL_PACKETS = 500;
int numPackets = std::max(NUM_INITIAL_PACKETS, _maxSendPackets);
for (int i = 0; i < numPackets; ++i) {
sendPacket();
}
if (numPackets == NUM_INITIAL_PACKETS) {
// we've put 500 initial packets in the queue, everytime we hear one has gone out we should add a new one
_socket.connectToSendSignal(_target, this, SLOT(refillPacket()));
}
}
void UDTTest::sendPacket() {
if (_maxSendPackets != -1 && _totalQueuedPackets > _maxSendPackets) {
// don't send more packets, we've hit max
return;
}
if (_maxSendBytes != -1 && _totalQueuedBytes > _maxSendBytes) {
// don't send more packets, we've hit max
return;
}
// we're good to send a new packet, construct it now
// figure out what size the packet will be
int packetPayloadSize = 0;
if (_minPacketSize == _maxPacketSize) {
// we know what size we want - figure out the payload size
packetPayloadSize = _maxPacketSize - udt::Packet::localHeaderSize(false);
} else {
// pick a random size in our range
int randomPacketSize = rand() % _maxPacketSize + _minPacketSize;
packetPayloadSize = randomPacketSize - udt::Packet::localHeaderSize(false);
}
auto newPacket = udt::Packet::create(packetPayloadSize, _sendReliable);
newPacket->setPayloadSize(packetPayloadSize);
_totalQueuedBytes += newPacket->getDataSize();
// queue or send this packet by calling write packet on the socket for our target
if (_sendReliable) {
_socket.writePacket(std::move(newPacket), _target);
} else {
_socket.writePacket(*newPacket, _target);
}
++_totalQueuedPackets;
}
void UDTTest::sampleStats() {
static bool first = true;
static const double USECS_PER_MSEC = 1000.0;
if (!_target.isNull()) {
if (first) {
// output the headers for stats for our table
qDebug() << qPrintable(CLIENT_STATS_TABLE_HEADERS.join(" | "));
first = false;
}
udt::ConnectionStats::Stats stats = _socket.sampleStatsForConnection(_target);
int headerIndex = -1;
// setup a list of left justified values
QStringList values {
QString::number(stats.sendRate).leftJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.estimatedBandwith).leftJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.rtt / USECS_PER_MSEC).leftJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.congestionWindowSize).leftJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.packetSendPeriod).leftJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::ReceivedACK]).leftJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::ProcessedACK]).leftJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::ReceivedLightACK]).leftJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::ReceivedNAK]).leftJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::ReceivedTimeoutNAK]).leftJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::SentACK2]).leftJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.sentPackets).leftJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::Retransmission]).leftJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size())
};
// output this line of values
qDebug() << qPrintable(values.join(" | "));
} else {
if (first) {
// output the headers for stats for our table
qDebug() << qPrintable(SERVER_STATS_TABLE_HEADERS.join(" | "));
first = false;
}
auto sockets = _socket.getConnectionSockAddrs();
if (sockets.size() > 0) {
udt::ConnectionStats::Stats stats = _socket.sampleStatsForConnection(sockets.front());
int headerIndex = -1;
// setup a list of left justified values
QStringList values {
QString::number(stats.receiveRate).leftJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.estimatedBandwith).leftJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.rtt / USECS_PER_MSEC).leftJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.congestionWindowSize).leftJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::SentACK]).leftJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::SentLightACK]).leftJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::SentNAK]).leftJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::SentTimeoutNAK]).leftJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::ReceivedACK2]).leftJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::Duplicate]).leftJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size())
};
// output this line of values
qDebug() << qPrintable(values.join(" | "));
}
}
}

View file

@ -0,0 +1,54 @@
//
// UDTTest.h
// tools/udt-test/src
//
// Created by Stephen Birarda on 2015-07-30.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#pragma once
#ifndef hifi_UDTTest_h
#define hifi_UDTTest_h
#include <QtCore/QCoreApplication>
#include <QtCore/QCommandLineParser>
#include <udt/Constants.h>
#include <udt/Socket.h>
class UDTTest : public QCoreApplication {
Q_OBJECT
public:
UDTTest(int& argc, char** argv);
public slots:
void refillPacket() { sendPacket(); } // adds a new packet to the queue when we are told one is sent
void sampleStats();
private:
void parseArguments();
void sendInitialPackets(); // fills the queue with packets to start
void sendPacket(); // constructs and sends a packet according to the test parameters
QCommandLineParser _argumentParser;
udt::Socket _socket;
HifiSockAddr _target; // the target for sent packets
int _minPacketSize { udt::MAX_PACKET_SIZE };
int _maxPacketSize { udt::MAX_PACKET_SIZE };
int _maxSendBytes { -1 }; // the number of bytes to send to the target before stopping
int _maxSendPackets { -1 }; // the number of packets to send to the target before stopping
bool _sendReliable { true }; // wether packets are sent reliably or unreliably
int _totalQueuedPackets { 0 }; // keeps track of the number of packets we have already queued
int _totalQueuedBytes { 0 }; // keeps track of the number of bytes we have already queued
};
#endif // hifi_UDTTest_h

View file

@ -0,0 +1,19 @@
//
// main.cpp
// tools/udt-test/src
//
// Created by Stephen Birarda on 7/30/15.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
#include <QtCore/QCoreApplication>
#include "UDTTest.h"
int main(int argc, char* argv[]) {
UDTTest app(argc, argv);
return app.exec();
}