Skip to content

Instantly share code, notes, and snippets.

@jca02266
Created November 29, 2017 06:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jca02266/b53e6e28f9a4fda9d273ed749d5a36cb to your computer and use it in GitHub Desktop.
Save jca02266/b53e6e28f9a4fda9d273ed749d5a36cb to your computer and use it in GitHub Desktop.
EmbulkEmbedサンプル
# Embulk job config (presumably the ./config.yml loaded by EmbulkRunner — confirm).
in:
  type: file
  # Input prefix; EmbulkRunner also sets in.path_prefix programmatically.
  path_prefix: ./work/csv/sample_
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    # Custom JRuby parser plugin registered as "multiline".
    type: multiline
    skip_header_lines: 1
    # NOTE: the "multiline" parser in this gist declares every output column as
    # string and emits the raw tokenizer values, so the types/formats below are
    # not applied by it.
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y%m%d'}
    - {name: comment, type: string}
filters:
- type: column
  add_columns:
  - {name: d, type: timestamp, default: "2015-07-13", format: "%Y-%m-%d"}
out:
  type: file
  # EmbulkRunner replaces this with a per-run prefix before each run.
  path_prefix: ./work/sample_output_
  file_ext: csv
  formatter:
    type: csv
    column_options:
      d: {format: '%Y-%m-%d %H:%M:%S'}
exec:
  min_output_tasks: 1
import org.embulk.EmbulkEmbed;
import org.embulk.EmbulkEmbed.Bootstrap;
import org.embulk.config.ConfigLoader;
import org.embulk.config.ConfigSource;
import java.io.File;
import java.io.IOException;
/**
 * Runs the Embulk job defined in {@code ./config.yml} several times in one JVM through
 * {@link EmbulkEmbed}, giving each run its own output path prefix.
 *
 * <p>Usage: {@code java EmbulkRunner [runCount]}. When {@code runCount} is omitted the job is
 * run {@value #DEFAULT_RUN_COUNT} times.
 */
public class EmbulkRunner {
    /** Embulk system config handed to the bootstrap before initialization. */
    private static final String SYSTEM_CONFIG_PATH = "./systemConfig.yml";

    /** Job config whose {@code in}/{@code out} sections are patched before running. */
    private static final String CONFIG_PATH = "./config.yml";

    /** Value forced onto {@code in.path_prefix}. */
    private static final String INPUT_PATH_PREFIX = "./work/csv/sample_";

    /** Base of {@code out.path_prefix}; the run index and a trailing {@code _} are appended. */
    private static final String OUTPUT_PATH_PREFIX = "./work/sample_output_";

    /** Number of runs when no count is given on the command line. */
    private static final int DEFAULT_RUN_COUNT = 100;

    /**
     * Bootstraps embedded Embulk with the system config, then runs the job config repeatedly.
     *
     * @param args optional; {@code args[0]} is the number of runs (a non-negative integer)
     * @throws IOException if either YAML config file cannot be read
     * @throws IllegalArgumentException if {@code args[0]} is not a non-negative integer
     */
    public static void main(String[] args) throws IOException {
        int runCount = parseRunCount(args);

        Bootstrap bootstrap = new Bootstrap();
        ConfigSource systemConfig =
                bootstrap.getSystemConfigLoader().fromYamlFile(new File(SYSTEM_CONFIG_PATH));
        bootstrap.setSystemConfig(systemConfig);
        EmbulkEmbed embulk = bootstrap.initializeCloseable();
        try {
            ConfigLoader loader = embulk.newConfigLoader();
            ConfigSource config = loader.fromYamlFile(new File(CONFIG_PATH));
            // The nested sections are modified and then `config` itself is run, so this relies on
            // getNested() returning a live view of the parent config.
            // NOTE(review): confirm this holds for the Embulk version in use.
            ConfigSource in = config.getNested("in");
            ConfigSource out = config.getNested("out");
            // The input prefix is identical for every run, so set it once outside the loop.
            in.set("path_prefix", INPUT_PATH_PREFIX);
            for (int i = 0; i < runCount; i++) {
                // Give every run a distinct output prefix so runs do not overwrite each other.
                out.set("path_prefix", OUTPUT_PATH_PREFIX + i + "_");
                embulk.run(config);
            }
        } finally {
            // Release the embedded Embulk instance even when a run throws.
            embulk.destroy();
        }
    }

    /**
     * Returns the run count given in {@code args[0]}, or {@link #DEFAULT_RUN_COUNT} when absent.
     *
     * @param args command-line arguments, possibly empty
     * @return the number of runs to perform, never negative
     * @throws IllegalArgumentException if {@code args[0]} is not a non-negative integer
     */
    private static int parseRunCount(String[] args) {
        if (args == null || args.length == 0) {
            return DEFAULT_RUN_COUNT;
        }
        final int count;
        try {
            count = Integer.parseInt(args[0].trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("run count must be an integer: " + args[0], e);
        }
        if (count < 0) {
            throw new IllegalArgumentException("run count must not be negative: " + count);
        }
        return count;
    }
}
require 'pp'
module Embulk
  module Java
    java_import 'org.embulk.standards.CsvParserPlugin'
    java_import 'org.embulk.standards.CsvTokenizer'
    # Used below as Java::LineDecoder; import it explicitly rather than relying on
    # it having been defined in this module by some other file.
    java_import 'org.embulk.spi.util.LineDecoder'
  end

  module Parser
    # Parser plugin registered under the name "multiline".
    #
    # It reuses the Java CSV parser's task definition (CsvParserPlugin::PluginTask)
    # and its CsvTokenizer, but emits every column as a raw :string value; the
    # column types/formats in the configured `columns` list are not applied.
    # NOTE(review): the name presumably refers to CSV records whose quoted values
    # span several lines — confirm.
    class Multiline < ParserPlugin
      Plugin.register_parser("multiline", self)

      # Loads the parser config as a CsvParserPlugin::PluginTask, stores its dump
      # under "plugin_task", and declares one :string column per entry of the
      # configured `columns` list (same order and names), so the schema cannot
      # drift from the config.
      def self.transaction(config, &control)
        plugin_task = config.load_config(Java::CsvParserPlugin::PluginTask)
        task = {
          "plugin_task" => DataSource.from_java(plugin_task.dump),
        }
        columns = plugin_task.getSchemaConfig.getColumns.to_a.each_with_index.map do |column_config, index|
          Column.new(index, column_config.getName, :string)
        end
        yield(task, columns)
      end

      # Restores the PluginTask dumped by .transaction.
      def init
        @plugin_task = task.param("plugin_task", :hash).load_task(Java::CsvParserPlugin::PluginTask)
      end

      # Tokenizes every file of +file_input+ with CsvTokenizer and adds one record
      # per tokenized record to page_builder, using the tokenizer's values as-is.
      def run(file_input)
        # NOTE(review): reads the Ruby FileInput wrapper's instance variable
        # @java_file_input to obtain the underlying Java FileInput; this depends
        # on Embulk's internal Ruby API.
        java_file_input = file_input.instance_eval { @java_file_input }
        decoder = Java::LineDecoder.new(java_file_input, @plugin_task)
        tokenizer = Java::CsvTokenizer.new(decoder, @plugin_task)

        while tokenizer.nextFile
          # Skip up to skip_header_lines lines at the start of each file, stopping
          # early when skipHeaderLine returns false.
          skip_header_lines = @plugin_task.skip_header_lines
          while skip_header_lines > 0
            skip_header_lines -= 1
            break unless tokenizer.skipHeaderLine
          end

          while tokenizer.nextRecord
            line = []
            while tokenizer.hasNextColumn
              line.push tokenizer.nextColumn
            end
            page_builder.add(line)
          end
        end

        page_builder.finish
      end
    end
  end
end
# Embulk system config — presumably the ./systemConfig.yml that EmbulkRunner
# passes to the bootstrap (NOTE(review): confirm).
# Presumably adds plugin/lib to the JRuby load path so the Ruby "multiline"
# parser plugin can be found; confirm against the project layout.
jruby_load_path:
- plugin/lib
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment