網(wǎng)上有很多關(guān)于pos機(jī)t0和t1消費(fèi),Kafka分區(qū)與消費(fèi)者的關(guān)系的知識,也有很多人為大家解答關(guān)于pos機(jī)t0和t1消費(fèi)的問題,今天pos機(jī)之家(m.51zrwd.com)為大家整理了關(guān)于這方面的知識,讓我們一起來看下吧!
本文目錄一覽:
pos機(jī)t0和t1消費(fèi)
1. 前言
我們知道,生產(chǎn)者發(fā)送消息到主題,消費(fèi)者訂閱主題(以消費(fèi)者組的名義訂閱),而主題下是分區(qū),消息是存儲在分區(qū)中的,所以事實(shí)上生產(chǎn)者發(fā)送消息到分區(qū),消費(fèi)者則從分區(qū)讀取消息,那么,這里問題來了,生產(chǎn)者將消息投遞到哪個(gè)分區(qū)?消費(fèi)者組中的消費(fèi)者實(shí)例之間是怎么分配分區(qū)的呢?接下來,就圍繞著這兩個(gè)問題一探究竟。
2. 主題的分區(qū)數(shù)設(shè)置
如果想學(xué)習(xí)Java工程化、高性能及分布式、深入淺出。微服務(wù)、Spring,MyBatis,Netty源碼分析的朋友可以加我的Java高級交流:854630135,群里有阿里大牛直播講解技術(shù),以及Java大型互聯(lián)網(wǎng)技術(shù)的視頻免費(fèi)分享給大家。
在server.properties配置文件中可以指定一個(gè)全局的分區(qū)數(shù)設(shè)置,這是對每個(gè)主題下的分區(qū)數(shù)的默認(rèn)設(shè)置,默認(rèn)是1。
當(dāng)然每個(gè)主題也可以自己設(shè)置分區(qū)數(shù)量,如果創(chuàng)建主題的時(shí)候沒有指定分區(qū)數(shù)量,則會使用server.properties中的設(shè)置。
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 2 --replication-factor 1
在創(chuàng)建主題的時(shí)候,可以使用--partitions選項(xiàng)指定主題的分區(qū)數(shù)量
[root@localhost kafka_2.11-2.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic abcTopic:abc PartitionCount:2 ReplicationFactor:1 Configs: Topic: abc Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: abc Partition: 1 Leader: 0 Replicas: 0 Isr: 0
3. 生產(chǎn)者與分區(qū)
首先提出一個(gè)問題:生產(chǎn)者將消息投遞到分區(qū)有沒有規(guī)律?如果有,那么它是如何決定一條消息該投遞到哪個(gè)分區(qū)的呢?
3.1. 默認(rèn)的分區(qū)策略
The default partitioning strategy:
If a partition is specified in the record, use itIf no partition is specified but a key is present choose a partition based on a hash of the keyIf no partition or key is present choose a partition in a round-robin fashionorg.apache.kafka.clients.producer.internals.DefaultPartitioner
默認(rèn)的分區(qū)策略是:
如果在發(fā)消息的時(shí)候指定了分區(qū),則消息投遞到指定的分區(qū)如果沒有指定分區(qū),但是消息的key不為空,則基于key的哈希值來選擇一個(gè)分區(qū)如果既沒有指定分區(qū),且消息的key也是空,則用輪詢的方式選擇一個(gè)分區(qū)如果想學(xué)習(xí)Java工程化、高性能及分布式、深入淺出。微服務(wù)、Spring,MyBatis,Netty源碼分析的朋友可以加我的Java高級交流:854630135,群里有阿里大牛直播講解技術(shù),以及Java大型互聯(lián)網(wǎng)技術(shù)的視頻免費(fèi)分享給大家。
/** * Compute the partition for the given record. * * @param topic The topic name * @param key The key to partition on (or null if no key) * @param keyBytes serialized key to partition on (or null if no key) * @param value The value to partition on or null * @param valueBytes serialized value to partition on or null * @param cluster The current cluster metadata */public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = nextValue(topic); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; }}
通過源代碼可以更加作證這一點(diǎn)
4. 分區(qū)與消費(fèi)者
消費(fèi)者以組的名義訂閱主題,主題有多個(gè)分區(qū),消費(fèi)者組中有多個(gè)消費(fèi)者實(shí)例,那么消費(fèi)者實(shí)例和分區(qū)之前的對應(yīng)關(guān)系是怎樣的呢?
換句話說,就是組中的每一個(gè)消費(fèi)者負(fù)責(zé)那些分區(qū),這個(gè)分配關(guān)系是如何確定的呢?
同一時(shí)刻,一條消息只能被組中的一個(gè)消費(fèi)者實(shí)例消費(fèi)
消費(fèi)者組訂閱這個(gè)主題,意味著主題下的所有分區(qū)都會被組中的消費(fèi)者消費(fèi)到,如果按照從屬關(guān)系來說的話就是,主題下的每個(gè)分區(qū)只從屬于組中的一個(gè)消費(fèi)者,不可能出現(xiàn)組中的兩個(gè)消費(fèi)者負(fù)責(zé)同一個(gè)分區(qū)。
那么,問題來了。如果分區(qū)數(shù)大于或者等于組中的消費(fèi)者實(shí)例數(shù),那自然沒有什么問題,無非一個(gè)消費(fèi)者會負(fù)責(zé)多個(gè)分區(qū),(PS:當(dāng)然,最理想的情況是二者數(shù)量相等,這樣就相當(dāng)于一個(gè)消費(fèi)者負(fù)責(zé)一個(gè)分區(qū));但是,如果消費(fèi)者實(shí)例的數(shù)量大于分區(qū)數(shù),那么按照默認(rèn)的策略(PS:之所以強(qiáng)調(diào)默認(rèn)策略是因?yàn)槟阋部梢宰远x策略),有一些消費(fèi)者是多余的,一直接不到消息而處于空閑狀態(tài)。
話又說回來,假設(shè)多個(gè)消費(fèi)者負(fù)責(zé)同一個(gè)分區(qū),那么會有什么問題呢?
我們知道,Kafka它在設(shè)計(jì)的時(shí)候就是要保證分區(qū)下消息的順序,也就是說消息在一個(gè)分區(qū)中的順序是怎樣的,那么消費(fèi)者在消費(fèi)的時(shí)候看到的就是什么樣的順序,那么要做到這一點(diǎn)就首先要保證消息是由消費(fèi)者主動拉取的(pull),其次還要保證一個(gè)分區(qū)只能由一個(gè)消費(fèi)者負(fù)責(zé)。倘若,兩個(gè)消費(fèi)者負(fù)責(zé)同一個(gè)分區(qū),那么就意味著兩個(gè)消費(fèi)者同時(shí)讀取分區(qū)的消息,由于消費(fèi)者自己可以控制讀取消息的offset,就有可能C1才讀到2,而C1讀到1,C1還沒處理完,C2已經(jīng)讀到3了,則會造成很多浪費(fèi),因?yàn)檫@就相當(dāng)于多線程讀取同一個(gè)消息,會造成消息處理的重復(fù),且不能保證消息的順序,這就跟主動推送(push)無異。
4.1. 消費(fèi)者分區(qū)分配策略
如果想學(xué)習(xí)Java工程化、高性能及分布式、深入淺出。微服務(wù)、Spring,MyBatis,Netty源碼分析的朋友可以加我的Java高級交流:854630135,群里有阿里大牛直播講解技術(shù),以及Java大型互聯(lián)網(wǎng)技術(shù)的視頻免費(fèi)分享給大家。
org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor
如果是自定義分配策略的話可以繼承AbstractPartitionAssignor這個(gè)類,它默認(rèn)有3個(gè)實(shí)現(xiàn)
4.1.1. range
range策略對應(yīng)的實(shí)現(xiàn)類是org.apache.kafka.clients.consumer.RangeAssignor
這是默認(rèn)的分配策略
可以通過消費(fèi)者配置中partition.assignment.strategy參數(shù)來指定分配策略,它的值是類的全路徑,是一個(gè)數(shù)組
/** * The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order * and the consumers in lexicographic order. We then divide the number of partitions by the total number of * consumers to determine the number of partitions to assign to each consumer. If it does not evenly * divide, then the first few consumers will have one extra partition. * * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions, * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2. * * The assignment will be: * C0: [t0p0, t0p1, t1p0, t1p1] * C1: [t0p2, t1p2] */
range策略是基于每個(gè)主題的
對于每個(gè)主題,我們以數(shù)字順序排列可用分區(qū),以字典順序排列消費(fèi)者。然后,將分區(qū)數(shù)量除以消費(fèi)者總數(shù),以確定分配給每個(gè)消費(fèi)者的分區(qū)數(shù)量。如果沒有平均劃分(PS:除不盡),那么最初的幾個(gè)消費(fèi)者將有一個(gè)額外的分區(qū)。
簡而言之,就是,
1、range分配策略針對的是主題(PS:也就是說,這里所說的分區(qū)指的某個(gè)主題的分區(qū),消費(fèi)者值的是訂閱這個(gè)主題的消費(fèi)者組中的消費(fèi)者實(shí)例)
2、首先,將分區(qū)按數(shù)字順序排行序,消費(fèi)者按消費(fèi)者名稱的字典序排好序
3、然后,用分區(qū)總數(shù)除以消費(fèi)者總數(shù)。如果能夠除盡,則皆大歡喜,平均分配;若除不盡,則位于排序前面的消費(fèi)者將多負(fù)責(zé)一個(gè)分區(qū)
例如,假設(shè)有兩個(gè)消費(fèi)者C0和C1,兩個(gè)主題t0和t1,并且每個(gè)主題有3個(gè)分區(qū),分區(qū)的情況是這樣的:t0p0,t0p1,t0p2,t1p0,t1p1,t1p2
那么,基于以上信息,最終消費(fèi)者分配分區(qū)的情況是這樣的:
C0: [t0p0, t0p1, t1p0, t1p1]
C1: [t0p2, t1p2]
為什么是這樣的結(jié)果呢?
因?yàn)椋瑢τ谥黝}t0,分配的結(jié)果是C0負(fù)責(zé)P0和P1,C1負(fù)責(zé)P2;對于主題t2,也是如此,綜合起來就是這個(gè)結(jié)果
上面的過程用圖形表示的話大概是這樣的:
閱讀代碼,更有助于理解:
如果想學(xué)習(xí)Java工程化、高性能及分布式、深入淺出。微服務(wù)、Spring,MyBatis,Netty源碼分析的朋友可以加我的Java高級交流:854630135,群里有阿里大牛直播講解技術(shù),以及Java大型互聯(lián)網(wǎng)技術(shù)的視頻免費(fèi)分享給大家。
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, Map<String, Subscription> subscriptions) { // 主題與消費(fèi)者的映射 Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions); Map<String, List<TopicPartition>> assignment = new HashMap<>(); for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList<TopicPartition>()); for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) { String topic = topicEntry.getKey(); // 主題 List<String> consumersForTopic = topicEntry.getValue(); // 消費(fèi)者列表 // partitionsPerTopic表示主題和分區(qū)數(shù)的映射 // 獲取主題下有多少個(gè)分區(qū) Integer numPartitionsForTopic = partitionsPerTopic.get(topic); if (numPartitionsForTopic == null) continue; // 消費(fèi)者按字典序排序 Collections.sort(consumersForTopic); // 分區(qū)數(shù)量除以消費(fèi)者數(shù)量 int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size(); // 取模,余數(shù)就是額外的分區(qū) int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size(); List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic); for (int i = 0, n = consumersForTopic.size(); i < n; i++) { int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition); int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1); // 分配分區(qū) assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length)); } } return assignment;}
4.1.2. roundrobin(輪詢)
roundronbin分配策略的具體實(shí)現(xiàn)是org.apache.kafka.clients.consumer.RoundRobinAssignor
/** * The round robin assignor lays out all the available partitions and all the available consumers. It * then proceeds to do a round robin assignment from partition to consumer. If the subscriptions of all consumer * instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts * will be within a delta of exactly one across all consumers.) * * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions, * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2. * * The assignment will be: * C0: [t0p0, t0p2, t1p1] * C1: [t0p1, t1p0, t1p2] * * When subscriptions differ across consumer instances, the assignment process still considers each * consumer instance in round robin fashion but skips over an instance if it is not subscribed to * the topic. Unlike the case when subscriptions are identical, this can result in imbalanced * assignments. For example, we have three consumers C0, C1, C2, and three topics t0, t1, t2, * with 1, 2, and 3 partitions, respectively. Therefore, the partitions are t0p0, t1p0, t1p1, t2p0, * t2p1, t2p2. C0 is subscribed to t0; C1 is subscribed to t0, t1; and C2 is subscribed to t0, t1, t2. * * Tha assignment will be: * C0: [t0p0] * C1: [t1p0] * C2: [t1p1, t2p0, t2p1, t2p2] */
輪詢分配策略是基于所有可用的消費(fèi)者和所有可用的分區(qū)的
與前面的range策略最大的不同就是它不再局限于某個(gè)主題
如果所有的消費(fèi)者實(shí)例的訂閱都是相同的,那么這樣最好了,可用統(tǒng)一分配,均衡分配
例如,假設(shè)有兩個(gè)消費(fèi)者C0和C1,兩個(gè)主題t0和t1,每個(gè)主題有3個(gè)分區(qū),分別是t0p0,t0p1,t0p2,t1p0,t1p1,t1p2
那么,最終分配的結(jié)果是這樣的:
C0: [t0p0, t0p2, t1p1]
C1: [t0p1, t1p0, t1p2]
用圖形表示大概是這樣的:
假設(shè),組中每個(gè)消費(fèi)者訂閱的主題不一樣,分配過程仍然以輪詢的方式考慮每個(gè)消費(fèi)者實(shí)例,但是如果沒有訂閱主題,則跳過實(shí)例。當(dāng)然,這樣的話分配肯定不均衡。
什么意思呢?也就是說,消費(fèi)者組是一個(gè)邏輯概念,同組意味著同一時(shí)刻分區(qū)只能被一個(gè)消費(fèi)者實(shí)例消費(fèi),換句話說,同組意味著一個(gè)分區(qū)只能分配給組中的一個(gè)消費(fèi)者。事實(shí)上,同組也可以不同訂閱,這就是說雖然屬于同一個(gè)組,但是它們訂閱的主題可以是不一樣的。
例如,假設(shè)有3個(gè)主題t0,t1,t2;其中,t0有1個(gè)分區(qū)p0,t1有2個(gè)分區(qū)p0和p1,t2有3個(gè)分區(qū)p0,p1和p2;有3個(gè)消費(fèi)者C0,C1和C2;C0訂閱t0,C1訂閱t0和t1,C2訂閱t0,t1和t2。那么,按照輪詢分配的話,C0應(yīng)該負(fù)責(zé)
首先,肯定是輪詢的方式,其次,比如說有主題t0,t1,t2,它們分別有1,2,3個(gè)分區(qū),也就是t0有1個(gè)分區(qū),t1有2個(gè)分區(qū),t2有3個(gè)分區(qū);有3個(gè)消費(fèi)者分別從屬于3個(gè)組,C0訂閱t0,C1訂閱t0和t1,C2訂閱t0,t1,t2;那么,按照輪詢分配的話,C0應(yīng)該負(fù)責(zé)t0p0,C1應(yīng)該負(fù)責(zé)t1p0,其余均由C2負(fù)責(zé)。
上述過程用圖形表示大概是這樣的:
為什么最后的結(jié)果是
C0: [t0p0]
C1: [t1p0]
C2: [t1p1, t2p0, t2p1, t2p2]
這是因?yàn)椋凑蛰喸僼0p1由C0負(fù)責(zé),t1p0由C1負(fù)責(zé),由于同組,C2只能負(fù)責(zé)t1p1,由于只有C2訂閱了t2,所以t2所有分區(qū)由C2負(fù)責(zé),綜合起來就是這個(gè)結(jié)果
細(xì)想一下可以發(fā)現(xiàn),這種情況下跟range分配的結(jié)果是一樣的
5. 測試代碼
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.cjs.example</groupId> <artifactId>kafka-demo</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>kafka-demo</name> <description></description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.5.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build></project>
package com.cjs.kafka.producer;import org.apache.kafka.clients.producer.*;import java.util.Properties;public class HelloProducer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.1.133:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<String, String>(props); for (int i = 0; i < 100; i++) { producer.send(new ProducerRecord<String, String>("abc", Integer.toString(i), Integer.toString(i)), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (null != e) { e.printStackTrace(); }else { System.out.println("callback: " + recordMetadata.topic() + " " + recordMetadata.partition() + " " + recordMetadata.offset()); } } }); } producer.close(); }}
如果想學(xué)習(xí)Java工程化、高性能及分布式、深入淺出。微服務(wù)、Spring,MyBatis,Netty源碼分析的朋友可以加我的Java高級交流:854630135,群里有阿里大牛直播講解技術(shù),以及Java大型互聯(lián)網(wǎng)技術(shù)的視頻免費(fèi)分享給大家。
package com.cjs.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays;import java.util.Properties;public class HelloConsumer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.1.133:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000");// props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList("foo", "bar", "abc")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("partition = %s, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); } } }}
以上就是關(guān)于pos機(jī)t0和t1消費(fèi),Kafka分區(qū)與消費(fèi)者的關(guān)系的知識,后面我們會繼續(xù)為大家整理關(guān)于pos機(jī)t0和t1消費(fèi)的知識,希望能夠幫助到大家!









