16 package uk.ac.cam.eng.rule.retrieval;
18 import java.io.BufferedWriter;
19 import java.io.IOException;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.HashMap;
24 import java.util.List;
26 import java.util.Map.Entry;
29 import org.apache.commons.lang.time.StopWatch;
30 import org.apache.hadoop.hbase.util.BloomFilter;
31 import org.apache.hadoop.io.DataOutputBuffer;
// Runnable query worker: looks up a batch of source-side RuleStrings in an
// HFile (via `reader`), pre-screening each source with a Bloom filter, and
// writes the retrieved, feature-processed rules through the shared
// RuleRetriever to `out`.
// NOTE(review): the leading numbers on each line (47, 49, ...) are original
// line numbers left over from a lossy extraction; gaps in that numbering mean
// source lines are missing from this view. Code is left byte-identical.
47 class HFileRuleQuery
implements Runnable {
// HFile reader holding rules keyed by source side; seeked per query source.
49 private final HFileRuleReader reader;
// Bloom filter over serialized sources — cheap negative check before seeking.
51 private final BloomFilter bf;
// Destination writer for formatted rules.
53 private final BufferedWriter out;
// The batch of source-side strings this worker must look up.
55 private final Collection<RuleString> query;
// Shared retriever: feature registry, vocab sets, filter, sentence-id map.
57 private final RuleRetriever retriever;
// Lexical translation-table clients; only set up when the feature registry
// has lexical features (see constructor). s2t/t2s naming per setup() flag.
59 private final TTableClient s2tClient;
61 private final TTableClient t2sClient;
// Scratch buffer used to serialize a source for the Bloom-filter test.
63 private final DataOutputBuffer tempOut =
new DataOutputBuffer();
// Rules pending the batched TTable lookup + write; drained via drainQueue().
65 private final Map<Rule, Pair<EnumRuleType, RuleData>> queue =
new HashMap<>();
// Queue size threshold that triggers a drain (see run()).
67 private static final int BATCH_SIZE = 1000;
// Constructor: wires in the shared reader/filter/writer/query and, when the
// feature registry includes lexical features, connects the two TTable
// clients (boolean flag distinguishes the s2t and t2s directions).
// NOTE(review): original lines 72-75 and 82-84 are missing from this view —
// presumably the assignments of reader/bf/out/query and the closing braces.
// Confirm against the full source before editing.
69 public HFileRuleQuery(HFileRuleReader reader, BloomFilter bf,
70 BufferedWriter out, Collection<RuleString> query,
71 RuleRetriever retriever, CLI.ServerParams params) {
76 this.retriever = retriever;
77 this.s2tClient =
new TTableClient();
78 this.t2sClient =
new TTableClient();
// TTable servers are only contacted when lexical features are in use.
79 if (retriever.fReg.hasLexicalFeatures()) {
80 s2tClient.setup(params, retriever.fReg.getNoOfProvs(),
true);
81 t2sClient.setup(params, retriever.fReg.getNoOfProvs(),
false);
// Flushes the pending `queue`: first enriches the queued rules with lexical
// TTable scores (when lexical features are enabled), then writes each rule.
// Pass-through rules are recorded in the shared foundPassThroughRules set
// (under synchronization) and written with pass-through features; all other
// rules get fully processed features.
// NOTE(review): the method's opening brace (original line 86) and its tail
// (~lines 107-110, likely closing braces and a queue.clear()) are missing
// from this view — code left byte-identical.
85 private void drainQueue()
87 if (retriever.fReg.hasLexicalFeatures()) {
88 s2tClient.queryRules(queue);
89 t2sClient.queryRules(queue);
91 for (Entry<Rule, Pair<EnumRuleType, RuleData>> e : queue.entrySet()) {
92 Rule rule = e.getKey();
93 EnumRuleType type = e.getValue().getFirst();
94 RuleData rawFeatures = e.getValue().getSecond();
// Pass-through path: copy the rule, record it in the shared found set,
// and write it with special pass-through features.
95 if (retriever.passThroughRules.contains(rule)) {
96 Rule asciiRule =
new Rule(rule);
97 synchronized (retriever.foundPassThroughRules) {
98 retriever.foundPassThroughRules.add(asciiRule);
100 retriever.writeRule(type, rule, retriever.fReg
101 .createFoundPassThroughRuleFeatures(rawFeatures
102 .getFeatures()), out);
// Normal path: run raw features through the registry's processing pipeline.
104 Map<Integer, Double> processed = retriever.fReg
105 .processFeatures(rule, rawFeatures);
106 retriever.writeRule(type, rule, processed, out);
// Interior of run(). Sorts the query sources into HFile merge order, then
// for each source: Bloom-filter prescreen, seek the reader, record found
// test-vocab entries, deep-copy the matching rules, filter them, tag
// target-side vocab per sentence id, and enqueue rules for batched draining.
// NOTE(review): the `public void run()` signature line (original ~113/114)
// and several interior lines (stopWatch.start(), else branches, the
// drainQueue() call at the BATCH_SIZE check, catch body, closing braces)
// are missing from this extraction — code left byte-identical.
112 @SuppressWarnings(
"unchecked")
// Sort sources so HFile seeks proceed in key order.
115 List<RuleString> sortedQuery =
new ArrayList<>(query);
117 StopWatch stopWatch =
new StopWatch();
118 System.out.println(
"Sorting query");
120 Collections.sort(sortedQuery,
new MergeComparator());
121 System.out.printf(
"Query sort took %d seconds\n",
122 stopWatch.getTime() / 1000);
126 for (RuleString source : sortedQuery) {
// Serialize the source into the scratch buffer for the Bloom-filter check;
// a negative result skips the expensive seek (skip branch not visible here).
128 source.write(tempOut);
129 if (!bf.contains(tempOut.getData(), 0, tempOut.getLength(),
133 if (reader.seek(source)) {
// Record which test-vocabulary sources were actually found (shared set).
134 if (retriever.testVocab.contains(source)) {
135 synchronized (retriever.foundTestVocab) {
136 retriever.foundTestVocab.add(source);
// Deep-copy rules off the reader before filtering — presumably the reader
// reuses its buffers between sources; confirm against HFileRuleReader.
139 List<Pair<Rule, RuleData>> rules =
new ArrayList<>();
140 for (Pair<Rule, RuleData> entry : reader
141 .getRulesForSource()) {
142 rules.add(Pair.createPair(
new Rule(entry.getFirst()),
143 new RuleData(entry.getSecond())));
145 SidePattern pattern = source.toPattern();
146 Map<Rule, RuleData> filtered = retriever.filter.filter(
// Phrase-pattern sources map to EnumRuleType.V; the alternative arm of the
// ternary is on a missing line.
148 EnumRuleType type = pattern.isPhrase() ? EnumRuleType.V
150 Set<Integer> sentenceIds = retriever.sourceToSentenceId.get(source);
151 for (Entry<Rule, RuleData> e : filtered.entrySet()) {
152 queue.put(e.getKey(), Pair.createPair(type, e.getValue()));
// Fold each rule's target-side terminals into the per-sentence vocab
// (shared map, hence the synchronized block).
153 List<Symbol> words = e.getKey().target().getTerminals();
154 for(
int id : sentenceIds){
155 synchronized(retriever.targetSideVocab){
156 retriever.targetSideVocab.get(
id).addAll(words);
// Batch boundary: queue is flushed once it exceeds BATCH_SIZE (the
// drainQueue() call itself is on a missing line).
160 if(queue.size() > BATCH_SIZE){
166 }
catch (IOException e) {
172 .printf(
"Query took %d seconds\n", stopWatch.getTime() / 1000);
void run(ucam::util::RegistryPO const &rg)