Sunday, January 26, 2020

Using arrays with parallel

OpenAF is a mix of Javascript and Java, but "pure" Javascript isn't "thread-safe" in the Java world. Nevertheless, being able to use Java threads from Javascript is very useful, starting with performance.

One common pitfall is not paying attention to what is "thread-safe" and what isn't: that is, what is "aware" that it can be running in a multi-threaded environment, where several threads might be competing for access to a resource, and what isn't.

Let's see a practical example of a common pitfall with arrays.

Example of what should NOT be done

In this example we will create a simple array and push new elements to it in parallel. We would expect the target array to always end up with the same number of elements as the source:

var targetArray = [];
// making up a source array with 10 elements to push to targetArray
var sourceArray = repeat(9, '-').split(/-/);
parallel4Array(sourceArray, v => {
    targetArray.push(v);
    return v;
});

print(sourceArray.length);  // 10
print(targetArray.length);  // 10

Great, the source and target array lengths are equal. It works, right? Let's increase the "thread competition" to 10000 elements and compare again:

var targetArray = [];
var sourceArray = repeat(9999, '-').split(/-/);
parallel4Array(sourceArray, v => {
    targetArray.push(v);
    return v;
});

print(sourceArray.length);  // 10000
print(targetArray.length);  // 9961

They are no longer equal. And if you run it again, increasing the number of elements in the source array, the difference will increase.

This is what happens when you forget that a Javascript array is not thread-safe (which is actually a good thing, because being thread-safe usually makes things slower when you are not using threads).
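The pitfall itself is language-agnostic: any non-atomic read-modify-write on shared state can lose updates under threads. Here is the same idea sketched in Python (not OpenAF code), together with the lock-based fix that sync mirrors:

```python
import threading

N = 50_000
THREADS = 4
counter = {"value": 0}        # shared mutable state
lock = threading.Lock()

def unsafe_add():
    for _ in range(N):
        v = counter["value"]          # read...
        counter["value"] = v + 1      # ...write: another thread may interleave in between

def safe_add():
    for _ in range(N):
        with lock:                    # only one thread at a time runs this block
            counter["value"] += 1

def run(worker):
    counter["value"] = 0
    threads = [threading.Thread(target=worker) for _ in range(THREADS)]
    for t in threads: t.start()
    for t in threads: t.join()
    return counter["value"]

print(run(unsafe_add))   # often less than 200000: updates get lost
print(run(safe_add))     # always 200000
```

The unsafe version loses exactly the kind of updates the targetArray.push example loses above.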

Examples of what should be done

Using sync

The first immediate way to solve this is to ensure that just one thread will access the targetArray at a time. You can do this in OpenAF with the sync function:

var targetArray = [];
var sourceArray = repeat(9999, '-').split(/-/);
parallel4Array(sourceArray, v => {
    sync(() => {
        targetArray.push(v);
    }, targetArray);
    return v;
});

print(sourceArray.length);  // 10000
print(targetArray.length);  // 10000

The sync function will use Java underneath (synchronized) to ensure that only one thread at a time will execute the provided function over targetArray.

So the problem is solved, right? Well, if you measure the performance with and without sync you will notice that it can be up to twice as slow.

Why? Because when one thread is inserting a value into targetArray, all other threads have to stop and wait. That slows everything down and may even make it less effective than running sequentially (depending on the processing done outside the sync function).

Using syncArray

The OpenAF library ow.obj provides a wrapper for a "thread-safe" Java array: ow.obj.syncArray

ow.loadObj();
var targetArray = new ow.obj.syncArray();
var sourceArray = repeat(99999, '-').split(/-/);
parallel4Array(sourceArray, v => {
    targetArray.add(v);
    return v;
});

print(sourceArray.length);            // 100000
print(targetArray.toArray().length);  // 100000

The Java array version is optimized to be faster in these conditions. So the bigger the sourceArray, the bigger the benefits of ow.obj.syncArray.

The ow.obj.syncArray has more methods that you can explore through OpenAF's help, including: addAll, clear, get, indexOf, length, remove and even getJavaObject, which lets you interact with the original Java object directly.
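ow.obj.syncArray wraps a Java collection built for concurrent access, so the locking lives inside the container rather than around it. The closest stdlib analogy in Python is queue.Queue, which is documented as thread-safe, so no extra locking is needed around put:

```python
import queue
import threading

target = queue.Queue()            # thread-safe: put/get do their own locking
source = list(range(10_000))

def worker(chunk):
    for v in chunk:
        target.put(v)             # safe to call from many threads at once

# split the source across 4 threads
threads = [threading.Thread(target=worker, args=(source[i::4],)) for i in range(4)]
for t in threads: t.start()
for t in threads: t.join()

print(target.qsize())   # 10000: no elements lost
```

As with syncArray, the container does fine-grained locking internally, which tends to contend less than one big external lock.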

Comparison

Changing the above examples to perform something "harder" for each array element, like calculating the Math.sin of each value, and then comparing the performance, you would get something similar to:

Strategy           Source size   Target size   Average time
Parallel           100000        <100000       2.11s
Sync               100000        100000        2.44s
ow.obj.syncArray   100000        100000        2.06s

But what's the time if it's done sequentially? In this simple example: a lot better (~1.1s). Keep in mind that you only gain a performance advantage when the time spent dealing with threads and concurrent access is much lower relative to the time spent processing each array element.

So, in conclusion, there is no right or wrong answer. You need to test to get the best for your case.

Tuesday, January 14, 2020

Relaxed JSON parser

In OpenAF, when using the jsonParser function, the parsing sticks to the strict JSON definition.

For example the following behaves as expected:

> jsonParser("{ \"a\": 1 }");
{
   "a": 1
}
> JSON.parse("{ \"a\": 1 }");
{
   "a": 1
}

But using a more "relaxed" JSON definition, the same functions will fail:

> jsonParser("{ a: 1 }");
{ a: 1 }
> JSON.parse("{ a: 1 }");
-- SyntaxError: Unexpected token in object literal

The jsonParser function returns the text string as-is since it's unable to parse the JSON string. The native JSON.parse will actually throw an exception.
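This strict-versus-relaxed distinction isn't specific to Javascript. Python's standard json module, for example, behaves like JSON.parse and only accepts strict JSON:

```python
import json

strict = '{ "a": 1 }'
relaxed = '{ a: 1 }'    # unquoted key: fine in a Javascript object literal, not in JSON

print(json.loads(strict))        # {'a': 1}

try:
    json.loads(relaxed)
except json.JSONDecodeError as e:
    print("rejected:", e)        # strict parsers throw instead of guessing
```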

Using GSON

OpenAF includes the GSON library, which can parse more "relaxed" JSON definitions. Since OpenAF version 20200108, the OpenAF jsonParser function can also use the GSON library: a new second boolean argument that, if true, switches to GSON for parsing the string provided in the first argument:

> jsonParse("{ a: 1 }", true);
{
   "a": 1
}

Monday, January 13, 2020

oJob exception handling

When you create an OpenAF oJob, each job runs independently and you are responsible for handling exceptions in each one.

But couldn't we have a global try/catch function for all jobs? Yes: you can add one with ojob.catch.

oJob throwing exceptions example

todo:
  - Normal job
  - Bug job
  - Another Bug job

jobs:
  #-----------------
  - name: Normal job
    exec: |
      print("I'm a normal job.");

  #--------------
  - name: Bug job
    exec: |
      print("I'm a buggy job.");
      throw "BUG!";

  #----------------------
  - name: Another Bug job
    exec: |
      print("I'm another buggy job.");
      throw "BUG!";

This oJob has 3 jobs. "Normal job" will execute without throwing any exceptions, but "Bug job" and "Another Bug job" will each throw an exception when executed.

Executing it you will see the three jobs running, with two of them failing. The entire oJob process will still finish with exit code 0.

oJob general exception example

todo:
  - Normal job
  - Bug job
  - Another Bug job

ojob:
  catch: |
    var msg = "Error executing job '" + job.name + "'\n";
    msg += "\nArguments:\n" + stringify(args, void 0, "") + "\n";
    msg += "\nException:\n" + stringify(exception, void 0, "") + "\n";
    logErr(msg, { async: false });

    if (String(exception) == "BUG!") exit(1);

jobs:
  #-----------------
  - name: Normal job
    exec: |
      print("I'm a normal job.");

  #--------------
  - name: Bug job
    exec: |
      print("I'm a buggy job.");
      throw "BUG!";

  #----------------------
  - name: Another Bug job
    exec: |
      print("I'm another buggy job.");
      throw "BUG!";

This example is equal to the previous one but adds ojob.catch. The catch function receives several arguments:

  • args (Map) - the current args map at the point of the exception.
  • job (Map) - the job map where the exception occurred.
  • id (Number) - filled if the job was executed within a specific sub-id.
  • deps (Map) - a map of job dependencies.
  • exception (Exception) - the exception itself.

If the function returns true the exception will be recorded as usual and the job will be registered as failed. If the function returns false the exception will be ignored.

In this example the function will actually stop the entire oJob process with exit code 1.
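The overall mechanics can be sketched as follows. This is a hypothetical Python model of the behaviour, not oJob's implementation: a runner executes every job, routes exceptions to a single handler, and records the failure only when the handler returns true:

```python
def run_jobs(jobs, catch=None):
    """Run each (name, fn) job; exceptions go through one catch handler.

    Mirroring ojob.catch: the handler's return value decides whether the
    failure is recorded (True) or silently ignored (False)."""
    failed = []
    for name, fn in jobs:
        try:
            fn()
        except Exception as e:
            if catch is None or catch(name, e):
                failed.append(name)
    return failed

def bug():
    raise RuntimeError("BUG!")

jobs = [("Normal job", lambda: None), ("Bug job", bug), ("Another Bug job", bug)]
print(run_jobs(jobs, catch=lambda name, e: True))    # ['Bug job', 'Another Bug job']
print(run_jobs(jobs, catch=lambda name, e: False))   # []
```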

Sunday, January 12, 2020

Using channel peers

If you use OpenAF channels there are some cases where you would like to connect them across OpenAF scripts.

To achieve this you can use the $ch.expose and $ch.createRemote functions, which allow you to expose an internal OpenAF channel so it can be accessed by other OpenAF scripts remotely.

But if the original OpenAF script which exposed the channel "dies" all the others will no longer be able to access the corresponding data. This might not be the desired scenario in some cases.

Peering

By making a mix of the expose and createRemote functions between several OpenAF scripts you could achieve what is provided out of the box by the $ch.peer function.

The $ch.peer function will expose an OpenAF channel and exchange sync data with a set of "peer" OpenAF scripts that just have to execute a similar command:

$ch("myChannel").peer(12340, 
                      "/myURI", 
                      [ "http://script01:12340/myURI", 
                        "http://script02:12341/myURI", 
                        "http://script03:12343/myURI"]);

The signature is:

$ch.peer(aLocalPortOrServer, aPath, aRemoteURL, aAuthFunc, aUnAuthFunc)

The parameters are similar to the $ch.expose function:

  • aLocalPortOrServer (Object/Number) - the local port or an HTTPServer object to hold the HTTP/HTTPS server.
  • aPath (String) - the URI where the channel interaction will be performed.
  • aRemoteURL (Array) - an array of peer URLs.
  • aAuthFunc (Function) - called with user and password; if it returns true the authentication is successful.
  • aUnAuthFunc (Function) - called with the HTTP server and the HTTP request.

If you call it again with a different aRemoteURL array it will replace the existing one. Any entry in the array that references the script itself will be ignored.
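Those two rules (the new list replaces the previous one, and self-references are dropped) can be sketched with a small hypothetical helper, not OpenAF's code:

```python
def set_peers(self_url, new_list):
    """Return the effective peer list: the new list replaces any previous
    one, and entries pointing at this script itself are dropped."""
    return [url for url in new_list if url != self_url]

peers = set_peers("http://script01:12340/myURI",
                  ["http://script01:12340/myURI",
                   "http://script02:12341/myURI",
                   "http://script03:12343/myURI"])
print(peers)   # the self-reference is gone
```

This is why all peers can share the exact same array, as in the examples below: each one simply ignores its own URL.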

Unpeering

To remove a peering you can call:

$ch.unpeer(aRemoteURL)

Testing

Setting up data channels in each peer

First script (on host h1):

$ch("data").peer(9080, "/data", [ "http://h1:9080/data", "http://h1:9090/data", "http://h2:9080/data" ]);

Second script (on host h1):

$ch("data").peer(9090, "/data", [ "http://h1:9080/data", "http://h1:9090/data", "http://h2:9080/data" ]);

Third script (on host h2):

$ch("data").peer(9080, "/data", [ "http://h1:9080/data", "http://h1:9090/data", "http://h2:9080/data" ]);

Changing and retrieving data

  1. On host h2:
> $ch("data").size()
0
  2. On host h1:
> $ch("data").size()
0
> $ch("data").setAll(["canonicalPath"], io.listFiles(".").files);
> $ch("data").size()
25
  3. On host h2:
> $ch("data").size()
25

Sunday, December 8, 2019

Logging to log files

There are several benefits to using OpenAF's log* functions. One of them is quickly having a logs folder with automated housekeeping with just a couple of lines of code.

Let's start with a simple script with log functions:

log("Init");

try {
    log("Writing file...");
    io.writeFileString("test.txt", "test");
    log("File test.txt written.");
} catch(e) {
    logErr("Couldn't write file test.txt. Error: " + String(e));
}

log("Done");

Executing it you will get the expected standard output:

2019-12-08 09:19:00.540 | INFO | Init
2019-12-08 09:19:00.750 | INFO | Writing file...
2019-12-08 09:19:00.780 | INFO | File test.txt written.
2019-12-08 09:19:00.785 | INFO | Done

Log to a folder

Let's log to a folder:

ow.loadCh();
// Setting default logging to the logs folder
ow.ch.utils.setLogToFile({ logFolder: "logs" });
// Setting logging to asynchronous
setLog({ async: true });

log("Init");

try {
    log("Writing file...");
    io.writeFileString("test.txt", "test");
    log("File test.txt written.");
} catch(e) {
    logErr("Couldn't write file test.txt. Error: " + String(e));
}

log("Done");

Executing it again, the result seems similar, but a new "logs" folder was created with a daily log file containing all the log messages. When creating a new daily log file it will automatically gzip all the old daily log files.

But that's just the standard behaviour, which can, of course, be customized. Let's add housekeeping to delete files older than five days (7200 minutes):

ow.loadCh();
ow.ch.utils.setLogToFile({ 
    logFolder: "logs",
    HKhowLongAgoInMinutes: 7200
});
//...

P.S.: you can specify a different folder to hold the previous compressed log files using backupFolder. You can also control whether old log files should be compressed with dontCompress.
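The housekeeping behind HKhowLongAgoInMinutes boils down to deleting files older than a cutoff. A Python sketch of the idea (not OpenAF's implementation):

```python
import os
import time

def housekeep(log_folder, how_long_ago_minutes):
    """Delete files in log_folder whose modification time is older than the cutoff."""
    cutoff = time.time() - how_long_ago_minutes * 60
    for name in os.listdir(log_folder):
        path = os.path.join(log_folder, name)
        if os.path.isfile(path) and os.path.getmtime(path) < cutoff:
            os.remove(path)

# housekeep("logs", 7200)   # delete log files older than 5 days
```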

Logging to files by hour

The default is logging to daily files but you can customize it to any date/time grouping. To group files by hour you can simply add a specific fileDateFormat. Additionally, let's change the generated log filenames using filenameTemplate (which uses handlebars notation).

Since we are changing the log filenames, you also need to provide a HKRegExPattern regular expression so the housekeeping process can identify the new log filenames.

ow.loadCh();
ow.ch.utils.setLogToFile({ 
    logFolder: "logs",
    filenameTemplate: "hourly-logs-{{timedate}}.log",
    fileDateFormat: "yyyy-MM-dd-HH",
    HKhowLongAgoInMinutes: 7200,
    HKRegExPattern: "hourly-logs-\\d{4}-\\d{2}-\\d{2}-\\d{2}\\.log"
});
//...

Logging as CSV files

To log into CSV files you just need to change the filenameTemplate and the lineTemplate:

ow.loadCh();
ow.ch.utils.setLogToFile({ 
    logFolder: "logs",
    filenameTemplate: "log-{{timedate}}.csv",
    lineTemplate: "\"{{timedate}}\";\"{{type}}\";\"{{{message}}}\"\n",
    HKhowLongAgoInMinutes: 7200
});
//...

Logging as NDJSON files

To log as NDJSON:

ow.loadCh();
ow.ch.utils.setLogToFile({ 
    logFolder: "logs",
    filenameTemplate: "log-{{timedate}}.ndjson",
    lineTemplate: "{\"d\":\"{{timedate}}\",\"t\":\"{{type}}\",\"m\":\"{{{message}}}\"}\n",
    HKhowLongAgoInMinutes: 7200
});
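NDJSON requires each line to be a complete, valid JSON object, quoted keys included. A Python sketch that builds such a line with a JSON serializer instead of hand-templating (the field names d/t/m are just illustrative):

```python
import json
from datetime import datetime, timezone

def ndjson_line(type_, message):
    """Serialize one log entry as a single NDJSON line (valid JSON + newline)."""
    entry = {"d": datetime.now(timezone.utc).isoformat(), "t": type_, "m": message}
    return json.dumps(entry) + "\n"

line = ndjson_line("INFO", "File test.txt written.")
print(line, end="")              # one self-contained JSON object per line
print(json.loads(line)["m"])     # round-trips cleanly through a JSON parser
```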

Keeping log entries only on files

You can also specify that log entries should only be recorded in the log files and not shown on the standard output console:

ow.loadCh();
ow.ch.utils.setLogToFile({
    logFolder: "logs",
    setLogOff: true
});

And more...

You can check all the available options by executing:

> help ow.ch.utils.setLogToFile

on an openaf-console prompt.

Tuesday, November 12, 2019

oJob Check for stall

When running an oJob there might be situations where you want to ensure that the entire process won't enter a stall (e.g. being stopped at a "dead-end" waiting for some service or lock).

oJob actually has a feature that ensures, no matter what, your oJob won't run past a specific timeout, or that lets a function be executed to determine if the oJob is in a stall situation.

Killing after x seconds

The easiest configuration is ensuring that there is a general timeout for the entire oJob:

ojob:
  checkStall:
    # check for a stall every x seconds (default 60)
    everySeconds    : 1
    # kill the entire process after x seconds
    killAfterSeconds: 4

todo:
  - Test job

jobs:
  #---------------
  - name: Test job
    exec: |
      args.wait = _$(args.wait).default(5000);

      log("Waiting for " + args.wait + "ms...");
      sleep(args.wait, true);
      log("Done");

Executing this oJob you will get different results depending on the amount of time the "Test job" takes. It's configured to "kill itself" if it takes longer than 4 seconds, checking for that every second (in real situations you should use the default of 60 seconds).

For 1.5 seconds:

$ ojob test.yaml wait=1500
>> [Test job] | STARTED | 2019-11-11T12:13:17.199Z------------------------
2019-11-11 12:13:17.230 | INFO | Waiting for 1500ms...
2019-11-11 12:13:18.736 | INFO | Done

<< [Test job] | Ended with SUCCESS | 2019-11-11T12:13:18.740Z ============

For 5 seconds:

$ ojob test.yaml wait=5000
>> [Test job] | STARTED | 2019-11-11T12:22:00.058Z -----------------------
2019-11-11 12:22:00.085 | INFO | Waiting for 5000ms...
oJob: Check stall over 4000
2019-11-11 12:22:03.878 | ERROR | oJob: Check stall over 4000
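The checkStall mechanism is essentially a watchdog thread: wake up every everySeconds and kill the whole process once killAfterSeconds is exceeded. A Python sketch of the idea, not oJob's implementation (the on_stall parameter is added here so the behaviour can be observed; the real thing just kills the process):

```python
import os
import threading
import time

def start_stall_watchdog(every_seconds, kill_after_seconds,
                         on_stall=lambda: os._exit(1)):
    """Check every every_seconds; trigger on_stall once kill_after_seconds passes."""
    start = time.monotonic()
    def check():
        while True:
            time.sleep(every_seconds)
            if time.monotonic() - start > kill_after_seconds:
                on_stall()          # default action: hard-kill the process
                return
    threading.Thread(target=check, daemon=True).start()

stalled = threading.Event()
start_stall_watchdog(0.05, 0.2, on_stall=stalled.set)
time.sleep(0.5)                     # simulate a job that takes too long
print(stalled.is_set())             # True: the watchdog fired
```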

Killing depending on a function

If you have certain conditions that can be easily checked to determine if the oJob is stalled you can use a function:

ojob:
  checkStall:
    everySeconds    : 1
    checkFunc       : |
      print("checking for stall...");
      if (global.canDie) {
        print("should die.");
        return true;
      }

todo:
  - Init
  - Test job

jobs:
  #-----------
  - name: Init
    exec: |
      global.canDie = false;

  #---------------
  - name: Test job
    exec: |
      log("Waiting for 2500ms...");
      sleep(2500, true);

      log("Setting canDie to true...");
      global.canDie = true;

      log("Waiting for another 2500ms...");
      sleep(2500, true);

      log("Done");

In this case a global variable canDie is only set to true after the first 2.5 seconds of execution of the Test job job. As soon as the checkFunc executes and confirms the condition by returning a true value, the oJob is immediately stopped.

$ ojob test2.yaml
checking for stall...
>> [Init] | STARTED | 2019-11-11T12:32:02.828Z -----------------------------
<< [Init] | Ended with SUCCESS | 2019-11-11T12:32:02.857Z ==================
>> [Test job] | STARTED | 2019-11-11T12:32:02.893Z -------------------------
2019-11-11 12:32:02.924 | INFO | Waiting for 2500ms...
checking for stall...
checking for stall...
2019-11-11 12:32:05.429 | INFO | Setting canDie to true...
2019-11-11 12:32:05.430 | INFO | Waiting for another 2500ms...
checking for stall...
should die.

You can see the several checkFunc executions by the "checking for stall…" output; once the global variable canDie became true, the whole oJob stopped its execution.

Checking at the job level

All the previous options check for a stall of the entire oJob execution, but you can specify the same at the job level using typeArgs.timeout and typeArgs.stopWhen, which are available for all types of jobs in oJob.

Example with typeArgs.timeout

In this example the Test job job is set to timeout after 1.5 seconds:

todo:
  - Init
  - Test job
  - Done

jobs:
  #-----------
  - name: Init
    exec: |
      global.canDie = false;

  #-----------
  - name: Done
    exec: |
      log("Everything is done.");

  #-------------------
  - name    : Test job
    typeArgs:
      timeout: 1500
    exec    : |
      log("Waiting for 2500ms...");
      sleep(2500, true);

      log("Setting canDie to true...");
      global.canDie = true;

      log("Waiting for another 2500ms...");
      sleep(2500, true);

      log("Done");

Executing it, the job will actually end in error after the specified timeout:

>> [Init] | STARTED | 2019-11-11T12:47:25.568Z ----------------------------
<< [Init] | Ended with SUCCESS | 2019-11-11T12:47:25.624Z =================
>> [Test job] | STARTED | 2019-11-11T12:47:25.662Z ------------------------
2019-11-11 12:47:25.684 | INFO | Waiting for 2500ms...

!! [Test job] | Ended in ERROR | 2019-11-11T12:47:27.197Z =================
- id: 8ebbf961-822d-3b95-ca09-8dfb335ab6cb
  error: Job exceeded timeout of 1500ms

===========================================================================
>> [Done] | STARTED | 2019-11-11T12:47:27.277Z ----------------------------
2019-11-11 12:47:27.297 | INFO | Everything is done.

<< [Done] | Ended with SUCCESS | 2019-11-11T12:47:27.300Z =================
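This per-job timeout can be modeled as running the job body in a worker and abandoning it when the deadline passes. A Python sketch (not oJob's code; note that the timed-out function keeps running in the background here, it is merely abandoned):

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as JobTimeout

def run_with_timeout(fn, timeout_ms):
    """Run fn in a worker thread; report an error if it exceeds timeout_ms."""
    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(fn)
    try:
        return ("SUCCESS", future.result(timeout=timeout_ms / 1000))
    except JobTimeout:
        return ("ERROR", "Job exceeded timeout of %dms" % timeout_ms)
    finally:
        pool.shutdown(wait=False)   # don't block on the abandoned worker

print(run_with_timeout(lambda: "done", 1500))           # ('SUCCESS', 'done')
print(run_with_timeout(lambda: time.sleep(2.5), 1500))  # ('ERROR', 'Job exceeded timeout of 1500ms')
```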

Example with typeArgs.stopWhen

In this example the Test job job is set to stop whenever the stopWhen function returns a true value:

todo:
  - Init
  - Test job
  - Done

jobs:
  #-----------
  - name: Init
    exec: |
      global.canDie = false;

  #-----------
  - name: Done
    exec: |
      log("Everything is done.");

  #-------------------
  - name    : Test job
    typeArgs:
      stopWhen: |
        if (global.canDie) {
           print("should die...");
           return true;
        }
    exec    : |
      log("Waiting for 2500ms...");
      sleep(2500, true);

      log("Setting canDie to true...");
      global.canDie = true;

      log("Waiting for another 2500ms...");
      sleep(2500, true);

      log("Done");

Executing it, the job will actually stop without any error when the stopWhen function returns a true value. To end the job with an error instead, simply throw an exception in the stopWhen function.

>> [Init] | STARTED | 2019-11-11T12:38:54.232Z -----------------------------
<< [Init] | Ended with SUCCESS | 2019-11-11T12:38:54.263Z ==================
>> [Test job] | STARTED | 2019-11-11T12:38:54.298Z -------------------------
2019-11-11 12:38:54.330 | INFO | Waiting for 2500ms...
2019-11-11 12:38:56.837 | INFO | Setting canDie to true...
should die...
2019-11-11 12:38:56.838 | INFO | Waiting for another 2500ms...

<< [Test job] | Ended with SUCCESS | 2019-11-11T12:38:56.857Z ==============
>> [Done] | STARTED | 2019-11-11T12:38:56.012Z =============================
2019-11-11 12:38:56.025 | INFO | Everything is done.

<< [Done] | Ended with SUCCESS | 2019-11-11T12:38:56.098Z ==================

Thursday, November 7, 2019

Handling oJob deps failure

When executing an oJob, each job can depend on the successful execution of other jobs. That means that if a job it depends on fails, the current job execution will fail or stall (if ojob.sequential = true).

Dependency timeout

If ojob.sequential != true, any failed job dependency will keep the oJob waiting for a successful execution of the job it depends on. You can avoid that using a dependency timeout:

ojob:
  # timeout of 2500ms
  depsTimeout: 2500 

In this case, whenever a job doesn't execute because another job failed, it will wait just the specified amount of milliseconds for another successful execution. Otherwise it will terminate with an error indicating that a dependency timeout has occurred.

Individual dependency

You can also execute code to decide what should be done if a dependent job fails when ojob.sequential != true:

todo:
  - Init
  - Test 1
  - Test 2

jobs:
  #-----------
  - name: Init
    exec: |
      // Initialize error flag
      global.inError = false;

  #-------------
  - name: Test 1
    exec: |
      print("Test 1");
      // Throw an error if args.error is defined
      if (args.error) throw("Problem with test 1");

  #-------------
  - name: Test 2
    deps:
      - name  : Init
      - name  : Test 1
        onFail: |
          // if the dependent job fails, 
          // change the global error flag and proceed
          global.inError = true;
          return true;
    exec: |
      if (!global.inError)
        print("Test 2");
      else
        printErr("Can not execute Test 2");

In this example there are three jobs:

  • Init - initializes a global inError flag.
  • Test 1 - generates an error if the error argument is defined during this oJob execution.
  • Test 2 - depends on the Init and Test 1 jobs. If Test 1 job fails it sets the global inError flag and proceeds with the execution that checks that same flag.

The onFail entry on the list of dependencies for job Test 2 is actually the code of a function that receives three parameters:

  • args - the current job arguments
  • job - the current job definition
  • id - the current job execution id

If this onFail function returns true the job execution will proceed. If it returns false the job execution stalls, as it does by default.
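The dependency decision can be modeled in a few lines. This is a hypothetical Python sketch of the rule, not oJob's internals:

```python
def can_run(deps, results, on_fail=None):
    """Decide if a job may run given its dependencies' results.

    on_fail mirrors oJob's onFail hook: returning True lets the job
    proceed despite a failed dependency, False keeps it stalled."""
    for dep in deps:
        if results.get(dep) != "SUCCESS":
            if on_fail is not None and on_fail(dep):
                continue        # proceed anyway, as in the example above
            return False        # stall: the default behaviour
    return True

results = {"Init": "SUCCESS", "Test 1": "ERROR"}
print(can_run(["Init", "Test 1"], results))                           # False
print(can_run(["Init", "Test 1"], results, on_fail=lambda d: True))   # True
```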
