如何使用Jedis操作Redis消息队列的使用

Java使用消息中间件redis的问题总结
Java中使用消息中间件redis已经是一个月前的事情了,后面被经理发配到设计了,现在有空闲就梳理总结一下,我学习并使用redis过程中遇到的一些问题。
& & 第一阶段 学习阶段
由于公司有部分功能存在高并发的问题,导致客户端卡死,客户端重复请求微信的数量太多导致数据库中存在很多一毛一样的数据(数据库中以自增id为主键,也没有唯一性约束),所以经理就要求我用redis(当时我并不知道这是个啥,一脸懵逼)来解决这个需求,不过,我也比较好奇是个什么东西,后来看了一下(其实github有开源社区),具体需求:请求虽多,但是从微信返回的成功的信息只能处理一遍,而且要保证客户端的友好型和响应速度。好了,看了redis之后,有列表(list)类型和集合(set)两种数据类型有待考虑,最终还是考虑用list列表来玩了,主要是考虑list可用于生产者-消费者模式和发布者-订阅者模式,比如:可以改进推送,邮件发送等功能。
上图就是描述生产者-消费者模式,可以一对多,多对一,多对多,都是可以滴,当然List消息队列也是可以有多个的,其中的brpop是线程阻塞的,不浪费资源,毕竟我用的是一个死循环的线程。
在我所在项目中,有名为payMoney,coreWechat两个消息队列,payMoney是用于支付成功,生成销售出库单、销售收款单,coreWeChat是用于微信关注过程中,部分复杂的逻辑处理同步返回结果,异步处理,总而言之,调节系统在某些功能中,由于高并发引起的响应速度慢,数据原子性处理等问题。
&&注:图中FIFO是表示List消息队列先进先出规则。
发布者-订阅者模式是生产者-消费者模式的一个特例,我在这里就不多做说明了,简而言之,就是发布者发布了一条信息之后,所有订阅者就会收到这条信息,就相当于上图的一个生产者,多个消费者的模式(即一对多)。
& 至于redis相关的数据结构,参数配置的问题,官网很详细(虽然是英文的),我这有个redis入门指南传送门:/s/1bo0jgzd
& 第二阶段
要完成这个需求,修改的文件不多,添加的文件倒是挺多的,首先,安装redis并配置config,;其次,选择另一项目还是线程来的List消息队列进行监听,当然,我是选择线程咯,并且随项目启动而死循环;最后,由于项目是Struts2-Spring-hibernate,所以也不能坏了规矩,比如我,就有XXXService.,当然,别忘了添加配置文件咯;
废话也不多说,直接上我的监听文件代码了,当然合理的设计List消息队列(不管是个数还是功能)是很重要的,这会影响你创建的线程个数(这些线程就是异步处理的主体了,有什么问题,都可以去找他们的麻烦)。&
import java.util.L
javax.servlet.ServletContextE &
javax.servlet.ServletContextL
com.shujian.ptms.domain.base.wechat.message.RedisM
com.shujian.ptms.pay.OrderD
com.shujian.ptms.pay.WechatD
com.shujian.ptms.web.action.base.JedisU
com.shujian.ptms.web.action.base.ObjectU
public class OrderDealListener
implements ServletContextListener { &
& private MyThread paymoneyUpdC
& private MyThread paymoneyAddMyC
& private MyThread paymoneyUpdPe;
& private MyThread paymoneyUpdM
& private MyThread wechatUpdC
& public void contextDestroyed(ServletContextEvent
(paymoneyUpdCust != null &&
paymoneyUpdCust.isInterrupted()) { &
paymoneyUpdCust.interrupt(); &
(paymoneyAddMyCust != null &&
paymoneyAddMyCust.isInterrupted()) { &
paymoneyAddMyCust.interrupt(); &
(paymoneyUpdPe != null && paymoneyUpdPe.isInterrupted()) {
paymoneyUpdPe.interrupt(); &
(paymoneyUpdMarket != null &&
paymoneyUpdMarket.isInterrupted()) { &
paymoneyUpdMarket.interrupt(); &
(wechatUpdCust != null && wechatUpdCust.isInterrupted()) {
wechatUpdCust.interrupt(); &
& public void
contextInitialized(ServletContextEvent e) {
paymoneyUpdCust = new
MyThread("paymoneyUpdCust");//创建处理销售出库单、销售收款单的线程
paymoneyAddMyCust=new MyThread("paymoneyAddMyCust");
paymoneyUpdPe=new MyThread("paymoneyUpdPe");
paymoneyUpdMarket=new MyThread("paymoneyUpdMarket");
wechatUpdCust=new MyThread("wechatUpdCust");//创建处理WeChat关注的线程
paymoneyUpdCust.start(); // servlet 上下文初始化时启动 socket
paymoneyAddMyCust.start();
paymoneyUpdPe.start();
paymoneyUpdMarket.start();
wechatUpdCust.start();
class MyThread extends Thread {
private String
//备注每个线程的名字
& public MyThread(String name) {
& this.name=
@SuppressWarnings("static-access")
public void run() {
& & & while
(!this.isInterrupted()) {// 线程未中断执行循环 &
& & & List
switch(this.getName()){//判断是哪一个消息队列的监听
& & & case
"payMoney":
bytes=JedisUtil.brpop("payMoney".getBytes());
& & & case
"coreWechat":
bytes=JedisUtil.brpop("coreWechat".getBytes());
this.sleep(20000);//相隔20秒钟运行一次
} catch (InterruptedException e1)
e1.printStackTrace();
} //每隔2000ms执行一次
if(bytes==null){//如果bytes是NULL的话,我们就直接跳过本次循环操作
RedisMessage
redisMessage=(RedisMessage)ObjectUtil.bytesToObject(bytes.get(0));//将传递过来的二进制数据反序列化成对象类型
if(redisMessage.equals(null)){//如果得到的redis信息为NULL的话,就不用执行了
switch(this.getName()){ & &
& & & case
"paymoney":
OrderDeal.orderDeal(redisMessage);
& & & case
"coreWechat":
WechatDeal.wechatDeal(redisMessage);
System.out.println("点这么快,我真的是服了!!!");
& & & } catch
(Exception e) {
e.printStackTrace();
& 注:其中的switch("字符串")需要jdk1.7版本以上的,不然会报错滴。
说明一下,上面的程序,只是将List消息队列分类了而已,可以解决高并发的时候,微信相应慢导致客户端卡死的情况,但是,并不能解决重复请求导致数据库中存在重复数据的情况,所以,我们得另外建一个List列表或者集合来存放近期处理过的微信响应信息(当然,近期是多久合适呢?我也是不晓得,一直存着,太消耗内存不建议,我弄的是6小时的,没办法,其实内存也不怎么值钱了)。其中,RedisMessage中放的是key和value,当然,我这里的key是指消息队列的名字了,具体怎么用,还要看你自己咯!
什么?你想直接拷过去用?其实应用不一样,使用方式不一样的,我只能共享一个共用的使用jedispool操作redis的三个文件:jedispool.properties连接池参数配置文件,jedisutil.java操作redis相关方法及jedispool初始化,objectutil.java处理序列化与反序列化。
传送门:/s/1ge6MB4Z
当然,jedis相关包自己去下咯,另注明一点,jedispool初始化的时候,需要加同步(synchronize)。因为,公司其他的项目(投票,只持续1个月左右,提升交互速度也是不错的选择啊,ps:防止意外,还是有存入阿里云滴)中,是把redis直接放在主程序中的,初始化的时候,防止阻塞。
好了,就这么多吧,也没做集群(主从),还是需要大神带的,还有就是redis的配置和jedispool的配置信息,只能希望你看仔细一点了,收。
已投稿到:
以上网友发言只代表其个人观点,不代表新浪网的观点或立场。Redis实现简单消息队列 - 简书
Redis实现简单消息队列
任务异步化
打开浏览器,输入地址,按下回车,打开了页面。于是一个HTTP请求(request)就由客户端发送到服务器,服务器处理请求,返回响应(response)内容。
我们每天都在浏览网页,发送大大小小的请求给服务器。有时候,服务器接到了请求,会发现他也需要给另外的服务器发送请求,或者服务器也需要做另外一些事情,于是最初们发送的请求就被阻塞了,也就是要等待服务器完成其他的事情。
更多的时候,服务器做的额外事情,并不需要客户端等待,这时候就可以把这些额外的事情异步去做。从事异步任务的工具有很多。主要原理还是处理通知消息,针对通知消息通常采取是队列结构。生产和消费消息进行通信和业务实现。
生产消费与队列
上述异步任务的实现,可以抽象为生产者消费模型。如同一个餐馆,厨师在做饭,吃货在吃饭。如果厨师做了很多,暂时卖不完,厨师就会休息;如果客户很多,厨师马不停蹄的忙碌,客户则需要慢慢等待。实现生产者和消费者的方式用很多,下面使用Python标准库Queue写个小例子:
import random
import time
from Queue import Queue
from threading import Thread
queue = Queue(10)
class Producer(Thread):
def run(self):
while True:
elem = random.randrange(9)
queue.put(elem)
print "厨师 {} 做了 {} 饭 --- 还剩 {} 饭没卖完".format(self.name, elem, queue.qsize())
time.sleep(random.random())
class Consumer(Thread):
def run(self):
while True:
elem = queue.get()
print "吃货{} 吃了 {} 饭 --- 还有 {} 饭可以吃".format(self.name, elem, queue.qsize())
time.sleep(random.random())
def main():
for i in range(3):
p = Producer()
for i in range(2):
c = Consumer()
if __name__ == '__main__':
大概输出如下:
厨师 Thread-1 做了 1 饭 --- 还剩 1 饭没卖完
厨师 Thread-2 做了 8 饭 --- 还剩 2 饭没卖完
厨师 Thread-3 做了 3 饭 --- 还剩 3 饭没卖完
吃货Thread-4 吃了 1 饭 --- 还有 2 饭可以吃
吃货Thread-5 吃了 8 饭 --- 还有 1 饭可以吃
吃货Thread-4 吃了 3 饭 --- 还有 0 饭可以吃
厨师 Thread-1 做了 0 饭 --- 还剩 1 饭没卖完
厨师 Thread-2 做了 0 饭 --- 还剩 2 饭没卖完
厨师 Thread-1 做了 1 饭 --- 还剩 3 饭没卖完
厨师 Thread-1 做了 1 饭 --- 还剩 4 饭没卖完
吃货Thread-4 吃了 0 饭 --- 还有 3 饭可以吃
厨师 Thread-3 做了 3 饭 --- 还剩 4 饭没卖完
吃货Thread-5 吃了 0 饭 --- 还有 3 饭可以吃
吃货Thread-5 吃了 1 饭 --- 还有 2 饭可以吃
厨师 Thread-2 做了 8 饭 --- 还剩 3 饭没卖完
厨师 Thread-2 做了 8 饭 --- 还剩 4 饭没卖完
Redis 队列
Python内置了一个好用的队列结构。我们也可以是用redis实现类似的操作。并做一个简单的异步任务。
Redis提供了两种方式来作消息队列。一个是使用生产者消费模式模式,另外一个方法就是发布订阅者模式。前者会让一个或者多个客户端监听消息队列,一旦消息到达,消费者马上消费,谁先抢到算谁的,如果队列里没有消息,则消费者继续监听。后者也是一个或多个客户端订阅消息频道,只要发布者发布消息,所有订阅者都能收到消息,订阅者都是平等的。
生产消费模式(不建议使用)
主要使用了redis提供的blpop获取队列数据,如果队列没有数据则阻塞等待,也就是监听。
import redis
class Task(object):
def __init__(self):
self.rcon = redis.StrictRedis(host='localhost', db=5)
self.queue = 'task:prodcons:queue'
def listen_task(self):
while True:
task = self.rcon.blpop(self.queue, 0)[1]
print "Task get", task
if __name__ == '__main__':
print 'listen task queue'
Task().listen_task()
使用redis的brpop方式做队列,经过一段时间,会发现程序莫名其妙的卡主。也就是进程一切ok,redis的lpush也正常,唯独brpop不再消费。该问题十分不好复现,但是总是过了一段时间就会重现。本人采用了高并发,高延迟,弱网络环境等方式试图复现都没有成功,目前仍然在寻找解决方案。目测依赖redis做brokers的队列的celery也遇到同样的 ,并且其他语言也有类似,但是作者的解决方不适用。猜测问题的原因是redis在处理brpop的时候连接长时间不适用会自动假死。后来采用比较low的方案,每当凌晨3点左右重启一下队列服务。目前设置了更短的idle连接时间(config set timeout 10),再观察一下是否能复现。不建议在生成环境使用该方案。如果使用类似方案也遇到了问题,并且有了解决方案,希望您能联系我哈哈哈。升级了 redis 3.2 版本之后,运行了一个多月,目前没有再出现卡住的问题。
发布订阅模式
使用redis的pubsub功能,订阅者订阅频道,发布者发布消息到频道了,频道就是一个消息队列。
import redis
class Task(object):
def __init__(self):
self.rcon = redis.StrictRedis(host='localhost', db=5)
self.ps = self.rcon.pubsub()
self.ps.subscribe('task:pubsub:channel')
def listen_task(self):
for i in self.ps.listen():
if i['type'] == 'message':
print "Task get", i['data']
if __name__ == '__main__':
print 'listen task channel'
Task().listen_task()
Flask 入口
我们分别实现了两种异步任务的后端服务,直接启动他们,就能监听redis队列或频道的消息了。简单的测试如下:
import redis
import random
import logging
from flask import Flask, redirect
app = Flask(__name__)
rcon = redis.StrictRedis(host='localhost', db=5)
prodcons_queue = 'task:prodcons:queue'
pubsub_channel = 'task:pubsub:channel'
@app.route('/')
def index():
html = """
&center&&h3&Redis Message Queue&/h3&
&a href="/prodcons"&生产消费者模式&/a&
&a href="/pubsub"&发布订阅者模式&/a&
return html
@app.route('/prodcons')
def prodcons():
elem = random.randrange(10)
rcon.lpush(prodcons_queue, elem)
("lpush {} -- {}".format(prodcons_queue, elem))
return redirect('/')
@app.route('/pubsub')
def pubsub():
ps = rcon.pubsub()
ps.subscribe(pubsub_channel)
elem = random.randrange(10)
rcon.publish(pubsub_channel, elem)
return redirect('/')
if __name__ == '__main__':
app.run(debug=True)
启动脚本,使用
siege -c10 -r 5 http://127.0.0.1:5000/prodcons
siege -c10 -r 5 http://127.0.0.1:5000/pubsub
可以分别在监听的脚本输入中看到异步消息。在异步的任务中,可以执行一些耗时间的操作,当然目前这些做法并不知道异步的执行结果,如果需要知道异步的执行结果,可以考虑设计协程任务或者使用一些工具如RQ或者celery等。
艺术极客流氓为什么要用redis?二进制存储、java序列化传输、IO连接数高、连接频繁
一、序列化
  这里编写了一个java序列化的工具,主要是将对象转化为byte数组,和根据byte数组反序列化成java对象;&主要是用到了ByteArrayOutputStream和ByteArrayInputS&注意:每个需要序列化的对象都要实现Serializable接口; 其代码如下:
1 package U
2 import java.io.*;
* Created by Kinglf on .
6 public class ObjectUtil {
* 对象转byte[]
* @param obj
<span style="color: #
<span style="color: #
* @throws IOException
<span style="color: #
<span style="color: #
public static byte[] object2Bytes(Object obj) throws IOException{
<span style="color: #
ByteArrayOutputStream bo=new ByteArrayOutputStream();
<span style="color: #
ObjectOutputStream oo=new ObjectOutputStream(bo);
<span style="color: #
oo.writeObject(obj);
<span style="color: #
byte[] bytes=bo.toByteArray();
<span style="color: #
bo.close();
<span style="color: #
oo.close();
<span style="color: #
<span style="color: #
<span style="color: #
<span style="color: #
* byte[]转对象
<span style="color: #
* @param bytes
<span style="color: #
<span style="color: #
* @throws Exception
<span style="color: #
<span style="color: #
public static Object bytes2Object(byte[] bytes) throws Exception{
<span style="color: #
ByteArrayInputStream in=new ByteArrayInputStream(bytes);
<span style="color: #
ObjectInputStream sIn=new ObjectInputStream(in);
<span style="color: #
return sIn.readObject();
<span style="color: #
<span style="color: # }
二、消息类(实现Serializable接口)
import java.io.S
* Created by Kinglf on .
public class Message implements Serializable {
private static final long serialVersionUID = -047723L;
private int
public Message(int id, String content) {
this.content =
public int getId() {
public void setId(int id) {
public String getContent() {
public void setContent(String content) {
this.content =
三、Redis的操作
  利用redis做队列,我们采用的是redis中list的push和pop操作;&结合队列的特点:&只允许在一端插入新元素只能在队列的尾部FIFO:先进先出原则&Redis中lpush头入(rpop尾出)或rpush尾入(lpop头出)可以满足要求,而Redis中list药push或 pop的对象仅需要转换成byte[]即可&  java采用Jedis进行Redis的存储和Redis的连接池设置&上代码:
import redis.clients.jedis.J
import redis.clients.jedis.JedisP
import redis.clients.jedis.JedisPoolC
import java.util.L
import java.util.M
import java.util.S
* Created by Kinglf on .
public class JedisUtil {
private static String JEDIS_IP;
private static int JEDIS_PORT;
private static String JEDIS_PASSWORD;
private static JedisPool jedisP
//Configuration自行写的配置文件解析类,继承自Properties
Configuration conf=Configuration.getInstance();
JEDIS_IP=conf.getString("jedis.ip","127.0.0.1");
JEDIS_PORT=conf.getInt("jedis.port",6379);
JEDIS_PASSWORD=conf.getString("jedis.password",null);
JedisPoolConfig config=new JedisPoolConfig();
config.setMaxActive(5000);
config.setMaxIdle(256);
config.setMaxWait(5000L);
config.setTestOnBorrow(true);
config.setTestOnReturn(true);
config.setTestWhileIdle(true);
config.setMinEvictableIdleTimeMillis(60000L);
config.setTimeBetweenEvictionRunsMillis(3000L);
config.setNumTestsPerEvictionRun(-1);
jedisPool=new JedisPool(config,JEDIS_IP,JEDIS_PORT,60000);
* 获取数据
* @param key
public static String get(String key){
String value=null;
Jedis jedis=null;
jedis=jedisPool.getResource();
value=jedis.get(key);
}catch (Exception e){
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
}finally {
close(jedis);
private static void close(Jedis jedis) {
jedisPool.returnResource(jedis);
}catch (Exception e){
if(jedis.isConnected()){
jedis.quit();
jedis.disconnect();
public static byte[] get(byte[] key){
byte[] value = null;
Jedis jedis = null;
jedis = jedisPool.getResource();
value = jedis.get(key);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
public static void set(byte[] key, byte[] value) {
Jedis jedis = null;
jedis = jedisPool.getResource();
jedis.set(key, value);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
public static void set(byte[] key, byte[] value, int time) {
Jedis jedis = null;
jedis = jedisPool.getResource();
jedis.set(key, value);
jedis.expire(key, time);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
public static void hset(byte[] key, byte[] field, byte[] value) {
Jedis jedis = null;
jedis = jedisPool.getResource();
jedis.hset(key, field, value);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
public static void hset(String key, String field, String value) {
Jedis jedis = null;
jedis = jedisPool.getResource();
jedis.hset(key, field, value);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
* 获取数据
* @param key
public static String hget(String key, String field) {
String value = null;
Jedis jedis = null;
jedis = jedisPool.getResource();
value = jedis.hget(key, field);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
* 获取数据
* @param key
public static byte[] hget(byte[] key, byte[] field) {
byte[] value = null;
Jedis jedis = null;
jedis = jedisPool.getResource();
value = jedis.hget(key, field);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
public static void hdel(byte[] key, byte[] field) {
Jedis jedis = null;
jedis = jedisPool.getResource();
jedis.hdel(key, field);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
* 存储REDIS队列 顺序存储
key reids键名
value 键值
public static void lpush(byte[] key, byte[] value) {
Jedis jedis = null;
jedis = jedisPool.getResource();
jedis.lpush(key, value);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
* 存储REDIS队列 反向存储
key reids键名
value 键值
public static void rpush(byte[] key, byte[] value) {
Jedis jedis = null;
jedis = jedisPool.getResource();
jedis.rpush(key, value);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
* 将列表 source 中的最后一个元素(尾元素)弹出,并返回给客户端
key reids键名
destination 键值
public static void rpoplpush(byte[] key, byte[] destination) {
Jedis jedis = null;
jedis = jedisPool.getResource();
jedis.rpoplpush(key, destination);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
* 获取队列数据
public static List lpopList(byte[] key) {
List list = null;
Jedis jedis = null;
jedis = jedisPool.getResource();
list = jedis.lrange(key, 0, -1);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
* 获取队列数据
public static byte[] rpop(byte[] key) {
byte[] bytes = null;
Jedis jedis = null;
jedis = jedisPool.getResource();
bytes = jedis.rpop(key);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
public static void hmset(Object key, Map hash) {
Jedis jedis = null;
jedis = jedisPool.getResource();
jedis.hmset(key.toString(), hash);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
public static void hmset(Object key, Map hash, int time) {
Jedis jedis = null;
jedis = jedisPool.getResource();
jedis.hmset(key.toString(), hash);
jedis.expire(key.toString(), time);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
public static List hmget(Object key, String... fields) {
List result = null;
Jedis jedis = null;
jedis = jedisPool.getResource();
result = jedis.hmget(key.toString(), fields);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
public static Set hkeys(String key) {
Set result = null;
Jedis jedis = null;
jedis = jedisPool.getResource();
result = jedis.hkeys(key);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
public static List lrange(byte[] key, int from, int to) {
List result = null;
Jedis jedis = null;
jedis = jedisPool.getResource();
result = jedis.lrange(key, from, to);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
public static Map hgetAll(byte[] key) {
Map result = null;
Jedis jedis = null;
jedis = jedisPool.getResource();
result = jedis.hgetAll(key);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
public static void del(byte[] key) {
Jedis jedis = null;
jedis = jedisPool.getResource();
jedis.del(key);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
public static long llen(byte[] key) {
long len = 0;
Jedis jedis = null;
jedis = jedisPool.getResource();
jedis.llen(key);
} catch (Exception e) {
//释放redis对象
jedisPool.returnBrokenResource(jedis);
e.printStackTrace();
} finally {
//返还到连接池
close(jedis);
四、Configuration主要用于读取Redis的配置信息
import java.io.IOE
import java.io.InputS
import java.util.P
* Created by Kinglf on .
public class Configuration extends Properties {
private static final long serialVersionUID = -9943706L;
private static Configuration instance = null;
public static synchronized Configuration getInstance() {
if (instance == null) {
instance = new Configuration();
public String getProperty(String key, String defaultValue) {
String val = getProperty(key);
return (val == null || val.isEmpty()) ? defaultValue :
public String getString(String name, String defaultValue) {
return this.getProperty(name, defaultValue);
public int getInt(String name, int defaultValue) {
String val = this.getProperty(name);
return (val == null || val.isEmpty()) ? defaultValue : Integer.parseInt(val);
public long getLong(String name, long defaultValue) {
String val = this.getProperty(name);
return (val == null || val.isEmpty()) ? defaultValue : Integer.parseInt(val);
public float getFloat(String name, float defaultValue) {
String val = this.getProperty(name);
return (val == null || val.isEmpty()) ? defaultValue : Float.parseFloat(val);
public double getDouble(String name, double defaultValue) {
String val = this.getProperty(name);
return (val == null || val.isEmpty()) ? defaultValue : Double.parseDouble(val);
public byte getByte(String name, byte defaultValue) {
String val = this.getProperty(name);
return (val == null || val.isEmpty()) ? defaultValue : Byte.parseByte(val);
public Configuration() {
InputStream in = ClassLoader.getSystemClassLoader().getResourceAsStream("config.xml");
this.loadFromXML(in);
in.close();
} catch (IOException ioe) {
import Model.M
import Utils.JedisU
import Utils.ObjectU
import redis.clients.jedis.J
import java.io.IOE
* Created by Kinglf on .
public class TestRedisQueue {
public static byte[] redisKey = "key".getBytes();
} catch (IOException e) {
e.printStackTrace();
private static void init() throws IOException {
for (int i = 0; i & 1000000; i++) {
Message message = new Message(i, "这是第" + i + "个内容");
JedisUtil.lpush(redisKey, ObjectUtil.object2Bytes(message));
public static void main(String[] args) {
} catch (Exception e) {
e.printStackTrace();
private static void pop() throws Exception {
byte[] bytes = JedisUtil.rpop(redisKey);
Message msg = (Message) ObjectUtil.bytes2Object(bytes);
if (msg != null) {
System.out.println(msg.getId() + "----" + msg.getContent());
每执行一次pop()方法,结果如下:&br&1----这是第1个内容&br&2----这是第2个内容&br&3----这是第3个内容&br&4----这是第4个内容
至此,整个Redis消息队列的生产者和消费者代码已经完成
Message&需要传送的实体类(需实现Serializable接口)
Configuration&Redis的配置读取类,继承自Properties
ObjectUtil&将对象和byte数组双向转换的工具类
Jedis&通过消息队列的先进先出(FIFO)的特点结合Redis的list中的push和pop操作进行封装的工具类
阅读(...) 评论()}

我要回帖

更多关于 如何使用消息队列 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信