Files
kafka-producer/src/registry.rs
2026-04-07 12:44:18 +02:00

266 lines
8.0 KiB
Rust

use apache_avro::{AvroSchema, Schema};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::ClientConfig;
use schema_registry_converter::async_impl::avro::AvroEncoder;
use schema_registry_converter::async_impl::schema_registry::SrSettings;
use schema_registry_converter::avro_common::get_supplied_schema;
use schema_registry_converter::schema_registry_common::SubjectNameStrategy;
use serde::{Deserialize, Serialize};
use std::fs;
use std::sync::Arc;
use std::time::Duration;
/// Flat record describing one "third" entry.
///
/// The Avro schema used to encode this type is loaded from
/// `schemas/supplyThirdReferential.avsc` by its `AvroSchema` implementation.
/// No serde rename is applied, so fields serialize under their Rust names.
/// NOTE(review): confirm the field names and order match the `.avsc` file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Store {
    // --- Mandatory fields ---
    pub third_type: String,
    pub third_id: String,
    pub third_sub_id: String,
    pub legal_name: String,
    pub country_code: String,
    pub order_currency: String,

    // --- Optional fields (may be absent) ---
    pub is_internal: Option<bool>,
    pub tax_number: Option<String>,
    pub vat_number: Option<String>,
    pub third_gln: Option<String>,
    pub sort_name: Option<String>,
    pub cnuf_code: Option<String>,
    pub street: Option<String>,
    pub address_complement: Option<String>,
    pub zip_code: Option<String>,
    pub city: Option<String>,
    pub phone_number: Option<String>,
    pub fax_number: Option<String>,
    pub rivers_id: Option<String>,
    pub is_autonomous: Option<bool>,
    pub is_linked_to_dp: Option<bool>,
    pub is_subcontractor: Option<bool>,
    pub fiscal_company_number: Option<String>,
    pub accounting_entity: Option<String>,
    pub cost_center: Option<String>,
    pub jde_code: Option<String>,
    pub treeview_organization: Option<String>,
    pub treeview_level: Option<String>,
    pub treeview_number: Option<String>,
    // Dates are carried as strings; their format is not defined in this file.
    pub creation_date: Option<String>,
    pub modification_date: Option<String>,
    pub is_closed: Option<bool>,
    pub language_code: Option<String>,
    pub logistic_area: Option<String>,
}
impl AvroSchema for Store {
fn get_schema() -> apache_avro::schema::Schema {
let schema_path = "schemas/supplyThirdReferential.avsc";
let schema_json = fs::read_to_string(schema_path).unwrap();
Schema::parse_str(&schema_json).unwrap()
}
}
/// Record tying a "third" entry to a parent "third" entry.
///
/// The Avro schema used to encode this type is loaded from
/// `schemas/supplyThirdLink.avsc` by its `AvroSchema` implementation.
/// Fields serialize under their Rust names (no serde rename is applied).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Link {
    // --- Parent side of the link ---
    pub parent_third_type: String,
    pub parent_third_id: String,
    pub parent_third_sub_id: String,

    // --- Linked entry ---
    pub third_type: String,
    pub third_id: String,
    pub third_sub_id: String,

    pub link_id: String,
    // Only optional field; carried as a string, format not defined in this file.
    pub creation_date: Option<String>,
}
impl AvroSchema for Link {
fn get_schema() -> apache_avro::schema::Schema {
let schema_path = "schemas/supplyThirdLink.avsc";
let schema_json = fs::read_to_string(schema_path).unwrap();
Schema::parse_str(&schema_json).unwrap()
}
}
/// Additional per-store attributes, keyed by the store's "third" identifiers.
///
/// The Avro schema used to encode this type is loaded from
/// `schemas/supplyStoreDetail.avsc` by its `AvroSchema` implementation.
/// Fields serialize under their Rust names (no serde rename is applied).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Detail {
    // --- Mandatory fields ---
    pub store_third_type: String,
    pub store_third_id: String,
    pub store_third_sub_id: String,
    pub sap_store_id: String,

    // --- Optional fields (may be absent) ---
    pub linear_size: Option<String>,
    pub opening_date: Option<String>,
    pub river_id: Option<String>,
    pub logistic_area: Option<String>,
    pub store_type: Option<String>,
    pub store_sign: Option<String>,
}
impl AvroSchema for Detail {
fn get_schema() -> apache_avro::schema::Schema {
let schema_path = "schemas/supplyStoreDetail.avsc";
let schema_json = fs::read_to_string(schema_path).unwrap();
Schema::parse_str(&schema_json).unwrap()
}
}
/// Store record in the shape published to Kafka.
///
/// Fields are (de)serialized under camelCase names (`third_type` becomes
/// `thirdType`, and so on). The matching Avro schema is loaded from
/// `schemas/store.avsc` by the `AvroSchema` implementation of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StoreAvro {
    pub third_type: i32,
    pub third_num: i32,
    pub name: String,
    pub country: Option<CountryAvro>,
    pub fiscal_company_number: Option<String>,
    pub ean_code: Option<String>,
    pub address: AddressAvro,
    pub phones: PhonesAvro,
    pub generic_store_type: Option<String>,
    pub category: Option<String>,
    pub language: Option<String>,
    pub status: String,
    pub franchised: bool,
    // Avro logicalType `date` is an int (days since the epoch).
    pub opening_date: Option<i32>,
    pub working_hours: Vec<WorkingHourAvro>,
    pub leader: Option<String>,
    pub social_medias: Vec<SocialMediaAvro>,
    pub publish_on_ecommerce: bool,
    pub picture: Option<String>,
    pub welcome_messages: Vec<WelcomeMessageAvro>,
    pub exceptional_hours: Vec<ExceptionalHourAvro>,
}
/// Country sub-record of [`StoreAvro`]; both parts are optional.
///
/// Serialized with camelCase field names (`countryCode`, `countryName`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CountryAvro {
    pub country_code: Option<String>,
    pub country_name: Option<String>,
}
/// Address sub-record of [`StoreAvro`].
///
/// Only `city` is mandatory; every other component may be absent.
/// Serialized with camelCase field names.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AddressAvro {
    pub street: Option<String>,
    pub address_complement: Option<String>,
    pub zip_code: Option<String>,
    pub city: String,

    // Up to five optional administrative-area levels.
    pub administrative_area_level1: Option<String>,
    pub administrative_area_level2: Option<String>,
    pub administrative_area_level3: Option<String>,
    pub administrative_area_level4: Option<String>,
    pub administrative_area_level5: Option<String>,

    // Coordinates and timezone are carried as strings, not numeric types.
    pub latitude: Option<String>,
    pub longitude: Option<String>,
    pub timezone: Option<String>,
}
/// Pair of phone entries attached to a [`StoreAvro`]; both are mandatory.
///
/// Serialized with camelCase field names.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PhonesAvro {
    pub phone1: PhoneAvro,
    pub phone2: PhoneAvro,
}
/// A single phone entry: a label, an optional index, and the number itself.
///
/// Serialized with camelCase field names.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PhoneAvro {
    pub label: String,
    pub index: Option<i32>,
    pub number: String,
}
/// Working-hours entry for one day, as listed in [`StoreAvro::working_hours`].
///
/// `day` and `status` are mandatory; the four time fields are optional
/// strings whose format is not defined in this file.
/// Serialized with camelCase field names.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct WorkingHourAvro {
    pub day: String,
    pub status: String,
    pub opens: Option<String>,
    pub closes: Option<String>,
    pub divided_closes: Option<String>,
    pub divided_opens: Option<String>,
}
/// Social-media entry listed in [`StoreAvro::social_medias`]: a name and a link.
///
/// Serialized with camelCase field names.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SocialMediaAvro {
    pub name: String,
    pub link: String,
}
/// Welcome message listed in [`StoreAvro::welcome_messages`], paired with its
/// language.
///
/// Serialized with camelCase field names.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct WelcomeMessageAvro {
    pub language: String,
    pub message: String,
}
/// Exceptional-hours entry for one date, as listed in
/// [`StoreAvro::exceptional_hours`].
///
/// Same layout as [`WorkingHourAvro`], but keyed by `date` instead of `day`.
/// `date` and `status` are mandatory; the four time fields are optional
/// strings whose format is not defined in this file.
/// Serialized with camelCase field names.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ExceptionalHourAvro {
    pub date: String,
    pub status: String,
    pub opens: Option<String>,
    pub closes: Option<String>,
    pub divided_closes: Option<String>,
    pub divided_opens: Option<String>,
}
impl AvroSchema for StoreAvro {
    /// Returns the Avro schema for [`StoreAvro`].
    ///
    /// The schema is read from `schemas/store.avsc` (a path relative to the
    /// process working directory) and parsed on the first call only. Later
    /// calls return a clone of the cached schema instead of re-reading and
    /// re-parsing the file.
    ///
    /// # Panics
    ///
    /// Panics if the schema file cannot be read or is not a valid Avro schema.
    /// Nothing is cached in that case, so a later call will retry.
    fn get_schema() -> Schema {
        // Fully qualified so no extra `use` line is needed at the top of the file.
        static SCHEMA: std::sync::OnceLock<Schema> = std::sync::OnceLock::new();

        SCHEMA
            .get_or_init(|| {
                let schema_json = fs::read_to_string("schemas/store.avsc")
                    .expect("Impossible de lire schemas/store.avsc");
                Schema::parse_str(&schema_json).expect("Schema Avro invalide")
            })
            .clone()
    }
}
/// Kafka producer that Avro-encodes payloads through a schema registry
/// before sending them.
///
/// The lifetime `'a` is the one carried by the wrapped `AvroEncoder`.
pub struct KafkaProducer<'a> {
/// Underlying rdkafka producer used to send the encoded records.
producer: FutureProducer,
/// Schema-registry-backed Avro encoder, held behind an `Arc`.
// NOTE(review): `#[warn(dead_code)]` only sets the lint level to `warn`; if the
// intent was to silence an unused-field warning, `#[allow(dead_code)]` would be
// needed. The field is read by `produce`, so the attribute looks unnecessary.
#[warn(dead_code)]
avro_encoder: Arc<AvroEncoder<'a>>,
}
impl KafkaProducer<'_> {
    /// Creates a producer connected to `bootstrap_servers`, with an Avro
    /// encoder that talks to the schema registry at `schema_registry_url`.
    ///
    /// The Kafka client is configured with `produce.offset.report=true`,
    /// `message.timeout.ms=5000` and `queue.buffering.max.messages=10`.
    ///
    /// # Panics
    ///
    /// Panics with "Producer creation error" if the Kafka client cannot be
    /// created from this configuration.
    pub fn new(bootstrap_servers: String, schema_registry_url: String) -> Self {
        let producer: FutureProducer = ClientConfig::new()
            .set("bootstrap.servers", bootstrap_servers)
            .set("produce.offset.report", "true")
            .set("message.timeout.ms", "5000")
            .set("queue.buffering.max.messages", "10")
            .create()
            .expect("Producer creation error");

        let sr_settings = SrSettings::new(schema_registry_url);
        let avro_encoder = AvroEncoder::new(sr_settings);

        Self {
            producer,
            avro_encoder: Arc::new(avro_encoder),
        }
    }

    /// Avro-encodes `payload` using `T`'s schema and sends it to `topic`
    /// under `key`.
    ///
    /// Returns `true` when the record was sent successfully. Returns `false`
    /// when either encoding or sending fails; the cause is written to stderr.
    /// An encoding failure used to panic; it now follows the same
    /// `false`-on-failure contract as a send failure.
    pub async fn produce<T>(&self, key: String, payload: T, topic: &str) -> bool
    where
        T: Serialize + apache_avro::AvroSchema,
    {
        // NOTE(review): the second argument is the strategy's `is_key` flag.
        // `true` here while encoding the record *value* looks unintended (it
        // would select the `<topic>-key` subject). It is left unchanged because
        // flipping it changes the subject used in the registry. Confirm against
        // the schema_registry_converter docs and the deployed subjects.
        let value_strategy = SubjectNameStrategy::TopicNameStrategyWithSchema(
            topic.to_string(),
            true,
            get_supplied_schema(&T::get_schema()),
        );

        let payload_bytes = match self
            .avro_encoder
            .encode_struct(payload, &value_strategy)
            .await
        {
            Ok(bytes) => bytes,
            Err(e) => {
                // Report the failure through the return value instead of
                // tearing down the calling task.
                eprintln!("Error encoding payload: {}", e);
                return false;
            }
        };

        let record = FutureRecord::to(topic).key(&key).payload(&payload_bytes);

        // 10-second timeout handed to `send`.
        match self.producer.send(record, Duration::from_secs(10)).await {
            Ok(_) => true,
            Err((e, _unsent_message)) => {
                eprintln!("Error sending record to topic {}: {}", topic, e);
                false
            }
        }
    }
}