C# 如何实现一个事件总线

news/发布时间2024/5/15 21:58:21

EventBus(事件总线)是一种用于在应用程序内部或跨应用程序组件之间进行事件通信的机制。

它允许不同的组件通过发布和订阅事件来进行解耦和通信。在给定的代码片段中,我们可以看到一个使用C#实现的Event Bus。它定义了一些接口和类来实现事件的发布和订阅。

首先,我们有两个基本的约束接口:IEventIAsyncEventHandler<TEvent>

IEvent是一个空接口,用于约束事件的类型。IAsyncEventHandler<TEvent>是一个泛型接口,用于约束事件处理程序的类型。它定义了处理事件的异步方法HandleAsync和处理异常的方法HandleException。接下来,我们有一个IEventBus接口,它定义了一些操作方法用于发布和订阅事件。

其中,Publish<TEvent>PublishAsync<TEvent>方法用于发布事件,而OnSubscribe<TEvent>方法用于订阅事件。然后,我们看到一个实现了本地事件总线的类LocalEventBusManager<TEvent>。它实现了ILocalEventBusManager<TEvent>接口,用于在单一管道内处理本地事件。它使用了一个Channel<TEvent>来存储事件,并提供了发布事件的方法PublishPublishAsync。此外,它还提供了一个自动处理事件的方法AutoHandle

总的来说Event Bus提供了一种方便的方式来实现组件之间的松耦合通信。

通过发布和订阅事件,组件可以独立地进行操作,而不需要直接依赖于彼此的实现细节。

这种机制可以提高代码的可维护性和可扩展性。

Github仓库地址:https://github.com/DonPangPang/soda-event-bus

实现一些基本约束

先实现一些约束,实现IEvent约束事件,实现IAsyncEvnetHandler<TEvent> where TEvent:IEvent来约束事件的处理程序。

public interface IEvent
{}public interface IAsyncEventHandler<in TEvent> where TEvent : IEvent
{Task HandleAsync(IEvent @event);void HandleException(IEvent @event, Exception ex);
}

接下来规定一下咱们的IEventBus,会有哪些操作方法。基本就是发布和订阅。

public interface IEventBus
{void Publish<TEvent>(TEvent @event) where TEvent : IEvent;Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent;void OnSubscribe<TEvent>() where TEvent : IEvent;
}

实现一个本地事件总线

本地事件处理

本地事件的处理我打算采用两种方式实现,一种是LocalEventBusManager即本地事件管理,第二种是LocalEventBusPool池化本地事件。

LocalEvnetBusManager

LocalEventBusManager主要在单一管道内进行处理,集中进行消费。

public interface ILocalEventBusManager<in TEvent>where TEvent : IEvent
{void Publish(TEvent @event);Task PublishAsync(TEvent @event) ;void AutoHandle();
}public class LocalEventBusManager<TEvent>(IServiceProvider serviceProvider):ILocalEventBusManager<TEvent>where TEvent: IEvent
{readonly IServiceProvider _servicesProvider = serviceProvider;private readonly Channel<TEvent> _eventChannel = Channel.CreateUnbounded<TEvent>();public void Publish(TEvent @event){Debug.Assert(_eventChannel != null, nameof(_eventChannel) + " != null");_eventChannel.Writer.WriteAsync(@event);}private CancellationTokenSource Cts { get; } = new();public void Cancel(){Cts.Cancel();}public async Task PublishAsync(TEvent @event){await _eventChannel.Writer.WriteAsync(@event);}public void AutoHandle(){// 确保只启动一次if (!Cts.IsCancellationRequested) return;Task.Run(async () =>{while (!Cts.IsCancellationRequested){var reader = await _eventChannel.Reader.ReadAsync();await HandleAsync(reader);}}, Cts.Token);}async Task HandleAsync(TEvent @event){var handler = _servicesProvider.GetService<IAsyncEventHandler<TEvent>>();if (handler is null){throw new NullReferenceException($"No handler for event {@event.GetType().Name}");}try{await handler.HandleAsync(@event);}catch (Exception ex){handler.HandleException( @event, ex);}}
}
LocalEventBusPool

LocalEventBusPool即所有的Event都会有一个单独的管道处理,单独消费处理,并行能力更好一些。

public sealed class LocalEventBusPool(IServiceProvider serviceProvider)
{private readonly IServiceProvider _serviceProvider = serviceProvider;private class ChannelKey{public required string Key { get; init; }public int Subscribers { get; set; }public override bool Equals(object? obj){if (obj is ChannelKey key){return string.Equals(key.Key, Key, StringComparison.OrdinalIgnoreCase);}return false;}public override int GetHashCode(){return 0;}}private Channel<IEvent> Rent(string channel){_channels.TryGetValue(new ChannelKey() { Key = channel }, out var value);if (value != null) return value;value = Channel.CreateUnbounded<IEvent>();_channels.TryAdd(new ChannelKey() { Key = channel }, value);return value;}private Channel<IEvent> Rent(ChannelKey channelKey){_channels.TryGetValue(channelKey, out var value);if (value != null) return value;value = Channel.CreateUnbounded<IEvent>();_channels.TryAdd(channelKey, value);return value;}private readonly ConcurrentDictionary<ChannelKey, Channel<IEvent>> _channels = new();private CancellationTokenSource Cts { get; } = new();public void Cancel(){Cts.Cancel();_channels.Clear();Cts.TryReset();}public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent{await Rent(typeof(TEvent).Name).Writer.WriteAsync(@event);}public void Publish<TEvent>(TEvent @event) where TEvent : IEvent{Rent(typeof(TEvent).Name).Writer.TryWrite(@event);}public void OnSubscribe<TEvent>() where TEvent : IEvent{var channelKey = _channels.FirstOrDefault(x => x.Key.Key == typeof(TEvent).Name).Key ??new ChannelKey() { Key = typeof(TEvent).Name };channelKey.Subscribers++;Task.Run(async () =>{try{while (!Cts.IsCancellationRequested){var @event = await ReadAsync(channelKey);var handler = _serviceProvider.GetService<IAsyncEventHandler<TEvent>>();if (handler == null) throw new NullReferenceException($"No handler for Event {typeof(TEvent).Name}");try{await handler.HandleAsync((TEvent)@event);}catch (Exception ex){handler.HandleException((TEvent)@event, ex);}}}catch (Exception e){throw new InvalidOperationException("Error on onSubscribe handler", e);}}, Cts.Token);}private async Task<IEvent> ReadAsync(string channel){return await Rent(channel).Reader.ReadAsync(Cts.Token);}private async Task<IEvent> ReadAsync(ChannelKey channel){return await Rent(channel).Reader.ReadAsync(Cts.Token);}
}
LocalEventBus

实现LocalEventBus继承自IEventBus即可,如果有需要扩展的方法自行添加,池化和管理器的情况单独处理。

public interface ILocalEventBus: IEventBus
{}
public class LocalEventBus(IServiceProvider serviceProvider, LocalEventBusOptions options) : ILocalEventBus
{private  LocalEventBusPool? EventBusPool => serviceProvider.GetService<LocalEventBusPool>();public void Publish<TEvent>(TEvent @event) where TEvent : IEvent{if (options.Pool){Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");EventBusPool.Publish(@event);}else{var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");manager.Publish(@event);}}public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent{if (options.Pool){Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");await EventBusPool.PublishAsync(@event);}else{var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");await manager.PublishAsync(@event);}}public void OnSubscribe<TEvent>() where TEvent : IEvent{if (options.Pool){Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");EventBusPool.OnSubscribe<TEvent>();}else{var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");manager.AutoHandle();}}
}

分布式事件总线

根据需要扩展即可,基本逻辑相同,但可能需要增加确认机制等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.bcls.cn/RpZo/744.shtml

如若内容造成侵权/违法违规/事实不符,请联系编程老四网进行投诉反馈email:xxxxxxxx@qq.com,一经查实,立即删除!

相关文章

详解平面点云面积计算

部分代码展示&#xff1a; &#xff08;1&#xff09;利用格网法计算面积&#xff1a; //&#xff08;2&#xff09;测试使用格网法计算平面点云面积 void main() {char *inputpath "D:\\testdata\\data.txt";vector<pcl::PointXYZ> points ReadPointXYZIn…

从入门到精通:AI绘画与修图实战指南

&#x1f482; 个人网站:【 海拥】【神级代码资源网站】【办公神器】&#x1f91f; 基于Web端打造的&#xff1a;&#x1f449;轻量化工具创作平台&#x1f485; 想寻找共同学习交流的小伙伴&#xff0c;请点击【全栈技术交流群】 在这篇文章中&#xff0c;我们将深入探讨如何利…

蓝桥杯嵌入式第10届真题(完成) STM32G431

蓝桥杯嵌入式第10届真题(完成) STM32G431 题目 main.c /* USER CODE BEGIN Header */ /********************************************************************************* file : main.c* brief : Main program body********************************…

应急响应实战笔记02日志分析篇(5)

第5篇:MySQL日志分析 常见的数据库攻击包括弱口令、SQL注入、提升权限、窃取备份等。对数据库日志进行分析&#xff0c;可以发现攻击行为&#xff0c;进一步还原攻击场景及追溯攻击源。 0x01 Mysql日志分析 general query log能记录成功连接和每次执行的查询&#xff0c;我们…

Linux环境变量配置文件--《一图胜千言》

这张图是一个关于Linux系统中shell启动时配置文件加载顺序的流程图。图中分为登录shell和非登录shell两种情况&#xff0c;来描述不同配置文件的读取过程。 登录shell&#xff1a; 当用户登录时&#xff0c;会首先检查是否存在/etc/profile文件&#xff0c;如果存在&#xff0c…

华为配置旁挂二层组网直接转发示例

配置旁挂二层组网直接转发示例 组网图形 图1 配置旁挂二层组网直接转发示例组网图 业务需求组网需求数据规划配置思路配置注意事项操作步骤配置文件扩展阅读 业务需求 企业用户通过WLAN接入网络&#xff0c;以满足移动办公的最基本需求。且在覆盖区域内移动发生漫游时&#xff…

【Java】数据类型与变量

1.数据类型 在Java中数据类型主要分为两类&#xff1a;基本数据类型和引用数据类型。 基本数据类型有四类八种&#xff1a; 四类&#xff1a;整型、浮点型、字符型以及布尔型八种&#xff1a; 注意&#xff1a;不论是在16位系统还是32位系统&#xff0c;int都占用4个字节&am…

java客运管理系统Myeclipse开发mysql数据库web结构java编程计算机网页项目

一、源码特点 java客运管理系统是一套完善的java web信息管理系统&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为TOMCAT7.0,Myeclipse8.5开发&#xff0c;数据库为Mysql5.0&#…

快速学习Spring

Spring 简介 Spring 是一个开源的轻量级、非侵入式的 JavaEE 框架&#xff0c;它为企业级 Java 应用提供了全面的基础设施支持。Spring 的设计目标是简化企业应用的开发&#xff0c;并解决 Java 开发中常见的复杂性和低效率问题。 Spring常用依赖 <dependencies><!-…

docker 启动镜像命令

Docker 的启动命令用于启动 Docker 容器。这些命令可以从基本的 docker run 命令扩展到包括多个选项和参数&#xff0c;以满足不同的需求。以下是一些常用的 Docker 启动命令和选项的示例&#xff1a; 启动一个新容器&#xff1a; docker run [OPTIONS] IMAGE [COMMAND] [ARG..…

AMD FPGA设计优化宝典笔记(5)低频全局复位与高扇出

亚军老师的这本书《AMD FPGA设计优化宝典》&#xff0c;他主要讲了两个东西&#xff1a; 第一个东西是代码的良好风格&#xff1b; 第二个是设计收敛等的本质。 这个书的结构是一个总论&#xff0c;加上另外的9个优化&#xff0c;包含的有&#xff1a;时钟网络、组合逻辑、触发…

C++并发编程 -3.同步并发操作

本文介绍如何使用条件变量控制并发的同步操作、C 并发三剑客&#xff0c;函数式编程 一.条件变量 1.概念 C条件变量&#xff08;condition variable&#xff09;是一种多线程编程中常用的同步机制&#xff0c;用于线程间的通信和协调。它允许一个或多个线程等待某个条件的发生…

Kafka King 推荐一款漂亮、现代、实用的kafka客户端

Kafka King 一个漂亮、现代、实用的kafka客户端&#xff0c;使用python flet、flutter构建。 Github主页&#xff1a;https://github.com/Bronya0/Kafka-King 下载&#xff1a;https://github.com/Bronya0/Kafka-King/releases 功能清单 查看集群节点列表创建主题&#xf…

前端|JavaScript 基础 - 第2天(黑马笔记)

JavaScript 基础 - 第2天 目录 JavaScript 基础 - 第2天一、运算符1.算术运算符2.赋值运算符3.自增/自减运算符4.比较运算符5.逻辑运算符6.运算符优先级 二、语句1.表达式和语句2.分支语句if 分支语句if双分支语句if 多分支语句三元运算符&#xff08;三元表达式&#xff09;sw…

SpringSecurity + OAuth2 详解

SpringSecurity入门到精通 ************************************************************************** SpringSecurity 介绍 **************************************************************************一、入门1.简介与选择2.入门案例-默认的登录和登出接口3.登录经过了…

英文论文(sci)解读复现【NO.21】一种基于空间坐标的轻量级目标检测器无人机航空图像的自注意

此前出了目标检测算法改进专栏&#xff0c;但是对于应用于什么场景&#xff0c;需要什么改进方法对应与自己的应用场景有效果&#xff0c;并且多少改进点能发什么水平的文章&#xff0c;为解决大家的困惑&#xff0c;此系列文章旨在给大家解读发表高水平学术期刊中的 SCI论文&a…

AI专题:5G-A扬帆风正劲,踏AI增长新浪潮

今天分享的是AI系列深度研究报告&#xff1a;《AI专题&#xff1a;5G-A扬帆风正劲&#xff0c;踏AI增长新浪潮》。 &#xff08;报告出品方&#xff1a;开源证券&#xff09; 报告共计&#xff1a;22页 足立连接&#xff0c;拓展算力&#xff0c;双曲线稳步发力 中兴通讯拥…

ZISUOJ 2022年算法基础公选课练习四(Map)

说明&#xff1a; 博主为了提早预习数据结构和C的一些知识&#xff0c;自己琢磨外加查阅资料所写的代码&#xff0c;题目来源于22年初的学院老师组织的算法基础公选课的练习。我的代码甚至思路肯定存在许多不足和错误&#xff0c;欢迎大家批评指正。 题目列表&#xff1a; 问题…

猫多喝水好吗?最有效解决猫不喝水的办法

猫多喝水好吗&#xff1f;充足的水分摄入对猫咪的健康非常重要&#xff0c;有助于维持其体液平衡&#xff0c;促进消化&#xff0c;降低便秘的风险&#xff0c;并保护泌尿系统的健康。猫多喝水好吗&#xff1f;建议每公斤体重的猫每天摄入60-80毫升的水&#xff0c;除了与体重相…

Windows系统cmd常用指令大全

文章目录 前言一、怎么打开cmd&#xff08;命令提示符&#xff09;&#xff1f;1.键盘法&#xff1a;快捷键Win R2.鼠标法 二、如何使用cmd指令&#xff1f;1.关机、重启、休眠、注销指令2.查看本机IP3.复制、移动、删除文件&#xff08;1&#xff09;复制文件&#xff08;2&a…
推荐文章