Add ordered sending to Socket and Connection

This commit is contained in:
Ryan Huffman 2015-08-18 23:04:32 -07:00
parent 4bb1c46ad0
commit 9c65849037
4 changed files with 123 additions and 7 deletions

View file

@ -16,9 +16,12 @@
#include <NumericalConstants.h>
#include "../HifiSockAddr.h"
#include "../NetworkLogging.h"
#include "CongestionControl.h"
#include "ControlPacket.h"
#include "Packet.h"
#include "PacketList.h"
#include "Socket.h"
using namespace udt;
@ -79,7 +82,32 @@ SendQueue& Connection::getSendQueue() {
void Connection::sendReliablePacket(std::unique_ptr<Packet> packet) {
Q_ASSERT_X(packet->isReliable(), "Connection::send", "Trying to send an unreliable packet reliably.");
getSendQueue().queuePacket(move(packet));
getSendQueue().queuePacket(std::move(packet));
}
void Connection::sendReliablePacketList(std::unique_ptr<PacketList> packetList) {
Q_ASSERT_X(packetList->isReliable(), "Connection::send", "Trying to send an unreliable packet reliably.");
getSendQueue().queuePacketList(std::move(packetList));
}
void Connection::queueReceivedMessagePacket(std::unique_ptr<Packet> packet) {
Q_ASSERT(packet->isPartOfMessage());
auto messageNumber = packet->getMessageNumber();
PendingReceivedMessage& pendingMessage = _pendingReceivedMessages[messageNumber];
pendingMessage.enqueuePacket(std::move(packet));
if (pendingMessage.isComplete()) {
// All messages have been received, create PacketList
auto packetList = PacketList::fromReceivedPackets(std::move(pendingMessage._packets));
_pendingReceivedMessages.erase(messageNumber);
if (_parentSocket) {
_parentSocket->messageReceived(std::move(packetList));
}
}
}
void Connection::sync() {
@ -609,3 +637,37 @@ void Connection::updateCongestionControlAndSendQueue(std::function<void ()> cong
_stats.recordPacketSendPeriod(_congestionControl->_packetSendPeriod);
_stats.recordCongestionWindowSize(_congestionControl->_congestionWindowSize);
}
void PendingReceivedMessage::enqueuePacket(std::unique_ptr<Packet> packet) {
if (_isComplete) {
qCDebug(networking) << "UNEXPECTED: Received packet for a message that is already complete";
return;
}
if (packet->getPacketPosition() == Packet::PacketPosition::FIRST) {
_hasFirstSequenceNumber = true;
_firstSequenceNumber = packet->getSequenceNumber();
} else if (packet->getPacketPosition() == Packet::PacketPosition::LAST) {
_hasLastSequenceNumber = true;
_lastSequenceNumber = packet->getSequenceNumber();
} else if (packet->getPacketPosition() == Packet::PacketPosition::ONLY) {
_hasFirstSequenceNumber = true;
_hasLastSequenceNumber = true;
_firstSequenceNumber = packet->getSequenceNumber();
_lastSequenceNumber = packet->getSequenceNumber();
}
_packets.push_back(std::move(packet));
if (_hasFirstSequenceNumber && _hasLastSequenceNumber) {
auto numPackets = udt::seqlen(_firstSequenceNumber, _lastSequenceNumber);
if (uint64_t(numPackets) == _packets.size()) {
_isComplete = true;
// Sort packets by sequence number
_packets.sort([](std::unique_ptr<Packet>& a, std::unique_ptr<Packet>& b) {
return a->getSequenceNumber() < b->getSequenceNumber();
});
}
}
}

View file

@ -30,8 +30,24 @@ namespace udt {
class CongestionControl;
class ControlPacket;
class Packet;
class PacketList;
class Socket;
class PendingReceivedMessage {
public:
void enqueuePacket(std::unique_ptr<Packet> packet);
bool isComplete() const { return _isComplete; }
std::list<std::unique_ptr<Packet>> _packets;
private:
bool _isComplete { false };
bool _hasFirstSequenceNumber { false };
bool _hasLastSequenceNumber { false };
SequenceNumber _firstSequenceNumber;
SequenceNumber _lastSequenceNumber;
};
class Connection : public QObject {
Q_OBJECT
public:
@ -43,12 +59,15 @@ public:
~Connection();
void sendReliablePacket(std::unique_ptr<Packet> packet);
void sendReliablePacketList(std::unique_ptr<PacketList> packet);
void sync(); // rate control method, fired by Socket for all connections on SYN interval
// return indicates if this packet was a duplicate
bool processReceivedSequenceNumber(SequenceNumber sequenceNumber, int packetSize, int payloadSize);
void processControl(std::unique_ptr<ControlPacket> controlPacket);
void queueReceivedMessagePacket(std::unique_ptr<Packet> packet);
ConnectionStats::Stats sampleStats() { return _stats.sample(); }
@ -117,6 +136,8 @@ private:
std::unique_ptr<CongestionControl> _congestionControl;
std::unique_ptr<SendQueue> _sendQueue;
std::map<MessageNumber, PendingReceivedMessage> _pendingReceivedMessages;
int _packetsSinceACK { 0 }; // The number of packets that have been received during the current ACK interval

View file

@ -20,6 +20,7 @@
#include "ControlPacket.h"
#include "Packet.h"
#include "../NLPacket.h"
#include "PacketList.h"
using namespace udt;
@ -104,6 +105,23 @@ qint64 Socket::writePacket(std::unique_ptr<Packet> packet, const HifiSockAddr& s
return writePacket(*packet, sockAddr);
}
qint64 Socket::writePacketList(std::unique_ptr<PacketList> packetList, const HifiSockAddr& sockAddr) {
if (packetList->isReliable()) {
// Reliable and Ordered
// Reliable and Unordered
findOrCreateConnection(sockAddr).sendReliablePacketList(move(packetList));
return 0;
}
// Unerliable and Unordered
qint64 totalBytesSent = 0;
while (!packetList->_packets.empty()) {
totalBytesSent += writePacket(packetList->takeFront<Packet>(), sockAddr);
}
return totalBytesSent;
}
qint64 Socket::writeDatagram(const char* data, qint64 size, const HifiSockAddr& sockAddr) {
return writeDatagram(QByteArray::fromRawData(data, size), sockAddr);
}
@ -126,7 +144,7 @@ qint64 Socket::writeDatagram(const QByteArray& datagram, const HifiSockAddr& soc
Connection& Socket::findOrCreateConnection(const HifiSockAddr& sockAddr) {
auto it = _connectionsHash.find(sockAddr);
if (it == _connectionsHash.end()) {
auto connection = std::unique_ptr<Connection>(new Connection(this, sockAddr, _ccFactory->create()));
it = _connectionsHash.insert(it, std::make_pair(sockAddr, std::move(connection)));
@ -135,6 +153,12 @@ Connection& Socket::findOrCreateConnection(const HifiSockAddr& sockAddr) {
return *it->second;
}
void Socket::messageReceived(std::unique_ptr<PacketList> packetList) {
if (_packetListHandler) {
_packetListHandler(std::move(packetList));
}
}
void Socket::readPendingDatagrams() {
int packetSizeWithHeader = -1;
while ((packetSizeWithHeader = _udpSocket.pendingDatagramSize()) != -1) {
@ -177,16 +201,18 @@ void Socket::readPendingDatagrams() {
// call our verification operator to see if this packet is verified
if (!_packetFilterOperator || _packetFilterOperator(*packet)) {
if (packet->isReliable()) {
// if this was a reliable packet then signal the matching connection with the sequence number
auto& connection = findOrCreateConnection(senderSockAddr);
connection.processReceivedSequenceNumber(packet->getSequenceNumber(),
packet->getDataSize(),
packet->getPayloadSize());
packet->getDataSize(),
packet->getPayloadSize());
}
if (_packetHandler) {
if (packet->isPartOfMessage()) {
auto& connection = findOrCreateConnection(senderSockAddr);
connection.queueReceivedMessagePacket(std::move(packet));
} else if (_packetHandler) {
// call the verified packet callback to let it handle this packet
_packetHandler(std::move(packet));
}

View file

@ -30,12 +30,14 @@ namespace udt {
class BasePacket;
class ControlSender;
class Packet;
class PacketList;
class SequenceNumber;
using PacketFilterOperator = std::function<bool(const Packet&)>;
using BasePacketHandler = std::function<void(std::unique_ptr<BasePacket>)>;
using PacketHandler = std::function<void(std::unique_ptr<Packet>)>;
using PacketListHandler = std::function<void(std::unique_ptr<PacketList>)>;
class Socket : public QObject {
Q_OBJECT
@ -48,6 +50,7 @@ public:
qint64 writeBasePacket(const BasePacket& packet, const HifiSockAddr& sockAddr);
qint64 writePacket(const Packet& packet, const HifiSockAddr& sockAddr);
qint64 writePacket(std::unique_ptr<Packet> packet, const HifiSockAddr& sockAddr);
qint64 writePacketList(std::unique_ptr<PacketList> packetList, const HifiSockAddr& sockAddr);
qint64 writeDatagram(const char* data, qint64 size, const HifiSockAddr& sockAddr);
qint64 writeDatagram(const QByteArray& datagram, const HifiSockAddr& sockAddr);
@ -56,6 +59,7 @@ public:
void setPacketFilterOperator(PacketFilterOperator filterOperator) { _packetFilterOperator = filterOperator; }
void setPacketHandler(PacketHandler handler) { _packetHandler = handler; }
void setPacketListHandler(PacketListHandler handler) { _packetListHandler = handler; }
void addUnfilteredHandler(const HifiSockAddr& senderSockAddr, BasePacketHandler handler)
{ _unfilteredHandlers[senderSockAddr] = handler; }
@ -63,6 +67,8 @@ public:
void setCongestionControlFactory(std::unique_ptr<CongestionControlVirtualFactory> ccFactory);
void connectToSendSignal(const HifiSockAddr& destinationAddr, QObject* receiver, const char* slot);
void messageReceived(std::unique_ptr<PacketList> packetList);
ConnectionStats::Stats sampleStatsForConnection(const HifiSockAddr& destination);
std::vector<HifiSockAddr> getConnectionSockAddrs();
@ -78,6 +84,7 @@ private:
QUdpSocket _udpSocket { this };
PacketFilterOperator _packetFilterOperator;
PacketHandler _packetHandler;
PacketListHandler _packetListHandler;
std::unordered_map<HifiSockAddr, BasePacketHandler> _unfilteredHandlers;
std::unordered_map<HifiSockAddr, SequenceNumber> _unreliableSequenceNumbers;