Monday, December 1, 2014

Dynamic Where Clause

Special shout out to Michael Johnson for working on this solution with me.

We recently came across a requirement that could be solved in one of two ways:
  1. Create a huge case statement
  2. Use a dynamic where clause
The case statement idea seemed the easiest, but would have been around 100 cases long, and if any changes needed to be made, it would require changing the ETL and re-releasing the code.

Instead we decided to store the where clauses in a spreadsheet, and then use that to drive output rather than the case statement. We could have gone with a table, and then built a form on top of it, but a spreadsheet was the simplest and quickest solution.

Here is an example of what we were dealing with:

[Screenshot: sample rows from the group dimension, with each GROUP_ID's definition held as a where clause in the component column]

Our group dimension was defined by the component column, and the list went on for an additional 94 groups. Because each value definition in this dimension was not based on the same set of columns, we couldn't use the standard lookup_ext function or a join.
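
For illustration, the component column for one group might hold a predicate like this (these column names are made up):

PRODUCT_CODE in ('A1','A2') and REGION = 'EMEA'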

A case statement would also have been huge, and since we anticipated changes to these definitions, we didn't want to re-release the ETL every time one was made.

Getting dynamic SQL into your Data Services query is done using the pushdown_sql function and feeding in whatever SQL you want it to use.

The challenge for us was to make sure that we went through each of the rows in our table, and applied the where clauses one by one.

The first thing we did was copy all the dimension data into a working table, adding an extra column called PROCESSED with all the values defaulted to N. We did this so that we'd have a list to step through, marking off each record as we processed it.
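
If the job is re-run over an existing working table, resetting the flag is a one-liner; a minimal sketch:

sql('Datawarehouse','UPDATE WK_GROUP SET PROCESSED = \'N\'');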

[Screenshot: the WK_GROUP working table, with the extra PROCESSED column defaulted to N]

We then set up a loop to go through each of our rows and process them.

[Screenshot: the job layout, with the Processed_Count script followed by the while loop]

The script Processed_Count initialised the counter variable for our loop.

$G_ProcessedCount = sql('Datawarehouse','select count(*) from WK_GROUP');

The inside of our loop looks like this:

[Screenshot: the contents of the while loop: the Set_Variables script, the DF_WK_GROUP_ID_Load and DF_WK_GROUP_Update data flows, and the Update_CountVariable script]

The condition ($G_ProcessedCount >= 1) is our loop control. Once that variable reaches 0, we have processed all the dynamic where clauses in our list.

The Set_Variables script is where we set up the dynamic where clause, and the value that will be assigned to the rows that filter through the where clause.

$G_GroupID = sql('Datawarehouse','select min(GROUP_ID) as GROUP_ID from WK_GROUP where PROCESSED = \'N\'');

$G_GroupLogic  = sql('Datawarehouse','select a.component from WK_GROUP a where a.GROUP_ID = [$G_GroupID]');

$G_GroupID will hold the key that we want to assign to the rows. We select the min because we want the first row that has not yet been processed. As we step through the rows and update PROCESSED to Y, we move on to the remaining N rows until all of them have been processed and updated to Y.

$G_GroupLogic will hold the where clause you need to filter down the rows.

Now all we need to do is apply that in a query.

In the above picture that is done in the data flow called DF_WK_GROUP_ID_Load.

pushdown_sql('Datawarehouse', '[$G_GroupLogic]') becomes the where clause for our query.

$G_GroupID is mapped to the column that holds the dimension table foreign key.

Now you just need to update the row you have processed in the working table from N to Y. We do that in the DF_WK_GROUP_Update data flow.
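
That update is a one-liner; a sketch of the equivalent script logic:

sql('Datawarehouse','UPDATE WK_GROUP SET PROCESSED = \'Y\' WHERE GROUP_ID = [$G_GroupID]');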

Finally, update the $G_ProcessedCount variable in the Update_CountVariable script:
$G_ProcessedCount = $G_ProcessedCount -1;

We'll keep on going through this process until there are no more rows to process.

Monday, November 24, 2014

Files as data sources - Part 5 - Missing Files

To handle files that don't arrive, I check through the SYS_FILE table to see if there is an entry for that file for the day. If there is no entry, I add a row to the SYS_FILE table to record the fact that no file arrived, and then I send out an email to the person responsible for that file to let them know it did not arrive.

Here is the basic script:


$L_EmailBody  = 'The following files did not arrive today: ';
$L_Counter = 0;

#Check if file $G_ActivityEmfe  has arrived
If (db_type('Datawarehouse') = 'Oracle')
BEGIN
   $L_FileName = replace_substr( $G_ActivityEmfe ,'*','%');
END

IF (0 = sql('Datawarehouse','SELECT COUNT(*) FROM SYS_FILE WHERE FILE_LOCATION = {$G_FileLocation} AND FILE_NAME like {$L_FileName} AND trunc(FILE_PROCESS_DATETIME) = trunc(SYSDATE)'))
begin
$L_EmailBody  = $L_EmailBody || chr(13) || chr(10) || $G_ActivityEmfe;
$L_Counter = $L_Counter + 1;
end


#Check if file $G_ActivityEmms has arrived
If (db_type('Datawarehouse') = 'Oracle')
BEGIN
   $L_FileName = replace_substr( $G_ActivityEmms ,'*','%');
END

IF (0 = sql('Datawarehouse','SELECT COUNT(*) FROM SYS_FILE WHERE FILE_LOCATION = {$G_FileLocation} AND FILE_NAME like {$L_FileName} AND trunc(FILE_PROCESS_DATETIME) = trunc(SYSDATE)'))
begin
$L_EmailBody  = $L_EmailBody || chr(13) || chr(10) || $G_ActivityEmms;
$L_Counter = $L_Counter + 1;
end

#If any files did not arrive, ie counter > 0, then send email
if ($L_Counter > 0)
begin
    print($L_EmailBody);
    smtp_to($G_AbsentReportingAddresses, 'Absent Files Report', $L_EmailBody , 0, 0);
end
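
The script above only builds and sends the email; the insert that records the absence in the SYS_FILE table isn't shown. A minimal sketch, reusing the SYS_FILE columns from Part 2 with a hypothetical ABSENT status:

#Get a new file id, as in the FileNew function
$L_File_ID = sql('Datawarehouse','Select max(FILE_ID) from SYS_FILE') + 1;

#Record that the file did not arrive today
sql('Datawarehouse','INSERT INTO SYS_FILE (FILE_ID, DI_FILENAME, FILE_PROCESS_DATETIME, STATUS, AUDIT_ID) VALUES ([$L_File_ID], {$G_ActivityEmfe}, sysdate, \'ABSENT\', [$G_AuditID])');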

Monday, November 17, 2014

Files as data sources - Part 4 - File Errors

I always place the dataflow that is reading the file within a try...catch block to specifically respond to any file errors that may occur.

[Screenshot: the file-reading data flow placed inside a try...catch block]

The two error types you want to look out for are:

  1. Flat file processing errors (1004)
  2. File access errors (1005)
If an error does occur I run the following in the script:


#Record file failure in SYS_FILE table
FileFailed($G_FileLocation,$G_ActivityEmms);

#Archive file
FileMove($G_ActivityEmms, $G_FileLocation, $G_FileErrorLocation);

#Truncate staging table to prevent error data from getting into datamart
sql( 'Datawarehouse','TRUNCATE TABLE STG_ACTIVITY_EMMS_EM');

The FileFailed function has nearly the same code as the FileComplete function. You could actually combine them into a single function that updates the file status to either COMPLETE or FAILED, or anything else you might need.


If (db_type('Datawarehouse') = 'Oracle')
BEGIN
    $L_FileName = replace_substr( $P_FileName,'*','%');
END

sql('Datawarehouse','UPDATE SYS_FILE SET STATUS = \'FAILED\' WHERE FILE_LOCATION = {$P_Directory} AND FILE_NAME like {$L_FileName} AND AUDIT_ID = [$G_AuditID]');
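
A sketch of that combined version, with the status passed in as a hypothetical $P_Status parameter:

If (db_type('Datawarehouse') = 'Oracle')
BEGIN
    $L_FileName = replace_substr( $P_FileName,'*','%');
END

#$P_Status carries the status to set, e.g. 'COMPLETE' or 'FAILED'
sql('Datawarehouse','UPDATE SYS_FILE SET STATUS = {$P_Status} WHERE FILE_LOCATION = {$P_Directory} AND FILE_NAME like {$L_FileName} AND AUDIT_ID = [$G_AuditID]');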

We've already reviewed how the FileMove function works.

Lastly I clear out the staging table so that no partially loaded data goes through the rest of the load.

It would also be a good idea to send out an email alerting the owner of the file that something was wrong with it, so that they can take remedial action before resending the file for processing.

Monday, November 10, 2014

Files as data sources - Part 3 - Moving files around

So now that the file has been processed, let's look at what we should do next.

I like to have an archive folder on the file system to keep any files that have been processed, so that I can go back to them for auditing and/or dealing with any queries about the data that may come up.

I also update the STATUS field in the SYS_FILE table that references the file to COMPLETE.

Let's start with the update function:

If (db_type('Datawarehouse') = 'Oracle')
BEGIN
    $L_FileName = replace_substr( $P_FileName,'*','%');
END

sql('Datawarehouse','UPDATE SYS_FILE SET STATUS = \'COMPLETE\' WHERE FILE_LOCATION = {$P_Directory} AND FILE_NAME like {$L_FileName} AND AUDIT_ID = [$G_AuditID]');

For this function I pass in the directory and file name, look up the relevant row(s) in the SYS_FILE table, and then update them to COMPLETE.

The first thing I do is check the database type. If I am using Oracle I need to convert any *s to %, as that is the wildcard character in Oracle. You can add additional database types here as required.

I look for the rows in the SYS_FILE table that match the file name. The reason I update more than one row at a time is that when multiple files, such as File1, File2 and File3, are processed through a file object that refers to them as File*, they are collated into one batch for processing. So if the data flow completed successfully, then all of the files have been loaded.

Once I've updated the rows in the SYS_FILE table, I move the files to the archive directory with a little FileMove function:

$L_FileName = word_ext( $P_FileName ,1,'.');

exec('sh',' -c "mv ' || $P_SourceDirectory || $L_FileName  || ' ' || $P_TargetDirectory || '"',8);

In this example my job server is running on a UNIX OS, but you could rewrite this for a Microsoft OS by replacing 'sh' with 'cmd', and using 'move' instead of 'mv'.
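
A sketch of what the Windows version might look like (untested, and assuming the directory values end with a backslash):

exec('cmd', '/c move ' || $P_SourceDirectory || $L_FileName || ' ' || $P_TargetDirectory, 8);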

I use the word_ext function at the top to avoid any issues with file extensions and wildcards. You may not have this issue, so you won't necessarily need the first line of the function.




Monday, November 3, 2014

Files as data sources - Part 2 - Keeping track

In the last post I spoke about how to time your job with the file's arrival. In this post I will talk about how to keep track of your files by keeping some basic metadata, and also how to relate the data in your target destination back to the original file.

Here again is the IsFileAvailable function I am using to wait for the files:

$L_FileFound = wait_for_file($P_FileDirectory || $P_FileName, $P_TimeOut,$P_Interval , -1, $L_FileNames, $L_FileListSize, ',');

if ($L_FileFound = 1)
begin
   $L_Counter = 1;
   WHILE ($L_Counter <= $L_FileListSize)
   begin
      FileNew($P_FileDirectory, word_ext($L_FileNames,$L_Counter,','));
      $L_Counter = $L_Counter + 1;
   end
end

Return $L_FileFound;

Once I have found a file, or files, I then run a routine to step through each of the file names. I use the word_ext function to choose which file name in the list I want to access, and then a counter to step through the list.

For each file name I then call a function I wrote called FileNew:

#Get the time the file was created
$L_FileDateTime = get_file_attribute($P_FullFilePathAndName, 'date_created');

#Get the file type eg .txt .csv .pipe
$L_Type = word_ext($P_FullFilePathAndName,2,'.');

#Get a new file id
$L_File_ID = sql('Datawarehouse','Select max(FILE_ID) from SYS_FILE') + 1;

#Insert a new record into the SYS_FILE table
SQL('Datawarehouse','INSERT INTO SYS_FILE (FILE_ID, DI_FILENAME, FILE_DATETIME, FILE_PROCESS_DATETIME, TYPE, STATUS, AUDIT_ID)
VALUES ([$L_File_ID], {$P_FullFilePathAndName}, to_date({$L_FileDateTime}, \'YYYY/MM/DD hh24:mi:ss\'), sysdate, {$L_Type}, \'STARTED\', [$G_AuditID])');

Return $L_File_ID;

What I am doing here is firstly gathering some metadata about the file, such as the date it was created and the type of file it is, and then inserting that data into a table called SYS_FILE, which is where I keep track of all the files processed. I also assign a file_id by adding 1 to the max file_id currently in the table.

I set the status of the file to STARTED, to signify that we have started processing this file. Later on I will set the file to either COMPLETE or FAILED.

Here is what the data in the SYS_FILE table looks like:

[Screenshot: sample rows from the SYS_FILE table]

You could always expand the number of columns in the SYS_FILE table to store all sorts of additional metadata about the file such as its source, how it was delivered or who is responsible for it.
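
For example, a few such columns could be added with something like this (hypothetical column names, Oracle syntax):

ALTER TABLE SYS_FILE ADD (SOURCE_SYSTEM VARCHAR2(50), DELIVERY_METHOD VARCHAR2(20), OWNER_EMAIL VARCHAR2(100));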

Now that I have created a record in the SYS_FILE table for a file, how do I link that to the records that are coming through from the file?

First of all you need to go into your file object and set the source information on the file as follows:

[Screenshot: the file format's source information settings, with the option to include the file name column switched on]

There are similar options on XML and Excel files too.

What this does is add an extra column at the end of your file that will store the file name along with its path. This is particularly important if you are receiving and processing multiple files at once, because DS will batch all the files through at the same time. So let's say you are looking for File* and the files received are File1, File2, and File3. DS will process all 3 files together at once. It will include the file name that each row of data came from in the DI_FILENAME column, and we will use that information to tie the data back to the specific row we have created for that file.

In order to tie up the row of data with the correct SYS_FILE entry, we use the lookup_ext function as follows:

[Screenshot: a lookup_ext from the DI_FILENAME column to the SYS_FILE table, returning FILE_ID]

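In expression form, that lookup would be something like this (the datastore owner and query name are placeholders):

lookup_ext([Datawarehouse.DBO.SYS_FILE, 'PRE_LOAD_CACHE', 'MAX'], [FILE_ID], [NULL], [DI_FILENAME, '=', Query.DI_FILENAME])
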
So now that we have our file and have processed it, we need to decide what to do with the file. I'll be covering archiving and updating the SYS_FILE table to show the completed files in the next post.


Monday, October 27, 2014

Files as data sources - Part 1 - Waiting for the files to arrive


In this post I am going to take a look at how to handle file arrival.

You have a few options for dealing with file arrival.
  1. Run the job at a set time each day. If the file is there it gets processed, and if it isn't, you just try again tomorrow.
  2. Run the job frequently, say every couple of minutes. Each time the job runs it attempts to process the file. If the file is there it gets processed, and if it isn't, Data Services just tries again on the next run. This method is useful if the file could arrive multiple times a day and you can't be sure exactly when it will arrive.
  3. Start the job, and have the job wait for the file to arrive. This method is best if you're processing just one instance of the file every day, but can't be sure exactly when it will arrive, and you want the ETL to run as soon as it does.
For dealing with all three options above I have written a function called IsFileAvailable that makes use of the built-in DS function wait_for_file. The reason I don't use the wait_for_file function alone is that I have built in additional functionality that enables me to deal with some of the other issues I mentioned in the introduction. I will be going through those in subsequent posts.

Here is what the function looks like:

$L_FileFound = wait_for_file($P_FileDirectory || $P_FileName, $P_TimeOut,$P_Interval , -1, $L_FileNames, $L_FileListSize, ',');

if ($L_FileFound = 1)
begin
   $L_Counter = 1;
   WHILE ($L_Counter <= $L_FileListSize)
   begin
      FileNew($P_FileDirectory, word_ext($L_FileNames,$L_Counter,','));
      $L_Counter = $L_Counter + 1;
   end
end

Return $L_FileFound;

The above function first waits for the file(s) to arrive, and then writes a record into a file handling table using the FileNew function.

I am using the wait_for_file function to determine when the file arrives.

The return values from this function are:
    0 - No file matched
    1 - At least one file was matched
   -1 - The function timed out with no files found
   -2 - One of the input values is illegal

I'm generally only interested in whether a file has been found, i.e. if the value 1 is returned.
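
In the job's conditional, that means the If expression is just the function call tested against 1. A sketch with hypothetical arguments (the timeout and interval values are explained below):

IsFileAvailable($G_FileLocation, 'file*.txt', 1800000, 300000) = 1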

The first few parameters are fairly straightforward.

Firstly it needs to know the name and location of the file you are waiting for. This can contain a wildcard, so if you are waiting for any file that starts with the letters 'file', you can set the value to file*.txt. If you are not certain of the extension you can use file*.*, and if you don't care what the file name is at all, as long as any file arrives, you can set the value to *.* .

The next parameter, the timeout, is how long you would like Data Services to wait for the file to arrive. It is set in milliseconds, so if you want Data Services to wait 30 minutes, the value should be 30 * 60 (seconds) * 1,000 (milliseconds), giving 1,800,000. If the timeout duration expires, 30 minutes in this example, then wait_for_file returns the value -1: it looked for the file for 30 minutes, but no file arrived.

The 3rd parameter is how often you want Data Services to check whether the file has arrived. It's the same formula for setting the value: if you want it to look every 5 minutes, it's 5 * 60 * 1,000, giving 300,000.

The next 4 parameters are all about returning the names of the files that Data Services finds.

In this example I have -1 set for the max match parameter. This means that I want DS to return the names of all the matched files it finds. You could set this to 0 if you don't want any of them, or to any other positive number if you only want that many file names returned.

The next parameter is an output parameter that will store the list of file names returned. So let's say you set the 1st parameter of the function to file*.txt, and there are 3 files in the directory: file1.txt, file2.txt and file3.txt. This variable will hold all three of those file names.

The next parameter will return the number of files found that match the search pattern. So again, if you're looking for file*.txt and 3 matching files are found, this output parameter will return the value 3.

The final parameter in the function lets you set the separator for the list of files. In this example I set it to a comma, so the $L_FileNames variable above would end up holding file1.txt,file2.txt,file3.txt.
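
Putting the example values above together, the full call might look like this (the directory is made up):

$L_FileFound = wait_for_file('/data/inbound/file*.txt', 1800000, 300000, -1, $L_FileNames, $L_FileListSize, ',');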

The next part of the IsFileAvailable function loops through the list of file names and calls another function I have written called FileNew for each of the file name values in the list. The purpose of the FileNew function is to write a record into my SYS_FILE table for each file found.

I'll be going through the purpose of the SYS_FILE table, and how you can use it to tie data in the target table back to the source files, in my next post.

Monday, October 20, 2014

Files as Data Sources - Introduction

When files are the source of your data there are a few things to take into consideration:

  • When will the files arrive?
  • How will you keep track of the files processed?
  • How do you relate data in your target tables with the original file?
  • What if multiple files arrive for the same data set?
  • What should you do with files once they've been processed?
  • What happens if there is an error with the file?
  • What should you do if a file doesn't arrive?
For this blog series I am going to go through all the points above. Most of what I discuss will be relevant for standard text files as well as XML and Excel files.

Here is the standard structure I use when processing a file through Data Services:

[Screenshot: the job structure: a conditional on IsFileAvailable, with a trace message and the file-processing data flow inside a try...catch in the Then branch, and missing-file handling in the Else branch]

So firstly I use a conditional to check whether a file has arrived or not. I've created my own function, IsFileAvailable, to watch for the file's arrival. Yes, I could have used the built-in function wait_for_file, but I have built additional functionality into IsFileAvailable that you might find useful.

If the file arrives I print that out to the trace file. You don't have to do this, but I find it easier to see what's going on that way.

Then I place the data flow that will process the file within a try...catch block, so that I can handle any file errors without bringing down the entire job. Within the error handler I can report the file errors to the owner(s) of the file and move the file to an error file location.

In the else section of the conditional I place some code to handle the file not arriving.

Over the next few posts I'll break out the detail of how each of the above pieces works.