diff --git a/ChangeLog b/ChangeLog index 9e59f582..5ec7eb75 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,18 @@ +2009-07-01 Tatsuhiro Tsujikawa + + HTTP pipelining is now working. Fixed Segmention fault in + BitfieldMan's copy ctor. + * src/AbstractCommand.cc + * src/BitfieldMan.cc + * src/BitfieldMan.h + * src/DownloadCommand.cc + * src/HttpDownloadCommand.cc + * src/HttpRequest.cc + * src/SegmentMan.cc + * src/SegmentMan.h + * test/BitfieldManTest.cc + * test/SegmentManTest.cc + 2009-06-30 Tatsuhiro Tsujikawa Removed ServerHost. Same functionality is implemented using diff --git a/src/AbstractCommand.cc b/src/AbstractCommand.cc index 54e96e67..992d2100 100644 --- a/src/AbstractCommand.cc +++ b/src/AbstractCommand.cc @@ -156,18 +156,28 @@ bool AbstractCommand::execute() { if(!_requestGroup->getPieceStorage().isNull()) { _segments.clear(); _requestGroup->getSegmentMan()->getInFlightSegment(_segments, cuid); - size_t maxSegments = req.isNull()?1:req->getMaxPipelinedRequest(); - while(_segments.size() < maxSegments) { - SegmentHandle segment = _requestGroup->getSegmentMan()->getSegment(cuid); - if(segment.isNull()) { - break; + if(req.isNull() || req->getMaxPipelinedRequest() == 1) { + if(_segments.empty()) { + SharedHandle segment = + _requestGroup->getSegmentMan()->getSegment(cuid); + if(!segment.isNull()) { + _segments.push_back(segment); + } + } + if(_segments.empty()) { + // TODO socket could be pooled here if pipelining is enabled... + logger->info(MSG_NO_SEGMENT_AVAILABLE, cuid); + return true; + } + } else { + size_t maxSegments = req->getMaxPipelinedRequest(); + if(_segments.size() < maxSegments) { + _requestGroup->getSegmentMan()->getSegment + (_segments, cuid, _fileEntry, maxSegments); + } + if(_segments.empty()) { + return prepareForRetry(0); } - _segments.push_back(segment); - } - if(_segments.empty()) { - // TODO socket could be pooled here if pipelining is enabled... - logger->info(MSG_NO_SEGMENT_AVAILABLE, cuid); - return true; } } return executeInternal(); @@ -266,11 +276,11 @@ bool AbstractCommand::prepareForRetry(time_t wait) { } if(!req.isNull()) { _fileEntry->poolRequest(req); - } - if(!_segments.empty()) { logger->debug("CUID#%d - Pooling request URI=%s", cuid, req->getUrl().c_str()); - _requestGroup->getSegmentMan()->recognizeSegmentFor(_fileEntry); + if(!_requestGroup->getSegmentMan().isNull()) { + _requestGroup->getSegmentMan()->recognizeSegmentFor(_fileEntry); + } } Command* command = new CreateRequestCommand(cuid, _requestGroup, e); diff --git a/src/BitfieldMan.cc b/src/BitfieldMan.cc index 2569c691..26e0fc2e 100644 --- a/src/BitfieldMan.cc +++ b/src/BitfieldMan.cc @@ -96,7 +96,7 @@ BitfieldMan::BitfieldMan(const BitfieldMan& bitfieldMan) memcpy(bitfield, bitfieldMan.bitfield, bitfieldLength); memcpy(useBitfield, bitfieldMan.useBitfield, bitfieldLength); filterEnabled = bitfieldMan.filterEnabled; - if(filterBitfield) { + if(filterEnabled) { filterBitfield = new unsigned char[bitfieldLength]; memcpy(filterBitfield, bitfieldMan.filterBitfield, bitfieldLength); } else { @@ -612,6 +612,29 @@ void BitfieldMan::removeFilter(uint64_t offset, uint64_t length) { updateCache(); } +void BitfieldMan::addNotFilter(uint64_t offset, uint64_t length) +{ + // TODO1.5 Create ensureFilterBitfield() to initialize this + if(!filterBitfield) { + filterBitfield = new unsigned char[bitfieldLength]; + memset(filterBitfield, 0, bitfieldLength); + } + if(length > 0 && blocks > 0) { + size_t startBlock = offset/blockLength; + if(blocks <= startBlock) { + startBlock = blocks; + } + size_t endBlock = (offset+length-1)/blockLength; + for(size_t i = 0; i < startBlock; ++i) { + setFilterBit(i); + } + for(size_t i = endBlock+1; i < blocks; ++i) { + setFilterBit(i); + } + } + updateCache(); +} + void BitfieldMan::enableFilter() { if(!filterBitfield) { filterBitfield = new unsigned char[bitfieldLength]; diff --git a/src/BitfieldMan.h b/src/BitfieldMan.h index 909d82c7..31db6383 100644 --- a/src/BitfieldMan.h +++ b/src/BitfieldMan.h @@ -248,6 +248,9 @@ public: void addFilter(uint64_t offset, uint64_t length); void removeFilter(uint64_t offset, uint64_t length); + // Add filter not in the range of [offset, offset+length) bytes + void addNotFilter(uint64_t offset, uint64_t length); + /** * Clears filter and disables filter */ diff --git a/src/DownloadCommand.cc b/src/DownloadCommand.cc index 8033a224..8bc3dbc7 100644 --- a/src/DownloadCommand.cc +++ b/src/DownloadCommand.cc @@ -229,9 +229,12 @@ bool DownloadCommand::executeInternal() { #else // !ENABLE_MESSAGE_DIGEST _requestGroup->getSegmentMan()->completeSegment(cuid, segment); #endif // !ENABLE_MESSAGE_DIGEST + } else { + // If segment is not cacnel here, in the next pipelining + // request, aria2 requests bad range + // [FileEntry->getLastOffset(), FileEntry->getLastOffset()) + _requestGroup->getSegmentMan()->cancelSegment(cuid, segment); } - - checkLowestDownloadSpeed(); // this unit is going to download another segment. return prepareForNextSegment(); diff --git a/src/HttpDownloadCommand.cc b/src/HttpDownloadCommand.cc index 98ada9eb..048fa8b4 100644 --- a/src/HttpDownloadCommand.cc +++ b/src/HttpDownloadCommand.cc @@ -80,11 +80,12 @@ bool HttpDownloadCommand::prepareForNextSegment() { e->commands.push_back(command); return true; } else { - if(req->isPipeliningEnabled() || + uint64_t loff = _fileEntry->gtoloff(_segments.front()->getPositionToWrite()); + if((req->isPipeliningEnabled() && loff == _fileEntry->getLength()) || (req->isKeepAliveEnabled() && ((!_transferEncodingDecoder.isNull() && _requestGroup->downloadFinished()) || - static_cast(_fileEntry->gtoloff(_segments.front()->getPositionToWrite())) == _fileEntry->getLength()))) { + loff == _fileEntry->getLength()))) { e->poolSocket(req, isProxyDefined(), socket); } // The request was sent assuming that server supported pipelining, but @@ -93,11 +94,15 @@ bool HttpDownloadCommand::prepareForNextSegment() { // of the response with the end byte of segment. // If it is the same, HTTP negotiation is necessary for the next request. if(!req->isPipeliningEnabled() && req->isPipeliningHint() && - !_segments.empty() && !downloadFinished) { + !downloadFinished) { const SharedHandle& segment = _segments.front(); - if(static_cast(segment->getPosition())+segment->getLength() == - static_cast(_httpResponse->getHttpHeader()-> - getRange()->getEndByte()+1)) { + + off_t lastOffset =_fileEntry->gtoloff + (std::min(static_cast(segment->getPosition()+segment->getLength()), + _fileEntry->getLastOffset())); + + if(lastOffset == + _httpResponse->getHttpHeader()->getRange()->getEndByte()+1) { return prepareForRetry(0); } } diff --git a/src/HttpRequest.cc b/src/HttpRequest.cc index bac87114..340eb3ab 100644 --- a/src/HttpRequest.cc +++ b/src/HttpRequest.cc @@ -83,7 +83,7 @@ off_t HttpRequest::getEndByte() const } else { if(request->isPipeliningEnabled()) { off_t endByte = _fileEntry->gtoloff(segment->getPosition()+segment->getLength()-1); - return std::min(endByte, _fileEntry->getLastOffset()-1); + return std::min(endByte, static_cast(_fileEntry->getLength()-1)); } else { return 0; } diff --git a/src/SegmentMan.cc b/src/SegmentMan.cc index a021accf..a321adab 100644 --- a/src/SegmentMan.cc +++ b/src/SegmentMan.cc @@ -170,6 +170,36 @@ SegmentHandle SegmentMan::getSegment(cuid_t cuid) { return checkoutSegment(cuid, piece); } +void SegmentMan::getSegment(std::deque >& segments, + cuid_t cuid, + const SharedHandle& fileEntry, + size_t maxSegments) +{ + BitfieldMan filter(_ignoreBitfield); + filter.enableFilter(); + filter.addNotFilter(fileEntry->getOffset(), fileEntry->getLength()); + std::deque > pending; + while(segments.size() < maxSegments) { + SharedHandle segment = + checkoutSegment(cuid, + _pieceStorage->getSparseMissingUnusedPiece + (filter.getFilterBitfield(), filter.getBitfieldLength())); + if(segment.isNull()) { + break; + } + if(segment->getPositionToWrite() < fileEntry->getOffset() || + fileEntry->getLastOffset() <= segment->getPositionToWrite()) { + pending.push_back(segment); + } else { + segments.push_back(segment); + } + } + for(std::deque >::const_iterator i = pending.begin(); + i != pending.end(); ++i) { + cancelSegment(cuid, *i); + } +} + SegmentHandle SegmentMan::getSegment(cuid_t cuid, size_t index) { if(_downloadContext->getNumPieces() <= index) { return SharedHandle(); @@ -177,16 +207,19 @@ SegmentHandle SegmentMan::getSegment(cuid_t cuid, size_t index) { return checkoutSegment(cuid, _pieceStorage->getMissingPiece(index)); } +void SegmentMan::cancelSegment(const SharedHandle& segment) +{ + _pieceStorage->cancelPiece(segment->getPiece()); + _segmentWrittenLengthMemo[segment->getIndex()] = segment->getWrittenLength(); + logger->debug("Memorized segment index=%u, writtenLength=%u", + segment->getIndex(), segment->getWrittenLength()); +} + void SegmentMan::cancelSegment(cuid_t cuid) { for(SegmentEntries::iterator itr = usedSegmentEntries.begin(); itr != usedSegmentEntries.end();) { if((*itr)->cuid == cuid) { - _pieceStorage->cancelPiece((*itr)->segment->getPiece()); - _segmentWrittenLengthMemo[(*itr)->segment->getIndex()] = - (*itr)->segment->getWrittenLength(); - logger->debug("Memorized segment index=%u, writtenLength=%u", - (*itr)->segment->getIndex(), - (*itr)->segment->getWrittenLength()); + cancelSegment((*itr)->segment); itr = usedSegmentEntries.erase(itr); } else { ++itr; @@ -194,6 +227,21 @@ void SegmentMan::cancelSegment(cuid_t cuid) { } } +void SegmentMan::cancelSegment +(cuid_t cuid, const SharedHandle& segment) +{ + for(SegmentEntries::iterator itr = usedSegmentEntries.begin(); + itr != usedSegmentEntries.end();) { + if((*itr)->cuid == cuid && (*itr)->segment == segment) { + cancelSegment((*itr)->segment); + itr = usedSegmentEntries.erase(itr); + break; + } else { + ++itr; + } + } +} + class FindSegmentEntry { private: SegmentHandle _segment; diff --git a/src/SegmentMan.h b/src/SegmentMan.h index e4782c25..bd449ffc 100644 --- a/src/SegmentMan.h +++ b/src/SegmentMan.h @@ -101,6 +101,8 @@ private: SharedHandle checkoutSegment(cuid_t cuid, const SharedHandle& piece); + + void cancelSegment(const SharedHandle& segment); public: SegmentMan(const Option* option, const SharedHandle& downloadContext, @@ -136,6 +138,13 @@ public: SharedHandle getSegment(cuid_t cuid); + // Checkouts segments in the range of fileEntry and push back to + // segments until segments.size() < maxSegments holds false + void getSegment(std::deque >& segments, + cuid_t cuid, + const SharedHandle& fileEntry, + size_t maxSegments); + /** * Returns a segment whose index is index. * If it has already assigned @@ -152,6 +161,9 @@ public: * uses. */ void cancelSegment(cuid_t cuid); + + void cancelSegment(cuid_t cuid, const SharedHandle& segment); + /** * Tells SegmentMan that the segment has been downloaded successfully. */ diff --git a/test/BitfieldManTest.cc b/test/BitfieldManTest.cc index f3900f5c..fc22ab4f 100644 --- a/test/BitfieldManTest.cc +++ b/test/BitfieldManTest.cc @@ -18,6 +18,9 @@ class BitfieldManTest:public CppUnit::TestFixture { CPPUNIT_TEST(testIsAllBitSet); CPPUNIT_TEST(testFilter); CPPUNIT_TEST(testAddFilter_zeroLength); + CPPUNIT_TEST(testAddNotFilter); + CPPUNIT_TEST(testAddNotFilter_zeroLength); + CPPUNIT_TEST(testAddNotFilter_overflow); CPPUNIT_TEST(testGetMissingIndex); CPPUNIT_TEST(testGetSparceMissingUnusedIndex); CPPUNIT_TEST(testGetSparceMissingUnusedIndex_setBit); @@ -63,6 +66,9 @@ public: void testIsAllBitSet(); void testFilter(); void testAddFilter_zeroLength(); + void testAddNotFilter(); + void testAddNotFilter_zeroLength(); + void testAddNotFilter_overflow(); void testGetSparceMissingUnusedIndex(); void testGetSparceMissingUnusedIndex_setBit(); void testIsBitSetOffsetRange(); @@ -388,6 +394,35 @@ void BitfieldManTest::testAddFilter_zeroLength() CPPUNIT_ASSERT(bits.isFilteredAllBitSet()); } +void BitfieldManTest::testAddNotFilter() { + BitfieldMan btman(2, 32); + + btman.addNotFilter(3, 6); + CPPUNIT_ASSERT(bitfield::test(btman.getFilterBitfield(), 16, 0)); + for(size_t i = 1; i < 5; ++i) { + CPPUNIT_ASSERT(!bitfield::test(btman.getFilterBitfield(), 16, i)); + } + for(size_t i = 5; i < 16; ++i) { + CPPUNIT_ASSERT(bitfield::test(btman.getFilterBitfield(), 16, i)); + } +} + +void BitfieldManTest::testAddNotFilter_zeroLength() { + BitfieldMan btman(2, 6); + btman.addNotFilter(2, 0); + CPPUNIT_ASSERT(!bitfield::test(btman.getFilterBitfield(), 3, 0)); + CPPUNIT_ASSERT(!bitfield::test(btman.getFilterBitfield(), 3, 1)); + CPPUNIT_ASSERT(!bitfield::test(btman.getFilterBitfield(), 3, 2)); +} + +void BitfieldManTest::testAddNotFilter_overflow() { + BitfieldMan btman(2, 6); + btman.addNotFilter(6, 100); + CPPUNIT_ASSERT(bitfield::test(btman.getFilterBitfield(), 3, 0)); + CPPUNIT_ASSERT(bitfield::test(btman.getFilterBitfield(), 3, 1)); + CPPUNIT_ASSERT(bitfield::test(btman.getFilterBitfield(), 3, 2)); +} + void BitfieldManTest::testGetMissingIndex() { BitfieldMan bt1(1024, 1024*256); bt1.setRandomizer(fixedNumberRandomizer); diff --git a/test/SegmentManTest.cc b/test/SegmentManTest.cc index 082c4842..aac84893 100644 --- a/test/SegmentManTest.cc +++ b/test/SegmentManTest.cc @@ -16,6 +16,7 @@ class SegmentManTest:public CppUnit::TestFixture { CPPUNIT_TEST_SUITE(SegmentManTest); CPPUNIT_TEST(testNullBitfield); CPPUNIT_TEST(testCompleteSegment); + CPPUNIT_TEST(testGetSegment_sameFileEntry); CPPUNIT_TEST_SUITE_END(); private: @@ -26,7 +27,7 @@ public: void testNullBitfield(); void testCompleteSegment(); void testGetPeerStat(); - void testGetSegment_segmentForward(); + void testGetSegment_sameFileEntry(); }; @@ -81,4 +82,46 @@ void SegmentManTest::testCompleteSegment() CPPUNIT_ASSERT_EQUAL((size_t)2, segments[1]->getIndex()); } +void SegmentManTest::testGetSegment_sameFileEntry() +{ + Option op; + SharedHandle dctx(new DownloadContext()); + dctx->setPieceLength(2); + SharedHandle fileEntries[] = { + SharedHandle(new FileEntry("file1", 3, 0)), + SharedHandle(new FileEntry("file2", 6, 3)), + SharedHandle(new FileEntry("file3", 1, 9)) + }; + dctx->setFileEntries(&fileEntries[0], &fileEntries[3]); + SharedHandle ps(new DefaultPieceStorage(dctx, &op)); + SegmentMan segman(&op, dctx, ps); + + std::deque > segments; + segman.getSegment(segments, 1, fileEntries[1], 4); + // See 3 segments are returned, not 4 because the part of file1 is + // not filled in segment#1 + CPPUNIT_ASSERT_EQUAL((size_t)3, segments.size()); + + SharedHandle segmentNo1 = segman.getSegment(2, 1); + // Fill the part of file1 in segment#1 + segmentNo1->updateWrittenLength(1); + segman.cancelSegment(2); + + segman.cancelSegment(1); + segments.clear(); + segman.getSegment(segments, 1, fileEntries[1], 4); + CPPUNIT_ASSERT_EQUAL((size_t)4, segments.size()); + + segman.cancelSegment(1); + SharedHandle segmentNo4 = segman.getSegment(1, 4); + // Fill the part of file2 in segment#4 + segmentNo4->updateWrittenLength(1); + segman.cancelSegment(1); + + segments.clear(); + segman.getSegment(segments, 1, fileEntries[1], 4); + // segment#4 is not returned because the part of file2 is filled. + CPPUNIT_ASSERT_EQUAL((size_t)3, segments.size()); +} + } // namespace aria2