/*
* Copyright (C) 2008 Search Solution Corporation. All rights reserved by Search Solution.
*
* Redistribution and use in source and binary forms, with or without modification,
* are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* - Neither the name of the <ORGANIZATION> nor the names of its contributors
* may be used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
* IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
* OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
* OF SUCH DAMAGE.
*
*/
/*
* cci_network.c -
*/
#ident "$Id$"
/************************************************************************
* IMPORTED SYSTEM HEADER FILES *
************************************************************************/
#include <string.h>
#include <stdlib.h>
#include <fcntl.h>
#include <errno.h>
#if defined(WINDOWS)
#include <winsock2.h>
#include <windows.h>
#include <io.h>
#else
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/time.h>
#include <poll.h>
#endif
/************************************************************************
* OTHER IMPORTED HEADER FILES *
************************************************************************/
#include "cci_common.h"
#include "cas_cci.h"
#include "cci_log.h"
#include "cci_network.h"
#include "cas_protocol.h"
#include "cci_query_execute.h"
#include "cci_util.h"
#if defined(WINDOWS)
#include "version.h"
#endif
/************************************************************************
* PRIVATE DEFINITIONS *
************************************************************************/
#define WRITE_TO_SOCKET(SOCKFD, MSG, SIZE) \
send(SOCKFD, MSG, SIZE, 0)
#define READ_FROM_SOCKET(SOCKFD, MSG, SIZE) \
recv(SOCKFD, MSG, SIZE, 0)
#define SOCKET_TIMEOUT 5000 /* msec */
/************************************************************************
* PRIVATE TYPE DEFINITIONS *
************************************************************************/
/************************************************************************
* PRIVATE FUNCTION PROTOTYPES *
************************************************************************/
static int connect_srv (unsigned char *ip_addr, int port, char is_retry,
SOCKET * ret_sock, int login_timeout);
#if defined(ENABLE_UNUSED_FUNCTION)
static int net_send_int (SOCKET sock_fd, int value);
#endif
static int net_recv_int (SOCKET sock_fd, int port, int *value);
static int net_recv_stream (SOCKET sock_fd, int port, char *buf, int size,
int timeout);
static int net_send_stream (SOCKET sock_fd, char *buf, int size);
static void init_msg_header (MSG_HEADER * header);
static int net_send_msg_header (SOCKET sock_fd, MSG_HEADER * header);
static int net_recv_msg_header (SOCKET sock_fd, int port, MSG_HEADER * header,
int timeout);
static bool net_peer_socket_alive (SOCKET sd, int port, int timeout_msec);
static int net_cancel_request_internal (unsigned char *ip_addr, int port,
char *msg, int msglen);
static int net_cancel_request_w_local_port (unsigned char *ip_addr, int port,
int pid,
unsigned short local_port);
static int net_cancel_request_wo_local_port (unsigned char *ip_addr, int port,
int pid);
/************************************************************************
* INTERFACE VARIABLES *
************************************************************************/
/************************************************************************
* PUBLIC VARIABLES *
************************************************************************/
#if defined(CCI_OLEDB) || defined(CCI_ODBC)
static char cci_client_type = CAS_CLIENT_ODBC;
#else
static char cci_client_type = CAS_CLIENT_CCI;
#endif
/************************************************************************
* PRIVATE VARIABLES *
************************************************************************/
/************************************************************************
* IMPLEMENTATION OF INTERFACE FUNCTIONS *
************************************************************************/
/************************************************************************
* IMPLEMENTATION OF PUBLIC FUNCTIONS *
************************************************************************/
int
net_connect_srv (T_CON_HANDLE * con_handle, int host_id,
T_CCI_ERROR * err_buf, int login_timeout)
{
SOCKET srv_sock_fd;
char client_info[SRV_CON_CLIENT_INFO_SIZE];
char db_info[SRV_CON_DB_INFO_SIZE];
char ver_str[SRV_CON_VER_STR_MAX_SIZE];
MSG_HEADER msg_header;
int err_code, ret_value;
int err_indicator;
int new_port;
char *msg_buf, *info, *p, *ver_ptr;
unsigned char *ip_addr;
int port;
int body_len;
T_BROKER_VERSION broker_ver;
init_msg_header (&msg_header);
memset (client_info, 0, sizeof (client_info));
memset (db_info, 0, sizeof (db_info));
strncpy (client_info, SRV_CON_CLIENT_MAGIC_STR, SRV_CON_CLIENT_MAGIC_LEN);
client_info[SRV_CON_MSG_IDX_CLIENT_TYPE] = cci_client_type;
client_info[SRV_CON_MSG_IDX_PROTO_VERSION] = CAS_PROTO_PACK_CURRENT_NET_VER;
client_info[SRV_CON_MSG_IDX_FUNCTION_FLAG] =
BROKER_RECONNECT_WHEN_SERVER_DOWN;
client_info[SRV_CON_MSG_IDX_RESERVED2] = 0;
info = db_info;
if (con_handle->db_name)
{
strncpy (info, con_handle->db_name, SRV_CON_DBNAME_SIZE);
}
info += SRV_CON_DBNAME_SIZE;
if (con_handle->db_user)
{
strncpy (info, con_handle->db_user, SRV_CON_DBUSER_SIZE);
}
info += SRV_CON_DBUSER_SIZE;
if (con_handle->db_passwd)
{
strncpy (info, con_handle->db_passwd, SRV_CON_DBPASSWD_SIZE);
}
info += SRV_CON_DBPASSWD_SIZE;
strncpy (info, con_handle->url, SRV_CON_URL_SIZE - 1);
strncpy (ver_str, MAKE_STR (BUILD_NUMBER), SRV_CON_VER_STR_MAX_SIZE);
ver_ptr = info + strlen (con_handle->url) + 1;
if (strlen (con_handle->url) + strlen (ver_str) + 3 <= SRV_CON_URL_SIZE)
{
ver_ptr[0] = (char) strlen (ver_str) + 1;
memcpy (ver_ptr + 1, ver_str, strlen (ver_str) + 1);
}
else
{
ver_ptr[0] = (char) 0;
}
info += SRV_CON_URL_SIZE;
broker_ver = hm_get_broker_version (con_handle);
if (broker_ver == 0)
{
/* Interpretable session information supporting version
* later than PROTOCOL_V3 as well as version earlier
* than PROTOCOL_V3 should be delivered since no broker information
* is provided at the time of initial connection.
*/
snprintf (info, DRIVER_SESSION_SIZE, "%u", 0);
}
else if (hm_broker_understand_the_protocol (broker_ver, PROTOCOL_V3))
{
memcpy (info, con_handle->session_id.id, DRIVER_SESSION_SIZE);
}
else
{
unsigned int v;
v = *(unsigned int *) con_handle->session_id.id;
snprintf (info, DRIVER_SESSION_SIZE, "%u", v);
}
if (host_id < 0)
{
ip_addr = con_handle->ip_addr;
port = con_handle->port;
}
else
{
ip_addr = con_handle->alter_hosts[host_id].ip_addr;
port = con_handle->alter_hosts[host_id].port;
}
ret_value = connect_srv (ip_addr, port, con_handle->is_retry, &srv_sock_fd,
login_timeout);
if (ret_value < 0)
{
return ret_value;
}
if (net_send_stream (srv_sock_fd, client_info, SRV_CON_CLIENT_INFO_SIZE) <
0)
{
err_code = CCI_ER_COMMUNICATION;
goto connect_srv_error;
}
ret_value = net_recv_stream (srv_sock_fd, port, (char *) &err_code, 4,
login_timeout);
if (ret_value < 0)
{
err_code = ret_value;
goto connect_srv_error;
}
err_code = ntohl (err_code);
if (err_code < 0)
{
/* in here, all errors are sent by only a broker
* the error greater than -10000 is sent by old broker
*/
if (err_code > -10000)
{
err_code -= 9000;
}
goto connect_srv_error;
}
new_port = err_code;
if (new_port != port && new_port > 0)
{
CLOSE_SOCKET (srv_sock_fd);
ret_value = connect_srv (ip_addr, new_port, con_handle->is_retry,
&srv_sock_fd, login_timeout);
if (ret_value < 0)
{
return ret_value;
}
}
if (net_send_stream (srv_sock_fd, db_info, SRV_CON_DB_INFO_SIZE) < 0)
{
err_code = CCI_ER_COMMUNICATION;
goto connect_srv_error;
}
ret_value = net_recv_msg_header (srv_sock_fd, port, &msg_header,
login_timeout);
if (ret_value < 0)
{
err_code = ret_value;
goto connect_srv_error;
}
memcpy (con_handle->cas_info, msg_header.info_ptr, MSG_HEADER_INFO_SIZE);
msg_buf = (char *) MALLOC (*(msg_header.msg_body_size_ptr));
if (msg_buf == NULL)
{
err_code = CCI_ER_NO_MORE_MEMORY;
goto connect_srv_error;
}
ret_value = net_recv_stream (srv_sock_fd, port, msg_buf,
*(msg_header.msg_body_size_ptr),
login_timeout);
if (ret_value < 0)
{
FREE_MEM (msg_buf);
err_code = ret_value;
goto connect_srv_error;
}
memcpy (&err_indicator, msg_buf + CAS_PROTOCOL_ERR_INDICATOR_INDEX,
CAS_PROTOCOL_ERR_INDICATOR_SIZE);
err_indicator = ntohl (err_indicator);
if (err_indicator < 0)
{
memcpy (&err_code, msg_buf + CAS_PROTOCOL_ERR_CODE_INDEX,
CAS_PROTOCOL_ERR_CODE_SIZE);
err_code = ntohl (err_code);
/* the error less than -10000 with CAS_ERROR_INDICATOR is sent by new broker
* -10018 (CAS_ER_NOT_AUTHORIZED_CLIENT) is especial case
*/
if ((err_indicator == CAS_ERROR_INDICATOR && err_code < -10000)
|| err_code == -10018)
{
err_code += 9000;
}
if (err_indicator == DBMS_ERROR_INDICATOR)
{
if (err_buf)
{
memcpy (err_buf->err_msg, msg_buf + CAS_PROTOCOL_ERR_MSG_INDEX,
*(msg_header.msg_body_size_ptr) -
(CAS_PROTOCOL_ERR_INDICATOR_SIZE +
CAS_PROTOCOL_ERR_CODE_SIZE));
err_buf->err_code = err_code;
}
err_code = CCI_ER_DBMS;
}
FREE_MEM (msg_buf);
goto connect_srv_error;
}
/* connection success */
con_handle->cas_pid = err_indicator;
p = msg_buf + CAS_PID_SIZE;
memcpy (con_handle->broker_info, p, BROKER_INFO_SIZE);
p += BROKER_INFO_SIZE;
body_len = *(msg_header.msg_body_size_ptr);
broker_ver = hm_get_broker_version (con_handle);
if (hm_broker_understand_the_protocol (broker_ver, PROTOCOL_V4))
{
if (body_len != CAS_CONNECTION_REPLY_SIZE)
{
err_code = CCI_ER_COMMUNICATION;
goto connect_srv_error;
}
}
else if (hm_broker_understand_the_protocol (broker_ver, PROTOCOL_V3))
{
if (body_len != CAS_CONNECTION_REPLY_SIZE_V3)
{
err_code = CCI_ER_COMMUNICATION;
goto connect_srv_error;
}
}
else
{
if (body_len != CAS_CONNECTION_REPLY_SIZE_PRIOR_PROTOCOL_V3)
{
err_code = CCI_ER_COMMUNICATION;
goto connect_srv_error;
}
}
if (hm_broker_understand_the_protocol (broker_ver, PROTOCOL_V4))
{
con_handle->cas_id = ntohl (*(int *) p);
p += CAS_PID_SIZE;
}
else
{
con_handle->cas_id = -1;
}
if (hm_broker_understand_the_protocol (broker_ver, PROTOCOL_V3))
{
memcpy (con_handle->session_id.id, p, DRIVER_SESSION_SIZE);
}
else
{
memcpy (con_handle->session_id.id, p, SESSION_ID_SIZE);
*(unsigned int *) con_handle->session_id.id =
ntohl (*(unsigned int *) con_handle->session_id.id);
}
FREE_MEM (msg_buf);
con_handle->sock_fd = srv_sock_fd;
con_handle->alter_host_id = host_id;
if (con_handle->alter_host_count > 0)
{
con_handle->is_retry = 0;
}
else
{
con_handle->is_retry = 1;
}
if (con_handle->isolation_level > TRAN_UNKNOWN_ISOLATION)
{
qe_set_db_parameter (con_handle, CCI_PARAM_ISOLATION_LEVEL,
&(con_handle->isolation_level), err_buf);
}
if (con_handle->lock_timeout != CCI_LOCK_TIMEOUT_DEFAULT)
{
qe_set_db_parameter (con_handle, CCI_PARAM_LOCK_TIMEOUT,
&(con_handle->lock_timeout), err_buf);
}
return CCI_ER_NO_ERROR;
connect_srv_error:
CLOSE_SOCKET (srv_sock_fd);
return err_code;
}
static int
net_cancel_request_internal (unsigned char *ip_addr, int port,
char *msg, int msglen)
{
SOCKET srv_sock_fd;
int err_code;
if (connect_srv (ip_addr, port, 0, &srv_sock_fd, 0) < 0)
{
return CCI_ER_CONNECT;
}
if (net_send_stream (srv_sock_fd, msg, msglen) < 0)
{
err_code = CCI_ER_COMMUNICATION;
goto cancel_error;
}
if (net_recv_stream (srv_sock_fd, port, (char *) &err_code, 4, 0) < 0)
{
err_code = CCI_ER_COMMUNICATION;
goto cancel_error;
}
err_code = ntohl (err_code);
if (err_code < 0)
goto cancel_error;
CLOSE_SOCKET (srv_sock_fd);
return CCI_ER_NO_ERROR;
cancel_error:
CLOSE_SOCKET (srv_sock_fd);
return err_code;
}
static int
net_cancel_request_w_local_port (unsigned char *ip_addr, int port, int pid,
unsigned short local_port)
{
char msg[10];
memset (msg, 0, sizeof (msg));
strcpy (msg, "QC");
pid = htonl (pid);
memcpy (msg + 2, (char *) &pid, 4);
local_port = htons (local_port);
memcpy (msg + 6, (char *) &local_port, 2);
return net_cancel_request_internal (ip_addr, port, msg, sizeof (msg));
}
static int
net_cancel_request_wo_local_port (unsigned char *ip_addr, int port, int pid)
{
char msg[10];
memset (msg, 0, sizeof (msg));
strcpy (msg, "CANCEL");
pid = htonl (pid);
memcpy (msg + 6, (char *) &pid, 4);
return net_cancel_request_internal (ip_addr, port, msg, sizeof (msg));
}
static int
net_cancel_request_ex (unsigned char *ip_addr, int port, int pid)
{
char msg[10];
msg[0] = 'X';
msg[1] = '1';
msg[2] = CAS_CLIENT_CCI;
msg[3] = 0;
msg[4] = 0;
msg[5] = 0;
pid = htonl (pid);
memcpy (msg + 6, (char *) &pid, 4);
return net_cancel_request_internal (ip_addr, port, msg, sizeof (msg));
}
int
net_cancel_request (T_CON_HANDLE * con_handle)
{
struct sockaddr_in local_sockaddr;
socklen_t local_sockaddr_len;
unsigned short local_port = 0;
int error;
int broker_port;
T_BROKER_VERSION broker_ver;
if (con_handle->alter_host_id < 0)
{
broker_port = con_handle->port;
}
else
{
broker_port = con_handle->alter_hosts[con_handle->alter_host_id].port;
}
broker_ver = hm_get_broker_version (con_handle);
if (hm_broker_understand_the_protocol (broker_ver, PROTOCOL_V4))
{
return net_cancel_request_ex (con_handle->ip_addr, broker_port,
con_handle->cas_pid);
}
else if (hm_broker_understand_the_protocol (broker_ver, PROTOCOL_V1))
{
local_sockaddr_len = sizeof (local_sockaddr);
error = getsockname (con_handle->sock_fd,
(struct sockaddr *) &local_sockaddr,
&local_sockaddr_len);
if (error == 0)
{
local_port = ntohs (local_sockaddr.sin_port);
}
return net_cancel_request_w_local_port (con_handle->ip_addr,
broker_port,
con_handle->cas_pid,
local_port);
}
else
{
return net_cancel_request_wo_local_port (con_handle->ip_addr,
broker_port,
con_handle->cas_pid);
}
}
int
net_check_cas_request (T_CON_HANDLE * con_handle)
{
char msg[9];
MSG_HEADER msg_header;
int data_size;
int ret_value = -1;
char status[4];
int broker_port;
if (con_handle->alter_host_id < 0)
{
broker_port = con_handle->port;
}
else
{
broker_port = con_handle->alter_hosts[con_handle->alter_host_id].port;
}
if (IS_INVALID_SOCKET (con_handle->sock_fd))
return 0;
API_SLOG (con_handle);
init_msg_header (&msg_header);
data_size = 1;
data_size = htonl (data_size);
memcpy (msg, (char *) &data_size, 4);
/* just send con->cas_info to cas for debuging */
msg[4] = con_handle->cas_info[CAS_INFO_STATUS];
msg[5] = con_handle->cas_info[CAS_INFO_RESERVED_1];
msg[6] = con_handle->cas_info[CAS_INFO_RESERVED_2];
msg[7] = con_handle->cas_info[CAS_INFO_ADDITIONAL_FLAG];
msg[8] = CAS_FC_CHECK_CAS;
if (net_send_stream (con_handle->sock_fd, msg, sizeof (msg)) < 0)
{
API_ELOG (con_handle, -1);
return -1;
}
if (net_recv_int (con_handle->sock_fd, broker_port, &ret_value) < 0)
{
API_ELOG (con_handle, -2);
return -1;
}
if (net_recv_stream (con_handle->sock_fd, broker_port, status, 4, 0) < 0)
{
API_ELOG (con_handle, -3);
return -1;
}
con_handle->cas_info[CAS_INFO_STATUS] = status[0];
con_handle->cas_info[CAS_INFO_RESERVED_1] = status[1];
con_handle->cas_info[CAS_INFO_RESERVED_2] = status[2];
con_handle->cas_info[CAS_INFO_ADDITIONAL_FLAG] = status[3];
API_ELOG (con_handle, ret_value);
return ret_value;
}
int
net_send_msg (T_CON_HANDLE * con_handle, char *msg, int size)
{
MSG_HEADER send_msg_header;
int err;
struct timeval ts, te;
init_msg_header (&send_msg_header);
*(send_msg_header.msg_body_size_ptr) = size;
memcpy (send_msg_header.info_ptr, con_handle->cas_info,
MSG_HEADER_INFO_SIZE);
/* send msg header */
if (con_handle->log_trace_network)
{
gettimeofday (&ts, NULL);
}
err = net_send_msg_header (con_handle->sock_fd, &send_msg_header);
if (con_handle->log_trace_network)
{
long elapsed;
gettimeofday (&te, NULL);
elapsed = ut_timeval_diff_msec (&ts, &te);
CCI_LOGF_DEBUG (con_handle->logger, "[NET][W][H][S:%d][E:%d][T:%d]",
MSG_HEADER_SIZE, err, elapsed);
}
if (err < 0)
{
return CCI_ER_COMMUNICATION;
}
if (con_handle->log_trace_network)
{
gettimeofday (&ts, NULL);
}
err = net_send_stream (con_handle->sock_fd, msg, size);
if (con_handle->log_trace_network)
{
long elapsed;
gettimeofday (&te, NULL);
elapsed = ut_timeval_diff_msec (&ts, &te);
CCI_LOGF_DEBUG (con_handle->logger, "[NET][W][B][S:%d][E:%d][T:%d]",
size, err, elapsed);
}
if (err < 0)
{
return CCI_ER_COMMUNICATION;
}
return 0;
}
int
net_recv_msg_timeout (T_CON_HANDLE * con_handle, char **msg, int *msg_size,
T_CCI_ERROR * err_buf, int timeout)
{
char *tmp_p = NULL;
MSG_HEADER recv_msg_header;
int result_code = 0;
struct timeval ts, te;
int broker_port;
if (con_handle->alter_host_id < 0)
{
broker_port = con_handle->port;
}
else
{
broker_port = con_handle->alter_hosts[con_handle->alter_host_id].port;
}
init_msg_header (&recv_msg_header);
if (msg)
{
*msg = NULL;
}
if (msg_size)
{
*msg_size = 0;
}
if (con_handle->log_trace_network)
{
gettimeofday (&ts, NULL);
}
result_code =
net_recv_msg_header (con_handle->sock_fd, broker_port,
&recv_msg_header, timeout);
if (con_handle->log_trace_network)
{
long elapsed;
gettimeofday (&te, NULL);
elapsed = ut_timeval_diff_msec (&ts, &te);
CCI_LOGF_DEBUG (con_handle->logger, "[NET][R][H][S:%d][E:%d][T:%d]",
MSG_HEADER_SIZE, result_code, elapsed);
}
if (result_code < 0)
{
if (result_code == CCI_ER_QUERY_TIMEOUT)
{
/* send cancel message */
net_cancel_request (con_handle);
if (con_handle->disconnect_on_query_timeout == false)
{
result_code =
net_recv_msg_header (con_handle->sock_fd, broker_port,
&recv_msg_header, 0);
}
}
if (result_code < 0)
{
goto error_return;
}
}
memcpy (con_handle->cas_info, recv_msg_header.info_ptr,
MSG_HEADER_INFO_SIZE);
if (con_handle->cas_info[CAS_INFO_STATUS] == CAS_INFO_STATUS_INACTIVE)
{
con_handle->con_status = CCI_CON_STATUS_OUT_TRAN;
}
else
{
con_handle->con_status = CCI_CON_STATUS_IN_TRAN;
}
if (*(recv_msg_header.msg_body_size_ptr) > 0)
{
tmp_p = (char *) MALLOC (*(recv_msg_header.msg_body_size_ptr));
if (tmp_p == NULL)
{
result_code = CCI_ER_NO_MORE_MEMORY;
goto error_return;
}
if (con_handle->log_trace_network)
{
gettimeofday (&ts, NULL);
}
result_code = net_recv_stream (con_handle->sock_fd, broker_port, tmp_p,
*(recv_msg_header.msg_body_size_ptr),
timeout);
if (con_handle->log_trace_network)
{
long elapsed;
gettimeofday (&te, NULL);
elapsed = ut_timeval_diff_msec (&ts, &te);
CCI_LOGF_DEBUG (con_handle->logger, "[NET][R][B][S:%d][E:%d][T:%d]",
*(recv_msg_header.msg_body_size_ptr), result_code,
elapsed);
}
if (result_code < 0)
{
goto error_return;
}
memcpy ((char *) &result_code, tmp_p + CAS_PROTOCOL_ERR_INDICATOR_INDEX,
CAS_PROTOCOL_ERR_INDICATOR_SIZE);
result_code = ntohl (result_code);
if (result_code < 0)
{
int err_code = 0;
int err_msg_size;
memcpy ((char *) &err_code, tmp_p + CAS_PROTOCOL_ERR_CODE_INDEX,
CAS_PROTOCOL_ERR_CODE_SIZE);
err_code = ntohl (err_code);
if (result_code == DBMS_ERROR_INDICATOR)
{
err_msg_size = *(recv_msg_header.msg_body_size_ptr) -
(CAS_PROTOCOL_ERR_INDICATOR_SIZE +
CAS_PROTOCOL_ERR_CODE_SIZE);
if (hm_broker_reconnect_when_server_down (con_handle)
&& (con_handle->cas_info[CAS_INFO_ADDITIONAL_FLAG]
& CAS_INFO_FLAG_MASK_NEW_SESSION_ID))
{
char *p;
p = tmp_p + CAS_PROTOCOL_ERR_MSG_INDEX + err_msg_size -
DRIVER_SESSION_SIZE;
memcpy (con_handle->session_id.id, p, DRIVER_SESSION_SIZE);
err_msg_size -= DRIVER_SESSION_SIZE;
}
if (err_buf)
{
memcpy (err_buf->err_msg,
tmp_p + CAS_PROTOCOL_ERR_MSG_INDEX, err_msg_size);
err_buf->err_code = err_code;
}
err_code = CCI_ER_DBMS;
}
FREE_MEM (tmp_p);
return err_code;
}
}
if (msg)
{
*msg = tmp_p;
}
else
{
FREE_MEM (tmp_p);
}
if (msg_size)
{
*msg_size = *(recv_msg_header.msg_body_size_ptr);
}
if (con_handle->cas_info[CAS_INFO_STATUS] == CAS_INFO_STATUS_INACTIVE
&& con_handle->broker_info[BROKER_INFO_KEEP_CONNECTION] ==
CAS_KEEP_CONNECTION_OFF)
{
CLOSE_SOCKET (con_handle->sock_fd);
con_handle->sock_fd = INVALID_SOCKET;
}
return result_code;
error_return:
FREE_MEM (tmp_p);
CLOSE_SOCKET (con_handle->sock_fd);
con_handle->sock_fd = INVALID_SOCKET;
return result_code;
}
int
net_recv_msg (T_CON_HANDLE * con_handle, char **msg, int *msg_size,
T_CCI_ERROR * err_buf)
{
return net_recv_msg_timeout (con_handle, msg, msg_size, err_buf, 0);
}
bool
net_peer_alive (unsigned char *ip_addr, int port, int timeout_msec)
{
SOCKET sock_fd;
int ret, dummy;
const char *ping_msg = "PING_TEST!";
if (connect_srv (ip_addr, port, 0, &sock_fd, timeout_msec) !=
CCI_ER_NO_ERROR)
{
CLOSE_SOCKET (sock_fd);
return false;
}
send_again:
ret = WRITE_TO_SOCKET (sock_fd, ping_msg, strlen (ping_msg));
if (ret < 0)
{
if (errno == EAGAIN)
{
SLEEP_MILISEC (0, 1);
goto send_again;
}
else
{
CLOSE_SOCKET (sock_fd);
return false;
}
}
recv_again:
ret = READ_FROM_SOCKET (sock_fd, (char *) &dummy, sizeof (int));
if (ret < 0)
{
if (errno == EAGAIN)
{
SLEEP_MILISEC (0, 1);
goto recv_again;
}
else
{
CLOSE_SOCKET (sock_fd);
return false;
}
}
CLOSE_SOCKET (sock_fd);
return true;
}
bool
net_check_broker_alive (unsigned char *ip_addr, int port, int timeout_msec)
{
SOCKET sock_fd;
MSG_HEADER msg_header;
char client_info[SRV_CON_CLIENT_INFO_SIZE];
char db_info[SRV_CON_DB_INFO_SIZE];
char db_name[SRV_CON_DBNAME_SIZE];
char url[SRV_CON_URL_SIZE];
char *info, *msg_buf;
int err_code, ret_value;
struct timeval start_time, end_time;
long elapsed_time_msec;
bool result = false;
init_msg_header (&msg_header);
memset (client_info, 0, sizeof (client_info));
memset (db_info, 0, sizeof (db_info));
strncpy (client_info, SRV_CON_CLIENT_MAGIC_STR, SRV_CON_CLIENT_MAGIC_LEN);
client_info[SRV_CON_MSG_IDX_CLIENT_TYPE] = cci_client_type;
client_info[SRV_CON_MSG_IDX_PROTO_VERSION] = CAS_PROTO_PACK_CURRENT_NET_VER;
client_info[SRV_CON_MSG_IDX_FUNCTION_FLAG] = 0;
client_info[SRV_CON_MSG_IDX_RESERVED2] = 0;
snprintf (db_name, SRV_CON_DBNAME_SIZE, HEALTH_CHECK_DUMMY_DB);
snprintf (url, SRV_CON_URL_SIZE,
"cci:cubrid:%s:%d:%s::********:", ip_addr, port, db_name);
info = db_info;
strncpy (info, db_name, SRV_CON_DBNAME_SIZE - 1);
info += (SRV_CON_DBNAME_SIZE + SRV_CON_DBUSER_SIZE + SRV_CON_DBPASSWD_SIZE);
strncpy (info, url, SRV_CON_URL_SIZE - 1);
if (connect_srv (ip_addr, port, 0, &sock_fd, timeout_msec) < 0)
{
return false;
}
if (net_send_stream (sock_fd, client_info, SRV_CON_CLIENT_INFO_SIZE) < 0)
{
goto finish_health_check;
}
ret_value = net_recv_stream (sock_fd, port, (char *) &err_code, 4,
timeout_msec);
if (ret_value < 0)
{
goto finish_health_check;
}
err_code = ntohl (err_code);
if (err_code < 0)
{
goto finish_health_check;
}
if (net_send_stream (sock_fd, db_info, SRV_CON_DB_INFO_SIZE) < 0)
{
goto finish_health_check;
}
if (net_recv_msg_header (sock_fd, port, &msg_header, timeout_msec) < 0)
{
goto finish_health_check;
}
result = true;
finish_health_check:
CLOSE_SOCKET (sock_fd);
return result;
}
#if defined (ENABLE_UNUSED_FUNCTION)
int
net_send_file (SOCKET sock_fd, char *filename, int filesize)
{
int remain_size = filesize;
int fd;
char read_buf[1024];
int read_len;
fd = open (filename, O_RDONLY);
if (fd < 0)
{
return CCI_ER_FILE;
}
#if defined(WINDOWS)
setmode (fd, O_BINARY);
#endif
while (remain_size > 0)
{
read_len = (int) read (fd, read_buf,
(int) MIN (remain_size, SSIZEOF (read_buf)));
if (read_len < 0)
{
close (fd);
return CCI_ER_FILE;
}
if (net_send_stream (sock_fd, read_buf, read_len) < 0)
{
close (fd);
return CCI_ER_FILE;
}
remain_size -= read_len;
}
close (fd);
return 0;
}
int
net_recv_file (SOCKET sock_fd, int port, int file_size, int out_fd)
{
int read_len;
char read_buf[1024];
while (file_size > 0)
{
read_len = (int) MIN (file_size, SSIZEOF (read_buf));
if (net_recv_stream (sock_fd, port, read_buf, read_len, 0) < 0)
{
return CCI_ER_COMMUNICATION;
}
write (out_fd, read_buf, read_len);
file_size -= read_len;
}
return 0;
}
#endif
/************************************************************************
* IMPLEMENTATION OF PRIVATE FUNCTIONS *
************************************************************************/
#if defined(ENABLE_UNUSED_FUNCTION)
static int
net_send_int (SOCKET sock_fd, int value)
{
value = htonl (value);
return (net_send_stream (sock_fd, (char *) &value, 4));
}
#endif
static int
net_recv_int (SOCKET sock_fd, int port, int *value)
{
int read_value;
if (net_recv_stream (sock_fd, port, (char *) &read_value, 4, 0) < 0)
{
return CCI_ER_COMMUNICATION;
}
read_value = ntohl (read_value);
*value = read_value;
return 0;
}
static int
net_recv_stream (SOCKET sock_fd, int port, char *buf, int size, int timeout)
{
int read_len, tot_read_len = 0;
#if defined(WINDOWS)
fd_set rfds;
struct timeval tv;
#else
struct pollfd po[1] = { {0, 0, 0} };
int polling_timeout;
#endif
int n;
while (tot_read_len < size)
{
#if defined(WINDOWS)
FD_ZERO (&rfds);
FD_SET (sock_fd, &rfds);
if (timeout <= 0 || timeout > SOCKET_TIMEOUT)
{
tv.tv_sec = SOCKET_TIMEOUT / 1000;
tv.tv_usec = (SOCKET_TIMEOUT % 1000) * 1000;
}
else
{
tv.tv_sec = timeout / 1000;
tv.tv_usec = (timeout % 1000) * 1000;
}
n = select (sock_fd + 1, &rfds, NULL, NULL, &tv);
#else
po[0].fd = sock_fd;
po[0].events = POLLIN;
if (timeout <= 0 || timeout > SOCKET_TIMEOUT)
{
polling_timeout = SOCKET_TIMEOUT;
}
else
{
polling_timeout = timeout;
}
n = poll (po, 1, polling_timeout);
#endif
if (n == 0)
{
/* select / poll return time out */
if (timeout > 0)
{
timeout -= SOCKET_TIMEOUT;
if (timeout <= 0)
{
assert (tot_read_len == 0 || size == tot_read_len);
return CCI_ER_QUERY_TIMEOUT;
}
else
{
continue;
}
}
if (net_peer_socket_alive (sock_fd, port, SOCKET_TIMEOUT) == true)
{
continue;
}
else
{
return CCI_ER_COMMUNICATION;
}
}
else if (n < 0)
{
/* select / poll return error */
if (errno == EINTR)
{
continue;
}
return CCI_ER_COMMUNICATION;
}
#if !defined (WINDOWS)
else if (po[0].revents & POLLERR || po[0].revents & POLLHUP)
{
po[0].revents = 0;
return CCI_ER_COMMUNICATION;
}
#endif /* !WINDOWS */
read_len = READ_FROM_SOCKET (sock_fd, buf + tot_read_len,
size - tot_read_len);
if (read_len <= 0)
{
return CCI_ER_COMMUNICATION;
}
tot_read_len += read_len;
}
return 0;
}
static bool
net_peer_socket_alive (SOCKET sd, int port, int timeout_msec)
{
unsigned char ip_addr[4];
struct sockaddr_in saddr;
socklen_t slen;
slen = sizeof (saddr);
if (getpeername (sd, (struct sockaddr *) &saddr, &slen) < 0)
{
return false;
}
/* if Unix domain socket, the peer(=local) is alive always */
if (saddr.sin_family != AF_INET)
{
return true;
}
memcpy (ip_addr, &saddr.sin_addr, 4);
return net_peer_alive (ip_addr, port, timeout_msec);
}
static int
net_recv_msg_header (SOCKET sock_fd, int port, MSG_HEADER * header,
int timeout)
{
int result_code;
result_code =
net_recv_stream (sock_fd, port, header->buf, MSG_HEADER_SIZE, timeout);
if (result_code < 0)
{
return result_code;
}
*(header->msg_body_size_ptr) = ntohl (*(header->msg_body_size_ptr));
if ((*header->msg_body_size_ptr) < 0)
{
return CCI_ER_COMMUNICATION;
}
return 0;
}
static int
net_send_msg_header (SOCKET sock_fd, MSG_HEADER * header)
{
*(header->msg_body_size_ptr) = htonl (*(header->msg_body_size_ptr));
if (net_send_stream (sock_fd, header->buf, MSG_HEADER_SIZE) < 0)
{
return CCI_ER_COMMUNICATION;
}
return 0;
}
static int
net_send_stream (SOCKET sock_fd, char *msg, int size)
{
int write_len;
while (size > 0)
{
write_len = WRITE_TO_SOCKET (sock_fd, msg, size);
if (write_len <= 0)
{
return CCI_ER_COMMUNICATION;
}
msg += write_len;
size -= write_len;
}
return 0;
}
static void
init_msg_header (MSG_HEADER * header)
{
header->msg_body_size_ptr = (int *) (header->buf);
header->info_ptr = (char *) (header->buf + MSG_HEADER_MSG_SIZE);
*(header->msg_body_size_ptr) = 0;
header->info_ptr[0] = CAS_INFO_STATUS_ACTIVE;
header->info_ptr[1] = CAS_INFO_RESERVED_DEFAULT;
header->info_ptr[2] = CAS_INFO_RESERVED_DEFAULT;
header->info_ptr[3] = CAS_INFO_RESERVED_DEFAULT;
}
static int
connect_srv (unsigned char *ip_addr, int port, char is_retry,
SOCKET * ret_sock, int login_timeout)
{
struct sockaddr_in sock_addr;
SOCKET sock_fd;
int sock_addr_len;
int one = 1;
int retry_count = 0;
int con_retry_count;
int ret;
int sock_flags;
#if defined (WINDOWS)
struct timeval timeout_val;
fd_set rset, wset, eset;
#else
int error, len;
int flags;
struct pollfd po[1] = { {0, 0, 0} };
#endif
con_retry_count = (is_retry) ? 10 : 0;
connect_retry:
#if defined (WINDOWS)
timeout_val.tv_sec = login_timeout / 1000;
timeout_val.tv_usec = (login_timeout % 1000) * 1000; /* micro second */
#endif
sock_fd = socket (AF_INET, SOCK_STREAM, 0);
if (IS_INVALID_SOCKET (sock_fd))
{
return CCI_ER_CONNECT;
}
memset (&sock_addr, 0, sizeof (struct sockaddr_in));
sock_addr.sin_family = AF_INET;
sock_addr.sin_port = htons ((unsigned short) port);
memcpy (&sock_addr.sin_addr, ip_addr, 4);
sock_addr_len = sizeof (struct sockaddr_in);
#if defined (WINDOWS)
if (ioctlsocket (sock_fd, FIONBIO, (u_long *) & one) < 0)
{
CLOSE_SOCKET (sock_fd);
return CCI_ER_CONNECT;
}
#else
flags = (sock_fd, F_GETFL);
fcntl (sock_fd, F_SETFL, flags | O_NONBLOCK);
#endif
ret = connect (sock_fd, (struct sockaddr *) &sock_addr, sock_addr_len);
if (ret < 0)
{
#if defined (WINDOWS)
if (WSAGetLastError () == WSAEWOULDBLOCK)
#else
if (errno == EINPROGRESS)
#endif
{
#if defined (WINDOWS)
FD_ZERO (&rset);
FD_ZERO (&wset);
FD_ZERO (&eset);
FD_SET (sock_fd, &rset);
FD_SET (sock_fd, &wset);
FD_SET (sock_fd, &eset);
ret = select (sock_fd + 1, &rset, &wset, &eset,
((login_timeout == 0) ? NULL : &timeout_val));
#else
po[0].fd = sock_fd;
po[0].events = POLLOUT;
po[0].revents = 0;
if (login_timeout == 0)
{
login_timeout = -1;
}
ret = poll (po, 1, login_timeout);
#endif
if (ret == 0)
{
CLOSE_SOCKET (sock_fd);
return CCI_ER_LOGIN_TIMEOUT;
}
else if (ret < 0)
{
CLOSE_SOCKET (sock_fd);
if (retry_count < con_retry_count)
{
retry_count++;
SLEEP_MILISEC (0, 100);
if (login_timeout > 0)
{
login_timeout -= 100;
if (login_timeout <= 0)
{
return CCI_ER_LOGIN_TIMEOUT;
}
}
goto connect_retry;
}
return CCI_ER_CONNECT;
}
else
{
#if defined (WINDOWS)
if (FD_ISSET (sock_fd, &eset))
{
CLOSE_SOCKET (sock_fd);
return CCI_ER_CONNECT;
}
#else
if (po[0].revents & POLLERR || po[0].revents & POLLHUP)
{
CLOSE_SOCKET (sock_fd);
return CCI_ER_CONNECT;
}
#if defined (AIX)
error = 0;
len = sizeof (error);
getsockopt (sock_fd, SOL_SOCKET, SO_ERROR, &error, &len);
if (error != 0 && error != EISCONN)
{
CLOSE_SOCKET (sock_fd);
return CCI_ER_CONNECT;
}
#endif
#endif
}
}
else
{
CLOSE_SOCKET (sock_fd);
if (retry_count < con_retry_count)
{
retry_count++;
SLEEP_MILISEC (0, 100);
if (login_timeout > 0)
{
login_timeout -= 100;
if (login_timeout <= 0)
{
return CCI_ER_LOGIN_TIMEOUT;
}
}
goto connect_retry;
}
return CCI_ER_CONNECT;
}
}
#if defined (WINDOWS)
one = 0;
if (ioctlsocket (sock_fd, FIONBIO, (u_long *) & one) < 0)
{
CLOSE_SOCKET (sock_fd);
return CCI_ER_CONNECT;
}
#else
fcntl (sock_fd, F_SETFL, flags);
#endif
setsockopt (sock_fd, IPPROTO_TCP, TCP_NODELAY, (char *) &one, sizeof (one));
*ret_sock = sock_fd;
return CCI_ER_NO_ERROR;
}