原理介绍及安装部署,js_脚本之家

Egg.js : 基于KOA2的协作社级框架

卡夫卡 原理介绍及安装配置

标签:kafka 安装


卡夫卡:高吞吐量的布满式发布订阅信息系统

简介

卡夫卡 是 Linkedin 于 二零零六 年 四月份开源的音信系统,它主要用于拍卖活跃的流式数据,包蕴网址的点击量、顾客访问或探究的从头到尾的经过等。

卡夫卡 是叁个轻量级的/分布式的/具有 replication
工夫的日志搜集组件,常常被并入到利用系列中,搜聚“客户作为日志”等,并能够利用种种花费终端(consumer卡塔尔国将音讯转存到
HDFS 等其余布局化数据存款和储蓄系统中。

卡夫卡 的功力相同于缓存,即活跃的数据和离线管理体系里面包车型客车缓存。

本小说将集成egg + kafka + mysql 的日记系统例子

特性

高吞吐率:固然在平凡的节点上每分钟也能管理成都百货上千的 message。

显式布满式:即怀有的 producer、broker 和 consumer
都会有四个,均为布满式的。

轻松扩张:能够由三个节点扩充至数千个节点,无需结束集群。

系统须要:日志记录,通过kafka进行音讯队列调整

动用处境

此地消费者和劳动者都由日志系统提供

Messaging

对此有个别例行的音讯系统,kafka 是个准确的抉择。卡夫卡 的
partitons/replication 和容错,使其有着杰出的扩大性和品质优势。

而是,kafka 并不曾提供 JMS
中的“事务性”、“音信传输承保(消息确认机制卡塔尔”、“新闻分组”等公司级本性。kafka
只可以利用作为”常规”的音讯系统,在自然水准上,尚未确定保障音信的发送与吸收接纳相对可相信(举例,音信重发,新闻发送错失等卡塔尔国。

λ.1 意况准备

Websit activity tracking

卡夫卡 能够当作“网址活性追踪”的一级工具,能够将网页/客户操作等消息发送到
kafka 中,并展开实时监控,大概离线计算深入分析等。

①Kafka

Log Aggregation

kafka
的表征决定它特别切合营为“日志搜聚中央”,应用程序能够将操作日志“批量”“异步”的发送到
kafka 集群中,实际不是保存在本土或许数据库中。

Kafka能够批量付给音信/压缩音讯等,那对劳动者来说,差相当的少感到不到质量的开辟。当时消费者能够应用
Hadoop 等任何系统化的积累和深入分析系统。

官方网站下载kafka后,解压

原理布局

启动zookeeper:

原理

Kafka的统筹初志是期望做为一个联合的音信征集平台,能够实时的网罗报告音讯,并索要能够帮忙非常大的数据量,且独具非凡的容错技艺。

卡夫卡 使用文件存款和储蓄讯息(append only log卡塔尔(قطر‎,那就径直调控了 kafka
在品质上严重信任文件系统的自家特性。为了减弱磁盘写入的次数,broker
会将消息一时缓存起来,当音信的个数(或尺寸State of Qatar到达自然阀值时,再一同刷新到磁盘,那样会降价扣磁盘
IO 调用的次数。

Producer 将会和 topic 下有所 partition leader 保持 socket 连接。音讯由
producer 直接通过 socket 发送到 broker,中间不会通过其余“路由层”。

实际上,音讯被路由到哪些 partition 上,由 producer
客商端决定,默许方式为“轮询”。

Consumer 端向 broker 发送 “fetch” 央浼,并报告其得到音讯的 offset,今后consumer 将会博得一定条数的消息。Consumer 端也得以重新初始化 offset
来再度花费音信。

卡夫卡 将种种 partition 数据复制到八个 server 上,任何叁个 partition
都有贰个 leader 和多个 follower (可以未有卡塔尔(قطر‎。

www.js8331.com,备份的个数能够透过 broker 配置文件来设定,当中 leader
处理全体的读写伏乞,follower 须求和 leader 保持同步。

当 leader 失效时,需在 followers 中另行选用出新的 leader,恐怕那个时候follower 落后于 leader,因而必要采纳二个 “up-to-date” 的 follower。选择follower 时须求统筹多个标题,就是新的 leader server 上所早就承载的
partition leader 的个数,假诺二个 server 上有过多的 partition
leader,意味着此 server 将经受着更加多的 IO 压力,由此在公投新 leader
时,必要思索到“负载均衡”。

Kafka 中享有的 topic 内容都以以日记的方式积累在 broker 上。如若三个topic 的称谓为 “my_topic”,它有 2 个 partitions,那么日志将会保留在
my_topic_0 和 my_topic_1 多个目录中。

日志文件中保留了一体系 “log entries” (日志条约卡塔尔,各样 log entry
格式为“4个字节的数字 N 表示新闻的尺寸” + “N
个字节的新闻内容”。各类日志都有叁个 offset 来独一的标识一条消息,offset
的值为8个字节的数字,表示此新闻在这里 partition 中所处的初叶地点。

bin/zookeeper-server-start.sh config/zookeeper.properties

配置结构

卡夫卡 集群、producer 和 consumer 都依据于 zookeeper
来保证系统的可用性,保存一些元数据音信。

kafka 集群大约不须要维护其余 consumer 和 producer 状态新闻,这一个音信由
zookeeper 保存,因而 producer 和 consumer
的顾客端实现丰裕轻量级,它们能够随便离开,而不会对集群产生额外的影响。

Producer 端使用 zookeeper 用来开掘 broker 列表,以至和 Topic 下每一个partition leader 创建 socket 连接并发送音信。

Broker 端使用 zookeeper 用来注册 broker 消息,监测 partition leader
存活性。

Consumer 端使用 zookeeper 用来注册 consumer 音讯,当中包涵 consumer
花费的 partition 列表等,同期也用来发掘 broker 列表,并和 partition
leader 建构 socket 连接,获取新闻。

启动Kafka server

安装配置

这里config/server.properties中校num.partitions=5,大家设置5个partitions

安装

卡夫卡 的设置比较轻便,只须求保险 zookeeper 集群运营寻常化,并计划好
server.properties 文件就能够。

改革配置文件中的以下几项,并保管在各节点上保持一致:

broker.id=0     //该属性的值要保证各个节点之间不能重复,该值可以为随意的整数
port=9092
log.dirs=/opt/kafka-0.8.2/data
zookeeper.connect=localhost:2181    //此处需要修改成使用的 zookeeper 集群的信息,逗号分隔
bin/kafka-server-start.sh config/server.properties

启动

保险 zookeeper 集群符合规律运维,然后在各样节点上实践以下命令,运维进度:

/opt/kafka-0.8.2/bin/kafka-server-start.sh /opt/kafka-0.8.2/config/server.properties &

② egg + mysql

验证

能够使用 kafka 自带的 producer 和 consumer 来证实集群是或不是能健康办事。

运用 bin 目录下的 kafka-console-consumer.sh 和 kafka-console-producer.sh
脚本能够运转 consumer 和 producer 客商端。

  1. 进去 kafka 的设置目录,实行以下命令(倘若 zookeeper
    集群消息为:server1:2181,server2:2181,server3:2181),创造叁个名字为“my_topic”的topic:

bin/kafka-topics.sh --create --zookeeper server1:2181,server2:2181,server3:2181 --replication-factor 1 --partitions 1 --topic my_topic
  1. 开发银行三个 producer,将音讯发送到 “my_topic”,奉行以下命令(即使kafka 集群新闻为:server1:9092,server2:9092,server3:9092):

bin/kafka-console-producer.sh --borker-list server1:9092,server2:9092,server3:9092 --topic my_topic
  1. 输入以下消息:

This is a message.
This is another message.
  1. 在集群中的另一个节点上,步入 kafka 的设置目录,然后运行三个consumer,订阅 “my_topic” 的消息,实践以下命令:

bin/kafka-console-consumer.sh --zookeeper server1:2181,server2:2181,server3:2181 --topic my_topic --from-beginning
  1. 下一场能够看到终端上输出以下内容,注解集群能够平常使用:

This is a message.
This is another message.

据书上说脚手架搭建好egg,再多安装kafka-node,egg-mysql

API

mysql 用户名root 密码123456

Producer

0.8 早先版本的 Procuder API 有三种:kafka.producer.SyncProducer 和
kafka.producer.async.AsyncProducer.它们都达成了同四个接口。

0.8 现在的新本子 Producer API 提供了以下效率:

  • 能够将多少个音讯缓存到地面队列里,然后异步的批量发送到
    broker,能够通过参数 producer.type=async 做到。

  • 谐和编辑 Encoder 来连串化音信,只需兑现下边那几个接口。私下认可的 Encoder
    是 kafka.serializer.DefaultEncoder。

  • 提供了依据 Zookeeper 的 broker 自动感知本事,能够由此参数 zk.connect
    完毕。借使不应用 Zookeeper,也足以运用 broker.list
    参数钦点二个静态的 brokers 列表,那样音信将被随便的发送到三个 broker
    上,一旦当选的 broker 失利了,音讯发送也就停业了。

  • 透过分区函数 kafka.producer.Partitioner 类对音信分区,能够由此参数
    partitioner.class 定制分区函数。

λ.2 集成

Consumer

Consumer API 有八个等第:低等别和高档别。

低档别的和三个点名的 broker
保持三番五次,并在选拔完新闻后关门连接,那个品级是无状态的,每一次读取新闻都带着
offset。

高等其余 API 隐蔽了和 brokers
连接的细节,在不必关怀服务端布局的情事下和服务端通讯。还足以本人维护花费情况,并能够通过有些规范化钦定订阅特定的
topic,举例白名单黑名单可能正则表达式。

1、根目录新建app.js,那一个文件在每一回项目加载时候都会运作

低级别 API

低端其余 API 是高等级 API
达成的底蕴,也是为了局部对有限帮衬花费情况有异样供给的场景,譬喻 Hadoop
consumer 那样的离线 consumer。

'use strict'; const kafka = require; module.exports = app => { app.beforeStart => { const ctx = app.createAnonymousContext(); const Producer = kafka.Producer; const client = new kafka.KafkaClient({ kafkaHost: app.config.kafkaHost }); const producer = new Producer(client, app.config.producerConfig); producer.on('error', function { console.error('ERROR: [Producer] ' + err); }); app.producer = producer; const consumer = new kafka.Consumer(client, app.config.consumerTopics, { autoCommit: false, }); consumer.on('message', async function { try { await ctx.service.log.insert(JSON.parse; consumer.commit => { console.error; }); } catch  { console.error('ERROR: [GetMessage] ', message, error); } }); consumer.on('error', function { console.error('ERROR: [Consumer] ' + err); }); });};

高级别 API

其一 API 围绕着由 KafkaStream
达成的迭代器张开,每一种流代表一多元从叁个或多少个分区的 broker
上聚合来的音信,每一个流由多个线程管理,所以顾客端能够在开创的时候经过参数钦命想要多少个流。

二个流是多个分区四个 broker 的联合,可是各样分区的音信只会流向贰个流。

上述代码新建了劳动者、消费者。

代码示例

以下是多个轻松的 Producer 和 Consumer 的代码示例。

Producer(循环向topic中发送新闻):

import java.util.Properties;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Producer extends Thread{
  private final kafka.javaapi.producer.Producer<Integer, String> producer;
  private final String topic;
  private final Properties props = new Properties();

  public Producer(String topic){
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("metadata.broker.list", "10.106.1.234:9092"); //需要替换成自己的broker信息
    producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
    this.topic = topic;
  }

  public void run() {
    int messageNo = 1;
    while(true){
      String messageStr = new String("Message_" + messageNo);
      producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
      messageNo++;
    }
  }
}

Consumer(订阅topic音讯,并在支配台出口):

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class Consumer extends Thread{
  private final ConsumerConnector consumer;
  private final String topic;

  public Consumer(String topic){
    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
    this.topic = topic;
  }

  private static ConsumerConfig createConsumerConfig(){
    Properties props = new Properties();
    props.put("zookeeper.connect", zkConnect);  //需要将zkConnect替换成自己的Zookeeper集群信息
    props.put("group.id", "group1");
    props.put("zookeeper.session.timeout.ms", "400");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");

    return new ConsumerConfig(props);
  }

  public void run() {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    KafkaStream<byte[], byte[]> stream =  consumerMap.get(topic).get(0);
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while(it.hasNext())
      System.out.println(new String(it.next().message()));
  }
} 

生产者新建后加载进app全局对象。大家就要号令时候坐蓐音讯。这里只是先新建实例

对比

消费者获得音讯将访谈service层的insert方法。

Kafka VS Flume

  • Kafka是二个不行通用的种类。多个坐褥者和消费者能够分享五个主旨。比较之下,Flume
    被设计为一个意志往 HDFS 或 HBase 发送数据的专项使用工具,它对 HDFS
    有特异的优化,何况集成了 Hadoop 的崇左特点。

  • Flume 内置了广大的 source 和 sink 组件。而 Kafka独有多个更加小的生育消费者生态系统,而且 Kafka 的社区协理不佳。使用
    卡夫卡 常常须要团结编排生产者和消费者代码。

  • Flume
    能够选择拦截器实时管理数据,那对于数据屏蔽也许高于是很有用的。而
    卡夫卡 须要外部的流管理系统技术做到。

  • 卡夫卡 和 Flume
    都是百无一失的系列,通过适当的配置都能确定保障零数据错过。不过,Flume
    不帮衬副手艺件,若是 Flume
    代理的二个节点奔溃了,就算使用了可信赖的文本管道格局,也会甩掉这一个事件直到恢复生机这么些磁盘。而
    卡夫卡 则未有这么些难点。

具体参数能够参照kafka-node官方API,往下看会有临蓐者和买主的布署参数。

Kafka VS RabbitMQ

  • RabbitMQ 服从 AMQP 合同,以 broker 为宗旨,有消息的承认机制。Kafka信守平时的 MQ 结构,以 consumer 为主干,无音信确认机制。

  • 卡夫卡具有非常高的吞吐量,内部使用音讯的批量拍卖,新闻处理的效用超级高。RabbitMQ
    在吞吐量方面稍逊于
    卡夫卡,援助对新闻的保障的传递,支持职业,但不扶持批量的操作,基于存款和储蓄的可相信性的须要存款和储蓄能够选取内部存款和储蓄器依然硬盘。

  • 卡夫卡 接纳 Zookeeper 对集群中的 broker、consumer 进行拘押,能够挂号
    topic 到 Zookeeper 上;通过 Zookeeper 的调养机制,producer 保存对应
    topic 的 broker 新闻,能够随意大概轮询发送到 broker 上;并且producer 可以依附语义钦点分片,音讯发送到 broker 的某分片上。而
    RabbitMQ 的载荷均衡必要独自的 loadbalancer 进行支撑。

2、controller · log.js

此地收获到了producer,并传往service层

'use strict'; const Controller = require.Controller; class LogController extends Controller { /** * @description Kafka控制日志信息流 * @host /log/notice * @method POST * @param {Log} log 日志信息 */ async notice() { const producer = this.ctx.app.producer; const Response = new this.ctx.app.Response(); const requestBody = this.ctx.request.body; const backInfo = await this.ctx.service.log.send(producer, requestBody); this.ctx.body = Response.success; }} module.exports = LogController;

3、service · log.js

这里有八个send方法,这里调用了producer.send ,实行临蓐者生产

insert方法则是数据库插入数据

'use strict'; const Service = require.Service;const uuidv1 = require; class LogService extends Service { async send { const payloads = [ { topic: this.ctx.app.config.topic, messages: JSON.stringify, }, ]; producer.send(payloads, function { console.log; return 'success'; } async insert { try { const logDB = this.ctx.app.mysql.get; const ip = this.ctx.ip; const Logs = this.ctx.model.Log.build, type: message.type || '', level: message.level || 0, operator: message.operator || '', content: message.content || '', ip, user_agent: message.user_agent || '', error_stack: message.error_stack || '', url: message.url || '', request: message.request || '', response: message.response || '', created_at: new Date(), updated_at: new Date; const result = await logDB.insert('logs', Logs.dataValues); if (result.affectedRows === 1) { console.log(`SUCEESS: [Insert ${message.type}]`); } else console.error('ERROR: [Insert DB] ', result); } catch  { console.error('ERROR: [Insert] ', message, error); } }} module.exports = LogService;

4、config · config.default.js

有的上述代码应用的布署参数具体在那地,注这里开了5个partition。

'use strict'; module.exports = appInfo => { const config = ; const topic = 'logAction_p5'; // add your config here config.middleware = []; config.security = { csrf: { enable: false, }, }; // mysql database configuration config.mysql = { clients: { basic: { host: 'localhost', port: '3306', user: 'root', password: '123456', database: 'merchants_basic', }, log: { host: 'localhost', port: '3306', user: 'root', password: '123456', database: 'merchants_log', }, }, default: {}, app: true, agent: false, }; // sequelize config config.sequelize = { dialect: 'mysql', database: 'merchants_log', host: 'localhost', port: '3306', username: 'root', password: '123456', dialectOptions: { requestTimeout: 999999, }, pool: { acquire: 999999, }, }; // kafka config config.kafkaHost = 'localhost:9092'; config.topic = topic; config.producerConfig = { // Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0 partitionerType: 1, }; config.consumerTopics = [ { topic, partition: 0 }, { topic, partition: 1 }, { topic, partition: 2 }, { topic, partition: 3 }, { topic, partition: 4 }, ]; return config;};

mode · log.js

这里运用了 Sequelize

'use strict'; module.exports = app => { const { STRING, INTEGER, DATE, TEXT } = app.Sequelize; const Log = app.model.define('log', { /** * UUID */ id: { type: STRING, primaryKey: true }, /** * 日志类型 */ type: STRING, /** * 优先等级 */ level: INTEGER, /** * 操作者 */ operator: STRING, /** * 日志内容 */ content: TEXT, /** * IP */ ip: STRING, /** * 当前用户代理信息 */ user_agent: STRING, /** * 错误堆栈 */ error_stack: TEXT, /** * URL */ url: STRING, /** * 请求对象 */ request: TEXT, /** * 响应对象 */ response: TEXT, /** * 创建时间 */ created_at: DATE, /** * 更新时间 */ updated_at: DATE, }); return Log;};

6、测试Python脚本:

import requests from multiprocessing import Poolfrom threading import Thread from multiprocessing import Process def loop(): t = 1000 while t: url = "http://localhost:7001/log/notice" payload = "{nt"type": "ERROR",nt"level": 1,nt"content": "URL send ERROR",nt"operator": "Knove"n}" headers = { 'Content-Type': "application/json", 'Cache-Control': "no-cache" } response = requests.request("POST", url, data=payload, headers=headers) print if __name__ == '__main__': for i in range: t = Thread t.start()

SET NAMES utf8mb4;SET FOREIGN_KEY_CHECKS = 0; -- ------------------------------ Table structure for logs-- ----------------------------DROP TABLE IF EXISTS `logs`;CREATE TABLE `logs`  CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL, `type` varchar CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '日志类型', `level` int NULL DEFAULT NULL COMMENT '优先等级', `operator` varchar CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '操作人', `content` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT '日志信息', `ip` varchar CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT 'IPrnIP', `user_agent` varchar CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '当前用户代理信息', `error_stack` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT '错误堆栈', `url` varchar CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '当前URL', `request` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT '请求对象', `response` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT '响应对象', `created_at` datetime NULL DEFAULT NULL COMMENT '创建时间', `updated_at` datetime NULL DEFAULT NULL COMMENT '更新时间', PRIMARY KEY  ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic; SET FOREIGN_KEY_CHECKS = 1;

λ.3 后话

英特网相像资料甚少,啃各类文档,探索才能实现格局

以上正是本文的全部内容,希望对我们的就学抱有利于,也期望我们不吝赐教脚本之家。

发表评论

电子邮件地址不会被公开。 必填项已用*标注