簡介
kafka內部發送和接收消息的時候,使用的是byte[]字節數組的方式(RPC底層也是用這種通訊格式)。但是我們在應用層其實可以使用更多的數據類型,比如int,short, long,String等,這歸功于kafka的序列化和反序列化機制。
基本原理分析
在之前的一篇文章springboot集成kafka示例中,我使用的是kafka原生的StringSerializer序列化方式,
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
源碼如下:
public class StringSerializer implements Serializer<String> { private String encoding = “UTF8”;
public StringSerializer() { }
public void configure(Map<String, ?> configs, boolean isKey) { String propertyName = isKey ? “key.serializer.encoding” : “value.serializer.encoding”; Object encodingValue = configs.get(propertyName); if (encodingValue == null) { encodingValue = configs.get(“serializer.encoding”); }
if (encodingValue instanceof String) { this.encoding = (String)encodingValue; }
}
public byte[] serialize(String topic, String data) { try { return data == null ? null : data.getBytes(this.encoding); } catch (UnsupportedEncodingException var4) { throw new SerializationException(“Error when serializing string to byte[] due to unsupported encoding ” + this.encoding); } }
public void close() { } }
其實很簡單,configure方法設置序列化(serialize方法)需要使用的編碼,如果沒有設置就使用UTF8格式。這個方法是在生成producer實例的時候被調用的。serialize方法使用的就是String的getBytes把String類型的消息轉化爲byte字節數組。
反序列呢?聰明如你應該能想到,使用new String就可以解決了。源碼如下,
1234567891011121314 @Override public String deserialize(String topic, byte[] data) { try { if (data == null) return null; else return new String(data, encoding); } catch (UnsupportedEncodingException e) { throw new SerializationException(“Error when deserializing byte[] to string due to unsupported encoding ” + encoding); } } ———————
是不是簡單到爆呢?
其它的內置序列化組件,像Double, Integer,Long這些原理都類似,就不一一分析了。
自定義序列化組件
有時候內置的組件不能滿足我們的需要。比如我有個自定義的對象要作爲kafka的消息進行收發(把對象轉化爲json字符串通過String的方式也是一種思路),希望能有一個針對我這個對象自定義的序列化和反序列化組件。
我們先定義一個消息對象,
@Data @ToString public class Person { private int id; private String name; private int age;
}
然後自定義自己的序列化和反序列化實現類,
@Slf4j public class PersonSerializer implements Serializer<Person> { private static Gson gson; static { gson = new GsonBuilder().create(); }
@Override public void configure(Map<String, ?> map, boolean b) { log.info(“自定義的序列化組件–configure”); }
@Override public byte[] serialize(String s, Person person) { log.info(“自定義的序列化組件–serialize”); return JSON.toJSONBytes(person); }
@Override public void close() { log.info(“自定義的序列化組件–close”); } }
@Slf4j public class PersonSerializer implements Serializer<Person> { private static Gson gson; static { gson = new GsonBuilder().create(); }
@Override public void configure(Map<String, ?> map, boolean b) { log.info(“自定義的序列化組件–configure”); }
@Override public byte[] serialize(String s, Person person) { log.info(“自定義的序列化組件–serialize”); return JSON.toJSONBytes(person); }
@Override public void close() { log.info(“自定義的序列化組件–close”); } }
代碼一看就明白,其實核心就是利用fastjson的toJSONBytes把對象轉化爲byte數組。
然後我們在配置裏指定使用我們自己的序列化和反序列化實現類,
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=com.ponymaggie.github.kafka.serializer.PersonSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=com.ponymaggie.github.kafka.serializer.PersonDeserializer