How does AWS Data Pipeline install Task Runner on an EC2 instance?

AWS Data Pipeline launches an EC2 instance on your behalf and passes it the following user-data script:


#!/bin/bash
set -x -e
wget -O remote-runner-install -N 'http://datapipeline-us-west-2.s3.amazonaws.com/us-west-2/bootstrap-actions/latest/TaskRunner/install-remote-runner-ec2-v2'

## Do not remove existing parameters ever. Add new ones at the end.
bash remote-runner-install --workerGroup='resource:df-0663241HXFGFQS14TQG_@EC2ResourceObj_2015-12-06T04:48:34' --endpoint='https://datapipeline.us-west-2.amazonaws.com' --region='us-west-2' --logUri='none' --taskRunnerId='667b7bc3-7dc9-41a5-a313-29e299b613b5' --zipFile='http://datapipeline-us-west-2.s3.amazonaws.com/us-west-2/software/latest/TaskRunner/TaskRunner-1.0.zip' --mysqlFile='http://datapipeline-us-west-2.s3.amazonaws.com/us-west-2/software/latest/TaskRunner/mysql-connector-java-bin.jar' --ec2User='ec2-user' --proxyHost='' --proxyPort='-1' --username='' --password='' --windowsDomain='' --windowsWorkgroup=''

The user-data script downloads a script called remote-runner-install, which installs Task Runner using the options passed in by the AWS Data Pipeline service.
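Both scripts quoted in this post parse their options the same way: loop over "$@", match each argument against a --name=* pattern in a case statement, and keep the value with the ${key#*=} parameter expansion. The following is a minimal, self-contained sketch of that pattern; parse_args and the two options it handles are illustrative only, not part of the AWS scripts.

```shell
#!/bin/bash
# Sketch of the --name=value parsing pattern used by remote-runner-install.
# parse_args is a hypothetical helper name used only for this illustration.
parse_args() {
  local key
  for key in "$@"; do
    case $key in
      --workerGroup=*)
        # ${key#*=} removes the shortest prefix ending in "=", leaving the value
        WORKER_GROUP="${key#*=}"
        ;;
      --region=*)
        REGION="${key#*=}"
        ;;
      *)
        echo "Unknown option : $key" >&2
        ;;
    esac
  done
}

parse_args --workerGroup='resource:a=b' --region='us-west-2' --bogus
echo "WORKER_GROUP=$WORKER_GROUP REGION=$REGION"
```

Because ${key#*=} strips only the shortest matching prefix, a value that itself contains "=" (resource:a=b here) is kept intact. The sketch leaves out the shift calls found in the original loops: "$@" is expanded once when the for loop starts, so shifting inside the loop does not change what it iterates over.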

Here is what that script looks like:

#!/bin/bash

##  Task Runner installation for a datapipeline managed Ec2 resource
##  This is run as cloudinit.  See the logs in /var/log/cloud-init.log

set -x -e

for key in "$@"
do
case $key in
    --workerGroup=*)
    WORKER_GROUP="${key#*=}"
    echo "WORKER_GROUP = $WORKER_GROUP"
    shift
    ;;

    --endpoint=*)
    ENDPOINT="${key#*=}"
    echo "ENDPOINT = $ENDPOINT"
    shift
    ;;

    --region=*)
    REGION="${key#*=}"
    echo "REGION = $REGION"
    shift
    ;;

    --logUri=*)
    LOG_URI="${key#*=}"
    echo "LOG_URI = $LOG_URI"
    shift
    ;;

    --taskRunnerId=*)
    TASKRUNNER_ID="${key#*=}"
    echo "TASKRUNNER_ID = $TASKRUNNER_ID"
    shift
    ;;

    --zipFile=*)
    ZIP_FILE="${key#*=}"
    echo "ZIP_FILE = $ZIP_FILE"
    shift
    ;;

    --mysqlFile=*)
    MYSQL_FILE="${key#*=}"
    echo "MYSQL_FILE = $MYSQL_FILE"
    shift
    ;;

    --ec2User=*)
    EC2_USER="${key#*=}"
    echo "EC2_USER = $EC2_USER"
    shift
    ;;

    --proxyHost=*)
    PROXY_HOST="${key#*=}"
    echo "PROXY_HOST = $PROXY_HOST"
    shift
    ;;

    --proxyPort=*)
    PROXY_PORT="${key#*=}"
    echo "PROXY_PORT = $PROXY_PORT"
    shift
    ;;

    --username=*)
    USERNAME="${key#*=}"
    echo "USERNAME = $USERNAME"
    shift
    ;;

    --password=*)
    PASSWORD="${key#*=}"
    echo "PASSWORD = $PASSWORD"
    shift
    ;;

    --windowsDomain=*)
    WINDOWS_DOMAIN="${key#*=}"
    echo "WINDOWS_DOMAIN = $WINDOWS_DOMAIN"
    shift
    ;;

    --windowsWorkgroup=*)
    WINDOWS_WORKGROUP="${key#*=}"
    echo "WINDOWS_WORKGROUP = $WINDOWS_WORKGROUP"
    shift
    ;;

    *)
    echo "Unknown option : $key"
    ;;

esac
done

# prepare installation directory in /mnt/taskRunner
#   using /media/ephemeral0
if [ -d /media/ephemeral0 ] && ! [ -d /mnt/taskRunner ] ; then
  mkdir -p /media/ephemeral0/mnt/taskRunner
  mkdir -p /mnt
  chown -R $EC2_USER /media/ephemeral0/mnt/taskRunner
  ln -s /media/ephemeral0/mnt/taskRunner /mnt/taskRunner
fi

mkdir -p /mnt/taskRunner
chown -R $EC2_USER /mnt/taskRunner

# allow sudo to run for ec2 user without a tty
sudofile=task-runner-ec2-user
cat > /tmp/$sudofile <&1  | awk -F '"' '/version/ {print $2}'`

if [[ ($KERNEL =~ "amzn") && ( $VERSION  < "1.7" ) ]]; then
    echo "Found : KERNEL : $KERNEL  JAVA VERSION : $VERSION"
    echo "Upgrading to java 1.7"
    yum install java-1.7.0-openjdk -y
fi

su - $EC2_USER <> run.out 2>&1 < /dev/null &
  disown %
EOF

if [ -d /etc/init.d ] ; then
cat > /tmp/datapipeline-taskrunnner-service << EOF
#!/bin/bash

### BEGIN INIT INFO
# Provides:          hive-server-nanny
# Required-Start:
# Required-Stop:
# Default-Start:     2 3 4 5
# Default-Stop:      0 1 6
# Short-Description: Nanny to restart hive server
# Description:       Nanny to restart hive server
### END INIT INFO

if [ -e /etc/init.d/functions ]; then
. /etc/init.d/functions
fi

function start {
  killall aws-datapipeline-taskrunner-v2.sh
  if [ -x /mnt/taskRunner/aws-datapipeline-taskrunner-v2.sh ] ; then
    echo "Starting Data Pipeline task runner as a daemon...."
    su - $EC2_USER bash -c "cd /mnt/taskRunner/;/mnt/taskRunner/aws-datapipeline-taskrunner-v2.sh --workerGroup="$WORKER_GROUP" --endpoint="$ENDPOINT" --region="$REGION" --logUri="$LOG_URI" --taskRunnerId="$TASKRUNNER_ID" --proxyHost="$PROXY_HOST" --proxyPort="$PROXY_PORT" --username="$USERNAME" --password="$PASSWORD" --windowsDomain="$WINDOWS_DOMAIN" --windowsWorkgroup="$WINDOWS_WORKGROUP" >> /mnt/taskRunner/run.out 2>&1" > /dev/null 2>&1 &
    disown %
  fi
}

function stop {
    killall aws-datapipeline-taskrunner-v2.sh
}

function reload {
  echo "Nothing to reload"
}

case \$1 in
    'start' )
        start
        ;;
    'stop' )
        stop
        ;;
    'restart' )
        stop
        start
        ;;
    'force-reload' )
        stop
        start
        ;;
    'status' )
        ;;
    *)
        echo "usage: `basename \$0` {start|stop|status}"
esac

exit 0
EOF
mv /tmp/datapipeline-taskrunnner-service /etc/init.d/
chmod 500 /etc/init.d/datapipeline-taskrunnner-service

if [ -e /usr/sbin/update-rc.d ]; then
  /usr/sbin/update-rc.d -f datapipeline-taskrunnner-service remove
  /usr/sbin/update-rc.d datapipeline-taskrunnner-service defaults 40 60
elif [ -e /sbin/chkconfig ]; then
  /sbin/chkconfig --del datapipeline-taskrunnner-service
  /sbin/chkconfig --add datapipeline-taskrunnner-service
else
  echo "Unable to register taskrunner service OS. TaskRunner will NOT
  automatically come up if a reboot happens"
fi
fi
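The install script decides whether to upgrade Java with ( $VERSION < "1.7" ) inside [[ ]]. There, < compares strings, not version numbers. That works for a 1.6 versus 1.7 check, but it is fragile in general (in the C locale, [[ 1.10 < 1.7 ]] is true). If you adapt this check in your own bootstrap script, a numeric version comparison is safer. The following is a minimal sketch, assuming GNU sort with the -V (version sort) option is available. version_lt is a hypothetical helper name, and the sample versions are made-up values in the format the awk filter extracts from java -version.

```shell
#!/bin/bash
# Sketch: compare dotted version strings numerically instead of lexically.
# Assumes GNU sort (coreutils) with the -V (version sort) option.
version_lt() {
  # Succeeds only when $1 is a strictly lower version than $2.
  [ "$1" != "$2" ] &&
    [ "$(printf '%s\n' "$1" "$2" | sort -V | head -n 1)" = "$1" ]
}

# Example values in the format produced by the awk filter on `java -version`.
for v in 1.6.0_45 1.7.0_80 1.8.0_292 11.0.2; do
  if version_lt "$v" "1.7"; then
    echo "$v: older than 1.7, upgrade needed"
  else
    echo "$v: 1.7 or newer"
  fi
done
```

Only 1.6.0_45 is reported as needing an upgrade. Unlike the string test, this version also treats 1.10 as newer than 1.7.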

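The remote-runner-install script in turn starts aws-datapipeline-taskrunner-v2.sh and passes it the Task Runner options it parsed: worker group, endpoint, region, log URI, Task Runner ID, and proxy settings. The aws-datapipeline-taskrunner-v2.sh script is responsible for running Task Runner by invoking the actual TaskRunner jar: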

#!/bin/bash

##  This script starts the task runner by invoking the TaskRunner jar

set -x

for key in "$@"
do
case $key in
    --workerGroup=*)
    WORKER_GROUP="${key#*=}"
    echo "WORKER_GROUP = $WORKER_GROUP"
    shift
    ;;

    --endpoint=*)
    ENDPOINT="${key#*=}"
    echo "ENDPOINT = $ENDPOINT"
    shift
    ;;

    --region=*)
    REGION="${key#*=}"
    echo "REGION = $REGION"
    shift
    ;;

    --logUri=*)
    LOG_URI="${key#*=}"
    echo "LOG_URI = $LOG_URI"
    shift
    ;;

    --taskRunnerId=*)
    TASKRUNNER_ID="${key#*=}"
    echo "TASKRUNNER_ID = $TASKRUNNER_ID"
    shift
    ;;

    --proxyHost=*)
    PROXY_HOST="${key#*=}"
    echo "PROXY_HOST = $PROXY_HOST"
    shift
    ;;

    --proxyPort=*)
    PROXY_PORT="${key#*=}"
    echo "PROXY_PORT = $PROXY_PORT"
    shift
    ;;

    --username=*)
    USERNAME="${key#*=}"
    echo "USERNAME = $USERNAME"
    shift
    ;;

    --password=*)
    PASSWORD="${key#*=}"
    echo "PASSWORD = $PASSWORD"
    shift
    ;;

    --windowsDomain=*)
    WINDOWS_DOMAIN="${key#*=}"
    echo "WINDOWS_DOMAIN = $WINDOWS_DOMAIN"
    shift
    ;;

    --windowsWorkgroup=*)
    WINDOWS_WORKGROUP="${key#*=}"
    echo "WINDOWS_WORKGROUP = $WINDOWS_WORKGROUP"
    shift
    ;;

    --releaseLabel=*)
    RELEASELABEL="${key#*=}"
    echo "RELEASELABEL = $RELEASELABEL"
    shift
    ;;

    --s3NoProxy=*)
    S3NOPROXY="${key#*=}"
    echo "S3NOPROXY = $S3NOPROXY"
    shift
    ;;

    *)
    echo "Unknown option : $key"
    ;;

esac
done

HADOOP_CLASSPATH=""
mv staging-hadoop1-lib.jar staging-hadoop-lib.jar
TASKRUNNER_CLASSPATH="/mnt/taskRunner/TaskRunner-1.0.jar:/mnt/taskRunner/common/*:/mnt/taskRunner/staging-hadoop-lib.jar:"
TASKRUNNER_CLASSPATH="$TASKRUNNER_CLASSPATH:/mnt/taskRunner/emr-hadoop-goodies.jar:/mnt/taskRunner/emr-hive-goodies.jar:/mnt/taskRunner/emr-pig-goodies.jar"

#Load jars based on Hadoop version

if hash hadoop 2>/dev/null; then

   TASKRUNNER_CLASSPATH="$TASKRUNNER_CLASSPATH:`hadoop classpath`"

   # Adding hive jars
   if [ -n "$RELEASELABEL" ]; then
      TASKRUNNER_CLASSPATH="$TASKRUNNER_CLASSPATH:/usr/lib/hive/lib/*"
   else
      TASKRUNNER_CLASSPATH="$TASKRUNNER_CLASSPATH:/home/hadoop/hive/lib/*"
   fi

   #determine hadoop version
   VERSION=`hadoop version`
   MAJOR_VERSION=`[[ $VERSION =~ [0-9]+\. ]] && echo $BASH_REMATCH`

   if [ "${MAJOR_VERSION:0:1}" == "1" ]
   then
     mv pipeline-serde-hadoop1.jar pipeline-serde.jar
   else
     mv staging-hadoop2-lib.jar staging-hadoop-lib.jar
     mv pipeline-serde-hadoop2.jar pipeline-serde.jar
   fi

fi

TASKRUNNER_CLASSPATH="$TASKRUNNER_CLASSPATH:/mnt/taskRunner/pig-0.11.1.1-amzn.jar"
COMMAND_LAUNCH_TASKRUNNER="java -cp "$TASKRUNNER_CLASSPATH" amazonaws.datapipeline.taskrunner.Main \
         --workerGroup "$WORKER_GROUP" --endpoint "$ENDPOINT" --region "$REGION" --logUri "$LOG_URI" --taskrunnerId "$TASKRUNNER_ID""

if [ -n "$PROXY_HOST" ]; then
    COMMAND_LAUNCH_TASKRUNNER="$COMMAND_LAUNCH_TASKRUNNER --proxyHost "$PROXY_HOST" --proxyPort "$PROXY_PORT""
fi

if [ -n "$USERNAME" ]; then
    COMMAND_LAUNCH_TASKRUNNER="$COMMAND_LAUNCH_TASKRUNNER --proxyUsername "$USERNAME" --proxyPassword "$PASSWORD""
fi

if [ -n "$WINDOWS_DOMAIN" ]; then
    COMMAND_LAUNCH_TASKRUNNER="$COMMAND_LAUNCH_TASKRUNNER --proxyDomain "$WINDOWS_DOMAIN""
fi

if [ -n "$WINDOWS_WORKGROUP" ]; then
    COMMAND_LAUNCH_TASKRUNNER="$COMMAND_LAUNCH_TASKRUNNER --proxyWorkstation "$WINDOWS_WORKGROUP""
fi

if [ -n "$S3NOPROXY" ]; then
    COMMAND_LAUNCH_TASKRUNNER="$COMMAND_LAUNCH_TASKRUNNER --s3NoProxy "$S3NOPROXY""
fi

while true ; do
    date
    $COMMAND_LAUNCH_TASKRUNNER
    echo "Process exited with status $?"
    sleep 3
done
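In aws-datapipeline-taskrunner-v2.sh, the Hadoop-specific jars are chosen by matching `hadoop version` output against a regex and reading BASH_REMATCH. The following sketch isolates that step so it can be tried without Hadoop installed. hadoop_major is a hypothetical helper that takes the version text as an argument, and the sample strings are illustrative, not captured output.

```shell
#!/bin/bash
# Sketch of the Hadoop-version check in aws-datapipeline-taskrunner-v2.sh.
# hadoop_major is a hypothetical helper; it reads `hadoop version`-style text
# from its argument instead of running hadoop, so it can be tried anywhere.
hadoop_major() {
  # Keeping the regex in a variable avoids quoting problems with =~
  local re='([0-9]+)\.'
  if [[ $1 =~ $re ]]; then
    # BASH_REMATCH[1] holds the digits captured before the first dot
    echo "${BASH_REMATCH[1]}"
  fi
}

for sample in "Hadoop 1.0.3" "Hadoop 2.7.3-amzn-1"; do
  major=$(hadoop_major "$sample")
  if [ "$major" = "1" ]; then
    echo "$sample -> use the hadoop1 jars"
  else
    echo "$sample -> use the hadoop2 jars"
  fi
done
```

Capturing the whole number, rather than testing only the first character as ${MAJOR_VERSION:0:1} does, means a two-digit major version would not be misread as 1.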

As the aws-datapipeline-taskrunner-v2.sh script shows, AWS Data Pipeline starts Task Runner the same way you would when installing it on existing resources [1]. It runs

java -cp "$TASKRUNNER_CLASSPATH" amazonaws.datapipeline.taskrunner.Main --workerGroup "$WORKER_GROUP" --endpoint "$ENDPOINT" --region "$REGION" --logUri "$LOG_URI" --taskrunnerId "$TASKRUNNER_ID"

and passes all required arguments to Task Runner. The surrounding while loop restarts the process three seconds after it exits.
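The launch command in aws-datapipeline-taskrunner-v2.sh is built as one string (COMMAND_LAUNCH_TASKRUNNER) and run unquoted. The shell then word-splits it, and the inner double quotes around each value only end and reopen the outer string, so they provide no quoting at run time. As a result, a value containing spaces would become several arguments. If you write a similar launcher, a bash array keeps each value as a single argument. The following is a minimal sketch: the variable values are placeholders, not real pipeline settings, and the script prints the argument list instead of running java.

```shell
#!/bin/bash
# Sketch: assemble the Task Runner launch command as a bash array so that
# each value stays one argument even if it contains spaces or glob characters.
# Variable values below are placeholders, not real pipeline settings.
TASKRUNNER_CLASSPATH="/mnt/taskRunner/TaskRunner-1.0.jar:/mnt/taskRunner/common/*"
WORKER_GROUP="my worker group"
ENDPOINT="https://datapipeline.us-west-2.amazonaws.com"
REGION="us-west-2"
LOG_URI="none"
TASKRUNNER_ID="example-id"
PROXY_HOST=""
PROXY_PORT="-1"

cmd=(java -cp "$TASKRUNNER_CLASSPATH" amazonaws.datapipeline.taskrunner.Main
     --workerGroup "$WORKER_GROUP" --endpoint "$ENDPOINT" --region "$REGION"
     --logUri "$LOG_URI" --taskrunnerId "$TASKRUNNER_ID")

# Optional flags are appended only when set, as in the original script.
if [ -n "$PROXY_HOST" ]; then
  cmd+=(--proxyHost "$PROXY_HOST" --proxyPort "$PROXY_PORT")
fi

# Print one argument per line instead of running java, to show the result.
printf '<%s>\n' "${cmd[@]}"
# To actually launch it, run: "${cmd[@]}"
```

"my worker group" is printed as a single argument, and the * in the classpath reaches java literally. This matters because java expands classpath wildcards itself.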

The Task Runner process polls the AWS Data Pipeline service for tasks and then performs them.
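Task Runner itself is a Java program that talks to the service API. In that API, a task runner calls the PollForTask action to receive work and SetTaskStatus to report the outcome. The following bash sketch only illustrates that poll, run, and report cycle. poll_for_task, run_task and report_status are hypothetical stubs working on an in-memory queue, not AWS commands or Task Runner internals. The FINISHED and FAILED strings mirror SetTaskStatus status values.

```shell
#!/bin/bash
# Illustrative poll -> run -> report loop. All three functions are
# hypothetical stubs; a real task runner calls the service API instead.
QUEUE=(task-1 task-2)   # stand-in for work the service would hand out
REPORTED=()

poll_for_task() {
  # Sets TASK to the next task id, or to "" when no work is available.
  # Runs in the current shell (not a subshell) so the dequeue persists.
  TASK=""
  if [ "${#QUEUE[@]}" -gt 0 ]; then
    TASK=${QUEUE[0]}
    QUEUE=("${QUEUE[@]:1}")
  fi
}

run_task() { echo "running $1"; }

report_status() { echo "$1 -> $2"; REPORTED+=("$1:$2"); }

for _ in 1 2 3; do  # a real runner keeps polling indefinitely
  poll_for_task
  if [ -z "$TASK" ]; then
    sleep 1   # nothing to do; wait before polling again
    continue
  fi
  if run_task "$TASK"; then
    report_status "$TASK" FINISHED
  else
    report_status "$TASK" FAILED
  fi
done
```

The first two iterations each receive and finish a task. The third finds the queue empty and waits before polling again.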