在處理一個(gè)需要與 Confluent 平臺集成的大型項(xiàng)目時(shí),我遇到了一個(gè)棘手的問題:如何確保在 Avro 序列化格式下,模式演變不會影響下游消費(fèi)者。Confluent 開發(fā)的 Schema Registry 能夠通過驗(yàn)證模式演變來解決這個(gè)問題,但遺憾的是,Confluent 并沒有為 php 提供官方的 Avro SerDe 包。經(jīng)過一番研究,我找到了 flix-tech/avro-serde-php 這個(gè)庫,它不僅實(shí)現(xiàn)了 Confluent 的線格式,還集成了 FlixTech 的 Schema Registry 客戶端,徹底解決了我的問題。
可以通過以下地址學(xué)習(xí) composer:學(xué)習(xí)地址
使用 composer 安裝 flix-tech/avro-serde-php 非常簡單,只需運(yùn)行以下命令:
composer require 'flix-tech/avro-serde-php:^2.2'
接下來,我將介紹如何使用這個(gè)庫來解決 Avro 序列化和反序列化的問題。
首先,需要創(chuàng)建一個(gè)緩存的 Schema Registry 客戶端,以避免每次序列化或反序列化消息時(shí)都進(jìn)行 http 請求:
use FlixTechSchemaRegistryApiRegistryCacheAvroObjectCacheAdapter; use FlixTechSchemaRegistryApiRegistryCachedRegistry; use FlixTechSchemaRegistryApiRegistryPromisingRegistry; use GuzzleHttpClient; $schemaRegistryClient = new CachedRegistry( new PromisingRegistry( new Client(['base_uri' => 'registry.example.com']) ), new AvroObjectCacheAdapter() );
然后,構(gòu)建 RecordSerializer 實(shí)例,這是與庫交互的主要方式,它提供了 encodeRecord 和 decodeMessage 方法來執(zhí)行序列化和反序列化操作:
立即學(xué)習(xí)“PHP免費(fèi)學(xué)習(xí)筆記(深入)”;
use FlixTechAvroSerializerObjectsRecordSerializer; $recordSerializer = new RecordSerializer( $schemaRegistry, [ RecordSerializer::OPTION_REGISTER_MISSING_SCHEMAS => false, RecordSerializer::OPTION_REGISTER_MISSING_SUBJECTS => false, ] );
使用 RecordSerializer 可以輕松地編碼和解碼消息。例如,編碼記錄:
$subject = 'my-topic-value'; $avroSchema = AvroSchema::parse('{"type": "string"}'); $record = 'Test message'; $encodedBinaryAvro = $recordSerializer->encodeRecord($subject, $avroSchema, $record); // 發(fā)送到網(wǎng)絡(luò)...
解碼消息:
$record = $recordSerializer->decodeMessage($encodedBinaryAvro); echo $record; // 'Test message'
此外,flix-tech/avro-serde-php 還提供了多種 Schema Resolver 來管理 Avro 模式。例如,F(xiàn)ileResolver 可以從文件中加載模式,CallableResolver 可以使用自定義函數(shù)來獲取模式,而 DefinitionInterfaceResolver 則通過實(shí)現(xiàn) HasSchemaDefinitionInterface 接口來解析模式。
如果你使用 symfony Serializer 組件,這個(gè)庫還提供了相應(yīng)的集成:
use FlixTechAvroSerializerIntegrationsSymfonySerializerAvroSerDeEncoder; use FlixTechAvroSerializerObjectsDefaultRecordSerializerFactory; use SymfonyComponentSerializerNormalizerGetSetMethodNormalizer; use SymfonyComponentSerializerSerializer; $recordSerializer = DefaultRecordSerializerFactory::get( getenv('SCHEMA_REGISTRY_HOST') ); $normalizer = new GetSetMethodNormalizer(); $encoder = new AvroSerDeEncoder($recordSerializer); $symfonySerializer = new Serializer([$normalizer], [$encoder]);
最后,這個(gè)庫還提供了 Schema Builder 和 Schema Generator 功能,允許你使用 PHP 代碼定義和生成 Avro 模式。
使用 flix-tech/avro-serde-php 庫解決了我的 Avro 序列化和反序列化問題,使得我的項(xiàng)目能夠順利與 Confluent 平臺集成。它的靈活性和強(qiáng)大功能大大提高了開發(fā)效率,確保了模式演變的兼容性。我強(qiáng)烈推薦任何需要在 PHP 中處理 Avro 序列化的開發(fā)者使用這個(gè)庫。