New retry params used when failure happens with writing to file or communicating with socket. New default name of socket: /tmp/mysql.audit_[cwd]_[port number OR unix socket with underscores instead of slashes], used for better support of multiple mysql instances running on the same machine.

pull/103/head v1.0.6
Guy Lichtman 2014-09-16 17:55:05 +03:00
parent f1adc23fe1
commit 23fb7f63c9
3 changed files with 327 additions and 155 deletions

View File

@ -76,11 +76,15 @@ class IWriter
{
public:
virtual ~IWriter() {}
//return negative on fail
virtual ssize_t write(const char * data, size_t size) = 0;
inline ssize_t write_str(const char * str)
{
return write(str, strlen(str));
}
//return 0 on success
virtual int open(const char * io_dest, bool log_errors) = 0;
virtual void close() = 0;
};
class ThdSesData {
@ -369,7 +373,7 @@ public:
static void stop_all();
Audit_handler() :
m_initialized(false), m_enabled(false), m_print_offset_err(true), m_formatter(NULL)
m_initialized(false), m_enabled(false), m_print_offset_err(true), m_formatter(NULL), m_failed(false), m_log_io_errors(true)
{
}
@ -378,6 +382,7 @@ public:
if (m_initialized)
{
rwlock_destroy(&LOCK_audit);
pthread_mutex_destroy(&LOCK_io);
}
}
@ -385,8 +390,6 @@ public:
* Should be called to initialize. We don't init in constructor in order to provide indication if
* pthread stuff failed init.
*
* Will internally call handler_init();
*
* @frmt the formatter to use in this handler (does not manage distruction of this object)
* @return 0 on success
*/
@ -402,7 +405,7 @@ public:
{
return res;
}
res = handler_init();
res = pthread_mutex_init(&LOCK_io, MY_MUTEX_INIT_SLOW);;
if (res)
{
return res;
@ -431,17 +434,48 @@ public:
/**
* Will get relevant shared lock and call internal method of handler
*/
void log_audit(ThdSesData *pThdData);
void log_audit(ThdSesData *pThdData);
/**
* Public so can be configured via sysvar
*/
unsigned int m_retry_interval;
protected:
Audit_formatter * m_formatter;
virtual void handler_start() = 0;
virtual void handler_stop() = 0;
virtual int handler_init() = 0;
virtual void handler_log_audit(ThdSesData *pThdData) =0;
virtual void handler_start();
//wiil call internal method and set failed as needed
bool handler_start_nolock();
virtual void handler_stop();
virtual bool handler_start_internal() = 0;
virtual void handler_stop_internal() = 0;
virtual bool handler_log_audit(ThdSesData *pThdData) =0;
bool m_initialized;
bool m_enabled;
rw_lock_t LOCK_audit;
bool m_failed;
bool m_log_io_errors;
time_t m_last_retry_sec_ts;
inline void set_failed()
{
time(&m_last_retry_sec_ts);
m_failed = true;
m_log_io_errors = false;
}
inline bool is_failed_now()
{
return m_failed && (m_retry_interval < 0 ||
difftime(time(NULL), m_last_retry_sec_ts) > m_retry_interval);
}
//override default assignment and copy to protect against creating additional instances
Audit_handler & operator=(const Audit_handler&);
Audit_handler(const Audit_handler&);
private:
//bool indicating if to print offset errors to log or not
bool m_print_offset_err;
//lock io
pthread_mutex_t LOCK_io;
//audit (enable) lock
rw_lock_t LOCK_audit;
inline void lock_shared()
{
rw_rdlock(&LOCK_audit);
@ -454,12 +488,6 @@ protected:
{
rw_unlock(&LOCK_audit);
}
//override default assignment and copy to protect against creating additional instances
Audit_handler & operator=(const Audit_handler&);
Audit_handler(const Audit_handler&);
private:
//bool indicating if to print offset errors to log or not
bool m_print_offset_err;
};
/**
@ -468,21 +496,25 @@ private:
class Audit_io_handler: public Audit_handler, public IWriter
{
public:
virtual ~Audit_io_handler()
{
if (m_initialized)
{
pthread_mutex_destroy(&LOCK_io);
}
Audit_io_handler() : m_io_dest(NULL), m_io_type(NULL)
{
}
virtual ~Audit_io_handler()
{
}
/**
* target we write to (socket/file). Public so we update via sysvar
*/
char * m_io_dest;
protected:
virtual int handler_init()
{
return pthread_mutex_init(&LOCK_io, MY_MUTEX_INIT_SLOW);
}
//mutex we need sync writes on file
pthread_mutex_t LOCK_io;
virtual bool handler_start_internal();
virtual void handler_stop_internal();
//used for logging messages
const char * m_io_type;
};
class Audit_file_handler: public Audit_io_handler
@ -490,8 +522,9 @@ class Audit_file_handler: public Audit_io_handler
public:
Audit_file_handler() :
m_sync_period(0), m_filename(NULL), m_log_file(NULL), m_sync_counter(0)
m_sync_period(0), m_log_file(NULL), m_sync_counter(0)
{
m_io_type = "file";
}
virtual ~Audit_file_handler()
@ -505,38 +538,31 @@ public:
* We leave this public so the mysql sysvar function can update this variable directly.
*/
unsigned int m_sync_period;
/**
* File name of the file we write to. Public so sysvar function can update this directly.
*/
char * m_filename;
/**
* Write function we pass to formatter
*/
ssize_t write(const char * data, size_t size);
void close();
int open(const char * io_dest, bool m_log_errors);
//static void print_sleep (THD *thd, int delay_ms);
protected:
//override default assignment and copy to protect against creating additional instances
Audit_file_handler & operator=(const Audit_file_handler&);
Audit_file_handler(const Audit_file_handler&);
virtual void handler_start();
virtual void handler_stop();
/**
* Will acquire locks and call handler_write
*/
virtual void handler_log_audit(ThdSesData *pThdData);
virtual bool handler_log_audit(ThdSesData *pThdData);
FILE * m_log_file;
//the period to use for syncing
unsigned int m_sync_counter;
void close_file()
{
if (m_log_file)
{
my_fclose(m_log_file, MYF(0));
}
m_log_file = NULL;
}
};
@ -545,19 +571,16 @@ class Audit_socket_handler: public Audit_io_handler
public:
Audit_socket_handler() :
m_sockname(NULL), m_vio(NULL), m_connect_timeout(1)
m_vio(NULL), m_connect_timeout(1)
{
m_io_type = "socket";
}
virtual ~Audit_socket_handler()
{
}
/**
* Socket name of the UNIX socket we write to. Public so sysvar function can update this directly.
*/
char * m_sockname;
/**
* Connect timeout in secconds
*/
@ -567,31 +590,22 @@ public:
* Write function we pass to formatter
*/
ssize_t write(const char * data, size_t size);
void close();
int open(const char * io_dest, bool log_errors);
protected:
//override default assignment and copy to protect against creating additional instances
Audit_socket_handler & operator=(const Audit_socket_handler&);
Audit_socket_handler(const Audit_socket_handler&);
virtual void handler_start();
virtual void handler_stop();
Audit_socket_handler(const Audit_socket_handler&);
/**
* Will acquire locks and call handler_write
*/
virtual void handler_log_audit(ThdSesData *pThdData);
virtual bool handler_log_audit(ThdSesData *pThdData);
//Vio we write to
//define as void* so we don't access members directly
void * m_vio;
void close_vio()
{
if (m_vio)
{
//no need for vio_close as is called by delete (additionally close changed its name to vio_shutdown in 5.6.11)
vio_delete((Vio*)m_vio);
}
m_vio = NULL;
}
void * m_vio;
};
#endif /* AUDIT_HANDLER_H_ */

View File

@ -127,7 +127,7 @@ void Audit_handler::log_audit(ThdSesData *pThdData)
{
unlock();
return;
}
}
//sanity check that offsets match
//we can also consider using secutiry context function to do some sanity checks
// char buffer[2048];
@ -148,98 +148,196 @@ void Audit_handler::log_audit(ThdSesData *pThdData)
else
{//offsets are good
m_print_offset_err = true; //mark to print offset err to log incase we encounter in the future
handler_log_audit(pThdData);
pthread_mutex_lock(&LOCK_io);
//check if failed
bool do_log = true;
if (m_failed)
{
do_log = false;
bool retry = m_retry_interval > 0 &&
difftime(time(NULL), m_last_retry_sec_ts) > m_retry_interval;
if(retry)
{
do_log = handler_start_nolock();
}
}
if(do_log)
{
if(!handler_log_audit(pThdData))
{
set_failed();
handler_stop_internal();
}
}
pthread_mutex_unlock(&LOCK_io);
}
unlock();
}
void Audit_file_handler::close()
{
if (m_log_file)
{
my_fclose(m_log_file, MYF(0));
}
m_log_file = NULL;
}
ssize_t Audit_file_handler::write(const char * data, size_t size)
{
return my_fwrite(m_log_file, (uchar *) data, size, MYF(0));
ssize_t res = my_fwrite(m_log_file, (uchar *) data, size, MYF(0));
if(res < 0) // log the error
{
sql_print_error("%s failed writing to file: %s. Err: %s",
AUDIT_LOG_PREFIX, m_io_dest, strerror(errno));
}
return res;
}
void Audit_file_handler::handler_start()
int Audit_file_handler::open(const char * io_dest, bool log_errors)
{
pthread_mutex_lock(&LOCK_io);
char format_name[FN_REFLEN];
fn_format(format_name, m_filename, "", "", MY_UNPACK_FILENAME);
char format_name[FN_REFLEN];
fn_format(format_name, io_dest, "", "", MY_UNPACK_FILENAME);
m_log_file = my_fopen(format_name, O_RDWR | O_APPEND, MYF(0));
if (!m_log_file)
if (!m_log_file)
{
sql_print_error(
"%s unable to create %s: %s. audit file handler disabled!!",
AUDIT_LOG_PREFIX, m_filename, strerror(errno));
m_enabled = false;
}
else
if(log_errors)
{
sql_print_error(
"%s unable to open file %s: %s. audit file handler disabled!!",
AUDIT_LOG_PREFIX, m_io_dest, strerror(errno));
}
return -1;
}
return 0;
}
//no locks. called by handler_start and when it is time to retry
bool Audit_io_handler::handler_start_internal()
{
if (open(m_io_dest, m_log_io_errors) != 0)
{
ssize_t res = m_formatter->start_msg_format(this);
/*
sanity check of writing to the log. If we fail. We will print an erorr and disable this handler.
*/
if (res < 0 || fflush(m_log_file) != 0)
{
sql_print_error(
"%s unable to write to %s: %s. Disabling audit handler.",
AUDIT_LOG_PREFIX, m_filename, strerror(errno));
close_file();
m_enabled = false;
}
//open failed
return false;
}
ssize_t res = m_formatter->start_msg_format(this);
/*
sanity check of writing to the log. If we fail. We will print an erorr and disable this handler.
*/
if (res < 0)
{
if(m_log_io_errors)
{
sql_print_error(
"%s unable to write header msg to %s: %s.",
AUDIT_LOG_PREFIX, m_io_dest, strerror(errno));
}
close();
return false;
}
sql_print_information("%s success opening %s: %s.", AUDIT_LOG_PREFIX, m_io_type, m_io_dest);
return true;
}
void Audit_io_handler::handler_stop_internal()
{
if(!m_failed)
{
m_formatter->stop_msg_format(this);
}
close();
}
bool Audit_handler::handler_start_nolock()
{
bool res = handler_start_internal();
if(res)
{
m_failed = false;
}
else
{
set_failed();
}
return res;
}
void Audit_handler::handler_start()
{
pthread_mutex_lock(&LOCK_io);
m_log_io_errors = true;
handler_start_nolock();
pthread_mutex_unlock(&LOCK_io);
}
void Audit_file_handler::handler_stop()
void Audit_handler::handler_stop()
{
pthread_mutex_lock(&LOCK_io);
m_formatter->stop_msg_format(this);
close_file();
handler_stop_internal();
pthread_mutex_unlock(&LOCK_io);
}
void Audit_file_handler::handler_log_audit(ThdSesData *pThdData)
bool Audit_file_handler::handler_log_audit(ThdSesData *pThdData)
{
pthread_mutex_lock(&LOCK_io);
m_formatter->event_format(pThdData, this);
if (++m_sync_counter >= m_sync_period && m_sync_period)
bool res = (m_formatter->event_format(pThdData, this) >= 0);
if (res && m_sync_period && ++m_sync_counter >= m_sync_period)
{
m_sync_counter = 0;
//Note fflush() only flushes the user space buffers provided by the C library.
//To ensure that the data is physically stored on disk the kernel buffers must be flushed too,
//e.g. with sync(2) or fsync(2).
fflush(m_log_file);
int fd = fileno(m_log_file);
my_sync(fd, MYF(MY_WME));
res = (fflush(m_log_file) == 0);
if(res)
{
int fd = fileno(m_log_file);
res = (my_sync(fd, MYF(MY_WME)) == 0);
}
}
pthread_mutex_unlock(&LOCK_io);
return res;
}
/////////////////// Audit_socket_handler //////////////////////////////////
ssize_t Audit_socket_handler::write(const char * data, size_t size)
void Audit_socket_handler::close()
{
return vio_write((Vio*)m_vio, (const uchar *) data, size);
if (m_vio)
{
//no need for vio_close as is called by delete (additionally close changed its name to vio_shutdown in 5.6.11)
vio_delete((Vio*)m_vio);
}
m_vio = NULL;
}
void Audit_socket_handler::handler_start()
ssize_t Audit_socket_handler::write(const char * data, size_t size)
{
pthread_mutex_lock(&LOCK_io);
//open the socket
ssize_t res = vio_write((Vio*)m_vio, (const uchar *) data, size);
if(res < 0) // log the error
{
sql_print_error("%s failed writing to socket: %s. Err: %s",
AUDIT_LOG_PREFIX, m_io_dest, strerror(vio_errno((Vio*)m_vio)));
}
return res;
}
int Audit_socket_handler::open(const char * io_dest, bool log_errors)
{
//open the socket
int sock = socket(AF_UNIX,SOCK_STREAM,0);
if (sock < 0)
{
sql_print_error(
"%s unable to create socket: %s. audit socket handler disabled!!",
AUDIT_LOG_PREFIX, strerror(errno));
m_enabled = false;
pthread_mutex_unlock(&LOCK_io);
return;
if(log_errors)
{
sql_print_error(
"%s unable to create unix socket: %s.",
AUDIT_LOG_PREFIX, strerror(errno));
}
return -1;
}
//connect the socket
m_vio= vio_new(sock, VIO_TYPE_SOCKET, VIO_LOCALHOST);
struct sockaddr_un UNIXaddr;
UNIXaddr.sun_family = AF_UNIX;
strmake(UNIXaddr.sun_path, m_sockname, sizeof(UNIXaddr.sun_path)-1);
strmake(UNIXaddr.sun_path, io_dest, sizeof(UNIXaddr.sun_path)-1);
#if MYSQL_VERSION_ID < 50600
if (my_connect(sock,(struct sockaddr *) &UNIXaddr, sizeof(UNIXaddr),
m_connect_timeout))
@ -249,42 +347,21 @@ void Audit_socket_handler::handler_start()
m_connect_timeout * 1000))
#endif
{
sql_print_error(
"%s unable to connect to socket: %s. err: %s. audit socket handler disabled!!",
AUDIT_LOG_PREFIX, m_sockname, strerror(errno));
close_vio();
m_enabled = false;
pthread_mutex_unlock(&LOCK_io);
return;
}
ssize_t res = m_formatter->start_msg_format(this);
/*
sanity check of writing to the log. If we fail. We will print an erorr and disable this handler.
*/
if (res < 0)
{
sql_print_error(
"%s unable to write to %s: %s. Disabling audit handler.",
AUDIT_LOG_PREFIX, m_sockname, strerror(errno));
close_vio();
m_enabled = false;
}
pthread_mutex_unlock(&LOCK_io);
}
void Audit_socket_handler::handler_stop()
{
pthread_mutex_lock(&LOCK_io);
m_formatter->stop_msg_format(this);
close_vio();
pthread_mutex_unlock(&LOCK_io);
if(log_errors)
{
sql_print_error(
"%s unable to connect to socket: %s. err: %s.",
AUDIT_LOG_PREFIX, m_io_dest, strerror(errno));
}
close();
return -2;
}
return 0;
}
void Audit_socket_handler::handler_log_audit(ThdSesData *pThdData)
{
pthread_mutex_lock(&LOCK_io);
m_formatter->event_format(pThdData, this);
pthread_mutex_unlock(&LOCK_io);
bool Audit_socket_handler::handler_log_audit(ThdSesData *pThdData)
{
return (m_formatter->event_format(pThdData, this) >= 0);
}
//////////////////////// Audit Socket handler end ///////////////////////////////////////////

View File

@ -852,6 +852,9 @@ static char password_masking_regex_check_buff[4096] = {0};
static char * password_masking_regex_string = NULL;
static char password_masking_regex_buff[4096] = {0};
//socket name
static char json_socket_name_buff[1024] = {0};
/**
* The trampoline functions we use. Will be set to point to allocated mem.
@ -1949,12 +1952,76 @@ static void password_masking_regex_string_update(THD *thd, struct st_mysql_sys_v
if(str != password_masking_regex_buff)
{
strncpy( password_masking_regex_buff , str, array_elements( password_masking_regex_buff) - 1);
}
}
password_masking_regex_string = password_masking_regex_buff;
password_masking_regex_compile();
sql_print_information("%s Set password_masking_regex value: [%s]", log_prefix, str_val);
}
static void replace_char(char * str, const char tofind, const char rplc)
{
size_t n = strlen(str);
for(size_t i = 0; i< n; i++)
{
if(tofind == str[i])
{
str[i] = rplc;
}
}
}
static void json_socket_name_update(THD *thd, struct st_mysql_sys_var *var, void *tgt, const void *save)
{
const char * str_val = *static_cast<char*const*>(save);
const char * str = str_val;
const size_t buff_len = array_elements( json_socket_name_buff) -1;
//copy str to buffer only if str is not pointing to buff
if(NULL == str)
{
json_socket_name_buff[0] = '\0';
}
else if(str != json_socket_name_buff)
{
strncpy( json_socket_name_buff , str, buff_len);
}
if(strlen(json_socket_name_buff) == 0) //set default
{
const char * name_prefix = "/tmp/mysql.audit_";
size_t indx = strlen(name_prefix); //count how much to move forward the buff
strncpy( json_socket_name_buff, name_prefix, buff_len);
char cwd_buff[512] = {0};
my_getwd(cwd_buff, array_elements(cwd_buff) - 1, 0);
replace_char(cwd_buff, '/', '_');
size_t cwd_len = strlen(cwd_buff);
if(cwd_len > 0 && '_' != cwd_buff[cwd_len-1]) //add _ to end
{
strncpy(cwd_buff + cwd_len, "_", array_elements(cwd_buff) - 1 - cwd_len);
}
strncpy(json_socket_name_buff + indx, cwd_buff, buff_len - indx);
indx += cwd_len;
if(indx < buff_len)
{
if(mysqld_port > 0)
{
snprintf(json_socket_name_buff + indx, buff_len - indx, "%u", mysqld_port);
}
else
{
strncpy(json_socket_name_buff + indx, mysqld_unix_port, buff_len - indx);
replace_char(json_socket_name_buff + indx, '/', '_');
}
}
else //should never happen
{
sql_print_error("%s json_socket_name_buff not big enough to set default name. buff: %s",
log_prefix, json_socket_name_buff);
}
}
json_socket_handler.m_io_dest = json_socket_name_buff;
sql_print_information("%s Set json_socket_name str: [%s] value: [%s]", log_prefix, str, json_socket_handler.m_io_dest);
}
//check that the regex compiles. Return 0 on success.
static int password_masking_regex_check(THD* thd, struct st_mysql_sys_var * var, void* save, st_mysql_value* value)
{
@ -2049,6 +2116,8 @@ static void record_objs_string_update_extended(THD *thd, struct st_mysql_sys_var
if (NULL != password_masking_regex_string) {
password_masking_regex_string_update(NULL, NULL, NULL, &password_masking_regex_string);
}
//update to generate the default if needed
json_socket_name_update(NULL, NULL, NULL, &(json_socket_handler.m_io_dest));
//set the password masking callback for json formatters
json_formatter.m_perform_password_masking = check_do_password_masking;
@ -2257,7 +2326,7 @@ static MYSQL_SYSVAR_BOOL(header_msg, json_formatter.m_write_start_msg,
PLUGIN_VAR_RQCMDARG,
"AUDIT write header message at start of logging or file flush Enable|Disable. Default enabled.", NULL, NULL, 1);
static MYSQL_SYSVAR_STR(json_log_file, json_file_handler.m_filename,
static MYSQL_SYSVAR_STR(json_log_file, json_file_handler.m_io_dest,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC,
"AUDIT plugin json log file name",
NULL, NULL, "mysql-audit.json");
@ -2267,6 +2336,16 @@ static MYSQL_SYSVAR_UINT(json_file_sync, json_file_handler.m_sync_period,
"AUDIT plugin json log file sync period. If the value of this variable is greater than 0, audit log will sync to disk after every audit_json_file_sync writes.",
NULL, NULL, 0, 0, UINT_MAX32, 0);
static MYSQL_SYSVAR_UINT(json_file_retry, json_file_handler.m_retry_interval,
PLUGIN_VAR_RQCMDARG,
"AUDIT plugin json log file retry interval. If the plugin fails to open/write to the json log file, will retry to open every specified interval in seconds. Set for 0 to disable retrying. Default 60 seconds.",
NULL, NULL, 60, 0, UINT_MAX32, 0);
static MYSQL_SYSVAR_UINT(json_socket_retry, json_socket_handler.m_retry_interval,
PLUGIN_VAR_RQCMDARG,
"AUDIT plugin json socket connect interval. If the plugin fails to connect/write to the json audit socket, will retry to connect every specified interval in seconds. Set for 0 to disable retrying. Default 10 seconds.",
NULL, NULL, 10, 0, UINT_MAX32, 0);
static MYSQL_SYSVAR_BOOL(json_file, json_file_handler_enable,
PLUGIN_VAR_RQCMDARG,
"AUDIT plugin json log file Enable|Disable", NULL, json_log_file_enable, 0);
@ -2276,10 +2355,10 @@ static MYSQL_SYSVAR_BOOL(json_file_flush, json_file_handler_flush,
"AUDIT plugin json log file flush. Set to ON to perform a flush of the log.", NULL, json_log_file_flush, 0);
static MYSQL_SYSVAR_STR(json_socket_name, json_socket_handler.m_sockname,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC,
static MYSQL_SYSVAR_STR(json_socket_name, json_socket_handler.m_io_dest,
PLUGIN_VAR_RQCMDARG,
"AUDIT plugin json log unix socket name",
NULL, NULL, "/tmp/mysql-audit.json.sock");
NULL, json_socket_name_update, "");
static MYSQL_SYSVAR_STR(offsets, offsets_string,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY | PLUGIN_VAR_MEMALLOC,
@ -2373,6 +2452,8 @@ static struct st_mysql_sys_var* audit_system_variables[] =
MYSQL_SYSVAR(header_msg),
MYSQL_SYSVAR(json_log_file),
MYSQL_SYSVAR(json_file_sync),
MYSQL_SYSVAR(json_file_retry),
MYSQL_SYSVAR(json_socket_retry),
MYSQL_SYSVAR(json_file),
MYSQL_SYSVAR(json_file_flush),
MYSQL_SYSVAR(uninstall_plugin),