//Шаблон исходящих сообщений в управляемых событиями микросервисных архитектурах ASP.NET Core

Шаблон исходящих сообщений в управляемых событиями микросервисных архитектурах ASP.NET Core

На первом этапе вы создадите два микросервиса . У каждого микросервиса своя база данных . Они используют события для публикации изменений в шине событий RabbitMQ . (Вы можете пропустить эту часть, если вы уже реализовали ее в моей последней статье ). Затем вы увидите, как сообщения теряются, например, когда шина сообщений не работает.

Во второй части вы применяете шаблон транзакционного почтового ящика и видите, как он предотвращает потерю сообщений .

На последнем этапе вы добавляете подтверждения издателя и подписчика, а также дублирующие / неупорядоченные сообщения .

1. Создайте микросервисы .NET Core и обмен сообщениями.

См. Мою предыдущую статью о том, как создать микросервисы и настроить RabbitMQ .

Теперь должны работать и микросервисы, и контейнер RabbitMQ some-rabbit. Используйте REST API службы User, чтобы создавать и изменять пользователей, а также убедитесь, что все работает . Служба пользователей должна отправлять события в шину событий, а служба почты их обрабатывать. И пользователи в базе данных пользователей синхронизируются с пользователями в базе данных сообщений. Сообщения теряются

Остановите контейнер RabbitMQ:

C: \ dev> докер остановите кроликаИспользуйте пользовательский интерфейс Swagger для создания пользователя в UserService:

{
«name»: «Chris2»,
«mail»: «chris2@chris2.com»,
«otherData»: «Некоторые другие данные»
}

Событие потеряно, поэтому пользователя нет в базе данных сообщений, и микросервисы теперь несовместимы. На следующем шаге этого руководства вы увидите, как решить эту проблему.

2. Реализуйте шаблон транзакционной исходящей почты.

В этой части руководства вы добавите шаблон исходящих транзакций в проект UserService, чтобы предотвратить потерю сообщений.

Если вы хотите узнать о деталях и концепциях шаблона транзакционной исходящей почты, вы можете получить дополнительную информацию о нем по адресу: https://microservices.io/patterns/data/transactional-outbox.html

Создайте сущность IntegrationEvent:

пространство имен  UserService . Сущности
{
    открытый  класс  IntegrationEvent
    {
        общедоступный  int  ID { получить ; набор ; }
        общедоступная  строка  Event { получить ; набор ; }
        общедоступная  строка  Data { get ; набор ; }
    }
}

Измените UserController

Код в PutUser и PostUser запускает транзакцию и обновляет / вставляет объект User. В той же транзакции он вставляет IntegrationEvent в базу данных вместо прямой публикации события:

с помощью  Microsoft . AspNetCore . Mvc ;
с помощью  Microsoft . EntityFrameworkCore ;
с помощью  Newtonsoft . Json ;
используя  System . Коллекции . Универсальный ;
используя  System . Резьба . Задачи ;
с помощью  UserService . Данные ;
с помощью  UserService . Сущности ;
 
пространство имен  UserService . Контроллеры
{
    [ Маршрут ( » api / [контроллер] » )]
    [ ApiController ]
    открытый  класс  UsersController : ControllerBase
    {
        частная  чтение  UserServiceContext  _context ;
 
        общедоступный  UsersController ( контекст UserServiceContext  )
        {
            _context  =  context ;
        }
 
        [ HttpGet ]
        общедоступная  асинхронная  задача < ActionResult < IEnumerable < User >>> GetUser ()
        {
            вернуться  AWAIT  _context . Пользователь . ToListAsync ();
        }
 
        [ HttpPut ( » {id} » )]
        общедоступная  асинхронная  задача < IActionResult > PutUser ( int  id , пользователь-  пользователь )
        {
            используя  var  transaction  =  _context . База данных . BeginTransaction ();
 
            _context . Запись ( пользователь ). Состояние  =  EntityState . Изменено ;
            ждать  _context . SaveChangesAsync ();
 
            var  integrationEventData  =  JsonConvert . SerializeObject ( новый
            {
                id  =  пользователь . ID ,
                новое имя  =  пользователь . Имя ,
            });
            _context . IntegrationEventOutbox . Добавить (
                новое  IntegrationEvent ()
                {
                    Event  =  » user.update » ,
                    Данные  =  integrationEventData
                });
 
            _context . SaveChanges ();
            сделка . Зафиксировать ();
 
            return  NoContent ();
        }
 
        [ HttpPost ]
        public  async  Task < ActionResult < User >> PostUser ( Пользователь-  пользователь )
        {
            используя  var  transaction  =  _context . База данных . BeginTransaction ();
            _context . Пользователь . Добавить ( пользователь );
            _context . SaveChanges ();
 
            var  integrationEventData  =  JsonConvert . SerializeObject ( новый
            {
                id  =  пользователь . ID ,
                имя  =  пользователь . Имя
            });
 
            _context . IntegrationEventOutbox . Добавить (
                новое  IntegrationEvent ()
                {
                    Event  =  » user.add » ,
                    Данные  =  integrationEventData
                });
 
            _context . SaveChanges ();
            сделка . Зафиксировать ();
 
            return  CreatedAtAction ( » GetUser » , новый { id  =  user . ID }, пользователь );
        }
    }
}

Создайте издателя как BackgroundService

IntegrationEventSenderService опрашивает базу данных и отправляет все ожидающие события в RabbitMQ:

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using RabbitMQ.Client;
using System;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using UserService.Data;
 
namespace UserService
{
    public class IntegrationEventSenderService : BackgroundService
    {
        private readonly IServiceScopeFactory _scopeFactory;
 
        public IntegrationEventSenderService(IServiceScopeFactory scopeFactory)
        {
            _scopeFactory = scopeFactory;
            using var scope = _scopeFactory.CreateScope();
            using var dbContext = scope.ServiceProvider.GetRequiredService<UserServiceContext>();
            dbContext.Database.EnsureCreated();
        }
 
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                await PublishOutstandingIntegrationEvents(stoppingToken);
            }
        }
 
        private async Task PublishOutstandingIntegrationEvents(CancellationToken stoppingToken)
        {
            try
            {
                var factory = new ConnectionFactory();
                var connection = factory.CreateConnection();
                var channel = connection.CreateModel();
 
                while (!stoppingToken.IsCancellationRequested)
                {
                    {
                        using var scope = _scopeFactory.CreateScope();
                        using var dbContext = scope.ServiceProvider.GetRequiredService<UserServiceContext>();
                        var events = dbContext.IntegrationEventOutbox.OrderBy(o => o.ID).ToList();
                        foreach (var e in events)
                        {
                            var body = Encoding.UTF8.GetBytes(e.Data);
                            channel.BasicPublish(exchange: «user»,
                                                             routingKey: e.Event,
                                                             basicProperties: null,
                                                             body: body);
                            Console.WriteLine(«Published: » + e.Event + » » + e.Data);
                            dbContext.Remove(e);
                            dbContext.SaveChanges();
                        }
                    }
                    await Task.Delay(1000, stoppingToken);
                }
            }
            catch (Exception e)
            {
                Console.WriteLine(e.ToString());
                await Task.Delay(5000, stoppingToken);
            }
        }
    }
}

Постоянно опрашивать базу данных — это плохо. На следующем шаге вы улучшите опрос.

Добавьте IntegrationEventSenderService как HostedService в Startup.cs:

public  void  ConfigureServices ( службы IServiceCollection  )
{
 
    услуги . AddControllers ();
    услуги . AddSwaggerGen ( c  =>
    {
        c . SwaggerDoc ( » v1 » , новый  OpenApiInfo { Title  =  » UserService » , Version  =  » v1 » });
    });
 
    услуги . AddDbContext < UserServiceContext > ( параметры  =>
         варианты . UseSqlite ( @ » Источник данных = user.db » ));
 
    услуги . AddSingleton < IntegrationEventSenderService > ();
    услуги . AddHostedService < IntegrationEventSenderService > ( provider  =>  provider . GetService < IntegrationEventSenderService > ());
}