博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
几种队列
阅读量:4681 次
发布时间:2019-06-09

本文共 9906 字,大约阅读时间需要 33 分钟。

 c#中自带的队列

 使用C#自带的队列,一般会把进行队列监听的代码放于Global.asax之类的文件或寄宿windows服务之中,与应用服务器在同一台,会抢占服务器资源。后面会介绍使用其他分布式队列。

来一个简单的示例:

队列帮助类

namespace queue{    public sealed class QueueHelper    {        private static QueueHelper queue;        Queue
queueobj = new Queue
(); public static QueueHelper instance { get { if (queue==null) { queue = new QueueHelper(); } return queue; } } ///
/// 向队列中添加数据 /// ///
///
///
public void AddQueue(int id,int age, string name) { queueobj.Enqueue(new People() { ID=id, Age = age, Name=name }); } ///
/// 当前队列数据量 /// ///
public int Count() { return queueobj.Count(); } //启动一个线程去监听队列 public void StartQueue() { Task t = Task.Run(() => { ScanQueue(); }); } private void ScanQueue() { while (true) { if (queueobj.Count > 0) { try { //从队列中取出 People queueinfo = queueobj.Dequeue(); Console.WriteLine($"取得队列:ID={queueinfo.ID},name={queueinfo.Name},age={queueinfo.Age}"); } catch (Exception ex) { throw; } } else { Console.WriteLine("没有数据,先休眠5秒在扫描"); Thread.Sleep(5000); } } } }}

Main方法

    static void Main(string[] args)        {            Console.WriteLine("当前队列数:"+ QueueHelper.instance.Count());            Console.WriteLine("开启线程扫描");            QueueHelper.instance.StartQueue();            Thread.Sleep(10000);            QueueHelper.instance.AddQueue(1,25,"小王");            QueueHelper.instance.AddQueue(2, 28, "小明");            QueueHelper.instance.AddQueue(3, 99, "大爷");            Console.ReadLine();        }

运行结果

 redis队列

 对可靠性和稳定性要求不高的应用场景,可以使用redis简单方便的实现

关于redis队列的实现方式有两种:

1、生产者消费者模式(List)。

2、发布者订阅者模式(pub/sub)。

驱动:StackExchange.Redis   

以下为第一种方式示例

RedisHelper帮助类

public class RedisHelper    {        // redis实例        private static RedisHelper instance = null;        private IDatabase db;        private ConnectionMultiplexer redis;        private IServer redisServer;        private readonly string _enqueueName = "PersonObj";        ///         /// 静态单例方法        ///         /// 
public static RedisHelper Get() { if (instance == null) { instance = new RedisHelper(); } return instance; } /// /// 无参数构造函数 /// private RedisHelper() { var redisConnection = "127.0.0.1:6379"; redis = ConnectionMultiplexer.Connect(redisConnection); redisServer = redis.GetServer(redisConnection); db = redis.GetDatabase(); } /// /// 入队 /// ///
/// 队列名称 /// public void EnqueueItem
( T value) { //序列化 var valueString = JsonConvert.SerializeObject(value); db.ListLeftPushAsync(_enqueueName, valueString); } ///
/// 出队 /// ///
///
队列名称 ///
public T DequeueItem
() {
var valueString = db.ListRightPopAsync(_enqueueName).Result;        //反序列化 T obj = JsonConvert.DeserializeObject
(valueString); return obj; } ///
/// 当前队列数据量 /// ///
///
public long Count() { return db.ListLengthAsync(_enqueueName).Result; } //启动一个线程去监听队列 public void StartQueue() { Task t = Task.Run(() => { ScanQueue(); }); } private void ScanQueue() { while (true) { if (this.Count() > 0) { try { //从队列中取出 Person queueinfo = DequeueItem
(); Console.WriteLine($"取得队列:name={queueinfo.Name},age={queueinfo.Age}"); } catch (Exception ex) { throw; } } else { Console.WriteLine("没有数据,先休眠5秒在扫描"); Thread.Sleep(5000); } } } }

 Main方法:

      static void Main(string[] args)        {            Console.WriteLine($"当前队列数{RedisHelper.Get().Count()}");            Console.WriteLine("开启线程扫描");            RedisHelper.Get().StartQueue();            Thread.Sleep(10000);            RedisHelper.Get().EnqueueItem
(new Person { Name="小王",Age=20}); RedisHelper.Get().EnqueueItem
(new Person { Name = "小明", Age = 20 }); Console.WriteLine($"当前队列数{RedisHelper.Get().Count()}"); Console.ReadLine(); }

运行结果

 

RabbitMQ

 RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java等,且支持AJAX。用于在分布式系统中存储转发消息,具体特点包括易用性、扩展性、高可用性、消息集群等

关于RabbitMQ的教程可以参考:https://www.cnblogs.com/julyluo/p/6262553.html

安装教程可以参考:https://www.cnblogs.com/ericli-ericli/p/5902270.html

 RabbitMq默认的监听端口是15672

public class RabbitmqHelper    {        private static RabbitmqHelper instance;        private readonly ConnectionFactory rabbitMqFactory;        //交换器名称        const string ExchangeName = "myExchange";        //当前消息队列名称        const string QueueName = "myFirstQueue";        public RabbitmqHelper()        {            //没有设置账号密码端口或服务端没有先开启权限会报:None of the specified endpoints were reachable            rabbitMqFactory = new ConnectionFactory { HostName = "192.168.0.112" };            rabbitMqFactory.Port = 5672;            rabbitMqFactory.UserName = "123456";            rabbitMqFactory.Password = "123456";        }        ///         /// 静态单例方法        ///         /// 
public static RabbitmqHelper Get() { if (instance == null) { instance = new RabbitmqHelper(); } return instance; } /// /// 生产者-发布消息 /// /// 消息 public void Enqueue(string msg) { using (IConnection conn = rabbitMqFactory.CreateConnection()) { using (IModel channel = conn.CreateModel()) { //定义交换器 channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null); //持久化一个队列,如果名称相同不会重复创建 channel.QueueDeclare(QueueName, true, false, false, null); //定义exchange到queue的binding channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName); byte[] bytes = Encoding.UTF8.GetBytes(msg); //设置消息持久化 IBasicProperties properties = channel.CreateBasicProperties(); properties.Persistent = true; channel.BasicPublish("", QueueName, properties, bytes); } } } /// /// 消费者-接收消息-基于订阅模式 /// /// 消息 public void Dequeue() { IConnection conn = rabbitMqFactory.CreateConnection(); IModel channel = conn.CreateModel(); //持久化一个队列,如果名称相同不会重复创建 channel.QueueDeclare(QueueName, durable: true, autoDelete: false, exclusive: false, arguments: null); //告诉broker同一时间只处理一个消息 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); //因处理性能问题,已被官方弃用 //var consumer2 = new QueueingBasicConsumer(channel) //var msgResponse = consumer.Queue.Dequeue(); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { try { var msgBody = Encoding.UTF8.GetString(ea.Body); Console.WriteLine($"收到消息:{msgBody}"); //处理完成,服务端可以删除消息了同时分配新的消息 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); } catch (Exception e) { Console.WriteLine($"出现错误:{e.Message}"); } }; //noAck设置false,发送消息之后,消息不要主动删除,先等消费者处理完 channel.BasicConsume(QueueName, false, consumer); } /// /// 消费者-接收消息-主动拉取 /// /// 消息 public void Dequeue2() { using (IConnection conn = rabbitMqFactory.CreateConnection()) { using (IModel channel = conn.CreateModel()) { while (true) { BasicGetResult res = channel.BasicGet(QueueName, false/*noAck*/); if (res != null) { var msg = System.Text.UTF8Encoding.UTF8.GetString(res.Body); Console.WriteLine($"获取到消息:{msg}"); channel.BasicAck(res.DeliveryTag, false); } } } } } }
View Code

 

转载于:https://www.cnblogs.com/qiuguochao/p/9113919.html

你可能感兴趣的文章
linux 里 /etc/passwd 、/etc/shadow和/etc/group 文件内容解释
查看>>
一.并发编程 (进程操作系统简介)
查看>>
深入理解vsto,开发word插件的利器
查看>>
PHP 在5.1.* 和5.2.*之间 PDO数据库操作中的不同!
查看>>
Laravel学习笔记之乱七八糟
查看>>
导入properties时的坑
查看>>
java校验maven下载的jar文件
查看>>
python——网络编程
查看>>
C++练习01 ---- 打印杨辉三角
查看>>
关于ajax中执行 window.location.href不跳转问题
查看>>
冲鸭队(第二周)
查看>>
【Luogu】P1144最短路计数(BFS)
查看>>
android 的闪屏效果
查看>>
Python还是很重要的,不能丢。学习IF和WHILE
查看>>
浅谈C++多态性
查看>>
金牌架构师:我们是这样设计APP数据统计产品的
查看>>
导出python的环境
查看>>
多维数据库 Oracle Essbase 和 IBM Cogons 底层原理
查看>>
各种小结
查看>>
virtualbox--在win7设置ubuntu虚拟机网络
查看>>