存储实现
使用的是Lucene做索引的ElasticSearch,和磁盘的交互就是通过Lucene的store模块来进行的,ElasticSearch的Store使用的是Lucene的Directory类对应。
Lucene的Directory
类声明了 对目标源的写入删除,读取,同步,重命名等功能。BaseDirectory
内部引入了一个LockFactory。这样我们几乎也明白了执行交互的时候使用加锁来控制并发访问。直接开上面类图中BaseDirectory的实现类
- FSDirectory
用于在文件系统存储索引的基类,根据操作系统的类型在Windows使用SimpleFSDirectory进行操作,在Linux系统使用NIOFSDirectory。在64位的JVM中开启了MMap分配的使用的是MMapDirectory。系统调用处理读取操作会将文件映射到同样大小的虚拟地址空间去,因为mmap没有加锁操作读取索引文件相当于读取缓存一样快,这就是因为调用mmap读取不需要把文件加载到操作系统缓存,相当于直接交互的IO;
- ByteBuffersDirectory
在堆内存中存放在ByteBuffer当中,基于内存形式的索引存储;
- RAMDirectory
最新版的Lucene当中已经标记为过期类。RAMDirectory是存放到内存当中的一个区域,FSDirectory是存放到文件系统中的磁盘里,虽然向其添加Document的过程与使用FSDDirectory一样,但是由于它是内存中的一块区域寄存器,因此,如果不将RAMDirectory中的内存写入磁盘,当虚拟机退出后,里面的内容也会随之消失。
在基于文件方式的存取在上面提供了多种的实现类
- SimpleFSDirectory
打开索引文件:使用Files的newByteChannel方法来打开一个索引文件,比如说通过DirectoryReader.open(IndexWriter)读取索引文件信息会调用此方法
读取索引文件:使用FileChannelImpl读取索引文件,使得可以随机访问索引文件的一块连续数据。
SimpleFSDirectory类不支持并发读取同一个索引文件,多线程读取时候会被处理为顺序访问,如果业务有这方面的需求,那么最好使用NIOFSDirectory或者MMapDirectory。同步代码如下
protected void readInternal(byte[] b, int offset, int len) throws IOException {
ByteBuffer bb;
if (b == this.buffer) {
assert this.byteBuf != null;
bb = this.byteBuf;
this.byteBuf.clear().position(offset);
} else {
bb = ByteBuffer.wrap(b, offset, len);
}
synchronized(this.channel) {
long pos = this.getFilePointer() + this.off;
if (pos + (long)len > this.end) {
throw new EOFException("read past EOF: " + this);
} else {
try {
this.channel.position(pos);
int readLength;
int i;
for(readLength = len; readLength > 0; readLength -= i) {
int toRead = Math.min(16384, readLength);
bb.limit(bb.position() + toRead);
assert bb.remaining() == toRead;
i = this.channel.read(bb);
if (i < 0) {
throw new EOFException("read past EOF: " + this + " off: " + offset + " len: " + len + " pos: " + pos + " chunkLen: " + toRead + " end: " + this.end);
}
assert i > 0 : "SeekableByteChannel.read with non zero-length bb.remaining() must always read at least one byte (Channel is in blocking mode, see spec of ReadableByteChannel)";
pos += (long)i;
}
assert readLength == 0;
} catch (IOException var12) {
throw new IOException(var12.getMessage() + ": " + this, var12);
}
}
}
}
如果有多个线程读取同一个索引文件,当执行线程被打断(Thread.interrupt()或Future.cancel())后,该索引文件的文件描述符(file descriptor)会被关闭,那么阻塞的线程随后读取该索引文件时会抛出ClosedChannelException的异常,不过可以使用RAFDirectory来代替SimpleFSDirectory,它使用了RandomAccessFile来读取索引文件,因为它是不可打断的(not interruptible)。RAFDirectory已经作为一个旧的API(legacy API)被丢到了misc模块中,它同样不支持并发读取索引文件,所以跟SimpleFSDirectory很类似.
- NIOFSDirectory
打开索引文件:使用Files的FileChannel.open方法来打开一个索引文件
读取索引文件:使用FileChannelImpl读取索引文件,使得可以随机访问索引文件的一块连续数据。
NIOFSDirectory类支持并发读取同一个索引文件,但是它存在跟SimpleFSDirectory一样的多线程下执行线程被打断的问题,如果业务中存在这个情况,那么可以使用RAFDirectory来代替NIOFSDirectory
另外如果Lucene是运行在Windows操作系统上,毕竟Windows对于Nio的支持不是很友好,有个bug。还是使用RAFDirectory吧
LockFactory在Lucene中用来对索引文件所在的目录进行加锁,使得同一时间总是只有一个IndexWriter对象可以更改索引文件,即保证单进程内(single in-process)多个不同IndexWriter对象互斥更改(多线程持有相同引用的IndexWriter对象视为一个IndexWriter不会受制于LockFactory,而是受制于对象锁(synchronized(this))、多进程内(multi-processes)多个对象互斥更改。 LockFactory是一个抽象类,提供了以下几种子类,即NoLockFactory
、SingleInstanceLockFactory
、SimpleFSLockFactory
、NativeFSLockFactory
、VerifyingLockFactory
- SimpleFSLockFactory
加锁就是在指定目录创建一个文件,解锁就是locks容器(HashSet对象)中移除键值为的元素。并且删除文件,该LockFactory的缺点在于如果JVM异常退出,那么索引文件锁可能无法被释放,即没有删除write.lock文件。
解决的方法只能是通过手动删除write.lock文件,注意是,手动删除前用户得自己保证(certain)目前没有IndexWriter正在写入,否则非常容易破坏(corrupt)索引文件,比如说由于删除了write.lock文件,使得多个IndexWriter对象同时更改了索引文件。
- NativeFSLockFactory
NativeFSLockFactory同SimpleFSLockFactory一样,只能用于FSDirectory,它是Directory默认使用的LockFactory的,同样的通过在索引文件所在目录生成一个锁文件,但是该类还使用了FileChannel来管理锁文件。进程内(in-process)的其他线程的不同IndexWriter对象占有,通过一个线程安全的同步Set容器(Collection.synchronizedSet())实现,:使用FileChannel来尝试获得进程间(inter-process)级别的文件锁FileLock,即判断write.lock文件是否被其他进程占用,如果占用则直接抛出异常。释放锁的过程分两步走:释放锁文件的FileLock。清空同步Set容器中的内容;
尽管NativeFSLockFactory是默认的FSDirectory的索引文件锁,但基于实际场景,有时候使用SimpleFSLockFactory能更好的工作(work perfectly)。
- NativeFSLockFactory基于 java.nio.*来获得FileLock,但在某些文件系统下可能会受限,比如说在NFS下可能无法获得FileLock(the lock can incorrectly be double acquired),此时使用SimpleFSLockFactory就不会有这个问题
- 当JVM异常退出时,残留的(leftover)write.lock文件无法删除,如果使用SimpleFSLockFactory需要手动的去删除该文件,否则尝试获得索引文件锁时就直接抛出异常,而使用NativeFSLockFactory时,不用关心当前write.lock文件是否被正确删除,因为它只关心write.lock是否被其他进程占用,而JVM异常退出后,会自动释放FileLock(操作系统会释放FileLock),所以不能通过判断write.lock文件在索引文件的目录中就认为索引文件被锁定了(locked),Lucene从不会因为异常去删除write.lock文件
准实时刷新
在理论条件下,新索引的数据,我们需要立即获取,ElasticSearch在这方面仿佛如同实时刷新一样,即便是在多节点的集群中也可以做到很好的反馈,ElasticSearch默认索引写入到可以被查询会有1秒的延时;
在第一篇基础知识中我们讲到了ElasticSearch的索引对应的是一个或者多个Lucene的索引,然而一个lucene的索引会存在一个或者多个索引段。由于索引段在被索引之后就是不可更改的,所以Lucene维护着一个可以被检索的索引段的几个segments_N
的集合,新增的Lucene索引段会追加在其中,这样来看索引过程和检索文档的过程是并行的,当文档被加入segments_N
集合,就认为该索引已经被提交。此时就可以来读取,就可以被Searcher实例来读取到。重新打开后的Searcher实例就可以读取到最新提交的索引。这个过程就是refresh。这个过程默认是一秒;
Searcher实例去读取数据按段检索的过程如下图如下所示:
- 新文档被收集到内存索引缓存,一个在内存缓存中包含新文档的 Lucene 索引
- 内存缓存进行提交:一个追加的倒排索引被写入磁盘 首先是把一个新的包含新段名字的提交点被写入磁盘。磁盘进行同步所有在文件系统缓存中等待的写入都刷新到磁盘
- 提交后,一个新的段被添加到提交点而且缓存被清空,新的段被开启,让它包含的文档可见以被搜索,内存缓存被清空,等待接收新的文档。
对于删除和更新来讲,我们在检索过程中依旧是可以吧删除的段查询到的,因为Lucene索引写入的不变形,除非是进过段合并之后。在获取回来的文档中,回去吧.del
文件中的变更删除的文档过滤掉。在 Elasticsearch 中,写入和打开一个新段的轻量的过程叫做 refresh 。 默认情况下每个分片会每秒自动刷新一次。这就是为什么我们说 Elasticsearch 是 近 实时搜索: 文档的变化并不是立即对搜索可见,但会在一秒之内变为可见。
这些行为可能会对新用户造成困惑: 他们索引了一个文档然后尝试搜索它,但却没有搜到。这个问题的解决办法是用 refresh API 执行一次手动刷新
段合并
由于新来的索引文档会导致Lucene创建一个新的段 ,这样会导致短段数量暴增。而段数目太多会带来较大的麻烦。 每一个段都会消耗文件句柄、内存和cpu运行周期。更重要的是,每个搜索请求都必须轮流检查每个段;所以段越多,搜索也就越慢。
Elasticsearch通过Lucene的段合并来解决这个问题。小的段被合并到大的段,然后这些大的段再被合并到更大的段。
当索引的时候,刷新(refresh)操作会创建新的段并将段打开以供搜索使用。
合并线程选择一小部分大小相似的段,并且在后台将它们合并到更大的段中。这并不会中断索引和搜索。
合并结束,老的段被删除” 说明合并完成时的活动:新的段被刷新(flush)到了磁盘。写入一个包含新段且排除旧的和较小的段的新提交点。新的段被打开用来搜索。老的段被删除。
transaction log
为了保证 Elasticsearch 的可靠性,需要确保数据变化被持久化到磁盘,Elasticsearch提供了transaction log
来记录数据的一致性。一次完整的提交会将段刷到磁盘,并写入一个包含所有segments_N
段列表的提交点。Elasticsearch 在启动或重新打开一个索引的过程中使用这个提交点来判断哪些段隶属于当前分片。即使通过每秒刷新(refresh)实现了近实时搜索,我们仍然需要经常进行完整提交来确保能从失败中恢复。translog 提供所有还没有被刷到磁盘的操作的一个持久化纪录。当 Elasticsearch 启动的时候, 它会从磁盘中使用最后一个提交点去恢复已知的段,并且会重放translog
中所有在最后一次提交后发生的变更操作。
transaction log 的工作流程,也是基于二提交
- 一个文档被索引之后,就会被添加到内存缓冲区,并且 追加到了 translog ,新的文档被添加到内存缓冲区并且被追加到了事务日志; 双写 缓冲区和事务日志,和数据库的binlog和redolog一样;
- 每一秒执行刷新API,刷新完成后, 缓存被清空但是事务日志不会清空,执行完刷新api之后这些在内存缓冲区的文档被写入到一个新的段中,且没有进行磁盘同步操作。这个段被打开,使其可被搜索,内存缓冲区被清空。
- 这个节点继续工作,更多的文档被添加到内存缓冲区和追加到事务日志。
- 分片每隔30分钟时间或者当translog变得越来越大—索引被刷新(flush);一个新的 translog 被创建。旧的translog会被清空。索引被刷新(flush)代表着 清空当前事务日志,内存数据刷到新的段,把段内的提交点的数据持久化到磁盘;
索引并发
更新
针对索引的更新,我们知道是删除就文档,然后重新添加一个新文档。索引被写入磁盘后是不可改变的,不变带来好处就是无需锁控制更新在并发场景中,应为就不更新。第二个就是缓存方便使用,由于是不变的 当你内存足够大,加载到内存以后就可以不常驻,减少了磁盘IO的交互,当然也有不好的点,就是带来了你不能修改它。如果你需要让一个新的文档 可被搜索,你需要重建整个索引。这要么对一个索引所能包含的数据量造成了很大的限制,要么对索引可被更新的频率造成了很大的限制。
当我们多个请求来去执行更改索引文档的时候,会出现多种不可预估的状态,可能它覆盖你 也可能你覆盖他,或者操作失败;这也就是并发问题。全局锁,文档锁,是Elasticsearch提供的并发更新的解决方案,当然你也可以引入第三方的分布式锁来实现,但是es本身就可以,你这样显的ES像个铁憨憨。
- 全局锁
你可以理解为独占锁,在es中制定一个锁的全局索引。当有并发访问的出现,先去创建一个索引,允许创建则代表拿到锁,创建失败代表获取锁失败。拿到锁之后 执行写入业务的索引,执行完毕之后释放。也就是删除对应的全局索引。
弊端嘛就是粒度太粗,全局索引变更的频繁程度以及时间消耗,会对系统造成大幅度的性能限制。 我们可以通过让我们的锁更细粒度的方式来增加并行度。 - 文档锁
为了确保文档的旧版本不会覆盖新版本,对文档执行的每个操作都由协调该更改的主分片分配一个序列号。
序列号随着每次操作的增加而增加,因此更新的操作保证拥有比旧操作更高的序列号。Elasticsearch然后可以使用操作的序列号,以确保更新的文档版本不会被分配给它的序列号更小的更改覆盖。
例如我们索引了一个文档1567;
PUT products/_doc/1567
{
"product" : "r2d2",
"details" : "A resourceful astromech droid"
}
可以在响应的_seq_no
和_primary_term
字段中看到分配的序列号和主term。
GET products/_doc/1567
{
"_shards": {
"total": 2,
"failed": 0,
"successful": 1
},
"_index": "products",
"_type": "_doc",
"_id": "1567",
"_version": 1,
"_seq_no": 362,
"_primary_term": 2,
"result": "created"
}
那么我们更新的时候,就可以使用这俩个字段来控制并发访问,这可以通过设置索引API、更新API或删除API的if_seq_no
和if_primary_term
参数来实现。
PUT products/_doc/1567?if_seq_no=362&if_primary_term=2
{
"product": "r2d2",
"details": "A resourceful astromech droid",
"tags": [ "droid" ]
}