ARTS Week 40

  • Algorithm: 561. Array Partition I, 627. Swap Salary
  • Tips: Real-time monitoring and analysis of nginx access logs with filebeat + kafka + ELK

Algorithm

561. Array Partition I

Given an array of 2n integers, your task is to group these integers into n pairs, say (a1, b1), (a2, b2), ..., (an, bn), so that the sum of min(ai, bi) for all i from 1 to n is as large as possible.

Example 1:

Input: [1,4,3,2]

Output: 4
Explanation: n is 2, and the maximum sum of pairs is 4 = min(1, 2) + min(3, 4).

Note:

  1. n is a positive integer, which is in the range of [1, 10000].

  2. All the integers in the array will be in the range of [-10000, 10000].

Solution

Sort the array and pair adjacent elements: after sorting, the smaller value of each pair is the element at the even index, and pairing neighbours sacrifices as little of the larger values as possible, so this greedy choice maximizes the total.

package org.nocoder.leetcode.solution;

import java.util.Arrays;

/**
 * 561. Array Partition I
 *
 * https://leetcode.com/problems/array-partition-i/
 *
 * @author jason
 * @date 2019/6/17.
 */
public class ArrayPairSum {
    public static int arrayPairSum(int[] nums) {
        // Sort so that each element is paired with its nearest neighbour;
        // the minimum of every pair is then as large as it can be.
        Arrays.sort(nums);
        int sum = 0;
        for (int i = 0; i < nums.length; i += 2) {
            // After sorting, nums[i] <= nums[i + 1], so nums[i] is the pair's minimum.
            sum += Math.min(nums[i], nums[i + 1]);
        }
        return sum;
    }

    public static void main(String[] args) {
        int[] arr = new int[]{3, 2, 4, 1};
        System.out.println(arrayPairSum(arr));
    }
}

627. Swap Salary

Given a table salary, such as the one below, that has m=male and f=female values. Swap all f and m values (i.e., change all f values to m and vice versa) with a single update statement and no intermediate temp table.

Note that you must write a single update statement, DO NOT write any select statement for this problem.

Example:

| id | name | sex | salary |
|----|------|-----|--------|
| 1  | A    | m   | 2500   |
| 2  | B    | f   | 1500   |
| 3  | C    | m   | 5500   |
| 4  | D    | f   | 500    |

After running your update statement, the above salary table should have the following rows:

| id | name | sex | salary |
|----|------|-----|--------|
| 1  | A    | f   | 2500   |
| 2  | B    | m   | 1500   |
| 3  | C    | f   | 5500   |
| 4  | D    | m   | 500    |

Solution

UPDATE salary SET sex = (CASE sex WHEN 'm' THEN 'f' WHEN 'f' THEN 'm' END);
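
To try the statement end to end, here is a minimal sketch, assuming the H2 in-memory database (com.h2database:h2) is on the classpath; the table definition and sample rows are reconstructed from the example above and are only meant for local testing.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class SwapSalaryDemo {
    public static void main(String[] args) throws Exception {
        // H2 in-memory database; any database with standard CASE support works the same way.
        try (Connection conn = DriverManager.getConnection("jdbc:h2:mem:demo");
             Statement st = conn.createStatement()) {
            // Recreate the sample table from the problem statement.
            st.execute("create table salary (id int, name varchar(10), sex char(1), salary int)");
            st.execute("insert into salary values (1,'A','m',2500),(2,'B','f',1500),(3,'C','m',5500),(4,'D','f',500)");

            // The single update statement from the solution above.
            st.execute("update salary set sex = (case sex when 'm' then 'f' when 'f' then 'm' end)");

            // Print the table to confirm that every sex value was flipped.
            try (ResultSet rs = st.executeQuery("select id, name, sex, salary from salary order by id")) {
                while (rs.next()) {
                    System.out.printf("%d %s %s %d%n",
                            rs.getInt("id"), rs.getString("name"), rs.getString("sex"), rs.getInt("salary"));
                }
            }
        }
    }
}

Running it should print the four rows with every m and f swapped.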

Tip

Real-time monitoring and analysis of nginx access logs with filebeat + kafka + ELK

This post only records the relevant configuration and caveats; installation steps are not covered.

  • filebeat reads records from the log files and ships them to kafka

  • kafka acts as a message queue and buffer between filebeat and logstash (a quick way to peek at the records is sketched after this list)

  • logstash consumes the log messages from kafka, applies the filter rules, and stores the result in elasticsearch

  • elasticsearch stores the log records

  • kibana reads the records from elasticsearch and builds the views used for real-time monitoring and analysis of the access logs
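
When debugging the handoff between filebeat and logstash, it helps to look at what actually lands on the kafka topic: each record value is the JSON event produced by filebeat, with the raw access log line in message and the custom fields under fields. Below is a minimal consumer sketch, assuming the kafka-clients library is available; the broker address and topic name match the configuration later in this post.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PeekAccessLogTopic {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", "peek-nginx-access-log"); // throwaway consumer group for inspection
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("nginx-access-log"));
            // Print a few raw records: each value is the JSON event that filebeat produced.
            for (int i = 0; i < 5; i++) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }
}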

To make it easier to analyze historical access logs, I did not change the access log format. By default, nginx's log format looks like this:

log_format  main  '$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent" "$http_x_forwarded_for"';

For example:

222.211.162.225 - - [14/Jun/2019:17:15:22 +0800] "GET / HTTP/1.1" 304 0 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.90 Safari/537.36"

117.136.62.63 - - [14/Jun/2019:17:15:36 +0800] "GET / HTTP/1.1" 304 0 "-" "Mozilla/5.0 (Linux; Android 9; ONEPLUS A6000) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.67 Mobile Safari/537.36"

117.136.62.63 - - [14/Jun/2019:17:15:56 +0800] "GET /1111 HTTP/1.1" 404 571 "-" "Mozilla/5.0 (Linux; Android 9; ONEPLUS A6000) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.67 Mobile Safari/537.36"

180.163.220.4 - - [14/Jun/2019:17:21:13 +0800] "GET / HTTP/1.1" 200 612 "-" "Mozilla/5.0 (Linux; U; Android 8.1.0; zh-CN; EML-AL00 Build/HUAWEIEML-AL00) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/57.0.2987.108 UCBrowser/11.9.4.974 UWS/2.13.1.48 Mobile Safari/537.36 AliApp(DingTalk/4.5.11) com.alibaba.android.rimet/10487439 Channel/227200 language/zh-CN"

Configuration steps

  1. First configure filebeat's input and output, adding a custom field fields.log_topic so that each access log gets its own kafka topic:

     filebeat.prospectors:
     - type: log
       enabled: true
       fields:
         log_topic: nginx-access-log
       paths:
         - /usr/local/nginx/logs/access.log

     output.kafka:
       hosts: ["127.0.0.1:9092"]
       topic: '%{[fields.log_topic]}'
       partition.round_robin:
         reachable_only: false
       required_acks: 1
       compression: gzip
       max_message_bytes: 100000000
    
  2. Configure logstash:

input: logstash acts as the consumer and reads records from kafka; the topic is specified here, and multiple topics can be listed.

filter: the data first goes through a json filter that parses the message field (at this point it holds the JSON event shipped by filebeat; after parsing, message contains the raw access log line). Based on the custom fields.log_topic field, the filter checks whether the event belongs to the expected topic; a grok expression then extracts the individual fields from message, the date filter replaces @timestamp with the time from the access log, and finally the message field is removed.

output: the fields.log_topic value is checked again, and if the event belongs to the expected topic, it is written into the corresponding elasticsearch index.

input {
  kafka {
    bootstrap_servers => "47.106.130.196:9092"
    consumer_threads => 2
    topics => ["nginx-access-log"]
  }
}

filter {
  json {
    source => "message"
  }
  if [fields][log_topic] == "nginx-access-log" {
    grok {
      match => {
        "message" => '%{IP:remote_addr} - %{USER:remote_user} \[%{HTTPDATE:time_local}\] "%{WORD:method} %{DATA:request_url} HTTP/%{NUMBER:http_version}" %{NUMBER:status} %{NUMBER:body_bytes_sent} "%{DATA:http_referer}" "%{DATA:http_user_agent}"'
      }
    }
    date {
      match => [ "time_local", "dd/MMM/YYYY:HH:mm:ss Z" ]
      target => "@timestamp"
    }
    mutate {
      remove_field => ["message"]
    }
  }
}

output {
  if [fields][log_topic] == "nginx-access-log" {
    elasticsearch {
      hosts => ["http://127.0.0.1:9200"]
      index => "nginx-access-log-%{+YYYY.MM.dd}"
    }
  }
}
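
If the grok expression does not match, logstash tags the event with _grokparsefailure instead of producing the parsed fields, so it is worth sanity-checking the field extraction against a sample line before deploying the pipeline. The sketch below is my own approximation of the grok pattern as a plain Java regex; the field names mirror the grok captures, but the regex itself is not generated by logstash.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class AccessLogParseCheck {
    // Rough regex equivalent of the grok pattern above, for local sanity-checking only.
    private static final Pattern LINE = Pattern.compile(
            "^(\\S+) - (\\S+) \\[([^\\]]+)\\] \"(\\S+) (\\S+) HTTP/(\\S+)\" (\\d+) (\\d+) \"([^\"]*)\" \"([^\"]*)\"");

    public static void main(String[] args) {
        // One of the sample access log lines from earlier in this post.
        String sample = "117.136.62.63 - - [14/Jun/2019:17:15:56 +0800] "
                + "\"GET /1111 HTTP/1.1\" 404 571 \"-\" "
                + "\"Mozilla/5.0 (Linux; Android 9; ONEPLUS A6000) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.67 Mobile Safari/537.36\"";
        Matcher m = LINE.matcher(sample);
        if (m.find()) {
            System.out.println("remote_addr     = " + m.group(1));
            System.out.println("remote_user     = " + m.group(2));
            System.out.println("time_local      = " + m.group(3));
            System.out.println("method          = " + m.group(4));
            System.out.println("request_url     = " + m.group(5));
            System.out.println("http_version    = " + m.group(6));
            System.out.println("status          = " + m.group(7));
            System.out.println("body_bytes_sent = " + m.group(8));
            System.out.println("http_referer    = " + m.group(9));
            System.out.println("http_user_agent = " + m.group(10));
        } else {
            System.out.println("pattern did not match, check the log_format");
        }
    }
}

Kibana also ships a Grok Debugger under Dev Tools, which can be used to test the real grok pattern against the same sample lines.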