Java多线程处理执行Solr创建索引示例
/**
 * Asynchronous Solr indexer and synchronous Solr searcher.
 *
 * <p>Calls to {@link #addindex} and {@link #delindex} only place an operation
 * on a bounded in-memory queue. A single background thread (named
 * "solrindexer") takes operations off that queue, groups them by indexable
 * type and sends them to Solr in batches. {@link #search} runs on the calling
 * thread.
 */
public class solrindexer implements indexer, searcher, disposablebean {
    //~ static fields/initializers =============================================

    static final logger logger = loggerfactory.getlogger(solrindexer.class);

    /** Time, in milliseconds, that {@link #destroy()} waits for the queue to drain (5 minutes). */
    private static final long shutdown_timeout = 5 * 60 * 1000l; // long enough

    /** Capacity of the bounded input queue. */
    private static final int input_queue_length = 16384;

    //~ instance fields ========================================================

    private final commonshttpsolrserver server;
    private final blockingqueue<operation> inputqueue;
    private final thread updatethread;

    /** Loop condition of the background thread; cleared by {@link #shutdown}. */
    volatile boolean running = true;

    /** Set once {@link #shutdown} has been entered; new operations are rejected afterwards. */
    volatile boolean shuttingdown = false;

    //~ constructors ===========================================================

    /**
     * Creates the Solr client and the input queue, then starts the background
     * update thread.
     *
     * @param url base URL handed to the Solr client
     * @throws malformedurlexception propagated from the Solr client constructor
     */
    public solrindexer(string url) throws malformedurlexception {
        server = new commonshttpsolrserver(url);
        inputqueue = new arrayblockingqueue<operation>(input_queue_length);
        updatethread = new thread(new updatetask());
        updatethread.setname("solrindexer");
        // Started last, after every field the worker reads has been assigned.
        updatethread.start();
    }

    //~ methods ================================================================

    /** Forwards the socket timeout to the underlying Solr client. */
    public void setsotimeout(int timeout) {
        server.setsotimeout(timeout);
    }

    /** Forwards the connection timeout to the underlying Solr client. */
    public void setconnectiontimeout(int timeout) {
        server.setconnectiontimeout(timeout);
    }

    /** Forwards the compression flag to the underlying Solr client. */
    public void setallowcompression(boolean allowcompression) {
        server.setallowcompression(allowcompression);
    }

    /**
     * Queues an add/update of the given indexable.
     *
     * @param indexable item to be (re)indexed
     * @throws illegalstateexception if shutdown has already begun
     * @throws indexingexception if the input queue is full and the operation
     *         could not be accepted
     */
    public void addindex(indexable indexable) throws indexingexception {
        enqueue(indexable, operationtype.update);
    }

    /**
     * Queues a delete of the given indexable.
     *
     * @param indexable item to be removed from the index
     * @throws illegalstateexception if shutdown has already begun
     * @throws indexingexception if the input queue is full and the operation
     *         could not be accepted
     */
    public void delindex(indexable indexable) throws indexingexception {
        enqueue(indexable, operationtype.delete);
    }

    /**
     * Places one operation on the input queue without blocking.
     *
     * <p>{@code offer} returns {@code false} when the bounded queue is full;
     * that result is reported to the caller instead of silently dropping the
     * operation.
     */
    private void enqueue(indexable indexable, operationtype type) throws indexingexception {
        if (shuttingdown) {
            throw new illegalstateexception("solrindexer is shutting down");
        }
        if (!inputqueue.offer(new operation(indexable, type))) {
            throw new indexingexception(
                    new illegalstateexception("solrindexer input queue is full, operation rejected"));
        }
    }

    /**
     * Sends one update request containing every given indexable to the
     * "/&lt;type&gt;/update" handler, with a commit action attached.
     *
     * @param type    indexable type, used as the first path segment
     * @param indices items to add; {@code null} or empty is a no-op
     * @throws indexingexception if the request fails
     */
    private void updateindices(string type, list<indexable> indices) throws indexingexception {
        if (indices == null || indices.isempty()) {
            return;
        }
        logger.debug("updating {} indices", indices.size());
        updaterequest req = new updaterequest("/" + type + "/update");
        req.setaction(updaterequest.action.commit, false, false);
        for (indexable idx : indices) {
            req.add(tosolrdocument(idx.getdoc()));
        }
        process(req);
    }

    /** Copies the document boost and every field (name, value, boost) into a new Solr input document. */
    private static solrinputdocument tosolrdocument(doc doc) {
        solrinputdocument solrdoc = new solrinputdocument();
        solrdoc.setdocumentboost(doc.getdocumentboost());
        for (iterator<field> i = doc.iterator(); i.hasnext();) {
            field field = i.next();
            solrdoc.addfield(field.getname(), field.getvalue(), field.getboost());
        }
        return solrdoc;
    }

    /**
     * Sends one request deleting every given indexable by document id to the
     * "/&lt;type&gt;/update" handler, with a commit action attached.
     *
     * @param type    indexable type, used as the first path segment
     * @param indices items to delete; {@code null} or empty is a no-op
     * @throws indexingexception if the request fails
     */
    private void delindices(string type, list<indexable> indices) throws indexingexception {
        if (indices == null || indices.isempty()) {
            return;
        }
        logger.debug("deleting {} indices", indices.size());
        updaterequest req = new updaterequest("/" + type + "/update");
        req.setaction(updaterequest.action.commit, false, false);
        for (indexable indexable : indices) {
            req.deletebyid(indexable.getdocid());
        }
        process(req);
    }

    /** Executes an update request, logging and wrapping Solr and I/O failures. */
    private void process(updaterequest req) throws indexingexception {
        try {
            req.process(server);
        } catch (solrserverexception e) {
            logger.error("solrserverexception occurred", e);
            throw new indexingexception(e);
        } catch (ioexception e) {
            logger.error("ioexception occurred", e);
            throw new indexingexception(e);
        }
    }

    /**
     * Runs the query against the "/&lt;type&gt;/select" handler and converts the
     * response into a {@code queryresult}.
     *
     * @param query query text, optional filter, optional sort, offset and limit
     * @return result whose offset and total come from the Solr response, whose
     *         size is the requested row count, and whose docs mirror the
     *         returned Solr documents field by field
     * @throws indexingexception if Solr reports an error
     */
    public queryresult search(query query) throws indexingexception {
        solrquery sq = new solrquery();
        sq.setquery(query.getquery());
        if (query.getfilter() != null) {
            sq.addfilterquery(query.getfilter());
        }
        if (query.getorderfield() != null) {
            sq.addsortfield(query.getorderfield(), query.getorder() == query.order.desc ? solrquery.order.desc : solrquery.order.asc);
        }
        sq.setstart(query.getoffset());
        sq.setrows(query.getlimit());

        queryrequest req = new queryrequest(sq);
        req.setpath("/" + query.gettype() + "/select");
        try {
            queryresponse rsp = req.process(server);
            solrdocumentlist docs = rsp.getresults();

            queryresult result = new queryresult();
            result.setoffset(docs.getstart());
            result.settotal(docs.getnumfound());
            result.setsize(sq.getrows());

            list<doc> resultdocs = new arraylist<doc>(result.getsize());
            for (solrdocument solrdocument : docs) {
                doc doc = new doc();
                for (map.entry<string, object> field : solrdocument) {
                    doc.addfield(field.getkey(), field.getvalue());
                }
                resultdocs.add(doc);
            }
            result.setdocs(resultdocs);
            return result;
        } catch (solrserverexception e) {
            logger.error("solrserverexception occurred", e);
            throw new indexingexception(e);
        }
    }

    /** Shuts down with the default timeout ({@code shutdown_timeout} milliseconds). */
    public void destroy() throws exception {
        shutdown(shutdown_timeout, timeunit.milliseconds);
    }

    /**
     * Stops accepting operations, optionally waits for the queue to be
     * processed, and then tells the background thread to exit.
     *
     * @param timeout how long to wait for queued work; {@code <= 0} skips the wait
     * @param unit    unit of {@code timeout}
     * @return {@code true} if the background thread reached the stop marker
     *         within the timeout; {@code false} on timeout, when no wait was
     *         requested, or when shutdown had already been started
     */
    public boolean shutdown(long timeout, timeunit unit) {
        // Check-and-set must be one atomic step, otherwise two concurrent
        // callers could both pass the duplicate guard.
        synchronized (this) {
            if (shuttingdown) {
                logger.info("suppressing duplicate attempt to shut down");
                return false;
            }
            shuttingdown = true;
        }
        string basename = updatethread.getname();
        updatethread.setname(basename + " - shutting down");
        boolean rv = false;
        try {
            // conditionally wait
            if (timeout > 0) {
                updatethread.setname(basename + " - shutting down (waiting)");
                rv = waitforqueue(timeout, unit);
            }
        } finally {
            // but always begin the shutdown sequence
            running = false;
            updatethread.setname(basename + " - shutting down (informed client)");
        }
        return rv;
    }

    /**
     * Enqueues a stop marker and waits until the background thread has
     * reached it.
     *
     * <p>The marker is inserted with a timed {@code offer} so that a full
     * queue makes this method wait (and eventually return {@code false})
     * rather than fail with an unchecked "queue full" exception. The timeout
     * is applied to the insertion and to the wait separately, so with a
     * saturated queue the total wait can approach twice the timeout.
     *
     * @param timeout maximum time for each of the two waits
     * @param unit    unit of {@code timeout}
     * @return {@code true} if the marker was processed in time
     * @throws runtimeexception if the calling thread is interrupted; the
     *         interrupt flag is restored before throwing
     */
    private boolean waitforqueue(long timeout, timeunit unit) {
        countdownlatch latch = new countdownlatch(1);
        try {
            if (!inputqueue.offer(new stopoperation(latch), timeout, unit)) {
                return false;
            }
            return latch.await(timeout, unit);
        } catch (interruptedexception e) {
            thread.currentthread().interrupt();
            throw new runtimeexception("interrupted waiting for queues", e);
        }
    }

    /** Body of the background thread: runs {@link #syncindices()} until {@code running} is cleared. */
    class updatetask implements runnable {
        public void run() {
            while (running) {
                try {
                    syncindices();
                } catch (throwable e) {
                    // Never let a failed batch kill the worker thread.
                    if (shuttingdown) {
                        logger.warn("exception occurred during shutdown", e);
                    } else {
                        logger.error("problem handling solr indexing updating", e);
                    }
                }
            }
            logger.info("shut down solrindexer");
        }
    }

    /**
     * Processes one batch: waits up to one second for a first operation,
     * pauses one more second so further operations can accumulate, drains the
     * queue, groups the operations by kind and indexable type, and sends all
     * deletes followed by all updates.
     *
     * <p>Stop markers found in the batch are released only after the batch
     * has been sent, so a caller waiting in {@link #shutdown} is not told the
     * queue is drained while writes are still pending. Every batch is
     * attempted even if an earlier one fails; the first failure is rethrown
     * at the end.
     *
     * <p>NOTE(review): all deletes of a batch run before all updates, so an
     * update followed by a delete of the same document within one batch ends
     * with the document indexed - confirm this is intended.
     *
     * @throws interruptedexception if interrupted while polling the queue
     */
    void syncindices() throws interruptedexception {
        operation first = inputqueue.poll(1000l, timeunit.milliseconds);
        if (first == null) {
            return;
        }
        if (first instanceof stopoperation) {
            // Nothing else was taken from the queue, so nothing is pending.
            ((stopoperation) first).stop();
            return;
        }
        // wait 1 second
        try {
            thread.sleep(1000);
        } catch (interruptedexception ignored) {
            // Deliberately ignored: an operation has already been removed from
            // the queue and must still be sent; the interrupt merely shortens
            // the batching pause.
        }

        list<operation> ops = new arraylist<operation>(inputqueue.size() + 1);
        ops.add(first);
        inputqueue.drainto(ops);

        list<stopoperation> stops = new arraylist<stopoperation>(1);
        map<string, list<indexable>> deletemap = new hashmap<string, list<indexable>>(4);
        map<string, list<indexable>> updatemap = new hashmap<string, list<indexable>>(4);
        for (operation o : ops) {
            if (o instanceof stopoperation) {
                stops.add((stopoperation) o);
            } else if (o.type == operationtype.delete) {
                bucketfor(deletemap, o.indexable.gettype()).add(o.indexable);
            } else {
                bucketfor(updatemap, o.indexable.gettype()).add(o.indexable);
            }
        }

        indexingexception failure = null;
        try {
            for (map.entry<string, list<indexable>> entry : deletemap.entryset()) {
                try {
                    delindices(entry.getkey(), entry.getvalue());
                } catch (indexingexception e) {
                    if (failure == null) {
                        failure = e;
                    }
                }
            }
            for (map.entry<string, list<indexable>> entry : updatemap.entryset()) {
                try {
                    updateindices(entry.getkey(), entry.getvalue());
                } catch (indexingexception e) {
                    if (failure == null) {
                        failure = e;
                    }
                }
            }
        } finally {
            // Always release waiters, even when a batch threw unexpectedly.
            for (stopoperation stop : stops) {
                stop.stop();
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /** Returns the list registered for {@code type}, creating and registering it on first use. */
    private static list<indexable> bucketfor(map<string, list<indexable>> groups, string type) {
        list<indexable> bucket = groups.get(type);
        if (bucket == null) {
            bucket = new arraylist<indexable>();
            groups.put(type, bucket);
        }
        return bucket;
    }

    /** Kind of work carried by an {@link operation}. */
    enum operationtype { delete, update, shutdown }

    /** One queued unit of work: an indexable plus what to do with it. */
    static class operation {
        operationtype type;
        indexable indexable;

        operation() {}

        operation(indexable indexable, operationtype type) {
            this.indexable = indexable;
            this.type = type;
        }
    }

    /** Queue marker carrying the latch that {@link #waitforqueue} blocks on. */
    static class stopoperation extends operation {
        countdownlatch latch;

        stopoperation(countdownlatch latch) {
            this.latch = latch;
            this.type = operationtype.shutdown;
        }

        /** Counts the latch down, releasing the thread waiting on it. */
        public void stop() {
            latch.countdown();
        }
    }
    //~ accessors ===============
}