[Commits] 0dc0f8a: MDEV-9736: Window functions: multiple cursors to read filesort result

Sergei Petrunia psergey at askmonty.org
Fri Sep 23 14:18:29 EEST 2016


revision-id: 0dc0f8a1d555bc1b351ffd3a793d6c3841444b64
parent(s): 85358ddf88fc5d7f849041ebe4ba77107197f667
committer: Sergei Petrunia
branch nick: 10.2-cl
timestamp: 2016-09-23 14:18:29 +0300
message:

MDEV-9736: Window functions: multiple cursors to read filesort result

Add support for having multiple IO_CACHEs with type=READ_CACHE to share
the file they are reading from.
Each IO_CACHE keeps its own in-memory buffer. When doing a read or seek
operation on the file, it notifies other IO_CACHEs that the file position
has been changed.

Make Rowid_seq_cursor use cloned IO_CACHE when reading filesort result.

---
 include/my_sys.h            |    7 ++
 mysql-test/r/win_big.result |   93 ++++++++++++++++++++++++
 mysql-test/t/win_big.test   |  104 +++++++++++++++++++++++++++
 mysys/mf_iocache.c          |  148 ++++++++++++++++++++++++++++++++++----
 sql/sql_window.cc           |  164 ++++++++++++++++++++++++++++---------------
 5 files changed, 447 insertions(+), 69 deletions(-)

diff --git a/include/my_sys.h b/include/my_sys.h
index 7b71585..fc4078d 100644
--- a/include/my_sys.h
+++ b/include/my_sys.h
@@ -472,6 +472,8 @@ struct st_my_file_info
   const char *dir;
   char prefix[3];
   File file; /* file descriptor */
+
+  struct st_io_cache *next_file_user;
   /*
     seek_not_done is set by my_b_seek() to inform the upcoming read/write
     operation that a seek needs to be preformed prior to the actual I/O
@@ -802,6 +804,11 @@ extern my_bool reinit_io_cache(IO_CACHE *info,enum cache_type type,
 extern void setup_io_cache(IO_CACHE* info);
 extern void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
                                 IO_CACHE *write_cache, uint num_threads);
+
+extern int init_slave_io_cache(IO_CACHE *master, IO_CACHE *slave);
+void end_slave_io_cache(IO_CACHE *cache);
+void seek_io_cache(IO_CACHE *cache, my_off_t needed_offset);
+
 extern void remove_io_thread(IO_CACHE *info);
 extern int _my_b_async_read(IO_CACHE *info,uchar *Buffer,size_t Count);
 extern int my_b_append(IO_CACHE *info,const uchar *Buffer,size_t Count);
diff --git a/mysql-test/r/win_big.result b/mysql-test/r/win_big.result
new file mode 100644
index 0000000..7ea044e
--- /dev/null
+++ b/mysql-test/r/win_big.result
@@ -0,0 +1,93 @@
+create table t0 (a int);
+insert into t0 values (0),(1),(2),(3),(4),(5),(6),(7),(8),(9);
+create table t1(a int);
+insert into t1 select A.a + B.a* 10 + C.a * 100 from t0 A, t0 B, t0 C;
+create table t10 (a int, b int, c int);
+insert into t10 
+select 
+A.a + 1000*B.a,
+A.a + 1000*B.a,
+A.a + 1000*B.a
+from t1 A, t0 B
+order by A.a+1000*B.a;
+#################################################################
+## Try a basic example
+flush status;
+create table t21 as
+select 
+sum(b) over (order by a rows between 2 preceding and 2 following) as SUM_B
+from
+t10;
+show status like 'Sort_merge_passes';
+Variable_name	Value
+Sort_merge_passes	0
+set sort_buffer_size=1024;
+flush status;
+create table t22 as
+select 
+sum(b) over (order by a rows between 2 preceding and 2 following) as SUM_B
+from
+t10;
+show status like 'Sort_merge_passes';
+Variable_name	Value
+Sort_merge_passes	35
+include/diff_tables.inc [t21, t22]
+drop table t21, t22;
+#################################################################
+# Try many cursors
+set sort_buffer_size=default;
+flush status;
+create table t21 as
+select 
+sum(b) over (order by a rows between 2 preceding and 2 following) as SUM_B1,
+sum(b) over (order by a rows between 5 preceding and 5 following) as SUM_B2,
+sum(b) over (order by a rows between 20 preceding and 20 following) as SUM_B3
+from
+t10;
+show status like 'Sort_merge_passes';
+Variable_name	Value
+Sort_merge_passes	0
+set sort_buffer_size=1024;
+flush status;
+create table t22 as
+select 
+sum(b) over (order by a rows between 2 preceding and 2 following) as SUM_B1,
+sum(b) over (order by a rows between 5 preceding and 5 following) as SUM_B2,
+sum(b) over (order by a rows between 20 preceding and 20 following) as SUM_B3
+from
+t10;
+show status like 'Sort_merge_passes';
+Variable_name	Value
+Sort_merge_passes	35
+include/diff_tables.inc [t21, t22]
+drop table t21, t22;
+#################################################################
+# Try having cursors pointing at different IO_CACHE pages
+# in the IO_CACHE
+set sort_buffer_size=default;
+flush status;
+create table t21 as
+select 
+a,
+sum(b) over (order by a range between 5000 preceding and 5000 following) as SUM_B1
+from
+t10;
+show status like 'Sort_merge_passes';
+Variable_name	Value
+Sort_merge_passes	0
+set sort_buffer_size=1024;
+flush status;
+create table t22 as
+select
+a,
+sum(b) over (order by a range between 5000 preceding and 5000 following) as SUM_B1
+from
+t10;
+show status like 'Sort_merge_passes';
+Variable_name	Value
+Sort_merge_passes	35
+include/diff_tables.inc [t21, t22]
+drop table t21, t22;
+#################################################################
+drop table t10;
+drop table t0,t1;
diff --git a/mysql-test/t/win_big.test b/mysql-test/t/win_big.test
new file mode 100644
index 0000000..a039812
--- /dev/null
+++ b/mysql-test/t/win_big.test
@@ -0,0 +1,104 @@
+#
+# Tests for window functions over big datasets.
+#  "Big" here is "big enough so that filesort result doesn't fit in a 
+#   memory  buffer".
+#
+#
+
+create table t0 (a int);
+insert into t0 values (0),(1),(2),(3),(4),(5),(6),(7),(8),(9);
+
+create table t1(a int);
+insert into t1 select A.a + B.a* 10 + C.a * 100 from t0 A, t0 B, t0 C;
+
+create table t10 (a int, b int, c int);
+insert into t10 
+select 
+  A.a + 1000*B.a,
+  A.a + 1000*B.a,
+  A.a + 1000*B.a
+from t1 A, t0 B
+order by A.a+1000*B.a;
+
+--echo #################################################################
+--echo ## Try a basic example
+flush status;
+create table t21 as
+select 
+  sum(b) over (order by a rows between 2 preceding and 2 following) as SUM_B
+from
+  t10;
+show status like 'Sort_merge_passes';
+
+set sort_buffer_size=1024;
+flush status;
+create table t22 as
+select 
+  sum(b) over (order by a rows between 2 preceding and 2 following) as SUM_B
+from
+  t10;
+show status like 'Sort_merge_passes';
+
+let $diff_tables= t21, t22;
+source include/diff_tables.inc;
+drop table t21, t22;
+
+--echo #################################################################
+--echo # Try many cursors
+set sort_buffer_size=default;
+flush status;
+create table t21 as
+select 
+  sum(b) over (order by a rows between 2 preceding and 2 following) as SUM_B1,
+  sum(b) over (order by a rows between 5 preceding and 5 following) as SUM_B2,
+  sum(b) over (order by a rows between 20 preceding and 20 following) as SUM_B3
+from
+  t10;
+show status like 'Sort_merge_passes';
+
+set sort_buffer_size=1024;
+flush status;
+create table t22 as
+select 
+  sum(b) over (order by a rows between 2 preceding and 2 following) as SUM_B1,
+  sum(b) over (order by a rows between 5 preceding and 5 following) as SUM_B2,
+  sum(b) over (order by a rows between 20 preceding and 20 following) as SUM_B3
+from
+  t10;
+show status like 'Sort_merge_passes';
+
+let $diff_tables= t21, t22;
+source include/diff_tables.inc;
+drop table t21, t22;
+
+--echo #################################################################
+--echo # Try having cursors pointing at different IO_CACHE pages
+--echo # in the IO_CACHE
+set sort_buffer_size=default;
+flush status;
+create table t21 as
+select 
+  a,
+  sum(b) over (order by a range between 5000 preceding and 5000 following) as SUM_B1
+from
+  t10;
+show status like 'Sort_merge_passes';
+
+set sort_buffer_size=1024;
+flush status;
+create table t22 as
+select
+  a,
+  sum(b) over (order by a range between 5000 preceding and 5000 following) as SUM_B1
+from
+  t10;
+show status like 'Sort_merge_passes';
+
+let $diff_tables= t21, t22;
+source include/diff_tables.inc;
+drop table t21, t22;
+--echo #################################################################
+
+drop table t10;
+drop table t0,t1;
+
diff --git a/mysys/mf_iocache.c b/mysys/mf_iocache.c
index 635e544..77581a5 100644
--- a/mysys/mf_iocache.c
+++ b/mysys/mf_iocache.c
@@ -193,6 +193,7 @@ int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
   info->alloced_buffer = 0;
   info->buffer=0;
   info->seek_not_done= 0;
+  info->next_file_user= NULL;
 
   if (file >= 0)
   {
@@ -328,6 +329,101 @@ int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
   DBUG_RETURN(0);
 }						/* init_io_cache */
 
+
+
+/*
+  Initialize the slave IO_CACHE to read the same file (and data) as master
+  does, using a private copy of the master's buffer (free it with
+  end_slave_io_cache()). Returns 0 on success, 1 if allocation failed.
+
+  One can create multiple slaves from a single master. Every slave and master
+  will have independent read positions.
+  The master must be a non-shared READ_CACHE with an allocated buffer.
+  It is assumed that no more reads are done after a master and/or a slave
+  has been freed (this limitation can be easily lifted).
+*/
+
+int init_slave_io_cache(IO_CACHE *master, IO_CACHE *slave)
+{
+  uchar *slave_buf;
+  DBUG_ASSERT(master->type == READ_CACHE);
+  DBUG_ASSERT(!master->share);
+  DBUG_ASSERT(master->alloced_buffer);
+
+  if (!(slave_buf= (uchar*)my_malloc(master->buffer_length, MYF(0))))
+  {
+    return 1;
+  }
+  memcpy(slave, master, sizeof(IO_CACHE));
+  slave->buffer= slave_buf;
+
+  memcpy(slave->buffer, master->buffer, master->buffer_length);
+  slave->read_pos= slave->buffer + (master->read_pos - master->buffer);
+  slave->read_end= slave->buffer + (master->read_end - master->buffer);
+  /* memcpy() left these pointing into master; re-point them at slave. */
+  DBUG_ASSERT(master->current_pos == &master->read_pos);
+  slave->current_pos= &slave->read_pos;
+  DBUG_ASSERT(master->current_end == &master->read_end);
+  slave->current_end= &slave->read_end;
+  /* Insert slave just before master in the circular next_file_user list. */
+  if (master->next_file_user)
+  {
+    IO_CACHE *p;
+    for (p= master->next_file_user; 
+         p->next_file_user !=master;
+         p= p->next_file_user)
+    {}
+
+    p->next_file_user= slave;
+    slave->next_file_user= master;
+  }
+  else
+  {
+    slave->next_file_user= master;
+    master->next_file_user= slave;
+  }
+  return 0;
+}
+
+/* Frees only cache->buffer; cache stays linked via next_file_user. */
+void end_slave_io_cache(IO_CACHE *cache)
+{
+  my_free(cache->buffer);
+}
+
+/*
+  Seek a read io cache to a given offset. No file I/O is done here: the
+  seek is either satisfied from the buffer or deferred to the next read.
+*/
+void seek_io_cache(IO_CACHE *cache, my_off_t needed_offset)
+{
+  my_off_t cached_data_start= cache->pos_in_file;
+  /* Buffered data ends at read_end; read_pos is only the current cursor. */
+  my_off_t cached_data_end= cache->pos_in_file + (cache->read_end -
+                                                  cache->buffer);
+  if (needed_offset >= cached_data_start &&
+      needed_offset < cached_data_end)
+  {
+    /* The offset is in the buffer: just move the read position. */
+    cache->read_pos= cache->buffer + (needed_offset - cached_data_start);
+  }
+  else
+  {
+    if (needed_offset > cache->end_of_file)
+      needed_offset= cache->end_of_file;
+    /*
+      The offset we're seeking to is not in the buffer.
+      - Empty the buffer: _my_b_cache_read() resumes at pos_in_file +
+        (read_end - buffer), so a stale buffer length would skew the offset.
+      - Make the next read do a mysql_file_seek() to exactly needed_offset;
+        rounding it down would hand back the bytes preceding it.
+    */
+    cache->read_pos= cache->read_end= cache->buffer;
+    cache->seek_not_done= 1;
+    cache->pos_in_file= needed_offset;
+  }
+}
+
 	/* Wait until current request is ready */
 
 #ifdef HAVE_AIOWAIT
@@ -583,6 +679,17 @@ int _my_b_cache_read(IO_CACHE *info, uchar *Buffer, size_t Count)
     {
       /* No error, reset seek_not_done flag. */
       info->seek_not_done= 0;
+
+      if (info->next_file_user)
+      {
+        IO_CACHE *c;
+        for (c= info->next_file_user;
+             c!= info;
+             c= c->next_file_user)
+        {
+          c->seek_not_done= 1;
+        }
+      }
     }
     else
     {
@@ -671,22 +778,35 @@ int _my_b_cache_read(IO_CACHE *info, uchar *Buffer, size_t Count)
       DBUG_RETURN(0);                           /* EOF */
     }
   }
-  else if ((length= mysql_file_read(info->file,info->buffer, max_length,
+  else 
+  {
+    if (info->next_file_user)
+    {
+      IO_CACHE *c;
+      for (c= info->next_file_user;
+           c!= info;
+           c= c->next_file_user)
+      {
+        c->seek_not_done= 1;
+      }
+    }
+    if ((length= mysql_file_read(info->file,info->buffer, max_length,
                             info->myflags)) < Count ||
 	   length == (size_t) -1)
-  {
-    /*
-      We got an read error, or less than requested (end of file).
-      If not a read error, copy, what we got.
-    */
-    if (length != (size_t) -1)
-      memcpy(Buffer, info->buffer, length);
-    info->pos_in_file= pos_in_file;
-    /* For a read error, return -1, otherwise, what we got in total. */
-    info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
-    info->read_pos=info->read_end=info->buffer;
-    info->seek_not_done=1;
-    DBUG_RETURN(1);
+    {
+      /*
+        We got a read error, or less than requested (end of file).
+        If not a read error, copy, what we got.
+      */
+      if (length != (size_t) -1)
+        memcpy(Buffer, info->buffer, length);
+      info->pos_in_file= pos_in_file;
+      /* For a read error, return -1, otherwise, what we got in total. */
+      info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
+      info->read_pos=info->read_end=info->buffer;
+      info->seek_not_done=1;
+      DBUG_RETURN(1);
+    }
   }
   /*
     Count is the remaining number of bytes requested.
diff --git a/sql/sql_window.cc b/sql/sql_window.cc
index e720c39..4b6e45b 100644
--- a/sql/sql_window.cc
+++ b/sql/sql_window.cc
@@ -515,17 +515,6 @@ void order_window_funcs_by_window_specs(List<Item_window_func> *win_func_list)
 // note: make rr_from_pointers static again when not need it here anymore
 int rr_from_pointers(READ_RECORD *info);
 
-/*
-  A temporary way to clone READ_RECORD structures until Monty provides the real
-  one.
-*/
-bool clone_read_record(const READ_RECORD *src, READ_RECORD *dst)
-{
-  //DBUG_ASSERT(src->table->sort.record_pointers);
-  DBUG_ASSERT(src->read_record == rr_from_pointers);
-  memcpy(dst, src, sizeof(READ_RECORD));
-  return false;
-}
 
 /////////////////////////////////////////////////////////////////////////////
 
@@ -540,68 +529,145 @@ bool clone_read_record(const READ_RECORD *src, READ_RECORD *dst)
 class Rowid_seq_cursor
 {
 public:
-  virtual ~Rowid_seq_cursor() {}
+  Rowid_seq_cursor() : io_cache(NULL), ref_buffer(0) {}
+  virtual ~Rowid_seq_cursor()
+  {
+    if (ref_buffer)
+      my_free(ref_buffer);
+    if (io_cache)
+    {
+      end_slave_io_cache(io_cache);
+      my_free(io_cache);
+      io_cache= NULL;
+    }
+  }
+
+private:
+  /* Length of one rowid element */
+  size_t ref_length;
+
+  /* If io_cache != NULL, use it */
+  IO_CACHE *io_cache;
+  uchar *ref_buffer;   /* Buffer for the last returned rowid */
+  uint rownum;     /* Number of the rowid that is about to be returned */
+  bool cache_eof; /* whether we've reached EOF */
+  
+  /* The following are used when we are reading from an array of pointers */
+  uchar *cache_start;
+  uchar *cache_pos;
+  uchar *cache_end;
+public:
 
   void init(READ_RECORD *info)
   {
-    cache_start= info->cache_pos;
-    cache_pos=   info->cache_pos;
-    cache_end=   info->cache_end;
     ref_length= info->ref_length;
+    if (info->read_record == rr_from_pointers)
+    {
+      io_cache= NULL;
+      cache_start= info->cache_pos;
+      cache_pos=   info->cache_pos;
+      cache_end=   info->cache_end;
+    }
+    else
+    {
+      //DBUG_ASSERT(info->read_record == rr_from_tempfile);
+      rownum= 0;
+      cache_eof= false;
+      io_cache= (IO_CACHE*)my_malloc(sizeof(IO_CACHE), MYF(0));
+      init_slave_io_cache(info->io_cache, io_cache);
+
+      ref_buffer= (uchar*)my_malloc(ref_length, MYF(0));
+    }
   }
 
   virtual int next()
   {
-    /* Allow multiple next() calls in EOF state. */
-    if (cache_pos == cache_end)
-      return -1;
-
-    cache_pos+= ref_length;
-    DBUG_ASSERT(cache_pos <= cache_end);
+    if (io_cache)
+    {
+      if (cache_eof)
+        return 1;
 
+      if (my_b_read(io_cache,ref_buffer,ref_length))
+      {
+        cache_eof= 1; // TODO: remove cache_eof
+        return -1;
+      }
+      rownum++;
+      return 0;
+    }
+    else
+    {
+      /* Allow multiple next() calls in EOF state. */
+      if (cache_pos == cache_end)
+        return -1;
+      cache_pos+= ref_length;
+      DBUG_ASSERT(cache_pos <= cache_end);
+    }
     return 0;
   }
 
   virtual int prev()
   {
-    /* Allow multiple prev() calls when positioned at the start. */
-    if (cache_pos == cache_start)
-      return -1;
-    cache_pos-= ref_length;
-    DBUG_ASSERT(cache_pos >= cache_start);
+    if (io_cache)
+    {
+      if (rownum == 0)
+        return -1;
 
-    return 0;
+      move_to(rownum - 1);
+      return 0;
+    }
+    else
+    {
+      /* Allow multiple prev() calls when positioned at the start. */
+      if (cache_pos == cache_start)
+        return -1;
+      cache_pos-= ref_length;
+      DBUG_ASSERT(cache_pos >= cache_start);
+      return 0;
+    }
   }
 
   ha_rows get_rownum() const
   {
-    return (cache_pos - cache_start) / ref_length;
+    if (io_cache)
+      return rownum;
+    else
+      return (cache_pos - cache_start) / ref_length;
   }
 
   void move_to(ha_rows row_number)
   {
-    cache_pos= MY_MIN(cache_end, cache_start + row_number * ref_length);
-    DBUG_ASSERT(cache_pos <= cache_end);
+    if (io_cache)
+    {
+      seek_io_cache(io_cache, row_number * ref_length);
+      rownum= row_number;
+      next();
+    }
+    else
+    {
+      cache_pos= MY_MIN(cache_end, cache_start + row_number * ref_length);
+      DBUG_ASSERT(cache_pos <= cache_end);
+    }
   }
 
 protected:
-  bool at_eof() { return (cache_pos == cache_end); }
-
-  uchar *get_prev_rowid()
+  bool at_eof()
   {
-    if (cache_pos == cache_start)
-      return NULL;
+    if (io_cache)
+    {
+      return cache_eof;
+    }
     else
-      return cache_pos - ref_length;
+      return (cache_pos == cache_end);
   }
 
-  uchar *get_curr_rowid() { return cache_pos; }
-
-private:
-  uchar *cache_start;
-  uchar *cache_pos;
-  uchar *cache_end;
-  uint ref_length;
+  uchar *get_curr_rowid()
+  {
+    if (io_cache)
+      return ref_buffer;
+    else
+      return cache_pos;
+  }
 };
 
 
@@ -630,18 +696,6 @@ class Table_read_cursor : public Rowid_seq_cursor
     return table->file->ha_rnd_pos(record, curr_rowid);
   }
 
-  bool fetch_prev_row()
-  {
-    uchar *p;
-    if ((p= get_prev_rowid()))
-    {
-      int rc= table->file->ha_rnd_pos(record, p);
-      if (!rc)
-        return true; // restored ok
-    }
-    return false; // didn't restore
-  }
-
 private:
   /* The table that is acccesed by this cursor. */
   TABLE *table;


More information about the commits mailing list