#!/usr/bin/env perl -l ### ### THIS SCRIPT DOES NOT WORK ### ### I suspect that Coro and MX::Workers won't play nice, but ### it will have to wait till later for me to debug it ### # # http://www.tbray.org/ongoing/When/200x/2007/09/20/Wide-Finder # # # Rather than Erlang (as was in here before) this is more based on the # Scala version of this code at # http://www.martin-probst.com/2007/09/24/wide-finder-in-scala/ # # # requires the data at http://www.tbray.org/tmp/o10k.ap # $|++; sub main { die 'no file' unless -e 'ex/tbray.data.big'; Slurp->new( filename => 'ex/tbray.data.big' ); } { package Slurp; use MooseX::Coro; use IO::File; use Coro; has filename => ( isa => 'Str', is => 'ro', ); has count => ( isa => 'HashRef', is => 'rw', default => sub { {} }, ); has file => ( isa => 'IO::File', is => 'ro', lazy => 1, default => sub { IO::File->new( $_[0]->filename, 'r' ); }, ); has counter => ( reader => 'c', default => sub { Count->new( sender => $_[0] ) }, handles => { counter => 'yield' }, ); sub START { $_[0]->yield('loop'); } event loop => sub { my ($self) = @_; my $file = $self->file; while ( not eof $file ) { my @chunk; for ( 0 .. ( 600000 / 8 ) ) { $_ = <$file>; push @chunk, $_; } $self->counter( 'loop', $self, \@chunk ); } $self->yield('tally'); }; event inc => sub { my ( $self, $chunk ) = @_; my $count = $self->count; $count->{$_} += $chunk->{$_} for ( keys %$chunk ); $_[0]->count($count); }; event tally => sub { my $count = $_[0]->count; print "$count->{$_}: $_" for sort { $count->{$b} <=> $count->{$a} } keys %$count; $_[0]->yield('STOP'); }; __PACKAGE__->meta->make_immutable; } { package Count; use MooseX::Coro; use JSON::Any qw(XS); with qw(MooseX::Workers); sub BUILD { POE::Kernel->run } has sender => ( reader => 's', required => 1, handles => { sender => 'yield' }, ); event loop => sub { my ( $self, $chunk ) = @_; my $count = {}; my $rx = qr|GET /ongoing/When/\d\d\dx/(\d\d\d\d/\d\d/\d\d/[^ .]+)|o; $self->spawn( sub { for my $line (@$chunk) { $count->{$1}++ if $line =~ $rx; } print JSON::Any->encode($count); } ); }; sub worker_stdout { my ( $self, $out ) = @_; warn $out; my $count = JSON::Any->decode($out); $self->sender_return( 'inc', $count ); } sub worker_manager_start { warn 'started worker manager' } sub worker_manager_stop { warn 'stopped worker manager' } sub max_workers_reached { warn 'maximum worker count reached' } sub worker_stderr { shift; warn 'STDERR: ' . join ' ', @_; } sub worker_error { shift; warn join ' ', @_; } sub worker_done { shift; warn 'DONE: ' . join ' ', @_; } sub worker_started { shift; warn 'STARTED: ' . join ' ', @_; } sub sig_child { shift; warn join ' ', @_; } __PACKAGE__->meta->make_immutable; } main();