Skip to content

Instantly share code, notes, and snippets.

@marceloinacio
Last active February 9, 2017 13:06
Show Gist options
  • Save marceloinacio/ad3f70852eda4017652b74a789964092 to your computer and use it in GitHub Desktop.
Save marceloinacio/ad3f70852eda4017652b74a789964092 to your computer and use it in GitHub Desktop.
PubNub Fetch Messages
package com.we.pubnubtest.utils;
import com.pubnub.api.models.consumer.history.PNFetchMessagesResult;
/**
 * Receiver for paged history results.
 *
 * <p>Implementations are handed each fetched {@link PNFetchMessagesResult} through
 * {@link #handleResponse(PNFetchMessagesResult)} and are told via {@link #finish()}
 * that no further results will be delivered.
 *
 * Created by tukunare on 2/8/2017.
 */
public abstract class CallbackSkeleton {

    // No explicit constructor: the implicit default constructor of a public class
    // is itself public and takes no arguments, exactly like the one it replaces.

    /**
     * Consumes one fetched result.
     *
     * @param result the fetch result to process
     */
    public abstract void handleResponse(PNFetchMessagesResult result);

    /**
     * Signals that fetching has ended and no more results will be passed to
     * {@link #handleResponse(PNFetchMessagesResult)}.
     */
    public abstract void finish();
}
package com.we.pubnubtest.utils;
import com.pubnub.api.PubNub;
import com.pubnub.api.callbacks.PNCallback;
import com.pubnub.api.models.consumer.PNStatus;
import com.pubnub.api.models.consumer.history.PNFetchMessagesResult;
import com.pubnub.api.models.consumer.pubsub.PNMessageResult;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
 * Walks backwards through channel history by issuing one {@code fetchMessages}
 * request per page and re-issuing it with the oldest timetoken seen so far until
 * a request fails, returns nothing, or stops making progress.
 *
 * <p>Each page is passed to {@link CallbackSkeleton#handleResponse}; when paging
 * ends {@link CallbackSkeleton#finish()} is called exactly once.
 *
 * Created by tukunare on 2/8/2017.
 */
public class PubnubRecursiveHistoryFetcher {

    /** Start value sent when the caller did not supply a starting timetoken. */
    private static final long NO_START = -1L;

    /** Maximum number of messages requested per channel in a single page. */
    private static final int PAGE_SIZE = 25;

    private final PubNub pubNub;

    /** Comma-separated channel names, exactly as given to the constructor. */
    private final String channels;

    /**
     * @param pubNub   client used to issue the fetch requests
     * @param channels one channel name, or several separated by commas
     */
    public PubnubRecursiveHistoryFetcher(PubNub pubNub, String channels) {
        this.pubNub = pubNub;
        this.channels = channels;
    }

    /**
     * Fetches history without an explicit starting timetoken.
     *
     * @param callback receives every page and the final completion signal
     */
    public void getAllMessages(CallbackSkeleton callback) {
        getAllMessages(null, callback);
    }

    /**
     * Fetches one page starting at {@code startTimestamp} and, from the response
     * callback, keeps requesting further pages using the oldest timetoken received.
     *
     * @param startTimestamp timetoken to start from, or {@code null} for none
     *                       (sent to the service as {@code -1})
     * @param callback       receives every page and the final completion signal
     */
    public void getAllMessages(Long startTimestamp, final CallbackSkeleton callback) {
        final Long startToken = (startTimestamp == null) ? Long.valueOf(NO_START) : startTimestamp;

        pubNub.fetchMessages()
                .channels(channelList()) // where to fetch history from
                .start(startToken) // first timestamp
                .maximumPerChannel(PAGE_SIZE)
                .async(new PNCallback<PNFetchMessagesResult>() {
                    @Override
                    public void onResponse(PNFetchMessagesResult result, PNStatus status) {
                        // A failed request or an empty page both end the walk.
                        if (status.isError()
                                || result == null
                                || result.getChannels() == null
                                || result.getChannels().size() == 0) {
                            callback.finish();
                            return;
                        }

                        callback.handleResponse(result);

                        Long oldest = oldestTimetoken(result);
                        // Only recurse when the next request will differ from this one.
                        // Comparing against the -1 sentinel would never report progress,
                        // so an unset start accepts any timetoken found in the page.
                        boolean advanced = oldest != null
                                && (startToken == NO_START || oldest < startToken);
                        if (advanced) {
                            // NOTE(review): the smallest timetoken across ALL channels is
                            // used as the next start for every channel; if start is an
                            // exclusive "older than" bound, channels whose page ended at a
                            // newer timetoken may skip messages - confirm against the
                            // PubNub history semantics.
                            getAllMessages(oldest, callback);
                        } else {
                            callback.finish();
                        }
                    }
                });
    }

    /** Splits the comma-separated channel string into individual channel names. */
    private List<String> channelList() {
        return Arrays.asList(channels.trim().split("\\s*,\\s*"));
    }

    /** Returns the smallest timetoken in the page, or {@code null} if it holds none. */
    private static Long oldestTimetoken(PNFetchMessagesResult result) {
        Long oldest = null;
        for (List<PNMessageResult> items : result.getChannels().values()) {
            if (items == null) {
                continue;
            }
            for (PNMessageResult item : items) {
                Long token = item.getTimetoken();
                if (token != null && (oldest == null || token < oldest)) {
                    oldest = token;
                }
            }
        }
        return oldest;
    }
}
package com.we.pubnubtest.utils;
import com.pubnub.api.PNConfiguration;
import com.pubnub.api.PubNub;
import com.pubnub.api.callbacks.PNCallback;
import com.pubnub.api.enums.PNLogVerbosity;
import com.pubnub.api.models.consumer.PNPublishResult;
import com.pubnub.api.models.consumer.PNStatus;
import com.pubnub.api.models.consumer.history.PNFetchMessagesResult;
import com.pubnub.api.models.consumer.pubsub.PNMessageResult;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
 * Console driver for {@link PubnubRecursiveHistoryFetcher}: subscribes to the test
 * channels, publishes one message, then fetches history from a fixed timetoken and
 * prints how many messages were collected.
 *
 * Created by tukunare on 2/8/2017.
 */
public class TestFetchHistory {
    static public PubNub pubNub;

    /** Comma-separated names of the channels used by this test. */
    static String channels = "ch1,ch2,ch3";

    /** Every message delivered to the history callback so far. */
    static List<PNMessageResult> messages = new ArrayList<PNMessageResult>();

    /** Timetoken the history fetch starts from. */
    private static final long START_TIMETOKEN = 14866448636182451L;

    /**
     * Runs the manual test. The two ENTER prompts give the asynchronous publish and
     * fetch calls time to complete before the program moves on.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));

        PNConfiguration config = new PNConfiguration()
                .setPublishKey("your pub key")
                .setSubscribeKey("your sub key")
                .setLogVerbosity(PNLogVerbosity.BODY);
        pubNub = new PubNub(config);

        PubnubRecursiveHistoryFetcher pubnubRecursiveHistoryFetcher =
                new PubnubRecursiveHistoryFetcher(pubNub, channels);

        // Subscribe to each channel individually rather than to one channel whose
        // name is the whole comma-separated string.
        pubNub.subscribe().channels(Arrays.asList(channels.split(","))).withPresence().execute();

        pubNub.publish().channel("ch1").message("Test").async(new PNCallback<PNPublishResult>() {
            @Override
            public void onResponse(PNPublishResult pnPublishResult, PNStatus pnStatus) {
                System.out.println(pnStatus.getOperation().toString());
            }
        });

        waitForEnter(br);

        pubnubRecursiveHistoryFetcher.getAllMessages(START_TIMETOKEN, new CallbackSkeleton() {
            @Override
            public void handleResponse(PNFetchMessagesResult result) {
                for (List<PNMessageResult> items : result.getChannels().values()) {
                    messages.addAll(items);
                }
            }

            @Override
            public void finish() {
                System.out.println("Total: " + messages.size());
            }
        });

        waitForEnter(br);
    }

    /** Prompts on stdout and blocks until a line is read from {@code reader}. */
    private static void waitForEnter(BufferedReader reader) {
        try {
            System.out.print("Press ENTER");
            reader.readLine();
        } catch (java.io.IOException e) {
            // Console input is only used as a pause; report the failure and carry on.
            System.err.println("Could not read from standard input: " + e);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment