Cambridge SMT System
MergeJob.java
Go to the documentation of this file.
1 /*******************************************************************************
2  * Licensed under the Apache License, Version 2.0 (the "License");
3  * you may not use these files except in compliance with the License.
4  * You may obtain a copy of the License at
5  *
6  * http://www.apache.org/licenses/LICENSE-2.0
7  *
8  * Unless required by applicable law or agreed to in writing, software
9  * distributed under the License is distributed on an "AS IS" BASIS,
10  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  * See the License for the specific language governing permissions and
12  * limitations under the License.
13  *
14  * Copyright 2014 - Juan Pino, Aurelien Waite, William Byrne
15  *******************************************************************************/
16 package uk.ac.cam.eng.extraction.hadoop.merge;
17 
18 import java.io.IOException;
19 import java.util.ArrayList;
20 import java.util.List;
21 import java.util.Map;
22 
23 import org.apache.hadoop.conf.Configuration;
24 import org.apache.hadoop.conf.Configured;
25 import org.apache.hadoop.fs.Path;
26 import org.apache.hadoop.mapreduce.Job;
27 import org.apache.hadoop.mapreduce.Mapper;
28 import org.apache.hadoop.mapreduce.Reducer;
29 import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
30 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
31 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
32 import org.apache.hadoop.util.StringUtils;
33 import org.apache.hadoop.util.Tool;
34 import org.apache.hadoop.util.ToolRunner;
35 
36 import uk.ac.cam.eng.extraction.Rule;
37 import uk.ac.cam.eng.extraction.RuleString;
45 import uk.ac.cam.eng.util.CLI;
47 import uk.ac.cam.eng.util.Pair;
48 
49 import com.beust.jcommander.ParameterException;
50 
56 public class MergeJob extends Configured implements Tool {
57 
58  public static class MergeFeatureMapper extends
59  Mapper<Rule, FeatureMap, Rule, RuleData> {
60 
61  private RuleData ruleData = new RuleData();
62 
63  @Override
64  protected void map(Rule key, FeatureMap value, Context context)
65  throws IOException, InterruptedException {
66  ruleData.setFeatures(value);
67  context.write(key, ruleData);
68  }
69 
70  }
71 
/**
 * Mapper that copies the provenance count map of each {@code ExtractedData}
 * record into a {@code RuleData} record and emits it under the unchanged
 * rule key.
 */
72  public static class MergeRuleMapper extends
73  Mapper<Rule, ExtractedData, Rule, RuleData> {
74 
 // One holder object, overwritten and re-emitted on every map() call.
75  private RuleData ruleData = new RuleData();
76 
77  @Override
78  protected void map(Rule key, ExtractedData value, Context context)
79  throws IOException, InterruptedException {
80  ruleData.setProvCounts(value.getProvenanceCountMap());
 // NOTE(review): the listing numbering jumps from 80 to 82 here, so a
 // statement may be missing from this extract - confirm against the
 // original file before changing this method.
82  context.write(key, ruleData);
83  }
84  }
85 
86  private static class MergeCombiner extends
87  Reducer<Rule, RuleData, Rule, RuleData> {
88 
89  private RuleData ruleData = new RuleData();
90 
91  @Override
92  protected void reduce(Rule key, Iterable<RuleData> values,
93  Context context) throws IOException, InterruptedException {
94  ruleData.clear();
95  for (RuleData value : values) {
96  ruleData.merge(value);
97  }
98  context.write(key, ruleData);
99  }
100  }
101 
102  private static class MergeReducer extends
103  Reducer<Rule, RuleData, RuleString, TargetFeatureList> {
104 
105  private TargetFeatureList list = new TargetFeatureList();
106 
107  private List<Pair<Rule, RuleData>> unfiltered = new ArrayList<>();
108 
109  private RuleString source = new RuleString();
110 
111  private RuleFilter filter;
112 
113  @Override
114  protected void setup(
115  Reducer<Rule, RuleData, RuleString, TargetFeatureList>.Context context)
116  throws IOException, InterruptedException {
117  super.setup(context);
118  Configuration conf = context.getConfiguration();
119  FilterParams params = new FilterParams();
120  params.minSource2TargetPhrase = Double.parseDouble(conf
121  .get(FilterParams.MIN_SOURCE2TARGET_PHRASE));
122  params.minTarget2SourcePhrase = Double.parseDouble(conf
123  .get(FilterParams.MIN_TARGET2SOURCE_PHRASE));
124  params.minSource2TargetRule = Double.parseDouble(conf
125  .get(FilterParams.MIN_SOURCE2TARGET_RULE));
126  params.minTarget2SourceRule = Double.parseDouble(conf
127  .get(FilterParams.MIN_TARGET2SOURCE_RULE));
128  params.provenanceUnion = conf.getBoolean(
129  FilterParams.PROVENANCE_UNION, false);
130  params.allowedPatternsFile = conf
131  .get(FilterParams.ALLOWED_PATTERNS);
132  params.sourcePatterns = conf.get(FilterParams.SOURCE_PATTERNS);
133  filter = new RuleFilter(params, conf);
134  }
135 
136  private void writeOutput(Context context) throws IOException,
137  InterruptedException {
138  if (!filter.filterSource(source)) {
139  Map<Rule, RuleData> filtered = filter.filter(
140  source.toPattern(), unfiltered);
141  for (Map.Entry<Rule, RuleData> entry : filtered.entrySet()) {
142  list.add(Pair.createPair(entry.getKey().target(),
143  entry.getValue()));
144  }
145  if (!list.isEmpty()) {
146  context.write(source, list);
147  }
148  }
149  unfiltered.clear();
150  list.clear();
151  }
152 
153  @Override
154  protected void reduce(Rule key, Iterable<RuleData> values,
155  Context context) throws IOException, InterruptedException {
156  // First rule!
157  if (source.javaSize() == 0) {
158  source.set(key.source());
159  }
160  if (!source.equals(key.source())) {
161  writeOutput(context);
162  source.set(key.source());
163  }
164  RuleData ruleData = new RuleData();
165  for (RuleData value : values) {
166  ruleData.merge(value);
167  }
168  RuleString target = new RuleString();
169  target.set(key.target());
170  unfiltered.add(Pair.createPair(new Rule(source, target), ruleData));
171  }
172 
173  @Override
174  protected void cleanup(Context context) throws IOException,
175  InterruptedException {
176  super.cleanup(context);
177  writeOutput(context);
178  }
179  }
180 
181  public static Job getJob(Configuration conf) throws IOException {
182 
183  conf.setIfUnset("mapreduce.map.child.java.opts", "-Xmx200m");
184  conf.setIfUnset("mapreduce.reduce.child.java.opts", "-Xmx8000m");
185  conf.setIfUnset("mapreduce.map.memory.mb", "1000");
186  conf.setIfUnset("mapreduce.reduce.memory.mb", "10000");
187  Job job = new Job(conf);
188  job.setJarByClass(MergeJob.class);
189  job.setJobName("Merge");
190  job.setSortComparatorClass(MergeComparator.class);
191  job.setPartitionerClass(MergePartitioner.class);
192  job.setReducerClass(MergeReducer.class);
193  job.setCombinerClass(MergeCombiner.class);
194  job.setMapOutputKeyClass(Rule.class);
195  job.setMapOutputValueClass(RuleData.class);
196  job.setOutputKeyClass(Rule.class);
197  job.setOutputValueClass(RuleString.class);
198  job.setInputFormatClass(SequenceFileInputFormat.class);
199  job.setOutputFormatClass(SimpleHFileOutputFormat.class);
200  return job;
201  }
202 
/**
 * Parses the command line, applies the parsed parameters to the
 * configuration, builds the merge job, registers its inputs and output
 * path, and runs it to completion.
 *
 * Each comma-separated entry of params.inputFeatures is added as an input
 * handled by MergeFeatureMapper; params.inputRules is added as an input
 * handled by MergeRuleMapper.
 *
 * @param args command line arguments
 * @return 0 if the job completed successfully; 1 if it failed or if
 *         parsing the arguments raised a ParameterException
 */
203  public int run(String[] args) throws IllegalArgumentException,
204  IllegalAccessException, IOException, ClassNotFoundException,
205  InterruptedException {
 // NOTE(review): "params" is not declared in the visible lines; the
 // listing numbering jumps from 205 to 207, so its declaration is
 // presumably on the omitted line - confirm against the original file.
207  try {
208  Util.parseCommandLine(args, params);
209  } catch (ParameterException e) {
 // The exception is discarded here. NOTE(review): presumably
 // parseCommandLine reports the problem itself - verify, otherwise
 // the user gets exit code 1 with no message.
210  return 1;
211  }
212  Configuration conf = getConf();
213  Util.ApplyConf(params, conf);
214  Job job = getJob(conf);
215 
 // One input registration per comma-separated feature path.
216  String[] featurePathNames = params.inputFeatures.split(",");
217  Path[] featurePaths = StringUtils.stringToPath(featurePathNames);
218  for (Path featurePath : featurePaths) {
219  MultipleInputs.addInputPath(job, featurePath,
220  SequenceFileInputFormat.class, MergeFeatureMapper.class);
221  }
 // The rule input is read by its own mapper class.
222  Path rulePath = new Path(params.inputRules);
223  MultipleInputs.addInputPath(job, rulePath,
224  SequenceFileInputFormat.class, MergeRuleMapper.class);
225 
226  FileOutputFormat.setOutputPath(job, new Path(params.output));
 // Blocks until the job finishes; "true" requests progress output.
227  return job.waitForCompletion(true) ? 0 : 1;
228  }
229 
230  public static void main(String[] args) throws Exception {
231  int res = ToolRunner.run(new MergeJob(), args);
232  System.exit(res);
233  }
234 }
static void ApplyConf(Object params, Configuration conf)
Definition: Util.java:80
static< F, S > Pair< F, S > createPair(F first, S second)
Definition: Pair.java:46
void setAlignments(AlignmentCountMapWritable alignments)
Definition: RuleData.java:123
static JCommander parseCommandLine(String[] args, Object params)
Definition: Util.java:85
fst::TropicalWeightTpl< F > Map(double)
Map< Rule, RuleData > filter(SidePattern sourcePattern, List< Pair< Rule, RuleData >> toFilter)
boolean filterSource(RuleString source)
static Job getJob(Configuration conf)
Definition: MergeJob.java:181
void setProvCounts(ProvenanceCountMap provCounts)
Definition: RuleData.java:119