package KinoSearch::Store::OutStream; use strict; use warnings; use KinoSearch::Util::ToolSet; use base qw( KinoSearch::Util::CClass ); sub close { my $self = shift; $self->flush; CORE::close $self->get_fh; } 1; __END__ __XS__ MODULE = KinoSearch PACKAGE = KinoSearch::Store::OutStream =for comment Constructor - takes one arg: a filehandle. =cut OutStream* new(class, fh_sv) char *class; SV *fh_sv; CODE: RETVAL = Kino_OutStream_new(class, fh_sv); OUTPUT: RETVAL void seek(outstream, target) OutStream *outstream; double target; PPCODE: outstream->seek(outstream, target); double tell(outstream) OutStream *outstream; CODE: RETVAL = outstream->tell(outstream); OUTPUT: RETVAL double length(outstream) OutStream *outstream; CODE: RETVAL = Kino_OutStream_length(outstream); OUTPUT: RETVAL void flush(outstream); OutStream *outstream; PPCODE: Kino_OutStream_flush(outstream); =for comment Write the entire contents of an instream to an outstream. =cut void absorb(outstream, instream) OutStream *outstream; InStream *instream; PPCODE: Kino_OutStream_absorb(outstream, instream); SV* _set_or_get(outstream, ...) OutStream *outstream; ALIAS: set_fh = 1 get_fh = 2 CODE: { KINO_START_SET_OR_GET_SWITCH case 1: Kino_confess("Can't set_fh"); /* fall through */ case 2: RETVAL = newSVsv(outstream->fh_sv); break; KINO_END_SET_OR_GET_SWITCH } OUTPUT: RETVAL =begin comment $outstream->lu_write( TEMPLATE, LIST ); Write the items in LIST to the OutStream using the serialization schemes specified by TEMPLATE. =end comment =cut void lu_write (outstream, template_sv, ...) OutStream *outstream; SV *template_sv; PREINIT: STRLEN tpt_len; /* bytelength of template */ char *template; /* ptr to a spot in the template */ char *tpt_end; /* ptr to the end of the template */ int repeat_count; /* number of times to repeat sym */ int item_count; /* current place in @_ */ char sym; /* the current symbol in the template */ char countsym; /* used when calculating repeat counts */ I32 aI32; U32 aU32; double aDouble; SV *aSV; char *string; STRLEN string_len; PPCODE: { /* require an object, a template, and at least 1 item */ if (items < 2) { Kino_confess("lu_write error: too few arguments"); } /* prepare the template and get pointers */ template = SvPV(template_sv, tpt_len); tpt_end = template + tpt_len; /* reject an empty template */ if (tpt_len == 0) { Kino_confess("lu_write error: TEMPLATE cannot be empty string"); } /* init counters */ repeat_count = 0; item_count = 2; while (1) { /* only process template if we're not in the midst of a repeat */ if (repeat_count == 0) { /* fast-forward past space characters */ while (*template == ' ' && template < tpt_end) { template++; } /* if we're done, return or throw error */ if (template == tpt_end || item_count == items) { if (item_count != items) { Kino_confess( "lu_write error: Too many ITEMS, not enough TEMPLATE"); } else if (template != tpt_end) { Kino_confess( "lu_write error: Too much TEMPLATE, not enough ITEMS"); } else { /* success! */ break; } } /* derive the current symbol and a possible digit repeat sym */ sym = *template++; countsym = *template; if (template == tpt_end) { /* sym is last char in template */ repeat_count = 1; } else if (countsym >= '0' && countsym <= '9') { /* calculate numerical repeat count */ repeat_count = countsym - KINO_NUM_CHAR_OFFSET; countsym = *(++template); while ( template <= tpt_end && countsym >= '0' && countsym <= '9' ) { repeat_count = (repeat_count * 10) + (countsym - KINO_NUM_CHAR_OFFSET); countsym = *(++template); } } else { /* no numeric repeat count, so process sym only once */ repeat_count = 1; } } switch(sym) { case 'a': /* arbitrary binary data */ aSV = ST(item_count); if (!SvOK(aSV)) { Kino_confess("Internal error: undef at lu_write 'a'"); } string = SvPV(aSV, string_len); if (repeat_count != string_len) { Kino_confess( "lu_write error: repeat_count != string_len: %d %d", repeat_count, string_len); } Kino_OutStream_write_bytes(outstream, string, string_len); /* trigger next sym */ repeat_count = 1; break; case 'b': /* signed byte */ case 'B': /* unsigned byte */ aI32 = SvIV( ST(item_count) ); Kino_OutStream_write_byte(outstream, (char)(aI32 & 0xff)); break; case 'i': /* signed 32-bit integer */ aI32 = SvIV( ST(item_count) ); Kino_OutStream_write_int(outstream, (U32)aI32); break; case 'I': /* unsigned 32-bit integer */ aU32 = SvUV( ST(item_count) ); Kino_OutStream_write_int(outstream, aU32); break; case 'Q': /* unsigned "64-bit" integer */ aDouble = SvNV( ST(item_count) ); Kino_OutStream_write_long(outstream, aDouble); break; case 'V': /* VInt */ aU32 = SvUV( ST(item_count) ); Kino_OutStream_write_vint(outstream, aU32); break; case 'W': /* VLong */ aDouble = SvNV( ST(item_count) ); Kino_OutStream_write_vlong(outstream, aDouble); break; case 'T': /* string */ aSV = ST(item_count); string = SvPV(aSV, string_len); Kino_OutStream_write_string(outstream, string, string_len); break; default: Kino_confess("Illegal character in template: %c", sym); } /* use up one repeat_count and one item from the stack */ repeat_count--; item_count++; } } void DESTROY(outstream) OutStream *outstream; PPCODE: Kino_OutStream_destroy(outstream); __H__ #ifndef H_KINOIO #define H_KINOIO 1 #include "EXTERN.h" #include "perl.h" #include "XSUB.h" #include "KinoSearchStoreInStream.h" #include "KinoSearchUtilCarp.h" #include "KinoSearchUtilMathUtils.h" typedef struct outstream { PerlIO *fh; SV *fh_sv; char *buf; Off_t buf_start; int buf_pos; void (*seek) (struct outstream*, double); double (*tell) (struct outstream*); void (*write_byte) (struct outstream*, char); void (*write_bytes) (struct outstream*, char*, STRLEN); void (*write_int) (struct outstream*, U32); void (*write_long) (struct outstream*, double); void (*write_vint) (struct outstream*, U32); void (*write_vlong) (struct outstream*, double); void (*write_string)(struct outstream*, char*, STRLEN); } OutStream; OutStream* Kino_OutStream_new (char*, SV*); void Kino_OutStream_seek (OutStream*, double); double Kino_OutStream_tell (OutStream*); double Kino_OutStream_length (OutStream*); void Kino_OutStream_flush (OutStream*); void Kino_OutStream_absorb (OutStream*, InStream*); void Kino_OutStream_write_byte (OutStream*, char); void Kino_OutStream_write_bytes (OutStream*, char*, STRLEN); void Kino_OutStream_write_int (OutStream*, U32); void Kino_OutStream_write_long (OutStream*, double); void Kino_OutStream_write_vint (OutStream*, U32); int Kino_OutStream_encode_vint (U32, char*); void Kino_OutStream_write_vlong (OutStream*, double); void Kino_OutStream_write_string (OutStream*, char*, STRLEN); void Kino_OutStream_destroy (OutStream*); #endif /* include guard */ __C__ #include "KinoSearchStoreOutStream.h" OutStream* Kino_OutStream_new(char* class, SV* fh_sv) { OutStream *outstream; /* allocate */ Kino_New(0, outstream, 1, OutStream); /* assign */ outstream->fh_sv = newSVsv(fh_sv); outstream->fh = IoOFP( sv_2io(fh_sv) ); /* init buffer */ Kino_New(0, outstream->buf, KINO_IO_STREAM_BUF_SIZE, char); outstream->buf_start = 0; outstream->buf_pos = 0; /* assign methods */ outstream->seek = Kino_OutStream_seek; outstream->tell = Kino_OutStream_tell; outstream->write_byte = Kino_OutStream_write_byte; outstream->write_bytes = Kino_OutStream_write_bytes; outstream->write_int = Kino_OutStream_write_int; outstream->write_long = Kino_OutStream_write_long; outstream->write_vint = Kino_OutStream_write_vint; outstream->write_vlong = Kino_OutStream_write_vlong; outstream->write_string = Kino_OutStream_write_string; return outstream; } void Kino_OutStream_seek(OutStream *outstream, double target) { Kino_OutStream_flush(outstream); outstream->buf_start = target; PerlIO_seek(outstream->fh, target, 0); } double Kino_OutStream_tell(OutStream *outstream) { return outstream->buf_start + outstream->buf_pos; } double Kino_OutStream_length(OutStream *outstream) { double len; /* flush, go to end, note length, return to bookmark */ Kino_OutStream_flush(outstream); PerlIO_seek(outstream->fh, 0, 2); len = PerlIO_tell(outstream->fh); PerlIO_seek(outstream->fh, outstream->buf_start, 0); return len; } void Kino_OutStream_flush(OutStream *outstream) { PerlIO_write(outstream->fh, outstream->buf, outstream->buf_pos); outstream->buf_start += outstream->buf_pos; outstream->buf_pos = 0; } void Kino_OutStream_absorb(OutStream *outstream, InStream *instream) { double bytes_left, bytes_this_iter; char *buf; int check_val; /* flush, then "borrow" the buffer */ Kino_OutStream_flush(outstream); buf = outstream->buf; bytes_left = instream->len; while (bytes_left > 0) { bytes_this_iter = bytes_left < KINO_IO_STREAM_BUF_SIZE ? bytes_left : KINO_IO_STREAM_BUF_SIZE; instream->read_bytes(instream, buf, bytes_this_iter); check_val = PerlIO_write(outstream->fh, buf, bytes_this_iter); if (check_val != bytes_this_iter) { Kino_confess("outstream->absorb error: %"UVuf", %d", (UV)bytes_this_iter, check_val); } bytes_left -= bytes_this_iter; outstream->buf_start += bytes_this_iter; } } void Kino_OutStream_write_byte(OutStream *outstream, char aChar) { if (outstream->buf_pos >= KINO_IO_STREAM_BUF_SIZE) Kino_OutStream_flush(outstream); outstream->buf[ outstream->buf_pos++ ] = aChar; } void Kino_OutStream_write_bytes(OutStream *outstream, char *bytes, STRLEN len) { /* if this data is larger than the buffer size, flush and write */ if (len >= KINO_IO_STREAM_BUF_SIZE) { int check_val; Kino_OutStream_flush(outstream); check_val = PerlIO_write(outstream->fh, bytes, len); if (check_val != len) { Kino_confess("Write error: tried to write %"UVuf", got %d", (UV)len, check_val); } outstream->buf_start += len; } /* if there's not enough room in the buffer, flush then add */ else if (outstream->buf_pos + len >= KINO_IO_STREAM_BUF_SIZE) { Kino_OutStream_flush(outstream); Copy(bytes, (outstream->buf + outstream->buf_pos), len, char); outstream->buf_pos += len; } /* if there's room, just add these bytes to the buffer */ else { Copy(bytes, (outstream->buf + outstream->buf_pos), len, char); outstream->buf_pos += len; } } void Kino_OutStream_write_int(OutStream *outstream, U32 aU32) { unsigned char buf[4]; Kino_encode_bigend_U32(aU32, buf); outstream->write_bytes(outstream, (char*)buf, 4); } void Kino_OutStream_write_long(OutStream *outstream, double aDouble) { unsigned char buf[8]; U32 aU32; /* derive the upper 4 bytes by truncating a quotient */ aU32 = floor( ldexp( aDouble, -32 ) ); Kino_encode_bigend_U32(aU32, buf); /* derive the lower 4 bytes by taking a modulus against 2**32 */ aU32 = fmod(aDouble, (pow(2.0, 32.0))); Kino_encode_bigend_U32(aU32, &buf[4]); /* print encoded Long to the output handle */ outstream->write_bytes(outstream, (char*)buf, 8); } void Kino_OutStream_write_vint(OutStream *outstream, U32 aU32) { char buf[5]; int num_bytes; num_bytes = Kino_OutStream_encode_vint(aU32, buf); outstream->write_bytes(outstream, buf, num_bytes); } /* Encode a VInt. buf must have room for at 5 bytes. */ int Kino_OutStream_encode_vint(U32 aU32, char *buf) { int num_bytes = 0; while ((aU32 & ~0x7f) != 0) { buf[num_bytes++] = ( (aU32 & 0x7f) | 0x80 ); aU32 >>= 7; } buf[num_bytes++] = aU32 & 0x7f; return num_bytes; } void Kino_OutStream_write_vlong(OutStream *outstream, double aDouble) { unsigned char buf[10]; int num_bytes = 0; U32 aU32; while (aDouble > 127.0) { /* take modulus of num against 128 */ aU32 = fmod(aDouble, 128); buf[num_bytes++] = ( (aU32 & 0x7f) | 0x80 ); /* right shift for floating point! */ aDouble = floor( ldexp( aDouble, -7 ) ); } buf[num_bytes++] = aDouble; outstream->write_bytes(outstream, (char*)buf, num_bytes); } void Kino_OutStream_write_string(OutStream *outstream, char *string, STRLEN len) { Kino_OutStream_write_vint(outstream, (U32)len); Kino_OutStream_write_bytes(outstream, string, len); } void Kino_OutStream_destroy(OutStream *outstream) { Kino_OutStream_flush(outstream); SvREFCNT_dec(outstream->fh_sv); Kino_Safefree(outstream->buf); Kino_Safefree(outstream); } __POD__ =begin devdocs =head1 NAME KinoSearch::Store::OutStream - filehandles for writing invindexes =head1 SYNOPSIS # isa blessed filehandle my $outstream = $invindex->open_outstream( $filename ); $outstream->lu_write( 'V8', @eight_vints ); =head1 DESCRIPTION The OutStream class abstracts all of KinoSearch's output operations. It is akin to a narrowly-implemented, specialized IO::File. Unlike its counterpart InStream, OutStream cannot be assigned an arbitrary C or C. =head2 lu_write / lu_read template lu_write and it's opposite number, InStream's lu_read, provide a pack/unpack-style interface for handling primitive data types required by the Lucene index file format. The most notable of these specialized data types is the VInt, or Variable Integer, which is similar to the BER compressed integer (pack template 'w'). All fixed-width integer formats are stored in big-endian order (high-byte first). Signed integers use twos-complement encoding. The maximum allowable value both Long and VLong is 2**52 because it is stored inside the NV (double) storage pocket of a perl Scalar, which has a 53-bit mantissa. a Arbitrary binary data, copied to/from the scalar's PV (string) b 8-bit integer, signed B 8-bit integer, unsigned i 32-bit integer, signed I 32-bit integer, unsigned Q 64-bit integer, unsigned (max value 2**52) V VInt variable-width integer, unsigned (max value 2**32) W VLong variable-width integer, unsigned (max value 2**52) T Lucene string, which is a VInt indicating the length in bytes followed by the string. The string must be valid UTF-8. Numeric repeat counts are supported: $outstream->lu_write( 'V2 T', 0, 1, "a string" ); Other features of pack/unpack such as parentheses, infinite repeats via '*', and slash notation are not. A numeric repeat count following 'a' indicates how many bytes to read, while a count following any other symbol indicates how many scalars of that type to return. ( $three_byte_string, @eight_vints ) = $instream->lu_read('a3V8'); The behavior of lu_read and lu_write is much more strict with regards to a mismatch between TEMPLATE and LIST than pack/unpack, which are fairly forgiving in what they will accept. lu_read will confess() if it cannot read all the items specified by TEMPLATE from the InStream, and lu_write will confess() if the number of items in LIST does not match the expression in TEMPLATE. =head1 COPYRIGHT Copyright 2005-2009 Marvin Humphrey =head1 LICENSE, DISCLAIMER, BUGS, etc. See L version 0.165. =end devdocs =cut