package uk.ac.cam.eng.extraction.hadoop.merge;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.ParameterException;

// The imports for the project's own types (Rule, RuleData, RuleString,
// FeatureMap, ExtractedData, TargetFeatureList, FilterParams, Pair, and the
// rule filter) were truncated in the extracted source; their package paths
// are left to the surrounding codebase. java.util.Map and JCommander are
// added here because the code below uses them.
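
/**
 * MapReduce job that merges extracted rule counts with previously computed
 * features. Two mappers feed a shuffle keyed on {@link Rule}: one reads
 * feature maps, the other reads extracted rule data. The reducer groups all
 * targets that share a source side, applies the configured filtering
 * thresholds, and emits one {@code TargetFeatureList} per source.
 */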
public class MergeJob extends Configured implements Tool {

    public static class MergeFeatureMapper extends
            Mapper<Rule, FeatureMap, Rule, RuleData> {

        @Override
        protected void map(Rule key, FeatureMap value, Context context)
                throws IOException, InterruptedException {
            // The statements turning the FeatureMap into a RuleData were lost
            // in extraction; a wrapping setter on RuleData is assumed here.
            RuleData ruleData = new RuleData();
            ruleData.setFeatures(value);
            context.write(key, ruleData);
        }
    }
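
    /**
     * Relays extracted rule statistics, rewrapped as {@link RuleData}, so
     * they can be merged with the feature records during the shuffle.
     */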
    public static class MergeRuleMapper extends
            Mapper<Rule, ExtractedData, Rule, RuleData> {

        @Override
        protected void map(Rule key, ExtractedData value, Context context)
                throws IOException, InterruptedException {
            // As in MergeFeatureMapper, the conversion of the input value was
            // lost in extraction; a setter wrapping the extracted counts is
            // assumed.
            RuleData ruleData = new RuleData();
            ruleData.setCounts(value);
            context.write(key, ruleData);
        }
    }
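
    /**
     * Local combiner: merges partial {@link RuleData} records for the same
     * rule on the map side to reduce shuffle volume.
     */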
    private static class MergeCombiner extends
            Reducer<Rule, RuleData, Rule, RuleData> {

        @Override
        protected void reduce(Rule key, Iterable<RuleData> values,
                Context context) throws IOException, InterruptedException {
            RuleData ruleData = new RuleData();
            for (RuleData value : values) {
                ruleData.merge(value);
            }
            context.write(key, ruleData);
        }
    }
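
    /**
     * Buffers every (rule, data) pair that shares a source side, merges the
     * values for each rule, then filters the buffered targets and emits one
     * {@code TargetFeatureList} per source. This relies on
     * {@link MergePartitioner} sending all rules with the same source to the
     * same reducer.
     */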
    private static class MergeReducer extends
            Reducer<Rule, RuleData, RuleString, TargetFeatureList> {

        private List<Pair<Rule, RuleData>> unfiltered = new ArrayList<>();

        private RuleString source = new RuleString();

        // The filter's declaration did not survive extraction. Its API is
        // known from the call in writeOutput(): filter(SidePattern,
        // List<Pair<Rule, RuleData>>) returning Map<Rule, RuleData>; the
        // class name RuleFilter is assumed.
        private RuleFilter filter;

        @Override
        protected void setup(
                Reducer<Rule, RuleData, RuleString, TargetFeatureList>.Context context)
                throws IOException, InterruptedException {
            super.setup(context);
            Configuration conf = context.getConfiguration();
            FilterParams params = new FilterParams();
            params.minSource2TargetPhrase = Double.parseDouble(conf
                    .get(FilterParams.MIN_SOURCE2TARGET_PHRASE));
            params.minTarget2SourcePhrase = Double.parseDouble(conf
                    .get(FilterParams.MIN_TARGET2SOURCE_PHRASE));
            params.minSource2TargetRule = Double.parseDouble(conf
                    .get(FilterParams.MIN_SOURCE2TARGET_RULE));
            params.minTarget2SourceRule = Double.parseDouble(conf
                    .get(FilterParams.MIN_TARGET2SOURCE_RULE));
            params.provenanceUnion = conf.getBoolean(
                    FilterParams.PROVENANCE_UNION, false);
            params.allowedPatternsFile = conf
                    .get(FilterParams.ALLOWED_PATTERNS);
            params.sourcePatterns = conf.get(FilterParams.SOURCE_PATTERNS);
            // The filter's construction was also lost; building it from the
            // populated params (and conf, for reading the patterns file) is
            // assumed.
            filter = new RuleFilter(params, conf);
        }
        private void writeOutput(Context context) throws IOException,
                InterruptedException {
            Map<Rule, RuleData> filtered = filter.filter(
                    source.toPattern(), unfiltered);
            // The construction of the output list from the filtered map was
            // lost in extraction; TargetFeatureList is assumed to collect
            // each surviving target side together with its merged data.
            TargetFeatureList list = new TargetFeatureList();
            for (Map.Entry<Rule, RuleData> entry : filtered.entrySet()) {
                list.add(entry.getKey().target(), entry.getValue());
            }
            if (!list.isEmpty()) {
                context.write(source, list);
            }
            // Reset the buffer for the next source side.
            unfiltered.clear();
        }
        @Override
        protected void reduce(Rule key, Iterable<RuleData> values,
                Context context) throws IOException, InterruptedException {
            // Keys arrive grouped by source side; flush the buffer whenever
            // the source changes.
            if (source.javaSize() == 0) {
                source.set(key.source());
            }
            if (!source.equals(key.source())) {
                writeOutput(context);
                source.set(key.source());
            }
            // Merge all partial values for this rule into a single record.
            RuleData ruleData = new RuleData();
            for (RuleData value : values) {
                ruleData.merge(value);
            }
            RuleString target = new RuleString();
            target.set(key.target());
            unfiltered.add(Pair.createPair(new Rule(source, target), ruleData));
        }
        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            super.cleanup(context);
            // Flush the rules buffered for the last source side.
            writeOutput(context);
        }
    }
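
    /**
     * Configures the merge job. The map side only relays records, so it gets
     * a small default heap; the reduce side buffers every rule for a source
     * before filtering, hence the much larger defaults. All four settings can
     * be overridden via the usual Hadoop properties.
     */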
    public static Job getJob(Configuration conf) throws IOException {
        conf.setIfUnset("mapreduce.map.child.java.opts", "-Xmx200m");
        conf.setIfUnset("mapreduce.reduce.child.java.opts", "-Xmx8000m");
        conf.setIfUnset("mapreduce.map.memory.mb", "1000");
        conf.setIfUnset("mapreduce.reduce.memory.mb", "10000");
        Job job = new Job(conf);
        job.setJarByClass(MergeJob.class); // assumed; lost in extraction
        job.setJobName("Merge");
        job.setPartitionerClass(MergePartitioner.class);
        job.setReducerClass(MergeReducer.class);
        job.setCombinerClass(MergeCombiner.class);
        job.setMapOutputKeyClass(Rule.class);
        job.setMapOutputValueClass(RuleData.class);
        // The extract had Rule/RuleString here, which does not match what
        // MergeReducer actually writes; corrected to the reducer's output
        // types.
        job.setOutputKeyClass(RuleString.class);
        job.setOutputValueClass(TargetFeatureList.class);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        return job;
    }
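
    /**
     * Parses the command line, attaches each feature path and the rule path
     * to its mapper via {@link MultipleInputs}, and runs the job to
     * completion.
     */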
    @Override
    public int run(String[] args) throws IllegalArgumentException,
            IllegalAccessException, IOException, ClassNotFoundException,
            InterruptedException {
        // The parameter holder and its parsing were lost in extraction. A
        // minimal JCommander sketch is assumed: MergeJobParameters is a
        // hypothetical class exposing the inputFeatures, inputRules, and
        // output fields used below.
        MergeJobParameters params = new MergeJobParameters();
        try {
            new JCommander(params).parse(args);
        } catch (ParameterException e) {
            System.err.println(e.getMessage());
            return 1;
        }
        Configuration conf = getConf();
        Job job = getJob(conf);
        String[] featurePathNames = params.inputFeatures.split(",");
        Path[] featurePaths = StringUtils.stringToPath(featurePathNames);
        for (Path featurePath : featurePaths) {
            MultipleInputs.addInputPath(job, featurePath,
                    SequenceFileInputFormat.class, MergeFeatureMapper.class);
        }
        Path rulePath = new Path(params.inputRules);
        MultipleInputs.addInputPath(job, rulePath,
                SequenceFileInputFormat.class, MergeRuleMapper.class);
        FileOutputFormat.setOutputPath(job, new Path(params.output));
        return job.waitForCompletion(true) ? 0 : 1;
    }
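
    /*
     * Example invocation. The jar name and flag names are illustrative only,
     * since the JCommander parameter class was not part of this extract:
     *
     *   hadoop jar ruleXtract.jar uk.ac.cam.eng.extraction.hadoop.merge.MergeJob \
     *       -input_features feats1,feats2 -input_rules rules -output merged
     */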
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new MergeJob(), args);
        System.exit(res);
    }
}