Hi There
I am noticing file handle leaks on our index files. I think the
leaks occur during Lucene's merge operation.
lsof reports the following:
java 28604 root 213r REG 8,33 1098681
57409621 /var/index/vol201009/_a4w.cfs (deleted)
java 28604 root 214r REG 8,33 35164
57409699 /var/index/vol201009/_a4x.cfs (deleted)
java 28604 root 215r REG 8,33 46139
57409691 /var/index/vol201009/_a4y.cfs (deleted)
java 28604 root 216r REG 8,33 40342
57409673 /var/index/vol201009/_a4z.cfs (deleted)
java 28604 root 217r REG 8,33 44204
57409675 /var/index/vol201009/_a50.cfs (deleted)
We are using Lucene's near-real-time search feature, so we keep handles
open on the index. Could it be that we are not handling the merge
situation correctly? Your ideas would be most appreciated.
The source code of our index class is as follows:
package com.stimulus.archiva.index;
import com.stimulus.util.*;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import org.apache.commons.logging.*;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.*;
import org.apache.lucene.store.FSDirectory;
import com.stimulus.archiva.domain.Config;
import com.stimulus.archiva.exception.*;
import com.stimulus.archiva.language.AnalyzerFactory;
import com.stimulus.archiva.search.*;
import java.util.*;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.AlreadyClosedException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.*;
public class LuceneIndex extends Thread {
// Documents queued by indexDocument() and drained by the IndexProcessor thread.
protected ArrayBlockingQueue<LuceneDocument> queue;
protected static final Log logger =
LogFactory.getLog(LuceneIndex.class.getName());
// Separate log category; when debug-enabled, openIndex() routes the writer's info stream to indexLogOut.
protected static final Log indexLog =
LogFactory.getLog("indexlog");
// Writer on indexPath; null while the index is closed (set by openIndex(), cleared by closeIndex()).
IndexWriter writer = null;
// NOTE(review): these two are static, so every LuceneIndex instance shares them, yet startup()
// assigns them and shutdown() stops them per instance — confirm only one instance is ever started.
protected static ScheduledExecutorService scheduler;
protected static ScheduledFuture<?> scheduledTask;
// Sentinel document: the IndexProcessor loop exits when it takes this instance from the queue.
protected LuceneDocument EXIT_REQ = null;
// Guards use of the writer (indexing batches, close, optimize, delete).
ReentrantLock indexLock = new ReentrantLock();
ArchivaAnalyzer analyzer = new ArchivaAnalyzer();
// Optional debug log target; only populated by setLog().
File indexLogFile;
PrintStream indexLogOut;
IndexProcessor indexProcessor;
// Name used in log messages only.
String friendlyName;
String indexPath;
// Upper bound on the number of documents submitted per indexing batch.
int maxSimultaneousDocs;
// Size of the per-batch thread pool that adds documents to the writer.
int indexThreads;
// Cached reader handed out by getReader().
IndexReader reader = null;
// Set to true once per second by the task scheduled in startup(); tells getReader() to refresh the reader.
volatile boolean reopen = false;
FSDirectory fsDirectory;
// Guards replacement of the cached reader.
ReentrantLock readerLock = new ReentrantLock();
enum Status { READY, SHUTDOWN };
Status status = Status.SHUTDOWN;
/**
 * Creates an index handle in the {@code SHUTDOWN} state. Nothing is opened
 * here; the writer and worker threads are created by {@link #startup()}.
 *
 * @param queueSize           capacity of the pending-document queue
 * @param exitReq             sentinel document that tells the processor thread to exit
 * @param friendlyName        name used in log messages
 * @param indexPath           file system path of the index directory
 * @param maxSimultaneousDocs maximum number of documents submitted per batch
 * @param indexThreads        number of threads used to add documents within a batch
 */
public LuceneIndex(int queueSize, LuceneDocument exitReq, String friendlyName,
        String indexPath, int maxSimultaneousDocs, int indexThreads) {
    this.status = Status.SHUTDOWN;
    this.friendlyName = friendlyName;
    this.indexPath = indexPath;
    this.indexThreads = indexThreads;
    this.maxSimultaneousDocs = maxSimultaneousDocs;
    this.EXIT_REQ = exitReq;
    this.queue = new ArrayBlockingQueue<LuceneDocument>(queueSize);
    // The debug index log (setLog(friendlyName)) is intentionally not enabled here.
}
/**
 * @return the maximum number of documents submitted to the writer per batch
 */
public int getMaxSimultaneousDocs() {
    return this.maxSimultaneousDocs;
}
/**
 * Sets the maximum number of documents submitted to the writer per batch.
 *
 * @param maxSimultaneousDocs the new per-batch limit
 */
public void setMaxSimultaneousDocs(int maxSimultaneousDocs) {
    this.maxSimultaneousDocs = maxSimultaneousDocs;
}
/**
 * @return the lock that guards use of the index writer
 */
public ReentrantLock getIndexLock() {
    return this.indexLock;
}
protected void setLog(String logName) {
try {
indexLogFile = getIndexLogFile(logName);
if (indexLogFile!=null) {
if (indexLogFile.length()>10485760)
indexLogFile.delete();
indexLogOut = new PrintStream(indexLogFile);
}
logger.debug("set index log file path
{path='"+indexLogFile.getCanonicalPath()+"'}");
} catch (Exception e) {
logger.error("failed to open index log
file:"+e.getMessage(),e);
}
}
protected File getIndexLogFile(String logName) {
try {
String logfilepath =
Config.getFileSystem().getLogPath()+File.separator+"indexdebug_"+logName+".log";
return new File(logfilepath);
} catch (Exception e) {
logger.error("failed to open index log
file:"+e.getMessage(),e);
return null;
}
}
protected void openIndex() throws MessageSearchException {
Exception lastError = null;
if (writer==null) {
logger.debug("openIndex() index "+friendlyName+" will
be opened. it is currently closed.");
} else {
logger.debug("openIndex() did not bother opening index
"+friendlyName+". it is already open.");
return;
}
logger.debug("opening index "+friendlyName+" for write");
logger.debug("opening search index "+friendlyName+" for
write {indexpath='"+indexPath+"'}");
boolean writelock;
int attempt = 0;
int maxattempt = 10;
if
(Config.getConfig().getIndex().getMultipleIndexProcesses()) {
maxattempt = 10000;
} else {
maxattempt = 10;
}
do {
writelock = false;
try {
fsDirectory = FSDirectory.open(new
File(indexPath));
int maxIndexChars =
Config.getConfig().getIndex().getMaxIndexPerFieldChars();
writer = new
IndexWriter(fsDirectory,analyzer,new
IndexWriter.MaxFieldLength(maxIndexChars));
if (indexLog.isDebugEnabled() &&
indexLogOut!=null) {
writer.setInfoStream(indexLogOut);
}
} catch (LockObtainFailedException lobfe) {
logger.debug("write lock on index
"+friendlyName+". will reopen in 50ms.");
try { Thread.sleep(50); } catch (Exception e) {}
attempt++;
writelock = true;
} catch (CorruptIndexException cie) {
throw new MessageSearchException("index
"+friendlyName+" appears to be corrupt. please reindex the active
volume."+cie.getMessage(),logger);
} catch (Throwable io) {
throw new MessageSearchException("failed to write
document to index "+friendlyName+":"+io.getMessage(),logger);
}
} while (writelock && attempt<maxattempt);
if (attempt>=10000)
throw new MessageSearchException("failed to open index
"+friendlyName+" writer {indexPath='"+indexPath+"'}",lastError,logger);
}
public void indexDocument(LuceneDocument luceneDocument) throws
MessageSearchException {
logger.debug("index document {"+luceneDocument+"}");
if (status==Status.SHUTDOWN) {
throw new MessageSearchException("index is
shutdown.",logger);
}
long s = (new Date()).getTime();
if (luceneDocument == null)
throw new MessageSearchException("assertion failure:
null document",logger);
try {
queue.put(luceneDocument);
} catch (InterruptedException ie) {
throw new MessageSearchException("failed to add
document to queue:"+ie.getMessage(),ie,logger);
}
logger.debug("document indexed successfully
{"+luceneDocument+"}");
logger.debug("indexing message end {"+luceneDocument+"}");
long e = (new Date()).getTime();
logger.debug("indexing time {time='"+(e-s)+"'}");
}
public class DocWriter implements Runnable {
LuceneDocument doc;
String language;
LinkedList<LuceneDocument> pushbacks;
ReentrantLock pushbackLock;
public DocWriter(LuceneDocument doc,String
language,LinkedList<LuceneDocument> pushbacks, ReentrantLock pushbackLock) {
this.doc = doc;
this.language = language;
this.pushbacks = pushbacks;
}
public void run() {
try {
writer.addDocument(doc.getDocument(),AnalyzerFactory.getAnalyzer(language,AnalyzerFactory.Operation.INDEX));
} catch (IOException io) {
logger.error("failed to add document to
index:"+io.getMessage(),io);
} catch (AlreadyClosedException e) {
try {
pushbackLock.lock();
pushbacks.add(doc);
} finally {
pushbackLock.unlock();
}
}
}
}
public class IndexProcessor extends Thread {
public IndexProcessor() {
setName("index processor");
}
public void run() {
boolean exit = false;
LuceneDocument luceneDocument = null;
LinkedList<LuceneDocument> pushbacks = new
LinkedList<LuceneDocument>();
ReentrantLock pushbackLock = new ReentrantLock();
while (!exit) {
//documentPool =
Executors.newFixedThreadPool(Config.getConfig().getArchiver().getArchiveThreads());
luceneDocument = null;
try {
luceneDocument = (LuceneDocument) queue.take();
} catch (InterruptedException e) {
logger.debug("index exit req received. exiting");
exit = true;
continue;
}
if (luceneDocument==EXIT_REQ) {
logger.debug("index exit req received. exiting");
exit = true;
continue;
}
try {
indexLock.lock();
if (luceneDocument==null) {
logger.debug("index info is null");
}
int i = 0;
ExecutorService threadPool =
Executors.newFixedThreadPool(indexThreads,ThreadUtil.getFlexibleThreadFactory("indexwritepool",Thread.NORM_PRIORITY,true));
while(luceneDocument!=null &&
i<maxSimultaneousDocs) {
Document doc =
luceneDocument.getDocument();
String language = doc.get("lang");
if (language==null) {
language =
Config.getConfig().getIndex().getIndexLanguage();
}
DocWriter docWriter = new
DocWriter(luceneDocument,language,pushbacks,pushbackLock);
threadPool.submit(docWriter);
i++;
if (i<maxSimultaneousDocs) {
luceneDocument = (LuceneDocument)
queue.poll();
if (luceneDocument==null) {
logger.debug("index info is
null");
}
if (luceneDocument==EXIT_REQ) {
logger.debug("index exit
req received. exiting (2)");
exit = true;
break;
}
}
}
threadPool.shutdown();
threadPool.awaitTermination(30,TimeUnit.MINUTES);
try {
pushbackLock.lock();
if (pushbacks.size()>0) {
for (LuceneDocument pushback :
pushbacks) {
try {
writer.addDocument(pushback.getDocument());
} catch (IOException io) {
logger.error("failed to add
document to index:"+io.getMessage(),io);
} catch (AlreadyClosedException
e) {
pushbacks.add(pushback);
}
i++;
}
}
} finally {
pushbackLock.unlock();
}
logger.debug("index commit");
try {
if (writer!=null) {
writer.commit();
}
} catch (Exception e) {
logger.error("failed to commit
index:"+e.getMessage(),e);
try {
readerLock.lock();
closeIndex();
openIndex();
} finally {
readerLock.unlock();
}
}
} catch (Throwable ie) {
logger.error("index write
interrupted:"+ie.getMessage(),ie);
} finally {
indexLock.unlock();
}
}
logger.debug("exit indexer");
}
public class IndexDocument extends Thread {
LuceneDocument luceneDocument = null;
List<LuceneDocument> pushbacks = null;
public IndexDocument(LuceneDocument
luceneDocument,List<LuceneDocument> pushbacks) {
this.luceneDocument = luceneDocument;
this.pushbacks = pushbacks;
setName("index document");
}
public void run() {
try {
writer.addDocument(luceneDocument.getDocument());
} catch (IOException io) {
logger.error("failed to add document to
index:"+io.getMessage(),io);
} catch (AlreadyClosedException e) {
pushbacks.add(luceneDocument);
} catch (Throwable t) {
logger.error("failed to add document to
index:"+t.getMessage(),t);
}
}};
}
protected void closeIndex() {
try {
indexLock.lock();
if (writer!=null) {
writer.close();
}
if (fsDirectory!=null) {
fsDirectory.close();
}
} catch (Throwable io) {
logger.error("failed to close index
writer:"+io.getMessage(),io);
} finally {
writer = null;
indexLock.unlock();
}
}
public void optimize() throws MessageSearchException {
logger.debug("optimize volume");
try {
indexLock.lock();
try {
writer.optimize(false);
} catch (Exception io) {
throw new MessageSearchException("failed to
optimize the index:"+io.getMessage(),io,logger);
}
} catch (Throwable t) { // diskspace problems could arise
logger.error("failed to optimize
index:"+t.getMessage(),t);
} finally {
indexLock.unlock();
}
}
public void deleteDocs(Term[] terms) throws
MessageSearchException {
logger.debug("delete docs");
if (status==Status.SHUTDOWN) {
throw new MessageSearchException("index is
shutdown.",logger);
}
try {
indexLock.lock();
openIndex();
try {
writer.deleteDocuments(terms);
} catch (Exception e) {
throw new MessageSearchException("failed to
delete doc from index:"+e.getMessage(),e,logger);
} finally {
try {
writer.commit();
writer.expungeDeletes(false);
} catch (Exception io) {
throw new MessageSearchException("failed to
expunge docs from index:"+io.getMessage(),io,logger);
}
}
} catch (Throwable t) {
logger.error("failed to delete docs from
index."+t.getMessage(),t);
} finally {
indexLock.unlock();
}
}
public void deleteIndex() throws MessageSearchException {
logger.debug("delete index
{indexpath='"+indexPath+"'}");
try {
indexLock.lock();
closeIndex();
File indexFile = new File(indexPath);
//deleteDirContents(indexFile);
try {
int maxIndexChars =
Config.getConfig().getIndex().getMaxIndexPerFieldChars();
writer = new
IndexWriter(FSDirectory.open(indexFile),analyzer,true,new
IndexWriter.MaxFieldLength(maxIndexChars));
} catch (Throwable cie) {
logger.error("failed to delete index
{index='"+indexPath+"'}",cie);
return;
} finally {
try { writer.close(); } catch (Exception e) {
logger.debug("failed to close writer:"+e.getMessage()); }
writer = null;
}
} finally {
openIndex();
indexLock.unlock();
}
}
public void startup() throws MessageSearchException {
logger.debug("luceneindex is starting up");
File lockFile = new File(indexPath+File.separatorChar +
"write.lock");
if (lockFile.exists()) {
if
(Config.getConfig().getIndex().getMultipleIndexProcesses()) {
logger.debug("index lock file detected on
volumeindex startup.");
} else {
logger.warn("index lock file detected. the server
was shutdown incorrectly. automatically deleting lock file.
{lockFile='"+lockFile.getPath()+"'}");
lockFile.delete();
}
}
openIndex();
scheduler =
Executors.newScheduledThreadPool(1,ThreadUtil.getFlexibleThreadFactory("index
reopen",Thread.NORM_PRIORITY-1,true));
scheduledTask = scheduler.scheduleWithFixedDelay(new
Runnable() { public void run() { reopen=true; }},1,1,TimeUnit.SECONDS);
indexProcessor = new IndexProcessor();
indexProcessor.start();
Runtime.getRuntime().addShutdownHook(this);
status = Status.READY;
}
public IndexReader getReader() throws MessageSearchException {
if (status==Status.SHUTDOWN) {
throw new MessageSearchException("index is
shutdown.",logger);
}
readerLock.lock();
try {
if (writer==null) {
throw new MessageSearchException("cannot retrieve
reader. writer is closed (or null)",logger);
}
if (reader == null) {
reader = new VolumeIndexReader(writer.getReader(5));
} else {
try {
if (reopen) {
reader = new
VolumeIndexReader(writer.getReader(5));
reopen = false;
}
} catch (AlreadyClosedException ace) {
logger.debug("reader was found closed.
reopening");
reader = new
VolumeIndexReader(writer.getReader(5));
}
}
} catch (IOException io) {
throw new MessageSearchException("failed to retrieve
reader from writer:"+io.getMessage(),io,logger);
} finally {
readerLock.unlock();
}
return reader;
}
public void shutdown() {
status = Status.SHUTDOWN;
try { queue.put(EXIT_REQ); } catch (InterruptedException
e) {}
if (reader!=null) {
try {
reader.close();
} catch (Exception e) {
logger.error("failed to close index
reader:"+e.getMessage());
}
}
reader = null;
if (scheduler!=null) {
scheduler.shutdown();
}
closeIndex();
indexProcessor.interrupt();
if (scheduler!=null) {
scheduler.shutdownNow();
}
}
/**
 * Thread body used when this object runs as the JVM shutdown hook registered in
 * {@code startup()}: it simply shuts the index down.
 */
@Override
public void run() {
    this.shutdown();
}
/**
 * A unit of work for the indexer: something that can produce the Lucene
 * {@link Document} to add to the index.
 */
public interface LuceneDocument {

    /** @return a description of this document, used in log messages */
    String toString();

    /** @return the Lucene document to add to the index */
    Document getDocument();

    /** Declared public here, so implementations must expose {@code finalize()} publicly. */
    void finalize();
}
/**
 * Deletes every regular file directly inside {@code path}. Sub-directories and
 * their contents are left untouched. Does nothing when {@code path} is
 * {@code null}, does not exist, is not a directory, or cannot be listed.
 *
 * @param path the directory whose files should be deleted
 */
public static void deleteDirContents(File path) {
    if (path == null || !path.exists()) {
        return;
    }
    // listFiles() returns null for a non-directory or on an I/O error.
    File[] files = path.listFiles();
    if (files == null) {
        return;
    }
    for (File file : files) {
        if (file.isFile()) {
            file.delete();
        }
    }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]