allow the assignment-client to specify the type of assignment it desires

This commit is contained in:
Stephen Birarda 2013-09-17 12:09:10 -07:00
parent 74a0b8a7b5
commit 2cfa45e4f8
3 changed files with 71 additions and 44 deletions

View file

@ -31,6 +31,7 @@ const char CHILD_TARGET_NAME[] = "assignment-client";
pid_t* childForks = NULL;
sockaddr_in customAssignmentSocket = {};
int numForks = 0;
Assignment::Type overiddenAssignmentType = Assignment::AllTypes;
void childClient() {
// this is one of the child forks or there is a single assignment client, continue assignment-client execution
@ -56,8 +57,8 @@ void childClient() {
sockaddr_in senderSocket = {};
// create a request assignment, accept all assignments, pass the desired pool (if it exists)
Assignment requestAssignment(Assignment::RequestCommand, Assignment::AllTypes);
// create a request assignment, accept assignments defined by the overidden type
Assignment requestAssignment(Assignment::RequestCommand, ::overiddenAssignmentType);
while (true) {
if (usecTimestampNow() - usecTimestamp(&lastRequest) >= ASSIGNMENT_REQUEST_INTERVAL_USECS) {
@ -211,6 +212,15 @@ int main(int argc, const char* argv[]) {
::customAssignmentSocket = socketForHostnameAndHostOrderPort(customAssignmentServerHostname, assignmentServerPort);
}
const char ASSIGNMENT_TYPE_OVVERIDE_OPTION[] = "-t";
const char* assignmentTypeString = getCmdOption(argc, argv, ASSIGNMENT_TYPE_OVVERIDE_OPTION);
if (assignmentTypeString) {
// the user is asking to only be assigned to a particular type of assignment
// so set that as the ::overridenAssignmentType to be used in requests
::overiddenAssignmentType = (Assignment::Type) atoi(assignmentTypeString);
}
const char* NUM_FORKS_PARAMETER = "-n";
const char* numForksString = getCmdOption(argc, argv, NUM_FORKS_PARAMETER);

View file

@ -62,30 +62,39 @@ int main(int argc, const char* argv[]) {
continue;
}
// check if the requestor is on the same network as the destination for the assignment
if (senderSocket.sin_addr.s_addr ==
((sockaddr_in*) (*assignment)->getAttachedPublicSocket())->sin_addr.s_addr) {
// if this is the case we remove the public socket on the assignment by setting it to NULL
// this ensures the local IP and port sent to the requestor is the local address of destination
(*assignment)->setAttachedPublicSocket(NULL);
if (requestAssignment.getType() == Assignment::AllTypes ||
(*assignment)->getType() == requestAssignment.getType()) {
// give this assignment out, either we have a type match or the requestor has said they will
// take all types
// check if the requestor is on the same network as the destination for the assignment
if (senderSocket.sin_addr.s_addr ==
((sockaddr_in*) (*assignment)->getAttachedPublicSocket())->sin_addr.s_addr) {
// if this is the case we remove the public socket on the assignment by setting it to NULL
// this ensures the local IP and port sent to the requestor is the local address of destination
(*assignment)->setAttachedPublicSocket(NULL);
}
int numAssignmentBytes = (*assignment)->packToBuffer(assignmentPacket + numSendHeaderBytes);
// send the assignment
serverSocket.send((sockaddr*) &senderSocket,
assignmentPacket,
numSendHeaderBytes + numAssignmentBytes);
// delete this assignment now that it has been sent out
delete *assignment;
// remove it from the deque and make the iterator the next assignment
assignmentQueue.erase(assignment);
// stop looping - we've handed out an assignment
break;
} else {
// push forward the iterator to check the next assignment
assignment++;
}
int numAssignmentBytes = (*assignment)->packToBuffer(assignmentPacket + numSendHeaderBytes);
// send the assignment
serverSocket.send((sockaddr*) &senderSocket,
assignmentPacket,
numSendHeaderBytes + numAssignmentBytes);
// delete this assignment now that it has been sent out
delete *assignment;
// remove it from the deque and make the iterator the next assignment
assignmentQueue.erase(assignment);
// stop looping - we've handed out an assignment
break;
}
}
} else if (senderData[0] == PACKET_TYPE_CREATE_ASSIGNMENT && packetVersionMatch(senderData)) {

View file

@ -330,31 +330,39 @@ int main(int argc, const char* argv[]) {
std::deque<Assignment*>::iterator assignment = ::assignmentQueue.begin();
while (assignment != ::assignmentQueue.end()) {
// construct the requested assignment from the packet data
Assignment requestAssignment(packetData, receivedBytes);
// give this assignment out, no conditions stop us from giving it to the local assignment client
int numHeaderBytes = populateTypeAndVersion(broadcastPacket, PACKET_TYPE_CREATE_ASSIGNMENT);
int numAssignmentBytes = (*assignment)->packToBuffer(broadcastPacket + numHeaderBytes);
nodeList->getNodeSocket()->send((sockaddr*) &nodePublicAddress,
broadcastPacket,
numHeaderBytes + numAssignmentBytes);
if ((*assignment)->getType() == Assignment::AgentType) {
// if this is a script assignment we need to delete it to avoid a memory leak
// or if there is more than one instance to send out, simpy decrease the number of instances
if ((*assignment)->getNumberOfInstances() > 1) {
(*assignment)->decrementNumberOfInstances();
if (requestAssignment.getType() == Assignment::AllTypes ||
(*assignment)->getType() == requestAssignment.getType()) {
// give this assignment out, either the type matches or the requestor said they will take any
int numHeaderBytes = populateTypeAndVersion(broadcastPacket, PACKET_TYPE_CREATE_ASSIGNMENT);
int numAssignmentBytes = (*assignment)->packToBuffer(broadcastPacket + numHeaderBytes);
nodeList->getNodeSocket()->send((sockaddr*) &nodePublicAddress,
broadcastPacket,
numHeaderBytes + numAssignmentBytes);
if ((*assignment)->getType() == Assignment::AgentType) {
// if this is a script assignment we need to delete it to avoid a memory leak
// or if there is more than one instance to send out, simpy decrease the number of instances
if ((*assignment)->getNumberOfInstances() > 1) {
(*assignment)->decrementNumberOfInstances();
} else {
::assignmentQueue.erase(assignment);
delete *assignment;
}
} else {
// remove the assignment from the queue
::assignmentQueue.erase(assignment);
delete *assignment;
}
// stop looping, we've handed out an assignment
break;
} else {
// remove the assignment from the queue
::assignmentQueue.erase(assignment);
// push forward the iterator to check the next assignment
assignment++;
}
// stop looping, we've handed out an assignment
break;
}
::assignmentQueueMutex.unlock();