EFK Log Alerting System
Note: newer versions of the Elastic Stack already support Kafka as both an output target and an input source.
For unstructured logs: *.log -> filebeat -> kafka -> logstash -> elasticsearch
For structured logs: *.log -> filebeat -> kafka -> filebeat -> elasticsearch
The difference is whether Logstash is needed to filter and structure the logs.
Structured Logs
Example configuration files for each component in the chain
filebeat 1
This Filebeat instance is the log collection side, i.e. the Kafka producer; multiple instances can run in parallel to collect logs.
filebeat.inputs:
  - type: filestream            # filestream is the standard file input in newer versions
    id: my-filestream-id        # every filestream input needs a unique ID; omitting or changing it can cause duplicate data, because filestream cannot track file state correctly without it
    paths:
      - /var/log/messages
      - /var/log/*.log
  - type: filestream
    id: apache-filestream-id
    paths:
      - "/var/log/apache2/*"
    fields:
      apache: true              # custom fields are stored in ES and can be queried in Kibana, making it easy to identify logs coming from the Apache service
# Output to Kafka; this output works with Kafka 0.8.2.0 and newer
output.kafka:
  # Brokers used to read cluster metadata
  hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
  # Topic selection + partitioning
  topic: '%{[fields.log_topic]}'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000    # events larger than max_message_bytes are dropped
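Note: topic: '%{[fields.log_topic]}' only resolves if each input actually sets a log_topic value under fields; the inputs above only set apache: true. A minimal sketch, assuming a hypothetical topic name apache-log added to the Apache input:
    fields:
      apache: true
      log_topic: apache-log   # assumed topic name; %{[fields.log_topic]} in output.kafka resolves to this value
Alternatively, set a fixed topic string in output.kafka if all inputs should go to the same topic.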
kafka+kafka-ui
Kafka is deployed with Docker; image version: apache/kafka:3.7.0
docker-compose.yml
networks:
  kafka:
    name: kafka
    driver: bridge
services:
  kafka:
    image: apache/kafka:3.7.0
    restart: unless-stopped
    user: root
    volumes:
      - ./config:/mnt/shared/config   # per the Kafka image docs, the configuration files must be mounted at this path for changes to take effect
      - ./data:/var/lib/kafka/data
      - ./logs:/opt/kafka/logs
      - ./kraft-combined-logs:/var/log/kafka/kraft-combined-logs
    ports:
      - "9092:9092"
      - "9093:9093"
      - "9094:9094"
    container_name: kafka
    networks:
      - kafka
  kafka-ui:
    container_name: kafka-ui
    restart: unless-stopped
    image: provectuslabs/kafka-ui:latest
    ports:
      - "8001:8080"
    environment:
      DYNAMIC_CONFIG_ENABLED: "true"
    volumes:
      - ./kafka-ui/dynamic_config.yaml:/etc/kafkaui/dynamic_config.yaml
    deploy:
      resources:
        limits:
          memory: 1G
    networks:
      - kafka
Only the original and the modified parts of the sample configuration file are listed below.
server.properties
#listeners=PLAINTEXT://:9092,CONTROLLER://:9093
listeners=INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:9094,CONTROLLER://:9093
# Name of listener used for communication between brokers.
#inter.broker.listener.name=PLAINTEXT,INSIDE,OUTSIDE
inter.broker.listener.name=INSIDE
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=INSIDE://moniti.xxxxxx.cn:9092,OUTSIDE://monito.xxxxxx.cn:9094
# A comma-separated list of the names of the listeners used by the controller.
# If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
listener.security.protocol.map=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT,CONTROLLER:PLAINTEXT
# A comma separated list of directories under which to store log files
log.dirs=/var/log/kafka/kraft-combined-logs
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
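With this listener layout, clients on the Docker network (such as kafka-ui) bootstrap through the INSIDE listener on 9092, while clients outside the host use the OUTSIDE listener advertised on port 9094. A sketch of the matching client settings, reusing the hostnames from this example (the external hostname must resolve to the broker):
# kafka-ui on the same Docker network (see dynamic_config.yaml below)
bootstrapServers: kafka:9092
# Filebeat producer on another machine (output.kafka)
hosts: ["monito.xxxxxx.cn:9094"]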
dynamic_config.yaml
— this file is generated automatically by kafka-ui
auth:
  type: DISABLED   # LDAP can be plugged in here
kafka:
  clusters:
    - bootstrapServers: kafka:9092
      name: kafka-log
      properties: {}
      readOnly: false
rbac:
  roles: []
webclient: {}
filebeat 2
This Filebeat instance is the consumer: it consumes log messages from Kafka, runs them through processors, and writes them into the corresponding Elasticsearch index.
filebeat.docker.yml
filebeat.inputs:
  - type: kafka
    hosts:                       # list of Kafka bootstrap hosts (brokers)
      - kafka-broker-1:9092
      - kafka-broker-2:9092
    topics: ["my-topic"]
    group_id: "filebeat"         # consumer group ID
    fetch.min: 500000            # minimum number of bytes to wait for; defaults to 1
processors:
  - decode_json_fields:
      fields: ["message"]
      max_depth: 2
      target: ""
      overwrite_keys: true
      add_error_key: true
# Output to Elasticsearch
output.elasticsearch:
  hosts: ["https://myEShost:9200"]
  username: "filebeat_writer"
  password: "YOUR_PASSWORD"
  indices:
    - index: "%{[fields.log_topic]}-%{[agent.version]}-%{+yyyy.MM.dd}"
      when.contains:
        fields.log_topic: "log_name_example"
    - index: "filebeat-%{[agent.version]}-%{+yyyy.MM.dd}"
# Make sure Filebeat can use the custom index template
setup.ilm.enabled: false
setup.template.enabled: false
setup.template.overwrite: true
setup.template.json.enabled: true
setup.template.name: "filebeat-%{[agent.version]}"
setup.template.pattern: "filebeat-%{[agent.version]}-*"
setup.template.settings:
  index.number_of_shards: 1
  index.number_of_replicas: 0
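With setup.template.json.enabled: true, Filebeat also needs to be told which JSON template file to load and what it is called; otherwise no custom template is installed. A minimal sketch, where the path and template name are placeholders for whatever template you export:
setup.template.json.enabled: true
setup.template.json.path: "/usr/share/filebeat/template.json"   # assumed mount path of the exported template file
setup.template.json.name: "filebeat-custom"                     # assumed name; must match the template name inside the JSON file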
logstash
Filter and process the log data as needed.
# Persist the configuration by copying it out of the container
docker cp logstash:/usr/share/logstash/config ./
docker cp logstash:/usr/share/logstash/pipeline ./
# Configuration file: logstash.yml
http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: [ "http://elasticsearch:9200" ]
xpack.monitoring.elasticsearch.username: "elastic"
xpack.monitoring.elasticsearch.password: "elastic"
# pipelines.yml
# This file is where you define your pipelines. You can define multiple.
# For more information on multiple pipelines, see the documentation:
# https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html
# Define separate, independent pipelines here; otherwise events from different sources get written to the wrong outputs
- pipeline.id: main
  path.config: "/usr/share/logstash/pipeline"
- pipeline.id: java_pipeline
  path.config: "/usr/share/logstash/pipeline/java-nginx-pipeline.conf"
- pipeline.id: abroad_pipeline
  path.config: "/usr/share/logstash/pipeline/java-nginx-pipeline-abroad.conf"
Elasticsearch
elasticsearch.yml
cluster.name: "docker-cluster"
network.host: 0.0.0.0
#discovery.seed_hosts: ["192.168.8.99"]   # must not be set together with discovery.type: single-node
#cluster.initial_master_nodes: ["docker-cluster"]
discovery.type: single-node
# Enable X-Pack security (user authentication, access control, etc.)
xpack.security.enabled: true
#xpack.security.http.ssl.enabled: false
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.keystore.path: /usr/share/elasticsearch/data/certs/elastic-certificates.p12
xpack.security.transport.ssl.truststore.path: /usr/share/elasticsearch/data/certs/elastic-certificates.p12
#xpack.security.enrollment.enabled: true
Kibana
kibana.yml
# Default Kibana configuration for docker target
server.host: "0.0.0.0"
server.shutdownTimeout: "5s"
elasticsearch.hosts: [ "http://elasticsearch:9200" ]
monitoring.ui.container.elasticsearch.enabled: true
elasticsearch.ssl.certificateAuthorities: "/usr/share/kibana/config/certs/elastic-stack-ca.p12"
elasticsearch.username: "kibana"
elasticsearch.password: "elastic"
i18n.locale: "zh-CN"
Elastalert
Configuration files
config.yaml
# This is the folder that contains the rule yaml files
# This can also be a list of directories
# Any .yaml file will be loaded as a rule
rules_folder: /opt/elastalert/rules
# How often ElastAlert will query Elasticsearch
# The unit can be anything from weeks to seconds
run_every:
minutes: 1
# ElastAlert will buffer results from the most recent
# period of time, in case some log sources are not in real time
buffer_time:
minutes: 15
# The Elasticsearch hostname for metadata writeback
# Note that every rule can have its own Elasticsearch host
es_host: 10.1.1.30
# The Elasticsearch port
es_port: 9200
# The AWS region to use. Set this when using AWS-managed elasticsearch
#aws_region: us-east-1
# The AWS profile to use. Use this if you are using an aws-cli profile.
# See http://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html
# for details
#profile: test
# Optional URL prefix for Elasticsearch
#es_url_prefix: elasticsearch
# Optional prefix for statsd metrics
#statsd_instance_tag: elastalert
# Optional statsd host
#statsd_host: dogstatsd
# Connect with TLS to Elasticsearch
#use_ssl: True
# Verify TLS certificates
#verify_certs: True
# Show TLS or certificate related warnings
#ssl_show_warn: True
# GET request with body is the default option for Elasticsearch.
# If it fails for some reason, you can pass 'GET', 'POST' or 'source'.
# See https://elasticsearch-py.readthedocs.io/en/master/connection.html?highlight=send_get_body_as#transport
# for details
#es_send_get_body_as: GET
# Optional basic-auth username and password for Elasticsearch
es_username: elastic
es_password: elastic
# Use SSL authentication with client certificates client_cert must be
# a pem file containing both cert and key for client
#ca_certs: /path/to/cacert.pem
#client_cert: /path/to/client_cert.pem
#client_key: /path/to/client_key.key
# The index on es_host which is used for metadata storage
# This can be an unmapped index, but it is recommended that you run
# elastalert-create-index to set a mapping
writeback_index: elastalert_status
# If an alert fails for some reason, ElastAlert will retry
# sending the alert until this time period has elapsed
alert_time_limit:
days: 2
# Optional timestamp format.
# ElastAlert will use this format when printing timestamps in alert and log messages.
custom_pretty_ts_format: '%Y-%m-%d %H:%M'
# Custom logging configuration
# If you want your own logging setup (log to a file, ship to Logstash, and/or change log levels),
# use the configuration below and adjust it as needed.
# Note: if you run ElastAlert with --verbose/--debug, the log level of the "elastalert" logger is raised to INFO if it is not already INFO/DEBUG.
logging:
  version: 1
  incremental: false
  disable_existing_loggers: false
  formatters:
    logline:
      format: '%(asctime)s %(levelname)+8s %(name)+20s %(message)s'
  handlers:
    console:
      class: logging.StreamHandler
      formatter: logline
      level: DEBUG
      stream: ext://sys.stderr
    file:
      class: logging.FileHandler
      formatter: logline
      level: DEBUG
      filename: elastalert.log
  loggers:
    elastalert:
      level: WARN
      handlers: []
      propagate: true
    elasticsearch:
      level: WARN
      handlers: []
      propagate: true
    elasticsearch.trace:
      level: WARN
      handlers: []
      propagate: true
    '':  # root logger
      level: WARN
      handlers:
        - console
        - file
      propagate: false
nginx-rules.yaml
name: "Nginx-Error"
type: "frequency"
index: "nginx-error-8.13.2*" # 查询日志所在的索引
is_enabled: true
num_events: 3 # 事件出现的次数
timeframe:
minutes: 2 # 1分钟内出现了num_events几次就报警
realert:
minutes: 180 # 5分钟内忽略重复告警
timestamp_field: "@timestamp"
timestamp_type: "iso"
use_strftime_index: true
filter:
- query:
query_string:
query: "message:\"error\" NOT message:\"No such file or directory\" NOT message:\"forbidden by rule\"" # 告警查询语句
alert_text_type: alert_text_only
alert_text: "### Nginx-Error(Prod)日志告警\n- **采集位置:** {0}\n- **文件:** {1}\n- **匹配次数:** {2}\n- **日志id:** {3}\n\n- [点我去查询]()"
# 上述括号中填写查询链接
alert_text_args: # 告警模板中用到的参数
- agent.name
- "log.file.path"
- num_matches
- _id
- _id
alert:
- "dingtalk"
dingtalk_access_token: ""
dingtalk_msgtype: "markdown"
Custom Enhancements development & image build
Tip
The initialization workflow can follow the official documentation.
Example: extracting the request-id from Ruby-format logs
Create the module directory and files
mkdir -p elastalert/elastalert_modules
touch elastalert/elastalert_modules/__init__.py elastalert/elastalert_modules/extract_request_id.py
extract_request_id.py
from elastalert.enhancements import BaseEnhancement
import re

class Extract_Requestid(BaseEnhancement):
    def process(self, match):
        # UUID-style request-id pattern
        pattern = r"\b[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}\b"
        # Make sure the 'message' field exists in the match
        if 'message' in match:
            log_message = match['message']
            match_obj = re.search(pattern, log_message)
            if match_obj:
                match['extract_requestid'] = match_obj.group(0)
            else:
                match['extract_requestid'] = "request id not found"
Docker Compose deployment
Producer side
services:
filebeat:
image: elastic/filebeat:8.13.2
container_name: filebeat
user: root
restart: unless-stopped
volumes:
- ./filebeat.docker.yml:/usr/share/filebeat/filebeat.yml
- /var/lib/docker/containers:/var/lib/docker/containers:ro
- /var/run/docker.sock:/var/run/docker.sock:ro
- ./data:/usr/share/filebeat/data:rw
- /home:/home
- /var/log:/var/log
Consumer side
networks:
logstack:
name: logstack
driver: bridge
services:
filebeat:
image: elastic/filebeat:8.13.2
container_name: filebeat
restart: unless-stopped
ports:
- "5067:5067"
volumes:
- ./filebeat/filebeat.docker.yml:/usr/share/filebeat/filebeat.yml
- /var/lib/docker/containers:/var/lib/docker/containers:ro
- /var/run/docker.sock:/var/run/docker.sock:ro
- ./filebeat/data:/usr/share/filebeat/data:rw
networks:
- logstack
elasticsearch:
image: elastic/elasticsearch:8.13.2
container_name: elasticsearch
restart: unless-stopped
environment:
- "ES_JAVA_OPTS=-Xms6144m -Xmx6144m"
- "cluster.name=elasticsearch"
volumes:
- ./elasticsearch/data:/usr/share/elasticsearch/data
- ./elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
ports:
- "9200:9200"
- "9300:9300"
networks:
- logstack
kibana:
image: elastic/kibana:8.13.2
container_name: kibana
restart: unless-stopped
volumes:
- ./kibana/config/kibana.yml:/usr/share/kibana/config/kibana.yml
- ./kibana/certs:/usr/share/kibana/config/certs
ports:
- "5601:5601"
depends_on:
- elasticsearch
networks:
- logstack
metricbeat:
image: elastic/metricbeat:8.13.2
container_name: metricbeat
restart: unless-stopped
volumes:
- ./metricbeat/metricbeat.docker.yml:/usr/share/metricbeat/metricbeat.yml:ro
- /var/run/docker.sock:/var/run/docker.sock:ro
- /sys/fs/cgroup:/hostfs/sys/fs/cgroup:ro
- /proc:/hostfs/proc:ro
- /:/hostfs:ro
networks:
- logstack
esalert:
image: jertel/elastalert2:2.18.0
container_name: esalert
restart: unless-stopped
user: root
volumes:
- ./esalert/config.yaml:/opt/elastalert/config.yaml
- ./esalert/rules:/opt/elastalert/rules
depends_on:
- elasticsearch
networks:
- logstack
environment:
TZ: "Asia/Shanghai"
logstash:
image: elastic/logstash:8.13.2
container_name: logstash
restart: always
volumes:
- ./logstash/config:/usr/share/logstash/config
- ./logstash/data:/usr/share/logstash/data
- ./logstash/pipeline:/usr/share/logstash/pipeline
- /var/lib/docker/containers:/var/lib/docker/containers:ro
- /var/run/docker.sock:/var/run/docker.sock:ro
networks:
- logstack
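The compose file above mounts ./metricbeat/metricbeat.docker.yml, which is not listed in this article. A minimal sketch of what it could contain, assuming only Docker container metrics are wanted (module and metricset names follow the standard Metricbeat docker module; the credentials are placeholders):
metricbeat.modules:
  - module: docker
    metricsets: ["container", "cpu", "memory", "network"]
    hosts: ["unix:///var/run/docker.sock"]
    period: 10s
output.elasticsearch:
  hosts: ["http://elasticsearch:9200"]
  username: "elastic"
  password: "YOUR_PASSWORD"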
Unstructured Logs (untested)
filebeat 1
The producer-side configuration is identical to the filebeat 1 configuration shown in the structured-log section above, so it is not repeated here.
kafka
# To be determined
logstash
Sample pipeline configuration:
input {
  kafka {
    id => "my-kafka-consumer"                # unique identifier for this Kafka consumer
    topics => ["my-topic"]                   # list of Kafka topics
    group_id => "my-consumer-group"          # Kafka consumer group ID
    bootstrap_servers => "localhost:9092"    # Kafka broker address
    decorate_events => "basic"               # adds topic/partition/offset metadata under [@metadata][kafka]; replaces the invalid topic_id option
    # Other optional settings
    auto_offset_reset => "earliest"          # start from the earliest messages when there is no initial offset
    enable_auto_commit => true               # commit offsets automatically
    max_poll_records => 500                  # maximum number of records fetched per poll
    max_poll_interval_ms => 300000           # maximum interval between polls, in milliseconds
  }
}
filter {
  # Add any filtering or transformation you need here,
  # e.g. parse the log format with the grok plugin
  grok {
    match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{WORD:thread} %{GREEDYDATA:message}" }
    overwrite => ["message"]                 # replace the original message with the captured one instead of creating an array
  }
  date {
    match => ["timestamp", "ISO8601"]        # parse the timestamp into @timestamp
  }
  mutate {
    lowercase => ["level"]                   # lowercase the log level
  }
}
output {
  elasticsearch {
    hosts => ["https://..."]
    cacert => '/etc/logstash/config/certs/ca.crt'
  }
}
Issues
- this action would add [2] shards, but this cluster currently has [1000]/[1000] maximum normal shards open: the Elasticsearch shard limit has been reached. Raising cluster.max_shards_per_node is a quick workaround; cleaning up or consolidating old indices (e.g. via ILM) is the longer-term fix.
PUT /_cluster/settings
{
  "persistent": {
    "cluster": {
      "max_shards_per_node": 10000
    }
  }
}
# Check current settings: GET /_cluster/settings?pretty
🍺 Please credit the source when reposting, thanks! 🍺