Add GID => RequestGroup index for faster access to RequestGroup

pull/25/merge
Tatsuhiro Tsujikawa 2012-07-31 23:55:51 +09:00
parent 329a17b3f9
commit 2795176d79
5 changed files with 168 additions and 102 deletions

View File

@ -124,6 +124,7 @@ a2_gid_t RequestGroup::gidCounter_ = 0;
RequestGroup::RequestGroup(const SharedHandle<Option>& option)
: gid_(newGID()),
state_(STATE_WAITING),
option_(option),
numConcurrentCommand_(option->getAsInt(PREF_SPLIT)),
numStreamConnection_(0),

View File

@ -82,11 +82,19 @@ public:
SHUTDOWN_SIGNAL,
USER_REQUEST
};
enum State {
// Waiting in the reserved queue
STATE_WAITING,
// Download has begun
STATE_ACTIVE
};
private:
static a2_gid_t gidCounter_;
a2_gid_t gid_;
int state_;
SharedHandle<Option> option_;
int numConcurrentCommand_;
@ -550,6 +558,16 @@ public:
return metadataInfo_;
}
int getState() const
{
return state_;
}
void setState(int state)
{
state_ = state;
}
static void resetGIDCounter() { gidCounter_ = 0; }
static a2_gid_t newGID();

View File

@ -101,7 +101,9 @@ RequestGroupMan::RequestGroupMan
removedErrorResult_(0),
removedLastErrorResult_(error_code::FINISHED),
maxDownloadResult_(option->getAsInt(PREF_MAX_DOWNLOAD_RESULT))
{}
{
addRequestGroupIndex(requestGroups);
}
RequestGroupMan::~RequestGroupMan() {}
@ -123,6 +125,7 @@ void RequestGroupMan::addReservedGroup
(const std::vector<SharedHandle<RequestGroup> >& groups)
{
requestQueueCheck();
addRequestGroupIndex(groups);
reservedGroups_.insert(reservedGroups_.end(), groups.begin(), groups.end());
}
@ -130,6 +133,7 @@ void RequestGroupMan::addReservedGroup
(const SharedHandle<RequestGroup>& group)
{
requestQueueCheck();
addRequestGroupIndex(group);
reservedGroups_.push_back(group);
}
@ -137,6 +141,7 @@ void RequestGroupMan::insertReservedGroup
(size_t pos, const std::vector<SharedHandle<RequestGroup> >& groups)
{
requestQueueCheck();
addRequestGroupIndex(groups);
reservedGroups_.insert
(reservedGroups_.begin()+std::min(reservedGroups_.size(), pos),
groups.begin(), groups.end());
@ -146,10 +151,37 @@ void RequestGroupMan::insertReservedGroup
(size_t pos, const SharedHandle<RequestGroup>& group)
{
requestQueueCheck();
addRequestGroupIndex(group);
reservedGroups_.insert
(reservedGroups_.begin()+std::min(reservedGroups_.size(), pos), group);
}
void RequestGroupMan::addRequestGroupIndex
(const SharedHandle<RequestGroup>& group)
{
assert(groupIndex_.count(group->getGID()) == 0);
groupIndex_[group->getGID()] = group;
}
void RequestGroupMan::addRequestGroupIndex
(const std::vector<SharedHandle<RequestGroup> >& groups)
{
for(std::vector<SharedHandle<RequestGroup> >::const_iterator i =
groups.begin(); i != groups.end(); ++i) {
addRequestGroupIndex(*i);
}
}
namespace {
void removeRequestGroupIndex
(std::map<a2_gid_t, SharedHandle<RequestGroup> >& groupIndex,
const SharedHandle<RequestGroup>& group)
{
assert(group->getGID() == 1);
groupIndex.erase(group->getGID());
}
} // namespace
size_t RequestGroupMan::countRequestGroup() const
{
return requestGroups_.size();
@ -180,24 +212,41 @@ Iterator findByGID(Iterator first, Iterator last, a2_gid_t gid)
SharedHandle<RequestGroup>
RequestGroupMan::findRequestGroup(a2_gid_t gid) const
{
std::deque<SharedHandle<RequestGroup> >::const_iterator i =
findByGID(requestGroups_.begin(), requestGroups_.end(), gid);
if(i == requestGroups_.end()) {
return SharedHandle<RequestGroup>();
SharedHandle<RequestGroup> res = findGroup(gid);
if(res) {
if(res->getState() == RequestGroup::STATE_ACTIVE) {
return res;
} else {
return SharedHandle<RequestGroup>();
}
} else {
return *i;
return res;
}
}
SharedHandle<RequestGroup>
RequestGroupMan::findReservedGroup(a2_gid_t gid) const
{
std::deque<SharedHandle<RequestGroup> >::const_iterator i =
findByGID(reservedGroups_.begin(), reservedGroups_.end(), gid);
if(i == reservedGroups_.end()) {
return SharedHandle<RequestGroup>();
SharedHandle<RequestGroup> res = findGroup(gid);
if(res) {
if(res->getState() == RequestGroup::STATE_WAITING) {
return res;
} else {
return SharedHandle<RequestGroup>();
}
} else {
return *i;
return res;
}
}
SharedHandle<RequestGroup> RequestGroupMan::findGroup(a2_gid_t gid) const
{
std::map<a2_gid_t, SharedHandle<RequestGroup> >::const_iterator i =
groupIndex_.find(gid);
if(i != groupIndex_.end()) {
return (*i).second;
} else {
return SharedHandle<RequestGroup>();
}
}
@ -250,6 +299,7 @@ bool RequestGroupMan::removeReservedGroup(a2_gid_t gid)
if(i == reservedGroups_.end()) {
return false;
} else {
removeRequestGroupIndex(groupIndex_, *i);
reservedGroups_.erase(i);
return true;
}
@ -300,7 +350,7 @@ private:
DownloadEngine* e_;
std::deque<SharedHandle<DownloadResult> >& downloadResults_;
std::deque<SharedHandle<RequestGroup> >& reservedGroups_;
Logger* logger_;
std::map<a2_gid_t, SharedHandle<RequestGroup> >& groupIndex_;
void saveSignature(const SharedHandle<RequestGroup>& group)
{
@ -321,10 +371,12 @@ public:
ProcessStoppedRequestGroup
(DownloadEngine* e,
std::deque<SharedHandle<DownloadResult> >& downloadResults,
std::deque<SharedHandle<RequestGroup> >& reservedGroups)
std::deque<SharedHandle<RequestGroup> >& reservedGroups,
std::map<a2_gid_t, SharedHandle<RequestGroup> >& groupIndex)
: e_(e),
downloadResults_(downloadResults),
reservedGroups_(reservedGroups)
reservedGroups_(reservedGroups),
groupIndex_(groupIndex)
{}
void operator()(const SharedHandle<RequestGroup>& group)
@ -396,6 +448,7 @@ public:
A2_LOG_ERROR_EX(EX_EXCEPTION_CAUGHT, ex);
}
if(group->isPauseRequested()) {
group->setState(RequestGroup::STATE_WAITING);
reservedGroups_.push_front(group);
group->releaseRuntimeResource(e_);
group->setForceHaltRequested(false);
@ -409,6 +462,7 @@ public:
e_->getRequestGroupMan()->addDownloadResult(dr);
executeStopHook(group, e_->getOption(), dr->result);
group->releaseRuntimeResource(e_);
removeRequestGroupIndex(groupIndex_, group);
}
}
}
@ -482,7 +536,7 @@ void RequestGroupMan::removeStoppedGroup(DownloadEngine* e)
std::for_each(requestGroups_.begin(), requestGroups_.end(),
ProcessStoppedRequestGroup
(e, downloadResults_, reservedGroups_));
(e, downloadResults_, reservedGroups_, groupIndex_));
std::deque<SharedHandle<RequestGroup> >::iterator i =
std::remove_if(requestGroups_.begin(),
requestGroups_.end(),
@ -540,6 +594,7 @@ void RequestGroupMan::fillRequestGroupFromReserver(DownloadEngine* e)
bool ok = createRequestGroupFromUriListParser(groups, option_,
uriListParser_.get());
if(ok) {
addRequestGroupIndex(groups);
reservedGroups_.insert(reservedGroups_.end(), groups.begin(),
groups.end());
} else {
@ -567,6 +622,7 @@ void RequestGroupMan::fillRequestGroupFromReserver(DownloadEngine* e)
if(commands.empty()) {
requestQueueCheck();
}
groupToAdd->setState(RequestGroup::STATE_ACTIVE);
requestGroups_.push_back(groupToAdd);
++count;
e->addCommand(commands);
@ -580,6 +636,7 @@ void RequestGroupMan::fillRequestGroupFromReserver(DownloadEngine* e)
groupToAdd->setLastErrorCode(ex.getErrorCode());
// We add groupToAdd to e in order to it is processed in
// removeStoppedGroup().
groupToAdd->setState(RequestGroup::STATE_ACTIVE);
requestGroups_.push_back(groupToAdd);
requestQueueCheck();
}

View File

@ -40,6 +40,7 @@
#include <string>
#include <deque>
#include <vector>
#include <map>
#include "SharedHandle.h"
#include "DownloadResult.h"
@ -61,6 +62,8 @@ class RequestGroupMan {
private:
std::deque<SharedHandle<RequestGroup> > requestGroups_;
std::deque<SharedHandle<RequestGroup> > reservedGroups_;
// GID => RequestGroup index for faster retrieval.
std::map<a2_gid_t, SharedHandle<RequestGroup> > groupIndex_;
std::deque<SharedHandle<DownloadResult> > downloadResults_;
int maxSimultaneousDownloads_;
@ -100,6 +103,10 @@ private:
void configureRequestGroup
(const SharedHandle<RequestGroup>& requestGroup) const;
void addRequestGroupIndex(const SharedHandle<RequestGroup>& group);
void addRequestGroupIndex
(const std::vector<SharedHandle<RequestGroup> >& groups);
public:
RequestGroupMan(const std::vector<SharedHandle<RequestGroup> >& requestGroups,
int maxSimultaneousDownloads,
@ -121,6 +128,9 @@ public:
void fillRequestGroupFromReserver(DownloadEngine* e);
// Note that this method does not call addRequestGroupIndex(). This
// method should be considered as private, but exposed for unit
// testing purpose.
void addRequestGroup(const SharedHandle<RequestGroup>& group);
void addReservedGroup(const std::vector<SharedHandle<RequestGroup> >& groups);
@ -141,6 +151,8 @@ public:
return requestGroups_;
}
// Note: Use only for unit testing. Use findGroup() and test
// RequestGroup::getState() instead.
SharedHandle<RequestGroup> findRequestGroup(a2_gid_t gid) const;
const std::deque<SharedHandle<RequestGroup> >& getReservedGroups() const
@ -148,8 +160,14 @@ public:
return reservedGroups_;
}
// Note: Use only for unit testing. Use findGroup() and test
// RequestGroup::getState() instead.
SharedHandle<RequestGroup> findReservedGroup(a2_gid_t gid) const;
// Returns RequestGroup object whose gid is gid. This method returns
// RequestGroup either in requestGroups_ or reservedGroups_.
SharedHandle<RequestGroup> findGroup(a2_gid_t gid) const;
enum HOW {
POS_SET,
POS_CUR,

View File

@ -167,18 +167,6 @@ addRequestGroup(const SharedHandle<RequestGroup>& group,
}
} // namespace
namespace {
SharedHandle<RequestGroup>
findRequestGroup(const SharedHandle<RequestGroupMan>& rgman, a2_gid_t gid)
{
SharedHandle<RequestGroup> group = rgman->findRequestGroup(gid);
if(!group) {
group = rgman->findReservedGroup(gid);
}
return group;
}
} // namespace
namespace {
bool checkPosParam(const Integer* posParam)
{
@ -376,25 +364,25 @@ SharedHandle<ValueBase> removeDownload
const String* gidParam = checkRequiredParam<String>(req, 0);
a2_gid_t gid = str2Gid(gidParam);
SharedHandle<RequestGroup> group =
e->getRequestGroupMan()->findRequestGroup(gid);
if(!group) {
group = e->getRequestGroupMan()->findReservedGroup(gid);
if(!group) {
throw DL_ABORT_EX(fmt("Active Download not found for GID#%" PRId64 "", gid));
}
if(group->isDependencyResolved()) {
e->getRequestGroupMan()->removeReservedGroup(gid);
SharedHandle<RequestGroup> group = e->getRequestGroupMan()->findGroup(gid);
if(group) {
if(group->getState() == RequestGroup::STATE_ACTIVE) {
if(forceRemove) {
group->setForceHaltRequested(true, RequestGroup::USER_REQUEST);
} else {
group->setHaltRequested(true, RequestGroup::USER_REQUEST);
}
e->setRefreshInterval(0);
} else {
throw DL_ABORT_EX(fmt("GID#%" PRId64 " cannot be removed now", gid));
if(group->isDependencyResolved()) {
e->getRequestGroupMan()->removeReservedGroup(gid);
} else {
throw DL_ABORT_EX(fmt("GID#%" PRId64 " cannot be removed now", gid));
}
}
} else {
if(forceRemove) {
group->setForceHaltRequested(true, RequestGroup::USER_REQUEST);
} else {
group->setHaltRequested(true, RequestGroup::USER_REQUEST);
}
e->setRefreshInterval(0);
throw DL_ABORT_EX(fmt("Active Download not found for GID#%" PRId64,
gid));
}
return createGIDResponse(gid);
}
@ -445,19 +433,15 @@ SharedHandle<ValueBase> pauseDownload
const String* gidParam = checkRequiredParam<String>(req, 0);
a2_gid_t gid = str2Gid(gidParam);
bool reserved = false;
SharedHandle<RequestGroup> group =
e->getRequestGroupMan()->findRequestGroup(gid);
if(!group) {
reserved = true;
group = e->getRequestGroupMan()->findReservedGroup(gid);
}
if(group && pauseRequestGroup(group, reserved, forcePause)) {
e->setRefreshInterval(0);
return createGIDResponse(gid);
} else {
throw DL_ABORT_EX(fmt("GID#%" PRId64 " cannot be paused now", gid));
SharedHandle<RequestGroup> group = e->getRequestGroupMan()->findGroup(gid);
if(group) {
bool reserved = group->getState() == RequestGroup::STATE_WAITING;
if(pauseRequestGroup(group, reserved, forcePause)) {
e->setRefreshInterval(0);
return createGIDResponse(gid);
}
}
throw DL_ABORT_EX(fmt("GID#%" PRId64 " cannot be paused now", gid));
}
} // namespace
@ -517,9 +501,10 @@ SharedHandle<ValueBase> UnpauseRpcMethod::process
const String* gidParam = checkRequiredParam<String>(req, 0);
a2_gid_t gid = str2Gid(gidParam);
SharedHandle<RequestGroup> group =
e->getRequestGroupMan()->findReservedGroup(gid);
if(!group || !group->isPauseRequested()) {
SharedHandle<RequestGroup> group = e->getRequestGroupMan()->findGroup(gid);
if(!group ||
group->getState() != RequestGroup::STATE_WAITING ||
!group->isPauseRequested()) {
throw DL_ABORT_EX(fmt("GID#%" PRId64 " cannot be unpaused now", gid));
} else {
group->setPauseRequested(false);
@ -917,13 +902,13 @@ SharedHandle<ValueBase> GetFilesRpcMethod::process
a2_gid_t gid = str2Gid(gidParam);
SharedHandle<List> files = List::g();
SharedHandle<RequestGroup> group =
findRequestGroup(e->getRequestGroupMan(), gid);
SharedHandle<RequestGroup> group = e->getRequestGroupMan()->findGroup(gid);
if(!group) {
SharedHandle<DownloadResult> dr =
e->getRequestGroupMan()->findDownloadResult(gid);
if(!dr) {
throw DL_ABORT_EX(fmt("No file data is available for GID#%" PRId64 "", gid));
throw DL_ABORT_EX(fmt("No file data is available for GID#%" PRId64 "",
gid));
} else {
createFileEntry(files, dr->fileEntries.begin(), dr->fileEntries.end(),
dr->totalLength, dr->pieceLength, dr->bitfield);
@ -947,10 +932,9 @@ SharedHandle<ValueBase> GetUrisRpcMethod::process
const String* gidParam = checkRequiredParam<String>(req, 0);
a2_gid_t gid = str2Gid(gidParam);
SharedHandle<RequestGroup> group =
findRequestGroup(e->getRequestGroupMan(), gid);
SharedHandle<RequestGroup> group = e->getRequestGroupMan()->findGroup(gid);
if(!group) {
throw DL_ABORT_EX(fmt("No URI data is available for GID#%" PRId64 "", gid));
throw DL_ABORT_EX(fmt("No URI data is available for GID#%" PRId64, gid));
}
SharedHandle<List> uriList = List::g();
// TODO Current implementation just returns first FileEntry's URIs.
@ -967,10 +951,9 @@ SharedHandle<ValueBase> GetPeersRpcMethod::process
const String* gidParam = checkRequiredParam<String>(req, 0);
a2_gid_t gid = str2Gid(gidParam);
SharedHandle<RequestGroup> group =
findRequestGroup(e->getRequestGroupMan(), gid);
SharedHandle<RequestGroup> group = e->getRequestGroupMan()->findGroup(gid);
if(!group) {
throw DL_ABORT_EX(fmt("No peer data is available for GID#%" PRId64 "", gid));
throw DL_ABORT_EX(fmt("No peer data is available for GID#%" PRId64, gid));
}
SharedHandle<List> peers = List::g();
const SharedHandle<BtObject>& btObject =
@ -993,32 +976,27 @@ SharedHandle<ValueBase> TellStatusRpcMethod::process
std::vector<std::string> keys;
toStringList(std::back_inserter(keys), keysParam);
SharedHandle<RequestGroup> group =
e->getRequestGroupMan()->findRequestGroup(gid);
SharedHandle<RequestGroup> group = e->getRequestGroupMan()->findGroup(gid);
SharedHandle<Dict> entryDict = Dict::g();
if(!group) {
group = e->getRequestGroupMan()->findReservedGroup(gid);
if(!group) {
SharedHandle<DownloadResult> ds =
e->getRequestGroupMan()->findDownloadResult(gid);
if(!ds) {
throw DL_ABORT_EX(fmt("No such download for GID#%" PRId64 "", gid));
}
gatherStoppedDownload(entryDict, ds, keys);
} else {
if(requested_key(keys, KEY_STATUS)) {
SharedHandle<DownloadResult> ds =
e->getRequestGroupMan()->findDownloadResult(gid);
if(!ds) {
throw DL_ABORT_EX(fmt("No such download for GID#%" PRId64 "", gid));
}
gatherStoppedDownload(entryDict, ds, keys);
} else {
if(requested_key(keys, KEY_STATUS)) {
if(group->getState() == RequestGroup::STATE_ACTIVE) {
entryDict->put(KEY_STATUS, VLB_ACTIVE);
} else {
if(group->isPauseRequested()) {
entryDict->put(KEY_STATUS, VLB_PAUSED);
} else {
entryDict->put(KEY_STATUS, VLB_WAITING);
}
}
gatherProgress(entryDict, group, e, keys);
}
} else {
if(requested_key(keys, KEY_STATUS)) {
entryDict->put(KEY_STATUS, VLB_ACTIVE);
}
gatherProgress(entryDict, group, e, keys);
}
@ -1188,20 +1166,17 @@ SharedHandle<ValueBase> ChangeOptionRpcMethod::process
const Dict* optsParam = checkRequiredParam<Dict>(req, 1);
a2_gid_t gid = str2Gid(gidParam);
SharedHandle<RequestGroup> group =
e->getRequestGroupMan()->findRequestGroup(gid);
SharedHandle<RequestGroup> group = e->getRequestGroupMan()->findGroup(gid);
Option option;
if(group) {
gatherChangeableOption(&option, optsParam);
if(group->getState() == RequestGroup::STATE_ACTIVE) {
gatherChangeableOption(&option, optsParam);
} else {
gatherChangeableOptionForReserved(&option, optsParam);
}
changeOption(group, option, e);
} else {
group = e->getRequestGroupMan()->findReservedGroup(gid);
if(group) {
gatherChangeableOptionForReserved(&option, optsParam);
changeOption(group, option, e);
} else {
throw DL_ABORT_EX(fmt("Cannot change option for GID#%" PRId64 "", gid));
}
throw DL_ABORT_EX(fmt("Cannot change option for GID#%" PRId64, gid));
}
return VLB_OK;
}
@ -1286,8 +1261,7 @@ SharedHandle<ValueBase> GetOptionRpcMethod::process
const String* gidParam = checkRequiredParam<String>(req, 0);
a2_gid_t gid = str2Gid(gidParam);
SharedHandle<RequestGroup> group =
findRequestGroup(e->getRequestGroupMan(), gid);
SharedHandle<RequestGroup> group = e->getRequestGroupMan()->findGroup(gid);
if(!group) {
throw DL_ABORT_EX(fmt("Cannot get option for GID#%" PRId64 "", gid));
}
@ -1354,10 +1328,9 @@ SharedHandle<ValueBase> GetServersRpcMethod::process
const String* gidParam = checkRequiredParam<String>(req, 0);
a2_gid_t gid = str2Gid(gidParam);
SharedHandle<RequestGroup> group =
e->getRequestGroupMan()->findRequestGroup(gid);
if(!group) {
throw DL_ABORT_EX(fmt("No active download for GID#%" PRId64 "", gid));
SharedHandle<RequestGroup> group = e->getRequestGroupMan()->findGroup(gid);
if(!group || group->getState() != RequestGroup::STATE_ACTIVE) {
throw DL_ABORT_EX(fmt("No active download for GID#%" PRId64, gid));
}
const SharedHandle<DownloadContext>& dctx = group->getDownloadContext();
const std::vector<SharedHandle<FileEntry> >& files = dctx->getFileEntries();
@ -1401,8 +1374,7 @@ SharedHandle<ValueBase> ChangeUriRpcMethod::process
bool posGiven = checkPosParam(posParam);
size_t pos = posGiven ? posParam->i() : 0;
size_t index = indexParam->i()-1;
SharedHandle<RequestGroup> group =
findRequestGroup(e->getRequestGroupMan(), gid);
SharedHandle<RequestGroup> group = e->getRequestGroupMan()->findGroup(gid);
if(!group) {
throw DL_ABORT_EX(fmt("Cannot remove URIs from GID#%" PRId64 "", gid));
}