try to fix thread problem when use libuv.

pull/130/head
Apex Liu 2018-11-03 14:11:02 +08:00
parent 25d562ccf4
commit b1524fd6a0
16 changed files with 1355 additions and 1422 deletions

View File

@ -31,10 +31,12 @@ public:
bool terminate(void); bool terminate(void);
protected: protected:
// 线程循环 // main loop of this thread.
virtual void _thread_loop(void) = 0; virtual void _thread_loop(void) = 0;
// 设置停止标志,让线程能够正常结束 // called by another thread when thread ready to stop.
virtual void _set_stop_flag(void) = 0; virtual void _on_stop(void) {};
// called inside thread when thread fully stopped.
virtual void _on_stopped(void) {};
#ifdef EX_OS_WIN32 #ifdef EX_OS_WIN32
static unsigned int WINAPI _thread_func(LPVOID lpParam); static unsigned int WINAPI _thread_func(LPVOID lpParam);
@ -46,7 +48,7 @@ protected:
ex_astr m_thread_name; ex_astr m_thread_name;
EX_THREAD_HANDLE m_handle; EX_THREAD_HANDLE m_handle;
bool m_is_running; bool m_is_running;
bool m_stop_flag; bool m_need_stop;
}; };

View File

@ -9,90 +9,91 @@
#ifdef EX_OS_WIN32 #ifdef EX_OS_WIN32
unsigned int WINAPI ExThreadBase::_thread_func(LPVOID pParam) unsigned int WINAPI ExThreadBase::_thread_func(LPVOID pParam)
#else #else
void* ExThreadBase::_thread_func(void* pParam)
void *ExThreadBase::_thread_func(void *pParam)
#endif #endif
{ {
ExThreadBase* p = (ExThreadBase*)pParam; ExThreadBase *_this = (ExThreadBase *) pParam;
ex_astr thread_name = p->m_thread_name;
p->m_is_running = true;
p->_thread_loop();
p->m_is_running = false;
// if(!p->m_stop_by_request)
// p->m_thread_manager->_remove_thread(p);
EXLOGV(" # thread [%s] end.\n", thread_name.c_str()); _this->m_is_running = true;
_this->_thread_loop();
_this->m_is_running = false;
_this->m_handle = 0;
return 0; _this->_on_stopped();
EXLOGV(" # thread [%s] exit.\n", _this->m_thread_name.c_str());
return 0;
} }
ExThreadBase::ExThreadBase(const char* thread_name) : ExThreadBase::ExThreadBase(const char *thread_name) :
m_handle(0), m_handle(0),
m_is_running(false), m_is_running(false),
m_stop_flag(false) m_need_stop(false) {
{ m_thread_name = thread_name;
m_thread_name = thread_name;
} }
ExThreadBase::~ExThreadBase() ExThreadBase::~ExThreadBase() {
{ if(m_is_running) {
EXLOGE(" # thread [%s] not stop before destroy.\n", m_thread_name.c_str());
}
} }
bool ExThreadBase::start(void) bool ExThreadBase::start(void) {
{ m_need_stop = false;
EXLOGV(" . thread [%s] starting.\n", m_thread_name.c_str()); EXLOGV(" . thread [%s] starting.\n", m_thread_name.c_str());
#ifdef WIN32 #ifdef WIN32
HANDLE h = (HANDLE)_beginthreadex(NULL, 0, _thread_func, (void*)this, 0, NULL); HANDLE h = (HANDLE)_beginthreadex(NULL, 0, _thread_func, (void*)this, 0, NULL);
if (NULL == h) if (NULL == h)
{ {
return false; return false;
} }
m_handle = h; m_listener_handle = h;
#else #else
pthread_t ptid = 0; pthread_t ptid = 0;
int ret = pthread_create(&ptid, NULL, _thread_func, (void*)this); int ret = pthread_create(&ptid, NULL, _thread_func, (void *) this);
if (ret != 0) if (ret != 0) {
{ return false;
return false; }
} m_handle = ptid;
m_handle = ptid;
#endif #endif
return true; return true;
} }
bool ExThreadBase::stop(void) bool ExThreadBase::stop(void) {
{ if (m_handle == 0) {
EXLOGV("[thread] try to stop thread [%s].\n", m_thread_name.c_str()); EXLOGW("[thread] thread [%s] already stopped.\n", m_thread_name.c_str());
_set_stop_flag(); return true;
}
EXLOGV("[thread] wait thread [%s] end.\n", m_thread_name.c_str()); EXLOGV("[thread] try to stop thread [%s].\n", m_thread_name.c_str());
m_need_stop = true;
_on_stop();
if(m_handle == 0) EXLOGV("[thread] wait thread [%s] exit.\n", m_thread_name.c_str());
return true;
#ifdef EX_OS_WIN32 #ifdef EX_OS_WIN32
if (WaitForSingleObject(m_handle, INFINITE) != WAIT_OBJECT_0) if (WaitForSingleObject(m_listener_handle, INFINITE) != WAIT_OBJECT_0)
{ {
return false; return false;
} }
#else #else
if (pthread_join(m_handle, NULL) != 0) if (pthread_join(m_handle, NULL) != 0) {
{ return false;
return false; }
}
#endif #endif
return true; return true;
} }
bool ExThreadBase::terminate(void) bool ExThreadBase::terminate(void) {
{
#ifdef EX_OS_WIN32 #ifdef EX_OS_WIN32
return TerminateThread(m_handle, 1) ? true : false; return (TerminateThread(m_listener_handle, 1) == TRUE);
#else #else
return pthread_cancel(m_handle) == 0 ? true : false; return (pthread_cancel(m_handle) == 0);
#endif #endif
} }
@ -100,105 +101,89 @@ bool ExThreadBase::terminate(void)
// //
//========================================================= //=========================================================
ExThreadManager::ExThreadManager() ExThreadManager::ExThreadManager() {}
{}
ExThreadManager::~ExThreadManager() ExThreadManager::~ExThreadManager() {
{ if (!m_threads.empty()) {
if (m_threads.size() > 0) EXLOGE("when destroy thread manager, there are %d thread not exit.\n", m_threads.size());
{ stop_all();
EXLOGE("when destroy thread manager, there are %d thread not exit.\n", m_threads.size()); }
stop_all();
}
} }
void ExThreadManager::stop_all(void) void ExThreadManager::stop_all(void) {
{ ExThreadSmartLock locker(m_lock);
ExThreadSmartLock locker(m_lock);
ex_threads::iterator it = m_threads.begin(); ex_threads::iterator it = m_threads.begin();
for (; it != m_threads.end(); ++it) for (; it != m_threads.end(); ++it) {
{ (*it)->stop();
(*it)->stop(); }
} m_threads.clear();
m_threads.clear();
} }
void ExThreadManager::add(ExThreadBase* tb) void ExThreadManager::add(ExThreadBase *tb) {
{ ExThreadSmartLock locker(m_lock);
ExThreadSmartLock locker(m_lock);
ex_threads::iterator it = m_threads.begin(); ex_threads::iterator it = m_threads.begin();
for (; it != m_threads.end(); ++it) for (; it != m_threads.end(); ++it) {
{ if ((*it) == tb) {
if ((*it) == tb) EXLOGE("when add thread to manager, it already exist.\n");
{ return;
EXLOGE("when add thread to manager, it already exist.\n"); }
return; }
}
}
m_threads.push_back(tb); m_threads.push_back(tb);
} }
void ExThreadManager::remove(ExThreadBase* tb) void ExThreadManager::remove(ExThreadBase *tb) {
{ ExThreadSmartLock locker(m_lock);
ExThreadSmartLock locker(m_lock);
ex_threads::iterator it = m_threads.begin(); ex_threads::iterator it = m_threads.begin();
for (; it != m_threads.end(); ++it) for (; it != m_threads.end(); ++it) {
{ if ((*it) == tb) {
if ((*it) == tb) m_threads.erase(it);
{ return;
//delete (*it); }
m_threads.erase(it); }
return; EXLOGE("thread not hold by thread-manager while remove it.\n");
}
}
EXLOGE("when remove thread from manager, it not exist.\n");
} }
//========================================================= //=========================================================
// //
//========================================================= //=========================================================
ExThreadLock::ExThreadLock() ExThreadLock::ExThreadLock() {
{
#ifdef EX_OS_WIN32 #ifdef EX_OS_WIN32
InitializeCriticalSection(&m_locker); InitializeCriticalSection(&m_locker);
#else #else
pthread_mutexattr_t attr; pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr); pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(&m_locker, &attr); pthread_mutex_init(&m_locker, &attr);
pthread_mutexattr_destroy(&attr); pthread_mutexattr_destroy(&attr);
#endif #endif
} }
ExThreadLock::~ExThreadLock() ExThreadLock::~ExThreadLock() {
{
#ifdef EX_OS_WIN32 #ifdef EX_OS_WIN32
DeleteCriticalSection(&m_locker); DeleteCriticalSection(&m_locker);
#else #else
pthread_mutex_destroy(&m_locker); pthread_mutex_destroy(&m_locker);
#endif #endif
} }
void ExThreadLock::lock(void) void ExThreadLock::lock(void) {
{
#ifdef EX_OS_WIN32 #ifdef EX_OS_WIN32
EnterCriticalSection(&m_locker); EnterCriticalSection(&m_locker);
#else #else
pthread_mutex_lock(&m_locker); pthread_mutex_lock(&m_locker);
#endif #endif
} }
void ExThreadLock::unlock(void) void ExThreadLock::unlock(void) {
{
#ifdef EX_OS_WIN32 #ifdef EX_OS_WIN32
LeaveCriticalSection(&m_locker); LeaveCriticalSection(&m_locker);
#else #else
pthread_mutex_unlock(&m_locker); pthread_mutex_unlock(&m_locker);
#endif #endif
} }
@ -206,39 +191,35 @@ void ExThreadLock::unlock(void)
// //
//========================================================= //=========================================================
int ex_atomic_add(volatile int* pt, int t) int ex_atomic_add(volatile int *pt, int t) {
{
#ifdef EX_OS_WIN32 #ifdef EX_OS_WIN32
return (int)InterlockedExchangeAdd((long*)pt, (long)t); return (int)InterlockedExchangeAdd((long*)pt, (long)t);
#else #else
return __sync_add_and_fetch(pt, t); return __sync_add_and_fetch(pt, t);
#endif #endif
} }
int ex_atomic_inc(volatile int* pt) int ex_atomic_inc(volatile int *pt) {
{
#ifdef EX_OS_WIN32 #ifdef EX_OS_WIN32
return (int)InterlockedIncrement((long*)pt); return (int)InterlockedIncrement((long*)pt);
#else #else
return __sync_add_and_fetch(pt, 1); return __sync_add_and_fetch(pt, 1);
#endif #endif
} }
int ex_atomic_dec(volatile int* pt) int ex_atomic_dec(volatile int *pt) {
{
#ifdef EX_OS_WIN32 #ifdef EX_OS_WIN32
return (int)InterlockedDecrement((long*)pt); return (int)InterlockedDecrement((long*)pt);
#else #else
return __sync_add_and_fetch(pt, -1); return __sync_add_and_fetch(pt, -1);
#endif #endif
} }
ex_u64 ex_get_thread_id(void) ex_u64 ex_get_thread_id(void) {
{
#ifdef EX_OS_WIN32 #ifdef EX_OS_WIN32
return GetCurrentThreadId(); return GetCurrentThreadId();
#else #else
return (ex_u64)pthread_self(); return (ex_u64) pthread_self();
#endif #endif
} }

View File

@ -31,11 +31,9 @@
<file url="file://$PROJECT_DIR$/tp_core/core/ts_web_rpc.h" charset="GBK" /> <file url="file://$PROJECT_DIR$/tp_core/core/ts_web_rpc.h" charset="GBK" />
<file url="file://$PROJECT_DIR$/tp_core/protocol/rdp/rdp_conn.cpp" charset="GBK" /> <file url="file://$PROJECT_DIR$/tp_core/protocol/rdp/rdp_conn.cpp" charset="GBK" />
<file url="file://$PROJECT_DIR$/tp_core/protocol/rdp/rdp_conn.h" charset="GBK" /> <file url="file://$PROJECT_DIR$/tp_core/protocol/rdp/rdp_conn.h" charset="GBK" />
<file url="file://$PROJECT_DIR$/tp_core/protocol/rdp/rdp_keys.cpp" charset="GBK" />
<file url="file://$PROJECT_DIR$/tp_core/protocol/rdp/rdp_package.cpp" charset="GBK" />
<file url="file://$PROJECT_DIR$/tp_core/protocol/rdp/rdp_package.h" charset="GBK" />
<file url="file://$PROJECT_DIR$/tp_core/protocol/rdp/rdp_proxy.cpp" charset="GBK" /> <file url="file://$PROJECT_DIR$/tp_core/protocol/rdp/rdp_proxy.cpp" charset="GBK" />
<file url="file://$PROJECT_DIR$/tp_core/protocol/rdp/rdp_session.cpp" charset="GBK" /> <file url="file://$PROJECT_DIR$/tp_core/protocol/rdp/rdp_session.cpp" charset="GBK" />
<file url="file://$PROJECT_DIR$/tp_core/protocol/rdp/rdp_session.h" charset="GBK" />
<file url="file://$PROJECT_DIR$/tp_core/protocol/ssh/ssh_proxy.cpp" charset="GBK" /> <file url="file://$PROJECT_DIR$/tp_core/protocol/ssh/ssh_proxy.cpp" charset="GBK" />
<file url="file://$PROJECT_DIR$/tp_core/protocol/ssh/ssh_proxy.h" charset="GBK" /> <file url="file://$PROJECT_DIR$/tp_core/protocol/ssh/ssh_proxy.h" charset="GBK" />
<file url="file://$PROJECT_DIR$/tp_core/protocol/ssh/ssh_recorder.cpp" charset="GBK" /> <file url="file://$PROJECT_DIR$/tp_core/protocol/ssh/ssh_recorder.cpp" charset="GBK" />
@ -43,7 +41,10 @@
<file url="file://$PROJECT_DIR$/tp_core/protocol/ssh/ssh_session.cpp" charset="GBK" /> <file url="file://$PROJECT_DIR$/tp_core/protocol/ssh/ssh_session.cpp" charset="GBK" />
<file url="file://$PROJECT_DIR$/tp_core/protocol/ssh/ssh_session.h" charset="GBK" /> <file url="file://$PROJECT_DIR$/tp_core/protocol/ssh/ssh_session.h" charset="GBK" />
<file url="file://$PROJECT_DIR$/tp_core/protocol/telnet/telnet_conn.cpp" charset="GBK" /> <file url="file://$PROJECT_DIR$/tp_core/protocol/telnet/telnet_conn.cpp" charset="GBK" />
<file url="file://$PROJECT_DIR$/tp_core/protocol/telnet/telnet_conn.h" charset="GBK" />
<file url="file://$PROJECT_DIR$/tp_core/protocol/telnet/telnet_proxy.cpp" charset="GBK" />
<file url="file://$PROJECT_DIR$/tp_core/protocol/telnet/telnet_session.cpp" charset="GBK" /> <file url="file://$PROJECT_DIR$/tp_core/protocol/telnet/telnet_session.cpp" charset="GBK" />
<file url="file://$PROJECT_DIR$/tp_core/protocol/telnet/telnet_session.h" charset="GBK" />
<file url="file://$PROJECT_DIR$/tp_web/src/main.cpp" charset="GBK" /> <file url="file://$PROJECT_DIR$/tp_web/src/main.cpp" charset="GBK" />
<file url="file://$PROJECT_DIR$/tp_web/src/ts_env.cpp" charset="GBK" /> <file url="file://$PROJECT_DIR$/tp_web/src/ts_env.cpp" charset="GBK" />
</component> </component>

View File

@ -4,7 +4,8 @@ project(teleport)
MESSAGE(STATUS "operation system is ${CMAKE_SYSTEM}") MESSAGE(STATUS "operation system is ${CMAKE_SYSTEM}")
MESSAGE(STATUS "current source directory is ${CMAKE_CURRENT_SOURCE_DIR}") MESSAGE(STATUS "current source directory is ${CMAKE_CURRENT_SOURCE_DIR}")
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "${teleport_SOURCE_DIR}/../out/server/x64/bin") set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/../out/server/x64/bin")
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/../out/server/x64/bin")
set(CMAKE_CONFIGURATION_TYPES Debug Release) set(CMAKE_CONFIGURATION_TYPES Debug Release)
@ -13,13 +14,13 @@ if("${CMAKE_SYSTEM_NAME}" STREQUAL "Darwin")
MESSAGE(STATUS "build on macOS...") MESSAGE(STATUS "build on macOS...")
set(OS_MACOS 1) set(OS_MACOS 1)
set(OS_POSIX 1) set(OS_POSIX 1)
set(TP_EXTERNAL_RELEASE_DIR "${teleport_SOURCE_DIR}/../external/macos/release") set(TP_EXTERNAL_RELEASE_DIR "${CMAKE_CURRENT_SOURCE_DIR}/../external/macos/release")
elseif("${CMAKE_SYSTEM_NAME}" STREQUAL "Linux") elseif("${CMAKE_SYSTEM_NAME}" STREQUAL "Linux")
set(OS_LINUX 1) set(OS_LINUX 1)
set(OS_POSIX 1) set(OS_POSIX 1)
MESSAGE(STATUS "build on Linux...") MESSAGE(STATUS "build on Linux...")
add_subdirectory(tp_web/src) add_subdirectory(tp_web/src)
set(TP_EXTERNAL_RELEASE_DIR "${teleport_SOURCE_DIR}/../external/linux/release") set(TP_EXTERNAL_RELEASE_DIR "${CMAKE_CURRENT_SOURCE_DIR}/../external/linux/release")
elseif("${CMAKE_SYSTEM_NAME}" STREQUAL "Windows") elseif("${CMAKE_SYSTEM_NAME}" STREQUAL "Windows")
MESSAGE(FATAL_ERROR "unsupported platform: Windows") MESSAGE(FATAL_ERROR "unsupported platform: Windows")
else() else()

View File

@ -359,9 +359,8 @@ void _sig_handler(int signum, siginfo_t* info, void* ptr)
{ {
if (signum == SIGINT || signum == SIGTERM) if (signum == SIGINT || signum == SIGTERM)
{ {
EXLOGW("[core] received signal SIGINT, exit now.\n"); EXLOGW("\n[core] received signal SIGINT, exit now.\n");
g_exit_flag = true; g_exit_flag = true;
// exit(1);
} }
} }

View File

@ -62,7 +62,7 @@ void TsHttpRpc::_thread_loop(void)
{ {
EXLOGI("[core] TeleportServer-RPC ready on %s:%d\n", m_host_ip.c_str(), m_host_port); EXLOGI("[core] TeleportServer-RPC ready on %s:%d\n", m_host_ip.c_str(), m_host_port);
while (!m_stop_flag) while (!m_need_stop)
{ {
mg_mgr_poll(&m_mg_mgr, 500); mg_mgr_poll(&m_mg_mgr, 500);
} }
@ -70,10 +70,6 @@ void TsHttpRpc::_thread_loop(void)
EXLOGV("[core] rpc main loop end.\n"); EXLOGV("[core] rpc main loop end.\n");
} }
void TsHttpRpc::_set_stop_flag(void)
{
m_stop_flag = true;
}
bool TsHttpRpc::init(void) bool TsHttpRpc::init(void)
{ {

View File

@ -20,7 +20,6 @@ public:
protected: protected:
void _thread_loop(void); void _thread_loop(void);
void _set_stop_flag(void);
private: private:
ex_rv _parse_request(struct http_message* req, ex_astr& func_cmd, Json::Value& json_param); ex_rv _parse_request(struct http_message* req, ex_astr& func_cmd, Json::Value& json_param);

View File

@ -10,96 +10,93 @@
bool g_exit_flag = false; bool g_exit_flag = false;
TPP_CONNECT_INFO* tpp_get_connect_info(const char* sid) TPP_CONNECT_INFO *tpp_get_connect_info(const char *sid) {
{ TS_CONNECT_INFO sinfo;
TS_CONNECT_INFO sinfo; bool ret = g_session_mgr.get_connect_info(sid, sinfo);
bool ret = g_session_mgr.get_connect_info(sid, sinfo); if (!ret)
if (!ret) return NULL;
return NULL;
TPP_CONNECT_INFO* info = (TPP_CONNECT_INFO*)calloc(1, sizeof(TPP_CONNECT_INFO)); TPP_CONNECT_INFO *info = (TPP_CONNECT_INFO *) calloc(1, sizeof(TPP_CONNECT_INFO));
info->sid = (char*)calloc(1, sinfo.sid.length() + 1);
ex_strcpy(info->sid, sinfo.sid.length() + 1, sinfo.sid.c_str());
info->user_username = (char*)calloc(1, sinfo.user_username.length() + 1);
ex_strcpy(info->user_username, sinfo.user_username.length() + 1, sinfo.user_username.c_str());
info->host_ip = (char*)calloc(1, sinfo.host_ip.length() + 1);
ex_strcpy(info->host_ip, sinfo.host_ip.length() + 1, sinfo.host_ip.c_str());
info->conn_ip = (char*)calloc(1, sinfo.conn_ip.length() + 1);
ex_strcpy(info->conn_ip, sinfo.conn_ip.length() + 1, sinfo.conn_ip.c_str());
info->client_ip = (char*)calloc(1, sinfo.client_ip.length() + 1);
ex_strcpy(info->client_ip, sinfo.client_ip.length() + 1, sinfo.client_ip.c_str());
info->acc_username = (char*)calloc(1, sinfo.acc_username.length() + 1);
ex_strcpy(info->acc_username, sinfo.acc_username.length() + 1, sinfo.acc_username.c_str());
info->acc_secret = (char*)calloc(1, sinfo.acc_secret.length() + 1);
ex_strcpy(info->acc_secret, sinfo.acc_secret.length() + 1, sinfo.acc_secret.c_str());
info->username_prompt = (char*)calloc(1, sinfo.username_prompt.length() + 1);
ex_strcpy(info->username_prompt, sinfo.username_prompt.length() + 1, sinfo.username_prompt.c_str());
info->password_prompt = (char*)calloc(1, sinfo.password_prompt.length() + 1);
ex_strcpy(info->password_prompt, sinfo.password_prompt.length() + 1, sinfo.password_prompt.c_str());
info->user_id = sinfo.user_id; info->sid = (char *) calloc(1, sinfo.sid.length() + 1);
info->host_id = sinfo.host_id; ex_strcpy(info->sid, sinfo.sid.length() + 1, sinfo.sid.c_str());
info->acc_id = sinfo.acc_id; info->user_username = (char *) calloc(1, sinfo.user_username.length() + 1);
info->conn_port = sinfo.conn_port; ex_strcpy(info->user_username, sinfo.user_username.length() + 1, sinfo.user_username.c_str());
info->protocol_type = sinfo.protocol_type; info->host_ip = (char *) calloc(1, sinfo.host_ip.length() + 1);
info->protocol_sub_type = sinfo.protocol_sub_type; ex_strcpy(info->host_ip, sinfo.host_ip.length() + 1, sinfo.host_ip.c_str());
info->protocol_flag = sinfo.protocol_flag; info->conn_ip = (char *) calloc(1, sinfo.conn_ip.length() + 1);
info->record_flag = sinfo.record_flag; ex_strcpy(info->conn_ip, sinfo.conn_ip.length() + 1, sinfo.conn_ip.c_str());
info->auth_type= sinfo.auth_type; info->client_ip = (char *) calloc(1, sinfo.client_ip.length() + 1);
ex_strcpy(info->client_ip, sinfo.client_ip.length() + 1, sinfo.client_ip.c_str());
info->acc_username = (char *) calloc(1, sinfo.acc_username.length() + 1);
ex_strcpy(info->acc_username, sinfo.acc_username.length() + 1, sinfo.acc_username.c_str());
info->acc_secret = (char *) calloc(1, sinfo.acc_secret.length() + 1);
ex_strcpy(info->acc_secret, sinfo.acc_secret.length() + 1, sinfo.acc_secret.c_str());
info->username_prompt = (char *) calloc(1, sinfo.username_prompt.length() + 1);
ex_strcpy(info->username_prompt, sinfo.username_prompt.length() + 1, sinfo.username_prompt.c_str());
info->password_prompt = (char *) calloc(1, sinfo.password_prompt.length() + 1);
ex_strcpy(info->password_prompt, sinfo.password_prompt.length() + 1, sinfo.password_prompt.c_str());
return info; info->user_id = sinfo.user_id;
info->host_id = sinfo.host_id;
info->acc_id = sinfo.acc_id;
info->conn_port = sinfo.conn_port;
info->protocol_type = sinfo.protocol_type;
info->protocol_sub_type = sinfo.protocol_sub_type;
info->protocol_flag = sinfo.protocol_flag;
info->record_flag = sinfo.record_flag;
info->auth_type = sinfo.auth_type;
return info;
} }
void tpp_free_connect_info(TPP_CONNECT_INFO* info) void tpp_free_connect_info(TPP_CONNECT_INFO *info) {
{ if (NULL == info)
if (NULL == info) return;
return;
g_session_mgr.free_connect_info(info->sid); g_session_mgr.free_connect_info(info->sid);
free(info->sid); free(info->sid);
free(info->user_username); free(info->user_username);
free(info->host_ip); free(info->host_ip);
free(info->conn_ip); free(info->conn_ip);
free(info->client_ip); free(info->client_ip);
free(info->acc_username); free(info->acc_username);
free(info->acc_secret); free(info->acc_secret);
free(info->username_prompt); free(info->username_prompt);
free(info->password_prompt); free(info->password_prompt);
free(info); free(info);
} }
bool tpp_session_begin(const TPP_CONNECT_INFO* info, int* db_id) bool tpp_session_begin(const TPP_CONNECT_INFO *info, int *db_id) {
{ if (NULL == info || NULL == db_id)
if (NULL == info || NULL == db_id) return false;
return false;
TS_CONNECT_INFO sinfo; TS_CONNECT_INFO sinfo;
sinfo.sid = info->sid; sinfo.sid = info->sid;
sinfo.user_id = info->user_id; sinfo.user_id = info->user_id;
sinfo.host_id = info->host_id; sinfo.host_id = info->host_id;
sinfo.acc_id = info->acc_id; sinfo.acc_id = info->acc_id;
sinfo.user_username = info->user_username; sinfo.user_username = info->user_username;
sinfo.host_ip = info->host_ip; sinfo.host_ip = info->host_ip;
sinfo.conn_ip = info->conn_ip; sinfo.conn_ip = info->conn_ip;
sinfo.client_ip = info->client_ip; sinfo.client_ip = info->client_ip;
sinfo.acc_username = info->acc_username; sinfo.acc_username = info->acc_username;
sinfo.conn_port = info->conn_port; sinfo.conn_port = info->conn_port;
sinfo.protocol_type = info->protocol_type; sinfo.protocol_type = info->protocol_type;
sinfo.protocol_sub_type = info->protocol_sub_type; sinfo.protocol_sub_type = info->protocol_sub_type;
sinfo.auth_type = info->auth_type; sinfo.auth_type = info->auth_type;
return ts_web_rpc_session_begin(sinfo, *db_id); return ts_web_rpc_session_begin(sinfo, *db_id);
} }
bool tpp_session_update(int db_id, int protocol_sub_type, int state) { bool tpp_session_update(int db_id, int protocol_sub_type, int state) {
return ts_web_rpc_session_update(db_id, protocol_sub_type, state); return ts_web_rpc_session_update(db_id, protocol_sub_type, state);
} }
bool tpp_session_end(const char* sid, int db_id, int ret) { bool tpp_session_end(const char *sid, int db_id, int ret) {
return ts_web_rpc_session_end(sid, db_id, ret); return ts_web_rpc_session_end(sid, db_id, ret);
} }
// typedef struct TPP_LIB // typedef struct TPP_LIB
@ -246,87 +243,78 @@ bool tpp_session_end(const char* sid, int db_id, int ret) {
// } // }
// } // }
int ts_main(void) int ts_main(void) {
{ ExIniFile &ini = g_env.get_ini();
ExIniFile& ini = g_env.get_ini();
EXLOGI(L"\n"); EXLOGI(L"\n");
EXLOGI(L"###############################################################\n"); EXLOGI(L"###############################################################\n");
EXLOGI(L"Load config file: %ls.\n", ini.get_filename().c_str()); EXLOGI(L"Load config file: %ls.\n", ini.get_filename().c_str());
EXLOGI(L"Teleport Core Server starting ...\n"); EXLOGI(L"Teleport Core Server starting ...\n");
ex_ini_sections& secs = ini.GetAllSections(); ex_ini_sections &secs = ini.GetAllSections();
TsHttpRpc rpc; TsHttpRpc rpc;
// 枚举配置文件中的[protocol-xxx]小节,加载对应的协议动态库 // 枚举配置文件中的[protocol-xxx]小节,加载对应的协议动态库
bool all_ok = true; bool all_ok = true;
do { do {
if (!g_session_mgr.start()) if (!g_session_mgr.start()) {
{ EXLOGE(L"[core] failed to start session-id manager.\n");
EXLOGE(L"[core] failed to start session-id manager.\n"); all_ok = false;
all_ok = false; break;
break; }
}
if (!rpc.init() || !rpc.start()) if (!rpc.init() || !rpc.start()) {
{ EXLOGE(L"[core] rpc init/start failed.\n");
EXLOGE(L"[core] rpc init/start failed.\n"); all_ok = false;
all_ok = false; break;
break; }
}
ex_ini_sections::iterator it = secs.begin(); ex_ini_sections::iterator it = secs.begin();
for (; it != secs.end(); ++it) for (; it != secs.end(); ++it) {
{ if (it->first.length() > 9 && 0 == wcsncmp(it->first.c_str(), L"protocol-", 9)) {
if (it->first.length() > 9 && 0 == wcsncmp(it->first.c_str(), L"protocol-", 9)) ex_wstr libname;
{ if (!it->second->GetStr(L"lib", libname))
ex_wstr libname; continue;
if (!it->second->GetStr(L"lib", libname))
continue;
bool enabled = false; bool enabled = false;
it->second->GetBool(L"enabled", enabled, false); it->second->GetBool(L"enabled", enabled, false);
if (!enabled) if (!enabled) {
{ EXLOGV(L"[core] `%ls` not enabled.\n", libname.c_str());
EXLOGV(L"[core] `%ls` not enabled.\n", libname.c_str()); continue;
continue; }
}
if (!g_tpp_mgr.load_tpp(libname)) if (!g_tpp_mgr.load_tpp(libname)) {
{ all_ok = false;
all_ok = false; break;
break; }
} }
} }
}
} while (0); } while (0);
if (0 == g_tpp_mgr.count()) if (0 == g_tpp_mgr.count()) {
{ all_ok = false;
all_ok = false; }
}
if (!all_ok) if (!all_ok) {
{ g_exit_flag = true;
g_exit_flag = true; }
}
if (!g_exit_flag) { if (!g_exit_flag) {
ts_web_rpc_register_core(); ts_web_rpc_register_core();
EXLOGV("[core] ---- initialized, ready for service ----\n"); EXLOGV("[core] ---- initialized, ready for service ----\n");
while (!g_exit_flag) while (!g_exit_flag) {
{ ex_sleep_ms(1000);
ex_sleep_ms(1000); g_tpp_mgr.timer();
g_tpp_mgr.timer(); }
} }
}
g_tpp_mgr.stop_all(); EXLOGW("[core] try to stop all thread and exit.\n");
rpc.stop(); g_tpp_mgr.stop_all();
g_session_mgr.stop(); rpc.stop();
g_session_mgr.stop();
return 0; return 0;
} }

View File

@ -7,188 +7,166 @@
TsSessionManager g_session_mgr; TsSessionManager g_session_mgr;
TsSessionManager::TsSessionManager() : TsSessionManager::TsSessionManager() :
ExThreadBase("sid-mgr-thread") ExThreadBase("sid-mgr-thread") {
{
} }
TsSessionManager::~TsSessionManager() TsSessionManager::~TsSessionManager() {
{ ts_connections::iterator it_conn = m_connections.begin();
ts_connections::iterator it_conn = m_connections.begin(); for (; it_conn != m_connections.end(); ++it_conn) {
for (; it_conn != m_connections.end(); ++it_conn) delete it_conn->second;
{ }
delete it_conn->second; m_connections.clear();
}
m_connections.clear();
} }
void TsSessionManager::_thread_loop(void) void TsSessionManager::_thread_loop(void) {
{ for (;;) {
for (;;) ex_sleep_ms(1000);
{ if (m_need_stop)
ex_sleep_ms(1000); return;
if (m_stop_flag) _remove_expired_connect_info();
return; }
_remove_expired_connect_info();
}
} }
void TsSessionManager::_set_stop_flag(void) void TsSessionManager::_remove_expired_connect_info(void) {
{ // 超过15秒未进行连接的connect-info会被移除
m_stop_flag = true;
ExThreadSmartLock locker(m_lock);
ex_u64 _now = ex_get_tick_count();
ts_connections::iterator it = m_connections.begin();
for (; it != m_connections.end();) {
//EXLOGD("[core] check expired connect info: [%s] %d, %d %d %d\n", it->first.c_str(), it->second->ref_count, int(_now), int(it->second->ticket_start), int(_now - it->second->ticket_start));
if (it->second->ref_count == 0 && _now - 15000 > it->second->ticket_start) {
EXLOGD("[core] remove connection info, because timeout: %s\n", it->first.c_str());
delete it->second;
m_connections.erase(it++);
EXLOGD("[core] there are %d connection info exists.\n", m_connections.size());
} else {
++it;
}
}
} }
bool TsSessionManager::get_connect_info(const ex_astr &sid, TS_CONNECT_INFO &info) {
ExThreadSmartLock locker(m_lock);
void TsSessionManager::_remove_expired_connect_info(void) ts_connections::iterator it = m_connections.find(sid);
{ if (it == m_connections.end())
// 超过15秒未进行连接的connect-info会被移除 return false;
ExThreadSmartLock locker(m_lock); info.sid = it->second->sid;
info.user_id = it->second->user_id;
info.host_id = it->second->host_id;
info.acc_id = it->second->acc_id;
info.user_username = it->second->user_username;
info.host_ip = it->second->host_ip;
info.conn_ip = it->second->conn_ip;
info.conn_port = it->second->conn_port;
info.client_ip = it->second->client_ip;
info.acc_username = it->second->acc_username;
info.acc_secret = it->second->acc_secret;
info.username_prompt = it->second->username_prompt;
info.password_prompt = it->second->password_prompt;
info.protocol_type = it->second->protocol_type;
info.protocol_sub_type = it->second->protocol_sub_type;
info.protocol_flag = it->second->protocol_flag;
info.record_flag = it->second->record_flag;
info.auth_type = it->second->auth_type;
ex_u64 _now = ex_get_tick_count(); it->second->ref_count++;
ts_connections::iterator it = m_connections.begin();
for (; it != m_connections.end(); ) return true;
{
//EXLOGD("[core] check expired connect info: [%s] %d, %d %d %d\n", it->first.c_str(), it->second->ref_count, int(_now), int(it->second->ticket_start), int(_now - it->second->ticket_start));
if (it->second->ref_count == 0 && _now - 15000 > it->second->ticket_start)
{
EXLOGD("[core] remove connection info, because timeout: %s\n", it->first.c_str());
delete it->second;
m_connections.erase(it++);
EXLOGD("[core] there are %d connection info exists.\n", m_connections.size());
}
else
{
++it;
}
}
} }
bool TsSessionManager::get_connect_info(const ex_astr& sid, TS_CONNECT_INFO& info) bool TsSessionManager::free_connect_info(const ex_astr &sid) {
{ ExThreadSmartLock locker(m_lock);
ExThreadSmartLock locker(m_lock);
ts_connections::iterator it = m_connections.find(sid); ts_connections::iterator it = m_connections.find(sid);
if (it == m_connections.end()) if (it == m_connections.end())
return false; return false;
info.sid = it->second->sid; it->second->ref_count--;
info.user_id = it->second->user_id;
info.host_id = it->second->host_id;
info.acc_id = it->second->acc_id;
info.user_username = it->second->user_username;
info.host_ip = it->second->host_ip;
info.conn_ip = it->second->conn_ip;
info.conn_port = it->second->conn_port;
info.client_ip = it->second->client_ip;
info.acc_username = it->second->acc_username;
info.acc_secret = it->second->acc_secret;
info.username_prompt = it->second->username_prompt;
info.password_prompt = it->second->password_prompt;
info.protocol_type = it->second->protocol_type;
info.protocol_sub_type = it->second->protocol_sub_type;
info.protocol_flag = it->second->protocol_flag;
info.record_flag = it->second->record_flag;
info.auth_type = it->second->auth_type;
it->second->ref_count++; // 对于RDP来说此时不要移除连接信息系统自带RDP客户端在第一次连接时进行协议协商然后马上会断开之后立即重新连接一次第二次连接之前可能会提示证书信息如果用户长时间不操作可能会导致超时
// 因此,我们将其引用计数减低,并更新一下最后访问时间,让定时器来移除它。
if (it->second->protocol_type != TP_PROTOCOL_TYPE_RDP) {
if (it->second->ref_count <= 0) {
EXLOGD("[core] remove connection info, because all connections closed: %s\n", it->first.c_str());
delete it->second;
m_connections.erase(it);
EXLOGD("[core] there are %d connection info exists.\n", m_connections.size());
}
} else {
if (it->second->ref_count == 1)
it->second->ref_count = 0;
it->second->ticket_start = ex_get_tick_count() + 45000; // 我们将时间向后移动45秒这样如果没有发生RDP的第二次连接这个连接信息就会在一分钟后被清除。
}
return true;
return true;
} }
bool TsSessionManager::free_connect_info(const ex_astr& sid) { bool TsSessionManager::request_session(ex_astr &sid, TS_CONNECT_INFO *info) {
ExThreadSmartLock locker(m_lock); ExThreadSmartLock locker(m_lock);
ts_connections::iterator it = m_connections.find(sid); EXLOGD("[core] request session: account: [%s], protocol: [%d], auth-mode: [%d]\n", info->acc_username.c_str(),
if (it == m_connections.end()) info->protocol_type, info->auth_type);
return false;
it->second->ref_count--; ex_astr _sid;
int retried = 0;
ts_connections::iterator it;
for (;;) {
_gen_session_id(_sid, info, 6);
it = m_connections.find(_sid);
if (it == m_connections.end())
break;
// 对于RDP来说此时不要移除连接信息系统自带RDP客户端在第一次连接时进行协议协商然后马上会断开之后立即重新连接一次第二次连接之前可能会提示证书信息如果用户长时间不操作可能会导致超时 retried++;
// 因此,我们将其引用计数减低,并更新一下最后访问时间,让定时器来移除它。 if (retried > 50)
if (it->second->protocol_type != TP_PROTOCOL_TYPE_RDP) { return false;
if (it->second->ref_count <= 0) { }
EXLOGD("[core] remove connection info, because all connections closed: %s\n", it->first.c_str());
delete it->second;
m_connections.erase(it);
EXLOGD("[core] there are %d connection info exists.\n", m_connections.size());
}
}
else {
if (it->second->ref_count == 1)
it->second->ref_count = 0;
it->second->ticket_start = ex_get_tick_count() + 45000; // 我们将时间向后移动45秒这样如果没有发生RDP的第二次连接这个连接信息就会在一分钟后被清除。
}
info->sid = _sid;
info->ref_count = 0;
info->ticket_start = ex_get_tick_count();
m_connections.insert(std::make_pair(_sid, info));
return true; sid = _sid;
if (info->protocol_type == TP_PROTOCOL_TYPE_RDP) {
info->ref_count = 1; // 因为RDP连接之前可能会有很长时间用于确认是否连接、是否信任证书所以很容易超时我们认为将引用计数+1防止因超时被清除。
char szTmp[8] = {0};
snprintf(szTmp, 8, "%02X", (unsigned char) (info->acc_username.length() + info->acc_secret.length()));
sid += szTmp;
}
return true;
} }
bool TsSessionManager::request_session(ex_astr& sid, TS_CONNECT_INFO* info) void TsSessionManager::_gen_session_id(ex_astr &sid, const TS_CONNECT_INFO *info, int len) {
{ mbedtls_sha1_context sha;
ExThreadSmartLock locker(m_lock); ex_u8 sha_digist[20] = {0};
EXLOGD("[core] request session: account: [%s], protocol: [%d], auth-mode: [%d]\n", info->acc_username.c_str(), info->protocol_type, info->auth_type); ex_u64 _tick = ex_get_tick_count();
ex_u64 _tid = ex_get_thread_id();
ex_astr _sid; mbedtls_sha1_init(&sha);
int retried = 0; mbedtls_sha1_starts(&sha);
ts_connections::iterator it; mbedtls_sha1_update(&sha, (const unsigned char *) &_tick, sizeof(ex_u64));
for (;;) mbedtls_sha1_update(&sha, (const unsigned char *) &_tid, sizeof(ex_u64));
{ mbedtls_sha1_update(&sha, (const unsigned char *) info->conn_ip.c_str(), info->conn_ip.length());
_gen_session_id(_sid, info, 6); mbedtls_sha1_update(&sha, (const unsigned char *) info->client_ip.c_str(), info->client_ip.length());
it = m_connections.find(_sid); mbedtls_sha1_update(&sha, (const unsigned char *) info->acc_username.c_str(), info->acc_username.length());
if (it == m_connections.end()) mbedtls_sha1_finish(&sha, sha_digist);
break; mbedtls_sha1_free(&sha);
retried++; char szTmp[64] = {0};
if (retried > 50) int _len = len / 2 + 1;
return false; int i = 0;
} int offset = 0;
for (i = 0; i < _len; ++i) {
snprintf(szTmp + offset, 64 - offset, "%02X", sha_digist[i]);
offset += 2;
}
info->sid = _sid; sid.assign(szTmp, len);
info->ref_count = 0;
info->ticket_start = ex_get_tick_count();
m_connections.insert(std::make_pair(_sid, info));
sid = _sid;
if (info->protocol_type == TP_PROTOCOL_TYPE_RDP)
{
info->ref_count = 1; // 因为RDP连接之前可能会有很长时间用于确认是否连接、是否信任证书所以很容易超时我们认为将引用计数+1防止因超时被清除。
char szTmp[8] = { 0 };
snprintf(szTmp, 8, "%02X", (unsigned char)(info->acc_username.length() + info->acc_secret.length()));
sid += szTmp;
}
return true;
}
void TsSessionManager::_gen_session_id(ex_astr& sid, const TS_CONNECT_INFO* info, int len)
{
mbedtls_sha1_context sha;
ex_u8 sha_digist[20] = { 0 };
ex_u64 _tick = ex_get_tick_count();
ex_u64 _tid = ex_get_thread_id();
mbedtls_sha1_init(&sha);
mbedtls_sha1_starts(&sha);
mbedtls_sha1_update(&sha, (const unsigned char*)&_tick, sizeof(ex_u64));
mbedtls_sha1_update(&sha, (const unsigned char*)&_tid, sizeof(ex_u64));
mbedtls_sha1_update(&sha, (const unsigned char*)info->conn_ip.c_str(), info->conn_ip.length());
mbedtls_sha1_update(&sha, (const unsigned char*)info->client_ip.c_str(), info->client_ip.length());
mbedtls_sha1_update(&sha, (const unsigned char*)info->acc_username.c_str(), info->acc_username.length());
mbedtls_sha1_finish(&sha, sha_digist);
mbedtls_sha1_free(&sha);
char szTmp[64] = { 0 };
int _len = len / 2 + 1;
int i = 0;
int offset = 0;
for (i = 0; i < _len; ++i)
{
snprintf(szTmp + offset, 64 - offset, "%02X", sha_digist[i]);
offset += 2;
}
sid.assign(szTmp, len);
} }

View File

@ -57,10 +57,7 @@ public:
bool free_connect_info(const ex_astr& sid); bool free_connect_info(const ex_astr& sid);
protected: protected:
// 线程循环
void _thread_loop(void); void _thread_loop(void);
// 设置停止标志,让线程能够正常结束
void _set_stop_flag(void);
private: private:
void _gen_session_id(ex_astr& sid, const TS_CONNECT_INFO* info, int len); void _gen_session_id(ex_astr& sid, const TS_CONNECT_INFO* info, int len);

View File

@ -19,19 +19,19 @@ list(REMOVE_ITEM DIR_TELNET_SRCS "./stdafx.cpp")
include_directories( include_directories(
../../../../common/libex/include ../../../../common/libex/include
../../../../common/teleport ../../../../common/teleport
../../../../external/jsoncpp/include ../../../../external/jsoncpp/include
) )
include_directories( include_directories(
${TP_EXTERNAL_RELEASE_DIR}/include ${TP_EXTERNAL_RELEASE_DIR}/include
) )
link_directories(${TP_EXTERNAL_RELEASE_DIR}/lib) link_directories(${TP_EXTERNAL_RELEASE_DIR}/lib)
add_library(tptelnet SHARED ${DIR_TELNET_SRCS}) add_library(tptelnet SHARED ${DIR_TELNET_SRCS})
if (OS_LINUX) if (OS_LINUX)
target_link_libraries(tptelnet uv dl pthread rt util) target_link_libraries(tptelnet uv dl pthread rt util)
elseif (OS_MACOS) elseif (OS_MACOS)
target_link_libraries(tptelnet uv dl pthread util) target_link_libraries(tptelnet uv dl pthread util)
endif() endif ()

View File

@ -4,72 +4,90 @@
#include "../../common/ts_const.h" #include "../../common/ts_const.h"
#include <teleport_const.h> #include <teleport_const.h>
ex_astr _uv_str_error(int retcode) ex_astr _uv_str_error(int retcode) {
{ ex_astr err;
ex_astr err; err = uv_err_name(retcode);
err = uv_err_name(retcode); err += ":";
err += ":"; err += uv_strerror(retcode);
err += uv_strerror(retcode); return err;
return err;
} }
TelnetConn::TelnetConn(TelnetSession *sess, bool is_server_side) : m_session(sess), m_is_server(is_server_side) { TelnetConn::TelnetConn(TelnetSession *sess, bool is_server_side) : m_session(sess), m_is_server(is_server_side) {
if (is_server_side) { if (is_server_side) {
m_name = "cli<->tp"; m_name = "cli<->tp";
m_state = TELNET_CONN_STATE_CONNECTED; m_state = TELNET_CONN_STATE_CONNECTED;
} } else {
else { m_name = "tp<->srv";
m_name = "tp<->srv"; m_state = TELNET_CONN_STATE_FREE;
m_state = TELNET_CONN_STATE_FREE;
} }
m_timer_running = false; m_timer_running = false;
uv_tcp_init(sess->get_loop(), &m_handle); uv_tcp_init(sess->get_loop(), &m_handle);
m_handle.data = this; m_handle.data = this;
uv_async_init(sess->get_loop(), &m_stop_handle, _on_stop_cb);
m_stop_handle.data = this;
} }
TelnetConn::~TelnetConn() { TelnetConn::~TelnetConn() {
} }
bool TelnetConn::start_recv() { bool TelnetConn::start_recv() {
int err = uv_read_start((uv_stream_t *)&m_handle, _on_alloc, _on_recv); int err = uv_read_start((uv_stream_t *) &m_handle, _on_alloc, _on_recv);
if (err != 0) { if (err != 0) {
EXLOGE("[telnet] [%s] can not start to read.\n", m_name); EXLOGE("[telnet] [%s] can not start to read.\n", m_name);
m_session->close(TP_SESS_STAT_ERR_IO); m_session->close(TP_SESS_STAT_ERR_IO);
return false; return false;
} }
return true; return true;
} }
void TelnetConn::close() { void TelnetConn::close() {
if (m_state == TELNET_CONN_STATE_FREE || m_state == TELNET_CONN_STATE_CLOSING) if (m_state == TELNET_CONN_STATE_FREE || m_state == TELNET_CONN_STATE_CLOSING)
return; return;
uv_async_send(&m_stop_handle);
}
if (m_timer_running) { void TelnetConn::_do_close() {
m_timer_running = false; if (m_state == TELNET_CONN_STATE_FREE || m_state == TELNET_CONN_STATE_CLOSING)
uv_timer_stop(&m_timer_connect_timeout); return;
EXLOGW("[telnet] [%s] try to close while it connecting.\n", m_name); if (m_timer_running) {
m_state = TELNET_CONN_STATE_CLOSING; m_timer_running = false;
uv_close(handle(), NULL); uv_timer_stop(&m_timer_connect_timeout);
return; EXLOGW("[telnet] [%s] try to close while it connecting.\n", m_name);
} m_state = TELNET_CONN_STATE_CLOSING;
uv_close(handle(), NULL);
uv_read_stop((uv_stream_t*)&m_handle); return;
uv_close(handle() , _uv_on_closed); }
uv_read_stop((uv_stream_t *) &m_handle);
uv_close(handle(), _uv_on_closed);
}
//static
void TelnetConn::_on_stop_cb(uv_async_t *handle) {
TelnetConn *_this = (TelnetConn *) handle->data;
uv_close((uv_handle_t *) &_this->m_stop_handle, _this->_on_stop_handler_closed);
}
//static
void TelnetConn::_on_stop_handler_closed(uv_handle_t *handle) {
TelnetConn *_this = (TelnetConn *) handle->data;
_this->_do_close();
} }
void TelnetConn::_uv_on_closed(uv_handle_t *handle) { void TelnetConn::_uv_on_closed(uv_handle_t *handle) {
TelnetConn *_this = (TelnetConn *)handle->data; TelnetConn *_this = (TelnetConn *) handle->data;
_this->m_state = TELNET_CONN_STATE_FREE; _this->m_state = TELNET_CONN_STATE_FREE;
_this->m_session->on_conn_close(); _this->m_session->on_conn_close();
} }
void TelnetConn::_on_alloc(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { void TelnetConn::_on_alloc(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
TelnetConn *_this = (TelnetConn *) handle->data; // TelnetConn *_this = (TelnetConn *) handle->data;
buf->base = (char *) calloc(1, suggested_size); buf->base = (char *) calloc(1, suggested_size);
buf->len = suggested_size; buf->len = suggested_size;
} }
@ -80,16 +98,15 @@ void TelnetConn::_on_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *bu
if (nread == 0) { if (nread == 0) {
free(buf->base); free(buf->base);
return; return;
} } else if (nread < 0) {
else if (nread < 0) {
free(buf->base); free(buf->base);
if (nread == UV_EOF) if (nread == UV_EOF)
EXLOGD("[telnet] [%s] [recv] disconnected.\n", _this->m_name); EXLOGD("[telnet] [%s] [recv] disconnected.\n", _this->m_name);
else if(nread == UV_ECONNRESET) else if (nread == UV_ECONNRESET)
EXLOGD("[telnet] [%s] [recv] connection reset by peer.\n", _this->m_name); EXLOGD("[telnet] [%s] [recv] connection reset by peer.\n", _this->m_name);
else else
EXLOGD("[telnet] [%s] [recv] %s.\n", _this->m_name, _uv_str_error(nread).c_str()); EXLOGD("[telnet] [%s] [recv] %s.\n", _this->m_name, _uv_str_error(nread).c_str());
// if (nread == -4077) // if (nread == -4077)
@ -101,13 +118,12 @@ void TelnetConn::_on_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *bu
_this->m_session->close(TP_SESS_STAT_END); _this->m_session->close(TP_SESS_STAT_END);
return; return;
} } else {
else {
// #ifdef LOG_DATA // #ifdef LOG_DATA
// if(!_this->m_session->is_relay()) // if(!_this->m_session->is_relay())
// EXLOG_BIN((ex_u8*)buf->base, nread, "[telnet] [%s] RECV %d.", _this->m_name, nread); // EXLOG_BIN((ex_u8*)buf->base, nread, "[telnet] [%s] RECV %d.", _this->m_name, nread);
// #endif // #endif
} }
_this->m_buf_data.append((ex_u8 *) buf->base, nread); _this->m_buf_data.append((ex_u8 *) buf->base, nread);
free(buf->base); free(buf->base);
@ -129,7 +145,7 @@ bool TelnetConn::_raw_send(const ex_u8 *data, size_t size) {
// EXLOG_BIN(data, size, "[telnet] [%s] SEND %dB.", m_name, size); // EXLOG_BIN(data, size, "[telnet] [%s] SEND %dB.", m_name, size);
// #endif // #endif
uv_write_t *w = (uv_write_t *) calloc(1, sizeof(uv_write_t)); uv_write_t *w = (uv_write_t *) calloc(1, sizeof(uv_write_t));
ex_u8 *_data = (ex_u8 *) calloc(1, size); ex_u8 *_data = (ex_u8 *) calloc(1, size);
if (NULL == _data) { if (NULL == _data) {
@ -170,13 +186,14 @@ void TelnetConn::connect(const char *server_ip, ex_u16 server_port) {
if (m_state == TELNET_CONN_STATE_CONNECTED) { if (m_state == TELNET_CONN_STATE_CONNECTED) {
// 当前已经连接上了服务器了,断开重连 // 当前已经连接上了服务器了,断开重连
EXLOGV("[telnet] [%s] [%s] try to disconnect. %s:%d\n", m_name, m_session->client_addr(), server_ip, server_port); EXLOGV("[telnet] [%s] [%s] try to disconnect from real TELNET server %s:%d and reconnect.\n", m_name,
m_state = TELNET_CONN_STATE_CLOSING; m_session->client_addr(), server_ip, server_port);
m_state = TELNET_CONN_STATE_CLOSING;
uv_close((uv_handle_t *) &m_handle, _uv_on_reconnect); uv_close((uv_handle_t *) &m_handle, _uv_on_reconnect);
return; return;
} } else {
else { EXLOGV("[telnet] [%s] [%s] try to connect to real TELNET server %s:%d\n", m_name, m_session->client_addr(),
EXLOGV("[telnet] [%s] [%s] try to connect to real TELNET server at %s:%d\n", m_name, m_session->client_addr(), server_ip, server_port); server_ip, server_port);
} }
struct sockaddr_in addr; struct sockaddr_in addr;
@ -185,53 +202,51 @@ void TelnetConn::connect(const char *server_ip, ex_u16 server_port) {
uv_connect_t *conn_req = (uv_connect_t *) calloc(1, sizeof(uv_connect_t)); uv_connect_t *conn_req = (uv_connect_t *) calloc(1, sizeof(uv_connect_t));
conn_req->data = this; conn_req->data = this;
// 设置一个超时回调,如果超时发生时连接尚未完成,就报错 // 设置一个超时回调,如果超时发生时连接尚未完成,就报错
uv_timer_init(m_session->get_loop(), &m_timer_connect_timeout); uv_timer_init(m_session->get_loop(), &m_timer_connect_timeout);
m_timer_connect_timeout.data = this; m_timer_connect_timeout.data = this;
#ifdef EX_DEBUG #ifdef EX_DEBUG
uv_timer_start(&m_timer_connect_timeout, _uv_on_connect_timeout, 5000, 0); uv_timer_start(&m_timer_connect_timeout, _uv_on_connect_timeout, 5000, 0);
#else #else
uv_timer_start(&m_timer_connect_timeout, _uv_on_connect_timeout, 10000, 0); uv_timer_start(&m_timer_connect_timeout, _uv_on_connect_timeout, 10000, 0);
#endif #endif
m_timer_running = true; m_timer_running = true;
//m_is_connecting = true; m_state = TELNET_CONN_STATE_CONNECTING;
m_state = TELNET_CONN_STATE_CONNECTING;
int err = 0; int err = 0;
if ((err = uv_tcp_connect(conn_req, &m_handle, (const struct sockaddr *) &addr, _uv_on_connected)) != 0) { if ((err = uv_tcp_connect(conn_req, &m_handle, (const struct sockaddr *) &addr, _uv_on_connected)) != 0) {
free(conn_req); free(conn_req);
EXLOGE("[telnet] [%s] can not connect to server: %s\n", m_name, uv_strerror(err)); EXLOGE("[telnet] [%s] can not connect to server: %s\n", m_name, uv_strerror(err));
m_timer_running = false; m_timer_running = false;
uv_timer_stop(&m_timer_connect_timeout); uv_timer_stop(&m_timer_connect_timeout);
uv_close((uv_handle_t*)&m_timer_connect_timeout, _uv_on_timer_connect_timeout_closed); uv_close((uv_handle_t *) &m_timer_connect_timeout, NULL);
m_state = TELNET_CONN_STATE_FREE; m_state = TELNET_CONN_STATE_FREE;
m_session->close(TP_SESS_STAT_ERR_CONNECT); m_session->close(TP_SESS_STAT_ERR_CONNECT);
} }
} }
void TelnetConn::_uv_on_connect_timeout(uv_timer_t *timer) void TelnetConn::_uv_on_connect_timeout(uv_timer_t *timer) {
{ TelnetConn *_this = (TelnetConn *) timer->data;
TelnetConn *_this = (TelnetConn *)timer->data;
if (_this->m_timer_running) { if (_this->m_timer_running) {
_this->m_timer_running = false; _this->m_timer_running = false;
uv_timer_stop(&_this->m_timer_connect_timeout); uv_timer_stop(&_this->m_timer_connect_timeout);
uv_close((uv_handle_t*)&_this->m_timer_connect_timeout, _this->_uv_on_timer_connect_timeout_closed); uv_close((uv_handle_t *) &_this->m_timer_connect_timeout, NULL);
} }
// 如果在连接成功之前就超时了,则关闭连接 // 如果在连接成功之前就超时了,则关闭连接
EXLOGE("[telnet] [%s] timeout when connect to real TELNET server, cancel connection.\n", _this->m_name); EXLOGE("[telnet] [%s] timeout when connect to real TELNET server.\n", _this->m_name);
_this->m_state = TELNET_CONN_STATE_CLOSING; _this->m_state = TELNET_CONN_STATE_CLOSING;
uv_close(_this->handle(), _uv_on_closed); uv_close(_this->handle(), _uv_on_closed);
} }
void TelnetConn::_uv_on_reconnect(uv_handle_t *handle) { void TelnetConn::_uv_on_reconnect(uv_handle_t *handle) {
TelnetConn *_this = (TelnetConn *)handle->data; TelnetConn *_this = (TelnetConn *) handle->data;
_this->m_state = TELNET_CONN_STATE_FREE; _this->m_state = TELNET_CONN_STATE_FREE;
uv_tcp_init(_this->m_session->get_loop(), &_this->m_handle); uv_tcp_init(_this->m_session->get_loop(), &_this->m_handle);
_this->m_handle.data = _this; _this->m_handle.data = _this;
@ -240,33 +255,33 @@ void TelnetConn::_uv_on_reconnect(uv_handle_t *handle) {
} }
void TelnetConn::_uv_on_connected(uv_connect_t *req, int status) { void TelnetConn::_uv_on_connected(uv_connect_t *req, int status) {
TelnetConn *_this = (TelnetConn *)req->data; TelnetConn *_this = (TelnetConn *) req->data;
free(req); free(req);
if (_this->m_timer_running) { if (_this->m_timer_running) {
_this->m_timer_running = false; _this->m_timer_running = false;
uv_timer_stop(&_this->m_timer_connect_timeout); uv_timer_stop(&_this->m_timer_connect_timeout);
uv_close((uv_handle_t*)&_this->m_timer_connect_timeout, _this->_uv_on_timer_connect_timeout_closed); uv_close((uv_handle_t *) &_this->m_timer_connect_timeout, NULL);
} }
if (status != 0) { if (status != 0) {
EXLOGE("[telnet] [%s] cannot connect to real TELNET server. %s\n", _this->m_name, uv_strerror(status)); EXLOGE("[telnet] [%s] cannot connect to real TELNET server. %s\n", _this->m_name, uv_strerror(status));
_this->m_state = TELNET_CONN_STATE_FREE; _this->m_state = TELNET_CONN_STATE_FREE;
_this->m_session->close(TP_SESS_STAT_ERR_CONNECT); _this->m_session->close(TP_SESS_STAT_ERR_CONNECT);
return; return;
} }
EXLOGW("[telnet] [%s] real TELNET server connected.\n", _this->m_session->client_addr()); EXLOGW("[telnet] [%s] real TELNET server connected.\n", _this->m_session->client_addr());
_this->m_state = TELNET_CONN_STATE_CONNECTED; _this->m_state = TELNET_CONN_STATE_CONNECTED;
if (!_this->start_recv()) { if (!_this->start_recv()) {
_this->m_session->close(TP_SESS_STAT_ERR_IO); _this->m_session->close(TP_SESS_STAT_ERR_IO);
return; return;
} }
_this->m_session->do_next(_this, s_server_connected); _this->m_session->do_next(_this, s_server_connected);
} }
//static //static
void TelnetConn::_uv_on_timer_connect_timeout_closed(uv_handle_t *handle) { //void TelnetConn::_uv_on_timer_connect_timeout_closed(uv_handle_t *handle) {
} //}

View File

@ -8,10 +8,10 @@
//#define LOG_DATA //#define LOG_DATA
#define TELNET_CONN_STATE_FREE 0 // not connected yet or closed #define TELNET_CONN_STATE_FREE 0 // not connected yet or closed
#define TELNET_CONN_STATE_CONNECTING 1 // connecting #define TELNET_CONN_STATE_CONNECTING 1 // connecting
#define TELNET_CONN_STATE_CONNECTED 2 // connected. #define TELNET_CONN_STATE_CONNECTED 2 // connected.
#define TELNET_CONN_STATE_CLOSING 3 // closing. #define TELNET_CONN_STATE_CLOSING 3 // closing.
class TelnetSession; class TelnetSession;
@ -19,40 +19,59 @@ class TelnetSession;
class TelnetConn { class TelnetConn {
public: public:
TelnetConn(TelnetSession *sess, bool is_server_side); TelnetConn(TelnetSession *sess, bool is_server_side);
~TelnetConn(); ~TelnetConn();
TelnetSession *session() { return m_session; } TelnetSession *session() { return m_session; }
// just for debug-info // just for debug-info
const char *name() const { return m_name; } const char *name() const { return m_name; }
bool is_server_side() const { return m_is_server; } bool is_server_side() const { return m_is_server; }
ex_u8 state() const { return m_state; }
ex_u8 state() const { return m_state; }
uv_handle_t *handle() { return (uv_handle_t *) &m_handle; } uv_handle_t *handle() { return (uv_handle_t *) &m_handle; }
uv_tcp_t *tcp_handle() { return &m_handle; } uv_tcp_t *tcp_handle() { return &m_handle; }
uv_stream_t *stream_handle() { return (uv_stream_t *) &m_handle; } uv_stream_t *stream_handle() { return (uv_stream_t *) &m_handle; }
MemBuffer &data() { return m_buf_data; } MemBuffer &data() { return m_buf_data; }
bool send(MemBuffer &mbuf); bool send(MemBuffer &mbuf);
bool send(const ex_u8 *data, size_t size); bool send(const ex_u8 *data, size_t size);
// connect to real server, for proxy-client-side only. // connect to real server, for proxy-client-side only.
void connect(const char *server_ip, ex_u16 server_port = 3389); void connect(const char *server_ip, ex_u16 server_port = 3389);
// try to close this connection. return current TELNET_CONN_STATE_XXXX.
void close(); // try to close this connection. return current TELNET_CONN_STATE_XXXX.
bool start_recv(); void close();
bool start_recv();
private: private:
static void _on_alloc(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf); static void _on_alloc(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf);
static void _on_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf); static void _on_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf);
static void _on_send_done(uv_write_t *req, int status); static void _on_send_done(uv_write_t *req, int status);
static void _uv_on_connect_timeout(uv_timer_t *timer);
static void _uv_on_connect_timeout(uv_timer_t *timer);
static void _uv_on_connected(uv_connect_t *req, int status); static void _uv_on_connected(uv_connect_t *req, int status);
static void _uv_on_reconnect(uv_handle_t *handle); static void _uv_on_reconnect(uv_handle_t *handle);
static void _uv_on_closed(uv_handle_t *handle);
static void _uv_on_timer_connect_timeout_closed(uv_handle_t *handle); static void _uv_on_closed(uv_handle_t *handle);
// static void _uv_on_timer_connect_timeout_closed(uv_handle_t *handle);
static void _on_stop_cb(uv_async_t *handle);
static void _on_stop_handler_closed(uv_handle_t *handle);
void _do_close();
bool _raw_send(const ex_u8 *data, size_t size); bool _raw_send(const ex_u8 *data, size_t size);
@ -60,14 +79,15 @@ private:
TelnetSession *m_session; TelnetSession *m_session;
bool m_is_server; bool m_is_server;
// for debug-info. // for debug-info.
const char *m_name; const char *m_name;
uv_tcp_t m_handle; uv_tcp_t m_handle;
uv_timer_t m_timer_connect_timeout; uv_timer_t m_timer_connect_timeout;
bool m_timer_running; // does m_timer_connect_timeout initialized and started. bool m_timer_running; // does m_timer_connect_timeout initialized and started.
uv_async_t m_stop_handle; // event for stop whole listener handler.
ex_u8 m_state; // TELNET_CONN_STATE_XXXX ex_u8 m_state; // TELNET_CONN_STATE_XXXX
// 作为client需要的数据远程主机信息 // 作为client需要的数据远程主机信息
std::string m_server_ip; std::string m_server_ip;

View File

@ -4,228 +4,239 @@
TelnetProxy g_telnet_proxy; TelnetProxy g_telnet_proxy;
TelnetProxy::TelnetProxy() : ExThreadBase("telnet-proxy-thread") TelnetProxy::TelnetProxy() : ExThreadBase("telnet-proxy-thread") {
{ memset(&m_loop, 0, sizeof(uv_loop_t));
memset(&m_loop, 0, sizeof(uv_loop_t)); m_timer_counter = 0;
m_timer_counter = 0; m_noop_timeout_sec = 900;
m_noop_timeout_sec = 900;
} }
TelnetProxy::~TelnetProxy() TelnetProxy::~TelnetProxy() {
{ if (!m_sessions.empty())
if (m_sessions.size() > 0) EXLOGE("[telnet] not all session stopped.\n");
EXLOGE("[telnet] not all session stopped.\n");
} }
bool TelnetProxy::init() bool TelnetProxy::init() {
{ if (0 != uv_loop_init(&m_loop))
if (0 != uv_loop_init(&m_loop)) return false;
return false;
if (0 != uv_async_init(&m_loop, &m_clean_session_handle, _on_clean_session_cb)) if (0 != uv_async_init(&m_loop, &m_clean_session_handle, _on_clean_session_cb))
return false; return false;
m_clean_session_handle.data = this; m_clean_session_handle.data = this;
m_host_ip = g_telnet_env.bind_ip; if (0 != uv_async_init(&m_loop, &m_stop_handle, _on_stop_cb))
m_host_port = g_telnet_env.bind_port; return false;
m_stop_handle.data = this;
if (0 != uv_tcp_init(&m_loop, &m_handle)) m_host_ip = g_telnet_env.bind_ip;
return false; m_host_port = g_telnet_env.bind_port;
m_handle.data = this;
return true; if (0 != uv_tcp_init(&m_loop, &m_listener_handle))
return false;
m_listener_handle.data = this;
return true;
} }
void TelnetProxy::timer() { void TelnetProxy::timer() {
// timer() will be called per one second, and I will do my job per 5 seconds. // timer() will be called per one second, and I will do my job per 5 seconds.
m_timer_counter++; m_timer_counter++;
if (m_timer_counter < 5) if (m_timer_counter < 5)
return; return;
m_timer_counter = 0; m_timer_counter = 0;
ExThreadSmartLock locker(m_lock); ExThreadSmartLock locker(m_lock);
ex_u32 t_now = (ex_u32)time(NULL); ex_u32 t_now = (ex_u32) time(NULL);
ts_telnet_sessions::iterator it = m_sessions.begin(); ts_telnet_sessions::iterator it = m_sessions.begin();
for (; it != m_sessions.end(); ++it) for (; it != m_sessions.end(); ++it) {
{ it->first->save_record();
it->first->save_record(); if (0 != m_noop_timeout_sec)
if(0 != m_noop_timeout_sec) it->first->check_noop_timeout(t_now, m_noop_timeout_sec);
it->first->check_noop_timeout(t_now, m_noop_timeout_sec); }
}
} }
void TelnetProxy::set_cfg(ex_u32 noop_timeout) { void TelnetProxy::set_cfg(ex_u32 noop_timeout) {
m_noop_timeout_sec = noop_timeout; m_noop_timeout_sec = noop_timeout;
} }
void TelnetProxy::kill_sessions(const ex_astrs& sessions) { void TelnetProxy::kill_sessions(const ex_astrs &sessions) {
ExThreadSmartLock locker(m_lock); ExThreadSmartLock locker(m_lock);
ts_telnet_sessions::iterator it = m_sessions.begin(); ts_telnet_sessions::iterator it = m_sessions.begin();
for (; it != m_sessions.end(); ++it) { for (; it != m_sessions.end(); ++it) {
for (size_t i = 0; i < sessions.size(); ++i) { for (size_t i = 0; i < sessions.size(); ++i) {
if (it->first->sid() == sessions[i]) { if (it->first->sid() == sessions[i]) {
EXLOGW("[telnet] try to kill %s\n", sessions[i].c_str()); EXLOGW("[telnet] try to kill %s\n", sessions[i].c_str());
it->first->check_noop_timeout(0, 0); // 立即结束 it->first->check_noop_timeout(0, 0); // 立即结束
} }
} }
} }
} }
void TelnetProxy::_thread_loop(void) void TelnetProxy::_thread_loop(void) {
{ struct sockaddr_in addr;
struct sockaddr_in addr; if (0 != uv_ip4_addr(m_host_ip.c_str(), m_host_port, &addr)) {
if (0 != uv_ip4_addr(m_host_ip.c_str(), m_host_port, &addr)) { EXLOGE("[telnet] invalid ip/port for TELNET listener.\n");
EXLOGE("[telnet] invalid ip/port for TELNET listener.\n"); return;
return; }
}
if (0 != uv_tcp_bind(&m_handle, (const struct sockaddr*) &addr, 0)) { if (0 != uv_tcp_bind(&m_listener_handle, (const struct sockaddr *) &addr, 0)) {
EXLOGE("[telnet] can not bind %s:%d.\n", m_host_ip.c_str(), m_host_port); EXLOGE("[telnet] can not bind %s:%d.\n", m_host_ip.c_str(), m_host_port);
return; return;
} }
// 开始监听,有客户端连接到来时,会回调 _on_client_connect() // 开始监听,有客户端连接到来时,会回调 _on_client_connect()
if (0 != uv_listen((uv_stream_t*)&m_handle, 8, _on_client_connect)) { if (0 != uv_listen((uv_stream_t *) &m_listener_handle, 8, _on_client_connect)) {
EXLOGE("[telnet] can not listen on %s:%d.\n", m_host_ip.c_str(), m_host_port); EXLOGE("[telnet] can not listen on %s:%d.\n", m_host_ip.c_str(), m_host_port);
return; return;
} }
EXLOGI("[telnet] TeleportServer-TELNET ready on %s:%d\n", m_host_ip.c_str(), m_host_port); EXLOGI("[telnet] TeleportServer-TELNET ready on %s:%d\n", m_host_ip.c_str(), m_host_port);
int err = 0; int err = 0;
if ((err = uv_run(&m_loop, UV_RUN_DEFAULT)) != 0) { if ((err = uv_run(&m_loop, UV_RUN_DEFAULT)) != 0) {
EXLOGE("[telnet] main-loop end. %s\n", uv_strerror(err)); EXLOGE("[telnet] main-loop end. %s\n", uv_strerror(err));
} }
// 注意,如果在 uv_loop_close() 内部崩溃可能某个uv的handle未关闭。 // https://github.com/libuv/libuv/issues/709
uv_loop_close(&m_loop); //
// uv_close is not thread safe (see the docs here. That means you cannot call it from outside of the loop thread.
// http://docs.libuv.org/en/v1.x/design.html#the-i-o-loop
//
// The usual way to approach this when the loop is running in another thread is to use a uv_async_t handle,
// because uv_async_send is thread safe. You could have one such handle and call uv_stop in its callback.
// Then, after uv_run returns you can use uv_walk to close all handles and run the loop one last time so
// that close callbacks are called.
EXLOGV("[telnet] main-loop end.\n"); // 注意,如果在 uv_loop_close() 内部崩溃可能某个uv的handle未关闭。
} uv_loop_close(&m_loop);
void TelnetProxy::_set_stop_flag(void) { EXLOGV("[telnet] main-loop end.\n");
m_stop_flag = true;
if (m_is_running) {
uv_close((uv_handle_t*)&m_handle, _on_listener_closed);
}
} }
// static // static
void TelnetProxy::_on_listener_closed(uv_handle_t* handle) void TelnetProxy::_on_stop(void) {
{ ExThreadBase::_on_stop();
TelnetProxy* _this = (TelnetProxy*)handle->data;
EXLOGV("[telnet] listener close.\n");
_this->_close_all_sessions(); if (m_is_running) {
uv_async_send(&m_stop_handle);
}
}
// static
void TelnetProxy::_on_listener_closed(uv_handle_t *handle) {
TelnetProxy *_this = (TelnetProxy *) handle->data;
EXLOGV("[telnet] listener close.\n");
uv_close((uv_handle_t *) &_this->m_stop_handle, _on_stop_handle_closed);
// _this->_close_all_sessions();
} }
void TelnetProxy::clean_session() { void TelnetProxy::clean_session() {
uv_async_send(&m_clean_session_handle); uv_async_send(&m_clean_session_handle);
} }
void TelnetProxy::_close_all_sessions(void) void TelnetProxy::_close_all_sessions(void) {
{ ExThreadSmartLock locker(m_lock);
ExThreadSmartLock locker(m_lock);
if (m_sessions.size() == 0) { if (m_sessions.empty()) {
_close_clean_session_handle(); _close_clean_session_handle();
return; return;
} }
ts_telnet_sessions::iterator it = m_sessions.begin(); ts_telnet_sessions::iterator it = m_sessions.begin();
for (; it != m_sessions.end(); ++it) for (; it != m_sessions.end(); ++it) {
{ it->first->close(TP_SESS_STAT_ERR_RESET);
it->first->close(TP_SESS_STAT_ERR_RESET); }
}
} }
// static // static
void TelnetProxy::_on_clean_session_cb(uv_async_t* handle) void TelnetProxy::_on_clean_session_cb(uv_async_t *handle) {
{ TelnetProxy *_this = (TelnetProxy *) handle->data;
TelnetProxy* _this = (TelnetProxy*)handle->data;
// check closed session // check closed session
ExThreadSmartLock locker(_this->m_lock); ExThreadSmartLock locker(_this->m_lock);
ts_telnet_sessions::iterator it = _this->m_sessions.begin(); ts_telnet_sessions::iterator it = _this->m_sessions.begin();
for (; it != _this->m_sessions.end(); ) for (; it != _this->m_sessions.end();) {
{ if (it->first->is_closed()) {
if (it->first->is_closed()) { delete it->first;
delete it->first; _this->m_sessions.erase(it++);
_this->m_sessions.erase(it++); EXLOGD("[telnet] - removed one session.\n");
EXLOGD("[telnet] - removed one session.\n"); } else {
} it++;
else { }
it++; }
}
}
if (_this->m_stop_flag && _this->m_sessions.size() == 0) { if (_this->m_need_stop && _this->m_sessions.empty()) {
_this->_close_clean_session_handle(); _this->_close_clean_session_handle();
} }
}
////static
//void TelnetProxy::_on_clean_session_handle_closed(uv_handle_t *handle) {
//}
//static
void TelnetProxy::_on_stop_handle_closed(uv_handle_t *handle) {
TelnetProxy *_this = (TelnetProxy *) handle->data;
_this->_close_all_sessions();
} }
//static //static
void TelnetProxy::_on_clean_session_handle_closed(uv_handle_t *handle) { void TelnetProxy::_on_stop_cb(uv_async_t *handle) {
TelnetProxy *_this = (TelnetProxy *) handle->data;
uv_close((uv_handle_t *) &_this->m_listener_handle, _on_listener_closed);
} }
// static // static
void TelnetProxy::_on_client_connect(uv_stream_t* server, int status) void TelnetProxy::_on_client_connect(uv_stream_t *server, int status) {
{ if (0 != status)
if (0 != status) return;
return;
TelnetProxy* _this = (TelnetProxy*)server->data; TelnetProxy *_this = (TelnetProxy *) server->data;
_this->_on_accept(server); _this->_on_accept(server);
} }
bool TelnetProxy::_on_accept(uv_stream_t* server) bool TelnetProxy::_on_accept(uv_stream_t *server) {
{ TelnetSession *sess = new TelnetSession(this);
TelnetSession* sess = new TelnetSession(this);
if (0 != uv_accept(server, sess->client()->stream_handle())) if (0 != uv_accept(server, sess->client()->stream_handle())) {
{ EXLOGE("[telnet] socket accept failed.\n");
EXLOGE("[telnet] socket accept failed.\n"); delete sess;
delete sess; return false;
return false; }
}
if (m_stop_flag) if (m_need_stop) {
{ delete sess;
delete sess; return false;
return false; }
}
// 获取客户端IP地址和端口号 // 获取客户端IP地址和端口号
struct sockaddr sock_client; struct sockaddr sock_client;
int namelen = sizeof(sock_client); int namelen = sizeof(sock_client);
if (0 == uv_tcp_getpeername(sess->client()->tcp_handle(), &sock_client, &namelen)) if (0 == uv_tcp_getpeername(sess->client()->tcp_handle(), &sock_client, &namelen)) {
{ sockaddr_in *addrin = (sockaddr_in *) &sock_client;
sockaddr_in* addrin = (sockaddr_in*)&sock_client; char ip[17] = {0};
char ip[17] = { 0 }; if (0 == uv_ip4_name(addrin, ip, sizeof(ip))) {
if (0 == uv_ip4_name(addrin, ip, sizeof(ip))) char client_addr[64] = {0};
{ snprintf(client_addr, 64, "%s:%d", ip, addrin->sin_port);
char client_addr[64] = { 0 }; sess->client_addr(client_addr);
snprintf(client_addr, 64, "%s:%d", ip, addrin->sin_port); }
sess->client_addr(client_addr); }
}
}
EXLOGV("\n=================== NEW TELNET CLIENT [%s] ============\n", sess->client_addr()); EXLOGV("\n=================== NEW TELNET CLIENT [%s] ============\n", sess->client_addr());
{ {
ExThreadSmartLock locker(m_lock); ExThreadSmartLock locker(m_lock);
m_sessions.insert(std::make_pair(sess, 0)); m_sessions.insert(std::make_pair(sess, 0));
} }
sess->client()->start_recv(); sess->client()->start_recv();
return true; return true;
} }
void TelnetProxy::_close_clean_session_handle() { void TelnetProxy::_close_clean_session_handle() {
uv_close((uv_handle_t*)&m_clean_session_handle, _on_clean_session_handle_closed); uv_close((uv_handle_t *) &m_clean_session_handle, NULL);
} }

View File

@ -6,53 +6,65 @@
#include "telnet_session.h" #include "telnet_session.h"
typedef std::map<TelnetSession*, unsigned char> ts_telnet_sessions; typedef std::map<TelnetSession *, unsigned char> ts_telnet_sessions;
class TelnetProxy : public ExThreadBase class TelnetProxy : public ExThreadBase {
{
public: public:
TelnetProxy(); TelnetProxy();
~TelnetProxy();
bool init(); ~TelnetProxy();
void timer();
void set_cfg(ex_u32 noop_timeout);
void kill_sessions(const ex_astrs& sessions);
uv_loop_t* get_loop() { return &m_loop; } bool init();
void clean_session(); void timer();
void set_cfg(ex_u32 noop_timeout);
void kill_sessions(const ex_astrs &sessions);
uv_loop_t *get_loop() { return &m_loop; }
void clean_session();
protected: protected:
void _thread_loop(); void _thread_loop();
void _set_stop_flag();
void _close_all_sessions(); void _on_stop();
void _close_clean_session_handle();
void _close_all_sessions();
void _close_clean_session_handle();
private: private:
static void _on_client_connect(uv_stream_t* server, int status); static void _on_client_connect(uv_stream_t *server, int status);
static void _on_listener_closed(uv_handle_t* handle);
static void _on_clean_session_cb(uv_async_t* handle);
static void _on_clean_session_handle_closed(uv_handle_t *handle);
bool _on_accept(uv_stream_t* server); static void _on_listener_closed(uv_handle_t *handle);
static void _on_clean_session_cb(uv_async_t *handle);
// static void _on_clean_session_handle_closed(uv_handle_t *handle);
static void _on_stop_cb(uv_async_t *handle);
static void _on_stop_handle_closed(uv_handle_t *handle);
bool _on_accept(uv_stream_t *server);
private: private:
bool m_stop_flag; int m_timer_counter;
int m_timer_counter; ex_u32 m_noop_timeout_sec;
//
ex_u32 m_noop_timeout_sec;
uv_loop_t m_loop; uv_loop_t m_loop;
uv_tcp_t m_handle; uv_tcp_t m_listener_handle;
uv_async_t m_clean_session_handle; uv_async_t m_clean_session_handle;
uv_async_t m_stop_handle; // event for stop whole listener handler.
ExThreadLock m_lock; ExThreadLock m_lock;
ex_astr m_host_ip; ex_astr m_host_ip;
int m_host_port; int m_host_port;
ts_telnet_sessions m_sessions; ts_telnet_sessions m_sessions;
}; };
extern TelnetProxy g_telnet_proxy; extern TelnetProxy g_telnet_proxy;

File diff suppressed because it is too large Load Diff