One issue that we see in building log aggregators using hadoop is that we
often want to do several aggregations in a single reduce task.
For instance, we have viewers who view videos and sometimes watch them to
completion and sometimes scrub to different points in the video and
sometimes close a browser without a session completion event. We want to
run map to extract session id as a reduce key and then have a reduce that
summarizes the basics of the the session (user + video + what happened).
The interesting stuff happens in the second map/reduce where we want to have
map emit records for user aggregation, user by day aggregation, video
aggregation, and video by day aggregation (this is somewhat simplified, of
course). In the second reduce, we want to compute various aggregation
functions such as total count, total distinct count and a few distributional
measures such as estimated non-robot distinct views or long-tail
coefficients or day part usage pattern.
Systems like Pig seem to be built with the assumption that there should be a
separate reduce task per aggregation. Since we have about a dozen aggregate
functions that we need to compute per aggregation type, that would entail
about an order of magnitude decrease in performance for us unless the
language is clever enough to put all aggregation functions for the same
aggregation into the same reduce task.
Also, our current ingestion rates are completely dominated by our downstream
processing so log file collection isn't a huge deal. For backfilling old
data, we have some need for higher bandwidth, but even a single NFS server
suffices as a source for heavily compressed and consolidated log files. Our
transaction volumes are pretty modest compared to something like Yahoo, of
course, since we still have less than 20 million monthly uniques, but we
expect this to continue the strong growth that we have been seeing.
On 8/7/07 10:45 AM, "Eric Baldeschwieler" wrote:
I'll have our operations folks comment on our current techniques.
We use map-reduce jobs to copy from all nodes in the cluster from the
source. Generally using either HTTP(S) or HDFS protocol.
We've seen write rates as high as 8.3 GBytes/sec on 900 nodes. This
is network limited. We see roughly 20MBytes/sec/node (double the
other rate) on one rack clusters, with everything connected with
We (the yahoo grid team) are planning to put some more energy into
making the system more useful for real-time log handling in the next
few releases. For example, I would like to be able to tail -f a file
as it is written, I would like to have a generic log aggregation
system and I would like to have the map-reduce framework log directly
into HDFS using that system.
I'd love to hear thoughts on other achievable improvements that would
really help in this area.
On Aug 3, 2007, at 1:42 AM, Jeff Hammerbacher wrote:
We have a service which writes one copy of a logfile directly into
(writes go to namenode). As Dennis mentions, since HDFS does not
atomic appends, if a failure occurs before closing a file, it never
in the file system. Thus we have to rotate logfiles at a greater
that we'd like to "checkpoint" the data into HDFS. The system
isn't perfect but bulk-loading the data into HDFS was proving
I'd be curious to hear actual performance numbers and methodologies
loads. I'll try to dig some up myself on Monday.
On 8/2/07, Dennis Kubes wrote:
You can copy data from any node, so if you can do it from
your performance would be better (although be sure not to overlap
files). The master node is updated once a the block is copied it
replication number of times. So if default replication is 3 then the 3
replicates must be active before the master is updated and the data
"appears" int the dfs.
How long the updates take to happen is a function of your server load
and network speed and file size. Generally it is fast.
So the process is the data is loaded into the dfs, replicates are
created, and the master node is updated. In terms of
the data node crashes before the data is loaded then the data won't
appear in the dfs. If the name node crashes before it is updated but
all replicates are active, the data would appear once the name node has
been fixed and updated through block reports. If a single node crashes
that has a replicate once the namenode has been updated then the data
will be replicated from one of the other 2 replicates to another 3
system if available.
Venkates .P.B. wrote:
Am I missing something very fundamental ? Can someone comment
Venkates P B
On 8/1/07, Venkates .P.B. wrote:
Few queries regarding the way data is loaded into HDFS.
-Is it a common practice to load the data into HDFS only
master node ? We are able to copy only around 35 logs (64K
in a 2 slave configuration.
-We are concerned about time it would take to update filenames
maps in the master node when data is loaded from few/all the
Can anyone let me know how long generally it takes for this
And one more question, what if the node crashes soon after the
copied into one it. How is data consistency maintained here ?
Thanks in advance,
Venkates P B