Menu
快讀
  • 旅遊
  • 生活
    • 美食
    • 寵物
    • 養生
    • 親子
  • 娛樂
    • 動漫
  • 時尚
  • 社會
  • 探索
  • 故事
  • 科技
  • 軍事
  • 国际
快讀

kafka的序列化和反序列化

2021 年 3 月 11 日 母婴放大镜

簡介

kafka內部發送和接收消息的時候,使用的是byte[]字節數組的方式(RPC底層也是用這種通訊格式)。但是我們在應用層其實可以使用更多的數據類型,比如int,short, long,String等,這歸功于kafka的序列化和反序列化機制。

基本原理分析

在之前的一篇文章springboot集成kafka示例中,我使用的是kafka原生的StringSerializer序列化方式,

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

源碼如下:

public class StringSerializer implements Serializer<String> { private String encoding = “UTF8”;

public StringSerializer() { }

public void configure(Map<String, ?> configs, boolean isKey) { String propertyName = isKey ? “key.serializer.encoding” : “value.serializer.encoding”; Object encodingValue = configs.get(propertyName); if (encodingValue == null) { encodingValue = configs.get(“serializer.encoding”); }

if (encodingValue instanceof String) { this.encoding = (String)encodingValue; }

}

public byte[] serialize(String topic, String data) { try { return data == null ? null : data.getBytes(this.encoding); } catch (UnsupportedEncodingException var4) { throw new SerializationException(“Error when serializing string to byte[] due to unsupported encoding ” + this.encoding); } }

public void close() { } }

其實很簡單,configure方法設置序列化(serialize方法)需要使用的編碼,如果沒有設置就使用UTF8格式。這個方法是在生成producer實例的時候被調用的。serialize方法使用的就是String的getBytes把String類型的消息轉化爲byte字節數組。

反序列呢?聰明如你應該能想到,使用new String就可以解決了。源碼如下,

1234567891011121314 @Override public String deserialize(String topic, byte[] data) { try { if (data == null) return null; else return new String(data, encoding); } catch (UnsupportedEncodingException e) { throw new SerializationException(“Error when deserializing byte[] to string due to unsupported encoding ” + encoding); } } ———————

是不是簡單到爆呢?

其它的內置序列化組件,像Double, Integer,Long這些原理都類似,就不一一分析了。

自定義序列化組件

有時候內置的組件不能滿足我們的需要。比如我有個自定義的對象要作爲kafka的消息進行收發(把對象轉化爲json字符串通過String的方式也是一種思路),希望能有一個針對我這個對象自定義的序列化和反序列化組件。

我們先定義一個消息對象,

@Data @ToString public class Person { private int id; private String name; private int age;

}

然後自定義自己的序列化和反序列化實現類,

@Slf4j public class PersonSerializer implements Serializer<Person> { private static Gson gson; static { gson = new GsonBuilder().create(); }

@Override public void configure(Map<String, ?> map, boolean b) { log.info(“自定義的序列化組件–configure”); }

@Override public byte[] serialize(String s, Person person) { log.info(“自定義的序列化組件–serialize”); return JSON.toJSONBytes(person); }

@Override public void close() { log.info(“自定義的序列化組件–close”); } }

@Slf4j public class PersonSerializer implements Serializer<Person> { private static Gson gson; static { gson = new GsonBuilder().create(); }

@Override public void configure(Map<String, ?> map, boolean b) { log.info(“自定義的序列化組件–configure”); }

@Override public byte[] serialize(String s, Person person) { log.info(“自定義的序列化組件–serialize”); return JSON.toJSONBytes(person); }

@Override public void close() { log.info(“自定義的序列化組件–close”); } }

代碼一看就明白,其實核心就是利用fastjson的toJSONBytes把對象轉化爲byte數組。

然後我們在配置裏指定使用我們自己的序列化和反序列化實現類,

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=com.ponymaggie.github.kafka.serializer.PersonSerializer

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=com.ponymaggie.github.kafka.serializer.PersonDeserializer

相關文章:

  • 人教精通版英語四年級上冊全冊知識點
  • 8.2 新加坡折扣合輯 |…
  • NBT:宏基因組二、三代混合組裝軟件OPERA-MS
  • 程序員:單個TCP(Socket)連接,發送多個文件
  • 新加坡"黑暗"餐廳--NOX Dine In The Dark
  • Pokémon Center Singapore推出3D伊布玩偶襪子!萌到炸裂,皇後表示立馬來一打!
時尚

發佈留言 取消回覆

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *

©2025 快讀 | 服務協議 | DMCA | 聯繫我們