Skip to content

Commit

Permalink
Merge pull request #76 from Derecho-Project/get_my_shard
Browse files Browse the repository at this point in the history
Enable 'get_my_shard' API for UDL
  • Loading branch information
songweijia authored Jul 30, 2024
2 parents 38654c1 + edec60f commit 7e54e48
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 0 deletions.
48 changes: 48 additions & 0 deletions include/cascade/detail/service_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,54 @@ uint32_t ServiceClient<CascadeTypes...>::get_number_of_shards (
return get_number_of_shards(opm.subgroup_type_index,opm.subgroup_index);
}

template <typename... CascadeTypes>
template <typename SubgroupType>
int32_t ServiceClient<CascadeTypes...>::get_my_shard(uint32_t subgroup_index) const {
if (!is_external_client()) {
return group_ptr->template get_my_shard<SubgroupType>(subgroup_index);
} else {
return -1;
}
}

template <typename... CascadeTypes>
template <typename FirstType,typename SecondType, typename...RestTypes>
int32_t ServiceClient<CascadeTypes...>::type_recursive_get_my_shard (
uint32_t type_index,uint32_t subgroup_index) const {
if (type_index == 0) {
return this->template get_my_shard<FirstType>(subgroup_index);
} else {
return this->template type_recursive_get_number_of_shards<SecondType,RestTypes...>(type_index-1,subgroup_index);
}
}

template <typename... CascadeTypes>
template <typename LastType>
int32_t ServiceClient<CascadeTypes...>::type_recursive_get_my_shard (
uint32_t type_index, uint32_t subgroup_index) const {
if (type_index == 0) {
return this->template get_my_shard<LastType>(subgroup_index);
} else {
throw derecho::derecho_exception(std::string(__PRETTY_FUNCTION__) + " type index is out of boundary");
}
}

template <typename... CascadeTypes>
int32_t ServiceClient<CascadeTypes...>::get_my_shard (
uint32_t subgroup_type_index, uint32_t subgroup_index) const {
return this->template type_recursive_get_my_shard<CascadeTypes...>(subgroup_type_index,subgroup_index);
}

template <typename... CascadeTypes>
int32_t ServiceClient<CascadeTypes...>::get_my_shard (
const std::string& object_pool_pathname) {
auto opm = find_object_pool(object_pool_pathname);
if (!opm.is_valid() || opm.is_null() || opm.deleted) {
throw derecho::derecho_exception("Failed to find object_pool:" + object_pool_pathname);
}
return get_my_shard(opm.subgroup_type_index,opm.subgroup_index);
}

template <typename... CascadeTypes>
template <typename SubgroupType>
void ServiceClient<CascadeTypes...>::set_member_selection_policy(uint32_t subgroup_index,uint32_t shard_index,
Expand Down
28 changes: 28 additions & 0 deletions include/cascade/service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,9 @@ namespace cascade {
* shard index.
* - get_number_of_subgroups returns the number of subgroups of a given type
* - get_number_of_shards returns the number of shards of a given subgroup
* - get_my_shard returns the shard number that this node is a member of in the specific
* subgroup (by subgroup type and index), or -1 if this node is not a member
* of any shard in the specified subgroup.
* During view change, the Client might experience failure if the member is gone. In such a case, the client needs
* refresh its local member cache by calling get_shard_members.
*/
Expand Down Expand Up @@ -657,6 +660,31 @@ namespace cascade {
* @param[in] object_pool_pathname - the object pool name
*/
uint32_t get_number_of_shards(const std::string& object_pool_pathname);

template <typename SubgroupType>
int32_t get_my_shard(uint32_t subgroup_index) const;
protected:
template <typename FirstType,typename SecondType, typename...RestTypes>
int32_t type_recursive_get_my_shard(uint32_t type_index, uint32_t subgroup_index) const;
template <typename LastType>
int32_t type_recursive_get_my_shard(uint32_t type_index, uint32_t subgroup_index) const;
public:
/**
* @fn int32_t get_my_shard(uint32_t subgroup_type_index, uint32_t subgroup_index) const
* @brief find the shard I belong to, given the subgroup specified by type and index.
* @param[in] subgroup_type_index - the type index of the subgroup type.
* @param[in] subgroup_index - the subgroup index in the given type.
* @return The number of the shard, or -1 if current node is not in the specified subgroup.
*/
int32_t get_my_shard(uint32_t subgroup_type_index, uint32_t subgroup_index) const;

/**
* @fn int32_t get_my_shard(const std::string& object_pool_pathname)
* @brief find the shard I belong to, given the object pool specified by object pool path name.
* @param[in] object_pool_pathname - the object pool path name.
* @return The number of the shard, or -1 if current node is not in the specified subgroup.
*/
int32_t get_my_shard(const std::string& object_pool_pathname);

/**
* Member selection policy control API.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ class ConsolePrinterOCDPO: public OffCriticalDataPathObserver {
uint32_t worker_id) override {
std::cout << "[console printer ocdpo]: I(" << worker_id << ") received an object with key=" << key_string
<< ", matching prefix=" << key_string.substr(0,prefix_length) << std::endl;
auto* typed_ctxt = dynamic_cast<DefaultCascadeContextType*>(ctxt);
std::string prefix = key_string.substr(0,prefix_length);
std::cout << "[console printer ocdpo]: my shard @"
<< prefix
<< " is:"
<< typed_ctxt->get_service_client_ref().get_my_shard(prefix)
<< std::endl;
}

static std::shared_ptr<OffCriticalDataPathObserver> ocdpo_ptr;
Expand Down

0 comments on commit 7e54e48

Please sign in to comment.