跳转到主要内容

Elasticsearch搜索引擎优化:从索引设计到查询性能调优的完整指南

博主
22 分钟
4559 字
--

AI 导读

深刻理解和准确把握"Elasticsearch搜索引擎优化:从索引设计到查询性能调优的完整指南"这一重要概念的核心要义,本文从理论基础、实践应用和发展前景等多个维度进行了系统性阐述,为读者提供了全面而深入的分析视角。

内容由AI智能生成

Elasticsearch搜索引擎优化:从索引设计到查询性能调优的完整指南

Elasticsearch作为当今最流行的搜索引擎,在处理大规模数据搜索、日志分析、实时分析等场景中发挥着重要作用。本文将深入探讨Elasticsearch的优化策略,从索引设计到查询性能调优,提供完整的优化指南。

Elasticsearch架构概述

核心概念

graph TB
    subgraph "Elasticsearch集群"
        subgraph "Master节点"
            M1[Master Node 1]
            M2[Master Node 2]
            M3[Master Node 3]
        end
        
        subgraph "数据节点"
            D1[Data Node 1]
            D2[Data Node 2]
            D3[Data Node 3]
            D4[Data Node 4]
        end
        
        subgraph "协调节点"
            C1[Coordinating Node 1]
            C2[Coordinating Node 2]
        end
        
        subgraph "摄取节点"
            I1[Ingest Node 1]
            I2[Ingest Node 2]
        end
    end
    
    Client[客户端应用] --> C1
    Client --> C2
    
    C1 --> D1
    C1 --> D2
    C2 --> D3
    C2 --> D4
    
    I1 --> D1
    I2 --> D2
    
    M1 -.-> M2
    M2 -.-> M3
    M3 -.-> M1

节点角色配置

# elasticsearch.yml - Master节点配置
cluster.name: production-cluster
node.name: master-node-1
node.roles: [master]
discovery.seed_hosts: ["master-node-1", "master-node-2", "master-node-3"]
cluster.initial_master_nodes: ["master-node-1", "master-node-2", "master-node-3"]

# 内存配置
bootstrap.memory_lock: true
indices.memory.index_buffer_size: 10%

# 网络配置
network.host: 0.0.0.0
http.port: 9200
transport.port: 9300

# 安全配置
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.http.ssl.enabled: true

---
# elasticsearch.yml - 数据节点配置
cluster.name: production-cluster
node.name: data-node-1
node.roles: [data, data_content, data_hot, data_warm, data_cold]

# 数据路径配置
path.data: ["/data1/elasticsearch", "/data2/elasticsearch"]
path.logs: "/var/log/elasticsearch"

# 内存配置
bootstrap.memory_lock: true
indices.memory.index_buffer_size: 20%
indices.memory.min_index_buffer_size: 96mb

# 线程池配置
thread_pool:
  write:
    size: 8
    queue_size: 1000
  search:
    size: 13
    queue_size: 1000
  get:
    size: 8
    queue_size: 1000

---
# elasticsearch.yml - 协调节点配置
cluster.name: production-cluster
node.name: coordinating-node-1
node.roles: []

# 内存配置
bootstrap.memory_lock: true
indices.memory.index_buffer_size: 5%

# 搜索配置
search.max_buckets: 65536
search.allow_expensive_queries: false

索引设计优化

映射(Mapping)设计最佳实践

{
  "mappings": {
    "properties": {
      "id": {
        "type": "keyword",
        "store": true
      },
      "title": {
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          },
          "suggest": {
            "type": "completion",
            "analyzer": "simple",
            "preserve_separators": true,
            "preserve_position_increments": true,
            "max_input_length": 50
          }
        }
      },
      "content": {
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart",
        "index_options": "positions"
      },
      "category": {
        "type": "keyword",
        "fields": {
          "text": {
            "type": "text",
            "analyzer": "standard"
          }
        }
      },
      "tags": {
        "type": "keyword"
      },
      "publish_date": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
      },
      "view_count": {
        "type": "integer",
        "index": false
      },
      "author": {
        "properties": {
          "id": {
            "type": "keyword"
          },
          "name": {
            "type": "text",
            "analyzer": "ik_max_word",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 64
              }
            }
          },
          "email": {
            "type": "keyword",
            "index": false
          }
        }
      },
      "location": {
        "type": "geo_point"
      },
      "metadata": {
        "type": "object",
        "enabled": false
      }
    }
  },
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1,
    "refresh_interval": "30s",
    "max_result_window": 50000,
    "analysis": {
      "analyzer": {
        "ik_max_word": {
          "type": "ik_max_word"
        },
        "ik_smart": {
          "type": "ik_smart"
        },
        "custom_analyzer": {
          "type": "custom",
          "tokenizer": "standard",
          "filter": [
            "lowercase",
            "stop",
            "snowball"
          ]
        }
      }
    },
    "index": {
      "sort.field": ["publish_date", "_score"],
      "sort.order": ["desc", "desc"]
    }
  }
}

索引模板配置

{
  "index_patterns": ["logs-*", "metrics-*"],
  "template": {
    "settings": {
      "number_of_shards": 1,
      "number_of_replicas": 1,
      "refresh_interval": "5s",
      "index.lifecycle.name": "logs-policy",
      "index.lifecycle.rollover_alias": "logs-write"
    },
    "mappings": {
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "level": {
          "type": "keyword"
        },
        "message": {
          "type": "text",
          "analyzer": "standard"
        },
        "service": {
          "type": "keyword"
        },
        "host": {
          "type": "keyword"
        },
        "tags": {
          "type": "keyword"
        }
      }
    }
  },
  "composed_of": ["component-template-mappings", "component-template-settings"],
  "priority": 200,
  "version": 1,
  "_meta": {
    "description": "Template for application logs"
  }
}

索引生命周期管理(ILM)

{
  "policy": {
    "phases": {
      "hot": {
        "min_age": "0ms",
        "actions": {
          "rollover": {
            "max_size": "10gb",
            "max_age": "1d",
            "max_docs": 10000000
          },
          "set_priority": {
            "priority": 100
          }
        }
      },
      "warm": {
        "min_age": "1d",
        "actions": {
          "allocate": {
            "number_of_replicas": 0,
            "include": {
              "data_tier": "data_warm"
            }
          },
          "forcemerge": {
            "max_num_segments": 1
          },
          "set_priority": {
            "priority": 50
          }
        }
      },
      "cold": {
        "min_age": "7d",
        "actions": {
          "allocate": {
            "number_of_replicas": 0,
            "include": {
              "data_tier": "data_cold"
            }
          },
          "set_priority": {
            "priority": 0
          }
        }
      },
      "delete": {
        "min_age": "30d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}

查询性能优化

查询DSL优化策略

// 1. 使用过滤器而非查询(Filter vs Query)
{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "title": "elasticsearch optimization"
          }
        }
      ],
      "filter": [
        {
          "term": {
            "status": "published"
          }
        },
        {
          "range": {
            "publish_date": {
              "gte": "2024-01-01"
            }
          }
        }
      ]
    }
  }
}

// 2. 使用constant_score查询避免评分计算
{
  "query": {
    "constant_score": {
      "filter": {
        "bool": {
          "must": [
            {
              "term": {
                "category": "technology"
              }
            },
            {
              "range": {
                "view_count": {
                  "gte": 1000
                }
              }
            }
          ]
        }
      },
      "boost": 1.0
    }
  }
}

// 3. 使用multi_match查询优化
{
  "query": {
    "multi_match": {
      "query": "elasticsearch performance",
      "fields": [
        "title^3",
        "content^1",
        "tags^2"
      ],
      "type": "best_fields",
      "tie_breaker": 0.3,
      "minimum_should_match": "75%"
    }
  }
}

// 4. 使用function_score自定义评分
{
  "query": {
    "function_score": {
      "query": {
        "match": {
          "title": "elasticsearch"
        }
      },
      "functions": [
        {
          "filter": {
            "range": {
              "publish_date": {
                "gte": "2024-01-01"
              }
            }
          },
          "weight": 2
        },
        {
          "field_value_factor": {
            "field": "view_count",
            "factor": 0.1,
            "modifier": "log1p",
            "missing": 1
          }
        }
      ],
      "score_mode": "multiply",
      "boost_mode": "multiply"
    }
  }
}

// 5. 使用聚合优化
{
  "size": 0,
  "aggs": {
    "categories": {
      "terms": {
        "field": "category",
        "size": 10,
        "order": {
          "_count": "desc"
        }
      },
      "aggs": {
        "avg_views": {
          "avg": {
            "field": "view_count"
          }
        },
        "top_articles": {
          "top_hits": {
            "size": 3,
            "_source": ["title", "author.name"],
            "sort": [
              {
                "view_count": {
                  "order": "desc"
                }
              }
            ]
          }
        }
      }
    },
    "date_histogram": {
      "date_histogram": {
        "field": "publish_date",
        "calendar_interval": "month",
        "format": "yyyy-MM"
      },
      "aggs": {
        "article_count": {
          "value_count": {
            "field": "id"
          }
        }
      }
    }
  }
}

查询性能监控脚本

#!/usr/bin/env python3
# scripts/elasticsearch_query_monitor.py

import json
import time
import requests
import argparse
from datetime import datetime, timedelta
from collections import defaultdict
import statistics

class ElasticsearchQueryMonitor:
    def __init__(self, es_host, username=None, password=None):
        self.es_host = es_host.rstrip('/')
        self.session = requests.Session()
        
        if username and password:
            self.session.auth = (username, password)
        
        self.session.headers.update({
            'Content-Type': 'application/json'
        })
    
    def get_cluster_stats(self):
        """获取集群统计信息"""
        try:
            response = self.session.get(f"{self.es_host}/_cluster/stats")
            response.raise_for_status()
            return response.json()
        except Exception as e:
            print(f"Error getting cluster stats: {e}")
            return {}
    
    def get_node_stats(self):
        """获取节点统计信息"""
        try:
            response = self.session.get(f"{self.es_host}/_nodes/stats")
            response.raise_for_status()
            return response.json()
        except Exception as e:
            print(f"Error getting node stats: {e}")
            return {}
    
    def get_index_stats(self, index_pattern="*"):
        """获取索引统计信息"""
        try:
            response = self.session.get(f"{self.es_host}/{index_pattern}/_stats")
            response.raise_for_status()
            return response.json()
        except Exception as e:
            print(f"Error getting index stats: {e}")
            return {}
    
    def get_slow_queries(self, index_pattern="*", time_range="1h"):
        """获取慢查询日志"""
        query = {
            "query": {
                "bool": {
                    "must": [
                        {
                            "range": {
                                "@timestamp": {
                                    "gte": f"now-{time_range}"
                                }
                            }
                        },
                        {
                            "exists": {
                                "field": "elasticsearch.slowlog.took"
                            }
                        }
                    ]
                }
            },
            "sort": [
                {
                    "elasticsearch.slowlog.took": {
                        "order": "desc"
                    }
                }
            ],
            "size": 100
        }
        
        try:
            response = self.session.post(
                f"{self.es_host}/{index_pattern}/_search",
                json=query
            )
            response.raise_for_status()
            return response.json()
        except Exception as e:
            print(f"Error getting slow queries: {e}")
            return {}
    
    def analyze_query_performance(self, index_name, query_body):
        """分析查询性能"""
        try:
            # 执行查询并获取详细信息
            response = self.session.post(
                f"{self.es_host}/{index_name}/_search?explain=true&profile=true",
                json=query_body
            )
            response.raise_for_status()
            result = response.json()
            
            analysis = {
                'took_ms': result.get('took', 0),
                'total_hits': result.get('hits', {}).get('total', {}).get('value', 0),
                'max_score': result.get('hits', {}).get('max_score', 0),
                'profile': result.get('profile', {}),
                'suggestions': []
            }
            
            # 分析profile信息
            if 'shards' in result.get('profile', {}):
                for shard in result['profile']['shards']:
                    for search in shard.get('searches', []):
                        for query in search.get('query', []):
                            query_time = query.get('time_in_nanos', 0) / 1000000  # 转换为毫秒
                            if query_time > 100:  # 超过100ms的查询
                                analysis['suggestions'].append({
                                    'type': 'slow_query_component',
                                    'component': query.get('type', 'unknown'),
                                    'time_ms': query_time,
                                    'suggestion': f"查询组件 {query.get('type')} 耗时 {query_time:.2f}ms,考虑优化"
                                })
            
            return analysis
            
        except Exception as e:
            print(f"Error analyzing query performance: {e}")
            return {}
    
    def get_cache_stats(self):
        """获取缓存统计信息"""
        try:
            response = self.session.get(f"{self.es_host}/_nodes/stats/indices/query_cache,request_cache,fielddata")
            response.raise_for_status()
            stats = response.json()
            
            cache_analysis = {
                'query_cache': {},
                'request_cache': {},
                'fielddata_cache': {}
            }
            
            for node_id, node_stats in stats.get('nodes', {}).items():
                indices_stats = node_stats.get('indices', {})
                
                # Query Cache
                query_cache = indices_stats.get('query_cache', {})
                cache_analysis['query_cache'][node_id] = {
                    'memory_size_bytes': query_cache.get('memory_size_in_bytes', 0),
                    'total_count': query_cache.get('total_count', 0),
                    'hit_count': query_cache.get('hit_count', 0),
                    'miss_count': query_cache.get('miss_count', 0),
                    'cache_size': query_cache.get('cache_size', 0),
                    'evictions': query_cache.get('evictions', 0)
                }
                
                # Request Cache
                request_cache = indices_stats.get('request_cache', {})
                cache_analysis['request_cache'][node_id] = {
                    'memory_size_bytes': request_cache.get('memory_size_in_bytes', 0),
                    'hit_count': request_cache.get('hit_count', 0),
                    'miss_count': request_cache.get('miss_count', 0),
                    'evictions': request_cache.get('evictions', 0)
                }
                
                # Fielddata Cache
                fielddata = indices_stats.get('fielddata', {})
                cache_analysis['fielddata_cache'][node_id] = {
                    'memory_size_bytes': fielddata.get('memory_size_in_bytes', 0),
                    'evictions': fielddata.get('evictions', 0)
                }
            
            return cache_analysis
            
        except Exception as e:
            print(f"Error getting cache stats: {e}")
            return {}
    
    def generate_performance_report(self, indices=None):
        """生成性能报告"""
        report = {
            'timestamp': datetime.now().isoformat(),
            'cluster_stats': self.get_cluster_stats(),
            'node_stats': self.get_node_stats(),
            'cache_stats': self.get_cache_stats(),
            'index_analysis': {}
        }
        
        # 分析指定索引
        if indices:
            for index_name in indices:
                print(f"分析索引: {index_name}")
                index_stats = self.get_index_stats(index_name)
                report['index_analysis'][index_name] = index_stats
        
        return report
    
    def print_performance_summary(self, report):
        """打印性能摘要"""
        print("\n" + "="*80)
        print("Elasticsearch性能监控报告")
        print("="*80)
        print(f"生成时间: {report['timestamp']}")
        
        # 集群概览
        cluster_stats = report.get('cluster_stats', {})
        if cluster_stats:
            indices_stats = cluster_stats.get('indices', {})
            nodes_stats = cluster_stats.get('nodes', {})
            
            print(f"\n集群概览:")
            print(f"  节点数量: {nodes_stats.get('count', {}).get('total', 0)}")
            print(f"  索引数量: {indices_stats.get('count', 0)}")
            print(f"  文档总数: {indices_stats.get('docs', {}).get('count', 0):,}")
            print(f"  存储大小: {self._format_bytes(indices_stats.get('store', {}).get('size_in_bytes', 0))}")
        
        # 缓存分析
        cache_stats = report.get('cache_stats', {})
        if cache_stats:
            print(f"\n缓存分析:")
            
            # Query Cache
            query_cache_total_memory = sum(
                node_cache.get('memory_size_bytes', 0) 
                for node_cache in cache_stats.get('query_cache', {}).values()
            )
            query_cache_total_hits = sum(
                node_cache.get('hit_count', 0) 
                for node_cache in cache_stats.get('query_cache', {}).values()
            )
            query_cache_total_misses = sum(
                node_cache.get('miss_count', 0) 
                for node_cache in cache_stats.get('query_cache', {}).values()
            )
            
            if query_cache_total_hits + query_cache_total_misses > 0:
                hit_rate = query_cache_total_hits / (query_cache_total_hits + query_cache_total_misses) * 100
                print(f"  Query Cache命中率: {hit_rate:.1f}%")
                print(f"  Query Cache内存使用: {self._format_bytes(query_cache_total_memory)}")
            
            # Request Cache
            request_cache_total_hits = sum(
                node_cache.get('hit_count', 0) 
                for node_cache in cache_stats.get('request_cache', {}).values()
            )
            request_cache_total_misses = sum(
                node_cache.get('miss_count', 0) 
                for node_cache in cache_stats.get('request_cache', {}).values()
            )
            
            if request_cache_total_hits + request_cache_total_misses > 0:
                hit_rate = request_cache_total_hits / (request_cache_total_hits + request_cache_total_misses) * 100
                print(f"  Request Cache命中率: {hit_rate:.1f}%")
        
        # 节点性能分析
        node_stats = report.get('node_stats', {})
        if node_stats and 'nodes' in node_stats:
            print(f"\n节点性能:")
            
            for node_id, node_data in node_stats['nodes'].items():
                node_name = node_data.get('name', node_id)
                jvm_stats = node_data.get('jvm', {})
                indices_stats = node_data.get('indices', {})
                
                print(f"  节点: {node_name}")
                
                # JVM内存使用
                if 'mem' in jvm_stats:
                    heap_used_percent = jvm_stats['mem'].get('heap_used_percent', 0)
                    heap_max = jvm_stats['mem'].get('heap_max_in_bytes', 0)
                    print(f"    JVM堆内存使用: {heap_used_percent}% / {self._format_bytes(heap_max)}")
                
                # 搜索性能
                if 'search' in indices_stats:
                    search_stats = indices_stats['search']
                    query_total = search_stats.get('query_total', 0)
                    query_time_ms = search_stats.get('query_time_in_millis', 0)
                    avg_query_time = query_time_ms / query_total if query_total > 0 else 0
                    print(f"    平均查询时间: {avg_query_time:.2f}ms")
                    print(f"    查询总数: {query_total:,}")
                
                # 索引性能
                if 'indexing' in indices_stats:
                    indexing_stats = indices_stats['indexing']
                    index_total = indexing_stats.get('index_total', 0)
                    index_time_ms = indexing_stats.get('index_time_in_millis', 0)
                    avg_index_time = index_time_ms / index_total if index_total > 0 else 0
                    print(f"    平均索引时间: {avg_index_time:.2f}ms")
    
    def _format_bytes(self, bytes_value):
        """格式化字节数"""
        if bytes_value == 0:
            return "0 B"
        
        units = ['B', 'KB', 'MB', 'GB', 'TB']
        unit_index = 0
        
        while bytes_value >= 1024 and unit_index < len(units) - 1:
            bytes_value /= 1024
            unit_index += 1
        
        return f"{bytes_value:.1f} {units[unit_index]}"
    
    def monitor_queries(self, duration_minutes=60, interval_seconds=30):
        """持续监控查询性能"""
        print(f"开始监控查询性能,持续时间: {duration_minutes} 分钟")
        
        end_time = datetime.now() + timedelta(minutes=duration_minutes)
        query_times = []
        
        while datetime.now() < end_time:
            try:
                # 获取当前性能指标
                node_stats = self.get_node_stats()
                
                current_metrics = {}
                for node_id, node_data in node_stats.get('nodes', {}).items():
                    search_stats = node_data.get('indices', {}).get('search', {})
                    query_total = search_stats.get('query_total', 0)
                    query_time_ms = search_stats.get('query_time_in_millis', 0)
                    
                    if query_total > 0:
                        avg_query_time = query_time_ms / query_total
                        current_metrics[node_id] = avg_query_time
                        query_times.append(avg_query_time)
                
                # 打印当前状态
                if current_metrics:
                    avg_time = statistics.mean(current_metrics.values())
                    print(f"[{datetime.now().strftime('%H:%M:%S')}] 平均查询时间: {avg_time:.2f}ms")
                
                time.sleep(interval_seconds)
                
            except KeyboardInterrupt:
                print("\n监控已停止")
                break
            except Exception as e:
                print(f"监控过程中发生错误: {e}")
                time.sleep(interval_seconds)
        
        # 生成监控摘要
        if query_times:
            print(f"\n监控摘要:")
            print(f"  平均查询时间: {statistics.mean(query_times):.2f}ms")
            print(f"  最大查询时间: {max(query_times):.2f}ms")
            print(f"  最小查询时间: {min(query_times):.2f}ms")
            print(f"  查询时间标准差: {statistics.stdev(query_times):.2f}ms")

def main():
    parser = argparse.ArgumentParser(description='Elasticsearch查询性能监控工具')
    parser.add_argument('--host', required=True, help='Elasticsearch主机地址')
    parser.add_argument('--username', help='用户名')
    parser.add_argument('--password', help='密码')
    parser.add_argument('--action', choices=['report', 'monitor'], default='report', help='执行动作')
    parser.add_argument('--duration', type=int, default=60, help='监控持续时间(分钟)')
    parser.add_argument('--interval', type=int, default=30, help='监控间隔(秒)')
    parser.add_argument('--indices', help='要分析的索引模式,用逗号分隔')
    
    args = parser.parse_args()
    
    monitor = ElasticsearchQueryMonitor(args.host, args.username, args.password)
    
    try:
        if args.action == 'report':
            indices = args.indices.split(',') if args.indices else None
            report = monitor.generate_performance_report(indices)
            monitor.print_performance_summary(report)
            
            # 保存报告
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            filename = f"elasticsearch_performance_report_{timestamp}.json"
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(report, f, indent=2, ensure_ascii=False, default=str)
            print(f"\n详细报告已保存到: {filename}")
            
        elif args.action == 'monitor':
            monitor.monitor_queries(args.duration, args.interval)
            
    except Exception as e:
        print(f"执行过程中发生错误: {e}")

if __name__ == "__main__":
    main()

集群配置优化

JVM配置优化

#!/bin/bash
# scripts/elasticsearch_jvm_tuning.sh

# JVM堆内存配置
# 设置为物理内存的50%,但不超过32GB
PHYSICAL_MEMORY_GB=$(free -g | awk '/^Mem:/{print $2}')
HEAP_SIZE_GB=$((PHYSICAL_MEMORY_GB / 2))

if [ $HEAP_SIZE_GB -gt 32 ]; then
    HEAP_SIZE_GB=32
fi

# 生成jvm.options配置
cat > /etc/elasticsearch/jvm.options.d/heap.options << EOF
# 堆内存配置
-Xms${HEAP_SIZE_GB}g
-Xmx${HEAP_SIZE_GB}g

# GC配置
-XX:+UseG1GC
-XX:G1HeapRegionSize=16m
-XX:+UseG1GC
-XX:+UnlockExperimentalVMOptions
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
-XX:+DisableExplicitGC

# 内存配置
-XX:+AlwaysPreTouch
-Xss1m
-Djava.awt.headless=true
-Dfile.encoding=UTF-8
-Djna.nosys=true
-XX:-OmitStackTraceInFastThrow
-Dio.netty.noUnsafe=true
-Dio.netty.noKeySetOptimization=true
-Dio.netty.recycler.maxCapacityPerThread=0
-Dlog4j.shutdownHookEnabled=false
-Dlog4j2.disable.jmx=true

# 错误处理
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/var/lib/elasticsearch
-XX:ErrorFile=/var/log/elasticsearch/hs_err_pid%p.log

# GC日志
-Xlog:gc*,gc+age=trace,safepoint:gc.log:time,level,tags
-XX:+UseGCLogFileRotation
-XX:NumberOfGCLogFiles=32
-XX:GCLogFileSize=64m
EOF

echo "JVM配置已生成: /etc/elasticsearch/jvm.options.d/heap.options"
echo "建议的堆内存大小: ${HEAP_SIZE_GB}GB"

系统级优化脚本

#!/bin/bash
# scripts/elasticsearch_system_optimization.sh

set -euo pipefail

# 日志函数
log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" >&2
}

# 优化文件描述符限制
optimize_file_descriptors() {
    log "优化文件描述符限制..."
    
    # 设置系统级限制
    cat >> /etc/security/limits.conf << EOF
elasticsearch soft nofile 65536
elasticsearch hard nofile 65536
elasticsearch soft nproc 4096
elasticsearch hard nproc 4096
elasticsearch soft memlock unlimited
elasticsearch hard memlock unlimited
EOF
    
    # 设置systemd服务限制
    mkdir -p /etc/systemd/system/elasticsearch.service.d
    cat > /etc/systemd/system/elasticsearch.service.d/override.conf << EOF
[Service]
LimitNOFILE=65536
LimitNPROC=4096
LimitMEMLOCK=infinity
EOF
    
    systemctl daemon-reload
    log "文件描述符限制优化完成"
}

# 优化虚拟内存
optimize_virtual_memory() {
    log "优化虚拟内存设置..."
    
    # 设置vm.max_map_count
    echo 'vm.max_map_count=262144' >> /etc/sysctl.conf
    sysctl -p
    
    # 设置swappiness
    echo 'vm.swappiness=1' >> /etc/sysctl.conf
    sysctl -p
    
    log "虚拟内存优化完成"
}

# 优化磁盘I/O
optimize_disk_io() {
    log "优化磁盘I/O设置..."
    
    # 获取数据磁盘设备
    DATA_DEVICES=$(lsblk -no NAME,MOUNTPOINT | grep '/data' | awk '{print $1}' | sed 's/[0-9]*$//')
    
    for device in $DATA_DEVICES; do
        if [ -b "/dev/$device" ]; then
            # 设置I/O调度器为deadline或noop
            echo deadline > /sys/block/$device/queue/scheduler
            
            # 设置读取预读
            echo 128 > /sys/block/$device/queue/read_ahead_kb
            
            # 设置队列深度
            echo 32 > /sys/block/$device/queue/nr_requests
            
            log "已优化设备 /dev/$device 的I/O设置"
        fi
    done
}

# 优化网络设置
optimize_network() {
    log "优化网络设置..."
    
    cat >> /etc/sysctl.conf << EOF
# 网络优化
net.core.rmem_default = 262144
net.core.rmem_max = 16777216
net.core.wmem_default = 262144
net.core.wmem_max = 16777216
net.ipv4.tcp_rmem = 4096 65536 16777216
net.ipv4.tcp_wmem = 4096 65536 16777216
net.core.netdev_max_backlog = 5000
net.ipv4.tcp_congestion_control = bbr
EOF
    
    sysctl -p
    log "网络优化完成"
}

# 创建监控脚本
create_monitoring_script() {
    log "创建系统监控脚本..."
    
    cat > /usr/local/bin/elasticsearch_system_monitor.sh << 'EOF'
#!/bin/bash

# Elasticsearch系统监控脚本

# 检查JVM堆内存使用
check_jvm_memory() {
    local es_host="${1:-localhost:9200}"
    
    curl -s "$es_host/_nodes/stats/jvm" | jq -r '
        .nodes[] | 
        "节点: " + .name + 
        " | 堆内存使用: " + (.jvm.mem.heap_used_percent | tostring) + "%" +
        " | GC次数: " + (.jvm.gc.collectors.young.collection_count | tostring)
    '
}

# 检查磁盘使用情况
check_disk_usage() {
    local es_host="${1:-localhost:9200}"
    
    curl -s "$es_host/_nodes/stats/fs" | jq -r '
        .nodes[] | 
        "节点: " + .name + 
        " | 磁盘使用: " + ((.fs.total.total_in_bytes - .fs.total.available_in_bytes) * 100 / .fs.total.total_in_bytes | floor | tostring) + "%"
    '
}

# 检查查询性能
check_query_performance() {
    local es_host="${1:-localhost:9200}"
    
    curl -s "$es_host/_nodes/stats/indices/search" | jq -r '
        .nodes[] | 
        "节点: " + .name + 
        " | 查询总数: " + (.indices.search.query_total | tostring) +
        " | 平均查询时间: " + ((.indices.search.query_time_in_millis / .indices.search.query_total) | floor | tostring) + "ms"
    '
}

# 检查集群健康状态
check_cluster_health() {
    local es_host="${1:-localhost:9200}"
    
    curl -s "$es_host/_cluster/health" | jq -r '
        "集群状态: " + .status +
        " | 节点数: " + (.number_of_nodes | tostring) +
        " | 数据节点数: " + (.number_of_data_nodes | tostring) +
        " | 活跃分片: " + (.active_shards | tostring) +
        " | 重定位分片: " + (.relocating_shards | tostring) +
        " | 初始化分片: " + (.initializing_shards | tostring) +
        " | 未分配分片: " + (.unassigned_shards | tostring)
    '
}

# 主函数
main() {
    local es_host="${1:-localhost:9200}"
    
    echo "Elasticsearch系统监控报告 - $(date)"
    echo "=================================="
    
    echo -e "\n集群健康状态:"
    check_cluster_health "$es_host"
    
    echo -e "\nJVM内存使用:"
    check_jvm_memory "$es_host"
    
    echo -e "\n磁盘使用情况:"
    check_disk_usage "$es_host"
    
    echo -e "\n查询性能:"
    check_query_performance "$es_host"
}

main "$@"
EOF
    
    chmod +x /usr/local/bin/elasticsearch_system_monitor.sh
    log "系统监控脚本已创建: /usr/local/bin/elasticsearch_system_monitor.sh"
}

# 主函数
main() {
    local action="${1:-all}"
    
    case "$action" in
        "fd")
            optimize_file_descriptors
            ;;
        "vm")
            optimize_virtual_memory
            ;;
        "io")
            optimize_disk_io
            ;;
        "network")
            optimize_network
            ;;
        "monitor")
            create_monitoring_script
            ;;
        "all")
            optimize_file_descriptors
            optimize_virtual_memory
            optimize_disk_io
            optimize_network
            create_monitoring_script
            log "所有系统优化完成,请重启Elasticsearch服务"
            ;;
        *)
            echo "用法: $0 {fd|vm|io|network|monitor|all}"
            echo "  fd      - 优化文件描述符限制"
            echo "  vm      - 优化虚拟内存设置"
            echo "  io      - 优化磁盘I/O设置"
            echo "  network - 优化网络设置"
            echo "  monitor - 创建监控脚本"
            echo "  all     - 执行所有优化"
            exit 1
            ;;
    esac
}

# 检查root权限
if [[ $EUID -ne 0 ]]; then
   echo "此脚本需要root权限运行" 
   exit 1
fi

# 执行主函数
main "$@"

搜索相关性优化

自定义分析器配置

{
  "settings": {
    "analysis": {
      "char_filter": {
        "html_strip_filter": {
          "type": "html_strip"
        },
        "mapping_filter": {
          "type": "mapping",
          "mappings": [
            "& => and",
            "| => or"
          ]
        }
      },
      "tokenizer": {
        "custom_keyword_tokenizer": {
          "type": "keyword",
          "buffer_size": 256
        },
        "custom_pattern_tokenizer": {
          "type": "pattern",
          "pattern": "\\W+",
          "lowercase": true
        }
      },
      "filter": {
        "chinese_stop": {
          "type": "stop",
          "stopwords": ["的", "了", "在", "是", "我", "有", "和", "就", "不", "人", "都", "一", "一个", "上", "也", "很", "到", "说", "要", "去", "你", "会", "着", "没有", "看", "好", "自己", "这"]
        },
        "english_stop": {
          "type": "stop",
          "stopwords": "_english_"
        },
        "synonym_filter": {
          "type": "synonym",
          "synonyms": [
            "elasticsearch,es,elastic search",
            "database,db,数据库",
            "optimization,optimisation,优化",
            "performance,性能"
          ]
        },
        "custom_stemmer": {
          "type": "stemmer",
          "language": "english"
        }
      },
      "analyzer": {
        "chinese_analyzer": {
          "type": "custom",
          "char_filter": ["html_strip_filter"],
          "tokenizer": "ik_max_word",
          "filter": [
            "lowercase",
            "chinese_stop",
            "synonym_filter"
          ]
        },
        "english_analyzer": {
          "type": "custom",
          "char_filter": ["html_strip_filter", "mapping_filter"],
          "tokenizer": "standard",
          "filter": [
            "lowercase",
            "english_stop",
            "synonym_filter",
            "custom_stemmer"
          ]
        },
        "search_analyzer": {
          "type": "custom",
          "tokenizer": "ik_smart",
          "filter": [
            "lowercase",
            "synonym_filter"
          ]
        }
      }
    }
  }
}

搜索相关性调优脚本

#!/usr/bin/env python3
# scripts/elasticsearch_relevance_tuning.py

import json
import requests
import argparse
from datetime import datetime
import math

class ElasticsearchRelevanceTuner:
    def __init__(self, es_host, username=None, password=None):
        self.es_host = es_host.rstrip('/')
        self.session = requests.Session()
        
        if username and password:
            self.session.auth = (username, password)
        
        self.session.headers.update({
            'Content-Type': 'application/json'
        })
    
    def analyze_search_results(self, index_name, query, expected_results=None):
        """分析搜索结果相关性"""
        search_body = {
            "query": query,
            "size": 20,
            "explain": True
        }
        
        try:
            response = self.session.post(
                f"{self.es_host}/{index_name}/_search",
                json=search_body
            )
            response.raise_for_status()
            results = response.json()
            
            analysis = {
                'total_hits': results.get('hits', {}).get('total', {}).get('value', 0),
                'max_score': results.get('hits', {}).get('max_score', 0),
                'results': [],
                'score_distribution': {},
                'relevance_metrics': {}
            }
            
            scores = []
            for hit in results.get('hits', {}).get('hits', []):
                score = hit.get('_score', 0)
                scores.append(score)
                
                result_info = {
                    'id': hit.get('_id'),
                    'score': score,
                    'source': hit.get('_source', {}),
                    'explanation': hit.get('_explanation', {})
                }
                analysis['results'].append(result_info)
            
            # 计算分数分布
            if scores:
                analysis['score_distribution'] = {
                    'min': min(scores),
                    'max': max(scores),
                    'avg': sum(scores) / len(scores),
                    'std_dev': math.sqrt(sum((x - sum(scores) / len(scores)) ** 2 for x in scores) / len(scores))
                }
            
            # 计算相关性指标
            if expected_results:
                analysis['relevance_metrics'] = self._calculate_relevance_metrics(
                    [r['id'] for r in analysis['results'][:10]], 
                    expected_results
                )
            
            return analysis
            
        except Exception as e:
            print(f"Error analyzing search results: {e}")
            return {}
    
    def _calculate_relevance_metrics(self, actual_results, expected_results):
        """计算相关性指标(精确率、召回率、NDCG等)"""
        # 计算精确率@K
        def precision_at_k(actual, expected, k):
            actual_k = actual[:k]
            relevant_retrieved = len(set(actual_k) & set(expected))
            return relevant_retrieved / k if k > 0 else 0
        
        # 计算召回率@K
        def recall_at_k(actual, expected, k):
            actual_k = actual[:k]
            relevant_retrieved = len(set(actual_k) & set(expected))
            return relevant_retrieved / len(expected) if len(expected) > 0 else 0
        
        # 计算NDCG@K
        def ndcg_at_k(actual, expected, k):
            actual_k = actual[:k]
            dcg = 0
            for i, doc_id in enumerate(actual_k):
                if doc_id in expected:
                    relevance = 1  # 简化的相关性评分
                    dcg += relevance / math.log2(i + 2)
            
            # 理想DCG
            idcg = sum(1 / math.log2(i + 2) for i in range(min(k, len(expected))))
            
            return dcg / idcg if idcg > 0 else 0
        
        metrics = {}
        for k in [1, 3, 5, 10]:
            metrics[f'precision@{k}'] = precision_at_k(actual_results, expected_results, k)
            metrics[f'recall@{k}'] = recall_at_k(actual_results, expected_results, k)
            metrics[f'ndcg@{k}'] = ndcg_at_k(actual_results, expected_results, k)
        
        return metrics
    
    def optimize_query_weights(self, index_name, test_queries):
        """优化查询权重"""
        optimization_results = []
        
        for test_case in test_queries:
            query_text = test_case['query']
            expected_results = test_case.get('expected_results', [])
            
            print(f"优化查询: {query_text}")
            
            # 测试不同的字段权重组合
            weight_combinations = [
                {"title": 3, "content": 1, "tags": 2},
                {"title": 5, "content": 1, "tags": 3},
                {"title": 2, "content": 1, "tags": 1},
                {"title": 4, "content": 2, "tags": 2}
            ]
            
            best_combination = None
            best_score = 0
            
            for weights in weight_combinations:
                query = {
                    "multi_match": {
                        "query": query_text,
                        "fields": [f"{field}^{weight}" for field, weight in weights.items()],
                        "type": "best_fields"
                    }
                }
                
                analysis = self.analyze_search_results(index_name, query, expected_results)
                
                if analysis and 'relevance_metrics' in analysis:
                    # 使用NDCG@10作为主要评估指标
                    ndcg_score = analysis['relevance_metrics'].get('ndcg@10', 0)
                    
                    if ndcg_score > best_score:
                        best_score = ndcg_score
                        best_combination = weights
            
            optimization_results.append({
                'query': query_text,
                'best_weights': best_combination,
                'best_ndcg_score': best_score
            })
        
        return optimization_results
    
    def create_custom_scoring_query(self, base_query, boost_factors):
        """创建自定义评分查询"""
        function_score_query = {
            "function_score": {
                "query": base_query,
                "functions": [],
                "score_mode": "multiply",
                "boost_mode": "multiply"
            }
        }
        
        # 添加时间衰减函数
        if 'time_decay' in boost_factors:
            function_score_query["function_score"]["functions"].append({
                "gauss": {
                    "publish_date": {
                        "origin": "now",
                        "scale": boost_factors['time_decay'].get('scale', '30d'),
                        "decay": boost_factors['time_decay'].get('decay', 0.5)
                    }
                }
            })
        
        # 添加字段值因子
        if 'field_factors' in boost_factors:
            for field, config in boost_factors['field_factors'].items():
                function_score_query["function_score"]["functions"].append({
                    "field_value_factor": {
                        "field": field,
                        "factor": config.get('factor', 1.0),
                        "modifier": config.get('modifier', 'none'),
                        "missing": config.get('missing', 1)
                    }
                })
        
        # 添加脚本评分
        if 'script_score' in boost_factors:
            function_score_query["function_score"]["functions"].append({
                "script_score": {
                    "script": {
                        "source": boost_factors['script_score']
                    }
                }
            })
        
        return function_score_query
    
    def benchmark_queries(self, index_name, queries, iterations=10):
        """基准测试查询性能"""
        benchmark_results = []
        
        for query_name, query_body in queries.items():
            print(f"基准测试查询: {query_name}")
            
            times = []
            for i in range(iterations):
                start_time = datetime.now()
                
                try:
                    response = self.session.post(
                        f"{self.es_host}/{index_name}/_search",
                        json=query_body
                    )
                    response.raise_for_status()
                    
                    end_time = datetime.now()
                    duration_ms = (end_time - start_time).total_seconds() * 1000
                    times.append(duration_ms)
                    
                except Exception as e:
                    print(f"查询执行失败: {e}")
                    continue
            
            if times:
                benchmark_results.append({
                    'query_name': query_name,
                    'avg_time_ms': sum(times) / len(times),
                    'min_time_ms': min(times),
                    'max_time_ms': max(times),
                    'std_dev_ms': math.sqrt(sum((x - sum(times) / len(times)) ** 2 for x in times) / len(times)),
                    'iterations': len(times)
                })
        
        return benchmark_results
    
    def generate_optimization_recommendations(self, index_name):
        """生成优化建议"""
        recommendations = []
        
        try:
            # 获取索引设置和映射
            response = self.session.get(f"{self.es_host}/{index_name}")
            response.raise_for_status()
            index_info = response.json()
            
            index_settings = index_info[index_name]['settings']['index']
            index_mappings = index_info[index_name]['mappings']
            
            # 分析分片配置
            num_shards = int(index_settings.get('number_of_shards', 1))
            num_replicas = int(index_settings.get('number_of_replicas', 1))
            
            # 获取索引统计信息
            stats_response = self.session.get(f"{self.es_host}/{index_name}/_stats")
            stats_response.raise_for_status()
            index_stats = stats_response.json()
            
            total_docs = index_stats['indices'][index_name]['total']['docs']['count']
            index_size_bytes = index_stats['indices'][index_name]['total']['store']['size_in_bytes']
            
            # 分片大小建议
            avg_shard_size_gb = (index_size_bytes / num_shards) / (1024**3)
            if avg_shard_size_gb > 50:
                recommendations.append({
                    'type': 'shard_size',
                    'priority': 'high',
                    'message': f"分片过大 ({avg_shard_size_gb:.1f}GB),建议增加分片数量"
                })
            elif avg_shard_size_gb < 1:
                recommendations.append({
                    'type': 'shard_size',
                    'priority': 'medium',
                    'message': f"分片过小 ({avg_shard_size_gb:.1f}GB),建议减少分片数量"
                })
            
            # 刷新间隔建议
            refresh_interval = index_settings.get('refresh_interval', '1s')
            if refresh_interval == '1s' and total_docs > 1000000:
                recommendations.append({
                    'type': 'refresh_interval',
                    'priority': 'medium',
                    'message': "大索引建议增加refresh_interval以提高索引性能"
                })
            
            # 映射分析
            properties = index_mappings.get('properties', {})
            
            # 检查未使用的字段
            for field_name, field_config in properties.items():
                if field_config.get('index') is False and field_config.get('type') not in ['geo_point', 'geo_shape']:
                    recommendations.append({
                        'type': 'unused_field',
                        'priority': 'low',
                        'message': f"字段 {field_name} 未被索引,如果不需要搜索可以考虑移除"
                    })
            
            # 检查text字段的分析器配置
            for field_name, field_config in properties.items():
                if field_config.get('type') == 'text':
                    if 'analyzer' not in field_config:
                        recommendations.append({
                            'type': 'analyzer_missing',
                            'priority': 'medium',
                            'message': f"text字段 {field_name} 未指定分析器,建议配置合适的分析器"
                        })
            
            return recommendations
            
        except Exception as e:
            print(f"Error generating recommendations: {e}")
            return []
    
    def print_optimization_report(self, index_name, benchmark_results, recommendations):
        """打印优化报告"""
        print("\n" + "="*80)
        print("Elasticsearch搜索相关性优化报告")
        print("="*80)
        print(f"索引: {index_name}")
        print(f"生成时间: {datetime.now().isoformat()}")
        
        # 基准测试结果
        if benchmark_results:
            print(f"\n查询性能基准测试:")
            print("-" * 40)
            for result in benchmark_results:
                print(f"查询: {result['query_name']}")
                print(f"  平均耗时: {result['avg_time_ms']:.2f}ms")
                print(f"  最小耗时: {result['min_time_ms']:.2f}ms")
                print(f"  最大耗时: {result['max_time_ms']:.2f}ms")
                print(f"  标准差: {result['std_dev_ms']:.2f}ms")
                print(f"  测试次数: {result['iterations']}")
                print()
        
        # 优化建议
        if recommendations:
            print(f"优化建议:")
            print("-" * 40)
            
            high_priority = [r for r in recommendations if r['priority'] == 'high']
            medium_priority = [r for r in recommendations if r['priority'] == 'medium']
            low_priority = [r for r in recommendations if r['priority'] == 'low']
            
            if high_priority:
                print("高优先级:")
                for rec in high_priority:
                    print(f"  🔴 {rec['message']}")
            
            if medium_priority:
                print("中优先级:")
                for rec in medium_priority:
                    print(f"  🟡 {rec['message']}")
            
            if low_priority:
                print("低优先级:")
                for rec in low_priority:
                    print(f"  🟢 {rec['message']}")

def main():
    parser = argparse.ArgumentParser(description='Elasticsearch搜索相关性调优工具')
    parser.add_argument('--host', required=True, help='Elasticsearch主机地址')
    parser.add_argument('--username', help='用户名')
    parser.add_argument('--password', help='密码')
    parser.add_argument('--index', required=True, help='要分析的索引名称')
    parser.add_argument('--queries-file', help='包含测试查询的JSON文件')
    parser.add_argument('--benchmark', action='store_true', help='执行查询基准测试')
    parser.add_argument('--recommendations', action='store_true', help='生成优化建议')
    
    args = parser.parse_args()
    
    tuner = ElasticsearchRelevanceTuner(args.host, args.username, args.password)
    
    try:
        benchmark_results = []
        recommendations = []
        
        if args.benchmark and args.queries_file:
            # 加载测试查询
            with open(args.queries_file, 'r', encoding='utf-8') as f:
                test_queries = json.load(f)
            
            benchmark_results = tuner.benchmark_queries(args.index, test_queries)
        
        if args.recommendations:
            recommendations = tuner.generate_optimization_recommendations(args.index)
        
        # 生成报告
        tuner.print_optimization_report(args.index, benchmark_results, recommendations)
        
    except Exception as e:
        print(f"执行过程中发生错误: {e}")

if __name__ == "__main__":
    main()

性能测试与基准测试

负载测试脚本

#!/bin/bash
# scripts/elasticsearch_load_test.sh

set -euo pipefail

# 配置参数
ES_HOST="${ES_HOST:-http://localhost:9200}"
INDEX_NAME="${INDEX_NAME:-test-index}"
CONCURRENT_USERS="${CONCURRENT_USERS:-10}"
TEST_DURATION="${TEST_DURATION:-300}"
RAMP_UP_TIME="${RAMP_UP_TIME:-60}"

# 日志函数
log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" >&2
}

# 创建测试数据
create_test_data() {
    local num_docs="${1:-10000}"
    
    log "创建测试数据,文档数量: $num_docs"
    
    # 创建索引映射
    curl -X PUT "$ES_HOST/$INDEX_NAME" -H 'Content-Type: application/json' -d '{
        "settings": {
            "number_of_shards": 3,
            "number_of_replicas": 1,
            "refresh_interval": "30s"
        },
        "mappings": {
            "properties": {
                "id": {"type": "keyword"},
                "title": {
                    "type": "text",
                    "analyzer": "standard",
                    "fields": {
                        "keyword": {"type": "keyword", "ignore_above": 256}
                    }
                },
                "content": {"type": "text", "analyzer": "standard"},
                "category": {"type": "keyword"},
                "tags": {"type": "keyword"},
                "publish_date": {"type": "date"},
                "view_count": {"type": "integer"},
                "rating": {"type": "float"}
            }
        }
    }'
    
    # 生成批量插入数据
    python3 << EOF
import json
import random
from datetime import datetime, timedelta

categories = ["technology", "science", "business", "sports", "entertainment"]
tags_pool = ["elasticsearch", "database", "search", "performance", "optimization", "big data", "analytics"]

bulk_data = []
for i in range($num_docs):
    doc_id = f"doc_{i:06d}"
    
    # 创建索引操作
    bulk_data.append(json.dumps({"index": {"_id": doc_id}}))
    
    # 创建文档数据
    doc = {
        "id": doc_id,
        "title": f"Test Document {i} - Sample Title with Keywords",
        "content": f"This is test content for document {i}. It contains various keywords for testing search functionality. Lorem ipsum dolor sit amet, consectetur adipiscing elit.",
        "category": random.choice(categories),
        "tags": random.sample(tags_pool, random.randint(1, 3)),
        "publish_date": (datetime.now() - timedelta(days=random.randint(0, 365))).isoformat(),
        "view_count": random.randint(0, 10000),
        "rating": round(random.uniform(1.0, 5.0), 1)
    }
    bulk_data.append(json.dumps(doc))

# 写入文件
with open('/tmp/bulk_data.json', 'w') as f:
    f.write('\n'.join(bulk_data) + '\n')
EOF
    
    # 批量插入数据
    curl -X POST "$ES_HOST/$INDEX_NAME/_bulk" \
        -H 'Content-Type: application/x-ndjson' \
        --data-binary @/tmp/bulk_data.json
    
    # 刷新索引
    curl -X POST "$ES_HOST/$INDEX_NAME/_refresh"
    
    log "测试数据创建完成"
}

# 执行搜索负载测试
run_search_load_test() {
    log "开始搜索负载测试"
    
    # 创建JMeter测试计划
    cat > /tmp/elasticsearch_test_plan.jmx << 'EOF'
<?xml version="1.0" encoding="UTF-8"?>
<jmeterTestPlan version="1.2" properties="5.0" jmeter="5.4.1">
  <hashTree>
    <TestPlan guiclass="TestPlanGui" testclass="TestPlan" testname="Elasticsearch Load Test" enabled="true">
      <stringProp name="TestPlan.comments"></stringProp>
      <boolProp name="TestPlan.functional_mode">false</boolProp>
      <boolProp name="TestPlan.tearDown_on_shutdown">true</boolProp>
      <boolProp name="TestPlan.serialize_threadgroups">false</boolProp>
      <elementProp name="TestPlan.arguments" elementType="Arguments" guiclass="ArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
        <collectionProp name="Arguments.arguments"/>
      </elementProp>
      <stringProp name="TestPlan.user_define_classpath"></stringProp>
    </TestPlan>
    <hashTree>
      <ThreadGroup guiclass="ThreadGroupGui" testclass="ThreadGroup" testname="Search Thread Group" enabled="true">
        <stringProp name="ThreadGroup.on_sample_error">continue</stringProp>
        <elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControllerGui" testclass="LoopController" testname="Loop Controller" enabled="true">
          <boolProp name="LoopController.continue_forever">false</boolProp>
          <intProp name="LoopController.loops">-1</intProp>
        </elementProp>
        <stringProp name="ThreadGroup.num_threads">${CONCURRENT_USERS}</stringProp>
        <stringProp name="ThreadGroup.ramp_time">${RAMP_UP_TIME}</stringProp>
        <boolProp name="ThreadGroup.scheduler">true</boolProp>
        <stringProp name="ThreadGroup.duration">${TEST_DURATION}</stringProp>
        <stringProp name="ThreadGroup.delay"></stringProp>
      </ThreadGroup>
      <hashTree>
        <HTTPSamplerProxy guiclass="HttpTestSampleGui" testclass="HTTPSamplerProxy" testname="Search Request" enabled="true">
          <elementProp name="HTTPsampler.Arguments" elementType="Arguments" guiclass="HTTPArgumentsPanel" testclass="Arguments" testname="User Defined Variables" enabled="true">
            <collectionProp name="Arguments.arguments">
              <elementProp name="" elementType="HTTPArgument">
                <boolProp name="HTTPArgument.always_encode">false</boolProp>
                <stringProp name="Argument.value">{
  "query": {
    "bool": {
      "should": [
        {"match": {"title": "${__RandomString(5,abcdefghijklmnopqrstuvwxyz)}"}},
        {"match": {"content": "${__RandomString(8,abcdefghijklmnopqrstuvwxyz)}"}},
        {"term": {"category": "${__RandomFromMultipleVars(technology|science|business|sports|entertainment)}"}}
      ]
    }
  },
  "size": 10,
  "sort": [{"_score": {"order": "desc"}}]
}</stringProp>
                <stringProp name="Argument.metadata">=</stringProp>
                <boolProp name="HTTPArgument.use_equals">true</boolProp>
                <stringProp name="Argument.name"></stringProp>
              </elementProp>
            </collectionProp>
          </elementProp>
          <stringProp name="HTTPSampler.domain">localhost</stringProp>
          <stringProp name="HTTPSampler.port">9200</stringProp>
          <stringProp name="HTTPSampler.protocol">http</stringProp>
          <stringProp name="HTTPSampler.contentEncoding"></stringProp>
          <stringProp name="HTTPSampler.path">/${INDEX_NAME}/_search</stringProp>
          <stringProp name="HTTPSampler.method">POST</stringProp>
          <boolProp name="HTTPSampler.follow_redirects">true</boolProp>
          <boolProp name="HTTPSampler.auto_redirects">false</boolProp>
          <boolProp name="HTTPSampler.use_keepalive">true</boolProp>
          <boolProp name="HTTPSampler.DO_MULTIPART_POST">false</boolProp>
          <stringProp name="HTTPSampler.embedded_url_re"></stringProp>
          <stringProp name="HTTPSampler.connect_timeout"></stringProp>
          <stringProp name="HTTPSampler.response_timeout"></stringProp>
        </HTTPSamplerProxy>
        <hashTree>
          <HeaderManager guiclass="HeaderPanel" testclass="HeaderManager" testname="HTTP Header Manager" enabled="true">
            <collectionProp name="HeaderManager.headers">
              <elementProp name="" elementType="Header">
                <stringProp name="Header.name">Content-Type</stringProp>
                <stringProp name="Header.value">application/json</stringProp>
              </elementProp>
            </collectionProp>
          </HeaderManager>
          <hashTree/>
        </hashTree>
      </hashTree>
    </hashTree>
  </hashTree>
</jmeterTestPlan>
EOF
    
    # 使用Apache Bench进行简单负载测试
    log "使用Apache Bench执行负载测试"
    
    # 创建搜索请求文件
    cat > /tmp/search_request.json << EOF
{
  "query": {
    "multi_match": {
      "query": "test document",
      "fields": ["title^2", "content"]
    }
  },
  "size": 10
}
EOF
    
    # 执行负载测试
    ab -n 1000 -c $CONCURRENT_USERS -T 'application/json' -p /tmp/search_request.json \
        "$ES_HOST/$INDEX_NAME/_search" > /tmp/ab_results.txt
    
    log "负载测试完成,结果保存在 /tmp/ab_results.txt"
}

# 监控集群性能
monitor_cluster_performance() {
    local duration="${1:-300}"
    local interval="${2:-10}"
    
    log "开始监控集群性能,持续时间: ${duration}秒"
    
    local end_time=$(($(date +%s) + duration))
    local output_file="/tmp/cluster_performance_$(date +%Y%m%d_%H%M%S).csv"
    
    # 写入CSV头部
    echo "timestamp,heap_used_percent,query_total,query_time_avg_ms,index_total,index_time_avg_ms,search_current,index_current" > "$output_file"
    
    while [ $(date +%s) -lt $end_time ]; do
        local timestamp=$(date '+%Y-%m-%d %H:%M:%S')
        
        # 获取节点统计信息
        local stats=$(curl -s "$ES_HOST/_nodes/stats/jvm,indices")
        
        # 解析统计信息
        local heap_used_percent=$(echo "$stats" | jq -r '.nodes | to_entries[0].value.jvm.mem.heap_used_percent')
        local query_total=$(echo "$stats" | jq -r '.nodes | to_entries[0].value.indices.search.query_total')
        local query_time_ms=$(echo "$stats" | jq -r '.nodes | to_entries[0].value.indices.search.query_time_in_millis')
        local index_total=$(echo "$stats" | jq -r '.nodes | to_entries[0].value.indices.indexing.index_total')
        local index_time_ms=$(echo "$stats" | jq -r '.nodes | to_entries[0].value.indices.indexing.index_time_in_millis')
        local search_current=$(echo "$stats" | jq -r '.nodes | to_entries[0].value.indices.search.query_current')
        local index_current=$(echo "$stats" | jq -r '.nodes | to_entries[0].value.indices.indexing.index_current')
        
        # 计算平均时间
        local query_time_avg=0
        local index_time_avg=0
        
        if [ "$query_total" != "null" ] && [ "$query_total" -gt 0 ]; then
            query_time_avg=$(echo "scale=2; $query_time_ms / $query_total" | bc)
        fi
        
        if [ "$index_total" != "null" ] && [ "$index_total" -gt 0 ]; then
            index_time_avg=$(echo "scale=2; $index_time_ms / $index_total" | bc)
        fi
        
        # 写入CSV
        echo "$timestamp,$heap_used_percent,$query_total,$query_time_avg,$index_total,$index_time_avg,$search_current,$index_current" >> "$output_file"
        
        # 打印当前状态
        echo "[$timestamp] 堆内存: ${heap_used_percent}%, 查询: ${search_current}, 索引: ${index_current}"
        
        sleep $interval
    done
    
    log "性能监控完成,数据保存在: $output_file"
}

# 生成性能报告
generate_performance_report() {
    log "生成性能测试报告"
    
    local report_file="/tmp/elasticsearch_performance_report_$(date +%Y%m%d_%H%M%S).html"
    
    cat > "$report_file" << 'EOF'
<!DOCTYPE html>
<html>
<head>
    <title>Elasticsearch性能测试报告</title>
    <meta charset="UTF-8">
    <style>
        body { font-family: Arial, sans-serif; margin: 20px; }
        .header { background-color: #f0f0f0; padding: 20px; border-radius: 5px; }
        .section { margin: 20px 0; }
        .metric { display: inline-block; margin: 10px; padding: 15px; background-color: #e8f4f8; border-radius: 5px; }
        .chart { width: 100%; height: 400px; border: 1px solid #ccc; margin: 10px 0; }
        table { border-collapse: collapse; width: 100%; }
        th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
        th { background-color: #f2f2f2; }
    </style>
    <script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
</head>
<body>
    <div class="header">
        <h1>Elasticsearch性能测试报告</h1>
        <p>生成时间: $(date)</p>
        <p>测试索引: $INDEX_NAME</p>
        <p>并发用户: $CONCURRENT_USERS</p>
        <p>测试持续时间: $TEST_DURATION 秒</p>
    </div>
    
    <div class="section">
        <h2>负载测试结果</h2>
        <div id="load-test-results">
            <!-- 这里会插入负载测试结果 -->
        </div>
    </div>
    
    <div class="section">
        <h2>集群性能指标</h2>
        <canvas id="performanceChart" class="chart"></canvas>
    </div>
    
    <div class="section">
        <h2>优化建议</h2>
        <ul>
            <li>根据查询模式优化索引映射</li>
            <li>调整JVM堆内存大小</li>
            <li>优化分片和副本配置</li>
            <li>使用适当的刷新间隔</li>
            <li>启用查询缓存和请求缓存</li>
        </ul>
    </div>
    
    <script>
        // 这里可以添加图表渲染代码
        const ctx = document.getElementById('performanceChart').getContext('2d');
        const chart = new Chart(ctx, {
            type: 'line',
            data: {
                labels: ['时间1', '时间2', '时间3', '时间4', '时间5'],
                datasets: [{
                    label: '查询响应时间(ms)',
                    data: [12, 19, 3, 5, 2],
                    borderColor: 'rgb(75, 192, 192)',
                    tension: 0.1
                }]
            },
            options: {
                responsive: true,
                scales: {
                    y: {
                        beginAtZero: true
                    }
                }
            }
        });
    </script>
</body>
</html>
EOF
    
    log "性能报告已生成: $report_file"
}

# 清理测试数据
cleanup_test_data() {
    log "清理测试数据"
    
    curl -X DELETE "$ES_HOST/$INDEX_NAME"
    rm -f /tmp/bulk_data.json /tmp/search_request.json /tmp/ab_results.txt
    
    log "测试数据清理完成"
}

# 主函数
main() {
    local action="${1:-all}"
    
    case "$action" in
        "create-data")
            create_test_data "${2:-10000}"
            ;;
        "load-test")
            run_search_load_test
            ;;
        "monitor")
            monitor_cluster_performance "${2:-300}" "${3:-10}"
            ;;
        "report")
            generate_performance_report
            ;;
        "cleanup")
            cleanup_test_data
            ;;
        "all")
            create_test_data 10000
            run_search_load_test &
            LOAD_TEST_PID=$!
            
            monitor_cluster_performance 300 10 &
            MONITOR_PID=$!
            
            wait $LOAD_TEST_PID
            wait $MONITOR_PID
            
            generate_performance_report
            ;;
        *)
            echo "用法: $0 {create-data|load-test|monitor|report|cleanup|all}"
            echo "  create-data [数量] - 创建测试数据"
            echo "  load-test          - 执行负载测试"
            echo "  monitor [时长] [间隔] - 监控集群性能"
            echo "  report             - 生成性能报告"
            echo "  cleanup            - 清理测试数据"
            echo "  all                - 执行完整测试流程"
            exit 1
            ;;
    esac
}

# 检查依赖
check_dependencies() {
    local missing_deps=()
    
    if ! command -v curl &> /dev/null; then
        missing_deps+=("curl")
    fi
    
    if ! command -v jq &> /dev/null; then
        missing_deps+=("jq")
    fi
    
    if ! command -v bc &> /dev/null; then
        missing_deps+=("bc")
    fi
    
    if [ ${#missing_deps[@]} -gt 0 ]; then
        echo "缺少依赖: ${missing_deps[*]}"
        echo "请安装缺少的依赖后重试"
        exit 1
    fi
}

# 检查依赖并执行主函数
check_dependencies
main "$@"

总结

本文深入探讨了Elasticsearch搜索引擎的全面优化策略,涵盖了以下关键领域:

核心优化要点

  1. 索引设计优化

    • 合理的映射配置和字段类型选择
    • 自定义分析器和分词器配置
    • 索引模板和生命周期管理
  2. 查询性能调优

    • DSL查询优化技巧
    • 过滤器vs查询的合理使用
    • 自定义评分和相关性调优
  3. 集群配置优化

    • JVM参数调优
    • 系统级优化配置
    • 缓存策略优化
  4. 监控和诊断

    • 性能监控脚本
    • 慢查询分析
    • 集群健康状态监控

最佳实践建议

  • 分片策略: 根据数据量和查询模式合理设置分片数量
  • 内存管理: 优化JVM堆内存和系统内存配置
  • 查询优化: 使用过滤器、缓存和合适的查询类型
  • 监控告警: 建立完善的监控体系和告警机制

通过系统性的优化实践,可以显著提升Elasticsearch集群的性能、稳定性和可扩展性,为企业级搜索应用提供强有力的技术支撑。

分享文章