using Confluent.Kafka;
using FangYar.Model.TBL;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace FangYar.Common
{
    /// <summary>
    /// Helper for publishing operation-log records to Kafka.
    /// </summary>
    public class KafkaHelper
    {
        /// <summary>
        /// Value of the "IsKafkaLog" app setting: whether operation logs should be recorded to Kafka.
        /// Empty string when the setting is absent. This class does not read it itself.
        /// </summary>
        public static string IsKafkaLog = System.Configuration.ConfigurationManager.AppSettings["IsKafkaLog"] + "";

        /// <summary>
        /// Kafka bootstrap server address(es), from the "KafkaServer" app setting
        /// (empty string when the setting is absent).
        /// </summary>
        public static string KafkaServer = System.Configuration.ConfigurationManager.AppSettings["KafkaServer"] + "";

        /// <summary>
        /// Value of the "KafkaServerKey" app setting. Used below both as the topic name
        /// and as the key of every produced message.
        /// </summary>
        public static string KafkaServerKey = System.Configuration.ConfigurationManager.AppSettings["KafkaServerKey"] + "";

        /// <summary>
        /// Number of partitions messages are rotated across (0 .. PartitionCount - 1).
        /// NOTE(review): assumes the topic has at least this many partitions - confirm against the broker setup.
        /// </summary>
        private const uint PartitionCount = 10;

        /// <summary>Guards the one-time creation of <see cref="producer"/>.</summary>
        private static readonly object producerLock = new object();

        /// <summary>Monotonic message counter used to pick the next partition.</summary>
        private static int index = 0;

        /// <summary>Shared producer, created on first use and reused for every message.</summary>
        private static IProducer<string, object> producer = null;

        /// <summary>
        /// Queues a thread-pool work item that stamps the log record (ID, sanitized client IP,
        /// creation time) and produces it to Kafka. The call returns immediately; any failure in
        /// the work item is written to the application log instead of being thrown to the caller.
        /// </summary>
        /// <param name="addMo">Operation-log record to send. It is mutated by the work item.</param>
        public static void SendKafKaMsg(Tbl_Sys_Operation_Log_Model addMo)
        {
            // Hand the work to the thread pool so the caller is not blocked by Kafka I/O.
            ThreadPool.QueueUserWorkItem(t =>
            {
                try
                {
                    addMo.ID = Guid.NewGuid().ToString("N");

                    // Strip line breaks and spaces from the client IP. A missing IP used to
                    // throw here and drop the whole record; now it is simply left as null.
                    if (addMo.Client_IP != null)
                    {
                        addMo.Client_IP = addMo.Client_IP.Replace("\r\n", "").Replace("\n", "").Replace("\r", "").Replace(" ", "");
                    }

                    addMo.createtime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");

                    IProducer<string, object> kafkaProducer = GetProducer();
                    int partition = NextPartition();

                    kafkaProducer.Produce(
                        new TopicPartition(KafkaServerKey, new Partition(partition)),
                        new Message<string, object>() { Key = KafkaServerKey, Value = addMo });

                    // Wait (up to 10 seconds) for the message to leave the local queue.
                    kafkaProducer.Flush(TimeSpan.FromSeconds(10));
                }
                catch (Exception ex)
                {
                    string str = "添加Kafka数据异常:" + ex;
                    FangYar.Common.MyLogHelper.WriteMsg(new FangYar.Common.LogInfoMo() { message = str, msgType = FangYar.Common.EnumLogMsgTypeEnum.Error, path = "SysOperationLogKafka" });
                }
            });
        }

        /// <summary>
        /// Returns the shared producer, building it on first use. Creation happens under a lock
        /// so concurrent work items cannot each build (and leak) their own producer. If building
        /// throws, the field stays null and the next call tries again.
        /// </summary>
        private static IProducer<string, object> GetProducer()
        {
            lock (producerLock)
            {
                if (producer == null)
                {
                    ProducerConfig config = new ProducerConfig();
                    config.BootstrapServers = KafkaServer;

                    var builder = new ProducerBuilder<string, object>(config);
                    builder.SetValueSerializer(new KafkaConverter()); // values are sent as JSON bytes
                    producer = builder.Build();
                }

                return producer;
            }
        }

        /// <summary>
        /// Picks the next partition in round-robin order, starting at 0.
        /// The counter is advanced atomically and reduced as an unsigned value, so the result
        /// stays within 0 .. PartitionCount - 1 even after the counter wraps around.
        /// </summary>
        private static int NextPartition()
        {
            unchecked
            {
                // Interlocked.Increment returns the incremented value; subtract one so the
                // first message lands on partition 0, as before.
                uint sequence = (uint)(Interlocked.Increment(ref index) - 1);
                return (int)(sequence % PartitionCount);
            }
        }
    }

    /// <summary>
    /// Kafka value serializer that writes any object as UTF-8 encoded JSON.
    /// </summary>
    public class KafkaConverter : ISerializer<object>
    {
        /// <summary>
        /// Serializes the value to JSON, removes any raw line-break characters from the JSON text,
        /// and returns its UTF-8 bytes.
        /// </summary>
        /// <param name="data">Value to serialize.</param>
        /// <param name="context">Serialization context supplied by the Kafka client (unused).</param>
        /// <returns>UTF-8 bytes of the JSON text.</returns>
        public byte[] Serialize(object data, SerializationContext context)
        {
            var json = JsonConvert.SerializeObject(data);
            json = json.Replace("\r\n", "").Replace("\n", "").Replace("\r", "");
            return Encoding.UTF8.GetBytes(json);
        }
    }
}