16 package uk.ac.cam.eng.extraction.hadoop.features.phrase;
18 import java.io.IOException;
19 import java.util.List;
21 import org.apache.hadoop.conf.Configuration;
22 import org.apache.hadoop.mapreduce.Job;
23 import org.apache.hadoop.mapreduce.Mapper;
24 import org.apache.hadoop.mapreduce.Partitioner;
25 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
26 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
27 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
28 import org.apache.hadoop.util.ToolRunner;
44 public static class Source2TargetComparator
extends 45 MarginalReducer.MRComparator {
48 protected boolean isSource2Target() {
// Partitioner for (Rule, ProvenanceCountMap) pairs that hashes on the rule's
// source side only: the visible return statement hands key.getSource(), not the
// whole Rule, to a HashPartitioner.
// NOTE(review): presumably this keeps every rule sharing a source in one
// partition; the getPartition header is not visible in this listing -- confirm.
54 private static class Source2TargetPartitioner
extends 55 Partitioner<Rule, ProvenanceCountMap> {
// Delegate partitioner, keyed on the source symbol list rather than on Rule.
57 private Partitioner<List<Symbol>,
ProvenanceCountMap> defaultPartitioner =
new HashPartitioner<>();
62 return defaultPartitioner.getPartition(key.getSource(), value,
68 private static class KeepProvenanceCountsOnlyMapper
70 Mapper<Rule, ExtractedData, Rule, ProvenanceCountMap> {
74 Context context)
throws IOException, InterruptedException {
// Configures the source-to-target MapReduce job: supplies memory defaults,
// sets the MarginalReducer.SOURCE_TO_TARGET flag, creates the Job and wires in
// the comparator, partitioner, mapper, reducer, key classes and sequence-file
// input/output formats.
// NOTE(review): this listing has gaps (original lines 88, 95, 97 and 100-103
// are not visible), so the remaining setup and the return are not shown here.
81 public Job
getJob(Configuration conf)
throws IOException {
// setIfUnset only provides a fallback; a value already present in conf wins.
// NOTE(review): the stock Hadoop 2 JVM-option keys are
// "mapreduce.map.java.opts" / "mapreduce.reduce.java.opts" (legacy:
// "mapred.map.child.java.opts" / "mapred.reduce.child.java.opts"). Confirm
// that the "mapreduce.*.child.java.opts" spellings below are actually read
// by the target cluster; otherwise these heap limits are silently ignored.
82 conf.setIfUnset(
"mapreduce.map.child.java.opts",
"-Xmx200m");
83 conf.setIfUnset(
"mapreduce.reduce.child.java.opts",
"-Xmx5128m");
// Memory requested per task, in MB; each value is larger than the
// matching -Xmx heap size above.
84 conf.setIfUnset(
"mapreduce.map.memory.mb",
"1000");
85 conf.setIfUnset(
"mapreduce.reduce.memory.mb",
"6000");
// Always forced to true (not setIfUnset), overriding any caller-supplied value.
86 conf.setBoolean(MarginalReducer.SOURCE_TO_TARGET,
true);
// NOTE(review): the Job(Configuration) constructor is deprecated in Hadoop 2;
// Job.getInstance(conf) is the documented replacement.
87 Job job =
new Job(conf);
// NOTE(review): "Source2Taget" looks like a typo for "Source2Target"; it is
// the runtime job name, so check for anything matching on it before renaming.
89 job.setJobName(
"Source2Taget");
// Sort order and partitioning both come from the Source2Target* helper classes.
90 job.setSortComparatorClass(Source2TargetComparator.class);
91 job.setPartitionerClass(Source2TargetPartitioner.class);
92 job.setMapperClass(KeepProvenanceCountsOnlyMapper.class);
93 job.setReducerClass(MarginalReducer.class);
// Rule is the key type for both the map output and the final output.
94 job.setMapOutputKeyClass(Rule.class);
96 job.setOutputKeyClass(Rule.class);
// The job reads and writes Hadoop sequence files.
98 job.setInputFormatClass(SequenceFileInputFormat.class);
99 job.setOutputFormatClass(SequenceFileOutputFormat.class);
104 public static void main(String[] args)
throws Exception {