Cambridge SMT System
SimpleHFileOutputFormat.java
/*******************************************************************************
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Copyright 2014 - Juan Pino, Aurelien Waite, William Byrne
 *******************************************************************************/
package uk.ac.cam.eng.extraction.hadoop.util;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import scala.Array;
import uk.ac.cam.eng.extraction.RuleString;
import uk.ac.cam.eng.extraction.hadoop.datatypes.TargetFeatureList;

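/**
 * Output format that writes each (RuleString, TargetFeatureList) pair into an
 * HBase HFile: both key and value are serialized into a KeyValue with empty
 * column family and qualifier, stored in gzip-compressed 64 KB blocks, with a
 * row-level Bloom filter built over the rule keys.
 */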
public class SimpleHFileOutputFormat extends
        FileOutputFormat<RuleString, TargetFeatureList> {

    @Override
    public RecordWriter<RuleString, TargetFeatureList> getRecordWriter(
            TaskAttemptContext job) throws IOException {

        final Configuration conf = job.getConfiguration();
        Path file = getDefaultWorkFile(job, ".hfile");
        FileSystem fs = file.getFileSystem(conf);
        final CacheConfig cacheConfig = new CacheConfig(conf);
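        // Build the HFile writer: 64 KB blocks compressed with gzip, written
        // to this task's work file under the shared block-cache configuration.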
        HFile.WriterFactory writerFactory = HFile.getWriterFactory(conf, cacheConfig)
                .withPath(fs, file)
                .withBlockSize(64 * 1024)
                .withCompression("gz");
        final HFile.Writer writer = writerFactory.create();
        return new RecordWriter<RuleString, TargetFeatureList>() {

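            // Reusable in-memory buffer so records are serialized without
            // allocating a new stream per call.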
            private ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();

            private DataOutputStream out = new DataOutputStream(bytesOut);

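            // Row-keyed Bloom filter written alongside the HFile data blocks
            // (-1: no precomputed key count is supplied).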
            BloomFilterWriter bloomFilterWriter = BloomFilterFactory
                    .createGeneralBloomAtWrite(conf, cacheConfig, BloomType.ROW,
                            -1, writer);

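            // Serialize a Writable to a byte array, reusing the shared buffer.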
            private byte[] createBytes(Writable obj) throws IOException {
                bytesOut.reset();
                obj.write(out);
                return bytesOut.toByteArray();
            }

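            // Each pair becomes a KeyValue with empty column family and
            // qualifier and timestamp 0; HFile requires that keys be appended
            // in ascending order.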
            @Override
            public void write(RuleString key, TargetFeatureList value)
                    throws IOException {
                byte[] keyBytes = createBytes(key);
                byte[] valueBytes = createBytes(value);
                KeyValue toWrite = new KeyValue(keyBytes, Array.emptyByteArray(),
                        Array.emptyByteArray(), 0, valueBytes);
                writer.append(toWrite);
                bloomFilterWriter.add(keyBytes, 0, keyBytes.length);
            }

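            // Attach the completed Bloom filter before closing the HFile.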
            @Override
            public void close(TaskAttemptContext context) throws IOException {
                writer.addGeneralBloomFilter(bloomFilterWriter);
                writer.close();
            }
        };
    }

}
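For context, a minimal driver sketch showing how this output format might be wired into a Hadoop job. It is an illustration, not part of this file: the job name and output path are hypothetical, and it assumes the Hadoop 2 Job API.

    // Hypothetical driver code: route a job's output through SimpleHFileOutputFormat.
    Job job = Job.getInstance(new Configuration(), "rule-extraction");
    job.setOutputFormatClass(SimpleHFileOutputFormat.class);
    job.setOutputKeyClass(RuleString.class);
    job.setOutputValueClass(TargetFeatureList.class);
    FileOutputFormat.setOutputPath(job, new Path("/path/to/hfile-output"));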