/* * Copyright 2009 10gen, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "perl_mongo.h" #include "mongo_link.h" static int cursor_free (pTHX_ SV *sv, MAGIC *mg) { mongo_cursor *cursor; PERL_UNUSED_ARG(sv); cursor = (mongo_cursor *)mg->mg_ptr; if (cursor) { if (cursor->buf.start) { Safefree(cursor->buf.start); } Safefree(cursor); } mg->mg_ptr = NULL; return 0; } static int cursor_clone (pTHX_ MAGIC *mg, CLONE_PARAMS *params) { mongo_cursor *cursor, *new_cursor; size_t buflen; PERL_UNUSED_ARG (params); cursor = (mongo_cursor *)mg->mg_ptr; Newx(new_cursor, 1, mongo_cursor); Copy(cursor, new_cursor, 1, mongo_cursor); buflen = cursor->buf.end - cursor->buf.start; Newx(new_cursor->buf.start, buflen, char); Copy(cursor->buf.start, new_cursor->buf.start, buflen, char); new_cursor->buf.end = new_cursor->buf.start + buflen; new_cursor->buf.pos = new_cursor->buf.start + (cursor->buf.pos - cursor->buf.start); mg->mg_ptr = (char *)new_cursor; return 0; } MGVTBL cursor_vtbl = { NULL, NULL, NULL, NULL, cursor_free, #if MGf_COPY NULL, #endif #if MGf_DUP cursor_clone, #endif #if MGf_LOCAL NULL, #endif }; static mongo_cursor* get_cursor(SV *self); static int has_next(SV *self, mongo_cursor *cursor); static void kill_cursor(SV *self); static mongo_cursor* get_cursor(SV *self) { perl_mongo_call_method(self, "_do_query", G_DISCARD, 0); return (mongo_cursor*)perl_mongo_get_ptr_from_instance(self, &cursor_vtbl); } static int has_next(SV *self, mongo_cursor *cursor) { SV *link, *limit, *ns, *request_id, *response_to; mongo_msg_header header; buffer buf; int size, heard; limit = perl_mongo_call_reader (self, "_limit"); if ((SvIV(limit) > 0 && cursor->at >= SvIV(limit)) || cursor->num == 0 || (cursor->at == cursor->num && cursor->cursor_id == 0)) { SvREFCNT_dec(limit); return 0; } else if (cursor->at < cursor->num) { SvREFCNT_dec(limit); return 1; } link = perl_mongo_call_reader (self, "_client"); ns = perl_mongo_call_reader (self, "_ns"); // we have to go and check with the db size = 34+strlen(SvPV_nolen(ns)); Newx(buf.start, size, char); buf.pos = buf.start; buf.end = buf.start + size; response_to = perl_mongo_call_reader(self, "_request_id"); request_id = get_sv("MongoDB::Cursor::_request_id", GV_ADD); CREATE_RESPONSE_HEADER(buf, SvPV_nolen(ns), SvIV(response_to), OP_GET_MORE); // change this cursor's request id so we can match the response perl_mongo_call_method(self, "_request_id", G_DISCARD, 1, request_id); SvREFCNT_dec(response_to); perl_mongo_serialize_int(&buf, SvIV(limit)); perl_mongo_serialize_long(&buf, cursor->cursor_id); perl_mongo_serialize_size(buf.start, &buf); SvREFCNT_dec(limit); SvREFCNT_dec(ns); // fails if we're out of elems if(mongo_link_say(link, &buf) == -1) { SvREFCNT_dec(link); Safefree(buf.start); die("can't get db response, not connected"); return 0; } Safefree(buf.start); // if we have cursor->at == cursor->num && recv fails, // we're probably just out of results // mongo_link_hear returns 1 on success, 0 on failure heard = mongo_link_hear(self); SvREFCNT_dec(link); return heard > 0; } static void kill_cursor(SV *self) { mongo_cursor *cursor = (mongo_cursor*)perl_mongo_get_ptr_from_instance(self, &cursor_vtbl); SV *link = perl_mongo_call_reader (self, "_client"); SV *request_id_sv = perl_mongo_call_reader (self, "_request_id"); char quickbuf[128]; buffer buf; mongo_msg_header header; // we allocate a cursor even if no results are returned, but the database will // throw an assertion if we try to kill a non-existant cursor non-cursors have // ids of 0 if (cursor->cursor_id == 0) { SvREFCNT_dec(link); SvREFCNT_dec(request_id_sv); return; } buf.pos = quickbuf; buf.start = buf.pos; buf.end = buf.start + 128; // std header CREATE_MSG_HEADER(SvIV(request_id_sv), 0, OP_KILL_CURSORS); SvREFCNT_dec(request_id_sv); APPEND_HEADER(buf, 0); // # of cursors perl_mongo_serialize_int(&buf, 1); // cursor ids perl_mongo_serialize_long(&buf, cursor->cursor_id); perl_mongo_serialize_size(buf.start, &buf); mongo_link_say(link, &buf); SvREFCNT_dec(link); } MODULE = MongoDB::Cursor PACKAGE = MongoDB::Cursor PROTOTYPES: DISABLE void _init (self) SV *self PREINIT: mongo_cursor *cursor; CODE: Newxz(cursor, 1, mongo_cursor); // attach a mongo_cursor* to the MongoDB::Cursor perl_mongo_attach_ptr_to_instance(self, cursor, &cursor_vtbl); bool has_next (self) SV *self PREINIT: mongo_cursor *cursor; CODE: cursor = get_cursor(self); RETVAL = has_next(self, cursor); OUTPUT: RETVAL SV * next (self) SV *self PREINIT: mongo_cursor *cursor; SV *dt_type_sv; CODE: cursor = get_cursor(self); if (has_next(self, cursor)) { dt_type_sv = perl_mongo_call_reader( self, "_dt_type" ); if ( SvOK( dt_type_sv ) ) { char *dt_type = SvPV( dt_type_sv, SvLEN( dt_type_sv ) ); RETVAL = perl_mongo_bson_to_sv(&cursor->buf, dt_type); } else { // dt type is undef RETVAL = perl_mongo_bson_to_sv(&cursor->buf, NULL); } cursor->at++; if (cursor->num == 1 && hv_exists((HV*)SvRV(RETVAL), "$err", strlen("$err"))) { SV **err = 0, **code = 0; err = hv_fetch((HV*)SvRV(RETVAL), "$err", strlen("$err"), 0); code = hv_fetch((HV*)SvRV(RETVAL), "code", strlen("code"), 0); if (code && SvIOK(*code) && (SvIV(*code) == 10107 || SvIV(*code) == 13435 || SvIV(*code) == 13436)) { SV *conn = perl_mongo_call_method (self, "_client", 0, 0); set_disconnected(conn); } croak("query error: %s", SvPV_nolen(*err)); } } else { RETVAL = newSV(0); } OUTPUT: RETVAL SV * reset (self) SV *self PREINIT: mongo_cursor *cursor; CODE: cursor = (mongo_cursor*)perl_mongo_get_ptr_from_instance(self, &cursor_vtbl); cursor->buf.pos = cursor->buf.start; cursor->at = 0; cursor->num = 0; perl_mongo_call_method (self, "started_iterating", G_DISCARD, 1, &PL_sv_no); RETVAL = SvREFCNT_inc(self); OUTPUT: RETVAL SV * info (self) SV *self PREINIT: mongo_cursor *cursor; HV *hv; CODE: cursor = (mongo_cursor*)perl_mongo_get_ptr_from_instance(self, &cursor_vtbl); hv = newHV(); hv_store(hv, "flag", strlen("flag"), newSViv(cursor->flag), 0); hv_store(hv, "cursor_id", strlen("cursor_id"), newSViv(cursor->cursor_id), 0); hv_store(hv, "start", strlen("start"), newSViv(cursor->start), 0); hv_store(hv, "at", strlen("at"), newSViv(cursor->at), 0); hv_store(hv, "num", strlen("num"), newSViv(cursor->num), 0); RETVAL = newRV_noinc((SV*)hv); OUTPUT: RETVAL void DESTROY (self) SV *self PREINIT: mongo_link *link; SV *link_sv; CODE: link_sv = perl_mongo_call_reader(self, "_client"); link = (mongo_link*)perl_mongo_get_ptr_from_instance(link_sv, &connection_vtbl); // check if cursor is connected if (link->master && link->master->connected) { kill_cursor(self); } SvREFCNT_dec(link_sv);