百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 博客教程 > 正文

基于nginx,kafka,zookeeper的Log收集系统

connygpt 2024-11-21 13:21 5 浏览

项目经历时间:2022年7月-2022年8月

项目人员:3人

项目环境:

9台虚拟机(使用Centos7系统)

第1,2号虚拟机做代理集群,用于做负载均衡和反向代理(nginx/1.20.1)

第3,4,5号虚拟机用做应用集群,提供一个静态页面展(nginx/1.20.1,filebeat)

第6,7,8号虚拟机做消息中间件,基于kafka集群和zookeeper集群(kafka2.12,zookeeper3.6)

第9台虚拟机做mysql数据库来收集数据(MySQL5.7.34)

项目简介:

此项目是用于模拟企业公司进行大工作时产生的大数据进行日志收集,并对其进行清洗,将需要的数据存入数据库中

项目步骤:

1.规划好整个项目的拓扑结构和思维导图,并细分解析每一步需要做的事

2.安装好每个虚拟机需要的环境,模块与软件。并且配置好静态ip,DNS域名解析,修改每台主机名方便区分每台虚拟机的作

3.利用两台虚拟机用作nginx代理集群,配置好keepalive双vip的环境用做负载均衡和高可用

4.利用三台nginx虚拟机来做web页面的静态展示,在etc/nginx/nginx.cof下配置好其端口号,源代码文本路径,访问日志的

保存路径。

5.使用三台虚拟机来搭建kakfa应用集群,用作消息中间件,修改/opt/kafka_2.12-2.8.1/config /server.properties文件来配

置broker,监听端口和zookeeper连接

6.再在kafka基础上搭建zookeeper来管理kafka集群,在/opt/apache-zookeeper-3.6.3-bin/confs文件下,配置相连的

kafka集群,创建/tmp/zookeeper目录,在目录添加myid文件,里面存放的是每台zookeeper的id

7.在web静态页面的三台虚拟机上部署filebeat,来实现读取对应位置的日志,上报到相应的kafka集群上去

8..开启zookeeper和kafka,创建topic和生产者消费者进行测试,检测生产者产生的数据能否被消费者消费

9.编写python脚本,创建消费者并连接MySQL数据库来存放消费的数据。使用了json,requests,time,pymysql模块实现

项目的详细过程:

1.准备环境

1.创建好9台linux虚拟机(Centos7系统)

2.配置好静态ip地址

vim /etc/sysconfig/network-scripts/ifcfg-ens33

3.配置好本地DNS服务器(114.114.114.114)

vim /etc/resolv.conf

4.修改主机名(此方法永久生效)

hostnamectl  set-hostname  +主机名

5.每一台机器上都写好域名解析(后续就可以直接使用主机名操作)

vim  /etc/hosts

6.安装好需要的软件(wget用于获取web的数据,chronyd是时间同步服务)

yum install wget -y
yum install vim -y
yum install chronyd -y

7.关闭防火墙,打开chronyd服务

systemctl start chronyd
systemctl enable chronyd
systemctl stop firewalld
systemctl disable firewalld

8.关闭SELINUX,设置SELINUX=disable

vim /etc/selinux/config

2.搭建nignx

1.安装epel源和nignx服务

yum install epel-release -y
yum install nginx -y

2.启动nginx并设置其开机自启

systemctl start nginx
systemctl enable nginx

3.编辑配置文件

vim /etc/nginx/conf.d/sc.conf

server {
        listen 80 default_server;
        server_name  www.sc.com;

        root         /usr/share/nginx/html;

        access_log  /var/log/nginx/sc/access.log main;

        location  / {

   }
}

4.重启nginx服务

nginx -s reload

3.在nignx上搭建kafka和zookeeper

ps:kafka是一种消息中间件,和其他MQ相比,有着单机10万级高吞吐量,高可用性强,分布式,一个partition多个replica,少数宕机不会丢失数据,一般配合大数据类系统进行实时数据计算,日志分析场景。

broker:kafka的节点。一台服务器相当于一个节点

topic:主题,消息的分类。比如nginx,mysql日志给不同的主题,就是不同的类型。

partition:分区。提高吞吐量,提高并发性。(多个partition会导致消息顺序混乱,如果对消息顺序有要求就只设置一个partition就可以了)

replica: 副本。完整的分区备份。

zookeeper是一种分布式应用协调管理服务,具有配置管理,域名管理,分布式数据存储,集群管理等功能,在本次项目中用于对kafka集群进行配置(topic,partition,replica等)管理

1.安装基础软件

yum install java wget  -y

wget   https://mirrors.bfsu.edu.cn/apache/kafka/2.8.1/kafka_2.12-2.8.1.tgz 

tar  xf  kafka_2.12-2.8.1.tgz

wget   https://mirrors.bfsu.edu.cn/apache/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz

tar  xf  apache-zookeeper-3.6.3-bin.tar.gz

2.搭建kafka

vim /opt/kafka_2.12-2.8.1/config /server.properties

修改以下代码

broker.id=1(第x台这里就填x)

listeners=PLAINTEXT://nginx-kafka01(主机名):9092

zookeeper.connect=192.168.127.128:2181,192.168.127.133:2181,192.168.127.134:2181(三台虚拟机的IP)

3.在kafka基础上搭建zookeeper

cd /opt/apache-zookeeper-3.6.3-bin/confs
cp zoo_sample.cfg zoo.cfg
#修改zoo.cfg, 添加如下三行:
server.1=192.168.127.128:3888:4888
server.2=192.168.127.133:3888:4888
server.3=192.168.127.134:3888:4888

4.创建/tmp/zookeeper目录 ,在目录中添加myid文件

第一台机器上

echo 1 > /tmp/zookeeper/myid

以此类推

5.搭建完成后准备启动服务(注意:开启zookeeper和kafka的时候,一定是先启动zookeeper,再启动kafka;关闭服务的时候,kafka先关闭,再关闭zookeeper)

zookeeper启动

bin/zkServer.sh start

kafka启动

bin/kafka-server-start.sh -daemon config/server.properties

查看zookeeper是否成功管理kafka

cd /opt/apache-zookeeper-3.6.3-bin
cd bin
./zkCli.sh

正确的显示如下:

6.创建topic

bin/kafka-topics.sh --create --zookeeper 192.168.127.128:2181 --replication-factor 3 --partitions 3 --topic sc
 
bin/kafka-topics.sh --list --zookeeper 192.168.127.128:2181

7.创建生产者消费者测试消息中间件是否能正常运行

#创建生产者
bin/kafka-console-producer.sh --broker-list 192.168.127.128:9092 --topic sc
#创建消费者    
bin/kafka-console-consumer.sh --bootstrap-server 192.168.127.134:9092 --topic sc --from-beginning

成功则如下:

4.部署filebeat

1.安装依赖包

rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch

2.vim /etc/yum.repos.d/fb.repo

[elastic-7.x]
name=Elastic repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

3.安装filebeat

yum install filebeat -y

4.vim /etc/filebeat/filebeat.yml

filebeat.inputs:
- type: log
  # Change to true to enable this input configuration.
  enabled: true
  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /var/log/nginx/sc/access.log 
#==========------------------------------kafka-----------------------------------
output.kafka:
  hosts: ["192.168.1.213:9092","192.168.1.214:9092","192.168.1.215:9092"]
  topic: nginxlog
  keep_alive: 10s

5.启动filebeat并设置开机自启

systemctl start filebeat
systemctl enable filebeat

启动成功如下:

5.编写python脚本,创建消费者并连接MySQL数据库来存放消费的数据

import json
import requests
import time
import pymysql
 
#连接数据库
db = pymysql.connect(
    host = "192.168.127.128",      #mysql主机ip
    user = "sc",              #用户名
    passwd = "123456",            #密码
    database = "nginx"                 #数据库
)
 
taobao_url = "https://ip.taobao.com/outGetIpInfo?accessKey=alibaba-inc&ip="
#查询ip地址的信息(省份和运营商isp),通过taobao网的接口
def resolv_ip(ip):
    response = requests.get(taobao_url+ip)
    if response.status_code == 200:
       tmp_dict = json.loads(response.text)
       prov = tmp_dict["data"]["region"]
       isp = tmp_dict["data"]["isp"]
       return prov,isp
    return None,None
 
#将日志里读取的格式转换为我们指定的格式
def trans_time(dt):
     #把字符串转成时间格式
    timeArray = time.strptime(dt, "%d/%b/%Y:%H:%M:%S")
    #timeStamp = int(time.mktime(timeArray))
    #把时间格式转成字符串
    new_time = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)    
    return new_time
 
#从kafka里获取数据,清洗为我们需要的ip,时间,带宽
from pykafka import KafkaClient
client = KafkaClient(hosts="192.168.127.128:9092,192.168.127.133:9092,192.168.127.134:9092")
topic = client.topics['nginxlog'] 
balanced_consumer = topic.get_balanced_consumer(
  consumer_group='testgroup',
    #自动提交offset
  auto_commit_enable=True,    
  zookeeper_connect='nginx-kafka01:2181,nginx-kafka02:2181,nginx-kafka03:2181'
) 
#consumer = topic.get_simple_consumer()
i = 1
for message in balanced_consumer:
   if message is not None: 
       line = json.loads(message.value.decode("utf-8"))
       log = line["message"]
       tmp_lst = log.split()
       ip = tmp_lst[0]
       dt = tmp_lst[3].replace("[","")
       bt = tmp_lst[9]
       dt = trans_time(dt)
       prov, isp = resolv_ip(ip)
       if prov and isp:
          print(dt,prov,isp,bt)
 
          cursor = db.cursor()
          try:
            cursor.execute(f"insert into mynginxlog values({i},{dt},'{prov}','{isp}',{bt})")
            db.commit()
            i += 1
          except Exception as e:
              print("插入失败",e)
              db.rollback()
 
# create table mynginxlog(
# id int primary key auto_increment,
# dt datetime not null,
# prov varchar(20),
# isp varchar(20),
# bd float
# )charset=utf8;
 
#关闭数据库
db.close()

得到效果如下:


6.项目心得

由于经验不足,遇到的问题还是比较多的,比如nginx服务器有时没有全部开全导致消费者消费失败,编写python脚本也比较

容易出现失误导致程序没法正确运行等。

但通过这次项目还是收获了许多:

1.因为有提前写好拓扑结构和思维导图使得整个流程还是较为顺利,出现错误也能够较快发现错误发生的地方。

2.更加的深入的了解了kafka和zookeeper的原理。

3.了解到了许多新的知识,zookeeper的脑裂,nginx的代理集群可利用keepalive做负载均衡和高可用,zookeeper的选举方

式等

4.提高了团队交流和协作能力,遇到问题大家能一起想方式。

5.提高了自主学习的能力

相关推荐

3分钟让你的项目支持AI问答模块,完全开源!

hello,大家好,我是徐小夕。之前和大家分享了很多可视化,零代码和前端工程化的最佳实践,今天继续分享一下最近开源的Next-Admin的最新更新。最近对这个项目做了一些优化,并集成了大家比较关注...

干货|程序员的副业挂,12个平台分享

1、D2adminD2Admin是一个完全开源免费的企业中后台产品前端集成方案,使用最新的前端技术栈,小于60kb的本地首屏js加载,已经做好大部分项目前期准备工作,并且带有大量示例代码,助...

Github标星超200K,这10个可视化面板你知道几个

在Github上有很多开源免费的后台控制面板可以选择,但是哪些才是最好、最受欢迎的可视化控制面板呢?今天就和大家推荐Github上10个好看又流行的可视化面板:1.AdminLTEAdminLTE是...

开箱即用的炫酷中后台前端开源框架第二篇

#头条创作挑战赛#1、SoybeanAdmin(1)介绍:SoybeanAdmin是一个基于Vue3、Vite3、TypeScript、NaiveUI、Pinia和UnoCSS的清新优...

搭建React+AntDeign的开发环境和框架

搭建React+AntDeign的开发环境和框架随着前端技术的不断发展,React和AntDesign已经成为越来越多Web应用程序的首选开发框架。React是一个用于构建用户界面的JavaScrip...

基于.NET 5实现的开源通用权限管理平台

??大家好,我是为广大程序员兄弟操碎了心的小编,每天推荐一个小工具/源码,装满你的收藏夹,每天分享一个小技巧,让你轻松节省开发效率,实现不加班不熬夜不掉头发,是我的目标!??今天小编推荐一款基于.NE...

StreamPark - 大数据流计算引擎

使用Docker完成StreamPark的部署??1.基于h2和docker-compose进行StreamPark部署wgethttps://raw.githubusercontent.com/a...

教你使用UmiJS框架开发React

1、什么是Umi.js?umi,中文可发音为乌米,是一个可插拔的企业级react应用框架。你可以将它简单地理解为一个专注性能的类next.js前端框架,并通过约定、自动生成和解析代码等方式来辅助...

简单在线流程图工具在用例设计中的运用

敏捷模式下,测试团队的用例逐渐简化以适应快速的发版节奏,大家很早就开始运用思维导图工具比如xmind来编写测试方法、测试点。如今不少已经不少利用开源的思维导图组件(如百度脑图...)来构建测试测试...

【开源分享】神奇的大数据实时平台框架,让Flink&Spark开发更简单

这是一个神奇的框架,让Flink|Spark开发更简单,一站式大数据实时平台!他就是StreamX!什么是StreamX大数据技术如今发展的如火如荼,已经呈现百花齐放欣欣向荣的景象,实时处理流域...

聊聊规则引擎的调研及实现全过程

摘要本期主要以规则引擎业务实现为例,陈述在陌生业务前如何进行业务深入、调研、技术选型、设计及实现全过程分析,如果你对规则引擎不感冒、也可以从中了解一些抽象实现过程。诉求从硬件采集到的数据提供的形式多种...

【开源推荐】Diboot 2.0.5 发布,自动化开发助理

一、前言Diboot2.0.5版本已于近日发布,在此次发布中,我们新增了file-starter组件,完善了iam-starter组件,对core核心进行了相关优化,让devtools也支持对IAM...

微软推出Copilot Actions,使用人工智能自动执行重复性任务

IT之家11月19日消息,微软在今天举办的Ignite大会上宣布了一系列新功能,旨在进一步提升Microsoft365Copilot的智能化水平。其中最引人注目的是Copilot...

Electron 使用Selenium和WebDriver

本节我们来学习如何在Electron下使用Selenium和WebDriver。SeleniumSelenium是ThoughtWorks提供的一个强大的基于浏览器的开源自动化测试工具...

Quick 'n Easy Web Builder 11.1.0设计和构建功能齐全的网页的工具

一个实用而有效的应用程序,能够让您轻松构建、创建和设计个人的HTML网站。Quick'nEasyWebBuilder是一款全面且轻巧的软件,为用户提供了一种简单的方式来创建、编辑...