Getting Started with an HStreaming Pig Script
HStreaming provides high-level language support for the rapid development of programs that analyze streaming data, based on an enhanced version of Apache Pig. At the heart of the HStreaming-enhanced version of Apache Pig is a set of built-in load and store functions that allow Pig to load, process, and store streaming data on top of the underlying HStreaming technology. Pig abstracts away the specifics of stream management, execution ordering, and execution optimization behind a simple-to-use and well-understood data processing language.
Akin to the default PigStorage function in standard Apache Pig, HStreaming provides a new load/store function called HStream to load data from a TCP, UDP, or HTTP stream, and to store data as HTTP streams or, experimentally, as Amazon S3 objects. Suppose we have a newline-delimited data stream that can be retrieved from the URL “http://myurl.com:1234/index.html”. We can continuously load this stream into a Pig relation with the following command:
A = LOAD 'http://myurl.com:1234/index.html' USING HStream('\n') AS (f1, f2);
In this example, HStream has one parameter, the field delimiter. With only a single parameter given, HStream will continuously load the data into the relation and partition it into batches of a fixed time interval (default: 1000 milliseconds). HStream also accepts an optional second parameter, streamParams, which allows custom byte sizes and time intervals for batches to be specified:
A = LOAD 'http://myurl.com:1234/index.html' USING HStream('\n', '0:10000') AS (f1, f2);
In this example, HStream will use batches of arbitrary size (the “0” before the colon) and a time interval of 10000 milliseconds (the value after the colon).
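Conversely, batches can also be bounded by size rather than time. The following is only a sketch, assuming that the value before the colon is a byte count and that a “0” after the colon disables the time bound:

```pig
-- Sketch: close each batch after 65536 bytes of input;
-- the "0" after the colon is assumed to disable the time bound.
A = LOAD 'http://myurl.com:1234/index.html' USING HStream('\n', '65536:0') AS (f1, f2);
```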
HStream also allows multiple input streams to be defined:
A = LOAD 'http://myurl.com:1234/index.html,http://myurl2.com:1234/index.html' USING HStream('\n', '0:10000,0:100') AS (f1, f2);
In this example, HStream will connect to two different URLs, using batches of 10000 milliseconds for the first stream and batches of 100 milliseconds for the second stream; if only one set of batch configuration parameters had been specified as streamParams, HStream would have applied it to both streams. To set the size of the sliding window (counted in batches), set a default parameter:
set default_windowsize 2;
The default_windowsize parameter instructs each reduce task of the final job to accumulate this number of map output batches into its own reduce batch; in effect, this results in a history window of the desired size.
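For illustration, a window of two batches combined with a 10000-millisecond batch interval would make each reduce batch cover roughly the last 20 seconds of input. This is only a sketch; the exact window semantics are those of the HStreaming runtime:

```pig
-- Sliding window of 2 batches; with 10000 ms batches this spans roughly 20 seconds.
set default_windowsize 2;
A = LOAD 'http://myurl.com:1234/index.html' USING HStream('\n', '0:10000') AS (f1, f2);
```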
In future commands, we can refer to the alias A to further process the input:
B = GROUP A BY f1;
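As in standard Pig, the grouped relation can be processed further, for instance to count the occurrences of each f1 value within the current window. COUNT is the standard Pig aggregate; the relation and field names follow the example above:

```pig
-- Count occurrences of each f1 value in the current window (standard Pig aggregation).
B = GROUP A BY f1;
C = FOREACH B GENERATE group AS f1, COUNT(A) AS cnt;
```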
Finally, to store processed data into an output stream, we can again use HStream, this time as a store function:
STORE B INTO 'tcp://localhost//' USING HStream();
In this example, the output stream will be made available as a TCP stream. Alternatively, output can also be stored as an HTTP stream:
STORE B INTO 'http://localhost//' USING HStream();
If the parallelism is greater than one, multiple stream endpoints will be made available. Note that, due to an inconsistency in Pig, the trailing slash needs to be doubled, as the first trailing slash is swallowed by the Grunt parser; this will be fixed in the final release of HStreaming’s distribution of Pig. Also note that the localhost part of the output URL only serves as a placeholder to make the URL valid; the actual host/port combinations of the endpoints are determined online, at job run-time, and then reported back to the Pig client on the console:
Input(s):
Successfully read 0 records from: "http://myurl.com:1234/index.html"
Output(s):
Successfully stored 0 records in: "tcp://hadoopnode:59457"
To announce streams not only on the Pig console but also at a custom location, we can use the keyword announce_streams:
set announce_streams 's3n://mys3bucket/outstreams';
The TCP output stream from the example above can then be accessed, for example, using the Linux console tool netcat:
echo "0" | netcat hadoopnode 59457
The output connector waits for a “start byte” before sending any output, hence the piping of “0” into netcat.
If HTTP is selected as the output stream, the output can be accessed using a browser or, again, using popular console tools such as wget and curl.
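For instance, assuming the job reported an HTTP endpoint on host hadoopnode and port 59457 (both are only known at run-time, as described above), the stream could be read with either tool:

```shell
# Hypothetical endpoint; substitute the host/port reported on the Pig console.
curl http://hadoopnode:59457/
wget -qO- http://hadoopnode:59457/
```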
Finally, output can also be stored into S3:
STORE B INTO 's3://mybucket/output//' USING HStream();
Doing so will let Pig create one S3 object per batch and output stream. If the parallelism is greater than one, each stream will have a different name. If you specify a directory (that is, a URL with a trailing slash), Pig will create, in that directory, a new S3 object per stream output batch. If you specify a file (that is, a URL without a trailing slash), Pig will create a single file per output stream, prefixed by the specified path, and overwrite the object each time a new batch arrives. Again note that, due to an inconsistency in Pig, the trailing slash needs to be doubled, as the first trailing slash is swallowed by the Grunt parser; therefore, to specify an output directory, add two trailing slashes.
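The two addressing modes can be sketched side by side; the bucket and path names are placeholders:

```pig
-- Directory mode (note the doubled trailing slash): one S3 object per output batch.
STORE B INTO 's3://mybucket/outdir//' USING HStream();
-- File mode (no trailing slash): one object per stream, overwritten on each new batch.
STORE B INTO 's3://mybucket/outfile' USING HStream();
```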
HStreaming’s Pig distribution retains support for file-based data input via the original PigStorage function. Where applicable, streaming relations can also be correlated with file-based data:
A = LOAD 'tcp://myurl.com:5678' USING HStream('\n') AS (u_id);
B = LOAD 'file' USING PigStorage('\t') AS (u_id);
C = JOIN A BY u_id, B BY u_id;
STORE C INTO 'tcp://localhost//' USING HStream();
In this example, an input stream from a TCP source is joined against a file on HDFS. In effect, HStreaming’s distribution of Hadoop will join the whole file against every batch of the stream. Note that any relation containing streaming data must also be stored as streaming data, using the HStream function; storing via any other function will result in undefined behavior.