Skip to content

Instantly share code, notes, and snippets.

@anson0370
Created May 23, 2012 06:04
Show Gist options
  • Save anson0370/2773496 to your computer and use it in GitHub Desktop.
Save anson0370/2773496 to your computer and use it in GitHub Desktop.
通过SDK来使用数据订阅推送服务 For Java

通过SDK来使用长连接主动推送服务 For Java

概述

现在三方集成的长连接主动推送订阅使用经过简单改造的TOP长连接推送SDK,在使用上有一些和TOP长连接推送区别的地方需要特殊注意,在下面的示例代码中都有体现

  • 使用特殊的Configuration子类来初始化长连接: JushitaConfiguration,该配置类有两点需要注意
    1. 消费线程池被设置为最大1个线程,强制的完全顺序的消费数据
    2. 带有一个abstract的方法public abstract List<String> getTopicInfo();需要实现,通过该方法返回需要订阅的topic信息。该方法在每次连接发起前都会被调用以重新计算topic信息,请在这个方法里携带最新的带offset的topic信息
  • 为保证不丢任何消息,需要做两件事情
    1. onReceiveMsg的回调中获取到消息携带的offset信息并持久化下来
    2. JushitaConfiguration.getTopicInfo的回调中取出之前持久化的offset信息返回,确保SDK重连会携带最后收到的offset信息

关于避免丢消息以及消息携带的offset属性

因为网络状况、服务端或客户端断电以及各种异步因素的影响,可能出现服务端认为消息发送成功但是客户端并没有确实处理消息的情况,这就是丢消息。

因为HTTP长连接推送协议天生无法做出多应答,为了避免丢消息,三方集成的服务端会给每一个消息携带一个offset属性,这是这条消息在服务端磁盘上的一个顺序指针。 通过每次把对应topic下收到消息携带的offset保存下来,再下一次连接的时候带上有offset的topic信息,可以保证指针永远是连续的,从而就可以保证消息不会丢。 在SDK的表现上,就是由JushitaConfiguration实现一个获取topic信息的回调,ISV在那里实现将最后一条收到的消息所携带的offset作为topic返回的代码。 而在onReceiveMsg的回调中就需要将offset按照topic为key持久化袭来,只要保留最后收到的消息的offset就可以了。

现在也允许不传递携带offset的topic信息,但是这样就无法保证不丢消息。

关于消息接收和处理的建议

考虑到吞吐量和性能,建议收到消息后只简单将消息持久化下来,并更新消息携带的offset信息。这两步成功就是成功的收取了消息,然后再由其余的线程去处理已经持久化下来的消息。 这样不会因为处理消息错误而导致收消息中断,也不会因为处理时间过久而导致消息积压。

示例代码

    String appkey = "483936"; // app_key
    String secret = "d41ee217895de8b64ec44653edce9263"; // app_key对应的secret
    String connectId = "ckckckck1"; // 长连接ID 同样的长连接ID新的会踢掉旧的
    String connectUrl = "http://eai.taobao.com/message/sub"; // 长连接推送订阅的url
    JushitaConfiguration conf = new JushitaConfiguration(appkey, secret, connectId) {

        /**
         * 特别注意的方法 此方法用来在每次连接之前获取要订阅的topic列表
         * 会在每次SDK连接(包括自动重连,比如到达生命周期超时时间,网络暂时中断等产生的重连)之前调用
         * 在这个回调方法中通过示例代码这样的方法把保存的带偏移量的topic信息置入
         * 如果没有保存过的信息,也就是初次连接,就直接填topic就好了
         */
        @Override
        public List<String> getTopicInfo() {
            // 可能的实现: 注意,这需要ISV自己实现
            // // 先初始化要订阅的topic列表
            // List<String> topicToSubList = new ArrayList<String>();
            // topicToSubList.add("test-topic-1");
            // topicToSubList.add("test-topic-2");
            // topicToSubList.add("test-topic-3");
            // // 这是结果
            // List<String> topics = new ArrayList<String>();
            // for (String topicToSub : topicToSubList) {
            //     // 去持久化的地方查询,没查到就是没收到过,则使用不带offset的信息订阅
                   // 否则使用带offset的信息订阅
            //     TopicInfo topicInfo = topicInfoDAO.selectTopicInfo(topicToSub);
            //     if (topicInfo == null) {
            //         topics.add(topicToSub);
            //     } else {
            //         topics.add(topicInfo.getTopicWithOffset());
            //     }
            // }
            // return topics;
            List<String> topics = new ArrayList<String>();
            topics.add("guichen-topic-4");
            return topics;
        }
    };
    conf.setConnectUrl(connectUrl); // 在连沙箱环境的时候才需要这一步
    TopCometStream stream = new TopCometStreamFactory(conf).getInstance();
    stream.setMessageListener(new TopCometMessageListener() {
            .... // 别的接口方法视需要实现

            /**
             * 特别注意的方法 会在每次收到业务消息的时候调用
             * 在这个回调方法中示例代码获得了消息中携带的带有偏移量的topic信息
             * 整个消息体携带的信息有:
             *     message: 业务消息本身
             *     offset: 带有偏移量的topic信息,指示此条消息的下一条所在的位置
             *     pubAppKey: 发布此条消息的ISV的appKey
             *     topic: 消息所属topic
             *     userId: 消息数据所有者的userId
             *     version: 消息格式版本
             *
             * 其中最需要关注的是offset和message
             */
            @Override
            public void onReceiveMsg(String message) {
                Map<?, ?> jsonMap = TaobaoUtils.parseJson(message);
                String topic = (String) jsonMap.get("topic");
                String topicWithOffset = (String) jsonMap.get("offset"); // 获得带有偏移量的topic信息
                // 使用者如果要保证不丢任何消息,就需要通过某种持久化方式将topicWithOffset这个值保存起来
                // 一个可能的实现是使用一张带有两个字段的DB表,字段分别是topic和topicWithOffset,每次获得topicWithOffset信息后update对应的topic的记录
                // 可能的实现: 注意,这需要ISV自己实现
                // topicInfoDAO.updateTopicWithOffset(topic, topicWithOffset);
                System.out.println("receive count:" + receiveCount);
            }

            .... // 别的接口方法视需要实现
        });
    stream.start(); // 调用start后会在stream内部启动一个独立的线程,如果是单元测试的方式来跑,可能需要挂起主线程,以免主线程结束后stream的线程自动结束

通过SDK来推送业务数据给订阅服务 For Java

概述

通过使用API互通的SDK就可以推送数据到三方集成,有一些不同点需要注意,下面的示例代码中都有体现

  • client.post()方法中的第一个参数apiName对于推送数据到三方集成是没有意义的,可以随便填写
  • 必须在client.post()方法中的第三个参数params中携带的参数包括:
    • user_id 业务消息的userId
    • topic 业务消息所属topic
    • message 业务消息本身

示例代码

    // 实例化一个JushitaTaobaoClient准备进行API调用,需要注意的是url要传入推送数据的url,其余两个参数是appKey和secret
    // 此client无状态,因此可以复用
    JushitaTaobaoClient client = new JushitaTaobaoClient("http://eai.taobao.com/message/pub", "483936", "d41ee217895de8b64ec44653edce9263");
    // 构造业务参数,对于推送数据给三方集成来说有且仅有下面三个参数需要设置
    Map<String, String> params = new HashMap<String, String>();
    params.put("user_id", "2010770429");
    params.put("topic", "guichen-topic-4");
    params.put("message", message);
    try {
        // 进行POST调用,如果没有报错 就是放数据成功了
        // 参数分别为appName(在这里不起作用,可以随意写), 调用参数, session
        client.execute("put_data", params, "2af1f79f85a4328110f5dd0cfa3ece064900df10f1402b666270bd3d7b3655d4");
    } catch (ApiException e) {
        // 如果catch到ApiException,就是放数据出错了,建议把错误信息打印出来,然后做相应处理
        e.printStackTrace();
        Assert.assertTrue(false);
    }
@anson0370
Copy link
Author

带group的订阅实例

        String appkey = "487526";
        String secret = "c7001ff65ea2feb229b52eec7c585f2d";
        String connectId = "ckckck123";
        String connectUrl = "http://10.232.19.202:8080/message/sub";
        JushitaConfiguration conf = new JushitaConfiguration(appkey, secret, connectId) {
            @Override
            public List<String> getTopicInfo() {
                List<String> topics = new ArrayList<String>();
                topics.add("cf-topic-1");
                return topics;
            }
        };
        conf.setConnectUrl(connectUrl);
        // 区别在这里 暂时需要通过这种方式设置group参数
        for (TopCometStreamRequest cometReq : conf.getConnectReqParam()) {
            if (cometReq.getOtherParam() == null) {
                cometReq.setOtherParam(new HashMap<String, String>());
            }
            cometReq.getOtherParam().put("group", "testGroup");
        }
        TopCometStream stream = new TopCometStreamFactory(conf).getInstance();
        // 消息接收实现类 具体请看上面文档中的解释
        stream.setMessageListener(new DefaultTopCometMessageListener());
        stream.start();

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment