#!/usr/bin/perl -w
#
# iterate over mail archives, calling a function on each message.

package Mail::SpamAssassin::ArchiveIterator;

use strict;
use bytes;

use IO::Select;
use IO::Socket;
use Mail::SpamAssassin::Util;

use constant BIG_BYTES => 256*1024;      # 256k is a big email
use constant BIG_LINES => BIG_BYTES/65;  # 65 bytes/line is a good approximation

# $no: running sequence number, used in place of a date when opt_n is set.
# $tz: local timezone string, appended by receive_date() to "From " line
#      dates that carry no zone of their own.
my $no;
my $tz;

BEGIN {
  $no = 1;
  # NOTE(review): local_tz() is not defined in this file, and BEGIN runs
  # before any later sub would be compiled.  Qualifying the call assumes
  # Mail::SpamAssassin::Util provides local_tz() -- confirm.
  $tz = Mail::SpamAssassin::Util::local_tz();
}

# Package global: number of messages queued by the most recent run().
use vars qw($MESSAGES);

###########################################################################

# Constructor.  Takes an optional hashref of options (opt_j, opt_n,
# opt_after, opt_head, opt_tail, opt_all, opt_restart are the keys this
# file reads); that hash is blessed in place and becomes the object.
sub new {
  my $class = shift;
  $class = ref($class) || $class;

  my $self = shift;
  if (!defined $self) { $self = { }; }
  bless ($self, $class);

  $self->{s} = { };            # spam, of course
  $self->{h} = { };            # ham, as if you couldn't guess
  $self->{opt_after} ||= 0;    # default to 0

  $self;
}

###########################################################################

# Register the callbacks:
#   $wanted - called as ($id, $date, \@lines) for every message scanned;
#             its return value is the message's "result".
#   $result - called as ($class, $result, $date) for each true result.
sub set_functions {
  my ($self, $wanted, $result) = @_;
  $self->{wanted_sub} = $wanted;
  $self->{result_sub} = $result;
}

###########################################################################

# Scan every target ("class:format:location") and run the wanted callback
# on each message, either in-process or across opt_j forked children.
# Dies if set_functions() was never called.
sub run {
  my ($self, @targets) = @_;

  if (!defined $self->{wanted_sub}) {
    die "set_functions never called";
  }

  # opt_j is the number of worker processes.  Unset, zero or negative
  # means "do everything in this process"; previously those values made
  # run() silently do nothing.
  my $jobs = $self->{opt_j} || 1;

  if ($jobs <= 1) {
    $self->_run_serial(\@targets);
  }
  else {
    $self->_run_parallel($jobs, \@targets);
  }
}

# Process every message sequentially in the current process.
sub _run_serial {
  my ($self, $targets) = @_;

  my $messages;
  ($MESSAGES, $messages) = $self->message_array($targets);

  while (my $message = shift @{$messages}) {
    my ($class, undef, $date) = index_unpack($message);
    my $result = $self->run_message($message);
    $self->{result_sub}->($class, $result, $date) if $result;
  }
}

# Hand messages out to $jobs forked children over socketpairs and pass each
# child's reply to the result callback.  Children are optionally recycled
# every opt_restart messages, or after a child disappears mid-conversation.
sub _run_parallel {
  my ($self, $jobs, $targets) = @_;

  my $select = IO::Select->new();
  my $total_count = 0;     # messages handed to children so far
  my $needs_restart = 0;   # set once the children should be recycled
  my @child = ();
  my @pid = ();

  # The children are forked before the message list is built (presumably
  # so they do not inherit a copy of it).
  $self->start_children($jobs, \@child, \@pid, $select);

  my $messages;
  ($MESSAGES, $messages) = $self->message_array($targets);

  # feed children until every socket has been retired from the select set
  while ($select->count()) {
    foreach my $socket ($select->can_read()) {
      my $result = '';
      my $line;

      while ($line = readline $socket) {
        if ($line =~ /^RESULT (.+)$/) {
          my ($class, $type, $date) = index_unpack($1);

          # A false opt_restart (unset or 0) disables recycling; testing
          # only defined() let 0 through and died on "Illegal modulus zero".
          if ($self->{opt_restart} && ($total_count % $self->{opt_restart}) == 0) {
            $needs_restart = 1;
          }

          # if messages remain, and we don't need to restart, send a message
          if ($MESSAGES > $total_count && !$needs_restart) {
            print { $socket } (shift @{$messages}) . "\n";
            $total_count++;
          }
          else {
            # stop listening on this child since we're done with it.
            $select->remove($socket);
          }

          # Deal with the result we got.
          if ($result) {
            chop $result;    # need to chop the \n before RESULT
            $self->{result_sub}->($class, $result, $date);
          }
          last;    # this will get out of the read for this client
        }
        elsif ($line eq "START\n") {
          if ($MESSAGES > $total_count) {
            # we still have messages, send one to child
            print { $socket } (shift @{$messages}) . "\n";
            $total_count++;
          }
          else {
            # no more messages, so stop listening on this child
            $select->remove($socket);
          }
          last;    # this will get out of the read for this client
        }
        else {
          # result line, remember it.
          $result .= $line;
        }
      }

      # readline ran dry before RESULT/START: the child went away.
      if (!$line) {
        $needs_restart = 1;
        warn "Got an undef from readline?!? Restarting all children, probably lost some results. :(\n";
        $select->remove($socket);
      }
    }

    # If there are still messages to process, and we need to restart
    # the children, and all of the children are idle, let's go ahead.
    if ($needs_restart && $select->count() == 0 && $MESSAGES > $total_count) {
      $needs_restart = 0;
      $self->reap_children($jobs, \@child, \@pid);
      @child = ();
      @pid = ();
      $self->start_children($jobs, \@child, \@pid, $select);
    }
  }

  # reap children
  $self->reap_children($jobs, \@child, \@pid);
}

############################################################################

# Scan every target and build the list of index entries to process.
#   $targets - arrayref of "class:format:location" strings; format is one of
#              detect, dir, file, mbox; location may contain csh globs.
# Returns (number of entries, arrayref of entries).  Entries are ordered by
# sequence number under opt_n.  Otherwise spam and ham are each sorted by
# date, trimmed by opt_head/opt_tail, and interleaved.
sub message_array {
  my ($self, $targets) = @_;

  foreach my $target (@{$targets}) {
    my ($class, $format, $rawloc) = split(/:/, $target, 3);

    # Without a location, fix_globs() would glob an undef path.
    if (!defined $rawloc) {
      warn "Target '$target' is not of the form class:format:location\n";
      next;
    }

    # Only the first letter is kept; it names the $self->{s} / $self->{h}
    # bucket the scanners record into.
    $class = substr($class, 0, 1);

    foreach my $location ($self->fix_globs($rawloc)) {
      my $method;
      if ($format eq 'detect') {
        # stdin is considered a file if not passed as mbox
        $method = ($location eq '-' || !(-d $location))
                ? \&scan_file
                : \&scan_directory;
      }
      elsif ($format eq 'dir')  { $method = \&scan_directory; }
      elsif ($format eq 'file') { $method = \&scan_file; }
      elsif ($format eq 'mbox') { $method = \&scan_mailbox; }

      if (defined $method) {
        $self->$method($class, $location);
      }
      else {
        warn "Format $format unknown!";
      }
    }
  }

  my @messages;
  if ($self->{opt_n}) {
    # Tolerate a class with no entries; dereferencing undef dies under strict.
    my %both = (%{ $self->{s} || {} }, %{ $self->{h} || {} });
    $self->{s} = {};
    $self->{h} = {};
    @messages = sort { $both{$a} <=> $both{$b} } keys %both;
    $self->_trim_head_tail(\@messages);
  }
  else {
    my @s = $self->_sorted_class('s');
    my @h = $self->_sorted_class('h');
    $self->_trim_head_tail(\@s);
    $self->_trim_head_tail(\@h);

    # alternate spam and ham, then append whatever is left of either
    while (@s && @h) {
      push @messages, (shift @s), (shift @h);
    }
    push @messages, @s, @h;
  }

  return (scalar(@messages), \@messages);
}

# Return the entries recorded under $self->{$class} sorted numerically by
# their value (date, or sequence number under opt_n), then empty the bucket.
sub _sorted_class {
  my ($self, $class) = @_;
  my $entries = $self->{$class} || {};
  $self->{$class} = {};
  return sort { $entries->{$a} <=> $entries->{$b} } keys %{$entries};
}

# Apply the opt_head (keep first N) and opt_tail (keep last N) limits to
# the array referenced by $list, in place.
sub _trim_head_tail {
  my ($self, $list) = @_;
  splice(@{$list}, $self->{opt_head}) if $self->{opt_head};
  splice(@{$list}, 0, -$self->{opt_tail}) if $self->{opt_tail};
}

# Fork $count children, each connected to the parent by a socketpair.
#   $child  - arrayref, filled with the parent-side sockets
#   $pid    - arrayref, filled with the children's pids
#   $socket - IO::Select the parent-side sockets are added to
# A child prints "START", then for each index entry it reads runs
# run_message() and prints "<result>\nRESULT <entry>"; on "exit" it
# prints "END" and exits.  Dies if socketpair() or fork() fails.
sub start_children {
  my ($self, $count, $child, $pid, $socket) = @_;

  my $parent;

  # create children
  for my $i (0 .. $count - 1) {
    ($child->[$i], $parent) = IO::Socket->socketpair(AF_UNIX, SOCK_STREAM, PF_UNSPEC)
      or die "socketpair failed: $!";

    $pid->[$i] = fork;
    if (!defined $pid->[$i]) {
      die "cannot fork: $!";
    }

    if ($pid->[$i]) {
      # parent: keep our end, disable buffering on it, and watch it
      close $parent;
      my $old = select($child->[$i]);
      $| = 1;
      select($old);
      $socket->add($child->[$i]);
      next;
    }

    # child: drop the parent's end, and the parent-side sockets of any
    # siblings forked earlier that this process inherited.
    close $child->[$i];
    close $child->[$_] for 0 .. $i - 1;

    select($parent);
    $| = 1;    # print to parent by default, turn off buffering
    print "START\n";
    while (my $line = readline $parent) {
      chomp $line;
      if ($line eq "exit") {
        print "END\n";
        close $parent;
        exit;
      }
      my $result = $self->run_message($line);
      $result ||= '';
      print "$result\nRESULT $line\n";
    }
    exit;
  }
}

# Tell each of $count children to exit, read its END line, close the
# socket and wait for the process.
#   $socket - arrayref of parent-side sockets
#   $pid    - arrayref of child pids
sub reap_children {
  my ($self, $count, $socket, $pid) = @_;

  # If the child died, sending it the exit will generate a SIGPIPE
  # but we don't really care since the readline will go undef which is fine,
  # then we do the waitpid which will finish it off.  So we end up in the
  # right state, in theory.
  local $SIG{'PIPE'} = 'IGNORE';

  for my $i (0 .. $count - 1) {
    print { $socket->[$i] } "exit\n";      # tell the child to die.
    my $end = readline $socket->[$i];      # read (and discard) its END line.
    close $socket->[$i];
    waitpid($pid->[$i], 0);                # wait for the child to exit
  }
}

# Open $file on the package-global INPUT handle.  Names ending in .gz or
# .bz2 are read through "gunzip -cd" / "bzip2 -cd"; "-" means STDIN.
# Warns and returns 0 on failure, returns 1 on success.
sub mail_open {
  my ($file) = @_;

  my $ok;
  if ($file =~ /\.(gz|bz2)$/) {
    my $prog = ($1 eq 'gz') ? 'gunzip' : 'bzip2';
    # Single-quote the name for the shell so that spaces and shell
    # metacharacters in it are passed through literally.
    (my $quoted = $file) =~ s/'/'\\''/g;
    $ok = open(INPUT, "$prog -cd '$quoted' |");
  }
  elsif ($file eq '-') {
    $ok = open(INPUT, '-');            # 2-arg "-" opens STDIN
  }
  else {
    $ok = open(INPUT, '<', $file);     # 3-arg: no mode injection via name
  }

  if (!$ok) {
    warn "unable to open $file: $!\n";
    return 0;
  }
  return 1;
}

# Return the first non-zero time parse_rfc822_date() yields for any of
# @strings, or undef if none parse.
sub first_date {
  my (@strings) = @_;

  foreach my $string (@strings) {
    my $time = Mail::SpamAssassin::Util::parse_rfc822_date($string);
    return $time if defined($time) && $time;
  }
  return;
}

# Work out a message's receive time from its header text.  Tried in order:
#   1. the first Received header that is not a local hop,
#   2. the local Received hops set aside in step 1,
#   3. the mbox "From " / X-From-Line: line ($tz appended if it has no zone),
#   4. the remaining Received headers,
#   5. Resent-Date:, then 6. Date:.
# Falls back to the current time().
sub receive_date {
  my ($self, $header) = @_;

  $header ||= '';
  $header =~ s/\n[ \t]+/ /gs;    # fix continuation lines

  my @rcvd = ($header =~ /^Received:(.*)/img);
  my @local;
  my $time;

  if (@rcvd) {
    # Set aside a first hop that is a qmail local injection or comes
    # from localhost / 127.0.0.1.
    if ($rcvd[0] =~ /qmail \d+ invoked by uid \d+/ ||
        $rcvd[0] =~ /\bfrom (?:localhost\s|(?:\S+ ){1,2}\S*\b127\.0\.0\.1\b)/)
    {
      push @local, (shift @rcvd);
    }
    # A fetchmail delivery hop is set aside as well; otherwise the local
    # hop removed above is put back at the front.
    if (@rcvd && ($rcvd[0] =~ m/\bby localhost with \w+ \(fetchmail-[\d.]+/)) {
      push @local, (shift @rcvd);
    }
    elsif (@local) {
      unshift @rcvd, (shift @local);
    }
  }

  if (@rcvd) {
    $time = first_date(shift @rcvd);
    return $time if defined($time);
  }
  if (@local) {
    $time = first_date(@local);
    return $time if defined($time);
  }
  if ($header =~ /^(?:From|X-From-Line:)\s+(.+)$/im) {
    my $string = $1;
    $string .= " $tz" unless $string =~ /(?:[-+]\d{4}|\b[A-Z]{2,4}\b)/;
    $time = first_date($string);
    return $time if defined($time);
  }
  if (@rcvd) {
    $time = first_date(@rcvd);
    return $time if defined($time);
  }
  if ($header =~ /^Resent-Date:\s*(.+)$/im) {
    $time = first_date($1);
    return $time if defined($time);
  }
  if ($header =~ /^Date:\s*(.+)$/im) {
    $time = first_date($1);
    return $time if defined($time);
  }

  return time;
}

############################################################################

# True (1) if a message dated $date passes the opt_after cutoff: always
# when opt_after is unset/0, never for an undef/0 date, otherwise only
# when $date is strictly later than opt_after.
sub message_is_useful_by_date {
  my ($self, $date) = @_;

  return 1 if !$self->{opt_after};    # not using that feature
  return 0 if !$date;                 # undef or 0 date = unusable
  return ($date > $self->{opt_after}) ? 1 : 0;
}
############################################################################

# Join the fields of a message index entry into one NUL-separated string.
sub index_pack {
  return join("\000", @_);
}

# Split an index_pack() string back into its fields.
sub index_unpack {
  return split(/\000/, $_[0]);
}

# Index every message file in directory $folder under class $class.
# In a Cyrus folder (one with a cyrus.header file) the cyrus.* metadata
# files are skipped.  Otherwise names starting with "," or "." are skipped
# (deleted/refiled messages and MH metadata).  Each remaining regular file
# is indexed exactly as scan_file() would index it.
sub scan_directory {
  my ($self, $class, $folder) = @_;

  opendir(my $dh, $folder) || die "Can't open '$folder' dir: $!";
  my @entries = readdir($dh);
  closedir($dh);

  my @files;
  if (-f "$folder/cyrus.header") {
    # cyrus metadata: http://unix.lsa.umich.edu/docs/imap/imap-lsa-srv_3.html
    @files = grep { /^\S+$/ && !/^cyrus\.(?:index|header|cache|seen)/ } @entries;
  }
  else {
    # ignore ,234 (deleted or refiled messages) and MH metadata dotfiles
    @files = grep { /^[^,.]\S*$/ } @entries;
  }

  foreach my $mail (grep { -f } map { "$folder/$_" } @files) {
    $self->scan_file($class, $mail);
  }
}

# Index the single message in file $mail ("-" is STDIN) under class $class.
# Under opt_n the entry is keyed by sequence number.  Otherwise it is keyed
# by receive_date() of the header, and dropped if it fails the opt_after
# cutoff.
sub scan_file {
  my ($self, $class, $mail) = @_;

  if ($self->{opt_n}) {
    $self->{$class}->{index_pack($class, "f", $no, $mail)} = $no;
    $no++;
    return;
  }

  mail_open($mail) or return;
  my $header = '';
  while (my $line = <INPUT>) {
    last if $line =~ /^$/;     # blank line ends the header
    $header .= $line;
  }
  close(INPUT);

  my $date = $self->receive_date($header);
  return if !$self->message_is_useful_by_date($date);

  $self->{$class}->{index_pack($class, "f", $date, $mail)} = $date;
}

# Index every message of mbox $folder under class $class.  $folder may be
# "-" (STDIN), one mbox file, or a directory whose non-dot, non-directory
# entries are mbox files.  Each message is recorded as "file.offset", where
# offset is the byte position of its "From " line.
sub scan_mailbox {
  my ($self, $class, $folder) = @_;

  my @files;
  if ($folder ne '-' && -d $folder) {
    # Got passed a directory of mboxen.
    $folder =~ s/\/\s*$//;    # Remove trailing slash, if there
    opendir(my $dh, $folder) || die "Can't open '$folder' dir: $!";
    # defined() so an entry named "0" does not end the loop early
    while (defined(my $entry = readdir($dh))) {
      if ($entry =~ /^[^\.]\S*$/ && !-d "$folder/$entry") {
        push(@files, "$folder/$entry");
      }
    }
    closedir($dh);
  }
  else {
    push(@files, $folder);
  }

  foreach my $file (@files) {
    # Check each file, not just the folder argument, so compressed members
    # of a directory are caught too.  (Offsets are taken with tell() here
    # and replayed with seek() in run_mailbox(), which presumably cannot
    # work through a decompression pipe.)
    if ($file =~ /\.(?:gz|bz2)$/) {
      die "compressed mbox folders are not supported at this time\n";
    }
    mail_open($file) or next;    # skip this file, keep scanning the rest

    my $start = 0;       # byte offset of the last "From " line seen
    my $where = 0;       # current byte offset
    my $first = '';      # "From " line that opens the next message
    my $in_header = 0;   # are we in a header?

    while (!eof INPUT) {
      my $offset = $start;    # byte offset of this message
      my $header = $first;    # header text starts with its "From " line

      while (my $line = <INPUT>) {
        if ($in_header) {
          if ($line =~ /^$/) {
            $in_header = 0;
          }
          else {
            $header .= $line;
          }
        }
        if (substr($line, 0, 5) eq "From ") {
          # next message begins here
          $in_header = 1;
          $first = $line;
          $start = $where;
          $where = tell INPUT;
          last;
        }
        $where = tell INPUT;
      }

      # text before the first "From " line has no header: nothing to record
      next if !$header;

      my $t;
      if ($self->{opt_n}) {
        $t = $no++;
      }
      else {
        $t = $self->receive_date($header);
        next if !$self->message_is_useful_by_date($t);
      }
      $self->{$class}->{index_pack($class, "m", $t, "$file.$offset")} = $t;
    }
    close INPUT;
  }
}

############################################################################

# Run the wanted callback on the message named by index entry $msg.
# Returns the callback's result, or false if the message was skipped or
# the entry's format is unknown.
sub run_message {
  my ($self, $msg) = @_;

  my (undef, $format, $date, $mail) = index_unpack($msg);

  return $self->run_file($mail, $date)    if $format eq "f";
  return $self->run_mailbox($mail, $date) if $format eq "m";
  return;
}

# Read file $where in full and pass its lines to the wanted callback.
# Unless opt_all is set, files larger than BIG_BYTES are skipped.
sub run_file {
  my ($self, $where, $date) = @_;

  mail_open($where) or return;

  # skip too-big mails
  if (!$self->{opt_all} && -s INPUT > BIG_BYTES) {
    close INPUT;
    return;
  }

  my @msg = <INPUT>;    # the whole file is one message
  close INPUT;

  return $self->{wanted_sub}->($where, $date, \@msg);
}

# Pass the lines of one mbox message to the wanted callback.  $where is
# "path.offset" as recorded by scan_mailbox().  Reading starts at that
# offset and stops at the next "From " line.  Unless opt_all is set,
# messages longer than BIG_LINES are skipped.
sub run_mailbox {
  my ($self, $where, $date) = @_;

  my ($file, $offset) = ($where =~ m/(.*)\.(\d+)$/);
  return if !defined $offset;    # not a "path.offset" entry

  mail_open($file) or return;
  # Reading from the wrong position would hand over the wrong message.
  if (!seek(INPUT, $offset, 0)) {
    warn "unable to seek to $offset in $file: $!\n";
    close INPUT;
    return;
  }

  my @msg;
  my $past = 0;    # has this message's own "From " line been read yet?
  while (my $line = <INPUT>) {
    if ($past) {
      last if substr($line, 0, 5) eq "From ";    # next message begins
    }
    else {
      $past = 1;
    }

    # skip too-big mails
    if (!$self->{opt_all} && @msg > BIG_LINES) {
      close INPUT;
      return;
    }
    push(@msg, $line);
  }
  close INPUT;

  return $self->{wanted_sub}->("$file.$offset", $date, \@msg);
}

############################################################################

# Expand a user-supplied location into a list of paths: a leading "~"
# becomes $HOME, whitespace is escaped so glob() does not split on it, and
# csh-style globs are expanded (except under taint mode on perl < 5.006).
sub fix_globs {
  my ($self, $path) = @_;

  # replace leading tilde with home dir: ~/abc => /home/jm/abc
  $path =~ s/^~/$ENV{'HOME'}/;

  # protect/escape every whitespace char not already escaped:
  # ./Mail/My Letters => ./Mail/My\ Letters
  # (a lookbehind, so runs of spaces and a leading space are escaped too)
  $path =~ s/(?<!\\)(\s)/\\$1/g;

  my @paths;
  if ($] < 5.006 && Mail::SpamAssassin::Util::am_running_in_taint_mode()) {
    # glob is not allowed in taint-mode on 5.005
    push(@paths, $path);
  }
  else {
    # apply csh-style globs: ./corpus/*.mbox => er, you know what it does ;)
    @paths = glob $path;
  }

  return @paths;
}

############################################################################

1;