package Helios::Service::SolrIndexer; use strict; use warnings; use base qw(Helios::Service); use Error qw(:try); use Sys::Syslog qw(:macros); use HTTP::Request; use LWP::UserAgent; use Encode; use XML::Writer; our $VERSION = '0.01_01'; =head1 NAME Helios::Service::SolrIndexer - a demostration indexing application for the Helios job processing framework =head1 DESCRIPTION Helios::Service::SolrIndexer (SolrIndexer for short) is a simple example application to demonstrate the typical Helios application design pattern in the context of a search engine index update (in this case, Apache Solr). =head1 HELIOS CONFIG PARAMETERS SolrIndexer does require several config parameters to be defined in your Helios collective for it to function correctly. These can be placed in either helios.ini or the Helios Ctrl Panel (the Ctrl Panel method is recommended): =over 4 =item index_endpoint The URI endpoint of the Solr index (eg http://localhost:8983/solr) =item source_dsn The DBI datasource name of the database table to be indexed =item source_user Username to use to connect to the source database. =item source_password Password to use to connect the source database. =item source_tb Name of the table to be indexed in the source database. =item source_fields A comma-delimited string specifying which of the source table's fields should be selected and given to Solr to index. Remember, these must be set up in the Solr index's schema beforehand, or Solr will just return an error when an update is attempted. =item source_id_field The field name of the primary key in the source field in the database. The values of this field will passed in via the job arguments and a SQL WHERE clause built around it to uniquely identify the record in the database table. The contents of this field will also become the id of the document in the Solr index. =back =head1 JOB ARGUMENTS Job arguments for this service should be specified in the form: 1234 where the section contains the primary key of the source table to be indexed in the database. =head1 METHODS =head2 run($helios_job) As is typical for Helios services, run() is the main workhorse of SolrIndexer. It will be called by Helios workers to service a job. The $helios_job passed to it will be a Helios::Job object. Once run() has pulled in its configuration hashref and parsed the Helios::Job object's argument XML, run() performs 4 tasks to accomplish a job: =over 4 =item 1 Generates the SQL to retrieve the records from the database =item 2 Executes the SQL with the id given to it in the job arguments =item 3 Reformats the retrieved database record into a UTF-8 encoded XML stream for Solr (Solr requires UTF-8 encoding) =item 4 Sends the XML stream to Solr to be added to the index =back If all these steps are successful, run() calls Helios::Service->completedJob() to mark the job as completed successfully. If an error occurs, it calls Helios::Service->logMsg() to log the error message and Helios::Service->failedJob() to mark the job as failed. =cut sub run { my $self = shift; my $job = shift; my $config = $self->getConfig(); my $args = $self->getJobArgs($job); try { my $id = $args->{id}; $self->logMsg($job, LOG_INFO, 'Adding '.$config->{source_id_field}.' '.$args->{id}.' to the index'); my $sql = $self->generateSQL(); if ($self->debug) { print "SQL: $sql\n"; } my $dbresult = $self->retrieveFromDb($sql, $id); my $xml = $self->generateXML($dbresult); if ($self->debug) { print "XML: $xml\n"; } $self->updateIndex($xml); $self->logMsg($job, LOG_INFO, $config->{source_id_field}.' '.$args->{id}.' successfully added to the index'); $self->completedJob($job); } catch Helios::Error::Fatal with { my $e = shift; $self->logMsg($job, LOG_ERR, "Error: ".$e->text); $self->failedJob($job, $e->text); } catch Helios::Error::FatalNoRetry with { my $e = shift; $self->logMsg($job, LOG_ERR, "Error (permanent): ".$e->text); $self->failedJob($job, $e->text); } otherwise { my $e = shift; $self->logMsg($job, LOG_ERR, "Unexpected error: ".$e->text); $self->failedJob($job, $e->text); }; } =head2 generateSQL() Generates the SQL necessary to retrieve the database record. This method determines the correct SQL by looking at the configuration parameters defined in Helios. =cut sub generateSQL { my $self = shift; my $config = $self->getConfig(); my $table = $config->{source_tb}; my $fields = $config->{source_fields}; my $id_field = $config->{source_id_field}; return "SELECT $fields FROM $table WHERE $id_field = ?"; } =head2 retrieveFromDb($sql, $id) Given a SQL SELECT statement and a unique id, retrieveFromDb() retrieves the record identified by the $id using the supplied $sql. It returns the record in the form of a hashref. =cut sub retrieveFromDb { my $self = shift; my $sql = shift; my $id = shift; my $config = $self->getConfig(); my $dbh = $self->dbConnect($config->{source_dsn}, $config->{source_user}, $config->{source_password}); my $sth = $dbh->prepare($sql); $sth->execute($id); my $result = $sth->fetchrow_hashref(); $sth->finish(); return $result; } =head2 generateXML($hashref) Given a hashref, generateXML() takes the hashref's keys and values and turns them into an XML stream be passed to Solr. It returns this string of XML to the calling routine. =cut sub generateXML { my $self = shift; my $result = shift; my $xml; my $wtr = new XML::Writer(OUTPUT => \$xml, ENCODING => 'utf-8'); $wtr->startTag('add'); $wtr->startTag('doc'); foreach (keys %$result) { $wtr->startTag("field", "name" => $_); my $text = encode("utf-8", $result->{$_}); $wtr->characters($text); $wtr->endTag("field"); } $wtr->endTag('doc'); $wtr->endTag('add'); $wtr->end(); return $xml; } =head2 updateIndex($xml) Given a Solr XML document addition stream, updateIndex() builds an HTTP::Request object using the stream and the Solr endpoint URI. It then uses LWP::UserAgent to POST the request to Solr. If the document update is successful, updateIndex() returns the successful status (usually '200 OK' to the calling routine. If the request was not successful, the method will throw a Helios::Error::Fatal exception with the erroneous status as the message. =cut sub updateIndex { my $self = shift; my $xml = shift; my $config = $self->getConfig(); my $endpoint = $config->{index_endpoint}; # generate URL # if ($endpoint =~ /\/$/) { chop $endpoint; } # solr doesn't like // my $url = $endpoint . '/update'; # put together the request my $request = HTTP::Request->new(POST => $url); $request->header('Content-type' => 'text/xml', 'charset' => 'utf-8'); $request->content($xml); my $ua = LWP::UserAgent->new(); my $response = $ua->request($request); unless ($response->is_success) { throw Helios::Error::Fatal($response->status_line); } return $response->status_line(); } 1; __END__ =head1 SEE ALSO L, L, L, , L =head1 AUTHOR Andrew Johnson, Elajandy at cpan.orgE =head1 COPYRIGHT AND LICENSE This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself, either Perl version 5.8.0 or, at your option, any later version of Perl 5 you may have available. =head1 WARRANTY This software comes with no warranty of any kind. =cut