Pentaho MapReduce

The Pentaho MapReduce job entry allows you to build MapReduce jobs using Kettle transformations as the Mapper, Combiner, and/or Reducer.

Overview

Kettle transformations are used to manipulate data and function as the map, combine, and reduce phases of a MapReduce application. The Kettle engine is pushed down to each task node and is executed for each task. The implementation that supports the data type conversion from Hadoop data types to Kettle data types, the passing of tuples between input/output formats to the Kettle engine, and all associated configuration for the MapReduce job is collectively called Pentaho MapReduce.

Type Mapping

To pass data between Hadoop and Kettle, we must convert between Hadoop IO data types and Kettle data types. The type mapping for the built-in Kettle types is:

Kettle Type                          Hadoop Type
ValueMetaInterface.TYPE_STRING       org.apache.hadoop.io.Text
ValueMetaInterface.TYPE_BIGNUMBER    org.apache.hadoop.io.Text
ValueMetaInterface.TYPE_DATE         org.apache.hadoop.io.Text
ValueMetaInterface.TYPE_INTEGER      org.apache.hadoop.io.LongWritable
ValueMetaInterface.TYPE_NUMBER       org.apache.hadoop.io.DoubleWritable
ValueMetaInterface.TYPE_BOOLEAN      org.apache.hadoop.io.BooleanWritable
ValueMetaInterface.TYPE_BINARY       org.apache.hadoop.io.BytesWritable
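
The mapping can also be read as a simple lookup. The sketch below is illustrative only: the KettleType enum is a hypothetical stand-in for the ValueMetaInterface.TYPE_* constants (NUMBER stands for Kettle's double-precision numeric type), and the Hadoop types are given as class names so the example runs without Kettle or Hadoop on the classpath.

```java
import java.util.EnumMap;
import java.util.Map;

class TypeMappingSketch {

    /** Hypothetical stand-in for the ValueMetaInterface.TYPE_* constants. */
    enum KettleType { STRING, BIGNUMBER, DATE, INTEGER, NUMBER, BOOLEAN, BINARY }

    private static final Map<KettleType, String> HADOOP_TYPE = new EnumMap<>(KettleType.class);

    static {
        // Three Kettle types share Text, so the mapping cannot be inverted unambiguously.
        HADOOP_TYPE.put(KettleType.STRING, "org.apache.hadoop.io.Text");
        HADOOP_TYPE.put(KettleType.BIGNUMBER, "org.apache.hadoop.io.Text");
        HADOOP_TYPE.put(KettleType.DATE, "org.apache.hadoop.io.Text");
        HADOOP_TYPE.put(KettleType.INTEGER, "org.apache.hadoop.io.LongWritable");
        HADOOP_TYPE.put(KettleType.NUMBER, "org.apache.hadoop.io.DoubleWritable");
        HADOOP_TYPE.put(KettleType.BOOLEAN, "org.apache.hadoop.io.BooleanWritable");
        HADOOP_TYPE.put(KettleType.BINARY, "org.apache.hadoop.io.BytesWritable");
    }

    /** Returns the Hadoop Writable class name paired with the given Kettle type. */
    static String hadoopTypeFor(KettleType type) {
        return HADOOP_TYPE.get(type);
    }

    public static void main(String[] args) {
        for (KettleType type : KettleType.values()) {
            System.out.println(type + " -> " + hadoopTypeFor(type));
        }
    }
}
```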

Defining your own Type Converter

The Type Converter system is pluggable to support additional data types as required by custom Input/Output formats. The Type Converter SPI is a simple interface to implement: org.pentaho.hadoop.mapreduce.converter.spi.ITypeConverter. We use the Service Locator pattern, specifically Java's ServiceLoader, to resolve available converters at runtime. To provide your own, implement ITypeConverter and list your implementation in a META-INF/services/org.pentaho.hadoop.mapreduce.converter.spi.ITypeConverter file, then package both into a jar and place it in the plugins/pentaho-big-data-plugin/lib directory. You can find the default implementations defined here.
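
To illustrate the lookup mechanism, here is a minimal sketch of ServiceLoader-based discovery. The nested Converter interface and its two methods are hypothetical stand-ins; the actual ITypeConverter signature is not reproduced here. A real provider would be a public class with a public no-argument constructor, named in a META-INF/services file whose name is the interface's fully qualified name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

class ConverterLookupSketch {

    /** Hypothetical stand-in for the converter SPI; not the real ITypeConverter. */
    interface Converter {
        boolean canConvert(Class<?> from, Class<?> to);
        Object convert(Object value);
    }

    /** Built-in fallback used when no registered provider handles the request. */
    static final class ToStringConverter implements Converter {
        public boolean canConvert(Class<?> from, Class<?> to) {
            return to == String.class;
        }
        public Object convert(Object value) {
            return String.valueOf(value);
        }
    }

    /** Collects every provider registered under META-INF/services on the classpath. */
    static List<Converter> discover() {
        List<Converter> found = new ArrayList<>();
        for (Converter converter : ServiceLoader.load(Converter.class)) {
            found.add(converter);
        }
        return found;
    }

    /** Registered providers are consulted first, then the built-in fallback. */
    static Converter resolve(Class<?> from, Class<?> to) {
        for (Converter converter : discover()) {
            if (converter.canConvert(from, to)) {
                return converter;
            }
        }
        Converter fallback = new ToStringConverter();
        if (fallback.canConvert(from, to)) {
            return fallback;
        }
        throw new IllegalArgumentException("No converter from " + from + " to " + to);
    }

    public static void main(String[] args) {
        // With no services file on the classpath, the fallback handles this request.
        System.out.println(resolve(Long.class, String.class).convert(42L));
    }
}
```

Because providers are discovered at runtime, adding a converter requires no change to the calling code, only a new jar on the classpath.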

Distributed Cache

Pentaho MapReduce relies on Hadoop's Distributed Cache to distribute the Kettle environment, configuration, and plugins across the cluster. By leveraging the Distributed Cache, network traffic is reduced for subsequent executions, as the Kettle environment is automatically configured on each node. This also allows you to use multiple versions of Kettle against a single cluster.

How it works

Hadoop's Distributed Cache is a mechanism to distribute files into the working directory of each map and reduce task. The origin of these files is HDFS. Pentaho MapReduce will automatically configure the job to use a Kettle environment from HDFS (configured via pmr.kettle.installation.id, see Configuration options). If the desired Kettle environment does not exist, Pentaho MapReduce will take care of "installing" it in HDFS before executing the job.

The default Kettle environment installation path within HDFS is pmr.kettle.dfs.install.dir/$id, where $id is a string that uniquely identifies the environment. The id is generated by default, but it can just as easily name a custom build tailored to a specific set of jobs.

The Kettle environment is staged to HDFS at pmr.kettle.dfs.install.dir/pmr.kettle.installation.id as follows:

  1. The contents of plugins/pentaho-big-data-plugin/pentaho-mapreduce-libraries.zip are extracted into HDFS at hdfs://{pmr.kettle.dfs.install.dir}/{pmr.kettle.installation.id}
  2. The Big Data Plugin contents are copied into pmr.kettle.installation.id/plugins/
    1. Only the active Hadoop configuration is copied, and specifically:
      1. The active Hadoop configuration's client-only libraries are not copied (config/lib/client)
      2. The active Hadoop configuration's "pmr" specific libraries are copied into the main hdfs://{pmr.kettle.dfs.install.dir}/{pmr.kettle.installation.id}/lib/ of the installation. This allows the Hadoop configuration to provide libraries that are accessible within an Input or Output format (or otherwise outside of the standard transformation execution environment). This is necessary, for example, for reading directly out of HBase using the HBase TableInputFormat.

Configuration options

Pentaho MapReduce can be configured through the plugin.properties file found in the plugin's base directory. Individual properties can be overridden per Pentaho MapReduce job entry by defining them in the User Defined properties tab.
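
The override behavior can be pictured with java.util.Properties, where one set of properties serves as the defaults for another. This is a sketch of the idea, not the plugin's actual code: effectiveConfig is a hypothetical helper, and the property values are example values borrowed from Appendix B.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class PmrConfigSketch {

    /**
     * Hypothetical helper: values from plugin.properties act as defaults,
     * and user-defined values from the job entry take precedence.
     */
    static Properties effectiveConfig(String pluginPropertiesText, Properties userDefined) {
        Properties pluginDefaults = new Properties();
        try {
            pluginDefaults.load(new StringReader(pluginPropertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // getProperty falls back to pluginDefaults when a key is not set directly.
        Properties effective = new Properties(pluginDefaults);
        effective.putAll(userDefined);
        return effective;
    }

    public static void main(String[] args) {
        String pluginProperties =
            "pmr.kettle.dfs.install.dir=/opt/pentaho/mapreduce\n"
            + "pmr.kettle.installation.id=4.3.0\n";

        Properties userDefined = new Properties();
        userDefined.setProperty("pmr.kettle.installation.id", "custom");

        Properties effective = effectiveConfig(pluginProperties, userDefined);
        System.out.println(effective.getProperty("pmr.kettle.installation.id"));
        System.out.println(effective.getProperty("pmr.kettle.dfs.install.dir"));
    }
}
```

Here the job entry overrides only the installation id, so the install directory still comes from plugin.properties.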

The currently supported configuration properties are:

Property Name                    Description
pmr.kettle.installation.id       Version of Kettle to use from the Kettle HDFS installation directory. If not set, a unique id is generated from the version of Kettle, the Big Data Plugin version, and the Hadoop Configuration used to communicate with the cluster and submit the Pentaho MapReduce job.
pmr.kettle.dfs.install.dir       Installation path in HDFS for the Kettle environment used to execute a Pentaho MapReduce job. This can be a relative path, anchored to the user's home directory, or an absolute path if it starts with a /.
pmr.libraries.archive.file       Pentaho MapReduce Kettle environment runtime archive to be preloaded into pmr.kettle.dfs.install.dir/pmr.kettle.installation.id
pmr.kettle.additional.plugins    Comma-separated list of additional plugins (by directory name) to be installed with the Kettle environment, e.g. "steps/DummyPlugin,my-custom-plugin"
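
As a rough illustration of how these values combine, the sketch below resolves the environment path from the install directory and installation id and splits the additional plugins list. Both helpers are hypothetical, the HDFS home directory /user/alice is an assumed example, and trimming whitespace around plugin names is an assumption rather than documented behavior.

```java
import java.util.ArrayList;
import java.util.List;

class InstallPathSketch {

    /** Joins two path segments with exactly one separator between them. */
    private static String join(String parent, String child) {
        return parent.endsWith("/") ? parent + child : parent + "/" + child;
    }

    /**
     * Hypothetical helper: an install dir starting with "/" is absolute;
     * anything else is anchored to the user's home directory.
     */
    static String resolveInstallPath(String installDir, String installationId, String userHome) {
        String base = installDir.startsWith("/") ? installDir : join(userHome, installDir);
        return join(base, installationId);
    }

    /** Splits the comma-separated plugin list, skipping empty items (assumed behavior). */
    static List<String> parseAdditionalPlugins(String value) {
        List<String> plugins = new ArrayList<>();
        if (value == null) {
            return plugins;
        }
        for (String part : value.split(",")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                plugins.add(name);
            }
        }
        return plugins;
    }

    public static void main(String[] args) {
        System.out.println(resolveInstallPath("/opt/pentaho/mapreduce", "custom", "/user/alice"));
        System.out.println(resolveInstallPath("pentaho/mapreduce", "4.3.0", "/user/alice"));
        System.out.println(parseAdditionalPlugins("steps/DummyPlugin,my-custom-plugin"));
    }
}
```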

Customizing the Kettle Environment used by Pentaho MapReduce

The installation environment used by Pentaho MapReduce will be installed to pmr.kettle.dfs.install.dir/pmr.kettle.installation.id when the Pentaho MapReduce job entry is executed. If the installation already exists, no modifications will be made and the job will use the environment as is. That means any modifications made after the initial run, or any custom pre-loading of a Kettle environment, will be used as is by Pentaho MapReduce.
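
The install-only-if-absent rule can be sketched as follows. To keep the example runnable without a cluster, the local filesystem stands in for HDFS and creating empty lib/ and plugins/ directories stands in for the actual staging; ensureInstalled is a hypothetical name, and real code would go through Hadoop's FileSystem API instead.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

class InstallIfAbsentSketch {

    /**
     * Returns true if a fresh environment was created, or false if an
     * existing one was found and left untouched.
     */
    static boolean ensureInstalled(Path installRoot, String installationId) {
        Path environment = installRoot.resolve(installationId);
        if (Files.exists(environment)) {
            // Existing installs are used as is, including any manual changes.
            return false;
        }
        try {
            // Stand-in for staging the archive contents and plugins.
            Files.createDirectories(environment.resolve("lib"));
            Files.createDirectories(environment.resolve("plugins"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return true;
    }

    public static void main(String[] args) {
        Path root = Paths.get(System.getProperty("java.io.tmpdir"), "pmr-sketch-" + System.nanoTime());
        System.out.println("first run installed: " + ensureInstalled(root, "custom"));
        System.out.println("second run installed: " + ensureInstalled(root, "custom"));
    }
}
```

The second call finds the directory created by the first and changes nothing, which is why edits to an existing environment persist across jobs.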

Customizing the libraries used in a fresh Kettle environment install into HDFS

The pmr.libraries.archive.file contents are copied into HDFS at pmr.kettle.dfs.install.dir/pmr.kettle.installation.id. To make changes for initial installations, you must edit the archive referenced by this property.

  1. Unzip pentaho-mapreduce-libraries.zip. It contains a single lib/ directory with the required Kettle dependencies.
  2. Copy additional libraries to the lib/ directory.
  3. Zip up the lib/ directory into pentaho-mapreduce-libraries-custom.zip so the archive contains lib/ with all jars within it. You may create subdirectories within lib/; all jars found in lib/ and its subdirectories will be added to the classpath of the executing job.
  4. Edit plugin.properties and set the following properties:
    pmr.kettle.installation.id=custom
    pmr.libraries.archive.file=pentaho-mapreduce-libraries-custom.zip
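
Steps 1 to 3 can also be scripted. The following is a minimal sketch using java.util.zip (Java 9 or later), under the assumption that the extra jars go directly under lib/; the class and method names are hypothetical, and the demo method works on a tiny placeholder archive in a temporary directory rather than on a real Kettle distribution.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

class CustomArchiveSketch {

    /** Copies every entry of originalZip into customZip, then adds each extra jar under lib/. */
    static void buildCustomArchive(Path originalZip, List<Path> extraJars, Path customZip) throws IOException {
        try (ZipInputStream in = new ZipInputStream(Files.newInputStream(originalZip));
             ZipOutputStream out = new ZipOutputStream(Files.newOutputStream(customZip))) {
            for (ZipEntry entry = in.getNextEntry(); entry != null; entry = in.getNextEntry()) {
                out.putNextEntry(new ZipEntry(entry.getName()));
                in.transferTo(out); // reads only to the end of the current entry
                out.closeEntry();
            }
            for (Path jar : extraJars) {
                // A name already present in the archive would raise a ZipException here.
                out.putNextEntry(new ZipEntry("lib/" + jar.getFileName()));
                Files.copy(jar, out);
                out.closeEntry();
            }
        }
    }

    /** Lists the entry names of an archive in stored order. */
    static List<String> entryNames(Path zip) throws IOException {
        List<String> names = new ArrayList<>();
        try (ZipInputStream in = new ZipInputStream(Files.newInputStream(zip))) {
            for (ZipEntry entry = in.getNextEntry(); entry != null; entry = in.getNextEntry()) {
                names.add(entry.getName());
            }
        }
        return names;
    }

    /** Builds a placeholder archive, customizes it, and returns the resulting entry names. */
    static List<String> demo() {
        try {
            Path dir = Files.createTempDirectory("pmr-archive");
            Path original = dir.resolve("pentaho-mapreduce-libraries.zip");
            try (ZipOutputStream out = new ZipOutputStream(Files.newOutputStream(original))) {
                out.putNextEntry(new ZipEntry("lib/kettle-core.jar"));
                out.write("placeholder".getBytes(StandardCharsets.UTF_8));
                out.closeEntry();
            }
            Path extra = dir.resolve("my-custom-code.jar");
            Files.write(extra, "placeholder".getBytes(StandardCharsets.UTF_8));
            Path custom = dir.resolve("pentaho-mapreduce-libraries-custom.zip");
            buildCustomArchive(original, List.of(extra), custom);
            return entryNames(custom);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```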
    

The next time you execute Pentaho MapReduce the custom Kettle environment will be copied into HDFS at pmr.kettle.dfs.install.dir/custom and used when executing the job. You can switch between Kettle environments by specifying the pmr.kettle.installation.id property as a User Defined property per Pentaho MapReduce job entry or globally in the plugin.properties file*.

*Note: The archive file and additional plugins currently configured are used to "install" the environment into HDFS only if the installation referenced by pmr.kettle.installation.id does not exist.

Customizing an existing Kettle environment in HDFS

You can customize an existing Kettle environment install in HDFS by copying jars and plugins into it. This can be done from the command line (hadoop fs -copyFromLocal <localsrc> ... <dst>) or with the Hadoop Copy Files job entry.

See Appendix B for the supported directory structure in HDFS.

Adding JDBC drivers to the Kettle environment

JDBC drivers and their required dependencies must be placed in the installation directory's lib/ directory.
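
For scripted setups, the copy can be driven from Java by assembling the hadoop fs -copyFromLocal command shown earlier. This sketch only builds the argument list and does not run it; the helper names are hypothetical, my-jdbc-driver.jar is a placeholder file name, and the install directory and id are the example values from Appendix B.

```java
import java.util.ArrayList;
import java.util.List;

class CopyToInstallSketch {

    /** Returns the lib/ directory of a Kettle environment, given an absolute install dir. */
    static String libDir(String installDir, String installationId) {
        return installDir + "/" + installationId + "/lib/";
    }

    /** Builds: hadoop fs -copyFromLocal <localsrc> ... <dst> */
    static List<String> copyFromLocalCommand(List<String> localSources, String destination) {
        List<String> command = new ArrayList<>();
        command.add("hadoop");
        command.add("fs");
        command.add("-copyFromLocal");
        command.addAll(localSources);
        command.add(destination);
        return command;
    }

    public static void main(String[] args) {
        List<String> sources = new ArrayList<>();
        sources.add("my-jdbc-driver.jar"); // placeholder driver jar
        List<String> command = copyFromLocalCommand(sources, libDir("/opt/pentaho/mapreduce", "custom"));
        System.out.println(String.join(" ", command));
    }
}
```

On a machine with the hadoop client installed, the resulting list could be passed to ProcessBuilder to perform the copy.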

Upgrading from the Pentaho Hadoop Distribution (PHD)

The PHD is no longer required and can be safely removed. If you have modified your Pentaho Hadoop Distribution installation, you may wish to preserve these files so that the new Distributed Cache mechanism can take advantage of them. To do so, follow the instructions in the section above, Customizing the Kettle Environment used by Pentaho MapReduce.

If you're using a version of the Pentaho Hadoop Distribution (PHD) that allows you to configure the installation directory via mapred-site.xml, perform the following on all TaskTracker nodes:

  1. Remove the pentaho.* properties from your mapred-site.xml
  2. Remove the directories those properties referenced
  3. Restart the TaskTracker process

Appendix A: pentaho-mapreduce-libraries.zip structure

pentaho-mapreduce-libraries.zip/
  `- lib/
      +- kettle-core-{version}.jar
      +- kettle-engine-{version}.jar
      `- .. (all other required Kettle dependencies and optional jars)

Appendix B: Example Kettle environment installation directory structure within DFS

/opt/pentaho/mapreduce/
  +- 4.3.0/
  |   +- lib/
  |   |   +- kettle-core-{version}.jar
  |   |   +- kettle-engine-{version}.jar
  |   |   +- .. (Any files in the active Hadoop configuration's lib/pmr/ directory)
  |   |   `- .. (all other required Kettle dependencies and optional jars - including JDBC drivers)
  |   `- plugins/
  |       +- pentaho-big-data-plugin/
  |       |  `- hadoop-configurations/
  |       |     `- hadoop-20/ (the active Hadoop configuration used to communicate with the cluster)
  |       |        +- lib/ (the lib/pmr/ and lib/client/ directories are omitted here)
  |       |        `- .. (all other jars)
  |       `- .. (additional optional plugins)
  `- custom/
      +- lib/
      |   +- kettle-core-{version}.jar
      |   +- kettle-engine-{version}.jar
      |   +- my-custom-code.jar
      |   `- .. (all other required Kettle dependencies and optional jars - including JDBC drivers)
      `- plugins/
          +- pentaho-big-data-plugin/
          |   ..
          `- my-custom-plugin/
              ..