7 分钟
分布式系统
Graph 是分布式系统实现的协议。
连接可能失败。 请求无序到达。 具有不同步时钟和状态的很多计算机在处理相关请求。 服务器重新启动。 重组发生在请求之间。以上这些问题是所有分布式系统所固有的,但在全球范围内运行的系统中会更加严重。
考虑这个例子,如果客户端在重组期间轮询索引人以获取最新数据,可能会发生什么。
- 索引人获取区块 8
- 区块 8 的数据传送给客户端
- 索引人获取区块 9
- 索引人获取区块 10A
- 区块 10A 的数据传送给客户端
- 索引人检测到区块重组到 10B 并回转到 10A
- 区块 9 的数据传送给客户端
- 索引人获取区块 10B
- 索引人获取区块 11
- 区块 11 的数据传送给客户端
从索引人的角度来看,事情进展得完全合乎逻辑。 时间不断前行,尽管我们确实必须回滚一个相邻区块,并在共识下处理该区块。 在此过程中,索引人使用它当时知道的最新状态为请求提供服务。
然而,从客户的角度来看,事情却显得很混乱。 客户端观察到响应依次是区块 8、10、9 和 11。 我们称之为“区块摆动”问题。 当客户端遇到区块摆动时,数据可能会随着时间的推移而自相矛盾。 当考虑到索引人不会同时获取最新区块,并且您的请求可能会被路由到多个索引人的时候,情况会变得更糟。
客户端和服务器有责任协同工作,以便向用户提供一致的数据。 根据所需的一致性目标的不同,我们必须使用不同的方法,因为没有一个适合所有问题的完美解决方案。
通过分布式系统的影响进行推理是困难的,但修复可能不是! 我们已经建立了一些 API 和模式来帮助您解决一些常见的用例。 以下示例说明了这些模式,但仍然省略了生产代码所需的细节(如错误处理和消除),以免混淆主要思想。
轮询更新的数据
The Graph 提供 block: { number_gte: $minBlock }
API,确保响应是针对等于或高于$minBlock
的单个区块。 如果向graph-node
实例发出请求并且最小区块尚未同步,则graph-node
将返回错误。 如果 graph-node
已同步最小区块,它将返回最新区块的响应。 如果请求是发给 Edge &Node 网关的,网关将过滤掉任何尚未同步最小区块的索引人,并请求索引人已同步的最新区块。
我们可以使用number_gte
, 从而确保在循环中轮询数据时,时间不会倒流。 这是一个例子:
1/// Updates the protocol.paused variable to the latest2/// known value in a loop by fetching it using The Graph.3async function updateProtocolPaused() {4 // It's ok to start with minBlock at 0. The query will be served5 // using the latest block available. Setting minBlock to 0 is the6 // same as leaving out that argument.7 let minBlock = 089 for (;;) {10 // Schedule a promise that will be ready once11 // the next Ethereum block will likely be available.12 const nextBlock = new Promise((f) => {13 setTimeout(f, 14000)14 })1516 const query = `17 query GetProtocol($minBlock: Int!) {18 protocol(block: { number_gte: $minBlock } id: "0") {19 paused20 }21 _meta {22 block {23 number24 }25 }26 }`2728 const variables = { minBlock }29 const response = await graphql(query, variables)30 minBlock = response._meta.block.number3132 // TODO: Do something with the response data here instead of logging it.33 console.log(response.protocol.paused)3435 // Sleep to wait for the next block36 await nextBlock37 }38}
获取一组相关项目
另一个用例是检索一个更大的集合,或者更一般地说,跨多个请求检索相关项目。 与轮询案例(所需的一致性是及时向前进行)不同,此用例所需的一致性是针对单个时间点的。
在这里,我们将使用 block: { hash: $blockHash }
参数将我们所有的结果锚定到同一个区块。
1/// Gets a list of domain names from a single block using pagination2async function getDomainNames() {3 // Set a cap on the maximum number of items to pull.4 let pages = 55 const perPage = 100067 // The first query will get the first page of results and also get the block8 // hash so that the remainder of the queries are consistent with the first.9 const listDomainsQuery = `10 query ListDomains($perPage: Int!) {11 domains(first: $perPage) {12 name13 id14 }15 _meta {16 block {17 hash18 }19 }20 }`2122 let data = await graphql(listDomainsQuery, { perPage })23 let result = data.domains.map((d) => d.name)24 let blockHash = data._meta.block.hash2526 let query27 // Continue fetching additional pages until either we run into the limit of28 // 5 pages total (specified above) or we know we have reached the last page29 // because the page has fewer entities than a full page.30 while (data.domains.length == perPage && --pages) {31 let lastID = data.domains[data.domains.length - 1].id32 query = `33 query ListDomains($perPage: Int!, $lastID: ID!, $blockHash: Bytes!) {34 domains(first: $perPage, where: { id_gt: $lastID }, block: { hash: $blockHash }) {35 name36 id37 }38 }`3940 data = await graphql(query, { perPage, lastID, blockHash })4142 // Accumulate domain names into the result43 for (domain of data.domains) {44 result.push(domain.name)45 }46 }47 return result48}
请注意,在重组的情况下,客户端将需要从第一个请求重试,以将区块hash更新为非相邻区块。