/* LibMemcached
 * Copyright (C) 2010 Brian Aker
 * All rights reserved.
 *
 * Use and distribution licensed under the BSD license.  See
 * the COPYING file in the parent directory for full text.
 *
 * Summary: a pool of memcached_st handles cloned from a master handle,
 *          guarded by a mutex and a condition variable.
 *
 */

/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
#include "libmemcached/common.h"
#include "libmemcached/memcached_util.h"

#include <errno.h>
#include <pthread.h>

/**
 * Connection pool state.
 *
 * mmc[0..firstfree] is a stack of handles that are currently available;
 * firstfree == -1 means no handle is available.  current_size counts every
 * handle the pool has created and not discarded (available + handed out),
 * and never exceeds size.
 */
struct memcached_pool_st
{
  pthread_mutex_t mutex;  /* protects every field below */
  pthread_cond_t cond;    /* signalled when a handle is pushed to an exhausted pool */
  memcached_st *master;   /* handle the pool members are cloned from */
  memcached_st **mmc;     /* array of `size` slots holding available handles */
  int firstfree;          /* index of the top available handle, -1 if none */
  uint32_t size;          /* capacity of mmc */
  uint32_t current_size;  /* number of handles created so far */
  char *version;          /* opaque tag bumped by memcached_pool_behavior_set();
                             compared against each handle's user data */
};

/**
 * Lock the pool mutex.
 *
 * pthread_mutex_lock() reports failure through its return value (an error
 * number), not through -1/errno, so the code is copied into errno for callers
 * that receive MEMCACHED_ERRNO.
 *
 * @return MEMCACHED_SUCCESS, or MEMCACHED_ERRNO with errno set.
 */
static memcached_return_t mutex_enter(pthread_mutex_t *mutex)
{
  int err= pthread_mutex_lock(mutex);

  if (err != 0)
  {
    errno= err;
    return MEMCACHED_ERRNO;
  }

  return MEMCACHED_SUCCESS;
}

/**
 * Unlock the pool mutex.
 *
 * errno is only written on failure, so a caller may safely set errno after a
 * successful unlock.
 *
 * @return MEMCACHED_SUCCESS, or MEMCACHED_ERRNO with errno set.
 */
static memcached_return_t mutex_exit(pthread_mutex_t *mutex)
{
  int err= pthread_mutex_unlock(mutex);

  if (err != 0)
  {
    errno= err;
    return MEMCACHED_ERRNO;
  }

  return MEMCACHED_SUCCESS;
}

/**
 * Grow the connection pool by creating a connection structure and clone the
 * original memcached handle.  The new handle is pushed onto the available
 * stack.
 *
 * The caller must hold pool->mutex (or own the pool exclusively).
 *
 * @return 0 on success, -1 if the pool is already at capacity, the
 *         allocation failed or the clone failed.
 */
static int grow_pool(memcached_pool_st *pool)
{
  /* Never write past the end of pool->mmc. */
  if (pool->current_size >= pool->size)
    return -1;

  memcached_st *obj= calloc(1, sizeof(*obj));
  if (obj == NULL)
    return -1;

  if (memcached_clone(obj, pool->master) == NULL)
  {
    free(obj);
    return -1;
  }

  pool->mmc[++pool->firstfree]= obj;
  pool->current_size++;

  return 0;
}

/**
 * Create a pool of at most `max` handles cloned from `mmc`.
 *
 * @param mmc     master handle; returned again by memcached_pool_destroy()
 * @param initial number of handles to create up front (clamped to `max`)
 * @param max     capacity of the pool
 * @return the new pool, or NULL if memory or the synchronisation primitives
 *         could not be set up.  Failing to create some of the `initial`
 *         handles is not treated as an error.
 */
memcached_pool_st *memcached_pool_create(memcached_st* mmc, uint32_t initial, uint32_t max)
{
  /* calloc leaves current_size == 0 and version == NULL. */
  memcached_pool_st *pool= calloc(1, sizeof(*pool));
  if (pool == NULL)
    return NULL;

  pool->mmc= calloc(max, sizeof(*pool->mmc));
  if (pool->mmc == NULL)
  {
    free(pool);
    return NULL;
  }

  /* Initialise the primitives in their final location: POSIX does not
     define the behaviour of a mutex/condvar that was copied by value. */
  if (pthread_mutex_init(&pool->mutex, NULL) != 0)
  {
    free(pool->mmc);
    free(pool);
    return NULL;
  }

  if (pthread_cond_init(&pool->cond, NULL) != 0)
  {
    pthread_mutex_destroy(&pool->mutex);
    free(pool->mmc);
    free(pool);
    return NULL;
  }

  pool->master= mmc;
  pool->firstfree= -1;
  pool->size= max;

  /* The slot array only has room for `max` handles. */
  if (initial > max)
    initial= max;

  /* Try to create the initial size of the pool. An allocation failure at
     this time is not fatal. */
  for (uint32_t ii= 0; ii < initial; ++ii)
  {
    if (grow_pool(pool) == -1)
      break;
  }

  return pool;
}

/**
 * Destroy the pool and every handle currently sitting in it.
 *
 * Handles that are still handed out (popped and not pushed back) are not
 * touched by this function.
 *
 * @return the master handle that was passed to memcached_pool_create().
 */
memcached_st* memcached_pool_destroy(memcached_pool_st* pool)
{
  memcached_st *master= pool->master;

  for (int xx= 0; xx <= pool->firstfree; ++xx)
  {
    memcached_free(pool->mmc[xx]);
    free(pool->mmc[xx]);
    pool->mmc[xx]= NULL;
  }

  pthread_mutex_destroy(&pool->mutex);
  pthread_cond_destroy(&pool->cond);
  free(pool->mmc);
  free(pool);

  return master;
}

/**
 * Take a handle out of the pool, creating one if the pool is below capacity.
 *
 * @param pool  the pool
 * @param block when the pool is exhausted: true waits for a push,
 *              false returns NULL immediately
 * @param rc    receives the status of the locking operations; MEMCACHED_ERRNO
 *              (with errno set) if locking or waiting failed
 * @return a handle, or NULL.  NULL is the failure indicator: on the
 *         non-blocking "exhausted" path and when a new handle could not be
 *         created, *rc only reflects the result of unlocking the mutex and
 *         may therefore be MEMCACHED_SUCCESS.
 */
memcached_st* memcached_pool_pop(memcached_pool_st* pool, bool block, memcached_return_t *rc)
{
  memcached_st *ret= NULL;

  if ((*rc= mutex_enter(&pool->mutex)) != MEMCACHED_SUCCESS)
    return NULL;

  do
  {
    if (pool->firstfree > -1)
    {
      ret= pool->mmc[pool->firstfree--];
    }
    else if (pool->current_size == pool->size)
    {
      if (!block)
      {
        *rc= mutex_exit(&pool->mutex);
        return NULL;
      }

      /* pthread_cond_wait() returns an error number, it does not set errno. */
      int err= pthread_cond_wait(&pool->cond, &pool->mutex);
      if (err != 0)
      {
        mutex_exit(&pool->mutex);
        errno= err;
        *rc= MEMCACHED_ERRNO;
        return NULL;
      }
    }
    else if (grow_pool(pool) == -1)
    {
      *rc= mutex_exit(&pool->mutex);
      return NULL;
    }
  } while (ret == NULL);

  *rc= mutex_exit(&pool->mutex);

  return ret;
}

/**
 * Return a handle to the pool.
 *
 * If the handle's user data no longer matches the pool version (the pool
 * behavior was changed while the handle was out), the handle is freed and
 * re-cloned from the master before it is put back.  The handle is put back
 * even when that re-clone fails; it is then zero-filled, so its user data
 * will mismatch again on the next push and the clone is retried.
 *
 * @return MEMCACHED_SOME_ERRORS if the re-clone failed, otherwise the result
 *         of the lock/unlock operations.
 */
memcached_return_t memcached_pool_push(memcached_pool_st* pool, memcached_st *mmc)
{
  memcached_return_t rc= mutex_enter(&pool->mutex);

  if (rc != MEMCACHED_SUCCESS)
    return rc;

  char* version= memcached_get_user_data(mmc);
  /* Someone updated the behavior on the object.. */
  if (version != pool->version)
  {
    memcached_free(mmc);
    memset(mmc, 0, sizeof(*mmc));
    if (memcached_clone(mmc, pool->master) == NULL)
    {
      rc= MEMCACHED_SOME_ERRORS;
    }
  }

  pool->mmc[++pool->firstfree]= mmc;

  if (pool->firstfree == 0 && pool->current_size == pool->size)
  {
    /* The pool was exhausted, so threads may be waiting for a connection:
       wake them up. */
    pthread_cond_broadcast(&pool->cond);
  }

  memcached_return_t rval= mutex_exit(&pool->mutex);
  if (rc == MEMCACHED_SOME_ERRORS)
    return rc;

  return rval;
}

/**
 * Set a behavior on the master and on every handle currently in the pool.
 *
 * The pool version is bumped so that handles which are handed out right now
 * are refreshed from the master when they are pushed back.
 *
 * A pooled handle that rejects the behavior is re-cloned from the master.  If
 * that also fails the handle is discarded: it is removed from the available
 * stack and current_size is lowered, so the pool has no NULL slots and can
 * create a replacement on a later pop.
 *
 * @return the error from setting the behavior on the master, or the result
 *         of the lock/unlock operations.  Failures on individual pooled
 *         handles are not reported.
 */
memcached_return_t memcached_pool_behavior_set(memcached_pool_st *pool,
                                               memcached_behavior_t flag,
                                               uint64_t data)
{
  memcached_return_t rc= mutex_enter(&pool->mutex);
  if (rc != MEMCACHED_SUCCESS)
    return rc;

  /* update the master */
  rc= memcached_behavior_set(pool->master, flag, data);
  if (rc != MEMCACHED_SUCCESS)
  {
    mutex_exit(&pool->mutex);
    return rc;
  }

  /* The pointer is only ever used as an opaque counter, never dereferenced. */
  ++pool->version;
  memcached_set_user_data(pool->master, pool->version);

  /* update the clones */
  int xx= 0;
  while (xx <= pool->firstfree)
  {
    memcached_st *clone= pool->mmc[xx];

    rc= memcached_behavior_set(clone, flag, data);
    if (rc == MEMCACHED_SUCCESS)
    {
      memcached_set_user_data(clone, pool->version);
      ++xx;
      continue;
    }

    /* The clone refused the behavior: rebuild it from the master. */
    memcached_free(clone);
    memset(clone, 0, sizeof(*clone));
    if (memcached_clone(clone, pool->master) != NULL)
    {
      ++xx;
      continue;
    }

    /* Rebuilding failed as well.  Drop the handle and fill its slot with the
       top of the stack; xx is not advanced so the moved handle is processed
       on the next iteration. */
    free(clone);
    pool->mmc[xx]= pool->mmc[pool->firstfree];
    pool->mmc[pool->firstfree]= NULL;
    --pool->firstfree;
    --pool->current_size;
  }

  return mutex_exit(&pool->mutex);
}

/**
 * Read a behavior from the master handle.
 *
 * @param value receives the behavior value; left untouched if the mutex
 *              could not be locked
 * @return the result of the lock/unlock operations.
 */
memcached_return_t memcached_pool_behavior_get(memcached_pool_st *pool,
                                               memcached_behavior_t flag,
                                               uint64_t *value)
{
  memcached_return_t rc= mutex_enter(&pool->mutex);
  if (rc != MEMCACHED_SUCCESS)
  {
    return rc;
  }

  *value= memcached_behavior_get(pool->master, flag);

  return mutex_exit(&pool->mutex);
}