use apache_avro::{AvroSchema, Schema}; use rdkafka::producer::{FutureProducer, FutureRecord}; use rdkafka::ClientConfig; use schema_registry_converter::async_impl::avro::AvroEncoder; use schema_registry_converter::async_impl::schema_registry::SrSettings; use schema_registry_converter::avro_common::get_supplied_schema; use schema_registry_converter::schema_registry_common::SubjectNameStrategy; use serde::{Deserialize, Serialize}; use std::fs; use std::sync::Arc; use std::time::Duration; #[derive(Clone, Debug, Deserialize, Serialize)] pub struct Store { pub third_type: String, pub third_id: String, pub third_sub_id: String, pub legal_name: String, pub country_code: String, pub order_currency: String, pub is_internal: Option, pub tax_number: Option, pub vat_number: Option, pub third_gln: Option, pub sort_name: Option, pub cnuf_code: Option, pub street: Option, pub address_complement: Option, pub zip_code: Option, pub city: Option, pub phone_number: Option, pub fax_number: Option, pub rivers_id: Option, pub is_autonomous: Option, pub is_linked_to_dp: Option, pub is_subcontractor: Option, pub fiscal_company_number: Option, pub accounting_entity: Option, pub cost_center: Option, pub jde_code: Option, pub treeview_organization: Option, pub treeview_level: Option, pub treeview_number: Option, pub creation_date: Option, pub modification_date: Option, pub is_closed: Option, pub language_code: Option, pub logistic_area: Option, } impl AvroSchema for Store { fn get_schema() -> apache_avro::schema::Schema { let schema_path = "schemas/supplyThirdReferential.avsc"; let schema_json = fs::read_to_string(schema_path).unwrap(); Schema::parse_str(&schema_json).unwrap() } } #[derive(Clone, Debug, Deserialize, Serialize)] pub struct Link { pub parent_third_type: String, pub parent_third_id: String, pub parent_third_sub_id: String, pub third_type: String, pub third_id: String, pub third_sub_id: String, pub link_id: String, pub creation_date: Option, } impl AvroSchema for Link { fn get_schema() -> apache_avro::schema::Schema { let schema_path = "schemas/supplyThirdLink.avsc"; let schema_json = fs::read_to_string(schema_path).unwrap(); Schema::parse_str(&schema_json).unwrap() } } #[derive(Clone, Debug, Deserialize, Serialize)] pub struct Detail { pub store_third_type: String, pub store_third_id: String, pub store_third_sub_id: String, pub sap_store_id: String, pub linear_size: Option, pub opening_date: Option, pub river_id: Option, pub logistic_area: Option, pub store_type: Option, pub store_sign: Option, } impl AvroSchema for Detail { fn get_schema() -> apache_avro::schema::Schema { let schema_path = "schemas/supplyStoreDetail.avsc"; let schema_json = fs::read_to_string(schema_path).unwrap(); Schema::parse_str(&schema_json).unwrap() } } #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] pub struct StoreAvro { pub third_type: i32, pub third_num: i32, pub name: String, pub country: Option, pub fiscal_company_number: Option, pub ean_code: Option, pub address: AddressAvro, pub phones: PhonesAvro, pub generic_store_type: Option, pub category: Option, pub language: Option, pub status: String, pub franchised: bool, pub opening_date: Option, // logicalType date = int (jours depuis epoch) pub working_hours: Vec, pub leader: Option, pub social_medias: Vec, pub publish_on_ecommerce: bool, pub picture: Option, pub welcome_messages: Vec, pub exceptional_hours: Vec, } #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] pub struct CountryAvro { pub country_code: Option, pub country_name: Option, } #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] pub struct AddressAvro { pub street: Option, pub address_complement: Option, pub zip_code: Option, pub city: String, pub administrative_area_level1: Option, pub administrative_area_level2: Option, pub administrative_area_level3: Option, pub administrative_area_level4: Option, pub administrative_area_level5: Option, pub latitude: Option, pub longitude: Option, pub timezone: Option, } #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] pub struct PhonesAvro { pub phone1: PhoneAvro, pub phone2: PhoneAvro, } #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] pub struct PhoneAvro { pub label: String, pub index: Option, pub number: String, } #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] pub struct WorkingHourAvro { pub day: String, pub status: String, pub opens: Option, pub closes: Option, pub divided_closes: Option, pub divided_opens: Option, } #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] pub struct SocialMediaAvro { pub name: String, pub link: String, } #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] pub struct WelcomeMessageAvro { pub language: String, pub message: String, } #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] pub struct ExceptionalHourAvro { pub date: String, pub status: String, pub opens: Option, pub closes: Option, pub divided_closes: Option, pub divided_opens: Option, } impl AvroSchema for StoreAvro { fn get_schema() -> Schema { let schema_json = fs::read_to_string("schemas/store.avsc").expect("Impossible de lire schemas/store.avsc"); Schema::parse_str(&schema_json).expect("Schema Avro invalide") } } pub struct KafkaProducer<'a> { producer: FutureProducer, #[warn(dead_code)] avro_encoder: Arc>, } impl KafkaProducer<'_> { pub fn new(bootstrap_servers: String, schema_registry_url: String) -> Self { let producer: FutureProducer = ClientConfig::new() .set("bootstrap.servers", bootstrap_servers) .set("produce.offset.report", "true") .set("message.timeout.ms", "5000") .set("queue.buffering.max.messages", "10") .create() .expect("Producer creation error"); let sr_settings = SrSettings::new(schema_registry_url); let avro_encoder = AvroEncoder::new(sr_settings); Self { producer, avro_encoder: Arc::new(avro_encoder), } } pub async fn produce(&self, key: String, payload: T, topic: &str) -> bool where T: Serialize + apache_avro::AvroSchema, { let value_strategy = SubjectNameStrategy::TopicNameStrategyWithSchema( topic.to_string(), true, get_supplied_schema(&T::get_schema()), ); let payload_bytes = match self .avro_encoder .encode_struct(payload, &value_strategy) .await { Ok(bytes) => bytes, Err(e) => panic!("Error encoding payload: {}", e), }; let record = FutureRecord::to(topic).key(&key).payload(&payload_bytes); match self.producer.send(record, Duration::from_secs(10)).await { Ok(_) => true, Err(_) => false, } } }