/* */ #include "DownloadEngine.h" #include "Util.h" #include "LogFactory.h" #include "Time.h" #include #include #include #include #include #include using namespace std; DownloadEngine::DownloadEngine():noWait(false), segmentMan(NULL) { logger = LogFactory::getInstance(); } DownloadEngine::~DownloadEngine() { if(segmentMan != NULL) { delete segmentMan; } } void DownloadEngine::cleanQueue() { for_each(commands.begin(), commands.end(), Deleter()); commands.clear(); } class FindCommand { private: CommandUuid uuid; public: FindCommand(const CommandUuid& uuid):uuid(uuid) {} bool operator()(const Command* command) { if(command->getUuid() == uuid) { return true; } else { return false; } } }; void DownloadEngine::run() { initStatistics(); Time cp; CommandUuids activeCommandUuids; while(!commands.empty()) { if(cp.elapsed(1)) { cp.reset(); int max = commands.size(); for(int i = 0; i < max; i++) { Command* com = commands.front(); commands.pop_front(); if(com->execute()) { delete com; } } } else { for(CommandUuids::iterator itr = activeCommandUuids.begin(); itr != activeCommandUuids.end(); itr++) { Commands::iterator comItr = find_if(commands.begin(), commands.end(), FindCommand(*itr)); assert(comItr != commands.end()); Command* com = *comItr; commands.erase(comItr); if(com->execute()) { delete com; } } } afterEachIteration(); activeCommandUuids.clear(); if(!noWait && !commands.empty()) { waitData(activeCommandUuids); } noWait = false; calculateStatistics(); } onEndOfRun(); } void DownloadEngine::shortSleep() const { struct timeval tv; tv.tv_sec = 0; tv.tv_usec = 1000; fd_set rfds; FD_ZERO(&rfds); select(0, &rfds, NULL, NULL, &tv); } class SetDescriptor { private: fd_set* fds_ptr; int* max_ptr; public: SetDescriptor(int* max_ptr, fd_set* fds_ptr) :fds_ptr(fds_ptr), max_ptr(max_ptr) {} void operator()(const SockCmdMap::value_type& pa) { int fd = pa.first->getSockfd(); FD_SET(fd, fds_ptr); if(*max_ptr < fd) { *max_ptr = fd; } } }; class AccumulateActiveCommandUuid { private: CommandUuids* activeCommandUuids_ptr; fd_set* fds_ptr; public: AccumulateActiveCommandUuid(CommandUuids* activeCommandUuids_ptr, fd_set* fds_ptr) :activeCommandUuids_ptr(activeCommandUuids_ptr), fds_ptr(fds_ptr) {} void operator()(const SockCmdMap::value_type& pa) { if(FD_ISSET(pa.first->getSockfd(), fds_ptr)) { activeCommandUuids_ptr->push_back(pa.second); } } }; void DownloadEngine::waitData(CommandUuids& activeCommandUuids) { fd_set rfds; fd_set wfds; int retval = 0; while(1) { struct timeval tv; memcpy(&rfds, &rfdset, sizeof(fd_set)); memcpy(&wfds, &wfdset, sizeof(fd_set)); tv.tv_sec = 1; tv.tv_usec = 0; retval = select(fdmax+1, &rfds, &wfds, NULL, &tv); if(retval != -1 || errno != EINTR) { break; } } if(retval > 0) { for_each(rsockmap.begin(), rsockmap.end(), AccumulateActiveCommandUuid(&activeCommandUuids, &rfds)); for_each(wsockmap.begin(), wsockmap.end(), AccumulateActiveCommandUuid(&activeCommandUuids, &wfds)); sort(activeCommandUuids.begin(), activeCommandUuids.end()); activeCommandUuids.erase(unique(activeCommandUuids.begin(), activeCommandUuids.end()), activeCommandUuids.end()); } } void DownloadEngine::updateFdSet() { fdmax = 0; FD_ZERO(&rfdset); FD_ZERO(&wfdset); for_each(rsockmap.begin(), rsockmap.end(), SetDescriptor(&fdmax, &rfdset)); for_each(wsockmap.begin(), wsockmap.end(), SetDescriptor(&fdmax, &wfdset)); } bool DownloadEngine::addSocket(SockCmdMap& sockmap, const SocketHandle& socket, const CommandUuid& commandUuid) { SockCmdMap::iterator itr = find_if(sockmap.begin(), sockmap.end(), PairFind(socket, commandUuid)); if(itr == sockmap.end()) { SockCmdMap::value_type vt(socket, commandUuid); sockmap.insert(vt); updateFdSet(); return true; } else { return false; } } bool DownloadEngine::deleteSocket(SockCmdMap& sockmap, const SocketHandle& socket, const CommandUuid& commandUuid) { SockCmdMap::iterator itr = find_if(sockmap.begin(), sockmap.end(), PairFind(socket, commandUuid)); if(itr == sockmap.end()) { return false; } else { sockmap.erase(itr); updateFdSet(); return true; } } bool DownloadEngine::addSocketForReadCheck(const SocketHandle& socket, const CommandUuid& commandUuid) { return addSocket(rsockmap, socket, commandUuid); } bool DownloadEngine::deleteSocketForReadCheck(const SocketHandle& socket, const CommandUuid& commandUuid) { return deleteSocket(rsockmap, socket, commandUuid); } bool DownloadEngine::addSocketForWriteCheck(const SocketHandle& socket, const CommandUuid& commandUuid) { return addSocket(wsockmap, socket, commandUuid); } bool DownloadEngine::deleteSocketForWriteCheck(const SocketHandle& socket, const CommandUuid& commandUuid) { return deleteSocket(wsockmap, socket, commandUuid); }