I. Introduction

  Packetbeat is a network packet capture, sniffing, and analysis tool developed by Elastic. Like tcpdump, it is built on libpcap underneath, but it is considerably more capable than tcpdump or tcpcopy.
  Packetbeat is part of the Elastic Stack, so it works seamlessly with Logstash, Elasticsearch, and Kibana. You can store the captured data in Elasticsearch and visualize it in Kibana (Logstash is not used in this article).
  Packetbeat works by capturing the network traffic between application servers, decoding application-layer protocols (HTTP, MySQL, Redis, etc.), correlating requests with responses, and recording the fields of interest.
  The following protocols are supported (as of version 7.0.1):

  • ICMP (v4 and v6)
  • DHCP (v4)
  • DNS
  • HTTP
  • AMQP 0.9.1
  • Cassandra
  • MySQL
  • PostgreSQL
  • Redis
  • Thrift-RPC
  • MongoDB
  • Memcache
  • NFS
  • TLS

  For details, see the official documentation: https://www.elastic.co/guide/en/beats/packetbeat/current/index.html
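To give a feel for what "correlating requests with responses and recording the fields of interest" produces, here is an illustrative, heavily abridged HTTP transaction event as a Python dict. The field names follow the Elastic Common Schema (ECS) used by Beats 7.x, but the exact values and the full field set are assumptions for illustration; real events contain many more fields.

```python
# An abridged, illustrative Packetbeat HTTP transaction event.
# Field names follow ECS as used by Beats 7.x; values are made up.
sample_event = {
    "@timestamp": "2019-05-08T06:20:00.000Z",
    "event": {"dataset": "http", "duration": 1500000},   # duration in ns
    "source": {"ip": "192.168.0.10", "port": 54321},
    "destination": {"ip": "192.168.0.2", "port": 80},
    "http": {
        "request": {"method": "get"},
        "response": {"status_code": 200},
    },
    "status": "OK",   # Packetbeat's request/response correlation verdict
}

# The request and its response end up in one document, which is what
# makes per-transaction latency and error-rate analysis possible.
print(sample_event["event"]["dataset"], sample_event["status"])
```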

II. Deployment

Environment and version list

  • OS: CentOS 7
  • Packetbeat 7.0.1
  • Elasticsearch 7.0.1
  • Kibana 7.0.1 (the Packetbeat, Elasticsearch, and Kibana versions should match)

1. Elasticsearch deployment

A containerized single-node deployment is used here; the traffic volume in this case does not warrant a cluster.

docker run -d --name es -p 9200:9200 -p 9300:9300 \
  -v /opt/elk/es/data:/usr/share/elasticsearch/data \
  -v /opt/elk/es/logs:/usr/share/elasticsearch/logs \
  -v /opt/elk/es/config:/usr/share/elasticsearch/config \
  docker.elastic.co/elasticsearch/elasticsearch:7.0.1

This maps the host's ports 9200 and 9300 to the container and mounts three directories:
/usr/share/elasticsearch/data is the Elasticsearch data and index directory
/usr/share/elasticsearch/logs is the Elasticsearch log directory
/usr/share/elasticsearch/config is the Elasticsearch configuration directory (copy this directory out of the container first, edit the configuration, then mount it back; this makes future configuration changes easier)
The main configuration file is elasticsearch.yml:

cat /opt/elk/es/config/elasticsearch.yml
cluster.name: "docker-cluster"
network.host: 0.0.0.0
node.name: node-1
cluster.initial_master_nodes: ["node-1"]
http.cors.enabled: true
http.cors.allow-origin: "*"

Verification:

[root@jenkins ~]# curl 127.0.0.1:9200
{
  "name" : "node-1",
  "cluster_name" : "docker-cluster",
  "cluster_uuid" : "F7EVXds5SQKoJLkZ4seTIw",
  "version" : {
    "number" : "7.0.1",
    "build_flavor" : "default",
    "build_type" : "docker",
    "build_hash" : "e4efcb5",
    "build_date" : "2019-04-29T12:56:03.145736Z",
    "build_snapshot" : false,
    "lucene_version" : "8.0.0",
    "minimum_wire_compatibility_version" : "6.7.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}
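Since the Beats versions should match the Elasticsearch version, the JSON returned by the root endpoint above can be checked programmatically before deploying Packetbeat. A minimal sketch (the embedded JSON is an abridged copy of the response shown above):

```python
import json

# Abridged copy of the root-endpoint response shown above.
ES_INFO = '''
{
  "name" : "node-1",
  "cluster_name" : "docker-cluster",
  "version" : { "number" : "7.0.1" }
}
'''

def es_version(info_json):
    """Return the Elasticsearch version string from the root-endpoint response."""
    return json.loads(info_json)["version"]["number"]

PACKETBEAT_VERSION = "7.0.1"

if es_version(ES_INFO) != PACKETBEAT_VERSION:
    raise SystemExit("version mismatch: Elasticsearch {} vs Packetbeat {}".format(
        es_version(ES_INFO), PACKETBEAT_VERSION))
print("versions match:", PACKETBEAT_VERSION)
```

In a real check, the JSON would of course come from `requests.get('http://127.0.0.1:9200')` rather than a string literal.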

2. Kibana deployment

Kibana is also deployed as a container.
Set the ELASTICSEARCH_HOSTS environment variable to point Kibana at the Elasticsearch instance it reads data from:

docker run -d --name kibana -p 5601:5601 -e ELASTICSEARCH_HOSTS=http://192.168.0.2:9200 docker.elastic.co/kibana/kibana:7.0.1

This maps the host's port 5601 to the container.

3. Packetbeat deployment

Packetbeat is run directly from the binary distribution.
Download packetbeat-7.0.1-linux-x86_64.tar.gz
Extract it: tar zxvf packetbeat-7.0.1-linux-x86_64.tar.gz
The extracted directory looks like this:

drwxr-xr-x@  11 root 1660616606       352  5  7 09:34 .
drwx------+ 113 root 1660616606      3616  5  8 14:20 ..
-rw-r--r--@   1 root 1660616606        41  4 29 20:04 .build_hash.txt
-rw-r--r--@   1 root 1660616606     13675  4 29 19:49 LICENSE.txt
-rw-r--r--@   1 root 1660616606    168334  4 29 19:49 NOTICE.txt
-rw-r--r--@   1 root 1660616606       832  4 29 20:04 README.md
-rw-r--r--@   1 root 1660616606    147301  4 29 20:01 fields.yml
drwxr-xr-x@   3 root 1660616606        96  4 29 20:01 kibana
-rwxr-xr-x@   1 root 1660616606  43535840  4 29 20:03 packetbeat
-rw-r--r--@   1 root 1660616606     60984  4 29 20:04 packetbeat.reference.yml
-rw-------@   1 root 1660616606      8309  4 29 20:04 packetbeat.yml

Export the index template:

packetbeat export template > packetbeat.template.json

Install the template into Elasticsearch:

curl -XPUT -H 'Content-Type: application/json' http://192.168.0.2:9200/_template/packetbeat-7.0.1 -d @packetbeat.template.json
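The export-and-install steps above can also be scripted. A minimal sketch, assuming the exported template sits in packetbeat.template.json next to the script and Elasticsearch answers on the same address used throughout this article:

```python
import json

def template_url(host, version, port=9200):
    """Build the _template URL that the curl command above targets."""
    return "http://{}:{}/_template/packetbeat-{}".format(host, port, version)

# Actual installation (commented out: requires a reachable Elasticsearch
# and the `requests` package):
#
# import requests
# with open("packetbeat.template.json") as f:
#     template = json.load(f)
# resp = requests.put(template_url("192.168.0.2", "7.0.1"),
#                     headers={"Content-Type": "application/json"},
#                     json=template)
# print(resp.json())

print(template_url("192.168.0.2", "7.0.1"))
```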

Configuration file:
packetbeat.yml
By default Packetbeat listens on all interfaces; you can restrict it to a specific interface:

packetbeat.interfaces.device: any

Specify the ports for each protocol:

packetbeat.protocols:
- type: icmp
  enabled: true
- type: amqp
  ports: [5672]
- type: cassandra
  ports: [9042]
- type: dhcpv4
  ports: [67, 68]
- type: dns
  ports: [53]
- type: http
  ports: [80, 8080]
- type: memcache
  ports: [11211]
- type: mysql
  ports: [3306, 3307]
- type: pgsql
  ports: [5432]
- type: redis
  ports: [6379]
- type: thrift
  ports: [9090]
- type: mongodb
  ports: [27017]
- type: nfs
  ports: [2049]
- type: tls
  ports:
    - 443  # HTTPS
    - 993  # IMAPS
    - 995  # POP3S
    - 5223 # XMPP over SSL
    - 8443
    - 8883 # Secure MQTT
    - 9243 # Elasticsearch

To have Packetbeat decode a protocol, you must tell it which ports to listen on. A script can discover the ports the machine is currently listening on for that protocol.
The following is the script I run before startup to collect the HTTP listening ports; it may serve as a reference:

# Collect all listening TCP ports and substitute them into packetbeat.yml.
# $basedir must be set to the Packetbeat installation directory beforehand.
ports=''
# IPv4 listeners
ret=`netstat -nltp | grep LISTEN | grep -v ':::' | awk '{print $4}' | awk -F ':' '{print $2}'`
port1=`echo $ret | sed 's/ /,/g'`
# IPv6 listeners
ret2=`netstat -nltp | grep LISTEN | grep ':::' | awk '{print $4}' | awk -F ':' '{print $4}'`
port2=`echo $ret2 | sed 's/ /,/g'`
if [ -z "$port1" ]; then
    ports=$port2
else
    ports=$port1,$port2
fi
#echo $ports
if [ -f ${basedir}/packetbeat.yml ]; then
    rm -f ${basedir}/packetbeat.yml
fi
# packetbeat.yml.bak holds the template; "xxxxxx" marks where the port list goes
cp ${basedir}/packetbeat.yml.bak ${basedir}/packetbeat.yml
sed -i "s/xxxxxx/$ports/g" $basedir/packetbeat.yml
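The port-discovery part of the script above can be expressed more compactly in Python, which also makes it easy to test against canned `netstat -nltp` output (the sample output below is made up for illustration):

```python
def listening_ports(netstat_output):
    """Extract listening TCP ports (IPv4 and IPv6) from `netstat -nltp` output,
    returning them as a comma-separated string like the shell script builds."""
    ports = []
    for line in netstat_output.splitlines():
        if "LISTEN" not in line:
            continue
        local = line.split()[3]            # local address, e.g. 0.0.0.0:80 or :::22
        port = local.rsplit(":", 1)[-1]    # everything after the last colon
        if port.isdigit() and port not in ports:
            ports.append(port)
    return ",".join(ports)

# Illustrative netstat output; on a real host this would come from
# subprocess.run(["netstat", "-nltp"], capture_output=True, text=True).stdout
sample = """tcp   0  0 0.0.0.0:80    0.0.0.0:*  LISTEN  100/nginx
tcp6  0  0 :::22         :::*       LISTEN  200/sshd"""
print(listening_ports(sample))
```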

Start Packetbeat:

./packetbeat --path.config /usr/local/packetbeat

4. Open Kibana and create an index pattern; the data should then be visible.

5. Scheduled Elasticsearch data-cleanup script

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Delete old Packetbeat documents, then force-merge to reclaim disk space.
import requests

es_host = '192.168.0.2'  # Elasticsearch address

# Newer Elasticsearch versions require an explicit Content-Type header
headers = {
    'Content-Type': 'application/json'
}
url = 'http://{}:9200/packetbeat-*/_delete_by_query?conflicts=proceed'.format(es_host)
data = {
    "query": {
        "range": {
            "@timestamp": {
                "lt": "now-1h"  # delete documents older than one hour
            }
        }
    }
}
response = requests.post(url, headers=headers, json=data)
print(response.json())

# After deletion, run a force-merge to actually free the disk space
url2 = 'http://{}:9200/_forcemerge?only_expunge_deletes=true&max_num_segments=1'.format(es_host)
response = requests.post(url2)
print(response.json())

Reference:
https://www.elastic.co/guide/en/beats/packetbeat/current/index.html