分散システム
Reading time: 8 min
グラフは、分散システムとして実装されたプロトコルです。
接続に失敗する。リクエストの順番が狂う。時計や状態が同期していない別のコンピュータが、関連するリクエストを処理する。サーバーが再起動する。リクエストとリクエストの間に「再衝突」が起こる。これらの問題は、すべての分散システムに固有のものですが、グローバルな規模で運用されているシステムでは、さらに悪化します。
re-org 中にクライアントがインデクサに最新のデータを要求した場合の例を考えてみましょう。
- インデクサーがブロック 8 を取り込む
- クライアントにブロック 8 のリクエストを送信
- インデクサーがブロック 9 を取り込む
- インデクサーがブロック 10A を取り込む
- ブロック 10A のクライアントにリクエストが提供される
- インデクサーは 10B への reorg を検出し、10A をロールバックする
- ブロック 9 に対してクライアントへのリクエストを提供
- インデクサーがブロック 10B を取り込む
- インデクサーがブロック 11 を取り込む
- ブロック 11 のクライアントにリクエストを提供
インデクサーの視点では、物事が論理的に進んでいます。時間は進んでいますが、アンクルブロックを巻き戻して、その上にコンセンサス中のブロックを再生する必要がありました。その間、インデクサーは、その時点で知っている最新の状態を使ってリクエストに対応します。
クライアントから見ると、事態は混沌としているように見えます。クライアントは、ブロック 8、10、9、11 の順に応答があったことを確認します。これを「block wobble」問題と呼んでいます。block wobble が発生すると、データが時間とともに矛盾していくように見えることがあります。インデクサがすべての最新ブロックを同時に取り込むわけではないことを考えると、状況は悪化し、クライアントのリクエストは複数のインデクサーにルーティングされる可能性があります。
一貫性のあるデータをユーザーに提供するには、クライアントとサーバーが協力して取り組む必要があります。すべての問題に対して正しいプログラムは存在しないため、必要な一貫性に応じて異なるアプローチを使用する必要があります。
分散システムの意味を理解するのは難しいですが、解決するのは難しいことではありません。私たちは、一般的な使用例をナビゲートするための API とパターンを確立しました。以下の例では、これらのパターンを説明していますが、本番コードで必要とされる詳細(エラー処理やキャンセルなど)は省略していますので、主旨を理解していただくことはできません。
Graph は次のようなブロック: { number_gte: $minBlock }
API を提供しています。これは、レスポンスが$minBlock
と同じかそれ以上の単一ブロックに対するものであることを保証するものです。リクエストが graph-node
のインスタンスに対して行われ、min block がまだ同期されていない場合、graph-node
はエラーを返します。graph-node
が最小ブロックを同期している場合は、最新のブロックに対するレスポンスを実行します。Edge & Node Gateway にリクエストが行われた場合、Edge & Node Gateway は、min block をまだ同期していないインデクサーをフィルタリングし、インデクサーが同期した最新ブロックのリクエストを行います。
ループの中でデータをポーリングする際に、時間が後ろに移動しないようにnumber_gte
を使用することができます。以下にその例を示します:
/// Updates the protocol.paused variable to the latest/// known value in a loop by fetching it using The Graph.async function updateProtocolPaused() {// It's ok to start with minBlock at 0. The query will be served// using the latest block available. Setting minBlock to 0 is the// same as leaving out that argument.let minBlock = 0for (;;) {// Schedule a promise that will be ready once// the next Ethereum block will likely be available.const nextBlock = new Promise((f) => {setTimeout(f, 14000)})const query = `query GetProtocol($minBlock: Int!) {protocol(block: { number_gte: $minBlock } id: "0") {paused}_meta {block {number}}}`const variables = { minBlock }const response = await graphql(query, variables)minBlock = response._meta.block.number// TODO: Do something with the response data here instead of logging it.console.log(response.protocol.paused)// Sleep to wait for the next blockawait nextBlock}}
もうひとつのケースは、大規模なセットを取得することで、一般的には、複数のリクエストにわたって関連するアイテムを取得するケースです。ポーリングの場合(時間を進めて一貫性を保つ必要があった)とは異なり、必要な一貫性はある時点でのものです。
ここでは、 block: { hash: $blockHash }
引数を使用して、すべての結果を同じブロックに固定します。
/// Gets a list of domain names from a single block using paginationasync function getDomainNames() {// Set a cap on the maximum number of items to pull.let pages = 5const perPage = 1000// The first query will get the first page of results and also get the block// hash so that the remainder of the queries are consistent with the first.const listDomainsQuery = `query ListDomains($perPage: Int!) {domains(first: $perPage) {nameid}_meta {block {hash}}}`let data = await graphql(listDomainsQuery, { perPage })let result = data.domains.map((d) => d.name)let blockHash = data._meta.block.hashlet query// Continue fetching additional pages until either we run into the limit of// 5 pages total (specified above) or we know we have reached the last page// because the page has fewer entities than a full page.while (data.domains.length == perPage && --pages) {let lastID = data.domains[data.domains.length - 1].idquery = `query ListDomains($perPage: Int!, $lastID: ID!, $blockHash: Bytes!) {domains(first: $perPage, where: { id_gt: $lastID }, block: { hash: $blockHash }) {nameid}}`data = await graphql(query, { perPage, lastID, blockHash })// Accumulate domain names into the resultfor (domain of data.domains) {result.push(domain.name)}}return result}
なお、re-org が発生した場合、クライアントは最初のリクエストから再試行して、ブロックハッシュをアンクルではないブロックに更新する必要があります。