1 ETL详解
1.1 ETL
https://www.cnblogs.com/yjd_hycf_space/p/7772722.html
ETL是将业务系统的数据经过抽取、清洗转换之后加载到数据仓库的过程。
ETL的设计分三部分:数据抽取、数据的清洗转换、数据的加载。
1.1.1 ElasticSearch
全文搜索引擎:http://www.ruanyifeng.com/blog/2017/08/elasticsearch.html
1.1.2 Kibana
通过Kibana,能够对Elasticsearch中的数据进行可视化并在Elastic Stack进行操作。
Kibana核心产品搭载了一批经典功能:柱状图、线状图、饼图、旭日图等。
https://www.elastic.co/cn/products/kibana
Kibana是一个针对Elasticsearch的开源分析及可视化平台,用来搜索、查看交互存储在Elasticsearch索引中的数据。使用Kibana,可以通过各种图表进行高级数据分析及展示
1.1.3 Logstash
https://www.elastic.co/cn/products/logstash
Logstash是开源的服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据存储到数据库。
Logstash支持各种输入选择,可以在同一时间从众多常用来源捕捉时间。能够以连续的流式传输方式,轻松地从日志、指标、Web应用、数据存储以及各种AWS服务采集数据。
Logstash 提供众多输出选择,您可以将数据发送到您要指定的地方,并且能够灵活地解锁众多下游用例。
2 ETL启动报错
2.0.1 启动检查没有通过
修改/etc/security/limits.conf
修改/etc/security/limits.d/90-nproc.conf
3 ETL实例
3.1 elasticsearch常用请求
3.1.1 查看索引目录(装有es的机器上执行)
curl "localhost:9200/_cat/indices?v"
3.1.2 创建customer索引
curl -XPUT "localhost:9200/customer?pretty"
3.1.3 给customer索引创建文档
curl -XPUT "localhost:9200/customer/external/1?pretty" -d '{"name":"TEST"}'
3.1.4 在索引上查找文档
curl -XGET "localhost:9200/customer/external/1?pretty"
3.1.5 修改索引下的文档
curl -XPOST "localhost:9200/customer/external/1/_update?pretty" -d ' { "doc": { "name": "Lenovo","Location":"Beijing" }}'
3.2 ELK日志采集
ELK+syslog+nginx访问日志收集+分词处理
http://blog.51cto.com/lrtao2010/1949334
https://www.2cto.com/kf/201610/560348.html
https://blog.csdn.net/qq_22211217/article/details/80764568
3.2.1 控制台输入,控制台输出测试
编辑配置文件
将集群中的logstash停止,然后通过命令启动并指定刚创建的配置文件。
logstash -f test.conf
3.2.2 集群中修改配置文件logstash-data-source
添加配置组
添加配置如下
检测/var/log/ambary-server/ambary-server.log日志文件
查看ElasticSearch索引
curl "node18.sleap.com:9200/_cat/indices?v"
查看kibana展示
http://node16.sleap.com:5601
3.2.3 问题
logstash配置文件如何编写(过滤部分)
http://www.cnblogs.com/yincheng/p/logstash.html
kibana如何使用
https://www.elastic.co/cn/products/kibana
https://www.elastic.co/guide/cn/kibana/current/dashboard.html
ElasticSearch中数据如何查看
https://www.yiibai.com/elasticsearch/elasticsearch-getting-start.html
3.3 ELK与kafka整合
参考连接
https://sematext.com/blog/kafka-connect-elasticsearch-how-to/
https://www.cnblogs.com/smartloli/p/6978645.html
https://www.confluent.io/blog/the-simplest-useful-kafka-connect-data-pipeline-in-the-world-or-thereabouts-part-2/
http://www.cnblogs.com/JetpropelledSnake/p/10057545.html
http://www.demodashi.com/demo/10181.html
https://blog.csdn.net/qq_37502106/article/details/79262721
3.3.1 日志处理流程
使用java把日志传入kafka,然后通过kafka将日志发送给logstash,logstash再将日志写入elasticsearch,这样elasticsearch就有了日志数据了,最后,则使用kibana将存放在elasticsearch中的日志数据显示出来,并且可以做实时的数据图表分析等等。
zookeeper查看leader节点:echo stat | nc node17.sleap.com 2181
3.3.2 kafka常用命令
https://www.cnblogs.com/xtdxs/p/7112683.html
创建topic data1
kafka-topics --create --zookeeper node17.sleap.com:2181 --replication-factor 2 --partitions 1 --topic data2
查看所有topic
kafka-topics --list --zookeeper node17.sleap.com:2181
创建消费者消费topic data1
kafka-console-consumer --zookeeper node17.sleap.com:2181 --topic data2 --from-beginning
创建生产者
kafka-console-producer --broker-list node17.sleap.com:6667 --topic data2
3.3.3 查看ElasticSearch索引
curl "node18.sleap.com:9200/_cat/indices?v"
curl –XGET "node18.sleap.com:9200/kafka-logstash/_search"
3.3.4 kafka配置参数
https://www.elastic.co/guide/en/logstash/current/plugins-outputs-kafka.html
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html
https://segmentfault.com/a/1190000016595992
logstash高低版本配置不同:(具体配置可查看对应版本的官网配置介绍)
https://www.elastic.co/guide/en/logstash/index.html
logstash 5
bootstrap_servers => “node17.sleap.com:6667”
topics => [“data1”]
logstash 2.4
zk_connect => “node17.sleap.com:2181”
topic_id => “data1”
控制台输入,控制台输出stdin-stdout.conf
input {
stdin {}
}
output {
stdout {
codec => rubydebug
}
}
kafka输入数据到logstash
input-kafka.conf
input {
kafka {
bootstrap_servers => "10.110.181.39:6667"
topics => ["data1"]
type => "kafka.logstash"
}
}
output {
if [type] == "kafka.logstash" {
stdout {
codec => rubydebug
}
}
}
logstash输出数据到kafka
input {
stdin {}
}
output {
kafka {
bootstrap_servers => "10.110.181.39:9092"
topic_id => "data1"
}
}
采集ambari-server.log到elasticsearch
# test ambari log
ambari-log.conf
input {
file {
path => "/var/log/ambari-server/ambari-server.log"
start_position => beginning
type => "ambari.log"
}
}
filter {
}
output {
if [type] == "ambari.log" {
elasticsearch {
hosts => ["node17.sleap.com:9200","node18.sleap.com:9200"]
index => "ambari-log"
}
}
}
查看logstash插件版本是否与kafka版本兼容。
./bin/logstash-plugin list –verbose
logstash-input-kafka (5.1.7)
logstash-output-kafka (5.1.6)
logstash_LEAP3.4.4.0-5.4.1+ldh1.2.0+c0001-b0048.el6.x86_64.rpm
kafka_LEAP3.4.4.0-0.9.0+ldh1.2.0+c0001-b0061.el6.x86_64.rpm
elasticsearch_LEAP3.4.4.0-5.4.1+ldh1.2.0+c0001-b0061.NOARCH.rpm
https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix
kafka版本不兼容:Kafka Connect: versions <= 0.10.0 or >= 0.10.2
plugins-inputs-kafka配置
https://www.elastic.co/guide/en/logstash/5.4/plugins-inputs-kafka.html
3.3.5 kafka问题: Error reading field 'topics'
https://blog.csdn.net/badyting/article/details/56667873
3.4 hive数据导入ElasticSearch
https://blog.csdn.net/qianshangding0708/article/details/50388750
http://www.voidcn.com/article/p-ftrfzdop-bqu.html
3.5 ElasticSearch数据导入hive
https://blog.csdn.net/shan1369678/article/details/51331296
4 本地验证
centos7 下kafka的安装介绍:
https://segmentfault.com/a/1190000012990954
centos7 安装部署ELK 6.2.4:
http://blog.51cto.com/andyxu/2124697
Logstash连接kafka指南
https://wdxtub.com/2016/08/18/logstash-kafka-guide/
启动zookeeper
zookeeper-server-start.sh kafka_2.12-1.0.0/config/zookeeper.properties > /dev/null 2>&1 &
启动kafka
kafka-server-start.sh kafka_2.12-1.0.0/config/server.properties > /dev/null 2>&1 &
创建topic
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic data1
查看topic
kafka-topics.sh --list --zookeeper localhost:2181
创建消费者
kafka-console-consumer.sh --zookeeper 10.110.181.50:2181 --topic data1 --from-beginning
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
创建生产者
kafka-console-producer.sh --broker-list localhost:9092 --topic test
删除topic
kafka-topics.sh --zookeeper localhost:2181 --topic data1
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash.conf
查看elasticsearch的index
查看input-kafka的内容
/usr/share/logstash/bin/logstash -f input-kafka.conf
5 ELK实例
kafka接收数据,导入到logstash,过滤出特定ip日志,存储在hive。
来源:我是码农,转载请保留出处和链接!
本文链接:http://www.54manong.com/?id=1214
评论专区