使用tornado的异步请求方式解决了core和web相互调用时tornado无法响应的问题,目前普通用户SSH登录功能可用了,但是还有大量代码需要调整为异步方式。

pull/32/merge
Apex Liu 2017-03-03 01:40:16 +08:00
parent 411eab32f0
commit 6d43737a78
8 changed files with 316 additions and 126 deletions

View File

@ -6,7 +6,7 @@
#include <ex/ex_str.h>
#include <json/json.h>
//#include <json/json.h>
#include <algorithm>
#include <functional>
@ -64,20 +64,20 @@ sqlite3* TsDB::get_db()
return NULL;
}
bool TsDB::get_auth_info(int auth_id, TS_DB_AUTH_INFO& info)
bool TsDB::get_auth_info(int auth_id, Json::Value& jret)
{
Json::FastWriter json_writer;
Json::Value json_req;
json_req["method"] = "get_auth_info";
json_req["param"]["authid"] = auth_id;
Json::Value jreq;
jreq["method"] = "get_auth_info";
jreq["param"]["authid"] = auth_id;
ex_astr json_param;
json_param = json_writer.write(json_req);
json_param = json_writer.write(jreq);
// char tmp[128] = { 0 };
// ex_strformat(tmp, 127, "{\"method\":\"get_auth_info\",\"param\":[\"authid\":%d]}", auth_id);
//
// char tmp[128] = { 0 };
// ex_strformat(tmp, 127, "{\"method\":\"get_auth_info\",\"param\":[\"authid\":%d]}", auth_id);
//
ex_astr param;
//ts_url_encode("{\"method\":\"get_auth_info\",\"param\":[]}", param);
ts_url_encode(json_param.c_str(), param);
@ -92,8 +92,53 @@ bool TsDB::get_auth_info(int auth_id, TS_DB_AUTH_INFO& info)
EXLOGV("\n");
}
// {'account_lock': 0, 'encrypt': 1, 'account_name': 'apexliu', 'host_port': 22, 'cert_id': 0, 'user_name': 'root',
// 'auth_mode': 1, 'sys_type': 2, 'host_ip': '120.26.109.25', 'user_param': 'ogin:\nassword:',
// 'user_pswd': '40V4q3cT4/HT59YaSq8IVJjz0tBV2dmPbViZ4nCnWc4=', 'protocol': 2}
return false;
// {'user_auth': '40V4q3cT4/HT59YaSq8IVJjz0tBV2dmPbViZ4nCnWc4=', 'protocol': 2, 'auth_mode': 1, 'user_name': 'root', 'account_lock': 0, 'user_param': 'ogin:\nassword:', 'host_ip': '120.26.109.25', 'sys_type': 2, 'encrypt': 1, 'account_name': 'apexliu', 'host_port': 22}
// {"message": "", "code" : 0, "data" : {"user_auth": "40V4q3cT4/HT59YaSq8IVJjz0tBV2dmPbViZ4nCnWc4=", "protocol": 2, "auth_mode": 1, "user_name": "root", "account_lock": 0, "user_param": "ogin : \nassword : ", "host_ip": "120.26.109.25", "sys_type": 2, "encrypt": 1, "account_name": "apexliu", "host_port": 22}}
Json::Reader jreader;
if (!jreader.parse(body.c_str(), jret))
return false;
if (!jret.isObject())
return false;
if (!jret["data"].isObject())
return false;
Json::Value& _jret = jret["data"];
if (
!_jret["host_ip"].isString()
|| !_jret["host_port"].isInt()
|| !_jret["sys_type"].isInt()
|| !_jret["protocol"].isInt()
|| !_jret["auth_mode"].isInt()
|| !_jret["account_lock"].isInt()
|| !_jret["user_name"].isString()
|| !_jret["user_auth"].isString()
|| !_jret["user_param"].isString()
|| !_jret["account_name"].isString()
)
{
return false;
}
// info.host_ip = jret["host_ip"].asString();
// info.host_port = jret["host_port"].asInt();
// info.host_lock = 0;
// info.sys_type = jret["sys_type"].asInt();
// info.protocol = jret["protocol"].asInt();
// info.is_encrypt = true;
// info.auth_mode = jret["auth_mode"].asInt();
// info.account_lock = jret["account_lock"].asInt();
// info.user_name = jret["user_name"].asString();
// info.user_auth = jret["user_auth"].asString();
// info.user_param = jret["user_param"].asString();
// info.account_name = jret["account_name"].asString();
return true;
}
// bool TsDB::get_auth_info(int auth_id, TS_DB_AUTH_INFO& info)
@ -252,50 +297,50 @@ bool TsDB::get_auth_info(int auth_id, TS_DB_AUTH_INFO& info)
// return bFind;
// }
bool TsDB::get_cert_pri(int cert_id, ex_astr& cert_pri)
{
int result = 0;
char * errmsg = NULL;
char **dbResult;
int nRow, nColumn;
int i, j;
int index;
sqlite3* sql_exec = get_db();
if (sql_exec == NULL)
return false;
char szSQL[256] = { 0 };
ex_strformat(szSQL, 256, "SELECT a.cert_pri as cert_pri FROM ts_cert as a WHERE a.cert_id=%d", cert_id);
result = sqlite3_get_table(sql_exec, szSQL, &dbResult, &nRow, &nColumn, &errmsg);
if (result != 0)
return false;
//²éѯÊÇ·ñ´æÔÚ±í
index = nColumn;
for (i = 0; i < nRow; i++)
{
mapStringKey mapstringKey;
for (j = 0; j < nColumn; j++)
{
ex_astr temp = dbResult[j];
if (dbResult[index] == NULL)
mapstringKey[dbResult[j]] = "";
else
mapstringKey[dbResult[j]] = dbResult[index];
++index;
}
mapStringKey::iterator it = mapstringKey.find("cert_pri");
if (it != mapstringKey.end())
cert_pri = it->second.c_str();
}
sqlite3_free_table(dbResult);
return true;
}
// bool TsDB::get_cert_pri(int cert_id, ex_astr& cert_pri)
// {
// int result = 0;
// char * errmsg = NULL;
// char **dbResult;
// int nRow, nColumn;
// int i, j;
// int index;
//
// sqlite3* sql_exec = get_db();
// if (sql_exec == NULL)
// return false;
//
// char szSQL[256] = { 0 };
// ex_strformat(szSQL, 256, "SELECT a.cert_pri as cert_pri FROM ts_cert as a WHERE a.cert_id=%d", cert_id);
//
// result = sqlite3_get_table(sql_exec, szSQL, &dbResult, &nRow, &nColumn, &errmsg);
// if (result != 0)
// return false;
//
// //²éѯÊÇ·ñ´æÔÚ±í
// index = nColumn;
// for (i = 0; i < nRow; i++)
// {
// mapStringKey mapstringKey;
// for (j = 0; j < nColumn; j++)
// {
// ex_astr temp = dbResult[j];
// if (dbResult[index] == NULL)
// mapstringKey[dbResult[j]] = "";
// else
// mapstringKey[dbResult[j]] = dbResult[index];
//
// ++index;
// }
//
// mapStringKey::iterator it = mapstringKey.find("cert_pri");
// if (it != mapstringKey.end())
// cert_pri = it->second.c_str();
// }
//
// sqlite3_free_table(dbResult);
// return true;
// }
// bool TsDB::get_host_count(int& count)
// {

View File

@ -3,6 +3,7 @@
#include "ts_session.h"
#include <json/json.h>
#include <sqlite3.h>
#include <ex.h>
@ -67,11 +68,11 @@ public:
~TsDB();
// 根据认证ID获取认证信息包括服务器IP、端口用户名、密码或私钥、协议如RDP或SSH等等
bool get_auth_info(int auth_id, TS_DB_AUTH_INFO& info);
bool get_auth_info(int auth_id, Json::Value& jret);
bool get_cert_pri(int cert_id, ex_astr& cert_pri);
// bool get_cert_pri(int cert_id, ex_astr& cert_pri);
// 授权的主机数量
bool get_host_count(int& count);
// bool get_host_count(int& count);
// 重置log日志状态
bool update_reset_log();

View File

@ -143,6 +143,7 @@ void TsHttpRpc::_mg_event_handler(struct mg_connection *nc, int ev, void *ev_dat
}
else
{
EXLOGV("[core-rpc] got request method `%s`\n", method.c_str());
_this->_process_request(method, json_param, ret_buf);
}
}
@ -284,26 +285,98 @@ void TsHttpRpc::_rpc_func_request_session(const Json::Value& json_param, ex_astr
int authid = 0;
ex_astr host_ip;
int host_port = 0;
int sys_type = 0;
ex_astr user_name;
ex_astr user_auth;
ex_astr user_param;
ex_astr account_name;
bool account_lock = true;
int auth_mode = 0;
int protocol = 0;
int is_enc = 1;
// TODO: 如果authid为正整数这是一个长期保留的认证ID如果是负整数这是一个临时的认证ID用于连接测试如果为0则报错
if (!json_param["authid"].isNull())
{
// 使用认证ID的方式申请SID
if (!json_param["authid"].isNumeric())
if (!json_param["authid"].isInt())
{
_create_json_ret(buf, TSR_INVALID_JSON_PARAM);
return;
}
authid = json_param["authid"].asUInt();
authid = json_param["authid"].asInt();
TS_DB_AUTH_INFO ts_auth_info;
if (!g_db.get_auth_info(authid, ts_auth_info))
Json::Value jret;
if (!g_db.get_auth_info(authid, jret))
{
_create_json_ret(buf, TSR_GETAUTH_INFO_ERROR);
return;
}
Json::Value& _jret = jret["data"];
host_ip = _jret["host_ip"].asString();
host_port = _jret["host_port"].asInt();
//host_lock = 0;
sys_type = _jret["sys_type"].asInt();
protocol = _jret["protocol"].asInt();
is_enc = _jret["encrypt"].asInt() == 0 ? false : true;
auth_mode = _jret["auth_mode"].asInt();
account_lock = _jret["account_lock"].asInt() == 0 ? true : false;
user_name = _jret["user_name"].asString();
user_auth = _jret["user_auth"].asString();
user_param = _jret["user_param"].asString();
account_name = _jret["account_name"].asString();
}
_create_json_ret(buf, TSR_FAILED);
// 进一步判断参数是否合法
if (host_ip.length() == 0 || host_port >= 65535 || account_name.length() == 0
|| !(auth_mode == TS_AUTH_MODE_NONE || auth_mode == TS_AUTH_MODE_PASSWORD || auth_mode == TS_AUTH_MODE_PRIVATE_KEY)
|| !(protocol == TS_PROXY_PROTOCOL_RDP || protocol == TS_PROXY_PROTOCOL_SSH || protocol == TS_PROXY_PROTOCOL_TELNET)
//|| !(is_enc == 0 || is_enc == 1)
)
{
_create_json_ret(buf, TSR_INVALID_JSON_PARAM);
return;
}
if(is_enc)
{
if (user_auth.length() > 0)
{
ex_astr _auth;
if (!ts_db_field_decrypt(user_auth, _auth))
{
_create_json_ret(buf, TSR_FAILED);
return;
}
user_auth = _auth;
}
}
// 生成一个session-id内部会避免重复
ex_astr sid;
ex_rv rv = g_session_mgr.request_session(sid, account_name, authid,
host_ip, host_port, sys_type, protocol,
user_name, user_auth, user_param, auth_mode);
if (rv != TSR_OK)
{
_create_json_ret(buf, rv);
return;
}
EXLOGD("[core-rpc] new session-id: %s\n", sid.c_str());
Json::Value jr_data;
jr_data["sid"] = sid;
_create_json_ret(buf, TSR_OK, jr_data);
}
// void TsHttpRpc::_rpc_func_request_session(const ex_astr& func_args, ex_astr& buf)

View File

@ -51,7 +51,7 @@ private:
void _process_request(const ex_astr& func_cmd, const Json::Value& json_param, ex_astr& buf);
//void _create_json_ret(ex_astr& buf, Json::Value& jr_root);
void _create_json_ret(ex_astr& buf, int errcode, const Json::Value& jr_root);
void _create_json_ret(ex_astr& buf, int errcode, const Json::Value& jr_data);
void _create_json_ret(ex_astr& buf, int errcode);
void _create_json_ret(ex_astr& buf, int errcode, const char* message);

View File

@ -0,0 +1,29 @@
# -*- coding: utf-8 -*-
import json
import urllib.parse
import tornado.gen
import tornado.httpclient
from .configs import app_cfg
cfg = app_cfg()
__all__ = ['async_post_http']
@tornado.gen.coroutine
def async_post_http(url, values):
try:
v = json.dumps(values)
data = urllib.parse.quote(v).encode('utf-8')
c = tornado.httpclient.AsyncHTTPClient()
r = yield c.fetch(url, body=data, method='POST')
return json.loads(r.body.decode())
# return r.body
except:
# return {'code': -2, 'message': 'can not fetch {}'.format(url)}
return None

View File

@ -6,9 +6,12 @@ import os
import urllib
import urllib.parse
import urllib.request
import tornado.gen
import tornado.httpclient
from eom_app.app.configs import app_cfg
from eom_app.module import set
# from eom_app.module import set
from eom_app.app.util import *
from eom_app.module import host
from eom_app.module.common import *
from eom_common.eomcore.logger import *
@ -770,36 +773,46 @@ class UpdateHostExtendInfo(SwxAuthJsonHandler):
self.write_json(-1)
def post_http(url, values):
try:
# log.v('post_http(), url={}\n'.format(url))
user_agent = 'Mozilla/4.0 (compatible;MSIE 5.5; Windows NT)'
# values = {
# 'act': 'login',
# 'login[email]': 'yzhang@i9i8.com',
# 'login[password]': '123456'
# }
values = json.dumps(values)
data = urllib.parse.quote(values).encode('utf-8')
headers = {'User-Agent': user_agent}
req = urllib.request.Request(url=url, data=data, headers=headers)
response = urllib.request.urlopen(req, timeout=3)
the_page = response.read()
info = response.info()
_zip = info.get('Content-Encoding')
if _zip == 'gzip':
the_page = gzip.decompress(the_page)
else:
pass
the_page = the_page.decode()
# print(the_page)
return the_page
except:
return None
# @tornado.gen.coroutine
# def post_http(url, values):
# try:
# # log.v('post_http(), url={}\n'.format(url))
#
# # user_agent = 'Mozilla/4.0 (compatible;MSIE 5.5; Windows NT)'
# # values = {
# # 'act': 'login',
# # 'login[email]': 'yzhang@i9i8.com',
# # 'login[password]': '123456'
# # }
# values = json.dumps(values)
# data = urllib.parse.quote(values).encode('utf-8')
# # headers = {'User-Agent': user_agent}
#
# # req = urllib.request.Request(url=url, data=data, headers=headers)
# # response = urllib.request.urlopen(req, timeout=3)
#
# client = tornado.httpclient.AsyncHTTPClient()
# r = yield client.fetch(url, body=data, method='POST')
# print('----------', r.body)
# return r.body
#
#
# # the_page = response.read()
# # info = response.info()
# # _zip = info.get('Content-Encoding')
# # if _zip == 'gzip':
# # the_page = gzip.decompress(the_page)
# # else:
# # pass
# # the_page = the_page.decode()
# # # print(the_page)
# # return the_page
# except:
# return None
class GetSessionId(SwxAuthJsonHandler):
@tornado.gen.coroutine
def post(self, *args, **kwargs):
args = self.get_argument('args', None)
if args is not None:
@ -827,18 +840,34 @@ class GetSessionId(SwxAuthJsonHandler):
url = 'http://{}:{}/rpc'.format(ts_server_rpc_ip, ts_server_rpc_port)
req = {'method': 'request_session', 'param': {'authid': auth_id}}
return_data = post_http(url, req)
# values = json.dumps(req)
# data = urllib.parse.quote(values).encode('utf-8')
# client = tornado.httpclient.AsyncHTTPClient()
# r = yield client.fetch(url, body=data, method='POST')
# if r.code == 200:
# # self.write(r.body)
# print('+++++++++', r.body)
_yr = async_post_http(url, req)
return_data = yield _yr
if return_data is None:
return self.write_json(-1)
return_data = json.loads(return_data)
# return_data = result.decode()
# print('############', return_data)
# return_data = json.loads(result.decode())
if 'code' not in return_data:
return self.write_json(-1)
_code = return_data['code']
if _code != 0:
return self.write_json(_code)
try:
session_id = return_data['data']['sid']
except:
except IndexError:
return self.write_json(-1)
data = dict()

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
import tornado.web
# import tornado.web
import tornado.gen
import json
@ -11,7 +11,6 @@ from .base import SwxJsonHandler
class RpcHandler(SwxJsonHandler):
@tornado.web.asynchronous
@tornado.gen.coroutine
def get(self):
_uri = self.request.uri.split('?', 1)
@ -20,8 +19,10 @@ class RpcHandler(SwxJsonHandler):
self.write_json(-1, message='need request param.')
return
self._dispatch(urllib.parse.unquote(_uri[1]))
yield self._dispatch(urllib.parse.unquote(_uri[1]))
self.finish()
@tornado.gen.coroutine
def post(self):
# curl -X POST --data '{"method":"get_auth_info","param":{"authid":0}}' http://127.0.0.1:7190/rpc
req = self.request.body.decode('utf-8')
@ -29,8 +30,10 @@ class RpcHandler(SwxJsonHandler):
self.write_json(-1, message='need request param.')
return
self._dispatch(req)
yield self._dispatch(req)
self.finish()
@tornado.gen.coroutine
def _dispatch(self, req):
print('rpc-req:', req)
try:

View File

@ -785,7 +785,7 @@ def get_auth_info(auth_id):
','.join(['d.{}'.format(i) for i in field_d]),
auth_id)
print(str_sql)
# print(str_sql)
"""
"SELECT a.auth_id as auth_id, a.account_name as account_name, \
@ -802,35 +802,45 @@ def get_auth_info(auth_id):
db_ret = sql_exec.ExecProcQuery(str_sql)
if db_ret is None:
if db_ret is None or len(db_ret) > 1:
return None
ret = list()
for item in db_ret:
x = DbItem()
x.load(item,
['a_{}'.format(i) for i in field_a] +
['b_{}'.format(i) for i in field_b] +
['c_{}'.format(i) for i in field_c] +
['d_{}'.format(i) for i in field_d]
)
db_item = DbItem()
h = dict()
h['host_ip'] = x.b_host_ip
h['sys_type'] = x.b_host_sys_type
h['account_name'] = x.a_account_name
h['account_lock'] = x.d_account_lock
# h['host_lock'] = x.a_host_lock
h['host_port'] = x.b_host_port
h['protocol'] = x.b_protocol
h['encrypt'] = x.c_encrypt
h['auth_mode'] = x.c_auth_mode
h['user_name'] = x.c_user_name
h['user_param'] = x.c_user_param
h['user_pswd'] = x.c_user_pswd
h['cert_id'] = x.c_cert_id
db_item.load(db_ret[0],
['a_{}'.format(i) for i in field_a] +
['b_{}'.format(i) for i in field_b] +
['c_{}'.format(i) for i in field_c] +
['d_{}'.format(i) for i in field_d]
)
ret.append(h)
ret = dict()
ret['host_ip'] = db_item.b_host_ip
ret['sys_type'] = db_item.b_host_sys_type
ret['account_name'] = db_item.a_account_name
ret['account_lock'] = db_item.d_account_lock
# h['host_lock'] = x.a_host_lock
ret['host_port'] = db_item.b_host_port
ret['protocol'] = db_item.b_protocol
ret['encrypt'] = db_item.c_encrypt
ret['auth_mode'] = db_item.c_auth_mode
ret['user_name'] = db_item.c_user_name
ret['user_param'] = db_item.c_user_param
# ret['user_pswd'] = db_item.c_user_pswd
# ret['cert_id'] = db_item.c_cert_id
if db_item.c_auth_mode == 1:
ret['user_auth'] = db_item.c_user_pswd
elif db_item.c_auth_mode == 2:
cert_id = db_item.c_cert_id
str_sql = 'SELECT cert_pri FROM ts_cert WHERE cert_id={}'.format(cert_id)
db_ret = sql_exec.ExecProcQuery(str_sql)
print(db_ret)
if db_ret is None or len(db_ret) > 1:
return None
ret['user_auth'] = db_ret[0][0]
else:
pass
print(ret)
return ret