The London Perl and Raku Workshop takes place on 26th Oct 2024. If your company depends on Perl, please consider sponsoring and/or attending.

=head1 NAME

 Stream::Aggregate - generate aggregate information from a stream of data

=head1 SYNOPSIS

 use Stream::Aggregate;

 my $af = generate_aggregation_func(
	$agg_config, 
	$extra_parameters, 
	$user_extra_parameters);

 while ($log = ???) {
	@stats = $af->($log);
 }
 @stats = $af->(undef);

=head1 DESCRIPTION

Stream::Aggregate is a general-purpose aggregation module that will aggregate from a 
stream of perl objects.  While it was written specifically for log processing, it can be used
for other things too.  

Aggregation has two key elements: how you group things and what you aggregate.  This module
understands two different ways to group things: nested and cross-product.

Nested groupings come from processing sorted input: if you have three fields you are 
considering your context, the order in which the data is sorted must match the order in
which these fields make up your context.
If you want to count things by URL, then you must sort your input by URL.

Cross-product groupings come from processing unsorted input.  Each combination of values
of the fields that make up your context is another context.  This can lead to memory 
exhaustion so you must specify the maximum number of values for each of the fields.

=head2 Nested groupings

Nested groups are most easily illustrated with a simple example: aggregating by year,
month, and day.  The input data must be sorted by year, month, and day.  The current context is
defined by the triplet: (year, month, day).  That triplet must be returned by the
C<context> code.  It is stored in the C<@current_context> array.  When a context is
finished, it must be converted into a hash by C<context2columns>.

Doing it this way, you can, for example, get the average of some data item per day, per month, and
per year in one pass though your data.

=head2 Cross-Product grouping

Cross Product grouping does not depend on the sort order of the input and can have
many contexts active at the same time.  

For example, if you're aggregating sales figures for shoes and want statistics for
the combinations of size, width, and color there isn't a sort or nesting order that will
answer your questions.

Use C<crossproduct> to limit yourself to a certain number of values for each variable
(say 10 sizes, 3 widths, and 5 colors).

=head2 API

The configuration for Stream::Aggregate is compiled into a perl function which is
then called once for each input object.  Each time it is called, it may produce one or more
aggregate objects as output.  When there is no more input data, call the aggregation
function with C<undef> as its input.

The generate-the-function routine, C<generate_aggregation_func> takes three parameters.  The
first is the configuration object (defined below).  The configuration object is 
expected (but not required) to come
from a YAML file.  
B<All examples are in L<YAML> format>.
The second and third arguments provide extra information.  Currently 
they are only used to get a description of what this aggregation is trying 
to do using the C<name> field.  Eg:

 generate_aggregation_func($agg_config, $extra, $user_extra);

 my $code = qq{#line 1 "FAKE-all-code-for-$extra->{name}"\n};

The configuration object for Stream::Aggregate is expected to be read from a YAML
file but it does not have to be created that way.

For some of the code fields (below), marked as B<Closure/Config>, you can
provide a closure instead of code.  To do that, have a C<BEGIN> block assign a value
(the closure) to the variable C<$coderef>.  If you do this, code outside the C<BEGIN> block
will only be compiled but will never be run.  When evalutating the BEGIN block, 
the variable C<$agg_config> will be set to the value of I<key_config> (assuming the field was I<key>).

The behavior of C<generate_aggregation_func> in array context may change in the future to
provide additional return values.

=head2 CONTEXT-RELATED CONFIGURATION PARAMETERS

As the aggregator runs over the input, it needs to know the boundries of the contexts 
so that it knows when to generate an aggregation result record.

For example, if you were aggretgating information about URL with nested groupings,
you need to sort your input by URL and you need to 
define a C<context> that returns the URL (in YAML format, with C<$log> as your input variable):

 context: |
   return ($log->{url})

If you want to aggregate over both the URL and the web host, the C<context> 
must return an array: host & URL (in YAML format):

 context: |
   $log->{url} =~ m{(?:https?|ftp)://([^/:]+)}
   my $host = $1;
   return ($host, $log->{url})

When the context is has multiple levels like that, there will be a resultant
aggregation record for each value at each level.

=over 23

=item context

B<Code, Optional>.  Given a C<$log> entry, return an array that describes the aggregation context.  For
example, for a URL, this array might be: domain name; host name (if different from domain name);
each component of the path of the URL except the final filename.   As Aggregate runs, it will
generate an aggregation record for each element of the array.

This code will be invoked on every input record.

This is not required for cross-product aggregations but is required for nested aggregations.

=item context2columns

B<Code, Optional>.
Given a context, in C<@current_context>, return additional key/value pairs for the resulting 
aggregation record.  This is how the context gets described in the aggregation results
records.

This code will be invoked to generate resultant values just before a context is closed.

If this code sets the variable C<$suppress_result>, then this aggregation result will be
discarded.

=item stringify_context

B<Code, Optional>.

If the new context array returned by the C<context> code
(soon to become C<@current_context>) is not an array of
strings but rather an array of references, it will be turned into strings using
YAML.  These strings may not matter because you control how the context 
manifests in the result records with C<context2columns>.

If this isn't what you want, use C<stringify_context> to do something different.
Unlike most of the other functions, C<stringify_context> operates on C<@_>.

This will be invoked for every input record.

=item crossproduct

B<Hash, Name-E<gt>Number, Optional>.

For crossproduct groupings, this defines the dimensions. The keys are the variables.
The values are the maximum number of values for each variable to track.

The keys must be C<ephemeral0>, C<ephemeral>, or C<ephemeral2> column names.

=item combinations

B<Hash, Name-E<gt>Code, Optional>.

If you have crossproduct groupings, do you also want to synthesize contexts that
exclude some or all of the crossproduct dimensions?

For each dimension, provide code that that answers the question: if you remove
this dimension from the crossproduct, should this new context be considered?
This code can look at C<$row> to see what other dimensions are active for this
potential context.  To keep this context, the code must evaluate to a true value.

Combinations are evaluated in alphabetical order.  If there is more than one path
to a combination, only the first path will be considered.  For example, if you have
three crossproduct dimensions, C<a>, C<b> and C<c> then the combinations are (in
the order the'll be considered):

=over

=item C<b> and C<c>, excluding C<a>

=item C<a> and C<c>, excluding C<b>

=item C<a> and C<b>, excluding C<c>

These have no dependency and will be produced if the combinations code returns a true
value.

=item C<c>, excluding C<a> and C<b>

This possibility will only be explored coming from C<b> and C<c>.  If the combination
rule for C<a> rejected the combination, then the C<c>-only permuation will never be
reached.

=item C<b>, excluding C<a> and C<c>

This possibility will only be explored coming from C<b> and C<c>.  If the combination
rule for C<a> rejected the combination, then the C<b>-only permuation will never be
reached.

=item C<a>, excluding C<b> and C<c>

This possibility will only be explored coming from C<a> and C<c>.  If the combination
rule for C<b> rejected the combination, then the C<a>-only permuation will never be
reached.

=item excluding C<a>, C<b> and C<c>

This possibility will only be explored coming from C<c>.  If the combination
rule for C<a> or C<c> rejected the combination, then the empty permuation will never be
reached.

=back

Using combinations can greatly expand your memory requirements.  More than four
dimensions of combinations is probably a bad idea.

If you want skip a dimension entirely, leave it out of the combinations key rather
than adding key that evaluates to false.  A key the evaluates to false will still have
to be tested many times whereas a missing key won't have any code generated for it.

=item simplify

B<Hash, Name-E<gt>Code, Optional>.

When a cross-product key is exceeding its quota of values, the default replacement
value is C<*>.  This hash allows you to override the code that chooses the new
value.

=item finalize_result

B<Code, Optional, Closure/Config>.

This code will be called after the resultant values for a context have been calculated.
It is a last-chance to modify them or to suppress the results.  The values can be found
as a reference to a hash: C<$row>.  To suppress the results, set C<$suppress_results>.

=item new_context

B<Code, Optional, Closure/Config>.

This code will be called each time there is a new context.  At the time it is called, 
C<$ps> is a reference to the new context, but C<@current_context> will not yet have been
updated to the new value.

=item merge

B<Code, Optional, Closure/Config>.

When using multiple levels of contexts, data is counted for the top-most context layer
only.  When that top-most layer finishes, the counts are merged into the next more-general
layer. 

During the merge there is both C<$ps> and C<$oldps> available to for code to reference.
The default merge handles all of the pre-defined column types.  If you are using 
C<$ps-E<gt>{heap}> storage for context data, you need to merge that data from 
C<$oldps> to C<$ps> yourself.

=back

=head2 CONTROL FLOW CONFIGURATION

=over 23

=item filter

B<Code, Optional>.
Before any of the columns are calculated or any of the values saved, run this filter
code.  If it returns a true value then proceed as normal.  If it returns a false value,
then do not count any of the values.   The filter code can remember
things in C<$ps->{heap}> that might effect how other things are counted.  

In some situations, you many want to throw away most data and count things in the
filter.  When doing that, it may be that all of the columns come from 
C<output>.

B<This may be redesigned in a future release in a way that is not backwards compatible.>

=item filter_early

B<Boolean, Optional>, default C<false>.
Check the filter early before figuring out contexts?  If so, and the result is
filtered, don't check to see if the context changed.

=item passthrough

B<Code, Optional>.
Add results to the output of the aggregation.  A value of C<$log> (assuming that's your
input record variable) adds the input
data stream to the output stream.  The passthrough code is run before the input line 
is counted.

=back

=head2 PARAMETERS CONFIGURATION

=over 23

=item max_stats_to_keep

B<Number, Optional>, default: 4000.

When aggregating large amounts of data, limit memory use by throwing away some of the
data.  When data is thrown away, keep this number of samples for statistics functions
that need bulk data like standard_deviation.

=item reduce

B<Code, Optional, Closure/Config>.

When C<max_stats_to_keep> is exceeded, data will be thrown away.  This function
will be called when that has happened.

=item preprocess

B<Code, Optional>.
Code to preprocess the input C<$log> objects.

=item item_name

B<String, Optional>, default: C<$log>.
The default name for the input values to aggregate over is C<$log>.  If this name
is not appropriate, you can use C<item_name> to change the variable name of the 
input values to something else.  Include the dollar sign (C<$>) in the name.

=item debug

B<Boolean, Optional>.
Print out some debugging information, including the code that is generated for 
building the columns.

=item strict

B<Boolean, Optional>, default: false.
Enforce strict and warnings for user code.

=back

=head2 AGGREGATE OUTPUT COLUMNS CONFIGURATION

Each of these (except C<ephemeral> & C<keep>) defines additional columns of
output that will be included in each aggregation record.   These are all optional and
all are defined as key/value pairs where the keys are column names and the values are perl
code.  You can refer to other columns using the variable 
C<$column_I<column_name>> where I<column_name> is the name of one of the other
columns.   When refering to other columns, the order in which columns are processed
matters: C<ephemeral> and C<keep> are processed first and second respecively.
Idential code fragments will be evaluated only once.  Within a group, columns are
evaluated alphabetically.

Some of the columns will have their code evaluated per-item and some are evaluated per-aggregation.

The input data is in C<$log> unless overriden by C<item_name>.

=head3 Per item callbacks

=over 23

=item ephemeral

These columns will not be included in the aggregation data.  Refer to them
as C<$column_I<column_name>>.   If you are using ephemeral to declare the 
column but do not want to assign it a value, set the value for the ephemeral
code to be undef.  In YAML, thats C<~>:  

 ephemeral:
   var1: ~
   var2: ~

=item ephemeral0

Same as C<ephemeral>, will be evaluated before C<ephemeral>.

=item ephemeral2

Same as C<ephemeral>, will be evaluated after C<ephemeral>.

=item counter

Keep a counter.  Add one if the code returns true.

=item percentage

Keep a counter.  Include the percentage of items for which the code returned
true as an output column as opposed to the number of items where the code
returned C<0>.  A return value of C<undef> does not get counted at all.

=item sum

Keep an accumulator.  Add the return values.

=item mean

Keep an accumulator.  Add the return values.
Divide by the number of items before inserting into the results.
Items whose value is C<undef> do not count towards the number of items
or the sum.

=item standard_deviation

Remeber the return values.  Compute the standard deviation of the
accumulated return values and insert that into the results.
Items whose value is C<undef> are removed before calculating the
standard_deviation.

=item median

Remeber the return values.  Compute the median of the
accumulated return values and insert that into the results.
Items whose value is C<undef> are removed before calculating the
median.

=item dominant

Remeber the return values.  Compute the mode (most frequent) of the
accumulated return values and insert that into the results.
Items whose value is C<undef> are removed before calculating the
mode.

=item min

Keep a minimum numeric value.  
Replace it with the return value if the return value is
less
than the current value.
Items whose value is C<undef> are removed before calculating the min.

=item max

Keep a maximum numeric value.  
Replace it with the return value if the return value is
greater
than the current value.
Items whose value is C<undef> are removed before calculating the max.

=item minstr

Keep a minimum string value.  
Replace it with the return value if the return value is
less
than the current value.
Items whose value is C<undef> are removed before calculating the minstr.

=item maxstr

Keep a maximum string value.  
Replace it with the return value if the return value is
greater
than the current value.
Items whose value is C<undef> are removed before calculating the maxstr.

=item keep

Remember the return values.   The return values are 
available at aggregation time as 
C<@{$ps-E<gt>{keep}{column_name}}>.
Items whose value is C<undef> are kept but they're ignored 
by L<Stream::Aggregate::Stats> functions.

=back

=head3 Per aggregation result record callbacks

For code that is per-aggregation, the saved aggregation state can be found in C<$ps>.  One item that
is probably needed is C<$ps-E<gt>{item_count}>.

=over 23

=item output

Extra columns to include in the output.  This is where to save C<$ps-E<gt>{item_count}>.

=item stat

Use arbitrary perl code to compute statistics on remembered
return values kept with C<keep>.  Write your own function or
use any of the functions in L<Stream::Aggregate::Stats>
(the global variable is pre-loaded).    No, there isn't any
difference between this and C<output>.

=back

=head2 VARIALBES AVAILABLE FOR CODE SNIPPETS TO USE

The following variables are available for the code that generates per-item and 
per-aggregation statistics:

=over 23

=item $log

The current input item (unless overridden by C<item_name>)

=item $ps-E<gt>{keep}{column_name}

An array of return values kept by C<keep>.

=item $ps-E<gt>{numeric}{column_name}

If L<Stream::Aggregate::Stats> functions are called, they will grab the 
numeric values from C<$ps-E<gt>{keep}{column_name}> and store them in
C<$ps-E<gt>{numeric}{column_name}>.

=item $ps-E<gt>{random}

For each kept item in C<$ps-E<gt>{keep}{column_name}>,
there is a corrosponding item in $ps-E<gt>{random} that is
a random number.  These random numbers are used to determine
which values to keep and which values to toss if there are too
many values to keep them all.

=item $ps-E<gt>{$column_type}{column_name}

For each type of column (output, counter, percentage, sum, min,
standard_deviation, median, stat) the values that will be part
of the final aggregation record.

=item $ps-E<gt>{$tempoary_type}{column_name}

Some columns need temporary storage for their values:
percentage_counter (the counter used by percentage);
percentage_total (the number of total items);
mean_sum (the sum used to compute the mean);
mean_count (the number of items for the mean).

=item $ps-E<gt>{heap}

A hash that can be used by the configured perl code for whatever
it wants.

=item $ps-E<gt>{item_counter}

The count of items.

=item $agg_config

The configuration object for Stream::Aggregate

=item $itemref

A reference to C<$log>.  It's always C<$itemref> even if C<$log> is
something else.

=item @current_context

The current context as returned by C<context>.

=item @context_strings

The string-ified version C<@current_context> as returned by 
C<stringify_context> or L<YAML>.

=item @contexts

The array of context objects.  C<$ps> is always C<$context[-1]>.

=item @items_seen

An array that counts the number of rows of output from this aggregation.
When the context is multi-level, the counter is multi-level.  For example,
if the context is I<domain>, I<host>, and I<URL>; then 
C<$items_seen[0]> is the number of I<domains> (so far), and 
C<$items_seen[1]> is the number of I<hosts> for this I<domain> (so far), 
and C<$items_seen[2]> is the number of I<URLs> for this I<host> (so far).

Passthrough rows do not count.

XXX what about cross-product aggregations?

=item $row

When gathering results, the variable that holds them is a reference to
a hash: C<$row>.

=item $suppress_result

After gathering results, the C<$suppress_result> variable is examined.
If it's set the results (in C<$row>) are discards.

To skip results that aren't crossproduct results, 
in C<finalize_result>, set C<$suppress_result> if C<$cross_count> isn't true.

=item $cross_count

The number of currently active crossproduct accumulator contexts.

=item $extra, $user_extra

The additional paramerts (beyond C<$agg_config>) that were passed to 
C<generate_aggregation_func()>.

=item %persist

This hash is not used by Stream::Aggregate.  It's available for
any supplied code to use however it wants.

=item $last_item

A refernece to the previous C<$log> object.  This is valid during 
C<finalize_result> and C<context2columns>.

=back

There are more.  Read the code.

=head2 HELPER FUNCTIONS

The following helper functions are available: 
everything in L<Stream::Aggregate::Stats> and:

=over

=item nonblank($value)

Returns $value if $value is defined and not the empty string.   Returns undef otherwise.

=back

=head2 EXAMPLE1

This example will look at a set of records regarding health risks.  Each record
represents a person:

 name<TAB>birthday<TAB>sex<TAB>number of hospital visits in the last year

We will generate the following aggregation records:

=over

=item *

Average age of entire sample

=item *

Median number of hospital visits in the last year for the entire sample.

=item *

Median number of hospital visits in the last year by sex.

=item *

Median number of hospital visits in the last year by age, with a minimum
sample size of five.

=item *

Median number of hospital vists in the last year by age and sex with a minimum
sample size of five.

=back

The code:

  #!/usr/bin/perl 

  #
  # Our input data is the raw strings from the input file.  Most of the
  # work is parsing them and reformatting the data.
  #

  use strict;
  use warnings;
  use Stream::Aggregate;
  use YAML;

  my $aconfig = Load(<<'END_ACONFIG');
  strict:                 1
  debug:                  0
  item_name:              $record
  max_stats_to_keep:      500
  filter_early:           1
  filter: |
    # ignore black lines and comments
    return 0 if $record =~ /^#/;
    return 0 if $record =~ /^$/;
    return 1;
  crossproduct:
    sex:                  3
    age:                  150
  combinations:
    sex:                  1
    age:                  1
  ephemeral0:
    #
    # We are using ephemeral0 to declare the column variables
    #
    name:                 ~
    birthday:             ~
    gender:               ~
    number_of_visits:     ~
  ephemeral:
    # 
    # We are using a fake column ($column_step1) in ephemeral to initialize 
    # the raw column variables we declared in ephemeral0
    #
    step1: |
       chomp($record);
       ($column_name, $column_birthday, $column_gender, $column_number_of_visits) = split(/\t/, $record);
  ephemeral2:
    #
    # We are using ephemeral2 to generate the computed input data
    #
    age: |
      use Time::ParseDate qw(parsedate);
      my $t = parsedate($column_birthday, NO_RELATIVE => 1, DATE_REQUIRED => 1, WHOLE => 1, GMT => 1);
      return undef unless $t;
      return int ((parsedate('2011-05-01', GMT => 1) - $t) / (365.24 * 86400))
    sex: |
      return 'M' if $column_gender =~ /^m/i;
      return 'F' if $column_gender =~ /^f/i;
      return undef;
    hospital_visits: |
      $column_number_of_visits =~ /^(\d+)$/;
      $1
  output:
    sample_size:          $ps->{item_counter}
  median:
    avg_hospital_visits:  $column_hospital_visits
  mean:                   
    avg_age:              $column_age
  finalize_result: |
    #
    # Don't generate result records unless there are at
    # least five items being aggregated.
    #
    $suppress_result = 1 if $ps->{item_counter} < 5;
  END_ACONFIG

  my $ag = generate_aggregation_func($aconfig, { 
	  name	=> 'Aggregate Hospital Visits',
  });

  my @results;

  while (<>) {
	  for my $result ($ag->($_)) {
		  # do something with the result records
	  }
  }

  for my $result ($ag->(undef)) {
	  # do something with the result records
  }


=head1 EXAMPLE2

Our example will count the following things: number of unique URLs per domain,
average length of the URL.

  #!/usr/bin/perl 

  use strict;
  use warnings;
  use Stream::Aggregate;
  use YAML;

  my $aconfig = Load(<<'END_ACONFIG');
  strict:                 1
  debug:                  0
  item_name:              $item
  context:                $item->{domain}
  context2columns:        return (domain => $current_context[0])
  ephemeral:
    # 
    # The only persistent unstructured place to store data from 
    # one row to the next is $ps->{heap}.   $ps->{heap} 
    # is per-context, but that's okay for our usage.
    #
    is_different: |
      my $old = $ps->{heap}{last_item};
      $ps->{heap}{last_item} = $item;
      return 1 unless $old;
      return 0 if $old->{url} eq $item->{url};
      return 1;
  sum:
    unique_urls:          $column_is_different
  mean:                   
    avg_url_length:       length($item->{url})
  finalize_result: |
    # we don't want the roll-up context of all domains
    $suppress_result = 1 unless $row->{domain};
  END_ACONFIG

  my $ag = generate_aggregation_func($aconfig, { 
	  name	=> 'Aggregate URL data'
  });

  while(<>) {
	  # we'll parse the input here
	  my $item;
	  chomp;
	  next if /^$/;
	  next if /^#/;
	  die "'$_'" unless m{^\w+:\/\/([^/]+)(?:/.*)?};
	  $item = {
		  domain	=> $1,
		  url	=> $_,
	  };
	  for my $result ($ag->($item)) {
		  # do something with the result records
	  }
  }

  for my $result ($ag->(undef)) {
	# do something with the results records
  }

=head1 LICENSE

Copyright (C) 2008-2010 David Sharnoff; Copyright (C) 2011 Google, Inc.

This package may be used and redistributed under the terms of either
the Artistic 2.0 or LGPL 2.1 license.