2009-07-01 Tatsuhiro Tsujikawa <t-tujikawa@users.sourceforge.net>

HTTP pipelining is now working. Fixed Segmention fault in
	BitfieldMan's copy ctor.	
	* src/AbstractCommand.cc
	* src/BitfieldMan.cc
	* src/BitfieldMan.h
	* src/DownloadCommand.cc
	* src/HttpDownloadCommand.cc
	* src/HttpRequest.cc
	* src/SegmentMan.cc
	* src/SegmentMan.h
	* test/BitfieldManTest.cc
	* test/SegmentManTest.cc
pull/1/head
Tatsuhiro Tsujikawa 2009-06-30 17:03:57 +00:00
parent cece2bc896
commit cf19dce855
11 changed files with 228 additions and 31 deletions

View File

@ -1,3 +1,18 @@
2009-07-01 Tatsuhiro Tsujikawa <t-tujikawa@users.sourceforge.net>
HTTP pipelining is now working. Fixed Segmention fault in
BitfieldMan's copy ctor.
* src/AbstractCommand.cc
* src/BitfieldMan.cc
* src/BitfieldMan.h
* src/DownloadCommand.cc
* src/HttpDownloadCommand.cc
* src/HttpRequest.cc
* src/SegmentMan.cc
* src/SegmentMan.h
* test/BitfieldManTest.cc
* test/SegmentManTest.cc
2009-06-30 Tatsuhiro Tsujikawa <t-tujikawa@users.sourceforge.net>
Removed ServerHost. Same functionality is implemented using

View File

@ -156,19 +156,29 @@ bool AbstractCommand::execute() {
if(!_requestGroup->getPieceStorage().isNull()) {
_segments.clear();
_requestGroup->getSegmentMan()->getInFlightSegment(_segments, cuid);
size_t maxSegments = req.isNull()?1:req->getMaxPipelinedRequest();
while(_segments.size() < maxSegments) {
SegmentHandle segment = _requestGroup->getSegmentMan()->getSegment(cuid);
if(segment.isNull()) {
break;
}
if(req.isNull() || req->getMaxPipelinedRequest() == 1) {
if(_segments.empty()) {
SharedHandle<Segment> segment =
_requestGroup->getSegmentMan()->getSegment(cuid);
if(!segment.isNull()) {
_segments.push_back(segment);
}
}
if(_segments.empty()) {
// TODO socket could be pooled here if pipelining is enabled...
logger->info(MSG_NO_SEGMENT_AVAILABLE, cuid);
return true;
}
} else {
size_t maxSegments = req->getMaxPipelinedRequest();
if(_segments.size() < maxSegments) {
_requestGroup->getSegmentMan()->getSegment
(_segments, cuid, _fileEntry, maxSegments);
}
if(_segments.empty()) {
return prepareForRetry(0);
}
}
}
return executeInternal();
} else if(_errorEvent) {
@ -266,12 +276,12 @@ bool AbstractCommand::prepareForRetry(time_t wait) {
}
if(!req.isNull()) {
_fileEntry->poolRequest(req);
}
if(!_segments.empty()) {
logger->debug("CUID#%d - Pooling request URI=%s",
cuid, req->getUrl().c_str());
if(!_requestGroup->getSegmentMan().isNull()) {
_requestGroup->getSegmentMan()->recognizeSegmentFor(_fileEntry);
}
}
Command* command = new CreateRequestCommand(cuid, _requestGroup, e);
if(wait == 0) {

View File

@ -96,7 +96,7 @@ BitfieldMan::BitfieldMan(const BitfieldMan& bitfieldMan)
memcpy(bitfield, bitfieldMan.bitfield, bitfieldLength);
memcpy(useBitfield, bitfieldMan.useBitfield, bitfieldLength);
filterEnabled = bitfieldMan.filterEnabled;
if(filterBitfield) {
if(filterEnabled) {
filterBitfield = new unsigned char[bitfieldLength];
memcpy(filterBitfield, bitfieldMan.filterBitfield, bitfieldLength);
} else {
@ -612,6 +612,29 @@ void BitfieldMan::removeFilter(uint64_t offset, uint64_t length) {
updateCache();
}
void BitfieldMan::addNotFilter(uint64_t offset, uint64_t length)
{
// TODO1.5 Create ensureFilterBitfield() to initialize this
if(!filterBitfield) {
filterBitfield = new unsigned char[bitfieldLength];
memset(filterBitfield, 0, bitfieldLength);
}
if(length > 0 && blocks > 0) {
size_t startBlock = offset/blockLength;
if(blocks <= startBlock) {
startBlock = blocks;
}
size_t endBlock = (offset+length-1)/blockLength;
for(size_t i = 0; i < startBlock; ++i) {
setFilterBit(i);
}
for(size_t i = endBlock+1; i < blocks; ++i) {
setFilterBit(i);
}
}
updateCache();
}
void BitfieldMan::enableFilter() {
if(!filterBitfield) {
filterBitfield = new unsigned char[bitfieldLength];

View File

@ -248,6 +248,9 @@ public:
void addFilter(uint64_t offset, uint64_t length);
void removeFilter(uint64_t offset, uint64_t length);
// Add filter not in the range of [offset, offset+length) bytes
void addNotFilter(uint64_t offset, uint64_t length);
/**
* Clears filter and disables filter
*/

View File

@ -229,9 +229,12 @@ bool DownloadCommand::executeInternal() {
#else // !ENABLE_MESSAGE_DIGEST
_requestGroup->getSegmentMan()->completeSegment(cuid, segment);
#endif // !ENABLE_MESSAGE_DIGEST
} else {
// If segment is not cacnel here, in the next pipelining
// request, aria2 requests bad range
// [FileEntry->getLastOffset(), FileEntry->getLastOffset())
_requestGroup->getSegmentMan()->cancelSegment(cuid, segment);
}
checkLowestDownloadSpeed();
// this unit is going to download another segment.
return prepareForNextSegment();

View File

@ -80,11 +80,12 @@ bool HttpDownloadCommand::prepareForNextSegment() {
e->commands.push_back(command);
return true;
} else {
if(req->isPipeliningEnabled() ||
uint64_t loff = _fileEntry->gtoloff(_segments.front()->getPositionToWrite());
if((req->isPipeliningEnabled() && loff == _fileEntry->getLength()) ||
(req->isKeepAliveEnabled() &&
((!_transferEncodingDecoder.isNull() &&
_requestGroup->downloadFinished()) ||
static_cast<uint64_t>(_fileEntry->gtoloff(_segments.front()->getPositionToWrite())) == _fileEntry->getLength()))) {
loff == _fileEntry->getLength()))) {
e->poolSocket(req, isProxyDefined(), socket);
}
// The request was sent assuming that server supported pipelining, but
@ -93,11 +94,15 @@ bool HttpDownloadCommand::prepareForNextSegment() {
// of the response with the end byte of segment.
// If it is the same, HTTP negotiation is necessary for the next request.
if(!req->isPipeliningEnabled() && req->isPipeliningHint() &&
!_segments.empty() && !downloadFinished) {
!downloadFinished) {
const SharedHandle<Segment>& segment = _segments.front();
if(static_cast<uint64_t>(segment->getPosition())+segment->getLength() ==
static_cast<uint64_t>(_httpResponse->getHttpHeader()->
getRange()->getEndByte()+1)) {
off_t lastOffset =_fileEntry->gtoloff
(std::min(static_cast<off_t>(segment->getPosition()+segment->getLength()),
_fileEntry->getLastOffset()));
if(lastOffset ==
_httpResponse->getHttpHeader()->getRange()->getEndByte()+1) {
return prepareForRetry(0);
}
}

View File

@ -83,7 +83,7 @@ off_t HttpRequest::getEndByte() const
} else {
if(request->isPipeliningEnabled()) {
off_t endByte = _fileEntry->gtoloff(segment->getPosition()+segment->getLength()-1);
return std::min(endByte, _fileEntry->getLastOffset()-1);
return std::min(endByte, static_cast<off_t>(_fileEntry->getLength()-1));
} else {
return 0;
}

View File

@ -170,6 +170,36 @@ SegmentHandle SegmentMan::getSegment(cuid_t cuid) {
return checkoutSegment(cuid, piece);
}
void SegmentMan::getSegment(std::deque<SharedHandle<Segment> >& segments,
cuid_t cuid,
const SharedHandle<FileEntry>& fileEntry,
size_t maxSegments)
{
BitfieldMan filter(_ignoreBitfield);
filter.enableFilter();
filter.addNotFilter(fileEntry->getOffset(), fileEntry->getLength());
std::deque<SharedHandle<Segment> > pending;
while(segments.size() < maxSegments) {
SharedHandle<Segment> segment =
checkoutSegment(cuid,
_pieceStorage->getSparseMissingUnusedPiece
(filter.getFilterBitfield(), filter.getBitfieldLength()));
if(segment.isNull()) {
break;
}
if(segment->getPositionToWrite() < fileEntry->getOffset() ||
fileEntry->getLastOffset() <= segment->getPositionToWrite()) {
pending.push_back(segment);
} else {
segments.push_back(segment);
}
}
for(std::deque<SharedHandle<Segment> >::const_iterator i = pending.begin();
i != pending.end(); ++i) {
cancelSegment(cuid, *i);
}
}
SegmentHandle SegmentMan::getSegment(cuid_t cuid, size_t index) {
if(_downloadContext->getNumPieces() <= index) {
return SharedHandle<Segment>();
@ -177,16 +207,19 @@ SegmentHandle SegmentMan::getSegment(cuid_t cuid, size_t index) {
return checkoutSegment(cuid, _pieceStorage->getMissingPiece(index));
}
void SegmentMan::cancelSegment(const SharedHandle<Segment>& segment)
{
_pieceStorage->cancelPiece(segment->getPiece());
_segmentWrittenLengthMemo[segment->getIndex()] = segment->getWrittenLength();
logger->debug("Memorized segment index=%u, writtenLength=%u",
segment->getIndex(), segment->getWrittenLength());
}
void SegmentMan::cancelSegment(cuid_t cuid) {
for(SegmentEntries::iterator itr = usedSegmentEntries.begin();
itr != usedSegmentEntries.end();) {
if((*itr)->cuid == cuid) {
_pieceStorage->cancelPiece((*itr)->segment->getPiece());
_segmentWrittenLengthMemo[(*itr)->segment->getIndex()] =
(*itr)->segment->getWrittenLength();
logger->debug("Memorized segment index=%u, writtenLength=%u",
(*itr)->segment->getIndex(),
(*itr)->segment->getWrittenLength());
cancelSegment((*itr)->segment);
itr = usedSegmentEntries.erase(itr);
} else {
++itr;
@ -194,6 +227,21 @@ void SegmentMan::cancelSegment(cuid_t cuid) {
}
}
void SegmentMan::cancelSegment
(cuid_t cuid, const SharedHandle<Segment>& segment)
{
for(SegmentEntries::iterator itr = usedSegmentEntries.begin();
itr != usedSegmentEntries.end();) {
if((*itr)->cuid == cuid && (*itr)->segment == segment) {
cancelSegment((*itr)->segment);
itr = usedSegmentEntries.erase(itr);
break;
} else {
++itr;
}
}
}
class FindSegmentEntry {
private:
SegmentHandle _segment;

View File

@ -101,6 +101,8 @@ private:
SharedHandle<Segment> checkoutSegment(cuid_t cuid,
const SharedHandle<Piece>& piece);
void cancelSegment(const SharedHandle<Segment>& segment);
public:
SegmentMan(const Option* option,
const SharedHandle<DownloadContext>& downloadContext,
@ -136,6 +138,13 @@ public:
SharedHandle<Segment> getSegment(cuid_t cuid);
// Checkouts segments in the range of fileEntry and push back to
// segments until segments.size() < maxSegments holds false
void getSegment(std::deque<SharedHandle<Segment> >& segments,
cuid_t cuid,
const SharedHandle<FileEntry>& fileEntry,
size_t maxSegments);
/**
* Returns a segment whose index is index.
* If it has already assigned
@ -152,6 +161,9 @@ public:
* uses.
*/
void cancelSegment(cuid_t cuid);
void cancelSegment(cuid_t cuid, const SharedHandle<Segment>& segment);
/**
* Tells SegmentMan that the segment has been downloaded successfully.
*/

View File

@ -18,6 +18,9 @@ class BitfieldManTest:public CppUnit::TestFixture {
CPPUNIT_TEST(testIsAllBitSet);
CPPUNIT_TEST(testFilter);
CPPUNIT_TEST(testAddFilter_zeroLength);
CPPUNIT_TEST(testAddNotFilter);
CPPUNIT_TEST(testAddNotFilter_zeroLength);
CPPUNIT_TEST(testAddNotFilter_overflow);
CPPUNIT_TEST(testGetMissingIndex);
CPPUNIT_TEST(testGetSparceMissingUnusedIndex);
CPPUNIT_TEST(testGetSparceMissingUnusedIndex_setBit);
@ -63,6 +66,9 @@ public:
void testIsAllBitSet();
void testFilter();
void testAddFilter_zeroLength();
void testAddNotFilter();
void testAddNotFilter_zeroLength();
void testAddNotFilter_overflow();
void testGetSparceMissingUnusedIndex();
void testGetSparceMissingUnusedIndex_setBit();
void testIsBitSetOffsetRange();
@ -388,6 +394,35 @@ void BitfieldManTest::testAddFilter_zeroLength()
CPPUNIT_ASSERT(bits.isFilteredAllBitSet());
}
void BitfieldManTest::testAddNotFilter() {
BitfieldMan btman(2, 32);
btman.addNotFilter(3, 6);
CPPUNIT_ASSERT(bitfield::test(btman.getFilterBitfield(), 16, 0));
for(size_t i = 1; i < 5; ++i) {
CPPUNIT_ASSERT(!bitfield::test(btman.getFilterBitfield(), 16, i));
}
for(size_t i = 5; i < 16; ++i) {
CPPUNIT_ASSERT(bitfield::test(btman.getFilterBitfield(), 16, i));
}
}
void BitfieldManTest::testAddNotFilter_zeroLength() {
BitfieldMan btman(2, 6);
btman.addNotFilter(2, 0);
CPPUNIT_ASSERT(!bitfield::test(btman.getFilterBitfield(), 3, 0));
CPPUNIT_ASSERT(!bitfield::test(btman.getFilterBitfield(), 3, 1));
CPPUNIT_ASSERT(!bitfield::test(btman.getFilterBitfield(), 3, 2));
}
void BitfieldManTest::testAddNotFilter_overflow() {
BitfieldMan btman(2, 6);
btman.addNotFilter(6, 100);
CPPUNIT_ASSERT(bitfield::test(btman.getFilterBitfield(), 3, 0));
CPPUNIT_ASSERT(bitfield::test(btman.getFilterBitfield(), 3, 1));
CPPUNIT_ASSERT(bitfield::test(btman.getFilterBitfield(), 3, 2));
}
void BitfieldManTest::testGetMissingIndex() {
BitfieldMan bt1(1024, 1024*256);
bt1.setRandomizer(fixedNumberRandomizer);

View File

@ -16,6 +16,7 @@ class SegmentManTest:public CppUnit::TestFixture {
CPPUNIT_TEST_SUITE(SegmentManTest);
CPPUNIT_TEST(testNullBitfield);
CPPUNIT_TEST(testCompleteSegment);
CPPUNIT_TEST(testGetSegment_sameFileEntry);
CPPUNIT_TEST_SUITE_END();
private:
@ -26,7 +27,7 @@ public:
void testNullBitfield();
void testCompleteSegment();
void testGetPeerStat();
void testGetSegment_segmentForward();
void testGetSegment_sameFileEntry();
};
@ -81,4 +82,46 @@ void SegmentManTest::testCompleteSegment()
CPPUNIT_ASSERT_EQUAL((size_t)2, segments[1]->getIndex());
}
void SegmentManTest::testGetSegment_sameFileEntry()
{
Option op;
SharedHandle<DownloadContext> dctx(new DownloadContext());
dctx->setPieceLength(2);
SharedHandle<FileEntry> fileEntries[] = {
SharedHandle<FileEntry>(new FileEntry("file1", 3, 0)),
SharedHandle<FileEntry>(new FileEntry("file2", 6, 3)),
SharedHandle<FileEntry>(new FileEntry("file3", 1, 9))
};
dctx->setFileEntries(&fileEntries[0], &fileEntries[3]);
SharedHandle<DefaultPieceStorage> ps(new DefaultPieceStorage(dctx, &op));
SegmentMan segman(&op, dctx, ps);
std::deque<SharedHandle<Segment> > segments;
segman.getSegment(segments, 1, fileEntries[1], 4);
// See 3 segments are returned, not 4 because the part of file1 is
// not filled in segment#1
CPPUNIT_ASSERT_EQUAL((size_t)3, segments.size());
SharedHandle<Segment> segmentNo1 = segman.getSegment(2, 1);
// Fill the part of file1 in segment#1
segmentNo1->updateWrittenLength(1);
segman.cancelSegment(2);
segman.cancelSegment(1);
segments.clear();
segman.getSegment(segments, 1, fileEntries[1], 4);
CPPUNIT_ASSERT_EQUAL((size_t)4, segments.size());
segman.cancelSegment(1);
SharedHandle<Segment> segmentNo4 = segman.getSegment(1, 4);
// Fill the part of file2 in segment#4
segmentNo4->updateWrittenLength(1);
segman.cancelSegment(1);
segments.clear();
segman.getSegment(segments, 1, fileEntries[1], 4);
// segment#4 is not returned because the part of file2 is filled.
CPPUNIT_ASSERT_EQUAL((size_t)3, segments.size());
}
} // namespace aria2