NETCore2.1 RabbitMQ SignalR EFCore2.1 Angular开发微服务

巨石。根据定义,  巨石  是由单个巨大的石头或岩石组成的地质特征,例如山或作为纪念碑或建筑物内或其内部的单个大块岩石。侵蚀通常暴露这些地质构造,这些构造通常由非常坚硬和坚固的变质岩构成。

正如大多数科幻迷所知; 1968年,通过Stanley Kubrick的电影2001:A Space Odyssey,巨石一词成为流行文化的一部分  库布里克的电影推出了一个时尚的黑色巨石,这是电影和艺术史上最引人注目的标志之一。但是电影中描绘的黑色巨石是什么呢? 

在崇高的时刻,黑色巨石首先似乎激发了前人类发现技术和寻找星星。当猿人第一次看到黑色巨石时,他们正在经历一个崇高的人类时刻; 当凝视着一些似乎压倒他们的理性和感官知觉的雄伟和神秘的东西时,同时感到敬畏,惊奇和恐惧。很难说库布里克试图告诉我们什么; 也许这个信息很简单,因为黑色巨石代表了人类物种的希望。

软件巨石

在软件工程中,单片应用程序描述单个或多层软件应用程序,其中大部分代码库从单个平台组合成单个程序或组件。单片应用程序描述了一种无模块化设计的软件应用程序。模块化通常是理想的,因为它支持应用逻辑的部分的重用,并且还通过允许修复或替换应用的部分而不需要批量替换来促进维护。

单片应用程序是应用程序发展的自然方式。大多数应用程序从一个目标或少数相关目标开始。随着时间的推移,功能将添加到应用程序中以支持业务需求。不幸的是,巨石在许多方面都是不完善的,最终它们变得非常庞大,而且太昂贵而无法更新并且变得难以部署,并且最终风险太大而无法替换和现代化。

在联邦政府和大型保险和银行机构中可以找到单一系统的好例子。这些机构中的许多机构依赖于低效,昂贵,脆弱,数十年之久的系统,其中超过75%的IT预算被分配。一些机构试图使这些庞大的遗留系统现代化,但很少或没有成功。

当然它并没有止步于此。随着互联网的普及,越来越多的应用程序被编写用于万维网。遗憾的是,多年来用于Web应用程序开发的技术在大型遗留系统之后已经违反了众所周知的软件工程原理。我们现在拥有大量的遗留Web应用程序,其中包含过去十五年来开发的大量意大利面条代码。这些系统的现代化将是一项前进的挑战。

微服务架构

作为开发单片软件应用程序的替代方案,最近出现了一种称为微服务的新架构开发技术。微服务是一种软件开发技术; 面向服务的体系结构(SOA)的一种变体,它将应用程序构建为松散耦合服务的集合。在微服务架构中,服务是轻量级的。将应用程序分解为不同的较小服务的好处是它提高了模块性。这使得应用程序更易于理解,开发,测试,部署并对架构侵蚀更具弹性。

每个微服务都是一个小型应用程序,它有自己的架构,可以单独开发,测试和部署,而不会影响应用程序的其他部分。

微服务设计与规划

所以微服务架构的承诺听起来很棒。遗憾的是,目前尚未就微服务的属性达成行业共识,也缺少官方定义。经常引用的一些定义特征包括:

综上所述,很明显,在精心设计的微服务架构的开发和实现方面存在架构挑战和复杂性。需要一个好的计划和设计。

样品申请

本文的示例应用程序是一个迷你ERP应用程序,由几个后端微服务和几个后端消息队列服务组成,为前端Angular 6应用程序提供服务。以下微服务构成了示例应用程序:

样本应用的微服务

与巨石的解耦能力很难。决定将哪种解耦成微服务的能力是将整体应用程序分解为微服务生态系统的架构挑战之一。实现微服务架构最常见的问题之一是大小和粒度:如果将一个软件分成多个微服务或构建为单个微服务,微服务应该有多小。

在销售订单管理微服务的微服务设计中,我结合了维护客户和输入销售订单的功能。这两个功能似乎是相关的。可以说,维护客户应该是处理销售订单的单独微服务。 

在传统巨石中寻找领域边界既是艺术又是科学。在宏伟的计划中,您必须采用各种架构,并创造出最适合您应用的设计。作为一般规则,应用域驱动设计技术来查找定义微服务边界的有界上下文是一个很好的起点。

微服务进程间通信

只要您构建一个整体,您就不需要过多考虑模块之间的通信方式。另一方面,微服务的实现在一开始可能看起来很容易。它较小的尺寸和专注于一个特定的任务降低了它的复杂性,使其比典型的整体更容易理解。但是,当您必须实现彼此依赖的多个服务需要相互通信并共享数据时,这会很快发生变化。

没有一个解决方案,但有几个。基于微服务的应用程序是在多个进程或服务上运行的分布式系统,通常甚至跨多个服务器或主机。每个服务实例通常都是一个进程。因此,服务必须使用诸如HTTP,AMQP之类的进程间通信协议或诸如TCP之类的二进制协议进行交互,这取决于每个服务的性质。

微服务与消息队列之间的消息传递

大多数人认为构建微服务基于与使用JSON Web服务的REST相同的原则。当然,这是最常用的方法。这有一些优点,但它也有许多缺点。例如,如果被叫服务崩溃但无法响应怎么办?您的客户端服务必须实现某种重新连接或故障转移逻辑,否则,您可能会丢失请求和信息。云架构应该具有弹性,可以从故障中优雅地恢复。 

HTTP请求的替代和补充是消息队列。在处理多个相互通信服务时,使用消息队列实际上是一个相当古老的解决方案(例如Microsoft的消息队列(MSMQ)技术)。消息队列是一种方法,通过该方法,进程可以使用接口将数据交换或传递到系统管理的消息队列。消息队列可以由一个进程创建,并由多个进程使用,这些进程读取和/或写入队列中的消息。

消息队列由许多组件组成,例如:

使用消息队列,您可以在任何卷上的应用程序组件之间发送,存储和接收消息,而不会丢失消息或要求其他服务始终可用。消息队列提供了多个选项,允许您指定消息的传递,优先级和安全性。

随着云技术的激增,有几种设计和架构决策可供选择。例如,Microsoft提供Azure Service Bus,以便在应用程序和服务之间实现高度可靠的云消息传递。此外,亚马逊最近推出了一项名为Amazon MQ的新服务,这是Apache ActiveMQ的托管消息代理服务; 与大多数行业标准协议兼容的开源企业级消息代理。亚马逊选择了ActiveMQ,因为它支持大多数行业标准协议。 

RabbitMQ消息代理

示例ERP应用程序是使用Microsoft .NET Core 2.1编写的,其构思是开发可跨多种平台移植的应用程序,包括在Windows和Linux服务器上运行。为了保持可移植性,我一直在寻找便携式消息传递排队技术。在我的搜索中,我遇到了RabbitMQ。

RabbitMQ是一个开源消息代理  支持高级消息队列协议  (AMQP)。AMQP是面向消息的中间件的开放标准应用层协议。AMQP的定义特征是消息定向,排队,路由(包括点对点和发布和订阅),可靠性和安全性。

RabbitMQ轻量级,易于在内部和云中部署。RabbitMQ还可以部署以满足高规模,高可用性要求,并可在许多操作系统和云环境中运行。

消息队列建筑目标和决策

经过大量研究,我为样本应用提出了以下微服务设计

现在我们将拥有所有这些,现在我们可以浏览示例ERP应用程序的一些代码。

帐户管理登录Web API 

将使用JSON Web令牌保护和保护示例应用程序的每个微服务。JSON Web令牌(JWT)是一种开放标准(RFC 7419),它定义了一种紧凑且独立的方式,用于在各方之间作为JSON对象安全地传输信息。此信息可以通过数字签名进行验证和信任。可以使用秘密(使用HMAC算法)或使用RSA或ECDSA的公钥/私钥对来签署JWT。

要登录示例应用程序,将使用从客户端Web请求传入的用户凭据(电子邮件地址和密码)执行帐户管理Web API的登录控制器操作方法,并且操作方法将继续调用帐户管理用于根据帐户管理数据库验证用户的业务服务。

成功登录后,将生成JSON Web令牌并将其返回到客户端应用程序,在该应用程序中,它将被持久保存并保存在客户端的本地存储中。JSON Web令牌将包含在对示例应用程序的任何Web API端点发出的每个客户端HTTP请求的标头中。

隐藏   收缩    复制代码

/// <summary>/// Login/// </summary>/// <param name="accountDataTransformation"></param>/// <returns></returns>[HttpPost]
[Route("Login")]public async Task<IActionResult> Login([FromBody] AccountDataTransformation accountDataTransformation)
{
    ResponseModel<AccountDataTransformation> returnResponse = 
                                             new ResponseModel<AccountDataTransformation>();    try
    {
        returnResponse = await _accountBusinessService.Login(accountDataTransformation);        if (returnResponse.ReturnStatus == true)
        {            int userId = returnResponse.Entity.UserId;            int accountId = returnResponse.Entity.AccountId;            string firstName = returnResponse.Entity.FirstName;            string lastName = returnResponse.Entity.LastName;            string emailAddress = returnResponse.Entity.EmailAddress;            string companyName = returnResponse.Entity.CompanyName;            string tokenString = TokenManagement.CreateToken(
                                 userId, firstName, lastName, emailAddress, accountId, companyName);

            returnResponse.Entity.IsAuthenicated = true;
            returnResponse.Entity.Token = tokenString;            return Ok(returnResponse);

         }         else
         {            return BadRequest(returnResponse);
         }

    }    catch (Exception ex)
    {
         returnResponse.ReturnStatus = false;
         returnResponse.ReturnMessage.Add(ex.Message);         return BadRequest(returnResponse);
    }

}

JSON Web令牌生成

Microsoft .NET Core 2.1强大支持生成和验证JSON Web令牌。下面的CreateToken方法获取用户的凭据和帐户信息,并创建将存储在令牌中的声明信息,包括用户的帐户ID,用户ID,名字,姓氏和公司名称。此信息将用于在每个HTTP请求上对用户进行身份验证。创建声明信息后,可以对令牌进行签名并作为Web API响应中的加密字符串返回。

隐藏   收缩    复制代码

/// <summary>/// Create Token/// </summary>/// <param name="userId"></param>/// <param name="firstName"></param>/// <param name="lastName"></param>/// <param name="emailAddress"></param>/// <param name="companyName"></param>/// <returns></returns>public static string CreateToken(int userId, 
                                 string firstName, 
                                 string lastName, 
                                 string emailAddress, 
                                 int accountId, 
                                 string companyName)
{    var sharedKey = new SymmetricSecurityKey(Encoding.UTF8.GetBytes(
                                             "CodeProject.Shared.Common.TokenManagement"));

    List<Claim> claims = new List<Claim>
    {        new Claim(ClaimTypes.Email, emailAddress),        new Claim(ClaimTypes.NameIdentifier, lastName),        new Claim(ClaimTypes.GivenName, firstName),        new Claim(ClaimTypes.Name, companyName),        new Claim(ClaimTypes.PrimarySid, userId.ToString()),        new Claim(ClaimTypes.PrimaryGroupSid, accountId.ToString())
    };    var signinCredentials = new SigningCredentials(sharedKey, SecurityAlgorithms.HmacSha512Signature);    var tokenDescription = new SecurityTokenDescriptor
    {
        Subject = new ClaimsIdentity(claims),
        NotBefore = DateTime.Now,
        Expires = DateTime.Now.AddMinutes(60),
        SigningCredentials = signinCredentials
    };    var tokenHandler = new JwtSecurityTokenHandler();    var token = tokenHandler.CreateToken(tokenDescription);    string tokenString = tokenHandler.WriteToken(token);    return tokenString;

}

ASP.NET Core 2.1 Web API配置和启动

ASP.NET Core 2.1应用程序使用启动类来配置应用程序服务及其HTTP请求处理管道。ASP.NET Core 2.1体系结构具有中间件系统,这些中间件是处理请求和响应的代码片段。中间件组件彼此链接以形成管道。传入请求通过管道传递,其中每个中间件都有机会在将请求传递给下一个中间件组件之前对请求执行某些操作。传出的响应以相反的顺序通过管道传递。 

中间件架构是使ASP.NET Core 2.1成为构建可在Windows,Mac和Linux OS上运行的Web和云应用程序的精简和可组合框架的关键基础。从本质上讲,您可以完全控制Web应用程序配置中包含的功能。

隐藏   收缩    复制代码

public class Startup
{    public Startup(IConfiguration configuration)
    {
        Configuration = configuration;
    }    public IConfiguration Configuration { get; }    /// <summary>    /// This method gets called by the runtime. Use this method to add services to the container.    /// </summary>    /// <param name="services"></param>    public void ConfigureServices(IServiceCollection services)
    {
          
        CorsPolicyBuilder corsBuilder = new CorsPolicyBuilder();

        corsBuilder.AllowAnyHeader();
        corsBuilder.AllowAnyMethod();
        corsBuilder.AllowAnyOrigin();
        corsBuilder.AllowCredentials();

        services.AddCors(options =>
        {
            options.AddPolicy("SiteCorsPolicy", corsBuilder.Build());
        });

        ConnectionStrings connectionStrings = new ConnectionStrings();
        Configuration.GetSection("ConnectionStrings").Bind(connectionStrings);

        services.AddDbContext<AccountManagementDatabase>(
                              options => options.UseSqlServer(
                              Configuration.GetConnectionString("PrimaryDatabaseConnectionString")));            
        //
        //    Built-In Dependency Injection
        //

        services.AddTransient<IAccountManagementDataService, AccountManagementDataService>();

        services.AddTransient<IAccountManagementBusinessService>(provider =>
        new AccountManagementBusinessService(provider
            .GetRequiredService<IAccountManagementDataService>(), connectionStrings));

        services.AddAuthentication(JwtBearerDefaults.AuthenticationScheme).AddJwtBearer(options =>
        {
            options.TokenValidationParameters = new TokenValidationParameters
            {
                ValidateIssuer = false,
                ValidateAudience = false,
                ValidateLifetime = true,
                ValidateIssuerSigningKey = true,
                ValidIssuer = "https://codeproject.microservices.com",
                ValidAudience = "https://codeproject.microservices.com",
                IssuerSigningKey = new SymmetricSecurityKey(Encoding.ASCII.GetBytes(
                                       "CodeProject.Shared.Common.TokenManagement"))
            };
        });

        services.AddScoped<SecurityFilter>();

        services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1);

        services.AddSignalR();

    }    // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
    public void Configure(IApplicationBuilder app, IHostingEnvironment env)
    {
           
        app.UseCors("SiteCorsPolicy");
        app.UseAuthentication();        if (env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }        else
        {
            app.UseHsts();
        }

        app.UseHttpsRedirection();

        app.UseMvc();
    }  
}

在上面的帐户管理Web API应用程序的启动类中,正在配置以下项目:

ASP.NET Core 2.1附带一个简单的内置依赖注入容器。依赖注入是ASP.NET Core 2.1的核心。它允许应用程序中的组件具有改进的可测试性,并使组件松散耦合并增加可扩展性。

配置依赖项注入时,了解应用程序依赖项的生命周期管理至关重要。使用默认的ASP.NET Core 2.1依赖注入容器注册依赖项时,需要考虑生命周期管理。您可能已经注意到使用不同的方法来注册上面的启动类中的services.AddScoped  和 services.AddTransient之类的依赖项  

ASP.NET Core 2.1依赖注入中有三个服务生命周期:

示例ERP应用程序是无状态应用程序,因为在每个Web请求上创建和销毁线程和对象。考虑到这一点,应用程序的业务和数据访问依赖关系是在瞬态生命周期中创建的。

配置ASP.NET Core 2.1 Web API端点

通过在启动类中配置.NET Core 2.1管道,您现在可以保护应用程序的Web API端点。在以下销售订单控制器的代码中,配置了以下内容:

隐藏   复制代码

[ServiceFilter(typeof(SecurityFilter))]
[Authorize]
[Route("api/[controller]")]
[EnableCors("SiteCorsPolicy")]
[ApiController]public class SalesOrderController : ControllerBase
{    private readonly IInventoryManagementBusinessService _inventoryManagementBusinessService;    private IHubContext<MessageQueueHub> _messageQueueContext;    /// <summary>    /// Sales Controller    /// </summary>    public SalesOrderController(IInventoryManagementBusinessService inventoryManagementBusinessService,
                                IHubContext<MessageQueueHub> messageQueueContext)
    {
        _inventoryManagementBusinessService = inventoryManagementBusinessService;
        _messageQueueContext = messageQueueContext;
    }

}

解析JSON Web令牌

将ASP.NET Core 2.1配置为使用JWT承载令牌身份验证时,您将可以访问每个Web请求中令牌中提供的声明信息。如先前在启动时和控制器类级别配置的那样,下面的ActionFilter将在执行每个Web API控制器操作方法之前执行。ASP.NET Core 2.1公开了HttpContext用户属性作为ClaimsPrincipal对象。在  用户对象被自动通过ASP.NET核心与来自JSON网络令牌权利要求信息填充。

下面的操作筛选器提取HTTP请求标头中包含的JSON Web令牌中提供的声明,并将它们写入SecurityModel类。所述SecurityModel类被添加到HTTP上下文,以便Web API控制器的操作方法可以参考权利要求信息并转发这个信息到业务和数据访问组件用于在用户和帐户级过滤和保护数据。

隐藏   收缩    复制代码

public class SecurityFilter : IAsyncActionFilter
{    /// <summary>    /// Action Filter    /// </summary>    /// <param name="context"></param>    /// <param name="next"></param>    /// <returns></returns>    public async Task OnActionExecutionAsync(ActionExecutingContext context,
                                             ActionExecutionDelegate next)
    {        string firstName = context.HttpContext.User.FindFirst(ClaimTypes.GivenName).Value;        string lastName = context.HttpContext.User.FindFirst(ClaimTypes.NameIdentifier).Value;        string emailAddress = context.HttpContext.User.FindFirst(ClaimTypes.Email).Value;        string companyName = context.HttpContext.User.FindFirst(ClaimTypes.Name).Value        int userId = int.Parse(context.HttpContext.User.FindFirst(ClaimTypes.PrimarySid).Value);        int accountId = int.Parse(context.HttpContext.User.FindFirst(
                                  ClaimTypes.PrimaryGroupSid).Value);        string token = TokenManagement.CreateToken(userId, 
                                                   firstName, 
                                                   lastName, 
                                                   emailAddress, 
                                                   accountId, 
                                                   companyName);

        SecurityModel securityModel = new SecurityModel();
        securityModel.EmailAddress = emailAddress;
        securityModel.FirstName = firstName;
        securityModel.LastName = lastName;
        securityModel.UserId = userId;
        securityModel.AccountId = accountId;
        securityModel.Token = token;

        context.HttpContext.Items["SecurityModel"] = securityModel;        var resultContext = await next();

    }
}

示例应用程序演练

现在我们已经为ASP.NET Core 2.1配置了所有内容,我们可以开始遍历示例应用程序的一个业务事务。在示例ERP应用程序中,完整的端到端业务流程将包含以下工作流程:

  1. 在库存管理微服务中创建产品。

  2. 产品通过消息传递到销售订单管理和采购订单管理微服务。

  3. 在采购订单管理微服务中创建采购订单,以从供应商处订购产品。

  4. 采购订单将传输到库存管理微服务,以便将产品接收到仓库中。

  5. 产品在仓库中收到并记录在库存管理微服务中的采购订单上,并创建库存接收交易。

  6. 收到的库存交易将传输到采购订单管理微服务,以更新采购订单上收到的数量。

  7. 收到的库存交易也会传输到销售订单管理微服务,以便可以根据手头的可用产品下达销售订单。

  8. 在销售订单管理微服务中为可用产品创建销售订单,并将其传送到库存管理微服务,以便将销售订单上的产品发送给客户。

  9. 库存管理微服务将产品装载到库存管理微服务中的销售订单上,并创建装运库存交易以减少库存管理微服务数据库中的可用库存数量。

  10. 装运库存交易将传输到销售订单管理微服务,以更新销售订单上的装运数量。

送货产品 

对于本文,我们将介绍将销售订单上的产品发送给客户的过程。演练将涵盖.NET Core 2.1 Web API和后端.NET Core 2.1消息队列服务以及它与RabbitMQ的交互的关键点。

Web API和消息队列服务之间的后端工作流程如下:

在示例应用程序中,运输产品需要登录Angular前端应用程序并访问库存管理模块并查找销售订单,并提取订单的详细信息并在销售订单行项目上输入装运数量。

在订单项上输入装运数量并点击保存将触发在库存管理Web API中执行以下UpdateSalesOrderDetail控制器操作方法。

隐藏   收缩    复制代码

/// <summary>/// Update Sales Order Detail/// </summary>/// <param name="salesOrderDetailDataTransformation"></param>/// <returns></returns>[HttpPost]
[Route("UpdateSalesOrderDetail")]public async Task<IActionResult> UpdateSalesOrderDetail([FromBody] 
             SalesOrderDetailDataTransformation salesOrderDetailDataTransformation)
{

    SecurityModel securityModel = (SecurityModel)(HttpContext.Items["SecurityModel"]);    int accountId = securityModel.AccountId;
    salesOrderDetailDataTransformation.AccountId = accountId;

    ResponseModel<SalesOrderDetailDataTransformation> returnResponse =
                  new ResponseModel<SalesOrderDetailDataTransformation>();    try
    {
        returnResponse = await _inventoryManagementBusinessService.UpdateSalesOrderDetail(
                                                       salesOrderDetailDataTransformation);

        returnResponse.Token = securityModel.Token;        if (returnResponse.ReturnStatus == false)
        {            return BadRequest(returnResponse);
        }        await _messageQueueContext.Clients.All.SendAsync(MessageQueueEndpoints.InventoryQueue, 
                                                         string.Empty);        return Ok(returnResponse);

    }    catch (Exception ex)
    {
         returnResponse.ReturnStatus = false;
         returnResponse.ReturnMessage.Add(ex.Message);         return BadRequest(returnResponse);
    }

}

异步等待 - 异步处理

Inventory Management Web API中UpdateSalesOrderDetail控制器操作方法将异步运行。创建异步Web API控制器操作方法可以显着提高服务器性能,允许增加服务器可以处理的并发客户端数量。这是因为异步控制器操作方法通过将线程返回到可用线程池中而方法等待其他异步进程完成而更快地释放服务器线程。

ASP.NET Core 2.1允许Web API控制器和操作方法使用async await关键字异步运行示例应用程序中的所有控制器操作方法都将使用方法签名中的  async 关键字。所有控制器操作方法也将返回包含  IActionResult的  Task  该UpdateSalesOrderDetail控制器的操作方法还要求使用库存管理业务服务等待 的关键字。所有库存管理业务服务方法都实现async / await模式一直到数据访问服务层,其中Entity Framework Core将异步执行LINQ语句。

为了正确实现异步处理,应用程序的每一层都必须在整个过程中实现异步等待功能。

安全性,数据转换对象和响应模型

在执行  UpdateSalesOrderDetail控制器操作方法之前,已提取执行的安全操作筛选器和来自JSON Web令牌的声明信息,以填充附加到HttpContextSecurityModel对象控制器操作方法通过HttpContext引用此对象,并将用户的帐户ID传递到库存管理业务服务。使用JSON Web令牌中的信息是保护应用程序数据的好方法。 

所述  UpdateSalesOrderDetail 控制器的操作方法将使用销售订单数据转换对象 (DTO) DTO是一种封装数据的设计模式,用于在软件应用程序子系统之间传输数据。在示例应用程序中,A DTO是前端视图模型和后端数据库实体模型之间的中间人。

最后,如果事务成功UpdateSalesOrderDetail控制器操作方法将返回一个ResponseModel对象以及HTTP状态代码为200(OK)的HTTP响应。如果事务失败,则返回ResponseModel对象,其HTTP状态代码为401(错误请求)。

库存管理业务服务 

当控制器操作方法在Inventory Management业务服务中请求UpdateSalesOrderDetail方法时,将在支持和作为异步任务运行时在业务服务方法中执行以下操作:

隐藏   收缩    复制代码

/// <summary>/// Update Sales Order Detail/// </summary>/// <param name="salesOrderDetailDataTransformation"></param>/// <returns></returns>public async Task<ResponseModel<SalesOrderDetailDataTransformation>> UpdateSalesOrderDetail(
                      SalesOrderDetailDataTransformation salesOrderDetailDataTransformation)
{

    ResponseModel<SalesOrderDetailDataTransformation> returnResponse = 
             new ResponseModel<SalesOrderDetailDataTransformation>();

    SalesOrderDetail salesOrderDetail = new SalesOrderDetail();    try
    {        int accountId = salesOrderDetailDataTransformation.AccountId;        int salesOrderId = salesOrderDetailDataTransformation.SalesOrderId;        int salesOrderDetailId = salesOrderDetailDataTransformation.SalesOrderDetailId;        //
        //    Validate Shipped Quantity
        //

        if (salesOrderDetailDataTransformation.CurrentShippedQuantity == 0)
        {
            returnResponse.ReturnMessage.Add("Invalid Shipped Quantity");
            returnResponse.ReturnStatus = false;            return returnResponse;
        }        //
        //    Begin a Serializable Transaction
        //

        _inventoryManagementDataService.OpenConnection(  
                                        _connectionStrings.PrimaryDatabaseConnectionString);

        _inventoryManagementDataService.BeginTransaction((int)IsolationLevel.Serializable);        //
        //    Get Sales Order Header
        //

        SalesOrder salesOrder = await _inventoryManagementDataService
                                      .GetSalesOrderHeader(accountId, salesOrderId);        if (salesOrder == null)
        {
            _inventoryManagementDataService.RollbackTransaction();

            returnResponse.ReturnMessage.Add("Sales Order not found");
            returnResponse.ReturnStatus = false;            return returnResponse;
        }        //
        //    Get Sales Order Detail
        //

        salesOrderDetail = await _inventoryManagementDataService
                                 .GetSalesOrderDetailForUpdate(salesOrderDetailId);        if (salesOrderDetail == null)
        {
            _inventoryManagementDataService.RollbackTransaction();

            returnResponse.ReturnMessage.Add("Sales Order Detail not found");
            returnResponse.ReturnStatus = false;            return returnResponse;
        }        //
        //    Update Sales Order Shipped Quantity
        //

        salesOrderDetail.ShippedQuantity = salesOrderDetail.ShippedQuantity + 
                                           salesOrderDetailDataTransformation.CurrentShippedQuantity;        await _inventoryManagementDataService.UpdateSalesOrderDetail(salesOrderDetail);        //
        //    Get Product Record with an exclusive update lock
        //

        Product product = await _inventoryManagementDataService
                                .GetProductInformationForUpdate(salesOrderDetail.ProductId);        if (product == null)
        {
            _inventoryManagementDataService.RollbackTransaction();

            returnResponse.ReturnMessage.Add("Product not found");
            returnResponse.ReturnStatus = false;            return returnResponse;
        }        //
        //    Reduce Product OnHand Quantity by the quantity shipped
        //

        product.OnHandQuantity = product.OnHandQuantity - 
                                 salesOrderDetailDataTransformation.CurrentShippedQuantity;        await _inventoryManagementDataService.UpdateProduct(product);        //
        //    Create Inventory Transaction Record
        //

        InventoryTransaction inventoryTransaction = new InventoryTransaction();
        inventoryTransaction.EntityId = salesOrderDetail.SalesOrderDetailId;
        inventoryTransaction.MasterEntityId = salesOrderDetail.MasterSalesOrderDetailId;
        inventoryTransaction.ProductId = salesOrderDetail.ProductId;
        inventoryTransaction.UnitCost = product.AverageCost;
        inventoryTransaction.Quantity = salesOrderDetailDataTransformation.CurrentShippedQuantity;
        inventoryTransaction.TransactionDate = DateTime.UtcNow;        await _inventoryManagementDataService.CreateInventoryTransaction(inventoryTransaction);        //
        //    Create Transaction Queue record and create inventory transaction payload
        //

        TransactionQueueOutbound transactionQueue = new TransactionQueueOutbound();
        transactionQueue.Payload = GenerateInventoryTransactionPayload(inventoryTransaction);
        transactionQueue.TransactionCode = TransactionQueueTypes.InventoryShipped;
        transactionQueue.ExchangeName = MessageQueueExchanges.InventoryManagement;        await _inventoryManagementDataService.CreateOutboundTransactionQueue(transactionQueue);        await _inventoryManagementDataService.UpdateDatabase();        //
        //    Commit Transaction
        //

        _inventoryManagementDataService.CommitTransaction();

        returnResponse.ReturnStatus = true;

    }    catch (Exception ex)
    {
       _inventoryManagementDataService.RollbackTransaction();
       returnResponse.ReturnStatus = false;
       returnResponse.ReturnMessage.Add(ex.Message);
    }    finally
    {
       _inventoryManagementDataService.CloseConnection();
    }

    returnResponse.Entity = salesOrderDetailDataTransformation;    return returnResponse;

}

隔离级别 - 可序列化事务

数据库事务指定  隔离级别  ,该级别定义一个事务必须与其他事务所做的数据修改隔离的程度。 隔离级别  是根据允许的并发副作用(如脏读或幻像读取)来描述的。

SQL标准定义了四个隔离级别:

默认情况下,Entity Framework Core使用Read Committed的隔离级别由于样本ERP应用程序可以被数百名用户同时更新产品和库存数量使用,因此很可能有多个用户同时请求更新同一数据库表行。为确保数据完整性并防止幻像更新和数据丢失,UpdateSalesOrderDetail方法将开始可序列化的事务。使用可序列化事务将保证对相同产品行的更新将按顺序完成,其中每个SQL事务将在下一个SQL事务开始之前执行完成。

隐藏   复制代码

////    Begin a Serializable Transaction//_inventoryManagementDataService.OpenConnection(_connectionStrings.PrimaryDatabaseConnectionString);
_inventoryManagementDataService.BeginTransaction((int)IsolationLevel.Serializable);

UPDLOCK SQL提示和实体框架核心2.1

事实证明,简单地创建可序列化事务不足以确保数据完整性,同时在同一数据库表行上执行多个同时更新。

此外,在选择要更新的行时,您需要获取行级更新锁。将SQL Server UPDLOCK提示应用于SELECT语句将为您执行此操作。UPDLOCK提示指定在事务完成之前采用并保持更新锁。

最新版本的Entity Framework Core的一个很酷的事情是,您现在可以覆盖Entity Framework Core通常会创建的SELECT语句。实体框架核心允许您在使用关系数据库时下拉到原始SQL查询。

如果您要执行的查询无法使用LINQ表示,则此选项非常有用。这在这种情况下很有用,因为我们可以使用UPDLOCK提示创建一个SQL语句,并使用Entity Framework Core FromSQL方法执行带有行级更新锁定的SQL语句。

与任何接受SQL的API一样,重要的是参数化任何用户输入以防止SQL注入攻击。Entity Framework Core还支持参数化查询。您可以在SQL查询字符串中包含参数占位符,然后提供参数值作为附加参数。您提供的任何参数值将自动转换为DbParameter对象库存管理数据服务GetProductInformationUpdate方法中,产品ID作为参数化参数提供,并且所选行返回到库存管理业务服务,而SQL Server在该行上持有锁。

隐藏   复制代码

/// <summary>/// Get Product Information For Update with exclusive row lock/// </summary>/// <param name="productId"></param>/// <returns></returns>public async Task<Product> GetProductInformationForUpdate(int productId)
{    string sqlStatement = "SELECT * FROM PRODUCTS WITH (UPDLOCK) WHERE PRODUCTID = @ProductId";

    DbParameter productIdParameter = new SqlParameter("ProductId", productId);

    Product product = await dbConnection.Products.FromSql(sqlStatement, 
                                                          productIdParameter).FirstOrDefaultAsync();    return product;
}

消息队列事务表

作为一个设计决策,我希望每个微服务都是自包含的,而不是跨越边界或远程调用其他微服务。与大多数架构决策一样,进入的价格也是如此。在这种情况下,必须跨微服务共享数据。

为了支持微服务隔离,进入的价格是在多个微服务中复制数据库信息。例如,在库存管理微服务中创建和维护产品信息。采购订单管理微服务和销售订单管理微服务都需要产品信息,以允许产品经理从供应商订购产品,并允许客户根据可用库存下达销售订单。产品表和数据必须存在于每个微服务中。

这就是消息排队将发挥作用的地方,其中信息可以实时传输和共享到这些微服务。Product表的结构可以并且将在微服务之间不同。例如,库存管理数据库中的产品表将包含产品的每条信息,但采购订单管理数据库或销售订单管理数据库都不需要跟踪产品的仓位和其他仓库信息等内容。 。

作为此设计决策的一部分,我想创建消息队列消息和有效负载,这些消息可以在任何消息发送到RabbitMQ之前参与并在数据库业务事务中提交。这将保证消息永不丢失,并且可以在需要时记录和重新发送消息。

对于每个微服务,在每个专用微服务数据库中创建以下四个表以处理消息队列消息并记录它们的活动。

创建消息队列消息有效负载

消息队列消息的一部分是其  有效负载有效负载是您要传输的数据。对于示例应用程序,有效负载信息将保存在TransactionOutboundQueue表中,以便发送消息队列有效负载信息。库存管理业务服务UpdateSalesOrderDetail方法中,库存事务已提交到数据库。对于消息队列有效负载,库存事务将序列化为JSON结构并保存为TransactionOutboundQueue表中的字符串,稍后将检索表并包含在消息队列消息有效负载中。

隐藏   复制代码

/// <summary>/// Generate Inventory Transaction Payload/// </summary>/// <param name="inventoryTransaction"></param>/// <returns></returns>private string GenerateInventoryTransactionPayload(InventoryTransaction inventoryTransaction)
{
    InventoryTransactionPayload inventoryTransactionPayload = new InventoryTransactionPayload();

    inventoryTransactionPayload.ProductId = inventoryTransaction.ProductId;
    inventoryTransactionPayload.Quantity = inventoryTransaction.Quantity;
    inventoryTransactionPayload.UnitCost = inventoryTransaction.UnitCost;
    inventoryTransactionPayload.EntityId = inventoryTransaction.EntityId;
    inventoryTransactionPayload.MasterEntityId = inventoryTransaction.MasterEntityId;
    inventoryTransactionPayload.TransactionDate = inventoryTransaction.TransactionDate;    string payload = SerializationFunction<InventoryTransactionPayload>
                     .ReturnStringFromObject(inventoryTransactionPayload);    return payload;

}

RabbitMQ最佳实践

在此过程中,我们将货物库存交易提交到库存管理数据库,但我们尚未告知销售订单管理微服务订单已发货。销售订单已在库存管理数据库中更新,但销售订单也需要在销售订单管理数据库中更新。

在继续实施RabbitMQ以从Inventory Management微服务向Sales Order Management微服务发送消息之前,我想了解有关RabbitMQ最佳实践的更多信息。

某些应用程序需要非常高的吞吐量,而其他应用程序正在运行可能会延迟一段时间的批处理作业 设计系统时的目标应该是最大限度地提高对特定应用程序有意义的性能和可用性组合。糟糕的架构设计决策或错误可能会损坏或影响您的吞吐量。

互联网上记录了以下RabbitMQ最佳实践:

ASP.NET Core 2.1可伸缩性 

详细阅读RabbitMQ最佳实践让我相信,在Web API应用程序中直接合并和实现RabbitMQ并不是一个好主意。 

Web服务器内存和资源应被视为有限的资源。ASP.NET Core Web API应用程序被设计为无状态应用程序,其中每个Web请求不断创建和销毁线程,从而释放内存并提高应用程序可伸缩性; 随着用户群的增加,保留资源会增加服务器内存使用量。

如其最佳实践和建议所述,需要实现RabbitMQ连接,而无需重复打开和关闭连接。应该为发送和使用消息创建单独的连接,这意味着您至少需要在Web API应用程序中创建两个单独的单一生命周期线程。 

创建多个单独线程似乎是无状态ASP.NET Core Web API应用程序的反模式。具有单例生存期的类对象也需要为线程安全性进行管理。处理不当可能会在Web API应用程序中创建竞争条件错误。当两个或多个线程同时到达特定代码块时会发生竞争条件错误,从而产生损坏的对象和属性状态。

避免竞争条件需要锁定代码块,以便一次只有一个线程可以一次执行代码块。当您有数百个并发用户访问您的应用程序时,锁定Web API应用程序中的代码块似乎会产生瓶颈并降低应用程序可伸缩性。

构建消息队列服务的优点

为了避免在Web API应用程序中创建和管理单例生命周期对象,我决定创建一个单独的多线程.NET Core 2.1控制台应用程序,作为每个微服务的消息队列服务,管理和处理所有RabbitMQ连接,通道和消息处理。

这些控制台应用程序将运行多个线程,每个线程以预定义的间隔(5或15分钟)运行,每个线程都与RabbitMQ和SQL-Server交互。

构建消息队列服务与在Web API应用程序中集成消息队列处理相比具有许多优点和优势,因为它们可以提供以下内容:

实现ASP.NET Core 2.1 SignalR

作为一个很好的,我想实时发送消息。因此,我需要一种方法将消息发送到消息队列服务(控制台应用程序)以唤醒它以处理消息,以防它在间隔之间空闲。这导致我使用ASP.NET Core 2.1 SignalR。

ASP.NET Core SignalR是一个开源库,可以简化向应用程序添加实时Web功能的过程。实时Web功能使服务器端代码能够立即将内容推送到客户端。SignalR最常用于与JavaScript客户端交互。在这种情况下,客户端是控制台应用程序。

SignalR使用  集线器  在客户端和服务器之间进行通信。集线器是一个高级管道,允许客户端和服务器相互调用方法。SignalR自动处理跨机器边界的调度,允许客户端调用服务器上的方法,反之亦然。

要创建Hub,只需添加一个继承自 Microsoft.AspNetCore.SignalR.Hub的类,并定义Hub类中可由客户端执行的方法。由于Inventory Management Web API应用程序仅使用SignalR发送消息,因此MessageQueueHub类将不会定义任何方法。

隐藏   复制代码

namespace CodeProject.InventoryManagement.WebApi.SignalRHub
{
    public class MessageQueueHub : Hub
    {

    }
}

在ASP.NET Core SignalR中,您可以通过依赖注入访问IHubContext 的实例IHubContext的一个实例在启动类中配置并注入控制器,该实例可用于向客户端发送消息。UpdateSalesOrderDetail操作方法中,在库存管理业务服务成功提交库存装运事务后执行以下行。Clients.All.SendAynsc语句将消息发送到所有客户端,侦听URL“中发生的事件的https://本地主机:44340 / MessageQueueHub“。对于库存管理Web API,只有库存管理消息队列服务将侦听此URL。

隐藏   复制代码

await _messageQueueContext.Clients.All.SendAsync(MessageQueueEndpoints.InventoryQueue, string.Empty);

侦听ASP.NET Core SignalR消息

要监听ASP.NET Core SignalR消息,库存管理消息队列服务实现Microsoft.AspNetCore.SignalR.Client包。ASP.NET Core SignalR .NET客户端库允许您与.NET应用程序中的SignalR集线器进行通信。

消息队列服务将启动单独的线程任务,以发送,接收和处理消息队列消息。在启动SendMessages任务线程时,基于Inventory管理Web API SignalR URL“ https:// localhost:44340 / MessageQueueHub ” 建立与SignalR的连接

如果集线器在尝试连接时未启动并运行,则会添加重新连接逻辑以重试连接。一旦连接到集线器,消息队列服务就会侦听On事件,并且在每个引发的事件上,排队服务将调用GetMessgaesInQueue方法来检索消息并将它们发送到RabbitMQ。

隐藏   收缩    复制代码

/// <summary>/// Start Process Interval/// </summary>/// <param name="cancellationToken"></param>/// <returns></returns>public Task StartAsync(CancellationToken cancellationToken)
{

    StartSignalRConnection();

    _timer = new Timer(GetMessagesInQueue, null, TimeSpan.Zero,
                       TimeSpan.FromSeconds(_appConfig.SendingIntervalSeconds));

    return Task.CompletedTask;
}/// <summary>/// Start SignalR Connection/// </summary>private async void StartSignalRConnection()
{    if (string.IsNullOrEmpty(_appConfig.SignalRHubUrl))
    {        return;
    }    string url = _appConfig.SignalRHubUrl; /// "https://localhost:44340/MessageQueueHub",
    //    //  Build Hub Connection    //

    Boolean buildHubConnection = false;    while (buildHubConnection  == false)
    {        try
        {
            _signalRHubConnection = new HubConnectionBuilder().WithUrl(url).Build();
            buildHubConnection  = true;
        }        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);            await Task.Delay(5000);
        }

    }
       
    //    //   Listen for SignalR messages    //      

    _signalRHubConnection.On<string>(_signalRQueue, (message) =>
    {        this.GetMessagesInQueue(null);

    });

    //    //   Listen for Hub Connection Closed Event    //

    _signalRHubConnection.Closed += async (error) =>
    {
        Console.WriteLine("SignalR Connection Closed");        await Task.Delay(10000);        await _signalRHubConnection.StartAsync();
        Console.WriteLine("Restart SignalR");
    };

    //    //  Start Hub Connection    //

    connected = false;    while (connected == false)
    {         try
         {               await _signalRHubConnection.StartAsync();
               connected = true;

        }        catch (Exception ex)
        {              await Task.Delay(10000);
        }

    }
        
}

配置库存管理消息队列服务

使用.NET Core创建控制台应用程序时,您会注意到在构建应用程序时不会创建exe。默认情况下,.NET Core将生成一个构建为不生成exe的便携式应用程序的DLL。它们由.NET Core共享运行时执行。您可以通过运行命令dotnet run来运行应用程序但是,如果你真的想生成exe,那么只需运行以下命令:

dotnet publish -c Debug -r win10-x64或  dotnet publish -c Release -r win10-x64

这将创建一个类似于当前.NET控制台应用程序的独立控制台应用程序。这使我们无需在目标计算机上运行.NET Core即可运行应用程序。

从C#版本7.1开始,您可以创建具有静态入口点的控制台应用程序作为异步任务,从而允许您创建多线程控制台应用程序。此外,.NET Core 2.1附带了一个新功能集,可简化基于控制台的服务的创建。这些新功能包括IHostHostBuilder

.NET Core 2.1应用程序配置并启动  主机主机负责应用程序启动和生命周期管理。使用.NET Core HostBuilder,后台任务可以作为  托管服务实现一个托管服务是实现后台任务逻辑类IHostedService 接口。对于库存管理消息队列服务,将创建将在计时器上运行的三个后台任务; 一个用于发送消息,一个用于接收消息,一个用于处理消息。

在控制台应用程序的Main方法中,您可以首先创建一个HostBuilder,然后使用扩展方法通过依赖注入注册服务,读取配置信息并配置应用程序所需的日志记录。对于消息队列控制台应用程序,使用services.AddTransient方法将每个后台任务注册为具有瞬态生命周期的服务

隐藏   收缩    复制代码

public static async Task Main(string[] args)
{    //
    //    get configuration information
    //

    MessageQueueAppConfig messageQueueAppConfig = new MessageQueueAppConfig();
    ConnectionStrings connectionStrings = new ConnectionStrings();    string environment = Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT");    string jsonFile = $"appsettings.{environment}.json";    var configBuilder = new ConfigurationBuilder()
        .SetBasePath(Directory.GetCurrentDirectory())
        .AddJsonFile(jsonFile, optional: true, reloadOnChange: true);

    IConfigurationRoot configuration = configBuilder.Build();

    configuration.GetSection("MessageQueueAppConfig").Bind(messageQueueAppConfig);
    configuration.GetSection("ConnectionStrings").Bind(connectionStrings);    //
    //    Sending Message
    //

    IHostedService sendInventoryManagementMessages = new SendMessages();    //    //   Receive Messages     //

    IHostedService receiveInventoryManagementMessages = new ReceiveMessages();
   
    //
    //    Message Processing
    //       IHostedService processMessages = new ProcessMessages();    var builder = new HostBuilder().ConfigureAppConfiguration((hostingContext, config) => {})
        .ConfigureServices((hostContext, services) =>
        {
            services.AddTransient<IHostedService>(provider => processMessages);
        })
        .ConfigureServices((hostContext, services) =>
        {
            services.AddTransient<IHostedService>(provider => sendInventoryManagementMessages);
        })
        .ConfigureServices((hostContext, services) =>
        {
            services.AddTransient<IHostedService>(provider => receiveInventoryManagementMessages);
        })
        .ConfigureLogging((hostingContext, logging) =>
        {
            logging.AddConfiguration(hostingContext.Configuration.GetSection("Logging"));
            logging.AddConsole();
        });        await builder.RunConsoleAsync();
}

RabbitMQ入门

要开始使用RabbitMQ,您必须下载RabbitMQ服务器并按照其网站https://www.rabbitmq.com上的详细安装说明进行操作 本文末尾还提供了安装说明。当您运行适用于Windows的RabbitMQ安装程序时,它会将RabbitMQ安装为Windows服务并使用默认配置启动它。

该服务将使用其默认设置正常运行。您可以自定义RabbitMQ环境并根据需要更改其配置。RabbitMQ服务自动启动。您可以从“开始”菜单停止/重新安装/启动RabbitMQ服务。

RabbitMQ为您的RabbitMQ服务器提供Web UI管理和监视工具。在管理界面中,您可以监控,创建,删除和列出所有交换和队列。您还可以监视服务器连接和通道,监视队列长度并检查消息速率等。

 

RabbitMQ队列和交换

在开始使用RabbitMQ发送和接收消息之前,您需要深入了解RabbitMQ并了解AMQP和RabbitMQ的一些概念。RabbitMQ消息传递的一些主要概念包括:

谈到交换,RabbitMQ中有四种类型的交换:

使用扇出交换发送RabbitMQ消息

对于示例应用程序,使用Fanout Exchange发送邮件似乎是最佳选择。例如,在库存管理微服务中创建产品时,需要与销售订单管理微服务和采购订单管理微服务共享产品信息。

此外,应将消息发送到日志记录队列,以便监视并确认所有队列和微服务发送,接收和成功处理的消息的完整生命周期的成功完成。

查看库存管理消息队列服务,已设置以下交换:

隐藏   收缩    复制代码

////    Inventory Received Transactions//IMessageQueueConfiguration inventoryReceivedConfiguration = 
                           new MessageQueueConfiguration(MessageQueueExchanges.InventoryReceived,
                                                         messageQueueAppConfig, 
                                                         sendingQueueConnection);

inventoryReceivedConfiguration.AddQueue(MessageQueueEndpoints.SalesOrderQueue);
inventoryReceivedConfiguration.AddQueue(MessageQueueEndpoints.PurchaseOrderQueue);
inventoryReceivedConfiguration.AddQueue(MessageQueueEndpoints.LoggingQueue);

inventoryReceivedConfiguration.InitializeOutboundMessageQueueing();
messageQueueConfigurations.Add(inventoryReceivedConfiguration);            
////    Product Creation and Updates//
            IMessageQueueConfiguration productUpdatedConfiguration = 
                           new MessageQueueConfiguration(MessageQueueExchanges.ProductUpdated, 
                                                         messageQueueAppConfig, sendingQueueConnection);

productUpdatedConfiguration.AddQueue(MessageQueueEndpoints.SalesOrderQueue);
productUpdatedConfiguration.AddQueue(MessageQueueEndpoints.PurchaseOrderQueue);
productUpdatedConfiguration.AddQueue(MessageQueueEndpoints.LoggingQueue);

productUpdatedConfiguration.InitializeOutboundMessageQueueing();
messageQueueConfigurations.Add(productUpdatedConfiguration);            
////    Inventory Shipped Transactions//IMessageQueueConfiguration inventoryShippedConfiguration = 
                           new MessageQueueConfiguration(MessageQueueExchanges.InventoryShipped,
                                                         messageQueueAppConfig, sendingQueueConnection);

inventoryShippedConfiguration.AddQueue(MessageQueueEndpoints.SalesOrderQueue);
inventoryShippedConfiguration.AddQueue(MessageQueueEndpoints.LoggingQueue);

inventoryShippedConfiguration.InitializeOutboundMessageQueueing();
messageQueueConfigurations.Add(inventoryShippedConfiguration);

使用扇出交换发送RabbitMQ消息

使用RabbitMQ,您需要制定策略来定义和设计交换和队列。您可以阅读互联网上设计交换和队列的最佳实践和设计模式,但对于示例应用程序,我采用的方法是为每种类型的业务事务创建单独的RabbitMQ交换。

例如,在库存管理微服务中创建产品时,会创建专用的RabbitMQ交换,以便路由产品创建和产品更新消息。

为了满足示例应用程序的要求,扇出交换类型将消息路由到绑定到它的所有队列。在库存管理微服务中有三个业务事务,因此我创建了三个RabbitMQ交换,用于发送和路由消息,如下所示:

示例应用程序中包含的其他交换包括:

发送库存发货消息

为了在示例应用程序中发送消息队列消息,创建了一个所有消息队列服务将实现的通用SendMessages类。该类的基本功能包括:

隐藏   收缩    复制代码

using CodeProject.Shared.Common.Interfaces;using Microsoft.Extensions.Hosting;using System;using System.Collections.Generic;using System.Text;using System.Reactive.Subjects;using System.Threading;using System.Threading.Tasks;using Microsoft.Extensions.Logging;using Microsoft.Extensions.Options;using CodeProject.Shared.Common.Models;using CodeProject.MessageQueueing;using Microsoft.AspNetCore.SignalR.Client;using RabbitMQ.Client;namespace CodeProject.MessageQueueing
{    public class SendMessages : IHostedService, IDisposable
    {        private readonly List<IMessageQueueConfiguration> _messageQueueConfigurations;        private readonly IMessageQueueConnection _messageQueueConnection;        private readonly IMessageQueueProcessing _messageProcessor;        private readonly MessageQueueAppConfig _appConfig;        private readonly ConnectionStrings _connectionStrings;        private readonly string _signalRQueue;        private HubConnection _signalRHubConnection;        private Timer _timer;        /// <summary>        /// Send Messages        /// </summary>        /// <param name="messageQueueConnection"></param>        /// <param name="messageProcessor"></param>        /// <param name="appConfig"></param>        /// <param name="connectionStrings"></param>        /// <param name="messageQueueConfigurations"></param>        public SendMessages(IMessageQueueConnection messageQueueConnection, 
                            IMessageQueueProcessing messageProcessor, 
                            MessageQueueAppConfig appConfig, 
                            ConnectionStrings connectionStrings, 
                            List<IMessageQueueConfiguration> messageQueueConfigurations, 
                            string signalRQueue)
        {
            _messageQueueConnection = messageQueueConnection;
            _messageQueueConfigurations = messageQueueConfigurations;
            _connectionStrings = connectionStrings;
            _messageProcessor = messageProcessor;
            _appConfig = appConfig;
            _signalRQueue = signalRQueue;
        }        /// <summary>        /// Start Process Interval        /// </summary>        /// <param name="cancellationToken"></param>        /// <returns></returns>        public Task StartAsync(CancellationToken cancellationToken)
        {

            StartSignalRConnection();

            _timer = new Timer(GetMessagesInQueue, null, TimeSpan.Zero, 
                               TimeSpan.FromSeconds(_appConfig.SendingIntervalSeconds));            return Task.CompletedTask;
        }        /// <summary>        /// Start SignalR Connection        /// </summary>        private async void StartSignalRConnection()
        {
            _signalRHubConnection = new HubConnectionBuilder().WithUrl(url).Build();
           
            _signalRHubConnection.On<string>(_signalRQueue, (message) =>
            {                this.GetMessagesInQueue(null);

            });

            _signalRHubConnection.Closed += async (error) =>
            {                await Task.Delay(10000);                await _signalRHubConnection.StartAsync();
            };            await _signalRHubConnection.StartAsync();
                  
        }        
        /// <summary>        /// Get Messages In Queue        /// </summary>        /// <param name="state"></param>        private async void GetMessagesInQueue(object state)
        {
            ResponseModel<List<MessageQueue>> messages = 
                 await _messageProcessor.SendQueueMessages(_messageQueueConfigurations,
                                                           _appConfig.OutboundSemaphoreKey, 
                                                           _connectionStrings);
            
            Console.WriteLine("total messages " + messages.Entity.Count.ToString() + 
                              " sent at " + DateTime.Now);

        }        /// <summary>         /// Stop Process  
        /// </summary>        public Task StopAsync(CancellationToken cancellationToken)
        {
            _timer?.Change(Timeout.Infinite, 0);            return Task.CompletedTask;
        }

        /// <summary>        /// Dispose Timer        /// </summary>        public void Dispose()
        {
            _timer?.Dispose();
        }

    }

}

我在示例应用程序中对消息队列进行的设计决策之一是确保在将业务事务提交到数据库时按顺序处理所有业务事务  在系统中可能有数百个并发用户的环境中,库存管理队列服务将同时接收多个实时消息请求,并可能影响相同的数据。按顺序处理业务事务的目的是确保所有事务日志都以适当的顺序在微服务中记录业务事务,并最终保持数据完整性。

为了保证业务事务的顺序处理,SendQueueMessages方法实现了一个lock语句阻止多个请求同时尝试发送消息。第一个请求将获取一个独占的SQL-Server行锁并继续按顺序事务Id顺序读取TransactionQueueOutbound 表中的所有待处理事务,并提取每个事务的消息队列有效负载,并将每个事务的消息发送到执行SendMessage方法时适当的RabbitMQ交换完成此周期后,下一个消息请求将继续处理下一批待处理业务事务。

隐藏   收缩    复制代码

/// <summary>/// Send Queue Messages/// </summary>/// <param name="messageQueueConfigurations"></param>/// <param name="outboundSemaphoreKey"></param>/// <param name="connectionStrings"></param>/// <returns></returns>public async Task<ResponseModel<List<MessageQueue>>> SendQueueMessages(
             List<IMessageQueueConfiguration> messageQueueConfigurations, 
             string outboundSemaphoreKey, 
             ConnectionStrings connectionStrings)
{
    ResponseModel<List<MessageQueue>> returnResponse = new ResponseModel<List<MessageQueue>>();
    returnResponse.Entity = new List<MessageQueue>();

    Console.WriteLine("sending = " + _sending);    lock (_sendingLock)
    {        if (_sending)
        {
            Console.WriteLine("Aborted iteration still sending");            return returnResponse;
        }

        _sending = true;

    }

    Console.WriteLine("Start sending");    Boolean getMessages = true;    while (getMessages==true)
    {
        ResponseModel<List<MessageQueue>> response = 
                                          await GetMessagesToSend(messageQueueConfigurations,
                                                                  outboundSemaphoreKey, 
                                                                  connectionStrings);        foreach (MessageQueue message in response.Entity)
        {
            returnResponse.Entity.Add(message);
        }        if (response.Entity.Count == 0)
        {
            _sending = false;
            getMessages = false;
        }
    }    return returnResponse;

}        
/// <summary>/// Get Messages To Send/// </summary>/// <param name="messageQueueConfigurations"></param>/// <param name="outboundSemaphoreKey"></param>/// <param name="connectionStrings"></param>/// <returns></returns>private async Task<ResponseModel<List<MessageQueue>>> GetMessagesToSend(
     List<IMessageQueueConfiguration> messageQueueConfigurations, 
     string outboundSemaphoreKey, ConnectionStrings connectionStrings)
{
    TransactionQueueSemaphore transactionQueueSemaphore = null;

    ResponseModel<List<MessageQueue>> returnResponse = new ResponseModel<List<MessageQueue>>();
    returnResponse.Entity = new List<MessageQueue>();    try
    {
        _inventoryManagementDataService.OpenConnection(
                                        connectionStrings.PrimaryDatabaseConnectionString);

        _inventoryManagementDataService.BeginTransaction((int)IsolationLevel.Serializable);

         //         // get all pending outbound transactions         //

         List<TransactionQueueOutbound> transactionQueue = 
              await _inventoryManagementDataService.GetOutboundTransactionQueue();         foreach (TransactionQueueOutbound transactionQueueItem in transactionQueue)
         {
            MessageQueue message = new MessageQueue();
            message.ExchangeName = transactionQueueItem.ExchangeName;
            message.TransactionQueueId = transactionQueueItem.TransactionQueueOutboundId;
            message.TransactionCode = transactionQueueItem.TransactionCode;
            message.Payload = transactionQueueItem.Payload;

            //            //  the message queue configurations object has a list of the all exchange/queue            //  configurations - the where clause finds the configration needed for the             //  particular transaction being processed            //

            IMessageQueueConfiguration messageQueueConfiguration = messageQueueConfigurations
                     .Where(x => x.TransactionCode == message.TransactionCode).FirstOrDefault();            if (messageQueueConfiguration == null)
            {                break;
            }            //            //  The SendMessage method will send a message to RabbitMQ            //

            ResponseModel<MessageQueue> messageQueueResponse = 
                                        messageQueueConfiguration.SendMessage(message);            if (messageQueueResponse.ReturnStatus == true)
            {
                transactionQueueItem.SentToExchange = true;
                transactionQueueItem.DateSentToExchange = DateTime.UtcNow;                await _inventoryManagementDataService
                      .UpdateOutboundTransactionQueue(transactionQueueItem);

                returnResponse.Entity.Add(message);
            }            else
            {                break;
            }

        }        await _inventoryManagementDataService.UpdateDatabase();

        _inventoryManagementDataService.CommitTransaction();
        _inventoryManagementDataService.CloseConnection();

    }    catch (Exception ex)
    {
        _inventoryManagementDataService.RollbackTransaction();
        returnResponse.ReturnStatus = false;
        returnResponse.ReturnMessage.Add(ex.Message);
    }    finally
    {
        _inventoryManagementDataService.CloseConnection();
    }    return returnResponse;
}

创建RabbitMQ连接

要使用RabbitMQ编写C#代码,必须安装.NET RabbitMQ.Client库。RabbitMQ .NET客户端是一个开源库,是C#和其他.NET语言的AMQP客户端库的实现。使用RabbitMQ发送和接收消息需要做的第一件事是创建与RabbitMQ的连接。在开发模式下,连接只需要使用开发默认值设置的以下属性:

HostName = localhost 
UserName = guest 
密码=来宾

对于示例应用程序,每个消息队列服务中运行的每个异步任务/线程都将创建并维护与RabbitMQ的单独连接。

隐藏   收缩    复制代码

using CodeProject.Shared.Common.Interfaces;using CodeProject.Shared.Common.Models;using RabbitMQ.Client;using System;using System.Collections.Generic;using System.Text;namespace CodeProject.MessageQueueing
{    public class MessageQueueConnection  : IMessageQueueConnection
    {    
        private ConnectionFactory _connectionFactory;        private MessageQueueAppConfig _messageQueueAppConfig;        private IConnection _connection;        public MessageQueueConnection(MessageQueueAppConfig messageQueueAppConfig)
        {
            _messageQueueAppConfig = messageQueueAppConfig;
        }        /// <summary>        /// Create RabbitMQ Connection        /// </summary>        public void CreateConnection()
        {
            _connectionFactory = new ConnectionFactory();

            _connectionFactory.HostName = _messageQueueAppConfig.MessageQueueHostName;
            _connectionFactory.UserName = _messageQueueAppConfig.MessageQueueUserName;
            _connectionFactory.Password = _messageQueueAppConfig.MessageQueuePassword;

            _connection = _connectionFactory.CreateConnection();

        }        public IConnection GetConnection()
        {            return _connection;
        }

    }
}

声明,创建和配置RabbitMQ交换和队列

您可以通过两种方式配置和创建RabbitMQ交换和队列。您可以使用RabbitMQ Web UI管理控制台或RabbitMQ管理命令行工具执行这些操作。另一种选择是以编程方式配置交换和队列,我选择这样做。

一旦建立了与RabbitMQ的连接,就可以开始以编程方式创建和配置RabbitMQ交换和队列。对于示例应用程序,所有交换将配置为扇出 交换下面的  MessageQueueConfiguration类中,正在执行以下操作:

  1. 从已建立的RabbitMQ连接创建RabbitMQ 通道

  2. 创建IBasicProperties对象以将交换配置为持久性

  3. 使用交换机的名称,交换类型“ 扇出 ” 声明交换,并将交换设置为持久且不自动删除。

  4. 声明一组队列,将每个队列设置为持久且不自动删除。

  5. 当在交换机上收到消息时,将每个队列绑定到交换机以将消息扇出到这些队列。

以编程方式声明交换和队列时,如果尚未在RabbitMQ服务器上创建交换或队列,则此时将动态创建它们。队列可以绑定到多个交换。 

在声明交换和队列时,有许多配置设置可供选择。您可能想要了解的主要配置是:

隐藏   收缩    复制代码

using CodeProject.Shared.Common.Interfaces;using CodeProject.Shared.Common.Models;using Newtonsoft.Json;using RabbitMQ.Client;using System;using System.Collections.Generic;using System.Text;using RabbitMQ.Client.Events;using RabbitMQ.Client.MessagePatterns;namespace CodeProject.MessageQueueing
{    public class MessageQueueConfiguration : IMessageQueueConfiguration
    {        
        private string _exchangeName;        private List<string> _boundedQueues;        private MessageQueueAppConfig _messageQueueAppConfig;        private readonly IMessageQueueConnection _messageQueueConnection;        private Subscription _subscription;        private IBasicProperties _basicProperties;        private IModel _channel;        /// <summary>        /// Constructor        /// </summary>        /// <param name="exchangeName"></param>        /// <param name="messageQueueAppConfig"></param>        /// <param name="messageQueueConnection"></param>        public MessageQueueConfiguration(string exchangeName, 
                                         List<string> boundedQueues, 
                                         MessageQueueAppConfig messageQueueAppConfig, 
                                         IMessageQueueConnection messageQueueConnection)
        {
            _messageQueueAppConfig = messageQueueAppConfig;
            _messageQueueConnection = messageQueueConnection;
            _exchangeName = exchangeName;
            _boundedQueues = boundedQueues;
        }      
        /// <summary>        /// Initialize Initialize RabbitMQ Exchange        /// </summary>        public void InitializeRabbitMQExchange()
        {
            _channel = _messageQueueConnection.GetConnection().CreateModel();

            _basicProperties = _channel.CreateBasicProperties();
            _basicProperties.Persistent = true;            string exchangeName = _exchangeName + "_" + _messageQueueAppConfig.MessageQueueEnvironment;

            _channel.ExchangeDeclare(exchangeName, "fanout", true, false);            foreach (string queueName in _boundedQueues)
            {                string queue = queueName + "_" + _messageQueueAppConfig.MessageQueueEnvironment;

                _channel.QueueDeclare(queue, true, false, false);
                _channel.QueueBind(queue, exchangeName, _messageQueueAppConfig.RoutingKey);
            }
        }
    
    }

}

向RabbitMQ Exchange发送消息

当库存管理消息队列服务从库存管理数据库中TransactionQueueOutbound 表中获取待处理的业务事务时,它提取每个事务的有效负载信息,并将有效负载传递给以下SendMessage方法,该方法将消息发布到InventoryShippedRabbitMQ交换。

RabbitMQ支持发布/订阅消息排队模式。发布/订阅消息传递模式是其中的消息(发布者或生产者)的发送方发布消息没有它的用户如果有的话有可能是知识。类似地,订阅者或消费者仅接收他们需要的消息,而不知道哪些发布者(如果有的话)存在。

要使用RabbitMQ发布消息,首先要创建一个PublicationAddress实例并设置交换名称和交换类型属性。为了实际向交换机发送消息,BasicPublish方法从RabbitMQ通道执行,其中包含发布地址,基本属性和传递给方法的消息的有效负载。发送消息时,有效负载将作为UTF8字节数组发送。

下面SendMessage方法中,try / catch块围绕将消息发送到RabbitMQ的代码,如果尝试发送消息时发生错误,则会将错误返回到库存管理消息队列服务,它将离开业务事务在TransactionQueueOutbound 表中挂起如果向RabbitMQ发送消息时发生错误,则实质上意味着RabbitMQ服务器已关闭。

在SQL-Server中实现中间TransactionQueueInbound和TransactionQueueOutbound 表使整个消息队列过程更具容错性,更易于监视并有助于恢复和重试功能。在没有参与SQL-Server提交/回滚事务的中间消息队列表的情况下实现消息队列,使得从错误功能恢复和重试变得更加难以实现。

隐藏   收缩    复制代码

/// <summary>/// Send Message/// </summary>/// <param name="entity"></param>public ResponseModel<MessageQueue> SendMessage(MessageQueue entity)
{
    ResponseModel<MessageQueue> response = new ResponseModel<MessageQueue>();
    response.Entity = new MessageQueue();    try
    {        string output = JsonConvert.SerializeObject(entity);        byte[] payload = Encoding.UTF8.GetBytes(output);        string exchangeName = _exchangeName + "_" + _messageQueueAppConfig.MessageQueueEnvironment;

        PublicationAddress address = new PublicationAddress(ExchangeType.Fanout, 
                                                            exchangeName,
                                                            _messageQueueAppConfig.RoutingKey);

        _channel.BasicPublish(address, _basicProperties, payload);

        response.Entity.Payload = output;

        response.ReturnStatus = true;
    }    catch (Exception ex)
    {
        response.ReturnStatus = false;
        response.ReturnMessage.Add(ex.Message);
    }    return response;

}

创建和配置RabbitMQ订阅

The recommended and most convenient way to receive messages is to set up a subscription. In RabbitMQ there are a couple different configuration options for setting up a subscription.  For the sample application subscriptions are created using the RabbitMQ Subscription object. Once created, the subscription consumes from a queue. Received deliveries can be retrieved by calling Next(), or by using the Subscription object as an IEnumerator in a foreach loop.

InitializeRabbitMQSubscription方法中,通过在声明要绑定到交换的队列时首先以相同的方式声明队列来创建和配置Subscription对象。声明队列后,将其分配给订阅,并将Subscription对象分配给RabbitMQ通道。

隐藏   收缩    复制代码

using CodeProject.Shared.Common.Interfaces;using CodeProject.Shared.Common.Models;using Newtonsoft.Json;using RabbitMQ.Client;using System;using System.Collections.Generic;using System.Text;using RabbitMQ.Client.Events;using RabbitMQ.Client.MessagePatterns;namespace CodeProject.MessageQueueing
{    public class MessageQueueConfiguration : IMessageQueueConfiguration
    {        
        private string _exchangeName;        private List<string> _boundedQueues;        private MessageQueueAppConfig _messageQueueAppConfig;        private readonly IMessageQueueConnection _messageQueueConnection;        private Subscription _subscription;        private IBasicProperties _basicProperties;        private IModel _channel;        private string _originatingQueueName;        /// <summary>        /// Constructor        /// </summary>        /// <param name="exchangeName"></param>        /// <param name="messageQueueAppConfig"></param>        /// <param name="messageQueueConnection"></param>        public MessageQueueConfiguration(string exchangeName, 
                                         MessageQueueAppConfig messageQueueAppConfig, 
                                         IMessageQueueConnection messageQueueConnection)
        {
            TransactionCode = exchangeName;

            _messageQueueAppConfig = messageQueueAppConfig;
            _messageQueueConnection = messageQueueConnection;
        }        /// <summary>        /// Initialize RabbitMQ Subscription        /// </summary>        /// <param name="queueName"></param>        public void InitializeRabbitMQSubscription(string queueName)
        {
            _channel = _messageQueueConnection.GetConnection().CreateModel();            string queue = queueName + "_" + _messageQueueAppConfig.MessageQueueEnvironment;

 _          _channel.QueueDeclare(queue: queue, 
                                  durable: true,
                                  exclusive: false,
                                  autoDelete: false, 
                                  arguments: null); 

            _subscription = new Subscription(_channel, queue, false);

        }
       
    }

}

从RabbitMQ队列中消费和接收消息

对于示例应用程序,创建了一个通用的ReceiveMessages类,所有消息队列服务将使用该类来使用队列中的消息。所述ReceiveMessages类被创建为一个托管服务和运行,因为每个消息中的每个微服务排队服务的单独的异步任务,

由于库存管理消息队列服务将库存货物信息到库存出货é Xchange的,销售订单管理队列服务同时订阅,听这势必对销售订单队列库存出货交流。销售订单管理消息队列服务实现ReceiveMessages类以接收消息。

GetMessagesInQueue方法中,在连接和订阅初始化过程中先前创建Subscription对象之后,将获取对该对象的引用

在销售订单管理消息队列服务中,订阅从销售订单队列中消耗。通过在foreach循环中将订阅用作IEnumerator来检索接收的传递每次迭代时,都会从订阅中返回BasicDeliverEventArg对象。BasicDeliverEventArg包含了所有从AMQP代理传递消息的信息。

foreach循环将连续迭代。当队列中没有更多消息要处理时,循环将处于空闲状态。当更多消息进入队列时,foreach循环将自动开始再次迭代。

所述ReceiveMessages类使得引用包含方法来处理入站的出站数据和更新数据库表的自定义消息处理器组件。

销售订单管理消息队列服务将获取入站RabbitMQ消息并将其反序列化为MessageQueue对象,并将反序列化对象传递给消息处理器的  CommitInBoundMessage方法,该方法将消息提交到销售订单管理数据库中TransactionQueueInbound 表。

一旦提交到  TransactionQueueInbound 表,就会向RabbitMQ服务器发送一条确认消息,让服务器知道该消息可以从销售订单消息队列中删除。最后,再次执行消息处理器以处理TransactionQueueInbound 表中提交的消息,  以更新销售订单上的数量。

隐藏   收缩    复制代码

using System;using System.Collections.Generic;using System.Text;using System.Reactive.Subjects;using System.Threading;using System.Threading.Tasks;using Microsoft.Extensions.Hosting;using Microsoft.Extensions.Logging;using Microsoft.Extensions.Options;using CodeProject.Shared.Common.Models;using CodeProject.Shared.Common.Interfaces;using CodeProject.Shared.Common.Models.MessageQueuePayloads;using Newtonsoft.Json;using RabbitMQ.Client;using RabbitMQ.Client.Events;using RabbitMQ.Client.MessagePatterns;namespace CodeProject.MessageQueueing
{    public class ReceiveMessages : IHostedService, IDisposable
    {        private readonly List<IMessageQueueConfiguration> _messageQueueConfigurations;        private readonly IMessageQueueConnection _messageQueueConnection;        private readonly IMessageQueueProcessing _messageProcessor;        private readonly MessageQueueAppConfig _appConfig;        private readonly ConnectionStrings _connectionStrings;        private Timer _timer;        private Boolean _running = false;        public ReceiveMessages(IMessageQueueConnection messageQueueConnection, 
                               IMessageQueueProcessing messageProcessor, 
                               MessageQueueAppConfig appConfig, ConnectionStrings connectionStrings, 
                               List<IMessageQueueConfiguration> messageQueueConfigurations)
        {
            _messageQueueConnection = messageQueueConnection;
            _messageQueueConfigurations = messageQueueConfigurations;
            _connectionStrings = connectionStrings;
            _messageProcessor = messageProcessor;
            _appConfig = appConfig;
        }        /// <summary>        /// Start        /// </summary>        /// <param name="cancellationToken"></param>        /// <returns></returns>        public Task StartAsync(CancellationToken cancellationToken)
        {
            Console.WriteLine("Starting Receiving Messages");

            _timer = new Timer(GetMessagesInQueue, null, TimeSpan.Zero, 
                               TimeSpan.FromSeconds(_appConfig.ReceivingIntervalSeconds));            return Task.CompletedTask;
        }        /// <summary>        /// Get Messages In Queue        /// </summary>        /// <param name="state"></param>        private async void GetMessagesInQueue(object state)
        {    
            if (_running == true)
            {                return;
            }

            _running = true;

            Console.WriteLine("Receiving Messages at " + DateTime.Now);

            Subscription subscription = _messageQueueConfigurations[0].GetSubscription();            foreach (BasicDeliverEventArgs e in subscription)
            {                string message = Encoding.UTF8.GetString(e.Body);

                MessageQueue messageQueue = JsonConvert.DeserializeObject<MessageQueue>(message);

                ResponseModel<MessageQueue> responseMessage = 
                              await _messageProcessor.CommitInboundMessage(messageQueue, 
                                                                           _connectionStrings);                if (responseMessage.ReturnStatus == true)
                {
                   
                    Console.WriteLine($"Message Committed: {messageQueue.TransactionQueueId}");

                    subscription.Ack(e);                    
                    await _messageProcessor.ProcessMessages(_appConfig.InboundSemaphoreKey, 
                                                            _connectionStrings);

                }

            }

        }        
        /// <summary>        /// Stop Async        /// </summary>        /// <param name="cancellationToken"></param>        /// <returns></returns>        public Task StopAsync(CancellationToken cancellationToken)
        {
            Console.WriteLine("Stopping.");            return Task.CompletedTask;
        }        public void Dispose()
        {

        }
    }
}

微服务记录最佳实践

微服务架构提供了许多很多好处,例如能够使用不同的技术堆栈,独立部署应用程序,一次解决一个小问题等等。但是使用微服务需要很高的成本,因为它们很复杂。不仅在于他们如何彼此沟通,还在于如何管理它们。当一个或多个服务失败时,它们会变得更加复杂。没有有意义的日志记录机制,很难对微服务进

对于示例应用程序,创建了单独的日志记录数据库。通过RabbitMQ发送和接收的所有消息队列消息也会路由到日志消息队列。创建了一个日志消息队列服务,用于将消息记录到集中式日志记录数据库中的MessagesSent表或MessagesReceived表中。

例如,在库存管理微服务中创建产品。发生此业务事务时,会向ProductUpdate交换发送一条消息,并将消息路由到销售订单队列和采购订单队列。当所有消息队列服务成功处理后,将在记录数据库中添加三行。一个用于插入到MessagesSent表中的原始发送消息,另一个插入到MessagesReceived表中; 销售订单队列和采购订单队列接收的消息各一个。

The Logging Messaging Queue Service maintains a count of queues bound to each RabbitMQ exchange. This count is used to reconcile messages. When messages have been reconciled, the Logging Message Queuing Service will send out acknowledgement messages back through RabbitMQ. In the case of the product creation example, the inventory queue will receive an acknowledgement message and the Inventory Management Message Queuing Service will process the message and archive TransactionQueueOutbound rows to an TransactionQueueOutboundHistory table.

Additionally, all application error exceptions should also be logged into a centralized logging database. Centralizing error messages can be done the same way as any other type of message via RabbitMQ message queuing.

Some good pieces of information for logging errors include:

Contextualizing centralized logging will save you time when you need to troubleshoot problems in the system.

Microservices and Shared Libraries

In the sample application, a common shared library was created that all microservices reference. The shared library in the sample application contains all the models and interfaces for processing messages.  The shared library also contains utility functions and methods for serialization, token management, hashing and other general infrastructure functionality. The common shared library contains no business logic.

As we have made the progression from a monolith toward a microservices based architecture, the topic of shared libraries in microservices has continued to be a point of contention. One of the primary goals of microservices is to create loosely coupled services which can be changed independently from other microservices. The creation of our own “common” libraries creates coupling between the projects that depend on them.

As a rule of thumb there should be no need to put business logic into a common library. If you are doing this, then the bounded context of your microservices domains are most likely incorrect, and/or you are missing a microservice. Developers of microservices need to embrace the reality that duplication of code between microservices is actually okay; up to a point. Duplication of code within a specific microservice is not okay.

当然,现实是在微服务架构中需要一个共享库。与示例应用程序中的共享库一样,微服务架构中的共享库中的代码应该主要包括支持跨微服务的通用基础结构功能的函数和类。

在微服务架构中有几种管理共享库的技术。在Microsoft世界中,共享库可以作为跨微服务的版本化Nuget包部署,允许各种微服务在需要时实现最新版本的共享库。注意避免对共享库进行重大更改是另一种选择。使用重载函数有助于防止在共享库中创建重大更改。

遵循SOLID设计原则也可以提供帮助。  SOLID是面向对象软件开发中最流行的设计原则之一。它是以下五个设计原则的助记符缩写:

单一责任原则开放/封闭原则的论据相对简单:它使您的软件更易于实施,并防止未来变更的意外副作用。

安装示例应用程序

在Angular 6,.NET Core 2.1和RabbitMQ之间,有许多移动部件需要安装和配置才能启动和运行示例应用程序。示例应用程序还包含九个Visual Studio 2017项目。

尽管这些新技术令人兴奋,但它可能是Elm Street上梦魇,试图升级到这些技术的最新版本并处理这些技术的所有依赖关系,包括处理版本不兼容问题。如果你曾经升级到任何Visual Studio版本的最新版本,你可能知道升级是多么痛苦; 到了你经常后悔甚至升级的程度。Visual Studio升级的有时脆弱性几乎总是向前迈出两步,退一步。

为了尽可能轻松地在本地开发环境中启动和运行示例应用程序,我在下面概述了启动和运行所需的先决条件和安装步骤。

软件安装先决条件:

安装RabbitMQ服务器 - RabbitMQ需要安装64位支持的Erlang for Windows版本。http://www.erlang.org/downloads上有Erlang的Windows安装程序重要说明:必须使用管理帐户运行Erlang安装程序,否则RabbitMQ安装程序所需的注册表项将不存在。安装Erlang后,运行RabbitMQ安装程序  rabbitmq-server-3.7.9.exe,可从https://www.rabbitmq.com/install-windows.html下载它将RabbitMQ安装为Windows服务,并使用默认配置启动它。

安装RabbitMQ Web UI管理工具 - 要安装RabbitMQ Web UI管理工具,需要按如下方式安装管理插件:

下载示例应用程序源代码 - 可以从我的github.com  帐户下载示例应用程序的源代码  只需下载zip文件并将所有文件解压缩到您选择的文件夹即可。

示例应用程序数据库   - 下载源代码后,以管理员身份运行SQL Server Management Studio并附加驻留在下载的Databases文件夹中的以下数据库:

.NET Core 2.1 - 下载并安装Visual Studio 2017 Professional或Community Edition时,.NET Core 2.1应自动安装Visual Studio。如果您已经安装了Visual Studio 2017,则可以通过转到“ 工具”菜单并选择“ 获取工具和功能”来验证安装,这将启动Visual Studio安装程序。从安装程序选项中,您可以验证是否已安装.NET Core 2.1。

构建并运行示例应用程序Web API项目 -要验证所有内容是否已正确安装,请为示例应用程序编译以下四个Web API项目。在使用Visual Studio 2017打开和构建这些项目时,请务必等待一两分钟,因为Visual Studio将需要在项目打开时还原编译这些项目所需的包。

这些Web API项目配置为使用SSL。要避免SSL问题,您需要通过选择IISExpress配置文件并选择运行按钮来尝试运行项目,ASP.NET Core将创建SSL证书。Visual Studio将询问您是否要信任ASP.NET Core生成的自签名证书。选择“是”以信任证书。因为Visual Studio是Visual Studio,所以您可能必须第二次或第三次运行项目,或者退出并重新加载Visual Studio以确认项目的所有内容都正常工作。从Visual Studio运行项目时,浏览器应从值控制器启动并在浏览器中显示数据库连接字符串。 

使用.NET Core CLI构建所有项目   - .NET Core命令行界面(CLI)是用于开发.NET应用程序的新型跨平台工具链。CLI是高级工具(如集成开发环境(IDE),编辑器和构建编排器)可以使用的基础。

示例应用程序中有九个.NET Core项目需要构建。手动构建Web API项目后,可以使用名为_BuildAllProjects.bat的DOS批处理文件构建其余项目  ,您可以在Support文件夹中找到该文件。此DOS批处理文件为每个项目执行.NET Core CLI构建命令:

隐藏   复制代码

dotnet build SpawnProcesses\SpawnProcesses
dotnet build ..\AccountManagement\CodeProject.AccountManagement.WebApi
dotnet build ..\InventoryManagement\CodeProject.InventoryManagement.MessageQueueing
dotnet build ..\InventoryManagement\CodeProject.InventoryManagement.WebApi
dotnet build ..\LoggingManagement\CodeProject.LoggingManagement.MessageQueueing
dotnet build ..\PurchaseOrderManagement\CodeProject.PurchaseOrderManagement.MessageQueueing
dotnet build ..\PurchaseOrderManagement\CodeProject.PurchaseOrderManagement.WebApi
dotnet build ..\SalesOrderManagement\CodeProject.SalesOrderManagement.MessageQueueing
dotnet build ..\SalesOrderManagement\CodeProject.SalesOrderManagement.WebApi

Angular CLI 6.0.8 - Angular 6前端应用程序是通过Angular CLI构建和提供的。您可以通过运行Angular CLI命令来验证Angular CLI安装:  ng version。如果未安装Angular CLI,则可以通过键入npm install -g @ angular / cli @ 6.0.8从命令窗口安装它

构建Angular 6前端应用程序 - Angular 6前端应用程序依赖于要安装在项目的node_modules  文件夹中的节点模块可以通过转到Portal文件夹并打开Visual Studio 2017项目解决方案文件CodeProject.Portal.sln来创建所有节点模块  打开项目后, 右键单击packages.json文件并选择Restore Packages如果有一个packages-lock.json文件,请将其删除,否则将无法恢复包。安装软件包后,您可以在DOS命令窗口中使用Angular 6 CLI构建Angular 6项目,并导航到Portal - > CodeProject.Portal文件夹并执行: 建立。

运行示例应用程序后端服务和前端门户

一旦构建完所有内容,就可以开始运行示例应用程序了。首先,通过 从Support文件夹执行DOS批处理文件_StartAllDevelopmentWebServersAndQueues.bat启动所有后端Web API应用程序和消息队列服务此文件执行名为SpawnProcesses的自定义构建的.NET Core应用程序,该应用程序将启动示例应用程序的所有后端进程。 

SpawnProcesses应用程序将启动每个八个后端应用程序的一个新的进程。通过将每个进程的属性CreateNoWindow设置为false 将强制每个进程在同一个DOS窗口中执行。这很好,因为它不会在打开八个独立窗口的情况下污染您的环境。

如果需要从Visual Studio运行和调试一个或多个后端服务,可以SpawnProcesses项目appsettings.development.json文件中将属性设置为true或false,以告知应用程序应该使用哪些特定进程开始。如果您更改任何设置,只需重建SpawnProcesses项目,以便将设置添加到bin文件夹。

隐藏   复制代码

if (startUpProcesses.InventoryManagementWebApi == true)
{
        Console.WriteLine("Starting Inventory Management Web Api");

        Process process1 = new Process();
        process1.StartInfo.CreateNoWindow = false;
        process1.StartInfo.UseShellExecute = false;
        process1.StartInfo.RedirectStandardOutput = false;
        process1.StartInfo.FileName = runningPath + @"Support\StartInventoryManagementWebApi.bat";
        process1.StartInfo.Arguments = runningPath;
        process1.Start();

}if (startUpProcesses.SalesOrderManagementWebApi == true)
{
        Console.WriteLine("Starting Sales Order Management Web Api");
        Process process2 = new Process();
        process2.StartInfo.CreateNoWindow = false;
        process2.StartInfo.UseShellExecute = false;
        process2.StartInfo.RedirectStandardOutput = false;
        process2.StartInfo.FileName = runningPath + @"Support\StartSalesOrderManagementWebApi.bat";
        process2.StartInfo.Arguments = runningPath;
        process2.Start();

}

每个进程调用一个DOS批处理文件,该文件执行.NET Core CLI运行命令以启动每个应用程序。以下代码段启动了Inventory Management Web API应用程序。

隐藏   复制代码

dotnet run --verbosity m --launch-profile CodeProject.InventoryManagement.WebApi --no-build

现在,随着所有后端服务的启动和运行,我们现在可以提供示例应用程序的Web前端Angular 6应用程序。在DOS命令窗口中,导航到Portal - > CodeProject.Portal文件夹并执行Angular CLI命令:ng serve。这将启动localhost:4200上的Node.js Express Web服务器。要访问Microservices Portal应用程序,请在浏览器中导航到http:// localhost:4200

Angular 6 Web应用程序 - 兴趣点

我对示例应用程序的前端Angular 6 TypeScrip代码没有太多了解,但下面是您可能想要探索的兴趣点的简要列表:

有关Angular 6和.NET Core 2.1的更多信息,请查看我的代码项目文章  使用ASP.NET Core 2部署Angular 6应用程序。 

摘要

在库布里克的电影  2001:A Space Odyssey中, 有一个名为HAL 9000计算机的虚构角色  HAL最初被认为是船员的可靠成员,维持船舶功能并在平等的基础上与其人员配合。在电影中,HAL的人工智能很容易取得胜利。然而,随着时间的推移,HAL开始以微妙的方式发生故障,因此,决定关闭HAL以防止更严重的故障。 

微服务架构令人兴奋,并且有很多承诺,就像HAL一样,看起来可靠且引人入胜。但是,与微服务在纸面上的声音一样好,它们并非没有挑战,其中最大的挑战是增加了复杂性。必须妥善规划,开发和管理微服务。需要促进进程间通信,必须共享和/或复制数据,并且必须持续监视微服务生态系统的所有部分以跟踪异常行为和可能的故障。作为一名信息技术专业人士,我引用了HAL 9000--“ 我仍然对这项任务抱有最大的热情和信心”我很兴奋并希望微服务的命运比HAL 9000的命运要好得多。


合作伙伴

网站备案:豫ICP备15023476号-1 唯特科技