[Commits] 78d3bd5: MDEV-10059: Compute window functions with same sorting criteria simultaneously
Vicențiu Ciorbaru
vicentiu at mariadb.org
Tue Sep 6 19:56:15 EEST 2016
Hi Sergey,
This is the patch that performs the refactoring for computing multiple
window functions in a single scan.
I know the patch is large; the main changes are the following:
1. Frame_cursors are now initialized at construction. The init function
just sets the READ_RECORD.
2. We no longer have a partition tracker within Item_window_func. Instead
we create it during the values computation stage.
3. Cursor_manager is a new class that owns cursors belonging to 1 window
function. Through this class (and only this class) we advance the cursors.
4. Window_func_runner deals with computing all the window function values
belonging to a Window_funcs_sort.
I recommend you start with Window_func_runner::exec as that's where
everything begins to branch out.
Vicentiu
On Tue, 6 Sep 2016 at 19:51 vicentiu <vicentiu at mariadb.org> wrote:
> revision-id: 78d3bd5e0b8827d21c4fe907aa56dcda85d7dd89
> (mariadb-10.2.1-21-g78d3bd5)
> parent(s): 31a8cf54c8a7913338480a0571feaf32143b5f64
> author: Vicențiu Ciorbaru
> committer: Vicențiu Ciorbaru
> timestamp: 2016-09-06 19:50:50 +0300
> message:
>
> MDEV-10059: Compute window functions with same sorting criteria
> simultaneously
>
> Perform only one table scan for each window function present. We do this
> by keeping cursors for each window function frame bound and
> running them for each function for every row.
>
> ---
> sql/item_windowfunc.cc | 43 +--
> sql/item_windowfunc.h | 90 ++---
> sql/sql_window.cc | 884
> +++++++++++++++++++++++++++++--------------------
> sql/sql_window.h | 37 +--
> 4 files changed, 607 insertions(+), 447 deletions(-)
>
> diff --git a/sql/item_windowfunc.cc b/sql/item_windowfunc.cc
> index d157d54..c8ea979 100644
> --- a/sql/item_windowfunc.cc
> +++ b/sql/item_windowfunc.cc
> @@ -41,7 +41,7 @@ Item_window_func::resolve_window_name(THD *thd)
> return true;
> }
>
> - return false;
> + return false;
> }
>
>
> @@ -154,7 +154,7 @@ void Item_window_func::split_sum_func(THD *thd,
> Ref_ptr_array ref_pointer_array,
>
>
> /*
> - This must be called before advance_window() can be called.
> + This must be called before attempting to compute the window function
> values.
>
> @detail
> If we attempt to do it in fix_fields(), partition_fields will refer
> @@ -162,30 +162,25 @@ void Item_window_func::split_sum_func(THD *thd,
> Ref_ptr_array ref_pointer_array,
> We need it to refer to temp.table columns.
> */
>
> -void Item_window_func::setup_partition_border_check(THD *thd)
> -{
> - partition_tracker.init(thd, window_spec->partition_list);
> - window_func()->setup_window_func(thd, window_spec);
> -}
> -
> -
> void Item_sum_rank::setup_window_func(THD *thd, Window_spec *window_spec)
> {
> /* TODO: move this into Item_window_func? */
> - peer_tracker.init(thd, window_spec->order_list);
> + peer_tracker = new Group_bound_tracker(thd, window_spec->order_list);
> + peer_tracker->init();
> clear();
> }
>
> void Item_sum_dense_rank::setup_window_func(THD *thd, Window_spec
> *window_spec)
> {
> /* TODO: consider moving this && Item_sum_rank's implementation */
> - peer_tracker.init(thd, window_spec->order_list);
> + peer_tracker = new Group_bound_tracker(thd, window_spec->order_list);
> + peer_tracker->init();
> clear();
> }
>
> bool Item_sum_dense_rank::add()
> {
> - if (peer_tracker.check_if_next_group() || first_add)
> + if (peer_tracker->check_if_next_group() || first_add)
> {
> first_add= false;
> dense_rank++;
> @@ -198,7 +193,7 @@ bool Item_sum_dense_rank::add()
> bool Item_sum_rank::add()
> {
> row_number++;
> - if (peer_tracker.check_if_next_group())
> + if (peer_tracker->check_if_next_group())
> {
> /* Row value changed */
> cur_rank= row_number;
> @@ -206,25 +201,10 @@ bool Item_sum_rank::add()
> return false;
> }
>
> -bool Item_window_func::check_if_partition_changed()
> -{
> - return partition_tracker.check_if_next_group();
> -}
> -
> -void Item_window_func::advance_window()
> -{
> - if (check_if_partition_changed())
> - {
> - /* Next partition */
> - window_func()->clear();
> - }
> - window_func()->add();
> -}
> -
> bool Item_sum_percent_rank::add()
> {
> row_number++;
> - if (peer_tracker.check_if_next_group())
> + if (peer_tracker->check_if_next_group())
> {
> /* Row value changed. */
> cur_rank= row_number;
> @@ -235,8 +215,7 @@ bool Item_sum_percent_rank::add()
> void Item_sum_percent_rank::setup_window_func(THD *thd, Window_spec
> *window_spec)
> {
> /* TODO: move this into Item_window_func? */
> - peer_tracker.init(thd, window_spec->order_list);
> + peer_tracker = new Group_bound_tracker(thd, window_spec->order_list);
> + peer_tracker->init();
> clear();
> }
> -
> -
> diff --git a/sql/item_windowfunc.h b/sql/item_windowfunc.h
> index 9d2fa13..5b352a4 100644
> --- a/sql/item_windowfunc.h
> +++ b/sql/item_windowfunc.h
> @@ -12,25 +12,19 @@ int test_if_group_changed(List<Cached_item> &list);
> /* A wrapper around test_if_group_changed */
> class Group_bound_tracker
> {
> - List<Cached_item> group_fields;
> - /*
> - During the first check_if_next_group, the list of cached_items is not
> - initialized. The compare function will return that the items match if
> - the field's value is the same as the Cached_item's default value (0).
> - This flag makes sure that we always return true during the first
> check.
> -
> - XXX This is better to be implemented within test_if_group_changed, but
> - since it is used in other parts of the codebase, we keep it here for
> now.
> - */
> - bool first_check;
> public:
> - void init(THD *thd, SQL_I_List<ORDER> *list)
> +
> + Group_bound_tracker(THD *thd, SQL_I_List<ORDER> *list)
> {
> for (ORDER *curr = list->first; curr; curr=curr->next)
> {
> Cached_item *tmp= new_Cached_item(thd, curr->item[0], TRUE);
> group_fields.push_back(tmp);
> }
> + }
> +
> + void init()
> + {
> first_check= true;
> }
>
> @@ -76,6 +70,19 @@ class Group_bound_tracker
> }
> return 0;
> }
> +
> +private:
> + List<Cached_item> group_fields;
> + /*
> + During the first check_if_next_group, the list of cached_items is not
> + initialized. The compare function will return that the items match if
> + the field's value is the same as the Cached_item's default value (0).
> + This flag makes sure that we always return true during the first
> check.
> +
> + XXX This is better to be implemented within test_if_group_changed, but
> + since it is used in other parts of the codebase, we keep it here for
> now.
> + */
> + bool first_check;
> };
>
> /*
> @@ -92,19 +99,22 @@ class Item_sum_row_number: public Item_sum_int
> longlong count;
>
> public:
> +
> + Item_sum_row_number(THD *thd)
> + : Item_sum_int(thd), count(0) {}
> +
> void clear()
> {
> count= 0;
> }
> - bool add()
> +
> + bool add()
> {
> count++;
> - return false;
> + return false;
> }
> - void update_field() {}
>
> - Item_sum_row_number(THD *thd)
> - : Item_sum_int(thd), count(0) {}
> + void update_field() {}
>
> enum Sumfunctype sum_func() const
> {
> @@ -119,7 +129,6 @@ class Item_sum_row_number: public Item_sum_int
> {
> return "row_number(";
> }
> -
> };
>
>
> @@ -145,9 +154,12 @@ class Item_sum_rank: public Item_sum_int
> protected:
> longlong row_number; // just ROW_NUMBER()
> longlong cur_rank; // current value
> -
> - Group_bound_tracker peer_tracker;
> +
> + Group_bound_tracker *peer_tracker;
> public:
> +
> + Item_sum_rank(THD *thd) : Item_sum_int(thd), peer_tracker(NULL) {}
> +
> void clear()
> {
> /* This is called on partition start */
> @@ -168,10 +180,6 @@ class Item_sum_rank: public Item_sum_int
> TODO: ^^ what does this do ? It is not called ever?
> */
>
> -public:
> - Item_sum_rank(THD *thd)
> - : Item_sum_int(thd) {}
> -
> enum Sumfunctype sum_func () const
> {
> return RANK_FUNC;
> @@ -183,9 +191,12 @@ class Item_sum_rank: public Item_sum_int
> }
>
> void setup_window_func(THD *thd, Window_spec *window_spec);
> +
> void cleanup()
> {
> - peer_tracker.cleanup();
> + if (peer_tracker)
> + peer_tracker->cleanup();
> + delete peer_tracker;
> Item_sum_int::cleanup();
> }
> };
> @@ -214,7 +225,7 @@ class Item_sum_dense_rank: public Item_sum_int
> {
> longlong dense_rank;
> bool first_add;
> - Group_bound_tracker peer_tracker;
> + Group_bound_tracker *peer_tracker;
> public:
> /*
> XXX(cvicentiu) This class could potentially be implemented in the
> rank
> @@ -233,7 +244,7 @@ class Item_sum_dense_rank: public Item_sum_int
> }
>
> Item_sum_dense_rank(THD *thd)
> - : Item_sum_int(thd), dense_rank(0), first_add(true) {}
> + : Item_sum_int(thd), dense_rank(0), first_add(true),
> peer_tracker(NULL) {}
> enum Sumfunctype sum_func () const
> {
> return DENSE_RANK_FUNC;
> @@ -248,7 +259,11 @@ class Item_sum_dense_rank: public Item_sum_int
>
> void cleanup()
> {
> - peer_tracker.cleanup();
> + if (peer_tracker)
> + {
> + peer_tracker->cleanup();
> + delete peer_tracker;
> + }
> Item_sum_int::cleanup();
> }
> };
> @@ -289,7 +304,7 @@ class Item_sum_percent_rank: public
> Item_sum_window_with_row_count
> {
> public:
> Item_sum_percent_rank(THD *thd)
> - : Item_sum_window_with_row_count(thd), cur_rank(1) {}
> + : Item_sum_window_with_row_count(thd), cur_rank(1),
> peer_tracker(NULL) {}
>
> longlong val_int()
> {
> @@ -347,11 +362,15 @@ class Item_sum_percent_rank: public
> Item_sum_window_with_row_count
> longlong cur_rank; // Current rank of the current row.
> longlong row_number; // Value if this were ROW_NUMBER() function.
>
> - Group_bound_tracker peer_tracker;
> + Group_bound_tracker *peer_tracker;
>
> void cleanup()
> {
> - peer_tracker.cleanup();
> + if (peer_tracker)
> + {
> + peer_tracker->cleanup();
> + delete peer_tracker;
> + }
> Item_sum_num::cleanup();
> }
> };
> @@ -502,12 +521,6 @@ class Item_window_func : public Item_func_or_sum
> public:
> Window_spec *window_spec;
>
> - /*
> - This stores the data about the partition we're currently in.
> - advance_window() uses this to tell when we've left one partition and
> - entered another
> - */
> - Group_bound_tracker partition_tracker;
> public:
> Item_window_func(THD *thd, Item_sum *win_func, LEX_STRING *win_name)
> : Item_func_or_sum(thd, (Item *) win_func),
> @@ -600,9 +613,6 @@ class Item_window_func : public Item_func_or_sum
> */
> void setup_partition_border_check(THD *thd);
>
> - void advance_window();
> - bool check_if_partition_changed();
> -
> enum_field_types field_type() const
> {
> return ((Item_sum *) args[0])->field_type();
> diff --git a/sql/sql_window.cc b/sql/sql_window.cc
> index 4c5ef53..a862821 100644
> --- a/sql/sql_window.cc
> +++ b/sql/sql_window.cc
> @@ -37,12 +37,12 @@
> Window_spec::check_window_names(List_iterator_fast<Window_spec> &it)
> if (win_spec->order_list->elements && order_list->elements)
> {
> my_error(ER_ORDER_LIST_IN_REFERENCING_WINDOW_SPEC, MYF(0),
> ref_name);
> - return true;
> + return true;
> }
> - if (win_spec->window_frame)
> + if (win_spec->window_frame)
> {
> my_error(ER_WINDOW_FRAME_IN_REFERENCED_WINDOW_SPEC, MYF(0),
> ref_name);
> - return true;
> + return true;
> }
> referenced_win_spec= win_spec;
> if (partition_list->elements == 0)
> @@ -54,7 +54,7 @@
> Window_spec::check_window_names(List_iterator_fast<Window_spec> &it)
> if (ref_name && !referenced_win_spec)
> {
> my_error(ER_WRONG_WINDOW_SPEC_NAME, MYF(0), ref_name);
> - return true;
> + return true;
> }
> window_names_are_checked= true;
> return false;
> @@ -73,7 +73,7 @@ Window_frame::check_frame_bounds()
> top_bound->precedence_type == Window_frame_bound::FOLLOWING))
> {
> my_error(ER_BAD_COMBINATION_OF_WINDOW_FRAME_BOUND_SPECS, MYF(0));
> - return true;
> + return true;
> }
>
> return false;
> @@ -86,7 +86,7 @@ Window_frame::check_frame_bounds()
>
> int
> setup_windows(THD *thd, Ref_ptr_array ref_pointer_array, TABLE_LIST
> *tables,
> - List<Item> &fields, List<Item> &all_fields,
> + List<Item> &fields, List<Item> &all_fields,
> List<Window_spec> &win_specs, List<Item_window_func>
> &win_funcs)
> {
> Window_spec *win_spec;
> @@ -116,7 +116,7 @@ setup_windows(THD *thd, Ref_ptr_array
> ref_pointer_array, TABLE_LIST *tables,
> it.rewind();
>
> List_iterator_fast<Window_spec> itp(win_specs);
> -
> +
> while ((win_spec= it++))
> {
> bool hidden_group_fields;
> @@ -131,7 +131,7 @@ setup_windows(THD *thd, Ref_ptr_array
> ref_pointer_array, TABLE_LIST *tables,
> {
> DBUG_RETURN(1);
> }
> -
> +
> if (win_spec->window_frame &&
> win_spec->window_frame->exclusion != Window_frame::EXCL_NONE)
> {
> @@ -188,7 +188,7 @@ setup_windows(THD *thd, Ref_ptr_array
> ref_pointer_array, TABLE_LIST *tables,
> }
> }
> }
> -
> +
> /* "ROWS PRECEDING|FOLLOWING $n" must have a numeric $n */
> if (win_spec->window_frame &&
> win_spec->window_frame->units == Window_frame::UNITS_ROWS)
> @@ -219,7 +219,7 @@ setup_windows(THD *thd, Ref_ptr_array
> ref_pointer_array, TABLE_LIST *tables,
> {
> win_func_item->update_used_tables();
> }
> -
> +
> DBUG_RETURN(0);
> }
>
> @@ -445,7 +445,7 @@ typedef int (*Item_window_func_cmp)(Item_window_func
> *f1,
> @brief
> Sort window functions so that those that can be computed together are
> adjacent.
> -
> +
> @detail
> Sort window functions by their
> - required sorting order,
> @@ -498,48 +498,15 @@ void
> order_window_funcs_by_window_specs(List<Item_window_func> *win_func_list)
> }
> else if (win_spec_prev->window_frame != win_spec_curr->window_frame)
> curr->marker|= FRAME_CHANGE_FLAG;
> -
> - prev= curr;
> - }
> +
> + prev= curr;
> + }
> }
>
>
>
> /////////////////////////////////////////////////////////////////////////////
>
>
> -/*
> - Do a pass over sorted table and compute window function values.
> -
> - This function is for handling window functions that can be computed on
> the
> - fly. Examples are RANK() and ROW_NUMBER().
> -*/
> -bool compute_window_func_values(Item_window_func *item_win,
> - TABLE *tbl, READ_RECORD *info)
> -{
> - int err;
> - while (!(err=info->read_record(info)))
> - {
> - store_record(tbl,record[1]);
> -
> - /*
> - This will cause window function to compute its value for the
> - current row :
> - */
> - item_win->advance_window();
> -
> - /*
> - Put the new value into temptable's field
> - TODO: Should this use item_win->update_field() call?
> - Regular aggegate function implementations seem to implement it.
> - */
> - item_win->save_in_field(item_win->result_field, true);
> - err= tbl->file->ha_update_row(tbl->record[1], tbl->record[0]);
> - if (err && err != HA_ERR_RECORD_IS_THE_SAME)
> - return true;
> - }
> - return false;
> -}
> -
>
> /////////////////////////////////////////////////////////////////////////////
> // Window Frames support
>
> /////////////////////////////////////////////////////////////////////////////
> @@ -571,11 +538,6 @@ bool clone_read_record(const READ_RECORD *src,
> READ_RECORD *dst)
>
> class Rowid_seq_cursor
> {
> - uchar *cache_start;
> - uchar *cache_pos;
> - uchar *cache_end;
> - uint ref_length;
> -
> public:
> virtual ~Rowid_seq_cursor() {}
>
> @@ -595,17 +557,17 @@ class Rowid_seq_cursor
> cache_pos+= ref_length;
> return 0;
> }
> -
> +
> ha_rows get_rownum()
> {
> return (cache_pos - cache_start) / ref_length;
> }
>
> - // will be called by ROWS n FOLLOWING to catch up.
> void move_to(ha_rows row_number)
> {
> cache_pos= cache_start + row_number * ref_length;
> }
> +
> protected:
> bool at_eof() { return (cache_pos == cache_end); }
>
> @@ -618,6 +580,12 @@ class Rowid_seq_cursor
> }
>
> uchar *get_curr_rowid() { return cache_pos; }
> +
> +private:
> + uchar *cache_start;
> + uchar *cache_pos;
> + uchar *cache_end;
> + uint ref_length;
> };
>
>
> @@ -627,11 +595,6 @@ class Rowid_seq_cursor
>
> class Table_read_cursor : public Rowid_seq_cursor
> {
> - /*
> - Note: we don't own *read_record, somebody else is using it.
> - We only look at the constant part of it, e.g. table, record buffer,
> etc.
> - */
> - READ_RECORD *read_record;
> public:
> virtual ~Table_read_cursor() {}
>
> @@ -668,7 +631,14 @@ class Table_read_cursor : public Rowid_seq_cursor
> return false; // didn't restore
> }
>
> - // todo: should move_to() also read row here?
> +private:
> + /*
> + Note: we don't own *read_record, somebody else is using it.
> + We only look at the constant part of it, e.g. table, record buffer,
> etc.
> + */
> + READ_RECORD *read_record;
> +
> + // TODO(spetrunia): should move_to() also read row here?
> };
>
>
> @@ -679,14 +649,14 @@ class Table_read_cursor : public Rowid_seq_cursor
>
> class Partition_read_cursor
> {
> - Table_read_cursor tbl_cursor;
> - Group_bound_tracker bound_tracker;
> - bool end_of_partition;
> public:
> - void init(THD *thd, READ_RECORD *info, SQL_I_List<ORDER>
> *partition_list)
> + Partition_read_cursor(THD *thd, SQL_I_List<ORDER> *partition_list) :
> + bound_tracker(thd, partition_list) {}
> +
> + void init(READ_RECORD *info)
> {
> tbl_cursor.init(info);
> - bound_tracker.init(thd, partition_list);
> + bound_tracker.init();
> end_of_partition= false;
> }
>
> @@ -704,7 +674,7 @@ class Partition_read_cursor
> }
>
> /*
> - Moves to a new row. The row is assumed to be within the current
> partition
> + Moves to a new row. The row is assumed to be within the current
> partition.
> */
> void move_to(ha_rows rownum) { tbl_cursor.move_to(rownum); }
>
> @@ -731,14 +701,18 @@ class Partition_read_cursor
> {
> return tbl_cursor.restore_last_row();
> }
> +
> +private:
> + Table_read_cursor tbl_cursor;
> + Group_bound_tracker bound_tracker;
> + bool end_of_partition;
> };
>
>
> /////////////////////////////////////////////////////////////////////////////
>
> -
> /*
> Window frame bound cursor. Abstract interface.
> -
> +
> @detail
> The cursor moves within the partition that the current row is in.
> It may be ahead or behind the current row.
> @@ -775,11 +749,12 @@ class Partition_read_cursor
> class Frame_cursor : public Sql_alloc
> {
> public:
> - virtual void init(THD *thd, READ_RECORD *info,
> - SQL_I_List<ORDER> *partition_list,
> - SQL_I_List<ORDER> *order_list)
> - {}
> + virtual void init(READ_RECORD *info) {};
>
> + bool add_sum_func(Item_sum* item)
> + {
> + return sum_functions.push_back(item);
> + }
> /*
> Current row has moved to the next partition and is positioned on the
> first
> row there. Position the frame bound accordingly.
> @@ -796,20 +771,95 @@ class Frame_cursor : public Sql_alloc
> - The callee may move tbl->file and tbl->record[0] to point to some
> other
> row.
> */
> - virtual void pre_next_partition(ha_rows rownum, Item_sum* item){};
> - virtual void next_partition(ha_rows rownum, Item_sum* item)=0;
> -
> + virtual void pre_next_partition(ha_rows rownum) {};
> + virtual void next_partition(ha_rows rownum)=0;
> +
> /*
> The current row has moved one row forward.
> Move this frame bound accordingly, and update the value of aggregate
> function as necessary.
> */
> - virtual void pre_next_row(Item_sum* item){};
> - virtual void next_row(Item_sum* item)=0;
> -
> - virtual ~Frame_cursor(){}
> + virtual void pre_next_row() {};
> + virtual void next_row()=0;
> +
> + virtual ~Frame_cursor() {}
> +
> +protected:
> + inline void add_value_to_items()
> + {
> + List_iterator_fast<Item_sum> it(sum_functions);
> + Item_sum *item_sum;
> + while ((item_sum= it++))
> + {
> + item_sum->add();
> + }
> + }
> + inline void remove_value_from_items()
> + {
> + List_iterator_fast<Item_sum> it(sum_functions);
> + Item_sum *item_sum;
> + while ((item_sum= it++))
> + {
> + item_sum->remove();
> + }
> + }
> +
> + /* Sum functions that this cursor handles. */
> + List<Item_sum> sum_functions;
> +};
> +
> +/*
> + A class that owns cursor objects associated with a specific window
> function.
> +*/
> +class Cursor_manager
> +{
> +public:
> + bool add_cursor(Frame_cursor *cursor)
> + {
> + return cursors.push_back(cursor);
> + }
> +
> + void initialize_cursors(READ_RECORD *info)
> + {
> + List_iterator_fast<Frame_cursor> iter(cursors);
> + Frame_cursor *fc;
> + while ((fc= iter++))
> + fc->init(info);
> + }
> +
> + void notify_cursors_partition_changed(ha_rows rownum)
> + {
> + List_iterator_fast<Frame_cursor> iter(cursors);
> + Frame_cursor *cursor;
> + while ((cursor= iter++))
> + cursor->pre_next_partition(rownum);
> +
> + iter.rewind();
> + while ((cursor= iter++))
> + cursor->next_partition(rownum);
> + }
> +
> + void notify_cursors_next_row()
> + {
> + List_iterator_fast<Frame_cursor> iter(cursors);
> + Frame_cursor *cursor;
> + while ((cursor= iter++))
> + cursor->pre_next_row();
> +
> + iter.rewind();
> + while ((cursor= iter++))
> + cursor->next_row();
> + }
> +
> + ~Cursor_manager() { cursors.delete_elements(); }
> +
> +private:
> + /* List of the cursors that this manager owns. */
> + List<Frame_cursor> cursors;
> };
>
> +
> +
>
> //////////////////////////////////////////////////////////////////////////////
> // RANGE-type frames
>
> //////////////////////////////////////////////////////////////////////////////
> @@ -841,16 +891,12 @@ class Frame_range_n_top : public Frame_cursor
> */
> int order_direction;
> public:
> - Frame_range_n_top(bool is_preceding_arg, Item *n_val_arg) :
> + Frame_range_n_top(THD *thd,
> + SQL_I_List<ORDER> *partition_list,
> + SQL_I_List<ORDER> *order_list,
> + bool is_preceding_arg, Item *n_val_arg) :
> n_val(n_val_arg), item_add(NULL), is_preceding(is_preceding_arg)
> - {}
> -
> - void init(THD *thd, READ_RECORD *info,
> - SQL_I_List<ORDER> *partition_list,
> - SQL_I_List<ORDER> *order_list)
> {
> - cursor.init(info);
> -
> DBUG_ASSERT(order_list->elements == 1);
> Item *src_expr= order_list->first->item[0];
> if (order_list->first->direction == ORDER::ORDER_ASC)
> @@ -872,24 +918,30 @@ class Frame_range_n_top : public Frame_cursor
> item_add->fix_fields(thd, &item_add);
> }
>
> - void pre_next_partition(ha_rows rownum, Item_sum* item)
> + void init(READ_RECORD *info)
> + {
> + cursor.init(info);
> +
> + }
> +
> + void pre_next_partition(ha_rows rownum)
> {
> // Save the value of FUNC(current_row)
> range_expr->fetch_value_from(item_add);
> }
>
> - void next_partition(ha_rows rownum, Item_sum* item)
> + void next_partition(ha_rows rownum)
> {
> cursor.move_to(rownum);
> - walk_till_non_peer(item);
> + walk_till_non_peer();
> }
>
> - void pre_next_row(Item_sum* item)
> + void pre_next_row()
> {
> range_expr->fetch_value_from(item_add);
> }
>
> - void next_row(Item_sum* item)
> + void next_row()
> {
> /*
> Ok, our cursor is at the first row R where
> @@ -900,19 +952,19 @@ class Frame_range_n_top : public Frame_cursor
> {
> if (order_direction * range_expr->cmp_read_only() <= 0)
> return;
> - item->remove();
> + remove_value_from_items();
> }
> - walk_till_non_peer(item);
> + walk_till_non_peer();
> }
>
> private:
> - void walk_till_non_peer(Item_sum* item)
> + void walk_till_non_peer()
> {
> while (!cursor.get_next())
> {
> if (order_direction * range_expr->cmp_read_only() <= 0)
> break;
> - item->remove();
> + remove_value_from_items();
> }
> }
> };
> @@ -950,16 +1002,13 @@ class Frame_range_n_bottom: public Frame_cursor
> */
> int order_direction;
> public:
> - Frame_range_n_bottom(bool is_preceding_arg, Item *n_val_arg) :
> - n_val(n_val_arg), item_add(NULL), is_preceding(is_preceding_arg)
> - {}
> -
> - void init(THD *thd, READ_RECORD *info,
> - SQL_I_List<ORDER> *partition_list,
> - SQL_I_List<ORDER> *order_list)
> + Frame_range_n_bottom(THD *thd,
> + SQL_I_List<ORDER> *partition_list,
> + SQL_I_List<ORDER> *order_list,
> + bool is_preceding_arg, Item *n_val_arg) :
> + cursor(thd, partition_list), n_val(n_val_arg), item_add(NULL),
> + is_preceding(is_preceding_arg)
> {
> - cursor.init(thd, info, partition_list);
> -
> DBUG_ASSERT(order_list->elements == 1);
> Item *src_expr= order_list->first->item[0];
>
> @@ -982,7 +1031,12 @@ class Frame_range_n_bottom: public Frame_cursor
> item_add->fix_fields(thd, &item_add);
> }
>
> - void pre_next_partition(ha_rows rownum, Item_sum* item)
> + void init(READ_RECORD *info)
> + {
> + cursor.init(info);
> + }
> +
> + void pre_next_partition(ha_rows rownum)
> {
> // Save the value of FUNC(current_row)
> range_expr->fetch_value_from(item_add);
> @@ -991,20 +1045,20 @@ class Frame_range_n_bottom: public Frame_cursor
> end_of_partition= false;
> }
>
> - void next_partition(ha_rows rownum, Item_sum* item)
> + void next_partition(ha_rows rownum)
> {
> cursor.move_to(rownum);
> - walk_till_non_peer(item);
> + walk_till_non_peer();
> }
>
> - void pre_next_row(Item_sum* item)
> + void pre_next_row()
> {
> if (end_of_partition)
> return;
> range_expr->fetch_value_from(item_add);
> }
>
> - void next_row(Item_sum* item)
> + void next_row()
> {
> if (end_of_partition)
> return;
> @@ -1017,20 +1071,20 @@ class Frame_range_n_bottom: public Frame_cursor
> {
> if (order_direction * range_expr->cmp_read_only() < 0)
> return;
> - item->add();
> + add_value_to_items();
> }
> - walk_till_non_peer(item);
> + walk_till_non_peer();
> }
>
> private:
> - void walk_till_non_peer(Item_sum* item)
> + void walk_till_non_peer()
> {
> int res;
> while (!(res= cursor.get_next()))
> {
> if (order_direction * range_expr->cmp_read_only() < 0)
> break;
> - item->add();
> + add_value_to_items();
> }
> if (res)
> end_of_partition= true;
> @@ -1043,11 +1097,11 @@ class Frame_range_n_bottom: public Frame_cursor
> ...
> | peer1
> | peer2 <----- current_row
> - | peer3
> + | peer3
> +-peer4 <----- the cursor points here. peer4 itself is included.
> nonpeer1
> nonpeer2
> -
> +
> This bound moves in front of the current_row. It should be a the first
> row
> that is still a peer of the current row.
> */
> @@ -1060,15 +1114,20 @@ class Frame_range_current_row_bottom: public
> Frame_cursor
>
> bool dont_move;
> public:
> - void init(THD *thd, READ_RECORD *info,
> - SQL_I_List<ORDER> *partition_list,
> - SQL_I_List<ORDER> *order_list)
> + Frame_range_current_row_bottom(THD *thd,
> + SQL_I_List<ORDER> *partition_list,
> + SQL_I_List<ORDER> *order_list) :
> + cursor(thd, partition_list), peer_tracker(thd, order_list)
> + {
> + }
> +
> + void init(READ_RECORD *info)
> {
> - cursor.init(thd, info, partition_list);
> - peer_tracker.init(thd, order_list);
> + cursor.init(info);
> + peer_tracker.init();
> }
>
> - void pre_next_partition(ha_rows rownum, Item_sum* item)
> + void pre_next_partition(ha_rows rownum)
> {
> // Save the value of the current_row
> peer_tracker.check_if_next_group();
> @@ -1076,23 +1135,23 @@ class Frame_range_current_row_bottom: public
> Frame_cursor
> if (rownum != 0)
> {
> // Add the current row now because our cursor has already seen it
> - item->add();
> + add_value_to_items();
> }
> }
>
> - void next_partition(ha_rows rownum, Item_sum* item)
> + void next_partition(ha_rows rownum)
> {
> - walk_till_non_peer(item);
> + walk_till_non_peer();
> }
>
> - void pre_next_row(Item_sum* item)
> + void pre_next_row()
> {
> dont_move= !peer_tracker.check_if_next_group();
> if (!dont_move)
> - item->add();
> + add_value_to_items();
> }
>
> - void next_row(Item_sum* item)
> + void next_row()
> {
> // Check if our cursor is pointing at a peer of the current row.
> // If not, move forward until that becomes true
> @@ -1104,11 +1163,11 @@ class Frame_range_current_row_bottom: public
> Frame_cursor
> */
> return;
> }
> - walk_till_non_peer(item);
> + walk_till_non_peer();
> }
>
> private:
> - void walk_till_non_peer(Item_sum* item)
> + void walk_till_non_peer()
> {
> /*
> Walk forward until we've met first row that's not a peer of the
> current
> @@ -1118,7 +1177,7 @@ class Frame_range_current_row_bottom: public
> Frame_cursor
> {
> if (peer_tracker.compare_with_cache())
> break;
> - item->add();
> + add_value_to_items();
> }
> }
> };
> @@ -1148,33 +1207,38 @@ class Frame_range_current_row_top : public
> Frame_cursor
>
> bool move;
> public:
> - void init(THD *thd, READ_RECORD *info,
> - SQL_I_List<ORDER> *partition_list,
> - SQL_I_List<ORDER> *order_list)
> + Frame_range_current_row_top(THD *thd,
> + SQL_I_List<ORDER> *partition_list,
> + SQL_I_List<ORDER> *order_list) :
> + bound_tracker(thd, partition_list), cursor(), peer_tracker(thd,
> order_list),
> + move(false)
> + {}
> +
> + void init(READ_RECORD *info)
> {
> - bound_tracker.init(thd, partition_list);
> + bound_tracker.init();
>
> cursor.init(info);
> - peer_tracker.init(thd, order_list);
> + peer_tracker.init();
> }
>
> - void pre_next_partition(ha_rows rownum, Item_sum* item)
> + void pre_next_partition(ha_rows rownum)
> {
> // Fetch the value from the first row
> peer_tracker.check_if_next_group();
> cursor.move_to(rownum+1);
> }
>
> - void next_partition(ha_rows rownum, Item_sum* item) {}
> + void next_partition(ha_rows rownum) {}
>
> - void pre_next_row(Item_sum* item)
> + void pre_next_row()
> {
> // Check if the new current_row is a peer of the row that our cursor
> is
> // pointing to.
> move= peer_tracker.check_if_next_group();
> }
>
> - void next_row(Item_sum* item)
> + void next_row()
> {
> if (move)
> {
> @@ -1187,7 +1251,7 @@ class Frame_range_current_row_top : public
> Frame_cursor
> // todo: need the following check ?
> if (!peer_tracker.compare_with_cache())
> return;
> - item->remove();
> + remove_value_from_items();
> }
>
> do
> @@ -1196,7 +1260,7 @@ class Frame_range_current_row_top : public
> Frame_cursor
> return;
> if (!peer_tracker.compare_with_cache())
> return;
> - item->remove();
> + remove_value_from_items();
> }
> while (1);
> }
> @@ -1214,7 +1278,14 @@ class Frame_range_current_row_top : public
> Frame_cursor
> class Frame_unbounded_preceding : public Frame_cursor
> {
> public:
> - void next_partition(ha_rows rownum, Item_sum* item)
> + Frame_unbounded_preceding(THD *thd,
> + SQL_I_List<ORDER> *partition_list,
> + SQL_I_List<ORDER> *order_list)
> + {}
> +
> + void init(READ_RECORD *info) {}
> +
> + void next_partition(ha_rows rownum)
> {
> /*
> UNBOUNDED PRECEDING frame end just stays on the first row.
> @@ -1222,7 +1293,7 @@ class Frame_unbounded_preceding : public Frame_cursor
> */
> }
>
> - void next_row(Item_sum* item)
> + void next_row()
> {
> /* Do nothing, UNBOUNDED PRECEDING frame end doesn't move. */
> }
> @@ -1239,18 +1310,22 @@ class Frame_unbounded_following : public
> Frame_cursor
> Partition_read_cursor cursor;
>
> public:
> - void init(THD *thd, READ_RECORD *info, SQL_I_List<ORDER>
> *partition_list,
> - SQL_I_List<ORDER> *order_list)
> + Frame_unbounded_following(THD *thd,
> + SQL_I_List<ORDER> *partition_list,
> + SQL_I_List<ORDER> *order_list) :
> + cursor(thd, partition_list) {}
> +
> + void init(READ_RECORD *info)
> {
> - cursor.init(thd, info, partition_list);
> + cursor.init(info);
> }
>
> - void pre_next_partition(ha_rows rownum, Item_sum* item)
> + void pre_next_partition(ha_rows rownum)
> {
> cursor.on_next_partition(rownum);
> }
>
> - void next_partition(ha_rows rownum, Item_sum* item)
> + void next_partition(ha_rows rownum)
> {
> if (!rownum)
> {
> @@ -1258,16 +1333,16 @@ class Frame_unbounded_following : public
> Frame_cursor
> if (cursor.get_next())
> return;
> }
> - item->add();
> + add_value_to_items();
>
> /* Walk to the end of the partition, updating the SUM function */
> while (!cursor.get_next())
> {
> - item->add();
> + add_value_to_items();
> }
> }
>
> - void next_row(Item_sum* item)
> + void next_row()
> {
> /* Do nothing, UNBOUNDED FOLLOWING frame end doesn't move */
> }
> @@ -1277,9 +1352,12 @@ class Frame_unbounded_following : public
> Frame_cursor
> class Frame_unbounded_following_set_count : public
> Frame_unbounded_following
> {
> public:
> - // pre_next_partition is inherited
> + Frame_unbounded_following_set_count(
> + THD *thd,
> + SQL_I_List<ORDER> *partition_list, SQL_I_List<ORDER> *order_list) :
> + Frame_unbounded_following(thd, partition_list, order_list) {}
>
> - void next_partition(ha_rows rownum, Item_sum* item)
> + void next_partition(ha_rows rownum)
> {
> ha_rows num_rows_in_partition= 0;
> if (!rownum)
> @@ -1292,13 +1370,16 @@ class Frame_unbounded_following_set_count : public
> Frame_unbounded_following
>
> /* Walk to the end of the partition, find how many rows there are. */
> while (!cursor.get_next())
> - {
> num_rows_in_partition++;
> - }
>
> - Item_sum_window_with_row_count* item_with_row_count =
> - static_cast<Item_sum_window_with_row_count *>(item);
> - item_with_row_count->set_row_count(num_rows_in_partition);
> + List_iterator_fast<Item_sum> it(sum_functions);
> + Item_sum* item;
> + while ((item= it++))
> + {
> + Item_sum_window_with_row_count* item_with_row_count =
> + static_cast<Item_sum_window_with_row_count *>(item);
> + item_with_row_count->set_row_count(num_rows_in_partition);
> + }
> }
> };
>
> @@ -1324,13 +1405,12 @@ class Frame_n_rows_preceding : public Frame_cursor
> is_top_bound(is_top_bound_arg), n_rows(n_rows_arg)
> {}
>
> - void init(THD *thd, READ_RECORD *info, SQL_I_List<ORDER>
> *partition_list,
> - SQL_I_List<ORDER> *order_list)
> + void init(READ_RECORD *info)
> {
> cursor.init(info);
> }
>
> - void next_partition(ha_rows rownum, Item_sum* item)
> + void next_partition(ha_rows rownum)
> {
> /*
> Position our cursor to point at the first row in the new partition
> @@ -1357,12 +1437,12 @@ class Frame_n_rows_preceding : public Frame_cursor
> if (n_rows_to_skip == ha_rows(-1))
> {
> cursor.get_next();
> - item->add();
> + add_value_to_items();
> n_rows_to_skip= 0;
> }
> }
>
> - void next_row(Item_sum* item)
> + void next_row()
> {
> if (n_rows_to_skip)
> {
> @@ -1374,9 +1454,9 @@ class Frame_n_rows_preceding : public Frame_cursor
> return; // this is not expected to happen.
>
> if (is_top_bound) // this is frame start endpoint
> - item->remove();
> + remove_value_from_items();
> else
> - item->add();
> + add_value_to_items();
> }
> };
>
> @@ -1391,17 +1471,18 @@ class Frame_n_rows_preceding : public Frame_cursor
> class Frame_rows_current_row_bottom : public Frame_cursor
> {
> public:
> - void pre_next_partition(ha_rows rownum, Item_sum* item)
> +
> + void pre_next_partition(ha_rows rownum)
> {
> - item->add();
> + add_value_to_items();
> }
> - void next_partition(ha_rows rownum, Item_sum* item) {}
> - void pre_next_row(Item_sum* item)
> + void next_partition(ha_rows rownum) {}
> + void pre_next_row()
> {
> /* Temp table's current row is current_row. Add it to the window func
> */
> - item->add();
> + add_value_to_items();
> }
> - void next_row(Item_sum* item) {};
> + void next_row() {};
> };
>
>
> @@ -1443,20 +1524,23 @@ class Frame_n_rows_following : public Frame_cursor
> Partition_read_cursor cursor;
> bool at_partition_end;
> public:
> - Frame_n_rows_following(bool is_top_bound_arg, ha_rows n_rows_arg) :
> - is_top_bound(is_top_bound_arg), n_rows(n_rows_arg)
> + Frame_n_rows_following(THD *thd,
> + SQL_I_List<ORDER> *partition_list,
> + SQL_I_List<ORDER> *order_list,
> + bool is_top_bound_arg, ha_rows n_rows_arg) :
> + is_top_bound(is_top_bound_arg), n_rows(n_rows_arg),
> + cursor(thd, partition_list)
> {
> DBUG_ASSERT(n_rows > 0);
> }
>
> - void init(THD *thd, READ_RECORD *info, SQL_I_List<ORDER>
> *partition_list,
> - SQL_I_List<ORDER> *order_list)
> + void init(READ_RECORD *info)
> {
> - cursor.init(thd, info, partition_list);
> + cursor.init(info);
> at_partition_end= false;
> }
>
> - void pre_next_partition(ha_rows rownum, Item_sum* item)
> + void pre_next_partition(ha_rows rownum)
> {
> at_partition_end= false;
>
> @@ -1469,39 +1553,39 @@ class Frame_n_rows_following : public Frame_cursor
>
> // Current row points at the first row in the partition
> if (is_top_bound) // this is frame top endpoint
> - item->remove();
> + remove_value_from_items();
> else
> - item->add();
> + add_value_to_items();
> }
> }
>
> /* Move our cursor to be n_rows ahead. */
> - void next_partition(ha_rows rownum, Item_sum* item)
> + void next_partition(ha_rows rownum)
> {
> ha_rows i_end= n_rows + ((rownum==0)?1:0)- is_top_bound;
> for (ha_rows i= 0; i < i_end; i++)
> {
> - if (next_row_intern(item))
> + if (next_row_intern())
> break;
> }
> }
>
> - void next_row(Item_sum* item)
> + void next_row()
> {
> if (at_partition_end)
> return;
> - next_row_intern(item);
> + next_row_intern();
> }
>
> private:
> - bool next_row_intern(Item_sum *item)
> + bool next_row_intern()
> {
> if (!cursor.get_next())
> {
> if (is_top_bound) // this is frame start endpoint
> - item->remove();
> + remove_value_from_items();
> else
> - item->add();
> + add_value_to_items();
> }
> else
> at_partition_end= true;
> @@ -1513,8 +1597,9 @@ class Frame_n_rows_following : public Frame_cursor
> /*
> Get a Frame_cursor for a frame bound. This is a "factory function".
> */
> -Frame_cursor *get_frame_cursor(Window_frame *frame, bool is_top_bound)
> +Frame_cursor *get_frame_cursor(THD *thd, Window_spec *spec, bool
> is_top_bound)
> {
> + Window_frame *frame= spec->window_frame;
> if (!frame)
> {
> /*
> @@ -1536,9 +1621,13 @@ Frame_cursor *get_frame_cursor(Window_frame *frame,
> bool is_top_bound)
> so again the same frame bounds can be used.
> */
> if (is_top_bound)
> - return new Frame_unbounded_preceding;
> + return new Frame_unbounded_preceding(thd,
> + spec->partition_list,
> + spec->order_list);
> else
> - return new Frame_range_current_row_bottom;
> + return new Frame_range_current_row_bottom(thd,
> + spec->partition_list,
> + spec->order_list);
> }
>
> Window_frame_bound *bound= is_top_bound? frame->top_bound :
> @@ -1554,9 +1643,13 @@ Frame_cursor *get_frame_cursor(Window_frame *frame,
> bool is_top_bound)
> {
> /* The following serve both RANGE and ROWS: */
> if (is_preceding)
> - return new Frame_unbounded_preceding;
> - else
> - return new Frame_unbounded_following;
> + return new Frame_unbounded_preceding(thd,
> + spec->partition_list,
> + spec->order_list);
> +
> + return new Frame_unbounded_following(thd,
> + spec->partition_list,
> + spec->order_list);
> }
>
> if (frame->units == Window_frame::UNITS_ROWS)
> @@ -1567,15 +1660,21 @@ Frame_cursor *get_frame_cursor(Window_frame
> *frame, bool is_top_bound)
> DBUG_ASSERT((longlong) n_rows >= 0);
> if (is_preceding)
> return new Frame_n_rows_preceding(is_top_bound, n_rows);
> - else
> - return new Frame_n_rows_following(is_top_bound, n_rows);
> +
> + return new Frame_n_rows_following(
> + thd, spec->partition_list, spec->order_list,
> + is_top_bound, n_rows);
> }
> else
> {
> if (is_top_bound)
> - return new Frame_range_n_top(is_preceding, bound->offset);
> - else
> - return new Frame_range_n_bottom(is_preceding, bound->offset);
> + return new Frame_range_n_top(
> + thd, spec->partition_list, spec->order_list,
> + is_preceding, bound->offset);
> +
> + return new Frame_range_n_bottom(thd,
> + spec->partition_list, spec->order_list,
> + is_preceding, bound->offset);
> }
> }
>
> @@ -1585,67 +1684,154 @@ Frame_cursor *get_frame_cursor(Window_frame
> *frame, bool is_top_bound)
> {
> if (is_top_bound)
> return new Frame_rows_current_row_top;
> - else
> - return new Frame_rows_current_row_bottom;
> +
> + return new Frame_rows_current_row_bottom;
> }
> else
> {
> if (is_top_bound)
> - return new Frame_range_current_row_top;
> - else
> - return new Frame_range_current_row_bottom;
> + return new Frame_range_current_row_top(
> + thd, spec->partition_list, spec->order_list);
> +
> + return new Frame_range_current_row_bottom(
> + thd, spec->partition_list, spec->order_list);
> }
> }
> return NULL;
> }
>
> -void add_extra_frame_cursors(List<Frame_cursor> *cursors,
> - const Item_sum *window_func)
> +void add_extra_frame_cursors(THD *thd, Cursor_manager *cursor_manager,
> + Item_window_func *window_func)
> {
> - switch (window_func->sum_func())
> + Window_spec *spec= window_func->window_spec;
> + Item_sum *item_sum= window_func->window_func();
> + Frame_cursor *fc;
> + switch (item_sum->sum_func())
> {
> case Item_sum::CUME_DIST_FUNC:
> - cursors->push_back(new Frame_unbounded_preceding);
> - cursors->push_back(new Frame_range_current_row_bottom);
> + fc= new Frame_unbounded_preceding(thd,
> + spec->partition_list,
> + spec->order_list);
> + fc->add_sum_func(item_sum);
> + cursor_manager->add_cursor(fc);
> + fc= new Frame_range_current_row_bottom(thd,
> + spec->partition_list,
> + spec->order_list);
> + fc->add_sum_func(item_sum);
> + cursor_manager->add_cursor(fc);
> break;
> default:
> - cursors->push_back(new Frame_unbounded_preceding);
> - cursors->push_back(new Frame_rows_current_row_bottom);
> + fc= new Frame_unbounded_preceding(
> + thd, spec->partition_list, spec->order_list);
> + fc->add_sum_func(item_sum);
> + cursor_manager->add_cursor(fc);
> +
> + fc= new Frame_rows_current_row_bottom;
> + fc->add_sum_func(item_sum);
> + cursor_manager->add_cursor(fc);
> }
> }
>
> -void get_window_func_required_cursors(
> - List<Frame_cursor> *result, const Item_window_func* item_win)
> +
> +/*
> + Create required frame cursors for the list of window functions.
> + Register all functions to their appropriate cursors.
> + If the window functions share the same frame specification,
> + those window functions will be registered to the same cursor.
> +*/
> +void get_window_functions_required_cursors(
> + THD *thd,
> + List<Item_window_func>& window_functions,
> + List<Cursor_manager> *cursor_managers)
> {
> - if (item_win->requires_partition_size())
> - result->push_back(new Frame_unbounded_following_set_count);
> + List_iterator_fast<Item_window_func> it(window_functions);
> + Item_window_func* item_win_func;
> + Item_sum *sum_func;
> + while ((item_win_func= it++))
> + {
> + Cursor_manager *cursor_manager = new Cursor_manager();
> + sum_func = item_win_func->window_func();
> + Frame_cursor *fc;
> + /*
> + Some window functions require the partition size for computing
> values.
> + Add a cursor that retrieves it as the first one in the list if
> necessary.
> + */
> + if (item_win_func->requires_partition_size())
> + {
> + fc= new Frame_unbounded_following_set_count(thd,
> + item_win_func->window_spec->partition_list,
> + item_win_func->window_spec->order_list);
> + fc->add_sum_func(sum_func);
> + cursor_manager->add_cursor(fc);
> + }
>
> - /*
> - If it is not a regular window function that follows frame
> specifications,
> - specific cursors are required.
> - */
> - if (item_win->is_frame_prohibited())
> + /*
> + If it is not a regular window function that follows frame
> specifications,
> + specific cursors are required. ROW_NUM, RANK, NTILE and others
> follow
> + such rules. See is_frame_prohibited() for the full list.
> + */
> + if (item_win_func->is_frame_prohibited())
> + {
> + add_extra_frame_cursors(thd, cursor_manager, item_win_func);
> + cursor_managers->push_back(cursor_manager);
> + continue;
> + }
> +
> + Frame_cursor *frame_bottom= get_frame_cursor(thd,
> + item_win_func->window_spec, false);
> + Frame_cursor *frame_top= get_frame_cursor(thd,
> + item_win_func->window_spec, true);
> +
> + frame_bottom->add_sum_func(sum_func);
> + frame_top->add_sum_func(sum_func);
> +
> + /*
> + The order of these cursors is important. A sum function
> + must first add values (via frame_bottom) then remove them via
> + frame_top. Removing items first doesn't make sense in the case of
> all
> + window functions.
> + */
> + cursor_manager->add_cursor(frame_bottom);
> + cursor_manager->add_cursor(frame_top);
> + cursor_managers->push_back(cursor_manager);
> + }
> +}
> +
> +/**
> + Helper function that takes a list of window functions and writes
> + their values in the current table record.
> +*/
> +static
> +bool save_window_function_values(List<Item_window_func>& window_functions,
> + TABLE *tbl, uchar *rowid_buf)
> +{
> + List_iterator_fast<Item_window_func> iter(window_functions);
> + tbl->file->ha_rnd_pos(tbl->record[0], rowid_buf);
> + store_record(tbl, record[1]);
> + while (Item_window_func *item_win= iter++)
> {
> - add_extra_frame_cursors(result, item_win->window_func());
> - return;
> + int err;
> + item_win->save_in_field(item_win->result_field, true);
> + // TODO check if this can be placed outside the loop.
> + err= tbl->file->ha_update_row(tbl->record[1], tbl->record[0]);
> + if (err && err != HA_ERR_RECORD_IS_THE_SAME)
> + return true;
> }
>
> - /* A regular window function follows the frame specification. */
> - result->push_back(get_frame_cursor(item_win->window_spec->window_frame,
> - false));
> - result->push_back(get_frame_cursor(item_win->window_spec->window_frame,
> - true));
> + return false;
> }
>
> /*
> + TODO(cvicentiu) update this comment to reflect the new execution.
> +
> Streamed window function computation with window frames.
>
> We make a single pass over the ordered temp.table, but we're using three
> - cursors:
> + cursors:
> - current row - the row that we're computing window func value for)
> - start_bound - the start of the frame
> - bottom_bound - the end of the frame
> -
> +
> All three cursors move together.
>
> @todo
> @@ -1655,7 +1841,7 @@ void get_window_func_required_cursors(
> @detail
> ROWS BETWEEN 3 PRECEDING -- frame start
> AND 3 FOLLOWING -- frame end
> -
> +
> /------ frame end (aka BOTTOM)
> Dataset start |
> --------====*=======[*]========*========-------->> dataset end
> @@ -1663,7 +1849,7 @@ void get_window_func_required_cursors(
> | +-------- current row
> |
> \-------- frame start ("TOP")
> -
> +
> - frame_end moves forward and adds rows into the aggregate function.
> - frame_start follows behind and removes rows from the aggregate
> function.
> - current_row is the row where the value of aggregate function is
> stored.
> @@ -1672,97 +1858,90 @@ void get_window_func_required_cursors(
> condition (Others can catch up by counting rows?)
>
> */
> -
> -bool compute_window_func_with_frames(Item_window_func *item_win,
> - TABLE *tbl, READ_RECORD *info)
> +bool compute_window_func(THD *thd,
> + List<Item_window_func>& window_functions,
> + List<Cursor_manager>& cursor_managers,
> + TABLE *tbl,
> + SORT_INFO *filesort_result)
> {
> - THD *thd= tbl->in_use;
> - int err= 0;
> + List_iterator_fast<Item_window_func> iter_win_funcs(window_functions);
> + List_iterator_fast<Cursor_manager>
> iter_cursor_managers(cursor_managers);
> + uint err;
>
> - Item_sum *sum_func= item_win->window_func();
> - /* This algorithm doesn't support DISTINCT aggregator */
> - sum_func->set_aggregator(Aggregator::SIMPLE_AGGREGATOR);
> + READ_RECORD info;
>
> - List<Frame_cursor> cursors;
> - get_window_func_required_cursors(&cursors, item_win);
> + if (init_read_record(&info, current_thd, tbl, NULL/*select*/,
> filesort_result,
> + 0, 1, FALSE))
> + return true;
>
> - List_iterator_fast<Frame_cursor> it(cursors);
> - Frame_cursor *c;
> - while((c= it++))
> + Cursor_manager *cursor_manager;
> + while ((cursor_manager= iter_cursor_managers++))
> + cursor_manager->initialize_cursors(&info);
> +
> + /* One partition tracker for each window function. */
> + List<Group_bound_tracker> partition_trackers;
> + Item_window_func *win_func;
> + while ((win_func= iter_win_funcs++))
> {
> - c->init(thd, info, item_win->window_spec->partition_list,
> - item_win->window_spec->order_list);
> + Group_bound_tracker *tracker= new Group_bound_tracker(thd,
> +
> win_func->window_spec->partition_list);
> + // TODO(cvicentiu) This should be removed and placed in constructor.
> + tracker->init();
> + partition_trackers.push_back(tracker);
> }
>
> - bool is_error= false;
> + List_iterator_fast<Group_bound_tracker>
> iter_part_trackers(partition_trackers);
> ha_rows rownum= 0;
> uchar *rowid_buf= (uchar*) my_malloc(tbl->file->ref_length, MYF(0));
>
> while (true)
> {
> - /* Move the current_row */
> - if ((err=info->read_record(info)))
> - {
> - break; /* End of file */
> - }
> - bool partition_changed= item_win->check_if_partition_changed();
> + if ((err= info.read_record(&info)))
> + break; // End of file.
>
> + /* Remember current row so that we can restore it before computing
> + each window function. */
> tbl->file->position(tbl->record[0]);
> memcpy(rowid_buf, tbl->file->ref, tbl->file->ref_length);
>
> - if (partition_changed || (rownum == 0))
> - {
> - sum_func->clear();
> - /*
> - pre_XXX functions assume that tbl->record[0] contains
> current_row, and
> - they may not change it.
> - */
> - it.rewind();
> - while ((c= it++))
> - c->pre_next_partition(rownum, sum_func);
> - /*
> - We move bottom_bound first, because we want rows to be added into
> the
> - aggregate before top_bound attempts to remove them.
> - */
> - it.rewind();
> - while ((c= it++))
> - c->next_partition(rownum, sum_func);
> - }
> - else
> - {
> - /* Again, both pre_XXX function can find current_row in
> tbl->record[0] */
> - it.rewind();
> - while ((c= it++))
> - c->pre_next_row(sum_func);
> -
> - /* These make no assumptions about tbl->record[0] and may change it
> */
> - it.rewind();
> - while ((c= it++))
> - c->next_row(sum_func);
> - }
> - rownum++;
> + iter_win_funcs.rewind();
> + iter_part_trackers.rewind();
> + iter_cursor_managers.rewind();
>
> - /*
> - Frame cursors may have made tbl->record[0] to point to some record
> other
> - than current_row. This applies to tbl->file's internal state, too.
> - Fix this by reading the current row again.
> - */
> - tbl->file->ha_rnd_pos(tbl->record[0], rowid_buf);
> - store_record(tbl,record[1]);
> - item_win->save_in_field(item_win->result_field, true);
> - err= tbl->file->ha_update_row(tbl->record[1], tbl->record[0]);
> - if (err && err != HA_ERR_RECORD_IS_THE_SAME)
> + Group_bound_tracker *tracker;
> + while ((win_func= iter_win_funcs++) &&
> + (tracker= iter_part_trackers++) &&
> + (cursor_manager= iter_cursor_managers++))
> {
> - is_error= true;
> - break;
> + if (tracker->check_if_next_group() || (rownum == 0))
> + {
> + /* TODO(cvicentiu)
> + Clearing window functions should happen through cursors. */
> + win_func->window_func()->clear();
> + cursor_manager->notify_cursors_partition_changed(rownum);
> + }
> + else
> + {
> + cursor_manager->notify_cursors_next_row();
> + }
> + /* Return to current row after notifying cursors for each window
> + function. */
> + tbl->file->ha_rnd_pos(tbl->record[0], rowid_buf);
> }
> +
> + /* We now have computed values for each window function. They can now
> + be saved in the current row. */
> + save_window_function_values(window_functions, tbl, rowid_buf);
> +
> + rownum++;
> }
>
> my_free(rowid_buf);
> - cursors.delete_elements();
> - return is_error? true: false;
> -}
> + partition_trackers.delete_elements();
> + end_read_record(&info);
>
> + return false;
> +}
>
> /* Make a list that is a concatenation of two lists of ORDER elements */
>
> @@ -1799,25 +1978,18 @@ static ORDER* concat_order_lists(MEM_ROOT
> *mem_root, ORDER *list1, ORDER *list2)
> return res;
> }
>
> -
> -bool Window_func_runner::setup(THD *thd)
> +bool Window_func_runner::add_function_to_run(Item_window_func *win_func)
> {
> - win_func->setup_partition_border_check(thd);
> +
> + Item_sum *sum_func= win_func->window_func();
> + sum_func->setup_window_func(current_thd, win_func->window_spec);
>
> Item_sum::Sumfunctype type= win_func->window_func()->sum_func();
> - switch (type)
> + switch (type)
> {
> case Item_sum::ROW_NUMBER_FUNC:
> case Item_sum::RANK_FUNC:
> case Item_sum::DENSE_RANK_FUNC:
> - {
> - /*
> - One-pass window function computation, walk through the rows and
> - assign values.
> - */
> - compute_func= compute_window_func_values;
> - break;
> - }
> case Item_sum::COUNT_FUNC:
> case Item_sum::SUM_BIT_FUNC:
> case Item_sum::SUM_FUNC:
> @@ -1825,43 +1997,49 @@ bool Window_func_runner::setup(THD *thd)
> case Item_sum::PERCENT_RANK_FUNC:
> case Item_sum::CUME_DIST_FUNC:
> case Item_sum::NTILE_FUNC:
> - {
> - /*
> - Frame-aware window function computation. It does one pass, but
> - uses three cursors -frame_start, current_row, and frame_end.
> - */
> - compute_func= compute_window_func_with_frames;
> break;
> - }
> +
> default:
> - my_error(ER_NOT_SUPPORTED_YET, MYF(0), "This aggregate as window
> function");
> + my_error(ER_NOT_SUPPORTED_YET, MYF(0),
> + "This aggregate as window function");
> return true;
> }
>
> - return false;
> + return window_functions.push_back(win_func);
> }
>
>
> /*
> Compute the value of window function for all rows.
> */
> -bool Window_func_runner::exec(TABLE *tbl, SORT_INFO *filesort_result)
> +bool Window_func_runner::exec(THD *thd, TABLE *tbl, SORT_INFO
> *filesort_result)
> {
> - THD *thd= current_thd;
> - win_func->set_phase_to_computation();
> -
> - /* Go through the sorted array and compute the window function */
> - READ_RECORD info;
> -
> - if (init_read_record(&info, thd, tbl, NULL/*select*/, filesort_result,
> - 0, 1, FALSE))
> - return true;
> + List_iterator_fast<Item_window_func> it(window_functions);
> + Item_window_func *win_func;
> + while ((win_func= it++))
> + {
> + win_func->set_phase_to_computation();
> + // TODO(cvicentiu) Setting the aggregator should probably be done
> during
> + // setup of Window_funcs_sort.
> +
> win_func->window_func()->set_aggregator(Aggregator::SIMPLE_AGGREGATOR);
> + }
> + it.rewind();
>
> - bool is_error= compute_func(win_func, tbl, &info);
> + List<Cursor_manager> cursor_managers;
> + get_window_functions_required_cursors(thd, window_functions,
> + &cursor_managers);
>
> - win_func->set_phase_to_retrieval();
> + /* Go through the sorted array and compute the window function */
> + bool is_error= compute_window_func(thd,
> + window_functions,
> + cursor_managers,
> + tbl, filesort_result);
> + while ((win_func= it++))
> + {
> + win_func->set_phase_to_retrieval();
> + }
>
> - end_read_record(&info);
> + cursor_managers.delete_elements();
>
> return is_error;
> }
> @@ -1872,21 +2050,15 @@ bool Window_funcs_sort::exec(JOIN *join)
> THD *thd= join->thd;
> JOIN_TAB *join_tab= &join->join_tab[join->top_join_tab_count];
>
> + /* Sort the table based on the most specific sorting criteria of
> + the window functions. */
> if (create_sort_index(thd, join, join_tab, filesort))
> return true;
>
> TABLE *tbl= join_tab->table;
> SORT_INFO *filesort_result= join_tab->filesort_result;
>
> - bool is_error= false;
> - List_iterator<Window_func_runner> it(runners);
> - Window_func_runner *runner;
> -
> - while ((runner= it++))
> - {
> - if ((is_error= runner->exec(tbl, filesort_result)))
> - break;
> - }
> + bool is_error= runner.exec(thd, tbl, filesort_result);
>
> delete join_tab->filesort_result;
> join_tab->filesort_result= NULL;
> @@ -1894,30 +2066,32 @@ bool Window_funcs_sort::exec(JOIN *join)
> }
>
>
> -bool Window_funcs_sort::setup(THD *thd, SQL_SELECT *sel,
> - List_iterator<Item_window_func> &it)
> +bool Window_funcs_sort::setup(THD *thd, SQL_SELECT *sel,
> + List_iterator<Item_window_func> &it)
> {
> Item_window_func *win_func= it.peek();
> Item_window_func *prev_win_func;
>
> + /* The iterator should point to a valid function at the start of
> execution. */
> + DBUG_ASSERT(win_func);
> do
> {
> - Window_func_runner *runner;
> - if (!(runner= new Window_func_runner(win_func)) ||
> - runner->setup(thd))
> - {
> + if (runner.add_function_to_run(win_func))
> return true;
> - }
> - runners.push_back(runner);
> it++;
> prev_win_func= win_func;
> - } while ((win_func= it.peek()) && !(win_func->marker &
> SORTORDER_CHANGE_FLAG));
> -
> + } while ((win_func= it.peek()) &&
> + !(win_func->marker & SORTORDER_CHANGE_FLAG));
> +
> /*
> The sort criteria must be taken from the last win_func in the group of
> - adjacent win_funcs that do not have SORTORDER_CHANGE_FLAG.
> + adjacent win_funcs that do not have SORTORDER_CHANGE_FLAG. This is
> + because the sort order must be the most specific sorting criteria
> defined
> + within the window function group. This ensures that we sort the table
> + in a way that the result is valid for all window functions belonging
> to
> + this Window_funcs_sort.
> */
> - Window_spec *spec = prev_win_func->window_spec;
> + Window_spec *spec= prev_win_func->window_spec;
>
> ORDER* sort_order= concat_order_lists(thd->mem_root,
> spec->partition_list->first,
> @@ -1932,8 +2106,8 @@ bool Window_funcs_sort::setup(THD *thd, SQL_SELECT
> *sel,
>
>
> bool Window_funcs_computation::setup(THD *thd,
> - List<Item_window_func>
> *window_funcs,
> - JOIN_TAB *tab)
> + List<Item_window_func> *window_funcs,
> + JOIN_TAB *tab)
> {
> order_window_funcs_by_window_specs(window_funcs);
>
> diff --git a/sql/sql_window.h b/sql/sql_window.h
> index 54e39d8..c384724 100644
> --- a/sql/sql_window.h
> +++ b/sql/sql_window.h
> @@ -154,9 +154,7 @@ int setup_windows(THD *thd, Ref_ptr_array
> ref_pointer_array, TABLE_LIST *tables,
> // Classes that make window functions computation a part of SELECT's
> query plan
>
> //////////////////////////////////////////////////////////////////////////////
>
> -typedef bool (*window_compute_func_t)(Item_window_func *item_win,
> - TABLE *tbl, READ_RECORD *info);
> -
> +class Frame_cursor;
> /*
> This handles computation of one window function.
>
> @@ -165,21 +163,17 @@ typedef bool
> (*window_compute_func_t)(Item_window_func *item_win,
>
> class Window_func_runner : public Sql_alloc
> {
> - Item_window_func *win_func;
> -
> - /* The function to use for computation*/
> - window_compute_func_t compute_func;
> -
> public:
> - Window_func_runner(Item_window_func *win_func_arg) :
> - win_func(win_func_arg)
> - {}
> + /* Add the function to be computed during the execution pass */
> + bool add_function_to_run(Item_window_func *win_func);
>
> - // Set things up. Create filesort structures, etc
> - bool setup(THD *thd);
> -
> - // This sorts and runs the window function.
> - bool exec(TABLE *tbl, SORT_INFO *filesort_result);
> + /* Compute and fill the fields in the table. */
> + bool exec(THD *thd, TABLE *tbl, SORT_INFO *filesort_result);
> +
> +private:
> + /* A list of window functions for which this Window_func_runner will
> compute
> + values during the execution phase. */
> + List<Item_window_func> window_functions;
> };
>
>
> @@ -191,21 +185,24 @@ class Window_func_runner : public Sql_alloc
>
> class Window_funcs_sort : public Sql_alloc
> {
> - List<Window_func_runner> runners;
> -
> - /* Window functions can be computed over this sorting */
> - Filesort *filesort;
> public:
> bool setup(THD *thd, SQL_SELECT *sel, List_iterator<Item_window_func>
> &it);
> bool exec(JOIN *join);
> void cleanup() { delete filesort; }
>
> friend class Window_funcs_computation;
> +
> +private:
> + Window_func_runner runner;
> +
> + /* Window functions can be computed over this sorting */
> + Filesort *filesort;
> };
>
>
> struct st_join_table;
> class Explain_aggr_window_funcs;
> +
> /*
> This is a "window function computation phase": a single object of this
> class
> takes care of computing all window functions in a SELECT.
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.askmonty.org/pipermail/commits/attachments/20160906/ad8ff0f1/attachment-0001.html>
More information about the commits
mailing list