/* vim: set ft=c */ /* Copyright (C) 2011 Dmitry E. Oboukhov Copyright (C) 2011 Roman V. Nikolaev This program is free software, you can redistribute it and/or modify it under the terms of the Artistic License. */ #include "EXTERN.h" #include "perl.h" #include "XSUB.h" #include #include static struct tnt_tuple* tmake_tuple( AV *t ) { int i; struct tnt_tuple *r = tnt_mem_alloc( sizeof( struct tnt_tuple ) ); if ( !r ) croak("Can not allocate memory"); tnt_tuple_init( r ); r->alloc = 1; for (i = 0; i <= av_len( t ); i++) { STRLEN size; char *data = SvPV( *av_fetch( t, i, 0 ), size ); tnt_tuple_add( r, data, size ); } return r; } static struct tnt_stream * tmake_buf(void) { struct tnt_stream *b = tnt_buf( NULL ); if ( !b ) croak("Can not allocate memory"); return b; } static struct tnt_stream *tmake_oplist( AV *ops ) { int i; struct tnt_stream *b = tmake_buf(); for (i = 0; i <= av_len( ops ); i++) { uint8_t opcode; SV *op = *av_fetch( ops, i, 0 ); if (!SvROK(op) || SvTYPE( SvRV(op) ) != SVt_PVAV) croak("Wrong update operation format"); AV *aop = (AV *)SvRV(op); int asize = av_len( aop ) + 1; if ( asize < 2 ) croak("Too short operation argument list"); unsigned fno = SvIV( *av_fetch( aop, 0, 0 ) ); STRLEN size; char *opname = SvPV( *av_fetch( aop, 1, 0 ), size ); /* delete */ if ( strcmp(opname, "delete") == 0 ) { tnt_update_delete( b, fno ); continue; } if (asize < 3) croak("Too short operation argument list"); /* assign */ if ( strcmp(opname, "set") == 0 ) { char *data = SvPV( *av_fetch( aop, 2, 0 ), size ); tnt_update_assign( b, fno, data, size ); continue; } /* insert */ if ( strcmp(opname, "insert") == 0 ) { char *data = SvPV( *av_fetch( aop, 2, 0 ), size ); tnt_update_insert( b, fno, data, size ); continue; } /* arithmetic operations */ if ( strcmp(opname, "add") == 0 ) { opcode = TNT_UPDATE_ADD; goto ARITH; } if ( strcmp(opname, "and") == 0 ) { opcode = TNT_UPDATE_AND; goto ARITH; } if ( strcmp(opname, "or") == 0 ) { opcode = TNT_UPDATE_OR; goto ARITH; } if ( strcmp(opname, "xor") == 0 ) { opcode = TNT_UPDATE_XOR; goto ARITH; } /* substr */ if ( strcmp(opname, "substr") == 0 ) { if (asize < 4) croak("Too short argument list for substr"); unsigned offset = SvIV( *av_fetch( aop, 2, 0 ) ); unsigned length = SvIV( *av_fetch( aop, 3, 0 ) ); char * data; if ( asize > 4 && SvOK( *av_fetch( aop, 4, 0 ) ) ) { data = SvPV( *av_fetch( aop, 4, 0 ), size ); } else { data = ""; size = 0; } tnt_update_splice( b, fno, offset, length, data, size ); continue; } { /* unknown command */ char err[512]; snprintf(err, 512, "unknown update operation: `%s'", opname ); croak(err); } ARITH: { tnt_update_arith( b, fno, opcode, SvIV( *av_fetch( aop, 2, 0 ) ) ); continue; } } return b; } static void hash_ssave(HV *h, const char *k, const char *v) { hv_store( h, k, strlen(k), newSVpvn( v, strlen(v) ), 0 ); } static void hash_isave(HV *h, const char *k, uint32_t v) { hv_store( h, k, strlen(k), newSViv( v ), 0 ); } static AV * extract_tuples(struct tnt_reply *r) { struct tnt_iter it; tnt_iter_list(&it, TNT_REPLY_LIST(r)); AV *res = newAV(); while (tnt_next(&it)) { struct tnt_iter ifl; struct tnt_tuple *tu = TNT_ILIST_TUPLE(&it); tnt_iter(&ifl, tu); AV *t = newAV(); while (tnt_next(&ifl)) { char *data = TNT_IFIELD_DATA(&ifl); uint32_t size = TNT_IFIELD_SIZE(&ifl); av_push(t, newSVpvn(data, size)); } av_push(res, newRV_noinc((SV *) t)); } return res; } MODULE = DR::Tarantool PACKAGE = DR::Tarantool PROTOTYPES: ENABLE SV * _pkt_select( req_id, ns, idx, offset, limit, keys ) unsigned req_id unsigned ns unsigned idx unsigned offset unsigned limit AV * keys CODE: int i; struct tnt_list list; tnt_list_init( &list ); if ( ( list.count = av_len ( keys ) + 1 ) ) { list.list = tnt_mem_alloc( sizeof( struct tnt_list_ptr ) * list.count ); if ( !list.list ) return; for (i = 0; i < list.count; i++) { SV *t = *av_fetch( keys, i, 0 ); if (!SvROK(t) || (SvTYPE(SvRV(t)) != SVt_PVAV)) croak("keys must be ARRAYREF" " of ARRAYREF" ); list.list[i].ptr = tmake_tuple( (AV *)SvRV(t) ); } } struct tnt_stream *s = tmake_buf(); tnt_stream_reqid( s, req_id ); tnt_select( s, ns, idx, offset, limit, &list ); tnt_list_free( &list ); RETVAL = newSVpvn( TNT_SBUF_DATA(s), TNT_SBUF_SIZE(s) ); tnt_stream_free( s ); OUTPUT: RETVAL SV * _pkt_ping( req_id ) unsigned req_id CODE: struct tnt_stream *s = tmake_buf(); tnt_stream_reqid( s, req_id ); tnt_ping( s ); RETVAL = newSVpvn( TNT_SBUF_DATA(s), TNT_SBUF_SIZE(s) ); tnt_stream_free( s ); OUTPUT: RETVAL SV * _pkt_insert( req_id, ns, flags, tuple ) unsigned req_id unsigned ns unsigned flags AV * tuple CODE: struct tnt_tuple *t = tmake_tuple( tuple ); struct tnt_stream *s = tmake_buf(); tnt_stream_reqid( s, req_id ); tnt_insert( s, ns, flags, t ); tnt_tuple_free( t ); RETVAL = newSVpvn( TNT_SBUF_DATA( s ), TNT_SBUF_SIZE( s ) ); tnt_stream_free( s ); OUTPUT: RETVAL SV * _pkt_update( req_id, ns, flags, tuple, operations ) unsigned req_id unsigned ns unsigned flags AV *tuple AV *operations CODE: struct tnt_tuple *t = tmake_tuple( tuple ); struct tnt_stream *s = tmake_buf(); struct tnt_stream *ops = tmake_oplist( operations ); tnt_stream_reqid( s, req_id ); tnt_update( s, ns, flags, t, ops ); tnt_tuple_free( t ); RETVAL = newSVpvn( TNT_SBUF_DATA( s ), TNT_SBUF_SIZE( s ) ); tnt_stream_free( ops ); tnt_stream_free( s ); OUTPUT: RETVAL SV * _pkt_delete( req_id, ns, flags, tuple ) unsigned req_id unsigned ns unsigned flags AV *tuple CODE: struct tnt_tuple *t = tmake_tuple( tuple ); struct tnt_stream *s = tmake_buf(); tnt_stream_reqid( s, req_id ); tnt_delete( s, ns, flags, t ); tnt_tuple_free( t ); RETVAL = newSVpvn( TNT_SBUF_DATA( s ), TNT_SBUF_SIZE( s ) ); tnt_stream_free( s ); OUTPUT: RETVAL SV * _pkt_call_lua( req_id, flags, proc, tuple ) unsigned req_id unsigned flags char *proc AV *tuple CODE: struct tnt_tuple *t = tmake_tuple( tuple ); struct tnt_stream *s = tmake_buf(); tnt_stream_reqid( s, req_id ); tnt_call( s, flags, proc, t ); tnt_tuple_free( t ); RETVAL = newSVpvn( TNT_SBUF_DATA( s ), TNT_SBUF_SIZE( s ) ); tnt_stream_free( s ); OUTPUT: RETVAL HV * _pkt_parse_response( response ) SV *response CODE: if ( !SvOK(response) ) croak( "response is undefined" ); STRLEN size; char *data = SvPV( response, size ); struct tnt_reply reply; tnt_reply_init( &reply ); size_t offset = 0; int cnt = tnt_reply( &reply, data, size, &offset ); int i, j; HV *res = newHV(); if ( cnt < 0 ) { hash_ssave(res, "status", "fatal"); hash_ssave(res, "errstr", "Can't parse server response"); } else if ( cnt > 0 ) { hash_ssave(res, "status", "buffer"); hash_ssave(res, "errstr", "Input data too short"); } else { hash_isave(res, "code", reply.code ); hash_isave(res, "req_id", reply.reqid ); hash_isave(res, "type", reply.op ); hash_isave(res, "count", reply.count); if (reply.code) { hash_ssave( res, "errstr", reply.error ); hash_ssave(res, "status", "error"); } else { hash_ssave(res, "status", "ok"); AV *tuples = extract_tuples( &reply ); hv_store( res, "tuples", 6, newRV_noinc((SV *)tuples), 0 ); } } sv_2mortal( (SV *) res ); RETVAL = res; tnt_reply_free( &reply ); OUTPUT: RETVAL unsigned _op_insert() CODE: RETVAL = TNT_OP_INSERT; OUTPUT: RETVAL unsigned _op_select() CODE: RETVAL = TNT_OP_SELECT; OUTPUT: RETVAL unsigned _op_update() CODE: RETVAL = TNT_OP_UPDATE; OUTPUT: RETVAL unsigned _op_delete() CODE: RETVAL = TNT_OP_DELETE; OUTPUT: RETVAL unsigned _op_call() CODE: RETVAL = TNT_OP_CALL; OUTPUT: RETVAL unsigned _op_ping() CODE: RETVAL = TNT_OP_PING; OUTPUT: RETVAL unsigned _flag_return() CODE: RETVAL = TNT_FLAG_RETURN; OUTPUT: RETVAL unsigned _flag_add() CODE: RETVAL = TNT_FLAG_ADD; OUTPUT: RETVAL unsigned _flag_replace() CODE: RETVAL = TNT_FLAG_REPLACE; OUTPUT: RETVAL unsigned _flag_box_quiet() CODE: RETVAL = TNT_FLAG_BOX_QUIET; OUTPUT: RETVAL unsigned _flag_not_store() CODE: RETVAL = TNT_FLAG_NOT_STORE; OUTPUT: RETVAL