Refactor Notifier interface to accept DownloadEventListener interface

WebSocketSessionMan now implements DownloadEventListener and is added
to Notifier. It becomes member variable of DownloadEngine.  The event
constant for download event is defined in aria2.h so that we can add
event callback API later.
pull/89/head
Tatsuhiro Tsujikawa 2013-05-11 11:04:47 +09:00
parent 9f4f888e39
commit 1c571f196a
11 changed files with 153 additions and 70 deletions

View File

@ -494,7 +494,7 @@ void DefaultPieceStorage::completePiece(const SharedHandle<Piece>& piece)
util::executeHookByOptName(downloadContext_->getOwnerRequestGroup(),
option_, PREF_ON_BT_DOWNLOAD_COMPLETE);
SingletonHolder<Notifier>::instance()->
notifyDownloadEvent(Notifier::ON_BT_DOWNLOAD_COMPLETE,
notifyDownloadEvent(EVENT_ON_BT_DOWNLOAD_COMPLETE,
downloadContext_->getOwnerRequestGroup());
}
}

View File

@ -69,6 +69,9 @@
#ifdef ENABLE_BITTORRENT
# include "BtRegistry.h"
#endif // ENABLE_BITTORRENT
#ifdef ENABLE_WEBSOCKET
# include "WebSocketSessionMan.h"
#endif // ENABLE_WEBSOCKET
namespace aria2 {
@ -596,4 +599,12 @@ void DownloadEngine::setAsyncDNSServers(ares_addr_node* asyncDNSServers)
}
#endif // HAVE_ARES_ADDR_NODE
#ifdef ENABLE_WEBSOCKET
void DownloadEngine::setWebSocketSessionMan
(const SharedHandle<rpc::WebSocketSessionMan>& wsman)
{
webSocketSessionMan_ = wsman;
}
#endif // ENABLE_WEBSOCKET
} // namespace aria2

View File

@ -68,6 +68,11 @@ class Command;
#ifdef ENABLE_BITTORRENT
class BtRegistry;
#endif // ENABLE_BITTORRENT
#ifdef ENABLE_WEBSOCKET
namespace rpc {
class WebSocketSessionMan;
} // namespace rpc
#endif // ENABLE_WEBSOCKET
class DownloadEngine {
private:
@ -144,6 +149,10 @@ private:
SharedHandle<AuthConfigFactory> authConfigFactory_;
#ifdef ENABLE_WEBSOCKET
SharedHandle<rpc::WebSocketSessionMan> webSocketSessionMan_;
#endif // ENABLE_WEBSOCKET
/**
* Delegates to StatCalc
*/
@ -349,6 +358,15 @@ public:
return asyncDNSServers_;
}
#endif // HAVE_ARES_ADDR_NODE
#ifdef ENABLE_WEBSOCKET
void setWebSocketSessionMan
(const SharedHandle<rpc::WebSocketSessionMan>& wsman);
const SharedHandle<rpc::WebSocketSessionMan>& getWebSocketSessionMan() const
{
return webSocketSessionMan_;
}
#endif // ENABLE_WEBSOCKET
};
} // namespace aria2

View File

@ -133,11 +133,7 @@ int MultiUrlRequestInfo::prepare()
{
global::globalHaltRequested = 0;
try {
SharedHandle<rpc::WebSocketSessionMan> wsSessionMan;
if(option_->getAsBool(PREF_ENABLE_RPC)) {
wsSessionMan.reset(new rpc::WebSocketSessionMan());
}
SharedHandle<Notifier> notifier(new Notifier(wsSessionMan));
SharedHandle<Notifier> notifier(new Notifier());
SingletonHolder<Notifier>::instance(notifier);
#ifdef ENABLE_SSL
@ -166,6 +162,14 @@ int MultiUrlRequestInfo::prepare()
// Avoid keeping RequestGroups alive longer than necessary
requestGroups_.clear();
if(option_->getAsBool(PREF_ENABLE_RPC)) {
SharedHandle<rpc::WebSocketSessionMan> wsSessionMan
(new rpc::WebSocketSessionMan());
e_->setWebSocketSessionMan(wsSessionMan);
SingletonHolder<Notifier>::instance()->addDownloadEventListener
(wsSessionMan);
}
if(!option_->blank(PREF_LOAD_COOKIES)) {
File cookieFile(option_->get(PREF_LOAD_COOKIES));
if(cookieFile.isFile() &&

View File

@ -35,47 +35,25 @@
#include "Notifier.h"
#include "RequestGroup.h"
#include "LogFactory.h"
#ifdef ENABLE_WEBSOCKET
# include "WebSocketSessionMan.h"
#else // !ENABLE_WEBSOCKET
# include "NullWebSocketSessionMan.h"
#endif // !ENABLE_WEBSOCKET
namespace aria2 {
Notifier::Notifier(const SharedHandle<rpc::WebSocketSessionMan>& wsSessionMan)
: wsSessionMan_(wsSessionMan)
{}
Notifier::Notifier() {}
Notifier::~Notifier() {}
void Notifier::addWebSocketSession
(const SharedHandle<rpc::WebSocketSession>& wsSession)
void Notifier::addDownloadEventListener
(const SharedHandle<DownloadEventListener>& listener)
{
A2_LOG_DEBUG("WebSocket session added.");
wsSessionMan_->addSession(wsSession);
listeners_.push_back(listener);
}
void Notifier::removeWebSocketSession
(const SharedHandle<rpc::WebSocketSession>& wsSession)
{
A2_LOG_DEBUG("WebSocket session removed.");
wsSessionMan_->removeSession(wsSession);
}
const std::string Notifier::ON_DOWNLOAD_START = "aria2.onDownloadStart";
const std::string Notifier::ON_DOWNLOAD_PAUSE = "aria2.onDownloadPause";
const std::string Notifier::ON_DOWNLOAD_STOP = "aria2.onDownloadStop";
const std::string Notifier::ON_DOWNLOAD_COMPLETE = "aria2.onDownloadComplete";
const std::string Notifier::ON_DOWNLOAD_ERROR = "aria2.onDownloadError";
const std::string Notifier::ON_BT_DOWNLOAD_COMPLETE =
"aria2.onBtDownloadComplete";
void Notifier::notifyDownloadEvent
(const std::string& event, const RequestGroup* group)
(DownloadEvent event, const RequestGroup* group)
{
if(wsSessionMan_) {
wsSessionMan_->addNotification(event, group);
for(std::vector<SharedHandle<DownloadEventListener> >::const_iterator i =
listeners_.begin(), eoi = listeners_.end(); i != eoi; ++i) {
(*i)->onEvent(event, group);
}
}

View File

@ -36,45 +36,37 @@
#define D_NOTIFIER_H
#include "common.h"
#include <vector>
#include <aria2/aria2.h>
#include "SharedHandle.h"
namespace aria2 {
class RequestGroup;
class Option;
struct Pref;
namespace rpc {
class WebSocketSessionMan;
class WebSocketSession;
} // namespace rpc
struct DownloadEventListener {
virtual ~DownloadEventListener() {}
virtual void onEvent(DownloadEvent event, const RequestGroup* group) = 0;
};
class Notifier {
public:
// The string constants for download events.
static const std::string ON_DOWNLOAD_START;
static const std::string ON_DOWNLOAD_PAUSE;
static const std::string ON_DOWNLOAD_STOP;
static const std::string ON_DOWNLOAD_COMPLETE;
static const std::string ON_DOWNLOAD_ERROR;
static const std::string ON_BT_DOWNLOAD_COMPLETE;
Notifier(const SharedHandle<rpc::WebSocketSessionMan>& wsSessionMan);
Notifier();
~Notifier();
void addWebSocketSession(const SharedHandle<rpc::WebSocketSession>& wsSes);
void removeWebSocketSession(const SharedHandle<rpc::WebSocketSession>& wsSes);
void addDownloadEventListener
(const SharedHandle<DownloadEventListener>& listener);
// Notifies the download event to all listeners.
void notifyDownloadEvent(const std::string& event, const RequestGroup* group);
void notifyDownloadEvent(DownloadEvent event, const RequestGroup* group);
void notifyDownloadEvent(const std::string& event,
void notifyDownloadEvent(DownloadEvent event,
const SharedHandle<RequestGroup>& group)
{
notifyDownloadEvent(event, group.get());
}
private:
SharedHandle<rpc::WebSocketSessionMan> wsSessionMan_;
std::vector<SharedHandle<DownloadEventListener> > listeners_;
};
} // namespace aria2

View File

@ -211,7 +211,7 @@ bool RequestGroupMan::removeReservedGroup(a2_gid_t gid)
namespace {
void notifyDownloadEvent
(const std::string& event, const SharedHandle<RequestGroup>& group)
(DownloadEvent event, const SharedHandle<RequestGroup>& group)
{
// Check NULL to make unit test easier.
if(SingletonHolder<Notifier>::instance()) {
@ -239,12 +239,12 @@ void executeStopHook
util::executeHookByOptName(group, option, PREF_ON_DOWNLOAD_STOP);
}
if(result == error_code::FINISHED) {
notifyDownloadEvent(Notifier::ON_DOWNLOAD_COMPLETE, group);
notifyDownloadEvent(EVENT_ON_DOWNLOAD_COMPLETE, group);
} else if(result != error_code::IN_PROGRESS &&
result != error_code::REMOVED) {
notifyDownloadEvent(Notifier::ON_DOWNLOAD_ERROR, group);
notifyDownloadEvent(EVENT_ON_DOWNLOAD_ERROR, group);
} else {
notifyDownloadEvent(Notifier::ON_DOWNLOAD_STOP, group);
notifyDownloadEvent(EVENT_ON_DOWNLOAD_STOP, group);
}
}
@ -392,7 +392,7 @@ public:
group->setForceHaltRequested(false);
util::executeHookByOptName(group, e_->getOption(),
PREF_ON_DOWNLOAD_PAUSE);
notifyDownloadEvent(Notifier::ON_DOWNLOAD_PAUSE, group);
notifyDownloadEvent(EVENT_ON_DOWNLOAD_PAUSE, group);
// TODO Should we have to prepend spend uris to remaining uris
// in case PREF_REUSE_URI is disabed?
} else {
@ -508,7 +508,7 @@ void RequestGroupMan::fillRequestGroupFromReserver(DownloadEngine* e)
util::executeHookByOptName(groupToAdd, e->getOption(),
PREF_ON_DOWNLOAD_START);
notifyDownloadEvent(Notifier::ON_DOWNLOAD_START, groupToAdd);
notifyDownloadEvent(EVENT_ON_DOWNLOAD_START, groupToAdd);
}
if(!pending.empty()) {
reservedGroups_.insert(reservedGroups_.begin(), RequestGroupKeyFunc(),
@ -577,7 +577,7 @@ RequestGroupMan::DownloadStat RequestGroupMan::getDownloadStat() const
lastError);
}
enum DownloadStatus {
enum DownloadResultStatus {
A2_STATUS_OK,
A2_STATUS_INPR,
A2_STATUS_RM,
@ -585,7 +585,7 @@ enum DownloadStatus {
};
namespace {
const char* getStatusStr(DownloadStatus status, bool useColor)
const char* getStatusStr(DownloadResultStatus status, bool useColor)
{
// status string is formatted in 4 characters wide.
switch(status) {

View File

@ -42,6 +42,7 @@
#include "fmt.h"
#include "SingletonHolder.h"
#include "Notifier.h"
#include "WebSocketSessionMan.h"
namespace aria2 {
@ -58,7 +59,7 @@ WebSocketInteractionCommand::WebSocketInteractionCommand
writeCheck_(false),
wsSession_(wsSession)
{
SingletonHolder<Notifier>::instance()->addWebSocketSession(wsSession_);
e_->getWebSocketSessionMan()->addSession(wsSession_);
e_->addSocketForReadCheck(socket_, this);
}
@ -68,7 +69,7 @@ WebSocketInteractionCommand::~WebSocketInteractionCommand()
if(writeCheck_) {
e_->deleteSocketForWriteCheck(socket_, this);
}
SingletonHolder<Notifier>::instance()->removeWebSocketSession(wsSession_);
e_->getWebSocketSessionMan()->removeSession(wsSession_);
}
void WebSocketInteractionCommand::updateWriteCheck()

View File

@ -38,6 +38,7 @@
#include "json.h"
#include "util.h"
#include "WebSocketInteractionCommand.h"
#include "LogFactory.h"
namespace aria2 {
@ -50,12 +51,14 @@ WebSocketSessionMan::~WebSocketSessionMan() {}
void WebSocketSessionMan::addSession
(const SharedHandle<WebSocketSession>& wsSession)
{
A2_LOG_DEBUG("WebSocket session added.");
sessions_.insert(wsSession);
}
void WebSocketSessionMan::removeSession
(const SharedHandle<WebSocketSession>& wsSession)
{
A2_LOG_DEBUG("WebSocket session removed.");
sessions_.erase(wsSession);
}
@ -78,6 +81,48 @@ void WebSocketSessionMan::addNotification
}
}
namespace {
// The string constants for download events.
const std::string ON_DOWNLOAD_START = "aria2.onDownloadStart";
const std::string ON_DOWNLOAD_PAUSE = "aria2.onDownloadPause";
const std::string ON_DOWNLOAD_STOP = "aria2.onDownloadStop";
const std::string ON_DOWNLOAD_COMPLETE = "aria2.onDownloadComplete";
const std::string ON_DOWNLOAD_ERROR = "aria2.onDownloadError";
const std::string ON_BT_DOWNLOAD_COMPLETE = "aria2.onBtDownloadComplete";
} // namespace
namespace {
const std::string& getMethodName(DownloadEvent event)
{
switch(event) {
case EVENT_ON_DOWNLOAD_START:
return ON_DOWNLOAD_START;
case EVENT_ON_DOWNLOAD_PAUSE:
return ON_DOWNLOAD_PAUSE;
case EVENT_ON_DOWNLOAD_STOP:
return ON_DOWNLOAD_STOP;
case EVENT_ON_DOWNLOAD_COMPLETE:
return ON_DOWNLOAD_COMPLETE;
case EVENT_ON_DOWNLOAD_ERROR:
return ON_DOWNLOAD_ERROR;
case EVENT_ON_BT_DOWNLOAD_COMPLETE:
return ON_BT_DOWNLOAD_COMPLETE;
default:
// Not reachable
assert(0);
// For suppress compiler warning
return A2STR::NIL;
}
}
} // namespace
void WebSocketSessionMan::onEvent(DownloadEvent event,
const RequestGroup* group)
{
addNotification(getMethodName(event), group);
}
} // namespace rpc
} // namespace aria2

View File

@ -35,7 +35,7 @@
#ifndef D_WEB_SOCKET_SESSION_MAN_H
#define D_WEB_SOCKET_SESSION_MAN_H
#include "common.h"
#include "Notifier.h"
#include <set>
#include <string>
@ -51,7 +51,7 @@ namespace rpc {
class WebSocketSession;
class WebSocketSessionMan {
class WebSocketSessionMan : public DownloadEventListener {
public:
typedef std::set<SharedHandle<WebSocketSession>,
RefLess<WebSocketSession> > WebSocketSessions;
@ -60,6 +60,7 @@ public:
void addSession(const SharedHandle<WebSocketSession>& wsSession);
void removeSession(const SharedHandle<WebSocketSession>& wsSession);
void addNotification(const std::string& method, const RequestGroup* group);
virtual void onEvent(DownloadEvent event, const RequestGroup* group);
private:
WebSocketSessions sessions_;
};

View File

@ -97,6 +97,39 @@ typedef uint64_t A2Gid;
*/
typedef std::vector<std::pair<std::string, std::string> > KeyVals;
/**
* @enum
*
* Download event constants
*/
enum DownloadEvent {
/**
* Indicating download has started.
*/
EVENT_ON_DOWNLOAD_START = 1,
/**
* Indicating download has paused.
*/
EVENT_ON_DOWNLOAD_PAUSE,
/**
* Indicating download has stopped.
*/
EVENT_ON_DOWNLOAD_STOP,
/**
* Indicating download has completed.
*/
EVENT_ON_DOWNLOAD_COMPLETE,
/**
* Indicating download has stopped becauseof the error.
*/
EVENT_ON_DOWNLOAD_ERROR,
/**
* Indicating BitTorrent download has completed, but it may still
* continue to perform seeding.
*/
EVENT_ON_BT_DOWNLOAD_COMPLETE
};
/**
* @struct
*