I am new to the Hadoop framework. I am trying to write a program that reads an XML file from HDFS, parses it with JDOM, and sends it to a database. Below is the Java file:
package JDOMprs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.jdom2.Document;
import org.jdom2.Element;
import org.jdom2.JDOMException;
import org.jdom2.input.SAXBuilder;

import com.vertica.hadoop.VerticaOutputFormat;
import com.vertica.hadoop.VerticaRecord;

public class ExampleParser extends Configured implements Tool {

    public static class Map extends Mapper<LongWritable, Text, Text, DoubleWritable> {
        private final static DoubleWritable one = new DoubleWritable(1);
        private Text word = new Text();
        private List mylist;

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, one);
        }
    }

    public static class Reduce extends Reducer<Text, DoubleWritable, Text, VerticaRecord> {
        VerticaRecord record = null;
        String src_name;
        String comment;
        String rev_by;
        String rev_dt;
        String com_title;

        public void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
            try {
                record = new VerticaRecord(context.getConfiguration());
            } catch (Exception e) {
                throw new IOException(e);
            }
        }

        public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            if (record == null) {
                throw new IOException("No output record found");
            }

            /******************** JDOM PARSER ***************************/
            SAXBuilder builder = new SAXBuilder();
            // File xmlFile = new
            // File("C:/Users/Administrator/workspace/VerticaHadoop/src/JDOMprs/HadoopXML.xml");
            try {
                Document document = (Document) builder.build(key.toString());
                Element rootNode = document.getRootElement();
                List list = rootNode.getChildren("source");
                // List ls = new ArrayList();
                // Jdomparse jp = new Jdomparse();
                // ls = jp.getParse(key);

                for (int i = 0; i < list.size(); i++) {
                    Element node = (Element) list.get(i);
                    // System.out.println("Source Name : " + node.getChildText("source-name"));
                    // System.out.println("comment : " + node.getChildText("comment"));
                    // System.out.println("review by : " + node.getChildText("review-by"));
                    // System.out.println("review date : " + node.getChildText("review-date"));
                    // System.out.println("comment-title : " + node.getChildText("comment-title"));

                    record.set(0, node.getChildText("source-name").toString());
                    record.set(0, node.getChildText("comment").toString());
                    record.set(0, node.getChildText("review-by").toString());
                    record.set(0, node.getChildText("review-date").toString());
                    record.set(0, node.getChildText("comment-title").toString());
                }
            } catch (IOException io) {
                System.out.println(io.getMessage());
            } catch (JDOMException jdomex) {
                System.out.println(jdomex.getMessage());
            }
            /****************** END OF PARSER *****************************/

            context.write(new Text("reviewtbl"), record);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // Set up the configuration and job objects
        Configuration conf = getConf();
        Job job = new Job(conf);
        conf = job.getConfiguration();
        conf.set("mapreduce.job.tracker", "local");
        job.setJobName("vertica test");

        job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.class);
        FileInputFormat.addInputPath(job, new Path("/user/cloudera/input"));

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(VerticaRecord.class);
        job.setOutputFormatClass(VerticaOutputFormat.class);
        job.setJarByClass(ExampleParser.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        VerticaOutputFormat.setOutput(job, "reviewtbl", true, "source varchar",
                "comment varchar", "rev_by varchar", "rev_dt varchar", "com_title varchar");

        job.waitForCompletion(true);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new ExampleParser(), args);
        System.exit(res);
    }
}
But I am getting the following exception:
12/12/20 02:41:34 INFO mapred.JobClient: Cleaning up the staging area hdfs://0.0.0.0/var/lib/hadoop-0.20/cache/mapred/mapred/staging/root/.staging/job_201212191356_0006
Exception in thread "main" java.lang.RuntimeException: java.lang.InstantiationException
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:115)
    at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:947)
    at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:967)
    at org.apache.hadoop.mapred.JobClient.access$500(JobClient.java:170)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:880)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:833)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1177)
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:833)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:476)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:506)
    at ExampleParser.run(ExampleParser.java:148)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
    at ExampleParser.main(ExampleParser.java:153)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:197)
Caused by: java.lang.InstantiationException
    at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:30)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:113)
    ... 19 more
Solution
job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.class);
You cannot use/instantiate the FileInputFormat class: it is abstract, so Hadoop's ReflectionUtils.newInstance fails with exactly the InstantiationException shown in your stack trace.
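As a side note, any concrete subclass would let the job submit; since your mapper currently takes LongWritable/Text, that would be TextInputFormat, which is also Hadoop's default, so the call could simply be dropped. A minimal sketch of that assumed quick fix:

    // FileInputFormat is abstract and cannot be instantiated by the framework:
    // job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.class);  // -> InstantiationException

    // A concrete subclass works; TextInputFormat (one line of text per record) is the default.
    job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class);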
If you want to parse the XML yourself, then you need to write your own InputFormat that extends FileInputFormat, whose record reader passes the entire file contents to the mapper as the value. I think Hadoop – The Definitive Guide has an example called WholeFileInputFormat, or something similar, or a Google search may turn something up.
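I don't have the book to hand, but the example there looks roughly like the sketch below (new mapreduce API; the class names WholeFileInputFormat/WholeFileRecordReader follow the book's convention, and this is an untested outline rather than a drop-in implementation). It marks files as non-splittable and hands each whole file to the mapper as a single BytesWritable value:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    // Concrete InputFormat: each whole file becomes one record.
    public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

        @Override
        protected boolean isSplitable(JobContext context, Path file) {
            return false; // never split: the XML document must stay in one piece
        }

        @Override
        public RecordReader<NullWritable, BytesWritable> createRecordReader(
                InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            WholeFileRecordReader reader = new WholeFileRecordReader();
            reader.initialize(split, context);
            return reader;
        }
    }

    // Reads the entire split (i.e. the entire file) into a single value.
    class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
        private FileSplit fileSplit;
        private Configuration conf;
        private BytesWritable value = new BytesWritable();
        private boolean processed = false;

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) {
            this.fileSplit = (FileSplit) split;
            this.conf = context.getConfiguration();
        }

        @Override
        public boolean nextKeyValue() throws IOException {
            if (!processed) {
                byte[] contents = new byte[(int) fileSplit.getLength()];
                Path file = fileSplit.getPath();
                FileSystem fs = file.getFileSystem(conf);
                FSDataInputStream in = null;
                try {
                    in = fs.open(file);
                    IOUtils.readFully(in, contents, 0, contents.length);
                    value.set(contents, 0, contents.length);
                } finally {
                    IOUtils.closeStream(in);
                }
                processed = true;
                return true;
            }
            return false; // only one record per file
        }

        @Override
        public NullWritable getCurrentKey() { return NullWritable.get(); }

        @Override
        public BytesWritable getCurrentValue() { return value; }

        @Override
        public float getProgress() { return processed ? 1.0f : 0.0f; }

        @Override
        public void close() { /* nothing to close */ }
    }

With something like this, the job would register it via job.setInputFormatClass(WholeFileInputFormat.class), the mapper would be keyed by NullWritable with a BytesWritable value, and the map method could feed those bytes straight into the SAXBuilder instead of doing the parsing in the reducer.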