initial commit
.gitignore (vendored, Normal file, +1)
@@ -0,0 +1 @@
/target
Cargo.lock (generated, Normal file, +2246)
File diff suppressed because it is too large.
Cargo.toml (Normal file, +19)
@@ -0,0 +1,19 @@
[package]
name = "kafka-producer"
version = "0.1.0"
edition = "2024"

[dependencies]
clap = { version = "4", features = ["derive"] }
csv = "1"

# Serialization / parsing
serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde_yaml2 = "0.1"

rdkafka = { version = "0.38", features = ["tokio"] }
schema_registry_converter = { version = "4", features = ["avro"] }
apache-avro = "0.19"
anyhow = "1"
tokio = { version = "1", features = ["full"] }
config.yaml (Normal file, +13)
@@ -0,0 +1,13 @@
kafka:
  brokers: "localhost:9092"
  # schema_registry is required by KafkaConfig in src/config.rs; the URL below is a placeholder
  schema_registry: "http://localhost:8081"
  security:
    protocol: "PLAINTEXT"

schemas:
  users:
    file: "schemas/stores.avsc"
  orders:
    file: "schemas/link.avsc"

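Note: the two schema files referenced above (schemas/stores.avsc, schemas/link.avsc) are not included in this commit. A minimal sketch of what a file like schemas/stores.avsc could contain, assuming it mirrors the (currently unused) User struct in src/main.rs; the record name and fields here are hypothetical:

{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"}
  ]
}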
src/config.rs (Normal file, +19)
@@ -0,0 +1,19 @@
use serde::Deserialize;
use std::collections::HashMap;

#[derive(Debug, Deserialize)]
pub struct AppConfig {
    pub kafka: KafkaConfig,
    pub schemas: HashMap<String, SchemaConfig>,
}

#[derive(Debug, Deserialize)]
pub struct KafkaConfig {
    pub brokers: String,
    pub schema_registry: String,
}

#[derive(Debug, Deserialize)]
pub struct SchemaConfig {
    pub file: String,
}
src/main.rs (Normal file, +68)
@@ -0,0 +1,68 @@
mod registry;
mod config;

use clap::Parser;
use csv::ReaderBuilder;
use serde::Deserialize;
use std::fs;

use crate::config::AppConfig;
use crate::registry::{ChatMessage, KafkaProducer};

/// CLI tool to send CSV rows as Avro messages to Kafka
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {

    /// Path to the config file (YAML)
    #[arg(short, long, default_value = "config.yaml")]
    config: String,

    /// Path to the CSV file
    #[arg(short, long)]
    csv: String,

    /// Kafka topic to produce to
    #[arg(short, long)]
    topic: String,

    /// Schema Registry URL (Apicurio or Confluent compatible)
    #[arg(short = 'r', long, default_value = "http://localhost:8080/apis/registry/v2/groups/default/subjects")]
    registry_url: String,

}

#[allow(dead_code)] // parsed but not read anywhere yet
#[derive(Debug, Deserialize)]
struct User {
    id: i32,
    name: String,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let args = Args::parse();

    // Load the configuration
    let cfg_str = fs::read_to_string(&args.config).expect("failed to read config file");
    let cfg: AppConfig = serde_yaml2::from_str(&cfg_str).expect("failed to parse config file");

    // 1. Kafka producer
    let producer: KafkaProducer = KafkaProducer::new(cfg.kafka.brokers, cfg.kafka.schema_registry, args.topic);

    // 2. Read the CSV
    let mut rdr = ReaderBuilder::new()
        .has_headers(true)
        .from_path(&args.csv)?;

    for result in rdr.deserialize() {
        let chat: ChatMessage = result?;

        // 3. Send to Kafka
        if producer.produce(chat.user.to_string(), chat.clone()).await {
            println!("✅ Sent chat {:?}", chat);
        }
    }

    Ok(())
}
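For reference, a hypothetical end-to-end run (file name and topic are examples, not part of this commit). The CSV header row must match the ChatMessage fields (user, message) for rdr.deserialize() to succeed; the output lines follow the println! format in main:

messages.csv:
user,message
alice,hello
bob,hi there

$ cargo run -- --csv messages.csv --topic chat
✅ Sent chat ChatMessage { user: "alice", message: "hello" }
✅ Sent chat ChatMessage { user: "bob", message: "hi there" }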
src/registry.rs (Normal file, +92)
@@ -0,0 +1,92 @@
use apache_avro::{AvroSchema, Schema};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::ClientConfig;
use schema_registry_converter::async_impl::avro::AvroEncoder;
use schema_registry_converter::async_impl::schema_registry::SrSettings;
use schema_registry_converter::avro_common::get_supplied_schema;
use schema_registry_converter::schema_registry_common::SubjectNameStrategy;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;

#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct ChatMessage {
    pub user: String,
    pub message: String,
}

impl AvroSchema for ChatMessage {
    fn get_schema() -> Schema {
        // Define the Avro schema for ChatMessage
        Schema::parse_str(
            r#"{
                "type": "record",
                "name": "ChatMessage",
                "fields": [
                    {"name": "user", "type": "string"},
                    {"name": "message", "type": "string"}
                ]
            }"#,
        )
        .expect("ChatMessage schema is valid")
    }
}

pub struct KafkaProducer<'a> {
    producer: FutureProducer,
    avro_encoder: Arc<AvroEncoder<'a>>,
    topic: String,
}

impl KafkaProducer<'_> {
    pub fn new(bootstrap_servers: String, schema_registry_url: String, topic: String) -> Self {
        let producer: FutureProducer = ClientConfig::new()
            .set("bootstrap.servers", bootstrap_servers)
            .set("message.timeout.ms", "5000")
            .set("queue.buffering.max.messages", "10")
            .create()
            .expect("Producer creation error");

        let sr_settings = SrSettings::new(schema_registry_url);
        let avro_encoder = AvroEncoder::new(sr_settings);

        Self {
            producer,
            topic,
            avro_encoder: Arc::new(avro_encoder),
        }
    }

    pub async fn produce<T: Serialize + apache_avro::AvroSchema>(&self, key: String, payload: T) -> bool {
        // Register or look up the value schema under the "<topic>-value" subject
        let value_strategy = SubjectNameStrategy::TopicNameStrategyWithSchema(
            self.topic.clone(),
            false, // is_key: false, since this is the value schema
            get_supplied_schema(&T::get_schema()),
        );

        let payload_bytes = match self
            .avro_encoder
            .encode_struct(payload, &value_strategy)
            .await
        {
            Ok(bytes) => bytes,
            Err(e) => {
                eprintln!("Error encoding payload: {}", e);
                return false;
            }
        };

        let record = FutureRecord::to(&self.topic)
            .key(&key)
            .payload(&payload_bytes);

        match self.producer.send(record, Duration::from_secs(10)).await {
            Ok(_) => true,
            Err((e, _)) => {
                eprintln!("Error delivering message: {}", e);
                false
            }
        }
    }
}