Schema Promotion
Also known as Reader/Writer schemas, or Schema Resolution.
cavro supports reading Avro messages using a different schema from the one the data was written with, provided the standard Avro schema resolution rules are met.
Resolution is done by calling Schema.reader_for_writer on the reader's schema, passing the writer's schema. This returns a special schema object that can be used for decoding (but not encoding) Avro data:
import cavro

writer_schema = cavro.Schema({
    'type': 'record',
    'name': 'Record',
    'fields': [
        {'name': 'user_id', 'type': 'bytes'},
        {'name': 'ip', 'type': {'type': 'fixed', 'size': 4, 'name': 'IP'}},
        {'name': 'created', 'type': 'long'},
    ]
})
avro_data = writer_schema.binary_encode({
    'user_id': b'John',
    'ip': b'\x7f\x00\x00\x01',
    'created': 1234567890,
})
reader_schema = cavro.Schema({
    'type': 'record',
    'name': 'Record',
    'fields': [
        # Read string instead of bytes
        {'name': 'user_id', 'type': 'string'},
        # Union rather than single type
        {'name': 'ip', 'type': [
            # Rename type with aliases
            {'type': 'fixed', 'size': 4, 'name': 'IPv4', 'aliases': ['IP']},
            {'type': 'fixed', 'size': 16, 'name': 'IPv6'},
        ]},
        # Convert long to double
        {'name': 'created', 'type': 'double'},
        # New field with default
        {'name': 'deleted', 'type': 'boolean', 'default': False},
    ]
})
reader_for_writer = reader_schema.reader_for_writer(writer_schema)
print(reader_for_writer.binary_decode(avro_data))
<Record:Record {user_id: 'John' ip: b'\x7f\x00\x00\x01' created: 1234567890.0 deleted: False...}>
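The promotions used above (bytes read as string, long read as double, and a fixed type matched into a union through an alias) follow the rules in the Avro specification. The following is a minimal, standalone sketch of just those checks in plain Python, not cavro's implementation; the names PROMOTIONS, matches and pick_branch are hypothetical, and records, enums, writer-side unions and namespaces are deliberately left out.

```python
# Primitive promotions allowed by the Avro spec (writer type -> reader types).
PROMOTIONS = {
    "int": {"long", "float", "double"},
    "long": {"float", "double"},
    "float": {"double"},
    "string": {"bytes"},
    "bytes": {"string"},
}


def type_name(schema):
    # Schemas here are either a primitive name ("long") or a dict ({"type": ...}).
    return schema if isinstance(schema, str) else schema["type"]


def matches(writer, reader):
    """Return True if data written with `writer` can be read as `reader`.

    Simplified: writer unions, records, enums and namespaces are not handled.
    """
    if isinstance(reader, list):  # the reader is a union
        return pick_branch(writer, reader) is not None
    w, r = type_name(writer), type_name(reader)
    if w == "fixed" and r == "fixed":
        # The writer's name may match the reader's name or one of its aliases.
        names = {reader["name"], *reader.get("aliases", [])}
        return writer["size"] == reader["size"] and writer["name"] in names
    return w == r or r in PROMOTIONS.get(w, set())


def pick_branch(writer, reader_union):
    """Index of the first reader branch matching a non-union writer, else None."""
    for index, branch in enumerate(reader_union):
        if matches(writer, branch):
            return index
    return None


ip_writer = {"type": "fixed", "size": 4, "name": "IP"}
ip_reader = [
    {"type": "fixed", "size": 4, "name": "IPv4", "aliases": ["IP"]},
    {"type": "fixed", "size": 16, "name": "IPv6"},
]
print(pick_branch(ip_writer, ip_reader))  # 0: matched through the 'IP' alias
print(matches("long", "double"), matches("bytes", "string"))  # True True
```

Per the specification, a non-union writer read through a union resolves to the first matching reader branch, which is why the IPv4 branch is chosen here.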
Decoding avro_data with reader_schema directly, without first resolving it against the writer schema, results in errors or corrupt data:
try:
    reader_schema.binary_decode(avro_data)
except Exception as e:
    print(e)
Value -64 is not valid for a union of 2 items
Object Container promotion
When reading object containers with cavro.ContainerReader, schema resolution is performed automatically based on the writer schema embedded in the container file.
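Per the Avro specification, an object container file starts with the magic bytes Obj\x01, followed by a metadata map whose avro.schema entry holds the writer schema as JSON, and then a 16-byte sync marker. A reader therefore always has the writer schema available before it decodes any records. Here is a minimal standard-library sketch of writing and parsing such a header; the function names are hypothetical, and this is not how cavro.ContainerReader is implemented internally.

```python
import json

MAGIC = b"Obj\x01"  # Avro object container magic bytes


def encode_long(n):
    z = (n << 1) ^ (n >> 63)  # zigzag, then base-128 varint
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)
        z >>= 7
    out.append(z)
    return bytes(out)


def decode_long(buf, pos):
    shift = z = 0
    while True:
        b = buf[pos]
        pos += 1
        z |= (b & 0x7F) << shift
        shift += 7
        if not b & 0x80:
            return (z >> 1) ^ -(z & 1), pos


def encode_bytes(b):
    return encode_long(len(b)) + b


def decode_bytes(buf, pos):
    n, pos = decode_long(buf, pos)
    return buf[pos:pos + n], pos + n


def write_header(schema, sync):
    """Build a container header embedding `schema` as the writer schema."""
    meta = {"avro.schema": json.dumps(schema).encode(), "avro.codec": b"null"}
    out = bytearray(MAGIC)
    out += encode_long(len(meta))  # a single map block holding every entry
    for key, value in meta.items():
        out += encode_bytes(key.encode()) + encode_bytes(value)
    out += encode_long(0)  # a zero count ends the map
    out += sync  # 16-byte sync marker
    return bytes(out)


def read_writer_schema(buf):
    """Parse the header metadata map and return the embedded writer schema."""
    if buf[:4] != MAGIC:
        raise ValueError("not an Avro object container file")
    pos, meta = 4, {}
    while True:
        count, pos = decode_long(buf, pos)
        if count == 0:
            break
        if count < 0:  # negative count: a block size in bytes follows
            count = -count
            _, pos = decode_long(buf, pos)
        for _ in range(count):
            key, pos = decode_bytes(buf, pos)
            value, pos = decode_bytes(buf, pos)
            meta[key.decode()] = value
    return json.loads(meta["avro.schema"])


schema = {"type": "record", "name": "Record",
          "fields": [{"name": "created", "type": "long"}]}
header = write_header(schema, b"\x00" * 16)
print(read_writer_schema(header) == schema)  # True
```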
Eager promotion
Unlike most other Avro libraries, cavro performs schema resolution up front, before any Avro data is read, which allows for efficient decoding strategies.
This does mean that promotion errors may be raised earlier than you might expect: when reader_for_writer is called, rather than when a value is decoded. The defer_schema_promotion_errors option, passed when constructing the reader schema, postpones promotion errors until a value is actually decoded.
incompatible_schema = cavro.Schema('"int"')
try:
    incompatible_schema.reader_for_writer(writer_schema)
except Exception as e:
    print(e)
Cannot promote int to {'name': 'Record', 'fields': [{'name': 'user_id', 'type': 'bytes'}, {'name': 'ip', 'type': {'name': 'IP', 'size': 4, 'type': 'fixed'}}, {'name': 'created', 'type': 'long'}], 'type': 'record'}
incompatible_schema = cavro.Schema('"int"', defer_schema_promotion_errors=True)
incompatible_reader = incompatible_schema.reader_for_writer(writer_schema)
print('We got a reader schema: ', incompatible_reader)
print('But when a value is read:')
try:
    incompatible_reader.binary_decode(avro_data)
except Exception as e:
    print(e)
We got a reader schema: <cavro.ResolvedSchema object at 0x164846d90>
But when a value is read:
Cannot promote int to {'name': 'Record', 'fields': [{'name': 'user_id', 'type': 'bytes'}, {'name': 'ip', 'type': {'name': 'IP', 'size': 4, 'type': 'fixed'}}, {'name': 'created', 'type': 'long'}], 'type': 'record'}
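The difference between eager and deferred promotion errors can be illustrated with a toy resolver that turns a writer/reader pair into a decode function once, up front. This is a hypothetical sketch covering only a few numeric primitives; build_decoder and its defer_errors flag are illustrative names that merely mirror the idea behind cavro's defer_schema_promotion_errors option, not its actual code.

```python
import struct


def read_varlong(buf, pos):
    """Decode an Avro zigzag varint (the encoding for both int and long)."""
    shift = z = 0
    while True:
        b = buf[pos]
        pos += 1
        z |= (b & 0x7F) << shift
        shift += 7
        if not b & 0x80:
            return (z >> 1) ^ -(z & 1), pos


def read_double(buf, pos):
    """Decode an Avro double: 8 bytes, little-endian IEEE 754."""
    return struct.unpack_from("<d", buf, pos)[0], pos + 8


# Toy tables: how to read each writer type, and how to convert it per reader type.
WRITER_READERS = {"int": read_varlong, "long": read_varlong, "double": read_double}
CONVERTERS = {
    ("int", "int"): int, ("int", "long"): int, ("int", "double"): float,
    ("long", "long"): int, ("long", "double"): float,
    ("double", "double"): float,
}


def build_decoder(writer, reader, defer_errors=False):
    """Resolve once, up front, into a decode(buf, pos) -> (value, pos) function."""
    convert = CONVERTERS.get((writer, reader))
    if convert is None:
        message = f"Cannot read {writer} as {reader}"
        if not defer_errors:
            raise TypeError(message)  # eager: fail before any data is read

        def failing_decode(buf, pos):
            raise TypeError(message)  # deferred: fail only when a value is decoded

        return failing_decode
    read = WRITER_READERS[writer]

    def decode(buf, pos):
        value, pos = read(buf, pos)
        return convert(value), pos

    return decode


decode = build_decoder("long", "double")
print(decode(b"\x02", 0))  # (1.0, 1)
deferred = build_decoder("double", "int", defer_errors=True)  # no error yet
```

Because the per-value work is reduced to a read plus a conversion chosen in advance, an incompatible pair can only be detected at resolution time unless the error is deliberately deferred, which is the trade-off described above.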