[Commits] bzr commit into MariaDB 5.1, with Maria 1.5:maria branch (monty:2920)

Michael Widenius monty at askmonty.org
Mon Sep 6 02:25:46 EEST 2010


#At lp:maria based on revid:monty at askmonty.org-20100827141244-sm8o9u7fnvl9nfwa

 2920 Michael Widenius	2010-09-06
      Fixed bug that 'maria_read_log -a' didn't set max_trid when reparing tables.
      Fixed bug in Aria when replacing short keys with long keys and a key tree both overflow and underflow at same time.
      Fixed several bugs when generating recovery logs when using RGQ with replacing long keys with short keys and vice versa.
      Lots of new DBUG_ASSERT()'s
      Added more information to recovery log to make it easier to know from where log entry orginated.
      Introduced MARIA_PAGE->org_size that tells what the size of the page was in last log entry. This allows us to find out if all key changes for index page was logged.
      Small code cleanups:
      - Introduced _ma_log_key_changes() to log crc of key page changes
      - Added share->max_index_block_size as max size of data one can put in key block (block_size - KEYPAGE_CHECKSUM_SIZE)
        This will later simplify adding a directory to index pages.
      - Write page number instead of page postition to DBUG log
      modified:
        mysql-test/lib/v1/mysql-test-run.pl
        sql/mysqld.cc
        storage/maria/ha_maria.cc
        storage/maria/ma_check.c
        storage/maria/ma_delete.c
        storage/maria/ma_key_recover.c
        storage/maria/ma_key_recover.h
        storage/maria/ma_loghandler.h
        storage/maria/ma_open.c
        storage/maria/ma_page.c
        storage/maria/ma_recovery.c
        storage/maria/ma_rt_key.c
        storage/maria/ma_rt_split.c
        storage/maria/ma_unique.c
        storage/maria/ma_write.c
        storage/maria/maria_def.h
        storage/maria/trnman.c

per-file messages:
  mysql-test/lib/v1/mysql-test-run.pl
    Use --general-log instead of --log to disable warning when using RQG
  sql/mysqld.cc
    If we have already sent ok to client when we get an error, log this to stderr
    Don't disable option --log-output if CSV engine is not supported.
  storage/maria/ha_maria.cc
    Log queries to recovery log also in LOCK TABLES
  storage/maria/ma_check.c
    If param->max_trid is set, use this value instead of max_trid_in_system().
    This is used by recovery to set max_trid to max seen trid so far.
    keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE -> max_index_block_size (Style optimization)
  storage/maria/ma_delete.c
    Mark tables crashed early
    Write page number instead of page position to debug log.
    Added parameter to ma_log_delete() and ma_log_prefix() that is logged so that we can find where wrong log entries where generated.
    Fixed bug where a page was not proplerly written when same key tree had both an overflow and underflow when deleting a key.
    keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE => max_index_block_size (Style optimization)
    ma_log_delete() now has extra parameter of how many bytes from end of page should be appended to log for page (for page overflows)
  storage/maria/ma_key_recover.c
    Added extra parameter to ma_log_prefix() to indicate what caused log entry.
    Update MARIA_PAGE->org_size when logging info about page.
    Much more DBUG_ASSERT()'s.
    Fix some bugs in maria_log_add() to handle page overflows.
    Added _ma_log_key_changes() to log crc of key page changes.
    If EXTRA_STORE_FULL_PAGE_IN_KEY_CHANGES is defines, log the resulting pages to log so one can trivally
    see how the resulting page should have looked like (for errors in CRC values)
  storage/maria/ma_key_recover.h
    Added _ma_log_key_changes() which is only called if EXTRA_DEBUG_KEY_CHANGES is defined.
    Updated function prototypes.
  storage/maria/ma_loghandler.h
    Added more values to en_key_debug, to get more exact location where things went wrong when logging to recovery log.
  storage/maria/ma_open.c
    Initialize share->max_index_block_size
  storage/maria/ma_page.c
    Added updating and testing of MARIA_PAGE->org_size
    Write page number instead of page postition to DBUG log
    Generate error if we read page with wrong data.
    Removed wrong assert: key_del_current != share->state.key_del.
    Simplify _ma_log_compact_keypage()
  storage/maria/ma_recovery.c
    Set param.max_trid to max seen trid before running repair table (used for alter table to create index)
  storage/maria/ma_rt_key.c
    Update call to _ma_log_delete()
  storage/maria/ma_rt_split.c
    Use _ma_log_key_changes()
    Update MARIA_PAGE->org_size
  storage/maria/ma_unique.c
    Remove casts
  storage/maria/ma_write.c
    keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE => share->max_index_block_length.
    Updated calls to _ma_log_prefix()
    Changed code to use _ma_log_key_changes()
    Update ma_page->org_size
    Fixed bug in _ma_log_split() for pages that overflow
    Added KEY_OP_DEBUG logging to functions
    Log KEYPAGE_FLAG in all log entries
  storage/maria/maria_def.h
    Added SHARE->max_index_block_size
    Added MARIA_PAGE->org_size
  storage/maria/trnman.c
    Reset flags for new transaction.
=== modified file 'mysql-test/lib/v1/mysql-test-run.pl'
--- a/mysql-test/lib/v1/mysql-test-run.pl	2010-07-30 07:45:27 +0000
+++ b/mysql-test/lib/v1/mysql-test-run.pl	2010-09-05 23:25:44 +0000
@@ -2959,7 +2959,7 @@ sub run_benchmarks ($) {
 
   if ( ! $benchmark )
   {
-    mtr_add_arg($args, "--log");
+    mtr_add_arg($args, "--general-log");
     mtr_run("$glob_mysql_bench_dir/run-all-tests", $args, "", "", "", "");
     # FIXME check result code?!
   }

=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc	2010-08-27 14:12:44 +0000
+++ b/sql/mysqld.cc	2010-09-05 23:25:44 +0000
@@ -3070,7 +3070,19 @@ int my_message_sql(uint error, const cha
     }
     else
     {
-      if (! thd->main_da.is_error())            // Return only first message
+      if (thd->main_da.is_ok())
+      {
+        /*
+          Client has already got ok packet; Write message to error log.
+          This could happen if we get an error in implicit commit.
+          This should never happen in normal operation, so lets
+          assert here in debug builds.
+        */
+        DBUG_ASSERT(0);
+        func= sql_print_error;
+        MyFlags|= ME_NOREFRESH;
+      }
+      else if (! thd->main_da.is_error()) // Return only first message
       {
         thd->main_da.set_error_status(thd, error, str);
       }
@@ -4185,7 +4197,6 @@ a file name for --log-bin-index option",
     unireg_abort(1);
   }
 
-#ifdef WITH_CSV_STORAGE_ENGINE
   if (opt_bootstrap)
     log_output_options= LOG_FILE;
   else
@@ -4219,10 +4230,6 @@ a file name for --log-bin-index option",
     logger.set_handlers(LOG_FILE, opt_slow_log ? log_output_options:LOG_NONE,
                         opt_log ? log_output_options:LOG_NONE);
   }
-#else
-  logger.set_handlers(LOG_FILE, opt_slow_log ? LOG_FILE:LOG_NONE,
-                      opt_log ? LOG_FILE:LOG_NONE);
-#endif
 
   /*
     Check that the default storage engine is actually available.
@@ -6297,13 +6304,11 @@ each time the SQL thread starts.",
    "Log some extra information to update log. Please note that this option "
    "is deprecated; see --log-short-format option.",
    0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
-#ifdef WITH_CSV_STORAGE_ENGINE
   {"log-output", OPT_LOG_OUTPUT,
    "Syntax: log-output[=value[,value...]], where \"value\" could be TABLE, "
    "FILE or NONE.",
    &log_output_str, &log_output_str, 0,
    GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0},
-#endif
   {"log-queries-not-using-indexes", OPT_LOG_QUERIES_NOT_USING_INDEXES,
    "Log queries that are executed without benefit of any index to the slow log if it is open.",
    &opt_log_queries_not_using_indexes, &opt_log_queries_not_using_indexes,
@@ -8631,7 +8636,6 @@ mysqld_get_one_option(int optid,
     WARN_DEPRECATED(NULL, "7.0", "--log_slow_queries", "'--slow_query_log'/'--log-slow-file'");
     opt_slow_log= 1;
     break;
-#ifdef WITH_CSV_STORAGE_ENGINE
   case  OPT_LOG_OUTPUT:
   {
     if (!argument || !argument[0])
@@ -8649,7 +8653,6 @@ mysqld_get_one_option(int optid,
   }
     break;
   }
-#endif
   case OPT_EVENT_SCHEDULER:
 #ifndef HAVE_EVENT_SCHEDULER
     sql_perror("Event scheduler is not supported in embedded build.");

=== modified file 'storage/maria/ha_maria.cc'
--- a/storage/maria/ha_maria.cc	2010-08-24 18:15:05 +0000
+++ b/storage/maria/ha_maria.cc	2010-09-05 23:25:44 +0000
@@ -750,8 +750,11 @@ static int maria_create_trn_for_mysql(MA
                                    thd->query_length());
   }
   else
+  {
     DBUG_PRINT("info", ("lock_type: %d  trnman_flags: %u",
-                        info->lock_type, trnman_get_flags(trn))); /* QQ */
+                        info->lock_type, trnman_get_flags(trn)));
+  }
+  
 #endif
   DBUG_RETURN(0);
 }
@@ -2347,6 +2350,12 @@ int ha_maria::extra(enum ha_extra_functi
 
 int ha_maria::reset(void)
 {
+  if (file->trn)
+  {
+    TRN *trn= file->trn;
+    /* Next statement is a new statement. Ensure it's logged */
+    trnman_set_flags(trn, trnman_get_flags(trn) & ~TRN_STATE_INFO_LOGGED);
+  }
   return maria_reset(file);
 }
 

=== modified file 'storage/maria/ma_check.c'
--- a/storage/maria/ma_check.c	2010-08-12 16:46:36 +0000
+++ b/storage/maria/ma_check.c	2010-09-05 23:25:44 +0000
@@ -136,11 +136,13 @@ void maria_chk_init_for_check(HA_CHECK *
     Set up transaction handler so that we can see all rows. When rows is read
     we will check the found id against param->max_tried
   */
-  if (!ma_control_file_inited())
-    param->max_trid= 0;                 /* Give warning for first trid found */
-  else
-    param->max_trid= max_trid_in_system();
-
+  if (param->max_trid == 0)
+  {
+    if (!ma_control_file_inited())
+      param->max_trid= 0;      /* Give warning for first trid found */
+    else
+      param->max_trid= max_trid_in_system();
+  }
   maria_ignore_trids(info);
 }
 
@@ -867,7 +869,7 @@ static int chk_index(HA_CHECK *param, MA
                           llstr(anc_page->pos, llbuff));
   }
 
-  if (anc_page->size > (uint) keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE)
+  if (anc_page->size > share->max_index_block_size)
   {
     _ma_check_print_error(param,
                           "Page at %s has impossible (too big) pagelength",
@@ -2325,11 +2327,13 @@ static int initialize_variables_for_repa
   }
 
   /* Set up transaction handler so that we can see all rows */
-  if (!ma_control_file_inited())
-    param->max_trid= 0;                 /* Give warning for first trid found */
-  else
-    param->max_trid= max_trid_in_system();
-
+  if (param->max_trid == 0)
+  {
+    if (!ma_control_file_inited())
+      param->max_trid= 0;      /* Give warning for first trid found */
+    else
+      param->max_trid= max_trid_in_system();
+  }
   maria_ignore_trids(info);
   /* Don't write transid's during repair */
   maria_versioning(info, 0);
@@ -5609,7 +5613,7 @@ static int sort_insert_key(MARIA_SORT_PA
   a_length+=t_length;
   _ma_store_page_used(share, anc_buff, a_length);
   key_block->end_pos+=t_length;
-  if (a_length <= (uint) (keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE))
+  if (a_length <= share->max_index_block_size)
   {
     MARIA_KEY tmp_key2;
     tmp_key2.data= key_block->lastkey;

=== modified file 'storage/maria/ma_delete.c'
--- a/storage/maria/ma_delete.c	2010-03-10 10:32:14 +0000
+++ b/storage/maria/ma_delete.c	2010-09-05 23:25:44 +0000
@@ -1,4 +1,5 @@
 /* Copyright (C) 2006 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
+   Copyright (C) 2009-2010 Monty Program Ab
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
@@ -180,7 +181,11 @@ my_bool _ma_ck_delete(MARIA_HA *info, MA
     key->data= key_buff;
   }
 
-  res= _ma_ck_real_delete(info, key, &new_root);
+  if ((res= _ma_ck_real_delete(info, key, &new_root)))
+  {
+    /* We have to mark the table crashed before unpin_all_pages() */
+    maria_mark_crashed(info);
+  }
 
   key->data= save_key_data;
   if (!res && share->now_transactional)
@@ -218,7 +223,8 @@ my_bool _ma_ck_real_delete(register MARI
     my_errno=ENOMEM;
     DBUG_RETURN(1);
   }
-  DBUG_PRINT("info",("root_page: %ld", (long) old_root));
+  DBUG_PRINT("info",("root_page: %lu",
+                     (ulong) (old_root / keyinfo->block_length)));
   if (_ma_fetch_keypage(&page, info, keyinfo, old_root,
                         PAGECACHE_LOCK_WRITE, DFLT_INIT_HITS, root_buff, 0))
   {
@@ -435,7 +441,8 @@ static int d_search(MARIA_HA *info, MARI
     */
     if (share->now_transactional &&
         _ma_log_delete(anc_page, s_temp.key_pos,
-                       s_temp.changed_length, s_temp.move_length))
+                       s_temp.changed_length, s_temp.move_length,
+                       0, KEY_OP_DEBUG_LOG_DEL_CHANGE_1))
       DBUG_RETURN(-1);
 
     if (!nod_flag)
@@ -458,7 +465,7 @@ static int d_search(MARIA_HA *info, MARI
   }
   if (ret_value >0)
   {
-    save_flag=1;
+    save_flag= 2;
     if (ret_value == 1)
       ret_value= underflow(info, keyinfo, anc_page, &leaf_page, keypos);
     else
@@ -474,17 +481,20 @@ static int d_search(MARIA_HA *info, MARI
       ret_value= _ma_insert(info, key, anc_page, keypos,
                             last_key.data,
                             (MARIA_PAGE*) 0, (uchar*) 0, (my_bool) 0);
+
+      if (_ma_write_keypage(&leaf_page, PAGECACHE_LOCK_LEFT_WRITELOCKED,
+                            DFLT_INIT_HITS))
+        ret_value= -1;
     }
   }
-  if (ret_value == 0 && anc_page->size >
-      (uint) (keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE))
+  if (ret_value == 0 && anc_page->size > share->max_index_block_size)
   {
     /* parent buffer got too big ; We have to split the page */
-    save_flag=1;
+    save_flag= 3;
     ret_value= _ma_split_page(info, key, anc_page,
-                              (uint) (keyinfo->block_length -
-                                      KEYPAGE_CHECKSUM_SIZE),
+                              share->max_index_block_size,
                               (uchar*) 0, 0, 0, lastkey, 0) | 2;
+    DBUG_ASSERT(anc_page->org_size == anc_page->size);
   }
   if (save_flag && ret_value != 1)
   {
@@ -550,7 +560,8 @@ static int del(MARIA_HA *info, MARIA_KEY
   MARIA_KEY ret_key;
   MARIA_PAGE next_page;
   DBUG_ENTER("del");
-  DBUG_PRINT("enter",("leaf_page: %ld  keypos: 0x%lx", (long) leaf_page,
+  DBUG_PRINT("enter",("leaf_page: %lu  keypos: 0x%lx",
+                      (ulong) (leaf_page->pos / share->block_size),
 		      (ulong) keypos));
   DBUG_DUMP("leaf_buff", leaf_page->buff, leaf_page->size);
 
@@ -587,11 +598,10 @@ static int del(MARIA_HA *info, MARIA_KEY
 	  ret_value= underflow(info, keyinfo, leaf_page, &next_page,
                                endpos);
 	  if (ret_value == 0 && leaf_page->size >
-              (uint) (keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE))
+              share->max_index_block_size)
 	  {
 	    ret_value= (_ma_split_page(info, key, leaf_page,
-                                       (uint) (keyinfo->block_length -
-                                               KEYPAGE_CHECKSUM_SIZE),
+                                       share->max_index_block_size,
                                        (uchar*) 0, 0, 0,
                                        ret_key_buff, 0) | 2);
 	  }
@@ -708,8 +718,7 @@ err:
 
    @fn    underflow()
    @param anc_buff        Anchestor page data
-   @param leaf_page       Page number of leaf page
-   @param leaf_buff       Leaf page (page that underflowed)
+   @param leaf_page       Leaf page (page that underflowed)
    @param leaf_page_link  Pointer to pin information about leaf page
    @param keypos          Position after current key in anc_buff
 
@@ -743,7 +752,8 @@ static int underflow(MARIA_HA *info, MAR
   MARIA_KEY tmp_key, anc_key, leaf_key;
   MARIA_PAGE next_page;
   DBUG_ENTER("underflow");
-  DBUG_PRINT("enter",("leaf_page: %ld  keypos: 0x%lx",(long) leaf_page->pos,
+  DBUG_PRINT("enter",("leaf_page: %lu  keypos: 0x%lx",
+                      (ulong) (leaf_page->pos / share->block_size),
 		      (ulong) keypos));
   DBUG_DUMP("anc_buff", anc_page->buff,  anc_page->size);
   DBUG_DUMP("leaf_buff", leaf_page->buff, leaf_page->size);
@@ -841,7 +851,7 @@ static int underflow(MARIA_HA *info, MAR
     anc_page->size= new_anc_length;
     page_store_size(share, anc_page);
 
-    if (buff_length <= (uint) (keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE))
+    if (buff_length <= share->max_index_block_size)
     {
       /* All keys fitted into one page */
       page_mark_changed(info, &next_page);
@@ -854,10 +864,15 @@ static int underflow(MARIA_HA *info, MAR
 
       if (share->now_transactional)
       {
-        /* Log changes to parent page */
+        /*
+          Log changes to parent page. Note that this page may have been
+          temporarily bigger than block_size.
+         */
         if (_ma_log_delete(anc_page, key_deleted.key_pos,
                            key_deleted.changed_length,
-                           key_deleted.move_length))
+                           key_deleted.move_length,
+                           anc_length - anc_page->org_size,
+                           KEY_OP_DEBUG_LOG_DEL_CHANGE_2))
           goto err;
         /*
           Log changes to leaf page. Data for leaf page is in leaf_buff
@@ -986,7 +1001,8 @@ static int underflow(MARIA_HA *info, MAR
         */
         DBUG_ASSERT(new_buff_length <= next_buff_length);
         if (_ma_log_prefix(&next_page, key_inserted.changed_length,
-                           (int) (new_buff_length - next_buff_length)))
+                           (int) (new_buff_length - next_buff_length),
+                           KEY_OP_DEBUG_LOG_PREFIX_1))
           goto err;
       }
       page_mark_changed(info, &next_page);
@@ -1044,11 +1060,19 @@ static int underflow(MARIA_HA *info, MAR
 
   /* Remember for logging how many bytes of leaf_buff that are not changed */
   DBUG_ASSERT((int) key_inserted.changed_length >= key_inserted.move_length);
-  unchanged_leaf_length= leaf_length - (key_inserted.changed_length -
-                                        key_inserted.move_length);
+  unchanged_leaf_length= (leaf_length - p_length -
+                          (key_inserted.changed_length -
+                           key_inserted.move_length));
 
   new_buff_length= buff_length + leaf_length - p_length + t_length;
 
+#ifdef EXTRA_DEBUG
+  /* Ensure that unchanged_leaf_length is correct */
+  DBUG_ASSERT(bcmp(next_page.buff + new_buff_length - unchanged_leaf_length,
+                   leaf_buff + leaf_length - unchanged_leaf_length,
+                   unchanged_leaf_length) == 0);
+#endif
+
   page_flag= next_page.flag | leaf_page->flag;
   if (anc_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
                        SEARCH_PAGE_KEY_HAS_TRANSID))
@@ -1069,8 +1093,7 @@ static int underflow(MARIA_HA *info, MAR
   anc_page->size= new_anc_length;
   page_store_size(share, anc_page);
 
-  if (new_buff_length <= (uint) (keyinfo->block_length -
-                                 KEYPAGE_CHECKSUM_SIZE))
+  if (new_buff_length <= share->max_index_block_size)
   {
     /* All keys fitted into one page */
     page_mark_changed(info, leaf_page);
@@ -1079,10 +1102,14 @@ static int underflow(MARIA_HA *info, MAR
 
     if (share->now_transactional)
     {
-      /* Log changes to parent page */
+      /*
+        Log changes to parent page. Note that this page may have been
+        temporarily bigger than block_size.
+      */
       if (_ma_log_delete(anc_page, key_deleted.key_pos,
-                         key_deleted.changed_length, key_deleted.move_length))
-
+                         key_deleted.changed_length, key_deleted.move_length,
+                         anc_length - anc_page->org_size,
+                         KEY_OP_DEBUG_LOG_DEL_CHANGE_3))
         goto err;
       /*
         Log changes to next page. Data for leaf page is in buff
@@ -1192,8 +1219,10 @@ static int underflow(MARIA_HA *info, MAR
         This contains original data with new data added first
       */
       DBUG_ASSERT(leaf_length <= new_leaf_length);
+      DBUG_ASSERT(new_leaf_length >= unchanged_leaf_length);
       if (_ma_log_prefix(leaf_page, new_leaf_length - unchanged_leaf_length,
-                         (int) (new_leaf_length - leaf_length)))
+                         (int) (new_leaf_length - leaf_length),
+                         KEY_OP_DEBUG_LOG_PREFIX_2))
         goto err;
       /*
         Log changes to next page
@@ -1395,7 +1424,9 @@ static uint remove_key(MARIA_KEYDEF *key
 ****************************************************************************/
 
 /**
-   @brief log entry where some parts are deleted and some things are changed
+   @brief
+   log entry where some parts are deleted and some things are changed
+   and some data could be added last.
 
    @fn _ma_log_delete()
    @param info		  Maria handler
@@ -1404,74 +1435,148 @@ static uint remove_key(MARIA_KEYDEF *key
    @param key_pos         Start of change area
    @param changed_length  How many bytes where changed at key_pos
    @param move_length     How many bytes where deleted at key_pos
+   @param append_length	  Length of data added last
+		          This is taken from end of ma_page->buff
 
+   This is mainly used when a key is deleted. The append happens
+   when we delete a key from a page with data > block_size kept in
+   memory and we have to add back the data that was stored > block_size
 */
 
 my_bool _ma_log_delete(MARIA_PAGE *ma_page, const uchar *key_pos,
-                       uint changed_length, uint move_length)
+                       uint changed_length, uint move_length,
+                       uint append_length,
+                       enum en_key_debug debug_marker __attribute__((unused)))
 {
   LSN lsn;
-  uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 9 + 7], *log_pos;
-  LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 3];
-  uint translog_parts;
+  uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 2 + 3 + 3 + 6 + 3 + 7];
+  uchar *log_pos;
+  LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 7];
+  uint translog_parts, current_size, extra_length;
   uint offset= (uint) (key_pos - ma_page->buff);
   MARIA_HA *info= ma_page->info;
   MARIA_SHARE *share= info->s;
   my_off_t page;
   DBUG_ENTER("_ma_log_delete");
   DBUG_PRINT("enter", ("page: %lu  changed_length: %u  move_length: %d",
-                       (ulong) ma_page->pos, changed_length, move_length));
+                       (ulong) (ma_page->pos / share->block_size),
+                       changed_length, move_length));
   DBUG_ASSERT(share->now_transactional && move_length);
   DBUG_ASSERT(offset + changed_length <= ma_page->size);
+  DBUG_ASSERT(ma_page->org_size - move_length + append_length == ma_page->size);
+  DBUG_ASSERT(move_length <= ma_page->org_size - share->keypage_header);
 
   /* Store address of new root page */
   page= ma_page->pos / share->block_size;
   page_store(log_data + FILEID_STORE_SIZE, page);
   log_pos= log_data+ FILEID_STORE_SIZE + PAGE_STORE_SIZE;
+  current_size= ma_page->org_size;
+
+#ifdef EXTRA_DEBUG_KEY_CHANGES
+  *log_pos++= KEY_OP_DEBUG;
+  *log_pos++= debug_marker;
+#endif
+
+  /* Store keypage_flag */
+  *log_pos++= KEY_OP_SET_PAGEFLAG;
+  *log_pos++= ma_page->buff[KEYPAGE_TRANSFLAG_OFFSET];
+
   log_pos[0]= KEY_OP_OFFSET;
   int2store(log_pos+1, offset);
-  log_pos[3]= KEY_OP_SHIFT;
-  int2store(log_pos+4, -(int) move_length);
-  log_pos+= 6;
-  translog_parts= 1;
+  log_pos+= 3;
+  translog_parts= TRANSLOG_INTERNAL_PARTS + 1;
+  extra_length= 0;
+
   if (changed_length)
   {
+    if (offset + changed_length >= share->max_index_block_size)
+    {
+      changed_length= share->max_index_block_size - offset;
+      move_length= 0;                           /* Nothing to move */
+      current_size= share->max_index_block_size;
+    }
+
     log_pos[0]= KEY_OP_CHANGE;
     int2store(log_pos+1, changed_length);
     log_pos+= 3;
-    translog_parts= 2;
-    log_array[TRANSLOG_INTERNAL_PARTS + 1].str=    ma_page->buff + offset;
-    log_array[TRANSLOG_INTERNAL_PARTS + 1].length= changed_length;
+    log_array[translog_parts].str=    ma_page->buff + offset;
+    log_array[translog_parts].length= changed_length;
+    translog_parts++;
+
+    /* We only have to move things after offset+changed_length */
+    offset+= changed_length;
   }
 
-#ifdef EXTRA_DEBUG_KEY_CHANGES
+  log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
+  log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos - log_data);
+
+  if (move_length)
   {
-    int page_length= ma_page->size;
-    ha_checksum crc;
-    crc= my_checksum(0, ma_page->buff + LSN_STORE_SIZE,
-                     page_length - LSN_STORE_SIZE);
-    log_pos[0]= KEY_OP_CHECK;
-    int2store(log_pos+1, page_length);
-    int4store(log_pos+3, crc);
-
-    log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= log_pos;
-    log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= 7;
-    changed_length+= 7;
+    uint log_length;
+    if (offset + move_length < share->max_index_block_size)
+    {
+      /*
+        Move down things that is on page.
+        page_offset in apply_redo_inxed() will be at original offset
+        + changed_length.
+      */
+      log_pos[0]= KEY_OP_SHIFT;
+      int2store(log_pos+1, - (int) move_length);
+      log_length= 3;
+      current_size-= move_length;
+    }
+    else
+    {
+      /* Delete to end of page */
+      uint tmp= current_size - offset;
+      current_size= offset;
+      log_pos[0]= KEY_OP_DEL_SUFFIX;
+      int2store(log_pos+1, tmp);
+      log_length= 3;
+    }
+    log_array[translog_parts].str=    log_pos;
+    log_array[translog_parts].length= log_length;
     translog_parts++;
+    log_pos+= log_length;
+    extra_length+= log_length;
   }
-#endif
 
-  log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
-  log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos - log_data);
+  if (current_size != ma_page->size &&
+      current_size != share->max_index_block_size)
+  {
+    /* Append data that didn't fit on the page before */
+    uint length= (min(ma_page->size, share->max_index_block_size) -
+                  current_size);
+    uchar *data= ma_page->buff + current_size;
+
+    DBUG_ASSERT(length <= append_length);
+
+    log_pos[0]= KEY_OP_ADD_SUFFIX;
+    int2store(log_pos+1, length);
+    log_array[translog_parts].str=        log_pos;
+    log_array[translog_parts].length=     3;
+    log_array[translog_parts + 1].str=    data;
+    log_array[translog_parts + 1].length= length;
+    log_pos+= 3;
+    translog_parts+= 2;
+    current_size+= length;
+    extra_length+= 3 + length;
+  }
+
+  _ma_log_key_changes(ma_page,
+                      log_array + translog_parts,
+                      log_pos, &extra_length, &translog_parts);
+  /* Remember new page length for future log entires for same page */
+  ma_page->org_size= current_size;
 
   if (translog_write_record(&lsn, LOGREC_REDO_INDEX,
                             info->trn, info,
                             (translog_size_t)
-                            log_array[TRANSLOG_INTERNAL_PARTS + 0].length +
-                            changed_length,
-                            TRANSLOG_INTERNAL_PARTS + translog_parts,
+                            log_array[TRANSLOG_INTERNAL_PARTS].length +
+                            changed_length + extra_length, translog_parts,
                             log_array, log_data, NULL))
     DBUG_RETURN(1);
+
   DBUG_RETURN(0);
 }
 

=== modified file 'storage/maria/ma_key_recover.c'
--- a/storage/maria/ma_key_recover.c	2010-08-23 09:52:57 +0000
+++ b/storage/maria/ma_key_recover.c	2010-09-05 23:25:44 +0000
@@ -312,24 +312,33 @@ my_bool write_hook_for_undo_key_delete(e
 */
 
 my_bool _ma_log_prefix(MARIA_PAGE *ma_page, uint changed_length,
-                       int move_length)
+                       int move_length,
+                       enum en_key_debug debug_marker __attribute__((unused)))
 {
   uint translog_parts;
   LSN lsn;
-  uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 7 + 7 + 2], *log_pos;
+  uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 7 + 7 + 2 + 2];
+  uchar *log_pos;
   uchar *buff= ma_page->buff;
-  LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 3];
+  LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 4];
   pgcache_page_no_t page;
   MARIA_HA *info= ma_page->info;
   DBUG_ENTER("_ma_log_prefix");
   DBUG_PRINT("enter", ("page: %lu  changed_length: %u  move_length: %d",
                         (ulong) ma_page->pos, changed_length, move_length));
 
+  DBUG_ASSERT(ma_page->size == ma_page->org_size + move_length);
+
   page= ma_page->pos / info->s->block_size;
   log_pos= log_data + FILEID_STORE_SIZE;
   page_store(log_pos, page);
   log_pos+= PAGE_STORE_SIZE;
 
+#ifdef EXTRA_DEBUG_KEY_CHANGES
+  (*log_pos++)= KEY_OP_DEBUG;
+  (*log_pos++)= debug_marker;
+#endif
+
   /* Store keypage_flag */
   *log_pos++= KEY_OP_SET_PAGEFLAG;
   *log_pos++= buff[KEYPAGE_TRANSFLAG_OFFSET];
@@ -373,21 +382,11 @@ my_bool _ma_log_prefix(MARIA_PAGE *ma_pa
     translog_parts= 2;
   }
 
-#ifdef EXTRA_DEBUG_KEY_CHANGES
-  {
-    int page_length= ma_page->size;
-    ha_checksum crc;
-    crc= my_checksum(0, buff + LSN_STORE_SIZE, page_length - LSN_STORE_SIZE);
-    log_pos[0]= KEY_OP_CHECK;
-    int2store(log_pos+1, page_length);
-    int4store(log_pos+3, crc);
-
-    log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= log_pos;
-    log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= 7;
-    changed_length+= 7;
-    translog_parts++;
-  }
-#endif
+  _ma_log_key_changes(ma_page, log_array + TRANSLOG_INTERNAL_PARTS +
+                      translog_parts, log_pos, &changed_length,
+                      &translog_parts);
+  /* Remember new page length for future log entires for same page */
+  ma_page->org_size= ma_page->size;
 
   DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
                                     info->trn, info,
@@ -407,7 +406,7 @@ my_bool _ma_log_prefix(MARIA_PAGE *ma_pa
 my_bool _ma_log_suffix(MARIA_PAGE *ma_page, uint org_length, uint new_length)
 {
   LSN lsn;
-  LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 3];
+  LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 4];
   uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 10 + 7 + 2], *log_pos;
   uchar *buff= ma_page->buff;
   int diff;
@@ -417,6 +416,8 @@ my_bool _ma_log_suffix(MARIA_PAGE *ma_pa
   DBUG_ENTER("_ma_log_suffix");
   DBUG_PRINT("enter", ("page: %lu  org_length: %u  new_length: %u",
                        (ulong) ma_page->pos, org_length, new_length));
+  DBUG_ASSERT(ma_page->size == new_length);
+  DBUG_ASSERT(ma_page->org_size == org_length);
 
   page= ma_page->pos / info->s->block_size;
 
@@ -451,20 +452,11 @@ my_bool _ma_log_suffix(MARIA_PAGE *ma_pa
   log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
                                                          log_data);
 
-#ifdef EXTRA_DEBUG_KEY_CHANGES
-  {
-    ha_checksum crc;
-    crc= my_checksum(0, buff + LSN_STORE_SIZE, new_length - LSN_STORE_SIZE);
-    log_pos[0]= KEY_OP_CHECK;
-    int2store(log_pos+1, new_length);
-    int4store(log_pos+3, crc);
-
-    log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= log_pos;
-    log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= 7;
-    extra_length+= 7;
-    translog_parts++;
-  }
-#endif
+  _ma_log_key_changes(ma_page,
+                      log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
+                      log_pos, &extra_length, &translog_parts);
+  /* Remember new page length for future log entires for same page */
+  ma_page->org_size= ma_page->size;
 
   DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
                                     info->trn, info,
@@ -481,12 +473,16 @@ my_bool _ma_log_suffix(MARIA_PAGE *ma_pa
 
    @param ma_page          Changed page
    @param org_page_length  Length of data in page before key was added
+			   Final length in ma_page->size
 
    @note
      If handle_overflow is set, then we have to protect against
      logging changes that is outside of the page.
      This may happen during underflow() handling where the buffer
      in memory temporary contains more data than block_size
+
+     ma_page may be a page that was previously logged and cuted down
+     becasue it's too big. (org_page_length > ma_page->org_size)
 */
 
 my_bool _ma_log_add(MARIA_PAGE *ma_page,
@@ -496,14 +492,14 @@ my_bool _ma_log_add(MARIA_PAGE *ma_page,
 {
   LSN lsn;
   uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 3 + 3 + 3 + 3 + 7 +
-                 2];
+                 3 + 2];
   uchar *log_pos;
   uchar *buff= ma_page->buff;
-  LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 3];
+  LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 6];
   MARIA_HA *info= ma_page->info;
   uint offset= (uint) (key_pos - buff);
-  uint page_length= info->s->block_size - KEYPAGE_CHECKSUM_SIZE;
-  uint translog_parts;
+  uint max_page_size= info->s->max_index_block_size;
+  uint translog_parts, current_size;
   pgcache_page_no_t page_pos;
   DBUG_ENTER("_ma_log_add");
   DBUG_PRINT("enter", ("page: %lu  org_page_length: %u  changed_length: %u  "
@@ -512,6 +508,9 @@ my_bool _ma_log_add(MARIA_PAGE *ma_page,
                        move_length));
   DBUG_ASSERT(info->s->now_transactional);
   DBUG_ASSERT(move_length <= (int) changed_length);
+  DBUG_ASSERT(ma_page->org_size == min(org_page_length, max_page_size));
+  DBUG_ASSERT(ma_page->size == org_page_length + move_length);
+  DBUG_ASSERT(offset < max_page_size);
 
   /*
     Write REDO entry that contains the logical operations we need
@@ -520,6 +519,7 @@ my_bool _ma_log_add(MARIA_PAGE *ma_page,
   log_pos= log_data + FILEID_STORE_SIZE;
   page_pos= ma_page->pos / info->s->block_size;
   page_store(log_pos, page_pos);
+  current_size= ma_page->org_size;
   log_pos+= PAGE_STORE_SIZE;
 
 #ifdef EXTRA_DEBUG_KEY_CHANGES
@@ -531,40 +531,41 @@ my_bool _ma_log_add(MARIA_PAGE *ma_page,
   *log_pos++= KEY_OP_SET_PAGEFLAG;
   *log_pos++= buff[KEYPAGE_TRANSFLAG_OFFSET];
 
-  if (org_page_length + move_length > page_length)
+  /*
+    Don't overwrite page boundary
+    It's ok to cut this as we will append the data at end of page
+    in the next log entry
+  */
+  if (offset + changed_length > max_page_size)
+  {
+    DBUG_ASSERT(handle_overflow);
+    changed_length= max_page_size - offset;   /* Update to end of page */
+    move_length= 0;                             /* Nothing to move */
+    /* Extend the page to max length on recovery */
+    *log_pos++= KEY_OP_MAX_PAGELENGTH;
+    current_size= max_page_size;
+  }
+
+  /* Check if adding the key made the page overflow */
+  if (current_size + move_length > max_page_size)
   {
     /*
-      Overflow. Cut either key or data from page end so that key fits
-      The code that splits the too big page will ignore logging any
-      data over org_page_length
+      Adding the key caused an overflow. Cut away the part of the
+      page that doesn't fit.
     */
+    uint diff;
     DBUG_ASSERT(handle_overflow);
-    if (offset + changed_length > page_length)
-    {
-      /* Log that data changed to end of page */
-      changed_length= page_length - offset;
-      move_length= 0;
-      /* Set page to max length */
-      org_page_length= page_length;
-      *log_pos++= KEY_OP_MAX_PAGELENGTH;
-    }
-    else
-    {
-      /* They key will not be part of the page ; Don't log it */
-      uint diff= org_page_length + move_length - page_length;
-      log_pos[0]= KEY_OP_DEL_SUFFIX;
-      int2store(log_pos+1, diff);
-      log_pos+= 3;
-      org_page_length-= diff;
-      DBUG_ASSERT(org_page_length == page_length - move_length);
-    }
-    DBUG_ASSERT(offset != org_page_length);
+    diff= current_size + move_length - max_page_size;
+    log_pos[0]= KEY_OP_DEL_SUFFIX;
+    int2store(log_pos+1, diff);
+    log_pos+= 3;
+    current_size= max_page_size - move_length;
   }
 
-  if (offset == org_page_length)
+  if (offset == current_size)
   {
-    DBUG_ASSERT(move_length == (int) changed_length);
     log_pos[0]= KEY_OP_ADD_SUFFIX;
+    current_size+= changed_length;
   }
   else
   {
@@ -576,51 +577,103 @@ my_bool _ma_log_add(MARIA_PAGE *ma_page,
       log_pos[0]= KEY_OP_SHIFT;
       int2store(log_pos+1, move_length);
       log_pos+= 3;
+      current_size+= move_length;
     }
     log_pos[0]= KEY_OP_CHANGE;
   }
   int2store(log_pos+1, changed_length);
   log_pos+= 3;
-  translog_parts= 2;
 
   log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
   log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
                                                          log_data);
   log_array[TRANSLOG_INTERNAL_PARTS + 1].str=    key_pos;
   log_array[TRANSLOG_INTERNAL_PARTS + 1].length= changed_length;
+  translog_parts= TRANSLOG_INTERNAL_PARTS + 2;
 
-#ifdef EXTRA_DEBUG_KEY_CHANGES
+  /*
+    If page was originally > block_size before operation and now all data
+    fits, append the end data that was not part of the previous logged
+    page to it.
+  */
+  DBUG_ASSERT(current_size <= max_page_size && current_size <= ma_page->size);
+  if (current_size != ma_page->size && current_size != max_page_size)
   {
-    MARIA_SHARE *share= info->s;
-    ha_checksum crc;
-    uint save_page_length= ma_page->size;
-    uint new_length= org_page_length + move_length;
-    _ma_store_page_used(share, buff, new_length);
-    crc= my_checksum(0, buff + LSN_STORE_SIZE, new_length - LSN_STORE_SIZE);
-    log_pos[0]= KEY_OP_CHECK;
-    int2store(log_pos+1, new_length);
-    int4store(log_pos+3, crc);
-
-    log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= log_pos;
-    log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= 7;
-    changed_length+= 7;
-    translog_parts++;
-    _ma_store_page_used(share, buff, save_page_length);
+    uint length= min(ma_page->size, max_page_size) - current_size;
+    uchar *data= ma_page->buff + current_size;
+
+    log_pos[0]= KEY_OP_ADD_SUFFIX;
+    int2store(log_pos+1, length);
+    log_array[translog_parts].str=      log_pos;
+    log_array[translog_parts].length=   3;
+    log_array[translog_parts+1].str=    data;
+    log_array[translog_parts+1].length= length;
+    log_pos+= 3;
+    translog_parts+= 2;
+    current_size+=   length;
+    changed_length+= length + 3;
   }
-#endif
+
+  _ma_log_key_changes(ma_page, log_array + translog_parts,
+                      log_pos, &changed_length, &translog_parts);
+  /*
+    Remember new page length for future log entries for same page
+    Note that this can be different from ma_page->size in case of page
+    overflow!
+  */
+  ma_page->org_size= current_size;
+  DBUG_ASSERT(ma_page->org_size == min(ma_page->size, max_page_size));
 
   if (translog_write_record(&lsn, LOGREC_REDO_INDEX,
                             info->trn, info,
                             (translog_size_t)
                             log_array[TRANSLOG_INTERNAL_PARTS + 0].length +
-                            changed_length,
-                            TRANSLOG_INTERNAL_PARTS + translog_parts,
+                            changed_length, translog_parts,
                             log_array, log_data, NULL))
     DBUG_RETURN(-1);
   DBUG_RETURN(0);
 }
 
 
+#ifdef EXTRA_DEBUG_KEY_CHANGES
+
+/* Log checksum and optionally key page to log */
+
+void _ma_log_key_changes(MARIA_PAGE *ma_page, LEX_CUSTRING *log_array,
+                         uchar *log_pos, uint *changed_length,
+                         uint *translog_parts)
+{
+  MARIA_SHARE *share= ma_page->info->s;
+  int page_length= min(ma_page->size, share->max_index_block_size);
+  uint org_length;
+  ha_checksum crc;
+
+  DBUG_ASSERT(ma_page->flag == (uint) ma_page->buff[KEYPAGE_TRANSFLAG_OFFSET]);
+
+  /* We have to change length as the page may have been shortened */
+  org_length= _ma_get_page_used(share, ma_page->buff);
+  _ma_store_page_used(share, ma_page->buff, page_length);
+  crc= my_checksum(0, ma_page->buff + LSN_STORE_SIZE,
+                   page_length - LSN_STORE_SIZE);
+  _ma_store_page_used(share, ma_page->buff, org_length);
+
+  log_pos[0]= KEY_OP_CHECK;
+  int2store(log_pos+1, page_length);
+  int4store(log_pos+3, crc);
+
+  log_array[0].str=    log_pos;
+  log_array[0].length= 7;
+  (*changed_length)+=  7;
+  (*translog_parts)++;
+#ifdef EXTRA_STORE_FULL_PAGE_IN_KEY_CHANGES
+  log_array[1].str=    ma_page->buff;
+  log_array[1].length= page_length;
+  (*changed_length)+=  page_length;
+  (*translog_parts)++;
+#endif
+#endif
+}
+
 /****************************************************************************
   Redo of key pages
 ****************************************************************************/
@@ -716,7 +769,7 @@ uint _ma_apply_redo_index_new_page(MARIA
   bzero(buff, LSN_STORE_SIZE);
   memcpy(buff + LSN_STORE_SIZE, header, length);
   bzero(buff + LSN_STORE_SIZE + length,
-        share->block_size - LSN_STORE_SIZE - KEYPAGE_CHECKSUM_SIZE - length);
+        share->max_index_block_size - LSN_STORE_SIZE -  length);
   bfill(buff + share->block_size - KEYPAGE_CHECKSUM_SIZE,
         KEYPAGE_CHECKSUM_SIZE, (uchar) 255);
 
@@ -847,7 +900,9 @@ err:
    KEY_OP_ADD_SUFFIX 2 length, data       Add data to end of page
    KEY_OP_DEL_SUFFIX 2 length             Reduce page length with this
 				          Sets position to start of page
-   KEY_OP_CHECK      6 page_length[2},CRC Used only when debugging
+   KEY_OP_CHECK      6 page_length[2],CRC  Used only when debugging
+					  This may be followed by page_length
+                                          of data (until end of log record)
    KEY_OP_COMPACT_PAGE  6 transid
    KEY_OP_SET_PAGEFLAG  1 flag for page
    KEY_OP_MAX_PAGELENGTH 0                Set page to max length
@@ -870,7 +925,7 @@ uint _ma_apply_redo_index(MARIA_HA *info
   const uchar *header_end= header + head_length;
   uint page_offset= 0, org_page_length;
   uint nod_flag, page_length, keypage_header, keynr;
-  uint max_page_length= share->block_size - KEYPAGE_CHECKSUM_SIZE;
+  uint max_page_size= share->max_index_block_size;
   int result;
   MARIA_PAGE page;
   DBUG_ENTER("_ma_apply_redo_index");
@@ -919,11 +974,14 @@ uint _ma_apply_redo_index(MARIA_HA *info
       header+= 2;
       DBUG_PRINT("redo", ("key_op_shift: %d", length));
       DBUG_ASSERT(page_offset != 0 && page_offset <= page_length &&
-                  page_length + length <= max_page_length);
+                  page_length + length <= max_page_size);
 
       if (length < 0)
+      {
+        DBUG_ASSERT(page_offset - length <= page_length);
         bmove(buff + page_offset, buff + page_offset - length,
               page_length - page_offset + length);
+      }
       else if (page_length != page_offset)
         bmove_upp(buff + page_length + length, buff + page_length,
                   page_length - page_offset);
@@ -937,6 +995,7 @@ uint _ma_apply_redo_index(MARIA_HA *info
       DBUG_ASSERT(page_offset != 0 && page_offset + length <= page_length);
 
       memcpy(buff + page_offset, header + 2 , length);
+      page_offset+= length;           /* Put offset after changed length */
       header+= 2 + length;
       break;
     }
@@ -948,7 +1007,7 @@ uint _ma_apply_redo_index(MARIA_HA *info
                           insert_length, changed_length));
 
       DBUG_ASSERT(insert_length <= changed_length &&
-                  page_length + changed_length <= max_page_length);
+                  page_length + changed_length <= max_page_size);
 
       bmove_upp(buff + page_length + insert_length, buff + page_length,
                 page_length - keypage_header);
@@ -974,8 +1033,8 @@ uint _ma_apply_redo_index(MARIA_HA *info
     case KEY_OP_ADD_SUFFIX:                     /* 6 */
     {
       uint insert_length= uint2korr(header);
-      DBUG_PRINT("redo", ("key_op_add_prefix: %u", insert_length));
-      DBUG_ASSERT(page_length + insert_length <= max_page_length);
+      DBUG_PRINT("redo", ("key_op_add_suffix: %u", insert_length));
+      DBUG_ASSERT(page_length + insert_length <= max_page_size);
       memcpy(buff + page_length, header+2, insert_length);
 
       page_length+= insert_length;
@@ -1003,13 +1062,22 @@ uint _ma_apply_redo_index(MARIA_HA *info
       if (crc != (uint32) my_checksum(0, buff + LSN_STORE_SIZE,
                                       page_length - LSN_STORE_SIZE))
       {
-        DBUG_PRINT("error", ("page_length %u",page_length));
-        DBUG_DUMP("KEY_OP_CHECK bad page", buff, max_page_length);
-        DBUG_ASSERT("crc" == "failure in REDO_INDEX");
+        DBUG_DUMP("KEY_OP_CHECK bad page", buff, page_length);
+        if (header + 6 + page_length <= header_end)
+        {
+          DBUG_DUMP("KEY_OP_CHECK org page", header + 6, page_length);
+        }
+        DBUG_ASSERT("crc failure in REDO_INDEX" == 0);
       }
 #endif
       DBUG_PRINT("redo", ("key_op_check"));
-      header+= 6;
+      /*
+        This is the last entry in the block and it can contain page_length
+        data or not
+      */
+      DBUG_ASSERT(header + 6 == header_end ||
+                  header + 6 + page_length == header_end);
+      header= header_end;
       break;
     }
     case KEY_OP_DEBUG:
@@ -1018,7 +1086,7 @@ uint _ma_apply_redo_index(MARIA_HA *info
       break;
     case KEY_OP_MAX_PAGELENGTH:
       DBUG_PRINT("redo", ("key_op_max_page_length"));
-      page_length= max_page_length;
+      page_length= max_page_size;
       break;
     case KEY_OP_MULTI_COPY:                     /* 9 */
     {
@@ -1040,7 +1108,7 @@ uint _ma_apply_redo_index(MARIA_HA *info
       log_memcpy_length= uint2korr(header);
       header+= 2;
       log_memcpy_end= header + log_memcpy_length;
-      DBUG_ASSERT(full_length <= max_page_length);
+      DBUG_ASSERT(full_length <= max_page_size);
       while (header < log_memcpy_end)
       {
         uint to, from;
@@ -1049,7 +1117,7 @@ uint _ma_apply_redo_index(MARIA_HA *info
         from= uint2korr(header);
         header+= 2;
         /* "from" is a place in the existing page */
-        DBUG_ASSERT(max(from, to) < max_page_length);
+        DBUG_ASSERT(max(from, to) < max_page_size);
         memcpy(buff + to, buff + from, full_length);
       }
       break;

=== modified file 'storage/maria/ma_key_recover.h'
--- a/storage/maria/ma_key_recover.h	2010-08-09 17:05:42 +0000
+++ b/storage/maria/ma_key_recover.h	2010-09-05 23:25:44 +0000
@@ -64,17 +64,26 @@ extern my_bool write_hook_for_undo_key_d
                                               TRN *trn, MARIA_HA *tbl_info,
                                               LSN *lsn, void *hook_arg);
 
-my_bool _ma_log_prefix(MARIA_PAGE *page, uint changed_length, int move_length);
+my_bool _ma_log_prefix(MARIA_PAGE *page, uint changed_length, int move_length,
+                       enum en_key_debug debug_marker);
 my_bool _ma_log_suffix(MARIA_PAGE *page, uint org_length,
                        uint new_length);
 my_bool _ma_log_add(MARIA_PAGE *page, uint buff_length, uchar *key_pos,
                     uint changed_length, int move_length,
                     my_bool handle_overflow);
 my_bool _ma_log_delete(MARIA_PAGE *page, const uchar *key_pos,
-                       uint changed_length, uint move_length);
+                       uint changed_length, uint move_length,
+                       uint append_length, enum en_key_debug debug_marker);
 my_bool _ma_log_change(MARIA_PAGE *page, const uchar *key_pos, uint length,
                        enum en_key_debug debug_marker);
 my_bool _ma_log_new(MARIA_PAGE *page, my_bool root_page);
+#ifdef EXTRA_DEBUG_KEY_CHANGES
+void _ma_log_key_changes(MARIA_PAGE *ma_page, LEX_CUSTRING *log_array,
+                         uchar *log_pos, uint *changed_length,
+                         uint *translog_parts);
+#else
+void _ma_log_key_changes(A,B,C,D,E)
+#endif
 
 uint _ma_apply_redo_index_new_page(MARIA_HA *info, LSN lsn,
                                    const uchar *header, uint length);

=== modified file 'storage/maria/ma_loghandler.h'
--- a/storage/maria/ma_loghandler.h	2010-08-09 17:05:42 +0000
+++ b/storage/maria/ma_loghandler.h	2010-09-05 23:25:44 +0000
@@ -172,13 +172,24 @@ enum en_key_op
 
 enum en_key_debug
 {
-  KEY_OP_DEBUG_RTREE_COMBINE,
-  KEY_OP_DEBUG_RTREE_SPLIT,
-  KEY_OP_DEBUG_RTREE_SET_KEY,
-  KEY_OP_DEBUG_FATHER_CHANGED_1,
-  KEY_OP_DEBUG_FATHER_CHANGED_2,
-  KEY_OP_DEBUG_LOG_SPLIT,
-  KEY_OP_DEBUG_LOG_ADD
+  KEY_OP_DEBUG_RTREE_COMBINE, 		/* 0 */
+  KEY_OP_DEBUG_RTREE_SPLIT,		/* 1 */
+  KEY_OP_DEBUG_RTREE_SET_KEY,		/* 2 */
+  KEY_OP_DEBUG_FATHER_CHANGED_1,	/* 3 */
+  KEY_OP_DEBUG_FATHER_CHANGED_2,	/* 4 */
+  KEY_OP_DEBUG_LOG_SPLIT,		/* 5 */
+  KEY_OP_DEBUG_LOG_ADD,			/* 6 */
+  KEY_OP_DEBUG_LOG_PREFIX_1,		/* 7 */
+  KEY_OP_DEBUG_LOG_PREFIX_2,		/* 8 */
+  KEY_OP_DEBUG_LOG_PREFIX_3,		/* 9 */
+  KEY_OP_DEBUG_LOG_PREFIX_4,		/* 10 */
+  KEY_OP_DEBUG_LOG_PREFIX_5,		/* 11 */
+  KEY_OP_DEBUG_LOG_DEL_CHANGE_1,	/* 12 */
+  KEY_OP_DEBUG_LOG_DEL_CHANGE_2,	/* 13 */
+  KEY_OP_DEBUG_LOG_DEL_CHANGE_3,	/* 14 */
+  KEY_OP_DEBUG_LOG_DEL_CHANGE_RT,	/* 15 */
+  KEY_OP_DEBUG_LOG_DEL_PREFIX,		/* 16 */
+  KEY_OP_DEBUG_LOG_MIDDLE		/* 17 */
 };
 
 

=== modified file 'storage/maria/ma_open.c'
--- a/storage/maria/ma_open.c	2010-07-30 07:45:27 +0000
+++ b/storage/maria/ma_open.c	2010-09-05 23:25:44 +0000
@@ -550,6 +550,7 @@ MARIA_HA *maria_open(const char *name, i
     strmov(share->open_file_name.str,  name);
 
     share->block_size= share->base.block_size;   /* Convenience */
+    share->max_index_block_size= share->block_size - KEYPAGE_CHECKSUM_SIZE;
     {
       HA_KEYSEG *pos=share->keyparts;
       uint32 ftkey_nr= 1;

=== modified file 'storage/maria/ma_page.c'
--- a/storage/maria/ma_page.c	2010-03-09 19:22:24 +0000
+++ b/storage/maria/ma_page.c	2010-09-05 23:25:44 +0000
@@ -59,6 +59,7 @@ void _ma_page_setup(MARIA_PAGE *page, MA
   page->buff=    buff;
   page->pos=     pos;
   page->size=    _ma_get_page_used(share, buff);
+  page->org_size= page->size;
   page->flag=    _ma_get_keypage_flag(share, buff);
   page->node=    ((page->flag & KEYPAGE_FLAG_ISNOD) ?
                   share->base.key_reflength : 0);
@@ -68,7 +69,7 @@ void _ma_page_setup(MARIA_PAGE *page, MA
 void page_cleanup(MARIA_SHARE *share, MARIA_PAGE *page)
 {
   uint length= page->size;
-  DBUG_ASSERT(length <= block_size - KEYPAGE_CHECKSUM_SIZE);
+  DBUG_ASSERT(length <= share->max_index_block_size);
   bzero(page->buff + length, share->block_size - length);
 }
 #endif
@@ -103,7 +104,7 @@ my_bool _ma_fetch_keypage(MARIA_PAGE *pa
   MARIA_SHARE *share= info->s;
   uint block_size= share->block_size;
   DBUG_ENTER("_ma_fetch_keypage");
-  DBUG_PRINT("enter",("pos: %ld", (long) pos));
+  DBUG_PRINT("enter",("page: %lu", (ulong) (pos / block_size)));
 
   tmp= pagecache_read(share->pagecache, &share->kfile,
                       (pgcache_page_no_t) (pos / block_size), level, buff,
@@ -142,6 +143,7 @@ my_bool _ma_fetch_keypage(MARIA_PAGE *pa
   page->buff=    tmp;
   page->pos=     pos;
   page->size=    _ma_get_page_used(share, tmp);
+  page->org_size= page->size;                    /* For debugging */
   page->flag=    _ma_get_keypage_flag(share, tmp);
   page->node=   ((page->flag & KEYPAGE_FLAG_ISNOD) ?
                  share->base.key_reflength : 0);
@@ -149,7 +151,7 @@ my_bool _ma_fetch_keypage(MARIA_PAGE *pa
 #ifdef EXTRA_DEBUG
   {
     uint page_size= page->size;
-    if (page_size < 4 || page_size > block_size ||
+    if (page_size < 4 || page_size > share->max_index_block_size ||
         _ma_get_keynr(share, tmp) != keyinfo->key_nr)
     {
       DBUG_PRINT("error",("page %lu had wrong page length: %u  keynr: %u",
@@ -159,7 +161,7 @@ my_bool _ma_fetch_keypage(MARIA_PAGE *pa
       info->last_keypage = HA_OFFSET_ERROR;
       maria_print_error(share, HA_ERR_CRASHED);
       my_errno= HA_ERR_CRASHED;
-      tmp= 0;
+      DBUG_RETURN(1);
     }
   }
 #endif
@@ -179,6 +181,13 @@ my_bool _ma_write_keypage(MARIA_PAGE *pa
   MARIA_PINNED_PAGE page_link;
   DBUG_ENTER("_ma_write_keypage");
 
+  /*
+    The following ensures that for transactional tables we have logged
+    all changes that changes the page size (as the logging code sets
+    page->org_size)
+  */
+  DBUG_ASSERT(!share->now_transactional || page->size == page->org_size);
+
 #ifdef EXTRA_DEBUG				/* Safety check */
   {
     uint page_length, nod_flag;
@@ -193,7 +202,7 @@ my_bool _ma_write_keypage(MARIA_PAGE *pa
         (page->pos & (maria_block_size-1)))
     {
       DBUG_PRINT("error",("Trying to write inside key status region: "
-                          "key_start: %lu  length: %lu  page: %lu",
+                          "key_start: %lu  length: %lu  page_pos: %lu",
                           (long) share->base.keystart,
                           (long) share->state.state.key_file_length,
                           (long) page->pos));
@@ -201,7 +210,7 @@ my_bool _ma_write_keypage(MARIA_PAGE *pa
       DBUG_ASSERT(0);
       DBUG_RETURN(1);
     }
-    DBUG_PRINT("page",("write page at: %lu",(long) page->pos));
+    DBUG_PRINT("page",("write page at: %lu",(ulong) (page->pos / block_size)));
     DBUG_DUMP("buff", buff, page_length);
     DBUG_ASSERT(page_length >= share->keypage_header + nod_flag +
                 page->keyinfo->minlength || maria_in_recovery);
@@ -274,7 +283,7 @@ int _ma_dispose(register MARIA_HA *info,
   enum pagecache_page_lock lock_method;
   enum pagecache_page_pin pin_method;
   DBUG_ENTER("_ma_dispose");
-  DBUG_PRINT("enter",("pos: %ld", (long) pos));
+  DBUG_PRINT("enter",("page: %lu", (ulong) (pos / block_size)));
   DBUG_ASSERT(pos % block_size == 0);
 
   (void) _ma_lock_key_del(info, 0);
@@ -423,8 +432,7 @@ my_off_t _ma_new(register MARIA_HA *info
       share->key_del_current= mi_sizekorr(buff+share->keypage_header);
 #ifndef DBUG_OFF
       key_del_current= share->key_del_current;
-      DBUG_ASSERT(key_del_current != share->state.key_del &&
-                  (key_del_current != 0) &&
+      DBUG_ASSERT((key_del_current != 0) &&
                   ((key_del_current == HA_OFFSET_ERROR) ||
                    (key_del_current <=
                     (share->state.state.key_file_length - block_size))));
@@ -453,32 +461,48 @@ my_off_t _ma_new(register MARIA_HA *info
    Log compactation of a index page
 */
 
-static my_bool _ma_log_compact_keypage(MARIA_HA *info, my_off_t page,
+static my_bool _ma_log_compact_keypage(MARIA_PAGE *ma_page,
                                        TrID min_read_from)
 {
   LSN lsn;
-  uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 1 + TRANSID_SIZE];
+  uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 1 + 7 + TRANSID_SIZE];
+  uchar *log_pos;
   LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
+  MARIA_HA *info= ma_page->info;
   MARIA_SHARE *share= info->s;
+  uint translog_parts, extra_length;
+  my_off_t page= ma_page->pos;
   DBUG_ENTER("_ma_log_compact_keypage");
-  DBUG_PRINT("enter", ("page: %lu", (ulong) page));
+  DBUG_PRINT("enter", ("page: %lu", (ulong) (page / share->block_size)));
 
   /* Store address of new root page */
   page/= share->block_size;
   page_store(log_data + FILEID_STORE_SIZE, page);
 
-  log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE]= KEY_OP_COMPACT_PAGE;
-  transid_store(log_data + FILEID_STORE_SIZE + PAGE_STORE_SIZE +1,
-                min_read_from);
+  log_pos= log_data + FILEID_STORE_SIZE + PAGE_STORE_SIZE;
+
+  log_pos[0]= KEY_OP_COMPACT_PAGE;
+  transid_store(log_pos + 1, min_read_from);
+  log_pos+= 1 + TRANSID_SIZE;
 
   log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
-  log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
+  log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
+                                                         log_data);
+  translog_parts= 1;
+  extra_length= 0;
+
+  _ma_log_key_changes(ma_page,
+                      log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
+                      log_pos, &extra_length, &translog_parts);
+  /* Remember new page length for future log entires for same page */
+  ma_page->org_size= ma_page->size;
 
   if (translog_write_record(&lsn, LOGREC_REDO_INDEX,
                             info->trn, info,
-                            (translog_size_t) sizeof(log_data),
-                            TRANSLOG_INTERNAL_PARTS + 1, log_array,
-                            log_data, NULL))
+                            log_array[TRANSLOG_INTERNAL_PARTS +
+                                      0].length + extra_length,
+                            TRANSLOG_INTERNAL_PARTS + translog_parts,
+                            log_array, log_data, NULL))
     DBUG_RETURN(1);
   DBUG_RETURN(0);
 }
@@ -526,7 +550,7 @@ my_bool _ma_compact_keypage(MARIA_PAGE *
   {
     if (!(page= (*ma_page->keyinfo->skip_key)(&key, 0, 0, page)))
     {
-      DBUG_PRINT("error",("Couldn't find last key:  page: 0x%lx",
+      DBUG_PRINT("error",("Couldn't find last key:  page_pos: 0x%lx",
                           (long) page));
       maria_print_error(share, HA_ERR_CRASHED);
       my_errno=HA_ERR_CRASHED;
@@ -588,7 +612,7 @@ my_bool _ma_compact_keypage(MARIA_PAGE *
 
   if (share->now_transactional)
   {
-    if (_ma_log_compact_keypage(info, ma_page->pos, min_read_from))
+    if (_ma_log_compact_keypage(ma_page, min_read_from))
       DBUG_RETURN(1);
   }
   DBUG_RETURN(0);

=== modified file 'storage/maria/ma_recovery.c'
--- a/storage/maria/ma_recovery.c	2010-08-23 09:52:57 +0000
+++ b/storage/maria/ma_recovery.c	2010-09-05 23:25:44 +0000
@@ -545,7 +545,7 @@ static int display_and_apply_record(cons
   if (log_desc->record_execute_in_redo_phase == NULL)
   {
     /* die on all not-yet-handled records :) */
-    DBUG_ASSERT("one more hook" == "to write");
+    DBUG_ASSERT("one more hook to write" == 0);
     return 1;
   }
   if ((error= (*log_desc->record_execute_in_redo_phase)(rec)))
@@ -1063,6 +1063,7 @@ prototype_redo_exec_hook(REDO_REPAIR_TAB
   param.isam_file_name= name= info->s->open_file_name.str;
   param.testflag= uint8korr(rec->header + FILEID_STORE_SIZE);
   param.tmpdir= maria_tmpdir;
+  param.max_trid= max_long_trid;
   DBUG_ASSERT(maria_tmpdir);
 
   info->s->state.key_map= uint8korr(rec->header + FILEID_STORE_SIZE + 8);

=== modified file 'storage/maria/ma_rt_key.c'
--- a/storage/maria/ma_rt_key.c	2008-09-01 17:31:40 +0000
+++ b/storage/maria/ma_rt_key.c	2010-09-05 23:25:44 +0000
@@ -91,7 +91,8 @@ int maria_rtree_delete_key(MARIA_PAGE *p
   page->size-= key_length_with_nod_flag;
   page_store_size(share, page);
   if (share->now_transactional &&
-      _ma_log_delete(page, key_start, 0, key_length_with_nod_flag))
+      _ma_log_delete(page, key_start, 0, key_length_with_nod_flag,
+                     0, KEY_OP_DEBUG_LOG_DEL_CHANGE_RT))
     return -1;
   return 0;
 }

=== modified file 'storage/maria/ma_rt_split.c'
--- a/storage/maria/ma_rt_split.c	2008-09-01 17:31:40 +0000
+++ b/storage/maria/ma_rt_split.c	2010-09-05 23:25:44 +0000
@@ -308,7 +308,7 @@ static my_bool _ma_log_rt_split(MARIA_PA
   LSN lsn;
   uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 1 + 2 + 1 + 2 + 2 + 7],
     *log_pos;
-  LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 5];
+  LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 6];
   uint translog_parts, extra_length= 0;
   my_off_t page_pos;
   DBUG_ENTER("_ma_log_rt_split");
@@ -344,24 +344,11 @@ static my_bool _ma_log_rt_split(MARIA_PA
     translog_parts+= 2;
   }
 
-#ifdef EXTRA_DEBUG_KEY_CHANGES
-  {
-    int page_length= page->size;
-    ha_checksum crc;
-    uchar *check_start= log_pos;
-    crc= my_checksum(0, page->buff + LSN_STORE_SIZE,
-                     page_length - LSN_STORE_SIZE);
-    log_pos[0]= KEY_OP_CHECK;
-    log_pos++;
-    int2store(log_pos, page_length);
-    log_pos+= 2;
-    int4store(log_pos, crc);
-    log_pos+= 4;
-    log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str=    check_start;
-    log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= 7;
-    translog_parts++;
-  }
-#endif
+  _ma_log_key_changes(page,
+                      log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
+                      log_pos, &extra_length, &translog_parts);
+  /* Remember new page length for future log entires for same page */
+  page->org_size= page->size;
 
   if (translog_write_record(&lsn, LOGREC_REDO_INDEX,
                             info->trn, info,

=== modified file 'storage/maria/ma_unique.c'
--- a/storage/maria/ma_unique.c	2008-06-26 05:18:28 +0000
+++ b/storage/maria/ma_unique.c	2010-09-05 23:25:44 +0000
@@ -68,8 +68,7 @@ my_bool _ma_check_unique(MARIA_HA *info,
     DBUG_ASSERT(info->last_key.data_length == MARIA_UNIQUE_HASH_LENGTH);
     if (_ma_search_next(info, &info->last_key, SEARCH_BIGGER,
 			info->s->state.key_root[def->key]) ||
-	bcmp((char*) info->last_key.data, (char*) key_buff,
-             MARIA_UNIQUE_HASH_LENGTH))
+	bcmp(info->last_key.data, key_buff, MARIA_UNIQUE_HASH_LENGTH))
     {
       info->page_changed= 1;			/* Can't optimize read next */
       info->cur_row.lastpos= lastpos;

=== modified file 'storage/maria/ma_write.c'
--- a/storage/maria/ma_write.c	2010-08-20 07:29:26 +0000
+++ b/storage/maria/ma_write.c	2010-09-05 23:25:44 +0000
@@ -823,9 +823,9 @@ int _ma_insert(register MARIA_HA *info,
     Check if the new key fits totally into the the page
     (anc_buff is big enough to contain a full page + one key)
   */
-  if (a_length <= (uint) keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE)
+  if (a_length <= share->max_index_block_size)
   {
-    if (keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE - a_length < 32 &&
+    if (share->max_index_block_size - a_length < 32 &&
         (keyinfo->flag & HA_FULLTEXT) && key_pos == endpos &&
         share->base.key_reflength <= share->base.rec_reflength &&
         share->options & (HA_OPTION_PACK_RECORD | HA_OPTION_COMPRESS_RECORD))
@@ -885,9 +885,9 @@ ChangeSet at 1.2562, 2008-04-09 07:41:40+02
     }
     else
     {
-      if (share->now_transactional &&
+      if (share->now_transactional && 
           _ma_log_add(anc_page, org_anc_length,
-                      key_pos, s_temp.changed_length, t_length, 0))
+                      key_pos, s_temp.changed_length, t_length, 1))
         DBUG_RETURN(-1);
     }
     DBUG_RETURN(0);				/* There is room on page */
@@ -1265,7 +1265,7 @@ static int _ma_balance_page(MARIA_HA *in
          curr_keylength);
 
   if ((right ? right_length : left_length) + curr_keylength <=
-      (uint) keyinfo->block_length - KEYPAGE_CHECKSUM_SIZE)
+      share->max_index_block_size)
   {
     /* Enough space to hold all keys in the two buffers ; Balance bufferts */
     new_left_length= share->keypage_header+nod_flag+(keys/2)*curr_keylength;
@@ -1320,7 +1320,8 @@ static int _ma_balance_page(MARIA_HA *in
             start of page
           */
           if (_ma_log_prefix(&next_page, 0,
-                             ((int) new_right_length - (int) right_length)))
+                             ((int) new_right_length - (int) right_length),
+                             KEY_OP_DEBUG_LOG_PREFIX_3))
             goto err;
         }
         else
@@ -1383,7 +1384,8 @@ static int _ma_balance_page(MARIA_HA *in
           */
           if (_ma_log_prefix(&next_page,
                              (uint) (new_right_length - right_length),
-                             (int) (new_right_length - right_length)))
+                             (int) (new_right_length - right_length),
+                             KEY_OP_DEBUG_LOG_PREFIX_4))
             goto err;
         }
         else
@@ -1545,7 +1547,8 @@ static int _ma_balance_page(MARIA_HA *in
         This contains the last 'extra_buff' from 'buff'
       */
       if (_ma_log_prefix(&extra_page,
-                         0, (int) (extra_buff_length - right_length)))
+                         0, (int) (extra_buff_length - right_length),
+                         KEY_OP_DEBUG_LOG_PREFIX_5))
         goto err;
 
       /*
@@ -1891,6 +1894,9 @@ my_bool _ma_log_new(MARIA_PAGE *ma_page,
   log_array[TRANSLOG_INTERNAL_PARTS + 1].str=   ma_page->buff + LSN_STORE_SIZE;
   log_array[TRANSLOG_INTERNAL_PARTS + 1].length= page_length;
 
+  /* Remember new page length for future log entires for same page */
+  ma_page->org_size= ma_page->size;
+
   if (translog_write_record(&lsn, LOGREC_REDO_INDEX_NEW_PAGE,
                             info->trn, info,
                             (translog_size_t)
@@ -1912,7 +1918,7 @@ my_bool _ma_log_change(MARIA_PAGE *ma_pa
 {
   LSN lsn;
   uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 6 + 7], *log_pos;
-  LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 3];
+  LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 4];
   uint offset= (uint) (key_pos - ma_page->buff), translog_parts;
   my_off_t page;
   MARIA_HA *info= ma_page->info;
@@ -1921,6 +1927,7 @@ my_bool _ma_log_change(MARIA_PAGE *ma_pa
 
   DBUG_ASSERT(info->s->now_transactional);
   DBUG_ASSERT(offset + length <= ma_page->size);
+  DBUG_ASSERT(ma_page->org_size == ma_page->size);
 
   /* Store address of new root page */
   page= ma_page->pos / info->s->block_size;
@@ -1944,21 +1951,9 @@ my_bool _ma_log_change(MARIA_PAGE *ma_pa
   log_array[TRANSLOG_INTERNAL_PARTS + 1].length= length;
   translog_parts= 2;
 
-#ifdef EXTRA_DEBUG_KEY_CHANGES
-  {
-    int page_length= ma_page->size;
-    ha_checksum crc;
-    crc= my_checksum(0, ma_page->buff + LSN_STORE_SIZE,
-                     page_length - LSN_STORE_SIZE);
-    log_pos[0]= KEY_OP_CHECK;
-    int2store(log_pos+1, page_length);
-    int4store(log_pos+3, crc);
-    log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= log_pos;
-    log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= 7;
-    log_pos+= 7;
-    translog_parts++;
-  }
-#endif
+  _ma_log_key_changes(ma_page,
+                      log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
+                      log_pos, &length, &translog_parts);
 
   if (translog_write_record(&lsn, LOGREC_REDO_INDEX,
                             info->trn, info,
@@ -1994,8 +1989,6 @@ my_bool _ma_log_change(MARIA_PAGE *ma_pa
      - Page is shortened from end
      - Data is added to end of page
      - Data added at front of page
-
-
 */
 
 static my_bool _ma_log_split(MARIA_PAGE *ma_page,
@@ -2006,9 +1999,9 @@ static my_bool _ma_log_split(MARIA_PAGE
                              uint changed_length)
 {
   LSN lsn;
-  uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 3+3+3+3+3+2 +7];
+  uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 2 + 3+3+3+3+3+2 +7];
   uchar *log_pos;
-  LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 4];
+  LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 6];
   uint offset= (uint) (key_pos - ma_page->buff);
   uint translog_parts, extra_length;
   MARIA_HA *info= ma_page->info; 
@@ -2018,6 +2011,7 @@ static my_bool _ma_log_split(MARIA_PAGE
                        (ulong) ma_page->pos, org_length, new_length));
 
   DBUG_ASSERT(changed_length >= data_length);
+  DBUG_ASSERT(org_length <= info->s->max_index_block_size);
 
   log_pos= log_data + FILEID_STORE_SIZE;
   page= ma_page->pos / info->s->block_size;
@@ -2029,6 +2023,10 @@ static my_bool _ma_log_split(MARIA_PAGE
   (*log_pos++)= KEY_OP_DEBUG_LOG_SPLIT;
 #endif
 
+  /* Store keypage_flag */
+  *log_pos++= KEY_OP_SET_PAGEFLAG;
+  *log_pos++= ma_page->buff[KEYPAGE_TRANSFLAG_OFFSET];
+
   if (new_length <= offset || !key_pos)
   {
     /*
@@ -2053,6 +2051,11 @@ static my_bool _ma_log_split(MARIA_PAGE
     */
     max_key_length= new_length - offset;
     extra_length= min(key_length, max_key_length);
+    if (offset + move_length > new_length)
+    {
+      /* This is true when move_length includes changes for next packed key */
+      move_length= new_length - offset;
+    }
 
     if ((int) new_length < (int) (org_length + move_length + data_length))
     {
@@ -2115,21 +2118,12 @@ static my_bool _ma_log_split(MARIA_PAGE
   log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
   log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
                                                          log_data);
-#ifdef EXTRA_DEBUG_KEY_CHANGES
-  {
-    int page_length= ma_page->size;
-    ha_checksum crc;
-    crc= my_checksum(0, ma_page->buff + LSN_STORE_SIZE,
-                     page_length - LSN_STORE_SIZE);
-    log_pos[0]= KEY_OP_CHECK;
-    int2store(log_pos+1, page_length);
-    int4store(log_pos+3, crc);
-    log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= log_pos;
-    log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= 7;
-    extra_length+= 7;
-    translog_parts++;
-  }
-#endif
+
+  _ma_log_key_changes(ma_page,
+                      log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
+                      log_pos, &extra_length, &translog_parts);
+  /* Remember new page length for future log entires for same page */
+  ma_page->org_size= ma_page->size;
 
   DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
                                     info->trn, info,
@@ -2168,8 +2162,9 @@ static my_bool _ma_log_del_prefix(MARIA_
                                   int move_length)
 {
   LSN lsn;
-  uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 12 + 7], *log_pos;
-  LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 3];
+  uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 2 + 12 + 7];
+  uchar *log_pos;
+  LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 4];
   uint offset= (uint) (key_pos - ma_page->buff);
   uint diff_length= org_length + move_length - new_length;
   uint translog_parts, extra_length;
@@ -2180,6 +2175,7 @@ static my_bool _ma_log_del_prefix(MARIA_
                        (ulong) ma_page->pos, org_length, new_length));
 
   DBUG_ASSERT((int) diff_length > 0);
+  DBUG_ASSERT(ma_page->size == new_length);
 
   log_pos= log_data + FILEID_STORE_SIZE;
   page= ma_page->pos / info->s->block_size;
@@ -2189,6 +2185,15 @@ static my_bool _ma_log_del_prefix(MARIA_
   translog_parts= 1;
   extra_length= 0;
 
+#ifdef EXTRA_DEBUG_KEY_CHANGES
+  *log_pos++= KEY_OP_DEBUG;
+  *log_pos++= KEY_OP_DEBUG_LOG_DEL_PREFIX;
+#endif
+
+  /* Store keypage_flag */
+  *log_pos++= KEY_OP_SET_PAGEFLAG;
+  *log_pos++= ma_page->buff[KEYPAGE_TRANSFLAG_OFFSET];
+
   if (offset < diff_length + info->s->keypage_header)
   {
     /*
@@ -2236,21 +2241,11 @@ static my_bool _ma_log_del_prefix(MARIA_
   log_array[TRANSLOG_INTERNAL_PARTS + 0].str=    log_data;
   log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos -
                                                          log_data);
-#ifdef EXTRA_DEBUG_KEY_CHANGES
-  {
-    int page_length= ma_page->size;
-    ha_checksum crc;
-    crc= my_checksum(0, ma_page->buff + LSN_STORE_SIZE,
-                     page_length - LSN_STORE_SIZE);
-    log_pos[0]= KEY_OP_CHECK;
-    int2store(log_pos+1, page_length);
-    int4store(log_pos+3, crc);
-    log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= log_pos;
-    log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= 7;
-    extra_length+= 7;
-    translog_parts++;
-  }
-#endif
+  _ma_log_key_changes(ma_page,
+                      log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
+                      log_pos, &extra_length, &translog_parts);
+  /* Remember new page length for future log entires for same page */
+  ma_page->org_size= ma_page->size;
 
   DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
                                     info->trn, info,
@@ -2277,9 +2272,9 @@ static my_bool _ma_log_key_middle(MARIA_
                                   uint key_length, int move_length)
 {
   LSN lsn;
-  uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 3+5+3+3+3 + 7];
+  uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 2 + 3+5+3+3+3 + 7];
   uchar *log_pos;
-  LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 5];
+  LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 6];
   uint key_offset;
   uint translog_parts, extra_length;
   my_off_t page;
@@ -2287,6 +2282,8 @@ static my_bool _ma_log_key_middle(MARIA_
   DBUG_ENTER("_ma_log_key_middle");
   DBUG_PRINT("enter", ("page: %lu", (ulong) ma_page->pos));
 
+  DBUG_ASSERT(ma_page->size == new_length);
+
   /* new place of key after changes */
   key_pos+= data_added_first;
   key_offset= (uint) (key_pos - ma_page->buff);
@@ -2314,6 +2311,15 @@ static my_bool _ma_log_key_middle(MARIA_
   page_store(log_pos, page);
   log_pos+= PAGE_STORE_SIZE;
 
+#ifdef EXTRA_DEBUG_KEY_CHANGES
+  *log_pos++= KEY_OP_DEBUG;
+  *log_pos++= KEY_OP_DEBUG_LOG_MIDDLE;
+#endif
+
+  /* Store keypage_flag */
+  *log_pos++= KEY_OP_SET_PAGEFLAG;
+  *log_pos++= ma_page->buff[KEYPAGE_TRANSFLAG_OFFSET];
+
   log_pos[0]= KEY_OP_DEL_SUFFIX;
   int2store(log_pos+1, data_deleted_last);
   log_pos+= 3;
@@ -2362,21 +2368,11 @@ static my_bool _ma_log_key_middle(MARIA_
                            key_length);
   }
 
-#ifdef EXTRA_DEBUG_KEY_CHANGES
-  {
-    int page_length= ma_page->size;
-    ha_checksum crc;
-    crc= my_checksum(0, ma_page->buff + LSN_STORE_SIZE,
-                     page_length - LSN_STORE_SIZE);
-    log_pos[0]= KEY_OP_CHECK;
-    int2store(log_pos+1, page_length);
-    int4store(log_pos+3, crc);
-    log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= log_pos;
-    log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= 7;
-    extra_length+= 7;
-    translog_parts++;
-  }
-#endif
+  _ma_log_key_changes(ma_page,
+                      log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
+                      log_pos, &extra_length, &translog_parts);
+  /* Remember new page length for future log entires for same page */
+  ma_page->org_size= ma_page->size;
 
   DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
                                     info->trn, info,
@@ -2401,14 +2397,17 @@ static my_bool _ma_log_middle(MARIA_PAGE
                               uint data_deleted_last)
 {
   LSN lsn;
-  LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
-  uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 3 + 5], *log_pos;
+  LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 4];
+  uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 3 + 5 + 7], *log_pos;
   MARIA_HA *info= ma_page->info;
   my_off_t page;
   uint translog_parts, extra_length;
   DBUG_ENTER("_ma_log_middle");
   DBUG_PRINT("enter", ("page: %lu", (ulong) page));
 
+  DBUG_ASSERT(ma_page->org_size + data_added_first - data_deleted_last ==
+              ma_page->size);
+
   page= ma_page->page / info->s->block_size;
 
   log_pos= log_data + FILEID_STORE_SIZE;
@@ -2434,21 +2433,11 @@ static my_bool _ma_log_middle(MARIA_PAGE
   translog_parts= 2;
   extra_length= data_changed_first;
 
-#ifdef EXTRA_DEBUG_KEY_CHANGES
-  {
-    int page_length= ma_page->size;
-    ha_checksum crc;
-    crc= my_checksum(0, ma_page->buff + LSN_STORE_SIZE,
-                     page_length - LSN_STORE_SIZE);
-    log_pos[0]= KEY_OP_CHECK;
-    int2store(log_pos+1, page_length);
-    int4store(log_pos+3, crc);
-    log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= log_pos;
-    log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= 7;
-    extra_length+= 7;
-    translog_parts++;
-  }
-#endif
+  _ma_log_key_changes(ma_page,
+                      log_array + TRANSLOG_INTERNAL_PARTS + translog_parts,
+                      log_pos, &extra_length, &translog_parts);
+  /* Remember new page length for future log entires for same page */
+  ma_page->org_size= ma_page->size;
 
   DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX,
                                     info->trn, info,

=== modified file 'storage/maria/maria_def.h'
--- a/storage/maria/maria_def.h	2010-08-24 22:18:01 +0000
+++ b/storage/maria/maria_def.h	2010-09-05 23:25:44 +0000
@@ -39,6 +39,7 @@
 #define SANITY_CHECKS 1
 #ifdef EXTRA_DEBUG
 #define EXTRA_DEBUG_KEY_CHANGES
+#define EXTRA_STORE_FULL_PAGE_IN_KEY_CHANGES
 #endif
 
 #define MAX_NONMAPPED_INSERTS 1000
@@ -361,6 +362,7 @@ typedef struct st_maria_share
   uint in_trans;                        /* Number of references by trn */
   uint w_locks, r_locks, tot_locks;	/* Number of read/write locks */
   uint block_size;			/* block_size of keyfile & data file*/
+  uint max_index_block_size;            /* block_size - end_of_page_info */
   /* Fixed length part of a packed row in BLOCK_RECORD format */
   uint base_length;
   myf write_flag;
@@ -833,6 +835,7 @@ typedef struct st_maria_page
   uchar *buff;				/* Data for page */
   my_off_t pos;                         /* Disk address to page */
   uint     size;                        /* Size of data on page */
+  uint     org_size;                    /* Size of page at read or after log */
   uint     node;      			/* 0 or share->base.key_reflength */
   uint     flag;			/* Page flag */
   uint     link_offset;

=== modified file 'storage/maria/trnman.c'
--- a/storage/maria/trnman.c	2010-08-07 14:42:30 +0000
+++ b/storage/maria/trnman.c	2010-09-05 23:25:44 +0000
@@ -363,6 +363,7 @@ TRN *trnman_new_trn(WT_THD *wt)
   trn->used_tables= 0;
 
   trn->locked_tables= 0;
+  trn->flags= 0;
 
   /*
     only after the following function TRN is considered initialized,



More information about the commits mailing list