16 Oct '24
Follow-up from previous patch, which introduced catalog in the GTID event.
With catalog in GTID, there is no longer a need to use the catalog in
Query_log_event or Table_map_log_event. In fact, since catalog must never
change in the middle of a transaction, it is better to _only_ set the
catalog from the GTID event and don't use any catalog info from other
events.
To save space, remove the previously introduced catalog from
Table_map_log_event. (The catalog in Query_log_event is kept as it was there
for long, so as not to change event formats).
Signed-off-by: Kristian Nielsen <knielsen(a)knielsen-hq.org>
---
sql/log_event.cc | 102 ++++++++++-----------------------------
sql/log_event.h | 23 ++-------
sql/log_event_client.cc | 51 ++++++++------------
sql/log_event_server.cc | 104 +++++++++++++++++++++++-----------------
4 files changed, 110 insertions(+), 170 deletions(-)
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 004010e9d99..e8bb22b0ecb 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -1376,7 +1376,7 @@ Query_log_event::Query_log_event(const uchar *buf, uint event_len,
Log_event_type event_type)
:Log_event(buf, description_event), data_buf(0),
catalog(0), db(NullS), query(NullS),
- status_vars_len(0),
+ catalog_len(0), status_vars_len(0),
flags2_inited(0), sql_mode_inited(0), charset_inited(0), flags2(0),
auto_increment_increment(1), auto_increment_offset(1),
time_zone_len(0), lc_time_names_number(0), charset_database_number(0),
@@ -1387,12 +1387,11 @@ Query_log_event::Query_log_event(const uchar *buf, uint event_len,
uint8 common_header_len, post_header_len;
Log_event::Byte *start;
const Log_event::Byte *end;
+ bool catalog_nz= 1;
DBUG_ENTER("Query_log_event::Query_log_event(char*,...)");
memset(&user, 0, sizeof(user));
memset(&host, 0, sizeof(host));
- catalog_name.str= 0;
- catalog_name.length= 0;
common_header_len= description_event->common_header_len;
post_header_len= description_event->post_header_len[event_type-1];
DBUG_PRINT("info",("event_len: %u common_header_len: %d post_header_len: %d",
@@ -1461,16 +1460,13 @@ Query_log_event::Query_log_event(const uchar *buf, uint event_len,
}
case Q_CATALOG_NZ_CODE:
{
- uint length;
DBUG_PRINT("info", ("case Q_CATALOG_NZ_CODE; pos:%p; end:%p",
pos, end));
- if (get_str_len_and_pointer(&pos, &catalog_name.str, &length, end))
+ if (get_str_len_and_pointer(&pos, &catalog, &catalog_len, end))
{
query= 0;
DBUG_VOID_RETURN;
}
- catalog_name.length= length;
- break;
}
break;
case Q_AUTO_INCREMENT:
@@ -1500,10 +1496,11 @@ Query_log_event::Query_log_event(const uchar *buf, uint event_len,
case Q_CATALOG_CODE: /* for 5.0.x where 0<=x<=3 masters */
{
CHECK_SPACE(pos, end, 1);
- if ((catalog_name.length= *pos))
- catalog_name.str= (char*) pos+1; // Will be copied later
- CHECK_SPACE(pos, end, catalog_name.length + 2);
- pos+= catalog_name.length+2; // leap over end 0
+ if ((catalog_len= *pos))
+ catalog= (char*) pos+1; // Will be copied later
+ CHECK_SPACE(pos, end, catalog_len + 2);
+ pos+= catalog_len+2; // leap over end 0
+ catalog_nz= 0; // catalog has end 0 in event
}
break;
case Q_LC_TIME_NAMES_CODE:
@@ -1618,7 +1615,7 @@ Query_log_event::Query_log_event(const uchar *buf, uint event_len,
#if !defined(MYSQL_CLIENT) && defined(HAVE_QUERY_CACHE)
if (!(start= data_buf= (Log_event::Byte*) my_malloc(PSI_INSTRUMENT_ME,
- catalog_name.length + 1
+ catalog_len + 1
+ time_zone_len + 1
+ user.length + 1
+ host.length + 1
@@ -1630,7 +1627,7 @@ Query_log_event::Query_log_event(const uchar *buf, uint event_len,
MYF(MY_WME))))
#else
if (!(start= data_buf= (Log_event::Byte*) my_malloc(PSI_INSTRUMENT_ME,
- catalog_name.length + 1
+ catalog_len + 1
+ time_zone_len + 1
+ user.length + 1
+ host.length + 1
@@ -1638,36 +1635,22 @@ Query_log_event::Query_log_event(const uchar *buf, uint event_len,
MYF(MY_WME))))
#endif
DBUG_VOID_RETURN;
-
- /*
- Ensure we can support old replication clients that are using 'std' as catalog name
- This is also needed to support old mtr test that uses copies of old replication logs that
- still are using 'std'.
- */
- if (catalog_name.length == 3 and memcmp(catalog_name.str, "std", 3) == 0)
- catalog_name.str= "def";
-
-#ifndef MYSQL_CLIENT
- if (catalog_name.length) // If catalog was given
+ if (catalog_len) // If catalog is given
{
- if (!(catalog= get_catalog(&catalog_name, 1)))
+ /**
+ @todo we should clean up and do only copy_str_and_move; it
+ works for both cases. Then we can remove the catalog_nz
+ flag. /sven
+ */
+ if (likely(catalog_nz)) // true except if event comes from 5.0.0|1|2|3.
+ copy_str_and_move(&catalog, &start, catalog_len);
+ else
{
- if (!user.str)
- user.str= "";
- if (!host.str)
- host.str= "";
- my_error(ER_ACCESS_NO_SUCH_CATALOG, MYF(ME_ERROR_LOG),
- user.str, host.str, catalog_name.length, catalog_name.str);
- query= 0;
- DBUG_VOID_RETURN;
+ memcpy(start, catalog, catalog_len+1); // copy end 0
+ catalog= (const char *)start;
+ start+= catalog_len+1;
}
}
- else
- catalog= default_catalog();
-#endif
-
- copy_str_and_move(&catalog_name.str, &start, catalog_name.length);
-
if (time_zone_len)
copy_str_and_move(&time_zone_str, &start, time_zone_len);
@@ -2456,13 +2439,14 @@ Gtid_log_event::Gtid_log_event(const uchar *buf, uint event_len,
return;
}
uint32_t cat_len= *buf++;
- if (unlikely(cat_len > MAX_CATALOG_NAME) ||
+ if (unlikely(cat_len >= MAX_CATALOG_NAME) ||
unlikely(buf - buf_0 + cat_len) >= event_len)
{
seq_no= 0;
return;
}
memcpy(cat_name_buf, buf, cat_len);
+ cat_name_buf[cat_len]= '\0';
cat_name_int.str= cat_name_buf;
cat_name_int.length= cat_len;
cat_name= &cat_name_int;
@@ -3341,7 +3325,6 @@ Table_map_log_event::Table_map_log_event(const uchar *buf, uint event_len,
#ifndef MYSQL_CLIENT
m_table(NULL),
#endif
- m_catalog(NULL),
m_dbnam(NULL), m_dblen(0), m_tblnam(NULL), m_tbllen(0),
m_colcnt(0), m_coltype(0),
m_memory(NULL), m_table_id(ULONGLONG_MAX), m_flags(0),
@@ -3350,7 +3333,6 @@ Table_map_log_event::Table_map_log_event(const uchar *buf, uint event_len,
m_optional_metadata_len(0), m_optional_metadata(NULL)
{
unsigned int bytes_read= 0;
- LEX_CSTRING catalog= {"",0};
uint8 common_header_len= description_event->common_header_len;
uint8 post_header_len= description_event->post_header_len[TABLE_MAP_EVENT-1];
DBUG_ENTER("Table_map_log_event::Table_map_log_event(const char*,uint,...)");
@@ -3359,9 +3341,6 @@ Table_map_log_event::Table_map_log_event(const uchar *buf, uint event_len,
"post_header_len: %d",
event_len, common_header_len, post_header_len));
- m_catnam.str="";
- m_catnam.length= 0;
-
/*
Don't print debug messages when running valgrind since they can
trigger false warnings.
@@ -3396,35 +3375,7 @@ Table_map_log_event::Table_map_log_event(const uchar *buf, uint event_len,
m_flags= uint2korr(post_start);
/* Read the variable part of the event */
- const uchar *vpart= buf + common_header_len + post_header_len;
-
- if (m_flags & TM_BIT_HAS_CATALOG_F)
- {
- size_t catalog_len= *vpart;
- const char *catalog_name= (const char*) vpart + 1;
-
- catalog.str= catalog_name;
- catalog.length= catalog_len;
- if (vpart + catalog_len + 2 > buf + event_len)
- DBUG_VOID_RETURN;
-
-#ifndef MYSQL_CLIENT
- if (!(m_catalog= get_catalog(&catalog, 1)))
- {
- my_error(ER_ACCESS_NO_SUCH_CATALOG, MYF(ME_ERROR_LOG),
- "replication", "localhost",
- catalog_len, catalog_name);
- DBUG_VOID_RETURN;
- }
-#endif
- vpart+= catalog_len+2;
- }
-#ifndef MYSQL_CLIENT
- else
- {
- m_catalog= default_catalog();
- }
-#endif /* MYSQL_CLIENT */
+ const uchar *const vpart= buf + common_header_len + post_header_len;
/* Extract the length of the various parts from the buffer */
uchar const *const ptr_dblen= (uchar const*)vpart + 0;
@@ -3449,7 +3400,6 @@ Table_map_log_event::Table_map_log_event(const uchar *buf, uint event_len,
/* Allocate mem for all fields in one go. If fails, caught in is_valid() */
m_memory= (uchar*) my_multi_malloc(PSI_INSTRUMENT_ME, MYF(MY_WME),
- &m_catnam.str, (uint) catalog.length+1,
&m_dbnam, (uint) m_dblen + 1,
&m_tblnam, (uint) m_tbllen + 1,
&m_coltype, (uint) m_colcnt,
@@ -3458,8 +3408,6 @@ Table_map_log_event::Table_map_log_event(const uchar *buf, uint event_len,
if (m_memory)
{
/* Copy the different parts into their memory */
- memcpy(const_cast<char*>(m_catnam.str), catalog.str, catalog.length+1);
- m_catnam.length= catalog.length;
strncpy(const_cast<char*>(m_dbnam), (const char*)ptr_dblen + 1, m_dblen + 1);
strncpy(const_cast<char*>(m_tblnam), (const char*)ptr_tbllen + 1, m_tbllen + 1);
memcpy(m_coltype, ptr_after_colcnt, m_colcnt);
diff --git a/sql/log_event.h b/sql/log_event.h
index 2f866efca7c..3099d693c8b 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -838,7 +838,7 @@ typedef struct st_print_event_info
them if they are unchanged.
*/
char db[FN_REFLEN+1]; // TODO: make this a LEX_STRING when thd->db is
- const SQL_CATALOG *catalog;
+ char catalog_name[MAX_CATALOG_NAME];
char charset[6]; // 3 variables, each of them storable in 2 bytes
char time_zone_str[MAX_TIME_ZONE_NAME_LENGTH];
char delimiter[16];
@@ -2082,8 +2082,7 @@ class Query_log_event: public Log_event
protected:
Log_event::Byte* data_buf;
public:
- const SQL_CATALOG *catalog;
- LEX_CSTRING catalog_name;
+ const char *catalog;
const char *db;
const char *query;
/*
@@ -2107,6 +2106,8 @@ class Query_log_event: public Log_event
concerned) from here.
*/
+ uint catalog_len; // <= 255 char; 0 means uninited
+
/*
We want to be able to store a variable number of N-bit status vars:
(generally N=32; but N=64 for SQL_MODE) a user may want to log the number
@@ -3791,19 +3792,8 @@ class Annotate_rows_log_event: public Log_event
<td>Has the following bit values</tdd>
<td>TM_BIT_LEN_EXACT_F= 1 << 0</td>
<td>TM_BIT_HAS_TRIGGERS=1 << 14</td>
- <td>TM_BIT_HAS_CATALOG= 1 << 15</td>
</tr>
- <tr>
- <td>catalog_name</td>
- <td>one byte string length, followed by null-terminated string</td>
- <td>The name of the catalog in which the table resides. The name
- is represented as a one byte unsigned integer representing the
- number of bytes in the name, followed by length bytes containing
- the database name, followed by a terminating 0 byte. (Note the
- redundancy in the representation of the length.)
- This entry only exists if bit TM_BIT_HAS_CATALOG is set.</td>
- </tr>
</table>
The Body has the following components:
@@ -4362,8 +4352,7 @@ class Table_map_log_event : public Log_event
TM_NO_FLAGS = 0U,
TM_BIT_LEN_EXACT_F = (1U << 0),
// MariaDB flags (we starts from the other end)
- TM_BIT_HAS_TRIGGERS_F= (1U << 14),
- TM_BIT_HAS_CATALOG_F= (1U << 15),
+ TM_BIT_HAS_TRIGGERS_F= (1U << 14)
};
flag_set get_flags(flag_set flag) const { return m_flags & flag; }
@@ -4468,8 +4457,6 @@ class Table_map_log_event : public Log_event
class Default_charset_iterator;
class Column_charset_iterator;
#endif
- const SQL_CATALOG *m_catalog;
- LEX_CSTRING m_catnam; // Used by mysqlbinlog
char const *m_dbnam;
size_t m_dblen;
char const *m_tblnam;
diff --git a/sql/log_event_client.cc b/sql/log_event_client.cc
index 9315fdcf32b..b73dcd5c02c 100644
--- a/sql/log_event_client.cc
+++ b/sql/log_event_client.cc
@@ -1849,14 +1849,6 @@ bool Query_log_event::print_query_header(IO_CACHE* file,
goto err;
}
}
- if (print_event_info->catalog != catalog)
- {
- print_event_info->catalog= catalog;
- if (my_b_printf(file, "/*!110600 SET CATALOG `%.*s` */%s\n",
- catalog_name.length, catalog_name.str,
- print_event_info->delimiter))
- goto err;
- }
end=int10_to_str((long) when, strmov(buff,"SET TIMESTAMP="),10);
if (when_sec_part && when_sec_part <= TIME_MAX_SECOND_PART)
@@ -3017,11 +3009,9 @@ int Table_map_log_event::rewrite_db(const char *new_db, size_t new_len,
// m_dbnam resides in m_memory together with m_tblnam and m_coltype
uchar* memory= m_memory;
char const* tblnam= m_tblnam;
- char const* catnam= m_catnam.str;
uchar* coltype= m_coltype;
m_memory= (uchar*) my_multi_malloc(PSI_NOT_INSTRUMENTED, MYF(MY_WME),
- &m_catnam.str, (uint) m_catnam.length,
&m_dbnam, (uint) m_dblen + 1,
&m_tblnam, (uint) m_tbllen + 1,
&m_coltype, (uint) m_colcnt,
@@ -3030,13 +3020,11 @@ int Table_map_log_event::rewrite_db(const char *new_db, size_t new_len,
if (!m_memory)
{
sql_print_error("Table_map_log_event::rewrite_db: "
- "failed to allocate new m_memory (%d, %d + %d + %d bytes required)",
- (int) m_catnam.length + 1,+ m_dblen + 1, m_tbllen + 1,
- m_colcnt);
+ "failed to allocate new m_memory (%d + %d + %d bytes required)",
+ m_dblen + 1, m_tbllen + 1, m_colcnt);
DBUG_RETURN(-1);
}
- memcpy((void*)m_catnam.str, catnam, m_catnam.length+1);
memcpy((void*)m_dbnam, new_db, m_dblen + 1);
memcpy((void*)m_tblnam, tblnam, m_tbllen + 1);
memcpy(m_coltype, coltype, m_colcnt);
@@ -3051,26 +3039,13 @@ bool Table_map_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
if (!print_event_info->short_form)
{
char llbuff[22];
- char options[80], *ptr;
-
- ptr= options;
- if (m_flags & (TM_BIT_HAS_TRIGGERS_F | TM_BIT_HAS_CATALOG_F))
- {
- ptr= strmov(ptr, " (has ");
- if (m_flags & TM_BIT_HAS_TRIGGERS_F)
- ptr= strmov(ptr, "triggers,");
- if (m_flags & TM_BIT_HAS_CATALOG_F)
- ptr= strmov(ptr, "catalog,");
- ptr[-1]= ')';
- ptr++;
- }
- *ptr= 0;
print_header(&print_event_info->head_cache, print_event_info, TRUE);
if (my_b_printf(&print_event_info->head_cache,
"\tTable_map: %`s.%`s mapped to number %s%s\n",
m_dbnam, m_tblnam, ullstr(m_table_id, llbuff),
- options))
+ ((m_flags & TM_BIT_HAS_TRIGGERS_F) ?
+ " (has triggers)" : "")))
goto err;
}
if (!print_event_info->short_form || print_event_info->print_row_count)
@@ -3653,8 +3628,8 @@ st_print_event_info::st_print_event_info()
bzero(db, sizeof(db));
bzero(charset, sizeof(charset));
bzero(time_zone_str, sizeof(time_zone_str));
- // Set catalog to impossible value to ensure that catalog is updated later!
- catalog= (SQL_CATALOG *) 1;
+ catalog_name[0]= '\0';
+ catalog_length= 0;
delimiter[0]= ';';
delimiter[1]= 0;
flags2_inited= 0;
@@ -3725,6 +3700,7 @@ Gtid_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
Write_on_release_cache::FLUSH_F, this);
char buf[21];
char buf2[21];
+ bool different;
if (!print_event_info->short_form && !is_flashback)
{
@@ -3797,6 +3773,19 @@ Gtid_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
buf, print_event_info->delimiter))
goto err;
}
+
+ different= print_event_info->catalog_length != cat_name->length ||
+ memcmp(print_event_info->catalog_name, cat_name->str, cat_name->length);
+ if (different)
+ {
+ print_event_info->catalog_length= cat_name->length;
+ memcpy(print_event_info->catalog_name, cat_name->str, cat_name->length);
+ if (my_b_printf(&cache, "/*!110600 SET CATALOG `%.*s` */%s\n",
+ cat_name->length, cat_name->str,
+ print_event_info->delimiter))
+ goto err;
+ }
+
if ((flags2 & FL_PREPARED_XA) && !is_flashback)
{
my_b_write_string(&cache, "XA START ");
diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc
index 22ad0a8b4a6..d3a5f616476 100644
--- a/sql/log_event_server.cc
+++ b/sql/log_event_server.cc
@@ -995,21 +995,13 @@ void Query_log_event::pack_info(Protocol *protocol)
{
char buf_mem[1024];
String buf(buf_mem, sizeof(buf_mem), system_charset_info);
- buf.real_alloc(30 + ((catalog ? catalog->name.length : 0 )+ db_len)*2 + q_len);
- if (!(flags & LOG_EVENT_SUPPRESS_USE_F))
+ buf.real_alloc(9 + db_len + q_len);
+ if (!(flags & LOG_EVENT_SUPPRESS_USE_F)
+ && db && db_len)
{
- if (catalog)
- {
- buf.append(STRING_WITH_LEN("SET CATALOG "));
- append_identifier(protocol->thd, &buf, catalog->name.str, catalog->name.length);
- buf.append(STRING_WITH_LEN("; "));
- }
- if (db && db_len)
- {
- buf.append(STRING_WITH_LEN("use "));
- append_identifier(protocol->thd, &buf, db, db_len);
- buf.append(STRING_WITH_LEN("; "));
- }
+ buf.append(STRING_WITH_LEN("use "));
+ append_identifier(protocol->thd, &buf, db, db_len);
+ buf.append(STRING_WITH_LEN("; "));
}
DBUG_ASSERT(!flags2 || flags2_inited);
@@ -1139,7 +1131,7 @@ bool Query_log_event::write()
int8store(start, (ulonglong)sql_mode);
start+= 8;
}
- store_str_with_code_and_len(&start, catalog->name.str, catalog->name.length,
+ store_str_with_code_and_len(&start, catalog, catalog_len,
(uint) Q_CATALOG_NZ_CODE);
/*
In 5.0.x where x<4 masters we used to store the end zero here. This was
@@ -1383,7 +1375,7 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
? LOG_EVENT_THREAD_SPECIFIC_F : 0) |
(suppress_use ? LOG_EVENT_SUPPRESS_USE_F : 0),
using_trans),
- data_buf(0), catalog(thd_arg->catalog), query(query_arg),
+ data_buf(0), catalog(thd_arg->catalog->name.str), query(query_arg),
q_len((uint32) query_length),
thread_id(thd_arg->thread_id),
/* save the original thread id; we already know the server id */
@@ -1422,6 +1414,11 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
end_time= my_time(0);
exec_time = (ulong) (end_time - thd_arg->start_time);
+ /**
+ @todo this means that if we have no catalog, then it is replicated
+ as an existing catalog of length zero. is that safe? /sven
+ */
+ catalog_len = (catalog) ? (uint32) strlen(catalog) : 0;
if (!(db= thd->db.str))
db= "";
@@ -1855,9 +1852,13 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi,
bool skip_error_check= false;
DBUG_ENTER("Query_log_event::do_apply_event");
- DBUG_ASSERT(catalog);
- if (thd->catalog != catalog)
- thd->change_catalog(const_cast<SQL_CATALOG*>(catalog));
+ /*
+ Colleagues: please never free(thd->catalog) in MySQL. This would
+ lead to bugs as here thd->catalog is a part of an alloced block,
+ not an entire alloced block (see
+ Query_log_event::do_apply_event()). Same for thd->db. Thank
+ you.
+ */
rgi->start_alter_ev= this;
size_t valid_len= Well_formed_prefix(system_charset_info,
@@ -2348,11 +2349,6 @@ Query_log_event::do_shall_skip(rpl_group_info *rgi)
DBUG_ASSERT(query && q_len > 0);
DBUG_ASSERT(thd == rgi->thd);
- /* Set thd to point to the current catalog */
- DBUG_ASSERT(catalog);
- if (thd->catalog != catalog)
- thd->change_catalog(const_cast<SQL_CATALOG*>(catalog));
-
/*
An event skipped due to @@skip_replication must not be counted towards the
number of events to be skipped due to @@sql_slave_skip_counter.
@@ -3157,9 +3153,22 @@ Gtid_log_event::make_compatible_event(String *packet, bool *need_dummy_event,
void
Gtid_log_event::pack_info(Protocol *protocol)
{
- char buf[6+5+10+1+10+1+20+1+4+20+1+ ser_buf_size+5 /* sprintf */];
- char *p;
- p = strmov(buf, (flags2 & FL_STANDALONE ? "GTID " :
+ static constexpr int catalog_needed= 12 + 2*MAX_CATALOG_NAME+2 + 2;
+ char buf[catalog_needed+6+5+10+1+10+1+20+1+4+20+1+
+ ser_buf_size+5 /* sprintf */];
+ char *p= buf;
+ if (flags_extra & FL_CATALOG)
+ {
+ char buf_mem[2*MAX_CATALOG_NAME+2];
+ String buf2(buf_mem, sizeof(buf_mem), system_charset_info);
+ append_identifier(protocol->thd, &buf2, cat_name->str, cat_name->length);
+ size_t buf2_len= std::min(buf2.length(), (uint32)(2*MAX_CATALOG_NAME+2));
+ DBUG_ASSERT(buf2_len == buf2.length());
+ p= strmov(p, "SET CATALOG ");
+ memcpy(p, buf2.ptr(), buf2_len);
+ p= strmov(p + buf2_len, "; ");
+ }
+ p = strmov(p, (flags2 & FL_STANDALONE ? "GTID " :
flags2 & FL_PREPARED_XA ? "XA START " : "BEGIN GTID "));
if (flags2 & FL_PREPARED_XA)
{
@@ -3215,6 +3224,29 @@ Gtid_log_event::do_apply_event(rpl_group_info *rgi)
return 1;
}
+ if (using_catalogs)
+ {
+ if (flags_extra & FL_CATALOG)
+ {
+ if (!thd->catalog || cmp(cat_name, &thd->catalog->name))
+ {
+ SQL_CATALOG *catalog= get_catalog(cat_name, 1);
+ if (!catalog)
+ {
+ my_error(ER_NO_SUCH_CATALOG, MYF(ME_ERROR_LOG),
+ cat_name->length, cat_name->str);
+ return 1;
+ }
+ thd->change_catalog(catalog);
+ }
+ }
+ else
+ {
+ if (thd->catalog != default_catalog())
+ thd->change_catalog(default_catalog());
+ }
+ }
+
DBUG_ASSERT((bits & OPTION_GTID_BEGIN) == 0);
Master_info *mi=rgi->rli->mi;
@@ -5689,8 +5721,6 @@ Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid,
bool is_transactional)
: Log_event(thd, 0, is_transactional),
m_table(tbl),
- m_catalog(thd->catalog),
- m_catnam(empty_clex_str),
m_dbnam(tbl->s->db.str),
m_dblen(m_dbnam ? tbl->s->db.length : 0),
m_tblnam(tbl->s->table_name.str),
@@ -5736,11 +5766,6 @@ Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid,
if (tbl->triggers)
m_flags|= TM_BIT_HAS_TRIGGERS_F;
- if (m_catalog != default_catalog())
- {
- m_flags|= TM_BIT_HAS_CATALOG_F;
- m_data_size+= m_catalog->name.length+2;
- }
/* If malloc fails, caught in is_valid() */
if ((m_memory= (uchar*) my_malloc(PSI_INSTRUMENT_ME, m_colcnt, MYF(MY_WME))))
@@ -5910,8 +5935,6 @@ int Table_map_log_event::do_apply_event(rpl_group_info *rgi)
/* Step the query id to mark what columns that are actually used. */
thd->set_query_id(next_query_id());
- if (thd->catalog != m_catalog)
- thd->change_catalog(const_cast<SQL_CATALOG*>(m_catalog));
if (!(memory= my_multi_malloc(PSI_INSTRUMENT_ME, MYF(MY_WME),
&table_list, (uint) sizeof(RPL_TABLE_LIST),
@@ -5938,7 +5961,7 @@ int Table_map_log_event::do_apply_event(rpl_group_info *rgi)
LEX_CSTRING tmp_db_name= {db_mem, db_mem_length };
LEX_CSTRING tmp_tbl_name= {tname_mem, tname_mem_length };
- table_list->init_one_table(m_catalog, &tmp_db_name, &tmp_tbl_name, 0,
+ table_list->init_one_table(thd->catalog, &tmp_db_name, &tmp_tbl_name, 0,
TL_WRITE);
table_list->table_id= DBUG_IF("inject_tblmap_same_id_maps_diff_table") ? 0 : m_table_id;
table_list->updating= 1;
@@ -6079,13 +6102,6 @@ bool Table_map_log_event::write_data_body()
uchar mbuf[MAX_INT_WIDTH];
uchar *const mbuf_end= net_store_length(mbuf, m_field_metadata_size);
- if (m_flags & TM_BIT_HAS_CATALOG_F)
- {
- uchar const len[]= { (uchar) m_catalog->name.length };
- if (write_data(len, 1) ||
- write_data(m_catalog->name.str, m_catalog->name.length+1))
- return 1;
- }
return write_data(dbuf, sizeof(dbuf)) ||
write_data(m_dbnam, m_dblen+1) ||
write_data(tbuf, sizeof(tbuf)) ||
--
2.39.2
1
0
[PATCH] Catalogs: replicate single catalog from catalog-enabled master
by Kristian Nielsen 16 Oct '24
by Kristian Nielsen 16 Oct '24
16 Oct '24
This implements the basics to do replication from a single catalog on a
catalog-enabled master ("the cloud") to a (not necessarily catalog-enabled)
slave ("on-premise"). Such slave must only see its own events, eg. events
from the catalog of the connecting user.
Changes done:
- Extend the GTID event with the catalog (a follow-up patch will change
replication slave-side to use this when applying events).
- In the dump thread, when a catalog user does a binlog dump (eg.
connecting slave), filter events so only events from that catalog are
sent.
- Extend the server version of the connector library to be able to connect
to a specific catalog.
- Add a Master_catalog option to CHANGE MASTER to make a slave connect to a
user in a specific catalog.
Open issues:
- Possibly there needs to be a way for the catalog superuser to control
which catalogs allow the COM_BINLOG_DUMP command.
- In GTID replication, how in the single-catalog slave to maintain the GTID
position in domains that it does not receive events from. One option is
to update the GTID position (eg. with the GTID_LIST events) in the other
domains; this way the slave will have the correct global GTID position,
but it will not be possible to use multi-source to replicate two catalogs
individually to the same server. Another option is to binlog each catalog
in their own domain(s) and do master-side filtering on domain_id; then
the master will ignore irrelevant domains for a connecting catalog slave
(and not report "position too old" in those domains), and the slave will
only have its own domains in its GTID position.
Signed-off-by: Kristian Nielsen <knielsen(a)knielsen-hq.org>
---
include/mysql.h | 7 +-
include/sql_common.h | 3 +-
mysql-test/suite/rpl/r/rpl_catalogs.result | 137 +++++++++++++++++++++
mysql-test/suite/rpl/t/rpl_catalogs.test | 119 ++++++++++++++++++
sql-common/client.c | 36 +++++-
sql/lex.h | 1 +
sql/lex_string.h | 2 +-
sql/log_event.cc | 20 +++
sql/log_event.h | 13 +-
sql/log_event_server.cc | 88 ++++++++++++-
sql/privilege.h | 7 +-
sql/rpl_mi.cc | 15 ++-
sql/rpl_mi.h | 1 +
sql/slave.cc | 3 +
sql/sql_lex.h | 4 +-
sql/sql_repl.cc | 37 ++++++
sql/sql_yacc.yy | 6 +
17 files changed, 483 insertions(+), 16 deletions(-)
create mode 100644 mysql-test/suite/rpl/r/rpl_catalogs.result
create mode 100644 mysql-test/suite/rpl/t/rpl_catalogs.test
diff --git a/include/mysql.h b/include/mysql.h
index a66dcc7bd02..44ca2099694 100644
--- a/include/mysql.h
+++ b/include/mysql.h
@@ -189,14 +189,15 @@ enum mysql_option
/* MariaDB options */
MYSQL_PROGRESS_CALLBACK=5999,
MYSQL_OPT_NONBLOCK,
- MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY
+ MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY,
+ MARIADB_OPT_CATALOG=7029
};
/**
@todo remove the "extension", move st_mysql_options completely
out of mysql.h
*/
-struct st_mysql_options_extention;
+struct st_mysql_options_extension;
struct st_mysql_options {
unsigned int connect_timeout, read_timeout, write_timeout;
@@ -231,7 +232,7 @@ struct st_mysql_options {
void (*local_infile_end)(void *);
int (*local_infile_error)(void *, char *, unsigned int);
void *local_infile_userdata;
- struct st_mysql_options_extention *extension;
+ struct st_mysql_options_extension *extension;
};
enum mysql_status
diff --git a/include/sql_common.h b/include/sql_common.h
index ad5ab7e19af..c261ee9fe94 100644
--- a/include/sql_common.h
+++ b/include/sql_common.h
@@ -28,7 +28,7 @@ extern const char *cant_connect_sqlstate;
extern const char *not_error_sqlstate;
-struct st_mysql_options_extention {
+struct st_mysql_options_extension {
char *plugin_dir;
char *default_auth;
char *ssl_crl; /* PEM CRL file */
@@ -41,6 +41,7 @@ struct st_mysql_options_extention {
uint proc_info_length);
HASH connection_attributes;
size_t connection_attributes_length;
+ char *catalog;
};
typedef struct st_mysql_methods
diff --git a/mysql-test/suite/rpl/r/rpl_catalogs.result b/mysql-test/suite/rpl/r/rpl_catalogs.result
new file mode 100644
index 00000000000..4b25fd20e6c
--- /dev/null
+++ b/mysql-test/suite/rpl/r/rpl_catalogs.result
@@ -0,0 +1,137 @@
+include/master-slave.inc
+[connection master]
+*** Create some catalogs to work with.
+connection master_config;
+SET SESSION sql_log_bin= 0;
+CREATE CATALOG foo;
+CREATE CATALOG bar;
+CREATE CATALOG baz;
+SET SESSION sql_log_bin= 1;
+connection slave_config;
+SET SESSION sql_log_bin= 0;
+CREATE CATALOG foo;
+CREATE CATALOG bar;
+CREATE CATALOG baz;
+SET SESSION sql_log_bin= 1;
+*** Replicate a few events full-catalogs -> full-catalogs
+connection master_config;
+USE CATALOG foo;
+CREATE DATABASE db1;
+use db1;
+CREATE TABLE t1 (a INT PRIMARY KEY);
+INSERT INTO t1 VALUES (1);
+USE CATALOG baz;
+CREATE DATABASE db3;
+use db3;
+CREATE TABLE t3 (a INT PRIMARY KEY);
+INSERT INTO t3 VALUES (1);
+sync_slave_with_master slave;
+connection slave_config;
+USE CATALOG foo;
+use db1;
+SELECT * FROM t1 ORDER BY a;
+a
+1
+USE CATALOG baz;
+SELECT * FROM db3.t3 ORDER BY a;
+a
+1
+*** Create a normal slave user to replicate single catalog
+connection slave_config;
+include/stop_slave.inc
+CHANGE MASTER TO master_user='rpl_bar', master_password='bar_pw',
+master_catalog="bar";
+connection master_config;
+USE CATALOG bar;
+CREATE USER rpl_bar@localhost;
+GRANT replication slave, select ON *.* TO rpl_bar@localhost IDENTIFIED BY "bar_pw";
+CREATE DATABASE db2;
+use db2;
+CREATE TABLE t2 (a INT PRIMARY KEY, b INT);
+INSERT INTO t2(a) VALUES (1), (2), (3), (4), (5);
+INSERT INTO t2 VALUES (6, 0), (7, 1), (8, 2);
+USE CATALOG foo;
+INSERT INTO db1.t1 VALUES (10), (20), (30);
+USE CATALOG baz;
+INSERT INTO db3.t3 VALUES (10), (20), (30);
+USE CATALOG bar;
+use db2;
+UPDATE t2 SET b=a*a WHERE b IS NULL;
+USE CATALOG bar;
+SELECT * FROM db2.t2 ORDER BY a;
+a b
+1 1
+2 4
+3 9
+4 16
+5 25
+6 0
+7 1
+8 2
+USE CATALOG foo;
+SELECT * FROM db1.t1 ORDER BY a;
+a
+1
+10
+20
+30
+USE CATALOG baz;
+SELECT * FROM db3.t3 ORDER BY a;
+a
+1
+10
+20
+30
+USE CATALOG def;
+use test;
+connect con1,localhost,rpl_bar,bar_pw,bar.db2;
+SELECT * FROM t2 ORDER BY a;
+a b
+1 1
+2 4
+3 9
+4 16
+5 25
+6 0
+7 1
+8 2
+disconnect con1;
+connection master;
+save_master_pos;
+connection slave_config;
+include/start_slave.inc
+connection slave;
+sync_with_master;
+connection slave_config;
+USE CATALOG bar;
+SELECT * FROM db2.t2 ORDER BY a;
+a b
+1 1
+2 4
+3 9
+4 16
+5 25
+6 0
+7 1
+8 2
+USE CATALOG foo;
+SELECT * FROM db1.t1 ORDER BY a;
+a
+1
+USE CATALOG baz;
+SELECT * FROM db3.t3 ORDER BY a;
+a
+1
+*** Clean up.
+connection slave_config;
+include/stop_slave.inc
+CHANGE MASTER TO master_user='root', master_password='', master_catalog='';
+include/start_slave.inc
+connection master_config;
+USE CATALOG def;
+use test;
+DROP CATALOG foo;
+DROP CATALOG bar;
+DROP CATALOG baz;
+connection master;
+include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/t/rpl_catalogs.test b/mysql-test/suite/rpl/t/rpl_catalogs.test
new file mode 100644
index 00000000000..939e68b920d
--- /dev/null
+++ b/mysql-test/suite/rpl/t/rpl_catalogs.test
@@ -0,0 +1,119 @@
+--source include/have_catalogs.inc
+--source include/master-slave.inc
+
+--echo *** Create some catalogs to work with.
+--connection master_config
+# ToDo: It seems CREATE CATALOG cannot replicate currently.
+# So don't binlog it for now.
+SET SESSION sql_log_bin= 0;
+CREATE CATALOG foo;
+CREATE CATALOG bar;
+CREATE CATALOG baz;
+SET SESSION sql_log_bin= 1;
+
+--connection slave_config
+SET SESSION sql_log_bin= 0;
+CREATE CATALOG foo;
+CREATE CATALOG bar;
+CREATE CATALOG baz;
+SET SESSION sql_log_bin= 1;
+
+
+--echo *** Replicate a few events full-catalogs -> full-catalogs
+
+--connection master_config
+USE CATALOG foo;
+CREATE DATABASE db1;
+use db1;
+CREATE TABLE t1 (a INT PRIMARY KEY);
+INSERT INTO t1 VALUES (1);
+
+USE CATALOG baz;
+CREATE DATABASE db3;
+use db3;
+CREATE TABLE t3 (a INT PRIMARY KEY);
+INSERT INTO t3 VALUES (1);
+
+--sync_slave_with_master slave
+
+--connection slave_config
+USE CATALOG foo;
+use db1;
+SELECT * FROM t1 ORDER BY a;
+USE CATALOG baz;
+SELECT * FROM db3.t3 ORDER BY a;
+
+
+--echo *** Create a normal slave user to replicate single catalog
+--connection slave_config
+--source include/stop_slave.inc
+CHANGE MASTER TO master_user='rpl_bar', master_password='bar_pw',
+ master_catalog="bar";
+
+--connection master_config
+USE CATALOG bar;
+CREATE USER rpl_bar@localhost;
+GRANT replication slave, select ON *.* TO rpl_bar@localhost IDENTIFIED BY "bar_pw";
+
+CREATE DATABASE db2;
+use db2;
+CREATE TABLE t2 (a INT PRIMARY KEY, b INT);
+INSERT INTO t2(a) VALUES (1), (2), (3), (4), (5);
+INSERT INTO t2 VALUES (6, 0), (7, 1), (8, 2);
+
+# Do something in other catalogs, see that it is not replicated.
+USE CATALOG foo;
+INSERT INTO db1.t1 VALUES (10), (20), (30);
+USE CATALOG baz;
+INSERT INTO db3.t3 VALUES (10), (20), (30);
+
+USE CATALOG bar;
+use db2;
+UPDATE t2 SET b=a*a WHERE b IS NULL;
+
+USE CATALOG bar;
+SELECT * FROM db2.t2 ORDER BY a;
+USE CATALOG foo;
+SELECT * FROM db1.t1 ORDER BY a;
+USE CATALOG baz;
+SELECT * FROM db3.t3 ORDER BY a;
+
+USE CATALOG def;
+use test;
+
+connect(con1,localhost,rpl_bar,bar_pw,bar.db2);
+SELECT * FROM t2 ORDER BY a;
+--disconnect con1
+
+--connection master
+--save_master_pos
+
+--connection slave_config
+--source include/start_slave.inc
+--connection slave
+--sync_with_master
+
+--connection slave_config
+USE CATALOG bar;
+SELECT * FROM db2.t2 ORDER BY a;
+USE CATALOG foo;
+SELECT * FROM db1.t1 ORDER BY a;
+USE CATALOG baz;
+SELECT * FROM db3.t3 ORDER BY a;
+
+--echo *** Clean up.
+
+--connection slave_config
+--source include/stop_slave.inc
+CHANGE MASTER TO master_user='root', master_password='', master_catalog='';
+--source include/start_slave.inc
+
+--connection master_config
+USE CATALOG def;
+use test;
+DROP CATALOG foo;
+DROP CATALOG bar;
+DROP CATALOG baz;
+
+--connection master
+--source include/rpl_end.inc
diff --git a/sql-common/client.c b/sql-common/client.c
index ba804ce2a7c..711f8733c9f 100644
--- a/sql-common/client.c
+++ b/sql-common/client.c
@@ -99,6 +99,7 @@ extern my_bool using_catalogs;
#define CONNECT_TIMEOUT 0
+#define MAX_CATALOG_NAME 65 /* Including terminating '\0' */
#include "client_settings.h"
#include <ssl_compat.h>
@@ -836,9 +837,9 @@ static int add_init_command(struct st_mysql_options *options, const char *cmd)
#define ALLOCATE_EXTENSIONS(OPTS) \
- (OPTS)->extension= (struct st_mysql_options_extention *) \
+ (OPTS)->extension= (struct st_mysql_options_extension *) \
my_malloc(key_memory_mysql_options, \
- sizeof(struct st_mysql_options_extention), \
+ sizeof(struct st_mysql_options_extension), \
MYF(MY_WME | MY_ZEROFILL)) \
@@ -2089,7 +2090,8 @@ static int send_client_reply_packet(MCPVIO_EXT *mpvio,
see end= buff+32 below, fixed size of the packet is 32 bytes.
+9 because data is a length encoded binary where meta data size is max 9.
*/
- buff_size= 33 + USERNAME_LENGTH + data_len + 9 + NAME_LEN + NAME_LEN + connect_attrs_len + 9;
+ buff_size= 33 + USERNAME_LENGTH + data_len + 9 + NAME_LEN + NAME_LEN +
+ connect_attrs_len + 9 + MAX_CATALOG_NAME;
buff= my_alloca(buff_size);
mysql->client_flag|= mysql->options.client_flag;
@@ -2122,10 +2124,19 @@ static int send_client_reply_packet(MCPVIO_EXT *mpvio,
if (mysql->client_flag & CLIENT_PROTOCOL_41)
{
/* 4.1 server and 4.1 client has a 32 byte option flag */
+ if (!(mysql->server_capabilities & CLIENT_MYSQL))
+ mysql->client_flag&= ~CLIENT_MYSQL;
int4store(buff,mysql->client_flag);
int4store(buff+4, net->max_packet_size);
buff[8]= (char) mysql->charset->number;
bzero(buff+9, 32-9);
+ if (!(mysql->server_capabilities & CLIENT_MYSQL))
+ {
+ /* ToDo: Should this check if the server has the catalog capability? */
+ uint client_extended_cap= mysql->options.extension->catalog ?
+ (uint)((MARIADB_CLIENT_CONNECT_CATALOG) >> 32) : (uint)0;
+ int4store(buff + 28, client_extended_cap);
+ }
end= buff+32;
}
else
@@ -2274,6 +2285,17 @@ static int send_client_reply_packet(MCPVIO_EXT *mpvio,
end= (char *) send_client_connect_attrs(mysql, (uchar *) end);
+ /* Add catalog */
+ if (mysql->options.extension->catalog)
+ {
+ size_t len= strlen(mysql->options.extension->catalog);
+ if (len >= MAX_CATALOG_NAME)
+ len= MAX_CATALOG_NAME - 1;
+ end= (char*)write_length_encoded_string4(
+ (uchar*)end, buff_size - (end - buff),
+ (uchar *)mysql->options.extension->catalog, len);
+ }
+
/* Write authentication package */
if (my_net_write(net, (uchar*) buff, (size_t) (end-buff)) || net_flush(net))
{
@@ -3332,6 +3354,7 @@ static void mysql_close_free_options(MYSQL *mysql)
my_free(mysql->options.extension->plugin_dir);
my_free(mysql->options.extension->default_auth);
my_hash_free(&mysql->options.extension->connection_attributes);
+ my_free(mysql->options.extension->catalog);
my_free(mysql->options.extension);
}
bzero((char*) &mysql->options,sizeof(mysql->options));
@@ -3850,9 +3873,9 @@ mysql_options(MYSQL *mysql,enum mysql_option option, const void *arg)
break;
case MYSQL_PROGRESS_CALLBACK:
if (!mysql->options.extension)
- mysql->options.extension= (struct st_mysql_options_extention *)
+ mysql->options.extension= (struct st_mysql_options_extension *)
my_malloc(key_memory_mysql_options,
- sizeof(struct st_mysql_options_extention),
+ sizeof(struct st_mysql_options_extension),
MYF(MY_WME | MY_ZEROFILL));
if (mysql->options.extension)
mysql->options.extension->report_progress=
@@ -3918,6 +3941,9 @@ mysql_options(MYSQL *mysql,enum mysql_option option, const void *arg)
}
}
break;
+ case MARIADB_OPT_CATALOG:
+ EXTENSION_SET_STRING(&mysql->options, catalog, arg);
+ break;
case MYSQL_SHARED_MEMORY_BASE_NAME:
default:
DBUG_RETURN(1);
diff --git a/sql/lex.h b/sql/lex.h
index bf0223ce544..9771c0e39ba 100644
--- a/sql/lex.h
+++ b/sql/lex.h
@@ -367,6 +367,7 @@ SYMBOL symbols[] = {
{ "LOOP", SYM(LOOP_SYM)},
{ "LOW_PRIORITY", SYM(LOW_PRIORITY)},
{ "MASTER", SYM(MASTER_SYM)},
+ { "MASTER_CATALOG", SYM(MASTER_CATALOG_SYM)},
{ "MASTER_CONNECT_RETRY", SYM(MASTER_CONNECT_RETRY_SYM)},
{ "MASTER_DELAY", SYM(MASTER_DELAY_SYM)},
{ "MASTER_GTID_POS", SYM(MASTER_GTID_POS_SYM)},
diff --git a/sql/lex_string.h b/sql/lex_string.h
index e7a732346c4..f49e567c64c 100644
--- a/sql/lex_string.h
+++ b/sql/lex_string.h
@@ -71,7 +71,7 @@ static inline bool lex_string_cmp(CHARSET_INFO *charset, const LEX_CSTRING *a,
}
/*
- Compare to LEX_CSTRING's and return 0 if equal
+ Compare two LEX_CSTRING's and return 0 if equal
*/
static inline bool cmp(const LEX_CSTRING *a, const LEX_CSTRING *b)
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 992054c7d17..004010e9d99 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -2448,6 +2448,26 @@ Gtid_log_event::Gtid_log_event(const uchar *buf, uint event_len,
sa_seq_no= uint8korr(buf);
buf+= 8;
}
+ if (flags_extra & FL_CATALOG)
+ {
+ if (unlikely(buf - buf_0) >= event_len)
+ {
+ seq_no= 0;
+ return;
+ }
+ uint32_t cat_len= *buf++;
+ if (unlikely(cat_len > MAX_CATALOG_NAME) ||
+ unlikely(buf - buf_0 + cat_len) >= event_len)
+ {
+ seq_no= 0;
+ return;
+ }
+ memcpy(cat_name_buf, buf, cat_len);
+ cat_name_int.str= cat_name_buf;
+ cat_name_int.length= cat_len;
+ cat_name= &cat_name_int;
+ buf+= cat_len;
+ }
}
/*
the strict '<' part of the assert corresponds to extra zero-padded
diff --git a/sql/log_event.h b/sql/log_event.h
index bd318f147d7..2f866efca7c 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -3250,13 +3250,16 @@ class Gtid_log_event: public Log_event
public:
uint64 seq_no;
uint64 commit_id;
- uint32 domain_id;
uint64 sa_seq_no; // start alter identifier for CA/RA
+ const LEX_CSTRING *cat_name; // Points either to catalog object or own buffer
#ifdef MYSQL_SERVER
event_xid_t xid;
#else
event_mysql_xid_t xid;
#endif
+ LEX_CSTRING cat_name_int;
+ uint32 domain_id;
+ char cat_name_buf[MAX_CATALOG_NAME];
uchar flags2;
/*
More flags area placed after the regular flags2's area. The type
@@ -3309,11 +3312,15 @@ class Gtid_log_event: public Log_event
FL_EXTRA_MULTI_ENGINE_E1 is set for event group comprising a transaction
involving multiple storage engines. No flag and extra data are added
to the event when the transaction involves only one engine.
+
+ FL_CATALOG is set when a catalog name is included in the GTID (happens
+ when not the default catalog).
*/
static const uchar FL_EXTRA_MULTI_ENGINE_E1= 1;
static const uchar FL_START_ALTER_E1= 2;
static const uchar FL_COMMIT_ALTER_E1= 4;
static const uchar FL_ROLLBACK_ALTER_E1= 8;
+ static const uchar FL_CATALOG= 16;
#ifdef MYSQL_SERVER
Gtid_log_event(THD *thd_arg, uint64 seq_no, uint32 domain_id, bool standalone,
@@ -3346,6 +3353,10 @@ class Gtid_log_event: public Log_event
enum enum_binlog_checksum_alg checksum_alg,
uint32 *domain_id, uint32 *server_id, uint64 *seq_no,
uchar *flags2, const Format_description_log_event *fdev);
+ static bool peek_catalog(const uchar *event_start, size_t event_len,
+ const Format_description_log_event *fdev,
+ enum enum_binlog_checksum_alg checksum_alg,
+ uchar *out_flags2, LEX_CSTRING *out_catname);
#endif
};
diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc
index ff3a5bda8e5..22ad0a8b4a6 100644
--- a/sql/log_event_server.cc
+++ b/sql/log_event_server.cc
@@ -2936,6 +2936,13 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg,
if (extra_engines > 0)
flags_extra|= FL_EXTRA_MULTI_ENGINE_E1;
}
+ const SQL_CATALOG *cat= thd_arg->catalog;
+ if (cat != default_catalog())
+ {
+ flags_extra|= FL_CATALOG;
+ cat_name= &cat->name;
+ }
+
if (thd->get_binlog_flags_for_alter())
{
flags_extra |= thd->get_binlog_flags_for_alter();
@@ -2982,10 +2989,80 @@ Gtid_log_event::peek(const uchar *event_start, size_t event_len,
}
+/*
+ Obtain the catalog (if any) in the GTID (without constructing the full
+ object).
+
+ This is a separate function from Gtid_log_event::peek(), since this function
+ needs to do a lot of parsing of flags etc. to know where the catalog is, and
+ this overhead is not wanted in the often-used Gtid_log_event::peek(). But if
+ more peek-functionality would be needed in the future, it could make sense to
+ add it to this function which already has the parsing overhead.
+
+ Returns true if error (malformed or short event), false if ok. Returns the
+ name of the default catalog if catalog is not included explicitly in the GTID.
+
+ Note that the returned out_catname will point into the passed-in packet
+ memory, so will only be valid as long as the packet memory is!
+*/
+bool
+Gtid_log_event::peek_catalog(const uchar *event_start, size_t event_len,
+ const Format_description_log_event *fdev,
+ enum enum_binlog_checksum_alg checksum_alg,
+ uchar *out_flags2, LEX_CSTRING *out_catname)
+{
+ if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32)
+ {
+ if (event_len > BINLOG_CHECKSUM_LEN)
+ event_len-= BINLOG_CHECKSUM_LEN;
+ else
+ event_len= 0;
+ }
+ else
+ DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
+ checksum_alg == BINLOG_CHECKSUM_ALG_OFF);
+
+ if (event_len < (uint32)fdev->common_header_len + GTID_HEADER_LEN)
+ return true;
+ const uchar *p= event_start + fdev->common_header_len;
+ const uchar *p_end= event_start + event_len;
+ uchar flags2= *out_flags2= p[12];
+ p+= 13; /* seq_no, domain_id, and flags2. */
+ if (flags2 & FL_GROUP_COMMIT_ID)
+ p+= 8;
+ if (flags2 & (FL_PREPARED_XA | FL_COMPLETED_XA))
+ {
+ if (p + 6 > p_end)
+ return true;
+ p+= 6 + p[4] + p[5];
+ }
+ uchar flags_extra;
+ if (p >= p_end || !((flags_extra= *p) & FL_CATALOG))
+ {
+ *out_catname= default_catalog_name;
+ return false;
+ }
+ ++p;
+
+ if (flags_extra & FL_EXTRA_MULTI_ENGINE_E1)
+ ++p;
+ if (flags_extra & (FL_COMMIT_ALTER_E1 | FL_ROLLBACK_ALTER_E1))
+ p+= 8;
+
+ uchar cat_len;
+ if (p >= p_end || (p + (cat_len= *p)) >= p_end)
+ return true;
+ out_catname->str= (const char *)p+1;
+ out_catname->length= cat_len;
+
+ return false;
+}
+
+
bool
Gtid_log_event::write()
{
- uchar buf[GTID_HEADER_LEN+2+sizeof(XID) + /* flags_extra: */ 1+4];
+ uchar buf[GTID_HEADER_LEN+2+sizeof(XID) + /* flags_extra: */ 1+1+8+MAX_CATALOG_NAME];
size_t write_len= 13;
int8store(buf, seq_no);
@@ -3026,6 +3103,15 @@ Gtid_log_event::write()
write_len+= 8;
}
+ if (flags_extra & FL_CATALOG)
+ {
+ uint32_t cat_len= std::min(cat_name->length, (size_t)(MAX_CATALOG_NAME-1));
+ DBUG_ASSERT(cat_name->length <= MAX_CATALOG_NAME-1);
+ buf[write_len++]= cat_len;
+ memcpy(buf + write_len, cat_name->str, cat_len);
+ write_len+= cat_len;
+ }
+
if (write_len < GTID_HEADER_LEN)
{
bzero(buf+write_len, GTID_HEADER_LEN-write_len);
diff --git a/sql/privilege.h b/sql/privilege.h
index 953ffb177c1..16d7f91f27d 100644
--- a/sql/privilege.h
+++ b/sql/privilege.h
@@ -305,7 +305,12 @@ constexpr privilege_t CATALOG_ACLS=
CATALOG_ACL |
SHUTDOWN_ACL |
CREATE_TABLESPACE_ACL |
- REPL_SLAVE_ACL |
+ /*
+ ToDo: REPL_SLAVE_ACL is needed to be able to replicate from a single
+ catalog to an on-premise slave. However, we may need a way for the catalog
+ superuser to control replication access for a catalog.
+ */
+// REPL_SLAVE_ACL |
BINLOG_ADMIN_ACL |
BINLOG_MONITOR_ACL |
// BINLOG_REPLAY_ACL |
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc
index 3c698f27a19..6a68254052a 100644
--- a/sql/rpl_mi.cc
+++ b/sql/rpl_mi.cc
@@ -47,6 +47,7 @@ Master_info::Master_info(LEX_CSTRING *connection_name_arg,
{
char *tmp;
host[0] = 0; user[0] = 0; password[0] = 0;
+ catalog_name[0]= 0;
ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0;
ssl_cipher[0]= 0; ssl_key[0]= 0;
ssl_crl[0]= 0; ssl_crlpath[0]= 0;
@@ -644,6 +645,17 @@ file '%s')", fname);
}
seen_ignore_domain_ids= true;
}
+ else if (got_eq && !strcmp(buf, "master_catalog"))
+ {
+ if (init_strvar_from_file(mi->catalog_name,
+ sizeof(mi->catalog_name),
+ &mi->file, ""))
+
+ {
+ sql_print_error("Failed to initialize master info do_domain_ids");
+ goto errwithmsg;
+ }
+ }
else if (!got_eq && !strcmp(buf, "END_MARKER"))
{
/*
@@ -817,6 +829,7 @@ int flush_master_info(Master_info* mi,
"using_gtid=%d\n"
"do_domain_ids=%s\n"
"ignore_domain_ids=%s\n"
+ "master_catalog=%s\n"
"END_MARKER\n",
LINES_IN_MASTER_INFO,
mi->master_log_name, llstr(mi->master_log_pos, lbuf),
@@ -827,7 +840,7 @@ int flush_master_info(Master_info* mi,
heartbeat_buf, "", ignore_server_ids_buf,
"", 0,
mi->ssl_crl, mi->ssl_crlpath, mi->using_gtid,
- do_domain_ids_buf, ignore_domain_ids_buf);
+ do_domain_ids_buf, ignore_domain_ids_buf, mi->catalog_name);
err= flush_io_cache(file);
if (sync_masterinfo_period && !err &&
++(mi->sync_counter) >= sync_masterinfo_period)
diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h
index 6058b7fb34c..98f26a4d050 100644
--- a/sql/rpl_mi.h
+++ b/sql/rpl_mi.h
@@ -213,6 +213,7 @@ class Master_info : public Slave_reporting_capability
/* the variables below are needed because we can change masters on the fly */
char master_log_name[FN_REFLEN+6]; /* Room for multi-*/
char host[HOSTNAME_LENGTH*SYSTEM_CHARSET_MBMAXLEN+1];
+ char catalog_name[MAX_CATALOG_NAME];
char user[USERNAME_LENGTH+1];
char password[MAX_PASSWORD_LENGTH*SYSTEM_CHARSET_MBMAXLEN+1];
LEX_CSTRING connection_name; /* User supplied connection name */
diff --git a/sql/slave.cc b/sql/slave.cc
index 28b183fbd69..02f58777732 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -7140,6 +7140,9 @@ static int connect_to_master(THD* thd, MYSQL* mysql, Master_info* mi,
if (opt_plugin_dir_ptr && *opt_plugin_dir_ptr)
mysql_options(mysql, MYSQL_PLUGIN_DIR, opt_plugin_dir_ptr);
+ if (mi->catalog_name[0])
+ mysql_options(mysql, MARIADB_OPT_CATALOG, mi->catalog_name);
+
/* we disallow empty users */
if (mi->user[0] == 0)
{
diff --git a/sql/sql_lex.h b/sql/sql_lex.h
index adcf45aea93..2d540c948a2 100644
--- a/sql/sql_lex.h
+++ b/sql/sql_lex.h
@@ -487,7 +487,7 @@ struct LEX_MASTER_INFO
DYNAMIC_ARRAY repl_ignore_server_ids;
DYNAMIC_ARRAY repl_do_domain_ids;
DYNAMIC_ARRAY repl_ignore_domain_ids;
- const char *host, *user, *password, *log_file_name;
+ const char *host, *catalog, *user, *password, *log_file_name;
const char *ssl_key, *ssl_cert, *ssl_ca, *ssl_capath, *ssl_cipher;
const char *ssl_crl, *ssl_crlpath;
const char *relay_log_name;
@@ -533,7 +533,7 @@ struct LEX_MASTER_INFO
delete_dynamic(&repl_ignore_domain_ids);
}
- host= user= password= log_file_name= ssl_key= ssl_cert= ssl_ca=
+ host= catalog= user= password= log_file_name= ssl_key= ssl_cert= ssl_ca=
ssl_capath= ssl_cipher= ssl_crl= ssl_crlpath= relay_log_name= NULL;
pos= relay_log_pos= server_id= port= connect_retry= 0;
heartbeat_period= 0;
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index dc27ab9ff8b..c3c6eadc0b3 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -129,6 +129,7 @@ struct binlog_send_info {
slave_connection_state *until_gtid_state;
slave_connection_state until_gtid_state_obj;
Format_description_log_event *fdev;
+ const SQL_CATALOG *catalog_filter;
int mariadb_slave_capability;
enum_gtid_skip_type gtid_skip_group;
enum_gtid_until_state gtid_until_group;
@@ -167,6 +168,7 @@ struct binlog_send_info {
char *lfn)
: thd(thd_arg), net(&thd_arg->net), packet(packet_arg),
log_file_name(lfn), until_gtid_state(NULL), fdev(NULL),
+ catalog_filter(NULL),
gtid_skip_group(GTID_SKIP_NOT), gtid_until_group(GTID_UNTIL_NOT_DONE),
flags(flags_arg), current_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF),
slave_gtid_strict_mode(false), send_fake_gtid_list(false),
@@ -459,6 +461,17 @@ inline void fix_checksum(enum_binlog_checksum_alg checksum_alg, String *packet,
}
+static const SQL_CATALOG *get_catalog_filter(THD *thd)
+{
+ if (!using_catalogs)
+ return nullptr;
+ if ((thd->security_ctx->master_access & CATALOG_ACL) &&
+ thd->catalog == default_catalog())
+ return nullptr;
+ return thd->catalog;
+}
+
+
static user_var_entry * get_binlog_checksum_uservar(THD * thd)
{
LEX_CSTRING name= { STRING_WITH_LEN("master_binlog_checksum")};
@@ -1751,6 +1764,26 @@ send_event_to_slave(binlog_send_info *info, Log_event_type event_type,
}
}
+ if (info->catalog_filter && event_type == GTID_EVENT)
+ {
+ uchar flags2;
+ LEX_CSTRING cat_name;
+ if (ev_offset > len || Gtid_log_event::peek_catalog(
+ (uchar*) packet->ptr()+ev_offset, len - ev_offset,
+ info->fdev, current_checksum_alg, &flags2, &cat_name))
+ {
+ info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+ return "Failed to read Gtid_log_event: corrupt binlog";
+ }
+
+ if (cmp(&info->catalog_filter->name, &cat_name))
+ {
+ /* Skip this event group as it doesn't match the user's catalog. */
+ info->gtid_skip_group= (flags2 & Gtid_log_event::FL_STANDALONE ?
+ GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
+ }
+ }
+
/* Skip GTID event groups until we reach slave position within a domain_id. */
if (event_type == GTID_EVENT && info->using_gtid_state)
{
@@ -2128,6 +2161,7 @@ static int init_binlog_sender(binlog_send_info *info,
/** init last pos */
info->last_pos= *pos;
+ info->catalog_filter= get_catalog_filter(thd);
info->current_checksum_alg= get_binlog_checksum_value_at_connect(thd);
info->mariadb_slave_capability= get_mariadb_slave_capability(thd);
info->using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state);
@@ -3729,6 +3763,9 @@ bool change_master(THD* thd, Master_info* mi, bool *master_info_added)
if (get_string_parameter(mi->host, lex_mi->host, sizeof(mi->host)-1,
"MASTER_HOST", system_charset_info) ||
+ get_string_parameter(mi->catalog_name, lex_mi->catalog,
+ sizeof(mi->catalog_name)-1, "MASTER_CATALOG",
+ system_charset_info) ||
get_string_parameter(mi->user, lex_mi->user, sizeof(mi->user)-1,
"MASTER_USER", system_charset_info) ||
get_string_parameter(mi->password, lex_mi->password,
diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy
index 98e18d32e0f..8e3c93cbe51 100644
--- a/sql/sql_yacc.yy
+++ b/sql/sql_yacc.yy
@@ -923,6 +923,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, size_t *yystacksize);
%token <kwd> LOCKS_SYM
%token <kwd> LOGFILE_SYM
%token <kwd> LOGS_SYM
+%token <kwd> MASTER_CATALOG_SYM
%token <kwd> MASTER_CONNECT_RETRY_SYM
%token <kwd> MASTER_DELAY_SYM
%token <kwd> MASTER_GTID_POS_SYM
@@ -2126,6 +2127,10 @@ master_def:
{
Lex->mi.host = $3.str;
}
+ | MASTER_CATALOG_SYM '=' TEXT_STRING_sys
+ {
+ Lex->mi.catalog = $3.str;
+ }
| MASTER_USER_SYM '=' TEXT_STRING_sys
{
Lex->mi.user = $3.str;
@@ -15988,6 +15993,7 @@ keyword_sp_var_and_label:
| MASTER_HEARTBEAT_PERIOD_SYM
| MASTER_GTID_POS_SYM
| MASTER_HOST_SYM
+ | MASTER_CATALOG_SYM
| MASTER_PORT_SYM
| MASTER_LOG_FILE_SYM
| MASTER_LOG_POS_SYM
--
2.39.2
1
0
[PATCH] MDEV-35019 Provide a way to enable "rollback XA on disconnect" behavior we had before 10.5.2
by Kristian Nielsen 10 Oct '24
by Kristian Nielsen 10 Oct '24
10 Oct '24
Implement variable legacy_xa_rollback_at_disconnect to support
backwards compatibility for applications that rely on the pre-10.5
behavior for connection disconnect, which is to rollback the
transaction (in violation of the XA specification).
Signed-off-by: Kristian Nielsen <knielsen(a)knielsen-hq.org>
---
mysql-test/main/mysqld--help.result | 8 +++++++
mysql-test/main/xa.result | 16 +++++++++++++
mysql-test/main/xa.test | 19 +++++++++++++++
mysql-test/suite/rpl/r/rpl_xa.result | 24 +++++++++++++++++++
.../rpl/r/rpl_xa_gtid_pos_auto_engine.result | 24 +++++++++++++++++++
mysql-test/suite/rpl/t/rpl_xa.inc | 23 ++++++++++++++++++
.../sys_vars/r/sysvars_server_embedded.result | 10 ++++++++
.../r/sysvars_server_notembedded.result | 10 ++++++++
sql/sql_class.cc | 7 +++++-
sql/sql_class.h | 1 +
sql/sys_vars.cc | 10 ++++++++
sql/xa.h | 1 +
12 files changed, 152 insertions(+), 1 deletion(-)
diff --git a/mysql-test/main/mysqld--help.result b/mysql-test/main/mysqld--help.result
index 1a2a740cb77..ac5ab0a88b1 100644
--- a/mysql-test/main/mysqld--help.result
+++ b/mysql-test/main/mysqld--help.result
@@ -433,6 +433,13 @@ The following specify which files/extra groups are read (specified before remain
--lc-time-names=name
Set the language used for the month names and the days of
the week.
+ --legacy-xa-rollback-at-disconnect
+ If a user session disconnects after putting a transaction
+ into the XA PREPAREd state, roll back the transaction.
+ Can be used for backwards compatibility to enable this
+ pre-10.5 behavior for applications that expect it. Note
+ that this violates the XA specification and should not be
+ used for new code
--local-infile Enable LOAD DATA LOCAL INFILE
(Defaults to on; use --skip-local-infile to disable.)
--lock-wait-timeout=#
@@ -1566,6 +1573,7 @@ large-pages FALSE
lc-messages en_US
lc-messages-dir MYSQL_SHAREDIR/
lc-time-names en_US
+legacy-xa-rollback-at-disconnect FALSE
local-infile TRUE
lock-wait-timeout 86400
log-bin foo
diff --git a/mysql-test/main/xa.result b/mysql-test/main/xa.result
index faabf1cf695..58ad2a54edd 100644
--- a/mysql-test/main/xa.result
+++ b/mysql-test/main/xa.result
@@ -592,6 +592,22 @@ formatID gtrid_length bqual_length data
xa rollback '4';
ERROR XA100: XA_RBROLLBACK: Transaction branch was rolled back
set @@global.read_only=@sav_read_only;
+# MDEV-35019: Provide a way to enable "rollback XA on disconnect" behavior we had before 10.5.2
+# Test legacy_xa_rollback_at_disconnect option.
+CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
+INSERT INTO t1 VALUES (1, 0);
+connect con1,localhost,root,,;
+SET SESSION legacy_xa_rollback_at_disconnect= 1;
+XA START '5';
+INSERT INTO t1 VALUES (2, 0);
+XA END '5';
+XA PREPARE '5';
+disconnect con1;
+connection default;
+INSERT INTO t1 VALUES (3, 0);
+XA ROLLBACK '5';
+ERROR XAE04: XAER_NOTA: Unknown XID
+DROP TABLE t1;
#
# End of 10.5 tests
#
diff --git a/mysql-test/main/xa.test b/mysql-test/main/xa.test
index e1ca39be9ab..dfc97002168 100644
--- a/mysql-test/main/xa.test
+++ b/mysql-test/main/xa.test
@@ -747,6 +747,25 @@ xa rollback '4';
set @@global.read_only=@sav_read_only;
+--echo # MDEV-35019: Provide a way to enable "rollback XA on disconnect" behavior we had before 10.5.2
+--echo # Test legacy_xa_rollback_at_disconnect option.
+CREATE TABLE t1 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB;
+INSERT INTO t1 VALUES (1, 0);
+connect (con1,localhost,root,,);
+SET SESSION legacy_xa_rollback_at_disconnect= 1;
+XA START '5';
+INSERT INTO t1 VALUES (2, 0);
+XA END '5';
+XA PREPARE '5';
+disconnect con1;
+
+connection default;
+--source include/wait_until_count_sessions.inc
+INSERT INTO t1 VALUES (3, 0);
+--error ER_XAER_NOTA
+XA ROLLBACK '5';
+DROP TABLE t1;
+
--echo #
--echo # End of 10.5 tests
--echo #
diff --git a/mysql-test/suite/rpl/r/rpl_xa.result b/mysql-test/suite/rpl/r/rpl_xa.result
index 061c7b360d0..7b58d3d6e06 100644
--- a/mysql-test/suite/rpl/r/rpl_xa.result
+++ b/mysql-test/suite/rpl/r/rpl_xa.result
@@ -280,4 +280,28 @@ disconnect con1;
connection master;
xa commit '1';
drop table t2, t1;
+# MDEV-35019 Provide a way to enable "rollback XA on disconnect" behavior we had before 10.5.2
+CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
+INSERT INTO t1 VALUES (1), (3);
+connect con1, localhost,root;
+SET SESSION legacy_xa_rollback_at_disconnect= 1;
+XA START '3';
+INSERT INTO t1 VALUES (2);
+XA END '3';
+XA PREPARE '3';
+disconnect con1;
+connection master;
+include/save_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+a
+1
+3
+connection slave;
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+a
+1
+3
+connection master;
+DROP TABLE t1;
include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/r/rpl_xa_gtid_pos_auto_engine.result b/mysql-test/suite/rpl/r/rpl_xa_gtid_pos_auto_engine.result
index 35625cc7026..2cb9b6c4290 100644
--- a/mysql-test/suite/rpl/r/rpl_xa_gtid_pos_auto_engine.result
+++ b/mysql-test/suite/rpl/r/rpl_xa_gtid_pos_auto_engine.result
@@ -289,6 +289,30 @@ disconnect con1;
connection master;
xa commit '1';
drop table t2, t1;
+# MDEV-35019 Provide a way to enable "rollback XA on disconnect" behavior we had before 10.5.2
+CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
+INSERT INTO t1 VALUES (1), (3);
+connect con1, localhost,root;
+SET SESSION legacy_xa_rollback_at_disconnect= 1;
+XA START '3';
+INSERT INTO t1 VALUES (2);
+XA END '3';
+XA PREPARE '3';
+disconnect con1;
+connection master;
+include/save_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+a
+1
+3
+connection slave;
+include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+a
+1
+3
+connection master;
+DROP TABLE t1;
connection slave;
include/stop_slave.inc
SET @@global.gtid_pos_auto_engines="";
diff --git a/mysql-test/suite/rpl/t/rpl_xa.inc b/mysql-test/suite/rpl/t/rpl_xa.inc
index d22d2d2ef3d..050b9597ccc 100644
--- a/mysql-test/suite/rpl/t/rpl_xa.inc
+++ b/mysql-test/suite/rpl/t/rpl_xa.inc
@@ -431,3 +431,26 @@ disconnect con1;
connection master;
xa commit '1';
drop table t2, t1;
+
+--echo # MDEV-35019 Provide a way to enable "rollback XA on disconnect" behavior we had before 10.5.2
+CREATE TABLE t1 (a INT PRIMARY KEY) ENGINE=InnoDB;
+INSERT INTO t1 VALUES (1), (3);
+
+connect con1, localhost,root;
+SET SESSION legacy_xa_rollback_at_disconnect= 1;
+XA START '3';
+INSERT INTO t1 VALUES (2);
+XA END '3';
+XA PREPARE '3';
+--disconnect con1
+
+--connection master
+--source include/save_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+
+--connection slave
+--source include/sync_with_master_gtid.inc
+SELECT * FROM t1 ORDER BY a;
+
+--connection master
+DROP TABLE t1;
diff --git a/mysql-test/suite/sys_vars/r/sysvars_server_embedded.result b/mysql-test/suite/sys_vars/r/sysvars_server_embedded.result
index fcd1ee9a141..d45ab7f80c8 100644
--- a/mysql-test/suite/sys_vars/r/sysvars_server_embedded.result
+++ b/mysql-test/suite/sys_vars/r/sysvars_server_embedded.result
@@ -1532,6 +1532,16 @@ NUMERIC_BLOCK_SIZE NULL
ENUM_VALUE_LIST NULL
READ_ONLY NO
COMMAND_LINE_ARGUMENT NULL
+VARIABLE_NAME LEGACY_XA_ROLLBACK_AT_DISCONNECT
+VARIABLE_SCOPE SESSION
+VARIABLE_TYPE BOOLEAN
+VARIABLE_COMMENT If a user session disconnects after putting a transaction into the XA PREPAREd state, roll back the transaction. Can be used for backwards compatibility to enable this pre-10.5 behavior for applications that expect it. Note that this violates the XA specification and should not be used for new code
+NUMERIC_MIN_VALUE NULL
+NUMERIC_MAX_VALUE NULL
+NUMERIC_BLOCK_SIZE NULL
+ENUM_VALUE_LIST OFF,ON
+READ_ONLY NO
+COMMAND_LINE_ARGUMENT OPTIONAL
VARIABLE_NAME LICENSE
VARIABLE_SCOPE GLOBAL
VARIABLE_TYPE VARCHAR
diff --git a/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result b/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result
index 1d5e9499c7a..73e853f12a5 100644
--- a/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result
+++ b/mysql-test/suite/sys_vars/r/sysvars_server_notembedded.result
@@ -1642,6 +1642,16 @@ NUMERIC_BLOCK_SIZE NULL
ENUM_VALUE_LIST NULL
READ_ONLY NO
COMMAND_LINE_ARGUMENT NULL
+VARIABLE_NAME LEGACY_XA_ROLLBACK_AT_DISCONNECT
+VARIABLE_SCOPE SESSION
+VARIABLE_TYPE BOOLEAN
+VARIABLE_COMMENT If a user session disconnects after putting a transaction into the XA PREPAREd state, roll back the transaction. Can be used for backwards compatibility to enable this pre-10.5 behavior for applications that expect it. Note that this violates the XA specification and should not be used for new code
+NUMERIC_MIN_VALUE NULL
+NUMERIC_MAX_VALUE NULL
+NUMERIC_BLOCK_SIZE NULL
+ENUM_VALUE_LIST OFF,ON
+READ_ONLY NO
+COMMAND_LINE_ARGUMENT OPTIONAL
VARIABLE_NAME LICENSE
VARIABLE_SCOPE GLOBAL
VARIABLE_TYPE VARCHAR
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index 7913f1a40d2..cd0418553fa 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -1552,7 +1552,12 @@ void THD::cleanup(void)
close_temporary_tables();
if (transaction->xid_state.is_explicit_XA())
- trans_xa_detach(this);
+ {
+ if (unlikely(variables.legacy_xa_rollback_at_disconnect))
+ xa_trans_force_rollback(this);
+ else
+ trans_xa_detach(this);
+ }
else
trans_rollback(this);
diff --git a/sql/sql_class.h b/sql/sql_class.h
index 7e5d9ac96e3..c0e304dd95f 100644
--- a/sql/sql_class.h
+++ b/sql/sql_class.h
@@ -846,6 +846,7 @@ typedef struct system_variables
uint in_subquery_conversion_threshold;
ulong optimizer_max_sel_arg_weight;
ulonglong max_rowid_filter_size;
+ my_bool legacy_xa_rollback_at_disconnect;
vers_asof_timestamp_t vers_asof_timestamp;
ulong vers_alter_history;
diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc
index 9a4180ae000..18374544126 100644
--- a/sql/sys_vars.cc
+++ b/sql/sys_vars.cc
@@ -6761,3 +6761,13 @@ static Sys_var_ulonglong Sys_max_rowid_filter_size(
SESSION_VAR(max_rowid_filter_size), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(1024, (ulonglong)~(intptr)0), DEFAULT(128*1024),
BLOCK_SIZE(1));
+
+static Sys_var_mybool Sys_legacy_xa_rollback_at_disconnect(
+ "legacy_xa_rollback_at_disconnect",
+ "If a user session disconnects after putting a transaction into the XA "
+ "PREPAREd state, roll back the transaction. Can be used for backwards "
+ "compatibility to enable this pre-10.5 behavior for applications that "
+ "expect it. Note that this violates the XA specification and should not "
+ "be used for new code",
+ SESSION_VAR(legacy_xa_rollback_at_disconnect), CMD_LINE(OPT_ARG),
+ DEFAULT(FALSE), NO_MUTEX_GUARD, NOT_IN_BINLOG);
diff --git a/sql/xa.h b/sql/xa.h
index 0b2d0696642..4260033d0fb 100644
--- a/sql/xa.h
+++ b/sql/xa.h
@@ -45,6 +45,7 @@ bool xid_cache_insert(XID *xid);
bool xid_cache_insert(THD *thd, XID_STATE *xid_state, XID *xid);
void xid_cache_delete(THD *thd, XID_STATE *xid_state);
+bool xa_trans_force_rollback(THD *thd);
bool trans_xa_start(THD *thd);
bool trans_xa_end(THD *thd);
bool trans_xa_prepare(THD *thd);
--
2.39.2
1
0