Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/api/include/pdc_client_connect.h
Original file line number Diff line number Diff line change
Expand Up @@ -1140,4 +1140,11 @@ void report_avg_server_profiling_rst();
perr_t PDC_Client_transfer_pthread_create();
perr_t PDC_Client_transfer_pthread_terminate();
perr_t PDC_Client_transfer_pthread_cnt_add(int n);

perr_t PDC_Client_create_bucket(char *prefix);
uint32_t PDC_prefix_binary_search(char *prefix, char **target_prefix);
perr_t PDC_metadata_key_add(pdcid_t obj_id, pdc_kvtag_t *kvtag, int is_cont);
perr_t PDC_metadata_key_delete(pdcid_t obj_id, pdc_kvtag_t *kvtag, int is_cont);
perr_t PDC_delete_metadata_key(pdcid_t obj_id, pdc_kvtag_t *kvtag, int is_cont);
hg_return_t metadata_create_bucket_client_rpc_cb(const struct hg_cb_info *callback_info);
#endif /* PDC_CLIENT_CONNECT_H */
309 changes: 307 additions & 2 deletions src/api/pdc_client_connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@
#include <inttypes.h>
#include <math.h>
#include <sys/time.h>
#include <errno.h>
#include "pdc_timing.h"
#include "errno.h"

/* #define TANG_DEBUG 1 */
#define ROOT_BUCKET "#"

int is_client_debug_g = 0;
pdc_server_selection_t pdc_server_selection_g = PDC_SERVER_DEFAULT;
Expand Down Expand Up @@ -144,6 +148,8 @@ static hg_id_t metadata_delete_by_id_register_id_g;
static hg_id_t metadata_update_register_id_g;
static hg_id_t metadata_add_tag_register_id_g;
static hg_id_t metadata_add_kvtag_register_id_g;
static hg_id_t metadata_check_prefix_register_id_g;
static hg_id_t metadata_key_add_register_id_g;
static hg_id_t metadata_del_kvtag_register_id_g;
static hg_id_t metadata_get_kvtag_register_id_g;
static hg_id_t region_lock_register_id_g;
Expand Down Expand Up @@ -1341,6 +1347,9 @@ PDC_Client_mercury_init(hg_class_t **hg_class, hg_context_t **hg_context, int po
metadata_update_register_id_g = PDC_metadata_update_register(*hg_class);
metadata_add_tag_register_id_g = PDC_metadata_add_tag_register(*hg_class);
metadata_add_kvtag_register_id_g = PDC_metadata_add_kvtag_register(*hg_class);
metadata_check_prefix_register_id_g = PDC_metadata_check_prefix_register(*hg_class);
metadata_create_bucket_register_id_g = PDC_metadata_create_bucket_register(*hg_class);
metadata_key_add_register_id_g = PDC_metadata_key_add_register(*hg_class);
metadata_del_kvtag_register_id_g = PDC_metadata_del_kvtag_register(*hg_class);
metadata_get_kvtag_register_id_g = PDC_metadata_get_kvtag_register(*hg_class);
region_lock_register_id_g = PDC_region_lock_register(*hg_class);
Expand Down Expand Up @@ -1544,8 +1553,9 @@ PDC_Client_init()
}

if (pdc_client_mpi_rank_g == 0) {
LOG_INFO("Using [%s] as tmp dir, %d clients per server\n", pdc_client_tmp_dir_g,
LOG_INFO("Using [%s] as tmp dir112, %d clients per server\n", pdc_client_tmp_dir_g,
pdc_nclient_per_server_g);
PDC_Client_create_bucket(ROOT_BUCKET);
}

if (mercury_has_init_g) {
Expand Down Expand Up @@ -2040,6 +2050,68 @@ metadata_add_tag_rpc_cb(const struct hg_cb_info *callback_info)
FUNC_LEAVE(ret_value);
}

static hg_return_t
metadata_check_prefix_rpc_cb(const struct hg_cb_info *callback_info)
{
FUNC_ENTER(NULL);

hg_return_t ret_value;
metadata_check_prefix_out_t output;
metadata_check_prefix_out_t *args = (metadata_check_prefix_out_t *)callback_info->arg;
hg_handle_t handle = callback_info->info.forward.handle;

printf("metadata_check_prefix_rpc_cb: callback_info->arg = %p\n", callback_info->arg);
/* Get output from server*/
ret_value = HG_Get_output(handle, &output);
printf("metadata_check_prefix_rpc_cb: after HG_Get_output\n");
if (ret_value != HG_SUCCESS) {
ret_value = -1;
PGOTO_ERROR(HG_OTHER_ERROR, "Error with HG_Get_output");
}
args->found = output.found;
args->server_id = output.server_id;
args->ret = output.ret;
args->leaf = output.leaf;
printf("metadata_check_prefix_rpc_cb: output.found = %d\n", args->found);
printf("metadata_check_prefix_rpc_cb: output.leaf = %d\n", args->leaf);
printf("metadata_check_prefix_rpc_cb: output.ret = %d\n", args->ret);
printf("metadata_check_prefix_rpc_cb: output.server_id = %d\n", args->server_id);

done:
hg_atomic_decr32(&atomic_work_todo_g);
HG_Free_output(handle, &output);

FUNC_LEAVE(ret_value);
}

static hg_return_t
metadata_key_add_rpc_cb(const struct hg_cb_info *callback_info)
{
FUNC_ENTER(NULL);

hg_return_t ret_value;
metadata_key_add_out_t output;
metadata_key_add_out_t *args = (metadata_key_add_out_t *)callback_info->arg;
hg_handle_t handle = callback_info->info.forward.handle;

printf("metadata_key_add_rpc_cb: callback_info->arg = %p\n", callback_info->arg);
/* Get output from server*/
ret_value = HG_Get_output(handle, &output);
printf("metadata_key_add_rpc_cb: after HG_Get_output\n");
if (ret_value != HG_SUCCESS) {
ret_value = -1;
PGOTO_ERROR(HG_OTHER_ERROR, "Error with HG_Get_output");
}
args->ret = output.ret;
printf("metadata_check_prefix_rpc_cb: output.ret = %d\n", args->ret);

done:
hg_atomic_decr32(&atomic_work_todo_g);
HG_Free_output(handle, &output);

FUNC_LEAVE(ret_value);
}

perr_t
PDC_Client_add_tag(pdcid_t obj_id, const char *tag)
{
Expand Down Expand Up @@ -6357,6 +6429,7 @@ PDC_add_kvtag(pdcid_t obj_id, pdc_kvtag_t *kvtag, int is_cont)
}

server_id = PDC_get_server_by_obj_id(meta_id, pdc_server_num_g);
printf("Object ID %" PRIu64 " mapped to server %" PRIu32 "\n", obj_id, server_id);

// Debug statistics for counting number of messages sent to each server.
debug_server_id_count[server_id]++;
Expand Down Expand Up @@ -6394,6 +6467,172 @@ PDC_add_kvtag(pdcid_t obj_id, pdc_kvtag_t *kvtag, int is_cont)

FUNC_LEAVE(ret_value);
}
/***
* Check if the prefix exists in the PHT
* @param server: The server to check against
* @param prefix: The prefix to check
*
* This function will send mercury RPC request to the server and check if the prefix exists
*/

static perr_t
PDC_check_prefix(uint32_t server, char *prefix, metadata_check_prefix_out_t *out)
{
FUNC_ENTER(NULL);
perr_t ret_value = SUCCEED;
hg_return_t hg_ret = 0;
hg_handle_t metadata_check_prefix_handle;
struct _pdc_client_lookup_args lookup_args;
metadata_check_prefix_in_t in;

if (PDC_Client_try_lookup_server(server, 0) != SUCCEED)
PGOTO_ERROR(FAIL, "Error with PDC_Client_try_lookup_server");

HG_Create(send_context_g, pdc_server_info_g[server].addr, metadata_check_prefix_register_id_g,
&metadata_check_prefix_handle);

// Fill input structure
in.prefix = prefix;
printf("Checking prefix client %s on server %d\n", prefix, server);
hg_ret = HG_Forward(metadata_check_prefix_handle, metadata_check_prefix_rpc_cb, out, &in);
if (hg_ret != HG_SUCCESS)
PGOTO_ERROR(FAIL, "Could not start HG_Forward");

// Wait for response from server
hg_atomic_set32(&atomic_work_todo_g, 1);
PDC_Client_check_response(&send_context_g);

if (out->ret != 1)
LOG_ERROR("Check prefix NOT successful, ret_value = %d\n", out->ret);

done:
HG_Destroy(metadata_check_prefix_handle);

FUNC_LEAVE(ret_value);
}

uint32_t
PDC_prefix_binary_search(char *prefix, char **target_prefix)
{
metadata_check_prefix_out_t resp;
uint64_t temp_server_id = -1;
int max = strlen(prefix) - 1;
int mid = 1;
char * slice;

printf("Searching prefix %s\n", prefix);
while (mid <= max) {
slice = malloc((mid + 1) * sizeof(char));
strncpy(slice, prefix, mid);
slice[mid] = '\0';
uint64_t hash_value = prefix_hash(prefix);
temp_server_id = PDC_get_server_using_pht(hash_value);
printf("Server %d is responsible for prefix %s till length %d\n", temp_server_id, slice, mid);
perr_t ret = PDC_check_prefix(temp_server_id, slice, &resp);

printf("Checking prefix %s on server %d: found=%d, leaf=%d\n", slice, temp_server_id, resp.found,
resp.leaf);
// TODO: check if redirect to another server

/*** If the prefix exists and a leaf node, return the target server */
if (resp.found == 1 && resp.leaf == 1) {
*target_prefix = malloc((mid + 1) * sizeof(char));
strncpy(*target_prefix, slice, mid);
(*target_prefix)[mid] = '\0';
free(slice);
return temp_server_id;
}
else if (resp.found == 1 &&
resp.leaf == 0) { /** If the prefix exists but not a leaf node, search longer prefix */
mid++;
}
else { /** If the prefix does not exist, search shorter prefix */
max = mid - 1;
mid = mid / 2;
}

free(slice);
return temp_server_id;
}
return -1; // No valid prefix found
}

perr_t
PDC_metadata_key_add(pdcid_t obj_id, pdc_kvtag_t *kvtag, int is_cont)
{
FUNC_ENTER(NULL);
char * key_name;
char * target_prefix;
perr_t ret_value = SUCCEED;
hg_return_t hg_ret = 0;
hg_handle_t metadata_key_add_handle;
metadata_key_add_in_t in;
struct _pdc_client_lookup_args lookup_args;

printf("PDC_metadata_key_add: Adding key %s to metadata\n", kvtag->name);
key_name = kvtag->name;
char *key_bin = string_to_binary(key_name);
char *prefix = malloc(strlen(key_bin) + 2);
strncpy(prefix, ROOT_BUCKET, 1);
strncpy(prefix + 1, key_bin, strlen(key_bin));
prefix[strlen(key_bin) + 1] = '\0';
free(key_bin);

uint32_t server_id = PDC_prefix_binary_search(prefix, &target_prefix);
if (PDC_Client_try_lookup_server(server_id, 0) != SUCCEED)
PGOTO_ERROR(FAIL, "Error with PDC_Client_try_lookup_server");

HG_Create(send_context_g, pdc_server_info_g[server_id].addr, metadata_key_add_register_id_g,
&metadata_key_add_handle);

in.key = strdup(kvtag->name);
in.value = kvtag->value;
in.prefix = strdup(target_prefix);
in.size = kvtag->size;

printf("PDC_metadata_key_add: Inserting key %s with prefix %s and value %s to server %d\n", in.key,
in.prefix, in.value, server_id);

hg_ret = HG_Forward(metadata_key_add_handle, metadata_key_add_rpc_cb, &lookup_args, &in);
printf("HG_Forward returned %d\n", hg_ret);
if (hg_ret != HG_SUCCESS)
PGOTO_ERROR(FAIL, "Could not start HG_Forward");

// Wait for response from server
hg_atomic_set32(&atomic_work_todo_g, 1);
PDC_Client_check_response(&send_context_g);

free(target_prefix);
free(prefix);
done:
HG_Destroy(metadata_key_add_handle);

FUNC_LEAVE(ret_value);
}

perr_t
PDC_metadata_key_delete(pdcid_t obj_id, pdc_kvtag_t *kvtag, int is_cont)
{
FUNC_ENTER(NULL);
char * key_name;
char * target_prefix;
perr_t ret_value = SUCCEED;

printf("PDC_metadata_key_delete: Adding key %s to metadata\n", kvtag->name);
key_name = kvtag->name;
char *key_bin = string_to_binary(key_name);
char *prefix = malloc(strlen(key_bin) + 2);
strncpy(prefix, ROOT_BUCKET, 1);
strncpy(prefix + 1, key_bin, strlen(key_bin));
prefix[strlen(key_bin) + 1] = '\0';
free(key_bin);
uint32_t server_id = PDC_prefix_binary_search(prefix, &target_prefix);
printf("PDC_metadata_key_delete: Inserting key %s with prefix %s to server %d\n", key_name, target_prefix,
server_id);
free(target_prefix);
free(prefix);
FUNC_LEAVE(ret_value);
}

static hg_return_t
metadata_get_kvtag_rpc_cb(const struct hg_cb_info *callback_info)
Expand Down Expand Up @@ -8445,6 +8684,72 @@ PDC_Client_search_obj_ref_through_dart_mpi(dart_hash_algo_t hash_algo, char *que
*out = dart_out;
FUNC_LEAVE(ret_value);
}

perr_t
PDC_Client_create_bucket(char *prefix)
{
FUNC_ENTER(NULL);

perr_t ret_value = SUCCEED;
hg_return_t hg_ret = 0;
hg_handle_t metadata_create_bucket_handle;
metadata_create_bucket_in_t in;
struct _pdc_client_lookup_args lookup_args;

uint64_t hash_value = prefix_hash(prefix);
uint32_t server_id = PDC_get_server_using_pht(hash_value);

if (PDC_Client_try_lookup_server(server_id, 0) != SUCCEED)
PGOTO_ERROR(FAIL, "Error with PDC_Client_try_lookup_server");

HG_Create(send_context_g, pdc_server_info_g[server_id].addr, metadata_create_bucket_register_id_g,
&metadata_create_bucket_handle);

// Fill input structure
in.prefix = prefix;

hg_ret =
HG_Forward(metadata_create_bucket_handle, metadata_create_bucket_client_rpc_cb, &lookup_args, &in);
if (hg_ret != HG_SUCCESS)
PGOTO_ERROR(FAIL, "Could not start HG_Forward");
// Wait for response from server
hg_atomic_set32(&atomic_work_todo_g, 1);
PDC_Client_check_response(&send_context_g); // BLOCKING CALL

if (lookup_args.ret != 1)
LOG_ERROR("Add create_bucket NOT successful, ret_value = %d\n", lookup_args.ret);
done:
HG_Destroy(metadata_create_bucket_handle);

FUNC_LEAVE(ret_value);
}

hg_return_t
metadata_create_bucket_client_rpc_cb(const struct hg_cb_info *callback_info)
{

FUNC_ENTER(NULL);
hg_return_t ret_value;
hg_handle_t handle = callback_info->info.forward.handle;
struct _pdc_client_lookup_args *client_lookup_args = (struct _pdc_client_lookup_args *)callback_info->arg;
/* Get output from server*/

metadata_create_bucket_out_t output;
ret_value = HG_Get_output(handle, &output);

if (ret_value != HG_SUCCESS) {
client_lookup_args->ret = -1;
PGOTO_ERROR(HG_OTHER_ERROR, "Error with HG_Get_output");
}
printf("metadata_create_bucket_rpc_cb: output.ret = %d\n", output.ret);
client_lookup_args->ret = output.ret;
done:

hg_atomic_decr32(&atomic_work_todo_g);
HG_Free_output(handle, &output);
FUNC_LEAVE(ret_value);
}

#endif

/******************** Collective Object Selection Query Ends *******************************/
Loading
Loading