diff --git a/src/axl/CMakeLists.txt b/src/axl/CMakeLists.txt index dde1636f..ccb86700 100644 --- a/src/axl/CMakeLists.txt +++ b/src/axl/CMakeLists.txt @@ -1,5 +1,6 @@ +# TODO: Separate PTHREADS from SOCKETS IF(ENABLE_PTHREADS) - LIST(APPEND axl_optional_srcs axl_pthread.c) + LIST(APPEND axl_optional_srcs axl_pthread.c axl_socket.c) ENDIF(ENABLE_PTHREADS) IF(BBAPI_FOUND) @@ -10,23 +11,9 @@ IF(HAVE_DATAWARP) LIST(APPEND axl_optional_srcs axl_async_datawarp.c) ENDIF(HAVE_DATAWARP) -LIST(APPEND libaxl_srcs - axl.c - axl_sync.c - axl_err.c - axl_io.c - axl_util.c - ${axl_optional_srcs} -) - -LIST(APPEND libaxl_serial_srcs - axl.c - axl_sync.c - axl_err.c - axl_io.c - axl_util.c - ${axl_optional_srcs} -) +LIST(APPEND libaxl_srcs axl.c axl_sync.c axl_err.c axl_io.c axl_util.c ${axl_optional_srcs}) + +LIST(APPEND libaxl_serial_srcs axl.c axl_sync.c axl_err.c axl_io.c axl_util.c ${axl_optional_srcs}) IF(MPI_C_FOUND) LIST(APPEND libaxl_srcs axl_mpi.c) @@ -40,6 +27,14 @@ IF(BUILD_SHARED_LIBS) SET_PROPERTY(TARGET axl_serial_o PROPERTY POSITION_INDEPENDENT_CODE 1) ENDIF() +# TODO: Separate PTHREADS from SOCKETS +IF(HAVE_PTHREADS) + ADD_EXECUTABLE(axl_socket_daemon axl_socket_daemon.c) + TARGET_LINK_LIBRARIES(axl_socket_daemon axl_o kvtree_o ${SCR_EXTERNAL_LIBS}) + INSTALL(PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/axl_socket_daemon DESTINATION ${CMAKE_INSTALL_LIBEXECDIR}) +ENDIF(HAVE_PTHREADS) + + IF(ENABLE_TESTS) ADD_SUBDIRECTORY(test) ENDIF(ENABLE_TESTS) diff --git a/src/axl/axl.c b/src/axl/axl.c index f674e746..a07c0a9f 100644 --- a/src/axl/axl.c +++ b/src/axl/axl.c @@ -30,6 +30,9 @@ /* xfer methods */ #include "axl_sync.h" +/* (Optional) service functions */ +#include "axl_socket.h" + #ifdef HAVE_PTHREADS #include "axl_pthread.h" #endif /* HAVE_PTHREAD */ @@ -75,18 +78,19 @@ int axl_copy_metadata; /* global rank of calling process, used for BBAPI */ int axl_rank = -1; +/* + * If we are NOT running as a server, use axl_client_xfer_list to contain + * our list of transfer items fro AXL Create. Otherwise, the server will + * change the axl_xfer_list pointer for each connection it is servicing. + */ +static struct axl_transfer_array axl_client_xfer_list = { + .axl_kvtrees = NULL, .axl_kvtrees_count = 0 +}; +struct axl_transfer_array* axl_xfer_list = &axl_client_xfer_list; + /* reference count for number of times AXL_Init has been called */ static unsigned int axl_init_count = 0; -/* Array for all the AXL_Create'd kvtree pointers. It's indexed by the AXL id. - * - * Note: We only expand this array, we never shrink it. This is fine since - * the user is only going to be calling AXL_Create() a handful of times. It - * also simplifies the code if we never shrink it, and the extra memory usage - * is negligible, if any at all. */ -kvtree** axl_kvtrees; -static unsigned int axl_kvtrees_count = 0; - #ifdef HAVE_BBAPI static int bbapi_is_loaded = 0; #endif @@ -110,11 +114,11 @@ static int axl_alloc_id(const char* state_file) kvtree_util_set_str(new, AXL_KEY_STATE_FILE, state_file); } - int id = axl_kvtrees_count; - axl_kvtrees_count++; + int id = axl_xfer_list->axl_kvtrees_count; + axl_xfer_list->axl_kvtrees_count++; - axl_kvtrees = realloc(axl_kvtrees, sizeof(struct kvtree*) * axl_kvtrees_count); - axl_kvtrees[id] = new; + axl_xfer_list->axl_kvtrees = realloc(axl_xfer_list->axl_kvtrees, sizeof(struct kvtree*) * axl_xfer_list->axl_kvtrees_count); + axl_xfer_list->axl_kvtrees[id] = new; return id; } @@ -122,7 +126,7 @@ static int axl_alloc_id(const char* state_file) /* Remove the state file for an id, if one exists */ static void axl_remove_state_file(int id) { - kvtree* file_list = axl_kvtrees[id]; + kvtree* file_list = axl_xfer_list->axl_kvtrees[id]; char* state_file = NULL; if (kvtree_util_get_str(file_list, AXL_KEY_STATE_FILE, &state_file) == KVTREE_SUCCESS) @@ -136,15 +140,15 @@ static void axl_free_id(int id) { axl_remove_state_file(id); - /* kvtree_delete() will set axl_kvtrees[id] = NULL */ - kvtree_delete(&axl_kvtrees[id]); + /* kvtree_delete() will set axl_xfer_list->axl_kvtrees[id] = NULL */ + kvtree_delete(&axl_xfer_list->axl_kvtrees[id]); } /* If the user specified a state_file then write our kvtree to it. If not, then * do nothing. */ void axl_write_state_file(int id) { - kvtree* file_list = axl_kvtrees[id]; + kvtree* file_list = axl_xfer_list->axl_kvtrees[id]; char* state_file = NULL; if (kvtree_util_get_str(file_list, AXL_KEY_STATE_FILE, &state_file) == KVTREE_SUCCESS) @@ -163,7 +167,7 @@ static int axl_get_info(int id, kvtree** list, axl_xfer_t* type, axl_xfer_state_ *state = AXL_XFER_STATE_NULL; /* lookup transfer info for the given id */ - kvtree* file_list = axl_kvtrees[id]; + kvtree* file_list = axl_xfer_list->axl_kvtrees[id]; if (file_list == NULL) { AXL_ERR("Could not find fileset for UID %d", id); return AXL_FAILURE; @@ -260,6 +264,13 @@ int AXL_Init (void) axl_make_directories = atoi(val); } + /* If the user has set both the AXL_SERVICE_HOST and AXL_SERVICE_PORT + * environment variables, then they are expecting to use the AXL Service + * rather than the library included with the SCR library. + */ + char* axl_service_host = NULL; + int axl_service_port = -1; + /* initialize our flag on whether to first copy files to temporary names with extension */ axl_use_extension = 0; val = getenv("AXL_USE_EXTENSION"); @@ -275,7 +286,26 @@ int AXL_Init (void) } /* keep a reference count to free memory on last AXL_Finalize */ - axl_init_count++; + if (axl_init_count++ == 0) { + /* + * If we are not running as the AXL Server, check to see if we are + * expected to run as a client and then connect if so. + */ + if (axl_service_mode != AXL_SOCKET_SERVER) { + if ( (val = getenv("AXL_SERVICE_HOST")) != NULL) { + axl_service_host = strdup(val); + + if ( (val = getenv("AXL_SERVICE_PORT")) != NULL) { + axl_service_port = atoi(val); + + if (axl_socket_client_init(axl_service_host, (unsigned short)axl_service_port)) { + axl_service_mode = AXL_SOCKET_CLIENT; + } + } + free(axl_service_host); + } + } + } return rc; } @@ -297,8 +327,13 @@ int AXL_Finalize (void) axl_init_count--; if (axl_init_count == 0) { /* TODO: are there cases where we also need to delete trees? */ - axl_free(&axl_kvtrees); - axl_kvtrees_count = 0; + axl_free(&axl_xfer_list->axl_kvtrees); + axl_xfer_list->axl_kvtrees_count = 0; + + if (axl_service_mode == AXL_SOCKET_CLIENT) { + axl_socket_client_AXL_Finalize(); + } + } return rc; @@ -360,7 +395,7 @@ static kvtree* AXL_Config_Set(const kvtree* config) char* endptr; long id = strtol(key, &endptr, 10); if ((*key == '\0' || *endptr != '\0') || - (id < 0 || id >= axl_kvtrees_count)) + (id < 0 || id >= axl_xfer_list->axl_kvtrees_count)) { retval = NULL; break; @@ -372,7 +407,7 @@ static kvtree* AXL_Config_Set(const kvtree* config) break; } - kvtree* file_list = axl_kvtrees[id]; + kvtree* file_list = axl_xfer_list->axl_kvtrees[id]; const char** opt; for (opt = known_transfer_options; *opt != NULL; opt++) { @@ -462,6 +497,10 @@ static kvtree* AXL_Config_Set(const kvtree* config) } } + if (axl_service_mode == AXL_SOCKET_CLIENT) { + axl_socket_client_AXL_Config_Set(config); + } + return retval; } @@ -502,8 +541,8 @@ static kvtree* AXL_Config_Get() /* per transfer options */ int id; - for (id = 0; id < axl_kvtrees_count; id++) { - kvtree* file_list = axl_kvtrees[id]; + for (id = 0; id < axl_xfer_list->axl_kvtrees_count; id++) { + kvtree* file_list = axl_xfer_list->axl_kvtrees[id]; if (file_list == NULL) { /* TODO: check if it would be better to return an empty hash instead */ continue; @@ -613,7 +652,7 @@ int AXL_Create(axl_xfer_t xtype, const char* name, const char* state_file) return -1; } - kvtree* file_list = axl_kvtrees[id]; + kvtree* file_list = axl_xfer_list->axl_kvtrees[id]; kvtree_util_set_int(file_list, AXL_KEY_XFER_TYPE, xtype); kvtree_util_set_str(file_list, AXL_KEY_UNAME, name); if (!reload_from_state_file) { @@ -1521,8 +1560,8 @@ int AXL_Stop () /* cancel each active id */ int id; - for (id = 0; id < axl_kvtrees_count; id++) { - if (!axl_kvtrees[id]) { + for (id = 0; id < axl_xfer_list->axl_kvtrees_count; id++) { + if (!axl_xfer_list->axl_kvtrees[id]) { continue; } @@ -1532,8 +1571,8 @@ int AXL_Stop () } /* wait */ - for (id = 0; id < axl_kvtrees_count; id++) { - if (!axl_kvtrees[id]) { + for (id = 0; id < axl_xfer_list->axl_kvtrees_count; id++) { + if (!axl_xfer_list->axl_kvtrees[id]) { continue; } @@ -1543,8 +1582,8 @@ int AXL_Stop () } /* and free it */ - for (id = 0; id < axl_kvtrees_count; id++) { - if (!axl_kvtrees[id]) { + for (id = 0; id < axl_xfer_list->axl_kvtrees_count; id++) { + if (!axl_xfer_list->axl_kvtrees[id]) { continue; } diff --git a/src/axl/axl_internal.h b/src/axl/axl_internal.h index 10d97c62..361523da 100644 --- a/src/axl/axl_internal.h +++ b/src/axl/axl_internal.h @@ -17,10 +17,18 @@ /* unless otherwise indicated all global variables defined in this file must * only be accessed by the main thread */ -/* - * A list of pointers to kvtrees, indexed by AXL ID. - */ -extern kvtree** axl_kvtrees; +struct axl_transfer_array { + /* Array for all the AXL_Create'd kvtree pointers. It's indexed by the AXL id. + * + * Note: We only expand this array, we never shrink it. This is fine since + * the user is only going to be calling AXL_Create() a handful of times. It + * also simplifies the code if we never shrink it, and the extra memory usage + * is negligible, if any at all. */ + kvtree** axl_kvtrees; + unsigned int axl_kvtrees_count; +}; + +extern struct axl_transfer_array* axl_xfer_list; /* current debug level for AXL library, * set in AXL_Init and AXL_Config used in axl_dbg. @@ -63,6 +71,7 @@ extern int axl_rank; /* attaches function name, file name, and line number to error messages * https://gcc.gnu.org/onlinedocs/cpp/Variadic-Macros.html */ +#define AXL_ABORT(exitcode, format, ...) axl_abort(exitcode, format " @ %s %s:%d", ##__VA_ARGS__, __func__, __FILE__, __LINE__) #define AXL_ERR(format, ...) axl_err(format " @ %s %s:%d", ##__VA_ARGS__, __func__, __FILE__, __LINE__) #define AXL_DBG(level, format, ...) axl_dbg(level, format " @ %s %s:%d", ##__VA_ARGS__, __func__, __FILE__, __LINE__) diff --git a/src/axl/axl_mpi.c b/src/axl/axl_mpi.c index 07d9719a..f5f4cd12 100644 --- a/src/axl/axl_mpi.c +++ b/src/axl/axl_mpi.c @@ -81,9 +81,11 @@ int AXL_Finalize_comm ( return rc; } +#include + int AXL_Create_comm ( axl_xfer_t type, /**< [IN] - AXL transfer type (AXL_XFER_SYNC, AXL_XFER_PTHREAD, etc) */ - const char* name, + const char* name, const char* file, MPI_Comm comm) /**< [IN] - communicator used for coordination and flow control */ { @@ -144,59 +146,6 @@ int AXL_Dispatch_comm ( int id, /**< [IN] - transfer hander ID returned from AXL_Create */ MPI_Comm comm) /**< [IN] - communicator used for coordination and flow control */ { -#if 0 - /* lookup transfer info for the given id */ - kvtree* file_list = NULL; - axl_xfer_t xtype = AXL_XFER_NULL; - axl_xfer_state_t xstate = AXL_XFER_STATE_NULL; - if (axl_get_info(id, &file_list, &xtype, &xstate) != AXL_SUCCESS) { - AXL_ERR("Could not find transfer info for UID %d", id); - return AXL_FAILURE; - } - - /* check that handle is in correct state to dispatch */ - if (xstate != AXL_XFER_STATE_CREATED) { - AXL_ERR("Invalid state to dispatch UID %d", id); - return AXL_FAILURE; - } - kvtree_util_set_int(file_list, AXL_KEY_STATE, (int)AXL_XFER_STATE_DISPATCHED); -#endif - -#if 0 - /* create destination directories for each file */ - if (axl_make_directories) { - /* count number of files we have */ - kvtree* file_list = kvtree_get_kv_int(axl_file_lists, AXL_KEY_HANDLE_UID, id); - kvtree* files_hash = kvtree_get(file_list, AXL_KEY_FILES); - int num_files = kvtree_size(files_hash); - - /* allocate pointer for each one */ - const char** files = (const char**) AXL_MALLOC(num_files * sizeof(char*)); - - /* set pointer to each file */ - int i; - char* dest; - kvtree_elem* elem; - while ((elem = axl_get_next_path(id, elem, NULL, &dest))) { - files[i] = dest; - i++; - } - - /* create directories */ - axl_create_dirs(num_files, files, comm); - - /* free list of files */ - axl_free2(&files); - } - - /* TODO: this is hacky */ - /* delegate remaining work to regular dispatch, - * but disable mkdir since we already did that */ - int make_dir = axl_make_directories; - axl_make_directories = 0; - int rc = AXL_Dispatch(id); - axl_make_directories = make_dir; -#endif /* delegate remaining work to regular dispatch */ int rc = AXL_Dispatch(id); diff --git a/src/axl/axl_pthread.c b/src/axl/axl_pthread.c index 34ef6768..0ae64f51 100644 --- a/src/axl/axl_pthread.c +++ b/src/axl/axl_pthread.c @@ -38,7 +38,7 @@ struct axl_pthread_data /* AXL ID associated with this data */ int id; - /* AXL transfer options from axl_kvtrees */ + /* AXL transfer options from axl_xfer_list->axl_kvtrees */ kvtree* file_list; /* If resume = 1, try to resume old transfers */ @@ -135,7 +135,7 @@ struct axl_pthread_data* axl_pthread_data_lookup(int id) void axl_pthread_data_add(int id, struct axl_pthread_data* pdata) { pdata->id = id; - pdata->file_list = axl_kvtrees[id]; + pdata->file_list = axl_xfer_list->axl_kvtrees[id]; pthread_mutex_lock(&axl_all_pthread_data.lock); @@ -332,7 +332,7 @@ static int __axl_pthread_start (int id, int resume) int rc = AXL_SUCCESS; /* get pointer to file list for this dataset */ - kvtree* file_list = axl_kvtrees[id]; + kvtree* file_list = axl_xfer_list->axl_kvtrees[id]; /* mark dataset as in progress */ kvtree_util_set_int(file_list, AXL_KEY_STATUS, AXL_STATUS_INPROG); diff --git a/src/axl/axl_socket.c b/src/axl/axl_socket.c new file mode 100644 index 00000000..c5d30354 --- /dev/null +++ b/src/axl/axl_socket.c @@ -0,0 +1,293 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "axl_internal.h" +#include "axl_socket.h" +#include "kvtree.h" + +/* + * Flag to state whether the AXL client/server mode of operation is enabled, + * and if so, whether the code is running as the client or the server. + */ +axl_socket_RunMode axl_service_mode = AXL_SOCKET_DISABLED; + +static int axl_socket_socket = -1; + +/* + * Client implementation + */ +int axl_socket_client_init(char* host, unsigned short port) +{ + struct sockaddr_in server; + struct hostent *hostnm = gethostbyname(host); + + if (hostnm == (struct hostent *) 0) { + AXL_ERR("Gethostbyname failed: (%s)", strerror(errno)); + return 0; + } + + if ( (axl_socket_socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + AXL_ERR("socket() failed: (%s)", strerror(errno)); + return 0; + } + + memset(&server, 0, sizeof(server)); + server.sin_family = AF_INET; + server.sin_port = htons(port); + server.sin_addr.s_addr = *((unsigned long *)hostnm->h_addr); + + if ( connect(axl_socket_socket, (struct sockaddr *)&server, sizeof(server) ) < 0) { + AXL_ERR("connect() failed: (%s)", strerror(errno)); + close(axl_socket_socket); + return 0; + } + + return 1; // success +} + +/* + * function to perform client-side request to server for AXL_Finalize() + */ +void axl_socket_client_AXL_Finalize() +{ + if (axl_socket_socket >= 0) + close(axl_socket_socket); +} + +/* + * function to perform client-side request to server for AXL_Config_Set + */ +void axl_socket_client_AXL_Config_Set(const kvtree* config) +{ + ssize_t bytecount; + axl_socket_Request request; + axl_socket_Response response; + + request.request = AXL_SOCKET_AXL_CONFIG_SET; + request.payload_length = (ssize_t)kvtree_pack_size(config); + + bytecount = axl_write_attempt("AXLSVC Client --> AXL_Config_Set_1", + axl_socket_socket, &request, sizeof(request)); + + if (bytecount != sizeof(request)) { + AXL_ABORT(-1, "Unexpected Write Response to server: Expected %d, Got %d", + sizeof(request), bytecount); + } + + bytecount = kvtree_write_fd("AXLSVC Client --> AXL_Config_Set_2", + axl_socket_socket, config); + + if (bytecount != request.payload_length) { + AXL_ABORT(-1, "Unexpected Write Response to server: Expected %d, Got %d", + request.payload_length, bytecount); + } + + bytecount = axl_read("AXLSVC Client <-- Response", + axl_socket_socket, &response, sizeof(response)); + + if (bytecount != sizeof(response)) { + AXL_ABORT(-1, "Unexpected Write Response to server: Expected %d, Got %d", + sizeof(response), bytecount); + } + + if (response.response != AXL_SOCKET_SUCCESS) { + AXL_ABORT(-1, "Unexpected Response from server: %d", response.response); + } +} + +/* + * Server Implementation + */ + +static int time_to_leave = 0; + +#define AXL_SOCKET_MAX_CLIENTS 16 +struct axl_socket_conn_ctx { + int sd; /* Connection to our socket */ + struct axl_transfer_array xfr; /* Pointer to client-specific xfer array */ +} axl_socket_conn_ctx_array[AXL_SOCKET_MAX_CLIENTS]; + +#if 0 +static kvtree* service_request_AXL_Config_Set(int sd) +{ + kvtree* config = kvtree_new(); + kvtree* rval; + ssize_t bytecount; + + bytecount = kvtree_read_fd("Service_AXL_Config_Set", sd, config); + + return bytecount; +} +#endif + +static ssize_t axl_socket_request_from_client(int sd) +{ + ssize_t bytecount; + axl_socket_Request req; + axl_socket_Response response; + char* buffer; + + bytecount = axl_read("AXLSVC Client Reqeust", sd, &req, sizeof(req)); + + if (bytecount == 0) { + AXL_DBG(2, "Client for socket %d closed", sd); + return bytecount; + } + + buffer = malloc(req.payload_length); + + bytecount = axl_read("AXLSVC Reqeust Payload", sd, &buffer, req.payload_length); + + if (bytecount != req.payload_length) { + AXL_ABORT(-1, "Unexpected Payload Length: Expected %d, Got %d", req.payload_length, bytecount); + } + + switch (req.request) { + case AXL_SOCKET_AXL_CONFIG_SET: + AXL_DBG(1, "AXL_SOCKET_AXL_CONFIG_SET(kfile=%s", buffer); + response.response = AXL_SOCKET_SUCCESS; + response.payload_length = 0; + bytecount = axl_write_attempt("AXLSVC Response to Client", sd, &response, sizeof(response)); + if (bytecount != sizeof(response)) { + AXL_ABORT(-1, "Unexpected Write Response to client: Expected %d, Got %d", + sizeof(response), bytecount); + } + break; + default: + AXL_ABORT(-1, "AXLSVC Unknown Request Type %d", req.request); + break; + } + + free(buffer); + return bytecount; +} + +static void sigterm_handler(int sig, siginfo_t* info, void* ucontext) +{ + AXL_DBG(2, "SIGTERM Received"); + time_to_leave++; +} + +static int use_sigterm_to_exit() +{ + struct sigaction act = {0}; + + act.sa_flags = 0; + sigemptyset(&act.sa_mask); + act.sa_sigaction = sigterm_handler; + if (sigaction(SIGTERM, &act, NULL) == -1) { + perror("sigaction"); + return AXL_FAILURE; + } + + return AXL_SUCCESS; +} + +int axl_socket_server_run(int port) +{ + int server_socket; + int opt = 1; + struct sockaddr_in address; + int addrlen; + int new_socket; + fd_set readfds; + int activity; + int max_sd; + int rval = AXL_FAILURE; + + axl_service_mode = AXL_SOCKET_SERVER; + memset(axl_socket_conn_ctx_array, 0, sizeof(axl_socket_conn_ctx_array)); + + /* + * Need to check whether calling AXL_Init() at this point is really appropriate + */ + if ( (rval = AXL_Init()) != AXL_SUCCESS) + return rval; + + if ((rval = use_sigterm_to_exit()) != AXL_SUCCESS) + return rval; + + if ((server_socket = socket(AF_INET, SOCK_STREAM, 0)) == 0) { + AXL_ABORT(-1, "socket() failed: (%s)", strerror(errno)); + } + + if (setsockopt(server_socket, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt)) < 0 ) { + AXL_ABORT(-1, "setsockopt() failed: (%s)", strerror(errno)); + } + + address.sin_family = AF_INET; + address.sin_addr.s_addr = INADDR_ANY; + address.sin_port = htons(port); + + if (bind(server_socket, (struct sockaddr *)&address, sizeof(address)) < 0) { + AXL_ABORT(-1, "bind() failed: (%s)", strerror(errno)); + } + + if (listen(server_socket, AXL_SOCKET_MAX_CLIENTS) < 0) { + AXL_ABORT(-1, "listen() failed: (%s)", strerror(errno)); + } + + addrlen = sizeof(address); + + while (!time_to_leave) { + FD_ZERO(&readfds); + FD_SET(server_socket, &readfds); + max_sd = server_socket; + + for (int i = 0 ; i < AXL_SOCKET_MAX_CLIENTS ; i++) { + if (axl_socket_conn_ctx_array[i].sd > 0) + FD_SET(axl_socket_conn_ctx_array[i].sd, &readfds); + + if (axl_socket_conn_ctx_array[i].sd > max_sd) + max_sd = axl_socket_conn_ctx_array[i].sd; + } + + activity = select(max_sd + 1 , &readfds , NULL , NULL , NULL); + + if (time_to_leave) + break; + + if (activity < 0 && errno != EINTR) { + AXL_ABORT(-1, "select() error: (%s)", strerror(errno)); + } + + if (FD_ISSET(server_socket, &readfds)) { + AXL_DBG(1, "Accepting new incomming connection"); + if ((new_socket = accept(server_socket, (struct sockaddr *)&address, + (socklen_t*)&addrlen)) < 0) { + AXL_ABORT(-1, "accept() error: (%s)", strerror(errno)); + } + + for (int i = 0; i < AXL_SOCKET_MAX_CLIENTS; i++) { + if(axl_socket_conn_ctx_array[i].sd == 0 ){ + axl_socket_conn_ctx_array[i].sd = new_socket; + break; + } + } + AXL_DBG(1, "Connection established"); + } + + for ( int i = 0; i < AXL_SOCKET_MAX_CLIENTS; i++) { + if (FD_ISSET(axl_socket_conn_ctx_array[i].sd , &readfds)) { + axl_xfer_list = &axl_socket_conn_ctx_array[i].xfr; + + if (axl_socket_request_from_client(axl_socket_conn_ctx_array[i].sd) == 0) { + AXL_DBG(1, "Closing server side socket(%d) to client", axl_socket_conn_ctx_array[i].sd); + close(axl_socket_conn_ctx_array[i].sd); + axl_socket_conn_ctx_array[i].sd = 0; + axl_free(&axl_xfer_list->axl_kvtrees); + axl_xfer_list->axl_kvtrees_count = 0; + } + } + } + } + return 0; +} + diff --git a/src/axl/axl_socket.h b/src/axl/axl_socket.h new file mode 100644 index 00000000..e3d279ac --- /dev/null +++ b/src/axl/axl_socket.h @@ -0,0 +1,60 @@ +#ifndef AXL_SOCKET_H +#define AXL_SOCKET_H + +#include +#include "kvtree.h" + +#if defined(__cplusplus) +extern "C" { +#endif + +typedef enum { + AXL_SOCKET_DISABLED = 0, /* Default - Not utilizing AXL service (lib only) */ + AXL_SOCKET_CLIENT = 1, /* Using AXL service and we are the client */ + AXL_SOCKET_SERVER = 2 /* Using AXL service and we are the server */ +} axl_socket_RunMode; + +/* + * Flag to state whether the AXL client/server mode of operation is enabled, + * and if so, whether the code is running as the client or the server. + */ +extern axl_socket_RunMode axl_service_mode; + +typedef enum { + AXL_SOCKET_AXL_CONFIG_SET = 0, /* payload is config kvtree hash buffer */ +} axl_socket_request_t; + +typedef struct { + axl_socket_request_t request; + ssize_t payload_length; +} axl_socket_Request; + +typedef enum { + AXL_SOCKET_SUCCESS = 0, + AXL_SOCKET_FAILURE = -1, +} axl_socket_response_t; + +typedef struct { + axl_socket_response_t response; + ssize_t payload_length; // Optional error/status string +} axl_socket_Response; + +int axl_socket_client_init(char* host, unsigned short port); + +/* + * function to perform client-side request to server for AXL_Finalize() + */ +void axl_socket_client_AXL_Finalize(); + +/* + * function to perform client-side request to server for AXL_Config_Set + */ +void axl_socket_client_AXL_Config_Set(const kvtree* config); + +int axl_socket_server_run(int port); + +#if defined(__cplusplus) +extern "C" } +#endif + +#endif /* AXL_SOCKET_H */ diff --git a/src/axl/axl_socket_daemon.c b/src/axl/axl_socket_daemon.c new file mode 100644 index 00000000..16fca7d2 --- /dev/null +++ b/src/axl/axl_socket_daemon.c @@ -0,0 +1,18 @@ +#include +#include + +#include "axl_internal.h" +#include "axl_socket.h" + +int main(int argc , char *argv[]) +{ + int rval = AXL_FAILURE; + + if (argc == 2 && atoi(argv[1]) > 0) { + rval = axl_socket_server_run(atoi(argv[1])); + } else { + fprintf(stderr, "Usage: %s \n", argv[0]); + } + + return rval; +} diff --git a/src/axl/axl_sync.c b/src/axl/axl_sync.c index b5fd1aea..1abc6088 100644 --- a/src/axl/axl_sync.c +++ b/src/axl/axl_sync.c @@ -12,7 +12,7 @@ int __axl_sync_start (int id, int resume) int rc = AXL_SUCCESS; /* get pointer to file list for this dataset */ - kvtree* file_list = axl_kvtrees[id]; + kvtree* file_list = axl_xfer_list->axl_kvtrees[id]; /* mark dataset as in progress */ kvtree_util_set_int(file_list, AXL_KEY_STATUS, AXL_STATUS_INPROG); @@ -39,7 +39,7 @@ int __axl_sync_start (int id, int resume) } /* TODO: check bytecount conversion success, do not use global - * axl_kvtrees to get file_list */ + * axl_xfer_list->axl_kvtrees to get file_list */ unsigned long file_buf_size; int success = kvtree_util_get_bytecount(file_list, AXL_KEY_CONFIG_FILE_BUF_SIZE, &file_buf_size); @@ -91,7 +91,7 @@ int axl_sync_test (int id) int axl_sync_wait (int id) { /* get pointer to file list for this dataset */ - kvtree* file_list = axl_kvtrees[id]; + kvtree* file_list = axl_xfer_list->axl_kvtrees[id]; /* determine whether transfer was successful */ int status; diff --git a/src/axl/axl_util.c b/src/axl/axl_util.c index 8b0ebbb9..98b62d17 100644 --- a/src/axl/axl_util.c +++ b/src/axl/axl_util.c @@ -110,7 +110,7 @@ kvtree_elem* axl_get_next_path(int id, kvtree_elem* elem, char** src, char** dst { if (! elem) { /* lookup transfer info for the given id */ - kvtree* file_list = axl_kvtrees[id]; + kvtree* file_list = axl_xfer_list->axl_kvtrees[id]; if (! file_list) { return NULL; } diff --git a/src/axl/test/CMakeLists.txt b/src/axl/test/CMakeLists.txt index 927251dc..6cddc62e 100644 --- a/src/axl/test/CMakeLists.txt +++ b/src/axl/test/CMakeLists.txt @@ -3,15 +3,13 @@ ############### INCLUDE_DIRECTORIES(${PROJECT_BINARY_DIR}) -LIST(APPEND axl_test_srcs - axl_cp.c -) - ADD_EXECUTABLE(axl_cp axl_cp.c) ADD_EXECUTABLE(axl_test_config test_config.c) +ADD_EXECUTABLE(test_client_server test_client_server.c) TARGET_LINK_LIBRARIES(axl_cp axl_o kvtree_o ${SCR_EXTERNAL_LIBS}) TARGET_LINK_LIBRARIES(axl_test_config axl_o kvtree_o ${SCR_EXTERNAL_LIBS}) +TARGET_LINK_LIBRARIES(test_client_server axl_o kvtree_o ${SCR_EXTERNAL_LIBS}) ################ # Add tests to ctest @@ -19,6 +17,7 @@ TARGET_LINK_LIBRARIES(axl_test_config axl_o kvtree_o ${SCR_EXTERNAL_LIBS}) CONFIGURE_FILE(test_axl.sh ${CMAKE_CURRENT_BINARY_DIR} COPYONLY) CONFIGURE_FILE(test_axl_metadata.sh ${CMAKE_CURRENT_BINARY_DIR} COPYONLY) +CONFIGURE_FILE(test_driver_client_server.py ${CMAKE_CURRENT_BINARY_DIR} COPYONLY) ADD_TEST(axl_sync_test test_axl.sh sync) @@ -59,6 +58,8 @@ ENDIF(BBAPI_FOUND) ADD_TEST(axl_test_config axl_test_config) +ADD_TEST(axl_test_client_server test_driver_client_server.py) + #################### # make a verbose "test" target named "axl_check" #################### diff --git a/src/axl/test/test_client_server.c b/src/axl/test/test_client_server.c new file mode 100644 index 00000000..30eb4985 --- /dev/null +++ b/src/axl/test/test_client_server.c @@ -0,0 +1,58 @@ +#include +#include +#include +#include +#include +#include + +#include "axl.h" + +#define AXLCS_SUCCESS 0 +#define AXLCS_CLIENT_INVALID 1 +#define AXLCS_SERVICE_CREATION_FAILURE 1000 +#define AXLCS_SERVICE_KILLED 2000 +#define AXLCS_SERVICE_FAIL 3000 + +extern int axl_socket_server_run(int port); + +int run_service() +{ + fprintf(stdout, "Service Started!\n"); + int rval = axl_socket_server_run(2000); + fprintf(stdout, "Service Ending!\n"); + return rval; +} + +int run_client() +{ + int rval; + + if ((rval = AXL_Init()) != AXL_SUCCESS) { + fprintf(stderr, "Call to AXL_Init failed with code: %d\n", rval); + } else { + if ((rval = AXL_Finalize()) != AXL_SUCCESS) { + fprintf(stderr, "Call to AXL_Init failed with code: %d\n", rval); + } + } + + return rval; +} + +int main(int ac, char **av) +{ + fprintf(stderr, "Just testing stderr...\n"); + if (ac != 2) { + fprintf(stderr, "Command count (%d) incorrect:\nUsage: test_client_server --\n", ac); + return AXLCS_CLIENT_INVALID; + } + + if (strcmp("--server", av[1]) == 0) { + return run_service(); + } + else if (strcmp("--client", av[1]) == 0) { + return run_client(); + } + + fprintf(stderr, "Unknown Argument (%s) incorrect:\nUsage: test_client_server --\n", av[1]); + return AXLCS_CLIENT_INVALID; +} \ No newline at end of file diff --git a/src/axl/test/test_driver_client_server.py b/src/axl/test/test_driver_client_server.py new file mode 100755 index 00000000..36f43dbb --- /dev/null +++ b/src/axl/test/test_driver_client_server.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python3 + +import sys +import subprocess +import os +import time + +def wait_for_completion(procname, proc, wait_time): + try: + outs, err = proc.communicate(timeout=wait_time) + except: + proc.terminate() + outs, err = proc.communicate() + + print("{} Return Code: {}".format(procname, proc.returncode)) + print("stdout:\n{}".format(outs.decode("utf-8"))) + print("stderr:\n{}".format(err.decode("utf-8"))) + + return proc.returncode, outs, err + +if __name__ == '__main__': + errors = 0 + # Launch the server then the client + test_env = {'AXL_DEBUG': '44', 'AXL_SERVICE_HOST': 'localhost', 'AXL_SERVICE_PORT': '2000'} + server = subprocess.Popen(['./test_client_server', '--server'], env=dict(os.environ, **test_env), stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False) + time.sleep(2) # Give server a chance to start + + client = subprocess.Popen(['./test_client_server', '--client'], env=dict(os.environ, **test_env), stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False) + + # Wait for the client then the server to finish + client_ecode, client_out, client_err = wait_for_completion("axl_client", client, 30) + server_ecode, server_out, server_err = wait_for_completion("axl_server", server, 2) + + if server_ecode != 0 or client_ecode != 0: + errors = server_ecode + client_ecode + + sys.exit(errors)