二维码

jedis的publish/subscribe[转]含有redis源码解析

1460 人阅读 | 时间:2019年12月29日 23:25

首先使用redis客户端来进行publish与subscribe的功能是否能够正常运行。

打开redis服务器

[root@localhost ~]# redis-server /opt/redis-2.4.10/redis.conf  
[7719] 16 Apr 11:37:22 # Warning: 32 bit instance detected but no memory limit set. Setting 3.5 GB maxmemory limit with 'noeviction' policy now. 
[7719] 16 Apr 11:37:22 * Server started, Redis version 2.4.10 [7719] 16 Apr 11:37:22 # WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.

打开一个客户端订阅一个news.sports的channel。

[root@localhost ~]# redis-cli 
redis 127.0.0.1:6379> subscribe news.sports 
Reading messages... (press Ctrl-C to quit) 
1) "subscribe" 
2) "news.sports" 
3) (integer) 1

可以看到已经开始了监听,向news.sports channel发布一条消息

[root@localhost ~]# redis-cli             
redis 127.0.0.1:6379> publish news.sports "kaka is back" (integer) 1

 订阅的客户端顺利收到消息

jedis的publish/subscribe[转]含有redis源码解析
redis 127.0.0.1:6379> subscribe news.sports 
Reading messages... (press Ctrl-C to quit) 
1) "subscribe" 
2) "news.sports" 
3) (integer) 1 
1) "message" 
2) "news.sports" 
3) "kaka is back"
jedis的publish/subscribe[转]含有redis源码解析

接下来使用jedis来进行发布/订阅的验证

发布消息是通过jedis.publish(String channel, String message)来发布的,其实就是往redis服务器发布一条publish命令。

public void publish(final byte[] channel, final byte[] message) { 
       sendCommand(PUBLISH, channel, message); 
   }

订阅消息是通过jedis.subscribe(JedisPub pub,String channel)来进行的,channel好理解,那么JedisPub是什么呢。

 看源码吧。

 Jedis订阅方法的源码为

jedis的publish/subscribe[转]含有redis源码解析
public void subscribe(JedisPubSub jedisPubSub, String... channels) { 
      checkIsInMulti(); 
      connect(); 
      client.setTimeoutInfinite(); 
      jedisPubSub.proceed(client, channels); 
      client.rollbackTimeout(); 
  }
jedis的publish/subscribe[转]含有redis源码解析

可以看到,主要是通过jedisPubSub.proceed(client, channels);来进行订阅的。看proceed方法。

public void proceed(Client client, String... channels) { 
       this.client = client; 
       client.subscribe(channels); 
       client.flush(); 
       process(client); 
   }

追踪client.subscribe(channels)可以看到,

public void subscribe(final byte[]... channels) { 
       sendCommand(SUBSCRIBE, channels); 
   }

其只是向服务器发送了一个subcribe的命令而已。

那么要了解jedisPubSub的作用,只能看process方法了。简单看process其实是一个do...while循环

private void process(Client client) { 
       do { 
           
       } while (isSubscribed()); 
   }

我们可以猜测正是靠着这个循环来不断的读取服务器那边传到来的订阅的消息。

看主体

jedis的publish/subscribe[转]含有redis源码解析
List<Object> reply = client.getObjectMultiBulkReply(); 
           final Object firstObj = reply.get(0); 
           if (!(firstObj instanceof byte[])) { 
               throw new JedisException("Unknown message type: " + firstObj); 
           } 
           final byte[] resp = (byte[]) firstObj; 
           if (Arrays.equals(SUBSCRIBE.raw, resp)) { 
               subscribedChannels = ((Long) reply.get(2)).intValue(); 
               final byte[] bchannel = (byte[]) reply.get(1); 
               final String strchannel = (bchannel == null) ? null 
                       : SafeEncoder.encode(bchannel); 
               onSubscribe(strchannel, subscribedChannels); 
           } else if (Arrays.equals(UNSUBSCRIBE.raw, resp)) { 
               subscribedChannels = ((Long) reply.get(2)).intValue(); 
               final byte[] bchannel = (byte[]) reply.get(1); 
               final String strchannel = (bchannel == null) ? null 
                       : SafeEncoder.encode(bchannel); 
               onUnsubscribe(strchannel, subscribedChannels); 
           } else if (Arrays.equals(MESSAGE.raw, resp)) { 
               final byte[] bchannel = (byte[]) reply.get(1); 
               final byte[] bmesg = (byte[]) reply.get(2); 
               final String strchannel = (bchannel == null) ? null 
                       : SafeEncoder.encode(bchannel); 
               final String strmesg = (bmesg == null) ? null : SafeEncoder 
                       .encode(bmesg); 
               onMessage(strchannel, strmesg); 
           } else if (Arrays.equals(PMESSAGE.raw, resp)) { 
               final byte[] bpattern = (byte[]) reply.get(1); 
               final byte[] bchannel = (byte[]) reply.get(2); 
               final byte[] bmesg = (byte[]) reply.get(3); 
               final String strpattern = (bpattern == null) ? null 
                       : SafeEncoder.encode(bpattern); 
               final String strchannel = (bchannel == null) ? null 
                       : SafeEncoder.encode(bchannel); 
               final String strmesg = (bmesg == null) ? null : SafeEncoder 
                       .encode(bmesg); 
               onPMessage(strpattern, strchannel, strmesg); 
           } else if (Arrays.equals(PSUBSCRIBE.raw, resp)) { 
               subscribedChannels = ((Long) reply.get(2)).intValue(); 
               final byte[] bpattern = (byte[]) reply.get(1); 
               final String strpattern = (bpattern == null) ? null 
                       : SafeEncoder.encode(bpattern); 
               onPSubscribe(strpattern, subscribedChannels); 
           } else if (Arrays.equals(PUNSUBSCRIBE.raw, resp)) { 
               subscribedChannels = ((Long) reply.get(2)).intValue(); 
               final byte[] bpattern = (byte[]) reply.get(1); 
               final String strpattern = (bpattern == null) ? null 
                       : SafeEncoder.encode(bpattern); 
               onPUnsubscribe(strpattern, subscribedChannels); 
           } else { 
               throw new JedisException("Unknown message type: " + firstObj); 
           }
jedis的publish/subscribe[转]含有redis源码解析

可以看到,通过client.getObjectMultiBulkReply()来得到返回来的消息。判断消息的类型来进行不同的操作。比如Arrays.equals(SUBSCRIBE.raw, resp)判断返回来的消息是订阅,subscribedChannels = ((Long) reply.get(2)).intValue();是取得消息,也是do...while判断循环的条件,也就是说这一次如果读到消息了,则进行下一次循环。那么onSubscribe(String channel, int subscribedChannels)究竟做了什么事,看开头

jedis的publish/subscribe[转]含有redis源码解析
public abstract void onMessage(String channel, String message); 
 
   public abstract void onPMessage(String pattern, String channel, 
           String message); 
 
   public abstract void onSubscribe(String channel, int subscribedChannels); 
 
   public abstract void onUnsubscribe(String channel, int subscribedChannels); 
 
   public abstract void onPUnsubscribe(String pattern, int subscribedChannels); 
 
   public abstract void onPSubscribe(String pattern, int subscribedChannels);
jedis的publish/subscribe[转]含有redis源码解析

可以看到这是xetorthio留给我们的方法。onSubscrible是订阅时应该做些什么,onMessage就是有消息传来是做些什么,以此类推。

接下来可以写一个方法来发布和订阅消息了。

jedis的publish/subscribe[转]含有redis源码解析
package redis.client.jredis.tests; 
 
import java.util.Timer; 
import java.util.TimerTask; 
 
import org.junit.Test; 
 
import redis.clients.jedis.Jedis; 
import redis.clients.jedis.JedisPool; 
import redis.clients.jedis.JedisPoolConfig; 
import redis.clients.jedis.JedisPubSub; 
 
public class JedisTest extends JedisTestBase { 
    JedisPool pool = null; 
    /** 
     * 测试发布验证 
     */ 
    @Test 
    public void testPS(){ 
        /** 
        Jedis jedis = new Jedis("192.168.5.146",6379); 
        jedis.set("name", "xiaoruoen"); 
        jedis.publish("news.blog.title", "Hello,World"); 
        //*/ 
        final String host = "192.168.5.146"; 
        JedisPoolConfig config = new JedisPoolConfig(); 
        pool = new JedisPool(new JedisPoolConfig(),host); 
        subscribe(new NewsListener(), "news.sports"); 
        Timer timer = new Timer(); 
        timer.schedule(new TimerTask() { 
             
            @Override 
            public void run() { 
                // TODO Auto-generated method stub 
                publish("news.sports", "{\"_id\":335566,\"author\":\"xiaoruoen\",\"title\":\"kaka is back\"}"); 
            } 
        }, 1000, 3000); 
         
    } 
     
    public Jedis getResource(int dbnum){ 
        Jedis jedis = pool.getResource(); 
        jedis.select(dbnum); 
        return jedis; 
    } 
     
    /** 
     *  
     * @param channel 
     * @param message ex:"{\"_id\":335566,\"author\":\"xiaoruoen\",\"title\":\"kaka is back\"}" 
     */ 
    public void publish(String channel,String message){ 
        Jedis jedis = getResource(12); 
        jedis.publish(channel, message); 
        pool.returnResource(jedis); 
    } 
     
    public void subscribe(JedisPubSub listener,String channel){ 
        Jedis jedis = getResource(12); 
        jedis.subscribe(listener, channel); 
        pool.returnResource(jedis); 
    } 
 
}
jedis的publish/subscribe[转]含有redis源码解析
jedis的publish/subscribe[转]含有redis源码解析
package redis.client.jredis.tests; 
 
import redis.clients.jedis.JedisPubSub; 
 
public class NewsListener extends JedisPubSub { 
 
    @Override 
    public void onMessage(String channel, String message) { 
        System.out.println("get message from"+channel+"   "+message); 
    } 
 
    @Override 
    public void onPMessage(String pattern, String channel, String message) { 
        System.out.println("get message from"+channel+"   "+message); 
    } 
 
    @Override 
    public void onSubscribe(String channel, int subscribedChannels) { 
        System.out.println("subscribe the channel:"+channel); 
    } 
 
    @Override 
    public void onUnsubscribe(String channel, int subscribedChannels) { 
        System.out.println("get message from"+channel); 
    } 
 
    @Override 
    public void onPUnsubscribe(String pattern, int subscribedChannels) { 
        System.out.println("get message from"+subscribedChannels); 
    } 
 
    @Override 
    public void onPSubscribe(String pattern, int subscribedChannels) { 
        System.out.println("get message from"+subscribedChannels); 
    } 
 
}
jedis的publish/subscribe[转]含有redis源码解析

发现只打印了一条数据subscribe the channel:news.sports

没按我们所期望的那样那所有发布的消息都打印出来。

到官网查看

https://github.com/xetorthio/jedis/wiki/AdvancedUsage

看到Note that subscribe is a blocking operation operation because it will poll Redis for responses on the thread that calls subscribe.可以看到subcribe是一个线程中的块操作。我猜测是在发布与接收的过程中,如果在同一线程里面进行操作,一边阻塞着流,另一边无法进行操作。于是将publish改写为另一线程启动。修改如下:

jedis的publish/subscribe[转]含有redis源码解析
public static void main(String[] args){ 
        final String host = "192.168.5.146"; 
        JedisPoolConfig config = new JedisPoolConfig(); 
        pool = new JedisPool(new JedisPoolConfig(),host); 
        Thread thread = new Thread(new Test().new PublishThread()); 
        thread.start(); 
        subscribe(new NewsListener(), "news.sports"); 
       
}
jedis的publish/subscribe[转]含有redis源码解析
jedis的publish/subscribe[转]含有redis源码解析
class PublishThread implements Runnable{ 
        @Override 
        public void run() { 
            Timer timer = new Timer(); 
            timer.schedule(new TimerTask() { 
                 
                @Override 
                public void run() { 
                    // TODO Auto-generated method stub 
                    publish("news.sports", "{\"_id\":335566,\"author\":\"xiaoruoen\",\"title\":\"kaka is back\"}"); 
                } 
            }, 1000, 3000); 
        }  
         
    }
jedis的publish/subscribe[转]含有redis源码解析

最终发布订阅成功。

jedis的publish/subscribe[转]含有redis源码解析
Pool:redis.clients.jedis.JedisPool@18e2b22 
subscribe the channel:news.sports 
Pool:redis.clients.jedis.JedisPool@18e2b22 
get message fromnews.sports   {"_id":335566,"author":"xiaoruoen","title":"kaka is back"} 
Pool:redis.clients.jedis.JedisPool@18e2b22 
get message fromnews.sports   {"_id":335566,"author":"xiaoruoen","title":"kaka is back"} 
Pool:redis.clients.jedis.JedisPool@18e2b22 
get message fromnews.sports   {"_id":335566,"author":"xiaoruoen","title":"kaka is back"} 
Pool:redis.clients.jedis.JedisPool@18e2b22 
get message fromnews.sports   {"_id":335566,"author":"xiaoruoen","title":"kaka is back"} 
Pool:redis.clients.jedis.JedisPool@18e2b22
jedis的publish/subscribe[转]含有redis源码解析

本文出自 “若是人间” 博客,请务必保留此出处http://xiaoruoen.blog.51cto.com/4828946/835710


©著作权归作者所有:来自ZhiKuGroup博客作者没文化的原创作品,如需转载,请注明出处,否则将追究法律责任 来源:ZhiKuGroup博客,欢迎分享。

评论专区
  • 昵 称必填
  • 邮 箱选填
  • 网 址选填
◎已有 0 人评论
搜索
作者介绍
30天热门
×
×
本站会员尊享VIP特权,现在就加入我们吧!登录注册×
»
会员登录
新用户注册
×
会员注册
已有账号登录
×