ELK - Elasticsearch | Logstash | Kibana
http://bit.ly/elkubuntu - Hisham
Elasticsearch cluster, Logstash & Kibana (Elastic Stack) installation with x-pack on Ubuntu 18.04
Revision History

Date      | Details                 | Version
----------|-------------------------|--------
          |                         | 1.4
          |                         | 1.3
          |                         | 1.2
          | Formatting improvements | 1.1
30/5/2018 | First revision          | 1.0
Simple Architectural Diagram
Java installation
Oracle Java
- Add repo
$ sudo add-apt-repository -y ppa:webupd8team/java
- Apt update:
$ sudo apt update
- Install java:
$ sudo apt -y install oracle-java8-installer
- Check if java is installed:
$ java -version
Open JDK
- Install openjdk:
$ sudo apt install openjdk-8-jdk
- Check if java is installed:
$ java -version
Elasticsearch
Installation
- Add repo key
$ wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
- Install apt-transport-https (for secure apt package download)
$ sudo apt install apt-transport-https
- Add the elastic 6.x repo
$ echo "deb https://artifacts.elastic.co/packages/6.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-6.x.list
- Install elasticsearch
$ sudo apt update && sudo apt install elasticsearch -y
- Install x-pack plugin for elasticsearch (Optional)
$ sudo /usr/share/elasticsearch/bin/elasticsearch-plugin install x-pack
- Start elasticsearch
$ sudo systemctl start elasticsearch
- Enable on boot
$ sudo systemctl enable elasticsearch
Configuration
Change the configuration files below accordingly
- /etc/hosts
$ cat /etc/hosts
127.0.0.1 localhost
127.0.1.1 elk-1
10.0.14.1 elk-1
- /etc/elasticsearch/elasticsearch.yml
cluster.name: my-cluster
node.name: elk-1
bootstrap.memory_lock: true
# network.host: elk-1
# network.host: 192.168.8.137
network.host: 0.0.0.0
discovery.zen.ping.unicast.hosts: ["elk-1", "elk-2"]
- /etc/default/elasticsearch (set the heap to about half the machine's physical memory; here 1024m)
ES_JAVA_OPTS='-Xms1024m -Xmx1024m'
MAX_LOCKED_MEMORY=unlimited
- /usr/lib/systemd/system/elasticsearch.service
LimitMEMLOCK=infinity
[Install]
WantedBy=multi-user.target
- Check log to know whether there is any issue with elasticsearch configuration:
$ sudo tail -f /var/log/elasticsearch/elasticsearch.log
Restart elasticsearch
- Reload the systemd daemon, because we changed a file under /usr/lib/systemd
$ sudo systemctl daemon-reload
- Restart elasticsearch
$ sudo systemctl restart elasticsearch.service
- Check elasticsearch service status
$ sudo systemctl status elasticsearch
- Check full systemd logs for elasticsearch
$ sudo journalctl --unit=elasticsearch
- Check elasticsearch using curl
$ curl elk-1:9200/?pretty
{
"name" : "elk-1",
"cluster_name" : "my-cluster",
"cluster_uuid" : "_na_",
"version" : {
"number" : "5.4.0",
"build_hash" : "780f8c4",
"build_date" : "2017-04-28T17:43:27.229Z",
"build_snapshot" : false,
"lucene_version" : "6.5.0"
},
"tagline" : "You Know, for Search"
}
- If elasticsearch fails to start, append the config below to /etc/security/limits.conf, and reboot
elasticsearch soft memlock unlimited
elasticsearch hard memlock unlimited
- To test (you should see '"mlockall" : true'; filter_path trims the response to just that field):
$ curl "elk-1:9200/_nodes?filter_path=**.mlockall&pretty"
Kibana
Installation & access
- Install kibana
$ sudo apt install kibana
- Create log directory for kibana
$ sudo mkdir /var/log/kibana
$ sudo touch /var/log/kibana/kibana.log
$ sudo chown -R kibana /var/log/kibana
- Change some settings in /etc/kibana/kibana.yml
$ cat /etc/kibana/kibana.yml
server.host: "10.0.14.1"
elasticsearch.url: "http://elk-1:9200"
logging.dest: /var/log/kibana/kibana.log
- Enable & start kibana
$ sudo systemctl enable kibana
$ sudo systemctl start kibana
- Access kibana using curl
$ curl -I http://10.0.14.1:5601
- Access kibana using a web browser
Creating index pattern
To create index pattern:
- Open kibana using web browser
- Go to “Management” → “Index Pattern”
- Click on the + sign
- Fill in your index name or pattern, and choose your Time-field name
- Click “Create”
Viewing index pattern
To view created index pattern:
- Click on “Discover”
- On the left hand side, choose your saved index pattern
- Choose your time period
- You can now view your created index pattern
Logstash
Installation & service management
- Install logstash
$ sudo apt install logstash
- Add some configuration in /etc/logstash/logstash.yml
$ cat /etc/logstash/logstash.yml
node.name: node1
path.data: /var/lib/logstash
path.config: /etc/logstash/conf.d
path.logs: /var/log/logstash
- Start logstash
$ sudo systemctl start logstash
- Enable logstash on boot
$ sudo systemctl enable logstash
Configuration sample
Logstash will receive logs on port 8514 from rsyslog, apply a simple grok filter, and send the data to elasticsearch under a dated index name.
Configure logstash to listen for log data on port 8514/tcp
- Put below settings to /etc/logstash/conf.d/10-input.conf
input {
tcp {
port => 8514
type => "syslog"
}
}
- Restart logstash (it is logstash's config that changed, not elasticsearch's)
$ sudo systemctl restart logstash
- Check for listening port
$ sudo netstat -tulpn | grep 8514
tcp6 0 0 :::8514 :::* LISTEN 587/java
A simple grok filter that splits each syslog line into two parts, timestamp and message
- Put below config into /etc/logstash/conf.d/40-syslog-filter.conf
filter {
if [type] == "syslog" {
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} %{GREEDYDATA:message}" }
}
}
}
- Restart logstash
$ sudo systemctl restart logstash
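For intuition, the grok filter above behaves roughly like the following Python regex. The pattern here is a simplified approximation of %{SYSLOGTIMESTAMP} and %{GREEDYDATA}, not the exact grok definitions:

```python
import re

# Simplified stand-in for "%{SYSLOGTIMESTAMP:timestamp} %{GREEDYDATA:message}":
# "MMM [d]d HH:MM:SS" followed by the rest of the line.
SYSLOG_RE = re.compile(
    r"^(?P<timestamp>[A-Z][a-z]{2}\s+\d{1,2} \d{2}:\d{2}:\d{2}) (?P<message>.*)$"
)

def split_syslog(line: str) -> dict:
    """Split a syslog line into timestamp and message, like the grok filter."""
    m = SYSLOG_RE.match(line)
    return m.groupdict() if m else {}

fields = split_syslog("Sep  4 09:53:42 ns1 sshd[8649]: test message")
print(fields["timestamp"])  # Sep  4 09:53:42
```

Lines that do not match would get grok's `_grokparsefailure` tag; here they simply return an empty dict.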
Send filtered log to elasticsearch
- Put below settings to /etc/logstash/conf.d/80-syslog-filter.conf
output {
if "syslog" in [type] {
elasticsearch {
hosts => ["elk-1:9200"]
index => "logstash-%{+YYYY.MM.dd}"
}
}
}
- Restart logstash
$ sudo systemctl restart logstash
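The `%{+YYYY.MM.dd}` sprintf reference in the index name resolves to the event's date, so documents land in one index per day. A small sketch of the equivalent formatting in Python:

```python
from datetime import datetime, timezone

def daily_index(prefix: str, when: datetime) -> str:
    """Mimic logstash's index => "logstash-%{+YYYY.MM.dd}" naming."""
    return f"{prefix}-{when.strftime('%Y.%m.%d')}"

print(daily_index("logstash", datetime(2018, 5, 30, tzinfo=timezone.utc)))
# logstash-2018.05.30
```

Daily indices make it cheap to expire old data by deleting whole indices instead of individual documents.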
Set rsyslog to send logs to logstash
- Put below settings into /etc/rsyslog.d/logstash.conf
$ActionQueueFileName logstash1 # unique name prefix for spool files
$ActionQueueMaxDiskSpace 1g # 1gb space limit (use as much as possible)
$ActionQueueSaveOnShutdown on # save messages to disk on shutdown
$ActionQueueType LinkedList # run asynchronously
$ActionResumeRetryCount -1 # infinite retries if host is down
# remote host is: name/ip:port, e.g. 192.168.0.1:514, port optional
*.* @@elk-1:8514
- Restart rsyslog
$ sudo systemctl restart rsyslog
Input, filter and output for loading csv files into logstash
- Put below configurations into /etc/logstash/conf.d/csv_upload.conf
input {
file {
path => "/tmp/*.csv"
start_position => "beginning"
sincedb_path => "/dev/null"
type => "csv"
}
}
filter {
if [type] == "csv" {
csv {
separator => ","
columns => ["Date","Open","High","Low","Close","Volume","Weighted Price"]
}
}
}
output {
if "csv" in [type] {
elasticsearch {
hosts => "http://elk-1:9200"
index => "csv-%{+YYYY.MM.dd}"
}
}
}
- Restart logstash
$ sudo systemctl restart logstash
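The csv filter above maps each comma-separated line onto the named columns. Roughly equivalent parsing in Python (the sample row is invented for illustration):

```python
import csv
import io

# Same column list as the logstash csv filter above.
COLUMNS = ["Date", "Open", "High", "Low", "Close", "Volume", "Weighted Price"]

def parse_row(line: str) -> dict:
    """Map one CSV line onto named columns, like the logstash csv filter."""
    reader = csv.reader(io.StringIO(line))
    values = next(reader)
    return dict(zip(COLUMNS, values))

row = parse_row("2018-05-30,7400.1,7500.0,7300.2,7450.9,1234.5,7410.3")
print(row["Close"])  # 7450.9
```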
Better syslog grok filter
- Put below configurations into /etc/logstash/conf.d/41-better-syslog-filter.conf, to better parse date and message, and to remove timestamp field
filter {
if [type] == "syslog" {
grok {
match => {
"message" => [ "%{SYSLOG5424PRI}%{SYSLOGBASE2}","%{SYSLOGBASE2}", "%{SYSLOGPAMSESSION}", "%{CRONLOG}", "%{SYSLOGLINE}" ]
}
tag_on_failure => [ "failedPattern_syslog" ]
add_field => [ "received_at", "%{@timestamp}" ]
add_field => [ "received_from", "%{host}" ]
add_tag => [ "syslog" ]
}
date {
locale => "en"
timezone => "Asia/Kuala_Lumpur"
match => ["timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601"]
remove_field => ["timestamp"]
}
date {
locale => "en"
timezone => "Asia/Kuala_Lumpur"
match => ["timestamp8601", "ISO8601"]
remove_field => ["timestamp8601"]
}
syslog_pri {
syslog_pri_field_name => "syslog5424_pri"
}
}
}
- Restart logstash
$ sudo systemctl restart logstash
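The date blocks above try several timestamp layouts in order. The `MMM d HH:mm:ss` / `MMM dd HH:mm:ss` patterns correspond to classic syslog timestamps; a sketch of the same try-each-format logic in Python:

```python
from datetime import datetime

# strptime translation of the Joda-style "MMM [d]d HH:mm:ss" patterns.
FORMATS = ["%b %d %H:%M:%S"]

def parse_syslog_time(value: str, year: int) -> datetime:
    """Try each format until one matches, like logstash's date filter."""
    normalized = " ".join(value.split())  # collapse the double space before single-digit days
    for fmt in FORMATS:
        try:
            # syslog timestamps carry no year, so one must be supplied
            return datetime.strptime(f"{year} {normalized}", f"%Y {fmt}")
        except ValueError:
            continue
    raise ValueError(f"unparseable timestamp: {value!r}")

print(parse_syslog_time("Sep  4 09:53:42", 2018))
```

The missing year is exactly why the date filter matters: without it, events get indexed at ingest time rather than event time.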
Grok filter for parsing auth.log
- Put below configurations into /etc/logstash/conf.d/42-ssh-filter.conf
filter {
# start of the main if
if "syslog" in [tags] and "sshd" in [program] {
### this opening if is closed at the very end.
### Start Rule 1a, without an inner if
# 2015-12-31T08:12:44.598377+08:00 logindexer sshd[45885]: Accepted password for haris from 192.168.0.9 port 43824 ssh2
grok {
match => {
"message" => [ "(?:%{TIMESTAMP_ISO8601:timestamp}|%{SYSLOGTIMESTAMP:timestamp}) %{HOSTNAME:host_target} sshd\[%{BASE10NUM}\]: Accepted password for %{USERNAME:username} from %{IP:src_ip} port %{BASE10NUM:ssh_port} ssh2" ]
}
add_tag => [ "ssh_sucessful_login", "correlation" ]
remove_field => [ "timestamp" ]
}
### Start Rule 2
## 2015-12-30T12:43:39.892283+08:00 logindexer sshd[25461]: Received disconnect from 192.168.0.9: 11: disconnected by user
if "syslog" in [tags] and "_grokparsefailure" in [tags] {
grok {
match => {
"message" => [ "(?:%{TIMESTAMP_ISO8601:timestamp}|%{SYSLOGTIMESTAMP:timestamp})%{SPACE}%{HOSTNAME:host_target}%{SPACE}%{NOTSPACE}%{NOTSPACE}%{SPACE}Received disconnect from %{IP:src_ip}%{GREEDYDATA}" ]
}
add_tag => [ "ssh_disconnect", "correlation" ]
remove_field => [ "timestamp" ]
}
}
### Start Rule 3
if "syslog" in [tags] and "_grokparsefailure" in [tags] {
grok {
match => {
"message" => [ "(?:%{TIMESTAMP_ISO8601:timestamp}|%{SYSLOGTIMESTAMP:timestamp}) %{HOSTNAME:host_target} sshd\[%{BASE10NUM}\]: Failed password for invalid user %{USERNAME:username} from %{IP:src_ip} port %{BASE10NUM:ssh_port} ssh2" ]
}
add_tag => [ "ssh_brute_force_attack", "correlation" ]
remove_field => [ "timestamp" ]
}
}
### Start Rule 4
# 2015-12-30T12:30:02.759578+08:00 logindexer sshd[25434]: Failed password for haris from 192.168.0.9 port 35200 ssh2
if "syslog" in [tags] and "_grokparsefailure" in [tags] {
grok {
match => {
"message" => [ "(?:%{TIMESTAMP_ISO8601:timestamp}|%{SYSLOGTIMESTAMP:timestamp}) %{HOSTNAME:host_target} sshd\[%{BASE10NUM}\]: Failed password for %{USERNAME:username} from %{IP:src_ip} port %{BASE10NUM:ssh_port} ssh2" ]
}
add_tag => [ "ssh_failed_login", "correlation" ]
remove_field => [ "timestamp" ]
}
}
### Start Rule 5
if "syslog" in [tags] and "_grokparsefailure" in [tags] {
grok {
match => {
"message" => [ "(?:%{TIMESTAMP_ISO8601:timestamp}|%{SYSLOGTIMESTAMP:timestamp})%{SPACE}%{HOSTNAME:host_target}%{SPACE}%{NOTSPACE}%{NOTSPACE}%{SPACE}reverse mapping checking getaddrinfo for%{SPACE}%{NOTSPACE}%{SPACE}\[%{IP:src_ip}]%{SPACE}failed - POSSIBLE BREAK-IN ATTEMPT!" ]
}
add_tag => [ "ssh_break-in_attempt", "correlation" ]
remove_field => [ "timestamp" ]
}
}
### Start Rule 6
if "syslog" in [tags] and "_grokparsefailure" in [tags] {
grok {
match => {
"message" => [ "(?:%{TIMESTAMP_ISO8601:timestamp}|%{SYSLOGTIMESTAMP:timestamp})%{SPACE}%{HOSTNAME:host_target}%{SPACE}sshd%{NOTSPACE}%{SPACE}input_userauth_request: invalid user%{SPACE}%{USERNAME:username}%{SPACE}\[preauth]" ]
}
add_tag => [ "ssh_input_userauth_request", "correlation" ]
remove_field => [ "timestamp" ]
}
}
### Start Rule 7
if "syslog" in [tags] and "_grokparsefailure" in [tags] {
grok {
match => {
"message" => [ "(?:%{TIMESTAMP_ISO8601:timestamp}|%{SYSLOGTIMESTAMP:timestamp})%{SPACE}%{HOSTNAME:host_target}%{SPACE}%{NOTSPACE}%{SPACE}pam_unix%{NOTSPACE}%{SPACE}authentication failure;%{SPACE}logname=%{SPACE:logname}uid=%{BASE10NUM:uid}%{SPACE}euid=%{BASE10NUM:euid}%{SPACE}tty=%{NOTSPACE:tty}%{SPACE}ruser=%{SPACE:ruser}%{SPACE}rhost=%{IP:src_ip}%{SPACE}user=%{USERNAME:username}" ]
}
add_tag => [ "ssh_pam_authentication_failure", "correlation" ]
remove_field => [ "timestamp" ]
}
}
### Start Rule 8
# 2015-12-13T11:08:25.579252+08:00 logindexer sshd[10237]: pam_unix(sshd:session): session opened for user haris by (uid=0)
if "syslog" in [tags] and "_grokparsefailure" in [tags] {
grok {
match => {
"message" => [ "(?:%{TIMESTAMP_ISO8601:timestamp}|%{SYSLOGTIMESTAMP:timestamp})%{SPACE}%{HOSTNAME:host_target}%{SPACE}sshd\[%{BASE10NUM}]: pam_unix\(sshd:session\): session opened for user %{USERNAME:username} by \(uid=%{BASE10NUM:uid}\)" ]
}
add_tag => [ "ssh_pam_session_open", "correlation" ]
remove_field => [ "timestamp" ]
}
}
### Start Rule 9
# 2015-12-31T13:46:39.978184+08:00 192.168.90.205 sshd[5658]: pam_unix(sshd:session): session closed for user oracle
if "syslog" in [tags] and "_grokparsefailure" in [tags] {
grok {
match => {
"message" => [ "(?:%{TIMESTAMP_ISO8601:timestamp}|%{SYSLOGTIMESTAMP:timestamp})%{SPACE}%{HOSTNAME:host_target} sshd\[%{BASE10NUM}]: pam_unix\(sshd:session\): session closed for user %{USERNAME:username}" ]
}
add_tag => [ "ssh_pam_session_closed", "correlation" ]
remove_field => [ "timestamp" ]
}
}
### Start Rule 10
### <38>Sep 4 09:53:42 ns1 sshd[8649]: Did not receive identification string from 78.197.27.48
if "syslog" in [tags] and "_grokparsefailure" in [tags] {
grok {
match => {
"message" => [ "%{SPACE}Did not receive identification string from%{SPACE}%{IP:src_ip}" ]
}
add_tag => [ "ssh_not_receive_identification_string", "correlation" ]
}
}
### Start Rule 11
## <84>Sep 4 11:38:52 rancherserver sshd[5186]: pam_unix(sshd:auth): check pass; user unknown
if "syslog" in [tags] and "_grokparsefailure" in [tags] {
grok {
match => {
"message" => [ "%{SPACE}pam_unix\(sshd:auth\): check pass; user%{SPACE}%{USERNAME:username}" ]
}
add_tag => [ "ssh_check_pass_username", "correlation" ]
}
}
### Start Rule 12
### <38>Sep 4 11:38:47 ns1 sshd[9584]: Invalid user admin from 91.224.160.131
if "syslog" in [tags] and "_grokparsefailure" in [tags] {
grok {
match => {
"message" => [ "%{SPACE}Invalid user%{SPACE}%{USERNAME:username}%{SPACE}from%{SPACE}%{IP:src_ip}" ]
}
add_tag => [ "ssh_invalid_user", "correlation" ]
}
}
### Start Rule 13
## <38>Sep 4 15:41:15 rancherserver sshd[5310]: Disconnected from 220.124.151.130 port 49200 [preauth]
if "syslog" in [tags] and "_grokparsefailure" in [tags] {
grok {
match => {
"message" => [ "%{SPACE}Disconnected from%{SPACE}%{IP:src_ip}%{SPACE}port%{SPACE}%{BASE10NUM:ssh_port}%{SPACE}\[preauth]" ]
}
add_tag => [ "ssh_disconnected_from", "correlation" ]
}
}
##############
# this grok must come last
# its purpose is to remove the _grokparsefailure tag
#####
if [type] == "syslog" and "_grokparsefailure" in [tags] {
grok {
match => {
"message" => [ "%{GREEDYDATA}" ]
}
remove_tag => [ "_grokparsefailure" ]
remove_field => [ "timestamp" ]
}
}
# end if
}
# end of the main if covering sshd
}
### nested this way so that non-sshd events bypass this large filter
- Restart logstash
$ sudo systemctl restart logstash
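Rule 1a above pulls the username, source IP and port out of a successful login line. A simplified Python equivalent of that grok (the character classes are loose approximations of the grok patterns, for illustration only):

```python
import re

# Approximation of the "Accepted password" grok in Rule 1a.
ACCEPTED_RE = re.compile(
    r"sshd\[\d+\]: Accepted password for (?P<username>\S+) "
    r"from (?P<src_ip>[\d.]+) port (?P<ssh_port>\d+) ssh2"
)

def parse_ssh_login(line: str) -> dict:
    """Extract username, src_ip and ssh_port from a successful sshd login."""
    m = ACCEPTED_RE.search(line)
    return m.groupdict() if m else {}

line = ("2015-12-31T08:12:44.598377+08:00 logindexer sshd[45885]: "
        "Accepted password for haris from 192.168.0.9 port 43824 ssh2")
print(parse_ssh_login(line))
# {'username': 'haris', 'src_ip': '192.168.0.9', 'ssh_port': '43824'}
```

The chain of `_grokparsefailure` checks in the filter is the same idea repeated: try the next pattern only if every earlier one failed.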
Filebeat
Filebeat is a log data shipper for local files. Installed as an agent on your servers, Filebeat monitors the log directories or specific log files, tails the files, and forwards them either to Elasticsearch or Logstash for indexing.
Installation and configuration
To install filebeat, and configure it to send logs to logstash:
- Install from elastic repo
$ sudo apt install filebeat
- Configure the input_type, paths and output to logstash in /etc/filebeat/filebeat.yml (the path below is an example; point it at the logs you want shipped)
filebeat.prospectors:
- input_type: log
  paths:
    - /var/log/syslog
output.logstash:
  hosts: ["elk-1:5044"]
- Restart filebeat
$ sudo systemctl restart filebeat
- Configure logstash to listen on port 5044 for logs from filebeat, do a simple grok and output to elasticsearch. Put below config into /etc/logstash/conf.d/filebeat.conf
input {
  # filebeat's output.logstash speaks the beats protocol, so use the beats input, not tcp
  beats {
    port => 5044
    type => "filebeat"
  }
}
filter {
if [type] == "filebeat" {
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} %{GREEDYDATA:message}" }
}
}
}
output {
if "filebeat" in [type] {
elasticsearch {
hosts => ["elk-1:9200"]
index => "filebeat-%{+YYYY.MM.dd}"
}
}
}
- Restart logstash
$ sudo systemctl restart logstash
- Create the filebeat-* index pattern in kibana
Metricbeat
Metricbeat helps you monitor your servers and the services they host by collecting metrics from the operating system and services.
Installation and configuration
- Install metricbeat
$ sudo apt install metricbeat
- Configure /etc/metricbeat/metricbeat.yml as below, to collect cpu, filesystem, memory, network and process, and also output to elasticsearch
metricbeat.modules:
- module: system
metricsets:
- cpu
- filesystem
- memory
- network
- process
enabled: true
period: 10s
processes: ['.*']
cpu_ticks: false
output.elasticsearch:
hosts: ["elk-1:9200"]
template.name: "metricbeat"
template.path: "metricbeat.template.json"
template.overwrite: false
- Test metricbeat
$ sudo metricbeat.sh -configtest -e
- Start metricbeat
$ sudo systemctl start metricbeat
- See your metric index
$ curl http://elk-1:9200/metricbeat-*/_search?pretty
- View your metric in kibana
Performance improvement
Use nginx to proxy requests to kibana, and add username and password authentication in front of kibana
- Install nginx
$ sudo apt install nginx
- Add below to /etc/nginx/sites-available/kibana.conf
server {
listen 80;
server_name 10.0.14.1 localhost;
auth_basic "Restricted Access";
auth_basic_user_file /etc/nginx/htpasswd.users;
location / {
proxy_pass http://localhost:5601;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection 'upgrade';
proxy_set_header Host $host;
proxy_cache_bypass $http_upgrade;
}
}
- Create symlink to sites-enabled for kibana.conf
$ cd /etc/nginx/sites-enabled; sudo ln -s ../sites-available/kibana.conf
- Create an authentication file
$ sudo touch /etc/nginx/htpasswd.users
- Create a user named kibanaadmin with apr1 encrypted password
$ echo "kibanaadmin:$(openssl passwd -apr1 yourpassword)" | sudo tee -a /etc/nginx/htpasswd.users
- Check for syntax error
$ sudo nginx -t
- Restart nginx
$ sudo systemctl restart nginx
- Test using curl, showing authentication is required to access
$ curl localhost
<html>
<head><title>401 Authorization Required</title></head>
<body bgcolor="white">
<center><h1>401 Authorization Required</h1></center>
<hr><center>nginx/1.10.0 (Ubuntu)</center>
</body>
</html>
- Login to kibana through port 80, and you will be asked for a username and password
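When curl or the browser answers the 401 challenge above, it sends the credentials base64-encoded in an Authorization header. A small sketch of what `curl -u kibanaadmin:yourpassword` puts on the wire:

```python
import base64

def basic_auth_header(user: str, password: str) -> str:
    """Build the HTTP Basic Authorization header value nginx expects."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return f"Basic {token}"

print(basic_auth_header("kibanaadmin", "yourpassword"))
```

Note that base64 is encoding, not encryption, which is why Basic auth should normally sit behind TLS.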
Use nginx to proxy requests to elasticsearch over keepalive connections
- Put below config into /etc/nginx/sites-available/es.conf
server {
listen 8080;
server_name 10.99.4.1;
location / {
proxy_pass http://localhost:9200;
proxy_http_version 1.1;
proxy_set_header Connection "Keep-Alive";
proxy_set_header Proxy-Connection "Keep-Alive";
}
}
- Create symlink in /etc/nginx/sites-enabled
$ cd /etc/nginx/sites-enabled; sudo ln -s ../sites-available/es.conf
- Restart nginx
$ sudo systemctl restart nginx
- Test accessing elastic on port 8080
$ curl localhost:8080
{
"name" : "THNkIG4",
"cluster_name" : "elasticsearch",
"cluster_uuid" : "RQawLJKuQxijXJExfcbTsA",
"version" : {
"number" : "5.1.1",
"build_hash" : "5395e21",
"build_date" : "2016-12-06T12:36:15.409Z",
"build_snapshot" : false,
"lucene_version" : "6.3.0"
},
"tagline" : "You Know, for Search"
}
- Point elasticsearch.url in kibana.yml to elk-1:8080
$ diff -u {~,/etc/kibana}/kibana.yml
--- /home/sysadmin/kibana.yml 2017-05-26 17:11:05.808059815 +0800
+++ /etc/kibana/kibana.yml 2017-05-26 17:11:22.616632793 +0800
@@ -19,7 +19,7 @@
#server.name: "your-hostname"
# The URL of the Elasticsearch instance to use for all your queries.
-elasticsearch.url: "http://elk-1:9200"
+elasticsearch.url: "http://elk-1:8080"
# When this setting's value is true Kibana uses the hostname specified in the server.host
# setting. When the value of this setting is false, Kibana uses the hostname of the host
- Restart kibana
$ sudo systemctl restart kibana
Use nginx to do a simple round robin load balancer for elasticsearch
- Add upstream stanza to /etc/nginx/sites-available/es.conf
$ sudo diff -u {~,/etc/nginx/sites-available}/es.conf
--- /home/sysadmin/es.conf 2017-05-26 17:14:58.079457994 +0800
+++ /etc/nginx/sites-available/es.conf 2017-05-27 07:27:53.881833373 +0800
@@ -1,3 +1,9 @@
+upstream elasticsearch {
+ server elk-1:9200;
+ server elk-2:9200;
+
+ keepalive 15;
+}
server {
listen 8080;
server_name elk-ubuntu localhost;
- Restart nginx
$ sudo systemctl restart nginx
- Test the load balancer using curl. The node name changes on every request, showing that nginx is balancing across the elasticsearch nodes
$ curl localhost:8080 | grep name
"name" : "elk-1",
"cluster_name" : "my-cluster",
$ curl localhost:8080 | grep name
"name" : "elk-2",
"cluster_name" : "my-cluster",
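The upstream block distributes requests round-robin across elk-1 and elk-2, which is why the node name alternates in the curl output above. The scheduling idea in a few lines of Python (hostnames taken from the upstream config):

```python
from itertools import cycle

# nginx's default upstream balancing is round-robin over the listed servers.
backends = cycle(["elk-1:9200", "elk-2:9200"])

picks = [next(backends) for _ in range(4)]
print(picks)  # ['elk-1:9200', 'elk-2:9200', 'elk-1:9200', 'elk-2:9200']
```

The `keepalive 15` directive in the real config additionally keeps up to 15 idle connections open per worker, so each pick reuses a connection instead of opening a new one.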
Tuning limits.conf for elasticsearch
- Put below into /etc/security/limits.conf
elasticsearch soft memlock unlimited
elasticsearch hard memlock unlimited
elasticsearch - nofile 655350
elasticsearch - memlock unlimited
logstash - nofile 655350
logstash - memlock unlimited
- Logout, and login back in, to activate the changes
Issues:
- A field that has already been converted to date is still detected as a string, not a date
Kibana DevTools | Console
get /_cat/master?help
get /_cat/master?v
get /_cat/master?h=id,ip
get bindlog-2018.07.31/_search
{
  "from" : 0, "size" : 100,
  "query" : {
    "term" : { "clientip" : "203.201.187.194" }
  }
}
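The same paginated term query can also be built programmatically. A sketch using plain dicts and json, with no Elasticsearch client library assumed:

```python
import json

def term_query(field: str, value: str, size: int = 100) -> str:
    """Build the paginated term query shown above as a JSON request body."""
    body = {
        "from": 0,
        "size": size,
        "query": {"term": {field: value}},
    }
    return json.dumps(body)

print(term_query("clientip", "203.201.187.194"))
```

The resulting string can be POSTed to the _search endpoint exactly as in the DevTools console example.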
sincedb
sincedb is logstash's bookmark file: it records how far into each log file has already been sent to logstash/elasticsearch, so the file input can resume where it left off without resending old lines.
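The idea can be sketched as a tiny offset store: remember how far into the file we have read, and resume from there on the next run (file names here are illustrative, not logstash's actual sincedb format):

```python
import os

def read_new_lines(log_path: str, sincedb_path: str) -> list[str]:
    """Return only lines appended since the last run, like logstash's sincedb."""
    offset = 0
    if os.path.exists(sincedb_path):
        with open(sincedb_path) as f:
            offset = int(f.read().strip() or 0)
    with open(log_path) as f:
        f.seek(offset)          # skip everything already shipped
        lines = f.readlines()
        new_offset = f.tell()
    with open(sincedb_path, "w") as f:
        f.write(str(new_offset))  # persist the new bookmark
    return lines
```

This is why the csv example earlier sets `sincedb_path => "/dev/null"`: throwing the bookmark away forces logstash to re-read the whole file every time.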
Kibana
Inspect the JSON that kibana POSTs (e.g. in the browser's developer tools); it may contain the query sent to elasticsearch.