From 28a8f1c8c123ddf46aea8e787c118b86110fb1f7 Mon Sep 17 00:00:00 2001 From: ldrogo27 Date: Tue, 7 Apr 2026 12:44:18 +0200 Subject: [PATCH] feat: Add Store avro --- .gitignore | 4 + Cargo.lock | 112 ++++---- Cargo.toml | 6 +- config.yaml | 13 +- detail.csv | 2 + link.csv | 2 + schemas/store.avsc | 415 ++++++++++++++++++++++++++++ schemas/supplyStoreDetail.avsc | 82 ++++++ schemas/supplyThirdLink.avsc | 54 ++++ schemas/supplyThirdReferential.avsc | 300 ++++++++++++++++++++ src/config.rs | 11 +- src/main.rs | 251 ++++++++++++++--- src/registry.rs | 233 ++++++++++++++-- store.csv | 2 + 14 files changed, 1357 insertions(+), 130 deletions(-) create mode 100644 detail.csv create mode 100644 link.csv create mode 100644 schemas/store.avsc create mode 100644 schemas/supplyStoreDetail.avsc create mode 100644 schemas/supplyThirdLink.avsc create mode 100644 schemas/supplyThirdReferential.avsc create mode 100644 store.csv diff --git a/.gitignore b/.gitignore index ea8c4bf..56bec56 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,5 @@ /target +.DS_Store +.idea + +Cargo.lock \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 0683922..7321f23 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -93,9 +93,9 @@ checksum = "b0674a1ddeecb70197781e945de4b3b8ffb61fa939a5597bcf48503737663100" [[package]] name = "apache-avro" -version = "0.19.0" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce74162d8bc5bc22bd746e1849fb33f2c9416d2ce505d1d65ba49f170a207c90" +checksum = "36fa98bc79671c7981272d91a8753a928ff6a1cd8e4f20a44c45bd5d313840bf" dependencies = [ "bigdecimal", "bon", @@ -111,7 +111,7 @@ dependencies = [ "serde_json", "strum", "strum_macros", - "thiserror 2.0.15", + "thiserror 2.0.18", "uuid", ] @@ -156,9 +156,9 @@ checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" [[package]] name = "bigdecimal" -version = "0.4.8" +version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a22f228ab7a1b23027ccc6c350b72868017af7ea8356fbdf19f8d991c690013" +checksum = "4d6867f1565b3aad85681f1015055b087fcfd840d6aeee6eee7f2da317603695" dependencies = [ "autocfg", "libm", @@ -166,7 +166,6 @@ dependencies = [ "num-integer", "num-traits", "serde", - "serde_json", ] [[package]] @@ -186,9 +185,9 @@ dependencies = [ [[package]] name = "bon" -version = "3.7.1" +version = "3.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "537c317ddf588aab15c695bf92cf55dec159b93221c074180ca3e0e5a94da415" +checksum = "f47dbe92550676ee653353c310dfb9cf6ba17ee70396e1f7cf0a2020ad49b2fe" dependencies = [ "bon-macros", "rustversion", @@ -196,9 +195,9 @@ dependencies = [ [[package]] name = "bon-macros" -version = "3.7.1" +version = "3.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca5abbf2d4a4c6896197c9de13d6d7cb7eff438c63dacde1dde980569cb00248" +checksum = "519bd3116aeeb42d5372c29d982d16d0170d3d4a5ed85fc7dd91642ffff3c67c" dependencies = [ "darling", "ident_case", @@ -957,9 +956,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.27" +version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" +checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" [[package]] name = "memchr" @@ -1265,9 +1264,9 @@ dependencies = [ [[package]] name = "rdkafka" -version = "0.38.0" +version = "0.39.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f1856d72dbbbea0d2a5b2eaf6af7fb3847ef2746e883b11781446a51dbc85c0" +checksum = "d7956f9ac12b5712e50372d9749a3102f4810a8d42481c5eae3748d36d585bcf" dependencies = [ "futures-channel", "futures-util", @@ -1283,9 +1282,9 @@ dependencies = [ [[package]] name = "rdkafka-sys" -version = "4.9.0+2.10.0" +version = "4.10.0+2.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5230dca48bc354d718269f3e4353280e188b610f7af7e2fcf54b7a79d5802872" +checksum = "e234cf318915c1059d4921ef7f75616b5219b10b46e9f3a511a15eb4b56a3f77" dependencies = [ "libc", "libz-sys", @@ -1304,15 +1303,15 @@ dependencies = [ [[package]] name = "regex-lite" -version = "0.1.6" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a" +checksum = "cab834c73d247e67f4fae452806d17d3c7501756d98c8808d7c9c7aa7d18f973" [[package]] name = "reqwest" -version = "0.12.23" +version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d429f34c8092b2d42c7c93cec323bb4adeb7c67698f70839adec842ec10c7ceb" +checksum = "ab3f43e3283ab1488b624b44b0e988d0acea0b3214e694730a055cb6b2efa801" dependencies = [ "base64", "bytes", @@ -1331,7 +1330,6 @@ dependencies = [ "rustls-pki-types", "serde", "serde_json", - "serde_urlencoded", "sync_wrapper", "tokio", "tokio-native-tls", @@ -1395,9 +1393,9 @@ dependencies = [ [[package]] name = "schema_registry_converter" -version = "4.5.0" +version = "4.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a88502ff545fb0501e0418f06e58936667b1bf99c15b4c7cdd769a917de6c3a2" +checksum = "67d077e1c21a26fb61a65460945db7cedd14e0865ae999abf51ff68ee383e54e" dependencies = [ "apache-avro", "byteorder", @@ -1439,27 +1437,38 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.219" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" dependencies = [ + "serde_core", "serde_derive", ] [[package]] name = "serde_bytes" -version = "0.11.17" +version = "0.11.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8437fd221bde2d4ca316d61b90e337e9e702b3820b87d63caa9ba6c02bd06d96" +checksum = "a5d440709e79d88e51ac01c4b72fc6cb7314017bb7da9eeff678aa94c10e3ea8" dependencies = [ "serde", + "serde_core", +] + +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" +dependencies = [ + "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.219" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", @@ -1468,26 +1477,15 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.143" +version = "1.0.149" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d401abef1d108fbd9cbaebc3e46611f4b1021f714a0597a71f41ee463f5f4a5a" +checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" dependencies = [ "itoa", "memchr", - "ryu", - "serde", -] - -[[package]] -name = "serde_urlencoded" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" -dependencies = [ - "form_urlencoded", - "itoa", - "ryu", "serde", + "serde_core", + "zmij", ] [[package]] @@ -1623,11 +1621,11 @@ dependencies = [ [[package]] name = "thiserror" -version = "2.0.15" +version = "2.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80d76d3f064b981389ecb4b6b7f45a0bf9fdac1d5b9204c7bd6714fecc302850" +checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4" dependencies = [ - "thiserror-impl 2.0.15", + "thiserror-impl 2.0.18", ] [[package]] @@ -1643,9 +1641,9 @@ dependencies = [ [[package]] name = "thiserror-impl" -version = "2.0.15" +version = "2.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44d29feb33e986b6ea906bd9c3559a856983f92371b3eaa5e83782a351623de0" +checksum = "ebc4ee7f67670e9b64d05fa4253e753e016c6c95ff35b89b7941d6b856dec1d5" dependencies = [ "proc-macro2", "quote", @@ -1737,9 +1735,9 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.6.6" +version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2" +checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" dependencies = [ "bitflags", "bytes", @@ -1827,12 +1825,12 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.18.0" +version = "1.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f33196643e165781c20a5ead5582283a7dacbb87855d867fbc2df3f81eddc1be" +checksum = "5ac8b6f42ead25368cf5b098aeb3dc8a1a2c05a3eee8a9a1a68c640edbfc79d9" dependencies = [ "js-sys", - "serde", + "serde_core", "wasm-bindgen", ] @@ -2244,3 +2242,9 @@ dependencies = [ "quote", "syn", ] + +[[package]] +name = "zmij" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa" diff --git a/Cargo.toml b/Cargo.toml index b3c976c..5a88327 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,8 +12,8 @@ serde = { version = "1", features = ["derive"] } serde_json = "1" serde_yaml2 = "0.1" -rdkafka = { version = "0.38", features = ["tokio"] } -schema_registry_converter = { version = "4", features = ["avro"] } -apache-avro = "0.19" +rdkafka = { version = "0.39", features = ["tokio"] } +schema_registry_converter = { version = "4.8.0", features = ["avro"] } +apache-avro = "0.21" anyhow = "1" tokio = { version = "1", features = ["full"] } \ No newline at end of file diff --git a/config.yaml b/config.yaml index be0a4c7..918390f 100644 --- a/config.yaml +++ b/config.yaml @@ -1,11 +1,16 @@ kafka: brokers: "localhost:9092" + schema_registry: "http://localhost:9093/apis/ccompat/v6" security: protocol: "PLAINTEXT" schemas: - users: - file: "schemas/stores.avsc" - orders: - file: "schemas/link.avsc" + third: + file: "schemas/supplyThirdReferential.avsc" + link: + file: "schemas/supplyThirdlink.avsc" + detail: + file: "schemas/supplyStoreDetail.avsc" + store: + file: "schemas/store.avsc" diff --git a/detail.csv b/detail.csv new file mode 100644 index 0000000..79a2c69 --- /dev/null +++ b/detail.csv @@ -0,0 +1,2 @@ +store_third_type,store_third_id,store_third_sub_id,sap_store_id,linear_size,opening_date,river_id,logistic_area +007,00839,00839,0839,3973,2014-09-05,839,00038 \ No newline at end of file diff --git a/link.csv b/link.csv new file mode 100644 index 0000000..e23b5ab --- /dev/null +++ b/link.csv @@ -0,0 +1,2 @@ +parent_third_type,parent_third_id,parent_third_sub_id,third_type,third_id,third_sub_id,link_id,creation_date +007,10460,10460,007,43386,43386,500,2025-08-29 \ No newline at end of file diff --git a/schemas/store.avsc b/schemas/store.avsc new file mode 100644 index 0000000..3a8b1bc --- /dev/null +++ b/schemas/store.avsc @@ -0,0 +1,415 @@ +{ + "type": "record", + "name": "StoreAvro", + "namespace": "com.decathlon.onestore.avro", + "fields": [ + { + "name": "thirdType", + "type": "int" + }, + { + "name": "thirdNum", + "type": "int" + }, + { + "name": "name", + "type": "string" + }, + { + "name": "country", + "type": [ + "null", + { + "type": "record", + "name": "CountryAvro", + "fields": [ + { + "name": "countryCode", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "countryName", + "type": [ + "null", + "string" + ], + "default": null + } + ] + } + ], + "default": null + }, + { + "name": "fiscalCompanyNumber", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "eanCode", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "address", + "type": { + "type": "record", + "name": "AddressAvro", + "fields": [ + { + "name": "street", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "addressComplement", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "zipCode", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "city", + "type": "string" + }, + { + "name": "administrativeAreaLevel1", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "administrativeAreaLevel2", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "administrativeAreaLevel3", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "administrativeAreaLevel4", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "administrativeAreaLevel5", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "latitude", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "longitude", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "timezone", + "type": [ + "null", + "string" + ], + "default": null + } + ] + } + }, + { + "name": "phones", + "type": { + "type": "record", + "name": "PhonesAvro", + "fields": [ + { + "name": "phone1", + "type": { + "type": "record", + "name": "PhoneAvro", + "fields": [ + { + "name": "label", + "type": "string" + }, + { + "name": "index", + "type": [ + "null", + "int" + ], + "default": null + }, + { + "name": "number", + "type": "string" + } + ] + } + }, + { + "name": "phone2", + "type": "PhoneAvro" + } + ] + } + }, + { + "name": "genericStoreType", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "category", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "language", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "status", + "type": "string" + }, + { + "name": "franchised", + "type": "boolean", + "default": false + }, + { + "name": "openingDate", + "type": [ + "null", + { + "type": "int", + "logicalType": "date" + } + ], + "default": null + }, + { + "name": "workingHours", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "WorkingHourAvro", + "fields": [ + { + "name": "day", + "type": "string" + }, + { + "name": "status", + "type": "string" + }, + { + "name": "opens", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "closes", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "dividedCloses", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "dividedOpens", + "type": [ + "null", + "string" + ], + "default": null + } + ] + } + }, + "default": [] + }, + { + "name": "leader", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "socialMedias", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "SocialMediaAvro", + "fields": [ + { + "name": "name", + "type": "string" + }, + { + "name": "link", + "type": "string" + } + ] + } + }, + "default": [] + }, + { + "name": "publishOnEcommerce", + "type": "boolean" + }, + { + "name": "picture", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "welcomeMessages", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "WelcomeMessageAvro", + "fields": [ + { + "name": "language", + "type": "string" + }, + { + "name": "message", + "type": "string" + } + ] + } + }, + "default": [] + }, + { + "name": "exceptionalHours", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "ExceptionalHourAvro", + "fields": [ + { + "name": "date", + "type": "string" + }, + { + "name": "status", + "type": "string" + }, + { + "name": "opens", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "closes", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "dividedCloses", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "dividedOpens", + "type": [ + "null", + "string" + ], + "default": null + } + ] + } + }, + "default": [] + } + ] +} \ No newline at end of file diff --git a/schemas/supplyStoreDetail.avsc b/schemas/supplyStoreDetail.avsc new file mode 100644 index 0000000..bdd3f24 --- /dev/null +++ b/schemas/supplyStoreDetail.avsc @@ -0,0 +1,82 @@ +{ + "doc": "Legacy Store data", + "domain": "Supply", + "fields": [ + { + "doc": "Third type", + "name": "store_third_type", + "type": "string" + }, + { + "doc": "Third Id", + "name": "store_third_id", + "type": "string" + }, + { + "doc": "Third sub id", + "name": "store_third_sub_id", + "type": "string" + }, + { + "doc": "Store Sap Identifier", + "name": "sap_store_id", + "type": "string" + }, + { + "default": null, + "doc": "Linear Size", + "name": "linear_size", + "type": [ + "null", + "string" + ] + }, + { + "default": null, + "doc": "Opening Date", + "name": "opening_date", + "type": [ + "null", + "string" + ] + }, + { + "default": null, + "doc": "River Identifier", + "name": "river_id", + "type": [ + "null", + "string" + ] + }, + { + "default": null, + "doc": "Logistic arera", + "name": "logistic_area", + "type": [ + "null", + "string" + ] + }, + { + "default": null, + "name": "store_type", + "type": [ + "null", + "string" + ] + }, + { + "default": null, + "name": "store_sign", + "type": [ + "null", + "string" + ] + } + ], + "name": "SupplyStoreDetail", + "namespace": "com.decathlon.vct.vcstream.avro.supplystoredetail", + "type": "record", + "version": "1" +} \ No newline at end of file diff --git a/schemas/supplyThirdLink.avsc b/schemas/supplyThirdLink.avsc new file mode 100644 index 0000000..172e239 --- /dev/null +++ b/schemas/supplyThirdLink.avsc @@ -0,0 +1,54 @@ +{ + "doc": "", + "domain": "Supply", + "fields": [ + { + "doc": "parent third type", + "name": "parent_third_type", + "type": "string" + }, + { + "doc": "parent_third_id", + "name": "parent_third_id", + "type": "string" + }, + { + "doc": "Parent third sub Id", + "name": "parent_third_sub_id", + "type": "string" + }, + { + "doc": "third type", + "name": "third_type", + "type": "string" + }, + { + "doc": "third Id", + "name": "third_id", + "type": "string" + }, + { + "doc": "third sub Id", + "name": "third_sub_id", + "type": "string" + }, + { + "doc": "link between 2 thirds", + "name": "link_id", + "type": "string" + }, + { + "default": null, + "doc": "Creation Date", + "name": "creation_date", + "type": [ + "null", + "string" + ] + } + ], + "name": "SupplyThirdlink", + "namespace": "com.decathlon.vct.vcstream.avro.supplythirdlink", + "type": "record", + "version": "1" +} \ No newline at end of file diff --git a/schemas/supplyThirdReferential.avsc b/schemas/supplyThirdReferential.avsc new file mode 100644 index 0000000..5786713 --- /dev/null +++ b/schemas/supplyThirdReferential.avsc @@ -0,0 +1,300 @@ +{ + "doc": "Main Third data", + "domain": "Supply", + "fields": [ + { + "doc": "Third type", + "name": "third_type", + "type": "string" + }, + { + "doc": "Third Id", + "name": "third_id", + "type": "string" + }, + { + "doc": "Third sub Id", + "name": "third_sub_id", + "type": "string" + }, + { + "doc": "Supplier Name", + "name": "legal_name", + "type": "string" + }, + { + "default": null, + "doc": "Country Code", + "name": "country_code", + "type": [ + "null", + "string" + ] + }, + { + "default": null, + "doc": "Currency for Purchase Order", + "name": "order_currency", + "type": [ + "null", + "string" + ] + }, + { + "default": null, + "doc": "Is this material internal ?", + "name": "is_internal", + "type": [ + "null", + "boolean" + ] + }, + { + "default": null, + "doc": "SIREN Code", + "name": "tax_number", + "type": [ + "null", + "string" + ] + }, + { + "default": null, + "doc": "Code identifier for VAT", + "name": "vat_number", + "type": [ + "null", + "string" + ] + }, + { + "default": null, + "doc": "Vendor GLN Code", + "name": "third_gln", + "type": [ + "null", + "string" + ] + }, + { + "default": null, + "doc": "Sort Name", + "name": "sort_name", + "type": [ + "null", + "string" + ] + }, + { + "default": null, + "doc": "CNUF Code", + "name": "cnuf_code", + "type": [ + "null", + "string" + ] + }, + { + "default": null, + "doc": "Street description", + "name": "street", + "type": [ + "null", + "string" + ] + }, + { + "default": null, + "doc": "Address Complement", + "name": "address_complement", + "type": [ + "null", + "string" + ] + }, + { + "default": null, + "doc": "ZIP Code", + "name": "zip_code", + "type": [ + "null", + "string" + ] + }, + { + "default": null, + "doc": "City", + "name": "city", + "type": [ + "null", + "string" + ] + }, + { + "default": null, + "doc": "Phone number", + "name": "phone_number", + "type": [ + "null", + "string" + ] + }, + { + "default": null, + "doc": "Fax number", + "name": "fax_number", + "type": [ + "null", + "string" + ] + }, + { + "default": null, + "doc": "Rivers Id (obsolete)", + "name": "rivers_id", + "type": [ + "null", + "string" + ] + }, + { + "default": null, + "doc": "Is this material autonomous ?", + "name": "is_autonomous", + "type": [ + "null", + "boolean" + ] + }, + { + "default": null, + "doc": "define the vendor type (specific Brazil)", + "name": "is_linked_to_dp", + "type": [ + "null", + "boolean" + ] + }, + { + "default": null, + "doc": "is it a subcontractor?", + "name": "is_subcontractor", + "type": [ + "null", + "boolean" + ] + }, + { + "default": null, + "doc": "Fiscal company number", + "name": "fiscal_company_number", + "type": [ + "null", + "string" + ] + }, + { + "default": null, + "doc": "Accounting Entity", + "name": "accounting_entity", + "type": [ + "null", + "string" + ] + }, + { + "default": null, + "doc": "Cost center", + "name": "cost_center", + "type": [ + "null", + "string" + ] + }, + { + "default": null, + "doc": "jde Code", + "name": "jde_code", + "type": [ + "null", + "string" + ] + }, + { + "default": null, + "doc": "treeview organization", + "name": "treeview_organization", + "type": [ + "null", + "string" + ] + }, + { + "default": null, + "doc": "treeview level", + "name": "treeview_level", + "type": [ + "null", + "string" + ] + }, + { + "default": null, + "doc": "treeview number", + "name": "treeview_number", + "type": [ + "null", + "string" + ] + }, + { + "default": null, + "doc": "Creation date", + "name": "creation_date", + "type": [ + "null", + "string" + ] + }, + { + "default": null, + "doc": "Modification date", + "name": "modification_date", + "type": [ + "null", + "string" + ] + }, + { + "default": null, + "doc": "Is this third closed ?", + "name": "is_closed", + "type": [ + "null", + "boolean" + ] + }, + { + "default": null, + "doc": "language code", + "name": "language_code", + "type": [ + "null", + "string" + ] + }, + { + "default": null, + "doc": "Logistic area", + "name": "logistic_area", + "type": [ + "null", + "string" + ] + } + ], + "name": "SupplyThirdReferential", + "namespace": "com.decathlon.vct.vcstream.avro.supplythirdreferential", + "type": "record", + "version": "1" +} \ No newline at end of file diff --git a/src/config.rs b/src/config.rs index 2dd888e..1004df7 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,10 +1,9 @@ use serde::Deserialize; -use std::collections::HashMap; #[derive(Debug, Deserialize)] pub struct AppConfig { pub kafka: KafkaConfig, - pub schemas: HashMap, + // pub schemas: HashMap, } #[derive(Debug, Deserialize)] @@ -13,7 +12,7 @@ pub struct KafkaConfig { pub schema_registry: String, } -#[derive(Debug, Deserialize)] -pub struct SchemaConfig { - pub file: String, -} +// #[derive(Debug, Deserialize)] +// pub struct SchemaConfig { +// pub file: String, +// } diff --git a/src/main.rs b/src/main.rs index e57e344..d040732 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,47 +1,50 @@ -mod registry; mod config; +mod registry; -use clap::Parser; -use csv::ReaderBuilder; -use schema_registry_converter::async_impl::avro::AvroEncoder; -use schema_registry_converter::async_impl::schema_registry::SrSettings; -use serde::Deserialize; -use std::fs; - +use anyhow::{Context, Ok}; +use clap::{Parser, ValueEnum}; +use serde::de::DeserializeOwned; +use std::{env, fs}; use crate::config::AppConfig; -use crate::registry::{ChatMessage, KafkaProducer}; +use crate::registry::{ + AddressAvro, CountryAvro, Detail, KafkaProducer, Link, PhoneAvro, PhonesAvro, Store, StoreAvro, + WorkingHourAvro, +}; + +/// Énumération pour un choix de topic sécurisé via la ligne de commande. +#[derive(ValueEnum, Clone, Debug)] +#[clap(rename_all = "kebab_case")] +enum Topic { + Store, + Third, + Detail, + Link, +} /// CLI tool to send CSV rows as Avro messages to Kafka -#[derive(Parser, Debug)] +#[derive(Parser)] #[command(author, version, about, long_about = None)] struct Args { - /// Path to the config file (YAML) #[arg(short, long, default_value = "config.yaml")] config: String, /// Path to the CSV file - #[arg(short, long)] - csv: String, + #[arg(long)] + csv: Option, /// Kafka topic to produce to - #[arg(short, long)] - topic: String, - - /// Schema Registry URL (Apicurio or Confluent compatible) - #[arg(short = 'r', long, default_value = "http://localhost:8080/apis/registry/v2/groups/default/subjects")] - registry_url: String, - -} - -#[derive(Debug, Deserialize)] -struct User { - id: i32, - name: String, + #[arg(value_enum)] + topic: Topic, } #[tokio::main] async fn main() -> anyhow::Result<()> { + // this method needs to be inside main() method + unsafe { + env::set_var("RUST_BACKTRACE", "1"); + } + let args = Args::parse(); // Charger la configuration @@ -49,21 +52,195 @@ async fn main() -> anyhow::Result<()> { let cfg: AppConfig = serde_yaml2::from_str(&cfg_str).unwrap(); // 1. Kafka Producer - let producer: KafkaProducer = KafkaProducer::new(cfg.kafka.brokers, cfg.kafka.schema_registry, args.topic); + let producer: KafkaProducer = KafkaProducer::new(cfg.kafka.brokers, cfg.kafka.schema_registry); - // 2. Lecture CSV - let mut rdr = ReaderBuilder::new() - .has_headers(true) - .from_path(&args.csv)?; + let payload = StoreAvro { + third_type: 7, + third_num: 77777, + name: "Decathlon Lille".to_string(), + country: Some(CountryAvro { + country_code: Some("FR".to_string()), + country_name: Some("France".to_string()), + }), + fiscal_company_number: Some("200".to_string()), + ean_code: Some("3020913027281".to_string()), + address: AddressAvro { + street: Some("1 Avenue Frederic et Irene Joliot-Curie PLACE DE LA BOULE".to_string()), + address_complement: None, + zip_code: Some("59000".to_string()), + city: "Lille".to_string(), + administrative_area_level1: None, + administrative_area_level2: None, + administrative_area_level3: None, + administrative_area_level4: None, + administrative_area_level5: None, + latitude: Some("48.888112987144".to_string()), + longitude: Some("2.2013429927292".to_string()), + timezone: Some("Europe/Paris".to_string()), + }, + phones: PhonesAvro { + phone1: PhoneAvro { + label: "main".into(), + index: Some(1), + number: "0102030405".into(), + }, + phone2: PhoneAvro { + label: "alt".into(), + index: Some(2), + number: "0607080910".into(), + }, + }, + generic_store_type: None, + category: None, + language: Some("fr".into()), + status: "OPEN".into(), + franchised: false, + opening_date: None, + working_hours: vec![ + WorkingHourAvro { + day: "MONDAY".into(), + status: "Closed".into(), + opens: None, + closes: None, + divided_opens: None, + divided_closes: None, + }, + WorkingHourAvro { + day: "TUESDAY".into(), + status: "Closed".into(), + opens: None, + closes: None, + divided_opens: None, + divided_closes: None, + }, + WorkingHourAvro { + day: "WEDNESDAY".into(), + status: "Closed".into(), + opens: None, + closes: None, + divided_opens: None, + divided_closes: None, + }, + WorkingHourAvro { + day: "THURSDAY".into(), + status: "Closed".into(), + opens: None, + closes: None, + divided_opens: None, + divided_closes: None, + }, + WorkingHourAvro { + day: "FRIDAY".into(), + status: "Closed".into(), + opens: None, + closes: None, + divided_opens: None, + divided_closes: None, + }, + WorkingHourAvro { + day: "SATURDAY".into(), + status: "Closed".into(), + opens: None, + closes: None, + divided_opens: None, + divided_closes: None, + }, + WorkingHourAvro { + day: "SUNDAY".into(), + status: "Closed".into(), + opens: None, + closes: None, + divided_opens: None, + divided_closes: None, + }, + ], + leader: Some("Vincent, Leader Magasin".to_string()), + social_medias: vec![], + publish_on_ecommerce: true, + picture: Some("https://onestore-cdn.decathlon.net/shop/117".to_string()), + welcome_messages: vec![], + exceptional_hours: vec![], + }; - for result in rdr.deserialize() { - let chat: ChatMessage = result.unwrap(); + match args.topic { + Topic::Third => { + let ok = producer + .produce( + "store-key-77777".to_string(), + payload.clone(), + "onestore_v2_store", + ) + .await; + if ok { + println!("✅ Sent third {:?}", payload); + } + } + Topic::Store => { + let csv_path = args + .csv + .as_deref() + .context("L'option --csv est obligatoire pour ce topic")?; - // 3. Envoi Kafka - if producer.produce(chat.user.to_string(), chat.clone()).await { - println!("✅ Sent chat {:?}", chat); + let records = read_csv_file::(csv_path)?; + for store in records { + if producer + .produce("Store".to_string(), store.clone(), "third_referential") + .await + { + println!("✅ Sent store {:?}", store); + } + } + } + Topic::Link => { + let csv_path = args + .csv + .as_deref() + .context("L'option --csv est obligatoire pour ce topic")?; + let records = read_csv_file::(csv_path)?; + for link in records { + if producer + .produce("link".to_string(), link.clone(), "third_link") + .await + { + println!("✅ Sent link {:?}", link); + } + } + } + Topic::Detail => { + let csv_path = args + .csv + .as_deref() + .context("L'option --csv est obligatoire pour ce topic")?; + let records = read_csv_file::(csv_path)?; + for detail in records { + if producer + .produce("Detail".to_string(), detail.clone(), "store_detail") + .await + { + println!("✅ Sent detail {:?}", detail); + } else { + println!("X Sent detail {:?}", detail); + } + } } } Ok(()) -} \ No newline at end of file +} + +fn read_csv_file(file: &str) -> Result, anyhow::Error> +where + T: DeserializeOwned, +{ + let mut rdr = csv::Reader::from_path(file) + .with_context(|| format!("Impossible d'ouvrir le fichier CSV : {:?}", file)) + .unwrap(); + + let mut values = Vec::new(); + + for result in rdr.deserialize() { + let record: T = result.unwrap(); + values.push(record); + } + Ok(values) +} diff --git a/src/registry.rs b/src/registry.rs index 3ebf8f9..6528ed2 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -6,39 +6,220 @@ use schema_registry_converter::async_impl::schema_registry::SrSettings; use schema_registry_converter::avro_common::get_supplied_schema; use schema_registry_converter::schema_registry_common::SubjectNameStrategy; use serde::{Deserialize, Serialize}; +use std::fs; use std::sync::Arc; use std::time::Duration; -#[derive(Clone, Serialize, Deserialize, Debug)] -pub struct ChatMessage { - pub user: String, - pub message: String, +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct Store { + pub third_type: String, + pub third_id: String, + pub third_sub_id: String, + pub legal_name: String, + pub country_code: String, + pub order_currency: String, + pub is_internal: Option, + pub tax_number: Option, + pub vat_number: Option, + pub third_gln: Option, + pub sort_name: Option, + pub cnuf_code: Option, + pub street: Option, + pub address_complement: Option, + pub zip_code: Option, + pub city: Option, + pub phone_number: Option, + pub fax_number: Option, + pub rivers_id: Option, + pub is_autonomous: Option, + pub is_linked_to_dp: Option, + pub is_subcontractor: Option, + pub fiscal_company_number: Option, + pub accounting_entity: Option, + pub cost_center: Option, + pub jde_code: Option, + pub treeview_organization: Option, + pub treeview_level: Option, + pub treeview_number: Option, + pub creation_date: Option, + pub modification_date: Option, + pub is_closed: Option, + pub language_code: Option, + pub logistic_area: Option, } -impl AvroSchema for ChatMessage { - fn get_schema() -> Schema { - // Define the Avro schema for ChatMessage - Schema::parse_str( - r#"{ - "type": "record", - "name": "ChatMessage", - "fields": [ - {"name": "user", "type": "string"}, - {"name": "message", "type": "string"} - ] - }"# - ).unwrap() +impl AvroSchema for Store { + fn get_schema() -> apache_avro::schema::Schema { + let schema_path = "schemas/supplyThirdReferential.avsc"; + let schema_json = fs::read_to_string(schema_path).unwrap(); + + Schema::parse_str(&schema_json).unwrap() } } +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct Link { + pub parent_third_type: String, + pub parent_third_id: String, + pub parent_third_sub_id: String, + pub third_type: String, + pub third_id: String, + pub third_sub_id: String, + pub link_id: String, + pub creation_date: Option, +} + +impl AvroSchema for Link { + fn get_schema() -> apache_avro::schema::Schema { + let schema_path = "schemas/supplyThirdLink.avsc"; + let schema_json = fs::read_to_string(schema_path).unwrap(); + + Schema::parse_str(&schema_json).unwrap() + } +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct Detail { + pub store_third_type: String, + pub store_third_id: String, + pub store_third_sub_id: String, + pub sap_store_id: String, + pub linear_size: Option, + pub opening_date: Option, + pub river_id: Option, + pub logistic_area: Option, + pub store_type: Option, + pub store_sign: Option, +} + +impl AvroSchema for Detail { + fn get_schema() -> apache_avro::schema::Schema { + let schema_path = "schemas/supplyStoreDetail.avsc"; + let schema_json = fs::read_to_string(schema_path).unwrap(); + + Schema::parse_str(&schema_json).unwrap() + } +} + + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct StoreAvro { + pub third_type: i32, + pub third_num: i32, + pub name: String, + pub country: Option, + pub fiscal_company_number: Option, + pub ean_code: Option, + pub address: AddressAvro, + pub phones: PhonesAvro, + pub generic_store_type: Option, + pub category: Option, + pub language: Option, + pub status: String, + pub franchised: bool, + pub opening_date: Option, // logicalType date = int (jours depuis epoch) + pub working_hours: Vec, + pub leader: Option, + pub social_medias: Vec, + pub publish_on_ecommerce: bool, + pub picture: Option, + pub welcome_messages: Vec, + pub exceptional_hours: Vec, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct CountryAvro { + pub country_code: Option, + pub country_name: Option, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct AddressAvro { + pub street: Option, + pub address_complement: Option, + pub zip_code: Option, + pub city: String, + pub administrative_area_level1: Option, + pub administrative_area_level2: Option, + pub administrative_area_level3: Option, + pub administrative_area_level4: Option, + pub administrative_area_level5: Option, + pub latitude: Option, + pub longitude: Option, + pub timezone: Option, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct PhonesAvro { + pub phone1: PhoneAvro, + pub phone2: PhoneAvro, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct PhoneAvro { + pub label: String, + pub index: Option, + pub number: String, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct WorkingHourAvro { + pub day: String, + pub status: String, + pub opens: Option, + pub closes: Option, + pub divided_closes: Option, + pub divided_opens: Option, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct SocialMediaAvro { + pub name: String, + pub link: String, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct WelcomeMessageAvro { + pub language: String, + pub message: String, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ExceptionalHourAvro { + pub date: String, + pub status: String, + pub opens: Option, + pub closes: Option, + pub divided_closes: Option, + pub divided_opens: Option, +} + +impl AvroSchema for StoreAvro { + fn get_schema() -> Schema { + let schema_json = + fs::read_to_string("schemas/store.avsc").expect("Impossible de lire schemas/store.avsc"); + Schema::parse_str(&schema_json).expect("Schema Avro invalide") + } +} + + pub struct KafkaProducer<'a> { producer: FutureProducer, + #[warn(dead_code)] avro_encoder: Arc>, - topic: String, } impl KafkaProducer<'_> { - pub fn new(bootstrap_servers: String, schema_registry_url: String, topic: String) -> Self { + pub fn new(bootstrap_servers: String, schema_registry_url: String) -> Self { let producer: FutureProducer = ClientConfig::new() .set("bootstrap.servers", bootstrap_servers) .set("produce.offset.report", "true") @@ -52,14 +233,16 @@ impl KafkaProducer<'_> { Self { producer, - topic, avro_encoder: Arc::new(avro_encoder), } } - pub async fn produce(&self, key: String, payload: T) -> bool { - let value_strategy = SubjectNameStrategy::TopicNameStrategyWithSchema( - self.topic.clone(), + pub async fn produce(&self, key: String, payload: T, topic: &str) -> bool + where + T: Serialize + apache_avro::AvroSchema, + { + let value_strategy = SubjectNameStrategy::TopicNameStrategyWithSchema( + topic.to_string(), true, get_supplied_schema(&T::get_schema()), ); @@ -73,9 +256,7 @@ impl KafkaProducer<'_> { Err(e) => panic!("Error encoding payload: {}", e), }; - let record = FutureRecord::to(&self.topic) - .key(&key) - .payload(&payload_bytes); + let record = FutureRecord::to(topic).key(&key).payload(&payload_bytes); match self.producer.send(record, Duration::from_secs(10)).await { Ok(_) => true, diff --git a/store.csv b/store.csv new file mode 100644 index 0000000..a66fb1c --- /dev/null +++ b/store.csv @@ -0,0 +1,2 @@ +third_type,third_id,third_sub_id,legal_name,country_code,order_currency,is_internal,third_gln,sort_name,cnuf_code,street,zip_code,city,phone_number,fax_number,is_autonomous,is_linked_to_dp,fiscal_company_number,accounting_entity,cost_center,jde_code,treeview_organization,treeview_level,treeview_number,creation_date,modification_date,is_closed,language_code +007,01692,01692,ROTHSCHILD,IL,ILS,false,3020916685709,ROTHS,0000000000,36 Rothschild,65122,TEL AVIV,0,0,false,false,0024,1691,1034501691,0668570,0002,0006,00001,2019-01-04,2025-08-19,false,HE