mongodb 没有官方的游标滚动实现深度分页功能,建议的都是选择出一个字段,如_id,然后每次查询时限制该字段,而不进行分页处理。
也没有看到更优的实现方式,本文做一个大胆的假设,自行实现滚动分页功能。供大家思路参考。
但是猜想可以自行实现一个,简单思路就是,第一次查询时不带limit进行查询全量数据,然后自己通过cursor迭代出需要的行数后返回调用端,下次再调用时,直接取出上一次的cursor,再迭代limit的数量返回。
优势是只需计算一次,后续就直接复用结果即可。该功能需要有mongodb的clientSession功能支持。
但是需要复杂的自己维护cursor实例,打开、关闭、过期等。稍微管理不好,可能就客户端内存泄漏或者mongo server内存泄漏。
实践步骤:
1. 引入mongo 驱动:
<!-- https://mvnrepository.com/artifact/org.mongodb/mongodb-driver-sync -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
<version>4.4.2</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-core</artifactId>
<version>4.4.2</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>bson</artifactId>
<version>4.4.2</version>
</dependency>
注意版本不匹配问题,所以要引入多个包。
2. 创建测试类:
验证接入mongo无误,且造入适量的数据。
import static com.mongodb.client.model.Filters.eq;
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.WriteConcern;
import com.mongodb.client.*;
import com.mongodb.client.result.InsertOneResult;
import org.bson.Document;
import org.junit.Before;
import org.junit.Test;
import org.openjdk.jmh.annotations.Setup;
public class MongoQuickStartTest {
private MongoClient mongoClient;
@Before
public void setup() {
// Replace the placeholder with your MongoDB deployment's connection string
String uri = "mongodb://localhost:27017";
MongoClientSettings options = MongoClientSettings.builder()
.applyConnectionString(new ConnectionString(uri))
.writeConcern(WriteConcern.W1).build();
mongoClient = MongoClients.create(options);
}
@Test
public void testFind() {
// ConnectionString connectionString = new ConnectionString("mongodb://localhost:27017");
// MongoClient mongoClient = MongoClients.create(connectionString);
// Replace the placeholder with your MongoDB deployment's connection string
MongoDatabase database = mongoClient.getDatabase("local");
MongoCollection<Document> collection = database.getCollection("test01");
Document doc = collection.find(eq("name", "zhangsan1")).first();
if (doc != null) {
System.out.println(doc.toJson());
} else {
System.out.println("No matching documents found.");
}
}
@Test
public void testInsert() {
Document body = new Document();
long startId = 60011122212L;
MongoDatabase database = mongoClient.getDatabase("local");
MongoCollection<Document> collection = database.getCollection("test01");
int i;
for (i = 0; i < 500000; i++) {
String id = (startId + i) + "";
body.put("_id", id);
body.put("name", "name_" + id);
body.put("title", "title_" + id);
InsertOneResult result = collection.insertOne(body);
}
System.out.println("insert " + i + " rows");
}
}
3. 创建cursor的分页查询实现类
基于springboot创建 controller进行会话测试,使用一个固定的查询语句进行分页测试。
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.WriteConcern;
import com.mongodb.client.*;
import org.bson.Document;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Service
public class MongoDbService {
private MongoClient mongoClient;
// 所有游标容器,简单测试,真正的管理很复杂
private Map<String, MongoCursor<Document>> cursorHolder
= new ConcurrentHashMap<>();
public void ensureMongo() {
// Replace the placeholder with your MongoDB deployment's connection string
String uri = "mongodb://localhost:27017";
MongoClientSettings options = MongoClientSettings.builder()
.applyConnectionString(new ConnectionString(uri))
.writeConcern(WriteConcern.W1).build();
mongoClient = MongoClients.create(options);
}
// 特殊实现的 cursor 滚动查询
public List<Document> findDataWithCursor(String searchAfter, int limit) {
ensureMongo();
MongoDatabase database = mongoClient.getDatabase("local");
MongoCollection<Document> collection = database.getCollection("test01");
List<Document> resultList = new ArrayList<>();
MongoCursor<Document> cursor = cursorHolder.get(searchAfter);
if(cursor == null) {
// 第一次取用需要查询,后续直接复用cursor即可
cursor = collection.find().sort(new Document("name", 1)).iterator();
cursorHolder.put(searchAfter, cursor);
}
int i = 0;
// 自行计数,到达后即返回前端
while (cursor.hasNext()) {
resultList.add(cursor.next());
if(++i >= limit) {
break;
}
}
if(!cursor.hasNext()) {
cursor.close();
cursorHolder.remove(searchAfter);
}
return resultList;
}
}
应用调用controller:
@Resource
private MongoDbService mongoDbService;
@GetMapping("/mongoPageScroll")
@ResponseBody
public Object mongoPageScroll(@RequestParam(required = false) String params,
@RequestParam String scrollId) {
return mongoDbService.findDataWithCursor(scrollId, 9);
}
4. 测试方式或使用方法
测试方式分为首次查询和下一页查询,首次访问接口:http://localhost:8080/hello/mongoPageScroll?scrollId=c,然后反复调用(下一页)。
如此,只要前端第一次查询时,不存在cursor就创建,后续就直接使用原来的结果。第一次可能慢,第二次就很快了。
结论,是可以简单实现的,但是生产不一定能用。因为,如何管理cursor,绝对是个超级复杂的事,何时打开,何时关闭,超时处理,单点故障,机器宕机等,很难解决。思路仅参考!
5. search_after机制实现
而同样的事情如果交给db server也许是容易些的,但遇到的困难也很多,主要更多了一个内存过大问题很难处理,所以es的高版本实现已经把 scroll 机制去除了。
es的高版本去除了scroll机制,而是替换成了search_after机制。那么search_after机制又有什么不同呢?其表象是每次查询下一页时把最后一条记录的sort字段携带上,然后就不再skip记录了,而是直接取limit条即可。那么它的底层原理是什么呢?缓存机制?查询语句改写?
具体方式后面再细细研究,如果是语义改写,我们是可以做点什么的。如果是缓存机制则可能要放弃了。
下面给出一点语义改写的思路:
1. 如果是单个字段,那么相对简单,只要新生成一个排序字段和_id字段组合串,用户下次查询时带上就可以了,但是要求两个排序的方向一致,即单方向,从而下次偏移时知道是大于还是小于了;比如如果asc,那么下次的语义改写就是添加一个条件: and _id > 'last_id'; 而如果是desc,那么语义改写就是: and _id < 'last_id';
2. 如果是复合字段,如果方向相同,可以参考第一点(仅参考,实际是不能应用的),如果是多方向的,那么就不能简单的使用><进行偏移了;简单来说可能就是取反逻辑,但如何取反却是很难的。
比如以2个字段排序为例:
原始排序依据是:order by fd1 asc, fd2 desc;
首先要保证准确的排序展现,后端必须隐形默默地加上_id排序,即会变成:order by fd1 asc, fd2 desc, _id asc; 但为说清楚原理简单起见,这种情况不在我们的理论讨论范围内。即忽略,假设每条记录都可以通过排序字段区分出来。
那么,语义改写则可能是:and ( (fd1 > 'last_fd1') or (fd1 = 'last_fd1' and fd2 > 'last_fd2') )
比如以3个字段排序为例:
原始排序依据是:order by fd1 asc, fd2 desc, fd3 asc;
那么,语义改写则可能是:and ( (fd1 > 'last_fd1') or (fd1 = 'last_fd1' and fd2 < 'last_fd2') or (fd1 = 'last_fd1' and fd2 = 'last_fd2' and fd3 > 'last_fd3'))
更多字段依此类推,只要以下几种情况都是可以的:
1. 第一字段满足,停止;
2. 第一+第二字段满足,停止;
3. 第一+第二字段+第三字段满足,停止;
4. 更多。。。 即有几个排序字体就有几个改写的可能;
这种改写与skip有什么差别吗?还是有的,skip的实现方式是先找到所有数据,再跳过。而这种改写是缩小了结果集范围,减少了运算量,效果应该是要好一点的。更优化的方式是,在排序字段上加上索引,那么性能就差别更大了,就像前面的_id字段优化,已成为了最佳实践。