@serihiro
Last active July 15, 2017 12:18
Notes from writing an Embulk input plugin

Environment

$ javac -version
javac 1.8.0_91

$ embulk --version
embulk 0.8.27

Creating the template

$ embulk new java-input plugin_example

Setting up according to the documentation

$ cd ./embulk-input-plugin_example
$ ./gradlew package

Writing a rough config and running it right away

  • The template code generated by embulk new declares several options, so for now I wrote a config.yml that sets just enough of them to avoid a validation error, and ran it
in:
  type: plugin_example
  option1: 1
  option2: example2
  columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y%m%d'}
    - {name: comment, type: string}
out: {type: stdout}
$ embulk run -L ./embulk-input-plugin_example embulk-input-plugin_example/example/config.yml
org.embulk.exec.PartialExecutionException: java.lang.UnsupportedOperationException: PluginExampleInputPlugin.run method is not implemented yet
        at org.embulk.exec.BulkLoader$LoaderState.buildPartialExecuteException(org/embulk/exec/BulkLoader.java:375)
        at org.embulk.exec.BulkLoader.doRun(org/embulk/exec/BulkLoader.java:607)
        at org.embulk.exec.BulkLoader.access$000(org/embulk/exec/BulkLoader.java:35)
        at org.embulk.exec.BulkLoader$1.run(org/embulk/exec/BulkLoader.java:391)
        at org.embulk.exec.BulkLoader$1.run(org/embulk/exec/BulkLoader.java:387)
        at org.embulk.spi.Exec.doWith(org/embulk/spi/Exec.java:25)
        at org.embulk.exec.BulkLoader.run(org/embulk/exec/BulkLoader.java:387)
        at org.embulk.EmbulkEmbed.run(org/embulk/EmbulkEmbed.java:180)
        at java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:498)
        at org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:453)
        at org.jruby.javasupport.JavaMethod.invokeDirect(org/jruby/javasupport/JavaMethod.java:314)
        at RUBY.run(/Users/seri/.embulk/bin/embulk!/embulk/runner.rb:84)
        at RUBY.run(/Users/seri/.embulk/bin/embulk!/embulk/command/embulk_run.rb:269)
        at RUBY.<main>(/Users/seri/.embulk/bin/embulk!/embulk/command/embulk_main.rb:2)
        at org.jruby.Ruby.runInterpreter(org/jruby/Ruby.java:850)
        at org.jruby.Ruby.loadFile(org/jruby/Ruby.java:2976)
        at org.jruby.RubyKernel.requireCommon(org/jruby/RubyKernel.java:963)
        at org.jruby.RubyKernel.require(org/jruby/RubyKernel.java:956)
        at org.jruby.RubyKernel$INVOKER$s$1$0$require19.call(org/jruby/RubyKernel$INVOKER$s$1$0$require19.gen)
        at RUBY.(root)(uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/rubygems/core_ext/kernel_require.rb:1)
        at Users.seri.$_dot_embulk.bin.embulk.embulk.command.embulk_bundle.invokeOther34:require(Users/seri/$_dot_embulk/bin/embulk/embulk/command/file:/Users/seri/.embulk/bin/embulk!/embulk/command/embulk_bundle.rb:46)
        at Users.seri.$_dot_embulk.bin.embulk.embulk.command.embulk_bundle.<main>(file:/Users/seri/.embulk/bin/embulk!/embulk/command/embulk_bundle.rb:46)
        at java.lang.invoke.MethodHandle.invokeWithArguments(java/lang/invoke/MethodHandle.java:627)
        at org.jruby.Ruby.runScript(org/jruby/Ruby.java:834)
        at org.jruby.Ruby.runNormally(org/jruby/Ruby.java:749)
        at org.jruby.Ruby.runNormally(org/jruby/Ruby.java:767)
        at org.jruby.Ruby.runFromMain(org/jruby/Ruby.java:580)
        at org.jruby.Main.doRunFromMain(org/jruby/Main.java:425)
        at org.jruby.Main.internalRun(org/jruby/Main.java:313)
        at org.jruby.Main.run(org/jruby/Main.java:242)
        at org.jruby.Main.main(org/jruby/Main.java:204)
        at org.embulk.cli.Main.main(org/embulk/cli/Main.java:23)
  • Implement PluginExampleInputPlugin.run in the crudest possible way
-        // Write your code here :)
-        throw new UnsupportedOperationException("PluginExampleInputPlugin.run method is not implemented yet");
+        return Exec.newTaskReport();
  • Even this does run, apparently (although it does nothing)
$ embulk run -L ./embulk-input-plugin_example embulk-input-plugin_example/example/config.yml
2017-07-15 15:04:26.857 +0900: Embulk v0.8.27
2017-07-15 15:04:29.423 +0900 [INFO] (0001:transaction): Loaded plugin embulk/input/plugin_example from a load path
2017-07-15 15:04:29.475 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
2017-07-15 15:04:29.491 +0900 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
2017-07-15 15:04:29.518 +0900 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
2017-07-15 15:04:29.541 +0900 [INFO] (main): Committed.
2017-07-15 15:04:29.541 +0900 [INFO] (main): Next config diff: {"in":{},"out":{}}

Passing a fixed string to the output, for a start

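  • I want to put a fixed string into a hard-coded column and pass it to the output
  • I read several input plugin implementations, and they appeared to create a PageBuilder with new and pass the values to it
  • After some trial and error, implementing the run method as follows worked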
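    @Override
    public TaskReport run(TaskSource taskSource,
                          Schema schema, int taskIndex,
                          PageOutput output) {
        // Loaded as in the generated template; not used yet in this experiment.
        PluginTask task = taskSource.loadTask(PluginTask.class);

        // PageBuilder buffers records and passes them on to the output.
        PageBuilder pagebuilder =
                new PageBuilder(Exec.getBufferAllocator(), schema, output);
        // Put a fixed string into the first column of the schema.
        pagebuilder.setString(schema.getColumn(0), "unko");

        pagebuilder.addRecord();
        // Flush the buffered records to the output.
        pagebuilder.finish();

        return Exec.newTaskReport();
    }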
in:
  type: plugin_example
  option1: 1
  option2: example2
  columns:
    - {name: comment, type: string}
$ embulk run -L ./embulk-input-plugin_example embulk-input-plugin_example/example/config.yml

2017-07-15 16:10:43.233 +0900: Embulk v0.8.27
2017-07-15 16:10:45.502 +0900 [INFO] (0001:transaction): Loaded plugin embulk/input/plugin_example from a load path
2017-07-15 16:10:45.555 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / output tasks 4 = input tasks 1 * 4
2017-07-15 16:10:45.580 +0900 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
unko
2017-07-15 16:10:45.679 +0900 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
2017-07-15 16:10:45.698 +0900 [INFO] (main): Committed.
2017-07-15 16:10:45.698 +0900 [INFO] (main): Next config diff: {"in":{},"out":{}}
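  • Note that addRecord() tries to write the values of all columns in the schema to the buffer, so if a column is defined in columns but no value has been set for it, a NullPointerException is thrown.
  • Specifically, a case like the following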
in:
  type: plugin_example
  option1: 1
  option2: example2
  columns:
    - {name: comment, type: string}
    - {name: no_value_assigned, type: string}
org.embulk.exec.PartialExecutionException: java.lang.NullPointerException
	at org.embulk.exec.BulkLoader$LoaderState.buildPartialExecuteException(org/embulk/exec/BulkLoader.java:375)
	at org.embulk.exec.BulkLoader.doRun(org/embulk/exec/BulkLoader.java:607)
	at org.embulk.exec.BulkLoader.access$000(org/embulk/exec/BulkLoader.java:35)
	at org.embulk.exec.BulkLoader$1.run(org/embulk/exec/BulkLoader.java:391)
	at org.embulk.exec.BulkLoader$1.run(org/embulk/exec/BulkLoader.java:387)
	at org.embulk.spi.Exec.doWith(org/embulk/spi/Exec.java:25)
	at org.embulk.exec.BulkLoader.run(org/embulk/exec/BulkLoader.java:387)
	at org.embulk.EmbulkEmbed.run(org/embulk/EmbulkEmbed.java:180)
	at java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:498)
	at org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:453)
	at org.jruby.javasupport.JavaMethod.invokeDirect(org/jruby/javasupport/JavaMethod.java:314)
	at RUBY.run(/Users/seri/.embulk/bin/embulk!/embulk/runner.rb:84)
	at RUBY.run(/Users/seri/.embulk/bin/embulk!/embulk/command/embulk_run.rb:269)
	at RUBY.<main>(/Users/seri/.embulk/bin/embulk!/embulk/command/embulk_main.rb:2)
	at org.jruby.Ruby.runInterpreter(org/jruby/Ruby.java:850)
	at org.jruby.Ruby.loadFile(org/jruby/Ruby.java:2976)
	at org.jruby.RubyKernel.requireCommon(org/jruby/RubyKernel.java:963)
	at org.jruby.RubyKernel.require(org/jruby/RubyKernel.java:956)
	at org.jruby.RubyKernel$INVOKER$s$1$0$require19.call(org/jruby/RubyKernel$INVOKER$s$1$0$require19.gen)
	at RUBY.(root)(uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/rubygems/core_ext/kernel_require.rb:1)
	at Users.seri.$_dot_embulk.bin.embulk.embulk.command.embulk_bundle.invokeOther34:require(Users/seri/$_dot_embulk/bin/embulk/embulk/command/file:/Users/seri/.embulk/bin/embulk!/embulk/command/embulk_bundle.rb:46)
	at Users.seri.$_dot_embulk.bin.embulk.embulk.command.embulk_bundle.<main>(file:/Users/seri/.embulk/bin/embulk!/embulk/command/embulk_bundle.rb:46)
	at java.lang.invoke.MethodHandle.invokeWithArguments(java/lang/invoke/MethodHandle.java:627)
	at org.jruby.Ruby.runScript(org/jruby/Ruby.java:834)
	at org.jruby.Ruby.runNormally(org/jruby/Ruby.java:749)
	at org.jruby.Ruby.runNormally(org/jruby/Ruby.java:767)
	at org.jruby.Ruby.runFromMain(org/jruby/Ruby.java:580)
	at org.jruby.Main.doRunFromMain(org/jruby/Main.java:425)
	at org.jruby.Main.internalRun(org/jruby/Main.java:313)
	at org.jruby.Main.run(org/jruby/Main.java:242)
	at org.jruby.Main.main(org/jruby/Main.java:204)
	at org.embulk.cli.Main.main(org/embulk/cli/Main.java:23)
Caused by: java.lang.NullPointerException
	at org.embulk.spi.PageBuilder.writeString(PageBuilder.java:190)
	at org.embulk.spi.PageBuilder.access$1300(PageBuilder.java:14)
	at org.embulk.spi.PageBuilder$StringColumnValue.writeNotNull(PageBuilder.java:542)
	at org.embulk.spi.PageBuilder$AbstractColumnValue.write(PageBuilder.java:440)
	at org.embulk.spi.PageBuilder$Row.write(PageBuilder.java:367)
	at org.embulk.spi.PageBuilder$Row.access$800(PageBuilder.java:285)
	at org.embulk.spi.PageBuilder.addRecord(PageBuilder.java:219)
	at org.embulk.input.plugin_example.PluginExampleInputPlugin.run(PluginExampleInputPlugin.java:67)
	at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor.runInputTask(LocalExecutorPlugin.java:294)
	at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor.access$000(LocalExecutorPlugin.java:212)
	at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor$1.call(LocalExecutorPlugin.java:257)
	at org.embulk.exec.LocalExecutorPlugin$ScatterExecutor$1.call(LocalExecutorPlugin.java:253)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
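
The failure above can be reproduced in miniature without Embulk. The following is only a sketch: ToyRowBuilder and its methods are hypothetical stand-ins written for this note, not Embulk's PageBuilder, and the only behavior modeled is the one described above (addRecord() writes every column and fails on a column that never received a value). It also shows the likely way out, which is to mark such a column as null explicitly before adding the record. Embulk's PageBuilder appears to provide setNull for this purpose, but check the API of the version you are using.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Toy stand-in for a record builder. NOT Embulk's PageBuilder; all names here are hypothetical.
class ToyRowBuilder {
    private final List<String> columnNames;
    private final String[] values;          // values of the row being built
    private final boolean[] explicitNull;   // true when a column was set to null on purpose
    private final List<String> rows = new ArrayList<>();

    ToyRowBuilder(List<String> columnNames) {
        this.columnNames = columnNames;
        this.values = new String[columnNames.size()];
        this.explicitNull = new boolean[columnNames.size()];
    }

    void setString(int index, String value) {
        values[index] = value;
        explicitNull[index] = false;
    }

    void setNull(int index) {
        values[index] = null;
        explicitNull[index] = true;
    }

    // Writes every column of the row, so a column that was never set fails here.
    void addRecord() {
        List<String> cells = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            if (explicitNull[i]) {
                cells.add("NULL");
            } else {
                // Throws NullPointerException when no value was assigned.
                cells.add(Objects.requireNonNull(values[i],
                        "no value set for column " + columnNames.get(i)));
            }
        }
        rows.add(String.join(",", cells));
        // Reset the row state for the next record.
        Arrays.fill(values, null);
        Arrays.fill(explicitNull, false);
    }

    List<String> rows() {
        return rows;
    }

    // Two columns defined, only the first one set: mirrors the failing config.
    static String withoutFix() {
        ToyRowBuilder b = new ToyRowBuilder(Arrays.asList("comment", "no_value_assigned"));
        b.setString(0, "unko");
        try {
            b.addRecord();
            return "ok";
        } catch (NullPointerException e) {
            return "NullPointerException";
        }
    }

    // Same columns, but the second one is explicitly marked as null.
    static List<String> withFix() {
        ToyRowBuilder b = new ToyRowBuilder(Arrays.asList("comment", "no_value_assigned"));
        b.setString(0, "unko");
        b.setNull(1);
        b.addRecord();
        return b.rows();
    }

    public static void main(String[] args) {
        System.out.println(withoutFix()); // prints "NullPointerException"
        System.out.println(withFix());    // prints "[unko,NULL]"
    }
}
```

In a real run method the same idea would mean assigning something to every column in the schema (a value or an explicit null) before each addRecord() call, rather than only to the columns you happen to care about.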