基于nginx+flume+kafka+mongodb实现埋点数据采集

PoeIngram 发布于5月前

名词解释

埋点其实就是用于记录用户在页面的一些操作行为。例如,用户访问页面(PV,Page Views)、访问页面用户数量(UV,User Views)、页面停留、按钮点击、文件下载等,这些都属于用户的操作行为。

开发背景

我司之前在处理埋点数据采集时,模式很简单,当用户操作页面控件时,前端监听到操作事件,并根据上下文环境,将事件相关的数据通过接口调用发送至埋点数据采集服务(简称ets服务),ets服务对数据解析处理后,做入库操作。

流程图如下

基于nginx+flume+kafka+mongodb实现埋点数据采集

这种方式其实没毛病,有如下优势:

(1) 开发难度低,前后端开发人员约定好好接口、参数、模型等即可。

(2) 系统部署容易,排查问题难度低。

但是仔细想想,这种模式弊端也很多:

(1) 开发工作量大。例如,这次有个记录用户点击支付按钮的埋点,前端需要在支付的前端代码中插入一段埋点的代码,后端可能也需要改;后面又来了一个记录用户点击加入购物车按钮的埋点,前端又需要在加入购物车代码中插入一段埋点的代码…… 后续,只要有新的埋点需求,前端就得加代码或者改代码。

(2) 前端埋点接口调用过多影响性能。有可能访问一次页面,或者点击一次连接,要调用十几个埋点接口,而这些接口调用的结果,并不是用户所关心的内容。这样频繁的接口调用,对后端服务也构成了压力。

(3) 增加调试成本。每一次新的埋点需求,都需要前后端联调,沟通、参数、模型等,增加了调试时间。

设计目标

针对老的埋点系统的弊端,需要做重构,达成以下目标:

(1) 埋点数据收集自动化、实时收集;

(2) 减少前端开发工作量;

(3) 减少前后端联调工作量;

(4) 减少前端埋点相关代码。

软件、硬件依赖

请求日志记录

Nginx

日志收集程序

Flume

消息中间件

kakfa+zookeeper

后端日志数据处理服务

ets埋点数据采集微服务,采用springcloud stream技术。

缓存

redis

数据仓库

MongoDB

系统流程图

基于nginx+flume+kafka+mongodb实现埋点数据采集

系统逻辑

(1) 客户端发送请求,通过Nginx进行请求转发,Nginx以json格式记录请求参数等信息至access.log;

(2) flume实时监控Nginx日志变化,收集并过滤有用日志,发送至kafka;

(3) ets服务作为消息消费者,监听kafka topic消息,收到消息后,对日志消息解析处理,确定日志数据存储集合;

(4) 解析token获取用户信息;

(5) 将转化好的日志数据入库。

系统说明

用户的大部分操作行为,都对应着一个URL后端请求或者前端请求。而这些请求必然会经过nginx进行请求转发,nginx日志会记录每一个请求的信息。

通过对nginx日志的实时监控、采集、入库,我们将用户行为数据标准化入库,从而替代了老式的前端调用埋点接口的方式,减少了开发量,减少了埋点请求数量。

========================下面为具体实现细节=======================

nginx日志JSON格式配置

logformat按照如下格式配置,日志会打印为json格式,便于flume和ets服务做数据解析。

重要的配置含义如下:

配置 含义
$remote_addr 请求IP地址
$time_local 请求时间
$uri 请求uri,例如:/order/api/order/detail
$request_method 请求方式,GET/POST/PUT等
$request_id 请求唯一ID,nginx自动生成
$status 请求状态,400/401/500等
$body bytes sent 请求body大小
$request_body 请求body
$args 请求参数,例如:orderId=xxxxx&username=xxxx
$http_referer 访问来源页面URL
$http user agent 客户端浏览器信息
$http Request Source 自定义header,记录请求来源
$http Backend Request 自定义header,是否后端请求
$http_Authorization token

注:

1.escape=json 防止乱码

2."source":"$ http request source" 记录访问来源(用来区分不同的应用程序),前端需在http header追加request_source参数

例如我司开发的两款应用:

单分享APP 1
单分享小程序 2

3."token":"$http_Authorization" 记录token

日志效果:

POST:

{

 "ipaddress": "192.168.8.1",

 "remote_user": "",

 "time_local": "26/Apr/2020:13:51:49 +0800",

 "request": "POST /order/api/zz/zz/zzHTTP/1.1",

 "request_uri": "/order/api/zz/zz/zz",

 "uri_with_args": "/order/api/zz/zz/zz",

 "request_method": "POST",

 "request_id": "123456",

 "status": "200",

 "body_bytes_sent": "388",

 "request_body": "{\"merchantCode\":\"123455",\"orderProductList\":[{\"skuId\":\"12345\",\"quantity\":1}],\"shippingMethod\":1}",

 "args": "",

 "http_referer": "https://s.h.com/hh/b2b/order/zzz",

 "http_user_agent": "Mozilla/5.0 (Linux; Android 9; MHA-AL00 Build/HUAWEIMHA-AL00; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/79.0.3945.116 Mobile Safari/537.36 agentweb/4.0.2 UCBrowser/11.6.4.950",

 "http_x_forwarded_for": "",

 "request_source": "",

 "backend_request": "",

 "token": "Bearer token token", 

 "header-buz-params": ""

}

flume配置

通过flume来探测和收集nginx日志,并发送至kafka topic

配置日志拦截器

拦截器的作用是,过滤无用的nginx日志,筛选出有用的nginx日志(与埋点相关的日志)以发送至kafka

拦截器逻辑

public class MyFlumeInterceptor implements Interceptor {
  @Override
  public void initialize() {}

  // 单个事件拦截
  @Override
  public Event intercept(Event event) {
    String line = new String(event.getBody(), Charset.forName("UTF-8"));

    try {
      Object parse = JSON.parse(line);
      Map map=(Map)parse;
      Boolean backendRequest = MapUtils.getBoolean(map, "backend_request");
      if(!backendRequest){
        return null;
      }
    }
    catch (Exception e) {
      return null;
    }
    return event;
  }

  // 批量事件拦截
  @Override
  public List<Event> intercept(List<Event> events) {
    List<Event> out = Lists.newArrayList();
    Iterator it = events.iterator();
    while (it.hasNext()) {
      Event event = (Event) it.next();
      Event outEvent = this.intercept(event);
      if (outEvent != null) {
        out.add(outEvent);
      }
    }
    return out;
  }

  @Override
  public void close() {
  }

  public static class Builder implements Interceptor.Builder {
    @Override
    public Interceptor build() {
      return new MyFlumeInterceptor();
    }

    @Override
    public void configure(Context context) {
    }
  }
}

拦截器打包

执行maven package,将拦截器项目打包后放入flume安装目录lib目录下

配置日志收集探测和发送

配置日志来源、kafka地址、kafka topic、channel

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source配置 
a1.sources.r1.type = exec

# 监控多个日志文件,-f 参数后指定一组文件即可 
a1.sources.r1.command = tail -F /data/logs/xxx/xxx/xxx/cloud-gateway1.log /data/logs/xxx/xxx/xxx/cloud-gateway2.log /data/logs/xxx/xxx/xxx/cloud-gateway3.log
 
# sink配置 kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = nginx_log_topic
a1.sinks.k1.brokerList = 192.168.xx.xx:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

# channel配置 
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# sink、channel绑定配置 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

修改flume源码

目的:解决日志过长无法发送问题

注:如果日志不是很长,可不必修改

日志长度超过2048时,日志无法发送到kafka,原因是flume对日志长度做了限制

LineDeserializer类有个常量MAXLINE_DELT=2048,单行日志超出该长度时发送失败

解决办法:

修改源码,把2048改成2048*10,重新打包并放入lib文件夹下,重启flume

zookeeper集群配置

关于zookeeper集群的配置,网上有很多参考资料。这里作简要说明

配置zoo.cfg

tickTime=2000
initLimit=10
syncLimit=5
#快照日志的存储路径
dataDir=/home/data/zookeeper
dataLogDir=/data/logs/zookeeper
clientPort=2181
server.1=192.168.xx.xx:2888:3888
server.2=192.168.xx.xx:2888:3888
server.3=192.168.xx.xx:2888:3888

#server.1 这个1是服务器的标识也可以是其他的数字, 表示这个是第几号服务器,用来标识服务器,这个标识要写到快照目录下面myid文件里

#192.168.xx.xx为集群里的IP地址,第一个端口是master和slave之间的通信端口,默认是2888,第二个端口是leader选举的端口,集群刚启动的时候选举或者leader挂掉之后进行新的选举的端口默认是3888

创建myid文件

在241,242,243机器上,分别创建3个myid文件,相当于标记自己的ID:

#server1
echo "1" > /home/data/zookeeper/myid
#server2
echo "2" > /home/data/zookeeper/myid
#server3
echo "3" > /home/data/zookeeper/myid

kafka集群配置

关于kafka集群的配置,网上资料也很多,这里做简要说明

### server.properties

broker.id=3
listeners=PLAINTEXT://192.168.20.243:9092
advertised.host.name=192.168.20.243
advertised.port=9092
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/data/logs/kafka
num.partitions=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

# The minimum age of a log file to be eligible for deletion due to age
# log.retention.hours=168
log.retention.minutes=10
log.cleanup.policy=delete
log.cleaner.enable=true
log.segment.delete.delay.ms=0
log.segment.bytes=1073741824

log.retention.check.interval.ms=300000
#设置zookeeper的连接端口
zookeeper.connect=192.168.xx.xxx:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
#配置kafka集群地址列表
broker.list=192.168.xx.xx:9092
producer.type=async

设置消息生命周期

目的:kafka topic中的日志消息被ets服务消费后,就没什么用处了,这时候需要即时清理掉,防止消息积压,占用内存

server.properties增加数据清理配置:
log.retention.minutes=10 数据最多保存10分钟。
log.cleanup.policy=delete 日志清理策略:删除
log.cleaner.enable=true 开启消息日志清理

以上配置是全局配置

单独对某个topic消息设置有效期:

./kafka-configs.sh --zookeeper localhost:2181 --alter --entity-name mytopic --entity-type topics --add-config retention.ms=86400000

用户行为数据采集服务

以下只列出与kafka相关的配置。其他配置由于保密问题不作列出,参考springcloud相关技术文档

Application.yml配置

配置bingdings、kafka topic

server:
  port: 37000

spring:
  cloud:
    stream:
      bindings:
        nginx_kafka_log_input:
          destination: nginx_log_topic
          group: s1
      default-binder: kafka

配置中心配置

将kafka公用配置配置到springcloud config配置中心

spring:
  kafka:
    bootstrap-servers: 192.168.xx.xx:9092,192.168.xx.xx:9092,192.168.xx.xx:9092
    auto-create-topics: true
    enable-auto-commit: true
    auto-commit-interval: 20000
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

定义消息通道

public interface NginxLogMessageChannel {
    /**
     * 接收消息通道名称
     */
    String NGINX_KAFKA_LOG_INPUT = "nginx_kafka_log_input";

    /**
     *接收消息通道
     */
    @Input(NGINX_KAFKA_LOG_INPUT)
    MessageChannel recieveLogMessageChannel();
}

消息订阅

监听kafka topic的nginx日志消息,解析并存入MongoDB

@EnableBinding(value = NginxLogMessageChannel.class)
@Component
@Slf4j
public class NginxLogMessageListener {
    private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(4, 8, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy());

    @StreamListener(NginxLogMessageChannel.NGINX_KAFKA_LOG_INPUT)
    public void receiveLog(Message<String> message) {
        EXECUTOR.execute(() -> {
            try {
               //日志消息解析、入库逻辑…………………..
            }
            catch (Exception e) {
              log.error("解析nginx请求日志时出错", e);
            }
        });
    }
}

消息重复消费问题

如果消费者消费消息的时间大于最大心跳持续时间(例如网络波动、服务器压力大 等),kafka会默认这个消费者已经挂掉了,kafka会协调其他分区的消费者再去消费此消息。

但其实该消费者没挂掉,还在消费消息,等他消费完成后,kafka新协调的消费者也消费了这条消息,会导致消息重复消费

解决方案:

对已消费的消息,将其request_id存入redis,并设置失效时间;在消费消息前,先去redis查询该id是否已经消费过

MongoDB基本模型设计

列名 含义 实例
requestId 请求唯一ID a2e03237eee1
remoteUser 提供基本身份验证用户名
remoteIp 用户IP地址
requestUri 请求URI
requestMethod 请求类型,post、get等
userId 用户ID
username 用户名
merchantCode 发送该请求的商户CODE
companyCode 发送该请求的企业code
requestSource 请求来源 1.APP 2.微信小程序
userAgent 用户终端浏览器等信息 Mozilla/4.0(compatible;MSIE 8.0;WindowsNT5.1;Trident/4.0; SV1; GTB7.0; .NET4.0C
requestStatus 请求状态 200、500
httpReferer 访问来源页面URL
requestTime 请求时间
mapParams 请求参数。此列为对象类型,包含了post请求的body参数和url参数以及url占位符参数
mapParams.orderId 例子
mapParams.userId 例子
mapParams.productId 例子
listParams 数组类型请求参数。此列为数组类型
stringParams 字符串类型参数。适用于请求参数只有字符串的请求
addtionalParams 请求头附加参数。此列为对象类型,保存header里的请求业务附加参数。
addtionalParams.shareId 例子

Redis 数据配置

URI-集合映射关系

在Redis中,配置与埋点相关的uri和该uri请求日志存储的集合的映射关系。

key为 hosjoy-hbp-ets: url-collection-map: ,数据类型为hash类型。存储结构为:hashkey: uri+ "-"+请求方式+"-"+请求来源 hashvalue: 集合名

例如: hashkey: order/api/app/xx/xx-POST-1 hashvalue:t app log

如果有多个集合,则用逗号分开

可变URI配置

带有URL占位符的URL配置形式如下:/order/api/xx/{id}/detail-GET-1

系统最终效果

用户通过APP访问某商品的商品详情页,我们根据该动作的请求URI、请求方式、请求来源去redis获取配置的mongo集合,然后去mongo中查看该动作的埋点数据

JSON数据样例如下:

{
    _id: ObjectId("5ea2580eda1591218c72d1a4"),
    requestId: "1cdd5881421a3353f36304949bbcab41",
    requestUri: "/product/api/xx/xx",
    requestMethod: "GET",
    requestSource: "2",
    requestStatus: "200",
    userId: "1234",
    username: "123456789",   
    remoteIp: "xx.xx.xx.xx",
    remoteUser: "",
    httpReferer: "https://xx.com/xx",
    requestTime: ISODate("2020-04-24T03:07:58.742Z"),
    userAgent: "Mozilla/5.0 (Linux; Android 9; MHA-AL00 Build/HUAWEIMHA-AL00; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/79.0.3945.116 Mobile Safari/537.36 MicroMessenger/7.0.13.1640(0x27000D39) Process/appbrand0 NetType/4G Language/zh_CN ABI/arm64 WeChat/arm64",
    mapParams: {
        merchantCode: "123",
        id: "234"
    }
}

可以看出,通过监控nginx日志并记录用户行为动作的用户信息、时间、访问来源、浏览器信息、请求参数等,可以方便的做用户行为数据分析,而无需前端再去编写埋点相关代码,实现了自动、实时的采集用户行为数据。

后续做统计也很方便, 例如,现在需要统计2020年4月20日至2020年4月25日之间,访问来源为APP(request_source为2)的某商品的商品详情页访问uv:(以下为伪代码,主要便于理解,具体统计时以mongo语法为准):

select count(1) as UV from (
    select distinct userId from t_test_log
    where requestUri=” /product/api/xx/xx” 
    and requestMethod=”GET” 
    and requestSource=”2” 
    and requestStatus=”200” 
    and mapParams.id=”输入商品ID” 
    and requestTime>2020-04-20 
    and requestTime<2020-04-25
)

查看原文: 基于nginx+flume+kafka+mongodb实现埋点数据采集

  • bluecat
  • tinycat
  • 雨季的水
  • JiangHanXin
  • 雨季的水