NATS 를 구동하기 위한 실행 파일을 아래 사이트에서 다운로드합니다.
https://github.com/nats-io/nats-streaming-server/releases
사이트로 들어가면 목록에 windows 버전이 안 보이는데 show all 버튼을 클릭해 모든 Assets 가 보이도록 하면
windows 버전이 있습니다. 저는 Windows 버전으로 다운로드하였습니다.
nats-streaming-server-v0.25.6-windows-amd64.zip
nats-streaming-server.exe 파일을 찾아서 실행합니다.
통신을 위한 Nats 는 구동이 완료되었고 이제 데이터를 주고받을 Pub/Sub 프로젝트를 구성합니다.
메세지 수신 시 처리하는 로직이 들어갑니다.
Nuget 에서 STAN.Client Nuget Package 를 설치합니다.
그리고 아래와 같이 코딩합니다.
using System.Text;
using STAN.Client;
namespace Nats.StreamTest.Sub
{
internal class Program
{
static void Main(string[] args)
{
string clientId = "ClientID" + Guid.NewGuid().ToString().Replace("-", "");
var cf = new StanConnectionFactory();
StanOptions options = StanOptions.GetDefaultOptions();
options.NatsURL = "nats://localhost:4222";
using (var c = cf.CreateConnection("test-cluster", clientId, options))
{
var opts = StanSubscriptionOptions.GetDefaultOptions();
// 구독 시작 시간 등 추가 옵션
//opts.DeliverAllAvailable();
//opts.StartAt(15);
//opts.StartAt(TimeSpan.FromSeconds(10));
//opts.StartAt(new DateTime(2019, 9, 18, 9, 0, 0));
//opts.StartWithLastReceived();
//opts.DurableName = "durable";
var s = c.Subscribe("nats.streaming.channel", opts, (obj, args) =>
{
// 메세지 수신
string message = Encoding.UTF8.GetString(args.Message.Data);
Console.WriteLine($"[#{args.Message.Sequence}] {message}");
});
Console.WriteLine($"Subcribe - client id '{clientId}'");
Console.ReadKey(true);
// 구독취소
//s.Unsubscribe();
c.Close();
}
}
}
}
위 코드를 보면 알 수 있듯이 Subscribe 시 통신 채널을 'nats.streaming.channel' 을 사용하는 걸 알 수 있습니다.
메세지 전송을 처리합니다.
Nuget 에서 STAN.Client Nuget Package 를 설치합니다.
그리고 아래와 같이 코딩합니다.
using System.Text;
using STAN.Client;
namespace Nats.StreamTest.Pub
{
internal class Program
{
static void Main(string[] args)
{
string clientId = $"pub-{Guid.NewGuid().ToString()}";
var cf = new StanConnectionFactory();
StanOptions options = StanOptions.GetDefaultOptions();
options.NatsURL = "nats://localhost:4222";
using (var c = cf.CreateConnection("test-cluster", clientId, options))
{
for (int i = 1; i <= 25; i++)
{
string message = $"[{DateTime.Now.ToString("hh:mm:ss:fffffff")}] Message {i}";
Console.WriteLine($"Pubish - Sending {message}");
// 메세지 전송
c.Publish("nats.streaming.channel", Encoding.UTF8.GetBytes(message));
}
}
Console.ReadKey(true);
}
}
}
위 코드를 보면 알 수 있듯이 Publish 할 때 'nats.streaming.channel' 채널을 통해 메세지를 전달합니다.
Subscribe 프로젝트를 실행하고 Publish 프로젝트를 실행합니다.
아래처럼 Publish 하는 쪽에서는 지속적으로 메세지를 보내고 있으며
아래처럼 Subscribe 쪽에서는 메세지를 받아 처리하게 됩니다.
streaming 의 장점은 데이터 양이 많은 경우 실시간으로 데이터를 보여줄 수 있어서
데이터 양이 많은 경우 기다리지 않고 화면에 받은 데이터부터 표시해 줄수 있습니다.
DB에서 데이터를 가져올 때 대용량 데이터를 한꺼번에 전송하기 부담스럽다면
DataReader 를 이용해 한줄씩 전달하여 네트워크 비용을 줄일 수 있습니다.
참고
https://github.com/nats-io/stan.net
C# ArvoConvert 사용하기 (0) | 2024.03.04 |
---|---|
C# Indexer 사용하기 (0) | 2024.03.04 |
C# Quartz Scheduler 사용하기 - Cron 표기법 사용 (0) | 2024.02.29 |
C# IDataReader 를 이용해 List<T> 로 변환해서 사용하는 방법 (0) | 2024.02.27 |
C# ExpandoObject 객체 Property 값 알아내는 방법 (0) | 2024.02.27 |