Skip to content

Instantly share code, notes, and snippets.

@zhanglistar
Created September 30, 2021 10:23
Show Gist options
  • Save zhanglistar/31ac8e2e1539f13386d7fa20daf6f96b to your computer and use it in GitHub Desktop.
Save zhanglistar/31ac8e2e1539f13386d7fa20daf6f96b to your computer and use it in GitHub Desktop.
read data from alluxio using restful api
#pragma once
#include <Common/config.h>
#include <string>
#include <memory>
#include <Poco/File.h>
#include <Poco/URI.h>
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Parser.h>
#include <Poco/Net/HTTPClientSession.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <fmt/core.h>
#include <IO/ReadBuffer.h>
#include <IO/BufferWithOwnMemory.h>
namespace DB
{
class AlluxioClient
{
public:
using HTTPRequest = Poco::Net::HTTPRequest;
using HTTPClientSession = Poco::Net::HTTPClientSession;
using HTTPResponse = Poco::Net::HTTPResponse;
using HTTPMessage = Poco::Net::HTTPMessage;
AlluxioClient(const std::string & host_, int port_)
: host(host_)
, port(port_)
, session(host, port)
{
session.setKeepAlive(true);
}
~AlluxioClient() = default;
inline int openFile(const std::string & file)
{
std::string path = fmt::format(API_OPEN_FILE, file);
std::string body = R"({"readType": "CACHE"})";
HTTPRequest request(HTTPRequest::HTTP_POST, path, HTTPMessage::HTTP_1_1);
request.setContentType("application/json");
request.setContentLength(body.size());
session.sendRequest(request) << body;
HTTPResponse response;
std::istream & s = session.receiveResponse(response);
std::ostringstream oss;
oss << s.rdbuf();
std::string res = oss.str();
// std::cout << "res:" << res << std::endl;
return std::stoi(res);
}
inline void close(int id)
{
// std::cout << "id:" << id << std::endl;
std::string path = fmt::format(API_CLOSE_STREAM, id);
HTTPRequest request(HTTPRequest::HTTP_POST, path, HTTPMessage::HTTP_1_1);
session.sendRequest(request);
}
inline std::istream * read(int id)
{
// std::cout << "id:" << id << std::endl;
std::string path = fmt::format(API_READ_STREAM, id);
HTTPRequest request(HTTPRequest::HTTP_POST, path, HTTPMessage::HTTP_1_1);
session.sendRequest(request);
HTTPResponse response;
std::istream & res = session.receiveResponse(response);
return &res;
}
private :
std::string host;
uint16_t port;
HTTPClientSession session;
inline static const std::string API_OPEN_FILE{"/api/v1/paths/{}/open-file"};
inline static const std::string API_READ_STREAM{"/api/v1/streams/{}/read"};
inline static const std::string API_CLOSE_STREAM{"/api/v1/streams/{}/close"};
};
/** Accepts Alluxio path to file and opens it.
* Closes file by himself (thus "owns" a file descriptor).
*/
class ReadBufferFromAlluxio : public BufferWithOwnMemory<ReadBuffer>
{
public:
ReadBufferFromAlluxio(const String & host_, int port_, const String & file_, size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE);
ReadBufferFromAlluxio(ReadBufferFromAlluxio &&) = default;
~ReadBufferFromAlluxio() override;
bool nextImpl() override;
private:
std::shared_ptr<AlluxioClient> client;
String file;
int id;
std::istream * is;
};
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment