private long prepareCommitInternal() throws IOException {
  startCommitTime = System.nanoTime();
  synchronized (commitLock) {
    ensureOpen(false);
    if (infoStream.isEnabled("IW")) {
      infoStream.message("IW", "prepareCommit: flush");
      infoStream.message("IW", "  index before flush " + segString());
    }

    if (tragedy.get() != null) {
      throw new IllegalStateException(
          "this writer hit an unrecoverable error; cannot commit", tragedy.get());
    }

    if (pendingCommit != null) {
      throw new IllegalStateException(
          "prepareCommit was already called with no corresponding call to commit");
    }

    doBeforeFlush();
    testPoint("startDoFlush");
    SegmentInfos toCommit = null;
    boolean anyChanges = false;
    long seqNo;
    MergePolicy.MergeSpecification pointInTimeMerges = null;
    AtomicBoolean stopAddingMergedSegments = new AtomicBoolean(false);
    final long maxCommitMergeWaitMillis = config.getMaxFullFlushMergeWaitMillis();

    // This is copied from doFlush, except it's modified to
    // clone & incRef the flushed SegmentInfos inside the
    // sync block:

    try {
      synchronized (fullFlushLock) {
        boolean flushSuccess = false;
        boolean success = false;
        try {
          // Write files: flush all pending documents
          seqNo = docWriter.flushAllThreads();
          if (seqNo < 0) {
            anyChanges = true;
            seqNo = -seqNo;
          }
          if (anyChanges == false) {
            // prevent double increment since docWriter#doFlush increments the flushcount
            // if we flushed anything.
            flushCount.incrementAndGet();
          }
          // Publish the flushed segments (their index data is now on disk)
          publishFlushedSegments(true);
          // cannot pass triggerMerges=true here else it can lead to deadlock:
          processEvents(false);

          flushSuccess = true;

          applyAllDeletesAndUpdates();
          synchronized (this) {
            writeReaderPool(true);
            if (changeCount.get() != lastCommitChangeCount) {
              // There are changes to commit, so we will write a new segments_N in startCommit.
              // The act of committing is itself an NRT-visible change (an NRT reader that was
              // just opened before this should see it on reopen) so we increment changeCount
              // and segments version so a future NRT reopen will see the change:
              changeCount.incrementAndGet();
              segmentInfos.changed();
            }

            if (commitUserData != null) {
              Map<String, String> userData = new HashMap<>();
              for (Map.Entry<String, String> ent : commitUserData) {
                userData.put(ent.getKey(), ent.getValue());
              }
              segmentInfos.setUserData(userData, false);
            }

            // Must clone the segmentInfos while we still
            // hold fullFlushLock and while sync'd so that
            // no partial changes (eg a delete w/o
            // corresponding add from an updateDocument) can
            // sneak into the commit point:
            toCommit = segmentInfos.clone();
            pendingCommitChangeCount = changeCount.get();
            // This protects the segmentInfos we are now going
            // to commit.  This is important in case, eg, while
            // we are trying to sync all referenced files, a
            // merge completes which would otherwise have
            // removed the files we are now syncing.
            deleter.incRef(toCommit.files(false));
            if (anyChanges && maxCommitMergeWaitMillis > 0) {
              // we can safely call preparePointInTimeMerge since writeReaderPool(true) above
              // wrote all necessary files to disk and checkpointed them.
              pointInTimeMerges =
                  preparePointInTimeMerge(
                      toCommit, stopAddingMergedSegments::get, MergeTrigger.COMMIT, sci -> {});
            }
          }
          success = true;
        } finally {
          if (!success) {
            if (infoStream.isEnabled("IW")) {
              infoStream.message("IW", "hit exception during prepareCommit");
            }
          }
          assert Thread.holdsLock(fullFlushLock);
          // Done: finish the full flush!
          docWriter.finishFullFlush(flushSuccess);
          doAfterFlush();
        }
      }
    } catch (VirtualMachineError tragedy) {
      tragicEvent(tragedy, "prepareCommit");
      throw tragedy;
    } finally {
      maybeCloseOnTragicEvent();
    }

    // Run merges during commit, but only when configured to wait for them
    if (pointInTimeMerges != null) {
      if (infoStream.isEnabled("IW")) {
        infoStream.message(
            "IW", "now run merges during commit: " + pointInTimeMerges.segString(directory));
      }
      mergeScheduler.merge(mergeSource, MergeTrigger.COMMIT);
      pointInTimeMerges.await(maxCommitMergeWaitMillis, TimeUnit.MILLISECONDS);
      if (infoStream.isEnabled("IW")) {
        infoStream.message("IW", "done waiting for merges during commit");
      }
      synchronized (this) {
        // we need to call this under lock since mergeFinished above is also called under the IW lock
        stopAddingMergedSegments.set(true);
      }
    }

    // do this after handling any pointInTimeMerges since the files will have changed if any
    // merges did complete
    filesToCommit = toCommit.files(false);
    try {
      if (anyChanges) {
        maybeMerge.set(true);
      }
      // Write the segments_N file
      startCommit(toCommit);
      if (pendingCommit == null) {
        return -1;
      } else {
        return seqNo;
      }
    } catch (Throwable t) {
      synchronized (this) {
        if (filesToCommit != null) {
          try {
            deleter.decRef(filesToCommit);
          } catch (Throwable t1) {
            t.addSuppressed(t1);
          } finally {
            filesToCommit = null;
          }
        }
      }
      throw t;
    }
  }
}
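From the caller's perspective this is the machinery behind IndexWriter.prepareCommit(). Below is a minimal caller-side sketch, assuming only the standard public API; the directory path, analyzer and the 500 ms value are made up, and setMaxFullFlushMergeWaitMillis is assumed to be the setter matching the config.getMaxFullFlushMergeWaitMillis() read above:

import java.nio.file.Paths;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

public class TwoPhaseCommitSketch {
  public static void main(String[] args) throws Exception {
    try (Directory dir = FSDirectory.open(Paths.get("/tmp/demo-index"))) {
      IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
      // Allow up to 500 ms of merging while committing; this is the value that
      // prepareCommitInternal reads and that enables the pointInTimeMerges path.
      iwc.setMaxFullFlushMergeWaitMillis(500);
      try (IndexWriter writer = new IndexWriter(dir, iwc)) {
        // ... add or update documents here ...
        long seqNo = writer.prepareCommit(); // phase one: flush and prepare the commit point
        // ... coordinate with other transactional resources if needed ...
        writer.commit();                     // phase two: make the commit point durable
        System.out.println("committed, seqNo=" + seqNo);
      }
    }
  }
}

If anything goes wrong between the two phases, the caller would roll back instead of committing, which releases the files that prepareCommitInternal incRef'd.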
/*
 * FlushAllThreads is synced by IW fullFlushLock. Flushing all threads is a
 * two stage operation; the caller must ensure (in try/finally) that finishFlush
 * is called after this method, to release the flush lock in DWFlushControl
 */
long flushAllThreads() throws IOException {
  final DocumentsWriterDeleteQueue flushingDeleteQueue;
  if (infoStream.isEnabled("DW")) {
    infoStream.message("DW", "startFullFlush");
  }

  long seqNo;
  synchronized (this) {
    pendingChangesInCurrentFullFlush = anyChanges();
    flushingDeleteQueue = deleteQueue;
    /* Cutover to a new delete queue.  This must be synced on the flush control
     * otherwise a new DWPT could sneak into the loop with an already flushing
     * delete queue */
    seqNo = flushControl.markForFullFlush(); // swaps this.deleteQueue synced on FlushControl
    assert setFlushingDeleteQueue(flushingDeleteQueue);
  }
  assert currentFullFlushDelQueue != null;
  assert currentFullFlushDelQueue != deleteQueue;

  boolean anythingFlushed = false;
  try {
    DocumentsWriterPerThread flushingDWPT;
    // Help out with flushing: iterate over every DWPT that is pending flush
    while ((flushingDWPT = flushControl.nextPendingFlush()) != null) {
      anythingFlushed |= doFlush(flushingDWPT);
    }
    // If a concurrent flush is still in flight wait for it
    flushControl.waitForFlush();
    if (anythingFlushed == false && flushingDeleteQueue.anyChanges()) {
      // apply deletes if we did not flush any document
      if (infoStream.isEnabled("DW")) {
        infoStream.message(
            "DW", Thread.currentThread().getName() + ": flush naked frozen global deletes");
      }
      assert assertTicketQueueModification(flushingDeleteQueue);
      ticketQueue.addDeletes(flushingDeleteQueue);
    }
    // we can't assert that we don't have any tickets in the queue since we might add a
    // DocumentsWriterDeleteQueue concurrently if we have very small ram buffers;
    // this happens quite frequently
    assert !flushingDeleteQueue.anyChanges();
  } finally {
    assert flushingDeleteQueue == currentFullFlushDelQueue;
    // all DWPT have been processed and this queue has been fully flushed to the ticket-queue
    flushingDeleteQueue.close();
  }
  if (anythingFlushed) {
    return -seqNo;
  } else {
    return seqNo;
  }
}
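The sign of the returned value is how flushAllThreads tells its caller whether any DWPT actually flushed a segment; prepareCommitInternal above negates it back. Two hypothetical helpers (not part of Lucene) just to spell out that convention:

// Hypothetical helpers, only to illustrate the sign convention used above.
static boolean anythingWasFlushed(long returned) {
  return returned < 0;                         // negative: at least one DWPT wrote a segment
}

static long toSequenceNumber(long returned) {
  return returned < 0 ? -returned : returned;  // recover the real sequence number
}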
/** Flush all pending docs to a new segment */
FlushedSegment flush(DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
  assert flushPending.get() == Boolean.TRUE;
  assert numDocsInRAM > 0;
  assert deleteSlice.isEmpty() : "all deletes must be applied in prepareFlush";
  segmentInfo.setMaxDoc(numDocsInRAM);
  final SegmentWriteState flushState =
      new SegmentWriteState(
          infoStream,
          directory,
          segmentInfo,
          fieldInfos.finish(),
          pendingUpdates,
          new IOContext(new FlushInfo(numDocsInRAM, lastCommittedBytesUsed)));
  final double startMBUsed = lastCommittedBytesUsed / 1024. / 1024.;

  // Apply delete-by-docID now (delete-byDocID only
  // happens when an exception is hit processing that
  // doc, eg if analyzer has some problem w/ the text):
  if (numDeletedDocIds > 0) {
    flushState.liveDocs = new FixedBitSet(numDocsInRAM);
    flushState.liveDocs.set(0, numDocsInRAM);
    for (int i = 0; i < numDeletedDocIds; i++) {
      flushState.liveDocs.clear(deleteDocIDs[i]);
    }
    flushState.delCountOnFlush = numDeletedDocIds;
    deleteDocIDs = new int[0];
  }

  if (aborted) {
    if (infoStream.isEnabled("DWPT")) {
      infoStream.message("DWPT", "flush: skip because aborting is set");
    }
    return null;
  }

  long t0 = System.nanoTime();

  if (infoStream.isEnabled("DWPT")) {
    infoStream.message(
        "DWPT", "flush postings as segment " + flushState.segmentInfo.name + " numDocs=" + numDocsInRAM);
  }
  final Sorter.DocMap sortMap;
  try {
    DocIdSetIterator softDeletedDocs;
    if (indexWriterConfig.getSoftDeletesField() != null) {
      softDeletedDocs = consumer.getHasDocValues(indexWriterConfig.getSoftDeletesField());
    } else {
      softDeletedDocs = null;
    }
    // Write the document data and index structures to disk
    sortMap = consumer.flush(flushState);
    if (softDeletedDocs == null) {
      flushState.softDelCountOnFlush = 0;
    } else {
      flushState.softDelCountOnFlush =
          PendingSoftDeletes.countSoftDeletes(softDeletedDocs, flushState.liveDocs);
      assert flushState.segmentInfo.maxDoc()
          >= flushState.softDelCountOnFlush + flushState.delCountOnFlush;
    }
    // We clear this here because we already resolved them (private to this segment) when writing postings:
    pendingUpdates.clearDeleteTerms();
    segmentInfo.setFiles(new HashSet<>(directory.getCreatedFiles()));

    final SegmentCommitInfo segmentInfoPerCommit =
        new SegmentCommitInfo(
            segmentInfo, 0, flushState.softDelCountOnFlush, -1L, -1L, -1L, StringHelper.randomId());
    if (infoStream.isEnabled("DWPT")) {
      infoStream.message(
          "DWPT",
          "new segment has "
              + (flushState.liveDocs == null ? 0 : flushState.delCountOnFlush)
              + " deleted docs");
      infoStream.message(
          "DWPT", "new segment has " + flushState.softDelCountOnFlush + " soft-deleted docs");
      infoStream.message(
          "DWPT",
          "new segment has "
              + (flushState.fieldInfos.hasVectors() ? "vectors" : "no vectors")
              + "; "
              + (flushState.fieldInfos.hasNorms() ? "norms" : "no norms")
              + "; "
              + (flushState.fieldInfos.hasDocValues() ? "docValues" : "no docValues")
              + "; "
              + (flushState.fieldInfos.hasProx() ? "prox" : "no prox")
              + "; "
              + (flushState.fieldInfos.hasFreq() ? "freqs" : "no freqs"));
      infoStream.message("DWPT", "flushedFiles=" + segmentInfoPerCommit.files());
      infoStream.message("DWPT", "flushed codec=" + codec);
    }
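The delete-by-docID branch earlier in this method builds a live-docs bitmap: every bit starts out live and the doc IDs that hit an exception during indexing are cleared. A self-contained sketch of that pattern using org.apache.lucene.util.FixedBitSet (the doc count and deleted IDs are made up):

import org.apache.lucene.util.FixedBitSet;

public class LiveDocsSketch {
  public static void main(String[] args) {
    int numDocsInRAM = 8;
    int[] deleteDocIDs = {2, 5};          // hypothetical docs that failed while being indexed

    FixedBitSet liveDocs = new FixedBitSet(numDocsInRAM);
    liveDocs.set(0, numDocsInRAM);        // start with every doc marked live
    for (int docID : deleteDocIDs) {
      liveDocs.clear(docID);              // drop the docs deleted by docID
    }
    int delCountOnFlush = numDocsInRAM - liveDocs.cardinality();
    System.out.println("deleted on flush: " + delCountOnFlush); // prints 2
  }
}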
      // pack the DV type and hasNorms in one byte
      output.writeByte(docValuesByte(fi.getDocValuesType()));
      output.writeLong(fi.getDocValuesGen());
      output.writeMapOfStrings(fi.attributes());
      output.writeVInt(fi.getPointDimensionCount());
      if (fi.getPointDimensionCount() != 0) {
        output.writeVInt(fi.getPointIndexDimensionCount());
        output.writeVInt(fi.getPointNumBytes());
      }
    }
    CodecUtil.writeFooter(output);
  }
}
After this point the segment files exist on disk, and writing of the index structure data is complete.

Adding the raw document data to the index, by contrast, goes through the add* entry points, for example addDocuments:
public long addDocuments(Iterable<? extends Iterable<? extends IndexableField>> docs)
    throws IOException {
  return updateDocuments((DocumentsWriterDeleteQueue.Node<?>) null, docs);
}
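A minimal caller-side sketch of that add path, assuming an already-opened IndexWriter and made-up field names; addDocuments indexes the whole block into one segment with consecutive doc IDs, and the documents only become durable once the flush/commit path walked through above has run:

import java.util.Arrays;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexWriter;

// Sketch only: "writer" is an already-opened IndexWriter.
static long indexBlock(IndexWriter writer) throws Exception {
  Document child = new Document();
  child.add(new TextField("chapter", "Lucene flush and commit internals", Field.Store.NO));

  Document parent = new Document();
  parent.add(new StringField("type", "book", Field.Store.YES));

  // Documents of the block are indexed together and stay adjacent in the segment.
  long seqNo = writer.addDocuments(Arrays.asList(child, parent));
  writer.commit(); // drives the prepareCommit/commit path shown earlier
  return seqNo;
}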