[Commits] 78d3bd5: MDEV-10059: Compute window functions with same sorting criteria simultaneously

vicentiu vicentiu at mariadb.org
Tue Sep 6 19:51:24 EEST 2016


revision-id: 78d3bd5e0b8827d21c4fe907aa56dcda85d7dd89 (mariadb-10.2.1-21-g78d3bd5)
parent(s): 31a8cf54c8a7913338480a0571feaf32143b5f64
author: Vicențiu Ciorbaru
committer: Vicențiu Ciorbaru
timestamp: 2016-09-06 19:50:50 +0300
message:

MDEV-10059: Compute window functions with same sorting criteria simultaneously

Perform only one table scan for each window function present. We do this
by keeping keeping cursors for each window function frame bound and
running them for each function for every row.

---
 sql/item_windowfunc.cc |  43 +--
 sql/item_windowfunc.h  |  90 ++---
 sql/sql_window.cc      | 884 +++++++++++++++++++++++++++++--------------------
 sql/sql_window.h       |  37 +--
 4 files changed, 607 insertions(+), 447 deletions(-)

diff --git a/sql/item_windowfunc.cc b/sql/item_windowfunc.cc
index d157d54..c8ea979 100644
--- a/sql/item_windowfunc.cc
+++ b/sql/item_windowfunc.cc
@@ -41,7 +41,7 @@ Item_window_func::resolve_window_name(THD *thd)
     return true;
   }
 
-  return false;                      
+  return false;
 }
 
 
@@ -154,7 +154,7 @@ void Item_window_func::split_sum_func(THD *thd, Ref_ptr_array ref_pointer_array,
 
 
 /*
-  This must be called before advance_window() can be called.
+  This must be called before attempting to compute the window function values.
 
   @detail
     If we attempt to do it in fix_fields(), partition_fields will refer
@@ -162,30 +162,25 @@ void Item_window_func::split_sum_func(THD *thd, Ref_ptr_array ref_pointer_array,
     We need it to refer to temp.table columns.
 */
 
-void Item_window_func::setup_partition_border_check(THD *thd)
-{
-  partition_tracker.init(thd, window_spec->partition_list);
-  window_func()->setup_window_func(thd, window_spec);
-}
-
-
 void Item_sum_rank::setup_window_func(THD *thd, Window_spec *window_spec)
 {
   /* TODO: move this into Item_window_func? */
-  peer_tracker.init(thd, window_spec->order_list);
+  peer_tracker = new Group_bound_tracker(thd, window_spec->order_list);
+  peer_tracker->init();
   clear();
 }
 
 void Item_sum_dense_rank::setup_window_func(THD *thd, Window_spec *window_spec)
 {
   /* TODO: consider moving this && Item_sum_rank's implementation */
-  peer_tracker.init(thd, window_spec->order_list);
+  peer_tracker = new Group_bound_tracker(thd, window_spec->order_list);
+  peer_tracker->init();
   clear();
 }
 
 bool Item_sum_dense_rank::add()
 {
-  if (peer_tracker.check_if_next_group() || first_add)
+  if (peer_tracker->check_if_next_group() || first_add)
   {
     first_add= false;
     dense_rank++;
@@ -198,7 +193,7 @@ bool Item_sum_dense_rank::add()
 bool Item_sum_rank::add()
 {
   row_number++;
-  if (peer_tracker.check_if_next_group())
+  if (peer_tracker->check_if_next_group())
   {
     /* Row value changed */
     cur_rank= row_number;
@@ -206,25 +201,10 @@ bool Item_sum_rank::add()
   return false; 
 }
 
-bool Item_window_func::check_if_partition_changed()
-{
-  return partition_tracker.check_if_next_group();
-}
-
-void Item_window_func::advance_window()
-{
-  if (check_if_partition_changed())
-  {
-    /* Next partition */
-    window_func()->clear();
-  }
-  window_func()->add();
-}
-
 bool Item_sum_percent_rank::add()
 {
   row_number++;
-  if (peer_tracker.check_if_next_group())
+  if (peer_tracker->check_if_next_group())
   {
     /* Row value changed. */
     cur_rank= row_number;
@@ -235,8 +215,7 @@ bool Item_sum_percent_rank::add()
 void Item_sum_percent_rank::setup_window_func(THD *thd, Window_spec *window_spec)
 {
   /* TODO: move this into Item_window_func? */
-  peer_tracker.init(thd, window_spec->order_list);
+  peer_tracker = new Group_bound_tracker(thd, window_spec->order_list);
+  peer_tracker->init();
   clear();
 }
-
-
diff --git a/sql/item_windowfunc.h b/sql/item_windowfunc.h
index 9d2fa13..5b352a4 100644
--- a/sql/item_windowfunc.h
+++ b/sql/item_windowfunc.h
@@ -12,25 +12,19 @@ int test_if_group_changed(List<Cached_item> &list);
 /* A wrapper around test_if_group_changed */
 class Group_bound_tracker
 {
-  List<Cached_item> group_fields;
-  /*
-    During the first check_if_next_group, the list of cached_items is not
-    initialized. The compare function will return that the items match if
-    the field's value is the same as the Cached_item's default value (0).
-    This flag makes sure that we always return true during the first check.
-
-    XXX This is better to be implemented within test_if_group_changed, but
-    since it is used in other parts of the codebase, we keep it here for now.
-  */
-   bool first_check;
 public:
-  void init(THD *thd, SQL_I_List<ORDER> *list)
+
+  Group_bound_tracker(THD *thd, SQL_I_List<ORDER> *list)
   {
     for (ORDER *curr = list->first; curr; curr=curr->next) 
     {
       Cached_item *tmp= new_Cached_item(thd, curr->item[0], TRUE);
       group_fields.push_back(tmp);
     }
+  }
+
+  void init()
+  {
     first_check= true;
   }
 
@@ -76,6 +70,19 @@ class Group_bound_tracker
     }
     return 0;
   }
+
+private:
+  List<Cached_item> group_fields;
+  /*
+    During the first check_if_next_group, the list of cached_items is not
+    initialized. The compare function will return that the items match if
+    the field's value is the same as the Cached_item's default value (0).
+    This flag makes sure that we always return true during the first check.
+
+    XXX This is better to be implemented within test_if_group_changed, but
+    since it is used in other parts of the codebase, we keep it here for now.
+  */
+   bool first_check;
 };
 
 /*
@@ -92,19 +99,22 @@ class Item_sum_row_number: public Item_sum_int
   longlong count;
 
 public:
+
+  Item_sum_row_number(THD *thd)
+    : Item_sum_int(thd),  count(0) {}
+
   void clear()
   {
     count= 0;
   }
-  bool add() 
+
+  bool add()
   {
     count++;
-    return false; 
+    return false;
   }
-  void update_field() {}
 
-  Item_sum_row_number(THD *thd)
-    : Item_sum_int(thd),  count(0) {}
+  void update_field() {}
 
   enum Sumfunctype sum_func() const
   {
@@ -119,7 +129,6 @@ class Item_sum_row_number: public Item_sum_int
   {
     return "row_number(";
   }
-  
 };
 
 
@@ -145,9 +154,12 @@ class Item_sum_rank: public Item_sum_int
 protected:
   longlong row_number; // just ROW_NUMBER()
   longlong cur_rank;   // current value
-  
-  Group_bound_tracker peer_tracker;
+
+  Group_bound_tracker *peer_tracker;
 public:
+
+  Item_sum_rank(THD *thd) : Item_sum_int(thd), peer_tracker(NULL) {}
+
   void clear()
   {
     /* This is called on partition start */
@@ -168,10 +180,6 @@ class Item_sum_rank: public Item_sum_int
     TODO: ^^ what does this do ? It is not called ever?
   */
 
-public:
-  Item_sum_rank(THD *thd)
-    : Item_sum_int(thd) {}
-
   enum Sumfunctype sum_func () const
   {
     return RANK_FUNC;
@@ -183,9 +191,12 @@ class Item_sum_rank: public Item_sum_int
   }
 
   void setup_window_func(THD *thd, Window_spec *window_spec);
+
   void cleanup()
   {
-    peer_tracker.cleanup();
+    if (peer_tracker)
+      peer_tracker->cleanup();
+    delete peer_tracker;
     Item_sum_int::cleanup();
   }
 };
@@ -214,7 +225,7 @@ class Item_sum_dense_rank: public Item_sum_int
 {
   longlong dense_rank;
   bool first_add;
-  Group_bound_tracker peer_tracker;
+  Group_bound_tracker *peer_tracker;
  public:
   /*
      XXX(cvicentiu) This class could potentially be implemented in the rank
@@ -233,7 +244,7 @@ class Item_sum_dense_rank: public Item_sum_int
   }
 
   Item_sum_dense_rank(THD *thd)
-    : Item_sum_int(thd), dense_rank(0), first_add(true) {}
+    : Item_sum_int(thd), dense_rank(0), first_add(true), peer_tracker(NULL) {}
   enum Sumfunctype sum_func () const
   {
     return DENSE_RANK_FUNC;
@@ -248,7 +259,11 @@ class Item_sum_dense_rank: public Item_sum_int
 
   void cleanup()
   {
-    peer_tracker.cleanup();
+    if (peer_tracker)
+    {
+      peer_tracker->cleanup();
+      delete peer_tracker;
+    }
     Item_sum_int::cleanup();
   }
 };
@@ -289,7 +304,7 @@ class Item_sum_percent_rank: public Item_sum_window_with_row_count
 {
  public:
   Item_sum_percent_rank(THD *thd)
-    : Item_sum_window_with_row_count(thd), cur_rank(1) {}
+    : Item_sum_window_with_row_count(thd), cur_rank(1), peer_tracker(NULL) {}
 
   longlong val_int()
   {
@@ -347,11 +362,15 @@ class Item_sum_percent_rank: public Item_sum_window_with_row_count
   longlong cur_rank;   // Current rank of the current row.
   longlong row_number; // Value if this were ROW_NUMBER() function.
 
-  Group_bound_tracker peer_tracker;
+  Group_bound_tracker *peer_tracker;
 
   void cleanup()
   {
-    peer_tracker.cleanup();
+    if (peer_tracker)
+    {
+      peer_tracker->cleanup();
+      delete peer_tracker;
+    }
     Item_sum_num::cleanup();
   }
 };
@@ -502,12 +521,6 @@ class Item_window_func : public Item_func_or_sum
 public:
   Window_spec *window_spec;
   
-  /*
-    This stores the data about the partition we're currently in.
-    advance_window() uses this to tell when we've left one partition and
-    entered another
-  */
-  Group_bound_tracker partition_tracker;
 public:
   Item_window_func(THD *thd, Item_sum *win_func, LEX_STRING *win_name)
     : Item_func_or_sum(thd, (Item *) win_func),
@@ -600,9 +613,6 @@ class Item_window_func : public Item_func_or_sum
   */
   void setup_partition_border_check(THD *thd);
 
-  void advance_window();
-  bool check_if_partition_changed();
-
   enum_field_types field_type() const
   { 
     return ((Item_sum *) args[0])->field_type(); 
diff --git a/sql/sql_window.cc b/sql/sql_window.cc
index 4c5ef53..a862821 100644
--- a/sql/sql_window.cc
+++ b/sql/sql_window.cc
@@ -37,12 +37,12 @@ Window_spec::check_window_names(List_iterator_fast<Window_spec> &it)
       if (win_spec->order_list->elements && order_list->elements)
       {
         my_error(ER_ORDER_LIST_IN_REFERENCING_WINDOW_SPEC, MYF(0), ref_name);
-        return true;              
+        return true;
       } 
-      if (win_spec->window_frame) 
+      if (win_spec->window_frame)
       {
         my_error(ER_WINDOW_FRAME_IN_REFERENCED_WINDOW_SPEC, MYF(0), ref_name);
-        return true;              
+        return true;
       }
       referenced_win_spec= win_spec;
       if (partition_list->elements == 0)
@@ -54,7 +54,7 @@ Window_spec::check_window_names(List_iterator_fast<Window_spec> &it)
   if (ref_name && !referenced_win_spec)
   {
     my_error(ER_WRONG_WINDOW_SPEC_NAME, MYF(0), ref_name);
-    return true;              
+    return true;
   }
   window_names_are_checked= true;
   return false;
@@ -73,7 +73,7 @@ Window_frame::check_frame_bounds()
        top_bound->precedence_type == Window_frame_bound::FOLLOWING))
   {
     my_error(ER_BAD_COMBINATION_OF_WINDOW_FRAME_BOUND_SPECS, MYF(0));
-    return true;              
+    return true;
   }
 
   return false;
@@ -86,7 +86,7 @@ Window_frame::check_frame_bounds()
 
 int
 setup_windows(THD *thd, Ref_ptr_array ref_pointer_array, TABLE_LIST *tables,
-	      List<Item> &fields, List<Item> &all_fields, 
+              List<Item> &fields, List<Item> &all_fields,
               List<Window_spec> &win_specs, List<Item_window_func> &win_funcs)
 {
   Window_spec *win_spec;
@@ -116,7 +116,7 @@ setup_windows(THD *thd, Ref_ptr_array ref_pointer_array, TABLE_LIST *tables,
   it.rewind();
 
   List_iterator_fast<Window_spec> itp(win_specs);
-    
+
   while ((win_spec= it++))
   {
     bool hidden_group_fields;
@@ -131,7 +131,7 @@ setup_windows(THD *thd, Ref_ptr_array ref_pointer_array, TABLE_LIST *tables,
     {
       DBUG_RETURN(1);
     }
-    
+
     if (win_spec->window_frame &&
         win_spec->window_frame->exclusion != Window_frame::EXCL_NONE)
     {
@@ -188,7 +188,7 @@ setup_windows(THD *thd, Ref_ptr_array ref_pointer_array, TABLE_LIST *tables,
         }
       }
     }
-    
+
     /* "ROWS PRECEDING|FOLLOWING $n" must have a numeric $n */
     if (win_spec->window_frame && 
         win_spec->window_frame->units == Window_frame::UNITS_ROWS)
@@ -219,7 +219,7 @@ setup_windows(THD *thd, Ref_ptr_array ref_pointer_array, TABLE_LIST *tables,
   {
     win_func_item->update_used_tables();
   }
-  
+
   DBUG_RETURN(0);
 }
 
@@ -445,7 +445,7 @@ typedef int (*Item_window_func_cmp)(Item_window_func *f1,
   @brief
     Sort window functions so that those that can be computed together are
     adjacent.
-  
+
   @detail
     Sort window functions by their
      - required sorting order,
@@ -498,48 +498,15 @@ void order_window_funcs_by_window_specs(List<Item_window_func> *win_func_list)
     }
     else if (win_spec_prev->window_frame != win_spec_curr->window_frame)
       curr->marker|= FRAME_CHANGE_FLAG;
-   
-    prev= curr;                                             
-  }  
+
+    prev= curr;
+  }
 }
 
 
 /////////////////////////////////////////////////////////////////////////////
 
 
-/*
-  Do a pass over sorted table and compute window function values.
-
-  This function is for handling window functions that can be computed on the
-  fly. Examples are RANK() and ROW_NUMBER().
-*/
-bool compute_window_func_values(Item_window_func *item_win, 
-                                TABLE *tbl, READ_RECORD *info)
-{
-  int err;
-  while (!(err=info->read_record(info)))
-  {
-    store_record(tbl,record[1]);
-    
-    /* 
-      This will cause window function to compute its value for the
-      current row :
-    */
-    item_win->advance_window();
-
-    /*
-      Put the new value into temptable's field
-      TODO: Should this use item_win->update_field() call?
-      Regular aggegate function implementations seem to implement it.
-    */
-    item_win->save_in_field(item_win->result_field, true);
-    err= tbl->file->ha_update_row(tbl->record[1], tbl->record[0]);
-    if (err && err != HA_ERR_RECORD_IS_THE_SAME)
-      return true;
-  }
-  return false;
-}
-
 /////////////////////////////////////////////////////////////////////////////
 // Window Frames support 
 /////////////////////////////////////////////////////////////////////////////
@@ -571,11 +538,6 @@ bool clone_read_record(const READ_RECORD *src, READ_RECORD *dst)
 
 class Rowid_seq_cursor
 {
-  uchar *cache_start;
-  uchar *cache_pos;
-  uchar *cache_end;
-  uint ref_length;
-
 public:
   virtual ~Rowid_seq_cursor() {}
 
@@ -595,17 +557,17 @@ class Rowid_seq_cursor
     cache_pos+= ref_length;
     return 0;
   }
-  
+
   ha_rows get_rownum()
   {
     return (cache_pos - cache_start) / ref_length;
   }
 
-  // will be called by ROWS n FOLLOWING to catch up.
   void move_to(ha_rows row_number)
   {
     cache_pos= cache_start + row_number * ref_length;
   }
+
 protected:
   bool at_eof() { return (cache_pos == cache_end); }
 
@@ -618,6 +580,12 @@ class Rowid_seq_cursor
   }
 
   uchar *get_curr_rowid() { return cache_pos; }
+
+private:
+  uchar *cache_start;
+  uchar *cache_pos;
+  uchar *cache_end;
+  uint ref_length;
 };
 
 
@@ -627,11 +595,6 @@ class Rowid_seq_cursor
 
 class Table_read_cursor : public Rowid_seq_cursor
 {
-  /* 
-    Note: we don't own *read_record, somebody else is using it.
-    We only look at the constant part of it, e.g. table, record buffer, etc.
-  */
-  READ_RECORD *read_record;
 public:
   virtual ~Table_read_cursor() {}
 
@@ -668,7 +631,14 @@ class Table_read_cursor : public Rowid_seq_cursor
     return false; // didn't restore
   }
 
-  // todo: should move_to() also read row here? 
+private:
+  /*
+    Note: we don't own *read_record, somebody else is using it.
+    We only look at the constant part of it, e.g. table, record buffer, etc.
+  */
+  READ_RECORD *read_record;
+
+  // TODO(spetrunia): should move_to() also read row here?
 };
 
 
@@ -679,14 +649,14 @@ class Table_read_cursor : public Rowid_seq_cursor
 
 class Partition_read_cursor
 {
-  Table_read_cursor tbl_cursor;
-  Group_bound_tracker bound_tracker;
-  bool end_of_partition;
 public:
-  void init(THD *thd, READ_RECORD *info, SQL_I_List<ORDER> *partition_list)
+  Partition_read_cursor(THD *thd, SQL_I_List<ORDER> *partition_list) :
+    bound_tracker(thd, partition_list) {}
+
+  void init(READ_RECORD *info)
   {
     tbl_cursor.init(info);
-    bound_tracker.init(thd, partition_list);
+    bound_tracker.init();
     end_of_partition= false;
   }
 
@@ -704,7 +674,7 @@ class Partition_read_cursor
   }
 
   /*
-    Moves to a new row. The row is assumed to be within the current partition
+    Moves to a new row. The row is assumed to be within the current partition.
   */
   void move_to(ha_rows rownum) { tbl_cursor.move_to(rownum); }
 
@@ -731,14 +701,18 @@ class Partition_read_cursor
   {
     return tbl_cursor.restore_last_row();
   }
+
+private:
+  Table_read_cursor tbl_cursor;
+  Group_bound_tracker bound_tracker;
+  bool end_of_partition;
 };
 
 /////////////////////////////////////////////////////////////////////////////
 
-
 /*
   Window frame bound cursor. Abstract interface.
-  
+
   @detail
     The cursor moves within the partition that the current row is in.
     It may be ahead or behind the current row.
@@ -775,11 +749,12 @@ class Partition_read_cursor
 class Frame_cursor : public Sql_alloc
 {
 public:
-  virtual void init(THD *thd, READ_RECORD *info, 
-                    SQL_I_List<ORDER> *partition_list,
-                    SQL_I_List<ORDER> *order_list)
-  {}
+  virtual void init(READ_RECORD *info) {};
 
+  bool add_sum_func(Item_sum* item)
+  {
+    return sum_functions.push_back(item);
+  }
   /*
     Current row has moved to the next partition and is positioned on the first
     row there. Position the frame bound accordingly.
@@ -796,20 +771,95 @@ class Frame_cursor : public Sql_alloc
       - The callee may move tbl->file and tbl->record[0] to point to some other
         row.
   */
-  virtual void pre_next_partition(ha_rows rownum, Item_sum* item){};
-  virtual void next_partition(ha_rows rownum, Item_sum* item)=0;
-  
+  virtual void pre_next_partition(ha_rows rownum) {};
+  virtual void next_partition(ha_rows rownum)=0;
+
   /*
     The current row has moved one row forward.
     Move this frame bound accordingly, and update the value of aggregate
     function as necessary.
   */
-  virtual void pre_next_row(Item_sum* item){};
-  virtual void next_row(Item_sum* item)=0;
-  
-  virtual ~Frame_cursor(){}
+  virtual void pre_next_row() {};
+  virtual void next_row()=0;
+
+  virtual ~Frame_cursor() {}
+
+protected:
+  inline void add_value_to_items()
+  {
+    List_iterator_fast<Item_sum> it(sum_functions);
+    Item_sum *item_sum;
+    while ((item_sum= it++))
+    {
+      item_sum->add();
+    }
+  }
+  inline void remove_value_from_items()
+  {
+    List_iterator_fast<Item_sum> it(sum_functions);
+    Item_sum *item_sum;
+    while ((item_sum= it++))
+    {
+      item_sum->remove();
+    }
+  }
+
+  /* Sum functions that this cursor handles. */
+  List<Item_sum> sum_functions;
+};
+
+/*
+  A class that owns cursor objects associated with a specific window function.
+*/
+class Cursor_manager
+{
+public:
+  bool add_cursor(Frame_cursor *cursor)
+  {
+    return cursors.push_back(cursor);
+  }
+
+  void initialize_cursors(READ_RECORD *info)
+  {
+    List_iterator_fast<Frame_cursor> iter(cursors);
+    Frame_cursor *fc;
+    while ((fc= iter++))
+      fc->init(info);
+  }
+
+  void notify_cursors_partition_changed(ha_rows rownum)
+  {
+    List_iterator_fast<Frame_cursor> iter(cursors);
+    Frame_cursor *cursor;
+    while ((cursor= iter++))
+      cursor->pre_next_partition(rownum);
+
+    iter.rewind();
+    while ((cursor= iter++))
+      cursor->next_partition(rownum);
+  }
+
+  void notify_cursors_next_row()
+  {
+    List_iterator_fast<Frame_cursor> iter(cursors);
+    Frame_cursor *cursor;
+    while ((cursor= iter++))
+      cursor->pre_next_row();
+
+    iter.rewind();
+    while ((cursor= iter++))
+      cursor->next_row();
+  }
+
+  ~Cursor_manager() { cursors.delete_elements(); }
+
+private:
+  /* List of the cursors that this manager owns. */
+  List<Frame_cursor> cursors;
 };
 
+
+
 //////////////////////////////////////////////////////////////////////////////
 // RANGE-type frames
 //////////////////////////////////////////////////////////////////////////////
@@ -841,16 +891,12 @@ class Frame_range_n_top : public Frame_cursor
   */
   int order_direction;
 public:
-  Frame_range_n_top(bool is_preceding_arg, Item *n_val_arg) :
+  Frame_range_n_top(THD *thd,
+                    SQL_I_List<ORDER> *partition_list,
+                    SQL_I_List<ORDER> *order_list,
+                    bool is_preceding_arg, Item *n_val_arg) :
     n_val(n_val_arg), item_add(NULL), is_preceding(is_preceding_arg)
-  {}
-
-  void init(THD *thd, READ_RECORD *info,
-            SQL_I_List<ORDER> *partition_list,
-            SQL_I_List<ORDER> *order_list)
   {
-    cursor.init(info);
-
     DBUG_ASSERT(order_list->elements == 1);
     Item *src_expr= order_list->first->item[0];
     if (order_list->first->direction == ORDER::ORDER_ASC)
@@ -872,24 +918,30 @@ class Frame_range_n_top : public Frame_cursor
     item_add->fix_fields(thd, &item_add);
   }
 
-  void pre_next_partition(ha_rows rownum, Item_sum* item)
+  void init(READ_RECORD *info)
+  {
+    cursor.init(info);
+
+  }
+
+  void pre_next_partition(ha_rows rownum)
   {
     // Save the value of FUNC(current_row)
     range_expr->fetch_value_from(item_add);
   }
 
-  void next_partition(ha_rows rownum, Item_sum* item)
+  void next_partition(ha_rows rownum)
   {
     cursor.move_to(rownum);
-    walk_till_non_peer(item);
+    walk_till_non_peer();
   }
 
-  void pre_next_row(Item_sum* item)
+  void pre_next_row()
   {
     range_expr->fetch_value_from(item_add);
   }
 
-  void next_row(Item_sum* item)
+  void next_row()
   {
     /*
       Ok, our cursor is at the first row R where
@@ -900,19 +952,19 @@ class Frame_range_n_top : public Frame_cursor
     {
       if (order_direction * range_expr->cmp_read_only() <= 0)
         return;
-      item->remove();
+      remove_value_from_items();
     }
-    walk_till_non_peer(item);
+    walk_till_non_peer();
   }
 
 private:
-  void walk_till_non_peer(Item_sum* item)
+  void walk_till_non_peer()
   {
     while (!cursor.get_next())
     {
       if (order_direction * range_expr->cmp_read_only() <= 0)
         break;
-      item->remove();
+      remove_value_from_items();
     }
   }
 };
@@ -950,16 +1002,13 @@ class Frame_range_n_bottom: public Frame_cursor
   */
   int order_direction;
 public:
-  Frame_range_n_bottom(bool is_preceding_arg, Item *n_val_arg) :
-    n_val(n_val_arg), item_add(NULL), is_preceding(is_preceding_arg)
-  {}
-
-  void init(THD *thd, READ_RECORD *info,
-            SQL_I_List<ORDER> *partition_list,
-            SQL_I_List<ORDER> *order_list)
+  Frame_range_n_bottom(THD *thd,
+                       SQL_I_List<ORDER> *partition_list,
+                       SQL_I_List<ORDER> *order_list,
+                       bool is_preceding_arg, Item *n_val_arg) :
+    cursor(thd, partition_list), n_val(n_val_arg), item_add(NULL),
+    is_preceding(is_preceding_arg)
   {
-    cursor.init(thd, info, partition_list);
-
     DBUG_ASSERT(order_list->elements == 1);
     Item *src_expr= order_list->first->item[0];
 
@@ -982,7 +1031,12 @@ class Frame_range_n_bottom: public Frame_cursor
     item_add->fix_fields(thd, &item_add);
   }
 
-  void pre_next_partition(ha_rows rownum, Item_sum* item)
+  void init(READ_RECORD *info)
+  {
+    cursor.init(info);
+  }
+
+  void pre_next_partition(ha_rows rownum)
   {
     // Save the value of FUNC(current_row)
     range_expr->fetch_value_from(item_add);
@@ -991,20 +1045,20 @@ class Frame_range_n_bottom: public Frame_cursor
     end_of_partition= false;
   }
 
-  void next_partition(ha_rows rownum, Item_sum* item)
+  void next_partition(ha_rows rownum)
   {
     cursor.move_to(rownum);
-    walk_till_non_peer(item);
+    walk_till_non_peer();
   }
 
-  void pre_next_row(Item_sum* item)
+  void pre_next_row()
   {
     if (end_of_partition)
       return;
     range_expr->fetch_value_from(item_add);
   }
 
-  void next_row(Item_sum* item)
+  void next_row()
   {
     if (end_of_partition)
       return;
@@ -1017,20 +1071,20 @@ class Frame_range_n_bottom: public Frame_cursor
     {
       if (order_direction * range_expr->cmp_read_only() < 0)
         return;
-      item->add();
+      add_value_to_items();
     }
-    walk_till_non_peer(item);
+    walk_till_non_peer();
   }
 
 private:
-  void walk_till_non_peer(Item_sum* item)
+  void walk_till_non_peer()
   {
     int res;
     while (!(res= cursor.get_next()))
     {
       if (order_direction * range_expr->cmp_read_only() < 0)
         break;
-      item->add();
+      add_value_to_items();
     }
     if (res)
       end_of_partition= true;
@@ -1043,11 +1097,11 @@ class Frame_range_n_bottom: public Frame_cursor
      ...
    | peer1
    | peer2  <----- current_row
-   | peer3 
+   | peer3
    +-peer4  <----- the cursor points here. peer4 itself is included.
      nonpeer1
      nonpeer2
-  
+
   This bound moves in front of the current_row. It should be a the first row
   that is still a peer of the current row.
 */
@@ -1060,15 +1114,20 @@ class Frame_range_current_row_bottom: public Frame_cursor
 
   bool dont_move;
 public:
-  void init(THD *thd, READ_RECORD *info,
-            SQL_I_List<ORDER> *partition_list,
-            SQL_I_List<ORDER> *order_list)
+  Frame_range_current_row_bottom(THD *thd,
+                                 SQL_I_List<ORDER> *partition_list,
+                                 SQL_I_List<ORDER> *order_list) :
+    cursor(thd, partition_list), peer_tracker(thd, order_list)
+  {
+  }
+
+  void init(READ_RECORD *info)
   {
-    cursor.init(thd, info, partition_list);
-    peer_tracker.init(thd, order_list);
+    cursor.init(info);
+    peer_tracker.init();
   }
 
-  void pre_next_partition(ha_rows rownum, Item_sum* item)
+  void pre_next_partition(ha_rows rownum)
   {
     // Save the value of the current_row
     peer_tracker.check_if_next_group();
@@ -1076,23 +1135,23 @@ class Frame_range_current_row_bottom: public Frame_cursor
     if (rownum != 0)
     {
       // Add the current row now because our cursor has already seen it
-      item->add();
+      add_value_to_items();
     }
   }
 
-  void next_partition(ha_rows rownum, Item_sum* item)
+  void next_partition(ha_rows rownum)
   {
-    walk_till_non_peer(item);
+    walk_till_non_peer();
   }
 
-  void pre_next_row(Item_sum* item)
+  void pre_next_row()
   {
     dont_move= !peer_tracker.check_if_next_group();
     if (!dont_move)
-      item->add();
+      add_value_to_items();
   }
 
-  void next_row(Item_sum* item)
+  void next_row()
   {
     // Check if our cursor is pointing at a peer of the current row.
     // If not, move forward until that becomes true
@@ -1104,11 +1163,11 @@ class Frame_range_current_row_bottom: public Frame_cursor
       */
       return;
     }
-    walk_till_non_peer(item);
+    walk_till_non_peer();
   }
 
 private:
-  void walk_till_non_peer(Item_sum* item)
+  void walk_till_non_peer()
   {
     /*
       Walk forward until we've met first row that's not a peer of the current
@@ -1118,7 +1177,7 @@ class Frame_range_current_row_bottom: public Frame_cursor
     {
       if (peer_tracker.compare_with_cache())
         break;
-      item->add();
+      add_value_to_items();
     }
   }
 };
@@ -1148,33 +1207,38 @@ class Frame_range_current_row_top : public Frame_cursor
 
   bool move;
 public:
-  void init(THD *thd, READ_RECORD *info,
-            SQL_I_List<ORDER> *partition_list,
-            SQL_I_List<ORDER> *order_list)
+  Frame_range_current_row_top(THD *thd,
+                              SQL_I_List<ORDER> *partition_list,
+                              SQL_I_List<ORDER> *order_list) :
+    bound_tracker(thd, partition_list), cursor(), peer_tracker(thd, order_list),
+    move(false)
+  {}
+
+  void init(READ_RECORD *info)
   {
-    bound_tracker.init(thd, partition_list);
+    bound_tracker.init();
 
     cursor.init(info);
-    peer_tracker.init(thd, order_list);
+    peer_tracker.init();
   }
 
-  void pre_next_partition(ha_rows rownum, Item_sum* item)
+  void pre_next_partition(ha_rows rownum)
   {
     // Fetch the value from the first row
     peer_tracker.check_if_next_group();
     cursor.move_to(rownum+1);
   }
 
-  void next_partition(ha_rows rownum, Item_sum* item) {}
+  void next_partition(ha_rows rownum) {}
 
-  void pre_next_row(Item_sum* item)
+  void pre_next_row()
   {
     // Check if the new current_row is a peer of the row that our cursor is
     // pointing to.
     move= peer_tracker.check_if_next_group();
   }
 
-  void next_row(Item_sum* item)
+  void next_row()
   {
     if (move)
     {
@@ -1187,7 +1251,7 @@ class Frame_range_current_row_top : public Frame_cursor
         // todo: need the following check ?
         if (!peer_tracker.compare_with_cache())
           return;
-        item->remove();
+        remove_value_from_items();
       }
 
       do
@@ -1196,7 +1260,7 @@ class Frame_range_current_row_top : public Frame_cursor
           return;
         if (!peer_tracker.compare_with_cache())
           return;
-        item->remove();
+        remove_value_from_items();
       }
       while (1);
     }
@@ -1214,7 +1278,14 @@ class Frame_range_current_row_top : public Frame_cursor
 class Frame_unbounded_preceding : public Frame_cursor
 {
 public:
-  void next_partition(ha_rows rownum, Item_sum* item)
+  Frame_unbounded_preceding(THD *thd,
+                            SQL_I_List<ORDER> *partition_list,
+                            SQL_I_List<ORDER> *order_list)
+  {}
+
+  void init(READ_RECORD *info) {}
+
+  void next_partition(ha_rows rownum)
   {
     /*
       UNBOUNDED PRECEDING frame end just stays on the first row.
@@ -1222,7 +1293,7 @@ class Frame_unbounded_preceding : public Frame_cursor
     */
   }
 
-  void next_row(Item_sum* item)
+  void next_row()
   {
     /* Do nothing, UNBOUNDED PRECEDING frame end doesn't move. */
   }
@@ -1239,18 +1310,22 @@ class Frame_unbounded_following : public Frame_cursor
   Partition_read_cursor cursor;
 
 public:
-  void init(THD *thd, READ_RECORD *info, SQL_I_List<ORDER> *partition_list,
-            SQL_I_List<ORDER> *order_list)
+  Frame_unbounded_following(THD *thd,
+      SQL_I_List<ORDER> *partition_list,
+      SQL_I_List<ORDER> *order_list) :
+    cursor(thd, partition_list) {}
+
+  void init(READ_RECORD *info)
   {
-    cursor.init(thd, info, partition_list);
+    cursor.init(info);
   }
 
-  void pre_next_partition(ha_rows rownum, Item_sum* item)
+  void pre_next_partition(ha_rows rownum)
   {
     cursor.on_next_partition(rownum);
   }
 
-  void next_partition(ha_rows rownum, Item_sum* item)
+  void next_partition(ha_rows rownum)
   {
     if (!rownum)
     {
@@ -1258,16 +1333,16 @@ class Frame_unbounded_following : public Frame_cursor
       if (cursor.get_next())
         return;
     }
-    item->add();
+    add_value_to_items();
 
     /* Walk to the end of the partition, updating the SUM function */
     while (!cursor.get_next())
     {
-      item->add();
+      add_value_to_items();
     }
   }
 
-  void next_row(Item_sum* item)
+  void next_row()
   {
     /* Do nothing, UNBOUNDED FOLLOWING frame end doesn't move */
   }
@@ -1277,9 +1352,12 @@ class Frame_unbounded_following : public Frame_cursor
 class Frame_unbounded_following_set_count : public Frame_unbounded_following
 {
 public:
-  // pre_next_partition is inherited
+  Frame_unbounded_following_set_count(
+      THD *thd,
+      SQL_I_List<ORDER> *partition_list, SQL_I_List<ORDER> *order_list) :
+    Frame_unbounded_following(thd, partition_list, order_list) {}
 
-  void next_partition(ha_rows rownum, Item_sum* item)
+  void next_partition(ha_rows rownum)
   {
     ha_rows num_rows_in_partition= 0;
     if (!rownum)
@@ -1292,13 +1370,16 @@ class Frame_unbounded_following_set_count : public Frame_unbounded_following
 
     /* Walk to the end of the partition, find how many rows there are. */
     while (!cursor.get_next())
-    {
       num_rows_in_partition++;
-    }
 
-    Item_sum_window_with_row_count* item_with_row_count =
-      static_cast<Item_sum_window_with_row_count *>(item);
-    item_with_row_count->set_row_count(num_rows_in_partition);
+    List_iterator_fast<Item_sum> it(sum_functions);
+    Item_sum* item;
+    while ((item= it++))
+    {
+      Item_sum_window_with_row_count* item_with_row_count =
+        static_cast<Item_sum_window_with_row_count *>(item);
+      item_with_row_count->set_row_count(num_rows_in_partition);
+    }
   }
 };
 
@@ -1324,13 +1405,12 @@ class Frame_n_rows_preceding : public Frame_cursor
     is_top_bound(is_top_bound_arg), n_rows(n_rows_arg)
   {}
 
-  void init(THD *thd, READ_RECORD *info, SQL_I_List<ORDER> *partition_list,
-            SQL_I_List<ORDER> *order_list)
+  void init(READ_RECORD *info)
   {
     cursor.init(info);
   }
 
-  void next_partition(ha_rows rownum, Item_sum* item)
+  void next_partition(ha_rows rownum)
   {
     /*
       Position our cursor to point at the first row in the new partition
@@ -1357,12 +1437,12 @@ class Frame_n_rows_preceding : public Frame_cursor
     if (n_rows_to_skip == ha_rows(-1))
     {
       cursor.get_next();
-      item->add();
+      add_value_to_items();
       n_rows_to_skip= 0;
     }
   }
 
-  void next_row(Item_sum* item)
+  void next_row()
   {
     if (n_rows_to_skip)
     {
@@ -1374,9 +1454,9 @@ class Frame_n_rows_preceding : public Frame_cursor
       return;  // this is not expected to happen.
 
     if (is_top_bound) // this is frame start endpoint
-      item->remove();
+      remove_value_from_items();
     else
-      item->add();
+      add_value_to_items();
   }
 };
 
@@ -1391,17 +1471,18 @@ class Frame_n_rows_preceding : public Frame_cursor
 class Frame_rows_current_row_bottom : public Frame_cursor
 {
 public:
-  void pre_next_partition(ha_rows rownum, Item_sum* item)
+
+  void pre_next_partition(ha_rows rownum)
   {
-    item->add();
+    add_value_to_items();
   }
-  void next_partition(ha_rows rownum, Item_sum* item) {}
-  void pre_next_row(Item_sum* item)
+  void next_partition(ha_rows rownum) {}
+  void pre_next_row()
   {
     /* Temp table's current row is current_row. Add it to the window func */
-    item->add();
+    add_value_to_items();
   }
-  void next_row(Item_sum* item) {};
+  void next_row() {};
 };
 
 
@@ -1443,20 +1524,23 @@ class Frame_n_rows_following : public Frame_cursor
   Partition_read_cursor cursor;
   bool at_partition_end;
 public:
-  Frame_n_rows_following(bool is_top_bound_arg, ha_rows n_rows_arg) :
-    is_top_bound(is_top_bound_arg), n_rows(n_rows_arg)
+  Frame_n_rows_following(THD *thd,
+            SQL_I_List<ORDER> *partition_list,
+            SQL_I_List<ORDER> *order_list,
+            bool is_top_bound_arg, ha_rows n_rows_arg) :
+    is_top_bound(is_top_bound_arg), n_rows(n_rows_arg),
+    cursor(thd, partition_list)
   {
     DBUG_ASSERT(n_rows > 0);
   }
 
-  void init(THD *thd, READ_RECORD *info, SQL_I_List<ORDER> *partition_list,
-            SQL_I_List<ORDER> *order_list)
+  void init(READ_RECORD *info)
   {
-    cursor.init(thd, info, partition_list);
+    cursor.init(info);
     at_partition_end= false;
   }
 
-  void pre_next_partition(ha_rows rownum, Item_sum* item)
+  void pre_next_partition(ha_rows rownum)
   {
     at_partition_end= false;
 
@@ -1469,39 +1553,39 @@ class Frame_n_rows_following : public Frame_cursor
 
       // Current row points at the first row in the partition
       if (is_top_bound) // this is frame top endpoint
-        item->remove();
+        remove_value_from_items();
       else
-        item->add();
+        add_value_to_items();
     }
   }
 
   /* Move our cursor to be n_rows ahead.  */
-  void next_partition(ha_rows rownum, Item_sum* item)
+  void next_partition(ha_rows rownum)
   {
     ha_rows i_end= n_rows + ((rownum==0)?1:0)- is_top_bound;
     for (ha_rows i= 0; i < i_end; i++)
     {
-      if (next_row_intern(item))
+      if (next_row_intern())
         break;
     }
   }
 
-  void next_row(Item_sum* item)
+  void next_row()
   {
     if (at_partition_end)
       return;
-    next_row_intern(item);
+    next_row_intern();
   }
 
 private:
-  bool next_row_intern(Item_sum *item)
+  bool next_row_intern()
   {
     if (!cursor.get_next())
     {
       if (is_top_bound) // this is frame start endpoint
-        item->remove();
+        remove_value_from_items();
       else
-        item->add();
+        add_value_to_items();
     }
     else
       at_partition_end= true;
@@ -1513,8 +1597,9 @@ class Frame_n_rows_following : public Frame_cursor
 /*
   Get a Frame_cursor for a frame bound. This is a "factory function".
 */
-Frame_cursor *get_frame_cursor(Window_frame *frame, bool is_top_bound)
+Frame_cursor *get_frame_cursor(THD *thd, Window_spec *spec, bool is_top_bound)
 {
+  Window_frame *frame= spec->window_frame;
   if (!frame)
   {
     /*
@@ -1536,9 +1621,13 @@ Frame_cursor *get_frame_cursor(Window_frame *frame, bool is_top_bound)
         so again the same frame bounds can be used.
     */
     if (is_top_bound)
-      return new Frame_unbounded_preceding;
+      return new Frame_unbounded_preceding(thd,
+                                           spec->partition_list,
+                                           spec->order_list);
     else
-      return new Frame_range_current_row_bottom;
+      return new Frame_range_current_row_bottom(thd,
+                                                spec->partition_list,
+                                                spec->order_list);
   }
 
   Window_frame_bound *bound= is_top_bound? frame->top_bound :
@@ -1554,9 +1643,13 @@ Frame_cursor *get_frame_cursor(Window_frame *frame, bool is_top_bound)
     {
       /* The following serve both RANGE and ROWS: */
       if (is_preceding)
-        return new Frame_unbounded_preceding;
-      else
-        return new Frame_unbounded_following;
+        return new Frame_unbounded_preceding(thd,
+                                             spec->partition_list,
+                                             spec->order_list);
+
+      return new Frame_unbounded_following(thd,
+                                           spec->partition_list,
+                                           spec->order_list);
     }
 
     if (frame->units == Window_frame::UNITS_ROWS)
@@ -1567,15 +1660,21 @@ Frame_cursor *get_frame_cursor(Window_frame *frame, bool is_top_bound)
       DBUG_ASSERT((longlong) n_rows >= 0);
       if (is_preceding)
         return new Frame_n_rows_preceding(is_top_bound, n_rows);
-      else
-        return new Frame_n_rows_following(is_top_bound, n_rows);
+
+      return new Frame_n_rows_following(
+          thd, spec->partition_list, spec->order_list,
+          is_top_bound, n_rows);
     }
     else
     {
       if (is_top_bound)
-        return new Frame_range_n_top(is_preceding, bound->offset);
-      else
-        return new Frame_range_n_bottom(is_preceding, bound->offset);
+        return new Frame_range_n_top(
+            thd, spec->partition_list, spec->order_list,
+            is_preceding, bound->offset);
+
+      return new Frame_range_n_bottom(thd,
+          spec->partition_list, spec->order_list,
+          is_preceding, bound->offset);
     }
   }
 
@@ -1585,67 +1684,154 @@ Frame_cursor *get_frame_cursor(Window_frame *frame, bool is_top_bound)
     {
       if (is_top_bound)
         return new Frame_rows_current_row_top;
-      else
-        return new Frame_rows_current_row_bottom;
+
+      return new Frame_rows_current_row_bottom;
     }
     else
     {
       if (is_top_bound)
-        return new Frame_range_current_row_top;
-      else
-        return new Frame_range_current_row_bottom;
+        return new Frame_range_current_row_top(
+            thd, spec->partition_list, spec->order_list);
+
+      return new Frame_range_current_row_bottom(
+          thd, spec->partition_list, spec->order_list);
     }
   }
   return NULL;
 }
 
-void add_extra_frame_cursors(List<Frame_cursor> *cursors,
-                             const Item_sum *window_func)
+void add_extra_frame_cursors(THD *thd, Cursor_manager *cursor_manager,
+                             Item_window_func *window_func)
 {
-  switch (window_func->sum_func())
+  Window_spec *spec= window_func->window_spec;
+  Item_sum *item_sum= window_func->window_func();
+  Frame_cursor *fc;
+  switch (item_sum->sum_func())
   {
     case Item_sum::CUME_DIST_FUNC:
-      cursors->push_back(new Frame_unbounded_preceding);
-      cursors->push_back(new Frame_range_current_row_bottom);
+      fc= new Frame_unbounded_preceding(thd,
+                                        spec->partition_list,
+                                        spec->order_list);
+      fc->add_sum_func(item_sum);
+      cursor_manager->add_cursor(fc);
+      fc= new Frame_range_current_row_bottom(thd,
+                                             spec->partition_list,
+                                             spec->order_list);
+      fc->add_sum_func(item_sum);
+      cursor_manager->add_cursor(fc);
       break;
     default:
-      cursors->push_back(new Frame_unbounded_preceding);
-      cursors->push_back(new Frame_rows_current_row_bottom);
+      fc= new Frame_unbounded_preceding(
+              thd, spec->partition_list, spec->order_list);
+      fc->add_sum_func(item_sum);
+      cursor_manager->add_cursor(fc);
+
+      fc= new Frame_rows_current_row_bottom;
+      fc->add_sum_func(item_sum);
+      cursor_manager->add_cursor(fc);
   }
 }
 
-void get_window_func_required_cursors(
-    List<Frame_cursor> *result, const Item_window_func* item_win)
+
+/*
+   Create required frame cursors for the list of window functions.
+   Register all functions to their appropriate cursors.
+   If the window functions share the same frame specification,
+   those window functions will be registered to the same cursor.
+*/
+void get_window_functions_required_cursors(
+    THD *thd,
+    List<Item_window_func>& window_functions,
+    List<Cursor_manager> *cursor_managers)
 {
-  if (item_win->requires_partition_size())
-    result->push_back(new Frame_unbounded_following_set_count);
+  List_iterator_fast<Item_window_func> it(window_functions);
+  Item_window_func* item_win_func;
+  Item_sum *sum_func;
+  while ((item_win_func= it++))
+  {
+    Cursor_manager *cursor_manager = new Cursor_manager();
+    sum_func = item_win_func->window_func();
+    Frame_cursor *fc;
+    /*
+      Some window functions require the partition size for computing values.
+      Add a cursor that retrieves it as the first one in the list if necessary.
+    */
+    if (item_win_func->requires_partition_size())
+    {
+      fc= new Frame_unbounded_following_set_count(thd,
+                item_win_func->window_spec->partition_list,
+                item_win_func->window_spec->order_list);
+      fc->add_sum_func(sum_func);
+      cursor_manager->add_cursor(fc);
+    }
 
-  /*
-    If it is not a regular window function that follows frame specifications,
-    specific cursors are required.
-  */
-  if (item_win->is_frame_prohibited())
+    /*
+      If it is not a regular window function that follows frame specifications,
+      specific cursors are required. ROW_NUM, RANK, NTILE and others follow
+      such rules. Check is_frame_prohibited check for the full list.
+    */
+    if (item_win_func->is_frame_prohibited())
+    {
+      add_extra_frame_cursors(thd, cursor_manager, item_win_func);
+      cursor_managers->push_back(cursor_manager);
+      continue;
+    }
+
+    Frame_cursor *frame_bottom= get_frame_cursor(thd,
+        item_win_func->window_spec, false);
+    Frame_cursor *frame_top= get_frame_cursor(thd,
+        item_win_func->window_spec, true);
+
+    frame_bottom->add_sum_func(sum_func);
+    frame_top->add_sum_func(sum_func);
+
+    /*
+       The order of these cursors is important. A sum function
+       must first add values (via frame_bottom) then remove them via
+       frame_top. Removing items first doesn't make sense in the case of all
+       window functions.
+    */
+    cursor_manager->add_cursor(frame_bottom);
+    cursor_manager->add_cursor(frame_top);
+    cursor_managers->push_back(cursor_manager);
+  }
+}
+
+/**
+  Helper function that takes a list of window functions and writes
+  their values in the current table record.
+*/
+static
+bool save_window_function_values(List<Item_window_func>& window_functions,
+                                 TABLE *tbl, uchar *rowid_buf)
+{
+  List_iterator_fast<Item_window_func> iter(window_functions);
+  tbl->file->ha_rnd_pos(tbl->record[0], rowid_buf);
+  store_record(tbl, record[1]);
+  while (Item_window_func *item_win= iter++)
   {
-    add_extra_frame_cursors(result, item_win->window_func());
-    return;
+    int err;
+    item_win->save_in_field(item_win->result_field, true);
+    // TODO check if this can be placed outside the loop.
+    err= tbl->file->ha_update_row(tbl->record[1], tbl->record[0]);
+    if (err && err != HA_ERR_RECORD_IS_THE_SAME)
+      return true;
   }
 
-  /* A regular window function follows the frame specification. */
-  result->push_back(get_frame_cursor(item_win->window_spec->window_frame,
-                                     false));
-  result->push_back(get_frame_cursor(item_win->window_spec->window_frame,
-                                     true));
+  return false;
 }
 
 /*
+  TODO(cvicentiu) update this comment to reflect the new execution.
+
   Streamed window function computation with window frames.
 
   We make a single pass over the ordered temp.table, but we're using three
-  cursors: 
+  cursors:
    - current row - the row that we're computing window func value for)
    - start_bound - the start of the frame
    - bottom_bound   - the end of the frame
-   
+
   All three cursors move together.
 
   @todo
@@ -1655,7 +1841,7 @@ void get_window_func_required_cursors(
   @detail
     ROWS BETWEEN 3 PRECEDING  -- frame start
               AND 3 FOLLOWING  -- frame end
-     
+
                                     /------ frame end (aka BOTTOM)
     Dataset start                   |
      --------====*=======[*]========*========-------->> dataset end
@@ -1663,7 +1849,7 @@ void get_window_func_required_cursors(
                  |         +-------- current row
                  |
                  \-------- frame start ("TOP")
-  
+
     - frame_end moves forward and adds rows into the aggregate function.
     - frame_start follows behind and removes rows from the aggregate function.
     - current_row is the row where the value of aggregate function is stored.
@@ -1672,97 +1858,90 @@ void get_window_func_required_cursors(
   condition (Others can catch up by counting rows?)
 
 */
-
-bool compute_window_func_with_frames(Item_window_func *item_win,
-                                     TABLE *tbl, READ_RECORD *info)
+bool compute_window_func(THD *thd,
+                         List<Item_window_func>& window_functions,
+                         List<Cursor_manager>& cursor_managers,
+                         TABLE *tbl,
+                         SORT_INFO *filesort_result)
 {
-  THD *thd= tbl->in_use;
-  int err= 0;
+  List_iterator_fast<Item_window_func> iter_win_funcs(window_functions);
+  List_iterator_fast<Cursor_manager> iter_cursor_managers(cursor_managers);
+  uint err;
 
-  Item_sum *sum_func= item_win->window_func();
-  /* This algorithm doesn't support DISTINCT aggregator */
-  sum_func->set_aggregator(Aggregator::SIMPLE_AGGREGATOR);
+  READ_RECORD info;
 
-  List<Frame_cursor> cursors;
-  get_window_func_required_cursors(&cursors, item_win);
+  if (init_read_record(&info, current_thd, tbl, NULL/*select*/, filesort_result,
+                       0, 1, FALSE))
+    return true;
 
-  List_iterator_fast<Frame_cursor> it(cursors);
-  Frame_cursor *c;
-  while((c= it++))
+  Cursor_manager *cursor_manager;
+  while ((cursor_manager= iter_cursor_managers++))
+    cursor_manager->initialize_cursors(&info);
+
+  /* One partition tracker for each window function. */
+  List<Group_bound_tracker> partition_trackers;
+  Item_window_func *win_func;
+  while ((win_func= iter_win_funcs++))
   {
-    c->init(thd, info, item_win->window_spec->partition_list,
-            item_win->window_spec->order_list);
+    Group_bound_tracker *tracker= new Group_bound_tracker(thd,
+                                        win_func->window_spec->partition_list);
+    // TODO(cvicentiu) This should be removed and placed in constructor.
+    tracker->init();
+    partition_trackers.push_back(tracker);
   }
 
-  bool is_error= false;
+  List_iterator_fast<Group_bound_tracker> iter_part_trackers(partition_trackers);
   ha_rows rownum= 0;
   uchar *rowid_buf= (uchar*) my_malloc(tbl->file->ref_length, MYF(0));
 
   while (true)
   {
-    /* Move the current_row */
-    if ((err=info->read_record(info)))
-    {
-      break; /* End of file */
-    }
-    bool partition_changed= item_win->check_if_partition_changed();
+    if ((err= info.read_record(&info)))
+      break; // End of file.
 
+    /* Remember current row so that we can restore it before computing
+       each window function. */
     tbl->file->position(tbl->record[0]);
     memcpy(rowid_buf, tbl->file->ref, tbl->file->ref_length);
 
-    if (partition_changed || (rownum == 0))
-    {
-      sum_func->clear();
-      /*
-        pre_XXX functions assume that tbl->record[0] contains current_row, and 
-        they may not change it.
-      */
-      it.rewind();
-      while ((c= it++))
-        c->pre_next_partition(rownum, sum_func);
-      /*
-        We move bottom_bound first, because we want rows to be added into the
-        aggregate before top_bound attempts to remove them.
-      */
-      it.rewind();
-      while ((c= it++))
-        c->next_partition(rownum, sum_func);
-    }
-    else
-    {
-      /* Again, both pre_XXX function can find current_row in tbl->record[0] */
-      it.rewind();
-      while ((c= it++))
-        c->pre_next_row(sum_func);
-
-      /* These make no assumptions about tbl->record[0] and may change it */
-      it.rewind();
-      while ((c= it++))
-        c->next_row(sum_func);
-    }
-    rownum++;
+    iter_win_funcs.rewind();
+    iter_part_trackers.rewind();
+    iter_cursor_managers.rewind();
 
-    /*
-      Frame cursors may have made tbl->record[0] to point to some record other
-      than current_row. This applies to tbl->file's internal state, too.
-      Fix this by reading the current row again.
-    */
-    tbl->file->ha_rnd_pos(tbl->record[0], rowid_buf);
-    store_record(tbl,record[1]);
-    item_win->save_in_field(item_win->result_field, true);
-    err= tbl->file->ha_update_row(tbl->record[1], tbl->record[0]);
-    if (err && err != HA_ERR_RECORD_IS_THE_SAME)
+    Group_bound_tracker *tracker;
+    while ((win_func= iter_win_funcs++) &&
+           (tracker= iter_part_trackers++) &&
+           (cursor_manager= iter_cursor_managers++))
     {
-      is_error= true;
-      break;
+      if (tracker->check_if_next_group() || (rownum == 0))
+      {
+        /* TODO(cvicentiu)
+           Clearing window functions should happen through cursors. */
+        win_func->window_func()->clear();
+        cursor_manager->notify_cursors_partition_changed(rownum);
+      }
+      else
+      {
+        cursor_manager->notify_cursors_next_row();
+      }
+      /* Return to current row after notifying cursors for each window
+         function. */
+      tbl->file->ha_rnd_pos(tbl->record[0], rowid_buf);
     }
+
+    /* We now have computed values for each window function. They can now
+       be saved in the current row. */
+    save_window_function_values(window_functions, tbl, rowid_buf);
+
+    rownum++;
   }
 
   my_free(rowid_buf);
-  cursors.delete_elements();
-  return is_error? true: false;
-}
+  partition_trackers.delete_elements();
+  end_read_record(&info);
 
+  return false;
+}
 
 /* Make a list that is a concation of two lists of ORDER elements */
 
@@ -1799,25 +1978,18 @@ static ORDER* concat_order_lists(MEM_ROOT *mem_root, ORDER *list1, ORDER *list2)
   return res;
 }
 
-
-bool Window_func_runner::setup(THD *thd)
+bool Window_func_runner::add_function_to_run(Item_window_func *win_func)
 {
-  win_func->setup_partition_border_check(thd);
+
+  Item_sum *sum_func= win_func->window_func();
+  sum_func->setup_window_func(current_thd, win_func->window_spec);
 
   Item_sum::Sumfunctype type= win_func->window_func()->sum_func();
-  switch (type) 
+  switch (type)
   {
     case Item_sum::ROW_NUMBER_FUNC:
     case Item_sum::RANK_FUNC:
     case Item_sum::DENSE_RANK_FUNC:
-    {
-      /*
-        One-pass window function computation, walk through the rows and
-        assign values.
-      */
-      compute_func= compute_window_func_values;
-      break;
-    }
     case Item_sum::COUNT_FUNC:
     case Item_sum::SUM_BIT_FUNC:
     case Item_sum::SUM_FUNC:
@@ -1825,43 +1997,49 @@ bool Window_func_runner::setup(THD *thd)
     case Item_sum::PERCENT_RANK_FUNC:
     case Item_sum::CUME_DIST_FUNC:
     case Item_sum::NTILE_FUNC:
-    {
-      /*
-        Frame-aware window function computation. It does one pass, but
-        uses three cursors -frame_start, current_row, and frame_end.
-      */
-      compute_func= compute_window_func_with_frames;
       break;
-    }
+
     default:
-      my_error(ER_NOT_SUPPORTED_YET, MYF(0), "This aggregate as window function"); 
+      my_error(ER_NOT_SUPPORTED_YET, MYF(0),
+               "This aggregate as window function");
       return true;
   }
 
-  return false;
+  return window_functions.push_back(win_func);
 }
 
 
 /*
   Compute the value of window function for all rows.
 */
-bool Window_func_runner::exec(TABLE *tbl, SORT_INFO *filesort_result)
+bool Window_func_runner::exec(THD *thd, TABLE *tbl, SORT_INFO *filesort_result)
 {
-  THD *thd= current_thd;
-  win_func->set_phase_to_computation();
-
-  /* Go through the sorted array and compute the window function */
-  READ_RECORD info;
-
-  if (init_read_record(&info, thd, tbl, NULL/*select*/, filesort_result,
-                       0, 1, FALSE))
-    return true;
+  List_iterator_fast<Item_window_func> it(window_functions);
+  Item_window_func *win_func;
+  while ((win_func= it++))
+  {
+    win_func->set_phase_to_computation();
+    // TODO(cvicentiu) Setting the aggregator should probably be done during
+    // setup of Window_funcs_sort.
+    win_func->window_func()->set_aggregator(Aggregator::SIMPLE_AGGREGATOR);
+  }
+  it.rewind();
 
-  bool is_error= compute_func(win_func, tbl, &info);
+  List<Cursor_manager> cursor_managers;
+  get_window_functions_required_cursors(thd, window_functions,
+                                        &cursor_managers);
 
-  win_func->set_phase_to_retrieval();
+  /* Go through the sorted array and compute the window function */
+  bool is_error= compute_window_func(thd,
+                                     window_functions,
+                                     cursor_managers,
+                                     tbl, filesort_result);
+  while ((win_func= it++))
+  {
+    win_func->set_phase_to_retrieval();
+  }
 
-  end_read_record(&info);
+  cursor_managers.delete_elements();
 
   return is_error;
 }
@@ -1872,21 +2050,15 @@ bool Window_funcs_sort::exec(JOIN *join)
   THD *thd= join->thd;
   JOIN_TAB *join_tab= &join->join_tab[join->top_join_tab_count];
 
+  /* Sort the table based on the most specific sorting criteria of
+     the window functions. */
   if (create_sort_index(thd, join, join_tab, filesort))
     return true;
 
   TABLE *tbl= join_tab->table;
   SORT_INFO *filesort_result= join_tab->filesort_result;
 
-  bool is_error= false;
-  List_iterator<Window_func_runner> it(runners);
-  Window_func_runner *runner;
-
-  while ((runner= it++))
-  {
-    if ((is_error= runner->exec(tbl, filesort_result)))
-      break;
-  }
+  bool is_error= runner.exec(thd, tbl, filesort_result);
 
   delete join_tab->filesort_result;
   join_tab->filesort_result= NULL;
@@ -1894,30 +2066,32 @@ bool Window_funcs_sort::exec(JOIN *join)
 }
 
 
-bool Window_funcs_sort::setup(THD *thd, SQL_SELECT *sel, 
-                             List_iterator<Item_window_func> &it)
+bool Window_funcs_sort::setup(THD *thd, SQL_SELECT *sel,
+                              List_iterator<Item_window_func> &it)
 {
   Item_window_func *win_func= it.peek();
   Item_window_func *prev_win_func;
 
+  /* The iterator should point to a valid function at the start of execution. */
+  DBUG_ASSERT(win_func);
   do
   {
-    Window_func_runner *runner;
-    if (!(runner= new Window_func_runner(win_func)) ||
-        runner->setup(thd))
-    {
+    if (runner.add_function_to_run(win_func))
       return true;
-    }
-    runners.push_back(runner);
     it++;
     prev_win_func= win_func;
-  } while ((win_func= it.peek()) && !(win_func->marker & SORTORDER_CHANGE_FLAG));
-  
+  } while ((win_func= it.peek()) &&
+           !(win_func->marker & SORTORDER_CHANGE_FLAG));
+
   /*
     The sort criteria must be taken from the last win_func in the group of
-    adjacent win_funcs that do not have SORTORDER_CHANGE_FLAG.
+    adjacent win_funcs that do not have SORTORDER_CHANGE_FLAG. This is
+    because the sort order must be the most specific sorting criteria defined
+    within the window function group. This ensures that we sort the table
+    in a way that the result is valid for all window functions belonging to
+    this Window_funcs_sort.
   */
-  Window_spec *spec = prev_win_func->window_spec;
+  Window_spec *spec= prev_win_func->window_spec;
 
   ORDER* sort_order= concat_order_lists(thd->mem_root, 
                                         spec->partition_list->first,
@@ -1932,8 +2106,8 @@ bool Window_funcs_sort::setup(THD *thd, SQL_SELECT *sel,
 
 
 bool Window_funcs_computation::setup(THD *thd,
-                                          List<Item_window_func> *window_funcs,
-                                          JOIN_TAB *tab)
+                                     List<Item_window_func> *window_funcs,
+                                     JOIN_TAB *tab)
 {
   order_window_funcs_by_window_specs(window_funcs);
 
diff --git a/sql/sql_window.h b/sql/sql_window.h
index 54e39d8..c384724 100644
--- a/sql/sql_window.h
+++ b/sql/sql_window.h
@@ -154,9 +154,7 @@ int setup_windows(THD *thd, Ref_ptr_array ref_pointer_array, TABLE_LIST *tables,
 // Classes that make window functions computation a part of SELECT's query plan
 //////////////////////////////////////////////////////////////////////////////
 
-typedef bool (*window_compute_func_t)(Item_window_func *item_win,
-                                      TABLE *tbl, READ_RECORD *info);
-
+class Frame_cursor;
 /*
   This handles computation of one window function.
 
@@ -165,21 +163,17 @@ typedef bool (*window_compute_func_t)(Item_window_func *item_win,
 
 class Window_func_runner : public Sql_alloc
 {
-  Item_window_func *win_func;
-
-  /* The function to use for computation*/
-  window_compute_func_t compute_func;
-
 public:
-  Window_func_runner(Item_window_func *win_func_arg) :
-    win_func(win_func_arg)
-  {}
+  /* Add the function to be computed during the execution pass  */
+  bool add_function_to_run(Item_window_func *win_func);
 
-  // Set things up. Create filesort structures, etc
-  bool setup(THD *thd);
- 
-  // This sorts and runs the window function.
-  bool exec(TABLE *tbl, SORT_INFO *filesort_result);
+  /* Compute and fill the fields in the table. */
+  bool exec(THD *thd, TABLE *tbl, SORT_INFO *filesort_result);
+
+private:
+  /* A list of window functions for which this Window_func_runner will compute
+     values during the execution phase. */
+  List<Item_window_func> window_functions;
 };
 
 
@@ -191,21 +185,24 @@ class Window_func_runner : public Sql_alloc
 
 class Window_funcs_sort : public Sql_alloc
 {
-  List<Window_func_runner> runners;
-
-  /* Window functions can be computed over this sorting */
-  Filesort *filesort;
 public:
   bool setup(THD *thd, SQL_SELECT *sel, List_iterator<Item_window_func> &it);
   bool exec(JOIN *join);
   void cleanup() { delete filesort; }
 
   friend class Window_funcs_computation;
+
+private:
+  Window_func_runner runner;
+
+  /* Window functions can be computed over this sorting */
+  Filesort *filesort;
 };
 
 
 struct st_join_table;
 class Explain_aggr_window_funcs;
+
 /*
   This is a "window function computation phase": a single object of this class
   takes care of computing all window functions in a SELECT.


More information about the commits mailing list