C#使用rabbitmq在接收消息事件处理中报错:
Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=505, text='UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead', classId=60, methodId=40
解决办法是将接收事件代码里面末尾加个线程休眠“System.Threading.Thread.Sleep(1);”文章来源:https://uudwc.com/A/9d1Wj
/// <summary>
/// 监听消息队里的消息
/// </summary>
/// <param name="func">外围业务方法</param>
public static void Receive(Func<string, bool> func)
{
try
{
//如果未连接则重连连接队列
AgainInitMessageQueue();
//创建消费者对象
var consumer = new EventingBasicConsumer(channel);
//监听消费事件,如果执行shutdown了,此事件会执行失败
consumer.Received += (model, ea) =>
{
//接收到的消息
var message = Encoding.UTF8.GetString(ea.Body.Span);
//委托外围方法处理业务
var result = func(message);
if (result)
{
//业务处理成功后单条通知生产者
channel.BasicAck(ea.DeliveryTag, true);//手动应答MQ已经成功接收,批量应答
LogHelper.Info(typeof(MessageQueueFactory), $"MessageQueueFactory.Receive()-消费成功,message={message}");
}
else
{
//业务处理失败需要重回队列等待消费
//channel.BasicReject(ea.DeliveryTag, true);//拒绝接收单条消息,是否重回队列
LogHelper.Error(typeof(MessageQueueFactory), $"MessageQueueFactory.Receive()-业务处理失败,进入缓存等待重新入列,message={message}", true);
//###为了防止队里一直处于等到出列,导致后面数据无法消费,此处自动消费成功,进入缓存等待重新入列消费####
//添加处理失败的数据进缓存中
//添加消息重试次数缓存 超过上限不再进行重试
var mqBaseModel = SerializerHelper.DeserializerJson<MQBaseModel>(message);
if (mqBaseModel != null)
{
int retryTimes = ConvertBasic.ToInt32(RedisClient.GetValue<int>(CacheKey.SendMsgFailRetryKey + mqBaseModel.MQCode));
RedisClient.SetValue(CacheKey.SendMsgFailRetryKey + mqBaseModel.MQCode, retryTimes + 1);
}
var ret = RedisClient.SAdd(CacheKey.SendMsgFailListKey, message);
if (!ret)
{
LogHelper.Error(typeof(MessageQueueFactory), $"MessageQueueFactory.Receive()-添加处理失败的数据进缓存中失败,message={message}", true);
}
channel.BasicAck(ea.DeliveryTag, true);//手动应答MQ已经成功接收,批量应答
}
Thread.Sleep(1);
//此行代码必须,不然写入队列消息会报错
//错误消息:Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=505, text='UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead', classId=60, methodId=40
};
//监听shutdown事件
consumer.Shutdown += (model, e) =>
{
//记录日志
LogHelper.Error(typeof(MessageQueueFactory), $"MessageQueueFactory.Receive()-shutdown被执行,队列监听失败e={SerializerHelper.SerializerJson(e)}", true);
};
//消费者开启监听,手动应答
channel.BasicQos(0, 1, false);//逐条消费
channel.BasicConsume(queue: MQNameConfig.CommentQueue, autoAck: false, consumer: consumer);
}
catch (Exception ex)
{
LogHelper.Error(typeof(MessageQueueFactory), $"MessageQueueFactory.Receive()-监听消息队里的消息出错,{ex.Message}", ex, true);
}
}
文章来源地址https://uudwc.com/A/9d1Wj