Last active
March 7, 2019 05:35
-
-
Save designershao/d0fa152a27c28438a59025f62c47f322 to your computer and use it in GitHub Desktop.
OpenTSDB Limit Query
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.google.common.base.Strings; | |
import com.stumbleupon.async.Callback; | |
import com.stumbleupon.async.Deferred; | |
import net.opentsdb.core.*; | |
import net.opentsdb.utils.Config; | |
import java.io.IOException; | |
import java.util.ArrayList; | |
import java.util.Iterator; | |
public class TSDQueryMain { | |
private static Config config; | |
private static TSDB tsdb; | |
/** | |
* 查询 | |
* @param metric 测点名称 | |
* @param start 开始时间 | |
* @param cursor 游标:上一次请求的最后一条数据的时间戳 | |
* @param callback 回调 | |
*/ | |
public static void Query(String metric, String start, final String cursor, final Callback<Object, Iterator<DataPoint>> callback) { | |
final TSQuery query = new TSQuery(); | |
query.setStart(start); | |
final TSSubQuery subQuery = new TSSubQuery(); | |
subQuery.setAggregator("sum"); // DEMO 暂时写死 | |
subQuery.setMetric(metric); // DEMO 暂时写死 | |
subQuery.setDownsample("4m-avg"); // DEMO 暂时写死 | |
final ArrayList<TSSubQuery> subQueries = new ArrayList<TSSubQuery>(1); | |
subQueries.add(subQuery); | |
query.setQueries(subQueries); | |
query.validateAndSetQuery(); | |
Query[] tsdbqueries = query.buildQueries(tsdb); | |
final ArrayList<DataPoints[]> results = new ArrayList<DataPoints[]>(tsdbqueries.length); | |
final ArrayList<Deferred<DataPoints[]>> deferreds = new ArrayList<Deferred<DataPoints[]>>(tsdbqueries.length); | |
for (int i = 0;i<tsdbqueries.length;i++) { | |
deferreds.add(tsdbqueries[i].runAsync()); | |
} | |
class QueriesCB implements Callback<Object, ArrayList<DataPoints[]>> { | |
public Object call(ArrayList<DataPoints[]> dataPoints) throws Exception { | |
results.addAll(dataPoints); | |
for (final DataPoints[] dataSets : results) { | |
for (final DataPoints data : dataSets) { | |
final SeekableView it = data.iterator(); | |
if (!Strings.isNullOrEmpty(cursor)) { | |
it.seek(Long.parseLong(cursor)); // 根据游标进行移动 | |
} | |
callback.call(it); | |
} | |
} | |
return null; | |
} | |
} | |
try { | |
Deferred.groupInOrder(deferreds) | |
.addCallback(new QueriesCB()) | |
.join(); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
/** | |
* 查询 | |
* @param metric 测点名称 | |
* @param start 开始时间 | |
* @param cursor 游标:上一次请求的最后一条数据的时间戳 | |
* @param limit 限制返回条数 | |
* @param callback 回调 | |
*/ | |
public static void Query(String metric, String start, String cursor, final int limit, final Callback<Object, ArrayList<DataPoint>> callback) { | |
final ArrayList<DataPoint> results = new ArrayList<DataPoint>(); | |
class Cb implements Callback<Object, Iterator<DataPoint>> { | |
public Object call(Iterator<DataPoint> dataPointIterator) throws Exception { | |
int i = 1; | |
while (dataPointIterator.hasNext() && i <= limit) { | |
final DataPoint dp = dataPointIterator.next(); | |
results.add(dp); | |
i++; | |
} | |
callback.call(results); | |
return null; | |
} | |
} | |
Query(metric, start,cursor,new Cb()); | |
} | |
public static void main(String[] args) throws IOException { | |
config = new Config("your opentsdb.conf path"); | |
tsdb = new TSDB(config); | |
class Cb implements Callback<Object, ArrayList<DataPoint>> { | |
public Object call(ArrayList<DataPoint> dataPoints) { | |
for(int i = 0;i< dataPoints.size(); i ++) { | |
DataPoint dp = dataPoints.get(i); | |
System.out.println(" " + dp.timestamp() + " " | |
+ (dp.isInteger() ? dp.longValue() : dp.doubleValue())); | |
} | |
return null; | |
} | |
} | |
Query("proc.loadavg.total_threads", "1day-ago","1551823920000", 10,new Cb()); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment