[Commits] c06bc66: MDEV-11065: Compressed binary log

Kristian Nielsen knielsen at knielsen-hq.org
Thu Oct 20 19:04:05 EEST 2016


revision-id: c06bc668166ed658e6c68a57d9a6ee42b14189e8 (mariadb-10.2.2-53-gc06bc66)
parent(s): e1c502879fea4bc95c493e429c295ba8c95e5483
author: Kristian Nielsen
committer: Kristian Nielsen
timestamp: 2016-10-20 18:00:59 +0200
message:

MDEV-11065: Compressed binary log

Minor review comments/changes:

 - A bunch of style-fixes.

 - Change macros to static inline functions.

 - Update check_event_type() with compressed event types.

 - Small .result file update.

---
 mysql-test/suite/rpl/r/rpl_checksum.result    |   2 +-
 mysql-test/t/mysqlbinlog_row_compressed.test  |   2 +-
 mysql-test/t/mysqlbinlog_stmt_compressed.test |   2 +-
 sql/log_event.cc                              | 352 ++++++++++++++------------
 sql/log_event.h                               |  60 ++++-
 sql/rpl_parallel.cc                           |   4 +-
 sql/slave.cc                                  |  14 +-
 sql/sql_binlog.cc                             |   6 +
 sql/sql_class.cc                              |   1 -
 9 files changed, 267 insertions(+), 176 deletions(-)

diff --git a/mysql-test/suite/rpl/r/rpl_checksum.result b/mysql-test/suite/rpl/r/rpl_checksum.result
index 820224d..e74e5af 100644
--- a/mysql-test/suite/rpl/r/rpl_checksum.result
+++ b/mysql-test/suite/rpl/r/rpl_checksum.result
@@ -79,7 +79,7 @@ connection slave;
 set @@global.debug_dbug='d,simulate_slave_unaware_checksum';
 start slave;
 include/wait_for_slave_io_error.inc [errno=1236]
-Last_IO_Error = 'Got fatal error 1236 from master when reading data from binary log: 'Slave can not handle replication events with the checksum that master is configured to log; the first event 'master-bin.000009' at 368, the last event read from 'master-bin.000010' at 4, the last byte read from 'master-bin.000010' at 249.''
+Last_IO_Error = 'Got fatal error 1236 from master when reading data from binary log: 'Slave can not handle replication events with the checksum that master is configured to log; the first event 'master-bin.000009' at 375, the last event read from 'master-bin.000010' at 4, the last byte read from 'master-bin.000010' at 256.''
 select count(*) as zero from t1;
 zero
 0
diff --git a/mysql-test/t/mysqlbinlog_row_compressed.test b/mysql-test/t/mysqlbinlog_row_compressed.test
index 05edf7a..3f8c31a 100644
--- a/mysql-test/t/mysqlbinlog_row_compressed.test
+++ b/mysql-test/t/mysqlbinlog_row_compressed.test
@@ -7,7 +7,7 @@
 
 #
 #  
-# mysqlbinlog: comprssed row event 
+# mysqlbinlog: compressed row event 
 # 
 #
 
diff --git a/mysql-test/t/mysqlbinlog_stmt_compressed.test b/mysql-test/t/mysqlbinlog_stmt_compressed.test
index da5e7f4..4b22683 100644
--- a/mysql-test/t/mysqlbinlog_stmt_compressed.test
+++ b/mysql-test/t/mysqlbinlog_stmt_compressed.test
@@ -7,7 +7,7 @@
 
 #
 #  
-# mysqlbinlog: comprssed query event 
+# mysqlbinlog: compressed query event 
 # 
 #
 
diff --git a/sql/log_event.cc b/sql/log_event.cc
index fbc9db1..30a7700 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -708,11 +708,11 @@ char *str_to_hex(char *to, const char *from, uint len)
 /**
   Compressed Record
     Record Header: 1 Byte
-                   0 Bit: Always 1, mean compressed;
-                   1-3 Bit: Reversed, compressed algorithm¡£Always 0, means zlib
-                   4-7 Bit: Bytes of "Record Original Length"
+             0 Bit: Always 1, mean compressed;
+           1-3 Bit: Reversed, compressed algorithm - Always 0, means zlib
+           4-7 Bit: Bytes of "Record Original Length"
     Record Original Length: 1-4 Bytes
-    Compressed Buf:  
+    Compressed Buf:
 */
 
 /**
@@ -721,7 +721,7 @@ char *str_to_hex(char *to, const char *from, uint len)
 
 uint32 binlog_get_compress_len(uint32 len)
 {
-	/* 5 for the begin content, 1 reserved for a '\0'*/
+    /* 5 for the begin content, 1 reserved for a '\0'*/
     return ALIGN_SIZE((BINLOG_COMPRESSED_HEADER_LEN + BINLOG_COMPRESSED_ORIGINAL_LENGTH_MAX_BYTES) 
                         + compressBound(len) + 1);
 }
@@ -769,7 +769,8 @@ int binlog_buf_compress(const char *src, char *dst, uint32 len, uint32 *comlen)
   dst[0] = 0x80 | (lenlen & 0x07);
 
   uLongf tmplen = (uLongf)*comlen - BINLOG_COMPRESSED_HEADER_LEN - lenlen - 1;
-  if (compress((Bytef *)dst + BINLOG_COMPRESSED_HEADER_LEN + lenlen, &tmplen, (const Bytef *)src, (uLongf)len) != Z_OK)
+  if (compress((Bytef *)dst + BINLOG_COMPRESSED_HEADER_LEN + lenlen, &tmplen,
+               (const Bytef *)src, (uLongf)len) != Z_OK)
   {
     return 1;
   }
@@ -779,41 +780,46 @@ int binlog_buf_compress(const char *src, char *dst, uint32 len, uint32 *comlen)
 
 /**
    Convert a query_compressed_log_event to query_log_event
-   from 'src' to 'dst'(malloced inside), the size after compress
-   stored in 'newlen'. 
+   from 'src' to 'dst', the size after compression stored in 'newlen'.
 
-   @Warning:
-      1)The caller should call my_free to release 'dst'.
+   @Note:
+      1) The caller should call my_free to release 'dst' if *is_malloc is
+         returned as true.
+      2) If *is_malloc is retuened as false, then 'dst' reuses the passed-in
+         'buf'.
 
-   return zero if successful, others otherwise.
+   return zero if successful, non-zero otherwise.
 */
 
-int query_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum,
-						   const char *src, char* buf, ulong buf_size, bool* is_malloc,
-               char **dst, ulong *newlen)
+int
+query_event_uncompress(const Format_description_log_event *description_event,
+                       bool contain_checksum, const char *src, char* buf,
+                       ulong buf_size, bool* is_malloc, char **dst,
+                       ulong *newlen)
 {
   ulong len = uint4korr(src + EVENT_LEN_OFFSET);
-	const char *tmp = src;
+  const char *tmp = src;
 
   DBUG_ASSERT((uchar)src[EVENT_TYPE_OFFSET] == QUERY_COMPRESSED_EVENT);
 
-	uint8 common_header_len= description_event->common_header_len;
-	uint8 post_header_len= description_event->post_header_len[QUERY_COMPRESSED_EVENT-1];
+  uint8 common_header_len= description_event->common_header_len;
+  uint8 post_header_len=
+    description_event->post_header_len[QUERY_COMPRESSED_EVENT-1];
 
-	tmp += common_header_len;
+  tmp += common_header_len;
 
-	uint db_len = (uint)tmp[Q_DB_LEN_OFFSET];
-	uint16 status_vars_len= uint2korr(tmp + Q_STATUS_VARS_LEN_OFFSET);
+  uint db_len = (uint)tmp[Q_DB_LEN_OFFSET];
+  uint16 status_vars_len= uint2korr(tmp + Q_STATUS_VARS_LEN_OFFSET);
 
-	tmp += post_header_len + status_vars_len + db_len + 1;
+  tmp += post_header_len + status_vars_len + db_len + 1;
 
-	uint32 un_len = binlog_get_uncompress_len(tmp);
-	*newlen = (tmp - src) + un_len;
+  uint32 un_len = binlog_get_uncompress_len(tmp);
+  *newlen = (tmp - src) + un_len;
   if(contain_checksum)
     *newlen += BINLOG_CHECKSUM_LEN;
   
   uint32 alloc_size = ALIGN_SIZE(*newlen);
-	char *new_dst = NULL;
+  char *new_dst = NULL;
 
   *is_malloc = false;
   if (alloc_size <= buf_size) 
@@ -822,7 +828,7 @@ int query_event_uncompress(const Format_description_log_event *description_event
   }
   else 
   {
-	  new_dst = (char *)my_malloc(alloc_size, MYF(MY_WME));
+    new_dst = (char *)my_malloc(alloc_size, MYF(MY_WME));
     if (!new_dst)
       return 1;
 
@@ -830,42 +836,47 @@ int query_event_uncompress(const Format_description_log_event *description_event
   }
 
   /* copy the head*/
-	memcpy(new_dst, src , tmp - src);
-	if (binlog_buf_uncompress(tmp, new_dst + (tmp - src), len - (tmp - src), &un_len))
-	{
+  memcpy(new_dst, src , tmp - src);
+  if (binlog_buf_uncompress(tmp, new_dst + (tmp - src),
+                            len - (tmp - src), &un_len))
+  {
     if (*is_malloc)
-		  my_free(new_dst);
+      my_free(new_dst);
 
     *is_malloc = false;
 
-		return 1;
-	}
+    return 1;
+  }
 
   new_dst[EVENT_TYPE_OFFSET] = QUERY_EVENT;
-	int4store(new_dst + EVENT_LEN_OFFSET, *newlen);
-  if(contain_checksum){
+  int4store(new_dst + EVENT_LEN_OFFSET, *newlen);
+  if(contain_checksum)
+  {
     ulong clear_len = *newlen - BINLOG_CHECKSUM_LEN;
-    int4store(new_dst + clear_len, my_checksum(0L, (uchar *)new_dst, clear_len));
+    int4store(new_dst + clear_len,
+              my_checksum(0L, (uchar *)new_dst, clear_len));
   }
   *dst = new_dst;
-	return 0;
+  return 0;
 }
 
-int row_log_event_uncompress(const Format_description_log_event *description_event, bool contain_checksum,
-                             const char *src, char* buf, ulong buf_size, bool* is_malloc,
-                             char **dst, ulong *newlen)
+int
+row_log_event_uncompress(const Format_description_log_event *description_event,
+                         bool contain_checksum, const char *src, char* buf,
+                         ulong buf_size, bool* is_malloc, char **dst,
+                         ulong *newlen)
 {
   Log_event_type type = (Log_event_type)(uchar)src[EVENT_TYPE_OFFSET];
   ulong len = uint4korr(src + EVENT_LEN_OFFSET);
-	const char *tmp = src;
+  const char *tmp = src;
   char *new_dst = NULL;
 
   DBUG_ASSERT(LOG_EVENT_IS_ROW_COMPRESSED(type));
 
-	uint8 common_header_len= description_event->common_header_len;
-	uint8 post_header_len= description_event->post_header_len[type-1];
+  uint8 common_header_len= description_event->common_header_len;
+  uint8 post_header_len= description_event->post_header_len[type-1];
 
-	tmp += common_header_len + ROWS_HEADER_LEN_V1;
+  tmp += common_header_len + ROWS_HEADER_LEN_V1;
   if (post_header_len == ROWS_HEADER_LEN_V2)
   {
     /*
@@ -879,15 +890,16 @@ int row_log_event_uncompress(const Format_description_log_event *description_eve
     tmp += var_header_len;
 
     /* get the uncompressed event type */
-    type = (Log_event_type)(type - WRITE_ROWS_COMPRESSED_EVENT + WRITE_ROWS_EVENT);
+    type=
+      (Log_event_type)(type - WRITE_ROWS_COMPRESSED_EVENT + WRITE_ROWS_EVENT);
   }
   else 
   {
     /* get the uncompressed event type */
-    type = (Log_event_type)(type - WRITE_ROWS_COMPRESSED_EVENT_V1 + WRITE_ROWS_EVENT_V1);
+    type= (Log_event_type)
+      (type - WRITE_ROWS_COMPRESSED_EVENT_V1 + WRITE_ROWS_EVENT_V1);
   }
 
-
   ulong m_width = net_field_length((uchar **)&tmp);
   tmp += (m_width + 7) / 8;
 
@@ -896,8 +908,8 @@ int row_log_event_uncompress(const Format_description_log_event *description_eve
     tmp += (m_width + 7) / 8;
   }
 
-	uint32 un_len = binlog_get_uncompress_len(tmp);
-	*newlen = (tmp - src) + un_len;
+  uint32 un_len = binlog_get_uncompress_len(tmp);
+  *newlen = (tmp - src) + un_len;
   if(contain_checksum)
     *newlen += BINLOG_CHECKSUM_LEN;
 
@@ -910,32 +922,34 @@ int row_log_event_uncompress(const Format_description_log_event *description_eve
   }
   else
   {
-	  new_dst = (char *)my_malloc(alloc_size, MYF(MY_WME));
+    new_dst = (char *)my_malloc(alloc_size, MYF(MY_WME));
     if (!new_dst)
       return 1;
 
     *is_malloc = true;
   }
 
-  /* copy the head*/
-	memcpy(new_dst, src , tmp - src);
-  /* uncompress the body */
-	if (binlog_buf_uncompress(tmp, new_dst + (tmp - src), len - (tmp - src), &un_len))
-	{
+  /* Copy the head. */
+  memcpy(new_dst, src , tmp - src);
+  /* Uncompress the body. */
+  if (binlog_buf_uncompress(tmp, new_dst + (tmp - src),
+                            len - (tmp - src), &un_len))
+  {
     if (*is_malloc)
-		  my_free(new_dst);
+      my_free(new_dst);
 
-		return 1;
-	}
+    return 1;
+  }
 
   new_dst[EVENT_TYPE_OFFSET] = type;
   int4store(new_dst + EVENT_LEN_OFFSET, *newlen);
   if(contain_checksum){
     ulong clear_len = *newlen - BINLOG_CHECKSUM_LEN;
-    int4store(new_dst + clear_len, my_checksum(0L, (uchar *)new_dst, clear_len));
+    int4store(new_dst + clear_len,
+              my_checksum(0L, (uchar *)new_dst, clear_len));
   }
   *dst = new_dst;
-	return 0;
+  return 0;
 }
 
 /**
@@ -945,27 +959,28 @@ int row_log_event_uncompress(const Format_description_log_event *description_eve
 uint32 binlog_get_uncompress_len(const char *buf)
 {
   DBUG_ASSERT((buf[0] & 0xe0) == 0x80);
-	uint32 lenlen = buf[0] & 0x07;
-	uint32 len = 0;
-	switch(lenlen)
-	{
-	case 1:
-		len = uchar(buf[1]);
-		break;
-	case 2:
-		len = uchar(buf[1]) << 8 | uchar(buf[2]);
-		break;
-	case 3:
-		len = uchar(buf[1]) << 16 | uchar(buf[2]) << 8 | uchar(buf[3]);
-		break;
-	case 4:
-		len = uchar(buf[1]) << 24 | uchar(buf[2]) << 16 | uchar(buf[3]) << 8 | uchar(buf[4]);
-		break;
-	default:
-		DBUG_ASSERT(lenlen >= 1 && lenlen <= 4);
-		break;
-	}
-	return len;
+  uint32 lenlen = buf[0] & 0x07;
+  uint32 len = 0;
+  switch(lenlen)
+  {
+  case 1:
+    len = uchar(buf[1]);
+    break;
+  case 2:
+    len = uchar(buf[1]) << 8 | uchar(buf[2]);
+    break;
+  case 3:
+    len = uchar(buf[1]) << 16 | uchar(buf[2]) << 8 | uchar(buf[3]);
+    break;
+  case 4:
+    len = uchar(buf[1]) << 24 | uchar(buf[2]) << 16 |
+          uchar(buf[3]) << 8 | uchar(buf[4]);
+    break;
+  default:
+    DBUG_ASSERT(lenlen >= 1 && lenlen <= 4);
+    break;
+  }
+  return len;
 }
 
 /**
@@ -979,22 +994,24 @@ uint32 binlog_get_uncompress_len(const char *buf)
 
    return zero if successful, others otherwise.
 */
-int binlog_buf_uncompress(const char *src, char *dst, uint32 len, uint32 *newlen)
+int binlog_buf_uncompress(const char *src, char *dst, uint32 len,
+                          uint32 *newlen)
 {
-	if((src[0] & 0x80) == 0)
-	{
-		return 1;
-	}
+  if((src[0] & 0x80) == 0)
+  {
+    return 1;
+  }
 
-	uint32 lenlen = src[0] & 0x07;
-    uLongf buflen  = *newlen;
-	if(uncompress((Bytef *)dst, &buflen, (const Bytef*)src + 1 + lenlen, len) != Z_OK)
-	{
-		return 1;
-	}
+  uint32 lenlen= src[0] & 0x07;
+  uLongf buflen= *newlen;
+  if(uncompress((Bytef *)dst, &buflen,
+                (const Bytef*)src + 1 + lenlen, len) != Z_OK)
+  {
+    return 1;
+  }
 
-	*newlen = (uint32)buflen;
-	return 0;
+  *newlen = (uint32)buflen;
+  return 0;
 }
 
 #ifndef MYSQL_CLIENT
@@ -1964,7 +1981,8 @@ Log_event* Log_event::read_log_event(const char* buf, uint event_len,
       ev  = new Query_log_event(buf, event_len, fdle, QUERY_EVENT);
       break;
     case QUERY_COMPRESSED_EVENT:
-      ev = new Query_compressed_log_event(buf, event_len, fdle, QUERY_COMPRESSED_EVENT);
+      ev = new Query_compressed_log_event(buf, event_len, fdle,
+                                          QUERY_COMPRESSED_EVENT);
       break;
     case LOAD_EVENT:
       ev = new Load_log_event(buf, event_len, fdle);
@@ -3149,32 +3167,32 @@ void Log_event::print_base64(IO_CACHE* file,
     }
     case UPDATE_ROWS_EVENT:
     case UPDATE_ROWS_EVENT_V1:
-      {
-        ev= new Update_rows_log_event((const char*) ptr, size,
-          glob_description_event);
-        break;
-      }
+    {
+      ev= new Update_rows_log_event((const char*) ptr, size,
+                                    glob_description_event);
+      break;
+    }
     case WRITE_ROWS_COMPRESSED_EVENT:
     case WRITE_ROWS_COMPRESSED_EVENT_V1:
-      {
-        ev= new Write_rows_compressed_log_event((const char*) ptr, size,
-          glob_description_event);
-        break;
-      }
+    {
+      ev= new Write_rows_compressed_log_event((const char*) ptr, size,
+                                              glob_description_event);
+      break;
+    }
     case UPDATE_ROWS_COMPRESSED_EVENT:
     case UPDATE_ROWS_COMPRESSED_EVENT_V1:
-      {
-        ev= new Update_rows_compressed_log_event((const char*) ptr, size,
-          glob_description_event);
-        break;
+    {
+      ev= new Update_rows_compressed_log_event((const char*) ptr, size,
+                                               glob_description_event);
+      break;
       }
     case DELETE_ROWS_COMPRESSED_EVENT:
     case DELETE_ROWS_COMPRESSED_EVENT_V1:
-      {
-        ev= new Delete_rows_compressed_log_event((const char*) ptr, size,
-          glob_description_event);
-        break;
-      }
+    {
+      ev= new Delete_rows_compressed_log_event((const char*) ptr, size,
+                                               glob_description_event);
+      break;
+    }
     default:
       break;
     }
@@ -3743,8 +3761,9 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
 Query_compressed_log_event::Query_compressed_log_event(THD* thd_arg, const char* query_arg,
     ulong query_length, bool using_trans,
     bool direct, bool suppress_use, int errcode)
-    :Query_log_event(thd_arg, query_arg, query_length, using_trans, direct, suppress_use, errcode),
-    query_buf(0)
+    :Query_log_event(thd_arg, query_arg, query_length, using_trans, direct,
+                     suppress_use, errcode),
+     query_buf(0)
 {
 
 }
@@ -4154,17 +4173,22 @@ Query_log_event::Query_log_event(const char* buf, uint event_len,
   DBUG_VOID_RETURN;
 }
 
-Query_compressed_log_event::Query_compressed_log_event(const char *buf, uint event_len,
+Query_compressed_log_event::Query_compressed_log_event(const char *buf,
+      uint event_len,
       const Format_description_log_event
       *description_event,
       Log_event_type event_type)
-      :Query_log_event(buf, event_len, description_event, event_type),query_buf(NULL)
+      :Query_log_event(buf, event_len, description_event, event_type),
+       query_buf(NULL)
 {
   if(query)
   {
     uint32 un_len=binlog_get_uncompress_len(query);
-    query_buf = (Log_event::Byte*)my_malloc(ALIGN_SIZE(un_len + 1), MYF(MY_WME)); //reserve one byte for '\0'
-    if(query_buf && !binlog_buf_uncompress(query, (char *)query_buf, q_len, &un_len))
+    /* Reserve one byte for '\0' */
+    query_buf = (Log_event::Byte*)my_malloc(ALIGN_SIZE(un_len + 1),
+                                            MYF(MY_WME));
+    if(query_buf &&
+       !binlog_buf_uncompress(query, (char *)query_buf, q_len, &un_len))
     {
       query_buf[un_len] = 0;
       query = (const char *)query_buf;
@@ -9953,7 +9977,8 @@ void Rows_log_event::uncompress_buf()
   uchar *new_buf= (uchar*) my_malloc(ALIGN_SIZE(un_len), MYF(MY_WME));
   if (new_buf)
   {
-    if(!binlog_buf_uncompress((char *)m_rows_buf, (char *)new_buf, m_rows_cur - m_rows_buf, &un_len))
+    if(!binlog_buf_uncompress((char *)m_rows_buf, (char *)new_buf,
+                              m_rows_cur - m_rows_buf, &un_len))
     {
       my_free(m_rows_buf);
       m_rows_buf = new_buf;
@@ -10817,16 +10842,18 @@ bool Rows_log_event::write_compressed()
   uchar *m_rows_cur_tmp = m_rows_cur;
   bool ret = true;
   uint32 comlen, alloc_size;
-  comlen = alloc_size = binlog_get_compress_len(m_rows_cur_tmp - m_rows_buf_tmp);
+  comlen= alloc_size= binlog_get_compress_len(m_rows_cur_tmp - m_rows_buf_tmp);
   m_rows_buf = (uchar *)my_safe_alloca(alloc_size);
-  if(m_rows_buf && !binlog_buf_compress((const char *)m_rows_buf_tmp, (char *)m_rows_buf, m_rows_cur_tmp - m_rows_buf_tmp, &comlen))
+  if(m_rows_buf &&
+     !binlog_buf_compress((const char *)m_rows_buf_tmp, (char *)m_rows_buf,
+                          m_rows_cur_tmp - m_rows_buf_tmp, &comlen))
   {
-    m_rows_cur = comlen + m_rows_buf;
-    ret = Log_event::write();
+    m_rows_cur= comlen + m_rows_buf;
+    ret= Log_event::write();
   }
   my_safe_afree(m_rows_buf, alloc_size);
-  m_rows_buf = m_rows_buf_tmp;
-  m_rows_cur = m_rows_cur_tmp;
+  m_rows_buf= m_rows_buf_tmp;
+  m_rows_cur= m_rows_cur_tmp;
   return ret;
 }
 #endif
@@ -11744,18 +11771,19 @@ Write_rows_log_event::Write_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
 {
 }
 
-Write_rows_compressed_log_event::Write_rows_compressed_log_event(THD *thd_arg, TABLE *tbl_arg,
+Write_rows_compressed_log_event::Write_rows_compressed_log_event(
+                                           THD *thd_arg,
+                                           TABLE *tbl_arg,
                                            ulong tid_arg,
                                            bool is_transactional)
   : Write_rows_log_event(thd_arg, tbl_arg, tid_arg, is_transactional)
 {
-  //m_type = log_bin_use_v1_row_events ? WRITE_ROWS_COMPRESSED_EVENT_V1 : WRITE_ROWS_COMPRESSED_EVENT;
   m_type = WRITE_ROWS_COMPRESSED_EVENT_V1;
 }
 
 bool Write_rows_compressed_log_event::write()
 {
-    return Rows_log_event::write_compressed();    
+  return Rows_log_event::write_compressed();
 }
 #endif
 
@@ -11770,7 +11798,8 @@ Write_rows_log_event::Write_rows_log_event(const char *buf, uint event_len,
 {
 }
 
-Write_rows_compressed_log_event::Write_rows_compressed_log_event(const char *buf, uint event_len,
+Write_rows_compressed_log_event::Write_rows_compressed_log_event(
+                                           const char *buf, uint event_len,
                                            const Format_description_log_event
                                            *description_event)
 : Write_rows_log_event(buf, event_len, description_event)
@@ -12269,21 +12298,25 @@ void Write_rows_log_event::print(FILE *file, PRINT_EVENT_INFO* print_event_info)
   Rows_log_event::print_helper(file, print_event_info, "Write_rows");
 }
 
-void Write_rows_compressed_log_event::print(FILE *file, PRINT_EVENT_INFO* print_event_info)
+void Write_rows_compressed_log_event::print(FILE *file,
+                                            PRINT_EVENT_INFO* print_event_info)
 {
   char *new_buf;
   ulong len;
   bool is_malloc = false;
-  if(!row_log_event_uncompress(glob_description_event, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32,
+  if(!row_log_event_uncompress(glob_description_event,
+                               checksum_alg == BINLOG_CHECKSUM_ALG_CRC32,
                                temp_buf, NULL, 0, &is_malloc, &new_buf, &len))
   {
-      free_temp_buf();
-      register_temp_buf(new_buf, true);
-      Rows_log_event::print_helper(file, print_event_info, "Write_compressed_rows");
+    free_temp_buf();
+    register_temp_buf(new_buf, true);
+    Rows_log_event::print_helper(file, print_event_info,
+                                 "Write_compressed_rows");
   }
   else
   {
-      my_b_printf(&print_event_info->head_cache, "ERROR: uncompress write_compressed_rows failed\n");
+    my_b_printf(&print_event_info->head_cache,
+                "ERROR: uncompress write_compressed_rows failed\n");
   }
 }
 #endif
@@ -12471,7 +12504,7 @@ void issue_long_find_row_warning(Log_event_type type,
     if (delta > LONG_FIND_ROW_THRESHOLD)
     {
       rgi->set_long_find_row_note_printed();
-      const char* evt_type= type == DELETE_ROWS_EVENT ? " DELETE" : "n UPDATE";
+      const char* evt_type= LOG_EVENT_IS_DELETE_ROW(type) ? " DELETE" : "n UPDATE";
       const char* scan_type= is_index_scan ? "scanning an index" : "scanning the table";
 
       sql_print_information("The slave is applying a ROW event on behalf of a%s statement "
@@ -12807,18 +12840,18 @@ Delete_rows_log_event::Delete_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
 {
 }
 
-Delete_rows_compressed_log_event::Delete_rows_compressed_log_event(THD *thd_arg, TABLE *tbl_arg,
+Delete_rows_compressed_log_event::Delete_rows_compressed_log_event(
+                                           THD *thd_arg, TABLE *tbl_arg,
                                            ulong tid_arg,
                                            bool is_transactional)
   : Delete_rows_log_event(thd_arg, tbl_arg, tid_arg, is_transactional)
 {
-  //m_type = log_bin_use_v1_row_events ? DELETE_ROWS_COMPRESSED_EVENT_V1 : DELETE_ROWS_COMPRESSED_EVENT;
-  m_type = DELETE_ROWS_COMPRESSED_EVENT_V1;
+  m_type= DELETE_ROWS_COMPRESSED_EVENT_V1;
 }
 
 bool Delete_rows_compressed_log_event::write()
 {
-    return Rows_log_event::write_compressed();    
+  return Rows_log_event::write_compressed();    
 }
 #endif /* #if !defined(MYSQL_CLIENT) */
 
@@ -12833,7 +12866,8 @@ Delete_rows_log_event::Delete_rows_log_event(const char *buf, uint event_len,
 {
 }
 
-Delete_rows_compressed_log_event::Delete_rows_compressed_log_event(const char *buf, uint event_len,
+Delete_rows_compressed_log_event::Delete_rows_compressed_log_event(
+                                           const char *buf, uint event_len,
                                            const Format_description_log_event
                                            *description_event)
   : Delete_rows_log_event(buf, event_len, description_event)
@@ -12944,16 +12978,19 @@ void Delete_rows_compressed_log_event::print(FILE *file,
   char *new_buf;
   ulong len;
   bool is_malloc = false;
-  if(!row_log_event_uncompress(glob_description_event, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32,
+  if(!row_log_event_uncompress(glob_description_event,
+                               checksum_alg == BINLOG_CHECKSUM_ALG_CRC32,
                                temp_buf, NULL, 0, &is_malloc, &new_buf, &len))
   {
-      free_temp_buf();
-      register_temp_buf(new_buf, true);
-      Rows_log_event::print_helper(file, print_event_info, "Delete_compressed_rows");
+    free_temp_buf();
+    register_temp_buf(new_buf, true);
+    Rows_log_event::print_helper(file, print_event_info,
+                                 "Delete_compressed_rows");
   }
   else
   {
-      my_b_printf(&print_event_info->head_cache, "ERROR: uncompress delete_compressed_rows failed\n");
+    my_b_printf(&print_event_info->head_cache,
+                "ERROR: uncompress delete_compressed_rows failed\n");
   }
 }
 #endif
@@ -12988,13 +13025,12 @@ Update_rows_compressed_log_event::Update_rows_compressed_log_event(THD *thd_arg,
                                                                    bool is_transactional)
 : Update_rows_log_event(thd_arg, tbl_arg, tid, is_transactional)
 {
-  //m_type = log_bin_use_v1_row_events ? UPDATE_ROWS_COMPRESSED_EVENT_V1 : UPDATE_ROWS_COMPRESSED_EVENT;
   m_type = UPDATE_ROWS_COMPRESSED_EVENT_V1;
 }
 
 bool Update_rows_compressed_log_event::write()
 {
-    return Rows_log_event::write_compressed();
+  return Rows_log_event::write_compressed();
 }
 
 void Update_rows_log_event::init(MY_BITMAP const *cols)
@@ -13036,12 +13072,13 @@ Update_rows_log_event::Update_rows_log_event(const char *buf, uint event_len,
 {
 }
 
-Update_rows_compressed_log_event::Update_rows_compressed_log_event(const char *buf, uint event_len,
+Update_rows_compressed_log_event::Update_rows_compressed_log_event(
+                                             const char *buf, uint event_len,
                                              const Format_description_log_event
                                              *description_event)
   : Update_rows_log_event(buf, event_len, description_event)
 {
-   uncompress_buf();
+  uncompress_buf();
 }
 #endif
 
@@ -13200,17 +13237,20 @@ void Update_rows_compressed_log_event::print(FILE *file, PRINT_EVENT_INFO *print
 {
   char *new_buf;
   ulong len;
-  bool is_malloc = false;
-  if(!row_log_event_uncompress(glob_description_event, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32,
+  bool is_malloc= false;
+  if(!row_log_event_uncompress(glob_description_event,
+                               checksum_alg == BINLOG_CHECKSUM_ALG_CRC32,
                                temp_buf, NULL, 0, &is_malloc, &new_buf, &len))
   {
-      free_temp_buf();
-      register_temp_buf(new_buf, true);
-      Rows_log_event::print_helper(file, print_event_info, "Update_compressed_rows");
+    free_temp_buf();
+    register_temp_buf(new_buf, true);
+    Rows_log_event::print_helper(file, print_event_info,
+                                 "Update_compressed_rows");
   }
   else
   {
-      my_b_printf(&print_event_info->head_cache, "ERROR: uncompress update_compressed_rows failed\n");
+    my_b_printf(&print_event_info->head_cache,
+                "ERROR: uncompress update_compressed_rows failed\n");
   }
 }
 #endif
diff --git a/sql/log_event.h b/sql/log_event.h
index 59e4dcd..7ac21e3 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -694,6 +694,11 @@ enum Log_event_type
 
   /*
     Compressed binlog event.
+
+    Note that the order between WRITE/UPDATE/DELETE events is significant;
+    this is so that we can convert from the compressed to the uncompressed
+    event type with (type-WRITE_ROWS_COMPRESSED_EVENT + WRITE_ROWS_EVENT)
+    and similar for _V1.
   */
   QUERY_COMPRESSED_EVENT = 165,
   WRITE_ROWS_COMPRESSED_EVENT_V1 = 166,
@@ -708,15 +713,52 @@ enum Log_event_type
   ENUM_END_EVENT /* end marker */
 };
 
-#define LOG_EVENT_IS_QUERY(type) (type == QUERY_EVENT || type == QUERY_COMPRESSED_EVENT)
-#define LOG_EVENT_IS_WRITE_ROW(type) (type == WRITE_ROWS_EVENT || type == WRITE_ROWS_EVENT_V1 || type == WRITE_ROWS_COMPRESSED_EVENT || type == WRITE_ROWS_COMPRESSED_EVENT_V1)
-#define LOG_EVENT_IS_UPDATE_ROW(type) (type == UPDATE_ROWS_EVENT || type == UPDATE_ROWS_EVENT_V1 || type == UPDATE_ROWS_COMPRESSED_EVENT || type == UPDATE_ROWS_COMPRESSED_EVENT_V1)
-#define LOG_EVENT_IS_DELETE_ROW(type) (type == DELETE_ROWS_EVENT || type == DELETE_ROWS_EVENT_V1 || type == DELETE_ROWS_COMPRESSED_EVENT || type == DELETE_ROWS_COMPRESSED_EVENT_V1)
-#define LOG_EVENT_IS_ROW_COMPRESSED(type) (type == WRITE_ROWS_COMPRESSED_EVENT || type == WRITE_ROWS_COMPRESSED_EVENT_V1 ||\
-                                            type == UPDATE_ROWS_COMPRESSED_EVENT || type == UPDATE_ROWS_COMPRESSED_EVENT_V1 ||\
-                                            type == DELETE_ROWS_COMPRESSED_EVENT || type == DELETE_ROWS_COMPRESSED_EVENT_V1) 
-#define LOG_EVENT_IS_ROW_V2(type) (type >= WRITE_ROWS_EVENT && type <= DELETE_ROWS_EVENT || \
-                                   type >= WRITE_ROWS_COMPRESSED_EVENT && type <= DELETE_ROWS_COMPRESSED_EVENT )
+static inline bool LOG_EVENT_IS_QUERY(enum Log_event_type type)
+{
+  return type == QUERY_EVENT || type == QUERY_COMPRESSED_EVENT;
+}
+
+
+static inline bool LOG_EVENT_IS_WRITE_ROW(enum Log_event_type type)
+{
+  return type == WRITE_ROWS_EVENT || type == WRITE_ROWS_EVENT_V1 ||
+    type == WRITE_ROWS_COMPRESSED_EVENT ||
+    type == WRITE_ROWS_COMPRESSED_EVENT_V1;
+}
+
+
+static inline bool LOG_EVENT_IS_UPDATE_ROW(enum Log_event_type type)
+{
+  return type == UPDATE_ROWS_EVENT || type == UPDATE_ROWS_EVENT_V1 ||
+    type == UPDATE_ROWS_COMPRESSED_EVENT ||
+    type == UPDATE_ROWS_COMPRESSED_EVENT_V1;
+}
+
+
+static inline bool LOG_EVENT_IS_DELETE_ROW(enum Log_event_type type)
+{
+  return type == DELETE_ROWS_EVENT || type == DELETE_ROWS_EVENT_V1 ||
+    type == DELETE_ROWS_COMPRESSED_EVENT ||
+    type == DELETE_ROWS_COMPRESSED_EVENT_V1;
+}
+
+
+static inline bool LOG_EVENT_IS_ROW_COMPRESSED(enum Log_event_type type)
+{
+  return type == WRITE_ROWS_COMPRESSED_EVENT ||
+    type == WRITE_ROWS_COMPRESSED_EVENT_V1 ||
+    type == UPDATE_ROWS_COMPRESSED_EVENT ||
+    type == UPDATE_ROWS_COMPRESSED_EVENT_V1 ||
+    type == DELETE_ROWS_COMPRESSED_EVENT ||
+    type == DELETE_ROWS_COMPRESSED_EVENT_V1;
+}
+
+
+static inline bool LOG_EVENT_IS_ROW_V2(enum Log_event_type type)
+{
+  return (type >= WRITE_ROWS_EVENT && type <= DELETE_ROWS_EVENT) ||
+    (type >= WRITE_ROWS_COMPRESSED_EVENT && type <= DELETE_ROWS_COMPRESSED_EVENT);
+}
 
 
 /*
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index eeb96d8..ec5ea72 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -645,7 +645,7 @@ is_group_ending(Log_event *ev, Log_event_type event_type)
 {
   if (event_type == XID_EVENT)
     return 1;
-  if (event_type == QUERY_EVENT)
+  if (event_type == QUERY_EVENT)  // COMMIT/ROLLBACK are never compressed
   {
     Query_log_event *qev = (Query_log_event *)ev;
     if (qev->is_commit())
@@ -2511,7 +2511,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
       {
         DBUG_ASSERT(rli->gtid_skip_flag == GTID_SKIP_TRANSACTION);
         if (typ == XID_EVENT ||
-            (typ == QUERY_EVENT &&
+            (typ == QUERY_EVENT &&  // COMMIT/ROLLBACK are never compressed
              (((Query_log_event *)ev)->is_commit() ||
               ((Query_log_event *)ev)->is_rollback())))
           rli->gtid_skip_flag= GTID_SKIP_NOT;
diff --git a/sql/slave.cc b/sql/slave.cc
index 62eb24d..8f0db28b 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -6153,8 +6153,10 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
   */
   case QUERY_COMPRESSED_EVENT:
     inc_pos= event_len;
-    if (query_event_uncompress(rli->relay_log.description_event_for_queue, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, 
-                                buf, new_buf_arr, sizeof(new_buf_arr), &is_malloc, (char **)&new_buf, &event_len))
+    if (query_event_uncompress(rli->relay_log.description_event_for_queue,
+                               checksum_alg == BINLOG_CHECKSUM_ALG_CRC32,
+                               buf, new_buf_arr, sizeof(new_buf_arr),
+                               &is_malloc, (char **)&new_buf, &event_len))
     {
       char  llbuf[22];
       error = ER_BINLOG_UNCOMPRESS_ERROR;
@@ -6175,8 +6177,10 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
   case DELETE_ROWS_COMPRESSED_EVENT_V1:
     inc_pos = event_len;
     {
-      if (row_log_event_uncompress(rli->relay_log.description_event_for_queue, checksum_alg == BINLOG_CHECKSUM_ALG_CRC32, 
-                                    buf, new_buf_arr, sizeof(new_buf_arr), &is_malloc, (char **)&new_buf, &event_len))
+      if (row_log_event_uncompress(rli->relay_log.description_event_for_queue,
+                                   checksum_alg == BINLOG_CHECKSUM_ALG_CRC32,
+                                   buf, new_buf_arr, sizeof(new_buf_arr),
+                                   &is_malloc, (char **)&new_buf, &event_len))
       {
         char  llbuf[22];
         error = ER_BINLOG_UNCOMPRESS_ERROR;
@@ -6207,7 +6211,7 @@ static int queue_event(Master_info* mi,const char* buf, ulong event_len)
     DBUG_EXECUTE_IF("kill_slave_io_after_2_events",
                     {
                       if (mi->dbug_do_disconnect &&
-                          (LOG_EVENT_IS_QUERY((uchar)buf[EVENT_TYPE_OFFSET]) ||
+                          (LOG_EVENT_IS_QUERY((Log_event_type)(uchar)buf[EVENT_TYPE_OFFSET]) ||
                            ((uchar)buf[EVENT_TYPE_OFFSET] == TABLE_MAP_EVENT))
                           && (--mi->dbug_event_counter == 0))
                       {
diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc
index d92ac15..94619f1 100644
--- a/sql/sql_binlog.cc
+++ b/sql/sql_binlog.cc
@@ -82,6 +82,12 @@ static int check_event_type(int type, Relay_log_info *rli)
   case PRE_GA_WRITE_ROWS_EVENT:
   case PRE_GA_UPDATE_ROWS_EVENT:
   case PRE_GA_DELETE_ROWS_EVENT:
+  case WRITE_ROWS_COMPRESSED_EVENT_V1:
+  case UPDATE_ROWS_COMPRESSED_EVENT_V1:
+  case DELETE_ROWS_COMPRESSED_EVENT_V1:
+  case WRITE_ROWS_COMPRESSED_EVENT:
+  case UPDATE_ROWS_COMPRESSED_EVENT:
+  case DELETE_ROWS_COMPRESSED_EVENT:
     /*
       Row events are only allowed if a Format_description_event has
       already been seen.
diff --git a/sql/sql_class.cc b/sql/sql_class.cc
index a741a0f..2785c46 100644
--- a/sql/sql_class.cc
+++ b/sql/sql_class.cc
@@ -6961,7 +6961,6 @@ int THD::binlog_query(THD::enum_binlog_query_type qtype, char const *query_arg,
       flush the pending rows event if necessary.
     */
     {
-      Log_event* ev = NULL;
       int error = 0;
 
       /*


More information about the commits mailing list