欢迎您访问程序员文章站！本站旨在为大家提供和分享程序员计算机编程知识。
您现在的位置是: 首页  >  IT编程

Java 多线程处理执行 Solr 创建索引示例

程序员文章站 2024-02-22 17:02:46
复制代码 代码如下:public class solrindexer implements indexer, searcher, disposablebean { ...

复制代码 代码如下:

public class solrindexer implements indexer, searcher, disposablebean {
 //~ static fields/initializers =============================================

 /** Class-wide logger, also used by the nested update task. */
 static final logger logger = loggerfactory.getlogger(solrindexer.class);

 /** Time in milliseconds (5 minutes) that destroy() passes to shutdown() as the wait limit. */
 private static final long shutdown_timeout    = 5 * 60 * 1000l; // long enough

 /** Capacity of the bounded operation queue created in the constructor. */
 private static final int  input_queue_length  = 16384;

 //~ instance fields ========================================================

 /** Solr HTTP client that every update and query request is processed against. */
 private commonshttpsolrserver server;

 /** Bounded hand-off queue: addindex/delindex offer to it, the update thread polls and drains it. */
 private blockingqueue<operation> inputqueue;

 /** Background thread that runs updatetask; created and started by the constructor. */
 private thread updatethread;
 /** Loop condition of the update thread; shutdown() sets it to false. */
 volatile boolean running = true;
 /** Set to true by shutdown(); while true, addindex/delindex reject new work. */
 volatile boolean shuttingdown = false;

 //~ constructors ===========================================================

 public solrindexer(string url) throws malformedurlexception {
  server = new commonshttpsolrserver(url);

  inputqueue = new arrayblockingqueue<operation>(input_queue_length);

  updatethread = new thread(new updatetask());
  updatethread.setname("solrindexer");
  updatethread.start();
 }

 //~ methods ================================================================

 public void setsotimeout(int timeout) {
  server.setsotimeout(timeout);
 }

 public void setconnectiontimeout(int timeout) {
  server.setconnectiontimeout(timeout);
 }

 public void setallowcompression(boolean allowcompression) {
  server.setallowcompression(allowcompression);
 }


 public void addindex(indexable indexable) throws indexingexception {
  if (shuttingdown) {
   throw new illegalstateexception("solrindexer is shutting down");
  }
  inputqueue.offer(new operation(indexable, operationtype.update));
 }
 

 public void delindex(indexable indexable) throws indexingexception {
  if (shuttingdown) {
   throw new illegalstateexception("solrindexer is shutting down");
  }
  inputqueue.offer(new operation(indexable, operationtype.delete));
 }

 
 private void updateindices(string type, list<indexable> indices) throws indexingexception {
  if (indices == null || indices.size() == 0) {
   return;
  }

  logger.debug("updating {} indices", indices.size());

  updaterequest req = new updaterequest("/" + type + "/update");
  req.setaction(updaterequest.action.commit, false, false);

  for (indexable idx : indices) {
   doc doc = idx.getdoc();

   solrinputdocument solrdoc = new solrinputdocument();
   solrdoc.setdocumentboost(doc.getdocumentboost());
   for (iterator<field> i = doc.iterator(); i.hasnext();) {
    field field = i.next();
    solrdoc.addfield(field.getname(), field.getvalue(), field.getboost());
   }

   req.add(solrdoc);   
  }

  try {
   req.process(server);   
  } catch (solrserverexception e) {
   logger.error("solrserverexception occurred", e);
   throw new indexingexception(e);
  } catch (ioexception e) {
   logger.error("ioexception occurred", e);
   throw new indexingexception(e);
  }
 }

 
 private void delindices(string type, list<indexable> indices) throws indexingexception {
  if (indices == null || indices.size() == 0) {
   return;
  }

  logger.debug("deleting {} indices", indices.size());

  updaterequest req = new updaterequest("/" + type + "/update");
  req.setaction(updaterequest.action.commit, false, false);
  for (indexable indexable : indices) {   
   req.deletebyid(indexable.getdocid());
  }

  try {
   req.process(server);
  } catch (solrserverexception e) {
   logger.error("solrserverexception occurred", e);
   throw new indexingexception(e);
  } catch (ioexception e) {
   logger.error("ioexception occurred", e);
   throw new indexingexception(e);
  }
 }

 
 public queryresult search(query query) throws indexingexception {
  solrquery sq = new solrquery();
  sq.setquery(query.getquery());
  if (query.getfilter() != null) {
   sq.addfilterquery(query.getfilter());
  }
  if (query.getorderfield() != null) {
   sq.addsortfield(query.getorderfield(), query.getorder() == query.order.desc ? solrquery.order.desc : solrquery.order.asc);
  }
  sq.setstart(query.getoffset());
  sq.setrows(query.getlimit());

  queryrequest req = new queryrequest(sq);
  req.setpath("/" + query.gettype() + "/select");

  try {
   queryresponse rsp = req.process(server);
   solrdocumentlist docs = rsp.getresults();

   queryresult result = new queryresult();
   result.setoffset(docs.getstart());
   result.settotal(docs.getnumfound());
   result.setsize(sq.getrows());

   list<doc> resultdocs = new arraylist<doc>(result.getsize());
   for (iterator<solrdocument> i = docs.iterator(); i.hasnext();) {
    solrdocument solrdocument = i.next();

    doc doc = new doc();
    for (iterator<map.entry<string, object>> iter = solrdocument.iterator(); iter.hasnext();) {
     map.entry<string, object> field = iter.next();
     doc.addfield(field.getkey(), field.getvalue());
    }

    resultdocs.add(doc);
   }

   result.setdocs(resultdocs);
   return result;

  } catch (solrserverexception e) {
   logger.error("solrserverexception occurred", e);
   throw new indexingexception(e);
  }
 }
 

 public void destroy() throws exception {
  shutdown(shutdown_timeout, timeunit.milliseconds);  
 }

 public boolean shutdown(long timeout, timeunit unit) {
  if (shuttingdown) {
   logger.info("suppressing duplicate attempt to shut down");
   return false;
  }
  shuttingdown = true;
  string basename = updatethread.getname();
  updatethread.setname(basename + " - shutting down");
  boolean rv = false;
  try {
   // conditionally wait
   if (timeout > 0) {
    updatethread.setname(basename + " - shutting down (waiting)");
    rv = waitforqueue(timeout, unit);
   }
  } finally {
   // but always begin the shutdown sequence
   running = false;
   updatethread.setname(basename + " - shutting down (informed client)");
  }
  return rv;
 }

 /**
  * @param timeout
  * @param unit
  * @return
  */
 private boolean waitforqueue(long timeout, timeunit unit) {
  countdownlatch latch = new countdownlatch(1);
  inputqueue.add(new stopoperation(latch));
  try {
   return latch.await(timeout, unit);
  } catch (interruptedexception e) {
   throw new runtimeexception("interrupted waiting for queues", e);
  }
 }

 

 class updatetask implements runnable {
  public void run() {
   while (running) {
    try {
     syncindices();
    } catch (throwable e) {
     if (shuttingdown) {
      logger.warn("exception occurred during shutdown", e);
     } else {
      logger.error("problem handling solr indexing updating", e);
     }
    }
   }
   logger.info("shut down solrindexer");
  }
 }

 void syncindices() throws interruptedexception {
  operation op = inputqueue.poll(1000l, timeunit.milliseconds);

  if (op == null) {
   return;
  }

  if (op instanceof stopoperation) {
   ((stopoperation) op).stop();
   return;
  }

  // wait 1 second
  try {
   thread.sleep(1000);
  } catch (interruptedexception e) {

  }

  list<operation> ops = new arraylist<operation>(inputqueue.size() + 1);
  ops.add(op);
  inputqueue.drainto(ops);

  map<string, list<indexable>> deletemap = new hashmap<string, list<indexable>>(4);
  map<string, list<indexable>> updatemap = new hashmap<string, list<indexable>>(4);

  for (operation o : ops) {
   if (o instanceof stopoperation) {
    ((stopoperation) o).stop();
   } else {
    indexable indexable = o.indexable;
    if (o.type == operationtype.delete) {
     list<indexable> docs = deletemap.get(indexable.gettype());
     if (docs == null) {
      docs = new linkedlist<indexable>();
      deletemap.put(indexable.gettype(), docs);
     }
     docs.add(indexable);
    } else {
     list<indexable> docs = updatemap.get(indexable.gettype());
     if (docs == null) {
      docs = new linkedlist<indexable>();
      updatemap.put(indexable.gettype(), docs);
     }
     docs.add(indexable);
    }
   }
  }

  for (iterator<map.entry<string, list<indexable>>> i = deletemap.entryset().iterator(); i.hasnext();) {
   map.entry<string, list<indexable>> entry = i.next();
   delindices(entry.getkey(), entry.getvalue());
  }

  for (iterator<map.entry<string, list<indexable>>> i = updatemap.entryset().iterator(); i.hasnext();) {
   map.entry<string, list<indexable>> entry = i.next();
   updateindices(entry.getkey(), entry.getvalue());
  }
 }

 enum operationtype { delete, update, shutdown }

 static class operation {
  operationtype type;
  indexable indexable;

  operation() {}

  operation(indexable indexable, operationtype type) {
   this.indexable = indexable;
   this.type = type;
  }
 }

 static class stopoperation extends operation {
  countdownlatch latch;

  stopoperation(countdownlatch latch) {
   this.latch = latch;
   this.type = operationtype.shutdown;
  }

  public void stop() {
   latch.countdown();
  }
 }

 //~ accessors ===============

}