using Confluent.Kafka;
using FangYar.Model.TBL;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace FangYar.Common
{
/// <summary>
/// Helper for publishing operation-log records to Kafka.
/// </summary>
public class KafkaHelper
{
    /// <summary>
    /// Raw value of the "IsKafkaLog" app setting, which controls whether operation logs are recorded to Kafka.
    /// </summary>
    public static string IsKafkaLog = System.Configuration.ConfigurationManager.AppSettings["IsKafkaLog"] + "";

    /// <summary>
    /// Kafka server address ("KafkaServer" app setting); used as the producer's bootstrap servers.
    /// </summary>
    public static string KafkaServer = System.Configuration.ConfigurationManager.AppSettings["KafkaServer"] + "";

    /// <summary>
    /// Value of the "KafkaServerKey" app setting; used both as the topic name and as the message key.
    /// </summary>
    public static string KafkaServerKey = System.Configuration.ConfigurationManager.AppSettings["KafkaServerKey"] + "";

    /// <summary>
    /// Number of partitions the messages are spread over (partitions 0..9).
    /// NOTE(review): presumably the topic is provisioned with at least this many partitions - confirm.
    /// </summary>
    private const int PartitionCount = 10;

    /// <summary>
    /// Round-robin sequence counter shared by all work items; only ever advanced through Interlocked.
    /// </summary>
    private static int index = 0;

    /// <summary>
    /// Guards the one-time creation of <see cref="producer"/>.
    /// </summary>
    private static readonly object producerLock = new object();

    // NOTE(review): IProducer / ProducerBuilder / Message appear without generic type
    // arguments in the reviewed copy of this file - confirm against the real source.
    private static volatile IProducer producer = null;

    /// <summary>
    /// Queues a thread-pool work item that stamps the record (new ID, normalised client IP,
    /// creation time) and sends it to Kafka. Failures are written to the error log and are
    /// not propagated to the caller.
    /// </summary>
    /// <param name="addMo">Operation-log record to send; it is modified in place before sending.</param>
    public static void SendKafKaMsg(Tbl_Sys_Operation_Log_Model addMo)
    {
        // Run the send on a thread-pool thread; the caller returns immediately.
        ThreadPool.QueueUserWorkItem(t =>
        {
            try
            {
                addMo.ID = Guid.NewGuid().ToString("N");
                // A record without a client IP should still be sent rather than fail here.
                if (addMo.Client_IP != null)
                {
                    addMo.Client_IP = addMo.Client_IP.Replace("\r\n", "").Replace("\n", "").Replace("\r", "").Replace(" ", "");
                }
                addMo.createtime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");

                // Work items run concurrently, so create the shared producer exactly once.
                if (producer == null)
                {
                    lock (producerLock)
                    {
                        if (producer == null)
                        {
                            ProducerConfig config = new ProducerConfig();
                            config.BootstrapServers = KafkaServer;
                            var builder = new ProducerBuilder(config);
                            builder.SetValueSerializer(new KafkaConverter()); // set the value serializer
                            producer = builder.Build();
                        }
                    }
                }

                // Atomically take the next sequence number (first message gets 0, as before).
                // Reinterpreting it as unsigned keeps the partition in range even after the
                // counter wraps past int.MaxValue.
                int sequence = unchecked(Interlocked.Increment(ref index) - 1);
                int partition = (int)(unchecked((uint)sequence) % (uint)PartitionCount);

                producer.Produce(new TopicPartition(KafkaServerKey, new Partition(partition)), new Message() { Key = KafkaServerKey, Value = addMo });
                producer.Flush(TimeSpan.FromSeconds(10));
            }
            catch (Exception ex)
            {
                string str = "添加Kafka数据异常:" + ex;
                FangYar.Common.MyLogHelper.WriteMsg(new FangYar.Common.LogInfoMo()
                {
                    message = str,
                    msgType = FangYar.Common.EnumLogMsgTypeEnum.Error,
                    path = "SysOperationLogKafka"
                });
            }
        });
    }
}
public class KafkaConverter : ISerializer