1: <?php
2: 3: 4: 5: 6: 7: 8: 9: 10:
11:
12: namespace ManiaLive\Threading;
13:
14: use ManiaLive\Database\Connection;
15:
16: 17: 18: 19:
20: class Thread extends \ManiaLib\Utils\Singleton
21: {
22: private $threadId;
23: private $parentId;
24: private $database;
25: private $logger;
26: private $taskBuffer = array();
27:
28: protected function __construct()
29: {
30: $options = getopt(null, array('threadId:', 'parentId:'));
31: $this->threadId = (int) $options['threadId'];
32: $this->parentId = (int) $options['parentId'];
33: $this->initDatabase();
34: $this->logger = \ManiaLive\Utilities\Logger::getLog('threading');
35:
36: $this->logger->write('Thread started successfully!', array('Process #'.$this->parentId.'.'.$this->threadId));
37: if($this->database->isConnected())
38: $this->logger->write('DB is connected, waiting for jobs ...', array('Process #'.$this->parentId.'.'.$this->threadId));
39: }
40:
41: private function initDatabase()
42: {
43: $options = getopt(null, array('dbHost::', 'dbPort::', 'dbUsername::', 'dbPassword::', 'dbDatabase::'));
44:
45: $dbConfig = \ManiaLive\Database\Config::getInstance();
46: foreach($options as $key => $value)
47: $dbConfig->{lcfirst(substr($key, 2))} = $value;
48:
49: $this->database = Connection::getConnection(
50: $dbConfig->host,
51: $dbConfig->username,
52: $dbConfig->password,
53: $dbConfig->database,
54: 'MySQL',
55: $dbConfig->port
56: );
57:
58: $configs = array(
59: 'config' => \ManiaLive\Config\Config::getInstance(),
60: 'wsapi' => \ManiaLive\Features\WebServices\Config::getInstance(),
61: 'manialive' => \ManiaLive\Application\Config::getInstance(),
62: 'server' => \ManiaLive\DedicatedApi\Config::getInstance(),
63: 'threading' => Config::getInstance()
64: );
65: foreach($configs as $dbName => $instance)
66: {
67: $data = $this->getData($dbName, array());
68: foreach((array) $data as $key => $value)
69: $instance->$key = $value;
70: }
71: }
72:
73: function setData($key, $value)
74: {
75: $this->database->execute(
76: 'INSERT INTO ThreadingData(parentId, name, value) VALUES (%d, %s, %s)',
77: getmypid(),
78: $this->database->quote($key),
79: $this->database->quote(base64_encode(serialize($value)))
80: );
81:
82: return $this->database->affectedRows() > 0;
83: }
84:
85: function getData($key, $default = null)
86: {
87: $result = $this->database->execute(
88: 'SELECT value FROM ThreadingData WHERE name=%s AND parentId=%d',
89: $this->database->quote($key),
90: getmypid()
91: );
92:
93: return $result->recordAvailable() ? unserialize(base64_decode($result->fetchSingleValue())) : $default;
94: }
95:
96: function run()
97: {
98: while(true)
99: {
100: $task = $this->nextTask();
101:
102: if($task)
103: {
104: $this->logger->write('Executing Command #'.$task['commandId'].'...', array('Process #'.$this->parentId.'.'.$this->threadId));
105:
106: $startTime = microtime(true);
107: $result = $task['task']->run();
108: $timeTaken = (microtime(true) - $startTime) * 1000;
109:
110: $this->database->execute(
111: 'UPDATE ThreadingCommands SET result=%s, timeTaken=%f WHERE commandId=%d AND parentId=%d',
112: $this->database->quote(base64_encode(serialize($result))),
113: $timeTaken,
114: $task['commandId'],
115: $this->parentId
116: );
117:
118: $this->logger->write('Command #'.$task['commandId'].' done in '.round($timeTaken, 3).' ms!', array('Process #'.$this->parentId.'.'.$this->threadId));
119: }
120: else
121: sleep(1);
122:
123: if(!$this->isParentRunning())
124: exit();
125: }
126: }
127:
128: private function nextTask()
129: {
130: if(empty($this->taskBuffer))
131: {
132: $tasks = $this->database->execute(
133: 'SELECT commandId, task FROM ThreadingCommands WHERE threadId=%d AND result IS NULL AND parentId=%d ORDER BY commandId ASC',
134: $this->threadId,
135: $this->parentId
136: );
137:
138: while(($task = $tasks->fetchAssoc()))
139: {
140: $task['task'] = unserialize(base64_decode($task['task']));
141: $this->taskBuffer[] = $task;
142: }
143: }
144:
145: return array_shift($this->taskBuffer);
146: }
147:
148: private function isParentRunning()
149: {
150:
151: if(stripos(PHP_OS, 'WIN') !== 0)
152: {
153: exec('ps '.$this->parentId, $output);
154: return count($output) >= 2 && strpos($output[1], 'bootstrapper.php') !== false;
155: }
156:
157: else
158: {
159: exec('tasklist /FI "PID eq '.$this->parentId.'"', $output);
160: return count($output) >= 4 && strpos($output[3], 'php.exe') !== false;
161: }
162: }
163:
164: }