软测单独项目
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

119 lines
4.2 KiB

using Confluent.Kafka;
using FangYar.Model.TBL;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace FangYar.Common
{
/// <summary>
/// Helper for publishing operation-log records to Kafka.
/// </summary>
public class KafkaHelper
{
    /// <summary>
    /// Setting that indicates whether operation logs are recorded to Kafka.
    /// Read from the "IsKafkaLog" appSettings entry; an absent key yields an empty string.
    /// </summary>
    public static string IsKafkaLog = System.Configuration.ConfigurationManager.AppSettings["IsKafkaLog"] + "";

    /// <summary>
    /// Kafka bootstrap server address, read from the "KafkaServer" appSettings entry
    /// (empty string when the key is absent).
    /// </summary>
    public static string KafkaServer = System.Configuration.ConfigurationManager.AppSettings["KafkaServer"] + "";

    /// <summary>
    /// Value of the "KafkaServerKey" appSettings entry (empty string when the key is absent).
    /// It is used both as the topic name and as the message key when producing.
    /// </summary>
    public static string KafkaServerKey = System.Configuration.ConfigurationManager.AppSettings["KafkaServerKey"] + "";

    /// <summary>
    /// Number of partitions that messages are spread across in round-robin order.
    /// NOTE(review): assumes the target topic has at least this many partitions; confirm against the broker setup.
    /// </summary>
    private const int PartitionCount = 10;

    /// <summary>
    /// Guards the lazy creation of <see cref="producer"/>; callbacks run concurrently on the thread pool.
    /// </summary>
    private static readonly object producerLock = new object();

    /// <summary>
    /// Sequence counter used to pick the next partition. Starts at -1 so the first increment yields 0.
    /// </summary>
    private static int messageCounter = -1;

    /// <summary>
    /// Shared producer instance, created on first use.
    /// </summary>
    private static IProducer<string, object> producer = null;

    /// <summary>
    /// Queues a work item on the thread pool that stamps the record (ID, cleaned client IP,
    /// creation time), produces it to Kafka and flushes the producer.
    /// Any exception raised by the work item is caught and written to the application log,
    /// so nothing propagates to the caller.
    /// </summary>
    /// <param name="addMo">The operation-log record to send. It is mutated by this method.</param>
    public static void SendKafKaMsg(Tbl_Sys_Operation_Log_Model addMo)
    {
        // Hand the work to the thread pool so the caller is not blocked by the Kafka round trip.
        ThreadPool.QueueUserWorkItem(t =>
        {
            try
            {
                if (addMo == null)
                {
                    // Thrown inside the try block on purpose: it is logged below instead of crashing the pool thread.
                    throw new ArgumentNullException("addMo");
                }

                addMo.ID = Guid.NewGuid().ToString("N");
                if (addMo.Client_IP != null)
                {
                    // Strip line breaks and spaces from the client IP; a missing IP no longer drops the record.
                    addMo.Client_IP = addMo.Client_IP.Replace("\r\n", "").Replace("\n", "").Replace("\r", "").Replace(" ", "");
                }
                addMo.createtime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");

                IProducer<string, object> kafkaProducer = GetProducer();
                int partition = NextPartition();
                kafkaProducer.Produce(
                    new TopicPartition(KafkaServerKey, new Partition(partition)),
                    new Message<string, object>() { Key = KafkaServerKey, Value = addMo });

                // Wait (up to 10 seconds) for delivery so the record is not left sitting in the local queue.
                kafkaProducer.Flush(TimeSpan.FromSeconds(10));
            }
            catch (Exception ex)
            {
                string str = "添加Kafka数据异常:" + ex;
                FangYar.Common.MyLogHelper.WriteMsg(new FangYar.Common.LogInfoMo()
                {
                    message = str,
                    msgType = FangYar.Common.EnumLogMsgTypeEnum.Error,
                    path = "SysOperationLogKafka"
                });
            }
        });
    }

    /// <summary>
    /// Returns the shared producer, building it on first use. Creation happens under a lock so
    /// concurrent callbacks cannot each build (and leak) their own producer. A failed build leaves
    /// the field null, so the next call retries.
    /// </summary>
    /// <returns>The shared producer instance.</returns>
    private static IProducer<string, object> GetProducer()
    {
        lock (producerLock)
        {
            if (producer == null)
            {
                ProducerConfig config = new ProducerConfig();
                config.BootstrapServers = KafkaServer;
                var builder = new ProducerBuilder<string, object>(config);
                builder.SetValueSerializer(new KafkaConverter()); // serialize values as JSON bytes
                producer = builder.Build();
            }
            return producer;
        }
    }

    /// <summary>
    /// Returns the next partition number in round-robin order (0, 1, ..., PartitionCount - 1, 0, ...).
    /// The counter is advanced atomically; the sign bit is masked off so the result stays
    /// non-negative after the counter wraps past int.MaxValue.
    /// </summary>
    /// <returns>A partition number in the range [0, PartitionCount).</returns>
    private static int NextPartition()
    {
        int sequence = Interlocked.Increment(ref messageCounter);
        return (sequence & int.MaxValue) % PartitionCount;
    }
}
/// <summary>
/// Kafka value serializer that turns an object into UTF-8 encoded JSON.
/// </summary>
public class KafkaConverter : ISerializer<object>
{
    /// <summary>
    /// Serializes the given object to JSON, removes every line-break sequence
    /// from the resulting text and returns its UTF-8 bytes.
    /// </summary>
    /// <param name="data">The object to serialize.</param>
    /// <param name="context">Serialization context supplied by the producer; not used here.</param>
    /// <returns>The UTF-8 bytes of the single-line JSON text.</returns>
    public byte[] Serialize(object data, SerializationContext context)
    {
        string singleLineJson = JsonConvert.SerializeObject(data)
            .Replace("\r\n", "")
            .Replace("\n", "")
            .Replace("\r", "");

        return Encoding.UTF8.GetBytes(singleLineJson);
    }
}
}