16 package uk.ac.cam.eng.extraction.hadoop.extraction;
18 import java.io.FileNotFoundException;
19 import java.io.IOException;
20 import java.util.HashMap;
23 import org.apache.hadoop.conf.Configuration;
24 import org.apache.hadoop.conf.Configured;
25 import org.apache.hadoop.fs.Path;
26 import org.apache.hadoop.io.ByteWritable;
27 import org.apache.hadoop.io.IntWritable;
28 import org.apache.hadoop.io.MapWritable;
29 import org.apache.hadoop.io.Text;
30 import org.apache.hadoop.io.Writable;
31 import org.apache.hadoop.mapreduce.Job;
32 import org.apache.hadoop.mapreduce.Mapper;
33 import org.apache.hadoop.mapreduce.Reducer;
34 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
35 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
36 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
37 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
38 import org.apache.hadoop.util.Tool;
39 import org.apache.hadoop.util.ToolRunner;
53 import com.beust.jcommander.ParameterException;
/**
 * Builds and configures the Hadoop job named "Rule extraction".
 *
 * Memory and input-split settings are applied with setIfUnset, so values
 * already present in the supplied configuration take precedence over the
 * defaults written here.
 *
 * NOTE(review): this excerpt does not show the end of the method (the
 * return statement and closing brace) nor a few intermediate statements.
 */
69 public static Job
getJob(Configuration conf)
throws IOException {
// Default JVM heap for map tasks.
70 conf.setIfUnset(
"mapreduce.map.java.opts",
"-Xmx800m");
// Default JVM heap for reduce tasks.
71 conf.setIfUnset(
"mapreduce.reduce.java.opts",
"-Xmx4096m");
// Default container memory (MB) for map tasks.
72 conf.setIfUnset(
"mapreduce.map.memory.mb",
"1000");
// Default container memory (MB) for reduce tasks.
73 conf.setIfUnset(
"mapreduce.reduce.memory.mb",
"6000");
// Default maximum input split size: 4194304 bytes (4 * 1024 * 1024).
74 conf.setIfUnset(
"mapreduce.input.fileinputformat.split.maxsize",
"4194304");
// NOTE(review): the Job(Configuration, String) constructor is deprecated in
// Hadoop 2.x; Job.getInstance(conf, name) is the documented replacement.
75 Job job =
new Job(conf,
"Rule extraction");
// Both the intermediate (map output) key and the final output key are Rule.
77 job.setMapOutputKeyClass(Rule.class);
79 job.setOutputKeyClass(Rule.class);
81 job.setMapperClass(ExtractorMapper.class);
82 job.setReducerClass(ExtractorReducer.class);
// Keys are sorted with Source2TargetComparator.
83 job.setSortComparatorClass(Source2TargetComparator.class);
// The reducer class is also registered as the combiner.
84 job.setCombinerClass(ExtractorReducer.class);
// Input and output are both Hadoop sequence files.
85 job.setInputFormatClass(SequenceFileInputFormat.class);
86 job.setOutputFormatClass(SequenceFileOutputFormat.class);
// Job output is written compressed.
87 FileOutputFormat.setCompressOutput(job,
true);
/**
 * Mapper taking (MapWritable, TextArrayWritable) input records and emitting
 * (Rule, ExtractedData) pairs.
 *
 * NOTE(review): only part of this class is visible in this excerpt; comments
 * below describe the visible statements only.
 */
98 private static class ExtractorMapper
extends 99 Mapper<MapWritable, TextArrayWritable, Rule, ExtractedData> {
// Shared IntWritable holding the value 1.
101 private static final IntWritable ONE =
new IntWritable(1);
// Maps each configured provenance name to the byte id assigned in setup().
105 private Map<Text, ByteWritable> prov2Id =
new HashMap<>();
// Byte id 0. Ids assigned to named provenances in setup() start at 1, so 0
// is never handed out there; presumably it denotes "all provenances" -
// TODO confirm against the code that uses ALL.
107 private static final ByteWritable ALL =
new ByteWritable((byte) 0);
/**
 * Reads the comma-separated provenance list stored under Provenance.PROV in
 * the job configuration and assigns the i-th name the byte id i + 1.
 *
 * NOTE(review): provString is dereferenced without a null check; confirm
 * that Provenance.PROV is always set before the job is launched.
 */
110 protected void setup(Context context)
throws IOException,
111 InterruptedException {
112 super.setup(context);
113 String provString = context.getConfiguration().get(Provenance.PROV);
114 String[] provs = provString.split(
",");
// Ids are stored in a single byte, so the number of provenances is bounded.
// NOTE(review): the guard compares against Byte.MAX_VALUE (127) while the
// message text says 128 - confirm which bound is intended.
115 if (provs.length + 1 >= Byte.MAX_VALUE) {
116 throw new RuntimeException(
118 "Number of provenances is %d which is greater than 128",
// Id 0 is skipped: the i-th provenance gets id i + 1.
121 for (
int i = 0; i < provs.length; ++i) {
122 prov2Id.put(
new Text(provs[i]),
123 new ByteWritable((byte) (i + 1)));
// --- map method: the start of its signature is not visible in this excerpt.
135 Context context)
throws IOException, InterruptedException {
136 Configuration conf = context.getConfiguration();
// The input value array holds, in order: source sentence, target sentence,
// and word alignment, each stored as Text.
137 String sourceSentence = ((Text) value.get()[0]).
toString();
138 String targetSentence = ((Text) value.get()[1]).
toString();
139 String wordAlign = ((Text) value.get()[2]).
toString();
// Extraction limits and flags are read from the job configuration; the
// property names and defaults are not visible in this excerpt.
141 int maxSourcePhrase = conf.getInt(
143 int maxSourceElements = conf.getInt(
145 int maxTerminalLength = conf.getInt(
147 int maxNonTerminalSpan = conf.getInt(
149 boolean removeMonotonicRepeats = conf.getBoolean(
151 boolean compatabilityMode = conf.getBoolean(
// Bundle the settings read above into a single options object.
153 ExtractOptions
opts =
new ExtractOptions(maxSourcePhrase,
154 maxSourceElements, maxTerminalLength, maxNonTerminalSpan,
155 removeMonotonicRepeats, compatabilityMode);
158 sourceSentence, targetSentence, wordAlign)) {
// Only provenance keys of the input record that were registered in setup()
// pass the check below.
161 for (Writable prov : key.keySet()) {
162 if (prov2Id.keySet().contains(prov)) {
// Emit the first element of ra as the key, with ruleInfo as the value.
167 context.write(ra.getFirst(), ruleInfo);
/**
 * Reducer whose input and output types are both (Rule, ExtractedData).
 *
 * NOTE(review): only part of this class is visible in this excerpt.
 */
172 private static class ExtractorReducer
extends 173 Reducer<Rule, ExtractedData, Rule, ExtractedData> {
/**
 * Receives every ExtractedData value grouped under one Rule key and writes a
 * single (key, compressed) pair. How "compressed" is computed from the
 * values is not visible in this excerpt.
 */
178 protected void reduce(Rule key, Iterable<ExtractedData> values,
179 Context context)
throws IOException, InterruptedException {
184 context.write(key, compressed);
/**
 * Runs the job with the given command-line arguments.
 *
 * Returns 0 if the job completes successfully and 1 otherwise.
 *
 * NOTE(review): argument parsing, the body of the ParameterException
 * handler, and the construction of "job" are not visible in this excerpt.
 */
188 public int run(String[] args)
throws FileNotFoundException, IOException,
189 ClassNotFoundException, InterruptedException,
190 IllegalArgumentException, IllegalAccessException {
195 }
catch (ParameterException e) {
198 Configuration conf = getConf();
// Input and output locations come from the parsed parameters.
201 FileInputFormat.setInputPaths(job, params.input);
202 FileOutputFormat.setOutputPath(job,
new Path(params.output));
// Block until the job finishes (verbose progress reporting enabled) and map
// success to exit code 0, failure to 1.
203 return job.waitForCompletion(
true) ? 0 : 1;
206 public static void main(String[] args)
throws Exception {
std::string toString(const T &x, uint pr=2)
Converts an arbitrary type to a string. Converts integers, floats, and doubles to strings. Quits execution if ...