Hi Varun,

Here's the review for your patch.

> diff --git a/mysql-test/t/percentile.test b/mysql-test/t/percentile.test
> new file mode 100644
> index 00000000000..0958fc05e7d
> --- /dev/null
> +++ b/mysql-test/t/percentile.test
> @@ -0,0 +1,41 @@
Whenever you are adding a test case, please add the result file too.
It makes things easier to review.
> +CREATE TABLE student (name CHAR(10), test double, score TINYINT); 
> +
> +INSERT INTO student VALUES 
> +('Chun', 0, null), ('Chun', 0, 4), 
> +('Esben', 1, null), ('Esben', 1, null), 
> +('Kaolin', 0.5, 56), ('Kaolin', 0.5, 88), 
> +('Tatiana', 0.8, 2), ('Tatiana', 0.8, 1);
> +
> +
> +
> +select name, percentile_disc(0.6)  within group(order by score) over (partition by name) from student;
> +select name, percentile_disc(test)  within group(order by score) over (partition by name) from student;
> +select name, percentile_disc(0.4)  within group(order by score) over (partition by name) from student;
> +
> +
> +#select name, percentile_cont(null)  within group(order by score) over (partition by name) from student;
> +#select name, cume_dist()  over (partition by name order by score) from student;
> +
> +
> +#normal parsing
> +#select percentile_cont(0.5) within group(order by score)  over w1  from student
> +#window w1 AS (partition by name);
> +
> +# no partition clause
> +#select percentile_cont(0.5) within group(order by score)  over ()  from student;
> +
> +
> +# only one sort allowed
> +#select percentile_cont(0.5) within group(order by score) over (partition by name);
> +
> +#parameter value should be in the range of 0 to 1
> +#select percentile_cont(1.5) within group(order by score) over (partition by name);
> +
> +
> +#
> +#select rank() over (partition by name order by score  ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) from student;
> +
> +
> +
> +drop table student;
> +
> diff --git a/sql/item.h b/sql/item.h
> index 76ce4aa935f..3df4ec4f358 100644
> --- a/sql/item.h
> +++ b/sql/item.h
> @@ -5215,6 +5217,14 @@ class Cached_item_item : public Cached_item
>      cmp();
>      item= save;
>    }
This is not used in the patch. Is there a need for it any more?
> +  Item* get_item()
> +  {
> +    return item;
> +  }
Add an empty line here.
> +  void clear()
> +  {
> +    null_value= false;
> +  }
>  };
>  
>  class Cached_item_str :public Cached_item_item
> @@ -5236,6 +5250,12 @@ class Cached_item_real :public Cached_item_item
>    Cached_item_real(Item *item_par) :Cached_item_item(item_par),value(0.0) {}
>    bool cmp(void);
>    int  cmp_read_only();
Space after ). Mark this function as const.
> +  double get_value(){ return value;}
I don't quite get what this clear is supposed to do. I'd expect clear to set
the null_value to true, not to false. Same for other functions in the diff.
Add a comment explaining this.
> +  void clear()
> +  {
Space after =
> +    value=0.0;
> +    null_value= false;
> +  }
>  };
>  
>  class Cached_item_int :public Cached_item_item
> @@ -5245,6 +5265,12 @@ class Cached_item_int :public Cached_item_item
>    Cached_item_int(Item *item_par) :Cached_item_item(item_par),value(0) {}
>    bool cmp(void);
>    int  cmp_read_only();
Space after ). Mark this function as const.
> +  longlong get_value(){ return value;}
> +  void clear()
> +  {
Space after =
> +    value=0.0;
> +    null_value= false;
> +  }
>  };
>  
>  
> @@ -5255,6 +5281,12 @@ class Cached_item_decimal :public Cached_item_item
>    Cached_item_decimal(Item *item_par);
>    bool cmp(void);
>    int  cmp_read_only();
Space after ). Mark this function as const.
> +  my_decimal *get_value(){ return &value;};
> +  void clear()
> +  {
> +    null_value= false;
> +    my_decimal_set_zero(&value);
> +  }
>  };
>  
>  class Cached_item_field :public Cached_item
> diff --git a/sql/item_windowfunc.cc b/sql/item_windowfunc.cc
> index 6ab903a81bb..b711642c52c 100644
> --- a/sql/item_windowfunc.cc
> +++ b/sql/item_windowfunc.cc
> @@ -93,6 +93,17 @@ Item_window_func::fix_fields(THD *thd, Item **ref)
>      my_error(ER_NO_ORDER_LIST_IN_WINDOW_SPEC, MYF(0), window_func()->func_name());
>      return true;
>    }
> +
> +  if (only_single_element_order_list())
> +  {
Mark such comments as TODO, like so:
TODO(varunt): <message>
> +    // need to change the error, the error should say that we have more than one element in the order list
> +    if (window_spec->order_list->elements != 1)
> +    {
> +      my_error(ER_NO_ORDER_LIST_IN_WINDOW_SPEC, MYF(0), window_func()->func_name());
> +      return true;
> +    }
> +  }
> +
>    /*
>      TODO: why the last parameter is 'ref' in this call? What if window_func
>      decides to substitute itself for something else and does *ref=.... ? 
> @@ -179,6 +190,16 @@ void Item_sum_dense_rank::setup_window_func(THD *thd, Window_spec *window_spec)
>    clear();
>  }
I don't see any added value for this extra setup_percentile_func, when it's only
called from here. I guess you can just put the code in directly.
> +void Item_sum_percentile_disc::setup_window_func(THD *thd, Window_spec *window_spec)
> +{
> +  setup_percentile_func(thd, window_spec->order_list);
> +}
> +
This function does nothing. get_handler_by_cmp_type returns a type handler and
then you don't do anything with it.
> +void Item_sum_percentile_disc::set_type_handler(Window_spec *window_spec)
> +{
> +  type_handler()->get_handler_by_cmp_type(window_spec->order_list->first->item[0]->result_type());
> +}
> +
>  bool Item_sum_dense_rank::add()
>  {
>    if (peer_tracker->check_if_next_group() || first_add)
> diff --git a/sql/item_windowfunc.h b/sql/item_windowfunc.h
> index 77cbd556e60..5179d1832e8 100644
> --- a/sql/item_windowfunc.h
> +++ b/sql/item_windowfunc.h
> @@ -16,10 +17,13 @@ class Group_bound_tracker
>  
>    Group_bound_tracker(THD *thd, SQL_I_List<ORDER> *list)
>    {
As discussed on slack, let's keep this Group_bound_tracker as is.
Better yet, add a DBUG_ASSERT(list).
> -    for (ORDER *curr = list->first; curr; curr=curr->next) 
> +    if (list)
>      {
> -      Cached_item *tmp= new_Cached_item(thd, curr->item[0], TRUE);
> -      group_fields.push_back(tmp);
> +      for (ORDER *curr = list->first; curr; curr=curr->next)
> +      {
> +        Cached_item *tmp= new_Cached_item(thd, curr->item[0], TRUE);
> +        group_fields.push_back(tmp);
> +      }
>      }
>    }
>  
> @@ -71,6 +75,19 @@ class Group_bound_tracker
>      return 0;
>    }
>  
Add a comment as to what this function does. I don't quite like how this
function works. cmp has a side effect of overwriting the cached item with the
compared value. In addition this function stops if we encounter a null value
and the group fields are now partially updated.

I assume this is only for a single-element order list. Why not make something
specialised for that and not use this cached_item construct?
> +  bool compare_with_cache_for_null_values()
> +  {
> +    List_iterator<Cached_item> li(group_fields);
> +    Cached_item *ptr;
> +    while ((ptr= li++))
> +    {
> +      ptr->cmp();
> +      if (ptr->null_value)
> +        return true;
> +    }
> +    return false;
> +  }
> +
>  private:
>    List<Cached_item> group_fields;
>    /*
> @@ -678,6 +704,138 @@ class Item_sum_ntile : public Item_sum_window_with_row_count
>    ulong current_row_count_;
>  };
>  
> +class Item_sum_percentile_disc : public Item_sum_cume_dist,
> +                                 public Type_handler_hybrid_field_type
> +{
> +public:
> +  Item_sum_percentile_disc(THD *thd, Item* arg) : Item_sum_cume_dist(thd, arg),
> +                           Type_handler_hybrid_field_type(&type_handler_longlong),
> +                           value(NULL), result_value(NULL), val_calculated(FALSE) {}
> +
> +  double val_real()
> +  {
> +    if (get_row_count() == 0 || get_arg(0)->is_null())
> +    {
> +      null_value= true;
> +      return 0;
> +    }
> +    null_value= false;
Look at Item_sum_hybrid_simple. See how Item_cache * works there. Make use of
that instead of Cached_item_xxx.
> +    return ((Cached_item_real*) result_value)->get_value();
> +  }
> +
> +  longlong val_int()
> +  {
> +    if (get_row_count() == 0 || get_arg(0)->is_null())
> +    {
> +      null_value= true;
> +      return 0;
> +    }
> +    null_value= false;
Same comment as for val_real()
> +    return ((Cached_item_int*) result_value)->get_value();
> +  }
> +
> +  my_decimal* val_decimal(my_decimal* dec)
> +  {
> +    if (get_row_count() == 0 || get_arg(0)->is_null())
> +    {
> +      null_value= true;
> +      return 0;
> +    }
> +    null_value= false;
Same comment as for val_real()
> +    return ((Cached_item_decimal*) result_value)->get_value();
> +  }
> +
Let's try implementing this with Item_cache, instead of cached_item. Cached_item
is designed for comparison, while Item_cache is used for storing something just
to cache it.
> +  bool add()
> +  {
> +    Item *arg = get_arg(0);
> +    if (arg->is_null())
> +      return true;
> +    /*
> +      need to ensure that the Item arg is constant across the entire partition
> +      and its value ranges between [0,1]
> +    */
> +    value->cmp();
> +
> +    /* for the null values of the row, we dont count take those rows in account for calculating
> +    the CUME_DIST */
> +
> +    if(value->null_value)
> +      return false;
> +
> +    Item_sum_cume_dist::add();
> +    double val1= Item_sum_cume_dist::val_real();
> +    /* need to check type and return value accordingly*/
> +    double val2 =arg->val_real_from_decimal();
> +
> +    /* use Cached_item to do the comparision using cmp_read_only() */
> +
> +    if( val1 >= val2 && !val_calculated)
> +    {
> +      val_calculated= true;
> +      result_value->cmp();
> +      return false;
> +    }
> +    return false;
> +  }
> +
> +  enum Sumfunctype sum_func() const
> +  {
> +    return PERCENTILE_DISC_FUNC;
> +  }
> +
> +  void clear()
> +  {
> +    val_calculated= false;
> +    value->clear();
> +    result_value->clear();
> +    Item_sum_cume_dist::clear();
> +  }
> +
> +  const char*func_name() const
> +  {
> +    return "percentile_disc";
> +  }
> +
> +  void update_field() {}
> +  void set_type_handler(Window_spec *window_spec);
> +  const Type_handler *type_handler() const
> +  {return Type_handler_hybrid_field_type::type_handler();}
> +
> +  void fix_length_and_dec()
> +  {
> +    decimals = 10;  // TODO-cvicentiu find out how many decimals the standard
> +                    // requires.
> +  }
> +
> +  Item *get_copy(THD *thd, MEM_ROOT *mem_root)
> +  { return get_item_copy<Item_sum_percentile_disc>(thd, mem_root, this); }
> +  void setup_window_func(THD *thd, Window_spec *window_spec);
> +  void setup_percentile_func(THD *thd, SQL_I_List<ORDER> *list)
> +  {
> +    value= new_Cached_item(thd, list->first->item[0], FALSE);
> +    result_value= new_Cached_item(thd, list->first->item[0], FALSE);
> +  }
> +  void cleanup()
> +  {
> +    if (value)
> +    {
> +      delete value;
> +      value= NULL;
> +    }
> +    if(result_value)
> +    {
> +      delete result_value;
> +      result_value= NULL;
> +    }
> +    Item_sum_num::cleanup();
> +  }
> +
> +private:
> +  Cached_item *value;
> +  Cached_item *result_value;
> +  bool val_calculated;
> +};
> +
>  
>  class Item_window_func : public Item_func_or_sum
>  {
> diff --git a/sql/sql_window.cc b/sql/sql_window.cc
> index 74e1ad25183..9fda4520a05 100644
> --- a/sql/sql_window.cc
> +++ b/sql/sql_window.cc
> @@ -868,9 +873,39 @@ class Partition_read_cursor : public Table_read_cursor
>      }
>      return 0;
>    }
This is not something we want in Partition_read_cursor. Move the contents
of it to the most derived class that needs it.
> +  bool next_func(ha_rows *counter)
> +  {
> +    if (next())
> +      return true;
> +    if (!check_for_null_row())
> +    {
> +      (*counter)++;
> +    }
> +    return false;
> +  }
This is also not something we want in Partition_read_cursor.
> +  bool fetch_func(ha_rows *counter)
> +  {
> +    if (fetch())
> +      return true;
> +    if (!check_for_null_row())
> +    {
> +      (*counter)++;
> +    }
> +    return false;
> +  }
Null row here has no meaning. Partition_read_cursor looks at the bound_tracker
for partitions. What "row" are we talking about here? Then again,
this logic can probably go into our "special" frame cursor below.
> +  bool check_for_null_row()
> +  {
> +    if (!end_of_partition)
> +    {
Check compare_with_cache_for_null_values comment.
> +      if (order_tracker.compare_with_cache_for_null_values())
> +        return true;
> +    }
> +    return false;
> +  }
>  
>  private:
>    Group_bound_tracker bound_tracker;
Not something we want in Partition_read_cursor. If you need an order_tracker
define a different class.
> +  Group_bound_tracker order_tracker;
>    bool end_of_partition;
>  };
>  
> @@ -1102,7 +1137,7 @@ class Frame_range_n_top : public Frame_cursor
>                      SQL_I_List<ORDER> *partition_list,
>                      SQL_I_List<ORDER> *order_list,
>                      bool is_preceding_arg, Item *n_val_arg) :
So this gets reverted as discussed on slack.
> -    cursor(thd, partition_list), n_val(n_val_arg), item_add(NULL),
> +    cursor(thd, partition_list, NULL), n_val(n_val_arg), item_add(NULL),
>      is_preceding(is_preceding_arg)
>    {
>      DBUG_ASSERT(order_list->elements == 1);
> @@ -1241,7 +1276,7 @@ class Frame_range_n_bottom: public Frame_cursor
>                         SQL_I_List<ORDER> *partition_list,
>                         SQL_I_List<ORDER> *order_list,
>                         bool is_preceding_arg, Item *n_val_arg) :
So this gets reverted as discussed on slack.
> -    cursor(thd, partition_list), n_val(n_val_arg), item_add(NULL),
> +    cursor(thd, partition_list, NULL), n_val(n_val_arg), item_add(NULL),
>      is_preceding(is_preceding_arg), added_values(false)
>    {
>      DBUG_ASSERT(order_list->elements == 1);
> @@ -1371,7 +1406,7 @@ class Frame_range_current_row_bottom: public Frame_cursor
>    Frame_range_current_row_bottom(THD *thd,
>                                   SQL_I_List<ORDER> *partition_list,
>                                   SQL_I_List<ORDER> *order_list) :
So this gets reverted as discussed on slack.
> -    cursor(thd, partition_list), peer_tracker(thd, order_list)
> +    cursor(thd, partition_list, NULL), peer_tracker(thd, order_list)
>    {
>    }
>  
> @@ -1658,6 +1693,35 @@ class Frame_unbounded_following_set_count : public Frame_unbounded_following
>    }
>  };
>  
As discussed on slack, this needs to be renamed to something that makes sense.
> +class Frame_unbounded_following_set_count_special : public Frame_unbounded_following_set_count
> +{
> +public:
> +  Frame_unbounded_following_set_count_special(
> +      THD *thd,
> +      SQL_I_List<ORDER> *partition_list, SQL_I_List<ORDER> *order_list) :
> +    Frame_unbounded_following_set_count(thd, partition_list, order_list)
> +    {}
> +
next_partition can have the logic of fetch_func directly in it.
> +  void next_partition(ha_rows rownum)
> +  {
> +    ha_rows num_rows_in_partition= 0;
> +    if (cursor.fetch_func(&num_rows_in_partition))
> +      return;
> +
Dangerous to have a while loop like this. Let's put next_func code here directly.
> +    /* Walk to the end of the partition, find how many rows there are. */
> +    while (!cursor.next_func(&num_rows_in_partition));
> +
> +    List_iterator_fast<Item_sum> it(sum_functions);
> +    Item_sum* item;
> +    while ((item= it++))
> +    {
> +      Item_sum_window_with_row_count* item_with_row_count =
> +        static_cast<Item_sum_window_with_row_count *>(item);
> +      item_with_row_count->set_row_count(num_rows_in_partition);
> +    }
> +  }
> +};
> +
>  /////////////////////////////////////////////////////////////////////////////
>  // ROWS-type frame bounds
>  /////////////////////////////////////////////////////////////////////////////
> diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy
> index a0bbf39b138..6c64477f08d 100644
> --- a/sql/sql_yacc.yy
> +++ b/sql/sql_yacc.yy
> @@ -10671,6 +10676,45 @@ simple_window_func:
>            }
>          ;
>  
> +inverse_distribution_function:
> +        inverse_distribution_function_def WITHIN GROUP_SYM
> +        '('
> +         { Select->prepare_add_window_spec(thd); }
> +         order_by_single_element_list ')' OVER_SYM
> +        '(' opt_window_ref opt_window_partition_clause ')'
> +         {
> +           LEX *lex= Lex;
> +           if (Select->add_window_spec(thd, lex->win_ref,
> +                                      Select->group_list,
> +                                      Select->order_list,
> +                                      NULL))
> +             MYSQL_YYABORT;
> +           $$= new (thd->mem_root) Item_window_func(thd, (Item_sum *) $1,
> +                                                    thd->lex->win_spec);
> +           if ($$ == NULL)
> +             MYSQL_YYABORT;
> +           if (Select->add_window_func((Item_window_func *) $$))
> +             MYSQL_YYABORT;
> +         }
> +        ;
> +
> +inverse_distribution_function_def:
> +         PERCENTILE_CONT_SYM '(' expr ')'
Indentation for { and } is not correct here. Please check how the rest of this file indents them and do the same.
> +       {
mark it as TODO(varun):
> +         //Not yet started implementing
> +       }
> +       | PERCENTILE_DISC_SYM '(' expr ')'
Same, check indentation for { and }
> +       {
> +          $$= new (thd->mem_root) Item_sum_percentile_disc(thd, $3);
> +          if ($$ == NULL)
> +            MYSQL_YYABORT;
> +       }
> +      ;
> +
Indentation check!
> +order_by_single_element_list:
> +    ORDER_SYM BY order_list
> +    ;
> +
>  window_name:
>            ident
>            {