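Introduction
Quickwit can ingest data into an index from one or more sources. Once an index has been created, sources are added with the CLI command quickwit source create. Supported source types include file, kafka, kinesis, and pulsar.
This chapter explains how to create a Kafka source in the Quickwit search engine and ingest the data stream from a Kafka topic. Note that every message read from the Kafka stream must contain a JSON object; this walkthrough, like the official tutorial, only covers importing JSON-formatted data.
Official documentation for the Kafka source: Kafka | Quickwit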
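Creating the Quickwit index
A Docker container running the Quickwit service had already been set up on the Docker host, so the installation process is not covered here; see "Installing the Quickwit Search Engine with Docker".
Official example YAML configuration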
#
# Index config file for gh-archive dataset.
#
version: 0.8
index_id: gh-archive
doc_mapping:
  field_mappings:
    - name: id
      type: text
      tokenizer: raw
    - name: type
      type: text
      fast: true
      tokenizer: raw
    - name: public
      type: bool
      fast: true
    - name: payload
      type: json
      tokenizer: default
    - name: org
      type: json
      tokenizer: default
    - name: repo
      type: json
      tokenizer: default
    - name: actor
      type: json
      tokenizer: default
    - name: other
      type: json
      tokenizer: default
    - name: created_at
      type: datetime
      fast: true
      input_formats:
        - rfc3339
      fast_precision: seconds
  timestamp_field: created_at
indexing_settings:
  commit_timeout_secs: 10
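The created_at mapping above accepts RFC 3339 strings (input_formats: rfc3339) and keeps the fast-field value at second precision (fast_precision: seconds). As a rough client-side illustration only, the hypothetical helper below parses a GH Archive style timestamp as an RFC 3339 value and truncates it to whole seconds; it mimics the idea of the mapping and is not Quickwit's own code.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.temporal.ChronoUnit;

// Hypothetical helper: illustrates what the created_at mapping implies,
// not how Quickwit implements it internally.
class CreatedAtCheck {

    // Parses an RFC 3339 timestamp such as "2022-05-12T10:00:00Z" or "2022-05-12T18:00:00+08:00".
    // Throws java.time.format.DateTimeParseException for other formats.
    static Instant parseRfc3339(String value) {
        return OffsetDateTime.parse(value).toInstant();
    }

    // Drops sub-second digits, roughly mirroring fast_precision: seconds.
    static Instant toFastPrecisionSeconds(Instant instant) {
        return instant.truncatedTo(ChronoUnit.SECONDS);
    }
}
```

For example, "2022-05-12T10:00:03.456Z" becomes 2022-05-12T10:00:03Z at second precision, while a value such as "12/05/2022" is rejected by the parser.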
Alternatively, download the official example YAML file and create the index with the Quickwit CLI:
# Download GH Archive index config.
wget -O gh-archive.yaml https://raw.githubusercontent.com/quickwit-oss/quickwit/main/config/tutorials/gh-archive/index-config.yaml
# Create index.
./quickwit index create --index-config gh-archive.yaml
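The CLI command above has a REST counterpart, POST api/v1/indexes. The sketch below builds (but does not send) that request from the same gh-archive.yaml file. It assumes the server accepts a YAML body when Content-Type is application/yaml, as recent Quickwit REST API documentation describes; check this against your Quickwit version, or send the JSON form used later in this article instead. The class name is hypothetical.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;

// Sketch: builds the REST call equivalent to
// "quickwit index create --index-config gh-archive.yaml".
// Assumption: the server accepts a YAML body with Content-Type: application/yaml.
class CreateIndexFromYaml {

    static HttpRequest buildRequest(String quickwitUrl, Path yamlFile) throws IOException {
        String yaml = Files.readString(yamlFile, StandardCharsets.UTF_8);
        return HttpRequest.newBuilder()
                .uri(URI.create(quickwitUrl + "api/v1/indexes"))
                .header("Content-Type", "application/yaml")
                .timeout(Duration.ofSeconds(5))
                .POST(HttpRequest.BodyPublishers.ofString(yaml, StandardCharsets.UTF_8))
                .build();
    }
}
```

The request can then be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).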
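Installing Kafka
Kafka installation is outside the scope of this article and is skipped here; refer to the Kafka documentation or other installation guides.
Downloading Kafka test data
The test data comes from the official Quickwit tutorial; see: Kafka | Quickwit
Create a Kafka topic, download the JSON test data provided by the tutorial, and push the data into the topic: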
# Create a topic named `gh-archive` with 3 partitions.
bin/kafka-topics.sh --create --topic gh-archive --partitions 3 --bootstrap-server localhost:9092
# Download a few GH Archive files.
wget https://data.gharchive.org/2022-05-12-{10..15}.json.gz
# Load the events into Kafka topic.
gunzip -c 2022-05-12*.json.gz | \
bin/kafka-console-producer.sh --topic gh-archive --bootstrap-server localhost:9092
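The gunzip pipeline above streams each line of the archives (one GitHub event per line) to the console producer. The same decompression can be done in Java with GZIPInputStream, so the .json.gz file does not have to be unpacked to disk first. The helper below is a hypothetical sketch; it hands each non-blank line to a caller-supplied sink, which in this article's setup would be the producer's send call.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Consumer;
import java.util.zip.GZIPInputStream;

// Sketch: reads a GH Archive .json.gz file line by line (one JSON event per line),
// similar to "gunzip -c file.json.gz".
class GzipLineReader {

    // Passes every non-blank line to the sink and returns how many lines were passed.
    static int forEachLine(Path gzFile, Consumer<String> sink) throws IOException {
        int count = 0;
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(Files.newInputStream(gzFile)), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.isBlank()) {
                    continue; // skip empty lines: every Kafka message must hold a JSON object
                }
                sink.accept(line);
                count++;
            }
        }
        return count;
    }
}
```

Streaming keeps memory use flat, whereas reading the whole decompressed file into a List first holds every event in memory at once.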
Because the archives are large, only one of them (the hourly file for 10:00 on 2022-05-12) was downloaded locally:
curl https://data.gharchive.org/2022-05-12-10.json.gz -o 2022-05-12-10.json.gz
Creating the Quickwit source
Example Kafka source YAML configuration
#
# Kafka source config file.
#
version: 0.8
source_id: kafka-source
source_type: kafka
num_pipelines: 2
params:
  topic: gh-archive
  client_params:
    bootstrap.servers: localhost:9092
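The same source can be expressed as a JSON payload for the REST API (the Java test class later in this article sends one). The hypothetical helper below renders the YAML fields above (source_id, source_type, num_pipelines, params.topic, params.client_params) as JSON. It uses plain string concatenation with no JSON escaping, so it assumes simple values without quotes or backslashes.

```java
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper: renders the Kafka source config above as a JSON string.
// No JSON escaping is performed; values must not contain quotes or backslashes.
class KafkaSourceConfig {

    static String toJson(String sourceId, String topic, int numPipelines, Map<String, String> clientParams) {
        // Sort client params by key so the output is deterministic
        String params = clientParams.entrySet().stream()
                .sorted(Map.Entry.comparingByKey())
                .map(e -> "\"" + e.getKey() + "\": \"" + e.getValue() + "\"")
                .collect(Collectors.joining(", "));
        return "{\"version\": \"0.8\", \"source_id\": \"" + sourceId + "\", "
                + "\"source_type\": \"kafka\", \"num_pipelines\": " + numPipelines + ", "
                + "\"params\": {\"topic\": \"" + topic + "\", \"client_params\": {" + params + "}}}";
    }
}
```

For real payloads a JSON library would be the safer choice; the string version only shows how the YAML keys map onto the JSON structure.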
Alternatively, download the official kafka-source.yaml file and use the Quickwit CLI to create the Kafka source for the gh-archive index:
# Download Kafka source config.
wget https://raw.githubusercontent.com/quickwit-oss/quickwit/main/config/tutorials/gh-archive/kafka-source.yaml
# Create source.
./quickwit source create --index gh-archive --source-config kafka-source.yaml
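Once these steps are complete, the Quickwit indexer starts indexing pipelines for the source. They connect to the topic configured in the Kafka source, consume records from the topic's partitions, and stream them into the index.
Pushing data to the Kafka topic from Java
Below is example code for sending messages to Kafka from Java. The helper class is described in "A Java Utility Class for the Kafka Messaging Service" and is not covered in detail here; the snippet only demonstrates how it is used.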
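// Producer: send each line of the file as one Kafka message.
// FileUtils is from Apache Commons IO, RandomUtils from Apache Commons Lang 3;
// KafkaUtils is the author's helper class mentioned above.
KafkaUtils.KafkaStreamServer kafkaStreamServer = KafkaUtils.bulidServer().createKafkaStreamServer("192.168.1.3", 9092);
String topic = "gh-archive";
int n = 0;
// Decompressed copy of 2022-05-12-10.json.gz: one JSON event per line
List<String> lines = FileUtils.readLines(new File("D:\\test\\kafka\\2022-05-12-10.json"), "UTF-8");
for (String line : lines) {
    // Guard against lines shorter than 30 characters
    System.out.println("Sending message: " + line.substring(0, Math.min(30, line.length())) + " ...");
    // Send the line to the Kafka topic
    kafkaStreamServer.sendMsg(topic, line);
    // Pause briefly after every 1,000 messages to throttle the producer
    if (n > 0 && n % 1000 == 0) {
        TimeUnit.MILLISECONDS.sleep(RandomUtils.nextInt(1, 200));
    }
    n++;
}
// Total pushed to Kafka: 156040 messages
System.out.println("Total messages pushed to Kafka: " + n);
kafkaStreamServer.close();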
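The throttling in the loop above is meant to pause briefly after every 1,000 messages. A minimal, broker-independent sketch of that pattern follows: the sender and the sleeper are passed in as functions, so the batching logic can be checked without Kafka. The class and parameter names are hypothetical; with the article's helper, the sender would wrap kafkaStreamServer.sendMsg(topic, line).

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongConsumer;

// Sketch of the throttling idea: send every line and pause after each full batch.
// The sender and sleeper are hypothetical stand-ins injected by the caller.
class ThrottledSender {

    // Returns the number of lines sent.
    static int sendAll(List<String> lines, int batchSize, long pauseMillis,
                       Consumer<String> sender, LongConsumer sleeper) {
        int sent = 0;
        for (String line : lines) {
            sender.accept(line);
            sent++;
            if (sent % batchSize == 0) {
                sleeper.accept(pauseMillis); // pause after every batchSize messages
            }
        }
        return sent;
    }
}
```

A real sleeper could be ms -> { try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }.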
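Because the official test data is large, the JSON archives were downloaded in advance and only one of them, 2022-05-12-10.json.gz, was used; it was decompressed to the .json file read by the code above.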
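Creating the Quickwit index and Kafka source from Java
Quickwit provides a comprehensive REST API, so indexes and sources can be created and maintained, and indexes queried, over HTTP. The following examples make these HTTP calls from a Java program.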
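The test class below calls several REST endpoints under api/v1/indexes. As a compact reference, this hypothetical helper collects those paths in one place, relative to the server root (for example http://192.168.1.3:7280/). The paths are the ones used in the test class; the toggle body follows the Quickwit REST API's {"enable": true|false} payload.

```java
// Sketch: Quickwit REST endpoints used in this section. The class name is hypothetical.
class QuickwitEndpoints {

    private final String base;

    QuickwitEndpoints(String base) {
        // Normalize so that paths can be appended directly
        this.base = base.endsWith("/") ? base : base + "/";
    }

    String indexes() { return base + "api/v1/indexes"; }                          // POST: create index
    String index(String indexId) { return indexes() + "/" + indexId; }             // DELETE: delete index
    String describe(String indexId) { return index(indexId) + "/describe"; }       // GET: index description
    String sources(String indexId) { return index(indexId) + "/sources"; }         // POST: create source
    String source(String indexId, String sourceId) { return sources(indexId) + "/" + sourceId; } // DELETE
    String toggle(String indexId, String sourceId) { return source(indexId, sourceId) + "/toggle"; } // PUT
    String resetCheckpoint(String indexId, String sourceId) {                      // PUT
        return source(indexId, sourceId) + "/reset-checkpoint";
    }

    // Body for the toggle endpoint: {"enable": true} enables, {"enable": false} disables
    static String toggleBody(boolean enable) { return "{\"enable\": " + enable + "}"; }
}
```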
QuickwitKafkaSourceTest.java
package com.example;

import org.junit.jupiter.api.Test;

// Text blocks (""") require Java 15 or later; the tests use JUnit 5.
public class QuickwitKafkaSourceTest {

    private static final String QUICKWIT_URL = "http://192.168.1.3:7280/";

    /**
     * Create the index.
     * @throws Exception
     */
    @Test
    public void createIndex() throws Exception {
        String indexConf = """
                {
                  "version": "0.8",
                  "index_id": "gh-archive",
                  "doc_mapping": {
                    "field_mappings": [
                      { "name": "id", "type": "text", "tokenizer": "raw" },
                      { "name": "type", "type": "text", "fast": true, "tokenizer": "raw" },
                      { "name": "public", "type": "bool", "fast": true },
                      { "name": "payload", "type": "json", "tokenizer": "default" },
                      { "name": "org", "type": "json", "tokenizer": "default" },
                      { "name": "repo", "type": "json", "tokenizer": "default" },
                      { "name": "actor", "type": "json", "tokenizer": "default" },
                      { "name": "other", "type": "json", "tokenizer": "default" },
                      {
                        "name": "created_at",
                        "type": "datetime",
                        "fast": true,
                        "input_formats": ["rfc3339"],
                        "fast_precision": "seconds"
                      }
                    ],
                    "timestamp_field": "created_at"
                  },
                  "indexing_settings": {
                    "commit_timeout_secs": 10
                  }
                }
                """;
        System.out.println("Response: " + HttpUtils.build().httpPost(QUICKWIT_URL + "api/v1/indexes", indexConf));
    }

    /**
     * Create a source. Supported source types include kafka, kinesis, file and pulsar.
     * @throws Exception
     */
    @Test
    public void createSources() throws Exception {
        String indexId = "gh-archive";
        // bootstrap.servers must point to the same broker the producer writes to (192.168.1.3:9092).
        // "auto.offset.reset": "earliest" reads each partition from the beginning when no position is stored yet.
        String sourceConfig = """
                {
                  "version": "0.8",
                  "source_id": "kafka-source",
                  "source_type": "kafka",
                  "num_pipelines": 1,
                  "input_format": "json",
                  "params": {
                    "topic": "gh-archive",
                    "client_params": {
                      "auto.offset.reset": "earliest",
                      "bootstrap.servers": "192.168.1.3:9092"
                    }
                  }
                }
                """;
        System.out.println("Response: " + HttpUtils.build().httpPost(QUICKWIT_URL + "api/v1/indexes/" + indexId + "/sources", sourceConfig));
    }
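
    /**
     * Enable or disable (toggle) a source of the index.
     * @throws Exception
     */
    @Test
    public void indexesSourceToggle() throws Exception {
        String indexId = "gh-archive";
        String sourceId = "kafka-source";
        // The request body sets the desired state: {"enable": true} or {"enable": false}
        String body = "{\"enable\": true}";
        System.out.println("Response: " + HttpUtils.build().http(QUICKWIT_URL + "api/v1/indexes/" + indexId + "/sources/" + sourceId + "/toggle", "PUT", body));
    }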

    /**
     * Reset the checkpoint of a source bound to the index
     * (clears the read progress Quickwit has recorded for it).
     * @throws Exception
     */
    @Test
    public void indexesSourceReset() throws Exception {
        String indexId = "gh-archive";
        String sourceId = "kafka-source";
        System.out.println("Response: " + HttpUtils.build().http(QUICKWIT_URL + "api/v1/indexes/" + indexId + "/sources/" + sourceId + "/reset-checkpoint", "PUT", ""));
    }

    /**
     * Delete a source bound to the index.
     * @throws Exception
     */
    @Test
    public void indexesSourceDelete() throws Exception {
        String indexId = "gh-archive";
        // Must match the source_id used in createSources()
        String sourceId = "kafka-source";
        System.out.println("Response: " + HttpUtils.build().http(QUICKWIT_URL + "api/v1/indexes/" + indexId + "/sources/" + sourceId, "DELETE", null));
    }

    /**
     * Get descriptive information about the index.
     * @throws Exception
     */
    @Test
    public void getIndexesDescribe() throws Exception {
        String indexId = "gh-archive";
        System.out.println("Response: " + HttpUtils.build().httpGet(QUICKWIT_URL + "api/v1/indexes/" + indexId + "/describe"));
    }

    /**
     * Delete the index data and all of its metadata.
     * @throws Exception
     */
    @Test
    public void deleteIndexes() throws Exception {
        String indexId = "gh-archive";
        System.out.println("Response: " + HttpUtils.build().http(QUICKWIT_URL + "api/v1/indexes/" + indexId, "DELETE", null));
    }
}
HttpUtils.java
package com.example;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;

public class HttpUtils {

    private final HttpClient httpClient;

    public static HttpUtils build() {
        return new HttpUtils();
    }

    private HttpUtils() {
        httpClient = HttpClient.newHttpClient();
    }

    public String httpPost(String url, String jsonData) throws Exception {
        return http(url, "POST", jsonData);
    }

    public String httpGet(String url) throws Exception {
        return http(url, "GET", null);
    }

    public String http(String url, String method, String jsonData) throws Exception {
        System.out.println("Request: " + url);
        // A null body is sent as an empty body (avoids a NullPointerException for PUT/POST)
        String body = jsonData == null ? "" : jsonData;
        HttpRequest.Builder builder = HttpRequest.newBuilder()
                .uri(new URI(url))
                .header("Content-Type", "application/json; charset=UTF-8")
                // Client-side request timeout; a "Timeout" header would only be sent to the server
                .timeout(Duration.ofMillis(5000));
        HttpRequest request;
        if ("POST".equals(method)) {
            request = builder.POST(HttpRequest.BodyPublishers.ofString(body, StandardCharsets.UTF_8)).build();
        } else if ("PUT".equals(method)) {
            request = builder.PUT(HttpRequest.BodyPublishers.ofString(body, StandardCharsets.UTF_8)).build();
        } else if ("DELETE".equals(method)) {
            request = builder.DELETE().build();
        } else {
            request = builder.GET().build();
        }
        HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("Status code: " + response.statusCode());
        return response.body();
    }
}
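HttpUtils prints the status code but returns the body either way, so a JUnit test still passes when Quickwit answers with an error such as 400. A small check like the hypothetical helper below makes the test fail instead; it is a sketch and not part of the original utility class.

```java
import java.io.IOException;

// Sketch: fail fast on non-2xx responses so a JUnit test calling the Quickwit API
// fails instead of only printing the error body. The helper name is hypothetical.
class ResponseCheck {

    static String requireSuccess(int statusCode, String body) throws IOException {
        if (statusCode < 200 || statusCode >= 300) {
            throw new IOException("Quickwit returned HTTP " + statusCode + ": " + body);
        }
        return body;
    }
}
```

Inside HttpUtils.http(), the last line could then become return ResponseCheck.requireSuccess(response.statusCode(), response.body());.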
After running the createIndex() and createSources() test methods in your IDE, the index is created and bound to the Kafka source.
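Searching the Kafka data
After the data was pushed to Kafka, the Redpanda Console tool (not covered in detail here; see the official "redpanda-console-kafka-ui") showed the Kafka topic at about 438 MB, and the JSON documents contain several levels of nested JSON.
Open the Quickwit UI in a browser at http://127.0.0.1:7280 and go to Quickwit UI > Indexes to see the newly created gh-archive index.
Click the gh-archive index and open SOURCES to see the configured Kafka source.
The Quickwit indexer reads the data stream from the Kafka source and writes it into the specified index; the stored data can then be queried in the Query editor.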
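Besides the Query editor, the index can be queried through Quickwit's REST search endpoint, GET api/v1/<index id>/search, which takes the query in the query parameter; nested JSON fields are addressed with dot notation (for example actor.login). The sketch below only builds the URL. The class name and the example query are illustrative, and only the query and max_hits parameters are used.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Sketch: builds a search URL for GET api/v1/<index id>/search.
// The class name is hypothetical; only query and max_hits are set.
class SearchUrl {

    static URI build(String base, String indexId, String query, int maxHits) {
        String root = base.endsWith("/") ? base : base + "/";
        // URLEncoder encodes spaces as "+", so rewrite them as %20 (a literal "+" is already %2B)
        String encoded = URLEncoder.encode(query, StandardCharsets.UTF_8).replace("+", "%20");
        return URI.create(root + "api/v1/" + indexId + "/search?query=" + encoded + "&max_hits=" + maxHits);
    }
}
```

For example, SearchUrl.build("http://127.0.0.1:7280", "gh-archive", "type:PushEvent AND public:true", 10) targets the type and public fields from the index config and can be sent with HttpUtils.build().httpGet(...).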
References:
REST API | Quickwit