1. 发布线程
import java.util.concurrent.TimeUnit;import redis.clients.jedis.Jedis;public class Publish { public static String CHANNEL ="cctv.news"; public static String[] news = new String[] {"We are brocasting Boxing", "We are brocasting night news", "We are brocasting Cooking", "We are brocating Football"}; public static void getPublish() throws InterruptedException { Jedis jedis = JedisPoolClass.getResource(); for(String curr:news) { jedis.publish(CHANNEL, curr); TimeUnit.SECONDS.sleep(1); } jedis.close(); }}
2. 订阅监听类
import redis.clients.jedis.JedisPubSub;public class SubscriberListener extends JedisPubSub{ @Override public void onMessage(String channel, String message) { super.onMessage(channel, message); System.out.println("get message from "+channel+" and the message is"+message); } @Override public void onPMessage(String pattern, String channel, String message) { super.onPMessage(pattern, channel, message); System.out.println("get message from "+channel+" and the message is"+message); } @Override public void onPSubscribe(String pattern, int subscribedChannels) { super.onPSubscribe(pattern, subscribedChannels); System.out.println("onPubscribe the channel "+subscribedChannels); } @Override public void onPUnsubscribe(String pattern, int subscribedChannels) { super.onPUnsubscribe(pattern, subscribedChannels); System.out.println("onPubscribe the channel "+subscribedChannels); } @Override public void onSubscribe(String channel, int subscribedChannels) { super.onSubscribe(channel, subscribedChannels); System.out.println("onSubscribe the channel "+subscribedChannels); } @Override public void onUnsubscribe(String channel, int subscribedChannels) { super.onUnsubscribe(channel, subscribedChannels); System.out.println("onUnSubscribe the channel "+subscribedChannels); } }
3.连接池
import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;public class JedisPoolClass { private static JedisPool pool = null; static { pool = new JedisPool("localhost", 6379); } public static Jedis getResource() { return pool.getResource(); }}
4.测试类
/** * 必须得先发布再订阅,但是先发布再订阅会导致第一个发布的消息丢失,在实际应用的过程中要考虑到这点 */ private static void PubSubTest() { Jedis jedis = JedisPoolClass.getResource(); Thread thread = new Thread(new Runnable() { @Override public void run() { try { Publish.getPublish(); } catch (InterruptedException e) { e.printStackTrace(); } } }); thread.start(); jedis.subscribe(new SubscriberListener(), Publish.CHANNEL); }