Elasticsearch搜索引擎优化:从索引设计到查询性能调优的完整指南
Elasticsearch作为当今最流行的开源分布式搜索引擎之一,在大规模数据检索、日志分析、实时聚合等场景中发挥着重要作用。本文从索引设计、查询调优到集群配置与监控,系统梳理Elasticsearch的优化策略,给出一份可落地的完整指南。
Elasticsearch架构概述
核心概念
graph TB
subgraph "Elasticsearch集群"
subgraph "Master节点"
M1[Master Node 1]
M2[Master Node 2]
M3[Master Node 3]
end
subgraph "数据节点"
D1[Data Node 1]
D2[Data Node 2]
D3[Data Node 3]
D4[Data Node 4]
end
subgraph "协调节点"
C1[Coordinating Node 1]
C2[Coordinating Node 2]
end
subgraph "摄取节点"
I1[Ingest Node 1]
I2[Ingest Node 2]
end
end
Client[客户端应用] --> C1
Client --> C2
C1 --> D1
C1 --> D2
C2 --> D3
C2 --> D4
I1 --> D1
I2 --> D2
M1 -.-> M2
M2 -.-> M3
M3 -.-> M1
节点角色配置
# elasticsearch.yml - Master节点配置
cluster.name: production-cluster
node.name: master-node-1
node.roles: [master]
discovery.seed_hosts: ["master-node-1", "master-node-2", "master-node-3"]
cluster.initial_master_nodes: ["master-node-1", "master-node-2", "master-node-3"]
# 内存配置
bootstrap.memory_lock: true
indices.memory.index_buffer_size: 10%
# 网络配置
network.host: 0.0.0.0
http.port: 9200
transport.port: 9300
# 安全配置
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.http.ssl.enabled: true
---
# elasticsearch.yml - 数据节点配置
cluster.name: production-cluster
node.name: data-node-1
# 数据节点按层级部署:hot/warm/cold应由不同节点承担;单独的data角色已隐含所有数据层级,此处以热层内容节点为例
node.roles: [data_content, data_hot]
# 数据路径配置
path.data: ["/data1/elasticsearch", "/data2/elasticsearch"]
path.logs: "/var/log/elasticsearch"
# 内存配置
bootstrap.memory_lock: true
indices.memory.index_buffer_size: 20%
indices.memory.min_index_buffer_size: 96mb
# 线程池配置
thread_pool:
write:
size: 8
queue_size: 1000
search:
size: 13
queue_size: 1000
get:
size: 8
queue_size: 1000
---
# elasticsearch.yml - 协调节点配置
cluster.name: production-cluster
node.name: coordinating-node-1
node.roles: []
# 内存配置
bootstrap.memory_lock: true
indices.memory.index_buffer_size: 5%
# 搜索配置
search.max_buckets: 65536
search.allow_expensive_queries: false
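配置完成后,可以通过 _cat/nodes 接口快速核对各节点的角色划分是否符合预期。下面是一个示意性的Python检查脚本(假设集群地址为 http://localhost:9200 且未启用认证,生产环境请按实际情况改用https并补充认证信息):
import requests

# 假设的集群地址;启用安全特性后需改为https并提供认证
ES_HOST = "http://localhost:9200"

# node.role列中 m=master、d=data、i=ingest,纯协调节点显示为 "-"
resp = requests.get(
    f"{ES_HOST}/_cat/nodes",
    params={"v": "true", "h": "name,node.role,master,heap.percent"},
)
resp.raise_for_status()
print(resp.text)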
索引设计优化
映射(Mapping)设计最佳实践
{
"mappings": {
"properties": {
"id": {
"type": "keyword",
"store": true
},
"title": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
},
"suggest": {
"type": "completion",
"analyzer": "simple",
"preserve_separators": true,
"preserve_position_increments": true,
"max_input_length": 50
}
}
},
"content": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart",
"index_options": "positions"
},
"category": {
"type": "keyword",
"fields": {
"text": {
"type": "text",
"analyzer": "standard"
}
}
},
"tags": {
"type": "keyword"
},
"publish_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
},
"view_count": {
"type": "integer",
"index": false
},
"author": {
"properties": {
"id": {
"type": "keyword"
},
"name": {
"type": "text",
"analyzer": "ik_max_word",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 64
}
}
},
"email": {
"type": "keyword",
"index": false
}
}
},
"location": {
"type": "geo_point"
},
"metadata": {
"type": "object",
"enabled": false
}
}
},
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1,
"refresh_interval": "30s",
"max_result_window": 50000,
"analysis": {
"analyzer": {
"ik_max_word": {
"type": "ik_max_word"
},
"ik_smart": {
"type": "ik_smart"
},
"custom_analyzer": {
"type": "custom",
"tokenizer": "standard",
"filter": [
"lowercase",
"stop",
"snowball"
]
}
}
},
"index": {
"sort.field": ["publish_date", "_score"],
"sort.order": ["desc", "desc"]
}
}
}
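上面的映射与设置可以在创建索引时一次性提交。下面给出一个基于 requests 的建索引示意脚本(索引名 articles 与映射文件路径均为假设):
import json
import requests

ES_HOST = "http://localhost:9200"  # 假设的集群地址

# 读取前文的mappings与settings定义(假设已保存为JSON文件)
with open("article_mapping.json", "r", encoding="utf-8") as f:
    body = json.load(f)

# 创建索引;注意index.sort等静态设置只能在建索引时指定,之后不可修改
resp = requests.put(f"{ES_HOST}/articles", json=body)
print(resp.status_code, resp.json())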
索引模板配置
{
"index_patterns": ["logs-*", "metrics-*"],
"template": {
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1,
"refresh_interval": "5s",
"index.lifecycle.name": "logs-policy",
"index.lifecycle.rollover_alias": "logs-write"
},
"mappings": {
"properties": {
"@timestamp": {
"type": "date"
},
"level": {
"type": "keyword"
},
"message": {
"type": "text",
"analyzer": "standard"
},
"service": {
"type": "keyword"
},
"host": {
"type": "keyword"
},
"tags": {
"type": "keyword"
}
}
}
},
"composed_of": ["component-template-mappings", "component-template-settings"],
"priority": 200,
"version": 1,
"_meta": {
"description": "Template for application logs"
}
}
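该模板需要通过 _index_template 接口注册后,才会对匹配 logs-*、metrics-* 的新建索引生效。以下为示意性的注册脚本(模板名 logs-template 为假设;composed_of 引用的组件模板必须先创建,否则注册会失败):
import json
import requests

ES_HOST = "http://localhost:9200"  # 假设地址

with open("logs_template.json", "r", encoding="utf-8") as f:
    template_body = json.load(f)

# 注册可组合索引模板;priority更高的模板会覆盖低优先级模板
resp = requests.put(f"{ES_HOST}/_index_template/logs-template", json=template_body)
resp.raise_for_status()
print(resp.json())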
索引生命周期管理(ILM)
{
"policy": {
"phases": {
"hot": {
"min_age": "0ms",
"actions": {
"rollover": {
"max_size": "10gb",
"max_age": "1d",
"max_docs": 10000000
},
"set_priority": {
"priority": 100
}
}
},
"warm": {
"min_age": "1d",
"actions": {
"allocate": {
"number_of_replicas": 0,
"include": {
"data_tier": "data_warm"
}
},
"forcemerge": {
"max_num_segments": 1
},
"set_priority": {
"priority": 50
}
}
},
"cold": {
"min_age": "7d",
"actions": {
"allocate": {
"number_of_replicas": 0,
"include": {
"data_tier": "data_cold"
}
},
"set_priority": {
"priority": 0
}
}
},
"delete": {
"min_age": "30d",
"actions": {
"delete": {}
}
}
}
}
}
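ILM策略注册后,还需要创建第一个带写别名的索引,rollover才能接管后续索引的滚动。以下为示意脚本(策略名 logs-policy 对应前文模板中的 index.lifecycle.name,初始索引名 logs-000001 为rollover约定的命名格式):
import json
import requests

ES_HOST = "http://localhost:9200"  # 假设地址

with open("logs_policy.json", "r", encoding="utf-8") as f:
    policy_body = json.load(f)

# 注册生命周期策略
resp = requests.put(f"{ES_HOST}/_ilm/policy/logs-policy", json=policy_body)
resp.raise_for_status()

# 创建初始索引并绑定写别名,与模板中的rollover_alias保持一致
bootstrap = {"aliases": {"logs-write": {"is_write_index": True}}}
resp = requests.put(f"{ES_HOST}/logs-000001", json=bootstrap)
print(resp.status_code, resp.json())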
查询性能优化
查询DSL优化策略
// 1. 使用过滤器而非查询(Filter vs Query)
{
"query": {
"bool": {
"must": [
{
"match": {
"title": "elasticsearch optimization"
}
}
],
"filter": [
{
"term": {
"status": "published"
}
},
{
"range": {
"publish_date": {
"gte": "2024-01-01"
}
}
}
]
}
}
}
// 2. 使用constant_score查询避免评分计算
{
"query": {
"constant_score": {
"filter": {
"bool": {
"must": [
{
"term": {
"category": "technology"
}
},
{
"range": {
"view_count": {
"gte": 1000
}
}
}
]
}
},
"boost": 1.0
}
}
}
// 3. 使用multi_match查询优化
{
"query": {
"multi_match": {
"query": "elasticsearch performance",
"fields": [
"title^3",
"content^1",
"tags^2"
],
"type": "best_fields",
"tie_breaker": 0.3,
"minimum_should_match": "75%"
}
}
}
// 4. 使用function_score自定义评分
{
"query": {
"function_score": {
"query": {
"match": {
"title": "elasticsearch"
}
},
"functions": [
{
"filter": {
"range": {
"publish_date": {
"gte": "2024-01-01"
}
}
},
"weight": 2
},
{
"field_value_factor": {
"field": "view_count",
"factor": 0.1,
"modifier": "log1p",
"missing": 1
}
}
],
"score_mode": "multiply",
"boost_mode": "multiply"
}
}
}
// 5. 使用聚合优化
{
"size": 0,
"aggs": {
"categories": {
"terms": {
"field": "category",
"size": 10,
"order": {
"_count": "desc"
}
},
"aggs": {
"avg_views": {
"avg": {
"field": "view_count"
}
},
"top_articles": {
"top_hits": {
"size": 3,
"_source": ["title", "author.name"],
"sort": [
{
"view_count": {
"order": "desc"
}
}
]
}
}
}
},
"date_histogram": {
"date_histogram": {
"field": "publish_date",
"calendar_interval": "month",
"format": "yyyy-MM"
},
"aggs": {
"article_count": {
"value_count": {
"field": "id"
}
}
}
}
}
}
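过滤器之所以更快,是因为它跳过评分计算且结果可以被缓存。下面是一个示意性的对比脚本,用服务端took值粗略比较must与filter两种写法的耗时(索引名 articles 与字段 status 均为假设,前几次执行包含缓存预热,数值仅供参考):
import requests

ES_HOST = "http://localhost:9200"  # 假设地址
INDEX = "articles"                 # 假设索引名

def avg_took(query, times=20):
    """重复执行查询,返回服务端took的平均值(毫秒)。"""
    took = []
    for _ in range(times):
        resp = requests.post(f"{ES_HOST}/{INDEX}/_search", json={"query": query, "size": 0})
        resp.raise_for_status()
        took.append(resp.json()["took"])
    return sum(took) / len(took)

scored = {"bool": {"must": [{"term": {"status": "published"}}]}}
filtered = {"bool": {"filter": [{"term": {"status": "published"}}]}}

print(f"must(参与评分): {avg_took(scored):.1f}ms")
print(f"filter(可缓存): {avg_took(filtered):.1f}ms")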
查询性能监控脚本
#!/usr/bin/env python3
# scripts/elasticsearch_query_monitor.py
import json
import time
import requests
import argparse
from datetime import datetime, timedelta
from collections import defaultdict
import statistics
class ElasticsearchQueryMonitor:
def __init__(self, es_host, username=None, password=None):
self.es_host = es_host.rstrip('/')
self.session = requests.Session()
if username and password:
self.session.auth = (username, password)
self.session.headers.update({
'Content-Type': 'application/json'
})
def get_cluster_stats(self):
"""获取集群统计信息"""
try:
response = self.session.get(f"{self.es_host}/_cluster/stats")
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error getting cluster stats: {e}")
return {}
def get_node_stats(self):
"""获取节点统计信息"""
try:
response = self.session.get(f"{self.es_host}/_nodes/stats")
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error getting node stats: {e}")
return {}
def get_index_stats(self, index_pattern="*"):
"""获取索引统计信息"""
try:
response = self.session.get(f"{self.es_host}/{index_pattern}/_stats")
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error getting index stats: {e}")
return {}
def get_slow_queries(self, index_pattern="*", time_range="1h"):
"""获取慢查询日志"""
query = {
"query": {
"bool": {
"must": [
{
"range": {
"@timestamp": {
"gte": f"now-{time_range}"
}
}
},
{
"exists": {
"field": "elasticsearch.slowlog.took"
}
}
]
}
},
"sort": [
{
"elasticsearch.slowlog.took": {
"order": "desc"
}
}
],
"size": 100
}
try:
response = self.session.post(
f"{self.es_host}/{index_pattern}/_search",
json=query
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"Error getting slow queries: {e}")
return {}
def analyze_query_performance(self, index_name, query_body):
"""分析查询性能"""
try:
# 执行查询并获取详细信息
response = self.session.post(
f"{self.es_host}/{index_name}/_search?explain=true&profile=true",
json=query_body
)
response.raise_for_status()
result = response.json()
analysis = {
'took_ms': result.get('took', 0),
'total_hits': result.get('hits', {}).get('total', {}).get('value', 0),
'max_score': result.get('hits', {}).get('max_score', 0),
'profile': result.get('profile', {}),
'suggestions': []
}
# 分析profile信息
if 'shards' in result.get('profile', {}):
for shard in result['profile']['shards']:
for search in shard.get('searches', []):
for query in search.get('query', []):
query_time = query.get('time_in_nanos', 0) / 1000000 # 转换为毫秒
if query_time > 100: # 超过100ms的查询
analysis['suggestions'].append({
'type': 'slow_query_component',
'component': query.get('type', 'unknown'),
'time_ms': query_time,
'suggestion': f"查询组件 {query.get('type')} 耗时 {query_time:.2f}ms,考虑优化"
})
return analysis
except Exception as e:
print(f"Error analyzing query performance: {e}")
return {}
def get_cache_stats(self):
"""获取缓存统计信息"""
try:
response = self.session.get(f"{self.es_host}/_nodes/stats/indices/query_cache,request_cache,fielddata")
response.raise_for_status()
stats = response.json()
cache_analysis = {
'query_cache': {},
'request_cache': {},
'fielddata_cache': {}
}
for node_id, node_stats in stats.get('nodes', {}).items():
indices_stats = node_stats.get('indices', {})
# Query Cache
query_cache = indices_stats.get('query_cache', {})
cache_analysis['query_cache'][node_id] = {
'memory_size_bytes': query_cache.get('memory_size_in_bytes', 0),
'total_count': query_cache.get('total_count', 0),
'hit_count': query_cache.get('hit_count', 0),
'miss_count': query_cache.get('miss_count', 0),
'cache_size': query_cache.get('cache_size', 0),
'evictions': query_cache.get('evictions', 0)
}
# Request Cache
request_cache = indices_stats.get('request_cache', {})
cache_analysis['request_cache'][node_id] = {
'memory_size_bytes': request_cache.get('memory_size_in_bytes', 0),
'hit_count': request_cache.get('hit_count', 0),
'miss_count': request_cache.get('miss_count', 0),
'evictions': request_cache.get('evictions', 0)
}
# Fielddata Cache
fielddata = indices_stats.get('fielddata', {})
cache_analysis['fielddata_cache'][node_id] = {
'memory_size_bytes': fielddata.get('memory_size_in_bytes', 0),
'evictions': fielddata.get('evictions', 0)
}
return cache_analysis
except Exception as e:
print(f"Error getting cache stats: {e}")
return {}
def generate_performance_report(self, indices=None):
"""生成性能报告"""
report = {
'timestamp': datetime.now().isoformat(),
'cluster_stats': self.get_cluster_stats(),
'node_stats': self.get_node_stats(),
'cache_stats': self.get_cache_stats(),
'index_analysis': {}
}
# 分析指定索引
if indices:
for index_name in indices:
print(f"分析索引: {index_name}")
index_stats = self.get_index_stats(index_name)
report['index_analysis'][index_name] = index_stats
return report
def print_performance_summary(self, report):
"""打印性能摘要"""
print("\n" + "="*80)
print("Elasticsearch性能监控报告")
print("="*80)
print(f"生成时间: {report['timestamp']}")
# 集群概览
cluster_stats = report.get('cluster_stats', {})
if cluster_stats:
indices_stats = cluster_stats.get('indices', {})
nodes_stats = cluster_stats.get('nodes', {})
print(f"\n集群概览:")
print(f" 节点数量: {nodes_stats.get('count', {}).get('total', 0)}")
print(f" 索引数量: {indices_stats.get('count', 0)}")
print(f" 文档总数: {indices_stats.get('docs', {}).get('count', 0):,}")
print(f" 存储大小: {self._format_bytes(indices_stats.get('store', {}).get('size_in_bytes', 0))}")
# 缓存分析
cache_stats = report.get('cache_stats', {})
if cache_stats:
print(f"\n缓存分析:")
# Query Cache
query_cache_total_memory = sum(
node_cache.get('memory_size_bytes', 0)
for node_cache in cache_stats.get('query_cache', {}).values()
)
query_cache_total_hits = sum(
node_cache.get('hit_count', 0)
for node_cache in cache_stats.get('query_cache', {}).values()
)
query_cache_total_misses = sum(
node_cache.get('miss_count', 0)
for node_cache in cache_stats.get('query_cache', {}).values()
)
if query_cache_total_hits + query_cache_total_misses > 0:
hit_rate = query_cache_total_hits / (query_cache_total_hits + query_cache_total_misses) * 100
print(f" Query Cache命中率: {hit_rate:.1f}%")
print(f" Query Cache内存使用: {self._format_bytes(query_cache_total_memory)}")
# Request Cache
request_cache_total_hits = sum(
node_cache.get('hit_count', 0)
for node_cache in cache_stats.get('request_cache', {}).values()
)
request_cache_total_misses = sum(
node_cache.get('miss_count', 0)
for node_cache in cache_stats.get('request_cache', {}).values()
)
if request_cache_total_hits + request_cache_total_misses > 0:
hit_rate = request_cache_total_hits / (request_cache_total_hits + request_cache_total_misses) * 100
print(f" Request Cache命中率: {hit_rate:.1f}%")
# 节点性能分析
node_stats = report.get('node_stats', {})
if node_stats and 'nodes' in node_stats:
print(f"\n节点性能:")
for node_id, node_data in node_stats['nodes'].items():
node_name = node_data.get('name', node_id)
jvm_stats = node_data.get('jvm', {})
indices_stats = node_data.get('indices', {})
print(f" 节点: {node_name}")
# JVM内存使用
if 'mem' in jvm_stats:
heap_used_percent = jvm_stats['mem'].get('heap_used_percent', 0)
heap_max = jvm_stats['mem'].get('heap_max_in_bytes', 0)
print(f" JVM堆内存使用: {heap_used_percent}% / {self._format_bytes(heap_max)}")
# 搜索性能
if 'search' in indices_stats:
search_stats = indices_stats['search']
query_total = search_stats.get('query_total', 0)
query_time_ms = search_stats.get('query_time_in_millis', 0)
avg_query_time = query_time_ms / query_total if query_total > 0 else 0
print(f" 平均查询时间: {avg_query_time:.2f}ms")
print(f" 查询总数: {query_total:,}")
# 索引性能
if 'indexing' in indices_stats:
indexing_stats = indices_stats['indexing']
index_total = indexing_stats.get('index_total', 0)
index_time_ms = indexing_stats.get('index_time_in_millis', 0)
avg_index_time = index_time_ms / index_total if index_total > 0 else 0
print(f" 平均索引时间: {avg_index_time:.2f}ms")
def _format_bytes(self, bytes_value):
"""格式化字节数"""
if bytes_value == 0:
return "0 B"
units = ['B', 'KB', 'MB', 'GB', 'TB']
unit_index = 0
while bytes_value >= 1024 and unit_index < len(units) - 1:
bytes_value /= 1024
unit_index += 1
return f"{bytes_value:.1f} {units[unit_index]}"
def monitor_queries(self, duration_minutes=60, interval_seconds=30):
"""持续监控查询性能"""
print(f"开始监控查询性能,持续时间: {duration_minutes} 分钟")
end_time = datetime.now() + timedelta(minutes=duration_minutes)
query_times = []
while datetime.now() < end_time:
try:
# 获取当前性能指标
node_stats = self.get_node_stats()
current_metrics = {}
for node_id, node_data in node_stats.get('nodes', {}).items():
search_stats = node_data.get('indices', {}).get('search', {})
query_total = search_stats.get('query_total', 0)
query_time_ms = search_stats.get('query_time_in_millis', 0)
if query_total > 0:
avg_query_time = query_time_ms / query_total
current_metrics[node_id] = avg_query_time
query_times.append(avg_query_time)
# 打印当前状态
if current_metrics:
avg_time = statistics.mean(current_metrics.values())
print(f"[{datetime.now().strftime('%H:%M:%S')}] 平均查询时间: {avg_time:.2f}ms")
time.sleep(interval_seconds)
except KeyboardInterrupt:
print("\n监控已停止")
break
except Exception as e:
print(f"监控过程中发生错误: {e}")
time.sleep(interval_seconds)
# 生成监控摘要
if query_times:
print(f"\n监控摘要:")
print(f" 平均查询时间: {statistics.mean(query_times):.2f}ms")
print(f" 最大查询时间: {max(query_times):.2f}ms")
print(f" 最小查询时间: {min(query_times):.2f}ms")
print(f" 查询时间标准差: {statistics.stdev(query_times):.2f}ms")
def main():
parser = argparse.ArgumentParser(description='Elasticsearch查询性能监控工具')
parser.add_argument('--host', required=True, help='Elasticsearch主机地址')
parser.add_argument('--username', help='用户名')
parser.add_argument('--password', help='密码')
parser.add_argument('--action', choices=['report', 'monitor'], default='report', help='执行动作')
parser.add_argument('--duration', type=int, default=60, help='监控持续时间(分钟)')
parser.add_argument('--interval', type=int, default=30, help='监控间隔(秒)')
parser.add_argument('--indices', help='要分析的索引模式,用逗号分隔')
args = parser.parse_args()
monitor = ElasticsearchQueryMonitor(args.host, args.username, args.password)
try:
if args.action == 'report':
indices = args.indices.split(',') if args.indices else None
report = monitor.generate_performance_report(indices)
monitor.print_performance_summary(report)
# 保存报告
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f"elasticsearch_performance_report_{timestamp}.json"
with open(filename, 'w', encoding='utf-8') as f:
json.dump(report, f, indent=2, ensure_ascii=False, default=str)
print(f"\n详细报告已保存到: {filename}")
elif args.action == 'monitor':
monitor.monitor_queries(args.duration, args.interval)
except Exception as e:
print(f"执行过程中发生错误: {e}")
if __name__ == "__main__":
main()
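该脚本可以直接作为命令行工具运行,例如 python3 scripts/elasticsearch_query_monitor.py --host http://localhost:9200 --action report --indices "logs-*";也可以在其他Python代码中复用其中的类。下面是一个示意性的编程调用方式(假设脚本所在目录已加入PYTHONPATH,模块名与文件名一致):
# 模块名为假设,对应上面的脚本文件名
from elasticsearch_query_monitor import ElasticsearchQueryMonitor

monitor = ElasticsearchQueryMonitor("http://localhost:9200")

# 生成一次性能报告并打印摘要
report = monitor.generate_performance_report(indices=["logs-*"])
monitor.print_performance_summary(report)

# 单独查看缓存命中情况
cache = monitor.get_cache_stats()
print(cache.get("query_cache", {}))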
集群配置优化
JVM配置优化
#!/bin/bash
# scripts/elasticsearch_jvm_tuning.sh
# JVM堆内存配置
# 设置为物理内存的50%,但不超过31GB(保持在压缩指针阈值之下)
PHYSICAL_MEMORY_GB=$(free -g | awk '/^Mem:/{print $2}')
HEAP_SIZE_GB=$((PHYSICAL_MEMORY_GB / 2))
if [ $HEAP_SIZE_GB -gt 31 ]; then
HEAP_SIZE_GB=31
fi
# 生成jvm.options配置
cat > /etc/elasticsearch/jvm.options.d/heap.options << EOF
# 堆内存配置
-Xms${HEAP_SIZE_GB}g
-Xmx${HEAP_SIZE_GB}g
# GC配置
-XX:+UseG1GC
-XX:G1HeapRegionSize=16m
-XX:MaxGCPauseMillis=200
-XX:+DisableExplicitGC
# 内存配置
-XX:+AlwaysPreTouch
-Xss1m
-Djava.awt.headless=true
-Dfile.encoding=UTF-8
-Djna.nosys=true
-XX:-OmitStackTraceInFastThrow
-Dio.netty.noUnsafe=true
-Dio.netty.noKeySetOptimization=true
-Dio.netty.recycler.maxCapacityPerThread=0
-Dlog4j.shutdownHookEnabled=false
-Dlog4j2.disable.jmx=true
# 错误处理
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/var/lib/elasticsearch
-XX:ErrorFile=/var/log/elasticsearch/hs_err_pid%p.log
# GC日志(JDK 9+统一日志格式,轮转参数直接写在-Xlog选项内)
-Xlog:gc*,gc+age=trace,safepoint:file=/var/log/elasticsearch/gc.log:time,level,tags:filecount=32,filesize=64m
EOF
echo "JVM配置已生成: /etc/elasticsearch/jvm.options.d/heap.options"
echo "建议的堆内存大小: ${HEAP_SIZE_GB}GB"
系统级优化脚本
#!/bin/bash
# scripts/elasticsearch_system_optimization.sh
set -euo pipefail
# 日志函数
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" >&2
}
# 优化文件描述符限制
optimize_file_descriptors() {
log "优化文件描述符限制..."
# 设置系统级限制
cat >> /etc/security/limits.conf << EOF
elasticsearch soft nofile 65536
elasticsearch hard nofile 65536
elasticsearch soft nproc 4096
elasticsearch hard nproc 4096
elasticsearch soft memlock unlimited
elasticsearch hard memlock unlimited
EOF
# 设置systemd服务限制
mkdir -p /etc/systemd/system/elasticsearch.service.d
cat > /etc/systemd/system/elasticsearch.service.d/override.conf << EOF
[Service]
LimitNOFILE=65536
LimitNPROC=4096
LimitMEMLOCK=infinity
EOF
systemctl daemon-reload
log "文件描述符限制优化完成"
}
# 优化虚拟内存
optimize_virtual_memory() {
log "优化虚拟内存设置..."
# 设置vm.max_map_count与swappiness
cat >> /etc/sysctl.conf << EOF
vm.max_map_count=262144
vm.swappiness=1
EOF
sysctl -p
log "虚拟内存优化完成"
}
# 优化磁盘I/O
optimize_disk_io() {
log "优化磁盘I/O设置..."
# 获取数据磁盘设备(注意:末尾去数字的sed对nvme0n1p1这类命名处理不完整,需按实际设备名调整)
DATA_DEVICES=$(lsblk -no NAME,MOUNTPOINT | grep '/data' | awk '{print $1}' | sed 's/[0-9]*$//')
for device in $DATA_DEVICES; do
if [ -b "/dev/$device" ]; then
# 设置I/O调度器(blk-mq内核为mq-deadline,旧内核为deadline/noop)
echo mq-deadline > /sys/block/$device/queue/scheduler 2>/dev/null || echo deadline > /sys/block/$device/queue/scheduler
# 设置读取预读
echo 128 > /sys/block/$device/queue/read_ahead_kb
# 设置队列深度
echo 32 > /sys/block/$device/queue/nr_requests
log "已优化设备 /dev/$device 的I/O设置"
fi
done
}
# 优化网络设置
optimize_network() {
log "优化网络设置..."
cat >> /etc/sysctl.conf << EOF
# 网络优化
net.core.rmem_default = 262144
net.core.rmem_max = 16777216
net.core.wmem_default = 262144
net.core.wmem_max = 16777216
net.ipv4.tcp_rmem = 4096 65536 16777216
net.ipv4.tcp_wmem = 4096 65536 16777216
net.core.netdev_max_backlog = 5000
net.ipv4.tcp_congestion_control = bbr
EOF
sysctl -p
log "网络优化完成"
}
# 创建监控脚本
create_monitoring_script() {
log "创建系统监控脚本..."
cat > /usr/local/bin/elasticsearch_system_monitor.sh << 'EOF'
#!/bin/bash
# Elasticsearch系统监控脚本
# 检查JVM堆内存使用
check_jvm_memory() {
local es_host="${1:-localhost:9200}"
curl -s "$es_host/_nodes/stats/jvm" | jq -r '
.nodes[] |
"节点: " + .name +
" | 堆内存使用: " + (.jvm.mem.heap_used_percent | tostring) + "%" +
" | GC次数: " + (.jvm.gc.collectors.young.collection_count | tostring)
'
}
# 检查磁盘使用情况
check_disk_usage() {
local es_host="${1:-localhost:9200}"
curl -s "$es_host/_nodes/stats/fs" | jq -r '
.nodes[] |
"节点: " + .name +
" | 磁盘使用: " + ((.fs.total.total_in_bytes - .fs.total.available_in_bytes) * 100 / .fs.total.total_in_bytes | floor | tostring) + "%"
'
}
# 检查查询性能
check_query_performance() {
local es_host="${1:-localhost:9200}"
curl -s "$es_host/_nodes/stats/indices/search" | jq -r '
.nodes[] |
"节点: " + .name +
" | 查询总数: " + (.indices.search.query_total | tostring) +
" | 平均查询时间: " + ((.indices.search.query_time_in_millis / .indices.search.query_total) | floor | tostring) + "ms"
'
}
# 检查集群健康状态
check_cluster_health() {
local es_host="${1:-localhost:9200}"
curl -s "$es_host/_cluster/health" | jq -r '
"集群状态: " + .status +
" | 节点数: " + (.number_of_nodes | tostring) +
" | 数据节点数: " + (.number_of_data_nodes | tostring) +
" | 活跃分片: " + (.active_shards | tostring) +
" | 重定位分片: " + (.relocating_shards | tostring) +
" | 初始化分片: " + (.initializing_shards | tostring) +
" | 未分配分片: " + (.unassigned_shards | tostring)
'
}
# 主函数
main() {
local es_host="${1:-localhost:9200}"
echo "Elasticsearch系统监控报告 - $(date)"
echo "=================================="
echo -e "\n集群健康状态:"
check_cluster_health "$es_host"
echo -e "\nJVM内存使用:"
check_jvm_memory "$es_host"
echo -e "\n磁盘使用情况:"
check_disk_usage "$es_host"
echo -e "\n查询性能:"
check_query_performance "$es_host"
}
main "$@"
EOF
chmod +x /usr/local/bin/elasticsearch_system_monitor.sh
log "系统监控脚本已创建: /usr/local/bin/elasticsearch_system_monitor.sh"
}
# 主函数
main() {
local action="${1:-all}"
case "$action" in
"fd")
optimize_file_descriptors
;;
"vm")
optimize_virtual_memory
;;
"io")
optimize_disk_io
;;
"network")
optimize_network
;;
"monitor")
create_monitoring_script
;;
"all")
optimize_file_descriptors
optimize_virtual_memory
optimize_disk_io
optimize_network
create_monitoring_script
log "所有系统优化完成,请重启Elasticsearch服务"
;;
*)
echo "用法: $0 {fd|vm|io|network|monitor|all}"
echo " fd - 优化文件描述符限制"
echo " vm - 优化虚拟内存设置"
echo " io - 优化磁盘I/O设置"
echo " network - 优化网络设置"
echo " monitor - 创建监控脚本"
echo " all - 执行所有优化"
exit 1
;;
esac
}
# 检查root权限
if [[ $EUID -ne 0 ]]; then
echo "此脚本需要root权限运行"
exit 1
fi
# 执行主函数
main "$@"
搜索相关性优化
自定义分析器配置
{
"settings": {
"analysis": {
"char_filter": {
"html_strip_filter": {
"type": "html_strip"
},
"mapping_filter": {
"type": "mapping",
"mappings": [
"& => and",
"| => or"
]
}
},
"tokenizer": {
"custom_keyword_tokenizer": {
"type": "keyword",
"buffer_size": 256
},
"custom_pattern_tokenizer": {
"type": "pattern",
"pattern": "\\W+",
"lowercase": true
}
},
"filter": {
"chinese_stop": {
"type": "stop",
"stopwords": ["的", "了", "在", "是", "我", "有", "和", "就", "不", "人", "都", "一", "一个", "上", "也", "很", "到", "说", "要", "去", "你", "会", "着", "没有", "看", "好", "自己", "这"]
},
"english_stop": {
"type": "stop",
"stopwords": "_english_"
},
"synonym_filter": {
"type": "synonym",
"synonyms": [
"elasticsearch,es,elastic search",
"database,db,数据库",
"optimization,optimisation,优化",
"performance,性能"
]
},
"custom_stemmer": {
"type": "stemmer",
"language": "english"
}
},
"analyzer": {
"chinese_analyzer": {
"type": "custom",
"char_filter": ["html_strip_filter"],
"tokenizer": "ik_max_word",
"filter": [
"lowercase",
"chinese_stop",
"synonym_filter"
]
},
"english_analyzer": {
"type": "custom",
"char_filter": ["html_strip_filter", "mapping_filter"],
"tokenizer": "standard",
"filter": [
"lowercase",
"english_stop",
"synonym_filter",
"custom_stemmer"
]
},
"search_analyzer": {
"type": "custom",
"tokenizer": "ik_smart",
"filter": [
"lowercase",
"synonym_filter"
]
}
}
}
}
}
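自定义分析器的实际分词效果可以用 _analyze 接口验证,避免上线后才发现分词结果不符合预期。示意脚本如下(假设上述analysis设置已应用到名为 articles 的索引,且集群已安装IK分词插件):
import requests

ES_HOST = "http://localhost:9200"  # 假设地址
INDEX = "articles"                 # 假设索引名

body = {
    "analyzer": "chinese_analyzer",
    "text": "Elasticsearch性能优化指南",
}
resp = requests.post(f"{ES_HOST}/{INDEX}/_analyze", json=body)
resp.raise_for_status()

# 输出每个词元及其位置,便于检查停用词与同义词是否生效
for token in resp.json()["tokens"]:
    print(token["token"], token["position"])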
搜索相关性调优脚本
#!/usr/bin/env python3
# scripts/elasticsearch_relevance_tuning.py
import json
import requests
import argparse
from datetime import datetime
import math
class ElasticsearchRelevanceTuner:
def __init__(self, es_host, username=None, password=None):
self.es_host = es_host.rstrip('/')
self.session = requests.Session()
if username and password:
self.session.auth = (username, password)
self.session.headers.update({
'Content-Type': 'application/json'
})
def analyze_search_results(self, index_name, query, expected_results=None):
"""分析搜索结果相关性"""
search_body = {
"query": query,
"size": 20,
"explain": True
}
try:
response = self.session.post(
f"{self.es_host}/{index_name}/_search",
json=search_body
)
response.raise_for_status()
results = response.json()
analysis = {
'total_hits': results.get('hits', {}).get('total', {}).get('value', 0),
'max_score': results.get('hits', {}).get('max_score', 0),
'results': [],
'score_distribution': {},
'relevance_metrics': {}
}
scores = []
for hit in results.get('hits', {}).get('hits', []):
score = hit.get('_score', 0)
scores.append(score)
result_info = {
'id': hit.get('_id'),
'score': score,
'source': hit.get('_source', {}),
'explanation': hit.get('_explanation', {})
}
analysis['results'].append(result_info)
# 计算分数分布
if scores:
analysis['score_distribution'] = {
'min': min(scores),
'max': max(scores),
'avg': sum(scores) / len(scores),
'std_dev': math.sqrt(sum((x - sum(scores) / len(scores)) ** 2 for x in scores) / len(scores))
}
# 计算相关性指标
if expected_results:
analysis['relevance_metrics'] = self._calculate_relevance_metrics(
[r['id'] for r in analysis['results'][:10]],
expected_results
)
return analysis
except Exception as e:
print(f"Error analyzing search results: {e}")
return {}
def _calculate_relevance_metrics(self, actual_results, expected_results):
"""计算相关性指标(精确率、召回率、NDCG等)"""
# 计算精确率@K
def precision_at_k(actual, expected, k):
actual_k = actual[:k]
relevant_retrieved = len(set(actual_k) & set(expected))
return relevant_retrieved / k if k > 0 else 0
# 计算召回率@K
def recall_at_k(actual, expected, k):
actual_k = actual[:k]
relevant_retrieved = len(set(actual_k) & set(expected))
return relevant_retrieved / len(expected) if len(expected) > 0 else 0
# 计算NDCG@K
def ndcg_at_k(actual, expected, k):
actual_k = actual[:k]
dcg = 0
for i, doc_id in enumerate(actual_k):
if doc_id in expected:
relevance = 1 # 简化的相关性评分
dcg += relevance / math.log2(i + 2)
# 理想DCG
idcg = sum(1 / math.log2(i + 2) for i in range(min(k, len(expected))))
return dcg / idcg if idcg > 0 else 0
metrics = {}
for k in [1, 3, 5, 10]:
metrics[f'precision@{k}'] = precision_at_k(actual_results, expected_results, k)
metrics[f'recall@{k}'] = recall_at_k(actual_results, expected_results, k)
metrics[f'ndcg@{k}'] = ndcg_at_k(actual_results, expected_results, k)
return metrics
def optimize_query_weights(self, index_name, test_queries):
"""优化查询权重"""
optimization_results = []
for test_case in test_queries:
query_text = test_case['query']
expected_results = test_case.get('expected_results', [])
print(f"优化查询: {query_text}")
# 测试不同的字段权重组合
weight_combinations = [
{"title": 3, "content": 1, "tags": 2},
{"title": 5, "content": 1, "tags": 3},
{"title": 2, "content": 1, "tags": 1},
{"title": 4, "content": 2, "tags": 2}
]
best_combination = None
best_score = 0
for weights in weight_combinations:
query = {
"multi_match": {
"query": query_text,
"fields": [f"{field}^{weight}" for field, weight in weights.items()],
"type": "best_fields"
}
}
analysis = self.analyze_search_results(index_name, query, expected_results)
if analysis and 'relevance_metrics' in analysis:
# 使用NDCG@10作为主要评估指标
ndcg_score = analysis['relevance_metrics'].get('ndcg@10', 0)
if ndcg_score > best_score:
best_score = ndcg_score
best_combination = weights
optimization_results.append({
'query': query_text,
'best_weights': best_combination,
'best_ndcg_score': best_score
})
return optimization_results
def create_custom_scoring_query(self, base_query, boost_factors):
"""创建自定义评分查询"""
function_score_query = {
"function_score": {
"query": base_query,
"functions": [],
"score_mode": "multiply",
"boost_mode": "multiply"
}
}
# 添加时间衰减函数
if 'time_decay' in boost_factors:
function_score_query["function_score"]["functions"].append({
"gauss": {
"publish_date": {
"origin": "now",
"scale": boost_factors['time_decay'].get('scale', '30d'),
"decay": boost_factors['time_decay'].get('decay', 0.5)
}
}
})
# 添加字段值因子
if 'field_factors' in boost_factors:
for field, config in boost_factors['field_factors'].items():
function_score_query["function_score"]["functions"].append({
"field_value_factor": {
"field": field,
"factor": config.get('factor', 1.0),
"modifier": config.get('modifier', 'none'),
"missing": config.get('missing', 1)
}
})
# 添加脚本评分
if 'script_score' in boost_factors:
function_score_query["function_score"]["functions"].append({
"script_score": {
"script": {
"source": boost_factors['script_score']
}
}
})
return function_score_query
def benchmark_queries(self, index_name, queries, iterations=10):
"""基准测试查询性能"""
benchmark_results = []
for query_name, query_body in queries.items():
print(f"基准测试查询: {query_name}")
times = []
for i in range(iterations):
start_time = datetime.now()
try:
response = self.session.post(
f"{self.es_host}/{index_name}/_search",
json=query_body
)
response.raise_for_status()
end_time = datetime.now()
duration_ms = (end_time - start_time).total_seconds() * 1000
times.append(duration_ms)
except Exception as e:
print(f"查询执行失败: {e}")
continue
if times:
benchmark_results.append({
'query_name': query_name,
'avg_time_ms': sum(times) / len(times),
'min_time_ms': min(times),
'max_time_ms': max(times),
'std_dev_ms': math.sqrt(sum((x - sum(times) / len(times)) ** 2 for x in times) / len(times)),
'iterations': len(times)
})
return benchmark_results
def generate_optimization_recommendations(self, index_name):
"""生成优化建议"""
recommendations = []
try:
# 获取索引设置和映射
response = self.session.get(f"{self.es_host}/{index_name}")
response.raise_for_status()
index_info = response.json()
index_settings = index_info[index_name]['settings']['index']
index_mappings = index_info[index_name]['mappings']
# 分析分片配置
num_shards = int(index_settings.get('number_of_shards', 1))
num_replicas = int(index_settings.get('number_of_replicas', 1))
# 获取索引统计信息
stats_response = self.session.get(f"{self.es_host}/{index_name}/_stats")
stats_response.raise_for_status()
index_stats = stats_response.json()
total_docs = index_stats['indices'][index_name]['total']['docs']['count']
index_size_bytes = index_stats['indices'][index_name]['total']['store']['size_in_bytes']
# 分片大小建议
avg_shard_size_gb = (index_size_bytes / num_shards) / (1024**3)
if avg_shard_size_gb > 50:
recommendations.append({
'type': 'shard_size',
'priority': 'high',
'message': f"分片过大 ({avg_shard_size_gb:.1f}GB),建议增加分片数量"
})
elif avg_shard_size_gb < 1:
recommendations.append({
'type': 'shard_size',
'priority': 'medium',
'message': f"分片过小 ({avg_shard_size_gb:.1f}GB),建议减少分片数量"
})
# 刷新间隔建议
refresh_interval = index_settings.get('refresh_interval', '1s')
if refresh_interval == '1s' and total_docs > 1000000:
recommendations.append({
'type': 'refresh_interval',
'priority': 'medium',
'message': "大索引建议增加refresh_interval以提高索引性能"
})
# 映射分析
properties = index_mappings.get('properties', {})
# 检查未使用的字段
for field_name, field_config in properties.items():
if field_config.get('index') is False and field_config.get('type') not in ['geo_point', 'geo_shape']:
recommendations.append({
'type': 'unused_field',
'priority': 'low',
'message': f"字段 {field_name} 未被索引,如果不需要搜索可以考虑移除"
})
# 检查text字段的分析器配置
for field_name, field_config in properties.items():
if field_config.get('type') == 'text':
if 'analyzer' not in field_config:
recommendations.append({
'type': 'analyzer_missing',
'priority': 'medium',
'message': f"text字段 {field_name} 未指定分析器,建议配置合适的分析器"
})
return recommendations
except Exception as e:
print(f"Error generating recommendations: {e}")
return []
def print_optimization_report(self, index_name, benchmark_results, recommendations):
"""打印优化报告"""
print("\n" + "="*80)
print("Elasticsearch搜索相关性优化报告")
print("="*80)
print(f"索引: {index_name}")
print(f"生成时间: {datetime.now().isoformat()}")
# 基准测试结果
if benchmark_results:
print(f"\n查询性能基准测试:")
print("-" * 40)
for result in benchmark_results:
print(f"查询: {result['query_name']}")
print(f" 平均耗时: {result['avg_time_ms']:.2f}ms")
print(f" 最小耗时: {result['min_time_ms']:.2f}ms")
print(f" 最大耗时: {result['max_time_ms']:.2f}ms")
print(f" 标准差: {result['std_dev_ms']:.2f}ms")
print(f" 测试次数: {result['iterations']}")
print()
# 优化建议
if recommendations:
print(f"优化建议:")
print("-" * 40)
high_priority = [r for r in recommendations if r['priority'] == 'high']
medium_priority = [r for r in recommendations if r['priority'] == 'medium']
low_priority = [r for r in recommendations if r['priority'] == 'low']
if high_priority:
print("高优先级:")
for rec in high_priority:
print(f" 🔴 {rec['message']}")
if medium_priority:
print("中优先级:")
for rec in medium_priority:
print(f" 🟡 {rec['message']}")
if low_priority:
print("低优先级:")
for rec in low_priority:
print(f" 🟢 {rec['message']}")
def main():
parser = argparse.ArgumentParser(description='Elasticsearch搜索相关性调优工具')
parser.add_argument('--host', required=True, help='Elasticsearch主机地址')
parser.add_argument('--username', help='用户名')
parser.add_argument('--password', help='密码')
parser.add_argument('--index', required=True, help='要分析的索引名称')
parser.add_argument('--queries-file', help='包含测试查询的JSON文件')
parser.add_argument('--benchmark', action='store_true', help='执行查询基准测试')
parser.add_argument('--recommendations', action='store_true', help='生成优化建议')
args = parser.parse_args()
tuner = ElasticsearchRelevanceTuner(args.host, args.username, args.password)
try:
benchmark_results = []
recommendations = []
if args.benchmark and args.queries_file:
# 加载测试查询
with open(args.queries_file, 'r', encoding='utf-8') as f:
test_queries = json.load(f)
benchmark_results = tuner.benchmark_queries(args.index, test_queries)
if args.recommendations:
recommendations = tuner.generate_optimization_recommendations(args.index)
# 生成报告
tuner.print_optimization_report(args.index, benchmark_results, recommendations)
except Exception as e:
print(f"执行过程中发生错误: {e}")
if __name__ == "__main__":
main()
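benchmark_queries期望一个“查询名 → 完整请求体”的字典,--queries-file指向的JSON文件也是同样的结构。下面是一个示意性的编程调用(模块名 elasticsearch_relevance_tuning 为假设,对应上面的脚本文件名;索引 articles 亦为假设):
from elasticsearch_relevance_tuning import ElasticsearchRelevanceTuner

tuner = ElasticsearchRelevanceTuner("http://localhost:9200")

# 查询名 -> 完整的_search请求体
test_queries = {
    "multi_match_basic": {
        "query": {"multi_match": {"query": "elasticsearch 优化",
                                  "fields": ["title^3", "content"]}},
        "size": 10,
    },
    "filtered_term": {
        "query": {"bool": {"filter": [{"term": {"category": "technology"}}]}},
        "size": 10,
    },
}

results = tuner.benchmark_queries("articles", test_queries, iterations=5)
recs = tuner.generate_optimization_recommendations("articles")
tuner.print_optimization_report("articles", results, recs)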
性能测试与基准测试
负载测试脚本
#!/bin/bash
# scripts/elasticsearch_load_test.sh
set -euo pipefail
# 配置参数
ES_HOST="${ES_HOST:-http://localhost:9200}"
INDEX_NAME="${INDEX_NAME:-test-index}"
CONCURRENT_USERS="${CONCURRENT_USERS:-10}"
TEST_DURATION="${TEST_DURATION:-300}"
RAMP_UP_TIME="${RAMP_UP_TIME:-60}"
# 日志函数
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" >&2
}
# 创建测试数据
create_test_data() {
local num_docs="${1:-10000}"
log "创建测试数据,文档数量: $num_docs"
# 创建索引映射
curl -X PUT "$ES_HOST/$INDEX_NAME" -H 'Content-Type: application/json' -d '{
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1,
"refresh_interval": "30s"
},
"mappings": {
"properties": {
"id": {"type": "keyword"},
"title": {
"type": "text",
"analyzer": "standard",
"fields": {
"keyword": {"type": "keyword", "ignore_above": 256}
}
},
"content": {"type": "text", "analyzer": "standard"},
"category": {"type": "keyword"},
"tags": {"type": "keyword"},
"publish_date": {"type": "date"},
"view_count": {"type": "integer"},
"rating": {"type": "float"}
}
}
}'
# 生成批量插入数据
python3 << EOF
import json
import random
from datetime import datetime, timedelta
categories = ["technology", "science", "business", "sports", "entertainment"]
tags_pool = ["elasticsearch", "database", "search", "performance", "optimization", "big data", "analytics"]
bulk_data = []
for i in range($num_docs):
doc_id = f"doc_{i:06d}"
# 创建索引操作
bulk_data.append(json.dumps({"index": {"_id": doc_id}}))
# 创建文档数据
doc = {
"id": doc_id,
"title": f"Test Document {i} - Sample Title with Keywords",
"content": f"This is test content for document {i}. It contains various keywords for testing search functionality. Lorem ipsum dolor sit amet, consectetur adipiscing elit.",
"category": random.choice(categories),
"tags": random.sample(tags_pool, random.randint(1, 3)),
"publish_date": (datetime.now() - timedelta(days=random.randint(0, 365))).isoformat(),
"view_count": random.randint(0, 10000),
"rating": round(random.uniform(1.0, 5.0), 1)
}
bulk_data.append(json.dumps(doc))
# 写入文件
with open('/tmp/bulk_data.json', 'w') as f:
f.write('\n'.join(bulk_data) + '\n')
EOF
# 批量插入数据
curl -X POST "$ES_HOST/$INDEX_NAME/_bulk" \
-H 'Content-Type: application/x-ndjson' \
--data-binary @/tmp/bulk_data.json
# 刷新索引
curl -X POST "$ES_HOST/$INDEX_NAME/_refresh"
log "测试数据创建完成"
}
# 执行搜索负载测试
run_search_load_test() {
log "开始搜索负载测试"
# 创建JMeter测试计划模板(注意:heredoc使用带引号的'EOF',其中的${CONCURRENT_USERS}等占位符不会被shell展开,如需用jmeter -n -t运行,请先自行替换或改为${__P(...)}属性;本脚本的实际压测使用下方的Apache Bench)
cat > /tmp/elasticsearch_test_plan.jmx << 'EOF'
<?xml version="1.0" encoding="UTF-8"?>
<jmeterTestPlan version="1.2" properties="5.0" jmeter="5.4.1">
<hashTree>
<TestPlan guiclass="TestPlanGui" testclass="TestPlan" testname="Elasticsearch Load Test" enabled="true">
<stringProp name="TestPlan.comments"></stringProp>
<boolProp name="TestPlan.functional_mode">false</boolProp>
<boolProp name="TestPlan.tearDown_on_shutdown">true</boolProp>
<boolProp name="TestPlan.serialize_threadgroups">false</boolProp>
<elementProp name="TestPlan.arguments" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
<collectionProp name="Arguments.arguments"/>
</elementProp>
<stringProp name="TestPlan.user_define_classpath"></stringProp>
</TestPlan>
<hashTree>
<ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="Search Thread Group" enabled="true">
<stringProp name="ThreadGroup.on_sample_error">continue</stringProp>
<elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControllerGui" testclass="LoopController" testname="Loop Controller" enabled="true">
<boolProp name="LoopController.continue_forever">false</boolProp>
<intProp name="LoopController.loops">-1</intProp>
</elementProp>
<stringProp name="ThreadGroup.num_threads">${CONCURRENT_USERS}</stringProp>
<stringProp name="ThreadGroup.ramp_time">${RAMP_UP_TIME}</stringProp>
<boolProp name="ThreadGroup.scheduler">true</boolProp>
<stringProp name="ThreadGroup.duration">${TEST_DURATION}</stringProp>
<stringProp name="ThreadGroup.delay"></stringProp>
</ThreadGroup>
<hashTree>
<HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="Search Request" enabled="true">
<elementProp name="HTTPsampler.Arguments" elementType="Arguments" guiclass="HTTPArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
<collectionProp name="Arguments.arguments">
<elementProp name="" elementType="HTTPArgument">
<boolProp name="HTTPArgument.always_encode">false</boolProp>
<stringProp name="Argument.value">{
"query": {
"bool": {
"should": [
{"match": {"title": "${__RandomString(5,abcdefghijklmnopqrstuvwxyz)}"}},
{"match": {"content": "${__RandomString(8,abcdefghijklmnopqrstuvwxyz)}"}},
{"term": {"category": "${__RandomFromMultipleVars(technology|science|business|sports|entertainment)}"}}
]
}
},
"size": 10,
"sort": [{"_score": {"order": "desc"}}]
}</stringProp>
<stringProp name="Argument.metadata">=</stringProp>
<boolProp name="HTTPArgument.use_equals">true</boolProp>
<stringProp name="Argument.name"></stringProp>
</elementProp>
</collectionProp>
</elementProp>
<stringProp name="HTTPSampler.domain">localhost</stringProp>
<stringProp name="HTTPSampler.port">9200</stringProp>
<stringProp name="HTTPSampler.protocol">http</stringProp>
<stringProp name="HTTPSampler.contentEncoding"></stringProp>
<stringProp name="HTTPSampler.path">/${INDEX_NAME}/_search</stringProp>
<stringProp name="HTTPSampler.method">POST</stringProp>
<boolProp name="HTTPSampler.follow_redirects">true</boolProp>
<boolProp name="HTTPSampler.auto_redirects">false</boolProp>
<boolProp name="HTTPSampler.use_keepalive">true</boolProp>
<boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
<stringProp name="HTTPSampler.embedded_url_re"></stringProp>
<stringProp name="HTTPSampler.connect_timeout"></stringProp>
<stringProp name="HTTPSampler.response_timeout"></stringProp>
</HTTPSamplerProxy>
<hashTree>
<HeaderManager guiclass="HeaderPanel" testclass="HeaderManager" testname="HTTP Header Manager" enabled="true">
<collectionProp name="HeaderManager.headers">
<elementProp name="" elementType="Header">
<stringProp name="Header.name">Content-Type</stringProp>
<stringProp name="Header.value">application/json</stringProp>
</elementProp>
</collectionProp>
</HeaderManager>
<hashTree/>
</hashTree>
</hashTree>
</hashTree>
</hashTree>
</jmeterTestPlan>
EOF
# 使用Apache Bench进行简单负载测试
log "使用Apache Bench执行负载测试"
# 创建搜索请求文件
cat > /tmp/search_request.json << EOF
{
"query": {
"multi_match": {
"query": "test document",
"fields": ["title^2", "content"]
}
},
"size": 10
}
EOF
# 执行负载测试
ab -n 1000 -c $CONCURRENT_USERS -T 'application/json' -p /tmp/search_request.json \
"$ES_HOST/$INDEX_NAME/_search" > /tmp/ab_results.txt
log "负载测试完成,结果保存在 /tmp/ab_results.txt"
}
# 监控集群性能
monitor_cluster_performance() {
local duration="${1:-300}"
local interval="${2:-10}"
log "开始监控集群性能,持续时间: ${duration}秒"
local end_time=$(($(date +%s) + duration))
local output_file="/tmp/cluster_performance_$(date +%Y%m%d_%H%M%S).csv"
# 写入CSV头部
echo "timestamp,heap_used_percent,query_total,query_time_avg_ms,index_total,index_time_avg_ms,search_current,index_current" > "$output_file"
while [ $(date +%s) -lt $end_time ]; do
local timestamp=$(date '+%Y-%m-%d %H:%M:%S')
# 获取节点统计信息
local stats=$(curl -s "$ES_HOST/_nodes/stats/jvm,indices")
# 解析统计信息
local heap_used_percent=$(echo "$stats" | jq -r '.nodes | to_entries[0].value.jvm.mem.heap_used_percent')
local query_total=$(echo "$stats" | jq -r '.nodes | to_entries[0].value.indices.search.query_total')
local query_time_ms=$(echo "$stats" | jq -r '.nodes | to_entries[0].value.indices.search.query_time_in_millis')
local index_total=$(echo "$stats" | jq -r '.nodes | to_entries[0].value.indices.indexing.index_total')
local index_time_ms=$(echo "$stats" | jq -r '.nodes | to_entries[0].value.indices.indexing.index_time_in_millis')
local search_current=$(echo "$stats" | jq -r '.nodes | to_entries[0].value.indices.search.query_current')
local index_current=$(echo "$stats" | jq -r '.nodes | to_entries[0].value.indices.indexing.index_current')
# 计算平均时间
local query_time_avg=0
local index_time_avg=0
if [ "$query_total" != "null" ] && [ "$query_total" -gt 0 ]; then
query_time_avg=$(echo "scale=2; $query_time_ms / $query_total" | bc)
fi
if [ "$index_total" != "null" ] && [ "$index_total" -gt 0 ]; then
index_time_avg=$(echo "scale=2; $index_time_ms / $index_total" | bc)
fi
# 写入CSV
echo "$timestamp,$heap_used_percent,$query_total,$query_time_avg,$index_total,$index_time_avg,$search_current,$index_current" >> "$output_file"
# 打印当前状态
echo "[$timestamp] 堆内存: ${heap_used_percent}%, 查询: ${search_current}, 索引: ${index_current}"
sleep $interval
done
log "性能监控完成,数据保存在: $output_file"
}
# 生成性能报告
generate_performance_report() {
log "生成性能测试报告"
local report_file="/tmp/elasticsearch_performance_report_$(date +%Y%m%d_%H%M%S).html"
cat > "$report_file" << 'EOF'
<!DOCTYPE html>
<html>
<head>
<title>Elasticsearch性能测试报告</title>
<meta charset="UTF-8">
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
.header { background-color: #f0f0f0; padding: 20px; border-radius: 5px; }
.section { margin: 20px 0; }
.metric { display: inline-block; margin: 10px; padding: 15px; background-color: #e8f4f8; border-radius: 5px; }
.chart { width: 100%; height: 400px; border: 1px solid #ccc; margin: 10px 0; }
table { border-collapse: collapse; width: 100%; }
th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
th { background-color: #f2f2f2; }
</style>
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
</head>
<body>
<div class="header">
<h1>Elasticsearch性能测试报告</h1>
<p>生成时间: $(date)</p>
<p>测试索引: $INDEX_NAME</p>
<p>并发用户: $CONCURRENT_USERS</p>
<p>测试持续时间: $TEST_DURATION 秒</p>
</div>
<div class="section">
<h2>负载测试结果</h2>
<div id="load-test-results">
<!-- 这里会插入负载测试结果 -->
</div>
</div>
<div class="section">
<h2>集群性能指标</h2>
<canvas id="performanceChart" class="chart"></canvas>
</div>
<div class="section">
<h2>优化建议</h2>
<ul>
<li>根据查询模式优化索引映射</li>
<li>调整JVM堆内存大小</li>
<li>优化分片和副本配置</li>
<li>使用适当的刷新间隔</li>
<li>启用查询缓存和请求缓存</li>
</ul>
</div>
<script>
// 这里可以添加图表渲染代码
const ctx = document.getElementById('performanceChart').getContext('2d');
const chart = new Chart(ctx, {
type: 'line',
data: {
labels: ['时间1', '时间2', '时间3', '时间4', '时间5'],
datasets: [{
label: '查询响应时间(ms)',
data: [12, 19, 3, 5, 2],
borderColor: 'rgb(75, 192, 192)',
tension: 0.1
}]
},
options: {
responsive: true,
scales: {
y: {
beginAtZero: true
}
}
}
});
</script>
</body>
</html>
EOF
log "性能报告已生成: $report_file"
}
# 清理测试数据
cleanup_test_data() {
log "清理测试数据"
curl -X DELETE "$ES_HOST/$INDEX_NAME"
rm -f /tmp/bulk_data.json /tmp/search_request.json /tmp/ab_results.txt
log "测试数据清理完成"
}
# 主函数
main() {
local action="${1:-all}"
case "$action" in
"create-data")
create_test_data "${2:-10000}"
;;
"load-test")
run_search_load_test
;;
"monitor")
monitor_cluster_performance "${2:-300}" "${3:-10}"
;;
"report")
generate_performance_report
;;
"cleanup")
cleanup_test_data
;;
"all")
create_test_data 10000
run_search_load_test &
LOAD_TEST_PID=$!
monitor_cluster_performance 300 10 &
MONITOR_PID=$!
wait $LOAD_TEST_PID
wait $MONITOR_PID
generate_performance_report
;;
*)
echo "用法: $0 {create-data|load-test|monitor|report|cleanup|all}"
echo " create-data [数量] - 创建测试数据"
echo " load-test - 执行负载测试"
echo " monitor [时长] [间隔] - 监控集群性能"
echo " report - 生成性能报告"
echo " cleanup - 清理测试数据"
echo " all - 执行完整测试流程"
exit 1
;;
esac
}
# 检查依赖
check_dependencies() {
local missing_deps=()
if ! command -v curl &> /dev/null; then
missing_deps+=("curl")
fi
if ! command -v jq &> /dev/null; then
missing_deps+=("jq")
fi
if ! command -v bc &> /dev/null; then
missing_deps+=("bc")
fi
if [ ${#missing_deps[@]} -gt 0 ]; then
echo "缺少依赖: ${missing_deps[*]}"
echo "请安装缺少的依赖后重试"
exit 1
fi
}
# 检查依赖并执行主函数
check_dependencies
main "$@"
总结
本文深入探讨了Elasticsearch搜索引擎的全面优化策略,涵盖了以下关键领域:
核心优化要点
- 索引设计优化
  - 合理的映射配置和字段类型选择
  - 自定义分析器和分词器配置
  - 索引模板和生命周期管理
- 查询性能调优
  - DSL查询优化技巧
  - 过滤器vs查询的合理使用
  - 自定义评分和相关性调优
- 集群配置优化
  - JVM参数调优
  - 系统级优化配置
  - 缓存策略优化
- 监控和诊断
  - 性能监控脚本
  - 慢查询分析
  - 集群健康状态监控
最佳实践建议
- 分片策略: 根据数据量和查询模式合理设置分片数量
- 内存管理: 优化JVM堆内存和系统内存配置
- 查询优化: 使用过滤器、缓存和合适的查询类型
- 监控告警: 建立完善的监控体系和告警机制
通过系统性的优化实践,可以显著提升Elasticsearch集群的性能、稳定性和可扩展性,为企业级搜索应用提供强有力的技术支撑。