## A simple word count example
## Put some XML files on the HDFS:
DFS_put( system.file("defaults/core/", package = "hive"),
"/tmp/input" )
DFS_put( system.file("defaults/hdfs/hdfs-default.xml", package = "hive"),
"/tmp/input" )
DFS_put( system.file("defaults/mapred/mapred-default.xml", package = "hive"),
"/tmp/input" )
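## As a quick sanity check (not part of the original example), the upload can
## be verified by listing the target directory; this assumes a running Hadoop
## cluster that hive is connected to:
DFS_list("/tmp/input")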
## Define the mapper and reducer functions to be applied:
## Note that a Hadoop map or reduce job retrieves its data line by line from stdin.
mapper <- function(x){
    ## Read the input line by line from stdin and emit one "term<TAB>1"
    ## key/value pair per term; single-character terms are skipped.
    con <- file("stdin", open = "r")
    while (length(line <- readLines(con, n = 1L, warn = FALSE)) > 0) {
        terms <- unlist(strsplit(line, " "))
        terms <- terms[nchar(terms) > 1]
        if (length(terms))
            cat(paste(terms, 1, sep = "\t"), sep = "\n")
    }
    close(con)
}
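## A quick local check of the mapper's tokenisation logic (no Hadoop needed);
## the sample line below is made up for illustration:
terms <- unlist(strsplit("hive streams hadoop jobs from R", " "))
terms <- terms[nchar(terms) > 1]
cat(paste(terms, 1, sep = "\t"), sep = "\n")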
reducer <- function(x){
    ## Aggregate the per-term counts emitted by the mappers, using an
    ## environment as a hash table keyed by term.
    env <- new.env(hash = TRUE)
    con <- file("stdin", open = "r")
    while (length(line <- readLines(con, n = 1L, warn = FALSE)) > 0) {
        keyvalue <- unlist(strsplit(line, "\t"))
        if (exists(keyvalue[1], envir = env, inherits = FALSE)) {
            assign(keyvalue[1],
                   get(keyvalue[1], envir = env) + as.integer(keyvalue[2]),
                   envir = env)
        } else {
            assign(keyvalue[1], as.integer(keyvalue[2]), envir = env)
        }
    }
    close(con)
    ## Write the final "term<TAB>count" pairs to stdout.
    env <- as.list(env)
    for (term in names(env))
        writeLines(paste(c(term, env[[term]]), collapse = "\t"))
}
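## The reducer's aggregation step can be tried locally in the same way, on a
## few hypothetical key/value lines a mapper might emit:
tmp <- new.env(hash = TRUE)
for (line in c("hive\t1", "hadoop\t1", "hive\t1")) {
    keyvalue <- unlist(strsplit(line, "\t"))
    if (exists(keyvalue[1], envir = tmp, inherits = FALSE)) {
        assign(keyvalue[1],
               get(keyvalue[1], envir = tmp) + as.integer(keyvalue[2]), envir = tmp)
    } else {
        assign(keyvalue[1], as.integer(keyvalue[2]), envir = tmp)
    }
}
as.list(tmp)  # "hive" counted twice, "hadoop" once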
## Run the streaming job with a single reducer:
hive_set_nreducer(1)
hive_stream( mapper = mapper, reducer = reducer,
             input = "/tmp/input", output = "/tmp/output" )
DFS_list("/tmp/output")
head( DFS_read_lines("/tmp/output/part-00000") )
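## The tab-separated output can also be parsed into a named integer vector;
## the file name part-00000 follows from the single reducer configured above:
pairs <- strsplit(DFS_read_lines("/tmp/output/part-00000"), "\t")
counts <- setNames(as.integer(sapply(pairs, `[`, 2)), sapply(pairs, `[`, 1))
head(sort(counts, decreasing = TRUE))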
## Don't forget to clean up the file system:
DFS_dir_remove("/tmp/input")
DFS_dir_remove("/tmp/output")
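## Both directories should be gone afterwards (assuming the removals succeeded):
DFS_dir_exists("/tmp/input")   # expected: FALSE
DFS_dir_exists("/tmp/output")  # expected: FALSE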