OPCUA-Python
pip install opcua
Server
from threading import Thread import copy import logging from datetime import datetime import time from math import sin import sys sys.path.insert(0, "..") try: from IPython import embed except ImportError: import code def embed(): myvars = globals() myvars.update(locals()) shell = code.InteractiveConsole(myvars) shell.interact() from opcua import ua, uamethod, Server class SubHandler(object): """ Subscription Handler. To receive events from server for a subscription """ def datachange_notification(self, node, val, data): print("Python: New data change event", node, val) def event_notification(self, event): print("Python: New event", event) # method to be exposed through server def func(parent, variant): ret = False if variant.Value % 2 == 0: ret = True return [ua.Variant(ret, ua.VariantType.Boolean)] # method to be exposed through server # uses a decorator to automatically convert to and from variants @uamethod def multiply(parent, x, y): print("multiply method call with parameters: ", x, y) return x * y class VarUpdater(Thread): def __init__(self, var): Thread.__init__(self) self._stopev = False self.var = var def stop(self): self._stopev = True def run(self): while not self._stopev: v = sin(time.time() / 10) self.var.set_value(v) time.sleep(0.1) if __name__ == "__main__": # optional: setup logging logging.basicConfig(level=logging.WARN) #logger = logging.getLogger("opcua.address_space") # logger.setLevel(logging.DEBUG) #logger = logging.getLogger("opcua.internal_server") # logger.setLevel(logging.DEBUG) #logger = logging.getLogger("opcua.binary_server_asyncio") # logger.setLevel(logging.DEBUG) #logger = logging.getLogger("opcua.uaprocessor") # logger.setLevel(logging.DEBUG) # now setup our server server = Server() #server.disable_clock() #server.set_endpoint("opc.tcp://localhost:4840/freeopcua/server/") server.set_endpoint("opc.tcp://0.0.0.0:4841/freeopcua/server/") server.set_server_name("FreeOpcUa Example Server") # set all possible endpoint policies for clients to connect through server.set_security_policy([ ua.SecurityPolicyType.NoSecurity, # ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt, # ua.SecurityPolicyType.Basic256Sha256_Sign ]) # setup our own namespace uri = "http://examples.freeopcua.github.io" idx = server.register_namespace(uri) # create a new node type we can instantiate in our address space dev = server.nodes.base_object_type.add_object_type(0, "MyDevice") dev.add_variable(0, "sensor1", 1.0).set_modelling_rule(True) dev.add_property(0, "device_id", "0340").set_modelling_rule(True) ctrl = dev.add_object(0, "controller") ctrl.set_modelling_rule(True) ctrl.add_property(0, "state", "Idle").set_modelling_rule(True) # populating our address space # First a folder to organise our nodes myfolder = server.nodes.objects.add_folder(idx, "myEmptyFolder") # instanciate one instance of our device mydevice = server.nodes.objects.add_object(idx, "Device0001", dev) mydevice_var = mydevice.get_child(["0:controller", "0:state"]) # get proxy to our device state variable # create directly some objects and variables myobj = server.nodes.objects.add_object(idx, "MyObject") myvar = myobj.add_variable(idx, "MyVariable", 6.7) mysin = myobj.add_variable(idx, "MySin", 0, ua.VariantType.Float) myvar.set_writable() # Set MyVariable to be writable by clients mystringvar = myobj.add_variable(idx, "MyStringVariable", "Really nice string") mystringvar.set_writable() # Set MyVariable to be writable by clients mydtvar = myobj.add_variable(idx, "MyDateTimeVar", datetime.utcnow()) mydtvar.set_writable() # Set MyVariable to be writable by clients myarrayvar = myobj.add_variable(idx, "myarrayvar", [6.7, 7.9]) myarrayvar = myobj.add_variable(idx, "myStronglytTypedVariable", ua.Variant([], ua.VariantType.UInt32)) myprop = myobj.add_property(idx, "myproperty", "I am a property") mymethod = myobj.add_method(idx, "mymethod", func, [ua.VariantType.Int64], [ua.VariantType.Boolean]) multiply_node = myobj.add_method(idx, "multiply", multiply, [ua.VariantType.Int64, ua.VariantType.Int64], [ua.VariantType.Int64]) # import some nodes from xml # server.import_xml("custom_nodes.xml") # creating a default event object # The event object automatically will have members for all events properties # you probably want to create a custom event type, see other examples myevgen = server.get_event_generator() myevgen.event.Severity = 300 # starting! server.start() print("Available loggers are: ", logging.Logger.manager.loggerDict.keys()) vup = VarUpdater(mysin) # just a stupide class update a variable vup.start() try: # enable following if you want to subscribe to nodes on server side #handler = SubHandler() #sub = server.create_subscription(500, handler) #handle = sub.subscribe_data_change(myvar) # trigger event, all subscribed clients wil receive it var = myarrayvar.get_value() # return a ref to value in db server side! not a copy! var = copy.copy(var) # WARNING: we need to copy before writting again otherwise no data change event will be generated var.append(9.3) myarrayvar.set_value(var) mydevice_var.set_value("Running") myevgen.trigger(message="This is BaseEvent") server.set_attribute_value(myvar.nodeid, ua.DataValue(9.9)) # Server side write method which is a but faster than using set_value embed() finally: vup.stop() server.stop()
这个服务程序演示了opcua服务端,几乎所有的功能,其中Event部分没有不断发送,所以仅供参考。
下图展示了server端的对象结构
客户端
from IPython import embed from opcua import Client class SubHandler(object): def event_notification(self, event): print("Event:", event.EventId, event.Time, event.proper_random, event.Message.Text) def main_c(): url = "opc.tcp://127.0.0.1:4841/freeopcua/server/" c = Client(url) try: c.connect() root = c.get_root_node() embed() except Exception as e: print("Client Exception:", e) finally: c.disconnect() if __name__ == "__main__": main_c()
客户端遍历流程
- 1.获取
root
下的Objects
节点的所有子节点,一般类型都为Object
,取一代表为A - 2.获取A的
NodeClass
(一般为Object)和TypeDefinition
,其中NodeId需要与路径["0:Types","0:ObjectTypes","0:BaseObjectType"]
下的类型对照(自定义类型也在内)。 - 3.继续以A为根,遍历子节点,如果时Object继续2,如果是
Variable
,NodeId对照["0:Types","0:VariableTypes","0:BaseVariableType"]
,可以判断是变量(variable,63)还是属性(property,65)。 - 4.对于
Method
使用a_root.call_method(a, arg1)
调用。对于Variable
使用get_value/get_data_value()
获取存储的值。如果是Object
继续2. - 5.对于
Variable
使用access_level
获取是否有写权限,如果有set_value
可以设置值。
一个比较完美的遍历客户端
from opcua import Client, ua def brower_child(root): """ 递归调用遍历,格式化不好做,有深度问题 """ name = root.get_node_class().name # print(name) if name == "Object": brower_obj(root) for c in root.get_children(): print(" ", end='') brower_child(c) elif name == 'Variable': brower_var(root) else: brower_method(root) class CurState(): def __init__(self, parent=None, p=None, d=0): self.parent = parent # unused self.p = p self.d = d def brower_child2(root, max_d=-1, ignore=[]): """ 栈+循环遍历,非常好用 """ stack = [CurState(None, root, 0)] while len(stack): cur = stack.pop() name = cur.p.get_node_class().name print(''.join([' ' for i in range(cur.d)]), end="") if cur.p.get_browse_name().Name in ignore: continue if name == "Object": brower_obj(cur.p) if max_d > 0 and cur.d >= max_d: continue for c in cur.p.get_children(): stack.append(CurState(cur.p, c, cur.d+1)) elif name == 'Variable': brower_var(cur.p) else: brower_method(cur.p) def brower_obj(v): # print(v.get_browse_name()) rw = 'R ' bname = v.get_browse_name() print("*%2d:%-30s (%-2s, %-23s)" % (bname.NamespaceIndex, bname.Name, rw, "Object")) def brower_var(v): # print(v.get_browse_name()) rw = 'R ' if ua.AccessLevel.CurrentWrite in v.get_access_level(): rw = "RW" bname = v.get_browse_name() tv = v.get_data_value().Value v_show = tv.Value if len(str(v_show)) > 1024: v_show = str(v_show[:56]) + "..." print("-%2d:%-30s (%-2s, %-23s) =>" % (bname.NamespaceIndex, bname.Name, rw, tv.VariantType), v_show) def brower_method(v): # print(v.get_description()) rw = 'C ' bname = v.get_browse_name() # args = [] # for a in v.get_properties(): # dt = a.get_data_type().NodeIdType.name # args.append(dt) print("@%2d:%-30s (%-2s, %-23s)" % (bname.NamespaceIndex, bname.Name, rw, "Method")) def main_c(): url = "opc.tcp://127.0.0.1:4841/freeopcua/server/" c = Client(url) try: c.connect() root = c.get_root_node() print("rnBrower:") brower_child2(root.get_child(["0:Objects"]), -1, ["Server"]) except Exception as e: print("Client Exception:", e) finally: c.disconnect() if __name__ == "__main__": main_c()
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持IT俱乐部。