Realtime MapReduce
HStreaming’s MapReduce API is compatible with Apache Hadoop’s new MapReduce API. Existing applications require few or no changes to run with the HStreaming realtime API. This section shows how to build a simple MapReduce program with the native MapReduce API (as opposed to a high-level language such as Apache Pig).
Note
HStreaming only supports Hadoop’s new MapReduce API; the old deprecated API is not supported for streaming.
For a detailed explanation of Hadoop and the native MapReduce API, we recommend the book Hadoop: The Definitive Guide authored by Tom White and published by O’Reilly.
Streaming Jobs
The main user-observable difference between a streaming and a batch job on Hadoop is that a streaming job runs continuously. All map and reduce tasks of a job are instantiated at the same time so that they can establish a communication pipeline. Once all tasks are running and connected, the data flows between the tasks and computation takes place in the mappers and reducers.
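To make the pipeline idea concrete, the following Java sketch models a streaming job inside a single process: two threads stand in for one mapper and one reducer, and a BlockingQueue stands in for the connection between them. Both threads are started before any data flows, and each record is handed to the reducer as soon as it is mapped. The class PipelineSketch and its end-of-stream marker are hypothetical and exist only for this illustration; a real streaming job keeps running instead of stopping at an end marker, and HStreaming’s actual transport is not shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Conceptual sketch: threads stand in for a map task and a reduce task, and a
// queue stands in for the network connection between them. This is not
// HStreaming's implementation.
final class PipelineSketch {
    // Demo-only end-of-stream marker, assumed not to occur in the input.
    private static final String END = "__END__";

    static List<String> run(List<String> input) {
        BlockingQueue<String> mapToReduce = new LinkedBlockingQueue<>();
        List<String> output = new ArrayList<>();

        Thread reducer = new Thread(() -> {
            try {
                // Consume records as they arrive (identity reduce).
                for (String rec = mapToReduce.take(); !rec.equals(END); rec = mapToReduce.take()) {
                    output.add(rec);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread mapper = new Thread(() -> {
            for (String rec : input) {
                // Each record is forwarded as soon as it has been mapped.
                mapToReduce.add(rec.toUpperCase(Locale.ROOT));
            }
            mapToReduce.add(END);
        });

        // Both tasks are started before any data flows, mirroring how all
        // tasks of a streaming job are instantiated at the same time.
        reducer.start();
        mapper.start();
        try {
            mapper.join();
            reducer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        }
        return output;
    }
}
```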
HStreaming provides an API to configure a MapReduce job for streaming. The following is a minimal example that reads data from a TCP socket. For readability, import statements are omitted. The examples listed here are available as source code in the documentation directory.
```java
public class BasicMR extends Configured implements Tool {

    public static class BasicMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Pass each input record through unchanged.
            context.write(key, value);
        }
    }

    public static class BasicReducer extends Reducer<LongWritable, Text, LongWritable, Text> {
        @Override
        protected void reduce(LongWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Emit every value received for this key.
            for (Text v : values) {
                context.write(key, v);
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf());
        job.setMapperClass(BasicMapper.class);
        job.setReducerClass(BasicReducer.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(1);

        // streaming-specific configuration: job type, input/output formats, endpoints
        HStreamingJobConf.setIsStreamingJob(job, true);
        job.setInputFormatClass(TextHStreamingInputFormat.class);
        job.setOutputFormatClass(TextHStreamingOutputFormat.class);
        TextHStreamingInputFormat.addInputStream(job, 1, 1000, "", "tcp://localhost:10000");
        TextHStreamingOutputFormat.setOutputPath(job, "http://localhost:50000/");

        job.setJarByClass(this.getClass());
        job.submit();
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // Propagate the job's exit status to the shell.
        System.exit(ToolRunner.run(new BasicMR(), args));
    }
}
```

For users who write native Hadoop jobs, this code should look familiar. The job defines a BasicMapper and a BasicReducer class, which in this example simply pass the input data through to the output. Both classes are derived from Hadoop’s Mapper and Reducer base classes and are specialized for their key and value types.
The run method sets up the job and submits it. Again, the code is identical to a batch MapReduce job except for the five streaming-specific statements. First, HStreamingJobConf.setIsStreamingJob(job, true) configures the job as a streaming job. Next, setInputFormatClass and setOutputFormatClass select the text-based streaming formats TextHStreamingInputFormat and TextHStreamingOutputFormat. Finally, TextHStreamingInputFormat.addInputStream and TextHStreamingOutputFormat.setOutputPath configure the endpoints: the input endpoint is TCP port 10000 on localhost, and the output is an HTTP server on localhost port 50000.
This example launches one mapper, which connects to the input via TCP, and one reducer, which provides the HTTP server. Data flows from the mapper to the reducer.
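To try the BasicMR example locally, something has to listen on TCP port 10000 for the mapper to connect to. The following is a minimal sketch of such a test data source. It assumes that the mapper connects as a TCP client and that the text streaming input format reads newline-delimited UTF-8 records; neither detail is specified above. LineSource is a hypothetical helper that writes a fixed list of lines to the first client that connects and then closes the connection.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical test data source for the BasicMR example. Assumes the mapper
// connects as a TCP client and reads newline-delimited UTF-8 text.
final class LineSource {
    // Accepts one connection, writes each line followed by '\n', then closes it.
    static void serveOnce(ServerSocket server, List<String> lines) throws IOException {
        try (Socket client = server.accept();
             Writer out = new BufferedWriter(new OutputStreamWriter(
                     client.getOutputStream(), StandardCharsets.UTF_8))) {
            for (String line : lines) {
                out.write(line);
                out.write('\n');
            }
        } // closing the writer flushes the buffered lines
    }

    public static void main(String[] args) throws IOException {
        // Port 10000 matches tcp://localhost:10000 in BasicMR.
        try (ServerSocket server = new ServerSocket(10000)) {
            serveOnce(server, Arrays.asList("hello", "realtime", "mapreduce"));
        }
    }
}
```

Starting LineSource before submitting the job ensures that the mapper finds a listening socket on its first connection attempt.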
Job Configuration Options
A variety of configuration options control the behavior of streaming jobs. The following table lists the most commonly used settings; for their exact definitions, refer to the Javadoc.
| Configuration Function | Description |
|---|---|
| HStreamingJobConf.setStreamingWindowSize | Configures the window size of the job |
| HStreamingJobConf.setStreamingFastStart | Determines if the job starts processing immediately or delays processing until a full data window has been accumulated |
| HStreamingJobConf.setStreamingMaxRetry | Maximum number of connection attempts before failing the task |
| HStreamingJobConf.setStreamingRetryTimeout | Timeout for connection attempts |
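The two retry settings bound how long a task keeps trying to connect before it fails. The following Java sketch shows the usual meaning of such settings with plain java.net sockets: each attempt is limited by a connection timeout, and the caller gives up after a maximum number of attempts. RetryingConnect is a hypothetical helper; whether HStreaming waits between attempts, and exactly how it counts retries, is not specified here.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical sketch of "max retry" plus "retry timeout" semantics;
// HStreaming's actual retry logic may differ (e.g., back-off between attempts).
final class RetryingConnect {
    static Socket connect(String host, int port, int maxAttempts, int timeoutMillis)
            throws IOException {
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Socket socket = new Socket();
            try {
                // Each attempt is bounded by the connection timeout.
                socket.connect(new InetSocketAddress(host, port), timeoutMillis);
                return socket;
            } catch (IOException e) {
                socket.close();
                last = e;
            }
        }
        // All attempts failed; at this point a streaming task would fail.
        throw new IOException("giving up after " + maxAttempts + " attempts", last);
    }
}
```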
Stream Endpoints
HStreaming uses streaming endpoints to interconnect the map and reduce tasks that participate in a streaming process. Streaming endpoints between jobs (i.e., inbound endpoints for mappers and outbound endpoints for reducers) are defined and named by the user. Streaming endpoints between mappers and reducers are managed transparently by the system.
Endpoints are defined through a URI that gives the endpoint a tag (or name) and defines the communication protocol, the port range, etc. When a streaming job is instantiated, a single endpoint definition may lead to multiple endpoint instances. This happens, for example, when a job creates multiple parallel mappers or reducers, or when a machine becomes unavailable and the endpoint is re-instantiated on another machine.
Endpoints use the URI as a tag to identify the stream. It is the user’s responsibility to ensure that a tag is not used more than once, or to disambiguate endpoints by other means. Because the host part of the URI is ignored and each streaming task binds to all interfaces, the user can use the host name as the distinguishing part of the tag. For example, the URI http://end-point-tag-id:50000/ defines a streaming endpoint that uses the HTTP protocol with base port 50000.
HStreaming provides an API to resolve endpoint names to the actual endpoints in the cluster. For example, in a system with two mappers running on node1 and node2, the endpoint may resolve to:
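The parts of an endpoint URI can be picked apart with java.net.URI. The following sketch, using a hypothetical EndpointUri class, extracts the protocol, the tag (the host part, which is not used for binding), and the base port. It only illustrates the URI layout described above; it is not HStreaming’s parser.

```java
import java.net.URI;

// Hypothetical illustration: splits an endpoint URI such as
// http://end-point-tag-id:50000/ into protocol, tag, and base port.
final class EndpointUri {
    final String protocol; // e.g. "http" or "tcp"
    final String tag;      // host part: identifies the stream, ignored for binding
    final int basePort;    // base port hint for the connector

    EndpointUri(String uri) {
        URI parsed = URI.create(uri);
        if (parsed.getScheme() == null || parsed.getHost() == null || parsed.getPort() < 0) {
            throw new IllegalArgumentException("expected protocol://tag:port/, got " + uri);
        }
        this.protocol = parsed.getScheme();
        this.tag = parsed.getHost();
        this.basePort = parsed.getPort();
    }
}
```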
http://end-point-tag-id:50000/ { http://node1:50000/, http://node2:50001/ }
HStreaming’s connectors that expose a server port (i.e., accept inbound connections) treat the given port as a base port hint: each connector tries to allocate the port number obtained by adding the mapper or reducer ID to the base port. In this example, mapper 0 uses port 50000 and mapper 1 uses port 50001. If a port is unavailable (e.g., because another application has already bound it), the system tries alternative port numbers until the connector finds a free one.
For inter-job cascading, HStreaming provides special API functions that allow a job to read the streaming output of previous streaming tasks without having to know the actual streaming endpoint addresses. If a job sets its streaming output tag to streamTag via HStreamingOutputformat.SetOutputStreamTag(streamTag), a subsequent job can read from that output stream via HStreamingInputformat.SetInputStreamTag(streamTag, i), where i is the number of the reducer partition to read from.
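The port selection rule can be sketched in a few lines of Java. The following hypothetical PortAllocator first tries the base port plus the task ID and, if that port is taken, probes upward one port at a time. Probing upward is an assumption; the text only says that alternative port numbers are tried. new ServerSocket(port) listens on all interfaces, which matches the binding behavior described for streaming tasks.

```java
import java.io.IOException;
import java.net.ServerSocket;

// Hypothetical sketch: bind basePort + taskId, falling back to the next ports
// if it is in use. The upward probing order is an assumption.
final class PortAllocator {
    static ServerSocket bind(int basePort, int taskId, int maxProbes) throws IOException {
        int first = basePort + taskId; // e.g. 50000 + 1 = 50001 for mapper 1
        for (int i = 0; i < maxProbes; i++) {
            int candidate = first + i;
            if (candidate > 65535) {
                break; // ran out of valid TCP port numbers
            }
            try {
                // Binds to all interfaces (wildcard address).
                return new ServerSocket(candidate);
            } catch (IOException portUnavailable) {
                // Port already bound by another application: try the next one.
            }
        }
        throw new IOException("no free port in " + first + ".." + (first + maxProbes - 1));
    }
}
```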
Hint
In order to guarantee unique tag names, a user may generate UUIDs for the tag.
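A minimal sketch of this hint, using java.util.UUID to build an endpoint URI whose host part, and therefore whose tag, is unique. The helper UniqueTags and the "tag-" prefix are hypothetical choices for illustration; the prefix simply keeps the host part starting with a letter.

```java
import java.util.UUID;

// Hypothetical helper: builds an endpoint URI with a random, unique tag.
final class UniqueTags {
    static String newEndpointUri(String protocol, int basePort) {
        // UUID.toString() yields lowercase hex digits and hyphens,
        // which are valid characters in a host name.
        String tag = "tag-" + UUID.randomUUID();
        return protocol + "://" + tag + ":" + basePort + "/";
    }
}
```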
Stream Identifiers API
Streams are identified by their stream IDs. A stream ID consists of a type (INBOUND, OUTBOUND, or INTERNAL) and a stream tag. The stream IDs are part of the job configuration. They can be resolved with the function StreamInfo.get, which returns a list of StreamInfo objects that can then be used to find the current endpoint for a given stream.
The following example resolves the inbound endpoint instances of the stream tag streamtag for a given job and prints each task ID and the associated URI.
```java
List<StreamInfo> streamInfos = StreamInfo.get(job.getConfiguration(),
        new StreamID(StreamID.Type.INBOUND, "streamtag"));
for (StreamInfo info : streamInfos) {
    // Print the task that owns this endpoint instance and its URI.
    System.out.println("Stream endpoint: " + info.getTaskID() + ", " + info.getStreamURI());
}
```
Note
HStreaming supports multiple alternative stream resolution mechanisms depending on the cluster configuration. Portable applications must use the provided API to resolve stream identifiers. The underlying implementation and stream representation may change between different versions.
Last update: 2011-12-20 08:56