Skip to content

Commit

Permalink
1.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
28810 authored and 28810 committed Jul 30, 2019
1 parent 58dc77a commit 193f28c
Show file tree
Hide file tree
Showing 21 changed files with 1,324 additions and 737 deletions.
242 changes: 242 additions & 0 deletions ImCore/ImClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
using CSRedis;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;

/// <summary>
/// im 核心类实现的配置所需
/// </summary>
public class ImClientOptions
{
/// <summary>
/// CSRedis 对象,用于存储数据和发送消息
/// </summary>
public CSRedisClient Redis { get; set; }
/// <summary>
/// 负载的服务端
/// </summary>
public string[] Servers { get; set; }
/// <summary>
/// websocket请求的路径,默认值:/ws
/// </summary>
public string PathMatch { get; set; } = "/ws";
}

/// <summary>
/// im 核心类实现
/// </summary>
public class ImClient
{
protected CSRedisClient _redis;
protected string[] _servers;
protected string _redisPrefix;
protected string _pathMatch;

public EventHandler Error;

/// <summary>
/// 初始化 imclient
/// </summary>
/// <param name="options"></param>
public ImClient(ImClientOptions options)
{
if (options.Redis == null) throw new ArgumentException("ImClientOptions.Redis 参数不能为空");
if (options.Servers.Any() == false) throw new ArgumentException("ImClientOptions.Servers 参数不能为空");
_redis = options.Redis;
_servers = options.Servers;
_redisPrefix = $"wsim{options.PathMatch.Replace('/', '_')}";
_pathMatch = options.PathMatch ?? "/ws";
}

/// <summary>
/// 负载分区规则:取clientId后四位字符,转成10进制数字0-255,求模
/// </summary>
/// <param name="clientId">客户端id</param>
/// <returns></returns>
protected string SelectServer(Guid clientId)
{
var servers_idx = int.Parse(clientId.ToString("N").Substring(28), NumberStyles.HexNumber) % _servers.Length;
if (servers_idx >= _servers.Length) servers_idx = 0;
return _servers[servers_idx];
}

/// <summary>
/// ImServer 连接前的负载、授权,返回 ws 目标地址,使用该地址连接 websocket 服务端
/// </summary>
/// <param name="clientId">客户端id</param>
/// <param name="clientMetaData">客户端相关信息,比如ip</param>
/// <returns>websocket 地址:ws://xxxx/ws?token=xxx</returns>
public string PrevConnectServer(Guid clientId, string clientMetaData)
{
var server = SelectServer(clientId);
var token = $"{Guid.NewGuid()}{Guid.NewGuid()}{Guid.NewGuid()}{Guid.NewGuid()}".Replace("-", "");
_redis.Set($"{_redisPrefix}Token{token}", JsonConvert.SerializeObject((clientId, clientMetaData)), 10);
return $"ws://{server}{_pathMatch}?token={token}";
}

/// <summary>
/// 向指定的多个客户端id发送消息
/// </summary>
/// <param name="senderClientId">发送者的客户端id</param>
/// <param name="receiveClientId">接收者的客户端id</param>
/// <param name="message">消息</param>
/// <param name="receipt">是否回执</param>
public void SendMessage(Guid senderClientId, Guid[] receiveClientId, object message, bool receipt = false)
{
receiveClientId = receiveClientId.Distinct().ToArray();
Dictionary<string, List<Guid>> redata = new Dictionary<string, List<Guid>>();

foreach (var uid in receiveClientId)
{
string server = SelectServer(uid);
if (redata.ContainsKey(server) == false) redata.Add(server, new List<Guid>());
redata[server].Add(uid);
}
var messageJson = JsonConvert.SerializeObject(message);
foreach (string server in redata.Keys)
_redis.Publish($"{_redisPrefix}Server{server}",
JsonConvert.SerializeObject((senderClientId, redata[server], messageJson, receipt)));
}

/// <summary>
/// 获取所在线客户端id
/// </summary>
/// <returns></returns>
public string[] GetClientListByOnline()
{
return _redis.HKeys($"{_redisPrefix}Online");
}

/// <summary>
/// 事件订阅
/// </summary>
/// <param name="online">上线</param>
/// <param name="offline">下线</param>
public void EventBus(
Action<(Guid clientId, string clientMetaData)> online,
Action<(Guid clientId, string clientMetaData)> offline)
{
_redis.Subscribe(
($"evt_{_redisPrefix}Online", msg => online(JsonConvert.DeserializeObject<(Guid clientId, string clientMetaData)>(msg.Body))),
($"evt_{_redisPrefix}Offline", msg => offline(JsonConvert.DeserializeObject<(Guid clientId, string clientMetaData)>(msg.Body))));
}

#region 群聊频道,每次上线都必须重新加入

/// <summary>
/// 加入群聊频道,每次上线都必须重新加入
/// </summary>
/// <param name="clientId">客户端id</param>
/// <param name="chan">群聊频道名</param>
public void JoinChan(Guid clientId, string chan)
{
_redis.StartPipe(a => a
.HSet($"{_redisPrefix}Chan{chan}", clientId.ToString(), 0)
.HSet($"{_redisPrefix}Client{clientId}", chan, 0)
.HIncrBy($"{_redisPrefix}ListChan", chan, 1));
}
/// <summary>
/// 离开群聊频道
/// </summary>
/// <param name="clientId">客户端id</param>
/// <param name="chans">群聊频道名</param>
public void LeaveChan(Guid clientId, params string[] chans)
{
if (chans?.Any() != true) return;
using (var pipe = _redis.StartPipe())
{
foreach (var chan in chans)
pipe
.HDel($"{_redisPrefix}Chan{chan}", clientId.ToString())
.HDel($"{_redisPrefix}Client{clientId}", chan)
.Eval($"if redis.call('HINCRBY', KEYS[1], '{chan}', '-1') <= 0 then redis.call('HDEL', KEYS[1], '{chan}') end return 1",
$"{_redisPrefix}ListChan");
}
}
/// <summary>
/// 获取群聊频道所有客户端id(测试)
/// </summary>
/// <param name="chan">群聊频道名</param>
/// <returns></returns>
public Guid[] GetChanClientList(string chan)
{
return _redis.HKeys($"{_redisPrefix}Chan{chan}").Select(a => Guid.Parse(a)).ToArray();
}
/// <summary>
/// 清理群聊频道的离线客户端(测试)
/// </summary>
/// <param name="chan">群聊频道名</param>
public void ClearChanClient(string chan)
{
var websocketIds = _redis.HKeys($"{_redisPrefix}Chan{chan}");
var offline = new List<string>();
var span = websocketIds.AsSpan();
var start = span.Length;
while (start > 0)
{
start = start - 10;
var length = 10;
if (start < 0)
{
length = start + 10;
start = 0;
}
var slice = span.Slice(start, length);
var hvals = _redis.HMGet($"{_redisPrefix}Online", slice.ToArray().Select(b => b.ToString()).ToArray());
for (var a = length - 1; a >= 0; a--)
{
if (string.IsNullOrEmpty(hvals[a]))
{
offline.Add(span[start + a]);
span[start + a] = null;
}
}
}
//删除离线订阅
if (offline.Any()) _redis.HDel($"{_redisPrefix}Chan{chan}", offline.ToArray());
}

/// <summary>
/// 获取所有群聊频道和在线人数
/// </summary>
/// <returns>频道名和在线人数</returns>
public IEnumerable<(string chan, long online)> GetChanList()
{
var ret = _redis.HGetAll<long>($"{_redisPrefix}ListChan");
return ret.Select(a => (a.Key, a.Value));
}
/// <summary>
/// 获取用户参与的所有群聊频道
/// </summary>
/// <param name="clientId">客户端id</param>
/// <returns></returns>
public string[] GetChanListByClientId(Guid clientId)
{
return _redis.HKeys($"{_redisPrefix}Client{clientId}");
}
/// <summary>
/// 获取群聊频道的在线人数
/// </summary>
/// <param name="chan">群聊频道名</param>
/// <returns>在线人数</returns>
public long GetChanOnline(string chan)
{
return _redis.HGet<long>($"{_redisPrefix}ListChan", chan);
}

/// <summary>
/// 发送群聊消息,所有在线的用户将收到消息
/// </summary>
/// <param name="senderClientId">发送者的客户端id</param>
/// <param name="chan">群聊频道名</param>
/// <param name="message">消息</param>
public void SendChanMessage(Guid senderClientId, string chan, object message)
{
var websocketIds = _redis.HKeys($"{_redisPrefix}Chan{chan}");
SendMessage(Guid.Empty, websocketIds.Where(a => !string.IsNullOrEmpty(a)).Select(a => Guid.TryParse(a, out var tryuuid) ? tryuuid : Guid.Empty).ToArray(), message);
}

#endregion
}
32 changes: 32 additions & 0 deletions ImCore/ImCore.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<Version>1.0.0</Version>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<Authors>YeXiangQin</Authors>
<Description>.NETCore websocket 实现简易、高性能、集群即时通讯组件,支持点对点通讯、群聊通讯、上线下线事件消息等众多实用性功能.</Description>
<PackageProjectUrl>https://github.com/2881099/im</PackageProjectUrl>
<RepositoryUrl>https://github.com/2881099/im</RepositoryUrl>
<RepositoryType>git</RepositoryType>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
<PackageTags>im;websocket</PackageTags>
<PackageId>$(AssemblyName)</PackageId>
<Title>$(AssemblyName)</Title>
<IsPackable>true</IsPackable>
<GenerateAssemblyInfo>true</GenerateAssemblyInfo>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<DocumentationFile>ImCore.xml</DocumentationFile>
<WarningLevel>3</WarningLevel>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="CSRedisCore" Version="3.0.66" />
<PackageReference Include="Microsoft.AspNetCore.WebSockets" Version="2.1.7" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.2" />
<PackageReference Include="System.Memory" Version="4.5.3" />
</ItemGroup>

</Project>
Loading

0 comments on commit 193f28c

Please sign in to comment.