天翼专属云分布式消息服务kafka 天翼云专属云分类
2025-03-21 10:21 - 立有生活网
Kafka发送消息报Error while fetching metadata
四、JMS消息服务搜了一下,大家都在说是server.properties的配置问题。
天翼专属云分布式消息服务kafka 天翼云专属云分类
天翼专属云分布式消息服务kafka 天翼云专属云分类
但是查了一下我这边,这次并不是这在kafka的 config 目录下找到 server.properties 配置文件个原因,而是服务的端口安全策略的问题。
后来负责的同学改了一下相关规则就ok了。
JMS(一)---消息中间件概念
P三、消息中间件示例2P的特点非底层作系统软件,非业务应用软件,不是直接给最终用户使用的,不能直接给客户带来价值的软件统称为中间件
关注于 数据的发送和接收 ,利用高效可靠的 异步 消息传递机制集成 分布式 系统。
Ja消息服务(Ja Message Serv) 即JMS ,是一个Ja平台中关于面向消息中间的API,用于在两个应用程序之间或者分布式系统中发送消息,进行异步通信。
2、主题模式
AMQP(aanced message queuing protocol) 是一个 提供统一消息服务的应用层标准协议 ,基于此协议的客户端与消息中间件可传递消息,并 不受客户端/中间件不同产品,不同开发语言等条件的限制 。
ActiveMQ是Apache出品,的,能力强劲的开源消息总线。ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS是在当今的J2EE应用中间仍然扮演着特殊的地位。
RabbitMQ是一个开源的AMQP实现,端用Erlang语言编写。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
Kafka是一种高吞吐量的分布式发布消息系统,是一个分布式的、分区的、可靠的分布式日志存储服务。它通过一种的设计提供了一个消息系统的功能。
OpenTelemetry、Spring Cloud Sleuth、Kafka、Jager实现分布式跟踪
让我们看看我们需要启动哪些 docker 容器来运行这三个微服务并观察它们的分布式跟踪,前三个微服务在上面的部分中进行了解释。分布式跟踪可让您深入了解特定服务在分布式软件系统中作为整体的一部分是如何执行的。它跟踪和记录从起点到目的地的请求以及它们经过的系统。
在本文中,我们将使用 OpenTelemetry、Spring Cloud Sleuth、Kafka 和 Jaeger 在三个 Spring Boot 微服务 中实现分布式跟踪。
我们先来看看分布式中的一些基本术语。
跨度:表示系统内的单个工作单元。跨度可以相互嵌套以模拟工作的分解。例如,一个跨度可能正在调用一个 REST 端点,然后另一个子跨度可能是该端点调用另一个,等等在不同的服务中。
Trace:所有共享相同根跨度的跨度,或者更简单地说,将所有跨度创建为原始请求的直接结果。跨度的层次结构(每个跨度在根跨度旁边都有自己的父跨度)可用于形成有向无环图,显示请求在通过各种组件时的路径。
OpenTelemetry ,也简称为 OTel,是一个供应商中立的开源 Observability 框架,用于检测、生成、收集和导出遥测数据,例如 跟踪 、 指标 和 日志 。作为 云原生 计算基金会 (CNCF) 的孵化项目,OTel 旨在提供与供应商无关的统一库和 API 集——主要用于收集数据并将其传输到某处。OTel 正在成为生成和管理遥测数据的世界标准,并被广泛采用。
Sleuth 是一个由 Spring Cloud 团队管理和维护的项目,旨在将分布式跟踪功能集成到 Spring Boot 应用程序中。它作为一个典型Spring Starter的 . 以下是一些开箱即用的 Sleuth 工具:
Sleuth 添加了一个,以确保在请求中传递所有跟踪信息。每次调用时,都会创建一个新的 Span。它在收到响应后关闭。
Sleuth 能够跟踪您的请求和消息,以便您可以将该通信与相应的日志条目相关联。您还可以将跟踪信息导出到外部系统以可视化延迟。
Jaeger 最初由 Uber 的团队构建,然后于 2015 年开源。它于 2017 年被接受为云原生孵化项目,并于 2019 年毕业。作为 CNCF 的一部分,Jaeger 是云原生 架构 中公认的项目。它的源代码主要是用 Go 编写的。Jaeger 的架构包括:
与 Jaeger 类似,Zipkin 在其架构中也提供了相同的组件集。尽管 Zipkin 是一个较老的项目,但 Jaeger 具有更现代和可扩展的设计。对于此示例,我们选择 Jaeger 作为后端。
让我们设计三个 Spring Boot 微服务:
这三个微服务旨在:
这是为了观察 OpenTelemetry 如何结合 Spring Cloud Sleuth 处理代码的自动检测以及生成和传输跟踪数据。上面的虚线捕获了微服务导出的跟踪数据的路径,通过OTLP(OpenTelemetry Protocol)传输到OpenTelemetry Collector,收集器依次处理并将跟踪数据导出到后端Jaeger进行存储和查询。
使用 monorepo,我们的项目结构如下:
第 1 步:添加 POM 依赖项
这是使用 OTel 和 Spring Cloud Sleuth 实现分布式跟踪的关键。我们的目标是不必手动检测我们的代码,因此我们依靠这些依赖项来完成它们设计的工作——自动检测我们的代码,除了跟踪实现、将遥测数据导出到 OTel 收集器等。
第 2 步:OpenTelemetry 配置
OpenTelemetry 收集器端点
对于每个微服务,我们需要在其中添加以下配置application.yml(请参阅下面部分中的示例片段)。spring.sleuth.o.exporter.otlp.endpoint主要是配置OTel Collector端点。它告诉导出器,在我们的例子中是 Sleuth,通过 OTLP 将跟踪数据发送到指定的收集器端点://o-collector:4317。注意o-collector端点 URL 来自o-collector图像的 docker-come 服务。
跟踪数据概率抽样
spring.sleuth.o.config.trace-id-ratio-based属性定义了跟踪数据的采样概率。它根据提供给采样器的分数对一部分迹线进行采样。概率抽样允许 OpenTelemetry 跟踪用户通过使用随机抽样技术降低跨度收集成本。如果该比率小于 1.0,则某些迹线将不会被导出。对于此示例,我们将采样配置为 1.0、。
有关其他 OTel Spring Cloud Sleuth 属性,请参阅常见应用程序属性。
OpenTelemetry 配置文件
我们需要项目根目录下的 OTel 配置文件o-config.yaml。内容如下。此配置文件定义了 OTel 接收器、处理器和导出器的行为。正如我们所看到的,我们定义了我们的接收器来 gRPC 和 HTTP,处理器使用批处理和导出器作为 jaeger 和日志记录。
第 3 步:docker-come 将所有内容串在一起
运行docker-come up -d以调出所有九个容器:
第 4 步:数据在行动
快乐之路
启动 Jaeger UI, [= Trace(1) ConnectionFactorys按钮,这是我们看到的创建客户跟踪:它跨越三个服务,总共跨越六个,持续时间 82.35 毫秒。
除了 Trace Timeline 视图(上面的屏幕截图),Jaeger 还提供了一个图形视图(Trace Graph在右上角的下拉菜单中选择):
三个微服务在 docker 中的日志输出显示相同的跟踪 id,以红色突出显示,并根据其应用程序名称显示不同的跨度 id(应用程序名称及其对应的跨度 id 以匹配的颜色突出显示)。在 的情况下customer-serv,相同的 span id 从 REST API 请求传递到 Kafka 发布者请求。
customer-serv让我们在 docker 中暂停我们的PostgreSQL 数据库,然后重复从customer-serv-bff. 500 internal server error正如预期的那样,我们得到了。检查 Jaeger,我们看到以下跟踪,异常堆栈跟踪抱怨SocketTimeoutException,再次如预期的那样。
识别长期运行的跨度
Jaeger UI 允许我们搜索超过指定持续时间的跟踪。例如,我们可以搜索所有耗时超过 1000 毫秒的跟踪。然后,我们可以深入研究长期运行的跟踪以调查其根本原因。
在这个故事中,我们从 OpenTelemetry、Spring Cloud Sleuth 和 Jaeger 的角度解压了分布式跟踪,验证了 REST API 调用和 Kafka pub/sub 中分布式跟踪的自动检测。我希望这个故事能让你更好地理解这些跟踪框架和工具,尤其是 OpenTelemetry,以及它如何从根本上改变我们在 分布式系统 中进行可观察性的方式。
RocketMQ和Kafka到底选哪个
4.1.1 P2P模式1、适用场景
kafka适合日志处理
rocketmq适合业务处理
结论:两者没有区别,根据具体业务定夺
2、性能
kafka单机写入TPS号称在百万条/秒
rocketmq大约在10万条/秒
结论:追求性能方面,kafka单机性能更高
kafka使用异步刷盘方式,异步Replication
rocketmq支持异步/同步刷盘,异步/同步Replication
结论:rocketmq所支持的同步方式提升了数据的可靠性
4、实时性
kafka和rocketmq均支持pull长轮询,rocketmq消息实时性更高
结论:rocketmq胜出
kafka单机超过64个队列/分区,消息发送性能降低
rocketmq单机支持5W个队列,性能稳定
结论:长远看,rocketmq胜出,
6、消息顺序性
kafka某些配置下,支持消息顺序,但是一台Broker宕机后,就会产生消息乱序
rocketmq支持严格的消息顺序,一台Broker宕机后,发送消息会失败,但是不会乱序
结论:rocketmq胜出
7、消息失败重试机制
kafka消费失败不支持重试
rocketmq消费失败支持定时重试,每次重试间隔时间顺延
8、定时/延时消息
kafka不支持定时消息
rocketmq支持定时消息
9、分布式事务消Broker息
kafka不支持分布式事务消息
rocketmq未来会支持
10、消息查询机制
kafka不支持消息查询
rocketmq支持根据message id查询消息,也支持根据消息内容查询消息
11、消息回溯
kafka可以按照offset回溯消息
rocketmq支持按照时间回溯消息,例如从一天之前的某时某分开始重新消费消息
问题一:push和pull模式
push模式:客户端与服务端建立连接后,当服务端有消息时,将消息推送到客户端
pull模式:客户端不断的轮询请求服务端,来获取新的消息
在具体实现时,push和pull模式都是采用消费端主动拉取的方式,即consumer轮询从broker拉取消息
区别:
push 方式中,consumer把轮询过程封装了,并注册了MessageListener,取到消息后,唤醒MessageListener的consumerMessage来消费,用户而言,觉得消息被推送过来的
pull方式中,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的,遍历MessageQueue,然后针对每个MessageQueue批量获取消息,一次取完之后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue
长轮询:rocketmq时采用长轮询的方式实现的,指的是在请求的过程中,若是端数据并没有更新,那么则将这个连接挂起,直到推送新的数据,再返回,然后进入循环周期
客户端像传统轮询一样从服务端请求数据,服务端会阻塞请求不会立刻返回,直到有数据或者超时才返回给客户端,然后关闭连接,客户端处理完响应信息后再向发送新的请求
大型的PHP应用,通常使用什么应用做消息队列?
现在,让我们启动customer-serv-bff流程的入口点,以创建新客户。一、消息队列概述x0dx0a消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。x0dx0a目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。x0dx0a二、消息队列应用场景x0dx0a以下介绍消息队列在实际应用中常用的使用场景。异步处理,应用解耦,流量削锋和消息通讯四个场景。x0dx0a2.1异步处理x0dx0a场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种1.串行的方式;2.并行方式。x0dx0a(1)串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。(架构KKQ:46609发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列7527,欢迎加入)x0dx0a(2)并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的别是,并行的方式可以提高处理的时间。x0dx0a设三个业务每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是150毫秒,并行的时间可能是100毫秒。x0dx0a因为CPU在单位时间内处理的请求数是一定的,设CPU1秒内吞吐量是100次。则串行方式1秒内CPU可处理的请求量是7次(1000/150)。并行方式处理的请求量是10次(1000/100)。x0dx0a小结:如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?x0dx0a引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下:x0dx0a按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍。x0dx0a2.2应用解耦x0dx0a场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。如下图:x0dx0a传统模式的缺点:x0dx0a1) 如库存系统无法访问,则订单减库存将失败,从而导致订单失败;x0dx0a2) 订单系统与库存系统耦合;x0dx0a如何解决以上问题呢?引入应用消息队列后的方案,如下图:x0dx0a订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。x0dx0a库存系统:下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存作。x0dx0a如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续作了。实现订单系统与库存系统的应用解耦。x0dx0a2.3流量削锋x0dx0a流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。x0dx0a应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。x0dx0a可以控制活动的人数;x0dx0a可以缓解短时间内高流量压垮应用;x0dx0a用户的请求,接收后,首先写入消息队列。如消息队列长度超过数量,则直接抛弃用户请求或跳转到错误页面;x0dx0a秒杀业务根据消息队列中的请求信息,再做后续处理。x0dx0a2.4日志处理x0dx0a日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。架构简化如下:x0dx0a日志采集客户端,负责日志数据采集,定时写受写入Kafka队列;x0dx0aKafka消息队列,负责日志数据的接收,存储和转发;x0dx0a日志处理应用:并消费kafka队列中的日志数据;x0dx0a以下是新浪kafka日志处理应用案例:x0dx0a(1)Kafka:接收用户日志的消息队列。x0dx0a(2)Logstash:做日志解析,统一成JSON输出给Elasticsearch。x0dx0a(3)Elasticsearch:实时日志分析服务的核心技术,一个schemaless,实时的数据存储服务,通过index组织数据,兼具强大的搜索和统计功能。x0dx0a(4)Kibana:基于Elasticsearch的数据可视化组件,超强的数据可视化能力是众多公司选择ELK stack的重要原因。x0dx0a2.5消息通讯x0dx0a消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者等。x0dx0a点对点通讯:x0dx0a客户端A和客户端B使用同一队列,进行消息通讯。x0dx0a通讯:x0dx0a客户端A,客户端B,客户端N同一主题,进行消息发布和接收。实现类似效果。x0dx0a以上实际是消息队列的两种消息模式,点对点或发布模式。模型为示意图,供参考。x0dx0a三、消息中间件示例x0dx0a3.1电商系统x0dx0a消息队列采用高可用,可持久化的消息中间件。比如Active MQ,Rabbit MQ,Rocket Mq。(1)应用将主干逻辑处理完成后,写入消息队列。消息发送是否成功可以开启消息的确认模式。(消息队列返回消息接收成功状态后,应用再返回,这样保障消息的完整性)x0dx0a(2)扩展流程(发短信,配送处理)队列消息。采用推或拉的方式获取消息并处理。x0dx0a(3)消息将应用解耦的同时,带来了数据一致性问题,可以采用最终一致性方式解决。比如主数据写入数据库,扩展应用根据消息队列,并结合数据库方式实现基于消息队列的后续处理。x0dx0a3.2日志收集系统x0dx0a分为Zookeeper注册中心,日志收集客户端,Kafka集群和Storm集群(OtherApp)四部分组成。x0dx0aZookeeper注册中心,提出负载均衡和地址查找服务;x0dx0a日志收集客户端,用于采集应用系统的日志,并将数据推送到kafka队列;x0dx0a四、JMS消息服务x0dx0a讲消息队列就不得不提JMS 。JMS(Ja Message Serv,Ja消息服务)API是一个消息服务的标准/规范,允许应用程序组件基于JaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。x0dx0a在EJB架构中,有消息bean可以无缝的与JM消息服务集成。在J2EE架构模式中,有消息服务者模式,用于实现消息与应用直接的解耦。x0dx0a4.1消息模型x0dx0a在JMS标准中,有两种消息模型P2P(Point to Point),Publish/Subscribe(Pub/Sub)。x0dx0a4.1.1 P2P模式x0dx0aP2P模式包含三个角色:消息队列(Queue),发送者(Sender),接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。x0dx0aP2P的特点x0dx0a每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)x0dx0a发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列x0dx0a接收者在成功接收消息之后需向队列应答成功x0dx0a如果希望发送的每个消息都会被成功处理的话,那么需要P2P模式。(架构KKQ:466097527,欢迎加入)x0dx0a4.1.2 Pub/sub模式x0dx0a包含三个角色主题(Topic),发布者(Publisher),者(Subscriber) 。多个发布者将消息发送到Topic,系统将这些消息传递给多个者。x0dx0aPub/Sub的特点x0dx0a每个消息可以有多个消费者x0dx0a发布者和者之间有时间上的依赖性。针对某个主题(Topic)的者,它必须创建一个者之后,才能消费发布者的消息。x0dx0a为了消费消息,者必须保持运行的状态。x0dx0a为了缓和这样严格的时间相关性,JMS允许者创建一个可持久化的。这样,即使者没有被激活(运行),它也能接收到发布者的消息。x0dx0a如果希望发送的消息可以不被做任何处理、或者只被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型。x0dx0a4.2消息消费x0dx0a在JMS中,消息的产生和消费都是异步的。对于消费来说,JMS的消息者可以通过两种方式来消费消息。x0dx0a(1)同步x0dx0a者或接收者通过receive方法来接收消息,receive方法在接收到消息之前(或超时之前)将一直阻塞;x0dx0a(2)异步x0dx0a者或接收者可以注册为一个消息。当消息到达之后,系统自动调用的onMessage方法。x0dx0aJNDI:Ja命名和目录接口,是一种标准的Ja命名系统接口。可以在网络上查找和访问服务。通过指定一个资源名称,该名称对应于数据库或命名服务中的一个记录,同时返回资源连接建立所必须的信息。x0dx0aJNDI在JMS中起到查找和访问发送目标或消息来源的作用。(架构KKQ:466097527,欢迎加入)x0dx0a4.3JMS编程模型x0dx0a(1) ConnectionFactoryx0dx0a创建Connection对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。可以通过JNDI来查找ConnectionFactory对象。x0dx0a(2) Destinationx0dx0aDestination的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。x0dx0a所以,Destination实际上就是两种类型的对象:Queue、Topic可以通过JNDI来查找Destination。x0dx0a(3) Connectionx0dx0aConnection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。x0dx0a(4) Sessionx0dx0aSession是作消息的接口。可以通过session创建生产者、消费者、消息等。Session提供了事务的功能。当需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分QueueSession和TopicSession。x0dx0a(5) 消息的生产者x0dx0a消息生产者由Session创建,并用于将消息发送到Destination。同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。x0dx0a(6) 消息消费者x0dx0a消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。当然,也可以session的creatDurableSubscriber方法来创建持久化的者。x0dx0a(7) MessageListenerx0dx0a消息。如果注册了消息,一旦消息到达,将自动调用的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一种MessageListener。x0dx0a深入学习JMS对掌握JAVA架构,EJB架构有很好的帮助,消息中间件也是大型分布式系统必须的组件。本次分享主要做全局性介绍,具体的深入需要大家学习,实践,总结,领会。x0dx0a五、常用消息队列x0dx0a一般商用的容器,比如WebLogic,JBoss,都支持JMS标准,开发上很方便。但免费的比如Tomcat,Jetty等则需要使用第三方的消息中间件。本部分内容介绍常用的消息中间件(Active MQ,Rabbit MQ,Zero MQ,Kafka)以及他们的特点。x0dx0a5.1 ActiveMQx0dx0aActiveMQ 是Apache出品,的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。x0dx0aActiveMQ特性如下:x0dx0a⒈ 多种语言和协议编写客户端。语言: Ja,C,C++,C#,Ruby,Perl,Python,PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQPx0dx0a⒉ 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)x0dx0a⒊ 对spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性x0dx0a⒋ 通过了常见J2EE(如 Geronimo,JBoss 4,GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业上x0dx0a⒌ 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTAx0dx0a⒍ 支持通过JDBC和journal提供高速的消息持久化x0dx0a⒎ 从设计上保证了高性能的集群,客户端-,点对点x0dx0a⒏ 支持Ajaxx0dx0a⒐ 支持与Axis的整合x0dx0a⒑ 可以很容易得调用内嵌JMS provider,进行测试x0dx0a5.2 RabbitMQx0dx0aRabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。支持多种客户端,如:Python、Ruby、.NET、Ja、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX,持久化。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。x0dx0a几个重要概念:x0dx0aBroker:简单来说就是消息队列实体。x0dx0aExchange:消息交换机,它指定消息按什么规则,路由到哪个队列。x0dx0aQueue:消息队列载体,每个消息都会被投入到一个或多个队列。x0dx0aBinding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。x0dx0aRouting Key:路由关键字,exchange根据这个关键字进行消息投递。x0dx0host:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。x0dx0aproducer:消息生产者,就是投递消息的程序。x0dx0aconsumer:消息消费者,就是接受消息的程序。x0dx0achannel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。x0dx0a消息队列的使用过程,如下:x0dx0a(1)客户端连接到消息队列,打开一个channel。x0dx0a(2)客户端声明一个exchange,并设置相关属性。x0dx0a(3)客户端声明一个queue,并设置相关属性。x0dx0a(4)客户端使用routing key,在exchange和queue之间建立好绑定关系。x0dx0a(5)客户端投递消息到exchange。x0dx0aexchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。x0dx0a5.3 ZeroMQx0dx0a号称史上最快的消息队列,它实际类似于Socket的一系列接口,他跟Socket的区别是:普通的socket是端到端的(1:1的关系),而ZMQ却是可以N:M 的关系,人们对BSD套接字的了解较多的是点对点的连接,点对点连接需要显式地建立连接、销毁连接、选择协议(TCP/UDP)和处理错误等,而ZMQ屏蔽了这些细节,让你的网络编程更为简单。ZMQ用于node与node间的通信,node可以是主机或者是进程。x0dx0a引用的说法: “ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。ZMQ的明确目标是“成为标准网络协议栈的一部分,之后进入Linux内核”。现在还未看到它们的成功。但是,它无疑是极具前景的、并且是人们更加需要的“传统”BSD套接字之上的一 层封装。ZMQ让编写高性能网络应用程序极为简单和有趣。”x0dx0a特点是:x0dx0a高性能,非持久化;x0dx0a跨平台:支持Linux、Windows、OS X等。x0dx0a多语言支持; C、C++、Ja、.NET、Python等30多种开发语言。x0dx0a可单独部署或集成到应用中使用;x0dx0a可作为Socket通信库使用。x0dx0a与RabbitMQ相比,ZMQ并不像是一个传统意义上的消息队列,事实上,它也根本不是一个,更像一个底层的网络通讯库,在Socket API之上做了一层封装,将网络通讯、进程通讯和线程通讯抽象为统一的API接口。支持“Request-Reply “,”Publisher-Subscriber“,”Parallel Pipeline”三种基本模型和扩展模型。x0dx0aZeroMQ高性能设计要点:x0dx0a1、无锁的队列模型x0dx0a对于跨线程间的交互(用户端和session)之间的数据交换通道pipe,采用无锁的队列算法CAS;在pipe两端注册有异步,在读或者写消息到pipe的时,会自动触发读写。x0dx0a2、批量处理的算法x0dx0a对于传统的消息处理,每个消息在发送和接收的时候,都需要系统的调用,这样对于大量的消息,系统的开销比较大,zeroMQ对于批量的消息,进行了适应性的优化,可以批量的接收和发送消息。x0dx0a3、多核下的线程绑定,无须CPU切换x0dx0a区别于传统的多线程并发模式,信号量或者临界区, zeroMQ充分利用多核的优势,每个核绑定运行一个工作者线程,避免多线程之间的CPU切换开销。x0dx0a5.4 Kafkax0dx0aKafka是一种高吞吐量的分布式发布消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费。x0dx0aKafka是一种高吞吐量的分布式发布消息系统,有如下特性:x0dx0a通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。(文件追加的方式写入数据,过期的数据定期删除)x0dx0a高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。x0dx0a支持通过Kafka和消费机集群来分区消息。x0dx0a支持Hadoop并行数据加载。x0dx0aKafka相关概念x0dx0aBrokerx0dx0aKafka集群包含一个或多个,这种被称为broker[5]x0dx0aTopicx0dx0a每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)x0dx0aPartitionx0dx0aParition是物理上的概念,每个Topic包含一个或多个Partition.x0dx0aProducerx0dx0a负责发布消息到Kafka brokerx0dx0aConsumerx0dx0a消息消费者,向Kafka broker读取消息的客户端。x0dx0aConsumer Groupx0dx0a每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。x0dx0a一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用。
SpringBoot集成Kafka,实现简单的收发消息
Pub/Sub的特点把 listeners 和 aertised.listeners 两处配置的注释去掉,可以根据需要配置连接的 外网IPBroker:简单来说就是消息队列实体。 和 端口号 ,我这里演示选择的是本地 localhost 和默认端口 9092
KafkaTemplate 这个类包装了个生产者 Producer ,来提供方便的发送数据到 kafka 的主题 topic 里面。
send() 方法的源码, KafkaTemplate 类中还重载了很多 send() 方法,有需要可以看看源码
通过 KafkaTemplate 模板类发送数据。
kafkaTemplate.send(String topic, K key, V data) ,个入参是主题,第二个入参是发送的对象,第三个入参是发送的数据。通过 @KafkaListener 注解配置用户 topics
bootstrap-servers :kafka地址(可以多个)
consumer.group-id :指定一个默认的组名
不指定的话会报
1. earliest :当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,从头开始消费
2. latest :当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,消费新产生的该分区下的数据
3. none : topic 各分区都存在已提交的 offset 时,从 offset 后开始消费;只要有一个分区不存在已提交的 offset ,则抛出异常
这个属性也是必须配置的,不然也是会报错的
在使用Kafka发送接收消息时,生产者 producer 端需要序列化,消费者 consumer 端需要反序列化,由于网络传输过来的是 byte[] ,只有反序列化后才能得到生产者发送的真实的消息内容。这样消息才能进行网络传输
consumer.key-deserializer 和 consumer.value-deserializer 是消费者 key/value 反序列化
producer.key-deserializer 和 producer.value-deserializer 是生产者 key/value 序列化
StringDeserializer 是内置的字符串反序列化方式
StringSerializer 是内置的字符串序列化方式
在 org.apache.kafkmon.serialization 源码包中还提供了多种类型的序列化和反序列化方式
要自定义序列化方式,需要实现接口 Serializer
要自定义反序列化方式,需要实现接口 Deserializer
详细可以参考
这是 Kafka 的消费者 Consumer 的配置信息,每个消费者都会输出该配置信息
访问 ,就可以看到控制台打印消息了
大型的PHP应用,通常使用什么应用做消息队列?
疑问:既然都是采用pull方式实现,rocketmq怎么保证消息的实时性?一、消息队列概述
消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。
目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。
二、消息队列应用场景
以下介绍消息队列在实际应用中常用的使用场景。异步处理,应用解耦,流量削锋和消息通讯四个场景。
2.1异步处理
场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种1.串行的方式;2.并行方式。
(1)串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。(架构KKQ:466097527,欢迎加入)
(2)并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的别是,并行的方式可以提高处理的时间。
设三个业务每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是150毫秒,并行的时间可能是100毫秒。
因为CPU在单位时间内处理的请求数是一定的,设CPU1秒内吞吐量是100次。则串行方式1秒内CPU可处理的请求量是7次(1000/150)。并行方式处理的请求量是10次(1000/100)。
小结:如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?
引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下:
按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍。
2.2应用解耦
场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。如下图:
传统模式的缺点:
1) 如库存系统无法访问,则订单减库存将失败,从而导致订单失败;
2) 订单系统与库存系统耦合;
如何解决以上问题呢?引入应用消息队列后的方案,如下图:
订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。
库存系统:下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存作。
如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续作了。实现订单系统与库存系统的应用解耦。
2.3流量削锋
应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。
可以控制活动的人数;
可以缓解短时间内高流量压垮应用;
用户的请求,接收后,首先写入消息队列。如消息队列长度超过数量,则直接抛弃用户请求或跳转到错误页面;
秒杀业务根据消息队列中的请求信息,再做后续处理。
2.4日志处理
日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。架构简化如下:
日志采集客户端,负责日志数据采集,定时写受写入Kafka队列;
Kafka消息队列,负责日志数据的接收,存储和转发;
日志处理应用:并消费kafka队列中的日志数据;
以下是新浪kafka日志处理应用案例:
(1)Kafka:接收用户日志的消息队列。
(2)Logstash:做日志解析,统一成JSON输出给Elasticsearch。
(3)Elasticsearch:实时日志分析服务的核心技术,一个schemaless,实时的数据存储服务,通过index组织数据,兼具强大的搜索和统计功能。
(4)Kibana:基于Elasticsearch的数据可视化组件,超强的数据可视化能力是众多公司选择ELK stack的重要原因。
2.5消息通讯
消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者等。
点对点通讯:
客户端A和客户端B使用同一队列,进行消息通讯。
通讯:
客户端A,客户端B,客户端N同一主题,进行消息发布和接收。实现类似效果。
以上实际是消息队列的两种消息模式,点对点或发布模式。模型为示意图,供参考。
3.1电商系统
消息队列采用高可用,可持久化的消息中间件。比如A流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。ctive MQ,Rabbit MQ,Rocket Mq。(1)应用将主干逻辑处理完成后,写入消息队列。消息发送是否成功可以开启消息的确认模式。(消息队列返回消息接收成功状态后,应用再返回,这样保障消息的完整性)
(2)扩展流程(发短信,配送处理)队列消息。采用推或拉的方式获取消息并处理。
(3)消息将应用解耦的同时,带来了数据一致性问题,可以采用最终一致性方式解决。比如主数据写入数据库,扩展应用根据消息队列,并结合数据库方式实现基于消息队列的后续处理。
3.2日志收集系统
分为Zookeeper注册中心,日志收集客户端,Kafka集群和Storm集群(OtherApp)四部分组成。
Zookeeper注册中心,提出负载均衡和地址查找服务;
日志收集客户端,用于采集应用系统的日志,并将数据推送到kafka队列;
讲消息队列就不得不提JMS 。JMS(Ja Message Serv,Ja消息服务)API是一个消息服务的标准/规范,允许应用程序组件基于JaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。
在EJB架构中,有消息bean可以无缝的与JM消息服务集成。在J2EE架构模式中,有消息服务者模式,用于实现消息与应用直接的解耦。
4.1消息模型
在JMS标准中,有两种消息模型P2P(Point to Point),Publish/Subscribe(Pub/Sub)。
P2P模式包含三个角色:消息队列(Queue),发送者(Sender),接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。
每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)
接收者在成功接收消息之后需向队列应答成功
如果希望发送的每个消息都会被成功处理的话,那么需要P2P模式。(架构KKQ:466097527,欢迎加入)
4.1.2 Pub/sub模式
包含三个角色主题(Topic),发布者(Publisher),者(Subscriber) 。多个发布者将消息发送到Topic,系统将这些消息传递给多个者。
每个消息可以有多个消费者
发布者和者之间有时间上的依赖性。针对某个主题(Topic)的者,它必须创建一个者之后,才能消费发布者的消息。
为了消费消息,者必须保持运行的状态。
为了缓和这样严格的时间相关性,JMS允许者创建一个可持久化的。这样,即使者没有被激活(运行),它也能接收到发布者的消息。
如果希望发送的消息可以不被做任何处理、或者只被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型。
4.2消息消费
在JMS中,消息的产生和消费都是异步的。对于消费来说,JMS的消息者可以通过两种方式来消费消息。
(1)同步
者或接收者通过receive方法来接收消息,receive方法在接收到消息之前(或超时之前)将一直阻塞;
(2)异步
者或接收者可以注册为一个消息。当消息到达之后,系统自动调用的onMessage方法。
JNDI:Ja命名和目录接口,是一种标准的Ja命名系统接口。可以在网络上查找和访问服务。通过指定一个资源名称,该名称对应于数据库或命名服务中的一个记录,同时返回资源连接建立所必须的信息。
JNDI在JMS中起到查找和访问发送目标或消息来源的作用。(架构KKQ:466097527,欢迎加入)
4.3JMS编程模型
创建Connection对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。可以通过JNDI来查找ConnectionFactory对象。
(2) Destination
Destination的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。
所以,Destination实际上就是两种类型的对象:Queue、Topic可以通过JNDI来查找Destination。
(3) Connection
Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。
(4) Session
Session是作消息的接口。可以通过session创建生产者、消费者、消息等。Session提供了事务的功能。当需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分QueueSession和TopicSession。
(5) 消息的生产者
消息生产者由Session创建,并用于将消息发送到Destination。同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。
(6) 消息消费者
消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。当然,也可以session的creatDurableSubscriber方法来创建持久化的者。
(7) MessageListener
消息。如果注册了消息,一旦消息到达,将自动调用的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一种MessageListener。
深入学习JMS对掌握JAVA架构,EJB架构有很好的帮助,消息中间件也是大型分布式系统必须的组件。本次分享主要做全局性介绍,具体的深入需要大家学习,实践,总结,领会。
五、常用消息队列
一般商用的容器,比如WebLogic,JBoss,都支持JMS标准,开发上很方便。但免费的比如Tomcat,Jetty等则需要使用第三方的消息中间件。本部分内容介绍常用的消息中间件(Active MQ,Rabbit MQ,Zero MQ,Kafka)以及他们的特点。
5.1 ActiveMQ
ActiveMQ 是Apache出品,的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
ActiveMQ特性如下:
⒈ 多种语言和协议编写客户端。语言: Ja,C,C++,C#,Ruby,Perl,Python,PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
⒉ 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
⒊ 对spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
⒋ 通过了常见J2EE(如 Geronimo,JBoss 4,GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业上
⒌ 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
⒍ 支持通过JDBC和journal提供高速的消息持久化
⒎ 从设计上保证了高性能的集群,客户端-,点对点
⒐ 支持与Axis的整合
⒑ 可以很容易得调用内嵌JMS provider,进行测试
5.2 RabbitMQ
RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。支持多种客户端,如:Python、Ruby、.NET、Ja、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX,持久化。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
几个重要概念:
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
消息队列的使用过程,如下:
(1)客户端连接到消息队列,打开一个channel。
(2)客户端声明一个exchange,并设置相关属性。
(4)客户端使用routing key,在exchange和queue之间建立好绑定关系。
(5)客户端投递消息到exchange。
exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。
5.3 ZeroMQ
号称史上最快的消息队列,它实际类似于Socket的一系列接口,他跟Socket的区别是:普通的socket是端到端的(1:1的关系),而ZMQ却是可以N:M 的关系,人们对BSD套接字的了解较多的是点对点的连接,点对点连接需要显式地建立连接、销毁连接、选择协议(TCP/UDP)和处理错误等,而ZMQ屏蔽了这些细节,让你的网络编程更为简单。ZMQ用于node与node间的通信,node可以是主机或者是进程。
特点是:
高性能,非持久化;
跨平台:支持Linux、Windows、OS X等。
多语言支持; C、C++、Ja、.NET、Python等30多种开发语言。
可单独部署或集成到应用中使用;
可作为Socket通信库使用。
与RabbitMQ相比,ZMQ并不像是一个传统意义上的消息队列,事实上,它也根本不是一个,更像一个底层的网络通讯库,在Socket API之上做了一层封装,将网络通讯、进程通讯和线程通讯抽象为统一的API接口。支持“Request-Reply “,”Publisher-Subscriber“,”Parallel Pipeline”三种基本模型和扩展模型。
ZeroMQ高性能设计要点:
1、无锁的队列模型
对于跨线程间的交互(用户端和session)之间的数据交换通道pipe,采用无锁的队列算法CAS;在pipe两端注册有异步,在读或者写消息到pipe的时,会自动触发读写。
2、批量处理的算法
对于传统的消息处理,每个消息在发送和接收的时候,都需要系统的调用,这样对于大量的消息,系统的开销比较大,zeroMQ对于批量的消息,进行了适应性的优化,可以批量的接收和发送消息。
3、多核下的线程绑定,无须CPU切换
区别于传统的多线程并发模式,信号量或者临界区, zeroMQ充分利用多核的优势,每个核绑定运行一个工作者线程,避免多线程之间的CPU切换开销。
5.4 Kafka
Kafka是一种高吞吐量的分布式发布消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费。
Kafka是一种高吞吐量的分布式发布消息系统,有如下特性:
通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。(文件追加的方式写入数据,过期的数据定期删除)
高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。
支持通过Kafka和消费机集群来分区消息。
支持Hadoop并行数据加载。
Kafka相关概念
Kafka集群包含一个或多个,这种被称为broker[5]
Topic
Partition
Parition是物理上的概念,每个Topic包含一个或多个Partition.
Producer
负责发布消息到Kafka broker
Consumer
消息消费者,向Kafka broker读取消息的客户端。
Consumer Group
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用。
昌平电脑培训分享微服务架构开发的工具都有哪些
⒏ 支持Ajax关于微服务架构的文章相信大家应该看过不少了,其中关于微服务的架构技巧以及开发工具的介绍也有很多。今天,昌平电脑培训就给大家汇总了一下,其中适合微服务架构的工具都有哪些种类,一起来了解一下吧。 API管理和测试
(3)客户端声明一个queue,并设置相关属性。1.APIFortress APIFortress是API测试和健康检测工具,为企业级API提供自动化的功能测试、健康检测和负载测试。它的设计原则是无代码,完全基于现代API架构实践和模式而构建。
2.Postman Postman是面向个体开发者和团队的API开发套件,可让你轻松运行UI驱动的API测试。Postman还是一个功能强大的HTTP客户端,让RESTfulAPI探索变得轻而易举。用户可以将简单和复杂的HTTP请求组合在一起,实现快速的API测试、开发和文档化。
3.Tyk Tyk是一款开箱即用的开源API管理平台,速度快,可伸缩。无论是部署在内部,还是部署在云端,或者使用两者的混合,对Tyk来说都不在话下。除了可以降低管理成本,Tyk还将为你带来高可用性和低延迟。
消息服务 4.RabbitMQ
RabbitMQ可作为微服务之间的通信桥梁,它支持各种模式,可提高应用程序的可伸缩性,并解决大多数分布式系统都存在的问题。RabbitMQ可用在微服务环境或任何其他分布式系统中。你还可以使用这个工具在服务之间交换。 5.亚马逊简单队列服务(SQS)
亚马逊SQS提供了强大、灵活且可靠的微服务通信机制。作为一种基于发布的微服务通信模型,亚马逊SQS可以帮助开发人员解决很多问题。除了更好的安全性之外,队列还通过为待处理消息提供储存来增强可靠性。 6.ApacheKafka
消息队列对于微服务架构来说是非常重要的,可用来处理微服务之间的通信以及微服务与外部源之间的通信,不管是密集型的数据处理还是API调用。ApacheKafka是一个具有高容错和弹性的分布式流处理平台。
AWS正式发布Kafka云服务,不用再为配置复杂心了
引用的说法: “ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。ZMQ的明确目标是“成为标准网络协议栈的一部分,之后进入Linux内核”。现在还未看到它们的成功。但是,它无疑是极具前景的、并且是人们更加需要的“传统”BSD套接字之上的一 层封装。ZMQ让编写高性能网络应用程序极为简单和有趣。”AWS在re:Invent 20会上首先发布了托管Apache Kafka消息队列服务(Amazon Mad Streaming for Apache Kafka,MSK)的消息,现在已经从预览成为正式服务。
Apache Kafka是一个分布式的消息队列系统,其使用发布以及的架构,将产生的流数据的应用与利用流数据的角色分离。Apache Kafka让使用者可以捕捉如消息队列、交易、物联网5、支持的队列数等,或是应用与日志等流数据,还能实时进行分析,连续不间断地转换数据,并再将收到的数据经过处理后,分发到其他的数据湖和数据库中。
AWS提到,用户在生产环境中要配置Apache Kafka,需要克服一些障碍,特别是在后续的管理以及规模扩展工作上,而现在AWS正式推出的MSK服务,则由AWS负责管理任务,让用户可以简单地配置使用,而且由于近几个版本的Kafka,都需要与协调程序Zookeeper共同使用,因此MSK服务也只要简单地设定,就能让Kafka与ZooKeeper一同运行。
使用MSK服务,用户可以在几分钟内创建集群,并使用AWS身分管理与访问控制IAM管理集群作,也能通过ACM(AWS Certificate Mar)完全托管的TLS私密凭证颁发机构授权客户端,以TLS加密数据,并使用KMS(AWS Key Mament Serv)中的密钥加密其他数据。当发生故障时,MSK还会替换故障机器,自动执行修补,用户可以从Amazon CloudWatch中,服务的状态指标。
AWS表示,MSK与Kafka 1.1.1和2.1.0版本完全兼容,因此用户可以在AWS直接执行原本的Kafka应用以及工具,而不需要修改任何的代码,用户能使用开源工具MirrorMaker,将数据从现有的Kafka集群直接迁移到MSK上。
MSK的计价方式是以Kafka Broker以及配置存储每小时计价,MSK的数据传输费用与原本的AWS数据传输相同,而集群所使用的Zookeeper,还有区域集群的Broker和Zookeeper互传数据是不额外收费的。现在用户已经可以在大部分的AWS区域使用到MSK服务,包括北美、与欧洲。
杭州各开发区排名 杭州几个经济开发区

富阳在杭州排名第几 第8名。杭州拱墅区上升至榜首,与杭州西湖区、余杭区、高新开发区(滨江)、萧山区共同包揽榜单前5名;杭州富阳区、绍兴越城区进步显著,杭州富阳区排名上升22位至第···
足球丨罗布森·德·索萨·罗比尼奥参加比赛日

罗布森·德·索萨·罗比尼奥的参加比赛 比赛日期比赛性质代表球队对手球队主客场比分胜负状态出场时间进球助攻得牌详情2014-12-01巴甲桑托斯博塔弗戈2:0首发46-- 2014-11-24巴甲桑托斯圣保罗0:1首发···
浪淘沙山寺夜半闻钟 浪淘沙夜雨

大家好我是小蚪,浪淘沙山寺夜半闻钟,关于浪淘沙夜雨很多人还不知道,那么现在让我们一起来看看吧! 浪淘沙山寺夜半闻钟 浪淘沙夜雨 浪淘沙山寺夜半闻钟 浪淘沙夜雨 1、其三浪淘沙·北戴河···