1: <?php
2: /**
3: * Copyright 2012-2014 Rackspace US, Inc.
4: *
5: * Licensed under the Apache License, Version 2.0 (the "License");
6: * you may not use this file except in compliance with the License.
7: * You may obtain a copy of the License at
8: *
9: * http://www.apache.org/licenses/LICENSE-2.0
10: *
11: * Unless required by applicable law or agreed to in writing, software
12: * distributed under the License is distributed on an "AS IS" BASIS,
13: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14: * See the License for the specific language governing permissions and
15: * limitations under the License.
16: */
17:
18: namespace OpenCloud\ObjectStore\Upload;
19:
20: use Guzzle\Http\EntityBody;
21: use Guzzle\Http\ReadLimitEntityBody;
22:
23: /**
24: * A transfer type which executes in a concurrent fashion, i.e. with multiple workers uploading at once. Each worker is
25: * charged with uploading a particular chunk of data. The entity body is fragmented into n pieces - calculated by
26: * dividing the total size by the individual part size.
27: *
28: * @codeCoverageIgnore
29: */
30: class ConcurrentTransfer extends AbstractTransfer
31: {
32: public function transfer()
33: {
34: $totalParts = (int) ceil($this->entityBody->getContentLength() / $this->partSize);
35: $workers = min($totalParts, $this->options['concurrency']);
36: $parts = $this->collectParts($workers);
37:
38: while ($this->transferState->count() < $totalParts) {
39: $completedParts = $this->transferState->count();
40: $requests = array();
41:
42: // Iterate over number of workers until total completed parts is what we need it to be
43: for ($i = 0; $i < $workers && ($completedParts + $i) < $totalParts; $i++) {
44: // Offset is the current pointer multiplied by the standard chunk length
45: $offset = ($completedParts + $i) * $this->partSize;
46: $parts[$i]->setOffset($offset);
47:
48: // If this segment is empty (i.e. buffering a half-full chunk), break the iteration
49: if ($parts[$i]->getContentLength() == 0) {
50: break;
51: }
52:
53: // Add this to the request queue for later processing
54: $requests[] = TransferPart::createRequest(
55: $parts[$i],
56: $this->transferState->count() + $i + 1,
57: $this->client,
58: $this->options
59: );
60: }
61:
62: // Iterate over our queued requests and process them
63: foreach ($this->client->send($requests) as $response) {
64: // Add this part to the TransferState
65: $this->transferState->addPart(TransferPart::fromResponse($response));
66: }
67: }
68: }
69:
70: /**
71: * Partitions the entity body into an array - each worker is represented by a key, and the value is a
72: * ReadLimitEntityBody object, whose read limit is fixed based on this object's partSize value. This will always
73: * ensure the chunks are sent correctly.
74: *
75: * @param int The total number of workers
76: * @return array The worker array
77: */
78: private function collectParts($workers)
79: {
80: $uri = $this->entityBody->getUri();
81:
82: $array = array(new ReadLimitEntityBody($this->entityBody, $this->partSize));
83:
84: for ($i = 1; $i < $workers; $i++) {
85: // Need to create a fresh EntityBody, otherwise you'll get weird 408 responses
86: $array[] = new ReadLimitEntityBody(new EntityBody(fopen($uri, 'r')), $this->partSize);
87: }
88:
89: return $array;
90: }
91: }
92: