$ javac -version
javac 1.8.0_91
$ embulk --version
embulk 0.8.27
$ embulk new java-input plugin_example
$ cd ./embulk-input-plugin_example
$ ./gradlew package
The template code generated by embulk new declares several options, so as a first step I wrote a config.yml that specifies just enough options to avoid a validation error, and ran it.
in:
  type: plugin_example
  option1: 1
  option2: example2
  columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y%m%d'}
    - {name: comment, type: string}
out: {type: stdout}
$ embulk run -L ./embulk-input-plugin_example embulk-input-plugin_example/example/config.yml
org.embulk.exec.PartialExecutionException: java.lang.UnsupportedOperationException: PluginExampleInputPlugin.run method is not implemented yet
at org.embulk.exec.BulkLoader$LoaderState.buildPartialExecuteException(org/embulk/exec/BulkLoader.java:375)
at org.embulk.exec.BulkLoader.doRun(org/embulk/exec/BulkLoader.java:607)
at org.embulk.exec.BulkLoader.access$000(org/embulk/exec/BulkLoader.java:35)
at org.embulk.exec.BulkLoader$1.run(org/embulk/exec/BulkLoader.java:391)
at org.embulk.exec.BulkLoader$1.run(org/embulk/exec/BulkLoader.java:387)
at org.embulk.spi.Exec.doWith(org/embulk/spi/Exec.java:25)
at org.embulk.exec.BulkLoader.run(org/embulk/exec/BulkLoader.java:387)
at org.embulk.EmbulkEmbed.run(org/embulk/EmbulkEmbed.java:180)
at java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:498)
at org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:453)
at org.jruby.javasupport.JavaMethod.invokeDirect(org/jruby/javasupport/JavaMethod.java:314)
at RUBY.run(/Users/seri/.embulk/bin/embulk!/embulk/runner.rb:84)
at RUBY.run(/Users/seri/.embulk/bin/embulk!/embulk/command/embulk_run.rb:269)
at RUBY.<main>(/Users/seri/.embulk/bin/embulk!/embulk/command/embulk_main.rb:2)
at org.jruby.Ruby.runInterpreter(org/jruby/Ruby.java:850)
at org.jruby.Ruby.loadFile(org/jruby/Ruby.java:2976)
at org.jruby.RubyKernel.requireCommon(org/jruby/RubyKernel.java:963)
at org.jruby.RubyKernel.require(org/jruby/RubyKernel.java:956)
at org.jruby.RubyKernel$INVOKER$s$1$0$require19.call(org/jruby/RubyKernel$INVOKER$s$1$0$require19.gen)
at RUBY.(root)(uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/rubygems/core_ext/kernel_require.rb:1)
at Users.seri.$_dot_embulk.bin.embulk.embulk.command.embulk_bundle.invokeOther34:require(Users/seri/$_dot_embulk/bin/embulk/embulk/command/file:/Users/seri/.embulk/bin/embulk!/embulk/command/embulk_bundle.rb:46)
at Users.seri.$_dot_embulk.bin.embulk.embulk.command.embulk_bundle.<main>(file:/Users/seri/.embulk/bin/embulk!/embulk/command/embulk_bundle.rb:46)
at java.lang.invoke.MethodHandle.invokeWithArguments(java/lang/invoke/MethodHandle.java:627)
at org.jruby.Ruby.runScript(org/jruby/Ruby.java:834)
at org.jruby.Ruby.runNormally(org/jruby/Ruby.java:749)
at org.jruby.Ruby.runNormally(org/jruby/Ruby.java:767)
at org.jruby.Ruby.runFromMain(org/jruby/Ruby.java:580)
at org.jruby.Main.doRunFromMain(org/jruby/Main.java:425)
at org.jruby.Main.internalRun(org/jruby/Main.java:313)
at org.jruby.Main.run(org/jruby/Main.java:242)
at org.jruby.Main.main(org/jruby/Main.java:204)
at org.embulk.cli.Main.main(org/embulk/cli/Main.java:23)
- Implement PluginExampleInputPlugin.run in the crudest possible way
- // Write your code here :)
- throw new UnsupportedOperationException("PluginExampleInputPlugin.run method is not implemented yet");
+ return Exec.newTaskReport();
- Even this is enough to make it run (although it does nothing)
$ embulk run -L ./embulk-input-plugin_example embulk-input-plugin_example/example/config.yml
2017-07-15 15:04:26.857 +0900: Embulk v0.8.27
2017-07-15 15:04:29.423 +0900 [INFO] (0001:transaction): Loaded plugin embulk/input/plugin_example from a load path
2017-07-15 15:04:29.475 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
2017-07-15 15:04:29.491 +0900 [INFO] (0001:transaction): {done: 0 / 1, running: 0}
2017-07-15 15:04:29.518 +0900 [INFO] (0001:transaction): {done: 1 / 1, running: 0}
2017-07-15 15:04:29.541 +0900 [INFO] (main): Committed.
2017-07-15 15:04:29.541 +0900 [INFO] (main): Next config diff: {"in":{},"out":{}}
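- Next, I want to put a fixed string into a hard-coded column and pass it to the output
- I read several input plugin implementations, and they all appeared to create a PageBuilder instance and pass values to it
- After some trial and error, it worked with the run method implemented as follows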
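@Override
public TaskReport run(TaskSource taskSource,
        Schema schema, int taskIndex,
        PageOutput output) {
    // Not used by this minimal example.
    PluginTask task = taskSource.loadTask(PluginTask.class);

    // PageBuilder collects records and passes them on to the output.
    PageBuilder pagebuilder =
            new PageBuilder(Exec.getBufferAllocator(), schema, output);

    // Set a fixed string on the first column, which must be of type string.
    pagebuilder.setString(schema.getColumn(0), "unko");
    pagebuilder.addRecord();  // commit the current row
    pagebuilder.finish();     // flush whatever is still buffered
    return Exec.newTaskReport();
}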
in:
  type: plugin_example
  option1: 1
  option2: example2
  columns:
    - {name: comment, type: string}
$ embulk run -L ./embulk-input-plugin_example embulk-input-plugin_example/example/config.yml
2017-07-15 16:10:43.233 +0900: Embulk v0.8.27
2017-07-15 16:10:45.502 +0900 [INFO] (0001:transaction): Loaded plugin embulk/input/plugin_example from a load path
2017-07-15 16:10:45.555 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
2017-07-15 16:10:45.580 +0900 [INFO] (0001:transaction): {done: 0 / 1, running: 0}
unko
2017-07-15 16:10:45.679 +0900 [INFO] (0001:transaction): {done: 1 / 1, running: 0}
2017-07-15 16:10:45.698 +0900 [INFO] (main): Committed.
2017-07-15 16:10:45.698 +0900 [INFO] (main): Next config diff: {"in":{},"out":{}}
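The run method above fills only column 0, and the config that goes with it has a single string column. One way to handle a multi-column schema such as the first config.yml is to pick a placeholder value per column type. Below is a minimal sketch of that idea in plain Java. ColumnType, placeholderFor, and buildRow are hypothetical stand-ins written for this illustration and are not Embulk classes. In a real plugin the same dispatch would probably go through schema.visitColumns with a ColumnVisitor and the matching PageBuilder setters, but treat that mapping as an assumption to check against your Embulk version.

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for illustration only; none of this is Embulk API.
class PlaceholderRowDemo {
    // Only the three column types that appear in the config.yml above.
    enum ColumnType { LONG, STRING, TIMESTAMP }

    // Chooses a fixed placeholder value for a column of the given type.
    static Object placeholderFor(ColumnType type) {
        switch (type) {
            case LONG:
                return 0L;
            case STRING:
                return "";
            case TIMESTAMP:
                return Instant.EPOCH;
            default:
                throw new IllegalArgumentException("unsupported type: " + type);
        }
    }

    // Builds one row in which every column of the schema has a value.
    static Map<String, Object> buildRow(Map<String, ColumnType> schema) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (Map.Entry<String, ColumnType> column : schema.entrySet()) {
            row.put(column.getKey(), placeholderFor(column.getValue()));
        }
        return row;
    }

    public static void main(String[] args) {
        // Same column names and types as the first config.yml.
        Map<String, ColumnType> schema = new LinkedHashMap<>();
        schema.put("id", ColumnType.LONG);
        schema.put("account", ColumnType.LONG);
        schema.put("time", ColumnType.TIMESTAMP);
        schema.put("purchase", ColumnType.TIMESTAMP);
        schema.put("comment", ColumnType.STRING);
        System.out.println(buildRow(schema));
    }
}
```

Because the loop is driven by the schema rather than by a fixed column index, adding or removing a column in the config does not require touching the row-building code.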
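- Note that addRecord() tries to write the value of every column in the schema to the buffer, so a column that is defined under columns but never assigned a value causes a NullPointerException.
- Specifically, a case like the following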
type: plugin_example
option1: 1
option2: example2
columns:
  - {name: comment, type: string}
  - {name: no_value_assigned, type: string}
org.embulk.exec.PartialExecutionException: java.lang.NullPointerException
at org.embulk.exec.BulkLoader$LoaderState.buildPartialExecuteException(org/embulk/exec/BulkLoader.java:375)
at org.embulk.exec.BulkLoader.doRun(org/embulk/exec/BulkLoader.java:607)
at org.embulk.exec.BulkLoader.access$000(org/embulk/exec/BulkLoader.java:35)
at org.embulk.exec.BulkLoader$1.run(org/embulk/exec/BulkLoader.java:391)
at org.embulk.exec.BulkLoader$1.run(org/embulk/exec/BulkLoader.java:387)
at org.embulk.spi.Exec.doWith(org/embulk/spi/Exec.java:25)
at org.embulk.exec.BulkLoader.run(org/embulk/exec/BulkLoader.java:387)
at org.embulk.EmbulkEmbed.run(org/embulk/EmbulkEmbed.java:180)
at java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:498)
at org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:453)
at org.jruby.javasupport.JavaMethod.invokeDirect(org/jruby/javasupport/JavaMethod.java:314)
at RUBY.run(/Users/seri/.embulk/bin/embulk!/embulk/runner.rb:84)
at RUBY.run(/Users/seri/.embulk/bin/embulk!/embulk/command/embulk_run.rb:269)
at RUBY.<main>(/Users/seri/.embulk/bin/embulk!/embulk/command/embulk_main.rb:2)
at org.jruby.Ruby.runInterpreter(org/jruby/Ruby.java:850)
at org.jruby.Ruby.loadFile(org/jruby/Ruby.java:2976)
at org.jruby.RubyKernel.requireCommon(org/jruby/RubyKernel.java:963)
at org.jruby.RubyKernel.require(org/jruby/RubyKernel.java:956)
at org.jruby.RubyKernel$INVOKER$s$1$0$require19.call(org/jruby/RubyKernel$INVOKER$s$1$0$require19.gen)
at RUBY.(root)(uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/rubygems/core_ext/kernel_require.rb:1)
at Users.seri.$_dot_embulk.bin.embulk.embulk.command.embulk_bundle.invokeOther34:require(Users/seri/$_dot_embulk/bin/embulk/embulk/command/file:/Users/seri/.embulk/bin/embulk!/embulk/command/embulk_bundle.rb:46)
at Users.seri.$_dot_embulk.bin.embulk.embulk.command.embulk_bundle.<main>(file:/Users/seri/.embulk/bin/embulk!/embulk/command/embulk_bundle.rb:46)
at java.lang.invoke.MethodHandle.invokeWithArguments(java/lang/invoke/MethodHandle.java:627)
at org.jruby.Ruby.runScript(org/jruby/Ruby.java:834)
at org.jruby.Ruby.runNormally(org/jruby/Ruby.java:749)
at org.jruby.Ruby.runNormally(org/jruby/Ruby.java:767)
at org.jruby.Ruby.runFromMain(org/jruby/Ruby.java:580)
at org.jruby.Main.doRunFromMain(org/jruby/Main.java:425)
at org.jruby.Main.internalRun(org/jruby/Main.java:313)
at org.jruby.Main.run(org/jruby/Main.java:242)
at org.jruby.Main.main(org/jruby/Main.java:204)
at org.embulk.cli.Main.main(org/embulk/cli/Main.java:23)
Caused by: java.lang.NullPointerException
at org.embulk.spi.PageBuilder.writeString(PageBuilder.java:190)
at org.embulk.spi.PageBuilder.access$1300(PageBuilder.java:14)
at org.embulk.spi.PageBuilder$StringColumnValue.writeNotNull(PageBuilder.java:542)
at org.embulk.spi.PageBuilder$AbstractColumnValue.write(PageBuilder.java:440)
at org.embulk.spi.PageBuilder$Row.write(PageBuilder.java:367)
at org.embulk.spi.PageBuilder$Row.access$800(PageBuilder.java:285)
at org.embulk.spi.PageBuilder.addRecord(PageBuilder.java:219)
at org.embulk.input.plugin_example.PluginExampleInputPlugin.run(PluginExampleInputPlugin.java:67)
at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor.runInputTask(LocalExecutorPlugin.java:294)
at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor.access$000(LocalExecutorPlugin.java:212)
at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor$1.call(LocalExecutorPlugin.java:257)
at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor$1.call(LocalExecutorPlugin.java:253)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
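The failure above can be reproduced in miniature, together with one possible guard. MiniRowBuilder below is a hypothetical stand-in written for this illustration and is not Embulk's PageBuilder. It only mimics the behavior described here: addRecord() walks every column and dereferences its value. The guard is to mark a column as explicitly null before calling addRecord(). Embulk's PageBuilder has a setNull method that looks like the counterpart, but treat that mapping as an assumption rather than something verified in this post.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for illustration only; this is NOT Embulk's PageBuilder.
class MiniRowBuilder {
    private final String[] values;        // null = never assigned
    private final boolean[] explicitNull; // true = deliberately set to null
    private final List<String> records = new ArrayList<>();

    MiniRowBuilder(int columnCount) {
        values = new String[columnCount];
        explicitNull = new boolean[columnCount];
    }

    void setString(int index, String value) {
        values[index] = value;
        explicitNull[index] = false;
    }

    void setNull(int index) {
        values[index] = null;
        explicitNull[index] = true;
    }

    // Writes every column of the current row, assigned or not.
    void addRecord() {
        StringBuilder row = new StringBuilder();
        for (int i = 0; i < values.length; i++) {
            if (i > 0) {
                row.append(',');
            }
            if (explicitNull[i]) {
                row.append("NULL");
            } else {
                String value = values[i];
                // Throws NullPointerException when the column was never assigned.
                int length = value.length();
                row.append(value, 0, length);
            }
        }
        records.add(row.toString());
    }

    List<String> records() {
        return records;
    }
}

class NullGuardDemo {
    // Builds one row for a two-column schema (comment, no_value_assigned).
    static List<String> buildRow(boolean guardUnassigned) {
        MiniRowBuilder builder = new MiniRowBuilder(2);
        builder.setString(0, "hello");
        if (guardUnassigned) {
            builder.setNull(1); // explicit null instead of leaving it unset
        }
        builder.addRecord();
        return builder.records();
    }

    public static void main(String[] args) {
        System.out.println(buildRow(true)); // prints [hello,NULL]
        try {
            buildRow(false);
        } catch (NullPointerException e) {
            System.out.println("unguarded row failed with NullPointerException");
        }
    }
}
```

The design point is that "no value yet" and "null on purpose" are different states, and only the second one is safe to write out.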