忘机山人
> 本章导读:本章将详细介绍 Elasticsearch 的 CRUD(创建、读取、更新、删除)操作,包括索引管理、文档操作、批量处理和数据迁移等核心功能,帮助读者掌握 ES 数据操作的基础技能。
https://appstore.lazycat.cloud/#/shop/detail/xu.deploy.elasticsearch
## 目录
- [索引操作](#索引操作)
- [文档操作](#文档操作)
- [批量操作](#批量操作)
- [更新操作详解](#更新操作详解)
- [数据迁移](#数据迁移)
- [实践示例](#实践示例)
- [本章小结](#本章小结)
- [参考资料](#参考资料)
## 索引操作
索引是 Elasticsearch 中存储数据的逻辑容器。在进行文档操作之前,通常需要先创建和配置索引。
### 创建索引
#### 基本创建
```json
// 创建一个简单的索引
PUT /products
```
#### 带设置的创建
```json
// 创建索引并指定设置
PUT /products
{
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1,
"refresh_interval": "1s"
}
}
```
#### 带映射的创建
```json
// 创建索引并定义映射
PUT /products
{
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"name": {
"type": "text",
"analyzer": "standard",
"fields": {
"keyword": {
"type": "keyword"
}
}
},
"price": {
"type": "double"
},
"category": {
"type": "keyword"
},
"description": {
"type": "text"
},
"stock": {
"type": "integer"
},
"created_at": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
},
"is_available": {
"type": "boolean"
},
"tags": {
"type": "keyword"
}
}
}
}
```
### 查看索引
```json
// 查看单个索引信息
GET /products
// 查看索引设置
GET /products/_settings
// 查看索引映射
GET /products/_mapping
// 查看所有索引(表格格式)
GET /_cat/indices?v
// 查看索引详细信息
GET /_cat/indices/products?v&h=index,health,status,pri,rep,docs.count,store.size
```
### 更新索引设置
```json
// 更新动态设置
PUT /products/_settings
{
"index": {
"number_of_replicas": 2,
"refresh_interval": "30s"
}
}
// 关闭索引后更新静态设置
POST /products/_close
PUT /products/_settings
{
"index": {
"analysis": {
"analyzer": {
"my_analyzer": {
"type": "custom",
"tokenizer": "standard",
"filter": ["lowercase"]
}
}
}
}
}
POST /products/_open
```

### 删除索引
```json
// 删除单个索引
DELETE /products
// 删除多个索引
DELETE /products,orders
// 使用通配符删除(谨慎使用)
DELETE /logs-2024-*
// 删除所有索引(极度危险,生产环境禁用)
// DELETE /_all
// DELETE /*
```
> ⚠️ **警告**:删除索引是不可逆操作,请确保已备份重要数据。生产环境建议设置 `action.destructive_requires_name: true` 禁止通配符删除(Elasticsearch 8.0 起该设置默认即为 `true`,此时使用通配符或 `_all` 删除索引的请求会被拒绝)。
### 索引别名
别名是指向一个或多个索引的虚拟名称,可用于零停机切换索引。注意:别名不能与已存在的索引同名,下面的示例假设实际索引为 `products_v1`、`products_v2`,且集群中不存在名为 `products` 的索引。
```json
// 创建别名
POST /_aliases
{
"actions": [
{
"add": {
"index": "products_v1",
"alias": "products"
}
}
]
}
// 切换别名(原子操作)
POST /_aliases
{
"actions": [
{ "remove": { "index": "products_v1", "alias": "products" } },
{ "add": { "index": "products_v2", "alias": "products" } }
]
}
// 查看别名
GET /_cat/aliases?v
GET /products/_alias
```
### 索引开关
```json
// 关闭索引(节省资源,不可读写)
POST /products/_close
// 打开索引
POST /products/_open
// 查看索引状态
GET /_cat/indices/products?v
```
## 文档操作
### 创建文档
#### 指定 ID 创建
```json
// 使用 PUT 指定文档 ID
PUT /products/_doc/1
{
"name": "iPhone 15 Pro",
"price": 8999,
"category": "手机",
"description": "Apple 最新旗舰手机,搭载 A17 Pro 芯片",
"stock": 100,
"created_at": "2024-01-15",
"is_available": true,
"tags": ["苹果", "5G", "旗舰"]
}
```
响应示例:
```json
{
"_index": "products",
"_id": "1",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 2,
"failed": 0
},
"_seq_no": 0,
"_primary_term": 1
}
```
#### 自动生成 ID
```json
// 使用 POST 自动生成 ID
POST /products/_doc
{
"name": "MacBook Pro 14",
"price": 14999,
"category": "笔记本",
"description": "Apple M3 Pro 芯片,专业级性能",
"stock": 50,
"created_at": "2024-01-16",
"is_available": true,
"tags": ["苹果", "专业", "M3"]
}
```
#### 强制创建(防止覆盖)
```json
// 使用 _create 端点,如果文档已存在则报错
PUT /products/_create/1
{
"name": "iPhone 15",
"price": 7999
}
// 或使用 op_type 参数
PUT /products/_doc/1?op_type=create
{
"name": "iPhone 15",
"price": 7999
}
```
如果文档已存在,返回 409 冲突错误:
```json
{
"error": {
"type": "version_conflict_engine_exception",
"reason": "[1]: version conflict, document already exists"
},
"status": 409
}
```
### 读取文档
#### 获取单个文档
```json
// 获取完整文档
GET /products/_doc/1
// 只获取 _source 字段
GET /products/_source/1
// 获取指定字段
GET /products/_doc/1?_source_includes=name,price
// 排除指定字段
GET /products/_doc/1?_source_excludes=description
```
响应示例:
```json
{
"_index": "products",
"_id": "1",
"_version": 1,
"_seq_no": 0,
"_primary_term": 1,
"found": true,
"_source": {
"name": "iPhone 15 Pro",
"price": 8999,
"category": "手机",
"description": "Apple 最新旗舰手机,搭载 A17 Pro 芯片",
"stock": 100,
"created_at": "2024-01-15",
"is_available": true,
"tags": ["苹果", "5G", "旗舰"]
}
}
```
#### 检查文档是否存在
```json
// 使用 HEAD 请求检查文档存在性
HEAD /products/_doc/1
```
存在返回 `200 OK`,不存在返回 `404 Not Found`。
#### 批量获取文档
```json
// 使用 _mget 批量获取
GET /products/_mget
{
"ids": ["1", "2", "3"]
}
// 跨索引批量获取
GET /_mget
{
"docs": [
{ "_index": "products", "_id": "1" },
{ "_index": "orders", "_id": "100" }
]
}
// 指定返回字段(请求体顶层只支持 docs / ids,字段过滤需通过 URL 参数,或在 docs 数组的每个元素中指定 _source)
GET /products/_mget?_source_includes=name,price
{
"ids": ["1", "2"]
}
```
### 更新文档
#### 部分更新
```json
// 使用 _update API 部分更新
POST /products/_update/1
{
"doc": {
"price": 8499,
"stock": 90
}
}
```
#### 使用脚本更新
```json
// 使用 Painless 脚本更新
POST /products/_update/1
{
"script": {
"source": "ctx._source.stock -= params.quantity",
"params": {
"quantity": 10
}
}
}
// 条件更新
POST /products/_update/1
{
"script": {
"source": """
if (ctx._source.stock >= params.quantity) {
ctx._source.stock -= params.quantity;
} else {
ctx.op = 'noop';
}
""",
"params": {
"quantity": 5
}
}
}
// 添加数组元素
POST /products/_update/1
{
"script": {
"source": "ctx._source.tags.add(params.tag)",
"params": {
"tag": "热销"
}
}
}
// 删除数组元素(先判断元素是否存在,否则 indexOf 返回 -1 会导致 remove 抛出异常)
POST /products/_update/1
{
"script": {
"source": "if (ctx._source.tags.contains(params.tag)) { ctx._source.tags.remove(ctx._source.tags.indexOf(params.tag)) }",
"params": {
"tag": "热销"
}
}
}
```
#### Upsert(更新或插入)
```json
// 如果文档存在则更新,不存在则创建
POST /products/_update/999
{
"doc": {
"price": 5999
},
"upsert": {
"name": "新商品",
"price": 5999,
"category": "其他",
"stock": 100,
"created_at": "2024-01-20",
"is_available": true
}
}
// 使用 scripted_upsert
POST /products/_update/999
{
"scripted_upsert": true,
"script": {
"source": """
if (ctx.op == 'create') {
ctx._source.name = params.name;
ctx._source.price = params.price;
ctx._source.view_count = 1;
} else {
ctx._source.view_count += 1;
}
""",
"params": {
"name": "新商品",
"price": 5999
}
},
"upsert": {}
}
```
#### 完全替换文档
```json
// 使用 PUT 完全替换(会删除未指定的字段)
PUT /products/_doc/1
{
"name": "iPhone 15 Pro Max",
"price": 9999,
"category": "手机"
}
```
### 删除文档
#### 删除单个文档
```json
// 根据 ID 删除
DELETE /products/_doc/1
```
响应示例:
```json
{
"_index": "products",
"_id": "1",
"_version": 2,
"result": "deleted",
"_shards": {
"total": 2,
"successful": 2,
"failed": 0
},
"_seq_no": 1,
"_primary_term": 1
}
```
#### 条件删除
```json
// 使用 Delete By Query 删除匹配的文档
POST /products/_delete_by_query
{
"query": {
"term": {
"is_available": false
}
}
}
// 删除某个类别的所有商品
POST /products/_delete_by_query
{
"query": {
"term": {
"category": "过期商品"
}
}
}
// 带冲突处理的删除
POST /products/_delete_by_query?conflicts=proceed
{
"query": {
"range": {
"created_at": {
"lt": "2023-01-01"
}
}
}
}
```
## 批量操作
Bulk API 允许在单个请求中执行多个索引、更新或删除操作,大幅提高数据处理效率。
### Bulk API 基本语法
Bulk 请求使用 NDJSON(Newline Delimited JSON)格式:每个 JSON 对象必须独占一行(不能格式化为多行),并且最后一行也必须以换行符 `\n` 结尾:
```
action_and_meta_data\n
optional_source\n
action_and_meta_data\n
optional_source\n
...
```
### 批量索引
```json
POST /products/_bulk
{"index":{"_id":"1"}}
{"name":"iPhone 15","price":7999,"category":"手机","stock":100}
{"index":{"_id":"2"}}
{"name":"MacBook Pro","price":14999,"category":"笔记本","stock":50}
{"index":{"_id":"3"}}
{"name":"iPad Pro","price":8999,"category":"平板","stock":80}
{"index":{"_id":"4"}}
{"name":"AirPods Pro","price":1899,"category":"配件","stock":200}
```
### 批量混合操作
```json
POST /_bulk
{"index":{"_index":"products","_id":"10"}}
{"name":"商品A","price":999}
{"create":{"_index":"products","_id":"11"}}
{"name":"商品B","price":1999}
{"update":{"_index":"products","_id":"1"}}
{"doc":{"price":7499}}
{"delete":{"_index":"products","_id":"99"}}
```
### Bulk 操作类型
| 操作 | 说明 | 是否需要文档体 |
|------|------|----------------|
| `index` | 创建或替换文档 | 是 |
| `create` | 创建文档(已存在则失败) | 是 |
| `update` | 部分更新文档 | 是 |
| `delete` | 删除文档 | 否 |
### Bulk 响应解析
```json
{
"took": 30,
"errors": false,
"items": [
{
"index": {
"_index": "products",
"_id": "1",
"_version": 1,
"result": "created",
"status": 201
}
},
{
"index": {
"_index": "products",
"_id": "2",
"_version": 1,
"result": "created",
"status": 201
}
}
]
}
```
### Bulk 性能优化
```json
// 1. 合理设置批量大小(建议 1000-5000 条或 5-15MB)
// 2. 不要在每次请求后强制刷新:保持 refresh=false(默认值),避免使用 refresh=true;大批量导入期间还可临时将 index.refresh_interval 设为 -1 以关闭周期性刷新
POST /products/_bulk?refresh=false
{"index":{"_id":"1"}}
{"name":"商品1","price":100}
{"index":{"_id":"2"}}
{"name":"商品2","price":200}
// 3. 批量完成后手动刷新
POST /products/_refresh
```
**Bulk 最佳实践**:
| 参数 | 建议值 | 说明 |
|------|--------|------|
| 批量大小 | 1000-5000 条 | 根据文档大小调整 |
| 请求大小 | 5-15 MB | 避免过大导致内存问题 |
| 并发数 | 2-4 | 根据集群能力调整 |
| refresh | false | 批量完成后统一刷新 |
### curl 命令示例
```bash
# 从文件批量导入
curl -X POST "localhost:9200/products/_bulk" \
-H "Content-Type: application/x-ndjson" \
--data-binary @products.ndjson
# products.ndjson 文件内容:
# {"index":{"_id":"1"}}
# {"name":"商品1","price":100}
# {"index":{"_id":"2"}}
# {"name":"商品2","price":200}
```
## 更新操作详解
### Update API
Update API 用于部分更新文档,只修改指定字段而不影响其他字段。
```json
// 基本部分更新
POST /products/_update/1
{
"doc": {
"price": 7499,
"updated_at": "2024-01-20"
}
}
// 检测是否有实际更改(detect_noop 是请求体参数,默认为 true;内容无变化时返回 "result": "noop")
POST /products/_update/1
{
"doc": {
"price": 7499
},
"detect_noop": true
}
// 强制更新(即使内容相同)
POST /products/_update/1
{
"doc": {
"price": 7499
},
"detect_noop": false
}
```
### Update By Query
Update By Query 用于批量更新匹配条件的文档。
```json
// 将所有手机类商品价格降低 10%
POST /products/_update_by_query
{
"query": {
"term": {
"category": "手机"
}
},
"script": {
"source": "ctx._source.price = ctx._source.price * 0.9",
"lang": "painless"
}
}
// 为所有商品添加新字段
POST /products/_update_by_query
{
"query": {
"match_all": {}
},
"script": {
"source": "ctx._source.updated_at = params.date",
"params": {
"date": "2024-01-20"
}
}
}
// 带限制的更新
POST /products/_update_by_query?scroll_size=1000&conflicts=proceed
{
"query": {
"range": {
"stock": {
"lt": 10
}
}
},
"script": {
"source": "ctx._source.is_available = false"
}
}
```
### Update By Query 参数
| 参数 | 说明 | 默认值 |
|------|------|--------|
| `conflicts` | 冲突处理:`abort`(中止)或 `proceed`(继续) | abort |
| `refresh` | 完成后是否刷新 | false |
| `scroll_size` | 每批处理的文档数 | 1000 |
| `wait_for_completion` | 是否等待完成 | true |
| `requests_per_second` | 限流(每秒请求数) | -1(不限制) |
### 异步更新
```json
// 异步执行 Update By Query
POST /products/_update_by_query?wait_for_completion=false
{
"query": {
"match_all": {}
},
"script": {
"source": "ctx._source.batch_updated = true"
}
}
// 返回任务 ID
{
"task": "node1:12345"
}
// 查看任务状态
GET /_tasks/node1:12345
// 取消任务
POST /_tasks/node1:12345/_cancel
```
## 数据迁移
### Reindex API
Reindex API 用于将数据从一个索引复制到另一个索引,常用于映射变更、索引重建等场景。
#### 基本重建索引
```json
// 将 products_v1 的数据复制到 products_v2
POST /_reindex
{
"source": {
"index": "products_v1"
},
"dest": {
"index": "products_v2"
}
}
```
#### 带查询条件的重建
```json
// 只迁移特定条件的数据
POST /_reindex
{
"source": {
"index": "products_v1",
"query": {
"term": {
"is_available": true
}
}
},
"dest": {
"index": "products_v2"
}
}
```
#### 使用脚本转换数据
```json
// 迁移时转换数据
POST /_reindex
{
"source": {
"index": "products_v1"
},
"dest": {
"index": "products_v2"
},
"script": {
"source": """
ctx._source.price_with_tax = ctx._source.price * 1.13;
ctx._source.migrated_at = '2024-01-20';
"""
}
}
```
#### 跨集群重建
```json
// 从远程集群迁移数据(需先在目标集群各节点的 elasticsearch.yml 中配置 reindex.remote.whitelist: "remote-es:9200")
POST /_reindex
{
"source": {
"remote": {
"host": "http://remote-es:9200",
"username": "user",
"password": "pass"
},
"index": "products"
},
"dest": {
"index": "products_local"
}
}
```
#### Reindex 性能优化
```json
// 优化 Reindex 性能
POST /_reindex?wait_for_completion=false&refresh=false
{
"source": {
"index": "products_v1",
"size": 5000
},
"dest": {
"index": "products_v2"
}
}
// 使用 slices 并行处理
POST /_reindex?slices=auto
{
"source": {
"index": "products_v1"
},
"dest": {
"index": "products_v2"
}
}
```
### 零停机索引重建流程
```
步骤 1: 创建新索引(新映射)
┌─────────────────────────────────────────┐
│ PUT /products_v2 │
│ { "mappings": { ... } } │
└─────────────────────────────────────────┘
│
▼
步骤 2: 重建索引数据
┌─────────────────────────────────────────┐
│ POST /_reindex │
│ { "source": {"index": "products_v1"}, │
│ "dest": {"index": "products_v2"} } │
└─────────────────────────────────────────┘
│
▼
步骤 3: 切换别名(原子操作)
┌─────────────────────────────────────────┐
│ POST /_aliases │
│ { "actions": [ │
│ {"remove": {"index": "products_v1", │
│ "alias": "products"}}, │
│ {"add": {"index": "products_v2", │
│ "alias": "products"}} │
│ ]} │
└─────────────────────────────────────────┘
│
▼
步骤 4: 删除旧索引(可选)
┌─────────────────────────────────────────┐
│ DELETE /products_v1 │
└─────────────────────────────────────────┘
```
## 实践示例
### 电商商品管理完整示例
```json
// 1. 创建商品索引
PUT /shop_products
{
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1,
"refresh_interval": "1s"
},
"mappings": {
"properties": {
"product_id": { "type": "keyword" },
"name": {
"type": "text",
"analyzer": "standard",
"fields": {
"keyword": { "type": "keyword" }
}
},
"description": { "type": "text" },
"category": { "type": "keyword" },
"brand": { "type": "keyword" },
"price": { "type": "double" },
"original_price": { "type": "double" },
"stock": { "type": "integer" },
"sales": { "type": "integer" },
"rating": { "type": "float" },
"tags": { "type": "keyword" },
"is_available": { "type": "boolean" },
"created_at": { "type": "date" },
"updated_at": { "type": "date" }
}
}
}
// 2. 批量导入商品
POST /shop_products/_bulk
{"index":{"_id":"P001"}}
{"product_id":"P001","name":"iPhone 15 Pro","description":"Apple 最新旗舰手机","category":"手机","brand":"Apple","price":8999,"original_price":9999,"stock":100,"sales":500,"rating":4.8,"tags":["5G","旗舰","热销"],"is_available":true,"created_at":"2024-01-01","updated_at":"2024-01-15"}
{"index":{"_id":"P002"}}
{"product_id":"P002","name":"华为 Mate 60 Pro","description":"华为旗舰手机,麒麟芯片","category":"手机","brand":"华为","price":6999,"original_price":7499,"stock":80,"sales":800,"rating":4.9,"tags":["5G","旗舰","国产"],"is_available":true,"created_at":"2024-01-02","updated_at":"2024-01-15"}
{"index":{"_id":"P003"}}
{"product_id":"P003","name":"MacBook Pro 14","description":"Apple M3 Pro 芯片笔记本","category":"笔记本","brand":"Apple","price":14999,"original_price":15999,"stock":50,"sales":200,"rating":4.7,"tags":["专业","M3","高性能"],"is_available":true,"created_at":"2024-01-03","updated_at":"2024-01-15"}
// 3. 查询商品
GET /shop_products/_doc/P001
// 4. 更新库存(减少库存)
POST /shop_products/_update/P001
{
"script": {
"source": """
if (ctx._source.stock >= params.quantity) {
ctx._source.stock -= params.quantity;
ctx._source.sales += params.quantity;
ctx._source.updated_at = params.now;
} else {
ctx.op = 'noop';
}
""",
"params": {
"quantity": 1,
"now": "2024-01-20"
}
}
}
// 5. 批量更新价格(促销活动)
POST /shop_products/_update_by_query
{
"query": {
"term": {
"brand": "Apple"
}
},
"script": {
"source": """
ctx._source.original_price = ctx._source.price;
ctx._source.price = ctx._source.price * 0.9;
ctx._source.tags.add('促销');
ctx._source.updated_at = params.now;
""",
"params": {
"now": "2024-01-20"
}
}
}
// 6. 下架缺货商品
POST /shop_products/_update_by_query
{
"query": {
"range": {
"stock": {
"lte": 0
}
}
},
"script": {
"source": "ctx._source.is_available = false"
}
}
// 7. 删除过期商品
POST /shop_products/_delete_by_query
{
"query": {
"bool": {
"must": [
{ "term": { "is_available": false } },
{ "range": { "updated_at": { "lt": "2023-01-01" } } }
]
}
}
}
```
### curl 命令示例
```bash
# 创建索引
curl -X PUT "localhost:9200/products" -H 'Content-Type: application/json' -d'
{
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1
}
}'
# 创建文档
curl -X PUT "localhost:9200/products/_doc/1" -H 'Content-Type: application/json' -d'
{
"name": "iPhone 15",
"price": 7999,
"category": "手机"
}'
# 获取文档
curl -X GET "localhost:9200/products/_doc/1"
# 更新文档
curl -X POST "localhost:9200/products/_update/1" -H 'Content-Type: application/json' -d'
{
"doc": {
"price": 7499
}
}'
# 删除文档
curl -X DELETE "localhost:9200/products/_doc/1"
# 批量操作
curl -X POST "localhost:9200/products/_bulk" -H 'Content-Type: application/x-ndjson' -d'
{"index":{"_id":"1"}}
{"name":"商品1","price":100}
{"index":{"_id":"2"}}
{"name":"商品2","price":200}
'
```
## 本章小结
本章详细介绍了 Elasticsearch 的 CRUD 操作:
1. **索引操作**:
- 创建索引时可指定设置和映射
- 动态设置可随时修改;部分静态设置(如 analysis)需关闭索引后修改,而 `number_of_shards` 等设置在索引创建后无法直接修改
- 使用别名实现零停机索引切换
2. **文档操作**:
- PUT 指定 ID 创建,POST 自动生成 ID
- `_create` 端点防止覆盖已存在文档
- `_mget` 批量获取多个文档
3. **批量操作**:
- Bulk API 支持 index、create、update、delete 四种操作
- 建议批量大小 1000-5000 条或 5-15MB
- 保持默认的 `refresh=false`,避免每次请求后强制刷新,批量完成后再统一刷新
4. **更新操作**:
- Update API 支持部分更新和脚本更新
- Update By Query 批量更新匹配条件的文档
- Upsert 实现更新或插入逻辑
5. **数据迁移**:
- Reindex API 用于索引重建和数据迁移
- 支持查询过滤、脚本转换、跨集群迁移
- 配合别名实现零停机迁移
## 参考资料
- [Index APIs](https://www.elastic.co/guide/en/elasticsearch/reference/current/indices.html)
- [Document APIs](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs.html)
- [Bulk API](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html)
- [Update API](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html)
- [Update By Query API](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html)
- [Reindex API](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html)
评论
0暂无评论