# RDF::Trine::Iterator::Bindings # ----------------------------------------------------------------------------- =head1 NAME RDF::Trine::Iterator::Bindings - Stream (iterator) class for bindings query results. =head1 VERSION This document describes RDF::Trine::Iterator::Bindings version 0.111 =head1 SYNOPSIS use RDF::Trine::Iterator; my $iterator = RDF::Trine::Iterator::Bindings->new( \&data, \@names ); while (my $row = $iterator->next) { my @vars = keys %$row; # do something with @vars } =head1 METHODS =over 4 =cut package RDF::Trine::Iterator::Bindings; use utf8; use strict; use warnings; no warnings 'redefine'; use Data::Dumper; use JSON 2.0; use Text::Table; use Log::Log4perl; use Scalar::Util qw(reftype); use RDF::Trine::Iterator::Bindings::Materialized; use RDF::Trine::Iterator qw(smap); use base qw(RDF::Trine::Iterator); our ($VERSION); BEGIN { $VERSION = '0.111'; } =item C =item C Returns a new SPARQL Result interator object. Results must be either an reference to an array containing results or a CODE reference that acts as an iterator, returning successive items when called, and returning undef when the iterator is exhausted. $type should be one of: bindings, boolean, graph. =cut sub new { my $class = shift; my $stream = shift || sub { undef }; my $names = shift || []; Carp::confess unless (scalar(@_) % 2 == 0); my %args = @_; my $type = 'bindings'; my $self = $class->SUPER::new( $stream, $type, $names, %args ); my $s = $args{ sorted_by } || []; Carp::confess unless (reftype($s) eq 'ARRAY'); $self->{sorted_by} = $s; return $self; } sub _new { my $class = shift; my $stream = shift; my $type = shift; my $names = shift; my %args = @_; return $class->new( $stream, $names, %args ); } =item C<< materialize >> Returns a materialized version of the current binding iterator. =cut sub materialize { my $self = shift; my @data = $self->get_all; my @args = $self->construct_args; return $self->_mclass->_new( \@data, @args ); } sub _mclass { return 'RDF::Trine::Iterator::Bindings::Materialized'; } =item C<< project ( @columns ) >> Returns a new stream that projects the current bindings to only the given columns. =cut sub project { my $self = shift; my $class = ref($self); my @proj = @_; my $sub = sub { my $row = $self->next; return unless ($row); my $p = { map { $_ => $row->{ $_ } } @proj }; return $p; }; my $args = $self->_args(); my $proj = $class->new( $sub, [@proj], %$args ); return $proj; } =item C Performs a natural, nested loop join of the two streams, returning a new stream of joined results. =cut sub join_streams { my $self = shift; my $a = shift; my $b = shift; # my $bridge = shift; my %args = @_; Carp::confess unless ($a->isa('RDF::Trine::Iterator::Bindings')); Carp::confess unless ($b->isa('RDF::Trine::Iterator::Bindings')); my $l = Log::Log4perl->get_logger("rdf.trine.iterator.bindings"); my @join_sorted_by; if (my $o = $args{ orderby }) { my $req_sort = join(',', map { $_->[1]->name => $_->[0] } @$o); my $a_sort = join(',', $a->sorted_by); my $b_sort = join(',', $b->sorted_by); if ($l->is_debug) { $l->debug('---------------------------'); $l->debug('REQUESTED SORT in JOIN: ' . Dumper($req_sort)); $l->debug('JOIN STREAM SORTED BY: ' . Dumper($a_sort)); $l->debug('JOIN STREAM SORTED BY: ' . Dumper($b_sort)); } my $actual_sort; if (substr( $a_sort, 0, length($req_sort) ) eq $req_sort) { $l->debug("first stream is already sorted. using it in the outer loop."); } elsif (substr( $b_sort, 0, length($req_sort) ) eq $req_sort) { $l->debug("second stream is already sorted. using it in the outer loop."); ($a,$b) = ($b,$a); } else { my $a_common = join('!', $a_sort, $req_sort); my $b_common = join('!', $b_sort, $req_sort); if ($a_common =~ qr[^([^!]+)[^!]*!\1]) { # shared prefix between $a_sort and $req_sort? $l->debug("first stream is closely sorted ($1)."); } elsif ($b_common =~ qr[^([^!]+)[^!]*!\1]) { # shared prefix between $b_sort and $req_sort? $l->debug("second stream is closely sorted ($1)."); ($a,$b) = ($b,$a); } } @join_sorted_by = ($a->sorted_by, $b->sorted_by); } my $stream = $self->nested_loop_join( $a, $b, %args ); $l->debug("JOINED stream is sorted by: " . join(',', @join_sorted_by)); $stream->{sorted_by} = \@join_sorted_by; return $stream; } =item C<< nested_loop_join ( $outer, $inner ) >> Performs a natural, nested loop join of the two streams, returning a new stream of joined results. Note that the values from the C<< $inner >> iterator are fully materialized for this join, and the results of the join are in the order of values from the C<< $outer >> iterator. This suggests that: * If sorting needs to be preserved, the C<< $outer >> iterator should be used to determine the result ordering. * If one iterator is much smaller than the other, it should likely be used as the C<< $inner >> iterator since materialization will require less total memory. =cut sub nested_loop_join { my $self = shift; my $astream = shift; my $bstream = shift; # my $bridge = shift; my %args = @_; Carp::confess unless ($astream->isa('RDF::Trine::Iterator::Bindings')); Carp::confess unless ($bstream->isa('RDF::Trine::Iterator::Bindings')); my $l = Log::Log4perl->get_logger("rdf.trine.iterator.bindings"); ################################################ ### BNODE MAP STUFF my $a_extra = $astream->extra_result_data || {}; my $b_extra = $bstream->extra_result_data || {}; my (%a_map, %b_map); foreach my $h (@{ $a_extra->{'bnode-map'} || [] }) { foreach my $id (keys %$h) { my @values = @{ $h->{ $id } }; push( @{ $a_map{ $id } }, @values ); } } foreach my $h (@{ $b_extra->{'bnode-map'} || [] }) { foreach my $id (keys %$h) { my @values = @{ $h->{ $id } }; push( @{ $b_map{ $id } }, @values ); } } my $a_map = (%a_map) ? \%a_map : undef; my $b_map = (%b_map) ? \%b_map : undef; ################################################ my @names = RDF::Trine::_uniq( map { $_->binding_names() } ($astream, $bstream) ); my $a = $astream->project( @names ); my $b = $bstream->project( @names ); my @data = $b->get_all(); no warnings 'uninitialized'; my $inner_index; my $rowa; my $need_new_a = 1; my $sub = sub { OUTER: while (1) { if ($need_new_a) { $l->debug("### fetching new outer tuple"); $rowa = $a->next; $inner_index = 0; $need_new_a = 0; } $l->debug("OUTER: " . Dumper($rowa)); return undef unless ($rowa); LOOP: while ($inner_index <= $#data) { my $rowb = $data[ $inner_index++ ]; $l->debug("- INNER[ $inner_index ]: " . Dumper($rowb)); $l->debug("[--JOIN--] " . join(' ', map { my $row = $_; '{' . join(', ', map { join('=', $_, ($row->{$_}) ? $row->{$_}->as_string : '(undef)') } (keys %$row)) . '}' } ($rowa, $rowb))); my %keysa = map {$_=>1} (keys %$rowa); my @shared = grep { $keysa{ $_ } } (keys %$rowb); foreach my $key (@shared) { my $val_a = $rowa->{ $key }; my $val_b = $rowb->{ $key }; my $defined = 0; foreach my $n ($val_a, $val_b) { $defined++ if (defined($n)); } if ($defined == 2) { my $equal = $val_a->equal( $val_b ); if (not $equal) { my $query = $args{ query }; my $bridge = $args{ bridge }; if ($query and $bridge) { warn 'join values and bnode maps: ' . Dumper($val_a, $val_b, $a_map, $b_map) if ($a_map or $b_map); if ($a_map and $val_a->isa('RDF::Trine::Node::Blank')) { my $anames = Set::Scalar->new( @{ $a_map{ $val_a->blank_identifier } } ); my $bnames = Set::Scalar->new( RDF::Query::Algebra::Service->_names_for_node( $val_b, $query, $bridge, {} ) ); if (my $int = $anames->intersection( $bnames )) { warn "node equality based on $int"; $equal = 1; } } elsif ($b_map and $val_b->isa('RDF::Trine::Node::Blank')) { my $bnames = Set::Scalar->new( @{ $b_map{ $val_b->blank_identifier } } ); my $anames = Set::Scalar->new( RDF::Query::Algebra::Service->_names_for_node( $val_a, $query, $bridge, {} ) ); warn "anames: $anames\n"; warn "bnames: $bnames\n"; if (my $int = $anames->intersection( $bnames )) { warn "node equality based on $int"; $equal = 1; } } } else { if ($l->is_debug) { $l->logcluck("no query,bridge in args"); } } } unless ($equal) { $l->debug("can't join because mismatch of $key (" . join(' <==> ', map {$_->as_string} ($val_a, $val_b)) . ")"); next LOOP; } } } my $row = { (map { $_ => $rowa->{$_} } grep { defined($rowa->{$_}) } keys %$rowa), (map { $_ => $rowb->{$_} } grep { defined($rowb->{$_}) } keys %$rowb) }; if ($l->is_debug) { $l->debug("JOINED:"); foreach my $key (keys %$row) { $l->debug("$key\t=> " . $row->{ $key }->as_string); } } return $row; } $need_new_a = 1; } }; my $args = $astream->_args; return $astream->_new( $sub, 'bindings', \@names, %$args ); } =item C<< sorted_by >> =cut sub sorted_by { my $self = shift; my $sorted = $self->{sorted_by}; return @$sorted; } =item C<< binding_value_by_name ( $name ) >> Returns the binding of the named variable in the current result. =cut sub binding_value_by_name { my $self = shift; my $name = shift; my $row = ($self->open) ? $self->current : $self->next; if (exists( $row->{ $name } )) { return $row->{ $name }; } else { warn "No variable named '$name' is present in query results.\n"; } } =item C<< binding_value ( $i ) >> Returns the binding of the $i-th variable in the current result. =cut sub binding_value { my $self = shift; my $val = shift; my @names = $self->binding_names; return $self->binding_value_by_name( $names[ $val ] ); } =item C Returns a list of the binding values from the current result. =cut sub binding_values { my $self = shift; my $row = ($self->open) ? $self->current : $self->next; return @{ $row }{ $self->binding_names }; } =item C Returns a list of the binding names. =cut sub binding_names { my $self = shift; my $names = $self->{_names}; return @$names; } =item C Returns the name of the $i-th result column. =cut sub binding_name { my $self = shift; my $val = shift; my $names = $self->{_names}; return $names->[ $val ]; } =item C Returns the number of variable bindings in the current result. =cut sub bindings_count { my $self = shift; my $names = $self->{_names}; return scalar( @$names ) if (scalar(@$names)); return 0; } =item C Returns true if the underlying result is a set of variable bindings. =cut sub is_bindings { my $self = shift; return 1; } =item C Returns a JSON serialization of the stream data. =cut sub as_json { my $self = shift; my $max_result_size = shift || 0; my $width = $self->bindings_count; # my $bridge = $self->bridge; my @variables; for (my $i=0; $i < $width; $i++) { my $name = $self->binding_name($i); push(@variables, $name) if $name; } my $count = 0; my @sorted = $self->sorted_by; my $order = scalar(@sorted) ? JSON::true : JSON::false; my $dist = $self->_args->{distinct} ? JSON::true : JSON::false; my $data = { head => { vars => \@variables }, results => { ordered => $order, distinct => $dist }, }; my @bindings; while (!$self->finished) { my %row; for (my $i = 0; $i < $width; $i++) { my $name = $self->binding_name($i); my $value = $self->binding_value($i); if (my ($k, $v) = format_node_json($value, $name)) { $row{ $k } = $v; } } push(@{ $data->{results}{bindings} }, \%row); last if ($max_result_size and ++$count >= $max_result_size); } continue { $self->next_result } return to_json( $data ); } =item C Returns an XML serialization of the stream data. =cut sub as_xml { my $self = shift; my $max_result_size = shift || 0; my $string = ''; open( my $fh, '>', \$string ); $self->print_xml( $fh, $max_result_size ); close($fh); return $string; } =item C Returns a string table serialization of the stream data. =cut sub as_string { my $self = shift; my $max_result_size = shift || 0; my @names = $self->binding_names; my $headers = \@names; my $rows = [ map { [ @{$_}{ @names } ] } $self->get_all ]; my @rule = qw(- +); my @headers = (\q"| "); push(@headers, map { $_ => \q" | " } @$headers); pop @headers; push @headers => (\q" |"); unless ('ARRAY' eq ref $rows) { die("make_table() rows must be an AoA with rows being same size as headers"); } if ('ARRAY' eq ref $rows->[0]) { if (@$headers == @{ $rows->[0] }) { my $table = Text::Table->new(@headers); $table->rule(@rule); $table->body_rule(@rule); $table->load(@$rows); return join('', $table->rule(@rule), $table->title, $table->rule(@rule), map({ $table->body($_) } 0 .. @$rows), $table->rule(@rule) ); } else { die("make_table() rows must be an AoA with rows being same size as headers"); } } else { return ''; } } =item C<< print_xml ( $fh, $max_size ) >> Prints an XML serialization of the stream data to the filehandle $fh. =cut sub print_xml { my $self = shift; my $fh = shift; my $max_result_size = shift || 0; my $width = $self->bindings_count; my @variables; for (my $i=0; $i < $width; $i++) { my $name = $self->binding_name($i); push(@variables, $name) if $name; } no strict 'refs'; print {$fh} <<"END"; END my $t = join("\n", map { qq(\t) } @variables); my $delay_output = 0; my $delayed = ''; if ($self->extra_result_data) { $delay_output = $fh; undef $fh; open( $fh, '>', \$delayed ) or die $!; } else { print {$fh} "${t}\n"; } print {$fh} <<"END"; END my $count = 0; while (my $row = $self->next) { my @row; print {$fh} "\t\t\n"; for (my $i = 0; $i < $width; $i++) { my $name = $self->binding_name($i); my $value = $row->{ $name }; print {$fh} "\t\t\t" . $self->format_node_xml($value, $name) . "\n"; } print {$fh} "\t\t\n"; last if ($max_result_size and ++$count >= $max_result_size); } if ($delay_output) { my $extra = $self->extra_result_data; my $extraxml = ''; foreach my $tag (keys %$extra) { $extraxml .= qq[\n]; my $value = $extra->{ $tag }; foreach my $e (@$value) { foreach my $k (keys %$e) { my $v = $e->{ $k }; my @values = @$v; foreach ($k, @values) { s/&/&/g; s/] . join(',', @values) . qq[\n]; } } $extraxml .= "\n"; } my $u = URI->new('data:'); $u->media_type('text/xml'); $u->data($extraxml); my $uri = "$u"; $uri =~ s/&/&/g; $uri =~ s/\n]; print {$fh} $delayed; } print {$fh} "\n"; print {$fh} "\n"; } =begin private =item C Returns a string representation of C<$node> for use in a JSON serialization. =end private =cut sub format_node_json ($$$) { # my $bridge = shift; # return undef unless ($bridge); my $node = shift; my $name = shift; my $node_label; if(!defined $node) { return; } elsif ($node->is_resource) { $node_label = $node->uri_value; return $name => { type => 'uri', value => $node_label }; } elsif ($node->is_literal) { $node_label = $node->literal_value; return $name => { type => 'literal', value => $node_label }; } elsif ($node->is_blank) { $node_label = $node->blank_identifier; return $name => { type => 'bnode', value => $node_label }; } else { return; } } =item C<< construct_args >> Returns the arguments necessary to pass to the stream constructor _new to re-create this stream (assuming the same closure as the first argument). =cut sub construct_args { my $self = shift; my $type = $self->type; my @names = $self->binding_names; my $args = $self->_args || {}; return ($type, \@names, %{ $args }); } 1; __END__ =back =head1 DEPENDENCIES L L =head1 AUTHOR Gregory Todd Williams C<< >> =head1 COPYRIGHT Copyright (c) 2006-2009 Gregory Todd Williams. All rights reserved. This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself. =cut