Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 70 additions & 1 deletion lang/java/avro/src/main/java/org/apache/avro/Resolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import org.apache.avro.generic.GenericData;
import org.apache.avro.Schema.Field;
Expand Down Expand Up @@ -110,6 +111,9 @@ private static Action resolve(Schema w, Schema r, GenericData d, Map<SeenPair, A
return EnumAdjust.resolve(w, r, d);

case RECORD:
if (w.hasChild()) {
return WriterExtends.resolve(w, r, d, seen);
}
return RecordAdjust.resolve(w, r, d, seen);

default:
Expand All @@ -133,7 +137,8 @@ private static Action resolve(Schema w, Schema r, GenericData d, Map<SeenPair, A
public static abstract class Action {
/** Helps us traverse faster. */
public enum Type {
DO_NOTHING, ERROR, PROMOTE, CONTAINER, ENUM, SKIP, RECORD, WRITER_UNION, READER_UNION
DO_NOTHING, ERROR, PROMOTE, CONTAINER, ENUM, SKIP, RECORD, WRITER_UNION, READER_UNION, WRITER_EXTENDS,
READER_EXTENDS
}

public final Schema writer, reader;
Expand Down Expand Up @@ -797,4 +802,68 @@ private static boolean unionEquiv(Schema write, Schema read, Map<SeenPair, Boole
throw new IllegalArgumentException("Unknown schema type: " + write.getType());
}
}

public static class WriterExtends extends Action {
public final Action[] actions;

private WriterExtends(Schema w, Schema r, GenericData d, Action[] a) {
super(w, r, d, Type.WRITER_EXTENDS);
actions = a;
}

public static Action resolve(Schema writeSchema, Schema readSchema, GenericData data, Map<SeenPair, Action> seen) {

final Action[] actions = writeSchema.visitHierarchy()
.map((Schema schema) -> RecordAdjust.resolve(schema, schema, data, seen)).toArray(Action[]::new);
return new WriterExtends(writeSchema, readSchema, data, actions);
}
}

/**
* In this case, the reader is a union and the writer is not. For this case, we
* need to pick the first branch of the reader that matches the writer and
* pretend to the reader that the index of this branch was found in the writer's
* data stream.
*
* To support this case, the {@link ReaderUnion} object has two (public) fields:
* <tt>firstMatch</tt> gives the index of the first matching branch in the
* reader's schema, and <tt>actualResolution</tt> is the {@link Action} that
* resolves the writer's schema with the schema found in the <tt>firstMatch</tt>
* branch of the reader's schema.
*/
public static class ReaderExtends extends Action {
public final Schema firstMatch;
public final Action actualAction;

public ReaderExtends(Schema w, Schema r, GenericData d, Schema firstMatch, Action actual) {
super(w, r, d, Action.Type.READER_EXTENDS);
this.firstMatch = firstMatch;
this.actualAction = actual;
}

/**
* Returns a {@link ReaderUnion} action for resolving <tt>w</tt> and <tt>r</tt>,
* or an {@link ErrorAction} if there is no branch in the reader that matches
* the writer.
*
* @throws RuntimeException if <tt>r</tt> is not a union schema or <tt>w</tt>
* <em>is</em> a union schema
*/
public static Action resolve(Schema w, Schema r, GenericData d, Map<SeenPair, Action> seen) {
if (w.getType() == Schema.Type.RECORD) {
throw new IllegalArgumentException("Writer schema is not Record.");
}
Schema schema = firstMatchingChild(w, r);
if (schema != null) {
return new ReaderExtends(w, r, d, schema, Resolver.resolve(w, schema, d, seen));
}
return new ErrorAction(w, r, d, ErrorType.NO_MATCHING_BRANCH);
}

private static Schema firstMatchingChild(Schema w, Schema r) {
return r.visitHierarchy().filter((Schema rs) -> Objects.equals(rs.getFullName(), w.getFullName())).findFirst()
.orElse(null);
}
}

}
Loading