Introduksjon
Tide AS er Norges nest største mobilitetsselskap med en bussflåte på mer enn 2 300 busser som gjennomfører mer enn 20 000 buss- og baneavganger hver dag. Dette omfatter både offentlige kontrakter og kommersiell virksomhet.
Flere av disse bussene inneholder sendere som hyppig sender ut data om bussens tilstand, som for eksempel GPS-posisjon, bussens fart og retning, og bussens drivstofforbruk. Hvilke data bussen sender avhenger både av utstyret i bussen og FMS-standarden (Fleet Management System).
Hver buss med disse senderne sender data alt fra hvert 20 millisekund til hvert 10. sekund avhengig av hvilken data som sendes. Det er altså snakk om store mengder data! Så hvordan kan vi få best mulig nytte av den?
I denne artikkelen skal jeg beskrive hvordan vi i Altro AS har hjulpet Tide med denne problemstillingen og benyttet Microsoft Fabric sine Real-Time Intelligence-kapabiliteter for å skape innsikt i sanntidsdataen og tilgjengeliggjøre den på en effektiv måte.
Laste inn dataen
På samme måte som Tide hver dag laster mange passasjerer inn i bussene sine må dataen fra bussene lastes inn i Tide sin dataplatform for at den kan bearbeides mer og gi innsikt for de rette personene. Valget av dataplatform ble Microsoft Fabric.
Sanntidsdataen fra bussene sendes gjennom meldingsprotokollen MQTT som er en standard protokoll for IoT-meldinger og er ofte brukt i tilfeller som dette.
Microsoft har bygget Eventstream-objektet for å kunne bringe sanntidsdata inn i Fabric, transformere den og sende den videre til forskjellige destinasjoner. En Eventstream støtter nå flere eksterne kilder, som for eksempel Azure IoT Hubs, Azure Event Hubs og flere, men den støtter ikke MQTT direkte.
For å få inn MQTT-dataen tar vi derfor et lite mellomsteg med en Python-kode som sender dataen videre i et støttet format. For i Eventstreams kan man også velge “Custom endpoint”-kilder hvor man kan sende inn data på Event Hub-, AMQP- eller Kafka-format. For hvert av disse tilfellene får vi en connection string vi kan bruke for å sende data til. Vi har valgt å sende dataen inn i Eventstream på Event Hub-format. Flyten fra MQTT til Eventstream ser derfor ut som under der vi går gjennom en Spark Job Definition der Python-koden kjøres.
Nå som dataen flyter inn i Eventstream kan vi ta en titt på den. Med den innebygde forhåndsvisningen av dataen kan vi se de forskjellige feltene og hva de inneholder, samt når dataen ble prosessert og satt i kø. Dette hjelper oss å kunne se hvilke transformasjoner vi skal gjøre på dataen. For eksempel inneholder Message-kolonnen et stort nøstet JSON-objekt.
Vi får også innsikt i dataen som sendes inn ved å for eksempel se på innkommende meldinger den siste tiden. Under er de siste innkommende meldinger de siste 12 timene og vi kan se at bussene sender ulik mengde data gjennom dagen.
Transformere dataen
Når vi oppretter en Eventstream vil 1. og 2. i figuren over opprettes automatisk, og det er fra 2. forhåndsvisning og innsikt i avsnittet over kom fra. Her er 1. kilden, som i vår tilfelle er et Custom endpoint, og 2. er den første strømmen av data slik den ser ut i kilden.
I en Eventstream kan vi også transformere dataen i sanntid. I vårt eksempel viser jeg noen av transformasjonene vi gjør, men det finnes også flere. Her er en forklaring til de forskjellige punktene på figuren:
I dette steget bruker vi en filtertransformasjon som filtrerer bort test-busser som ikke sender brukbar data.
Som vi så i forhåndsvisningen av dataen inneholder Message-kolonnen et stort nøstet JSON-objekt. Ved hjelp av Manage field-transformasjonen nøster vi denne ut i flere kolonner. Her kan vi også bruke matematiske funksjoner på tall, dato-funksjoner, og streng-funksjoner på tekst.
Vi har valgt å lage en Derived Stream etter de mest sentrale transformasjonene. Denne representerer datastrømmen av den transformerte dataen, og vil da ha et annet format enn 2. som er datastrømmen av rå-dataen. Når denne er opprettet er den klar til videre forbruk eller analyse av andre i organisasjonen.
Etter dette punktet splittes strømmen, noe vi har gjort siden det er forskjellige analysebehov.
Et av feltene vi får etter punkt 4. er av typen array. I et av use-casene våre ønsker vi å ekspandere dette slik at hvert element blir en rad hver. Dette gjør vi med Expand-transformasjonen.
Igjen brukes Manage field-transformasjonen for å kun ta med de relevante kolonnene og gi dem riktig navn.
Da sitter vi igjen med et penere format på dataen i stedet for nøstede felt, som vist under med litt test-data.
vehicle_id | timestamp | latitude | longitude | total_fuel_used | total_distance |
1234 | 2024-10-30T12:22:54.127Z | 70.064023 | 22.613211 | 101689 | 292511 |
1235 | 2024-10-30T12:22:54.24Z | 63.208197 | 9.010019 | 18617 | 42511 |
Etter transformasjonene er gjort må dataene sendes videre til bruk. Her kan vi lagre dataen eller sende den videre til en Custom App utenfor Fabric. I punkt 8. lagrer vi dataen i et Eventhouse.
Lagre dataen
Vi lagrer sanntidsdataen i et Eventhouse. Et Eventhouse kan bestå av flere KQL-databaser som er optimalisert for sanntidsdata hvor vi bruker Kusto Query Language (KQL) for å spørre på og analysere dataen.
Når vi åpner et Eventhouse vil vi få en oversikt over lagring og aktivitet. Som dere ser kan vi se hvor mye data som er lagret og hvor den lagres. De to lagringsmetodene er OneLake standard storage og OneLake cache storage. Man kan for hvert Eventhouse, hver KQL-database eller hver tabell velge retention og cashing policies som påvirker hvordan dataen lagres.
Cashing sier noe om tilgjengeligheten av dataen, altså hvor raskt du kan lese den. I standard storage tar det noe lenger tid enn for cache. Vi sender all dataen som er mer enn 50 dager gammel til standard storage, og denne regelen gjelder for hele KQL-databasen.
Vi tilgjengeliggjør data til Sjåførappen til Tide som er interessert i blant annet siste GPS-posisjon. Det er derfor ikke viktig å ta vare på data som er eldre enn noen dager. I slike tilfeller setter vi en retention policy på de enkelte tabellene det gjelder. Da vil dataen slettes etter minst så mange dager du setter retention til og vi slipper lagre unødvendig data.
For data som brukes i rapportering derimot ønsker vi full historikk, så i disse tilfellene setter vi ikke en retention policy, og dataen slettes ikke.
Men sanntidsdataen i seg selv trenger ofte å berikes av annen data. Vi ønsker mer informasjon om bussene, hvilken avdeling de tilhører, sjåføren og liknende. Denne type dimensjonsdata er ikke sanntidsdata og lastes gjerne inn med batch-laster og lagres i et Lakehouse.
Heldigvis lar Fabric deg bruke data på tvers av Eventhouse og Lakehouse. Vi kan enten bruke Eventhouse-data i et Lakehouse eller omvendt, og hvilken av de som velges avhenger av behovene. For å sammenslå dataen på tvers bruker vi OneLake Availability og Shortcuts til Eventhouse.
OneLake Availability
Et av de sentrale feltene som sendes som sanntidsdata fra bussene er drivstoffdata. For Tide er det viktig å vite hvor mye drivstoff enhver buss og sjåfør forbruker slik at de kan sette i gang tiltak for å redusere overdrevent bruk. Dette er både bra for miljøet og en kostnadsbesparelse!
I slike tilfeller holder det å rapportere daglig, og siden resten av dataen som skal brukes til berikelse ligger i et Lakehouse kan vi i Eventhouse skru på OneLake Availability. Da får vi en OneLake path i Delta Lake-format som kan brukes til å lese data fra tabellen. Det er da en enkel sak å bruke denne pathen for å lage en shortcut til ditt foretrukne Lakehouse. Da kan denne tabellen brukes på samme måte som resten av tabellene der, for eksempel i Notebooks.
Som dere ser fra bildet, er det en viss latency med denne metoden. Det er den maksimale tiden det tar før dataen er tilgjengelig i OneLake-tabellen, men i use-cases som dette er det en triviell latency ettersom resten av modellen oppdateres daglig.
I vårt tilfelle bygger vi logikken videre i en Notebook og går gjennom den vanlige medaljongstrukturen for å bygge fakta- og dimensjonstabeller. Disse bruker vi videre for å lage Semantiske modeller vi rapporterer på.
Shortcuts til Eventhouse
Det er også mulig å bruke data lagret i et Lakehouse sammen med sanntidsdataen din direkte i KQL-databasen.
Å berike dataen direkte i KQL-databasen gjør det derfor mulig å bygge sanntidsrapporter som gir raskere innsikt. I tillegg gjør dette det lettere for eksterne applikasjoner, som Tides Sjåførapp, å hente ut dataen de trenger der og da med minimal latency. Det er ofte bare behov for å hente ut data om busser i en gitt avdeling, og uten denne berikingen av dataen måtte man hatt en separat prosess for å finne ut hvilken avdeling bussen tilhører.
Tilgjengeliggjøring av dataen
Tide bygger som sagt også en Sjåførapp som skal gi bussjåførene informasjon om deres kjøring og bussen(e) de skal kjøre en viss dag. Parkeringsplassene til bussene er store og ofte uoversiktlige som kan gjøre det vanskelig for sjåførene å finne bussen de skal kjøre en gitt dag med en gang. For å hjelpe sjåførene her bruker vi GPS-posisjonen bussene sender ut og tilgjengeliggjør den.
Hver dag sendes det flere millioner rader med data fra bussene, men for appen er det bare det siste som sendes fra hver buss som er interessant. Når dataen er plassert i et Eventhouse har vi derfor et materialisert view som presenterer de siste ikke-NULL verdiene. Koden for denne lagres i et KQL Queryset slik at den kan endres om nødvendig, og vi har kildekontroll på den.
Da er det bare å kopiere Query URI-en til KQL-databasen og gi en Managed Identity tilgang til det materialiserte viewet så har appen tilgang til dataen de trenger i sanntid. Dette hjelper å gjøre sjåførenes liv enklere og lar dem fokusere på å yte en god service til passasjerene sine!
Handle basert på dataen
Data Activator er en egen brukeropplevelse i Microsoft Fabric. Her kan man benytte Reflex for å handle basert på sanntidsdataen. I begynnelsen av prosjektet, da datamengden var relativt lav, benyttet vi oss av dette inne i en Eventstream direkte. Det er så enkelt som å sette en ny destinasjon i din Eventstream, slik som Eventhouse er i punkt 8. i skissen over Eventstream.
Etter hvert som prosjektet gikk, fikk vi inn mer og mer data og vi så raskt at Reflex ikke enda håndterte dette. Antall eventer per sekund inn i en Reflex kan ikke overgå 1 000 nå som det er i Public Preview.
Vi måtte derfor finne en ny metode for å kunne handle basert på dataen, og det viktigste for oss i starten var å sørge for at dataen som skulle lastes inn faktisk ble lastet inn. Dette er spesielt viktig når teknologien vi bruker er i Public Preview siden den ikke er like robust som i General Availability.
Heldigvis er det mulig å koble Reflex til KQL Querysets også. Med en relativt enkel KQL-spørring kan vi da sjekke om det kommer ny data inn til tabellene i KQL-databasen eller ikke. I koden under sjekker vi om det har kommet nye data den siste timen. Denne vil kun ha en output om Count < 1, altså om det ikke er ny data.
table_name
| where ingestion_time() between (now(-1h) .. now())
| count
| where Count < 1
Vi kan så sette en alert på dette gjennom Reflex som vil sende en e-post dersom KQL-spørringen har en output. Her er det Reflex alert_rf som håndterer dette, og vi kan her enten velge en allerede eksisterende Reflex eller lage en ny.
Denne tjenesten er også i Public Preview, men har vist seg å virke godt for formålet til nå.
I fremtiden håper vi å kunne utvide dette enda mer når Data Activator kommer i General Availability. Dette vil hjelpe mye for å sikre datakvaliteten og handle raskere basert på dataen!
Hvordan holder man styr på alt dette?
Real-Time Intelligence består av mange deler så det kunne fort blitt uoversiktlig. Heldigvis har Fabric en Real-Time hub som samler alt på ett sted (tilsvarende slik som OneLake data hub gjør). Her kan du se alle Evenstreams og KQL-tabeller som finnes i tenanten din (gitt at du har tilgang til dem) og gå dypere inn å analysere dem.
Fra Real-Time hub kan vi også laste inn flere kilder hvis du trykker på “Add source”, samt at vi kan bruke fanen til venstre for å laste inn fra Azure via CDC. Her kan vi også starte prosesser dersom det er en endring i et Fabric Workspace eller i en Azure Blob Storage.
Real-Time hub hjelper altså å holde oversikten over alt vi gjør Real-Time, samt at vi kan starte nye prosesser herfra.
Noen siste råd
Mange av Microsoft Fabric sine kapabiliteter er enda unge, og flere av dem også i Public Preview. Når vi har jobbet med Real-Time Intelligence har vi merket oss et par ting dere også burde passe på:
I en Eventstream er schema på dataen kun definert av én rad, den første raden som kommer opp i fanen “Preview data”. Det betyr at når vi skal transformere dataen er ikke dette mulig for kolonner som er NULL i første rad. Fra en av kildene våre er det slik at det alltid er en av kolonnene som er NULL i hver sending, så for å kunne definere transformasjonene var vi nødt til å sende dummy-data med tilsvarende schema først.
CI/CD for alle Fabric sine tjenester er ikke tilgjengelig enda, spesielt ikke for Real-Time Intelligence. Dette betyr dårlig kildekontrol og at det er umulig å bruke Deployment Pipelines mellom utvikler- og produksjonsmiljø. Spesielt forbedringer og forandringer av tjenester i produksjon blir derfor mer komplisert.
Data Activator sin Reflex håndterer ikke flere enn 1 000 sendinger per sekund når den brukes i en Eventstream.
Fremtiden
Vi gleder oss til Microsoft kommer ut med nye features i Real-Time Intelligence, og ikke minst gjør det som allerede er tilgjengelig, men i Public Preview til General Availability. Da blir det lettere å lage en mer strømlinjet prosess.
Vi gleder oss også til et nærmere samarbeid med tredjeparter som sender sanntidsdataen for å få inn enda flere felt og data vi kan utføre analyse på. Dette er noe vi kontinuerlig jobber med å få til og vil gjøre at vi kan utnytte enda flere av Real-Time Intelligence sine funksjoner. Jo mer data vi kan få i sanntid jo mer kan vi også benytte oss av Real-Time Dashboards i tillegg til Power BI-rapporter.
Konklusjon
Real-Time Intelligence i Microsoft Fabric har allerede et stort potensial og det gir god innsikt hos Tide. Det har hjulpet oss å laste inn, transformere og lagre data uten veldig mye ekstraarbeid. Flere metoder lar oss koble sammen batch- og sanntidsdata for å lage god innsikt, og med Data Activator er det mulig å handle på dataen i sanntid.
Hvordan ser du for deg at du kan bruke Real-Time Intelligence sine muligheter?
Σχόλια