Elasticsearch 的 CRUD全集

忘机山人

发布于182天前
博客图片修整中,看不了可以先搜索公众号“忘机山人”看。
> 本章导读:本章将详细介绍 Elasticsearch 的 CRUD(创建、读取、更新、删除)操作,包括索引管理、文档操作、批量处理和数据迁移等核心功能,帮助读者掌握 ES 数据操作的基础技能。



https://appstore.lazycat.cloud/#/shop/detail/xu.deploy.elasticsearch



## 目录

- [前置知识](#前置知识)
- [索引操作](#索引操作)
- [文档操作](#文档操作)
- [批量操作](#批量操作)
- [更新操作详解](#更新操作详解)
- [数据迁移](#数据迁移)
- [实践示例](#实践示例)
- [本章小结](#本章小结)
- [参考资料](#参考资料)


## 索引操作

索引是 Elasticsearch 中存储数据的逻辑容器。在进行文档操作之前,通常需要先创建和配置索引。

### 创建索引

#### 基本创建

```json
// 创建一个简单的索引
PUT /products
```

#### 带设置的创建

```json
// 创建索引并指定设置
PUT /products
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1,
    "refresh_interval": "1s"
  }
}
```

#### 带映射的创建

```json
// 创建索引并定义映射
PUT /products
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "name": {
        "type": "text",
        "analyzer": "standard",
        "fields": {
          "keyword": {
            "type": "keyword"
          }
        }
      },
      "price": {
        "type": "double"
      },
      "category": {
        "type": "keyword"
      },
      "description": {
        "type": "text"
      },
      "stock": {
        "type": "integer"
      },
      "created_at": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
      },
      "is_available": {
        "type": "boolean"
      },
      "tags": {
        "type": "keyword"
      }
    }
  }
}
```


### 查看索引

```json
// 查看单个索引信息
GET /products

// 查看索引设置
GET /products/_settings

// 查看索引映射
GET /products/_mapping

// 查看所有索引(表格格式)
GET /_cat/indices?v

// 查看索引详细信息
GET /_cat/indices/products?v&h=index,health,status,pri,rep,docs.count,store.size
```

### 更新索引设置

```json
// 更新动态设置
PUT /products/_settings
{
  "index": {
    "number_of_replicas": 2,
    "refresh_interval": "30s"
  }
}

// 关闭索引后更新静态设置
POST /products/_close

PUT /products/_settings
{
  "index": {
    "analysis": {
      "analyzer": {
        "my_analyzer": {
          "type": "custom",
          "tokenizer": "standard",
          "filter": ["lowercase"]
        }
      }
    }
  }
}

POST /products/_open
```

![image.png](https://dl.playground.lazycat.cloud/guidelines/459/a5101427-706d-42e2-ac73-500b751b05ca.png "image.png")

### 删除索引

```json
// 删除单个索引
DELETE /products

// 删除多个索引
DELETE /products,orders

// 使用通配符删除(谨慎使用)
DELETE /logs-2024-*

// 删除所有索引(极度危险,生产环境禁用)
// DELETE /_all
// DELETE /*
```

> ⚠️ **警告**:删除索引是不可逆操作,请确保已备份重要数据。生产环境建议设置 `action.destructive_requires_name: true` 禁止通配符删除。

### 索引别名

别名是指向一个或多个索引的虚拟名称,可用于零停机切换索引。

```json
// 创建别名
POST /_aliases
{
  "actions": [
    {
      "add": {
        "index": "products_v1",
        "alias": "products"
      }
    }
  ]
}

// 切换别名(原子操作)
POST /_aliases
{
  "actions": [
    { "remove": { "index": "products_v1", "alias": "products" } },
    { "add": { "index": "products_v2", "alias": "products" } }
  ]
}

// 查看别名
GET /_cat/aliases?v
GET /products/_alias
```

### 索引开关

```json
// 关闭索引(节省资源,不可读写)
POST /products/_close

// 打开索引
POST /products/_open

// 查看索引状态
GET /_cat/indices/products?v
```

## 文档操作

### 创建文档

#### 指定 ID 创建

```json
// 使用 PUT 指定文档 ID
PUT /products/_doc/1
{
  "name": "iPhone 15 Pro",
  "price": 8999,
  "category": "手机",
  "description": "Apple 最新旗舰手机,搭载 A17 Pro 芯片",
  "stock": 100,
  "created_at": "2024-01-15",
  "is_available": true,
  "tags": ["苹果", "5G", "旗舰"]
}
```

响应示例:

```json
{
  "_index": "products",
  "_id": "1",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 2,
    "failed": 0
  },
  "_seq_no": 0,
  "_primary_term": 1
}
```

#### 自动生成 ID

```json
// 使用 POST 自动生成 ID
POST /products/_doc
{
  "name": "MacBook Pro 14",
  "price": 14999,
  "category": "笔记本",
  "description": "Apple M3 Pro 芯片,专业级性能",
  "stock": 50,
  "created_at": "2024-01-16",
  "is_available": true,
  "tags": ["苹果", "专业", "M3"]
}
```

#### 强制创建(防止覆盖)

```json
// 使用 _create 端点,如果文档已存在则报错
PUT /products/_create/1
{
  "name": "iPhone 15",
  "price": 7999
}

// 或使用 op_type 参数
PUT /products/_doc/1?op_type=create
{
  "name": "iPhone 15",
  "price": 7999
}
```

如果文档已存在,返回 409 冲突错误:

```json
{
  "error": {
    "type": "version_conflict_engine_exception",
    "reason": "[1]: version conflict, document already exists"
  },
  "status": 409
}
```


### 读取文档

#### 获取单个文档

```json
// 获取完整文档
GET /products/_doc/1

// 只获取 _source 字段
GET /products/_source/1

// 获取指定字段
GET /products/_doc/1?_source_includes=name,price

// 排除指定字段
GET /products/_doc/1?_source_excludes=description
```

响应示例:

```json
{
  "_index": "products",
  "_id": "1",
  "_version": 1,
  "_seq_no": 0,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "name": "iPhone 15 Pro",
    "price": 8999,
    "category": "手机",
    "description": "Apple 最新旗舰手机,搭载 A17 Pro 芯片",
    "stock": 100,
    "created_at": "2024-01-15",
    "is_available": true,
    "tags": ["苹果", "5G", "旗舰"]
  }
}
```

#### 检查文档是否存在

```json
// 使用 HEAD 请求检查文档存在性
HEAD /products/_doc/1
```

存在返回 `200 OK`,不存在返回 `404 Not Found`。

#### 批量获取文档

```json
// 使用 _mget 批量获取
GET /products/_mget
{
  "ids": ["1", "2", "3"]
}

// 跨索引批量获取
GET /_mget
{
  "docs": [
    { "_index": "products", "_id": "1" },
    { "_index": "orders", "_id": "100" }
  ]
}

// 指定返回字段
GET /products/_mget
{
  "ids": ["1", "2"],
  "_source": ["name", "price"]
}
```

### 更新文档

#### 部分更新

```json
// 使用 _update API 部分更新
POST /products/_update/1
{
  "doc": {
    "price": 8499,
    "stock": 90
  }
}
```

#### 使用脚本更新

```json
// 使用 Painless 脚本更新
POST /products/_update/1
{
  "script": {
    "source": "ctx._source.stock -= params.quantity",
    "params": {
      "quantity": 10
    }
  }
}

// 条件更新
POST /products/_update/1
{
  "script": {
    "source": """
      if (ctx._source.stock >= params.quantity) {
        ctx._source.stock -= params.quantity;
      } else {
        ctx.op = 'noop';
      }
    """,
    "params": {
      "quantity": 5
    }
  }
}

// 添加数组元素
POST /products/_update/1
{
  "script": {
    "source": "ctx._source.tags.add(params.tag)",
    "params": {
      "tag": "热销"
    }
  }
}

// 删除数组元素
POST /products/_update/1
{
  "script": {
    "source": "ctx._source.tags.remove(ctx._source.tags.indexOf(params.tag))",
    "params": {
      "tag": "热销"
    }
  }
}
```

#### Upsert(更新或插入)

```json
// 如果文档存在则更新,不存在则创建
POST /products/_update/999
{
  "doc": {
    "price": 5999
  },
  "upsert": {
    "name": "新商品",
    "price": 5999,
    "category": "其他",
    "stock": 100,
    "created_at": "2024-01-20",
    "is_available": true
  }
}

// 使用 scripted_upsert
POST /products/_update/999
{
  "scripted_upsert": true,
  "script": {
    "source": """
      if (ctx.op == 'create') {
        ctx._source.name = params.name;
        ctx._source.price = params.price;
        ctx._source.view_count = 1;
      } else {
        ctx._source.view_count += 1;
      }
    """,
    "params": {
      "name": "新商品",
      "price": 5999
    }
  },
  "upsert": {}
}
```

#### 完全替换文档

```json
// 使用 PUT 完全替换(会删除未指定的字段)
PUT /products/_doc/1
{
  "name": "iPhone 15 Pro Max",
  "price": 9999,
  "category": "手机"
}
```

### 删除文档

#### 删除单个文档

```json
// 根据 ID 删除
DELETE /products/_doc/1
```

响应示例:

```json
{
  "_index": "products",
  "_id": "1",
  "_version": 2,
  "result": "deleted",
  "_shards": {
    "total": 2,
    "successful": 2,
    "failed": 0
  },
  "_seq_no": 1,
  "_primary_term": 1
}
```

#### 条件删除

```json
// 使用 Delete By Query 删除匹配的文档
POST /products/_delete_by_query
{
  "query": {
    "term": {
      "is_available": false
    }
  }
}

// 删除某个类别的所有商品
POST /products/_delete_by_query
{
  "query": {
    "term": {
      "category": "过期商品"
    }
  }
}

// 带冲突处理的删除
POST /products/_delete_by_query?conflicts=proceed
{
  "query": {
    "range": {
      "created_at": {
        "lt": "2023-01-01"
      }
    }
  }
}
```


## 批量操作

Bulk API 允许在单个请求中执行多个索引、更新或删除操作,大幅提高数据处理效率。

### Bulk API 基本语法

Bulk 请求使用 NDJSON(Newline Delimited JSON)格式:

```
action_and_meta_data\n
optional_source\n
action_and_meta_data\n
optional_source\n
...
```

### 批量索引

```json
POST /products/_bulk
{"index":{"_id":"1"}}
{"name":"iPhone 15","price":7999,"category":"手机","stock":100}
{"index":{"_id":"2"}}
{"name":"MacBook Pro","price":14999,"category":"笔记本","stock":50}
{"index":{"_id":"3"}}
{"name":"iPad Pro","price":8999,"category":"平板","stock":80}
{"index":{"_id":"4"}}
{"name":"AirPods Pro","price":1899,"category":"配件","stock":200}
```

### 批量混合操作

```json
POST /_bulk
{"index":{"_index":"products","_id":"10"}}
{"name":"商品A","price":999}
{"create":{"_index":"products","_id":"11"}}
{"name":"商品B","price":1999}
{"update":{"_index":"products","_id":"1"}}
{"doc":{"price":7499}}
{"delete":{"_index":"products","_id":"99"}}
```

### Bulk 操作类型

| 操作 | 说明 | 是否需要文档体 |
|------|------|----------------|
| `index` | 创建或替换文档 | 是 |
| `create` | 创建文档(已存在则失败) | 是 |
| `update` | 部分更新文档 | 是 |
| `delete` | 删除文档 | 否 |

### Bulk 响应解析

```json
{
  "took": 30,
  "errors": false,
  "items": [
    {
      "index": {
        "_index": "products",
        "_id": "1",
        "_version": 1,
        "result": "created",
        "status": 201
      }
    },
    {
      "index": {
        "_index": "products",
        "_id": "2",
        "_version": 1,
        "result": "created",
        "status": 201
      }
    }
  ]
}
```

### Bulk 性能优化

```json
// 1. 合理设置批量大小(建议 1000-5000 条或 5-15MB)
// 2. 使用 refresh=false 禁用自动刷新
POST /products/_bulk?refresh=false
{"index":{"_id":"1"}}
{"name":"商品1","price":100}
{"index":{"_id":"2"}}
{"name":"商品2","price":200}

// 3. 批量完成后手动刷新
POST /products/_refresh
```

**Bulk 最佳实践**:

| 参数 | 建议值 | 说明 |
|------|--------|------|
| 批量大小 | 1000-5000 条 | 根据文档大小调整 |
| 请求大小 | 5-15 MB | 避免过大导致内存问题 |
| 并发数 | 2-4 | 根据集群能力调整 |
| refresh | false | 批量完成后统一刷新 |

### curl 命令示例

```bash
# 从文件批量导入
curl -X POST "localhost:9200/products/_bulk" \
  -H "Content-Type: application/x-ndjson" \
  --data-binary @products.ndjson

# products.ndjson 文件内容:
# {"index":{"_id":"1"}}
# {"name":"商品1","price":100}
# {"index":{"_id":"2"}}
# {"name":"商品2","price":200}
```

## 更新操作详解

### Update API

Update API 用于部分更新文档,只修改指定字段而不影响其他字段。

```json
// 基本部分更新
POST /products/_update/1
{
  "doc": {
    "price": 7499,
    "updated_at": "2024-01-20"
  }
}

// 检测是否有实际更改
POST /products/_update/1?detect_noop=true
{
  "doc": {
    "price": 7499
  }
}

// 强制更新(即使内容相同)
POST /products/_update/1?detect_noop=false
{
  "doc": {
    "price": 7499
  }
}
```

### Update By Query

Update By Query 用于批量更新匹配条件的文档。

```json
// 将所有手机类商品价格降低 10%
POST /products/_update_by_query
{
  "query": {
    "term": {
      "category": "手机"
    }
  },
  "script": {
    "source": "ctx._source.price = ctx._source.price * 0.9",
    "lang": "painless"
  }
}

// 为所有商品添加新字段
POST /products/_update_by_query
{
  "query": {
    "match_all": {}
  },
  "script": {
    "source": "ctx._source.updated_at = params.date",
    "params": {
      "date": "2024-01-20"
    }
  }
}

// 带限制的更新
POST /products/_update_by_query?scroll_size=1000&conflicts=proceed
{
  "query": {
    "range": {
      "stock": {
        "lt": 10
      }
    }
  },
  "script": {
    "source": "ctx._source.is_available = false"
  }
}
```

### Update By Query 参数

| 参数 | 说明 | 默认值 |
|------|------|--------|
| `conflicts` | 冲突处理:`abort`(中止)或 `proceed`(继续) | abort |
| `refresh` | 完成后是否刷新 | false |
| `scroll_size` | 每批处理的文档数 | 1000 |
| `wait_for_completion` | 是否等待完成 | true |
| `requests_per_second` | 限流(每秒请求数) | -1(不限制) |

### 异步更新

```json
// 异步执行 Update By Query
POST /products/_update_by_query?wait_for_completion=false
{
  "query": {
    "match_all": {}
  },
  "script": {
    "source": "ctx._source.batch_updated = true"
  }
}

// 返回任务 ID
{
  "task": "node1:12345"
}

// 查看任务状态
GET /_tasks/node1:12345

// 取消任务
POST /_tasks/node1:12345/_cancel
```


## 数据迁移

### Reindex API

Reindex API 用于将数据从一个索引复制到另一个索引,常用于映射变更、索引重建等场景。

#### 基本重建索引

```json
// 将 products_v1 的数据复制到 products_v2
POST /_reindex
{
  "source": {
    "index": "products_v1"
  },
  "dest": {
    "index": "products_v2"
  }
}
```

#### 带查询条件的重建

```json
// 只迁移特定条件的数据
POST /_reindex
{
  "source": {
    "index": "products_v1",
    "query": {
      "term": {
        "is_available": true
      }
    }
  },
  "dest": {
    "index": "products_v2"
  }
}
```

#### 使用脚本转换数据

```json
// 迁移时转换数据
POST /_reindex
{
  "source": {
    "index": "products_v1"
  },
  "dest": {
    "index": "products_v2"
  },
  "script": {
    "source": """
      ctx._source.price_with_tax = ctx._source.price * 1.13;
      ctx._source.migrated_at = '2024-01-20';
    """
  }
}
```

#### 跨集群重建

```json
// 从远程集群迁移数据
POST /_reindex
{
  "source": {
    "remote": {
      "host": "http://remote-es:9200",
      "username": "user",
      "password": "pass"
    },
    "index": "products"
  },
  "dest": {
    "index": "products_local"
  }
}
```

#### Reindex 性能优化

```json
// 优化 Reindex 性能
POST /_reindex?wait_for_completion=false&refresh=false
{
  "source": {
    "index": "products_v1",
    "size": 5000
  },
  "dest": {
    "index": "products_v2"
  }
}

// 使用 slices 并行处理
POST /_reindex?slices=auto
{
  "source": {
    "index": "products_v1"
  },
  "dest": {
    "index": "products_v2"
  }
}
```

### 零停机索引重建流程

```
步骤 1: 创建新索引(新映射)
┌─────────────────────────────────────────┐
│  PUT /products_v2                       │
│  { "mappings": { ... } }                │
└─────────────────────────────────────────┘
                    │
                    ▼
步骤 2: 重建索引数据
┌─────────────────────────────────────────┐
│  POST /_reindex                         │
│  { "source": {"index": "products_v1"},  │
│    "dest": {"index": "products_v2"} }   │
└─────────────────────────────────────────┘
                    │
                    ▼
步骤 3: 切换别名(原子操作)
┌─────────────────────────────────────────┐
│  POST /_aliases                         │
│  { "actions": [                         │
│    {"remove": {"index": "products_v1",  │
│                "alias": "products"}},   │
│    {"add": {"index": "products_v2",     │
│             "alias": "products"}}       │
│  ]}                                     │
└─────────────────────────────────────────┘
                    │
                    ▼
步骤 4: 删除旧索引(可选)
┌─────────────────────────────────────────┐
│  DELETE /products_v1                    │
└─────────────────────────────────────────┘
```

## 实践示例

### 电商商品管理完整示例

```json
// 1. 创建商品索引
PUT /shop_products
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1,
    "refresh_interval": "1s"
  },
  "mappings": {
    "properties": {
      "product_id": { "type": "keyword" },
      "name": {
        "type": "text",
        "analyzer": "standard",
        "fields": {
          "keyword": { "type": "keyword" }
        }
      },
      "description": { "type": "text" },
      "category": { "type": "keyword" },
      "brand": { "type": "keyword" },
      "price": { "type": "double" },
      "original_price": { "type": "double" },
      "stock": { "type": "integer" },
      "sales": { "type": "integer" },
      "rating": { "type": "float" },
      "tags": { "type": "keyword" },
      "is_available": { "type": "boolean" },
      "created_at": { "type": "date" },
      "updated_at": { "type": "date" }
    }
  }
}

// 2. 批量导入商品
POST /shop_products/_bulk
{"index":{"_id":"P001"}}
{"product_id":"P001","name":"iPhone 15 Pro","description":"Apple 最新旗舰手机","category":"手机","brand":"Apple","price":8999,"original_price":9999,"stock":100,"sales":500,"rating":4.8,"tags":["5G","旗舰","热销"],"is_available":true,"created_at":"2024-01-01","updated_at":"2024-01-15"}
{"index":{"_id":"P002"}}
{"product_id":"P002","name":"华为 Mate 60 Pro","description":"华为旗舰手机,麒麟芯片","category":"手机","brand":"华为","price":6999,"original_price":7499,"stock":80,"sales":800,"rating":4.9,"tags":["5G","旗舰","国产"],"is_available":true,"created_at":"2024-01-02","updated_at":"2024-01-15"}
{"index":{"_id":"P003"}}
{"product_id":"P003","name":"MacBook Pro 14","description":"Apple M3 Pro 芯片笔记本","category":"笔记本","brand":"Apple","price":14999,"original_price":15999,"stock":50,"sales":200,"rating":4.7,"tags":["专业","M3","高性能"],"is_available":true,"created_at":"2024-01-03","updated_at":"2024-01-15"}

// 3. 查询商品
GET /shop_products/_doc/P001

// 4. 更新库存(减少库存)
POST /shop_products/_update/P001
{
  "script": {
    "source": """
      if (ctx._source.stock >= params.quantity) {
        ctx._source.stock -= params.quantity;
        ctx._source.sales += params.quantity;
        ctx._source.updated_at = params.now;
      } else {
        ctx.op = 'noop';
      }
    """,
    "params": {
      "quantity": 1,
      "now": "2024-01-20"
    }
  }
}

// 5. 批量更新价格(促销活动)
POST /shop_products/_update_by_query
{
  "query": {
    "term": {
      "brand": "Apple"
    }
  },
  "script": {
    "source": """
      ctx._source.original_price = ctx._source.price;
      ctx._source.price = ctx._source.price * 0.9;
      ctx._source.tags.add('促销');
      ctx._source.updated_at = params.now;
    """,
    "params": {
      "now": "2024-01-20"
    }
  }
}

// 6. 下架缺货商品
POST /shop_products/_update_by_query
{
  "query": {
    "range": {
      "stock": {
        "lte": 0
      }
    }
  },
  "script": {
    "source": "ctx._source.is_available = false"
  }
}

// 7. 删除过期商品
POST /shop_products/_delete_by_query
{
  "query": {
    "bool": {
      "must": [
        { "term": { "is_available": false } },
        { "range": { "updated_at": { "lt": "2023-01-01" } } }
      ]
    }
  }
}
```


### curl 命令示例

```bash
# 创建索引
curl -X PUT "localhost:9200/products" -H 'Content-Type: application/json' -d'
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1
  }
}'

# 创建文档
curl -X PUT "localhost:9200/products/_doc/1" -H 'Content-Type: application/json' -d'
{
  "name": "iPhone 15",
  "price": 7999,
  "category": "手机"
}'

# 获取文档
curl -X GET "localhost:9200/products/_doc/1"

# 更新文档
curl -X POST "localhost:9200/products/_update/1" -H 'Content-Type: application/json' -d'
{
  "doc": {
    "price": 7499
  }
}'

# 删除文档
curl -X DELETE "localhost:9200/products/_doc/1"

# 批量操作
curl -X POST "localhost:9200/products/_bulk" -H 'Content-Type: application/x-ndjson' -d'
{"index":{"_id":"1"}}
{"name":"商品1","price":100}
{"index":{"_id":"2"}}
{"name":"商品2","price":200}
'
```

## 本章小结

本章详细介绍了 Elasticsearch 的 CRUD 操作:

1. **索引操作**:
   - 创建索引时可指定设置和映射
   - 动态设置可随时修改,静态设置需关闭索引后修改
   - 使用别名实现零停机索引切换

2. **文档操作**:
   - PUT 指定 ID 创建,POST 自动生成 ID
   - `_create` 端点防止覆盖已存在文档
   - `_mget` 批量获取多个文档

3. **批量操作**:
   - Bulk API 支持 index、create、update、delete 四种操作
   - 建议批量大小 1000-5000 条或 5-15MB
   - 使用 `refresh=false` 提高批量写入性能

4. **更新操作**:
   - Update API 支持部分更新和脚本更新
   - Update By Query 批量更新匹配条件的文档
   - Upsert 实现更新或插入逻辑

5. **数据迁移**:
   - Reindex API 用于索引重建和数据迁移
   - 支持查询过滤、脚本转换、跨集群迁移
   - 配合别名实现零停机迁移

## 参考资料

- [Index APIs](https://www.elastic.co/guide/en/elasticsearch/reference/current/indices.html)
- [Document APIs](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs.html)
- [Bulk API](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html)
- [Update API](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html)
- [Update By Query API](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html)
- [Reindex API](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html)

评论

0

暂无评论

说点什么呢~
收藏
0
0
0