Last active
September 19, 2015 09:19
-
-
Save fushihara/814f14e8d3bb06a68173 to your computer and use it in GitHub Desktop.
JavaからTwitterのデータをgoogle BigQueryにぶち込む方法 のソースコード ブログ記事→ https://fushihara.org/blog/archives/2980
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.File; | |
import java.io.IOException; | |
import java.io.PrintStream; | |
import java.security.GeneralSecurityException; | |
import java.util.ArrayList; | |
import java.util.Date; | |
import java.util.List; | |
import java.util.ResourceBundle; | |
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential; | |
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport; | |
import com.google.api.client.http.HttpTransport; | |
import com.google.api.client.json.JsonFactory; | |
import com.google.api.client.json.jackson2.JacksonFactory; | |
import com.google.api.client.util.Data; | |
import com.google.api.client.util.DateTime; | |
import com.google.api.services.bigquery.Bigquery; | |
import com.google.api.services.bigquery.BigqueryScopes; | |
import com.google.api.services.bigquery.model.DatasetList; | |
import com.google.api.services.bigquery.model.DatasetList.Datasets; | |
import com.google.api.services.bigquery.model.ErrorProto; | |
import com.google.api.services.bigquery.model.GetQueryResultsResponse; | |
import com.google.api.services.bigquery.model.ProjectList; | |
import com.google.api.services.bigquery.model.ProjectList.Projects; | |
import com.google.api.services.bigquery.model.QueryRequest; | |
import com.google.api.services.bigquery.model.QueryResponse; | |
import com.google.api.services.bigquery.model.Table; | |
import com.google.api.services.bigquery.model.TableCell; | |
import com.google.api.services.bigquery.model.TableDataInsertAllRequest; | |
import com.google.api.services.bigquery.model.TableDataInsertAllResponse; | |
import com.google.api.services.bigquery.model.TableDataInsertAllResponse.InsertErrors; | |
import com.google.api.services.bigquery.model.TableFieldSchema; | |
import com.google.api.services.bigquery.model.TableList; | |
import com.google.api.services.bigquery.model.TableList.Tables; | |
import com.google.api.services.bigquery.model.TableReference; | |
import com.google.api.services.bigquery.model.TableRow; | |
import com.google.api.services.bigquery.model.TableSchema; | |
import com.google.common.collect.ImmutableList; | |
// 2015-09-19 18:06 | |
public class BigQuery { | |
private final HttpTransport httpTransport; | |
private final JsonFactory jsonFactory = new JacksonFactory(); | |
private GoogleCredential credentials; | |
private Bigquery client; | |
private String mailAddress; | |
private String applicationName; | |
private String p12EncryptFilePath; | |
private String projectId; | |
private String dataset; | |
private String table; | |
private String tableRawJson; | |
private static BigQuery instance; | |
public static BigQuery getInstance() { | |
return BigQuery.instance; | |
} | |
public BigQuery() throws GeneralSecurityException, IOException { | |
this.httpTransport = GoogleNetHttpTransport.newTrustedTransport(); | |
BigQuery.instance = this; | |
} | |
public void initialize() throws GeneralSecurityException, IOException { | |
ResourceBundle bundle = ResourceBundle.getBundle("setting"); | |
this.mailAddress = bundle.getString("bq.mail"); | |
this.applicationName = bundle.getString("bq.app"); | |
this.p12EncryptFilePath = bundle.getString("bq.p12Path"); | |
this.projectId = bundle.getString("bq.projectid"); | |
this.dataset = bundle.getString("bq.dataset"); | |
this.table = bundle.getString("bq.table"); | |
this.tableRawJson = bundle.getString("bq.tableRawJson"); | |
this.credentials = new GoogleCredential.Builder().setTransport(httpTransport).setJsonFactory(jsonFactory) | |
.setServiceAccountId(this.mailAddress) | |
.setServiceAccountScopes(ImmutableList.of(BigqueryScopes.BIGQUERY, BigqueryScopes.BIGQUERY_INSERTDATA)) | |
.setServiceAccountPrivateKeyFromP12File(new File(this.p12EncryptFilePath)).build(); | |
this.client = new Bigquery.Builder(httpTransport, jsonFactory, credentials).setApplicationName(applicationName) | |
.build(); | |
long maxResults = 1; | |
ProjectList projectList = client.projects().list().setMaxResults(maxResults).execute(); | |
System.out.println(projectList.toPrettyString()); | |
String query = "SELECT * FROM [twitter.tweet] where [twitter.tweet.message] like '%TGS%'"; | |
// runQueryRpcAndPrint(this.client, "this.projectId", query,System.out); | |
// listProjects(this.client); | |
// listDataset(this.client); | |
// listTables(this.client, "twitter"); | |
// insertTable(this.client, this.projectId, this.dataset, this.table, | |
// getTwitterLogSchema()); | |
// insertTable(this.client, this.projectId, this.dataset, | |
// this.tableRawJson, getTwitterLogRawJsonSchema()); | |
} | |
public void sendRawJson(String json) throws IOException { | |
TableRow row = new TableRow(); | |
row.set("date", new DateTime(new Date()));// dateTimeにしないとダメ。"JSON map | |
// specified for non-record | |
// field"と言われる | |
row.set("rawJson", json); | |
TableDataInsertAllRequest.Rows rows = new TableDataInsertAllRequest.Rows(); | |
rows.setInsertId(Long.toString(System.nanoTime())); | |
rows.setJson(row); | |
List<TableDataInsertAllRequest.Rows> rowList = new ArrayList<TableDataInsertAllRequest.Rows>(); | |
rowList.add(rows); | |
TableDataInsertAllRequest content = new TableDataInsertAllRequest().setRows(rowList); | |
TableDataInsertAllResponse result = this.client.tabledata() | |
.insertAll(this.projectId, this.dataset, this.tableRawJson, content).execute(); | |
tableDataInsertResult(result); | |
} | |
public void sendTweetData(twitter4j.Status status) throws IOException { | |
TableRow row = new TableRow(); | |
row.set("tweet_id", status.getId()); | |
row.set("tweet_message", status.getText()); | |
row.set("client", status.getSource()); | |
row.set("created_at", new DateTime(status.getCreatedAt())); | |
row.set("user_id", status.getUser().getId()); | |
row.set("user_name", status.getUser().getName()); | |
row.set("user_screen_name", status.getUser().getScreenName()); | |
TableDataInsertAllRequest.Rows rows = new TableDataInsertAllRequest.Rows(); | |
rows.setInsertId(Long.toString(System.nanoTime())); | |
rows.setJson(row); | |
List<TableDataInsertAllRequest.Rows> rowList = new ArrayList<TableDataInsertAllRequest.Rows>(); | |
rowList.add(rows); | |
TableDataInsertAllRequest content = new TableDataInsertAllRequest().setRows(rowList); | |
TableDataInsertAllResponse result = this.client.tabledata() | |
.insertAll(this.projectId, this.dataset, this.table, content).execute(); | |
tableDataInsertResult(result); | |
} | |
private static void tableDataInsertResult(TableDataInsertAllResponse result) { | |
if(result==null){ | |
return; | |
} | |
System.out.println(result.getKind()); | |
if(result.getInsertErrors()==null){ | |
return; | |
} | |
for (InsertErrors error : result.getInsertErrors()) { | |
for (ErrorProto rows2 : error.getErrors()) { | |
System.err.println(rows2.getDebugInfo()); | |
} | |
} | |
} | |
private void listProjects(Bigquery bigquery) throws IOException { | |
ProjectList result = bigquery.projects().list().execute(); | |
for (Projects project : result.getProjects()) { | |
System.err.println(project.toPrettyString()); | |
} | |
} | |
private void listDataset(Bigquery bigquery) throws IOException { | |
DatasetList result = bigquery.datasets().list(this.projectId).execute(); | |
for (Datasets dataset : result.getDatasets()) { | |
System.err.println(dataset.toPrettyString()); | |
} | |
} | |
private void listTables(Bigquery bigQuery, String datasetName) throws IOException { | |
TableList result = bigQuery.tables().list(this.projectId, datasetName).execute(); | |
for (Tables table : result.getTables()) { | |
System.err.println(table.toPrettyString()); | |
} | |
} | |
private static void insertTable(Bigquery bigQuery, String projectName, String datasetName, String tableId, | |
TableSchema schema) throws IOException { | |
Table table = new Table(); | |
table.setSchema(schema); | |
TableReference tableRef = new TableReference(); | |
tableRef.setDatasetId(datasetName); | |
tableRef.setProjectId(projectName); | |
tableRef.setTableId(tableId); | |
table.setTableReference(tableRef); | |
bigQuery.tables().insert(projectName, datasetName, table).execute(); | |
} | |
private static TableSchema getTwitterLogSchema() { | |
List<TableFieldSchema> root = new ArrayList<TableFieldSchema>(); | |
root.add(new TableFieldSchema().setName("tweet_id").setType("INTEGER").setMode("REQUIRED").setDescription("")); | |
root.add(new TableFieldSchema().setName("tweet_message").setType("STRING").setMode("REQUIRED") | |
.setDescription("")); | |
root.add(new TableFieldSchema().setName("client").setType("STRING").setMode("REQUIRED").setDescription("")); | |
root.add(new TableFieldSchema().setName("created_at").setType("TIMESTAMP").setMode("REQUIRED") | |
.setDescription("")); | |
root.add(new TableFieldSchema().setName("user_id").setType("INTEGER").setMode("REQUIRED").setDescription("")); | |
root.add(new TableFieldSchema().setName("user_name").setType("STRING").setMode("REQUIRED").setDescription("")); | |
root.add(new TableFieldSchema().setName("user_screen_name").setType("STRING").setMode("REQUIRED") | |
.setDescription("")); | |
TableSchema schema = new TableSchema(); | |
schema.setFields(root); | |
return schema; | |
} | |
private static TableSchema getTwitterLogRawJsonSchema() { | |
List<TableFieldSchema> root = new ArrayList<TableFieldSchema>(); | |
root.add(new TableFieldSchema().setName("date").setType("TIMESTAMP").setMode("REQUIRED").setDescription("")); | |
root.add(new TableFieldSchema().setName("rawJson").setType("STRING").setMode("REQUIRED").setDescription("")); | |
TableSchema schema = new TableSchema(); | |
schema.setFields(root); | |
return schema; | |
} | |
static void runQueryRpcAndPrint(Bigquery bigquery, String projectId, String query, PrintStream out) | |
throws IOException { | |
QueryRequest queryRequest = new QueryRequest().setQuery(query); | |
QueryResponse queryResponse = bigquery.jobs().query(projectId, queryRequest).execute(); | |
if (queryResponse.getJobComplete()) { | |
printRows(queryResponse.getRows(), out); | |
if (null == queryResponse.getPageToken()) { | |
return; | |
} | |
} | |
String pageToken = null; | |
while (true) { | |
GetQueryResultsResponse queryResults = bigquery.jobs() | |
.getQueryResults(projectId, queryResponse.getJobReference().getJobId()).setPageToken(pageToken) | |
.execute(); | |
if (queryResults.getJobComplete()) { | |
printRows(queryResults.getRows(), out); | |
pageToken = queryResults.getPageToken(); | |
if (null == pageToken) { | |
return; | |
} | |
} | |
} | |
} | |
private static void printRows(List<TableRow> rows, PrintStream out) { | |
if (rows != null) { | |
for (TableRow row : rows) { | |
for (TableCell cell : row.getF()) { | |
out.printf("%s, ", Data.isNull(cell.getV()) ? "null" : cell.getV().toString()); | |
} | |
out.println(); | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException; | |
import java.security.GeneralSecurityException; | |
import java.util.ResourceBundle; | |
// 2015-09-19 18:06 | |
public class SendBigQuery { | |
private TwitterStreaming twitter; | |
private BigQuery bigQuery; | |
public static void main(String[] args) { | |
SendBigQuery main=new SendBigQuery(); | |
try { | |
main.init(); | |
} catch (GeneralSecurityException | IOException e) { | |
e.printStackTrace(); | |
return; | |
} | |
main.run(); | |
} | |
public void init() throws GeneralSecurityException, IOException{ | |
this.twitter=new TwitterStreaming(); | |
this.twitter.init(); | |
this.bigQuery=new BigQuery(); | |
this.bigQuery.initialize(); | |
} | |
public void run(){ | |
this.twitter.run(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#Fushihara | |
twitter.consumerKey=xxxxxx | |
twitter.consumerSecret=xxxxxx | |
twitter.accessToken=xxxxxx | |
twitter.accessTokenSecret=xxxxxx | |
#big query | |
#↓NOTE: this is NOT your Google account e-mail address! | |
bq.mail=xxxxxx@developer.gserviceaccount.com | |
#↓Any value is fine; it does not need to match any value entered in the web form. | |
bq.app=test app 20150919 | |
bq.p12Path=C:/xxxxxx/web-storage-xxxxxx-key.p12 | |
bq.projectid=xxxxxx | |
bq.dataset=twitter | |
bq.table=home_timeline | |
bq.tableRawJson=home_timeline_raw |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException; | |
import java.util.ResourceBundle; | |
import twitter4j.RawStreamListener; | |
import twitter4j.StallWarning; | |
import twitter4j.Status; | |
import twitter4j.StatusDeletionNotice; | |
import twitter4j.StatusListener; | |
import twitter4j.TwitterStream; | |
import twitter4j.TwitterStreamFactory; | |
import twitter4j.conf.Configuration; | |
import twitter4j.conf.ConfigurationBuilder; | |
// 2015-09-19 18:06 | |
public class TwitterStreaming { | |
private Configuration config; | |
public void init() { | |
ResourceBundle bundle = ResourceBundle.getBundle("setting"); | |
ConfigurationBuilder cb = new ConfigurationBuilder(); | |
cb.setDebugEnabled(true); | |
cb.setOAuthConsumerKey(bundle.getString("twitter.consumerKey")); | |
cb.setOAuthConsumerSecret(bundle.getString("twitter.consumerSecret")); | |
cb.setOAuthAccessToken(bundle.getString("twitter.accessToken")); | |
cb.setOAuthAccessTokenSecret(bundle.getString("twitter.accessTokenSecret")); | |
cb.setDebugEnabled(true); | |
this.config = cb.build(); | |
} | |
public void run() { | |
StatusListener listener = new StatusListener() { | |
public void onStatus(Status status) { | |
System.out.println("onStatus:" + status.getText().replaceAll("\n", "")); | |
try { | |
BigQuery.getInstance().sendTweetData(status); | |
} catch (IOException e) { | |
e.printStackTrace(); | |
} | |
} | |
public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) { | |
System.out.println("onDelete:" + statusDeletionNotice.getStatusId()); | |
} | |
public void onTrackLimitationNotice(int numberOfLimitedStatuses) { | |
System.out.println("onTrackLimit:" + numberOfLimitedStatuses); | |
} | |
public void onException(Exception ex) { | |
System.err.println("onException"); | |
ex.printStackTrace(); | |
} | |
@Override | |
public void onScrubGeo(long arg0, long arg1) { | |
System.out.println("onScrubGeo:" + arg0 + " / " + arg1); | |
} | |
@Override | |
public void onStallWarning(StallWarning arg0) { | |
System.out.println("onStallWarning:" + arg0.getMessage()); | |
} | |
}; | |
RawStreamListener rsl = new RawStreamListener() { | |
@Override | |
public void onException(Exception arg0) { | |
System.err.println("onException:raw:"); | |
arg0.printStackTrace(); | |
} | |
@Override | |
public void onMessage(String arg0) { | |
if (arg0 == null) { | |
System.out.println("onMessage:raw:null"); | |
} else if (arg0.length() ==0) { | |
System.out.println("onMessage:raw:empty" ); | |
} else if (arg0.length() < 50) { | |
System.out.println("onMessage:raw:" + arg0); | |
} else { | |
System.out.println("onMessage:raw:" + arg0.substring(0, 49)); | |
} | |
if(arg0==null || arg0.length()==0){ | |
return; | |
} | |
try { | |
BigQuery.getInstance().sendRawJson(arg0); | |
} catch (IOException e) { | |
e.printStackTrace(); | |
} | |
} | |
}; | |
TwitterStream twitterStream = new TwitterStreamFactory(config).getInstance(); | |
twitterStream.addListener(listener); | |
twitterStream.addListener(rsl); | |
twitterStream.user(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment