From 8a9fa9a692e15396285cd0a4d993eb7882ae6898 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Thu, 10 Mar 2011 00:56:37 +0900 Subject: [PATCH] Added JSON-RPC 2.0 batch call. --- src/HttpServerBodyCommand.cc | 176 +++++++++++++++++++++++++++-------- src/HttpServerBodyCommand.h | 16 ++++ src/XmlRpcResponse.cc | 40 ++++++++ src/XmlRpcResponse.h | 8 ++ 4 files changed, 199 insertions(+), 41 deletions(-) diff --git a/src/HttpServerBodyCommand.cc b/src/HttpServerBodyCommand.cc index 3d39e111..c60502b1 100644 --- a/src/HttpServerBodyCommand.cc +++ b/src/HttpServerBodyCommand.cc @@ -56,6 +56,7 @@ #include "SocketRecvBuffer.h" #include "json.h" #include "DlAbortEx.h" +#include "message.h" namespace aria2 { @@ -82,6 +83,102 @@ HttpServerBodyCommand::~HttpServerBodyCommand() e_->deleteSocketForReadCheck(socket_, this); } +namespace { +xmlrpc::XmlRpcResponse +createJsonRpcErrorResponse +(int code, + const std::string& msg, + const SharedHandle& id) +{ + SharedHandle params = Dict::g(); + params->put("code", Integer::g(code)); + params->put("message", msg); + xmlrpc::XmlRpcResponse res(code, params, id); + return res; +} +} // namespace + +void HttpServerBodyCommand::sendJsonRpcResponse +(const xmlrpc::XmlRpcResponse& res, + const std::string& callback) +{ + bool gzip = httpServer_->supportsGZip(); + std::string responseData = res.toJson(callback, gzip); + if(res.code == 0) { + httpServer_->feedResponse(responseData, "application/json-rpc"); + } else { + httpServer_->disableKeepAlive(); + std::string httpCode; + switch(res.code) { + case -32600: + httpCode = "400 Bad Request"; + break; + case -32601: + httpCode = "404 Not Found"; + break; + default: + httpCode = "500 Internal Server Error"; + }; + httpServer_->feedResponse(httpCode, A2STR::NIL, + responseData, "application/json-rpc"); + } + addHttpServerResponseCommand(); +} + +void HttpServerBodyCommand::sendJsonRpcBatchResponse +(const std::vector& results, + const std::string& callback) +{ + bool gzip = httpServer_->supportsGZip(); + std::string responseData = xmlrpc::toJsonBatch(results, callback, gzip); + httpServer_->feedResponse(responseData, "application/json-rpc"); + addHttpServerResponseCommand(); +} + +void HttpServerBodyCommand::addHttpServerResponseCommand() +{ + Command* command = + new HttpServerResponseCommand(getCuid(), httpServer_, e_, socket_); + e_->addCommand(command); + e_->setNoWait(true); +} + +xmlrpc::XmlRpcResponse +HttpServerBodyCommand::processJsonRpcRequest(const Dict* jsondict) +{ + + SharedHandle id = jsondict->get("id"); + if(!id) { + return createJsonRpcErrorResponse(-32600, "Invalid Request.", + SharedHandle()); + } + const String* methodName = asString(jsondict->get("method")); + if(!methodName) { + return createJsonRpcErrorResponse(-32600, "Invalid Request.", id); + } + SharedHandle params; + const SharedHandle& tempParams = jsondict->get("params"); + if(asList(tempParams)) { + params = static_pointer_cast(tempParams); + } else if(!tempParams) { + params = List::g(); + } else { + // TODO No support for Named params + return createJsonRpcErrorResponse(-32602, "Invalid params.", id); + } + xmlrpc::XmlRpcRequest req(methodName->s(), params, id); + SharedHandle method; + try { + method = xmlrpc::XmlRpcMethodFactory::create(req.methodName); + } catch(RecoverableException& e) { + A2_LOG_INFO_EX(EX_EXCEPTION_CAUGHT, e); + return createJsonRpcErrorResponse(-32601, "Method not found.", id); + } + method->setJsonRpc(true); + xmlrpc::XmlRpcResponse res = method->execute(req, e_); + return res; +} + bool HttpServerBodyCommand::execute() { if(e_->getRequestGroupMan()->downloadFinished() || e_->isHaltRequested()) { @@ -113,54 +210,51 @@ bool HttpServerBodyCommand::execute() bool gzip = httpServer_->supportsGZip(); std::string responseData = res.toXml(gzip); httpServer_->feedResponse(responseData, "text/xml"); - Command* command = - new HttpServerResponseCommand(getCuid(), httpServer_, e_, socket_); - e_->addCommand(command); - e_->setNoWait(true); + addHttpServerResponseCommand(); return true; } else if(reqPath == "/jsonrpc") { // TODO handle query parameter - std::string callback;// = "callback"; + std::string callback; - SharedHandle json = json::decode(httpServer_->getBody()); + SharedHandle json; + try { + json = json::decode(httpServer_->getBody()); + } catch(RecoverableException& e) { + A2_LOG_INFO_EX + (fmt("CUID#%lld - Failed to parse JSON-RPC request", + getCuid()), + e); + xmlrpc::XmlRpcResponse res + (createJsonRpcErrorResponse + (-32700, "Parse error.", SharedHandle())); + sendJsonRpcResponse(res, callback); + return true; + } const Dict* jsondict = asDict(json); - if(!jsondict) { - // TODO code: -32600, Invalid Request - throw DL_ABORT_EX("JSON-RPC Invalid Request"); - } - const String* methodName = asString(jsondict->get("method")); - if(!methodName) { - // TODO Batch request does not have method - throw DL_ABORT_EX("JSON-RPC No Method Found"); - } - SharedHandle params; - const SharedHandle& tempParams = jsondict->get("params"); - if(asList(tempParams)) { - params = static_pointer_cast(tempParams); - } else if(!tempParams) { - params = List::g(); + if(jsondict) { + xmlrpc::XmlRpcResponse res = processJsonRpcRequest(jsondict); + sendJsonRpcResponse(res, callback); } else { - // TODO No support for Named params - throw DL_ABORT_EX("JSON-RPC Named params are not supported"); + const List* jsonlist = asList(json); + if(jsonlist) { + // This is batch call + std::vector results; + for(List::ValueType::const_iterator i = jsonlist->begin(), + eoi = jsonlist->end(); i != eoi; ++i) { + const Dict* jsondict = asDict(*i); + if(jsondict) { + xmlrpc::XmlRpcResponse r = processJsonRpcRequest(jsondict); + results.push_back(r); + } + } + sendJsonRpcBatchResponse(results, callback); + } else { + xmlrpc::XmlRpcResponse res + (createJsonRpcErrorResponse + (-32600, "Invalid Request.", SharedHandle())); + sendJsonRpcResponse(res, callback); + } } - SharedHandle id = jsondict->get("id"); - if(!id) { - // TODO Batch request does not have id - throw DL_ABORT_EX("JSON-RPC NO id found"); - } - xmlrpc::XmlRpcRequest req(methodName->s(), params, id); - - SharedHandle method = - xmlrpc::XmlRpcMethodFactory::create(req.methodName); - method->setJsonRpc(true); - xmlrpc::XmlRpcResponse res = method->execute(req, e_); - bool gzip = httpServer_->supportsGZip(); - std::string responseData = res.toJson(callback, gzip); - httpServer_->feedResponse(responseData, "application/json-rpc"); - Command* command = - new HttpServerResponseCommand(getCuid(), httpServer_, e_, socket_); - e_->addCommand(command); - e_->setNoWait(true); return true; } else { return true; diff --git a/src/HttpServerBodyCommand.h b/src/HttpServerBodyCommand.h index a8215dda..b4503a8c 100644 --- a/src/HttpServerBodyCommand.h +++ b/src/HttpServerBodyCommand.h @@ -38,6 +38,8 @@ #include "Command.h" #include "SharedHandle.h" #include "TimerA2.h" +#include "ValueBase.h" +#include "XmlRpcResponse.h" namespace aria2 { @@ -51,6 +53,20 @@ private: SharedHandle socket_; SharedHandle httpServer_; Timer timeoutTimer_; + void sendJsonRpcErrorResponse + (const std::string& httpStatus, + int code, + const std::string& message, + const SharedHandle& id, + const std::string& callback); + void sendJsonRpcResponse + (const xmlrpc::XmlRpcResponse& res, + const std::string& callback); + void sendJsonRpcBatchResponse + (const std::vector& results, + const std::string& callback); + xmlrpc::XmlRpcResponse processJsonRpcRequest(const Dict* jsondict); + void addHttpServerResponseCommand(); public: HttpServerBodyCommand(cuid_t cuid, const SharedHandle& httpServer, diff --git a/src/XmlRpcResponse.cc b/src/XmlRpcResponse.cc index 107efa01..22cb006f 100644 --- a/src/XmlRpcResponse.cc +++ b/src/XmlRpcResponse.cc @@ -208,6 +208,46 @@ std::string XmlRpcResponse::toJson(const std::string& callback, bool gzip) const } } +namespace { +template +OutputStream& encodeJsonBatchAll +(OutputStream& o, + const std::vector& results, + const std::string& callback) +{ + o << '['; + if(!results.empty()) { + encodeJsonAll(o, results[0].code, results[0].param, results[0].id,callback); + } + for(std::vector::const_iterator i = results.begin()+1, + eoi = results.end(); i != eoi; ++i) { + o << ','; + encodeJsonAll(o, (*i).code, (*i).param, (*i).id, callback); + } + o << ']'; + return o; +} +} // namespace + +std::string toJsonBatch +(const std::vector& results, + const std::string& callback, + bool gzip) +{ + if(gzip) { +#ifdef HAVE_ZLIB + GZipEncoder o; + o.init(); + return encodeJsonBatchAll(o, results, callback).str(); +#else // !HAVE_ZLIB + abort(); +#endif // !HAVE_ZLIB + } else { + std::stringstream o; + return encodeJsonBatchAll(o, results, callback).str(); + } +} + } // namespace xmlrpc } // namespace aria2 diff --git a/src/XmlRpcResponse.h b/src/XmlRpcResponse.h index 8a978a3d..13465bf9 100644 --- a/src/XmlRpcResponse.h +++ b/src/XmlRpcResponse.h @@ -38,6 +38,7 @@ #include "common.h" #include +#include #include "ValueBase.h" @@ -69,8 +70,15 @@ struct XmlRpcResponse { // Encodes RPC response in JSON. If callback is not empty, the // resulting string is JSONP. std::string toJson(const std::string& callback, bool gzip = false) const; + + std::string toJsonBatch(const std::string& callback, bool gzip = false) const; }; +std::string toJsonBatch +(const std::vector& results, + const std::string& callback, + bool gzip); + } // namespace xmlrpc } // namespace aria2