/* */
#include "DefaultPeerStorage.h"

#include <algorithm>

#include "LogFactory.h"
#include "Logger.h"
#include "message.h"
#include "Peer.h"
#include "BtRuntime.h"
#include "BtSeederStateChoke.h"
#include "BtLeecherStateChoke.h"
#include "PieceStorage.h"
#include "wallclock.h"
#include "a2functional.h"
#include "fmt.h"

namespace aria2 {

namespace {

// Capacity of the peer list when no BtRuntime is set, or when the
// runtime reports getMaxPeers() == 0.
const int MAX_PEER_LIST_SIZE = 1024;

} // namespace

// Starts with zeroed "removed peer" session counters and allocates the
// two choke strategy objects, which this object owns.
DefaultPeerStorage::DefaultPeerStorage()
  : removedPeerSessionDownloadLength_(0),
    removedPeerSessionUploadLength_(0),
    seederStateChoke_(new BtSeederStateChoke()),
    leecherStateChoke_(new BtLeecherStateChoke()),
    lastTransferStatMapUpdated_(0)
{}

// Releases the choke strategy objects allocated in the constructor.
DefaultPeerStorage::~DefaultPeerStorage()
{
  delete seederStateChoke_;
  delete leecherStateChoke_;
}

namespace {

// Predicate: true for a peer that compares equal to the reference
// peer, or that has the same IP address and port.
class FindIdenticalPeer {
private:
  SharedHandle<Peer> peer_;
public:
  FindIdenticalPeer(const SharedHandle<Peer>& peer):peer_(peer) {}

  bool operator()(const SharedHandle<Peer>& peer) const
  {
    return (*peer_ == *peer) ||
      ((peer_->getIPAddress() == peer->getIPAddress()) &&
       (peer_->getPort() == peer->getPort()));
  }
};

} // namespace

// Returns true if peers_ already holds a peer identical to `peer`
// (see FindIdenticalPeer for the meaning of "identical").
bool DefaultPeerStorage::isPeerAlreadyAdded(const SharedHandle<Peer>& peer)
{
  return std::find_if(peers_.begin(), peers_.end(),
                      FindIdenticalPeer(peer)) != peers_.end();
}

namespace {

// Computes the peer list capacity: MAX_PEER_LIST_SIZE when btRuntime
// is null or its getMaxPeers() is 0, otherwise getMaxPeers() plus a
// quarter of it (maxPeers >> 2).
size_t calculateMaxPeerListSize(const SharedHandle<BtRuntime>& btRuntime)
{
  if(!btRuntime) {
    return MAX_PEER_LIST_SIZE;
  }
  return btRuntime->getMaxPeers() == 0 ?
    MAX_PEER_LIST_SIZE :
    btRuntime->getMaxPeers()+(btRuntime->getMaxPeers() >> 2);
}

} // namespace

// Adds `peer` to the front of the peer list.
// Returns false (and logs) when an identical peer is already stored;
// returns true otherwise. When the list is at or over capacity, unused
// peers are evicted first to make room for the new entry.
bool DefaultPeerStorage::addPeer(const SharedHandle<Peer>& peer)
{
  if(isPeerAlreadyAdded(peer)) {
    A2_LOG_DEBUG(fmt("Adding %s:%u is rejected because it has been already"
                     " added.",
                     peer->getIPAddress().c_str(), peer->getPort()));
    return false;
  }
  size_t maxPeerListSize = calculateMaxPeerListSize(btRuntime_);
  if(peers_.size() >= maxPeerListSize) {
    // +1 so that there is a free slot for the peer being added.
    deleteUnusedPeer(peers_.size()-maxPeerListSize+1);
  }
  peers_.push_front(peer);
  return true;
}

// Adds every peer in `peers` through the single-peer overload and logs
// each one that was actually accepted.
void DefaultPeerStorage::addPeer(const std::vector<SharedHandle<Peer> >& peers)
{
  for(std::vector<SharedHandle<Peer> >::const_iterator itr = peers.begin(),
        eoi = peers.end(); itr != eoi; ++itr) {
    const SharedHandle<Peer>& peer = *itr;
    if(addPeer(peer)) {
      A2_LOG_DEBUG(fmt(MSG_ADDING_PEER,
                       peer->getIPAddress().c_str(), peer->getPort()));
    }
  }
}

// Records `peer` at the front of the dropped-peer list; the list is
// capped at 50 entries by discarding from the back.
void DefaultPeerStorage::addDroppedPeer(const SharedHandle<Peer>& peer)
{
  droppedPeers_.push_front(peer);
  if(droppedPeers_.size() > 50) {
    droppedPeers_.pop_back();
  }
}

// Returns the stored peer list.
const std::deque<SharedHandle<Peer> >& DefaultPeerStorage::getPeers()
{
  return peers_;
}

// Returns the list of dropped peers recorded by addDroppedPeer().
const std::deque<SharedHandle<Peer> >& DefaultPeerStorage::getDroppedPeers()
{
  return droppedPeers_;
}

namespace {

// Predicate: true for a peer that is both unused() and isGood().
class FindFinePeer {
public:
  bool operator()(const SharedHandle<Peer>& peer) const
  {
    return peer->unused() && peer->isGood();
  }
};

} // namespace

// Returns the first stored peer that is unused and good, or a null
// handle when there is none.
SharedHandle<Peer> DefaultPeerStorage::getUnusedPeer()
{
  std::deque<SharedHandle<Peer> >::const_iterator itr =
    std::find_if(peers_.begin(), peers_.end(), FindFinePeer());
  if(itr == peers_.end()) {
    return SharedHandle<Peer>();
  } else {
    return *itr;
  }
}

namespace {

// Predicate: true for a peer whose IP address and port both match.
class FindPeer {
private:
  std::string ipaddr;
  uint16_t port;
public:
  FindPeer(const std::string& ipaddr, uint16_t port):
    ipaddr(ipaddr), port(port) {}

  bool operator()(const SharedHandle<Peer>& peer) const
  {
    return ipaddr == peer->getIPAddress() && port == peer->getPort();
  }
};

} // namespace

// Returns the first stored peer with the given address and port, or a
// null handle when no such peer is stored.
SharedHandle<Peer> DefaultPeerStorage::getPeer(const std::string& ipaddr,
                                               uint16_t port) const
{
  std::deque<SharedHandle<Peer> >::const_iterator itr =
    std::find_if(peers_.begin(), peers_.end(), FindPeer(ipaddr, port));
  if(itr == peers_.end()) {
    return SharedHandle<Peer>();
  } else {
    return *itr;
  }
}

// Returns the number of stored peers.
size_t DefaultPeerStorage::countPeer() const
{
  return peers_.size();
}

// Returns true when getUnusedPeer() yields a non-null handle.
bool DefaultPeerStorage::isPeerAvailable()
{
  return getUnusedPeer();
}

namespace {

// Functor that appends every peer reporting isActive() to the vector
// it was constructed with.
class CollectActivePeer {
private:
  std::vector<SharedHandle<Peer> >& activePeers_;
public:
  CollectActivePeer(std::vector<SharedHandle<Peer> >& activePeers):
    activePeers_(activePeers) {}

  void operator()(const SharedHandle<Peer>& peer)
  {
    if(peer->isActive()) {
      activePeers_.push_back(peer);
    }
  }
};

} // namespace

// Appends all active peers to `activePeers` (existing content of the
// vector is kept).
void DefaultPeerStorage::getActivePeers
(std::vector<SharedHandle<Peer> >& activePeers)
{
  std::for_each(peers_.begin(), peers_.end(),
                CollectActivePeer(activePeers));
}

namespace {

// Builds a TransferStat snapshot (speeds and session byte counts)
// from a single peer.
TransferStat calculateStatFor(const SharedHandle<Peer>& peer)
{
  TransferStat s;
  s.downloadSpeed = peer->calculateDownloadSpeed();
  s.uploadSpeed = peer->calculateUploadSpeed();
  s.sessionDownloadLength = peer->getSessionDownloadLength();
  s.sessionUploadLength = peer->getSessionUploadLength();
  return s;
}

} // namespace

// Returns the aggregate transfer statistics of this storage.
// The per-peer map and the cached aggregate are rebuilt from the
// active peers when at least 250ms have passed since the last rebuild;
// otherwise the cached aggregate is reused. The session byte counts of
// peers that were already returned are added on top in both cases.
TransferStat DefaultPeerStorage::calculateStat()
{
  TransferStat stat;
  if(lastTransferStatMapUpdated_.differenceInMillis(global::wallclock) >= 250) {
    A2_LOG_DEBUG("Updating TransferStat of PeerStorage");
    lastTransferStatMapUpdated_ = global::wallclock;
    peerTransferStatMap_.clear();
    std::vector<SharedHandle<Peer> > activePeers;
    getActivePeers(activePeers);
    for(std::vector<SharedHandle<Peer> >::const_iterator i =
          activePeers.begin(), eoi = activePeers.end(); i != eoi; ++i) {
      // Take one snapshot per peer and use it for both the map entry
      // and the aggregate, so that the two always agree.
      TransferStat s = calculateStatFor(*i);
      peerTransferStatMap_[(*i)->getID()] = s;
      stat += s;
    }
    cachedTransferStat_ = stat;
  } else {
    stat = cachedTransferStat_;
  }
  stat.sessionDownloadLength += removedPeerSessionDownloadLength_;
  stat.sessionUploadLength += removedPeerSessionUploadLength_;
  if(!btRuntime_) {
    // No runtime has been set (addPeer() tolerates this as well), so
    // there is no startup upload length to add.
    stat.setAllTimeUploadLength(stat.getSessionUploadLength());
  } else {
    stat.setAllTimeUploadLength(btRuntime_->getUploadLengthAtStartup()+
                                stat.getSessionUploadLength());
  }
  return stat;
}

// Refreshes the cached statistics of a single peer: its old entry is
// subtracted from the cached aggregate and a fresh snapshot is added.
// Does nothing when the peer has no entry in the per-peer map.
void DefaultPeerStorage::updateTransferStatFor(const SharedHandle<Peer>& peer)
{
  A2_LOG_DEBUG(fmt("Updating TransferStat for peer %s",
                   peer->getID().c_str()));
  std::map<std::string, TransferStat>::iterator itr =
    peerTransferStatMap_.find(peer->getID());
  if(itr == peerTransferStatMap_.end()) {
    return;
  }
  cachedTransferStat_ -= (*itr).second;
  TransferStat s = calculateStatFor(peer);
  cachedTransferStat_ += s;
  (*itr).second = s;
}

// Returns the cached statistics of `peer`, or a default-constructed
// TransferStat when the peer has no entry in the per-peer map.
TransferStat DefaultPeerStorage::getTransferStatFor
(const SharedHandle<Peer>& peer)
{
  std::map<std::string, TransferStat>::const_iterator itr =
    peerTransferStatMap_.find(peer->getID());
  if(itr == peerTransferStatMap_.end()) {
    return TransferStat();
  } else {
    return (*itr).second;
  }
}

// Removes up to `delSize` unused peers, scanning from the back of the
// list towards the front. The remaining peers keep their relative
// order. Fewer peers are removed if not enough unused ones exist.
void DefaultPeerStorage::deleteUnusedPeer(size_t delSize)
{
  std::deque<SharedHandle<Peer> > temp;
  for(std::deque<SharedHandle<Peer> >::const_reverse_iterator itr =
        peers_.rbegin(), eoi = peers_.rend(); itr != eoi; ++itr) {
    const SharedHandle<Peer>& p = *itr;
    if(p->unused() && delSize > 0) {
      onErasingPeer(p);
      --delSize;
    } else {
      // Iterating in reverse, so push_front restores original order.
      temp.push_front(p);
    }
  }
  peers_.swap(temp);
}

// Called for each peer that is dropped from the peer list.
// Currently does nothing.
void DefaultPeerStorage::onErasingPeer(const SharedHandle<Peer>& peer) {}

// Bookkeeping for a peer that is being returned. Only active peers are
// processed: their session byte counts are folded into the "removed
// peer" totals and their snapshot is subtracted from the cached
// aggregate.
void DefaultPeerStorage::onReturningPeer(const SharedHandle<Peer>& peer)
{
  if(peer->isActive()) {
    TransferStat removedStat(calculateStatFor(peer));
    removedPeerSessionDownloadLength_ +=
      removedStat.getSessionDownloadLength();
    removedPeerSessionUploadLength_ += removedStat.getSessionUploadLength();
    cachedTransferStat_ -= removedStat;

    if(peer->isDisconnectedGracefully() && !peer->isIncomingPeer()) {
      peer->startBadCondition();
      addDroppedPeer(peer);
    }
    // Execute choking algorithm if unchoked and interested peer is
    // disconnected.
    if(!peer->amChoking() && peer->peerInterested()) {
      executeChoke();
    }
  }
}

// Removes `peer` from the peer list and runs the returning/erasing
// hooks for it. Logs and does nothing else when the peer is not
// stored. The peer is erased before the hooks run.
void DefaultPeerStorage::returnPeer(const SharedHandle<Peer>& peer)
{
  std::deque<SharedHandle<Peer> >::iterator itr =
    std::find_if(peers_.begin(), peers_.end(), derefEqual(peer));
  if(itr == peers_.end()) {
    A2_LOG_DEBUG(fmt("Cannot find peer %s:%u in PeerStorage.",
                     peer->getIPAddress().c_str(), peer->getPort()));
  } else {
    peers_.erase(itr);

    onReturningPeer(peer);
    onErasingPeer(peer);
  }
}

// Returns true when at least CHOKE_ROUND_INTERVAL has elapsed since
// the last round of the choke strategy that applies to the current
// download state (seeder strategy once the download is finished,
// leecher strategy otherwise).
// NOTE(review): the interval unit is presumably seconds -- confirm
// against the timer's difference() implementation.
bool DefaultPeerStorage::chokeRoundIntervalElapsed()
{
  const time_t CHOKE_ROUND_INTERVAL = 10;
  if(pieceStorage_->downloadFinished()) {
    return seederStateChoke_->getLastRound().
      difference(global::wallclock) >= CHOKE_ROUND_INTERVAL;
  } else {
    return leecherStateChoke_->getLastRound().
      difference(global::wallclock) >= CHOKE_ROUND_INTERVAL;
  }
}

// Runs the choke strategy matching the current download state over
// all active peers.
void DefaultPeerStorage::executeChoke()
{
  std::vector<SharedHandle<Peer> > activePeers;
  getActivePeers(activePeers);
  if(pieceStorage_->downloadFinished()) {
    seederStateChoke_->executeChoke(activePeers);
  } else {
    leecherStateChoke_->executeChoke(activePeers);
  }
}

// Sets the piece storage consulted for the download-finished state.
void DefaultPeerStorage::setPieceStorage(const SharedHandle<PieceStorage>& ps)
{
  pieceStorage_ = ps;
}

// Sets the runtime consulted for the peer limit and the startup
// upload length.
void DefaultPeerStorage::setBtRuntime(const SharedHandle<BtRuntime>& btRuntime)
{
  btRuntime_ = btRuntime;
}

} // namespace aria2