HStreaming Developer Guide

Realtime Processing with Pig

Pig in HStreaming is a stream-processing-enhanced version of Apache Pig. Apache Pig is a high-level data-flow language and execution framework for parallel computation, developed by Yahoo!

Pig uses the language Pig Latin to express computations at a higher level than Hadoop’s native MapReduce Java API. A Pig program is translated into a directed acyclic graph of data-processing steps, which then execute as MapReduce jobs on a Hadoop cluster. Pig is designed to be highly extensible, and user-defined functions (UDFs) are easy to write in Java. Pig also allows UDFs to be written in scripting languages such as Python and Ruby, which makes it possible to develop powerful data analytics programs in a very short time.
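As an illustration of a scripting-language UDF, here is a minimal sketch of a Python UDF that lower-cases a word and strips surrounding punctuation. The module name wordudfs.py and the function name normalize are hypothetical. When a file is registered with Pig’s Jython engine, Pig supplies the outputSchema decorator; the fallback definition below exists only so that the module can also be run and tested outside Pig. Whether HStreaming’s Pig build handles Jython UDFs exactly like stock Apache Pig is an assumption.

```python
# wordudfs.py: hypothetical example UDF module for Pig's Jython engine.
import string

try:
    outputSchema  # injected by Pig when the file is registered USING jython
except NameError:
    def outputSchema(schema):
        """Stand-in decorator so the module also runs outside Pig (e.g. in tests)."""
        def decorator(func):
            func.outputSchema = schema
            return func
        return decorator


@outputSchema("word:chararray")
def normalize(word):
    """Lower-case a word and strip leading and trailing punctuation."""
    if word is None:
        return None  # propagate Pig nulls unchanged
    return word.strip(string.punctuation).lower()
```

In stock Apache Pig such a file would be registered with REGISTER 'wordudfs.py' USING jython AS wordudfs; and the function called as wordudfs.normalize(word) inside a FOREACH ... GENERATE.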

The streaming-enhanced version of Apache Pig distributed with HStreaming supports two data processing modes: conventional file-based batch processing and, additionally, stream-based processing. The two modes can be intermixed, so that static file-based data can be loaded, pre-processed, and used in operations on streaming data.

Pig automatically infers the processing mode (stream vs. batch) from the data input source and schedules sub-tasks accordingly. The programmer does not need to specify further information.

For a detailed description of Apache Pig, we highly recommend the book Programming Pig by Alan Gates (O’Reilly Media) and the Pig Online Documentation.

Streaming “Hello World”

Hadoop examples traditionally start with WordCount, a program that counts the number of occurrences of each word in text files. The following example shows WordCount as a Pig Latin program that reads its input files from the directory textfiledir and stores the results in the directory resultdir.

A = load 'textfiledir' using PigStorage();
B = FOREACH A GENERATE FLATTEN(TOKENIZE($0)) as word;
C = GROUP B BY word;
D = FOREACH C generate COUNT(B) as count, group as word;
store D into 'resultdir' using PigStorage();
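To make the content of each relation concrete, the following is a rough in-memory Python equivalent of the script above, with a list of strings standing in for the files in textfiledir. It only illustrates what relations A to D contain; word splitting is simplified to white space (Pig’s TOKENIZE also splits on a few punctuation characters).

```python
from collections import defaultdict


def word_count(lines):
    """Mimic the WordCount Pig script on an in-memory list of lines."""
    # A: one record per input line.
    # B: FLATTEN(TOKENIZE($0)) yields one record per word.
    words = [word for line in lines for word in line.split()]

    # C: GROUP B BY word collects identical words into one bag per key.
    groups = defaultdict(list)
    for word in words:
        groups[word].append(word)

    # D: FOREACH C GENERATE COUNT(B) AS count, group AS word.
    return [(len(bag), key) for key, bag in sorted(groups.items())]


if __name__ == "__main__":
    for count, word in word_count(["to be or not to be"]):
        print(f"{count}\t{word}")  # PigStorage() writes tab-separated fields by default
```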

The same query can be rewritten as a real-time query that loads live data from Twitter and provides the results as an HTTP stream:

A = load 'https://$user:$pass@stream.twitter.com/1/statuses/sample.json'
    using HStreamJson('{"batchMsec":"1000"}');
B = FOREACH A GENERATE FLATTEN(TOKENIZE(REPLACE((chararray)($0#'text'), '\n', ' ')))
    as word;
C = GROUP B BY word;
D = FOREACH C GENERATE COUNT(B) as count, group as word;
store D into 'http://localhost:50000/' using HStream();

The key differences between the two queries are the input and output specifications and the tokenization of the data. Let’s analyze the code line by line:

A = load 'https://$user:$pass@stream.twitter.com/1/statuses/sample.json'
    using HStreamJson('{"batchMsec":"1000"}');

This statement assigns to relation A the input from an HTTPS stream from Twitter. The input stream is processed by the Pig loader HStreamJson, HStreaming’s built-in JSON parser. HStreamJson takes a JSON-formatted configuration parameter. In this example, we use a batch size of 1000 milliseconds; that is, a new data set is computed once per second.
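Conceptually, a batchMsec value of 1000 slices the stream into consecutive one-second buckets. The Python sketch below illustrates that idea for records carrying a millisecond arrival timestamp. It is an assumption about the semantics for illustration only, not HStreamJson’s implementation, which may, for example, use the wall-clock time of arrival rather than a timestamp stored in the record.

```python
from itertools import groupby


def time_batches(records, batch_msec=1000):
    """Group (timestamp_ms, payload) pairs into consecutive time-based batches.

    Records are assumed to arrive in timestamp order. Batch k covers the
    half-open interval [k * batch_msec, (k + 1) * batch_msec).
    """
    for batch_id, items in groupby(records, key=lambda r: r[0] // batch_msec):
        yield batch_id, [payload for _, payload in items]
```

Note that this sketch simply skips intervals in which no record arrives, whereas a real stream processor may emit an empty batch for them.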

Each text line is interpreted as one JSON document and transformed into the Pig data type map. A Twitter post, or tweet, has the following (simplified) JSON format:

{ "in_reply_to_user_id_str":"414100409",
  "text":"Lorem ipsum dolor sit amet, consectetur adipiscing elit.\nPhasellus vulputate lobortis lacus nec tempor.",
  "hashtags":[],
  "source":"web",
  "lang":"en",
  "created_at":"Sat Dec 10 20:25:12 +0000 2011",
  "id":145599970177855488}
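The map built from each line can be pictured as the dictionary that a JSON parser returns. The following Python sketch uses the standard json module to show how a key lookup such as $0#'text' corresponds to a dictionary lookup; the sample is the abridged tweet above, written on a single line with the line feed in the text encoded as the JSON escape \n.

```python
import json

tweet_line = (
    '{"in_reply_to_user_id_str":"414100409",'
    '"text":"Lorem ipsum dolor sit amet, consectetur adipiscing elit.\\n'
    'Phasellus vulputate lobortis lacus nec tempor.",'
    '"hashtags":[],"source":"web","lang":"en",'
    '"created_at":"Sat Dec 10 20:25:12 +0000 2011",'
    '"id":145599970177855488}'
)

record = json.loads(tweet_line)  # one text line -> one map (a dict here)
text = record["text"]            # corresponds to $0#'text' in Pig Latin
lang = record.get("lang")        # a missing key would yield None, as a map lookup yields null in Pig
```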

Each top-level element becomes an entry of the map and can be referenced by its name, as in $0#'text'. The following line performs four transformation steps in a single statement:

  1. extract the text field from the map,
  2. replace line feeds in the text with spaces using REPLACE,
  3. split the string into words using TOKENIZE (which splits on spaces and a few punctuation characters) and returns a bag of one-word tuples, and
  4. unbox (or flatten) that bag so that each word becomes its own record in the field word:
B = FOREACH A GENERATE FLATTEN(TOKENIZE(REPLACE((chararray)($0#'text'), '\n', ' ')))
    as word;
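The four steps above can be mirrored in Python as follows. The separator set reproduces the default word separators documented for Pig’s built-in TOKENIZE (space, double quote, comma, parentheses, and asterisk). Line feeds are not among them, which explains the REPLACE step. The function name tokenize_tweet is hypothetical, and treating a missing text as "no words" is a simplification.

```python
import re

# Default word separators of Pig's TOKENIZE: space, double quote, comma,
# parentheses and asterisk. Line feeds are NOT separators.
_TOKENIZE_SEPARATORS = re.compile(r'[ ",()*]+')


def tokenize_tweet(record):
    """Mirror FLATTEN(TOKENIZE(REPLACE((chararray)($0#'text'), '\\n', ' ')))."""
    text = record.get("text")                  # 1. extract the text field
    if text is None:
        return []                              # simplification: no text, no words
    text = text.replace("\n", " ")             # 2. replace line feeds (REPLACE)
    tokens = _TOKENIZE_SEPARATORS.split(text)  # 3. split into words (TOKENIZE)
    # 4. FLATTEN: each non-empty token becomes its own one-field record.
    return [(token,) for token in tokens if token]
```

Because the period is not a separator, a word at the end of a sentence keeps it (elit. rather than elit).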

A dump of relation B (dump B;) would result in the following output:

(Lorem)
(ipsum)
(dolor)
(sit)
(amet)
...

In the next line, identical words are grouped together:

C = GROUP B BY word;

A dump of relation C generates the following output:

(Lorem,{(Lorem)})
(ipsum,{(ipsum)})
(dolor,{(dolor)})
...

In the next line, we compute relation D by counting the number of elements in each group (in this example, 1 for each word):

D = FOREACH C generate COUNT(B) as count, group as word;

A dump of relation D generates the following output:

(1,Lorem)
(1,ipsum)
(1,dolor)
...

Finally, we store relation D into the output stream http://localhost:50000. The http scheme specifies that the result is served by an HTTP server on port 50000 on the machine where the task executes. The port is a hint: if it is already in use, Pig tries the next higher available port. Multiple clients can connect to this endpoint, for example with a web browser or curl.

store D into 'http://localhost:50000/' using HStream();
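Besides a browser or curl, any HTTP client can consume the result stream. The following Python sketch reads the response line by line with the standard urllib module. It assumes, hypothetically, that each line carries one tab-separated count and word; the exact output format of HStream is not described here, so the parsing may need adjusting.

```python
from urllib.request import urlopen


def read_counts(url="http://localhost:50000/", max_lines=10, timeout=10):
    """Read up to max_lines (count, word) records from a result stream.

    Assumes (hypothetically) one "count<TAB>word" record per line.
    """
    results = []
    with urlopen(url, timeout=timeout) as response:
        for raw in response:  # the response object yields one line at a time
            line = raw.decode("utf-8").rstrip("\r\n")
            if not line:
                continue  # ignore blank lines
            count, word = line.split("\t", 1)
            results.append((int(count), word))
            if len(results) >= max_lines:
                break
    return results
```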

Data Windows

Pig operates on relations of data, much like SQL. Input and output relations are well defined and have a beginning and an end. In stream processing systems, by contrast, data arrives continuously and never ends, so a computation over the entire input would never complete.

Like other complex event processing systems, HStreaming uses data batches and data windows to create well-defined input data sets. A batch may be defined by a time span (e.g., all data that arrives within one second) or by a number of elements (e.g., the next 1000 data elements). The start and end of batches are controlled by the InputFormat, and the connector may use any source of information to determine where a batch begins and ends. The earlier Twitter example uses a batch size of one second.
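A count-based batch ("the next 1000 data elements") can be sketched as simple chunking of a possibly unbounded iterator. This illustrates the concept only: the function name count_batches is hypothetical, and in HStreaming the batch boundaries are determined by the InputFormat.

```python
from itertools import islice


def count_batches(stream, batch_size=1000):
    """Split a (possibly unbounded) iterator into lists of batch_size elements.

    The last batch may be shorter if the stream ends.
    """
    iterator = iter(stream)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch
```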

A query can now span a data window over multiple data batches. Every time a new data batch arrives, the window moves: the oldest batch is dropped and the newest batch is added to the relation. HStreaming’s version of Apache Pig introduces the keyword window, which is used in the same way as the keyword parallel. When window is specified on a grouping operation, the result set spans as many batches as the number given after the keyword.

The following example shows the Twitter word-count example from before, but counts the words of all tweets from the last 60 seconds (60 batches of 1000 milliseconds each):

A = load 'https://$user:$pass@stream.twitter.com/1/statuses/sample.json'
    using HStreamJson('{"batchMsec":"1000"}');
B = FOREACH A GENERATE FLATTEN(TOKENIZE(REPLACE((chararray)($0#'text'), '\n', ' '))) as word;
C = GROUP B BY word window 60;
D = FOREACH C GENERATE COUNT(B) as count, group as word;
store D into 'http://localhost:50000/' using HStream();
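The window semantics described above (drop the oldest batch, add the newest) can be modeled with a bounded deque of per-batch word counts. This is a conceptual sketch that assumes the window result is simply the sum of the counts of the last N batches; it is not HStreaming’s implementation, and the class name WindowedWordCount is hypothetical.

```python
from collections import Counter, deque


class WindowedWordCount:
    """Word counts over the most recent `window` batches (a sliding window)."""

    def __init__(self, window=60):
        if window < 1:
            raise ValueError("window must span at least one batch")
        self.batches = deque(maxlen=window)  # appending evicts the oldest batch
        self.totals = Counter()

    def add_batch(self, words):
        """Add a new batch of words and return the counts for the current window."""
        if len(self.batches) == self.batches.maxlen:
            # Remove the contribution of the batch that is about to be evicted.
            self.totals.subtract(self.batches[0])
        batch_counts = Counter(words)
        self.batches.append(batch_counts)
        self.totals.update(batch_counts)
        self.totals = +self.totals  # unary plus drops words whose count reached zero
        return dict(self.totals)
```

With window=60 and one-second batches, each call returns the counts over roughly the last minute, matching window 60 in the query above.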

Mixing Static and Streaming Data

HStreaming’s streaming-enhanced version of Pig allows static and dynamic data to be mixed in a single query. This is useful when a data stream needs to be correlated with, or enriched by, existing data. Any connector written for stock Apache Pig can be used, and connectors exist for a wide variety of platforms, including HBase, Cassandra, Hive, text files, and SQL databases.

The following example uses an ISO 639 language mapping file to translate language abbreviations into full language names (e.g., en to English):

A = load 'https://$user:$pass@stream.twitter.com/1/statuses/sample.json' using
    HStreamJson();
B = load 'iso639.txt' using PigStorage(' ') as (language, longlanguage);
C = FOREACH A generate $0#'lang' as language, $0#'text' as text;
D = COGROUP B by language, C by language;

Operations such as filtering can also be applied to the static data. Pig automatically determines an efficient execution plan and schedules the individual operators on Hadoop.
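COGROUP produces, for each key, the key followed by one bag per input relation, and keys that occur in only one input still appear with an empty bag for the other. The Python sketch below models this for a small static language mapping and a few (language, text) records. The sample data is made up, and it assumes iso639.txt holds one space-separated code and one-word name per line, as the PigStorage(' ') schema suggests.

```python
from collections import defaultdict


def cogroup(left, right, left_key, right_key):
    """Model Pig's COGROUP: key -> (bag of left tuples, bag of right tuples)."""
    groups = defaultdict(lambda: ([], []))
    for row in left:
        groups[left_key(row)][0].append(row)
    for row in right:
        groups[right_key(row)][1].append(row)
    return dict(groups)


# Relation B: static mapping loaded as (language, longlanguage).
iso639_lines = ["en English", "de German"]
languages = [tuple(line.split(" ", 1)) for line in iso639_lines]

# Relation C: (language, text) pairs projected from the tweet maps.
tweets = [("en", "hello world"), ("en", "good morning"), ("fr", "bonjour")]

# Relation D: COGROUP B BY language, C BY language.
cogrouped = cogroup(languages, tweets, left_key=lambda r: r[0], right_key=lambda r: r[0])
```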

Realtime Visualization

HStreaming ships with a built-in visualization connector that makes it quick to create powerful web-accessible visualizations. The visualization connector uses Google’s Visualization Library and is highly customizable.

The visualization connector VisStream takes a data table as input, which is then rendered and served by a data servlet. The connector has four parameters: the chart type, the width, the height, and the formatting options as a JSON object.

store A into 'http://dummy2:$outportbase/' using VisStream('PieChart', '1000', '600',
        '{"title":"Datasift Feed: Average Twitter Followers for Users by Language \
                  ($windowsize Seconds Interval)",
          "titleTextStyle" : { "fontSize" : 14 },
          "chartArea": { "left": 150, "top" : 50, "width": 650, "height": 500 },
          "legendTextStyle" : { "fontSize" : 12 },
          "pieSliceText" : "label",
          "is3D" : "true"}');

This code block renders the pie chart shown below.

[Figure: pie chart rendered by the VisStream example above (visualization.png)]

All chart types supported by the Google Visualization Library are available, including pie charts, bar charts, line charts, area charts, candlestick charts, gauges, scatter charts, geo charts, tables, and tree maps. For formatting options, please refer to the online documentation of the respective visualization.
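For reference, the Google Visualization Library consumes data as a DataTable, whose JSON literal representation lists column descriptions under cols and cell values under rows. The Python sketch below converts (label, value) pairs, such as the rows of a grouped relation, into that structure. Whether VisStream builds its table exactly this way is an assumption, and the function name and the column ids and labels are hypothetical.

```python
import json


def to_datatable(rows,
                 label_col=("language", "Language"),
                 value_col=("followers", "Average followers")):
    """Convert (label, number) pairs to a Google Visualization DataTable JSON literal."""
    table = {
        "cols": [
            {"id": label_col[0], "label": label_col[1], "type": "string"},
            {"id": value_col[0], "label": value_col[1], "type": "number"},
        ],
        # Each row holds one cell object per column; "v" is the cell value.
        "rows": [{"c": [{"v": label}, {"v": value}]} for label, value in rows],
    }
    return json.dumps(table)
```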

Realtime Dashboards

Real-time dashboards can be built using JavaScript. Each visualization connector exposes its underlying data set via the /data URL. This real-time data can be accessed either through the Google Visualization Library or directly through a Google Visualization query.

Using the Google Visualization Library, it is easy to build rich web-based dashboards, for example using the Google Visualization Dashboard Control.

Pig Examples

HStreaming comes with a variety of Pig examples, which can be used to test the streaming system and to become familiar with programming the streaming-enhanced version of Apache Pig. The source code for all examples is installed in /usr/share/doc/hstreaming/examples/pig.

$ pig-hstreaming -f /usr/share/doc/hstreaming/examples/pig/hstreaming_apache_projcount.pig

will invoke a sample script that analyzes log files in the standard Common Log Format. The example listens on UDP port 10000 for streamed log data. The script parses each incoming log entry and extracts its URI. It then uses the third part of the URI’s path as the project identifier and generates a list of project identifiers together with the number of references to each in the logs. The script publishes the result through an HTTP servlet on port 50000.
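The parsing and counting steps can be sketched in Python as follows. This is an independent illustration, not the code of hstreaming_apache_projcount.pig. How the script interprets "the third part of the URI’s path" is an assumption here: segment_index=2 takes the third element of the path split on '/', which counts the empty string before the leading slash, so /dist/hadoop/... yields hadoop. The function names are hypothetical.

```python
import re
from collections import Counter

# Common Log Format: host ident authuser [date] "request" status bytes
CLF_PATTERN = re.compile(
    r'(?P<host>\S+) (?P<ident>\S+) (?P<user>\S+) \[(?P<time>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<uri>\S+)(?: (?P<protocol>[^"]*))?" '
    r'(?P<status>\d{3}) (?P<size>\S+)'
)


def project_of(line, segment_index=2):
    """Return the project identifier of a CLF line, or None if it does not parse."""
    match = CLF_PATTERN.match(line)
    if match is None:
        return None
    path = match.group("uri").split("?", 1)[0]  # drop any query string
    parts = path.split("/")  # "/dist/hadoop/x" -> ["", "dist", "hadoop", "x"]
    return parts[segment_index] if len(parts) > segment_index else None


def count_projects(lines):
    """Count references per project identifier, skipping unparseable lines."""
    return Counter(p for p in map(project_of, lines) if p)
```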

Once the script is running, the streamgen command of HStreaming’s command shell can be used to generate a sample Apache log file and stream it to the script:

$ hstreaming streamgen apachelog <hostname>

Here, <hostname> is the node that runs the mapper job. On a cluster with more than one machine, use hstreaming list to determine the hostname.
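To feed your own log lines instead of the generated sample, a small UDP sender is enough. The Python sketch below sends each line as one datagram to port 10000. The one-line-per-datagram framing and the trailing newline are assumptions about what the example script expects, and the function name send_lines is hypothetical; check the script source before relying on it.

```python
import socket


def send_lines(lines, host, port=10000):
    """Send each log line as a separate UDP datagram (framing is an assumption)."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        for line in lines:
            payload = (line.rstrip("\n") + "\n").encode("utf-8")
            sock.sendto(payload, (host, port))
```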

The script output can be displayed either using a web browser or curl:

$ curl <hostname>:50000

Similarly,

$ pig /usr/share/doc/hstreaming/examples/pig/hstreaming_netflow_ipcount.pig

will invoke a sample script that receives a stream of NetFlow v5 packets, parses the source IP of each flow record in a packet, and publishes a list of source IPs with their number of occurrences on TCP port 50000.
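For orientation, the following Python sketch shows what extracting source IPs from NetFlow v5 export packets involves: a 24-byte header whose count field gives the number of 48-byte flow records, each of which starts with the 4-byte source address. It is an independent illustration based on the NetFlow v5 packet layout, not the code of hstreaming_netflow_ipcount.pig, and the function names are hypothetical.

```python
import socket
import struct
from collections import Counter

# version, count, sys_uptime, unix_secs, unix_nsecs, flow_sequence,
# engine_type, engine_id, sampling_interval (network byte order)
HEADER_FORMAT = "!HHIIIIBBH"
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)  # 24 bytes
RECORD_SIZE = 48                              # bytes per v5 flow record


def source_ips(packet):
    """Return the source IPv4 addresses of all flow records in a NetFlow v5 packet."""
    if len(packet) < HEADER_SIZE:
        raise ValueError("packet shorter than NetFlow v5 header")
    version, count = struct.unpack_from("!HH", packet, 0)
    if version != 5:
        raise ValueError("not a NetFlow v5 packet (version %d)" % version)
    if len(packet) < HEADER_SIZE + count * RECORD_SIZE:
        raise ValueError("truncated NetFlow v5 packet")
    ips = []
    for i in range(count):
        offset = HEADER_SIZE + i * RECORD_SIZE
        ips.append(socket.inet_ntoa(packet[offset:offset + 4]))  # srcaddr field
    return ips


def count_source_ips(packets):
    """Count how often each source IP occurs across all packets."""
    return Counter(ip for packet in packets for ip in source_ips(packet))
```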

Once the script is running, the streamgen command of HStreaming’s command shell can be used to stream a sample Apache log file to the script:

$ hstreaming streamgen apachelog <hostname>

The script output can be displayed using a web browser or netcat:

$ netcat <hostname> 50000

Last update: 2011-12-20 08:56