PDI Row Distribution Plugin Development

Introduction

A row distribution plugin allows you to distribute rows of data from one step to another in a specific way.

By default there are two ways to distribute data (not taking partitioning into account):

  • Distribute rows in a round-robin fashion: each target step (copy) gets a row in turn, so all target steps receive an equal number of rows.
  • Copy rows: all target steps (copies) receive all rows.
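The two default behaviours can be illustrated with a small toy model. This is only a sketch using plain Java lists: the class `DefaultDistributionSketch` and its methods are hypothetical names made up for illustration and are not part of the PDI API or its actual implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of the two default distribution modes (hypothetical, not PDI code). */
final class DefaultDistributionSketch {

  /** Round-robin: each target gets a row in turn. */
  static List<List<String>> roundRobin(List<String> rows, int targets) {
    List<List<String>> out = emptyTargets(targets);
    int current = 0;
    for (String row : rows) {
      out.get(current).add(row);
      current = (current + 1) % targets; // move on to the next target copy
    }
    return out;
  }

  /** Copy: every target receives every row. */
  static List<List<String>> copy(List<String> rows, int targets) {
    List<List<String>> out = emptyTargets(targets);
    for (List<String> target : out) {
      target.addAll(rows);
    }
    return out;
  }

  private static List<List<String>> emptyTargets(int targets) {
    List<List<String>> out = new ArrayList<>();
    for (int i = 0; i < targets; i++) {
      out.add(new ArrayList<>());
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> rows = Arrays.asList("r1", "r2", "r3", "r4");
    System.out.println(roundRobin(rows, 2)); // prints [[r1, r3], [r2, r4]]
    System.out.println(copy(rows, 2));       // prints [[r1, r2, r3, r4], [r1, r2, r3, r4]]
  }
}
```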

Sometimes you need a more specific distribution of the data based on some rule, such as the space available in the output buffer, the resources available on a machine, or some other custom rule.  This is where row distribution plugins come in.
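As an illustration of such a rule, the sketch below picks the target whose buffer has the most free space. It uses standard `java.util.concurrent` queues as stand-ins for output buffers; the class `MostFreeSpaceSketch` and the method `pickTarget` are hypothetical and only show the idea, not how PDI represents row sets.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical rule: send the next row to the buffer with the most free space. */
final class MostFreeSpaceSketch {

  /** Returns the index of the buffer with the largest remaining capacity (first one wins on ties). */
  static int pickTarget(List<? extends BlockingQueue<?>> buffers) {
    int best = 0;
    for (int i = 1; i < buffers.size(); i++) {
      if (buffers.get(i).remainingCapacity() > buffers.get(best).remainingCapacity()) {
        best = i;
      }
    }
    return best;
  }

  public static void main(String[] args) {
    BlockingQueue<String> a = new ArrayBlockingQueue<>(4);
    BlockingQueue<String> b = new ArrayBlockingQueue<>(4);
    a.add("row-1");
    a.add("row-2"); // a now has 2 free slots
    b.add("row-3"); // b now has 3 free slots
    System.out.println(pickTarget(Arrays.asList(a, b))); // prints 1
  }
}
```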

RowDistributionInterface

Row distribution plugins need to implement the RowDistributionInterface.
Here are the methods that need to be implemented:

  /**
   * @return The row distribution code (plugin id)
   */
  public String getCode();

  /**
   * @return The row distribution description (plugin description)
   */
  public String getDescription();

  /**
   * Do the actual row distribution in the step
   * @param rowMeta the meta-data of the row to distribute
   * @param row the data of the row data to distribute
   * @param stepInterface The step to distribute the rows in
   * @throws KettleStepException
   */
  public void distributeRow(RowMetaInterface rowMeta, Object[] row, StepInterface stepInterface) throws KettleStepException;

  /**
   * Which mini-icon needs to be shown on the hop?
   *
   * @return the EImage to show, or null if the standard icon needs to be used
   *         (that is, if you don't want to display a plugin-specific icon).
   */
  public EImage getDistributionImage();

Example

The following example implements a distribution algorithm that lowers parallelism or, more accurately, forces the target steps to operate asynchronously, because they get out of step with each other.  The output row set buffers are filled one by one: when one is full, the plugin moves on to the next.

package org.pentaho.di.trans.step;

import java.util.concurrent.TimeUnit;

import org.pentaho.di.core.RowSet;
import org.pentaho.di.core.exception.KettleStepException;
import org.pentaho.di.core.gui.PrimitiveGCInterface.EImage;
import org.pentaho.di.core.row.RowMetaInterface;

@RowDistributionPlugin(
    code="Overflow",
    name="Overflow",
    description="When an output row set is full, move to the next one"
    )
public class OverflowRowDistributionPlugin implements RowDistributionInterface {

  @Override
  public String getCode() {
    return "Overflow";
  }

  @Override
  public String getDescription() {
    return "When an output row set is full, move to the next one";
  }

  @Override
  public void distributeRow(RowMetaInterface rowMeta, Object[] row, StepInterface step)
      throws KettleStepException {

    // Start with the step's current output row set.
    RowSet rowSet = step.getOutputRowSets().get(step.getCurrentOutputRowSetNr());

    // Try to hand the row to the current row set, waiting at most 1 nanosecond.
    // A false result means the row was not accepted in time (the buffer is full),
    // so move on to the next row set and try again.
    while (!rowSet.putRowWait(rowMeta, row, 1, TimeUnit.NANOSECONDS)) {
      int next = step.getCurrentOutputRowSetNr() + 1;
      if (next > step.getOutputRowSets().size() - 1) {
        next = 0; // wrap around to the first row set
      }
      step.setCurrentOutputRowSetNr(next);
      rowSet = step.getOutputRowSets().get(next);
    }

  }

  @Override
  public EImage getDistributionImage() {
    return null;
  }

}
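
To see the fill-one-buffer-then-move-on behaviour without a running transformation, the overflow logic can be mimicked with standard Java queues. This is a simplified simulation under stated assumptions: `OverflowSimulation` is a hypothetical class, bounded `ArrayBlockingQueue` instances stand in for output row sets, and no consumers drain the buffers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-alone simulation of the overflow distribution (not PDI code). */
final class OverflowSimulation {

  private final List<BlockingQueue<Integer>> buffers = new ArrayList<>();
  private int current = 0; // index of the buffer currently being filled

  OverflowSimulation(int targets, int capacity) {
    for (int i = 0; i < targets; i++) {
      buffers.add(new ArrayBlockingQueue<>(capacity));
    }
  }

  /** Offers the row to the current buffer; if it is full, moves on to the next one. */
  void distribute(int row) {
    try {
      // Note: with no consumers, this loops forever once every buffer is full.
      while (!buffers.get(current).offer(row, 1, TimeUnit.NANOSECONDS)) {
        current = (current + 1) % buffers.size();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while distributing a row", e);
    }
  }

  List<BlockingQueue<Integer>> buffers() {
    return buffers;
  }

  public static void main(String[] args) {
    OverflowSimulation sim = new OverflowSimulation(3, 2);
    for (int row = 1; row <= 5; row++) {
      sim.distribute(row);
    }
    System.out.println(sim.buffers()); // prints [[1, 2], [3, 4], [5]]
  }
}
```

With three buffers of capacity two and five rows, the first two buffers fill completely before the third receives anything, which is the opposite of the even spread that round-robin produces.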

The new row distribution is then available through the right-click menu on a step.