Parse HTTP RPC request on the fly without buffering

pull/25/merge
Tatsuhiro Tsujikawa 2012-07-14 18:59:56 +09:00
parent 5352d76d4f
commit 7100b1b9ca
8 changed files with 517 additions and 39 deletions

View File

@ -50,6 +50,10 @@
#include "SocketRecvBuffer.h"
#include "TimeA2.h"
#include "array_fun.h"
#include "JsonDiskWriter.h"
#ifdef ENABLE_XML_RPC
# include "XmlRpcDiskWriter.h"
#endif // ENABLE_XML_RPC
namespace aria2 {
@ -62,6 +66,9 @@ HttpServer::HttpServer
e_(e),
headerProcessor_(new HttpHeaderProcessor
(HttpHeaderProcessor::SERVER_PARSER)),
lastContentLength_(0),
bodyConsumed_(0),
reqType_(RPC_TYPE_NONE),
keepAlive_(true),
gzip_(false),
acceptsPersistentConnection_(true),
@ -138,8 +145,10 @@ SharedHandle<HttpHeader> HttpServer::receiveRequest()
headerProcessor_->getHeaderString().c_str()));
socketRecvBuffer_->shiftBuffer(headerProcessor_->getLastBytesProcessed());
lastRequestHeader_ = header;
lastBody_.clear();
lastBody_.str("");
bodyConsumed_ = 0;
if(setupResponseRecv() < 0) {
A2_LOG_INFO("Request path is invaild. Ignore the request body.");
}
lastContentLength_ =
lastRequestHeader_->findAsLLInt(HttpHeader::CONTENT_LENGTH);
if(lastContentLength_ < 0) {
@ -182,7 +191,7 @@ SharedHandle<HttpHeader> HttpServer::receiveRequest()
bool HttpServer::receiveBody()
{
if(lastContentLength_ == 0) {
if(lastContentLength_ == bodyConsumed_) {
return true;
}
if(socketRecvBuffer_->bufferEmpty()) {
@ -193,16 +202,13 @@ bool HttpServer::receiveBody()
}
size_t length =
std::min(socketRecvBuffer_->getBufferLength(),
static_cast<size_t>(lastContentLength_-lastBody_.tellg()));
lastBody_.write(reinterpret_cast<const char*>(socketRecvBuffer_->getBuffer()),
length);
static_cast<size_t>(lastContentLength_ - bodyConsumed_));
if(lastBody_) {
lastBody_->writeData(socketRecvBuffer_->getBuffer(), length, 0);
}
socketRecvBuffer_->shiftBuffer(length);
return lastContentLength_ == lastBody_.tellp();
}
std::string HttpServer::getBody() const
{
return lastBody_.str();
bodyConsumed_ += length;
return lastContentLength_ == bodyConsumed_;
}
const std::string& HttpServer::getMethod() const
@ -314,4 +320,73 @@ void HttpServer::setUsernamePassword
password_ = password;
}
int HttpServer::setupResponseRecv()
{
std::string path = createPath();
if(getMethod() == "GET") {
if(path == "/jsonrpc") {
reqType_ = RPC_TYPE_JSONP;
lastBody_.reset();
return 0;
}
} else if(getMethod() == "POST") {
if(path == "/rpc") {
if(reqType_ != RPC_TYPE_XML) {
reqType_ = RPC_TYPE_XML;
lastBody_.reset(new rpc::XmlRpcDiskWriter());
}
return 0;
} else if(path == "/jsonrpc") {
if(reqType_ != RPC_TYPE_JSON) {
reqType_ = RPC_TYPE_JSON;
lastBody_.reset(new json::JsonDiskWriter());
}
return 0;
}
}
reqType_ = RPC_TYPE_NONE;
lastBody_.reset();
return -1;
}
std::string HttpServer::createPath() const
{
std::string reqPath = getRequestPath();
size_t i;
size_t len = reqPath.size();
for(i = 0; i < len; ++i) {
if(reqPath[i] == '#' || reqPath[i] == '?') {
break;
}
}
reqPath = reqPath.substr(0, i);
if(reqPath.empty()) {
reqPath = "/";
}
return reqPath;
}
std::string HttpServer::createQuery() const
{
std::string reqPath = getRequestPath();
size_t i;
size_t len = reqPath.size();
for(i = 0; i < len; ++i) {
if(reqPath[i] == '#' || reqPath[i] == '?') {
break;
}
}
if(i == len || reqPath[i] == '#') {
return "";
} else {
size_t start = i;
for(; i < len; ++i) {
if(reqPath[i] == '#') {
break;
}
}
return reqPath.substr(start, i - start);
}
}
} // namespace aria2

View File

@ -51,7 +51,17 @@ class HttpHeader;
class HttpHeaderProcessor;
class DownloadEngine;
class SocketRecvBuffer;
class DiskWriter;
enum RequestType {
RPC_TYPE_NONE,
RPC_TYPE_XML,
RPC_TYPE_JSON,
RPC_TYPE_JSONP
};
// HTTP server class handling RPC request from the client. It is not
// intended to be a generic HTTP server.
class HttpServer {
private:
SharedHandle<SocketCore> socket_;
@ -61,7 +71,11 @@ private:
SharedHandle<HttpHeaderProcessor> headerProcessor_;
SharedHandle<HttpHeader> lastRequestHeader_;
int64_t lastContentLength_;
std::stringstream lastBody_;
// How many bytes are consumed. The total number of bytes is
// lastContentLength_.
int64_t bodyConsumed_;
RequestType reqType_;
SharedHandle<DiskWriter> lastBody_;
bool keepAlive_;
bool gzip_;
std::string username_;
@ -78,12 +92,25 @@ public:
bool receiveBody();
std::string getBody() const;
const std::string& getMethod() const;
const std::string& getRequestPath() const;
int setupResponseRecv();
std::string createPath() const;
std::string createQuery() const;
const SharedHandle<DiskWriter>& getBody() const
{
return lastBody_;
}
RequestType getRequestType() const
{
return reqType_;
}
void feedResponse(std::string& text, const std::string& contentType);

View File

@ -57,8 +57,11 @@
#include "RpcRequest.h"
#include "RpcResponse.h"
#include "rpc_helper.h"
#include "JsonDiskWriter.h"
#include "ValueBaseJsonParser.h"
#ifdef ENABLE_XML_RPC
# include "XmlRpcRequestParserStateMachine.h"
# include "XmlRpcRequestParserStateMachine.h"
# include "XmlRpcDiskWriter.h"
#endif // ENABLE_XML_RPC
namespace aria2 {
@ -187,17 +190,22 @@ bool HttpServerBodyCommand::execute()
}
// Do something for requestpath and body
if(reqPath == "/rpc") {
switch(httpServer_->getRequestType()) {
case RPC_TYPE_XML: {
#ifdef ENABLE_XML_RPC
std::string body = httpServer_->getBody();
SharedHandle<rpc::XmlRpcDiskWriter> dw =
static_pointer_cast<rpc::XmlRpcDiskWriter>(httpServer_->getBody());
int error;
error = dw->finalize();
rpc::RpcRequest req;
try {
req = rpc::xmlParseMemory(body.c_str(), body.size());
} catch(RecoverableException& e) {
A2_LOG_INFO_EX
if(error == 0) {
req = dw->getResult();
}
dw->reset();
if(error < 0) {
A2_LOG_INFO
(fmt("CUID#%" PRId64 " - Failed to parse XML-RPC request",
getCuid()),
e);
getCuid()));
httpServer_->feedResponse(400);
addHttpServerResponseCommand();
return true;
@ -215,22 +223,33 @@ bool HttpServerBodyCommand::execute()
addHttpServerResponseCommand();
#endif // !ENABLE_XML_RPC
return true;
} else if(reqPath == "/jsonrpc") {
}
case RPC_TYPE_JSON:
case RPC_TYPE_JSONP: {
std::string callback;
SharedHandle<ValueBase> json;
try {
if(httpServer_->getMethod() == "GET") {
json::JsonGetParam param = json::decodeGetParams(query);
callback = param.callback;
json = json::decode(param.request);
} else {
json = json::decode(httpServer_->getBody());
ssize_t error = 0;
if(httpServer_->getRequestType() == RPC_TYPE_JSONP) {
json::JsonGetParam param = json::decodeGetParams(query);
callback = param.callback;
ssize_t error = 0;
json = json::ValueBaseJsonParser().parseFinal
(param.request.c_str(),
param.request.size(),
error);
} else {
SharedHandle<json::JsonDiskWriter> dw =
static_pointer_cast<json::JsonDiskWriter>(httpServer_->getBody());
error = dw->finalize();
if(error == 0) {
json = dw->getResult();
}
} catch(RecoverableException& e) {
A2_LOG_INFO_EX
dw->reset();
}
if(error < 0) {
A2_LOG_INFO
(fmt("CUID#%" PRId64 " - Failed to parse JSON-RPC request",
getCuid()),
e);
getCuid()));
rpc::RpcResponse res
(rpc::createJsonRpcErrorResponse(-32700, "Parse error.",
Null::g()));
@ -250,7 +269,8 @@ bool HttpServerBodyCommand::execute()
eoi = jsonlist->end(); i != eoi; ++i) {
const Dict* jsondict = downcast<Dict>(*i);
if(jsondict) {
rpc::RpcResponse r = rpc::processJsonRpcRequest(jsondict, e_);
rpc::RpcResponse r =
rpc::processJsonRpcRequest(jsondict, e_);
results.push_back(r);
}
}
@ -263,7 +283,8 @@ bool HttpServerBodyCommand::execute()
}
}
return true;
} else {
}
default:
httpServer_->feedResponse(404);
addHttpServerResponseCommand();
return true;

78
src/JsonDiskWriter.cc Normal file
View File

@ -0,0 +1,78 @@
/* <!-- copyright */
/*
* aria2 - The high speed download utility
*
* Copyright (C) 2012 Tatsuhiro Tsujikawa
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*
* In addition, as a special exception, the copyright holders give
* permission to link the code of portions of this program with the
* OpenSSL library under certain conditions as described in each
* individual source file, and distribute linked combinations
* including the two.
* You must obey the GNU General Public License in all respects
* for all of the code used other than OpenSSL. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you
* do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source
* files in the program, then also delete it here.
*/
/* copyright --> */
#include "JsonDiskWriter.h"
#include "ValueBase.h"
namespace aria2 {
namespace json {
JsonDiskWriter::JsonDiskWriter()
: parser_(&psm_)
{}
JsonDiskWriter::~JsonDiskWriter()
{}
void JsonDiskWriter::initAndOpenFile(off_t totalLength)
{
parser_.reset();
}
void JsonDiskWriter::writeData(const unsigned char* data, size_t len,
off_t offset)
{
// Return value is ignored here but handled in finalize()
parser_.parseUpdate(reinterpret_cast<const char*>(data), len);
}
int JsonDiskWriter::finalize()
{
return parser_.parseFinal(0, 0);
}
SharedHandle<ValueBase> JsonDiskWriter::getResult() const
{
return psm_.getResult();
}
void JsonDiskWriter::reset()
{
parser_.reset();
}
} // namespace json
} // namespace aria2

94
src/JsonDiskWriter.h Normal file
View File

@ -0,0 +1,94 @@
/* <!-- copyright */
/*
* aria2 - The high speed download utility
*
* Copyright (C) 2012 Tatsuhiro Tsujikawa
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*
* In addition, as a special exception, the copyright holders give
* permission to link the code of portions of this program with the
* OpenSSL library under certain conditions as described in each
* individual source file, and distribute linked combinations
* including the two.
* You must obey the GNU General Public License in all respects
* for all of the code used other than OpenSSL. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you
* do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source
* files in the program, then also delete it here.
*/
/* copyright --> */
#ifndef D_JSON_DISK_WRITER_H
#define D_JSON_DISK_WRITER_H
#include "DiskWriter.h"
#include "ValueBaseStructParserStateMachine.h"
#include "JsonParser.h"
namespace aria2 {
namespace json {
// DiskWriter backed with ValueBaseJsonParser. The written bytes are
// consumed by ValueBaseJsonParser. It is only capable of sequential
// write so offset argument in write() will be ignored. It also does
// not offer read().
class JsonDiskWriter : public DiskWriter {
public:
JsonDiskWriter();
virtual ~JsonDiskWriter();
virtual void initAndOpenFile(off_t totalLength = 0);
virtual void openFile(off_t totalLength = 0)
{
initAndOpenFile(totalLength);
}
virtual void closeFile() {}
virtual void openExistingFile(off_t totalLength = 0)
{
initAndOpenFile(totalLength);
}
virtual off_t size()
{
return 0;
}
virtual void writeData(const unsigned char* data, size_t len, off_t offset);
virtual ssize_t readData(unsigned char* data, size_t len, off_t offset)
{
return 0;
}
int finalize();
SharedHandle<ValueBase> getResult() const;
void reset();
private:
ValueBaseStructParserStateMachine psm_;
JsonParser parser_;
};
} // namespace json
} // namespace aria2
#endif // D_JSON_DISK_WRITER_H

View File

@ -205,6 +205,7 @@ SRCS = Socket.h\
ValueBaseStructParserState.h\
ValueBaseStructParserStateImpl.cc ValueBaseStructParserStateImpl.h\
ValueBaseStructParserStateMachine.cc ValueBaseStructParserStateMachine.h\
JsonDiskWriter.cc JsonDiskWriter.h\
HttpServerBodyCommand.cc HttpServerBodyCommand.h\
RpcRequest.cc RpcRequest.h\
RpcMethod.cc RpcMethod.h\
@ -268,7 +269,8 @@ if ENABLE_XML_RPC
SRCS += XmlRpcRequestParserController.cc XmlRpcRequestParserController.h\
XmlRpcRequestParserStateMachine.cc XmlRpcRequestParserStateMachine.h\
XmlRpcRequestParserState.h\
XmlRpcRequestParserStateImpl.cc XmlRpcRequestParserStateImpl.h
XmlRpcRequestParserStateImpl.cc XmlRpcRequestParserStateImpl.h\
XmlRpcDiskWriter.cc XmlRpcDiskWriter.h
endif # ENABLE_XML_RPC

86
src/XmlRpcDiskWriter.cc Normal file
View File

@ -0,0 +1,86 @@
/* <!-- copyright */
/*
* aria2 - The high speed download utility
*
* Copyright (C) 2012 Tatsuhiro Tsujikawa
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*
* In addition, as a special exception, the copyright holders give
* permission to link the code of portions of this program with the
* OpenSSL library under certain conditions as described in each
* individual source file, and distribute linked combinations
* including the two.
* You must obey the GNU General Public License in all respects
* for all of the code used other than OpenSSL. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you
* do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source
* files in the program, then also delete it here.
*/
/* copyright --> */
#include "XmlRpcDiskWriter.h"
#include "DlAbortEx.h"
#include "message.h"
#include "ValueBase.h"
namespace aria2 {
namespace rpc {
XmlRpcDiskWriter::XmlRpcDiskWriter()
: parser_(&psm_)
{}
XmlRpcDiskWriter::~XmlRpcDiskWriter()
{}
void XmlRpcDiskWriter::initAndOpenFile(off_t totalLength)
{
parser_.reset();
}
void XmlRpcDiskWriter::writeData(const unsigned char* data, size_t len,
off_t offset)
{
// Return value is ignored here but handled in finalize()
parser_.parseUpdate(reinterpret_cast<const char*>(data), len);
}
int XmlRpcDiskWriter::finalize()
{
return parser_.parseFinal(0, 0);
}
RpcRequest XmlRpcDiskWriter::getResult() const
{
SharedHandle<List> params;
if(downcast<List>(psm_.getCurrentFrameValue())) {
params = static_pointer_cast<List>(psm_.getCurrentFrameValue());
} else {
params = List::g();
}
return RpcRequest(psm_.getMethodName(), params);
}
int XmlRpcDiskWriter::reset()
{
return parser_.reset();
}
} // namespace rpc
} // namespace aria2

95
src/XmlRpcDiskWriter.h Normal file
View File

@ -0,0 +1,95 @@
/* <!-- copyright */
/*
* aria2 - The high speed download utility
*
* Copyright (C) 2012 Tatsuhiro Tsujikawa
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*
* In addition, as a special exception, the copyright holders give
* permission to link the code of portions of this program with the
* OpenSSL library under certain conditions as described in each
* individual source file, and distribute linked combinations
* including the two.
* You must obey the GNU General Public License in all respects
* for all of the code used other than OpenSSL. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you
* do not wish to do so, delete this exception statement from your
* version. If you delete this exception statement from all source
* files in the program, then also delete it here.
*/
/* copyright --> */
#ifndef D_XML_RPC_DISK_WRITER_H
#define D_XML_RPC_DISK_WRITER_H
#include "DiskWriter.h"
#include "RpcRequest.h"
#include "XmlRpcRequestParserStateMachine.h"
#include "XmlParser.h"
namespace aria2 {
namespace rpc {
// DiskWriter backed with XML-RPC XmlParser. The written bytes are
// consumed by XML-RPC XmlParser. It is only capable of sequential
// write so offset argument in write() will be ignored. It also does
// not offer read().
class XmlRpcDiskWriter : public DiskWriter {
public:
XmlRpcDiskWriter();
virtual ~XmlRpcDiskWriter();
virtual void initAndOpenFile(off_t totalLength = 0);
virtual void openFile(off_t totalLength = 0)
{
initAndOpenFile(totalLength);
}
virtual void closeFile() {}
virtual void openExistingFile(off_t totalLength = 0)
{
initAndOpenFile(totalLength);
}
virtual off_t size()
{
return 0;
}
virtual void writeData(const unsigned char* data, size_t len, off_t offset);
virtual ssize_t readData(unsigned char* data, size_t len, off_t offset)
{
return 0;
}
int finalize();
RpcRequest getResult() const;
int reset();
private:
XmlRpcRequestParserStateMachine psm_;
xml::XmlParser parser_;
};
} // namespace rpc
} // namespace aria2
#endif // D_XML_RPC_DISK_WRITER_H