16 package uk.ac.cam.eng.extraction.hadoop.util;
18 import java.io.ByteArrayOutputStream;
19 import java.io.DataOutputStream;
20 import java.io.IOException;
22 import org.apache.hadoop.conf.Configuration;
23 import org.apache.hadoop.fs.FileSystem;
24 import org.apache.hadoop.fs.Path;
25 import org.apache.hadoop.hbase.KeyValue;
26 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
27 import org.apache.hadoop.hbase.io.hfile.HFile;
28 import org.apache.hadoop.hbase.regionserver.BloomType;
29 import org.apache.hadoop.hbase.util.BloomFilterFactory;
30 import org.apache.hadoop.hbase.util.BloomFilterWriter;
31 import org.apache.hadoop.io.Writable;
32 import org.apache.hadoop.mapreduce.RecordWriter;
33 import org.apache.hadoop.mapreduce.TaskAttemptContext;
34 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
47 FileOutputFormat<RuleString, TargetFeatureList> {
51 TaskAttemptContext job)
throws IOException {
// The task's Configuration is reused below for the file system, the cache config and the writer factory.
53 final Configuration conf = job.getConfiguration();
// Output goes to this task's default work file, given a ".hfile" extension.
54 Path file = getDefaultWorkFile(job,
".hfile");
55 FileSystem fs = file.getFileSystem(conf);
56 final CacheConfig cacheConfig =
new CacheConfig(conf);
// The HFile writer is configured with a block size of 64 * 1024 bytes and "gz" compression.
57 HFile.WriterFactory writerFactory = HFile.getWriterFactory(conf, cacheConfig).withPath(fs, file)
58 .withBlockSize(64 * 1024).withCompression(
"gz");
59 final HFile.Writer writer = writerFactory.create();
// The returned RecordWriter is an anonymous class; it uses conf, cacheConfig and writer from the enclosing method.
60 return new RecordWriter<RuleString, TargetFeatureList>() {
// Backing buffer for serialized bytes; createBytes returns its contents via toByteArray().
62 private ByteArrayOutputStream bytesOut =
new ByteArrayOutputStream();
// DataOutputStream wrapper around bytesOut.
64 private DataOutputStream out =
new DataOutputStream(bytesOut);
// Bloom filter writer created with BloomType.ROW; key bytes are added to it on write and it is
// handed to the HFile writer in close().
// NOTE(review): the remaining arguments of this call are outside the visible lines.
66 BloomFilterWriter bloomFilterWriter = BloomFilterFactory
67 .createGeneralBloomAtWrite(conf, cacheConfig, BloomType.ROW, -1,
// Returns the current contents of the shared bytesOut buffer as a byte array.
// NOTE(review): the lines between the signature and the return are not visible here; presumably
// they reset bytesOut and serialize obj through `out` — confirm against the full file.
70 private byte[] createBytes(Writable obj)
throws IOException {
73 return bytesOut.toByteArray();
// Serialize the key and the value to byte arrays via createBytes.
79 byte[] keyBytes = createBytes(key);
80 byte[] valueBytes = createBytes(value);
// Build the cell: key bytes first, two empty byte arrays, a literal 0, then the value bytes.
// In HBase's KeyValue(row, family, qualifier, timestamp, value) constructor this means an empty
// family, an empty qualifier and timestamp 0.
// NOTE(review): `Array` is not among the imports visible in this view — confirm where it comes from.
81 KeyValue toWrite =
new KeyValue(keyBytes, Array.emptyByteArray(),
82 Array.emptyByteArray(), 0, valueBytes);
// Append the cell to the HFile, then record the full key bytes in the Bloom filter.
83 writer.append(toWrite);
84 bloomFilterWriter.add(keyBytes, 0, keyBytes.length);
// Hands bloomFilterWriter to the HFile writer as its general Bloom filter.
// NOTE(review): whatever follows this call (e.g. closing the writer) is outside the visible lines.
88 public void close(TaskAttemptContext context)
throws IOException {
89 writer.addGeneralBloomFilter(bloomFilterWriter);