<html>
<head><title>Using Multiload with DBD::Teradata</title></head>
<body>
<center><h2>Using Multiload with DBD::Teradata</h2></center>
<p>
DBD::Teradata 2.2.0 provides support for IMPORT multiload (DELETE multiload
will be incorporated in a future release).
Using Multiload with DBD::Teradata requires some special considerations.
<p>
<b>DBD::Teradata does not support recovery logging of utilities; i.e.,
you cannot PAUSE and then restart DBD::Teradata-based utility applications.</b>
If DBD::Teradata detects a failure during a multiload operation, it will
automatically submit a RELEASE MLOAD request to attempt to clean up.
<p>

The attributes hash provided to tdat_UtilitySetup includes the following
keys:<p>
<table border=1>
<tr bgcolor=orange><th>Attribute</th><th>Required?</th><th width="70%">Description</th></tr>
<tr>
	<td align=center valign=top><b>Utility</b></td>
	<td align=center valign=top>Yes</td>
	<td valign=top>set to 'MLOAD'</td></tr>
<tr>
	<td align=center valign=top><b>SQL</b></td>
	<td align=center valign=top>Yes</td>
	<td valign=top>
set to an arrayref of multistatement SQL requests.<p>
Each multistatement request is treated as a separate Multiload job,
analogous to the multiple .DML statements of a Multiload script.
The SQL specified includes the DML extension statements
defined for the MLOAD utility, namely:<p>
<ul>
<li>&lt; MARK | IGNORE &gt; DUPLICATE [ &lt; INSERT | UPDATE &gt; ] ROWS
<li>&lt; MARK | IGNORE &gt; MISSING [ &lt; UPDATE | DELETE &gt; ] ROWS
<li>DO INSERT FOR [ MISSING UPDATE ] ROWS
</ul><p>
These statements should precede the actual INSERT/UPDATE/DELETE statements.
If none of these statements is specified, the default behavior is
"MARK DUPLICATE ROWS; MARK MISSING ROWS;"</td></tr>
<tr>
	<td align=center valign=top><b>Source</b></td>
	<td align=center valign=top>Yes</td>
	<td valign=top>
set to a subroutine reference, a file description, or a connection handle;
whichever is given acts as the source of the multiload data.
A connection handle is used as the control session of a fastexport
job for Loopback (see below).<p>

A file description is specified as<p>
<code>&lt; VARTEXT 'c' | INDICDATA | DATA &gt; <i>filename</i></code><p>
where 'c' is the separator character. Use '|' for compatibility with
the default Teradata VARTEXT file format. INDICDATA indicates a fastload
formatted file with indicator bytes, and DATA indicates a fastload
formatted file without indicator bytes.<p>
If a subroutine reference is specified, the specified subroutine is
responsible for providing a multiload bitmask when binding
parameter data. The bitmask provides a means for an application
to implement the conditional APPLY supported in Multiload scripts
(i.e., "APPLY dmllabel WHERE condition").
The bitmask is supplied to the statement handle using the
<code><b>tdat_mlmask</b></code> attribute, which is set to either a scalar
value, or an arrayref of bitmasks (for array binding).
When multiple multiload jobs are defined (i.e., more than 1 request is
specified with the SQL attribute), each job receives its own copy of
each record.
This bitmask indicates which multiload jobs
are to receive a copy of the associated record. If bit 0 is set, then
the first job receives a copy of the associated record;
if bit 1 is set, the second job receives a copy of the record.
<ul>
<li>If a scalar value is supplied with tdat_mlmask and array binding
is used, the scalar is applied to all the bound records.
<li>If no tdat_mlmask value is provided, every job gets
a copy of every record.
<li>If an arrayref supplied with tdat_mlmask is shorter than the
array(s) used to bind records, a copy of each additional record is
supplied to every job.
<li>For file or Loopback sources, every job gets a copy of every record.
</ul><p>
In addition, when binding rows, the subroutine is responsible for
keeping track of the current size of the output buffer. Assuming
the final size of the record can be computed, an additional 14 bytes
per record is added by the driver for Multiload control information,
and the size in the buffer will be multiplied by the number of multiload jobs
to which the record is applied. In summary, each record will add the following
number of bytes to the output buffer:<p>
<code>total_bytes = (record_length + 14 + indicator_byte_count) * number_of_jobs</code><p>
Note that the subroutine <b>supplies each record only once</b>; DBD::Teradata will
perform the replication of each record, based on the bitmask described above.
</td></tr>
<tr>
	<td align=center valign=top><b>SourceFields</b></td>
	<td align=center valign=top>Yes</td>
	<td valign=top> a USING clause string that describes the source record format</td></tr>
<tr>
	<td align=center valign=top><b>Report</b></td>
	<td align=center valign=top>No</td>
	<td valign=top>a subroutine reference which is called with a status message as the Multiload progresses.</td></tr>
<tr>
	<td align=center valign=top><b>LogTables</b></td>
	<td align=center valign=top>No</td>
	<td valign=top>a hashref that maps the tables to be multiloaded to
their work, error, and duplicate tables. The table names are the keys,
and the values are arrayrefs with the names of the work, error, and duplicate tables,
in that order (see examples below). The default is the name of each table specified in the
SQL requests, prefixed with WT_, ET_, and UV_ for the work, error, and duplicate
table, respectively (table names longer than 27 characters are truncated).
<p>
Note that the work tables are automatically dropped when the multiload completes.
Empty error and duplicate tables are dropped as well.</td></tr>
<tr>
	<td align=center valign=top><b>Checkpoint</b></td>
	<td align=center valign=top>No</td>
	<td valign=top>the number of records processed between each checkpoint</td></tr>
<tr>
	<td align=center valign=top><b>Sessions</b></td>
	<td align=center valign=top>Yes</td>
	<td valign=top>the number of multiload sessions to use</td></tr>
<tr>
	<td align=center valign=top><b>CheckpointCallback</b></td>
	<td align=center valign=top>No</td>
	<td valign=top>a subroutine reference that is called each time a checkpoint is taken</td></tr>
<tr>
	<td align=center valign=top><b>Context</b></td>
	<td align=center valign=top>No</td>
	<td valign=top>a hashref that can contain anything the application
desires; it will be passed to the Source subroutine reference each time
records are to be collected</td></tr>
<tr>
	<td align=center valign=top><b>MP</b></td>
	<td align=center valign=top>No</td>
	<td valign=top>when set to a nonzero numeric value, or the string 'nothreads',
causes multiple processes to be fork()'d, 1 per multiload session.
When set to the string 'threads', causes multiple threads to be created instead, 1 per multiload session.
This attribute may improve performance on some platforms (especially multiprocessor platforms).</td></tr>
<tr>
	<td align=center valign=top><b>Loopback</b></td>
	<td align=center valign=top>No</td>
	<td valign=top>when set to a SQL SELECT statement, causes a fastexport job
to be created from which data will be taken and supplied to the multiload job.
Note that the MP attribute must be specified as well. Each fork()'d process gets
both a fastexport and a multiload session, and data is transferred directly
from the fastexport session to the multiload session.</td></tr>
<tr>
	<td align=center valign=top><b>ErrorLimit</b></td>
	<td align=center valign=top>No</td>
	<td valign=top>specifies the maximum number of errors to be permitted during the
acquisition phase before the multiload is terminated. Default is 1,000,000 errors.</td></tr>
<tr>
	<td align=center valign=top><b>ActivityCounts</b></td>
	<td align=center valign=top>No</td>
	<td valign=top>specifies a hashref to receive the Insert/Update/Delete activity counts
for each MLOAD'ed table. The hashref will be keyed by the table names, and each
table's entry will be a hashref with the keys "Inserts", "Updates", and "Deletes",
and each value is the total rowcount applied to each activity for the given table
(see example below)</td></tr>
<tr>
	<td align=center valign=top><b>RequestSize</b></td>
	<td align=center valign=top>No</td>
	<td valign=top>integer size in bytes; sets the maximum request buffer size to use for
	the utility sessions. Larger values permit more tuples to be sent to the DBMS in a single
	message. Default is 64256; maximum value is 1,048,000. Setting to a value outside that
	range, or for pre-V2R6.0 Teradata servers, will be silently ignored.</td></tr>
<tr>
	<td align=center valign=top><b>Retry</b></td>
	<td align=center valign=top>No</td>
	<td valign=top>either a scalar or an arrayref, used when an attempt to start the multiload
	fails because no utility job slots remain available on the database. A scalar is the
	number of seconds to wait between retries; this form retries indefinitely. In an arrayref,
	the first element is the number of seconds between retries, and the second element is the
	maximum number of retry attempts.</td></tr>
</table><p>
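<p>
The job bitmask described for the Source attribute can be built from a list of
job positions. The following is a minimal sketch and not part of DBD::Teradata:
<code>job_mask</code> is a hypothetical helper name, and jobs are assumed to be
numbered from 0 in the order their requests appear in the SQL attribute.
<pre>
<code>
use strict;
use warnings;

# Hypothetical helper: build a multiload bitmask from 0-based job positions.
# Bit 0 selects the first SQL request, bit 1 the second, and so on.
sub job_mask {
    my @jobs = @_;
    my $mask = 0;
    $mask |= 2 ** $_ foreach @jobs;
    return $mask;
}

# One mask for a single record: the first and third jobs receive it.
my $first_and_third = job_mask(0, 2);

# One mask per bound record, for use with array binding.
my @per_row = (job_mask(0), job_mask(1), job_mask(0, 1, 2));

# Inside a Source subroutine, after binding the record arrays:
#   $sth->{tdat_mlmask} = \@per_row;        # one mask per bound record
#   $sth->{tdat_mlmask} = $first_and_third; # or one mask for all records
</code>
</pre>
Keeping the mask construction in one place makes the conditional APPLY logic
easier to read than literal integers scattered through the subroutine.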

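<p>
The buffer formula given for the Source attribute can be turned into a small
calculation. This is a sketch under stated assumptions: <code>buffered_bytes</code>
is a hypothetical helper, the indicator byte count is assumed to be one bit per
field rounded up to whole bytes, and the mask is assumed to fit in 32 bits.
<pre>
<code>
use strict;
use warnings;
use POSIX qw(ceil);

# Hypothetical helper: bytes that one record adds to the output buffer.
#   $record_length - length of the record's data in bytes
#   $field_count   - number of fields in the SourceFields USING clause
#   $mask          - the tdat_mlmask value that applies to this record
sub buffered_bytes {
    my ($record_length, $field_count, $mask) = @_;

    # Assumption: one indicator bit per field, rounded up to whole bytes.
    my $indicator_bytes = ceil($field_count / 8);

    # Count the bits set in the mask: one copy of the record per selected job.
    my $job_count = unpack('%32b*', pack('N', $mask));

    return ($record_length + 14 + $indicator_bytes) * $job_count;
}

# A 100-byte record with 13 fields, applied to the first and third jobs:
# (100 + 14 + 2) * 2 bytes.
my $bytes = buffered_bytes(100, 13, 5);

# Rough upper bound on rows per call for the default RequestSize of 64256,
# ignoring any other overhead in the request.
my $rows_that_fit = int(64256 / buffered_bytes(100, 13, 7));
</code>
</pre>
When no tdat_mlmask is supplied every job receives the record, so the mask
passed to this helper would have one bit set per job.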
<h3>Examples</h3>
<h4>Multijob MP Multiload with subroutine source</h4>
<pre><code>
    my $ctldbh = DBI-&gt;connect('dbi:Teradata:dbc', $username, $password,
        { RaiseError =&gt; 0, PrintError =&gt; 0, tdat_lsn =&gt; 0 });

    my %act_counts = ();	# to receive each table's activity counts
    my $total = $ctldbh-&gt;tdat_UtilitySetup(
        {
        Utility =&gt; 'MLOAD',
        Sessions =&gt; $sesscount,
        SQL =&gt; [
            'DO INSERT FOR MISSING UPDATE ROWS;
            UPDATE alltypetst SET col4 = :col4 WHERE col1 = :col1;
            INSERT INTO alltypetst VALUES(:col1, :col2, :col3, :col4, :col5, :col6,
                :col7, :col8, :col9, :col10, :col11, :col12, :col13);',

            "MARK DUPLICATE INSERT ROWS;
            INSERT INTO alltypetst2 VALUES(:col1, :col2, :col3, :col4, :col5, :col6,
                :col7, :col8, :col9, :col10, :col11, TIME '10:20:30', :col13);",

            'MARK MISSING DELETE ROWS;
            DELETE FROM alltypetst3 WHERE col1 = :col1;'
        ],
        SourceFields =&gt; 'USING (col1 integer, col2 smallint, col3 byteint, col4 char(20),
                        col5 varchar(100), col6 float, col7 decimal(2,1),
                        col8 decimal(4,2), col9 decimal(8,4), col10 float, col11 date,
                        col12 time, col13 timestamp(0))',
        Checkpoint =&gt; 20000,
        LogTables =&gt; {
            alltypetst =&gt; [ 'wt_alltt', 'et_alltt', 'uv_alltt' ],
            alltypetst2 =&gt; [ 'wt_alltt2', 'et_alltt2', 'uv_alltt2' ],
            alltypetst3 =&gt; [ 'wt_alltt3', 'et_alltt3', 'uv_alltt3' ],
            },
        Report =&gt; \&report_cb,
        Source =&gt; \&ml_get_data,
        CheckpointCallback =&gt; \&mlcheckpoint,
        Context =&gt; {
            Count =&gt; \$count,
            Runtime =&gt; \$mlstarted,
            Base =&gt; [ 0, 1000000 ],
            JobCount =&gt; 3
            },
        ActivityCounts => \%act_counts,
        MP => 1,
        Retry => [ 120, 3] # retry every 2 minutes up to 3 times
        });
#
#	display our activity counts
#
    if ($total && ($total &gt; 0)) {
    print '
Table    Inserts    Updates    Deletes
----------------------------------------
';
        # print the table name, then its insert/update/delete counts
        print $_, "\t",
        	$act_counts{$_}-&gt;{Inserts}, "\t",
        	$act_counts{$_}-&gt;{Updates}, "\t",
        	$act_counts{$_}-&gt;{Deletes}, "\n"
        	foreach (keys %act_counts);
    }

    print $ctldbh-&gt;errstr
        unless ($total && ($total &gt; 0));

sub mlcheckpoint {
    my ($function, $rowcount, $ctxt) = @_;

    my $flstarted = $ctxt-&gt;{Runtime};

    if ($function eq 'INIT') {
        # remember when the load started
        $$flstarted = time;
        print "$rowcount MLOAD sessions logged on.\n";
    }
    elsif ($function eq 'FINISH') {
        # convert the start time into the elapsed runtime
        $$flstarted = time - $$flstarted;
        print "$rowcount rows loaded.\n";
    }
    elsif ($function eq 'CHECKPOINT') {
        print "Check point at $rowcount rows\n";
    }
    return 1;
}

sub ml_get_data {
    my ($function, $sth, $sessnum, $maxrows, $ctxt) = @_;
    my ($ary, $rc, $rowcnt);

    $sth-&gt;{tdat_raw} = $ctxt-&gt;{Mode}, return -1
        if ($function eq 'INIT');

    return 0 if (($function eq 'MOREDATA') && (${$ctxt-&gt;{Count}} &gt;= 10000));

    if ($function eq 'MOREDATA') {
        $ary = collect_recs_each($ctxt-&gt;{Count},
            ($maxrows &gt; 280) ? 280 : $maxrows, $ctxt-&gt;{JobCount}, $ctxt-&gt;{Base}-&gt;[$sessnum]);
        print "Sending " . ($#{$$ary[0]}+1) . " rows for session $sessnum...\n";
        foreach my $i (0..$#$ary) {
            $rc = $sth-&gt;bind_param_array($i+1, $$ary[$i]);
        }

        return $#{$$ary[0]}+1;
    }

    print "Got IO Finish for $sessnum\n" and return -1
        if ($function eq 'FINISH');

    print "Got CHECKPOINT for $sessnum\n" and return -1
        if ($function eq 'CHECKPOINT');

    return 0;
}

sub report_cb {
    my ($msg) = @_;
    print $msg, "\n";
}

</code>
</pre>
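<p>
The Source subroutine in the example above binds one array per column, and the
<code>collect_recs_each</code> routine that produces those arrays is not shown.
If the application holds its data as rows, a transposition step is needed first.
The following is a minimal sketch; <code>rows_to_columns</code> is a hypothetical
helper and the two-column sample data is purely illustrative.
<pre>
<code>
use strict;
use warnings;

# Hypothetical helper: turn a list of row arrayrefs into one arrayref per
# column, the shape passed to bind_param_array() for each placeholder.
sub rows_to_columns {
    my ($rows) = @_;
    my @columns;
    foreach my $row (@$rows) {
        push @{ $columns[$_] }, $row->[$_] foreach 0 .. $#$row;
    }
    return \@columns;
}

my $ary = rows_to_columns([
    [ 1, 'first'  ],
    [ 2, 'second' ],
    [ 3, 'third'  ],
]);
# $ary is now [ [ 1, 2, 3 ], [ 'first', 'second', 'third' ] ]
</code>
</pre>
The result can be bound column by column exactly as the loop in
<code>ml_get_data</code> does, and the row count returned to the driver is the
length of the first column array.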
<h4>Multithreaded MP Multiload with INDICDATA source</h4>
<pre>
<code>
    my $ctldbh = DBI-&gt;connect('dbi:Teradata:dbc', $username, $password,
        { RaiseError =&gt; 0, PrintError =&gt; 0, tdat_lsn =&gt; 0 });

    my %act_counts = ();	# to receive each table's activity counts
    my $total = $ctldbh-&gt;tdat_UtilitySetup(
        {
        Utility =&gt; 'MLOAD',
        Sessions =&gt; $sesscount,
        SQL =&gt; [
            'DO INSERT FOR MISSING UPDATE ROWS;
            UPDATE alltypetst SET col4 = :col4 WHERE col1 = :col1;
            INSERT INTO alltypetst VALUES(:col1, :col2, :col3, :col4, :col5, :col6,
                :col7, :col8, :col9, :col10, :col11, :col12, :col13);',

            "MARK DUPLICATE INSERT ROWS;
            INSERT INTO alltypetst2 VALUES(:col1, :col2, :col3, :col4, :col5, :col6,
                :col7, :col8, :col9, :col10, :col11, TIME '10:20:30', :col13);",

            'MARK MISSING DELETE ROWS;
            DELETE FROM alltypetst3 WHERE col1 = :col1;'
        ],
        SourceFields =&gt; 'USING (col1 integer, col2 smallint, col3 byteint, col4 char(20),
                        col5 varchar(100), col6 float, col7 decimal(2,1),
                        col8 decimal(4,2), col9 decimal(8,4), col10 float, col11 date,
                        col12 time, col13 timestamp(0))',
        Checkpoint =&gt; 20000,
        LogTables =&gt; {
            alltypetst =&gt; [ 'wt_alltt', 'et_alltt', 'uv_alltt' ],
            alltypetst2 =&gt; [ 'wt_alltt2', 'et_alltt2', 'uv_alltt2' ],
            alltypetst3 =&gt; [ 'wt_alltt3', 'et_alltt3', 'uv_alltt3' ],
            },
        Report =&gt; \&report_cb,
        Source =&gt; 'INDICDATA rawdata.dat',
        CheckpointCallback =&gt; \&mlcheckpoint,
        ActivityCounts => \%act_counts,
        MP => 'threads'
        });
#
#	display our activity counts
#
    if ($total && ($total &gt; 0)) {
    print '
Table    Inserts    Updates    Deletes
----------------------------------------
';
        # print the table name, then its insert/update/delete counts
        print $_, "\t",
        	$act_counts{$_}-&gt;{Inserts}, "\t",
        	$act_counts{$_}-&gt;{Updates}, "\t",
        	$act_counts{$_}-&gt;{Deletes}, "\n"
        	foreach (keys %act_counts);
    }

    print $ctldbh-&gt;errstr
        unless ($total && ($total &gt; 0));
</code>
</pre>
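<p>
The LogTables entries in these examples name every work, error, and duplicate
table explicitly. When LogTables is omitted, the default names follow the rule
given in the attribute table. The sketch below illustrates that rule only;
<code>default_log_tables</code> is a hypothetical helper, and it assumes the
table name is cut to its first 27 characters before the prefix is added.
<pre>
<code>
use strict;
use warnings;

# Hypothetical helper: derive the default work, error, and duplicate
# table names for one target table, in that order.
sub default_log_tables {
    my ($table) = @_;

    # Assumption: names longer than 27 characters keep their first 27.
    my $base = substr($table, 0, 27);

    return [ map { $_ . $base } qw(WT_ ET_ UV_) ];
}

my $names = default_log_tables('alltypetst');
# $names is [ 'WT_alltypetst', 'ET_alltypetst', 'UV_alltypetst' ]
</code>
</pre>
Two target tables whose names share the same first 27 characters would map to
the same default names under this rule, which is one reason to supply LogTables
explicitly.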

<h4>Multijob MP Multiload with VARTEXT source</h4>
<pre>
<code>
    my $ctldbh = DBI-&gt;connect('dbi:Teradata:dbc', $username, $password,
        { RaiseError =&gt; 0, PrintError =&gt; 0, tdat_lsn =&gt; 0 });

    my %act_counts = ();	# to receive each table's activity counts
    my $total = $ctldbh-&gt;tdat_UtilitySetup(
        {
        Utility =&gt; 'MLOAD',
        Sessions =&gt; $sesscount,
        SQL =&gt; [
            'DO INSERT FOR MISSING UPDATE ROWS;
            UPDATE alltypetst SET col4 = :col4 WHERE col1 = :col1;
            INSERT INTO alltypetst VALUES(:col1, :col2, :col3, :col4, :col5, :col6,
                :col7, :col8, :col9, :col10, :col11, :col12, :col13);',

            "MARK DUPLICATE INSERT ROWS;
            INSERT INTO alltypetst2 VALUES(:col1, :col2, :col3, :col4, :col5, :col6,
                :col7, :col8, :col9, :col10, :col11, TIME '10:20:30', :col13);",

            'MARK MISSING DELETE ROWS;
            DELETE FROM alltypetst3 WHERE col1 = :col1;'
        ],
        SourceFields =&gt; 'USING (col1 varchar(9), col2 varchar(6), col3 varchar(4),
            col4 varchar(20), col5 varchar(100), col6 varchar(30), col7 varchar(4),
            col8 varchar(7), col9 varchar(10), col10 varchar(30), col11 varchar(10),
            col12 varchar(15), col13 varchar(19))',
        Checkpoint =&gt; 20000,
        LogTables =&gt; {
            alltypetst =&gt; [ 'wt_alltt', 'et_alltt', 'uv_alltt' ],
            alltypetst2 =&gt; [ 'wt_alltt2', 'et_alltt2', 'uv_alltt2' ],
            alltypetst3 =&gt; [ 'wt_alltt3', 'et_alltt3', 'uv_alltt3' ],
            },
        Report =&gt; \&report_cb,
        Source =&gt; "VARTEXT '|' vardata.txt",
        CheckpointCallback =&gt; \&mlcheckpoint,
        ActivityCounts => \%act_counts,
        MP => 1
        });
#
#	display our activity counts
#
    if ($total && ($total &gt; 0)) {
    print '
Table    Inserts    Updates    Deletes
----------------------------------------
';
        # print the table name, then its insert/update/delete counts
        print $_, "\t",
        	$act_counts{$_}-&gt;{Inserts}, "\t",
        	$act_counts{$_}-&gt;{Updates}, "\t",
        	$act_counts{$_}-&gt;{Deletes}, "\n"
        	foreach (keys %act_counts);
    }

    print $ctldbh-&gt;errstr
        unless ($total && ($total &gt; 0));

</code>
</pre>
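<p>
A VARTEXT source file such as <code>vardata.txt</code> is plain text with one
record per line and fields separated by the chosen character. The sketch below
shows one way to produce such a file from Perl. <code>vartext_line</code> is a
hypothetical helper, the three-field sample rows are illustrative only (a real
file needs one field for each entry in SourceFields), and field values are
assumed never to contain the separator or a newline.
<pre>
<code>
use strict;
use warnings;

# Hypothetical helper: format one record as a VARTEXT line using $sep.
sub vartext_line {
    my ($sep, @fields) = @_;

    # Assumption of this sketch: undefined values become empty fields.
    return join($sep, map { defined $_ ? $_ : '' } @fields) . "\n";
}

open(my $out, '>', 'vardata.txt') or die "Cannot write vardata.txt: $!";
print {$out} vartext_line('|', 1, 'first row', '2004-01-31');
print {$out} vartext_line('|', 2, 'second row', '2004-02-29');
close($out) or die "Cannot close vardata.txt: $!";
</code>
</pre>
The separator passed to the helper must match the character given in the Source
attribute, which is '|' in the example above.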

<h4>Multijob MP Multiload with Loopback source</h4>
<pre>
<code>
    my $ctldbh = DBI-&gt;connect('dbi:Teradata:dbc', $username, $password,
        { RaiseError =&gt; 0, PrintError =&gt; 0, tdat_lsn =&gt; 0 });

    my $fedbh = DBI-&gt;connect("dbi:Teradata:otherdbc", $userid, $passwd,
    { PrintError =&gt; 0, RaiseError =&gt; 0, AutoCommit =&gt; 1, tdat_lsn =&gt; 0 });

    my $total = $ctldbh-&gt;tdat_UtilitySetup(
        {
        Utility =&gt; 'MLOAD',
        Sessions =&gt; $sesscount,
        SQL =&gt; [
            'DO INSERT FOR MISSING UPDATE ROWS;
            UPDATE alltypetst SET col4 = :col4 WHERE col1 = :col1;
            INSERT INTO alltypetst VALUES(:col1, :col2, :col3, :col4, :col5, :col6,
                :col7, :col8, :col9, :col10, :col11, :col12, :col13);',

            "MARK DUPLICATE INSERT ROWS;
            INSERT INTO alltypetst2 VALUES(:col1, :col2, :col3, :col4, :col5, :col6,
                :col7, :col8, :col9, :col10, :col11, TIME '10:20:30', :col13);",

            'MARK MISSING DELETE ROWS;
            DELETE FROM alltypetst3 WHERE col1 = :col1;'
        ],
        SourceFields =&gt; 'USING (col1 varchar(9), col2 varchar(6), col3 varchar(4),
            col4 varchar(20), col5 varchar(100), col6 varchar(30), col7 varchar(4),
            col8 varchar(7), col9 varchar(10), col10 varchar(30), col11 varchar(10),
            col12 varchar(15), col13 varchar(19))',
        Checkpoint =&gt; 20000,
        LogTables =&gt; {
            alltypetst =&gt; [ 'wt_alltt', 'et_alltt', 'uv_alltt' ],
            alltypetst2 =&gt; [ 'wt_alltt2', 'et_alltt2', 'uv_alltt2' ],
            alltypetst3 =&gt; [ 'wt_alltt3', 'et_alltt3', 'uv_alltt3' ],
            },
        Loopback =&gt; 'SELECT * FROM sometable',
        Source =&gt; $fedbh,
        Report =&gt; \&report_cb,
        CheckpointCallback =&gt; \&mlcheckpoint,
        MP => 1
        });

    print $ctldbh-&gt;errstr
        unless ($total && ($total &gt; 0));

</code>
</pre>
</body>
</html>