You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
120 lines
4.2 KiB
120 lines
4.2 KiB
9 months ago
|
using Confluent.Kafka;
|
||
|
using FangYar.Model.TBL;
|
||
|
using Newtonsoft.Json;
|
||
|
using System;
|
||
|
using System.Collections.Generic;
|
||
|
using System.Linq;
|
||
|
using System.Text;
|
||
|
using System.Threading;
|
||
|
using System.Threading.Tasks;
|
||
|
|
||
|
namespace FangYar.Common
|
||
|
{
|
||
|
/// <summary>
/// Helper for publishing operation-log records to Kafka.
/// </summary>
public class KafkaHelper
{
    /// <summary>
    /// Value of the "IsKafkaLog" app setting (empty string when the setting is missing).
    /// Flag for whether operation logs should be recorded to Kafka; it is not consulted
    /// inside this class, so callers are expected to check it themselves.
    /// </summary>
    public static string IsKafkaLog = System.Configuration.ConfigurationManager.AppSettings["IsKafkaLog"] + "";

    /// <summary>
    /// Kafka bootstrap server address, read from the "KafkaServer" app setting
    /// (empty string when the setting is missing).
    /// </summary>
    public static string KafkaServer = System.Configuration.ConfigurationManager.AppSettings["KafkaServer"] + "";

    /// <summary>
    /// Value of the "KafkaServerKey" app setting (empty string when the setting is missing).
    /// Used both as the topic name and as the message key when producing.
    /// </summary>
    public static string KafkaServerKey = System.Configuration.ConfigurationManager.AppSettings["KafkaServerKey"] + "";

    /// <summary>
    /// Number of partitions the messages are spread over (partitions 0..9).
    /// NOTE(review): assumes the target topic has at least this many partitions - confirm.
    /// </summary>
    private const int PartitionCount = 10;

    // Round-robin ticket counter; starts at -1 so the first message goes to partition 0.
    private static int index = -1;

    // Guards the one-time construction of the shared producer.
    private static readonly object producerGate = new object();

    // Shared producer, built on first use and reused for every later message.
    private static volatile IProducer<string, object> producer = null;

    /// <summary>
    /// Queues a thread-pool work item that stamps the record and publishes it to Kafka.
    /// The work item assigns a new <c>ID</c>, strips line breaks and spaces from
    /// <c>Client_IP</c>, sets <c>createtime</c> to the current local time, produces the
    /// record to the next partition in round-robin order and then flushes the producer.
    /// Any exception raised inside the work item is logged and not rethrown.
    /// </summary>
    /// <param name="addMo">Operation-log record to publish; it is modified by the work item.</param>
    public static void SendKafKaMsg(Tbl_Sys_Operation_Log_Model addMo)
    {
        // Hand the work to the thread pool so the caller is not blocked by Kafka I/O.
        ThreadPool.QueueUserWorkItem(t =>
        {
            try
            {
                addMo.ID = Guid.NewGuid().ToString("N");

                // A record without a client IP is still published; the value is left as null.
                if (addMo.Client_IP != null)
                {
                    addMo.Client_IP = addMo.Client_IP.Replace("\r\n", "").Replace("\n", "").Replace("\r", "").Replace(" ", "");
                }

                addMo.createtime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");

                IProducer<string, object> sender = GetProducer();
                TopicPartition target = new TopicPartition(KafkaServerKey, new Partition(NextPartition()));
                sender.Produce(target, new Message<string, object>() { Key = KafkaServerKey, Value = addMo });

                // Wait (up to 10 seconds) for the queued message to be delivered.
                sender.Flush(TimeSpan.FromSeconds(10));
            }
            catch (Exception ex)
            {
                string str = "添加Kafka数据异常:" + ex;
                FangYar.Common.MyLogHelper.WriteMsg(new FangYar.Common.LogInfoMo()
                {
                    message = str,
                    msgType = FangYar.Common.EnumLogMsgTypeEnum.Error,
                    path = "SysOperationLogKafka"
                });
            }
        });
    }

    /// <summary>
    /// Returns the shared producer, building it on first use. Construction happens under a
    /// lock so concurrent work items cannot each build (and leak) their own producer. If
    /// construction throws, nothing is cached and the next call tries again.
    /// </summary>
    private static IProducer<string, object> GetProducer()
    {
        IProducer<string, object> current = producer;
        if (current != null)
        {
            return current;
        }

        lock (producerGate)
        {
            if (producer == null)
            {
                ProducerConfig config = new ProducerConfig();
                config.BootstrapServers = KafkaServer;
                var builder = new ProducerBuilder<string, object>(config);
                builder.SetValueSerializer(new KafkaConverter()); // serialize values as JSON bytes
                producer = builder.Build();
            }

            return producer;
        }
    }

    /// <summary>
    /// Returns the next partition number in round-robin order (0..PartitionCount-1).
    /// </summary>
    private static int NextPartition()
    {
        // Interlocked keeps the counter consistent across thread-pool threads; clearing the
        // sign bit keeps the ticket non-negative after the counter wraps past int.MaxValue.
        int ticket = Interlocked.Increment(ref index) & int.MaxValue;
        return ticket % PartitionCount;
    }
}
|
||
|
|
||
|
public class KafkaConverter : ISerializer<object>
{
    /// <summary>
    /// Serializes a value to UTF-8 encoded JSON with all line-break characters removed.
    /// </summary>
    /// <param name="data">Value to serialize.</param>
    /// <param name="context">Serialization context supplied by the Kafka client; not used here.</param>
    /// <returns>UTF-8 bytes of the JSON text.</returns>
    public byte[] Serialize(object data, SerializationContext context)
    {
        string payload = JsonConvert.SerializeObject(data);

        // Strip CRLF first, then any remaining lone LF or CR characters.
        foreach (string lineBreak in new[] { "\r\n", "\n", "\r" })
        {
            payload = payload.Replace(lineBreak, "");
        }

        return Encoding.UTF8.GetBytes(payload);
    }
}
|
||
|
}
|