1010"""Ingest some Avro records, and report how long it takes"""
1111
1212import argparse
13+ import json
1314import os
1415import time
1516from typing import IO , NamedTuple
@@ -87,21 +88,14 @@ def main() -> None:
8788 type = int ,
8889 help = "Number of Avro records to generate" ,
8990 )
90- parser .add_argument (
91- "-d" ,
92- "--distribution" ,
93- default = "benchmark" ,
94- type = str ,
95- help = "Distribution to use in kafka-avro-generator" ,
96- )
9791 args = parser .parse_args ()
9892
9993 os .chdir (ROOT )
10094 repo = mzbuild .Repository (ROOT )
10195
10296 wait_for_confluent (args .confluent_host )
10397
104- images = ["kafka-avro-generator " , "materialized" ]
98+ images = ["kgen " , "materialized" ]
10599 deps = repo .resolve_dependencies ([repo .images [name ] for name in images ])
106100 deps .acquire ()
107101
@@ -114,18 +108,18 @@ def main() -> None:
114108 )
115109
116110 docker_client .containers .run (
117- deps ["kafka-avro-generator " ].spec (),
111+ deps ["kgen " ].spec (),
118112 [
119- "-n " ,
120- str ( args .records ) ,
121- "-b " ,
122- f" { args . confluent_host } :9092 " ,
123- "-r " ,
124- f"http:// { args . confluent_host } :8081 " ,
125- "-t " ,
126- "bench_data " ,
127- "-d " ,
128- args . distribution ,
113+ f"--num-records= { args . records } " ,
114+ f"--bootstrap-server= { args .confluent_host } :9092" ,
115+ f"--schema-registry-url=http:// { args . confluent_host } :8081 " ,
116+ "--topic=bench_data " ,
117+ "--keys=avro " ,
118+ "--values=avro " ,
119+ f"--avro-schema= { VALUE_SCHEMA } " ,
120+ f"--avro-distribution= { VALUE_DISTRIBUTION } " ,
121+ f"--avro-key-schema= { KEY_SCHEMA } " ,
122+ f"--avro-key- distribution= { KEY_DISTRIBUTION } " ,
129123 ],
130124 network_mode = "host" ,
131125 )
@@ -158,5 +152,112 @@ def main() -> None:
158152 prev = print_stats (mz_container , prev , results_file )
159153
160154
# JSON-encoded Avro schema for record keys: a ``com.acme.avro.testrecordkey``
# record with two ``long`` fields. Handed to the generator through the
# ``--avro-key-schema`` argument.
KEY_SCHEMA = json.dumps(
    {
        "name": "testrecordkey",
        "type": "record",
        "namespace": "com.acme.avro",
        "fields": [{"name": "Key1", "type": "long"}, {"name": "Key2", "type": "long"}],
    }
)
163+
# JSON-encoded per-field settings for the key record, keyed by
# ``<namespace>.<record>::<field>``. Handed to the generator through the
# key-distribution argument.
# NOTE(review): each two-element list is presumably a [low, high] range for
# the generated values -- confirm against the generator's documentation.
KEY_DISTRIBUTION = json.dumps(
    {
        "com.acme.avro.testrecordkey::Key1": [0, 100],
        "com.acme.avro.testrecordkey::Key2": [0, 250000],
    }
)
170+
# JSON-encoded Avro schema for record values, handed to the generator through
# the ``--avro-schema`` argument. Layout of ``com.acme.avro.testrecord``:
#
#   Key1Unused: long
#   Key2Unused: long
#   OuterRecord
#     Record1
#       InnerRecord1 { Point: long }
#       InnerRecord2 { Point: long }
#     Record2
#       InnerRecord3 { Point: long }
#       InnerRecord4 { Point: long }
VALUE_SCHEMA = json.dumps(
    {
        "name": "testrecord",
        "type": "record",
        "namespace": "com.acme.avro",
        "fields": [
            {"name": "Key1Unused", "type": "long"},
            {"name": "Key2Unused", "type": "long"},
            {
                "name": "OuterRecord",
                "type": {
                    "name": "OuterRecord",
                    "type": "record",
                    "fields": [
                        {
                            "name": "Record1",
                            "type": {
                                "name": "Record1",
                                "type": "record",
                                "fields": [
                                    {
                                        "name": "InnerRecord1",
                                        "type": {
                                            "name": "InnerRecord1",
                                            "type": "record",
                                            "fields": [
                                                {"name": "Point", "type": "long"}
                                            ],
                                        },
                                    },
                                    {
                                        "name": "InnerRecord2",
                                        "type": {
                                            "name": "InnerRecord2",
                                            "type": "record",
                                            "fields": [
                                                {"name": "Point", "type": "long"}
                                            ],
                                        },
                                    },
                                ],
                            },
                        },
                        {
                            "name": "Record2",
                            "type": {
                                "name": "Record2",
                                "type": "record",
                                "fields": [
                                    {
                                        "name": "InnerRecord3",
                                        "type": {
                                            "name": "InnerRecord3",
                                            "type": "record",
                                            "fields": [
                                                {"name": "Point", "type": "long"}
                                            ],
                                        },
                                    },
                                    {
                                        "name": "InnerRecord4",
                                        "type": {
                                            "name": "InnerRecord4",
                                            "type": "record",
                                            "fields": [
                                                {"name": "Point", "type": "long"}
                                            ],
                                        },
                                    },
                                ],
                            },
                        },
                    ],
                },
            },
        ],
    }
)
249+
# JSON-encoded per-field settings for the value record, keyed by
# ``<namespace>.<record>::<field>``. Handed to the generator through the
# ``--avro-distribution`` argument.
# NOTE(review): each two-element list is presumably a [low, high] range for
# the generated values -- confirm against the generator's documentation.
VALUE_DISTRIBUTION = json.dumps(
    {
        "com.acme.avro.testrecord::Key1Unused": [0, 100],
        "com.acme.avro.testrecord::Key2Unused": [0, 250000],
        "com.acme.avro.InnerRecord1::Point": [10000, 1000000000],
        "com.acme.avro.InnerRecord2::Point": [10000, 1000000000],
        "com.acme.avro.InnerRecord3::Point": [10000, 1000000000],
        # NOTE(review): this upper bound is 10x the other three Point fields;
        # kept as found -- confirm whether the extra zero is intentional.
        "com.acme.avro.InnerRecord4::Point": [10000, 10000000000],
    }
)
260+
261+
# Call main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
0 commit comments