1 Star 0 Fork 2.9K

Runner365/dgiot

forked from dgiot开源社区/dgiot 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
emqx_cm_registry.erl 4.73 KB
一键复制 编辑 原始数据 按行查看 历史
jhonliu 提交于 2022-12-29 18:13 . feat: v4.4.11
%%--------------------------------------------------------------------
%% Copyright (c) 2019-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% Global Channel Registry
-module(emqx_cm_registry).
-behaviour(gen_server).
-include("emqx.hrl").
-include("logger.hrl").
-include("types.hrl").
-logger_header("[Registry]").
-export([start_link/0]).
-export([is_enabled/0]).
-export([ register_channel/1
, unregister_channel/1
]).
-export([lookup_channels/1]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-define(REGISTRY, ?MODULE).
-define(TAB, emqx_channel_registry).
-define(LOCK, {?MODULE, cleanup_down}).
-record(channel, {chid, pid}).
%% @doc Start the global channel registry.
-spec(start_link() -> startlink_ret()).
start_link() ->
gen_server:start_link({local, ?REGISTRY}, ?MODULE, [], []).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
%% @doc Is the global registry enabled?
-spec(is_enabled() -> boolean()).
is_enabled() ->
emqx:get_env(enable_session_registry, true).
%% @doc Register a global channel.
-spec(register_channel(emqx_types:clientid()
| {emqx_types:clientid(), pid()}) -> ok).
register_channel(ClientId) when is_binary(ClientId) ->
register_channel({ClientId, self()});
register_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
case is_enabled() of
true -> mnesia:dirty_write(?TAB, record(ClientId, ChanPid));
false -> ok
end.
%% @doc Unregister a global channel.
-spec(unregister_channel(emqx_types:clientid()
| {emqx_types:clientid(), pid()}) -> ok).
unregister_channel(ClientId) when is_binary(ClientId) ->
unregister_channel({ClientId, self()});
unregister_channel({ClientId, ChanPid}) when is_binary(ClientId), is_pid(ChanPid) ->
case is_enabled() of
true -> mnesia:dirty_delete_object(?TAB, record(ClientId, ChanPid));
false -> ok
end.
%% @doc Lookup the global channels.
-spec(lookup_channels(emqx_types:clientid()) -> list(pid())).
lookup_channels(ClientId) ->
[ChanPid || #channel{pid = ChanPid} <- mnesia:dirty_read(?TAB, ClientId)].
record(ClientId, ChanPid) ->
#channel{chid = ClientId, pid = ChanPid}.
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([]) ->
ok = ekka_mnesia:create_table(?TAB, [
{type, bag},
{ram_copies, [node()]},
{record_name, channel},
{attributes, record_info(fields, channel)},
{storage_properties, [{ets, [{read_concurrency, true},
{write_concurrency, true}]}]}]),
ok = ekka_mnesia:copy_table(?TAB, ram_copies),
ok = ekka:monitor(membership),
{ok, #{}}.
handle_call(Req, _From, State) ->
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({membership, {mnesia, down, Node}}, State) ->
global:trans({?LOCK, self()},
fun() ->
mnesia:transaction(fun cleanup_channels/1, [Node])
end),
{noreply, State};
handle_info({membership, _Event}, State) ->
{noreply, State};
handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
cleanup_channels(Node) ->
Pat = [{#channel{pid = '$1', _ = '_'}, [{'==', {node, '$1'}, Node}], ['$_']}],
lists:foreach(fun delete_channel/1, mnesia:select(?TAB, Pat, write)).
delete_channel(Chan) ->
mnesia:delete_object(?TAB, Chan, write).
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Erlang
1
https://gitee.com/iocs/dgiot.git
git@gitee.com:iocs/dgiot.git
iocs
dgiot
dgiot
master

搜索帮助