// -*-C++-*- mode
#include "osp-preamble.h"
#include "osperl.h"
#include "Ring.h"

#undef MAX
#define MAX(x,y) (x>y? x : y)
#undef MIN
#define MIN(x,y) (x<y? x : y)

osp_ring_page::osp_ring_page()
{
  next = prev = 0;
  fill = old_fill = 0;
  first = 0;
}

osp_ring_page::~osp_ring_page()
{
  if (next) next->prev = prev;
  if (prev) prev->next = next;
}

void osp_ring_page::uncache_keys()
{
  keys.set_undef();
}

void osp_ring_page::cache_keys(osp_pathexam &exam, OSSVPV *pv)
{
  exam.load_keypack1(pv, keys);
}

void osp_ring_page::verify_keys(osp_pathexam &exam)
{
  if (!keys.cnt) return;

  osp_keypack1 kp;
  exam.load_keypack1(*first, kp);
  if (!(kp == keys)) croak("Key cache out of sync");
}

int osp_ring_page::qck_cmp(osp_pathexam &exam, int update_ok)
{
  if (update_ok && !keys.cnt)
    exam.load_keypack1(*first, keys);
  return exam.compare(keys, 1);
}

void osp_ring_page::_move(OSPVptr *start, OSPVptr *end, OSPVptr *to)
{
  assert(start <= end);
  if (start < to) {
    to += end - start;
    while (1) {
      to->steal(*end);
      if (start == end) break;
      --end;
      --to;
    }
    /*
    for (int xx=end-start; xx >= 0; xx--) {
      ar[to+xx].steal(ar[start+xx]);
    }
    */
  } else {
    while (1) {
      to->steal(*start);
      if (start == end) break;
      ++start;
      ++to;
    }
    /*
    for (int xx=0; xx <= end-start; xx++) {
      ar[to+xx].steal(ar[start+xx]);
    }
    */
  }
}

void osp_ring_page::reset_first()
{
  if (!first) { first = array(); return; }
  if (first == array()) return;
  OSPVptr *top = array();
  _move(first, first+fill-1, top);
  first = top;
}

OSSVPV *osp_ring_page::shift()
{
  assert(fill > 0);
  if (!first) first = array();
  OSSVPV *pv = first->resolve();
  ++first;
  --fill;
  return pv;
}

OSSVPV *osp_ring_page::pop()
{
  assert(fill > 0);
  --fill;
  if (!first) first = array();
  OSSVPV *pv = (first + fill)->resolve();
  return pv;
}

void osp_ring_page::extend(int to)
{
  reset_first();
  assert(to <= get_max());
  assert(to > fill);
  fill = to;
}

void osp_ring_page::push(SV **base, int items)
{
  reset_first();
  assert(fill + items <= get_max());
  for (int xx=0; xx < items; xx++) {
    *(first + fill) = osp_thr::sv_2bridge(base[xx], 0,
					  os_segment::of(this))->ospv();
    ++fill;
  }
}

void osp_ring_page::push(OSSVPV *pv)
{
  reset_first();
  assert(fill + 1 <= get_max());
  *(first+fill) = pv;
  ++fill;
}

void osp_ring_page::prepare_insert(int items)
{
  uncache_keys();
  OSPVptr *top = array();
  if (!first) first = top;
  assert(fill + items <= get_max());
  if (first - top < items) {
    if (fill) _move(first, first+fill-1, top + items);
    first = top + items;
  }
}

void osp_ring_page::unshift(SV **base, int items)
{
  prepare_insert(items);
  first -= items;
  for (int xx=0; xx < items; xx++) {
    *(first+xx) = osp_thr::sv_2bridge(base[xx], 0, os_segment::of(this))->ospv();
  }
  fill += items;
}

void osp_ring_page::unshift(OSSVPV *pv)
{
  prepare_insert(1);
  --first;
  *first = pv;
  ++fill;
}

void osp_ring_page::fwd_xfer(int at)
{
  int move = fill - at;
  assert(move > 0 && next->avail() >= move);
  next->prepare_insert(move);
  next->first -= move;
  OSPVptr *dst = next->first;
  OSPVptr *src = first + at;
  for (int xx=0; xx < move; xx++) {
    dst->steal(*src);
    ++dst;
    ++src;
  }
  fill -= move;
  next->fill += move;
  // backfill at the same time XXX
}

void osp_ring_page::insert_after(int at, OSSVPV *pv)
{
  reset_first();
  if (at < fill - 1) {
    _move(first + at + 1, first + fill - 1, first + at + 2);
  }
  *(first + at + 1) = pv;
  ++fill;
}

void osp_ring_page::splice(int offset, int length, SV **base, int count)
{
  reset_first();
  if (length != count && offset + length < fill)
    _move(first + offset + length, first + fill-1, first + offset + count);
  for (int xx=offset; xx < offset+count; xx++) {
    *(first + xx) = osp_thr::sv_2bridge(base[xx-offset], 0,
					os_segment::of(this))->ospv();
  }
  fill += count - length;
}

// -------------------------------------------------

int osp_ring_page1::get_max()
{ return OSP_RING_PAGE1_MAX; }

int osp_ring_page1::avail()
{ return OSP_RING_PAGE1_MAX - fill; }

OSPVptr *osp_ring_page1::array()
{ return ar; }

OSPVptr &osp_ring_page1::at(int xx)
{
  if (xx < 0 || xx >= fill)
    croak("Attempt to access unallocated page slot");
  return *(first+xx);
}

// -------------------------------------------------

OSPV_ring_index1::OSPV_ring_index1()
{
  version = 0;
  max = fill = 0;
  first = last = 0;
}

void OSPV_ring_index1::fix_stats()
{
  if (!RG_STALE(this)) return;
  if (first)
    fill += first->fill - first->old_fill;
  if (first != last)
    fill += last->fill - last->old_fill;
  RG_STALE_off(this);
}

U32 OSPV_ring_index1::read_fill()
{
  if (!RG_STALE(this)) return fill;
  else {
    U32 zfill = fill;
    if (first)
      zfill += first->fill - first->old_fill;
    if (first != last)
      zfill += last->fill - last->old_fill;
    return zfill;
  }
}

void OSPV_ring_index1::stale_stats()
{
  // playing tricks to avoid writes
  if (RG_STALE(this)) return;
  first->old_fill = first->fill;
  last->old_fill = last->fill;
  RG_STALE_on(this);
}

osp_ring_page *OSPV_ring_index1::new_page(osp_ring_page *ref, int after)
{
  osp_ring_page *npg;
  fix_stats();
  NEW_OS_OBJECT(npg, os_segment::of(this),
		osp_ring_page1::get_os_typespec(), osp_ring_page1());
  max += npg->get_max();
  if (!ref) {
    assert(fill == 0);
    assert(!first);
    first = last = npg;
    return npg;
  }
  else {
    assert(ref);
    if (after) {
      npg->prev = ref;
      npg->next = ref->next;
    } else {
      npg->next = ref;
      npg->prev = ref->prev;
    }
    if (npg->next)
      npg->next->prev = npg;
    else
      last = npg;
    if (npg->prev)
      npg->prev->next = npg;
    else
      first = npg;
    return npg;
  }
}

osp_ring_page *OSPV_ring_index1::get_page(U32 loc, I16 *offset)
{
  U32 zfill = read_fill();
  U32 pgtop;
  osp_ring_page *pp;
  assert(loc < zfill);
  if (loc < zfill/2) {
    pp = first;
    pgtop = 0;
    while (loc >= pgtop + pp->fill) {
      pgtop += pp->fill;
      pp = pp->next;
      assert(pp);
    }
  }
  else {
    pp = last;
    pgtop = zfill - pp->fill;
    while (loc < pgtop) {
      pp = pp->prev;
      assert(pp);
      pgtop -= pp->fill; 
    }
  }
  *offset = loc - pgtop;
  assert(*offset >= 0 && *offset < pp->fill);
  return pp;
}

OSPV_ring_index1::~OSPV_ring_index1()
{ CLEAR(); }

int OSPV_ring_index1::get_perl_type()
{ return SVt_PVAV; }

char *OSPV_ring_index1::os_class(STRLEN *len)
{ *len = 15; return "ObjStore::Index"; }

char *OSPV_ring_index1::rep_class(STRLEN *len)
{ *len = 26; return "ObjStore::REP::Ring::Index"; }

int OSPV_ring_index1::FETCHSIZE()
{ return read_fill(); }

void OSPV_ring_index1::CLEAR()
{
  fix_stats();
  ++version;
  while (first) {
    first->fill = 0;
    free_page(first);
  }
  fill = 0;
  assert(!max);
}

void OSPV_ring_index1::_debug1(void *)
{
  dOSP;
  osp_pathexam *exam = &osp->exam;
  exam->init();
  exam->load_path(((OSSVPV*)conf)->avx(1)->safe_rv());

  osp_ring_page *pp = first;
  while (pp) {
    pp->verify_keys(*exam);
    pp = pp->next;
  }
}

void OSPV_ring_index1::free_page(osp_ring_page *pp)
{
  fix_stats();
  ++version;
  assert(pp->fill == 0);
  if (first == pp) first = pp->next;
  if (last == pp) last = pp->prev;
  max -= pp->get_max();
  delete pp;
}

void OSPV_ring_index1::Extend(U32 to)
{
  osp_ring_page *pp;
  fix_stats();
  if (to < fill) return;
  pp = last? last : new_page(0,0);
  U32 more = to - fill;
  if (more > pp->avail()) more = pp->avail();
  pp->extend(pp->fill + more);
  fill += more;
  while (fill < to) {
    pp = new_page(pp, 1);
    more = to - fill;
    if (more > pp->avail()) more = pp->avail();
    pp->extend(pp->fill + more);
    fill += more;
  }
}

void OSPV_ring_index1::FETCH(SV *key)
{
  I16 off;
  osp_ring_page *pp;
  U32 to = osp_thr::sv_2aelem(key);
  if (to < 0 || to >= read_fill())
    return;
  pp = get_page(to, &off);
  OSSVPV *pv = pp->at(off).resolve();
  SV *sv = osp_thr::ospv_2sv(pv);
  dSP;
  XPUSHs(sv);
  PUTBACK;
}

void OSPV_ring_index1::STORE(SV *key, SV *nval)
{
  U32 to = osp_thr::sv_2aelem(key);
  I16 off;
  Extend(to+1);
  osp_ring_page *pp = get_page(to, &off);
  if (off == 0) pp->uncache_keys();
  pp->at(off) = osp_thr::sv_2bridge(nval, 0, os_segment::of(this))->ospv();
  dTHR;
  if (GIMME_V == G_VOID) return;
  djSP;
  XPUSHs(sv_mortalcopy(nval));
  PUTBACK;
}

void OSPV_ring_index1::POP()
{
  osp_ring_page *pp = last;
  if (!pp) return;
  stale_stats();
  SV *ret = osp_thr::ospv_2sv(pp->pop());
  if (!pp->fill)
    free_page(pp);
  djSP;
  XPUSHs(ret);
  PUTBACK;
}

void OSPV_ring_index1::SHIFT()
{
  osp_ring_page *pp = first;
  if (!pp) return;
  stale_stats();
  pp->uncache_keys();
  SV *ret = osp_thr::ospv_2sv(pp->shift());
  if (!pp->fill)
    free_page(pp);
  djSP;
  XPUSHs(ret);
  PUTBACK;
}

void OSPV_ring_index1::PUSH(SV **base, int items)
{
  while (items) {
    osp_ring_page *pp = last;
    if (!pp || !pp->avail()) pp = new_page(last, 1);
    stale_stats();
    int chunk = MIN(pp->avail(), items);
    pp->push(base, chunk);
    base += chunk;
    items -= chunk;
  }
}

void OSPV_ring_index1::UNSHIFT(SV **base, int items)
{
  while (items) {
    osp_ring_page *pp = first;
    if (!pp || !pp->avail()) pp = new_page(first, 0);
    else pp->uncache_keys();
    stale_stats();
    int chunk = MIN(pp->avail(), items);
    pp->unshift(base, chunk);
    base += chunk;
    items -= chunk;
  }
}

void OSPV_ring_index1::split(osp_ring_page *pp, int at)
{
  if (!pp->next || pp->next->avail() <= pp->fill - at) new_page(pp, 1);
  pp->uncache_keys();
  pp->fwd_xfer(at);
}

void OSPV_ring_index1::SPLICE(int offset, int length, SV **top, int count)
{
  fix_stats();
  Extend(offset + length);
  osp_ring_page *pp = 0;
  I16 at;
  while (length) {
    if (!pp) { pp = get_page(offset, &at); }
    else {
      pp = pp->next;
      at = 0;
      pp->uncache_keys();
    }
    int chunk = pp->get_max() - at;
    int clen = MIN(length, chunk);
    int ccnt = MIN(count, chunk);

    pp->splice(at, clen, top, ccnt);

    length = MAX(0, length - clen);
    count = MAX(0, count - ccnt);
    top += ccnt;
  }
  if (count) {
    if (!pp) pp = get_page(offset, &at);
    split(pp, at);
    
    fill += count;
    while (count) {
      if (!pp->avail()) pp = new_page(pp, 1);
      int chunk = MIN(pp->avail(), count);
      pp->push(top, chunk);
      top += chunk;
      count -= chunk;
    }
  }
}

OSSVPV *OSPV_ring_index1::new_cursor(os_segment *seg)
{ return new(seg, OSPV_ring_index1_cs::get_os_typespec())
    OSPV_ring_index1_cs(this); }

double OSPV_ring_index1::_percent_filled()
{ return fill/(double)max; }

//-----------------------------------------------------------

void OSPV_ring_index1_cs::CHK_VER()
{
  // incr(version) only happens when pages are freed!
  OSPV_ring_index1 *rp = (OSPV_ring_index1 *) myfocus.resolve();
  STRLEN _len;
  if (version != rp->version)
    croak("Version mismatch on 0x%x=%s", rp, rp->os_class(&_len));
}

OSPV_ring_index1_cs::OSPV_ring_index1_cs(OSPV_ring_index1 *_fo)
{
  myfocus = _fo;
  version = _fo->version;
  page=0;
}

OSSVPV *OSPV_ring_index1_cs::focus()
{ return myfocus; }

I32 OSPV_ring_index1_cs::pos()
{
  if (!RG_POSITION(this)) {
    if (!RG_ATEND(this)) {
      return -1;
    } else {
      OSPV_ring_index1 *rp = (OSPV_ring_index1 *) myfocus.resolve();
      return rp->read_fill();
    }
  } else {
    CHK_VER();
    return abpos;
  }
}

void OSPV_ring_index1_cs::at()
{
  if (!RG_POSITION(this)) return;
  CHK_VER();
  SV *ret = osp_thr::ospv_2sv(page->at(reloff));
  dSP;
  XPUSHs(ret);
  PUTBACK;
}

void OSPV_ring_index1_cs::store(SV *nval)
{
  if (!RG_POSITION(this)) croak("Can't store to unpositioned cursor");
  CHK_VER();
  if (reloff == 0) page->uncache_keys();
  page->at(reloff) = osp_thr::sv_2bridge(nval, 0, os_segment::of(this))->ospv();
}

void OSPV_ring_index1_cs::keys()
{
  if (!RG_POSITION(this)) return;
  CHK_VER();
  OSPV_ring_index1 *rp = (OSPV_ring_index1 *) myfocus.resolve();
  OSSVPV *conf = rp->conf;
  if (!conf) croak("keys() on unconfigured index");
  dOSP;
  osp_pathexam *exam = &osp->exam;
  exam->init();
  exam->load_path(conf->avx(1)->safe_rv());
  exam->load_target('x', page->at(reloff));
  exam->push_keys();
}

void OSPV_ring_index1_cs::moveto(I32 xto)  //unsigned? XXX
{
  OSPV_ring_index1 *rp = (OSPV_ring_index1 *) myfocus.resolve();
  version = rp->version;
  if (xto < 0 || xto >= rp->read_fill()) {
    RG_POSITION_off(this);
    if (xto < 0) RG_ATEND_off(this); else RG_ATEND_on(this);
    return;
  }
  RG_POSITION_on(this);
  abpos = xto;
  page = rp->get_page(xto, &reloff);
}

void OSPV_ring_index1_cs::step(I32 delta)
{
  OSPV_ring_index1 *rp = (OSPV_ring_index1 *) myfocus.resolve();
  CHK_VER();
  abpos += delta;
  if (delta > 0) {
    while (delta) {
      //assert(delta > 0);
      if (!RG_POSITION(this)) {
	if (RG_ATEND(this)) return;
	page = rp->first;
	if (!page) { RG_ATEND_on(this); return; }
	reloff = 0;
	abpos = 0;
	RG_POSITION_on(this);
	--delta;
      } else {
	int chunk = MIN(delta, page->fill-1 - reloff);
	if (chunk) {
	  reloff += chunk;
	  delta -= chunk;
	}
	else {
	  page = page->next;
	  if (!page) {
	    RG_POSITION_off(this);
	    RG_ATEND_on(this);
	    return;
	  }
	  reloff = 0;
	  --delta;
	}
      }
    }
  } else /* delta < 0 */ {
    while (delta) {
      //assert(delta < 0);
      if (!RG_POSITION(this)) {
	if (!RG_ATEND(this)) return;
	page = rp->last;
	if (!page) { RG_ATEND_off(this); return; }
	abpos = rp->read_fill() - 1;
	reloff = page->fill - 1;
	RG_POSITION_on(this);
	++delta;
      } else {
	int chunk = MIN(-delta, reloff);
	if (chunk) {
	  reloff -= chunk;
	  delta += chunk;
	}
	else {
	  page = page->prev;
	  if (!page) {
	    RG_POSITION_off(this);
	    RG_ATEND_off(this);
	    return;
	  }
	  reloff = page->fill - 1;
	  ++delta;
	}
      }
    }
  }
}

// MESSY stuff -------------------------------------

int OSPV_ring_index1::add(OSSVPV *pv)
{
  if (!conf) croak("Index not configured");

  dOSP;
  osp_pathexam *exam = &osp->exam;
  exam->init();
  exam->load_path(((OSSVPV*)conf)->avx(1)->safe_rv());
  if (!exam->load_target('x', pv)) return 0;

  osp_ring_page *pp = last;
  if (!pp || exam->compare(pp->at(pp->fill - 1), 0) >= 0) {
    // can insert as last record
    if (!pp || pp->avail() <= 1) pp = new_page(last, 1);
    stale_stats();
    if (!pp->fill) pp->cache_keys(*exam, pv);
    pp->push(pv);
    return 1;
  }
  
  fix_stats();
  ++fill;

  while (pp && pp->qck_cmp(*exam, 1) < 0) pp = pp->prev;
  if (!pp) {
    pp = first;
    if (!pp || !pp->avail()) pp = new_page(first, 0);
    //warn("4");
    pp->unshift(pv);
    return 1;
  }
  for (int xx = pp->fill - 1; xx >= 0; xx--) {
    if (exam->compare(pp->at(xx), 0) >= 0) {
      if (pp->avail()) {
	//warn("1 [%d]", xx);
	pp->insert_after(xx, pv);
      } else {
	if (xx < pp->fill - 1) {
	  //warn("2");
	  split(pp, xx+1);
	  pp->push(pv);
	} else {
	  //warn("3");
	  osp_ring_page *npg = new_page(pp, 1);
	  pp->fwd_xfer(pp->fill/2);
	  npg->push(pv);
	}
      }
      return 1;
    }
  }
  croak("Add failed?");
  return 0;
}

int OSPV_ring_index1::remove(OSSVPV *pv)
{
  if (!conf) croak("Index not configured");
  croak("Not implemented");
  return 0;
}

int OSPV_ring_index1_cs::seek(osp_pathexam &exam)
{
  OSPV_ring_index1 *rp = (OSPV_ring_index1 *) myfocus.resolve();
  if (rp->version != version || !RG_POSITION(this)) {
    // start at the last page if not already positioned
    page = rp->last;
    // let abpos become invalid
    if (!page) return 0;
  }
  OSSVPV *conf = rp->conf;
  if (!conf) croak("Index not configured");
  exam.load_path(conf->avx(1)->safe_rv());
  if (exam.get_keycnt() < 1) {
    warn("Seek to where?  No keys found");
    return 0;
  }

  dTXN;
  int updt = !txn? 1 : txn->can_update(this);

  //int dir = page->qck_cmp(exam);
  // binary search
  // then linear search
  // give up
}

// cope with descending XXX

MODULE = ObjStore::REP::Ring	PACKAGE = ObjStore::REP::Ring

PROTOTYPES: DISABLE

BOOT:
{
  extern _Application_schema_info ObjStore_REP_Ring_dll_schema_info;
  osp_thr::use("ObjStore::REP::Ring", OSPERL_API_VERSION);
  osp_thr::register_schema("ObjStore::REP::Ring",
	&ObjStore_REP_Ring_dll_schema_info);
}

MODULE = ObjStore::REP::Ring	PACKAGE = ObjStore::REP::Ring::Index

static void
new(clsv, seg)
  SV *clsv;
  SV *seg;
	PPCODE:
	os_segment *area = osp_thr::sv_2segment(seg);
	PUTBACK;
	OSPV_ring_index1 *pv;
	NEW_OS_OBJECT(pv, area, OSPV_ring_index1::get_os_typespec(), OSPV_ring_index1());
	pv->bless(ST(0));
	return;

void
OSSVPV::_conf_slot(...)
	PPCODE:
	PUTBACK;
	OSPV_ring_index1 *pv = (OSPV_ring_index1 *) THIS;
	SV *ret=0;
	if (items == 2) {
	  pv->fix_stats();
	  if (pv->fill) croak("Can't reconfigure non-empty index");
	  pv->conf = SvOK(ST(1))? osp_thr::sv_2bridge(ST(1), 1)->ospv() : 0;
	}
	else {
	  ret = osp_thr::ospv_2sv(pv->conf);
	}
	SPAGAIN;
	if (ret) XPUSHs(ret);