#!/usr/bin/perl -w # # dbjoin.pm # Copyright (C) 1991-2008 by John Heidemann # $Id$ # # This program is distributed under terms of the GNU general # public license, version 2. See the file COPYING # in $dblibdir for details. # package Fsdb::Filter::dbjoin; =head1 NAME dbjoin - join two tables on common columns =head1 SYNOPSIS dbjoin [-Sid] --input table1.fsdb --input table2.fsdb [-nNrR] column [column...] OR cat table1.fsdb | dbjoin [-Sid] --input table2.fsdb [-nNrR] column [column...] =head1 DESCRIPTION Does a natural, inner join on TABLE1 and TABLE2 the specified columns. With the C<-a> option, or with C<-t outer> it will do a natural, full outer join. By default, data will be sorted lexically, but the usual sorting options can be mixed with the column specification. If data is already sorted, dbjoin will run more efficiently with the C<-S> option. Because two tables are required, input is typically in files. Standard input is accessible by the file "-". =head1 OPTIONS =over 4 =item B<-a> or B<--all> Perform a I, include non-matches (each record which doesn't match at all will appear once). Default is an I. =item B<-t TYPE> or B<--type TYPE> Explicitly specify the join type. TYPE must be inner, outer, left, or right. Currently only inner and outer are implemented. =item B<-S> or B<--pre-sorted> assume (and verify) data is already sorted =item B<-e E> or B<--empty E> give value E as the value for empty (null) records =item B<-T TmpDir> where to put tmp files. Also uses environment variable TMPDIR, if -T is not specified. Default is /tmp. =back Sort specification options (can be interspersed with column names): =over 4 =item B<-r> or B<--descending> sort in reverse order (high to low) =item B<-R> or B<--ascending> sort in normal order (low to high) =item B<-n> or B<--numeric> sort numerically =item B<-N> or B<--lexical> sort lexicographically =back =for comment begin_standard_fsdb_options This module also supports the standard fsdb options: =over 4 =item B<-d> Enable debugging output. =item B<-i> or B<--input> InputSource Read from InputSource, typically a file name, or C<-> for standard input, or (if in Perl) a IO::Handle, Fsdb::IO or Fsdb::BoundedQueue objects. =item B<-o> or B<--output> OutputDestination Write to OutputDestination, typically a file name, or C<-> for standard output, or (if in Perl) a IO::Handle, Fsdb::IO or Fsdb::BoundedQueue objects. =item B<--autorun> or B<--noautorun> By default, programs process automatically, but Fsdb::Filter objects in Perl do not run until you invoke the run() method. The C<--(no)autorun> option controls that behavior within Perl. =item B<--help> Show help. =item B<--man> Show full manual. =back =for comment end_standard_fsdb_options =head1 SAMPLE USAGE =head2 Input: #fsdb sid cid 1 10 2 11 1 12 2 12 And in the file F: #fsdb cid cname 10 pascal 11 numanal 12 os =head2 Command: cat DATA/reg.fsdb | dbsort -n cid | dbjoin -i - -i DATA/classes -n cid =head2 Output: #fsdb cid sid cname 10 1 pascal 11 2 numanal 12 1 os 12 2 os # - COMMENTS: # | /home/johnh/BIN/DB/dbsort -n cid # DATA/classes COMMENTS: # joined comments: # | /home/johnh/BIN/DB/dbjoin - DATA/classes cid =head1 SEE ALSO L. =head1 CLASS FUNCTIONS =cut @ISA = qw(Fsdb::Filter); ($VERSION) = 2.0; use strict; use Pod::Usage; use Carp; use Fsdb::Filter; use Fsdb::IO::Reader; use Fsdb::IO::Writer; use Fsdb::Filter::dbpipeline qw(dbpipeline_filter dbsort); =head2 new $filter = new Fsdb::Filter::dbjoin(@arguments); Create a new dbjoin object, taking command-line arguments. =cut sub new ($@) { my $class = shift @_; my $self = $class->SUPER::new(@_); bless $self, $class; $self->set_defaults; $self->parse_options(@_); $self->SUPER::post_new(); return $self; } =head2 set_defaults $filter->set_defaults(); Internal: set up defaults. =cut sub set_defaults ($) { my($self) = @_; $self->SUPER::set_defaults(); $self->{_pre_sorted} = 0; $self->{_sort_argv} = []; $self->{_join_type} = 'inner'; $self->{_tmpdir} = defined($ENV{'TMPDIR'}) ? $ENV{'TMPDIR'} : "/tmp"; } =head2 parse_options $filter->parse_options(@ARGV); Internal: parse command-line arguments. =cut sub parse_options ($@) { my $self = shift @_; my(@argv) = @_; $self->get_options( \@argv, 'help|?' => sub { pod2usage(1); }, 'man' => sub { pod2usage(-verbose => 2); }, 'a|all!' => sub { $self->{_join_type} = 'outer'; }, 'autorun!' => \$self->{_autorun}, 'close!' => \$self->{_close}, 'd|debug+' => \$self->{_debug}, 'e|empty=s' => \$self->{_empty}, 'i|input=s@' => sub { $self->parse_io_option('inputs', @_); }, 'log!' => \$self->{_logprog}, 'o|output=s' => sub { $self->parse_io_option('output', @_); }, 'S|pre-sorted+' => \$self->{_pre_sorted}, 't|type=s' => \$self->{_join_type}, 'T|tmpdir|tempdir=s' => \$self->{_tmpdir}, # sort key options: 'n|numeric' => sub { $self->parse_sort_option(@_); }, 'N|lexical' => sub { $self->parse_sort_option(@_); }, 'r|descending' => sub { $self->parse_sort_option(@_); }, 'R|ascending' => sub { $self->parse_sort_option(@_); }, '<>' => sub { $self->parse_sort_option('<>', @_); }, ) or pod2usage(2); croak $self->{_prog} . ": internal error, extra arguments.\n" if ($#argv != -1); } =head2 setup $filter->setup(); Internal: setup, parse headers. =cut sub setup ($) { my($self) = @_; croak $self->{_prog} . ": no sorting key specified.\n" if ($#{$self->{_sort_argv}} == -1); croak $self->{_prog} . ": join types left and right are not currently supported.\n" if ($self->{_join_type} eq 'left' || $self->{_join_type} eq 'right'); croak $self->{_prog} . ": unknown join type " . $self->{_join_type} . ".\n" if (!($self->{_join_type} eq 'inner' || $self->{_join_type} eq 'outer')); $self->setup_exactly_two_inputs; # automatic input sorting? if (!$self->{_pre_sorted}) { foreach (0..1) { my($new_reader, $new_thread) = dbpipeline_filter($self->{_inputs}[$_], [-comment_handler => $self->create_delay_comments_sub], dbsort('--nolog', @{$self->{_sort_argv}})); $self->{_pre_sorted_inputs}[$_] = $self->{_inputs}[$_]; $self->{_ins}[$_] = $new_reader; $self->{_in_threads}[$_] = $new_thread; }; } else { $self->finish_io_option('inputs', -comment_handler => $self->create_delay_comments_sub); }; croak $self->{_prog} . ": cannot handle input data with different field separators.\n" if (!defined($self->{_ins}[0]->compare($self->{_ins}[1]))); # # figure the joined columns # my $i = 0; my %join_keys; my $key; my @output_columns; my %all_keys; my @copy_codes = ('', ''); for $key (@{$self->{_sort_argv}}) { next if ($key =~ /^-/); # we deal with this later my @col_i; foreach (0..1) { $col_i[$_] = $self->{_ins}[$_]->col_to_i($key); die($self->{_prog} . ": column ``$key'' is not in " . ($_ == 0 ? "left" : "right") ." join source.\n") if (!defined($col_i[$_])); $copy_codes[$_] .= '$out_fref->[' . $i . '] = $fref[' . $_ . "]->[$col_i[$_]];\n"; }; push(@output_columns, $key); $join_keys{$key} = $i; $all_keys{$key} = $i; $i++; } # # and the rest # my $copy_codes = ''; my $col_i; foreach $key (@{$self->{_ins}[0]->cols}) { next if (defined($all_keys{$key})); # already got it push(@output_columns, $key); $col_i = $self->{_ins}[0]->col_to_i($key); defined($col_i) or die "assert"; $copy_codes[0] .= '$out_fref->[' . $i . '] = $fref[0]->[' . $col_i . '];' . "\n"; $all_keys{$key} = $i; $i++; }; foreach $key (@{$self->{_ins}[1]->cols}) { next if (defined($join_keys{$key})); # detect duplicates that are not joined upon (error) # (this represents duplicate fieds in the two merged things). # Reject this because we don't want to silently prefer one to the other. if (defined($all_keys{$key})) { croak $self->{_prog} . ": column $key is in both of the joined files, but is not joined upon.\nAll non-joined columns must be unique.\nBefore joining you must\nrename one of the source columns\nor remove one of the duplicate input columns.\n"; }; push(@output_columns, $key); $col_i = $self->{_ins}[1]->col_to_i($key); defined($col_i) or die "assert"; $copy_codes[1] .= '$out_fref->[' . $i . '] = $fref[1]->[' . $col_i . '];' . "\n"; $all_keys{$key} = $i; $i++; }; $self->{_copy_codes} = \@copy_codes; $self->finish_io_option('output', -clone => $self->{_ins}[0], -cols => \@output_columns); # # comparision code # $self->{_compare_code} = $self->create_compare_code(@{$self->{_ins}}, 'fref[0]', 'fref[1]');; print "COMPARE CODE:\n\t" . $self->{_compare_code} . "\n" if ($self->{_debug}); foreach (0..1) { $self->{_compare_code_ins}[$_] = $self->create_compare_code($self->{_ins}[$_], $self->{_ins}[$_], "prev_fref[$_]", "fref[$_]"); }; } =head2 run $filter->run(); Internal: run over each rows. =cut sub run ($) { my($self) = @_; # # Eval compare_sub in this lexical context # of our variables. # my @prev_fref; my @fref; my(@right_frefs); my $compare_sub; my @check_compare_subs; my $code = '$compare_sub = ' . $self->{_compare_code} . "\n" . '$check_compare_subs[0] = ' . $self->{_compare_code_ins}[0] . "\n" . '$check_compare_subs[1] = ' . $self->{_compare_code_ins}[1] . "\n"; eval $code; $@ && croak $self->{_prog} . ": internal eval error in compare code: $@.\n"; my @fastpath_subs; foreach (0..1) { $fastpath_subs[$_] = $self->{_ins}[$_]->fastpath_sub(); }; my $out_fastpath_sub = $self->{_out}->fastpath_sub(); # # Set up some "macros". # my $out_fref = []; my $copy_left_to_out_fref; my $copy_right_to_out_fref; $code = '$copy_left_to_out_fref = sub {' . "\n" . $self->{_copy_codes}[0] . "\n};\n" . '$copy_right_to_out_fref = sub {' . "\n" . $self->{_copy_codes}[1] . "\n};\n"; eval $code; $@ && croak $self->{_prog} . ": internal eval error in copy code: $@.\n$code\n"; my $reset_out_fref = sub { $out_fref = [ ($self->{_empty}) x $self->{_out}->ncols ]; }; my $emit_non_match_left = sub { &{$reset_out_fref}(); &{$copy_left_to_out_fref}(); &{$out_fastpath_sub}($out_fref); }; my $emit_non_match_right = sub { &{$reset_out_fref}(); &{$copy_right_to_out_fref}(); &{$out_fastpath_sub}($out_fref); }; if ($self->{_join_type} eq 'inner') { $emit_non_match_left = $emit_non_match_right = sub {}; }; my $advance_left = sub { $prev_fref[0] = $fref[0]; $fref[0] = &{$fastpath_subs[0]}(); if (defined($fref[0])) { &{$check_compare_subs[0]}() <= 0 or die "dbjoin: left stream is unsorted.\n"; }; }; my $advance_right = sub { $prev_fref[1] = $fref[1]; $fref[1] = &{$fastpath_subs[1]}(); if (defined($fref[1])) { &{$check_compare_subs[1]}() <= 0 or die "dbjoin: right stream is unsorted.\n"; }; }; # # join the data (assumes data already sorted) # # Algorithm (standard "Sort Merge Join"): # # more: walk through left (0) until it matches right # emitting non-matching records as we go # then we're in a match: # find all matching rights and save them (maybe more than mem) # then walk all matching lefts: # for each one, output all the matches against the saved rights # when we get a non-matching left, discard our saved rights # and do more (above) # finally, we may have leftover, non-matching rights, output them as non-matches # # Oh, and along the way, verify the inputs are actually sorted. # # The above algorithm (sort/merge join) is theoretically optimal # at O(n log n) in n input records, but not experimentally optimal # if left or right is small. # In the current implementation, if |right| >> |left|, we lose. # # As an exercise to the reader, we could allow different # algorithms. If right (or left) is small, we should just read # it into memory and do a hash join against it. # See for details. # # prime the pump foreach (0..1) { $fref[$_] = &{$fastpath_subs[$_]}(); }; # # Main loop: walk through left # until_eof: for (;;) { # eof on either stream? quit main loop last if (!defined($fref[0]) || !defined($fref[1])); # eat data until we have a match my $left_right_cmp; until_match: for (;;) { defined($fref[0]) or die "assert"; defined($fref[1]) or die "assert"; $left_right_cmp = &{$compare_sub}(); if ($left_right_cmp < 0) { # left wins, so eat left &{$emit_non_match_left}(); &{$advance_left}(); last until_eof if (!defined($fref[0])); } elsif ($left_right_cmp > 0) { # right wins, eat right &{$emit_non_match_right}(); &{$advance_right}(); last until_eof if (!defined($fref[1])); } else { last until_match; }; }; # # match, whoo hoo! # $left_right_cmp == 0 or die "assert"; defined($fref[0]) or die "assert"; defined($fref[1]) or die "assert"; # accumulate rights # Sigh, we save them in memory. # xxx: we should really spill to disk if we get too many. @right_frefs = (); accumulate_rights: for (;;) { push(@right_frefs, $fref[1]); warn "internal warning: dbjoin: lots of matching rows accumulating in memory, should really extend to spill to disk\n" if ($#right_frefs == 2000); # just emit warning once &{$advance_right}(); last accumulate_rights if (!defined($fref[1])); $left_right_cmp = &{$compare_sub}(); last accumulate_rights if ($left_right_cmp != 0); }; (!defined($fref[1]) || $left_right_cmp != 0) or die "assert"; # # Ok, this is a bit gross, but we do it anyway. # Right is now one beyond a match. # Save it aside and we'll come back to it later. # This way we can iterate with $fref[1] over our saved @right_frefs, # keeping our &{$fns}()'s happy. # my $right_fref_past_match = $fref[1]; # now walk lefts &{$reset_out_fref}(); walk_lefts: for (;;) { &{$copy_left_to_out_fref}(); foreach my $matching_right (@right_frefs) { $fref[1] = $matching_right; # hacky, but satisifys next line's call &{$copy_right_to_out_fref}(); &{$out_fastpath_sub}($out_fref); }; &{$advance_left}(); last walk_lefts if (!defined($fref[0])); $left_right_cmp = &{$compare_sub}(); last walk_lefts if ($left_right_cmp != 0); }; (!defined($fref[0]) || $left_right_cmp != 0) or die "assert"; # Put back our one-beyond right. Could even be eof. $fref[1] = $right_fref_past_match; # ok, we're now past a match, # and maybe at eof on one stream. # loop back to try again. (!defined($fref[0]) || !defined($fref[1]) || $left_right_cmp != 0) or die "assert"; }; # Ok, now at least one side or the other is eof. # so drain both sides. while (defined($fref[0])) { &{$emit_non_match_left}(); &{$advance_left}(); }; while (defined($fref[1])) { &{$emit_non_match_right}(); &{$advance_right}(); }; # Reap the theads to suppress a warning (they SHOULD be done # because they gave us eof, but who knows). if (!$self->{_pre_sorted}) { foreach (0..1) { $self->{_in_threads}[$_]->join; }; }; } =head1 AUTHOR and COPYRIGHT Copyright (C) 1991-2008 by John Heidemann This program is distributed under terms of the GNU general public license, version 2. See the file COPYING with the distribution for details. =cut 1;