Üzenetkezelők AWS-ben: Kinesis Data Streams
|

Ez a cikk egy, az elosztott rendszerek és a mikroszervíz-alapú alkalmazások komponensei közötti üzenetküldési lehetőségekkel foglalkozó cikksorozat harmadik része. Az első rész az SQS Standard Queue-ról, a második az SQS FIFO Queue-ról szólt, a sorozat további részeiben tervezzük az SNS és az Amazon MQ tárgyalását.

Az Amazon Kinesis nem egy konkrét szolgáltatás, hanem több szolgáltatás gyűjtőneve. A több szolgáltatás közül ebben a cikbben a Kinesis Data Streams-szel foglalkozunk, de az egyszerűség okán a cikkben sokszor csak Kinesisnek hívjuk majd.

A Kinesis Data Streams azon felül is sok szempontból hasonlít az SQS-re, hogy ezzel is az alkalmazásunk részei között küldhetünk üzeneteket:

  • Több producer küldhet üzeneteket egyetlen helyre,
  • több consumer veheti el az üzeneteket,
  • maga a szolgáltatás skálázható,
  • mindig több AZ-ben, redundánsan tárolja az adatokat, azaz ha a Kinesis OK-t küldött a producernek, akkor adat már nem vész el.

És - ahogy látni fogjuk - rengeteg dologban különbözik az SQS-től. Emeljünk ki pár szóhasználatbeli eltérést:

  • Az SQS-ben sorokat (queue) adtunk meg, a Kinesisben stream-ről (áram, áramlat) beszélünk, ezzel is hangsúlyozva, hogy a Kinesisben arra számítunk, hogy az adatok, bár kis csomagonként, de gyakorlatilag megszakítás nélkül érkeznek.
  • Az SQS esetében üzenetekről beszélünk, a Kinesis esetében rekordokról.
Amazon Kinesis

Hol használjuk a Kinesis-t?

Már említettük, hogy alapvetően olyan helyeken, ahol az adatok folyamatosan jönnek:

  • méréseknél, szenzorok adatainak feldolgozásánál,
  • online játékoknál,
  • naplóüzenetek és metrikák begyűjtésekor.

A felhasználás körét lényegesen módosítja, kitágítja az SQS-hez képest, hogy a feldolgozott üzenetek megmaradnak a Kinesisben. Ez lehetővé teszi, hogy amíg az alkalmazásunk egyik része el van foglalva az adatok feldolgozásával, értelmezésével (például egy játék esetén a játékosok mozgatásával), addig egy másik rész, egy másik mikroszervíz például statisztikát készít ugyanabból a streamből.

Shardok

Amikor létrehozunk egy Kinesis Stream-et, egészen biztosan létrejön benne egy shard (szilánk, töredék), de egy streamben lehet több shard is. A rekordok mindig egy konkrét shardba kerülnek a stream-en belül. Ezek a shardok azok, amik az adatátbocsátás sebességét meghatározzák. Egy shard képes

  • 1 MB adatot és
  • 1000 PUT műveletet

feldolgozni másodpercenként. Amelyiket először elérjük, az korlátoz. Van olvasási korlát is: egy shardból másodpercenként 2 MB adat olvasható ki. Ha a három adat bármelyike korlátoz, akkor több shardra van szükségünk. Shardból régiónként egy AWS-fiók tízet kap, de ez soft limit, azaz az AWS Supportnál jegyet nyitva kérhető az emelése - százezerig.

Alaphelyzetben nem tudjuk közvetlenül meghatározni, hogy melyik shardba kerüljön az adatunk. Ha a "közvetlenül" kitételre felkaptuk a fejünk, jól tettük. A beérkező terhelés shardok közti egyenletes elosztását az AWS úgy igyekszik elérni, hogy minden rekordhoz meg kell adnunk egy partition key-t, ami alapján az AWS előállít egy int128(md5(partition_key)) pszeudokóddal leírható hash-t. Ez a hash dönti el, hogy melyik shardba kerül az adatunk. Az alaphelyzetben meglévő egyetlen shard minden lehetséges hasht-t fogad:

aws kinesis describe-stream --stream-name myStream
{
    "StreamDescription": {
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "StartingHashKey": "0",
                    "EndingHashKey": "340282366920938463463374607431768211455"
                },
                ...

Azt látjuk, hogy a streamünk nulladik - és egyetlen - shardja fogja átvenni az összes üzenetet, ugyanis a hash-intervalluma magába foglalja az összes lehetséges hash-értéket.

Ha kérjük, hogy legyen mégis két shardunk, akkor a fogadott üzenetek a hash-ek alapján két shardba kerülnek. Látjuk, hogy az eredeti shard nem tűnik el azonnal (a 0-s a régi, az 1-es és a 2-es shard az új):

aws kinesis update-shard-count --stream-name myStream --target-shard-count 2 --scaling-type UNIFORM_SCALING
aws kinesis describe-stream --stream-name myStream
{
    "StreamDescription": {
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "StartingHashKey": "0",
                    "EndingHashKey": "340282366920938463463374607431768211455"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49610814757437399477767104689162985062854364633730383874",
                    "EndingSequenceNumber": "49610814757448549850366370000732543996171501345576058882"
                }
            },
            {
                "ShardId": "shardId-000000000001",
                "ParentShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "StartingHashKey": "0",
                    "EndingHashKey": "170141183460469231731687303715884105727"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49610816975759426611203780447145488798005255947107172370"
                }
            },
            {
                "ShardId": "shardId-000000000002",
                "ParentShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "StartingHashKey": "170141183460469231731687303715884105728",
                    "EndingHashKey": "340282366920938463463374607431768211455"
                },
            ...

A két új shard kapja az új rekordokat. Ha mostantól kellőképp különböző partition key-eket adunk meg a rekordok elküldésekor (például a játékunkban az adott játékos felhasználóazonosítóját), akkor megvalósul a shardok egyenletes terhelése. Az azonos primary key-jel rendelkező üzenetek egészen biztosan ugyanabba a shardba kerülnek.

Bizonyára vannak olyan olvasóink, akiknek nem csak a "közvetlenül", hanem az "alaphelyzetben" szó is szemet szúrt. Nos, van lehetőség arra is, hogy az emlegetett hash-algoritmust felülbírálva, közvetlenül megadjuk a hash-t a beküldött rekord mellé. Ez az érték persze így nem lesz hash (nem hash-eltünk semmit), de azért úgy hívjuk. Ilyen módszerrel az alkalmazásuknak módjában áll eldönteni, hogy melyik shardra kerüljön a rekord.

Rekordok beküldése

Rekordokat (amik ugyebár legfeljebb egy megabájtosak lehetnek) base64 eljárással kódoltan adhatunk meg:

aws kinesis put-record --stream-name myStream --partition-key 'walami' --data 'TmFow6F0LCBlbG9sdmFzdGFkPwo='
{
  "ShardId": "shardId-000000000001",
  "SequenceNumber": "49600902273357540915989931256901506243878407835297513618",
  ...

A base64 kód előállítása Unix-szerű rendszereken egyszerűen az echo 'Üzenet'|base64 paranccsal történhet. Ha ez a lehetőség nem áll rendelkezésre, vagy úgy kényelmesebb, hamar találunk az interneten base64-átalakítást végző weboldalt.

Rekordok kiolvasása

Kinesis Data Streams

Hűséges olvasóink hajlamosak lehetnek úgy vélekedni, hogy a mellékelt kép ugyanaz, mint amit az SQS FIFO-soroknál bemutattunk. Igazuk is van, eltekintve egy apró, ám jelentős különbségtől: a jobb oldali nyilak ezúttal a "cső" fölött kezdődnek, ezzel szemléltetve azt, hogy az olvasás nem a streamből, hanem egy adott shardból történik. Emlékszünk még az SQS-re? Ott azt mondtuk - amit az AWS is -, hogy a "next best" üzenetet adja oda a sor, nem tudjuk eldönteni, hogy melyik üzenetcsoportból olvasunk. Hát itt nem csak hogy tudjuk, egyenesen kell. Na és melyik shardra került a beküldött rekord? Ugyan kiírta az aws cli, amikor az előző parancsot kiadtuk, de a stream másik végén hallgatózó alkalmazásunk erről mit sem tud. Ha épp tervezési fázisban vagyunk, segítségünkre lehet az alábbi Python egysoros:

python3 -c "import hashlib;print(int(hashlib.md5('walami'.encode('utf-8')).hexdigest(), 16))"
aminek az eredményéről (18437533795427874473157828846196577706), ha nem is ránézésre, de tudjuk, hogy az 1-es shard HashKeyRange-ébe esik, azaz ott kérdezősködünk. Megjegyezzük, hogy a "Melyik shardot kérdezzem?" probléma az igazi alkalmazásoknál is fennáll, és a fejlesztőnek kell megoldania, hogy minden shard elegendő figyelmet kapjon. Az első dolgunk egy iterátor beszerzése, amit el is teszünk egy változóba, mert igen hosszú:
ITERATOR=$(aws kinesis get-shard-iterator --stream-name myStream --shard-id shardId-000000000001 --shard-iterator-type TRIM_HORIZON --query 'ShardIterator')
Ha már van iterátorunk, mehet a kérés a shardhoz:
aws kinesis get-records --shard-iterator $ITERATOR
{
    "Records": [
        {
            "SequenceNumber": "49610816975759426611203780447152742352923072502454812690",
            "ApproximateArrivalTimestamp": "2020-09-15T14:08:07.358000+02:00",
            "Data": "TmFow6F0LCBlbG9sdmFzdGFkPwo=",
            "PartitionKey": "walami"
        }
És meg is van a - még mindig base64-kódolt - adatunk. Iterátorból többfélét kérhetünk: állhatunk a shard elejére vagy adott számú rekordjára, a fent használt TRIM_HORIZON pedig az első kiolvasatalan értéktől indít. Az iterátor adott ideig érvényes, és mindig megkapjuk a következő iterátort is, mert jól elképzelhető, hogy mire az elsővel kiolvassuk az adatokat, újak kerülnek a shardra.

Rekord törlése

Ez a szakasz rövid lesz: nincs mód rekordot törölni a Kinesisből. A rekordok a retenciós idő lejártával automatikusan ürülnek. Ez az idő alapesetben 24 óra, és legfeljebb 168 órára növelhető.

És miért ne használjunk SQS helyett mindig Kinesis-t?

Szinte annyi shard-ot hozunk létre, amennyi tetszik, az üzenetek biztosan sorban érkeznek meg, és amennyiben a producerünk megkapja az OK-t, még duplák sem kerülnek a streambe. Mi szól mégis az SQS mellett? Alapvetően a Kinesis-re támaszkodó alkalmazás komplexitása. A mi dolgunk figyelni arra,

  • hogy minden shardot "igazságosan" olvassunk,
  • hogy ha új shardot hozunk létre a bemenő oldalon, akkor a kimenő is értesüljön róla,
  • hogy melyik rekordot olvastuk már (bár ebben a Kinesis segít), ami különösen akkor válhat bonyolulttá, ha több consumerünk is van.

Az AWS is látja ezt a problémát, és rendelkezésre bocsátja a Kinesis Consumer Library-t a fejlesztők életét megkönnyítendő. Párjával, a KPL-lel (P mint Producer) együtt hasznos segédeszköz lehet sok esetben.

Befejezve mélyreható áttekintésünket, búcsút veszünk a Kinesis Data Streams-től. Remélhetőleg tudtunk segíteni azoknak a rendszertervezőknek, architecteknek, akik épp e technológia felé pillantgatnak.

[Vissza a bejegyzésekhez]