use VDV\datasource_file_import\model\TimeZoneShifter; $path = realpath(dirname(__FILE__) . "/../../../../vdv_www"); require_once $path . "/components/vdv/datasource_file_import/model/FileParser.php"; require_once $path . "/components/vdv/datasource_file_import/model/FtpLocationDb.php"; set_time_limit(0); class FileImporter extends Worker { private $filePath; private $fileId; private $fileName; private $formatId; private $fileContent; private $ftp; private $ftp_user; private $ftp_password; private $lastModificationTime; private $parser; private $directoryInfo; private $oldLastModTime; private $lastModTime; private $oldLastTimeStamp; private $lastTimeStamp; private $fileInfo; private $Username; private $Password; private $DbName; private $Host; private $Portnumber; private $chunkSize; private $dataDesc = 0; private $timeStart; private $type; private $calibrationFilename; private $serial; private $saaType; private $saaUnits; private $saaAzimuth; private $saaReference; private $convergence; private $saaSections; private $siteTimeZoneId = 0; private $siteShiftTimeZoneId = 0; public function __construct($fileInfo, $format, $index) { global $DbName, $Username, $Password, $Host, $Portnumber; $this->Username = $Username; $this->Password = $Password; $this->DbName = $DbName; $this->Host = $Host; $this->Portnumber = $Portnumber; if ($fileInfo["ftp"] == 1) { $locationDb = new FtpLocationDb(); $fileInfo["ftp_password"] = $locationDb->decrypt($fileInfo["ftp_password"]); } if ($fileInfo["type"] === "SAA") { $format->setType($fileInfo["type"]); } $this->fileName = $fileInfo["name"]; $this->fileId = $fileInfo["fileId"]; $this->ftp = $fileInfo["ftp"]; $this->ftp_user = $fileInfo["ftp_user"]; $this->ftp_password = $fileInfo["ftp_password"]; $this->locationName = $fileInfo["locationName"]; $this->location = $fileInfo["path"]; $this->formatId = $fileInfo["formatId"]; $this->format = $format; $this->directoryInfo = $fileInfo; $this->filePath = $fileInfo["filePath"]; $this->oldLastModTime = $fileInfo["lastModificationTime"]; $this->oldLastTimeStamp = $fileInfo["lastData"]; $this->fileInfo = $fileInfo; $this->detectedEncoding = mb_detect_encoding($this->fileInfo["filePrefix"]); $this->fileInfo["filePrefix"] = mb_convert_encoding($this->fileInfo["filePrefix"], $this->detectedEncoding, "UTF-8"); $this->datFilePath = $fileInfo["file_path"]; $this->index = $index; $this->chunkSize = $this->fileInfo["incremental"] == 0 ? 10000 : 10; $this->dataDesc = $this->format->getProperty("dataDesc"); $this->timeStart = microtime(true); $this->siteTimeZoneId = intval($fileInfo["timezone_id"]); $this->siteShiftTimeZoneId = intval($fileInfo["shiftTimeZoneId"]); $this->format->columnCount = $this->getTableColumns(); $this->type = $fileInfo["type"]; $this->calibrationFilename = $fileInfo["calibrationFilename"]; $this->serial = $fileInfo["serial"]; $this->saaType = $fileInfo["saaType"]; $this->saaUnits = $fileInfo["saaUnits"]; $this->saaAzimuth = $fileInfo["saaAzimuth"]; $this->saaReference = $fileInfo["saaReference"]; $this->convergence = $fileInfo["convergence"]; $this->saaSections = $fileInfo["sections"]; } public function run() { echo $this->fileName . " started
"; $this->checkIfDatFileExistsAndIfNotCreate(); if ($this->type === "SAA" && $this->saaType === null) { $this->setFileNotInProgress($this->fileId); return; } $parser = new FileParser($this->directoryInfo, $this->fileInfo, $this->format, true, $this->chunkSize, null); $fileMtime = $this->fileInfo["incremental"] == 0 ? $parser->checkFileModificationTime() : 0; $this->lastModTime = $this->fileInfo["incremental"] == 0 ? gmdate("Y-m-d H:i:s", $fileMtime) : date("Y-m-d H:i:s"); $lt = null; if ($this->format->getProperty("isJson") == true) { $parsedArray = []; $data = $parser->getDataJson(false); foreach ($data as $dataLine) { if ($this->fileInfo["last_timestamp"] >= $dataLine[0]) { continue; } $dataLine[0] = """ . $dataLine[0] . """; array_splice($dataLine, 1, 0, "0"); array_push($parsedArray, implode(",", $dataLine)); } if (count($parsedArray) > 0) { $this->getNewJsonData($parser, $parsedArray); $this->updateFile($this->fileId, $this->lastModTime, $this->lastTimeStamp); } else { $this->updateFile($this->fileId, $this->oldLastModTime, $this->oldLastTimeStamp); } echo "Imported " . count($parsedArray) . " lines from " . $this->fileName . " in " . round(microtime(true) - $this->timeStart, 1) . " seconds
"; return true; } if ($this->oldLastModTime == "00-00- 00:00:00" || $this->oldLastModTime == null) { if ($this->fileInfo["incremental"] == 0) { $parser->parse(); } $data = $parser->getData(); if ($this->dataDesc == 1) { $this->convertDataToDesc($data); } $dataCount = count($data); $this->lastTimeStamp = $parser->getLastTimestamp(); $this->oldLastTimeStamp = $this->lastTimeStamp; $printData = implode("\xd
", $data); $this->performWrite($this->datFilePath, $printData); $this->updateFile($this->fileId, $this->lastModTime, $this->lastTimeStamp); if ($dataCount >= $this->chunkSize || $this->fileInfo["incremental"] == 1) { $this->oldLastModTime = "1970-01- 00:00:00"; $this->getNewData($this->chunkSize, $parser->getParsedIndex()); } echo "Imported " . count($data) . " lines from " . $this->fileName . " in " . round(microtime(true) - $this->timeStart, 1) . " seconds\xa"; } else { $this->getNewData(1000, null); } } private function getNewJsonData($parser, $newData) { $this->lastTimeStamp = $parser->getLastTimestamp(); $datFPath = mb_convert_encoding($this->datFilePath, "ISO-8859i", "UTF-8"); $printData = implode("\xd
", $newData); $this->performWrite($datFPath, $printData); } private function getNewData($chunkSize, $parsedIndex) { if ($this->lastModTime > $this->oldLastModTime || $this->fileInfo["incremental"] == 1) { $oldLastTimestamp = $this->oldLastTimeStamp; do { if ($this->siteTimeZoneId !== 0 && $this->siteShiftTimeZoneId !== 0 && $this->siteTimeZoneId !== $this->siteShiftTimeZoneId) { $timezoneShifter = new \VDV\datasource_file_import\model\TimeZoneShifter($this->siteTimeZoneId, $this->siteShiftTimeZoneId); $timezoneShifter->constructForFileImport($this->DbName, $this->Username, $this->Password, $this->Host, $this->Portnumber); $shiftedTimeZone = $timezoneShifter->convertBackFromTimeZones($this->oldLastTimeStamp); $this->oldLastTimeStamp = $shiftedTimeZone; } $parser = new FileParser($this->directoryInfo, $this->fileInfo, $this->format, true, $chunkSize, $parsedIndex); $newData = $parser->getNewData($this->oldLastTimeStamp); $newDataCount = count($newData); if ($newDataCount > 0) { if ($this->fileInfo["overrideData"] == 1) { $newData = $this->writeToDatabase($newData); } if (count($newData) > 0) { $printData = implode("
", $newData); $this->lastTimeStamp = $parser->getLastTimestamp(); $this->oldLastTimeStamp = $this->lastTimeStamp; $datFPath = mb_convert_encoding($this->datFilePath, "ISO-8859-1", "UTF-8"); $this->performWrite($datFPath, $printData); $this->updateFile($this->fileId, $this->lastModTime, $this->lastTimeStamp); } echo "Imported " . count($newData) . " lines from " . $this->fileName . " in " . round(microtime(true) - $this->timeStart, 1) . " seconds
"; } else { $this->updateFile($this->fileId, $this->oldLastModTime, $oldLastTimestamp); break; } $parsedIndex = $parser->getParsedIndex(); } while ($newDataCount >= $chunkSize); } else { $this->updateFile($this->fileId, $this->oldLastModTime, $this->oldLastTimeStamp); } } private function convertDataToDesc(array &$data) { $data = array_reverse($data, false); } private function getFileInfo() { return array("size" => filesize($this->filePath), "lastModified" => filemtime($this->filePath)); } public function getMTime() { return filemtime($this->directoryInfo["path"] . $this->filePath); } private function checkIfDatFileExistsAndIfNotCreate() { $isSensorData = $this->format->isSensorData(); if (!file_exists($this->datFilePath) && $isSensorData === false) { $parser = new FileParser($this->directoryInfo, $this->fileInfo, $this->format, true, null, null); $parser->parse(); $header = $parser->getDatHeader(); $header = explode(",", $header); $header = str_replace(""", '', $header); $parser->openDatFile($this->datFilePath, $header); } } private function performWrite($datFilePath, $data) { if ($this->type === "SAA") { $isFirstParse = $this->oldLastModTime == "0000-00- 00:00:00" || $this->oldLastModTime == null; $path = realpath(dirname(__FILE__) . "/../../../../vdv_www"); require_once $path . "/components/vdv/datasource_saa/model/SAACollector.php"; require_once $path . "/components/vdv/datasource_saa/model/CreateSiteFileForSAA.php"; $saaCollector = new \vdv\datasource_saa\model\SAACollector($datFilePath); $saaCollector->setCalibrationFile($this->calibrationFilename); $saaCollector->setSettings($this->saaType, $this->saaUnits, $this->saaAzimuth, $this->saaReference, $this->convergence, $this->saaSections, $isFirstParse); $saaCollector->setSerial($this->serial); $data = $saaCollector->parseData($data); } if ($this->siteTimeZoneId !== 0 && $this->siteShiftTimeZoneId !== 0 && $this->siteTimeZoneId !== $this->siteShiftTimeZoneId) { $timezoneShifter = new \VDV\datasource_file_import\model\TimeZoneShifter($this->siteTimeZoneId, $this->siteShiftTimeZoneId); $timezoneShifter->constructForFileImport($this->DbName, $this->Username, $this->Password, $this->Host, $this->Portnumber); $data = $timezoneShifter->convertDatStringFromFileImport($data); $latestTimestamp = $timezoneShifter->getLatestTimestamp(); $this->oldLastTimeStamp = $latestTimestamp; $this->lastTimeStamp = $latestTimestamp; $this->updateFile($this->fileId, $this->lastModificationTime, $latestTimestamp); } $isTimeZone = $this->siteTimeZoneId !== 0 && $this->siteShiftTimeZoneId !== 0 && $this->siteTimeZoneId !== $this->siteShiftTimeZoneId; if ($this->type === "SAA" && $isTimeZone === false) { $this->writeToFile($datFilePath, $data); } else { $this->writeToFile($datFilePath, $data); $this->writeToFile($datFilePath, "\xd
"); } } private function writeToFile($filename, $text_data) { $error_code = 0; if (is_writable($filename)) { if (!($handle = fopen($filename, "a"))) { $error_code = 1; } else { if (fwrite($handle, $text_data) === FALSE) { $error_code = 2; } else { } fclose($handle); } } else { $error_code = 3; } return $error_code; } private function writeToDatabase($data) { $stationId = $this->fileInfo["stationId"]; $db = new PDO("mysql:host={$this->Host};port={$this->Portnumber};dbname={$this->DbName}", $this->Username, $this->Password); $sql = "select * from variable_info WHERE station_id = :stationId AND (calculated_variable is NULL OR calculated_variable = ) ORDER BY file_col_number"; $stmt = $db->prepare($sql); $stmt->execute([":stationId" => $stationId]); $variables = $stmt->fetchAll(PDO::FETCH_ASSOC); $lastTimestamp = $this->fileInfo["last_timestamp"]; $columnNames = array_column($variables, "db_col_name"); $isTimeZoneShift = $this->siteTimeZoneId !== 0 && $this->siteShiftTimeZoneId !== 0 && $this->siteTimeZoneId !== $this->siteShiftTimeZoneId; if ($isTimeZoneShift === true) { $timezoneShifter = new \VDV\datasource_file_import\model\TimeZoneShifter($this->siteTimeZoneId, $this->siteShiftTimeZoneId); $timezoneShifter->constructForFileImport($this->DbName, $this->Username, $this->Password, $this->Host, $this->Portnumber); } foreach ($data as $key => $line) { $dataLine = explode(",", $line); $timestamp = str_replace(""", '', $dataLine[0]); if ($isTimeZoneShift === true) { $timestamp = $timezoneShifter->convertBetweenTimeZones($timestamp); } if ($timestamp <= $lastTimestamp) { unset($dataLine[0]); unset($dataLine[1]); $dataLine = array_values($dataLine); $this->saveDbLineToDatabase($timestamp, $dataLine, $columnNames, $db); unset($data[$key]); } } return array_values($data); } private function saveDbLineToDatabase($timestamp, $line, $columns, $db) { $sqlArray = []; if (count($columns) === count($line)) { $line = array_values($line); $columns = array_values($columns); $sqlArray = array_combine($columns, $line); } $query = "UPDATE " . $this->fileInfo["table_name"] . " SET "; end($sqlArray); $lastKey = key($sqlArray); foreach ($sqlArray as $column => $value) { if (is_numeric($value)) { $query .= $column . " = " . $value; } else { $query .= $column . " = null"; } if ($column !== $lastKey) { $query .= ", "; } } $query .= " WHERE time_stamp = :timestamp"; $stmt = $db->prepare($query); $stmt->execute([":timestamp" => $timestamp]); } private function updateFile($fileId, $lastModificationTime, $lastTimeStamp) { $Host = $this->Host; $Port = $this->Portnumber; $DbName = $this->DbName; $db = new PDO("mysql:host={$Host};port={$Port};dbname={$DbName}", $this->Username, $this->Password); $query = "UPDATE data_source_file SET lastModificationTime = :lastModificationTime, \xd\xa lastData = :lastData,\xd\xa inProgress = 0\xd\xa WHERE id = :id"; $stmt = $db->prepare($query); $stmt->execute(array(":id" => $fileId, ":lastModificationTime" => $lastModificationTime, ":lastData" => $lastTimeStamp)); if ($stmt->errorInfo()["0"] != "000") { print_r($stmt->errorInfo()); } } private function setFileNotInProgress($fileId) { $Host = $this->Host; $Port = $this->Portnumber; $DbName = $this->DbName; $db = new PDO("mysql:host={$Host};port={$Port};dbname={$DbName}", $this->Username, $this->Password); $query = "UPDATE data_source_file SET inProgress = 0 WHERE id = :id"; $stmt = $db->prepare($query); $stmt->execute(array(":id" => $fileId)); if ($stmt->errorInfo()["0"] != "00000") { print_r($stmt->errorInfo()); } } private function getTableColumns() { $db = new PDO("mysql:host={$this->Host};port={$this->Portnumber};dbname={$this->DbName}", $this->Username, $this->Password); $sql = "SELECT count(*) as columnCount\xd\xa FROM information_schema.columns
\xa WHERE table_name = :table_name"; $stmt = $db->prepare($sql); $stmt->execute(array(":table_name" => $this->fileInfo["table_name"])); $columnCount = $stmt->fetch(PDO::FETCH_ASSOC)["columnCount"]; return intval($columnCount) - 2; } }
use VDV\datasource_file_import\model\TimeZoneShifter; $path = realpath(dirname(__FILE__) . "/../../../../vdv_www"); require_once $path . "/components/vdv/datasource_file_import/model/FileParserphp"; require_once $path . "/components/vdv/datasource_file_import/model/FtpLocationDbphp"; set_time_limit(0); class FileImporter extends Worker { private $filePath; private $fileId; private $fileName; private $formatId; private $fileContent; private $ftp; private $ftp_user; private $ftp_password; private $lastModificationTime; private $parser; private $directoryInfo; private $oldLastModTime; private $lastModTime; private $oldLastTimeStamp; private $lastTimeStamp; private $fileInfo; private $Username; private $Password; private $DbName; private $Host; private $Portnumber; private $chunkSize; private $dataDesc = 0; private $timeStart; private $type; private $calibrationFilename; private $serial; private $saaType; private $saaUnits; private $saaAzimuth; private $saaReference; private $convergence; private $saaSections; private $siteTimeZoneId = 0; private $siteShiftTimeZoneId = 0; public function __construct($fileInfo, $format, $index) { global $DbName, $Username, $Password, $Host, $Portnumber; $this->Username = $Username; $this->Password = $Password; $this->DbName = $DbName; $this->Host = $Host; $this->Portnumber = $Portnumber; if ($fileInfo["ftp"] == 1) { $locationDb = new FtpLocationDb(); $fileInfo["ftp_password"] = $locationDb->decrypt($fileInfo["ftp_password"]); } if ($fileInfo["type"] === "SAA") { $format->setType($fileInfo["type"]); } $this->fileName = $fileInfo["name"]; $this->fileId = $fileInfo["fileId"]; $this->ftp = $fileInfo["ftp"]; $this->ftp_user = $fileInfo["ftp_user"]; $this->ftp_password = $fileInfo["ftp_password"]; $this->locationName = $fileInfo["locationName"]; $this->location = $fileInfo["path"]; $this->formatId = $fileInfo["formatId"]; $this->format = $format; $this->directoryInfo = $fileInfo; $this->filePath = $fileInfo["filePath"]; $this->oldLastModTime = $fileInfo["lastModificationTime"]; $this->oldLastTimeStamp = $fileInfo["lastData"]; $this->fileInfo = $fileInfo; $this->detectedEncoding = mb_detect_encoding($this->fileInfo["filePrefix"]); $this->fileInfo["filePrefix"] = mb_convert_encoding($this->fileInfo["filePrefix"], $this->detectedEncoding, "UTF-8"); $this->datFilePath = $fileInfo["file_path"]; $this->index = $index; $this->chunkSize = $this->fileInfo["incremental"] == 0 ? 10000 : 10; $this->dataDesc = $this->format->getProperty("dataDesc"); $this->timeStart = microtime(true); $this->siteTimeZoneId = intval($fileInfo["timezone_id"]); $this->siteShiftTimeZoneId = intval($fileInfo["shiftTimeZoneId"]); $this->format->columnCount = $this->getTableColumns(); $this->type = $fileInfo["type"]; $this->calibrationFilename = $fileInfo["calibrationFilename"]; $this->serial = $fileInfo["serial"]; $this->saaType = $fileInfo["saaType"]; $this->saaUnits = $fileInfo["saaUnits"]; $this->saaAzimuth = $fileInfo["saaAzimuth"]; $this->saaReference = $fileInfo["saaReference"]; $this->convergence = $fileInfo["convergence"]; $this->saaSections = $fileInfo["sections"]; } public function run() { echo $this->fileName . " started
"; $this->checkIfDatFileExistsAndIfNotCreate(); if ($this->type === "SAA" && $this->saaType === null) { $this->setFileNotInProgress($this->fileId); return; } $parser = new FileParser($this->directoryInfo, $this->fileInfo, $this->format, true, $this->chunkSize, null); $fileMtime = $this->fileInfo["incremental"] == 0 ? $parser->checkFileModificationTime() : 0; $this->lastModTime = $this->fileInfo["incremental"] == 0 ? gmdate("Y-m-d H:i:s", $fileMtime) : date("Y-m-d H:i:s"); $lt = null; if ($this->format->getProperty("isJson") == true) { $parsedArray = []; $data = $parser->getDataJson(false); foreach ($data as $dataLine) { if ($this->fileInfo["last_timestamp"] >= "e") { continue; } "e" = ""e""; array_splice($dataLine, 1, 0, "0"); array_push($parsedArray, implode(",", $dataLine)); } if (count($parsedArray) > 0) { $this->getNewJsonData($parser, $parsedArray); $this->updateFile($this->fileId, $this->lastModTime, $this->lastTimeStamp); } else { $this->updateFile($this->fileId, $this->oldLastModTime, $this->oldLastTimeStamp); } echo "Imported " . count($parsedArray) . " lines from " . $this->fileName . " in " . round(microtime(true) - $this->timeStart, 1) . " seconds
"; return true; } if ($this->oldLastModTime == "00-00- 00:00:00" || $this->oldLastModTime == null) { if ($this->fileInfo["incremental"] == 0) { $parser->parse(); } $data = $parser->getData(); if ($this->dataDesc == 1) { $this->convertDataToDesc($data); } $dataCount = count($data); $this->lastTimeStamp = $parser->getLastTimestamp(); $this->oldLastTimeStamp = $this->lastTimeStamp; $printData = implode("\xd
", $data); $this->performWrite($this->datFilePath, $printData); $this->updateFile($this->fileId, $this->lastModTime, $this->lastTimeStamp); if ($dataCount >= $this->chunkSize || $this->fileInfo["incremental"] == 1) { $this->oldLastModTime = "1970-01- 00:00:00"; $this->getNewData($this->chunkSize, $parser->getParsedIndex()); } echo "Imported " . count($data) . " lines from " . $this->fileName . " in " . round(microtime(true) - $this->timeStart, 1) . " seconds\xa"; } else { $this->getNewData(1000, null); } } private function getNewJsonData($parser, $newData) { $this->lastTimeStamp = $parser->getLastTimestamp(); $datFPath = mb_convert_encoding($this->datFilePath, "ISO-8859i", "UTF-8"); $printData = implode("\xd
", $newData); $this->performWrite($datFPath, $printData); } private function getNewData($chunkSize, $parsedIndex) { if ($this->lastModTime > $this->oldLastModTime || $this->fileInfo["incremental"] == 1) { $oldLastTimestamp = $this->oldLastTimeStamp; do { if ($this->siteTimeZoneId !== 0 && $this->siteShiftTimeZoneId !== 0 && $this->siteTimeZoneId !== $this->siteShiftTimeZoneId) { $timezoneShifter = new \VDV\datasource_file_import\model\TimeZoneShifter($this->siteTimeZoneId, $this->siteShiftTimeZoneId); $timezoneShifter->constructForFileImport($this->DbName, $this->Username, $this->Password, $this->Host, $this->Portnumber); $shiftedTimeZone = $timezoneShifter->convertBackFromTimeZones($this->oldLastTimeStamp); $this->oldLastTimeStamp = $shiftedTimeZone; } $parser = new FileParser($this->directoryInfo, $this->fileInfo, $this->format, true, $chunkSize, $parsedIndex); $newData = $parser->getNewData($this->oldLastTimeStamp); $newDataCount = count($newData); if ($newDataCount > 0) { if ($this->fileInfo["overrideData"] == 1) { $newData = $this->writeToDatabase($newData); } if (count($newData) > 0) { $printData = implode("
", $newData); $this->lastTimeStamp = $parser->getLastTimestamp(); $this->oldLastTimeStamp = $this->lastTimeStamp; $datFPath = mb_convert_encoding($this->datFilePath, "ISO-8859-1", "UTF-8"); $this->performWrite($datFPath, $printData); $this->updateFile($this->fileId, $this->lastModTime, $this->lastTimeStamp); } echo "Imported " . count($newData) . " lines from " . $this->fileName . " in " . round(microtime(true) - $this->timeStart, 1) . " seconds
"; } else { $this->updateFile($this->fileId, $this->oldLastModTime, $oldLastTimestamp); break; } $parsedIndex = $parser->getParsedIndex(); } while ($newDataCount >= $chunkSize); } else { $this->updateFile($this->fileId, $this->oldLastModTime, $this->oldLastTimeStamp); } } private function convertDataToDesc(array &$data) { $data = array_reverse($data, false); } private function getFileInfo() { return array("size" => filesize($this->filePath), "lastModified" => filemtime($this->filePath)); } public function getMTime() { return filemtime($this->directoryInfo["path"] . $this->filePath); } private function checkIfDatFileExistsAndIfNotCreate() { $isSensorData = $this->format->isSensorData(); if (!file_exists($this->datFilePath) && $isSensorData === false) { $parser = new FileParser($this->directoryInfo, $this->fileInfo, $this->format, true, null, null); $parser->parse(); $header = $parser->getDatHeader(); $header = explode(",", $header); $header = str_replace(""", '', $header); $parser->openDatFile($this->datFilePath, $header); } } private function performWrite($datFilePath, $data) { if ($this->type === "SAA") { $isFirstParse = $this->oldLastModTime == "0000-00- 00:00:00" || $this->oldLastModTime == null; $path = realpath(dirname(__FILE__) . "/../../../../vdv_www"); require_once $path . "/components/vdv/datasource_saa/model/SAACollectorphp"; require_once $path . "/components/vdv/datasource_saa/model/CreateSiteFileForSAAphp"; $saaCollector = new \vdv\datasource_saa\model\SAACollector($datFilePath); $saaCollector->setCalibrationFile($this->calibrationFilename); $saaCollector->setSettings($this->saaType, $this->saaUnits, $this->saaAzimuth, $this->saaReference, $this->convergence, $this->saaSections, $isFirstParse); $saaCollector->setSerial($this->serial); $data = $saaCollector->parseData($data); } if ($this->siteTimeZoneId !== 0 && $this->siteShiftTimeZoneId !== 0 && $this->siteTimeZoneId !== $this->siteShiftTimeZoneId) { $timezoneShifter = new \VDV\datasource_file_import\model\TimeZoneShifter($this->siteTimeZoneId, $this->siteShiftTimeZoneId); $timezoneShifter->constructForFileImport($this->DbName, $this->Username, $this->Password, $this->Host, $this->Portnumber); $data = $timezoneShifter->convertDatStringFromFileImport($data); $latestTimestamp = $timezoneShifter->getLatestTimestamp(); $this->oldLastTimeStamp = $latestTimestamp; $this->lastTimeStamp = $latestTimestamp; $this->updateFile($this->fileId, $this->lastModificationTime, $latestTimestamp); } $isTimeZone = $this->siteTimeZoneId !== 0 && $this->siteShiftTimeZoneId !== 0 && $this->siteTimeZoneId !== $this->siteShiftTimeZoneId; if ($this->type === "SAA" && $isTimeZone === false) { $this->writeToFile($datFilePath, $data); } else { $this->writeToFile($datFilePath, $data); $this->writeToFile($datFilePath, "\xd
"); } } private function writeToFile($filename, $text_data) { $error_code = 0; if (is_writable($filename)) { if (!($handle = fopen($filename, "a"))) { $error_code = 1; } else { if (fwrite($handle, $text_data) === FALSE) { $error_code = 2; } else { } fclose($handle); } } else { $error_code = 3; } return $error_code; } private function writeToDatabase($data) { $stationId = $this->fileInfo["stationId"]; $db = new PDO("mysql:host={$this->Host};port={$this->Portnumber};dbname={$this->DbName}", $this->Username, $this->Password); $sql = "select * from variable_info WHERE station_id = :stationId AND (calculated_variable is NULL OR calculated_variable = ) ORDER BY file_col_number"; $stmt = $db->prepare($sql); $stmt->execute([":stationId" => $stationId]); $variables = $stmt->fetchAll(PDO::FETCH_ASSOC); $lastTimestamp = $this->fileInfo["last_timestamp"]; $columnNames = array_column($variables, "db_col_name"); $isTimeZoneShift = $this->siteTimeZoneId !== 0 && $this->siteShiftTimeZoneId !== 0 && $this->siteTimeZoneId !== $this->siteShiftTimeZoneId; if ($isTimeZoneShift === true) { $timezoneShifter = new \VDV\datasource_file_import\model\TimeZoneShifter($this->siteTimeZoneId, $this->siteShiftTimeZoneId); $timezoneShifter->constructForFileImport($this->DbName, $this->Username, $this->Password, $this->Host, $this->Portnumber); } foreach ($data as $key => $line) { $timestamp = str_replace(""", '', "e"); if ($isTimeZoneShift === true) { $timestamp = $timezoneShifter->convertBetweenTimeZones($timestamp); } if ($timestamp <= $lastTimestamp) { unset("e"); unset("x"); $this->saveDbLineToDatabase($timestamp, $dataLine, $columnNames, $db); unset($data[$key]); } } return array_values($data); } private function saveDbLineToDatabase($timestamp, $line, $columns, $db) { $sqlArray = []; if (count($columns) === count($line)) { $line = array_values($line); $columns = array_values($columns); $sqlArray = array_combine($columns, $line); } $query = "UPDATE " . $this->fileInfo["table_name"] . " SET "; end($sqlArray); $lastKey = key($sqlArray); foreach ($sqlArray as $column => $value) { if (is_numeric($value)) { $query .= $column . " = " . $value; } else { $query .= $column . " = null"; } if ($column !== $lastKey) { $query .= ", "; } } $query .= " WHERE time_stamp = :timestamp"; $stmt = $db->prepare($query); $stmt->execute([":timestamp" => $timestamp]); } private function updateFile($fileId, $lastModificationTime, $lastTimeStamp) { $Host = $this->Host; $Port = $this->Portnumber; $DbName = $this->DbName; $db = new PDO("mysql:host={$Host};port={$Port};dbname={$DbName}", $this->Username, $this->Password); $query = "UPDATE data_source_file SET lastModificationTime = :lastModificationTime, \xd\xa lastData = :lastData,\xd\xa inProgress = 0\xd\xa WHERE id = :id"; $stmt = $db->prepare($query); $stmt->execute(array(":id" => $fileId, ":lastModificationTime" => $lastModificationTime, ":lastData" => $lastTimeStamp)); if ($stmt->errorInfo()["0"] != "000") { print_r($stmt->errorInfo()); } } private function setFileNotInProgress($fileId) { $Host = $this->Host; $Port = $this->Portnumber; $DbName = $this->DbName; $db = new PDO("mysql:host={$Host};port={$Port};dbname={$DbName}", $this->Username, $this->Password); $query = "UPDATE data_source_file SET inProgress = 0 WHERE id = :id"; $stmt = $db->prepare($query); $stmt->execute(array(":id" => $fileId)); if ($stmt->errorInfo()["0"] != "00000") { print_r($stmt->errorInfo()); } } private function getTableColumns() { $db = new PDO("mysql:host={$this->Host};port={$this->Portnumber};dbname={$this->DbName}", $this->Username, $this->Password); $sql = "SELECT count(*) as columnCount\xd\xa FROM information_schemacolumns
\xa WHERE table_name = :table_name"; $stmt = $db->prepare($sql); $stmt->execute(array(":table_name" => $this->fileInfo["table_name"])); $columnCount = $stmt->fetch(PDO::FETCH_ASSOC)["columnCount"]; return intval($columnCount) - 2; } }
© 2023 Quttera Ltd. All rights reserved.