// -*-C++-*- mode #include "osp-preamble.h" #include "osperl.h" #include "Ring.h" #undef MAX #define MAX(x,y) (x>y? x : y) #undef MIN #define MIN(x,y) (xprev = prev; if (prev) prev->next = next; } void osp_ring_page::uncache_keys() { keys.set_undef(); } void osp_ring_page::cache_keys(osp_pathexam &exam, OSSVPV *pv) { exam.load_keypack1(pv, keys); } void osp_ring_page::verify_keys(osp_pathexam &exam) { if (!keys.cnt) return; osp_keypack1 kp; exam.load_keypack1(*first, kp); if (!(kp == keys)) croak("Key cache out of sync"); } int osp_ring_page::qck_cmp(osp_pathexam &exam, int update_ok) { if (update_ok && !keys.cnt) exam.load_keypack1(*first, keys); return exam.compare(keys, 1); } void osp_ring_page::_move(OSPVptr *start, OSPVptr *end, OSPVptr *to) { assert(start <= end); if (start < to) { to += end - start; while (1) { to->steal(*end); if (start == end) break; --end; --to; } /* for (int xx=end-start; xx >= 0; xx--) { ar[to+xx].steal(ar[start+xx]); } */ } else { while (1) { to->steal(*start); if (start == end) break; ++start; ++to; } /* for (int xx=0; xx <= end-start; xx++) { ar[to+xx].steal(ar[start+xx]); } */ } } void osp_ring_page::reset_first() { if (!first) { first = array(); return; } if (first == array()) return; OSPVptr *top = array(); _move(first, first+fill-1, top); first = top; } OSSVPV *osp_ring_page::shift() { assert(fill > 0); if (!first) first = array(); OSSVPV *pv = first->resolve(); ++first; --fill; return pv; } OSSVPV *osp_ring_page::pop() { assert(fill > 0); --fill; if (!first) first = array(); OSSVPV *pv = (first + fill)->resolve(); return pv; } void osp_ring_page::extend(int to) { reset_first(); assert(to <= get_max()); assert(to > fill); fill = to; } void osp_ring_page::push(SV **base, int items) { reset_first(); assert(fill + items <= get_max()); for (int xx=0; xx < items; xx++) { *(first + fill) = osp_thr::sv_2bridge(base[xx], 0, os_segment::of(this))->ospv(); ++fill; } } void osp_ring_page::push(OSSVPV *pv) { reset_first(); assert(fill + 1 <= get_max()); *(first+fill) = pv; ++fill; } void osp_ring_page::prepare_insert(int items) { uncache_keys(); OSPVptr *top = array(); if (!first) first = top; assert(fill + items <= get_max()); if (first - top < items) { if (fill) _move(first, first+fill-1, top + items); first = top + items; } } void osp_ring_page::unshift(SV **base, int items) { prepare_insert(items); first -= items; for (int xx=0; xx < items; xx++) { *(first+xx) = osp_thr::sv_2bridge(base[xx], 0, os_segment::of(this))->ospv(); } fill += items; } void osp_ring_page::unshift(OSSVPV *pv) { prepare_insert(1); --first; *first = pv; ++fill; } void osp_ring_page::fwd_xfer(int at) { int move = fill - at; assert(move > 0 && next->avail() >= move); next->prepare_insert(move); next->first -= move; OSPVptr *dst = next->first; OSPVptr *src = first + at; for (int xx=0; xx < move; xx++) { dst->steal(*src); ++dst; ++src; } fill -= move; next->fill += move; // backfill at the same time XXX } void osp_ring_page::insert_after(int at, OSSVPV *pv) { reset_first(); if (at < fill - 1) { _move(first + at + 1, first + fill - 1, first + at + 2); } *(first + at + 1) = pv; ++fill; } void osp_ring_page::splice(int offset, int length, SV **base, int count) { reset_first(); if (length != count && offset + length < fill) _move(first + offset + length, first + fill-1, first + offset + count); for (int xx=offset; xx < offset+count; xx++) { *(first + xx) = osp_thr::sv_2bridge(base[xx-offset], 0, os_segment::of(this))->ospv(); } fill += count - length; } // ------------------------------------------------- int osp_ring_page1::get_max() { return OSP_RING_PAGE1_MAX; } int osp_ring_page1::avail() { return OSP_RING_PAGE1_MAX - fill; } OSPVptr *osp_ring_page1::array() { return ar; } OSPVptr &osp_ring_page1::at(int xx) { if (xx < 0 || xx >= fill) croak("Attempt to access unallocated page slot"); return *(first+xx); } // ------------------------------------------------- OSPV_ring_index1::OSPV_ring_index1() { version = 0; max = fill = 0; first = last = 0; } void OSPV_ring_index1::fix_stats() { if (!RG_STALE(this)) return; if (first) fill += first->fill - first->old_fill; if (first != last) fill += last->fill - last->old_fill; RG_STALE_off(this); } U32 OSPV_ring_index1::read_fill() { if (!RG_STALE(this)) return fill; else { U32 zfill = fill; if (first) zfill += first->fill - first->old_fill; if (first != last) zfill += last->fill - last->old_fill; return zfill; } } void OSPV_ring_index1::stale_stats() { // playing tricks to avoid writes if (RG_STALE(this)) return; first->old_fill = first->fill; last->old_fill = last->fill; RG_STALE_on(this); } osp_ring_page *OSPV_ring_index1::new_page(osp_ring_page *ref, int after) { osp_ring_page *npg; fix_stats(); NEW_OS_OBJECT(npg, os_segment::of(this), osp_ring_page1::get_os_typespec(), osp_ring_page1()); max += npg->get_max(); if (!ref) { assert(fill == 0); assert(!first); first = last = npg; return npg; } else { assert(ref); if (after) { npg->prev = ref; npg->next = ref->next; } else { npg->next = ref; npg->prev = ref->prev; } if (npg->next) npg->next->prev = npg; else last = npg; if (npg->prev) npg->prev->next = npg; else first = npg; return npg; } } osp_ring_page *OSPV_ring_index1::get_page(U32 loc, I16 *offset) { U32 zfill = read_fill(); U32 pgtop; osp_ring_page *pp; assert(loc < zfill); if (loc < zfill/2) { pp = first; pgtop = 0; while (loc >= pgtop + pp->fill) { pgtop += pp->fill; pp = pp->next; assert(pp); } } else { pp = last; pgtop = zfill - pp->fill; while (loc < pgtop) { pp = pp->prev; assert(pp); pgtop -= pp->fill; } } *offset = loc - pgtop; assert(*offset >= 0 && *offset < pp->fill); return pp; } OSPV_ring_index1::~OSPV_ring_index1() { CLEAR(); } int OSPV_ring_index1::get_perl_type() { return SVt_PVAV; } char *OSPV_ring_index1::os_class(STRLEN *len) { *len = 15; return "ObjStore::Index"; } char *OSPV_ring_index1::rep_class(STRLEN *len) { *len = 26; return "ObjStore::REP::Ring::Index"; } int OSPV_ring_index1::FETCHSIZE() { return read_fill(); } void OSPV_ring_index1::CLEAR() { fix_stats(); ++version; while (first) { first->fill = 0; free_page(first); } fill = 0; assert(!max); } void OSPV_ring_index1::_debug1(void *) { dOSP; osp_pathexam *exam = &osp->exam; exam->init(); exam->load_path(((OSSVPV*)conf)->avx(1)->safe_rv()); osp_ring_page *pp = first; while (pp) { pp->verify_keys(*exam); pp = pp->next; } } void OSPV_ring_index1::free_page(osp_ring_page *pp) { fix_stats(); ++version; assert(pp->fill == 0); if (first == pp) first = pp->next; if (last == pp) last = pp->prev; max -= pp->get_max(); delete pp; } void OSPV_ring_index1::Extend(U32 to) { osp_ring_page *pp; fix_stats(); if (to < fill) return; pp = last? last : new_page(0,0); U32 more = to - fill; if (more > pp->avail()) more = pp->avail(); pp->extend(pp->fill + more); fill += more; while (fill < to) { pp = new_page(pp, 1); more = to - fill; if (more > pp->avail()) more = pp->avail(); pp->extend(pp->fill + more); fill += more; } } void OSPV_ring_index1::FETCH(SV *key) { I16 off; osp_ring_page *pp; U32 to = osp_thr::sv_2aelem(key); if (to < 0 || to >= read_fill()) return; pp = get_page(to, &off); OSSVPV *pv = pp->at(off).resolve(); SV *sv = osp_thr::ospv_2sv(pv); dSP; XPUSHs(sv); PUTBACK; } void OSPV_ring_index1::STORE(SV *key, SV *nval) { U32 to = osp_thr::sv_2aelem(key); I16 off; Extend(to+1); osp_ring_page *pp = get_page(to, &off); if (off == 0) pp->uncache_keys(); pp->at(off) = osp_thr::sv_2bridge(nval, 0, os_segment::of(this))->ospv(); dTHR; if (GIMME_V == G_VOID) return; djSP; XPUSHs(sv_mortalcopy(nval)); PUTBACK; } void OSPV_ring_index1::POP() { osp_ring_page *pp = last; if (!pp) return; stale_stats(); SV *ret = osp_thr::ospv_2sv(pp->pop()); if (!pp->fill) free_page(pp); djSP; XPUSHs(ret); PUTBACK; } void OSPV_ring_index1::SHIFT() { osp_ring_page *pp = first; if (!pp) return; stale_stats(); pp->uncache_keys(); SV *ret = osp_thr::ospv_2sv(pp->shift()); if (!pp->fill) free_page(pp); djSP; XPUSHs(ret); PUTBACK; } void OSPV_ring_index1::PUSH(SV **base, int items) { while (items) { osp_ring_page *pp = last; if (!pp || !pp->avail()) pp = new_page(last, 1); stale_stats(); int chunk = MIN(pp->avail(), items); pp->push(base, chunk); base += chunk; items -= chunk; } } void OSPV_ring_index1::UNSHIFT(SV **base, int items) { while (items) { osp_ring_page *pp = first; if (!pp || !pp->avail()) pp = new_page(first, 0); else pp->uncache_keys(); stale_stats(); int chunk = MIN(pp->avail(), items); pp->unshift(base, chunk); base += chunk; items -= chunk; } } void OSPV_ring_index1::split(osp_ring_page *pp, int at) { if (!pp->next || pp->next->avail() <= pp->fill - at) new_page(pp, 1); pp->uncache_keys(); pp->fwd_xfer(at); } void OSPV_ring_index1::SPLICE(int offset, int length, SV **top, int count) { fix_stats(); Extend(offset + length); osp_ring_page *pp = 0; I16 at; while (length) { if (!pp) { pp = get_page(offset, &at); } else { pp = pp->next; at = 0; pp->uncache_keys(); } int chunk = pp->get_max() - at; int clen = MIN(length, chunk); int ccnt = MIN(count, chunk); pp->splice(at, clen, top, ccnt); length = MAX(0, length - clen); count = MAX(0, count - ccnt); top += ccnt; } if (count) { if (!pp) pp = get_page(offset, &at); split(pp, at); fill += count; while (count) { if (!pp->avail()) pp = new_page(pp, 1); int chunk = MIN(pp->avail(), count); pp->push(top, chunk); top += chunk; count -= chunk; } } } OSSVPV *OSPV_ring_index1::new_cursor(os_segment *seg) { return new(seg, OSPV_ring_index1_cs::get_os_typespec()) OSPV_ring_index1_cs(this); } double OSPV_ring_index1::_percent_filled() { return fill/(double)max; } //----------------------------------------------------------- void OSPV_ring_index1_cs::CHK_VER() { // incr(version) only happens when pages are freed! OSPV_ring_index1 *rp = (OSPV_ring_index1 *) myfocus.resolve(); STRLEN _len; if (version != rp->version) croak("Version mismatch on 0x%x=%s", rp, rp->os_class(&_len)); } OSPV_ring_index1_cs::OSPV_ring_index1_cs(OSPV_ring_index1 *_fo) { myfocus = _fo; version = _fo->version; page=0; } OSSVPV *OSPV_ring_index1_cs::focus() { return myfocus; } I32 OSPV_ring_index1_cs::pos() { if (!RG_POSITION(this)) { if (!RG_ATEND(this)) { return -1; } else { OSPV_ring_index1 *rp = (OSPV_ring_index1 *) myfocus.resolve(); return rp->read_fill(); } } else { CHK_VER(); return abpos; } } void OSPV_ring_index1_cs::at() { if (!RG_POSITION(this)) return; CHK_VER(); SV *ret = osp_thr::ospv_2sv(page->at(reloff)); dSP; XPUSHs(ret); PUTBACK; } void OSPV_ring_index1_cs::store(SV *nval) { if (!RG_POSITION(this)) croak("Can't store to unpositioned cursor"); CHK_VER(); if (reloff == 0) page->uncache_keys(); page->at(reloff) = osp_thr::sv_2bridge(nval, 0, os_segment::of(this))->ospv(); } void OSPV_ring_index1_cs::keys() { if (!RG_POSITION(this)) return; CHK_VER(); OSPV_ring_index1 *rp = (OSPV_ring_index1 *) myfocus.resolve(); OSSVPV *conf = rp->conf; if (!conf) croak("keys() on unconfigured index"); dOSP; osp_pathexam *exam = &osp->exam; exam->init(); exam->load_path(conf->avx(1)->safe_rv()); exam->load_target('x', page->at(reloff)); exam->push_keys(); } void OSPV_ring_index1_cs::moveto(I32 xto) //unsigned? XXX { OSPV_ring_index1 *rp = (OSPV_ring_index1 *) myfocus.resolve(); version = rp->version; if (xto < 0 || xto >= rp->read_fill()) { RG_POSITION_off(this); if (xto < 0) RG_ATEND_off(this); else RG_ATEND_on(this); return; } RG_POSITION_on(this); abpos = xto; page = rp->get_page(xto, &reloff); } void OSPV_ring_index1_cs::step(I32 delta) { OSPV_ring_index1 *rp = (OSPV_ring_index1 *) myfocus.resolve(); CHK_VER(); abpos += delta; if (delta > 0) { while (delta) { //assert(delta > 0); if (!RG_POSITION(this)) { if (RG_ATEND(this)) return; page = rp->first; if (!page) { RG_ATEND_on(this); return; } reloff = 0; abpos = 0; RG_POSITION_on(this); --delta; } else { int chunk = MIN(delta, page->fill-1 - reloff); if (chunk) { reloff += chunk; delta -= chunk; } else { page = page->next; if (!page) { RG_POSITION_off(this); RG_ATEND_on(this); return; } reloff = 0; --delta; } } } } else /* delta < 0 */ { while (delta) { //assert(delta < 0); if (!RG_POSITION(this)) { if (!RG_ATEND(this)) return; page = rp->last; if (!page) { RG_ATEND_off(this); return; } abpos = rp->read_fill() - 1; reloff = page->fill - 1; RG_POSITION_on(this); ++delta; } else { int chunk = MIN(-delta, reloff); if (chunk) { reloff -= chunk; delta += chunk; } else { page = page->prev; if (!page) { RG_POSITION_off(this); RG_ATEND_off(this); return; } reloff = page->fill - 1; ++delta; } } } } } // MESSY stuff ------------------------------------- int OSPV_ring_index1::add(OSSVPV *pv) { if (!conf) croak("Index not configured"); dOSP; osp_pathexam *exam = &osp->exam; exam->init(); exam->load_path(((OSSVPV*)conf)->avx(1)->safe_rv()); if (!exam->load_target('x', pv)) return 0; osp_ring_page *pp = last; if (!pp || exam->compare(pp->at(pp->fill - 1), 0) >= 0) { // can insert as last record if (!pp || pp->avail() <= 1) pp = new_page(last, 1); stale_stats(); if (!pp->fill) pp->cache_keys(*exam, pv); pp->push(pv); return 1; } fix_stats(); ++fill; while (pp && pp->qck_cmp(*exam, 1) < 0) pp = pp->prev; if (!pp) { pp = first; if (!pp || !pp->avail()) pp = new_page(first, 0); //warn("4"); pp->unshift(pv); return 1; } for (int xx = pp->fill - 1; xx >= 0; xx--) { if (exam->compare(pp->at(xx), 0) >= 0) { if (pp->avail()) { //warn("1 [%d]", xx); pp->insert_after(xx, pv); } else { if (xx < pp->fill - 1) { //warn("2"); split(pp, xx+1); pp->push(pv); } else { //warn("3"); osp_ring_page *npg = new_page(pp, 1); pp->fwd_xfer(pp->fill/2); npg->push(pv); } } return 1; } } croak("Add failed?"); return 0; } int OSPV_ring_index1::remove(OSSVPV *pv) { if (!conf) croak("Index not configured"); croak("Not implemented"); return 0; } int OSPV_ring_index1_cs::seek(osp_pathexam &exam) { OSPV_ring_index1 *rp = (OSPV_ring_index1 *) myfocus.resolve(); if (rp->version != version || !RG_POSITION(this)) { // start at the last page if not already positioned page = rp->last; // let abpos become invalid if (!page) return 0; } OSSVPV *conf = rp->conf; if (!conf) croak("Index not configured"); exam.load_path(conf->avx(1)->safe_rv()); if (exam.get_keycnt() < 1) { warn("Seek to where? No keys found"); return 0; } dTXN; int updt = !txn? 1 : txn->can_update(this); //int dir = page->qck_cmp(exam); // binary search // then linear search // give up } // cope with descending XXX MODULE = ObjStore::REP::Ring PACKAGE = ObjStore::REP::Ring PROTOTYPES: DISABLE BOOT: { extern _Application_schema_info ObjStore_REP_Ring_dll_schema_info; osp_thr::use("ObjStore::REP::Ring", OSPERL_API_VERSION); osp_thr::register_schema("ObjStore::REP::Ring", &ObjStore_REP_Ring_dll_schema_info); } MODULE = ObjStore::REP::Ring PACKAGE = ObjStore::REP::Ring::Index static void new(clsv, seg) SV *clsv; SV *seg; PPCODE: os_segment *area = osp_thr::sv_2segment(seg); PUTBACK; OSPV_ring_index1 *pv; NEW_OS_OBJECT(pv, area, OSPV_ring_index1::get_os_typespec(), OSPV_ring_index1()); pv->bless(ST(0)); return; void OSSVPV::_conf_slot(...) PPCODE: PUTBACK; OSPV_ring_index1 *pv = (OSPV_ring_index1 *) THIS; SV *ret=0; if (items == 2) { pv->fix_stats(); if (pv->fill) croak("Can't reconfigure non-empty index"); pv->conf = SvOK(ST(1))? osp_thr::sv_2bridge(ST(1), 1)->ospv() : 0; } else { ret = osp_thr::ospv_2sv(pv->conf); } SPAGAIN; if (ret) XPUSHs(ret);