Schema Promotion

Also known as Reader/Writer schemas, or Schema Resolution.

cavro supports reading Avro messages using a different schema definition than the one the data was written with, provided the standard Avro schema resolution rules are met.
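As a rough illustration of what those rules allow for primitive types, the sketch below encodes the promotions listed in the Avro specification as a lookup table. The names `PROMOTIONS` and `can_promote` are hypothetical helpers for this example only. They are not part of cavro, and the sketch ignores records, unions, aliases, and defaults.

```python
# Primitive promotions permitted by the Avro specification's schema
# resolution rules: writer type -> reader types it may be read as.
# (Illustrative table only; not cavro's internal representation.)
PROMOTIONS = {
    "int": {"long", "float", "double"},
    "long": {"float", "double"},
    "float": {"double"},
    "string": {"bytes"},
    "bytes": {"string"},
}


def can_promote(writer_type: str, reader_type: str) -> bool:
    """Return True if a primitive written as writer_type may be read as reader_type."""
    if writer_type == reader_type:
        return True
    return reader_type in PROMOTIONS.get(writer_type, set())


print(can_promote("long", "double"))  # True
print(can_promote("double", "long"))  # False
```

Promotions only go one way for numeric types: a `long` can be read as a `double`, but a `double` cannot be read as a `long`.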

Resolution is done by calling Schema.reader_for_writer on the reader's schema, passing the writer's schema. This returns a special schema object that can be used for decoding (but not encoding) Avro data:

import cavro

# Schema the data was originally written with
writer_schema = cavro.Schema({
    'type': 'record',
    'name': 'Record',
    'fields': [
        {'name': 'user_id', 'type': 'bytes'},
        {'name': 'ip', 'type': {'type': 'fixed', 'size': 4, 'name': 'IP'}},
        {'name': 'created', 'type': 'long'},
    ]
})
avro_data = writer_schema.binary_encode({'user_id': b'John', 'ip': b'\x7f\x00\x00\x01', 'created': 1234567890})

reader_schema = cavro.Schema({
    'type': 'record',
    'name': 'Record',
    'fields': [
        # Read string instead of bytes
        {'name': 'user_id', 'type': 'string'},
        # Union rather than single type
        {'name': 'ip', 'type': [
            # Rename type with aliases
            {'type': 'fixed', 'size': 4, 'name': 'IPv4', 'aliases': ['IP']},
            {'type': 'fixed', 'size': 16, 'name': 'IPv6'},
        ]},
        # Convert long to double
        {'name': 'created', 'type': 'double'},
        # New field with default
        {'name': 'deleted', 'type': 'boolean', 'default': False}
    ]
})

reader_for_writer = reader_schema.reader_for_writer(writer_schema)

print(reader_for_writer.binary_decode(avro_data))
<Record:Record {user_id: 'John' ip: b'\x7f\x00\x00\x01' created: 1234567890.0 deleted: False...}>

Decoding the same data with the reader schema directly, without resolution, will result in errors or corrupt data:

try:
    reader_schema.binary_decode(avro_data)
except Exception as e:
    print(e)
Value -64 is not valid for a union of 2 items
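The -64 in the error above is consistent with the reader schema treating the first byte of the fixed IP value (0x7f) as a union branch index. Avro encodes union indexes as zigzag variable-length integers, and the writer, which had no union, wrote the four IP bytes directly. The following is a minimal standalone sketch of that decoding step. `decode_zigzag_varint` is a hypothetical helper written for this example, not a cavro function, and the explanation is an inference from the Avro binary encoding rather than from cavro's source.

```python
def decode_zigzag_varint(data: bytes) -> int:
    """Decode one Avro int/long (zigzag varint) from the start of data."""
    raw = 0
    shift = 0
    for byte in data:
        # The low 7 bits of each byte carry payload, least significant group first
        raw |= (byte & 0x7F) << shift
        # A clear high bit marks the last byte of the number
        if not byte & 0x80:
            break
        shift += 7
    # Undo zigzag encoding: even values are non-negative, odd values are negative
    return (raw >> 1) ^ -(raw & 1)


ip_bytes = b"\x7f\x00\x00\x01"  # the fixed 'ip' value written by writer_schema
print(decode_zigzag_varint(ip_bytes))  # -64
```

Because -64 is not a valid index into a two-branch union, decoding fails. With other data the misread bytes could happen to form valid values, which is how silent corruption can arise.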

Object Container promotion

When reading object containers with cavro.ContainerReader, schema resolution is performed automatically based on the writer schema embedded in the container file.

Eager promotion

Unlike most other libraries, cavro performs schema resolution up front, before any Avro data is read, which allows for efficient decode strategies.

This does mean that errors may be raised sooner than otherwise expected. The defer_schema_promotion_errors option postpones promotion errors until a value is actually parsed.

incompatible_schema = cavro.Schema('"int"')

try:
    incompatible_schema.reader_for_writer(writer_schema)
except Exception as e:
    print(e)
Cannot promote int to {'name': 'Record', 'fields': [{'name': 'user_id', 'type': 'bytes'}, {'name': 'ip', 'type': {'name': 'IP', 'size': 4, 'type': 'fixed'}}, {'name': 'created', 'type': 'long'}], 'type': 'record'}

incompatible_schema = cavro.Schema('"int"', defer_schema_promotion_errors=True)
incompatible_reader = incompatible_schema.reader_for_writer(writer_schema)

print('We got a reader schema: ', incompatible_reader)
print('But when a value is read:')
try:
    incompatible_reader.binary_decode(avro_data)
except Exception as e:
    print(e)
We got a reader schema:  <cavro.ResolvedSchema object at 0x164846d90>
But when a value is read:
Cannot promote int to {'name': 'Record', 'fields': [{'name': 'user_id', 'type': 'bytes'}, {'name': 'ip', 'type': {'name': 'IP', 'size': 4, 'type': 'fixed'}}, {'name': 'created', 'type': 'long'}], 'type': 'record'}
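The difference between eager and deferred promotion errors can be pictured with a small library-independent sketch. It assumes a simplified model in which resolution picks a converter function once per (writer, reader) type pair. `CONVERTERS`, `PromotionError`, and `resolve` are hypothetical names for this illustration and do not reflect cavro's actual internals.

```python
from typing import Any, Callable

# A small subset of primitive promotions, mapped to the conversion applied
# to each decoded value. (Illustrative only.)
CONVERTERS = {
    ("int", "long"): int,
    ("int", "double"): float,
    ("long", "double"): float,
    ("float", "double"): float,
    ("bytes", "string"): lambda b: b.decode("utf-8"),
    ("string", "bytes"): lambda s: s.encode("utf-8"),
}


class PromotionError(Exception):
    """Hypothetical error type used by this sketch."""


def resolve(writer: str, reader: str, defer_errors: bool = False) -> Callable[[Any], Any]:
    """Choose a converter once, up front, rather than on every decoded value."""
    if writer == reader:
        return lambda value: value
    converter = CONVERTERS.get((writer, reader))
    if converter is not None:
        return converter
    error = PromotionError(f"Cannot read {writer} data as {reader}")
    if not defer_errors:
        # Eager behaviour: fail at resolution time, before any data is read
        raise error

    def fail(value: Any) -> Any:
        # Deferred behaviour: fail only when a value is actually decoded
        raise error

    return fail


to_double = resolve("long", "double")
print(to_double(1234567890))  # 1234567890.0
```

In this model, `resolve("record", "int")` raises immediately, while `resolve("record", "int", defer_errors=True)` returns a callable that raises only when it is invoked, mirroring the two outputs shown above.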