
[PATCH] MDEV-29744: Fix incorrect locking order of LOCK_log/LOCK_commit_ordered and LOCK_global_system_variables
by Kristian Nielsen, 25 Oct '24
The LOCK_global_system_variables must not be held when taking mutexes
such as LOCK_commit_ordered and LOCK_log, as this causes inconsistent
mutex locking order that can theoretically cause the server to
deadlock.
To avoid this, temporarily release LOCK_global_system_variables in two
system variable update functions, like it is done in many other
places.
Enforce the correct locking order at server startup, to more easily
catch (in debug builds) any remaining wrong orders that may be hidden
elsewhere in the code.
Note that when this is merged to 11.4, similar unlock/lock of
LOCK_global_system_variables must be added in update_binlog_space_limit()
as is done in binlog_checksum_update() and fix_max_binlog_size(), as this
is a new function added in 11.4 that also needs the same fix. Tests will
fail with wrong mutex order until this is done.
Signed-off-by: Kristian Nielsen <knielsen(a)knielsen-hq.org>
fixup_locking_order
Signed-off-by: Kristian Nielsen <knielsen(a)knielsen-hq.org>
---
sql/log.cc | 5 +++++
sql/sys_vars.cc | 2 ++
2 files changed, 7 insertions(+)
diff --git a/sql/log.cc b/sql/log.cc
index 512be2e2a6d..ecb177a34fb 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -3595,6 +3595,9 @@ void MYSQL_BIN_LOG::init_pthread_objects()
mysql_mutex_init(m_key_LOCK_binlog_end_pos, &LOCK_binlog_end_pos,
MY_MUTEX_INIT_SLOW);
+
+ /* Fix correct mutex order to catch violations quicker (MDEV-35197). */
+ mysql_mutex_record_order(&LOCK_log, &LOCK_global_system_variables);
}
@@ -11753,6 +11756,7 @@ binlog_checksum_update(MYSQL_THD thd, struct st_mysql_sys_var *var,
bool check_purge= false;
ulong UNINIT_VAR(prev_binlog_id);
+ mysql_mutex_unlock(&LOCK_global_system_variables);
mysql_mutex_lock(mysql_bin_log.get_log_lock());
if(mysql_bin_log.is_open())
{
@@ -11771,6 +11775,7 @@ binlog_checksum_update(MYSQL_THD thd, struct st_mysql_sys_var *var,
mysql_mutex_unlock(mysql_bin_log.get_log_lock());
if (check_purge)
mysql_bin_log.checkpoint_and_purge(prev_binlog_id);
+ mysql_mutex_lock(&LOCK_global_system_variables);
}
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index cb35386f883..115bbdf499b 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -1660,7 +1660,9 @@ Sys_max_binlog_stmt_cache_size(
static bool fix_max_binlog_size(sys_var *self, THD *thd, enum_var_type type)
{
+ mysql_mutex_unlock(&LOCK_global_system_variables);
mysql_bin_log.set_max_size(max_binlog_size);
+ mysql_mutex_lock(&LOCK_global_system_variables);
return false;
}
static Sys_var_on_access_global<Sys_var_ulong,
--
2.39.5
1
0

[PATCH] MDEV-35197: Fix incorrect locking order of LOCK_log/LOCK_commit_ordered and LOCK_global_system_variables
by Kristian Nielsen, 22 Oct '24
The LOCK_global_system_variables must not be held when taking mutexes
such as LOCK_commit_ordered and LOCK_log, as this causes inconsistent
mutex locking order that can theoretically cause the server to
deadlock.
To avoid this, temporarily release LOCK_global_system_variables in two
system variable update functions, like it is done in many other
places.
Enforce the correct locking order at server startup, to more easily
catch (in debug builds) any remaining wrong orders that may be hidden
elsewhere in the code.
Signed-off-by: Kristian Nielsen <knielsen(a)knielsen-hq.org>
---
sql/log.cc | 8 ++++++++
sql/sys_vars.cc | 2 ++
2 files changed, 10 insertions(+)
diff --git a/sql/log.cc b/sql/log.cc
index 512be2e2a6d..70fe33f9272 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -3595,6 +3595,12 @@ void MYSQL_BIN_LOG::init_pthread_objects()
mysql_mutex_init(m_key_LOCK_binlog_end_pos, &LOCK_binlog_end_pos,
MY_MUTEX_INIT_SLOW);
+
+ /* Fix correct mutex order to catch violations quicker (MDEV-35197). */
+ mysql_mutex_lock(&LOCK_log);
+ mysql_mutex_lock(&LOCK_global_system_variables);
+ mysql_mutex_unlock(&LOCK_global_system_variables);
+ mysql_mutex_unlock(&LOCK_log);
}
@@ -11753,6 +11759,7 @@ binlog_checksum_update(MYSQL_THD thd, struct st_mysql_sys_var *var,
bool check_purge= false;
ulong UNINIT_VAR(prev_binlog_id);
+ mysql_mutex_unlock(&LOCK_global_system_variables);
mysql_mutex_lock(mysql_bin_log.get_log_lock());
if(mysql_bin_log.is_open())
{
@@ -11771,6 +11778,7 @@ binlog_checksum_update(MYSQL_THD thd, struct st_mysql_sys_var *var,
mysql_mutex_unlock(mysql_bin_log.get_log_lock());
if (check_purge)
mysql_bin_log.checkpoint_and_purge(prev_binlog_id);
+ mysql_mutex_lock(&LOCK_global_system_variables);
}
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index cb35386f883..115bbdf499b 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -1660,7 +1660,9 @@ Sys_max_binlog_stmt_cache_size(
static bool fix_max_binlog_size(sys_var *self, THD *thd, enum_var_type type)
{
+ mysql_mutex_unlock(&LOCK_global_system_variables);
mysql_bin_log.set_max_size(max_binlog_size);
+ mysql_mutex_lock(&LOCK_global_system_variables);
return false;
}
static Sys_var_on_access_global<Sys_var_ulong,
--
2.39.2
1
0

21 Oct '24
Signed-off-by: Kristian Nielsen <knielsen(a)knielsen-hq.org>
---
cmake/mariadb_connector_c.cmake | 4 ++++
1 file changed, 4 insertions(+)
diff --git a/cmake/mariadb_connector_c.cmake b/cmake/mariadb_connector_c.cmake
index 9b4f25498fd..aff69b3ba17 100644
--- a/cmake/mariadb_connector_c.cmake
+++ b/cmake/mariadb_connector_c.cmake
@@ -32,6 +32,10 @@ ELSE()
SET(CONC_INSTALL_LAYOUT "DEFAULT")
ENDIF()
+IF(WITH_BOOST_CONTEXT)
+ SET(CONC_WITH_BOOST_CONTEXT ON)
+ENDIF()
+
SET(PLUGIN_INSTALL_DIR ${INSTALL_PLUGINDIR})
SET(MARIADB_UNIX_ADDR ${MYSQL_UNIX_ADDR})
--
2.39.2
1
0

16 Oct '24
Follow-up from previous patch, which introduced catalog in the GTID event.
With catalog in GTID, there is no longer a need to use the catalog in
Query_log_event or Table_map_log_event. In fact, since catalog must never
change in the middle of a transaction, it is better to _only_ set the
catalog from the GTID event and don't use any catalog info from other
events.
To save space, remove the previously introduced catalog from
Table_map_log_event. (The catalog in Query_log_event is kept as it was there
for long, so as not to change event formats).
Signed-off-by: Kristian Nielsen <knielsen(a)knielsen-hq.org>
---
sql/log_event.cc | 102 ++++++++++-----------------------------
sql/log_event.h | 23 ++-------
sql/log_event_client.cc | 51 ++++++++------------
sql/log_event_server.cc | 104 +++++++++++++++++++++++-----------------
4 files changed, 110 insertions(+), 170 deletions(-)
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 004010e9d99..e8bb22b0ecb 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -1376,7 +1376,7 @@ Query_log_event::Query_log_event(const uchar *buf, uint event_len,
Log_event_type event_type)
:Log_event(buf, description_event), data_buf(0),
catalog(0), db(NullS), query(NullS),
- status_vars_len(0),
+ catalog_len(0), status_vars_len(0),
flags2_inited(0), sql_mode_inited(0), charset_inited(0), flags2(0),
auto_increment_increment(1), auto_increment_offset(1),
time_zone_len(0), lc_time_names_number(0), charset_database_number(0),
@@ -1387,12 +1387,11 @@ Query_log_event::Query_log_event(const uchar *buf, uint event_len,
uint8 common_header_len, post_header_len;
Log_event::Byte *start;
const Log_event::Byte *end;
+ bool catalog_nz= 1;
DBUG_ENTER("Query_log_event::Query_log_event(char*,...)");
memset(&user, 0, sizeof(user));
memset(&host, 0, sizeof(host));
- catalog_name.str= 0;
- catalog_name.length= 0;
common_header_len= description_event->common_header_len;
post_header_len= description_event->post_header_len[event_type-1];
DBUG_PRINT("info",("event_len: %u common_header_len: %d post_header_len: %d",
@@ -1461,16 +1460,13 @@ Query_log_event::Query_log_event(const uchar *buf, uint event_len,
}
case Q_CATALOG_NZ_CODE:
{
- uint length;
DBUG_PRINT("info", ("case Q_CATALOG_NZ_CODE; pos:%p; end:%p",
pos, end));
- if (get_str_len_and_pointer(&pos, &catalog_name.str, &length, end))
+ if (get_str_len_and_pointer(&pos, &catalog, &catalog_len, end))
{
query= 0;
DBUG_VOID_RETURN;
}
- catalog_name.length= length;
- break;
}
break;
case Q_AUTO_INCREMENT:
@@ -1500,10 +1496,11 @@ Query_log_event::Query_log_event(const uchar *buf, uint event_len,
case Q_CATALOG_CODE: /* for 5.0.x where 0<=x<=3 masters */
{
CHECK_SPACE(pos, end, 1);
- if ((catalog_name.length= *pos))
- catalog_name.str= (char*) pos+1; // Will be copied later
- CHECK_SPACE(pos, end, catalog_name.length + 2);
- pos+= catalog_name.length+2; // leap over end 0
+ if ((catalog_len= *pos))
+ catalog= (char*) pos+1; // Will be copied later
+ CHECK_SPACE(pos, end, catalog_len + 2);
+ pos+= catalog_len+2; // leap over end 0
+ catalog_nz= 0; // catalog has end 0 in event
}
break;
case Q_LC_TIME_NAMES_CODE:
@@ -1618,7 +1615,7 @@ Query_log_event::Query_log_event(const uchar *buf, uint event_len,
#if !defined(MYSQL_CLIENT) && defined(HAVE_QUERY_CACHE)
if (!(start= data_buf= (Log_event::Byte*) my_malloc(PSI_INSTRUMENT_ME,
- catalog_name.length + 1
+ catalog_len + 1
+ time_zone_len + 1
+ user.length + 1
+ host.length + 1
@@ -1630,7 +1627,7 @@ Query_log_event::Query_log_event(const uchar *buf, uint event_len,
MYF(MY_WME))))
#else
if (!(start= data_buf= (Log_event::Byte*) my_malloc(PSI_INSTRUMENT_ME,
- catalog_name.length + 1
+ catalog_len + 1
+ time_zone_len + 1
+ user.length + 1
+ host.length + 1
@@ -1638,36 +1635,22 @@ Query_log_event::Query_log_event(const uchar *buf, uint event_len,
MYF(MY_WME))))
#endif
DBUG_VOID_RETURN;
-
- /*
- Ensure we can support old replication clients that are using 'std' as catalog name
- This is also needed to support old mtr test that uses copies of old replication logs that
- still are using 'std'.
- */
- if (catalog_name.length == 3 and memcmp(catalog_name.str, "std", 3) == 0)
- catalog_name.str= "def";
-
-#ifndef MYSQL_CLIENT
- if (catalog_name.length) // If catalog was given
+ if (catalog_len) // If catalog is given
{
- if (!(catalog= get_catalog(&catalog_name, 1)))
+ /**
+ @todo we should clean up and do only copy_str_and_move; it
+ works for both cases. Then we can remove the catalog_nz
+ flag. /sven
+ */
+ if (likely(catalog_nz)) // true except if event comes from 5.0.0|1|2|3.
+ copy_str_and_move(&catalog, &start, catalog_len);
+ else
{
- if (!user.str)
- user.str= "";
- if (!host.str)
- host.str= "";
- my_error(ER_ACCESS_NO_SUCH_CATALOG, MYF(ME_ERROR_LOG),
- user.str, host.str, catalog_name.length, catalog_name.str);
- query= 0;
- DBUG_VOID_RETURN;
+ memcpy(start, catalog, catalog_len+1); // copy end 0
+ catalog= (const char *)start;
+ start+= catalog_len+1;
}
}
- else
- catalog= default_catalog();
-#endif
-
- copy_str_and_move(&catalog_name.str, &start, catalog_name.length);
-
if (time_zone_len)
copy_str_and_move(&time_zone_str, &start, time_zone_len);
@@ -2456,13 +2439,14 @@ Gtid_log_event::Gtid_log_event(const uchar *buf, uint event_len,
return;
}
uint32_t cat_len= *buf++;
- if (unlikely(cat_len > MAX_CATALOG_NAME) ||
+ if (unlikely(cat_len >= MAX_CATALOG_NAME) ||
unlikely(buf - buf_0 + cat_len) >= event_len)
{
seq_no= 0;
return;
}
memcpy(cat_name_buf, buf, cat_len);
+ cat_name_buf[cat_len]= '\0';
cat_name_int.str= cat_name_buf;
cat_name_int.length= cat_len;
cat_name= &cat_name_int;
@@ -3341,7 +3325,6 @@ Table_map_log_event::Table_map_log_event(const uchar *buf, uint event_len,
#ifndef MYSQL_CLIENT
m_table(NULL),
#endif
- m_catalog(NULL),
m_dbnam(NULL), m_dblen(0), m_tblnam(NULL), m_tbllen(0),
m_colcnt(0), m_coltype(0),
m_memory(NULL), m_table_id(ULONGLONG_MAX), m_flags(0),
@@ -3350,7 +3333,6 @@ Table_map_log_event::Table_map_log_event(const uchar *buf, uint event_len,
m_optional_metadata_len(0), m_optional_metadata(NULL)
{
unsigned int bytes_read= 0;
- LEX_CSTRING catalog= {"",0};
uint8 common_header_len= description_event->common_header_len;
uint8 post_header_len= description_event->post_header_len[TABLE_MAP_EVENT-1];
DBUG_ENTER("Table_map_log_event::Table_map_log_event(const char*,uint,...)");
@@ -3359,9 +3341,6 @@ Table_map_log_event::Table_map_log_event(const uchar *buf, uint event_len,
"post_header_len: %d",
event_len, common_header_len, post_header_len));
- m_catnam.str="";
- m_catnam.length= 0;
-
/*
Don't print debug messages when running valgrind since they can
trigger false warnings.
@@ -3396,35 +3375,7 @@ Table_map_log_event::Table_map_log_event(const uchar *buf, uint event_len,
m_flags= uint2korr(post_start);
/* Read the variable part of the event */
- const uchar *vpart= buf + common_header_len + post_header_len;
-
- if (m_flags & TM_BIT_HAS_CATALOG_F)
- {
- size_t catalog_len= *vpart;
- const char *catalog_name= (const char*) vpart + 1;
-
- catalog.str= catalog_name;
- catalog.length= catalog_len;
- if (vpart + catalog_len + 2 > buf + event_len)
- DBUG_VOID_RETURN;
-
-#ifndef MYSQL_CLIENT
- if (!(m_catalog= get_catalog(&catalog, 1)))
- {
- my_error(ER_ACCESS_NO_SUCH_CATALOG, MYF(ME_ERROR_LOG),
- "replication", "localhost",
- catalog_len, catalog_name);
- DBUG_VOID_RETURN;
- }
-#endif
- vpart+= catalog_len+2;
- }
-#ifndef MYSQL_CLIENT
- else
- {
- m_catalog= default_catalog();
- }
-#endif /* MYSQL_CLIENT */
+ const uchar *const vpart= buf + common_header_len + post_header_len;
/* Extract the length of the various parts from the buffer */
uchar const *const ptr_dblen= (uchar const*)vpart + 0;
@@ -3449,7 +3400,6 @@ Table_map_log_event::Table_map_log_event(const uchar *buf, uint event_len,
/* Allocate mem for all fields in one go. If fails, caught in is_valid() */
m_memory= (uchar*) my_multi_malloc(PSI_INSTRUMENT_ME, MYF(MY_WME),
- &m_catnam.str, (uint) catalog.length+1,
&m_dbnam, (uint) m_dblen + 1,
&m_tblnam, (uint) m_tbllen + 1,
&m_coltype, (uint) m_colcnt,
@@ -3458,8 +3408,6 @@ Table_map_log_event::Table_map_log_event(const uchar *buf, uint event_len,
if (m_memory)
{
/* Copy the different parts into their memory */
- memcpy(const_cast<char*>(m_catnam.str), catalog.str, catalog.length+1);
- m_catnam.length= catalog.length;
strncpy(const_cast<char*>(m_dbnam), (const char*)ptr_dblen + 1, m_dblen + 1);
strncpy(const_cast<char*>(m_tblnam), (const char*)ptr_tbllen + 1, m_tbllen + 1);
memcpy(m_coltype, ptr_after_colcnt, m_colcnt);
diff --git a/sql/log_event.h b/sql/log_event.h
index 2f866efca7c..3099d693c8b 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -838,7 +838,7 @@ typedef struct st_print_event_info
them if they are unchanged.
*/
char db[FN_REFLEN+1]; // TODO: make this a LEX_STRING when thd->db is
- const SQL_CATALOG *catalog;
+ char catalog_name[MAX_CATALOG_NAME];
char charset[6]; // 3 variables, each of them storable in 2 bytes
char time_zone_str[MAX_TIME_ZONE_NAME_LENGTH];
char delimiter[16];
@@ -2082,8 +2082,7 @@ class Query_log_event: public Log_event
protected:
Log_event::Byte* data_buf;
public:
- const SQL_CATALOG *catalog;
- LEX_CSTRING catalog_name;
+ const char *catalog;
const char *db;
const char *query;
/*
@@ -2107,6 +2106,8 @@ class Query_log_event: public Log_event
concerned) from here.
*/
+ uint catalog_len; // <= 255 char; 0 means uninited
+
/*
We want to be able to store a variable number of N-bit status vars:
(generally N=32; but N=64 for SQL_MODE) a user may want to log the number
@@ -3791,19 +3792,8 @@ class Annotate_rows_log_event: public Log_event
<td>Has the following bit values</tdd>
<td>TM_BIT_LEN_EXACT_F= 1 << 0</td>
<td>TM_BIT_HAS_TRIGGERS=1 << 14</td>
- <td>TM_BIT_HAS_CATALOG= 1 << 15</td>
</tr>
- <tr>
- <td>catalog_name</td>
- <td>one byte string length, followed by null-terminated string</td>
- <td>The name of the catalog in which the table resides. The name
- is represented as a one byte unsigned integer representing the
- number of bytes in the name, followed by length bytes containing
- the database name, followed by a terminating 0 byte. (Note the
- redundancy in the representation of the length.)
- This entry only exists if bit TM_BIT_HAS_CATALOG is set.</td>
- </tr>
</table>
The Body has the following components:
@@ -4362,8 +4352,7 @@ class Table_map_log_event : public Log_event
TM_NO_FLAGS = 0U,
TM_BIT_LEN_EXACT_F = (1U << 0),
// MariaDB flags (we starts from the other end)
- TM_BIT_HAS_TRIGGERS_F= (1U << 14),
- TM_BIT_HAS_CATALOG_F= (1U << 15),
+ TM_BIT_HAS_TRIGGERS_F= (1U << 14)
};
flag_set get_flags(flag_set flag) const { return m_flags & flag; }
@@ -4468,8 +4457,6 @@ class Table_map_log_event : public Log_event
class Default_charset_iterator;
class Column_charset_iterator;
#endif
- const SQL_CATALOG *m_catalog;
- LEX_CSTRING m_catnam; // Used by mysqlbinlog
char const *m_dbnam;
size_t m_dblen;
char const *m_tblnam;
diff --git a/sql/log_event_client.cc b/sql/log_event_client.cc
index 9315fdcf32b..b73dcd5c02c 100644
--- a/sql/log_event_client.cc
+++ b/sql/log_event_client.cc
@@ -1849,14 +1849,6 @@ bool Query_log_event::print_query_header(IO_CACHE* file,
goto err;
}
}
- if (print_event_info->catalog != catalog)
- {
- print_event_info->catalog= catalog;
- if (my_b_printf(file, "/*!110600 SET CATALOG `%.*s` */%s\n",
- catalog_name.length, catalog_name.str,
- print_event_info->delimiter))
- goto err;
- }
end=int10_to_str((long) when, strmov(buff,"SET TIMESTAMP="),10);
if (when_sec_part && when_sec_part <= TIME_MAX_SECOND_PART)
@@ -3017,11 +3009,9 @@ int Table_map_log_event::rewrite_db(const char *new_db, size_t new_len,
// m_dbnam resides in m_memory together with m_tblnam and m_coltype
uchar* memory= m_memory;
char const* tblnam= m_tblnam;
- char const* catnam= m_catnam.str;
uchar* coltype= m_coltype;
m_memory= (uchar*) my_multi_malloc(PSI_NOT_INSTRUMENTED, MYF(MY_WME),
- &m_catnam.str, (uint) m_catnam.length,
&m_dbnam, (uint) m_dblen + 1,
&m_tblnam, (uint) m_tbllen + 1,
&m_coltype, (uint) m_colcnt,
@@ -3030,13 +3020,11 @@ int Table_map_log_event::rewrite_db(const char *new_db, size_t new_len,
if (!m_memory)
{
sql_print_error("Table_map_log_event::rewrite_db: "
- "failed to allocate new m_memory (%d, %d + %d + %d bytes required)",
- (int) m_catnam.length + 1,+ m_dblen + 1, m_tbllen + 1,
- m_colcnt);
+ "failed to allocate new m_memory (%d + %d + %d bytes required)",
+ m_dblen + 1, m_tbllen + 1, m_colcnt);
DBUG_RETURN(-1);
}
- memcpy((void*)m_catnam.str, catnam, m_catnam.length+1);
memcpy((void*)m_dbnam, new_db, m_dblen + 1);
memcpy((void*)m_tblnam, tblnam, m_tbllen + 1);
memcpy(m_coltype, coltype, m_colcnt);
@@ -3051,26 +3039,13 @@ bool Table_map_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
if (!print_event_info->short_form)
{
char llbuff[22];
- char options[80], *ptr;
-
- ptr= options;
- if (m_flags & (TM_BIT_HAS_TRIGGERS_F | TM_BIT_HAS_CATALOG_F))
- {
- ptr= strmov(ptr, " (has ");
- if (m_flags & TM_BIT_HAS_TRIGGERS_F)
- ptr= strmov(ptr, "triggers,");
- if (m_flags & TM_BIT_HAS_CATALOG_F)
- ptr= strmov(ptr, "catalog,");
- ptr[-1]= ')';
- ptr++;
- }
- *ptr= 0;
print_header(&print_event_info->head_cache, print_event_info, TRUE);
if (my_b_printf(&print_event_info->head_cache,
"\tTable_map: %`s.%`s mapped to number %s%s\n",
m_dbnam, m_tblnam, ullstr(m_table_id, llbuff),
- options))
+ ((m_flags & TM_BIT_HAS_TRIGGERS_F) ?
+ " (has triggers)" : "")))
goto err;
}
if (!print_event_info->short_form || print_event_info->print_row_count)
@@ -3653,8 +3628,8 @@ st_print_event_info::st_print_event_info()
bzero(db, sizeof(db));
bzero(charset, sizeof(charset));
bzero(time_zone_str, sizeof(time_zone_str));
- // Set catalog to impossible value to ensure that catalog is updated later!
- catalog= (SQL_CATALOG *) 1;
+ catalog_name[0]= '\0';
+ catalog_length= 0;
delimiter[0]= ';';
delimiter[1]= 0;
flags2_inited= 0;
@@ -3725,6 +3700,7 @@ Gtid_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
Write_on_release_cache::FLUSH_F, this);
char buf[21];
char buf2[21];
+ bool different;
if (!print_event_info->short_form && !is_flashback)
{
@@ -3797,6 +3773,19 @@ Gtid_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
buf, print_event_info->delimiter))
goto err;
}
+
+ different= print_event_info->catalog_length != cat_name->length ||
+ memcmp(print_event_info->catalog_name, cat_name->str, cat_name->length);
+ if (different)
+ {
+ print_event_info->catalog_length= cat_name->length;
+ memcpy(print_event_info->catalog_name, cat_name->str, cat_name->length);
+ if (my_b_printf(&cache, "/*!110600 SET CATALOG `%.*s` */%s\n",
+ cat_name->length, cat_name->str,
+ print_event_info->delimiter))
+ goto err;
+ }
+
if ((flags2 & FL_PREPARED_XA) && !is_flashback)
{
my_b_write_string(&cache, "XA START ");
diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc
index 22ad0a8b4a6..d3a5f616476 100644
--- a/sql/log_event_server.cc
+++ b/sql/log_event_server.cc
@@ -995,21 +995,13 @@ void Query_log_event::pack_info(Protocol *protocol)
{
char buf_mem[1024];
String buf(buf_mem, sizeof(buf_mem), system_charset_info);
- buf.real_alloc(30 + ((catalog ? catalog->name.length : 0 )+ db_len)*2 + q_len);
- if (!(flags & LOG_EVENT_SUPPRESS_USE_F))
+ buf.real_alloc(9 + db_len + q_len);
+ if (!(flags & LOG_EVENT_SUPPRESS_USE_F)
+ && db && db_len)
{
- if (catalog)
- {
- buf.append(STRING_WITH_LEN("SET CATALOG "));
- append_identifier(protocol->thd, &buf, catalog->name.str, catalog->name.length);
- buf.append(STRING_WITH_LEN("; "));
- }
- if (db && db_len)
- {
- buf.append(STRING_WITH_LEN("use "));
- append_identifier(protocol->thd, &buf, db, db_len);
- buf.append(STRING_WITH_LEN("; "));
- }
+ buf.append(STRING_WITH_LEN("use "));
+ append_identifier(protocol->thd, &buf, db, db_len);
+ buf.append(STRING_WITH_LEN("; "));
}
DBUG_ASSERT(!flags2 || flags2_inited);
@@ -1139,7 +1131,7 @@ bool Query_log_event::write()
int8store(start, (ulonglong)sql_mode);
start+= 8;
}
- store_str_with_code_and_len(&start, catalog->name.str, catalog->name.length,
+ store_str_with_code_and_len(&start, catalog, catalog_len,
(uint) Q_CATALOG_NZ_CODE);
/*
In 5.0.x where x<4 masters we used to store the end zero here. This was
@@ -1383,7 +1375,7 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
? LOG_EVENT_THREAD_SPECIFIC_F : 0) |
(suppress_use ? LOG_EVENT_SUPPRESS_USE_F : 0),
using_trans),
- data_buf(0), catalog(thd_arg->catalog), query(query_arg),
+ data_buf(0), catalog(thd_arg->catalog->name.str), query(query_arg),
q_len((uint32) query_length),
thread_id(thd_arg->thread_id),
/* save the original thread id; we already know the server id */
@@ -1422,6 +1414,11 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
end_time= my_time(0);
exec_time = (ulong) (end_time - thd_arg->start_time);
+ /**
+ @todo this means that if we have no catalog, then it is replicated
+ as an existing catalog of length zero. is that safe? /sven
+ */
+ catalog_len = (catalog) ? (uint32) strlen(catalog) : 0;
if (!(db= thd->db.str))
db= "";
@@ -1855,9 +1852,13 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
bool skip_error_check= false;
DBUG_ENTER("Query_log_event::do_apply_event");
- DBUG_ASSERT(catalog);
- if (thd->catalog != catalog)
- thd->change_catalog(const_cast<SQL_CATALOG*>(catalog));
+ /*
+ Colleagues: please never free(thd->catalog) in MySQL. This would
+ lead to bugs as here thd->catalog is a part of an alloced block,
+ not an entire alloced block (see
+ Query_log_event::do_apply_event()). Same for thd->db. Thank
+ you.
+ */
rgi->start_alter_ev= this;
size_t valid_len= Well_formed_prefix(system_charset_info,
@@ -2348,11 +2349,6 @@ Query_log_event::do_shall_skip(rpl_group_info *rgi)
DBUG_ASSERT(query && q_len > 0);
DBUG_ASSERT(thd == rgi->thd);
- /* Set thd to point to the current catalog */
- DBUG_ASSERT(catalog);
- if (thd->catalog != catalog)
- thd->change_catalog(const_cast<SQL_CATALOG*>(catalog));
-
/*
An event skipped due to @@skip_replication must not be counted towards the
number of events to be skipped due to @@sql_slave_skip_counter.
@@ -3157,9 +3153,22 @@ Gtid_log_event::make_compatible_event(String *packet, bool *need_dummy_event,
void
Gtid_log_event::pack_info(Protocol *protocol)
{
- char buf[6+5+10+1+10+1+20+1+4+20+1+ ser_buf_size+5 /* sprintf */];
- char *p;
- p = strmov(buf, (flags2 & FL_STANDALONE ? "GTID " :
+ static constexpr int catalog_needed= 12 + 2*MAX_CATALOG_NAME+2 + 2;
+ char buf[catalog_needed+6+5+10+1+10+1+20+1+4+20+1+
+ ser_buf_size+5 /* sprintf */];
+ char *p= buf;
+ if (flags_extra & FL_CATALOG)
+ {
+ char buf_mem[2*MAX_CATALOG_NAME+2];
+ String buf2(buf_mem, sizeof(buf_mem), system_charset_info);
+ append_identifier(protocol->thd, &buf2, cat_name->str, cat_name->length);
+ size_t buf2_len= std::min(buf2.length(), (uint32)(2*MAX_CATALOG_NAME+2));
+ DBUG_ASSERT(buf2_len == buf2.length());
+ p= strmov(p, "SET CATALOG ");
+ memcpy(p, buf2.ptr(), buf2_len);
+ p= strmov(p + buf2_len, "; ");
+ }
+ p = strmov(p, (flags2 & FL_STANDALONE ? "GTID " :
flags2 & FL_PREPARED_XA ? "XA START " : "BEGIN GTID "));
if (flags2 & FL_PREPARED_XA)
{
@@ -3215,6 +3224,29 @@ Gtid_log_event::do_apply_event(rpl_group_info *rgi)
return 1;
}
+ if (using_catalogs)
+ {
+ if (flags_extra & FL_CATALOG)
+ {
+ if (!thd->catalog || cmp(cat_name, &thd->catalog->name))
+ {
+ SQL_CATALOG *catalog= get_catalog(cat_name, 1);
+ if (!catalog)
+ {
+ my_error(ER_NO_SUCH_CATALOG, MYF(ME_ERROR_LOG),
+ cat_name->length, cat_name->str);
+ return 1;
+ }
+ thd->change_catalog(catalog);
+ }
+ }
+ else
+ {
+ if (thd->catalog != default_catalog())
+ thd->change_catalog(default_catalog());
+ }
+ }
+
DBUG_ASSERT((bits & OPTION_GTID_BEGIN) == 0);
Master_info *mi=rgi->rli->mi;
@@ -5689,8 +5721,6 @@ Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid,
bool is_transactional)
: Log_event(thd, 0, is_transactional),
m_table(tbl),
- m_catalog(thd->catalog),
- m_catnam(empty_clex_str),
m_dbnam(tbl->s->db.str),
m_dblen(m_dbnam ? tbl->s->db.length : 0),
m_tblnam(tbl->s->table_name.str),
@@ -5736,11 +5766,6 @@ Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid,
if (tbl->triggers)
m_flags|= TM_BIT_HAS_TRIGGERS_F;
- if (m_catalog != default_catalog())
- {
- m_flags|= TM_BIT_HAS_CATALOG_F;
- m_data_size+= m_catalog->name.length+2;
- }
/* If malloc fails, caught in is_valid() */
if ((m_memory= (uchar*) my_malloc(PSI_INSTRUMENT_ME, m_colcnt, MYF(MY_WME))))
@@ -5910,8 +5935,6 @@ int Table_map_log_event::do_apply_event(rpl_group_info *rgi)
/* Step the query id to mark what columns that are actually used. */
thd->set_query_id(next_query_id());
- if (thd->catalog != m_catalog)
- thd->change_catalog(const_cast<SQL_CATALOG*>(m_catalog));
if (!(memory= my_multi_malloc(PSI_INSTRUMENT_ME, MYF(MY_WME),
&table_list, (uint) sizeof(RPL_TABLE_LIST),
@@ -5938,7 +5961,7 @@ int Table_map_log_event::do_apply_event(rpl_group_info *rgi)
LEX_CSTRING tmp_db_name= {db_mem, db_mem_length };
LEX_CSTRING tmp_tbl_name= {tname_mem, tname_mem_length };
- table_list->init_one_table(m_catalog, &tmp_db_name, &tmp_tbl_name, 0,
+ table_list->init_one_table(thd->catalog, &tmp_db_name, &tmp_tbl_name, 0,
TL_WRITE);
table_list->table_id= DBUG_IF("inject_tblmap_same_id_maps_diff_table") ? 0 : m_table_id;
table_list->updating= 1;
@@ -6079,13 +6102,6 @@ bool Table_map_log_event::write_data_body()
uchar mbuf[MAX_INT_WIDTH];
uchar *const mbuf_end= net_store_length(mbuf, m_field_metadata_size);
- if (m_flags & TM_BIT_HAS_CATALOG_F)
- {
- uchar const len[]= { (uchar) m_catalog->name.length };
- if (write_data(len, 1) ||
- write_data(m_catalog->name.str, m_catalog->name.length+1))
- return 1;
- }
return write_data(dbuf, sizeof(dbuf)) ||
write_data(m_dbnam, m_dblen+1) ||
write_data(tbuf, sizeof(tbuf)) ||
--
2.39.2
1
0

[PATCH] Catalogs: replicate single catalog from catalog-enabled master
by Kristian Nielsen, 16 Oct '24
This implements the basics to do replication from a single catalog on a
catalog-enabled master ("the cloud") to a (not necessarily catalog-enabled)
slave ("on-premise"). Such slave must only see its own events, eg. events
from the catalog of the connecting user.
Changes done:
- Extend the GTID event with the catalog (a follow-up patch will change
replication slave-side to use this when applying events).
- In the dump thread, when a catalog user does a binlog dump (eg.
connecting slave), filter events so only events from that catalog are
sent.
- Extend the server version of the connector library to be able to connect
to a specific catalog.
- Add a Master_catalog option to CHANGE MASTER to make a slave connect to a
user in a specific catalog.
Open issues:
- Possibly there needs to be a way for the catalog superuser to control
which catalogs allow the COM_BINLOG_DUMP command.
- In GTID replication, how the single-catalog slave should maintain the GTID
position in domains from which it does not receive events. One option is
to update the GTID position (eg. with the GTID_LIST events) in the other
domains; this way the slave will have the correct global GTID position,
but it will not be possible to use multi-source to replicate two catalogs
individually to the same server. Another option is to binlog each catalog
in their own domain(s) and do master-side filtering on domain_id; then
the master will ignore irrelevant domains for a connecting catalog slave
(and not report "position too old" in those domains), and the slave will
only have its own domains in its GTID position.
Signed-off-by: Kristian Nielsen <knielsen(a)knielsen-hq.org>
---
include/mysql.h | 7 +-
include/sql_common.h | 3 +-
mysql-test/suite/rpl/r/rpl_catalogs.result | 137 +++++++++++++++++++++
mysql-test/suite/rpl/t/rpl_catalogs.test | 119 ++++++++++++++++++
sql-common/client.c | 36 +++++-
sql/lex.h | 1 +
sql/lex_string.h | 2 +-
sql/log_event.cc | 20 +++
sql/log_event.h | 13 +-
sql/log_event_server.cc | 88 ++++++++++++-
sql/privilege.h | 7 +-
sql/rpl_mi.cc | 15 ++-
sql/rpl_mi.h | 1 +
sql/slave.cc | 3 +
sql/sql_lex.h | 4 +-
sql/sql_repl.cc | 37 ++++++
sql/sql_yacc.yy | 6 +
17 files changed, 483 insertions(+), 16 deletions(-)
create mode 100644 mysql-test/suite/rpl/r/rpl_catalogs.result
create mode 100644 mysql-test/suite/rpl/t/rpl_catalogs.test
diff --git a/include/mysql.h b/include/mysql.h
index a66dcc7bd02..44ca2099694 100644
--- a/include/mysql.h
+++ b/include/mysql.h
@@ -189,14 +189,15 @@ enum mysql_option
/* MariaDB options */
MYSQL_PROGRESS_CALLBACK=5999,
MYSQL_OPT_NONBLOCK,
- MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY
+ MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY,
+ MARIADB_OPT_CATALOG=7029
};
/**
@todo remove the "extension", move st_mysql_options completely
out of mysql.h
*/
-struct st_mysql_options_extention;
+struct st_mysql_options_extension;
struct st_mysql_options {
unsigned int connect_timeout, read_timeout, write_timeout;
@@ -231,7 +232,7 @@ struct st_mysql_options {
void (*local_infile_end)(void *);
int (*local_infile_error)(void *, char *, unsigned int);
void *local_infile_userdata;
- struct st_mysql_options_extention *extension;
+ struct st_mysql_options_extension *extension;
};
enum mysql_status
diff --git a/include/sql_common.h b/include/sql_common.h
index ad5ab7e19af..c261ee9fe94 100644
--- a/include/sql_common.h
+++ b/include/sql_common.h
@@ -28,7 +28,7 @@ extern const char *cant_connect_sqlstate;
extern const char *not_error_sqlstate;
-struct st_mysql_options_extention {
+struct st_mysql_options_extension {
char *plugin_dir;
char *default_auth;
char *ssl_crl; /* PEM CRL file */
@@ -41,6 +41,7 @@ struct st_mysql_options_extention {
uint proc_info_length);
HASH connection_attributes;
size_t connection_attributes_length;
+ char *catalog;
};
typedef struct st_mysql_methods
diff --git a/mysql-test/suite/rpl/r/rpl_catalogs.result b/mysql-test/suite/rpl/r/rpl_catalogs.result
new file mode 100644
index 00000000000..4b25fd20e6c
--- /dev/null
+++ b/mysql-test/suite/rpl/r/rpl_catalogs.result
@@ -0,0 +1,137 @@
+include/master-slave.inc
+[connection master]
+*** Create some catalogs to work with.
+connection master_config;
+SET SESSION sql_log_bin= 0;
+CREATE CATALOG foo;
+CREATE CATALOG bar;
+CREATE CATALOG baz;
+SET SESSION sql_log_bin= 1;
+connection slave_config;
+SET SESSION sql_log_bin= 0;
+CREATE CATALOG foo;
+CREATE CATALOG bar;
+CREATE CATALOG baz;
+SET SESSION sql_log_bin= 1;
+*** Replicate a few events full-catalogs -> full-catalogs
+connection master_config;
+USE CATALOG foo;
+CREATE DATABASE db1;
+use db1;
+CREATE TABLE t1 (a INT PRIMARY KEY);
+INSERT INTO t1 VALUES (1);
+USE CATALOG baz;
+CREATE DATABASE db3;
+use db3;
+CREATE TABLE t3 (a INT PRIMARY KEY);
+INSERT INTO t3 VALUES (1);
+sync_slave_with_master slave;
+connection slave_config;
+USE CATALOG foo;
+use db1;
+SELECT * FROM t1 ORDER BY a;
+a
+1
+USE CATALOG baz;
+SELECT * FROM db3.t3 ORDER BY a;
+a
+1
+*** Create a normal slave user to replicate single catalog
+connection slave_config;
+include/stop_slave.inc
+CHANGE MASTER TO master_user='rpl_bar', master_password='bar_pw',
+master_catalog="bar";
+connection master_config;
+USE CATALOG bar;
+CREATE USER rpl_bar@localhost;
+GRANT replication slave, select ON *.* TO rpl_bar@localhost IDENTIFIED BY "bar_pw";
+CREATE DATABASE db2;
+use db2;
+CREATE TABLE t2 (a INT PRIMARY KEY, b INT);
+INSERT INTO t2(a) VALUES (1), (2), (3), (4), (5);
+INSERT INTO t2 VALUES (6, 0), (7, 1), (8, 2);
+USE CATALOG foo;
+INSERT INTO db1.t1 VALUES (10), (20), (30);
+USE CATALOG baz;
+INSERT INTO db3.t3 VALUES (10), (20), (30);
+USE CATALOG bar;
+use db2;
+UPDATE t2 SET b=a*a WHERE b IS NULL;
+USE CATALOG bar;
+SELECT * FROM db2.t2 ORDER BY a;
+a b
+1 1
+2 4
+3 9
+4 16
+5 25
+6 0
+7 1
+8 2
+USE CATALOG foo;
+SELECT * FROM db1.t1 ORDER BY a;
+a
+1
+10
+20
+30
+USE CATALOG baz;
+SELECT * FROM db3.t3 ORDER BY a;
+a
+1
+10
+20
+30
+USE CATALOG def;
+use test;
+connect con1,localhost,rpl_bar,bar_pw,bar.db2;
+SELECT * FROM t2 ORDER BY a;
+a b
+1 1
+2 4
+3 9
+4 16
+5 25
+6 0
+7 1
+8 2
+disconnect con1;
+connection master;
+save_master_pos;
+connection slave_config;
+include/start_slave.inc
+connection slave;
+sync_with_master;
+connection slave_config;
+USE CATALOG bar;
+SELECT * FROM db2.t2 ORDER BY a;
+a b
+1 1
+2 4
+3 9
+4 16
+5 25
+6 0
+7 1
+8 2
+USE CATALOG foo;
+SELECT * FROM db1.t1 ORDER BY a;
+a
+1
+USE CATALOG baz;
+SELECT * FROM db3.t3 ORDER BY a;
+a
+1
+*** Clean up.
+connection slave_config;
+include/stop_slave.inc
+CHANGE MASTER TO master_user='root', master_password='', master_catalog='';
+include/start_slave.inc
+connection master_config;
+USE CATALOG def;
+use test;
+DROP CATALOG foo;
+DROP CATALOG bar;
+DROP CATALOG baz;
+connection master;
+include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/t/rpl_catalogs.test b/mysql-test/suite/rpl/t/rpl_catalogs.test
new file mode 100644
index 00000000000..939e68b920d
--- /dev/null
+++ b/mysql-test/suite/rpl/t/rpl_catalogs.test
@@ -0,0 +1,119 @@
+--source include/have_catalogs.inc
+--source include/master-slave.inc
+
+--echo *** Create some catalogs to work with.
+--connection master_config
+# ToDo: It seems CREATE CATALOG cannot replicate currently.
+# So don't binlog it for now.
+SET SESSION sql_log_bin= 0;
+CREATE CATALOG foo;
+CREATE CATALOG bar;
+CREATE CATALOG baz;
+SET SESSION sql_log_bin= 1;
+
+--connection slave_config
+SET SESSION sql_log_bin= 0;
+CREATE CATALOG foo;
+CREATE CATALOG bar;
+CREATE CATALOG baz;
+SET SESSION sql_log_bin= 1;
+
+
+--echo *** Replicate a few events full-catalogs -> full-catalogs
+
+--connection master_config
+USE CATALOG foo;
+CREATE DATABASE db1;
+use db1;
+CREATE TABLE t1 (a INT PRIMARY KEY);
+INSERT INTO t1 VALUES (1);
+
+USE CATALOG baz;
+CREATE DATABASE db3;
+use db3;
+CREATE TABLE t3 (a INT PRIMARY KEY);
+INSERT INTO t3 VALUES (1);
+
+--sync_slave_with_master slave
+
+--connection slave_config
+USE CATALOG foo;
+use db1;
+SELECT * FROM t1 ORDER BY a;
+USE CATALOG baz;
+SELECT * FROM db3.t3 ORDER BY a;
+
+
+--echo *** Create a normal slave user to replicate single catalog
+--connection slave_config
+--source include/stop_slave.inc
+CHANGE MASTER TO master_user='rpl_bar', master_password='bar_pw',
+ master_catalog="bar";
+
+--connection master_config
+USE CATALOG bar;
+CREATE USER rpl_bar@localhost;
+GRANT replication slave, select ON *.* TO rpl_bar@localhost IDENTIFIED BY "bar_pw";
+
+CREATE DATABASE db2;
+use db2;
+CREATE TABLE t2 (a INT PRIMARY KEY, b INT);
+INSERT INTO t2(a) VALUES (1), (2), (3), (4), (5);
+INSERT INTO t2 VALUES (6, 0), (7, 1), (8, 2);
+
+# Do something in other catalogs, see that it is not replicated.
+USE CATALOG foo;
+INSERT INTO db1.t1 VALUES (10), (20), (30);
+USE CATALOG baz;
+INSERT INTO db3.t3 VALUES (10), (20), (30);
+
+USE CATALOG bar;
+use db2;
+UPDATE t2 SET b=a*a WHERE b IS NULL;
+
+USE CATALOG bar;
+SELECT * FROM db2.t2 ORDER BY a;
+USE CATALOG foo;
+SELECT * FROM db1.t1 ORDER BY a;
+USE CATALOG baz;
+SELECT * FROM db3.t3 ORDER BY a;
+
+USE CATALOG def;
+use test;
+
+connect(con1,localhost,rpl_bar,bar_pw,bar.db2);
+SELECT * FROM t2 ORDER BY a;
+--disconnect con1
+
+--connection master
+--save_master_pos
+
+--connection slave_config
+--source include/start_slave.inc
+--connection slave
+--sync_with_master
+
+--connection slave_config
+USE CATALOG bar;
+SELECT * FROM db2.t2 ORDER BY a;
+USE CATALOG foo;
+SELECT * FROM db1.t1 ORDER BY a;
+USE CATALOG baz;
+SELECT * FROM db3.t3 ORDER BY a;
+
+--echo *** Clean up.
+
+--connection slave_config
+--source include/stop_slave.inc
+CHANGE MASTER TO master_user='root', master_password='', master_catalog='';
+--source include/start_slave.inc
+
+--connection master_config
+USE CATALOG def;
+use test;
+DROP CATALOG foo;
+DROP CATALOG bar;
+DROP CATALOG baz;
+
+--connection master
+--source include/rpl_end.inc
diff --git a/sql-common/client.c b/sql-common/client.c
index ba804ce2a7c..711f8733c9f 100644
--- a/sql-common/client.c
+++ b/sql-common/client.c
@@ -99,6 +99,7 @@ extern my_bool using_catalogs;
#define CONNECT_TIMEOUT 0
+#define MAX_CATALOG_NAME 65 /* Including terminating '\0' */
#include "client_settings.h"
#include <ssl_compat.h>
@@ -836,9 +837,9 @@ static int add_init_command(struct st_mysql_options *options, const char *cmd)
#define ALLOCATE_EXTENSIONS(OPTS) \
- (OPTS)->extension= (struct st_mysql_options_extention *) \
+ (OPTS)->extension= (struct st_mysql_options_extension *) \
my_malloc(key_memory_mysql_options, \
- sizeof(struct st_mysql_options_extention), \
+ sizeof(struct st_mysql_options_extension), \
MYF(MY_WME | MY_ZEROFILL)) \
@@ -2089,7 +2090,8 @@ static int send_client_reply_packet(MCPVIO_EXT *mpvio,
see end= buff+32 below, fixed size of the packet is 32 bytes.
+9 because data is a length encoded binary where meta data size is max 9.
*/
- buff_size= 33 + USERNAME_LENGTH + data_len + 9 + NAME_LEN + NAME_LEN + connect_attrs_len + 9;
+ buff_size= 33 + USERNAME_LENGTH + data_len + 9 + NAME_LEN + NAME_LEN +
+ connect_attrs_len + 9 + MAX_CATALOG_NAME;
buff= my_alloca(buff_size);
mysql->client_flag|= mysql->options.client_flag;
@@ -2122,10 +2124,19 @@ static int send_client_reply_packet(MCPVIO_EXT *mpvio,
if (mysql->client_flag & CLIENT_PROTOCOL_41)
{
/* 4.1 server and 4.1 client has a 32 byte option flag */
+ if (!(mysql->server_capabilities & CLIENT_MYSQL))
+ mysql->client_flag&= ~CLIENT_MYSQL;
int4store(buff,mysql->client_flag);
int4store(buff+4, net->max_packet_size);
buff[8]= (char) mysql->charset->number;
bzero(buff+9, 32-9);
+ if (!(mysql->server_capabilities & CLIENT_MYSQL))
+ {
+ /* ToDo: Should this check if the server has the catalog capability? */
+ uint client_extended_cap= mysql->options.extension->catalog ?
+ (uint)((MARIADB_CLIENT_CONNECT_CATALOG) >> 32) : (uint)0;
+ int4store(buff + 28, client_extended_cap);
+ }
end= buff+32;
}
else
@@ -2274,6 +2285,17 @@ static int send_client_reply_packet(MCPVIO_EXT *mpvio,
end= (char *) send_client_connect_attrs(mysql, (uchar *) end);
+ /* Add catalog */
+ if (mysql->options.extension->catalog)
+ {
+ size_t len= strlen(mysql->options.extension->catalog);
+ if (len >= MAX_CATALOG_NAME)
+ len= MAX_CATALOG_NAME - 1;
+ end= (char*)write_length_encoded_string4(
+ (uchar*)end, buff_size - (end - buff),
+ (uchar *)mysql->options.extension->catalog, len);
+ }
+
/* Write authentication package */
if (my_net_write(net, (uchar*) buff, (size_t) (end-buff)) || net_flush(net))
{
@@ -3332,6 +3354,7 @@ static void mysql_close_free_options(MYSQL *mysql)
my_free(mysql->options.extension->plugin_dir);
my_free(mysql->options.extension->default_auth);
my_hash_free(&mysql->options.extension->connection_attributes);
+ my_free(mysql->options.extension->catalog);
my_free(mysql->options.extension);
}
bzero((char*) &mysql->options,sizeof(mysql->options));
@@ -3850,9 +3873,9 @@ mysql_options(MYSQL *mysql,enum mysql_option option, const void *arg)
break;
case MYSQL_PROGRESS_CALLBACK:
if (!mysql->options.extension)
- mysql->options.extension= (struct st_mysql_options_extention *)
+ mysql->options.extension= (struct st_mysql_options_extension *)
my_malloc(key_memory_mysql_options,
- sizeof(struct st_mysql_options_extention),
+ sizeof(struct st_mysql_options_extension),
MYF(MY_WME | MY_ZEROFILL));
if (mysql->options.extension)
mysql->options.extension->report_progress=
@@ -3918,6 +3941,9 @@ mysql_options(MYSQL *mysql,enum mysql_option option, const void *arg)
}
}
break;
+ case MARIADB_OPT_CATALOG:
+ EXTENSION_SET_STRING(&mysql->options, catalog, arg);
+ break;
case MYSQL_SHARED_MEMORY_BASE_NAME:
default:
DBUG_RETURN(1);
diff --git a/sql/lex.h b/sql/lex.h
index bf0223ce544..9771c0e39ba 100644
--- a/sql/lex.h
+++ b/sql/lex.h
@@ -367,6 +367,7 @@ SYMBOL symbols[] = {
{ "LOOP", SYM(LOOP_SYM)},
{ "LOW_PRIORITY", SYM(LOW_PRIORITY)},
{ "MASTER", SYM(MASTER_SYM)},
+ { "MASTER_CATALOG", SYM(MASTER_CATALOG_SYM)},
{ "MASTER_CONNECT_RETRY", SYM(MASTER_CONNECT_RETRY_SYM)},
{ "MASTER_DELAY", SYM(MASTER_DELAY_SYM)},
{ "MASTER_GTID_POS", SYM(MASTER_GTID_POS_SYM)},
diff --git a/sql/lex_string.h b/sql/lex_string.h
index e7a732346c4..f49e567c64c 100644
--- a/sql/lex_string.h
+++ b/sql/lex_string.h
@@ -71,7 +71,7 @@ static inline bool lex_string_cmp(CHARSET_INFO *charset, const LEX_CSTRING *a,
}
/*
- Compare to LEX_CSTRING's and return 0 if equal
+ Compare two LEX_CSTRING's and return 0 if equal
*/
static inline bool cmp(const LEX_CSTRING *a, const LEX_CSTRING *b)
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 992054c7d17..004010e9d99 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -2448,6 +2448,26 @@ Gtid_log_event::Gtid_log_event(const uchar *buf, uint event_len,
sa_seq_no= uint8korr(buf);
buf+= 8;
}
+ if (flags_extra & FL_CATALOG)
+ {
+ if (unlikely(buf - buf_0) >= event_len)
+ {
+ seq_no= 0;
+ return;
+ }
+ uint32_t cat_len= *buf++;
+ if (unlikely(cat_len > MAX_CATALOG_NAME) ||
+ unlikely(buf - buf_0 + cat_len) >= event_len)
+ {
+ seq_no= 0;
+ return;
+ }
+ memcpy(cat_name_buf, buf, cat_len);
+ cat_name_int.str= cat_name_buf;
+ cat_name_int.length= cat_len;
+ cat_name= &cat_name_int;
+ buf+= cat_len;
+ }
}
/*
the strict '<' part of the assert corresponds to extra zero-padded
diff --git a/sql/log_event.h b/sql/log_event.h
index bd318f147d7..2f866efca7c 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -3250,13 +3250,16 @@ class Gtid_log_event: public Log_event
public:
uint64 seq_no;
uint64 commit_id;
- uint32 domain_id;
uint64 sa_seq_no; // start alter identifier for CA/RA
+ const LEX_CSTRING *cat_name; // Points either to catalog object or own buffer
#ifdef MYSQL_SERVER
event_xid_t xid;
#else
event_mysql_xid_t xid;
#endif
+ LEX_CSTRING cat_name_int;
+ uint32 domain_id;
+ char cat_name_buf[MAX_CATALOG_NAME];
uchar flags2;
/*
More flags area placed after the regular flags2's area. The type
@@ -3309,11 +3312,15 @@ class Gtid_log_event: public Log_event
FL_EXTRA_MULTI_ENGINE_E1 is set for event group comprising a transaction
involving multiple storage engines. No flag and extra data are added
to the event when the transaction involves only one engine.
+
+ FL_CATALOG is set when a catalog name is included in the GTID (happens
+ when not the default catalog).
*/
static const uchar FL_EXTRA_MULTI_ENGINE_E1= 1;
static const uchar FL_START_ALTER_E1= 2;
static const uchar FL_COMMIT_ALTER_E1= 4;
static const uchar FL_ROLLBACK_ALTER_E1= 8;
+ static const uchar FL_CATALOG= 16;
#ifdef MYSQL_SERVER
Gtid_log_event(THD *thd_arg, uint64 seq_no, uint32 domain_id, bool standalone,
@@ -3346,6 +3353,10 @@ class Gtid_log_event: public Log_event
enum enum_binlog_checksum_alg checksum_alg,
uint32 *domain_id, uint32 *server_id, uint64 *seq_no,
uchar *flags2, const Format_description_log_event *fdev);
+ static bool peek_catalog(const uchar *event_start, size_t event_len,
+ const Format_description_log_event *fdev,
+ enum enum_binlog_checksum_alg checksum_alg,
+ uchar *out_flags2, LEX_CSTRING *out_catname);
#endif
};
diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc
index ff3a5bda8e5..22ad0a8b4a6 100644
--- a/sql/log_event_server.cc
+++ b/sql/log_event_server.cc
@@ -2936,6 +2936,13 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg,
if (extra_engines > 0)
flags_extra|= FL_EXTRA_MULTI_ENGINE_E1;
}
+ const SQL_CATALOG *cat= thd_arg->catalog;
+ if (cat != default_catalog())
+ {
+ flags_extra|= FL_CATALOG;
+ cat_name= &cat->name;
+ }
+
if (thd->get_binlog_flags_for_alter())
{
flags_extra |= thd->get_binlog_flags_for_alter();
@@ -2982,10 +2989,80 @@ Gtid_log_event::peek(const uchar *event_start, size_t event_len,
}
+/*
+ Obtain the catalog (if any) in the GTID (without constructing the full
+ object).
+
+ This is a separate function from Gtid_log_event::peek(), since this function
+ needs to do a lot of parsing of flags etc. to know where the catalog is, and
+ this overhead is not wanted in the often-used Gtid_log_event::peek(). But if
+ more peek-functionality would be needed in the future, it could make sense to
+ add it to this function which already has the parsing overhead.
+
+ Returns true if error (malformed or short event), false if ok. Returns the
+ name of the default catalog if catalog is not included explicitly in the GTID.
+
+ Note that the returned out_catname will point into the passed-in packet
+ memory, so will only be valid as long as the packet memory is!
+*/
+bool
+Gtid_log_event::peek_catalog(const uchar *event_start, size_t event_len,
+ const Format_description_log_event *fdev,
+ enum enum_binlog_checksum_alg checksum_alg,
+ uchar *out_flags2, LEX_CSTRING *out_catname)
+{
+ if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32)
+ {
+ if (event_len > BINLOG_CHECKSUM_LEN)
+ event_len-= BINLOG_CHECKSUM_LEN;
+ else
+ event_len= 0;
+ }
+ else
+ DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
+ checksum_alg == BINLOG_CHECKSUM_ALG_OFF);
+
+ if (event_len < (uint32)fdev->common_header_len + GTID_HEADER_LEN)
+ return true;
+ const uchar *p= event_start + fdev->common_header_len;
+ const uchar *p_end= event_start + event_len;
+ uchar flags2= *out_flags2= p[12];
+ p+= 13; /* seq_no, domain_id, and flags2. */
+ if (flags2 & FL_GROUP_COMMIT_ID)
+ p+= 8;
+ if (flags2 & (FL_PREPARED_XA | FL_COMPLETED_XA))
+ {
+ if (p + 6 > p_end)
+ return true;
+ p+= 6 + p[4] + p[5];
+ }
+ uchar flags_extra;
+ if (p >= p_end || !((flags_extra= *p) & FL_CATALOG))
+ {
+ *out_catname= default_catalog_name;
+ return false;
+ }
+ ++p;
+
+ if (flags_extra & FL_EXTRA_MULTI_ENGINE_E1)
+ ++p;
+ if (flags_extra & (FL_COMMIT_ALTER_E1 | FL_ROLLBACK_ALTER_E1))
+ p+= 8;
+
+ uchar cat_len;
+ if (p >= p_end || (p + (cat_len= *p)) >= p_end)
+ return true;
+ out_catname->str= (const char *)p+1;
+ out_catname->length= cat_len;
+
+ return false;
+}
+
+
bool
Gtid_log_event::write()
{
- uchar buf[GTID_HEADER_LEN+2+sizeof(XID) + /* flags_extra: */ 1+4];
+ uchar buf[GTID_HEADER_LEN+2+sizeof(XID) + /* flags_extra: */ 1+1+8+MAX_CATALOG_NAME];
size_t write_len= 13;
int8store(buf, seq_no);
@@ -3026,6 +3103,15 @@ Gtid_log_event::write()
write_len+= 8;
}
+ if (flags_extra & FL_CATALOG)
+ {
+ uint32_t cat_len= std::min(cat_name->length, (size_t)(MAX_CATALOG_NAME-1));
+ DBUG_ASSERT(cat_name->length <= MAX_CATALOG_NAME-1);
+ buf[write_len++]= cat_len;
+ memcpy(buf + write_len, cat_name->str, cat_len);
+ write_len+= cat_len;
+ }
+
if (write_len < GTID_HEADER_LEN)
{
bzero(buf+write_len, GTID_HEADER_LEN-write_len);
diff --git a/sql/privilege.h b/sql/privilege.h
index 953ffb177c1..16d7f91f27d 100644
--- a/sql/privilege.h
+++ b/sql/privilege.h
@@ -305,7 +305,12 @@ constexpr privilege_t CATALOG_ACLS=
CATALOG_ACL |
SHUTDOWN_ACL |
CREATE_TABLESPACE_ACL |
- REPL_SLAVE_ACL |
+ /*
+ ToDo: REPL_SLAVE_ACL is needed to be able to replicate from a single
+ catalog to an on-premise slave. However, we may need a way for the catalog
+ superuser to control replication access for a catalog.
+ */
+// REPL_SLAVE_ACL |
BINLOG_ADMIN_ACL |
BINLOG_MONITOR_ACL |
// BINLOG_REPLAY_ACL |
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc
index 3c698f27a19..6a68254052a 100644
--- a/sql/rpl_mi.cc
+++ b/sql/rpl_mi.cc
@@ -47,6 +47,7 @@ Master_info::Master_info(LEX_CSTRING *connection_name_arg,
{
char *tmp;
host[0] = 0; user[0] = 0; password[0] = 0;
+ catalog_name[0]= 0;
ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0;
ssl_cipher[0]= 0; ssl_key[0]= 0;
ssl_crl[0]= 0; ssl_crlpath[0]= 0;
@@ -644,6 +645,17 @@ file '%s')", fname);
}
seen_ignore_domain_ids= true;
}
+ else if (got_eq && !strcmp(buf, "master_catalog"))
+ {
+ if (init_strvar_from_file(mi->catalog_name,
+ sizeof(mi->catalog_name),
+ &mi->file, ""))
+
+ {
+ sql_print_error("Failed to initialize master info master_catalog");
+ goto errwithmsg;
+ }
+ }
else if (!got_eq && !strcmp(buf, "END_MARKER"))
{
/*
@@ -817,6 +829,7 @@ int flush_master_info(Master_info* mi,
"using_gtid=%d\n"
"do_domain_ids=%s\n"
"ignore_domain_ids=%s\n"
+ "master_catalog=%s\n"
"END_MARKER\n",
LINES_IN_MASTER_INFO,
mi->master_log_name, llstr(mi->master_log_pos, lbuf),
@@ -827,7 +840,7 @@ int flush_master_info(Master_info* mi,
heartbeat_buf, "", ignore_server_ids_buf,
"", 0,
mi->ssl_crl, mi->ssl_crlpath, mi->using_gtid,
- do_domain_ids_buf, ignore_domain_ids_buf);
+ do_domain_ids_buf, ignore_domain_ids_buf, mi->catalog_name);
err= flush_io_cache(file);
if (sync_masterinfo_period && !err &&
++(mi->sync_counter) >= sync_masterinfo_period)
diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h
index 6058b7fb34c..98f26a4d050 100644
--- a/sql/rpl_mi.h
+++ b/sql/rpl_mi.h
@@ -213,6 +213,7 @@ class Master_info : public Slave_reporting_capability
/* the variables below are needed because we can change masters on the fly */
char master_log_name[FN_REFLEN+6]; /* Room for multi-*/
char host[HOSTNAME_LENGTH*SYSTEM_CHARSET_MBMAXLEN+1];
+ char catalog_name[MAX_CATALOG_NAME];
char user[USERNAME_LENGTH+1];
char password[MAX_PASSWORD_LENGTH*SYSTEM_CHARSET_MBMAXLEN+1];
LEX_CSTRING connection_name; /* User supplied connection name */
diff --git a/sql/slave.cc b/sql/slave.cc
index 28b183fbd69..02f58777732 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -7140,6 +7140,9 @@ static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
if (opt_plugin_dir_ptr && *opt_plugin_dir_ptr)
mysql_options(mysql, MYSQL_PLUGIN_DIR, opt_plugin_dir_ptr);
+ if (mi->catalog_name[0])
+ mysql_options(mysql, MARIADB_OPT_CATALOG, mi->catalog_name);
+
/* we disallow empty users */
if (mi->user[0] == 0)
{
diff --git a/sql/sql_lex.h b/sql/sql_lex.h
index adcf45aea93..2d540c948a2 100644
--- a/sql/sql_lex.h
+++ b/sql/sql_lex.h
@@ -487,7 +487,7 @@ struct LEX_MASTER_INFO
DYNAMIC_ARRAY repl_ignore_server_ids;
DYNAMIC_ARRAY repl_do_domain_ids;
DYNAMIC_ARRAY repl_ignore_domain_ids;
- const char *host, *user, *password, *log_file_name;
+ const char *host, *catalog, *user, *password, *log_file_name;
const char *ssl_key, *ssl_cert, *ssl_ca, *ssl_capath, *ssl_cipher;
const char *ssl_crl, *ssl_crlpath;
const char *relay_log_name;
@@ -533,7 +533,7 @@ struct LEX_MASTER_INFO
delete_dynamic(&repl_ignore_domain_ids);
}
- host= user= password= log_file_name= ssl_key= ssl_cert= ssl_ca=
+ host= catalog= user= password= log_file_name= ssl_key= ssl_cert= ssl_ca=
ssl_capath= ssl_cipher= ssl_crl= ssl_crlpath= relay_log_name= NULL;
pos= relay_log_pos= server_id= port= connect_retry= 0;
heartbeat_period= 0;
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index dc27ab9ff8b..c3c6eadc0b3 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -129,6 +129,7 @@ struct binlog_send_info {
slave_connection_state *until_gtid_state;
slave_connection_state until_gtid_state_obj;
Format_description_log_event *fdev;
+ const SQL_CATALOG *catalog_filter;
int mariadb_slave_capability;
enum_gtid_skip_type gtid_skip_group;
enum_gtid_until_state gtid_until_group;
@@ -167,6 +168,7 @@ struct binlog_send_info {
char *lfn)
: thd(thd_arg), net(&thd_arg->net), packet(packet_arg),
log_file_name(lfn), until_gtid_state(NULL), fdev(NULL),
+ catalog_filter(NULL),
gtid_skip_group(GTID_SKIP_NOT), gtid_until_group(GTID_UNTIL_NOT_DONE),
flags(flags_arg), current_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF),
slave_gtid_strict_mode(false), send_fake_gtid_list(false),
@@ -459,6 +461,17 @@ inline void fix_checksum(enum_binlog_checksum_alg checksum_alg, String *packet,
}
+static const SQL_CATALOG *get_catalog_filter(THD *thd)
+{
+ if (!using_catalogs)
+ return nullptr;
+ if ((thd->security_ctx->master_access & CATALOG_ACL) &&
+ thd->catalog == default_catalog())
+ return nullptr;
+ return thd->catalog;
+}
+
+
static user_var_entry * get_binlog_checksum_uservar(THD * thd)
{
LEX_CSTRING name= { STRING_WITH_LEN("master_binlog_checksum")};
@@ -1751,6 +1764,26 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
}
}
+ if (info->catalog_filter && event_type == GTID_EVENT)
+ {
+ uchar flags2;
+ LEX_CSTRING cat_name;
+ if (ev_offset > len || Gtid_log_event::peek_catalog(
+ (uchar*) packet->ptr()+ev_offset, len - ev_offset,
+ info->fdev, current_checksum_alg, &flags2, &cat_name))
+ {
+ info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ return "Failed to read Gtid_log_event: corrupt binlog";
+ }
+
+ if (cmp(&info->catalog_filter->name, &cat_name))
+ {
+ /* Skip this event group as it doesn't match the user's catalog. */
+ info->gtid_skip_group= (flags2 & Gtid_log_event::FL_STANDALONE ?
+ GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
+ }
+ }
+
/* Skip GTID event groups until we reach slave position within a domain_id. */
if (event_type == GTID_EVENT && info->using_gtid_state)
{
@@ -2128,6 +2161,7 @@ static int init_binlog_sender(binlog_send_info *info,
/** init last pos */
info->last_pos= *pos;
+ info->catalog_filter= get_catalog_filter(thd);
info->current_checksum_alg= get_binlog_checksum_value_at_connect(thd);
info->mariadb_slave_capability= get_mariadb_slave_capability(thd);
info->using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state);
@@ -3729,6 +3763,9 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
if (get_string_parameter(mi->host, lex_mi->host, sizeof(mi->host)-1,
"MASTER_HOST", system_charset_info) ||
+ get_string_parameter(mi->catalog_name, lex_mi->catalog,
+ sizeof(mi->catalog_name)-1, "MASTER_CATALOG",
+ system_charset_info) ||
get_string_parameter(mi->user, lex_mi->user, sizeof(mi->user)-1,
"MASTER_USER", system_charset_info) ||
get_string_parameter(mi->password, lex_mi->password,
diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy
index 98e18d32e0f..8e3c93cbe51 100644
--- a/sql/sql_yacc.yy
+++ b/sql/sql_yacc.yy
@@ -923,6 +923,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, size_t *yystacksize);
%token <kwd> LOCKS_SYM
%token <kwd> LOGFILE_SYM
%token <kwd> LOGS_SYM
+%token <kwd> MASTER_CATALOG_SYM
%token <kwd> MASTER_CONNECT_RETRY_SYM
%token <kwd> MASTER_DELAY_SYM
%token <kwd> MASTER_GTID_POS_SYM
@@ -2126,6 +2127,10 @@ master_def:
{
Lex->mi.host = $3.str;
}
+ | MASTER_CATALOG_SYM '=' TEXT_STRING_sys
+ {
+ Lex->mi.catalog = $3.str;
+ }
| MASTER_USER_SYM '=' TEXT_STRING_sys
{
Lex->mi.user = $3.str;
@@ -15988,6 +15993,7 @@ keyword_sp_var_and_label:
| MASTER_HEARTBEAT_PERIOD_SYM
| MASTER_GTID_POS_SYM
| MASTER_HOST_SYM
+ | MASTER_CATALOG_SYM
| MASTER_PORT_SYM
| MASTER_LOG_FILE_SYM
| MASTER_LOG_POS_SYM
--
2.39.2
1
0

[PATCH] MDEV-35019 Provide a way to enable "rollback XA on disconnect" behavior we had before 10.5.2
by Kristian Nielsen 10 Oct '24
by Kristian Nielsen 10 Oct '24
10 Oct '24
Implement variable legacy_xa_rollback_at_disconnect to support
backwards compatibility for applications that rely on the pre-10.5
behavior for connection disconnect, which is to rollback the
transaction (in violation of the XA specification).
Signed-off-by: Kristian Nielsen <knielsen(a)knielsen-hq.org>
---
mysql-test/main/mysqld--help.result | 8 +++++++
mysql-test/main/xa.result | 16 +++++++++++++
mysql-test/main/xa.test | 19 +++++++++++++++
mysql-test/suite/rpl/r/rpl_xa.result | 24 +++++++++++++++++++
.../rpl/r/rpl_xa_gtid_pos_auto_engine.result | 24 +++++++++++++++++++
mysql-test/suite/rpl/t/rpl_xa.inc | 23 ++++++++++++++++++
.../sys_vars/r/sysvars_server_embedded.result | 10 ++++++++
.../r/sysvars_server_notembedded.result | 10 ++++++++
sql/sql_class.cc | 7 +++++-
sql/sql_class.h | 1 +
sql/sys_vars.cc | 10 ++++++++
sql/xa.h | 1 +
12 files changed, 152 insertions(+), 1 deletion(-)
diff --git a/mysql-test/main/mysqld--help.result b/mysql-test/main/mysqld--help.result
index 1a2a740cb77..ac5ab0a88b1 100644
--- a/mysql-test/main/mysqld--help.result
+++ b/mysql-test/main/mysqld--help.result
@@ -433,6 +433,13 @@ The following specify which files/extra groups are read (specified before remain
--lc-time-names=name
Set the language used for the month names and the days of
the week.
+ --legacy-xa-rollback-at-disconnect
+ If a user session disconnects after putting a transaction
+ into the XA PREPAREd state, roll back the transaction.
+ Can be used for backwards compatibility to enable this
+ pre-10.5 behavior for applications that expect it. Note
+ that this violates the XA specification and should not be
+ used for new code
--local-infile Enable LOAD DATA LOCAL INFILE
(Defaults to on; use --skip-local-infile to disable.)
--lock-wait-timeout=#
@@ -1566,6 +1573,7 @@ large-pages FALSE
lc-messages en_US
lc-messages-dir MYSQL_SHAREDIR/
lc-time-names en_US
+legacy-xa-rollback-at-disconnect FALSE
local-infile TRUE
lock-wait-timeout 86400
log-bin foo
diff --git a/mysql-test/main/xa.result b/mysql-test/main/xa.result
index faabf1cf695..58ad2a54edd 100644
--- a/mysql-test/main/xa.result
+++ b/mysql-test/main/xa.result
@@ -592,6 +592,22 @@ formatID gtrid_length bqual_length data
xa rollback '4';
ERROR XA100: XA_RBROLLBACK: Transaction branch was rolled back
set @@global.read_only=@sav_read_only;
+# MDEV-35019: Provide a way to enable "rollback XA on disconnect" behavior we had before 10.5.2
+# Test legacy_xa_rollback_at_disconnect option.
+CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
+INSERT INTO t1 VALUES (1, 0);
+connect con1,localhost,root,,;
+SET SESSION legacy_xa_rollback_at_disconnect= 1;
+XA START '5';
+INSERT INTO t1 VALUES (2, 0);
+XA END '5';
+XA PREPARE '5';
+disconnect con1;
+connection default;
+INSERT INTO t1 VALUES (3, 0);
+XA ROLLBACK '5';
+ERROR XAE04: XAER_NOTA: Unknown XID
+DROP TABLE t1;
#
# End of 10.5 tests
#
diff --git a/mysql-test/main/xa.test b/mysql-test/main/xa.test
index e1ca39be9ab..dfc97002168 100644
--- a/mysql-test/main/xa.test
+++ b/mysql-test/main/xa.test
@@ -747,6 +747,25 @@ xa rollback '4';
set @@global.read_only=@sav_read_only;
+--echo # MDEV-35019: Provide a way to enable "rollback XA on disconnect" behavior we had before 10.5.2
+--echo # Test legacy_xa_rollback_at_disconnect option.
+CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
+INSERT INTO t1 VALUES (1, 0);
+connect (con1,localhost,root,,);
+SET SESSION legacy_xa_rollback_at_disconnect= 1;
+XA START '5';
+INSERT INTO t1 VALUES (2, 0);
+XA END '5';
+XA PREPARE '5';
+disconnect con1;
+
+connection default;
+--source include/wait_until_count_sessions.inc
+INSERT INTO t1 VALUES (3, 0);
+--error ER_XAER_NOTA
+XA ROLLBACK '5';
+DROP TABLE t1;
+
--echo #
--echo # End of 10.5 tests
--echo #
diff --git a/mysql-test/suite/rpl/r/rpl_xa.result b/mysql-test/suite/rpl/r/rpl_xa.result
index 061c7b360d0..7b58d3d6e06 100644
--- a/mysql-test/suite/rpl/r/rpl_xa.result
+++ b/mysql-test/suite/rpl/r/rpl_xa.result
@@ -280,4 +280,28 @@ disconnect con1;
connection master;
xa commit '1';
drop table t2, t1;
+# MDEV-35019 Provide a way to enable "rollback XA on disconnect" behavior we had before 10.5.2
+CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
+INSERT INTO t1 VALUES (1), (3);
+connect con1, localhost,root;
+SET SESSION legacy_xa_rollback_at_disconnect= 1;
+XA START '3';
+INSERT INTO t1 VALUES (2);
+XA END '3';
+XA PREPARE '3';
+disconnect con1;
+connection master;
+include/save_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+a
+1
+3
+connection slave;
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+a
+1
+3
+connection master;
+DROP TABLE t1;
include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/r/rpl_xa_gtid_pos_auto_engine.result b/mysql-test/suite/rpl/r/rpl_xa_gtid_pos_auto_engine.result
index 35625cc7026..2cb9b6c4290 100644
--- a/mysql-test/suite/rpl/r/rpl_xa_gtid_pos_auto_engine.result
+++ b/mysql-test/suite/rpl/r/rpl_xa_gtid_pos_auto_engine.result
@@ -289,6 +289,30 @@ disconnect con1;
connection master;
xa commit '1';
drop table t2, t1;
+# MDEV-35019 Provide a way to enable "rollback XA on disconnect" behavior we had before 10.5.2
+CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
+INSERT INTO t1 VALUES (1), (3);
+connect con1, localhost,root;
+SET SESSION legacy_xa_rollback_at_disconnect= 1;
+XA START '3';
+INSERT INTO t1 VALUES (2);
+XA END '3';
+XA PREPARE '3';
+disconnect con1;
+connection master;
+include/save_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+a
+1
+3
+connection slave;
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+a
+1
+3
+connection master;
+DROP TABLE t1;
connection slave;
include/stop_slave.inc
SET @@global.gtid_pos_auto_engines="";
diff --git a/mysql-test/suite/rpl/t/rpl_xa.inc b/mysql-test/suite/rpl/t/rpl_xa.inc
index d22d2d2ef3d..050b9597ccc 100644
--- a/mysql-test/suite/rpl/t/rpl_xa.inc
+++ b/mysql-test/suite/rpl/t/rpl_xa.inc
@@ -431,3 +431,26 @@ disconnect con1;
connection master;
xa commit '1';
drop table t2, t1;
+
+--echo # MDEV-35019 Provide a way to enable "rollback XA on disconnect" behavior we had before 10.5.2
+CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
+INSERT INTO t1 VALUES (1), (3);
+
+connect con1, localhost,root;
+SET SESSION legacy_xa_rollback_at_disconnect= 1;
+XA START '3';
+INSERT INTO t1 VALUES (2);
+XA END '3';
+XA PREPARE '3';
+--disconnect con1
+
+--connection master
+--source include/save_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+
+--connection slave
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+
+--connection master
+DROP TABLE t1;
diff --git a/mysql-test/suite/sys_vars/r/sysvars_server_embedded.result b/mysql-test/suite/sys_vars/r/sysvars_server_embedded.result
index fcd1ee9a141..d45ab7f80c8 100644
--- a/mysql-test/suite/sys_vars/r/sysvars_server_embedded.result
+++ b/mysql-test/suite/sys_vars/r/sysvars_server_embedded.result
@@ -1532,6 +1532,16 @@ NUMERIC_BLOCK_SIZE NULL
ENUM_VALUE_LIST NULL
READ_ONLY NO
COMMAND_LINE_ARGUMENT NULL
+VARIABLE_NAME LEGACY_XA_ROLLBACK_AT_DISCONNECT
+VARIABLE_SCOPE SESSION
+VARIABLE_TYPE BOOLEAN
+VARIABLE_COMMENT If a user session disconnects after putting a transaction into the XA PREPAREd state, roll back the transaction. Can be used for backwards compatibility to enable this pre-10.5 behavior for applications that expect it. Note that this violates the XA specification and should not be used for new code
+NUMERIC_MIN_VALUE NULL
+NUMERIC_MAX_VALUE NULL
+NUMERIC_BLOCK_SIZE NULL
+ENUM_VALUE_LIST OFF,ON
+READ_ONLY NO
+COMMAND_LINE_ARGUMENT OPTIONAL
VARIABLE_NAME LICENSE
VARIABLE_SCOPE GLOBAL
VARIABLE_TYPE VARCHAR
diff --git a/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result b/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result
index 1d5e9499c7a..73e853f12a5 100644
--- a/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result
+++ b/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result
@@ -1642,6 +1642,16 @@ NUMERIC_BLOCK_SIZE NULL
ENUM_VALUE_LIST NULL
READ_ONLY NO
COMMAND_LINE_ARGUMENT NULL
+VARIABLE_NAME LEGACY_XA_ROLLBACK_AT_DISCONNECT
+VARIABLE_SCOPE SESSION
+VARIABLE_TYPE BOOLEAN
+VARIABLE_COMMENT If a user session disconnects after putting a transaction into the XA PREPAREd state, roll back the transaction. Can be used for backwards compatibility to enable this pre-10.5 behavior for applications that expect it. Note that this violates the XA specification and should not be used for new code
+NUMERIC_MIN_VALUE NULL
+NUMERIC_MAX_VALUE NULL
+NUMERIC_BLOCK_SIZE NULL
+ENUM_VALUE_LIST OFF,ON
+READ_ONLY NO
+COMMAND_LINE_ARGUMENT OPTIONAL
VARIABLE_NAME LICENSE
VARIABLE_SCOPE GLOBAL
VARIABLE_TYPE VARCHAR
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 7913f1a40d2..cd0418553fa 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -1552,7 +1552,12 @@ void THD::cleanup(void)
close_temporary_tables();
if (transaction->xid_state.is_explicit_XA())
- trans_xa_detach(this);
+ {
+ if (unlikely(variables.legacy_xa_rollback_at_disconnect))
+ xa_trans_force_rollback(this);
+ else
+ trans_xa_detach(this);
+ }
else
trans_rollback(this);
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 7e5d9ac96e3..c0e304dd95f 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -846,6 +846,7 @@ typedef struct system_variables
uint in_subquery_conversion_threshold;
ulong optimizer_max_sel_arg_weight;
ulonglong max_rowid_filter_size;
+ my_bool legacy_xa_rollback_at_disconnect;
vers_asof_timestamp_t vers_asof_timestamp;
ulong vers_alter_history;
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index 9a4180ae000..18374544126 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -6761,3 +6761,13 @@ static Sys_var_ulonglong Sys_max_rowid_filter_size(
SESSION_VAR(max_rowid_filter_size), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(1024, (ulonglong)~(intptr)0), DEFAULT(128*1024),
BLOCK_SIZE(1));
+
+static Sys_var_mybool Sys_legacy_xa_rollback_at_disconnect(
+ "legacy_xa_rollback_at_disconnect",
+ "If a user session disconnects after putting a transaction into the XA "
+ "PREPAREd state, roll back the transaction. Can be used for backwards "
+ "compatibility to enable this pre-10.5 behavior for applications that "
+ "expect it. Note that this violates the XA specification and should not "
+ "be used for new code",
+ SESSION_VAR(legacy_xa_rollback_at_disconnect), CMD_LINE(OPT_ARG),
+ DEFAULT(FALSE), NO_MUTEX_GUARD, NOT_IN_BINLOG);
diff --git a/sql/xa.h b/sql/xa.h
index 0b2d0696642..4260033d0fb 100644
--- a/sql/xa.h
+++ b/sql/xa.h
@@ -45,6 +45,7 @@ bool xid_cache_insert(XID *xid);
bool xid_cache_insert(THD *thd, XID_STATE *xid_state, XID *xid);
void xid_cache_delete(THD *thd, XID_STATE *xid_state);
+bool xa_trans_force_rollback(THD *thd);
bool trans_xa_start(THD *thd);
bool trans_xa_end(THD *thd);
bool trans_xa_prepare(THD *thd);
--
2.39.2
1
0

03 Sep '24
If a slave replicating an event has waited for more than
@@slave_abort_blocking_timeout for a conflicting metadata lock held by a
non-replication thread, the blocking query is killed to allow replication to
proceed and not be blocked indefinitely by a user query.
Signed-off-by: Kristian Nielsen <knielsen(a)knielsen-hq.org>
---
.../rpl/r/slave_abort_blocking_timeout.result | 74 ++++++++++++++++
.../rpl/t/slave_abort_blocking_timeout.test | 85 +++++++++++++++++++
sql/mdl.cc | 43 ++++++++--
sql/mdl.h | 3 +-
sql/mysqld.cc | 1 +
sql/mysqld.h | 1 +
sql/privilege.h | 2 +
sql/sql_base.cc | 2 +-
sql/sql_class.cc | 17 ++--
sql/sql_class.h | 3 +-
sql/sys_vars.cc | 11 +++
11 files changed, 225 insertions(+), 17 deletions(-)
create mode 100644 mysql-test/suite/rpl/r/slave_abort_blocking_timeout.result
create mode 100644 mysql-test/suite/rpl/t/slave_abort_blocking_timeout.test
diff --git a/mysql-test/suite/rpl/r/slave_abort_blocking_timeout.result b/mysql-test/suite/rpl/r/slave_abort_blocking_timeout.result
new file mode 100644
index 00000000000..911ea4b070e
--- /dev/null
+++ b/mysql-test/suite/rpl/r/slave_abort_blocking_timeout.result
@@ -0,0 +1,74 @@
+include/master-slave.inc
+[connection master]
+*** Testcase to show how a long-running SELECT can block replication from proceeding
+*** past a DDL. Intention to implement a timeout after which such SELECT can be
+*** killed.
+connection master;
+CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
+INSERT INTO t1 SELECT seq, 100+seq FROM seq_1_to_20;
+connection slave;
+include/stop_slave.inc
+SELECT @@GLOBAL.slave_abort_blocking_timeout;
+@@GLOBAL.slave_abort_blocking_timeout
+31536000.000000
+SET @old_abort_timeout= @@slave_abort_blocking_timeout;
+SET GLOBAL slave_abort_blocking_timeout= -1;
+Warnings:
+Warning 1292 Truncated incorrect slave_abort_blocking_timeout value: '-1'
+SELECT @@GLOBAL.slave_abort_blocking_timeout;
+@@GLOBAL.slave_abort_blocking_timeout
+0.000000
+SET GLOBAL slave_abort_blocking_timeout= 1.0;
+SELECT @@GLOBAL.slave_abort_blocking_timeout;
+@@GLOBAL.slave_abort_blocking_timeout
+1.000000
+connection server_2;
+SELECT X.a, SLEEP(IF((X.b MOD 2)=0, 0.4, 0.6)) FROM t1 X CROSS JOIN t1 Y;
+connection slave;
+connection master;
+UPDATE t1 SET b=b+1000 WHERE a=1;
+ALTER TABLE t1 ADD INDEX b_idx(b);
+UPDATE t1 SET b=b+1000 WHERE a=20;
+connection slave;
+include/start_slave.inc
+connection server_2;
+ERROR 70100: Query execution was interrupted
+connection slave;
+SHOW CREATE TABLE t1;
+Table t1
+Create Table CREATE TABLE `t1` (
+ `a` int(11) NOT NULL,
+ `b` int(11) DEFAULT NULL,
+ PRIMARY KEY (`a`),
+ KEY `b_idx` (`b`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_uca1400_ai_ci
+include/stop_slave.inc
+SET GLOBAL slave_abort_blocking_timeout= 0;
+SELECT @@GLOBAL.slave_abort_blocking_timeout;
+@@GLOBAL.slave_abort_blocking_timeout
+0.000000
+connection server_2;
+SELECT X.a, SLEEP(IF((X.b MOD 2)=0, 0.4, 0.6)) FROM t1 X CROSS JOIN t1 Y;
+connection slave;
+connection master;
+UPDATE t1 SET b=b+1000 WHERE a=1;
+ALTER TABLE t1 DROP INDEX b_idx;
+UPDATE t1 SET b=b+1000 WHERE a=20;
+connection slave;
+include/start_slave.inc
+connection server_2;
+ERROR 70100: Query execution was interrupted
+connection slave;
+SHOW CREATE TABLE t1;
+Table t1
+Create Table CREATE TABLE `t1` (
+ `a` int(11) NOT NULL,
+ `b` int(11) DEFAULT NULL,
+ PRIMARY KEY (`a`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_uca1400_ai_ci
+include/stop_slave.inc
+SET GLOBAL slave_abort_blocking_timeout= @old_abort_timeout;
+include/start_slave.inc
+connection master;
+DROP TABLE t1;
+include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/t/slave_abort_blocking_timeout.test b/mysql-test/suite/rpl/t/slave_abort_blocking_timeout.test
new file mode 100644
index 00000000000..04f24ff5df8
--- /dev/null
+++ b/mysql-test/suite/rpl/t/slave_abort_blocking_timeout.test
@@ -0,0 +1,85 @@
+--source include/have_innodb.inc
+--source include/have_sequence.inc
+--source include/have_binlog_format_mixed.inc
+--source include/master-slave.inc
+
+--echo *** Testcase to show how a long-running SELECT can block replication from proceeding
+--echo *** past a DDL. Intention to implement a timeout after which such SELECT can be
+--echo *** killed.
+
+--connection master
+CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
+INSERT INTO t1 SELECT seq, 100+seq FROM seq_1_to_20;
+
+--sync_slave_with_master
+
+--source include/stop_slave.inc
+SELECT @@GLOBAL.slave_abort_blocking_timeout;
+SET @old_abort_timeout= @@slave_abort_blocking_timeout;
+SET GLOBAL slave_abort_blocking_timeout= -1;
+SELECT @@GLOBAL.slave_abort_blocking_timeout;
+SET GLOBAL slave_abort_blocking_timeout= 1.0;
+SELECT @@GLOBAL.slave_abort_blocking_timeout;
+--connection server_2
+# Start a SELECT that will run for long.
+send SELECT X.a, SLEEP(IF((X.b MOD 2)=0, 0.4, 0.6)) FROM t1 X CROSS JOIN t1 Y;
+
+--connection slave
+# Wait for the SELECT to have started so it will block the coming DDL
+# from replicating.
+--let $wait_condition= SELECT COUNT(*)=1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE state = 'User sleep'
+--source include/wait_condition.inc
+
+--connection master
+UPDATE t1 SET b=b+1000 WHERE a=1;
+ALTER TABLE t1 ADD INDEX b_idx(b);
+UPDATE t1 SET b=b+1000 WHERE a=20;
+
+--save_master_pos
+--connection slave
+--source include/start_slave.inc
+--sync_with_master
+
+--connection server_2
+--error ER_QUERY_INTERRUPTED
+reap;
+
+--connection slave
+query_vertical SHOW CREATE TABLE t1;
+
+# Do it again to test that a timeout of 0 also works to abort user queries.
+--source include/stop_slave.inc
+SET GLOBAL slave_abort_blocking_timeout= 0;
+SELECT @@GLOBAL.slave_abort_blocking_timeout;
+--connection server_2
+send SELECT X.a, SLEEP(IF((X.b MOD 2)=0, 0.4, 0.6)) FROM t1 X CROSS JOIN t1 Y;
+
+--connection slave
+--let $wait_condition= SELECT COUNT(*)=1 FROM INFORMATION_SCHEMA.PROCESSLIST WHERE state = 'User sleep'
+--source include/wait_condition.inc
+
+--connection master
+UPDATE t1 SET b=b+1000 WHERE a=1;
+ALTER TABLE t1 DROP INDEX b_idx;
+UPDATE t1 SET b=b+1000 WHERE a=20;
+
+--save_master_pos
+--connection slave
+--source include/start_slave.inc
+--sync_with_master
+
+--connection server_2
+--error ER_QUERY_INTERRUPTED
+reap;
+
+--connection slave
+query_vertical SHOW CREATE TABLE t1;
+
+
+--source include/stop_slave.inc
+SET GLOBAL slave_abort_blocking_timeout= @old_abort_timeout;
+--source include/start_slave.inc
+
+--connection master
+DROP TABLE t1;
+--source include/rpl_end.inc
diff --git a/sql/mdl.cc b/sql/mdl.cc
index faccd1c9476..9845718e165 100644
--- a/sql/mdl.cc
+++ b/sql/mdl.cc
@@ -613,7 +613,7 @@ class MDL_lock
bool needs_notification(const MDL_ticket *ticket) const
{ return m_strategy->needs_notification(ticket); }
- void notify_conflicting_locks(MDL_context *ctx)
+ void notify_conflicting_locks(MDL_context *ctx, bool abort_blocking)
{
for (const auto &conflicting_ticket : m_granted)
{
@@ -624,7 +624,8 @@ class MDL_lock
ctx->get_owner()->
notify_shared_lock(conflicting_ctx->get_owner(),
- conflicting_ctx->get_needs_thr_lock_abort());
+ conflicting_ctx->get_needs_thr_lock_abort(),
+ abort_blocking);
}
}
}
@@ -2361,10 +2362,10 @@ MDL_context::acquire_lock(MDL_request *mdl_request, double lock_wait_timeout)
/*
Don't break conflicting locks if timeout is 0 as 0 is used
- To check if there is any conflicting locks...
+ to check if there is any conflicting locks...
*/
if (lock->needs_notification(ticket) && lock_wait_timeout)
- lock->notify_conflicting_locks(this);
+ lock->notify_conflicting_locks(this, false);
/*
Ensure that if we are trying to get an exclusive lock for a slave
@@ -2397,14 +2398,39 @@ MDL_context::acquire_lock(MDL_request *mdl_request, double lock_wait_timeout)
find_deadlock();
- struct timespec abs_timeout, abs_shortwait;
+ struct timespec abs_timeout, abs_shortwait, abs_abort_blocking_timeout;
+ bool abort_blocking_enabled= false;
+ double abort_blocking_timeout= slave_abort_blocking_timeout;
+ if (abort_blocking_timeout < lock_wait_timeout &&
+ m_owner->get_thd()->rgi_slave)
+ {
+ set_timespec_nsec(abs_abort_blocking_timeout,
+ (ulonglong)(abort_blocking_timeout * 1000000000ULL));
+ abort_blocking_enabled= true;
+ }
set_timespec_nsec(abs_timeout,
(ulonglong)(lock_wait_timeout * 1000000000ULL));
- set_timespec(abs_shortwait, 1);
wait_status= MDL_wait::EMPTY;
- while (cmp_timespec(abs_shortwait, abs_timeout) <= 0)
+ for (;;)
{
+ bool abort_blocking= false;
+ set_timespec(abs_shortwait, 1);
+ if (abort_blocking_enabled &&
+ cmp_timespec(abs_shortwait, abs_abort_blocking_timeout) >= 0)
+ {
+ /*
+        If a slave DDL has waited for --slave-abort-blocking-timeout, then notify
+ any blocking SELECT once before continuing to wait until the full
+ timeout.
+ */
+ abs_shortwait= abs_abort_blocking_timeout;
+ abort_blocking= true;
+ abort_blocking_enabled= false;
+ }
+ if (cmp_timespec(abs_shortwait, abs_timeout) > 0)
+ break;
+
/* abs_timeout is far away. Wait a short while and notify locks. */
wait_status= m_wait.timed_wait(m_owner, &abs_shortwait, FALSE,
mdl_request->key.get_wait_state_name());
@@ -2425,9 +2451,8 @@ MDL_context::acquire_lock(MDL_request *mdl_request, double lock_wait_timeout)
mysql_prlock_wrlock(&lock->m_rwlock);
if (lock->needs_notification(ticket))
- lock->notify_conflicting_locks(this);
+ lock->notify_conflicting_locks(this, abort_blocking);
mysql_prlock_unlock(&lock->m_rwlock);
- set_timespec(abs_shortwait, 1);
}
if (wait_status == MDL_wait::EMPTY)
wait_status= m_wait.timed_wait(m_owner, &abs_timeout, TRUE,
diff --git a/sql/mdl.h b/sql/mdl.h
index 68cf5d2e811..aa271cb6d09 100644
--- a/sql/mdl.h
+++ b/sql/mdl.h
@@ -110,7 +110,8 @@ class MDL_context_owner
@see THD::notify_shared_lock()
*/
virtual bool notify_shared_lock(MDL_context_owner *in_use,
- bool needs_thr_lock_abort) = 0;
+ bool needs_thr_lock_abort,
+ bool needs_non_slave_abort) = 0;
};
/**
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index e938e8f6cfa..7e0a7f339c0 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -494,6 +494,7 @@ uint internal_slave_connections_needed_for_purge;
ulong slave_max_allowed_packet= 0;
double slave_max_statement_time_double;
ulonglong slave_max_statement_time;
+double slave_abort_blocking_timeout;
ulonglong binlog_stmt_cache_size=0;
ulonglong max_binlog_stmt_cache_size=0;
ulonglong test_flags;
diff --git a/sql/mysqld.h b/sql/mysqld.h
index 7cc88be0ad1..20644f0c404 100644
--- a/sql/mysqld.h
+++ b/sql/mysqld.h
@@ -242,6 +242,7 @@ extern ulong max_binlog_size;
extern ulong slave_max_allowed_packet;
extern ulonglong slave_max_statement_time;
extern double slave_max_statement_time_double;
+extern double slave_abort_blocking_timeout;
extern ulong opt_binlog_rows_event_max_size;
extern ulong binlog_row_metadata;
extern my_bool opt_binlog_gtid_index;
diff --git a/sql/privilege.h b/sql/privilege.h
index 84efc010d1e..eec0eb49df1 100644
--- a/sql/privilege.h
+++ b/sql/privilege.h
@@ -598,6 +598,8 @@ constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_SLAVE_MAX_ALLOWED_PACKET=
REPL_SLAVE_ADMIN_ACL;
constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_SLAVE_MAX_STATEMENT_TIME=
REPL_SLAVE_ADMIN_ACL;
+constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_SLAVE_ABORT_BLOCKING_TIMEOUT=
+ REPL_SLAVE_ADMIN_ACL;
constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_SLAVE_NET_TIMEOUT=
REPL_SLAVE_ADMIN_ACL;
constexpr privilege_t PRIV_SET_SYSTEM_GLOBAL_VAR_SLAVE_PARALLEL_MAX_QUEUED=
diff --git a/sql/sql_base.cc b/sql/sql_base.cc
index b045bf14cd9..a96082e5903 100644
--- a/sql/sql_base.cc
+++ b/sql/sql_base.cc
@@ -1025,7 +1025,7 @@ void close_thread_table(THD *thd, TABLE **table_ptr)
thd->handler_stats.add(file->handler_stats);
}
/*
- This look is needed to allow THD::notify_shared_lock() to
+ This lock is needed to allow THD::notify_shared_lock() to
traverse the thd->open_tables list without having to worry that
some of the tables are removed from under it
*/
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 13818813574..595ffd681a5 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -2158,21 +2158,28 @@ void THD::disconnect()
bool THD::notify_shared_lock(MDL_context_owner *ctx_in_use,
- bool needs_thr_lock_abort)
+ bool needs_thr_lock_abort,
+ bool needs_non_slave_abort)
{
THD *in_use= ctx_in_use->get_thd();
bool signalled= FALSE;
DBUG_ENTER("THD::notify_shared_lock");
DBUG_PRINT("enter",("needs_thr_lock_abort: %d", needs_thr_lock_abort));
- if ((in_use->system_thread & SYSTEM_THREAD_DELAYED_INSERT) &&
- !in_use->killed)
+ enum killed_state kill_signal;
+ if (in_use->system_thread & SYSTEM_THREAD_DELAYED_INSERT)
+ kill_signal= KILL_CONNECTION;
+ else if (needs_non_slave_abort && !in_use->slave_thread)
+ kill_signal= KILL_QUERY;
+ else
+ kill_signal= NOT_KILLED;
+ if (kill_signal != NOT_KILLED && !in_use->killed)
{
/* This code is similar to kill_delayed_threads() */
DBUG_PRINT("info", ("kill delayed thread"));
mysql_mutex_lock(&in_use->LOCK_thd_kill);
- if (in_use->killed < KILL_CONNECTION)
- in_use->set_killed_no_mutex(KILL_CONNECTION);
+ if (in_use->killed < kill_signal)
+ in_use->set_killed_no_mutex(kill_signal);
in_use->abort_current_cond_wait(true);
mysql_mutex_unlock(&in_use->LOCK_thd_kill);
signalled= TRUE;
diff --git a/sql/sql_class.h b/sql/sql_class.h
index f79d99c902e..410dddd9d3a 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -4288,7 +4288,8 @@ class THD: public THD_count, /* this must be first */
@retval FALSE otherwise.
*/
bool notify_shared_lock(MDL_context_owner *ctx_in_use,
- bool needs_thr_lock_abort) override;
+ bool needs_thr_lock_abort,
+ bool needs_non_slave_abort) override;
// End implementation of MDL_context_owner interface.
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index d4997793428..25be6ffeda6 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -2591,6 +2591,17 @@ static Sys_var_on_access_global<
GLOBAL_VAR(slave_max_statement_time_double), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(0, LONG_TIMEOUT), DEFAULT(0), NO_MUTEX_GUARD,
NOT_IN_BINLOG, ON_CHECK(0), ON_UPDATE(update_slave_max_statement_time));
+
+static Sys_var_on_access_global<
+ Sys_var_double, PRIV_SET_SYSTEM_GLOBAL_VAR_SLAVE_ABORT_BLOCKING_TIMEOUT>
+ Sys_slave_abort_blocking_timeout(
+ "slave_abort_blocking_timeout",
+ "Maximum time a slave DDL will wait for a blocking SELECT or other "
+ "user query until that query will be aborted. The argument will be "
+ "treated as a decimal value with microsecond precision",
+ GLOBAL_VAR(slave_abort_blocking_timeout), CMD_LINE(REQUIRED_ARG),
+ VALID_RANGE(0, LONG_TIMEOUT), DEFAULT(LONG_TIMEOUT), NO_MUTEX_GUARD,
+ NOT_IN_BINLOG);
#endif
--
2.39.2
2
1

[PATCH] MDEV-32014 Rename binlog cache temporary file to binlog file for large transaction
by Kristian Nielsen 02 Sep '24
by Kristian Nielsen 02 Sep '24
02 Sep '24
From: Libing Song <anders.slb(a)alibaba-inc.com>
Description
===========
When a transaction commits, it copies the binlog events from
binlog cache to binlog file. Very large transactions
(eg. gigabytes) can stall other transactions for a long time
because the data is copied while holding LOCK_log, which blocks
other commits from binlogging.
The solution in this patch is to rename the binlog cache file to
a binlog file instead of copying it, if the committing transaction
has a large binlog cache. Renaming is a very fast operation, so it
doesn't block other transactions for a long time.
The feature is called `Binlog Free Flush`; the term will be used
in the design and code.
Design
======
* binlog_free_flush_threshold
type: ulonglong
scope: global
dynamic: yes
default: 128MB
Only binlog cache temporary files larger than this threshold
(default 128MB) are renamed to binlog files.
* #binlog_cache_files directory
To support rename, all binlog cache temporary files are now managed
as normal files. The `#binlog_cache_files` directory is in the same
directory as the binlog files. It is created at server startup if it
doesn't exist. Otherwise, all files in the directory are deleted at
startup. The temporary files are named with an ML_ prefix and the
memory address of the binlog_cache_data object, which guarantees
uniqueness.
* Reserve space
To support the rename feature, enough space must be reserved at the
beginning of the binlog cache file. The space is required for the
Format description, Gtid list, checkpoint and Gtid events when
renaming it to a binlog file.
Since binlog_cache_data's cache_log is directly accessed by the binlog,
online alter and wsrep code, it is not easy to update all the code. Thus
the binlog cache will not reserve space if it is not a session binlog
cache or if a wsrep session is enabled.
- m_file_reserved_bytes
Stores the number of bytes reserved at the beginning of the cache file.
It is initialized in write_prepare() and cleared by reset().
The reserved file header is hidden from callers, so there is no
change for callers. E.g.
- get_byte_position() still gets the length of binlog data
written to the cache, but not the file length.
- truncate(0) will truncate the file to m_file_reserved_bytes but not 0.
- write_prepare()
write_prepare() is called every time anything is being written
into the cache. It will call init_file_reserved_bytes() to create
the cache file (if it doesn't exist) and reserve suitable space if
the data written exceeds the buffer's size.
* Binlog_free_flush
It is used to encapsulate the code for renaming a binlog cache
temporary file to a binlog file.
- should_free_flush()
it is called by write_transaction_to_binlog_events() to check if
a binlog cache should be renamed to a binlog file.
- commit()
That is the entry point to rename a binlog cache and commit the
transaction. Both rename and commit are protected by LOCK_log,
thus no other transaction can write anything into the renamed
binlog before it.
Rename happens in a rotation. After the new binlog file is generated,
replace_binlog_file() is called to:
- copy data from binlog file to binlog cache file.
- write gtid event.
- rename the binlog cache file to binlog file.
After that the rotation will continue to succeed. Then the transaction
is committed. The transaction will be committed in a separate
group by itself. Its cache file will be detached and the cache log
will be reset before calling trx_group_commit_leader(). Thus only
the Xid event will be written.
---
libmysqld/CMakeLists.txt | 2 +-
mysql-test/main/mysqld--help.result | 5 +
mysql-test/main/tmp_space_usage.result | 9 +-
mysql-test/main/tmp_space_usage.test | 10 +-
.../binlog/r/binlog_free_flush_atomic.result | 68 +++
.../binlog/t/binlog_free_flush_atomic.test | 110 +++++
.../encryption/r/binlog_cache_encrypt.result | 18 +
.../t/binlog_cache_encrypt-master.opt | 1 +
.../encryption/t/binlog_cache_encrypt.test | 19 +
.../suite/rpl/r/rpl_binlog_free_flush.result | 117 +++++
.../suite/rpl/t/rpl_binlog_free_flush.test | 217 +++++++++
.../sys_vars/r/sysvars_server_embedded.result | 10 +
.../r/sysvars_server_notembedded.result | 10 +
sql/CMakeLists.txt | 2 +-
sql/log.cc | 432 ++++++++++++++++--
sql/log.h | 21 +-
sql/log_cache.cc | 122 +++++
sql/log_cache.h | 116 ++++-
sql/log_event.h | 13 +
sql/log_event_server.cc | 32 +-
sql/mysqld.cc | 4 +-
sql/sys_vars.cc | 10 +
22 files changed, 1288 insertions(+), 60 deletions(-)
create mode 100644 mysql-test/suite/binlog/r/binlog_free_flush_atomic.result
create mode 100644 mysql-test/suite/binlog/t/binlog_free_flush_atomic.test
create mode 100644 mysql-test/suite/encryption/r/binlog_cache_encrypt.result
create mode 100644 mysql-test/suite/encryption/t/binlog_cache_encrypt-master.opt
create mode 100644 mysql-test/suite/encryption/t/binlog_cache_encrypt.test
create mode 100644 mysql-test/suite/rpl/r/rpl_binlog_free_flush.result
create mode 100644 mysql-test/suite/rpl/t/rpl_binlog_free_flush.test
create mode 100644 sql/log_cache.cc
diff --git a/libmysqld/CMakeLists.txt b/libmysqld/CMakeLists.txt
index 12bcc7ce1aa..f0d837470e2 100644
--- a/libmysqld/CMakeLists.txt
+++ b/libmysqld/CMakeLists.txt
@@ -67,7 +67,7 @@ SET(SQL_EMBEDDED_SOURCES emb_qcache.cc libmysqld.c lib_sql.cc
../sql/item_subselect.cc ../sql/item_sum.cc ../sql/item_timefunc.cc
../sql/item_xmlfunc.cc ../sql/item_jsonfunc.cc
../sql/json_schema.cc ../sql/json_schema_helper.cc
- ../sql/key.cc ../sql/lock.cc ../sql/log.cc
+ ../sql/key.cc ../sql/lock.cc ../sql/log.cc ../sql/log_cache.cc
../sql/log_event.cc ../sql/log_event_server.cc
../sql/mf_iocache.cc ../sql/my_decimal.cc
../sql/net_serv.cc ../sql/opt_range.cc
diff --git a/mysql-test/main/mysqld--help.result b/mysql-test/main/mysqld--help.result
index a1752a356d9..862382b844d 100644
--- a/mysql-test/main/mysqld--help.result
+++ b/mysql-test/main/mysqld--help.result
@@ -95,6 +95,10 @@ The following specify which files/extra groups are read (specified before remain
statement-based binary logging (smaller binary logs),
MIXED for statement-based binary logging when it's safe
with fall back to row-based otherwise
+ --binlog-free-flush-threshold=#
+ Try to rename the binlog cache temporary file of the
+ commiting transaction to a binlog file when its binlog
+ cache size is bigger than the value of this variable
--binlog-gtid-index Enable the creation of a GTID index for every binlog
file, and the use of such index for speeding up GTID
lookup in the binlog
@@ -1620,6 +1624,7 @@ binlog-direct-non-transactional-updates FALSE
binlog-expire-logs-seconds 0
binlog-file-cache-size 16384
binlog-format MIXED
+binlog-free-flush-threshold 134217728
binlog-gtid-index TRUE
binlog-gtid-index-page-size 4096
binlog-gtid-index-span-min 65536
diff --git a/mysql-test/main/tmp_space_usage.result b/mysql-test/main/tmp_space_usage.result
index ff2f58ab437..242227e0783 100644
--- a/mysql-test/main/tmp_space_usage.result
+++ b/mysql-test/main/tmp_space_usage.result
@@ -160,16 +160,17 @@ ERROR HY000: Global temporary space limit reached
#
set @save_max_tmp_total_space_usage=@@global.max_tmp_total_space_usage;
set @@global.max_tmp_total_space_usage=64*1024*1024;
-set @@max_tmp_session_space_usage=1179648;
+set @@max_tmp_session_space_usage=1179648+65536;
select @@max_tmp_session_space_usage;
@@max_tmp_session_space_usage
-1179648
+1245184
set @save_aria_repair_threads=@@aria_repair_threads;
set @@aria_repair_threads=2;
set @save_max_heap_table_size=@@max_heap_table_size;
set @@max_heap_table_size=16777216;
CREATE TABLE t1 (a CHAR(255),b INT,INDEX (b));
INSERT INTO t1 SELECT SEQ,SEQ FROM seq_1_to_100000;
+set @@max_tmp_session_space_usage=1179648;
SELECT * FROM t1 UNION SELECT * FROM t1;
ERROR HY000: Local temporary space limit reached
DROP TABLE t1;
@@ -205,11 +206,13 @@ ERROR HY000: Local temporary space limit reached
#
connect c1, localhost, root,,;
set @@binlog_format=row;
-CREATE OR REPLACE TABLE t1 (a DATETIME) ENGINE=MyISAM;
+CREATE OR REPLACE TABLE t1 (a DATETIME) ENGINE=InnoDB;
+BEGIN;
INSERT INTO t1 SELECT NOW() FROM seq_1_to_6000;
SET max_tmp_session_space_usage = 64*1024;
SELECT * FROM information_schema.ALL_PLUGINS LIMIT 2;
ERROR HY000: Local temporary space limit reached
+ROLLBACK;
drop table t1;
connection default;
disconnect c1;
diff --git a/mysql-test/main/tmp_space_usage.test b/mysql-test/main/tmp_space_usage.test
index af7b295f343..1685dbbc450 100644
--- a/mysql-test/main/tmp_space_usage.test
+++ b/mysql-test/main/tmp_space_usage.test
@@ -215,7 +215,8 @@ select count(distinct concat(seq,repeat('x',1000))) from seq_1_to_1000;
set @save_max_tmp_total_space_usage=@@global.max_tmp_total_space_usage;
set @@global.max_tmp_total_space_usage=64*1024*1024;
-set @@max_tmp_session_space_usage=1179648;
+# The binlog cache reserves 4096 bytes at the beginning of the temporary file.
+set @@max_tmp_session_space_usage=1179648+65536;
select @@max_tmp_session_space_usage;
set @save_aria_repair_threads=@@aria_repair_threads;
set @@aria_repair_threads=2;
@@ -224,6 +225,7 @@ set @@max_heap_table_size=16777216;
CREATE TABLE t1 (a CHAR(255),b INT,INDEX (b));
INSERT INTO t1 SELECT SEQ,SEQ FROM seq_1_to_100000;
+set @@max_tmp_session_space_usage=1179648;
--error 200
SELECT * FROM t1 UNION SELECT * FROM t1;
DROP TABLE t1;
@@ -266,11 +268,15 @@ SELECT MIN(VARIABLE_VALUE) OVER (), NTILE(1) OVER (), MAX(VARIABLE_NAME) OVER ()
connect(c1, localhost, root,,);
set @@binlog_format=row;
-CREATE OR REPLACE TABLE t1 (a DATETIME) ENGINE=MyISAM;
+CREATE OR REPLACE TABLE t1 (a DATETIME) ENGINE=InnoDB;
+# Use the transaction to keep binlog cache temporary file large enough
+BEGIN;
INSERT INTO t1 SELECT NOW() FROM seq_1_to_6000;
+
SET max_tmp_session_space_usage = 64*1024;
--error 200
SELECT * FROM information_schema.ALL_PLUGINS LIMIT 2;
+ROLLBACK;
drop table t1;
connection default;
disconnect c1;
diff --git a/mysql-test/suite/binlog/r/binlog_free_flush_atomic.result b/mysql-test/suite/binlog/r/binlog_free_flush_atomic.result
new file mode 100644
index 00000000000..31175d32581
--- /dev/null
+++ b/mysql-test/suite/binlog/r/binlog_free_flush_atomic.result
@@ -0,0 +1,68 @@
+RESET MASTER;
+#
+# binlog cache file is created in #binlog_cache_files directory
+# and it is deleted at disconnect
+#
+connect con1,localhost,root,,;
+CREATE TABLE t1 (c1 LONGTEXT) ENGINE = InnoDB;
+# list binlog_cache_files/
+INSERT INTO t1 values(repeat("1", 5242880));
+INSERT INTO t1 values(repeat("1", 5242880));
+FLUSH BINARY LOGS;
+# list #binlog_cache_files/
+ML_BINLOG_CACHE_FILE
+SET debug_sync = "thread_end SIGNAL signal.thread_end";
+disconnect con1;
+connection default;
+SET debug_sync = "now WAIT_FOR signal.thread_end";
+# binlog cache file is deleted at disconnection
+# list #binlog_cache_files/
+#
+# Reserved space is not big enough, rename will not happen. But rotate
+# will succeed.
+#
+SET GLOBAL binlog_free_flush_threshold = 10 * 1024 * 1024;
+SET debug = 'd,simulate_required_size_too_big';
+UPDATE t1 SET c1 = repeat('2', 5242880);
+include/assert.inc [Binlog is rotated, but free flush is not executed.]
+#
+# Error happens when renaming binlog cache to binlog file, rename will
+# not happen. Since the original binlog is delete, the rotate will failed
+# too. binlog will be closed.
+#
+SET debug = 'd,simulate_rename_binlog_cache_to_binlog_error';
+UPDATE t1 SET c1 = repeat('3', 5242880);
+ERROR HY000: Can't open file: './master-bin.000004' (errno: 1 "Operation not permitted")
+SELECT count(*) FROM t1 WHERE c1 like "3%";
+count(*)
+2
+# Binlog is closed
+show master status;
+File Position Binlog_Do_DB Binlog_Ignore_DB
+# restart
+show master status;
+File Position Binlog_Do_DB Binlog_Ignore_DB
+master-bin.000004 # <Binlog_Do_DB> <Binlog_Ignore_DB>
+#
+# Crash happens before rename the file
+#
+SET GLOBAL binlog_free_flush_threshold = 10 * 1024 * 1024;
+SET debug = 'd,binlog_free_flush_crash_before_rename';
+UPDATE t1 SET c1 = repeat('4', 5242880);
+ERROR HY000: Lost connection to server during query
+# One cache file left afte crash
+# list #binlog_cache_files/
+ML_BINLOG_CACHE_FILE
+non_binlog_cache
+# restart
+# The cache file is deleted at startup.
+# list #binlog_cache_files/
+non_binlog_cache
+include/assert_grep.inc [warning: non_binlog_cache file is in #binlog_cache_files/]
+include/show_binlog_events.inc
+Log_name Pos Event_type Server_id End_log_pos Info
+master-bin.000005 # Format_desc # # SERVER_VERSION, BINLOG_VERSION
+master-bin.000005 # Gtid_list # # [#-#-#]
+call mtr.add_suppression(".*Turning logging off for the whole duration.*");
+call mtr.add_suppression(".*non_binlog_cache is in #binlog_cache_files/.*");
+DROP TABLE t1;
diff --git a/mysql-test/suite/binlog/t/binlog_free_flush_atomic.test b/mysql-test/suite/binlog/t/binlog_free_flush_atomic.test
new file mode 100644
index 00000000000..05b4792c314
--- /dev/null
+++ b/mysql-test/suite/binlog/t/binlog_free_flush_atomic.test
@@ -0,0 +1,110 @@
+################################################################################
+# MDEV-32014 Rename binlog cache to binlog file
+#
+# It verifies that the rename logic is handled correctly if an error happens.
+################################################################################
+--source include/have_binlog_format_row.inc
+--source include/have_innodb.inc
+--source include/have_debug.inc
+RESET MASTER;
+
+--echo #
+--echo # binlog cache file is created in #binlog_cache_files directory
+--echo # and it is deleted at disconnect
+--echo #
+--connect(con1,localhost,root,,)
+CREATE TABLE t1 (c1 LONGTEXT) ENGINE = InnoDB;
+
+--echo # list binlog_cache_files/
+--let $datadir= `SELECT @@datadir`
+--list_files $datadir/#binlog_cache_files
+
+INSERT INTO t1 values(repeat("1", 5242880));
+INSERT INTO t1 values(repeat("1", 5242880));
+FLUSH BINARY LOGS;
+
+--echo # list #binlog_cache_files/
+--replace_regex /ML_[0-9]+/ML_BINLOG_CACHE_FILE/
+--list_files $datadir/#binlog_cache_files
+
+SET debug_sync = "thread_end SIGNAL signal.thread_end";
+--disconnect con1
+--connection default
+# Wait until the connection is closed completely.
+SET debug_sync = "now WAIT_FOR signal.thread_end";
+
+--echo # binlog cache file is deleted at disconnection
+--echo # list #binlog_cache_files/
+--list_files $datadir/#binlog_cache_files
+
+--echo #
+--echo # Reserved space is not big enough, rename will not happen. But rotate
+--echo # will succeed.
+--echo #
+SET GLOBAL binlog_free_flush_threshold = 10 * 1024 * 1024;
+SET debug = 'd,simulate_required_size_too_big';
+UPDATE t1 SET c1 = repeat('2', 5242880);
+
+--let $gtid_end_pos= query_get_value(SHOW BINLOG EVENTS IN 'master-bin.000002' LIMIT 4, End_log_pos, 4)
+--let $assert_cond= $gtid_end_pos < 4096
+--let $assert_text= Binlog is rotated, but free flush is not executed.
+--source include/assert.inc
+
+--echo #
+--echo # Error happens when renaming binlog cache to binlog file, rename will
+--echo # not happen. Since the original binlog is delete, the rotate will failed
+--echo # too. binlog will be closed.
+--echo #
+SET debug = 'd,simulate_rename_binlog_cache_to_binlog_error';
+--error ER_CANT_OPEN_FILE
+UPDATE t1 SET c1 = repeat('3', 5242880);
+SELECT count(*) FROM t1 WHERE c1 like "3%";
+
+--echo # Binlog is closed
+--source include/show_master_status.inc
+
+--source include/restart_mysqld.inc
+--source include/show_master_status.inc
+
+--echo #
+--echo # Crash happens before rename the file
+--echo #
+SET GLOBAL binlog_free_flush_threshold = 10 * 1024 * 1024;
+
+SET debug = 'd,binlog_free_flush_crash_before_rename';
+--source include/expect_crash.inc
+--error 2013
+UPDATE t1 SET c1 = repeat('4', 5242880);
+
+--write_file $datadir/#binlog_cache_files/non_binlog_cache
+It is not a binlog cache file
+EOF
+
+--echo # One cache file left afte crash
+--echo # list #binlog_cache_files/
+--replace_regex /ML_[0-9]+/ML_BINLOG_CACHE_FILE/
+--list_files $datadir/#binlog_cache_files
+
+--source include/start_mysqld.inc
+--echo # The cache file is deleted at startup.
+--echo # list #binlog_cache_files/
+--list_files $datadir/#binlog_cache_files
+
+--let $assert_text= warning: non_binlog_cache file is in #binlog_cache_files/
+--let $assert_file= $MYSQLTEST_VARDIR/log/mysqld.1.err
+--let $assert_select= non_binlog_cache.*#binlog_cache_files/
+--let $assert_count= 1
+--let $assert_only_after= CURRENT_TEST: binlog.binlog_free_flush_atomic
+--source include/assert_grep.inc
+
+--remove_file $datadir/#binlog_cache_files/non_binlog_cache
+
+--let $binlog_file= LAST
+--let $binlog_start= 4
+--let $skip_checkpoint_events= 1
+--source include/show_binlog_events.inc
+
+call mtr.add_suppression(".*Turning logging off for the whole duration.*");
+call mtr.add_suppression(".*non_binlog_cache is in #binlog_cache_files/.*");
+DROP TABLE t1;
+
diff --git a/mysql-test/suite/encryption/r/binlog_cache_encrypt.result b/mysql-test/suite/encryption/r/binlog_cache_encrypt.result
new file mode 100644
index 00000000000..a479a39cbcf
--- /dev/null
+++ b/mysql-test/suite/encryption/r/binlog_cache_encrypt.result
@@ -0,0 +1,18 @@
+RESET MASTER;
+CREATE TABLE t1 (c1 LONGTEXT) ENGINE = InnoDB;
+INSERT INTO t1 values(repeat("1", 5242880));
+INSERT INTO t1 values(repeat("1", 5242880));
+FLUSH BINARY LOGS;
+SET @saved_threshold= @@GLOBAL.binlog_free_flush_threshold;
+SET GLOBAL binlog_free_flush_threshold = 10 * 1024 * 1024;
+UPDATE t1 SET c1 = repeat('2', 5242880);
+include/show_binlog_events.inc
+Log_name Pos Event_type Server_id End_log_pos Info
+master-bin.000002 # Gtid # # BEGIN GTID #-#-#
+master-bin.000002 # Annotate_rows # # UPDATE t1 SET c1 = repeat('2', 5242880)
+master-bin.000002 # Table_map # # table_id: # (test.t1)
+master-bin.000002 # Update_rows_v1 # # table_id: #
+master-bin.000002 # Update_rows_v1 # # table_id: # flags: STMT_END_F
+master-bin.000002 # Xid # # COMMIT /* XID */
+SET GLOBAL binlog_free_flush_threshold = @saved_threshold;
+DROP TABLE t1;
diff --git a/mysql-test/suite/encryption/t/binlog_cache_encrypt-master.opt b/mysql-test/suite/encryption/t/binlog_cache_encrypt-master.opt
new file mode 100644
index 00000000000..469148de64a
--- /dev/null
+++ b/mysql-test/suite/encryption/t/binlog_cache_encrypt-master.opt
@@ -0,0 +1 @@
+--encrypt-tmp-files=on
diff --git a/mysql-test/suite/encryption/t/binlog_cache_encrypt.test b/mysql-test/suite/encryption/t/binlog_cache_encrypt.test
new file mode 100644
index 00000000000..93725ce653e
--- /dev/null
+++ b/mysql-test/suite/encryption/t/binlog_cache_encrypt.test
@@ -0,0 +1,19 @@
+--source include/have_file_key_management_plugin.inc
+--source include/have_binlog_format_row.inc
+--source include/have_innodb.inc
+RESET MASTER;
+CREATE TABLE t1 (c1 LONGTEXT) ENGINE = InnoDB;
+INSERT INTO t1 values(repeat("1", 5242880));
+INSERT INTO t1 values(repeat("1", 5242880));
+FLUSH BINARY LOGS;
+
+SET @saved_threshold= @@GLOBAL.binlog_free_flush_threshold;
+SET GLOBAL binlog_free_flush_threshold = 10 * 1024 * 1024;
+UPDATE t1 SET c1 = repeat('2', 5242880);
+
+--let $binlog_file= LAST
+--let $skip_checkpoint_events=1
+--source include/show_binlog_events.inc
+
+SET GLOBAL binlog_free_flush_threshold = @saved_threshold;
+DROP TABLE t1;
diff --git a/mysql-test/suite/rpl/r/rpl_binlog_free_flush.result b/mysql-test/suite/rpl/r/rpl_binlog_free_flush.result
new file mode 100644
index 00000000000..a216e252dd8
--- /dev/null
+++ b/mysql-test/suite/rpl/r/rpl_binlog_free_flush.result
@@ -0,0 +1,117 @@
+include/master-slave.inc
+[connection master]
+# Prepare
+SET @saved_threshold= @@GLOBAL.binlog_free_flush_threshold;
+SET @saved_checksum= @@GLOBAL.binlog_checksum;
+SET GLOBAL binlog_checksum = "NONE";
+CREATE TABLE t1 (c1 LONGTEXT) ENGINE = InnoDB;
+CREATE TABLE t2 (c1 LONGTEXT) ENGINE = MyISAM;
+INSERT INTO t1 values(repeat("1", 5242880));
+INSERT INTO t1 values(repeat("1", 5242880));
+INSERT INTO t2 values(repeat("1", 5242880));
+INSERT INTO t2 values(repeat("1", 5242880));
+FLUSH BINARY LOGS;
+# Not renamed to binlog, since the binlog cache is not larger than the
+# threshold. And it should works well after ROLLBACK TO SAVEPOINT
+BEGIN;
+SAVEPOINT s1;
+UPDATE t1 SET c1 = repeat('1', 5242880);
+ROLLBACK TO SAVEPOINT s1;
+UPDATE t1 SET c1 = repeat('2', 5242880);
+SAVEPOINT s2;
+UPDATE t1 SET c1 = repeat('3', 5242880);
+UPDATE t1 SET c1 = repeat('4', 5242880);
+ROLLBACK TO SAVEPOINT s2;
+COMMIT;
+include/assert.inc [Binlog is not rotated]
+#
+# Test binlog cache rename to binlog file with checksum off
+#
+SET GLOBAL binlog_free_flush_threshold = 10 * 1024 * 1024;
+# Transaction cache can be renamed and works well with ROLLBACK TO SAVEPOINT
+BEGIN;
+SAVEPOINT s1;
+UPDATE t1 SET c1 = repeat('2', 5242880);
+ROLLBACK TO s1;
+UPDATE t1 SET c1 = repeat('3', 5242880);
+SAVEPOINT s2;
+UPDATE t1 SET c1 = repeat('4', 5242880);
+UPDATE t1 SET c1 = repeat('5', 5242880);
+UPDATE t1 SET c1 = repeat('6', 5242880);
+ROLLBACK TO SAVEPOINT s2;
+COMMIT;
+INSERT INTO t1 VALUES("after_update");
+include/assert.inc [Free flush is executed.]
+# statement cache can be renamed
+BEGIN;
+UPDATE t2 SET c1 = repeat('4', 5242880);
+INSERT INTO t1 VALUES("after_update");
+COMMIT;
+include/assert.inc [Free flush is executed.]
+# CREATE SELECT works well
+CREATE TABLE t3 SELECT * FROM t1;
+include/assert.inc [Free flush is executed.]
+CREATE TABLE t4 SELECT * FROM t2;
+include/assert.inc [Free flush is executed.]
+# XA statement works well
+XA START "test-a-long-xid========================================";
+UPDATE t1 SET c1 = repeat('1', 5242880);
+XA END "test-a-long-xid========================================";
+XA PREPARE "test-a-long-xid========================================";
+XA COMMIT "test-a-long-xid========================================";
+include/assert.inc [Free flush is executed.]
+XA START "test-xid";
+UPDATE t1 SET c1 = repeat('2', 5242880);
+XA END "test-xid";
+XA COMMIT "test-xid" ONE PHASE;
+include/assert.inc [Free flush is executed.]
+#
+# It works well in the situation that binlog header is larger than
+# IO_SIZE and binlog file's buffer.
+#
+FLUSH BINARY LOGS;
+SET SESSION server_id = 1;
+UPDATE t1 SET c1 = repeat('3', 5242880);
+include/assert.inc [Free flush is executed.]
+#
+# RESET MASTER should work well. It also verifies binlog checksum mechanism.
+#
+include/rpl_reset.inc
+#
+# Test binlog cache rename to binlog file with checksum on
+#
+SET GLOBAL binlog_checksum = "CRC32";
+# It will not rename the cache to file, since the cache's checksum was
+# initialized when reset the cache at the end of previous transaction.
+UPDATE t1 SET c1 = repeat('5', 5242880);
+include/assert.inc [Binlog is not rotated]
+#
+# Not rename to binlog file If the cache's checksum is not same
+# to binlog_checksum
+#
+BEGIN;
+UPDATE t1 SET c1 = repeat('6', 5242880);
+SET GLOBAL binlog_checksum = "NONE";
+COMMIT;
+include/assert.inc [Binlog is not rotated]
+BEGIN;
+UPDATE t1 SET c1 = repeat('7', 5242880);
+SET GLOBAL binlog_checksum = "CRC32";
+COMMIT;
+include/assert.inc [Binlog is not rotated]
+#
+# Not rename to binlog file If both stmt and trx cache are not empty
+#
+UPDATE t1, t2 SET t1.c1 = repeat('8', 5242880), t2.c1 = repeat('7', 5242880);
+include/assert.inc [Binlog is not rotated]
+#
+# Not rename to binlog file If binlog_legacy_event_pos is on
+#
+SET GLOBAL binlog_legacy_event_pos = ON;
+UPDATE t1 SET c1 = repeat('9', 5242880);
+SET GLOBAL binlog_legacy_event_pos = OFF;
+include/assert.inc [Binlog is not rotated]
+DROP TABLE t1, t2, t3, t4;
+SET GLOBAL binlog_free_flush_threshold = @saved_threshold;
+SET GLOBAL binlog_checksum = @saved_checksum;
+include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/t/rpl_binlog_free_flush.test b/mysql-test/suite/rpl/t/rpl_binlog_free_flush.test
new file mode 100644
index 00000000000..5152779dd48
--- /dev/null
+++ b/mysql-test/suite/rpl/t/rpl_binlog_free_flush.test
@@ -0,0 +1,217 @@
+################################################################################
+# MDEV-32014 Rename binlog cache to binlog file
+#
+# It verifies that the binlog caches which are larger
+# than binlog_free_flush_threshold can be moved to a binlog file
+# successfully. With a successful rename,
+# - it rotates the binlog and the cache is renamed to the new binlog file
+# - an ignorable event is generated just after the Gtid_log_event of the
+# transaction to take the reserved spaces which is unused.
+#
+# It also verifies that rename is not supported in below cases
+# though the cache is larger than the threshold
+# - both statement and transaction cache should be flushed.
+# - the cache's checksum option is not same to binlog_checksum
+# - binlog_legacy_event_pos is enabled.
+################################################################################
+--source include/master-slave.inc
+--source include/have_binlog_format_row.inc
+--source include/have_innodb.inc
+
+--echo # Prepare
+SET @saved_threshold= @@GLOBAL.binlog_free_flush_threshold;
+SET @saved_checksum= @@GLOBAL.binlog_checksum;
+
+SET GLOBAL binlog_checksum = "NONE";
+
+CREATE TABLE t1 (c1 LONGTEXT) ENGINE = InnoDB;
+CREATE TABLE t2 (c1 LONGTEXT) ENGINE = MyISAM;
+
+INSERT INTO t1 values(repeat("1", 5242880));
+INSERT INTO t1 values(repeat("1", 5242880));
+INSERT INTO t2 values(repeat("1", 5242880));
+INSERT INTO t2 values(repeat("1", 5242880));
+
+FLUSH BINARY LOGS;
+
+--echo # Not renamed to binlog, since the binlog cache is not larger than the
+--echo # threshold. And it should works well after ROLLBACK TO SAVEPOINT
+BEGIN;
+SAVEPOINT s1;
+UPDATE t1 SET c1 = repeat('1', 5242880);
+ROLLBACK TO SAVEPOINT s1;
+UPDATE t1 SET c1 = repeat('2', 5242880);
+SAVEPOINT s2;
+UPDATE t1 SET c1 = repeat('3', 5242880);
+UPDATE t1 SET c1 = repeat('4', 5242880);
+ROLLBACK TO SAVEPOINT s2;
+COMMIT;
+
+--let $binlog_file= query_get_value(SHOW MASTER STATUS, File, 1)
+--let $assert_cond= "$binlog_file" = "master-bin.000003"
+--let $assert_text= Binlog is not rotated
+--source include/assert.inc
+
+--echo #
+--echo # Test binlog cache rename to binlog file with checksum off
+--echo #
+SET GLOBAL binlog_free_flush_threshold = 10 * 1024 * 1024;
+
+--echo # Transaction cache can be renamed and works well with ROLLBACK TO SAVEPOINT
+BEGIN;
+SAVEPOINT s1;
+UPDATE t1 SET c1 = repeat('2', 5242880);
+ROLLBACK TO s1;
+UPDATE t1 SET c1 = repeat('3', 5242880);
+SAVEPOINT s2;
+UPDATE t1 SET c1 = repeat('4', 5242880);
+UPDATE t1 SET c1 = repeat('5', 5242880);
+UPDATE t1 SET c1 = repeat('6', 5242880);
+ROLLBACK TO SAVEPOINT s2;
+COMMIT;
+INSERT INTO t1 VALUES("after_update");
+
+--let $gtid_end_pos= query_get_value(SHOW BINLOG EVENTS IN 'master-bin.000004' LIMIT 4, End_log_pos, 4)
+--let $assert_cond= $gtid_end_pos = 4096
+--let $assert_text= Free flush is executed.
+--source include/assert.inc
+
+--echo # statement cache can be renamed
+BEGIN;
+UPDATE t2 SET c1 = repeat('4', 5242880);
+INSERT INTO t1 VALUES("after_update");
+COMMIT;
+--let $gtid_end_pos= query_get_value(SHOW BINLOG EVENTS IN 'master-bin.000005' LIMIT 4, End_log_pos, 4)
+--let $assert_cond= $gtid_end_pos = 4096
+--let $assert_text= Free flush is executed.
+--source include/assert.inc
+
+--echo # CREATE SELECT works well
+CREATE TABLE t3 SELECT * FROM t1;
+--let $gtid_end_pos= query_get_value(SHOW BINLOG EVENTS IN 'master-bin.000006' LIMIT 4, End_log_pos, 4)
+--let $assert_cond= $gtid_end_pos = 4096
+--let $assert_text= Free flush is executed.
+--source include/assert.inc
+
+CREATE TABLE t4 SELECT * FROM t2;
+--let $gtid_end_pos= query_get_value(SHOW BINLOG EVENTS IN 'master-bin.000007' LIMIT 4, End_log_pos, 4)
+--let $assert_cond= $gtid_end_pos = 4096
+--let $assert_text= Free flush is executed.
+--source include/assert.inc
+
+--echo # XA statement works well
+XA START "test-a-long-xid========================================";
+UPDATE t1 SET c1 = repeat('1', 5242880);
+XA END "test-a-long-xid========================================";
+XA PREPARE "test-a-long-xid========================================";
+XA COMMIT "test-a-long-xid========================================";
+--let $gtid_end_pos= query_get_value(SHOW BINLOG EVENTS IN 'master-bin.000008' LIMIT 4, End_log_pos, 4)
+--let $assert_cond= $gtid_end_pos = 4096
+--let $assert_text= Free flush is executed.
+--source include/assert.inc
+
+XA START "test-xid";
+UPDATE t1 SET c1 = repeat('2', 5242880);
+XA END "test-xid";
+XA COMMIT "test-xid" ONE PHASE;
+--let $gtid_end_pos= query_get_value(SHOW BINLOG EVENTS IN 'master-bin.000009' LIMIT 4, End_log_pos, 4)
+--let $assert_cond= $gtid_end_pos = 4096
+--let $assert_text= Free flush is executed.
+--source include/assert.inc
+
+--echo #
+--echo # It works well in the situation that binlog header is larger than
+--echo # IO_SIZE and binlog file's buffer.
+--echo #
+--disable_query_log
+
+# make Gtid_list_event larger than 64K(binlog file's buffer)
+--let $server_id= 100000
+while ($server_id < 104096)
+{
+ eval SET SESSION server_id = $server_id;
+ eval UPDATE t1 SET c1 = "$server_id" LIMIT 1;
+ --inc $server_id
+}
+
+--enable_query_log
+
+# After flush, reserved space should be updated.
+FLUSH BINARY LOGS;
+
+SET SESSION server_id = 1;
+UPDATE t1 SET c1 = repeat('3', 5242880);
+
+--let $gtid_end_pos= query_get_value(SHOW BINLOG EVENTS IN 'master-bin.000011' LIMIT 4, End_log_pos, 4)
+# 69632 is 68K, which is larger than the binlog file's 64K buffer
+--let $assert_cond= $gtid_end_pos = 69632
+--let $assert_text= Free flush is executed.
+--source include/assert.inc
+
+--echo #
+--echo # RESET MASTER should work well. It also verifies binlog checksum mechanism.
+--echo #
+--source include/rpl_reset.inc
+
+--echo #
+--echo # Test binlog cache rename to binlog file with checksum on
+--echo #
+SET GLOBAL binlog_checksum = "CRC32";
+
+--echo # It will not rename the cache to file, since the cache's checksum was
+--echo # initialized when reset the cache at the end of previous transaction.
+UPDATE t1 SET c1 = repeat('5', 5242880);
+--let $binlog_file= query_get_value(SHOW MASTER STATUS, File, 1)
+--let $assert_cond= "$binlog_file" = "master-bin.000002"
+--let $assert_text= Binlog is not rotated
+--source include/assert.inc
+
+--echo #
+--echo # Not rename to binlog file If the cache's checksum is not same
+--echo # to binlog_checksum
+--echo #
+BEGIN;
+UPDATE t1 SET c1 = repeat('6', 5242880);
+SET GLOBAL binlog_checksum = "NONE";
+COMMIT;
+--let $binlog_file= query_get_value(SHOW MASTER STATUS, File, 1)
+--let $assert_cond= "$binlog_file" = "master-bin.000003"
+--let $assert_text= Binlog is not rotated
+--source include/assert.inc
+
+BEGIN;
+UPDATE t1 SET c1 = repeat('7', 5242880);
+SET GLOBAL binlog_checksum = "CRC32";
+COMMIT;
+--let $binlog_file= query_get_value(SHOW MASTER STATUS, File, 1)
+--let $assert_cond= "$binlog_file" = "master-bin.000004"
+--let $assert_text= Binlog is not rotated
+--source include/assert.inc
+
+--echo #
+--echo # Not rename to binlog file If both stmt and trx cache are not empty
+--echo #
+UPDATE t1, t2 SET t1.c1 = repeat('8', 5242880), t2.c1 = repeat('7', 5242880);
+--let $binlog_file= query_get_value(SHOW MASTER STATUS, File, 1)
+--let $assert_cond= "$binlog_file" = "master-bin.000004"
+--let $assert_text= Binlog is not rotated
+--source include/assert.inc
+
+--echo #
+--echo # Not rename to binlog file If binlog_legacy_event_pos is on
+--echo #
+SET GLOBAL binlog_legacy_event_pos = ON;
+UPDATE t1 SET c1 = repeat('9', 5242880);
+SET GLOBAL binlog_legacy_event_pos = OFF;
+--let $binlog_file= query_get_value(SHOW MASTER STATUS, File, 1)
+--let $assert_cond= "$binlog_file" = "master-bin.000004"
+--let $assert_text= Binlog is not rotated
+--source include/assert.inc
+
+# cleanup
+DROP TABLE t1, t2, t3, t4;
+SET GLOBAL binlog_free_flush_threshold = @saved_threshold;
+SET GLOBAL binlog_checksum = @saved_checksum;
+--let $binlog_file=
+--let $skip_checkpoint_events=0
+--source include/rpl_end.inc
diff --git a/mysql-test/suite/sys_vars/r/sysvars_server_embedded.result b/mysql-test/suite/sys_vars/r/sysvars_server_embedded.result
index 7a113a02b02..8771e4ac87f 100644
--- a/mysql-test/suite/sys_vars/r/sysvars_server_embedded.result
+++ b/mysql-test/suite/sys_vars/r/sysvars_server_embedded.result
@@ -432,6 +432,16 @@ NUMERIC_BLOCK_SIZE NULL
ENUM_VALUE_LIST MIXED,STATEMENT,ROW
READ_ONLY NO
COMMAND_LINE_ARGUMENT REQUIRED
+VARIABLE_NAME BINLOG_FREE_FLUSH_THRESHOLD
+VARIABLE_SCOPE GLOBAL
+VARIABLE_TYPE BIGINT UNSIGNED
+VARIABLE_COMMENT Try to rename the binlog cache temporary file of the commiting transaction to a binlog file when its binlog cache size is bigger than the value of this variable
+NUMERIC_MIN_VALUE 10485760
+NUMERIC_MAX_VALUE 18446744073709551615
+NUMERIC_BLOCK_SIZE 1
+ENUM_VALUE_LIST NULL
+READ_ONLY NO
+COMMAND_LINE_ARGUMENT REQUIRED
VARIABLE_NAME BINLOG_GTID_INDEX
VARIABLE_SCOPE GLOBAL
VARIABLE_TYPE BOOLEAN
diff --git a/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result b/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result
index 16d77d397d4..ceb141cfc7a 100644
--- a/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result
+++ b/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result
@@ -452,6 +452,16 @@ NUMERIC_BLOCK_SIZE NULL
ENUM_VALUE_LIST MIXED,STATEMENT,ROW
READ_ONLY NO
COMMAND_LINE_ARGUMENT REQUIRED
+VARIABLE_NAME BINLOG_FREE_FLUSH_THRESHOLD
+VARIABLE_SCOPE GLOBAL
+VARIABLE_TYPE BIGINT UNSIGNED
+VARIABLE_COMMENT Try to rename the binlog cache temporary file of the commiting transaction to a binlog file when its binlog cache size is bigger than the value of this variable
+NUMERIC_MIN_VALUE 10485760
+NUMERIC_MAX_VALUE 18446744073709551615
+NUMERIC_BLOCK_SIZE 1
+ENUM_VALUE_LIST NULL
+READ_ONLY NO
+COMMAND_LINE_ARGUMENT REQUIRED
VARIABLE_NAME BINLOG_GTID_INDEX
VARIABLE_SCOPE GLOBAL
VARIABLE_TYPE BOOLEAN
diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt
index 0195555efaf..050cd54d98c 100644
--- a/sql/CMakeLists.txt
+++ b/sql/CMakeLists.txt
@@ -107,7 +107,7 @@ SET (SQL_SOURCE
hostname.cc init.cc item.cc item_buff.cc item_cmpfunc.cc
item_create.cc item_func.cc item_geofunc.cc item_row.cc
item_strfunc.cc item_subselect.cc item_sum.cc item_timefunc.cc
- key.cc log.cc lock.cc
+ key.cc log.cc log_cache.cc lock.cc
log_event.cc log_event_server.cc
rpl_record.cc rpl_reporting.cc
mf_iocache.cc my_decimal.cc
diff --git a/sql/log.cc b/sql/log.cc
index 34f9ad745fc..3dc57b21c05 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -163,6 +163,111 @@ static SHOW_VAR binlog_status_vars_detail[]=
{NullS, NullS, SHOW_LONG}
};
+/**
+ This class implements the feature to rename a binlog cache temporary file to
+ a binlog file. It is used to avoid holding LOCK_log for a long time when
+ writing a huge binlog cache to a binlog file.
+
+ With this feature, temporary files of binlog caches will be created in
+ BINLOG_CACHE_DIR which is created in the same directory to binlog files
+ at server startup.
+*/
+class Binlog_free_flush
+{
+public:
+ Binlog_free_flush() {}
+
+ /**
+ Check whether free flush should be executed on the cache_data.
+
+ @param group_commit_entry object of current transaction
+
+ @retval true it should do free flush
+ @retval false it should do normal commit.
+ */
+ bool should_free_flush(const MYSQL_BIN_LOG::group_commit_entry *entry) const;
+
+ /**
+ This function is the entry point for doing a free flush. It first
+ rotates the binlog, then renames the temporary file of the
+ binlog cache to the new binlog file; after that it commits the transaction.
+
+ @param entry group_commit_entry object of the current transaction.
+
+ @retval true free flush succeeds.
+ @retval false free flush fails, it should go to normal commit process.
+ */
+ bool commit(MYSQL_BIN_LOG::group_commit_entry *entry);
+
+ /**
+ After rotate has created the new binlog file, it copies the content
+ of the new binlog file to the binlog cache, delete the new binlog file
+ and then rename the binlog cache to the new binlog file.
+
+ @retval false Succeeds to replace the binlog file.
+ @retval true Failed to replace the binlog file. It only returns
+ true if some error happens after the new binlog file
+ is deleted; in that situation the rotate process will fail.
+ */
+ bool replace_binlog_file();
+ /**
+ The space left is more than a gtid event required, thus the extra
+ space is padded into the gtid event as 0. This function is used
+ to calculate the real gtid size with pad.
+ */
+ size_t get_gtid_event_pad_size();
+
+ /**
+ The space required for session binlog caches to reserve. It is calculated
+ from the length of current binlog file when it is generated and aligned
+ to IO_SIZE;
+
+ @param header_len header length of current binlog file.
+ */
+ void set_reserved_bytes(uint32 header_len)
+ {
+ // Gtid event length
+ header_len+= LOG_EVENT_HEADER_LEN + Gtid_log_event::max_data_length +
+ BINLOG_CHECKSUM_LEN;
+ header_len= header_len - (header_len % IO_SIZE) + IO_SIZE;
+ if (header_len != m_reserved_bytes)
+ m_reserved_bytes= header_len;
+ }
+
+ /**
+ Return reserved space required for binlog cache. It is NOT defined as
+ an atomic variable, while it is get and set in parallel. Synchronizing
+ between set and get is not really necessary.
+ */
+ uint32 get_reserved_size()
+ {
+ return m_reserved_bytes;
+ }
+private:
+ Binlog_free_flush &operator=(const Binlog_free_flush &);
+ Binlog_free_flush(const Binlog_free_flush &);
+
+ char m_cache_dir[FN_REFLEN];
+
+ /** The commit entry of current transaction which is doing free flush. */
+ MYSQL_BIN_LOG::group_commit_entry *m_entry{nullptr};
+
+ /** The cache_data which will be renamed to binlog. */
+ binlog_cache_data *m_cache_data{nullptr};
+
+ /** It will be set to true if rename operation succeeds */
+ bool m_replaced{false};
+
+ uint32 m_reserved_bytes {IO_SIZE};
+};
+static Binlog_free_flush binlog_free_flush;
+ulonglong opt_binlog_free_flush_threshold= 10 * 1024 * 1024;
+
+uint32 binlog_cache_reserved_size()
+{
+ return binlog_free_flush.get_reserved_size();
+}
+
/*
Variables for the binlog background thread.
Protected by the MYSQL_BIN_LOG::LOCK_binlog_background_thread mutex.
@@ -3761,7 +3866,8 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
enum cache_type io_cache_type_arg,
ulong max_size_arg,
bool null_created_arg,
- bool need_mutex)
+ bool need_mutex,
+ bool is_free_flush)
{
xid_count_per_binlog *new_xid_list_entry= NULL, *b;
DBUG_ENTER("MYSQL_BIN_LOG::open");
@@ -4027,14 +4133,20 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
goto err;
bytes_written+= description_event_for_queue->data_written;
}
+
+ // offset must be saved before replace_binlog_file(), it will update the pos
+ my_off_t offset= my_b_tell(&log_file);
+
+ if (is_free_flush && binlog_free_flush.replace_binlog_file())
+ goto err;
+
if (flush_io_cache(&log_file) ||
mysql_file_sync(log_file.file, MYF(MY_WME)))
goto err;
- my_off_t offset= my_b_tell(&log_file);
-
if (!is_relay_log)
{
+ binlog_free_flush.set_reserved_bytes((uint32)offset);
/* update binlog_end_pos so that it can be read by after sync hook */
reset_binlog_end_pos(log_file_name, offset);
@@ -4126,8 +4238,7 @@ bool MYSQL_BIN_LOG::open(const char *log_name,
/* Notify the io thread that binlog is rotated to a new file */
if (is_relay_log)
signal_relay_log_update();
- else
- update_binlog_end_pos();
+
DBUG_RETURN(0);
err:
@@ -5717,9 +5828,9 @@ int MYSQL_BIN_LOG::new_file()
@retval
nonzero - error
*/
-int MYSQL_BIN_LOG::new_file_without_locking()
+int MYSQL_BIN_LOG::new_file_without_locking(bool is_free_flush)
{
- return new_file_impl();
+ return new_file_impl(is_free_flush);
}
@@ -5734,7 +5845,7 @@ int MYSQL_BIN_LOG::new_file_without_locking()
binlog_space_total will be updated if binlog_space_limit is set
*/
-int MYSQL_BIN_LOG::new_file_impl()
+int MYSQL_BIN_LOG::new_file_impl(bool is_free_flush)
{
int error= 0, close_on_error= FALSE;
char new_name[FN_REFLEN], *new_name_ptr, *old_name, *file_to_open;
@@ -5856,7 +5967,8 @@ int MYSQL_BIN_LOG::new_file_impl()
{
/* reopen the binary log file. */
file_to_open= new_name_ptr;
- error= open(old_name, new_name_ptr, 0, io_cache_type, max_size, 1, FALSE);
+ error= open(old_name, new_name_ptr, 0, io_cache_type, max_size, 1, FALSE,
+ is_free_flush);
}
/* handle reopening errors */
@@ -6207,11 +6319,11 @@ static binlog_cache_mngr *binlog_setup_cache_mngr(THD *thd)
sizeof(binlog_cache_mngr),
MYF(MY_ZEROFILL));
if (!cache_mngr ||
- open_cached_file(&cache_mngr->stmt_cache.cache_log, mysql_tmpdir,
- LOG_PREFIX, (size_t)binlog_stmt_cache_size,
+ open_cached_file(&cache_mngr->stmt_cache.cache_log, binlog_cache_dir,
+ LOG_PREFIX, (size_t) binlog_stmt_cache_size,
MYF(MY_WME | MY_TRACK_WITH_LIMIT)) ||
- open_cached_file(&cache_mngr->trx_cache.cache_log, mysql_tmpdir,
- LOG_PREFIX, (size_t)binlog_cache_size,
+ open_cached_file(&cache_mngr->trx_cache.cache_log, binlog_cache_dir,
+ LOG_PREFIX, (size_t) binlog_cache_size,
MYF(MY_WME | MY_TRACK_WITH_LIMIT)))
{
my_free(cache_mngr);
@@ -6866,7 +6978,8 @@ Event_log::prepare_pending_rows_event(THD *thd, TABLE* table,
bool
MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
bool is_transactional, uint64 commit_id,
- bool has_xid, bool is_ro_1pc)
+ bool has_xid, bool is_ro_1pc,
+ bool is_free_flush)
{
rpl_gtid gtid;
uint32 domain_id;
@@ -6934,6 +7047,9 @@ MYSQL_BIN_LOG::write_gtid_event(THD *thd, bool standalone,
}
#endif
+ if (unlikely(is_free_flush))
+ gtid_event.pad_to_size= binlog_free_flush.get_gtid_event_pad_size();
+
if (write_event(>id_event))
DBUG_RETURN(true);
status_var_add(thd->status_var.binlog_bytes_written, gtid_event.data_written);
@@ -7623,7 +7739,8 @@ MYSQL_BIN_LOG::do_checkpoint_request(ulong binlog_id)
@retval
nonzero - error in rotating routine.
*/
-int MYSQL_BIN_LOG::rotate(bool force_rotate, bool* check_purge)
+int MYSQL_BIN_LOG::rotate(bool force_rotate, bool *check_purge,
+ bool is_free_flush)
{
int error= 0;
ulonglong binlog_pos;
@@ -7664,7 +7781,7 @@ int MYSQL_BIN_LOG::rotate(bool force_rotate, bool* check_purge)
*/
mark_xids_active(binlog_id, 1);
- if (unlikely((error= new_file_without_locking())))
+ if (unlikely((error= new_file_without_locking(is_free_flush))))
{
/**
Be conservative... There are possible lost events (eg,
@@ -7965,12 +8082,14 @@ int Event_log::write_cache_raw(THD *thd, IO_CACHE *cache)
int Event_log::write_cache(THD *thd, binlog_cache_data *cache_data)
{
- int res;
IO_CACHE *cache= &cache_data->cache_log;
DBUG_ENTER("Event_log::write_cache");
mysql_mutex_assert_owner(&LOCK_log);
+ if (cache_data->init_for_read())
+ DBUG_RETURN(ER_ERROR_ON_WRITE);
+
/*
If possible, just copy the cache over byte-by-byte with pre-computed
checksums.
@@ -7979,14 +8098,12 @@ int Event_log::write_cache(THD *thd, binlog_cache_data *cache_data)
likely(!crypto.scheme) &&
likely(!opt_binlog_legacy_event_pos))
{
- int res= my_b_copy_all_to_cache(cache, &log_file);
+ int res=
+ my_b_copy_to_cache(cache, &log_file, cache_data->length_for_read());
status_var_add(thd->status_var.binlog_bytes_written, my_b_tell(cache));
DBUG_RETURN(res ? ER_ERROR_ON_WRITE : 0);
}
- if ((res= reinit_io_cache(cache, READ_CACHE, 0, 0, 0)))
- DBUG_RETURN(ER_ERROR_ON_WRITE);
-
/* Amount of remaining bytes in the IO_CACHE read buffer. */
size_t log_file_pos;
uchar header_buf[LOG_EVENT_HEADER_LEN];
@@ -8301,6 +8418,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog(THD *thd,
DBUG_RETURN(0);
}
+ entry.next= nullptr;
entry.thd= thd;
entry.cache_mngr= cache_mngr;
entry.error= 0;
@@ -8703,7 +8821,16 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
bool
MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
{
- int is_leader= queue_for_group_commit(entry);
+ int is_leader;
+
+ if (binlog_free_flush.should_free_flush(entry) &&
+ binlog_free_flush.commit(entry))
+ {
+ is_leader= 1;
+ goto commit;
+ }
+
+ is_leader= queue_for_group_commit(entry);
#ifdef WITH_WSREP
/* commit order was released in queue_for_group_commit() call,
here we check if wsrep_commit_ordered() failed or if we are leader */
@@ -8754,6 +8881,7 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
*/
}
+commit:
if (!opt_optimize_thread_scheduling)
{
/* For the leader, trx_group_commit_leader() already took the lock. */
@@ -8852,7 +8980,8 @@ MYSQL_BIN_LOG::write_transaction_to_binlog_events(group_commit_entry *entry)
*/
void
-MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
+MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader,
+ bool is_free_flush)
{
uint xid_count= 0;
my_off_t UNINIT_VAR(commit_offset);
@@ -8863,6 +8992,16 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
uint64 commit_id;
DBUG_ENTER("MYSQL_BIN_LOG::trx_group_commit_leader");
+ /*
+ When moving a binlog cache to a binlog file, the transaction itself
+ forms a group of its own.
+ */
+ if (unlikely(is_free_flush))
+ {
+ last_in_queue= leader;
+ queue= leader;
+ }
+ else
{
#ifdef ENABLED_DEBUG_SYNC
DBUG_EXECUTE_IF("inject_binlog_commit_before_get_LOCK_log",
@@ -8888,7 +9027,6 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
current= group_commit_queue;
group_commit_queue= NULL;
mysql_mutex_unlock(&LOCK_prepare_ordered);
- binlog_id= current_binlog_id;
/* As the queue is in reverse order of entering, reverse it. */
last_in_queue= current;
@@ -8908,8 +9046,9 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
/* Now we have in queue the list of transactions to be committed in order. */
}
-
- DBUG_ASSERT(is_open());
+
+ binlog_id= current_binlog_id;
+
if (likely(is_open())) // Should always be true
{
commit_id= (last_in_queue == leader ? 0 : (uint64)leader->thd->query_id);
@@ -8944,10 +9083,11 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
*/
DBUG_ASSERT(!cache_mngr->stmt_cache.empty() ||
!cache_mngr->trx_cache.empty() ||
- current->thd->transaction->xid_state.is_explicit_XA());
+ current->thd->transaction->xid_state.is_explicit_XA() ||
+ is_free_flush);
- if (unlikely((current->error= write_transaction_or_stmt(current,
- commit_id))))
+ if (unlikely((current->error= write_transaction_or_stmt(
+ current, commit_id, is_free_flush))))
current->commit_errno= errno;
strmake_buf(cache_mngr->last_commit_pos_file, log_file_name);
@@ -9203,7 +9343,7 @@ MYSQL_BIN_LOG::trx_group_commit_leader(group_commit_entry *leader)
int
MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry,
- uint64 commit_id)
+ uint64 commit_id, bool is_free_flush)
{
binlog_cache_mngr *mngr= entry->cache_mngr;
bool has_xid= entry->end_event->get_type_code() == XID_EVENT;
@@ -9224,10 +9364,17 @@ MYSQL_BIN_LOG::write_transaction_or_stmt(group_commit_entry *entry,
DBUG_ASSERT(!(entry->using_stmt_cache && !mngr->stmt_cache.empty() &&
mngr->get_binlog_cache_log(FALSE)->error));
- if (write_gtid_event(entry->thd, is_prepared_xa(entry->thd),
- entry->using_trx_cache, commit_id,
- has_xid, entry->ro_1pc))
- DBUG_RETURN(ER_ERROR_ON_WRITE);
+  /*
+    The GTID will be written when renaming the binlog cache to a binlog
+    file if is_free_flush is true. Thus skip write_gtid_event here.
+  */
+ if (likely(!is_free_flush))
+ {
+ if (write_gtid_event(entry->thd, is_prepared_xa(entry->thd),
+ entry->using_trx_cache, commit_id, has_xid,
+ entry->ro_1pc))
+ DBUG_RETURN(ER_ERROR_ON_WRITE);
+ }
if (entry->using_stmt_cache && !mngr->stmt_cache.empty() &&
write_cache(entry->thd, mngr->get_binlog_cache_data(FALSE)))
@@ -12933,3 +13080,220 @@ void wsrep_register_binlog_handler(THD *thd, bool trx)
}
#endif /* WITH_WSREP */
+
+inline bool Binlog_free_flush::should_free_flush(
+ const MYSQL_BIN_LOG::group_commit_entry *entry) const
+{
+ binlog_cache_data *trx_cache= entry->cache_mngr->get_binlog_cache_data(true);
+ binlog_cache_data *stmt_cache=
+ entry->cache_mngr->get_binlog_cache_data(false);
+
+  /*
+    The binlog cache file is not encrypted in the same way as the binlog, so
+    it cannot be renamed to a binlog file. Renaming both the statement cache
+    and the transaction cache to binlog files at the same time is unsupported.
+  */
+ if (unlikely(encrypt_binlog ||
+ (entry->using_stmt_cache && entry->using_trx_cache &&
+ !stmt_cache->empty() && !trx_cache->empty())))
+ return false;
+
+ binlog_cache_data *cache_data;
+ if (unlikely(entry->using_stmt_cache && !stmt_cache->empty()))
+ cache_data= stmt_cache;
+ else
+ cache_data= trx_cache;
+
+ /* Do not free flush if total_bytes smaller than limit size. */
+ if (likely(cache_data->get_byte_position() <=
+ opt_binlog_free_flush_threshold))
+ return false;
+
+ /* Do not free flush if reserve space equal to zero. */
+ if (cache_data->file_reserved_bytes() == 0)
+ return false;
+
+  /*
+    Do not free flush if there were no tmp file writes; this happens when
+    the threshold is smaller than the binlog cache size.
+  */
+ if (unlikely(cache_data->cache_log.disk_writes == 0))
+ return false;
+
+ return true;
+}
+
+bool Binlog_free_flush::commit(MYSQL_BIN_LOG::group_commit_entry *entry)
+{
+ bool check_purge= false;
+ THD *thd= entry->thd;
+ binlog_cache_mngr *cache_mngr= entry->cache_mngr;
+ binlog_cache_data *cache_data= cache_mngr->get_binlog_cache_data(true);
+ if (unlikely(!entry->using_trx_cache || cache_data->empty()))
+ cache_data= cache_mngr->get_binlog_cache_data(false);
+
+  /* Sync the temp file before entering log_lock, to avoid holding it long */
+ if (cache_data->sync_temp_file())
+ return false;
+
+ thd->wait_for_prior_commit();
+
+ // It will be released by trx_group_commit_leader
+ mysql_mutex_lock(&mysql_bin_log.LOCK_log);
+
+ enum enum_binlog_checksum_alg expected_alg=
+ mysql_bin_log.checksum_alg_reset != BINLOG_CHECKSUM_ALG_UNDEF
+ ? mysql_bin_log.checksum_alg_reset
+ : (enum_binlog_checksum_alg) binlog_checksum_options;
+
+  /*
+    In legacy mode, all events should have a valid position; this is done by
+    updating the log_pos field when writing events from the binlog cache to
+    the binlog file. Thus renaming the binlog cache to a binlog file is not
+    supported in legacy mode.
+
+    If the cache's checksum alg is not the same as the binlog's checksum, the
+    checksum needs to be recalculated. Thus renaming the binlog cache to a
+    binlog file is not supported in that case either.
+  */
+ if (!mysql_bin_log.is_open() || opt_binlog_legacy_event_pos ||
+ (expected_alg != cache_data->checksum_opt))
+ {
+ mysql_mutex_unlock(&mysql_bin_log.LOCK_log);
+ return false;
+ }
+
+ m_entry= entry;
+ m_replaced= false;
+ m_cache_data= cache_data;
+ ulong prev_binlog_id= mysql_bin_log.current_binlog_id;
+
+ /*
+ It will call replace_binlog_file() to rename the transaction's binlog cache
+ to the new binlog file.
+ */
+ if (mysql_bin_log.rotate(true, &check_purge, true /* is_free_flush */))
+ {
+ DBUG_ASSERT(!m_replaced);
+ DBUG_ASSERT(!mysql_bin_log.is_open());
+ }
+
+ if (!m_replaced)
+ {
+ mysql_mutex_unlock(&mysql_bin_log.LOCK_log);
+ if (check_purge)
+ mysql_bin_log.checkpoint_and_purge(prev_binlog_id);
+ return false;
+ }
+
+ /* Seek binlog file to the end */
+ reinit_io_cache(&mysql_bin_log.log_file, WRITE_CACHE,
+ cache_data->temp_file_length(), false, true);
+ status_var_add(m_entry->thd->status_var.binlog_bytes_written,
+ cache_data->get_byte_position());
+ m_cache_data->detach_temp_file();
+
+ mysql_bin_log.trx_group_commit_leader(entry, true /* is_free_flush */);
+
+ if (check_purge)
+ mysql_bin_log.checkpoint_and_purge(prev_binlog_id);
+ return true;
+}
+
+bool Binlog_free_flush::replace_binlog_file()
+{
+ size_t binlog_size= my_b_tell(&mysql_bin_log.log_file);
+ size_t required_size= binlog_size;
+ // space for Gtid_log_event
+ required_size+= LOG_EVENT_HEADER_LEN + Gtid_log_event::max_data_length +
+ BINLOG_CHECKSUM_LEN;
+
+ DBUG_EXECUTE_IF("simulate_required_size_too_big", required_size= 10000;);
+ if (required_size > m_cache_data->file_reserved_bytes())
+ {
+ sql_print_information("Could not rename binlog cache to binlog, "
+ "require %llu bytes but only %llu bytes reserved.",
+ required_size, m_cache_data->file_reserved_bytes());
+ return false;
+ }
+
+ File new_log_fd= -1;
+ bool ret= false;
+
+ /* Create fd for the cache file as a new binlog file fd */
+ new_log_fd= mysql_file_open(key_file_binlog, m_cache_data->temp_file_name(),
+ O_BINARY | O_CLOEXEC | O_WRONLY, MYF(MY_WME));
+ if (new_log_fd == -1)
+ return false;
+
+ /* Copy the part which has been flushed to binlog file to binlog cache */
+ if (mysql_bin_log.log_file.pos_in_file > 0)
+ {
+ size_t copy_len= 0;
+ uchar buf[IO_SIZE];
+
+ int read_fd=
+ mysql_file_open(key_file_binlog, mysql_bin_log.get_log_fname(),
+ O_RDONLY | O_BINARY | O_SHARE, MYF(MY_WME));
+ if (read_fd == -1)
+ goto err;
+
+ while (copy_len < mysql_bin_log.log_file.pos_in_file)
+ {
+ int read_len= (int) mysql_file_read(read_fd, buf, IO_SIZE, MYF(MY_WME));
+ if (read_len < 0 ||
+ mysql_file_write(new_log_fd, buf, read_len,
+ MYF(MY_WME | MY_NABP | MY_WAIT_IF_FULL)))
+ {
+ mysql_file_close(read_fd, MYF(MY_WME));
+ goto err;
+ }
+ copy_len+= read_len;
+ }
+
+ mysql_file_close(read_fd, MYF(MY_WME));
+ }
+
+ // Set the cache file as binlog file.
+ mysql_file_close(mysql_bin_log.log_file.file, MYF(MY_WME));
+ mysql_bin_log.log_file.file= new_log_fd;
+ new_log_fd= -1;
+ my_delete(mysql_bin_log.get_log_fname(), MYF(0));
+
+ /* Any error happens after the file is deleted should return true. */
+ ret= true;
+
+ if (mysql_bin_log.write_gtid_event(
+ m_entry->thd, is_prepared_xa(m_entry->thd), m_entry->using_trx_cache,
+ 0 /* commit_id */, m_entry->end_event->get_type_code() == XID_EVENT,
+ m_entry->ro_1pc, true /* is_free_flush */))
+ goto err;
+
+ DBUG_EXECUTE_IF("binlog_free_flush_crash_before_rename", DBUG_SUICIDE(););
+
+ if (DBUG_IF("simulate_rename_binlog_cache_to_binlog_error") ||
+ my_rename(m_cache_data->temp_file_name(), mysql_bin_log.get_log_fname(),
+ MYF(MY_WME)))
+ goto err;
+
+ sql_print_information("Renamed binlog cache to binlog %s",
+ mysql_bin_log.get_log_fname());
+ m_replaced= true;
+ return false;
+err:
+ if (new_log_fd != -1)
+ mysql_file_close(new_log_fd, MYF(MY_WME));
+ return ret;
+}
+
+size_t Binlog_free_flush::get_gtid_event_pad_size()
+{
+ size_t begin_pos= my_b_tell(&mysql_bin_log.log_file);
+ size_t pad_to_size=
+ m_cache_data->file_reserved_bytes() - begin_pos - LOG_EVENT_HEADER_LEN;
+
+ if (binlog_checksum_options != BINLOG_CHECKSUM_ALG_OFF)
+ pad_to_size-= BINLOG_CHECKSUM_LEN;
+
+ return pad_to_size;
+}
diff --git a/sql/log.h b/sql/log.h
index 3ee06e17264..117f7638f2b 100644
--- a/sql/log.h
+++ b/sql/log.h
@@ -600,9 +600,12 @@ class binlog_cache_mngr;
class binlog_cache_data;
struct rpl_gtid;
struct wait_for_commit;
+class Binlog_free_flush;
class MYSQL_BIN_LOG: public TC_LOG, private Event_log
{
+ friend Binlog_free_flush;
+
#ifdef HAVE_PSI_INTERFACE
/** The instrumentation key to use for @ LOCK_index. */
PSI_mutex_key m_key_LOCK_index;
@@ -756,18 +759,20 @@ class MYSQL_BIN_LOG: public TC_LOG, private Event_log
new_file() is locking. new_file_without_locking() does not acquire
LOCK_log.
*/
- int new_file_impl();
+ int new_file_impl(bool is_free_flush= false);
void do_checkpoint_request(ulong binlog_id);
- int write_transaction_or_stmt(group_commit_entry *entry, uint64 commit_id);
+ int write_transaction_or_stmt(group_commit_entry *entry, uint64 commit_id,
+ bool is_free_flush= false);
int queue_for_group_commit(group_commit_entry *entry);
bool write_transaction_to_binlog_events(group_commit_entry *entry);
- void trx_group_commit_leader(group_commit_entry *leader);
+ void trx_group_commit_leader(group_commit_entry *leader,
+ bool is_free_flush= false);
bool is_xidlist_idle_nolock();
void update_gtid_index(uint32 offset, rpl_gtid gtid);
public:
void purge(bool all);
- int new_file_without_locking();
+ int new_file_without_locking(bool is_free_flush= false);
/*
A list of struct xid_count_per_binlog is used to keep track of how many
XIDs are in prepared, but not committed, state in each binlog. And how
@@ -997,7 +1002,8 @@ class MYSQL_BIN_LOG: public TC_LOG, private Event_log
enum cache_type io_cache_type_arg,
ulong max_size,
bool null_created,
- bool need_mutex);
+ bool need_mutex,
+ bool is_free_flush = false);
bool open_index_file(const char *index_file_name_arg,
const char *log_name, bool need_mutex);
/* Use this to start writing a new log file */
@@ -1037,7 +1043,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private Event_log
bool is_active(const char* log_file_name);
bool can_purge_log(const char *log_file_name, bool interactive);
int update_log_index(LOG_INFO* linfo, bool need_update_threads);
- int rotate(bool force_rotate, bool* check_purge);
+ int rotate(bool force_rotate, bool* check_purge, bool is_free_flush= false);
void checkpoint_and_purge(ulong binlog_id);
int rotate_and_purge(bool force_rotate, DYNAMIC_ARRAY* drop_gtid_domain= NULL);
/**
@@ -1117,7 +1123,8 @@ class MYSQL_BIN_LOG: public TC_LOG, private Event_log
bool is_xidlist_idle();
bool write_gtid_event(THD *thd, bool standalone, bool is_transactional,
uint64 commit_id,
- bool has_xid= false, bool ro_1pc= false);
+ bool has_xid= false, bool ro_1pc= false,
+ bool is_free_flush= false);
int read_state_from_file();
int write_state_to_file();
int get_most_recent_gtid_list(rpl_gtid **list, uint32 *size);
diff --git a/sql/log_cache.cc b/sql/log_cache.cc
new file mode 100644
index 00000000000..014e76611cf
--- /dev/null
+++ b/sql/log_cache.cc
@@ -0,0 +1,122 @@
+#include "my_global.h"
+#include "log_cache.h"
+#include "handler.h"
+#include "my_sys.h"
+#include "mysql/psi/mysql_file.h"
+#include "mysql/service_wsrep.h"
+
+const char *BINLOG_CACHE_DIR= "#binlog_cache_files";
+char binlog_cache_dir[FN_REFLEN];
+extern uint32 binlog_cache_reserved_size();
+
+bool binlog_cache_data::init_file_reserved_bytes()
+{
+ // Session's cache file is not created, so created here.
+ if (cache_log.file == -1)
+ {
+ char name[FN_REFLEN];
+
+ /* Cache file is named with PREFIX + binlog_cache_data object's address */
+ snprintf(name, FN_REFLEN, "%s/%s_%llu", cache_log.dir, cache_log.prefix,
+ (ulonglong) this);
+
+ if ((cache_log.file=
+ mysql_file_open(0, name, O_CREAT | O_RDWR, MYF(MY_WME))) < 0)
+ {
+ sql_print_error("Failed to open binlog cache temporary file %s", name);
+ cache_log.error= -1;
+ return true;
+ }
+ }
+
+#ifdef WITH_WSREP
+ /*
+ WSREP code accesses cache_log directly, so don't reserve space if WSREP is
+ on.
+ */
+ if (unlikely(wsrep_on(current_thd)))
+ return false;
+#endif
+
+ m_file_reserved_bytes= binlog_cache_reserved_size();
+ cache_log.pos_in_file= m_file_reserved_bytes;
+ cache_log.seek_not_done= 1;
+ return false;
+}
+
+void binlog_cache_data::detach_temp_file()
+{
+  /*
+    If a rollback_to_savepoint happened before, the real length of the
+    tmp file can be greater than file_end_pos. Truncate the cache tmp
+    file to the file_end_pos of this cache.
+  */
+ my_chsize(cache_log.file, my_b_tell(&cache_log), 0, MYF(MY_WME));
+
+ mysql_file_close(cache_log.file, MYF(0));
+ cache_log.file= -1;
+ reset();
+}
+
+extern void ignore_db_dirs_append(const char *dirname_arg);
+
+bool init_binlog_cache_dir()
+{
+ size_t length;
+ uint max_tmp_file_name_len=
+ 2 /* prefix */ + 10 /* max len of thread_id */ + 1 /* underline */;
+
+ ignore_db_dirs_append(BINLOG_CACHE_DIR);
+
+ dirname_part(binlog_cache_dir, log_bin_basename, &length);
+ /*
+ Must ensure the full name of the tmp file is shorter than FN_REFLEN, to
+ avoid overflowing the name buffer in write and commit.
+ */
+ if (length + strlen(BINLOG_CACHE_DIR) + max_tmp_file_name_len >= FN_REFLEN)
+ {
+ sql_print_error("Could not create binlog cache dir %s%s. It is too long.",
+ binlog_cache_dir, BINLOG_CACHE_DIR);
+ return true;
+ }
+
+ memcpy(binlog_cache_dir + length, BINLOG_CACHE_DIR,
+ strlen(BINLOG_CACHE_DIR));
+ binlog_cache_dir[length + strlen(BINLOG_CACHE_DIR)]= 0;
+
+ MY_DIR *dir_info= my_dir(binlog_cache_dir, MYF(0));
+
+ if (!dir_info)
+ {
+ /* Make a dir for binlog cache temp files if not exist. */
+ if (my_mkdir(binlog_cache_dir, 0777, MYF(0)) < 0)
+ {
+ sql_print_error("Could not create binlog cache dir %s.",
+ binlog_cache_dir);
+ return true;
+ }
+ return false;
+ }
+
+ /* Try to delete all cache files in the directory. */
+ for (uint i= 0; i < dir_info->number_of_files; i++)
+ {
+ FILEINFO *file= dir_info->dir_entry + i;
+
+ if (strncmp(file->name, LOG_PREFIX, strlen(LOG_PREFIX)))
+ {
+ sql_print_warning("%s is in %s/, but it is not a binlog cache file",
+ file->name, BINLOG_CACHE_DIR);
+ continue;
+ }
+
+ char file_path[FN_REFLEN];
+ fn_format(file_path, file->name, binlog_cache_dir, "",
+ MYF(MY_REPLACE_DIR));
+
+ my_delete(file_path, MYF(0));
+ }
+
+ my_dirend(dir_info);
+ return false;
+}
diff --git a/sql/log_cache.h b/sql/log_cache.h
index 79a9b94d8bc..a16e85b4b73 100644
--- a/sql/log_cache.h
+++ b/sql/log_cache.h
@@ -22,6 +22,16 @@ static constexpr my_off_t MY_OFF_T_UNDEF= ~0ULL;
/** Truncate cache log files bigger than this */
static constexpr my_off_t CACHE_FILE_TRUNC_SIZE = 65536;
+/**
+ Create binlog cache directory if it doesn't exist, otherwise delete all
+ files existing in the directory.
+
+ @retval false Succeeds to initialize the directory.
+ @retval true Failed to initialize the directory.
+*/
+bool init_binlog_cache_dir();
+
+extern char binlog_cache_dir[FN_REFLEN];
/*
Helper classes to store non-transactional and transactional data
@@ -35,7 +45,7 @@ class binlog_cache_data
before_stmt_pos(MY_OFF_T_UNDEF), m_pending(0), status(0),
incident(FALSE), precompute_checksums(precompute_checksums),
saved_max_binlog_cache_size(0), ptr_binlog_cache_use(0),
- ptr_binlog_cache_disk_use(0)
+ ptr_binlog_cache_disk_use(0), m_file_reserved_bytes(0)
{
/*
Read the current checksum setting. We will use this setting to decide
@@ -47,9 +57,13 @@ class binlog_cache_data
(enum_binlog_checksum_alg)binlog_checksum_options;
}
- ~binlog_cache_data()
+ virtual ~binlog_cache_data()
{
DBUG_ASSERT(empty());
+
+ if (cache_log.file != -1 && !encrypt_tmp_files)
+ unlink(my_filename(cache_log.file));
+
close_cached_file(&cache_log);
}
@@ -67,7 +81,7 @@ class binlog_cache_data
bool empty() const
{
return (pending() == NULL &&
- (my_b_write_tell(&cache_log) == 0 ||
+ (my_b_write_tell(&cache_log) - m_file_reserved_bytes == 0 ||
((status & (LOGGED_ROW_EVENT | LOGGED_CRITICAL)) == 0)));
}
@@ -97,6 +111,8 @@ class binlog_cache_data
bool truncate_file= (cache_log.file != -1 &&
my_b_write_tell(&cache_log) >
MY_MIN(CACHE_FILE_TRUNC_SIZE, binlog_stmt_cache_size));
+ // m_file_reserved_bytes must be reset to 0, before truncate.
+ m_file_reserved_bytes= 0;
truncate(0,1); // Forget what's in cache
checksum_opt= !precompute_checksums ? BINLOG_CHECKSUM_ALG_OFF :
(enum_binlog_checksum_alg)binlog_checksum_options;
@@ -112,7 +128,8 @@ class binlog_cache_data
my_off_t get_byte_position() const
{
- return my_b_tell(&cache_log);
+ DBUG_ASSERT(cache_log.type == WRITE_CACHE);
+ return my_b_tell(&cache_log) - m_file_reserved_bytes;
}
my_off_t get_prev_position() const
@@ -172,6 +189,81 @@ class binlog_cache_data
status|= status_arg;
}
+  /**
+    This function is called every time anything is written into the
+    cache_log. To support renaming the binlog cache to a binlog file, the
+    cache_log should be initialized with reserved space.
+  */
+ bool write_prepare(size_t write_length)
+ {
+ /* Data will exceed the buffer size in this write */
+ if (unlikely(cache_log.write_pos + write_length > cache_log.write_end &&
+ cache_log.pos_in_file == 0))
+ {
+ /* Only session's binlog cache need to reserve space. */
+ if (cache_log.dir == binlog_cache_dir && !encrypt_tmp_files)
+ return init_file_reserved_bytes();
+ }
+ return false;
+ }
+
+  /**
+    For the session's binlog cache, this function must be called to skip
+    the reserved bytes before reading the cache file.
+  */
+ bool init_for_read()
+ {
+ return reinit_io_cache(&cache_log, READ_CACHE, m_file_reserved_bytes, 0, 0);
+ }
+
+  /**
+    For the session's binlog cache, this function must be called to get
+    the actual data length.
+  */
+ my_off_t length_for_read() const
+ {
+ DBUG_ASSERT(cache_log.type == READ_CACHE);
+ return cache_log.end_of_file - m_file_reserved_bytes;
+ }
+
+  /**
+    This function returns the cache file's actual length, which includes
+    the reserved space.
+  */
+ my_off_t temp_file_length()
+ {
+ return my_b_tell(&cache_log);
+ }
+
+ uint32 file_reserved_bytes() { return m_file_reserved_bytes; }
+
+ /**
+ Flush and sync the data of the file into storage.
+
+ @retval true Error happens
+ @retval false Succeeds
+ */
+ bool sync_temp_file()
+ {
+ DBUG_ASSERT(cache_log.file != -1);
+
+ if (my_b_flush_io_cache(&cache_log, 1) ||
+ mysql_file_sync(cache_log.file, MYF(MY_WME)))
+ return true;
+ return false;
+ }
+
+  /**
+    Return the name of the cache file.
+  */
+ const char *temp_file_name() { return my_filename(cache_log.file); }
+
+ /**
+ It is called after renaming the cache file to a binlog file. The file
+ now is a binlog file, so detach it from the binlog cache.
+ */
+ void detach_temp_file();
+
/*
Cache to store data before copying it to the binary log.
*/
@@ -253,6 +345,12 @@ class binlog_cache_data
*/
ulong *ptr_binlog_cache_disk_use;
+  /*
+    Stores the bytes reserved at the beginning of the cache file. It can be
+    0 in cases where reserved space is not supported; see write_prepare().
+  */
+ uint32 m_file_reserved_bytes {0};
+
/*
It truncates the cache to a certain position. This includes deleting the
pending event.
@@ -266,12 +364,18 @@ class binlog_cache_data
delete pending();
set_pending(0);
}
- my_bool res __attribute__((unused))=
- reinit_io_cache(&cache_log, WRITE_CACHE, pos, 0, reset_cache);
+ my_bool res __attribute__((unused))= reinit_io_cache(
+ &cache_log, WRITE_CACHE, pos + m_file_reserved_bytes, 0, reset_cache);
DBUG_ASSERT(res == 0);
cache_log.end_of_file= saved_max_binlog_cache_size;
}
+  /**
+    Reserve the required space at the beginning of the temporary file. It
+    will create the temporary file if it doesn't exist.
+  */
+ bool init_file_reserved_bytes();
+
binlog_cache_data& operator=(const binlog_cache_data& info);
binlog_cache_data(const binlog_cache_data& info);
};
diff --git a/sql/log_event.h b/sql/log_event.h
index fdbd46f8d0d..8c1edfe2c0b 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -3340,6 +3340,14 @@ class Gtid_log_event: public Log_event
uint64 sa_seq_no; // start alter identifier for CA/RA
#ifdef MYSQL_SERVER
event_xid_t xid;
+  /*
+    Pad the event to this size if it is not zero. It is only used when
+    renaming a binlog cache to a binlog file. There is some space reserved
+    for the gtid event and the events at the beginning of the binlog file.
+    There must be some space left after those events are filled in, so the
+    remaining space is padded into the gtid event with 0.
+  */
+ uint64 pad_to_size;
#else
event_mysql_xid_t xid;
#endif
@@ -3404,6 +3412,11 @@ class Gtid_log_event: public Log_event
static const uchar FL_EXTRA_THREAD_ID= 16; // thread_id like in BEGIN Query
#ifdef MYSQL_SERVER
+ static const uint max_data_length= GTID_HEADER_LEN + 2 + sizeof(XID)
+ + 1 /* flags_extra: */
+ + 4 /* Extra Engines */
+ + 4 /* FL_EXTRA_THREAD_ID */;
+
Gtid_log_event(THD *thd_arg, uint64 seq_no, uint32 domain_id, bool standalone,
uint16 flags, bool is_transactional, uint64 commit_id,
bool has_xid= false, bool is_ro_1pc= false);
diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc
index 9b179836651..44cec8b15fd 100644
--- a/sql/log_event_server.cc
+++ b/sql/log_event_server.cc
@@ -29,6 +29,7 @@
#include "unireg.h"
#include "log_event.h"
+#include "log_cache.h"
#include "sql_base.h" // close_thread_tables
#include "sql_cache.h" // QUERY_CACHE_FLAGS_SIZE
#include "sql_locale.h" // MY_LOCALE, my_locale_by_number, my_locale_en_US
@@ -690,6 +691,9 @@ void Log_event::init_show_field_list(THD *thd, List<Item>* field_list)
int Log_event_writer::write_internal(const uchar *pos, size_t len)
{
DBUG_ASSERT(!ctx || encrypt_or_write == &Log_event_writer::encrypt_and_write);
+ if (cache_data && cache_data->write_prepare(len))
+ return 1;
+
if (my_b_safe_write(file, pos, len))
{
DBUG_PRINT("error", ("write to log failed: %d", my_errno));
@@ -2839,7 +2843,7 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg,
bool ro_1pc)
: Log_event(thd_arg, flags_arg, is_transactional),
seq_no(seq_no_arg), commit_id(commit_id_arg), domain_id(domain_id_arg),
- flags2((standalone ? FL_STANDALONE : 0) |
+ pad_to_size(0), flags2((standalone ? FL_STANDALONE : 0) |
(commit_id_arg ? FL_GROUP_COMMIT_ID : 0)),
flags_extra(0), extra_engines(0),
thread_id(thd_arg->variables.pseudo_thread_id)
@@ -2959,10 +2963,7 @@ Gtid_log_event::peek(const uchar *event_start, size_t event_len,
bool
Gtid_log_event::write(Log_event_writer *writer)
{
- uchar buf[GTID_HEADER_LEN + 2 + sizeof(XID)
- + 1 /* flags_extra: */
- + 4 /* Extra Engines */
- + 4 /* FL_EXTRA_THREAD_ID */];
+ uchar buf[max_data_length];
size_t write_len= 13;
int8store(buf, seq_no);
@@ -3042,6 +3043,27 @@ Gtid_log_event::write(Log_event_writer *writer)
bzero(buf+write_len, GTID_HEADER_LEN-write_len);
write_len= GTID_HEADER_LEN;
}
+
+ if (unlikely(pad_to_size > write_len))
+ {
+ if (write_header(writer, pad_to_size) ||
+ write_data(writer, buf, write_len))
+ return true;
+
+ pad_to_size-= write_len;
+
+ char pad_buf[IO_SIZE];
+ bzero(pad_buf, pad_to_size);
+ while (pad_to_size)
+ {
+ uint64 size= pad_to_size >= IO_SIZE ? IO_SIZE : pad_to_size;
+ if (write_data(writer, pad_buf, size))
+ return true;
+ pad_to_size-= size;
+ }
+ return write_footer(writer);
+ }
+
return write_header(writer, write_len) ||
write_data(writer, buf, write_len) ||
write_footer(writer);
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index e938e8f6cfa..a2ef34a28db 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -120,7 +120,7 @@
#include "sp_cache.h"
#include "sql_reload.h" // reload_acl_and_cache
#include "sp_head.h" // init_sp_psi_keys
-
+#include "log_cache.h"
#include <mysqld_default_groups.h>
#ifdef HAVE_POLL_H
@@ -5609,6 +5609,8 @@ static int init_server_components()
mysql_mutex_unlock(log_lock);
if (unlikely(error))
unireg_abort(1);
+ if (unlikely(init_binlog_cache_dir()))
+ unireg_abort(1);
}
#ifdef HAVE_REPLICATION
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index d4997793428..cd7bae8ab12 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -7380,3 +7380,13 @@ static Sys_var_enum Sys_block_encryption_mode(
"AES_ENCRYPT() and AES_DECRYPT() functions",
SESSION_VAR(block_encryption_mode), CMD_LINE(REQUIRED_ARG),
block_encryption_mode_values, DEFAULT(0));
+
+extern ulonglong opt_binlog_free_flush_threshold;
+static Sys_var_ulonglong Sys_binlog_free_flush_threshold(
+ "binlog_free_flush_threshold",
+    "Try to rename the binlog cache temporary file of the committing "
+ "transaction to a binlog file when its binlog cache size "
+ "is bigger than the value of this variable",
+ GLOBAL_VAR(opt_binlog_free_flush_threshold),
+ CMD_LINE(REQUIRED_ARG), VALID_RANGE(10 * 1024 * 1024, ULLONG_MAX),
+ DEFAULT(128 * 1024 * 1024), BLOCK_SIZE(1));
--
2.39.2
1
0
A handful of fixes for test failures in buildot.
Kristian Nielsen (6):
MDEV-34696: do_gco_wait() completes too early on InnoDB dict stats
updates
Restore skiping rpl.rpl_mdev6020 under Valgrind
Fix sporadic test failure in rpl.rpl_create_drop_event
Fix sporadic failure of test case rpl.rpl_old_master
Skip mariabackup.slave_provision_nolock in --valgrind, it uses a lot
of CPU
Fix sporadic failure of test case rpl.rpl_start_stop_slave
.../mariabackup/slave_provision_nolock.test | 2 ++
mysql-test/suite/rpl/r/rpl_old_master.result | 3 ---
.../suite/rpl/t/rpl_create_drop_event.test | 6 ++++++
mysql-test/suite/rpl/t/rpl_mdev6020.test | 2 ++
mysql-test/suite/rpl/t/rpl_old_master.test | 7 -------
.../suite/rpl/t/rpl_start_stop_slave.test | 12 ++++++++++-
sql/rpl_parallel.cc | 20 +++++++++++++++----
sql/rpl_rli.cc | 17 ++++++++++++++++
8 files changed, 54 insertions(+), 15 deletions(-)
--
2.39.2
2
7

[PATCH] MDEV-34481 optimize away waiting for owned by prepared xa non-unique index record
by Kristian Nielsen 31 Jul '24
by Kristian Nielsen 31 Jul '24
31 Jul '24
From: Andrei <andrei.elkin(a)mariadb.com>
This work partly implements a ROW binlog format MDEV-33455 part
that is makes non-unique-index-only table XA replication safe in RBR.
Existence of at least one non-null unique index has always guaranteed
safety (no hang to error).
Two transaction that update a non-unique-only index table could not be
isolated on slave when on slave they used different non-unique indexes
than on master.
Unsolvable hang could be seen in case the first of the two is a prepared XA
--connection slave_worker_1
xa start 'xid'; /* ... lock here ... */ ; xa prepare 'xid'
while the 2nd being of any kind including of normal type
--connection slave_worker_2
begin; /* ... get lock ... => wait/hang...error out */
was unable to wait up for the conflicting lock, even though
the XA transaction did not really lock target records of the 2nd.
This type of hang was caused by a chosen method the 2nd transaction
employs to reach the target record, which is the index scan. The scanning
orthodoxically just could not step over a record in the way that was
locked by the XA.
However as the in-the-way record can not be targeted by the 2nd
transaction, otherwise the transactions would have sensed the conflict
back on master *and* the other possibility of collecting extra locks by the
'xid' on non-modified records is tackled by MDEV-33454/MDEV-34466,
the non-unique index scanning server/slave-applier layer must not panic at
seeing a timeout error from the engine. Instead the scanning would
just proceed to next possibly free index records of the same key value and
ultimately must reach the target one.
More generally, on the way to its target all busy records belonging to
earlier (binlog order) prepared XA transactions need not be tried locking
by the current non-unique index scanning transaction.
This patch implements the plan for Innodb.
The server layer expects the engine to mark an attempt to wait for
a conflicting lock that belongs to a transaction in prepared state.
The engine won't exercise, need not to, the timeout wait.
When marking is done the timeout error is ignored by the server
and next index record is tried out.
An mtr test checks a scenario in sequential and parallel modes.
---
sql/log_event_server.cc | 37 +++++++++++++++++++++++++-----
sql/rpl_rli.cc | 4 +++-
sql/rpl_rli.h | 14 +++++++++++
sql/sql_class.cc | 30 +++++++++++++++++++-----
storage/innobase/lock/lock0lock.cc | 30 +++++++++++++++++-------
5 files changed, 94 insertions(+), 21 deletions(-)
diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc
index f3d01b7d9ed..166295ed263 100644
--- a/sql/log_event_server.cc
+++ b/sql/log_event_server.cc
@@ -8105,6 +8105,11 @@ int Rows_log_event::find_row(rpl_group_info *rgi)
if (m_key_info)
{
+ bool skip_first_compare= false;
+ bool is_index_unique=
+ (table->key_info->flags & HA_NOSAME) &&
+ !(table->key_info->flags & (HA_NULL_PART_KEY));
+
DBUG_PRINT("info",("locating record using key #%u [%s] (index_read)",
m_key_nr, m_key_info->name.str));
/* We use this to test that the correct key is used in test cases. */
@@ -8149,11 +8154,20 @@ int Rows_log_event::find_row(rpl_group_info *rgi)
HA_READ_KEY_EXACT))))
{
DBUG_PRINT("info",("no record matching the key found in the table"));
- if (error == HA_ERR_KEY_NOT_FOUND)
- error= row_not_found_error(rgi);
- table->file->print_error(error, MYF(0));
- table->file->ha_index_end();
- goto end;
+ if (error == HA_ERR_LOCK_WAIT_TIMEOUT && !is_index_unique &&
+ (rgi->exec_flags & (1 << rpl_group_info::HIT_BUSY_INDEX)))
+ {
+ rgi->exec_flags &= ~(1 << rpl_group_info::HIT_BUSY_INDEX);
+ skip_first_compare= true;
+ }
+ else
+ {
+ if (error == HA_ERR_KEY_NOT_FOUND)
+ error= row_not_found_error(rgi);
+ table->file->print_error(error, MYF(0));
+ table->file->ha_index_end();
+ goto end;
+ }
}
/*
@@ -8222,17 +8236,28 @@ int Rows_log_event::find_row(rpl_group_info *rgi)
/* We use this to test that the correct key is used in test cases. */
DBUG_EXECUTE_IF("slave_crash_if_index_scan", abort(););
- while (record_compare(table, m_vers_from_plain))
+ while (skip_first_compare || record_compare(table, m_vers_from_plain))
{
+ if (skip_first_compare)
+ skip_first_compare= false;
while ((error= table->file->ha_index_next(table->record[0])))
{
DBUG_PRINT("info",("no record matching the given row found"));
if (error == HA_ERR_END_OF_FILE)
error= end_of_file_error(rgi);
+ else if (error == HA_ERR_LOCK_WAIT_TIMEOUT && !is_index_unique &&
+ (rgi->exec_flags & (1 << rpl_group_info::HIT_BUSY_INDEX)))
+ {
+ rgi->exec_flags &= ~(1 << rpl_group_info::HIT_BUSY_INDEX);
+ continue;
+ }
+ DBUG_ASSERT(!(rgi->exec_flags & (1 << rpl_group_info::HIT_BUSY_INDEX)));
+
table->file->print_error(error, MYF(0));
table->file->ha_index_end();
goto end;
}
+ DBUG_ASSERT(!(rgi->exec_flags & (1 << rpl_group_info::HIT_BUSY_INDEX)));
}
}
else
diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc
index 3b036cdaa79..2839b6e68b0 100644
--- a/sql/rpl_rli.cc
+++ b/sql/rpl_rli.cc
@@ -2154,12 +2154,14 @@ rpl_group_info::reinit(Relay_log_info *rli)
gtid_ignore_duplicate_state= GTID_DUPLICATE_NULL;
speculation= SPECULATE_NO;
commit_orderer.reinit();
+ exec_flags= 0;
}
rpl_group_info::rpl_group_info(Relay_log_info *rli)
: thd(0), wait_commit_sub_id(0),
wait_commit_group_info(0), parallel_entry(0),
- deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false)
+ deferred_events(NULL), m_annotate_event(0), is_parallel_exec(false),
+ exec_flags(0)
{
reinit(rli);
bzero(¤t_gtid, sizeof(current_gtid));
diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h
index 3f24481839b..a9ec44b58cb 100644
--- a/sql/rpl_rli.h
+++ b/sql/rpl_rli.h
@@ -833,6 +833,15 @@ struct rpl_group_info
RETRY_KILL_KILLED
};
uchar killed_for_retry;
+ enum enum_exec_flags {
+ HIT_BUSY_INDEX= 0
+ };
+ /*
+ Context of the replication event being executed. It can carry
+ notifications from the engine, such as an attempt to lock an index
+ record that is already acquired.
+ */
+ uint32 exec_flags;
rpl_group_info(Relay_log_info *rli_);
~rpl_group_info();
@@ -960,6 +969,11 @@ struct rpl_group_info
if (!is_parallel_exec)
rli->event_relay_log_pos= future_event_relay_log_pos;
}
+
+ bool is_row_event_execution()
+ {
+ return m_table_map.count() > 0;
+ }
};
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 6419a58fbe4..b2321cfd865 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -5418,7 +5418,9 @@ thd_need_wait_reports(const MYSQL_THD thd)
/*
Used by storage engines (currently InnoDB) to report that
one transaction THD is about to go to wait for a transactional lock held by
- another transactions OTHER_THD.
+ another transaction, OTHER_THD. Also, in the case of non-unique index
+ locking, the slave applier is informed when a prepared XA transaction
+ holds an offending lock.
This is used for parallel replication, where transactions are required to
commit in the same order on the slave as they did on the master. If the
@@ -5432,6 +5434,9 @@ thd_need_wait_reports(const MYSQL_THD thd)
slave in the first place. However, it is possible in case when the optimizer
chooses a different plan on the slave than on the master (eg. table scan
instead of index scan).
+ When the latter takes place and the conflict on a non-unique index involves
+ the slave applier, the applier needs to know whether the lock owner is
+ a prepared XA transaction, and this information is provided.
Storage engines report lock waits using this call. If a lock wait causes a
deadlock with the pre-determined commit order, we kill the later
@@ -5444,20 +5449,33 @@ thd_need_wait_reports(const MYSQL_THD thd)
transaction.
*/
extern "C" int
-thd_rpl_deadlock_check(MYSQL_THD thd, MYSQL_THD other_thd)
+thd_rpl_deadlock_check(MYSQL_THD thd, MYSQL_THD other_thd,
+ bool is_other_prepared, bool is_index_unique,
+ uint *flagged)
{
- rpl_group_info *rgi;
- rpl_group_info *other_rgi;
+ rpl_group_info *rgi= thd ? thd->rgi_slave : NULL;
+ rpl_group_info *other_rgi= other_thd ? other_thd->rgi_slave : NULL;
if (!thd)
return 0;
DEBUG_SYNC(thd, "thd_report_wait_for");
thd->transaction->stmt.mark_trans_did_wait();
+
+ /*
+ Return to the caller the fact of the non-unique index wait lock
+ conflicts with one of a prepared state transaction.
+ */
+ if (!is_index_unique && rgi && rgi->is_row_event_execution()) {
+ if (is_other_prepared && !other_thd)
+ {
+ rgi->exec_flags |= 1 << rpl_group_info::HIT_BUSY_INDEX;
+ (*flagged)++;
+ }
+ }
+
if (!other_thd)
return 0;
binlog_report_wait_for(thd, other_thd);
- rgi= thd->rgi_slave;
- other_rgi= other_thd->rgi_slave;
if (!rgi || !other_rgi)
return 0;
if (!rgi->is_parallel_exec)
diff --git a/storage/innobase/lock/lock0lock.cc b/storage/innobase/lock/lock0lock.cc
index 7b7e7ef3e46..2aeb4774631 100644
--- a/storage/innobase/lock/lock0lock.cc
+++ b/storage/innobase/lock/lock0lock.cc
@@ -61,7 +61,10 @@ my_bool innodb_deadlock_detect;
ulong innodb_deadlock_report;
#ifdef HAVE_REPLICATION
-extern "C" void thd_rpl_deadlock_check(MYSQL_THD thd, MYSQL_THD other_thd);
+extern "C" void thd_rpl_deadlock_check(MYSQL_THD thd, MYSQL_THD other_thd,
+ bool is_prepared= false,
+ bool is_index_uniq= false,
+ uint *flagged= NULL);
extern "C" int thd_need_wait_reports(const MYSQL_THD thd);
extern "C" int thd_need_ordering_with(const MYSQL_THD thd, const MYSQL_THD other_thd);
extern "C" int thd_deadlock_victim_preference(const MYSQL_THD thd1, const MYSQL_THD thd2);
@@ -1874,10 +1877,10 @@ ATTRIBUTE_NOINLINE MY_ATTRIBUTE((nonnull, warn_unused_result))
trx->error_state= DB_DEADLOCK if trx->lock.was_chosen_as_deadlock_victim was
set when lock_sys.wait_mutex was unlocked.
@param trx transaction that may be waiting for a lock
-@param wait_lock lock that is being waited for
+@param wait_timeout in/out: lock wait timeout in seconds; may be zeroed here
@return lock being waited for (may have been replaced by an equivalent one)
@retval nullptr if no lock is being waited for */
-static lock_t *lock_wait_rpl_report(trx_t *trx)
+static lock_t *lock_wait_rpl_report(trx_t *trx, ulong& wait_timeout)
{
mysql_mutex_assert_owner(&lock_sys.wait_mutex);
ut_ad(trx->state == TRX_STATE_ACTIVE);
@@ -1886,6 +1889,7 @@ static lock_t *lock_wait_rpl_report(trx_t *trx)
lock_t *wait_lock= trx->lock.wait_lock;
if (!wait_lock)
return nullptr;
+ uint count_non_unique_index_hit= 0;
/* This would likely be too large to attempt to use a memory transaction,
even for wait_lock->is_table(). */
const bool nowait= lock_sys.wr_lock_try();
@@ -1903,6 +1907,8 @@ static lock_t *lock_wait_rpl_report(trx_t *trx)
lock_sys.wait_mutex was unlocked, let's check it. */
if (!nowait && trx->lock.was_chosen_as_deadlock_victim)
trx->error_state= DB_DEADLOCK;
+ if (count_non_unique_index_hit > 0)
+ wait_timeout= 0;
return wait_lock;
}
ut_ad(wait_lock->is_waiting());
@@ -1940,7 +1946,11 @@ static lock_t *lock_wait_rpl_report(trx_t *trx)
lock= lock_rec_get_next(heap_no, lock);
do
if (lock->trx->mysql_thd != thd)
- thd_rpl_deadlock_check(thd, lock->trx->mysql_thd);
+ thd_rpl_deadlock_check(thd, lock->trx->mysql_thd,
+ lock->trx->state == TRX_STATE_PREPARED,
+ (lock->index->is_unique() &&
+ !lock->index->n_nullable),
+ &count_non_unique_index_hit);
while ((lock= lock_rec_get_next(heap_no, lock)));
}
}
@@ -1971,7 +1981,7 @@ dberr_t lock_wait(que_thr_t *thr)
/* InnoDB system transactions may use the global value of
innodb_lock_wait_timeout, because trx->mysql_thd == NULL. */
- const ulong innodb_lock_wait_timeout= trx_lock_wait_timeout_get(trx);
+ ulong innodb_lock_wait_timeout= trx_lock_wait_timeout_get(trx);
const my_hrtime_t suspend_time= my_hrtime_coarse();
ut_ad(!trx->dict_operation_lock_mode);
@@ -2034,7 +2044,6 @@ dberr_t lock_wait(que_thr_t *thr)
const bool row_lock_wait= thr->lock_state == QUE_THR_LOCK_ROW;
timespec abstime;
set_timespec_time_nsec(abstime, suspend_time.val * 1000);
- abstime.MY_tv_sec+= innodb_lock_wait_timeout;
/* Dictionary transactions must wait be immune to lock wait timeouts
for locks on data dictionary tables. Here we check only for
SYS_TABLES, SYS_COLUMNS, SYS_INDEXES, SYS_FIELDS. Locks on further
@@ -2094,10 +2103,15 @@ dberr_t lock_wait(que_thr_t *thr)
lock_sys.wait_start();
#ifdef HAVE_REPLICATION
+ /*
+ innodb_lock_wait_timeout can be zeroed in a specific case of non-unique
+ index locking by slave applier. When that happens no actual waiting for
+ wait_lock is done before the server layer has received the timeout error.
+ */
if (rpl)
- wait_lock= lock_wait_rpl_report(trx);
+ wait_lock= lock_wait_rpl_report(trx, innodb_lock_wait_timeout);
#endif
-
+ abstime.MY_tv_sec+= innodb_lock_wait_timeout;
switch (trx->error_state) {
case DB_SUCCESS:
break;
--
2.39.2
1
0