5 minutes
Distribuované systémy
Graf je protokol implementovaný jako distribuovaný systém.
Připojení selhávají. Požadavky přicházejí mimo pořadí. Různé počítače s nesynchronizovanými hodinami a stavy zpracovávají související požadavky. Servery se restartují. Mezi požadavky dochází k opětovným Re-orgs. Tyto problémy jsou vlastní všem distribuovaným systémům, ale v systémech pracujících v globálním měřítku se ještě zhoršují.
Vezměme si tento příklad toho, co může nastat, pokud se klient dotazuje indexeru na nejnovější data během re-org.
- Indexer zpracovat blok 8
- Požadavek doručený klientovi pro blok 8
- Indexer požívá blok 9
- Indexer požívá blok 10A
- Požadavek doručený klientovi pro blok 10A
- Indexer zjistí reorg na 10B a vrátí zpět 10A
- Požadavek doručený klientovi pro blok 9
- Indexer požívá blok 10B
- Indexer požívá blok 11
- Požadavek doručený klientovi pro blok 11
Z pohledu indexátora postupují věci logicky vpřed. Čas se posouvá vpřed, i když jsme museli vrátit blok strýce a přehrát na něj blok v rámci konsensu vpřed. Po cestě Indexer obsluhuje požadavky pomocí nejnovějšího stavu, o kterém v danou chvíli ví.
Z pohledu klienta se však situace jeví jako chaotická. Klient si všimne, že odpovědi se týkaly bloků 8, 10, 9 a 11 v tomto pořadí. Tomu říkáme problém “kmitání bloků”. Když klient zažije blokové kmitání, může se zdát, že si data v průběhu času odporují. Situace se ještě zhorší, když uvážíme, že všechny indexátory nepřijímají nejnovější bloky současně a vaše požadavky mohou být směrovány k více indexátorům.
Je odpovědností klienta a serveru, aby spolupracovali a poskytovali uživateli konzistentní data. V závislosti na požadované konzistenci je třeba použít různé přístupy, protože pro každý problém neexistuje jeden správný program.
Uvažovat o důsledcích distribuovaných systémů je těžké, ale náprava možná není! Vytvořili jsme API a vzory, které vám pomohou orientovat se v některých běžných případech použití. Následující příklady ilustrují tyto vzory, ale přesto opomíjejí detaily vyžadované produkčním kódem (jako je zpracování chyb a zrušení), aby nebyly zastřeny hlavní myšlenky.
Polling for updated data
The Graph provides the block: { number_gte: $minBlock }
API, which ensures that the response is for a single block equal or higher to $minBlock
. If the request is made to a graph-node
instance and the min block is not yet synced, graph-node
will return an error. If graph-node
has synced min block, it will run the response for the latest block. If the request is made to an Edge & Node Gateway, the Gateway will filter out any Indexers that have not yet synced min block and make the request for the latest block the Indexer has synced.
We can use number_gte
to ensure that time never travels backward when polling for data in a loop. Here is an example:
1/// Updates the protocol.paused variable to the latest2/// known value in a loop by fetching it using The Graph.3async function updateProtocolPaused() {4 // It's ok to start with minBlock at 0. The query will be served5 // using the latest block available. Setting minBlock to 0 is the6 // same as leaving out that argument.7 let minBlock = 089 for (;;) {10 // Schedule a promise that will be ready once11 // the next Ethereum block will likely be available.12 const nextBlock = new Promise((f) => {13 setTimeout(f, 14000)14 })1516 const query = `17 query GetProtocol($minBlock: Int!) {18 protocol(block: { number_gte: $minBlock } id: "0") {19 paused20 }21 _meta {22 block {23 number24 }25 }26 }`2728 const variables = { minBlock }29 const response = await graphql(query, variables)30 minBlock = response._meta.block.number3132 // TODO: Do something with the response data here instead of logging it.33 console.log(response.protocol.paused)3435 // Sleep to wait for the next block36 await nextBlock37 }38}
Získání sady souvisejících položek
Dalším případem použití je načítání velkého souboru nebo obecněji načítání souvisejících položek ve více požadavcích. Na rozdíl od případu dotazování (kde byla požadovaná konzistence pro posun v čase) je požadovaná konzistence pro jeden bod v čase.
Here we will use the block: { hash: $blockHash }
argument to pin all of our results to the same block.
1/// Gets a list of domain names from a single block using pagination2async function getDomainNames() {3 // Set a cap on the maximum number of items to pull.4 let pages = 55 const perPage = 100067 // The first query will get the first page of results and also get the block8 // hash so that the remainder of the queries are consistent with the first.9 const listDomainsQuery = `10 query ListDomains($perPage: Int!) {11 domains(first: $perPage) {12 name13 id14 }15 _meta {16 block {17 hash18 }19 }20 }`2122 let data = await graphql(listDomainsQuery, { perPage })23 let result = data.domains.map((d) => d.name)24 let blockHash = data._meta.block.hash2526 let query27 // Continue fetching additional pages until either we run into the limit of28 // 5 pages total (specified above) or we know we have reached the last page29 // because the page has fewer entities than a full page.30 while (data.domains.length == perPage && --pages) {31 let lastID = data.domains[data.domains.length - 1].id32 query = `33 query ListDomains($perPage: Int!, $lastID: ID!, $blockHash: Bytes!) {34 domains(first: $perPage, where: { id_gt: $lastID }, block: { hash: $blockHash }) {35 name36 id37 }38 }`3940 data = await graphql(query, { perPage, lastID, blockHash })4142 // Accumulate domain names into the result43 for (domain of data.domains) {44 result.push(domain.name)45 }46 }47 return result48}
Všimněte si, že v případě reorganizace bude muset klient zopakovat první požadavek na aktualizaci hashe bloku na blok, který není strýčkem.