#include #include #include #include #include #include #include #include #include #include #include #include #include #include #define PROGNAME "keyedmutexd" #define VERSION "0.04" #define DEFAULT_SOCKPATH "/tmp/" PROGNAME ".sock" #define DEFAULT_CONNS_SIZE 32 #define DEFAULT_TIMEOUT_SECS 30 #define KEY_SIZE (16) #ifndef MIN #define MIN(x, y) ((x) < (y) ? (x) : (y)) #endif #ifndef MAX #define MAX(x, y) ((x) < (y) ? (y) : (x)) #endif #ifndef INLINE #define INLINE __inline #endif /* messages */ #define OWNER_MSG "O" #define RELEASE_MSG "R" /* states, note that connections w. valid key have their lsb set */ #define CS_NOCONN 0x0 #define CS_KEYREAD 0x2 #define CS_OWNER 0x1 #define CS_NONOWNER 0x3 #define CS_STATE_MASK (0x3) #define CS_IKEY_MASK (~CS_STATE_MASK) /* conn_* arrays start from socket no. listen_fd + 1 */ static int* conn_states; static char* conn_key_offsets; static char (*conn_keys)[KEY_SIZE]; static time_t* owner_timeouts; static int conns_length = 0; /* index of last valid conn + 1 */ static int conns_size = DEFAULT_CONNS_SIZE; /* size of conns */ static int listen_fd; static int exit_loop; static struct sockaddr_un sun; static struct sockaddr_in sin; static int use_tcp; static int timeout_secs = DEFAULT_TIMEOUT_SECS; static int force; static int print_info; static int no_log; static struct option longopts[] = { { "socket", required_argument, NULL, 's' }, { "maxconn", required_argument, NULL, 'm' }, { "timeout", required_argument, NULL, 't' }, { "force", no_argument, NULL, 'f' }, { "help", no_argument, &print_info, 'h' }, { "version", no_argument, &print_info, 'v' }, { "no-log", no_argument, &no_log, 1 }, { NULL, no_argument, NULL, 0 }, }; #define LOG(fd, status, key) \ do { \ if (! no_log) write_log((fd), (status), (key)); \ } while (0) static void write_log(int fd, const char* status, const char* key) { char hexkey[KEY_SIZE * 2 + 2]; int i; if (key != NULL) { hexkey[0] = ' '; for (i = 0; i < KEY_SIZE; i++) { hexkey[i * 2 + 1] = ("0123456789abcdef")[(key[i] >> 4) & 0xf]; hexkey[i * 2 + 2] = ("0123456789abcdef")[key[i] & 0xf]; } hexkey[KEY_SIZE * 2 + 1] = '\0'; } else { hexkey[0] = '\0'; } printf("%d %s%s\n", fd, status, hexkey); } INLINE int reuse_addr(int fd) { int arg = 1; return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &arg, sizeof(arg)); } INLINE int nonblock(int fd) { return fcntl(fd, F_SETFL, O_NONBLOCK); } INLINE int nodelay(int fd) { int arg = 0; return setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &arg, sizeof(arg)); } INLINE int key2i(const char* _key) { const int* key = (void*)(_key + KEY_SIZE); int ikey = 0; do { ikey ^= *--key; } while (key != (void*)_key); return ikey & CS_IKEY_MASK; } INLINE void close_conn(int i) { int fd = i + listen_fd + 1; close(fd); conn_states[i] = CS_NOCONN; if (i + 1 == conns_length) { for (conns_length -= 1; conns_length != 0; conns_length--) { if (conn_states[conns_length - 1] != CS_NOCONN) { break; } } } LOG(fd, "closed", NULL); } INLINE void setup_conn(int i) { conn_states[i] = CS_KEYREAD; conn_key_offsets[i] = 0; } static int owner_exists(int ikey, const char* key) { int state = ikey | CS_OWNER, i; for (i = 0; i < conns_length; i++) { if (conn_states[i] == state && memcmp(conn_keys[i], key, KEY_SIZE) == 0) { return 1; } } return 0; } static void notify_nonowners(int ikey, const char* key) { int state = ikey | CS_NONOWNER, i; for (i = 0; i < conns_length; i++) { if (conn_states[i] == state && memcmp(conn_keys[i], key, KEY_SIZE) == 0) { int fd = i + listen_fd + 1; if (write(fd, RELEASE_MSG, 1) <= 0) { close_conn(i); } else { LOG(fd, "notify", key); setup_conn(i); } } } } static void loop(void) { while (! exit_loop) { fd_set readfds; struct timeval tv = { 60, 900 * 1000 }; int i, num_conns = 0; time_t now = time(NULL); /* setup readfds and set noconn_exists */ FD_ZERO(&readfds); for (i = 0; i < conns_length; i++) { switch (conn_states[i] & CS_STATE_MASK) { case CS_OWNER: if (owner_timeouts[i] <= now) { tv.tv_sec = 0; } else { tv.tv_sec = MIN(tv.tv_sec, owner_timeouts[i] - now); } /* continues */ case CS_KEYREAD: case CS_NONOWNER: FD_SET(i + listen_fd + 1, &readfds); num_conns++; break; case CS_NOCONN: break; } } if (num_conns < conns_size) { FD_SET(listen_fd, &readfds); } /* select, and update time */ select(listen_fd + conns_length + 1, &readfds, NULL, NULL, &tv); now = time(NULL); /* accept new connections */ if (FD_ISSET(listen_fd, &readfds)) { do { int fd = accept(listen_fd, NULL, NULL); if (fd == -1) { break; } nodelay(fd); i = fd - listen_fd - 1; assert(0 <= i && i < conns_size); assert(conn_states[i] == CS_NOCONN); setup_conn(i); conns_length = MAX(i + 1, conns_length); LOG(fd, "connected", NULL); num_conns++; } while (num_conns < conns_size); } /* read data */ for (i = 0; i < conns_length; i++) { int fd = i + listen_fd + 1; switch (conn_states[i] & CS_STATE_MASK) { case CS_KEYREAD: if (FD_ISSET(fd, &readfds)) { int r = read(fd, conn_keys[i] + conn_key_offsets[i], KEY_SIZE - conn_key_offsets[i]); if (r <= 0) { close_conn(i); } else { if ((conn_key_offsets[i] += r) == KEY_SIZE) { int ikey = key2i(conn_keys[i]); if (owner_exists(ikey, conn_keys[i])) { conn_states[i] = ikey | CS_NONOWNER; LOG(fd, "notowner", conn_keys[i]); } else { write(fd, OWNER_MSG, 1); conn_states[i] = ikey | CS_OWNER; owner_timeouts[i] = now + timeout_secs; LOG(fd, "owner", conn_keys[i]); } } } } break; case CS_OWNER: if (FD_ISSET(fd, &readfds)) { char ch; int r = read(fd, &ch, 1); LOG(fd, "release", conn_keys[i]); notify_nonowners(conn_states[i] & CS_IKEY_MASK, conn_keys[i]); if (r <= 0 || ch != RELEASE_MSG[0]) { close_conn(i); } else { setup_conn(i); } } else { if (owner_timeouts[i] <= now) { LOG(fd, "release_to", conn_keys[i]); notify_nonowners(conn_states[i] & CS_IKEY_MASK, conn_keys[i]); close_conn(i); } } break; case CS_NONOWNER: if (FD_ISSET(fd, &readfds)) { close_conn(i); } break; default: assert(! FD_ISSET(fd, &readfds)); break; } } } } void term_handler(int _unused) { exit_loop = 1; } static void usage(void) { fprintf(stdout, "Usage: " PROGNAME " [OPTION]...\n" "\n" "Keyedmutexd is a tiny daemon that acts as a mutex for supplied key.\n" "\n" "Options:\n" " -f,--force removes old socket file if exists\n" " -s,--socket=SOCKET unix domain socket or tcp port number\n" " (default: %s)\n" " -m,--maxconn=MAXCONN number of max. connections (default: %d)\n" " -t,--timeout=secs timeout for holding locks (default: %d)\n" " --no-log omit logging\n" " --help help\n" " --version version\n" "\n" "Report bugs to http://labs.cybozu.co.jp/blog/kazuhoatwork/\n", DEFAULT_SOCKPATH, DEFAULT_CONNS_SIZE, DEFAULT_TIMEOUT_SECS); exit(0); } int main(int argc, char** argv) { int ch; sun.sun_family = AF_UNIX; strcpy(sun.sun_path, DEFAULT_SOCKPATH); sin.sin_family = AF_INET; sin.sin_addr.s_addr = htonl(INADDR_ANY); while ((ch = getopt_long(argc, argv, "s:m:t:f", longopts, NULL)) != -1) { switch (ch) { case 's': { unsigned short p; if (sscanf(optarg, "%hu", &p) == 1) { sin.sin_port = htons(p); use_tcp = 1; } else { strncpy(sun.sun_path, optarg, sizeof(sun.sun_path) - 1); sun.sun_path[sizeof(sun.sun_path) - 1] = '\0'; } } break; case 'f': force = 1; break; case 'm': if (sscanf(optarg, "%d", &conns_size) != 1 || conns_size <= 0) { fprintf(stderr, "invalid value for parameter \"-n\"\n"); exit(1); } break; case 't': if (sscanf(optarg, "%d", &timeout_secs) != 1 || timeout_secs <= 0) { fprintf(stderr, "invalid value for parameter \"-t\"\n"); exit(1); } break; case 0: switch (print_info) { case 'h': usage(); break; case 'v': fputs(PROGNAME " " VERSION "\n", stdout); exit(0); } break; default: fprintf(stderr, "unknown option: %s\n", argv[optind - 1]); exit(1); } } if ((conn_states = calloc(conns_size, sizeof(*conn_states))) == NULL || (conn_key_offsets = calloc(conns_size, sizeof(*conn_key_offsets))) == NULL || (conn_keys = calloc(conns_size, sizeof(*conn_keys))) == NULL || (owner_timeouts = calloc(conns_size, sizeof(*owner_timeouts))) == NULL ) { perror(NULL); exit(2); } if (use_tcp) { if ((listen_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1 || reuse_addr(listen_fd) == -1 || nonblock(listen_fd) == -1 || bind(listen_fd, (struct sockaddr*)&sin, sizeof(sin)) == -1 || listen(listen_fd, 5) == -1) { perror("failed to open a listening socket"); exit(2); } } else { if (force) { unlink(sun.sun_path); } if ((listen_fd = socket(AF_UNIX, SOCK_STREAM, 0)) == -1 || nonblock(listen_fd) == -1 || bind(listen_fd, (struct sockaddr*)&sun, sizeof(sun)) == -1 || listen(listen_fd, 5) == -1) { perror("failed to open a listening socket"); exit(2); } } signal(SIGPIPE, SIG_IGN); signal(SIGHUP, term_handler); signal(SIGINT, term_handler); signal(SIGTERM, term_handler); loop(); close(listen_fd); if (! use_tcp) { unlink(sun.sun_path); } return 0; }