[Commits] 78d3bd5: MDEV-10059: Compute window functions with same sorting criteria simultaneously

Vicențiu Ciorbaru vicentiu at mariadb.org
Tue Sep 6 19:56:15 EEST 2016


Hi Sergey,

This is the patch that performs the refactoring for computing multiple
window functions in a single scan.

I know the patch is large, the main points changed are the following:

1. Frame_cursors are now initialized at construction. The init function
just sets the READ_RECORD.
2. We no longer have a partition tracker within Item_window_func. Instead
we create it during the values computation stage.
3. Cursor_manager is a new class that owns cursors belonging to 1 window
function. Through this class (and only this class) we advance the cursors.
4. Window_func_runner deals with computing all the window function values
belonging to a Window_funcs_sort.

I recommend you start with Window_func_runner::exec as that's where
everything begins to branch out.

Vicentiu


On Tue, 6 Sep 2016 at 19:51 vicentiu <vicentiu at mariadb.org> wrote:

> revision-id: 78d3bd5e0b8827d21c4fe907aa56dcda85d7dd89
> (mariadb-10.2.1-21-g78d3bd5)
> parent(s): 31a8cf54c8a7913338480a0571feaf32143b5f64
> author: Vicențiu Ciorbaru
> committer: Vicențiu Ciorbaru
> timestamp: 2016-09-06 19:50:50 +0300
> message:
>
> MDEV-10059: Compute window functions with same sorting criteria
> simultaneously
>
> Perform only one table scan for each window function present. We do this
> by keeping keeping cursors for each window function frame bound and
> running them for each function for every row.
>
> ---
>  sql/item_windowfunc.cc |  43 +--
>  sql/item_windowfunc.h  |  90 ++---
>  sql/sql_window.cc      | 884
> +++++++++++++++++++++++++++++--------------------
>  sql/sql_window.h       |  37 +--
>  4 files changed, 607 insertions(+), 447 deletions(-)
>
> diff --git a/sql/item_windowfunc.cc b/sql/item_windowfunc.cc
> index d157d54..c8ea979 100644
> --- a/sql/item_windowfunc.cc
> +++ b/sql/item_windowfunc.cc
> @@ -41,7 +41,7 @@ Item_window_func::resolve_window_name(THD *thd)
>      return true;
>    }
>
> -  return false;
> +  return false;
>  }
>
>
> @@ -154,7 +154,7 @@ void Item_window_func::split_sum_func(THD *thd,
> Ref_ptr_array ref_pointer_array,
>
>
>  /*
> -  This must be called before advance_window() can be called.
> +  This must be called before attempting to compute the window function
> values.
>
>    @detail
>      If we attempt to do it in fix_fields(), partition_fields will refer
> @@ -162,30 +162,25 @@ void Item_window_func::split_sum_func(THD *thd,
> Ref_ptr_array ref_pointer_array,
>      We need it to refer to temp.table columns.
>  */
>
> -void Item_window_func::setup_partition_border_check(THD *thd)
> -{
> -  partition_tracker.init(thd, window_spec->partition_list);
> -  window_func()->setup_window_func(thd, window_spec);
> -}
> -
> -
>  void Item_sum_rank::setup_window_func(THD *thd, Window_spec *window_spec)
>  {
>    /* TODO: move this into Item_window_func? */
> -  peer_tracker.init(thd, window_spec->order_list);
> +  peer_tracker = new Group_bound_tracker(thd, window_spec->order_list);
> +  peer_tracker->init();
>    clear();
>  }
>
>  void Item_sum_dense_rank::setup_window_func(THD *thd, Window_spec
> *window_spec)
>  {
>    /* TODO: consider moving this && Item_sum_rank's implementation */
> -  peer_tracker.init(thd, window_spec->order_list);
> +  peer_tracker = new Group_bound_tracker(thd, window_spec->order_list);
> +  peer_tracker->init();
>    clear();
>  }
>
>  bool Item_sum_dense_rank::add()
>  {
> -  if (peer_tracker.check_if_next_group() || first_add)
> +  if (peer_tracker->check_if_next_group() || first_add)
>    {
>      first_add= false;
>      dense_rank++;
> @@ -198,7 +193,7 @@ bool Item_sum_dense_rank::add()
>  bool Item_sum_rank::add()
>  {
>    row_number++;
> -  if (peer_tracker.check_if_next_group())
> +  if (peer_tracker->check_if_next_group())
>    {
>      /* Row value changed */
>      cur_rank= row_number;
> @@ -206,25 +201,10 @@ bool Item_sum_rank::add()
>    return false;
>  }
>
> -bool Item_window_func::check_if_partition_changed()
> -{
> -  return partition_tracker.check_if_next_group();
> -}
> -
> -void Item_window_func::advance_window()
> -{
> -  if (check_if_partition_changed())
> -  {
> -    /* Next partition */
> -    window_func()->clear();
> -  }
> -  window_func()->add();
> -}
> -
>  bool Item_sum_percent_rank::add()
>  {
>    row_number++;
> -  if (peer_tracker.check_if_next_group())
> +  if (peer_tracker->check_if_next_group())
>    {
>      /* Row value changed. */
>      cur_rank= row_number;
> @@ -235,8 +215,7 @@ bool Item_sum_percent_rank::add()
>  void Item_sum_percent_rank::setup_window_func(THD *thd, Window_spec
> *window_spec)
>  {
>    /* TODO: move this into Item_window_func? */
> -  peer_tracker.init(thd, window_spec->order_list);
> +  peer_tracker = new Group_bound_tracker(thd, window_spec->order_list);
> +  peer_tracker->init();
>    clear();
>  }
> -
> -
> diff --git a/sql/item_windowfunc.h b/sql/item_windowfunc.h
> index 9d2fa13..5b352a4 100644
> --- a/sql/item_windowfunc.h
> +++ b/sql/item_windowfunc.h
> @@ -12,25 +12,19 @@ int test_if_group_changed(List<Cached_item> &list);
>  /* A wrapper around test_if_group_changed */
>  class Group_bound_tracker
>  {
> -  List<Cached_item> group_fields;
> -  /*
> -    During the first check_if_next_group, the list of cached_items is not
> -    initialized. The compare function will return that the items match if
> -    the field's value is the same as the Cached_item's default value (0).
> -    This flag makes sure that we always return true during the first
> check.
> -
> -    XXX This is better to be implemented within test_if_group_changed, but
> -    since it is used in other parts of the codebase, we keep it here for
> now.
> -  */
> -   bool first_check;
>  public:
> -  void init(THD *thd, SQL_I_List<ORDER> *list)
> +
> +  Group_bound_tracker(THD *thd, SQL_I_List<ORDER> *list)
>    {
>      for (ORDER *curr = list->first; curr; curr=curr->next)
>      {
>        Cached_item *tmp= new_Cached_item(thd, curr->item[0], TRUE);
>        group_fields.push_back(tmp);
>      }
> +  }
> +
> +  void init()
> +  {
>      first_check= true;
>    }
>
> @@ -76,6 +70,19 @@ class Group_bound_tracker
>      }
>      return 0;
>    }
> +
> +private:
> +  List<Cached_item> group_fields;
> +  /*
> +    During the first check_if_next_group, the list of cached_items is not
> +    initialized. The compare function will return that the items match if
> +    the field's value is the same as the Cached_item's default value (0).
> +    This flag makes sure that we always return true during the first
> check.
> +
> +    XXX This is better to be implemented within test_if_group_changed, but
> +    since it is used in other parts of the codebase, we keep it here for
> now.
> +  */
> +   bool first_check;
>  };
>
>  /*
> @@ -92,19 +99,22 @@ class Item_sum_row_number: public Item_sum_int
>    longlong count;
>
>  public:
> +
> +  Item_sum_row_number(THD *thd)
> +    : Item_sum_int(thd),  count(0) {}
> +
>    void clear()
>    {
>      count= 0;
>    }
> -  bool add()
> +
> +  bool add()
>    {
>      count++;
> -    return false;
> +    return false;
>    }
> -  void update_field() {}
>
> -  Item_sum_row_number(THD *thd)
> -    : Item_sum_int(thd),  count(0) {}
> +  void update_field() {}
>
>    enum Sumfunctype sum_func() const
>    {
> @@ -119,7 +129,6 @@ class Item_sum_row_number: public Item_sum_int
>    {
>      return "row_number(";
>    }
> -
>  };
>
>
> @@ -145,9 +154,12 @@ class Item_sum_rank: public Item_sum_int
>  protected:
>    longlong row_number; // just ROW_NUMBER()
>    longlong cur_rank;   // current value
> -
> -  Group_bound_tracker peer_tracker;
> +
> +  Group_bound_tracker *peer_tracker;
>  public:
> +
> +  Item_sum_rank(THD *thd) : Item_sum_int(thd), peer_tracker(NULL) {}
> +
>    void clear()
>    {
>      /* This is called on partition start */
> @@ -168,10 +180,6 @@ class Item_sum_rank: public Item_sum_int
>      TODO: ^^ what does this do ? It is not called ever?
>    */
>
> -public:
> -  Item_sum_rank(THD *thd)
> -    : Item_sum_int(thd) {}
> -
>    enum Sumfunctype sum_func () const
>    {
>      return RANK_FUNC;
> @@ -183,9 +191,12 @@ class Item_sum_rank: public Item_sum_int
>    }
>
>    void setup_window_func(THD *thd, Window_spec *window_spec);
> +
>    void cleanup()
>    {
> -    peer_tracker.cleanup();
> +    if (peer_tracker)
> +      peer_tracker->cleanup();
> +    delete peer_tracker;
>      Item_sum_int::cleanup();
>    }
>  };
> @@ -214,7 +225,7 @@ class Item_sum_dense_rank: public Item_sum_int
>  {
>    longlong dense_rank;
>    bool first_add;
> -  Group_bound_tracker peer_tracker;
> +  Group_bound_tracker *peer_tracker;
>   public:
>    /*
>       XXX(cvicentiu) This class could potentially be implemented in the
> rank
> @@ -233,7 +244,7 @@ class Item_sum_dense_rank: public Item_sum_int
>    }
>
>    Item_sum_dense_rank(THD *thd)
> -    : Item_sum_int(thd), dense_rank(0), first_add(true) {}
> +    : Item_sum_int(thd), dense_rank(0), first_add(true),
> peer_tracker(NULL) {}
>    enum Sumfunctype sum_func () const
>    {
>      return DENSE_RANK_FUNC;
> @@ -248,7 +259,11 @@ class Item_sum_dense_rank: public Item_sum_int
>
>    void cleanup()
>    {
> -    peer_tracker.cleanup();
> +    if (peer_tracker)
> +    {
> +      peer_tracker->cleanup();
> +      delete peer_tracker;
> +    }
>      Item_sum_int::cleanup();
>    }
>  };
> @@ -289,7 +304,7 @@ class Item_sum_percent_rank: public
> Item_sum_window_with_row_count
>  {
>   public:
>    Item_sum_percent_rank(THD *thd)
> -    : Item_sum_window_with_row_count(thd), cur_rank(1) {}
> +    : Item_sum_window_with_row_count(thd), cur_rank(1),
> peer_tracker(NULL) {}
>
>    longlong val_int()
>    {
> @@ -347,11 +362,15 @@ class Item_sum_percent_rank: public
> Item_sum_window_with_row_count
>    longlong cur_rank;   // Current rank of the current row.
>    longlong row_number; // Value if this were ROW_NUMBER() function.
>
> -  Group_bound_tracker peer_tracker;
> +  Group_bound_tracker *peer_tracker;
>
>    void cleanup()
>    {
> -    peer_tracker.cleanup();
> +    if (peer_tracker)
> +    {
> +      peer_tracker->cleanup();
> +      delete peer_tracker;
> +    }
>      Item_sum_num::cleanup();
>    }
>  };
> @@ -502,12 +521,6 @@ class Item_window_func : public Item_func_or_sum
>  public:
>    Window_spec *window_spec;
>
> -  /*
> -    This stores the data about the partition we're currently in.
> -    advance_window() uses this to tell when we've left one partition and
> -    entered another
> -  */
> -  Group_bound_tracker partition_tracker;
>  public:
>    Item_window_func(THD *thd, Item_sum *win_func, LEX_STRING *win_name)
>      : Item_func_or_sum(thd, (Item *) win_func),
> @@ -600,9 +613,6 @@ class Item_window_func : public Item_func_or_sum
>    */
>    void setup_partition_border_check(THD *thd);
>
> -  void advance_window();
> -  bool check_if_partition_changed();
> -
>    enum_field_types field_type() const
>    {
>      return ((Item_sum *) args[0])->field_type();
> diff --git a/sql/sql_window.cc b/sql/sql_window.cc
> index 4c5ef53..a862821 100644
> --- a/sql/sql_window.cc
> +++ b/sql/sql_window.cc
> @@ -37,12 +37,12 @@
> Window_spec::check_window_names(List_iterator_fast<Window_spec> &it)
>        if (win_spec->order_list->elements && order_list->elements)
>        {
>          my_error(ER_ORDER_LIST_IN_REFERENCING_WINDOW_SPEC, MYF(0),
> ref_name);
> -        return true;
> +        return true;
>        }
> -      if (win_spec->window_frame)
> +      if (win_spec->window_frame)
>        {
>          my_error(ER_WINDOW_FRAME_IN_REFERENCED_WINDOW_SPEC, MYF(0),
> ref_name);
> -        return true;
> +        return true;
>        }
>        referenced_win_spec= win_spec;
>        if (partition_list->elements == 0)
> @@ -54,7 +54,7 @@
> Window_spec::check_window_names(List_iterator_fast<Window_spec> &it)
>    if (ref_name && !referenced_win_spec)
>    {
>      my_error(ER_WRONG_WINDOW_SPEC_NAME, MYF(0), ref_name);
> -    return true;
> +    return true;
>    }
>    window_names_are_checked= true;
>    return false;
> @@ -73,7 +73,7 @@ Window_frame::check_frame_bounds()
>         top_bound->precedence_type == Window_frame_bound::FOLLOWING))
>    {
>      my_error(ER_BAD_COMBINATION_OF_WINDOW_FRAME_BOUND_SPECS, MYF(0));
> -    return true;
> +    return true;
>    }
>
>    return false;
> @@ -86,7 +86,7 @@ Window_frame::check_frame_bounds()
>
>  int
>  setup_windows(THD *thd, Ref_ptr_array ref_pointer_array, TABLE_LIST
> *tables,
> -             List<Item> &fields, List<Item> &all_fields,
> +              List<Item> &fields, List<Item> &all_fields,
>                List<Window_spec> &win_specs, List<Item_window_func>
> &win_funcs)
>  {
>    Window_spec *win_spec;
> @@ -116,7 +116,7 @@ setup_windows(THD *thd, Ref_ptr_array
> ref_pointer_array, TABLE_LIST *tables,
>    it.rewind();
>
>    List_iterator_fast<Window_spec> itp(win_specs);
> -
> +
>    while ((win_spec= it++))
>    {
>      bool hidden_group_fields;
> @@ -131,7 +131,7 @@ setup_windows(THD *thd, Ref_ptr_array
> ref_pointer_array, TABLE_LIST *tables,
>      {
>        DBUG_RETURN(1);
>      }
> -
> +
>      if (win_spec->window_frame &&
>          win_spec->window_frame->exclusion != Window_frame::EXCL_NONE)
>      {
> @@ -188,7 +188,7 @@ setup_windows(THD *thd, Ref_ptr_array
> ref_pointer_array, TABLE_LIST *tables,
>          }
>        }
>      }
> -
> +
>      /* "ROWS PRECEDING|FOLLOWING $n" must have a numeric $n */
>      if (win_spec->window_frame &&
>          win_spec->window_frame->units == Window_frame::UNITS_ROWS)
> @@ -219,7 +219,7 @@ setup_windows(THD *thd, Ref_ptr_array
> ref_pointer_array, TABLE_LIST *tables,
>    {
>      win_func_item->update_used_tables();
>    }
> -
> +
>    DBUG_RETURN(0);
>  }
>
> @@ -445,7 +445,7 @@ typedef int (*Item_window_func_cmp)(Item_window_func
> *f1,
>    @brief
>      Sort window functions so that those that can be computed together are
>      adjacent.
> -
> +
>    @detail
>      Sort window functions by their
>       - required sorting order,
> @@ -498,48 +498,15 @@ void
> order_window_funcs_by_window_specs(List<Item_window_func> *win_func_list)
>      }
>      else if (win_spec_prev->window_frame != win_spec_curr->window_frame)
>        curr->marker|= FRAME_CHANGE_FLAG;
> -
> -    prev= curr;
> -  }
> +
> +    prev= curr;
> +  }
>  }
>
>
>
>  /////////////////////////////////////////////////////////////////////////////
>
>
> -/*
> -  Do a pass over sorted table and compute window function values.
> -
> -  This function is for handling window functions that can be computed on
> the
> -  fly. Examples are RANK() and ROW_NUMBER().
> -*/
> -bool compute_window_func_values(Item_window_func *item_win,
> -                                TABLE *tbl, READ_RECORD *info)
> -{
> -  int err;
> -  while (!(err=info->read_record(info)))
> -  {
> -    store_record(tbl,record[1]);
> -
> -    /*
> -      This will cause window function to compute its value for the
> -      current row :
> -    */
> -    item_win->advance_window();
> -
> -    /*
> -      Put the new value into temptable's field
> -      TODO: Should this use item_win->update_field() call?
> -      Regular aggegate function implementations seem to implement it.
> -    */
> -    item_win->save_in_field(item_win->result_field, true);
> -    err= tbl->file->ha_update_row(tbl->record[1], tbl->record[0]);
> -    if (err && err != HA_ERR_RECORD_IS_THE_SAME)
> -      return true;
> -  }
> -  return false;
> -}
> -
>
>  /////////////////////////////////////////////////////////////////////////////
>  // Window Frames support
>
>  /////////////////////////////////////////////////////////////////////////////
> @@ -571,11 +538,6 @@ bool clone_read_record(const READ_RECORD *src,
> READ_RECORD *dst)
>
>  class Rowid_seq_cursor
>  {
> -  uchar *cache_start;
> -  uchar *cache_pos;
> -  uchar *cache_end;
> -  uint ref_length;
> -
>  public:
>    virtual ~Rowid_seq_cursor() {}
>
> @@ -595,17 +557,17 @@ class Rowid_seq_cursor
>      cache_pos+= ref_length;
>      return 0;
>    }
> -
> +
>    ha_rows get_rownum()
>    {
>      return (cache_pos - cache_start) / ref_length;
>    }
>
> -  // will be called by ROWS n FOLLOWING to catch up.
>    void move_to(ha_rows row_number)
>    {
>      cache_pos= cache_start + row_number * ref_length;
>    }
> +
>  protected:
>    bool at_eof() { return (cache_pos == cache_end); }
>
> @@ -618,6 +580,12 @@ class Rowid_seq_cursor
>    }
>
>    uchar *get_curr_rowid() { return cache_pos; }
> +
> +private:
> +  uchar *cache_start;
> +  uchar *cache_pos;
> +  uchar *cache_end;
> +  uint ref_length;
>  };
>
>
> @@ -627,11 +595,6 @@ class Rowid_seq_cursor
>
>  class Table_read_cursor : public Rowid_seq_cursor
>  {
> -  /*
> -    Note: we don't own *read_record, somebody else is using it.
> -    We only look at the constant part of it, e.g. table, record buffer,
> etc.
> -  */
> -  READ_RECORD *read_record;
>  public:
>    virtual ~Table_read_cursor() {}
>
> @@ -668,7 +631,14 @@ class Table_read_cursor : public Rowid_seq_cursor
>      return false; // didn't restore
>    }
>
> -  // todo: should move_to() also read row here?
> +private:
> +  /*
> +    Note: we don't own *read_record, somebody else is using it.
> +    We only look at the constant part of it, e.g. table, record buffer,
> etc.
> +  */
> +  READ_RECORD *read_record;
> +
> +  // TODO(spetrunia): should move_to() also read row here?
>  };
>
>
> @@ -679,14 +649,14 @@ class Table_read_cursor : public Rowid_seq_cursor
>
>  class Partition_read_cursor
>  {
> -  Table_read_cursor tbl_cursor;
> -  Group_bound_tracker bound_tracker;
> -  bool end_of_partition;
>  public:
> -  void init(THD *thd, READ_RECORD *info, SQL_I_List<ORDER>
> *partition_list)
> +  Partition_read_cursor(THD *thd, SQL_I_List<ORDER> *partition_list) :
> +    bound_tracker(thd, partition_list) {}
> +
> +  void init(READ_RECORD *info)
>    {
>      tbl_cursor.init(info);
> -    bound_tracker.init(thd, partition_list);
> +    bound_tracker.init();
>      end_of_partition= false;
>    }
>
> @@ -704,7 +674,7 @@ class Partition_read_cursor
>    }
>
>    /*
> -    Moves to a new row. The row is assumed to be within the current
> partition
> +    Moves to a new row. The row is assumed to be within the current
> partition.
>    */
>    void move_to(ha_rows rownum) { tbl_cursor.move_to(rownum); }
>
> @@ -731,14 +701,18 @@ class Partition_read_cursor
>    {
>      return tbl_cursor.restore_last_row();
>    }
> +
> +private:
> +  Table_read_cursor tbl_cursor;
> +  Group_bound_tracker bound_tracker;
> +  bool end_of_partition;
>  };
>
>
>  /////////////////////////////////////////////////////////////////////////////
>
> -
>  /*
>    Window frame bound cursor. Abstract interface.
> -
> +
>    @detail
>      The cursor moves within the partition that the current row is in.
>      It may be ahead or behind the current row.
> @@ -775,11 +749,12 @@ class Partition_read_cursor
>  class Frame_cursor : public Sql_alloc
>  {
>  public:
> -  virtual void init(THD *thd, READ_RECORD *info,
> -                    SQL_I_List<ORDER> *partition_list,
> -                    SQL_I_List<ORDER> *order_list)
> -  {}
> +  virtual void init(READ_RECORD *info) {};
>
> +  bool add_sum_func(Item_sum* item)
> +  {
> +    return sum_functions.push_back(item);
> +  }
>    /*
>      Current row has moved to the next partition and is positioned on the
> first
>      row there. Position the frame bound accordingly.
> @@ -796,20 +771,95 @@ class Frame_cursor : public Sql_alloc
>        - The callee may move tbl->file and tbl->record[0] to point to some
> other
>          row.
>    */
> -  virtual void pre_next_partition(ha_rows rownum, Item_sum* item){};
> -  virtual void next_partition(ha_rows rownum, Item_sum* item)=0;
> -
> +  virtual void pre_next_partition(ha_rows rownum) {};
> +  virtual void next_partition(ha_rows rownum)=0;
> +
>    /*
>      The current row has moved one row forward.
>      Move this frame bound accordingly, and update the value of aggregate
>      function as necessary.
>    */
> -  virtual void pre_next_row(Item_sum* item){};
> -  virtual void next_row(Item_sum* item)=0;
> -
> -  virtual ~Frame_cursor(){}
> +  virtual void pre_next_row() {};
> +  virtual void next_row()=0;
> +
> +  virtual ~Frame_cursor() {}
> +
> +protected:
> +  inline void add_value_to_items()
> +  {
> +    List_iterator_fast<Item_sum> it(sum_functions);
> +    Item_sum *item_sum;
> +    while ((item_sum= it++))
> +    {
> +      item_sum->add();
> +    }
> +  }
> +  inline void remove_value_from_items()
> +  {
> +    List_iterator_fast<Item_sum> it(sum_functions);
> +    Item_sum *item_sum;
> +    while ((item_sum= it++))
> +    {
> +      item_sum->remove();
> +    }
> +  }
> +
> +  /* Sum functions that this cursor handles. */
> +  List<Item_sum> sum_functions;
> +};
> +
> +/*
> +  A class that owns cursor objects associated with a specific window
> function.
> +*/
> +class Cursor_manager
> +{
> +public:
> +  bool add_cursor(Frame_cursor *cursor)
> +  {
> +    return cursors.push_back(cursor);
> +  }
> +
> +  void initialize_cursors(READ_RECORD *info)
> +  {
> +    List_iterator_fast<Frame_cursor> iter(cursors);
> +    Frame_cursor *fc;
> +    while ((fc= iter++))
> +      fc->init(info);
> +  }
> +
> +  void notify_cursors_partition_changed(ha_rows rownum)
> +  {
> +    List_iterator_fast<Frame_cursor> iter(cursors);
> +    Frame_cursor *cursor;
> +    while ((cursor= iter++))
> +      cursor->pre_next_partition(rownum);
> +
> +    iter.rewind();
> +    while ((cursor= iter++))
> +      cursor->next_partition(rownum);
> +  }
> +
> +  void notify_cursors_next_row()
> +  {
> +    List_iterator_fast<Frame_cursor> iter(cursors);
> +    Frame_cursor *cursor;
> +    while ((cursor= iter++))
> +      cursor->pre_next_row();
> +
> +    iter.rewind();
> +    while ((cursor= iter++))
> +      cursor->next_row();
> +  }
> +
> +  ~Cursor_manager() { cursors.delete_elements(); }
> +
> +private:
> +  /* List of the cursors that this manager owns. */
> +  List<Frame_cursor> cursors;
>  };
>
> +
> +
>
>  //////////////////////////////////////////////////////////////////////////////
>  // RANGE-type frames
>
>  //////////////////////////////////////////////////////////////////////////////
> @@ -841,16 +891,12 @@ class Frame_range_n_top : public Frame_cursor
>    */
>    int order_direction;
>  public:
> -  Frame_range_n_top(bool is_preceding_arg, Item *n_val_arg) :
> +  Frame_range_n_top(THD *thd,
> +                    SQL_I_List<ORDER> *partition_list,
> +                    SQL_I_List<ORDER> *order_list,
> +                    bool is_preceding_arg, Item *n_val_arg) :
>      n_val(n_val_arg), item_add(NULL), is_preceding(is_preceding_arg)
> -  {}
> -
> -  void init(THD *thd, READ_RECORD *info,
> -            SQL_I_List<ORDER> *partition_list,
> -            SQL_I_List<ORDER> *order_list)
>    {
> -    cursor.init(info);
> -
>      DBUG_ASSERT(order_list->elements == 1);
>      Item *src_expr= order_list->first->item[0];
>      if (order_list->first->direction == ORDER::ORDER_ASC)
> @@ -872,24 +918,30 @@ class Frame_range_n_top : public Frame_cursor
>      item_add->fix_fields(thd, &item_add);
>    }
>
> -  void pre_next_partition(ha_rows rownum, Item_sum* item)
> +  void init(READ_RECORD *info)
> +  {
> +    cursor.init(info);
> +
> +  }
> +
> +  void pre_next_partition(ha_rows rownum)
>    {
>      // Save the value of FUNC(current_row)
>      range_expr->fetch_value_from(item_add);
>    }
>
> -  void next_partition(ha_rows rownum, Item_sum* item)
> +  void next_partition(ha_rows rownum)
>    {
>      cursor.move_to(rownum);
> -    walk_till_non_peer(item);
> +    walk_till_non_peer();
>    }
>
> -  void pre_next_row(Item_sum* item)
> +  void pre_next_row()
>    {
>      range_expr->fetch_value_from(item_add);
>    }
>
> -  void next_row(Item_sum* item)
> +  void next_row()
>    {
>      /*
>        Ok, our cursor is at the first row R where
> @@ -900,19 +952,19 @@ class Frame_range_n_top : public Frame_cursor
>      {
>        if (order_direction * range_expr->cmp_read_only() <= 0)
>          return;
> -      item->remove();
> +      remove_value_from_items();
>      }
> -    walk_till_non_peer(item);
> +    walk_till_non_peer();
>    }
>
>  private:
> -  void walk_till_non_peer(Item_sum* item)
> +  void walk_till_non_peer()
>    {
>      while (!cursor.get_next())
>      {
>        if (order_direction * range_expr->cmp_read_only() <= 0)
>          break;
> -      item->remove();
> +      remove_value_from_items();
>      }
>    }
>  };
> @@ -950,16 +1002,13 @@ class Frame_range_n_bottom: public Frame_cursor
>    */
>    int order_direction;
>  public:
> -  Frame_range_n_bottom(bool is_preceding_arg, Item *n_val_arg) :
> -    n_val(n_val_arg), item_add(NULL), is_preceding(is_preceding_arg)
> -  {}
> -
> -  void init(THD *thd, READ_RECORD *info,
> -            SQL_I_List<ORDER> *partition_list,
> -            SQL_I_List<ORDER> *order_list)
> +  Frame_range_n_bottom(THD *thd,
> +                       SQL_I_List<ORDER> *partition_list,
> +                       SQL_I_List<ORDER> *order_list,
> +                       bool is_preceding_arg, Item *n_val_arg) :
> +    cursor(thd, partition_list), n_val(n_val_arg), item_add(NULL),
> +    is_preceding(is_preceding_arg)
>    {
> -    cursor.init(thd, info, partition_list);
> -
>      DBUG_ASSERT(order_list->elements == 1);
>      Item *src_expr= order_list->first->item[0];
>
> @@ -982,7 +1031,12 @@ class Frame_range_n_bottom: public Frame_cursor
>      item_add->fix_fields(thd, &item_add);
>    }
>
> -  void pre_next_partition(ha_rows rownum, Item_sum* item)
> +  void init(READ_RECORD *info)
> +  {
> +    cursor.init(info);
> +  }
> +
> +  void pre_next_partition(ha_rows rownum)
>    {
>      // Save the value of FUNC(current_row)
>      range_expr->fetch_value_from(item_add);
> @@ -991,20 +1045,20 @@ class Frame_range_n_bottom: public Frame_cursor
>      end_of_partition= false;
>    }
>
> -  void next_partition(ha_rows rownum, Item_sum* item)
> +  void next_partition(ha_rows rownum)
>    {
>      cursor.move_to(rownum);
> -    walk_till_non_peer(item);
> +    walk_till_non_peer();
>    }
>
> -  void pre_next_row(Item_sum* item)
> +  void pre_next_row()
>    {
>      if (end_of_partition)
>        return;
>      range_expr->fetch_value_from(item_add);
>    }
>
> -  void next_row(Item_sum* item)
> +  void next_row()
>    {
>      if (end_of_partition)
>        return;
> @@ -1017,20 +1071,20 @@ class Frame_range_n_bottom: public Frame_cursor
>      {
>        if (order_direction * range_expr->cmp_read_only() < 0)
>          return;
> -      item->add();
> +      add_value_to_items();
>      }
> -    walk_till_non_peer(item);
> +    walk_till_non_peer();
>    }
>
>  private:
> -  void walk_till_non_peer(Item_sum* item)
> +  void walk_till_non_peer()
>    {
>      int res;
>      while (!(res= cursor.get_next()))
>      {
>        if (order_direction * range_expr->cmp_read_only() < 0)
>          break;
> -      item->add();
> +      add_value_to_items();
>      }
>      if (res)
>        end_of_partition= true;
> @@ -1043,11 +1097,11 @@ class Frame_range_n_bottom: public Frame_cursor
>       ...
>     | peer1
>     | peer2  <----- current_row
> -   | peer3
> +   | peer3
>     +-peer4  <----- the cursor points here. peer4 itself is included.
>       nonpeer1
>       nonpeer2
> -
> +
>    This bound moves in front of the current_row. It should be a the first
> row
>    that is still a peer of the current row.
>  */
> @@ -1060,15 +1114,20 @@ class Frame_range_current_row_bottom: public
> Frame_cursor
>
>    bool dont_move;
>  public:
> -  void init(THD *thd, READ_RECORD *info,
> -            SQL_I_List<ORDER> *partition_list,
> -            SQL_I_List<ORDER> *order_list)
> +  Frame_range_current_row_bottom(THD *thd,
> +                                 SQL_I_List<ORDER> *partition_list,
> +                                 SQL_I_List<ORDER> *order_list) :
> +    cursor(thd, partition_list), peer_tracker(thd, order_list)
> +  {
> +  }
> +
> +  void init(READ_RECORD *info)
>    {
> -    cursor.init(thd, info, partition_list);
> -    peer_tracker.init(thd, order_list);
> +    cursor.init(info);
> +    peer_tracker.init();
>    }
>
> -  void pre_next_partition(ha_rows rownum, Item_sum* item)
> +  void pre_next_partition(ha_rows rownum)
>    {
>      // Save the value of the current_row
>      peer_tracker.check_if_next_group();
> @@ -1076,23 +1135,23 @@ class Frame_range_current_row_bottom: public
> Frame_cursor
>      if (rownum != 0)
>      {
>        // Add the current row now because our cursor has already seen it
> -      item->add();
> +      add_value_to_items();
>      }
>    }
>
> -  void next_partition(ha_rows rownum, Item_sum* item)
> +  void next_partition(ha_rows rownum)
>    {
> -    walk_till_non_peer(item);
> +    walk_till_non_peer();
>    }
>
> -  void pre_next_row(Item_sum* item)
> +  void pre_next_row()
>    {
>      dont_move= !peer_tracker.check_if_next_group();
>      if (!dont_move)
> -      item->add();
> +      add_value_to_items();
>    }
>
> -  void next_row(Item_sum* item)
> +  void next_row()
>    {
>      // Check if our cursor is pointing at a peer of the current row.
>      // If not, move forward until that becomes true
> @@ -1104,11 +1163,11 @@ class Frame_range_current_row_bottom: public
> Frame_cursor
>        */
>        return;
>      }
> -    walk_till_non_peer(item);
> +    walk_till_non_peer();
>    }
>
>  private:
> -  void walk_till_non_peer(Item_sum* item)
> +  void walk_till_non_peer()
>    {
>      /*
>        Walk forward until we've met first row that's not a peer of the
> current
> @@ -1118,7 +1177,7 @@ class Frame_range_current_row_bottom: public
> Frame_cursor
>      {
>        if (peer_tracker.compare_with_cache())
>          break;
> -      item->add();
> +      add_value_to_items();
>      }
>    }
>  };
> @@ -1148,33 +1207,38 @@ class Frame_range_current_row_top : public
> Frame_cursor
>
>    bool move;
>  public:
> -  void init(THD *thd, READ_RECORD *info,
> -            SQL_I_List<ORDER> *partition_list,
> -            SQL_I_List<ORDER> *order_list)
> +  Frame_range_current_row_top(THD *thd,
> +                              SQL_I_List<ORDER> *partition_list,
> +                              SQL_I_List<ORDER> *order_list) :
> +    bound_tracker(thd, partition_list), cursor(), peer_tracker(thd,
> order_list),
> +    move(false)
> +  {}
> +
> +  void init(READ_RECORD *info)
>    {
> -    bound_tracker.init(thd, partition_list);
> +    bound_tracker.init();
>
>      cursor.init(info);
> -    peer_tracker.init(thd, order_list);
> +    peer_tracker.init();
>    }
>
> -  void pre_next_partition(ha_rows rownum, Item_sum* item)
> +  void pre_next_partition(ha_rows rownum)
>    {
>      // Fetch the value from the first row
>      peer_tracker.check_if_next_group();
>      cursor.move_to(rownum+1);
>    }
>
> -  void next_partition(ha_rows rownum, Item_sum* item) {}
> +  void next_partition(ha_rows rownum) {}
>
> -  void pre_next_row(Item_sum* item)
> +  void pre_next_row()
>    {
>      // Check if the new current_row is a peer of the row that our cursor
> is
>      // pointing to.
>      move= peer_tracker.check_if_next_group();
>    }
>
> -  void next_row(Item_sum* item)
> +  void next_row()
>    {
>      if (move)
>      {
> @@ -1187,7 +1251,7 @@ class Frame_range_current_row_top : public
> Frame_cursor
>          // todo: need the following check ?
>          if (!peer_tracker.compare_with_cache())
>            return;
> -        item->remove();
> +        remove_value_from_items();
>        }
>
>        do
> @@ -1196,7 +1260,7 @@ class Frame_range_current_row_top : public
> Frame_cursor
>            return;
>          if (!peer_tracker.compare_with_cache())
>            return;
> -        item->remove();
> +        remove_value_from_items();
>        }
>        while (1);
>      }
> @@ -1214,7 +1278,14 @@ class Frame_range_current_row_top : public
> Frame_cursor
>  class Frame_unbounded_preceding : public Frame_cursor
>  {
>  public:
> -  void next_partition(ha_rows rownum, Item_sum* item)
> +  Frame_unbounded_preceding(THD *thd,
> +                            SQL_I_List<ORDER> *partition_list,
> +                            SQL_I_List<ORDER> *order_list)
> +  {}
> +
> +  void init(READ_RECORD *info) {}
> +
> +  void next_partition(ha_rows rownum)
>    {
>      /*
>        UNBOUNDED PRECEDING frame end just stays on the first row.
> @@ -1222,7 +1293,7 @@ class Frame_unbounded_preceding : public Frame_cursor
>      */
>    }
>
> -  void next_row(Item_sum* item)
> +  void next_row()
>    {
>      /* Do nothing, UNBOUNDED PRECEDING frame end doesn't move. */
>    }
> @@ -1239,18 +1310,22 @@ class Frame_unbounded_following : public
> Frame_cursor
>    Partition_read_cursor cursor;
>
>  public:
> -  void init(THD *thd, READ_RECORD *info, SQL_I_List<ORDER>
> *partition_list,
> -            SQL_I_List<ORDER> *order_list)
> +  Frame_unbounded_following(THD *thd,
> +      SQL_I_List<ORDER> *partition_list,
> +      SQL_I_List<ORDER> *order_list) :
> +    cursor(thd, partition_list) {}
> +
> +  void init(READ_RECORD *info)
>    {
> -    cursor.init(thd, info, partition_list);
> +    cursor.init(info);
>    }
>
> -  void pre_next_partition(ha_rows rownum, Item_sum* item)
> +  void pre_next_partition(ha_rows rownum)
>    {
>      cursor.on_next_partition(rownum);
>    }
>
> -  void next_partition(ha_rows rownum, Item_sum* item)
> +  void next_partition(ha_rows rownum)
>    {
>      if (!rownum)
>      {
> @@ -1258,16 +1333,16 @@ class Frame_unbounded_following : public
> Frame_cursor
>        if (cursor.get_next())
>          return;
>      }
> -    item->add();
> +    add_value_to_items();
>
>      /* Walk to the end of the partition, updating the SUM function */
>      while (!cursor.get_next())
>      {
> -      item->add();
> +      add_value_to_items();
>      }
>    }
>
> -  void next_row(Item_sum* item)
> +  void next_row()
>    {
>      /* Do nothing, UNBOUNDED FOLLOWING frame end doesn't move */
>    }
> @@ -1277,9 +1352,12 @@ class Frame_unbounded_following : public
> Frame_cursor
>  class Frame_unbounded_following_set_count : public
> Frame_unbounded_following
>  {
>  public:
> -  // pre_next_partition is inherited
> +  Frame_unbounded_following_set_count(
> +      THD *thd,
> +      SQL_I_List<ORDER> *partition_list, SQL_I_List<ORDER> *order_list) :
> +    Frame_unbounded_following(thd, partition_list, order_list) {}
>
> -  void next_partition(ha_rows rownum, Item_sum* item)
> +  void next_partition(ha_rows rownum)
>    {
>      ha_rows num_rows_in_partition= 0;
>      if (!rownum)
> @@ -1292,13 +1370,16 @@ class Frame_unbounded_following_set_count : public
> Frame_unbounded_following
>
>      /* Walk to the end of the partition, find how many rows there are. */
>      while (!cursor.get_next())
> -    {
>        num_rows_in_partition++;
> -    }
>
> -    Item_sum_window_with_row_count* item_with_row_count =
> -      static_cast<Item_sum_window_with_row_count *>(item);
> -    item_with_row_count->set_row_count(num_rows_in_partition);
> +    List_iterator_fast<Item_sum> it(sum_functions);
> +    Item_sum* item;
> +    while ((item= it++))
> +    {
> +      Item_sum_window_with_row_count* item_with_row_count =
> +        static_cast<Item_sum_window_with_row_count *>(item);
> +      item_with_row_count->set_row_count(num_rows_in_partition);
> +    }
>    }
>  };
>
> @@ -1324,13 +1405,12 @@ class Frame_n_rows_preceding : public Frame_cursor
>      is_top_bound(is_top_bound_arg), n_rows(n_rows_arg)
>    {}
>
> -  void init(THD *thd, READ_RECORD *info, SQL_I_List<ORDER>
> *partition_list,
> -            SQL_I_List<ORDER> *order_list)
> +  void init(READ_RECORD *info)
>    {
>      cursor.init(info);
>    }
>
> -  void next_partition(ha_rows rownum, Item_sum* item)
> +  void next_partition(ha_rows rownum)
>    {
>      /*
>        Position our cursor to point at the first row in the new partition
> @@ -1357,12 +1437,12 @@ class Frame_n_rows_preceding : public Frame_cursor
>      if (n_rows_to_skip == ha_rows(-1))
>      {
>        cursor.get_next();
> -      item->add();
> +      add_value_to_items();
>        n_rows_to_skip= 0;
>      }
>    }
>
> -  void next_row(Item_sum* item)
> +  void next_row()
>    {
>      if (n_rows_to_skip)
>      {
> @@ -1374,9 +1454,9 @@ class Frame_n_rows_preceding : public Frame_cursor
>        return;  // this is not expected to happen.
>
>      if (is_top_bound) // this is frame start endpoint
> -      item->remove();
> +      remove_value_from_items();
>      else
> -      item->add();
> +      add_value_to_items();
>    }
>  };
>
> @@ -1391,17 +1471,18 @@ class Frame_n_rows_preceding : public Frame_cursor
>  class Frame_rows_current_row_bottom : public Frame_cursor
>  {
>  public:
> -  void pre_next_partition(ha_rows rownum, Item_sum* item)
> +
> +  void pre_next_partition(ha_rows rownum)
>    {
> -    item->add();
> +    add_value_to_items();
>    }
> -  void next_partition(ha_rows rownum, Item_sum* item) {}
> -  void pre_next_row(Item_sum* item)
> +  void next_partition(ha_rows rownum) {}
> +  void pre_next_row()
>    {
>      /* Temp table's current row is current_row. Add it to the window func
> */
> -    item->add();
> +    add_value_to_items();
>    }
> -  void next_row(Item_sum* item) {};
> +  void next_row() {};
>  };
>
>
> @@ -1443,20 +1524,23 @@ class Frame_n_rows_following : public Frame_cursor
>    Partition_read_cursor cursor;
>    bool at_partition_end;
>  public:
> -  Frame_n_rows_following(bool is_top_bound_arg, ha_rows n_rows_arg) :
> -    is_top_bound(is_top_bound_arg), n_rows(n_rows_arg)
> +  Frame_n_rows_following(THD *thd,
> +            SQL_I_List<ORDER> *partition_list,
> +            SQL_I_List<ORDER> *order_list,
> +            bool is_top_bound_arg, ha_rows n_rows_arg) :
> +    is_top_bound(is_top_bound_arg), n_rows(n_rows_arg),
> +    cursor(thd, partition_list)
>    {
>      DBUG_ASSERT(n_rows > 0);
>    }
>
> -  void init(THD *thd, READ_RECORD *info, SQL_I_List<ORDER>
> *partition_list,
> -            SQL_I_List<ORDER> *order_list)
> +  void init(READ_RECORD *info)
>    {
> -    cursor.init(thd, info, partition_list);
> +    cursor.init(info);
>      at_partition_end= false;
>    }
>
> -  void pre_next_partition(ha_rows rownum, Item_sum* item)
> +  void pre_next_partition(ha_rows rownum)
>    {
>      at_partition_end= false;
>
> @@ -1469,39 +1553,39 @@ class Frame_n_rows_following : public Frame_cursor
>
>        // Current row points at the first row in the partition
>        if (is_top_bound) // this is frame top endpoint
> -        item->remove();
> +        remove_value_from_items();
>        else
> -        item->add();
> +        add_value_to_items();
>      }
>    }
>
>    /* Move our cursor to be n_rows ahead.  */
> -  void next_partition(ha_rows rownum, Item_sum* item)
> +  void next_partition(ha_rows rownum)
>    {
>      ha_rows i_end= n_rows + ((rownum==0)?1:0)- is_top_bound;
>      for (ha_rows i= 0; i < i_end; i++)
>      {
> -      if (next_row_intern(item))
> +      if (next_row_intern())
>          break;
>      }
>    }
>
> -  void next_row(Item_sum* item)
> +  void next_row()
>    {
>      if (at_partition_end)
>        return;
> -    next_row_intern(item);
> +    next_row_intern();
>    }
>
>  private:
> -  bool next_row_intern(Item_sum *item)
> +  bool next_row_intern()
>    {
>      if (!cursor.get_next())
>      {
>        if (is_top_bound) // this is frame start endpoint
> -        item->remove();
> +        remove_value_from_items();
>        else
> -        item->add();
> +        add_value_to_items();
>      }
>      else
>        at_partition_end= true;
> @@ -1513,8 +1597,9 @@ class Frame_n_rows_following : public Frame_cursor
>  /*
>    Get a Frame_cursor for a frame bound. This is a "factory function".
>  */
> -Frame_cursor *get_frame_cursor(Window_frame *frame, bool is_top_bound)
> +Frame_cursor *get_frame_cursor(THD *thd, Window_spec *spec, bool
> is_top_bound)
>  {
> +  Window_frame *frame= spec->window_frame;
>    if (!frame)
>    {
>      /*
> @@ -1536,9 +1621,13 @@ Frame_cursor *get_frame_cursor(Window_frame *frame,
> bool is_top_bound)
>          so again the same frame bounds can be used.
>      */
>      if (is_top_bound)
> -      return new Frame_unbounded_preceding;
> +      return new Frame_unbounded_preceding(thd,
> +                                           spec->partition_list,
> +                                           spec->order_list);
>      else
> -      return new Frame_range_current_row_bottom;
> +      return new Frame_range_current_row_bottom(thd,
> +                                                spec->partition_list,
> +                                                spec->order_list);
>    }
>
>    Window_frame_bound *bound= is_top_bound? frame->top_bound :
> @@ -1554,9 +1643,13 @@ Frame_cursor *get_frame_cursor(Window_frame *frame,
> bool is_top_bound)
>      {
>        /* The following serve both RANGE and ROWS: */
>        if (is_preceding)
> -        return new Frame_unbounded_preceding;
> -      else
> -        return new Frame_unbounded_following;
> +        return new Frame_unbounded_preceding(thd,
> +                                             spec->partition_list,
> +                                             spec->order_list);
> +
> +      return new Frame_unbounded_following(thd,
> +                                           spec->partition_list,
> +                                           spec->order_list);
>      }
>
>      if (frame->units == Window_frame::UNITS_ROWS)
> @@ -1567,15 +1660,21 @@ Frame_cursor *get_frame_cursor(Window_frame
> *frame, bool is_top_bound)
>        DBUG_ASSERT((longlong) n_rows >= 0);
>        if (is_preceding)
>          return new Frame_n_rows_preceding(is_top_bound, n_rows);
> -      else
> -        return new Frame_n_rows_following(is_top_bound, n_rows);
> +
> +      return new Frame_n_rows_following(
> +          thd, spec->partition_list, spec->order_list,
> +          is_top_bound, n_rows);
>      }
>      else
>      {
>        if (is_top_bound)
> -        return new Frame_range_n_top(is_preceding, bound->offset);
> -      else
> -        return new Frame_range_n_bottom(is_preceding, bound->offset);
> +        return new Frame_range_n_top(
> +            thd, spec->partition_list, spec->order_list,
> +            is_preceding, bound->offset);
> +
> +      return new Frame_range_n_bottom(thd,
> +          spec->partition_list, spec->order_list,
> +          is_preceding, bound->offset);
>      }
>    }
>
> @@ -1585,67 +1684,154 @@ Frame_cursor *get_frame_cursor(Window_frame
> *frame, bool is_top_bound)
>      {
>        if (is_top_bound)
>          return new Frame_rows_current_row_top;
> -      else
> -        return new Frame_rows_current_row_bottom;
> +
> +      return new Frame_rows_current_row_bottom;
>      }
>      else
>      {
>        if (is_top_bound)
> -        return new Frame_range_current_row_top;
> -      else
> -        return new Frame_range_current_row_bottom;
> +        return new Frame_range_current_row_top(
> +            thd, spec->partition_list, spec->order_list);
> +
> +      return new Frame_range_current_row_bottom(
> +          thd, spec->partition_list, spec->order_list);
>      }
>    }
>    return NULL;
>  }
>
> -void add_extra_frame_cursors(List<Frame_cursor> *cursors,
> -                             const Item_sum *window_func)
> +void add_extra_frame_cursors(THD *thd, Cursor_manager *cursor_manager,
> +                             Item_window_func *window_func)
>  {
> -  switch (window_func->sum_func())
> +  Window_spec *spec= window_func->window_spec;
> +  Item_sum *item_sum= window_func->window_func();
> +  Frame_cursor *fc;
> +  switch (item_sum->sum_func())
>    {
>      case Item_sum::CUME_DIST_FUNC:
> -      cursors->push_back(new Frame_unbounded_preceding);
> -      cursors->push_back(new Frame_range_current_row_bottom);
> +      fc= new Frame_unbounded_preceding(thd,
> +                                        spec->partition_list,
> +                                        spec->order_list);
> +      fc->add_sum_func(item_sum);
> +      cursor_manager->add_cursor(fc);
> +      fc= new Frame_range_current_row_bottom(thd,
> +                                             spec->partition_list,
> +                                             spec->order_list);
> +      fc->add_sum_func(item_sum);
> +      cursor_manager->add_cursor(fc);
>        break;
>      default:
> -      cursors->push_back(new Frame_unbounded_preceding);
> -      cursors->push_back(new Frame_rows_current_row_bottom);
> +      fc= new Frame_unbounded_preceding(
> +              thd, spec->partition_list, spec->order_list);
> +      fc->add_sum_func(item_sum);
> +      cursor_manager->add_cursor(fc);
> +
> +      fc= new Frame_rows_current_row_bottom;
> +      fc->add_sum_func(item_sum);
> +      cursor_manager->add_cursor(fc);
>    }
>  }
>
> -void get_window_func_required_cursors(
> -    List<Frame_cursor> *result, const Item_window_func* item_win)
> +
> +/*
> +   Create required frame cursors for the list of window functions.
> +   Register all functions to their appropriate cursors.
> +   If the window functions share the same frame specification,
> +   those window functions will be registered to the same cursor.
> +*/
> +void get_window_functions_required_cursors(
> +    THD *thd,
> +    List<Item_window_func>& window_functions,
> +    List<Cursor_manager> *cursor_managers)
>  {
> -  if (item_win->requires_partition_size())
> -    result->push_back(new Frame_unbounded_following_set_count);
> +  List_iterator_fast<Item_window_func> it(window_functions);
> +  Item_window_func* item_win_func;
> +  Item_sum *sum_func;
> +  while ((item_win_func= it++))
> +  {
> +    Cursor_manager *cursor_manager = new Cursor_manager();
> +    sum_func = item_win_func->window_func();
> +    Frame_cursor *fc;
> +    /*
> +      Some window functions require the partition size for computing
> values.
> +      Add a cursor that retrieves it as the first one in the list if
> necessary.
> +    */
> +    if (item_win_func->requires_partition_size())
> +    {
> +      fc= new Frame_unbounded_following_set_count(thd,
> +                item_win_func->window_spec->partition_list,
> +                item_win_func->window_spec->order_list);
> +      fc->add_sum_func(sum_func);
> +      cursor_manager->add_cursor(fc);
> +    }
>
> -  /*
> -    If it is not a regular window function that follows frame
> specifications,
> -    specific cursors are required.
> -  */
> -  if (item_win->is_frame_prohibited())
> +    /*
> +      If it is not a regular window function that follows frame
> specifications,
> +      specific cursors are required. ROW_NUM, RANK, NTILE and others
> follow
> +      such rules. Check is_frame_prohibited check for the full list.
> +    */
> +    if (item_win_func->is_frame_prohibited())
> +    {
> +      add_extra_frame_cursors(thd, cursor_manager, item_win_func);
> +      cursor_managers->push_back(cursor_manager);
> +      continue;
> +    }
> +
> +    Frame_cursor *frame_bottom= get_frame_cursor(thd,
> +        item_win_func->window_spec, false);
> +    Frame_cursor *frame_top= get_frame_cursor(thd,
> +        item_win_func->window_spec, true);
> +
> +    frame_bottom->add_sum_func(sum_func);
> +    frame_top->add_sum_func(sum_func);
> +
> +    /*
> +       The order of these cursors is important. A sum function
> +       must first add values (via frame_bottom) then remove them via
> +       frame_top. Removing items first doesn't make sense in the case of
> all
> +       window functions.
> +    */
> +    cursor_manager->add_cursor(frame_bottom);
> +    cursor_manager->add_cursor(frame_top);
> +    cursor_managers->push_back(cursor_manager);
> +  }
> +}
> +
> +/**
> +  Helper function that takes a list of window functions and writes
> +  their values in the current table record.
> +*/
> +static
> +bool save_window_function_values(List<Item_window_func>& window_functions,
> +                                 TABLE *tbl, uchar *rowid_buf)
> +{
> +  List_iterator_fast<Item_window_func> iter(window_functions);
> +  tbl->file->ha_rnd_pos(tbl->record[0], rowid_buf);
> +  store_record(tbl, record[1]);
> +  while (Item_window_func *item_win= iter++)
>    {
> -    add_extra_frame_cursors(result, item_win->window_func());
> -    return;
> +    int err;
> +    item_win->save_in_field(item_win->result_field, true);
> +    // TODO check if this can be placed outside the loop.
> +    err= tbl->file->ha_update_row(tbl->record[1], tbl->record[0]);
> +    if (err && err != HA_ERR_RECORD_IS_THE_SAME)
> +      return true;
>    }
>
> -  /* A regular window function follows the frame specification. */
> -  result->push_back(get_frame_cursor(item_win->window_spec->window_frame,
> -                                     false));
> -  result->push_back(get_frame_cursor(item_win->window_spec->window_frame,
> -                                     true));
> +  return false;
>  }
>
>  /*
> +  TODO(cvicentiu) update this comment to reflect the new execution.
> +
>    Streamed window function computation with window frames.
>
>    We make a single pass over the ordered temp.table, but we're using three
> -  cursors:
> +  cursors:
>     - current row - the row that we're computing window func value for)
>     - start_bound - the start of the frame
>     - bottom_bound   - the end of the frame
> -
> +
>    All three cursors move together.
>
>    @todo
> @@ -1655,7 +1841,7 @@ void get_window_func_required_cursors(
>    @detail
>      ROWS BETWEEN 3 PRECEDING  -- frame start
>                AND 3 FOLLOWING  -- frame end
> -
> +
>                                      /------ frame end (aka BOTTOM)
>      Dataset start                   |
>       --------====*=======[*]========*========-------->> dataset end
> @@ -1663,7 +1849,7 @@ void get_window_func_required_cursors(
>                   |         +-------- current row
>                   |
>                   \-------- frame start ("TOP")
> -
> +
>      - frame_end moves forward and adds rows into the aggregate function.
>      - frame_start follows behind and removes rows from the aggregate
> function.
>      - current_row is the row where the value of aggregate function is
> stored.
> @@ -1672,97 +1858,90 @@ void get_window_func_required_cursors(
>    condition (Others can catch up by counting rows?)
>
>  */
> -
> -bool compute_window_func_with_frames(Item_window_func *item_win,
> -                                     TABLE *tbl, READ_RECORD *info)
> +bool compute_window_func(THD *thd,
> +                         List<Item_window_func>& window_functions,
> +                         List<Cursor_manager>& cursor_managers,
> +                         TABLE *tbl,
> +                         SORT_INFO *filesort_result)
>  {
> -  THD *thd= tbl->in_use;
> -  int err= 0;
> +  List_iterator_fast<Item_window_func> iter_win_funcs(window_functions);
> +  List_iterator_fast<Cursor_manager>
> iter_cursor_managers(cursor_managers);
> +  uint err;
>
> -  Item_sum *sum_func= item_win->window_func();
> -  /* This algorithm doesn't support DISTINCT aggregator */
> -  sum_func->set_aggregator(Aggregator::SIMPLE_AGGREGATOR);
> +  READ_RECORD info;
>
> -  List<Frame_cursor> cursors;
> -  get_window_func_required_cursors(&cursors, item_win);
> +  if (init_read_record(&info, current_thd, tbl, NULL/*select*/,
> filesort_result,
> +                       0, 1, FALSE))
> +    return true;
>
> -  List_iterator_fast<Frame_cursor> it(cursors);
> -  Frame_cursor *c;
> -  while((c= it++))
> +  Cursor_manager *cursor_manager;
> +  while ((cursor_manager= iter_cursor_managers++))
> +    cursor_manager->initialize_cursors(&info);
> +
> +  /* One partition tracker for each window function. */
> +  List<Group_bound_tracker> partition_trackers;
> +  Item_window_func *win_func;
> +  while ((win_func= iter_win_funcs++))
>    {
> -    c->init(thd, info, item_win->window_spec->partition_list,
> -            item_win->window_spec->order_list);
> +    Group_bound_tracker *tracker= new Group_bound_tracker(thd,
> +
> win_func->window_spec->partition_list);
> +    // TODO(cvicentiu) This should be removed and placed in constructor.
> +    tracker->init();
> +    partition_trackers.push_back(tracker);
>    }
>
> -  bool is_error= false;
> +  List_iterator_fast<Group_bound_tracker>
> iter_part_trackers(partition_trackers);
>    ha_rows rownum= 0;
>    uchar *rowid_buf= (uchar*) my_malloc(tbl->file->ref_length, MYF(0));
>
>    while (true)
>    {
> -    /* Move the current_row */
> -    if ((err=info->read_record(info)))
> -    {
> -      break; /* End of file */
> -    }
> -    bool partition_changed= item_win->check_if_partition_changed();
> +    if ((err= info.read_record(&info)))
> +      break; // End of file.
>
> +    /* Remember current row so that we can restore it before computing
> +       each window function. */
>      tbl->file->position(tbl->record[0]);
>      memcpy(rowid_buf, tbl->file->ref, tbl->file->ref_length);
>
> -    if (partition_changed || (rownum == 0))
> -    {
> -      sum_func->clear();
> -      /*
> -        pre_XXX functions assume that tbl->record[0] contains
> current_row, and
> -        they may not change it.
> -      */
> -      it.rewind();
> -      while ((c= it++))
> -        c->pre_next_partition(rownum, sum_func);
> -      /*
> -        We move bottom_bound first, because we want rows to be added into
> the
> -        aggregate before top_bound attempts to remove them.
> -      */
> -      it.rewind();
> -      while ((c= it++))
> -        c->next_partition(rownum, sum_func);
> -    }
> -    else
> -    {
> -      /* Again, both pre_XXX function can find current_row in
> tbl->record[0] */
> -      it.rewind();
> -      while ((c= it++))
> -        c->pre_next_row(sum_func);
> -
> -      /* These make no assumptions about tbl->record[0] and may change it
> */
> -      it.rewind();
> -      while ((c= it++))
> -        c->next_row(sum_func);
> -    }
> -    rownum++;
> +    iter_win_funcs.rewind();
> +    iter_part_trackers.rewind();
> +    iter_cursor_managers.rewind();
>
> -    /*
> -      Frame cursors may have made tbl->record[0] to point to some record
> other
> -      than current_row. This applies to tbl->file's internal state, too.
> -      Fix this by reading the current row again.
> -    */
> -    tbl->file->ha_rnd_pos(tbl->record[0], rowid_buf);
> -    store_record(tbl,record[1]);
> -    item_win->save_in_field(item_win->result_field, true);
> -    err= tbl->file->ha_update_row(tbl->record[1], tbl->record[0]);
> -    if (err && err != HA_ERR_RECORD_IS_THE_SAME)
> +    Group_bound_tracker *tracker;
> +    while ((win_func= iter_win_funcs++) &&
> +           (tracker= iter_part_trackers++) &&
> +           (cursor_manager= iter_cursor_managers++))
>      {
> -      is_error= true;
> -      break;
> +      if (tracker->check_if_next_group() || (rownum == 0))
> +      {
> +        /* TODO(cvicentiu)
> +           Clearing window functions should happen through cursors. */
> +        win_func->window_func()->clear();
> +        cursor_manager->notify_cursors_partition_changed(rownum);
> +      }
> +      else
> +      {
> +        cursor_manager->notify_cursors_next_row();
> +      }
> +      /* Return to current row after notifying cursors for each window
> +         function. */
> +      tbl->file->ha_rnd_pos(tbl->record[0], rowid_buf);
>      }
> +
> +    /* We now have computed values for each window function. They can now
> +       be saved in the current row. */
> +    save_window_function_values(window_functions, tbl, rowid_buf);
> +
> +    rownum++;
>    }
>
>    my_free(rowid_buf);
> -  cursors.delete_elements();
> -  return is_error? true: false;
> -}
> +  partition_trackers.delete_elements();
> +  end_read_record(&info);
>
> +  return false;
> +}
>
>  /* Make a list that is a concation of two lists of ORDER elements */
>
> @@ -1799,25 +1978,18 @@ static ORDER* concat_order_lists(MEM_ROOT
> *mem_root, ORDER *list1, ORDER *list2)
>    return res;
>  }
>
> -
> -bool Window_func_runner::setup(THD *thd)
> +bool Window_func_runner::add_function_to_run(Item_window_func *win_func)
>  {
> -  win_func->setup_partition_border_check(thd);
> +
> +  Item_sum *sum_func= win_func->window_func();
> +  sum_func->setup_window_func(current_thd, win_func->window_spec);
>
>    Item_sum::Sumfunctype type= win_func->window_func()->sum_func();
> -  switch (type)
> +  switch (type)
>    {
>      case Item_sum::ROW_NUMBER_FUNC:
>      case Item_sum::RANK_FUNC:
>      case Item_sum::DENSE_RANK_FUNC:
> -    {
> -      /*
> -        One-pass window function computation, walk through the rows and
> -        assign values.
> -      */
> -      compute_func= compute_window_func_values;
> -      break;
> -    }
>      case Item_sum::COUNT_FUNC:
>      case Item_sum::SUM_BIT_FUNC:
>      case Item_sum::SUM_FUNC:
> @@ -1825,43 +1997,49 @@ bool Window_func_runner::setup(THD *thd)
>      case Item_sum::PERCENT_RANK_FUNC:
>      case Item_sum::CUME_DIST_FUNC:
>      case Item_sum::NTILE_FUNC:
> -    {
> -      /*
> -        Frame-aware window function computation. It does one pass, but
> -        uses three cursors -frame_start, current_row, and frame_end.
> -      */
> -      compute_func= compute_window_func_with_frames;
>        break;
> -    }
> +
>      default:
> -      my_error(ER_NOT_SUPPORTED_YET, MYF(0), "This aggregate as window
> function");
> +      my_error(ER_NOT_SUPPORTED_YET, MYF(0),
> +               "This aggregate as window function");
>        return true;
>    }
>
> -  return false;
> +  return window_functions.push_back(win_func);
>  }
>
>
>  /*
>    Compute the value of window function for all rows.
>  */
> -bool Window_func_runner::exec(TABLE *tbl, SORT_INFO *filesort_result)
> +bool Window_func_runner::exec(THD *thd, TABLE *tbl, SORT_INFO
> *filesort_result)
>  {
> -  THD *thd= current_thd;
> -  win_func->set_phase_to_computation();
> -
> -  /* Go through the sorted array and compute the window function */
> -  READ_RECORD info;
> -
> -  if (init_read_record(&info, thd, tbl, NULL/*select*/, filesort_result,
> -                       0, 1, FALSE))
> -    return true;
> +  List_iterator_fast<Item_window_func> it(window_functions);
> +  Item_window_func *win_func;
> +  while ((win_func= it++))
> +  {
> +    win_func->set_phase_to_computation();
> +    // TODO(cvicentiu) Setting the aggregator should probably be done
> during
> +    // setup of Window_funcs_sort.
> +
> win_func->window_func()->set_aggregator(Aggregator::SIMPLE_AGGREGATOR);
> +  }
> +  it.rewind();
>
> -  bool is_error= compute_func(win_func, tbl, &info);
> +  List<Cursor_manager> cursor_managers;
> +  get_window_functions_required_cursors(thd, window_functions,
> +                                        &cursor_managers);
>
> -  win_func->set_phase_to_retrieval();
> +  /* Go through the sorted array and compute the window function */
> +  bool is_error= compute_window_func(thd,
> +                                     window_functions,
> +                                     cursor_managers,
> +                                     tbl, filesort_result);
> +  while ((win_func= it++))
> +  {
> +    win_func->set_phase_to_retrieval();
> +  }
>
> -  end_read_record(&info);
> +  cursor_managers.delete_elements();
>
>    return is_error;
>  }
> @@ -1872,21 +2050,15 @@ bool Window_funcs_sort::exec(JOIN *join)
>    THD *thd= join->thd;
>    JOIN_TAB *join_tab= &join->join_tab[join->top_join_tab_count];
>
> +  /* Sort the table based on the most specific sorting criteria of
> +     the window functions. */
>    if (create_sort_index(thd, join, join_tab, filesort))
>      return true;
>
>    TABLE *tbl= join_tab->table;
>    SORT_INFO *filesort_result= join_tab->filesort_result;
>
> -  bool is_error= false;
> -  List_iterator<Window_func_runner> it(runners);
> -  Window_func_runner *runner;
> -
> -  while ((runner= it++))
> -  {
> -    if ((is_error= runner->exec(tbl, filesort_result)))
> -      break;
> -  }
> +  bool is_error= runner.exec(thd, tbl, filesort_result);
>
>    delete join_tab->filesort_result;
>    join_tab->filesort_result= NULL;
> @@ -1894,30 +2066,32 @@ bool Window_funcs_sort::exec(JOIN *join)
>  }
>
>
> -bool Window_funcs_sort::setup(THD *thd, SQL_SELECT *sel,
> -                             List_iterator<Item_window_func> &it)
> +bool Window_funcs_sort::setup(THD *thd, SQL_SELECT *sel,
> +                              List_iterator<Item_window_func> &it)
>  {
>    Item_window_func *win_func= it.peek();
>    Item_window_func *prev_win_func;
>
> +  /* The iterator should point to a valid function at the start of
> execution. */
> +  DBUG_ASSERT(win_func);
>    do
>    {
> -    Window_func_runner *runner;
> -    if (!(runner= new Window_func_runner(win_func)) ||
> -        runner->setup(thd))
> -    {
> +    if (runner.add_function_to_run(win_func))
>        return true;
> -    }
> -    runners.push_back(runner);
>      it++;
>      prev_win_func= win_func;
> -  } while ((win_func= it.peek()) && !(win_func->marker &
> SORTORDER_CHANGE_FLAG));
> -
> +  } while ((win_func= it.peek()) &&
> +           !(win_func->marker & SORTORDER_CHANGE_FLAG));
> +
>    /*
>      The sort criteria must be taken from the last win_func in the group of
> -    adjacent win_funcs that do not have SORTORDER_CHANGE_FLAG.
> +    adjacent win_funcs that do not have SORTORDER_CHANGE_FLAG. This is
> +    because the sort order must be the most specific sorting criteria
> defined
> +    within the window function group. This ensures that we sort the table
> +    in a way that the result is valid for all window functions belonging
> to
> +    this Window_funcs_sort.
>    */
> -  Window_spec *spec = prev_win_func->window_spec;
> +  Window_spec *spec= prev_win_func->window_spec;
>
>    ORDER* sort_order= concat_order_lists(thd->mem_root,
>                                          spec->partition_list->first,
> @@ -1932,8 +2106,8 @@ bool Window_funcs_sort::setup(THD *thd, SQL_SELECT
> *sel,
>
>
>  bool Window_funcs_computation::setup(THD *thd,
> -                                          List<Item_window_func>
> *window_funcs,
> -                                          JOIN_TAB *tab)
> +                                     List<Item_window_func> *window_funcs,
> +                                     JOIN_TAB *tab)
>  {
>    order_window_funcs_by_window_specs(window_funcs);
>
> diff --git a/sql/sql_window.h b/sql/sql_window.h
> index 54e39d8..c384724 100644
> --- a/sql/sql_window.h
> +++ b/sql/sql_window.h
> @@ -154,9 +154,7 @@ int setup_windows(THD *thd, Ref_ptr_array
> ref_pointer_array, TABLE_LIST *tables,
>  // Classes that make window functions computation a part of SELECT's
> query plan
>
>  //////////////////////////////////////////////////////////////////////////////
>
> -typedef bool (*window_compute_func_t)(Item_window_func *item_win,
> -                                      TABLE *tbl, READ_RECORD *info);
> -
> +class Frame_cursor;
>  /*
>    This handles computation of one window function.
>
> @@ -165,21 +163,17 @@ typedef bool
> (*window_compute_func_t)(Item_window_func *item_win,
>
>  class Window_func_runner : public Sql_alloc
>  {
> -  Item_window_func *win_func;
> -
> -  /* The function to use for computation*/
> -  window_compute_func_t compute_func;
> -
>  public:
> -  Window_func_runner(Item_window_func *win_func_arg) :
> -    win_func(win_func_arg)
> -  {}
> +  /* Add the function to be computed during the execution pass  */
> +  bool add_function_to_run(Item_window_func *win_func);
>
> -  // Set things up. Create filesort structures, etc
> -  bool setup(THD *thd);
> -
> -  // This sorts and runs the window function.
> -  bool exec(TABLE *tbl, SORT_INFO *filesort_result);
> +  /* Compute and fill the fields in the table. */
> +  bool exec(THD *thd, TABLE *tbl, SORT_INFO *filesort_result);
> +
> +private:
> +  /* A list of window functions for which this Window_func_runner will
> compute
> +     values during the execution phase. */
> +  List<Item_window_func> window_functions;
>  };
>
>
> @@ -191,21 +185,24 @@ class Window_func_runner : public Sql_alloc
>
>  class Window_funcs_sort : public Sql_alloc
>  {
> -  List<Window_func_runner> runners;
> -
> -  /* Window functions can be computed over this sorting */
> -  Filesort *filesort;
>  public:
>    bool setup(THD *thd, SQL_SELECT *sel, List_iterator<Item_window_func>
> &it);
>    bool exec(JOIN *join);
>    void cleanup() { delete filesort; }
>
>    friend class Window_funcs_computation;
> +
> +private:
> +  Window_func_runner runner;
> +
> +  /* Window functions can be computed over this sorting */
> +  Filesort *filesort;
>  };
>
>
>  struct st_join_table;
>  class Explain_aggr_window_funcs;
> +
>  /*
>    This is a "window function computation phase": a single object of this
> class
>    takes care of computing all window functions in a SELECT.
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.askmonty.org/pipermail/commits/attachments/20160906/ad8ff0f1/attachment-0001.html>


More information about the commits mailing list